ClickHouse Cost-Efficiency in Action: Analyzing 500 Billion Rows on an Intel NUC

By Alexander Zaitsev on January 2nd, 2020

Cost-efficiency and performance are critical for big data analytics. For this reason a recent blog post from the ScyllaDB team caught our attention. They collected over 500 billion data points and were able to query them with a scan performance of 1B rows/sec. The test rig was a beefy and expensive packet.com cluster: 83 n2.xlarge.x86 instances, with 28 cores and 384 GB of RAM each. This is a nice demo of ScyllaDB cluster management. But looking at the numbers we realized it’s not very impressive as an example of efficient analytics. We can prove that using ClickHouse.

As ClickHouse users know, 1 billion rows per second is nothing special. Back-of-envelope math made it obvious that we could store 500B rows and analyze them quickly with a single ClickHouse server. Our first thought was to use a high-powered VM in Amazon. But even this seems like brute force if you have the right technology and technique. I have a three-year-old Intel NUC mini PC that we use for conference demos. Why not demonstrate using that? Let’s use some ClickHouse magic to store all of the data on cheap hardware and get better query performance than ScyllaDB.

Test methodology

The ScyllaDB test simulates the scenario of a million homes, all equipped with smart home sensors capable of reporting thermometer readings every minute, for a year’s worth of data. That equates to:

1,000,000 * 24 * 60 * 365 = 525.6 billion data samples for a single year.

The ScyllaDB team used a Go program to generate test data and load it into the database cluster. Once they loaded the data (unfortunately, they do not give numbers for load time), they ran another Go program on a cluster of worker instances (24 x c2.medium.x86) that issued queries to find temperature anomalies. Being mainly a key-value store, ScyllaDB required a lot of individual lookups by sensor and date pair, followed by aggregation of results. This is how they achieved 1B+ rows/sec scan performance.

Test hardware

For this test we’ll use my three-year-old NUC as mentioned above. Put side by side with the ScyllaDB setup, it looks like a herd of elephants vs. a mouse.

|           | ScyllaDB                                                                           | ClickHouse                |
|-----------|------------------------------------------------------------------------------------|---------------------------|
| Servers   | 83 x n2.xlarge.x86 for the database, 24 x c2.medium.x86 for the worker nodes       | 1 Intel NUC               |
| CPU cores | 83 x 28 cores (Intel Xeon Gold 5120) + 24 x 24 cores (AMD EPYC 7401p) = 2900 cores | 4 cores (Intel i7-6770HQ) |
| RAM       | 83 x 384 GB + 24 x 64 GB = 33408 GB                                                | 32 GB                     |
| Storage   | 83 x 3.8TB NVMe SSD + 24 x 960GB SSD = 338 TB NVMe SSD                             | 1 TB NVMe SSD             |

Here is a picture of my Intel NUC.  Note the inspiring performance on the ClickHouse sticker!  (OK, that result was on a bigger machine.)

ClickHouse approach

The ClickHouse schema to store sensor data is very simple and similar to ScyllaDB:

CREATE TABLE billy.readings (
    sensor_id Int32 Codec(DoubleDelta, LZ4),
    time DateTime Codec(DoubleDelta, LZ4),
    date ALIAS toDate(time),
    temperature Decimal(5,2) Codec(T64, LZ4)
) Engine = MergeTree
PARTITION BY toYYYYMM(time)
ORDER BY (sensor_id, time);

One trick here is that we do not need to store a ‘date’ column; we use ALIAS instead. ALIAS columns are not stored and are always calculated at query time. Conversion from DateTime to Date is very efficient, so we save storage in return for a modest computational cost.

Three columns require just 12 bytes per measurement uncompressed. Once the DoubleDelta and T64 codecs are applied, ClickHouse stores the data in as little as 1.3 bytes per measurement, significantly reducing storage requirements. Simple math gives us 525.6 billion rows * 1.3 bytes/row = 680GB+. That’s not much, so we can put everything on a single host as planned.
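The sizing above can be reproduced in a few lines of Python. This is only a sketch of the arithmetic: the 12 bytes per row assumes 4 bytes each for Int32, DateTime and Decimal(5,2), and 1.3 bytes per row is the compressed figure quoted in the text, not something the script measures.

```python
# Back-of-envelope sizing for the readings table.
SENSORS = 1_000_000
MINUTES_PER_DAY = 24 * 60
DAYS = 365

BYTES_PER_ROW_RAW = 12          # assumed: 3 columns x 4 bytes each
BYTES_PER_ROW_COMPRESSED = 1.3  # figure quoted in the article

rows = SENSORS * MINUTES_PER_DAY * DAYS
raw_bytes = rows * BYTES_PER_ROW_RAW
compressed_bytes = rows * BYTES_PER_ROW_COMPRESSED

print(f"rows:         {rows:,}")
print(f"uncompressed: {raw_bytes / 1e12:.2f} TB")
print(f"compressed:   {compressed_bytes / 1e9:.0f} GB")
```

The compressed estimate fits on the NUC's 1 TB SSD, while the uncompressed size would not.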

ScyllaDB used an exponential distribution of temperatures, generating a random number for every measurement. That does not match real-life scenarios well, where temperatures gradually rise through the day and fall at night for every sensor. I therefore decided to generate a different distribution using a composition of two sinusoids: one for the day and another one for the year. Every sensor also has a different average temperature, and a random error of up to 5% is applied as well. With a more realistic temperature distribution, consecutive values tend to change only slightly. This allows ClickHouse to compress measurements effectively with the T64 codec.

Data has been populated with the following query. The ‘broken’ sensor 473869 has been added as well. It generates anomalies for 5 minutes on August 27th. This is a needle in the haystack we are going to be searching for.

INSERT INTO billy.readings (sensor_id, time, temperature)
WITH
  toDateTime(toDate('2019-01-01')) as start_time, 
  1000000 as num_sensors,
  365 as num_days,
  24*60 as num_minutes,
  num_days * num_minutes as total_minutes
SELECT
  intDiv(number, num_minutes) % num_sensors as sensor_id, 
  start_time + (intDiv(number, num_minutes*num_sensors) as day)*24*60*60 + (number % num_minutes as minute)*60 time, 
  60 + 20*sin(cityHash64(sensor_id)) /* median deviation */
  + 15*sin(2*pi()/num_days*day) /* seasonal deviation */  
  + 10*sin(2*pi()/num_minutes*minute)*(1 + rand(1)%100/2000) /* daily deviation */
  + if(sensor_id = 473869 and 
       time between '2019-08-27 13:00:00' and '2019-08-27 13:05:00', -50 + rand(2)%100, 0) 
       /* sensor outage, generates huge error on 2019-08-27 */ 
  as temperature
FROM numbers_mt(525600000000)
SETTINGS max_block_size=1048576;

The load query uses the multithreaded number generator ‘numbers_mt’ and inserts data in (date, sensor_id, time) sort order. This order gives the best insertion performance when we are pushing ClickHouse to its limits. In a real application your ingestion pipeline is going to be different, of course.
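To see why the rows arrive in that order, here is a small Python sketch that mirrors the integer arithmetic of the INSERT query for a single value of `number`. The function name `decode` is made up for illustration; only the `intDiv` and `%` expressions come from the query.

```python
NUM_SENSORS = 1_000_000
NUM_MINUTES = 24 * 60  # readings per sensor per day

def decode(number):
    """Map a generator row number to (day, sensor_id, minute), as the query does."""
    sensor_id = (number // NUM_MINUTES) % NUM_SENSORS
    day = number // (NUM_MINUTES * NUM_SENSORS)
    minute = number % NUM_MINUTES
    return day, sensor_id, minute

# The minute changes fastest, then the sensor, then the day.
for n in (0, 1, NUM_MINUTES - 1, NUM_MINUTES, NUM_MINUTES * NUM_SENSORS):
    print(n, decode(n))
```

Increasing `number` walks through all minutes of one sensor, then moves to the next sensor, and only after a million sensors moves to the next day, which is the (date, sensor_id, time) order described above.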

Before running the INSERT statement above, let’s stop for a minute and look at the test queries. There were 3 of them:

  1. Find minimum and maximum temperature for 3 months of data, and sensor and date when it happened.
  2. Same query as #1 but excluding the sensor that has been returned by the first query.
  3. Same query as #2 but for the whole year of data.

ScyllaDB is not an analytical database, so it has to run a query for every sensor and date, store the results in a map and then extract min/max statistics. See the source code for details. ClickHouse can do it more efficiently inside the database itself.

Let’s also note that those queries find extreme temperatures by date and sensor at query time. It looks like an enormous overhead even if data can be cached. It would be better if we calculate daily statistics like max and min at insertion time. In this case we would only need to store two numbers per sensor per day — maximum and minimum temperature — reducing data size dramatically. One of ClickHouse’s signature features — materialized views with aggregating merge tree tables — does exactly this! Thus, we add the following materialized view:

CREATE MATERIALIZED VIEW billy.readings_daily(
  sensor_id Int32 Codec(DoubleDelta, LZ4),
  date Date Codec(DoubleDelta, LZ4),
  temp_min SimpleAggregateFunction(min, Decimal(5,2)),
  temp_max SimpleAggregateFunction(max, Decimal(5,2))
) Engine = AggregatingMergeTree
PARTITION BY toYYYYMM(date)
ORDER BY (sensor_id, date)
AS 
SELECT sensor_id, date, 
   min(temperature) as temp_min,
   max(temperature) as temp_max
FROM billy.readings
GROUP BY sensor_id, date;

The materialized view inserts the minimum and maximum temperature for every sensor and date in real time into an underlying table. ClickHouse does not just calculate min/max for every newly inserted block; it also re-aggregates during the background merge process, thereby keeping the number of rows optimal. At the end we should have 1440 times fewer rows in the aggregate than in the source table.

Note the SimpleAggregateFunction data type. This is an instruction to ClickHouse that it should apply ‘min’ or ‘max’ aggregation function during its merge process. More complicated functions require AggregateFunction data type with state and are more difficult to work with. Check out our webinar “ClickHouse and the Magic of Materialized Views” for more detail.
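The reason min and max can be re-aggregated during merges is that combining partial results gives the same answer as aggregating all rows at once. The Python sketch below illustrates that property only; it is a conceptual model with made-up function names and sample data, not how ClickHouse implements merges.

```python
def aggregate_block(rows):
    """Collapse (sensor_id, date, temperature) rows into per-key (min, max)."""
    out = {}
    for sensor_id, date, temp in rows:
        key = (sensor_id, date)
        lo, hi = out.get(key, (temp, temp))
        out[key] = (min(lo, temp), max(hi, temp))
    return out

def merge_parts(a, b):
    """Combine two partial aggregates by taking min of mins and max of maxes."""
    merged = dict(a)
    for key, (lo, hi) in b.items():
        if key in merged:
            mlo, mhi = merged[key]
            merged[key] = (min(mlo, lo), max(mhi, hi))
        else:
            merged[key] = (lo, hi)
    return merged

# Two hypothetical insert blocks that overlap on some (sensor, date) keys.
block1 = [(1, "2019-01-01", 20.5), (1, "2019-01-01", 18.0), (2, "2019-01-01", 30.0)]
block2 = [(1, "2019-01-01", 25.0), (2, "2019-01-01", 29.0), (2, "2019-01-02", 31.0)]

merged = merge_parts(aggregate_block(block1), aggregate_block(block2))
print(merged == aggregate_block(block1 + block2))

# One row per sensor per day instead of one per minute.
print(525_600_000_000 // (24 * 60))
```

Functions such as averages or quantiles do not have this property on their final values, which is why they need the AggregateFunction type with an intermediate state.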

Test results

Data loading is not blazingly fast. It progressed at a speed of 8-9M rows/sec on Intel NUC, and took 17.5 hours to complete. All 4 NUC CPU cores were busy, and the box warmed the room with its fan all night.
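As a quick sanity check, the average load rate implied by the total row count and the 17.5 hours can be computed directly. The sketch only restates numbers from the text.

```python
TOTAL_ROWS = 525_600_000_000
LOAD_HOURS = 17.5

def rows_per_second(rows, hours):
    """Average ingestion rate over the whole load."""
    return rows / (hours * 3600)

rate = rows_per_second(TOTAL_ROWS, LOAD_HOURS)
print(f"{rate / 1e6:.2f}M rows/sec")
```

The result is a little over 8.3M rows/sec, in line with the observed 8-9M.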

Once the data is loaded, we can check compression and row sizes:

SELECT
    table,
    sum(rows) rows, 
    count() parts, 
    sum(data_compressed_bytes) AS comp_bytes, 
    sum(data_uncompressed_bytes) AS uncomp_bytes, 
    round(uncomp_bytes/comp_bytes, 2) AS ratio,
    round(comp_bytes/rows,2) avg_row
FROM system.parts
WHERE active AND database = 'billy' and table like '%readings%'
group by table
order by table;


|table------------------|---------rows-|-parts-|---comp_bytes-|--uncomp_bytes-|-ratio-|-avg_row-|
| .inner.readings_daily |    365004003 |    16 |   1904346383 |    5110056042 |  2.68 |    5.22 |
| readings              | 525600000000 |   150 | 685201424872 | 6307200000000 |   9.2 |     1.3 |
|-----------------------|--------------|-------|--------------|---------------|-------|---------|

As you can see, we have 525.6B rows in the raw data table taking 685GB of storage, thanks to a compression ratio of about 9:1. Our aggregate table (‘.inner.readings_daily’ is the internal storage table for the materialized view) has only 365M rows and consumes 0.3% of the raw table’s disk space.
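The derived columns in the output can be recomputed from the raw byte counts. The short Python check below copies the numbers from the result table; the dictionary layout is just for illustration.

```python
# Figures copied from the system.parts query result.
readings = {"rows": 525_600_000_000, "comp": 685_201_424_872, "uncomp": 6_307_200_000_000}
daily = {"rows": 365_004_003, "comp": 1_904_346_383, "uncomp": 5_110_056_042}

ratio = readings["uncomp"] / readings["comp"]   # compression ratio of raw table
avg_row = readings["comp"] / readings["rows"]   # compressed bytes per row
share = daily["comp"] / readings["comp"]        # aggregate size vs raw size

print(f"ratio:   {ratio:.2f}")
print(f"avg_row: {avg_row:.2f} bytes")
print(f"share:   {share:.2%}")
```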

Let’s try to query the raw table first:

SELECT count()
FROM billy.readings

|-----count()--|
| 525600000000 |
|--------------|
1 rows in set. Elapsed: 0.003 sec. 

That was instant. Let’s look now for the latest date we have data for:

SELECT max(date)
FROM billy.readings
WHERE sensor_id = 1

|--max(date)-|
| 2019-12-31 |
|------------|
1 rows in set. Elapsed: 0.479 sec. Processed 942.08 thousand rows, 7.54 MB (1.97 million rows/s., 15.74 MB/s.) 

That was almost instant as well, but only because we had a usable primary key index.

Now let’s run a query to find temperature extremes:

SELECT 
    min(temperature) AS `Absolute Min`, 
    argMin((sensor_id, date), temperature) AS `Absolute Min Sensor`, 
    max(temperature) AS `Absolute Max`, 
    argMax((sensor_id, date), temperature) AS `Absolute Max Sensor`
FROM billy.readings
WHERE (date >= '2019-06-01') AND (date <= '2019-08-31')

Progress: 13.53 billion rows, 64.54 GB (317.92 million rows/s., 1.52 GB/s.)

It runs as fast as it can on a mini PC, scanning over 300 million rows per second, which is just 3-4 times slower than the ScyllaDB cluster. We can estimate that the query would take 20 to 30 minutes to complete at this pace. But we do not want to wait, so let’s cancel the query and run it using our materialized view instead:
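The time estimate can be checked with simple division. The sketch below assumes the scan rate stays constant at the value shown in the progress line. It computes two cases: reading every row in the table, and reading only the 92 days matched by the filter. Which of the two applies is an assumption here, not something measured above; the 20 to 30 minute figure is consistent with the first case.

```python
TOTAL_ROWS = 525_600_000_000
SCAN_RATE = 317.92e6                 # rows/sec, from the progress line
ROWS_PER_DAY = 1_000_000 * 24 * 60   # one reading per sensor per minute
SUMMER_DAYS = 30 + 31 + 31           # June, July, August

def scan_minutes(rows, rate=SCAN_RATE):
    """Minutes needed to scan `rows` at a constant rate."""
    return rows / rate / 60

full_table = scan_minutes(TOTAL_ROWS)
three_months = scan_minutes(SUMMER_DAYS * ROWS_PER_DAY)

print(f"whole table:  {full_table:.1f} min")
print(f"three months: {three_months:.1f} min")
```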

SELECT 
    min(temp_min) AS `Absolute Min`, 
    argMin((sensor_id, date), temp_min) AS `Absolute Min Sensor`, 
    max(temp_max) AS `Absolute Max`, 
    argMax((sensor_id, date), temp_max) AS `Absolute Max Sensor`
FROM billy.readings_daily
WHERE (date >= '2019-06-01') AND (date <= '2019-08-31')

|-Absolute Min-|-Absolute Min Sensor---|-Absolute Max-|-Absolute Max Sensor---|
|        -9.79 | (473869,'2019-08-27') |       147.31 | (998379,'2019-06-01') |
|--------------|-----------------------|--------------|-----------------------|
1 rows in set. Elapsed: 2.296 sec. Processed 92.00 million rows, 1.29 GB (40.06 million rows/s., 560.88 MB/s.)  

Our ‘broken’ sensor is there! It took us 2.3s to find it, whereas the ScyllaDB cluster took 110 seconds. The second test query excludes that sensor:

SELECT 
    min(temp_min) AS `Absolute Min`, 
    argMin((sensor_id, date), temp_min) AS `Absolute Min Sensor`, 
    max(temp_max) AS `Absolute Max`, 
    argMax((sensor_id, date), temp_max) AS `Absolute Max Sensor`
FROM billy.readings_daily 
WHERE date between '2019-06-01' and '2019-08-31' and sensor_id != 473869;

|-Absolute Min-|-Absolute Min Sensor-|-Absolute Max-|-Absolute Max Sensor---|
|        14.52 | (194,'2019-08-24')  |       147.31 | (998379,'2019-06-01') |
|--------------|---------------------|--------------|-----------------------|
1 rows in set. Elapsed: 2.196 sec. Processed 92.00 million rows, 1.29 GB (41.89 million rows/s., 586.47 MB/s.)  

The timing is about the same as in the previous query. The ScyllaDB cluster was faster here at 938ms, thanks to caching on its 83 data nodes with almost 32TB of memory in total. Our Intel NUC has 1000 times less memory, so caching is limited. Now let’s run the query for the whole year.

SELECT 
    min(temp_min) AS `Absolute Min`, 
    argMin((sensor_id, date), temp_min) AS `Absolute Min Sensor`, 
    max(temp_max) AS `Absolute Max`, 
    argMax((sensor_id, date), temp_max) AS `Absolute Max Sensor`
FROM billy.readings_daily 
WHERE date between '2019-01-01' and '2019-12-31' and sensor_id != 473869;

|-Absolute Min-|-Absolute Min Sensor-|-Absolute Max-|-Absolute Max Sensor---|
|        14.50 | (892,'2019-10-01')  |       155.08 | (507573,'2019-04-02') |
|--------------|---------------------|--------------|-----------------------|
1 rows in set. Elapsed: 8.634 sec. Processed 365.00 million rows, 5.11 GB (42.28 million rows/s., 591.85 MB/s.) 

8.6 seconds vs 542 seconds in ScyllaDB.
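Putting the three timings side by side makes the comparison easier to read. The numbers below are the elapsed times quoted in this article; the helper name `speedup` is only for illustration.

```python
# (query, ScyllaDB seconds, ClickHouse seconds), as quoted in the text.
results = [
    ("3 months", 110.0, 2.296),
    ("3 months, excluding broken sensor", 0.938, 2.196),
    ("full year, excluding broken sensor", 542.0, 8.634),
]

def speedup(scylla_s, clickhouse_s):
    """How many times faster ClickHouse was; below 1 means it was slower."""
    return scylla_s / clickhouse_s

for name, scylla_s, clickhouse_s in results:
    print(f"{name}: {speedup(scylla_s, clickhouse_s):.1f}x")
```

The first and third queries come out roughly 48 and 63 times faster on the NUC, while the cached second query is the one case where the ScyllaDB cluster wins.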

As you can see, ClickHouse can answer the same question 10 to 100 times faster using just a tiny box instead of an 83 + 24 node cluster. With a bigger ClickHouse server the results would be even better, and ClickHouse can scale out to a cluster as well. Of course, the ScyllaDB test queries raw data, while we have built a dedicated data structure in ClickHouse. The right way to think of it is as a special type of index for this use case that is easy to build and cheap to store. The raw data is still preserved, so we can easily query the broken sensor to see how it behaves.

SELECT
    toStartOfHour(time) AS h,
    bar(toFloat32(min(temperature)), -15, 50, 50) as " -10 to 50 range "
FROM billy.readings
WHERE (sensor_id = 473869) AND (date = '2019-08-27')
GROUP BY h
ORDER BY h ASC

┌───────────────────h─┬─ -10 to 50 range ──────────────────────────────────┐
│ 2019-08-27 00:00:00 │ █████████████████████████████████████████ │
│ 2019-08-27 01:00:00 │ ███████████████████████████████████████████ │
│ 2019-08-27 02:00:00 │ █████████████████████████████████████████████ │
│ 2019-08-27 03:00:00 │ ██████████████████████████████████████████████ │
│ 2019-08-27 04:00:00 │ ████████████████████████████████████████████████ │
│ 2019-08-27 05:00:00 │ █████████████████████████████████████████████████ │
│ 2019-08-27 06:00:00 │ █████████████████████████████████████████████████ │
│ 2019-08-27 07:00:00 │ ████████████████████████████████████████████████ │
│ 2019-08-27 08:00:00 │ ███████████████████████████████████████████████ │
│ 2019-08-27 09:00:00 │ ██████████████████████████████████████████████ │
│ 2019-08-27 10:00:00 │ ███████████████████████████████████████████ │
│ 2019-08-27 11:00:00 │ █████████████████████████████████████████ │
│ 2019-08-27 12:00:00 │ ████████████████████████████████████████ │
│ 2019-08-27 13:00:00 │ ████ │
│ 2019-08-27 14:00:00 │ ████████████████████████████████████ │
│ 2019-08-27 15:00:00 │ ██████████████████████████████████ │
│ 2019-08-27 16:00:00 │ █████████████████████████████████ │
│ 2019-08-27 17:00:00 │ █████████████████████████████████ │
│ 2019-08-27 18:00:00 │ █████████████████████████████████ │
│ 2019-08-27 19:00:00 │ █████████████████████████████████ │
│ 2019-08-27 20:00:00 │ ██████████████████████████████████ │
│ 2019-08-27 21:00:00 │ ████████████████████████████████████ │
│ 2019-08-27 22:00:00 │ █████████████████████████████████████ │
│ 2019-08-27 23:00:00 │ ████████████████████████████████████████ │
└─────────────────────┴────────────────────────────────────────────────────┘
24 rows in set. Elapsed: 0.082 sec. Processed 106.50 thousand rows, 1.28 MB (1.29 million rows/s., 15.50 MB/s.) 

Here I used another unique ClickHouse feature to visualise the results: the ‘bar’ function, which may evoke nostalgia for pre-Windows times. You can clearly see the drop at 13:00.

Conclusion

ScyllaDB’s impressive 1B rows/sec brute force scan performance is a cool marketing claim to demonstrate the potential of ScyllaDB to manage clusters. When it comes to real life IoT use cases, however, the choice of technology and proper application is much more important.

ClickHouse can do the same job much more efficiently using advanced compression and materialized views. A single ClickHouse server can be used to collect and monitor temperature data from 1,000,000 homes, find temperature anomalies, provide data for real-time visualisation and much more. Since it is a single server, setting it up, loading 500B rows and running sample queries is very easy. You can try it yourself during long winter evenings, sitting on a comfortable couch in front of a fire. Just don’t forget to check our blog for new articles! Happy New Year!

  1. Hello,
    I got this when creating the MATERIALIZED VIEW. Do you have any idea?

    Syntax error: failed at position 98 (line 3, col 8):

    CREATE MATERIALIZED VIEW billy.readings_daily(
    sensor_id Int32 Codec(DoubleDelta, LZ4),
    date Codec(DoubleDelta, LZ4),
    temp_min SimpleAggregateFunction(min, Decimal(5,2)),
    temp_max SimpleAggregateFunction(max, Decimal(5,2))
    ) Engine = AggregatingMergeTree
    PARTITION BY toYYYYMM(date)
    ORDER BY (sensor_id, date)
    AS
    SELECT sensor_id, toDate(time) as date,
    min(temperature) as temp_min,
    max(temperature) as temp_max
    FROM billy.readings
    GROUP BY sensor_id, date;

    Expected one of: DEFAULT, MATERIALIZED, ALIAS, COMMENT, CODEC

    1. Looks as though there’s a possible copy paste error. The third line should be:

      date Date Codec(DoubleDelta, LZ4),

      The Date datatype was missing. I fixed the article. Thanks for the double check!

      Also, if you want to load data, create the view then run the load script, which is given previously.

  2. Hello,
    You need to add the POPULATE command, otherwise the materialized view will be empty

    CREATE MATERIALIZED VIEW billy.readings_daily(
    sensor_id Int32 Codec(DoubleDelta, LZ4),
    date Date Codec(DoubleDelta, LZ4),
    temp_min SimpleAggregateFunction(min, Decimal(5,2)),
    temp_max SimpleAggregateFunction(max, Decimal(5,2))
    ) Engine = AggregatingMergeTree
    PARTITION BY toYYYYMM(date)
    ORDER BY (sensor_id, date)
    AS
    POPULATE
    SELECT sensor_id, date,
    min(temperature) as temp_min,
    max(temperature) as temp_max
    FROM billy.readings
    GROUP BY sensor_id, date;

    1. Hi Sergey,
      Your comment is correct. However, POPULATE would take too long, if it succeeded at all, with this amount of data. In fact, the MV was created upfront, before loading the data, but it appears in the middle of the story for easier reading.
      Cheers,
      Alexander

