Apache Airflow: робимо ETL простіше

Привіт, я Дмитро Логвиненко – Data Engineer відділу аналітики групи компаній «Везе».

Я розповім вам про чудовий інструмент розробки ETL-процесів — Apache Airflow. Але Airflow настільки універсальний і багатогранний, що вам варто придивитися до нього навіть якщо ви не займаєтеся потоками даних, а потребуєте періодично запускати будь-які процеси і стежити за їх виконанням.

І так, я не лише розповідатиму, а й показуватиму: у програмі багато коду, скріншотів та рекомендацій.

Apache Airflow: робимо ETL простіше
Що зазвичай бачиш, коли гуглиш слово Airflow / Wikimedia Commons

Зміст

Запровадження

Apache Airflow - він прямо як Django:

  • написаний на Python,
  • є відмінна адмінка,
  • необмежено розширюємо,

- тільки краще, та й зроблено зовсім для інших цілей, а саме (як написано до ката):

  • запуск та моніторинг завдань на необмеженій кількості машин (скільки вам дозволить Celery/Kubernetes та ваша совість)
  • з динамічною генерацією workflow з дуже легкого для написання та сприйняття Python-коду
  • і можливістю пов'язувати один з одним будь-які бази даних та API за допомогою як готових компонентів, так і саморобних плагінів (що робиться надзвичайно просто).

Ми використовуємо Apache Airflow так:

  • збираємо дані з різних джерел (безліч інстансів SQL Server і PostgreSQL, різні API з метриками додатків, навіть 1С) в DWH і ODS (у нас це Vertica і Clickhouse).
  • як просунутий cron, який запускає процеси консолідації даних на ODS, а також слідкує за їх обслуговуванням.

Донедавна наші потреби покривав один невеликий сервер на 32 ядрах та 50 GB оперативної пам'яті. В Airflow при цьому працює:

  • більш 200 дагов (Власне workflows, в які ми набили завдання),
  • у кожному в середньому по 70 тяган,
  • запускається це добро (теж у середньому) раз на годину.

А про те, як ми розширювалися, я напишу нижче, а зараз давайте визначимо über-завдання, яке ми вирішуватимемо:

Є три вихідні SQL Server'и, на кожному по 50 баз даних - інстансів одного проекту, відповідно, структура у них однакова (майже скрізь, муа-ха-ха), а значить у кожній є таблиця Orders (благо таблицю з такою назвою можна заштовхати у будь-який бізнес). Ми забираємо дані, додаючи службові поля (сервер-джерело, база-джерело, ідентифікатор ETL-завдання) і наївним чином кинемо їх у, скажімо, Vertica.

Поїхали!

Частина основна, практична (і трохи теоретична)

Навіщо воно нам (і вам)

Коли дерева були великі, а я був простим SQL-щиком в одному російському рітейлі, ми шпарили ETL-процеси aka потоки даних за допомогою двох доступних нам засобів:

  • Informatica Power Center - Вкрай розлога система, надзвичайно продуктивна, зі своїми залозками, власним версіонуванням. Використав я дай боже 1% її можливостей. Чому? Ну, по-перше, цей інтерфейс десь із нульових психічно тиснув на нас. По-друге, ця штуковина заточена під надзвичайно наворочені процеси, шалене перевикористання компонентів та інші дуже-важливі-ентерпрайз-фішечки. Про те, що стоїть вона, як крило Airbus A380/рік, ми промовчимо.

    Обережно, скріншот може зробити людям молодше 30 трохи боляче

    Apache Airflow: робимо ETL простіше

  • SQL Server Integration Server — цим товаришем ми користувалися у своїх внутрішньопроектних потоках. Ну а насправді: SQL Server ми вже використовуємо, і не користуватися його ETL-тулзи було б якось нерозумно. Все в ньому добре: і інтерфейс красивий, і звіти виконання ... Але не за це ми любимо програмні продукти, ох не за це. Версіонувати його dtsx (який являє собою XML з нодами, що перемішуються при збереженні) ми можемо, а толку? А зробити пакет тяган, який перетягне сотню таблиць з одного сервера на інший? Так що сотню, у вас від двадцяти штук відвалиться вказівний палець, що клацає мишачою кнопкою. Але виглядає він, безперечно, більш модно:

    Apache Airflow: робимо ETL простіше

