Hi, is mise Dmitry Logvinenko - Einnseanair DĂ ta aig Roinn Analytics buidheann chompanaidhean Vezet.
Innsidh mi dhut mu inneal mÏorbhaileach airson pròiseasan ETL a leasachadh - Apache Airflow. Ach tha Airflow cho ioma-chruthach agus ioma-thaobhach gum bu chòir dhut sÚil nas mionaidiche a thoirt air eadhon ged nach eil thu an sàs ann an sruthan dà ta, ach gu bheil feum agad air pròiseasan sam bith a chuir air bhog bho à m gu à m agus sÚil a chumail air an coileanadh.
Agus tha, chan e a-mhĂ in innsidh mi, ach cuideachd seallaidh mi: tha tòrr còd, seallaidhean-sgrĂŹn agus molaidhean aig aâ phrògram.

Na chĂŹ thu mar as trice nuair a nĂŹ thu google am facal Airflow / Wikimedia Commons
ClĂ r-innse
Ro-rĂ dh
Tha Apache Airflow dĂŹreach mar Django:
- sgrĂŹobhte ann am python
- tha pannal rianachd math ann,
- aâ leudachadh gun chrĂŹoch
- a-mhà in nas fheà rr, agus chaidh a dhèanamh airson adhbharan gu tur eadar-dhealaichte, is e sin (mar a tha e sgrÏobhte ron kat):
- aâ ruith agus aâ cumail sĂšil air gnĂŹomhan air Ă ireamh neo-chuingealaichte de dhâ innealan (mar a bheir mòran de Celery / Kubernetes agus do chogais dhut)
- le gineadh sruth-obrach fiÚghantach bho chòd Python a tha gu math furasta a sgrÏobhadh agus a thuigsinn
- agus an comas stòran-dĂ ta agus APIan sam bith a cheangal ri chèile aâ cleachdadh an dĂ chuid co-phĂ irtean deiseil agus plugins dèanta aig an taigh (a tha gu math sĂŹmplidh).
Bidh sinn aâ cleachdadh Apache Airflow mar seo:
- bidh sinn aâ cruinneachadh dĂ ta bho dhiofar thobraichean (mòran eisimpleirean SQL Server agus PostgreSQL, diofar APIan le metrics tagraidh, eadhon 1C) ann an DWH agus ODS (tha Vertica agus Clickhouse againn).
- cho adhartach
cron, a thòisicheas pròiseasan daingneachadh dà ta air an ODS, agus cuideachd a 'cumail sÚil air an cumail suas.
Gu ruige o chionn ghoirid, bha na feumalachdan againn air an còmhdach le aon fhrithealaiche beag le 32 cores agus 50 GB de RAM. Ann an Airflow, tha seo ag obair:
- йОНоо 200 dag (gu dearbh sruthan-obrach, anns an do lÏon sinn gnÏomhan),
- anns gach aon gu cuibheasach 70 obair,
- tha am maitheas seo a 'tòiseachadh (cuideachd gu cuibheasach) uair san uair.
Agus mu mar a leudaich sinn, sgrĂŹobhaidh mi gu h-ĂŹosal, ach a-nis mĂŹnichidh sinn an duilgheadas Ăźber a dhâ fhuasglas sinn:
Tha trĂŹ frithealaichean SQL tĂšsail ann, gach fear le stòran-dĂ ta 50 - eisimpleirean de aon phròiseact, fa leth, tha an aon structar aca (cha mhòr anns a h-uile Ă ite, mua-ha-ha), a tha aâ ciallachadh gu bheil clĂ r Ărdughan aig gach fear (gu fortanach, clĂ r le sin faodar ainm a phutadh a-steach do ghnĂŹomhachas sam bith). Bidh sinn aâ toirt an dĂ ta le bhith aâ cur raointean seirbheis ris (frithealaiche stòr, stòr-dĂ ta stòr, ID gnĂŹomh ETL) agus gan tilgeil a-steach gu naive, can, Vertica.
Leamamaid!
Am prÏomh phà irt, practaigeach (agus beagan teòiridheach)
Carson a tha sinne (agus thusa)
Nuair a bha na craobhan mòr agus bha mi sĂŹmplidh SQL-schik ann an aon reic Ruiseanach, rinn sinn sgam air pròiseasan ETL aka sruthan dĂ ta aâ cleachdadh dĂ inneal a bha rim faighinn dhuinn:
- Ionad cumhachd Informatica - siostam air leth sgaoileadh, air leth cinneasach, le bathar-cruaidh fhèin, dreach fhèin. Chleachd mi Dia aâ toirmeasg 1% de na comasan aige. Carson? Uill, an toiseach, chuir an eadar-aghaidh seo, am badeigin bho na 380n, cuideam inntinn oirnn. San dĂ rna h-Ă ite, tha an contraption seo air a dhealbhadh airson pròiseasan air leth sĂšbailte, ath-chleachdadh phĂ irtean feargach agus cleasan iomairt eile a tha fĂŹor chudromach. Mu dheidhinn gu bheil e aâ cosg, mar sgiath an Airbus AXNUMX / bliadhna, cha bhith sinn ag rĂ dh dad.
Thoir an aire, faodaidh dealbh-sgrĂŹn beagan a ghoirteachadh do dhaoine fo 30

