A Day in the Life of a ClickHouse® Query

Recorded: Thursday, Feb 10 | 10:00 am PT
Presenters: Robert Hodges, CEO, Altinity
This webinar, presented by Robert Hodges, CEO of Altinity, is an introduction to ClickHouse internals aimed at application developers who already use ClickHouse and want to understand how it works underneath so they can write faster, more efficient queries. The premise is simple: much like tuning a jet engine, you get more performance once you understand the engine, and ClickHouse has a refreshingly simple execution model built on familiar concepts like threads and hash tables.
The talk follows the life of a query from the inside. It starts with inserts, showing how data lands in memory, gets sorted and indexed, and is flushed into parts, and how the max insert threads setting and bigger batches speed things up. It then moves through storage internals, covering parts, the sparse primary key index, granules, and why MergeTree merges small parts into larger ones. From there, it explains how reads work: aggregation via parallel scans and hash tables, joins implemented as hash joins with the right-side table loaded into memory, and the trick of pushing aggregation ahead of a join to cut memory use dramatically.
It closes with distributed queries, where data is sharded and replicated, and a distributed table routes work to local replicas, and shows how reordering joins as subqueries pushes heavy work out to the shards. Throughout, Robert emphasizes using ClickHouse system tables like the query log and system.parts to measure what queries actually do, so that optimization decisions rest on real numbers rather than guesswork.
Key Moments (Timestamps)
Key moments generated with AI assistance.
- 0:04 – Welcome and webinar logistics
- 1:31 – Robert Hodges introduces himself and Altinity
- 3:04 – What ClickHouse is and its key features
- 5:57 – Why understanding the engine matters
- 9:35 – How ClickHouse processes an INSERT
- 11:22 – Parallelizing inserts with insert threads
- 15:53 – Measuring queries with the query log
- 18:04 – Parts, sparse index, and granules
- 21:32 – Why is it called MergeTree
- 27:37 – Tips for optimizing inserts
- 29:22 – How aggregation and GROUP BY work
- 37:31 – Memory limits and faster aggregation
- 41:01 – How joins combine data between tables
- 47:24 – Distributed queries, sharding, replication
- 53:49 – Where to learn more and wrap-up
Webinar Transcript
0:04 – Welcome, Logistics, and Introductions
Robert: I’d like to welcome everyone to our webinar, “A Day in the Life of a ClickHouse Query.” We’re going to give an introduction to ClickHouse internals for application developers, providing you with basic information about how things work inside, so that you can make your queries both faster and more efficient.
My name is Robert Hodges, and I’ll be doing the presentation today. I’m backed by a bunch of people in Altinity engineering.
In order to dive in, I’m going to do some introductions and then get into the meat of the talk. First, some upfront information that will help you enjoy this talk more. One, it is being recorded. We’re recording it in two ways. We will be posting the best of the recordings after this talk is done. If you signed up, you’ll get a link for it, and you can go find it yourself. We’ll also include the slides, so all that information, including samples and things you might want to copy and paste, is available.
Second thing: if you have questions, feel free to put them either in the chat or the Q&A box, which is provided by Zoom. If the questions are relevant to what I’m talking about, I’ll take them as I go along. Otherwise, we have time at the end of the talk, and we can dig into things. We’ll try to end pretty much at the top of the hour.
With that, let’s go ahead and do some deeper introductions. Once again, my name is Robert Hodges, and I am a database geek. I first started working with databases in 1983. I had a couple of breaks doing mathematical programming and virtualization at places like VMware. In that time, I worked with about 20 different databases, so that’s the main thing I’ve done in my career. My day job is that I’m the CEO of Altinity.
Altinity Engineering is a big participant in this talk. It would not be possible to put this information together without all the help from the folks we have who work on ClickHouse, help support our customers who are on ClickHouse, and build things on top of it. That team has centuries of experience in databases and applications. I’d like to particularly call out Tatyana Saltikova and Denis Zharovlov for their help in preparing this talk.
Finally, Altinity is a company. We do support and services, including Altinity.Cloud, focused on ClickHouse and building applications on top of ClickHouse. We’re also the authors of the Altinity Kubernetes Operator for ClickHouse, which many of you on this call may already use.
So that’s us. A little bit about you, the audience. We’re assuming that you are developers, that you’re using ClickHouse, and that what you’d like to do is learn a little bit more about how ClickHouse works underneath so that you can use it more effectively.
3:04 – Introducing ClickHouse: An Open Source SQL Data Warehouse
Robert: Let’s go ahead and dive in. We’ll start with the basics and then get into the internals. Let me introduce ClickHouse, in case there’s anybody on this call who has not heard of it before.
ClickHouse is a SQL data warehouse. I like to think of it as the first open-source SQL data warehouse that can play with the big kids. It’s really outstanding and has a number of outstanding features. Of course, it understands SQL, which is the winning language among data warehouses and databases in general.
ClickHouse itself has a number of really great features. For example, it’s extremely portable. It runs anywhere Linux does, from bare metal to cloud, as well as Kubernetes. It has a shared-nothing architecture, which is a traditional data warehouse architecture where you have a set of nodes with attached storage connected by a network. Like just about all data warehouses, or the vast majority of them, it is moving toward a decoupled compute and storage model, but that’s in progress and will play out over time.
It stores data in columns. Row databases are easy to update; it’s easy to get individual rows, and they’re very efficient for things that do transaction processing, like running e-commerce sites. A column store is very well optimized for reads, and you’ll see examples of that.
Speaking of reads, ClickHouse is able to do both parallel and vectorized reads. By parallel, we mean spreading work across multiple CPUs and across multiple nodes connected by a network. Vectorization is when we can take arrays of data and push them onto the CPU and process them using SIMD instructions, single instruction multiple data, which allows us to take advantage of concurrency inside the CPU itself.
ClickHouse will run on everything from a laptop all the way to huge clusters containing tens of petabytes and hundreds of nodes. And it’s open source, which is a really big, permanent, differentiating feature. It’s Apache 2.0. You can use it anywhere you want for any business purpose.
As a result of these features and a huge community of contributors (in 2021, there were probably at least 400 unique contributors posting PRs to ClickHouse), it has turned into the core engine for real-time analytics: reading data from event streams, ELT processes, and object storage, and then pushing it out to the different ways people like to consume it, and doing it extremely fast.
So that’s ClickHouse. We won’t do much more marketing on it. What we’re going to do is focus on how it works inside, and the reason for that is simple.
5:57 – Understand the Engine, and the First Query: INSERT
Robert: Like a jet engine, if you want to make it go faster or get more performance out of it, you need to understand how the engine works. Same thing with ClickHouse.
One of the things that’s really cool about ClickHouse is that it has a relatively simple execution model. If you’ve used a database like Oracle, you know there’s a very complex query planning engine, with a great deal of complexity built into it that occurs inside the black box, and you don’t really know what it’s doing or how it’s doing it. That’s not the case with ClickHouse. There’s really no magic in what we’re doing. In fact, any developer who understands what threads are and understands what a hash table is has a lot of the knowledge needed to understand how ClickHouse works and to use it better. Then we can take that knowledge and apply it to the queries we’re executing and make them faster and more efficient. That’s exactly what we’re focused on here. By introducing the plumbing, we’ll show you how things work and tie it back to how that makes things faster.
This talk is about queries, but in order to have a query, by which I mean a SQL SELECT statement, it’s very helpful to actually have some data to select. So the first query we’re going to start with is inserting data.
To insert data, we need a table, so let’s create one. Most people on this call have probably seen or executed something like this before. This is an example of a typical small table in ClickHouse. It has columns with different data types. It has an engine called MergeTree, the standard engine for dealing with large amounts of data. It also has a couple of interesting things that will be new if you’re familiar with other databases.
It has what’s known as a partition key. This is a way of dividing the contents of the table into what we call parts. We’ll dig into that more deeply, but the basic idea is that this table is going to be pretty big, and we need to give ClickHouse a hint about how to break the data into pieces in a reasonable way. For example, if you’re doing time series, you would partition by month, which is what this one is doing. We have a function that turns your date into a year and month, and then we have a bunch of parts, each of which essentially contains data for an individual month. It then becomes pretty simple, for example, after 12 months have elapsed, to drop the oldest month. It also helps us with queries.
We also have ordering. It’s very important in data warehouses to order the data, to pick an ordering of the data. It’s kind of like using a clustered index in a traditional database, and we’ll talk a little bit about that as well. So that’s your standard table definition.
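To make the partition key and ordering concrete, here is a minimal Python sketch of the idea, not ClickHouse's implementation. The column names (`carrier`, `origin`, `flight_date`) and the helper names are illustrative assumptions; the table definition on the slide is not reproduced here.

```python
from collections import defaultdict
from datetime import date

def to_yyyymm(d: date) -> int:
    """Turn a date into a year-month number, e.g. 2022-02-10 -> 202202."""
    return d.year * 100 + d.month

def partition_rows(rows, sort_key):
    """Group rows by month, then sort each group by the table's ordering."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[to_yyyymm(row["flight_date"])].append(row)
    return {p: sorted(rs, key=sort_key) for p, rs in partitions.items()}

# Hypothetical rows; the column names are assumptions for illustration.
rows = [
    {"carrier": "DL", "origin": "ATL", "flight_date": date(2022, 2, 10)},
    {"carrier": "AA", "origin": "DFW", "flight_date": date(2022, 2, 3)},
    {"carrier": "AS", "origin": "SEA", "flight_date": date(2022, 1, 15)},
]
parts = partition_rows(
    rows, sort_key=lambda r: (r["carrier"], r["origin"], r["flight_date"])
)
# parts has one entry per month: 202201 and 202202.
# Dropping a whole month is then just removing one entry: del parts[202201]
```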
Let’s see how we insert data. Here’s a typical INSERT statement. For those of you who are data warehouse experts or have used them for a long time, I just want to assure you that this is not the way most people actually insert data, but it’s an example. We have an INSERT statement, the name of the table, and then some values, a couple of rows that we’re inserting here. We’ll use this as an example because what we want to do is show how the data actually gets into ClickHouse and then start to think about how we can make it faster.
9:35 – How ClickHouse Processes an INSERT
Robert: How does ClickHouse actually process an insert? Like most databases across the world, your query arrives in the database coming in over the network. We do some parsing to figure out what it says, we do some planning, and then we actually execute it. In this case, we load data and then respond with a result. This looks kind of slow, but that’s basically because I was inserting over the internet. In fact, it only takes a tiny fraction of a second to execute.
What’s interesting is what’s actually happening inside ClickHouse when we load the data. We take these two rows, which you can see over on the left side with a little bit of data left out, and we pull them into ClickHouse to create one of these parts, one of these divisions of the table. We construct it in memory. Moreover, we build the index on it, and we sort it. This is all done in memory. As soon as that’s done, we flush it out to the files that actually represent this part in storage.
The key thing to note is that whenever you have a block of data you’re inserting, that data is going to live in memory for a while while it gets properly formatted, and then it’s going to get flushed out to storage.
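The steps just described (build the part in memory, sort it, index it, then flush it to files) can be sketched roughly as follows. This is a toy model under stated assumptions: JSON files stand in for the real on-disk format, and `build_part`, `flush_part`, and the column names are hypothetical.

```python
import json
import tempfile
from pathlib import Path

def build_part(rows, order_by, granule_size=8192):
    """Sort rows in memory, pivot them to columns, and index every Nth row.

    Assumes `rows` is non-empty.
    """
    ordered = sorted(rows, key=lambda r: tuple(r[c] for c in order_by))
    columns = {name: [r[name] for r in ordered] for name in ordered[0]}
    index = [tuple(ordered[i][c] for c in order_by)
             for i in range(0, len(ordered), granule_size)]
    return {"columns": columns, "index": index}

def flush_part(part, directory):
    """Write one file per column plus the index, then list what was written."""
    directory = Path(directory)
    directory.mkdir(parents=True, exist_ok=True)
    for name, values in part["columns"].items():
        (directory / f"{name}.json").write_text(json.dumps(values))
    (directory / "primary.json").write_text(json.dumps(part["index"]))
    return sorted(p.name for p in directory.iterdir())

# Two hypothetical rows, mirroring the two-row insert in the talk.
rows = [
    {"carrier": "DL", "origin": "ATL", "dep_delay": 12},
    {"carrier": "AA", "origin": "DFW", "dep_delay": 3},
]
part = build_part(rows, order_by=["carrier", "origin"])
with tempfile.TemporaryDirectory() as tmp:
    files = flush_part(part, Path(tmp) / "part_0")
```

Even in this toy version, two rows produce four files, which hints at why tiny inserts are expensive.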
This is a pretty simple query. But what we want to know is, what if we were doing a really large insert, where we’re throwing in a few hundred million rows of data? That’s, of course, something we can also do.
11:22 – Parallelizing Inserts and Reading the Query Log
Robert: When we begin to insert large amounts of data, we want to parallelize this process, because we don’t want to wait for a single thread to do all this work and then flush it out to storage. We’d like to get many threads to do it. ClickHouse has a setting called max_insert_threads, which tells ClickHouse how many threads it’s allowed to use when doing an insert. In this case, we’re setting it to four threads, and then we process an INSERT command.
What this insert is doing is inserting data into a table called on-time test. This is a table of airline flight data, and it is going to select that data from another table. There are other ways we could do this, where we could be inserting from a CSV loaded from outside or reading from S3, but in both cases, the processing is essentially going to be the same.
So we select from this table called on time. We limit it just because I wanted it to be completed in a reasonable amount of time, so I could try it in different ways. It comes in, we do the parsing, we do some planning to figure out how to execute it, and then we start four threads. They go crunching along, reading different pieces of this data as it’s selected out of the source table. As that happens, each of those threads is building parts in memory, and as they’re complete, it flushes them out to storage. These operations could, over the course of the insert, create many parts, perhaps even hundreds of parts. Each thread will be working on a part, and as that part gets done, it will get flushed to storage.
The amount of memory that gets used here is now multiplied by four because each of these threads has to have enough space in memory to build its part and then send it out to storage.
A question just popped up: does the ClickHouse client do some query processing? The ClickHouse client is one of the ways you can load data, and the answer is yes. In this particular case, this is running entirely inside the server, so the ClickHouse client won’t help at all; it’s all done inside the server. But if you were reading, for example, a CSV file that you were just piping in, then yes, there’s some parsing done there, and the ClickHouse client will help do some of the formatting of the data up front.
So this is the process we follow, and no matter how the insert is processed, it’s basically going to work this way.
What’s kind of interesting now is that because we’re putting the parts in memory and because we need available RAM to do this, we have a trade-off. By adding threads, we can parallelize things and speed them up, but at the same time, it consumes more memory.
Here’s an example of that exact same query. I tried it in different ways, setting max insert threads to one, two, and four. If I measure the insert time, the time to completion with one thread, in my particular case, on a not-very-powerful server, it took 33 seconds to load those rows. I think there were about 16 million of them. With two threads, it dropped by half. With four threads, it dropped by half again. Adding eight threads in the end didn’t really help; it just couldn’t parallelize beyond that.
At the same time, we see that as the insert time drops by half, the RAM doubles. So it’s actually a nice relationship: the increase in RAM roughly correlates to the parallelization.
By the way, some interesting questions are coming up. Many of them I’m going to defer to the end because they require extended discussions, and I want to make sure we get this covered.
You can see this relationship in the graph. This is something you can do for yourself. In fact, one of the questions is, how did I find that information out? We have the mental model of what’s going on inside ClickHouse. The question is, can we get some statistics so we can actually see the trade-off between these things and compare them scientifically? The answer is absolutely yes.
ClickHouse has probably the best system tables of any database I’ve ever worked with. They’re really outstanding, very easy to understand, and have a wealth of interesting information. One of the most useful system tables for this kind of work is called the query log. The query log is something you have to enable. There’s a setting inside ClickHouse, in the config.xml, where you can turn the query log on. Once you do, every time a query runs, it will insert it into a table called system.query_log by default.
This is an example of a query that gets logged. There are different kinds of events associated with a query, for example, start and finish are two of them. An interesting field here is the initial query, which is a very useful field. It tells you whether this is the first query that landed on ClickHouse or a query that was sent to us from another node doing a distributed select. We’ll talk about that a little later, but it’s super useful to be able to tell apart. We can see how long it lasted, what it read, and, very interestingly, the maximum memory that was used at any time. And of course, we see the query itself.
With this simple query, you can just run it and quickly find out what you just executed and how many resources it used. There are many more columns in here; I’m just picking the ones that were most useful for this talk. This is a query you’ll want to run constantly when you’re doing performance work to look into what your queries are doing.
18:04 – Inside Storage: Parts, the Sparse Index, and MergeTree
Robert: Another question, besides how we get the statistics, is to dig a little more deeply into what’s going on down inside ClickHouse when we do the insert. We talked about the tables, we showed the table definition and the partition key, and now we want to look at what the results are in storage.
As I mentioned, we partition the data and build parts. For every partition (in our case, we partition by month), we end up with one or more parts, which are blocks of data that are indexed. There’s a thing we call a sparse index. ClickHouse has a notion of a primary key, which by default is the same as the sort order. Whenever possible, ClickHouse will use that key to locate the blocks, the bits of data, to read when it’s processing a select. The columns here can be sorted. In this particular example, they were sorted by carrier, origin, and flight date, so the index matches that, and the values belonging to each row will be sorted in this order.
The sparse index is interesting because it truly is sparse. Its goal is to be able to fit into memory, so we can’t make it very big. What ClickHouse does by default, if you don’t tell it anything else with a MergeTree table, is create an entry for every 8,192 rows. So when you’re locating things using the primary key index, if you look for a single row, you will by default read data for about 8,000 rows. Fortunately, ClickHouse minimizes I/O as much as possible, but it’s not as efficient as reading a single row in a database like MySQL.
Looking deeper into the part itself, we can see this primary key index with its different entries. We call these sections of about 8,000 rows a granule, so that’s a chunk of data. We have the index, and then for each column, we have separate files. By default, there’s a mark file, column.mrk. This is just a long array that says, for every entry in the primary key index, where the actual binary data is stored. That mark has a pointer into a compressed block. The compressed block generally contains multiple granules of data, but that tells ClickHouse where, if I’m looking at the column called airline and I want to read a particular row, I can roughly find the data, then go decompress it and read it.
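A small Python sketch may help show why a sparse index reads whole granules rather than single rows. It is a simplified model, not the actual ClickHouse code: the granule size is shrunk to 4 for readability (the talk cites 8,192 rows as the default), and the function names are made up for illustration.

```python
from bisect import bisect_left, bisect_right

GRANULE_SIZE = 4  # tiny for illustration; the talk cites 8,192 as the default

def build_sparse_index(sorted_keys, granule_size=GRANULE_SIZE):
    """Keep only the first key of every granule."""
    return [sorted_keys[i] for i in range(0, len(sorted_keys), granule_size)]

def granules_for_key(index, key):
    """Return the granule numbers that could contain `key`.

    A granule may hold the key if its first key is <= key and the next
    granule's first key is >= key, so the answer is a conservative range.
    """
    first = max(bisect_left(index, key) - 1, 0)
    last = bisect_right(index, key)  # exclusive upper bound
    return range(first, last)

# Twelve sorted carrier codes form three granules of four rows each.
keys = ["AA", "AA", "AA", "AS", "AS", "B6", "DL", "DL", "DL", "UA", "UA", "WN"]
index = build_sparse_index(keys)            # ["AA", "AS", "DL"]
hits = list(granules_for_key(index, "B6"))  # [1]
rows_read = len(hits) * GRANULE_SIZE        # 4 rows read to find one row
```

Looking up a key that straddles a granule boundary, such as `"DL"` here, returns two granules, which is why the lookup is described as locating data "roughly".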
That’s basically what’s happening underneath the covers. You can recognize that if you’re reading data, it’s very efficient, because you just read the columns you need, and then only those parts that are relevant. If you’re writing data, it’s much more complicated. We could end up creating hundreds of files just to add a couple of rows. We’ll talk about that in a minute.
One final thing: why is this called MergeTree? The answer is because it merges. When you write the data, we end up creating one or more parts each time you insert, but they might not be very big. In my first example, there were only two rows, which is not very efficient, because when we’re reading, we’d like to open a small number of files and have nice long runs so we get high compression and don’t have to jump around to a lot of different places in storage or keep a bunch of file descriptors open.
So what ClickHouse does in the background is rewrite the initial parts you put in to create bigger parts. You can count on the fact that if you do inserts, and maybe your parts have a million rows each, you’ll get a few of those, and ClickHouse will, at its own convenience, look at the parts and say, “We can now merge these together.” It takes all the data, constructs a new part that contains all of it together, and deactivates the old ones. This happens silently. It’s completely transactional. You don’t notice it happening, other than the fact that things speed up.
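The background merge can be pictured as a merge of already-sorted runs. The sketch below is an assumption-laden toy: parts are plain dictionaries, the `level` bookkeeping is a simplification, and `merge_parts` is a hypothetical helper, not a ClickHouse API.

```python
import heapq

def merge_parts(parts, new_name):
    """Merge already-sorted parts into one bigger sorted part and retire the inputs."""
    merged_rows = list(heapq.merge(*(p["rows"] for p in parts)))
    new_part = {
        "name": new_name,
        "level": max(p["level"] for p in parts) + 1,
        "active": True,
        "rows": merged_rows,
    }
    for p in parts:
        p["active"] = False  # old parts stay around, inactive, until cleaned up
    return new_part

# Two small freshly inserted parts (level 0), each sorted by carrier.
small_parts = [
    {"name": "p1", "level": 0, "active": True, "rows": [("AA", 3), ("DL", 12)]},
    {"name": "p2", "level": 0, "active": True, "rows": [("AS", 0), ("UA", 7)]},
]
big = merge_parts(small_parts, new_name="p1_p2")
# big["rows"] is one sorted run: AA, AS, DL, UA
```

Because each input is already sorted, the merge is a single streaming pass, which is what makes producing long sorted runs cheap relative to re-sorting.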
One other thing that’s interesting, and different from other databases: updates and deletes also require rewriting the parts. We won’t talk more about that, but it’s another example of how these databases work differently from a row-oriented database like MySQL or Postgres.
To complete this discussion, bigger parts are more efficient: fewer file descriptors, longer runs, and better locality of data, assuming you’ve got your sort orders and index correct. What we typically recommend when you’re starting out is that you pick a PARTITION BY key that gives you nice fat partitions. They can be up to 300 gigs; these things can be really large, but one gig is a nice size.
Another thing you should think about is trying to keep this to less than a thousand total parts per table on a node. If you’re getting more parts than that, it’s fine to spread them across a bunch of nodes; ClickHouse is perfectly happy with that. But having too many parts is going to slow down the reads.
When you’re typically starting out, you have no idea how to make this choice. So here’s a simple one. Most data we see is oriented by time and has time as a dimension. Just partition it by month. That tends to work out well for a lot of data sets. Then you can test it out, and if you find that doesn’t work, you can try something else.
Another thing that’s really important, because of this merging process, is that we want to insert relatively large blocks of data, because then ClickHouse will have less work to do to merge them into their final form. If you have a ClickHouse server and you’re loading a lot of data and you just load very small batches, like a couple of hundred rows at a time, ClickHouse will begin to thrash because it spends so much time trying to merge these into larger blocks. It’s also going to be harder on your hardware because every time you rewrite these parts, you add wear to the SSDs. But the most obvious thing is that it hits your performance.
The simplest way, as a developer just starting out, to make blocks bigger is just to batch your input. If you’re reading from Kafka, wait for a second, see how much you get, and then push it in. You don’t have to wait until you get a certain amount of data, but just don’t do it so often that ClickHouse is constantly having to merge a lot of small parts.
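One way to batch input on the application side is a small buffer that flushes when it is full or when enough time has passed. This is a minimal sketch, assuming a caller-supplied `flush` callback that performs the actual insert; the class name, thresholds, and defaults are illustrative, not recommendations from the talk.

```python
import time

class Batcher:
    """Collect rows and hand them to `flush` in larger batches."""

    def __init__(self, flush, max_rows=100_000, max_wait_seconds=1.0,
                 clock=time.monotonic):
        self._flush = flush
        self._max_rows = max_rows
        self._max_wait = max_wait_seconds
        self._clock = clock
        self._rows = []
        self._started = None

    def add(self, row):
        if not self._rows:
            self._started = self._clock()  # time the first row of a batch arrived
        self._rows.append(row)
        # The time check only runs when a row arrives; a real consumer loop
        # would also call flush() on idle.
        if (len(self._rows) >= self._max_rows
                or self._clock() - self._started >= self._max_wait):
            self.flush()

    def flush(self):
        if self._rows:
            self._flush(self._rows)
            self._rows = []

# Stand-in for an INSERT: just record each batch that would be sent.
inserted = []
batcher = Batcher(inserted.append, max_rows=3, max_wait_seconds=3600)
for i in range(7):
    batcher.add(i)
batcher.flush()  # push out the final partial batch
# inserted == [[0, 1, 2], [3, 4, 5], [6]]
```

Seven rows become three inserts instead of seven, so the server has fewer small parts to merge afterward.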
There are parameters you can use to tweak these block sizes underneath. That’s a kind of complicated subject. I’ll just say that they are there, and what you should do before you tweak them is look at the logs and the actual part sizes. There are some interesting issues. One came up in a question about idempotency. There’s a relationship between what’s idempotent and the block size. We’ll talk about that in the Q&A.
A final question: since part size is important, how can I see how big they are? There’s another great system table for this called system.parts. To get information on what the parts are doing, here’s a typical query. I’ve highlighted a couple of interesting columns. Active is basically a flag that says whether the part is in use. When ClickHouse does a merge, it deactivates the old parts but doesn’t garbage collect them by default for about eight minutes, so if you’re curious to see how big your initial parts are, you can still see them. Level equals zero means the part has been inserted but not yet merged. So you can use this to distinguish between parts that just arrived, to see if they’re a nice size, versus parts that have been merged. The other columns are just the name and so forth, and how much space they use. This is a super useful table, and along with your query log, it gives you a lot of good information about what’s going on underneath.
27:37 – Optimizing Inserts
Robert: At this point, we’ve got a lot of information about how to handle inserts, which is the first kind of query you have to deal with. Obviously, it’s not fetching or reading data for you; it’s writing the data, but here are some basic summaries of things you can do.
Max insert threads (max_insert_threads) is a really simple way of changing the parallelism. You add more threads, up to a point, and it makes things go faster. Another thing is the parallel loading we talked about; the parsing of input can also be parallelized, particularly for tab-separated values, comma-separated values, and the tuples you have when you say INSERT with a VALUES clause. You can enable it with a property, input_format_parallel_parsing. And of course, just write bigger blocks; that’s the simplest thing that, along with max insert threads, will help things go faster.
If you want to make it less memory intensive, decrease max insert threads. There’s a typo on the slide; it should be max_insert_threads. That reduces the number of parts in memory. You can disable the input format parallel parsing; I believe it’s off by default. And of course, you can write smaller blocks. Your block size is going to be limited by the amount of available memory, so if you find you’re running out, just make your blocks smaller.
These are all simple ways, just based on understanding a model, that we can make inserts run a lot faster.
29:22 – Aggregation: How GROUP BY Works
Robert: Now that we’ve got some data loaded, let’s talk about how to make queries work, and we’re going to start with something really easy. Most queries in ClickHouse involve aggregation, and in this next section, we’ll focus on how that works, because this is the basic operation that allows you to group measurements around interesting properties of the data, which we call dimensions.
For example, this query, if you’re not a SQL expert, is saying: for all the carriers we have, find out the average flight delay for each one, and give me a list back showing the delay in descending order, so the airline with the greatest departure delays shows at the top of the list. This average is an aggregation function. It’s basically totting up numbers, and that result gets printed in the results.
If we send this query over to ClickHouse once we’ve got the data loaded, what happens? It arrives at the ClickHouse server, which parses it. It then does some planning to figure out how to distribute the work to get this information, and the next step is what we call a scan. The scan is multi-threaded. By default, a new server will try to allocate up to half of what Linux thinks of as the available CPUs. So if you’re on Amazon and you see that your processor has eight vCPUs, ClickHouse will by default try to take four of them.
Each of those threads scans. It gets an assignment of work and scans the parts it has to cover and look into in storage. It runs down these parts, doing a streaming function where it reads the data, knows how it’s supposed to group it, and then collects the data under those groups in buckets, which are basically stored in hash tables. You construct these tables, and then eventually the hash tables from all of the threads, each of which has one or more, get merged and sorted, and the result is passed back to you.
This is a very efficient process. These threads are scanning. There’s no caching layer for this built into ClickHouse. What ClickHouse does, if you’re familiar with Linux internals, is depend on the page cache to help make things faster. That just means that if you do I/O on the same block from storage, it’ll read it from memory if it’s available. It’s very efficient. It doesn’t have to load things into a buffer cache and then read them, the way something like InnoDB does. It’s just ripping down storage and constructing these hash tables on the fly.
An interesting question is, how would you actually parallelize this query? Let’s take this average. It turns out that if you have to average a very large group of numbers, you can split them into sets, and in each set you can compute what’s called a weighted average. This is exactly how it’s done inside the ClickHouse code. It just remembers the values, the total amount of the things it’s summing up, which we call the numerator, and the count, the denominator. You can do all those independently, and then you have a very simple calculation at the end to get the final average. We call this a scan. This corresponds to the steps: you scan, and then you merge. The things maintained in the meantime we call partial aggregates, because they haven’t been fully aggregated, but they’re on their way.
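The scan-then-merge idea for an average can be shown in a few lines of Python. This is a sketch of the principle only; the function names are hypothetical and the numbers are made up.

```python
def partial_avg(values):
    """Scan step: reduce one chunk to a partial aggregate (sum, count)."""
    return (sum(values), len(values))

def merge_states(states):
    """Merge step: add up the numerators and the denominators."""
    total = sum(s for s, _ in states)
    count = sum(c for _, c in states)
    return (total, count)

def finalize(state):
    """Final calculation: divide once, at the very end."""
    total, count = state
    return total / count if count else None

delays = [10, 20, 30, 40, 50, 60, 70]
chunks = [delays[0:3], delays[3:5], delays[5:7]]  # uneven on purpose
states = [partial_avg(c) for c in chunks]         # (60, 3), (90, 2), (130, 2)
result = finalize(merge_states(states))           # 40.0
```

Averaging the three chunk averages directly (20, 45, 65) would give about 43.3, which is wrong; carrying the sum and count separately is what keeps the result exact regardless of how the work is split.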
If you think you’ve seen this before, you probably have. This is basically MapReduce. Most data warehouses do this in some form or another, with varying degrees of speed, but that’s basically what’s going on here. The cool thing about aggregates is that every aggregate in ClickHouse follows this process. It just has a different representation of the data in its partially aggregated form.
If we look at one of those threads and how it’s computing the aggregation, we see it scanning the storage it’s been assigned. It keeps track of the groups it’s passing over, and for each of those groups, it has a hash table. These are airlines, like American, Alaska, and Delta, and it stores the partial aggregates as a bunch of arrays hanging off these keys. Each thread has this hash structure; all the other threads have it, and at the end, ClickHouse has a process for merging the contents of these hash tables.
For anybody who’s looked at ClickHouse code, this is simpler than what actually happens. ClickHouse has many hash table implementations. If you have large numbers of keys, ClickHouse will actually split the hash table into multiple levels because that allows it to merge the hash values in parallel. But this is basically what’s going on. What this means is that while you’re aggregating, you’re building up these structures in memory, so you need enough memory to do that.
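Here is a rough Python sketch of the per-thread hash tables and the final merge described above, using a grouped average as the aggregate. It is deliberately the simplified single-level picture, not the multi-level structure ClickHouse uses for large key counts. Python threads also do not run CPU-bound code in parallel, so this shows the structure rather than the speedup. All names and rows are illustrative.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def scan(rows):
    """One thread's work: build a hash table of carrier -> [sum, count]."""
    table = defaultdict(lambda: [0, 0])
    for carrier, delay in rows:
        state = table[carrier]
        state[0] += delay
        state[1] += 1
    return table

def merge(tables):
    """Combine the per-thread hash tables key by key."""
    merged = defaultdict(lambda: [0, 0])
    for table in tables:
        for carrier, (total, count) in table.items():
            merged[carrier][0] += total
            merged[carrier][1] += count
    return merged

def avg_delay_by_carrier(rows, threads=4):
    chunks = [rows[i::threads] for i in range(threads)]  # assign work per thread
    with ThreadPoolExecutor(max_workers=threads) as pool:
        tables = list(pool.map(scan, chunks))
    merged = merge(tables)
    averages = {c: total / count for c, (total, count) in merged.items()}
    # Highest average delay first, like ORDER BY ... DESC.
    return sorted(averages.items(), key=lambda kv: kv[1], reverse=True)

# Hypothetical (carrier, departure delay) rows.
rows = [("AA", 10), ("DL", 5), ("AA", 30), ("AS", 0), ("DL", 15), ("AS", 4)]
result = avg_delay_by_carrier(rows, threads=2)
# result == [("AA", 20.0), ("DL", 10.0), ("AS", 2.0)]
```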
With this notion of parallelism, as well as keys and the way that aggregates are stored, you can now understand what changes the performance of aggregation. If you process a simple aggregate like the one I showed, it’s very quick. Without changing anything, this was scanning many millions of rows and using a tiny amount of RAM. But if we extend this out, and the GROUP BY is much larger, with a much larger number of keys, and we also use a much heavier aggregate that has to store a hash value of the unique airline identifiers within each of these GROUP BY keys, this uses much more memory. We’re talking about a difference of about a million times in the amount of memory we’re using, going from kilobytes to gigabytes. The actual processing time is also quite a bit larger, but not on such a radical scale. So all of a sudden we’re dealing with a lot of memory.
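To see why a uniqueness-style aggregate costs so much more memory than an average, it helps to count what each one has to remember per group. The sketch below uses synthetic data and a plain Python set as a stand-in for whatever structure the real aggregate uses; it counts stored values, not bytes, and makes no claim about actual ClickHouse memory figures.

```python
def aggregate_state_sizes(rows):
    """Count the values each kind of aggregate must hold in memory."""
    avg_states = {}   # key -> [sum, count]: always two numbers per group
    uniq_states = {}  # key -> set of distinct values: grows with the data
    for key, delay, identifier in rows:
        state = avg_states.setdefault(key, [0, 0])
        state[0] += delay
        state[1] += 1
        uniq_states.setdefault(key, set()).add(identifier)
    avg_values = 2 * len(avg_states)
    uniq_values = sum(len(s) for s in uniq_states.values())
    return avg_values, uniq_values

# Synthetic data: 3 carriers, each with 1,000 made-up identifiers seen twice.
rows = [(carrier, 5, f"{carrier}-{n}")
        for carrier in ("AA", "AS", "DL")
        for n in range(1000)
        for _ in range(2)]
avg_values, uniq_values = aggregate_state_sizes(rows)
# avg_values == 6, uniq_values == 3000
```

The average's state stays fixed per group no matter how many rows are scanned, while the distinct-value state grows with the number of unique values, and adding more GROUP BY keys multiplies both.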
You can also parallelize things. You can use max threads to boost the performance very easily just by changing the setting. Here’s an example of that heavier query. I’m limiting it because it’s actually scanning a couple of hundred million rows, and I’m trying different thread settings. When I go from one thread to four, the response time drops by a factor of four, so it parallelizes very well. Going to 16 didn’t help a lot. It’s interesting that the amount of RAM you use can also be affected by parallelization, but not in the way you’d think. You might think that more threads would mean more hash tables and more structures built, but it doesn’t always use more memory. In fact, in this case, it used a bit less as we parallelized. So this is an example where you actually have to look at what’s going on to understand the effects.
37:31 – Memory Limits and Faster Aggregation
Robert: Speaking of memory, it’s probably good to talk about memory limits, because they’re out there and ClickHouse will just stop you in your tracks when you exceed them.
For a single query, there’s a property called max memory usage. By default, it’s 10 gigs. I’d say, particularly if you’re starting out, don’t change this. If you’re hitting this limit, start figuring out why your query is eating so much memory and make it smaller. There’s also a setting, which I believe is unlimited by default, for the maximum memory usage per user. That says that if all the queries for your user together exceed this number, we’ll start failing them. And finally, there’s a limit on the amount of memory on the server. This is a pretty brutal parameter. If you hit this, things just start failing randomly because the server will no longer allocate the memory necessary for them. You can play around with these, but they’re what’s driving what happens when you run out of memory and when ClickHouse throws errors. As I say, it’s usually better to try to fix your query than to tweak these limits, but of course, there are cases where that’s not true.
Here are some tips to make your aggregation queries faster, based on the knowledge we have. One is just to remove or exchange the heavy aggregation function. The one I picked, uniqExact, is very expensive, because it has to find every exact value and store the keys for all of them. There are estimated versions of this which are faster. I don’t know if they use less memory; I didn’t benchmark them, but they would fix that. You can reduce the number of values in GROUP BY. These first two things make a huge difference in the amount of memory you use and the speed. You can increase max threads; parallelism works up to a point. And then there’s stuff you’ll do anyway for any query: reducing I/O by filtering rows you don’t want, and improving compression of data in storage. We’re not going to talk about compression in this talk, but there are a bunch of ways you can improve it and just read less data for the same number of rows. These are good ways to make aggregation go faster that you can develop just by understanding how it works and then looking at the numbers.
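The trade-off between an exact and an estimated distinct count can be sketched in Python. This is a stand-in technique (a k-minimum-values estimator), not the algorithm behind ClickHouse's estimating functions, and it says nothing about how much memory those functions actually use. All names and the value of k are assumptions. It only shows the general shape of the trade: the exact state grows with the number of distinct values, while the estimating state stays bounded.

```python
import hashlib
import heapq

def hash01(value):
    """Map a value to a stable pseudo-random float in the unit interval."""
    digest = hashlib.sha256(str(value).encode()).digest()
    return int.from_bytes(digest[:8], "big") / 2**64

def exact_distinct(values):
    """Exact count: the state holds every distinct value."""
    seen = set(values)
    return len(seen), len(seen)  # (answer, items held in the state)

def estimated_distinct(values, k=1024):
    """K-minimum-values estimate: the state holds at most k hashes."""
    heap = []        # negated hashes, so heap[0] is the largest one kept
    members = set()  # the same hashes, used to skip duplicates
    for value in values:
        h = hash01(value)
        if h in members:
            continue
        if len(heap) < k:
            heapq.heappush(heap, -h)
            members.add(h)
        elif h < -heap[0]:
            evicted = -heapq.heappushpop(heap, -h)
            members.discard(evicted)
            members.add(h)
    if len(heap) < k:
        return len(heap), len(heap)  # fewer than k distinct values: exact
    kth_smallest = -heap[0]
    return (k - 1) / kth_smallest, len(heap)

values = [i % 50_000 for i in range(200_000)]
exact, exact_state = exact_distinct(values)
estimate, estimate_state = estimated_distinct(values)
print(exact, exact_state, round(estimate), estimate_state)
```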
Similarly, you can reduce memory usage. Again, reduce the heavy aggregation functions and reduce the number of values in GROUP BY, which has a huge impact on memory. You can reduce the number of threads. And if you’re really running out of memory on aggregates, there’s something you can do: you can dump them to external storage. This is a handy feature. It’s going to run a lot slower, but you’re going to get your query working. And of course, again, if you don’t need to scan rows, don’t, so filter things out and make the query smaller.
That’s the model for aggregation, and some basic things we can draw out of it to make aggregation run faster and more efficiently.
41:01 – Joins: Combining Data Between Tables
Robert: Let’s look at joins. This is another major thing we do in SQL, where we’re combining data between tables. If you’re a SQL expert, this is old hat, but if you’re learning SQL, here’s a typical example of a join.
What we’re trying to do in this airline data set is compute some statistics, like the number of flights going to particular destinations, and we’d like to have the airport name, the long one that appears on the sign when you drive up. So we’re going to do a join, because that airport name is not included in the base data. There’s another table. On time is your base data, airports is where we have this list of a few thousand airports, and we join them on a condition. That combines the data.
Let’s look at what ClickHouse does to do this. ClickHouse uses, by default, what’s called a hash join. Let’s look back at the join. We have the left side and the right side. The left side is the big one, on time, which has a couple of hundred million rows. Airports are on the small right side; it has about seven thousand rows. We take that right-side table and load the row values, including the key we’re joining on, as well as anything we’re including in the final result, in this case, the airport name. We put that into a hash table sitting in RAM, a hash table plus a bunch of blocks of data.
One thing that’s really important to notice is that whatever you need from that table has to go into RAM. So if you joined on a very large table and referenced a bunch of columns on the right side, they would all get pulled in and loaded into memory. That’s significant because if it’s too big you’ll run out.
The next thing ClickHouse does, once it’s got that, is just do a scan on the left-side table. It goes ahead and does the aggregation process we just described, and as it’s running through, it looks over into the hash table so it can grab the information it needs, if available, and then it merges the result.
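The build-then-probe sequence described above can be sketched in Python. The tables, column names, and function names are hypothetical, and inner-join behaviour is assumed. The right side is loaded into a hash table holding only the join key and the one referenced column, and the left side is then scanned and aggregated while probing that table.

```python
from collections import Counter

# Hypothetical right-side table: (airport code, airport name, city).
airports = [
    ("SEA", "Seattle-Tacoma International", "Seattle"),
    ("SFO", "San Francisco International", "San Francisco"),
    ("JFK", "John F. Kennedy International", "New York"),
]

# Hypothetical left-side table: one destination code per flight.
ontime_dest = ["SEA", "SFO", "SEA", "JFK", "SEA", "SFO", "XXX"]

def build_hash_table(right_rows):
    """Build phase: keep only the join key and the column the query uses."""
    return {code: name for code, name, _city in right_rows}

def probe_and_count(left_keys, table):
    """Probe phase: scan the left side, look up each key, and aggregate."""
    flights = Counter()
    for code in left_keys:
        name = table.get(code)
        if name is not None:  # assumed inner join: unmatched rows are dropped
            flights[(code, name)] += 1
    return flights

table = build_hash_table(airports)
flights = probe_and_count(ontime_dest, table)
print(flights)
```

Note that the city column never enters the hash table because the query does not reference it, which is why selecting fewer right-side columns saves memory.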
It’s very useful to understand what’s going on under the covers. Here we have airports, our right-side table, loaded into memory. We have on time, and we’re doing the scan process, so we have a bunch of threads running down it. As each of those threads is working, it’s creating partial aggregates like what we described, plus it’s also picking up the airport name. One thing that’s interesting is that we’re accumulating a bunch of repeated information that ClickHouse doesn’t necessarily know what to do with yet. We need to keep some of this stuff hanging around until we can do the final merge, in which case some of it may be thrown away, but ClickHouse doesn’t necessarily know that you’re doing that at this point.
That leads to a question. Wouldn’t it be nice if we could do all the heavy lifting of computing the aggregates, let that stuff merge, and then have a relatively small amount of data that we could join with the in-RAM hash table? We can do exactly that, and here’s how it works.
We load the right-side table, we scan, we merge, and then join and sort. ClickHouse has a very simple way of doing this, which is to take the initial query and, instead of scanning the table on time, change it to be a subquery. So it goes and selects the destination values and does the aggregation first, and then it joins against the other data. What we see here is that this reduces the amount of memory we’re using by a factor of about 20,000. This is a relatively small query, and the aggregate is pretty cheap. If you had larger aggregates or you’re selecting more things, this could become very significant. So this is a really simple way: just by understanding how joins work and ordering them, you can control this inside SQL and make a big difference in performance.
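The effect of reordering can be sketched by counting how many rows carry the airport name in each plan. This is an illustration under assumptions: the data is synthetic, the function names are invented, and row counts are only a proxy for memory. Both plans return the same answer, but the second joins after the aggregation has already collapsed the data.

```python
def join_then_aggregate(dests, names):
    """Join first: every matching flight row drags the airport name along."""
    joined = [(dest, names[dest]) for dest in dests if dest in names]
    counts = {}
    for dest, name in joined:
        counts[(dest, name)] = counts.get((dest, name), 0) + 1
    return counts, len(joined)

def aggregate_then_join(dests, names):
    """Aggregate first (the subquery), then join only the grouped rows."""
    counts = {}
    for dest in dests:
        counts[dest] = counts.get(dest, 0) + 1
    joined = {(dest, names[dest]): n for dest, n in counts.items() if dest in names}
    return joined, len(joined)

# Synthetic data: 3,000 flights to three destinations.
names = {
    "SEA": "Seattle-Tacoma International",
    "SFO": "San Francisco International",
    "JFK": "John F. Kennedy International",
}
dests = ["SEA", "SFO", "JFK"] * 1000

result_a, rows_carrying_names_a = join_then_aggregate(dests, names)
result_b, rows_carrying_names_b = aggregate_then_join(dests, names)
print(rows_carrying_names_a, rows_carrying_names_b)
```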
This leads to some simple ways to keep joins fast and efficient. There are many other things you can do, and we won’t get into those, but keep that right-side table small. You can do multiple tables; there’s no problem, but bear in mind that every time you refer to one, it’s going to get read and loaded into memory, and it’s going to eat up RAM until your query is done. Minimize the columns you join from the right side, because they’re going to get dragged in and stored. Try to reduce the number of rows. These are all on the right-side table. You can push the aggregation to happen before the join, so you don’t drag around unnecessary data.
And finally, a really simple thing you often want to do, even as an app developer: if you find you’re running out of memory, or you’re being slowed down because these joins are using a lot of memory or taking a long time to load, you can use what’s called a dictionary. You can imagine it as sort of like that hash table we constructed, except that it’s loaded once and then a bunch of queries can share it. One other thing, if you’re thinking about this: we talked about joins, but the IN operator, which you’re probably familiar with if you’re a SQL person, is really a join under the covers, and it uses the same mechanism.
47:24 – Distributed Queries: Sharding and Replication
Robert: That’s joins. We have one last topic, and we’re just going to scratch the surface of it: how does a distributed query work?
This is where, once you reach a certain size of data, or if you decide you’d like to have high availability, you’re typically going to shard your data so you’re going to break it up into pieces and you’re going to replicate it so that you have multiple copies, even within single shards.
This is an example of a sharded data set. Instead of having the on-time table, we’re going to call it on-time local, and we have a couple of copies. We divide it into two pieces. The first shard, the on-time local values, is red. There are two of them because there are two replicas. The other half is blue; that’s also two replicas. We have two other kinds of tables. We’re going to do what’s called a distributed table, which is really just a view, a very sophisticated one, that knows where the underlying replicas are located. If you query it, it will automatically find them for you and pull everything together. And finally, we have a table that’s what we call fully replicated, where there’s just a copy of it. It’s a small table, and we have a copy replicated to every single server, so no matter which server you go to, it’s always going to be there.
We won’t talk about the details of how sharding is set up; there are lots of presentations, blogs, and documentation about that. But here’s what happens when a query comes in to land on one of these sharded and replicated data sets. You have your application over on the left. It sends a query in. In the simplest case, you query against your distributed table on time. We call the node where you arrived the initiator. What the distributed table does as it processes this is go look up where the replicas are. If it has a local replica, by default, it’s going to use that. It’ll find another replica for the other shard, and then it sends at least a piece of that query out to these local replicas, gets that aggregation scan, that left-side scan, done there, and pulls the results back to merge and sort. So your application only sees one node, but under the covers, we’re going to all the locations of the data.
Within the distributed table, it rewrites the queries more or less and sends them out to the shards. So the query that arrives is selecting from on time, and it’ll get rewritten to select from on time local, the real table, and that gets sent to each shard. This is a very simple example, with little difference in the query.
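The routing and merge steps can be sketched as a small simulation. Everything here is hypothetical: the cluster layout, node names, and the rule for choosing a non-local replica are assumptions, and a real distributed table does far more. The sketch shows the initiator preferring its local replica, sending the rewritten query to one replica per shard, and merging the partial results.

```python
from collections import Counter

# Hypothetical cluster: two shards, two replicas each. Replicas of the same
# shard hold identical rows of the local table (carrier codes here).
CLUSTER = {
    "shard1": {"node1": ["AA", "DL", "AA"], "node2": ["AA", "DL", "AA"]},
    "shard2": {"node3": ["DL", "AS"], "node4": ["DL", "AS"]},
}

def pick_replica(replicas, initiator):
    """Prefer the local replica; otherwise take the first by name (arbitrary)."""
    return initiator if initiator in replicas else sorted(replicas)[0]

def run_on_replica(rows):
    """Stands in for the rewritten query against the local table."""
    return Counter(rows)

def distributed_count(initiator):
    """Send the scan to one replica per shard and merge the partial results."""
    merged = Counter()
    used = []
    for replicas in CLUSTER.values():
        node = pick_replica(replicas, initiator)
        used.append(node)
        merged.update(run_on_replica(replicas[node]))
    return dict(merged), used

counts, nodes_used = distributed_count("node2")
print(counts, nodes_used)
```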
An interesting question is, what if we’re joining? Here’s what happens. By default, kind of like in the example we showed before, these joins get pushed down into the different shards. Here’s a join doing a similar kind of thing, where we want to get the airport name. We’re joining on time, which is the distributed table, with airports, which is not a distributed table; it’s a real local table. ClickHouse will say, fine, I’ll rewrite this to be on time local, and just push it down to the shard.
In this case, though, we’re going to have the same problem we had a minute ago, which is that we’ll end up doing the join locally and dragging around data we don’t really need yet, which just ends up getting thrown away. One way we can speed this up is, much like before, we can make that left-side table a subquery. If you do this, then the query in the middle gets sent to your remote servers, and the stuff around it gets processed on the initiator node. This is a simple way of dividing this up. If you look in the query log, you can see this happening; you can see the query that arrived at the remote nodes, and you can infer that the rest of it was processed on the initiator.
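The division of labour with a left-side subquery can be sketched like this. The shard contents, the airport names, and the function names are assumptions for illustration. The inner grouping runs once per shard, and the join against the airport names runs once on the initiator over the merged groups.

```python
from collections import Counter

# Hypothetical rows of the local table on each shard (destination codes).
SHARDS = [
    ["SEA", "SFO", "SEA"],
    ["JFK", "SEA", "SFO"],
]

# Hypothetical airports table, available on the initiator.
AIRPORT_NAMES = {
    "SEA": "Seattle-Tacoma International",
    "SFO": "San Francisco International",
    "JFK": "John F. Kennedy International",
}

def remote_subquery(rows):
    """Runs on each shard: the inner subquery, grouping by destination."""
    return Counter(rows)

def run_on_initiator(shards, names):
    """Merge the shard results, then perform the join once, locally."""
    merged = Counter()
    for rows in shards:
        merged.update(remote_subquery(rows))
    return {(dest, names[dest]): n for dest, n in merged.items() if dest in names}

flights_by_airport = run_on_initiator(SHARDS, AIRPORT_NAMES)
print(flights_by_airport)
```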
It is way more complicated when multiple tables are involved, and they’re distributed. We’re not going to go into that. You can just glance at this and understand that you start to have to give ClickHouse guidance about when you need to send stuff out to go scan these distributed tables. It gets complicated fast. The basic thing, though, is that if you keep the queries simple, you usually don’t have to deal with this too much.
That gets us to tips for making distributed queries more efficient. Think about where your data is located. If you’re going to join down in the shard, as you’re processing the shard, the data you’re joining on has to be there in full, so you have to think about that in advance. Another thing: you can move the WHERE and the heavy grouping work. If you can get that into the left side of the join and get it pushed out, that’s where you’re going to get high parallelization. You want that stuff sent out to the nodes, so use a subquery, get that stuff pushed out, and that’ll allow you to use all the CPUs you have available. So you can use a subquery to order the joins after the remote scan. And you can use the query log, which is really important, to see what’s executing on the initiator versus what was actually sent out to the nodes. That enables you to disentangle who’s doing what work and where.
There are many more things to talk about with distributed queries. It’s a deep subject, and this is where things start to become quite complicated, but we’re going to leave it here.
53:49 – Where to Learn More and Wrap-Up
Robert: Let’s just talk about where to learn more. There are a bunch of great resources. These include the ClickHouse official docs, which should be your first stop, the Altinity blog and YouTube channel, and the knowledge base. We do an enormous amount of work on performance, so there are lots of articles, videos, and knowledge base articles about this. And there are meetups, other people’s blogs, and lots of external resources you can find through Google or your favorite search engine.
The next question is, where did the data come from for this talk? Partially out of our heads, but there are some really important things to know if you want to study performance more deeply. The Altinity knowledge base has really excellent information; hot off the presses, as we’re solving support cases, we build articles. The ClickHouse source code, of course, is a great place to look. It’s well documented, and if you want to understand aggregates, that’s a great place to look. And there are a lot of talks and blog articles I’ve referenced; I used some of those. Unfortunately, for non-Russian speakers, some of the really good ones are in Russian, but what we’re trying to do is get some of that information spread more widely.
So that’s the end of this talk. I’m glad to see that we’ve got a couple of our experts on here with me, Tatyana and Denis, so we can take a few minutes if you have questions, and we’d be happy to answer them. Otherwise, just get out there and surf some big data with ClickHouse. It’s been a pleasure having you on this talk, and we’re delighted to help you. I’m going to go ahead and kill one of the recordings here, and then we’ll go ahead.
FAQ
What is a part in ClickHouse, and why does ClickHouse merge them? A part is a block of data on disk for a given partition, stored in sorted, indexed, and compressed form. Each insert creates one or more parts, which may be small. MergeTree rewrites small parts into larger ones in the background because larger parts mean fewer file descriptors, longer compressed runs, and better read performance. The merge is transactional and silent; you mostly notice it because queries get faster.
How can I make inserts faster without using too much memory? The main lever is max insert threads, which controls how many threads build parts in parallel. Adding threads roughly halves insert time each time, up to a point, but each thread needs its own memory, so RAM use rises proportionally. To use less memory, lower max insert threads or write smaller blocks. The simplest broadly useful tip is to batch inputs into larger blocks so ClickHouse has fewer small parts to merge.
Why does my aggregation query use so much memory? Aggregation builds hash tables in memory, keyed by your GROUP BY columns. Two things drive memory the most: the number of distinct GROUP BY keys, and the type of aggregate function. Heavy functions like uniqExact store every distinct value, which can multiply memory by orders of magnitude. Reducing GROUP BY cardinality, swapping a heavy function for an estimating one, or filtering rows out earlier all reduce memory.
Why does ClickHouse load the right-side table of a join into memory? ClickHouse uses a hash join by default. It loads the right-side table’s join key and any selected columns into an in-memory hash table, then scans the left side and looks up matches. Everything you reference from the right side stays in RAM until the query finishes, so a large right side can exhaust memory. Keep the right side small, select only the columns you need, and consider a dictionary when many queries share the same lookup table.
How do I see what a query actually did? Enable the query log in config.xml. ClickHouse then records each query in system.query_log, including duration, rows and bytes read, and peak memory. The system.parts table shows part sizes, whether a part is active, and its merge level. Together, these tables let you compare query variants scientifically instead of guessing.
What changes when a query runs across a sharded, distributed cluster? With sharding and replication, you query a distributed table, which acts like a smart view that knows where the replicas live. The node you hit, called the initiator, rewrites the query against the local tables and pushes the scan and aggregation out to the shards, then merges the results. For joins, the data you join on must be present on each shard, and wrapping the left side as a subquery pushes heavy grouping out to the nodes for better parallelism.
© 2026 Altinity, Inc. All rights reserved. Altinity®, Altinity.Cloud®, and Altinity Stable® are registered trademarks of Altinity, Inc. ClickHouse® is a registered trademark of ClickHouse, Inc. Altinity is not affiliated with or associated with ClickHouse, Inc. Kubernetes, MySQL, and PostgreSQL are trademarks and property of their respective owners.