Ми, безумовно, шукали виходи. Справа навіть майже дійшло до самописного генератора SSIS-пакетів.

…а потім мене знайшла нова робота. А на ній мене наздогнав Apache Airflow.

Коли я дізнався, що описи ETL-процесів – це простий Python-код, я щойно не танцював від радості. Ось так потоки даних зазнали версіонування та дифу, а зсипати таблиці з єдиною структурою з сотні баз даних в один таргет стало справою Python-коду в півтора-два 13” екрани.

Збираємо кластер

Давайте не влаштовувати зовсім дитячий садок, і не говорити тут про зовсім очевидні речі, на кшталт установки Airflow, обраної вами БД, Celery та інших справ, описаних у доках.

Щоб ми могли відразу розпочати експерименти, я накидав docker-compose.yml в якому:

  • Піднімемо власне Повітряний потік: Scheduler, Webserver. Там же буде крутитися Flower для моніторингу Celery-задач (бо його вже заштовхали в apache/airflow:1.10.10-python3.7, А ми і не проти);
  • PostgreSQL, в який Airflow писатиме свою службову інформацію (дані планувальника, статистика виконання і т. д.), а Celery - відзначати завершені таски;
  • Redisякий виступатиме брокером завдань для Celery;
  • Celery worker, що й займеться безпосереднім виконанням завдань.
  • У папку ./dags ми будемо складати наші файли з описом дагов. Вони підхоплюватимуться на льоту, тому пересмикувати весь стек після кожного чіха не потрібно.

Де-не-де код у прикладах наведено не повністю (щоб не захаращувати текст), а десь він модифікується в процесі. Цілісні працюючі приклади коду можна подивитися в репозиторії https://github.com/dm-logv/airflow-tutorial.

докер-компост.імл

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Примітки:

  • У складанні композу я багато в чому спирався на відомий образ puckel/docker-airflow – обов'язково подивіться. Може, вам у житті більше нічого не знадобиться.
  • Усі налаштування Airflow доступні не тільки через airflow.cfgале через змінні середовища (слава розробникам), чим я злісно скористався.
  • Природно, він не production-ready: я навмисно не ставив heartbeats на контейнери, не морочився з безпекою. Але щонайменше підходящий для наших експериментиків я зробив.
  • Зверніть увагу, що:
    • Папка з дагами має бути доступна як планувальнику, так і воркерам.
    • Те саме стосується й усіх сторонніх бібліотек — всі вони мають бути встановлені на машини з шедулером та воркерами.

Ну а тепер просто:

$ docker-compose up --scale worker=3

Після того, як все підніметься, можна дивитися на веб-інтерфейси:

Основні поняття

Якщо ви нічого не зрозуміли у всіх цих «дагах», то ось короткий словничок:

  • Планувальник — найголовніший дядько в Airflow, який контролює, щоб працювали роботи, а не людина: стежить за розкладом, оновлює даги, запускає таски.

    Взагалі, у старих версіях, у нього були проблеми з пам'яттю (ні, не амнезія, а витоку) і в конфігах навіть залишився легасі-параметр run_duration - Інтервал його перезапуску. Але зараз все добре.

  • DAG (він же "даг") - "спрямований ациклічний граф", але таке визначення мало кому що скаже, а по суті це контейнер для тягарів, що взаємодіють один з одним (див. нижче) або аналог Package в SSIS і Workflow в Informatica.

    Крім дагів ще можуть бути сабдаги, але ми до них швидше за все не дістанемося.

  • DAG Run - Ініціалізований даг, якому присвоєно свій execution_date. Даграни одного дага можуть працювати паралельно (якщо ви, звичайно, зробили свої таски ідемпотентними).
  • Оператор - це шматочки коду, відповідальні за виконання будь-якої конкретної дії. Є три типи операторів:
    • діюнаприклад наш улюблений PythonOperator, який може виконати будь-який (валідний) Python-код;
    • переклад, які перевозять дані з місця на місце, скажімо, MsSqlToHiveTransfer;
    • датчик ж дозволить реагувати або пригальмувати подальше виконання дагу до настання будь-якої події. HttpSensor може смикати вказаний ендпойнт, і коли дочекається відповідь, запустити трансфер GoogleCloudStorageToS3Operator. Допитливий розум запитає: «Навіщо? Адже можна робити повтори прямо в операторі! А потім, щоб не забивати пул тасків операторами, що підвисли. Сенсор запускається, перевіряє та вмирає до наступної спроби.
  • Завдання — оголошені оператори незалежно від типу та прикріплені до дагу підвищуються до чину таска.
  • Task instance — коли генерал-планувальник вирішив, що таски настав час відправляти в бій на виконавці-воркери (прямо на місці, якщо ми використовуємо LocalExecutor або на віддалену ноду у випадку з CeleryExecutor), він призначає їм контекст (тобто комплект змінних - параметрів виконання), розгортає шаблони команд чи запитів і складає в пул.

