Hi, jien Dmitry Logvinenko - Inġinier tad-Data tad-Dipartiment tal-Analitika tal-grupp ta 'kumpaniji Vezet.
Jien ngħidlek dwar għodda mill-isbaħ għall-iżvilupp ta 'proċessi ETL - Apache Airflow. Iżda Airflow huwa tant versatili u multidimensjonali li għandek tagħti ħarsa aktar mill-qrib lejha anki jekk m'intix involut fi flussi tad-dejta, iżda jkollok bżonn li perjodikament tniedi kwalunkwe proċess u tissorvelja l-eżekuzzjoni tagħhom.
U iva, mhux biss ngħid, imma nuri wkoll: il-programm għandu ħafna kodiċi, screenshots u rakkomandazzjonijiet.

Dak li s-soltu tara meta tpoġġi fuq Google l-kelma Airflow / Wikimedia Commons
Tabella tal-kontenut
Introduzzjoni
Apache Airflow huwa bħal Django:
- miktuba bil-python
- hemm panel amministrattiv kbir,
- tespandi indefinittivament
- aħjar biss, u sar għal skopijiet kompletament differenti, jiġifieri (kif miktub qabel il-kata):
- it-tmexxija u l-monitoraġġ tal-kompiti fuq numru illimitat ta 'magni (kif ħafna Karfus / Kubernetes u l-kuxjenza tiegħek jippermettulek)
- bil-ġenerazzjoni tal-fluss tax-xogħol dinamiku minn faċli ħafna biex tikteb u tifhem il-kodiċi Python
- u l-abbiltà li tikkonnettja kwalunkwe databases u APIs ma 'xulxin billi tuża kemm komponenti lesti kif ukoll plugins magħmulin mid-dar (li huwa estremament sempliċi).
Aħna nużaw Apache Airflow bħal dan:
- aħna niġbru data minn sorsi varji (ħafna istanzi ta 'SQL Server u PostgreSQL, diversi APIs b'metriċi ta' applikazzjoni, anke 1C) f'DWH u ODS (għandna Vertica u Clickhouse).
- kemm avvanzat
cron, li jibda l-proċessi ta' konsolidazzjoni tad-dejta fuq l-ODS, u jimmonitorja wkoll il-manutenzjoni tagħhom.
Sa ftit ilu, il-bżonnijiet tagħna kienu koperti minn server żgħir wieħed bi 32 core u 50 GB RAM. Fl-Airflow, dan jaħdem:
- aktar 200 dag (fil-fatt flussi tax-xogħol, li fihom nimtlew kompiti),
- f'kull medja 70 biċċa xogħol,
- din it-tjubija tibda (ukoll bħala medja) darba fis-siegħa.
U dwar kif espanna, se nikteb hawn taħt, imma issa ejja niddefinixxu l-über-problema li se nsolvu:
Hemm tliet SQL Servers oriġinali, kull wieħed b'50 database - każijiet ta 'proġett wieħed, rispettivament, għandhom l-istess struttura (kważi kullimkien, mua-ha-ha), li jfisser li kull wieħed għandu tabella tal-Ordnijiet (fortunatament, tabella b'dik isem jista 'jiġi push fi kwalunkwe negozju). Aħna nieħdu d-dejta billi nżidu oqsma tas-servizz (server tas-sors, database tas-sors, ID tal-kompitu ETL) u b'mod naiv nitfgħuhom, ngħidu aħna, Vertica.
Ejja ħa mmorru!
Il-parti prinċipali, prattika (u ftit teoretika)
Għaliex aħna (u int)
Meta s-siġar kienu kbar u kont sempliċi SQL-schik f'bejgħ bl-imnut Russu wieħed, aħna scammed proċessi ETL aka flussi tad-dejta bl-użu ta 'żewġ għodod disponibbli għalina:
- Ċentru tal-Enerġija tal-Informatika - sistema li tinfirex ħafna, estremament produttiva, bil-ħardwer tagħha stess, il-verżjoni tagħha stess. Jien użajt Alla jipprojbixxi 1% tal-kapaċitajiet tiegħu. Għaliex? Ukoll, l-ewwelnett, din l-interface, x'imkien mis-snin 380, mentalment poġġiet pressjoni fuqna. It-tieni nett, dan il-kontraption huwa ddisinjat għal proċessi estremament fancy, użu mill-ġdid ta 'komponenti furious u tricks oħra importanti ħafna ta' intrapriża. Dwar il-fatt li tiswa, bħall-ġwienaħ tal-Airbus AXNUMX / sena, mhux se ngħidu xejn.
Oqgħod attent, screenshot jista 'jweġġa' ftit nies taħt it-30

- SQL Server Integrazzjoni Server - użajna dan il-kompa fil-flussi intra-proġett tagħna. Ukoll, fil-fatt: aħna diġà nużaw SQL Server, u jkun b'xi mod irraġonevoli li ma tużax l-għodod ETL tagħha. Kollox fih huwa tajjeb: kemm l-interface hija sabiħa, kif ukoll ir-rapporti tal-progress ... Iżda dan mhux għaliex inħobbu l-prodotti tas-softwer, oh, mhux għal dan. Verżjoni dan
dtsx(li huwa XML b'nodes shuffled fuq issalva) nistgħu, imma x'inhu l-punt? Kif dwar li tagħmel pakkett ta 'kompitu li se jkaxkru mijiet ta' tabelli minn server għal ieħor? Iva, liema mija, subgħajk l-indiċi se taqa 'minn għoxrin biċċa, tikklikkja fuq il-buttuna tal-maws. Imma żgur tidher aktar moda:
Żgur li fittixna modi ta’ ħruġ. Kawża anke kważi wasal għal ġeneratur tal-pakkett SSIS li nkiteb waħdu...
…u mbagħad sabni impjieg ġdid. U Apache Airflow qabeżni fuqha.
Meta sibt li d-deskrizzjonijiet tal-proċess ETL huma kodiċi Python sempliċi, jien biss ma żfinx għall-ferħ. Dan huwa kif il-flussi tad-dejta ġew verżjoni u differenti, u t-tferrigħ ta 'tabelli bi struttura waħda minn mijiet ta' databases f'mira waħda saret kwistjoni ta 'kodiċi Python fi skrins wieħed u nofs jew żewġ 13 ".
Assemblaġġ tal-cluster
Ejja ma nirranġawx kindergarten kompletament, u ma nitkellmux dwar affarijiet kompletament ovvji hawn, bħall-installazzjoni tal-Airflow, id-database magħżula tiegħek, Karfus u każijiet oħra deskritti fil-baċiri.
Sabiex inkunu nistgħu immedjatament nibdew esperimenti, abbozzajt docker-compose.yml F'liema:
- Ejja fil-fatt ngħollu Fluss tal-arja: Scheduler, Webserver. Fjura se tkun qed iddur hemm ukoll biex tissorvelja l-kompiti tal-karfus (għax diġà ġiet imbuttata fiha
apache/airflow:1.10.10-python3.7, imma ma niddejqux) - PostgreSQL, li fiha Airflow se tikteb l-informazzjoni tas-servizz tagħha (dejta tal-iskeduler, statistika tal-eżekuzzjoni, eċċ.), u Karfus se jimmarka l-kompiti lesti;
- Ddistribwit mill-, li se jaġixxi bħala sensar tal-kompitu għal Karfus;
- Ħaddiem tal-karfus, li se jkunu involuti fl-eżekuzzjoni diretta tal-kompiti.
- Biex folder
./dagsse nżidu l-fajls tagħna bid-deskrizzjoni tad-dags. Dawn se jinġabru fuq il-fly, u għalhekk m'hemmx għalfejn juggle mal-munzell kollu wara kull għatis.
F'xi postijiet, il-kodiċi fl-eżempji ma jintwerax kompletament (sabiex ma jħarrekx it-test), iżda x'imkien jiġi modifikat fil-proċess. Eżempji kompluti tal-kodiċi tax-xogħol jistgħu jinstabu fir-repożitorju .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNoti:
- Fl-assemblaġġ tal-kompożizzjoni, kont invokat l-aktar fuq l-immaġni magħrufa - kun żgur li tiċċekkjaha. Forsi m’għandek bżonn xi ħaġa oħra f’ħajtek.
- Is-settings kollha Airflow huma disponibbli mhux biss permezz
airflow.cfg, iżda wkoll permezz ta 'varjabbli ambjentali (grazzi għall-iżviluppaturi), li b'mod malizzjuż ħadt vantaġġ minnhom. - Naturalment, mhuwiex lest għall-produzzjoni: ma poġġiex deliberatament taħbit tal-qalb fuq kontenituri, ma ddejjaqtx bis-sigurtà. Imma għamilt il-minimu adattat għall-esperimentaturi tagħna.
- Innota li:
- Il-folder tad-dag għandu jkun aċċessibbli kemm għall-iskedatur kif ukoll għall-ħaddiema.
- L-istess japplika għal-libreriji kollha ta 'partijiet terzi - għandhom kollha jiġu installati fuq magni bi scheduler u ħaddiema.
Ukoll, issa huwa sempliċi:
$ docker-compose up --scale worker=3Wara li kollox jogħla, tista 'tħares lejn l-interfaces tal-web:
- Fluss ta 'l-arja:
- Fjura:
Kunċetti bażiċi
Jekk ma fhimt xejn f’dawn id-“dags”, allura hawn dizzjunarju qasir:
- skedar - l-iktar ziju importanti fl-Airflow, li jikkontrolla li r-robots jaħdmu iebes, u mhux persuna: jimmonitorja l-iskeda, jaġġorna d-dags, iniedi l-kompiti.
B'mod ġenerali, f'verżjonijiet anzjani, kellu problemi bil-memorja (le, mhux amnesija, iżda tnixxijiet) u l-parametru tal-legat saħansitra baqa 'fil-konfigurazzjonijiet
run_duration— l-intervall mill-ġdid tiegħu. Imma issa kollox tajjeb. - DAG (magħruf ukoll bħala "dag") - "graff aċikliku dirett", iżda definizzjoni bħal din tgħid ftit nies, iżda fil-fatt hija kontenitur għal kompiti li jinteraġixxu ma 'xulxin (ara hawn taħt) jew analogu ta' Pakkett f'SSIS u Workflow f'Informatica .
Minbarra d-dags, xorta jista 'jkun hemm subdags, iżda x'aktarx mhux se naslu għalihom.
- DAG Run - dag inizjalizzat, li huwa assenjat tiegħu stess
execution_date. Dagrans ta 'l-istess dag jistgħu jaħdmu b'mod parallel (jekk għamilt il-kompiti tiegħek idempotent, ovvjament). - operatur huma biċċiet ta' kodiċi responsabbli għat-twettiq ta' azzjoni speċifika. Hemm tliet tipi ta’ operaturi:
- azzjonibħall-favorit tagħna
PythonOperator, li jista' jesegwixxi kwalunkwe kodiċi Python (validu); - trasferiment, li jittrasportaw data minn post għal post, ngħidu aħna,
MsSqlToHiveTransfer; - senser min-naħa l-oħra, se jippermettilek tirreaġixxi jew tnaqqas il-mod l-eżekuzzjoni ulterjuri tad-dag sakemm iseħħ avveniment.
HttpSensorjista 'jiġbed l-endpoint speċifikat, u meta r-rispons mixtieq ikun qed jistenna, ibda t-trasferimentGoogleCloudStorageToS3Operator. Moħħ kurżjuż jistaqsi: “għaliex? Wara kollox, tista’ tagħmel repetizzjonijiet eżatt fl-operatur!” U mbagħad, sabiex ma jinstaddux il-ġabra ta 'kompiti ma' operaturi sospiżi. Is-sensor jibda, jiċċekkja u jmut qabel l-attentat li jmiss.
- azzjonibħall-favorit tagħna
- Segretarjali u klerikali. - operaturi ddikjarati, irrispettivament mit-tip, u mehmuża mad-dag jiġu promossi għall-grad ta' kompitu.
- eżempju tal-kompitu - meta l-pjanifikatur ġenerali ddeċieda li kien wasal iż-żmien li jintbagħtu l-kompiti fil-battalja fuq l-artisti-ħaddiema (eżatt fuq il-post, jekk nużaw
LocalExecutorjew għal node remot fil-każ taCeleryExecutor), jassenja lilhom kuntest (jiġifieri, sett ta 'varjabbli - parametri ta' eżekuzzjoni), jespandi mudelli ta 'kmand jew mistoqsija, u jiġborhom.
Aħna niġġeneraw kompiti
L-ewwel, ejja tiddeskrivi l-iskema ġenerali ta 'doug tagħna, u mbagħad aħna se adsa fid-dettalji aktar u aktar, minħabba li napplikaw xi soluzzjonijiet mhux trivjali.
Allura, fil-forma l-aktar sempliċi tagħha, tali dag se tidher bħal din:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Ejja nsemmu:
- L-ewwel, aħna jimportaw il-libs meħtieġa u xi haga ohra;
sql_server_ds- dan huList[namedtuple[str, str]]bl-ismijiet tal-konnessjonijiet minn Airflow Connections u d-databases li minnhom se nieħdu l-platt tagħna;dag- it-tħabbira tad-dag tagħna, li bilfors irid ikun figlobals(), inkella Airflow ma ssibhiex. Doug jeħtieġ ukoll jgħid:- X'jismu
orders- dan l-isem imbagħad jidher fl-interface tal-web, - li se jaħdem minn nofs il-lejl fit-tmienja ta’ Lulju,
- u għandha taħdem, bejn wieħed u ieħor kull 6 sigħat (għal guys iebsa hawn minflok
timedelta()permissibblicron-linja0 0 0/6 ? * * *, għall-inqas jibred - espressjoni simili@daily);
- X'jismu
workflow()se tagħmel ix-xogħol ewlieni, iżda mhux issa. Għalissa, aħna ser biss dump kuntest tagħna fil-log.- U issa l-maġija sempliċi tal-ħolqien tal-kompiti:
- aħna għaddejjin minn sorsi tagħna;
- initialize
PythonOperator, li se jesegwixxi l-manikin tagħnaworkflow(). Tinsiex li tispeċifika isem uniku (fi ħdan id-dag) tal-kompitu u torbot id-dag innifsu. Bandieraprovide_contextimbagħad, se pour argumenti addizzjonali fil-funzjoni, li aħna se jiġbru bir-reqqa bl-użu**context.
Għalissa, dak kollu. Dak li ksibna:
- dag ġdid fl-interface tal-web,
- mitt u nofs xogħol li se jiġu esegwiti b'mod parallel (jekk il-Fluss tal-Ajru, is-settings tal-Kfus u l-kapaċità tas-server jippermettu dan).
Ukoll, kważi ltqajna.

Min se jinstalla d-dipendenzi?
Biex tissimplifika din il-ħaġa kollha, daħħalt docker-compose.yml ipproċessar requirements.txt fuq in-nodi kollha.
Issa marret:

Il-kwadri griżi huma każijiet ta' xogħol ipproċessati mill-iskedar.
Nistennew ftit, il-kompiti jinqabdu mill-ħaddiema:

Dawk ħodor, ovvjament, temmew b'suċċess ix-xogħol tagħhom. Il-Ħomor ma tantx għandhom suċċess.
Mill-mod, m'hemm l-ebda folder fuq il-prod tagħna
./dags, m'hemm l-ebda sinkronizzazzjoni bejn il-magni - id-dags kollha jinsabugitfuq Gitlab tagħna, u Gitlab CI jiddistribwixxi aġġornamenti lill-magni meta jingħaqdumaster.
Ftit dwar Fjura
Waqt li l-ħaddiema qed jaqtgħu l-paċifikati tagħna, ejja niftakru għodda oħra li tista’ turina xi ħaġa – Fjura.
L-ewwel paġna b'informazzjoni fil-qosor dwar in-nodi tal-ħaddiema:

L-aktar paġna intensa b'kompiti li marru jaħdmu:

L-aktar paġna boring bl-istatus tas-sensar tagħna:

L-isbaħ paġna hija bil-grafiċi tal-istatus tal-kompitu u l-ħin tal-eżekuzzjoni tagħhom:

Aħna tagħbija l-mgħabija
Allura, il-kompiti kollha ħadmu, tista 'twettaq il-midruba.

U kien hemm ħafna midruba - għal xi raġuni jew oħra. Fil-każ tal-użu korrett tal-Airflow, dawn il-kwadri stess jindikaw li d-dejta żgur ma waslitx.
Ikollok bżonn tara l-log u terġa 'tibda l-istanzi tal-kompitu waqgħu.
Billi tikklikkja fuq kwalunkwe kwadru, naraw l-azzjonijiet disponibbli għalina:

Tista 'tieħu u tagħmel ċari l-waqa'. Jiġifieri, ninsew li xi ħaġa falliet hemmhekk, u l-istess kompitu ta 'istanza se jmur għand l-iskedar.

Huwa ċar li tagħmel dan bil-maws bil-kwadri ħomor kollha mhux uman ħafna - dan mhux dak li nistennew mill-Airflow. Naturalment, għandna armi tal-qerda tal-massa: Browse/Task Instances

Ejja nagħżlu kollox f'daqqa u reset għal żero, ikklikkja l-oġġett it-tajjeb:

Wara t-tindif, it-taxis tagħna jidhru bħal dawn (diġà qed jistennew li l-iskedatur jiskedahom):

Konnessjonijiet, ganċijiet u varjabbli oħra
Wasal iż-żmien li nħarsu lejn id-DAG li jmiss, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Kulħadd qatt għamel aġġornament tar-rapport? Din hija għal darb'oħra tagħha: hemm lista ta 'sorsi minn fejn tikseb id-data; hemm lista fejn tpoġġi; ma ninsewx li tfaċċar meta kollox ġara jew inkiser (tajjeb, dan mhux dwarna, le).
Ejja nerġgħu ngħaddu mill-fajl u nħarsu lejn l-affarijiet oskurati ġodda:
from commons.operators import TelegramBotSendMessage- xejn ma jipprevjeni milli nagħmlu l-operaturi tagħna stess, li ħadna vantaġġ minnhom billi għamilna wrapper żgħir biex nibagħtu messaġġi lil Unblocked. (Se nitkellmu aktar dwar dan l-operatur hawn taħt);default_args={}- dag tista' tqassam l-istess argumenti lill-operaturi kollha tagħha;to='{{ var.value.all_the_kings_men }}'- qasamtomhux se jkollna hardcoded, iżda ġenerati b'mod dinamiku bl-użu ta 'Jinja u varjabbli b'lista ta' emails, li nressaq bir-reqqaAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— il-kondizzjoni biex jibda l-operatur. Fil-każ tagħna, l-ittra se ttir lejn il-pumijiet biss jekk id-dipendenzi kollha jkunu ħadmu b'suċċess;tg_bot_conn_id='tg_main'- argumenticonn_idjaċċettaw IDs tal-konnessjoni li noħolqu fihomAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- messaġġi f'Telegram se jtiru biss jekk ikun hemm kompiti waqgħu;task_concurrency=1- nipprojbixxu t-tnedija simultanja ta' diversi każijiet ta' kompitu ta' kompitu wieħed. Inkella, aħna se tikseb it-tnedija simultanja ta 'diversiVerticaOperator(tħares lejn mejda waħda);report_update >> [email, tg]- kollhaVerticaOperatorjikkonverġu biex jibagħtu ittri u messaġġi, bħal dan:

Iżda peress li l-operaturi tan-notifikanti għandhom kundizzjonijiet ta’ tnedija differenti, wieħed biss jaħdem. Fit-Tree View, kollox jidher ftit inqas viżwali:

Se ngħid ftit kliem dwar makro u sħabhom - varjabbli.
Macros huma placeholders Jinja li jistgħu jissostitwixxu informazzjoni utli varji fl-argumenti tal-operatur. Per eżempju, bħal dan:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} se jespandi għall-kontenut tal-varjabbli tal-kuntest execution_date fil-format YYYY-MM-DD: 2020-07-14. L-aħjar parti hija li l-varjabbli tal-kuntest huma nailed għal eżempju ta 'kompitu speċifiku (kwadru fit-Tree View), u meta jerġgħu jibdew, il-placeholders se jespandu għall-istess valuri.
Il-valuri assenjati jistgħu jitqiesu billi tuża l-buttuna Rendered fuq kull istanza tal-kompitu. Dan huwa kif il-kompitu li tibgħat ittra:

U għalhekk fil-kompitu li tibgħat messaġġ:

Lista kompluta ta' macros inkorporati għall-aħħar verżjoni disponibbli hija disponibbli hawn:
Barra minn hekk, bl-għajnuna ta 'plugins, nistgħu niddikjaraw macros tagħna stess, iżda dik hija storja oħra.
Minbarra l-affarijiet predefiniti, nistgħu nissostitwixxu l-valuri tal-varjabbli tagħna (diġà użajt dan fil-kodiċi ta 'hawn fuq). Ejja noħolqu ġewwa Admin/Variables ftit affarijiet:

Dak kollu li tista' tuża:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Il-valur jista 'jkun skalar, jew jista' jkun ukoll JSON. Fil-każ ta' JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}uża biss it-triq għaċ-ċavetta mixtieqa: {{ var.json.bot_config.bot.token }}.
Se litteralment ngħid kelma waħda u nuri screenshot wieħed dwar konnessjonijiet. Kollox huwa elementari hawn: fuq il-paġna Admin/Connections noħolqu konnessjoni, żid il-logins / passwords tagħna u parametri aktar speċifiċi hemmhekk. Bħal dan:

Il-passwords jistgħu jiġu encrypted (aktar bir-reqqa mill-default), jew tista 'tħalli barra t-tip ta' konnessjoni (kif għamilt għal tg_main) - il-fatt hu li l-lista tat-tipi hija wajerjata fil-mudelli Airflow u ma tistax tiġi estiża mingħajr ma tidħol fil-kodiċijiet tas-sors (jekk f'daqqa waħda ma kontx google xi ħaġa, jekk jogħġbok ikkoreġini), iżda xejn ma jwaqqafna milli niksbu krediti biss billi isem.
Tista 'wkoll tagħmel diversi konnessjonijiet bl-istess isem: f'dan il-każ, il-metodu BaseHook.get_connection(), li jġibna konnessjonijiet bl-isem, se jagħti addoċċ minn diversi namesakes (ikun aktar loġiku li tagħmel Round Robin, iżda ejja nħallu fuq il-kuxjenza ta 'l-iżviluppaturi Airflow).
Varjabbli u Konnessjonijiet huma ċertament għodod friski, iżda huwa importanti li ma titlifx il-bilanċ: liema partijiet tal-flussi tiegħek taħżen fil-kodiċi innifsu, u liema partijiet tagħti lil Airflow għall-ħażna. Min-naħa waħda, jista 'jkun konvenjenti li jinbidel malajr il-valur, pereżempju, kaxxa postali, permezz tal-UI. Min-naħa l-oħra, dan għadu ritorn għall-klikk tal-maws, li minnha ridna (jien) neħilsu.
Il-ħidma b'konnessjonijiet hija waħda mill-kompiti ganċijiet. B'mod ġenerali, il-ganċijiet tal-Fluss tal-Ajru huma punti għall-konnessjoni ma 'servizzi u libreriji ta' partijiet terzi. Eż., JiraHook se tiftaħ klijent għalina biex jinteraġixxu ma 'Jira (tista' ċċaqlaq il-kompiti 'l quddiem u 'l quddiem), u bl-għajnuna ta' SambaHook tista 'timbotta fajl lokali biex smb-punt.
Parsing tal-operatur tad-dwana
U sirna viċin li nħarsu lejn kif issir TelegramBotSendMessage
Kodiċi commons/operators.py mal-operatur attwali:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Hawnhekk, bħal kull ħaġa oħra fl-Airflow, kollox huwa sempliċi ħafna:
- Wiret minn
BaseOperator, li timplimenta pjuttost ftit affarijiet speċifiċi għall-fluss tal-arja (ħares lejn il-ħin liberu tiegħek) - Oqsma ddikjarati
template_fields, li fih Jinja se tfittex macros biex tipproċessa. - Irranġat l-argumenti t-tajba għal
__init__(), issettja l-inadempjenzi fejn meħtieġ. - Ma ninsewx dwar l-inizjalizzazzjoni tal-antenat lanqas.
- Infetaħ il-ganċ korrispondenti
TelegramBotHookirċieva oġġett klijent mingħandu. - Metodu overridden (definit mill-ġdid).
BaseOperator.execute(), li Airfow se twitch meta jasal iż-żmien li tniedi l-operatur - fiha se nimplimentaw l-azzjoni ewlenija, u ninsew li tidħol. (Aħna nilloggjaw, bil-mod, eżattstdoutиstderr- Il-fluss ta 'l-arja se jinterċetta kollox, ikebbeb sewwa, jiddekomponih fejn meħtieġ.)
Ejja naraw x'għandna commons/hooks.py. L-ewwel parti tal-fajl, bil-ganċ innifsu:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientLanqas naf x'għandi nispjega hawn, ser ninnota biss il-punti importanti:
- Aħna nirtu, aħseb dwar l-argumenti - f'ħafna każijiet se jkun wieħed:
conn_id; - Metodi standard li jipprevjenu: I llimitat ruħi
get_conn(), li fiha nieħu l-parametri tal-konnessjoni bl-isem u nġib biss is-sezzjoniextra(dan huwa qasam JSON), li fih jien (skond l-istruzzjonijiet tiegħi stess!) Inpoġġi t-token tal-bot Telegram:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Noħloq eżempju tagħna
TelegramBot, billi tagħtiha token speċifiku.
Dak kollox. Tista 'tikseb klijent minn ganċ bl-użu TelegramBotHook().clent jew TelegramBotHook().get_conn().
U t-tieni parti tal-fajl, li fiha nagħmel microwrapper għat-Telegram REST API, sabiex ma nkaxkarx l-istess għal metodu wieħed sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Il-mod korrett huwa li żżid kollox:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- fil-plugin, poġġi f'repożitorju pubbliku, u agħtih lil Open Source.
Waqt li konna qed nistudjaw dan kollu, l-aġġornamenti tar-rapporti tagħna rnexxielhom ifallu b'suċċess u jibagħtuli messaġġ ta' żball fil-kanal. Se niċċekkja biex nara jekk hux ħażin...

Xi ħaġa kissret fid-doġ tagħna! Mhux hekk konna nistennew? Eżattament!
Int se tferra?
Tħoss li tlift xi ħaġa? Jidher li huwa wiegħed li jittrasferixxi d-data minn SQL Server għal Vertica, u mbagħad ħadha u mċaqlaq barra mis-suġġett, il-kanna!
Din l-atroċità kienet intenzjonata, kelli sempliċement niddeċifra xi terminoloġija għalik. Issa tista’ tmur lil hinn.
Il-pjan tagħna kien dan:
- Do dag
- Iġġenera ħidmiet
- Ara kemm hu sabiħ kollox
- Assenja numri tas-sessjoni għall-mili
- Ikseb data minn SQL Server
- Poġġi d-dejta f'Vertica
- Iġbor l-istatistika
Allura, biex dan kollu jibda jaħdem, għamilt żieda żgħira għal tagħna docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyHemmhekk inqajmu:
- Vertica bħala ospitanti
dwhbl-aktar settings default, - tliet każijiet ta' SQL Server,
- aħna nimlew id-databases f'dawn tal-aħħar b'xi dejta (fl-ebda każ ma tħaresx lejn
mssql_init.py!)
Inniedu t-tajjeb kollu bl-għajnuna ta 'kmand kemmxejn aktar ikkumplikat mill-aħħar darba:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Dak li randomizer miracle tagħna iġġenerat, tista 'tuża l-oġġett Data Profiling/Ad Hoc Query:

Il-ħaġa prinċipali mhix li turiha lill-analisti
telabora fuq Sessjonijiet ETL Mhux se, kollox huwa trivjali hemmhekk: nagħmlu bażi, hemm sinjal fiha, nagħżlu kollox ma 'maniġer tal-kuntest, u issa nagħmlu dan:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passWasal iż-żmien jiġbru d-data tagħna minn mitt u nofs tabella tagħna. Ejja nagħmlu dan bl-għajnuna ta 'linji bla pretenzjoni ħafna:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Bl-għajnuna ta 'ganċ niksbu minn Airflow
pymssql-connect - Issostitwixxi restrizzjoni fil-forma ta 'data fit-talba - se tintefa' fil-funzjoni mill-magna tal-mudell.
- It-tmigħ tat-talba tagħna
pandasmin se jġibnaDataFrame- se jkun utli għalina fil-futur.
Qed nuża sostituzzjoni
{dt}minflok parametru talba%smhux għax jien Pinocchio ħażin, imma għaxpandasma jistax jimmaniġġjapymssqlu tiżloq l-aħħar waħdaparams: Listgħalkemm verament iridtuple.
Innota wkoll li l-iżviluppaturpymssqliddeċieda li ma jappoġġjahx aktar, u wasal iż-żmien li tiċċaqlaqpyodbc.
Ejja naraw b'liema Airflow mimli l-argumenti tal-funzjonijiet tagħna:

Jekk ma jkunx hemm dejta, allura m'hemm l-ebda punt li tkompli. Iżda hija wkoll stramba li tikkunsidra l-mili ta 'suċċess. Iżda dan mhuwiex żball. A-ah-ah, x'għandek tagħmel?! U hawn xi:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException se tgħid Airflow li m'hemm l-ebda żbalji, iżda aħna naqbżu l-kompitu. L-interface mhux se jkollu kwadru aħdar jew aħmar, iżda roża.
Ejja toss id-data tagħna kolonni multipli:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Jiġifieri:
- Id-database li minnha ħadna l-ordnijiet,
- ID tas-sessjoni tal-għargħar tagħna (se tkun differenti għal kull kompitu),
- Hash mis-sors u l-ID tal-ordni - sabiex fid-database finali (fejn kollox jitferra f'tabella waħda) ikollna ID tal-ordni unika.
Il-pass ta’ qabel tal-aħħar jibqa’: ferra’ kollox ġo Vertica. U, b'mod stramb, wieħed mill-aktar modi spettakolari u effiċjenti biex isir dan huwa permezz tas-CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Qed nagħmlu riċevitur speċjali
StringIO. pandasse ġentilment tpoġġi tagħnaDataFramefil - forma ta 'CSV-linji.- Ejja niftħu konnessjoni mal-Vertica favorita tagħna b'ganċ.
- U issa bl-għajnuna
copy()ibgħat id-dejta tagħna direttament lil Vertika!
Se nieħdu mingħand ix-xufier kemm imtlew linji, u ngħidu lill-maniġer tas-sessjoni li kollox huwa tajjeb:
session.loaded_rows = cursor.rowcount
session.successful = TrueDak kollox.
Fuq il-bejgħ, noħolqu l-pjanċa fil-mira manwalment. Hawnhekk ħallejt lili nnifsi magna żgħira:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Qed nuża
VerticaOperator()Noħloq skema tad-database u tabella (jekk ma jeżistux diġà, ovvjament). Il-ħaġa prinċipali hija li tirranġa b'mod korrett id-dipendenzi:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadBħala sinteżi
- Ukoll, - qal il-ġurdien żgħir, - hux, issa
Int konvint li jien l-aktar annimal terribbli fil-foresta?
Julia Donaldson, Il-Gruffalo
Naħseb li kieku jien u l-kollegi tiegħi kellna kompetizzjoni: min se joħloq u jniedi malajr proċess ETL mill-bidu: huma bl-SSIS tagħhom u maws u jien bl-Airflow ... U allura nqabblu wkoll il-faċilità tal-manutenzjoni ... Ara naqra, naħseb li inti taqbel li se ngħaqqadhom minn kull naħa!
Jekk xi ftit aktar bis-serjetà, allura Apache Airflow - billi ddeskriviet proċessi fil-forma ta 'kodiċi tal-programm - għamel ix-xogħol tiegħi ħafna aktar komdu u pjaċevoli.
L-estensibbiltà illimitata tagħha, kemm f’termini ta’ plug-ins kif ukoll ta’ predispożizzjoni għall-iskalabbiltà, tagħtik l-opportunità li tuża Airflow fi kważi kull qasam: anke fiċ-ċiklu sħiħ tal-ġbir, it-tħejjija u l-ipproċessar tad-dejta, anke fit-tnedija tar-rokits (sa Mars, ta’ kors).
Parti finali, referenza u informazzjoni
Ir-rake li ġbarna għalik
start_date. Iva, dan diġà huwa meme lokali. Via l-argument ewlieni ta’ Dougstart_datekollha jgħaddu. Fil-qosor, jekk tispeċifika fistart_datedata kurrenti, uschedule_interval- jum wieħed, imbagħad DAG se jibda għada mhux qabel.start_date = datetime(2020, 7, 7, 0, 1, 2)U mhux aktar problemi.
Hemm żball ieħor ta' runtime assoċjat miegħu:
Task is missing the start_date parameter, li ħafna drabi jindika li insejt torbot mal-operatur dag.- Kollha fuq magna waħda. Iva, u bażijiet (Airflow innifsu u kisi tagħna), u web server, u Scheduler, u ħaddiema. U anke ħadem. Iżda maż-żmien, in-numru ta 'kompiti għas-servizzi kiber, u meta PostgreSQL beda jirrispondi għall-indiċi f'20 s minflok 5 ms, ħadna u ġarrejna.
- Eżekutur Lokali. Iva, għadna bilqiegħda fuqha, u diġà wasalna fit-tarf tal-abbiss. LocalExecutor kien biżżejjed għalina s'issa, iżda issa wasal iż-żmien li nespandu b'mill-inqas ħaddiem wieħed, u ser ikollna naħdmu ħafna biex nimxu għal CeleryExecutor. U fid-dawl tal-fatt li tista 'taħdem magħha fuq magna waħda, xejn ma jwaqqafk milli tuża Karfus anki fuq server, li "naturalment, qatt mhu se jidħol fil-produzzjoni, onestament!"
- Nuqqas ta' użu għodod integrati:
- Konnessjonijiet biex taħżen il-kredenzjali tas-servizz,
- SLA Miss biex twieġeb għall-kompiti li ma ħadmux fil-ħin,
- xcom għall-iskambju tal-metadata (għidt metadata!) bejn il-kompiti dag.
- Abbuż tal-posta. Ukoll, x'nista 'ngħid? Ġew stabbiliti twissijiet għar-repetizzjonijiet kollha tal-kompiti waqgħu. Issa l-Gmail tax-xogħol tiegħi għandu > 90k email mill-Airflow, u l-geddum tal-posta tal-web jirrifjuta li jiġbor u jħassar aktar minn 100 kull darba.
Aktar nases:
Aktar għodod ta 'awtomazzjoni
Sabiex naħdmu aktar b'rasna u mhux b'idejna, Airflow ħejja għalina dan:
- - għad għandu l-istatus ta' Sperimentali, li ma jwaqqafx milli jaħdem. Biha, tista 'mhux biss tikseb informazzjoni dwar dags u ħidmiet, iżda wkoll twaqqaf/tibda dag, toħloq DAG Run jew pool.
- - ħafna għodod huma disponibbli permezz tal-linja tal-kmand li mhumiex biss inkonvenjenti biex jintużaw permezz tal-WebUI, iżda ġeneralment huma assenti. Pereżempju:
backfillmeħtieġa biex jerġgħu jibdew l-istanzi tal-kompitu.
Per eżempju, l-analisti ġew u qalu: “U int, sħabu, għandek xi ħaġa bla sens fid-dejta mill-1 sat-13 ta’ Jannar! Waħħalha, irranġaha, irranġaha, irranġaha!” U int tali hob:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Servizz bażi:
initdb,resetdb,upgradedb,checkdb. run, li jippermettilek tmexxi kompitu ta 'istanza waħda, u anke punteġġ fuq id-dipendenzi kollha. Barra minn hekk, tista 'taħdemha permezzLocalExecutor, anki jekk għandek raggruppament Karfus.- Jagħmel pjuttost l-istess ħaġa
test, biss ukoll fil-bażijiet ma jikteb xejn. connectionsjippermetti ħolqien tal-massa ta 'konnessjonijiet mill-qoxra.
- - mod pjuttost iebes ta 'interazzjoni, li huwa maħsub għall-plugins, u mhux swarming fih b'idejn żgħar. Imma min iwaqqafna milli mmorru
/home/airflow/dags, runipythonu tibda taqbad? Tista', pereżempju, tesporta l-konnessjonijiet kollha bil-kodiċi li ġej:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Konnessjoni mal-metadatabase Airflow. Ma nirrakkomandax li nikteb lilha, iżda li tikseb stati tal-kompitu għal diversi metriċi speċifiċi jista 'jkun ħafna aktar mgħaġġel u eħfef milli tuża kwalunkwe mill-APIs.
Ejja ngħidu li mhux il-kompiti tagħna kollha huma idempotenti, iżda xi drabi jistgħu jaqgħu, u dan huwa normali. Iżda ftit imblukkar huma diġà suspettużi, u jkun meħtieġ li tiċċekkja.
Oqgħod attent SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
referenzi
U ovvjament, l-ewwel għaxar links mill-ħruġ ta 'Google huma l-kontenut tal-folder Airflow mill-bookmarks tiegħi.
- - ovvjament, irridu nibdew bl-uffiċċju. dokumentazzjoni, imma min jaqra l-istruzzjonijiet?
- - Ukoll, mill-inqas aqra r-rakkomandazzjonijiet mill-ħallieqa.
- - il-bidu nett: l-interface tal-utent fi stampi
- - il-kunċetti bażiċi huma deskritti tajjeb, jekk (f'daqqa waħda!) Ma fhimtx xi ħaġa mingħandi.
- - gwida qasira għat-twaqqif ta' cluster Airflow.
- - kważi l-istess artiklu interessanti, ħlief forsi aktar formaliżmu, u inqas eżempji.
- — dwar ix-xogħol flimkien mal-karfus.
- - dwar l-idempotenza tal-kompiti, it-tagħbija bl-ID minflok id-data, it-trasformazzjoni, l-istruttura tal-fajl u affarijiet interessanti oħra.
- - dipendenzi tal-kompiti u Trigger Rule, li semmejt biss fil-mogħdija.
- - kif tegħleb xi "xogħlijiet kif intenzjonat" fl-iskedar, tniżżel dejta mitlufa u tipprijoritizza l-kompiti.
- — mistoqsijiet SQL utli għall-metadata Airflow.
- - hemm taqsima utli dwar il-ħolqien ta 'sensor tad-dwana.
- — nota qasira interessanti dwar il-bini ta' infrastruttura fuq AWS għax-Xjenza tad-Data.
- - żbalji komuni (meta xi ħadd għadu ma jaqrax l-istruzzjonijiet).
- - tbissem kif in-nies jaħżnu l-passwords, għalkemm tista 'sempliċement tuża Konnessjonijiet.
- - twassil impliċitu tad-DAG, kuntest li jitfa' funzjonijiet, għal darb'oħra dwar id-dipendenzi, u wkoll dwar it-tnedija tal-kompiti ta' qbiż.
- - dwar l-użu
default argumentsиparamsf'mudelli, kif ukoll Varjabbli u Konnessjonijiet. - - storja dwar kif il-pjanifikatur qed jipprepara għall-Airflow 2.0.
- - artikolu kemmxejn skadut dwar l-iskjerament tal-cluster tagħna fi
docker-compose. - - ħidmiet dinamiċi bl-użu ta' mudelli u trażmissjoni tal-kuntest.
- — notifiki standard u personalizzati bil-posta u Slack.
- - Ħidmiet tal-fergħat, macros u XCom.
U l-links użati fl-artiklu:
- - placeholders disponibbli għall-użu f'mudelli.
- — Żbalji komuni meta jinħolqu dags.
- -
docker-composegħall-esperimentazzjoni, debugging u aktar. - — Wrapper Python għal Telegram REST API.
Sors: www.habr.com




