Привіт, я Дмитро Логвиненко – Data Engineer відділу аналітики групи компаній «Везе».
Я розповім вам про чудовий інструмент розробки ETL-процесів — Apache Airflow. Але Airflow настільки універсальний і багатогранний, що вам варто придивитися до нього навіть якщо ви не займаєтеся потоками даних, а потребуєте періодично запускати будь-які процеси і стежити за їх виконанням.
І так, я не лише розповідатиму, а й показуватиму: у програмі багато коду, скріншотів та рекомендацій.

Що зазвичай бачиш, коли гуглиш слово Airflow / Wikimedia Commons
Зміст
Запровадження
Apache Airflow - він прямо як Django:
- написаний на Python,
- є відмінна адмінка,
- необмежено розширюємо,
- тільки краще, та й зроблено зовсім для інших цілей, а саме (як написано до ката):
- запуск та моніторинг завдань на необмеженій кількості машин (скільки вам дозволить Celery/Kubernetes та ваша совість)
- з динамічною генерацією workflow з дуже легкого для написання та сприйняття Python-коду
- і можливістю пов'язувати один з одним будь-які бази даних та API за допомогою як готових компонентів, так і саморобних плагінів (що робиться надзвичайно просто).
Ми використовуємо Apache Airflow так:
- збираємо дані з різних джерел (безліч інстансів SQL Server і PostgreSQL, різні API з метриками додатків, навіть 1С) в DWH і ODS (у нас це Vertica і Clickhouse).
- як просунутий
cron, який запускає процеси консолідації даних на ODS, а також слідкує за їх обслуговуванням.
Донедавна наші потреби покривав один невеликий сервер на 32 ядрах та 50 GB оперативної пам'яті. В Airflow при цьому працює:
- більш 200 дагов (Власне workflows, в які ми набили завдання),
- у кожному в середньому по 70 тяган,
- запускається це добро (теж у середньому) раз на годину.
А про те, як ми розширювалися, я напишу нижче, а зараз давайте визначимо über-завдання, яке ми вирішуватимемо:
Є три вихідні SQL Server'и, на кожному по 50 баз даних - інстансів одного проекту, відповідно, структура у них однакова (майже скрізь, муа-ха-ха), а значить у кожній є таблиця Orders (благо таблицю з такою назвою можна заштовхати у будь-який бізнес). Ми забираємо дані, додаючи службові поля (сервер-джерело, база-джерело, ідентифікатор ETL-завдання) і наївним чином кинемо їх у, скажімо, Vertica.
Поїхали!
Частина основна, практична (і трохи теоретична)
Навіщо воно нам (і вам)
Коли дерева були великі, а я був простим SQL-щиком в одному російському рітейлі, ми шпарили ETL-процеси aka потоки даних за допомогою двох доступних нам засобів:
- Informatica Power Center - Вкрай розлога система, надзвичайно продуктивна, зі своїми залозками, власним версіонуванням. Використав я дай боже 1% її можливостей. Чому? Ну, по-перше, цей інтерфейс десь із нульових психічно тиснув на нас. По-друге, ця штуковина заточена під надзвичайно наворочені процеси, шалене перевикористання компонентів та інші дуже-важливі-ентерпрайз-фішечки. Про те, що стоїть вона, як крило Airbus A380/рік, ми промовчимо.
Обережно, скріншот може зробити людям молодше 30 трохи боляче

