How We Stopped Our ClickHouse DB From Exploding

Dittofeed is a happy customer of Altinity. This article was written by Max Gurewitz, co-founder of Dittofeed.

Micro-batching in ClickHouse for User Segmentation

I will share some techniques for calculating live user segments while keeping your ClickHouse instance from exploding! User segmentation is key to understanding your business or application’s users’ behaviors. For example, you might want to keep track of which users have gotten stuck in an onboarding flow or which users haven’t logged in within the last 30 days.

We’re building an application, Dittofeed, an Open-Source customer engagement platform. It’s used to automate email and SMS messaging (among other channels). We use ClickHouse as our primary store of user events and to calculate live user segments.

We use the techniques discussed below to:

  • Guarantee idempotency when inserting events.
  • Improve scalability with micro-batching.
  • Handle late and out-of-order events.

What Is ClickHouse and Why Should You Use It?

ClickHouse is one of the world’s most powerful OLAP databases. It excels in use cases that require efficiently running (relatively) low throughput queries that access many rows. This contrasts with a standard OLTP database you might be more familiar with, like Postgres. In relative terms, OLTP databases like Postgres excel at making frequent queries that access a relatively small number of rows.

ClickHouse differs in several respects from data warehouses like Snowflake, Redshift, BigQuery, etc., which are also OLAP stores. ClickHouse provides powerful tools for application engineers to optimize critical queries. On the other hand, data warehouses are specialized for query flexibility and the Business Intelligence use case. Critically, ClickHouse is open source (Apache License 2.0), which was an important consideration when designing Dittofeed (MIT License).

ClickHouse also excels at storing and querying semi-structured data, like event logs. Previously, many engineering teams used Elasticsearch in a similar niche to ClickHouse, building applications like Kibana. Increasingly, developers are choosing ClickHouse over Elasticsearch for its unparalleled performance characteristics. For example, our friends at hyperdx.io are using ClickHouse to build an open-source OpenTelemetry provider!

User Segments – How We Use ClickHouse

We use ClickHouse to store and query our semi-structured event logs describing users’ behavior. The results of these queries allow us to decide which users to message, when, and how to message them. Relevantly, we use ClickHouse to calculate user segments.

User segments, also called audiences in some platforms, are just a list of users calculated based on the user’s traits or past behaviors. Calculating user segments is central to Dittofeed’s platform, and as we began to scale, it’s caused us problems!

We provide a low-code interface for Dittofeed workspace members (not to be confused with end-users) to define segments. These segment definitions are constructed in Dittofeed’s UI and are stored as JSON. These definitions are later translated into ClickHouse queries. We run the queries on a polling period to keep the segment assignments live and updated as users enter and exit segments. The following typescript pseudocode demonstrates this logic:

async function main() {
  while (true) {
    await reCalculateSegments();
    await sleep(POLLING_INTERVAL);
  }
}

The concern of this article is how to define the tables and queries underlying the reCalculateSegments function. This turns out to be a fairly general problem that many engineers can benefit from understanding!

For this problem, I’ve provided a git repo containing a series of test implementations in increasing order of complexity. Check it out to follow along!

Sample Segment Definition

For this article, we’re going to use a simple illustrative segment. That segment will contain users who have performed at least 2 BUTTON_CLICK events.

The Naive Approach

In this naive approach, we’ll create two tables. One is to store our user events. The other is to store segment assignments.

CREATE TABLE user_events_naive (
    user_id String,
    -- we care when this value equals 'BUTTON_CLICK'
    event_name LowCardinality(String),
    timestamp DateTime
)
Engine = MergeTree()
ORDER BY (user_id, event_name, timestamp);

CREATE TABLE segment_assignments_naive (
    user_id String,
    value Boolean,
    assigned_at DateTime DEFAULT now(),
)
Engine = ReplacingMergeTree()
ORDER BY (user_id);

We insert assignments by checking which users have at least two BUTTON_CLICK events.

INSERT INTO segment_assignments_naive (user_id, value)
SELECT user_id, count() >= 2
FROM user_events_naive
WHERE event_name = 'BUTTON_CLICK'
GROUP BY user_id;

SELECT
  user_id,
  -- We have to use an aggregation here to select the latest value because the
  -- ReplacingMergeTree engine replaces values asynchronously, so we need to be
  -- sure to skip over stale values.
  argMax(value, assigned_at) AS latest_value
FROM segment_assignments_naive
GROUP BY user_id
HAVING latest_value = True;

The above code is implemented here:

1-naive.test.ts

We use a ReplacingMergeTree engine table for our segment assignments so that when we re-calculate our segment values, the new values replace the old ones.

This approach is relatively simple. However, it has several issues. Let’s start with one, a lack of idempotency.

Idempotency