- SQL Server Integration Server - chleachd sinn aâ chompanach seo nar sruthan taobh a-staigh aâ phròiseict. Uill, gu dearbh: tha sinn mu thrĂ th aâ cleachdadh SQL Server, agus bhiodh e dòigh air choireigin mĂŹ-reusanta gun na h-innealan ETL aca a chleachdadh. Tha a h-uile dad math: tha an dĂ chuid an eadar-aghaidh brèagha, agus tha an adhartas ag aithris ... Ach chan e seo as coireach gu bheil sinn dèidheil air bathar-bog, o, chan ann airson seo. Tionndadh e
dtsx(is e sin XML le nodan air an gluasad air sĂ bhaladh) is urrainn dhuinn, ach dè aâ phuing a thâ ann? Dè mu dheidhinn pasgan gnĂŹomh a dhèanamh a tharraingeas ceudan de chlĂ ran bho aon fhrithealaiche gu frithealaiche eile? Seadh, dè ceud, tuitidh do mheur-chlĂ r Ă fichead pĂŹos, aâ briogadh air putan na luchaige. Ach tha e gu cinnteach aâ coimhead nas fhasanta:
Bha sinn gu cinnteach aâ coimhead airson dòighean a-mach. CĂšis eadhon cha mhòr thĂ inig mi gu gineadair pacaid SSIS fèin-sgrĂŹobhte ...
âŚagus an uairsin lorg obair Ăšr mi. Agus thug Apache Airflow thairis mi air.
Nuair a fhuair mi a-mach gur e còd Python sĂŹmplidh a thâ ann an tuairisgeulan pròiseas ETL, cha do rinn mi dannsa airson toileachas. Seo mar a bha sruthan dĂ ta air an tionndadh agus air an eadar-dhealachadh, agus thĂ inig dòrtadh bĂšird le aon structar bho cheudan de stòran-dĂ ta gu aon targaid gu bhith na chĂšis de chòd Python ann an aon gu leth no dhĂ de scrionaichean 13 âł.
A 'cruinneachadh na buidhne
Nach cuir sinn air dòigh sgoil-Ă raich gu tur, agus na bi aâ bruidhinn mu dheidhinn rudan gu tur follaiseach an seo, leithid stĂ ladh Airflow, an stòr-dĂ ta a thagh thu, Soilleir agus cĂšisean eile a tha air am mĂŹneachadh anns na docaichean.
Gus an urrainn dhuinn deuchainnean a thòiseachadh sa bhad, rinn mi sgeidse docker-compose.yml anns a bheil:
- DĂŹreach togaidh sinn suas Airflow: ClĂ r-ama, frithealaiche lĂŹn. Bidh Flower cuideachd aâ snĂŹomh an sin gus sĂšil a chumail air gnĂŹomhan soilire (seach gu bheil e air a phutadh a-steach mu thrĂ th
apache/airflow:1.10.10-python3.7, ach chan eil dragh againn) - PostgreSQL, anns am bi Airflow aâ sgrĂŹobhadh am fiosrachadh seirbheis aige (dĂ ta clĂ raiche, staitistig cur an gnĂŹomh, msaa), agus comharraichidh Celery gnĂŹomhan crĂŹochnaichte;
- Redis, a bhios mar neach-malairt gnĂŹomhan airson Celery;
- Neach-obrach soilire, a bhios an sĂ s ann an coileanadh dĂŹreach gnĂŹomhan.
- Gu pasgan
./dagscuiridh sinn na faidhlichean againn leis an tuairisgeul air dags. Thèid an togail air an itealan, agus mar sin chan fheumar an stac gu lèir a cheangal às deidh gach sreothartaich.
Ann an cuid de dh'à iteachan, chan eil an còd anns na h-eisimpleirean air a shealltainn gu tur (gus nach cuir thu dragh air an teacsa), ach an à iteigin tha e air atharrachadh sa phròiseas. Gheibhear eisimpleirean de chòd obrachaidh iomlan anns an stòr .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNotaichean:
- Ann an co-chruinneachadh an sgrÏobhaidh, bha mi gu mòr an urra ris an Ïomhaigh ainmeil - bi cinnteach gun toir thu sÚil air. Is dòcha nach eil feum agad air dad sam bith eile nad bheatha.
- Tha a h-uile suidheachadh Airflow ri fhaighinn chan ann a-mhĂ in troimhe
airflow.cfg, ach cuideachd tro chaochladairean Ă rainneachd (taing don luchd-leasachaidh), a ghabh mi gu mĂŹ-fhortanach brath air. - Gu nĂ darra, chan eil e deiseil airson cinneasachadh: cha do chuir mi buillean cridhe air soithichean a dhâaona ghnothach, cha do chuir mi dragh air tèarainteachd. Ach rinn mi an ĂŹre as lugha a bha iomchaidh airson ar luchd-deuchainn.
- Thoir fa-near:
- Feumaidh am pasgan dag a bhith ruigsinneach don chlĂ r-ama agus don luchd-obrach.
- Tha an aon rud a 'buntainn ris a h-uile treas-phĂ rtaidh leabharlannan - feumaidh iad uile a bhith air a stĂ ladh air innealan le clĂ r-ama agus luchd-obrach.
Uill, a-nis tha e sĂŹmplidh:
$ docker-compose up --scale worker=3Ăs deidh a h-uile cĂ il èirigh, faodaidh tu coimhead air an eadar-aghaidh lĂŹn:
- Sruth adhair:
- FlĂšr:
Bun-bheachdan bunaiteach
Mura do thuig thu dad anns na âdagaicheanâ sin uile, seo faclair goirid:
- Prògramadair - an uncail as cudromaiche ann an Airflow, a bhios a 'riaghladh gu bheil innealan-fuadain ag obair gu cruaidh, agus chan e duine: a' cumail sÚil air a 'chlà r-ama, ag Úrachadh bhiodagan, a' cur air bhog gnÏomhan.
San fharsaingeachd, ann an dreachan nas sine, bha duilgheadasan aige le cuimhne (chan e, chan e amnesia, ach aoidion) agus dhâ fhan am paramadair dĂŹleab eadhon anns na configs
run_duration- an eadar-ama ath-thòiseachadh aige. Ach a-nis tha a h-uile dad gu math. - DAG (aka âdagâ) - âgraf acyclic stiĂširichteâ, ach innsidh mĂŹneachadh mar seo glè bheag de dhaoine, ach gu dearbh tha e na ghobhar airson gnĂŹomhan a tha ag eadar-obrachadh le chèile (faic gu h-ĂŹosal) no analogue de phasgan ann an SSIS agus Sruth-obrach ann an Informatica .
A bharrachd air dagaichean, 's dòcha gum bi fo-bhuidhnean ann fhathast, ach is coltaiche nach fhaigh sinn thuca.
- DAG Ruith - daga tòiseachaidh, a tha air a shònrachadh fhèin
execution_date. Faodaidh dagrans den aon bhiod obrachadh aig an aon à m (ma tha thu air do ghnÏomhan a dhèanamh neo-chomasach, gu dearbh). - Operator nam pÏosan còd le uallach airson gnÏomh sònraichte a choileanadh. Tha trÏ seòrsaichean de ghnÏomhaichean ann:
- gnĂŹomhamar am fear as fheĂ rr leinn
PythonOperator, as urrainn còd Python (dligheach) sam bith a chuir an gnĂŹomh; - gluasad, a bhios aâ giĂšlan dĂ ta bho Ă ite gu Ă ite, can,
MsSqlToHiveTransfer; - mothachadh air an lĂ imh eile, leigidh e leat freagairt no slaodadh sĂŹos air coileanadh aâ bhiod a bharrachd gus an tachair tachartas.
HttpSensoris urrainn dhaibh an ceann-uidhe ainmichte a tharraing, agus nuair a tha am freagairt a tha thu ag iarraidh aâ feitheamh, tòisich air aâ ghluasadGoogleCloudStorageToS3Operator. Bidh inntinn fiosrachail aâ faighneachd: âcarson? Ăs deidh na h-uile, faodaidh tu ath-aithris a dhèanamh ceart anns a âghnĂŹomhaiche!â Agus an uairsin, gus nach cuir thu bacadh air an raon de ghnĂŹomhan le gnĂŹomhaichean crochte. Bidh an sensor aâ tòiseachadh, aâ sgrĂšdadh agus aâ bĂ sachadh ron ath oidhirp.
- gnĂŹomhamar am fear as fheĂ rr leinn
- GnĂŹomh - tha luchd-obrachaidh dearbhte, ge bith dè an seòrsa, agus a tha ceangailte ris aâ bhiod air an Ă rdachadh gu ĂŹre na h-obrach.
- eisimpleir de ghnĂŹomhan - nuair a cho-dhĂšin an dealbhaiche coitcheann gu robh an t-Ă m ann gnĂŹomhan a chuir gu cath air luchd-ciĂšil (dĂŹreach san spot, ma chleachdas sinn
LocalExecutorno gu nĂłd iomallach ann an cĂšisCeleryExecutor), bidh e aâ sònrachadh co-theacs dhaibh (ie, seata de chaochladairean - paramadairean buileachaidh), aâ leudachadh teamplaidean Ă ithne no ceist, agus gan cruinneachadh.
Bidh sinn aâ cruthachadh ghnĂŹomhan
An toiseach, bheir sinn cunntas air sgeama coitcheann ar doug, agus an uairsin bidh sinn a âdĂ ibheadh ââââa-steach don fhiosrachadh barrachd is barrachd, oir bidh sinn aâ cleachdadh cuid de fhuasglaidhean nach eil cho beag.
Mar sin, anns an fhoirm as sĂŹmplidhe, seallaidh a leithid de bhiodag mar seo:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Nach dèan sinn a-mach e:
- An toiseach, bidh sinn aâ toirt a-steach na libs riatanach agus rud eile;
sql_server_dsA bheilList[namedtuple[str, str]]le ainmean nan ceanglaichean bho Airflow Connections agus na stòran-dĂ ta Ă s an toir sinn ar truinnsear;dagâ foillseachadh ar biodag, a dh' fheumas a bhi a stighglobals(), air neo cha lorgar Airflow e. Feumaidh Doug cuideachd a rĂ dh:- ciod is ainm dha
orders- nochdaidh an t-ainm seo an uairsin anns an eadar-aghaidh lĂŹn, - gun oibrich e o mheadhon oidhche air an ochdamh la de'n Iuchar,
- agus bu chòir dha ruith, timcheall air a h-uile 6 uairean (airson balaich duilich an seo an à ite
timedelta()ceadaichtecron- loidhne0 0 0/6 ? * * *, airson an fheadhainn nach eil cho fionnar - abairt mar@daily);
- ciod is ainm dha
workflow()nĂŹ am prĂŹomh obair, ach chan ann a-nis. Airson a-nis, bidh sinn dĂŹreach aâ tilgeil ar co-theacsa dhan loga.- Agus a-nis an draoidheachd shĂŹmplidh airson gnĂŹomhan a chruthachadh:
- tha sinn a' ruith troimh ar tobraichean ;
- toiseachadh
PythonOperator, a chuireas an gnĂŹomh ar dummyworkflow(). Na dĂŹ-chuimhnich ainm sònraichte (taobh a-staigh an daga) den obair a shònrachadh agus an daga fhèin a cheangal. Bratachprovide_contextan uair sin, dòirtidh sinn argamaidean a bharrachd a-steach don ghnĂŹomh, a chruinnicheas sinn gu faiceallach aâ cleachdadh**context.
Airson a-nis, tha sin uile. Na fhuair sinn:
- dag Ăšr san eadar-aghaidh lĂŹn,
- ceud gu leth gnÏomh a thèid a chuir gu bàs aig an aon à m (ma cheadaicheas na roghainnean Airflow, Celery agus comas frithealaiche).
Uill, cha mhòr nach dâ fhuair.

