Sveiki, es esmu Dmitrijs Logviņenko — uzņēmumu grupas Vezet Analytics nodaļas datu inženieris.
Pastāstīšu par brīnišķīgu rīku ETL procesu izstrādei - Apache Airflow. Taču Airflow ir tik daudzpusīga un daudzpusīga, ka jums vajadzētu to aplūkot vērīgāk pat tad, ja neesat iesaistīts datu plūsmās, bet jums ir nepieciešams periodiski palaist kādus procesus un uzraudzīt to izpildi.
Un jā, es ne tikai pastāstīšu, bet arī parādīšu: programmā ir daudz koda, ekrānuzņēmumu un ieteikumu.

Ko parasti redzat, meklējot Google vārdu Airflow / Wikimedia Commons
Satura
Ievads
Apache Airflow ir gluži kā Django:
- rakstīts python valodā
- ir lielisks administratora panelis,
- paplašināms uz nenoteiktu laiku
- tikai labāk, un tas tika izgatavots pavisam citiem mērķiem, proti (kā rakstīts pirms kata):
- uzdevumu izpilde un uzraudzība neierobežotā skaitā iekārtu (cik daudz Selery/Kubernetes un jūsu sirdsapziņa to ļaus)
- ar dinamisku darbplūsmas ģenerēšanu no ļoti viegli rakstāma un saprotama Python koda
- un iespēja savienot jebkuras datu bāzes un API savā starpā, izmantojot gan gatavus komponentus, gan paštaisītus spraudņus (kas ir ārkārtīgi vienkārši).
Mēs izmantojam Apache Airflow šādi:
- mēs apkopojam datus no dažādiem avotiem (daudziem SQL Server un PostgreSQL gadījumiem, dažādām API ar lietojumprogrammu metriku, pat 1C) DWH un ODS (mums ir Vertica un Clickhouse).
- cik attīstīts
cron, kas uzsāk datu konsolidācijas procesus ODS, kā arī uzrauga to uzturēšanu.
Vēl nesen mūsu vajadzības apmierināja viens neliels serveris ar 32 kodoliem un 50 GB RAM. Gaisa plūsmā tas darbojas:
- vairāk 200 dienas (faktiski darbplūsmas, kurās mēs ievietojām uzdevumus),
- katrā vidēji 70 uzdevumi,
- šis labums sākas (arī vidēji) reizi stundā.
Un par to, kā mēs paplašinājāmies, es rakstīšu zemāk, bet tagad definēsim über-problēmu, kuru mēs atrisināsim:
Ir trīs avota SQL serveri, katrs ar 50 datu bāzēm - attiecīgi viena projekta gadījumi, tiem ir vienāda struktūra (gandrīz visur, mua-ha-ha), kas nozīmē, ka katram ir pasūtījumu tabula (par laimi, tabula ar to vārdu var iespiest jebkurā biznesā). Mēs ņemam datus, pievienojot servisa laukus (avota serveris, avota datu bāze, ETL uzdevuma ID) un naivi iemetam tos, piemēram, Vertica.
Iesim!
Galvenā daļa, praktiskā (un nedaudz teorētiska)
Kāpēc mēs (un jūs)
Kad koki bija lieli un es biju vienkāršs SQL-schik vienā Krievijas mazumtirdzniecībā mēs izkrāpām ETL procesus jeb datu plūsmas, izmantojot divus mums pieejamos rīkus:
- Informācijas enerģijas centrs - ārkārtīgi izplatīta sistēma, ārkārtīgi produktīva, ar savu aparatūru, savu versiju veidošanu. Es izmantoju 1% no tās iespējām. Kāpēc? Pirmkārt, šī saskarne kaut kur no 380. gadiem radīja mums garīgu spiedienu. Otrkārt, šī ierīce ir paredzēta ārkārtīgi smalkiem procesiem, niknai komponentu atkārtotai izmantošanai un citiem ļoti svarīgiem uzņēmuma trikiem. Par to, ka tas maksā, tāpat kā Airbus AXNUMX spārns / gadā, mēs neko neteiksim.
Uzmanieties, ekrānuzņēmums var nedaudz ievainot cilvēkus, kas jaunāki par 30 gadiem

