Përshëndetje, unë jam Dmitry Logvinenko - Inxhinier i të Dhënave i Departamentit të Analitikës të grupit të kompanive Vezet.
Unë do t'ju tregoj për një mjet të mrekullueshëm për zhvillimin e proceseve ETL - Apache Airflow. Por Airflow është kaq i gjithanshëm dhe i shumëanshëm sa duhet ta shikoni më nga afër edhe nëse nuk jeni të përfshirë në rrjedhat e të dhënave, por keni nevojë të nisni periodikisht çdo proces dhe të monitoroni ekzekutimin e tyre.
Dhe po, unë jo vetëm që do të tregoj, por edhe do të tregoj: programi ka shumë kode, pamje nga ekrani dhe rekomandime.

ĂfarĂ« shihni zakonisht kur kĂ«rkoni nĂ« google fjalĂ«n Airflow / Wikimedia Commons
Përmbajtje
Paraqitje
Apache Airflow është njësoj si Django:
- shkruar në python
- ka një panel të madh admin,
- zgjerohet pafundësisht
- vetëm më mirë, dhe është bërë për qëllime krejtësisht të ndryshme, domethënë (siç është shkruar para kata):
- ekzekutimi dhe monitorimi i detyrave në një numër të pakufizuar makinerish (aq sa Celery / Kubernetes dhe ndërgjegjja juaj do t'ju lejojë)
- me gjenerim dinamik të rrjedhës së punës nga kodi Python shumë i lehtë për t'u shkruar dhe kuptuar
- dhe aftësinë për të lidhur çdo bazë të dhënash dhe API me njëra-tjetrën duke përdorur si komponentë të gatshëm ashtu edhe shtojca të bëra në shtëpi (që është jashtëzakonisht e thjeshtë).
Ne përdorim Apache Airflow si kjo:
- ne mbledhim të dhëna nga burime të ndryshme (shumë instanca të SQL Server dhe PostgreSQL, API të ndryshme me metrikë aplikacioni, madje edhe 1C) në DWH dhe ODS (kemi Vertica dhe Clickhouse).
- sa e avancuar
cron, i cili fillon proceset e konsolidimit të të dhënave në ODS, si dhe monitoron mirëmbajtjen e tyre.
Deri vonë, nevojat tona mbuloheshin nga një server i vogël me 32 bërthama dhe 50 GB RAM. Në Airflow, kjo funksionon:
- më shumë 200 dags (në të vërtetë flukset e punës, në të cilat kemi mbushur detyrat),
- në secilin mesatarisht 70 detyra,
- fillon kjo mirësi (edhe mesatarisht) një herë në orë.
Dhe pĂ«r mĂ«nyrĂ«n se si u zgjeruam, do tĂ« shkruaj mĂ« poshtĂ«, por tani le tĂ« pĂ«rcaktojmĂ« problemin ĂŒber qĂ« do tĂ« zgjidhim:
Ekzistojnë tre serverë SQL me burim, secili me 50 baza të dhënash - shembuj të një projekti, përkatësisht, ata kanë të njëjtën strukturë (pothuajse kudo, mua-ha-ha), që do të thotë se secili ka një tabelë Urdhrash (për fat, një tabelë me atë emri mund të shtyhet në çdo biznes). Ne i marrim të dhënat duke shtuar fushat e shërbimit (server burimi, baza e të dhënave burimore, ID-ja e detyrës ETL) dhe në mënyrë naive i hedhim ato në, të themi, Vertica.
Le të shkojë!
Pjesa kryesore, praktike (dhe pak teorike)
Pse ne (dhe ju)
Kur pemët ishin të mëdha dhe unë isha i thjeshtë SQL-sik në një shitje me pakicë ruse, ne mashtruam proceset ETL të quajtura rrjedhat e të dhënave duke përdorur dy mjete të disponueshme për ne:
- Qendra e Energjisë Informatica - një sistem jashtëzakonisht i përhapur, jashtëzakonisht produktiv, me harduerin e tij, versionin e tij. Unë përdora Zoti na ruajt 1% të aftësive të saj. Pse? Epo, para së gjithash, kjo ndërfaqe, diku nga vitet 380, na bëri presion mendërisht. Së dyti, ky konstruksion është projektuar për procese jashtëzakonisht të bukura, ripërdorim të furishëm të komponentëve dhe truke të tjera shumë të rëndësishme të ndërmarrjes. Për faktin se kushton, si krahu i Airbus AXNUMX / vit, nuk do të themi asgjë.
Kujdes, një pamje nga ekrani mund të dëmtojë pak njerëzit nën 30 vjeç

- Serveri i integrimit të serverit SQL - ne e kemi përdorur këtë shok në rrjedhat tona brenda projektit. Epo, në fakt: ne tashmë përdorim SQL Server, dhe do të ishte disi e paarsyeshme të mos përdorim mjetet e tij ETL. Gjithçka në të është e mirë: edhe ndërfaqja është e bukur, edhe raportet e progresit... Por kjo nuk është arsyeja pse ne i duam produktet softuerike, oh, jo për këtë. Versioni i tij
dtsx(që është XML me nyje të përziera në ruajtje) mundemi, por cila është pika? Si thua të bësh një paketë detyrash që do të tërheqë qindra tabela nga një server në tjetrin? Po, sa njëqind, gishti tregues do të bjerë nga njëzet pjesë, duke klikuar në butonin e miut. Por padyshim që duket më në modë:
Sigurisht që kërkuam rrugëdalje. Madje rasti pothuajse erdhi në një gjenerator të paketave SSIS të shkruar vetë ...
âŠdhe mĂ« pas mĂ« gjeti njĂ« punĂ« e re. Dhe Apache Airflow mĂ« kapi mbi tĂ«.
Kur kuptova se pĂ«rshkrimet e procesit ETL janĂ« kod tĂ« thjeshtĂ« Python, thjesht nuk kĂ«rceva nga gĂ«zimi. KĂ«shtu u versionuan dhe u ndryshuan rrjedhat e tĂ« dhĂ«nave, dhe derdhja e tabelave me njĂ« strukturĂ« tĂ« vetme nga qindra baza tĂ« dhĂ«nash nĂ« njĂ« objektiv u bĂ« çështje e kodit tĂ« Python nĂ« njĂ« ekran e gjysmĂ« ose dy 13â .
Mbledhja e grupit
Le të mos organizojmë një kopsht fëmijësh plotësisht dhe të mos flasim për gjëra krejtësisht të dukshme këtu, si instalimi i Airflow, databaza e zgjedhur nga ju, Selinoja dhe rastet e tjera të përshkruara në doke.
Që të mund të fillojmë menjëherë eksperimentet, skicova docker-compose.yml në të cilën:
- Le të ngremë në fakt Airflow: Scheduler, Webserver. Lulja gjithashtu do të rrotullohet atje për të monitoruar detyrat e Selinos (sepse ajo tashmë është futur
apache/airflow:1.10.10-python3.7, por nuk na shqetëson) - PostgreSQL, në të cilin Airflow do të shkruajë informacionin e tij të shërbimit (të dhënat e planifikuesit, statistikat e ekzekutimit, etj.), dhe Celery do të shënojë detyrat e përfunduara;
- Redis, i cili do të veprojë si një ndërmjetës detyrash për Selino;
- Punëtor selino, të cilat do të angazhohen në ekzekutimin e drejtpërdrejtë të detyrave.
- NĂ« dosje
./dagsne do të shtojmë skedarët tanë me përshkrimin e dags. Ata do të merren menjëherë, kështu që nuk ka nevojë të mashtroni të gjithë pirgun pas çdo teshtitjeje.
Në disa vende, kodi në shembuj nuk tregohet plotësisht (për të mos rrëmuar tekstin), por diku modifikohet gjatë procesit. Shembuj të plotë të kodit të punës mund të gjenden në depo .
prerës-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerShenime:
- Në montimin e kompozimit u mbështeta kryesisht në imazhin e njohur - Sigurohuni që ta kontrolloni. Ndoshta nuk keni nevojë për asgjë tjetër në jetën tuaj.
- Të gjitha cilësimet e rrjedhës së ajrit janë të disponueshme jo vetëm përmes
airflow.cfg, por edhe përmes variablave të mjedisit (falë zhvilluesve), nga të cilat kam përfituar keqdashje. - Natyrisht, nuk është gati për prodhim: qëllimisht nuk vendosa rrahje zemre në kontejnerë, nuk u mërzita me sigurinë. Por unë bëra minimumin e përshtatshëm për eksperimentuesit tanë.
- Vini re se:
- Dosja dag duhet të jetë e aksesueshme si për planifikuesin ashtu edhe për punëtorët.
- E njëjta gjë vlen për të gjitha bibliotekat e palëve të treta - ato duhet të instalohen të gjitha në makina me një planifikues dhe punëtorë.
Epo, tani është e thjeshtë:
$ docker-compose up --scale worker=3Pasi të ngrihet gjithçka, mund të shikoni ndërfaqet në internet:
- airflow:
- Lule:
Konceptet themelore
Nëse nuk keni kuptuar asgjë në të gjitha këto "dags", atëherë këtu është një fjalor i shkurtër:
- Programues - xhaxhai më i rëndësishëm në Airflow, duke kontrolluar që robotët të punojnë shumë, dhe jo një person: monitoron orarin, përditëson dags, nis detyrat.
Në përgjithësi, në versionet më të vjetra, ai kishte probleme me kujtesën (jo, jo amnezi, por rrjedhje) dhe parametri i trashëgimisë madje mbeti në konfigurime
run_duration- intervali i rifillimit të tij. Por tani gjithçka është në rregull. - Dag (aka "dag") - "grafiku aciklik i drejtuar", por një përkufizim i tillë do t'u tregojë pak njerëzve, por në fakt është një enë për detyrat që ndërveprojnë me njëra-tjetrën (shih më poshtë) ose një analog i Paketës në SSIS dhe Workflow në Informatica .
Përveç dags, mund të ketë ende nëndegë, por ne me shumë mundësi nuk do t'i arrijmë ato.
- DAG Run - dag i inicializuar, i cili është caktuar i veti
execution_date. Dagranët e të njëjtit dag mund të punojnë paralelisht (nëse i keni bërë detyrat tuaja idempotente, sigurisht). - operator janë pjesë kodi përgjegjëse për kryerjen e një veprimi specifik. Ekzistojnë tre lloje të operatorëve:
- veprimsi e preferuara jonë
PythonOperator, i cili mund të ekzekutojë çdo kod (të vlefshëm) Python; - transferuar, të cilat transportojnë të dhëna nga një vend në tjetrin, të themi,
MsSqlToHiveTransfer; - sensor nga ana tjetër, do t'ju lejojë të reagoni ose të ngadalësoni ekzekutimin e mëtejshëm të dagut derisa të ndodhë një ngjarje.
HttpSensormund tĂ« tĂ«rheqĂ« pikĂ«n pĂ«rfundimtare tĂ« specifikuar, dhe kur pĂ«rgjigja e dĂ«shiruar Ă«shtĂ« nĂ« pritje, filloni transferiminGoogleCloudStorageToS3Operator. NjĂ« mendje kureshtare do tĂ« pyesĂ«: âpse? NĂ« fund tĂ« fundit, ju mund tĂ« bĂ«ni pĂ«rsĂ«ritje pikĂ«risht nĂ« operator!â Dhe pastaj, pĂ«r tĂ« mos bllokuar grupin e detyrave me operatorĂ« tĂ« pezulluar. Sensori fillon, kontrollon dhe vdes pĂ«rpara pĂ«rpjekjes tjetĂ«r.
- veprimsi e preferuara jonë
- Detyrë - Operatorët e deklaruar, pavarësisht nga lloji, dhe të bashkangjitur me dag promovohen në gradën e detyrës.
- shembulli i detyrës - kur planifikuesi i përgjithshëm vendosi se ishte koha për të dërguar detyrat në betejë për punonjësit e performancës (në vend, nëse përdorim
LocalExecutorose në një nyje të largët në rastin eCeleryExecutor), u cakton atyre një kontekst (d.m.th., një grup variablash - parametra ekzekutimi), zgjeron modelet e komandave ose pyetjeve dhe i bashkon ato.
Ne gjenerojmë detyra
Së pari, le të përshkruajmë skemën e përgjithshme të brumit tonë, dhe më pas do të zhytemi në detaje gjithnjë e më shumë, sepse aplikojmë disa zgjidhje jo të parëndësishme.
Pra, në formën e tij më të thjeshtë, një dag i tillë do të duket kështu:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Le ta kuptojmë:
- Së pari, ne importojmë libs e nevojshme dhe diçka tjetër;
sql_server_ds- AList[namedtuple[str, str]]me emrat e lidhjeve nga Airflow Connections dhe bazat e të dhënave nga të cilat do të marrim pllakën tonë;dag- njoftimi i dagut tonë, i cili duhet të jetë domosdoshmërisht nëglobals(), përndryshe Airflow nuk do ta gjejë atë. Doug gjithashtu duhet të thotë:- si e ka emrin
orders- ky emër do të shfaqet më pas në ndërfaqen e internetit, - se ai do të punojë nga mesnata e tetë korrikut,
- dhe duhet të funksionojë, afërsisht çdo 6 orë (për djemtë e fortë këtu në vend të
timedelta()e pranueshmecron-linjë0 0 0/6 ? * * *, për më pak cool - një shprehje si@daily);
- si e ka emrin
workflow()do të bëjë punën kryesore, por jo tani. Tani për tani, ne thjesht do ta hedhim kontekstin tonë në regjistër.- Dhe tani magjia e thjeshtë e krijimit të detyrave:
- ne kalojmë nëpër burimet tona;
- inicializoj
PythonOperator, i cili do të ekzekutojë bedelin tonëworkflow(). Mos harroni të specifikoni një emër unik (brenda dag) të detyrës dhe lidhni vetë dag. Flamuriprovide_contextnga ana tjetër, do të derdhë argumente shtesë në funksion, të cilat do t'i mbledhim me kujdes duke përdorur**context.
Tani pĂ«r tani, kjo Ă«shtĂ« e gjitha. ĂfarĂ« kemi marrĂ«:
- Dag i ri në ndërfaqen e internetit,
- njëqind e gjysmë detyra që do të ekzekutohen paralelisht (nëse e lejojnë cilësimet Airflow, Celery dhe kapaciteti i serverit).
Epo, pothuajse e kuptova.

Kush do të instalojë varësitë?
Për ta thjeshtuar të gjithë këtë gjë, u futa docker-compose.yml përpunimit requirements.txt në të gjitha nyjet.
Tani ka ikur:

Sheshat gri janë raste detyrash të përpunuara nga planifikuesi.
Ne presim pak, detyrat janë këputur nga punëtorët:

Të gjelbërt sigurisht që e kanë përfunduar me sukses punën e tyre. Të kuqtë nuk janë shumë të suksesshëm.
Nga rruga, nuk ka asnjë dosje në prodhimin tonë
./dags, nuk ka sinkronizim midis makinave - të gjitha dags qëndrojnë brendagitnë Gitlab tonë dhe Gitlab CI shpërndan përditësime te makinat kur bashkohenmaster.
Pak për Lulën
Ndërsa punëtorët po na rrahin biberonin, le të kujtojmë një mjet tjetër që mund të na tregojë diçka - Lulja.
Faqja e parë me informacion përmbledhës mbi nyjet e punëtorëve:

Faqja më intensive me detyrat që shkuan në punë:

Faqja më e mërzitshme me statusin e ndërmjetësit tonë:

Faqja më e ndritshme është me grafikët e statusit të detyrës dhe kohën e ekzekutimit të tyre:

Ne ngarkojmë nënngarkesat
Pra, të gjitha detyrat kanë funksionuar, ju mund të merrni me vete të plagosurit.

Dhe kishte shumë të plagosur - për një arsye ose një tjetër. Në rastin e përdorimit të saktë të Airflow, pikërisht këto katrorë tregojnë se të dhënat definitivisht nuk kanë mbërritur.
Ju duhet të shikoni regjistrin dhe të rinisni rastet e detyrave të rënë.
Duke klikuar nĂ« çdo katror, ââne do tĂ« shohim veprimet e disponueshme pĂ«r ne:

Ju mund të merrni dhe të pastroni të rënët. Kjo do të thotë, ne harrojmë se diçka ka dështuar atje, dhe e njëjta detyrë e shembullit do t'i shkojë planifikuesit.

ĂshtĂ« e qartĂ« se ta bĂ«sh kĂ«tĂ« me miun me tĂ« gjithĂ« katrorĂ«t e kuq nuk Ă«shtĂ« shumĂ« humane - kjo nuk Ă«shtĂ« ajo qĂ« presim nga Airflow. Natyrisht, ne kemi armĂ« tĂ« shkatĂ«rrimit nĂ« masĂ«: Browse/Task Instances

Le të zgjedhim gjithçka menjëherë dhe të rivendosim në zero, klikoni artikullin e duhur:

Pas pastrimit, taksitë tona duken kështu (ata tashmë presin që planifikuesi t'i planifikojë):

Lidhjet, grepa dhe variabla të tjerë
ĂshtĂ« koha pĂ«r tĂ« parĂ« DAG-nĂ« e radhĂ«s, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ĐĐŸŃĐżĐŸĐŽĐ° Ń
ĐŸŃĐŸŃОД, ĐŸŃŃĐ”ŃŃ ĐŸĐ±ĐœĐŸĐČĐ»Đ”ĐœŃ"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ĐаŃаŃ, ĐżŃĐŸŃŃпаĐčŃŃ, ĐŒŃ {{ dag.dag_id }} ŃŃĐŸĐœĐžĐ»Đž
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]A ka bërë të gjithë ndonjëherë një përditësim të raportit? Kjo është sërish ajo: ka një listë burimesh nga mund të merren të dhënat; ka një listë ku të vendosni; mos harroni të bini kur gjithçka ndodhi ose u prish (epo, kjo nuk ka të bëjë me ne, jo).
Le të kalojmë përsëri skedarin dhe të shohim gjërat e reja të paqarta:
from commons.operators import TelegramBotSendMessage- asgjĂ« nuk na pengon tĂ« bĂ«jmĂ« operatorĂ«t tanĂ«, tĂ« cilĂ«t e shfrytĂ«zuam duke bĂ«rĂ« njĂ« mbĂ«shtjellĂ«s tĂ« vogĂ«l pĂ«r dĂ«rgimin e mesazheve nĂ« Unblocked. (PĂ«r kĂ«tĂ« operator do tĂ« flasim mĂ« poshtĂ«);default_args={}- dag mund tĂ« shpĂ«rndajĂ« tĂ« njĂ«jtat argumente pĂ«r tĂ« gjithĂ« operatorĂ«t e tij;to='{{ var.value.all_the_kings_men }}'- fushĂ«tone nuk do tĂ« kemi tĂ« koduar, por tĂ« gjeneruar nĂ« mĂ«nyrĂ« dinamike duke pĂ«rdorur Jinja dhe njĂ« variabĂ«l me njĂ« listĂ« emailesh, tĂ« cilat i vendosa me kujdesAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESSâ kushti pĂ«r ndezjen e operatorit. NĂ« rastin tonĂ«, letra do tĂ« fluturojĂ« te shefat vetĂ«m nĂ«se tĂ« gjitha varĂ«sitĂ« kanĂ« funksionuar me sukses;tg_bot_conn_id='tg_main'- argumentetconn_idpranoni ID-tĂ« e lidhjes nĂ« tĂ« cilat ne krijojmĂ«Admin/Connections;trigger_rule=TriggerRule.ONE_FAILED- mesazhet nĂ« Telegram do tĂ« fluturojnĂ« larg vetĂ«m nĂ«se ka detyra tĂ« dĂ«shtuara;task_concurrency=1- ne ndalojmĂ« nisjen e njĂ«kohshme tĂ« disa rasteve tĂ« detyrave tĂ« njĂ« detyre. PĂ«rndryshe, ne do tĂ« marrim nisjen e njĂ«kohshme tĂ« disaVerticaOperator(duke shikuar nĂ« njĂ« tryezĂ«);report_update >> [email, tg]- tĂ« gjithaVerticaOperatorkonvergojnĂ« nĂ« dĂ«rgimin e letrave dhe mesazheve, si kjo:

Por meqenëse operatorët e njoftimit kanë kushte të ndryshme nisjeje, vetëm një do të funksionojë. Në Pamjen e Pemës, gjithçka duket pak më pak vizuale:

Unë do të them disa fjalë për makro dhe miqtë e tyre - variablave.
Makrot janë mbajtëse vendesh Jinja që mund të zëvendësojnë informacione të ndryshme të dobishme në argumentet e operatorit. Për shembull, si kjo:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} do të zgjerohet në përmbajtjen e ndryshores së kontekstit execution_date në format YYYY-MM-DD: 2020-07-14. Pjesa më e mirë është se variablat e kontekstit janë gozhduar në një shembull specifik të detyrës (një katror në Pamjen e Pemës), dhe kur të riniset, mbajtësit e vendeve do të zgjerohen në të njëjtat vlera.
Vlerat e caktuara mund të shihen duke përdorur butonin Rendered në çdo shembull të detyrës. Kjo është se si detyra me dërgimin e një letre:

