Home | Benchmarks | Categories | Atom Feed

Posted on Thu 26 April 2018 under Databases

Using SQL to query Kafka, MongoDB, MySQL, PostgreSQL and Redis with Presto

Presto is a query engine that began life at Facebook five years ago. Today it's used by over 1,000 Facebook staff members to analyse 300+ petabytes of data that they keep in their data warehouse. Presto doesn't store any data itself but instead interfaces with existing databases. It's common for Presto to query ORC-formatted files on HDFS or S3 but there is also support for a wide variety of other storage systems as well.

In many organisations there is no single database that all information is stored in. ETL processes can be useful for copying data between databases but there can be cases where company policy doesn't allow for copying certain data around and/or the overhead of keeping the data fresh is too much of a burden.

Presto has the ability to query multiple databases in the same query. The data can even be stored in different pieces of database software and that software doesn't even need to keep data stored in a tabular form; document stores, key-value stores and streams work just as well. Presto can run a SQL query against a Kafka topic stream while joining dimensional data from PostgreSQL, Redis, MongoDB and ORC-formatted files on HDFS in the same query.

Presto is a very fast query engine but will ultimately be limited by the databases it's connecting to. Presto running an aggregation query on a billion records in ORC format stored on HDFS will almost always outperform running the same query against a MySQL server.

In this blog post I'll walk through the setup and data imports for five databases and then query them using Presto 0.196. The databases and versions being used are Kafka 1.1.0, MongoDB 3.2.19, MySQL 5.7.21, PostgreSQL 9.5.12 and Redis 3.0.6.

I'll be using the first 1,000 records from the dataset I use in my 1.1 Billion Taxi Rides benchmarks. This dataset has 51 fields comprised of a variety of data types. To save time I've cast all of the fields as VARCHARs when mapping them in the non-tabular storage engines.

The machine I'm using has an Intel Core i5-4670K CPU clocked at 3.40 GHz, 12 GB of RAM and 200 GB of NVMe, SSD-based storage capacity. I'll be using a fresh installation of Ubuntu 16.04.2 LTS with a single-node Hadoop installation built off the instructions in my Hadoop 3 Single-Node Install Guide blog post. The instructions for installing Presto can be found in that guide.

Installing Five Different Databases

I'll first add MongoDB's repository details to my Ubuntu installation. There will be a complaint that "The following signatures couldn't be verified because the public key is not available". Despite this MongoDB will still install. The problem is described in detail here.

$ sudo apt-key adv \
    --keyserver hkp://keyserver.ubuntu.com:80 \
    --recv EA312927
$ echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu xenial/mongodb-org/3.6 multiverse" | \
    sudo tee /etc/apt/sources.list.d/mongodb-org.list
$ sudo apt update

I'll then use Ubuntu's package management to install everything except Kafka.

$ sudo apt install \
    mongodb-org \
    mysql-server \
    postgresql \
    redis-server \
    zookeeperd

By default MySQL & PostgreSQL are both setup to start after installation and start after each time the system boots. The following will do the same for MongoDB and Redis.

$ for SERVICE in mongod redis-server; do
      sudo systemctl start  $SERVICE
      sudo systemctl enable $SERVICE
  done

I'll install Kafka manually using the binary package distributed by one of Apache's mirrors.

$ sudo mkdir /opt/kafka
$ wget -c -O kafka.tgz \
    http://www-eu.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
$ sudo tar xzvf kafka.tgz \
    --directory=/opt/kafka \
    --strip 1

I'll then create a log file for Kafka which will be owned by my UNIX account.

$ sudo touch /var/log/kafka.log
$ sudo chown mark /var/log/kafka.log

I'll then launch Kafka's server process.

$ sudo nohup /opt/kafka/bin/kafka-server-start.sh \
             /opt/kafka/config/server.properties \
             > /var/log/kafka.log 2>&1 &

Importing Data into Kafka

I'll create a Kafka topic called "trips" with both a replication factor and partition count of 1. These aren't values you'd find in production systems and should be treated as only being used for educational purposes.

$ /opt/kafka/bin/kafka-topics.sh \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic trips

The 1.1 billion records are stored in CSV format and are split up into 56 GZIP-compressed files. I'll decompress the first 1,000 lines from the first of the 56 GZIP files and import those records into the "trips" Kafka topic.

$ gunzip -c trips_xaa.csv.gz \
    | head -n1000 \
    | /opt/kafka/bin/kafka-console-producer.sh \
        --topic trips \
        --broker-list localhost:9092

