Airflow is a tool for conveniently and quickly developing and maintaining batch data processing pipelines

Hey Habr! In this article, I want to talk about one great tool for developing batch data processing pipelines, for example, for the infrastructure of a corporate DWH or your DataLake. We will talk about Apache Airflow (hereinafter referred to as Airflow). It is unfairly neglected on Habr, and in the main part I will try to convince you that Airflow is at least worth a look when you choose a scheduler for your ETL/ELT processes.

Previously, I wrote a series of articles on the topic of DWH while working at Tinkoff Bank. Now I have become part of the Mail.Ru Group team and am developing a platform for data analysis in the gaming area. As news and interesting solutions appear, my team and I will write here about our data analytics platform.

Prologue

So, let's begin. What is Airflow? It is a library (or set of libraries) for developing, scheduling and monitoring workflows. The main feature of Airflow is that Python code is used to describe (develop) processes. This has a lot of advantages for organizing your project and development: in essence, your (for example) ETL project is just a Python project, and you can organize it however you like, taking into account the specifics of your infrastructure, team size and other requirements. The tooling is simple too: use, for example, PyCharm + Git. It's great and very convenient!

Now let's look at the main entities of Airflow. Once you understand their essence and purpose, you can organize your process architecture optimally. Perhaps the main entity is the Directed Acyclic Graph (hereinafter DAG).

DAG

A DAG is a semantic grouping of your tasks that you want to complete in a strictly defined sequence on a specific schedule. Airflow provides a convenient web interface for working with DAGs and other entities:

(screenshot of the Airflow web interface)

A DAG might look like this:

(screenshot of an example DAG)

When designing a DAG, the developer chooses the set of operators on which the tasks within the DAG will be built. Here we come to another important entity: the Airflow Operator.

Operators

An operator is an entity from which task instances are created; it describes what will happen when a task instance is executed. Airflow releases already include a set of operators ready to be used. Examples:

  • BashOperator is an operator for executing a bash command.
  • PythonOperator is an operator for calling Python code.
  • EmailOperator is an operator for sending email.
  • HTTPOperator is an operator for working with HTTP requests.
  • SqlOperator is an operator for executing SQL code.
  • Sensor is an operator for waiting for an event (the arrival of a given time, the appearance of a required file, a row in a database, a response from an API, etc.).

There are more specific operators: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
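
To make the entities above concrete, here is a minimal, hypothetical DAG built from two of the basic operators (the imports follow the Airflow 1.x layout in use when this article was written; module paths changed in later versions):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def run_transform():
    print('extract finished, running the transform step')


dag = DAG(
    dag_id='example_dag',
    start_date=datetime(2017, 1, 1),
    schedule_interval='@daily'
)

extract = BashOperator(
    task_id='extract',
    bash_command='echo "pulling data from the source"',
    dag=dag
)

transform = PythonOperator(
    task_id='transform',
    python_callable=run_transform,
    dag=dag
)

# extract runs first, transform starts after it succeeds
extract.set_downstream(transform)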

You can also develop operators to suit your needs and use them in your project. For example, we created MongoDBToHiveViaHdfsTransfer, an operator for exporting documents from MongoDB to Hive, and several operators for working with ClickHouse: CHLoadFromHiveOperator and CHTableLoaderOperator. In fact, as soon as a project has frequently used code built on basic operators, it is worth thinking about wrapping it into a new operator. This simplifies further development, and you grow your project's library of operators.
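
The operators named above are ours and their code is not shown in the article; as a rough, hypothetical sketch of the approach, a custom operator is just a BaseOperator subclass with an execute() method (all names below are made up for illustration):

import logging

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults  # needed in Airflow 1.x


class MyDbToHiveOperator(BaseOperator):
    """Hypothetical operator: export rows from a source database into a Hive table."""

    template_fields = ('sql', 'hive_table')  # these fields are rendered with Jinja

    @apply_defaults
    def __init__(self, sql, hive_table, conn_id, *args, **kwargs):
        super(MyDbToHiveOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.hive_table = hive_table
        self.conn_id = conn_id

    def execute(self, context):
        # The actual transfer logic goes here: read from the source
        # connection, stage the data, load it into Hive.
        logging.info('Loading data via %s into %s', self.conn_id, self.hive_table)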

Next, all these task instances need to be executed, and now we will talk about the scheduler.

Scheduler

Task execution in Airflow is built on Celery (the CeleryExecutor). Celery is a Python library that lets you organize a queue plus asynchronous and distributed execution of tasks. On the Airflow side, all tasks are divided into pools. Pools are created manually; as a rule, their purpose is to limit the load on a particular source or to categorize tasks within the DWH. Pools can be managed via the web interface:

(screenshot of the pools page in the Airflow web interface)

Each pool has a limit on the number of slots. When creating a DAG, it is given a pool:

# Imports needed for this snippet (Airflow 1.x layout)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

ALERT_MAILS = Variable.get("gv_mail_admin_dwh")  # alert recipients, stored as an Airflow Variable
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))  # number of retries, also kept in a Variable
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

# Start date: midnight of yesterday
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

The pool set at the DAG level can be overridden at the task level (a sketch of such an override is shown just below the list).
A separate process, the Scheduler, is responsible for scheduling all tasks in Airflow. In fact, the Scheduler handles all the mechanics of queuing tasks for execution. A task goes through several stages before it is executed:

  1. The previous tasks in the DAG have completed; a new one can be queued.
  2. The queue is sorted by task priority (priorities can also be controlled), and if there is a free slot in the pool, the task can be picked up for execution.
  3. If there is a free Celery worker, the task is sent to it; the work you programmed in the task begins, using one operator or another.

Simple enough.
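
As a rough sketch of the task-level override mentioned above (the pool name src_mysql_pool is hypothetical, and dag is the DAG object from the configuration snippet earlier):

from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

# This single heavy task takes a slot from 'src_mysql_pool' instead of the
# DAG-level 'dma_pool', and its higher priority_weight sorts it earlier in the queue.
heavy_export = BashOperator(
    task_id='heavy_export',
    bash_command='echo "long-running export"',
    pool='src_mysql_pool',
    priority_weight=100,
    dag=dag
)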

The Scheduler operates over the set of all DAGs and all tasks within them.

For the Scheduler to start working with a DAG, the DAG needs a schedule:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

There is a set of ready-made presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.

You can also use cron expressions:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Execution Date

To understand how Airflow works, it is important to understand what an Execution Date is for a DAG. In Airflow, a DAG has the Execution Date dimension, i.e., depending on the DAG's schedule, task instances are created for each Execution Date. And for each Execution Date, tasks can be re-executed; or, for example, a DAG can run simultaneously for several Execution Dates. This is clearly shown here:

(screenshot: task instances across Execution Dates in the Airflow web interface)

Unfortunately (or maybe fortunately: it depends on the situation), if the implementation of a task in the DAG changes, then re-execution for a previous Execution Date will run with those adjustments. This is good if you need to recalculate data for past periods with a new algorithm, but it's bad because the reproducibility of the result is lost (of course, nobody stops you from checking out the required version of the source code from Git and computing what you need once, the way you need it).
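
For example, a task can be tied to its own Execution Date through Jinja templating; {{ ds }} is the built-in macro holding the execution date as YYYY-MM-DD (the script name below is made up, and dag is assumed to be defined as in the snippets above):

from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

# The command is rendered per task instance, so re-runs and backfills
# for a past Execution Date load exactly that day's data.
load_day = BashOperator(
    task_id='load_day',
    bash_command='python load_profiles.py --date {{ ds }}',  # load_profiles.py is hypothetical
    dag=dag
)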

Task generation

The DAG implementation is Python code, so we have a very convenient way to reduce the amount of code when working, for example, with sharded sources. Suppose you have three MySQL shards as a source; you need to reach into each one and pick up some data, independently and in parallel. The Python code in the DAG might look like this:

connection_list = lv.get('connection_list')  # shard connection IDs; lv is a project-level helper, not part of Airflow

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    # one export task per shard; SqlToHiveViaHdfsTransfer is a custom operator, not a built-in one
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },  # the last character of the connection ID is used as the shard number
        compress=None,
        dag=dag
    )
    # truncate staging first, then export, then load into the target
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

The DAG looks like this:

(screenshot of the generated DAG)

At the same time, you can add or remove a shard simply by adjusting the setting and updating the DAG. Convenient!

You can also use more sophisticated code generation: for example, keep the list of sources in a database, or describe table structures and an algorithm for working with the tables and, taking into account the specifics of your DWH infrastructure, generate the process of loading N tables into your storage. Or, say, when working with an API that does not accept a parameter as a list, you can generate N tasks in the DAG from that list, limit the parallelism of API requests with a pool, and extract the necessary data from the API. Flexible!
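
A rough sketch of the API case, under the same assumptions as the snippets above (api_pool, the parameter list and fetch_from_api are all made up for illustration):

from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import path


def fetch_from_api(region):
    # call the API for a single parameter value and store the response
    print('fetching data for %s' % region)


# One task per parameter value; the pool caps how many run in parallel.
for region in ['ru', 'eu', 'us']:
    fetch = PythonOperator(
        task_id='fetch_' + region,
        python_callable=fetch_from_api,
        op_kwargs={'region': region},
        pool='api_pool',
        dag=dag
    )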

Repository

Airflow has its own backend repository, a database (it can be MySQL or Postgres; we use Postgres), which stores the states of tasks, DAGs, connection settings, global variables, and so on. Here I would like to say that the repository in Airflow is very simple (about 20 tables) and convenient if you want to build any of your own processes on top of it. I remember the 100500 tables in the Informatica repository, which you had to study for a long time before understanding how to build a query.
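
For example, today's failed task instances can be pulled straight from the task_instance table; a minimal sketch with psycopg2 (the connection string is illustrative, the table and column names are those of the Airflow 1.x schema):

import psycopg2

# Connect to the Airflow metadata database (credentials and host are placeholders).
conn = psycopg2.connect('postgresql://airflow:password@airflow-db/airflow')
cur = conn.cursor()
cur.execute("""
    SELECT dag_id, task_id, execution_date, state
    FROM task_instance
    WHERE state = 'failed'
      AND execution_date >= current_date
""")
for dag_id, task_id, execution_date, state in cur.fetchall():
    print(dag_id, task_id, execution_date, state)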

Monitoring

Given the simplicity of the repository, you can build a task monitoring process that is convenient for you. We use a notebook in Zeppelin, where we look at the status of tasks:

(screenshot of the Zeppelin notebook with task statuses)

It can also be the web interface of Airflow itself:

(screenshot of the Airflow web interface)

The Airflow code is open, so we added alerting to Telegram. Whenever a running task instance hits an error, it spams the Telegram group where the whole development and support team is.
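
Our alerting lives inside a patched Airflow, and that code is not shown here; as a rough sketch, a similar effect can be achieved without patching by attaching an on_failure_callback in default_args (the bot token and chat id below are placeholders):

import requests


def alert_to_telegram(context):
    # Called by Airflow with the task context when a task instance fails.
    ti = context['task_instance']
    text = 'Task %s.%s failed for %s' % (ti.dag_id, ti.task_id, context['execution_date'])
    requests.post(
        'https://api.telegram.org/bot<BOT_TOKEN>/sendMessage',
        data={'chat_id': '<CHAT_ID>', 'text': text}
    )


default_args = {
    # ... the arguments shown earlier ...
    'on_failure_callback': alert_to_telegram
}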

Through Telegram we get a prompt response (when required); through Zeppelin, an overall picture of the tasks in Airflow.

Summary

Airflow is, first and foremost, open source, and you shouldn't expect miracles from it. Be prepared to put in the time and effort to build a working solution. The goal is achievable and, believe me, worth it: development speed, flexibility and ease of adding new processes; you will love it. Of course, you need to pay a lot of attention to how the project is organized and to the stability of Airflow itself: there are no miracles.

Airflow now runs about 6.5 thousand tasks for us every day. They are quite different in nature: there are tasks for loading data into the main DWH from many different and very specific sources, tasks for computing data marts inside the main DWH, tasks for publishing data into a fast DWH, and many, many others, and Airflow chews through them all, day after day. In numbers, that is 2.3 thousand ELT tasks of varying complexity inside the DWH (Hadoop), about 250 source databases, a team of 4 ETL developers, who are split between ETL processing of data into the DWH and ELT processing of data inside the DWH, and, of course, one more admin who looks after the service's infrastructure.

Plans for the future

The number of processes is inevitably growing, and the main thing we will be working on in the Airflow infrastructure is scaling. We want to build an Airflow cluster, allocate a couple of legs for the Celery workers, and duplicate the head with the job scheduling processes and the repository.

Conclusion

This, of course, is far from everything I would like to tell about Airflow, but I have tried to highlight the main points. Appetite comes with eating: give it a try and you will like it 🙂

Source: habr.com