Dhe kështu në detyrën me dërgimin e një mesazhi:

Një listë e plotë e makrove të integruara për versionin më të fundit të disponueshëm është në dispozicion këtu:
Për më tepër, me ndihmën e shtojcave, ne mund të deklarojmë makrot tona, por kjo është një histori tjetër.
Përveç gjërave të paracaktuara, ne mund të zëvendësojmë vlerat e variablave tanë (e kam përdorur tashmë në kodin e mësipërm). Le të krijojmë në Admin/Variables nja dy gjera:

Gjithçka që mund të përdorni:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Vlera mund të jetë skalar, ose mund të jetë gjithashtu JSON. Në rastin e JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}thjesht përdorni shtegun për në çelësin e dëshiruar: {{ var.json.bot_config.bot.token }}.
Do të them fjalë për fjalë një fjalë dhe do të tregoj një pamje nga ekrani lidhjet. Gjithçka është elementare këtu: në faqe Admin/Connections ne krijojmë një lidhje, shtojmë hyrjet / fjalëkalimet tona dhe parametra më specifikë atje. Si kjo:

Fjalëkalimet mund të kodohen (më shumë se standardi), ose mund të lini jashtë llojin e lidhjes (siç bëra për tg_main) - Fakti është se lista e llojeve është e integruar në modelet Airflow dhe nuk mund të zgjerohet pa hyrë në kodet burimore (nëse papritmas nuk kam kërkuar në google diçka, ju lutem më korrigjoni), por asgjë nuk do të na ndalojë të marrim kredite vetëm duke emri.
Ju gjithashtu mund të bëni disa lidhje me të njëjtin emër: në këtë rast, metoda BaseHook.get_connection(), e cila na merr lidhjet me emër, do të japë e rastit nga disa emra (do të ishte më logjike të bëhej Round Robin, por le ta lëmë në ndërgjegjen e zhvilluesve të Airflow).
Variablat dhe lidhjet janë padyshim mjete të mira, por është e rëndësishme të mos humbni ekuilibrin: cilat pjesë të rrjedhave tuaja ruani në vetë kodin dhe cilat pjesë i jepni Airflow për ruajtje. Nga njëra anë, mund të jetë e përshtatshme për të ndryshuar shpejt vlerën, për shembull, një kuti postare, përmes UI. Nga ana tjetër, ky është ende një rikthim në klikimin e miut, nga i cili ne (unë) donim të shpëtonim.
Puna me lidhje është një nga detyrat grepa. Në përgjithësi, grepat e rrjedhës së ajrit janë pika për lidhjen e tij me shërbimet dhe bibliotekat e palëve të treta. P.sh. JiraHook do të hapë një klient për ne që të ndërveprojmë me Jira (ju mund të lëvizni detyrat përpara dhe mbrapa), dhe me ndihmën e SambaHook ju mund të shtyni një skedar lokal në smb-pikë.
Analiza e operatorit personal
Dhe ne u afruam për të parë se si është bërë TelegramBotSendMessage
Kod commons/operators.py me operatorin aktual:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Këtu, si çdo gjë tjetër në Airflow, gjithçka është shumë e thjeshtë:
- Trashëguar nga
BaseOperator, i cili zbaton mjaft gjëra specifike për rrjedhën e ajrit (shikoni kohën tuaj të lirë) - Fushat e deklaruara
template_fields, në të cilën Jinja do të kërkojë makro për të përpunuar. - Rregulloi argumentet e duhura për
__init__(), vendosni parazgjedhjet aty ku është e nevojshme. - Nuk harruam as inicializimin e paraardhësve.
- Hapi grepin përkatës
TelegramBotHookmori një objekt klient prej tij. - Metoda e anashkaluar (e ripërcaktuar).
BaseOperator.execute(), të cilin Airfow do ta tërheqë kur të vijë koha për të nisur operatorin - në të ne do të zbatojmë veprimin kryesor, duke harruar të identifikohemi. (Meqë ra fjala, ne hyjmë menjëherëstdoutОstderr- Rrjedha e ajrit do të përgjojë gjithçka, do ta mbështjellë bukur, do ta zbërthejë aty ku është e nevojshme.)
Le të shohim se çfarë kemi commons/hooks.py. Pjesa e parë e skedarit, me vetë grepin:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientUnë as nuk di se çfarë të shpjegoj këtu, do të shënoj vetëm pikat e rëndësishme:
- Ne trashëgojmë, mendojmë për argumentet - në shumicën e rasteve do të jetë një:
conn_id; - Mbështetja e metodave standarde: Kufizova veten
get_conn(), në të cilën marr parametrat e lidhjes me emër dhe thjesht marr seksioninextra(kjo është një fushë JSON), në të cilën unë (sipas udhëzimeve të mia!) vendos tokenin bot Telegram:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Unë krijoj një shembull tonën
TelegramBot, duke i dhënë një shenjë specifike.
Kjo eshte e gjitha. Ju mund të merrni një klient nga një goditje duke përdorur TelegramBotHook().clent ose TelegramBotHook().get_conn().
Dhe pjesa e dytë e skedarit, në të cilën bëj një mikrombështjellës për Telegram REST API, në mënyrë që të mos zvarritet e njëjta për një metodë sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Mënyra e duhur është t'i shtoni të gjitha:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- në shtojcë, vendoseni në një depo publike dhe jepjani në Open Source.
Ndërsa po studionim të gjitha këto, përditësimet tona të raportit dështuan me sukses dhe më dërguan një mesazh gabimi në kanal. Unë do të kontrolloj për të parë nëse është gabim ...