Генеруємо таски

Спершу позначимо загальну схему нашого дага, а потім все більше і більше занурюватимемося в деталі, тому що ми застосовуємо деякі нетривіальні рішення.

Отже, у найпростішому вигляді такий даг виглядатиме так:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Давайте розбиратися:

  • Спершу імпортуємо потрібні ліби та дещо ще;
  • sql_server_ds - це List[namedtuple[str, str]] з іменами коннектів з Airflow Connections та базами даних з яких ми забиратимемо нашу табличку;
  • dag - оголошення нашого дага, яке обов'язково має лежати в globals(), інакше Airflow його не знайде. Дагу також слід сказати:
    • що його звуть orders — це ім'я потім маячить у веб-інтерфейсі,
    • що працюватиме він, починаючи з півночі восьмого липня,
    • а запускати він повинен приблизно кожні 6 годин (для крутих хлопців тут замість timedelta() допустима cron-рядок 0 0 0/6 ? * * *для менш крутих - вираз начебто @daily);
  • workflow() робитиме основну роботу, але не зараз. Зараз ми просто висипаємо наш контекст у балку.
  • А тепер проста магія створення тасків:
    • пробігаємо нашими джерелами;
    • ініціалізуємо PythonOperator, який буде виконувати нашу пустушку workflow(). Не забувайте вказувати унікальне (в рамках дага) ім'я таска та підв'язувати сам даг. Прапор provide_context у свою чергу насипатиме в функцію додаткових аргументів, які ми дбайливо зберемо за допомогою **context.

Поки що на цьому все. Що ми отримали:

  • новий даг у веб-інтерфейсі,
  • півтори сотні тяг, які будуть виконуватися паралельно (якщо дозволять налаштування Airflow, Celery і потужності серверів).

Ну майже отримали.

Apache Airflow: робимо ETL простіше
Залежно хто ставитиме?

Щоб всю цю справу спростити я вкорячив у docker-compose.yml обробку requirements.txt на всіх нодах.

Ось тепер помчала:

Apache Airflow: робимо ETL простіше

Сірі квадратики - task instances, оброблені планувальником.

Трохи чекаємо, завдання розхоплюють воркери:

Apache Airflow: робимо ETL простіше

Зелені, ясна річ, успішно відпрацювали. Червоні – не дуже успішно.

До речі, на нашому праді жодної папки ./dags, що синхронізується між машинами немає - всі даги лежать у git на нашому Gitlab, а Gitlab CI розкладає оновлення на машини при мерджі master.

Трохи про Flower

Поки воркери молотять наші тасочки-пустушки, згадаємо ще один інструмент, який може нам дещо показати — Flower.

Найперша сторінка із сумарною інформацією по нодам-воркерам:

Apache Airflow: робимо ETL простіше

Найнасиченіша сторінка із завданнями, що вирушили в роботу:

Apache Airflow: робимо ETL простіше

Найнудніша сторінка зі станом нашого брокера:

Apache Airflow: робимо ETL простіше

Найяскравіша сторінка — з графіками стану тяган та їх часом виконання:

Apache Airflow: робимо ETL простіше

Довантажуємо недовантажене

Отже, всі таски відпрацювали, можна забирати поранених.

Apache Airflow: робимо ETL простіше

А поранених виявилося чимало — з тих чи інших причин. У разі правильного використання Airflow ось ці квадрати говорять про те, що дані безумовно не доїхали.