- SQL Server Integration Server — цим товаришем ми користувалися у своїх внутрішньопроектних потоках. Ну а насправді: SQL Server ми вже використовуємо, і не користуватися його ETL-тулзи було б якось нерозумно. Все в ньому добре: і інтерфейс красивий, і звіти виконання ... Але не за це ми любимо програмні продукти, ох не за це. Версіонувати його
dtsx(який являє собою XML з нодами, що перемішуються при збереженні) ми можемо, а толку? А зробити пакет тяган, який перетягне сотню таблиць з одного сервера на інший? Так що сотню, у вас від двадцяти штук відвалиться вказівний палець, що клацає мишачою кнопкою. Але виглядає він, безперечно, більш модно:
Ми, безумовно, шукали виходи. Справа навіть майже дійшло до самописного генератора SSIS-пакетів.
…а потім мене знайшла нова робота. А на ній мене наздогнав Apache Airflow.
Коли я дізнався, що описи ETL-процесів – це простий Python-код, я щойно не танцював від радості. Ось так потоки даних зазнали версіонування та дифу, а зсипати таблиці з єдиною структурою з сотні баз даних в один таргет стало справою Python-коду в півтора-два 13” екрани.
Збираємо кластер
Давайте не влаштовувати зовсім дитячий садок, і не говорити тут про зовсім очевидні речі, на кшталт установки Airflow, обраної вами БД, Celery та інших справ, описаних у доках.
Щоб ми могли відразу розпочати експерименти, я накидав docker-compose.yml в якому:
- Піднімемо власне Повітряний потік: Scheduler, Webserver. Там же буде крутитися Flower для моніторингу Celery-задач (бо його вже заштовхали в
apache/airflow:1.10.10-python3.7, А ми і не проти); - PostgreSQL, в який Airflow писатиме свою службову інформацію (дані планувальника, статистика виконання і т. д.), а Celery - відзначати завершені таски;
- Redisякий виступатиме брокером завдань для Celery;
- Celery worker, що й займеться безпосереднім виконанням завдань.
- У папку
./dagsми будемо складати наші файли з описом дагов. Вони підхоплюватимуться на льоту, тому пересмикувати весь стек після кожного чіха не потрібно.
Де-не-де код у прикладах наведено не повністю (щоб не захаращувати текст), а десь він модифікується в процесі. Цілісні працюючі приклади коду можна подивитися в репозиторії .
докер-компост.імл
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerПримітки:
- У складанні композу я багато в чому спирався на відомий образ – обов'язково подивіться. Може, вам у житті більше нічого не знадобиться.
- Усі налаштування Airflow доступні не тільки через
airflow.cfgале через змінні середовища (слава розробникам), чим я злісно скористався. - Природно, він не production-ready: я навмисно не ставив heartbeats на контейнери, не морочився з безпекою. Але щонайменше підходящий для наших експериментиків я зробив.
- Зверніть увагу, що:
- Папка з дагами має бути доступна як планувальнику, так і воркерам.
- Те саме стосується й усіх сторонніх бібліотек — всі вони мають бути встановлені на машини з шедулером та воркерами.
Ну а тепер просто:
$ docker-compose up --scale worker=3Після того, як все підніметься, можна дивитися на веб-інтерфейси:
- Повітряний потік:
- Квітка:
Основні поняття
Якщо ви нічого не зрозуміли у всіх цих «дагах», то ось короткий словничок:
- Планувальник — найголовніший дядько в Airflow, який контролює, щоб працювали роботи, а не людина: стежить за розкладом, оновлює даги, запускає таски.
Взагалі, у старих версіях, у нього були проблеми з пам'яттю (ні, не амнезія, а витоку) і в конфігах навіть залишився легасі-параметр
run_duration- Інтервал його перезапуску. Але зараз все добре. - DAG (він же "даг") - "спрямований ациклічний граф", але таке визначення мало кому що скаже, а по суті це контейнер для тягарів, що взаємодіють один з одним (див. нижче) або аналог Package в SSIS і Workflow в Informatica.
Крім дагів ще можуть бути сабдаги, але ми до них швидше за все не дістанемося.
- DAG Run - Ініціалізований даг, якому присвоєно свій
execution_date. Даграни одного дага можуть працювати паралельно (якщо ви, звичайно, зробили свої таски ідемпотентними). - Оператор - це шматочки коду, відповідальні за виконання будь-якої конкретної дії. Є три типи операторів:
- діятинаприклад наш улюблений
PythonOperator, який може виконати будь-який (валідний) Python-код; - переклад, які перевозять дані з місця на місце, скажімо,
MsSqlToHiveTransfer; - датчик ж дозволить реагувати або пригальмувати подальше виконання дагу до настання будь-якої події.
HttpSensorможе смикати вказаний ендпойнт, і коли дочекається відповідь, запустити трансферGoogleCloudStorageToS3Operator. Допитливий розум запитає: «Навіщо? Адже можна робити повтори прямо в операторі! А потім, щоб не забивати пул тасків операторами, що підвисли. Сенсор запускається, перевіряє та вмирає до наступної спроби.
- діятинаприклад наш улюблений
- Завдання — оголошені оператори незалежно від типу та прикріплені до дагу підвищуються до чину таска.
- Task instance — коли генерал-планувальник вирішив, що таски настав час відправляти в бій на виконавці-воркери (прямо на місці, якщо ми використовуємо
LocalExecutorабо на віддалену ноду у випадку зCeleryExecutor), він призначає їм контекст (тобто комплект змінних - параметрів виконання), розгортає шаблони команд чи запитів і складає в пул.
Генеруємо таски
Спершу позначимо загальну схему нашого дага, а потім все більше і більше занурюватимемося в деталі, тому що ми застосовуємо деякі нетривіальні рішення.
Отже, у найпростішому вигляді такий даг виглядатиме так:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Давайте розбиратися:
- Спершу імпортуємо потрібні ліби та дещо ще;
sql_server_ds- цеList[namedtuple[str, str]]з іменами коннектів з Airflow Connections та базами даних з яких ми забиратимемо нашу табличку;dag- оголошення нашого дага, яке обов'язково має лежати вglobals(), інакше Airflow його не знайде. Дагу також слід сказати:- що його звуть
orders— це ім'я потім маячить у веб-інтерфейсі, - що працюватиме він, починаючи з півночі восьмого липня,
- а запускати він повинен приблизно кожні 6 годин (для крутих хлопців тут замість
timedelta()допустимаcron-рядок0 0 0/6 ? * * *для менш крутих - вираз начебто@daily);
- що його звуть
workflow()робитиме основну роботу, але не зараз. Зараз ми просто висипаємо наш контекст у балку.- А тепер проста магія створення тасків:
- пробігаємо нашими джерелами;
- ініціалізуємо
PythonOperator, який буде виконувати нашу пустушкуworkflow(). Не забувайте вказувати унікальне (в рамках дага) ім'я таска та підв'язувати сам даг. Прапорprovide_contextу свою чергу насипатиме в функцію додаткових аргументів, які ми дбайливо зберемо за допомогою**context.
Поки що на цьому все. Що ми отримали:
- новий даг у веб-інтерфейсі,
- півтори сотні тяг, які будуть виконуватися паралельно (якщо дозволять налаштування Airflow, Celery і потужності серверів).
Ну майже отримали.