- SQL servera integrācijas serveris - mēs izmantojām šo biedru mūsu iekšējās projekta plūsmās. Nu, patiesībā: mēs jau izmantojam SQL Server, un būtu kaut kā nesaprātīgi neizmantot tā ETL rīkus. Viss tajā ir labs: gan interfeiss ir skaists, gan progresa ziņojumi... Bet ne tāpēc mēs mīlam programmatūras produktus, ak, ne tāpēc. Versija to
dtsx(kas ir XML ar mezgliem, kas sajaukti saglabāšanas laikā) mēs varam, bet kāda jēga? Kā būtu ar uzdevumu pakotnes izveidi, kas vilks simtiem tabulu no viena servera uz otru? Jā, kāds simts, rādītājpirksts nokritīs no divdesmit gabaliņiem, noklikšķinot uz peles pogas. Bet tas noteikti izskatās modernāk:
Mēs noteikti meklējām izejas. Lieta pat gandrīz nonācis pie pašrakstīta SSIS pakotņu ģeneratora ...
…un tad mani atrada jauns darbs. Un Apache Airflow mani apsteidza tajā.
Kad uzzināju, ka ETL procesu apraksti ir vienkāršs Python kods, es vienkārši nedejoju aiz prieka. Tādā veidā datu straumes tika versētas un diferencētas, un tabulu ar vienu struktūru ieliešana no simtiem datu bāzu vienā mērķī kļuva par Python koda jautājumu pusotra vai divos 13 collu ekrānos.
Klastera salikšana
Nekārtosim pilnīgi bērnudārzu un nerunāsim šeit par pilnīgi pašsaprotamām lietām, piemēram, Airflow instalēšanu, jūsu izvēlēto datubāzi, Selerijas un citiem dokos aprakstītajiem gadījumiem.
Lai mēs nekavējoties varētu sākt eksperimentus, es ieskicēju docker-compose.yml kurā:
- Patiesībā paaugstināsim Airflow: plānotājs, tīmekļa serveris. Flower arī tur griezīsies, lai uzraudzītu Selerijas uzdevumus (jo tas jau ir iespiests
apache/airflow:1.10.10-python3.7, bet mums nav nekas pretī) - PostgreSQL, kurā Airflow ierakstīs savu servisa informāciju (plānotāja datus, izpildes statistiku utt.), bet Selery atzīmēs izpildītos uzdevumus;
- Redis, kas darbosies kā Selerijas uzdevumu brokeris;
- Selerijas strādnieks, kas nodarbosies ar tiešu uzdevumu izpildi.
- Uz mapi
./dagsmēs pievienosim savus failus ar dags aprakstu. Tie tiks savākti lidojuma laikā, tāpēc pēc katras šķaudīšanas nav nepieciešams žonglēt ar visu kaudzīti.
Dažās vietās kods piemēros nav pilnībā parādīts (lai nepārblīvētu tekstu), bet kaut kur tas tiek modificēts procesā. Pilnus darba kodu piemērus var atrast repozitorijā .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerPiezīmes:
- Kompozīcijas montāžā lielā mērā paļāvos uz labi zināmo tēlu - noteikti pārbaudiet to. Varbūt tev dzīvē nekas cits nav vajadzīgs.
- Visi gaisa plūsmas iestatījumi ir pieejami ne tikai caur
airflow.cfg, bet arī ar vides mainīgajiem (paldies izstrādātājiem), kurus es ļaunprātīgi izmantoju. - Protams, tas nav gatavs ražošanai: es apzināti neliku sirdspukstus uz konteineriem, es neuztraucos ar drošību. Bet es izdarīju mūsu eksperimentētājiem piemēroto minimumu.
- Pieraksti to:
- Mapei dag ir jābūt pieejamai gan plānotājam, gan darbiniekiem.
- Tas pats attiecas uz visām trešo pušu bibliotēkām — tām visām jābūt instalētām iekārtās ar plānotāju un darbiniekiem.
Nu, tagad tas ir vienkārši:
$ docker-compose up --scale worker=3Kad viss ir pacēlies, varat apskatīt tīmekļa saskarnes:
- Gaisa plūsma:
- Zieds:
Pamatjēdzieni
Ja jūs neko nesapratāt visos šajos "dags", tad šeit ir īsa vārdnīca:
- Plānotājs - vissvarīgākais onkulis Airflow, kas kontrolē, lai roboti smagi strādātu, nevis cilvēks: uzrauga grafiku, atjaunina dienas, palaiž uzdevumus.
Kopumā vecākās versijās viņam bija problēmas ar atmiņu (nē, nevis amnēzija, bet noplūdes) un mantotais parametrs pat palika konfigurācijās
run_duration— tā restartēšanas intervāls. Bet tagad viss ir kārtībā. - DAG (aka "dag") - "virzīts aciklisks grafiks", taču šāda definīcija pateiks dažiem cilvēkiem, taču patiesībā tas ir konteiners uzdevumiem, kas mijiedarbojas viens ar otru (skatīt zemāk) vai paketes SSIS un darbplūsmas analogs informaticā. .
Papildus dagiem joprojām var būt subdags, bet mēs, visticamāk, līdz tiem netiksim.
- DAG Skrējiens - inicializēts dag, kuram tiek piešķirts savs
execution_date. Tā paša dag Dagrans var strādāt paralēli (ja jūs, protams, padarījāt savus uzdevumus idempotentus). - operators ir koda daļas, kas ir atbildīgas par noteiktas darbības veikšanu. Ir trīs veidu operatori:
- rīcībakā mūsu mīļākie
PythonOperator, kas var izpildīt jebkuru (derīgu) Python kodu; - pārsūtīt, kas pārsūta datus no vienas vietas uz otru, piemēram,
MsSqlToHiveTransfer; - devējs no otras puses, tas ļaus jums reaģēt vai palēnināt turpmāko dag izpildi, līdz notiek notikums.
HttpSensorvar izvilkt norādīto beigu punktu un, kad vēlamā atbilde gaida, sākt pārsūtīšanuGoogleCloudStorageToS3Operator. Ziņkārīgs prāts jautās: “Kāpēc? Galu galā jūs varat veikt atkārtojumus tieši pie operatora! Un pēc tam, lai neaizsprostotu uzdevumu kopumu ar apturētajiem operatoriem. Sensors ieslēdzas, pārbauda un nomirst pirms nākamā mēģinājuma.
- rīcībakā mūsu mīļākie
- Uzdevums - deklarētie operatori neatkarīgi no veida un pievienoti dag tiek paaugstināti uz uzdevuma pakāpi.
- uzdevuma gadījums - kad ģenerālplānotājs nolēma, ka ir pienācis laiks sūtīt uzdevumus kaujā izpildītājiem-strādniekiem (tieši uz vietas, ja mēs izmantojam
LocalExecutorvai uz attālo mezglu, jaCeleryExecutor), tas piešķir tiem kontekstu (t.i., mainīgo lielumu kopu - izpildes parametrus), paplašina komandu vai vaicājumu veidnes un apvieno tās.
Mēs ģenerējam uzdevumus
Vispirms ieskicētu mūsu douga vispārējo shēmu, un tad arvien vairāk iedziļināsimies detaļās, jo mēs izmantojam dažus netriviālus risinājumus.
Tātad vienkāršākajā formā šāds dags izskatīsies šādi:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Izdomāsim:
- Pirmkārt, mēs importējam nepieciešamos libs un kaut kas cits;
sql_server_ds-ŠoList[namedtuple[str, str]]ar savienojumu nosaukumiem no Airflow Connections un datu bāzēm, no kurām mēs ņemsim savu plāksni;dag- mūsu dienas paziņojums, kuram obligāti jābūt iekšāglobals(), pretējā gadījumā Airflow to neatradīs. Dagam arī jāsaka:- kāds ir viņa vārds
orders- šis nosaukums tiks parādīts tīmekļa saskarnē, - ka viņš strādās no astotā jūlija pusnakts,
- un tam vajadzētu darboties aptuveni ik pēc 6 stundām (nevis skarbajiem puišiem
timedelta()pieļaujamacron- līnija0 0 0/6 ? * * *, mazāk foršajiem - izteiciens patīk@daily);
- kāds ir viņa vārds
workflow()darīs galveno darbu, bet ne tagad. Pagaidām mēs vienkārši iekļausim savu kontekstu žurnālā.- Un tagad vienkārša uzdevumu izveides burvība:
- mēs skrienam cauri saviem avotiem;
- palaist
PythonOperator, kas izpildīs mūsu manekenuworkflow(). Neaizmirstiet norādīt unikālu (dag ietvaros) uzdevuma nosaukumu un piesaistīt pašu dag. Karogsprovide_contextsavukārt funkcijā iebērs papildu argumentus, kurus rūpīgi apkoposim izmantojot**context.
Pagaidām tas arī viss. Ko mēs saņēmām:
- jauna diena tīmekļa saskarnē,
- pusotrs simts uzdevumu, kas tiks izpildīti paralēli (ja to ļaus Airflow, Selery iestatījumi un servera jauda).
Nu, gandrīz sapratu.