The data will be stored as raw CSV data in Kafka with one line of CSV content per event. Later on in this blog post I'll create configuration in Presto that will cast each Kafka event into a named and typed, tabular form.

Importing Data into MongoDB

When importing into MongoDB I'll use a file to name each of the fields in the CSV data.

$ vi fields.list
trip_id
vendor_id
pickup_datetime
dropoff_datetime
store_and_fwd_flag
rate_code_id
pickup_longitude
pickup_latitude
dropoff_longitude
dropoff_latitude
passenger_count
trip_distance
fare_amount
extra
mta_tax
tip_amount
tolls_amount
ehail_fee
improvement_surcharge
total_amount
payment_type
trip_type
pickup
dropoff
cab_type
precipitation
snow_depth
snowfall
max_temperature
min_temperature
average_wind_speed
pickup_nyct2010_gid
pickup_ctlabel
pickup_borocode
pickup_boroname
pickup_ct2010
pickup_boroct2010
pickup_cdeligibil
pickup_ntacode
pickup_ntaname
pickup_puma
dropoff_nyct2010_gid
dropoff_ctlabel
dropoff_borocode
dropoff_boroname
dropoff_ct2010
dropoff_boroct2010
dropoff_cdeligibil
dropoff_ntacode
dropoff_ntaname
dropoff_puma

That file listing the field names is included as a parameter below. Again, I'm importing the first 1,000 records of the taxi trip dataset.

$ zcat trips_xaa.csv.gz \
    | head -n1000 \
    | mongoimport \
        --db taxi \
        --collection trips \
        --type csv \
        --fieldFile fields.list \
        --numInsertionWorkers 4

Importing Data into MySQL

Before importing any data into MySQL I'll create access credentials. The following will create an account with mark as the username, test as the password and grant all privileges on the taxi database.

$ sudo bash -c "mysql -uroot -p \
    -e\"CREATE USER 'mark'@'localhost' IDENTIFIED BY 'test';
        GRANT ALL PRIVILEGES ON taxi.* TO 'mark'@'localhost';
        FLUSH PRIVILEGES;\""

MySQL doesn't support importing GZIP data directly via STDIN so I'll decompress 1,000 records into a CSV file before connecting to MySQL.

$ gunzip -c trips_xaa.csv.gz \
    | head -n1000 \
    > trips.csv
$ mysql -umark -p

Below I'll create a new "taxi" database and a "trips" table within it.

CREATE DATABASE taxi;
USE taxi

CREATE TABLE trips (
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         DATETIME,
    dropoff_datetime        DATETIME,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(6,3),
    fare_amount             DECIMAL(6,2),
    extra                   DECIMAL(6,2),
    mta_tax                 DECIMAL(6,2),
    tip_amount              DECIMAL(6,2),
    tolls_amount            DECIMAL(6,2),
    ehail_fee               DECIMAL(6,2),
    improvement_surcharge   DECIMAL(6,2),
    total_amount            DECIMAL(6,2),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
);

The following will load the uncompressed CSV file into the trips table.

LOAD DATA
LOCAL INFILE 'trips.csv'
INTO TABLE trips
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';

Importing Data into PostgreSQL

As I did with MySQL, I'll create an account, database and trips table in PostgreSQL.

$ sudo -u postgres \
    bash -c "psql -c \"CREATE USER mark
                       WITH PASSWORD 'test'
                       SUPERUSER;\""
$ createdb taxi
$ psql taxi
CREATE TABLE trips (
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(6,3),
    fare_amount             DECIMAL(6,2),
    extra                   DECIMAL(6,2),
    mta_tax                 DECIMAL(6,2),
    tip_amount              DECIMAL(6,2),
    tolls_amount            DECIMAL(6,2),
    ehail_fee               DECIMAL(6,2),
    improvement_surcharge   DECIMAL(6,2),
    total_amount            DECIMAL(6,2),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
);

PostgreSQL does support importing uncompressed CSV data via STDIN. Below is the command I ran to import 1,000 taxi trip records.

$ zcat trips_xaa.csv.gz \
    | head -n1000 \
    | psql taxi \
        -c "COPY trips
            FROM stdin CSV;"

Importing Data into Redis

Redis is a key-value store. It doesn't store data in a tabular form like PostgreSQL or MySQL does. Below I'll import 1,000 taxi trip records. Each record will have a key prefixed with "trip" followed by an underscore and suffixed with the trip's numeric identifier. The value of each record will be a line of raw CSV data.

