Dia duit, is mise Dmitry Logvinenko - Innealtóir Sonraí de chuid na Roinne Analytics de ghrúpa cuideachtaí Vezet.
Inseoidh mé duit faoi uirlis iontach chun próisis ETL a fhorbairt - Apache Airflow. Ach tá Airflow chomh ilghnéitheach agus chomh ilghnéitheach sin gur chóir duit breathnú níos géire air fiú mura bhfuil baint agat le sreafaí sonraí, ach go bhfuil gá agat próisis ar bith a sheoladh go tréimhsiúil agus monatóireacht a dhéanamh ar a gcur i gcrích.
Agus tá, ní inseoidh mé ní hamháin, ach freisin taispeánfaidh mé: tá go leor cód, screenshots agus moltaí ag an gclár.

An rud a fheiceann tú de ghnáth nuair a dhéanann tú google an focal Airflow / Wikimedia Commons
Tábla na nÁbhar
Réamhrá
Tá Apache Airflow díreach cosúil le Django:
- scríofa i python
- tá painéal riaracháin iontach ann,
- inmhéadaithe ar feadh tréimhse éiginnte
- ach níos fearr, agus rinneadh é chun críocha go hiomlán difriúil, eadhon (mar atá sé scríofa roimh an kata):
- tascanna a rith agus monatóireacht a dhéanamh ar líon neamhtheoranta meaisíní (mar a cheadóidh go leor Soilire / Kubernetes agus do choinsiasa duit)
- le giniúint sreabhadh oibre dinimiciúil ó chód Python an-éasca a scríobh agus a thuiscint
- agus an cumas aon bhunachair shonraí agus APIanna a nascadh lena chéile trí úsáid a bhaint as comhpháirteanna réamhdhéanta agus forlíontáin de dhéantús baile (rud atá thar a bheith simplí).
Úsáidimid Apache Airflow mar seo:
- bailímid sonraí ó fhoinsí éagsúla (go leor cásanna SQL Server agus PostgreSQL, APIanna éagsúla le méadracht feidhmchláir, fiú 1C) in DWH agus ODS (tá Vertica agus Clickhouse againn).
- conas chun cinn
cron, a chuireann tús leis na próisis chomhdhlúthaithe sonraí ar ODS, agus a dhéanann monatóireacht ar a gcothabháil freisin.
Go dtí le déanaí, bhí ár gcuid riachtanas clúdaithe ag freastalaí beag amháin le 32 cores agus 50 GB RAM. In Airflow, oibríonn sé seo:
- níos mó 200 dag (sreafaí oibre i ndáiríre, inar líonta muid tascanna),
- i ngach ar an meán 70 tasc,
- tosaíonn an mhaitheas seo (ar an meán freisin) uair an chloig.
Agus faoin gcaoi ar mhéadaigh muid, scríobhfaidh mé thíos, ach anois déanaimis sainmhíniú ar an bhfadhb über a réiteoimid:
Tá trí Fhreastalaí SQL bunaidh, gach ceann acu le 50 bunachar sonraí - cásanna de thionscadal amháin, faoi seach, tá an struchtúr céanna acu (beagnach i ngach áit, mua-ha-ha), rud a chiallaíonn go bhfuil tábla Orduithe ag gach ceann acu (go fortunately, tábla leis sin is féidir ainm a bhrú isteach in aon ghnó). Glacaimid na sonraí trí réimsí seirbhíse (freastalaí foinse, bunachar sonraí foinse, ID tasc ETL) a chur leis agus caithimid go naive iad, abair, Vertica.
A ligean ar dul!
An phríomhchuid, praiticiúil (agus beagán teoiriciúil)
Cén fáth a bhfuil muid (agus tú)
Nuair a bhí na crainn mór agus bhí mé simplí SQL-schik i miondíol Rúisis amháin, rinneamar próisis ETL a scamadh mar a chéile le sreafaí sonraí ag baint úsáide as dhá uirlis atá ar fáil dúinn:
- Lárionad Cumhachta Informatica - córas thar a bheith scaipthe, thar a bheith táirgiúil, lena crua-earraí féin, a leagan féin. Bhain mé úsáid as Dia forbid 1% dá cumais. Cén fáth? Bhuel, ar an gcéad dul síos, chuir an comhéadan seo, áit éigin ó na 380í, brú orainn go meabhrach. Ar an dara dul síos, tá an contraption seo deartha le haghaidh próisis an-mhaisiúla, athúsáid comhpháirte buile agus cleasanna eile atá thar a bheith tábhachtach don fhiontar. Maidir leis an méid a chosnaíonn sé, cosúil le sciathán an Airbus AXNUMX / bliain, ní déarfaimid rud ar bith.
Bí ar an airdeall, is féidir le screenshot daoine faoi 30 a ghortú beagán

