Здраво, јас сум Дмитриј Логвиненко - инженер за податоци на Одделот за аналитика на групата компании Везет.
Ќе ви кажам за прекрасна алатка за развој на ETL процеси - Apache Airflow. Но, Airflow е толку разновиден и повеќеслоен што треба внимателно да го погледнете дури и ако не сте вклучени во протокот на податоци, но имате потреба периодично да стартувате какви било процеси и да го следите нивното извршување.
И да, не само што ќе кажам, туку и ќе покажам: програмата има многу код, слики од екранот и препораки.

Она што обично го гледате кога ќе го пребарувате на гугл зборот Airflow / Wikimedia Commons
Содржина
Вовед
Apache Airflow е исто како Џанго:
- напишано во питон
- има одличен административен панел,
- може да се прошири на неодредено време
- само подобро, а направен е за сосема други цели, имено (како што пишува пред ката):
- работи и следење задачи на неограничен број машини (колку што ќе ви дозволат целер/Кубернети и вашата совест)
- со генерирање на динамичен работен тек од многу лесен за пишување и разбирање Python код
- и можност за поврзување на било какви бази на податоци и API-и едни со други користејќи готови компоненти и домашни приклучоци (што е исклучително едноставно).
Ние користиме Apache Airflow вака:
- собираме податоци од различни извори (многу примероци на SQL Server и PostgreSQL, различни API со метрика на апликацијата, дури и 1C) во DWH и ODS (имаме Vertica и Clickhouse).
- колку напредна
cron, кој ги започнува процесите на консолидација на податоците на ODS, а исто така го следи нивното одржување.
До неодамна, нашите потреби ги покриваше еден мал сервер со 32 јадра и 50 GB RAM. Во протокот на воздух, ова функционира:
- повеќе 200 дари (всушност работни текови, во кои ги наполнивме задачите),
- во секоја во просек 70 задачи,
- оваа добрина започнува (исто така во просек) еднаш на час.
А за тоа како се проширивме, ќе напишам подолу, но сега да го дефинираме über-проблемот што ќе го решиме:
Има три оригинални SQL сервери, секој со 50 бази на податоци - примероци од еден проект, соодветно, имаат иста структура (скоро секаде, mua-ha-ha), што значи дека секој има табела Orders (за среќа, табела со тоа името може да се втурне во секој бизнис). Ги земаме податоците со додавање полиња за услуги (изворниот сервер, изворната база на податоци, ИД на задачата ETL) и наивно ги фрламе во, да речеме, Вертика.
Ајде да одиме!
Главниот дел, практичен (и малку теоретски)
Зошто ние (и вие)
Кога дрвјата беа големи, а јас бев едноставен SQL-Шик во една руска малопродажба, ги измамивме процесите на ETL ака текови на податоци користејќи две алатки кои ни се достапни:
- Центар за напојување на информатика - екстремно распространет систем, исклучително продуктивен, со сопствен хардвер, сопствена верзија. Искористив не дај Боже 1% од неговите можности. Зошто? Па, пред се, овој интерфејс, некаде од 380-тите, ментално нè притиска. Второ, оваа алатка е дизајнирана за екстремно фенси процеси, бесна повторна употреба на компоненти и други многу важни-претпријатија-трикови. За фактот дека чини, како крилото на Ербас АXNUMX / година, нема да кажеме ништо.
Внимавајте, скриншот може малку да ги повреди луѓето под 30 години