Kurš instalēs atkarības?
Lai vienkāršotu šo visu, es ieskrūvēju docker-compose.yml apstrāde requirements.txt visos mezglos.
Tagad tas ir pazudis:

Pelēki kvadrāti ir uzdevumu gadījumi, ko apstrādā plānotājs.
Nedaudz pagaidām, darbus ķer strādnieki:

Zaļie, protams, savu darbu ir veiksmīgi pabeiguši. Sarkanie nav īpaši veiksmīgi.
Starp citu, mūsu prod nav mapes
./dags, nav sinhronizācijas starp mašīnām - visi dags atrodas iekšāgitmūsu Gitlab, un Gitlab CI izplata atjauninājumus iekārtām, kad tās tiek apvienotasmaster.
Mazliet par Ziedu
Kamēr strādnieki dauza mūsu knupīšus, atcerēsimies vēl vienu rīku, kas var mums kaut ko parādīt - Ziedu.
Pati pirmā lapa ar kopsavilkuma informāciju par darbinieku mezgliem:

Visintensīvākā lapa ar uzdevumiem, kas tika veikti:

Garlaicīgākā lapa ar mūsu brokera statusu:

Spilgtākā lapa ir ar uzdevumu statusa grafikiem un to izpildes laiku:

Mēs ielādējam nepietiekami noslogotos
Tātad, visi uzdevumi ir izpildīti, jūs varat aizvest ievainotos.

