Kaixo, Dmitry Logvinenko naiz - Vezet enpresa taldeko Analitika Saileko datu-ingeniaria.
ETL prozesuak garatzeko tresna zoragarri baten berri emango dizut - Apache Airflow. Baina Airflow hain polifazetikoa eta polifazetikoa da, non gehiago aztertu beharko zenuke datu-fluxuetan parte hartzen ez baduzu ere, baina aldian-aldian edozein prozesu abiarazi eta haien exekuzioa kontrolatu beharra daukazu.
Eta bai, kontatu ez ezik, erakutsi ere egingo dut: programak kode, pantaila-argazki eta gomendio asko ditu.

Airflow / Wikimedia Commons hitza Googlen ikusten duzuna
Edukien taula
Sarrera
Apache Airflow Django bezalakoa da:
- python idatzita
- administrazio panel bikaina dago,
- mugagabe zabal daiteke
- hobea baino ez, eta helburu guztiz ezberdinetarako egin zen, hots (kat aurretik idatzita dagoen bezala):
- zereginak exekutatu eta kontrolatu makina kopuru mugagabean (Apioa / Kubernetes askok eta zure kontzientziari esker)
- Python kodea idazteko eta ulertzeko oso erraza den lan-fluxuaren sorkuntza dinamikoarekin
- eta edozein datu-base eta API elkarren artean konektatzeko gaitasuna prest egindako osagaiak eta etxeko pluginak erabiliz (oso erraza da).
Apache Airflow honela erabiltzen dugu:
- Hainbat iturritako datuak biltzen ditugu (SQL Server eta PostgreSQL instantzia asko, hainbat API aplikazio-neurriekin, baita 1C ere) DWH eta ODSn (Vertica eta Clickhouse ditugu).
- zein aurreratua
cron, ODSn datuak finkatzeko prozesuak hasten dituena, eta horien mantentze-lanak ere kontrolatzen ditu.
Duela gutxi arte, gure beharrak 32 nukleo eta 50 GB RAM zituen zerbitzari txiki batek estali zituen. Airflow-en, honek funtzionatzen du:
- gehiago 200 egun (benetan lan-fluxuak, zeinetan zereginak bete genituen),
- bakoitzean batez beste 70 zeregin,
- ontasun hau hasten da (batez bestekoa ere) orduan behin.
Eta nola zabaldu ginenari buruz, jarraian idatziko dut, baina orain definitu dezagun ebatziko dugun über-arazoa:
Jatorrizko hiru SQL zerbitzari daude, bakoitzak 50 datu-baserekin - proiektu baten instantziak, hurrenez hurren, egitura bera dute (ia nonahi, mua-ha-ha), hau da, bakoitzak Eskaerak taula bat dauka (zorionez, taula horrekin). izena edozein negoziotara bultza daiteke). Datuak zerbitzu-eremuak gehituz hartzen ditugu (iturburu-zerbitzaria, iturburu-datu-basea, ETL ataza ID) eta inozoki botatzen ditugu, esate baterako, Verticara.
Goazen!
Zati nagusia, praktikoa (eta apur bat teorikoa)
Zergatik egiten dugu (eta zu)
Zuhaitzak handiak zirenean eta ni sinplea nintzenean SQL-schik Errusiako txikizkako merkataritza batean, ETL prozesuak edo datu-fluxuak iruzur egin genituen gure eskura ditugun bi tresna erabiliz:
- Informatika Power Center - Oso zabaltzen den sistema, izugarri produktiboa, bere hardwarea, bere bertsioa duena. Jainkoak debekatu nuen bere gaitasunen %1 erabili nuen. Zergatik? Bada, lehenik eta behin, interfaze honek, nonbait, 380ko hamarkadakoa, mentalki presioa egiten zigun. Bigarrenik, tramankulu hau oso prozesu dotoreetarako, osagaien berrerabilpen amorratuetarako eta beste enpresa-trikimailu oso garrantzitsuak egiteko diseinatuta dago. Zer kostatzen den, Airbus AXNUMX / urteko hegala bezala, ez dugu ezer esango.
Kontuz, pantaila-argazkiak 30 urtetik beherako pertsonei min pixka bat egin diezaieke

