Handling Real-Time Updates in ClickHouse

Mutable data is generally unwelcome in OLAP databases. ClickHouse is no exception to the rule. Like some other OLAP products, ClickHouse did not even support updates originally. Later on, updates were added, but like many other things they were added in a “ClickHouse way.”

Even now, ClickHouse updates are asynchronous, which makes them difficult to use in interactive applications. Still, in many use cases users need to apply modifications to existing data and expect to see the effect immediately. Can ClickHouse do that? Sure it can.

781-2.png

A Short History of ClickHouse Updates

Back in 2016, the ClickHouse team published an article titled “How To Update Data in ClickHouse”. ClickHouse did not support data modifications at that time. Only special insert  structures could be used in order to emulate updates, and data had to be dropped by partitions.

Under the pressure of GDPR requirements ClickHouse team delivered UPDATEs and DELETEs in 2018. The follow-up article Updates and Deletes in ClickHouse is still one of the most read articles in the Altinity blog. Those asynchronous, non-atomic updates are implemented as ALTER TABLE UPDATE statements, and can potentially shuffle a lot of data. This is useful for bulk operations and infrequent updates, when immediate results are not needed. “Normal” SQL updates are still missing in ClickHouse, though they reliably appear in the roadmap every year. If real-time update behavior is required, we have to use a different approach. Let’s consider a practical use case and compare different ways of doing it in ClickHouse.

Use Case

Consider a system that generates various kinds of alerts. Users or machine learning algorithms query the database from time to time to review new alerts and acknowledge them. Acknowledgement operations need to modify the alert record in the database. Once acknowledged, alerts should disappear from the users’ views. This looks like an OLTP operation that is alien to ClickHouse.

Since we cannot use updates, we will have to insert a modified record instead. Once two records are in the database, we need an efficient way to get the latest one. For that we will try 3 different approaches:

  • ReplacingMergeTree

  • Aggregate functions

  • AggregatingMergeTree

ReplacingMergeTree

Let’s start with creating a table that stores alerts.

CREATE TABLE alerts(
  tenant_id     UInt32,
  alert_id      String,
  timestamp     DateTime Codec(Delta, LZ4),
  alert_data    String,
  acked         UInt8 DEFAULT 0,
  ack_time      DateTime DEFAULT toDateTime(0),
  ack_user      LowCardinality(String) DEFAULT ''
)
ENGINE = ReplacingMergeTree(ack_time)
PARTITION BY tuple()
ORDER BY (tenant_id, timestamp, alert_id);

For simplicity, all alert specific columns are packaged into a generic ‘alert_data’ column. But you can imagine that alert may contain dozens or even hundreds of columns. Also, ‘alert_id’ is a random string in our example.

Note the ReplacingMergeTree engine. ReplacingMergeTee is a special table engine that replaces data by primary key (ORDER BY) — the newer version of the row with the same key value will replace the older one. ‘Newness’ is determined by a column, ‘ack_time’ in our case. The replacement is performed during background merge operation. It does not happen immediately and there is no guarantee it happens at all, so consistency of the query results is a concern. ClickHouse has a special syntax to work with such tables, though, and we will be using it in the queries below.

Before we run queries, let’s fill the table with some data. We generate 10M alerts for 1000 tenants:

INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data)
SELECT
  toUInt32(rand(1)%1000+1) AS tenant_id,
  randomPrintableASCII(64) as alert_id,
  toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp,
  randomPrintableASCII(1024) as alert_data
FROM numbers(10000000);

Next, let’s acknowledge 99% of alerts, providing new values for ‘acked’, ‘ack_user’ and ‘ack_time’ columns. Instead of an update, we just insert a new row.

INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time)
SELECT tenant_id, alert_id, timestamp, alert_data, 
  1 as acked, 
  concat('user', toString(rand()%1000)) as ack_user,       now() as ack_time
FROM alerts WHERE cityHash64(alert_id) % 99 != 0;

If we query this table now, we will see something like:

SELECT count() FROM alerts

┌──count()─┐
│ 19898060 │
└──────────┘

1 rows in set. Elapsed: 0.008 sec. 

So we definitely have both acknowledged and non-acknowledged rows in the table. So replacing does not happen yet. In order to see the ‘real’ data, we have to add a FINAL keyword.

SELECT count() FROM alerts FINAL

┌──count()─┐
│ 10000000 │
└──────────┘

