Building A Better ClickHouse® for Parquet in Practice

A few weeks ago we discussed how to read external Parquet data from ClickHouse. That article raised some performance concerns when querying big collections of Parquet files in AWS S3 buckets. We took those concerns very seriously and planned some improvements as part of the Building A Better ClickHouse roadmap. Time flies fast: Altinity engineers have already implemented some of the roadmap items. Here is a report on the progress before we get back to our research.

Reintroducing the Problem

In the previous article on this topic we used AWS public blockchain data as an example, in particular the public Bitcoin dataset located in the following S3 bucket:

  • s3://aws-public-blockchain/v1.0/btc/

The bucket contains multiple Parquet files with the following path pattern:

  • blocks/date={YYYY-MM-DD}/{id}.snappy.parquet
  • transactions/date={YYYY-MM-DD}/{id}.snappy.parquet

In order to read the data we created an overlay database, but we can read it using the s3() table function as well (see the sketch below):

CREATE DATABASE aws_public_blockchain
ENGINE = S3('s3://aws-public-blockchain/v1.0/btc/', 'NOSIGN')

SELECT uniq(_path), count()
FROM aws_public_blockchain."transactions/**.snappy.parquet"

┌─uniq(_path)─┬───count()──┐
│        5672 │ 1001204525 │
└─────────────┴────────────┘

1 row in set. Elapsed: 31.846 sec. Processed 1.00 billion rows, 0.00 B (31.44 million rows/s., 0.00 B/s.)
Peak memory usage: 10.88 GiB.
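
For reference, the same data can be read without the overlay database by pointing the s3() table function at the bucket directly. Here is a minimal sketch of an equivalent query (we let ClickHouse infer the Parquet format from the file names):

-- Equivalent to the query above, using the s3() table function instead of the overlay database
SELECT uniq(_path), count()
FROM s3('s3://aws-public-blockchain/v1.0/btc/transactions/**.snappy.parquet', NOSIGN)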

We also tried to run analytic queries on top of the Parquet data. In particular, we tested whether WHERE conditions are handled efficiently by ClickHouse:

SELECT date, count()
FROM aws_public_blockchain."transactions/**.snappy.parquet"
WHERE date>='2024-01-01'
GROUP BY date ORDER BY date

┌─date───────┬─count()─┐
│ 2024-01-01 │  657752 │
│ 2024-01-02 │  367319 │
│ 2024-01-03 │  502749 │
...
└────────────┴─────────┘

129 rows in set. Elapsed: 27.752 sec. Processed 57.97 million rows, 1.54 GB (2.09 million rows/s., 55.54 MB/s.)
Peak memory usage: 5.15 GiB.

The query takes almost 30 seconds. This is a full scan, since ClickHouse needs to check every Parquet file. Even though it can use the minimum and maximum values for the ‘date’ column stored in the row group headers of each Parquet file, it still needs to access all the files. That is quite inefficient. We can use a glob filter to select only the 2024 files:

SELECT date, count()
FROM aws_public_blockchain."transactions/date=2024*/*.snappy.parquet"
WHERE date>='2024-01-01'
GROUP BY date ORDER BY date
...
129 rows in set. Elapsed: 2.062 sec. Processed 55.44 million rows, 1.52 GB (26.89 million rows/s., 736.78 MB/s.)
Peak memory usage: 334.81 MiB.

Now it takes only 2 seconds, thanks to the glob pattern that pre-filters files! But it requires us to know how the data is organized in the bucket and to encode that knowledge in the table name itself, which is not quite SQL-ish.

We proposed a prototype cache implementation that can help with queries like the one above without requiring the user to know how the data is organized in the bucket. We created a reverse index table that maps column value ranges to file names and paths:

CREATE TABLE aws_public_blockchain_idx (
  _path String,
  _file String,
  column String,
  min_value String,
  max_value String
) Engine = MergeTree
PARTITION BY tuple()
ORDER BY (column, min_value)

Then we populated it with minimum and maximum values for every column by parsing the Parquet metadata:

INSERT INTO aws_public_blockchain_idx
SELECT _path, _file, column, min(min_value), max(max_value) from (
WITH arrayJoin(tupleElement(arrayJoin(row_groups), 'columns')) as row_groups_columns,
     tupleElement(row_groups_columns, 'statistics') as statistics
SELECT _path, _file,
        tupleElement(row_groups_columns, 'name') column,
        tupleElement(statistics, 'min') min_value,
        tupleElement(statistics, 'max') max_value
FROM s3('s3://aws-public-blockchain/v1.0/btc/transactions/**.snappy.parquet', NOSIGN, 'ParquetMetadata') 
)
GROUP BY _path, _file, column
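
Once populated, the index table is easy to inspect. For example, here is a small illustrative query that shows the date ranges covered by a few of the most recent files:

-- Peek at the per-file date ranges stored in the index (illustrative)
SELECT _path, min_value, max_value
FROM aws_public_blockchain_idx
WHERE column = 'date'
ORDER BY max_value DESC
LIMIT 5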

Finally, we used this index: we applied the WHERE condition to the index table first, extracted the matching paths, and then filtered on the virtual column ‘_path’:

SELECT date, count()
FROM aws_public_blockchain."transactions/**.snappy.parquet"
WHERE _path in (
     SELECT _path FROM aws_public_blockchain_idx
      WHERE column='date' and toDateOrNull(min_value)>='2024-01-01')
  AND date >= '2024-01-01'
GROUP BY date ORDER BY date
...
78 rows in set. Elapsed: 61.714 sec. Processed 32.13 million rows, 645.16 MB (520.65 thousand rows/s., 10.45 MB/s.)
Peak memory usage: 8.60 MiB.

That worked, but it took more than 60 seconds! Apparently, when the ‘_path’ virtual column was used in a filter condition, ClickHouse processed paths sequentially in a single thread. That was definitely a stain on the fastest analytical database on the planet that we could not let stand!

Fixing ClickHouse

One of the greatest things about open source is that if there is a problem, you can fix it yourself! But first we needed to understand what ClickHouse did wrong here.

The short answer is that files were read from S3 sequentially, in a single stream, because of a wrong estimate of the number of objects in the bucket.

In order to work fast, ClickHouse reads data in parallel when possible. This is true for reading from an S3 bucket as well: ClickHouse reads objects in multiple streams. In some cases, however, the number of reading streams can be greater than the number of objects in the bucket. For example, for a trivial query like “SELECT * FROM s3('s3://mybucket/file.parquet')” there is a single object, so there is no point in reading the bucket in many streams. (To be accurate, the object itself may be read in parallel, but that is a different story.) In order to avoid over-allocating reading streams, ClickHouse estimates the number of objects in the bucket and limits the number of reading streams accordingly.
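
As an aside, one way to peek at the planned parallelism is EXPLAIN PIPELINE, which prints the query pipeline including the source streams that will read the data. The exact output depends on the ClickHouse version and storage, so treat this as an illustrative check rather than part of the original investigation:

-- Illustrative: inspect how many parallel source streams are planned for the S3 read
EXPLAIN PIPELINE
SELECT count()
FROM aws_public_blockchain."transactions/**.snappy.parquet"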

This estimate was based on the number of objects retrieved by the initial processing of the glob pattern. In the query above, when the first 1000 objects were listed from the bucket, none of them matched the ‘_path’ filter condition. ClickHouse erroneously estimated that there were zero files to be read and switched to a “failsafe” mode: “something may be wrong, let’s continue query processing, but we’ll read all the files one by one in one stream”.

Once the problem was clear, it was straightforward to fix. Details can be found in PR #62120.

The Better ClickHouse Arrives!

Once the pull request was reviewed and merged, we could immediately test it on the same Altinity.Cloud demo server as before – builds from the ClickHouse master branch are uploaded to DockerHub under the ‘clickhouse/clickhouse-server:head’ image tag, so anyone can try it out. At the time of writing, this was ClickHouse version 24.5.1.679, so this feature will be fully available in the 24.5 release.

Result: the query now runs in 3 seconds!

SELECT date, count()
FROM aws_public_blockchain."transactions/**.snappy.parquet"
WHERE _path in (
     SELECT _path FROM aws_public_blockchain_idx
      WHERE column='date' and toDateOrNull(min_value)>='2024-01-01')
  AND date >= '2024-01-01'
GROUP BY date ORDER BY date
...
78 rows in set. Elapsed: 3.391 sec. Processed 32.13 million rows, 645.16 MB (9.47 million rows/s., 190.24 MB/s.)
Peak memory usage: 314.87 MiB.

So the fix was totally successful, and the query is now 18 times faster!

Now we can continue improving the metadata cache prototype. Note that the number of rows (78) has not changed since the original blog post. In fact, the last row is for ‘2024-03-18’ – the date when the index table was created. The cache is outdated. We could certainly re-generate it by hand, but that is a manual operation better avoided: we need to keep the cache consistent and update it periodically.
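
Before automating anything, the staleness itself is easy to confirm from the index table. Here is a small illustrative check of the newest date covered by the cache:

-- Newest date covered by the cached index (illustrative freshness check)
SELECT max(toDateOrNull(max_value)) AS newest_indexed_date
FROM aws_public_blockchain_idx
WHERE column = 'date'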

Is there a way to do that? Up until recently, the only mechanism for scheduled tasks in ClickHouse was external dictionaries. Dictionaries are refreshed on a schedule, which allows them to run external jobs; we use them pretty often to automate simple ETL tasks – see the “Analyzing DockerHub Pull Counts with Clickhouse and Altinity.Cloud” article for an example. At the end of 2023 a new feature, Refreshable Materialized Views, was implemented. It allows you to refresh tables and do some other interesting things, and in our case it works perfectly for refreshing the cache:

CREATE MATERIALIZED VIEW aws_public_blockchain_idx_update
REFRESH EVERY 1 DAY
TO aws_public_blockchain_idx
AS
SELECT _path, _file, column, min(min_value), max(max_value) from (
WITH arrayJoin(tupleElement(arrayJoin(row_groups), 'columns')) as row_groups_columns,
     tupleElement(row_groups_columns, 'statistics') as statistics
SELECT _path, _file,
        tupleElement(row_groups_columns, 'name') column,
        tupleElement(statistics, 'min') min_value,
        tupleElement(statistics, 'max') max_value
FROM s3('s3://aws-public-blockchain/v1.0/btc/transactions/**.snappy.parquet', NOSIGN, 'ParquetMetadata') 
)
GROUP BY _path, _file, column

So, what is happening here? First, we specified a REFRESH clause that tells ClickHouse to re-run the query every 1 DAY. Second, we used the ‘TO’ syntax in order to direct results to the previously created index table. Finally, there is the query itself that does the job.

Once created, we can check the status in the special system table:

SELECT * FROM system.view_refreshes FORMAT Vertical
Row 1:
──────
database:               default
view:                   aws_public_blockchain_idx_update
status:                 Scheduled
last_refresh_result:    Finished
last_refresh_time:      2024-05-08 21:11:11
last_success_time:      2024-05-08 21:11:11
duration_ms:            25171
next_refresh_time:      2024-05-10 00:00:00
remaining_dependencies: []
exception:              
refresh_count:          1
progress:               inf
elapsed:                0
read_rows:              5704
read_bytes:             90413536
...

We can see that the table has been populated in 25 seconds. The next update is scheduled for midnight on May 10th. What if we do not want to wait that long? Can we trigger it out of band? Yes, we can: there is a special SYSTEM REFRESH VIEW statement for exactly that purpose.

SYSTEM REFRESH VIEW aws_public_blockchain_idx_update
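
After triggering it, the same system table can be used to confirm that the manual refresh has finished, for example:

-- Confirm the manual refresh has completed (illustrative)
SELECT view, status, last_refresh_result, last_refresh_time
FROM system.view_refreshes
WHERE view = 'aws_public_blockchain_idx_update'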

Let’s get back to our blockchain query to confirm that the index is correct. 

SELECT date, count()
FROM aws_public_blockchain."transactions/**.snappy.parquet"
WHERE _path in (
     SELECT _path FROM aws_public_blockchain_idx
      WHERE column='date' and toDateOrNull(min_value)>='2024-01-01')
  AND date >= '2024-01-01'
GROUP BY date ORDER BY date

┌─date───────┬─count()─┐
│ 2024-01-01 │  657752 │
...
│ 2024-05-08 │  235928 │
└────────────┴─────────┘

129 rows in set. Elapsed: 4.383 sec. Processed 55.35 million rows, 1.41 GB (12.63 million rows/s., 320.62 MB/s.)
Peak memory usage: 452.16 MiB.

Now we can see data up to May 8th as expected!

Conclusion

ClickHouse can access external Parquet data using table functions like s3(), as the Altinity blog has demonstrated on numerous occasions. It works great with a single file, but until recently it struggled with collections of files. Using globs helps somewhat, but it requires an understanding of the file name patterns. We identified a few bottlenecks in the ClickHouse code and fixed them, thanks to open source. The fixes improved the performance of queries over blockchain data almost 20 times! We also used one neat new ClickHouse feature that allowed us to improve our metadata cache prototype.

This is by no means the end. In our “Building a Better ClickHouse” roadmap we described other improvements that make ClickHouse a better database for big data. We will keep working on them and will report on progress. If you are interested in joining the joy – welcome! ClickHouse is fun, and we are happy to share it with everybody!

P.S. We’re hiring ClickHouse engineers. If you are a wizard with C++ and database internals, check out our careers page.