Un bija daudz ievainoto - viena vai otra iemesla dēļ. Pareizas Airflow lietošanas gadījumā tieši šie kvadrāti norāda, ka dati noteikti nav saņemti.
Jums jāskatās žurnāls un jārestartē kritušās uzdevumu instances.
Noklikšķinot uz jebkura kvadrāta, mēs redzēsim mums pieejamās darbības:

Jūs varat ņemt un padarīt Clear kritušo. Tas ir, mēs aizmirstam, ka tur kaut kas neizdevās, un tas pats instances uzdevums tiks nosūtīts plānotājam.

Skaidrs, ka to darīt ar peli ar visiem sarkanajiem kvadrātiņiem nav īpaši humāni – tas nav tas, ko mēs sagaidām no Airflow. Protams, mums ir masu iznīcināšanas ieroči: Browse/Task Instances

Atlasīsim visu uzreiz un atiestatīsim uz nulli, noklikšķiniet uz pareizā vienuma:

Pēc tīrīšanas mūsu taksometri izskatās šādi (viņi jau gaida, kad plānotājs tos saplānos):

Savienojumi, āķi un citi mainīgie
Ir pienācis laiks apskatīt nākamo DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Vai visi kādreiz ir veikuši pārskata atjaunināšanu? Šī atkal ir viņa: ir saraksts ar avotiem, no kuriem iegūt datus; ir saraksts, kur likt; neaizmirstiet paburkšķēt, kad viss notika vai salūza (nu, tas nav par mums, nē).
Pārskatīsim failu vēlreiz un apskatīsim jaunos neskaidros materiālus:
from commons.operators import TelegramBotSendMessage- nekas neliedz mums izveidot savus operatorus, ko izmantojām, izveidojot nelielu iesaiņojumu ziņojumu nosūtīšanai uz Unbloed. (Par šo operatoru vairāk runāsim tālāk);default_args={}- dag var izplatīt vienādus argumentus visiem saviem operatoriem;to='{{ var.value.all_the_kings_men }}'- laukstomums nebūs kodēts, bet dinamiski ģenerēts, izmantojot Jinja un mainīgo ar e-pasta sarakstu, ko es rūpīgi ievietojuAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— nosacījums operatora palaišanai. Mūsu gadījumā vēstule aizlidos pie priekšniekiem tikai tad, ja visas atkarības būs atrisinātas veiksmīgi;tg_bot_conn_id='tg_main'- argumenticonn_idpieņemt mūsu izveidotos savienojuma IDAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- ziņojumi telegrammā aizlidos tikai tad, ja būs nokrituši uzdevumi;task_concurrency=1- mēs aizliedzam vairāku viena uzdevuma uzdevumu vienlaicīgu palaišanu. Pretējā gadījumā mēs saņemsim vairāku vienlaicīgu palaišanuVerticaOperator(skatoties uz vienu galdu);report_update >> [email, tg]- vissVerticaOperatorsaplūst vēstuļu un ziņojumu sūtīšanā, piemēram:

Bet, tā kā paziņotāju operatoriem ir dažādi palaišanas nosacījumi, darbosies tikai viens. Koka skatā viss izskatās mazāk vizuāli:

Es teikšu dažus vārdus par makro un viņu draugi - mainīgie.
Makro ir Jinja vietturi, kas operatora argumentos var aizstāt dažādu noderīgu informāciju. Piemēram, šādi:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} tiks paplašināts līdz konteksta mainīgā saturam execution_date formātā YYYY-MM-DD: 2020-07-14. Labākais ir tas, ka konteksta mainīgie tiek pienagloti konkrētam uzdevuma gadījumam (kvadrātiņam koka skatā), un, restartējot, vietturi tiks paplašināti līdz tādām pašām vērtībām.
Piešķirtās vērtības var apskatīt, izmantojot pogu Rendered katrā uzdevuma instancē. Šis ir uzdevums ar vēstules nosūtīšanu:

Un tā uzdevumā ar ziņojuma nosūtīšanu:

Pilns jaunākās pieejamās versijas iebūvēto makro saraksts ir pieejams šeit:
Turklāt ar spraudņu palīdzību mēs varam deklarēt savus makro, bet tas ir cits stāsts.
Papildus iepriekš definētajām lietām mēs varam aizstāt mūsu mainīgo vērtības (es to jau izmantoju iepriekš minētajā kodā). Izveidosim iekšā Admin/Variables pāris lietas:

Viss, ko varat izmantot:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Vērtība var būt skalārs vai arī JSON. JSON gadījumā:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}vienkārši izmantojiet ceļu uz vajadzīgo atslēgu: {{ var.json.bot_config.bot.token }}.
Es burtiski teikšu vienu vārdu un parādīšu vienu ekrānuzņēmumu par savienojumi. Šeit viss ir elementāri: lapā Admin/Connections mēs izveidojam savienojumu, pievienojam savus pieteikumvārdus / paroles un specifiskākus parametrus. Kā šis:

Paroles var šifrēt (pamatīgāk nekā noklusējuma), vai arī varat nepievienot savienojuma veidu (kā es to darīju tg_main) - fakts ir tāds, ka Airflow modeļos tipu saraksts ir savienots un to nevar paplašināt, neiedziļinoties avota kodos (ja pēkšņi kaut ko nemeklēju googlē, lūdzu, izlabojiet mani), taču nekas netraucēs mums iegūt kredītus. nosaukums.
Varat arī izveidot vairākus savienojumus ar tādu pašu nosaukumu: šajā gadījumā metode BaseHook.get_connection(), kas iegūst mums savienojumus pēc nosaukuma, dos nejauši no vairākiem vārdabrāliem (loģiskāk būtu uztaisīt Round Robin, bet atstāsim to uz Airflow izstrādātāju sirdsapziņas).
Mainīgie un savienojumi noteikti ir lieliski rīki, taču ir svarīgi nezaudēt līdzsvaru: kuras plūsmas daļas jūs saglabājat pašā kodā un kuras daļas nododat glabāšanai Airflow. No vienas puses, var būt ērti ātri mainīt vērtību, piemēram, pasta kastīti, izmantojot lietotāja saskarni. No otras puses, šī joprojām ir atgriešanās pie peles klikšķa, no kura mēs (es) gribējām atbrīvoties.
Darbs ar savienojumiem ir viens no uzdevumiem āķi. Kopumā Airflow āķi ir punkti, lai to savienotu ar trešo pušu pakalpojumiem un bibliotēkām. Piemēram, JiraHook atvērs klientu, lai mēs varētu sadarboties ar Jira (jūs varat pārvietot uzdevumus uz priekšu un atpakaļ), un ar SambaHook varat nosūtīt vietējo failu smb- punkts.
Pielāgotā operatora parsēšana
Un mēs tuvojāmies tam, lai apskatītu, kā tas ir izgatavots TelegramBotSendMessage
Kods commons/operators.py ar faktisko operatoru:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Šeit, tāpat kā viss pārējais Airflow, viss ir ļoti vienkāršs:
- Mantojums no
BaseOperator, kas ievieš diezgan daudz ar gaisa plūsmu raksturīgu lietu (skatieties uz savu atpūtu) - Deklarētie lauki
template_fields, kurā Jinja meklēs makro, ko apstrādāt. - Sakārtoja pareizos argumentus par
__init__(), iestatiet noklusējuma iestatījumus, ja nepieciešams. - Neaizmirsām arī par senča inicializāciju.
- Atvēra atbilstošo āķi
TelegramBotHooksaņēma no tā klienta objektu. - Ignorēta (pārdefinēta) metode
BaseOperator.execute(), kuru Airfow raustīs, kad pienāks laiks palaist operatoru - tajā mēs īstenosim galveno darbību, aizmirstot pieteikties. (Starp citu, mēs piesakāmies tiešistdoutиstderr- Gaisa plūsma visu pārtvers, skaisti iesaiņos, sadalīs, kur nepieciešams.)
Paskatīsimies, kas mums ir commons/hooks.py. Pirmā faila daļa ar pašu āķi:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientEs pat nezinu, ko šeit paskaidrot, es tikai atzīmēšu svarīgos punktus:
- Mēs mantojam, domājam par argumentiem - vairumā gadījumu tas būs viens:
conn_id; - Standartmetožu ignorēšana: es sevi ierobežoju
get_conn(), kurā es iegūstu savienojuma parametrus pēc nosaukuma un vienkārši iegūstu sadaļuextra(tas ir JSON lauks), kurā es (saskaņā ar saviem norādījumiem!) ievietoju Telegram robota pilnvaru:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Es izveidoju mūsu piemēru
TelegramBot, piešķirot tam īpašu pilnvaru.
Tas ir viss. Jūs varat iegūt klientu no āķa, izmantojot TelegramBotHook().clent vai TelegramBotHook().get_conn().
Un faila otrā daļa, kurā es izveidoju mikroiesaiņojumu Telegram REST API, lai nevilktu to pašu vienai metodei sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Pareizais veids ir to visu saskaitīt:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- spraudnī ievietojiet publiskā repozitorijā un nododiet to atvērtajam pirmkodam.
Kamēr mēs to visu pētījām, mūsu pārskatu atjauninājumi veiksmīgi neizdevās un man kanālā nosūtīja kļūdas ziņojumu. Es iešu pārbaudīt, vai tas nav kārtībā...

