Салом, ман Дмитрий Логвиненко - муҳандиси маълумоти шӯъбаи таҳлили гурӯҳи ширкатҳои Vezet.
Ман ба шумо дар бораи як воситаи олиҷаноб барои таҳияи равандҳои ETL нақл мекунам - Apache Airflow. Аммо Airflow он қадар гуногунҷанба ва гуногунҷанба аст, ки шумо бояд онро бодиққат аз назар гузаронед, ҳатто агар шумо дар ҷараёни маълумот иштирок надоред, аммо лозим аст, ки давра ба давра ҳама гуна равандҳоро оғоз кунед ва иҷрои онҳоро назорат кунед.
Ва ҳа, ман на танҳо мегӯям, балки нишон медиҳам: барнома бисёр код, скриншотҳо ва тавсияҳо дорад.
Он чизеро, ки шумо одатан ҳангоми ҷустуҷӯи Google калимаи Airflow / Wikimedia Commons мебинед
Мундариҷа
Муқаддима
Apache Airflow ба монанди Django аст:
- дар python навишта шудааст
- як панели бузурги администратор вуҷуд дорад,
- ба таври номуайян васеъ карда мешавад
- танҳо беҳтар аст ва он барои мақсадҳои тамоман дигар сохта шудааст, яъне (чунон ки пеш аз кат навишта шудааст):
- иҷро ва мониторинги вазифаҳо дар шумораи номаҳдуди мошинҳо (чунон ки бисёре аз Celery / Kubernetes ва виҷдони шумо ба шумо имкон медиҳанд)
- бо тавлиди ҷараёни кории динамикӣ аз навиштан ва фаҳмидани рамзи Python хеле осон аст
- ва қобилияти пайваст кардани ҳама гуна пойгоҳи додаҳо ва API бо ҳам бо истифода аз ҷузъҳои тайёр ва плагинҳои хонагӣ (ки хеле содда аст).
Мо Apache Airflow-ро чунин истифода мебарем:
- мо маълумотро аз сарчашмаҳои гуногун ҷамъ меорем (бисёр мисолҳои SQL Server ва PostgreSQL, API-ҳои гуногун бо ченакҳои барнома, ҳатто 1C) дар DWH ва ODS (мо Vertica ва Clickhouse дорем).
- чи кадар пешкадам
cron
, ки равандҳои муттаҳидсозии маълумотро дар ODS оғоз мекунад ва инчунин нигоҳдории онҳоро назорат мекунад.
То ба наздикӣ эҳтиёҷоти моро як сервери хурд бо 32 ядро ва 50 ГБ хотираи оперативӣ қонеъ мекард. Дар Airflow, ин кор мекунад:
- более 200 дак (воқеан ҷараёнҳои корӣ, ки дар онҳо мо вазифаҳоро пур кардаем),
- дар ҳар як ба ҳисоби миёна 70 вазифа,
- ин некӣ оғоз мешавад (инчунин ба ҳисоби миёна) як маротиба дар як соат.
Ва дар бораи он ки мо чӣ гуна васеъ кардем, ман дар зер менависам, аммо ҳоло биёед мушкилоти über-ро муайян кунем, ки мо онро ҳал мекунем:
Се сервери аслии SQL мавҷуд аст, ки ҳар яки онҳо 50 пойгоҳи додаҳо доранд - намунаҳои як лоиҳа, мутаносибан, онҳо сохтори якхела доранд (қариб дар ҳама ҷо, муа-ха-ха), яъне ҳар кадоми онҳо ҷадвали фармоишҳо доранд (хушбахтона, ҷадвал бо он ном метавонад ба ҳама гуна тиҷорат тела дода шавад). Мо маълумотро тавассути илова кардани майдонҳои хидматрасонӣ (сервери манбаъ, пойгоҳи додаҳои манбаъ, ID вазифаи ETL) мегирем ва соддалавҳона онҳоро ба Vertica мепартоем.
Биёед биравем!
Қисми асосӣ, амалӣ (ва каме назариявӣ)
Чаро мо (ва шумо)
Вақте ки дарахтон калон буданд ва ман содда будам SQL
-schik дар як чаканаи Русия, мо бо истифода аз ду асбоби дастраси мо равандҳои ETL-ро, яъне ҷараёни маълумотро фиреб додем:
- Маркази энергетикии информатика - системаи бениҳоят паҳншаванда, бениҳоят самаранок, бо сахтафзори худ, версияи худ. Худо накунад 1% имконияташро истифода бурдам. Чаро? Хуб, пеш аз ҳама, ин интерфейс, дар ҷое аз солҳои 380-ум, рӯҳан ба мо фишор овард. Дуюм, ин зиддият барои равандҳои бениҳоят зебо, истифодаи такрории ҷузъҳои хашмгин ва дигар ҳиллаҳои хеле муҳими корхона пешбинӣ шудааст. Дар бораи он, ки он ба монанди боли Airbus AXNUMX дар як сол арзиш дорад, мо чизе намегӯем.
Эҳтиёт бошед, скриншот метавонад ба одамони то 30-сола каме осеб расонад
- Сервери ҳамгироии SQL Server — мо ин рафикро дар чараёни дохили-лоихавии худ истифода бурдем. Хуб, дар асл: мо аллакай SQL Server-ро истифода мебарем ва истифода накардани абзорҳои ETL-и он то андозае беасос мебуд. Ҳама чиз дар он хуб аст: ҳам интерфейс зебо аст ва ҳам гузоришҳои пешрафт ... Аммо аз ин сабаб нест, ки мо маҳсулоти нармафзорро дӯст медорем, оҳ, на барои ин. Версияи он
dtsx
(ки XML бо гиреҳҳои омехта дар нигоҳдорӣ аст) мо метавонем, аммо чӣ маъно дорад? Дар бораи сохтани бастаи вазифаҳо, ки садҳо ҷадвалҳоро аз як сервер ба сервери дигар кашола мекунанд, чӣ гуфтан мумкин аст? Бале, чй сад, ангушти ишорати ту аз бист порча меафтад, тугмаи мушро пахш мекунад. Аммо он бешубҳа мӯдтар ба назар мерасад:
Мо бешубҳа роҳҳои баромаданро ҷустуҷӯ кардем. Ҳолат ҳатто қариб ба генератори бастаи худнависи SSIS омад ...
...ва баъд маро кори нав пайдо кард. Ва Apache Airflow ба ман расид.
Вақте ки ман фаҳмидам, ки тавсифи раванди ETL рамзи оддии Python аст, ман танҳо аз шодӣ рақс накардам. Маҳз ҳамин тавр ҷараёнҳои додаҳо версия ва тафовут карда шуданд ва рехтани ҷадвалҳо бо сохтори ягона аз садҳо пойгоҳи додаҳо ба як ҳадаф дар якуним ё ду экрани 13 ”ба рамзи Python табдил ёфт.
Ҷамъоварии кластер
Биёед як кӯдакистонро комилан ташкил накунем ва дар ин ҷо дар бораи чизҳои комилан равшан сухан нагӯем, ба монанди насб кардани Airflow, пойгоҳи додаҳои интихобкардаи шумо, Карафс ва дигар ҳолатҳое, ки дар докҳо тавсиф шудаанд.
Барои он ки мо фавран таҷрибаҳоро оғоз кунем, ман нақша кашидам docker-compose.yml
ки дар он:
- Биёед воқеан баланд кунем Равиши ҳаво: Барномасоз, веб-сервер. Гул низ дар он ҷо барои назорат кардани вазифаҳои карафс чарх мезанад (зеро он аллакай ба он тела дода шудааст
apache/airflow:1.10.10-python3.7
, аммо мо зид нестем) - PostgreSQL, ки дар он Airflow маълумоти хидматрасонии худро (маълумоти банақшагирӣ, омори иҷро ва ғ.) менависад ва Celery вазифаҳои анҷомшударо қайд мекунад;
- Redis, ки ҳамчун брокери вазифа барои Celery амал мекунад;
- Коргари карафс, ки ба ичрои бевоситаи супоришхо машгул мешаванд.
- Ба папка
./dags
мо файлҳои худро бо тавсифи дагҳо илова мекунем. Онҳо дар магасе гирифта мешаванд, аз ин рӯ пас аз ҳар атса задан лозим нест, ки тамоми стекро ҷунглер кунед.
Дар баъзе ҷойҳо, коди мисолҳо пурра нишон дода нашудааст (барои он ки матнро парешон накунад), аммо дар ҷое он дар ҷараёни он тағир дода мешавад. Намунаҳои мукаммали коди кориро дар анбор пайдо кардан мумкин аст
https://github.com/dm-logv/airflow-tutorial .
Доктор-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- broker
Эзоҳ:
- Дар чамъоварии композиция ман бештар ба образи маъруф такя кардам
puckel / docker-ҳаво - ҳатман онро тафтиш кунед. Шояд шумо дар ҳаёти худ ба чизи дигаре ниёз надоред. - Ҳама танзимоти ҷараёни ҳаво на танҳо тавассути
airflow.cfg
, балки инчунин тавассути тағирёбандаҳои муҳити зист (ба шарофати таҳиягарон), ки ман бадқасдона аз онҳо истифода бурдам. - Табиист, ки он ба истеҳсолот омода нест: ман дидаву дониста набзи дилро ба контейнерҳо наовардам, ман бо амният ташвиш надидам. Аммо ман ҳадди ақали барои таҷрибаомӯзони мо мувофиқро кардам.
- Дар назар гиред, ки:
- Папкаи даг бояд ҳам ба нақшакаш ва ҳам барои коргарон дастрас бошад.
- Ин ба ҳамаи китобхонаҳои тарафи сеюм дахл дорад - ҳамаи онҳо бояд дар мошинҳо бо нақшакаш ва коргарон насб карда шаванд.
Хуб, ҳоло он оддӣ аст:
$ docker-compose up --scale worker=3
Пас аз ҳама чиз баланд мешавад, шумо метавонед ба интерфейсҳои веб нигаред:
- Равиши ҳаво:
http://127.0.0.1:8080/admin/ - Гул:
http://127.0.0.1:5555/dashboard
Мафҳумҳои асосӣ
Агар шумо дар ҳамаи ин "дагҳо" чизе нафаҳмида бошед, пас ин аст луғати кӯтоҳ:
- Scheduler - амаки муҳимтарин дар Airflow, ки назорат мекунад, ки роботҳо сахт кор кунанд, на шахс: ҷадвалро назорат мекунад, дагҳоро нав мекунад, вазифаҳоро оғоз мекунад.
Умуман, дар версияҳои кӯҳна, ӯ бо хотира мушкилот дошт (не, на амнезия, балки ихроҷ) ва параметри меросӣ ҳатто дар конфигуратсияҳо боқӣ монд.
run_duration
— фосилаи аз нав оғоз кардани он. Аммо ҳоло ҳама чиз хуб аст. - DAG (aka "dag") - "графики асикликии равонашуда", аммо чунин таъриф ба кам одамон нақл мекунад, аммо дар асл он як контейнер барои вазифаҳое мебошад, ки бо ҳамдигар ҳамкорӣ мекунанд (ба поён нигаред) ё аналоги Баста дар SSIS ва Workflow дар Informatica .
Илова ба дагҳо, метавонанд то ҳол зердастҳо бошанд, аммо мо эҳтимол ба онҳо намерасем.
- DAG Run - дагҳои ибтидоӣ, ки худаш таъин шудааст
execution_date
. Дагранҳои ҳамон даг метавонанд дар мувозӣ кор кунанд (агар шумо вазифаҳои худро idempotent карда бошед, албатта). - Оператор қисмҳои рамзҳо мебошанд, ки барои иҷрои амали мушаххас масъуланд. Се намуди операторҳо мавҷуданд:
- амалмисли дӯстдоштаи мо
PythonOperator
, ки метавонад ҳама гуна (эътибор) рамзи Python -ро иҷро кунад; - интиқол додан, ки маълумотро аз як ҷо ба ҷои дигар интиқол медиҳанд, мегӯянд,
MsSqlToHiveTransfer
; - санҷандаро аз тарафи дигар, он ба шумо имкон медиҳад, ки то ба амал омадани ҳодиса вокуниш нишон диҳед ё иҷрои минбаъдаи дагро суст кунед.
HttpSensor
метавонад нуқтаи ниҳоии муайяншударо кашад ва вақте ки посухи дилхоҳ интизор аст, интиқолро оғоз кунедGoogleCloudStorageToS3Operator
. Ақли пуртаҷриба мепурсад: «Чаро? Баъд аз ҳама, шумо метавонед такрорҳоро мустақиман дар оператор иҷро кунед! ” Ва он гоҳ, барои он ки ҳавзи вазифаҳоро бо операторони боздошташуда банд накунанд. Сенсор пеш аз кӯшиши навбатӣ оғоз меёбад, месанҷад ва мемирад.
- амалмисли дӯстдоштаи мо
- Вазифаи - операторони эълоншуда, сарфи назар аз намуд ва ба даг вобаста ба рутбаи вазифа пешбарӣ карда мешаванд.
- мисоли вазифа - вақте ки банақшагири генералӣ тасмим гирифт, ки вақти он расидааст, ки вазифаҳоро ба корбарон фиристем (дар ҳамон ҷо, агар мо истифода барем.
LocalExecutor
ё ба гирехи дурдаст дар суратиCeleryExecutor
), он ба онҳо контекст таъин мекунад (яъне маҷмӯи тағирёбандаҳо - параметрҳои иҷро), қолабҳои фармон ё дархостро васеъ мекунад ва онҳоро ба ҳам меорад.
Мо вазифаҳоро эҷод мекунем
Аввалан, биёед схемаи умумии догамонро шарҳ диҳем ва сипас мо ба тафсилот бештар ва бештар ғарқ мешавем, зеро мо баъзе ҳалли ночизро истифода мебарем.
Ҳамин тавр, дар соддатарин шакли худ, чунин даг чунин хоҳад буд:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Биёед онро муайян кунем:
- Аввалан, мо libs заруриро ворид мекунем ва чизи дигар;
sql_server_ds
ОёList[namedtuple[str, str]]
бо номҳои пайвастшавӣ аз Airflow Connections ва пойгоҳи додаҳое, ки мо таблиғи худро мегирем;dag
— эълони дагамон, ки бояд хатман дарglobals()
, вагарна Airflow онро намеёбад. Дуг инчунин бояд бигӯяд:- номи вай чист
orders
- он гоҳ ин ном дар интерфейси веб пайдо мешавад, - ки вай аз нисфи шаби хаштуми июль кор мекунад.
- ва он бояд тақрибан ҳар 6 соат кор кунад (барои бачаҳои сахтгир дар ин ҷо ба ҷои
timedelta()
қобили қабул астcron
-хат0 0 0/6 ? * * *
, барои камтар сард - ифодаи монанди@daily
);
- номи вай чист
workflow()
кори асосиро ичро мекунад, вале хозир не. Дар айни замон, мо танҳо контексти худро ба гузориш мепартоем.- Ва ҳоло ҷодуи оддии эҷоди вазифаҳо:
- мо аз сарчашмаҳои худ мегузарем;
- оғоз кардан
PythonOperator
, ки дамми моро ичро мекунадworkflow()
. Фаромӯш накунед, ки номи беназири (дар дохили даг) супориш ва бастани худи даг. Парчамprovide_context
дар навбати худ, ба функсия далелҳои иловагӣ мерезанд, ки мо бодиққат бо истифода аз он ҷамъ хоҳем кард**context
.
Ҳозирча ҳаммаси шу. Он чизе ки мо гирифтем:
- Даги нав дар интерфейси веб,
- якунимсад вазифа, ки дар баробари иҷро карда мешаванд (агар ҷараёни ҳаво, танзимоти карафс ва қобилияти сервер ба он имкон диҳанд).
Бале, қариб гирифтам.
Вобастагиҳоро кӣ насб мекунад?
Барои содда кардани ин ҳама, ман ба кор даромадам docker-compose.yml
коркард requirements.txt
дар ҳама гиреҳҳо.
Ҳоло он нест:
Майдонҳои хокистарӣ мисолҳои вазифаҳое мебошанд, ки аз ҷониби нақшакаш коркард мешаванд.
Мо каме интизор мешавем, вазифаҳоро коргарон ба миён меоранд:
Сабзавотхо, албатта, кори худро бомуваффакият анчом доданд. Сурхҳо чандон муваффақ нестанд.
Воқеан, дар маҳсулоти мо ягон папка нест
./dags
, дар байни мошинҳо синхронизатсия вуҷуд надорад - ҳама дагҳо дарgit
дар Gitlab мо ва Gitlab CI ҳангоми якҷояшавӣ навсозиро ба мошинҳо паҳн мекунадmaster
.
Каме дар бораи Гул
Хангоме ки коргарон сосикахои моро мезананд, биёед асбоби дигареро ба хотир орем, ки ба мо чизе нишон дода метавонад — Гул.
Саҳифаи аввал бо маълумоти мухтасар дар бораи гиреҳҳои корӣ:
Саҳифаи пурқувваттарин бо вазифаҳое, ки ба кор рафтаанд:
Саҳифаи дилгиркунанда бо мақоми брокери мо:
Саҳифаи дурахшонтарин бо графикҳои ҳолати вазифаҳо ва вақти иҷрои онҳост:
Мо борҳои камборро бор мекунем
Ҳамин тавр, ҳама вазифаҳо иҷро шуданд, шумо метавонед захмиёнро кашед.
Ва бисьёр ярадорон буданд — бо ин ё он сабаб. Дар сурати истифодаи дурусти Airflow, ин квадратҳо нишон медиҳанд, ки маълумот бешубҳа нарасидааст.
Ба шумо лозим аст, ки сабтро тамошо кунед ва мисолҳои вазифаи афтодаро аз нав оғоз кунед.
Бо пахш кардани ягон мураббаъ, мо амалҳои дастрасро мебинем:
Шумо метавонед бигиред ва Clear афтода. Яъне, мо фаромӯш мекунем, ки чизе дар он ҷо ноком шудааст ва ҳамон як вазифаи мисол ба нақшакаш меравад.
Маълум аст, ки ин кор бо муш бо тамоми квадратҳои сурх чандон инсондӯстона нест - ин чизе нест, ки мо аз Airflow интизорем. Табиист, ки мо силоҳҳои қатли ом дорем: Browse/Task Instances
Биёед ҳама чизро якбора интихоб кунем ва ба сифр барқарор кунем, банди дурустро клик кунед:
Пас аз тозакунӣ, таксиҳои мо чунин ба назар мерасанд (онҳо аллакай интизоранд, ки нақшакаш онҳоро ба нақша гирад):
Пайвастҳо, қалмоқҳо ва дигар тағирёбандаҳо
Вақти он расидааст, ки ба DAG оянда нигаред, update_reports.py
:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Оё ҳама ягон бор навсозии гузоришро анҷом додаанд? Ин боз вай аст: рӯйхати манбаъҳо вуҷуд дорад, ки маълумотро аз куҷо гирифтан мумкин аст; рӯйхат вуҷуд дорад, ки дар куҷо гузоштан лозим аст; Фаромӯш накунед, ки вақте ки ҳама чиз рӯй дод ё вайрон шуд (хуб, ин дар бораи мо нест, не).
Биёед бори дигар файлро аз назар гузаронем ва ба чизҳои нави норавшан назар андозем:
from commons.operators import TelegramBotSendMessage
- ҳеҷ чиз ба мо халал намерасонад, ки операторони худамонро созем, ки мо аз он бо сохтани як бастаи хурд барои фиристодани паёмҳо ба Unblocked бартарӣ гирифтем. (Дар бораи ин оператор дар зер бештар сӯҳбат хоҳем кард);default_args={}
- dag метавонад як далелҳоро ба ҳамаи операторони худ тақсим кунад;to='{{ var.value.all_the_kings_men }}'
- майдонto
мо коди сахт нахоҳем дошт, аммо бо истифода аз Jinja ва тағирёбанда бо рӯйхати мактубҳо, ки ман онҳоро бодиққат ворид мекунам, ба таври динамикӣ тавлид карда мешавад.Admin/Variables
;trigger_rule=TriggerRule.ALL_SUCCESS
— шарти ба кор андохтани оператор. Дар ҳолати мо, мактуб танҳо ба сардорон парвоз мекунад, агар ҳама вобастагӣ кор кунанд бомуваффакият;tg_bot_conn_id='tg_main'
- далелҳоconn_id
ID-ҳои пайвастшавӣ, ки мо дар он эҷод мекунем, қабул кунедAdmin/Connections
;trigger_rule=TriggerRule.ONE_FAILED
- паёмҳо дар Telegram танҳо дар сурати мавҷуд будани вазифаҳои афтода парвоз мекунанд;task_concurrency=1
- мо оғози ҳамзамон якчанд мисолҳои вазифаҳои як вазифаро манъ мекунем. Дар акси ҳол, мо дар як вақт ба кор андохтани якчанд даст меоремVerticaOperator
(ба як миз нигариста);report_update >> [email, tg]
- ҳамаVerticaOperator
дар фиристодани мактубҳо ва паёмҳо, ба монанди инҳо муттаҳид шавед:
Аммо азбаски операторони огоҳкунанда шартҳои гуногуни оғозёбӣ доранд, танҳо яке кор мекунад. Дар намуди дарахт, ҳама чиз каме визуалӣ ба назар мерасад:
Дар бораи он чанд сухан мегӯям макросҳо ва дӯстони онҳо - тағирёбандаҳо.
Макросҳо ҷойгузинҳои Ҷинжа мебошанд, ки метавонанд маълумоти гуногуни муфидро дар аргументҳои оператор иваз кунанд. Масалан, ба ин монанд:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }}
ба мундариҷаи тағирёбандаи контекст васеъ хоҳад шуд execution_date
дар формат YYYY-MM-DD
: 2020-07-14
. Беҳтарин қисми он аст, ки тағирёбандаҳои контекстӣ ба як мисоли вазифаи мушаххас мехкӯб карда мешаванд (як мураббаъ дар Намоиши дарахтон) ва ҳангоми аз нав оғоз кардани ҷойнишинҳо ба ҳамон арзишҳо васеъ мешаванд.
Арзишҳои таъиншударо бо истифода аз тугмаи Rendered дар ҳар як мисоли вазифа дидан мумкин аст. Вазифаи фиристодани мактуб чунин аст:
Ва ҳамин тавр дар вазифаи фиристодани паём:
Рӯйхати пурраи макросҳои дарунсохт барои версияи охирини дастрас дар ин ҷо дастрас аст:
Гузашта аз ин, бо ёрии плагинҳо, мо метавонем макросҳои худро эълон кунем, аммо ин як ҳикояи дигар аст.
Илова ба чизҳои пешакӣ муайяншуда, мо метавонем арзишҳои тағирёбандаҳои худро иваз кунем (ман инро аллакай дар коди боло истифода кардам). биёед дар Admin/Variables
чанд чиз:
Ҳама чизеро, ки шумо метавонед истифода баред:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
Арзиш метавонад скаляр бошад ё он метавонад JSON бошад. Дар сурати JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}
танҳо роҳро ба калиди дилхоҳ истифода баред: {{ var.json.bot_config.bot.token }}
.
Ман аслан як калима мегӯям ва як скриншотро дар бораи он нишон медиҳам пайвастагиҳо. Дар ин ҷо ҳама чиз ибтидоӣ аст: дар саҳифа Admin/Connections
мо пайваст эҷод мекунем, логинҳо / паролҳо ва параметрҳои мушаххаси худро дар он ҷо илова мекунем. Монанди ин:
Паролҳоро метавон рамзгузорӣ кард (назар ба пешфарз амиқтар) ё шумо метавонед намуди пайвастро тарк кунед (чунон ки ман барои он кардам tg_main
) - далел дар он аст, ки рӯйхати намудҳо дар моделҳои Airflow пайваст карда шудаанд ва онҳоро бидуни ворид шудан ба кодҳои сарчашма васеъ кардан мумкин нест (агар ногаҳон ман чизеро дар Google нагирифта бошам, лутфан маро ислоҳ кунед), аммо ҳеҷ чиз моро аз гирифтани кредит бозмедорад. ном.
Шумо инчунин метавонед якчанд пайвастҳоро бо як ном созед: дар ин ҳолат, усул BaseHook.get_connection()
, ки ба мо пайвандҳо бо номи меорад, хоҳад дод тасодуфӣ аз якчанд номҳо (сохтани Round Robin мантиқтар мебуд, аммо биёед онро ба виҷдони таҳиягарони Airflow гузорем).
Тағйирёбандаҳо ва Пайвастшавӣ албатта абзорҳои олӣ мебошанд, аммо муҳим аст, ки мувозинатро аз даст надиҳед: кадом қисмҳои ҷараёнҳои худро шумо дар худи код нигоҳ медоред ва кадом қисмҳоро ба Airflow барои нигоҳдорӣ медиҳед. Аз як тараф, метавонад ба зудӣ тағир додани арзиш, масалан, қуттии почтаи электронӣ тавассути UI қулай бошад. Аз тарафи дигар, ин ҳанӯз бозгашт ба клики муш аст, ки мо (ман) аз он халос шудан мехостем.
Кор бо пайвандҳо яке аз вазифаҳост қалмоқҳо. Умуман, қалмоқҳои Airflow нуқтаҳои барои пайваст кардани он ба хидматҳо ва китобхонаҳои тарафи сеюм мебошанд. Масалан, JiraHook
барои муошират бо Jira муштарӣ мекушояд (шумо метавонед вазифаҳоро ба пеш ва пас интиқол диҳед) ва бо ёрии SambaHook
шумо метавонед файли маҳаллиро ба smb
-нуқта.
Таҳлили оператори фармоишӣ
Ва мо наздик шудем, ки бубинем, ки он чӣ гуна сохта шудааст TelegramBotSendMessage
рамз commons/operators.py
бо оператори воқеӣ:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Дар ин ҷо, мисли ҳама чизи дигар дар Airflow, ҳама чиз хеле содда аст:
- Аз мерос гирифта шудааст
BaseOperator
, ки як қатор чизҳои мушаххаси ҷараёни ҳаворо амалӣ мекунад (ба вақти истироҳати худ нигаред) - Майдонҳои эълоншуда
template_fields
, ки дар он Ҷинжа барои коркарди макросҳо ҷустуҷӯ хоҳад кард. - Далелҳои дурустро барои
__init__()
, дар ҷои зарурӣ пешфарзҳоро муқаррар кунед. - Мо дар бораи ибтидои ниёгон низ фаромӯш накардаем.
- Қуттии мувофиқро кушод
TelegramBotHook
аз он объекти муштариро гирифт. - Усули аз нав муайяншуда (аз нав муайяншуда).
BaseOperator.execute()
, ки Airfow вақте ки вақти ба кор андохтани оператор фаро мерасад, қаҳва мекунад - дар он мо амали асосиро иҷро карда, ворид шуданро фаромӯш мекунем. (Дар омади гап, мо ворид мешавемstdout
иstderr
- Ҷараёни ҳаво ҳама чизро дарбар мегирад, онро зебо печонед ва дар ҷои зарурӣ онро таҷзия мекунад.)
Биёед бубинем, ки мо чӣ дорем commons/hooks.py
. Қисми якуми файл бо худи қалмоқ:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Ман ҳатто намедонам, ки дар ин ҷо чӣ шарҳ диҳам, ман танҳо нуктаҳои муҳимро қайд мекунам:
- Мо мерос мегирем, дар бораи далелҳо фикр мекунем - дар аксари ҳолатҳо он як хоҳад буд:
conn_id
; - Бартарии усулҳои стандартӣ: Ман худамро маҳдуд кардам
get_conn()
, ки дар он ман параметрҳои пайвастшавӣ аз рӯи ном ва танҳо ба даст фаслиextra
(ин майдони JSON аст), ки дар он ман (тибқи дастурҳои худам!) аломати боти Telegram -ро гузоштам:{"bot_token": "YOuRAwEsomeBOtToKen"}
. - Ман як мисоли моро эҷод мекунам
TelegramBot
, додани он аломати мушаххас.
Ҳамааш ҳамин. Шумо метавонед муштариро аз қалмоқ истифода баред TelegramBotHook().clent
ё TelegramBotHook().get_conn()
.
Ва қисми дуюми файл, ки дар он ман барои API Telegram REST микропрепарат месозам, то ҳамон чизеро кашола накунам. python-telegram-bot
sendMessage
.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))
Роҳи дуруст ин аст, ки ҳама чизро илова кунед:
TelegramBotSendMessage
,TelegramBotHook
,TelegramBot
- дар плагин, дар як анбори ҷамъиятӣ ҷойгир кунед ва онро ба кушодаасос диҳед.
Ҳангоме ки мо ин ҳамаро меомӯзем, навсозии гузориши мо муяссар шуд, ки бомуваффақият ноком шаванд ва дар канал ба ман паёми хато фиристанд. Ман меравам тафтиш кунам, ки оё ин нодуруст аст ...
Дар доги мо чизе шикаст! Оё ин чизе нест, ки мо интизор будем? Айнан!
Шумо рехтед?
Оё шумо ҳис мекунед, ки ман чизеро аз даст додаам? Чунин ба назар мерасад, ки вай ваъда дода буд, ки маълумотро аз SQL Server ба Vertica интиқол медиҳад ва сипас онро гирифта, аз мавзӯъ дур шуд, харом!
Ин ваҳшиёна қасдан буд, ман маҷбур будам, ки барои шумо баъзе истилоҳотро ифшо кунам. Акнун шумо метавонед бештар равед.
Нақшаи мо чунин буд:
- Кунед
- Эҷоди вазифаҳо
- Бубинед, ки ҳама чиз чӣ қадар зебост
- Рақамҳои сессияро барои пур кардан таъин кунед
- Маълумотро аз SQL Server гиред
- Маълумотро ба Vertica гузоред
- Ҷамъоварии омор
Ҳамин тавр, барои ба кор даровардани ин ҳама, ман ба мо як каме илова кардам docker-compose.yml
:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.py
Дар он ҷо мо баланд мекунем:
- Vertica ҳамчун мизбон
dwh
бо танзимоти пешфарзтарин, - се мисоли SQL Server,
- мо пойгоҳи додаҳои охиринро бо баъзе маълумотҳо пур мекунем (ба ҳеҷ ваҷҳ ба
mssql_init.py
!)
Мо ҳама чизҳои хубро бо ёрии фармони каме мураккабтар аз дафъаи гузашта оғоз мекунем:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Он чизе ки рандомизатори мӯъҷизаи мо тавлид кардааст, шумо метавонед ашёро истифода баред Data Profiling/Ad Hoc Query
:
Муҳим он аст, ки онро ба таҳлилгарон нишон надиҳед
муфассалтар кунед Сеансҳои ETL Ман намехоҳам, дар он ҷо ҳама чиз ночиз аст: мо пойгоҳ месозем, дар он нишона ҳаст, мо ҳама чизро бо менеҷери контекст мепӯшем ва ҳоло ин корро мекунем:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Вакти он расидааст маълумоти моро ҷамъ кунед аз якунимсад мизи мо. Биёед инро бо ёрии хатҳои хеле содда иҷро кунем:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
- Бо ёрии қалмоқе, ки мо аз Airflow мегирем
pymssql
-пайваст кунед - Биёед маҳдудиятро дар шакли сана дар дархост иваз кунем - он аз ҷониби муҳаррики шаблон ба функсия партофта мешавад.
- Дархости мо
pandas
кй моро мегирадDataFrame
— дар оянда ба мо фоиданок мешавад.
Ман ивазкуниро истифода мебарам
{dt}
ба ҷои параметри дархост%s
на аз он сабаб, ки ман Пиноккио бад ҳастам, балки аз он сабабpandas
аз ӯҳдаи худ гирифта наметавонадpymssql
ва охиринашро мепартоядparams: List
гарчанде ки вай дар хакикат мехохадtuple
.
Инчунин қайд кунед, ки таҳиякунандаpymssql
тасмим гирифт, ки дигар ӯро дастгирӣ накунад ва вақти он расидааст, ки берун раведpyodbc
.
Биёед бубинем, ки Airflow далелҳои функсияҳои моро бо чӣ пур кардааст:
Агар ягон маълумот мавҷуд набошад, пас идома додан маъно надорад. Аммо ин ҳам аҷиб аст, ки пур кардани онро бомуваффақият ҳисоб кунед. Аммо ин хато нест. А-а-а, чй бояд кард?! Ва ин аст он чизе:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException
ба Airflow мегӯяд, ки ҳеҷ гуна хатогӣ вуҷуд надорад, аммо мо супоришро мегузаронем. Интерфейс майдони сабз ё сурх надорад, аммо гулобӣ.
Биёед маълумоти худро партоем сутунҳои сершумор:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
Яъне:
- Махзани маълумоте, ки мо аз он фармоиш гирифтаем,
- ID-и сессияи обхезии мо (он гуногун хоҳад буд барои ҳар як вазифа),
- Хеш аз манбаъ ва ID фармоиш - ба тавре ки дар базаи ниҳоӣ (дар он ҷо ҳама чиз ба як ҷадвал рехта мешавад) мо ID-и фармоишии беназир дорем.
Қадами охирин боқӣ мемонад: ҳама чизро ба Vertica резед. Ва, аҷиб аст, ки яке аз роҳҳои ҷолибтарин ва самараноки ин тавассути CSV аст!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- Мо приёмникхои махсус тайёр карда истодаем
StringIO
. pandas
моро бо эхтиром мегузорадDataFrame
дар шаклиCSV
-хатҳо.- Биёед бо қалмоқ ба Vertica дӯстдоштаамон пайваст шавем.
- Ва ҳоло бо кӯмаки
copy()
маълумоти моро бевосита ба Вертика фиристед!
Мо аз ронанда чанд сатр пур кардашударо мегирем ва ба мудири сеанс мегӯем, ки ҳамааш хуб аст:
session.loaded_rows = cursor.rowcount
session.successful = True
Ҳамааш ҳамин.
Дар фурӯш, мо лавҳаи ҳадафро дастӣ эҷод мекунем. Дар ин ҷо ман ба худ як мошини хурд иҷозат додам:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
Ман истифода мебарам
VerticaOperator()
Ман схемаи пойгоҳи додаҳо ва ҷадвал эҷод мекунам (агар онҳо аллакай вуҷуд надошта бошанд, албатта). Хӯроки асосии он аст, ки дуруст ба танзим даровардани вобастагӣ:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> load
Натиҷа
— Хайр, — гуфт муши хурдакак, — не, холо
Оё шумо боварӣ доред, ки ман даҳшатноктарин ҳайвони ҷангал ҳастам?
Ҷулия Доналдсон, The Gruffalo
Ман фикр мекунам, ки агар ман ва ҳамкасбони ман рақобат дошта бошем: кӣ раванди ETL-ро аз сифр зуд эҷод мекунад ва оғоз мекунад: онҳо бо SSIS ва муш ва ман бо Airflow ... Ва он гоҳ мо инчунин осонии нигоҳубинро муқоиса мекардем ... Вой, ман фикр мекунам, ки шумо розӣ мешавед, ки ман онҳоро дар ҳама ҷабҳаҳо мезанам!
Агар каме ҷиддӣтар бошад, пас Apache Airflow - бо тавсифи равандҳо дар шакли коди барнома - кори маро иҷро кард хеле зиёд бароҳаттар ва лаззатбахштар.
Тавсеаи номаҳдуди он ҳам аз ҷиҳати плагинҳо ва ҳам майл ба миқёспазирӣ ба шумо имкон медиҳад, ки Airflow-ро тақрибан дар ҳама соҳаҳо истифода баред: ҳатто дар давраи пурраи ҷамъоварӣ, омодасозӣ ва коркарди маълумот, ҳатто ҳангоми сар додани мушакҳо (ба Миррих, курс).
Қисми ниҳоӣ, маълумотнома ва маълумот
Раке, ки мо барои шумо ҷамъ овардаем
start_date
. Бале, ин аллакай як хотираи маҳаллӣ аст. Тавассути далели асосии Дугstart_date
ҳама мегузарад. Хулоса, агар шумо дарstart_date
санаи ҷорӣ ваschedule_interval
— як руз, баъд ДАГ пагох на пештар сар мешавад.start_date = datetime(2020, 7, 7, 0, 1, 2)
Ва дигар мушкилот нест.
Боз як хатои вақти корӣ бо он алоқаманд аст:
Task is missing the start_date parameter
, ки аксар вақт нишон медиҳад, ки шумо пайваст кардани операторро фаромӯш кардаед.- Ҳама дар як мошин. Бале, ва пойгоҳҳо (худи Airflow ва қабати мо) ва веб-сервер, нақшакаш ва коргарон. Ва он ҳатто кор мекард. Аммо бо гузашти вақт, шумораи вазифаҳо барои хидматҳо афзоиш ёфт ва вақте ки PostgreSQL ба индекс дар 20 сония ба ҷои 5 мс посух дод, мо онро гирифта бурдем.
- Иҷрокунандаи маҳаллӣ. Бале, мо хануз дар болои он нишастаем ва аллакай ба лаби варта расидаем. LocalExecutor то ҳол барои мо кофӣ буд, аммо ҳоло вақти он расидааст, ки ҳадди аққал як коргарро васеъ кунем ва мо бояд сахт меҳнат кунем, то ба CeleryExecutor гузарем. Ва бо назардошти он, ки шумо метавонед бо он дар як мошин кор кунед, ҳеҷ чиз шуморо аз истифодаи Celery ҳатто дар сервер, ки "албатта, ҳеҷ гоҳ ба истеҳсолот ворид намешавад, ростқавлона!"
- Истифодаи ғайриманқул асбобҳои дарунсохт:
- Пайвастшавӣ барои нигоҳ доштани маълумоти хидматӣ,
- Садақаҳои SLA ҷавоб додан ба вазифаҳое, ки сари вақт иҷро нашуданд,
- xcom барои мубодилаи метамаълумот (ман гуфтам ҳадафмаълумот!) байни вазифахои даг.
- сӯиистифода аз почта. Хуб, ман чӣ гуфта метавонам? Огоҳӣ барои ҳама такрори вазифаҳои афтода муқаррар карда шуд. Ҳоло дар Gmail кори ман зиёда аз 90 ҳазор мактубҳои Airflow дорад ва мозаи почтаи интернетӣ аз гирифтан ва нест кардани зиёда аз 100 дар як вақт худдорӣ мекунад.
Мушкилоти бештар:
Pitfails Airflow Apache
Воситаҳои автоматикунонии бештар
Барои он ки мо бо сари худ бештар кор кунем, на бо дастҳоямон, Airflow барои мо инро омода кардааст:
оромии API — то хол макоми Эксперименталиро дорад, ки ин ба кор халал намерасонад. Бо он, шумо метавонед на танҳо дар бораи дагҳо ва вазифаҳо маълумот гиред, балки инчунин дагро қатъ/оғоз кунед, DAG Run ё ҳавз эҷод кунед.CLI - асбобҳои зиёде тавассути сатри фармон дастрасанд, ки истифодаи онҳо тавассути WebUI на танҳо нороҳат аст, балки умуман вуҷуд надоранд. Барои намуна:backfill
барои аз нав оғоз кардани мисолҳои вазифа лозим аст.
Масалан, тахлилгарон омада гуфтанд: «Ва шумо, рафикон, дар маълумотхои аз 1 то 13 январь сафсатае доред! Ислоҳ кунед, ислоҳ кунед, ислоҳ кунед, ислоҳ кунед! ” Ва шумо чунин як ошпаз ҳастед:airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- Хадамоти базавӣ:
initdb
,resetdb
,upgradedb
,checkdb
. run
, ки ба шумо имкон медиҳад, ки як вазифаи мисолиро иҷро кунед ва ҳатто дар ҳама вобастагӣ холҳоро ба даст оред. Илова бар ин, шумо метавонед онро тавассутиLocalExecutor
, ҳатто агар шумо кластери карафс дошта бошед.- Қариб ҳамон корро мекунад
test
, танҳо дар базаҳо чизе наменависад. connections
имкон медихад, ки аз пнёх ба таври оммавй алокахо барпо карда шаванд.
API Python - як роҳи хеле сахти мутақобила, ки барои плагинҳо пешбинӣ шудааст ва дар он бо дастони хурд гирд намеояд. Аммо кй моро аз рафтан бозмедорад/home/airflow/dags
, давиданipython
ва сар задан дар атрофи? Шумо метавонед, масалан, ҳама пайвастҳоро бо рамзи зерин содир кунед:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d)
- Пайвастшавӣ ба метабазаи Airflow. Ман ба он навиштанро тавсия намедиҳам, аммо гирифтани ҳолати вазифаҳо барои ченакҳои мушаххас метавонад нисбат ба истифодаи ҳама гуна APIҳо хеле зудтар ва осонтар бошад.
Биёед бигӯем, ки на ҳама вазифаҳои мо беэътиноӣ мекунанд, аммо онҳо баъзан метавонанд афтоданд ва ин муқаррарӣ аст. Аммо якчанд блокҳо аллакай шубҳаноканд ва тафтиш кардан лозим аст.
SQL-ро эҳтиёт кунед!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
мурожиат
Ва албатта, даҳ истиноди аввал аз нашри Google мундариҷаи ҷузвдони Airflow аз хатчӯбҳои ман мебошанд.
Ҳуҷҷатҳои ҷараёни ҳавоии Apache — Албатта, мо бояд аз идора сар кунем. ҳуҷҷатҳо, аммо кй дастурҳоро мехонад?Беҳтарин таҷрибаҳо — Хайр, акаллан тавсияхои эчодкоронро хонед.UI Airflow - ибтидо: интерфейси корбар дар тасвирҳоФаҳмидани мафҳумҳои асосии Apache Airflow - мафҳумҳои асосӣ хуб тавсиф шудаанд, агар (ногаҳон!) Шумо аз ман чизе нафаҳмидед.Блоги Тианлонг - Роҳнамо дар бораи чӣ гуна сохтани сервер/кластери ҷараёни ҳаво - дастури кӯтоҳ барои таъсиси кластери Airflow.Иҷрои ҷараёни ҳавоии Apache дар Lyft — кариб хамон маколаи шавковар, ба чуз шояд формализм бештар ва мисолхои кам.Чӣ тавр Apache Airflow корҳоро дар коргарони карафс тақсим мекунад — дар бораи кори якчоя бо Кельдерей.Таҷрибаҳои беҳтарини навиштани DAG дар Apache Airflow - дар бораи номутаносибии вазифаҳо, боркунӣ аз рӯи ID ба ҷои сана, табдил, сохтори файл ва дигар чизҳои ҷолиб.Идоракунии вобастагӣ дар Apache Airflow - вобастагии вазифаҳо ва Қоидаи триггер, ки ман танҳо дар гузашта зикр кардам.Ҷараёни ҳаво: Вақте ки DAG шумо аз ҷадвал дур аст - чӣ гуна бартараф кардани баъзе "корҳо" -ро дар нақшакаш, бор кардани маълумоти гумшуда ва афзалият додани вазифаҳо.Дархостҳои муфиди SQL барои Apache Airflow - дархостҳои муфиди SQL ба метамаълумоти Airflow.Ба таҳияи ҷараёнҳои корӣ бо Apache Airflow оғоз кунед - як бахши муфид дар бораи эҷоди сенсори фармоишӣ вуҷуд дорад.Сохтани Fetchr Data Science Infra дар AWS бо Presto ва Airflow - ёддошти кӯтоҳи ҷолиб дар бораи сохтани инфрасохтор дар AWS for Data Science.7 Хатоҳои умумӣ барои тафтиш ҳангоми ислоҳи DAGs ҷараёни ҳаво - хатогиҳои умумӣ (вақте ки касе то ҳол дастурҳоро нахондааст).Бо истифода аз Apache Airflow паролро нигоҳ доред ва дастрас кунед - табассум кунед, ки чӣ тавр одамон гузарвожаҳоро нигоҳ медоранд, гарчанде ки шумо метавонед танҳо Пайвасткунакҳоро истифода баред.Зен аз Python ва Apache Airflow - интиқоли номуайяни DAG, интиқоли контекст дар функсияҳо, боз дар бораи вобастагӣ ва инчунин дар бораи гузариш аз оғози кор.Ҷараёни ҳаво: Маслиҳатҳо, ҳилаҳо ва таҷрибаҳои камтар маълум - дар бораи истифодаdefault arguments
иparams
дар қолабҳо, инчунин Тағирёбандаҳо ва Пайвастшавӣ.Профили банақшагирии ҷараёни ҳаво - ҳикоя дар бораи чӣ гуна банақшагири ба Airflow 2.0 омодагӣ мегирад.Apache Airflow бо 3 коргари карафс дар docker-compose - мақолаи каме кӯҳнашуда дар бораи ҷойгиркунии кластери мо дарdocker-compose
.4 Шаблонсозии вазифаҳо бо истифода аз контексти ҷараёни ҳаво - вазифаҳои динамикӣ бо истифода аз қолабҳо ва интиқоли контекст.Огоҳиҳои хатогӣ дар ҷараёни ҳаво — огоҳиномаҳои стандартӣ ва фармоишӣ тавассути почта ва Slack.Семинари ҷараёни ҳаво: DAG-ҳои мураккаб бидуни асобаҳо - Вазифаҳои филиалҳо, макросҳо ва XCom.
Ва истинодҳое, ки дар мақола истифода шудаанд:
истинод ба макро - ҷойнишинҳое, ки барои истифода дар қолабҳо дастрасанд.Мушкилоти умумӣ - ҷараёни ҳаво — Хатохои маъмул хангоми сохтани дагхо.puckel / docker-hava: Docker Apache Airflow -docker-compose
барои таҷриба, ислоҳ ва ғайра.python-telegram-bot/python-telegram-bot: Мо ба шумо як парпеч сохтем, ки шумо рад карда наметавонед — Сарпӯши Python барои API Telegram REST.
Манбаъ: will.com