ClickHouse Kafka Engine FAQ

By Mikhail Filimonov on May 4th, 2020


Kafka is a popular way to stream data into ClickHouse. ClickHouse has a built-in connector for this purpose: the Kafka engine. Our friends from Cloudflare originally contributed this engine to ClickHouse. The Kafka engine has been reworked quite a lot since then and is now maintained by Altinity developers. It is not always evident how to use it in the most efficient way, though. We tried to fill the gap with a Kafka webinar, which was a success. In this article we have collected the typical questions about Kafka engine usage that we get in our support cases. We hope that our recommendations will help you avoid common problems.

Q. What is a good ClickHouse version to use for the Kafka engine?

There was a lot of development in 2019 in order to make the Kafka engine stable. It was finally stabilized in the 19.16.14 Altinity Stable version, so this one or any later 19.16 ClickHouse version is good. Older versions may have issues with data consistency when consuming data from Kafka (losses & duplicates).

20.x versions should also work correctly with Kafka, but make sure those are certified as Altinity Stable releases.

Q. How can I use the Kafka engine table? Can I select from the Kafka table directly?

The Kafka engine is designed for one-time data retrieval. This means that once data is queried from a Kafka table, it is considered consumed from the queue. Therefore you should never select data from a Kafka engine table directly, but use a materialized view instead. A materialized view is triggered once the data is available in a Kafka engine table. It automatically moves data from a Kafka table to some MergeTree or Distributed engine table. So, you need at least 3 tables:

  • The source Kafka engine table

  • The destination table (MergeTree family or Distributed)

  • Materialized view to move the data

Here is the typical example:

-- Consumer
CREATE TABLE test.kafka (key UInt64, value UInt64)
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka1:19092',
             kafka_topic_list = 'my_topic',
             kafka_group_name = 'my_consumer_group_name',
             kafka_format = 'JSONEachRow',
             kafka_max_block_size = 1048576;
-- Destination table
CREATE TABLE test.view (key UInt64, value UInt64)
    ENGINE = MergeTree()
    ORDER BY key;
-- Materialized View to move the data from a Kafka topic to a ClickHouse table
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka;

Sometimes it is necessary to apply different transformations to the data coming from Kafka, for example to store raw data and aggregates. In this case, it is possible to have several materialized views attached to a single Kafka engine table, or cascading materialized views. Unfortunately, use of multiple or cascading materialized views is less reliable, as inserts are not atomic. In the case of failure, you get an inconsistent state. Atomic inserts to materialized views are planned for later in 2020.
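As a rough sketch of the "raw data plus aggregates" case, a second materialized view can be attached to the same Kafka engine table from the example above. The names test.value_totals and test.consumer_totals are hypothetical, and the choice of SummingMergeTree is only one possible way to keep totals. The caveat about non-atomic inserts applies to this setup.

```sql
-- Hypothetical aggregate table: one running total per key.
CREATE TABLE test.value_totals (key UInt64, total UInt64)
    ENGINE = SummingMergeTree()
    ORDER BY key;

-- Second materialized view on the same Kafka engine table.
-- Each consumed block is aggregated and added to the totals,
-- while test.consumer keeps storing the raw rows in test.view.
CREATE MATERIALIZED VIEW test.consumer_totals TO test.value_totals AS
SELECT key, sum(value) AS total
FROM test.kafka
GROUP BY key;
```

Because SummingMergeTree collapses rows only during background merges, queries against test.value_totals should still use sum(total) with GROUP BY key.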

Q. How can I know the coordinates of the consumed message? 

A Kafka engine table provides the following virtual columns:

  • _topic String

  • _key String

  • _offset UInt64

  • _partition UInt64

  • _timestamp Nullable(DateTime)

Virtual columns should not be created in a Kafka engine table because they are available automatically. 
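A minimal sketch of storing the message coordinates next to the data, assuming the test.kafka table from the earlier example. The destination table, the view name and the kafka_* column names are hypothetical.

```sql
-- Hypothetical destination table with extra columns for the coordinates.
CREATE TABLE test.view_with_coordinates
(
    key UInt64,
    value UInt64,
    kafka_topic String,
    kafka_partition UInt64,
    kafka_offset UInt64,
    kafka_timestamp Nullable(DateTime)
)
ENGINE = MergeTree()
ORDER BY (kafka_topic, kafka_partition, kafka_offset);

-- The virtual columns are not declared in test.kafka;
-- they are simply referenced in the SELECT of the materialized view.
CREATE MATERIALIZED VIEW test.consumer_with_coordinates TO test.view_with_coordinates AS
SELECT
    key,
    value,
    _topic AS kafka_topic,
    _partition AS kafka_partition,
    _offset AS kafka_offset,
    _timestamp AS kafka_timestamp
FROM test.kafka;
```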

Q. How can I change the settings of a Kafka engine table?

In order to change the settings, you need to drop and recreate a Kafka table. ALTER TABLE MODIFY SETTINGS for Kafka engine tables is planned for later in 2020.

Q. How can I use a Kafka engine table in a cluster?

The best practice is to create a Kafka engine table on every ClickHouse server, so that every server consumes some partitions and flushes rows to the local ReplicatedMergeTree table. Note that all Kafka engine tables should use the same consumer group name in order to consume the same topic together in parallel.

If the number of consumers (ClickHouse servers with Kafka engine tables) is higher than the number of partitions in the topic, some consumers will do nothing. There should be enough Kafka partitions in the topic to make it possible to consume the topic in parallel by several ClickHouse servers.
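The per-server setup described above might look roughly like the following, executed on every ClickHouse server. The table names, the ZooKeeper path and the {shard} and {replica} macros are assumptions and have to match your own cluster configuration.

```sql
-- Local replicated destination table (path and macros are assumptions).
CREATE TABLE test.view_replicated (key UInt64, value UInt64)
    ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test/view_replicated', '{replica}')
    ORDER BY key;

-- Kafka engine table: the same topic and the same consumer group on every
-- server, so that Kafka spreads the partitions across the servers.
CREATE TABLE test.kafka_local (key UInt64, value UInt64)
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka1:19092',
             kafka_topic_list = 'my_topic',
             kafka_group_name = 'my_consumer_group_name',
             kafka_format = 'JSONEachRow';

-- Flush consumed rows into the local replicated table.
CREATE MATERIALIZED VIEW test.consumer_local TO test.view_replicated AS
SELECT * FROM test.kafka_local;
```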

Another possibility is to flush data from a Kafka engine table into a Distributed table. It requires more careful configuration, though. In particular, the Distributed table needs to have a deterministic sharding key (not a random hash), so that a retried block is routed to the same shard. This is required in order for the deduplication of ReplicatedMergeTree to work properly. Distributed tables will retry inserts of the same block, and those can be deduplicated by ClickHouse.

Q. How can I configure a Kafka engine table for the maximum performance?

Single table performance depends on row size, used format, number of rows per message, etc. One Kafka table usually can handle 60K-300K simple messages per second.

To achieve the best performance for a single table, the ‘kafka_max_block_size’ setting should be increased to 512K-1M rows. The default value is 64K, which is too small. We are going to address it in upcoming versions.

Further improvements are possible if a single topic is consumed by several servers (replicas) or by several Kafka engine tables on the same server.

In the current implementation, ‘kafka_num_consumers = 1’ should always be used: increasing it gives no improvement, because consumption is currently locked in a single thread.

Instead, one can create several Kafka engine tables coupled with corresponding materialized views that would move the data over to the same target table. That way each Kafka engine table would work in a separate thread. 
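A sketch of this workaround, assuming the test.kafka, test.consumer and test.view objects from the earlier example already exist and the topic has at least two partitions. The names test.kafka_2 and test.consumer_2 are hypothetical.

```sql
-- Second Kafka engine table on the same topic and in the same consumer group,
-- so that Kafka assigns it a share of the partitions.
CREATE TABLE test.kafka_2 (key UInt64, value UInt64)
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka1:19092',
             kafka_topic_list = 'my_topic',
             kafka_group_name = 'my_consumer_group_name',
             kafka_format = 'JSONEachRow',
             kafka_num_consumers = 1,
             kafka_max_block_size = 1048576;

-- Its own materialized view, writing to the same target table as test.consumer.
CREATE MATERIALIZED VIEW test.consumer_2 TO test.view AS
SELECT * FROM test.kafka_2;
```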

Proper multithreading implementation is yet another roadmap item for 2020.

Q. What configuration options can be adjusted?

The following settings are specific to how the Kafka engine consumes messages:

  • kafka_max_block_size (default 65536): the threshold, in number of rows, at which a block is committed to ClickHouse; configured at the table level

  • kafka_skip_broken_messages: the number of errors to tolerate when parsing messages; configured at the table level

  • stream_flush_interval_ms (default 7500): the threshold, in milliseconds, at which a block is committed to ClickHouse; configured at the user profile level, so it may affect other streaming tables as well

  • kafka_max_wait_ms: the timeout to wait while acknowledging the message; configured at the user profile level

There are a number of options specific to the Kafka library as well. Those can be added into a separate section of config.xml or preferably used in a separate file in config.d/. See the list of options at: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

In general, the settings model is bizarre now and needs to be improved.

Q. Does the Kafka engine support authentication against a Kafka cluster? Our Kafka cluster is configured as SASL_PLAINTEXT. How do I provide a username and password for this cluster?

The Kafka engine supports authentication. This is a global server-level setting that should be placed in config.xml. Here is an example of the configuration:

<kafka>
    <security_protocol>sasl_plaintext</security_protocol>
    <sasl_mechanism>PLAIN</sasl_mechanism>
    <sasl_username>test</sasl_username>
    <sasl_password>test</sasl_password>
    <debug>all</debug>
    <auto_offset_reset>latest</auto_offset_reset>
    <compression_type>snappy</compression_type>
</kafka>

See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Note for Docker users: outbound SSL connections did not work in the ClickHouse Docker image due to missing certificates. The issue has been fixed in 19.16.19.85, 20.1.12, 20.3.9 and any later releases.

Q. How can I control data retention in Kafka?

Users typically don’t remove data from Kafka brokers after consuming it. Retention is managed on the broker side (see this article for an example). Instead, once a message is consumed, the current reading position in the topic for that particular consumer group is adjusted. That makes it possible for several different consumer groups to consume the same data independently (e.g. one can stream the same data to ClickHouse, to Hadoop, etc.). It is also possible to rewind or fast-forward the reading position of the consumer group.

Q. How can I rewind or replay messages? How can I fast-forward or skip messages?

Follow this procedure:

Step 1: Detach Kafka tables in ClickHouse across all cluster nodes.

Step 2: Run the ‘kafka-consumer-groups’ tool:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

There are several resetting options; refer to the kafka-consumer-groups documentation for details:

  • --shift-by <positive_or_negative_integer>

  • --to-current

  • --to-latest

  • --to-offset <offset_integer>

  • --to-datetime <datetime_string>

  • --by-duration <duration_string>

Step 3: Reattach the Kafka tables in ClickHouse across all cluster nodes.
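Steps 1 and 3 can be sketched in ClickHouse SQL as follows, assuming the Kafka engine table is called test.kafka as in the earlier example. The statements have to be run on every node that has the table.

```sql
-- Step 1: stop consuming, so that the consumer group has no active members
-- while its offsets are being reset.
DETACH TABLE test.kafka;

-- Step 2 happens outside ClickHouse, with the kafka-consumer-groups tool.

-- Step 3: resume consuming from the new position.
ATTACH TABLE test.kafka;
```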

See also the configuration settings:

<kafka>
    <auto_offset_reset>smallest</auto_offset_reset>
</kafka>

Q. How can I deal with malformed messages?

In general, well-formed data is highly preferable in ClickHouse. How well ClickHouse tolerates problems in the stream depends on the data format.

ClickHouse has a tendency to use faster, but less robust, options to parse the input. There are quite a lot of options to switch the parser behavior in corner cases, but fine-tuning the behavior is tricky and not always possible. For example, there is a setting ‘input_format_skip_unknown_fields’ applicable to JSONEachRow format. Once enabled at a profile level, ClickHouse would parse JSON in messages containing more fields than listed in the table definition.

Another setting is ‘kafka_skip_broken_messages’. It does not work for all formats, and sometimes can produce unexpected results. For example, if a single message contains several rows and one row is malformed, only that row is skipped, not the whole message (as the parameter name suggests).

In the case of JSON / TSV and similar text formats, there is a possibility to parse all fields as strings and cast them to appropriate types at the materialized view level.
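The string-parsing approach could look roughly like this for a TSV topic. This is a sketch only: the names test.kafka_raw and test.consumer_raw, the separate consumer group and the choice of toUInt64OrZero (which turns unparsable values into 0 instead of failing) are assumptions; test.view is the destination table from the earlier example.

```sql
-- Every field is read as a plain String, so parsing never fails on bad numbers.
CREATE TABLE test.kafka_raw (key String, value String)
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka1:19092',
             kafka_topic_list = 'my_topic',
             kafka_group_name = 'my_consumer_group_name_raw',
             kafka_format = 'TSV';

-- Cast to the target types in the materialized view.
-- toUInt64OrZero returns 0 when the string is not a valid UInt64.
CREATE MATERIALIZED VIEW test.consumer_raw TO test.view AS
SELECT
    toUInt64OrZero(key) AS key,
    toUInt64OrZero(value) AS value
FROM test.kafka_raw;
```

If zeros are not acceptable as a marker for bad input, toUInt64OrNull with Nullable destination columns is an alternative.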

There is a lot of room for improvement here. In particular, ClickHouse could put malformed messages in a separate stream with special virtual columns, like _error, and maybe _raw_message. A user then could attach a materialized view to filter them out or store separately.

Q. A system with lots of Kafka engine tables generates timeouts. What’s happening?

Every Kafka table uses a thread from the “background_schedule” thread pool. If there are too many Kafka tables, it makes sense to increase the background_schedule_pool_size setting. Also, monitor the BackgroundSchedulePoolTask metric.
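One way to watch the pool is to query the metric from the system.metrics table. This sketch assumes the metric is exposed there under the name BackgroundSchedulePoolTask.

```sql
-- Number of tasks currently active in the background schedule pool.
-- A value that stays close to background_schedule_pool_size suggests
-- that the pool is saturated.
SELECT metric, value
FROM system.metrics
WHERE metric = 'BackgroundSchedulePoolTask';
```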

Additionally, a Kafka library (librdkafka) internally creates one thread per broker and service threads, so the overall number of threads used by ClickHouse may be too high, resulting in lots of context switches.

To reduce the number of Kafka engine tables consuming different topics, it is possible to use a single Kafka engine table and let multiple materialized views filter the data by the virtual column ‘_topic’.
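A sketch of the single-table approach, assuming two hypothetical topics, topic_a and topic_b, that share the same message format and schema. All table and view names here are made up for illustration.

```sql
-- One Kafka engine table subscribed to both topics.
CREATE TABLE test.kafka_multi (key UInt64, value UInt64)
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka1:19092',
             kafka_topic_list = 'topic_a,topic_b',
             kafka_group_name = 'my_consumer_group_name',
             kafka_format = 'JSONEachRow';

-- One destination table per topic.
CREATE TABLE test.view_a (key UInt64, value UInt64)
    ENGINE = MergeTree()
    ORDER BY key;

CREATE TABLE test.view_b (key UInt64, value UInt64)
    ENGINE = MergeTree()
    ORDER BY key;

-- Each materialized view keeps only the rows of its own topic.
CREATE MATERIALIZED VIEW test.consumer_a TO test.view_a AS
SELECT key, value FROM test.kafka_multi WHERE _topic = 'topic_a';

CREATE MATERIALIZED VIEW test.consumer_b TO test.view_b AS
SELECT key, value FROM test.kafka_multi WHERE _topic = 'topic_b';
```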

Q. Data is missing when consumed from a Kafka table.

That should never happen if the recommended ClickHouse version is used. It could happen on older versions where there were several bugs leading to missing data. If you experience that in the recommended version or above, please contact us with a bug report.

Q. There are data duplicates when consumed from a Kafka table.

Duplicates are theoretically possible, since the current implementation ensures the at-least-once contract. That should not happen in normal circumstances if the recommended ClickHouse version is used. It can happen in rare corner cases, though. For example, if the data was inserted into ClickHouse and the connection to the Kafka broker was lost immediately afterwards, ClickHouse was not able to commit the new offset, so the same messages are consumed again after reconnecting.

Note that duplicates were an annoying problem in older versions, therefore using the recommended ClickHouse version is a must.

Q. Something does not work. How can I troubleshoot?

At that point, the most informative sources are the ClickHouse trace log and the debug log of the Kafka library (librdkafka), which can be enabled in config.xml as follows:

<kafka>
   <debug>all</debug>
</kafka>

librdkafka library logs are written to stderr.log.

Q. What are the alternatives to the Kafka engine?

For the Java crowd it is natural to develop a consumer and stream a Kafka topic to ClickHouse using the ClickHouse JDBC driver. That gives ultimate control.

Some other tools that can be used to connect Kafka and ClickHouse:

Summary and the roadmap

The ClickHouse Kafka engine is a great piece of functionality that allows easy integration of ClickHouse with Apache Kafka. It is used by many ClickHouse users with success. Nothing is perfect in the world, however, and there is a lot of room for improvement. It is the feedback from users that helps us to improve ClickHouse and the Kafka engine. We have a clear plan for how to make the Kafka engine even better, more reliable and easier to use. We are happy to share this plan with you and welcome any suggestions:

  • Usability features:

    • Multithreaded consumer

    • Format specific parser customization

    • Support Kafka headers as virtual columns

    • ALTER TABLE MODIFY SETTINGS

    • Better error handling

    • Consistent configuration (server, profile, table level)

  • Exactly once semantics (EOS) support once rdkafka library version 1.5 is released by Confluent

  • Introspection tools for easier monitoring and troubleshooting:

    • system.kafka table for monitoring consumers

    • Kafka engine-related metrics in system.metrics

    • Redirect Kafka logs to ClickHouse logs (instead of stderr)

There is also a list of open issues, which can be found under the comp-kafka label in the ClickHouse GitHub repository.

ClickHouse is fun, and ClickHouse with Kafka is double fun! Stay tuned!