- SQL Server Integrazio Zerbitzaria - kamarada hau gure proiektu barneko fluxuetan erabili dugu. Beno, egia esan: dagoeneko erabiltzen dugu SQL Server, eta nolabait zentzugabea izango litzateke bere ETL tresnak ez erabiltzea. Bertan dena ona da: bai interfazea ederra da, bai aurrerapen txostenak... Baina ez da horregatik maite ditugu software produktuak, oh, ez horregatik. Bertsioa
dtsx(zein da XML gordetzean nodoak nahastuta) egin dezakegu, baina zertarako? Zer moduz zerbitzari batetik bestera ehunka taula arrastatuko dituen zeregin pakete bat egitea? Bai, zer ehun, zure hatz erakuslea eroriko zaizu hogei zatitatik, saguaren botoian klik eginez. Baina zalantzarik gabe modanagoa dirudi:
Zalantzarik gabe, irteerak bilatu ditugu. Kasua ere ia norberak idatzitako SSIS pakete-sorgailu batera iritsi zen...
… eta gero lan berri batek aurkitu ninduen. Eta Apache Airflow-ek aurreratu ninduen.
ETL prozesuen deskribapenak Python kode sinpleak direla jakin nuenean, ez nuen pozez dantza egin. Horrela datu-korronteak bertsionatu eta desberdintzen ziren, eta ehunka datu-baseetatik egitura bakarreko taulak helburu bakarrean sartzea Python kodearen kontua bihurtu zen 13 ”pantaila eta erdi edo bitan.
Klusterra muntatzea
Ez dezagun antolatu guztiz haurtzaindegia, eta ez dezagun hitz egin hemen guztiz ageriko gauzei buruz, adibidez, Airflow instalatzea, aukeratutako datu-basea, Apioa eta kaietan deskribatutako beste kasu batzuei buruz.
Esperimentuak berehala has gaitezen, zirriborratu nuen docker-compose.yml zeinetan:
- Goazen benetan Airflow: Programatzailea, Web zerbitzaria. Flower ere bertan biraka egingo du Apioaren zereginak kontrolatzeko (dagoeneko sartu delako
apache/airflow:1.10.10-python3.7, baina ez zaigu axola) - PostgreSQL, zeinetan Airflow-ek bere zerbitzuaren informazioa idatziko du (planifikatzailearen datuak, exekuzio-estatistikak, etab.), eta Apioak amaitutako zereginak markatuko ditu;
- Birbanaketa, Apioaren zereginen bitartekari gisa jardungo duena;
- Apioaren langilea, zereginen zuzeneko exekuzioan arituko dena.
- Karpetara
./dagsgure fitxategiak gehituko ditugu dags-en deskribapenarekin. Hegan bertan jasoko dira, beraz, ez dago zertan pila osoa malabarerik egin doministiku bakoitzaren ondoren.
Zenbait tokitan, adibideetako kodea ez da guztiz erakusten (testua ez nahasteko), baina nonbait aldatu egiten da prozesuan. Lan-kodeen adibide osoak biltegian aurki daitezke .
Docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerOharrak:
- Konposizioaren muntaketan, irudi ezagunean oinarritu nintzen neurri handi batean - Ziurtatu egiaztatzea. Agian ez duzu beste ezer behar zure bizitzan.
- Aire-fluxuaren ezarpen guztiak erabilgarri daude bidez ez ezik
airflow.cfg, baina baita ingurune aldagaien bidez ere (garatzaileei esker), gaiztoki aprobetxatu nituenak. - Jakina, ez dago produkziorako prest: nahita ez nituen ontzietan bihotz taupadak jarri, ez nuen segurtasunarekin trabarik jarri. Baina gure esperimentatzaileentzat egokia den minimoa egin nuen.
- Apuntatu hori:
- Dag karpetak eskuragarri egon behar du programatzaileak zein langileak.
- Gauza bera gertatzen da hirugarrenen liburutegi guztietan: guztiak programatzaile eta langileak dituzten makinetan instalatu behar dira.
Beno, orain erraza da:
$ docker-compose up --scale worker=3Dena igo ondoren, web interfazeei begiratu dezakezu:
- Aire fluxua:
- Lorea:
Oinarrizko kontzeptuak
"Dag" guzti hauetan ezer ulertzen ez bazenuen, hona hemen hiztegi labur bat:
- Antolatzaileak - Airflow-eko osabarik garrantzitsuena, robotek gogor lan egiten dutela kontrolatzen duena, eta ez pertsona batek: ordutegia kontrolatzen du, eguneratzen ditu, lanak abiarazten ditu.
Oro har, bertsio zaharretan, memoriarekin arazoak izan zituen (ez, ez amnesia, baina filtrazioak) eta ondarearen parametroa konfigurazioetan ere geratu zen.
run_duration- bere berrabiarazteko tartea. Baina orain dena ondo dago. - DAG (aka "dag") - "zuzendutako grafiko aziklikoa", baina definizio horrek jende gutxiri esango dio, baina, hain zuzen ere, elkarren artean elkarreragiten duten zereginetarako edukiontzi bat da (ikus behean) edo SSIS-en paketearen eta Informatica-n Workflow-en antzeko bat da. .
Dagez gain, oraindik ere azpidagak egon daitezke, baina ziurrenik ez gara iritsiko.
- DAG Korrika - hasieratutako dag, berea esleitzen zaiona
execution_date. Dag bereko Dagranek paraleloan lan egin dezakete (zure zereginak idempotent egin badituzu, noski). - Operator ekintza zehatz bat egiteaz arduratzen diren kode zatiak dira. Hiru operadore mota daude:
- ekintzagure gogokoena bezala
PythonOperator, edozein (baliozko) Python kode exekutatu dezakeena; - transferitzeko, datuak leku batetik bestera garraiatzen dituztenak, esate baterako,
MsSqlToHiveTransfer; - sentsore bestetik, erreakzionatzeko edo dagaren exekuzio gehiago moteltzeko aukera emango dizu, gertaera bat gertatu arte.
HttpSensorzehaztutako amaiera-puntua atera dezake, eta nahi den erantzuna zain dagoenean, hasi transferentziaGoogleCloudStorageToS3Operator. Buru jakintsu batek galdetuko du: “zergatik? Azken finean, errepikapenak zuzenean egin ditzakezu operadorean!». Eta gero, esekitako operadoreekin zereginen multzoa ez estaltzeko. Sentsorea abiarazi, egiaztatu eta hil egiten da hurrengo saiakeraren aurretik.
- ekintzagure gogokoena bezala
- Task - deklaratutako operadoreak, mota edozein dela ere, eta dag-ari atxikitakoak zeregin mailara igotzen dira.
- ataza instantzia - Antolatzaile orokorrak erabaki zuenean, zereginak borrokara bidaltzeko garaia zela interprete-langileei (bere bertan, erabiltzen badugu
LocalExecutoredo urruneko nodo batera, kasuanCeleryExecutor), testuinguru bat esleitzen die (hau da, aldagai multzo bat - exekuzio-parametroak), komando edo kontsulta txantiloiak zabaltzen ditu eta biltzen ditu.
Zereginak sortzen ditugu
Lehenik eta behin, gure dougaren eskema orokorra azalduko dugu, eta gero eta xehetasunetan murgilduko gara gero eta gehiago, irtenbide ez-hutsak aplikatzen ditugulako.
Beraz, bere forma errazenean, honelako itxura izango du:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Asma dezagun:
- Lehenik eta behin, beharrezko liburuak inportatzen ditugu eta beste zerbait;
sql_server_ds- IsList[namedtuple[str, str]]Airflow Connections-en konexioen izenekin eta gure plaka hartuko dugun datu-baseekin;dag- gure dagaren iragarkia, derrigorrez egon behar duenaglobals(), bestela Airflow-ek ez du aurkituko. Dougek ere esan behar du:- zein da bere izena
orders- izen hau web interfazean agertuko da, - uztailaren XNUMXko gauerditik aurrera lan egingo duela,
- eta exekutatu beharko luke, gutxi gorabehera, 6 orduz behin (hemengo gogorrentzat ordez
timedelta()onargarriacron-lerroa0 0 0/6 ? * * *, hain cool-a bezalako esamolde bat@daily);
- zein da bere izena
workflow()egingo du lan nagusia, baina ez orain. Oraingoz, gure testuingurua erregistrora botako dugu.- Eta orain zereginak sortzeko magia sinplea:
- gure iturrietatik igarotzen gara;
- hasieratu
PythonOperator, gure manikia exekutatuko duenaworkflow(). Ez ahaztu zereginaren izen esklusibo bat (dag barruan) zehaztea eta dag bera lotzea. Banderaprovide_contextaldi berean, argumentu gehigarriak isuriko ditu funtziora, eta arretaz bilduko ditugu erabiliz**context.
Oraingoz, hori da dena. Lortu duguna:
- dag berria web interfazean,
- paraleloan gauzatuko diren ehun eta erdi zeregin (Aire-fluxua, Apioaren ezarpenak eta zerbitzariaren ahalmenak ahalbidetzen badu).
Beno, ia lortu.

