Real-time, Exactly-once Data Ingestion from Kafka to ClickHouse® at eBay

Presenters: Jun Li, Principal Architect, eBay & Robert Hodges, CEO, Altinity
In this webinar, hosted by Altinity’s Robert Hodges, eBay Principal Architect Jun Li presents the Block Aggregator, a data loader that delivers real-time, exactly-once data ingestion from Kafka to ClickHouse. The core problem is that ClickHouse prefers large block inserts rather than row-by-row loading, while a naive loader that replays Kafka messages after a failure can cause either data loss or data duplication. Kafka’s own transaction mechanism cannot be applied because ClickHouse does not follow its commit protocol.
The solution has the aggregator deterministically produce identical blocks, relying on a Kafka metadata store to track processing state and on ClickHouse block deduplication to discard any block that is replayed. Jun Li walks through the multi-data-center deployment model, the mapping of topics to tables and rows, the aggregator’s internal architecture, and the deterministic replay protocol with its three phases and per-partition metadata. He also covers a runtime verifier that watches Kafka’s metadata topic for anomalies, and shares hard-won lessons from production, including compiling the full ClickHouse codebase into the aggregator, dynamic schema updates, ZooKeeper pressure, distributed locking, and the insert quorum error code 286.
The talk closes with production numbers from an eBay cluster of more than 2,000 shards, sustaining roughly 280,000 messages per second with end-to-end lag usually under 30 seconds. A wide-ranging Q&A covers writing to multiple tables in one stream, throughput, the prospect of open sourcing the aggregator, how it compares to the built-in ClickHouse Kafka engine, and deduplication corner cases.
If you’d like to receive the PowerPoint presentation, please contact us at marketing@altinity.com.
Key Moments (Timestamps)
Key moments generated with AI assistance.
- 0:07 – Introduction and housekeeping
- 1:34 – The problem: loading Kafka into ClickHouse efficiently
- 5:01 – Multi-data-center deployment model
- 11:49 – Mapping topics, tables, rows, and messages
- 12:50 – Block Aggregator architecture in detail
- 16:44 – Achieving exactly-once delivery
- 22:01 – Replay mode and the processing loop
- 27:05 – Resiliency testing and the runtime verifier
- 30:31 – Implementation issues and lessons learned
- 36:30 – Testing, the quorum error, and alerting
- 40:45 – Production deployment and performance
- 45:21 – Q&A
Webinar Transcript
0:07 — Introduction and Housekeeping
Robert Hodges: Hello everybody, my name is Robert Hodges, I work for Altinity, and I’d like to welcome you to a webinar on real-time, exactly-once data ingestion from Kafka to ClickHouse. Our presentation today is being delivered by Jun Li from eBay. Jun Li belongs to one of a couple of teams that we have worked with in the ClickHouse community over the last couple of years, so Jun, it’s a real pleasure to have you here.
Before we jump in, I’d like to share some information about this webinar that will help you enjoy it and post questions. The webinar, as you may be able to see from your screen, is being recorded. We will post the link as soon as it’s available, and we’ll also share a link to the slides, so you don’t need to take frantic notes. All of this will be available afterwards. Second, you can ask questions. There’s a Q&A box, and you can also post them in the chat. The Q&A is easiest to manage, but either way works, so please feel free to post questions. We are planning about a 45-minute talk today, so we’ll have lots of time at the end to answer them. Queue them up and Jun will knock them off as soon as the talk is done. With that, Jun, I’d like to turn it over to you, so please take it away.
Jun Li: Okay, thanks Robert for the introduction. Hello everyone, my name is Jun Li. I am from the data infrastructure team at eBay. Today’s presentation is about our work on real-time data ingestion from Kafka to ClickHouse with an exactly-once guarantee. This is joint work with my colleague Mohammad Roohitavaf.
1:34 — The Problem: Loading Kafka into ClickHouse Efficiently
Jun Li: In a real-time analytics processing system, many upstream data sources insert data into Kafka, and a downstream loader subscribes to Kafka and fetches the message streams it is interested in. The loader then loads the data into a real-time analytics database, against which queries can be issued afterwards. With Kafka serving as a buffer, the upstream side before Kafka and the downstream side after Kafka are decoupled. For example, if the analytics database is under maintenance and becomes read-only, queries can still go through, and upstream data can continue to arrive at Kafka.
We chose ClickHouse as the real-time analytics database. To achieve high-performance data loading, ClickHouse prefers that data loaders accumulate many Kafka messages into a large block and then load the block into the database. According to the ClickHouse architecture document, the recommendation is to insert data in packets of at least 1,000 rows, and no more than a single request per second. A loader that loads Kafka messages one at a time will have low ingestion performance: too many data parts are produced in the database, and the loader eventually hits insertion exceptions because the background merge process cannot keep up with the loading rate.
Having Kafka sit between the data sources and the data loaders allows data to be buffered for batch loading. We call the data loader that aggregates Kafka messages into a large block before loading it into ClickHouse the Block Aggregator. To the Block Aggregator, Kafka is the data source and the database is the data sink. The network connections between Kafka and the aggregator, and between the aggregator and the database backend, can fail independently. The aggregator itself can also crash and may or may not be restarted afterwards. A naive retry implementation will cause either data loss or data duplication in the database. Kafka provides a transaction mechanism for exactly-once delivery, but since ClickHouse was not designed to follow the transaction commit protocol defined by Kafka, that mechanism cannot be applied here.
So we developed our own solution to provide exactly-once message delivery from Kafka to ClickHouse. The key is to have the aggregator deterministically produce identical blocks for ClickHouse. We rely on two existing runtime mechanisms: the first is the Kafka metadata store, which keeps track of the aggregator’s processing state; the second is the block deduplication supported by ClickHouse.
For the remainder of the presentation, I will first present the Block Aggregator we developed for multi-data-center deployment in eBay’s own cloud environment. Then I will detail the deterministic message replay protocol we developed. Next, I will describe the runtime verifier, the monitoring and debugging tool we use to make sure the implemented message replay protocol really works. Then I will highlight some issues and lessons from implementing the Block Aggregator and deploying it to production. Finally, I will show some performance numbers from the Block Aggregator deployed in production.
5:01 — Multi-Data-Center Deployment Model
Jun Li: This is our model for deploying the Kafka clusters and the ClickHouse database across multiple data centers. In our eBay cloud environment, data centers are geographically separated, with a cross-data-center latency of about 20 milliseconds. In this diagram we have two Kafka clusters in two data centers, and they are independent of each other. Data producers insert data into the Kafka cluster in the same data center as the application, so that we achieve low latency. They can also insert data into the remote Kafka cluster in a different data center if a failure makes the local Kafka cluster unavailable. Note that in this model, the messages stored in the two Kafka clusters are completely different; the clusters do not mirror each other.
The ClickHouse cluster has multiple shards, and each shard has replicas located in two data centers. In this diagram we have two replicas in each data center, so each shard has four replicas in total. Each black dot in a ClickHouse replica is a Block Aggregator, which is collocated with that replica. In the Kubernetes environment, the aggregator and the ClickHouse replica run in two different containers in the same pod. Each Block Aggregator subscribes to both Kafka clusters, and each one inserts blocks only into its collocated ClickHouse replica. We rely on the ClickHouse replication protocol to replicate the inserted blocks to the other replicas in the same shard.
In terms of Kafka topic definitions, each shard has its own unique topic in each Kafka cluster, and each topic has multiple partitions. The number of partitions assigned to each topic equals the number of replicas in each ClickHouse shard, so that loading from Kafka to ClickHouse is balanced across the replicas. In this diagram each shard has four replicas, and therefore each topic has four partitions. Note that instead of the two-data-center model shown here, we can deploy ClickHouse across three data centers to be even more resilient to data-center-level failures.
For this and the next several slides, let me go through several data-center-level failure scenarios so you can see how our solution deals with them. Although data center failures may be rare nowadays, data center maintenance is still routine, for example for monthly security patching. During a maintenance window, a large number of machines can go down. If we design the system to tolerate data-center-level failures in general, it will also be resilient to such scheduled maintenance.
Suppose one Kafka data center is stopped. The application inserts into the other Kafka cluster, which is still alive, and in the ClickHouse cluster the Block Aggregators can still subscribe to the active Kafka cluster, create blocks, and insert them into ClickHouse. Once the failed Kafka data center recovers, the Block Aggregators automatically resume fetching from the message offset where they stopped before the failure, since Kafka acts as a persistent queue.
Now, instead of one Kafka cluster going down, suppose one ClickHouse data center goes down, taking half of the replicas with it. The Block Aggregators in the still-active replicas can still subscribe to both Kafka clusters and insert blocks into the surviving ClickHouse replicas. In this scenario we need to configure the insert quorum to be two in ClickHouse; that is, an insert is considered successful only if at least two replicas acknowledge receiving the block. If the insert quorum were greater than two, then with one data center down, insertion into ClickHouse would always fail. Later, when the failure is resolved and the replicas in the other data center come back online, the ClickHouse replication protocol kicks in automatically and synchronizes data among the replicas: each replica fetches the parts it does not have into its local store.
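The quorum arithmetic in this scenario can be checked with a small Python sketch. It assumes the layout in the diagram (two replicas per data center) and treats the insert quorum as the number of replicas that must acknowledge an insert; the function name is hypothetical, not part of ClickHouse or the Block Aggregator.

```python
from typing import Dict


def quorum_survives_dc_outage(replicas_per_dc: Dict[str, int], insert_quorum: int) -> Dict[str, bool]:
    """For each data center, report whether inserts can still collect
    insert_quorum acknowledgements if every replica in that data center is down."""
    total = sum(replicas_per_dc.values())
    return {dc: total - down >= insert_quorum for dc, down in replicas_per_dc.items()}


shard = {"dc1": 2, "dc2": 2}  # four replicas per shard, two in each data center
for quorum in (2, 3):
    print(quorum, quorum_survives_dc_outage(shard, quorum))
```

With a quorum of two, losing either data center still leaves enough replicas to acknowledge inserts; with a quorum of three, neither outage is survivable, which is why the talk settles on two.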
Let’s consider an even more severe situation: one Kafka cluster and one ClickHouse data center are both down. This can happen if a Kafka cluster and half of the ClickHouse replicas are deployed in the same data center. Even in this case, half of the ClickHouse replicas survive, and thus half of the Block Aggregators survive, so data ingestion from the active Kafka cluster continues to function. When the data center failure is resolved, the Block Aggregators resume message consumption from where they stopped, and ClickHouse replication is triggered automatically to synchronize data across the replicas.
11:49 — Mapping Topics, Tables, Rows, and Messages
Jun Li: In the previous slides we saw how each shard is assigned a topic, so let’s look more closely at the mapping of topics, tables, rows, and messages. In this diagram, a topic contains consecutive messages, each assigned a message offset. Each message has a header that identifies the table the message belongs to, followed by multiple rows for that table. Each message is an opaque byte array using Protocol Buffers binary encoding. When the Block Aggregator decodes a message, it relies on the table schema fetched from the collocated ClickHouse replica.
A topic can support multiple tables. When a table is added, nothing changes on the Kafka side, and the number of topics does not grow as more tables are added. Note that each Block Aggregator subscribes to the two Kafka clusters independently. The messages from the two Kafka clusters are merged when the table rows are merged at the ClickHouse backend by the background merge process.
12:50 — Block Aggregator Architecture in Detail
Jun Li: This diagram shows the aggregator architecture in detail. In each Kafka cluster, each topic has multiple partitions, and there is also an internal Kafka system topic to which the Block Aggregator’s metadata is committed. Each Block Aggregator collocated with a ClickHouse replica has two Kafka connectors, each with its own independent consumer group for the topic it subscribes to in one of the Kafka clusters. Since the number of partitions equals the number of replicas, under normal operating conditions each Block Aggregator receives a balanced load from each of the two Kafka clusters.
The client application produces table rows as Kafka messages and inserts them into the Kafka brokers. Multiple rows can be batched into one message, for insertion efficiency and also for fetching efficiency later by the Block Aggregator. At startup, the Block Aggregator retrieves the table schema definitions from the collocated ClickHouse replica. Schema updates are supported: whenever a new table is introduced or an existing table schema is updated, the Block Aggregator can get the updated schema via an external trigger, such as a configuration change.
The message consumer in the Kafka connector subscribes to the topic and starts fetching Kafka messages. It then hands each message to the corresponding partition handler. Under some failure conditions, one Kafka connector can be assigned more than one partition. The partition handler identifies the table name from the Kafka message and deserializes the message based on the table schema. Rows belonging to the same table accumulate in an in-memory buffer. When the buffer reaches a threshold, either a size limit (say, 10 megabytes) or a lifetime limit (say, one second), the buffer is sealed and the resulting block is pushed to the background processing pool. The flushing task is responsible for sending the block to the replica collocated on the same node. Once the block is accepted by that ClickHouse replica, it is replicated to the other replicas in the same shard by the ClickHouse replication protocol.
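The size-or-age sealing rule for a per-table buffer might look like the following Python sketch. The class and field names are hypothetical, rows are plain bytes rather than decoded ClickHouse columns, and the clock is injectable so the behavior can be tested; the 10 MB and one-second defaults are the example thresholds from the talk.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class TableBuffer:
    """Per-table in-memory row buffer, sealed into a block on size or age."""
    max_bytes: int = 10 * 1024 * 1024          # size threshold (10 MB in the talk)
    max_age_s: float = 1.0                     # lifetime threshold (1 s in the talk)
    clock: Callable[[], float] = time.monotonic
    rows: List[bytes] = field(default_factory=list)
    nbytes: int = 0
    opened_at: Optional[float] = None          # time the first row arrived

    def append(self, row: bytes) -> None:
        if self.opened_at is None:
            self.opened_at = self.clock()      # lifetime starts with the first row
        self.rows.append(row)
        self.nbytes += len(row)

    def should_seal(self) -> bool:
        if self.opened_at is None:             # empty buffer: nothing to flush
            return False
        too_big = self.nbytes >= self.max_bytes
        too_old = self.clock() - self.opened_at >= self.max_age_s
        return too_big or too_old

    def seal(self) -> List[bytes]:
        """Hand the accumulated rows over as one block and reset the buffer."""
        block = self.rows
        self.rows, self.nbytes, self.opened_at = [], 0, None
        return block
```

One design consequence: `should_seal` must be polled periodically (for example on every consumer poll), not only when a row arrives, otherwise a quiet table’s buffer would never reach its time limit.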
We set the insert quorum to two, forcing two replicas to hold the same data before the block-flushing task gets a successful acknowledgement. Compared to the default behavior, where an insert is acknowledged once a single replica has written it, insertion latency is higher, but so are data reliability and availability. The partition handler also persists the offset of the processed messages and other metadata to the Kafka metadata store, which is an internal Kafka system topic. I will come back to the metadata later.
In summary, the key features of our Block Aggregator are the following. It is deployed across multiple data centers. Multiple tables are supported for each topic and each partition. Thanks to our exactly-once delivery protocol from Kafka to ClickHouse, there is no data loss and no data duplication. We also have over 100 metrics in the Block Aggregator to systematically monitor its progress for online support, such as processing rate, failure rate, the distribution of loading time and block size, and abnormal behaviors such as a Kafka message offset being rewound or skipped.
16:44 — Achieving Exactly-Once Delivery
Jun Li: Next, let’s focus on how we achieve exactly-once delivery, that is, no data loss and no duplication, in our Block Aggregator. A naive way to implement message replay is as follows. The Block Aggregator retrieves messages m1 and m2 and constructs a block in memory; then it flushes the block to the database; after that, it commits the offset to Kafka. Suppose the third step, committing to Kafka, fails. Then in the recovery phase, messages m1 and m2 are picked up again along with the new message m3, and the block is constructed and inserted into the database successfully. Notice that m1 and m2 are duplicated in this case.
So what if we change the protocol slightly? Messages m1 and m2 are fetched and constructed into a block, but this time we commit the offset to Kafka first and then flush the block to the database. Suppose the block flush fails. Then in the recovery phase, the new messages m3 and m4 are picked up, because the offset commit in the first step succeeded. This time m3 and m4 are aggregated into a block and flushed successfully, but with this protocol, m1 and m2 are lost from the database.
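Both naive orderings can be reproduced with a tiny Python simulation. It is a sketch under simplifying assumptions: the Kafka log and the database are plain lists, `naive_loader` is a hypothetical name, the second step of round one always fails, and the recovery round reads everything after the committed offset and succeeds.

```python
from typing import List


def naive_loader(order: str, log: List[str]) -> List[str]:
    """Two rounds of a naive loader; the second step of round one fails.

    order == "flush_then_commit": round one flushes, then its commit fails.
    order == "commit_then_flush": round one commits, then its flush fails.
    Returns the rows that end up in the database.
    """
    db: List[str] = []
    committed = 0                       # offset the loader resumes from after a restart
    block = log[0:2]                    # round one: a block built from m1 and m2
    if order == "flush_then_commit":
        db.extend(block)                # flush succeeds...
        # ...but the commit fails, so the committed offset stays at 0
    elif order == "commit_then_flush":
        committed = len(block)          # commit succeeds...
        # ...but the flush fails, so nothing reaches the database
    else:
        raise ValueError(f"unknown order: {order}")
    db.extend(log[committed:])          # recovery round: resume and succeed
    return db
```

Running it on `["m1", "m2", "m3", "m4"]` gives `m1, m2, m1, m2, m3, m4` for the first ordering (duplication) and only `m3, m4` for the second (loss).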
So let’s get to our solution, which requires the block-level deduplication supported by ClickHouse. ClickHouse stores its metadata in ZooKeeper, and each inserted block has its hash value registered in ZooKeeper as a fingerprint. When a new block is inserted, its hash value is checked to avoid block duplication. Two blocks are identical if they have the same size and contain the same rows in the same order. I believe the ClickHouse architecture document describes this block deduplication as a way to prevent duplicates caused by communication failures.
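The deduplication behavior the protocol depends on can be modeled with a toy sink in Python. This is an assumption-laden stand-in: SHA-256 over length-prefixed rows replaces ClickHouse’s own block hash, the set of fingerprints lives in memory rather than in ZooKeeper, and real ClickHouse remembers only a bounded window of recent block hashes (see the `replicated_deduplication_window` MergeTree setting). `DedupSink` and `block_fingerprint` are hypothetical names.

```python
import hashlib
from typing import List, Set


def block_fingerprint(rows: List[str]) -> str:
    """Hash of block content: any change in rows, row count, or order changes it."""
    h = hashlib.sha256()
    for row in rows:
        data = row.encode("utf-8")
        h.update(len(data).to_bytes(8, "little"))  # length prefix keeps rows unambiguous
        h.update(data)
    return h.hexdigest()


class DedupSink:
    """Toy replicated table that silently drops blocks it has already accepted."""

    def __init__(self) -> None:
        self.seen: Set[str] = set()   # ClickHouse registers these in ZooKeeper
        self.rows: List[str] = []

    def insert(self, block: List[str]) -> bool:
        fp = block_fingerprint(block)
        if fp in self.seen:
            return False              # duplicate: discarded without raising an error
        self.seen.add(fp)
        self.rows.extend(block)
        return True
```

Re-inserting `["a", "b"]` is a no-op, while `["b", "a"]` counts as a new block, which is why the aggregator has to rebuild replayed blocks with exactly the same rows in exactly the same order.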
We also rely on Kafka to store the aggregator’s processing state as metadata. In case of a failure, the next Block Aggregator picks up the stored metadata, knows how to resume message processing, and constructs exactly the same blocks for each table. Here, “the next Block Aggregator” is not necessarily the aggregator on the same replica; it can be one on a different replica if Kafka partition rebalancing happens.
The metadata structure is very concise, and it is per partition. The replica ID records which replica the Block Aggregator is associated with. Then, for each table, it records the table name and the beginning and end message offsets of the current block. We also track in memory the minimum of the beginning message offsets and the maximum of the end message offsets across all tables. In the example shown here, the last block for table 1 loaded into ClickHouse has a start offset of 0 and an end offset of 29, and 20 of the messages in that range belong to table 1. For table 2, the last block has a start offset of 5 and an end offset of 20, and 10 messages in that range belong to table 2. In total, from the minimum offset of 0 to the maximum offset of 29, there are 30 messages: 20 for table 1 and 10 for table 2.
There is a special block that we record in the metadata, whose beginning message offset equals its end message offset plus one. This empty block indicates either that the table has no message with an offset less than the beginning offset, or that every such message has already been received and acknowledged by ClickHouse. In the example here, all messages with an offset less than 30 have already been acknowledged by ClickHouse.
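The per-partition metadata described above can be sketched as two small Python dataclasses. The names are hypothetical, and storing a per-table `count` is an assumption inferred from the message counts in the example; the talk names only the replica ID, the table name, and the beginning and end offsets.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class TableRange:
    begin: int  # offset of the first message in the table's last block
    end: int    # offset of the last message in that block (inclusive)
    count: int  # how many messages in [begin, end] belong to this table

    @property
    def is_empty_marker(self) -> bool:
        # begin == end + 1: every message of this table below `begin` is acknowledged
        return self.begin == self.end + 1


@dataclass(frozen=True)
class PartitionMetadata:
    replica_id: str
    tables: Dict[str, TableRange]

    def min_offset(self) -> int:
        """Smallest begin offset across tables; this is the offset committed to Kafka."""
        return min(r.begin for r in self.tables.values())

    def max_offset(self) -> int:
        """Largest end offset across tables; replay covers messages up to here."""
        return max(r.end for r in self.tables.values())


example = PartitionMetadata(
    replica_id="replica-1",
    tables={"table1": TableRange(0, 29, 20), "table2": TableRange(5, 20, 10)},
)
```

For the example, `min_offset()` is 0, `max_offset()` is 29, and the counts add up to the 30 messages in that range. If every table holds an empty marker such as `TableRange(30, 29, 0)`, the minimum is 30 and the maximum is 29, so there is nothing left to replay.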
With this metadata structure, message processing consists of three phases. In the first phase, the aggregator consumes the Kafka messages for each table into local memory, and the metadata is updated as consumption progresses. In the second phase, the metadata is committed to the Kafka metadata store, more specifically the built-in system topic __consumer_offsets. In the third phase, the block for each table is inserted into the backend database. Notice that the message offset committed to Kafka is the minimum of the beginning message offsets of all tables recorded in that metadata instance. Thus, if the Block Aggregator fails, the next Block Aggregator starts consuming messages from this recorded offset. Again, message processing here is per partition.
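The ordering of the three phases can be sketched in Python as follows. This is a simplification for illustration: it seals every table’s block in one round, whereas the real aggregator seals each table’s block on its own size or time threshold. `process_round` is a hypothetical name, and `commit` and `flush` are caller-supplied callbacks standing in for the Kafka metadata commit and the ClickHouse insert.

```python
from typing import Callable, Dict, List, Tuple

Message = Tuple[int, str, str]  # (Kafka offset, table name, serialized row)


def process_round(
    messages: List[Message],
    commit: Callable[[int, Dict[str, Tuple[int, int]]], None],
    flush: Callable[[str, List[str]], None],
) -> None:
    """One round of the three-phase protocol for a single partition."""
    if not messages:
        return
    # Phase 1: consume messages into per-table blocks, tracking (begin, end) offsets.
    blocks: Dict[str, List[str]] = {}
    ranges: Dict[str, Tuple[int, int]] = {}
    for offset, table, row in messages:
        blocks.setdefault(table, []).append(row)
        begin = ranges[table][0] if table in ranges else offset
        ranges[table] = (begin, offset)
    # Phase 2: commit the metadata first; the Kafka offset is the minimum begin offset.
    commit(min(begin for begin, _ in ranges.values()), ranges)
    # Phase 3: only after the commit succeeds, insert one block per table.
    for table, rows in blocks.items():
        flush(table, rows)
```

If `commit` raises, no block is flushed, so a failure can never leave data in ClickHouse that the metadata does not describe.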
22:01 — Replay Mode and the Processing Loop
Jun Li: The aggregator always starts from the previously committed message offset, and it has two mutually exclusive modes. In replay mode, it sends the last blocks to ClickHouse again to avoid data loss. This is safe because if a block has already been accepted, ClickHouse silently discards it without throwing an exception back to the data loader. After replay is finished, the aggregator switches to consume mode, which is the normal operating mode. The switch from replay mode to consume mode depends on whether the offset of the message currently being consumed has moved past the maximum offset in the retrieved metadata.
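How replay mode can rebuild the pre-crash blocks from the committed metadata is sketched below in Python. The function name and types are hypothetical, and the handling of messages outside a table’s committed range is an assumption rather than something the talk specifies: rows older than a table’s last block are treated as already acknowledged, and rows newer than it start that table’s next block.

```python
from typing import Dict, List, Tuple

Message = Tuple[int, str, str]          # (Kafka offset, table name, serialized row)
Ranges = Dict[str, Tuple[int, int]]     # table -> (begin, end) of its last committed block


def split_replay(log: List[Message], meta: Ranges) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
    """Rebuild each table's last pre-crash block from the committed metadata.

    Returns (replay_blocks, carry). Non-empty replay blocks are resent to
    ClickHouse, which drops them if they were already accepted; carry holds
    rows that arrived after a table's last committed block.
    """
    start = min(begin for begin, _ in meta.values())   # committed Kafka offset
    stop = max(end for _, end in meta.values())         # replay mode ends after this
    replay: Dict[str, List[str]] = {table: [] for table in meta}
    carry: Dict[str, List[str]] = {}
    for offset, table, row in log:
        if offset < start or offset > stop:
            continue                    # before the committed offset, or consume mode's job
        if table not in meta or offset > meta[table][1]:
            carry.setdefault(table, []).append(row)      # newer than the last block
        elif offset >= meta[table][0]:
            replay[table].append(row)                    # part of the pre-crash block
        # else: older than the table's last block, so already acknowledged
    return replay, carry
```

Because the rebuilt blocks contain the same rows in the same order as before the crash, they hash identically, and resending them is harmless whether or not the original insert succeeded.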
At a high level, the Kafka connector’s main processing loop consists of an outer loop and an inner loop. At the beginning of the outer loop, we make sure that both Kafka and ClickHouse are healthy and connected to this Kafka connector. In each iteration of the inner loop, we consume a batch of messages from Kafka, feed them to the corresponding partition handlers, and check the buffers. Each partition handler accumulates messages into blocks. In replay mode, when a partition handler has reconstructed the latest block that was intended for ClickHouse before the crash, it sends that block; once all partition handlers have reconstructed and flushed their latest pre-crash blocks, we switch to consume mode. In consume mode, the partition handler flushes blocks based on size or time. For each block that is ready, the partition handler first records its intention in Kafka, that is, commits the metadata to Kafka, and then flushes the block to ClickHouse. If a failure happens at any step, control breaks out of the inner loop, disconnects from Kafka, clears the partition handlers, returns to the outer loop, and starts again from there. Once the connector disconnects from Kafka, partition rebalancing can happen on the broker side, and the partition may be picked up by the connector on another replica.
This may sound abstract, so let me clarify a few points. A partition handler can be dynamically created or destroyed by the Kafka client library linked into the Block Aggregator, and the library always honors the decisions of the remote Kafka broker. Under some failure conditions, one Kafka connector can be assigned two partitions, and thus two partition handlers. It is the partition handler that updates the metadata for its partition, so if two partitions are assigned to a Kafka connector, each commits separately to the Kafka broker. Each partition handler can process multiple tables, because each partition can carry multiple tables.
At any given time, each partition handler can have only one in-flight block per table being inserted into ClickHouse; no new block can be submitted until the current in-flight block is successfully acknowledged by ClickHouse. This is very similar to the write-ahead log in a traditional database: the metadata commit is just one block per table ahead, so it is a write-ahead log of one block. In other words, when a failure happens, at most one block per table needs to be replayed.
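The one-in-flight-block-per-table rule can be expressed as a small gate object, sketched here in Python. `InFlightGate` is a hypothetical helper for illustration, not a class from the Block Aggregator; the point is that whatever is outstanding at crash time is, by construction, at most one block per table.

```python
from typing import Dict, List


class InFlightGate:
    """At most one unacknowledged block per table within one partition handler."""

    def __init__(self) -> None:
        self._in_flight: Dict[str, List[str]] = {}

    def try_submit(self, table: str, block: List[str]) -> bool:
        if table in self._in_flight:
            return False                 # previous block for this table not yet acknowledged
        self._in_flight[table] = block
        return True

    def acknowledge(self, table: str) -> None:
        del self._in_flight[table]       # ClickHouse accepted the block

    def blocks_to_replay(self) -> Dict[str, List[str]]:
        """What a crash right now could leave unacknowledged: one block per table at most."""
        return dict(self._in_flight)
```

A second block for the same table is refused until the first is acknowledged, while blocks for other tables proceed independently.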
Now let’s examine some failure conditions. Suppose block insertion into ClickHouse fails. The processing loop disconnects from the Kafka broker, which triggers partition rebalancing at the broker. As a result, the Kafka connector of a different replica may be assigned the partition, and block insertion continues at that new replica. So rebalancing lets the system resume globally from the last committed state across multiple replicas. The same mechanism applies to other failure conditions, such as a failure to commit metadata to Kafka. Therefore, a Kafka consumer group rebalance is a good indicator of a failure that the Block Aggregator could not recover from by itself.
27:05 — Resiliency Testing and the Runtime Verifier
Jun Li: This shows an example captured from one of the resiliency test runs in our test suite, simulating one data center going down. Two aggregators in two replicas of one shard are shut down, and the traffic is transferred to the remaining two aggregators. The block insertion rate of the two still-active replicas doubles until the two stopped aggregators recover, after which all four return to roughly uniform block insertion rates.
Now let’s go to the next section: the runtime verifier, which serves as the debugger for our Block Aggregator. The metadata structure introduced for deterministic message replay can also help us verify that message replay across multiple replicas is always correct and causes no data loss or duplication. The aggregator commits the metadata to Kafka for each partition before flushing the blocks to ClickHouse, and each metadata commit is appended to the Kafka topic __consumer_offsets. By subscribing to this special system topic, the runtime verifier can learn about all of the blocks flushed, or about to be flushed, to ClickHouse by all the aggregators in the entire cluster.
We capture two kinds of anomalies: the backward anomaly and the overlap anomaly. The tracking is simple and is based on the beginning and end offsets of each table recorded in the metadata. Note that in this table, time progresses upward, and m is a committed metadata instance for a table. In terms of implementation, the verifier has to subscribe to __consumer_offsets and retrieve the metadata instances in sequence. But __consumer_offsets is a partitioned topic, and Kafka does not guarantee ordering across partitions, so the verifier has to order the metadata instances by time itself, using the commit timestamps recorded by the brokers. Therefore, the broker clocks must not deviate much relative to the interval between metadata commits. This is not a problem for the Block Aggregator, because it commits metadata only every several seconds, which is larger than the normal clock skew among brokers.
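A minimal Python sketch of the two checks follows. It assumes the commits have already been decoded from __consumer_offsets into simple records, and the exact anomaly definitions are assumptions: a backward anomaly is a later-committed block for the same partition and table that lies entirely before the previous one, and an overlap anomaly is a block whose range intersects the previous one without being identical. Identical ranges are treated as legitimate replays. `Commit` and `find_anomalies` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Commit:
    timestamp_ms: int   # broker-assigned commit timestamp
    partition: int      # data-topic partition the metadata describes
    table: str
    begin: int          # first offset of the table's block
    end: int            # last offset of the table's block (inclusive)


def find_anomalies(commits: List[Commit]) -> List[Tuple[str, Commit, Commit]]:
    """Compare consecutive committed blocks of each (partition, table) in time order."""
    anomalies: List[Tuple[str, Commit, Commit]] = []
    last: Dict[Tuple[int, str], Commit] = {}
    # __consumer_offsets is partitioned, so restore a global order by timestamp.
    for c in sorted(commits, key=lambda x: x.timestamp_ms):
        key = (c.partition, c.table)
        prev = last.get(key)
        if prev is not None and (c.begin, c.end) != (prev.begin, prev.end):
            if c.end < prev.begin:
                anomalies.append(("backward", prev, c))   # new block lies before the old one
            elif c.begin <= prev.end:
                anomalies.append(("overlap", prev, c))    # ranges intersect but differ
        last[key] = c
    return anomalies
```

Empty marker blocks (begin equal to end plus one) pass both checks, since they start right after the previous block and lie entirely after it.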
30:31 — Implementation Issues and Lessons Learned
Jun Li: Let’s move to the next section, where I’ll share the issues we encountered and the experience we gained from implementing the Block Aggregator and deploying it into production.
First, instead of using the C++ ClickHouse client library, which is one of the sub-repositories of ClickHouse, to develop our Block Aggregator, we chose to compile the entire ClickHouse code base and link the ClickHouse library into the Block Aggregator, so that we can take advantage of the full ClickHouse implementation. With the ClickHouse code base, we get the native TCP protocol between the Block Aggregator and the ClickHouse server, and the Block Aggregator has the same functionality as the ClickHouse client tool. In particular, we can issue SELECT queries and get query results in the Block Aggregator; although the aggregator is designed for insertion only, SELECT queries are useful for testing. The table schema can be retrieved, and the block header that represents the table’s column definitions can be constructed using the library. We also rely on ClickHouse library calls to construct columns from the protocol-buffer-based messages according to the table schema, and on ClickHouse code to evaluate column default expressions and populate them into the block, which is then compressed. Finally, we use the ZooKeeper client library for distributed locking, which I will touch on later in the presentation.
The second topic I’d like to touch on is dynamic schema updates. Schema updates are sporadic. We have a separate schema management system that updates the schema definition on each shard across the entire cluster. Then we restart each aggregator, and during initialization it retrieves the new table schema from the collocated ClickHouse replica. Then we notify the client application through an offline channel, for example email, and some time later the application updates its logic to use the new schema definition. However, the client application can be deployed as many instances across multiple data centers, so there is no way to coordinate all of them to start using a new table schema at the same time. As a result, the Block Aggregator must be able to deserialize a Kafka message into a block whether the message was constructed with the current table schema or the updated one. Our solution is that a table schema update may only add columns, never delete them. As a result, the aggregator always has a superset of the columns needed to deserialize any message.
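The add-only rule can be illustrated with a Python sketch of decoding against a superset schema. Messages are modeled as dicts instead of Protocol Buffers payloads, and constant defaults stand in for the column default expressions that the real aggregator evaluates through the ClickHouse library; `decode_row` and the example columns are hypothetical.

```python
from typing import Any, Dict, List, Tuple

Schema = List[Tuple[str, Any]]  # (column name, default value), in table column order


def decode_row(fields: Dict[str, Any], schema: Schema) -> Tuple[Any, ...]:
    """Map a decoded message row onto the aggregator's (superset) table schema."""
    known = {name for name, _ in schema}
    unknown = sorted(set(fields) - known)
    if unknown:
        # Only possible if a column was deleted or the aggregator has a stale schema.
        raise ValueError(f"columns not in table schema: {unknown}")
    # Columns the producer does not know about yet get their default value.
    return tuple(fields.get(name, default) for name, default in schema)


schema_v1: Schema = [("id", 0), ("name", "")]
schema_v2: Schema = schema_v1 + [("region", "unknown")]   # add-only schema change

old_row = decode_row({"id": 1, "name": "a"}, schema_v2)                  # old producer
new_row = decode_row({"id": 2, "name": "b", "region": "eu"}, schema_v2)  # updated producer
```

Producers still on the old schema and producers already on the new one can coexist, because every field either kind of message can carry is present in the aggregator’s schema.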
ClickHouse relies on ZooKeeper as its metadata store and for replication across replicas. Each block insertion takes about 15 remote calls to the ZooKeeper cluster. Block insertions are performed in parallel, so the pressure on the ZooKeeper cluster is proportional to the number of tables. Our ZooKeeper cluster is deployed across three data centers with a cross-data-center latency of about 20 milliseconds, so its communication overhead is higher than in a deployment model that uses only a single active data center with some followers. In exchange, we get automated failover, and the cluster is much easier to manage. In one of our large ClickHouse deployments, with 2,050 shards, we experienced a large number of ZooKeeper hardware exceptions due to frequent ZooKeeper session expirations. So we may split the cluster to use multiple ZooKeeper clusters, with each ClickHouse sub-cluster of, say, 50 shards paired with its own ZooKeeper cluster. The actual number of shards to allocate to a ZooKeeper cluster depends on the block insertion rate per table and the total number of blocks involved in insertion, so the figure of 50 shards comes from this particular case and does not necessarily apply in general.
I mentioned that there should be distributed locking, so why do we need it? The setting called insert_quorum_parallel was only recently introduced in ClickHouse. In ClickHouse 21.7, the version we have been using for the past year, this setting did not exist, and the implementation enforced that within one shard, only one replica of each table may perform data insertion at any given time. To avoid a high rate of insertion exceptions caused by this restriction, we have to coordinate data insertion across the multiple aggregators using distributed locking. We leveraged the ZooKeeper library in ClickHouse, which provides a distributed locking implementation for us. In a more recent ClickHouse version, for example 21.8, the new setting insert_quorum_parallel is available and its default value is true, so data insertion into multiple replicas in parallel is allowed. But according to analysis published by Altinity, with this setting the current ClickHouse implementation breaks sequential consistency and may have other side effects. As a result, in our internal ClickHouse release, we disable the setting and still enforce distributed locking on the Block Aggregator side.
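The classic ZooKeeper lock recipe gives the lock to whichever participant created the lowest-numbered sequential node under a lock path. The sketch below simulates that recipe in memory, with one lock path per (shard, table) so that only one replica's aggregator inserts into a table of a shard at a time. `FakeZooKeeper`, `InsertLock`, and the path layout are hypothetical. A real client would create ephemeral sequential znodes, which disappear when the session expires, and would watch the predecessor node instead of polling.

```python
import itertools
from typing import Dict, List, Optional, Set


class FakeZooKeeper:
    """In-memory stand-in for a ZooKeeper ensemble (illustration only)."""

    def __init__(self) -> None:
        self._seq = itertools.count()
        self.nodes: Dict[str, Set[str]] = {}

    def create_sequential(self, parent: str, prefix: str) -> str:
        name = f"{prefix}{next(self._seq):010d}"  # zero-padded so names sort numerically
        self.nodes.setdefault(parent, set()).add(name)
        return name

    def children(self, parent: str) -> List[str]:
        return sorted(self.nodes.get(parent, set()))

    def delete(self, parent: str, name: str) -> None:
        self.nodes[parent].discard(name)


class InsertLock:
    """Lowest sequence number under the (shard, table) path holds the lock."""

    def __init__(self, zk: FakeZooKeeper, shard: int, table: str) -> None:
        self.zk = zk
        self.path = f"/block_aggregator/locks/shard{shard}/{table}"  # hypothetical layout
        self.node: Optional[str] = None

    def try_acquire(self) -> bool:
        if self.node is None:
            self.node = self.zk.create_sequential(self.path, "lock-")
        return self.zk.children(self.path)[0] == self.node

    def release(self) -> None:
        if self.node is not None:
            self.zk.delete(self.path, self.node)
            self.node = None


zk = FakeZooKeeper()
replica1 = InsertLock(zk, shard=7, table="events")
replica2 = InsertLock(zk, shard=7, table="events")
print(replica1.try_acquire(), replica2.try_acquire())  # True False
replica1.release()
print(replica2.try_acquire())  # True
```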
36:30 — Testing, the Quorum Error, and Alerting
Jun Li: To bring the Block Aggregator to production quality, besides traditional unit testing, we have Block Aggregator-specific test suites. The first one is the resiliency test suite, which runs on an actual cluster with 32 replicas. Following the chaos monkey approach, we kill processes and containers across ZooKeeper, ClickHouse, and the Block Aggregator. We also kill them in batches to simulate a whole data center going down, and for each test case, we verify whether data loading recovers and continues.
Complementary to this chaos-monkey-based approach, we also have a second, small-scale integration test suite that runs on a single machine but includes all the different processes: ZooKeeper, ClickHouse, and the Block Aggregator. A controller stops and starts each process, and we insert into a small table in this small cluster. Furthermore, fault-injection points are built into the aggregator source code and get turned on when the test is running. For example, we can deliberately make the aggregator stop accepting Kafka messages for 10 seconds, and then for each test case we verify whether data loss or data duplication actually happened. With the first suite, the resiliency testing, this kind of data loss and data duplication checking is not easy to do.
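Two pieces make this style of test work: switches that turn injected faults on and off, and a checker that compares what was produced to Kafka with what landed in ClickHouse. The sketch below assumes every message carries a unique integer ID that ends up in the stored row; `FaultPoints`, `accept_messages`, `check_exactly_once`, and the fault name `reject_kafka_messages` are all hypothetical.

```python
from collections import Counter
from typing import Iterable, List, Set, Tuple


class FaultPoints:
    """Named fault-injection switches built into the code, off unless a test enables them."""

    def __init__(self) -> None:
        self._on: Set[str] = set()

    def enable(self, name: str) -> None:
        self._on.add(name)

    def disable(self, name: str) -> None:
        self._on.discard(name)

    def active(self, name: str) -> bool:
        return name in self._on


def accept_messages(batch: List[int], faults: FaultPoints, buffer: List[int]) -> bool:
    """Hypothetical consume step guarded by a fault point."""
    if faults.active("reject_kafka_messages"):
        return False  # simulate the aggregator refusing Kafka messages for a while
    buffer.extend(batch)
    return True


def check_exactly_once(produced: Iterable[int], stored: Iterable[int]) -> Tuple[List[int], List[int]]:
    """Return (lost, duplicated) message IDs by comparing Kafka input with ClickHouse rows."""
    counts = Counter(stored)
    lost = sorted(set(produced) - set(counts))
    duplicated = sorted(i for i, n in counts.items() if n > 1)
    return lost, duplicated


print(check_exactly_once([1, 2, 3, 4, 5], [1, 2, 2, 4, 5]))  # ([3], [2])
```

A test passes only when both lists come back empty after the injected fault has been switched off and the aggregator has caught up.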
There’s one challenge we have been facing since our system went into production, caused by the insert quorum. As I mentioned earlier, insert quorum equal to two is our default setting, to guarantee high data reliability, but this setting not only introduces higher latency, which we can tolerate, but also introduces an exception: error code 286. The typical error message, shown here on the screen, says that the quorum for the previous write has not been satisfied yet: only one replica has confirmed the write, and another replica is needed to satisfy insert quorum equal to two. Because the quorum cannot be satisfied, and because all the replicas consume the same topic stream, all of them get stuck. So if this exception happens, you will see that for about one hour the whole shard makes no progress, or only small progress.
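One consequence of deterministic blocks is that an aggregator hitting error 286 can keep retrying the very same block without risking duplicates, since ClickHouse deduplication discards a repeat that already landed. The retry loop below is a hypothetical sketch, not the talk's implementation: `ClickHouseError`, `insert_with_retry`, and the backoff parameters are invented for illustration. Retries alone do not unstick a shard; the underlying replication-queue problem still has to be cleared.

```python
import time
from typing import Any, Callable

UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE = 286  # the ClickHouse error code discussed in the talk


class ClickHouseError(Exception):
    def __init__(self, code: int, message: str) -> None:
        super().__init__(message)
        self.code = code


def insert_with_retry(insert: Callable[[Any], Any], block: Any, max_attempts: int = 5,
                      base_delay: float = 0.5, sleep: Callable[[float], None] = time.sleep) -> Any:
    """Retry the *same* block on error 286 with exponential backoff.

    Re-sending an identical block is safe because ClickHouse block
    deduplication drops it if an earlier attempt was actually stored.
    Any other error, or running out of attempts, is re-raised.
    """
    for attempt in range(max_attempts):
        try:
            return insert(block)
        except ClickHouseError as e:
            if e.code != UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE or attempt == max_attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)
    return None  # only reached when max_attempts <= 0
```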
To address this quorum issue, we developed an in-house tool that scans the ZooKeeper subtree associated with the log replication queue and inspects which commands in the queue cannot be completed. Once we understand why those queue commands cannot be cleared and resolve the cause, the quorum is satisfied automatically, and data insertion then resumes in the shard. To be alerted when this failure happens, we have defined alerts, for example when a shard has had no block insertions for a long period of time, or when block insertions show a non-zero failure rate with error code 286. We also check the replication queue: we expose the size of the system.replication_queue table as a metric, so that we can write an alert rule stating that no replica may have a replication queue larger than some threshold, for example 50.
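The three alert conditions can be expressed as a small rule evaluator. In this sketch the queue-size threshold of 50 comes from the talk, while the 600-second stall threshold and the `ShardMetrics` fields are hypothetical. The queue size per replica could be collected with a query such as `SELECT count() FROM system.replication_queue`.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ShardMetrics:
    shard: int
    seconds_since_last_insert: float
    insert_failures_286: int          # failed block insertions with error code 286
    max_replication_queue_size: int   # largest system.replication_queue size among the shard's replicas


def alerts(m: ShardMetrics, stall_after_s: float = 600, queue_limit: int = 50) -> List[str]:
    """Return the names of the alert rules this shard currently violates."""
    fired = []
    if m.seconds_since_last_insert > stall_after_s:
        fired.append("no block insertions")
    if m.insert_failures_286 > 0:
        fired.append("error 286 failures")
    if m.max_replication_queue_size > queue_limit:
        fired.append("replication queue too large")
    return fired


print(alerts(ShardMetrics(12, 5.0, 0, 3)))       # []
print(alerts(ShardMetrics(13, 3600.0, 40, 75)))  # all three rules fire
```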
40:45 — Production Deployment and Performance
Jun Li: Let’s move to the final section, the deployment of our Block Aggregator. This is an example of a cluster we deployed in the eBay cloud environment; it has been in production since last November. We have two Kafka clusters in two data centers. The ClickHouse cluster has 2,050 shards spanning the two data centers; each shard has four replicas, two in each data center. Each aggregator is collocated with one replica.
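Placing two of each shard’s four replicas in each data center is what lets an insert quorum of two survive the loss of a whole data center, as confirmed later in the Q&A. The sketch below checks that arithmetic; the data center labels and the function name are hypothetical.

```python
SHARDS = 2050
REPLICAS_PER_SHARD = 4
INSERT_QUORUM = 2
# Hypothetical labels: two replicas of every shard in each of two data centers.
REPLICA_DCS = ["dc-a", "dc-a", "dc-b", "dc-b"]


def quorum_reachable(replica_dcs, down_dc, quorum=INSERT_QUORUM):
    """Can an insert still collect `quorum` replica confirmations with `down_dc` offline?"""
    alive = sum(1 for dc in replica_dcs if dc != down_dc)
    return alive >= quorum


total_aggregators = SHARDS * REPLICAS_PER_SHARD  # one collocated Block Aggregator per replica
print(total_aggregators)  # 8200
print(quorum_reachable(REPLICA_DCS, "dc-a"), quorum_reachable(REPLICA_DCS, "dc-b"))  # True True
```

With only one replica per data center, losing either data center would leave a single replica and the quorum of two could not be met.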
The performance numbers were collected from this example cluster. At peak, it consumed about 280,000 messages per second, or about 22 megabytes per second, across the entire 2,050-shard cluster. The chart shows block insertion time, in seconds, together with block size. Two tables have relatively large blocks, larger than 100 kilobytes per block, but table 2 has much smaller blocks because its traffic is very small compared to the other two tables. The number of rows per block shows this: table 2 has only about two rows per block, while the other two tables have about a thousand rows per block. We also measure the end-to-end message consumption lag, defined as the duration from the time a message is stored in the Kafka cluster to the time its corresponding table row is stored in ClickHouse, replicated to at least two replicas, and ready to be queried. In this example, the end-to-end message consumption lag is typically lower than 30 seconds.
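Dividing the peak figures across the cluster gives a feel for per-component load. This is simple averaging under two assumptions: "22 megabytes" is read as 22 MB (10^6 bytes) per second, and traffic is spread evenly, which the per-table numbers above show it is not. Because each aggregator consumes different messages, the per-aggregator figure is a share of unique messages. The lag helper restates the talk’s definition; its name is hypothetical.

```python
PEAK_MSGS_PER_S = 280_000
PEAK_BYTES_PER_S = 22_000_000   # assumption: "22 megabytes" read as 22 MB per second
SHARDS = 2050
AGGREGATORS = SHARDS * 4        # one aggregator per replica, four replicas per shard

per_shard = PEAK_MSGS_PER_S / SHARDS              # average messages/s per shard
per_aggregator = PEAK_MSGS_PER_S / AGGREGATORS    # average messages/s per aggregator
avg_msg_bytes = PEAK_BYTES_PER_S / PEAK_MSGS_PER_S


def end_to_end_lag_s(kafka_append_ts: float, queryable_ts: float) -> float:
    """Lag as defined in the talk: from storage in Kafka until the row is
    replicated to at least two replicas and ready to be queried."""
    return queryable_ts - kafka_append_ts


print(round(per_shard, 1), round(per_aggregator, 1), round(avg_msg_bytes, 1))  # 136.6 34.1 78.6
```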
This shows the block insertion rate in this 2,050-shard cluster. Over the 24-hour window, it is very stable and uniform across all the shards most of the time. This is the lag chart I mentioned. Most of the time the lag is less than 25 or 30 seconds, but at one point it shot up to about 50 seconds. That could be due to some communication latency, a transient failure, or the quorum not being satisfied, but in any case it recovered automatically without human intervention. There’s also rebalancing. As I mentioned, the rebalancing rate is a good indicator of a severe error in the cluster that an individual Block Aggregator cannot resolve on its own, which therefore triggers rebalancing. Over the entire 24-hour window there was no rebalancing at all across the 2,050 shards, which is actually about 200 streams, or topics, per data center’s Kafka cluster, so about 500 streams in total. I also mentioned that we use a multi-ZooKeeper-cluster configuration to address the high pressure coming from the large volume of insertions. You can see that most of the time it is zero, and occasionally it shoots up to 60 milliseconds, but even 60 milliseconds is very small compared to the earlier situation, where one single ZooKeeper cluster handled all the traffic.
To summarize the presentation: using a streaming platform like Kafka is a standard way to transfer data across data processing systems. For a database such as ClickHouse, block loading is more efficient than loading individual records. Under failure conditions, replaying Kafka messages may cause data loss or data duplication, so we developed a solution that deterministically produces identical blocks under various failure conditions, so that the backend ClickHouse can detect and remove duplicated blocks. The same solution allows us to verify that blocks are always produced correctly under failure conditions, and it has been deployed in production. That concludes my presentation. Thank you.
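The core idea can be shown in a few lines: if a block is defined by recorded metadata (which partition, which offset range), rebuilding it after a crash yields the same rows in the same order, so a content hash matches and the store can discard the replay. This is a simplified sketch, not eBay’s protocol: `BlockMetadata` is a hypothetical stand-in for the per-partition metadata, SHA-256 over `repr` of the rows is swapped in for ClickHouse’s own block hashing, and the hash set here never evicts entries.

```python
import hashlib
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class BlockMetadata:
    """Hypothetical per-partition record of which offsets formed a block."""
    table: str
    partition: int
    begin_offset: int
    end_offset: int  # inclusive


def build_block(log: Dict[int, List[str]], meta: BlockMetadata) -> List[str]:
    """Rebuild a block from exactly the recorded offset range, in offset order."""
    return [log[meta.partition][o] for o in range(meta.begin_offset, meta.end_offset + 1)]


def block_hash(rows: List[str]) -> str:
    h = hashlib.sha256()
    for row in rows:
        h.update(repr(row).encode())
        h.update(b"\0")  # row separator so ["ab"] and ["a", "b"] hash differently
    return h.hexdigest()


class Table:
    """Toy table that drops any block whose hash it has already seen."""

    def __init__(self) -> None:
        self.rows: List[str] = []
        self.seen = set()

    def insert(self, rows: List[str]) -> bool:
        digest = block_hash(rows)
        if digest in self.seen:
            return False  # duplicate block: silently discarded
        self.seen.add(digest)
        self.rows.extend(rows)
        return True


log = {0: ["a", "b", "c", "d"]}               # partition 0 of a Kafka topic
meta = BlockMetadata("events", 0, 1, 2)       # block = offsets 1..2
table = Table()
print(table.insert(build_block(log, meta)))   # True: first insert
print(table.insert(build_block(log, meta)))   # False: replay after a crash is dropped
print(table.rows)                              # ['b', 'c']
```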
45:21 — Q&A
Robert Hodges: Thank you, Jun, that was great. We have several questions, which I’d like to queue up. There was a question from Jared: which client supports writes to multiple tables in a single write? I’m currently using HTTP with RowBinary, so I don’t think I can write to multiple tables at once.
Jun Li: Good question. We don’t have this problem, because we do not use any existing C++ client library, including the ClickHouse C++ client library. We compile and link the entire ClickHouse code base into our Block Aggregator, so we can do whatever ClickHouse is able to do. When messages arrive, we control message consumption, and therefore we can have multiple tables in a single stream.
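Because the aggregator controls consumption itself, one Kafka stream can feed several tables by routing each message to a per-table buffer and emitting a block once a buffer is full. A minimal sketch, assuming each message already carries its target table name and using a hypothetical row-count threshold `max_rows`; the function name is illustrative.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


def route(messages: Iterable[Tuple[str, Any]], max_rows: int) -> Tuple[List[Tuple[str, List[Any]]], Dict[str, List[Any]]]:
    """Split one stream into per-table blocks, preserving message order within each table.

    Returns (ready_blocks, pending): full blocks in the order they filled up,
    plus the partially filled buffers still waiting for more rows.
    """
    pending: Dict[str, List[Any]] = defaultdict(list)
    ready: List[Tuple[str, List[Any]]] = []
    for table, row in messages:
        pending[table].append(row)
        if len(pending[table]) == max_rows:
            ready.append((table, pending.pop(table)))
    return ready, dict(pending)


stream = [("t1", 1), ("t2", 2), ("t1", 3), ("t1", 4)]
print(route(stream, max_rows=2))  # ([('t1', [1, 3])], {'t2': [2], 't1': [4]})
```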
Robert Hodges: Jared, to his credit, had figured that out fairly quickly when this slide popped up, so good one, Jared. I thought it was an interesting question worth expanding on. There’s a question from Nikolai, and I’m reading out of the chat, please feel free to put them in the Q&A box. From Nikolai, you had a number of 280,000 messages per second. Could you clarify whether that’s per replica, per shard, or per cluster?
Jun Li: Quick question, let me see. As I mentioned, these two Kafka clusters hold different data; they are not mirrors. A client always goes to the Kafka cluster closer to it, and only when a failure happens does it go to the remote one. So there’s no duplicate data: the same data is not stored in both Kafka clusters. Therefore, although each shard has four aggregators associated with its four replicas, they pick up different messages. So when I sum up to 280,000 messages per second, that is the number of unique messages consumed by all the aggregators. There are no duplicated messages in that count at all, because we are not measuring from ClickHouse.
Robert Hodges: So overall, is it correct to understand that 280K per second is an overall message processing rate?
Jun Li: Yes. It’s comparable to what you’d measure if you deployed a data loader, for example one picked up from open source: that would be your data loading rate.
Robert Hodges: There was a follow-on question, also from Nikolai. Was the system stressed with higher rates of data, or is this just your normal insertion rate?
Jun Li: We tested that. This one is in production. At one point the customer needed to double the number of tables because they had to migrate tables, so their load went up to about double. But then we started to see ZooKeeper exceptions. When we doubled, the exception rate went up, although it was still tolerable. But doubling the message rate looks like as far as we want to go, because of these exceptions. That means we reached the limit that can be processed by that ZooKeeper sub-cluster, even though it serves only a 50-shard cluster. If we go higher than that rate, ZooKeeper still becomes a bottleneck. Certainly, our ZooKeeper spans three data centers, rather than the setup Altinity recommends, a single data center with a follower as standby. We don’t do that, so that may be one reason we can only scale this much. But hopefully a coming release will have a new implementation inside ClickHouse, called the coordination module, which replaces the existing ZooKeeper with a Raft-based C++ implementation. Because it’s per-shard, I believe this problem will go away.
Robert Hodges: I have a question for you, if you don’t mind me jumping in. You show a single cluster in all of your slides. Do you actually have a second cluster reading from the Kafka queues in another data center?
Jun Li: Why do I need a second one? There are two data centers and two Kafka clusters, and we retain data for some time, I think two or three days. A data center going down is rare, and an outage usually lasts maybe several hours, so we feel two data centers are always enough. What’s your concern?
Robert Hodges: What you have here is a great picture. You have the Kafka clusters, which means you can have transient failures in either data center, but people can always find a Kafka cluster they can produce into. But in your architecture you have a single ClickHouse cluster in one data center, is that correct?
Jun Li: No, no. I have two. I have two, I mean, I have DC1, DC2, and DC3 in this diagram.
Robert Hodges: Excellent. Okay, so you’ve got the replicas shared across data centers, got it. So you basically can tolerate a failure in one data center, and because you set insert quorum to two, you can continue to operate with one data center down. Excellent, thank you.
Robert Hodges: So we have lots more questions, so steel yourself. Here’s one that came up twice already. Do you plan to release the Block Aggregator to open source?
Jun Li: That’s one of the reasons I wanted to have this seminar, to share with you my high-level design and get your feedback. Robert, I think we need to have another discussion with you internally to see how to move forward. It’s my desire to open source it, and I believe it’s eBay’s spirit to open source it, because look at the community, and it’s our design and our spirit to share.
Robert Hodges: I love that answer, thank you so much. I can see somebody already celebrating. Here’s another one, from Yu Xing. ClickHouse has a Kafka engine, so how does that compare to the Block Aggregator, and what problems do you see with the ClickHouse built-in Kafka engine?
Jun Li: Maybe in my presentation I didn’t emphasize the uniqueness enough. We claim that there is no data loss and no data duplication when loading data from Kafka to ClickHouse, based on the protocol we defined. The multi-table support may not be the most significant difference, and the multi-data-center deployment maybe you can also do. But because we developed this Block Aggregator ourselves from scratch, leveraging a lot of the ClickHouse code, one thing I would mention is that it has over a hundred metrics: throughput, failures, everything. Frankly, in the entire system we deploy, there are already about 300 metrics from ClickHouse, and I added maybe 150 more, so together over 500 metrics, with dashboards at different aggregation levels to visualize the behavior of the system. I emphasize that this is exactly-once; I think the Kafka engine in ClickHouse guarantees at-least-once. And for real production code, you really need enough monitoring, but I believe what the engine exposes today is very minimal. For example, I can calculate the lag time by retrieving the message lag exposed by the Kafka broker and dividing it by the message processing rate of the aggregator. This is the most important metric on the dashboard we use, because it reflects everything that happens in the aggregator, in Kafka, and in ClickHouse. If something goes wrong, this number goes up to something like 30 minutes and an alert fires.
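The lag-time estimate described here is a division: messages not yet consumed, summed over partitions, divided by the aggregator’s current processing rate. A minimal sketch, assuming the backlog is measured as each partition’s log end offset minus the committed offset; the function name, the example offsets, and the 30-minute threshold used in the usage lines are illustrative.

```python
from typing import Dict


def lag_seconds(log_end_offsets: Dict[int, int], committed_offsets: Dict[int, int],
                msgs_per_second: float) -> float:
    """Estimate how far behind consumption is, in seconds.

    backlog = sum over partitions of (log end offset - committed offset);
    lag     = backlog / current processing rate.
    """
    backlog = sum(max(0, end - committed_offsets[p]) for p, end in log_end_offsets.items())
    if msgs_per_second <= 0:
        return float("inf") if backlog else 0.0  # stalled consumer with pending work
    return backlog / msgs_per_second


ALERT_AFTER_S = 30 * 60  # e.g. fire when the estimate reaches 30 minutes
lag = lag_seconds({0: 1000, 1: 500}, {0: 900, 1: 500}, msgs_per_second=50.0)
print(lag, lag >= ALERT_AFTER_S)  # 2.0 False
```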
Robert Hodges: I think that’s a great point you’ve made. I have a couple more questions, because we’re coming up on the top of the hour and I want to make sure we get these. Another question from Yu Xing: what happens when the aggregator crashes during replay mode?
Jun Li: Because replay mode does not commit, it finishes the replay and then moves on. So if the aggregator crashes during replay, the next one still picks up from the committed offset. So a crash does not matter for replay mode.
Robert Hodges: I have one final question from my side. It seems to me there might be a corner case where you’re dependent on ClickHouse deduplication to throw away blocks when you’re replaying, and that’s a key part, almost like a sliding window protocol. Are there corner cases? ClickHouse, as I recall, by default stores a hundred hashes. Is there a corner case where, because of a partition in Kafka somehow being unavailable for a long period of time, you could lose a block, because you’d read more blocks from other partitions, load them into the table, and then forget a block that was loaded partially from a partition a long time ago?
Jun Li: Well, you are the better expert in ClickHouse; I have not delved into this part of the code. Based on my understanding, there’s a retention time of up to seven days for keeping the hashes, and I believe that’s a parameter. If the retention time is seven days, that would be enough. And even beyond seven days, because the Kafka retention time is three days, you would never be able to replay anyway.
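Robert’s count-based window (about a hundred hashes) and Jun’s time-based retention (about seven days) are two different limits, and a block hash can fall out of whichever one is reached first. The toy window below keeps both limits to make the corner case concrete. It is loosely modeled on the MergeTree settings `replicated_deduplication_window` and `replicated_deduplication_window_seconds`; the class name and eviction logic are illustrative assumptions, and the exact behavior and defaults of a given ClickHouse version should be checked against its documentation.

```python
from collections import OrderedDict


class DedupWindow:
    """Recent block hashes bounded by count and by age (illustrative model)."""

    def __init__(self, max_blocks: int = 100, max_age_s: float = 7 * 24 * 3600) -> None:
        self.max_blocks = max_blocks
        self.max_age_s = max_age_s
        self._hashes: "OrderedDict[str, float]" = OrderedDict()  # hash -> insert time, oldest first

    def _evict(self, now: float) -> None:
        while self._hashes and (len(self._hashes) > self.max_blocks
                                or now - next(iter(self._hashes.values())) > self.max_age_s):
            self._hashes.popitem(last=False)

    def accept(self, block_hash: str, now: float) -> bool:
        """True if the block is treated as new (and recorded); False if it is a known duplicate."""
        self._evict(now)
        if block_hash in self._hashes:
            return False
        self._hashes[block_hash] = now
        self._evict(now)
        return True


w = DedupWindow(max_blocks=3)
print(w.accept("A", 0), w.accept("A", 1))  # True False: replay caught
for t, h in enumerate(["B", "C", "D"], start=2):
    w.accept(h, t)                          # three newer blocks push "A" out of the window
print(w.accept("A", 5))                     # True: a late replay of "A" is no longer detected
```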
Robert Hodges: Great, okay, I’ll double-check that, because this used to be the behavior, but you’re using a more modern version of ClickHouse. So we are at the top of the hour, actually a minute over. Jun, I want to thank you for this really awesome talk. I’m going to make up an award for you. I think you get the Jim Gray memorial transaction processing award. If you’re familiar with Jim Gray, this is the best transaction processing talk we’ve ever had in this series. Thank you so much, and on behalf of our audience, thank you, and thank the audience for these really great questions.
Jun Li: Okay, thank you very much.
Robert Hodges: Once again, this is recorded, you’ll get the link and the slides. If you want to follow up, you can send us email at info@altinity.com, and we will forward questions to Jun Li. Jun, thanks again, and have a great day.
Jun Li: You too. Bye.
FAQ
What is the Block Aggregator and what problem does it solve? The Block Aggregator is a data loader eBay built to ingest data from Kafka into ClickHouse. ClickHouse loads data far more efficiently in large blocks than row by row, so the aggregator batches Kafka messages into blocks before inserting them. Its key contribution is exactly-once delivery: under failures, a naive loader would either lose or duplicate data, and Kafka’s transaction mechanism cannot be used because ClickHouse does not follow Kafka’s commit protocol.
How does the Block Aggregator achieve exactly-once delivery? It makes retries deterministic. The aggregator records per-partition metadata in a Kafka system topic that tracks exactly which messages form each block, so after a failure the next aggregator can rebuild byte-for-byte identical blocks. ClickHouse block deduplication then recognizes any block it has already stored, by comparing the block’s size, rows, and row order, and silently discards the duplicate. The result is no data loss and no duplication.
How is the system deployed across data centers? There are two independent Kafka clusters in two data centers, holding different (not mirrored) data, and a ClickHouse cluster whose shards have replicas spread across the data centers. A Block Aggregator is collocated with each ClickHouse replica in the same Kubernetes pod and subscribes to both Kafka clusters. With insert quorum set to two, the system keeps ingesting even when an entire data center, including half the replicas, goes down, and ClickHouse replication resynchronizes when the data center recovers.
Why does eBay compile the whole ClickHouse codebase into the aggregator? Rather than using a standalone C++ client library, eBay links the full ClickHouse library so the aggregator has the same capabilities as the ClickHouse client. This gives it the native TCP protocol, the ability to retrieve table schemas and build block headers, construction of columns from protocol-buffer messages, evaluation of default expressions, block compression, and a ZooKeeper-based distributed locking implementation.
How does the Block Aggregator compare to ClickHouse’s built-in Kafka engine? The built-in Kafka engine provides at-least-once delivery, whereas the Block Aggregator is designed for exactly-once delivery with no loss or duplication. The aggregator also adds extensive observability, with over 100 of its own metrics on top of ClickHouse’s, feeding dashboards at multiple aggregation levels, including an end-to-end lag metric. It was also purpose-built for multi-table streams and multi-data-center deployments.
What were the main production challenges? ZooKeeper pressure was significant: each block insert makes many ZooKeeper calls, and pressure scales with the number of tables, so eBay split shards across multiple ZooKeeper clusters. They also enforce distributed locking so only one replica inserts per shard at a time, avoiding consistency issues from parallel quorum inserts. And insert quorum of two can produce error 286 when a quorum is not yet satisfied, which can stall a shard until an in-house tool inspects and clears the replication queue.
© 2026 Altinity, Inc. All rights reserved. Altinity®, Altinity.Cloud®, and Altinity Stable® are registered trademarks of Altinity, Inc. ClickHouse® is a registered trademark of ClickHouse, Inc. Altinity is not affiliated with or associated with ClickHouse, Inc. Kubernetes, MySQL, and PostgreSQL are trademarks and property of their respective owners.