- Freastalaí Comhtháthaithe SQL Server - d'úsáideamar an comrádaí seo inár sreafaí laistigh den tionscadal. Bhuel, i ndáiríre: úsáidimid SQL Server cheana féin, agus bheadh sé míréasúnta ar bhealach éigin gan a chuid uirlisí ETL a úsáid. Tá gach rud go maith: tá an comhéadan álainn araon, agus tuairiscíonn an dul chun cinn ... Ach ní hé seo an fáth go bhfuil grá againn ar tháirgí bogearraí, ó, ní le haghaidh seo. Leagan é
dtsx(a bhfuil XML le nóid shuffled ar shábháil) is féidir linn, ach cad é an pointe? Cad faoi thasc-phacáiste a dhéanamh a tharraingeoidh na céadta tábla ó fhreastalaí amháin go freastalaí eile? Sea, cad céad, beidh do mhéar innéacs titim amach as fiche píosaí, cliceáil ar an cnaipe luiche. Ach is cinnte go bhféachann sé níos faiseanta:
Is cinnte gur lorg muid bealaí amach. Cás fiú beagnach tháinig go dtí gineadóir pacáiste SSIS féinscríofa ...
…agus ansin fuair post nua mé. Agus rug Apache Airflow orm air.
Nuair a fuair mé amach go bhfuil cur síos ar phróiseas ETL cód simplí Python, ní raibh mé ag damhsa le haghaidh áthas. Seo é an chaoi a ndearnadh leaganacha agus difríocht de shruthanna sonraí, agus tháinig sé chun bheith ina ábhar de chód Python i gceann go leith nó dhá scáileán 13” táblaí le struchtúr amháin a dhoirteadh ó na céadta bunachar sonraí isteach i sprioc amháin.
Cnuasach an bhraisle
Ná déanaimis kindergarten go hiomlán a shocrú, agus gan labhairt faoi rudaí go hiomlán soiléir anseo, cosúil le Airflow a shuiteáil, do bhunachar sonraí roghnaithe, Soilire agus cásanna eile a thuairiscítear sna duganna.
Ionas gur féidir linn turgnaimh a thosú láithreach, rinne mé sceitse docker-compose.yml ina bhfuil:
- A ligean ar a ardú i ndáiríre Sruth aeir: Sceidealóir, Freastalaí Gréasáin. Beidh Flower ag sníomh ann freisin chun monatóireacht a dhéanamh ar thascanna Soilire (toisc go bhfuil sé curtha isteach cheana féin
apache/airflow:1.10.10-python3.7, ach is cuma linn) - PostgreSQL, ina scríobhfaidh Airflow a chuid faisnéise seirbhíse (sonraí sceidealaithe, staitisticí forghníomhaithe, etc.), agus marcálfaidh Soilire tascanna críochnaithe;
- Redis, a fheidhmeoidh mar bhróicéir tasc do Soilire;
- Oibrí soilire, a bheidh ag gabháil do chur i gcrích díreach cúraimí.
- Go fillteán
./dagscuirfimid ár gcomhaid leis an tuairisc ar dags. Déanfar iad a phiocadh suas ar an eitilt, mar sin ní gá a juggle an chairn iomlán tar éis gach sraothartach.
I roinnt áiteanna, nach bhfuil an cód sna samplaí a thaispeáint go hiomlán (ionas nach a tranglam suas an téacs), ach áit éigin tá sé modhnaithe sa phróiseas. Is féidir samplaí de chóid oibre iomlána a fháil sa stór .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNótaí:
- I gcomhthionól an chomhdhéanamh, bhí mé ag brath go mór ar an íomhá aitheanta go maith - bí cinnte é a sheiceáil. B'fhéidir nach bhfuil aon rud eile ag teastáil uait i do shaol.
- Tá gach socrú Airflow ar fáil ní hamháin tríd
airflow.cfg, ach freisin trí athróga timpeallachta (a bhuíochas leis na forbróirí), ar bhain mé leas mailíseach astu. - Ar ndóigh, níl sé réidh le táirgeadh: níor chuir mé buille croí ar choimeádáin d'aon ghnó, níor bhac mé le slándáil. Ach rinne mé an t-íosmhéid oiriúnach dár turgnamhóirí.
- Tabhair faoi deara:
- Caithfidh an fillteán dag a bheith inrochtana don sceidealóir agus do na hoibrithe araon.
- Baineann an rud céanna le gach leabharlann tríú páirtí - ní mór iad go léir a shuiteáil ar mheaisíní le sceidealóir agus oibrithe.
Bhuel, anois tá sé simplí:
$ docker-compose up --scale worker=3Tar éis gach rud a ardú, is féidir leat breathnú ar na comhéadain gréasáin:
- Sreabhadh aeir:
- Bláth:
Bunchoincheapa
Más rud é nár thuig tú aon rud sna “dags” seo go léir, seo chugainn foclóir gairid:
- Sceidealóir - an t-uncail is tábhachtaí i Airflow, ag rialú go n-oibríonn robots go crua, agus ní duine: déanann sé monatóireacht ar an sceideal, nuashonraíonn dags, seolann sé tascanna.
Go ginearálta, i leaganacha níos sine, bhí fadhbanna aige le cuimhne (níl, ní amnesia, ach sceitheadh) agus d'fhan an paraiméadar oidhreachta fiú sna cumraíochtaí
run_duration- a eatramh atosú. Ach anois tá gach rud go breá. - GCM (aka "dag") - "graf aicyclic faoi threoir", ach is beag duine a inseoidh sainmhíniú den sórt sin, ach i ndáiríre is coimeádán é le haghaidh tascanna a idirghníomhaíonn lena chéile (féach thíos) nó analóg de Phacáiste i SSIS agus Sreabhadh Oibre in Informatica .
Chomh maith le daga, d'fhéadfadh go mbeadh fo-dhuillí ann fós, ach is dócha nach n-éireoidh linn iad.
- Rith DAG - dag tosaithe, a sanntar a chuid féin
execution_date. Is féidir le dagrans den dag céanna oibriú ag an am céanna (má tá do thascanna déanta agat gan mhoill, ar ndóigh). - oibreoir is píosaí cód iad atá freagrach as gníomh sonrach a dhéanamh. Tá trí chineál oibreoirí ann:
- gníomhcosúil lenár is fearr leat
PythonOperator, ar féidir leo aon chód Python (bailí) a fhorghníomhú; - aistriú, a iompraíonn sonraí ó áit go háit, abair,
MsSqlToHiveTransfer; - braiteoir ar an láimh eile, ligfidh sé duit freagairt nó mhoilliú a dhéanamh ar fhorghníomhú breise an dag go dtí go dtarlaíonn teagmhas.
HttpSensoris féidir leis an críochphointe sonraithe a tharraingt, agus nuair a bhíonn an freagra inmhianaithe ag fanacht, cuir tús leis an aistriúGoogleCloudStorageToS3Operator. Fiafróidh meon fiosrach: “cén fáth? Tar éis an tsaoil, is féidir leat athrá a dhéanamh díreach san oibreoir!” Agus ansin, ionas nach gcuirfear bac ar an linn tascanna le hoibreoirí ar fionraí. Tosaíonn an braiteoir, seiceálann agus bás roimh an gcéad iarracht eile.
- gníomhcosúil lenár is fearr leat
- Tasc - déantar oibreoirí dearbhaithe, beag beann ar an gcineál, agus atá ceangailte leis an dag a ardú go céim an taisc.
- shampla tasc - nuair a chinn an pleanálaí ginearálta go raibh sé in am tascanna a chur chun catha ar taibheoirí-oibrithe (ar an bpointe, má úsáidimid
LocalExecutornó chuig nód iargúlta i gcásCeleryExecutor), sannann sé comhthéacs dóibh (i.e., sraith athróg - paraiméadair forghníomhaithe), leathnaíonn sé teimpléid ordaithe nó fiosrúcháin, agus comhthiomsaíonn sé iad.
Gineann muid tascanna
Ar dtús, déanaimis breac-chuntas ar scéim ghinearálta ár doug, agus ansin déanfaimid níos mó agus níos mó isteach ar na sonraí, toisc go gcuirimid roinnt réitigh neamhfhánacha i bhfeidhm.
Mar sin, sa bhfoirm is simplí, beidh cuma mar seo ar a leithéid de dag:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Déanaimis é a dhéanamh amach:
- Gcéad dul síos, allmhairímid na libs riachtanacha agus rud éigin eile;
sql_server_ds- An bhfuilList[namedtuple[str, str]]le hainmneacha na nasc ó Airflow Connections agus na bunachair shonraí óna dtógfaimid ár bpláta;dag- an fógra ár dag, ní mór a bheith riachtanach iglobals(), nó ní bhfaighidh Airflow é. Ní mór do Doug a rá freisin:- Cad is ainm dó
orders- beidh an t-ainm seo le feiceáil sa chomhéadan gréasáin ansin, - go n-oibreoidh sé ó mheadhon oidhche an t-ochtmhadh d'Iúl,
- agus ba chóir é a rith, thart ar gach 6 uair an chloig (do guys diana anseo in ionad
timedelta()inghlacthacron-líne0 0 0/6 ? * * *, le haghaidh an níos lú fionnuar - slonn mar@daily);
- Cad is ainm dó
workflow()Déanfaidh an príomh-jab, ach ní anois. Faoi láthair, ní dhéanfaimid ach ár gcomhthéacs a dhumpáil isteach sa loga.- Agus anois an draíocht simplí a bhaineann le tascanna a chruthú:
- rithimid tríd ár bhfoinsí;
- tosaigh
PythonOperator, a fhorghníomhóidh ár Caochadánworkflow(). Ná déan dearmad ainm uathúil (laistigh den dag) don tasc a shonrú agus an dag féin a cheangal. Bratachprovide_contextina dhiaidh sin, Doirt argóintí breise isteach an fheidhm, a bheidh muid a bhailiú go cúramach ag baint úsáide as**context.
Go dtí seo, sin é go léir. Cad a fuair muid:
- dag nua sa chomhéadan gréasáin,
- céad go leith tasc a dhéanfar go comhthreomhar (má cheadaíonn na socruithe Airflow, Soilire agus cumas an fhreastalaí é).
Bhuel, fuair beagnach é.