Nork instalatuko ditu mendekotasunak?
Hori guztia errazteko, izorratu nuen docker-compose.yml prozesatzea requirements.txt nodo guztietan.
Orain desagertu da:

Karratu grisak programatzaileak prozesatutako ataza-instantziak dira.
Pixka bat itxaron dugu, zereginak langileek hartzen dituzte:

Berdeek, noski, ongi bukatu dute euren lana. Gorriek ez dute arrakasta handirik.
Bide batez, gure produktuan ez dago karpetarik
./dags, ez dago makinen arteko sinkronizaziorik - dag guztiak daudegitgure Gitlab-en, eta Gitlab CI-k eguneraketak banatzen ditu makinetan bateratzeanmaster.
Loreari buruz pixka bat
Langileak txupeteak kolpatzen ari diren bitartean, gogoan dezagun zerbait erakutsi diezagukeen beste tresna bat: Lorea.
Langile-nodoei buruzko laburpen informazioa duen lehen orrialdea:

Lanera joandako zereginak dituen orrialderik biziena:

Gure broker-aren egoera duen orrialde aspergarriena:

Orrialde distiratsuena zereginen egoera grafikoekin eta haien exekuzio denborarekin dago:

Azpikargatutakoa kargatzen dugu
Beraz, zeregin guztiak bete dira, zaurituak eraman ditzakezu.