Потрібно дивитися лог і перезапускати task instances, що впали.

Жм'якнувши на будь-який квадрат, побачимо доступні нам дії:

Apache Airflow: робимо ETL простіше

Можна взяти, і зробити Clear впав. Тобто ми забуваємо про те, що там щось завалилося, і той самий інстанс таска піде планувальнику.

Apache Airflow: робимо ETL простіше

Зрозуміло, що робити так мишкою з усіма червоними квадратами не дуже гуманно – не на це ми чекаємо від Airflow. Звичайно, у нас є зброя масової поразки: Browse/Task Instances

Apache Airflow: робимо ETL простіше

Виберемо все разом і обнулимо натиснемо правильний пункт:

Apache Airflow: робимо ETL простіше

Після очищення наші таксі виглядають так (вони вже чекають не дочекаються, коли шедулер їх запланує):

Apache Airflow: робимо ETL простіше

З'єднання, хуки та інші змінні

Саме час подивитися на наступний DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Адже все-таки робили обновлялку звітів? Це знову вона: є перелік джерел, звідки забрати дані; є перелік, куди покласти; не забуваємо посигналити, коли все трапилося чи зламалося (ну це не про нас, ні).

Давайте знову пройдемося файлом і подивимося на нові незрозумілі штуки:

  • from commons.operators import TelegramBotSendMessage — нам ніщо не заважає робити свої оператори, ніж ми й скористалися, зробивши невелику обгортку для надсилання повідомлень до Розблокованого. (Про цей оператор ми ще поговоримо нижче);
  • default_args={} — даг може роздавати одні й самі аргументи всім своїм операторам;
  • to='{{ var.value.all_the_kings_men }}' - поле to у нас буде не захардшкіреним, а формованим динамічно за допомогою Jinja та змінної зі списком email-ів, яку я дбайливо поклав у Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - Умова запуску оператора. У нашому випадку лист полетить босам тільки якщо всі залежності відпрацювали. успішно;
  • tg_bot_conn_id='tg_main' - аргументи conn_id приймають у собі ідентифікатори з'єднань, які ми створюємо в Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED — повідомлення в Telegram відлетять тільки за наявності тягаків, що впали;
  • task_concurrency=1 - Забороняємо одночасний запуск кількох task instances одного тяга. В іншому випадку, ми отримаємо одночасний запуск кількох VerticaOperator (Дивляться одну таблицю);
  • report_update >> [email, tg] - Усе VerticaOperator зійдуться у надсиланні листа та повідомлення, ось так:
    Apache Airflow: робимо ETL простіше

    Але оскільки оператори-нотифікатори мають різні умови запуску, працюватиме лише один. У Tree View все виглядає дещо менш наочно:
    Apache Airflow: робимо ETL простіше

Скажу пару слів про макросах та їхніх друзів змінних.

Макроси - це Jinja-плейсхолдери, які можуть підставляти різну корисну інформацію аргументи операторів. Наприклад, так:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} розгорнеться у вміст змінної контексту execution_date у форматі YYYY-MM-DD: 2020-07-14. Найприємніше, що змінні контексту прибиваються цвяхами до певного інстансу тяга (квадратику в Tree View), і при перезапуску плейсхолдери розкриються в ті самі значення.

Призначені значення можна дивитися за допомогою кнопки Rendered на кожному таск-інстансі. Ось так біля тяга з відправкою листа:

Apache Airflow: робимо ETL простіше

А так біля таски з відправкою повідомлення:

Apache Airflow: робимо ETL простіше

Повний список вбудованих макросів для останньої доступної версії доступний тут: Macros Reference

Більше того, за допомогою плагінів ми можемо оголошувати власні макроси, але це вже зовсім інша історія.

Крім зумовлених штук, ми можемо підставляти значення своїх змінних (вище в коді я вже скористався цим). Створимо в Admin/Variables пару штук:

Apache Airflow: робимо ETL простіше

Все, можна скористатися:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

У значенні може бути скаляр, а може бути і JSON. У випадку JSON-а:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

просто використовуємо шлях до потрібного ключа: {{ var.json.bot_config.bot.token }}.

