Airflow — інструмент, щоб зручно та швидко розробляти та підтримувати batch-процеси обробки даних

Airflow — інструмент, щоб зручно та швидко розробляти та підтримувати batch-процеси обробки даних

Привіт, Хабре! У цій статті я хочу розповісти про один чудовий інструмент для розробки batch-процесів обробки даних, наприклад, в інфраструктурі корпоративного DWH або вашого DataLake. Йтиметься про Apache Airflow (далі Airflow). Він несправедливо обділений увагою на Хабре, і в основному я спробую переконати вас у тому, що як мінімум на Airflow варто дивитися при виборі планувальника для ETL/ELT-процесів.

Раніше я писав серію статей на тему DWH, коли працював у Тінькофф Банку. Тепер я став частиною команди Mail.Ru Group та займаюся розвитком платформи для аналізу даних на ігровому напрямку. Власне, у міру появи новин та цікавих рішень ми з командою розповідатимемо тут про нашу платформу для аналітики даних.

Пролог

Тож почнемо. Що таке Airflow? Це бібліотека (ну або набір бібліотек) для розробки, планування та моніторингу робочих процесів. Основна особливість Airflow: для опису (розробки) процесів використовується код Python. Звідси випливає безліч переваг для організації вашого проекту та розробки: по суті, ваш (наприклад) ETL-проект – це просто Python-проект, і ви можете його організовувати як вам зручно, враховуючи особливості інфраструктури, розмір команди та інші вимоги. Інструментально просто. Використовуйте, наприклад, PyCharm+Git. Це чудово та дуже зручно!

Тепер розглянемо основні сутності Airflow. Зрозумівши їхню суть і призначення, ви оптимально організуєте архітектуру процесів. Мабуть, основна сутність це Directed Acyclic Graph (далі DAG).

DAG

DAG - це деяке смислове поєднання ваших завдань, які ви хочете виконати в строго певній послідовності за розкладом. Airflow представляє зручний web-інтерфейс для роботи з DAG'ами та іншими сутностями:

Airflow — інструмент, щоб зручно та швидко розробляти та підтримувати batch-процеси обробки даних

DAG може виглядати таким чином:

Airflow — інструмент, щоб зручно та швидко розробляти та підтримувати batch-процеси обробки даних

Розробник, проектуючи DAG, закладає набір операторів, де будуть побудовані завдання всередині DAG'а. Тут ми приходимо до однієї важливої ​​сутності: Airflow Operator.

Оператори

Оператор - це сутність, на підставі якої створюються екземпляри завдань, де описується, що відбуватиметься під час виконання екземпляра завдання. Релізи Airflow з GitHub вже містять набір операторів, які готові до використання. Приклади:

  • BashOperator - оператор для виконання bash-команди.
  • PythonOperator – оператор для виклику Python-коду.
  • EmailOperator - оператор для відправки email'а.
  • HTTPOperator – оператор для роботи з http-запитами.
  • SqlOperator - оператор для виконання SQL-коду.
  • Sensor - оператор очікування події (настання потрібного часу, появи потрібного файлу, рядки в базі БД, відповіді з API - і т.д., і т.п.).

Є більш специфічні оператори: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Ви також можете розробляти оператори, орієнтуючись на свої особливості, та використовувати їх у проекті. Наприклад, ми створили MongoDBToHiveViaHdfsTransfer, оператор експорту документів з MongoDB до Hive, і кілька операторів для роботи з Натисніть Будинок: CHLoadFromHiveOperator та CHTableLoaderOperator. По суті, як тільки в проекті виникає код, що часто використовується, побудований на базових операторах, можна задуматися про те, щоб зібрати його в новий оператор. Це спростить подальшу розробку, і ви поповните свою бібліотеку операторів у проекті.

Далі всі ці екземпляри завдань потрібно виконувати, і тепер мова піде про планувальника.

Планувальник

Планувальник завдань в Airflow побудований на Селера. Celery - це Python-бібліотека, що дозволяє організувати чергу плюс асинхронне та розподілене виконання завдань. З боку Airflow всі завдання поділяються на пули. Пули створюються вручну. Як правило, їхня мета - обмежити навантаження на роботу з джерелом або типізувати завдання всередині DWH. Пулами можна керувати через web-інтерфейс:

Airflow — інструмент, щоб зручно та швидко розробляти та підтримувати batch-процеси обробки даних

Кожен пул має обмеження кількості слотів. При створенні DAG'а йому задається пул:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Пул, заданий лише на рівні DAG'а, можна перевизначити лише на рівні завдання.
За планування всіх завдань Airflow відповідає окремий процес — Scheduler. Власне, Scheduler займається всією механікою постановки завдань виконання. Завдання, перш ніж потрапити на виконання, проходить кілька етапів:

  1. У DAG'і виконані попередні завдання, нову можна поставити у чергу.
  2. Черга сортується залежно від пріоритету завдань (пріоритетами також можна керувати), і, якщо в пулі є вільний слот, завдання можна взяти в роботу.
  3. Якщо є вільний worker celery, завдання прямує до нього; починається робота, яку ви запрограмували в задачі, використовуючи той чи інший оператор.

Досить просто.

Scheduler працює на безлічі всіх DAG'ів та всіх завдань усередині DAG'ів.