1 rows in set. Elapsed: 3.693 sec. Processed 19.90 million rows, 1.71 GB (5.39 million rows/s., 463.39 MB/s.) 

The count is correct now, but look at the query time! With FINAL, ClickHouse has to scan all rows and merge them by primary key in query time. That produces the correct answer but with a lot of overhead. Let’s see if we can do better by filtering only rows that have not been acknowledged.

SELECT count() FROM alerts FINAL WHERE NOT acked

┌─count()─┐
│  101940 │
└─────────┘

1 rows in set. Elapsed: 3.570 sec. Processed 19.07 million rows, 1.64 GB (5.34 million rows/s., 459.38 MB/s.) 

The query time and amount of data processed is the same, even though the count is much smaller. Filtering does not help to speed up the query. As the table size grows, the cost may be even more substantial. It does not scale.

Note: for the sake of readability, all queries and query times are presented as if they are running in ‘clickhouse-client’. In fact, we tried queries multiple times in order to make sure results are consistent and confirm it with the ‘clickhouse-benchmark’ utility as well.

Ok, querying the entire table is not that helpful. Can we still use ReplacingMergeTree for our use case? Let’s pick a random tenant_id, and select all records that were not acknowledged yet — imagine there is a dashboard that the user is looking into. I like Ray Bradbury, so I picked 451. Since ‘alert_data’ is just random garbage, we will calculate a checksum, and will use it to confirm that the results are the same across multiple approaches:

SELECT 
  count(), 
  sum(cityHash64(*)) AS data
FROM alerts FINAL
WHERE (tenant_id = 451) AND (NOT acked)

┌─count()─┬─────────────────data─┐
│      90 │ 18441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.278 sec. Processed 106.50 thousand rows, 119.52 MB (383.45 thousand rows/s., 430.33 MB/s.)

That was pretty fast! In 278 ms we could query all non-acknowledged data. Why is it fast this time? The difference is in the filter condition. ‘tenant_id’ is a part of a primary key, so ClickHouse can filter data before FINAL. In this case, ReplacingMergeTree becomes efficient.

Let’s try a user filter as well and query the number of alerts acknowledged by a particular user. The cardinality of the column is the same — we have 1000 users and can try user451.

SELECT count() FROM alerts FINAL
WHERE (ack_user = 'user451') AND acked

┌─count()─┐
│    9725 │
└─────────┘

1 rows in set. Elapsed: 4.778 sec. Processed 19.04 million rows, 1.69 GB (3.98 million rows/s., 353.21 MB/s.)

That is very slow now because the index is not used. ClickHouse scanned all 19.04 million rows. Note that we cannot add ‘ack_user’ to the index, since it will break ReplacingMergeTree semantics. We can do a trick with PREWHERE, though:

SELECT count() FROM alerts FINAL
PREWHERE (ack_user = 'user451') AND acked

┌─count()─┐
│    9725 │
└─────────┘

1 rows in set. Elapsed: 0.639 sec. Processed 19.04 million rows, 942.40 MB (29.80 million rows/s., 1.48 GB/s.)

PREWHERE is a special hint for ClickHouse to apply a filter differently. Usually ClickHouse is smart enough to move conditions to PREWHERE automatically, so a user should not care. It did not happen this time, so it’s good we’ve checked.

Aggregate Functions

