Posted on Thu 15 December 2016 under Databases

1.1 Billion Taxi Rides on Amazon Athena

Amazon Athena is a serverless way to query your data that lives on S3 using SQL. It excels with datasets that are anywhere up to multiple petabytes in size. Its interface is a simple web page that you can access from the AWS console. Queries cost $5 per terabyte of data scanned with a 10 MB minimum per query. If a query fails you won't be charged for it but if you cancel a query part way through you'll be charged for the data scanned up until that point.
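To make the pricing concrete, here is a minimal Python sketch of the charge for a single successful query. The function name is my own and I'm assuming binary units (1 TB = 1024^4 bytes); treat it as a rough estimator rather than AWS' exact billing logic.

```python
# Rough estimator for the pricing described above.
# Assumption: binary units (1 MB = 1024**2 bytes, 1 TB = 1024**4 bytes).
BYTES_PER_MB = 1024 ** 2
BYTES_PER_TB = 1024 ** 4
PRICE_PER_TB_USD = 5.0                   # $5 per terabyte scanned
MIN_BILLED_BYTES = 10 * BYTES_PER_MB     # 10 MB minimum per query


def athena_query_cost(bytes_scanned):
    """Estimated charge in USD for one successful query (hypothetical helper)."""
    billed = max(bytes_scanned, MIN_BILLED_BYTES)
    return PRICE_PER_TB_USD * billed / BYTES_PER_TB


for label, size in [("1 KB", 1024), ("10 MB", MIN_BILLED_BYTES), ("1 TB", BYTES_PER_TB)]:
    print(f"{label:>6}: ${athena_query_cost(size):.6f}")
```

A 1 KB scan and a 10 MB scan come out at the same price because of the minimum.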

Data can be stored in CSV, JSON, ORC, Parquet and even Apache web log format. You can also use GZIP-compressed CSV files to save on query costs and improve performance over regular, uncompressed CSV files.

Athena can execute queries from either the us-east-1 (Northern Virginia) or the us-west-2 (Oregon) region, though the S3 data being queried can live in other parts of the world.

It's important to note that Athena is not a general purpose database. Under the hood is Presto, a query execution engine that runs on top of the Hadoop stack. Athena is built for asking analytical questions rather than inserting records quickly or updating random records with low latency.

That being said, Presto's performance, given it can work on some of the world's largest datasets, is impressive. Presto is used daily by analysts at Facebook on their multi-petabyte data warehouse so the fact that such a powerful tool is available via a simple web interface with no servers to manage is pretty amazing to say the least.

In this blog post I'll see how fast AWS Athena can query 1.1 billion taxi rides. The dataset I'm working with was assembled from its original data sources with scripts written by Todd W Schneider. In my Billion Taxi Rides in Redshift blog post I exported the data from PostgreSQL into denormalised CSV files and compressed them into 56 gzip files. The files in GZIP form are 104 GB in size and decompressed take up around 500 GB of space. This is the same dataset I've used to benchmark BigQuery, Dataproc, Elasticsearch, EMR, MapD, PostgreSQL and Redshift.
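As a back-of-the-envelope check on why the GZIP form matters, the sketch below works out the compression ratio from the two sizes above and what one full scan would cost at $5 per terabyte. I'm assuming binary units and that Athena bills on the compressed bytes it reads, which is in line with the scan sizes reported in the CSV benchmark further down.

```python
GZIP_GB = 104            # size of the 56 gzip files
RAW_GB = 500             # approximate decompressed size
PRICE_PER_TB_USD = 5.0   # Athena's price per terabyte scanned
GB_PER_TB = 1024         # assumption: binary units


def full_scan_cost(size_gb):
    """Estimated USD cost of scanning the whole dataset once."""
    return PRICE_PER_TB_USD * size_gb / GB_PER_TB


compression_ratio = RAW_GB / GZIP_GB
fraction_saved = 1 - GZIP_GB / RAW_GB

print(f"compression ratio: {compression_ratio:.1f}x")
print(f"bytes saved:       {fraction_saved:.0%}")
print(f"gzip full scan:    ${full_scan_cost(GZIP_GB):.2f}")
print(f"raw full scan:     ${full_scan_cost(RAW_GB):.2f}")
```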

AWS CLI Up and Running

All the following commands were run on a fresh install of Ubuntu 14.04.3.

To start, I'll install the AWS CLI tool and a few dependencies it needs to run.

$ sudo apt update
$ sudo apt install \
    python-pip \
    python-virtualenv
$ virtualenv amazon
$ source amazon/bin/activate
$ pip install awscli

I'll then enter my AWS credentials.

$ read AWS_ACCESS_KEY_ID
$ read AWS_SECRET_ACCESS_KEY
$ export AWS_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY

I'll run configure to make sure us-east-1 is my default region.

$ aws configure
AWS Access Key ID [********************]:
AWS Secret Access Key [********************]:
Default region name [us-east-1]: us-east-1
Default output format [None]:

The following will allow for 100 concurrent requests to run when interacting with S3 and does a good job of saturating my Internet connection.

$ aws configure set \
    default.s3.max_concurrent_requests \
    100

I've created an S3 bucket in the US Standard region called "taxis-ath". The following will sync the 56 compressed CSV files on my system with that bucket into a folder called "csv".

$ aws s3 sync 20M-blocks/ \
    s3://taxis-ath/csv/

Setting Up Athena

When you first open the Athena web interface there is a step-by-step setup wizard to help you create a table and point it at your data. One step requires you to add each of the table's fields using buttons and drop-downs. Because I have 51 fields and they can be added via SQL, I created a dummy table just to get past the wizard and reach the query editor, where I ran the following to set up my trips table.

CREATE EXTERNAL TABLE trips_csv (
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(6,3),
    fare_amount             DECIMAL(6,2),
    extra                   DECIMAL(6,2),
    mta_tax                 DECIMAL(6,2),
    tip_amount              DECIMAL(6,2),
    tolls_amount            DECIMAL(6,2),
    ehail_fee               DECIMAL(6,2),
    improvement_surcharge   DECIMAL(6,2),
    total_amount            DECIMAL(6,2),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION 's3://taxis-ath/csv/';

The CSV Benchmark

With the table set up I ran the following queries to see how quickly they could return their results.

The times quoted below are the lowest query durations seen. As with all my benchmarks, I use the lowest query time as a way of indicating "top speed".

The following completed in 1 minute 53 seconds and scanned 104.26 GB of data.

SELECT cab_type,
       count(*)
FROM trips_csv
GROUP BY cab_type;

The following completed in 1 minute 56 seconds and scanned 104.25 GB of data.

SELECT passenger_count,
       avg(total_amount)
FROM trips_csv
GROUP BY passenger_count;

The following completed in 1 minute 54 seconds and scanned 104.26 GB of data.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_csv
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 2 minutes 35 seconds and scanned 104.3 GB of data.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_csv
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
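To put the four CSV timings in perspective, this small sketch turns each duration and scan size quoted above into an approximate scan throughput. The row count is the rounded 1.1 billion figure, so the rows-per-second numbers are only indicative.

```python
ROWS = 1_100_000_000  # approximate number of trips in the dataset

# (label, duration in seconds, GB scanned) as quoted for each CSV query
CSV_RUNS = [
    ("Query 1", 1 * 60 + 53, 104.26),
    ("Query 2", 1 * 60 + 56, 104.25),
    ("Query 3", 1 * 60 + 54, 104.26),
    ("Query 4", 2 * 60 + 35, 104.30),
]


def throughput(seconds, scanned_gb):
    """Return (GB scanned per second, rows processed per second)."""
    return scanned_gb / seconds, ROWS / seconds


for label, seconds, scanned_gb in CSV_RUNS:
    gb_per_s, rows_per_s = throughput(seconds, scanned_gb)
    print(f"{label}: {gb_per_s:.2f} GB/s, {rows_per_s / 1e6:.1f}M rows/s")
```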

Optimising using Columnar Storage

Unlike Redshift and BigQuery, Athena doesn't store your data internally so it won't sort, compress and organise the data into columns for you. CSV files don't carry summaries of the data they contain, neither per column nor per group of rows.

If data is laid out one column at a time then a query can read just the columns it's interested in rather than reading every row in full to pick those columns out.
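The difference can be sketched in a few lines of Python. The four records below are hypothetical toy data and the "cells read" counter is just my stand-in for I/O; real columnar readers are far more involved.

```python
from collections import Counter

# Hypothetical toy records; the real table has 51 columns and 1.1 billion rows.
rows = [
    {"trip_id": 1, "cab_type": "yellow", "passenger_count": 1, "total_amount": 12.5},
    {"trip_id": 2, "cab_type": "green", "passenger_count": 2, "total_amount": 8.0},
    {"trip_id": 3, "cab_type": "yellow", "passenger_count": 1, "total_amount": 21.3},
    {"trip_id": 4, "cab_type": "yellow", "passenger_count": 3, "total_amount": 5.8},
]

# Column-oriented copy of the same data: one list per column.
columns = {name: [row[name] for row in rows] for name in rows[0]}


def count_cab_types_row_layout(rows):
    """Every record is read in full just to reach one field."""
    counts, cells_read = Counter(), 0
    for row in rows:
        cells_read += len(row)
        counts[row["cab_type"]] += 1
    return counts, cells_read


def count_cab_types_column_layout(columns):
    """Only the cab_type column is touched."""
    values = columns["cab_type"]
    return Counter(values), len(values)


row_counts, row_cells = count_cab_types_row_layout(rows)
col_counts, col_cells = count_cab_types_column_layout(columns)
print(row_counts == col_counts, row_cells, col_cells)
```

Both layouts give the same answer, but the column layout touches a quarter of the cells in this toy case; with 51 columns the gap is much wider.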

If that data is sorted then summaries can be stored every so many records giving statistics on those values. This allows a query to skip over whole sections of a dataset.
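Here is a minimal sketch of that skipping idea, assuming a sorted column split into fixed-size chunks with a min and max kept for each. The chunk size, data and function names are all hypothetical; ORC and Parquet keep richer statistics than this.

```python
def build_chunks(sorted_values, chunk_size):
    """Split a sorted column into chunks and record min/max for each."""
    chunks = []
    for start in range(0, len(sorted_values), chunk_size):
        block = sorted_values[start:start + chunk_size]
        chunks.append({"min": min(block), "max": max(block), "values": block})
    return chunks


def count_equal(chunks, target):
    """Count matches, reading only chunks whose min/max range could contain target."""
    matches, chunks_read = 0, 0
    for chunk in chunks:
        if target < chunk["min"] or target > chunk["max"]:
            continue  # the statistics prove this chunk has no match, so skip it
        chunks_read += 1
        matches += sum(1 for value in chunk["values"] if value == target)
    return matches, chunks_read


# Hypothetical sorted pickup years, four trips per year.
years = sorted([2009] * 4 + [2010] * 4 + [2011] * 4 + [2012] * 4)
chunks = build_chunks(years, chunk_size=4)

print(count_equal(chunks, 2011), "of", len(chunks), "chunks")
```

Only one of the four chunks has to be read to answer the question.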

For queries that need to look at the value of every cell, reading the data off the disk in a compressed form means the storage device is less of a bottleneck. Reading 1 GB of data off a disk will always be faster than reading 10 GB off the same disk.
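To illustrate how well a single column can compress, the sketch below runs zlib from Python's standard library over a hypothetical cab_type column. zlib here is only a stand-in; ORC and Parquet apply their own encodings and codecs, so the ratio you see will differ.

```python
import zlib

# Hypothetical cab_type column: a low-cardinality field repeats heavily.
values = ["yellow"] * 9000 + ["green"] * 1000
raw = "\n".join(values).encode("utf-8")

compressed = zlib.compress(raw)
ratio = len(raw) / len(compressed)

print(f"raw: {len(raw)} bytes, compressed: {len(compressed)} bytes, ratio: {ratio:.0f}x")

# Decompressing gives back exactly the same bytes.
restored = zlib.decompress(compressed)
```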

The above three optimisations are the kinds you get when you store your data in ORC or Parquet format. If you convert your CSV data into one of these formats you will not only lower your costs of each query, you'll see significant speed increases as well. You can think of ORC and Parquet as the ZIP and RAR formats of the Hadoop ecosystem.

Normally in a query execution tool like Hive on a Hadoop cluster you can run the following to read data from a CSV-formatted table into an ORC-formatted table.

INSERT INTO trips_orc
SELECT * FROM trips_csv;

Unfortunately, Athena will return an error stating this functionality has been restricted from their interface.

Your query has the following error(s):

Queries of this type are not supported (Service: AmazonAthena; Status Code: 400; Error Code: InvalidRequestException; Request ID: 1a5d7606-c117-11e6-8d0b-2dd8e091a03e)

Setting up an EMR Cluster

To get around the data conversion restrictions in Athena I'll launch an EMR cluster to convert the data. Do note, there are other ways, including using a stand-alone Spark installation, to convert your data from CSV into Parquet and/or ORC format. Using EMR allows me to convert the data with a minimal amount of shell and SQL scripts, is very reliable and can be scaled out to the petabyte level if need be.

I'll be using a cluster of m3.xlarge instances in the us-east-1 region running EMR 5.2.0 on Amazon's 2.7.3 Hadoop Distribution with Hive as my only optional install. There will be one master node, an on-demand instance as we cannot afford to have it disappear on us, and 4 spot instances (2 core nodes and 2 task nodes). Hadoop should be able to handle the loss of one of those should it disappear due to rising spot prices.

The machines have a combined 20 vCPUs, 75 GB of RAM and 400 GB of SSD capacity across 10 drives. The master node costs $0.266 / hour and I've set the maximum price I'm willing to spend on the other 4 nodes to $0.06 / hour each.
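The totals above can be reproduced with a little arithmetic. The per-node m3.xlarge figures in this sketch (4 vCPUs, 15 GB of RAM, two 40 GB SSDs) are my assumption, chosen because they multiply out to the combined numbers quoted. The hourly cap uses only the two prices mentioned and leaves out any other charges such as EMR fees, S3 or data transfer.

```python
# Assumed per-node m3.xlarge specification (consistent with the totals above).
NODE = {"vcpus": 4, "ram_gb": 15, "ssd_drives": 2, "ssd_gb_per_drive": 40}
NODE_COUNT = 5                 # 1 master + 2 core + 2 task

MASTER_PRICE_USD = 0.266       # on-demand price per hour for the master node
SPOT_MAX_PRICE_USD = 0.06      # maximum bid per hour for each spot node
SPOT_NODES = 4

total_vcpus = NODE["vcpus"] * NODE_COUNT
total_ram_gb = NODE["ram_gb"] * NODE_COUNT
total_drives = NODE["ssd_drives"] * NODE_COUNT
total_ssd_gb = total_drives * NODE["ssd_gb_per_drive"]

# Upper bound on the hourly instance spend if every spot node hits its maximum bid.
hourly_cap_usd = MASTER_PRICE_USD + SPOT_NODES * SPOT_MAX_PRICE_USD

print(total_vcpus, total_ram_gb, total_ssd_gb, total_drives)
print(f"at most ${hourly_cap_usd:.3f} / hour")
```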

I ran the following to create the cluster. It was provisioned and bootstrapped in 9 minutes.

$ aws emr create-cluster \
    --name 'My cluster' \
    --region us-east-1 \
    --release-label emr-5.2.0 \
    --applications \
        Name=Hadoop \
        Name=Hive \
    --instance-groups '[{
        "InstanceCount":2,
        "BidPrice":"0.06",
        "InstanceGroupType":"CORE",
        "InstanceType":"m3.xlarge",
        "Name":"Core - 2"
    },{
        "InstanceCount":2,
        "BidPrice":"0.06",
        "InstanceGroupType":"TASK",
        "InstanceType":"m3.xlarge",
        "Name":"Task - 3"
    },{
        "InstanceCount":1,
        "InstanceGroupType":"MASTER",
        "InstanceType":"m3.xlarge",
        "Name":"Master - 1"
    }]' \
    --ec2-attributes '{
        "KeyName":"emr",
        "InstanceProfile":"EMR_EC2_DefaultRole",
        "SubnetId":"subnet-acf1bfda",
        "EmrManagedSlaveSecurityGroup":"sg-2d321350",
        "EmrManagedMasterSecurityGroup":"sg-3332134e"
    }' \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --service-role EMR_DefaultRole \
    --termination-protected \
    --enable-debugging \
    --scale-down-behavior TERMINATE_AT_INSTANCE_HOUR \
    --log-uri 's3n://aws-logs-591231097547-us-east-1/elasticmapreduce/'

With the cluster up I can SSH into the master node. I've added a ServerAliveInterval parameter to my SSH connection settings so that my terminal shouldn't time out if there is a period with no traffic between my machine and the master node.

$ ssh -i ~/.ssh/emr.pem \
      -o ServerAliveInterval=50 \
      hadoop@54.89.167.90

Once connected, I was greeted with the following.

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2016.09-release-notes/
6 package(s) needed for security, out of 10 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

[hadoop@ip-172-30-3-18 ~]$

I'll first enter a screen so I can leave work running and not worry about an SSH connection failure stopping my workload and then I'll launch Hive.

$ screen
$ hive

I'll create a table for my source CSV data. It will use the existing s3://taxis-ath/csv/ folder of data that I uploaded earlier. Once this table has been created the data is accessible right away.

CREATE EXTERNAL TABLE trips_csv (
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(6,3),
    fare_amount             DECIMAL(6,2),
    extra                   DECIMAL(6,2),
    mta_tax                 DECIMAL(6,2),
    tip_amount              DECIMAL(6,2),
    tolls_amount            DECIMAL(6,2),
    ehail_fee               DECIMAL(6,2),
    improvement_surcharge   DECIMAL(6,2),
    total_amount            DECIMAL(6,2),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION 's3://taxis-ath/csv/';

I'll then create a table to store the data in ORC Format. The data for this table will live in s3://taxis-ath/orc/.

CREATE EXTERNAL TABLE trips_orc (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION 's3://taxis-ath/orc/';

I'll then create a table to store the data in Parquet Format. The data for this table will live in s3://taxis-ath/parquet/.

CREATE EXTERNAL TABLE trips_parquet (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS parquet
  LOCATION 's3://taxis-ath/parquet/';
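The ORC and Parquet definitions differ from the CSV one only in their column types (VARCHAR becomes STRING, DECIMAL becomes DOUBLE) and in the STORED AS clause. With 51 columns that is tedious to do by hand, so here is a hedged Python sketch that derives the columnar DDL from the CSV column list. The helper names are mine and only a handful of columns are shown; it's an illustration, not how the statements above were produced.

```python
import re

# A few of the trips_csv columns; extend this list with the remaining fields.
CSV_COLUMNS = [
    ("trip_id", "INT"),
    ("vendor_id", "VARCHAR(3)"),
    ("pickup_datetime", "TIMESTAMP"),
    ("pickup_longitude", "DECIMAL(18,14)"),
    ("passenger_count", "SMALLINT"),
]


def columnar_type(csv_type):
    """Map a CSV table type to the type used in the ORC/Parquet tables."""
    base = re.sub(r"\(.*\)$", "", csv_type.strip()).upper()
    return {"VARCHAR": "STRING", "DECIMAL": "DOUBLE"}.get(base, base)


def create_table_ddl(table, columns, storage_format, location):
    """Build a CREATE EXTERNAL TABLE statement for a columnar copy of the table."""
    width = max(len(name) for name, _ in columns) + 4
    lines = [f"    {name:<{width}}{columnar_type(csv_type)}" for name, csv_type in columns]
    return (
        f"CREATE EXTERNAL TABLE {table} (\n"
        + ",\n".join(lines)
        + f"\n) STORED AS {storage_format}\n"
        + f"  LOCATION '{location}';"
    )


ddl = create_table_ddl("trips_orc", CSV_COLUMNS, "orc", "s3://taxis-ath/orc/")
print(ddl)
```

Calling the same function with "trips_parquet", "parquet" and the parquet folder gives the second statement.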

I've also created both the trips_orc and trips_parquet tables in Athena's query editor so once those tables' S3 folders are populated by Hive their data will be available in Athena's interface as well.

Populating the Tables

The following Hive query will populate the ORC-formatted table. It completed in 2 hours and 47 minutes.

INSERT INTO TABLE trips_orc
SELECT * FROM trips_csv;
Query ID = hadoop_20161214085649_60eb11e1-cf66-4e44-8d3b-2fff1eab86f1
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1481705329458_0001)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED     56         56        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 9509.19 s
----------------------------------------------------------------------------------------------
Loading data to table default.trips_orc
OK
Time taken: 10029.06 seconds

The following Hive query will populate the Parquet-formatted table. It completed in 3 hours and 13 minutes (~16% slower than the ORC table).

INSERT INTO TABLE trips_parquet
SELECT * FROM trips_csv;
Query ID = hadoop_20161214211841_5e63d67d-7f29-4e54-9790-bbb8a864d97b
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1481749215350_0001)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED     56         56        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 10315.67 s
----------------------------------------------------------------------------------------------
Loading data to table default.trips_parquet
OK
Time taken: 11623.85 seconds
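For reference, this short sketch converts the two "Time taken" values Hive reported into hours and minutes and works out how much longer the Parquet load took. The helper name is my own.

```python
ORC_SECONDS = 10029.06       # "Time taken" for the trips_orc load
PARQUET_SECONDS = 11623.85   # "Time taken" for the trips_parquet load


def hours_minutes(seconds):
    """Convert a duration in seconds to whole (hours, minutes)."""
    total_minutes = int(seconds // 60)
    return divmod(total_minutes, 60)


slowdown = PARQUET_SECONDS / ORC_SECONDS - 1

print("ORC:    ", hours_minutes(ORC_SECONDS))
print("Parquet:", hours_minutes(PARQUET_SECONDS))
print(f"Parquet load took {slowdown:.1%} longer")
```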

These tables were populated using stock Hive settings for their formats. There is a lot of room for tuning each format's parameters though I've found EMR's Hive settings have already had a lot of attention from AWS' engineers and are pretty good out of the box.

The above queries did run at very different times of the day so I wouldn't consider this a good ORC vs Parquet shoot-out.

Also, different tools in the Hadoop ecosystem have different levels of performance optimisation for each of these formats so it's important to know where your data will be consumed and what the query patterns will look like before deciding on which format to go with for your dataset.

The ORC Benchmark

With the ORC data loaded I ran the following queries in Athena's query editor.

The times quoted below are the lowest query durations seen. As with all my benchmarks, I use the lowest query time as a way of indicating "top speed".

The following completed in 7.5 seconds and scanned 52.81 MB of data.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 4.99 seconds and scanned 2.69 GB of data.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 4.85 seconds and scanned 4.87 GB of data.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 3.98 seconds and scanned 7.26 GB of data.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

The 52.81 MB of scanned data in the first query is very significant. When this query ran on the CSV table there was 104.26 GB of data scanned. You're charged based on the amount of data scanned in each query on Athena so running the same query on ORC data made it ~2,022x cheaper. The ORC query was also 15x faster.

This is why it's important to set up an ETL process to convert a copy of your data from CSV, JSON and whatnot into a columnar format like ORC. I tend to use Airflow for setting up these sorts of automated tasks though there are more AWS-centric ways of tackling this problem as well.

All of the above queries were miles quicker and cheaper to run with the data in ORC format compared to compressed CSVs.
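The ratios can be checked with the figures quoted in the CSV and ORC benchmarks. This sketch assumes binary units (1 GB = 1024 MB); since price is proportional to data scanned and every scan here is above the 10 MB minimum, the scan ratio is also the cost ratio.

```python
MB_PER_GB = 1024  # assumption: binary units

# (label, csv_seconds, csv_scanned_gb, orc_seconds, orc_scanned_gb)
RUNS = [
    ("Query 1", 113, 104.26, 7.50, 52.81 / MB_PER_GB),
    ("Query 2", 116, 104.25, 4.99, 2.69),
    ("Query 3", 114, 104.26, 4.85, 4.87),
    ("Query 4", 155, 104.30, 3.98, 7.26),
]


def ratios(csv_seconds, csv_gb, orc_seconds, orc_gb):
    """Return (how many times cheaper, how many times faster) ORC was than CSV."""
    return csv_gb / orc_gb, csv_seconds / orc_seconds


for label, csv_s, csv_gb, orc_s, orc_gb in RUNS:
    cheaper, faster = ratios(csv_s, csv_gb, orc_s, orc_gb)
    print(f"{label}: {cheaper:,.0f}x cheaper, {faster:.0f}x faster")
```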

The Parquet Benchmark

With the ORC benchmark complete I turned to running the same queries on the Parquet-formatted table.

The times quoted below are the lowest query durations seen. As with all my benchmarks, I use the lowest query time as a way of indicating "top speed".

The following completed in 6.41 seconds and scanned 603.69 KB of data.

SELECT cab_type,
       count(*)
FROM trips_parquet
GROUP BY cab_type;

The following completed in 6.19 seconds and scanned 102.29 GB of data.

SELECT passenger_count,
       avg(total_amount)
FROM trips_parquet
GROUP BY passenger_count;

The following completed in 6.09 seconds and scanned 102.29 GB of data.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 6.63 seconds and scanned 102.3 GB of data.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

The query times generally didn't outperform the ORC times but the amount of data being scanned is interesting. 3 of the 4 queries scanned almost as much data as the CSV queries making them almost the same price on Athena. But the first query was able to take advantage of Parquet's summary statistics and only read 603.69 KB of data. There is a 10 MB minimum charge for each query on Athena so the saving was capped at 10,676x cheaper than querying the CSV data and ~5x cheaper than the same query on the ORC-formatted table.
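Here is a small sketch of how the 10 MB minimum shapes those two ratios for the first query. It assumes binary units, and the helper name is my own.

```python
KB_PER_MB = 1024
MB_PER_GB = 1024
MIN_BILLED_MB = 10   # Athena's per-query minimum


def billed_mb(scanned_mb):
    """Apply the 10 MB per-query minimum to a scan size given in MB."""
    return max(scanned_mb, MIN_BILLED_MB)


csv_mb = 104.26 * MB_PER_GB      # first query on trips_csv
orc_mb = 52.81                   # first query on trips_orc
parquet_mb = 603.69 / KB_PER_MB  # first query on trips_parquet

vs_csv = billed_mb(csv_mb) / billed_mb(parquet_mb)
vs_orc = billed_mb(orc_mb) / billed_mb(parquet_mb)
without_minimum = csv_mb / parquet_mb  # ratio of raw bytes scanned

print(f"vs CSV: {vs_csv:,.0f}x cheaper")
print(f"vs ORC: {vs_orc:.2f}x cheaper")
print(f"raw scan ratio vs CSV: {without_minimum:,.0f}x")
```

The raw scan ratio is far larger than the price ratio, which is the effect of the minimum charge.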

I suspect there are improvements that could be made in the way Presto works with Parquet-formatted data. I've seen in the past that ORC-formatted tables generally performed better but I haven't examined Presto's release notes in detail for a while now and there could be some interesting areas of potential optimisation revealed in them.

A Shout-out to the Presto Developers

AWS have done a really good job on getting this service working so well. I believe this will bring a lot of users into the Hadoop ecosystem and it does a lot for the usability of the underlying Hadoop tooling. Some people complain that Hadoop environments can be complicated to set up; Athena does a lot to remove those complexities.

But at the core of this service is the query engine Presto. Burak Emre, a contributor to the Presto project, pointed out to me on Twitter that the project itself is small and could use more developers to help keep up the good progress they've been making.

Burak highlighted that there is a guide to getting started with contributing to Presto as well as open tickets that have been marked as "beginner tasks".

In my opinion Presto is one of the best query engines in the Hadoop world so contributing to it would not only look great on a CV but would also help a lot of people with the ever-growing datasets humanity is challenged with analysing.

Thank you for taking the time to read this post. I offer both consulting and hands-on development services to clients in North America and Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2024 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.