Cò a stà laicheas na h-eisimeileachd?
Gus an rud gu lèir seo a dhèanamh nas sÏmplidhe, chuir mi a-steach e docker-compose.yml giollachd requirements.txt air na h-uile nithibh.
A-nis tha e air falbh:

Tha ceĂ rnagan glasa nan eisimpleirean gnĂŹomh air an giullachd leis aâ chlĂ r-ama.
Bidh sinn a 'feitheamh beagan, bidh an luchd-obrach a' toirt seachad na gnĂŹomhan:

Tha an fheadhainn uaine, gu dearbh, air an obair a chrĂŹochnachadh gu soirbheachail. Chan eil dearg gu math soirbheachail.
Co-dhiĂš, chan eil pasgan sam bith air ar prod
./dags, chan eil sioncronadh eadar innealan - tha na dagaichean uile nan laighe a-steachgitair ar Gitlab, agus bidh Gitlab CI aâ sgaoileadh Ăšrachaidhean gu innealan nuair a thig iad còmhlamaster.
Beagan mu dheidhinn Flower
Fhad âs a tha an luchd-obrach aâ bualadh air na pacifiers againn, cuimhnichidh sinn inneal eile a sheallas rudeigin dhuinn - Flower.
Aâ chiad duilleag le fiosrachadh geĂ rr-chunntas air nodan luchd-obrach:

An duilleag as dian le gnĂŹomhan a chaidh gu obair:

An duilleag as dorra le inbhe ar broker:

Tha an duilleag as soilleire le grafaichean inbhe gnĂŹomh agus an Ăšine cur gu bĂ s:

Bidh sinn a 'luchdachadh an fheadhainn nach eil air an luchdachadh
Mar sin, gu bheil na gnÏomhan gu lèir air obrachadh a-mach, faodaidh tu an fheadhainn leòinte a thoirt air falbh.

Agus bha mòran leònte - airson adhbhar air choireigin. A thaobh cleachdadh ceart de Airflow, tha na fĂŹor cheĂ rnagan sin aâ nochdadh nach do rĂ inig an dĂ ta gu cinnteach.
Feumaidh tu coimhead air an loga agus ath-thòiseachadh na h-eisimpleirean gnÏomh a tha air tuiteam.
Le bhith aâ cliogadh air ceĂ rnag sam bith, chĂŹ sinn na gnĂŹomhan a tha rim faighinn dhuinn:

Faodaidh tu an fheadhainn a thuit a ghabhail agus a dhèanamh Glan. Is e sin, bidh sinn aâ dĂŹochuimhneachadh gu bheil rudeigin air fĂ iligeadh an sin, agus thèid an aon ghnĂŹomh eisimpleir chun chlĂ r-ama.

Tha e soilleir nach eil e gu math daonnach a bhith aâ dèanamh seo leis an luchag leis na ceĂ rnagan dearga gu lèir - chan e seo a tha sinn aâ dĂšileachadh bho Airflow. Gu nĂ darra, tha armachd lèir-sgrios againn: Browse/Task Instances

Taghamaid a h-uile cĂ il aig an aon Ă m agus ath-shuidhich sinn gu neoni, cliog air an rud cheart:

Ăs deidh glanadh, tha na tacsaidhean againn a âcoimhead mar seo (tha iad mu thrĂ th aâ feitheamh ris an neach-clĂ raidh an clĂ radh):

Ceanglaichean, dubhan agus caochladairean eile
Tha an t-Ă m ann coimhead air an ath DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ĐĐžŃпОда Ń
ĐžŃĐžŃио, ĐžŃŃĐľŃŃ ĐžĐąĐ˝ĐžĐ˛ĐťĐľĐ˝Ń"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ĐаŃаŃ, ĐżŃĐžŃŃпаКŃŃ, ĐźŃ {{ dag.dag_id }} ŃŃОниНи
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]A bheil a h-uile duine a-riamh air Úrachadh aithisg a dhèanamh? Seo i a-rithist: tha liosta de thobraichean bho far am faighear an dà ta; tha liosta ann far an cuir thu; na dÏ-chuimhnich urram a thoirt nuair a thachair no a bhris a h-uile cà il (uill, chan eil seo mu ar deidhinn, chan eil).
Rachamaid tron ââââfhaidhle a-rithist agus coimhead air na rudan Ăšra doilleir:
from commons.operators import TelegramBotSendMessage- chan eil dad aâ cur stad oirnn bho bhith aâ dèanamh ar gnĂŹomhaichean fhèin, rud a ghabh sinn brath air le bhith aâ dèanamh pasgan beag airson teachdaireachdan a chuir gu Unblocked. (Bruidhnidh sinn barrachd mun ghnĂŹomhaiche seo gu h-ĂŹosal);default_args={}- faodaidh dag na h-aon argamaidean a sgaoileadh air a h-uile gnĂŹomhaiche;to='{{ var.value.all_the_kings_men }}'- achadhtocha bhith sinn air còd cruaidh, ach air a ghineadh gu dinamach aâ cleachdadh Jinja agus caochladair le liosta de phuist-d, a chuir mi a-steach gu faiceallachAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- suidheachadh airson an gnĂŹomhaiche a thòiseachadh. Anns a 'chĂšis againn, cha tèid an litir gu na ceannardan a-mhĂ in ma tha a h-uile eisimeileachd air obrachadh a-mach gu soirbheachail;tg_bot_conn_id='tg_main'- argamaideanconn_idgabh ri IDan ceangail a chruthaicheas sinn annAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- cha tèid teachdaireachdan ann an Telegram air falbh ach ma tha gnĂŹomhan air tuiteam;task_concurrency=1- bidh sinn aâ toirmeasg grunn eisimpleirean gnĂŹomh de aon ghnĂŹomh a chuir air bhog aig an aon Ă m. Rud eile, gheibh sinn grunn chuir air bhog aig an aon Ă mVerticaOperator(a 'coimhead air aon bhòrd);report_update >> [email, tg]- uileVerticaOperatortighinn còmhla ann a bhith aâ cur litrichean is teachdaireachdan, mar seo:

Ach leis gu bheil suidheachaidhean tòiseachaidh eadar-dhealaichte aig gnĂŹomhaichean fios, chan obraich ach aon. Ann an Tree View, tha a h-uile dad aâ coimhead beagan nas lugha de lèirsinn:

Canaidh mi beagan fhaclan mu dheidhinn macros agus an caraidean - caochladairean.
Tha Macros nan luchd-Ă ite Jinja a dhâ fhaodas diofar fhiosrachadh feumail a chuir an Ă ite argamaidean gnĂŹomhaiche. Mar eisimpleir, mar seo:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} leudaichidh e gu susbaint an caochladair co-theacsa execution_date anns a âchruth YYYY-MM-DD: 2020-07-14. Is e am pĂ irt as fheĂ rr gu bheil caochladairean co-theacsa air an ceangal ri eisimpleir gnĂŹomh sònraichte (ceĂ rnag ann an Tree View), agus nuair a thèid ath-thòiseachadh, leudaichidh an luchd-Ă ite gu na h-aon luachan.
Faodar na luachan ainmichte fhaicinn aâ cleachdadh aâ phutan Rendered air gach eisimpleir gnĂŹomh. Seo mar a tha an obair le bhith aâ cur litir:

Agus mar sin aig an obair le bhith aâ cur teachdaireachd:

Tha liosta iomlan de macros togte airson an dreach as Ăšire a tha ri fhaighinn ri fhaighinn an seo:
A bharrachd air an sin, le cuideachadh bho plugins, is urrainn dhuinn na macros againn fhèin ainmeachadh, ach sin sgeulachd eile.
A bharrachd air na rudan ro-mhÏnichte, is urrainn dhuinn luachan ar caochladairean a chuir an à ite (chleachd mi seo sa chòd gu h-à rd mu thrà th). Cruthaichidh sinn a-steach Admin/Variables rud no dhà :

A h-uile rud as urrainn dhut a chleachdadh:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Faodaidh an luach a bhith na sgalar, no faodaidh e a bhith JSON cuideachd. A thaobh JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}dĂŹreach cleachd an t-slighe chun an iuchair a tha thu ag iarraidh: {{ var.json.bot_config.bot.token }}.
Canaidh mi aon fhacal gu litireil agus seallaidh mi aon dealbh-sgrĂŹn mu dheidhinn ceanglaichean. Tha a h-uile dad bunaiteach an seo: air an duilleag Admin/Connections bidh sinn aâ cruthachadh ceangal, aâ cur ar logins / faclan-faire agus paramadairean nas sònraichte an sin. Mar seo:

Faodar faclan-faire a chrioptachadh (nas mionaidiche na an à bhaist), no faodaidh tu an seòrsa ceangail fhà gail a-mach (mar a rinn mi airson tg_main). ainm.
Faodaidh tu cuideachd grunn cheanglaichean a dhèanamh leis an aon ainm: sa chÚis seo, am modh BaseHook.get_connection(), a gheibh ceanglaichean dhuinn le ainm, bheir air thuaiream bho ghrunn ainmean (bhiodh e na bu loidsigeach Round Robin a dhèanamh, ach fà gamaid e air cogais luchd-leasachaidh Airflow).
Tha caochlaidhean agus Ceanglaichean gu cinnteach nan innealan fionnar, ach tha e cudromach gun a bhith a 'call an cothromachadh: dè na pĂ irtean de na sruthan agad a bhios tu a' stòradh sa chòd fhèin, agus dè na pĂ irtean a bheir thu dha Airflow airson a stòradh. Air an aon lĂ imh, faodaidh e a bhith goireasach an luach atharrachadh gu sgiobalta, mar eisimpleir, bogsa puist, tron ââââUI. Air an lĂ imh eile, tha seo fhathast na thilleadh gu cliog na luchaige, Ă s an robh sinn (mi) airson faighinn cuidhteas.
Is e obair le ceanglaichean aon de na gnÏomhan dubhan. San fharsaingeachd, tha dubhan Airflow nam puingean airson a cheangal ri seirbheisean treas-phà rtaidh agus leabharlannan. m.e., JiraHook fosglaidh neach-dèiligidh dhuinn gus eadar-obrachadh le Jira (faodaidh tu gnÏomhan a ghluasad air ais is air adhart), agus le cuideachadh bho SambaHook faodaidh tu faidhle ionadail a phutadh gu smb-puing.
A 'parsadh a' ghnĂŹomhaiche Ă bhaisteach
Agus thà inig sinn faisg air coimhead air mar a tha e air a dhèanamh TelegramBotSendMessage
còd a ' commons/operators.py leis an fhÏor ghnÏomhaiche:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)An seo, mar a h-uile cĂ il eile ann an Airflow, tha a h-uile dad gu math sĂŹmplidh:
- Air a shealbhachadh bho
BaseOperator, a bhios aâ buileachadh grunn rudan a tha sònraichte do Airflow (seall air do chur-seachad) - Raointean ainmichte
template_fields, anns am bi Jinja aâ coimhead airson macros airson a phròiseasadh. - Chuir e air dòigh na h-argamaidean ceart airson
__init__(), suidhich na roghainnean bunaiteach far a bheil sin riatanach. - Cha do dhĂŹochuimhnich sinn mu thĂšs an sinnsear nas motha.
- Dhâ fhosgail an dubhan co-fhreagarrach
TelegramBotHookfhuair e nÏ neach-dèiligidh bhuaithe. - Modh ath-mhÏnichte (ath-mhÏnichte).
BaseOperator.execute(), a bhios Airfow aâ tionndadh nuair a thig an t-Ă m airson aâ ghnĂŹomhaiche a chuir air bhog - annta cuiridh sinn am prĂŹomh ghnĂŹomh an gnĂŹomh, aâ dĂŹochuimhneachadh logadh a-steach. (Bidh sinn aâ logadh a-steach, co-dhiĂš, dĂŹreach a-steachstdoutиstderr- Bidh sruth-adhair aâ toirt a-steach a h-uile cĂ il, ga phasgadh gu breagha, ga lobhadh far a bheil sin riatanach.)
ChĂŹ sinn na thâ againn commons/hooks.py. Aâ chiad phĂ irt den fhaidhle, leis an dubhan fhèin:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientChan eil fios agam eadhon dè a mhÏnicheas mi an seo, bheir mi fa-near na puingean cudromach:
- Bidh sinn a 'sealbhachadh, a' smaoineachadh mu na h-argamaidean - sa mhòr-chuid de chÚisean bidh e mar aon:
conn_id; - Aâ dol thairis air modhan Ă bhaisteach: chuir mi bacadh orm fhĂŹn
get_conn(), anns am faigh mi na paramadairean ceangail a rèir ainm agus dĂŹreach faigh an earrannextra(is e seo raon JSON), anns an do chuir mi (a rèir an stiĂširidh agam fhĂŹn!) an tòcan bot Telegram:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Bidh mi aâ cruthachadh eisimpleir de ar
TelegramBot, a 'toirt comharradh sònraichte dha.
Sin e. Gheibh thu neach-dèiligidh bho dubhan a 'cleachdadh TelegramBotHook().clent no TelegramBotHook().get_conn().
Agus an dĂ rna pĂ irt den fhaidhle, anns am bi mi aâ dèanamh microwrapper airson an Telegram REST API, gus nach slaod mi an aon rud airson aon dòigh sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Is e an dòigh cheart a h-uile cà il a chur ris:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- anns aâ plugan, cuir a-steach stòr poblach, agus thoir gu Open Source e.
Fhad âs a bha sinn aâ sgrĂšdadh seo gu lèir, chaidh aig na h-Ăšrachaidhean aithisg againn air fĂ iligeadh gu soirbheachail agus chuir iad teachdaireachd mearachd thugam san t-sianal. Tha mi aâ dol a choimhead feuch a bheil e ceĂ rr...