ClickHouse is known for supporting a wide variety of aggregate functions. In the latest versions it has got more than 100. Combined with 9 aggregate function combinators (see https://clickhouse.tech/docs/en/query_language/agg_functions/combinators/), this gives enormous flexibility to an experienced user. For this use case, we do not need anything advanced, and will be using only 3 functions: ‘argMax’, ‘max’ and ‘any’.

The same query for the 451st tenant can be executed with an ‘argMax’ aggregate function as follows:

SELECT count(), sum(cityHash64(*)) data FROM (
  SELECT tenant_id, alert_id, timestamp, 
         argMax(alert_data, ack_time) alert_data, 
         argMax(acked, ack_time) acked,
         max(ack_time) ack_time_,
         argMax(ack_user, ack_time) ack_user
  FROM alerts 
  GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│      90 │ 18441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.059 sec. Processed 73.73 thousand rows, 82.74 MB (1.25 million rows/s., 1.40 GB/s.)

Same result, same number of rows, but 4 times better performance! This is ClickHouse aggregation efficiency. The downside is that the query becomes more complex. But we can make it simpler.

Let’s note, that when acknowledging an alert, we only update 3 columns:

  • acked: 0 => 1

  • ack_time: 0 => now()

  • ack_user: ‘’ => ‘user1’

In all 3 cases, the column value increases! So instead of the somewhat bulky ‘argMax’ we can use ‘max.’ Since we do not change ‘alert_data,’ we do not need any actual aggregation on this column. ClickHouse has a nice ‘any’ aggregate function for this purpose. It picks any value without extra overhead:

SELECT count(), sum(cityHash64(*)) data FROM (
  SELECT tenant_id, alert_id, timestamp, 
    any(alert_data) alert_data, 
    max(acked) acked, 
    max(ack_time) ack_time,
    max(ack_user) ack_user
  FROM alerts
  GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│      90 │ 18441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.055 sec. Processed 73.73 thousand rows, 82.74 MB (1.34 million rows/s., 1.50 GB/s.)

The query becomes simple and it is slightly faster! The reason is that with the ‘any’ function, ClickHouse does not need to calculate ‘max’ on the ‘alert_data’ column!

AggregatingMergeTree

AggregatingMergeTree is one of the most powerful ClickHouse features. When coupled with materialized views, it enables real-time data aggregation. Since we used aggregate functions in the previous approach, can we make it even better with AggregatingMergeTree? Actually, it’s not much of an improvement.

We only update a row once, so there are only two rows to aggregate for a group. For this scenario, AggregatingMergeTree is not the best option. We can do a trick, however. We know that alerts are always inserted as non-acknowledged first, and then become acknowledged. Once a user acknowledges the alert, only 3 columns need to be modified. Can we save a disk space and improve performance if we do not duplicate data for the other columns?

Let’s create a table that implements the aggregation using the ‘max’ aggregate function. Instead of ‘max’, we could also use ‘any’, but that would require columns to be Nullable — ‘any’ would pick a not-null value.

DROP TABLE alerts_amt_max;

CREATE TABLE alerts_amt_max (
  tenant_id     UInt32,
  alert_id      String,
  timestamp     DateTime Codec(Delta, LZ4),
  alert_data    SimpleAggregateFunction(max, String),
  acked         SimpleAggregateFunction(max, UInt8),
  ack_time      SimpleAggregateFunction(max, DateTime),
  ack_user      SimpleAggregateFunction(max, LowCardinality(String))
)
Engine = AggregatingMergeTree()
ORDER BY (tenant_id, timestamp, alert_id);

Since the original data was random, we will populate the new table using existing data from ‘alerts’. We will do it in two inserts, like before, one for non-acknowledged alerts and another one for acknowledged:

INSERT INTO alerts_amt_max SELECT * FROM alerts WHERE NOT acked;

INSERT INTO alerts_amt_max 
SELECT tenant_id, alert_id, timestamp,
  '' as alert_data, 
  acked, ack_time, ack_user 
FROM alerts WHERE acked;

Note that we insert an empty string instead of ‘alert_data’ for acknowledged events. We know that data does not change, and we can store it only once! The aggregate function will fill the gap. In the real application, we can just skip all columns that do not change and let them get default values.

Once we have the data, let’s check the data sizes first:

SELECT 
    table, 
    sum(rows) AS r, 
    sum(data_compressed_bytes) AS c, 
    sum(data_uncompressed_bytes) AS uc, 
    uc / c AS ratio
FROM system.parts
WHERE active AND (database = 'last_state')
GROUP BY table

┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬──────────────ratio─┐
│ alerts         │ 19039439 │ 20926009562 │ 21049307710 │ 1.0058921003373666 │
│ alerts_amt_max │ 19039439 │ 10723636061 │ 10902048178 │ 1.0166372782501314 │
└────────────────┴──────────┴─────────────┴─────────────┴────────────────────┘

Well, we have almost no compression, thanks to random strings. But aggregate is two times smaller, since we do not have to store ‘alerts_data’ twice.

Now let’s try the query over the aggregate table:

SELECT count(), sum(cityHash64(*)) data FROM (
   SELECT tenant_id, alert_id, timestamp, 
          max(alert_data) alert_data, 
          max(acked) acked, 
          max(ack_time) ack_time,
          max(ack_user) ack_user
     FROM alerts_amt_max
   GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│      90 │ 18441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.036 sec. Processed 73.73 thousand rows, 40.75 MB (2.04 million rows/s., 1.13 GB/s.)

Thanks to AggregatingMergeTree, we process less data (40MB vs 82MB before) and it is now even more efficient.

Materializing The Update

ClickHouse will do its best to merge data in the background, removing duplicate rows and performing aggregation. Sometimes, however, it makes sense to force the merge, in order to release disk space, for example. This can be done with the OPTIMIZE FINAL statement. OPTIMIZE is a blocking and expensive operation, therefore it cannot be performed too often. Let’s see if it makes any difference for the query performance.

OPTIMIZE TABLE alerts FINAL
Ok.
0 rows in set. Elapsed: 105.675 sec.

OPTIMIZE TABLE alerts_amt_max FINAL
Ok.
0 rows in set. Elapsed: 70.121 sec.

After OPTIMIZE FINAL, both tables have the same number of rows and identical data.

┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬────────────ratio─┐
│ alerts         │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
│ alerts_amt_max │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
└────────────────┴──────────┴─────────────┴─────────────┴──────────────────┘

The difference in performance between different approaches becomes less notable. Here is the summary table:

After inserts After OPTIMIZE FINAL
ReplacingMergeTree FINAL 0.278 0.037
argMax 0.059 0.034
any/max 0.055 0.029
AggregatingMergeTree 0.036 0.026

Conclusion

ClickHouse provides a rich toolset to handle real-time updates such as ReplacingMergeTree, CollapsingMergeTree (not reviewed here), AggregatingMergeTree and aggregate functions. All those approaches have three common properties:

  • Data is “modified” by inserting the new version. Inserts in ClickHouse are extremely fast.

  • There are efficient ways to emulate update semantics similar to those of OLTP databases

  • However, the actual modification does not happen immediately.

The choice of the particular approach depends on the application use case. ReplacingMergeTree is straightforward and the most convenient for the user, but may only be used for small to medium sized tables or if the data is always queried by the primary key. Use of aggregate functions gives more flexibility and performance but requires quite a lot of query rewrite. And finally, AggregatingMergeTree allows storage saving, keeping only modified columns. These are good tools to have in the arsenal of ClickHouse DB designer and apply when needed.

Share

5 Comments

  1. Hi,
    I have a question if we have high write rate (~100 rps), would the ReplacingMergeTree handle that load, or would it throw the “merges are much slower than ingestions: too many parts” exception?

    1. Hi Ibrahim,
      It is similar to any other MergeTree table. ClickHouse is not designed for high frequency rights, but few techniques can be used:
      – Buffer table engine — this one can accumulate inserts in RAM and flush them on a certain thresholds. Unflushed data can be lost on a hard restart.
      – In-memory MergeTree parts — similar to above, but with WAL and more user-friendly in general. The downside is replication, if table is replicated inserts are still registered in ZooKeeper that may become a bottleneck.
      – App-level buffering or tools like clickhouse-bulk.

  2. Hi Alexander Zaitsev,
    I think this article is a bit misleading. You generate 10M alerts for 1000 tenants but you always query the data with tenant_id=451. Since your data is sorted by tenant_id ClickHouse will always work with 10k alerts.
    So what you really demonstrate here is that ClickHouse can deduplicate your alerts with an aggregation but only for 10k alerts. What you demonstrate also is that ClickHouse is fast at selecting 10k contigous alerts from the 10M dataset but this is no surprise.
    If you need more tenant_id or worse, if you want to query on all tenant_id the performance will be very very poor, I think it should be mentionned in this article for the sake of clarity.
    Cheers,
    Paul.

    1. Hi Paul,
      The article has been inspired by a case we had with one of Altinity clients. Consider a SaaS system that stores and tracks alerts for different customers (tenants). When customer is looking into the dashboard, the data should always be filtered by a tenant, so I picked a single tenant_id for the performance check. Performance on the full dataset will be definitely not that good (though, it has been improved quite a lot since the time this article was written), but that was not a requirement. If real-time updates need to be applied to the huge dataset, CollapsingMergeTree should be used instead. This is the very first link in the article.

      As a side note, we are working on lightweight updates and deletes in ClickHouse. Here is a RFC https://github.com/ClickHouse/ClickHouse/issues/19627

      Cheers,
      Alexander

      1. Hi Alexander,
        Thank you for your reply and for working on the RFC. I guess deduplication and columnar-based database for OLAP doesn’t mix-up well regardless you are using ClickHouse or another solution. But there are solutions if you are ok with trades, it’s just more work for us the software engineer;)

        Cheers,
        Paul.

Comments are closed.