Скажу буквально одне слово і покажу один скріншот про з'єднання. Тут все просто: на сторінці Admin/Connections створюємо з'єднання, складаємо туди наші логіни/паролі та більш специфічні параметри. Ось так:

Apache Airflow: робимо ETL простіше

Паролі можна шифрувати (ретельніше, ніж у варіанті за умовчанням), а можна не вказувати тип з'єднання (як я зробив для tg_main) - Справа в тому, що список типів зашитий в моделях Airflow і розширенню без влазіння в вихідні джерела не піддається (якщо раптом я чогось не догуглил - прошу мене поправити), але отримати креди просто на ім'я нам ніщо не завадить.

А ще можна зробити кілька з'єднань з одним ім'ям: у такому разі метод BaseHook.get_connection(), який дістає нам з'єднання на ім'я, буде віддавати випадкового з кількох тезок (було б логічніше зробити Round Robin, але залишимо це на совісті розробників Airflow).

Variables і Connections, безумовно, класні засоби, але важливо не втратити баланс: які частини ваших потоків ви зберігаєте власне в коді, а які віддаєте на зберігання Airflow. З одного боку швидко змінити значення, наприклад, ящик розсилки, може бути зручно через UI. А з іншого — це все-таки повернення до мишеклика, якого ми (я) хотіли позбутися.

Робота зі з'єднаннями – це одне із завдань хуків. Взагалі хуки Airflow - це точки підключення його до сторонніх сервісів та бібліотек. Наприклад, JiraHook відкриє нам клієнт взаємодії з Jira (можна завдання подвигать туди-сюди), а з допомогою SambaHook можна запустити локальний файл на smb-Точку.

Розбираємо кастомний оператор

І ми впритул підібралися до того, щоб подивитися на те, як зроблено TelegramBotSendMessage

Код commons/operators.py з власне оператором:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Тут, як і решта Airflow, все дуже просто:

  • Відспадкували від BaseOperator, який реалізує досить багато Airflow-специфічних штук (подивіться на дозвіллі)
  • Оголосили поля template_fields, в яких Jinja шукатиме макроси для обробки.
  • Організували правильні аргументи для __init__(), Розставили замовчування, де треба.
  • Про ініціалізацію предка теж не забули.
  • Відкрили відповідний хук TelegramBotHookотримали від нього об'єкт-клієнт.
  • Оверрайднули (перевизначили) метод BaseOperator.execute(), який Airfow буде пересмикувати, коли настане час запускати оператор - в ньому ми і реалізуємо основну дію, забувши залогуватися. (Лоґуємося, до речі, прямо в stdout и stderr - Airflow все перехопить, красиво оберне, розкладе, куди треба.)

Давайте дивитися, що в нас у commons/hooks.py. Перша частина файлика, із самим хуком:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Я навіть не знаю, що тут можна пояснювати, просто зазначу важливі моменти:

  • Наслідуємо, думаємо над аргументами — у більшості випадків він буде один: conn_id;
  • Перевизначаємо стандартні методи: я обмежився get_conn(), в якому я отримую параметри з'єднання на ім'я і всього лише дістаю секцію extra (це поле для JSON), в яку я (за своєю ж інструкцією!) поклав токен Telegram-бота: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Створю екземпляр нашого TelegramBot, віддаючи йому конкретний токен.

От і все. Отримати клієнт з хука можна за допомогою TelegramBotHook().clent або TelegramBotHook().get_conn().

І друга частина файлика, в якому я зробити мікрообгортку для Telegram REST API, щоб не тягнути той же python-telegram-bot заради одного методу sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Правильний шлях - скласти все це: TelegramBotSendMessage, TelegramBotHook, TelegramBot - У плагін, покласти в загальнодоступний репозиторій, і віддати в Open Source.

Поки ми все це вивчали, наші оновлення звітів встигли успішно завалитися і надіслати мені канал повідомлення про помилку. Піду перевіряти, що знову не так…

Apache Airflow: робимо ETL простіше
У нашому дазі щось зламалося! А чи не на це ми чекали? Саме!

Наливати будеш?

Чи відчуваєте, що щось я пропустив? Як би обіцяв дані з SQL Server в Vertica переливати, і тут взяв і з'їхав з теми, негідник!

