Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.

I will tell you about a wonderful tool for developing ETL processes - Apache Airflow. Airflow is so versatile and multifaceted that it is worth a closer look even if you are not involved in data flows but simply need to periodically launch some processes and monitor their execution.

And yes, I will not only tell but also show: the article has a lot of code, screenshots and recommendations.

[Image: what you usually see when you google the word Airflow / Wikimedia Commons]


Introduction

Apache Airflow is just like Django:

  • written in Python,
  • there is a great admin panel,
  • expandable indefinitely

- only better, and it was made for completely different purposes, namely (as stated before the cut):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience allow you),
  • with dynamic workflow generation from very easy to write and understand Python code
  • and the ability to connect any databases and APIs with each other using both ready-made components and home-made plugins (which is extremely simple).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
  • as an advanced cron that launches the data consolidation processes on the ODS and also monitors their upkeep.

Until recently, one small server with 32 cores and 50 GB of RAM covered our needs. In Airflow this amounts to:

  • more than 200 DAGs (essentially workflows that we stuffed with tasks),
  • about 70 tasks in each on average,
  • and all this goodness is launched (also on average) once an hour.

I will write below about how we scaled out, but for now let's define the über-problem that we will solve:

There are three source SQL Servers, each with 50 databases - instances of one project, so they have the same structure (almost everywhere, mwa-ha-ha), which means that each has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively throw it all into, say, Vertica.

Let's go!

The main part, practical (and a little theoretical)

Why we (and you) need it

Back when the trees were big and I was a simple SQL guy at one Russian retailer, we built our ETL processes, aka data flows, with the two tools available to us:

  • Informatica Power Center - an extremely sprawling system, extremely productive, with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? Well, first of all, its interface, which looks decades old, weighed on us mentally. Secondly, this contraption is designed for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. As for the fact that it costs about as much as a wing of an Airbus per year, we will say nothing.

    Beware, a screenshot can hurt people under 30 a little

    [screenshot]

  • SQL Server Integration Services - we used this comrade in our intra-project flows. Well, in fact: we were already using SQL Server, and it would have been somehow unreasonable not to use its ETL tools. Everything about it is good: the interface is pretty, and so are the progress reports... But that is not why we love software products, oh, not for that. We can version its dtsx files (which are XML with nodes shuffled on save), but what's the point? How about building a package of tasks that drags hundreds of tables from one server to another? Forget hundreds - your index finger will fall off after twenty of them from clicking the mouse button. But it definitely looks more fashionable:

    [screenshot]

We certainly looked for a way out. It even almost came to a self-written SSIS package generator...

…and then a new job found me. And there, Apache Airflow caught up with me.

When I found out that ETL process descriptions are plain Python code, I almost danced for joy. That is how data flows got versioned and diffed, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code that fits on one and a half or two 13-inch screens.

Assembling the cluster

Let's not turn this into a complete kindergarten and not discuss completely obvious things here, like installing Airflow, your chosen database, Celery and the other cases described in the docs.

So that we can begin experimenting right away, I sketched out a docker-compose.yml in which:

  • we actually raise Airflow: Scheduler and Webserver. Flower will also be spinning there to monitor Celery tasks (because it has already been pushed into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, in which Airflow will write its service information (scheduler data, execution statistics, etc.), and Celery will mark completed tasks;
  • Redis, which will act as a task broker for Celery;
  • Celery worker, which will be engaged in the direct execution of tasks.
  • to the folder ./dags we will add our files with DAG descriptions. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.

In some places the code in the examples is not shown completely (so as not to clutter the text), and in some places it is modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In assembling this composition, I relied heavily on the well-known image puckel/docker-airflow - be sure to check it out. Maybe you don't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg, but also through environment variables (thanks to the developers), which I maliciously took advantage of.
  • Naturally, it is not production-ready: I deliberately did not add healthchecks to the containers and did not bother with security. But I did the minimum suitable for our experiments.
  • Note that:
    • The dag folder must be accessible to both the scheduler and the workers.
    • The same applies to all third-party libraries - they must all be installed on machines with a scheduler and workers.

Well, now it's simple:

$ docker-compose up --scale worker=3

After everything comes up, you can take a look at the web interfaces: the Airflow Webserver at localhost:8080 and Flower at localhost:5555 (the ports are in the compose file above).

Basic concepts

If you didn’t understand anything in all these “dags”, then here is a short dictionary:

  • Scheduler - the most important uncle in Airflow, who makes sure that robots do the hard work rather than a person: it monitors the schedule, updates DAGs and launches tasks.

    In general, in older versions it had problems with memory (no, not amnesia - leaks), and the legacy parameter run_duration even remained in the configs: its restart interval. But now everything is fine.

  • DAG (aka "dag") - a "directed acyclic graph", but such a definition tells few people much; in essence it is a container for tasks that interact with each other (see below), an analogue of a Package in SSIS or a Workflow in Informatica.

    In addition to dags, there may still be subdags, but we most likely will not get to them.

  • DAG Run - an initialized DAG with its own execution_date assigned. DAG Runs of the same DAG can run in parallel (if you have made your tasks idempotent, of course).
  • Operators are pieces of code responsible for performing a specific action. There are three types of them:
    • actions, like our favorite PythonOperator, which can execute any (valid) Python code;
    • transfers, which move data from place to place, say, MsSqlToHiveTransfer;
    • sensors, which let you react to an event or hold up further execution of the DAG until it occurs. HttpSensor can poll a given endpoint and, when the desired response arrives, start a transfer such as GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "why? You can do retries right in the operator!" The answer: so as not to clog the task pool with suspended operators. A sensor starts, checks and dies until the next attempt (see the sketch after this list).
  • Task - declared operators, regardless of type, that are attached to a DAG are promoted to the rank of tasks.
  • Task Instance - when the scheduler-general decides that it is time to send tasks into battle to the performer-workers (right on the spot if we use LocalExecutor, or to a remote node in the case of CeleryExecutor), it assigns them a context (i.e. a set of variables - execution parameters), expands the command or query templates, and puts them into the pool.
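
To make the sensor idea a bit more concrete, here is a minimal sketch (the connection id metrics_api, the endpoint and the downstream task are made up for illustration) of an HttpSensor in reschedule mode gating a PythonOperator:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.http_sensor import HttpSensor

dag = DAG('wait_then_work',
          schedule_interval=timedelta(hours=1),
          start_date=datetime(2020, 7, 8))

# Poll the (hypothetical) 'metrics_api' connection until it answers with 200
wait_for_api = HttpSensor(
    task_id='wait_for_api',
    http_conn_id='metrics_api',
    endpoint='health',
    response_check=lambda response: response.status_code == 200,
    poke_interval=60,       # check once a minute
    timeout=60 * 60,        # give up after an hour
    mode='reschedule',      # free the worker slot between checks
    dag=dag)


def do_work(**context):
    print('The API is up, doing the actual work')


work = PythonOperator(
    task_id='work',
    python_callable=do_work,
    provide_context=True,
    dag=dag)

wait_for_api >> work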

We generate tasks

First, let's outline the general scheme of our DAG, and then we will dive deeper and deeper into the details, because we apply some non-trivial solutions there.

So, in its simplest form, such a dag will look like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's understand:

  • First, we import the necessary libs and something else;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table (a sketch of what such a list might look like comes right after this list);
  • dag - the declaration of our DAG, which must necessarily live in globals(), otherwise Airflow will not find it. We also need to tell it:
    • its name, orders - this name will then appear in the web interface,
    • that it will run starting from midnight on the eighth of July,
    • and that it should run approximately every 6 hours (for tough guys, instead of timedelta() you can put the cron line 0 0 0/6 ? * * * here; for the less cool, an expression like @daily);
  • workflow() will do the main job, but not now. For now, we will just dump our context into the log;
  • And now the simple magic of creating tasks:
    • we run through our sources;
    • initialize a PythonOperator, which will execute our dummy workflow(). Do not forget to specify a unique (within the DAG) task name and to tie in the DAG itself. The provide_context flag, in turn, will pour additional arguments into the function, which we will carefully collect with **context.
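
The module commons/datasources.py itself is not shown in the article; as a rough sketch (connection ids and database names here are made up, the real list lives in the repository), sql_server_ds could be built like this:

from collections import namedtuple

ConnectionSchema = namedtuple('ConnectionSchema', 'conn_id schema')

# Three source servers (Airflow connection ids) with 50 databases each -
# exactly the layout from our über-problem
sql_server_ds = [
    ConnectionSchema(f'mssql_{server}', f'db_{db}')
    for server in range(3)
    for db in range(50)
]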

For now, that's all. What we got:

  • new dag in the web interface,
  • one and a half hundred tasks that will run in parallel (if the Airflow and Celery settings and the server capacity allow it).

Well, almost got it.

[screenshot]
Who will install the dependencies?

To make this whole thing easier, I wired processing of requirements.txt into docker-compose.yml on all nodes.
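
The exact requirements.txt lives in the repository; judging by the imports used later in this article, a minimal version of it would look something like this (the package set is my guess, pin versions to taste):

pandas
pymssql
vertica-python
requests-toolbelt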

Now the problem is gone:

[screenshot]

Gray squares are task instances processed by the scheduler.

We wait a bit, the tasks are snapped up by the workers:

[screenshot]

The green ones, of course, have successfully completed their work. The red ones, not so much.

By the way, there is no ./dags folder synchronized between machines on our prod at all - all DAGs live in git on our GitLab, and GitLab CI distributes updates to the machines on merge into master.

A little about Flower

While the workers are thrashing our dummy tasks, let's recall another tool that can show us a thing or two - Flower.

The very first page with summary information on worker nodes:

[screenshot]

The most intense page with tasks that went to work:

[screenshot]

The most boring page with the status of our broker:

[screenshot]

The brightest page is with task status graphs and their execution time:

[screenshot]

We load the underloaded

So, all the tasks have worked out, you can carry away the wounded.

[screenshot]

And there were many wounded, for one reason or another. If Airflow is used correctly, these very squares mean that the data definitely did not arrive.

You need to watch the log and restart the fallen task instances.

By clicking on any square, we will see the actions available to us:

[screenshot]

We can simply Clear a fallen one. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

[screenshot]

It is clear that doing this with the mouse for all the red squares is not very humane - that is not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

[screenshot]

Let's select them all at once and reset them to a clean state by clicking the right item:

[screenshot]

After the cleanup, our tasks look like this (they are already waiting for the scheduler to schedule them):

[screenshot]

Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Everyone has done a report update at some point, right? This is it again: there is a list of sources from which to get the data, a place to put it, and don't forget to honk when everything has finished or broken (well, that's not about us, no).

Let's go through the file again and look at the new obscure stuff:

  • from commons.operators import TelegramBotSendMessage - nothing prevents us from making our own operators, which we took advantage of by writing a small wrapper for sending messages to the (recently unblocked) Telegram. (We will talk more about this operator below);
  • default_args={} - dag can distribute the same arguments to all its operators;
  • to='{{ var.value.all_the_kings_men }}' - the to field will not be hardcoded but generated dynamically with Jinja from a variable holding a list of emails, which I carefully put in Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — condition for starting the operator. In our case, the letter will fly to the bosses only if all dependencies have worked out successfully;
  • tg_bot_conn_id='tg_main' - conn_id arguments take the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - messages in Telegram will fly away only if there are fallen tasks;
  • task_concurrency=1 - we prohibit the simultaneous launch of several task instances of one task. Otherwise, we will get the simultaneous launch of several VerticaOperator (looking at one table);
  • report_update >> [email, tg] - all VerticaOperator converge in sending letters and messages, like this:
    [screenshot]

    But since notifier operators have different launch conditions, only one will work. In the Tree View, everything looks a little less visual:
    [screenshot]

I will say a few words about macros and their friends - variables.

Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} will expand to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), and when restarted, the placeholders will expand to the same values.

The rendered values can be viewed using the Rendered button on each task instance. This is what the email-sending task looks like:

[screenshot]

And this is what the message-sending task looks like:

[screenshot]

A complete list of built-in macros for the latest available version is available here: macros reference

Moreover, with the help of plugins, we can declare our own macros, but that's another story.

In addition to the predefined things, we can substitute the values ​​of our variables (I already used this in the code above). Let's create in Admin/Variables a couple of things:

[screenshot]

That's it, now we can use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can also be JSON. In case of JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
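
The same variables are also available from Python code through the models API; a tiny sketch using the variable names created above:

from airflow.models import Variable

# A scalar variable
chat_id = Variable.get('failures_chat')

# A JSON variable, deserialized into a dict
bot_config = Variable.get('bot_config', deserialize_json=True)
token = bot_config['bot']['token']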

I will literally say one word and show one screenshot about connections. Everything is elementary here: on the page Admin/Connections we create a connection, add our logins / passwords and more specific parameters there. Like this:

[screenshot]

Passwords can be encrypted (more thoroughly than by default), and the connection type can be left out (as I did for tg_main): the point is that the list of types is hardwired in the Airflow models and cannot be extended without getting into the source code (if I suddenly failed to google something, please correct me), but nothing stops us from getting credentials just by name.

You can also create several connections with the same name: in that case the method BaseHook.get_connection(), which fetches connections by name, will return a random one of the namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
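
For the sake of completeness, getting a connection by name from code is a one-liner; a small sketch (tg_main is the connection created above):

from airflow.hooks.base_hook import BaseHook

conn = BaseHook.get_connection('tg_main')
print(conn.conn_id, conn.host, conn.login)
print(conn.extra_dejson)  # the Extra field parsed as JSON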

Variables and Connections are certainly cool tools, but it's important not to lose the balance: which parts of your flows you store in the code itself, and which parts you give to Airflow for storage. On the one hand, it can be convenient to quickly change the value, for example, a mailing box, through the UI. On the other hand, this is still a return to the mouse click, from which we (I) wanted to get rid of.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for connecting it to third-party services and libraries. For example, JiraHook will open a client for us to interact with Jira (you can move tasks back and forth there), and with SambaHook you can push a local file to an smb share.
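
And nothing stops you from using hooks directly inside your own callables; a minimal sketch (the connection id mssql_0 and database db_0 are made up) of pulling a value through MsSqlHook:

from airflow.hooks.mssql_hook import MsSqlHook


def count_orders(**context):
    # mssql_0 / db_0 are hypothetical entries from Admin/Connections
    hook = MsSqlHook(mssql_conn_id='mssql_0', schema='db_0')
    count = hook.get_first('SELECT COUNT(*) FROM dbo.Orders')[0]
    print(f'{count} orders in the source')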

Parsing the custom operator

And now we have come close to looking at how TelegramBotSendMessage is made.

The code of commons/operators.py with the operator itself:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, like everything else in Airflow, everything is very simple:

  • Inherited from BaseOperator, which implements quite a few Airflow-specific things (take a look at your leisure);
  • Declared the fields template_fields, in which Jinja will look for macros to process;
  • Arranged the right arguments for __init__() and set the defaults where necessary;
  • Did not forget to initialize the ancestor either;
  • Opened the corresponding hook, TelegramBotHook, and received a client object from it;
  • Overrode the method BaseOperator.execute(), which Airflow will call when the time comes to launch the operator - in it we implement the main action, not forgetting to log. (We log, by the way, right to stdout and stderr - Airflow will intercept everything, wrap it beautifully and put it where it belongs.)

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don’t even know what to explain here, I’ll just note the important points:

  • We inherit, think about the arguments - in most cases it will be one: conn_id;
  • Overriding standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and simply take the extra section (a JSON field), into which I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, giving it that specific token.

That's all. You can get a client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().

And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in python-telegram-bot for the sake of a single sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

The right way would be to package all of this - TelegramBotSendMessage, TelegramBotHook, TelegramBot - into a plugin, put it in a public repository and give it to open source.
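
If you do go the plugin route, in Airflow 1.10 registering the classes comes down to one AirflowPlugin subclass; a minimal sketch (the module layout and plugin name are up to you):

from airflow.plugins_manager import AirflowPlugin

from commons.hooks import TelegramBotHook
from commons.operators import TelegramBotSendMessage


class TelegramBotPlugin(AirflowPlugin):
    # Drop this module into $AIRFLOW_HOME/plugins, and the classes become
    # importable as airflow.hooks.telegram_bot_plugin / airflow.operators.telegram_bot_plugin
    name = 'telegram_bot_plugin'

    hooks = [TelegramBotHook]
    operators = [TelegramBotSendMessage]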

While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'm off to check whether something really is wrong...

[screenshot]
Something broke in our doge! Isn't that what we were expecting? Exactly!

So, are we going to load the data?

Did you feel that I missed something? The guy promised to transfer data from SQL Server to Vertica, and then went off topic, the scoundrel!

This atrocity was intentional, I simply had to decipher some terminology for you. Now you can go further.

Our plan was this:

  1. Make a DAG
  2. Generate tasks
  3. See how beautiful everything is
  4. Assign session IDs to the loads
  5. Get data from SQL Server
  6. Put data into Vertica
  7. Collect statistics

So, to get this all up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we raise:

  • Vertica as the dwh host with the most default settings,
  • three instances of SQL Server,
  • and we fill the databases in the latter with some data (do not, under any circumstances, look into mssql_init.py!).

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

You can look at what our miracle randomizer generated using the Data Profiling/Ad Hoc Query item:

[screenshot]
The main thing is not to show it to analysts

I will not elaborate on ETL sessions here, everything is trivial: we make a database, put a table in it, wrap everything in a context manager, and now we do this:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

The time has come to collect our data from our one and a half hundred tables. Let's do this with the help of some very unpretentious lines:

import pandas as pd

from airflow.hooks.mssql_hook import MsSqlHook

# The hook pulls its parameters from the Airflow connection named src_conn_id
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

  1. Using the hook, we get a pymssql connection from Airflow.
  2. We substitute a date restriction into the query - the template engine will throw it into the function.
  3. We feed our query to pandas, which will get us a DataFrame - it will be useful to us later.

I am substituting {dt} into the query instead of using a query parameter %s not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and slips it params: List, although it really wants a tuple.
Also note that the pymssql developer decided to stop supporting it, and it's time to move to pyodbc.

Let's see what Airflow stuffed the arguments of our functions with:

[screenshot]

If there is no data, then there is no point in continuing. But it is also strange to consider the load successful. Yet it is not an error either. A-ah-ah, what to do?! Here's what:

from airflow.exceptions import AirflowSkipException

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException will tell Airflow that there are no errors, but we are skipping the task. The interface will show not a green or red square, but a pink one.

Let's add a few more columns to our data:

from pandas.util import hash_pandas_object

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

These are:

  • The database from which we took the orders,
  • The ID of our load session (it will be different for each task),
  • A hash from the source and order ID - so that in the final database (where everything is poured into one table) we have a unique order ID.

The penultimate step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is through CSV!

import csv
from io import StringIO

from airflow.contrib.hooks.vertica_hook import VerticaHook

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table} ({', '.join(df.columns)})
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

  1. We make a special receptacle, StringIO.
  2. pandas will kindly write our DataFrame into it as CSV lines.
  3. We open a connection to our favorite Vertica with a hook.
  4. And now, with the help of copy(), we send our data directly to Vertica!

We take the number of loaded rows from the driver and tell the session manager that everything is OK:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production we create the target table manually. Here I allowed myself a small bit of automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator() I create the database schema and the table (if they do not already exist, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load
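
The article shows workflow() in pieces; to see the whole picture, here is a rough sketch of how those pieces might fit together (simplified: the sessions database is assumed to be a Postgres reachable through a made-up etl_log connection, and session.py is assumed to live in commons):

import csv
from io import StringIO

import pandas as pd
from pandas.util import hash_pandas_object

from airflow.exceptions import AirflowSkipException
from airflow.hooks.mssql_hook import MsSqlHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.contrib.hooks.vertica_hook import VerticaHook

from commons.session import Session


def workflow(src_conn_id, src_schema, dt, target_conn_id, target_table, **context):
    # 'etl_log' is a hypothetical connection to the database with the sessions table
    etl_db = PostgresHook(postgres_conn_id='etl_log').get_conn()

    with Session(etl_db, context['task'].task_id) as session:
        # Extract
        source_conn = MsSqlHook(mssql_conn_id=src_conn_id,
                                schema=src_schema).get_conn()
        df = pd.read_sql_query(
            f"SELECT id, start_time, end_time, type, data "
            f"FROM dbo.Orders WHERE CONVERT(DATE, start_time) = '{dt}'",
            source_conn)

        if df.empty:
            raise AirflowSkipException('No rows to load')

        # Add the service fields
        df['etl_source'] = src_schema
        df['etl_id'] = session.id
        df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

        # Load through a CSV buffer
        buffer = StringIO()
        df.to_csv(buffer, index=False, sep='|', na_rep='NUL',
                  quoting=csv.QUOTE_MINIMAL, header=False,
                  float_format='%.8f', doublequote=False, escapechar='\\')
        buffer.seek(0)

        target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
        cursor = target_conn.cursor()
        cursor.copy(
            f"COPY {target_table} ({', '.join(df.columns)}) FROM STDIN "
            f"DELIMITER '|' ENCLOSED '\"' ABORT ON ERROR NULL 'NUL'",
            buffer)

        session.loaded_rows = cursor.rowcount
        session.successful = True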

Summing up

- Well, - said the little mouse, - now aren't you
convinced that I am the most terrible animal in the forest?

Julia Donaldson, The Gruffalo

I think if my colleagues and I had a competition to see who could create and launch an ETL process from scratch faster - them with their SSIS and a mouse, me with Airflow... And then we would also compare the ease of maintenance... Wow, I think you will agree that I would beat them on all fronts!

On a slightly more serious note: Apache Airflow, by describing processes in the form of program code, made my job much more comfortable and enjoyable.

Its unlimited extensibility, both in terms of plug-ins and predisposition to scalability, gives you the opportunity to use Airflow in almost any area: even in the full cycle of collecting, preparing and processing data, even in launching rockets (to Mars, of course).

The final part: reference and information

The rake we have collected for you

  • start_date. Yes, this is already a local meme. Everyone passes through the DAG's main argument start_date. Briefly: if you set start_date to the current date and schedule_interval to one day, then the DAG will start tomorrow at the earliest.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is another runtime error associated with it: Task is missing the start_date parameter, which most often indicates that you forgot to bind the operator to the DAG.

  • All on one machine. Yes: the databases (Airflow's own and our own stuff), a web server, a scheduler and workers, all on one box. And it even worked. But over time the number of service tasks grew, and when PostgreSQL started responding to an indexed query in 20 s instead of 5 ms, we took it and carried it away.
  • LocalExecutor. Yes, we are still sitting on it, and we have already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand with at least one more worker, and we will have to work hard to move to CeleryExecutor. And since you can work with it on a single machine, nothing stops you from using Celery even on one server, which "of course will never go into production, honestly!"
  • Not using the built-in tools:
    • Connections to store service credentials,
    • SLA Misses to respond to tasks that did not work out on time,
    • xcom for metadata exchange (I said Metadata!) between dag tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of every fallen task. Now my work Gmail has >90k emails from Airflow, and the web mail interface refuses to select and delete more than 100 of them at a time.

More pitfalls: Apache Airflow Pitfalls

More automation tools

So that we can work even more with our heads and less with our hands, Airflow has prepared this for us:

  • REST API - it still has Experimental status, which does not prevent it from working. With it, you can not only get information about DAGs and tasks, but also stop/start a DAG, create a DAG Run or a pool.
  • CLI - many tools are available through the command line that are not just inconvenient to use through the WebUI but are simply absent from it. For example:
    • backfill is needed to re-run task instances.
      For example, the analysts come and say: "Comrade, you have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you just go:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Metadata database maintenance: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance and even ignore all its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does pretty much the same thing, except that it also writes nothing to the databases.
    • connections allows mass creation of connections from the shell.
  • Python API - a rather hardcore way of interacting, intended for plugins rather than for poking around in with your little hands. But who is to stop us from going to /home/airflow/dags, running ipython and starting to mess around? You can, for example, export all connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadatabase. I do not recommend writing to it, but getting task states for various specific metrics can be much faster and easier than through any of the APIs.

    Let's say not all of our tasks are idempotent: sometimes they fall over, and that's normal. But a series of failures is already suspicious, and it would be worth checking.

    Beware SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

References

And of course, the first ten links from a Google search are the contents of the Airflow folder in my bookmarks.


Source: habr.com