Saluton, mi estas Dmitry Logvinenko - Datuma Inĝeniero de la Analitika Sekcio de la grupo de kompanioj Vezet.
Mi rakontos al vi pri mirinda ilo por disvolvi ETL-procezojn - Apache Airflow. Sed Airflow estas tiel diverstalenta kaj multfaceta, ke vi devus pli detale rigardi ĝin eĉ se vi ne estas implikita en datumfluoj, sed bezonas periode lanĉi iujn ajn procezojn kaj kontroli ilian ekzekuton.
Kaj jes, mi ne nur rakontos, sed ankaŭ montros: la programo havas multajn kodojn, ekrankopiojn kaj rekomendojn.

Kion vi kutime vidas kiam vi guglas la vorton Airflow / Wikimedia Commons
Enhavtabelo
Enkonduko
Apache Airflow estas same kiel Django:
- skribita en python
- estas bonega administra panelo,
- vastigebla senfine
- nur pli bone, kaj ĝi estis farita por tute malsamaj celoj, nome (kiel ĝi estas skribita antaŭ la kata):
- prizorgi kaj monitori taskojn sur senlima nombro da maŝinoj (kiel multaj Celery / Kubernetes kaj via konscienco permesos al vi)
- kun dinamika laborflua generacio de tre facile skribi kaj kompreni Python-kodon
- kaj la kapablo konekti ajnajn datumbazojn kaj API-ojn unu kun la alia uzante kaj pretajn komponantojn kaj memfaritajn kromaĵojn (kio estas ekstreme simpla).
Ni uzas Apache Airflow tiel:
- ni kolektas datumojn de diversaj fontoj (multaj okazoj de SQL Server kaj PostgreSQL, diversaj API-oj kun aplikaj metrikoj, eĉ 1C) en DWH kaj ODS (ni havas Vertica kaj Clickhouse).
- kiom progresinta
cron, kiu komencas la datumsolidigajn procezojn sur la ODS, kaj ankaŭ kontrolas ilian prizorgadon.
Ĝis antaŭ nelonge, niaj bezonoj estis kovritaj de unu malgranda servilo kun 32 kernoj kaj 50 GB da RAM. En Airflow, ĉi tio funkcias:
- pli 200 dagoj (fakte laborfluoj, en kiuj ni plenigis taskojn),
- en ĉiu averaĝe 70 taskoj,
- ĉi tiu boneco komenciĝas (ankaŭ averaĝe) unufoje en horo.
Kaj pri kiel ni vastigis, mi skribos sube, sed nun ni difinu la über-problemon, kiun ni solvos:
Estas tri fontaj SQL-Serviloj, ĉiu kun 50 datumbazoj - okazoj de unu projekto, respektive, ili havas la saman strukturon (preskaŭ ĉie, mua-ha-ha), kio signifas, ke ĉiu havas Ordojn-tabelon (feliĉe, tabelo kun tiu). nomo povas esti puŝita en ajnan komercon). Ni prenas la datumojn aldonante servokampojn (fontoservilo, fonta datumbazo, ETL-tasko-identigilo) kaj naive ĵetas ilin en, ekzemple, Vertica.
Ni iru!
La ĉefa parto, praktika (kaj iom teoria)
Kial ni (kaj vi)
Kiam la arboj estis grandaj kaj mi estis simpla SQL-schik en unu rusa podetala komerco, ni trompis ETL-procezojn alinome datumfluojn uzante du ilojn disponeblajn al ni:
- Informatica Potenca Centro - ege disvastiĝanta sistemo, ege produktiva, kun propra aparataro, propra versio. Mi uzis Dio malpermesu 1% de ĝiaj kapabloj. Kial? Nu, antaŭ ĉio, ĉi tiu interfaco, ie el la 380-aj jaroj, mense premas nin. Due, ĉi tiu aparato estas desegnita por ekstreme luksaj procezoj, furioza reuzo de komponantoj kaj aliaj tre gravaj entreprenaj lertaĵoj. Pri kio kostas, kiel la flugilo de la Airbus AXNUMX/jaro, ni nenion diros.
Atentu, ekrankopio povas iomete vundi homojn sub 30 jarojn