Bhris rudeigin nar cĂš! Nach e sin a bha sinn an dĂšil? DĂŹreach!
A bheil thu aâ dol a dhòrtadh?
A bheil thu aâ faireachdainn gun do chaill mi rudeigin? Tha e coltach gun do gheall e dĂ ta a ghluasad bho SQL Server gu Vertica, agus an uairsin ghabh e e agus ghluais e far a 'chuspair, an scoundrel!
Bha an uamhas seo a dhâaona ghnothach, cha robh agam ach beagan briathrachais a mhĂŹneachadh dhut. A-nis faodaidh tu a dhol nas fhaide.
Bâ e seo ar plana:
- Dèan daga
- Cruthaich gnĂŹomhan
- Faic cho breagha sa tha a h-uile dad
- Sònraich à ireamhan seisean ri lÏonadh
- Faigh dĂ ta bho SQL Server
- Cuir dĂ ta ann an Vertica
- Cruinnich staitistig
Mar sin, airson seo a thoirt gu buil, chuir mi beagan ris an fheadhainn againn docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyAn sin tha sinn a 'togail:
- Vertica mar aoigh
dwhleis na roghainnean as bunaitiche, - trĂŹ eisimpleirean de SQL Server,
- bidh sinn aâ lĂŹonadh na stòran-dĂ ta anns an fhear mu dheireadh le beagan dĂ ta (gun fhios nach coimhead thu a-steach
mssql_init.py!)
Bidh sinn aâ cur air bhog a h-uile rud math le cuideachadh bho Ă ithne beagan nas iom-fhillte na an turas mu dheireadh:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Na chruthaich an randomizer mĂŹorbhuileach againn, faodaidh tu an rud a chleachdadh Data Profiling/Ad Hoc Query:

Is e am prĂŹomh rud gun a bhith ga shealltainn do luchd-anailis
mion-sgrĂšdadh air Seiseanan ETL Cha dèan, tha a h-uile dad beag an sin: bidh sinn aâ dèanamh bunait, tha soidhne ann, bidh sinn aâ cuairteachadh a h-uile cĂ il le manaidsear co-theacsa, agus a-nis bidh sinn aâ dèanamh seo:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15seisean.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passTha an t-à m air tighinn cruinnich ar dà ta o'n ceud gu leth clà r. Feuch an dèan sinn seo le cuideachadh bho loidhnichean fÏor mhÏ-mhisneachail:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Le cuideachadh bho dubhan gheibh sinn bho Airflow
pymssql- ceangal - Leig leinn cuingealachadh ann an cruth ceann-latha a chuir a-steach don iarrtas - thèid a thilgeil a-steach don ghnÏomh leis an einnsean teamplaid.
- A 'biathadh ar n-iarrtas
pandascò gheibh sinnDataFrame- bidh e feumail dhuinn san à m ri teachd.
Tha mi aâ cleachdadh ionadachadh
{dt}an Ă ite paramadair iarrtas%schan ann a chionn 's gur e Pinocchio olc a th' annam, ach air sgĂ thpandaschan urrainn a lĂ imhseachadhpymssqlagus sleamhnaich am fear mu dheireadhparams: Listged a tha e dha-rĂŹribh ag iarraidhtuple.
Thoir fa-near cuideachd gu bheil an leasaichepymssqlcho-dhĂšin e gun a bhith aâ toirt taic dha tuilleadh, agus tha an t-Ă m ann gluasad a-machpyodbc.
ChÏ sinn dè a lÏon Airflow argamaidean ar gnÏomhan le:

Mura h-eil dà ta ann, chan eil adhbhar ann leantainn air adhart. Ach tha e neònach cuideachd beachdachadh air an lÏonadh soirbheachail. Ach chan e mearachd a tha seo. A-ah-ah, dè a nÏ thu?! Agus seo na tha:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException ag innse do Airflow nach eil mearachdan ann, ach gun leum sinn air aâ ghnĂŹomh. Cha bhi ceĂ rnag uaine no dearg air an eadar-aghaidh, ach pinc.
Tilgeamaid an dĂ ta againn ioma colbhan:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Is e sin:
- An stòr-dà ta às an do ghabh sinn na h-òrdughan,
- ID den t-seisean tuiltean againn (bidh e eadar-dhealaichte airson gach obair),
- Hash bhon stòr agus ID òrduigh - gus am bi ID òrdugh sònraichte againn anns an stòr-dà ta mu dheireadh (far a bheil a h-uile cà il air a dhòrtadh ann an aon bhòrd).
Tha an dà rna ceum fhathast: dòrtadh a h-uile cà il a-steach gu Vertica. Agus, gu neònach gu leòr, is e CSV aon de na dòighean as iongantaiche agus as èifeachdaiche seo a dhèanamh!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Tha sinn aâ dèanamh inneal-glacaidh sònraichte
StringIO. pandascuiridh sinn gu caoimhneilDataFrameanns an riochdCSV-loidhnichean.- Fosglaidh sinn ceangal ris an Vertica as fheĂ rr leinn le dubhan.
- Agus a-nis le cuideachadh
copy()cuir an dĂ ta againn gu dĂŹreach gu Vertika!
Bheir sinn bhon draibhear cia mheud loidhne a chaidh a lÏonadh, agus innis do mhanaidsear an t-seisein gu bheil a h-uile dad ceart gu leòr:
session.loaded_rows = cursor.rowcount
session.successful = TrueSin e.
Air an reic, bidh sinn a 'cruthachadh a' phlĂ ta targaid le lĂ imh. An seo leig mi leam inneal beag:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Tha mi a 'cleachdadh
VerticaOperator()Bidh mi aâ cruthachadh sgeama stòr-dĂ ta agus clĂ r (mura h-eil iad ann mar-thĂ , gu dearbh). Is e am prĂŹomh rud na eisimeileachd a chuir air dòigh gu ceart:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadA 'togail suas
â Uill, â ars' an luch bheag, â nach 'eil, a nis
A bheil thu cinnteach gur mise am beathach as uamhasach sa choille?
Julia Donaldson, An Gruffalo
Tha mi aâ smaoineachadh nam biodh farpais aig mo cho-obraichean agus mise: cò a chruthaicheas agus a chuireas air bhog pròiseas ETL bhon fhĂŹor thoiseach: iadsan leis an SSIS agus an luchag agus mise le Airflow ... Agus an uairsin bhiodh sinn cuideachd aâ dèanamh coimeas eadar cho furasta agus a tha e cumail suas ... Wow, tha mi aâ smaoineachadh gun aontaich thu gun dèan mi aâ chĂšis orra air a h-uile taobh!
Ma tha e beagan nas cudromaiche, rinn Apache Airflow - le bhith aâ toirt cunntas air pròiseasan ann an cruth còd prògram - an obair agam mòran nas comhfhurtail agus nas tlachdmhoire.
Tha an leudachadh gun chrĂŹoch, an dĂ chuid a thaobh plug-ins agus ro-shealladh air scalability, aâ toirt cothrom dhut sruth-adhair a chleachdadh ann an cha mhòr raon sam bith: eadhon anns a âchearcall iomlan de bhith aâ tional, ag ullachadh agus a âgiullachd dĂ ta, eadhon ann a bhith aâ cur air bhog rocaidean (gu Mars, de cĂšrsa).
PĂ irt dheireannach, iomradh agus fiosrachadh
An rĂ can a chruinnich sinn dhut
start_date. Tha, is e meme ionadail a tha seo mu thrà th. Tro phrÏomh argamaid Dougstart_dateuile seachad. Ann an Úine ghoirid, ma shònraicheas tu a-steachstart_dateceann-latha là ithreach, agusschedule_interval- aon latha, an uairsin tòisichidh DAG a-mà ireach gun a bhith nas trà ithe.start_date = datetime(2020, 7, 7, 0, 1, 2)Agus chan eil barrachd dhuilgheadasan ann.
Tha mearachd runtime eile co-cheangailte ris:
Task is missing the start_date parameter, a tha mar as trice a 'nochdadh gun do dhÏochuimhnich thu ceangal a dhèanamh ris a' ghnÏomhaiche biodag.- A h-uile còmhla ris air aon inneal. Tha, agus bunaitean (Airflow fhèin agus an còmhdach againn), agus frithealaiche lÏn, agus clà r-ama, agus luchd-obrach. Agus dh'obraich e eadhon. Ach thar Úine, dh'fhàs an à ireamh de ghnÏomhan airson seirbheisean, agus nuair a thòisich PostgreSQL a 'freagairt ris a' chlà r-amais ann an 20 s an à ite 5 ms, thug sinn air falbh e agus thug sinn air falbh e.
- GnĂŹomhaiche Ionadail. Tha, tha sinn fhathast nar suidhe air, agus tha sinn mu thrĂ th air tighinn gu oir an dubh-aigein. Tha LocalExecutor air a bhith gu leòr dhuinn gu ruige seo, ach a-nis tha an t-Ă m ann leudachadh le co-dhiĂš aon neach-obrach, agus feumaidh sinn a bhith ag obair gu cruaidh gus gluasad gu CeleryExecutor. Agus leis gun urrainn dhut obrachadh leis air aon inneal, chan eil dad aâ cur stad ort bho bhith aâ cleachdadh Celery eadhon air frithealaiche, nach bi âgu dearbh, aâ dol a-steach gu cinneasachadh, gu h-onarach! â
- Neo-chleachdadh innealan togte:
- Connections gus teisteanasan seirbheis a stòradh,
- SLA ag ionndrainn freagairt a thoirt do ghnĂŹomhan nach do dhâobraich ann an tĂŹde,
- xcom airson iomlaid meata-dĂ ta (thuirt mi metadĂ ta!) eadar gnĂŹomhan dag.
- MĂŹ-ghnĂ thachadh puist. Uill, dè as urrainn dhomh a rĂ dh? Chaidh rabhaidhean a chuir air dòigh airson gach ath-aithris de ghnĂŹomhan a thuit. A-nis tha an obair agam aig Gmail > 90k puist-d bho Airflow, agus tha am post post-lĂŹn aâ diĂšltadh barrachd air 100 a thogail agus a dhubhadh Ă s aig an aon Ă m.
Barrachd dhuilgheadasan:
Barrachd innealan fèin-ghluasaid
Gus an obraich sinn eadhon nas motha le ar cinn agus chan ann le ar lĂ mhan, tha Airflow air seo ullachadh dhuinn:
- - tha inbhe Experimental aige fhathast, nach eil a 'cur bacadh air bho bhith ag obair. Leis, chan e a-mhà in gum faigh thu fiosrachadh mu bhiodagan agus gnÏomhan, ach cuideachd stad / tòisich daga, cruthaich DAG Run no amar.
- - tha mòran innealan rim faighinn tron ââââloidhne-Ă ithne nach eil dĂŹreach mĂŹ-ghoireasach a chleachdadh tron ââââWebUI, ach a tha sa chumantas neo-lĂ thaireach. Mar eisimpleir:
backfilla dhÏth gus eisimpleirean de ghnÏomhan ath-thòiseachadh.
Mar eisimpleir, thĂ inig luchd-anailis agus thuirt iad: âAgus thusa, a chompanach, tha neòinean anns an dĂ ta bho 1 Faoilleach gu 13! Ceartaich e, socraich e, socraich e, socraich e!â Agus tha thu nad hob mar sin:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Seirbheis bunaiteach:
initdb,resetdb,upgradedb,checkdb. run, a leigeas leat aon ghnÏomh eisimpleir a ruith, agus eadhon sgòr a dhèanamh air a h-uile eisimeileachd. A bharrachd air an sin, faodaidh tu a ruith troLocalExecutor, fiÚ 's ma tha brabhsair Celery agad.- A 'dèanamh gu Ïre mhòr an aon rud
test, a-mhĂ in cuideachd anns na bunaitean chan eil e aâ sgrĂŹobhadh dad. connectionsa 'ceadachadh ceanglaichean a chruthachadh bhon t-slige.
- - dòigh caran cruaidh air eadar-obrachadh, a thathar an dĂšil airson plugins, agus gun a bhith aâ snĂ mh ann le lĂ mhan beaga. Ach cò a tha gus stad a chuir oirnn bho bhith aâ dol
/home/airflow/dagsruithipythonagus tòisich aâ dol mun cuairt? Faodaidh tu, mar eisimpleir, Ă s-mhalairt a h-uile ceangal leis aâ chòd a leanas:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Aâ ceangal ri stòr-dĂ ta meata-dĂ ta Airflow. Chan eil mi aâ moladh sgrĂŹobhadh thuige, ach faodaidh a bhith aâ faighinn stĂ itean gnĂŹomh airson grunn mheatairean sònraichte a bhith fada nas luaithe agus nas fhasa na tro aon de na APIan.
Canaidh sinn nach eil a h-uile gnĂŹomh againn neo-chomasach, ach uaireannan faodaidh iad tuiteam, agus tha seo Ă bhaisteach. Ach tha cuid de bhacaidhean mar-thĂ amharasach, agus bhiodh feum air sgrĂšdadh.
Thoir an aire SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
iomraidhean
Agus gu dearbh, is e aâ chiad deich ceanglaichean bho sgaoileadh Google na tha anns aâ phasgan Airflow bho na comharran-leabhair agam.
- - gu dearbh, feumaidh sinn tòiseachadh leis an oifis. sgrÏobhainnean, ach cò a leughas an stiÚireadh?
- - Uill, co-dhiĂš leugh na molaidhean bhon luchd-cruthachaidh.
- - an fhĂŹor thoiseach: an eadar-aghaidh cleachdaiche ann an dealbhan
- - tha na bun-bheachdan air an deagh mhĂŹneachadh, ma tha (gu h-obann!) Cha do thuig thu rudeigin bhuam.
- - stiĂšireadh goirid mu bhith aâ stèidheachadh cruinneachadh Airflow.
- - cha mhòr an aon artaigil inntinneach, ach a-mhà in 's dòcha barrachd foirmeileachd, agus nas lugha de eisimpleirean.
- - mu bhith ag obair còmhla ri Celery.
- - mu neo-chomasachd ghnĂŹomhan, luchdachadh le ID an Ă ite ceann-latha, cruth-atharrachadh, structar faidhle agus rudan inntinneach eile.
- - eisimeileachd ghnĂŹomhan agus Riaghailt Trigger, air an tug mi iomradh a-mhĂ in nuair a chaidh mi seachad.
- - mar a gheibh thu thairis air cuid de âobraichean mar a bha dĂšilâ anns a âchlĂ r-ama, luchdaich dĂ ta a chaidh air chall agus prĂŹomhachas a thoirt do ghnĂŹomhan.
- - ceistean feumail SQL gu meata-dĂ ta Airflow.
- - tha earrann feumail ann mu bhith aâ cruthachadh sensor Ă bhaisteach.
- - nota goirid inntinneach mu bhith aâ togail bun-structair air AWS airson Saidheans DĂ ta.
- - mearachdan cumanta (nuair nach eil cuideigin fhathast a 'leughadh an stiĂšireadh).
- - gĂ ire mar a bhios daoine a âstòradh faclan-faire, ged as urrainn dhut dĂŹreach Ceanglaichean a chleachdadh.
- - cuir air adhart DAG so-thuigsinn, tilgeil co-theacsa ann an gnĂŹomhan, a-rithist mu eisimeileachd, agus cuideachd mu bhith aâ leum air bhog gnĂŹomhan.
- - mu dheidhinn cleachdadh
default argumentsиparamsann an teamplaidean, a bharrachd air caochlaidhean agus ceanglaichean. - - sgeulachd mu mar a tha an dealbhaiche ag ullachadh airson Airflow 2.0.
- - artaigil a tha beagan seann-fhasanta mu bhith aâ cleachdadh ar cruinneachadh ann an
docker-compose. - - gnĂŹomhan fiĂšghantach aâ cleachdadh teamplaidean agus cur air adhart co-theacsa.
- - fiosan Ă bhaisteach agus Ă bhaisteach tron ââââphost agus Slack.
- - GnĂŹomhan meur, macros agus XCom.
Agus na ceanglaichean a chleachdar san artaigil:
- - luchd-Ă ite rim faighinn airson an cleachdadh ann an teamplaidean.
- - Mearachdan cumanta nuair a bhios tu aâ cruthachadh bhiodagan.
- -
docker-composeairson deuchainnean, debugging agus barrachd. - - Còmhdach Python airson Telegram REST API.
Source: www.habr.com