Diçka u prish në dozhin tonë! A nuk është kjo ajo që ne prisnim? Pikërisht!
Do të derdhësh?
A mendoni se më ka munguar diçka? Duket se ka premtuar transferimin e të dhënave nga SQL Server në Vertica, dhe më pas e ka marrë dhe është larguar nga tema, i poshtër!
Kjo mizori ishte e qëllimshme, thjesht duhej të deshifroja disa terminologji për ju. Tani mund të shkoni më tej.
Plani ynë ishte ky:
- Bëj gjumë
- Gjeneroni detyra
- Shihni sa e bukur është gjithçka
- Cakto numrat e sesioneve për mbushjet
- Merrni të dhëna nga SQL Server
- Vendosni të dhënat në Vertica
- Mblidhni statistika
Kështu që, për t'i vënë në punë të gjitha këto, bëra një shtesë të vogël në tonën docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyAty ngremë:
- Vertica si pritës
dwhme cilësimet më të paracaktuara, - tre instanca të SQL Server,
- ne i mbushim bazat e të dhënave në këtë të fundit me disa të dhëna (në asnjë rast mos i shikoni
mssql_init.py!)
Ne nisim të gjitha të mirat me ndihmën e një komande pak më të komplikuar se herën e kaluar:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3ĂfarĂ« gjeneroi randomizuesi ynĂ« i mrekullueshĂ«m, mund ta pĂ«rdorni artikullin Data Profiling/Ad Hoc Query:

