Replicating data from PostgreSQL to ClickHouse with the Altinity Sink Connector
Transactional databases like PostgreSQL routinely process hundreds of thousands of transactions per second on busy web properties.
For users analyzing such transactions, an analytic database like ClickHouse is a perfect fit, as it provides benefits like columnar storage, efficient data compression, and parallel query processing. These features allow users to get quick answers to complex questions, even in cases involving very large datasets.
The faster transactions get from PostgreSQL into ClickHouse, the faster users get benefits. For this reason, we are introducing simple, 100% open source replication to move data from PostgreSQL to ClickHouse in real-time.
How it works
Our initial implementation of replication was based on the Kafka Connect framework. That approach has drawbacks from an operational perspective, as the solution involves running multiple services on top of Kafka.
We therefore developed a newer solution that only requires a single executable to transfer data from PostgreSQL to ClickHouse.
Find it here: https://github.com/Altinity/clickhouse-sink-connector
There are several challenges in migrating data from PostgreSQL to ClickHouse. The main goal of developing this solution is to minimize the manual effort involved in performing the migration. Here are some of the major benefits of the new approach.
- Single executable: From an operational perspective, it is far simpler to run one executable that does not require any additional services.
- Streaming replication: In addition to pulling initial records from PostgreSQL into ClickHouse, the lightweight sink connector's real strength is streaming replication. Any change (insert/update/delete) is replicated to ClickHouse in near real-time.
- On-prem/cloud/Kubernetes support: Depending on the industry, clients might be running their workloads on-premises or in the cloud. Our primary objective is to design a solution that works everywhere and is easy to maintain, with monitoring capabilities.
- Schema/datatype conversion: This is an area where a lot of time and effort is usually spent mapping data types and their ranges. SQL syntax also differs between the two databases, so a tool that creates the schema automatically avoids manual effort.
- Batch inserts: ClickHouse performs best when rows are inserted in batches; unlike transactional databases, ClickHouse is not designed to handle frequent updates and deletes.
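To illustrate the schema/datatype conversion point above, here is a small sketch in Python (an illustration, not the connector's actual code) of the kind of type mapping involved. The pairs mirror the PostgreSQL and ClickHouse tables shown later in this article.

```python
# Illustrative PostgreSQL-to-ClickHouse type mapping, based on the
# conversions visible in the example tables later in this article.
PG_TO_CLICKHOUSE = {
    "uuid": "UUID",
    "character varying": "String",
    "boolean": "Bool",
    "numeric(21,5)": "Decimal(21, 5)",
    "timestamp with time zone": "DateTime64(6)",
    "jsonb": "String",  # JSON documents land in ClickHouse as String
}

def map_column(pg_type: str, nullable: bool = True) -> str:
    """Translate a PostgreSQL column type into a ClickHouse column type.

    Nullable columns are wrapped in Nullable(...), matching the
    auto-created schema shown below.
    """
    ch_type = PG_TO_CLICKHOUSE[pg_type]
    return f"Nullable({ch_type})" if nullable else ch_type

print(map_column("uuid", nullable=False))  # UUID
print(map_column("jsonb"))                 # Nullable(String)
```

The real connector handles many more types and edge cases (ranges, precision limits), but the shape of the problem is the same: a per-type translation plus nullability handling.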
We will walk through the steps involved in setting up the tool.
Setup – Docker compose (PostgreSQL)
Step 1: Clone the repo with the following commands.
git clone https://github.com/Altinity/clickhouse-sink-connector.git
cd sink-connector-lightweight
Prerequisites: Docker and Docker Compose must be installed. See https://docs.docker.com/compose/install/
Step 2: The configuration file with the PostgreSQL and ClickHouse connection information is provided in YAML format. Use your favorite editor to update it.
cd docker
vi config_postgres.yml
Step 3: Update the PostgreSQL connection information in config_postgres.yml: database.hostname, database.port, database.user, and database.password.
database.hostname: "postgres"
database.port: "5432"
database.user: "root"
database.password: "root"
database.server.name: "ER54"
Step 4: Enter the PostgreSQL tables that need to be replicated in the format schema_name.table_name.
table.include.list: public.employees
You can also use schema.include.list to include only the list of schemas that you need to be replicated.
schema.include.list: public
plugin.name: "pgoutput"
table.include.list: "public.tm"
Step 5: Update the ClickHouse connection information in config_postgres.yml: clickhouse.server.url, clickhouse.server.user, clickhouse.server.pass, and clickhouse.server.port.
Also update the ClickHouse information for the fields that are used to store the offset and schema history state: offset.storage.jdbc.url, offset.storage.jdbc.user, offset.storage.jdbc.password, schema.history.internal.jdbc.url, schema.history.internal.jdbc.user, and schema.history.internal.jdbc.password.
clickhouse.server.url: "clickhouse"
clickhouse.server.user: "root"
clickhouse.server.pass: "root"
clickhouse.server.port: "8123"
clickhouse.server.database: "test"
offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123"
offset.storage.jdbc.user: "root"
offset.storage.jdbc.password: "root"
schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123"
schema.history.internal.jdbc.user: "root"
schema.history.internal.jdbc.password: "root"
Step 6: Set snapshot.mode to initial if you would like to replicate existing records. Otherwise, set snapshot.mode to schema_only to replicate the schema and only the records that are modified after the connector is started.
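As a concrete example, the two modes might appear in config_postgres.yml like this (a sketch based on the keys described above; use one or the other):

```yaml
# Copy the existing PostgreSQL records first, then stream new changes:
snapshot.mode: "initial"

# Alternative: replicate the schema plus only changes made after startup:
# snapshot.mode: "schema_only"
```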
Step 7: Start the connector using Docker Compose with the provided compose file.
# This starts PostgreSQL and ClickHouse containers
export SINK_LIGHTWEIGHT_VERSION=latest
cd sink-connector-lightweight/docker
docker-compose -f docker-compose-postgres.yml up
The docker-compose-postgres.yml file includes a sample PostgreSQL schema and inserts test data into PostgreSQL.
After the connector is started, you will notice that it reads the PostgreSQL schema when data is updated and creates the corresponding ClickHouse tables. This behavior is enabled by setting auto.create.tables to true in the YAML file.
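For reference, the corresponding line in config_postgres.yml (key name as given above) would be:

```yaml
# Create ClickHouse tables automatically from the incoming PostgreSQL schema:
auto.create.tables: "true"
```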
PostgreSQL table
\d+ public.tm;
                                                       Table "public.tm"
         Column          |           Type           | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
-------------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
 id                      | uuid                     |           | not null |         | plain    |             |              |
 secid                   | uuid                     |           |          |         | plain    |             |              |
 acc_id                  | uuid                     |           |          |         | plain    |             |              |
 ccatz                   | character varying        |           |          |         | extended |             |              |
 tcred                   | boolean                  |           |          | false   | plain    |             |              |
 am                      | numeric(21,5)            |           |          |         | main     |             |              |
 set_date                | timestamp with time zone |           |          |         | plain    |             |              |
 created                 | timestamp with time zone |           |          |         | plain    |             |              |
 updated                 | timestamp with time zone |           |          |         | plain    |             |              |
 events_id               | uuid                     |           |          |         | plain    |             |              |
 events_transaction_id   | uuid                     |           |          |         | plain    |             |              |
 events_status           | character varying        |           |          |         | extended |             |              |
 events_payment_snapshot | jsonb                    |           |          |         | extended |             |              |
 events_created          | timestamp with time zone |           |          |         | plain    |             |              |
 vid                     | uuid                     |           |          |         | plain    |             |              |
 vtid                    | uuid                     |           |          |         | plain    |             |              |
 vstatus                 | character varying        |           |          |         | extended |             |              |
 vamount                 | numeric(21,5)            |           |          |         | main     |             |              |
 vcreated                | timestamp with time zone |           |          |         | plain    |             |              |
 vbilling_currency       | character varying        |           |          |         | extended |             |              |
ClickHouse Table
CREATE TABLE test.tm
(
    `id` UUID,
    `secid` Nullable(UUID),
    `acc_id` Nullable(UUID),
    `ccatz` Nullable(String),
    `tcred` Nullable(Bool),
    `am` Nullable(Decimal(21, 5)),
    `set_date` Nullable(DateTime64(6)),
    `created` Nullable(DateTime64(6)),
    `updated` Nullable(DateTime64(6)),
    `events_id` Nullable(UUID),
    `events_transaction_id` Nullable(UUID),
    `events_status` Nullable(String),
    `events_payment_snapshot` Nullable(String),
    `events_created` Nullable(DateTime64(6)),
    `vid` Nullable(UUID),
    `vtid` Nullable(UUID),
    `vstatus` Nullable(String),
    `vamount` Nullable(Decimal(21, 5)),
    `vcreated` Nullable(DateTime64(6)),
    `vbilling_currency` Nullable(String),
    `_sign` Int8,
    `_version` UInt64
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY tuple()
SETTINGS index_granularity = 8192
By default, the connector creates tables with the ReplacingMergeTree engine, so that updates and deletions can be handled.
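To make the ReplacingMergeTree behavior concrete, here is a simplified Python model (an illustration, not ClickHouse internals) of how the engine keeps only the highest `_version` row per key when parts are merged or when you query with FINAL:

```python
# Simplified model of ReplacingMergeTree(_version) deduplication:
# for each key, only the row with the highest _version survives.
def collapse(rows):
    """Keep the latest version of each row, keyed by 'id'."""
    latest = {}
    for row in rows:
        key = row["id"]
        if key not in latest or row["_version"] > latest[key]["_version"]:
            latest[key] = row
    return list(latest.values())

rows = [
    {"id": 1, "ccatz": "IDR",  "_version": 100},
    {"id": 1, "ccatz": "IDR2", "_version": 200},  # a later UPDATE wins
]
print(collapse(rows))  # [{'id': 1, 'ccatz': 'IDR2', '_version': 200}]
```

This is why an UPDATE in PostgreSQL can be replayed as a plain insert in ClickHouse: the newer `_version` eventually replaces the old row, and FINAL shows the collapsed result immediately.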
In addition to the table, you will also notice that the two rows from the PostgreSQL table (public.tm) are inserted into ClickHouse.
public=# select * from public.tm;
-[ RECORD 1 ]-----------+-------------------------------------
id                      | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f71
secid                   | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
acc_id                  | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
ccatz                   | IDR
tcred                   | t
am                      | 200000.00000
set_date                | 2022-10-16 16:53:15.01957+00
created                 | 2022-10-16 16:53:15.01957+00
updated                 | 2022-10-16 16:53:15.01957+00
events_id               | b4763f4a-2e3f-41ae-9715-4ab113e2f53c
events_transaction_id   | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
events_status           |
events_payment_snapshot | {"Hello": "World"}
events_created          | 2022-10-16 16:53:15.01957+00
vid                     |
vtid                    |
vstatus                 |
vamount                 |
vcreated                |
vbilling_currency       |
-[ RECORD 2 ]-----------+-------------------------------------
id                      | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f73
secid                   | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
acc_id                  | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
ccatz                   | IDR
tcred                   | t
am                      | 200000.00000
set_date                | 2022-10-16 16:53:15.01957+00
created                 | 2022-10-16 16:53:15.01957+00
updated                 | 2022-10-16 16:53:15.01957+00
events_id               | b4763f4a-2e3f-41ae-9715-4ab113e2f53c
events_transaction_id   | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
events_status           |
events_payment_snapshot |
events_created          | 2022-10-16 16:53:15.01957+00
vid                     | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f71
vtid                    | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
vstatus                 | COMPLETED
vamount                 | 200000.00000
vcreated                | 2022-10-16 16:53:15.01957+00
vbilling_currency       | IDR
ClickHouse: On the ClickHouse side, you will notice that the tool automatically creates the database and the table schema, then inserts the data.
866908bb3f4a :)
select * from tm format Vertical;
SELECT *
FROM tm
FORMAT Vertical
Query id: acef9cbb-8fae-402f-aeec-9ebd710cd7dd
Row 1:
──────
id: 9cb52b2a-8ef2-4987-8856-c79a1b2c2f71
secid: 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
acc_id: 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
ccatz: IDR
tcred: true
am: 200000
set_date: 2022-10-16 16:53:15.019570
created: 2022-10-16 16:53:15.019570
updated: 2022-10-16 16:53:15.019570
events_id: b4763f4a-2e3f-41ae-9715-4ab113e2f53c
events_transaction_id: 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
events_status: ᴺᵁᴸᴸ
events_payment_snapshot: {"Hello": "World"}
events_created: 2022-10-16 16:53:15.019570
vid: ᴺᵁᴸᴸ
vtid: ᴺᵁᴸᴸ
vstatus: ᴺᵁᴸᴸ
vamount: ᴺᵁᴸᴸ
vcreated: ᴺᵁᴸᴸ
vbilling_currency: ᴺᵁᴸᴸ
_sign: 1
_version: 1689793587162
Row 2:
──────
id: 9cb52b2a-8ef2-4987-8856-c79a1b2c2f73
secid: 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
acc_id: 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
ccatz: IDR
tcred: true
am: 200000
set_date: 2022-10-16 16:53:15.019570
created: 2022-10-16 16:53:15.019570
updated: 2022-10-16 16:53:15.019570
events_id: b4763f4a-2e3f-41ae-9715-4ab113e2f53c
events_transaction_id: 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
events_status: ᴺᵁᴸᴸ
events_payment_snapshot: ᴺᵁᴸᴸ
events_created: 2022-10-16 16:53:15.019570
vid: 9cb52b2a-8ef2-4987-8856-c79a1b2c2f71
vtid: 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
vstatus: COMPLETED
vamount: 200000
vcreated: 2022-10-16 16:53:15.019570
vbilling_currency: IDR
_sign: 1
_version: 1689793587162
2 rows in set. Elapsed: 0.002 sec.
Real-time Replication:
Let’s try updating the data in PostgreSQL and check if the data gets replicated in ClickHouse.
public=# update public.tm set ccatz='IDR2' where 1=1;
ClickHouse: Notice the values are instantly updated in ClickHouse.
f8bc5c40b8fb 🙂 select ccatz from test.tm final;

SELECT ccatz
FROM test.tm
FINAL

Query id: f50455d9-b038-446f-a4c8-e543be248a83

┌─ccatz─┐
│ IDR2  │
│ IDR2  │
└───────┘

2 rows in set. Elapsed: 0.003 sec.
State Storage:
The state of the replication is stored in ClickHouse. The table and database names can be configured in the YAML file (offset.storage.jdbc.offset.table.name, schema.history.internal.jdbc.schema.history.table.name).
866908bb3f4a 🙂 select * from altinity_sink_connector.replica_source_info format Vertical;

SELECT *
FROM altinity_sink_connector.replica_source_info
FORMAT Vertical

Query id: 7e5bc89c-526d-4493-9857-ce44ea46787b

Row 1:
──────
id:                03a1f831-8cf7-4990-9ed5-dfeb9023403a
offset_key:        ["debezium-embedded-postgres",{"server":"embeddedconnector"}]
offset_val:        {"transaction_id":null,"lsn_proc":27468760,"messageType":"UPDATE","lsn":27468760,"txId":743,"ts_usec":1689794644747659}
record_insert_ts:  2023-07-19 19:24:05
record_insert_seq: 1

1 row in set. Elapsed: 0.002 sec.
Connecting to External PostgreSQL and ClickHouse
cd sink-connector-lightweight/docker
docker-compose -f docker-compose-postgres-standalone.yml up
Troubleshooting/Monitoring:
The logs are printed on the console and can be viewed using the docker logs command.
docker logs docker_debezium-embedded_1
The logs contain useful information on the status of the replication along with any failures (for example, failures connecting to PostgreSQL or ClickHouse, or permission issues).
We have also developed a tool (sink-connector-client) that can start/stop replication and monitor the status of replication.
cd sink-connector-client
./sink-connector-client

NAME:
   Sink Connector Lightweight CLI - CLI for Sink Connector Lightweight, operations to get status, start/stop replication and set binlog/gtid position

USAGE:
   sink-connector-client [global options] command [command options] [arguments...]

VERSION:
   1.0

COMMANDS:
   start_replica              Start the replication
   stop_replica               Stop the replication
   show_replica_status        Status of replication
   change_replication_source  Update binlog file/position and gtids
   lsn                        Update lsn(For postgreSQL)
   help, h                    Shows a list of commands or help for one command

GLOBAL OPTIONS:
   --host value   Host server address of sink connector
   --port value   Port of sink connector
   --secure       If true, then use https, else http
   --help, -h     show help
   --version, -v  print the version
Status of replication: The following command will print the lag, status (Running/stopped), and the last processed LSN from PostgreSQL.
./sink-connector-client show_replica_status
[
  {
    "Seconds_Behind_Source": 0
  },
  {
    "Replica_Running": true
  },
  {
    "Database": "test"
  },
  {
    "record_insert_ts": 2023-07-19T19,
    "offset_key": "[\"debezium-embedded-postgres\",{\"server\":\"embeddedconnector\"}]",
    "record_insert_seq": 1,
    "id": "9270ca3c-3340-4f07-819e-6d6fb8707595",
    "offset_val": "{\"transaction_id\":null,\"lsn_proc\":27409872,\"messageType\":\"UPDATE\",\"lsn\":27409872,\"txId\":740,\"ts_usec\":1689796710292558}"
  }
]
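If you want to consume that status programmatically, a small hypothetical helper (not part of sink-connector-client) can flatten the list-of-objects payload. The sample below reuses the first three entries of the output above:

```python
import json

# Hypothetical helper for scripting around show_replica_status output.
status_json = '''[
  {"Seconds_Behind_Source": 0},
  {"Replica_Running": true},
  {"Database": "test"}
]'''

def summarize(raw: str) -> dict:
    """Flatten the list-of-objects status payload into a single dict."""
    merged = {}
    for entry in json.loads(raw):
        merged.update(entry)
    return merged

status = summarize(status_json)
print(status["Seconds_Behind_Source"], status["Replica_Running"])  # 0 True
```

A script like this could feed the lag value into an alerting system, for example.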
Starting/Stopping replication: There is also a convenient option for temporarily stopping replication and starting it again.
./sink-connector-client stop_replica
2023/07/19 16:13:05 ***** Stopping replication..... *****
2023/07/19 16:13:06
2023/07/19 16:13:06 ***** Replication stopped successfully *****
./sink-connector-client start_replica
Next Steps:
We are actively working on adding new features. Some of the features we are working on next:
- Ability to start replication at a specific LSN position.
- A Grafana dashboard with the status of replication, lag, and ClickHouse metrics.
- A tool to perform the initial load/dump.
We encourage you to try the solution: https://github.com/Altinity/clickhouse-sink-connector. You can start discussions or create feature requests on the GitHub project.
Alternatively, you can also reach out to us directly on AltinityDB Slack if you have any questions.
About The Author
Kanthi Subramanian is a software engineer at Altinity. He has been focused on developing the Kafka Sink Connector for ClickHouse and the lightweight version that performs replication from popular transactional databases to ClickHouse.