Hi, jien Dmitry Logvinenko - Inġinier tad-Data tad-Dipartiment tal-Analitika tal-grupp ta 'kumpaniji Vezet.
Jien ngħidlek dwar għodda mill-isbaħ għall-iżvilupp ta 'proċessi ETL - Apache Airflow. Iżda Airflow huwa tant versatili u multidimensjonali li għandek tagħti ħarsa aktar mill-qrib lejha anki jekk m'intix involut fi flussi tad-dejta, iżda jkollok bżonn li perjodikament tniedi kwalunkwe proċess u tissorvelja l-eżekuzzjoni tagħhom.
U iva, mhux biss ngħid, imma nuri wkoll: il-programm għandu ħafna kodiċi, screenshots u rakkomandazzjonijiet.
Dak li s-soltu tara meta tpoġġi fuq Google l-kelma Airflow / Wikimedia Commons
- aħjar biss, u sar għal skopijiet kompletament differenti, jiġifieri (kif miktub qabel il-kata):
it-tmexxija u l-monitoraġġ tal-kompiti fuq numru illimitat ta 'magni (kif ħafna Karfus / Kubernetes u l-kuxjenza tiegħek jippermettulek)
bil-ġenerazzjoni tal-fluss tax-xogħol dinamiku minn faċli ħafna biex tikteb u tifhem il-kodiċi Python
u l-abbiltà li tikkonnettja kwalunkwe databases u APIs ma 'xulxin billi tuża kemm komponenti lesti kif ukoll plugins magħmulin mid-dar (li huwa estremament sempliċi).
Aħna nużaw Apache Airflow bħal dan:
aħna niġbru data minn sorsi varji (ħafna istanzi ta 'SQL Server u PostgreSQL, diversi APIs b'metriċi ta' applikazzjoni, anke 1C) f'DWH u ODS (għandna Vertica u Clickhouse).
kemm avvanzat cron, li jibda l-proċessi ta' konsolidazzjoni tad-dejta fuq l-ODS, u jimmonitorja wkoll il-manutenzjoni tagħhom.
Sa ftit ilu, il-bżonnijiet tagħna kienu koperti minn server żgħir wieħed bi 32 core u 50 GB RAM. Fl-Airflow, dan jaħdem:
aktar 200 dag (fil-fatt flussi tax-xogħol, li fihom nimtlew kompiti),
f'kull medja 70 biċċa xogħol,
din it-tjubija tibda (ukoll bħala medja) darba fis-siegħa.
U dwar kif espanna, se nikteb hawn taħt, imma issa ejja niddefinixxu l-über-problema li se nsolvu:
Hemm tliet SQL Servers oriġinali, kull wieħed b'50 database - każijiet ta 'proġett wieħed, rispettivament, għandhom l-istess struttura (kważi kullimkien, mua-ha-ha), li jfisser li kull wieħed għandu tabella tal-Ordnijiet (fortunatament, tabella b'dik isem jista 'jiġi push fi kwalunkwe negozju). Aħna nieħdu d-dejta billi nżidu oqsma tas-servizz (server tas-sors, database tas-sors, ID tal-kompitu ETL) u b'mod naiv nitfgħuhom, ngħidu aħna, Vertica.
Ejja ħa mmorru!
Il-parti prinċipali, prattika (u ftit teoretika)
Għaliex aħna (u int)
Meta s-siġar kienu kbar u kont sempliċi SQL-schik f'bejgħ bl-imnut Russu wieħed, aħna scammed proċessi ETL aka flussi tad-dejta bl-użu ta 'żewġ għodod disponibbli għalina:
Ċentru tal-Enerġija tal-Informatika - sistema li tinfirex ħafna, estremament produttiva, bil-ħardwer tagħha stess, il-verżjoni tagħha stess. Jien użajt Alla jipprojbixxi 1% tal-kapaċitajiet tiegħu. Għaliex? Ukoll, l-ewwelnett, din l-interface, x'imkien mis-snin 380, mentalment poġġiet pressjoni fuqna. It-tieni nett, dan il-kontraption huwa ddisinjat għal proċessi estremament fancy, użu mill-ġdid ta 'komponenti furious u tricks oħra importanti ħafna ta' intrapriża. Dwar il-fatt li tiswa, bħall-ġwienaħ tal-Airbus AXNUMX / sena, mhux se ngħidu xejn.
Oqgħod attent, screenshot jista 'jweġġa' ftit nies taħt it-30
SQL Server Integrazzjoni Server - użajna dan il-kompa fil-flussi intra-proġett tagħna. Ukoll, fil-fatt: aħna diġà nużaw SQL Server, u jkun b'xi mod irraġonevoli li ma tużax l-għodod ETL tagħha. Kollox fih huwa tajjeb: kemm l-interface hija sabiħa, kif ukoll ir-rapporti tal-progress ... Iżda dan mhux għaliex inħobbu l-prodotti tas-softwer, oh, mhux għal dan. Verżjoni dan dtsx (li huwa XML b'nodes shuffled fuq issalva) nistgħu, imma x'inhu l-punt? Kif dwar li tagħmel pakkett ta 'kompitu li se jkaxkru mijiet ta' tabelli minn server għal ieħor? Iva, liema mija, subgħajk l-indiċi se taqa 'minn għoxrin biċċa, tikklikkja fuq il-buttuna tal-maws. Imma żgur tidher aktar moda:
Żgur li fittixna modi ta’ ħruġ. Kawża anke kważi wasal għal ġeneratur tal-pakkett SSIS li nkiteb waħdu...
…u mbagħad sabni impjieg ġdid. U Apache Airflow qabeżni fuqha.
Meta sibt li d-deskrizzjonijiet tal-proċess ETL huma kodiċi Python sempliċi, jien biss ma żfinx għall-ferħ. Dan huwa kif il-flussi tad-dejta ġew verżjoni u differenti, u t-tferrigħ ta 'tabelli bi struttura waħda minn mijiet ta' databases f'mira waħda saret kwistjoni ta 'kodiċi Python fi skrins wieħed u nofs jew żewġ 13 ".
Assemblaġġ tal-cluster
Ejja ma nirranġawx kindergarten kompletament, u ma nitkellmux dwar affarijiet kompletament ovvji hawn, bħall-installazzjoni tal-Airflow, id-database magħżula tiegħek, Karfus u każijiet oħra deskritti fil-baċiri.
Sabiex inkunu nistgħu immedjatament nibdew esperimenti, abbozzajt docker-compose.yml F'liema:
Ejja fil-fatt ngħollu Fluss tal-arja: Scheduler, Webserver. Fjura se tkun qed iddur hemm ukoll biex tissorvelja l-kompiti tal-karfus (għax diġà ġiet imbuttata fiha apache/airflow:1.10.10-python3.7, imma ma niddejqux)
PostgreSQL, li fiha Airflow se tikteb l-informazzjoni tas-servizz tagħha (dejta tal-iskeduler, statistika tal-eżekuzzjoni, eċċ.), u Karfus se jimmarka l-kompiti lesti;
Ddistribwit mill-, li se jaġixxi bħala sensar tal-kompitu għal Karfus;
Ħaddiem tal-karfus, li se jkunu involuti fl-eżekuzzjoni diretta tal-kompiti.
Biex folder ./dags se nżidu l-fajls tagħna bid-deskrizzjoni tad-dags. Dawn se jinġabru fuq il-fly, u għalhekk m'hemmx għalfejn juggle mal-munzell kollu wara kull għatis.
F'xi postijiet, il-kodiċi fl-eżempji ma jintwerax kompletament (sabiex ma jħarrekx it-test), iżda x'imkien jiġi modifikat fil-proċess. Eżempji kompluti tal-kodiċi tax-xogħol jistgħu jinstabu fir-repożitorju https://github.com/dm-logv/airflow-tutorial.
Fl-assemblaġġ tal-kompożizzjoni, kont invokat l-aktar fuq l-immaġni magħrufa puckel/docker-airflow - kun żgur li tiċċekkjaha. Forsi m’għandek bżonn xi ħaġa oħra f’ħajtek.
Is-settings kollha Airflow huma disponibbli mhux biss permezz airflow.cfg, iżda wkoll permezz ta 'varjabbli ambjentali (grazzi għall-iżviluppaturi), li b'mod malizzjuż ħadt vantaġġ minnhom.
Naturalment, mhuwiex lest għall-produzzjoni: ma poġġiex deliberatament taħbit tal-qalb fuq kontenituri, ma ddejjaqtx bis-sigurtà. Imma għamilt il-minimu adattat għall-esperimentaturi tagħna.
Innota li:
Il-folder tad-dag għandu jkun aċċessibbli kemm għall-iskedatur kif ukoll għall-ħaddiema.
L-istess japplika għal-libreriji kollha ta 'partijiet terzi - għandhom kollha jiġu installati fuq magni bi scheduler u ħaddiema.
Ukoll, issa huwa sempliċi:
$ docker-compose up --scale worker=3
Wara li kollox jogħla, tista 'tħares lejn l-interfaces tal-web:
Jekk ma fhimt xejn f’dawn id-“dags”, allura hawn dizzjunarju qasir:
skedar - l-iktar ziju importanti fl-Airflow, li jikkontrolla li r-robots jaħdmu iebes, u mhux persuna: jimmonitorja l-iskeda, jaġġorna d-dags, iniedi l-kompiti.
B'mod ġenerali, f'verżjonijiet anzjani, kellu problemi bil-memorja (le, mhux amnesija, iżda tnixxijiet) u l-parametru tal-legat saħansitra baqa 'fil-konfigurazzjonijiet run_duration — l-intervall mill-ġdid tiegħu. Imma issa kollox tajjeb.
DAG (magħruf ukoll bħala "dag") - "graff aċikliku dirett", iżda definizzjoni bħal din tgħid ftit nies, iżda fil-fatt hija kontenitur għal kompiti li jinteraġixxu ma 'xulxin (ara hawn taħt) jew analogu ta' Pakkett f'SSIS u Workflow f'Informatica .
Minbarra d-dags, xorta jista 'jkun hemm subdags, iżda x'aktarx mhux se naslu għalihom.
DAG Run - dag inizjalizzat, li huwa assenjat tiegħu stess execution_date. Dagrans ta 'l-istess dag jistgħu jaħdmu b'mod parallel (jekk għamilt il-kompiti tiegħek idempotent, ovvjament).
operatur huma biċċiet ta' kodiċi responsabbli għat-twettiq ta' azzjoni speċifika. Hemm tliet tipi ta’ operaturi:
azzjonibħall-favorit tagħna PythonOperator, li jista' jesegwixxi kwalunkwe kodiċi Python (validu);
trasferiment, li jittrasportaw data minn post għal post, ngħidu aħna, MsSqlToHiveTransfer;
senser min-naħa l-oħra, se jippermettilek tirreaġixxi jew tnaqqas il-mod l-eżekuzzjoni ulterjuri tad-dag sakemm iseħħ avveniment. HttpSensor jista 'jiġbed l-endpoint speċifikat, u meta r-rispons mixtieq ikun qed jistenna, ibda t-trasferiment GoogleCloudStorageToS3Operator. Moħħ kurżjuż jistaqsi: “għaliex? Wara kollox, tista’ tagħmel repetizzjonijiet eżatt fl-operatur!” U mbagħad, sabiex ma jinstaddux il-ġabra ta 'kompiti ma' operaturi sospiżi. Is-sensor jibda, jiċċekkja u jmut qabel l-attentat li jmiss.
Segretarjali u klerikali. - operaturi ddikjarati, irrispettivament mit-tip, u mehmuża mad-dag jiġu promossi għall-grad ta' kompitu.
eżempju tal-kompitu - meta l-pjanifikatur ġenerali ddeċieda li kien wasal iż-żmien li jintbagħtu l-kompiti fil-battalja fuq l-artisti-ħaddiema (eżatt fuq il-post, jekk nużaw LocalExecutor jew għal node remot fil-każ ta CeleryExecutor), jassenja lilhom kuntest (jiġifieri, sett ta 'varjabbli - parametri ta' eżekuzzjoni), jespandi mudelli ta 'kmand jew mistoqsija, u jiġborhom.
Aħna niġġeneraw kompiti
L-ewwel, ejja tiddeskrivi l-iskema ġenerali ta 'doug tagħna, u mbagħad aħna se adsa fid-dettalji aktar u aktar, minħabba li napplikaw xi soluzzjonijiet mhux trivjali.
Allura, fil-forma l-aktar sempliċi tagħha, tali dag se tidher bħal din:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Ejja nsemmu:
L-ewwel, aħna jimportaw il-libs meħtieġa u xi haga ohra;
sql_server_ds - dan hu List[namedtuple[str, str]] bl-ismijiet tal-konnessjonijiet minn Airflow Connections u d-databases li minnhom se nieħdu l-platt tagħna;
dag - it-tħabbira tad-dag tagħna, li bilfors irid ikun fi globals(), inkella Airflow ma ssibhiex. Doug jeħtieġ ukoll jgħid:
X'jismu orders - dan l-isem imbagħad jidher fl-interface tal-web,
li se jaħdem minn nofs il-lejl fit-tmienja ta’ Lulju,
u għandha taħdem, bejn wieħed u ieħor kull 6 sigħat (għal guys iebsa hawn minflok timedelta() permissibbli cron-linja 0 0 0/6 ? * * *, għall-inqas jibred - espressjoni simili @daily);
workflow() se tagħmel ix-xogħol ewlieni, iżda mhux issa. Għalissa, aħna ser biss dump kuntest tagħna fil-log.
U issa l-maġija sempliċi tal-ħolqien tal-kompiti:
aħna għaddejjin minn sorsi tagħna;
initialize PythonOperator, li se jesegwixxi l-manikin tagħna workflow(). Tinsiex li tispeċifika isem uniku (fi ħdan id-dag) tal-kompitu u torbot id-dag innifsu. Bandiera provide_context imbagħad, se pour argumenti addizzjonali fil-funzjoni, li aħna se jiġbru bir-reqqa bl-użu **context.
Għalissa, dak kollu. Dak li ksibna:
dag ġdid fl-interface tal-web,
mitt u nofs xogħol li se jiġu esegwiti b'mod parallel (jekk il-Fluss tal-Ajru, is-settings tal-Kfus u l-kapaċità tas-server jippermettu dan).
Ukoll, kważi ltqajna.
Min se jinstalla d-dipendenzi?
Biex tissimplifika din il-ħaġa kollha, daħħalt docker-compose.yml ipproċessar requirements.txt fuq in-nodi kollha.
Issa marret:
Il-kwadri griżi huma każijiet ta' xogħol ipproċessati mill-iskedar.
Dawk ħodor, ovvjament, temmew b'suċċess ix-xogħol tagħhom. Il-Ħomor ma tantx għandhom suċċess.
Mill-mod, m'hemm l-ebda folder fuq il-prod tagħna ./dags, m'hemm l-ebda sinkronizzazzjoni bejn il-magni - id-dags kollha jinsabu git fuq Gitlab tagħna, u Gitlab CI jiddistribwixxi aġġornamenti lill-magni meta jingħaqdu master.
Ftit dwar Fjura
Waqt li l-ħaddiema qed jaqtgħu l-paċifikati tagħna, ejja niftakru għodda oħra li tista’ turina xi ħaġa – Fjura.
L-ewwel paġna b'informazzjoni fil-qosor dwar in-nodi tal-ħaddiema:
L-aktar paġna intensa b'kompiti li marru jaħdmu:
L-aktar paġna boring bl-istatus tas-sensar tagħna:
L-isbaħ paġna hija bil-grafiċi tal-istatus tal-kompitu u l-ħin tal-eżekuzzjoni tagħhom:
Aħna tagħbija l-mgħabija
Allura, il-kompiti kollha ħadmu, tista 'twettaq il-midruba.
U kien hemm ħafna midruba - għal xi raġuni jew oħra. Fil-każ tal-użu korrett tal-Airflow, dawn il-kwadri stess jindikaw li d-dejta żgur ma waslitx.
Ikollok bżonn tara l-log u terġa 'tibda l-istanzi tal-kompitu waqgħu.
Billi tikklikkja fuq kwalunkwe kwadru, naraw l-azzjonijiet disponibbli għalina:
Tista 'tieħu u tagħmel ċari l-waqa'. Jiġifieri, ninsew li xi ħaġa falliet hemmhekk, u l-istess kompitu ta 'istanza se jmur għand l-iskedar.
Huwa ċar li tagħmel dan bil-maws bil-kwadri ħomor kollha mhux uman ħafna - dan mhux dak li nistennew mill-Airflow. Naturalment, għandna armi tal-qerda tal-massa: Browse/Task Instances
Ejja nagħżlu kollox f'daqqa u reset għal żero, ikklikkja l-oġġett it-tajjeb:
Wara t-tindif, it-taxis tagħna jidhru bħal dawn (diġà qed jistennew li l-iskedatur jiskedahom):
Konnessjonijiet, ganċijiet u varjabbli oħra
Wasal iż-żmien li nħarsu lejn id-DAG li jmiss, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Kulħadd qatt għamel aġġornament tar-rapport? Din hija għal darb'oħra tagħha: hemm lista ta 'sorsi minn fejn tikseb id-data; hemm lista fejn tpoġġi; ma ninsewx li tfaċċar meta kollox ġara jew inkiser (tajjeb, dan mhux dwarna, le).
Ejja nerġgħu ngħaddu mill-fajl u nħarsu lejn l-affarijiet oskurati ġodda:
from commons.operators import TelegramBotSendMessage - xejn ma jipprevjeni milli nagħmlu l-operaturi tagħna stess, li ħadna vantaġġ minnhom billi għamilna wrapper żgħir biex nibagħtu messaġġi lil Unblocked. (Se nitkellmu aktar dwar dan l-operatur hawn taħt);
default_args={} - dag tista' tqassam l-istess argumenti lill-operaturi kollha tagħha;
to='{{ var.value.all_the_kings_men }}' - qasam to mhux se jkollna hardcoded, iżda ġenerati b'mod dinamiku bl-użu ta 'Jinja u varjabbli b'lista ta' emails, li nressaq bir-reqqa Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — il-kondizzjoni biex jibda l-operatur. Fil-każ tagħna, l-ittra se ttir lejn il-pumijiet biss jekk id-dipendenzi kollha jkunu ħadmu b'suċċess;
tg_bot_conn_id='tg_main' - argumenti conn_id jaċċettaw IDs tal-konnessjoni li noħolqu fihom Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - messaġġi f'Telegram se jtiru biss jekk ikun hemm kompiti waqgħu;
task_concurrency=1 - nipprojbixxu t-tnedija simultanja ta' diversi każijiet ta' kompitu ta' kompitu wieħed. Inkella, aħna se tikseb it-tnedija simultanja ta 'diversi VerticaOperator (tħares lejn mejda waħda);
report_update >> [email, tg] - kollha VerticaOperator jikkonverġu biex jibagħtu ittri u messaġġi, bħal dan:
Iżda peress li l-operaturi tan-notifikanti għandhom kundizzjonijiet ta’ tnedija differenti, wieħed biss jaħdem. Fit-Tree View, kollox jidher ftit inqas viżwali:
Se ngħid ftit kliem dwar makro u sħabhom - varjabbli.
Macros huma placeholders Jinja li jistgħu jissostitwixxu informazzjoni utli varji fl-argumenti tal-operatur. Per eżempju, bħal dan:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} se jespandi għall-kontenut tal-varjabbli tal-kuntest execution_date fil-format YYYY-MM-DD: 2020-07-14. L-aħjar parti hija li l-varjabbli tal-kuntest huma nailed għal eżempju ta 'kompitu speċifiku (kwadru fit-Tree View), u meta jerġgħu jibdew, il-placeholders se jespandu għall-istess valuri.
Il-valuri assenjati jistgħu jitqiesu billi tuża l-buttuna Rendered fuq kull istanza tal-kompitu. Dan huwa kif il-kompitu li tibgħat ittra:
U għalhekk fil-kompitu li tibgħat messaġġ:
Lista kompluta ta' macros inkorporati għall-aħħar verżjoni disponibbli hija disponibbli hawn: referenza macros
Barra minn hekk, bl-għajnuna ta 'plugins, nistgħu niddikjaraw macros tagħna stess, iżda dik hija storja oħra.
Minbarra l-affarijiet predefiniti, nistgħu nissostitwixxu l-valuri tal-varjabbli tagħna (diġà użajt dan fil-kodiċi ta 'hawn fuq). Ejja noħolqu ġewwa Admin/Variables ftit affarijiet:
uża biss it-triq għaċ-ċavetta mixtieqa: {{ var.json.bot_config.bot.token }}.
Se litteralment ngħid kelma waħda u nuri screenshot wieħed dwar konnessjonijiet. Kollox huwa elementari hawn: fuq il-paġna Admin/Connections noħolqu konnessjoni, żid il-logins / passwords tagħna u parametri aktar speċifiċi hemmhekk. Bħal dan:
Il-passwords jistgħu jiġu encrypted (aktar bir-reqqa mill-default), jew tista 'tħalli barra t-tip ta' konnessjoni (kif għamilt għal tg_main) - il-fatt hu li l-lista tat-tipi hija wajerjata fil-mudelli Airflow u ma tistax tiġi estiża mingħajr ma tidħol fil-kodiċijiet tas-sors (jekk f'daqqa waħda ma kontx google xi ħaġa, jekk jogħġbok ikkoreġini), iżda xejn ma jwaqqafna milli niksbu krediti biss billi isem.
Tista 'wkoll tagħmel diversi konnessjonijiet bl-istess isem: f'dan il-każ, il-metodu BaseHook.get_connection(), li jġibna konnessjonijiet bl-isem, se jagħti addoċċ minn diversi namesakes (ikun aktar loġiku li tagħmel Round Robin, iżda ejja nħallu fuq il-kuxjenza ta 'l-iżviluppaturi Airflow).
Varjabbli u Konnessjonijiet huma ċertament għodod friski, iżda huwa importanti li ma titlifx il-bilanċ: liema partijiet tal-flussi tiegħek taħżen fil-kodiċi innifsu, u liema partijiet tagħti lil Airflow għall-ħażna. Min-naħa waħda, jista 'jkun konvenjenti li jinbidel malajr il-valur, pereżempju, kaxxa postali, permezz tal-UI. Min-naħa l-oħra, dan għadu ritorn għall-klikk tal-maws, li minnha ridna (jien) neħilsu.
Il-ħidma b'konnessjonijiet hija waħda mill-kompiti ganċijiet. B'mod ġenerali, il-ganċijiet tal-Fluss tal-Ajru huma punti għall-konnessjoni ma 'servizzi u libreriji ta' partijiet terzi. Eż., JiraHook se tiftaħ klijent għalina biex jinteraġixxu ma 'Jira (tista' ċċaqlaq il-kompiti 'l quddiem u 'l quddiem), u bl-għajnuna ta' SambaHook tista 'timbotta fajl lokali biex smb-punt.
Parsing tal-operatur tad-dwana
U sirna viċin li nħarsu lejn kif issir TelegramBotSendMessage
Kodiċi commons/operators.py mal-operatur attwali:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Hawnhekk, bħal kull ħaġa oħra fl-Airflow, kollox huwa sempliċi ħafna:
Wiret minn BaseOperator, li timplimenta pjuttost ftit affarijiet speċifiċi għall-fluss tal-arja (ħares lejn il-ħin liberu tiegħek)
Oqsma ddikjarati template_fields, li fih Jinja se tfittex macros biex tipproċessa.
Irranġat l-argumenti t-tajba għal __init__(), issettja l-inadempjenzi fejn meħtieġ.
Ma ninsewx dwar l-inizjalizzazzjoni tal-antenat lanqas.
Metodu overridden (definit mill-ġdid). BaseOperator.execute(), li Airfow se twitch meta jasal iż-żmien li tniedi l-operatur - fiha se nimplimentaw l-azzjoni ewlenija, u ninsew li tidħol. (Aħna nilloggjaw, bil-mod, eżatt stdout и stderr - Il-fluss ta 'l-arja se jinterċetta kollox, ikebbeb sewwa, jiddekomponih fejn meħtieġ.)
Ejja naraw x'għandna commons/hooks.py. L-ewwel parti tal-fajl, bil-ganċ innifsu:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Lanqas naf x'għandi nispjega hawn, ser ninnota biss il-punti importanti:
Aħna nirtu, aħseb dwar l-argumenti - f'ħafna każijiet se jkun wieħed: conn_id;
Metodi standard li jipprevjenu: I llimitat ruħi get_conn(), li fiha nieħu l-parametri tal-konnessjoni bl-isem u nġib biss is-sezzjoni extra (dan huwa qasam JSON), li fih jien (skond l-istruzzjonijiet tiegħi stess!) Inpoġġi t-token tal-bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Noħloq eżempju tagħna TelegramBot, billi tagħtiha token speċifiku.
Dak kollox. Tista 'tikseb klijent minn ganċ bl-użu TelegramBotHook().clent jew TelegramBotHook().get_conn().
U t-tieni parti tal-fajl, li fiha nagħmel microwrapper għat-Telegram REST API, sabiex ma nkaxkarx l-istess python-telegram-bot għal metodu wieħed sendMessage.
Il-mod korrett huwa li żżid kollox: TelegramBotSendMessage, TelegramBotHook, TelegramBot - fil-plugin, poġġi f'repożitorju pubbliku, u agħtih lil Open Source.
Waqt li konna qed nistudjaw dan kollu, l-aġġornamenti tar-rapporti tagħna rnexxielhom ifallu b'suċċess u jibagħtuli messaġġ ta' żball fil-kanal. Se niċċekkja biex nara jekk hux ħażin...
Xi ħaġa kissret fid-doġ tagħna! Mhux hekk konna nistennew? Eżattament!
Int se tferra?
Tħoss li tlift xi ħaġa? Jidher li huwa wiegħed li jittrasferixxi d-data minn SQL Server għal Vertica, u mbagħad ħadha u mċaqlaq barra mis-suġġett, il-kanna!
Din l-atroċità kienet intenzjonata, kelli sempliċement niddeċifra xi terminoloġija għalik. Issa tista’ tmur lil hinn.
Il-pjan tagħna kien dan:
Do dag
Iġġenera ħidmiet
Ara kemm hu sabiħ kollox
Assenja numri tas-sessjoni għall-mili
Ikseb data minn SQL Server
Poġġi d-dejta f'Vertica
Iġbor l-istatistika
Allura, biex dan kollu jibda jaħdem, għamilt żieda żgħira għal tagħna docker-compose.yml:
Vertica bħala ospitanti dwh bl-aktar settings default,
tliet każijiet ta' SQL Server,
aħna nimlew id-databases f'dawn tal-aħħar b'xi dejta (fl-ebda każ ma tħaresx lejn mssql_init.py!)
Inniedu t-tajjeb kollu bl-għajnuna ta 'kmand kemmxejn aktar ikkumplikat mill-aħħar darba:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Dak li randomizer miracle tagħna iġġenerat, tista 'tuża l-oġġett Data Profiling/Ad Hoc Query:
Il-ħaġa prinċipali mhix li turiha lill-analisti
telabora fuq Sessjonijiet ETL Mhux se, kollox huwa trivjali hemmhekk: nagħmlu bażi, hemm sinjal fiha, nagħżlu kollox ma 'maniġer tal-kuntest, u issa nagħmlu dan:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Wasal iż-żmien jiġbru d-data tagħna minn mitt u nofs tabella tagħna. Ejja nagħmlu dan bl-għajnuna ta 'linji bla pretenzjoni ħafna:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Bl-għajnuna ta 'ganċ niksbu minn Airflow pymssql-connect
Issostitwixxi restrizzjoni fil-forma ta 'data fit-talba - se tintefa' fil-funzjoni mill-magna tal-mudell.
It-tmigħ tat-talba tagħna pandasmin se jġibna DataFrame - se jkun utli għalina fil-futur.
Qed nuża sostituzzjoni {dt} minflok parametru talba %s mhux għax jien Pinocchio ħażin, imma għax pandas ma jistax jimmaniġġja pymssql u tiżloq l-aħħar waħda params: Listgħalkemm verament irid tuple.
Innota wkoll li l-iżviluppatur pymssql iddeċieda li ma jappoġġjahx aktar, u wasal iż-żmien li tiċċaqlaq pyodbc.
Ejja naraw b'liema Airflow mimli l-argumenti tal-funzjonijiet tagħna:
Jekk ma jkunx hemm dejta, allura m'hemm l-ebda punt li tkompli. Iżda hija wkoll stramba li tikkunsidra l-mili ta 'suċċess. Iżda dan mhuwiex żball. A-ah-ah, x'għandek tagħmel?! U hawn xi:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException se tgħid Airflow li m'hemm l-ebda żbalji, iżda aħna naqbżu l-kompitu. L-interface mhux se jkollu kwadru aħdar jew aħmar, iżda roża.
ID tas-sessjoni tal-għargħar tagħna (se tkun differenti għal kull kompitu),
Hash mis-sors u l-ID tal-ordni - sabiex fid-database finali (fejn kollox jitferra f'tabella waħda) ikollna ID tal-ordni unika.
Il-pass ta’ qabel tal-aħħar jibqa’: ferra’ kollox ġo Vertica. U, b'mod stramb, wieħed mill-aktar modi spettakolari u effiċjenti biex isir dan huwa permezz tas-CSV!
Fuq il-bejgħ, noħolqu l-pjanċa fil-mira manwalment. Hawnhekk ħallejt lili nnifsi magna żgħira:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
Qed nuża VerticaOperator() Noħloq skema tad-database u tabella (jekk ma jeżistux diġà, ovvjament). Il-ħaġa prinċipali hija li tirranġa b'mod korrett id-dipendenzi:
- Ukoll, - qal il-ġurdien żgħir, - hux, issa
Int konvint li jien l-aktar annimal terribbli fil-foresta?
Julia Donaldson, Il-Gruffalo
Naħseb li kieku jien u l-kollegi tiegħi kellna kompetizzjoni: min se joħloq u jniedi malajr proċess ETL mill-bidu: huma bl-SSIS tagħhom u maws u jien bl-Airflow ... U allura nqabblu wkoll il-faċilità tal-manutenzjoni ... Ara naqra, naħseb li inti taqbel li se ngħaqqadhom minn kull naħa!
Jekk xi ftit aktar bis-serjetà, allura Apache Airflow - billi ddeskriviet proċessi fil-forma ta 'kodiċi tal-programm - għamel ix-xogħol tiegħi ħafna aktar komdu u pjaċevoli.
L-estensibbiltà illimitata tagħha, kemm f’termini ta’ plug-ins kif ukoll ta’ predispożizzjoni għall-iskalabbiltà, tagħtik l-opportunità li tuża Airflow fi kważi kull qasam: anke fiċ-ċiklu sħiħ tal-ġbir, it-tħejjija u l-ipproċessar tad-dejta, anke fit-tnedija tar-rokits (sa Mars, ta’ kors).
Parti finali, referenza u informazzjoni
Ir-rake li ġbarna għalik
start_date. Iva, dan diġà huwa meme lokali. Via l-argument ewlieni ta’ Doug start_date kollha jgħaddu. Fil-qosor, jekk tispeċifika fi start_date data kurrenti, u schedule_interval - jum wieħed, imbagħad DAG se jibda għada mhux qabel.
start_date = datetime(2020, 7, 7, 0, 1, 2)
U mhux aktar problemi.
Hemm żball ieħor ta' runtime assoċjat miegħu: Task is missing the start_date parameter, li ħafna drabi jindika li insejt torbot mal-operatur dag.
Kollha fuq magna waħda. Iva, u bażijiet (Airflow innifsu u kisi tagħna), u web server, u Scheduler, u ħaddiema. U anke ħadem. Iżda maż-żmien, in-numru ta 'kompiti għas-servizzi kiber, u meta PostgreSQL beda jirrispondi għall-indiċi f'20 s minflok 5 ms, ħadna u ġarrejna.
Eżekutur Lokali. Iva, għadna bilqiegħda fuqha, u diġà wasalna fit-tarf tal-abbiss. LocalExecutor kien biżżejjed għalina s'issa, iżda issa wasal iż-żmien li nespandu b'mill-inqas ħaddiem wieħed, u ser ikollna naħdmu ħafna biex nimxu għal CeleryExecutor. U fid-dawl tal-fatt li tista 'taħdem magħha fuq magna waħda, xejn ma jwaqqafk milli tuża Karfus anki fuq server, li "naturalment, qatt mhu se jidħol fil-produzzjoni, onestament!"
Nuqqas ta' użu għodod integrati:
Konnessjonijiet biex taħżen il-kredenzjali tas-servizz,
SLA Miss biex twieġeb għall-kompiti li ma ħadmux fil-ħin,
xcom għall-iskambju tal-metadata (għidt metadata!) bejn il-kompiti dag.
Abbuż tal-posta. Ukoll, x'nista 'ngħid? Ġew stabbiliti twissijiet għar-repetizzjonijiet kollha tal-kompiti waqgħu. Issa l-Gmail tax-xogħol tiegħi għandu > 90k email mill-Airflow, u l-geddum tal-posta tal-web jirrifjuta li jiġbor u jħassar aktar minn 100 kull darba.
Sabiex naħdmu aktar b'rasna u mhux b'idejna, Airflow ħejja għalina dan:
SERĦAN API - għad għandu l-istatus ta' Sperimentali, li ma jwaqqafx milli jaħdem. Biha, tista 'mhux biss tikseb informazzjoni dwar dags u ħidmiet, iżda wkoll twaqqaf/tibda dag, toħloq DAG Run jew pool.
CLI - ħafna għodod huma disponibbli permezz tal-linja tal-kmand li mhumiex biss inkonvenjenti biex jintużaw permezz tal-WebUI, iżda ġeneralment huma assenti. Pereżempju:
backfill meħtieġa biex jerġgħu jibdew l-istanzi tal-kompitu.
Per eżempju, l-analisti ġew u qalu: “U int, sħabu, għandek xi ħaġa bla sens fid-dejta mill-1 sat-13 ta’ Jannar! Waħħalha, irranġaha, irranġaha, irranġaha!” U int tali hob:
run, li jippermettilek tmexxi kompitu ta 'istanza waħda, u anke punteġġ fuq id-dipendenzi kollha. Barra minn hekk, tista 'taħdemha permezz LocalExecutor, anki jekk għandek raggruppament Karfus.
Jagħmel pjuttost l-istess ħaġa test, biss ukoll fil-bażijiet ma jikteb xejn.
connections jippermetti ħolqien tal-massa ta 'konnessjonijiet mill-qoxra.
API Python - mod pjuttost iebes ta 'interazzjoni, li huwa maħsub għall-plugins, u mhux swarming fih b'idejn żgħar. Imma min iwaqqafna milli mmorru /home/airflow/dags, run ipython u tibda taqbad? Tista', pereżempju, tesporta l-konnessjonijiet kollha bil-kodiċi li ġej:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Konnessjoni mal-metadatabase Airflow. Ma nirrakkomandax li nikteb lilha, iżda li tikseb stati tal-kompitu għal diversi metriċi speċifiċi jista 'jkun ħafna aktar mgħaġġel u eħfef milli tuża kwalunkwe mill-APIs.
Ejja ngħidu li mhux il-kompiti tagħna kollha huma idempotenti, iżda xi drabi jistgħu jaqgħu, u dan huwa normali. Iżda ftit imblukkar huma diġà suspettużi, u jkun meħtieġ li tiċċekkja.
Oqgħod attent SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
referenzi
U ovvjament, l-ewwel għaxar links mill-ħruġ ta 'Google huma l-kontenut tal-folder Airflow mill-bookmarks tiegħi.
DAG Kitba Aħjar Prattiki fl-Apache Airflow - dwar l-idempotenza tal-kompiti, it-tagħbija bl-ID minflok id-data, it-trasformazzjoni, l-istruttura tal-fajl u affarijiet interessanti oħra.
Iż-Zen ta 'Python u Apache Airflow - twassil impliċitu tad-DAG, kuntest li jitfa' funzjonijiet, għal darb'oħra dwar id-dipendenzi, u wkoll dwar it-tnedija tal-kompiti ta' qbiż.