- Servilo de Integriĝo de SQL-Servilo — ni uzis tiun ĉi kamaradon en niaj intraprojektaj fluoj. Nu, fakte: ni jam uzas SQL-Servilon, kaj estus iel malracie ne uzi ĝiajn ETL-ilojn. Ĉio en ĝi estas bona: kaj la interfaco estas bela, kaj la progreso-raportoj... Sed ne tial ni amas softvaraĵojn, ho, ne por ĉi tio. Versio ĝin
dtsx(kio estas XML kun nodoj miksitaj dum konservado) ni povas, sed kio estas la signifo? Kio pri fari taskan pakon, kiu trenos centojn da tabloj de unu servilo al alia? Jes, kia cent, via montrofingro defalos el dudek pecoj, klakante sur la musbutono. Sed ĝi certe aspektas pli moda:
Ni certe serĉis elirejojn. Kazo eĉ preskaŭ venis al memskribita SSIS-pakaĵgeneratoro ...
…kaj tiam nova laboro trovis min. Kaj Apache Airflow superis min sur ĝi.
Kiam mi eksciis, ke ETL-procezaj priskriboj estas simplaj Python-kodo, mi simple ne dancis pro ĝojo. Jen kiel datumfluoj estis versiigitaj kaj malsametaj, kaj verŝi tabelojn kun ununura strukturo el centoj da datumbazoj en unu celon fariĝis afero de Python-kodo en unu kaj duono aŭ du ekranoj de 13 ".
Kunvenante la areton
Ni ne aranĝu tute infanĝardenon, kaj ne parolu pri tute evidentaj aferoj ĉi tie, kiel instali Airflow, vian elektitan datumbazon, Celery kaj aliajn kazojn priskribitajn en la dokoj.
Por ke ni povu tuj komenci eksperimentojn, mi skizis docker-compose.yml en kiu:
- Ni efektive altigu Aerfluo: Planilo, Retservilo. Floro ankaŭ turniĝos tie por monitori Celery-taskojn (ĉar ĝi jam estis puŝita enen
apache/airflow:1.10.10-python3.7, sed ni ne ĝenas) - PostgreSQL, en kiu Airflow skribos ĝiajn servajn informojn (planildatumoj, ekzekutstatistikoj, ktp.), kaj Celery markos plenumitajn taskojn;
- Redis, kiu funkcios kiel taskoperanto por Celery;
- Celeriolaboristo, kiu okupiĝos pri la rekta plenumo de taskoj.
- Al dosierujo
./dagsni aldonos niajn dosierojn kun la priskribo de dags. Ili estos prenitaj sur la muŝo, do ne necesas ĵongli la tutan stakon post ĉiu terno.
Kelkloke la kodo en la ekzemploj ne estas tute montrata (por ne malordigi la tekston), sed ie ĝi estas modifita en la procezo. Kompletaj laborkodaj ekzemploj troveblas en la deponejo .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNotoj:
- En la muntado de la komponado mi plejparte fidis je la konata bildo - nepre kontrolu ĝin. Eble vi ne bezonas ion alian en via vivo.
- Ĉiuj agordoj de Aerfluo disponeblas ne nur pere
airflow.cfg, sed ankaŭ per mediovariabloj (danke al la programistoj), kiujn mi malice profitis. - Kompreneble, ĝi ne estas preta por produktado: mi intence ne metis korbatojn sur ujojn, mi ne ĝenis min pri sekureco. Sed mi faris la minimumon taŭgan por niaj eksperimentantoj.
- Notu tion:
- La dag-dosierujo devas esti alirebla por kaj la planisto kaj la laboristoj.
- La sama validas por ĉiuj triaj bibliotekoj - ili ĉiuj devas esti instalitaj sur maŝinoj kun planilo kaj laboristoj.
Nu, nun ĝi estas simpla:
$ docker-compose up --scale worker=3Post kiam ĉio leviĝas, vi povas rigardi la retajn interfacojn:
- Aerfluo:
- Floro:
Bazaj konceptoj
Se vi nenion komprenis en ĉiuj ĉi tiuj "dags", do jen mallonga vortaro:
- Planisto - la plej grava onklo en Airflow, kiu kontrolas, ke robotoj laboras forte, kaj ne homo: kontrolas la horaron, ĝisdatigas dagojn, lanĉas taskojn.
Ĝenerale, en pli malnovaj versioj, li havis problemojn kun memoro (ne, ne memorperdonon, sed likojn) kaj la hereda parametro eĉ restis en la agordoj.
run_duration— ĝia rekomenca intervalo. Sed nun ĉio estas en ordo. - DAG (alinome "dag") - "direktita acikla grafeo", sed tia difino rakontos al malmultaj homoj, sed fakte ĝi estas ujo por taskoj interrilatantaj (vidu malsupre) aŭ analogo de Pako en SSIS kaj Workflow en Informatica. .
Krom dagoj, eble ankoraŭ ekzistas subdagoj, sed ni plej verŝajne ne atingos ilin.
- DAG Kuru - pravalorigita dag, kiu estas asignita sia propra
execution_date. Dagranoj de la sama dag povas funkcii paralele (se vi faris viajn taskojn idempotent, kompreneble). - Funkciigisto estas pecoj de kodo respondecaj por plenumi specifan agon. Estas tri specoj de funkciigistoj:
- agokiel nia plej ŝatata
PythonOperator, kiu povas ekzekuti ajnan (validan) Python-kodon; - transigo, kiuj transportas datumojn de loko al loko, ekzemple,
MsSqlToHiveTransfer; - Sensilo aliflanke, ĝi permesos vin reagi aŭ malrapidigi la pluan ekzekuton de la dag ĝis okazaĵo okazas.
HttpSensorpovas tiri la specifitan finpunkton, kaj kiam la dezirata respondo atendas, komenci la translokigonGoogleCloudStorageToS3Operator. Sciema menso demandos: “kial? Post ĉio, vi povas fari ripetojn ĝuste en la funkciigisto!" Kaj poste, por ne ŝtopi la aron de taskoj kun nuligitaj operatoroj. La sensilo komenciĝas, kontrolas kaj mortas antaŭ la sekva provo.
- agokiel nia plej ŝatata
- tasko - deklaritaj operatoroj, sendepende de tipo, kaj ligitaj al la dag estas promociitaj al la rango de tasko.
- tasko-instanco - kiam la ĝenerala planisto decidis, ke estas tempo sendi taskojn en batalon al prezentisto-laboristoj (ĝuste surloke, se ni uzas
LocalExecutoraŭ al fora nodo en la kazo deCeleryExecutor), ĝi asignas kuntekston al ili (t.e., aro de variabloj - ekzekutparametroj), vastigas komand- aŭ demandŝablonojn, kaj kunigas ilin.
Ni generas taskojn
Unue, ni skizu la ĝeneralan skemon de nia doug, kaj poste ni pli kaj pli plonĝos en la detalojn, ĉar ni aplikas kelkajn ne-trivialajn solvojn.
Do, en ĝia plej simpla formo, tia dago aspektos jene:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Ni eltrovu ĝin:
- Unue, ni importas la necesajn libs kaj io alia;
sql_server_dsEstasList[namedtuple[str, str]]kun la nomoj de la konektoj de Airflow Connections kaj la datumbazoj el kiuj ni prenos nian teleron;dag- la anonco de nia dag, kiu nepre devas esti englobals(), alie Airflow ne trovos ĝin. Doug ankaŭ devas diri:- kio estas lia nomo
orders- ĉi tiu nomo tiam aperos en la retinterfaco, - ke li laboros ekde noktomezo la okan de julio,
- kaj ĝi devus funkcii, proksimume ĉiujn 6 horojn (por malmolaj uloj ĉi tie anstataŭ
timedelta()akcepteblacron-linio0 0 0/6 ? * * *, por la malpli mojosa - esprimo kiel@daily);
- kio estas lia nomo
workflow()faros la ĉefan laboron, sed ne nun. Nuntempe ni simple forĵetos nian kuntekston en la protokolon.- Kaj nun la simpla magio krei taskojn:
- ni trakuras niajn fontojn;
- pravalorigi
PythonOperator, kiu ekzekutos nian manikintonworkflow(). Ne forgesu specifi unikan (ene de la dag) nomon de la tasko kaj ligi la dag mem. Flagoprovide_contextsiavice, verŝos pliajn argumentojn en la funkcion, kiun ni zorge kolektos uzante**context.
Nuntempe, tio estas ĉio. Kion ni ricevis:
- nova dag en la retinterfaco,
- cent kaj duono da taskoj, kiuj estos ekzekutitaj paralele (se la Aerfluo, Celery-agordoj kaj servila kapablo tion permesas).
Nu, preskaŭ ricevis ĝin.