- Сервер за интеграција на SQL Server - го користевме овој другар во нашите интра-проектни текови. Па, всушност: ние веќе користиме SQL Server, и би било некако неразумно да не ги користиме неговите ETL алатки. Сè во него е добро: и интерфејсот е убав, и извештаите за напредокот... Но, ова не е причината зошто ги сакаме софтверските производи, о, не за ова. Верзија на тоа
dtsx(што е XML со мешани јазли на зачувај) можеме, но која е поентата? Како да направите пакет со задачи што ќе влече стотици табели од еден сервер на друг? Да, колку сто, ќе ти падне показалецот од дваесет парчиња, со кликнување на копчето на глувчето. Но, дефинитивно изгледа помодерно:
Сигурно баравме излези. Дури и случај речиси дошол до генератор на пакети SSIS напишан ...
…и потоа ме најде нова работа. И Apache Airflow ме престигна на неа.
Кога дознав дека описите на процесот ETL се едноставни Python код, едноставно не танцував од радост. Вака се верзии и се разликуваа тековите на податоци, а прелевањето табели со една структура од стотици бази на податоци во една цел стана прашање на код на Пајтон во еден и пол или два екрани од 13 инчи.
Составување на кластерот
Да не организираме комплетна градинка и да не зборуваме за сосема очигледни работи овде, како инсталирање на Airflow, вашата избрана база на податоци, Celery и други случаи опишани во доковите.
За да можеме веднаш да започнеме со експерименти, скицирав docker-compose.yml во кој:
- Ајде всушност да подигнеме Струење: Распоредувач, веб-сервер. Цветот, исто така, ќе се врти таму за да ги следи задачите на Целерот (бидејќи веќе е втурнат
apache/airflow:1.10.10-python3.7, но не ни пречи) - PostgreSQL, во кој Airflow ќе ги запише своите сервисни информации (податоци за распоредувачот, статистика за извршување итн.), а Celery ќе ги означува завршените задачи;
- Redis, кој ќе делува како брокер за задачи за Целер;
- Работник со целер, кои ќе бидат ангажирани за директно извршување на задачите.
- Во папката
./dagsќе ги додадеме нашите датотеки со опис на dags. Тие ќе бидат подигнати на мува, така што нема потреба да жонглирате со целиот оџак по секое кивање.
На некои места, кодот во примерите не е целосно прикажан (за да не се натрупува текстот), но некаде се менува во процесот. Целосните примери за работни кодови може да се најдат во складиштето .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerЗабелешки:
- Во склопувањето на композицијата во голема мера се потпирав на добро познатата слика - не заборавајте да го проверите. Можеби не ви треба ништо друго во вашиот живот.
- Сите поставки за проток на воздух се достапни не само преку
airflow.cfg, но и преку променливите на околината (благодарение на програмерите), кои злонамерно ги искористив. - Секако, не е подготвен за производство: намерно не ставав отчукувања на срцето на контејнерите, не се замарав со безбедноста. Но, го направив минимумот погоден за нашите експериментатори.
- Забележи го тоа:
- Папката dag мора да биде достапна и за распоредувачот и за работниците.
- Истото важи и за сите библиотеки од трети страни - сите тие мора да се инсталираат на машини со распоредувач и работници.
Па, сега е едноставно:
$ docker-compose up --scale worker=3Откако сè ќе се зголеми, можете да ги погледнете веб-интерфејсите:
- Проток на воздух:
- Цвеќе:
Основни концепти
Ако не разбравте ништо во сите овие „дамки“, тогаш еве краток речник:
- Распоредувачот - најважниот вујко во Airflow, кој контролира роботите да работат напорно, а не личноста: го следи распоредот, ажурира дамки, започнува задачи.
Во принцип, во постарите верзии, тој имаше проблеми со меморијата (не, не амнезија, туку протекување) и параметарот на наследството остана дури и во конфигурациите
run_duration— неговиот интервал за рестартирање. Но, сега се е во ред. - ДАГ (познато како „даг“) - „насочен ацикличен график“, но таквата дефиниција ќе им каже на неколку луѓе, но всушност тоа е контејнер за задачи кои се во интеракција едни со други (види подолу) или аналог на пакетот во SSIS и Workflow во Informatica .
Покрај дамките, може сè уште да има поддаги, но најверојатно нема да дојдеме до нив.
- ДАГ Испратена - иницијализиран даг, кој е доделен свој
execution_date. Даграните од истата даг можат да работат паралелно (ако сте ги направиле вашите задачи идемпотентни, се разбира). - Оператор се парчиња код одговорни за извршување на одредена акција. Постојат три типа на оператори:
- акцијакако нашиот омилен
PythonOperator, кој може да изврши кој било (валиден) Python код; - пренос на, кои пренесуваат податоци од место до место, да речеме,
MsSqlToHiveTransfer; - сензор од друга страна, ќе ви овозможи да реагирате или да го забавите понатамошното извршување на даг додека не се случи некој настан.
HttpSensorможе да ја повлече наведената крајна точка и кога се чека саканиот одговор, започнете го преносотGoogleCloudStorageToS3Operator. Истражувачкиот ум ќе праша: „Зошто? На крајот на краиштата, можете да правите повторувања токму во операторот!“ И тогаш, за да не се заглави базенот на задачи со суспендирани оператори. Сензорот се вклучува, проверува и умира пред следниот обид.
- акцијакако нашиот омилен
- Задача - декларираните оператори, без разлика на типот, и прикачени на даг се унапредуваат во ранг на задача.
- пример на задача - кога генералниот планер одлучи дека е време да испрати задачи во битка на изведувачите-работници (точно на лице место, ако користиме
LocalExecutorили на оддалечен јазол во случај наCeleryExecutor), им доделува контекст (т.е. збир на променливи - параметри за извршување), ги проширува шаблоните за команди или барања и ги здружува.
Ние генерираме задачи
Прво, да ја претставиме општата шема на нашето тесто, а потоа сè повеќе ќе се нурнеме во деталите, бидејќи применуваме некои нетривијални решенија.
Значи, во наједноставна форма, таквата дамка ќе изгледа вака:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Ајде да го сфатиме:
- Прво, ги увезуваме потребните либови и нешто друго;
sql_server_ds- ДалиList[namedtuple[str, str]]со имињата на врските од Airflow Connections и базите на податоци од кои ќе ја земеме нашата плоча;dag- објавата на нашата даг, која нужно мора да биде инglobals(), инаку Airflow нема да го најде. Даг исто така треба да каже:- како се вика
orders- ова име потоа ќе се појави во веб-интерфејсот, - дека ќе работи од полноќ на осми јули,
- и треба да работи, приближно на секои 6 часа (за тешки момци овде наместо
timedelta()дозволенатаcron-линија0 0 0/6 ? * * *, за помалку кул - израз како@daily);
- како се вика
workflow()ќе ја заврши главната работа, но не сега. Засега, само ќе го фрлиме нашиот контекст во дневникот.- И сега едноставната магија за создавање задачи:
- трчаме низ нашите извори;
- иницијализираат
PythonOperator, кој ќе ја изврши нашата куклаworkflow(). Не заборавајте да наведете уникатно (во рамките на даг) име на задачата и да ја врзете самата даг. Знамеprovide_contextза возврат, ќе внесе дополнителни аргументи во функцијата, кои внимателно ќе ги собереме користејќи ги**context.
Засега тоа е се. Што добивме:
- нова дамка во веб-интерфејсот,
- сто и пол сто задачи кои ќе се извршуваат паралелно (доколку тоа го дозволуваат поставките Airflow, Celery и капацитетот на серверот).
Па, речиси го сфатив.