Злочинство це було навмисним, я просто повинен був розшифрувати вам деяку термінологію. Тепер можна їхати далі.

План у нас був такий:

  1. Зробити даг
  2. Нагенерувати таски
  3. Подивитися, як все красиво
  4. Надавати заливкам номери сесій
  5. Забрати дані з SQL Server
  6. Покласти дані у Vertica
  7. Зібрати статистику

Отже, щоби все це запустити, я зробив маленьке доповнення до нашого docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Там ми піднімаємо:

  • Vertica як хост dwh з дефолтними налаштуваннями,
  • три екземпляри SQL Server,
  • наповнюємо бази в останніх деякими даними (у жодному разі не заглядайте в mssql_init.py!)

Запускаємо все добро за допомогою трохи складнішої, ніж минулого разу, команди:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Що нагенерував наш чудорандомайзер, можна, скориставшись пунктом Data Profiling/Ad Hoc Query:

Apache Airflow: робимо ETL простіше
Головне, не показувати це аналітикам

Детально зупинятися на ETL-сесіях я не буду, там все тривіально: робимо базу, в ній табличку, обертаємо все менеджером контексту, і тепер робимо так:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

настала пора забрати наші дані з наших півтори сотні таблиць. Зробимо це за допомогою дуже невигадливих рядків:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. За допомогою хука отримаємо з Airflow pymssql-коннект
  2. У запит підставимо обмеження у вигляді дати - у функцію її підкине шаблонизатор.
  3. Згодовуємо наш запит pandas, який дістане для нас DataFrame - Він нам знадобиться надалі.

Я використовую підстановку {dt} замість параметра запиту %s не тому, що я злий Буратіно, а тому що pandas не може впоратися з pymssql і підсовує останньому params: Listхоча той дуже хоче tuple.
Також зверніть увагу, що розробник pymssql вирішив більше його не підтримувати, і саме час з'їхати на pyodbc.

Подивимося, чим Airflow нашпигував аргументи наших функцій:

Apache Airflow: робимо ETL простіше

Якщо даних не виявилося, то немає сенсу продовжувати. Але вважати заливання успішним теж дивно. Але це не помилка. А-а-а, що робити? А ось що:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException скаже Airflow, що помилки, власне, немає, а таск ми пропускаємо. В інтерфейсі буде не зелений і червоний квадратик, а кольору pink.

Підкинемо нашим даним кілька колонок:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

А саме:

  • БД, з якої ми забрали замовлення,
  • Ідентифікатор нашої сесії (вона буде різною) на кожен таск),
  • Хеш від джерела та ідентифікатора замовлення – щоб у кінцевій базі (де все ссипеться в одну таблицю) ми мали унікальний ідентифікатор замовлення.

Залишився передостанній крок: залити все у Vertica. А, як не дивно, один із найефективніших ефективних способів зробити це через CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Ми робимо спецприймач StringIO.
  2. pandas люб'язно складе в нього наш DataFrame у вигляді CSV-Рядок.
  3. Відкриємо з'єднання до нашої коханої Vertica хуком.
  4. А тепер за допомогою copy() відправимо наші дані прямо до Вертики!

З драйвера заберемо, скільки рядків засипалося, і скажемо менеджеру сесії, що все ОК:

session.loaded_rows = cursor.rowcount
session.successful = True

От і все.

На виробі ми створюємо цільову табличку вручну. Тут я дозволив собі невеликий автомат:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Я за допомогою VerticaOperator() створюю схему БД та таблицю (якщо їх ще немає, природно). Головне, правильно розставити залежності:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Підводимо підсумки

— Ну ось, — сказала мишеня, — чи не так, тепер
Ти переконався, що в лісі я найстрашніший звір?

Джулія Дональдсон, «Груффало»

Думаю, якби ми з моїми колегами влаштували змагання: хто швидше складе і запустить з нуля ETL-процес: вони зі своїми SSIS і мишкою і я з Airflow... А потім ми ще порівняли зручність супроводу... Ух, думаю, ви погодитеся, що я обійду їх по всіх напрямках!