Kiu instalos la dependecojn?
Por simpligi ĉi tiun tutan aferon, mi ŝraŭbis docker-compose.yml prilaborado requirements.txt sur ĉiuj nodoj.
Nun ĝi malaperis:

Grizaj kvadratoj estas taskokazoj prilaboritaj de la planilo.
Ni atendu iomete, la taskoj estas klakitaj de la laboristoj:

La verdaj, kompreneble, sukcese finis sian laboron. Ruĝecoj ne tre sukcesas.
Cetere, ne estas dosierujo sur nia prod
./dags, ne estas sinkronigo inter maŝinoj - ĉiuj dagoj kuŝasgitsur nia Gitlab, kaj Gitlab CI distribuas ĝisdatigojn al maŝinoj dum kunfandadomaster.
Iom pri Floro
Dum la laboristoj draŝas niajn suĉilojn, ni rememoru alian ilon, kiu povas ion montri al ni - Floro.
La unua paĝo kun resumaj informoj pri labornodoj:

La plej intensa paĝo kun taskoj kiuj funkciis:

La plej enuiga paĝo kun la statuso de nia makleristo:

La plej hela paĝo estas kun taskaj statografikoj kaj ilia ekzekuttempo:

Ni ŝarĝas la subŝarĝitajn
Do, ĉiuj taskoj funkciis, vi povas forporti la vunditojn.

