Posted on Wed 16 January 2019 under Databases

1.1 Billion Taxi Rides: 108-core ClickHouse Cluster

ClickHouse is an open-source, column-oriented database. Its sweet spot is letting hundreds of analysts query un-rolled-up data quickly, even when tens of billions of new records arrive each day. The infrastructure costs supporting such a system can come in under $100K / year, and potentially half of that if usage permits. At one point Yandex Metrica's ClickHouse installation held tens of trillions of records. Beyond Yandex, ClickHouse has also seen success at Bloomberg and Cloudflare.

Two years ago I benchmarked the database using a single machine and it came out as the fastest free database software I'd seen complete the benchmark. Since then, its developers have continued to add features including support for Kafka, HDFS and ZStandard compression. Last year they added support for stacking compression methods so that delta-of-delta compression became possible. When compressing time-series data, gauge values can compress well with delta encoding but counters will do better with delta-of-delta encoding. Good compression has been a key to ClickHouse's performance.
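To make the gauge-versus-counter distinction concrete, here is a minimal sketch of the arithmetic behind the two encodings. The delta helper and the sample readings are made up for illustration; this only shows why the transformed values become easier to compress, not how ClickHouse implements its codecs.

```shell
# Hypothetical helper: print the difference between each value on stdin
# and the value before it (the first value itself is not emitted).
delta() {
    awk 'NR > 1 { print $1 - prev } { prev = $1 }'
}

# A gauge hovers around a level, so one pass of deltas is already small.
printf '%s\n' 20 21 20 22 21 | delta                # prints 1, -1, 2, -1

# A counter climbing at a steady rate has large, equal deltas;
# a second pass (delta-of-delta) reduces them to zeros.
printf '%s\n' 100 200 300 400 500 | delta | delta   # prints 0, 0, 0
```

Long runs of small or identical numbers are what a general-purpose compressor stacked on top handles best.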

ClickHouse is made up of 170K lines of C++ code when excluding 3rd-party libraries and is one of the smaller distributed database codebases. In contrast, SQLite doesn't support distribution and has 235K lines of C code. As of this writing, 207 engineers have contributed to ClickHouse and the rate of commits has been accelerating for some time.

In March of 2017, ClickHouse began maintaining a change log as an easy way to keep track of developments. They've also broken up the monolithic documentation file into a hierarchy of Markdown-based files. Issues and features for the software are tracked via GitHub and overall this software has become much more approachable in the past few years.

In this post, I'm going to take a look at ClickHouse's clustered performance on AWS EC2 using 36-core CPUs and NVMe storage.

UPDATE: A week after the initial publication of this post I re-ran the benchmark with an improved configuration and achieved much better results. This post has been updated to reflect those changes.

Launching an AWS EC2 Cluster

I'll be using three c5d.9xlarge EC2 instances for this post. They each contain 36 vCPUs, 72 GB of RAM, 900 GB of NVMe SSD storage and support 10 Gigabit networking. They cost $1.962 / hour each in eu-west-1 when launched on-demand. I'll be using Ubuntu Server 16.04 LTS for the operating system.
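As a quick sanity check on what the cluster costs to run, the arithmetic can be sketched as below. The cluster_cost helper is hypothetical and the figure covers on-demand instance hours only, using the rate quoted above.

```shell
# Hypothetical helper: on-demand cost in USD of running several
# identical instances for a number of hours.
#   $1 = instance count, $2 = price per instance-hour, $3 = hours
cluster_cost() {
    awk -v n="$1" -v price="$2" -v hours="$3" \
        'BEGIN { printf "%.3f\n", n * price * hours }'
}

cluster_cost 3 1.962 1    # one hour of the three-node cluster: 5.886
cluster_cost 3 1.962 24   # a full day: 141.264
```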

The firewall is set up so the machines can communicate with one another without restrictions but only my IPv4 address is white-listed to SSH into the cluster.

NVMe Storage, Up and Running

On each of the servers, I'll create an EXT4-formatted file system on the NVMe storage for ClickHouse to work from.

$ sudo mkfs -t ext4 /dev/nvme1n1
$ sudo mkdir /ch
$ sudo mount /dev/nvme1n1 /ch

Once that's set up you can see its mount point and that 783 GB of capacity is available on each of the systems.

$ lsblk
NAME        MAJ:MIN RM   SIZE RO TYPE MOUNTPOINT
loop0         7:0    0  87.9M  1 loop /snap/core/5742
loop1         7:1    0  16.5M  1 loop /snap/amazon-ssm-agent/784
nvme0n1     259:1    0     8G  0 disk
└─nvme0n1p1 259:2    0     8G  0 part /
nvme1n1     259:0    0 838.2G  0 disk /ch
$ df -h
Filesystem      Size  Used Avail Use% Mounted on
udev             35G     0   35G   0% /dev
tmpfs           6.9G  8.8M  6.9G   1% /run
/dev/nvme0n1p1  7.7G  967M  6.8G  13% /
tmpfs            35G     0   35G   0% /dev/shm
tmpfs           5.0M     0  5.0M   0% /run/lock
tmpfs            35G     0   35G   0% /sys/fs/cgroup
/dev/loop0       88M   88M     0 100% /snap/core/5742
/dev/loop1       17M   17M     0 100% /snap/amazon-ssm-agent/784
tmpfs           6.9G     0  6.9G   0% /run/user/1000
/dev/nvme1n1    825G   73M  783G   1% /ch
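The 900 GB drive showing up as 838.2G in lsblk comes down to units: the drive's capacity is quoted in decimal gigabytes while lsblk reports binary gibibytes. A small sketch of the conversion, where gb_to_gib is a made-up helper name:

```shell
# Hypothetical helper: convert decimal gigabytes (10^9 bytes)
# into binary gibibytes (2^30 bytes), rounded to one decimal place.
gb_to_gib() {
    awk -v gb="$1" 'BEGIN { printf "%.1f\n", gb * 1000000000 / 1073741824 }'
}

gb_to_gib 900   # prints 838.2
```

The further drop from 825G to 783G available in df most likely reflects EXT4's default of reserving 5% of blocks for the root user.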

The dataset I'll be using in this benchmark is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six-year period. My Billion Taxi Rides in Redshift blog post goes into detail on how I put this dataset together. The files are stored on AWS S3 so I'll configure the AWS CLI with my access and secret keys.

$ sudo apt update
$ sudo apt install awscli
$ aws configure

I'll set the client's concurrent requests limit to 100 so the files download quicker than they would with stock settings.

$ aws configure set \
    default.s3.max_concurrent_requests \
    100

I'll download the taxi ride dataset off of AWS S3 and store it on the NVMe drive on the first server. This dataset is ~104 GB when in GZIP-compressed, CSV format.

$ sudo mkdir -p /ch/csv
$ sudo chown -R ubuntu /ch/csv
$ aws s3 sync s3://<bucket>/csv /ch/csv

Installing ClickHouse

On all three machines, I'll install OpenJDK's Java 8 distribution as it's needed to run Apache ZooKeeper, which is a prerequisite of a distributed ClickHouse setup.

$ sudo apt update
$ sudo apt install \
    openjdk-8-jre \
    openjdk-8-jdk-headless

I then set the JAVA_HOME environment variable by adding the export line below to /etc/profile.

$ sudo vi /etc/profile
export JAVA_HOME=/usr
$ source /etc/profile

I'll then use Ubuntu's package management to install ClickHouse 18.16.1, glances and ZooKeeper on all three machines.

$ sudo apt-key adv \
    --keyserver hkp://keyserver.ubuntu.com:80 \
    --recv E0C56BD4
$ echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | \
    sudo tee /etc/apt/sources.list.d/clickhouse.list
$ sudo apt update

$ sudo apt install \
    clickhouse-client \
    clickhouse-server \
    glances \
    zookeeperd

I'll create a data directory for ClickHouse as well as some configuration overrides on all three servers.

$ sudo mkdir /ch/clickhouse
$ sudo chown -R clickhouse /ch/clickhouse

$ sudo mkdir -p /etc/clickhouse-server/conf.d
$ sudo vi /etc/clickhouse-server/conf.d/taxis.conf

These are the configuration overrides I'll be using.

<?xml version="1.0"?>
<yandex>
    <listen_host>0.0.0.0</listen_host>
    <path>/ch/clickhouse/</path>

    <remote_servers>
        <perftest_3shards>
            <shard>
                <replica>
                    <host>172.30.2.192</host>
                    <port>9000</port>
                 </replica>
            </shard>
            <shard>
                 <replica>
                    <host>172.30.2.162</host>
                    <port>9000</port>
                 </replica>
            </shard>
            <shard>
                 <replica>
                    <host>172.30.2.36</host>
                    <port>9000</port>
                 </replica>
            </shard>
        </perftest_3shards>
    </remote_servers>

    <zookeeper-servers>
        <node>
            <host>172.30.2.192</host>
            <port>2181</port>
        </node>
        <node>
            <host>172.30.2.162</host>
            <port>2181</port>
        </node>
        <node>
            <host>172.30.2.36</host>
            <port>2181</port>
        </node>
    </zookeeper-servers>

    <macros>
        <shard>03</shard>
        <replica>01</replica>
    </macros>
</yandex>

I'll then launch ZooKeeper and the ClickHouse Server on all three machines.

$ sudo /etc/init.d/zookeeper start
$ sudo service clickhouse-server start

Loading Data into ClickHouse

On the first server, I'll create a trips table that will hold the taxi trips dataset using the Log engine.

$ clickhouse-client --host=0.0.0.0
CREATE TABLE trips (
    trip_id                 UInt32,
    vendor_id               String,

    pickup_datetime         DateTime,
    dropoff_datetime        Nullable(DateTime),

    store_and_fwd_flag      Nullable(FixedString(1)),
    rate_code_id            Nullable(UInt8),
    pickup_longitude        Nullable(Float64),
    pickup_latitude         Nullable(Float64),
    dropoff_longitude       Nullable(Float64),
    dropoff_latitude        Nullable(Float64),
    passenger_count         Nullable(UInt8),
    trip_distance           Nullable(Float64),
    fare_amount             Nullable(Float32),
    extra                   Nullable(Float32),
    mta_tax                 Nullable(Float32),
    tip_amount              Nullable(Float32),
    tolls_amount            Nullable(Float32),
    ehail_fee               Nullable(Float32),
    improvement_surcharge   Nullable(Float32),
    total_amount            Nullable(Float32),
    payment_type            Nullable(String),
    trip_type               Nullable(UInt8),
    pickup                  Nullable(String),
    dropoff                 Nullable(String),

    cab_type                Nullable(String),

    precipitation           Nullable(Int8),
    snow_depth              Nullable(Int8),
    snowfall                Nullable(Int8),
    max_temperature         Nullable(Int8),
    min_temperature         Nullable(Int8),
    average_wind_speed      Nullable(Int8),

    pickup_nyct2010_gid     Nullable(Int8),
    pickup_ctlabel          Nullable(String),
    pickup_borocode         Nullable(Int8),
    pickup_boroname         Nullable(String),
    pickup_ct2010           Nullable(String),
    pickup_boroct2010       Nullable(String),
    pickup_cdeligibil       Nullable(FixedString(1)),
    pickup_ntacode          Nullable(String),
    pickup_ntaname          Nullable(String),
    pickup_puma             Nullable(String),

    dropoff_nyct2010_gid    Nullable(UInt8),
    dropoff_ctlabel         Nullable(String),
    dropoff_borocode        Nullable(UInt8),
    dropoff_boroname        Nullable(String),
    dropoff_ct2010          Nullable(String),
    dropoff_boroct2010      Nullable(String),
    dropoff_cdeligibil      Nullable(String),
    dropoff_ntacode         Nullable(String),
    dropoff_ntaname         Nullable(String),
    dropoff_puma            Nullable(String)
) ENGINE = Log;

I'll then decompress and load each of the CSV files into the trips table. The following completed in 55 minutes and 10 seconds. The data directory was 134 GB in size following this operation.

$ time (for FILENAME in /ch/csv/trips_x*.csv.gz; do
            echo "$FILENAME"
            gunzip -c "$FILENAME" | \
                clickhouse-client \
                    --host=0.0.0.0 \
                    --query="INSERT INTO trips FORMAT CSV"
        done)

The import rate was 155 MB/s of uncompressed CSV content. I suspect GZIP decompression was the bottleneck. It might have been quicker to decompress all of the GZIP files in parallel using xargs and then load the decompressed data. Below is what glances reported during the CSV import process.

$ sudo glances
ip-172-30-2-200 (Ubuntu 16.04 64bit / Linux 4.4.0-1072-aws)                                                                                                 Uptime: 0:11:42
CPU       8.2%  nice:     0.0%                           LOAD    36-core                           MEM      9.8%  active:    5.20G                           SWAP      0.0%
user:     6.0%  irq:      0.0%                           1 min:    2.24                            total:  68.7G  inactive:  61.0G                           total:       0
system:   0.9%  iowait:   1.3%                           5 min:    1.83                            used:   6.71G  buffers:   66.4M                           used:        0
idle:    91.8%  steal:    0.0%                           15 min:   1.01                            free:   62.0G  cached:    61.6G                           free:        0

NETWORK     Rx/s   Tx/s   TASKS 370 (507 thr), 2 run, 368 slp, 0 oth sorted automatically by cpu_percent, flat view
ens5        136b    2Kb
lo         343Mb  343Mb     CPU%  MEM%  VIRT   RES   PID USER        NI S    TIME+ IOR/s IOW/s Command
                           100.4   1.5 1.65G 1.06G  9909 ubuntu       0 S  1:01.33     0     0 clickhouse-client --host=0.0.0.0 --query=INSERT INTO trips FORMAT CSV
DISK I/O     R/s    W/s     85.1   0.0 4.65M  708K  9908 ubuntu       0 R  0:50.60   32M     0 gzip -d -c /ch/csv/trips_xac.csv.gz
loop0          0      0     54.9   5.1 8.14G 3.49G  8091 clickhous    0 S  1:44.23     0   45M /usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml
loop1          0      0      4.5   0.0     0     0   319 root         0 S  0:07.50    1K     0 kworker/u72:2
nvme0n1        0     3K      2.3   0.0 91.1M 28.9M  9912 root         0 R  0:01.56     0     0 /usr/bin/python3 /usr/bin/glances
nvme0n1p1      0     3K      0.3   0.0     0     0   960 root       -20 S  0:00.10     0     0 kworker/28:1H
nvme1n1    32.1M   495M      0.3   0.0     0     0  1058 root       -20 S  0:00.90     0     0 kworker/23:1H
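The parallel decompression idea mentioned above could look something like the sketch below. It wasn't part of this benchmark; parallel_gunzip is a hypothetical helper and the -print0, -0, -r and -P flags assume GNU findutils, as shipped with Ubuntu.

```shell
# Hypothetical helper: decompress every .csv.gz file under a directory,
# running several gunzip processes at once.
#   $1 = directory holding the .csv.gz files
#   $2 = number of gunzip processes to run in parallel
parallel_gunzip() {
    find "$1" -name '*.csv.gz' -print0 |
        xargs -0 -r -n 1 -P "$2" gunzip
}

# Example (not run here): one gunzip per vCPU, then load the plain CSVs.
# parallel_gunzip /ch/csv 36
# for FILENAME in /ch/csv/trips_x*.csv; do
#     clickhouse-client --host=0.0.0.0 \
#         --query="INSERT INTO trips FORMAT CSV" < "$FILENAME"
# done
```

gunzip replaces each .csv.gz file with its decompressed contents, so the drive needs room for the whole uncompressed dataset. At 155 MB/s over 55 minutes and 10 seconds that works out to roughly 513 GB.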

I'll first free up some space on the NVMe drive by removing the source CSV files before continuing.

$ sudo rm -fr /ch/csv

Converting into Columnar Form

ClickHouse's Log engine is a simple, append-only format with no primary key or index. In order to query the data faster, I'll convert it into a sorted, indexed columnar format using the MergeTree engine.

$ clickhouse-client --host=0.0.0.0

The following completed in 34 minutes and 50 seconds. The data directory was 237 GB in size following this operation.

CREATE TABLE trips_mergetree
    ENGINE = MergeTree(pickup_date, pickup_datetime, 8192)
    AS SELECT
        trip_id,
        CAST(vendor_id AS Enum8('1' = 1,
                                '2' = 2,
                                'CMT' = 3,
                                'VTS' = 4,
                                'DDS' = 5,
                                'B02512' = 10,
                                'B02598' = 11,
                                'B02617' = 12,
                                'B02682' = 13,
                                'B02764' = 14)) AS vendor_id,
        toDate(pickup_datetime)                 AS pickup_date,
        ifNull(pickup_datetime, toDateTime(0))  AS pickup_datetime,
        toDate(dropoff_datetime)                AS dropoff_date,
        ifNull(dropoff_datetime, toDateTime(0)) AS dropoff_datetime,
        assumeNotNull(store_and_fwd_flag)       AS store_and_fwd_flag,
        assumeNotNull(rate_code_id)             AS rate_code_id,

        assumeNotNull(pickup_longitude)         AS pickup_longitude,
        assumeNotNull(pickup_latitude)          AS pickup_latitude,
        assumeNotNull(dropoff_longitude)        AS dropoff_longitude,
        assumeNotNull(dropoff_latitude)         AS dropoff_latitude,
        assumeNotNull(passenger_count)          AS passenger_count,
        assumeNotNull(trip_distance)            AS trip_distance,
        assumeNotNull(fare_amount)              AS fare_amount,
        assumeNotNull(extra)                    AS extra,
        assumeNotNull(mta_tax)                  AS mta_tax,
        assumeNotNull(tip_amount)               AS tip_amount,
        assumeNotNull(tolls_amount)             AS tolls_amount,
        assumeNotNull(ehail_fee)                AS ehail_fee,
        assumeNotNull(improvement_surcharge)    AS improvement_surcharge,
        assumeNotNull(total_amount)             AS total_amount,
        assumeNotNull(payment_type)             AS payment_type_,
        assumeNotNull(trip_type)                AS trip_type,

        pickup AS pickup,
        dropoff AS dropoff,

        CAST(assumeNotNull(cab_type)
            AS Enum8('yellow' = 1, 'green' = 2))
                                AS cab_type,

        precipitation           AS precipitation,
        snow_depth              AS snow_depth,
        snowfall                AS snowfall,
        max_temperature         AS max_temperature,
        min_temperature         AS min_temperature,
        average_wind_speed      AS average_wind_speed,

        pickup_nyct2010_gid     AS pickup_nyct2010_gid,
        pickup_ctlabel          AS pickup_ctlabel,
        pickup_borocode         AS pickup_borocode,
        pickup_boroname         AS pickup_boroname,
        pickup_ct2010           AS pickup_ct2010,
        pickup_boroct2010       AS pickup_boroct2010,
        pickup_cdeligibil       AS pickup_cdeligibil,
        pickup_ntacode          AS pickup_ntacode,
        pickup_ntaname          AS pickup_ntaname,
        pickup_puma             AS pickup_puma,

        dropoff_nyct2010_gid    AS dropoff_nyct2010_gid,
        dropoff_ctlabel         AS dropoff_ctlabel,
        dropoff_borocode        AS dropoff_borocode,
        dropoff_boroname        AS dropoff_boroname,
        dropoff_ct2010          AS dropoff_ct2010,
        dropoff_boroct2010      AS dropoff_boroct2010,
        dropoff_cdeligibil      AS dropoff_cdeligibil,
        dropoff_ntacode         AS dropoff_ntacode,
        dropoff_ntaname         AS dropoff_ntaname,
        dropoff_puma            AS dropoff_puma
    FROM trips;

This is what glances looked like during the operation:

ip-172-30-2-200 (Ubuntu 16.04 64bit / Linux 4.4.0-1072-aws)                                                                                                 Uptime: 1:06:09
CPU      10.3%  nice:     0.0%                           LOAD    36-core                           MEM     16.1%  active:    13.3G                           SWAP      0.0%
user:     7.9%  irq:      0.0%                           1 min:    1.87                            total:  68.7G  inactive:  52.8G                           total:       0
system:   1.6%  iowait:   0.8%                           5 min:    1.76                            used:   11.1G  buffers:   71.8M                           used:        0
idle:    89.7%  steal:    0.0%                           15 min:   1.95                            free:   57.6G  cached:    57.2G                           free:        0

NETWORK     Rx/s   Tx/s   TASKS 367 (523 thr), 1 run, 366 slp, 0 oth sorted automatically by cpu_percent, flat view
ens5         1Kb    8Kb
lo           2Kb    2Kb     CPU%  MEM%  VIRT   RES   PID USER        NI S    TIME+ IOR/s IOW/s Command
                           241.9  12.8 20.7G 8.78G  8091 clickhous    0 S 30:36.73   34M  125M /usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml
DISK I/O     R/s    W/s      2.6   0.0 90.4M 28.3M  9948 root         0 R  1:18.53     0     0 /usr/bin/python3 /usr/bin/glances
loop0          0      0      1.3   0.0     0     0   203 root         0 S  0:09.82     0     0 kswapd0
loop1          0      0      0.3   0.1  315M 61.3M 15701 ubuntu       0 S  0:00.40     0     0 clickhouse-client --host=0.0.0.0
nvme0n1        0     3K      0.3   0.0     0     0     7 root         0 S  0:00.83     0     0 rcu_sched
nvme0n1p1      0     3K      0.0   0.0     0     0   142 root         0 S  0:00.22     0     0 migration/27
nvme1n1    25.8M   330M      0.0   0.0 59.7M 1.79M  2764 ubuntu       0 S  0:00.00     0     0 (sd-pam)

In the last benchmark, several columns were cast and re-computed. I found a number of those functions no longer worked properly on this dataset. In order to get around this, I removed the offending functions and loaded in the data without casting into more granular data types.

Distributing Data Across the Cluster

I'll be distributing the data across all three nodes in the cluster. To start, I'll create the table below on all three machines.

$ clickhouse-client --host=0.0.0.0
CREATE TABLE trips_mergetree_third (
    trip_id                 UInt32,
    vendor_id               String,
    pickup_date             Date,
    pickup_datetime         DateTime,
    dropoff_date            Date,
    dropoff_datetime        Nullable(DateTime),
    store_and_fwd_flag      Nullable(FixedString(1)),
    rate_code_id            Nullable(UInt8),
    pickup_longitude        Nullable(Float64),
    pickup_latitude         Nullable(Float64),
    dropoff_longitude       Nullable(Float64),
    dropoff_latitude        Nullable(Float64),
    passenger_count         Nullable(UInt8),
    trip_distance           Nullable(Float64),
    fare_amount             Nullable(Float32),
    extra                   Nullable(Float32),
    mta_tax                 Nullable(Float32),
    tip_amount              Nullable(Float32),
    tolls_amount            Nullable(Float32),
    ehail_fee               Nullable(Float32),
    improvement_surcharge   Nullable(Float32),
    total_amount            Nullable(Float32),
    payment_type            Nullable(String),
    trip_type               Nullable(UInt8),
    pickup                  Nullable(String),
    dropoff                 Nullable(String),

    cab_type                Nullable(String),

    precipitation           Nullable(Int8),
    snow_depth              Nullable(Int8),
    snowfall                Nullable(Int8),
    max_temperature         Nullable(Int8),
    min_temperature         Nullable(Int8),
    average_wind_speed      Nullable(Int8),

    pickup_nyct2010_gid     Nullable(Int8),
    pickup_ctlabel          Nullable(String),
    pickup_borocode         Nullable(Int8),
    pickup_boroname         Nullable(String),
    pickup_ct2010           Nullable(String),
    pickup_boroct2010       Nullable(String),
    pickup_cdeligibil       Nullable(FixedString(1)),
    pickup_ntacode          Nullable(String),
    pickup_ntaname          Nullable(String),
    pickup_puma             Nullable(String),

    dropoff_nyct2010_gid    Nullable(UInt8),
    dropoff_ctlabel         Nullable(String),
    dropoff_borocode        Nullable(UInt8),
    dropoff_boroname        Nullable(String),
    dropoff_ct2010          Nullable(String),
    dropoff_boroct2010      Nullable(String),
    dropoff_cdeligibil      Nullable(String),
    dropoff_ntacode         Nullable(String),
    dropoff_ntaname         Nullable(String),
    dropoff_puma            Nullable(String)
) ENGINE = MergeTree(pickup_date, pickup_datetime, 8192);

I'll then make sure the first server can see all three nodes in the cluster.

SELECT *
FROM system.clusters
WHERE cluster = 'perftest_3shards'
FORMAT Vertical;
Row 1:
──────
cluster:          perftest_3shards
shard_num:        1
shard_weight:     1
replica_num:      1
host_name:        172.30.2.192
host_address:     172.30.2.192
port:             9000
is_local:         1
user:             default
default_database:

Row 2:
──────
cluster:          perftest_3shards
shard_num:        2
shard_weight:     1
replica_num:      1
host_name:        172.30.2.162
host_address:     172.30.2.162
port:             9000
is_local:         0
user:             default
default_database:

Row 3:
──────
cluster:          perftest_3shards
shard_num:        3
shard_weight:     1
replica_num:      1
host_name:        172.30.2.36
host_address:     172.30.2.36
port:             9000
is_local:         0
user:             default
default_database:

I'll then define a new table on the first server that's based on the trips_mergetree_third schema and uses the Distributed engine.

CREATE TABLE trips_mergetree_x3
    AS trips_mergetree_third
    ENGINE = Distributed(perftest_3shards,
                         default,
                         trips_mergetree_third,
                         rand());
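With rand() as the sharding key and every shard carrying a weight of 1, each inserted row should land on one of the three shards with equal probability. The simulation below is only a sketch of that behaviour using awk's random number generator rather than ClickHouse's; simulate_sharding and its arguments are made up for illustration.

```shell
# Hypothetical simulation: assign rows to equally weighted shards by
# taking a random 32-bit integer modulo the shard count, then print
# "shard_number row_count" for each shard.
#   $1 = rows, $2 = shards, $3 = random seed
simulate_sharding() {
    awk -v rows="$1" -v shards="$2" -v seed="$3" 'BEGIN {
        srand(seed)
        for (i = 0; i < rows; i++) {
            key = int(rand() * 4294967296)   # stand-in for rand()
            count[key % shards + 1]++
        }
        for (s = 1; s <= shards; s++)
            print s, count[s] + 0
    }'
}

simulate_sharding 30000 3 1
```

Each shard should end up with close to a third of the rows, though the exact counts depend on the awk implementation and seed.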

I'll then copy the data out of the MergeTree-based table and onto all three servers. The following completed in 34 minutes and 44 seconds.

INSERT INTO trips_mergetree_x3
    SELECT * FROM trips_mergetree;

Following the above operation, I gave ClickHouse 15 minutes to recede from its storage high-water mark. The data directories ended up being 264 GB, 34 GB and 33 GB in size on the first, second and third servers respectively.

ClickHouse Cluster Benchmark

The following were the fastest times I saw after running each query multiple times on the trips_mergetree_x3 table.

$ clickhouse-client --host=0.0.0.0

The following completed in 2.449 seconds.

SELECT cab_type, count(*)
FROM trips_mergetree_x3
GROUP BY cab_type;

The following completed in 0.691 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_mergetree_x3
GROUP BY passenger_count;

The following completed in 0.582 seconds.

SELECT passenger_count,
       toYear(pickup_date) AS year,
       count(*)
FROM trips_mergetree_x3
GROUP BY passenger_count,
         year;

The following completed in 0.983 seconds.

SELECT passenger_count,
       toYear(pickup_date) AS year,
       round(trip_distance) AS distance,
       count(*)
FROM trips_mergetree_x3
GROUP BY passenger_count,
         year,
         distance
ORDER BY year,
         count(*) DESC;
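Picking the fastest of several runs can be scripted rather than done by hand. The sketch below is not necessarily how the timings above were collected; fastest and time_query are hypothetical helpers, and time_query assumes clickhouse-client's --time flag, which prints the elapsed seconds to stderr in non-interactive mode.

```shell
# Hypothetical helper: print the smallest of the numbers on stdin
# (one per line).
fastest() {
    awk 'NR == 1 || $1 + 0 < min { min = $1 + 0 } END { if (NR) print min }'
}

# Hypothetical helper: run a query several times, report the best time.
#   $1 = SQL text, $2 = number of runs
time_query() {
    run=0
    while [ "$run" -lt "$2" ]; do
        # Discard the result set and keep only the timing from stderr.
        clickhouse-client --host=0.0.0.0 --time --query="$1" 2>&1 >/dev/null
        run=$((run + 1))
    done | fastest
}

# Example (not run here):
# time_query "SELECT cab_type, count(*) FROM trips_mergetree_x3 GROUP BY cab_type" 10
```

Any error text written to stderr would confuse the parsing, so this is only suitable once the query is known to work.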

For comparison, I've run the same queries on the MergeTree-based table which sits solely on the first server.

ClickHouse Single-Node Benchmark

The following were the fastest times I saw after running each query multiple times on the trips_mergetree table.

The following completed in 0.241 seconds.

SELECT cab_type, count(*)
FROM trips_mergetree
GROUP BY cab_type;

The following completed in 0.826 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_mergetree
GROUP BY passenger_count;

The following completed in 1.209 seconds.

SELECT passenger_count,
       toYear(pickup_date) AS year,
       count(*)
FROM trips_mergetree
GROUP BY passenger_count,
         year;

The following completed in 1.781 seconds.

SELECT passenger_count,
       toYear(pickup_date) AS year,
       round(trip_distance) AS distance,
       count(*)
FROM trips_mergetree
GROUP BY passenger_count,
         year,
         distance
ORDER BY year,
         count(*) DESC;

Thoughts on the Results

This is the first time a free, CPU-based database has managed to out-perform a GPU-based database in my benchmarks. That GPU database has since undergone two revisions but nonetheless, the performance ClickHouse has found on a single node is very impressive.

That being said, there is an order of magnitude of overhead when running Query 1 on the distributed engine: 2.449 seconds versus 0.241 seconds on a single node. I'm hoping I've missed something in my research for this post because it would be good to see all query times drop as I add more nodes to the cluster. It's great that Queries 3 and 4 ran roughly 2x faster when distributed, though Query 2 only improved by about 1.2x.
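The ratios behind these remarks can be worked out from the timings reported above. A small sketch, where speedup is a hypothetical helper:

```shell
# Hypothetical helper: how many times faster the cluster was than the
# single node, i.e. single-node seconds divided by cluster seconds.
#   $1 = single-node seconds, $2 = cluster seconds
speedup() {
    awk -v single="$1" -v cluster="$2" \
        'BEGIN { printf "%.2f\n", single / cluster }'
}

speedup 0.241 2.449   # Query 1: prints 0.10 (roughly 10x slower)
speedup 0.826 0.691   # Query 2: prints 1.20
speedup 1.209 0.582   # Query 3: prints 2.08
speedup 1.781 0.983   # Query 4: prints 1.81
```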

It would be nice to see ClickHouse evolve in such a way that storage and compute could be decoupled so that they could scale independently. The HDFS support that has been added in the last year could be a step towards this. On the compute side, if a single query can be sped up as more nodes are added to the cluster then the future for this software will be very bright.

Thank you for taking the time to read this post. I offer both consulting and hands-on development services to clients in North America and Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2025 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.