Eta zauritu asko egon ziren, arrazoi bategatik edo besteagatik. Airflow-aren erabilera zuzenaren kasuan, lauki horiek beraiek adierazten dute datuak behin betiko ez zirela iritsi.
Erregistroa ikusi eta eroritako ataza-instantziak berrabiarazi behar dituzu.
Edozein laukitan klik eginez gero, eskura ditugun ekintzak ikusiko ditugu:

Hartu eta egin dezakezu Clear eroria. Hau da, ahaztu egiten zaigu hor zerbaitek huts egin duela, eta instantzia-zeregin bera programatzailera joango da.

Argi dago saguarekin lauki gorri guztiekin egitea ez dela oso gizatiarra; hori ez da Airflow-tik espero duguna. Jakina, suntsipen masiboko armak ditugu: Browse/Task Instances

Hautatu dena aldi berean eta berrezarri dezagun zerora, egin klik elementu egokian:

Garbitu ondoren, gure taxiek itxura hau dute (dagoeneko programatzaileak programatzeko zain daude):

Konexioak, kakoak eta bestelako aldagaiak
Hurrengo DAG-ari begiratzeko garaia da, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Denek egin al dute inoiz txostenen eguneratzerik? Hau da berriro ere: datuak nondik ateratzeko iturrien zerrenda dago; zerrenda bat dago non jarri; ez ahaztu dena gertatu edo hautsi zenean bozina jotzea (beno, hau ez da guri buruz, ez).
Azter ditzagun berriro fitxategia eta ikus ditzagun gauza ilun berriak:
from commons.operators import TelegramBotSendMessage- ezerk ez digu eragozten gure operadoreak egitea, eta hori aprobetxatu genuen Unblocked-era mezuak bidaltzeko bilgarri txiki bat eginez. (Gehiago hitz egingo dugu operadore honi buruz jarraian);default_args={}- dag-ek argumentu berdinak banatu ditzake bere operadore guztiei;to='{{ var.value.all_the_kings_men }}'-eremuatoez dugu gogor kodetua izango, baina dinamikoki sortuko dugu Jinja eta mezu elektronikoen zerrenda duen aldagai bat erabiliz, kontu handiz jarri dudanaAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— operadorea martxan jartzeko baldintza. Gure kasuan, eskutitzak nagusiengana hegan egingo du mendekotasun guztiak funtzionatu badira bakarrik arrakastaz;tg_bot_conn_id='tg_main'- argudioakconn_idonartzen ditugun konexio IDakAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- Telegram-eko mezuak hutsik dauden zereginak badaude soilik ihes egingo dute;task_concurrency=1- Zeregin bateko hainbat ataza-instantzia aldi berean abiaraztea debekatzen dugu. Bestela, hainbat aldi berean abian jartzea lortuko duguVerticaOperator(mahai bati begira);report_update >> [email, tg]- guztiakVerticaOperatorgutunak eta mezuak bidaltzean bat egiten dute, honela:

Baina jakinarazleen operadoreek abiarazteko baldintza desberdinak dituztenez, bakarrak funtzionatuko du. Zuhaitz ikuspegian, dena apur bat gutxiago ikusten da:

Hitz batzuk esango ditut makroak eta haien lagunak - aldagaiak.
Makroak Jinja leku-markak dira, eta hainbat informazio erabilgarria ordezka dezakete operadorearen argumentuetan. Adibidez, honela:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} testuinguru aldagaiaren edukietara zabalduko da execution_date formatuan YYYY-MM-DD: 2020-07-14. Zatirik onena da testuinguru-aldagaiak ataza-instantzia zehatz batean iltzatuta daudela (Zuhaitz ikuspegiko karratu batean), eta berrabiarazten denean, leku-markak balio berdinetara zabalduko dira.
Esleitutako balioak ataza-instantzia bakoitzean Errendatutako botoia erabiliz ikus daitezke. Hau da gutun bat bidaltzeko zeregina:

Eta horrela mezu bat bidaltzeko zereginean:

Eskuragarri dagoen azken bertsiorako integratutako makroen zerrenda osoa eskuragarri dago hemen:
Gainera, pluginen laguntzaz, geure makroak deklara ditzakegu, baina hori beste istorio bat da.
Aurrez zehaztutako gauzez gain, gure aldagaien balioak ordezka ditzakegu (dagoeneko erabili nuen goiko kodean). Sortu dezagun Admin/Variables gauza pare bat:

Erabili dezakezun guztia:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Balioa eskalar bat izan daiteke, edo JSON ere izan daiteke. JSON kasuan:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}erabili nahi duzun gakorako bidea: {{ var.json.bot_config.bot.token }}.
Hitz bat esango dut literalki eta pantaila-argazki bat erakutsiko dut konexio. Hemen dena oinarrizkoa da: orrialdean Admin/Connections konexio bat sortzen dugu, gure saio-hasiera / pasahitzak eta parametro zehatzagoak gehitzen ditugu bertan. Horrela:

Pasahitzak enkriptatu daitezke (lehenetsia baino sakonago), edo konexio mota kanpoan utzi dezakezu (nik egin nuen bezala). tg_main) - Izan ere, mota-zerrenda Airflow ereduetan kablekatuta dago eta ezin dela zabaldu iturburu-kodeetan sartu gabe (bat-batean ez badut zerbait Googlen bilatu, zuzendu mesedez), baina ezerk ez digu utziko kredituak lortzea. izena.
Izen bereko hainbat konexio ere egin ditzakezu: kasu honetan, metodoa BaseHook.get_connection(), izenez konexioak lortzen dizkigu, emango du ausaz hainbat izenetatik (logikoagoa litzateke Round Robin egitea, baina utzi dezagun Airflow garatzaileen kontzientzia).
Aldagaiak eta konexioak tresna politak dira, zalantzarik gabe, baina garrantzitsua da oreka ez galtzea: zure fluxuen zein zati gordetzen dituzun kodean bertan, eta zein zati ematen dizkiozun Airflow-i biltegiratzeko. Alde batetik, komenigarria izan daiteke balioa azkar aldatzea, adibidez, posta-kutxa bat, UI bidez. Bestalde, hau oraindik saguaren klikaren itzulera da, bertatik kendu nahi genuen (nik).
Konexioekin lan egitea da zereginetako bat amuak. Oro har, Airflow amuak hirugarrenen zerbitzu eta liburutegietara konektatzeko puntuak dira. Adibidez, JiraHook Bezero bat irekiko digu Jirarekin elkarreragiteko (zereginak aurrera eta atzera mugi ditzakezu), eta honen laguntzaz SambaHook tokiko fitxategi bat bultza dezakezu smb-puntua.
Operadore pertsonalizatua analizatzen
Eta nola egiten den ikustera gerturatu ginen TelegramBotSendMessage
Code commons/operators.py benetako operadorearekin:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Hemen, Airflow-en beste guztia bezala, dena oso erraza da:
- Oinordetzan hartua
BaseOperator, Airflow-eko gauza espezifiko batzuk ezartzen dituena (begiratu zure aisialdia) - Deklaratutako eremuak
template_fields, eta bertan Jinjak prozesatzeko makroak bilatuko ditu. - Argudio egokiak antolatu
__init__(), ezarri lehenetsiak beharrezkoa denean. - Arbasoaren hasieratzeaz ere ez genuen ahaztu.
- Dagokion amua ireki
TelegramBotHookbezero objektu bat jaso zuen bertatik. - Gaingabetutako (birdefinitutako) metodoa
BaseOperator.execute(), Airfow-ek operadorea abiarazteko unea iristean kiskatuko duena - bertan ekintza nagusia ezarriko dugu, saioa hasteko ahaztuta. (Saioa egiten dugu, bide batez, berehalastdoutиstderr- Aire-fluxuak dena atzemango du, ederki bilduko du, beharrezkoa den lekuan deskonposatuko du.)
Ea zer daukagun commons/hooks.py. Fitxategiaren lehen zatia, kakoarekin berarekin:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientHemen ere ez dakit zer azaldu, puntu garrantzitsuak ohartaraziko ditut:
- Oinordetzan hartzen dugu, pentsatu argudioak - kasu gehienetan bat izango da:
conn_id; - Metodo estandarrak gainditzea: mugatu dut
get_conn(), zeinetan konexio-parametroak izenez jasotzen ditudan eta atala besterik ez dut lortzenextra(JSON eremua da hau), eta bertan (nire argibideen arabera!) Telegram bot tokena jarri nuen:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Gure instantzia bat sortzen dut
TelegramBot, seinale zehatz bat emanez.
Hori da dena. Bezero bat kako batetik lor dezakezu erabiliz TelegramBotHook().clent edo TelegramBotHook().get_conn().
Eta fitxategiaren bigarren zatia, Telegram REST APIrako mikrobilgarri bat egiten dudana, berdina ez arrastatu metodo baterako sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Modu zuzena dena gehitzea da:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- pluginean, biltegi publiko batean jarri eta Kode Irekiari eman.
Hori guztia aztertzen ari ginen bitartean, gure txostenen eguneratzeek huts egitea lortu zuten eta kanalean errore-mezu bat bidaltzea lortu zuten. Egiaztatuko dut ea gaizki dagoen...