Кој ќе ги инсталира зависностите?
За да ја поедноставам целата работа, се зафркнав docker-compose.yml обработка requirements.txt на сите јазли.
Сега го нема:

Сивите квадрати се примери на задачи обработени од распоредувачот.
Чекаме малку, работните задачи ги кршат:

Зелените, секако, успешно си ја завршија работата. Црвените не се многу успешни.
Патем, нема папка на нашиот прод
./dags, нема синхронизација помеѓу машините - сите дамки лежат воgitна нашиот Gitlab, а Gitlab CI дистрибуира ажурирања на машините кога се спојуваатmaster.
Малку за Цветот
Додека работниците ни ги тресат цуцлите, да се потсетиме на уште една алатка која може да ни покаже нешто - Цвет.
Првата страница со збирни информации за работничките јазли:

Најинтензивната страница со задачи што отидоа на работа:

Најдосадната страница со статус на нашиот брокер:

Најсветлата страница е со графикони за статус на задачи и време на нивното извршување:

Го товариме недоволно
Значи, сите задачи се разработени, можете да ги однесете ранетите.

И имаше многу ранети - од една или друга причина. Во случај на правилна употреба на Airflow, токму овие квадрати покажуваат дека податоците дефинитивно не пристигнале.
Треба да го гледате дневникот и да ги рестартирате паднатите примери на задачи.
Со кликнување на кој било квадрат, ќе ги видиме дејствата што ни се достапни:

Можете да земете и да направите Расчистување на паднатите. Односно, забораваме дека нешто не успеа таму, а истата задача за пример ќе оди кај распоредувачот.

Јасно е дека тоа да се прави со глувчето со сите црвени квадрати не е многу хумано - тоа не е она што го очекуваме од Airflow. Нормално, имаме оружје за масовно уништување: Browse/Task Instances

Ајде да избереме сè одеднаш и да се вратиме на нула, кликнете на точната ставка:

По чистењето, нашите такси изгледаат вака (тие веќе чекаат распоредувачот да ги закаже):

