A Day in the Life of a ClickHouse Query

RECORDED: Thursday, Feb 10 | 10 am PT
SPEAKERS: Robert Hodges, CEO, Altinity

Why do queries run out of memory? How can I make my queries even faster? How should I size ClickHouse nodes for the best cost-efficiency? The key to these questions and many others is knowing what happens inside ClickHouse when a query runs. This webinar is a gentle introduction to ClickHouse internals, focusing on topics that will help your applications run faster and more efficiently. We’ll discuss the basic flow of query execution, dig into how ClickHouse handles aggregation and joins, and show you how ClickHouse distributes processing within a single CPU as well as across many nodes in the network. After attending this webinar you’ll understand how to open up the black box and see what the parts are doing.

Webinar Slides


Webinar Transcript:

I’d like to welcome everyone to our webinar A Day in the Life of a ClickHouse Query. We’re going to be giving an introduction to ClickHouse internals for application developers, providing you with basic information about how things work inside so that you can make your queries both faster and more efficient.

My name is Robert Hodges. I’ll be doing the presentation today and I’m backed by a bunch of people in Altinity Engineering. I’m going to do some introductions and then get into the meat of the talk. I just want to give you some upfront information that will help you enjoy this talk more. 

First, it is being recorded. We will be posting the recording after this talk is done. If you signed up, you’ll get a link for it and you can go find it yourself.

We’ll also include the slides so all that information including samples and things like that you might want to copy and paste will be available. Second, if you have questions, feel free to put them either in the chat or the Q&A box which is provided by Zoom. If the questions are relevant to what I’m talking about, I’ll take them as I’m going along; otherwise, we have time at the end of the talk and we can dig into things. 

Let’s go ahead and do some deeper introductions. So once again, my name is Robert Hodges and I am a database geek. I first started working with databases in 1983, I had a couple of breaks doing mathematical programming and virtualization at places like VMware. In that time, I worked with about 20 different databases so that’s the main thing I’ve done in my career. 

My day job is I’m CEO of Altinity. Altinity Engineering is a big participant in this talk. It would not be possible to put this information together without all the help from the folks that we (Altinity) have who work on ClickHouse, support our customers that are on ClickHouse, and then build things on top of it so that team has centuries of experience in database and applications. I’d like to particularly call out Tatiana Saltikova and Denis Zhuravlev for their help in preparing this talk.

Finally, Altinity is a company — we do ClickHouse support and services including Altinity.Cloud. We are focused on ClickHouse and building applications on top of ClickHouse. We’re also the authors of the Kubernetes Operator for ClickHouse which many of you on this call may already use. So that’s us. A little bit about you — the audience here — we’re assuming that you are developers, you’re using ClickHouse, and what you’d like to do is learn a little bit more about ClickHouse — how it works underneath so that you can actually use it more effectively. So let’s go ahead and dive in!

ClickHouse Basics

3:04 ClickHouse Basics

We’ll start with the basics and then we’ll get into the internals so let’s just introduce ClickHouse in case there’s anybody on this call who has not heard it before.

3:15 What is ClickHouse?

ClickHouse is an SQL data warehouse. I like to think of it as the first open-source SQL data warehouse that can play with the big kids. It’s really outstanding, and it has a number of distinguishing features. Of course, it understands SQL — the winning language among data warehouses. It’s extremely portable — it runs anywhere Linux does, from bare metal to the cloud, as well as on Kubernetes. It has a shared-nothing architecture, which is the traditional data warehouse architecture where you have a set of nodes with attached storage connected by a network.

It is moving of course to a decoupled compute and storage model but that’s something that’s in progress, and it’s going to play out over time like just about all data warehouses or the vast majority of them. 

It stores data in columns. Row databases are easy to update; it’s easy to get individual rows and so they’re very efficient for things that do transaction processing like running e-commerce sites. Column store is very well optimized for reads and you’ll see examples of that. Speaking of reads, ClickHouse is able to do both parallel as well as vectorized reads. 

By parallel, we mean spreading work across multiple CPUs and across multiple nodes connected by a network. Vectorization is when we take arrays of data and push them onto the CPU, processing them with SIMD instructions — single instruction, multiple data — which lets us take advantage of concurrency inside the CPU itself.

ClickHouse will run on everything from a laptop all the way to huge clusters containing tens of petabytes and hundreds of nodes. It’s open source, which is a really big, permanent differentiating feature. It’s Apache 2.0 — you can use it anywhere you want, for any business purpose. And these features have attracted a huge community of contributors: in 2021 there were probably at least 400 unique contributors posting PRs to ClickHouse.

It’s turned into the core engine for real-time analytics, so reading data from event streams, ELT processes, and object storage, and then pushing them out to different ways that people like to consume and doing it extremely fast. We won’t do much more marketing on ClickHouse.

Understand the ClickHouse Engine

5:57 ClickHouse Engine

What we’re going to do is focus on how it works inside and the reason for that is simple: like a jet engine, if you want to make the jet engine go faster or get more performance out of it, you need to understand how the engine works. Same thing with ClickHouse. One of the things that’s really cool about ClickHouse is it has a relatively simple execution model. 

If you used a database like Oracle you know that there’s a very complex query planning engine. There is a great deal of complexity built into that — that occurs inside the black box and you don’t really know what it’s doing or how it’s doing it.

That’s not the case with ClickHouse, there’s really no magic in what we’re doing, and in fact, a developer who understands what threads are and what a hash table is has a lot of the knowledge that you need to have to understand how ClickHouse works and to use it better. Then what we can do is take that knowledge and apply it to the queries that we’re executing and make them faster and more efficient. That’s exactly what we’re focused on here by introducing the plumbing. We’ll then show you how things work and tie it back to how that makes things faster.

7:06 What Happens When You Insert Data?

So this talk is about queries, but in order to have a query — by which I mean a SQL SELECT statement — it’s very helpful to actually have some data to select. So the first query we’re going to start with is inserting data.

7:28 Creating a Table

So to insert data, we need a table: let’s create one. Probably most people on this call have seen this or executed these before. This is an example of a table in ClickHouse; it has columns with different data types. It has an engine called MergeTree (that’s the standard engine for dealing with large amounts of data). 

It has a couple of interesting things which, if you’re familiar with other databases, will be new. It has what’s known as a partition key so it’s a way of dividing the contents of the table up into what we call parts. We’ll dig into that more deeply but the basic idea is — this table is going to be pretty big, and we need to give ClickHouse a hint about how to break the data into pieces in a reasonable way. 

For example, if you’re doing time series, you would partition by month — that’s what this one is doing. We have this function which turns your date into a year and month and then we have a bunch of parts — each of which essentially contains data for an individual month and then it becomes pretty simple.

For example, after 12 months are lapsed, we can get rid of the last month very easily. It also helps us with queries. We also have ordering. It’s very important in data warehouses to pick an ordering of the data. It’s kind of like using a clustered index in a traditional database. We’ll talk a little bit about that as well so this is your standard table definition. Let’s see how we insert data.
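As a concrete sketch, a table along the lines described might look like this (the column list is illustrative, not taken from the slides):

```sql
-- Hypothetical example of the kind of table discussed: a MergeTree table
-- partitioned by month with an explicit sort order.
CREATE TABLE ontime_test (
    Carrier    String,
    Origin     String,
    Dest       String,
    FlightDate Date,
    DepDelay   Int32
)
ENGINE = MergeTree
-- toYYYYMM() converts the date to a year-month value, so each partition
-- holds one month of data and old months are cheap to drop.
PARTITION BY toYYYYMM(FlightDate)
-- The sort order acts like a clustered index for the data within each part.
ORDER BY (Carrier, Origin, FlightDate);
```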

9:00 Inserting Data

Here’s a typical insert statement and for those of you who are data warehouse experts or have used them for a long time, I just want to assure you, this is not the way most people actually insert data. But it’s an example. We have an insert statement. We have the name of the table and then we have a couple rows that we’re inserting here. We’ll use this as an example because what we want to do is show how the data actually gets into ClickHouse. Then we can start to think about how we can make it faster.
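A minimal insert in the spirit of that example might look like this (the values and the ontime_test table are made up for illustration):

```sql
-- Insert two rows directly with a VALUES clause. Not how most people
-- load a data warehouse, but it shows the path data takes.
INSERT INTO ontime_test (Carrier, Origin, Dest, FlightDate, DepDelay)
VALUES
    ('AA', 'JFK', 'LAX', '2021-06-01', 12),
    ('DL', 'SEA', 'ORD', '2021-06-01', -3);
```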

9:35 How Does ClickHouse Actually Process an Insert?

Like most databases across the world, your query is going to arrive in the database coming in over the network. We’re going to do some parsing to figure out what it says. We’re going to do some planning and then we’re going to actually execute it. 

In this case, we’re going to load data and then respond with a result. This looks kind of slow because I was inserting it over the internet; in fact, it only takes a tiny fraction of a second to execute. What’s interesting is: while we’re loading the data, what’s actually happening inside ClickHouse?

What we do is we take these two rows which you can see over on the left side with a little bit of data left out we’re going to pull them into ClickHouse. We’re going to create one of these parts — one of these divisions of the table — we’re going to construct it in memory. Moreover, we’re going to build the index on it. We’ll show you that in a little bit and we’re also going to sort it. This is all going to be done in memory, and then as soon as that’s done, we’ll be able to flush that out to the files that actually represent this part in storage.

The key thing to note here is that whenever you have a block of data that you’re inserting, that data is going to live in memory for a while until it gets properly formatted. Then it’s going to get flushed out to storage. This is a pretty simple query.

Okay, let’s say we were doing a really large insert where maybe we’re throwing in a few hundred million rows of data. That’s of course something we can also do.

Improving ClickHouse Efficiency

11:22 How Can We Make This More Efficient? Parallelize!

When we begin to insert large amounts of data, we want to parallelize this process because we don’t want to wait for a single thread to do all this work and then flush it out to storage. We’d like to get many threads doing it. ClickHouse has a setting called max_insert_threads. This tells ClickHouse: when you’re doing an insert, here’s how many threads you’re allowed to use at the same time.

In this case, we’re setting it to four threads and then processing an insert command. The insert puts data into a table called ontime_test (a table of airline flight data), selecting that data from another table. There are other ways we could do this — we could be inserting CSV loaded from outside, or reading from S3 — but in each case the processing is essentially the same.

So we’re going to go ahead and select from this table called ontime. We’re going to limit it just because I wanted it to complete in a reasonable amount of time so that I could try it in different ways. So what happens is: the query comes in, we do the parsing, we do some planning to figure out how to execute it, and then we start four threads that just go crunching along, reading different pieces of the data as it’s selected out of the source table.
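The parallelized INSERT ... SELECT described here might be written roughly as follows (table names follow the airline example; the LIMIT value is illustrative):

```sql
-- Let this INSERT use up to four threads to build parts in parallel.
SET max_insert_threads = 4;

-- Copy rows from the source table; the LIMIT keeps the run time reasonable.
INSERT INTO ontime_test
SELECT *
FROM ontime
LIMIT 16000000;
```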

As that happens, each of those threads is going to be building parts in memory. As they’re complete, it will then flush them out to storage. These operations could in fact create, over the course of the insert, many parts (perhaps even hundreds of parts). 

In this particular case, each thread will be working on a part and as that part gets done, it will get flushed to storage so the amount of memory that gets used here is now multiplied by four because each of these threads has to have enough space in memory so they can build this part and then send it out to storage. That’s basically the process that we use.

A question just popped up: does the ClickHouse client do some query processing, given that it’s one of the ways you can load data? The answer is yes. In this particular case, everything runs inside the server, so the ClickHouse client won’t help at all. But if you were reading, for example, a CSV file that you were piping in, yes, some of the parsing is actually done there — the ClickHouse client will help do some of the formatting of the data up front.

14:07 Parallelism Affects Speed and Memory Usage

This is the process that we follow, and no matter how the insert is processed, it’s basically going to work this way. So what’s kind of interesting: because we’re building the parts in memory, and because we need available RAM to do that, we now have a trade-off. By adding threads, we can parallelize things and speed them up, but at the same time we’re going to consume more memory.

Here’s an example of that exact same query. What I did was try it in different ways, setting max_insert_threads to values of one, two, and four. If I measure the insert time to completion with one thread — in my particular case this was not a super powerful server — it takes 33 seconds to load those rows; I think there are about 16 million of them. If I have two threads, it drops by half. If I have four threads, it drops by half again.

Then going to eight threads, which I did in the end, didn’t really help — it just couldn’t parallelize beyond that. What we do see is that as the time drops by half, the RAM doubles. It’s actually a nice relationship: the increase in RAM roughly correlates with the parallelization.

And by the way, there are some interesting questions that are coming up that I’m going to defer to the end because they require extended discussions and I want to make sure that we get this covered. So you can see this relationship in the graph.

15:55 ClickHouse System Tables

This is something that you can do for yourself and in fact, one of the questions is ‘How did I find that information out so I know what’s going on inside? Can we get some statistics so we can actually see the trade-off between these things and then compare them scientifically?’ The answer is absolutely yes.

ClickHouse has probably the best system tables of any database I’ve ever worked with. They’re really outstanding and they’re very easy to understand. They have a wealth of interesting information. One of the most useful system tables for this kind of work is called the query log.

16:37 The Query Log

The query log is something you have to enable. There’s a setting inside config.xml where you can turn the query log on. Then, every time a query runs, ClickHouse will insert a record into a table called system.query_log by default. This is an example. There are different kinds of events associated with a query.

For example, start and finish. There’s an interesting field here: is_initial_query. That’s a very useful field — it tells you whether this was the first query that landed on ClickHouse, or a query that was perhaps sent to us from another node doing a distributed SELECT. We’ll talk about that a little later, but it’s super useful to be able to tell those apart.

We can see how long it lasted. We can see what it read. And then we can see the memory and the maximum memory that was used at any time. Of course, we see the query itself so with this simple query, you can just run this and you can quickly find out — What did I just execute? How many resources did it use?

There are many more columns here. I’m just picking the ones that were most useful for this talk. So this is a query that you’ll want to run constantly to look into what your queries are doing. 
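A query along these lines pulls out the fields mentioned above (these column names are the standard ones in system.query_log):

```sql
SELECT
    event_time,
    type,                                    -- e.g. QueryStart, QueryFinish
    is_initial_query,                        -- 1 = arrived directly from a client
    query_duration_ms,
    read_rows,
    formatReadableSize(memory_usage) AS memory,
    query
FROM system.query_log
WHERE type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 10;
```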

So another question: What happens when you INSERT?

What Happens When You INSERT?

18:04 What’s Going on Down There When We INSERT?

Let’s dig a little bit more deeply into what’s going on down inside ClickHouse. We talked about the tables, we showed the table definition, we showed the partition key, and what we want to do now is look at what are the results in storage. As I mentioned, we partition the data and we build parts for every partition. 

In this case, we’re partitioned by month, and we’re going to end up with one or more parts, which are blocks of data that are indexed.

There’s a thing we call a sparse index. ClickHouse has a notion of a primary key, which by default is the same as the sort order. Whenever possible, ClickHouse will use that key to locate the bits of data to read when it’s processing a SELECT. The columns here are sorted.

In this particular example, they were sorted by carrier, origin, and flight date, so the index matches that. Then if you look at the columns, the values belonging to each row will be sorted in this order. The diagram shows the rows in the part belonging to the same year — in my example they were actually months, but you get the idea.

19:26 Sparse Index

So the sparse index is interesting because it truly is sparse. Its goal is to fit into memory, so we can’t make it very big. By default (if you don’t tell it anything else), ClickHouse creates an entry for every 8192 rows of a MergeTree table. So when you’re locating things using the primary key index, even if you look for a single row, you will by default read data for about 8192 rows. Fortunately, ClickHouse minimizes I/O as much as possible, but it’s not as efficient as reading a single row in a database like MySQL.
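The granule size is a per-table setting; the value shown in this minimal sketch is what you get if you say nothing at all:

```sql
-- index_granularity controls how many rows each sparse-index entry covers.
-- 8192 is the default; overriding it is rarely necessary.
CREATE TABLE granule_demo (
    k UInt64,
    v String
)
ENGINE = MergeTree
ORDER BY k
SETTINGS index_granularity = 8192;
```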

ClickHouse MergeTree

20:10 Understanding What’s in The MergeTree Part

We can see this primary key index with its entries — we call each run of roughly 8192 rows a granule; that’s the chunk of data we index. Then, for each column, we have separate files by default. There’s a mark file (.mrk) — just a long array that says, ‘Hey! For every entry in the primary key index, show me where the actual binary data is stored’, and so each mark is a pointer into a compressed block.

The compressed block generally contains multiple granules of data but that tells ClickHouse ‘Hey if I’m looking at the column called airline, I’m looking to read a particular row, I can roughly find where the data is. And then I have to decompress it and read it.’

So that’s basically what’s happening underneath the covers. And in fact, as you see this, you can recognize that if you’re actually reading data, it’s very efficient because you’re going to just read the columns that you need, and then only those parts that are relevant. If you’re writing data, it’s much more complicated. We could end up creating hundreds of files just to add a couple of rows. We’ll talk about that in a minute.

21:32 Why MergeTree? Because It Merges!

One final thing is why is this called MergeTree. Well, the answer is because it merges. When you write the data, we’re going to end up creating one or more parts each time you insert data. But they might not be very big. 

For example, in my first example there were only two rows. That’s not very efficient, because when we’re reading we’d actually like to open a small number of files and then have nice long run lengths. That way we get high compression, and we don’t have to jump around to different places in storage or keep a bunch of file descriptors open.

What ClickHouse does in the background is rewrite the initial parts that you put in to create bigger parts. So you can count on the fact that if you do inserts of maybe a million rows per part, ClickHouse will, at its own convenience, look at the parts and say: okay, we can now merge these together.

So what it will do is take all the data. It will construct a new part that contains all of them together, and it will deactivate the old ones. This happens silently. It’s completely transactional, you don’t notice it happening other than the fact that things speed up.

One other thing that’s interesting and different from other databases, updates and deletes also require rewriting the parts. We won’t talk more about that but it’s just another example of a difference between how these databases work differently from a row-oriented database like MySQL or PostgreSQL.

23:22 Bigger Parts Are More Efficient

To complete this discussion: bigger parts are more efficient — fewer file descriptors, longer run lengths, and better locality of data (assuming your sort order and index are correct). So what we typically recommend is that when you’re starting out, you pick a PARTITION BY key that gives you nice fat partitions. They can be up to 300 GB; these things can be really large.

Another thing you should think about is trying to keep it to less than a thousand total parts per table on a node. If you’re getting more parts than that, it’s fine to spread them across a bunch of nodes — ClickHouse is perfectly happy with that — but having too many parts is going to slow down reads. Now, when you’re starting out, you typically have no idea how to make this choice, so here’s a simple idea.

Most data that we see is oriented by time; it has time as a dimension. So just partition it by month; that’s a good one, it tends to work out well for a lot of data sets. And then you can test it out. If you find that doesn’t work, you can try something else.

Another thing that’s really important is because of this merging process, we want to insert relatively large blocks of data because, in ClickHouse we’ll have less work to do to merge them into their final form. So if you have a ClickHouse server and you’re loading a lot of data, if you just load very small batches like a couple hundred rows at a time, what will happen is ClickHouse will just begin to basically thrash because it spends so much time trying to merge these into larger blocks. 

It’s also going to be harder on your hardware, because every time you rewrite these parts you’re adding wear to the SSDs. But the most obvious effect is on your performance.

The simplest way as a developer to make blocks bigger is just to batch your input. If you’re reading from Kafka, wait for a second and see how much you get, and then push it in. You don’t have to wait till you get a certain amount of data, but just don’t do it so often that ClickHouse is constantly having to merge a lot of small parts. There are parameters that you can use to then tweak these block sizes underneath.

That’s a kind of complicated subject. I’ll just say that the parameters are there, and before you tweak them you should look at the logs and the actual part sizes. There are some interesting issues; one came up in a question about idempotency. There’s a relationship between idempotency and block size. We’ll talk about that in the Q&A.

26:25 How Can I See How Big Table Parts Are?

So part size is important. How can I see how big they are? Well, there’s another great system table for this. It’s called system.parts. Here’s a typical query: I’ve highlighted a couple of interesting fields or columns in that table. Active is basically a flag that says whether the part is in use. So when ClickHouse does a merge, what it does is it deactivates the old parts but it doesn’t garbage collect them by default for about eight minutes.

So if you’re curious to see how big your initial parts are, you can still see them. A level of 0 means the part has been inserted but not yet merged. So you can use this to distinguish parts that just arrived — to see if they’re a nice size — from parts that have been merged.

Then the other things are just the name and so on and so forth and how much space they use.
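A typical query against system.parts, in the spirit of the one described (the table name follows the airline example):

```sql
SELECT
    partition,
    name,
    active,    -- 0 = deactivated by a merge; kept around ~8 minutes
    level,     -- 0 = freshly inserted, not yet merged
    rows,
    formatReadableSize(bytes_on_disk) AS size
FROM system.parts
WHERE table = 'ontime_test'
ORDER BY partition, name;
```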

So this is a super useful table, and along with your query logs, it gives you a lot of good information about what’s going on underneath.

At this point, we’ve got a lot of information about how to handle inserts, which is the first kind of query that you have to deal with.

Optimize INSERT

27:43 Tips to Optimize INSERT

Obviously an insert isn’t reading data for you, it’s writing data, but here are some basic summary tips. To make inserts faster: 1) max_insert_threads is a really simple way of changing the parallelism — adding more threads makes things go faster, up to a point. 2) We talked about parallel loading; the parsing of input can also be parallelized, particularly for tab-separated values, comma-separated values, and the tuples you get with an INSERT ... VALUES clause. You can enable that with the input_format_parallel_parsing setting. 3) Then of course just write bigger blocks — along with max_insert_threads, that’s the simplest thing that helps things go faster.

To make inserts less memory-intensive, do the reverse: 1) Decrease max_insert_threads, which reduces the number of parts in memory. 2) Disable input_format_parallel_parsing (I believe it’s off by default). 3) And of course write smaller blocks. If your block size is limited by the amount of available memory — for example, if you’re running out — just make your blocks smaller. These are all simple ways, just based on understanding the model, to make inserts run a lot faster.
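Put together, the knobs mentioned in this section are all session-level settings, for example:

```sql
-- Raise for speed, lower to save memory.
SET max_insert_threads = 4;

-- Parallel parsing of CSV/TSV/Values input; set to 0 to disable it.
SET input_format_parallel_parsing = 1;
```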

So now that we’ve got some data loaded, let’s talk about how to make queries work and we’re going to start with something really easy. 

How Do Basic Queries Work?

29:22 Aggregation is A Key Feature of Analytic Queries

So most queries involve aggregation in ClickHouse. What we’re going to do in this next section is focus on how that works because this is the basic operation that allows you to group measurements around interesting properties of the data. We call these dimensions.

If you’re not an SQL expert, this query is saying: for each of the carriers in the data, what’s the average flight delay, and please give me the list back in descending order of delay, so the airline with the greatest departure delays shows at the top. The average here is an aggregation function — it’s basically totting up numbers — and the result gets printed in the output.
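The query being described might look roughly like this (column names follow the airline data set):

```sql
-- Average departure delay per carrier, worst offenders first.
SELECT
    Carrier,
    avg(DepDelay) AS delay
FROM ontime
GROUP BY Carrier
ORDER BY delay DESC;
```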

Process a Query with Aggregates

30:12 How Does ClickHouse Process a Query with Aggregates?

So we send this query over to ClickHouse. It arrives at the ClickHouse server, which parses it. The server then does some planning to figure out how to distribute the work to get this information, and the next step is what we call a scan.

The scan is multi-threaded by default. On a new server, ClickHouse will by default try to allocate for itself up to half of the available CPUs — what Linux thinks of as a CPU. So if you’re on Amazon and your instance has eight vCPUs, ClickHouse will by default try to take four of them. Each of those threads gets an assignment of work, scans the parts it has to cover in storage, and runs down those parts, basically doing a streaming operation as it reads the data.

It knows how it’s supposed to group it and then it’s going to collect the data under those groups in buckets which are basically stored in hash tables. 

So you’ll construct these tables and then eventually each of the threads (which we will see in more detail in a second) has one or more hash tables. At the end, they’re going to get merged and sorted. And the result will get passed back to you. 

This is a very efficient process. It’s scanning, and there’s no caching layer for this built into ClickHouse. What ClickHouse does — if you’re familiar with Linux internals — is depend on the page cache to help make things faster. That just means that if you do I/O on the same block from storage again, it’s read from memory if it’s available.

It’s very efficient. It doesn’t have to load things into a buffer cache and then read them the way something like InnoDB does. It’s just ripping down storage and constructing these hash tables on the fly. An interesting question, though: looking at this, how would you actually parallelize this query?

Compute an Average in Parallel

32:36 How Can You Compute an Average in Parallel?

Well, here’s the answer! Let’s take this average. It turns out that if you have to average a very large group of numbers, you can split them up into sets, and for each set compute what’s called a weighted average. This is exactly how it’s done inside the ClickHouse code. Each thread just remembers the total of the values it’s summing up and their count — call them the numerator and the denominator — so the sets can be processed independently, with a very simple calculation at the end to get the final average.

This corresponds to the steps we saw: you scan, and then you merge. The values maintained in the meantime are called partial aggregates — they haven’t been fully aggregated, but they’re on their way.

If you think you’ve seen this before, you probably have: this is basically MapReduce. Most data warehouses do this in some form or another, with varying degrees of speed. But that’s what’s going on here.

The cool thing about aggregates is every aggregate in ClickHouse follows this process. It just has a different representation of the data in its partially aggregated form.
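You can actually see this machinery from SQL with ClickHouse’s -State and -Merge combinators, which expose the partial aggregates directly (a sketch using the airline example):

```sql
-- avgState() produces a partial aggregate (sum + count) per group;
-- avgMerge() combines the partials into the final value, just as the
-- threads do internally at the end of a scan.
SELECT avgMerge(partial) AS overall_delay
FROM
(
    SELECT avgState(DepDelay) AS partial
    FROM ontime
    GROUP BY Carrier        -- each group carries an independent partial
);
```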

ClickHouse Thread Doing Aggregation

33:55 How Does a ClickHouse Thread Do Aggregation?

So what we’ll see then is if we look at one of those threads, what it’s doing, and how it’s computing the aggregation, we’re going to see it scanning the storage that it’s been assigned.

It can see the groups that it’s passing over and then for each of those groups, it’s basically going to have a hash table where it’s going to store — for example, these are airlines like Alaska and Delta — the partial aggregates as a bunch of arrays hanging off these keys. So each thread is going to have this hash structure and all the other threads have it. And then at the end, what ClickHouse will do is, it has a process for merging the contents of these hash tables. 

For anybody who’s looked at ClickHouse code, this is simpler than actually what happens. ClickHouse has many hash table implementations. If you have large numbers of keys, ClickHouse will actually split the hash table into multiple levels because that allows it to merge the hash values in parallel. But this is basically what’s going on and what this means is that while you’re aggregating, you’re just building up these structures in memory so you need to have enough memory to do that.

Aggregation Performance Drivers

35:14 We Can Now Understand Aggregation Performance Drivers

With this notion of parallelism, keys, and the way that aggregates are stored, you can understand the things that change the performance of aggregation. For example, if you process a simple aggregate like the one I showed, it’s very quick. With default settings, it scans many millions of rows and uses a tiny amount of RAM.

If we extend this out so the GROUP BY is much larger (it has a much larger number of keys) and also uses a much heavier aggregate, one that has to store the unique airline identifiers within each of these GROUP BY keys, it uses much more memory. We’re talking here about a difference of about a million times in the amount of memory that we’re using: we’re going from kilobytes to gigabytes. The actual processing time is also quite a bit larger, but not on such a radical scale.

So all of a sudden, we’re actually dealing with a lot of memory here.

36:27 Parallelism Affects Speed and Memory Usage

You can also parallelize things using max_threads. We can boost the performance very easily just by changing the max_threads setting. Here’s an example of that heavier query where we go ahead and run it. (I limited it because this is actually a couple hundred million rows that we were scanning, and I’m just trying different thread settings.)

You can see that when I go from one thread to four, it drops the response time by a factor of four. So it parallelizes very well. Going to 16 didn’t help a lot. It’s interesting that the amount of RAM you use can be affected by parallelization, but not in the way you think.

You might think that having more threads would mean more hash tables and more structures built, but it doesn’t always use more memory. In fact, in this case, it used a bit less as we parallelized. So this is an example where you actually have to look at what’s going on to understand the effects.
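The effect of the thread setting can be sketched with Python’s own thread pool. This is only an analogy for max_threads, not how ClickHouse schedules work: the same scan/merge functions run over more or fewer chunks, and the answer is identical either way; only the degree of parallelism changes.

```python
from concurrent.futures import ThreadPoolExecutor

def scan(chunk):
    """Partial aggregate for AVG over one chunk."""
    return (sum(chunk), len(chunk))

def parallel_avg(values, threads):
    """Split the data into `threads` chunks, scan in parallel, merge."""
    step = max(1, len(values) // threads)
    chunks = [values[i:i + step] for i in range(0, len(values), step)]
    with ThreadPoolExecutor(max_workers=threads) as pool:
        partials = list(pool.map(scan, chunks))
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count

values = list(range(1000))
# Same result regardless of the thread count; only speed changes.
assert parallel_avg(values, 1) == parallel_avg(values, 4) == 499.5
```

As the talk notes, more threads do not necessarily mean proportionally more memory, because each thread’s hash table only holds the keys it actually encounters.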

ClickHouse Memory Limits

37:31 ClickHouse Has Memory Limits

Speaking of memory, it’s probably good to talk about memory limits. ClickHouse will just stop you in your tracks when you exceed them. For a single query, there’s a setting called max_memory_usage. By default it’s 10 gigabytes. I would say, particularly if you’re starting out, don’t change this. If you’re hitting this limit, start figuring out why your query is eating so much memory and make it smaller.

There’s also a per-user setting, max_memory_usage_for_user, which I believe is unlimited by default. It says that if all of the queries for your user together exceed this number, then ClickHouse will start failing them. Finally, there’s a limit on the amount of memory on the server. This is a really brutal parameter: if you hit it, things just start failing randomly because the server will no longer allocate the memory they need. You can play around with these, but they drive what happens when you run out of memory and when ClickHouse throws errors.

As I say, it’s usually better to try and fix your query than to tweak these limits. But of course, there are cases where that’s not true.

Make Aggregation Queries Faster

38:49 Tips to Make Your Aggregation Queries Faster

Here are some tips to make your aggregation queries (your basic queries) faster. 1) Remove or exchange the heavy aggregation function. The one that I picked (uniqExact) is very expensive because it has to find every exact value and store keys for all of them. There are estimating versions of it that are faster; I don’t know if they use less memory, since I didn’t benchmark them, but they would fix that.

2) You can reduce the number of values in GROUP BY. These first two things make a huge difference in both memory use and speed.

3) You can increase max_threads (parallelism works up to a point), and then 4) there’s just stuff that you’ll do anyway for any query, like reducing I/O: filtering out rows that you don’t want and improving compression of data in storage. We’re not going to cover that in this talk, but there are a whole bunch of different ways that you can improve compression and just read less data for the same number of rows.

So these are good ways to make aggregation go faster that you can develop by just understanding how it works and then looking at the numbers.

If you want to read more about aggregation, check out our blogs: ClickHouse Aggregation Fun, Part 1: Internals and Handy Tools and ClickHouse Aggregation Fun, Part 2: Exploring and Fixing Performance.

Reduce Memory Usage in Aggregation Queries

40:11 Tips to Reduce Memory Usage in Aggregation Queries

Similarly, you can reduce memory usage. Again, replace heavy aggregation functions and reduce the number of values in GROUP BY; this has a huge impact on memory. You can reduce the number of threads. And there’s something you can do if you’re really running out of memory on aggregation: you can dump the partial aggregates to external storage. This is a handy feature; the query is going to run a lot slower, but it’s going to work. And of course, if you don’t need to scan rows, filter things out and make the query smaller.

So that’s the model for aggregation and some basic things that we can draw out of that to make aggregation run faster and more efficiently.

40:56 JOIN Combines Data Between Tables

Let’s look at ClickHouse JOINs. This is another major thing that we do in SQL. What we’re doing is combining the data between tables. If you’re a SQL expert, this is old hat. But if you’re learning SQL, here’s a typical example of a JOIN.

What we’re trying to do in this airline data set is compute some statistics, like the number of flights going to particular destinations, and we’d like to have the airport name (the long one that appears on the sign when you drive up). That airport name is not included in the base data, so we’re going to do a JOIN.

There’s another table: ontime is your base data, and airports is where we have this list of a few thousand airports. Then we JOIN them on a condition. So that’s going to combine the data.

Process a Query with a JOIN

41:44 How Does ClickHouse Process a Query with a JOIN?

Let’s look at what ClickHouse does. ClickHouse uses by default what’s called a hash JOIN. The way that it works is that we’re going to take that JOIN (let’s look back at it), so we have the left side and the right side. The left side is the big one: ontime, which has a couple hundred million rows of flight data. The right side is small: airports has about seven thousand rows.

So what we’re going to do is take that right side table and load row values that include the key that we’re JOINing on as well as anything that we’re including in the final result. In this case, that’s the airport name, so we’re going to put that in another hash table sitting in RAM. It’s going to be a hash table plus a bunch of blocks of data.

One thing that’s really important to notice here is that whatever you need from that table has to go into RAM. If you JOINed on a very large table and referenced a bunch of columns from that right side, they would all get pulled in and loaded into memory. That’s significant because if it’s too big, you’ll run out.

The next thing that ClickHouse does, once it’s got that, is a scan on the left side table. It goes ahead and does the aggregation process that we just described. As it’s running through, it looks over into this hash table so that it can grab the information it needs if it’s available, and then it merges the result.
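The build-then-probe flow of a hash JOIN can be sketched as follows, with made-up airport data. The right side is loaded into a hash table in RAM; the left-side scan then probes that table for each row.

```python
# Right side: small lookup table (code, long airport name).
airports = [("SEA", "Seattle-Tacoma International"),
            ("PDX", "Portland International")]
# Left side: the big table being scanned (destination, delay).
flights = [("SEA", 10), ("PDX", 30), ("SEA", 20)]

# 1. Build: load the right side into a hash table in RAM.
right = {code: name for code, name in airports}

# 2. Probe: scan the left side, looking up each key in the hash table.
joined = [(dest, right[dest], delay)
          for dest, delay in flights if dest in right]

assert joined == [("SEA", "Seattle-Tacoma International", 10),
                  ("PDX", "Portland International", 30),
                  ("SEA", "Seattle-Tacoma International", 20)]
```

Every column you pull from the right side lives in that in-RAM structure for the life of the query, which is why the size of the right-side table matters so much.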

43:11 Let’s Look More Deeply at What’s Happening in The Scan

So that’s the basic process. As this happens, it’s very useful to understand what’s going on under the covers. Here we have airports loaded (that’s our right-side table loaded into memory). We have ontime, and we’re doing the scan process, so we have a bunch of threads running down it. As those threads work, they create partial aggregates like the ones we described, and they also pick up the airport name.

One of the things that’s kind of interesting here is that we’re accumulating a bunch of repeated information which ClickHouse doesn’t necessarily know what to do with yet.

So we need to keep some of this stuff hanging around until we can do the final merge, even though some of it may end up being thrown away; ClickHouse doesn’t necessarily know that at this point. That leads to a question: wouldn’t it be nice if we could do all the heavy lifting of computing the aggregates, let that stuff merge, and then have a relatively small amount of data that we could JOIN with the in-RAM hash table?

We can do exactly that and here’s how it works.

44:37 It Would Be More Efficient to JOIN After Aggregating

So we’re going to load the right side table, scan, merge, and then JOIN and sort. That’s what this picture shows. ClickHouse has a very simple way of doing this, which is to take the initial query and, instead of scanning the ontime table directly, change that to be a sub-query.

45:01 Sub-query

So in fact, what it’s going to do is select the destination values and do the aggregation first, and then JOIN against the other data. What we see here is that this reduces the amount of memory we’re using by a factor of about 20,000. This is a relatively small query because the aggregate is pretty cheap.

If you had larger aggregates or were selecting more things, this could become very significant. So this is a really simple way of understanding how JOINs work and how to order them. You can control this in SQL and make a big difference in performance.
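The reordering can be sketched as: aggregate the big table first (the sub-query), then JOIN the small merged result against the in-RAM hash table. The data is made up; the point is the order of operations.

```python
flights = [("SEA", 10), ("PDX", 30), ("SEA", 20)]
airports = {"SEA": "Seattle-Tacoma International",
            "PDX": "Portland International"}

# Aggregate first: count flights per destination (the sub-query).
counts = {}
for dest, _ in flights:
    counts[dest] = counts.get(dest, 0) + 1

# JOIN afterwards: one lookup per group instead of one per row,
# and no airport names dragged through the aggregation state.
result = {airports[d]: n for d, n in counts.items()}

assert result == {"Seattle-Tacoma International": 2,
                  "Portland International": 1}
```

Joining after the merge means the repeated right-side values never accumulate in the partial aggregates, which is where the memory savings come from.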

Speaking of performance, you can learn secrets to ClickHouse query performance in our past webinars: part 1 and part 2.

Keep JOINS Fast and Efficient 

45:45 Simple Ways to Keep JOINs Fast and Efficient

So this leads to some simple ways to keep JOINs fast and efficient. There are many other things you can do that we won’t get into here, but keep that right side table small. You can JOIN multiple tables and there’s no problem, but bear in mind that every time you refer to one, it’s going to get read and loaded into memory, and it’s going to eat up RAM until your query is done.

Minimize the columns you JOIN from the right side, because they’re going to get dragged in and stored. Try to reduce the number of rows (these tips all apply to the right side table). You can push the JOIN to after the aggregation is done so that you don’t drag around unnecessary data. Finally, a really simple thing that you often want to do even as an app developer: if you find you’re running out of memory or being slowed down because these JOINs are either using a lot of memory or taking a long time to load, you can use what’s called a dictionary.

You can imagine that as sort of like the hash table that we constructed, except that it’s loaded once and then a bunch of queries can share it. One other thing: the IN operator, which you’re probably familiar with if you’re a SQL person, is really a JOIN under the covers and uses the same mechanism.

Okay, that’s JOINs.

Distributed Query

47:10 Example of a Distributed Dataset

We have one last topic, and we’re just going to touch the surface of it: how does a distributed query work? In ClickHouse, once you reach a certain size of data, or if you decide that you’d like to have high availability, you’re typically going to shard your data. You break it up into pieces and you replicate it so that you have multiple copies even within single shards. This is an example of a sharded data set: instead of having the ontime table, we’re going to call it ontime_local.

We have a couple of copies. We divide the data into two pieces, so the first shard’s ontime_local values are red; there are two of them because there are two replicas. The other half is blue, and that’s also two replicas. We have two other kinds of tables. The first is what’s called a distributed table, which acts basically like a view, a very sophisticated one that knows where the underlying replicas are located.

Then finally we have a table that we call fully replicated. It’s a small table, and we just have a copy of it replicated to every single server, so no matter which server you go to, it’s always going to be there.

48:40 Distributed Send Subqueries to Multiple Nodes

We won’t talk about the details about how sharding is set up. There are lots of presentations, blogs, and documentation about how to do that. 

Altinity has done a deep dive into ClickHouse sharding and replication. Make sure to check out the webinar!

Here’s what happens when a query comes in and actually lands on one of these sharded and replicated data sets. You have your application over on the left. In the simplest case, you query against ontime, your distributed table. We call the node where the query arrives ‘the initiator’.

What the distributed table is going to do, as it processes this, is look up where the replicas are. If it has a local replica, by default it’ll just use that. It’s going to find another replica for the other shard, and then it’s going to send at least a piece of that query out to these local replicas to do that aggregation, that left-side scan. Then it pulls the results back, merges, and sorts. That’s the basic idea. Your application only sees one node, but under the covers we’re going to all the locations of the data.

49:55 Queries Are Pushed to All Shards

The distributed table is going to rewrite the queries, more or less, and send them out to the shards. So the query that arrives is selecting from ontime. It gets rewritten to select from ontime_local (the real table), and that gets sent to each shard.
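The scatter-gather flow mirrors the thread-level scan/merge, just across nodes: each shard runs the rewritten query over its local table and returns partial aggregates, and the initiator merges them. A sketch with made-up shard contents:

```python
shard_1 = [("SEA", 10), ("SEA", 20)]   # one replica of shard 1's ontime_local
shard_2 = [("PDX", 30), ("SEA", 15)]   # one replica of shard 2's ontime_local

def local_scan(rows):
    """What each shard runs: partial aggregates per destination."""
    part = {}
    for dest, delay in rows:
        s, c = part.get(dest, (0, 0))
        part[dest] = (s + delay, c + 1)
    return part

# The initiator merges the partials sent back from the shards.
merged = {}
for part in (local_scan(shard_1), local_scan(shard_2)):
    for k, (s, c) in part.items():
        ms, mc = merged.get(k, (0, 0))
        merged[k] = (ms + s, mc + c)

avg_delay = {k: s / c for k, (s, c) in merged.items()}
assert avg_delay == {"SEA": 15.0, "PDX": 30.0}
```

Because only partial aggregates cross the network, the heavy scanning stays on the shards, which is exactly what you want to preserve.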

An interesting question is: what if we’re JOINing? Well, here’s what happens.

ClickHouse Pushes Down JOINs by Default

This is similar to the example we showed before: by default, these JOINs are going to get pushed down into the different shards.

Here’s a JOIN doing a similar kind of thing where we want to get the airport name.

You can see that we’re JOINing ontime (which is the distributed table) with the airports table, which is not distributed; it’s a real local table. ClickHouse will say, ‘Okay, fine, what I’ll do is rewrite this to be ontime_local’ and just push it down to the shard. In this case, though, we’re going to have the same problem that we had just a minute ago: we end up doing the JOIN locally and dragging around data that we don’t really need yet (which will just end up getting thrown away).

51:19 Left Side Table a Sub-query

So one of the ways we can speed this up is, as we did before, to make that left side table actually a subquery. If you do this, then the query that’s in the middle will get sent to your remote servers, and the stuff that’s around it will actually get processed on the initiator node. This is a simple way of dividing up the work.

If you look in the query log, you can see that this is happening: the subquery is what arrived at the remote nodes, and you can infer that the rest of it was processed on the initiator.

52:04 It’s More Complex When Multiple Tables Are Distributed

It’s way more complicated when multiple tables are involved and they’re distributed. We’re not going to go into that; I’ll just leave you with this: you have to give ClickHouse guidance about when to send stuff out to scan these distributed tables. It gets complicated fast. The basic thing, though, is that if you keep your ClickHouse queries simple, you usually don’t have to deal with this too much.

52:38 Tips to Make Distributed Queries More Efficient

So that gets us to tips for making distributed queries more efficient. Think about where your data are located. If you’re going to JOIN down in the shard, the data that you’re JOINing on has to be there in full as you process the shard. So you’re going to have to think about that in advance.

Another thing: you can move the WHERE clause and the heavy grouping work into the left side of the JOIN and get that pushed out. That’s the part where you get high parallelization. You want that stuff sent out to the nodes, so you can use a subquery to get it pushed out. That allows you to use all the CPUs available to you.

You can use a subquery to order the JOINs after the remote scan, and you can use the query log (this is really important) to see what’s executing on the initiator versus what was actually sent out to the nodes. That enables you to disentangle a little bit who’s doing what work and where. There are many more things to talk about with distributed queries. It’s a deep subject, and this is where things start to become quite complicated, but we’re going to leave it here and just talk about where to learn more.

54:04 Resources for ClickHouse

There are a bunch of great resources. These include the ClickHouse official docs, which should be your first stop, the Altinity Blog, the Altinity YouTube channel, and the ClickHouse Knowledge Base. We do an enormous amount of work on performance, so there are lots of articles, videos, and knowledge base entries about this. There are ClickHouse meetups, other people’s blogs, and lots of external resources that you can find through Google or your favorite search engine.

54:29 – 55:45 Wrap-Up and Conclusion

Where did the material in this talk come from? Well, partially out of our heads, but there are some really important places to go if you want to study ClickHouse performance more deeply. The Altinity Knowledge Base has really excellent information, hot off the presses, as we solve support cases and build articles.

The ClickHouse source code, of course, is a great place to look; it’s well documented, and if you want to understand aggregates, it’s especially useful. Then there are the talks and blog articles I’ve referenced. It’s been a pleasure having you on this talk, and we’re delighted to help you.