Our table/query structure is not idempotent! What’s idempotency, and why do we care? An operation is idempotent if performing it multiple times is logically equivalent to performing it a single time. This is one of the most important concepts for understanding distributed systems.

In distributed systems, even in the best case, messages either get delivered at least once with the potential for duplicates, or at most once with a potential for dropped messages. To know which bucket you fall into, a simple rule of thumb is to ask whether clients implement retries when issuing requests to your API. If they do, in the best case you’ve got at least once delivery. Otherwise, you may have at most once delivery.

Luckily, we can combine at least once delivery, with idempotency, to achieve logically exactly once delivery. In our case, we allow clients to send duplicate events and are smart enough to ignore the duplicates when calculating user segments. We can achieve that by introducing an idempotency key message_id to our events table.

CREATE TABLE user_events_idempotent (
    user_id String,
    event_name LowCardinality(String),
    message_id String,
    timestamp DateTime
)
Engine = MergeTree()
ORDER BY (user_id, event_name, timestamp, message_id);

CREATE TABLE segment_assignments_idempotent (
    user_id String,
    value Boolean,
    assigned_at DateTime DEFAULT now()
)
Engine = ReplacingMergeTree()
ORDER BY (user_id);

Then, we can count the number of unique message Ids when calculating the number of times a user has performed an action.

INSERT INTO segment_assignments_idempotent (user_id, value)
SELECT user_id, uniq(message_id) >= 2
FROM user_events_idempotent
WHERE event_name = 'BUTTON_CLICK'
GROUP BY user_id;

If you can engineer your APIs to be idempotent, you’ll already be doing better than the bulk of the field! For example, other popular customer engagement platforms like Braze and OneSignal don’t include idempotency keys in their APIs and are thus susceptible to storing duplicated user events and miscalculating segments. This can result in consumers of these platforms messaging the wrong users at the wrong times!

The above code is implemented here:

2-idempotent.test.ts

Still, in the intro, I alluded to our database being at risk of exploding. We haven’t addressed performance at all. Our existing table and query structures have a critical flaw. We’re querying against every event, and re-calculating every user segment with every polling period!

That means as your events table grows larger, and your application acquires more users, these queries will grow slower and more expensive over time, putting your application at risk!

To solve this problem, we’ll need an approach that allows us only to consider new events as they’re inserted, and to recalculate segments incrementally.

Micro-Batching

We’ll approach this problem by using a technique called “micro-batching”. Micro-batching can be considered a way to simulate stream processing, by processing incoming events in small batches. This requires introducing several new tables. A lot is happening here, so let’s look at the following SQL statements and accompanying comments.

CREATE TABLE user_events_micro_batch (
    user_id String,
    event_name LowCardinality(String),
    message_id String,
    timestamp DateTime
)
Engine = MergeTree()
ORDER BY (user_id, event_name, timestamp, message_id);

-- We make use of the AggregatingMergeTree engine which allows us to to store
-- the intermediate state representing each user's message id count. This allows
-- us to avoid having to re-sum all of a user's BUTTON_CLICK event message id's
-- with each new polling period, and instead only add the new events to an
-- existing count. ClickHouse conveniently merges the aggregation state in the
-- background.
CREATE TABLE user_states_micro_batch (
    user_id String,
    event_count AggregateFunction(uniq, String),
    computed_at DateTime DEFAULT now(),
)
Engine = AggregatingMergeTree()
ORDER BY (user_id);

-- This table allows us to track the last time a user's state was updated,
-- within the last 100 days. This is useful so that we can only update user
-- segment's for users whose state have been updated since the last polling
-- period.
CREATE TABLE updated_user_states_micro_batch (
    user_id String,
    computed_at DateTime DEFAULT now()
)
Engine = MergeTree()
PARTITION BY toYYYYMMDD(computed_at)
ORDER BY computed_at
TTL toStartOfDay(computed_at) + interval 100 day;

-- This materialized view is used to automatically populate the updated states
-- table whenever the states table receives new data.
CREATE MATERIALIZED VIEW updated_user_states_micro_batch_mv
TO updated_user_states_micro_batch
AS SELECT
    user_id,
    computed_at
FROM user_states_micro_batch;

CREATE TABLE segment_assignments_micro_batch (
    user_id String,
    value Boolean,
    assigned_at DateTime DEFAULT now()
)
Engine = ReplacingMergeTree()
ORDER BY (user_id);

Now, we need to populate the assignments table in two steps: inserting into the state table and then inserting into the assignments table.

INSERT INTO user_states_micro_batch
SELECT
    user_id,
    -- The following is the syntax for producing the incremental state used by
    -- AggregatingMergeTree tables.
    uniqState(message_id),
    parseDateTimeBestEffort({now:String})