Залежно хто ставитиме?
Щоб всю цю справу спростити я вкорячив у docker-compose.yml обробку requirements.txt на всіх нодах.
Ось тепер помчала:

Сірі квадратики - task instances, оброблені планувальником.
Трохи чекаємо, завдання розхоплюють воркери:

Зелені, ясна річ, успішно відпрацювали. Червоні – не дуже успішно.
До речі, на нашому праді жодної папки
./dags, що синхронізується між машинами немає - всі даги лежать уgitна нашому Gitlab, а Gitlab CI розкладає оновлення на машини при мерджіmaster.
Трохи про Flower
Поки воркери молотять наші тасочки-пустушки, згадаємо ще один інструмент, який може нам дещо показати — Flower.
Найперша сторінка із сумарною інформацією по нодам-воркерам:

Найнасиченіша сторінка із завданнями, що вирушили в роботу:

Найнудніша сторінка зі станом нашого брокера:

Найяскравіша сторінка — з графіками стану тяган та їх часом виконання:

Довантажуємо недовантажене
Отже, всі таски відпрацювали, можна забирати поранених.

А поранених виявилося чимало — з тих чи інших причин. У разі правильного використання Airflow ось ці квадрати говорять про те, що дані безумовно не доїхали.
Потрібно дивитися лог і перезапускати task instances, що впали.
Жм'якнувши на будь-який квадрат, побачимо доступні нам дії:

Можна взяти, і зробити Clear впав. Тобто ми забуваємо про те, що там щось завалилося, і той самий інстанс таска піде планувальнику.

Зрозуміло, що робити так мишкою з усіма червоними квадратами не дуже гуманно – не на це ми чекаємо від Airflow. Звичайно, у нас є зброя масової поразки: Browse/Task Instances

Виберемо все разом і обнулимо натиснемо правильний пункт:

Після очищення наші таксі виглядають так (вони вже чекають не дочекаються, коли шедулер їх запланує):