Cé a shuiteáilfidh na spleáchais?
Chun an rud ar fad seo a shimpliú, chuaigh mé isteach docker-compose.yml próiseáil requirements.txt ar gach nóid.
Anois tá sé imithe:

Is tascanna iad cearnóga liatha a phróiseálann an sceidealóir.
Fanann muid beagán, gearrann na hoibrithe na tascanna:

Tá na cinn glas, ar ndóigh, tar éis a gcuid oibre a chríochnú go rathúil. Níl an-rath ar Reds.
Dála an scéil, níl aon fhillteán ar ár dtáirgí
./dags, níl aon sioncrónú idir meaisíní - luíonn gach dags isteachgitar ár Gitlab, agus dáileann Gitlab CI nuashonruithe ar mheaisíní agus iad á gcumasc isteachmaster.
Beagán faoi Flower
Cé go bhfuil na hoibrithe ag caoineadh ár n-pacifiers, cuimhnigh ar uirlis eile a fhéadfaidh rud éigin a thaispeáint dúinn - Flower.
An chéad leathanach ina bhfuil faisnéis achomair ar nóid oibrithe:

An leathanach is déine le tascanna a chuaigh ag obair:

An leathanach is leadránach le stádas ár mbróicéir:

Tá an leathanach is gile le graif stádais tascanna agus a gcuid ama cur i gcrích:

Táimid luchtaithe an underloaded
Mar sin, d'oibrigh na tascanna go léir amach, is féidir leat an lucht créachtaithe a dhéanamh.

Agus bhí go leor lucht créachtaithe - ar chúis amháin nó eile. I gcás úsáid cheart a bhaint as Airflow, léiríonn na cearnóga céanna seo nach raibh na sonraí cinnte.
Ní mór duit féachaint ar an logáil agus atosú ar na cásanna tasc tite.
Trí chliceáil ar aon chearnóg, feicfimid na gníomhartha atá ar fáil dúinn:

Is féidir leat a ghlacadh agus a dhéanamh Glan an tite. Is é sin, déanaimid dearmad go bhfuil rud éigin teipthe ann, agus beidh an tasc ásc céanna téigh go dtí an sceidealóir.

Is léir nach bhfuil sé an-daonnúil é seo a dhéanamh leis an luch leis na cearnóga dearga go léir - ní hé seo an rud a mbeimid ag súil leis ó Airflow. Ar ndóigh, tá airm ollscriosta againn: Browse/Task Instances

Roghnaigh muid gach rud ag an am céanna agus athshocróimid go nialas, cliceáil ar an mír cheart:

Tar éis a ghlanadh, breathnaíonn ár tacsaithe mar seo (tá siad ag fanacht leis an sceidealóir iad a sceidealú cheana féin):

Naisc, crúcaí agus athróga eile
Tá sé in am breathnú ar an gcéad DAG eile, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]An ndearna gach duine nuashonrú tuairisce riamh? Seo í arís: tá liosta foinsí ann óna bhfaightear na sonraí; tá liosta cá háit le cur; ná déan dearmad honk nuair a tharla nó a bhris gach rud (bhuel, níl sé seo fúinn, níl).
A ligean ar dul tríd an comhad arís agus breathnú ar an stuif nua doiléir:
from commons.operators import TelegramBotSendMessage- ní chuireann aon rud cosc orainn ár n-oibreoirí féin a dhéanamh, rud a bhaineamar leas as trí fhillteán beag a dhéanamh chun teachtaireachtaí a sheoladh chuig Unblocked. (Beidh muid ag caint níos mó faoin oibreoir seo thíos);default_args={}- is féidir le dag na hargóintí céanna a dháileadh ar a cuid oibreoirí go léir;to='{{ var.value.all_the_kings_men }}'- Gorttoní bheidh hardcoded againn, ach ginte go dinimiciúil ag baint úsáide as Jinja agus athróg le liosta de na ríomhphoist, a chuir mé go cúramach iAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— coinníoll chun an t-oibreoir a thosú. Is é ár gcás, beidh an litir ag eitilt go dtí an bosses ach amháin má tá gach spleáchas oibriú amach go rathúil;tg_bot_conn_id='tg_main'- argóintíconn_idglacadh le haitheantais naisc a chruthaímid iontuAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- ní bheidh teachtaireachtaí i Telegram eitilt ar shiúl ach amháin má tá tascanna tite;task_concurrency=1- cuirimid cosc ar roinnt cásanna de thasc amháin a sheoladh go comhuaineach. Seachas sin, gheobhaidh muid an seoladh comhuaineach roinntVerticaOperator(ag féachaint ar bhord amháin);report_update >> [email, tg]- go léirVerticaOperatorteacht le chéile agus litreacha agus teachtaireachtaí á seoladh, mar seo:

Ach ós rud é go bhfuil coinníollacha seolta éagsúla ag oibreoirí fógra, ní oibreoidh ach duine amháin. Sa Tree View, tá cuma beagán níos lú ar gach rud:

Beidh mé ag rá cúpla focal faoi macraí agus a gcairde - athróga.
Is áitshealbhóirí Jinja iad Macraí ar féidir leo faisnéis úsáideach éagsúla a chur in áit argóintí oibreoirí. Mar shampla, mar seo:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} leathnófar go dtí ábhar na hathróige comhthéacs é execution_date san fhormáid YYYY-MM-DD: 2020-07-14. Is é an chuid is fearr ná go gcuirtear athróga comhthéacs in iúl do thasc ar leith (cearnóg sa Tree View), agus nuair a atosófar iad, leathnóidh na sealbhóirí áite chuig na luachanna céanna.
Is féidir na luachanna sannta a fheiceáil ag baint úsáide as an gcnaipe Rindreáilte ar gach tasc ásc. Seo mar a bhí an tasc le litir a sheoladh:

Agus mar sin ag an tasc le teachtaireacht a sheoladh:

Tá liosta iomlán macraí ionsuite don leagan is déanaí atá ar fáil ar fáil anseo:
Thairis sin, le cabhair ó fhorlíontáin, is féidir linn ár macraí féin a dhearbhú, ach sin scéal eile.
Chomh maith leis na rudaí réamhshainithe, is féidir linn luachanna ár n-athróg a chur in ionad (d'úsáid mé é seo cheana féin sa chód thuas). A ligean ar a chruthú i Admin/Variables cupla rud:

Gach rud is féidir leat a úsáid:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Is féidir leis an luach a bheith ina scálach, nó is féidir gur JSON é freisin. I gcás JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}bain úsáid as an cosán go dtí an eochair atá ag teastáil: {{ var.json.bot_config.bot.token }}.
Déarfaidh mé focal amháin go litriúil agus taispeánfaidh mé seat amháin faoi naisc. Tá gach rud bunrang anseo: ar an leathanach Admin/Connections cruthaímid nasc, cuirimid ár logins / pasfhocail agus paraiméadair níos sainiúla ann. Mar seo:

Is féidir pasfhocail a chriptiú (níos cruinne ná an réamhshocrú), nó is féidir leat an cineál ceangail a fhágáil amach (mar a rinne mé do tg_main) - is é an bhfíric go bhfuil an liosta de na cineálacha hardwired i múnlaí Airflow agus ní féidir a leathnú gan dul isteach ar na cóid foinse (más rud é go tobann ní raibh mé google rud éigin, le do thoil ceart dom), ach ní bheidh aon rud a stopadh dúinn ó creidmheasanna a fháil díreach ag ainm.
Is féidir leat a dhéanamh freisin naisc éagsúla leis an ainm céanna: sa chás seo, an modh BaseHook.get_connection(), a fhaigheann naisc de réir ainm dúinn, a thabhairt randamach ó roinnt ainmneacha (bheadh sé níos loighciúil Babhta Robin a dhéanamh, ach fágaimis é ar choinsias na bhforbróirí Airflow).
Is cinnte gur uirlisí fionnuara iad Athróga agus Naisc, ach tá sé tábhachtach gan an t-iarmhéid a chailleadh: cé na codanna de do shreabhadh a stórálann tú sa chód féin, agus na codanna a thugann tú do Airflow le haghaidh stórála. Ar thaobh amháin, is féidir go mbeadh sé áisiúil an luach a athrú go tapa, mar shampla, bosca poist, tríd an Chomhéadain. Ar an láimh eile, tá sé seo fós ar ais chuig an cliceáil luiche, as a raibh muid (mé) ag iarraidh a fháil réidh.
Tá oibriú le naisc ar cheann de na tascanna crúcaí. Go ginearálta, is pointí iad crúcaí Airflow chun é a nascadh le seirbhísí agus leabharlanna tríú páirtí. m.sh., JiraHook osclóidh cliant dúinn idirghníomhú le Jira (is féidir leat tascanna a bhogadh ar ais agus amach), agus le cabhair ó SambaHook is féidir leat comhad áitiúil a bhrú chun smb-pointe.
An t-oibreoir saincheaptha a pharsáil
Agus tháinig gar dúinn féachaint ar conas a dhéantar é TelegramBotSendMessage
Cód commons/operators.py leis an oibreoir iarbhír:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Anseo, cosúil le gach rud eile in Airflow, tá gach rud an-simplí:
- Oidhreacht ó
BaseOperator, a chuireann go leor rudaí a bhaineann go sonrach le Aer-Shreabhadh i bhfeidhm (féach ar do chuid fóillíochta) - Réimsí dearbhaithe
template_fields, ina mbeidh Jinja lorg Macraí a phróiseáil. - D'eagraigh na hargóintí cearta ar son
__init__(), socraigh na mainneachtainí nuair is gá. - Ní dhearna muid dearmad faoi thúsú an sinsear ach an oiread.
- D'oscail an hook comhfhreagrach
TelegramBotHookfuair rud cliant uaidh. - Modh sáraithe (athshainithe).
BaseOperator.execute(), a dhéanfaidh Airfow twitch nuair a thagann an t-am chun an t-oibreoir a sheoladh - ann cuirfimid an príomhghníomh i bhfeidhm, ag dearmad logáil isteach. (Logaimid isteach, dála an scéil, ceart isteachstdoutиstderr- Déanfaidh aer-sreabhadh gach rud a thascradh, é a fhilleadh go hálainn, é a dhianscaoileadh nuair is gá.)
A ligean ar a fheiceáil cad atá againn commons/hooks.py. An chéad chuid den chomhad, leis an duán féin:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientNíl a fhios agam fiú cad atá le míniú anseo, ní dhéanfaidh mé ach na pointí tábhachtacha a thabhairt faoi deara:
- Déanaimid oidhreacht, smaoinímid ar na hargóintí - i bhformhór na gcásanna beidh sé ar cheann:
conn_id; - Sárú ar mhodhanna caighdeánacha: chuir mé teorainn orm féin
get_conn(), ina bhfaighidh mé na paraiméadair nasc de réir ainm agus díreach an t-alt a fháilextra(réimse JSON é seo), inar chuir mé (de réir mo threoracha féin!) an comhartha bot Telegram:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Cruthaím sampla dár
TelegramBot, rud a thugann comhartha sonrach dó.
Sin é an méid. Is féidir leat cliant a fháil ó Hook ag baint úsáide as TelegramBotHook().clent nó TelegramBotHook().get_conn().
Agus an dara cuid den chomhad, ina ndéanaim microwrapper don Telegram REST API, ionas nach tarraingim an rud céanna ar mhodh amháin sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Is é an bealach ceart é a shuimiú:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- sa bhreiseán, cuir i stór poiblí é, agus tabhair chuig Foinse Oscailte é.
Agus muid ag déanamh staidéir air seo go léir, d’éirigh lenár nuashonruithe tuairisce theip go rathúil agus chuir siad teachtaireacht earráide chugam sa chainéal. Táim chun seiceáil féachaint an bhfuil sé mícheart ...

