Kumusta, ako si Dmitry Logvinenko - Data Engineer ng Analytics Department ng Vezet group of companies.
Sasabihin ko sa iyo ang tungkol sa isang kahanga-hangang tool para sa pagbuo ng mga proseso ng ETL - Apache Airflow. Ngunit ang Airflow ay napaka-versatile at multifaceted na dapat mong tingnan ito nang mabuti kahit na hindi ka kasali sa mga daloy ng data, ngunit kailangan mong pana-panahong maglunsad ng anumang mga proseso at subaybayan ang kanilang pagpapatupad.
At oo, hindi ko lang sasabihin, ngunit ipapakita din: ang programa ay may maraming code, mga screenshot at rekomendasyon.

Ano ang karaniwan mong nakikita kapag nag-google ka ng salitang Airflow / Wikimedia Commons
Talaan ng nilalaman
Pagpapakilala
Ang Apache Airflow ay katulad ng Django:
- nakasulat sa python
- mayroong isang mahusay na admin panel,
- lumalawak nang walang katapusan
- mas mabuti lamang, at ito ay ginawa para sa ganap na magkakaibang mga layunin, lalo na (tulad ng nakasulat bago ang kata):
- pagpapatakbo at pagsubaybay sa mga gawain sa isang walang limitasyong bilang ng mga makina (tulad ng maraming Celery / Kubernetes at ang iyong budhi ay magbibigay-daan sa iyo)
- na may dynamic na henerasyon ng daloy ng trabaho mula sa napakadaling isulat at maunawaan ang Python code
- at ang kakayahang kumonekta sa anumang mga database at API sa isa't isa gamit ang parehong handa na mga bahagi at gawang bahay na mga plugin (na napakasimple).
Ginagamit namin ang Apache Airflow tulad nito:
- nangongolekta kami ng data mula sa iba't ibang source (maraming SQL Server at PostgreSQL instance, iba't ibang API na may mga sukatan ng application, kahit 1C) sa DWH at ODS (mayroon kaming Vertica at Clickhouse).
- gaano ka advanced
cron, na nagsisimula sa mga proseso ng pagsasama-sama ng data sa ODS, at sinusubaybayan din ang kanilang pagpapanatili.
Hanggang kamakailan, ang aming mga pangangailangan ay sakop ng isang maliit na server na may 32 core at 50 GB ng RAM. Sa Airflow, ito ay gumagana:
- pa 200 araw (talagang mga daloy ng trabaho, kung saan pinalamanan namin ang mga gawain),
- sa bawat isa sa karaniwan 70 gawain,
- ang kabutihang ito ay nagsisimula (sa karaniwan din) minsan sa isang oras.
At tungkol sa kung paano tayo lumawak, magsusulat ako sa ibaba, ngunit ngayon tukuyin natin ang über-problema na malulutas natin:
Mayroong tatlong pinagmulang SQL Server, bawat isa ay may 50 database - mga pagkakataon ng isang proyekto, ayon sa pagkakabanggit, mayroon silang parehong istraktura (halos saanman, mua-ha-ha), na nangangahulugan na ang bawat isa ay may isang talahanayan ng Mga Order (sa kabutihang palad, isang talahanayan na may ganoong pangalan ay maaaring itulak sa anumang negosyo). Kinukuha namin ang data sa pamamagitan ng pagdaragdag ng mga field ng serbisyo (source server, source database, ETL task ID) at walang muwang na itinapon ang mga ito sa, sabihin nating, Vertica.
Sabihin pumunta!
Ang pangunahing bahagi, praktikal (at isang maliit na teoretikal)
Bakit tayo (at ikaw)
Nung malalaki na ang mga puno at simple lang ako SQL-schik sa isang retail na Ruso, na-scam namin ang mga proseso ng ETL aka daloy ng data gamit ang dalawang tool na available sa amin:
- Informatica Power Center - isang napakalawak na sistema, lubhang produktibo, na may sariling hardware, sariling bersyon. Ginamit ko ang 1% ng mga kakayahan nito. Bakit? Buweno, una sa lahat, ang interface na ito, sa isang lugar mula noong 380s, ay naglalagay ng presyon sa amin. Pangalawa, ang gamit na ito ay idinisenyo para sa napakagandang proseso, galit na galit na muling paggamit ng bahagi at iba pang napakahalagang-enterprise-trick. Tungkol sa kung ano ang gastos, tulad ng pakpak ng Airbus AXNUMX / taon, wala kaming sasabihin.
Mag-ingat, ang isang screenshot ay maaaring makasakit ng kaunti sa mga taong wala pang 30