Gjëja kryesore nuk është t'ua tregosh analistët
shtjelloj mbi seancat ETL Nuk do, gjithçka është e parëndësishme atje: ne bëjmë një bazë, ka një shenjë në të, ne mbështjellim gjithçka me një menaxher konteksti dhe tani bëjmë këtë:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15sesioni.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passKa ardhur koha mbledhin të dhënat tona nga tavolinat tona njëqind e gjysmë. Le ta bëjmë këtë me ndihmën e linjave shumë jo modeste:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Me ndihmën e një grepi marrim nga Airflow
pymssql-lidhe - Le të zëvendësojmë një kufizim në formën e një date në kërkesë - ai do të hidhet në funksion nga motori i shabllonit.
- Duke ushqyer kërkesën tonë
pandaskush do të na marrëDataFrame- do të jetë e dobishme për ne në të ardhmen.
Unë jam duke përdorur zëvendësimin
{dt}në vend të një parametri të kërkesës%sjo sepse jam një Pinoku i keq, por sepsepandasnuk mund të përballojëpymssqldhe rrëshqet e funditparams: Listedhe pse me të vërtetë dëshirontuple.
Gjithashtu vini re se zhvilluesipymssqlvendosi të mos e mbështeste më dhe është koha për t'u larguarpyodbc.
Le të shohim se me çfarë Airflow i mbushi argumentet e funksioneve tona:

