Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.
I'll tell you about a wonderful tool for building ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you don't work with data flows but occasionally need to launch some processes and monitor their execution.
And yes, I won't just tell, I'll also show: the program includes plenty of code, screenshots and recommendations.
What you usually see when you google the word Airflow / Wikimedia Commons
Table of contents
- Introduction
- The main part, practical (and a little theoretical)
- The final part, references and information
Introduction
Apache Airflow is just like Django:
- it's written in Python,
- it has a great admin panel,
- it can be extended indefinitely,
- only it's better, and it's made for completely different purposes, namely (as written before the cut):
- running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience allow)
- dynamic workflow generation from Python code that is very easy to write and understand
- and the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which is extremely simple).
We use Apache Airflow like this:
- we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
- as an advanced cron that starts the data consolidation processes on the ODS and also keeps an eye on their maintenance.
Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this works out to:
- more than 200 DAGs (essentially workflows, into which we stuffed tasks),
- about 70 tasks in each on average,
- and this goodness starts (also on average) once an hour.
I'll write below about how we scaled, but now let's define the über-problem we are going to solve:
There are three source SQL Servers, each with 50 databases, which are instances of one project; accordingly, they have the same structure (almost everywhere, mua-ha-ha), which means each of them has an Orders table (fortunately, a table with that name can be shoved into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively throw it into, say, Vertica.
Let's go!
The main part, practical (and a little theoretical)
Why we (and you) need it
When the trees were big and I was a simple SQL guy in a Russian retail company, we cranked out ETL processes, aka data flows, using the two tools available to us:
- Informatica Power Center: an extremely sprawling and extremely productive system, with its own hardware and its own versioning. I used, God willing, 1% of its capabilities. Why? Well, first of all, its interface, which looks like it came from a bygone era, puts psychological pressure on us. Second, this contraption is designed for extremely elaborate processes, furious component reuse and other very-important-enterprise tricks. And we'll say nothing about the fact that it costs as much as the wing of an Airbus A380 per year.
Beware, the screenshot may slightly hurt people under thirty
- SQL Server Integration Services: we used this comrade for our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything in it is good: the interface is pretty, and the progress reports too... But that's not why we love software products, oh, not for that. We can version it (a dtsx file is XML whose nodes get shuffled on every save), but what's the point? And what about making a task package that drags hundreds of tables from one server to another? A hundred, really: your index finger will fall off after twenty of them from clicking the mouse button. But it definitely looks more fashionable:
Of course, we looked for ways out. It even almost came to a home-grown SSIS package generator...
...and then a new job found me. And Apache Airflow caught up with me there.
When I found out that ETL process descriptions are plain Python code, I nearly danced for joy. This is how data flows became versionable and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code on one and a half or two 13-inch screens.
Assembling the cluster
Let's not turn this into a kindergarten and talk about completely obvious things here, like installing Airflow, your chosen database, Celery and the other things described in the docs.
So that we can start experimenting right away, I put together a docker-compose.yml in which:
- we bring up Airflow itself: Scheduler and Webserver. Flower will also spin there to monitor Celery tasks (because it is already bundled into apache/airflow:1.10.10-python3.7, and we don't mind);
- PostgreSQL, into which Airflow will write its service information (scheduler data, execution statistics, etc.) and where Celery will mark completed tasks;
- Redis, which will act as the task broker for Celery;
- a Celery worker, which will do the actual execution of tasks;
- into the ./dags folder we will add our files with DAG descriptions. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.
In some places the code in the examples is not shown in full (so as not to clutter the text), and in places it is modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.
docker-compose.yml
version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint initdb &&
      (/entrypoint webserver &) &&
      (/entrypoint flower &) &&
      /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker
Notes:
- When assembling the compose file, I relied heavily on the well-known image puckel/docker-airflow; be sure to check it out. Maybe you don't need anything else in your life.
- All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I shamelessly took advantage of.
- Naturally, it is not production-ready: I deliberately didn't put heartbeats on the containers and didn't bother with security. But I did the minimum suitable for our experiments.
- Note that:
- The DAG folder must be accessible to both the scheduler and the workers.
- The same applies to all third-party libraries: they must all be installed on the machines running the scheduler and the workers.
Well, now it's simple:
$ docker-compose up --scale worker=3
Once everything is up, you can look at the web interfaces: Airflow on port 8080 and Flower on port 5555, as mapped in docker-compose.yml:
Basic concepts
If you didn't understand anything about all these "DAGs", here is a short glossary:
- Scheduler: the most important guy in Airflow, making sure that robots do the hard work and not a person: it monitors the schedule, updates DAGs and launches tasks.
In older versions it had problems with memory (no, not amnesia, leaks), and the legacy parameter run_duration, which sets how long it runs before restarting, still remains in the configs. But now everything is fine.
- DAG (aka "dag"): a "directed acyclic graph", but that definition tells few people much; in fact it is a container of tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.
Besides DAGs there can also be subDAGs, but we most likely won't get to them.
- DAG Run: an initialized DAG that is assigned its own execution_date. DAG runs of the same DAG can work in parallel (if you have made your tasks idempotent, of course).
- Operator: a piece of code responsible for performing a specific action. There are three types of operators:
  - action, like our beloved PythonOperator, which can execute any (valid) Python code;
  - transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
  - sensor, which on the other hand lets you react to an event or hold back further execution of the DAG until the event happens. HttpSensor can poll the specified endpoint and, once the desired response arrives, start the GoogleCloudStorageToS3Operator transfer. An inquisitive mind will ask: "Why? You can do retries right in the operator!" The answer: so as not to clog the task pool with suspended operators. The sensor starts, checks and dies before the next attempt.
- Task: declared operators, regardless of type, that are attached to a DAG are promoted to the rank of task.
- Task instance: when the general scheduler decides it is time to send tasks into battle on the performers, the workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e. a set of variables, the execution parameters), expands command or query templates, and queues them.
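To make the sensor idea concrete, here is a minimal pure-Python sketch of a poke loop: call a check repeatedly, sleep between attempts, and give up after a timeout. It is a simplified model for illustration only, not Airflow's sensor code; the names `poke_until`, `poke_interval` and `timeout` are my own.

```python
import time
from typing import Callable


def poke_until(check: Callable[[], bool],
               poke_interval: float = 1.0,
               timeout: float = 10.0) -> bool:
    """Call `check` until it returns True or `timeout` seconds pass.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True  # condition met: downstream tasks may start
        if time.monotonic() + poke_interval > deadline:
            return False  # the next attempt would exceed the timeout
        time.sleep(poke_interval)  # wait before poking again
```

In this model the loop holds its process while sleeping; the "starts, checks and dies" behaviour described above corresponds to Airflow sensors running in `reschedule` mode, which releases the worker slot between pokes instead of sleeping in place.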
Making tasks
First, let's look at the general layout of our DAG, and then we'll dive deeper and deeper into the details, because we use some non-trivial solutions.
So, in its simplest form, such a DAG looks like this:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))


def workflow(**context):
    print(context)


for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
Let's figure it out:
- First, we import the necessary libs and a few other things;
- sql_server_ds is a List[namedtuple[str, str]] holding the names of connections from Airflow Connections and the databases from which we will take our table;
- dag is the declaration of our DAG, which must be in globals(), otherwise Airflow will not find it. The DAG also needs to be told:
  - what its name is: orders; this name will then appear in the web interface,
  - that it will work from midnight on the eighth of July,
  - and that it should run roughly every 6 hours (tough guys can use a cron line such as 0 */6 * * * instead of timedelta(), and the less cool can use a preset such as @daily);
- workflow() will do the main job, but not yet. For now, we will just dump our context into the log.
- And now the simple magic of creating tasks:
  - we iterate over our sources;
  - we initialize a PythonOperator that will execute our dummy workflow(). Don't forget to give the task a unique (within the DAG) name and to bind the DAG itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect with **context.
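The commons.datasources module is not shown. Assuming it only has to provide (conn_id, schema) pairs, a hypothetical version for the "3 servers × 50 databases" setup could look like the sketch below; the connection IDs mssql_0..mssql_2 and the database names are invented. Because the DAG above uses schema as the task_id, the names must be unique across all servers, which this sketch ensures by prefixing them with the connection ID.

```python
from collections import namedtuple
from typing import List

# One SQL Server database to pull Orders from: an Airflow connection ID
# plus the database (schema) name on that server.
DataSource = namedtuple('DataSource', 'conn_id schema')

# Hypothetical layout: three servers with fifty databases each.
SERVERS = ['mssql_0', 'mssql_1', 'mssql_2']
DATABASES_PER_SERVER = 50

# Prefixing with conn_id keeps schema names (used as task_id) unique.
sql_server_ds: List[DataSource] = [
    DataSource(conn_id, f'{conn_id}_db_{i:02d}')
    for conn_id in SERVERS
    for i in range(DATABASES_PER_SERVER)
]
```

Iterating `for conn_id, schema in sql_server_ds` then yields 150 pairs, i.e. 150 tasks in the DAG.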
That's all for now. What we got:
- a new DAG in the web interface,
- a hundred and fifty tasks that will run in parallel (if the Airflow and Celery settings and the server capacity allow).
Well, almost got it.
Who will install the dependencies?
To simplify the whole thing, I added processing of requirements.txt on all nodes to docker-compose.yml.
And off it goes:
Gray squares are task instances being processed by the scheduler.
We wait a little, and the tasks are snapped up by the workers:
The green ones, of course, finished their work successfully. The red ones were not so successful.
By the way, there is no ./dags folder on our prod and no synchronization between machines: all the DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines on a merge into master.
A little about Flower
While the workers are thrashing through our dummy tasks, let's recall another tool that can show us a thing or two: Flower.
The very first page, with summary information on the worker nodes:
The most intense page, with the tasks that went to work:
The most boring page, with the status of our broker:
The brightest page, with task status graphs and their execution times:
Reloading what failed
So, all the tasks have run, and you can carry away the wounded.
And there were many wounded, for one reason or another. When Airflow is used properly, these very squares tell you that the data definitely did not arrive.
You need to look at the log and restart the failed task instances.
By clicking on any square, we see the actions available to us:
You can take the failed one and Clear it. That is, we forget that something failed there, and the same task instance goes back to the scheduler.
Clearly, doing this with the mouse for every red square is not very humane, and that is not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances
Let's select everything at once and reset it to zero by clicking the right item:
After clearing, our tasks look like this (they are already waiting for the scheduler to schedule them):
Connections, hooks and other variables
It's time to look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Dear gentlemen, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
        Vasya, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]
Has everyone done report updates at some point? Here it is again: there is a list of sources to take the data from; there is a list of where to put it; don't forget to honk when everything worked or broke (well, that's not about us, no).
Let's go through the file again and look at the new obscure stuff:
- from commons.operators import TelegramBotSendMessage: nothing prevents us from making our own operators, which we took advantage of by writing a small wrapper for sending messages to the Unblocked one, i.e. Telegram (more on this operator below);
- default_args={}: a DAG can hand out the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}': the to field will not be hardcoded but generated dynamically using Jinja and a variable holding a list of emails, which I carefully put into Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the letter flies to the bosses only if all dependencies have worked out successfully;
- tg_bot_conn_id='tg_main': conn_id arguments accept the IDs of connections that we create in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED: the Telegram message flies out only if there are failed tasks;
- task_concurrency=1: we prohibit running several instances of the same task at once. Otherwise, we could end up with several VerticaOperator instances running simultaneously (all looking at one table);
- report_update >> [email, tg]: all the VerticaOperator tasks converge on sending the letter and the message, like this:
But since the notifier operators have different launch conditions, only one of them will run. In the Tree View, everything looks a bit less visual:
A few words about macros and their friends, variables.
Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:
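A simplified pure-Python model of how the two trigger rules used here decide whether a notifier runs. It covers only all_success and one_failed, treats states as plain strings and ignores states such as skipped or upstream_failed that real Airflow also takes into account; the function name should_trigger is mine.

```python
from typing import Iterable

ALL_SUCCESS = 'all_success'
ONE_FAILED = 'one_failed'


def should_trigger(rule: str, upstream_states: Iterable[str]) -> bool:
    """Decide whether a task with `rule` may run, given upstream states.

    States are plain strings such as 'success', 'failed', 'running'.
    """
    states = list(upstream_states)
    if rule == ALL_SUCCESS:
        # Every upstream task must have finished successfully
        return all(state == 'success' for state in states)
    if rule == ONE_FAILED:
        # A single failure is enough; no need to wait for the others
        return any(state == 'failed' for state in states)
    raise ValueError(f'unsupported rule: {rule}')
```

With five successful report updates only the email fires; a single failed update fires the Telegram message while the email's condition can no longer be met, which is why exactly one notifier runs.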
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} expands to the value of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are pinned to a specific task instance (a square in the Tree View), so when it is restarted, the placeholders expand to the same values.
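What {{ ds }} turns into can be reproduced with plain datetime: it is the execution date formatted as %Y-%m-%d, and {{ ds_nodash }} is the same without dashes. The sketch below only mimics the result; it does not use Jinja or Airflow, and the render_query helper is a hypothetical naive stand-in for template rendering.

```python
from datetime import datetime


def ds(execution_date: datetime) -> str:
    # Same shape as Airflow's {{ ds }} macro: YYYY-MM-DD
    return execution_date.strftime('%Y-%m-%d')


def ds_nodash(execution_date: datetime) -> str:
    # Same shape as {{ ds_nodash }}: YYYYMMDD
    return execution_date.strftime('%Y%m%d')


def render_query(template: str, execution_date: datetime) -> str:
    # Naive stand-in for Jinja: replace the literal placeholder only
    return template.replace('{{ ds }}', ds(execution_date))
```

Because the value is derived from the task instance's execution_date rather than from the current time, rerunning the task a week later renders exactly the same date.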
The substituted values can be viewed with the Rendered button on each task instance. This is how it looks for the email task:
And this is how it looks for the message task:
The complete list of built-in macros for the latest available version is here:
Moreover, with the help of plugins we can declare our own macros, but that's another story.
Besides the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of things in Admin/Variables:
That's it, you can use them:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
The value can be a scalar, or it can be JSON. In the case of JSON:
bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
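var.json parses the stored value as JSON, so the template path above amounts to ordinary dictionary lookups; this is also why the token has to be a quoted string, since an unquoted 881hskdfASDA16641 is not valid JSON and would not parse. A small stdlib sketch; lookup is a hypothetical helper and the token is the dummy from the example.

```python
import json
from functools import reduce

BOT_CONFIG = '''
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
'''


def lookup(raw_json: str, path: str):
    """Follow a dotted path such as 'bot.token' through parsed JSON."""
    return reduce(lambda node, key: node[key], path.split('.'),
                  json.loads(raw_json))
```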
I'll say literally one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and put our logins/passwords and more specific parameters there. Like this:
Passwords can be encrypted (more thoroughly than by default), or you can leave out the connection type (as I did for tg_main): the thing is that the list of types is hardwired into the Airflow models and cannot be extended without getting into the source code (if I suddenly failed to google something, please correct me), but nothing stops us from getting the credentials just by name.
You can also create several connections with the same name: in this case, the method BaseHook.get_connection(), which gets us connections by name, will return a random one of the namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
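The difference between returning a random namesake and a round robin is easy to see in a few lines. This is an illustration with made-up host names, not Airflow's code.

```python
import itertools
import random
from typing import Iterator, List

# Hypothetical namesake connections registered under one conn_id
HOSTS: List[str] = ['dwh-node-1', 'dwh-node-2', 'dwh-node-3']


def pick_random(hosts: List[str], rng: random.Random) -> str:
    # The behaviour described above: any namesake may come back
    return rng.choice(hosts)


def round_robin(hosts: List[str]) -> Iterator[str]:
    # The alternative: strict rotation through the namesakes
    return itertools.cycle(hosts)
```

A random pick only spreads load evenly on average but needs no state at all; a round robin guarantees an even rotation but needs a counter shared by every process that resolves the connection.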
Variables and Connections are certainly cool tools, but it is important not to lose balance: which parts of your flows you keep in the code itself and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailing address, through the UI. On the other hand, this is still a return to mouse clicking, which we (I) wanted to get rid of.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are the points where it connects to third-party services and libraries. For example, JiraHook will open a client for interacting with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb point.
Dissecting the custom operator
And now we've come close to looking at how TelegramBotSendMessage is made. The code of commons/operators.py with the operator itself:
from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot


class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)
Here, as with everything else in Airflow, it's all very simple:
- Inherited from BaseOperator, which implements quite a few Airflow-specific things (look at them at your leisure).
- Declared the template_fields, in which Jinja will look for macros to process.
- Arranged the right arguments for __init__() and set defaults where necessary.
- Didn't forget to initialize the ancestor either.
- Opened the corresponding hook, TelegramBotHook, and got a client object from it.
- Overrode (redefined) the method BaseOperator.execute(), which Airflow will call when the time comes to run the operator; in it we implement the main action, not forgetting to log. (By the way, we log straight to stdout and stderr: Airflow intercepts everything, wraps it nicely and puts it where it belongs.)
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession


class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """

    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)

        return self.client
I don't even know what there is to explain here, so I'll just note the important points:
- We inherit and think about the arguments; in most cases there will be just one: conn_id;
- We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and take only the extra section (a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
- I create an instance of our TelegramBot, giving it the specific token.
That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().
And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for a single sendMessage method.
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """

    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)


class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
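One caveat with 'parse_mode': 'MarkdownV2': outside of formatting entities, Telegram requires the characters underscore, asterisk, square and round brackets, tilde, backtick, >, #, +, -, =, |, curly braces, dot and exclamation mark to be escaped with a backslash, otherwise sendMessage answers with ok: false and the wrapper raises. A message such as "update_reports failed :(" trips on both the underscore and the bracket. A minimal sketch of an escape helper that could be applied to the text before it goes into the payload; the name escape_markdown_v2 is mine.

```python
import re

# Characters MarkdownV2 treats as markup outside of entities,
# plus the backslash itself.
_MDV2_SPECIAL = '_*[]()~`>#+-=|{}.!\\'
_MDV2_RE = re.compile('([' + re.escape(_MDV2_SPECIAL) + '])')


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 special character with a backslash."""
    return _MDV2_RE.sub(r'\\\1', text)
```

In send_message this would mean 'text': escape_markdown_v2(message). The catch is that it also neutralizes intentional formatting, so for plain alerts it may be simpler to drop parse_mode altogether and send plain text.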
The right thing to do is to put all of this (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, publish it in a public repository and give it to Open Source.
While we were figuring all this out, our report updates managed to fail successfully and send me an error message in the channel. Let me check whether something is wrong...
Something broke in our DAG! Isn't that what we were waiting for? Exactly!
So, are we going to pour some data or not?
Do you feel I've missed something? It seems he promised to transfer data from SQL Server to Vertica, and then he went off on a tangent, the scoundrel!
This atrocity was intentional: I simply had to decipher some terminology for you first. Now we can move on.
Our plan was this:
- Make a DAG
- Generate tasks
- See how beautiful everything is
- Assign session numbers to the loads
- Get data from SQL Server
- Put data into Vertica
- Collect statistics
So, to get all of this going, I made a small addition to our docker-compose.yml:
docker-compose.db.yml
version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py
There we bring up:
- Vertica as the host dwh, with the most default settings,
- three SQL Server instances,
- and we fill the databases on the latter with some data (under no circumstances look into mssql_init.py!)
We launch all this goodness with a slightly more complicated command than last time:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
To see what our miracle randomizer generated, you can use the Data Profiling/Ad Hoc Query item:
The main thing is not to show it to the analysts
I won't elaborate on ETL sessions, everything is trivial there: we create a database with a table in it, wrap everything in a context manager, and now we do this:
with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15
session.py
from sys import stderr


class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # An exception inside the `with` block marks the session as failed
        if exc_type is not None:
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)

        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # Statements such as CREATE TABLE return no result set
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id SERIAL NOT NULL PRIMARY KEY,
                task_name VARCHAR(200) NOT NULL,

                started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
                finished TIMESTAMPTZ DEFAULT current_timestamp,
                successful BOOL,

                loaded_rows INT,
                comment VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        self._create()  # make sure the log table exists before inserting

        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """

        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')

        query = """
            UPDATE sessions
            SET
                finished = DEFAULT,
                successful = %s,
                loaded_rows = %s,
                comment = %s
            WHERE
                id = %s
            RETURNING id;
            """

        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)

        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)


class SessionError(Exception):
    pass


class SessionClosedError(SessionError):
    pass
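To try the session pattern without Vertica or PostgreSQL at hand, here is a compact variant on the standard-library sqlite3. It is a swapped-in sketch, not the Session class above: sqlite3 uses ? placeholders and INTEGER PRIMARY KEY instead of SERIAL, and the class name SqliteSession is hypothetical.

```python
import sqlite3


class SqliteSession:
    """Log the start and outcome of one ETL run into a `sessions` table."""

    def __init__(self, connection: sqlite3.Connection, task_name: str):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None
        self.comment = None

    def __enter__(self):
        with self.connection:  # commits the transaction on success
            self.connection.execute(
                """CREATE TABLE IF NOT EXISTS sessions (
                       id INTEGER PRIMARY KEY,
                       task_name TEXT NOT NULL,
                       started TEXT DEFAULT CURRENT_TIMESTAMP,
                       finished TEXT,
                       successful INTEGER,
                       loaded_rows INTEGER,
                       comment TEXT)""")
            cur = self.connection.execute(
                'INSERT INTO sessions (task_name) VALUES (?)',
                (self.task_name,))
        self.id = cur.lastrowid
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # An exception inside the `with` block marks the run as failed
            self.successful = False
            self.comment = f'{exc_type.__name__}: {exc_val}'
        with self.connection:
            self.connection.execute(
                """UPDATE sessions
                   SET finished = CURRENT_TIMESTAMP, successful = ?,
                       loaded_rows = ?, comment = ?
                   WHERE id = ?""",
                (self.successful, self.loaded_rows, self.comment, self.id))
        return False  # never swallow the exception
```

Returning False from __exit__ lets the original exception propagate, so Airflow still sees the task as failed while the log row records why.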
The time has come to collect our data from our hundred and fifty tables. Let's do it with a few very unpretentious lines:
import pandas as pd
from airflow.hooks.mssql_hook import MsSqlHook

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
- With a hook, we get a pymssql connection from Airflow.
- We substitute a restriction in the form of a date into the query; the date will be thrown into the function by the template engine.
- We feed our query to pandas, which will get us a DataFrame; it will come in handy later.
I use the substitution {dt} instead of a query parameter %s not because I'm an evil Pinocchio, but because pandas cannot handle pymssql properly and slips it params as a List, although it really wants a tuple.
Also note that the developer of pymssql decided not to support it anymore, and it's time to move over to pyodbc.
Let's see what Airflow filled the arguments of our functions with:
If there is no data, there is no point in continuing. But it is also strange to consider the load successful. And yet it is not an error either. A-ah-ah, what do we do?! Here's what:
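Since the date ends up in the SQL text through an f-string rather than a bound parameter, it is worth making sure it really is a date before interpolating it. A small stdlib guard, assuming dt arrives as an ISO YYYY-MM-DD string (as {{ ds }} renders); safe_date_literal is a hypothetical helper name.

```python
from datetime import date


def safe_date_literal(dt: str) -> str:
    """Return `dt` re-serialized as YYYY-MM-DD, or raise ValueError.

    Parsing and re-formatting means only a real calendar date can reach
    the query text; anything else, e.g. "2020-07-14' OR 1=1 --", fails.
    """
    return date.fromisoformat(dt).isoformat()
```

In the query it would be used as CONVERT(DATE, start_time) = '{safe_date_literal(dt)}'. date.fromisoformat needs Python 3.7+, which matches the python3.7 Airflow image used here.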
from airflow.exceptions import AirflowSkipException

if df.empty:
    raise AirflowSkipException('No rows to load')
AirflowSkipException
tichaudza Airflow kuti hapana zvikanganiso, asi isu tinosvetuka basa racho. Iyo interface haizove negirinhi kana tsvuku sikweya, asi pink.
Let's add a few columns to our data:
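from pandas.util import hash_pandas_object

df['etl_source'] = src_schema
df['etl_id'] = session.id
# index=False: hash only the values, so the same (source, id) pair
# always gets the same hash regardless of its row position
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']], index=False)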
Namely:
- the database we took the orders from,
- the ID of our load session (it will be different for every task),
- a hash of the source and the order ID, so that in the final database (where everything is poured into one table) we have a unique order ID.
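A hash works as a key only if the same (source, id) pair always produces the same value, and here it also has to fit the hash_id INT column (Vertica's INT is a signed 64-bit integer, while hash_pandas_object returns unsigned 64-bit values). To illustrate both requirements outside pandas, here is a minimal sketch that swaps in hashlib.blake2b as the hash function. stable_hash_id is a hypothetical name and not what the pipeline above uses:

```python
import hashlib


def stable_hash_id(source: str, order_id: int) -> int:
    """Deterministic signed 64-bit key for an (etl_source, id) pair.

    Hypothetical alternative to hash_pandas_object: blake2b is used only
    because it is in the standard library and stable across runs.
    """
    # A unit-separator character keeps e.g. ('shop1', 23) and ('shop12', 3)
    # from producing the same byte string.
    payload = f"{source}\x1f{order_id}".encode("utf-8")
    digest = hashlib.blake2b(payload, digest_size=8).digest()
    # signed=True maps the 8 bytes into the signed 64-bit INT range.
    return int.from_bytes(digest, "big", signed=True)
```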
One final step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do that is via CSV!
import csv
from io import StringIO

from airflow.contrib.hooks.vertica_hook import VerticaHook

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# The column list must be plain comma-separated names, not a Python list repr
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
"""

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- We create a special receiver, a StringIO.
- pandas kindly puts our DataFrame into it as CSV lines.
- We open a connection to our beloved Vertica with a hook.
- And now, with the help of copy(), we send our data straight to Vertica!
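To see what actually travels to Vertica without a database at hand, here is a minimal sketch that reproduces the same CSV dialect with Python's csv module (which pandas' to_csv builds on) and assembles the COPY statement. build_copy_payload is a hypothetical name, and the rows are assumed to be plain tuples rather than a DataFrame:

```python
import csv
from io import StringIO


def build_copy_payload(rows, columns, target_table):
    """Serialize rows the way the to_csv call above is configured.

    Hypothetical helper using only the csv module: '|' as delimiter,
    None written as the NUL marker, minimal quoting, backslash escaping.
    """
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                        doublequote=False, escapechar='\\', lineterminator='\n')
    for row in rows:
        # Mirror na_rep='NUL': COPY ... NULL 'NUL' turns it back into NULL.
        writer.writerow(['NUL' if value is None else value for value in row])
    buffer.seek(0)

    # Plain comma-separated column names inside the parentheses.
    copy_stmt = (
        f"COPY {target_table}({', '.join(columns)}) FROM STDIN "
        "DELIMITER '|' ENCLOSED '\"' ABORT ON ERROR NULL 'NUL'"
    )
    return copy_stmt, buffer
```

For rows [(1, 'x'), (2, None)] the buffer holds the two lines 1|x and 2|NUL, which is the shape COPY expects with the options above.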
We get the number of loaded rows from the driver and tell the session manager that everything is fine:
session.loaded_rows = cursor.rowcount
session.successful = True
That's it.
In production, we create the target table by hand. Here I allowed myself a little automation:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
        id INT,
        start_time TIMESTAMP,
        end_time TIMESTAMP,
        type INT,
        data VARCHAR(32),
        etl_source VARCHAR(200),
        etl_id INT,
        hash_id INT PRIMARY KEY
    );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)
I use VerticaOperator() to create the database schema and the table (if they don't exist yet, of course). The main thing is to set up the dependencies correctly:
for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load
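The loop produces a fan-out: one create_target task upstream of one load task per source. To make the resulting order tangible, here is a toy model using the standard library's graphlib (Python 3.9+), assuming sql_server_ds is a list of (conn_id, schema) pairs as the loop suggests. plan_run_order and the sample connection names are hypothetical, and Airflow's scheduler is not involved:

```python
from graphlib import TopologicalSorter  # Python 3.9+


def plan_run_order(sources):
    """Order tasks the way `create_table >> load` wires them in the loop.

    Toy model only: each key is a task_id, each value the set of its
    upstream task_ids.
    """
    graph = {'create_target': set()}
    for _conn_id, schema in sources:
        # One load task per source schema, each downstream of create_target.
        graph[schema] = {'create_target'}
    return list(TopologicalSorter(graph).static_order())
```

The load tasks have no edges between them, so once create_target succeeds, the executor is free to run them in parallel (up to its concurrency limits).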
Summing up
"Well," said the little mouse, "isn't that so? Now are you convinced that I'm the most frightening creature in the wood?"
Julia Donaldson, The Gruffalo
I think that if my colleagues and I held a contest to see who could build and launch an ETL process from scratch faster, with them on SSIS and a mouse and me on Airflow... and if we also compared how easy each is to maintain... Wow, I think you'll agree that I would beat them on all fronts!
A bit more seriously: by describing processes as program code, Apache Airflow has made my work much more comfortable and enjoyable.
Its unlimited extensibility, both through plug-ins and through its built-in scalability, lets you use Airflow in almost any area: the full cycle of collecting, preparing and processing data, or even launching rockets (to Mars, of course).
Final part: reference and information
Pitfalls we ran into
- start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow at the earliest. Use a fixed timestamp instead:
start_date = datetime(2020, 7, 7, 0, 1, 2)
And no more problems.
There is another runtime error related to it: Task is missing the start_date parameter. This most often means you forgot to bind the operator to the dag.
- Everything on one machine. Yes: the databases (Airflow's own and our tooling's), the web server, the scheduler and the workers. And it even worked. But over time the services' task load grew, and when PostgreSQL started answering an index lookup in 20 s instead of 5 ms, we picked it up and moved it elsewhere.
- LocalExecutor. Yes, we're still on it, and we've already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to add at least one worker, and we'll have to work hard to move to CeleryExecutor. And since you can run it on a single machine too, nothing stops you from using Celery even on a server that "of course, will never go into production, honest!"
- Not using the built-in tools:
- Connections for storing service credentials,
- SLA Misses for reacting to tasks that didn't finish on time,
- XCom for exchanging metadata (I said metadata!) between dag tasks.
- Email abuse. Well, what can I say? Alerts were set up for every retry of every failed task. Now my work Gmail holds more than 90k emails from Airflow, and the webmail interface refuses to select and delete more than 100 at a time.
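The start_date pitfall comes from how the scheduler counts intervals: a run labelled with execution_date X is triggered only once the interval starting at X has ended. A minimal model of that rule, assuming a timedelta schedule_interval and ignoring catchup and cron expressions (first_run is a hypothetical helper, not an Airflow function):

```python
from datetime import datetime, timedelta


def first_run(start_date: datetime, schedule_interval: timedelta):
    """Return (execution_date, trigger time) of the first DAG run.

    Simplified model of the classic Airflow behaviour for a timedelta
    schedule: a run fires only after its whole interval has passed.
    """
    execution_date = start_date
    triggered_at = start_date + schedule_interval
    return execution_date, triggered_at
```

With start_date=datetime(2020, 7, 7, 0, 1, 2) and a one-day interval, the first run is labelled 2020-07-07 00:01:02 but fires at 2020-07-08 00:01:02. With start_date=datetime.now(), the moment slides forward every time the DAG file is parsed, which is why a fixed timestamp is the usual cure.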
Other pitfalls:
Apache Airflow Pitfails
More automation tools
To let us work more with our heads and less with our hands, Airflow has prepared the following:
- REST API. It still has Experimental status, which doesn't stop it from working. With it you can not only get information about dags and tasks, but also stop/start a dag, or create a DAG Run or a pool.
- CLI. Many tools are available from the command line that are not just inconvenient to use through the WebUI but are missing from it altogether. For example:
- backfill, needed to re-run task instances.
For example, the analysts come and say: "Hey buddy, your data from January 1 to 13 is garbage! Fix it, fix it, fix it, fix it!" And you just go:
airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- Base service: initdb, resetdb, upgradedb, checkdb.
- run, which lets you run a single task instance and even ignore all dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
- test does pretty much the same, except that it writes nothing to the databases.
- connections allows mass creation of connections from the shell.
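As a quick sanity check of what the backfill command above covers, here is a minimal sketch that lists the execution dates it would process, assuming the orders DAG runs @daily and that the -e end date is inclusive. backfill_dates is a hypothetical helper, not part of the Airflow CLI:

```python
from datetime import date, timedelta


def backfill_dates(start: date, end: date):
    """Execution dates a daily backfill between start and end would cover.

    Simplified model assuming a @daily schedule and an inclusive end date.
    """
    days = (end - start).days
    return [start + timedelta(days=offset) for offset in range(days + 1)]
```

Under those assumptions, January 1 to 13 means 13 DAG runs, and each one re-runs every task of the orders DAG for that day.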
python api. A rather hardcore way of interacting, meant for plugins rather than for poking at with your bare hands. But who's going to stop us from going to /home/airflow/dags, launching ipython and starting to mess around? For example, you can export all connections with the following code:
from airflow import settings
from airflow.models import Connection

fields = 'conn_id conn_type host port schema login password extra'.split()

session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)
- Connecting to the Airflow metadatabase. I don't recommend writing to it, but reading task states for various custom metrics can be much faster and simpler than going through any of the APIs.
Let's say that not all of our tasks are idempotent, yet they can occasionally fail, and that's normal. But several failures in a row are already suspicious, and it would be worth checking on them.
Beware, SQL!
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number()
        OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
            THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE
        state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq
        AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq
        AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
    task_id,
    dag_id
HAVING
    count(last_fail_seq) > 0
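The key step in the failed CTE is the comparison of two row numbers. rn counts all recent executions of a task, newest first, while the second row_number() counts only its failures. The two coincide exactly for the failures at the head of the history, so count(last_fail_seq) is the length of the current losing streak. A plain-Python sketch of the same idea (trailing_failures is a hypothetical name, and the failed/up_for_retry breakdown is left out):

```python
def trailing_failures(states):
    """Count the unbroken run of failures at the most recent end.

    `states` is ordered newest first, like ORDER BY execution_date DESC.
    Mirrors the SQL trick: the k-th failure (1-based) belongs to the
    latest streak only if it is also the k-th execution overall.
    """
    failure_states = {'failed', 'up_for_retry'}
    streak = 0
    failure_rank = 0
    for rn, state in enumerate(states, start=1):
        if state in failure_states:
            failure_rank += 1
            if failure_rank == rn:  # rn plays the role of row_number() over all runs
                streak += 1
    return streak
```

For a history of failed, up_for_retry, success, failed (newest first), the streak is 2. The older failure is ignored because a success separates it from the head.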
Literature
And of course, the first ten links from a Google search are the contents of the Airflow folder in my bookmarks.
- Apache Airflow Documentation - yes, we have to start with the official docs, but who reads the instructions anyway?
- Best Practices - well, at least read the recommendations from the developers.
- The Airflow UI - the very basics: the user interface in pictures.
- Understanding Apache Airflow's key concepts - the basic concepts explained well, in case (all of a sudden!) you didn't understand something from me.
- Tianlong's Blog - A Guide On How To Build An Airflow Server/Cluster - a short guide to setting up an Airflow cluster.
- Running Apache Airflow At Lyft - one of the few genuinely interesting write-ups, with some practices and a few examples.
- How Apache Airflow Distributes Jobs on Celery workers - about how Airflow works together with Celery.
- DAG Writing Best Practices in Apache Airflow - on task idempotency, loading by ID instead of by date, transformations, file structure and other interesting things.
- Managing Dependencies in Apache Airflow - task dependencies and the Trigger Rule, which I only mentioned in passing.
- Airflow: When Your DAG is Far Behind The Schedule - how to overcome some "runs as scheduled" quirks of the scheduler, load missed data and prioritize tasks.
- Useful SQL queries for Apache Airflow - useful SQL queries against the Airflow metadata.
- Get started developing workflows with Apache Airflow - has a useful section on writing a custom sensor.
- Building the Fetchr Data Science Infra on AWS with Presto and Airflow - a pleasant short note on building Data Science infrastructure on AWS.
- 7 Common Errors to Check when Debugging Airflow DAGs - frequent mistakes (for when somebody hasn't read the docs yet).
- Store and access password using Apache Airflow - an amusing look at how people store passwords, even though you can simply use Connections.
- The Zen of Python and Apache Airflow - tidy DAG deployment, passing context into tasks, plus dependencies and skipping task runs.
- Airflow: Lesser Known Tips, Tricks, and Best Practices - on using default arguments and params in templates, as well as Variables and Connections.
- Exploring the Airflow Scheduler - the story of how the scheduler is being prepared for Airflow 2.0.
- Apache Airflow with 3 Celery workers in docker-compose - a slightly dated article on deploying your own cluster inside docker-compose.
- 4 Ways to Build Tasks Using Airflow - dynamic tasks using templates and context passing.
- Error Notifications in Airflow - standard and custom notifications via email and Slack.
- Airflow Workshop: complex DAGs without crutches - branching tasks, macros and XCom.
And the links used in the article:
- Macros reference - the variables available for use in templates.
- Common Pitfalls - Airflow - common mistakes when writing dags.
- puckel/docker-airflow: Docker Apache Airflow - docker-compose for experiments, debugging and more.
- python-telegram-bot/python-telegram-bot: We have made you a wrapper you can't refuse - a Python wrapper for the Telegram REST API.
Source: www.habr.com