- SQL Server Integration Server - ginamit namin ang kasamang ito sa aming mga daloy ng intra-proyekto. Well, sa katunayan: gumagamit na kami ng SQL Server, at kahit papaano ay hindi makatwiran na hindi gamitin ang mga tool na ETL nito. Lahat ng nasa loob nito ay maganda: parehong maganda ang interface, at ang mga ulat ng pag-unlad ... Ngunit hindi ito ang dahilan kung bakit gustung-gusto namin ang mga produkto ng software, oh, hindi para dito. Bersyon ito
dtsx(na XML na may mga node na na-shuffle sa pag-save) magagawa natin, ngunit ano ang punto? Paano ang tungkol sa paggawa ng isang pakete ng gawain na magda-drag ng daan-daang mga talahanayan mula sa isang server patungo sa isa pa? Oo, kung ano ang isang daan, ang iyong hintuturo ay mahuhulog mula sa dalawampung piraso, pag-click sa pindutan ng mouse. Ngunit tiyak na mukhang mas sunod sa moda:
Tiyak na naghanap kami ng mga paraan. Kaso kahit halos dumating sa isang self-written SSIS package generator ...
…at pagkatapos ay isang bagong trabaho ang nakahanap sa akin. At naabutan ako ng Apache Airflow dito.
Nang malaman ko na ang mga paglalarawan sa proseso ng ETL ay simpleng Python code, hindi lang ako sumayaw sa tuwa. Ito ay kung paano ang mga stream ng data ay na-version at diffed, at ang pagbuhos ng mga talahanayan na may isang solong istraktura mula sa daan-daang mga database sa isang target ay naging isang bagay ng Python code sa isa at kalahati o dalawang 13 "screens.
Pagtitipon ng kumpol
Huwag nating ayusin ang isang ganap na kindergarten, at huwag pag-usapan ang mga ganap na halatang bagay dito, tulad ng pag-install ng Airflow, ang iyong napiling database, Celery at iba pang mga kaso na inilarawan sa mga pantalan.
Para makapagsimula agad kami ng mga eksperimento, nag-sketch ako docker-compose.yml kung saan:
- Talagang taasan natin Airflow: Scheduler, Webserver. Iikot din doon ang bulaklak para subaybayan ang mga gawain ng Celery (dahil na-push na ito
apache/airflow:1.10.10-python3.7, ngunit hindi kami tututol) - PostgreSQL, kung saan isusulat ng Airflow ang impormasyon ng serbisyo nito (data ng scheduler, mga istatistika ng pagpapatupad, atbp.), at markahan ng Celery ang mga natapos na gawain;
- Redis, na magsisilbing task broker para sa Celery;
- Manggagawa ng kintsay, na sasabak sa direktang pagsasagawa ng mga gawain.
- Sa folder
./dagsidaragdag namin ang aming mga file na may paglalarawan ng dags. Dadalhin sila sa mabilisang paraan, kaya hindi na kailangang i-juggle ang buong stack pagkatapos ng bawat pagbahin.
Sa ilang mga lugar, ang code sa mga halimbawa ay hindi ganap na ipinapakita (upang hindi makalat ang teksto), ngunit sa isang lugar ito ay binago sa proseso. Ang mga kumpletong halimbawa ng working code ay matatagpuan sa repositoryo .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerremarks:
- Sa pagpupulong ng komposisyon, higit na umasa ako sa kilalang imahe - siguraduhing suriin ito. Baka wala ka nang kailangan sa buhay mo.
- Ang lahat ng mga setting ng Airflow ay magagamit hindi lamang sa pamamagitan ng
airflow.cfg, ngunit sa pamamagitan din ng mga variable ng kapaligiran (salamat sa mga developer), na sinamantala ko nang masama. - Naturally, hindi ito handa sa produksyon: Sinadya kong hindi naglagay ng mga tibok ng puso sa mga lalagyan, hindi ako nag-abala sa seguridad. Ngunit ginawa ko ang pinakamababang angkop para sa aming mga eksperimento.
- Tandaan na:
- Ang dag folder ay dapat na ma-access ng scheduler at ng mga manggagawa.
- Ang parehong naaangkop sa lahat ng mga third-party na aklatan - lahat sila ay dapat na naka-install sa mga machine na may scheduler at mga manggagawa.
Well, ngayon ito ay simple:
$ docker-compose up --scale worker=3Pagkatapos bumangon ang lahat, maaari mong tingnan ang mga web interface:
- Daloy ng hangin:
- Bulaklak:
Mga pangunahing konsepto
Kung wala kang naiintindihan sa lahat ng "dags" na ito, narito ang isang maikling diksyunaryo:
- Scheduler - ang pinakamahalagang tiyuhin sa Airflow, na kinokontrol na ang mga robot ay nagtatrabaho nang husto, at hindi isang tao: sinusubaybayan ang iskedyul, nag-a-update ng mga araw, naglulunsad ng mga gawain.
Sa pangkalahatan, sa mga mas lumang bersyon, nagkaroon siya ng mga problema sa memorya (hindi, hindi amnesia, ngunit tumutulo) at ang legacy na parameter ay nanatili pa sa mga config.
run_duration— ang agwat ng pag-restart nito. Pero ngayon maayos na ang lahat. - Magdaga (aka "dag") - "itinuro ang acyclic graph", ngunit ang ganitong kahulugan ay magsasabi sa ilang tao, ngunit sa katunayan ito ay isang lalagyan para sa mga gawain na nakikipag-ugnayan sa isa't isa (tingnan sa ibaba) o isang analogue ng Package sa SSIS at Workflow sa Informatica .
Bilang karagdagan sa mga dag, maaaring mayroon pa ring mga subdag, ngunit malamang na hindi natin sila mapupuntahan.
- DAG Run - inisyal na dag, na itinalaga sa sarili nito
execution_date. Ang mga Dagran ng parehong dag ay maaaring gumana nang magkatulad (kung ginawa mong idempotent ang iyong mga gawain, siyempre). - Opereytor ay mga piraso ng code na responsable para sa pagsasagawa ng isang partikular na aksyon. Mayroong tatlong uri ng mga operator:
- aksyonparang paborito natin
PythonOperator, na maaaring magsagawa ng anumang (wastong) Python code; - ilipat, na nagdadala ng data mula sa isang lugar patungo sa lugar, sabihin nating,
MsSqlToHiveTransfer; - sensor sa kabilang banda, ito ay magbibigay-daan sa iyo na mag-react o pabagalin ang karagdagang pagpapatupad ng dag hanggang sa mangyari ang isang kaganapan.
HttpSensormaaaring hilahin ang tinukoy na endpoint, at kapag naghihintay ang nais na tugon, simulan ang paglipatGoogleCloudStorageToS3Operator. Magtatanong ang isang matanong na isip: “bakit? Pagkatapos ng lahat, maaari kang gumawa ng mga pag-uulit sa mismong operator!" At pagkatapos, upang hindi mabara ang pool ng mga gawain sa mga nasuspinde na operator. Ang sensor ay nagsisimula, nagsusuri at namatay bago ang susunod na pagtatangka.
- aksyonparang paborito natin
- Gawain - ang mga ipinahayag na operator, anuman ang uri, at naka-attach sa dag ay na-promote sa ranggo ng gawain.
- halimbawa ng gawain - nang magpasya ang pangkalahatang tagaplano na oras na upang magpadala ng mga gawain sa labanan sa mga performer-manggagawa (sa mismong lugar, kung gagamitin natin
LocalExecutoro sa isang malayong node sa kaso ngCeleryExecutor), nagtatalaga ito ng konteksto sa kanila (ibig sabihin, isang set ng mga variable - mga parameter ng pagpapatupad), nagpapalawak ng mga template ng command o query, at pinagsama ang mga ito.
Bumubuo kami ng mga gawain
Una, balangkasin natin ang pangkalahatang pamamaraan ng ating doug, at pagkatapos ay sumisid tayo sa mga detalye nang higit pa at higit pa, dahil nag-aaplay tayo ng ilang mga di-maliit na solusyon.
Kaya, sa pinakasimpleng anyo nito, ang gayong dag ay magiging ganito:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Alamin natin ito:
- Una, ini-import namin ang mga kinakailangang libs at iba pa;
sql_server_ds- AyList[namedtuple[str, str]]kasama ang mga pangalan ng mga koneksyon mula sa Airflow Connections at ang mga database kung saan kami kukuha ng aming plato;dag- ang anunsyo ng ating dag, na dapat ay nasaglobals(), kung hindi, hindi ito mahahanap ng Airflow. Kailangan ding sabihin ni Doug:- Ano ang pangalan niya
orders- lalabas ang pangalang ito sa web interface, - na siya ay magtatrabaho mula hatinggabi sa ikawalo ng Hulyo,
- at dapat itong tumakbo, humigit-kumulang bawat 6 na oras (para sa mga mahihirap na lalaki dito sa halip na
timedelta()matanggapcron-linya0 0 0/6 ? * * *, para sa hindi gaanong cool - isang expression tulad ng@daily);
- Ano ang pangalan niya
workflow()gagawin ang pangunahing trabaho, ngunit hindi ngayon. Sa ngayon, itatapon lang namin ang aming konteksto sa log.- At ngayon ang simpleng magic ng paglikha ng mga gawain:
- tumatakbo kami sa aming mga mapagkukunan;
- magpasimula
PythonOperator, na magpapatupad ng ating dummyworkflow(). Huwag kalimutang tukuyin ang isang natatanging (sa loob ng dag) na pangalan ng gawain at itali ang dag mismo. Bandilaprovide_contextsa turn, ay magbubuhos ng karagdagang mga argumento sa function, na maingat naming kokolektahin gamit**context.
Sa ngayon, yun lang. Ang nakuha namin:
- bagong dag sa web interface,
- isa at kalahating daang gawain na isasagawa nang magkatulad (kung pinapayagan ito ng Airflow, Celery na mga setting at kapasidad ng server).
Well, halos nakuha ito.