$ zcat trips_xaa.csv.gz \
    | head -n1000 \
    | awk -F, '{ print " SET", "\"trip_"$1"\"", "\""$0"\"" }' \
    | redis-cli

Configuring Presto's Connections

In order for Presto to query Kafka a connector must be setup in Presto's catalog folder. Note the names of any tables you want to use must be specified in this file.

$ sudo vi /opt/presto/etc/catalog/kafka.properties
connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=trips
kafka.hide-internal-columns=false

Kafka stores each record as a raw CSV string. Below will cast each of the fields in these records into a named and typed form. I've used VARCHAR for each field type but a wider variety of types are supported.

$ sudo mkdir -p /opt/presto/etc/kafka
$ sudo vi /opt/presto/etc/kafka/trips.json
{
    "tableName": "trips",
    "schemaName": "default",
    "topicName": "trips",
    "message": {
        "dataFormat": "csv",
        "fields": [
            {"type": "VARCHAR", "mapping": 0,  "name": "trip_id"},
            {"type": "VARCHAR", "mapping": 1,  "name": "vendor_id"},
            {"type": "VARCHAR", "mapping": 2,  "name": "pickup_datetime"},
            {"type": "VARCHAR", "mapping": 3,  "name": "dropoff_datetime"},
            {"type": "VARCHAR", "mapping": 4,  "name": "store_and_fwd_flag"},
            {"type": "VARCHAR", "mapping": 5,  "name": "rate_code_id"},
            {"type": "VARCHAR", "mapping": 6,  "name": "pickup_longitude"},
            {"type": "VARCHAR", "mapping": 7,  "name": "pickup_latitude"},
            {"type": "VARCHAR", "mapping": 8,  "name": "dropoff_longitude"},
            {"type": "VARCHAR", "mapping": 9,  "name": "dropoff_latitude"},
            {"type": "VARCHAR", "mapping": 10, "name": "passenger_count"},
            {"type": "VARCHAR", "mapping": 11, "name": "trip_distance"},
            {"type": "VARCHAR", "mapping": 12, "name": "fare_amount"},
            {"type": "VARCHAR", "mapping": 13, "name": "extra"},
            {"type": "VARCHAR", "mapping": 14, "name": "mta_tax"},
            {"type": "VARCHAR", "mapping": 15, "name": "tip_amount"},
            {"type": "VARCHAR", "mapping": 16, "name": "tolls_amount"},
            {"type": "VARCHAR", "mapping": 17, "name": "ehail_fee"},
            {"type": "VARCHAR", "mapping": 18, "name": "improvement_surcharge"},
            {"type": "VARCHAR", "mapping": 19, "name": "total_amount"},
            {"type": "VARCHAR", "mapping": 20, "name": "payment_type"},
            {"type": "VARCHAR", "mapping": 21, "name": "trip_type"},
            {"type": "VARCHAR", "mapping": 22, "name": "pickup"},
            {"type": "VARCHAR", "mapping": 23, "name": "dropoff"},
            {"type": "VARCHAR", "mapping": 24, "name": "cab_type"},
            {"type": "VARCHAR", "mapping": 25, "name": "precipitation"},
            {"type": "VARCHAR", "mapping": 26, "name": "snow_depth"},
            {"type": "VARCHAR", "mapping": 27, "name": "snowfall"},
            {"type": "VARCHAR", "mapping": 28, "name": "max_temperature"},
            {"type": "VARCHAR", "mapping": 29, "name": "min_temperature"},
            {"type": "VARCHAR", "mapping": 30, "name": "average_wind_speed"},
            {"type": "VARCHAR", "mapping": 31, "name": "pickup_nyct2010_gid"},
            {"type": "VARCHAR", "mapping": 32, "name": "pickup_ctlabel"},
            {"type": "VARCHAR", "mapping": 33, "name": "pickup_borocode"},
            {"type": "VARCHAR", "mapping": 34, "name": "pickup_boroname"},
            {"type": "VARCHAR", "mapping": 35, "name": "pickup_ct2010"},
            {"type": "VARCHAR", "mapping": 36, "name": "pickup_boroct2010"},
            {"type": "VARCHAR", "mapping": 37, "name": "pickup_cdeligibil"},
            {"type": "VARCHAR", "mapping": 38, "name": "pickup_ntacode"},
            {"type": "VARCHAR", "mapping": 39, "name": "pickup_ntaname"},
            {"type": "VARCHAR", "mapping": 40, "name": "pickup_puma"},
            {"type": "VARCHAR", "mapping": 41, "name": "dropoff_nyct2010_gid"},
            {"type": "VARCHAR", "mapping": 42, "name": "dropoff_ctlabel"},
            {"type": "VARCHAR", "mapping": 43, "name": "dropoff_borocode"},
            {"type": "VARCHAR", "mapping": 44, "name": "dropoff_boroname"},
            {"type": "VARCHAR", "mapping": 45, "name": "dropoff_ct2010"},
            {"type": "VARCHAR", "mapping": 46, "name": "dropoff_boroct2010"},
            {"type": "VARCHAR", "mapping": 47, "name": "dropoff_cdeligibil"},
            {"type": "VARCHAR", "mapping": 48, "name": "dropoff_ntacode"},
            {"type": "VARCHAR", "mapping": 49, "name": "dropoff_ntaname"},
            {"type": "VARCHAR", "mapping": 50, "name": "dropoff_puma"}
        ]
    }
}