Щоб Scheduler почав роботу з DAG'ом, DAG'у потрібно задати розклад:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Є набір готових preset'ів: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Також можна використовувати cron-вирази:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Дата виконання

Щоб розібратися в тому, як працює Airflow, важливо розуміти, що таке Execution Date для DAG'а. У Airflow DAG має вимір Execution Date, т. е. залежно від розкладу роботи DAG'а створюються екземпляри завдань кожну Execution Date. І за кожну Execution Date завдання можна виконати повторно - або, наприклад, DAG може працювати одночасно в кількох Execution Date. Це наочно відображено тут:

Airflow — інструмент, щоб зручно та швидко розробляти та підтримувати batch-процеси обробки даних

На жаль (а можливо, і на щастя: залежить від ситуації), якщо керується реалізація завдання в DAG'і, то виконання в попередніх Execution Date піде вже з урахуванням коригувань. Це добре, якщо потрібно перерахувати дані в минулих періодах новим алгоритмом, але погано, тому що втрачається відтворюваність результату (звісно, ​​ніхто не заважає повернути з Git'а потрібну версію вихідного коду і разово порахувати те, що потрібно так, як потрібно).

Генерація завдань

Реалізація DAG'а — код на Python, тому ми маємо дуже зручний спосіб скоротити обсяг коду при роботі, наприклад, із шардованими джерелами. Нехай у вас як джерело три шарди MySQL, вам потрібно злазити в кожен і забрати якісь дані. Причому незалежно та паралельно. Код на Python у DAG'і може виглядати так:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG виходить таким:

Airflow — інструмент, щоб зручно та швидко розробляти та підтримувати batch-процеси обробки даних

При цьому можна додати або прибрати шард, просто скоригувавши налаштування та оновивши DAG. Зручно!

Можна використовувати більш складну генерацію коду, наприклад працювати з джерелами у вигляді БД або описувати табличну структуру, алгоритм роботи з таблицею і з урахуванням особливостей інфраструктури DWH генерувати процес завантаження N таблиць до вас у сховище. Або, наприклад, роботу з API, яке не підтримує роботу з параметром у вигляді списку, ви можете згенерувати за цим списком N завдань у DAG'і, обмежити паралельність запитів в API пулом і вигрібти з API необхідні дані. Гнучко!

Репозиторій

В Airflow є свій бекенд-репозиторій, БД (може бути MySQL або Postgres, у нас Postgres), в якій зберігаються стани завдань, DAG'ів, налаштування з'єднань, глобальні змінні і т.д., і т.п. сказати, що репозиторій в Airflow дуже простий (близько 20 таблиць) і зручний, якщо ви хочете побудувати свій процес над ним. Згадується 100500 XNUMX таблиць у репозиторії Informatica, які потрібно було довго курити, перш ніж зрозуміти, як побудувати запит.

моніторинг

Враховуючи простоту репозиторію, ви можете самі побудувати зручний для вас процес моніторингу завдань. Ми використовуємо блокнот у Zeppelin, де дивимося стан завдань:

Airflow — інструмент, щоб зручно та швидко розробляти та підтримувати batch-процеси обробки даних

Це може бути і web-інтерфейс самого Airflow:

Airflow — інструмент, щоб зручно та швидко розробляти та підтримувати batch-процеси обробки даних

Код Airflow відкритий, тому ми додали до себе алертинг в Telegram. Кожен працюючий інстанс завдання, якщо відбувається помилка, спамить у групу Telegram, де складається вся команда розробки та підтримки.

Отримуємо через Telegram оперативне реагування (якщо таке потрібно), через Zeppelin – загальну картину із завдань Airflow.

Разом

Airflow насамперед open source, і не потрібно чекати від нього чудес. Будьте готові витратити час і сили на те, щоб вибудувати діюче рішення. Мета з розряду досяжних, повірте, воно того варте. Швидкість розробки, гнучкість, простота додавання нових процесів вам сподобається. Звісно, ​​треба приділяти багато уваги організації проекту, стабільності роботи самого Airflow: чудес не буває.

Зараз у нас Airflow щодня відпрацьовує близько 6,5 тисяч задач. За характером вони досить різні. Є завдання завантаження даних в основне DWH з безлічі різних і дуже специфічних джерел, є завдання розрахунку вітрин всередині основного DWH, є завдання публікації даних в швидке DWH, є багато різних завдань - і Airflow всі їх пережовує день за днем. Якщо ж говорити цифрами, то це 2,3 тисячі ELT задач різної складності всередині DWH (Hadoop), навколо 2,5 сотень баз даних джерел, це команда з 4-ох ETL розробників, які діляться на ETL процесинг даних у DWH і ELT процесинг даних усередині DWH і ще одного адміна, що займається інфраструктурою сервісу.

Плани на майбутнє

Кількість процесів неминуче зростає, і основне, чим ми займатимемося в частині інфраструктури Airflow – це масштабування. Ми хочемо побудувати кластер Airflow, виділити пару ніг для worker'ов Celery і зробити голову, що дублює себе, з процесами планування завдань і репозиторієм.

Епілог

Це, звичайно, далеко не все, що хотілося б розповісти про Airflow, але основні моменти я постарався висвітлити. Апетит приходить під час їжі, спробуйте - і вам сподобається 🙂

Джерело: habr.com

Додати коментар або відгук