Sino ang mag-i-install ng mga dependencies?
Upang pasimplehin ang buong bagay na ito, nag-screwed ako docker-compose.yml pagpoproseso requirements.txt sa lahat ng node.
Ngayon ay wala na:

Ang mga gray na parisukat ay mga instance ng gawain na pinoproseso ng scheduler.
Naghintay kami ng kaunti, ang mga gawain ay kinuha ng mga manggagawa:

Ang mga berde, siyempre, ay matagumpay na nakumpleto ang kanilang trabaho. Ang mga pula ay hindi masyadong matagumpay.
Oo nga pala, walang folder sa prod namin
./dags, walang pag-synchronize sa pagitan ng mga makina - lahat ng dags ay namamalagigitsa aming Gitlab, at ang Gitlab CI ay namamahagi ng mga update sa mga makina kapag nagsasamamaster.
Medyo tungkol sa Flower
Habang hinahampas ng mga manggagawa ang ating mga pacifier, alalahanin natin ang isa pang tool na maaaring magpakita sa atin ng isang bagay - Bulaklak.
Ang pinakaunang pahina na may buod ng impormasyon sa mga node ng manggagawa:

Ang pinaka-matinding page na may mga gawaing napunta sa trabaho:

Ang pinaka nakakainip na page na may status ng aming broker:

Ang pinakamaliwanag na pahina ay may mga graph ng katayuan ng gawain at ang kanilang oras ng pagpapatupad:

Nilo-load namin ang underloaded
Kaya, lahat ng mga gawain ay nagtrabaho, maaari mong dalhin ang mga nasugatan.

At maraming nasugatan - sa isang kadahilanan o iba pa. Sa kaso ng tamang paggamit ng Airflow, ang mismong mga parisukat na ito ay nagpapahiwatig na ang data ay tiyak na hindi dumating.
Kailangan mong panoorin ang log at i-restart ang mga nahulog na pagkakataon sa gawain.
Sa pamamagitan ng pag-click sa anumang parisukat, makikita namin ang mga aksyon na magagamit sa amin:

Maaari mong kunin at gawin ang Clear the fallen. Iyon ay, nakalimutan namin na may isang bagay na nabigo doon, at ang parehong gawain ng halimbawa ay mapupunta sa scheduler.

Malinaw na ang paggawa nito gamit ang mouse gamit ang lahat ng pulang parisukat ay hindi masyadong makatao - hindi ito ang inaasahan natin mula sa Airflow. Naturally, mayroon tayong mga sandata ng malawakang pagkawasak: Browse/Task Instances

Piliin natin ang lahat nang sabay-sabay at i-reset sa zero, i-click ang tamang item:

Pagkatapos maglinis, ganito ang hitsura ng aming mga taxi (hinihintay na nila ang scheduler na mag-iskedyul ng mga ito):

Mga koneksyon, kawit at iba pang mga variable
Oras na para tingnan ang susunod na DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Nakagawa na ba ang lahat ng update sa ulat? Ito na naman siya: may listahan ng mga source kung saan kukuha ng data; mayroong isang listahan kung saan ilalagay; huwag kalimutang bumusina kapag nangyari o nasira ang lahat (well, this is not about us, no).
Balikan natin ang file at tingnan ang mga bagong hindi kilalang bagay:
from commons.operators import TelegramBotSendMessage- walang pumipigil sa amin na gumawa ng sarili naming mga operator, na sinamantala namin sa pamamagitan ng paggawa ng maliit na wrapper para sa pagpapadala ng mga mensahe sa Unblocked. (Mag-uusap pa kami tungkol sa operator na ito sa ibaba);default_args={}- maaaring ipamahagi ng dag ang parehong mga argumento sa lahat ng mga operator nito;to='{{ var.value.all_the_kings_men }}'- patlangtohindi kami magkakaroon ng hardcoded, ngunit dynamic na nabuo gamit ang Jinja at isang variable na may listahan ng mga email, na maingat kong inilagayAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— kundisyon para simulan ang operator. Sa aming kaso, ang liham ay lilipad lamang sa mga boss kung ang lahat ng mga dependency ay gumana matagumpay;tg_bot_conn_id='tg_main'- mga argumentoconn_idtanggapin ang mga connection ID kung saan kami gumagawaAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- Ang mga mensahe sa Telegram ay lilipad lamang kung may mga bumagsak na gawain;task_concurrency=1- ipinagbabawal namin ang sabay-sabay na paglulunsad ng ilang pagkakataon ng gawain ng isang gawain. Kung hindi, makukuha natin ang sabay-sabay na paglulunsad ng ilanVerticaOperator(nakatingin sa isang table);report_update >> [email, tg]- lahatVerticaOperatornagsasama-sama sa pagpapadala ng mga liham at mensahe, tulad nito:

Ngunit dahil may iba't ibang kundisyon sa paglulunsad ang mga operator ng notifier, isa lang ang gagana. Sa Tree View, ang lahat ay mukhang hindi gaanong nakikita:

Magsasabi ako ng ilang mga salita tungkol sa mga macro at ang kanilang mga kaibigan - mga variable.
Ang mga macro ay mga placeholder ng Jinja na maaaring palitan ang iba't ibang kapaki-pakinabang na impormasyon sa mga argumento ng operator. Halimbawa, tulad nito:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} lalawak sa mga nilalaman ng variable ng konteksto execution_date sa format YYYY-MM-DD: 2020-07-14. Ang pinakamagandang bahagi ay ang mga variable ng konteksto ay ipinako sa isang partikular na halimbawa ng gawain (isang parisukat sa Tree View), at kapag na-restart, ang mga placeholder ay lalawak sa parehong mga halaga.
Ang mga itinalagang halaga ay maaaring matingnan gamit ang Na-render na pindutan sa bawat halimbawa ng gawain. Ganito ang gawain sa pagpapadala ng liham:

At kaya sa gawain sa pagpapadala ng mensahe:

Available dito ang kumpletong listahan ng mga built-in na macro para sa pinakabagong available na bersyon:
Bukod dito, sa tulong ng mga plugin, maaari naming ipahayag ang aming sariling mga macro, ngunit iyon ay isa pang kuwento.
Bilang karagdagan sa mga paunang natukoy na bagay, maaari naming palitan ang mga halaga ng aming mga variable (ginamit ko na ito sa code sa itaas). Lumikha tayo Admin/Variables ilang bagay:

Lahat ng magagamit mo:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Ang value ay maaaring isang scalar, o maaari rin itong JSON. Sa kaso ng JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}gamitin lamang ang landas sa nais na susi: {{ var.json.bot_config.bot.token }}.
Literal na sasabihin ko ang isang salita at magpapakita ng isang screenshot tungkol sa соединения. Ang lahat ay elementarya dito: sa pahina Admin/Connections lumikha kami ng isang koneksyon, idagdag ang aming mga pag-login / password at mas tiyak na mga parameter doon. Ganito:

Maaaring i-encrypt ang mga password (mas lubusan kaysa sa default), o maaari mong iwanan ang uri ng koneksyon (tulad ng ginawa ko para sa tg_main) - ang katotohanan ay ang listahan ng mga uri ay naka-hardwired sa mga modelo ng Airflow at hindi mapapalawak nang hindi nakapasok sa mga source code (kung bigla akong hindi nag-google ng isang bagay, mangyaring itama ako), ngunit walang makakapigil sa amin na makakuha ng mga kredito sa pamamagitan lamang ng pangalan.
Maaari ka ring gumawa ng ilang mga koneksyon na may parehong pangalan: sa kasong ito, ang pamamaraan BaseHook.get_connection(), na nagbibigay sa amin ng mga koneksyon ayon sa pangalan, ay magbibigay random mula sa ilang mga pangalan (mas lohikal na gawin ang Round Robin, ngunit hayaan natin ito sa budhi ng mga developer ng Airflow).
Ang mga Variable at Koneksyon ay tiyak na mga cool na tool, ngunit mahalagang hindi mawalan ng balanse: kung aling mga bahagi ng iyong mga daloy ang iniimbak mo sa mismong code, at kung aling mga bahagi ang ibibigay mo sa Airflow para sa imbakan. Sa isang banda, maaari itong maging maginhawa upang mabilis na baguhin ang halaga, halimbawa, isang mailing box, sa pamamagitan ng UI. Sa kabilang banda, ito ay isang pagbabalik pa rin sa pag-click ng mouse, kung saan nais naming (ko) na alisin.
Ang pagtatrabaho sa mga koneksyon ay isa sa mga gawain mga kawit. Sa pangkalahatan, ang mga Airflow hook ay mga punto para sa pagkonekta nito sa mga serbisyo at library ng third-party. Hal, JiraHook ay magbubukas ng isang kliyente para makipag-ugnayan tayo kay Jira (maaari mong ilipat ang mga gawain pabalik-balik), at sa tulong ng SambaHook maaari mong itulak ang isang lokal na file sa smb-punto.
Pag-parse ng custom na operator
At malapit na naming tingnan kung paano ito ginawa TelegramBotSendMessage
Kodigo commons/operators.py kasama ang aktwal na operator:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Dito, tulad ng lahat ng iba pa sa Airflow, ang lahat ay napaka-simple:
- Nagmana sa
BaseOperator, na nagpapatupad ng ilang bagay na partikular sa Airflow (tingnan ang iyong paglilibang) - Ipinahayag na mga patlang
template_fields, kung saan maghahanap si Jinja ng mga macro na ipoproseso. - Inayos ang mga tamang argumento para sa
__init__(), itakda ang mga default kung saan kinakailangan. - Hindi rin namin nakalimutan ang pagsisimula ng ninuno.
- Binuksan ang kaukulang hook
TelegramBotHooknakatanggap ng isang bagay ng kliyente mula dito. - Overridden (muling tinukoy) na pamamaraan
BaseOperator.execute(), kung saan ang Airfow ay kikibot pagdating ng oras upang ilunsad ang operator - sa loob nito ay ipapatupad namin ang pangunahing aksyon, na nakakalimutang mag-log in. (Nag-log in kami, sa pamamagitan ng paraan, papasokstdoutиstderr- Harangin ng daloy ng hangin ang lahat, ibalot ito nang maganda, mabubulok kung kinakailangan.)
Tingnan natin kung ano ang mayroon tayo commons/hooks.py. Ang unang bahagi ng file, kasama ang hook mismo:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientHindi ko alam kung ano ang ipaliwanag dito, papansinin ko lang ang mga mahahalagang punto:
- Nagmana kami, isipin ang tungkol sa mga argumento - sa karamihan ng mga kaso ito ay magiging isa:
conn_id; - Overriding sa mga karaniwang pamamaraan: Nilimitahan ko ang aking sarili
get_conn(), kung saan nakukuha ko ang mga parameter ng koneksyon ayon sa pangalan at kunin lang ang seksyonextra(ito ay isang field ng JSON), kung saan ako (ayon sa sarili kong mga tagubilin!) ay naglagay ng Telegram bot token:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Lumilikha ako ng isang halimbawa ng aming
TelegramBot, binibigyan ito ng partikular na token.
Iyon lang. Maaari kang makakuha ng isang kliyente mula sa isang hook gamit TelegramBotHook().clent o TelegramBotHook().get_conn().
At ang pangalawang bahagi ng file, kung saan gumawa ako ng microwrapper para sa Telegram REST API, upang hindi i-drag ang pareho para sa isang paraan sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Ang tamang paraan ay idagdag ang lahat ng ito:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- sa plugin, ilagay sa isang pampublikong imbakan, at ibigay ito sa Open Source.
Habang pinag-aaralan namin ang lahat ng ito, matagumpay na nabigo ang aming mga update sa ulat at nagpadala sa akin ng mensahe ng error sa channel. Titingnan ko kung mali...

