Прывітанне, я Зміцер Лагвіненка – Data Engineer аддзела аналітыкі групы кампаній «Вязе».
Я раскажу вам пра выдатную прыладу для распрацоўкі ETL-працэсаў — Apache Airflow. Але Airflow настолькі ўніверсальны і шматгранны, што вам варта дагледзецца да яго нават калі вы не займаецеся струменямі дадзеных, а маеце запатрабаванне перыядычна запускаць якія-небудзь працэсы і сачыць за іх выкананнем.
І так, я буду не толькі расказваць, але і паказваць: у праграме шмат кода, скрыншотаў і рэкамендацый.
Што звычайна бачыш, калі гугліш слова Airflow / Wikimedia Commons
— толькі лепш, ды і зроблены зусім для іншых мэт, а менавіта (як напісана да ката):
запуск і маніторынг задач на неабмежаванай колькасці машын (колькі вам дазволіць Celery/Kubernetes і ваша сумленне)
з дынамічнай генерацыяй workflow з вельмі лёгкага для напісання і ўспрыманні Python-кода
і магчымасцю звязваць сябар з сябра любыя базы дадзеных і API з дапамогай як гатовых кампанентаў, так і самаробных плагінаў (што робіцца надзвычай проста).
Мы выкарыстоўваем Apache Airflow так:
збіраем дадзеныя з розных крыніц (мноства інстансаў SQL Server і PostgreSQL, розныя API з метрыкамі прыкладанняў, нават 1С) у DWH і ODS (у нас гэта Vertica і Clickhouse).
як прасунуты cron, які запускае працэсы кансалідацыі дадзеных на ODS, а таксама сочыць за іх абслугоўваннем.
Да нядаўняга часу нашы запатрабаванні пакрываў адзін невялікі сервер на 32 ядрах і 50 GB аператыўкі. У Airflow пры гэтым працуе:
больш 200 дагоў (Уласна workflows, у якія мы набілі задачы),
у кожным у сярэднім па 70 цягак,
запускаецца гэта дабро (таксама ў сярэднім) раз у гадзіну.
А пра тое, як мы пашыраліся, я напішу ніжэй, а зараз давайце вызначым über-задачу, якую мы будзем вырашаць:
Ёсць тры зыходных SQL Server'а, на кожным па 50 баз дадзеных - інстансаў аднаго праекта, адпаведна, структура ў іх аднолькавая (амаль усюды, муа-ха-ха), а значыць у кожнай ёсць табліца Orders (балазе табліцу з такой назвай можна заштурхаць у любы бізнэс). Мы забіраем дадзеныя, дадаючы службовыя палі (сервер-крыніца, база-крыніца, ідэнтыфікатар ETL-задачы) і наіўнай выявай кінем іх у, скажам, Vertica.
Паехалі!
Частка асноўная, практычная (і крыху тэарэтычная)
Навошта яно нам (і вам)
Калі дрэвы былі вялікімі, а я быў простым SQL-шчыкам у адным расійскім рытэйле, мы шпарылі ETL-працэсы aka струмені дадзеных з дапамогай двух даступных нам сродкаў:
Informatica Power Center - вельмі раскідзістая сістэма, надзвычай прадукцыйная, са сваімі жалязякамі, уласным версіяваннем. Выкарыстоўваў я дай бог 1% яе магчымасцяў. Чаму? Ну, па-першае, гэты інтэрфейс недзе з нулявых псіхічна ціснуў на нас. Па-другое, гэтая штуковіна заменчаная пад надзвычай наварочаныя працэсы, лютае перавыкарыстанне кампанентаў і іншыя вельмі-важныя-энтэрпрайз-фішачкі. Пра тое што варта яна, як крыло Airbus A380/год, мы прамаўчым.
Асцярожна, скрыншот можа зрабіць людзям малодшай 30 крыху балюча
SQL Server Integration Server — гэтым таварышам мы карысталіся ў сваіх унутрыпраектных патоках. Ну а на самой справе: SQL Server мы ўжо выкарыстоўваем, і не карыстацца яго ETL-тулзы было б неяк неразумна. Усё ў ім добра: і інтэрфейс прыгожы, і справаздачы выканання… Але не за гэта мы любім праграмныя прадукты, ох не за гэта. Версіянаваць яго dtsx (які ўяўляе сабой XML з якія змешваюцца пры захаванні нодамі) мы можам, а толку? А зрабіць пакет цягоў, які перацягне сотню табліц з аднаго сервера на іншы? Ды што сотню, у вас ад дваццаці штук адваліцца паказальны палец, які пстрыкае па мышынай кнопцы. Але выглядае ён, вызначана, больш модна:
Мы безумоўна шукалі выхады. Справа нават амаль дайшло да самапіснага генератара SSIS-пакетаў…
… а потым мяне знайшла новая праца. А на ёй мяне нагнаў Apache Airflow.
Калі я даведаўся, што апісанні ETL-працэсаў - гэта просты Python-код, я толькі што не скакаў ад радасці. Вось так плыні дадзеных падвергліся версіянаванню і дыфу, а ссыпаць табліцы з адзінай структурай з сотні баз дадзеных у адзін таргет стала справай Python-кода ў паўтара-два 13” экрана.
Збіраны кластар
Давайце не ўладкоўваць зусім ужо дзіцячы сад, і не казаць тут пра зусім відавочныя рэчы, накшталт усталёўкі Airflow, абранай вамі БД, Celery і іншых спраў, апісаных у доках.
Каб мы маглі адразу прыступіць да эксперыментаў, я накідаў docker-compose.yml у якім:
Падымем уласна Паветраны паток: Scheduler, Webserver. Там жа будзе круціцца Flower для маніторынгу Celery-задач (таму што яго ўжо заштурхалі ў apache/airflow:1.10.10-python3.7, а мы і не супраць);
PostgreSQL, у які Airflow будзе пісаць сваю службовую інфармацыю (дадзеныя планавальніка, статыстыка выканання і т. д.), а Celery – адзначаць завершаныя цягі;
Redis, які будзе выступаць брокерам задач для Celery;
Celery worker, які і зоймецца непасрэдным выкананнем задачак.
У тэчку ./dags мы будзем складаць нашы файлы з апісаннем дагоў. Яны будуць падхапляцца на лета, таму ператоргваць увесь стэк пасля кожнага чыха не трэба.
Дзе-нідзе код у прыкладах прыведзены не цалкам (каб не загрувашчваць тэкст), а дзесьці ён мадыфікуецца ў працэсе. Суцэльныя працавальныя прыклады кода можна паглядзець у рэпазітары https://github.com/dm-logv/airflow-tutorial.
У зборцы кампозу я шмат у чым абапіраўся на вядомую выяву puckel/docker-airflow - абавязкова паглядзіце. Можа вам у жыцці больш нічога і не спатрэбіцца.
Усе наладкі Airflow даступныя не толькі праз airflow.cfg, Але і праз зменныя асяроддзі (слава распрацоўнікам), чым я злосна скарыстаўся.
Натуральна, ён не production-ready: я наўмысна не ставіў heartbeats на кантэйнеры, не затлумляўся з бяспекай. Але мінімум, прыдатны для нашых эксперыментыкаў я зрабіў.
Звярніце ўвагу, што:
Тэчка з дагамі павінна быць даступная як планавальніку, так і воркерам.
Тое ж самае тычыцца і ўсіх іншых бібліятэк - яны ўсе павінны быць устаноўлены на машыны з шэдулерам і воркерамі.
Ну а зараз проста:
$ docker-compose up --scale worker=3
Пасля таго, як усё паднімецца, можна глядзець на вэб-інтэрфейсы:
Калі вы нічога не зразумелі ва ўсіх гэтых "дагах", то вось кароткі слоўнічак:
Планавальнік – самы галоўны дзядзька ў Airflow, які кантралюе, каб укалывалі робаты, а не чалавек: сочыць за раскладам, абнаўляе дагі, запускае цягі.
Наогул, у старых версіях, у яго былі праблемы з памяццю (не, не амнезія, а ўцечкі) і ў канфігах нават застаўся легасі-параметр run_duration - Інтэрвал яго перазапуску. Але зараз усё добра.
ДЗЕНЬ (ён жа "даг") - "накіраваны ацыклічны граф", але такое вызначэнне мала каму што скажа, а па сутнасці гэта кантэйнер для ўзаемадзейнічаюць адзін з адным цягачаў (гл. ніжэй) або аналаг Package ў SSIS і Workflow ў Informatica.
Апроч дагаў яшчэ могуць быць сабдагі, але мы да іх хутчэй за ўсё не дабяромся.
DAG Run - ініцыялізаваны даг, якому прысвоены свой execution_date. Даграны аднаго дага могуць суцэль працаваць раўналежна (калі вы, вядома, зрабілі свае цягі ідэмпатэнтнымі).
аператар - Гэта кавалачкі кода, адказныя за выкананне якога-небудзь канкрэтнага дзеяння. Ёсць тры тыпу аператараў:
дзеянне, як напрыклад наш каханы PythonOperator, які ў сілах выканаць любы (валідны) Python-код;
пераклад, якія перавозяць дадзеныя з месца на месца, скажам, MsSqlToHiveTransfer;
датчык ж дазволіць рэагаваць або прытармазіць далейшае выкананне дага да наступлення якой-небудзь падзеі. HttpSensor можа тузаць паказаны эндпойнт, і калі дачакаецца патрэбны адказ, запусціць трансфер GoogleCloudStorageToS3Operator. Дапытлівы розум спытае: «навошта? Бо можна рабіць паўторы прама ў аператары!» А затым, каб не забіваць пул цягоў падвіслым аператарамі. Сэнсар запускаецца, правярае і памірае да наступнай спробы.
Задача - Абвешчаныя аператары па-за залежнасці ад тыпу і прымацаваныя да дагу павышаюцца да чыну цяга.
Task instance - калі генерал-планавальнік вырашыў, што цягі сітавіна адпраўляць у бой на выканаўцы-воркеры (прама на месцы, калі мы выкарыстаем LocalExecutor ці на выдаленую ноду ў выпадку з CeleryExecutor), ён прызначае ім кантэкст (т. е. камплект зменных - параметраў выканання), разгортвае шаблоны каманд або запытаў і складае іх у пул.
Генеруем цягі
Перш абазначым агульную схему нашага дага, а затым будзем усё больш і больш апускацца ў дэталі, таму што мы ўжываем некаторыя нетрывіяльныя рашэнні.
Такім чынам, у найпростым выглядзе падобны даг будзе выглядаць так:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Давайце разбірацца:
Спачатку імпартуем патрэбныя лісы і сёе-тое яшчэ;
sql_server_ds - Гэта List[namedtuple[str, str]] з імёнамі канэктаў з Airflow Connections і базамі дадзеных з якіх мы будзем забіраць нашу таблічку;
dag - Аб'ява нашага дага, якое абавязкова павінна ляжаць у globals(), інакш Airflow яго не знойдзе. Дагу таксама трэба сказаць:
што яго клічуць orders – гэтае імя потым будзе маячыць у вэб-інтэрфейсе,
што працаваць ён будзе, пачынаючы з паўночы восьмага ліпеня,
а запускаць ён павінен, прыкладна кожныя 6 гадзін (для крутых хлопцаў тут замест timedelta() дапушчальная cron-радок 0 0 0/6 ? * * *, для менш крутых - выраз накшталт @daily);
workflow() будзе рабіць асноўную працу, але не зараз. Цяпер мы проста высыпем наш кантэкст у лог.
А зараз простая магія стварэння цягал:
прабягаем па нашых крыніцах;
ініцыялізуем PythonOperator, які будзе выконваць нашу пустышку workflow(). Не забывайце ўказваць унікальнае (у рамках дага) імя цяга і падвязваць сам даг. Сцяг provide_context у сваю чаргу насыпець у функцыю дадатковых аргументаў, якія мы беражліва збяром з дапамогай **context.
Пакуль на гэтым усё. Што мы атрымалі:
новы даг у вэб-інтэрфейсе,
паўтары сотні цягінаў, якія будуць выконвацца раўналежна (калі то дазволяць налады Airflow, Celery і магутнасці сервераў).
Ну амаль атрымалі.
Залежнасці хто будзе ставіць?
Каб усю гэтую справу спрасціць я ўкарачыў у docker-compose.yml апрацоўку requirements.txt на ўсіх нодах.
Зялёныя, зразумелая справа, - паспяхова адпрацавалі. Чырвоныя - не вельмі паспяхова.
Дарэчы, на нашым продзе ніякай тэчкі ./dags, якая сінхранізуецца паміж машынамі няма - усе дагі ляжаць у git на нашым Gitlab, а Gitlab CI раскладвае абнаўлення на машыны пры мэрджы ў master.
Трохі аб Flower
Пакуль воркеры малоцяць нашы тасачкі-пустышкі, успомнім пра яшчэ адзін інструмент, які можа нам сёе-тое паказаць – Flower.
Самая першая старонка з сумарнай інфармацыяй па нодам-воркерам:
Самая насычаная старонка з задачамі, якія адправіліся ў працу:
Самая сумная старонка са станам нашага брокера:
Самая яркая старонка - з графікамі стану цягліц і іх часам выканання:
Дагружаем недагружанае
Такім чынам, усе цягі адпрацавалі, можна выносіць параненых.
А параненых аказалася нямала — па тых ці іншых прычынах. У выпадку правільнага выкарыстання Airflow вось гэтыя самыя квадраты кажуць аб тым, што дадзеныя дакладна не даехалі.
Трэба глядзець лог і перазапускаць падалі task instances.
Кінуўшы на любы квадрат, убачым даступныя нам дзеянні:
Можна ўзяць, і зрабіць Clear які ўпаў. Гэта значыць, мы забываемся пра тое, што там нешта завалілася, і той жа самы інстанс цяга сыдзе планавальніку.
Зразумела, што рабіць так мышкай з усімі чырвонымі квадратамі не вельмі гуманна не гэтага мы чакаем ад Airflow. Натуральна, у нас ёсць зброя масавай паразы: Browse/Task Instances
Выберам усё зараз і абнулім націснем правільны пункт:
Пасля ачысткі нашы таксі выглядаюць так (яны ўжо чакаюць не дачакаюцца, калі шэдулер іх заплануе):
Злучэнні, хукі і іншыя зменныя
Самы час паглядзець на наступны DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Усё ж калі-небудзь рабілі абнаўлялку справаздач? Гэта зноў яна: ёсць спіс крыніц, адкуль забраць дадзеныя; ёсць спіс, куды пакласці; не забываем пасігналіць, калі ўсё здарылася ці зламалася (ну гэта не пра нас, не).
Давайце зноў пройдземся па файле і паглядзім на новыя незразумелыя штукі:
from commons.operators import TelegramBotSendMessage - нам нішто не перашкаджае рабіць свае аператары, чым мы і скарысталіся, зрабіўшы невялікую абгортачку для адпраўкі паведамленняў у Разблакаваны. (Пра гэтага аператара мы яшчэ пагаворым ніжэй);
default_args={} - Даг можа раздаваць адны і тыя ж аргументы ўсім сваім аператарам;
to='{{ var.value.all_the_kings_men }}' - поле to у нас будзе не захардскураным, а фармаваным дынамічна з дапамогай Jinja і зменнай са спісам email-ов, якую я клапатліва паклаў у Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS - Умова запуску аператара. У нашым выпадку, ліст паляціць босам толькі калі ўсе залежнасці адпрацавалі. паспяхова;
tg_bot_conn_id='tg_main' - аргументы conn_id прымаюць у сябе ідэнтыфікатары злучэнняў, якія мы ствараем у Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - паведамленні ў Telegram паляцяць толькі пры наяўнасці ўпалых цягліц;
task_concurrency=1 - забараняем адначасовы запуск некалькіх task instances аднаго цяга. У адваротным выпадку, мы атрымаем адначасовы запуск некалькіх VerticaOperator (якія глядзяць на адну табліцу);
report_update >> [email, tg] - усё VerticaOperator сыдуцца ў адпраўцы ліста і паведамленні, вось так:
Але паколькі ў аператараў-натыфікатараў стаяць розныя ўмовы запуску, працаваць будзе толькі адзін. У Tree View усё выглядае некалькі меней навочна:
Скажу пару слоў аб макрасах і іх сябрах зменных.
Макрасы – гэта Jinja-плейсхолдэры, якія могуць падстаўляць розную карысную інфармацыю ў аргументы аператараў. Напрыклад, так:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} разгорнецца ў змесціва зменнай кантэксту execution_date у фармаце YYYY-MM-DD: 2020-07-14. Самае прыемнае, што зменныя кантэксты прыбіваюцца цвікамі да вызначанага інстансу цяга (квадратыку ў Tree View), і пры перазапуску плейсхолдэры расчыняцца ў тыя ж самыя значэнні.
Прысвоеныя значэнні можна глядзець з дапамогай кнопкі Rendered на кожным таск-інстансе. Вось так у цяга з адпраўкай ліста:
А так у цягі з адпраўкай паведамлення:
Поўны спіс убудаваных макрасаў для апошняй даступнай версіі даступны тут: Macros Reference
Больш за тое, з дапамогай плагінаў, мы можам аб'яўляць уласныя макрасы, але гэта ўжо зусім іншая гісторыя.
Апроч наканаваных штук, мы можам падстаўляць значэнні сваіх зменных (вышэй у кодзе я ўжо гэтым скарыстаўся). Створым у Admin/Variables пару штук:
проста выкарыстоўваем шлях да патрэбнага ключа: {{ var.json.bot_config.bot.token }}.
Скажу літаральна адно слова і пакажу адзін скрыншот пра злучэння. Тут усё элементарна: на старонцы Admin/Connections ствараем злучэнне, складаем туды нашы лагіны/паролі і больш спецыфічныя параметры. Вось так:
Паролі можна шыфраваць (больш старанна, чым у варыянце па змаўчанні), а можна не паказваць тып злучэння (як я зрабіў для tg_main) - справа ў тым, што спіс тыпаў зашыты ў мадэлях Airflow і пашырэнню без залажэння ў зыходнікі не паддаецца (калі раптам я чагосьці не дагугліў - прашу мяне паправіць), але атрымаць крэды проста па імені нам нішто не перашкодзіць.
А яшчэ можна зрабіць некалькі злучэнняў з адным імем: у такім разе метад BaseHook.get_connection(), які дастае нам злучэння па імені, будзе аддаваць выпадковага з некалькіх цёзак (было б лагічна зрабіць Round Robin, але пакінем гэта на сумленні распрацоўнікаў Airflow).
Variables і Connections, безумоўна, класныя сродкі, але важна не страціць баланс: якія часткі вашых патокаў вы захоўваеце ўласна ў кодзе, а якія - аддаяце на захоўванне Airflow. З аднаго боку хутка памяняць значэнне, напрыклад, скрыню рассылання, можа быць зручна праз UI. А з другога - гэта ўсё-ткі зварот да мышакліку, ад якога мы (я) хацелі пазбавіцца.
Праца са злучэннямі - гэта адна з задач хукаў. Наогул хукі Airflow - гэта кропкі падлучэння яго да іншых сэрвісаў і бібліятэкам. Напрыклад, JiraHook адкрые для нас кліент для ўзаемадзеяння з Jira (можна задачкі падштурхоўваць туды-сюды), а з дапамогай SambaHook можна запушыць лакальны файл на smb-кропку.
Разбіраны кастамны аператар
І мы ўшчыльную падабраліся да таго, каб паглядзець на тое, як зроблены TelegramBotSendMessage
Код commons/operators.py з уласна аператарам:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Тут, як і астатняе ў Airflow, усё вельмі проста:
Адспадкаваліся ад BaseOperator, Які рэалізуе даволі шмат Airflow-спецыфічных штук (паглядзіце на вольным часе)
Абвясцілі палі template_fields, у якіх Jinja будзе шукаць макрасы для апрацоўкі.
Арганізавалі правільныя аргументы для __init__(), расставілі змаўчанні, дзе трэба.
Аб ініцыялізацыі продка таксама не забыліся.
Адкрылі адпаведны хук TelegramBotHook, атрымалі ад яго аб'ект-кліент
Аверрайднулі (перавызначылі) метад BaseOperator.execute(), які Airfow будзе паторгваць, калі наступіць час запускаць аператар – у ім мы і рэалізуем асноўнае дзеянне, на забыўшыся залагавацца. (Лагуем, дарэчы, прама ў stdout и stderr - Airflow усё перахопіць, хораша абгарне, раскладзе, куды трэба.)
Давайце глядзець, што ў нас у commons/hooks.py. Першая частка файліка, з самім хукам:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Я нават не ведаю, што тут можна тлумачыць, проста адзначу важныя моманты:
Успадкоўваемся, думаем над аргументамі - у большасці выпадкаў ён будзе адзін: conn_id;
Перавызначаем стандартныя метады: я абмежаваўся get_conn(), у якім я атрымліваю параметры злучэння па імені і ўсяго толькі дастаю секцыю extra (гэтае поле для JSON), у якую я (па сваёй жа інструкцыі!) паклаў токен Telegram-бота: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Ствараю асобнік нашага TelegramBot, аддаючы яму ўжо канкрэтны токен.
Вось і ўсё. Атрымаць кліент з хука можна з дапамогай TelegramBotHook().clent або TelegramBotHook().get_conn().
І другая частка файліка, у якім я зрабіць мікраабгортачку для Telegram REST API, каб не цягнуць той жа python-telegram-bot дзеля аднаго метаду sendMessage.
Правільны шлях - скласці ўсё гэта: TelegramBotSendMessage, TelegramBotHook, TelegramBot - у плягін, пакласці ў агульнадаступны рэпазітар, і аддаць у Open Source.
Пакуль мы ўсё гэта вывучалі, нашы абнаўленні справаздач паспелі паспяхова заваліцца і адправіць мне ў канал паведамленне аб памылцы. Пайду правяраць, што зноў не так…
У нашым дазе нешта зламалася! А ці не гэтага мы чакалі? Менавіта!
Наліваць-то будзеш?
Адчуваеце, нешта я прапусціў? Як бы абяцаў дадзеныя з SQL Server у Vertica пераліваць, і тут узяў і з'ехаў з тэмы, нягоднік!
Злачынства гэта было наўмысным, я проста абавязаны быў расшыфраваць вам сякую-такую тэрміналогію. Цяпер можна ехаць далей.
План у нас быў такі:
Зрабіць даг
Нагенераваць цягі
Паглядзець, як усё прыгожа
Прысвойваць заліўкам нумара сесій
Забраць дадзеныя з SQL Server
Пакласці дадзеныя ў Vertica
Сабраць статыстыку
Такім чынам, каб усё гэта запусціць, я зрабіў маленькі дадатак да нашага docker-compose.yml:
напаўняем базы ў апошніх некаторымі дадзенымі (ні ў якім разе не зазірайце ў mssql_init.py!)
Запускаем усё дабро з дапамогай крыху больш складанай, чым у мінулы раз, каманды:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Што нагенераваў наш цударандамайзер, можна, скарыстаўшыся пунктам Data Profiling/Ad Hoc Query:
Галоўнае, не паказваць гэта аналітыкам
Падрабязна спыняцца на ETL-сесіях я не буду, там усё трывіяльна: які робіцца базу, у ёй таблічку, абарачаем усё мэнэджарам кантэксту, і зараз які робіцца так:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Надышоў час забраць нашы дадзеныя з нашых паўтары сотняў табліц. Зробім гэта з дапамогай вельмі немудрагелістых радкоў:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
З дапамогай хука атрымаем з Airflow pymssql-канэкт
У запыт падставім абмежаванне ў выглядзе даты - у функцыю яе падкіне шаблонизатор.
Скормліваем наш запыт pandas, які дастане для нас DataFrame - Ён нам спатрэбіцца ў далейшым.
Я выкарыстоўваю падстаноўку {dt} замест параметра запыту %s не таму, што я злосны Бураціна, а таму што pandas не можа справіцца з pymssql і падсоўвае апошняму params: List, хоць той вельмі хоча tuple.
Таксама звярніце ўвагу, што распрацоўшчык pymssql вырашыў больш яго не падтрымліваць, і самы час з'ехаць на pyodbc.
Паглядзім, чым Airflow нашпігаваў аргументы нашых функцый:
Калі дадзеных не аказалася, то працягваць сэнсу няма. Але лічыць заліванне паспяховай таксама дзіўна. Але гэта і не памылка. А-а-а, што рабіць?! А вось што:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException скажа Airflow, што памылкі, уласна няма, а таск мы прапускаем. У інтэрфейсе будзе не зялёны і не чырвоны квадрацік, а колеры pink.
- Ну вось, - сказала мышаня, - ці не праўда, цяпер
Ты пераканаўся, што ў лесе я самы страшны звер?
Джулія Дональдсан, «Груфала»
Думаю, калі б мы з маімі калегамі зладзілі спаборніцтва: хто хутчэй складзе і запусціць з нуля ETL-працэс: яны са сваімі SSIS і мышкай і я з Airflow… А потым бы мы яшчэ параўналі выгоду суправаджэння… Ух, думаю, вы пагадзіцеся, што я абыду іх па ўсіх франтах!
Калі ж крыху больш сур'ёзны, то Apache Airflow – за кошт апісання працэсаў у выглядзе праграмнага кода – зрабіў маю працу значна зручней і прыемней.
Яго ж неабмежаваная пашыральнасць: як у плане плагінаў, так і схільнасць да маштабаванасці – дае вам магчымасць ужываць Airflow практычна ў любой вобласці: хоць у поўным цыкле збору, падрыхтоўкі і апрацоўкі дадзеных, хоць у запуску ракет (на Марс, вядома ж).
Частка заключная, даведачна-інфармацыйная
Граблі, якія мы сабралі за вас
start_date. Так, гэта ўжо лакальны мемасік. Праз галоўны аргумент дага start_date праходзяць усе. Коратка, калі паказаць у start_date бягучую дату, а ў schedule_interval - Адзін дзень, то DAG запусціцца заўтра не раней.
start_date = datetime(2020, 7, 7, 0, 1, 2)
І больш ніякіх праблем.
З ім жа звязана і яшчэ адна памылка выканання: Task is missing the start_date parameter, Якая часцей за ўсё кажа аб тым, што вы забыліся прывязаць да аператара даг.
Усё на адной машыне. Так, і базы (самога Airflow і нашай абмазкі), і вэб-сервер, і планавальнік, і воркеры. І яно нават працавала. Але з часам колькасць задач у сэрвісаў расла, і калі PostgreSQL стаў аддаваць адказ па азначніку за 20 з замест 5 мс, мы яго ўзялі і панеслі.
LocalExecutor. Так, мы сядзім на ім да гэтага часу, і мы ўжо падышлі да краю прорвы. LocalExecutor'а нам да гэтага часу хапала, але цяпер прыйшла пара пашырыцца мінімум адным воркерам, і прыйдзецца паднапружыцца, каб пераехаць на CeleryExecutor. А з прычыны таго, што з ім можна працаваць і на адной машынай, то нічога не спыняе ад выкарыстання Celery нават не серверы, які "натуральна, ніколі не пайдзе ў прод, чеслово!"
Невыкарыстанне убудаваных сродкаў:
Сувязі для захоўвання уліковых дадзеных сэрвісаў,
SLA Misses для рэагавання на цягі, якія не адпрацавалі своечасова,
XCom для абмену метададзенымі (я сказаў мэтададзенымі!) паміж цягамі дага.
Злоўжыванне поштай. Ну што тут сказаць? Былі настроены абвесткі на ўсе паўторы паваленых цягал. Зараз у маім працоўным Gmail >90k лістоў ад Airflow, і вэб-морда пошты адмаўляецца браць і выдаляць больш за па 100 штук за раз.
Для таго каб нам яшчэ больш працаваць галавой, а не рукамі, Airflow нарыхтавала для нас вось што:
REST API - Ён да гэтага часу мае статус Experimental, што не перашкаджае яму працаваць. З яго дапамогай можна не толькі атрымліваць інфармацыю аб дагах і цягах, але спыніць/запусціць даг, стварыць DAG Run ці пул.
CLI - праз камандны радок даступныя многія сродкі, якія не проста нязручныя ў звароце праз WebUI, а наогул адсутнічаюць. Напрыклад:
backfill патрэбен для паўторнага запуску інстансаў цягак.
Напрыклад, дашлі аналітыкі, кажуць: «А ў вас, таварыш, глупства ў дадзеных з 1 па 13 студзеня! Чыні-чыні-чыні-чыні!». А ты такі хоба:
run, які дазваляе запусціць адзін інстанс цяга, ды яшчэ і забіць на ўсё залежнасці. Больш за тое, можна запусціць яго праз LocalExecutor, нават калі ў вас Celery-кластар.
Прыкладна тое ж самае робіць test, толькі яшчэ і ў баз нічога не піша.
connections дазваляе масава ствараць падлучэнні з шелла.
API Python - Даволі хардкорны спосаб узаемадзеяння, які прызначаны для плагінаў, а не кешкання ў ім ручанькамі. Але хто ж нам перашкодзіць пайсці ў /home/airflow/dags, запусціць ipython і пачаць бязмежжа? Можна, напрыклад, экспартаваць усе падлучэнні такім кодам:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Падключэнне да базы метададзеных Airflow. Пісаць у яе я не рэкамендую, а вось даставаць станы цягліц для розных спецыфічных метрык можна значна хутчэй і прасцей, чым праз любы з API.
Скажам, далёка не ўсе нашы цягі ідэмпатэнтныя, а могуць часам падаць і гэта нармальна. Але некалькі завалаў - гэта ўжо падазрона, і трэба было б праверыць.
Асцярожна, SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
Спасылкі
Ну і натуральна першыя дзесяць спасылак з выдачы гугла змесціва тэчкі Airflow з маіх закладак.