MongoDB already named each of the fields in each record so only the connector details are needed for Presto to communicate with it.

$ sudo vi /opt/presto/etc/catalog/mongodb.properties
connector.name=mongodb
mongodb.seeds=127.0.0.1

The following will give Presto the credentials and connection details for MySQL.

$ sudo vi /opt/presto/etc/catalog/mysql.properties
connector.name=mysql
connection-url=jdbc:mysql://127.0.0.1:3306
connection-user=mark
connection-password=test

The following will give Presto the credentials and connection details for PostgreSQL.

$ sudo vi /opt/presto/etc/catalog/postgresql.properties
connector.name=postgresql
connection-url=jdbc:postgresql://127.0.0.1:5432/taxi
connection-user=mark
connection-password=test

The following will give Presto the connection details for Redis. The schema and table name being used is also declared in this file.

$ sudo vi /opt/presto/etc/catalog/redis.properties
connector.name=redis
redis.table-names=schema1.trips
redis.nodes=localhost:6379

Redis stores each record in raw CSV format. The following will define a schema that will name and type cast each CSV field so they're usable in a tabular form in Presto.

$ sudo mkdir -p /opt/presto/etc/redis
$ sudo vi /opt/presto/etc/redis/trips.json
{
    "tableName": "trips",
    "schemaName": "schema1",
    "value": {
        "dataFormat": "csv",
        "fields": [
            {"type": "VARCHAR", "mapping": 0,  "name": "trip_id"},
            {"type": "VARCHAR", "mapping": 1,  "name": "vendor_id"},
            {"type": "VARCHAR", "mapping": 2,  "name": "pickup_datetime"},
            {"type": "VARCHAR", "mapping": 3,  "name": "dropoff_datetime"},
            {"type": "VARCHAR", "mapping": 4,  "name": "store_and_fwd_flag"},
            {"type": "VARCHAR", "mapping": 5,  "name": "rate_code_id"},
            {"type": "VARCHAR", "mapping": 6,  "name": "pickup_longitude"},
            {"type": "VARCHAR", "mapping": 7,  "name": "pickup_latitude"},
            {"type": "VARCHAR", "mapping": 8,  "name": "dropoff_longitude"},
            {"type": "VARCHAR", "mapping": 9,  "name": "dropoff_latitude"},
            {"type": "VARCHAR", "mapping": 10, "name": "passenger_count"},
            {"type": "VARCHAR", "mapping": 11, "name": "trip_distance"},
            {"type": "VARCHAR", "mapping": 12, "name": "fare_amount"},
            {"type": "VARCHAR", "mapping": 13, "name": "extra"},
            {"type": "VARCHAR", "mapping": 14, "name": "mta_tax"},
            {"type": "VARCHAR", "mapping": 15, "name": "tip_amount"},
            {"type": "VARCHAR", "mapping": 16, "name": "tolls_amount"},
            {"type": "VARCHAR", "mapping": 17, "name": "ehail_fee"},
            {"type": "VARCHAR", "mapping": 18, "name": "improvement_surcharge"},
            {"type": "VARCHAR", "mapping": 19, "name": "total_amount"},
            {"type": "VARCHAR", "mapping": 20, "name": "payment_type"},
            {"type": "VARCHAR", "mapping": 21, "name": "trip_type"},
            {"type": "VARCHAR", "mapping": 22, "name": "pickup"},
            {"type": "VARCHAR", "mapping": 23, "name": "dropoff"},
            {"type": "VARCHAR", "mapping": 24, "name": "cab_type"},
            {"type": "VARCHAR", "mapping": 25, "name": "precipitation"},
            {"type": "VARCHAR", "mapping": 26, "name": "snow_depth"},
            {"type": "VARCHAR", "mapping": 27, "name": "snowfall"},
            {"type": "VARCHAR", "mapping": 28, "name": "max_temperature"},
            {"type": "VARCHAR", "mapping": 29, "name": "min_temperature"},
            {"type": "VARCHAR", "mapping": 30, "name": "average_wind_speed"},
            {"type": "VARCHAR", "mapping": 31, "name": "pickup_nyct2010_gid"},
            {"type": "VARCHAR", "mapping": 32, "name": "pickup_ctlabel"},
            {"type": "VARCHAR", "mapping": 33, "name": "pickup_borocode"},
            {"type": "VARCHAR", "mapping": 34, "name": "pickup_boroname"},
            {"type": "VARCHAR", "mapping": 35, "name": "pickup_ct2010"},
            {"type": "VARCHAR", "mapping": 36, "name": "pickup_boroct2010"},
            {"type": "VARCHAR", "mapping": 37, "name": "pickup_cdeligibil"},
            {"type": "VARCHAR", "mapping": 38, "name": "pickup_ntacode"},
            {"type": "VARCHAR", "mapping": 39, "name": "pickup_ntaname"},
            {"type": "VARCHAR", "mapping": 40, "name": "pickup_puma"},
            {"type": "VARCHAR", "mapping": 41, "name": "dropoff_nyct2010_gid"},
            {"type": "VARCHAR", "mapping": 42, "name": "dropoff_ctlabel"},
            {"type": "VARCHAR", "mapping": 43, "name": "dropoff_borocode"},
            {"type": "VARCHAR", "mapping": 44, "name": "dropoff_boroname"},
            {"type": "VARCHAR", "mapping": 45, "name": "dropoff_ct2010"},
            {"type": "VARCHAR", "mapping": 46, "name": "dropoff_boroct2010"},
            {"type": "VARCHAR", "mapping": 47, "name": "dropoff_cdeligibil"},
            {"type": "VARCHAR", "mapping": 48, "name": "dropoff_ntacode"},
            {"type": "VARCHAR", "mapping": 49, "name": "dropoff_ntaname"},
            {"type": "VARCHAR", "mapping": 50, "name": "dropoff_puma"}
        ]
    }
}