З'єднання, хуки та інші змінні
Саме час подивитися на наступний DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Адже все-таки робили обновлялку звітів? Це знову вона: є перелік джерел, звідки забрати дані; є перелік, куди покласти; не забуваємо посигналити, коли все трапилося чи зламалося (ну це не про нас, ні).
Давайте знову пройдемося файлом і подивимося на нові незрозумілі штуки:
from commons.operators import TelegramBotSendMessage— нам ніщо не заважає робити свої оператори, ніж ми й скористалися, зробивши невелику обгортку для надсилання повідомлень до Розблокованого. (Про цей оператор ми ще поговоримо нижче);default_args={}— даг може роздавати одні й самі аргументи всім своїм операторам;to='{{ var.value.all_the_kings_men }}'- полеtoу нас буде не захардшкіреним, а формованим динамічно за допомогою Jinja та змінної зі списком email-ів, яку я дбайливо поклав уAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- Умова запуску оператора. У нашому випадку лист полетить босам тільки якщо всі залежності відпрацювали. успішно;tg_bot_conn_id='tg_main'- аргументиconn_idприймають у собі ідентифікатори з'єднань, які ми створюємо вAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED— повідомлення в Telegram відлетять тільки за наявності тягаків, що впали;task_concurrency=1- Забороняємо одночасний запуск кількох task instances одного тяга. В іншому випадку, ми отримаємо одночасний запуск кількохVerticaOperator(Дивляться одну таблицю);report_update >> [email, tg]- УсеVerticaOperatorзійдуться у надсиланні листа та повідомлення, ось так:

Але оскільки оператори-нотифікатори мають різні умови запуску, працюватиме лише один. У Tree View все виглядає дещо менш наочно:

Скажу пару слів про макросах та їхніх друзів змінних.
Макроси - це Jinja-плейсхолдери, які можуть підставляти різну корисну інформацію аргументи операторів. Наприклад, так:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} розгорнеться у вміст змінної контексту execution_date у форматі YYYY-MM-DD: 2020-07-14. Найприємніше, що змінні контексту прибиваються цвяхами до певного інстансу тяга (квадратику в Tree View), і при перезапуску плейсхолдери розкриються в ті самі значення.
Призначені значення можна дивитися за допомогою кнопки Rendered на кожному таск-інстансі. Ось так біля тяга з відправкою листа:

А так біля таски з відправкою повідомлення:

Повний список вбудованих макросів для останньої доступної версії доступний тут:
Більше того, за допомогою плагінів ми можемо оголошувати власні макроси, але це вже зовсім інша історія.
Крім зумовлених штук, ми можемо підставляти значення своїх змінних (вище в коді я вже скористався цим). Створимо в Admin/Variables пару штук:

Все, можна скористатися:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')У значенні може бути скаляр, а може бути і JSON. У випадку JSON-а:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}просто використовуємо шлях до потрібного ключа: {{ var.json.bot_config.bot.token }}.
Скажу буквально одне слово і покажу один скріншот про з'єднання. Тут все просто: на сторінці Admin/Connections створюємо з'єднання, складаємо туди наші логіни/паролі та більш специфічні параметри. Ось так:

Паролі можна шифрувати (ретельніше, ніж у варіанті за умовчанням), а можна не вказувати тип з'єднання (як я зробив для tg_main) - Справа в тому, що список типів зашитий в моделях Airflow і розширенню без влазіння в вихідні джерела не піддається (якщо раптом я чогось не догуглил - прошу мене поправити), але отримати креди просто на ім'я нам ніщо не завадить.
А ще можна зробити кілька з'єднань з одним ім'ям: у такому разі метод BaseHook.get_connection(), який дістає нам з'єднання на ім'я, буде віддавати випадкового з кількох тезок (було б логічніше зробити Round Robin, але залишимо це на совісті розробників Airflow).
Variables і Connections, безумовно, класні засоби, але важливо не втратити баланс: які частини ваших потоків ви зберігаєте власне в коді, а які віддаєте на зберігання Airflow. З одного боку швидко змінити значення, наприклад, ящик розсилки, може бути зручно через UI. А з іншого — це все-таки повернення до мишеклика, якого ми (я) хотіли позбутися.
Робота зі з'єднаннями – це одне із завдань хуків. Взагалі хуки Airflow - це точки підключення його до сторонніх сервісів та бібліотек. Наприклад, JiraHook відкриє нам клієнт взаємодії з Jira (можна завдання подвигать туди-сюди), а з допомогою SambaHook можна запустити локальний файл на smb-Точку.
Розбираємо кастомний оператор
І ми впритул підібралися до того, щоб подивитися на те, як зроблено TelegramBotSendMessage
Код commons/operators.py з власне оператором:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Тут, як і решта Airflow, все дуже просто:
- Відспадкували від
BaseOperator, який реалізує досить багато Airflow-специфічних штук (подивіться на дозвіллі) - Оголосили поля
template_fields, в яких Jinja шукатиме макроси для обробки. - Організували правильні аргументи для
__init__(), Розставили замовчування, де треба. - Про ініціалізацію предка теж не забули.
- Відкрили відповідний хук
TelegramBotHookотримали від нього об'єкт-клієнт. - Оверрайднули (перевизначили) метод
BaseOperator.execute(), який Airfow буде пересмикувати, коли настане час запускати оператор - в ньому ми і реалізуємо основну дію, забувши залогуватися. (Лоґуємося, до речі, прямо вstdoutиstderr- Airflow все перехопить, красиво оберне, розкладе, куди треба.)
Давайте дивитися, що в нас у commons/hooks.py. Перша частина файлика, із самим хуком:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientЯ навіть не знаю, що тут можна пояснювати, просто зазначу важливі моменти:
- Наслідуємо, думаємо над аргументами — у більшості випадків він буде один:
conn_id; - Перевизначаємо стандартні методи: я обмежився
get_conn(), в якому я отримую параметри з'єднання на ім'я і всього лише дістаю секціюextra(це поле для JSON), в яку я (за своєю ж інструкцією!) поклав токен Telegram-бота:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Створю екземпляр нашого
TelegramBot, віддаючи йому конкретний токен.
От і все. Отримати клієнт з хука можна за допомогою TelegramBotHook().clent або TelegramBotHook().get_conn().
І друга частина файлика, в якому я зробити мікрообгортку для Telegram REST API, щоб не тягнути той же заради одного методу sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Правильний шлях - скласти все це:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- У плагін, покласти в загальнодоступний репозиторій, і віддати в Open Source.
Поки ми все це вивчали, наші оновлення звітів встигли успішно завалитися і надіслати мені канал повідомлення про помилку. Піду перевіряти, що знову не так…

У нашому дазі щось зламалося! А чи не на це ми чекали? Саме!
Наливати будеш?
Чи відчуваєте, що щось я пропустив? Як би обіцяв дані з SQL Server в Vertica переливати, і тут взяв і з'їхав з теми, негідник!
Злочинство це було навмисним, я просто повинен був розшифрувати вам деяку термінологію. Тепер можна їхати далі.
План у нас був такий:
- Зробити даг
- Нагенерувати таски
- Подивитися, як все красиво
- Надавати заливкам номери сесій
- Забрати дані з SQL Server
- Покласти дані у Vertica
- Зібрати статистику
Отже, щоби все це запустити, я зробив маленьке доповнення до нашого docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyТам ми піднімаємо:
- Vertica як хост
dwhз дефолтними налаштуваннями, - три екземпляри SQL Server,
- наповнюємо бази в останніх деякими даними (у жодному разі не заглядайте в
mssql_init.py!)
Запускаємо все добро за допомогою трохи складнішої, ніж минулого разу, команди:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Що нагенерував наш чудорандомайзер, можна, скориставшись пунктом Data Profiling/Ad Hoc Query:

Головне, не показувати це аналітикам
Детально зупинятися на ETL-сесіях я не буду, там все тривіально: робимо базу, в ній табличку, обертаємо все менеджером контексту, і тепер робимо так:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passнастала пора забрати наші дані з наших півтори сотні таблиць. Зробимо це за допомогою дуже невигадливих рядків:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- За допомогою хука отримаємо з Airflow
pymssql-коннект - У запит підставимо обмеження у вигляді дати - у функцію її підкине шаблонизатор.
- Згодовуємо наш запит
pandas, який дістане для насDataFrame- Він нам знадобиться надалі.
Я використовую підстановку
{dt}замість параметра запиту%sне тому, що я злий Буратіно, а тому щоpandasне може впоратися зpymssqlі підсовує останньомуparams: Listхоча той дуже хочеtuple.
Також зверніть увагу, що розробникpymssqlвирішив більше його не підтримувати, і саме час з'їхати наpyodbc.
Подивимося, чим Airflow нашпигував аргументи наших функцій:

Якщо даних не виявилося, то немає сенсу продовжувати. Але вважати заливання успішним теж дивно. Але це не помилка. А-а-а, що робити? А ось що:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException скаже Airflow, що помилки, власне, немає, а таск ми пропускаємо. В інтерфейсі буде не зелений і червоний квадратик, а кольору pink.
Підкинемо нашим даним кілька колонок:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])А саме:
- БД, з якої ми забрали замовлення,
- Ідентифікатор нашої сесії (вона буде різною) на кожен таск),
- Хеш від джерела та ідентифікатора замовлення – щоб у кінцевій базі (де все ссипеться в одну таблицю) ми мали унікальний ідентифікатор замовлення.
Залишився передостанній крок: залити все у Vertica. А, як не дивно, один із найефективніших ефективних способів зробити це через CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Ми робимо спецприймач
StringIO. pandasлюб'язно складе в нього нашDataFrameу виглядіCSV-Рядок.- Відкриємо з'єднання до нашої коханої Vertica хуком.
- А тепер за допомогою
copy()відправимо наші дані прямо до Вертики!
З драйвера заберемо, скільки рядків засипалося, і скажемо менеджеру сесії, що все ОК:
session.loaded_rows = cursor.rowcount
session.successful = TrueОт і все.
На виробі ми створюємо цільову табличку вручну. Тут я дозволив собі невеликий автомат:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Я за допомогою
VerticaOperator()створюю схему БД та таблицю (якщо їх ще немає, природно). Головне, правильно розставити залежності:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadПідводимо підсумки
— Ну ось, — сказала мишеня, — чи не так, тепер
Ти переконався, що в лісі я найстрашніший звір?
Джулія Дональдсон, «Груффало»
Думаю, якби ми з моїми колегами влаштували змагання: хто швидше складе і запустить з нуля ETL-процес: вони зі своїми SSIS і мишкою і я з Airflow... А потім ми ще порівняли зручність супроводу... Ух, думаю, ви погодитеся, що я обійду їх по всіх напрямках!
Якщо ж трохи серйозніше, то Apache Airflow - за рахунок опису процесів у вигляді програмного коду - зробив мою роботу набагато зручніше та приємніше.
Його ж необмежена розширюваність: як у плані плагінів, так і схильність до масштабованості - дає вам можливість застосовувати Airflow практично в будь-якій галузі: хоч у повному циклі збору, підготовки та обробки даних, хоч у запуску ракет (на Марс, звичайно ж).
Частина заключна, довідково-інформаційна
Граблі, які ми зібрали за вас
start_date. Так, це вже локальний мемасик. Через головний аргумент дагаstart_dateпроходять усі. Коротко, якщо вказати вstart_dateпоточну дату, а вschedule_interval- Один день, то DAG запуститься завтра не раніше.start_date = datetime(2020, 7, 7, 0, 1, 2)І більше жодних проблем.
З ним же пов'язана ще одна помилка виконання:
Task is missing the start_date parameter, Що найчастіше говорить про те, що ви забули прив'язати до оператора даг.- Все на одній машині. Так, і бази (самого Airflow та нашої обмазки), і веб-сервер, і планувальник, і воркери. І вона навіть працювала. Але згодом кількість завдань у сервісів зростала, і коли PostgreSQL став віддавати відповідь за індексом за 20 с замість 5 мс, ми її взяли та забрали.
- LocalExecutor. Так, ми сидимо на ньому досі, і ми вже підійшли до краю прірви. LocalExecutor'а нам досі вистачало, але зараз настав час розширитися мінімум одним воркером, і доведеться піднапрягтися, щоб переїхати на CeleryExecutor. А через те, що з ним можна працювати і на одній машині, то нічого не зупиняє від використання Celery навіть на сервері, який «природно, ніколи не піде в прод, чесслово!»
- Невикористання вбудованих засобів:
- Зв'язки для зберігання облікових даних сервісів,
- SLA Misses для реагування на таски, які не відпрацювали вчасно,
- XCom для обміну метаданими (я сказав метаданими!) між тягами дага.
- Зловживання поштою. Ну, що тут сказати? Були налаштовані оповіщення на всі повтори тяганів. Тепер у моєму робочому Gmail >90k листів від Airflow, і веб-морда пошти відмовляється брати та видаляти більше ніж по 100 штук за раз.
Більше підводного каміння:
Засоби ще більшої автоматизації
Для того, щоб нам ще більше працювати головою, а не руками, Airflow заготовила для нас ось що:
- - Він досі має статус Experimental, що не заважає йому працювати. З його допомогою можна не тільки отримувати інформацію про даги та тяги, але зупинити/запустити даг, створити DAG Run або пул.
- — через командний рядок доступні багато засобів, які не просто незручні у користуванні через WebUI, а взагалі відсутні. Наприклад:
backfillнеобхідний повторного запуску інстансів тяган.
Наприклад, прийшли аналітики, кажуть: «А у вас, товаришу, нісенітниця в даних із 1 по 13 січня! Чині-чіні-чіні-чіні!». А ти такий хоба:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Обслуговування бази:
initdb,resetdb,upgradedb,checkdb. run, який дозволяє запустити один інстанс таска, та ще й забити на все залежності. Більше того, можна запустити його черезLocalExecutorнавіть якщо у вас Celery-кластер.- Приблизно те саме робить
test, Тільки ще і в базах нічого не пише. connectionsдозволяє масово створювати підключення із шеллу.
- - Досить хардкорний спосіб взаємодії, який призначений для плагінів, а не копошення в ньому ручками. Але хто ж нам завадить піти у
/home/airflow/dags, запуститиipythonі почати свавілля? Можна, наприклад, експортувати всі підключення таким кодом:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Підключення до бази метаданих Airflow. Писати в неї я не рекомендую, а от діставати стани тяган для різних специфічних метрик можна значно швидше і простіше, ніж через будь-який з API.
Скажімо, далеко не всі наші таски ідемпотентні, а іноді можуть падати і це нормально. Але кілька завалів — це вже підозріло, і треба було б перевірити.
Обережно, SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
Посилання
Ну і звичайно перші десять посилань з видачі гугла вміст папки Airflow з моїх закладок.
- - Звичайно, треба почати з оф. документації, але хто читає інструкції?
- - Ну хоча б рекомендації від творців прочитайте.
- — саме початок: інтерфейс користувача в картинках
- — добре розписані базові поняття, якщо ви (раптом!) щось не зрозуміли в мене.
- - Короткий гайд з налаштування Airflow-кластера.
- — майже така ж цікава стаття, хіба що більшого формалізму, а менших прикладів.
- - Про роботу у зв'язці з Celery.
- — про ідемпотентність тяг, завантаження по ID замість дати, трансформації, структуру файлів та інші цікаві речі.
- — залежності тяган та Trigger Rule, які я згадав лише побіжно.
- — як долати деякі «працює, як задумано» у планувальника, завантажувати втрачені дані та розставляти пріоритети тяган.
- - корисні SQL-запити до метаданих Airflow.
- - Є корисний розділ для створення кастомного сенсора.
- - Цікава коротка нотатка про побудову інфраструктури на AWS для Data Science.
- — поширені помилки (коли дехто не читає інструкцій).
- — посміхніться, як люди милить зберігання паролів, хоча можна просто використовувати Connections.
- - неявний проміжок DAG, закидання контексту на функції, знову про залежності, а також про пропуск запусків тяг.
- - Про використання
default argumentsиparamsу шаблонах, а також про Variables та Connections. - - Розповідь про те, як планувальник готують до Airflow 2.0.
- - Трохи застаріла стаття про деплой нашого кластера в
docker-compose. - — динамічний таск за допомогою шаблонів та прокидання контексту.
- — стандартні та кастомні оповіщення поштою та Slack.
- — Розгалуження тяг, макроси та XCom.
І посилання, задіяні у статті:
- — доступні для використання у шаблонах плейсхолдери.
- — Поширені помилки під час створення дагов.
- -
docker-composeдля експериментів, налагодження та не тільки. - - Python-обгортка для Telegram REST API.
Джерело: habr.com




