Replicating data from PostgreSQL to ClickHouse with the Altinity Sink Connector

Transactional databases like PostgreSQL routinely process hundreds of thousands of transactions per second on busy web properties. 

For users analyzing such transactions, an analytic database like ClickHouse is a perfect fit, as it provides benefits like columnar storage, efficient data compression, and parallel query processing. These features allow users to get quick answers to complex questions, even in cases involving very large datasets. 

The faster transactions get from PostgreSQL into ClickHouse, the sooner users see those benefits. For this reason, we are introducing simple, 100% open source replication that moves data from PostgreSQL to ClickHouse in real time.

How it works

Our initial implementation of replication was based on the Kafka Connect framework. That approach has operational drawbacks, because the solution involves running multiple services on top of Kafka.

We therefore developed a newer solution that requires only a single executable to transfer data from PostgreSQL to ClickHouse.

Find it here: https://github.com/Altinity/clickhouse-sink-connector

Replicating data from PostgreSQL to ClickHouse with the Altinity Sink Connector Lightweight version

There are several challenges with migrating data from PostgreSQL to ClickHouse. The main goal for developing this solution is to minimize the manual effort in performing the migration. Here are some of the major benefits of the new approach.

  • Single executable: From an operational perspective, a single executable that needs no additional services is easier to run.
  • Streaming replication: In addition to loading the initial records from PostgreSQL into ClickHouse, the lightweight sink connector's main strength is streaming replication. Any change (insert, update, or delete) is replicated to ClickHouse in near real time.
  • On-prem/cloud/Kubernetes support: Depending on the industry, clients might run their workloads on-premises or in the cloud. Our primary objective is a solution that works everywhere and is easy to maintain, with monitoring capabilities.
  • Schema/datatype conversion: Mapping data types and their ranges usually takes a lot of time and effort. The SQL syntax also differs between the two databases, so a tool that creates the schema automatically avoids that manual work.
  • Batch inserts: ClickHouse performs well when rows are inserted in batches. Unlike transactional databases, ClickHouse is not designed to handle frequent updates and deletes.
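
The batch-insert point can be illustrated with a minimal Python sketch. This is not the connector's implementation: the class name `BatchBuffer`, the thresholds, and the `flush_fn` callback are hypothetical, chosen only to show change events being buffered and handed over in bulk rather than one row at a time.

```python
import time
from typing import Callable, List, Optional


class BatchBuffer:
    """Collects rows and hands them to flush_fn in batches (hypothetical helper)."""

    def __init__(self, flush_fn: Callable[[List[dict]], None],
                 max_rows: int = 1000, max_wait_s: float = 1.0) -> None:
        self.flush_fn = flush_fn
        self.max_rows = max_rows
        self.max_wait_s = max_wait_s
        self._rows: List[dict] = []
        self._first_row_at: Optional[float] = None

    def add(self, row: dict) -> None:
        if not self._rows:
            self._first_row_at = time.monotonic()
        self._rows.append(row)
        # Flush when the batch is full or the oldest buffered row has waited too long.
        # The age check only runs when a new row arrives; a real service would
        # also need a timer for idle periods.
        if len(self._rows) >= self.max_rows or self._waited_too_long():
            self.flush()

    def _waited_too_long(self) -> bool:
        return (self._first_row_at is not None
                and time.monotonic() - self._first_row_at >= self.max_wait_s)

    def flush(self) -> None:
        if self._rows:
            batch, self._rows = self._rows, []
            self._first_row_at = None
            self.flush_fn(batch)


# Stand-in for a bulk INSERT: each flushed batch is simply recorded in a list.
batches: List[List[dict]] = []
buffer = BatchBuffer(batches.append, max_rows=3, max_wait_s=60.0)
for i in range(7):
    buffer.add({"id": i})
buffer.flush()  # write whatever is left over
print([len(b) for b in batches])
```

With seven rows and a batch size of three, the stand-in writer is called three times instead of seven, which is the access pattern ClickHouse favors.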

We will walk through the steps involved in setting up the tool. 

Setup – Docker compose (PostgreSQL)

Step 1: Clone the repo with the following command.

git clone https://github.com/Altinity/clickhouse-sink-connector.git
cd clickhouse-sink-connector/sink-connector-lightweight

Prerequisites: Docker and Docker Compose need to be installed. https://docs.docker.com/compose/install/

Step 2: The PostgreSQL and ClickHouse connection settings live in a configuration file in YAML format. Use your favorite editor to update the configuration.

cd docker
vi config_postgres.yml

Step 3: Update the PostgreSQL connection information in config_postgres.yml: database.hostname, database.port, database.user, and database.password.

database.hostname: "postgres"
database.port: "5432"
database.user: "root"
database.password: "root"
database.server.name: "ER54"

Step 4: Enter the PostgreSQL tables that need to be replicated in the format schema_name.table_name.

table.include.list: public.employees

You can also use schema.include.list to replicate only the schemas that you need.

schema.include.list: public
plugin.name: "pgoutput"
table.include.list: "public.tm"
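
The include lists can be thought of as filters on fully qualified table names. The Python sketch below models that filtering under one assumption: that the connector passes table.include.list to the embedded Debezium engine unchanged, where each comma-separated entry is a regular expression that must match the whole `schema.table` identifier. The function name `is_included` is hypothetical.

```python
import re
from typing import List


def is_included(table_id: str, include_list: str) -> bool:
    """Return True if table_id fully matches any comma-separated pattern."""
    patterns: List[str] = [p.strip() for p in include_list.split(",") if p.strip()]
    # fullmatch means a pattern never matches just a substring of the identifier.
    return any(re.fullmatch(p, table_id) for p in patterns)


print(is_included("public.tm", "public.tm"))
print(is_included("public.tmp", "public.tm"))
print(is_included("sales.orders", "public.tm,sales\\..*"))
```

Under that assumption, `public.tm` selects only that table, while a pattern such as `sales\..*` would select every table in a schema named `sales`.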

Step 5: Update the ClickHouse information in config_postgres.yml: clickhouse.server.url, clickhouse.server.user, clickhouse.server.pass, and clickhouse.server.port.

Also update the ClickHouse information in the following fields, which are used to store the offset information: offset.storage.jdbc.url, offset.storage.jdbc.user, offset.storage.jdbc.password, schema.history.internal.jdbc.url, schema.history.internal.jdbc.user, and schema.history.internal.jdbc.password.

clickhouse.server.url: "clickhouse"
clickhouse.server.user: "root"
clickhouse.server.pass: "root"
clickhouse.server.port: "8123"
clickhouse.server.database: "test"

offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123"
offset.storage.jdbc.user: "root"
offset.storage.jdbc.password: "root"
schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123"
schema.history.internal.jdbc.user: "root"
schema.history.internal.jdbc.password: "root"

Step 6: Set snapshot.mode to initial if you want to replicate existing records. Otherwise, set snapshot.mode to schema_only to replicate the schema and only the records that are modified after the connector is started.
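
Before starting the connector, it can help to sanity-check the file edited in Steps 3 to 6. The following Python sketch is an illustration only and not part of the project: `parse_flat_config` and `check_config` are hypothetical helpers, the parser handles only flat `key: "value"` lines rather than general YAML, and the snapshot.mode check knows only the two values described in this article (the connector may accept others).

```python
from typing import Dict, List

# Keys named in Steps 3 and 5 of this article.
REQUIRED_KEYS = [
    "database.hostname", "database.port", "database.user", "database.password",
    "clickhouse.server.url", "clickhouse.server.user",
    "clickhouse.server.pass", "clickhouse.server.port",
]
# Only the two modes described in this article; other values may be valid.
KNOWN_SNAPSHOT_MODES = {"initial", "schema_only"}


def parse_flat_config(text: str) -> Dict[str, str]:
    """Parse simple `key: "value"` lines; this is not a general YAML parser."""
    config: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        # Split on the first colon only, so values such as JDBC URLs stay intact.
        key, value = line.split(":", 1)
        config[key.strip()] = value.strip().strip('"')
    return config


def check_config(config: Dict[str, str]) -> List[str]:
    """Return a list of human-readable problems; empty means nothing was found."""
    problems = [f"missing: {k}" for k in REQUIRED_KEYS if not config.get(k)]
    mode = config.get("snapshot.mode")
    if mode is not None and mode not in KNOWN_SNAPSHOT_MODES:
        problems.append(f"unexpected snapshot.mode: {mode}")
    return problems


sample = '''
database.hostname: "postgres"
database.port: "5432"
database.user: "root"
database.password: "root"
clickhouse.server.url: "clickhouse"
clickhouse.server.user: "root"
clickhouse.server.pass: "root"
snapshot.mode: "inital"
'''
print(check_config(parse_flat_config(sample)))
```

The sample deliberately omits clickhouse.server.port and misspells the snapshot mode, so both problems are reported.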

Step 7: Start the connector using docker compose and the docker compose yaml file.

# This starts PostgreSQL and ClickHouse containers
export SINK_LIGHTWEIGHT_VERSION=latest
cd sink-connector-lightweight/docker
docker-compose -f docker-compose-postgres.yml up

The docker-compose-postgres.yml file includes a sample PostgreSQL schema and inserts test data into PostgreSQL.

After the connector starts, you will notice that it reads the PostgreSQL schema and creates the corresponding ClickHouse tables. This behavior is enabled by setting auto.create.tables to true in the YAML file.

PostgreSQL table

 \d+ public.tm;
                                                             Table "public.tm"
         Column          |           Type           | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
-------------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
 id                      | uuid                     |           | not null |         | plain    |             |              | 
 secid                   | uuid                     |           |          |         | plain    |             |              | 
 acc_id                  | uuid                     |           |          |         | plain    |             |              | 
 ccatz                   | character varying        |           |          |         | extended |             |              | 
 tcred                   | boolean                  |           |          | false   | plain    |             |              | 
 am                      | numeric(21,5)            |           |          |         | main     |             |              | 
 set_date                | timestamp with time zone |           |          |         | plain    |             |              | 
 created                 | timestamp with time zone |           |          |         | plain    |             |              | 
 updated                 | timestamp with time zone |           |          |         | plain    |             |              | 
 events_id               | uuid                     |           |          |         | plain    |             |              | 
 events_transaction_id   | uuid                     |           |          |         | plain    |             |              | 
 events_status           | character varying        |           |          |         | extended |             |              | 
 events_payment_snapshot | jsonb                    |           |          |         | extended |             |              | 
 events_created          | timestamp with time zone |           |          |         | plain    |             |              | 
 vid                     | uuid                     |           |          |         | plain    |             |              | 
 vtid                    | uuid                     |           |          |         | plain    |             |              | 
 vstatus                 | character varying        |           |          |         | extended |             |              | 
 vamount                 | numeric(21,5)            |           |          |         | main     |             |              | 
 vcreated                | timestamp with time zone |           |          |         | plain    |             |              | 
 vbilling_currency       | character varying        |           |          |         | extended |             |              | 

ClickHouse Table

CREATE TABLE test.tm
(
    `id` UUID,
    `secid` Nullable(UUID),
    `acc_id` Nullable(UUID),
    `ccatz` Nullable(String),
    `tcred` Nullable(Bool),
    `am` Nullable(Decimal(21, 5)),
    `set_date` Nullable(DateTime64(6)),
    `created` Nullable(DateTime64(6)),
    `updated` Nullable(DateTime64(6)),
    `events_id` Nullable(UUID),
    `events_transaction_id` Nullable(UUID),
    `events_status` Nullable(String),
    `events_payment_snapshot` Nullable(String),
    `events_created` Nullable(DateTime64(6)),
    `vid` Nullable(UUID),
    `vtid` Nullable(UUID),
    `vstatus` Nullable(String),
    `vamount` Nullable(Decimal(21, 5)),
    `vcreated` Nullable(DateTime64(6)),
    `vbilling_currency` Nullable(String),
    `_sign` Int8,
    `_version` UInt64
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY tuple()
SETTINGS index_granularity = 8192

By default, the connector creates tables with the ReplacingMergeTree engine, so that updates and deletions can be handled.
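
Comparing the PostgreSQL and ClickHouse definitions above, the type conversion can be summarized as a small lookup. The Python sketch below covers only the types that appear in this example table and is inferred from the two listings; it is not the connector's actual mapping code, and `to_clickhouse_type` is a hypothetical name.

```python
import re


def to_clickhouse_type(pg_type: str, nullable: bool = True) -> str:
    """Map a PostgreSQL type name to the ClickHouse type seen in the example."""
    pg_type = pg_type.strip().lower()
    simple = {
        "uuid": "UUID",
        "character varying": "String",
        "boolean": "Bool",
        "timestamp with time zone": "DateTime64(6)",
        "jsonb": "String",  # JSON documents arrive as plain strings in the example
    }
    # numeric(p,s) keeps its precision and scale as Decimal(p, s).
    numeric = re.fullmatch(r"numeric\((\d+),\s*(\d+)\)", pg_type)
    if numeric:
        ch_type = f"Decimal({numeric.group(1)}, {numeric.group(2)})"
    elif pg_type in simple:
        ch_type = simple[pg_type]
    else:
        raise ValueError(f"type not covered by this sketch: {pg_type}")
    # Columns without NOT NULL are wrapped in Nullable(...), as in the listing.
    return f"Nullable({ch_type})" if nullable else ch_type


print(to_clickhouse_type("uuid", nullable=False))
print(to_clickhouse_type("numeric(21,5)"))
print(to_clickhouse_type("timestamp with time zone"))
```

The only column declared not null in the example, id, becomes a plain UUID, and every other column is wrapped in Nullable.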

In addition to the table, you will also notice that the two rows from the PostgreSQL table (public.tm) are inserted into ClickHouse.

public=# select * from public.tm;
-[ RECORD 1 ]-----------+-------------------------------------
id                      | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f71
secid                   | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
acc_id                  | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
ccatz                   | IDR
tcred                   | t
am                      | 200000.00000
set_date                | 2022-10-16 16:53:15.01957+00
created                 | 2022-10-16 16:53:15.01957+00
updated                 | 2022-10-16 16:53:15.01957+00
events_id               | b4763f4a-2e3f-41ae-9715-4ab113e2f53c
events_transaction_id   | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
events_status           | 
events_payment_snapshot | {"Hello": "World"}
events_created          | 2022-10-16 16:53:15.01957+00
vid                     | 
vtid                    | 
vstatus                 | 
vamount                 | 
vcreated                | 
vbilling_currency       | 
-[ RECORD 2 ]-----------+-------------------------------------
id                      | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f73
secid                   | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
acc_id                  | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
ccatz                   | IDR
tcred                   | t
am                      | 200000.00000
set_date                | 2022-10-16 16:53:15.01957+00
created                 | 2022-10-16 16:53:15.01957+00
updated                 | 2022-10-16 16:53:15.01957+00
events_id               | b4763f4a-2e3f-41ae-9715-4ab113e2f53c
events_transaction_id   | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
events_status           | 
events_payment_snapshot | 
events_created          | 2022-10-16 16:53:15.01957+00
vid                     | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f71
vtid                    | 9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
vstatus                 | COMPLETED
vamount                 | 200000.00000
vcreated                | 2022-10-16 16:53:15.01957+00
vbilling_currency       | IDR

ClickHouse: On the ClickHouse side, you will notice that the tool automatically creates the database and the table schema, then inserts the data.

866908bb3f4a :) select * from tm format Vertical;

SELECT *
FROM tm
FORMAT Vertical

Query id: acef9cbb-8fae-402f-aeec-9ebd710cd7dd

Row 1:
──────
id:                      9cb52b2a-8ef2-4987-8856-c79a1b2c2f71
secid:                   9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
acc_id:                  9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
ccatz:                   IDR
tcred:                   true
am:                      200000
set_date:                2022-10-16 16:53:15.019570
created:                 2022-10-16 16:53:15.019570
updated:                 2022-10-16 16:53:15.019570
events_id:               b4763f4a-2e3f-41ae-9715-4ab113e2f53c
events_transaction_id:   9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
events_status:           ᴺᵁᴸᴸ
events_payment_snapshot: {"Hello": "World"}
events_created:          2022-10-16 16:53:15.019570
vid:                     ᴺᵁᴸᴸ
vtid:                    ᴺᵁᴸᴸ
vstatus:                 ᴺᵁᴸᴸ
vamount:                 ᴺᵁᴸᴸ
vcreated:                ᴺᵁᴸᴸ
vbilling_currency:       ᴺᵁᴸᴸ
_sign:                   1
_version:                1689793587162

Row 2:
──────
id:                      9cb52b2a-8ef2-4987-8856-c79a1b2c2f73
secid:                   9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
acc_id:                  9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
ccatz:                   IDR
tcred:                   true
am:                      200000
set_date:                2022-10-16 16:53:15.019570
created:                 2022-10-16 16:53:15.019570
updated:                 2022-10-16 16:53:15.019570
events_id:               b4763f4a-2e3f-41ae-9715-4ab113e2f53c
events_transaction_id:   9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
events_status:           ᴺᵁᴸᴸ
events_payment_snapshot: ᴺᵁᴸᴸ
events_created:          2022-10-16 16:53:15.019570
vid:                     9cb52b2a-8ef2-4987-8856-c79a1b2c2f71
vtid:                    9cb52b2a-8ef2-4987-8856-c79a1b2c2f72
vstatus:                 COMPLETED
vamount:                 200000
vcreated:                2022-10-16 16:53:15.019570
vbilling_currency:       IDR
_sign:                   1
_version:                1689793587162

2 rows in set. Elapsed: 0.002 sec.

Real-time Replication:

Let’s try updating the data in PostgreSQL and check if the data gets replicated in ClickHouse.

public=# update public.tm set ccatz='IDR2' where 1=1;

ClickHouse: Notice that the values are updated in ClickHouse almost immediately.

f8bc5c40b8fb :) select ccatz from test.tm final;

SELECT ccatz
FROM test.tm
FINAL

Query id: f50455d9-b038-446f-a4c8-e543be248a83

┌─ccatz─┐
│ IDR2  │
│ IDR2  │
└───────┘

2 rows in set. Elapsed: 0.003 sec.
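
The FINAL query returns the updated values because each change arrives as a new row with a higher _version, and only the newest row per key is kept. The Python sketch below is a simplified model of that idea, not ClickHouse's merge logic. It assumes rows are deduplicated by id (in ClickHouse the key is whatever the table's ORDER BY specifies) and that a delete is recorded as a row with _sign = -1, which is an assumption about the connector's encoding; `select_final` is a hypothetical name.

```python
from typing import Dict, Iterable, List


def select_final(rows: Iterable[dict], key: str = "id") -> List[dict]:
    """Keep the highest-_version row per key, then drop rows marked as deleted."""
    latest: Dict[str, dict] = {}
    for row in rows:
        current = latest.get(row[key])
        # >= so that, for equal versions, the most recently inserted row wins.
        if current is None or row["_version"] >= current["_version"]:
            latest[row[key]] = row
    # The deleted-row filter is explicit here; it models a reader skipping _sign = -1.
    return [r for r in latest.values() if r["_sign"] == 1]


rows = [
    {"id": "a", "ccatz": "IDR", "_sign": 1, "_version": 1},
    {"id": "b", "ccatz": "IDR", "_sign": 1, "_version": 1},
    {"id": "a", "ccatz": "IDR2", "_sign": 1, "_version": 2},   # update of row a
    {"id": "b", "ccatz": "IDR", "_sign": -1, "_version": 3},   # delete of row b (assumed encoding)
]
print(select_final(rows))
```

In this model the update replaces the older version of row a, and row b disappears once its delete marker becomes the newest version.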

State Storage:

The replication state is stored in ClickHouse. The database and table names can be configured in the YAML file (offset.storage.offset.storage.jdbc.offset.table.name, schema.history.internal.jdbc.schema.history.table.name).

866908bb3f4a :) select * from altinity_sink_connector.replica_source_info format Vertical;

SELECT *
FROM altinity_sink_connector.replica_source_info
FORMAT Vertical

Query id: 7e5bc89c-526d-4493-9857-ce44ea46787b

Row 1:
──────
id:                03a1f831-8cf7-4990-9ed5-dfeb9023403a
offset_key:        ["debezium-embedded-postgres",{"server":"embeddedconnector"}]
offset_val:        {"transaction_id":null,"lsn_proc":27468760,"messageType":"UPDATE","lsn":27468760,"txId":743,"ts_usec":1689794644747659}
record_insert_ts:  2023-07-19 19:24:05
record_insert_seq: 1

1 row in set. Elapsed: 0.002 sec.
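
The offset_val column is a JSON document, so it is easy to inspect programmatically. The Python sketch below decodes the value shown above. It assumes that lsn is the 64-bit PostgreSQL log sequence number and that ts_usec is microseconds since the Unix epoch, which is consistent with the record_insert_ts in the same row; `format_lsn` and `describe_offset` are hypothetical helper names.

```python
import json
from datetime import datetime, timedelta, timezone


def format_lsn(lsn: int) -> str:
    """Render a 64-bit LSN in PostgreSQL's hexadecimal high/low notation."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def describe_offset(offset_val: str) -> dict:
    """Turn the stored offset JSON into a few human-readable fields."""
    data = json.loads(offset_val)
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return {
        "lsn": format_lsn(data["lsn"]),
        "tx_id": data["txId"],
        "event": data["messageType"],
        # Integer microseconds added via timedelta avoid float rounding.
        "event_time": epoch + timedelta(microseconds=data["ts_usec"]),
    }


offset_val = ('{"transaction_id":null,"lsn_proc":27468760,"messageType":"UPDATE",'
              '"lsn":27468760,"txId":743,"ts_usec":1689794644747659}')
info = describe_offset(offset_val)
print(info["lsn"], info["event"], info["event_time"].isoformat())
```

Formatting the LSN this way makes it directly comparable with values such as the output of pg_current_wal_lsn() on the PostgreSQL side.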

Connecting to External PostgreSQL and ClickHouse

To replicate between PostgreSQL and ClickHouse servers that run outside this Docker Compose setup, point the connection settings in the configuration file at those servers and start the connector with the standalone compose file.

cd sink-connector-lightweight/docker
docker-compose -f docker-compose-postgres-standalone.yml up

Troubleshooting/Monitoring:

The logs are printed on the console and can be viewed using the docker logs command.

docker logs docker_debezium-embedded_1

The logs contain useful information on the status of the replication, along with any failures (for example, failures to connect to PostgreSQL or ClickHouse, or permission issues).

We have also developed a tool (sink-connector-client) that is used to start and stop replication and to monitor its status.

cd sink-connector-client
./sink-connector-client
NAME:
   Sink Connector Lightweight CLI - CLI for Sink Connector Lightweight, operations to get status, start/stop replication and set binlog/gtid position

USAGE:
   sink-connector-client [global options] command [command options] [arguments...]

VERSION:
   1.0

COMMANDS:
   start_replica              Start the replication
   stop_replica               Stop the replication
   show_replica_status        Status of replication
   change_replication_source  Update binlog file/position and gtids
   lsn                        Update lsn(For postgreSQL)
   help, h                    Shows a list of commands or help for one command

GLOBAL OPTIONS:
   --host value   Host server address of sink connector
   --port value   Port of sink connector
   --secure       If true, then use https, else http
   --help, -h     show help
   --version, -v  print the version

Status of replication: The following command will print the lag, status (Running/stopped), and the last processed LSN from PostgreSQL.

./sink-connector-client show_replica_status
[
  {
    "Seconds_Behind_Source": 0
  },
  {
    "Replica_Running": true
  },
  {
    "Database": "test"
  },
  {
    "record_insert_ts": 2023-07-19T19,
    "offset_key": "[\"debezium-embedded-postgres\",{\"server\":\"embeddedconnector\"}]",
    "record_insert_seq": 1,
    "id": "9270ca3c-3340-4f07-819e-6d6fb8707595",
    "offset_val": "{\"transaction_id\":null,\"lsn_proc\":27409872,\"messageType\":\"UPDATE\",\"lsn\":27409872,\"txId\":740,\"ts_usec\":1689796710292558}"
  }
]
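
Because the status is reported as a list of small objects, a monitoring script would typically merge them before checking anything. The Python sketch below shows one way to do that, assuming the output has already been parsed into Python objects; the `max_lag_s` threshold and both function names are hypothetical and not part of sink-connector-client.

```python
from typing import Any, Dict, List


def flatten_status(status: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge the list of small objects into a single dictionary."""
    merged: Dict[str, Any] = {}
    for part in status:
        merged.update(part)
    return merged


def is_healthy(status: Dict[str, Any], max_lag_s: int = 60) -> bool:
    """Healthy means replication is running and the lag is within the threshold."""
    running = bool(status.get("Replica_Running"))
    # A missing lag value is treated as unhealthy rather than as zero.
    lag = status.get("Seconds_Behind_Source", max_lag_s + 1)
    return running and lag <= max_lag_s


status = flatten_status([
    {"Seconds_Behind_Source": 0},
    {"Replica_Running": True},
    {"Database": "test"},
])
print(is_healthy(status))
```

A check like this could feed an alert when replication stops or the lag grows beyond the chosen threshold.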

Starting/Stopping replication: There is also a convenient option for temporarily stopping replication and starting it again.

./sink-connector-client stop_replica
2023/07/19 16:13:05 ***** Stopping replication..... *****
2023/07/19 16:13:06
2023/07/19 16:13:06 ***** Replication stopped successfully *****
./sink-connector-client start_replica

Next Steps:
We are actively working on adding new features. Some of the features we are working on right now are:

  • Ability to start replication at a specific LSN position.
  • Grafana dashboard with status of replication, lag, and ClickHouse metrics.
  • Tool to perform initial load/dump.

We encourage you to try the solution: https://github.com/Altinity/clickhouse-sink-connector. You can start discussions or create feature requests on the GitHub project.

Alternatively, you can reach out to us directly on the AltinityDB Slack if you have any questions.

About The Author

Kanthi Subramanian is a software engineer at Altinity. He has been focusing on developing the Kafka Sink Connector for ClickHouse and its Lightweight version, which replicates data from popular transactional databases to ClickHouse.