Якщо ж трохи серйозніше, то Apache Airflow - за рахунок опису процесів у вигляді програмного коду - зробив мою роботу набагато зручніше та приємніше.

Його ж необмежена розширюваність: як у плані плагінів, так і схильність до масштабованості - дає вам можливість застосовувати Airflow практично в будь-якій галузі: хоч у повному циклі збору, підготовки та обробки даних, хоч у запуску ракет (на Марс, звичайно ж).

Частина заключна, довідково-інформаційна

Граблі, які ми зібрали за вас

  • start_date. Так, це вже локальний мемасик. Через головний аргумент дага start_date проходять усі. Коротко, якщо вказати в start_date поточну дату, а в schedule_interval - Один день, то DAG запуститься завтра не раніше.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    І більше жодних проблем.

    З ним же пов'язана ще одна помилка виконання: Task is missing the start_date parameter, Що найчастіше говорить про те, що ви забули прив'язати до оператора даг.

  • Все на одній машині. Так, і бази (самого Airflow та нашої обмазки), і веб-сервер, і планувальник, і воркери. І вона навіть працювала. Але згодом кількість завдань у сервісів зростала, і коли PostgreSQL став віддавати відповідь за індексом за 20 с замість 5 мс, ми її взяли та забрали.
  • LocalExecutor. Так, ми сидимо на ньому досі, і ми вже підійшли до краю прірви. LocalExecutor'а нам досі вистачало, але зараз настав час розширитися мінімум одним воркером, і доведеться піднапрягтися, щоб переїхати на CeleryExecutor. А через те, що з ним можна працювати і на одній машині, то нічого не зупиняє від використання Celery навіть на сервері, який «природно, ніколи не піде в прод, чесслово!»
  • Невикористання вбудованих засобів:
    • Зв'язки для зберігання облікових даних сервісів,
    • SLA Misses для реагування на таски, які не відпрацювали вчасно,
    • XCom для обміну метаданими (я сказав метаданими!) між тягами дага.
  • Зловживання поштою. Ну, що тут сказати? Були налаштовані оповіщення на всі повтори тяганів. Тепер у моєму робочому Gmail >90k листів від Airflow, і веб-морда пошти відмовляється брати та видаляти більше ніж по 100 штук за раз.

Більше підводного каміння: Apache Airflow Pitfails

Засоби ще більшої автоматизації

Для того, щоб нам ще більше працювати головою, а не руками, Airflow заготовила для нас ось що:

  • REST API - Він досі має статус Experimental, що не заважає йому працювати. З його допомогою можна не тільки отримувати інформацію про даги та тяги, але зупинити/запустити даг, створити DAG Run або пул.
  • CLI — через командний рядок доступні багато засобів, які не просто незручні у користуванні через WebUI, а взагалі відсутні. Наприклад:
    • backfill необхідний повторного запуску інстансів тяган.
      Наприклад, прийшли аналітики, кажуть: «А у вас, товаришу, нісенітниця в даних із 1 по 13 січня! Чині-чіні-чіні-чіні!». А ти такий хоба:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Обслуговування бази: initdb, resetdb, upgradedb, checkdb.
    • run, який дозволяє запустити один інстанс таска, та ще й забити на все залежності. Більше того, можна запустити його через LocalExecutorнавіть якщо у вас Celery-кластер.
    • Приблизно те саме робить test, Тільки ще і в базах нічого не пише.
    • connections дозволяє масово створювати підключення із шеллу.
  • API Python - Досить хардкорний спосіб взаємодії, який призначений для плагінів, а не копошення в ньому ручками. Але хто ж нам завадить піти у /home/airflow/dags, запустити ipython і почати свавілля? Можна, наприклад, експортувати всі підключення таким кодом:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Підключення до бази метаданих Airflow. Писати в неї я не рекомендую, а от діставати стани тяган для різних специфічних метрик можна значно швидше і простіше, ніж через будь-який з API.

    Скажімо, далеко не всі наші таски ідемпотентні, а іноді можуть падати і це нормально. Але кілька завалів — це вже підозріло, і треба було б перевірити.

    Обережно, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Посилання

Ну і звичайно перші десять посилань з видачі гугла вміст папки Airflow з моїх закладок.

І посилання, задіяні у статті:

Джерело: habr.com