FROM user_events_micro_batch
WHERE
    event_name = 'BUTTON_CLICK'
    AND timestamp >= parseDateTimeBestEffort({lower_bound:String})
GROUP BY user_id;

INSERT INTO segment_assignments_micro_batch
SELECT
    user_id,
    uniqMerge(event_count) >= 2,
    parseDateTimeBestEffort({now:String})
FROM user_states_micro_batch
WHERE
    -- We use this clause to select only users whose state has been updated in
    -- the latest polling period
    user_id IN (
        SELECT user_id
        FROM updated_user_states_micro_batch
        WHERE computed_at >= parseDateTimeBestEffort({now:String})
    )
GROUP BY user_id;

Our polling pseudocode also needs to be updated. Now, our reCalculateSegments function depends on when the last polling period occurred, so it can window its queries.

async function main() {
  let lastPollingPeriod: Date | null = await getLastPollingPeriod();

  while (true) {
    await reCalculateSegments(lastPollingPeriod);
    await sleep(POLLING_INTERVAL);

    lastPollingPeriod = new Date();
    await updateLastPollingPeriod(lastPollingPeriod);
  }
}

This code introduces two new function calls, getLastPollingPeriod, and updateLastPollingPeriod, which are methods for reading and writing a single timestamp. We store the equivalent of this timestamp in our Postgres database.

This relatively complex setup gets us segment recalculations whose performance depends on the number of events in a single polling period, but crucially does not directly depend on the total number of user events.

The above code is implemented here:

3-microBatch.test.ts

However, we’re still not done. Something’s still not quite right.

Event Time vs Processing Time

What exactly is the timestamp on our events table? Is it the time when the user behavior occurred, the “event time”? Or is it the time when the event was recorded, the “processing time”? This is an important distinction because you might receive events that are drastically out of order in event time.

Imagine an end-user’s actions are logged as events, but their device loses network connectivity, only to regain it an hour later. The result will be events received an hour late in event time! Because we lack ordering guarantees with respect to event time, it’s important that we do our micro-batch windowing in processing time.

As an aside, due to imperfect clocks, events can also be recorded out of order with respect to processing time. This issue can be exacerbated when you introduce sharding to your database. However, that’s a discussion for another time…

Let’s see how we can address this issue of distinguishing event time from processing time with our new table structure.

CREATE TABLE user_events_event_time (
    user_id String,
    event_name LowCardinality(String),
    message_id String,
    -- We store event time and processing time separately.
    event_time DateTime,
    processing_time DateTime
)
Engine = MergeTree()
-- Crucially our sorting key uses processing time, *not* event time.
ORDER BY (user_id, event_name, processing_time, message_id);

CREATE TABLE user_states_event_time (
    user_id String,
    event_count AggregateFunction(uniq, String),
    -- We maintain state representing the last event time.
    last_event_time AggregateFunction(max, DateTime),
    computed_at DateTime DEFAULT now(),
)
Engine = AggregatingMergeTree()
ORDER BY (user_id);

CREATE TABLE segment_assignments_event_time (
    user_id String,
    value Boolean,
    -- We record the last relevant time for the segment.
    last_event_time DateTime,
    assigned_at DateTime DEFAULT now()
)
Engine = ReplacingMergeTree()
ORDER BY (user_id);

Then the accompanying queries.

INSERT INTO segment_assignments_event_time
SELECT
    user_id,
    uniqMerge(event_count) >= 2,
    maxMerge(last_event_time),
    parseDateTimeBestEffort({now:String})
FROM user_states_event_time
WHERE
    user_id IN (
        SELECT user_id
        FROM updated_user_states_event_time
        WHERE computed_at >= parseDateTimeBestEffort({now:String})
    )
GROUP BY user_id;

INSERT INTO segment_assignments_event_time
SELECT
    user_id,
    uniqMerge(event_count) >= 2,
    maxMerge(last_event_time),
    parseDateTimeBestEffort({now:String})
FROM user_states_event_time
WHERE
    user_id IN (
        SELECT user_id
        FROM updated_user_states_event_time
        WHERE computed_at >= parseDateTimeBestEffort({now:String})
    )
GROUP BY user_id;

SELECT
    user_id,
    argMax(last_event_time, assigned_at) AS last_event_time,
    argMax(value, assigned_at) AS latest_value
FROM segment_assignments_event_time
GROUP BY user_id
HAVING latest_value = True;

This new table and query structure allows us to record event and processing times separately and surface event times with our segment assignments!

The above code is implemented here:

4-eventTime.test.ts

Conclusion

We covered a lot of concepts and code in this article. We’ve covered how to calculate user segments in ClickHouse with the following qualities:

  • Idempotency
  • Micro-batching
  • Distinguished event and processing times.

If you enjoyed this article, feel free to give Dittofeed’s repo a star.

Share

Related: