When CSV data is imported into an ORC-formatted table on Google Cloud's Dataproc, Hive will, by default, map the underlying ORC files one-to-one with the originating CSV files.
So compressed CSV files like this:
2024092233 2016-04-11T09:53:27Z gs://taxi-trips/csv/trips_xaa.csv.gz
2023686578 2016-04-11T09:53:27Z gs://taxi-trips/csv/trips_xab.csv.gz
will turn into ORC files like this:
2410749849 2016-04-28T11:59:41Z gs://taxi-trips/orc_snappy/000000_0
2431245233 2016-04-28T11:59:42Z gs://taxi-trips/orc_snappy/000001_0
The amount of data in each ORC file is determined by the amount of data in its originating CSV file, at least up to 2 GB of compressed CSV data. I wanted to know whether smaller files could in some way speed up or adversely affect query performance.
Splitting up 1.1 Billion Records
Using the CSV data I created in my Billion Taxi Rides in Redshift blog post I'll create three new copies of the dataset, made up of 64 MB, 256 MB and 1024 MB GZIP files respectively. It took some experimentation, but I found that to create 64 MB GZIP files of the CSV data I needed 655,000 lines in each file; for 256 MB I needed 2,650,000 lines and for 1 GB I needed 10,600,000 lines of data.
I have the original dataset of 56 GZIP-compressed CSV files in a folder called ~/taxi-trips. From that folder I'll create the three new datasets. Below are the commands I ran to do so.
$ cd ~/taxi-trips
$ mkdir 64mb 256mb 1024mb
$ gunzip -c *.csv.gz | \
split -l 655000 \
--filter="gzip > 64mb/trips_\$FILE.csv.gz"
$ gunzip -c *.csv.gz | \
split -l 2650000 \
--filter="gzip > 256mb/trips_\$FILE.csv.gz"
$ gunzip -c *.csv.gz | \
split -l 10600000 \
--filter="gzip > 1024mb/trips_\$FILE.csv.gz"
All resulting datasets were the same size as the original, so 104 GB x 4 = 416 GB of drive capacity was taken up.
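The `split --filter` pattern above re-compresses each chunk as it's written, so the uncompressed stream never touches disk. Below is a minimal, self-contained sketch of the same pattern on synthetic data; the directory, line counts and file names here are illustrative, not the real dataset's.

```shell
# Demo of GNU split's --filter flag: decompress a stream, cut it into
# fixed-line-count chunks and re-gzip each chunk as it's written.
# split sets $FILE to each chunk's generated name (xaa, xab, ...).
mkdir -p /tmp/split_demo/chunks
cd /tmp/split_demo
seq 1 10000 | gzip > input.csv.gz

gunzip -c input.csv.gz | \
    split -l 2500 \
    --filter="gzip > chunks/trips_\$FILE.csv.gz"

ls chunks | wc -l                          # 4 chunks: xaa through xad
gunzip -c chunks/trips_xaa.csv.gz | wc -l  # 2500 lines in each chunk
```

Note that `--filter` is a GNU coreutils feature; the BSD `split` shipped with macOS doesn't support it.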
Dataproc Cluster Up & Running
The CSV data will live on Google Cloud Storage (GCS) and will be imported directly into ORC files that live on HDFS. Below are the commands I ran to upload the data to GCS.
$ gsutil \
-o GSUtil:parallel_composite_upload_threshold=150M \
-m cp \
~/taxi-trips/64mb/*.csv.gz \
gs://taxi-trips/csv-64mb/
$ gsutil \
-o GSUtil:parallel_composite_upload_threshold=150M \
-m cp \
~/taxi-trips/256mb/*.csv.gz \
gs://taxi-trips/csv-256mb/
$ gsutil \
-o GSUtil:parallel_composite_upload_threshold=150M \
-m cp \
~/taxi-trips/1024mb/*.csv.gz \
gs://taxi-trips/csv-1024mb/
Because ORC is a compressed format I shouldn't need more than 120 GB of pre-replicated HDFS capacity for each dataset, but to be safe I've provisioned 1,200 GB of capacity on each node in my cluster. Before launching this cluster I requested a quota increase from 10,240 GB to 14,000 GB for my total persistent disk space in EUROPE-WEST1.
$ gcloud dataproc clusters \
create trips \
--zone europe-west1-b \
--master-machine-type n1-standard-4 \
--master-boot-disk-size 1200 \
--num-workers 10 \
--worker-machine-type n1-standard-4 \
--worker-boot-disk-size 1200 \
--scopes 'https://www.googleapis.com/auth/cloud-platform' \
--project taxis-1273 \
--initialization-actions 'gs://taxi-trips/presto.sh'
For detailed notes on launching a Dataproc cluster like this, please see my blog post Billion Taxi Rides on Google's Dataproc running Presto.
The cluster took around 2 minutes to launch. I was then able to SSH into the master node.
$ gcloud compute ssh \
trips-m \
--zone europe-west1-b
Six Hive Tables
I'll be creating six tables in Hive: three tables for the CSV files, representing the 64 MB, 256 MB and 1024 MB datasets, and three ORC-formatted counterparts. The ORC-formatted tables store the data in a compressed, columnar form which is much faster to query than CSV.
For clarity the table names for the CSV data are:
- trips_csv_64mb
- trips_csv_256mb
- trips_csv_1024mb
Each of these tables will point to their respective folders on GCS via the LOCATION directive.
CREATE EXTERNAL TABLE trips_csv_64mb (
trip_id INT,
vendor_id VARCHAR(3),
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag VARCHAR(1),
rate_code_id SMALLINT,
pickup_longitude DECIMAL(18,14),
pickup_latitude DECIMAL(18,14),
dropoff_longitude DECIMAL(18,14),
dropoff_latitude DECIMAL(18,14),
passenger_count SMALLINT,
trip_distance DECIMAL(6,3),
fare_amount DECIMAL(6,2),
extra DECIMAL(6,2),
mta_tax DECIMAL(6,2),
tip_amount DECIMAL(6,2),
tolls_amount DECIMAL(6,2),
ehail_fee DECIMAL(6,2),
improvement_surcharge DECIMAL(6,2),
total_amount DECIMAL(6,2),
payment_type VARCHAR(3),
trip_type SMALLINT,
pickup VARCHAR(50),
dropoff VARCHAR(50),
cab_type VARCHAR(6),
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel VARCHAR(10),
pickup_borocode SMALLINT,
pickup_boroname VARCHAR(13),
pickup_ct2010 VARCHAR(6),
pickup_boroct2010 VARCHAR(7),
pickup_cdeligibil VARCHAR(1),
pickup_ntacode VARCHAR(4),
pickup_ntaname VARCHAR(56),
pickup_puma VARCHAR(4),
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel VARCHAR(10),
dropoff_borocode SMALLINT,
dropoff_boroname VARCHAR(13),
dropoff_ct2010 VARCHAR(6),
dropoff_boroct2010 VARCHAR(7),
dropoff_cdeligibil VARCHAR(1),
dropoff_ntacode VARCHAR(4),
dropoff_ntaname VARCHAR(56),
dropoff_puma VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 'gs://taxi-trips/csv-64mb/';
CREATE EXTERNAL TABLE trips_csv_256mb (
trip_id INT,
vendor_id VARCHAR(3),
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag VARCHAR(1),
rate_code_id SMALLINT,
pickup_longitude DECIMAL(18,14),
pickup_latitude DECIMAL(18,14),
dropoff_longitude DECIMAL(18,14),
dropoff_latitude DECIMAL(18,14),
passenger_count SMALLINT,
trip_distance DECIMAL(6,3),
fare_amount DECIMAL(6,2),
extra DECIMAL(6,2),
mta_tax DECIMAL(6,2),
tip_amount DECIMAL(6,2),
tolls_amount DECIMAL(6,2),
ehail_fee DECIMAL(6,2),
improvement_surcharge DECIMAL(6,2),
total_amount DECIMAL(6,2),
payment_type VARCHAR(3),
trip_type SMALLINT,
pickup VARCHAR(50),
dropoff VARCHAR(50),
cab_type VARCHAR(6),
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel VARCHAR(10),
pickup_borocode SMALLINT,
pickup_boroname VARCHAR(13),
pickup_ct2010 VARCHAR(6),
pickup_boroct2010 VARCHAR(7),
pickup_cdeligibil VARCHAR(1),
pickup_ntacode VARCHAR(4),
pickup_ntaname VARCHAR(56),
pickup_puma VARCHAR(4),
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel VARCHAR(10),
dropoff_borocode SMALLINT,
dropoff_boroname VARCHAR(13),
dropoff_ct2010 VARCHAR(6),
dropoff_boroct2010 VARCHAR(7),
dropoff_cdeligibil VARCHAR(1),
dropoff_ntacode VARCHAR(4),
dropoff_ntaname VARCHAR(56),
dropoff_puma VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 'gs://taxi-trips/csv-256mb/';
CREATE EXTERNAL TABLE trips_csv_1024mb (
trip_id INT,
vendor_id VARCHAR(3),
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag VARCHAR(1),
rate_code_id SMALLINT,
pickup_longitude DECIMAL(18,14),
pickup_latitude DECIMAL(18,14),
dropoff_longitude DECIMAL(18,14),
dropoff_latitude DECIMAL(18,14),
passenger_count SMALLINT,
trip_distance DECIMAL(6,3),
fare_amount DECIMAL(6,2),
extra DECIMAL(6,2),
mta_tax DECIMAL(6,2),
tip_amount DECIMAL(6,2),
tolls_amount DECIMAL(6,2),
ehail_fee DECIMAL(6,2),
improvement_surcharge DECIMAL(6,2),
total_amount DECIMAL(6,2),
payment_type VARCHAR(3),
trip_type SMALLINT,
pickup VARCHAR(50),
dropoff VARCHAR(50),
cab_type VARCHAR(6),
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel VARCHAR(10),
pickup_borocode SMALLINT,
pickup_boroname VARCHAR(13),
pickup_ct2010 VARCHAR(6),
pickup_boroct2010 VARCHAR(7),
pickup_cdeligibil VARCHAR(1),
pickup_ntacode VARCHAR(4),
pickup_ntaname VARCHAR(56),
pickup_puma VARCHAR(4),
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel VARCHAR(10),
dropoff_borocode SMALLINT,
dropoff_boroname VARCHAR(13),
dropoff_ct2010 VARCHAR(6),
dropoff_boroct2010 VARCHAR(7),
dropoff_cdeligibil VARCHAR(1),
dropoff_ntacode VARCHAR(4),
dropoff_ntaname VARCHAR(56),
dropoff_puma VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 'gs://taxi-trips/csv-1024mb/';
I'll then create three counterpart tables to store data in ORC format on HDFS. These tables use, with one exception, the fastest settings I found in my 33x Faster Queries on Google Cloud's Dataproc blog post. The exception is the stripe size, which has a large impact on performance: it's set at 67,108,864 bytes (64 MB) so that it's consistent across all three tables and small enough to suit the 64 MB version of the dataset.
For reference these are the table names:
- trips_64mb
- trips_256mb
- trips_1024mb
CREATE EXTERNAL TABLE trips_64mb (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION '/trips_64mb/'
TBLPROPERTIES ("orc.compress"="SNAPPY",
"orc.stripe.size"="67108864",
"orc.row.index.stride"="50000");
CREATE EXTERNAL TABLE trips_256mb (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION '/trips_256mb/'
TBLPROPERTIES ("orc.compress"="SNAPPY",
"orc.stripe.size"="67108864",
"orc.row.index.stride"="50000");
CREATE EXTERNAL TABLE trips_1024mb (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION '/trips_1024mb/'
TBLPROPERTIES ("orc.compress"="SNAPPY",
"orc.stripe.size"="67108864",
"orc.row.index.stride"="50000");
Importing 3.3 Billion Records
I then ran the following to import the data from CSV into ORC format.
$ screen
$ echo "INSERT INTO TABLE trips_64mb
SELECT * FROM trips_csv_64mb;
INSERT INTO TABLE trips_256mb
SELECT * FROM trips_csv_256mb;
INSERT INTO TABLE trips_1024mb
SELECT * FROM trips_csv_1024mb;" | hive
Underlying Files
Breaking up the source files into 64, 256 and 1024 MB chunks results in ORC file counts that match their CSV counterparts. Below you can see the 64 MB dataset has 1,701 files, the 256 MB dataset has 421 files and the 1024 MB dataset has 106 files.
$ hadoop dfs -ls /trips_64mb | grep -c trips_
1701
$ hadoop dfs -ls /trips_256mb | grep -c trips_
421
$ hadoop dfs -ls /trips_1024mb | grep -c trips_
106
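As a rough sanity check, dividing the ~104 GB compressed dataset size quoted earlier by each chunk size predicts file counts close to what HDFS reports:

```shell
# Expected file count = total compressed size / target chunk size.
# 104 GB is the dataset size quoted earlier; observed counts come from
# the hadoop dfs -ls output above.
total_mb=$((104 * 1024))
for chunk_mb in 64 256 1024; do
    echo "${chunk_mb} MB chunks: expect ~$((total_mb / chunk_mb)) files"
done
# 64 MB chunks: expect ~1664 files    (1701 observed)
# 256 MB chunks: expect ~416 files    (421 observed)
# 1024 MB chunks: expect ~104 files   (106 observed)
```

The slight overshoots are expected: splitting on line counts only approximates the target chunk size.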
Benchmarking File Size Differences
I ran the following queries using Presto 0.144.1.
$ presto \
--catalog hive \
--schema default
I ran this query on each table repeatedly. Listed below are the lowest, and often most common, query times.
SELECT cab_type,
count(*)
FROM <table>
GROUP BY cab_type;
- trips_64mb: 13 seconds
- trips_256mb: 13 seconds
- trips_1024mb: 12 seconds
As you can see, there is no noticeable difference in query performance despite the difference in the number of underlying files used to store each table's data.
Here are the times for the following query.
SELECT passenger_count,
avg(total_amount)
FROM <table>
GROUP BY passenger_count;
- trips_64mb: 10 seconds
- trips_256mb: 10 seconds
- trips_1024mb: 11 seconds
Again, no real difference in query times.
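For context, here is the scan throughput implied by the best timings above, assuming ~1.1 billion rows per table (the figure from the original dataset):

```shell
# Approximate scan rate for each query's best reported time, assuming
# ~1.1 billion rows per table.
rows=1100000000
echo "cab_type count, 12 s:    ~$((rows / 12 / 1000000))M rows/sec"
echo "avg(total_amount), 10 s: ~$((rows / 10 / 1000000))M rows/sec"
```

At roughly 90-110 million rows per second, throughput is effectively identical across all three chunk sizes.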