With all those configuration changes in place I'll restart the Presto server process.

$ sudo /opt/presto/bin/launcher restart

Querying Multiple Databases with Presto

The following with launch Presto's CLI and then query the five databases in a single query. Note the naming convention used here is catalogue.schema.table.

$ presto
SELECT (
    SELECT COUNT(*) FROM kafka.default.trips
), (
    SELECT COUNT(*) FROM mysql.taxi.trips
), (
    SELECT COUNT(*) FROM redis.schema1.trips
), (
    SELECT COUNT(*) FROM mongodb.taxi.trips
), (
    SELECT COUNT(*) FROM postgresql.public.trips
);
 _col0 | _col1 | _col2 | _col3 | _col4
-------+-------+-------+-------+-------
  1000 |  1000 |  1000 |  1000 |  1000

The same query can also be executed via Python. The following will install Python and PyHive among a few other dependencies.

$ sudo apt install \
    python \
    python-pip \
    virtualenv
$ virtualenv .pyhive
$ source .pyhive/bin/activate
$ pip install pyhive requests

I'll launch Python's REPL and execute the same query executed above.

$ python
from pyhive import presto


sql = """SELECT (
             SELECT COUNT(*) FROM kafka.default.trips
         ), (
             SELECT COUNT(*) FROM mysql.taxi.trips
         ), (
             SELECT COUNT(*) FROM redis.schema1.trips
         ), (
             SELECT COUNT(*) FROM mongodb.taxi.trips
         ), (
             SELECT COUNT(*) FROM postgresql.public.trips
         )"""
cursor = presto.connect('0.0.0.0').cursor()
cursor.execute(sql)
cursor.fetchall()

The output looks like the following.

[(1000, 1000, 1000, 1000, 1000)]

Presto can also export query results to CSV format. The following will collect all the events from Kafka's trips topic and output them to a file.

$ presto \
    --execute "SELECT * FROM kafka.default.trips" \
    --output-format CSV_HEADER \
    > trips_from_kafka.csv
Thank you for taking the time to read this post. I offer both consulting and hands-on development services to clients in North America and Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2025 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.