Привіт, я Дмитро Логвиненко – Data Engineer відділу аналітики групи компаній «Везе».
Я розповім вам про чудовий інструмент розробки ETL-процесів — Apache Airflow. Але Airflow настільки універсальний і багатогранний, що вам варто придивитися до нього навіть якщо ви не займаєтеся потоками даних, а потребуєте періодично запускати будь-які процеси і стежити за їх виконанням.
І так, я не лише розповідатиму, а й показуватиму: у програмі багато коду, скріншотів та рекомендацій.
Що зазвичай бачиш, коли гуглиш слово Airflow / Wikimedia Commons
- тільки краще, та й зроблено зовсім для інших цілей, а саме (як написано до ката):
запуск та моніторинг завдань на необмеженій кількості машин (скільки вам дозволить Celery/Kubernetes та ваша совість)
з динамічною генерацією workflow з дуже легкого для написання та сприйняття Python-коду
і можливістю пов'язувати один з одним будь-які бази даних та API за допомогою як готових компонентів, так і саморобних плагінів (що робиться надзвичайно просто).
Ми використовуємо Apache Airflow так:
збираємо дані з різних джерел (безліч інстансів SQL Server і PostgreSQL, різні API з метриками додатків, навіть 1С) в DWH і ODS (у нас це Vertica і Clickhouse).
як просунутий cron, який запускає процеси консолідації даних на ODS, а також слідкує за їх обслуговуванням.
Донедавна наші потреби покривав один невеликий сервер на 32 ядрах та 50 GB оперативної пам'яті. В Airflow при цьому працює:
більш 200 дагов (Власне workflows, в які ми набили завдання),
у кожному в середньому по 70 тяган,
запускається це добро (теж у середньому) раз на годину.
А про те, як ми розширювалися, я напишу нижче, а зараз давайте визначимо über-завдання, яке ми вирішуватимемо:
Є три вихідні SQL Server'и, на кожному по 50 баз даних - інстансів одного проекту, відповідно, структура у них однакова (майже скрізь, муа-ха-ха), а значить у кожній є таблиця Orders (благо таблицю з такою назвою можна заштовхати у будь-який бізнес). Ми забираємо дані, додаючи службові поля (сервер-джерело, база-джерело, ідентифікатор ETL-завдання) і наївним чином кинемо їх у, скажімо, Vertica.
Поїхали!
Частина основна, практична (і трохи теоретична)
Навіщо воно нам (і вам)
Коли дерева були великі, а я був простим SQL-щиком в одному російському рітейлі, ми шпарили ETL-процеси aka потоки даних за допомогою двох доступних нам засобів:
Informatica Power Center - Вкрай розлога система, надзвичайно продуктивна, зі своїми залозками, власним версіонуванням. Використав я дай боже 1% її можливостей. Чому? Ну, по-перше, цей інтерфейс десь із нульових психічно тиснув на нас. По-друге, ця штуковина заточена під надзвичайно наворочені процеси, шалене перевикористання компонентів та інші дуже-важливі-ентерпрайз-фішечки. Про те, що стоїть вона, як крило Airbus A380/рік, ми промовчимо.
Обережно, скріншот може зробити людям молодше 30 трохи боляче
SQL Server Integration Server — цим товаришем ми користувалися у своїх внутрішньопроектних потоках. Ну а насправді: SQL Server ми вже використовуємо, і не користуватися його ETL-тулзи було б якось нерозумно. Все в ньому добре: і інтерфейс красивий, і звіти виконання ... Але не за це ми любимо програмні продукти, ох не за це. Версіонувати його dtsx (який являє собою XML з нодами, що перемішуються при збереженні) ми можемо, а толку? А зробити пакет тяган, який перетягне сотню таблиць з одного сервера на інший? Так що сотню, у вас від двадцяти штук відвалиться вказівний палець, що клацає мишачою кнопкою. Але виглядає він, безперечно, більш модно:
Ми, безумовно, шукали виходи. Справа навіть майже дійшло до самописного генератора SSIS-пакетів.
…а потім мене знайшла нова робота. А на ній мене наздогнав Apache Airflow.
Коли я дізнався, що описи ETL-процесів – це простий Python-код, я щойно не танцював від радості. Ось так потоки даних зазнали версіонування та дифу, а зсипати таблиці з єдиною структурою з сотні баз даних в один таргет стало справою Python-коду в півтора-два 13” екрани.
Збираємо кластер
Давайте не влаштовувати зовсім дитячий садок, і не говорити тут про зовсім очевидні речі, на кшталт установки Airflow, обраної вами БД, Celery та інших справ, описаних у доках.
Щоб ми могли відразу розпочати експерименти, я накидав docker-compose.yml в якому:
Піднімемо власне Повітряний потік: Scheduler, Webserver. Там же буде крутитися Flower для моніторингу Celery-задач (бо його вже заштовхали в apache/airflow:1.10.10-python3.7, А ми і не проти);
PostgreSQL, в який Airflow писатиме свою службову інформацію (дані планувальника, статистика виконання і т. д.), а Celery - відзначати завершені таски;
Redisякий виступатиме брокером завдань для Celery;
Celery worker, що й займеться безпосереднім виконанням завдань.
У папку ./dags ми будемо складати наші файли з описом дагов. Вони підхоплюватимуться на льоту, тому пересмикувати весь стек після кожного чіха не потрібно.
Де-не-де код у прикладах наведено не повністю (щоб не захаращувати текст), а десь він модифікується в процесі. Цілісні працюючі приклади коду можна подивитися в репозиторії https://github.com/dm-logv/airflow-tutorial.
У складанні композу я багато в чому спирався на відомий образ puckel/docker-airflow – обов'язково подивіться. Може, вам у житті більше нічого не знадобиться.
Усі налаштування Airflow доступні не тільки через airflow.cfgале через змінні середовища (слава розробникам), чим я злісно скористався.
Природно, він не production-ready: я навмисно не ставив heartbeats на контейнери, не морочився з безпекою. Але щонайменше підходящий для наших експериментиків я зробив.
Зверніть увагу, що:
Папка з дагами має бути доступна як планувальнику, так і воркерам.
Те саме стосується й усіх сторонніх бібліотек — всі вони мають бути встановлені на машини з шедулером та воркерами.
Ну а тепер просто:
$ docker-compose up --scale worker=3
Після того, як все підніметься, можна дивитися на веб-інтерфейси:
Якщо ви нічого не зрозуміли у всіх цих «дагах», то ось короткий словничок:
Планувальник — найголовніший дядько в Airflow, який контролює, щоб працювали роботи, а не людина: стежить за розкладом, оновлює даги, запускає таски.
Взагалі, у старих версіях, у нього були проблеми з пам'яттю (ні, не амнезія, а витоку) і в конфігах навіть залишився легасі-параметр run_duration - Інтервал його перезапуску. Але зараз все добре.
DAG (він же "даг") - "спрямований ациклічний граф", але таке визначення мало кому що скаже, а по суті це контейнер для тягарів, що взаємодіють один з одним (див. нижче) або аналог Package в SSIS і Workflow в Informatica.
Крім дагів ще можуть бути сабдаги, але ми до них швидше за все не дістанемося.
DAG Run - Ініціалізований даг, якому присвоєно свій execution_date. Даграни одного дага можуть працювати паралельно (якщо ви, звичайно, зробили свої таски ідемпотентними).
Оператор - це шматочки коду, відповідальні за виконання будь-якої конкретної дії. Є три типи операторів:
діюнаприклад наш улюблений PythonOperator, який може виконати будь-який (валідний) Python-код;
переклад, які перевозять дані з місця на місце, скажімо, MsSqlToHiveTransfer;
датчик ж дозволить реагувати або пригальмувати подальше виконання дагу до настання будь-якої події. HttpSensor може смикати вказаний ендпойнт, і коли дочекається відповідь, запустити трансфер GoogleCloudStorageToS3Operator. Допитливий розум запитає: «Навіщо? Адже можна робити повтори прямо в операторі! А потім, щоб не забивати пул тасків операторами, що підвисли. Сенсор запускається, перевіряє та вмирає до наступної спроби.
Завдання — оголошені оператори незалежно від типу та прикріплені до дагу підвищуються до чину таска.
Task instance — коли генерал-планувальник вирішив, що таски настав час відправляти в бій на виконавці-воркери (прямо на місці, якщо ми використовуємо LocalExecutor або на віддалену ноду у випадку з CeleryExecutor), він призначає їм контекст (тобто комплект змінних - параметрів виконання), розгортає шаблони команд чи запитів і складає в пул.
Генеруємо таски
Спершу позначимо загальну схему нашого дага, а потім все більше і більше занурюватимемося в деталі, тому що ми застосовуємо деякі нетривіальні рішення.
Отже, у найпростішому вигляді такий даг виглядатиме так:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Давайте розбиратися:
Спершу імпортуємо потрібні ліби та дещо ще;
sql_server_ds - це List[namedtuple[str, str]] з іменами коннектів з Airflow Connections та базами даних з яких ми забиратимемо нашу табличку;
dag - оголошення нашого дага, яке обов'язково має лежати в globals(), інакше Airflow його не знайде. Дагу також слід сказати:
що його звуть orders — це ім'я потім маячить у веб-інтерфейсі,
що працюватиме він, починаючи з півночі восьмого липня,
а запускати він повинен приблизно кожні 6 годин (для крутих хлопців тут замість timedelta() допустима cron-рядок 0 0 0/6 ? * * *для менш крутих - вираз начебто @daily);
workflow() робитиме основну роботу, але не зараз. Зараз ми просто висипаємо наш контекст у балку.
А тепер проста магія створення тасків:
пробігаємо нашими джерелами;
ініціалізуємо PythonOperator, який буде виконувати нашу пустушку workflow(). Не забувайте вказувати унікальне (в рамках дага) ім'я таска та підв'язувати сам даг. Прапор provide_context у свою чергу насипатиме в функцію додаткових аргументів, які ми дбайливо зберемо за допомогою **context.
Поки що на цьому все. Що ми отримали:
новий даг у веб-інтерфейсі,
півтори сотні тяг, які будуть виконуватися паралельно (якщо дозволять налаштування Airflow, Celery і потужності серверів).
Ну майже отримали.
Залежно хто ставитиме?
Щоб всю цю справу спростити я вкорячив у docker-compose.yml обробку requirements.txt на всіх нодах.
Зелені, ясна річ, успішно відпрацювали. Червоні – не дуже успішно.
До речі, на нашому праді жодної папки ./dags, що синхронізується між машинами немає - всі даги лежать у git на нашому Gitlab, а Gitlab CI розкладає оновлення на машини при мерджі master.
Трохи про Flower
Поки воркери молотять наші тасочки-пустушки, згадаємо ще один інструмент, який може нам дещо показати — Flower.
Найперша сторінка із сумарною інформацією по нодам-воркерам:
Найнасиченіша сторінка із завданнями, що вирушили в роботу:
Найнудніша сторінка зі станом нашого брокера:
Найяскравіша сторінка — з графіками стану тяган та їх часом виконання:
Довантажуємо недовантажене
Отже, всі таски відпрацювали, можна забирати поранених.
А поранених виявилося чимало — з тих чи інших причин. У разі правильного використання Airflow ось ці квадрати говорять про те, що дані безумовно не доїхали.
Потрібно дивитися лог і перезапускати task instances, що впали.
Жм'якнувши на будь-який квадрат, побачимо доступні нам дії:
Можна взяти, і зробити Clear впав. Тобто ми забуваємо про те, що там щось завалилося, і той самий інстанс таска піде планувальнику.
Зрозуміло, що робити так мишкою з усіма червоними квадратами не дуже гуманно – не на це ми чекаємо від Airflow. Звичайно, у нас є зброя масової поразки: Browse/Task Instances
Виберемо все разом і обнулимо натиснемо правильний пункт:
Після очищення наші таксі виглядають так (вони вже чекають не дочекаються, коли шедулер їх запланує):
З'єднання, хуки та інші змінні
Саме час подивитися на наступний DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Адже все-таки робили обновлялку звітів? Це знову вона: є перелік джерел, звідки забрати дані; є перелік, куди покласти; не забуваємо посигналити, коли все трапилося чи зламалося (ну це не про нас, ні).
Давайте знову пройдемося файлом і подивимося на нові незрозумілі штуки:
from commons.operators import TelegramBotSendMessage — нам ніщо не заважає робити свої оператори, ніж ми й скористалися, зробивши невелику обгортку для надсилання повідомлень до Розблокованого. (Про цей оператор ми ще поговоримо нижче);
default_args={} — даг може роздавати одні й самі аргументи всім своїм операторам;
to='{{ var.value.all_the_kings_men }}' - поле to у нас буде не захардшкіреним, а формованим динамічно за допомогою Jinja та змінної зі списком email-ів, яку я дбайливо поклав у Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS - Умова запуску оператора. У нашому випадку лист полетить босам тільки якщо всі залежності відпрацювали. успішно;
tg_bot_conn_id='tg_main' - аргументи conn_id приймають у собі ідентифікатори з'єднань, які ми створюємо в Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED — повідомлення в Telegram відлетять тільки за наявності тягаків, що впали;
task_concurrency=1 - Забороняємо одночасний запуск кількох task instances одного тяга. В іншому випадку, ми отримаємо одночасний запуск кількох VerticaOperator (Дивляться одну таблицю);
report_update >> [email, tg] - Усе VerticaOperator зійдуться у надсиланні листа та повідомлення, ось так:
Але оскільки оператори-нотифікатори мають різні умови запуску, працюватиме лише один. У Tree View все виглядає дещо менш наочно:
Скажу пару слів про макросах та їхніх друзів змінних.
Макроси - це Jinja-плейсхолдери, які можуть підставляти різну корисну інформацію аргументи операторів. Наприклад, так:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} розгорнеться у вміст змінної контексту execution_date у форматі YYYY-MM-DD: 2020-07-14. Найприємніше, що змінні контексту прибиваються цвяхами до певного інстансу тяга (квадратику в Tree View), і при перезапуску плейсхолдери розкриються в ті самі значення.
Призначені значення можна дивитися за допомогою кнопки Rendered на кожному таск-інстансі. Ось так біля тяга з відправкою листа:
А так біля таски з відправкою повідомлення:
Повний список вбудованих макросів для останньої доступної версії доступний тут: Macros Reference
Більше того, за допомогою плагінів ми можемо оголошувати власні макроси, але це вже зовсім інша історія.
Крім зумовлених штук, ми можемо підставляти значення своїх змінних (вище в коді я вже скористався цим). Створимо в Admin/Variables пару штук:
просто використовуємо шлях до потрібного ключа: {{ var.json.bot_config.bot.token }}.
Скажу буквально одне слово і покажу один скріншот про з'єднання. Тут все просто: на сторінці Admin/Connections створюємо з'єднання, складаємо туди наші логіни/паролі та більш специфічні параметри. Ось так:
Паролі можна шифрувати (ретельніше, ніж у варіанті за умовчанням), а можна не вказувати тип з'єднання (як я зробив для tg_main) - Справа в тому, що список типів зашитий в моделях Airflow і розширенню без влазіння в вихідні джерела не піддається (якщо раптом я чогось не догуглил - прошу мене поправити), але отримати креди просто на ім'я нам ніщо не завадить.
А ще можна зробити кілька з'єднань з одним ім'ям: у такому разі метод BaseHook.get_connection(), який дістає нам з'єднання на ім'я, буде віддавати випадкового з кількох тезок (було б логічніше зробити Round Robin, але залишимо це на совісті розробників Airflow).
Variables і Connections, безумовно, класні засоби, але важливо не втратити баланс: які частини ваших потоків ви зберігаєте власне в коді, а які віддаєте на зберігання Airflow. З одного боку швидко змінити значення, наприклад, ящик розсилки, може бути зручно через UI. А з іншого — це все-таки повернення до мишеклика, якого ми (я) хотіли позбутися.
Робота зі з'єднаннями – це одне із завдань хуків. Взагалі хуки Airflow - це точки підключення його до сторонніх сервісів та бібліотек. Наприклад, JiraHook відкриє нам клієнт взаємодії з Jira (можна завдання подвигать туди-сюди), а з допомогою SambaHook можна запустити локальний файл на smb-Точку.
Розбираємо кастомний оператор
І ми впритул підібралися до того, щоб подивитися на те, як зроблено TelegramBotSendMessage
Код commons/operators.py з власне оператором:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Тут, як і решта Airflow, все дуже просто:
Відспадкували від BaseOperator, який реалізує досить багато Airflow-специфічних штук (подивіться на дозвіллі)
Оголосили поля template_fields, в яких Jinja шукатиме макроси для обробки.
Організували правильні аргументи для __init__(), Розставили замовчування, де треба.
Про ініціалізацію предка теж не забули.
Відкрили відповідний хук TelegramBotHookотримали від нього об'єкт-клієнт.
Оверрайднули (перевизначили) метод BaseOperator.execute(), який Airfow буде пересмикувати, коли настане час запускати оператор - в ньому ми і реалізуємо основну дію, забувши залогуватися. (Лоґуємося, до речі, прямо в stdout и stderr - Airflow все перехопить, красиво оберне, розкладе, куди треба.)
Давайте дивитися, що в нас у commons/hooks.py. Перша частина файлика, із самим хуком:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Я навіть не знаю, що тут можна пояснювати, просто зазначу важливі моменти:
Наслідуємо, думаємо над аргументами — у більшості випадків він буде один: conn_id;
Перевизначаємо стандартні методи: я обмежився get_conn(), в якому я отримую параметри з'єднання на ім'я і всього лише дістаю секцію extra (це поле для JSON), в яку я (за своєю ж інструкцією!) поклав токен Telegram-бота: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Створю екземпляр нашого TelegramBot, віддаючи йому конкретний токен.
От і все. Отримати клієнт з хука можна за допомогою TelegramBotHook().clent або TelegramBotHook().get_conn().
І друга частина файлика, в якому я зробити мікрообгортку для Telegram REST API, щоб не тягнути той же python-telegram-bot заради одного методу sendMessage.
Правильний шлях - скласти все це: TelegramBotSendMessage, TelegramBotHook, TelegramBot - У плагін, покласти в загальнодоступний репозиторій, і віддати в Open Source.
Поки ми все це вивчали, наші оновлення звітів встигли успішно завалитися і надіслати мені канал повідомлення про помилку. Піду перевіряти, що знову не так…
У нашому дазі щось зламалося! А чи не на це ми чекали? Саме!
Наливати будеш?
Чи відчуваєте, що щось я пропустив? Як би обіцяв дані з SQL Server в Vertica переливати, і тут взяв і з'їхав з теми, негідник!
Злочинство це було навмисним, я просто повинен був розшифрувати вам деяку термінологію. Тепер можна їхати далі.
План у нас був такий:
Зробити даг
Нагенерувати таски
Подивитися, як все красиво
Надавати заливкам номери сесій
Забрати дані з SQL Server
Покласти дані у Vertica
Зібрати статистику
Отже, щоби все це запустити, я зробив маленьке доповнення до нашого docker-compose.yml:
наповнюємо бази в останніх деякими даними (у жодному разі не заглядайте в mssql_init.py!)
Запускаємо все добро за допомогою трохи складнішої, ніж минулого разу, команди:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Що нагенерував наш чудорандомайзер, можна, скориставшись пунктом Data Profiling/Ad Hoc Query:
Головне, не показувати це аналітикам
Детально зупинятися на ETL-сесіях я не буду, там все тривіально: робимо базу, в ній табличку, обертаємо все менеджером контексту, і тепер робимо так:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
настала пора забрати наші дані з наших півтори сотні таблиць. Зробимо це за допомогою дуже невигадливих рядків:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
За допомогою хука отримаємо з Airflow pymssql-коннект
У запит підставимо обмеження у вигляді дати - у функцію її підкине шаблонизатор.
Згодовуємо наш запит pandas, який дістане для нас DataFrame - Він нам знадобиться надалі.
Я використовую підстановку {dt} замість параметра запиту %s не тому, що я злий Буратіно, а тому що pandas не може впоратися з pymssql і підсовує останньому params: Listхоча той дуже хоче tuple.
Також зверніть увагу, що розробник pymssql вирішив більше його не підтримувати, і саме час з'їхати на pyodbc.
Подивимося, чим Airflow нашпигував аргументи наших функцій:
Якщо даних не виявилося, то немає сенсу продовжувати. Але вважати заливання успішним теж дивно. Але це не помилка. А-а-а, що робити? А ось що:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException скаже Airflow, що помилки, власне, немає, а таск ми пропускаємо. В інтерфейсі буде не зелений і червоний квадратик, а кольору pink.
— Ну ось, — сказала мишеня, — чи не так, тепер
Ти переконався, що в лісі я найстрашніший звір?
Джулія Дональдсон, «Груффало»
Думаю, якби ми з моїми колегами влаштували змагання: хто швидше складе і запустить з нуля ETL-процес: вони зі своїми SSIS і мишкою і я з Airflow... А потім ми ще порівняли зручність супроводу... Ух, думаю, ви погодитеся, що я обійду їх по всіх напрямках!
Якщо ж трохи серйозніше, то Apache Airflow - за рахунок опису процесів у вигляді програмного коду - зробив мою роботу набагато зручніше та приємніше.
Його ж необмежена розширюваність: як у плані плагінів, так і схильність до масштабованості - дає вам можливість застосовувати Airflow практично в будь-якій галузі: хоч у повному циклі збору, підготовки та обробки даних, хоч у запуску ракет (на Марс, звичайно ж).
Частина заключна, довідково-інформаційна
Граблі, які ми зібрали за вас
start_date. Так, це вже локальний мемасик. Через головний аргумент дага start_date проходять усі. Коротко, якщо вказати в start_date поточну дату, а в schedule_interval - Один день, то DAG запуститься завтра не раніше.
start_date = datetime(2020, 7, 7, 0, 1, 2)
І більше жодних проблем.
З ним же пов'язана ще одна помилка виконання: Task is missing the start_date parameter, Що найчастіше говорить про те, що ви забули прив'язати до оператора даг.
Все на одній машині. Так, і бази (самого Airflow та нашої обмазки), і веб-сервер, і планувальник, і воркери. І вона навіть працювала. Але згодом кількість завдань у сервісів зростала, і коли PostgreSQL став віддавати відповідь за індексом за 20 с замість 5 мс, ми її взяли та забрали.
LocalExecutor. Так, ми сидимо на ньому досі, і ми вже підійшли до краю прірви. LocalExecutor'а нам досі вистачало, але зараз настав час розширитися мінімум одним воркером, і доведеться піднапрягтися, щоб переїхати на CeleryExecutor. А через те, що з ним можна працювати і на одній машині, то нічого не зупиняє від використання Celery навіть на сервері, який «природно, ніколи не піде в прод, чесслово!»
Невикористання вбудованих засобів:
Зв'язки для зберігання облікових даних сервісів,
SLA Misses для реагування на таски, які не відпрацювали вчасно,
XCom для обміну метаданими (я сказав метаданими!) між тягами дага.
Зловживання поштою. Ну, що тут сказати? Були налаштовані оповіщення на всі повтори тяганів. Тепер у моєму робочому Gmail >90k листів від Airflow, і веб-морда пошти відмовляється брати та видаляти більше ніж по 100 штук за раз.
Для того, щоб нам ще більше працювати головою, а не руками, Airflow заготовила для нас ось що:
REST API - Він досі має статус Experimental, що не заважає йому працювати. З його допомогою можна не тільки отримувати інформацію про даги та тяги, але зупинити/запустити даг, створити DAG Run або пул.
CLI — через командний рядок доступні багато засобів, які не просто незручні у користуванні через WebUI, а взагалі відсутні. Наприклад:
backfill необхідний повторного запуску інстансів тяган.
Наприклад, прийшли аналітики, кажуть: «А у вас, товаришу, нісенітниця в даних із 1 по 13 січня! Чині-чіні-чіні-чіні!». А ти такий хоба:
Обслуговування бази: initdb, resetdb, upgradedb, checkdb.
run, який дозволяє запустити один інстанс таска, та ще й забити на все залежності. Більше того, можна запустити його через LocalExecutorнавіть якщо у вас Celery-кластер.
Приблизно те саме робить test, Тільки ще і в базах нічого не пише.
connections дозволяє масово створювати підключення із шеллу.
API Python - Досить хардкорний спосіб взаємодії, який призначений для плагінів, а не копошення в ньому ручками. Але хто ж нам завадить піти у /home/airflow/dags, запустити ipython і почати свавілля? Можна, наприклад, експортувати всі підключення таким кодом:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Підключення до бази метаданих Airflow. Писати в неї я не рекомендую, а от діставати стани тяган для різних специфічних метрик можна значно швидше і простіше, ніж через будь-який з API.
Скажімо, далеко не всі наші таски ідемпотентні, а іноді можуть падати і це нормально. Але кілька завалів — це вже підозріло, і треба було б перевірити.
Обережно, SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
Посилання
Ну і звичайно перші десять посилань з видачі гугла вміст папки Airflow з моїх закладок.