Nëse nuk ka të dhëna, atëherë nuk ka kuptim të vazhdohet. Por është gjithashtu e çuditshme të konsiderohet e suksesshme mbushja. Por ky nuk është një gabim. A-ah-ah, çfarë të bëjmë?! Dhe ja çfarë:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException i thotë Airflow se nuk ka gabime, por ne e kapërcejmë detyrën. Ndërfaqja nuk do të ketë një katror të gjelbër ose të kuq, por rozë.
Le të hedhim të dhënat tona kolona të shumta:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Domethënë
- Baza e të dhënave nga e cila morëm porositë,
- ID e sesionit tonë të përmbytjes (do të jetë ndryshe për çdo detyrë),
- Një hash nga burimi dhe ID-ja e porosisë - në mënyrë që në bazën e të dhënave përfundimtare (ku gjithçka derdhet në një tabelë) të kemi një ID unike të porosisë.
Hapi i parafundit mbetet: derdhni gjithçka në Vertica. Dhe, çuditërisht, një nga mënyrat më spektakolare dhe efikase për ta bërë këtë është përmes CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Ne po bëjmë një marrës të veçantë
StringIO. pandasme dashamirësi do të vendosë tonaDataFramesiCSV-linjat.- Le të hapim një lidhje me Vertica-n tonë të preferuar me një goditje.
- Dhe tani me ndihmën
copy()dërgoni të dhënat tona direkt në Vertika!
Ne do të marrim nga shoferi sa rreshta u mbushën dhe do t'i tregojmë menaxherit të sesionit që gjithçka është në rregull:
session.loaded_rows = cursor.rowcount
session.successful = TrueKjo eshte e gjitha.
Në shitje, ne krijojmë pllakën e synuar me dorë. Këtu i lejova vetes një makinë të vogël:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)po perdor
VerticaOperator()Unë krijoj një skemë të dhënash dhe një tabelë (nëse nuk ekzistojnë tashmë, sigurisht). Gjëja kryesore është të rregulloni saktë varësitë:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadPërmbledhja
- Epo, - tha miu i vogël, - a nuk është, tani
A jeni i bindur se unë jam kafsha më e tmerrshme në pyll?
Julia Donaldson, Gruffalo
Unë mendoj se nëse kolegët e mi dhe unë do të kishim një konkurrencë: kush do të krijojë dhe do të nisë shpejt një proces ETL nga e para: ata me SSIS-in e tyre dhe një mi dhe unë me Airflow ... Dhe atëherë do të krahasonim gjithashtu lehtësinë e mirëmbajtjes ... Uau, mendoj se do të pranoni që unë do t'i mund në të gjitha frontet!
Nëse pak më seriozisht, atëherë Apache Airflow - duke përshkruar proceset në formën e kodit të programit - bëri punën time shumë më të rehatshme dhe të këndshme.
Zgjerimi i tij i pakufizuar, si në aspektin e shtojcave dhe predispozicionit ndaj shkallëzimit, ju jep mundësinë të përdorni Airflow në pothuajse çdo fushë: edhe në ciklin e plotë të mbledhjes, përgatitjes dhe përpunimit të të dhënave, madje edhe në lëshimin e raketave (në Mars, të kurs).
Pjesa përfundimtare, referenca dhe informacioni
Grabujë që kemi mbledhur për ju
start_date. Po, kjo tashmë është një meme lokale. Nëpërmjet argumentit kryesor të Dougstart_datekalojnë të gjithë. Shkurtimisht, nëse specifikoni nëstart_datedata aktuale dheschedule_interval- një ditë, atëherë DAG do të fillojë nesër jo më herët.start_date = datetime(2020, 7, 7, 0, 1, 2)Dhe nuk ka më probleme.
Ekziston një gabim tjetër i kohës së ekzekutimit të lidhur me të:
Task is missing the start_date parameter, e cila më shpesh tregon se keni harruar të lidheni me operatorin dag.- Të gjitha në një makinë. Po, dhe bazat (vetë Airflow dhe veshja jonë), dhe një server në internet, dhe një planifikues dhe punëtorë. Dhe madje funksionoi. Por me kalimin e kohës, numri i detyrave për shërbimet u rrit dhe kur PostgreSQL filloi t'i përgjigjet indeksit në 20 s në vend të 5 ms, ne e morëm atë dhe e morëm.
- Ekzekutuesi lokal. Po, ne ende jemi ulur mbi të dhe tashmë kemi ardhur në buzë të humnerës. LocalExecutor na ka mjaftuar deri më tani, por tani është koha për t'u zgjeruar me të paktën një punëtor dhe do të duhet të punojmë shumë për të kaluar në CeleryExecutor. Dhe duke pasur parasysh faktin se mund të punoni me të në një makinë, asgjë nuk ju ndalon të përdorni Seleno edhe në një server, i cili "natyrisht, nuk do të hyjë kurrë në prodhim, sinqerisht!"
- Mospërdorimi mjete të integruara:
- Lidhjet për të ruajtur kredencialet e shërbimit,
- SLA mungon për t'iu përgjigjur detyrave që nuk funksionuan në kohë,
- xcom për shkëmbimin e meta të dhënave (i thashë metatë dhëna!) ndërmjet detyrave dag.
- Abuzimi me postën. Epo, çfarë mund të them? Alarmet u vendosën për të gjitha përsëritjet e detyrave të rënë. Tani Gmail i punës sime ka >90 mijë emaile nga Airflow, dhe gryka e postës në ueb refuzon të marrë dhe të fshijë më shumë se 100 në të njëjtën kohë.
Më shumë gracka:
Më shumë mjete automatizimi
Në mënyrë që ne të punojmë edhe më shumë me kokën dhe jo me duart tona, Airflow ka përgatitur për ne këtë:
- - ai ka ende statusin e Eksperimentit, gjë që nuk e pengon të punojë. Me të, ju jo vetëm që mund të merrni informacione rreth dags dhe detyrave, por gjithashtu mund të ndaloni/filloni një dag, të krijoni një DAG Run ose një pishinë.
- - shumë mjete janë të disponueshme përmes linjës së komandës që nuk janë thjesht të papërshtatshme për t'u përdorur përmes WebUI, por në përgjithësi mungojnë. Për shembull:
backfillnevojiten për të rifilluar rastet e detyrave.
PĂ«r shembull, erdhĂ«n analistĂ«t dhe thanĂ«: âDhe ti shok, ke marrĂ«zi nĂ« tĂ« dhĂ«nat nga 1 deri nĂ« 13 janar! Rregullojeni, rregullojeni, rregulloni, rregullojeni!" Dhe ju jeni njĂ« gatim i tillĂ«:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- ShĂ«rbimi bazĂ«:
initdb,resetdb,upgradedb,checkdb. run, i cili ju lejon të ekzekutoni një detyrë shembulli dhe madje të shënoni në të gjitha varësitë. Për më tepër, ju mund ta ekzekutoni atë nëpërmjetLocalExecutor, edhe nëse keni një grup Selino.- Bën pothuajse të njëjtën gjë
test, vetem edhe ne baza nuk shkruan asgje. connectionslejon krijimin masiv të lidhjeve nga guaska.
- - një mënyrë mjaft e fortë e ndërveprimit, e cila është menduar për shtojca, dhe jo për t'u mbushur me duar të vogla. Por kush do të na ndalojë të shkojmë
/home/airflow/dags, vraponiipythondhe filloni të ngatërroni? Për shembull, mund të eksportoni të gjitha lidhjet me kodin e mëposhtëm:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Lidhja me bazën e meta të dhënave të Airflow. Unë nuk rekomandoj t'i shkruani atij, por marrja e gjendjeve të detyrave për metrika të ndryshme specifike mund të jetë shumë më e shpejtë dhe më e lehtë sesa përmes ndonjë prej API-ve.
Le të themi se jo të gjitha detyrat tona janë idempotente, por ndonjëherë mund të bien dhe kjo është normale. Por disa bllokime tashmë janë të dyshimta dhe do të ishte e nevojshme të kontrolloheshin.
Kujdes SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
Referencat
Dhe sigurisht, dhjetë lidhjet e para nga lëshimi i Google janë përmbajtja e dosjes Airflow nga faqeshënuesit e mi.
- - Sigurisht, duhet të fillojmë me zyrën. dokumentacionin, por kush i lexon udhëzimet?
- - Epo, të paktën lexoni rekomandimet nga krijuesit.
- - fillimi: ndërfaqja e përdoruesit në foto
- - konceptet bazë janë përshkruar mirë, nëse (papritmas!) nuk keni kuptuar diçka nga unë.
- - një udhëzues i shkurtër për vendosjen e një grupi të rrjedhës së ajrit.
- - pothuajse i njëjti artikull interesant, përveç ndoshta më shumë formalizëm dhe më pak shembuj.
- â nĂ« lidhje me punĂ«n nĂ« lidhje me Selino.
- - për idempotencën e detyrave, ngarkimin me ID në vend të datës, transformimin, strukturën e skedarit dhe gjëra të tjera interesante.
- - vartësitë e detyrave dhe Rregulla e këmbëzës, të cilat i përmenda vetëm kalimthi.
- - si të kapërceni disa "punë siç synohet" në planifikues, të ngarkoni të dhënat e humbura dhe t'i jepni përparësi detyrave.
- â pyetje tĂ« dobishme SQL pĂ«r metadatat e Airflow.
- - ekziston një seksion i dobishëm për krijimin e një sensori të personalizuar.
- â njĂ« shĂ«nim i shkurtĂ«r interesant rreth ndĂ«rtimit tĂ« njĂ« infrastrukture nĂ« AWS pĂ«r ShkencĂ«n e tĂ« DhĂ«nave.
- - gabime të zakonshme (kur dikush ende nuk i lexon udhëzimet).
- - buzëqeshni se si njerëzit përdorin patericë për ruajtjen e fjalëkalimeve, megjithëse mund të përdorni vetëm Lidhjet.
- - përcjellja e nënkuptuar e DAG, futja e kontekstit në funksione, përsëri për varësitë, dhe gjithashtu për anashkalimin e nisjeve të detyrave.
- - në lidhje me përdorimin
default argumentsОparamsnë shabllone, si dhe Variablat dhe Lidhjet. - - një histori se si planifikuesi po përgatitet për Airflow 2.0.
- - një artikull paksa i vjetëruar në lidhje me vendosjen e grupit tonë në
docker-compose. - - detyra dinamike duke përdorur shabllone dhe përcjellje të kontekstit.
- â Njoftimet standarde dhe tĂ« personalizuara me postĂ« dhe Slack.
- - Detyrat e degëzimit, makro dhe XCom.
Dhe lidhjet e përdorura në artikull:
- - mbajtësit e vendeve të disponueshme për përdorim në shabllone.
- â Gabimet e zakonshme gjatĂ« krijimit tĂ« damave.
- -
docker-composepĂ«r eksperimentim, korrigjim dhe mĂ« shumĂ«. - â MbĂ«shtjellĂ«si Python pĂ«r Telegram REST API.
Burimi: www.habr.com