Врски, куки и други променливи
Време е да го погледнеме следниот ДАГ, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Дали сите некогаш направиле ажурирање на извештајот? Ова е повторно таа: има список на извори од каде да се добијат податоците; има листа каде да се стави; не заборавајте да завивате кога сè се случило или пукнало (добро, ова не е за нас, не).
Ајде повторно да ја разгледаме датотеката и да ги погледнеме новите нејасни работи:
from commons.operators import TelegramBotSendMessage- ништо не не спречува да направиме сопствени оператори, што го искористивме правејќи мала обвивка за испраќање пораки до Unblocked. (Подолу ќе зборуваме повеќе за овој оператор);default_args={}- dag може да ги дистрибуира истите аргументи на сите свои оператори;to='{{ var.value.all_the_kings_men }}'- Полеtoнема да имаме хардкодирано, туку динамично генерирано со помош на Jinja и променлива со листа на е-пошта, која внимателно ја ставивAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— услов за стартување на операторот. Во нашиот случај, писмото ќе лета до газдите само ако сите зависности се решат успешно;tg_bot_conn_id='tg_main'- аргументиconn_idприфатете ги идентификаторите за поврзување што ги создавамеAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- пораките во Telegram ќе одлетаат само ако има паднати задачи;task_concurrency=1- забрануваме истовремено стартување на неколку примери на задачи од една задача. Во спротивно, ќе добиеме истовремено лансирање на неколкуVerticaOperator(гледајќи во една маса);report_update >> [email, tg]- сèVerticaOperatorсе спојуваат во испраќањето писма и пораки, како ова:

Но, бидејќи операторите на известувачите имаат различни услови за стартување, само еден ќе работи. Во приказот на дрвото, сè изгледа малку помалку визуелно:

Ќе кажам неколку зборови за макроа и нивните пријатели - променливи.
Макроата се заштитни места на Jinja кои можат да заменат различни корисни информации во аргументи на операторот. На пример, вака:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} ќе се прошири на содржината на контекстната променлива execution_date во формат YYYY-MM-DD: 2020-07-14. Најдобриот дел е што контекстуалните променливи се приковани на одреден пример на задача (квадрат во приказот на дрвото), а кога ќе се рестартираат, држачите на места ќе се прошират на истите вредности.
Доделените вредности може да се видат со помош на копчето Rendered на секој примерок на задача. Вака задачата со испраќање писмо:

И така на задачата со испраќање порака:

Комплетна листа на вградени макроа за најновата достапна верзија е достапна овде:
Згора на тоа, со помош на приклучоци, можеме да декларираме свои макроа, но тоа е друга приказна.
Покрај претходно дефинираните работи, можеме да ги замениме вредностите на нашите променливи (веќе го користев ова во кодот погоре). Ајде да создадеме внатре Admin/Variables неколку работи:

Сè што можете да користите:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Вредноста може да биде скалар, или може да биде и JSON. Во случај на JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}само користете ја патеката до саканиот клуч: {{ var.json.bot_config.bot.token }}.
Буквално ќе кажам еден збор и ќе покажам еден скриншот за соединенија. Сè е елементарно овде: на страницата Admin/Connections создаваме врска, ги додаваме нашите најавувања / лозинки и поспецифични параметри таму. Како ова:

Лозинките може да се шифрираат (потемелно од стандардното), или можете да го изоставите типот на врската (како што направив за tg_main) - факт е дека списокот на типови е харджичен во моделите Airflow и не може да се прошири без да се навлезе во изворните кодови (ако ненадејно не прогуглав нешто, поправете ме), но ништо нема да не спречи да добиеме кредити само со име.
Можете исто така да направите неколку врски со исто име: во овој случај, методот BaseHook.get_connection(), што ни добива врски по име, ќе даде случајно од неколку имењаци (пологично би било да се направи Round Robin, но да го оставиме тоа на совеста на развивачите на Airflow).
Променливите и врските се секако одлични алатки, но важно е да не се изгуби рамнотежата: кои делови од вашите текови ги складирате во самиот код и кои делови му ги давате на Airflow за складирање. Од една страна, може да биде погодно брзо да ја промените вредноста, на пример, поштенско сандаче, преку интерфејсот. Од друга страна, ова е сепак враќање на кликнувањето на глувчето, од кое ние (јас) сакавме да се ослободиме.
Работата со врски е една од задачите куки. Општо земено, куките за проток на воздух се точки за поврзување со услуги и библиотеки од трети страни. На пр. JiraHook ќе ни отвори клиент за да комуницираме со Jira (можете да ги преместувате задачите напред и назад), и со помош на SambaHook можете да туркате локална датотека до smb- точка.
Парсирање на прилагодениот оператор
И блиску дојдовме да погледнеме како е направен TelegramBotSendMessage
Код commons/operators.py со вистинскиот оператор:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Овде, како и сè друго во Airflow, сè е многу едноставно:
- Наследен од
BaseOperator, кој имплементира неколку работи специфични за протокот на воздух (погледнете го вашето слободно време) - Декларирани полиња
template_fields, во која Џинџа ќе бара макроа за обработка. - Подреди вистинските аргументи за
__init__(), поставете ги стандардните вредности каде што е потребно. - Не заборавивме ниту на иницијализацијата на предокот.
- Ја отвори соодветната кука
TelegramBotHookдобил клиентски објект од него. - Преземен (редефиниран) метод
BaseOperator.execute(), кој Airfow ќе го замрси кога ќе дојде време да се стартува операторот - во него ќе ја спроведеме главната акција, заборавајќи да се најавиме. (Ние се најавуваме, патем, веднашstdoutиstderr- Протокот на воздух ќе пресретне сè, ќе го завитка убаво, ќе го разложи каде што е потребно.)
Ајде да видиме што имаме commons/hooks.py. Првиот дел од датотеката, со самата кука:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientНе знам ни што да објаснам овде, само ќе ги забележам важните точки:
- Ние наследуваме, размислете за аргументите - во повеќето случаи тоа ќе биде еден:
conn_id; - Надминувачки стандардни методи: се ограничив
get_conn(), во која ги добивам параметрите за поврзување по име и само го добивам делотextra(ова е JSON поле), во кое јас (според моите упатства!) го ставам бот токенот Telegram:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Јас создавам пример од нашата
TelegramBot, давајќи му специфичен знак.
Тоа е се. Можете да добиете клиент од кука користејќи TelegramBotHook().clent или TelegramBotHook().get_conn().
И вториот дел од датотеката, во која правам микрообвивка за Telegram REST API, за да не го влечете истото за еден метод sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Правилниот начин е да се собере сето тоа:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- во додатокот, ставете во јавно складиште и дајте го на Open Source.
Додека го проучувавме сето ова, ажурирањата на нашите извештаи успеаја успешно да пропаднат и да ми испратат порака за грешка на каналот. Ќе проверам дали не е во ред...

Нешто пукна во нашиот дужд! Зарем тоа не го очекувавме? Точно!
Ќе истуриш?
Дали чувствувате дека нешто пропуштив? Изгледа вети дека ќе префрли податоци од SQL Server на Vertica, а потоа ги зеде и се оттргна од темата, ѓубрето!
Ова ѕверство беше намерно, едноставно морав да ти дешифрирам некоја терминологија. Сега можете да одите понатаму.
Нашиот план беше овој:
- Дали даг
- Генерирајте задачи
- Погледнете колку е се убаво
- Доделете броеви на сесии за пополнување
- Добијте податоци од SQL Server
- Ставете податоци во Вертика
- Собира статистика
Така, за сето ова да почне да функционира, направив мал додаток на нашата docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyТаму подигаме:
- Вертика како домаќин
dwhсо најмногу стандардни поставки, - три примери на SQL Server,
- ние ги пополнуваме базите на податоци во вторите со некои податоци (во никој случај не гледајте
mssql_init.py!)
Го лансираме сето добро со помош на малку покомплицирана команда од минатиот пат:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Она што го генерира нашиот чудотворен рандомизатор, можете да ја користите ставката Data Profiling/Ad Hoc Query:

Главната работа не е да им се покаже на аналитичарите
елаборираат на ETL сесии Нема, таму сè е тривијално: правиме основа, има знак во неа, завиткуваме сè со контекстуален менаџер и сега го правиме ова:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15сесија.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passВремето дојде собирајте ги нашите податоци од нашите сто и пол стотина маси. Ајде да го направиме ова со помош на многу непретенциозни линии:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Со помош на кука добиваме од Airflow
pymssql- поврзете - Ајде да замениме ограничување во форма на датум во барањето - тоа ќе биде фрлено во функцијата од моторот на шаблонот.
- Хранење на нашето барање
pandasкој ќе не добиеDataFrame- ќе ни биде од корист во иднина.
Јас користам замена
{dt}наместо параметар за барање%sне затоа што сум злобен Пинокио, туку затоа штоpandasне може да се справиpymssqlи го лизга последниотparams: Listиако навистина сакаtuple.
Исто така, забележете дека инвеститоротpymssqlодлучи повеќе да не го поддржува, и време е да се иселимеpyodbc.
Ајде да видиме со што Airflow ги наполнил аргументите на нашите функции:

Ако нема податоци, тогаш нема смисла да се продолжи. Но, исто така е чудно полнењето да се смета за успешно. Но, ова не е грешка. А-а-ах, што да правам?! А еве што:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException ќе му каже на Airflow дека нема грешки, но ние ја прескокнуваме задачата. Интерфејсот нема да има зелен или црвен квадрат, туку розов.
Ајде да ги фрлиме нашите податоци повеќе колони:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Имено
- Базата на податоци од која ги земавме нарачките,
- ID на нашата поплавна сесија (ќе биде различно за секоја задача),
- Хеш од изворот и ID на нарачка - така што во конечната база на податоци (каде што сè е преточено во една табела) имаме единствен ID на нарачка.
Останува претпоследниот чекор: истурете сè во Вертика. И, чудно е доволно, еден од најспектакуларните и најефикасните начини да го направите ова е преку CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Правиме специјален ресивер
StringIO. pandasљубезно ќе ги стави нашитеDataFrameво формаCSV- линии.- Ајде да отвориме врска со нашата омилена Вертика со кука.
- И сега со помош
copy()испратете ги нашите податоци директно до Вертика!
Ќе земеме од возачот колку линии се пополнети и ќе му кажеме на менаџерот на сесијата дека сè е во ред:
session.loaded_rows = cursor.rowcount
session.successful = TrueТоа е се.
На продажба, ние рачно ја креираме целната плоча. Еве си дозволив мала машина:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)користам
VerticaOperator()Јас креирам шема на база на податоци и табела (ако веќе не постојат, се разбира). Главната работа е правилно да ги распоредите зависностите:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadСумирање
- Па, - рече малото глувче, - нели, сега
Дали сте убедени дека јас сум најстрашното животно во шумата?
Џулија Доналдсон, Груфало
Мислам дека ако јас и моите колеги имавме конкуренција: кој брзо ќе создаде и ќе започне процес на ETL од нула: тие со нивниот SSIS и глушец и јас со Airflow ... И тогаш ќе ја споредиме и леснотијата на одржување ... Леле, мислам дека ќе се согласите дека ќе ги победам на сите фронтови!
Ако малку посериозно, тогаш Apache Airflow - со опишување процеси во форма на програмски код - ја заврши мојата работа многу поудобно и попријатно.
Неговата неограничена екстензивност, и во смисла на приклучоци и предиспозиција за приспособливост, ви дава можност да го користите Airflow во речиси секоја област: дури и во целиот циклус на собирање, подготовка и обработка на податоци, дури и при лансирање ракети (на Марс, на курс).
Конечниот дел, референца и информации
Греблото што го собравме за вас
start_date. Да, ова е веќе локален мем. Преку главниот аргумент на Дагstart_dateсите поминуваат. Накратко, ако наведете воstart_dateтековниот датум иschedule_interval- Еден ден, тогаш ДАГ ќе почне утре не порано.start_date = datetime(2020, 7, 7, 0, 1, 2)И нема повеќе проблеми.
Постои уште една грешка во времето на траење поврзана со неа:
Task is missing the start_date parameter, што најчесто покажува дека сте заборавиле да се поврзете со операторот даг.- Сите на една машина. Да, и бази (самиот проток на воздух и нашата обвивка), и веб-сервер, и распоредувач и работници. И дури и успеа. Но, со текот на времето, бројот на задачи за услугите растеше и кога PostgreSQL почна да одговара на индексот за 20 секунди наместо за 5 ms, ние го зедовме и го однесовме.
- Локален извршител. Да, ние сè уште седиме на него, а веќе дојдовме до работ на бездната. Досега ни беше доволен LocalExecutor, но сега е време да се прошириме со барем еден работник и ќе треба да работиме напорно за да се преселиме во CeleryExecutor. И со оглед на фактот дека можете да работите со него на една машина, ништо не ве спречува да користите Celery дури и на сервер, кој „се разбира, никогаш нема да влезе во производство, искрено!“
- Неупотреба вградени алатки:
- Врски за складирање на акредитиви за услуги,
- SLA промаши да одговори на задачи кои не успеале навреме,
- xcom за размена на метаподатоци (реков целподатоци!) помеѓу даг задачи.
- Злоупотреба на пошта. Па, што да кажам? Беа поставени предупредувања за сите повторувања на паднатите задачи. Сега мојот работен Gmail има повеќе од 90 илјади е-пошта од Airflow, а муцката за веб-пошта одбива да земе и избрише повеќе од 100 истовремено.
Повеќе стапици:
Повеќе алатки за автоматизација
За да работиме уште повеќе со глава, а не со раце, Airflow ни го подготви ова:
- - се уште има статус на Експериментал, што не го спречува да работи. Со него, вие не само што можете да добивате информации за дамки и задачи, туку и да запрете/започнете даг, да креирате DAG Run или базен.
- - многу алатки се достапни преку командната линија кои не само што се незгодни за користење преку WebUI, туку генерално ги нема. На пример:
backfillпотребни за рестартирање на примероци на задачи.
На пример, дојдоа аналитичари и рекоа: „А ти, другар, имаш глупости во податоците од 1 до 13 јануари! Поправете го, поправете го, поправете го, поправете го!" И вие сте таква рингла:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Основна услуга:
initdb,resetdb,upgradedb,checkdb. run, што ви овозможува да извршите една задача на пример, па дури и да дадете бод за сите зависности. Покрај тоа, можете да го извршите прекуLocalExecutor, дури и ако имате кластер Целер.- Го прави скоро истото
test, само исто така во бази не пишува ништо. connectionsовозможува масовно создавање на врски од школка.
- - прилично хардкор начин на интеракција, кој е наменет за приклучоци, а не рој во него со мали раце. Но, кој ќе не спречи да одиме
/home/airflow/dags, трчајipythonи да почнеш да се плеткаш? Можете, на пример, да ги извезете сите врски со следниов код:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Поврзување со метабазата на проток на воздух. Не препорачувам да пишувате на него, но добивањето состојби на задачи за различни специфични метрики може да биде многу побрзо и полесно отколку да користите некој од API-ите.
Да речеме дека не сите наши задачи се идемотентни, но понекогаш може да паднат и тоа е нормално. Но, неколку блокади се веќе сомнителни и би требало да се провери.
Пазете се SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
референци
И, се разбира, првите десет линкови од издавањето на Google се содржината на папката Airflow од моите обележувачи.
- - се разбира, мора да почнеме од канцеларијата. документација, но кој ги чита упатствата?
- - Па, барем прочитајте ги препораките од креаторите.
- - самиот почеток: корисничкиот интерфејс во слики
- - основните концепти се добро опишани, ако (одеднаш!) не сте разбрале нешто од мене.
- - краток водич за поставување кластер за проток на воздух.
- - речиси истиот интересен напис, освен можеби повеќе формализам, и помалку примери.
- — за работата во врска со целер.
- - за идемотентноста на задачите, вчитување по ID наместо датум, трансформација, структура на датотека и други интересни работи.
- - зависности на задачи и Trigger Rule, кои ги спомнав само попатно.
- - како да се надминат некои „работи како што е предвидено“ во распоредувачот, да се вчитаат изгубените податоци и да се даде приоритет на задачите.
- - корисни SQL прашања за метаподатоци за проток на воздух.
- - има корисен дел за создавање прилагоден сензор.
- — интересна кратка забелешка за изградба на инфраструктура на AWS за Data Science.
- - вообичаени грешки (кога некој сè уште не ги чита упатствата).
- - насмевнете се како луѓето користат патерици за складирање лозинки, иако можете само да користите Connections.
- - имплицитно препраќање на DAG, фрлање контекст во функциите, повторно за зависности, а исто така и за прескокнување на стартувања на задачи.
- - за употребата
default argumentsиparamsво шаблони, како и променливи и врски. - - приказна за тоа како планерот се подготвува за Airflow 2.0.
- - малку застарена статија за распоредување на нашиот кластер во
docker-compose. - - динамични задачи користејќи шаблони и проследување контекст.
- — стандардни и прилагодени известувања по пошта и Slack.
- - Задачи за разгранување, макроа и XCom.
И врските користени во статијата:
- - местенка достапни за употреба во шаблони.
- — Вообичаени грешки при креирање дамки.
- -
docker-composeза експериментирање, дебагирање и многу повеќе. - — Пајтон обвивка за Telegram REST API.
Извор: www.habr.com