May nabasag sa aming aso! Hindi ba iyon ang inaasahan natin? Eksakto!
Magbubuhos ka ba?
Feeling mo may na-miss ako? Tila nangako siyang maglilipat ng data mula sa SQL Server patungo sa Vertica, at pagkatapos ay kinuha niya ito at inilipat ang paksa, ang scoundrel!
Ang kabangisan na ito ay sinadya, kailangan ko lang mag-decipher ng ilang terminolohiya para sa iyo. Ngayon ay maaari kang magpatuloy.
Ang plano namin ay ito:
- Gawin mo
- Bumuo ng mga gawain
- Tingnan kung gaano kaganda ang lahat
- Magtalaga ng mga numero ng session upang punan
- Kumuha ng data mula sa SQL Server
- Ilagay ang data sa Vertica
- Kolektahin ang mga istatistika
Kaya, upang maisakatuparan ang lahat ng ito, gumawa ako ng isang maliit na karagdagan sa aming docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyDoon itinataas namin:
- Vertica bilang host
dwhna may pinakamaraming default na setting, - tatlong pagkakataon ng SQL Server,
- pinupuno namin ang mga database sa huli ng ilang data (sa anumang kaso ay hindi tumingin sa
mssql_init.py!)
Inilunsad namin ang lahat ng mabuti sa tulong ng isang bahagyang mas kumplikadong utos kaysa sa huling pagkakataon:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Kung ano ang nabuo ng aming miracle randomizer, maaari mong gamitin ang item Data Profiling/Ad Hoc Query:

Ang pangunahing bagay ay hindi ipakita ito sa mga analyst
ipaliwanag sa Mga sesyon ng ETL I won't, everything is trivial there: we make a base, there is a sign in it, we wrap everything with a context manager, and now we do this:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passDumating na ang oras kolektahin ang aming data mula sa aming isa at kalahating daang mesa. Gawin natin ito sa tulong ng napaka-hindi mapagpanggap na mga linya:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Sa tulong ng isang hook na nakukuha natin mula sa Airflow
pymssql-kunekta - Palitan natin ang isang paghihigpit sa anyo ng isang petsa sa kahilingan - ito ay itatapon sa function ng template engine.
- Pagpapakain sa aming kahilingan
pandassino ang kukuha sa atinDataFrame- ito ay magiging kapaki-pakinabang sa atin sa hinaharap.
Gumagamit ako ng substitution
{dt}sa halip na isang parameter ng kahilingan%shindi dahil isa akong masamang Pinocchio, kundi dahilpandashindi makayananpymssqlat dumulas ang huliparams: Listalthough gusto niya talagatuple.
Tandaan din na ang developerpymssqlnagpasya na hindi na siya suportahan, at oras na para umalispyodbc.
Tingnan natin kung ano ang pinalamanan ng Airflow sa mga argumento ng aming mga function:

Kung walang data, walang saysay na magpatuloy. Ngunit kakaiba din na isaalang-alang ang pagpuno na matagumpay. Ngunit hindi ito isang pagkakamali. A-ah-ah, anong gagawin?! At narito kung ano:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException nagsasabi sa Airflow na walang mga error, ngunit nilalaktawan namin ang gawain. Ang interface ay hindi magkakaroon ng berde o pulang parisukat, ngunit pink.
Itapon natin ang ating data maraming column:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Katulad
- Ang database kung saan namin kinuha ang mga order,
- ID ng aming session sa pagbaha (magiiba ito para sa bawat gawain),
- Isang hash mula sa source at order ID - upang sa huling database (kung saan ang lahat ay ibinuhos sa isang table) mayroon kaming natatanging order ID.
Ang penultimate na hakbang ay nananatili: ibuhos ang lahat sa Vertica. At, kakaiba, isa sa mga pinakakahanga-hanga at mahusay na paraan upang gawin ito ay sa pamamagitan ng CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Gumagawa kami ng isang espesyal na receiver
StringIO. pandasay mabait na ilagay ang amingDataFramesa formCSV-mga linya.- Magbukas tayo ng koneksyon sa paborito nating Vertica na may kawit.
- At ngayon sa tulong
copy()ipadala ang aming data nang direkta sa Vertika!
Kukunin namin mula sa driver kung gaano karaming mga linya ang napunan, at sasabihin sa session manager na ang lahat ay OK:
session.loaded_rows = cursor.rowcount
session.successful = TrueYun lang
Sa pagbebenta, ginagawa namin nang manu-mano ang target na plato. Dito ko pinayagan ang aking sarili ng isang maliit na makina:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Gumagamit ako
VerticaOperator()Lumilikha ako ng isang database schema at isang talahanayan (kung hindi pa sila umiiral, siyempre). Ang pangunahing bagay ay ang wastong ayusin ang mga dependencies:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadLagom
- Buweno, - sabi ng maliit na daga, - hindi ba, ngayon
Kumbinsido ka ba na ako ang pinaka-kahila-hilakbot na hayop sa kagubatan?
Julia Donaldson, Ang Gruffalo
Sa palagay ko kung ang aking mga kasamahan at ako ay nagkaroon ng isang kumpetisyon: sino ang mabilis na lilikha at maglulunsad ng isang proseso ng ETL mula sa simula: sila kasama ang kanilang SSIS at isang mouse at ako ay may Airflow ... At pagkatapos ay ihahambing din namin ang kadalian ng pagpapanatili ... Wow, sa tingin ko ay sasang-ayon ka na talunin ko sila sa lahat ng larangan!
Kung medyo seryoso, pagkatapos ay ginawa ng Apache Airflow - sa pamamagitan ng paglalarawan ng mga proseso sa anyo ng program code - ang aking trabaho marami mas komportable at masaya.
Ang walang limitasyong pagpapalawak nito, kapwa sa mga tuntunin ng mga plug-in at predisposition sa scalability, ay nagbibigay sa iyo ng pagkakataong gumamit ng Airflow sa halos anumang lugar: kahit na sa buong cycle ng pagkolekta, paghahanda at pagproseso ng data, kahit na sa paglulunsad ng mga rocket (sa Mars, ng kurso).
Pangwakas na bahagi, sanggunian at impormasyon
Ang kalaykay na nakolekta namin para sa iyo
start_date. Oo, isa na itong lokal na meme. Sa pamamagitan ng pangunahing argumento ni Dougstart_datepasado lahat. Sa madaling sabi, kung tinukoy mo sastart_datekasalukuyang petsa, atschedule_interval- isang araw, pagkatapos ay magsisimula ang DAG bukas nang hindi mas maaga.start_date = datetime(2020, 7, 7, 0, 1, 2)At wala nang problema.
May isa pang runtime error na nauugnay dito:
Task is missing the start_date parameter, na kadalasang nagpapahiwatig na nakalimutan mong itali sa operator ng dag.- Lahat sa isang makina. Oo, at mga base (Airflow mismo at ang aming coating), at isang web server, at isang scheduler, at mga manggagawa. At gumana pa ito. Ngunit sa paglipas ng panahon, ang bilang ng mga gawain para sa mga serbisyo ay lumago, at nang magsimulang tumugon ang PostgreSQL sa index sa loob ng 20 s sa halip na 5 ms, kinuha namin ito at dinala ito.
- LocalExecutor. Oo, nakaupo pa rin kami dito, at nakarating na kami sa gilid ng bangin. Ang LocalExecutor ay sapat na para sa amin sa ngayon, ngunit ngayon ay oras na upang palawakin kasama ng kahit isang manggagawa, at kailangan naming magtrabaho nang husto upang lumipat sa CeleryExecutor. At dahil sa katotohanan na maaari mong gawin ito sa isang makina, walang pumipigil sa iyo na gumamit ng Celery kahit sa isang server, na "siyempre, hindi kailanman mapupunta sa produksyon, sa totoo lang!"
- Hindi nagagamit built-in na mga tool:
- Connections upang mag-imbak ng mga kredensyal ng serbisyo,
- SLA Miss upang tumugon sa mga gawain na hindi nagtagumpay sa oras,
- xcom para sa pagpapalitan ng metadata (sabi ko metadata!) sa pagitan ng mga gawain sa dag.
- Pang-aabuso sa mail. Well, ano ang masasabi ko? Nai-set up ang mga alerto para sa lahat ng pag-uulit ng mga nahuling gawain. Ngayon ang Gmail ko sa trabaho ay may >90k na email mula sa Airflow, at ang web mail muzzle ay tumangging kumuha at magtanggal ng higit sa 100 nang sabay-sabay.
Higit pang mga pitfalls:
Higit pang mga tool sa automation
Para mas makapagtrabaho tayo gamit ang ating mga ulo at hindi gamit ang ating mga kamay, inihanda ito ng Airflow para sa atin:
- - mayroon pa rin siyang status na Eksperimental, na hindi pumipigil sa kanya na magtrabaho. Gamit ito, hindi ka lamang makakakuha ng impormasyon tungkol sa mga dag at gawain, ngunit huminto/magsimula din ng isang araw, lumikha ng isang DAG Run o isang pool.
- - maraming mga tool ang magagamit sa pamamagitan ng command line na hindi lamang nakakaabala na gamitin sa pamamagitan ng WebUI, ngunit sa pangkalahatan ay wala. Halimbawa:
backfillkailangan upang i-restart ang mga instance ng gawain.
Halimbawa, dumating ang mga analyst at nagsabi: "At ikaw, kasama, may katarantaduhan sa data mula Enero 1 hanggang 13! Ayusin mo, ayusin mo, ayusin mo!" At ikaw ay isang libangan:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Batayang serbisyo:
initdb,resetdb,upgradedb,checkdb. run, na nagbibigay-daan sa iyong magpatakbo ng isang instance na gawain, at kahit na puntos sa lahat ng dependencies. Bukod dito, maaari mo itong patakbuhin sa pamamagitan ngLocalExecutor, kahit na mayroon kang kumpol ng Celery.- Gumagawa ng halos parehong bagay
test, lamang din sa mga base nagsusulat wala. connectionsnagbibigay-daan sa paggawa ng masa ng mga koneksyon mula sa shell.
- - isang medyo hardcore na paraan ng pakikipag-ugnayan, na kung saan ay inilaan para sa mga plugin, at hindi swarming sa loob nito na may maliit na mga kamay. Ngunit sino ang pipigil sa aming pumunta
/home/airflow/dags, tumakboipythonat magsimulang manggulo? Maaari mong, halimbawa, i-export ang lahat ng koneksyon gamit ang sumusunod na code:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Pagkonekta sa Airflow metadatabase. Hindi ko inirerekomenda ang pagsulat dito, ngunit ang pagkuha ng mga estado ng gawain para sa iba't ibang partikular na sukatan ay maaaring maging mas mabilis at mas madali kaysa sa paggamit ng alinman sa mga API.
Sabihin nating hindi lahat ng ating mga gawain ay idempotent, ngunit kung minsan ay maaaring mahulog ito, at ito ay normal. Ngunit ang ilang mga blockage ay kahina-hinala na, at ito ay kinakailangan upang suriin.
Ingat SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
sanggunian
At siyempre, ang unang sampung link mula sa pagpapalabas ng Google ay ang mga nilalaman ng folder ng Airflow mula sa aking mga bookmark.
- - siyempre, kailangan nating magsimula sa opisina. dokumentasyon, ngunit sino ang nagbabasa ng mga tagubilin?
- - Well, basahin man lang ang mga rekomendasyon mula sa mga creator.
- - ang pinakasimula: ang user interface sa mga larawan
- - ang mga pangunahing konsepto ay mahusay na inilarawan, kung (bigla!) Hindi mo naintindihan ang isang bagay mula sa akin.
- - isang maikling gabay para sa pag-set up ng Airflow cluster.
- - halos kaparehong kawili-wiling artikulo, maliban sa mas pormalismo, at mas kaunting mga halimbawa.
- — tungkol sa pagtatrabaho kasabay ng Celery.
- - tungkol sa idempotency ng mga gawain, naglo-load sa pamamagitan ng ID sa halip na petsa, pagbabago, istraktura ng file at iba pang mga kagiliw-giliw na bagay.
- - dependencies ng mga gawain at Trigger Rule, na binanggit ko lamang sa pagpasa.
- - kung paano pagtagumpayan ang ilang "gumagana ayon sa nilalayon" sa scheduler, i-load ang nawalang data at unahin ang mga gawain.
- — kapaki-pakinabang na mga query sa SQL sa Airflow metadata.
- - mayroong isang kapaki-pakinabang na seksyon tungkol sa paglikha ng isang pasadyang sensor.
- — isang kawili-wiling maikling tala tungkol sa pagbuo ng isang imprastraktura sa AWS para sa Data Science.
- - mga karaniwang pagkakamali (kapag hindi pa rin binabasa ng isang tao ang mga tagubilin).
- - ngumiti kung paano i-saklay ng mga tao ang pag-iimbak ng mga password, bagama't maaari mo lamang gamitin ang Mga Koneksyon.
- - implicit DAG forwarding, context throwing in functions, again about dependencies, and also about skipping task launchs.
- - tungkol sa paggamit
default argumentsиparamssa mga template, pati na rin sa Mga Variable at Koneksyon. - - isang kuwento tungkol sa kung paano naghahanda ang tagaplano para sa Airflow 2.0.
- - isang medyo luma na artikulo tungkol sa pag-deploy ng aming cluster sa
docker-compose. - - mga dynamic na gawain gamit ang mga template at pagpapasa ng konteksto.
- — karaniwan at pasadyang mga abiso sa pamamagitan ng koreo at Slack.
- - Mga sumasanga na gawain, macro at XCom.
At ang mga link na ginamit sa artikulo:
- - magagamit ang mga placeholder para sa mga template.
- — Mga karaniwang pagkakamali kapag gumagawa ng mga dag.
- -
docker-composepara sa eksperimento, pag-debug at higit pa. - — Python wrapper para sa Telegram REST API.
Pinagmulan: www.habr.com