Kaj estis multaj vunditoj — pro unu aŭ alia kialo. En la kazo de la ĝusta uzo de Airflow, ĉi tiuj mem kvadratoj indikas, ke la datumoj certe ne alvenis.
Vi devas rigardi la protokolon kaj rekomenci la falintajn taskokazojn.
Alklakante iun kvadraton, ni vidos la disponeblajn agojn por ni:

Vi povas preni kaj fari Klarigi la falinton. Tio estas, ni forgesas, ke io malsukcesis tie, kaj la sama ekzempla tasko iros al la planilo.

Estas klare, ke fari ĉi tion per la muso kun ĉiuj ruĝaj kvadratoj ne estas tre humana - tio ne estas kion ni atendas de Airflow. Kompreneble, ni havas amasdetruajn armilojn: Browse/Task Instances

Ni elektu ĉion samtempe kaj restarigi al nulo, alklaku la ĝustan eron:

Post purigado, niaj taksioj aspektas tiel (ili jam atendas ke la planisto planu ilin):

Konektoj, hokoj kaj aliaj variabloj
Estas tempo rigardi la sekvan DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Ĉu ĉiuj iam faris raportan ĝisdatigon? Jen ŝi denove: estas listo de fontoj de kie akiri la datumojn; estas listo kie meti; ne forgesu klaksoni kiam ĉio okazis aŭ rompiĝis (nu, ĉi tio ne temas pri ni, ne).
Ni trarigardu la dosieron denove kaj rigardu la novajn obskurajn aferojn:
from commons.operators import TelegramBotSendMessage- nenio malhelpas al ni fari niajn proprajn telefonistojn, kiujn ni profitis farante malgrandan envolvaĵon por sendi mesaĝojn al Unblocked. (Ni parolos pli pri ĉi tiu operatoro sube);default_args={}- dag povas distribui la samajn argumentojn al ĉiuj ĝiaj operatoroj;to='{{ var.value.all_the_kings_men }}'- kampotoni ne havos malmolkoditajn, sed dinamike generitajn uzante Jinja kaj variablon kun listo de retpoŝtoj, kiujn mi zorge enmetisAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— kondiĉo por ekfunkciigi la funkciigiston. En nia kazo, la letero flugos al la estroj nur se ĉiuj dependecoj funkciis sukcese;tg_bot_conn_id='tg_main'- argumentojconn_idakceptu konektajn identigilojn en kiuj ni kreasAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- mesaĝoj en Telegram forflugos nur se estas falintaj taskoj;task_concurrency=1- ni malpermesas la samtempan lanĉon de pluraj taskokazoj de unu tasko. Alie, ni ricevos la samtempan lanĉon de plurajVerticaOperator(rigardante unu tablon);report_update >> [email, tg]- ĉiujVerticaOperatorkonverĝu en sendado de leteroj kaj mesaĝoj, kiel ĉi tio:

Sed ĉar sciigistoj havas malsamajn lanĉajn kondiĉojn, nur unu funkcios. En la Arba Vido, ĉio aspektas iom malpli vida:

Mi diros kelkajn vortojn pri makrooj kaj iliaj amikoj - variabloj.
Makrooj estas Jinja anstataŭiloj kiuj povas anstataŭigi diversajn utilajn informojn en funkciigistargumentojn. Ekzemple, tiel:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} vastiĝos al la enhavo de la kunteksta variablo execution_date en formato YYYY-MM-DD: 2020-07-14. La plej bona parto estas, ke kuntekstaj variabloj estas najlitaj al specifa taskokazaĵo (kvadrato en la Arba Vido), kaj kiam rekomencitaj, la anstataŭiloj vastiĝos al la samaj valoroj.
La asignitaj valoroj povas esti viditaj per la Butono Redonita en ĉiu tasko. Jen kiel la tasko kun sendado de letero:

Kaj do ĉe la tasko kun sendado de mesaĝo:

Kompleta listo de enkonstruitaj makrooj por la plej nova disponebla versio estas havebla ĉi tie:
Krome, helpe de kromprogramoj, ni povas deklari niajn proprajn makroojn, sed tio estas alia historio.
Krom la antaŭdifinitaj aferoj, ni povas anstataŭigi la valorojn de niaj variabloj (mi jam uzis ĉi tion en la supra kodo). Ni kreu enen Admin/Variables kelkaj aferoj:

Ĉio, kion vi povas uzi:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')La valoro povas esti skalaro, aŭ ĝi ankaŭ povas esti JSON. En kazo de JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}simple uzu la vojon al la dezirata ŝlosilo: {{ var.json.bot_config.bot.token }}.
Mi laŭvorte diros unu vorton kaj montros unu ekrankopion pri ligoj. Ĉio estas elementa ĉi tie: sur la paĝo Admin/Connections ni kreas konekton, aldonas niajn ensalutojn / pasvortojn kaj pli specifajn parametrojn tie. Kiel tio:

Pasvortoj povas esti ĉifritaj (pli ĝisfunde ol la defaŭlta), aŭ vi povas forlasi la konektotipo (kiel mi faris por tg_main) - la fakto estas, ke la listo de tipoj estas fiksita en Airflow-modeloj kaj ne povas esti vastigita sen eniri la fontkodojn (se subite mi ne guglos ion, bonvolu korekti min), sed nenio malhelpos nin akiri kreditojn nur per nomo.
Vi ankaŭ povas fari plurajn ligojn kun la sama nomo: en ĉi tiu kazo, la metodo BaseHook.get_connection(), kiu ricevas al ni konektojn laŭnome, donos hazarda de pluraj samnomuloj (estus pli logike fari Round Robin, sed ni lasu ĝin sur la konscienco de la programistoj de Airflow).
Variabloj kaj Konektoj certe estas bonegaj iloj, sed gravas ne perdi la ekvilibron: kiujn partojn de viaj fluoj vi konservas en la kodo mem, kaj kiujn partojn vi donas al Airflow por stokado. Unuflanke, povas esti oportune ŝanĝi la valoron, ekzemple, retskatolo, per la UI. Aliflanke, ĉi tio ankoraŭ estas reveno al la musklako, de kiu ni (mi) volis forigi.
Labori kun konektoj estas unu el la taskoj hokoj. Ĝenerale, Airflow-hokoj estas punktoj por konekti ĝin al triaj servoj kaj bibliotekoj. Ekz. JiraHook malfermos klienton por ke ni interagu kun Jira (vi povas movi taskojn tien kaj reen), kaj kun la helpo de SambaHook vi povas puŝi lokan dosieron al smb-punkto.
Analizante la kutiman funkciigiston
Kaj ni alproksimiĝis rigardi kiel ĝi estas farita TelegramBotSendMessage
Kodo commons/operators.py kun la fakta funkciigisto:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Ĉi tie, kiel ĉio alia en Airflow, ĉio estas tre simpla:
- Heredita de
BaseOperator, kiu efektivigas sufiĉe multajn Aerfluajn specifajn aferojn (rigardu vian libertempon) - Deklaritaj kampoj
template_fields, en kiu Jinja serĉos makroojn por procesi. - Aranĝis la ĝustajn argumentojn por
__init__(), agordu la defaŭltojn kie necese. - Ni ankaŭ ne forgesis pri la inicialigo de la praulo.
- Malfermis la respondan hokon
TelegramBotHookricevis klientan objekton de ĝi. - Anstataŭita (redifinita) metodo
BaseOperator.execute(), kiun Airfow svingos kiam venos la tempo lanĉi la funkciigiston - en ĝi ni efektivigos la ĉefan agon, forgesante ensaluti. (Ni ensalutas, cetere, tujstdoutиstderr- Aerfluo kaptos ĉion, envolvos ĝin bele, malkomponos ĝin kie necese.)
Ni vidu kion ni havas commons/hooks.py. La unua parto de la dosiero, kun la hoko mem:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientMi eĉ ne scias kion klarigi ĉi tie, mi nur notos la gravajn punktojn:
- Ni heredas, pensu pri la argumentoj - plejofte ĝi estos unu:
conn_id; - Superregado de normaj metodoj: mi limigis min
get_conn(), en kiu mi ricevas la konektajn parametrojn laŭnome kaj nur ricevas la sekcionextra(ĉi tio estas JSON-kampo), en kiu mi (laŭ miaj propraj instrukcioj!) metis la Telegram-bot-ĵetonon:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Mi kreas ekzemplon de nia
TelegramBot, donante al ĝi specifan ĵetonon.
Tio estas ĉio. Vi povas akiri klienton de hoko uzante TelegramBotHook().clent aŭ TelegramBotHook().get_conn().
Kaj la dua parto de la dosiero, en kiu mi faras mikrokonvolvaĵon por la Telegram REST API, por ne treni la saman por unu metodo sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))La ĝusta maniero estas aldoni ĉion:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- en la kromprogramon, metu en publikan deponejon, kaj donu ĝin al Malferma Fonto.
Dum ni studis ĉion ĉi, niaj raportaj ĝisdatigoj sukcesis malsukcesi kaj sendi al mi erarmesaĝon en la kanalo. Mi kontrolos ĉu ĝi estas malĝusta...