Bhris rud éigin inár madra! Nach é sin a rabhamar ag súil leis? Díreach!
An bhfuil tú ag dul a dhoirteadh?
An mbraitheann tú gur chaill mé rud éigin? Dealraíonn sé gur gheall sé sonraí a aistriú ó SQL Server go Vertica, agus ansin thóg sé é agus bhog sé as an ábhar, an scoundrel!
Bhí an t-uafás seo d'aon ghnó, ní raibh orm ach téarmaíocht éigin a mhíniú duit. Anois is féidir leat dul níos faide.
Ba é seo an plean a bhí againn:
- Déan dag
- Gin tascanna
- Féach cé chomh hálainn atá gach rud
- Sann uimhreacha seisiúin le líonadh
- Faigh sonraí ó SQL Server
- Cuir sonraí isteach i Vertica
- Bailigh staitisticí
Mar sin, chun é seo a chur ar bun agus a reáchtáil, chuir mé beagán breise lenár gcuid docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyArdaímid ansin:
- Vertica mar óstach
dwhleis na socruithe is réamhshocraithe, - trí chás de fhreastalaí SQL,
- líonaimid na bunachair shonraí sa dara ceann le roinnt sonraí (ní breathnú isteach i gcás ar bith
mssql_init.py!)
Seolann muid gach rud go maith le cabhair ó ordú beagán níos casta ná an uair dheireanach:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Cad a ghin ár randomizer miracle, is féidir leat úsáid a bhaint as an mír Data Profiling/Ad Hoc Query:

Is é an rud is mó ná é a thaispeáint d'anailísithe
mionléiriú ar Seisiúin ETL Ní dhéanfaidh mé, tá gach rud fánach ansin: déanaimid bonn, tá comhartha ann, fillteann muid gach rud le bainisteoir comhthéacs, agus anois déanaimid é seo:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15seisiún.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passTá an t-am tagtha ár sonraí a bhailiú ónár gcéad go leith tábla. Déanaimis é seo le cabhair ó línte an-unpretentious:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Le cabhair ó Hook a fháil againn ó Airflow
pymssql-ceangal - Cuir srian i bhfoirm dáta isteach san iarratas - cuirfidh an t-inneall teimpléad isteach é san fheidhm.
- Ár n-iarratas a bheathú
pandasa gheobhaidh sinnDataFrame- beidh sé úsáideach dúinn amach anseo.
Tá ionadú á úsáid agam
{dt}in ionad paraiméadar iarratais%sní toisc gur Pinocchio olc mé, ach toiscpandasNí féidir a láimhseáilpymssqlagus duillíní an ceann deireanachparams: Listcé gur mian leis i ndáiríretuple.
Chomh maith leis sin faoi deara go bhfuil an forbróirpymssqlchinn gan tacaíocht a thabhairt dó níos mó, agus tá sé in am chun bogadh amachpyodbc.
Feicfimid cad a líontar argóintí ár bhfeidhmeanna le Airflow:

Mura bhfuil aon sonraí ann, níl aon phointe ann leanúint ar aghaidh. Ach tá sé aisteach freisin an líonadh a mheas go rathúil. Ach ní botún é seo. A-ah-ah, cad atá le déanamh?! Agus seo cad:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException insíonn sé do Airflow nach bhfuil aon earráidí, ach ní dhéanaimid an tasc. Ní bheidh cearnach glas nó dearg ag an gcomhéadan, ach bándearg.
Caithfimid ár sonraí colúin iolracha:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])eadhon
- An bunachar sonraí ónar ghlacamar na horduithe,
- ID ár seisiún tuilte (beidh sé difriúil do gach tasc),
- A hash ón bhfoinse agus ID ordú - ionas go mbeidh sa bhunachar sonraí deiridh (ina bhfuil gach rud poured isteach tábla amháin) ní mór dúinn ID ordú uathúil.
Tá an chéim leathdhéanach fós: doirt gach rud isteach i Vertica. Agus, rud aisteach go leor, is é CSV ceann de na bealaí is iontach agus is éifeachtaí chun é seo a dhéanamh!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Táimid ag déanamh glacadóir speisialta
StringIO. pandascuirfidh cineálta árDataFramei bhfoirmCSV-línte.- A ligean ar oscailt nasc chuig ár Vertica is fearr leat le Hook.
- Agus anois le cabhair
copy()seol ár sonraí go díreach chuig Vertika!
Tógfaimid ón tiománaí cé mhéad líne a líonadh, agus inseoimid do bhainisteoir an tseisiúin go bhfuil gach rud ceart go leor:
session.loaded_rows = cursor.rowcount
session.successful = TrueSin é an méid.
Ar an díolachán, cruthaímid an spriocphláta de láimh. Anseo cheadaigh mé meaisín beag dom féin:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Tá mé ag úsáid
VerticaOperator()Cruthaím scéimre bunachar sonraí agus tábla (mura bhfuil siad ann cheana féin, ar ndóigh). Is é an rud is mó ná na spleáchais a shocrú i gceart:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadAchoimre
- Bhuel, - a dúirt an luch beag, - nach bhfuil, anois
An bhfuil tú cinnte gurb mise an t-ainmhí is uafásach san fhoraois?
Julia Donaldson, An Gruffalo
I mo thuairimse, dá mbeadh mo chomhghleacaithe agus mé comórtas: cé a chruthú agus a sheoladh go tapa próiseas ETL ó scratch: siad lena SSIS agus luch agus mé le Airflow ... Agus ansin ba mhaith linn a chur i gcomparáid freisin ar an éascaíocht cothabhála ... Wow, sílim go n-aontóidh tú go mbuafaidh mé iad ar gach taobh!
Más rud é beagán níos dáiríre, ansin rinne Apache Airflow - trí chur síos a dhéanamh ar phróisis i bhfoirm cód cláir - mo phost i bhfad níos compordaí agus níos taitneamhaí.
Tugann a shíneadh neamhtheoranta, i dtéarmaí breiseán agus réamhshuíomh le hinscálaitheacht, an deis duit Airflow a úsáid i mbeagnach aon réimse: fiú amháin sa timthriall iomlán de bhailiú, ullmhú agus próiseáil sonraí, fiú agus roicéad á seoladh (go Mars, de. cúrsa).
Cuid deiridh, tagairt agus faisnéis
An raic atá bailithe againn duit
start_date. Sea, is meme áitiúil é seo cheana féin. Trí phríomh-argóint Dougstart_datepas ar fad. Go hachomair, má shonraíonn tú istart_datedáta reatha, agusschedule_interval- lá amháin, ansin beidh DAG ag tosú amárach tráth nach luaithe.start_date = datetime(2020, 7, 7, 0, 1, 2)Agus gan fadhbanna níos mó.
Tá earráid ama rite eile bainteach leis:
Task is missing the start_date parameter, rud a léiríonn go minic go ndearna tú dearmad ceangal a dhéanamh leis an oibreoir dag.- Go léir ar mheaisín amháin. Sea, agus bunanna (Airflow féin agus ár sciath), agus freastalaí gréasáin, agus sceidealóir, agus oibrithe. Agus d'oibrigh sé fiú. Ach le himeacht ama, d'fhás líon na dtascanna do sheirbhísí, agus nuair a thosaigh PostgreSQL ag freagairt don innéacs i 20 s in ionad 5 ms, thógamar é agus thugamar ar shiúl é.
- Seiceadóir Áitiúil. Sea, táimid fós inár suí air, agus tá muid tagtha go dtí imeall an duibheagáin cheana féin. Is leor LocalExecutor dúinn go dtí seo, ach anois tá sé in am leathnú le hoibrí amháin ar a laghad, agus beidh orainn oibriú go crua chun bogadh go CeleryExecutor. Agus i bhfianaise gur féidir leat oibriú leis ar mheaisín amháin, ní chuireann aon rud bac ort Soilire a úsáid fiú ar fhreastalaí, rud “ar ndóigh, ní rachaidh sé isteach i dtáirgeadh, go hionraic!”
- Neamhúsáid uirlisí ionsuite:
- Naisc chun dintiúir seirbhíse a stóráil,
- SLA chailleann freagairt do thascanna nár oibrigh amach in am,
- xcom le haghaidh malartú meiteashonraí (a dúirt mé meiteadata!) idir thascanna dag.
- Mí-úsáid ríomhphoist. Bhuel, cad is féidir liom a rá? Socraíodh foláirimh le haghaidh gach athrá ar thascanna tite. Anois tá >90k r-phoist ag Gmail ó Airflow, agus diúltaíonn an muzzle ríomhphoist gréasáin níos mó ná 100 a phiocadh suas agus a scriosadh ag an am céanna.
Tuilleadh gaistí:
Tuilleadh uirlisí uathoibrithe
Ionas gur féidir linn oibriú níos mó fós lenár gcinn agus ní lenár lámha, d’ullmhaigh Airflow dúinn é seo:
- - tá stádas Turgnamhach fós aige, rud nach gcuireann cosc air oibriú. Leis, ní hamháin gur féidir leat eolas a fháil faoi dags agus faoi thascanna, ach freisin stop a chur / tús a chur le dag, Rith DAG nó linn snámha a chruthú.
- - tá go leor uirlisí ar fáil tríd an líne ordaithe nach bhfuil deacair iad a úsáid tríd an WebUI, ach atá as láthair go ginearálta. Mar shampla:
backfillag teastáil chun cásanna tasc a atosú.
Mar shampla, tháinig anailísithe agus dúirt: “Agus tá nonsense agatsa, a chomrádaí, sna sonraí ón 1 Eanáir go dtí an 13 Eanáir! Deisigh é, deisigh é, deisigh é, deisigh é!" Agus is hobad den sórt sin thú:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Bunseirbhís:
initdb,resetdb,upgradedb,checkdb. run, a ligeann duit tasc ásc amháin a rith, agus fiú scóráil ar gach spleáchas. Thairis sin, is féidir leat é a rith tríLocalExecutor, fiú má tá braisle Soilire agat.- An rud céanna go leor
test, ach freisin i boinn scríobhann rud ar bith. connectionsCeadaíonn cruthú mais naisc as an bhlaosc.
- - bealach idirghníomhaithe sách crua, atá beartaithe le haghaidh breiseán, agus gan a bheith ag snámh inti le lámha beaga. Ach cé atá chun stop a chur orainn dul go dtí
/home/airflow/dags, rithipythonagus tosú ag praiseach thart? Is féidir leat, mar shampla, gach nasc leis an gcód seo a leanas a onnmhairiú:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Ag nascadh le bunachar meiteashonraí Airflow. Ní mholaim scríobh chuige, ach d’fhéadfadh sé a bheith i bhfad níos tapúla agus níos éasca sonraí tasc a fháil le haghaidh méadrachtaí sonracha éagsúla ná trí aon cheann de na APInna.
Ligean le rá nach bhfuil gach ceann dár tascanna idempotent, ach is féidir leo titim uaireanta, agus tá sé seo gnáth. Ach tá cúpla blockchain amhrasach cheana féin, agus bheadh sé riachtanach a sheiceáil.
Seachain SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
tagairtí
Agus ar ndóigh, is iad na chéad deich nasc ó eisiúint Google ábhar an fhillteáin Airflow ó mo leabharmharcanna.
- - ar ndóigh, ní mór dúinn tosú leis an oifig. doiciméadú, ach cé a léann na treoracha?
- - Bhuel, ar a laghad a léamh na moltaí ó na creators.
- - an tús: an comhéadan úsáideora i bpictiúir
- - tá cur síos maith ar na bunchoincheapa, más rud é (go tobann!) nár thuig tú rud éigin uaim.
- - treoir ghairid chun braisle Aershreafa a bhunú.
- - beagnach an t-alt suimiúil céanna, ach amháin b'fhéidir níos foirmiúlachas, agus níos lú samplaí.
- — faoi oibriú i gcomhar le Soilire.
- - faoi neamhláithreacht na dtascanna, luchtú aitheantais in ionad dáta, claochlú, struchtúr comhaid agus rudaí suimiúla eile.
- - spleáchais na dtascanna agus an Riail Truicear, rud a luaigh mé ach amháin le linn a rith.
- - conas roinnt "oibreacha mar a bhí beartaithe" sa sceidealóir a shárú, sonraí a cailleadh a luchtú agus tascanna a chur in ord tosaíochta.
- — fiosruithe úsáideacha SQL ar mheiteashonraí Airflow.
- - tá alt úsáideach ann maidir le braiteoir saincheaptha a chruthú.
- — nóta gearr suimiúil faoi bhonneagar a thógáil ar AWS don Eolaíocht Sonraí.
- - botúin choitianta (nuair nach léann duine na treoracha fós).
- - aoibh gháire conas a crutch daoine pasfhocail a stóráil, cé gur féidir leat é a úsáid ach Connections.
- - cur ar aghaidh intuigthe DAG, caitheamh comhthéacs i bhfeidhmeanna, arís faoi spleáchais, agus freisin faoi thasc a sheoladh gan bacadh le.
- - faoin úsáid
default argumentsиparamsi dteimpléid, chomh maith le Athróga agus Naisc. - - scéal faoin gcaoi a bhfuil an pleanálaí ag ullmhú do Airflow 2.0.
- - alt beagán as dáta faoi imscaradh ár mbraisle i
docker-compose. - - tascanna dinimiciúla ag baint úsáide as teimpléid agus cur ar aghaidh comhthéacs.
- — fógraí caighdeánacha agus saincheaptha tríd an bpost agus Slack.
- - Tascanna brainse, macraí agus XCom.
Agus na naisc a úsáidtear san alt:
- - áitshealbhóirí ar fáil le húsáid i dteimpléid.
- — Botúin choitianta agus dags á gcruthú.
- -
docker-composele haghaidh turgnamh, dífhabhtaithe agus níos mó. - - Timfhilleadh Python le haghaidh Telegram REST API.
Foinse: will.com