Mūsu dogā kaut kas salūza! Vai tas nav tas, ko mēs gaidījām? tieši tā!
Vai tu taisies liet?
Vai tev liekas, ka es kaut ko palaidu garām? Šķiet, ka viņš solīja pārsūtīt datus no SQL servera uz Vertica un tad ņēma un aizgāja no tēmas, nelietis!
Šī zvērība bija tīša, man vienkārši bija jāatšifrē kāda terminoloģija jūsu vietā. Tagad jūs varat doties tālāk.
Mūsu plāns bija šāds:
- Do dag
- Ģenerējiet uzdevumus
- Redziet, cik viss ir skaisti
- Piešķiriet aizpildījumam sesijas numurus
- Iegūstiet datus no SQL Server
- Ievietojiet datus Vertica
- Savākt statistiku
Tāpēc, lai tas viss sāktu darboties, es veicu nelielu papildinājumu mūsu docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyTur mēs paaugstinām:
- Vertica kā saimniekdators
dwhar visvairāk noklusējuma iestatījumiem, - trīs SQL Server gadījumi,
- mēs aizpildām pēdējās esošās datu bāzes ar dažiem datiem (nekādā gadījumā neieskatieties
mssql_init.py!)
Mēs palaižam visu labo, izmantojot nedaudz sarežģītāku komandu nekā pagājušajā reizē:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Varat izmantot šo vienumu, ko ģenerēja mūsu brīnumu nejaušinātājs Data Profiling/Ad Hoc Query:

Galvenais to nerādīt analītiķiem
sīkāk izstrādāt ETL sesijas Es nedarīšu, tur viss ir triviāli: mēs izveidojam pamatni, tajā ir zīme, mēs visu aptinam ar konteksta pārvaldnieku, un tagad mēs darām šādi:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passLaiks ir pienācis apkopot mūsu datus no mūsu pusotra simta galdiņiem. Darīsim to ar ļoti nepretenciozu līniju palīdzību:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Ar āķa palīdzību tiekam no Airflow
pymssql- savienot - Aizstāsim pieprasījumā ierobežojumu datuma veidā — to funkcijā iemetīs veidnes dzinējs.
- Tiek izpildīts mūsu pieprasījums
pandaskurš mūs dabūsDataFrame- tas mums noderēs nākotnē.
Es izmantoju aizstāšanu
{dt}pieprasījuma parametra vietā%snevis tāpēc, ka es būtu ļauns Pinokio, bet gan tāpēcpandasnevar tikt galāpymssqlun paslīd pēdējoparams: Listlai gan viņš ļoti vēlastuple.
Ņemiet vērā arī to, ka izstrādātājspymssqlnolēma viņu vairs neatbalstīt, un ir pienācis laiks izvāktiespyodbc.
Apskatīsim, ar ko Airflow papildināja mūsu funkciju argumentus:

Ja nav datu, tad nav jēgas turpināt. Bet ir arī dīvaini uzskatīt pildījumu par veiksmīgu. Bet tā nav kļūda. A-ah-ah, ko darīt?! Un, lūk, kas:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException pateiks Airflow, ka kļūdu nav, bet mēs izlaižam uzdevumu. Interfeisam būs nevis zaļš vai sarkans kvadrāts, bet gan rozā.
Izmetīsim savus datus vairākas kolonnas:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Proti
- Datubāze, no kuras mēs saņēmām pasūtījumus,
- Mūsu plūdu sesijas ID (tas būs atšķirīgs katram uzdevumam),
- Hash no avota un pasūtījuma ID - lai gala datu bāzē (kur viss ir saliets vienā tabulā) mums būtu unikāls pasūtījuma ID.
Atliek priekšpēdējais solis: ielej visu Vertikā. Un, dīvainā kārtā, viens no iespaidīgākajiem un efektīvākajiem veidiem, kā to izdarīt, ir CSV fails!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Mēs izgatavojam īpašu uztvērēju
StringIO. pandaslaipni ieliks mūsuDataFrameformāCSV- līnijas.- Atvērsim savienojumu ar mūsu iecienīto Vertica ar āķi.
- Un tagad ar palīdzību
copy()nosūtiet mūsu datus tieši Vertika!
Mēs paņemsim no vadītāja, cik rindu ir aizpildītas, un pateiksim sesijas vadītājam, ka viss ir kārtībā:
session.loaded_rows = cursor.rowcount
session.successful = TrueTas ir viss.
Pārdošanā mēs manuāli izveidojam mērķa plāksni. Šeit es atļāvu sev nelielu mašīnu:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)es lietoju
VerticaOperator()Es izveidoju datu bāzes shēmu un tabulu (ja tādas vēl nav, protams). Galvenais ir pareizi sakārtot atkarības:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadApkopojot
- Nu, - sacīja mazā pele, - vai ne, tagad
Vai esat pārliecināts, ka esmu visbriesmīgākais dzīvnieks mežā?
Džūlija Donaldsone, Grufalo
Es domāju, ja man un maniem kolēģiem būtu konkurss: kurš ātri izveidos un sāks ETL procesu no nulles: viņi ar savu SSIS un peli un es ar Airflow ... Un tad mēs arī salīdzinātu apkopes vieglumu ... Oho, es domāju, ka piekritīsiet, ka es viņus pārspēšu visās frontēs!
Ja nedaudz nopietnāk, tad Apache Airflow - aprakstot procesus programmas koda veidā - izdarīja manu darbu daudz ērtāk un patīkamāk.
Tā neierobežotā paplašināmība gan spraudņu, gan mērogojamības ziņā sniedz iespēju izmantot Airflow gandrīz jebkurā jomā: pat pilnā datu vākšanas, sagatavošanas un apstrādes ciklā, pat palaižot raķetes (uz Marsu, no kurss).
Daļas noslēgums, atsauce un informācija
Grābeklis, ko esam jums savākuši
start_date. Jā, šī jau ir vietēja mēma. Via Doug galvenais argumentsstart_dateviss pāriet. Īsumā, ja norādātstart_datepašreizējais datums unschedule_interval- kādu dienu, tad DAG sāksies rīt ne agrāk.start_date = datetime(2020, 7, 7, 0, 1, 2)Un vairs nekādu problēmu.
Ar to ir saistīta cita izpildlaika kļūda:
Task is missing the start_date parameter, kas visbiežāk norāda, ka esat aizmirsis saistīt ar dag operatoru.- Viss vienā mašīnā. Jā, un bāzes (pati Airflow un mūsu pārklājums), un tīmekļa serveris, un plānotājs, un darbinieki. Un tas pat strādāja. Bet laika gaitā pakalpojumu uzdevumu skaits pieauga, un, kad PostgreSQL sāka reaģēt uz indeksu 20 s, nevis 5 ms, mēs to paņēmām un aiznesām.
- Vietējais izpildītājs. Jā, mēs joprojām uz tā sēžam, un jau esam nonākuši bezdibeņa malā. Ar LocalExecutor mums līdz šim ir bijis pietiekami, bet tagad ir pienācis laiks paplašināties ar vismaz vienu darbinieku, un mums būs smagi jāstrādā, lai pārietu uz CeleryExecutor. Un, ņemot vērā to, ka jūs varat strādāt ar to vienā mašīnā, nekas neliedz jums izmantot Selery pat serverī, kas, "protams, nekad nenonāks ražošanā, godīgi sakot!"
- Nelietošana iebūvētie instrumenti:
- savienojumi uzglabāt pakalpojumu akreditācijas datus,
- SLA Miss reaģēt uz uzdevumiem, kas nav izdevies laikā,
- xcom metadatu apmaiņai (es teicu mērķisdati!) starp dag uzdevumiem.
- Pasta ļaunprātīga izmantošana. Nu ko es varu teikt? Par visiem kritušo uzdevumu atkārtojumiem tika iestatīti brīdinājumi. Tagad manā darba pakalpojumā Gmail ir vairāk nekā 90 100 e-pasta ziņojumu no Airflow, un tīmekļa pasta uzgalis atsakās vienlaikus uztvert un dzēst vairāk nekā XNUMX.
Vairāk kļūmju:
Vairāk automatizācijas rīku
Lai mēs vēl vairāk strādātu ar galvu, nevis ar rokām, Airflow mums ir sagatavojis sekojošo:
- - viņam joprojām ir Eksperimentāla statuss, kas viņam netraucē strādāt. Ar to jūs varat ne tikai iegūt informāciju par dagiem un uzdevumiem, bet arī apturēt/sākt dag, izveidot DAG Run vai pūlu.
- - Komandrindā ir pieejami daudzi rīki, kurus ir ne tikai neērti lietot, izmantojot WebUI, bet arī parasti to nav. Piemēram:
backfillnepieciešams, lai restartētu uzdevumu gadījumus.
Piemēram, atnāca analītiķi un teica: “Un jums, biedri, ir muļķības datos no 1. līdz 13. janvārim! Labojiet, labojiet, labojiet, labojiet!" Un tu esi tāda plīts virsma:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Bāzes pakalpojums:
initdb,resetdb,upgradedb,checkdb. run, kas ļauj izpildīt vienu gadījumu uzdevumu un pat iegūt punktus par visām atkarībām. Turklāt jūs varat to palaist, izmantojotLocalExecutor, pat ja jums ir seleriju kopa.- Dara gandrīz to pašu
test, tikai arī bāzēs neko neraksta. connectionsļauj masveidā izveidot savienojumus no čaulas.
- - diezgan stingrs mijiedarbības veids, kas paredzēts spraudņiem, nevis spieto tajā ar mazām rociņām. Bet kas mums liedz iet
/home/airflow/dags, palaistipythonun sākt jaukties? Varat, piemēram, eksportēt visus savienojumus ar šādu kodu:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Savienojuma izveide ar Airflow metadatu bāzi. Es neiesaku tai rakstīt, taču uzdevumu stāvokļu iegūšana dažādiem specifiskiem rādītājiem var būt daudz ātrāka un vienkāršāka nekā ar kādu no API.
Pieņemsim, ka ne visi mūsu uzdevumi ir idempotenti, bet dažreiz tie var nokrist, un tas ir normāli. Bet daži aizsprostojumi jau ir aizdomīgi, un tas būtu jāpārbauda.
Uzmanieties no SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
atsauces
Un, protams, pirmās desmit saites no Google izdošanas ir mapes Airflow saturs no manām grāmatzīmēm.
- - Protams, jāsāk ar biroju. dokumentāciju, bet kurš lasa instrukcijas?
- – Labi, vismaz izlasi veidotāju ieteikumus.
- - pats sākums: lietotāja interfeiss attēlos
- - pamatjēdzieni ir labi aprakstīti, ja (pēkšņi!) Jūs kaut ko no manis nesapratāt.
- - īss ceļvedis gaisa plūsmas klastera iestatīšanai.
- - gandrīz tas pats interesants raksts, izņemot varbūt vairāk formālismu un mazāk piemēru.
- — par sadarbību ar Seleriju.
- - par uzdevumu idempotenci, ielādi pēc ID datuma vietā, transformāciju, failu struktūru un citām interesantām lietām.
- - uzdevumu atkarības un Trigger Rule, ko minēju tikai garāmejot.
- - kā pārvarēt dažus "darbus, kā paredzēts" plānotājā, ielādēt zaudētos datus un noteikt uzdevumu prioritātes.
- — noderīgi SQL vaicājumi Airflow metadatiem.
- - ir noderīga sadaļa par pielāgota sensora izveidi.
- — interesanta īsa piezīme par infrastruktūras izveidi AWS datu zinātnei.
- - izplatītas kļūdas (ja kāds joprojām neizlasa instrukcijas).
- - smaidiet, kā cilvēki uzglabā paroles, lai gan jūs varat vienkārši izmantot savienojumus.
- - netieša DAG pārsūtīšana, konteksta ievadīšana funkcijās, atkal par atkarībām un arī par uzdevumu palaišanas izlaišanu.
- - par lietošanu
default argumentsиparamsveidnēs, kā arī mainīgajos un savienojumos. - - stāsts par to, kā plānotājs gatavojas Airflow 2.0.
- - nedaudz novecojis raksts par mūsu klastera izvietošanu
docker-compose. - - dinamiski uzdevumi, izmantojot veidnes un konteksta pārsūtīšanu.
- — standarta un pielāgoti paziņojumi pa pastu un Slack.
- - Sazarojumu uzdevumi, makro un XCom.
Un rakstā izmantotās saites:
- - Vietturi, kas pieejami lietošanai veidnēs.
- — Biežākās kļūdas, veidojot dags.
- Sākot no
docker-composeeksperimentēšanai, atkļūdošanai un citiem. - — Python iesaiņojums Telegram REST API.
Avots: www.habr.com




