ClickHouse is the workhorse of many services at Yandex and several other large Internet firms in Russia. These companies serve an audience of 258 million Russian speakers worldwide and have some of the greatest demands for distributed OLAP systems in Europe.
This year has seen good progress in ClickHouse's development and stability. Support has been added for HDFS, ZFS and Btrfs for both reading datasets and storing table data, a T64 codec has been introduced which can significantly improve ZStandard compression, LZ4 performance has been improved and tiered storage has arrived.
Anyone uncomfortable with the number of moving parts in a typical Hadoop setup might find assurance in ClickHouse being a single piece of software rather than a loose collection of several different projects. For anyone unwilling to pay for cloud hosting, ClickHouse can run off a laptop running macOS; paired with Python and Tableau there would be little reason to connect to the outside world for most analytical operations. Being written in C++ means there are no JVM configurations to consider when running in standalone mode.
ClickHouse relies heavily on third-party libraries, which helps keep its C++ code base at ~300K lines of code. By contrast, PostgreSQL's current master branch has about 800K lines of C and MySQL has 2M lines of C++. Thirteen developers have made at least 100 commits to the project this year; PostgreSQL has had only five developers reach the same mark and MySQL only two. ClickHouse's engineers have managed to deliver a new release every ten days on average for the past 2.5 years.
In this post I'm going to benchmark several ways of importing data into ClickHouse.
ClickHouse Up & Running
ClickHouse supports clustering but for the sake of simplicity I'll be using a single machine for these benchmarks. The machine in question has an Intel Core i5 4670K clocked at 3.4 GHz, 8 GB of DDR3 RAM and a SanDisk SDSSDHII960G 960 GB SSD connected via SATA. The machine is running a fresh installation of Ubuntu 16.04.2 LTS and I'll be running version 19.15.3.6 of ClickHouse.
The dataset used in this post is the first 80 million records from my 1.1 Billion Taxi Ride Benchmarks. The steps taken to produce this dataset are described here. The 80 million lines are broken up into four files of 20 million lines each. Each file has been produced in three formats: uncompressed CSV totalling 36.4 GB, GZIP-compressed CSV totalling 7.5 GB and Snappy-compressed Parquet totalling 7.6 GB.
Below I'll install ClickHouse 19.15.3.6, MySQL 5.7.27, PostgreSQL 11.5, OpenJDK and ZooKeeper for Kafka, and pigz, a parallel GZIP implementation.
$ sudo apt-key adv \
--keyserver hkp://keyserver.ubuntu.com:80 \
--recv E0C56BD4
$ wget -qO- https://www.postgresql.org/media/keys/ACCC4CF8.asc \
| sudo apt-key add -
$ echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" \
| sudo tee /etc/apt/sources.list.d/clickhouse.list
$ echo "deb http://apt.postgresql.org/pub/repos/apt/ xenial-pgdg main" \
| sudo tee /etc/apt/sources.list.d/pgdg.list
$ sudo apt update
$ sudo apt install \
clickhouse-client \
clickhouse-server \
mysql-server \
openjdk-8-jre \
openjdk-8-jdk-headless \
pigz \
postgresql \
postgresql-contrib \
zookeeperd
I'll be using TCP port 9000 for Hadoop's Name Node later in this post so I'll move ClickHouse's server port from 9000 to 9001.
$ sudo vi /etc/clickhouse-server/config.xml
Below is the line I changed in the file above.
<tcp_port>9001</tcp_port>
I'll then launch the ClickHouse Server.
$ sudo service clickhouse-server start
I'll save the password used to connect to the server in a configuration file along with the non-standard TCP port. This will save me having to specify these each time I connect using the ClickHouse Client.
$ mkdir -p ~/.clickhouse-client
$ vi ~/.clickhouse-client/config.xml
<config>
<port>9001</port>
<password>root</password>
</config>
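To check the client is picking up the port and password from this file, a simple query should connect without any extra flags (this assumes the password above matches the one configured for the server's default user).
$ clickhouse-client --query="SELECT version()"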
Throughout this post I'll be importing data into a ClickHouse table which uses the "Log" engine, one of ClickHouse's simplest storage engines: data is appended as-is without any sorting or indexing.
$ clickhouse-client
CREATE TABLE trips (
trip_id UInt32,
vendor_id String,
pickup_datetime DateTime,
dropoff_datetime Nullable(DateTime),
store_and_fwd_flag Nullable(FixedString(1)),
rate_code_id Nullable(UInt8),
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count Nullable(UInt8),
trip_distance Nullable(Float64),
fare_amount Nullable(Float32),
extra Nullable(Float32),
mta_tax Nullable(Float32),
tip_amount Nullable(Float32),
tolls_amount Nullable(Float32),
ehail_fee Nullable(Float32),
improvement_surcharge Nullable(Float32),
total_amount Nullable(Float32),
payment_type Nullable(String),
trip_type Nullable(UInt8),
pickup Nullable(String),
dropoff Nullable(String),
cab_type Nullable(String),
precipitation Nullable(Int8),
snow_depth Nullable(Int8),
snowfall Nullable(Int8),
max_temperature Nullable(Int8),
min_temperature Nullable(Int8),
average_wind_speed Nullable(Int8),
pickup_nyct2010_gid Nullable(Int8),
pickup_ctlabel Nullable(String),
pickup_borocode Nullable(Int8),
pickup_boroname Nullable(String),
pickup_ct2010 Nullable(String),
pickup_boroct2010 Nullable(String),
pickup_cdeligibil Nullable(FixedString(1)),
pickup_ntacode Nullable(String),
pickup_ntaname Nullable(String),
pickup_puma Nullable(String),
dropoff_nyct2010_gid Nullable(UInt8),
dropoff_ctlabel Nullable(String),
dropoff_borocode Nullable(UInt8),
dropoff_boroname Nullable(String),
dropoff_ct2010 Nullable(String),
dropoff_boroct2010 Nullable(String),
dropoff_cdeligibil Nullable(String),
dropoff_ntacode Nullable(String),
dropoff_ntaname Nullable(String),
dropoff_puma Nullable(String)
) ENGINE = Log;
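As a sanity check, the engine backing the table can be confirmed via the system.tables table.
SELECT database, name, engine
FROM system.tables
WHERE name = 'trips';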
Importing From MySQL
I'll first set up a login and password for MySQL.
$ sudo su
$ mysql -uroot -p
CREATE USER 'mark'@'localhost' IDENTIFIED BY 'test';
GRANT ALL PRIVILEGES ON *.* TO 'mark'@'localhost';
FLUSH PRIVILEGES;
$ exit
Then I'll create a database and a table that will store the 80 million trips dataset.
$ mysql -umark -p
CREATE DATABASE trips;
USE trips
CREATE TABLE trips (
trip_id INT,
vendor_id VARCHAR(3),
pickup_datetime DATETIME,
dropoff_datetime DATETIME,
store_and_fwd_flag VARCHAR(1),
rate_code_id SMALLINT,
pickup_longitude DECIMAL(18,14),
pickup_latitude DECIMAL(18,14),
dropoff_longitude DECIMAL(18,14),
dropoff_latitude DECIMAL(18,14),
passenger_count SMALLINT,
trip_distance DECIMAL(18,6),
fare_amount DECIMAL(18,6),
extra DECIMAL(18,6),
mta_tax DECIMAL(18,6),
tip_amount DECIMAL(18,6),
tolls_amount DECIMAL(18,6),
ehail_fee DECIMAL(18,6),
improvement_surcharge DECIMAL(18,6),
total_amount DECIMAL(18,6),
payment_type VARCHAR(3),
trip_type SMALLINT,
pickup VARCHAR(50),
dropoff VARCHAR(50),
cab_type VARCHAR(6),
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel VARCHAR(10),
pickup_borocode SMALLINT,
pickup_boroname VARCHAR(13),
pickup_ct2010 VARCHAR(6),
pickup_boroct2010 VARCHAR(7),
pickup_cdeligibil VARCHAR(1),
pickup_ntacode VARCHAR(4),
pickup_ntaname VARCHAR(56),
pickup_puma VARCHAR(4),
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel VARCHAR(10),
dropoff_borocode SMALLINT,
dropoff_boroname VARCHAR(13),
dropoff_ct2010 VARCHAR(6),
dropoff_boroct2010 VARCHAR(7),
dropoff_cdeligibil VARCHAR(1),
dropoff_ntacode VARCHAR(4),
dropoff_ntaname VARCHAR(56),
dropoff_puma VARCHAR(4),
PRIMARY KEY (trip_id)
);
I'll then import the four CSV files containing the 80 million trips into MySQL. Note I'm sourcing the data from the uncompressed CSVs.
LOAD DATA LOCAL INFILE '/home/mark/000000_0.csv'
INTO TABLE trips
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
LOAD DATA LOCAL INFILE '/home/mark/000000_1.csv'
INTO TABLE trips
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
LOAD DATA LOCAL INFILE '/home/mark/000000_2.csv'
INTO TABLE trips
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
LOAD DATA LOCAL INFILE '/home/mark/000000_3.csv'
INTO TABLE trips
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
The above took 20 minutes and 23 seconds to complete.
I'll launch ClickHouse's Client, create a table pointing to the trips table on MySQL and then import that data into a ClickHouse table which will store the records using the "Log" engine.
$ clickhouse-client
CREATE DATABASE mysql_db
ENGINE = MySQL('localhost:3306', 'trips', 'mark', 'test');
INSERT INTO trips
SELECT * FROM mysql_db.trips;
The above took 10 minutes and 57 seconds. MySQL's internal format needed 42 GB of space to store the dataset. The dataset is 9.9 GB when kept in ClickHouse's internal Log engine format. During the import I could see ClickHouse using 50% of a CPU core and MySQL needing 2.75 CPU cores of capacity. The disk throughput required never breached more than 20% of what the SSD is capable of.
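As an aside, MySQL's own accounting of the table's footprint can be pulled from information_schema; a rough check along these lines (the figure it reports won't match what's on disk exactly):
SELECT ROUND((data_length + index_length) / POWER(1024, 3), 1) AS size_gb
FROM information_schema.tables
WHERE table_schema = 'trips'
  AND table_name = 'trips';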
Importing From JSON
I'll dump the 80 million rows in ClickHouse to JSON.
$ clickhouse-client --query='SELECT * FROM trips FORMAT JSONEachRow' \
> out.json
The resulting JSON file is 100 GB in size and took 9 minutes and 22 seconds to produce.
Here is a sampling of the first three rows.
$ head -n3 out.json
{"trip_id":471162830,"vendor_id":"CMT","pickup_datetime":"2011-09-27 10:19:11","dropoff_datetime":"2011-09-27 10:24:19","store_and_fwd_flag":"N","rate_code_id":1,"pickup_longitude":-73.994426,"pickup_latitude":40.745873,"dropoff_longitude":-74.005886,"dropoff_latitude":40.745395,"passenger_count":1,"trip_distance":0.8,"fare_amount":4.9,"extra":0,"mta_tax":0.5,"tip_amount":0,"tolls_amount":0,"ehail_fee":0,"improvement_surcharge":0,"total_amount":5.4,"payment_type":"CSH","trip_type":0,"pickup":"1156","dropoff":"2098","cab_type":"yellow","precipitation":0,"snow_depth":0,"snowfall":0,"max_temperature":-6,"min_temperature":-50,"average_wind_speed":16,"pickup_nyct2010_gid":-124,"pickup_ctlabel":"91","pickup_borocode":1,"pickup_boroname":"Manhattan","pickup_ct2010":"009100","pickup_boroct2010":"1009100","pickup_cdeligibil":"I","pickup_ntacode":"MN13","pickup_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","pickup_puma":"3807","dropoff_nyct2010_gid":50,"dropoff_ctlabel":"99","dropoff_borocode":1,"dropoff_boroname":"Manhattan","dropoff_ct2010":"009900","dropoff_boroct2010":"1009900","dropoff_cdeligibil":"I","dropoff_ntacode":"MN13","dropoff_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","dropoff_puma":"3807"}
{"trip_id":471162831,"vendor_id":"VTS","pickup_datetime":"2011-09-15 17:14:00","dropoff_datetime":"2011-09-15 17:48:00","store_and_fwd_flag":"\u0000","rate_code_id":1,"pickup_longitude":-73.968207,"pickup_latitude":40.752457,"dropoff_longitude":-74.008213,"dropoff_latitude":40.74876,"passenger_count":1,"trip_distance":3.15,"fare_amount":17.7,"extra":1,"mta_tax":0.5,"tip_amount":0,"tolls_amount":0,"ehail_fee":0,"improvement_surcharge":0,"total_amount":19.2,"payment_type":"CSH","trip_type":0,"pickup":"1432","dropoff":"2098","cab_type":"yellow","precipitation":18,"snow_depth":0,"snowfall":0,"max_temperature":-12,"min_temperature":117,"average_wind_speed":21,"pickup_nyct2010_gid":-104,"pickup_ctlabel":"90","pickup_borocode":1,"pickup_boroname":"Manhattan","pickup_ct2010":"009000","pickup_boroct2010":"1009000","pickup_cdeligibil":"I","pickup_ntacode":"MN19","pickup_ntaname":"Turtle Bay-East Midtown","pickup_puma":"3808","dropoff_nyct2010_gid":50,"dropoff_ctlabel":"99","dropoff_borocode":1,"dropoff_boroname":"Manhattan","dropoff_ct2010":"009900","dropoff_boroct2010":"1009900","dropoff_cdeligibil":"I","dropoff_ntacode":"MN13","dropoff_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","dropoff_puma":"3807"}
{"trip_id":471162832,"vendor_id":"VTS","pickup_datetime":"2011-09-25 12:55:00","dropoff_datetime":"2011-09-25 13:05:00","store_and_fwd_flag":"\u0000","rate_code_id":1,"pickup_longitude":-73.99651,"pickup_latitude":40.72563,"dropoff_longitude":-74.007203,"dropoff_latitude":40.75135,"passenger_count":1,"trip_distance":2.48,"fare_amount":8.5,"extra":0,"mta_tax":0.5,"tip_amount":1,"tolls_amount":0,"ehail_fee":0,"improvement_surcharge":0,"total_amount":10,"payment_type":"CRD","trip_type":0,"pickup":"1471","dropoff":"2098","cab_type":"yellow","precipitation":0,"snow_depth":0,"snowfall":0,"max_temperature":11,"min_temperature":-45,"average_wind_speed":4,"pickup_nyct2010_gid":-65,"pickup_ctlabel":"55.02","pickup_borocode":1,"pickup_boroname":"Manhattan","pickup_ct2010":"005502","pickup_boroct2010":"1005502","pickup_cdeligibil":"I","pickup_ntacode":"MN23","pickup_ntaname":"West Village","pickup_puma":"3810","dropoff_nyct2010_gid":50,"dropoff_ctlabel":"99","dropoff_borocode":1,"dropoff_boroname":"Manhattan","dropoff_ct2010":"009900","dropoff_boroct2010":"1009900","dropoff_cdeligibil":"I","dropoff_ntacode":"MN13","dropoff_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","dropoff_puma":"3807"}
I'll split the JSON file into four files so that I can attempt parallel imports of the JSON data.
$ split --lines=20000000 out.json
The above operation produced four files of 25 GB each.
$ ls -alht xa*
-rw-rw-r-- 1 mark mark 25G Oct 15 04:13 xad
-rw-rw-r-- 1 mark mark 25G Oct 15 04:09 xac
-rw-rw-r-- 1 mark mark 25G Oct 15 04:06 xab
-rw-rw-r-- 1 mark mark 25G Oct 15 04:02 xaa
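Before importing, it's worth confirming each split holds the expected 20 million lines.
$ wc -l xa*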
I'll truncate the trips table in ClickHouse and then import the above four JSON files sequentially.
$ clickhouse-client --query="TRUNCATE TABLE trips"
$ for FILENAME in xa*; do
clickhouse-client \
--query="INSERT INTO trips FORMAT JSONEachRow" < $FILENAME
done
The above took 10 minutes and 11 seconds. The CPU usage during the import never breached 50%. The SSD achieved read speeds of 225 MB/s and writes of 40 MB/s.
I'll truncate the trips table and attempt a parallel import to see if I can reduce the import time.
$ clickhouse-client --query="TRUNCATE TABLE trips"
$ vi jobs
clickhouse-client --query=\"INSERT INTO trips FORMAT JSONEachRow\" < xaa
clickhouse-client --query=\"INSERT INTO trips FORMAT JSONEachRow\" < xab
clickhouse-client --query=\"INSERT INTO trips FORMAT JSONEachRow\" < xac
clickhouse-client --query=\"INSERT INTO trips FORMAT JSONEachRow\" < xad
$ cat jobs | xargs -n1 -P4 -I% bash -c "%"
The above attempt with four parallel processes failed with the following error.
Code: 209. DB::NetException: Timeout exceeded while reading from socket ([::1]:9000): while receiving packet from localhost:9000
I'll attempt a parallel import with two processes instead.
$ cat jobs | xargs -n1 -P2 -I% bash -c "%"
The above took 8 minutes and 43 seconds.
Since the SSD is showing such a high read rate I'll compress the JSON files and attempt a sequential import.
$ pigz --keep xa*
9.2 GB of GZIP-compressed JSON files were produced by the above operation. I'll then truncate the trips table and import the GZIP-compressed data.
$ clickhouse-client --query="TRUNCATE TABLE trips"
$ for FILENAME in xa*.gz; do
gunzip -c $FILENAME \
| clickhouse-client \
--query="INSERT INTO trips FORMAT JSONEachRow"
done
The above took 14 minutes and 9 seconds.
Importing From PostgreSQL
I'll create credentials in PostgreSQL, a database and then a table which will store the 80 million trips.
$ sudo -u postgres \
bash -c "psql -c \"CREATE USER mark
WITH PASSWORD 'test'
SUPERUSER;\""
$ createdb trips
$ psql trips
CREATE TABLE trips (
trip_id INT,
vendor_id VARCHAR(3),
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag VARCHAR(1),
rate_code_id SMALLINT,
pickup_longitude DECIMAL(18,14),
pickup_latitude DECIMAL(18,14),
dropoff_longitude DECIMAL(18,14),
dropoff_latitude DECIMAL(18,14),
passenger_count SMALLINT,
trip_distance DECIMAL(18,6),
fare_amount DECIMAL(18,6),
extra DECIMAL(18,6),
mta_tax DECIMAL(18,6),
tip_amount DECIMAL(18,6),
tolls_amount DECIMAL(18,6),
ehail_fee DECIMAL(18,6),
improvement_surcharge DECIMAL(18,6),
total_amount DECIMAL(18,6),
payment_type VARCHAR(3),
trip_type SMALLINT,
pickup VARCHAR(50),
dropoff VARCHAR(50),
cab_type VARCHAR(6),
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel VARCHAR(10),
pickup_borocode SMALLINT,
pickup_boroname VARCHAR(13),
pickup_ct2010 VARCHAR(6),
pickup_boroct2010 VARCHAR(7),
pickup_cdeligibil VARCHAR(1),
pickup_ntacode VARCHAR(4),
pickup_ntaname VARCHAR(56),
pickup_puma VARCHAR(4),
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel VARCHAR(10),
dropoff_borocode SMALLINT,
dropoff_boroname VARCHAR(13),
dropoff_ct2010 VARCHAR(6),
dropoff_boroct2010 VARCHAR(7),
dropoff_cdeligibil VARCHAR(1),
dropoff_ntacode VARCHAR(4),
dropoff_ntaname VARCHAR(56),
dropoff_puma VARCHAR(4)
);
I will then import the 80 million records from four CSV files sequentially.
\copy trips FROM '000000_0.csv' DELIMITER ',' CSV
\copy trips FROM '000000_1.csv' DELIMITER ',' CSV
\copy trips FROM '000000_2.csv' DELIMITER ',' CSV
\copy trips FROM '000000_3.csv' DELIMITER ',' CSV
The above completed in 11 minutes and 32 seconds. Neither the SSD nor the CPU were at capacity during the above operation. It's worth looking at tuning PostgreSQL, examining alternative import methods and seeing if parallelism could have a strong effect on import times. The resulting dataset is 31 GB in PostgreSQL's internal format.
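PostgreSQL's size helpers give a quick way to see that footprint from psql; something along these lines:
SELECT pg_size_pretty(pg_total_relation_size('trips'));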
I'll truncate the trips table in ClickHouse and import the dataset from PostgreSQL using UNIX pipes to deliver CSV data to ClickHouse.
$ clickhouse-client --query="TRUNCATE TABLE trips"
$ psql trips -c "COPY trips TO STDOUT WITH CSV" \
| clickhouse-client --query="INSERT INTO trips FORMAT CSV"
The above took 9 minutes and 39 seconds. The CPU showed 70% of capacity being utilised while the SSD showed peaks of 60 MB/s being read and 120 MB/s being written at any one time.
Importing From Kafka
I'll install Kafka manually using the binary package distributed by one of Apache's mirrors.
$ sudo mkdir -p /opt/kafka
$ wget -c -O kafka.tgz \
http://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
$ sudo tar xzvf kafka.tgz \
--directory=/opt/kafka \
--strip 1
I'll then create a log file for Kafka which will be owned by my UNIX account.
$ sudo touch /var/log/kafka.log
$ sudo chown mark /var/log/kafka.log
I'll then launch Kafka's server process.
$ sudo nohup /opt/kafka/bin/kafka-server-start.sh \
/opt/kafka/config/server.properties \
> /var/log/kafka.log 2>&1 &
I'll create a trips topic in Kafka.
$ /opt/kafka/bin/kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic trips
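The topic's partition count and replication factor can be double-checked before producing any data into it.
$ /opt/kafka/bin/kafka-topics.sh \
    --describe \
    --zookeeper localhost:2181 \
    --topic trips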
I'll then import the CSV data into the trips topic in Kafka.
$ cat 000000_[0-3].csv \
| /opt/kafka/bin/kafka-console-producer.sh \
--topic trips \
--broker-list localhost:9092 \
1>/dev/null
The above took 8 minutes and 30 seconds. During that time the CPU was at 90% of capacity and the SSD achieved read and write speeds between 70 - 100 MB/s at any one time. The resulting dataset is 36 GB in Kafka's internal format.
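To confirm all 80 million messages landed in the topic, Kafka's GetOffsetShell tool will print the latest offset for the single partition created above; a quick check:
$ /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list localhost:9092 \
    --topic trips \
    --time -1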
I'll truncate the trips table in ClickHouse, then create a table in ClickHouse which points to the trips Kafka topic and then import its contents into the trips table in ClickHouse.
$ clickhouse-client --query="TRUNCATE TABLE trips"
$ clickhouse-client
CREATE TABLE trips_kafka_csv (
trip_id UInt32,
vendor_id String,
pickup_datetime DateTime,
dropoff_datetime Nullable(DateTime),
store_and_fwd_flag Nullable(FixedString(1)),
rate_code_id Nullable(UInt8),
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count Nullable(UInt8),
trip_distance Nullable(Float64),
fare_amount Nullable(Float32),
extra Nullable(Float32),
mta_tax Nullable(Float32),
tip_amount Nullable(Float32),
tolls_amount Nullable(Float32),
ehail_fee Nullable(Float32),
improvement_surcharge Nullable(Float32),
total_amount Nullable(Float32),
payment_type Nullable(String),
trip_type Nullable(UInt8),
pickup Nullable(String),
dropoff Nullable(String),
cab_type Nullable(String),
precipitation Nullable(Int8),
snow_depth Nullable(Int8),
snowfall Nullable(Int8),
max_temperature Nullable(Int8),
min_temperature Nullable(Int8),
average_wind_speed Nullable(Int8),
pickup_nyct2010_gid Nullable(Int8),
pickup_ctlabel Nullable(String),
pickup_borocode Nullable(Int8),
pickup_boroname Nullable(String),
pickup_ct2010 Nullable(String),
pickup_boroct2010 Nullable(String),
pickup_cdeligibil Nullable(FixedString(1)),
pickup_ntacode Nullable(String),
pickup_ntaname Nullable(String),
pickup_puma Nullable(String),
dropoff_nyct2010_gid Nullable(UInt8),
dropoff_ctlabel Nullable(String),
dropoff_borocode Nullable(UInt8),
dropoff_boroname Nullable(String),
dropoff_ct2010 Nullable(String),
dropoff_boroct2010 Nullable(String),
dropoff_cdeligibil Nullable(String),
dropoff_ntacode Nullable(String),
dropoff_ntaname Nullable(String),
dropoff_puma Nullable(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'trips',
kafka_group_name = 'group1',
kafka_format = 'CSV',
kafka_num_consumers = 4;
INSERT INTO trips
SELECT * FROM trips_kafka_csv;
During the above operation the CPU was at 50% capacity and there was almost no disk activity to speak of. The progress indicator showed 3,170 rows per second being imported into ClickHouse.
→ Progress: 5.67 million rows, 3.33 GB (3.17 thousand rows/s., 1.86 MB/s.)
This is a capture from top during the above operation.
top - 02:53:24 up 1:30, 3 users, load average: 0.99, 2.14, 1.75
Tasks: 206 total, 2 running, 204 sleeping, 0 stopped, 0 zombie
%Cpu0 : 0.4 us, 0.0 sy, 0.0 ni, 99.6 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu1 : 1.6 us, 0.4 sy, 0.0 ni, 98.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu2 : 8.3 us, 91.7 sy, 0.0 ni, 0.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu3 : 0.4 us, 0.4 sy, 0.0 ni, 99.3 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem : 24.3/8157556 [||||||||||||||||||||||| ]
KiB Swap: 1.2/8385532 [| ]
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
11422 clickho+ 20 0 5544596 417140 29588 S 150.2 5.1 3:24.86 /usr/bin/clickhouse-server
30574 root 20 0 5730472 998060 10408 S 0.7 12.2 8:58.01 java -Xmx1G -Xms1G -server
Had I allowed the operation to continue it would have taken seven hours to complete. I haven't come across any obvious tuning parameters for this in the ClickHouse manual so far, so I'll have to see if this can be addressed in a later post.
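For what it's worth, the pattern ClickHouse's documentation describes for Kafka engine tables is to attach a materialized view that streams arriving messages into the destination table in the background, rather than running a one-off INSERT ... SELECT. Below is a minimal sketch against the tables above; I haven't benchmarked whether it improves on the throughput seen here.
CREATE MATERIALIZED VIEW trips_kafka_consumer
TO trips
AS SELECT *
FROM trips_kafka_csv;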
Traditionally I'd use Flume to land Kafka streams onto HDFS and then run periodic jobs via Airflow to import time-partitioned datasets into OLAP systems similar to ClickHouse. At some point when I've uncovered the bottleneck above I'll aim to publish a comparison of these methods.
Importing CSV
I'll import four GZIP-compressed CSV files sequentially using gunzip to decompress the contents before presenting them to ClickHouse via a UNIX pipe. Each file is slightly less than 1,929 MB in size when compressed.
$ clickhouse-client --query="TRUNCATE TABLE trips"
$ for FILENAME in *.csv.gz; do
gunzip -c $FILENAME \
| clickhouse-client \
--query="INSERT INTO trips FORMAT CSV"
done
The above completed in 4 minutes and 45 seconds.
I'll perform the same import but use four parallel processes.
$ clickhouse-client --query="TRUNCATE TABLE trips"
$ vi jobs
gunzip -c 000000_0.csv.gz | clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\"
gunzip -c 000000_1.csv.gz | clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\"
gunzip -c 000000_2.csv.gz | clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\"
gunzip -c 000000_3.csv.gz | clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\"
$ cat jobs | xargs -n1 -P4 -I% bash -c "%"
The above completed in 4 minutes and 39 seconds. Barely any improvement at all.
I'll see if using data that is already decompressed speeds up the import process. Below I'll import four decompressed files sequentially.
$ clickhouse-client --query="TRUNCATE TABLE trips"
$ for FILENAME in *.csv; do
clickhouse-client \
--query="INSERT INTO trips FORMAT CSV" < $FILENAME
done
The above completed in 5 minutes and 59 seconds.
I'll try the same operation with four parallel processes.
$ clickhouse-client --query="TRUNCATE TABLE trips"
$ vi jobs
clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\" < 000000_0.csv
clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\" < 000000_1.csv
clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\" < 000000_2.csv
clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\" < 000000_3.csv
$ cat jobs | xargs -n1 -P4 -I% bash -c "%"
The above completed in 6 minutes and 6 seconds. It appears that parallel imports, regardless of file compression, have little or even a negative impact on import times. The extra data being read off of disk appears to hurt imports compared with reading compressed data and decompressing it just prior to import.
Importing Parquet
The Parquet dataset is Snappy-compressed and is made up of four files that are ~1,983 MB each in size.
$ clickhouse-client --query="TRUNCATE TABLE trips"
$ for FILENAME in *.pq; do
cat $FILENAME \
| clickhouse-client \
--query="INSERT INTO trips FORMAT Parquet"
done
The above completed in 2 minutes and 38 seconds.
I'll attempt the same operation again but using four parallel processes to import the data.
$ clickhouse-client --query="TRUNCATE TABLE trips"
$ vi jobs
cat 000000_0.pq | clickhouse-client --query=\"INSERT INTO trips FORMAT Parquet\"
cat 000000_1.pq | clickhouse-client --query=\"INSERT INTO trips FORMAT Parquet\"
cat 000000_2.pq | clickhouse-client --query=\"INSERT INTO trips FORMAT Parquet\"
cat 000000_3.pq | clickhouse-client --query=\"INSERT INTO trips FORMAT Parquet\"
$ cat jobs | xargs -n1 -P4 -I% bash -c "%"
The above completed in 2 minutes and 48 seconds. Much like what was seen with CSV data, parallel imports didn't have a positive impact on total import time.
Importing From HDFS
I've installed HDFS via the Hadoop distribution provided by Apache. The installation steps I took can be found in my Hadoop 3 Installation Guide. Note that I didn't install Hive, Presto or Spark for this exercise as they're not needed.
I'll copy the decompressed CSV files onto HDFS.
$ hdfs dfs -mkdir -p /csv
$ hdfs dfs -copyFromLocal ~/*.csv /csv/
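A quick listing confirms the four files and their sizes on HDFS.
$ hdfs dfs -du -h /csv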
I'll then connect to ClickHouse and create a table representing the CSV data on HDFS.
$ clickhouse-client
CREATE TABLE trips_hdfs_csv (
trip_id UInt32,
vendor_id String,
pickup_datetime DateTime,
dropoff_datetime Nullable(DateTime),
store_and_fwd_flag Nullable(FixedString(1)),
rate_code_id Nullable(UInt8),
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count Nullable(UInt8),
trip_distance Nullable(Float64),
fare_amount Nullable(Float32),
extra Nullable(Float32),
mta_tax Nullable(Float32),
tip_amount Nullable(Float32),
tolls_amount Nullable(Float32),
ehail_fee Nullable(Float32),
improvement_surcharge Nullable(Float32),
total_amount Nullable(Float32),
payment_type Nullable(String),
trip_type Nullable(UInt8),
pickup Nullable(String),
dropoff Nullable(String),
cab_type Nullable(String),
precipitation Nullable(Int8),
snow_depth Nullable(Int8),
snowfall Nullable(Int8),
max_temperature Nullable(Int8),
min_temperature Nullable(Int8),
average_wind_speed Nullable(Int8),
pickup_nyct2010_gid Nullable(Int8),
pickup_ctlabel Nullable(String),
pickup_borocode Nullable(Int8),
pickup_boroname Nullable(String),
pickup_ct2010 Nullable(String),
pickup_boroct2010 Nullable(String),
pickup_cdeligibil Nullable(FixedString(1)),
pickup_ntacode Nullable(String),
pickup_ntaname Nullable(String),
pickup_puma Nullable(String),
dropoff_nyct2010_gid Nullable(UInt8),
dropoff_ctlabel Nullable(String),
dropoff_borocode Nullable(UInt8),
dropoff_boroname Nullable(String),
dropoff_ct2010 Nullable(String),
dropoff_boroct2010 Nullable(String),
dropoff_cdeligibil Nullable(String),
dropoff_ntacode Nullable(String),
dropoff_ntaname Nullable(String),
dropoff_puma Nullable(String)
) ENGINE = HDFS('hdfs://localhost:9000/csv/*', 'CSV');
I'll then truncate the trips table and import the data off HDFS into a native ClickHouse table.
TRUNCATE TABLE trips;
INSERT INTO trips
SELECT * FROM trips_hdfs_csv;
The above completed in 3 minutes and 11 seconds. During the import the CPU was almost completely utilised. ClickHouse had almost three cores saturated while the HDFS DataNode took half a core to itself.
top - 07:13:06 up 30 min, 2 users, load average: 6.72, 3.33, 1.76
Tasks: 201 total, 3 running, 198 sleeping, 0 stopped, 0 zombie
%Cpu0 : 78.0 us, 15.7 sy, 0.0 ni, 1.0 id, 3.5 wa, 0.0 hi, 1.7 si, 0.0 st
%Cpu1 : 40.0 us, 29.1 sy, 0.0 ni, 1.1 id, 8.7 wa, 0.0 hi, 21.1 si, 0.0 st
%Cpu2 : 57.6 us, 20.9 sy, 0.0 ni, 10.4 id, 9.7 wa, 0.0 hi, 1.4 si, 0.0 st
%Cpu3 : 70.3 us, 15.9 sy, 0.0 ni, 5.1 id, 5.8 wa, 0.0 hi, 2.9 si, 0.0 st
KiB Mem : 8157556 total, 149076 free, 2358936 used, 5649544 buff/cache
KiB Swap: 8386556 total, 8298996 free, 87560 used. 5465288 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
1102 clickho+ 20 0 5583684 1.446g 74952 S 289.0 18.6 4:39.64 /usr/bin/clickhouse-server
1894 root 20 0 3836828 276156 24244 S 0.3 3.4 0:09.45 /usr/bin/java -Dproc_namenode
2059 root 20 0 3823000 268328 24388 S 47.2 3.3 3:28.72 /usr/bin/java -Dproc_datanode
Given the large amount of memory allocated towards the buffer / cache I suspect HDFS has done a good job of getting Linux to keep as many HDFS blocks as possible in the Page Cache.
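The buffer/cache figure can be watched with free while the import runs.
$ free -h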
I'll perform the same import off of HDFS but this time I'll use a Parquet-formatted, Snappy-compressed copy of the dataset. Below I'll copy the Parquet files onto HDFS.
$ hdfs dfs -mkdir -p /pq
$ hdfs dfs -copyFromLocal ~/*.pq /pq/
I'll then create a table in ClickHouse to represent that dataset as a table.
$ clickhouse-client
CREATE TABLE trips_hdfs_pq (
trip_id UInt32,
vendor_id String,
pickup_datetime DateTime,
dropoff_datetime Nullable(DateTime),
store_and_fwd_flag Nullable(FixedString(1)),
rate_code_id Nullable(UInt8),
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count Nullable(UInt8),
trip_distance Nullable(Float64),
fare_amount Nullable(Float32),
extra Nullable(Float32),
mta_tax Nullable(Float32),
tip_amount Nullable(Float32),
tolls_amount Nullable(Float32),
ehail_fee Nullable(Float32),
improvement_surcharge Nullable(Float32),
total_amount Nullable(Float32),
payment_type Nullable(String),
trip_type Nullable(UInt8),
pickup Nullable(String),
dropoff Nullable(String),
cab_type Nullable(String),
precipitation Nullable(Int8),
snow_depth Nullable(Int8),
snowfall Nullable(Int8),
max_temperature Nullable(Int8),
min_temperature Nullable(Int8),
average_wind_speed Nullable(Int8),
pickup_nyct2010_gid Nullable(Int8),
pickup_ctlabel Nullable(String),
pickup_borocode Nullable(Int8),
pickup_boroname Nullable(String),
pickup_ct2010 Nullable(String),
pickup_boroct2010 Nullable(String),
pickup_cdeligibil Nullable(FixedString(1)),
pickup_ntacode Nullable(String),
pickup_ntaname Nullable(String),
pickup_puma Nullable(String),
dropoff_nyct2010_gid Nullable(UInt8),
dropoff_ctlabel Nullable(String),
dropoff_borocode Nullable(UInt8),
dropoff_boroname Nullable(String),
dropoff_ct2010 Nullable(String),
dropoff_boroct2010 Nullable(String),
dropoff_cdeligibil Nullable(String),
dropoff_ntacode Nullable(String),
dropoff_ntaname Nullable(String),
dropoff_puma Nullable(String)
) ENGINE = HDFS('hdfs://localhost:9000/pq/*', 'Parquet');
I'll then truncate the trips table and import the data off HDFS into a native ClickHouse table.
TRUNCATE TABLE trips;
INSERT INTO trips
SELECT * FROM trips_hdfs_pq;
The above operation failed, stating I'd hit ClickHouse's single-query memory limit.
Received exception from server (version 19.15.3):
Code: 241. DB::Exception: Received from localhost:9001. DB::Exception: Memory limit (for query) exceeded: would use 10.63 GiB (attempt to allocate chunk of 2684354560 bytes), maximum: 9.31 GiB.
I had to upgrade the RAM on the system from 8 GB to 20 GB and set the maximum memory limit to an arbitrarily high number.
TRUNCATE TABLE trips;
SET max_memory_usage = 20000000000;
INSERT INTO trips
SELECT * FROM trips_hdfs_pq;
The above completed in 1 minute and 56 seconds. The fastest of any method I've attempted. With the above the CPU was almost fully utilised, the SSD was nearing its throughput limits and almost the entirety of the RAM available to the system was in use.
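If you'd rather not run the SET statement in each session, the same limit can live in the default profile in /etc/clickhouse-server/users.xml; a minimal sketch of the relevant fragment, with the 20 GB limit expressed in bytes:
<profiles>
    <default>
        <max_memory_usage>20000000000</max_memory_usage>
    </default>
</profiles>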
Conclusion
The format in which data is presented to ClickHouse, and the storage system it is read from, can have a huge impact on import times. Importing GZIP-compressed JSON off of disk was 7.3x slower than loading Parquet off of HDFS. Parallelism doesn't seem to bring any benefits either.
It was great to see how fast Parquet data loads off of HDFS but it's a shame to see how RAM-intensive that operation is. Every other import ran fine with no more than 8 GB of system memory. A ratio of 5 GB of RAM per CPU core appears to be the sweet spot for this particular workload.
Below is a summary, sorted from fastest to slowest, of the above imports.
Seconds | Method |
---|---|
116 | Snappy-compressed Parquet off HDFS (sequential) |
158 | Snappy-compressed Parquet off disk (sequential) |
168 | Snappy-compressed Parquet off disk (parallel x 4) |
191 | Uncompressed CSV off HDFS (sequential) |
278 | GZIP-compressed CSV off disk (parallel x 4) |
285 | GZIP-compressed CSV off disk (sequential) |
358 | Uncompressed CSV off disk (sequential) |
366 | Uncompressed CSV off disk (parallel x 4) |
522 | Uncompressed JSON off disk (parallel x 2) |
579 | PostgreSQL |
610 | Uncompressed JSON off disk (sequential) |
657 | MySQL |
849 | GZIP-compressed JSON off disk (sequential) |
Another interesting highlight was the time it took to get the dataset into PostgreSQL, MySQL and Kafka sans any indexing.
Seconds | Method |
---|---|
358 | Uncompressed CSV into ClickHouse |
510 | Uncompressed CSV into Kafka |
692 | Uncompressed CSV into PostgreSQL |
1223 | Uncompressed CSV into MySQL |
The above could probably be improved upon but nonetheless I'm surprised to see such a delta with bulk importing into row-centric storage.