Gure dogean zerbait hautsi da! Ez al da hori espero genuena? Zehazki!
Botatuko duzu?
Zerbait galdu dudala sentitzen al duzu? Badirudi SQL Server-etik Vertica-ra datuak transferitzeko agindu zuela, eta orduan hartu eta gaitik aldendu zen, zitala!
Ankerkeria hau nahita izan zen, terminologia bat deszifratu besterik ez nuen egin behar zuretzat. Orain harago joan zaitezke.
Gure plana hau zen:
- Egin dag
- Sortu zereginak
- Ikusi zein ederra den dena
- Esleitu saio-zenbakiak betegarriei
- Lortu datuak SQL Server-etik
- Jarri datuak Vertican
- Bildu estatistikak
Beraz, hau guztia martxan jartzeko, gehigarri txiki bat egin nuen gurean docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyHor planteatzen dugu:
- Vertica ostalari gisa
dwhezarpen lehenetsienekin, - SQL Server-en hiru instantzia,
- azken honetan datu-baseak datu batzuekin betetzen ditugu (inola ere ez begiratu
mssql_init.py!)
On guztia abiarazten dugu azken aldian baino komando apur bat konplikatuago baten laguntzaz:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Gure miraria ausazkoak sortu duena, elementua erabil dezakezu Data Profiling/Ad Hoc Query:

Gauza nagusia analistei ez erakustea da
landu ETL saioak Ez dut egingo, hor dena hutsala da: oinarri bat egiten dugu, bertan seinale bat dago, dena testuinguru-kudeatzaile batekin biltzen dugu, eta orain hau egiten dugu:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15saioa.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passIritsi da ordua bildu gure datuak gure ehun eta erdi mahaietatik. Egin dezagun hau oso itxurarik gabeko lerroen laguntzaz:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Amu baten laguntzaz Airflow-tik lortzen dugu
pymssql-konektatu - Ordez dezagun data baten formako murrizketa eskaeran - txantiloi motorrak funtziora botako du.
- Gure eskaera elikatzen
pandasnork lortuko gaituDataFrame- baliagarria izango zaigu etorkizunean.
Ordezkapena erabiltzen ari naiz
{dt}eskaera-parametro baten ordez%sez Pinotxo gaiztoa naizelako, baizik etapandasezin maneiatupymssqleta azkena irrist egiten duparams: Listbenetan nahi duen arrentuple.
Kontuan izan ere garatzaileakpymssqlgehiago ez onartzea erabaki zuen, eta alde egiteko garaia dapyodbc.
Ikus dezagun Airflow-ek gure funtzioen argudioak zerekin betetzen dituen:

Daturik ez badago, ez du balio jarraitzeak. Baina bitxia ere bada betetzea arrakastatsua izatea. Baina hau ez da akats bat. A-ah-ah, zer egin?! Eta hona hemen zer:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException Akatsik ez dagoela esango dio Airflow-i, baina zeregina saltatzen dugu. Interfazeak ez du karratu berdea edo gorria izango, arrosa baizik.
Bota ditzagun gure datuak hainbat zutabe:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Hots
- Eskaerak hartu genituen datu-basea,
- Gure uholde saioaren IDa (desberdina izango da zeregin bakoitzerako),
- Iturburuaren eta eskaeraren IDaren hash bat - beraz, azken datu-basean (non dena taula batean isurtzen den) eskaera ID bakarra dugu.
Azkenaurreko urratsa geratzen da: dena bota Verticara. Eta, bitxia bada ere, hori egiteko modurik ikusgarri eta eraginkorrenetako bat CSV bidezkoa da!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Hartzaile berezi bat egiten ari gara
StringIO. pandasmesedez jarriko dugu gureDataFrameinprimakianCSV-lerroak.- Ireki dezagun gure gogoko Verticarako konexioa kako batekin.
- Eta orain laguntzarekin
copy()bidali gure datuak zuzenean Vertika-ra!
Gidariarengandik zenbat lerro bete diren hartuko dugu, eta dena ondo dagoela esango diogu saio-kudeatzaileari:
session.loaded_rows = cursor.rowcount
session.successful = TrueHori da dena.
Salmentan, xede-plaka eskuz sortzen dugu. Hona hemen makina txiki bat onartu nuen:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)erabiltzen ari naiz
VerticaOperator()Datu-basearen eskema eta taula bat sortzen ditut (ez badaude, noski). Gauza nagusia mendekotasunak ondo antolatzea da:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadLaburbilduz
- Beno, - esan zuen sagutxoak, - ez al da, orain
Konbentzituta al zaude basoko animaliarik ikaragarriena naizela?
Julia Donaldson, Gruffalo
Uste dut nire lankideek eta biok lehiaketa bat izango bagenu: nork azkar sortu eta abiaraziko duen ETL prozesu bat hutsetik: haiek beren SSIS eta sagu batekin eta ni Airflow-arekin ... Eta gero mantentze-erraztasuna ere alderatuko genuke ... Aupa, uste dut ados egongo zarela alde guztietan irabaziko ditudala!
Pixka bat serioago bada, orduan Apache Airflow - prozesuak programa-kode moduan deskribatuz - nire lana egin zuen asko erosoagoa eta atseginagoa.
Bere hedagarritasun mugagabeak, bai plug-inen aldetik, bai eskalagarritasunerako joerari dagokionez, Airflow ia edozein arlotan erabiltzeko aukera ematen dizu: datuak biltzeko, prestatzeko eta prozesatzeko ziklo osoan ere, baita suziriak jaurtitzerakoan ere (Martera, ikastaroa).
Azken zatia, erreferentzia eta informazioa
Zuretzat bildu dugun arrastoa
start_date. Bai, hau dagoeneko tokiko meme bat da. Dougen argudio nagusiaren bidezstart_dateguztiak pasatzen dira. Laburbilduz, zehazten baduzustart_dateuneko data, etaschedule_interval- Egun batean, orduan DAG bihar hasiko da ez lehenago.start_date = datetime(2020, 7, 7, 0, 1, 2)Eta arazorik ez.
Beste exekuzio-errore bat dago lotuta:
Task is missing the start_date parameter, gehienetan dag operadoreari lotzea ahaztu zaizula adierazten duena.- Dena makina batean. Bai, eta oinarriak (Airflow bera eta gure estaldura), eta web zerbitzari bat, eta programatzaile bat, eta langileak. Eta funtzionatu ere egin zuen. Baina denborarekin, zerbitzuen ataza kopurua hazi egin zen, eta PostgreSQL indizeari 20 s-tan 5 ms beharrean XNUMX s-tan erantzuten hasi zenean, hartu eta eraman genuen.
- LocalExecutor. Bai, oraindik eserita gaude, eta dagoeneko amildegiaren ertzera iritsi gara. LocalExecutor nahikoa izan zaigu orain arte, baina orain gutxienez langile batekin zabaltzeko garaia da, eta gogor lan egin beharko dugu CeleryExecutor-era pasatzeko. Eta makina batean lan egin dezakezula ikusita, ezerk ez zaitu gelditzen Apioa zerbitzari batean ere erabiltzea, "noski, ez da inoiz produkziora sartuko, zintzotasunez!"
- Ez erabiltzea integratutako tresnak:
- Konexioak zerbitzuaren kredentzialak gordetzeko,
- SLA andereñoak garaiz funtzionatu ez zuten zereginei erantzuteko,
- xcom metadatuak trukatzeko (esan nuen metadatuak!) dag zereginen artean.
- Posta gehiegikeria. Tira, zer esan dezaket? Alertak ezarri ziren eroritako zereginen errepikapen guztietarako. Orain nire laneko Gmail-ek Airflow-eko 90 mezu elektroniko ditu, eta web-postaren mukiak uko egiten dio aldi berean 100 baino gehiago jasotzeari eta ezabatzeari.
Zalantza gehiago:
Automatizazio tresna gehiago
Gure buruekin eta ez eskuekin are gehiago lan egiteko, Airflow-ek hau prestatu digu:
- - oraindik Experimental estatusa du, eta horrek ez dio lan egitea eragozten. Harekin, dag eta zereginei buruzko informazioa lortu ez ezik, dag bat gelditu/hasi, DAG Run edo igerileku bat ere sortu dezakezu.
- - Komando-lerroaren bidez tresna asko daude eskuragarri, WebUI-ren bidez erabiltzeko deserosoak ez direnak, baina orokorrean ez daudenak. Adibidez:
backfillataza-instantziak berrabiarazteko beharrezkoak.
Esaterako, analistak etorri ziren eta esan zuten: “Eta zuk, kamarada, txorakeriak dituzu urtarrilaren 1etik 13ra bitarteko datuetan! Konpondu, konpondu, konpondu, konpondu!" Eta halako sukaldea zara:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Oinarrizko zerbitzua:
initdb,resetdb,upgradedb,checkdb. run, instantzia-zeregin bat exekutatzeko aukera ematen duena, eta baita mendekotasun guztietan puntuatzea ere. Gainera, bidez exekutatu dezakezuLocalExecutor, nahiz eta Apioaren multzoa izan.- Ia gauza bera egiten du
test, soilik oinarrietan ere ez du ezer idazten. connectionsshelletik konexioak masiboki sortzeko aukera ematen du.
- - Elkarreragiteko modu gogor samarra, pluginetarako pentsatuta dagoena, eta ez esku txikiekin ibiltzea. Baina norengana joatea galaraziko digu
/home/airflow/dags, Korrika eginipythoneta nahasten hasi? Adibidez, konexio guztiak esporta ditzakezu kode honekin:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Airflow metadatubasera konektatzen. Ez dut idaztea gomendatzen, baina hainbat metrika zehatzetarako ataza-egoerak lortzea askoz azkarrago eta errazagoa izan daiteke APIren bidez baino.
Demagun gure zeregin guztiak ez direla idepotenteak, baina batzuetan erori daitezkeela, eta hori normala da. Baina blokeo batzuk susmagarriak dira dagoeneko, eta egiaztatu beharko litzateke.
Kontuz SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
Erreferentziak
Eta, jakina, Google-ren jaulkipeneko lehen hamar estekak nire laster-marketako Airflow karpetako edukiak dira.
- - noski, bulegotik hasi behar dugu. dokumentazioa, baina nork irakurtzen ditu argibideak?
- - Tira, irakur itzazu behintzat sortzaileen gomendioak.
- - hasieratik: erabiltzailearen interfazea irudietan
- - Oinarrizko kontzeptuak ondo deskribatuta daude, baldin (bat-batean!) Zuk ez bazenuen zerbait ulertu.
- - Airflow kluster bat konfiguratzeko gida labur bat.
- - ia artikulu interesgarri bera, agian formalismo gehiago eta adibide gutxiago izan ezik.
- — Apioarekin batera lan egiteari buruz.
- - Zereginen inpotentziari buruz, IDaren arabera kargatzea dataren ordez, eraldaketa, fitxategien egitura eta beste gauza interesgarri batzuk.
- - atazen menpekotasunak eta Trigger Rule, iraganean bakarrik aipatu ditudanak.
- - Antolatzailean "nahi bezala lan" batzuk nola gainditu, galdutako datuak kargatu eta zereginak lehenetsi.
- - SQL kontsulta erabilgarriak Airflow metadatuetarako.
- - Sentsore pertsonalizatu bat sortzeari buruzko atal erabilgarria dago.
- — Datuen Zientziarako AWS-en azpiegitura bat eraikitzeari buruzko ohar labur interesgarri bat.
- - ohiko akatsak (norbaitek oraindik argibideak irakurtzen ez dituenean).
- - Irribarre egin nola jendeak pasahitzak gordetzeko makuluak egiten dituen, nahiz eta Konexioak soilik erabil ditzakezun.
- - DAG birbidaltze inplizitua, funtzioak testuingurua botatzea, berriro mendekotasunei buruz, eta baita ataza abiarazteei buruz ere.
- - erabilerari buruz
default argumentsиparamstxantiloietan, baita Aldagaiak eta Konexioak ere. - - Planifikatzailea Airflow 2.0rako prestatzen ari den istorio bat.
- - Gure kluster-en inplementari buruzko artikulu zaharkitu samarra
docker-compose. - - zeregin dinamikoak txantiloiak eta testuingurua birbidaltzea erabiliz.
- — Posta eta Slack bidez jakinarazpen estandarrak eta pertsonalizatuak.
- - Adarkatze-zereginak, makroak eta XCom.
Eta artikuluan erabilitako estekak:
- - txantiloietan erabiltzeko erabilgarri dauden leku-markak.
- — Dagak sortzean ohiko akatsak.
- -
docker-composeesperimentaziorako, arazketarako eta beste. - - Telegram REST APIrako Python bilgarria.
Iturria: www.habr.com




