
Posted on Mon 01 August 2016 under File and Data Management

Building a Data Pipeline with Airflow

It's rare these days that I come across a project that can get by on a single piece of database software. Each database has its own speciality and as an ensemble, multiple databases are worth more than the sum of their parts. Data pipelines are used to monitor and control the flow of data between databases and other endpoints. Some pipeline managers can handle complex lifecycles and retry steps within a job should a failure arise.

For about a year now I've been using Airflow as a data pipeline orchestration tool with my clients. It's a tool that Maxime Beauchemin began building at Airbnb in October of 2014. Maxime has had a fantastic data-oriented career, working as a Senior BI Engineer at Yahoo and as a Data Engineering Manager at Facebook prior to his arrival at Airbnb in late 2014. In March of 2016, Airflow entered incubation as an Apache project.

When I first began using Airflow I was relieved to see that at its core is a plain and simple Flask project. I was able to read through its Python codebase in a morning and have confidence that I could work my way through its architecture.

In this blog post, I'll set up a data pipeline that takes currency exchange rates, stores them in PostgreSQL and then caches the latest exchange rates in Redis.

A Foreign Exchange Rate API

Open Exchange Rates has a "Forever Free" plan that offers an hourly rate feed with a US Dollar base. With this plan, you can make 1,000 requests a month. Currency exchange markets operate 24 hours a day, 5 days a week: they start at 10PM GMT on Sunday and run till 10PM GMT on Friday. This means the rates in this feed should only change around 520 times a month.
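
As a quick sanity check on that figure, and on whether hourly polling fits inside the free plan's 1,000-request allowance, the arithmetic works out as follows:

# Rough arithmetic behind the ~520 figure: the market trades for
# 120 hours a week and an average month has 52 / 12 weeks.
hours_per_trading_week = 24 * 5
weeks_per_month = 52 / 12.0

print(hours_per_trading_week * weeks_per_month)  # ~520 changes a month,
                                                 # well under 1,000 requests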

Once you've signed up and got your Application ID the API call is pretty simple:

$ curl -S https://openexchangerates.org/api/latest.json?app_id=... | \
    python -mjson.tool
{
  "disclaimer": "Exchange rates provided for informational purposes only and do not constitute financial advice of any kind. Although every attempt is made to ensure quality, no guarantees are made of accuracy, validity, availability, or fitness for any purpose. All usage subject to acceptance of Terms: https://openexchangerates.org/terms/",
  "license": "Data sourced from various providers; resale prohibited; no warranties given of any kind. All usage subject to License Agreement: https://openexchangerates.org/license/",
  "timestamp": 1470047453,
  "base": "USD",
  "rates": {
    "AED": 3.67283,
    "AFN": 68.7291,
    "ALL": 122.653,
    "AMD": 476.12,
    "ANG": 1.77625,
    "AOA": 165.563833,
    "ARS": 14.99704,
    "AUD": 1.31945
  }
}
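
The same call is easy to make from Python. Below is a minimal sketch using the requests library, assuming it's installed in your environment; the app_id value is a placeholder for your own Application ID.

import requests

# Placeholder Application ID; substitute the one from your account.
url = 'https://openexchangerates.org/api/latest.json?app_id=YOUR_APP_ID'

resp = requests.get(url)
resp.raise_for_status()
payload = resp.json()

print(payload['base'])            # USD
print(payload['timestamp'])       # Unix epoch the rates were quoted at
print(payload['rates']['GBP'])    # One of the ~172 USD-based pairs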

Installing Prerequisites

I'm going to try and change as little of Airflow's default setup as possible. This means leaving MySQL to store the job results.

The following was run on a fresh Ubuntu 14.04.3 LTS installation.

$ sudo apt update
$ sudo apt install \
    libffi-dev \
    libmysqlclient-dev \
    mysql-server \
    postgresql-9.3 \
    postgresql-server-dev-9.3 \
    python-dev \
    python-pip \
    python-virtualenv \
    rabbitmq-server \
    redis-server

I'll add my Linux account to PostgreSQL's list of super users and then create the rates database which will store the exchange rate history.

$ sudo su - postgres -c \
    "createuser \
        --pwprompt \
        --superuser \
        mark"
$ createdb rates

I'll then create the MySQL database for airflow. The default Airflow configuration has "airflow" baked in as the username and password used to connect to MySQL.

$ mysql \
    -uroot \
    -proot \
    -e "CREATE DATABASE airflow
        DEFAULT CHARACTER SET utf8
        DEFAULT COLLATE utf8_general_ci;

        GRANT ALL PRIVILEGES
        ON airflow.*
        TO 'airflow'@'localhost'
        IDENTIFIED BY 'airflow';

        FLUSH PRIVILEGES;"

I'll then start Redis as it doesn't start automatically when you install it in Ubuntu via apt.

$ sudo /etc/init.d/redis-server start

Creating an Exchange Rates Table

As of this writing, the API returns rates for 172 currencies paired with the US Dollar. I'm going to store the currency identifier as an ENUM. When I import an exchange rate I'll check that its currency code is one of these ENUM values before inserting it. Any pairs supported in the future will need to be added to this enum.

For the exchange rate, I'll use the DECIMAL data type in PostgreSQL. Most of the rates are stated to 6 decimal places, though Bitcoin (BTC) is stated to 12 decimal places.

The timestamp is an epoch in UTC. I'll convert it from its integer form into a datetime object and when it's inserted psycopg2 will convert it into a TIMESTAMP.
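
As a small illustration using the timestamp from the example response above, the standard library offers two conversions; the DAG later in this post uses the first:

from datetime import datetime

epoch = 1470047453  # the timestamp field from the example response

# Interpreted in the server's local time zone (what the DAG below does).
print(datetime.fromtimestamp(epoch))

# The same instant interpreted as UTC.
print(datetime.utcfromtimestamp(epoch))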

The primary key will be built around a SERIAL field, an auto-incrementing 32-bit integer that runs up to 2,147,483,647 (the table below actually declares a composite primary key of pk, pair and valid_until). If we collect 172 exchange rates per hour, 24 hours a day, 5 days a week, 52 weeks a year, we won't hit that 2.1 billion limit for roughly 2,000 years.
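
That 2,000-year figure is just arithmetic:

rows_per_year = 172 * 24 * 5 * 52        # 1,073,280 rows a year
serial_limit = 2147483647                # largest value a SERIAL can hold

print(serial_limit / float(rows_per_year))  # ~2,000 years of headroom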

$ psql rates
CREATE TYPE USD_PAIR AS ENUM (
    'AED', 'AFN', 'ALL', 'AMD', 'ANG', 'AOA', 'ARS',
    'AUD', 'AWG', 'AZN', 'BAM', 'BBD', 'BDT', 'BGN',
    'BHD', 'BIF', 'BMD', 'BND', 'BOB', 'BRL', 'BSD',
    'BTC', 'BTN', 'BWP', 'BYN', 'BYR', 'BZD', 'CAD',
    'CDF', 'CHF', 'CLF', 'CLP', 'CNY', 'COP', 'CRC',
    'CUC', 'CUP', 'CVE', 'CZK', 'DJF', 'DKK', 'DOP',
    'DZD', 'EEK', 'EGP', 'ERN', 'ETB', 'EUR', 'FJD',
    'FKP', 'GBP', 'GEL', 'GGP', 'GHS', 'GIP', 'GMD',
    'GNF', 'GTQ', 'GYD', 'HKD', 'HNL', 'HRK', 'HTG',
    'HUF', 'IDR', 'ILS', 'IMP', 'INR', 'IQD', 'IRR',
    'ISK', 'JEP', 'JMD', 'JOD', 'JPY', 'KES', 'KGS',
    'KHR', 'KMF', 'KPW', 'KRW', 'KWD', 'KYD', 'KZT',
    'LAK', 'LBP', 'LKR', 'LRD', 'LSL', 'LTL', 'LVL',
    'LYD', 'MAD', 'MDL', 'MGA', 'MKD', 'MMK', 'MNT',
    'MOP', 'MRO', 'MTL', 'MUR', 'MVR', 'MWK', 'MXN',
    'MYR', 'MZN', 'NAD', 'NGN', 'NIO', 'NOK', 'NPR',
    'NZD', 'OMR', 'PAB', 'PEN', 'PGK', 'PHP', 'PKR',
    'PLN', 'PYG', 'QAR', 'RON', 'RSD', 'RUB', 'RWF',
    'SAR', 'SBD', 'SCR', 'SDG', 'SEK', 'SGD', 'SHP',
    'SLL', 'SOS', 'SRD', 'STD', 'SVC', 'SYP', 'SZL',
    'THB', 'TJS', 'TMT', 'TND', 'TOP', 'TRY', 'TTD',
    'TWD', 'TZS', 'UAH', 'UGX', 'USD', 'UYU', 'UZS',
    'VEF', 'VND', 'VUV', 'WST', 'XAF', 'XAG', 'XAU',
    'XCD', 'XDR', 'XOF', 'XPD', 'XPF', 'XPT', 'YER',
    'ZAR', 'ZMK', 'ZMW', 'ZWL');

CREATE TABLE rates (
    pk SERIAL,
    pair USD_PAIR,
    valid_until TIMESTAMP WITH TIME ZONE,
    rate DECIMAL,
    CONSTRAINT pk PRIMARY KEY (pk, pair, valid_until)
);
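
Once psycopg2 is installed (the airflow[postgres] install in the next section pulls it in), a short sketch like the following can confirm the schema behaves as expected. The row is purely illustrative and the connection details are the database and role created earlier; swap in your own password.

from datetime import datetime

import psycopg2

conn = psycopg2.connect(dbname='rates', host='127.0.0.1',
                        user='mark', password='...')  # password set earlier
cur = conn.cursor()

# Insert an illustrative row; psycopg2 adapts the datetime to a TIMESTAMP.
cur.execute("""INSERT INTO rates (pair, valid_until, rate)
               VALUES (%s, %s, %s)""",
            ('GBP', datetime.utcnow(), 0.757637))
conn.commit()

cur.execute("SELECT pair, valid_until, rate FROM rates ORDER BY pk DESC LIMIT 1")
print(cur.fetchone())

cur.close()
conn.close()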

Installing Airflow

I'll create a virtual environment, activate it and install the Python modules. As of this writing, Airflow 1.7.1.3 is the latest version available via PyPI. Including the [postgres] extra alongside Airflow installs psycopg2 automatically.

$ virtualenv .pipeline
$ source .pipeline/bin/activate
$ pip install \
    airflow[postgres] \
    celery \
    cryptography \
    MySQL-python \
    redis

I'll then initialise Airflow's database and workspace.

$ airflow initdb

By default, Airflow loads a set of example DAGs, so if you haven't installed any Hive dependencies you'll see this annoying error message a lot:

ERROR [airflow.models.DagBag] Failed to import: /home/mark/.pipeline/local/lib/python2.7/site-packages/airflow/example_dags/example_twitter_dag.py
Traceback (most recent call last):
  File "/home/mark/.pipeline/local/lib/python2.7/site-packages/airflow/models.py", line 247, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/home/mark/.pipeline/local/lib/python2.7/site-packages/airflow/example_dags/example_twitter_dag.py", line 26, in <module>
    from airflow.operators import BashOperator, HiveOperator, PythonOperator
ImportError: cannot import name HiveOperator

To avoid loading these examples run the following to patch Airflow's configuration:

$ sed -i 's/load_examples = True/load_examples = False/' \
    airflow/airflow.cfg

Setting up Database Connections

Airflow has an inventory system for database and API connections. The easiest way to add these in is via the web interface. The following will launch the webserver on port 8080 and bind it to the 0.0.0.0 network interface.

$ airflow webserver

If you then load up the following URL it'll take you to the connections page:

$ open http://127.0.0.1:8080/admin/connection/

I'll create two connections. The first will have:

  • A connection type of HTTP.
  • A connection identifier of openexchangerates.
  • A host string of the full API endpoint: https://openexchangerates.org/api/latest.json?app_id=....

The second connection will have:

  • A connection type of Postgres.
  • A connection identifier of rates.
  • A host string of 127.0.0.1.
  • A schema string (database name) of rates.
  • A login of mark and the complementing password I created earlier.
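
If you'd rather script this step than click through the admin interface, the same two connections can be created with Airflow's Connection model. This is a sketch against Airflow 1.7's API; the app_id and password values are placeholders.

from airflow import settings
from airflow.models import Connection

session = settings.Session()

# HTTP connection holding the full API endpoint (placeholder app_id).
session.add(Connection(
    conn_id='openexchangerates',
    conn_type='http',
    host='https://openexchangerates.org/api/latest.json?app_id=YOUR_APP_ID'))

# PostgreSQL connection for the rates database created earlier.
session.add(Connection(
    conn_id='rates',
    conn_type='postgres',
    host='127.0.0.1',
    schema='rates',
    login='mark',
    password='...'))  # the password created with createuser

session.commit()
session.close()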

Creating a Forex DAG

Directed Acyclic Graphs (DAGs) are graphs of tasks that Airflow's workers traverse. Each node in the graph is a step, and together the steps make up the overall job. Because the graph is acyclic, a task can never loop back to one that has already run; it can only lead on to tasks that haven't yet run in the current job. This means individual steps can be retried after a failure without having to restart the whole job.

The following will set up the dags folder and create the DAG for fetching exchange rates, storing them in PostgreSQL and caching the latest rates in Redis.

$ mkdir airflow/dags
$ vi airflow/dags/rates.py
from datetime import datetime, timedelta
import json


from airflow.hooks import HttpHook, PostgresHook
from airflow.operators import PythonOperator
from airflow.models import DAG
import redis


def get_rates(ds, **kwargs):
    pg_hook = PostgresHook(postgres_conn_id='rates')
    api_hook = HttpHook(http_conn_id='openexchangerates', method='GET')

    # If either of these raises an exception then we'll be notified via
    # Airflow
    resp = api_hook.run('')
    resp = json.loads(resp.content)

    # These are the only valid pairs the DB supports at the moment. Anything
    # else that turns up will be ignored.
    valid_pairs = (
        'AED', 'AFN', 'ALL', 'AMD', 'ANG', 'AOA', 'ARS',
        'AUD', 'AWG', 'AZN', 'BAM', 'BBD', 'BDT', 'BGN',
        'BHD', 'BIF', 'BMD', 'BND', 'BOB', 'BRL', 'BSD',
        'BTC', 'BTN', 'BWP', 'BYN', 'BYR', 'BZD', 'CAD',
        'CDF', 'CHF', 'CLF', 'CLP', 'CNY', 'COP', 'CRC',
        'CUC', 'CUP', 'CVE', 'CZK', 'DJF', 'DKK', 'DOP',
        'DZD', 'EEK', 'EGP', 'ERN', 'ETB', 'EUR', 'FJD',
        'FKP', 'GBP', 'GEL', 'GGP', 'GHS', 'GIP', 'GMD',
        'GNF', 'GTQ', 'GYD', 'HKD', 'HNL', 'HRK', 'HTG',
        'HUF', 'IDR', 'ILS', 'IMP', 'INR', 'IQD', 'IRR',
        'ISK', 'JEP', 'JMD', 'JOD', 'JPY', 'KES', 'KGS',
        'KHR', 'KMF', 'KPW', 'KRW', 'KWD', 'KYD', 'KZT',
        'LAK', 'LBP', 'LKR', 'LRD', 'LSL', 'LTL', 'LVL',
        'LYD', 'MAD', 'MDL', 'MGA', 'MKD', 'MMK', 'MNT',
        'MOP', 'MRO', 'MTL', 'MUR', 'MVR', 'MWK', 'MXN',
        'MYR', 'MZN', 'NAD', 'NGN', 'NIO', 'NOK', 'NPR',
        'NZD', 'OMR', 'PAB', 'PEN', 'PGK', 'PHP', 'PKR',
        'PLN', 'PYG', 'QAR', 'RON', 'RSD', 'RUB', 'RWF',
        'SAR', 'SBD', 'SCR', 'SDG', 'SEK', 'SGD', 'SHP',
        'SLL', 'SOS', 'SRD', 'STD', 'SVC', 'SYP', 'SZL',
        'THB', 'TJS', 'TMT', 'TND', 'TOP', 'TRY', 'TTD',
        'TWD', 'TZS', 'UAH', 'UGX', 'USD', 'UYU', 'UZS',
        'VEF', 'VND', 'VUV', 'WST', 'XAF', 'XAG', 'XAU',
        'XCD', 'XDR', 'XOF', 'XPD', 'XPF', 'XPT', 'YER',
        'ZAR', 'ZMK', 'ZMW', 'ZWL')

    rates_insert = """INSERT INTO rates (pair, valid_until, rate)
                      VALUES (%s, %s, %s);"""

    # If this raises an exception then we'll be notified via Airflow
    valid_until = datetime.fromtimestamp(resp['timestamp'])

    for (iso2, rate) in resp['rates'].iteritems():
        # If converting the rate to a float fails for whatever reason then
        # just move on.
        try:
            rate = float(rate)
        except (TypeError, ValueError):
            continue

        iso2 = iso2.upper().strip()

        if iso2 not in valid_pairs or rate < 0:
            continue

        pg_hook.run(rates_insert, parameters=(iso2,
                                              valid_until,
                                              rate))


def cache_latest_rates(ds, **kwargs):
    redis_conn = redis.StrictRedis()

    pg_hook = PostgresHook(postgres_conn_id='rates')
    latest_rates = """SELECT DISTINCT ON (pair)
                             pair, rate
                      FROM   rates
                      ORDER  BY pair, valid_until DESC;"""

    for iso2, rate in pg_hook.get_records(latest_rates):
        redis_conn.set(iso2, rate)


args = {
    'owner': 'mark',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Run at the top of the hour Monday to Friday.
# Note: This doesn't line up with the market hours of
# 10PM Sunday till 10PM Friday GMT.
dag = DAG(dag_id='rates',
          default_args=args,
          schedule_interval='0 * * * 1,2,3,4,5',
          dagrun_timeout=timedelta(seconds=30))

get_rates_task = \
    PythonOperator(task_id='get_rates',
                   provide_context=True,
                   python_callable=get_rates,
                   dag=dag)

cache_latest_rates_task = \
    PythonOperator(task_id='cache_latest_rates',
                   provide_context=True,
                   python_callable=cache_latest_rates,
                   dag=dag)

get_rates_task.set_downstream(cache_latest_rates_task)
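
Before handing the DAG to the scheduler you can smoke-test the two callables directly from a Python shell; Airflow also ships an airflow test command for running a single task in isolation. The sketch below simply imports the file and calls the functions, so it will hit the live API and write to PostgreSQL and Redis. It assumes the virtual environment is active, you're in the home directory and the two connections above have been saved.

import imp

# Load the DAG file the same way Airflow's DagBag does.
rates = imp.load_source('rates_dag', 'airflow/dags/rates.py')

# ds is the execution date string Airflow would normally pass in.
rates.get_rates(ds='2016-08-01')
rates.cache_latest_rates(ds='2016-08-01')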

Launching the Scheduler & Worker

Airflow has two commands for getting jobs to execute: the first schedules jobs to run and the second starts at least one worker to pick up jobs waiting to be run. This might seem like one command too many, but if you're setting up a distributed system to take on a lot of work, this division of responsibility helps out a lot.

$ airflow scheduler &
$ airflow worker &

With those two commands running, we can now see Airflow at work.

If you launch the admin page you can hit the switch next to the rates DAG to enable it.

$ open http://127.0.0.1:8080/admin/

You can then view each node in the graph. Their borders are coloured with their current status.

$ open 'http://127.0.0.1:8080/admin/airflow/graph?dag_id=rates'

If you click any node you'll see a modal with various actions. Click the "View Log" button in the top row of the actions list to see the latest log output for that task. Having the logs isolated per task helps you find errors quickly without needing to grep or tail any log files manually.

If you click "Task Details" you'll see the isolated Python code for that specific operator. This helps by not requiring you to track down Python files manually.

Airflow makes running a data pipeline very convenient. You can perform most debugging and auditing tasks from your browser.

Once the DAG has run once successfully you'll be able to see the data in PostgreSQL and Redis:

$ echo 'get "GBP"' | redis-cli
"0.757637"
$ echo "COPY (SELECT *
              FROM rates
              ORDER BY valid_until
              DESC LIMIT 10)
        TO STDOUT WITH CSV;" | psql rates
3969,BGN,2016-08-01 16:45:05+03,1.752679
3972,DKK,2016-08-01 16:45:05+03,6.661185
3967,GHS,2016-08-01 16:45:05+03,3.969
3968,EGP,2016-08-01 16:45:05+03,8.879764
3970,PAB,2016-08-01 16:45:05+03,1.0
3971,BOB,2016-08-01 16:45:05+03,6.92172
3965,DZD,2016-08-01 16:45:05+03,110.12886
3966,NAD,2016-08-01 16:45:05+03,13.95132
3974,LBP,2016-08-01 16:45:05+03,1508.666667
3973,BWP,2016-08-01 16:45:05+03,10.557125
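
For completeness, here's a short sketch of how application code might consume the pipeline's output, reading the cached rate out of Redis and the latest rows out of PostgreSQL (again, the password is a placeholder):

import psycopg2
import redis

# redis-py returns raw strings, so cast before doing arithmetic with the rate.
r = redis.StrictRedis()
print(float(r.get('GBP')))

conn = psycopg2.connect(dbname='rates', host='127.0.0.1',
                        user='mark', password='...')
cur = conn.cursor()
cur.execute("""SELECT DISTINCT ON (pair) pair, rate
               FROM rates
               ORDER BY pair, valid_until DESC""")
for pair, rate in cur.fetchall():
    print('%s %s' % (pair, rate))
cur.close()
conn.close()

At that point the whole pipeline, from the hourly API call through PostgreSQL to the Redis cache, runs hands-free on Airflow's schedule.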