Io rompiĝis en nia doĝo! Ĉu ne tion ni atendis? Ĝuste!
Ĉu vi intencas verŝi?
Ĉu vi sentas, ke mi maltrafis ion? Ŝajnas, ke li promesis transdoni datumojn de SQL-Servilo al Vertica, kaj tiam li prenis ĝin kaj foriris de la temo, la kanajlo!
Tiu ĉi abomenaĵo estis intencita, mi simple devis deĉifri por vi ian terminologion. Nun vi povas iri plu.
Nia plano estis jena:
- Do dag
- Generu taskojn
- Vidu kiel bela ĉio estas
- Asignu numerojn de sesio al plenigaĵoj
- Akiru datumojn de SQL-Servilo
- Metu datumojn en Vertica
- Kolektu statistikojn
Do, por ekfunkciigi ĉi tion, mi faris malgrandan aldonon al nia docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyTie ni levas:
- Vertica kiel gastiganto
dwhkun la plej defaŭltaj agordoj, - tri okazoj de SQL Server,
- ni plenigas la datumbazojn en ĉi-lasta per iuj datumoj (neniuokaze ne enrigardu
mssql_init.py!)
Ni lanĉas ĉion bonan helpe de iom pli komplika komando ol la lastan fojon:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Kion nia mirakla randomigilo generis, vi povas uzi la eron Data Profiling/Ad Hoc Query:

La ĉefa afero estas ne montri ĝin al analizistoj
prilabori ETL-sesioj Mi ne faros, ĉio estas bagatela tie: ni faras bazon, estas signo en ĝi, ni envolvas ĉion per kunteksta administranto, kaj nun ni faras ĉi tion:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15sesio.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passLa tempo venis kolekti niajn datumojn el niaj unu kaj duono da tabloj. Ni faru tion helpe de tre senpretendaj linioj:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Helpe de hoko ni ricevas de Airflow
pymssql-konekti - Ni anstataŭigu limigon en formo de dato en la peton - ĝi estos ĵetita en la funkcion de la ŝablona motoro.
- Nutrante nian peton
pandaskiu ricevos ninDataFrame— ĝi estos utila al ni estonte.
Mi uzas anstataŭigon
{dt}anstataŭ peti parametron%sne ĉar mi estas malbona Pinokjo, sed ĉarpandasne povas manipulipymssqlkaj glitas la lastanparams: Listkvankam li vere volastuple.
Rimarku ankaŭ, ke la programistopymssqldecidis ne plu subteni lin, kaj estas tempo elmoviĝipyodbc.
Ni vidu, per kio Airflow plenigis la argumentojn de niaj funkcioj:

Se ne estas datumoj, tiam ne utilas daŭrigi. Sed estas ankaŭ strange konsideri la plenigon sukcesa. Sed ĉi tio ne estas eraro. A-ah-ah, kion fari?! Kaj jen kio:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException diras al Airflow, ke ne estas eraroj, sed ni preterlasas la taskon. La interfaco ne havos verdan aŭ ruĝan kvadraton, sed rozkoloran.
Ni ĵetu niajn datumojn multoblaj kolumnoj:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Nome:
- La datumbazo de kiu ni prenis la mendojn,
- ID de nia inunda sesio (ĝi estos malsama por ĉiu tasko),
- Hash de la fonto kaj mendo ID - tiel ke en la fina datumbazo (kie ĉio estas verŝita en unu tablon) ni havas unikan mendo ID.
La antaŭlasta paŝo restas: verŝu ĉion en Vertica. Kaj, strange, unu el la plej spektaklaj kaj efikaj manieroj fari tion estas per CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Ni faras specialan ricevilon
StringIO. pandasafable metos nianDataFrameen la formo deCSV-linioj.- Ni malfermu konekton al nia plej ŝatata Vertica per hoko.
- Kaj nun kun la helpo
copy()sendu niajn datumojn rekte al Vertika!
Ni prenos de la ŝoforo kiom da linioj estis plenigitaj, kaj diros al la seanca administranto, ke ĉio estas en ordo:
session.loaded_rows = cursor.rowcount
session.successful = TrueTio estas ĉio.
Sur la vendo, ni kreas la celplaton permane. Jen mi permesis al mi malgrandan maŝinon:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Mi uzas
VerticaOperator()Mi kreas datumbazan skemon kaj tabelon (se ili ne jam ekzistas, kompreneble). La ĉefa afero estas ĝuste aranĝi la dependecojn:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadSupre
- Nu, - diris la museto, - ĉu ne, nun
Ĉu vi estas konvinkita, ke mi estas la plej terura besto en la arbaro?
Julia Donaldson, La Grufalo
Mi pensas, se miaj kolegoj kaj mi havus konkurson: kiu rapide kreos kaj lanĉos ETL-procezon de nulo: ili kun sia SSIS kaj muso kaj mi kun Airflow ... Kaj tiam ni ankaŭ komparus la facilecon de prizorgado ... Ve, mi pensas, ke vi konsentos, ke mi venkos ilin ĉiuflanke!
Se iom pli serioze, tiam Apache Airflow - priskribante procezojn en formo de programkodo - faris mian laboron multe pli komforta kaj ĝua.
Ĝia senlima etendebleco, kaj laŭ kromprogramoj kaj dispozicio al skaleblo, donas al vi la ŝancon uzi Airflow en preskaŭ ajna areo: eĉ en la plena ciklo de kolektado, preparado kaj prilaborado de datumoj, eĉ en lanĉado de raketoj (al Marso, de kompreneble).
Parto fina, referenco kaj informo
La rastilon ni kolektis por vi
start_date. Jes, ĉi tio jam estas loka memo. Per la ĉefa argumento de Dougstart_dateĉiuj pasas. Mallonge, se vi specifas enstart_dateaktuala dato, kajschedule_interval- unu tagon, tiam DAG komenciĝos morgaŭ ne pli frue.start_date = datetime(2020, 7, 7, 0, 1, 2)Kaj ne plu problemoj.
Estas alia rultempa eraro asociita kun ĝi:
Task is missing the start_date parameter, kiu plej ofte indikas, ke vi forgesis ligi al la operatoro dag.- Ĉio sur unu maŝino. Jes, kaj bazoj (Airflow mem kaj nia tegaĵo), kaj retservilo, kaj horaro, kaj laboristoj. Kaj ĝi eĉ funkciis. Sed kun la tempo, la nombro da taskoj por servoj kreskis, kaj kiam PostgreSQL komencis respondi al la indekso en 20 s anstataŭ 5 ms, ni prenis ĝin kaj forportis ĝin.
- Loka Executor. Jes, ni ankoraŭ sidas sur ĝi, kaj ni jam venis al la rando de la abismo. LocalExecutor sufiĉis al ni ĝis nun, sed nun estas tempo plivastigi kun almenaŭ unu laboristo, kaj ni devos multe klopodi por translokiĝi al CeleryExecutor. Kaj pro tio, ke vi povas labori kun ĝi sur unu maŝino, nenio malhelpas vin uzi Celery eĉ sur servilo, kiu "kompreneble, neniam eniros en produktadon, honeste!"
- Ne-uzo enkonstruitaj iloj:
- Ligoj stoki servajn akreditaĵojn,
- SLA Misses respondi al taskoj kiuj ne funkciis ĝustatempe,
- xcom por interŝanĝo de metadatenoj (mi diris metadatumoj!) inter dag taskoj.
- Poŝtmisuzo. Nu, kion mi povas diri? Atentigoj estis starigitaj por ĉiuj ripetoj de falintaj taskoj. Nun mia laboro Gmail havas >90k retpoŝtojn de Airflow, kaj la retpoŝta muzelo rifuzas preni kaj forigi pli ol 100 samtempe.
Pli da malfacilaĵoj:
Pli da aŭtomatigaj iloj
Por ke ni laboru eĉ pli per niaj kapoj kaj ne per niaj manoj, Airflow preparis por ni ĉi tion:
- — li ankoraŭ havas la statuson de Eksperimenta, kio ne malhelpas lin labori. Per ĝi, vi povas ne nur akiri informojn pri dagoj kaj taskoj, sed ankaŭ ĉesigi/komenci dag, krei DAG Run aŭ naĝejon.
- - multaj iloj disponeblas per la komandlinio, kiuj ne nur maloportunas uzi per la WebUI, sed ĝenerale forestas. Ekzemple:
backfillbezonata por rekomenci taskokazojn.
Ekzemple, analizistoj venis kaj diris: “Kaj vi, kamarado, havas stultaĵojn en la datumoj de la 1-a ĝis la 13-a de januaro! Riparu ĝin, riparu ĝin, riparu ĝin, riparu ĝin!" Kaj vi estas tia kuirilo:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Baza servo:
initdb,resetdb,upgradedb,checkdb. run, kiu ebligas al vi ruli unu ekzaktan taskon, kaj eĉ poentiĝi pri ĉiuj dependecoj. Plie, vi povas ruli ĝin perLocalExecutor, eĉ se vi havas Celery-grupon.- Faras preskaŭ la samon
test, nur ankaŭ en bazoj skribas nenion. connectionspermesas amasan kreadon de ligoj de la ŝelo.
- - sufiĉe malfacila maniero interagi, kiu estas destinita por kromaĵojn, kaj ne svarmi en ĝi per malgrandaj manoj. Sed kiu malhelpas nin iri
/home/airflow/dags, kuriipythonkaj komenci fuŝi? Vi povas, ekzemple, eksporti ĉiujn konektojn kun la sekva kodo:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Konektante al la metadatumbazo de Airflow. Mi ne rekomendas skribi al ĝi, sed ricevi taskostatojn por diversaj specifaj metrikoj povas esti multe pli rapida kaj pli facila ol per iu ajn el la APIoj.
Ni diru, ke ne ĉiuj niaj taskoj estas idempotentaj, sed ili foje povas fali, kaj tio estas normala. Sed kelkaj blokadoj jam estas suspektindaj, kaj estus necese kontroli.
Atentu SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
referencoj
Kaj kompreneble, la unuaj dek ligiloj de la elsendo de Guglo estas la enhavo de la dosierujo Airflow el miaj legosignoj.
- — kompreneble, ni devas komenci per la oficejo. dokumentado, sed kiu legas la instrukciojn?
- - Nu, almenaŭ legu la rekomendojn de la kreintoj.
- - la komenco mem: la uzantinterfaco en bildoj
- - la bazaj konceptoj estas bone priskribitaj, se (subite!) Vi ne komprenis ion de mi.
- - mallonga gvidilo por starigi Aerfluan areton.
- - preskaŭ la sama interesa artikolo, krom eble pli da formalismo, kaj malpli da ekzemploj.
- — pri laborado kune kun Celerio.
- - pri la idempotenco de taskoj, ŝarĝo per ID anstataŭ dato, transformo, dosierstrukturo kaj aliaj interesaj aferoj.
- - dependecoj de taskoj kaj Trigger Rule, kiujn mi menciis nur preterpase.
- - kiel venki iujn "funkciojn kiel celitaj" en la planilo, ŝarĝi perditajn datumojn kaj prioritatigi taskojn.
- - utilaj SQL-demandoj al Airflow-metadatenoj.
- - estas utila sekcio pri kreado de kutima sensilo.
- — interesa mallonga noto pri konstruado de infrastrukturo sur AWS por Datuma Scienco.
- - oftaj eraroj (kiam iu ankoraŭ ne legas la instrukciojn).
- - ridetu kiel homoj lambastonas stoki pasvortojn, kvankam vi povas simple uzi Konektoj.
- - implicita DAG plusendado, kuntekstado en funkcioj, denove pri dependecoj, kaj ankaŭ pri transsalti taskolanĉojn.
- - pri la uzo
default argumentsиparamsen ŝablonoj, same kiel Variabloj kaj Konektoj. - - rakonto pri kiel la planisto prepariĝas por Airflow 2.0.
- - iomete malmoderna artikolo pri deplojado de nia areto enen
docker-compose. - - dinamikaj taskoj uzante ŝablonojn kaj kuntekstan plusendon.
- - normaj kaj kutimaj sciigoj per poŝto kaj Slack.
- - Branĉaj taskoj, makrooj kaj XCom.
Kaj la ligiloj uzataj en la artikolo:
- - anstataŭiloj disponeblaj por uzo en ŝablonoj.
- — Oftaj eraroj dum kreado de dags.
- -
docker-composepor eksperimentado, senararigado kaj pli. - — Python-envolvilo por Telegram REST API.
fonto: www.habr.com




