Helo, Dmitry Logvinenko ydw i - Peiriannydd Data Adran Ddadansoddeg grŵp cwmnïau Vezet.
Byddaf yn dweud wrthych am offeryn gwych ar gyfer datblygu prosesau ETL - Apache Airflow. Ond mae Airflow mor amlbwrpas ac amlochrog fel y dylech edrych yn agosach arno hyd yn oed os nad ydych yn ymwneud â llif data, ond bod angen lansio unrhyw brosesau o bryd i'w gilydd a monitro eu gweithrediad.
Ac ie, byddaf nid yn unig yn dweud, ond hefyd yn dangos: mae gan y rhaglen lawer o god, sgrinluniau ac argymhellion.

Beth fyddwch chi'n ei weld fel arfer pan fyddwch chi'n google y gair Airflow / Wikimedia Commons
Tabl cynnwys
Cyflwyniad
Mae Apache Airflow yn union fel Django:
- wedi ei ysgrifennu yn python
- mae yna banel gweinyddol gwych,
- ehangu am gyfnod amhenodol
- dim ond yn well, ac fe'i gwnaed i ddibenion hollol wahanol, sef (fel y mae'n ysgrifenedig cyn y kata):
- rhedeg a monitro tasgau ar nifer anghyfyngedig o beiriannau (fel y bydd llawer o Seleri / Kubernetes a'ch cydwybod yn caniatáu ichi)
- gyda chynhyrchu llif gwaith deinamig o hawdd iawn i ysgrifennu a deall cod Python
- a'r gallu i gysylltu unrhyw gronfeydd data ac APIs â'i gilydd gan ddefnyddio cydrannau parod ac ategion cartref (sy'n hynod o syml).
Rydym yn defnyddio Apache Airflow fel hyn:
- rydym yn casglu data o wahanol ffynonellau (llawer o achosion SQL Server a PostgreSQL, APIs amrywiol gyda metrigau cais, hyd yn oed 1C) yn DWH ac ODS (mae gennym Vertica a Clickhouse).
- pa mor ddatblygedig
cron, sy'n cychwyn y prosesau cydgrynhoi data ar yr ODS, a hefyd yn monitro eu cynnal.
Tan yn ddiweddar, roedd ein hanghenion yn cael eu cwmpasu gan un gweinydd bach gyda 32 cores a 50 GB o RAM. Mewn Llif Awyr, mae hyn yn gweithio:
- mwy 200 o dagiau (llifoedd gwaith mewn gwirionedd, lle buom yn stwffio tasgau),
- ym mhob un ar gyfartaledd 70 o dasgau,
- mae'r daioni hwn yn dechrau (hefyd ar gyfartaledd) unwaith yr awr.
Ac am sut y gwnaethom ehangu, byddaf yn ysgrifennu isod, ond yn awr gadewch i ni ddiffinio'r über-problem y byddwn yn ei datrys:
Mae yna dri Gweinyddwr SQL gwreiddiol, pob un â 50 o gronfeydd data - enghreifftiau o un prosiect, yn y drefn honno, mae ganddyn nhw'r un strwythur (bron ym mhobman, mua-ha-ha), sy'n golygu bod gan bob un dabl Gorchmynion (yn ffodus, tabl gyda hynny gellir gwthio enw i mewn i unrhyw fusnes). Rydym yn cymryd y data trwy ychwanegu meysydd gwasanaeth (gweinydd ffynhonnell, cronfa ddata ffynhonnell, ID tasg ETL) ac yn eu taflu'n naïf i, dyweder, Vertica.
Gadewch i ni fynd!
Y brif ran, ymarferol (ac ychydig yn ddamcaniaethol)
Pam ydyn ni (a chi)
Pan oedd y coed yn fawr ac roeddwn i'n syml SQL-schik mewn un adwerthu yn Rwsia, fe wnaethon ni sgamio prosesau ETL fel llif data gan ddefnyddio dau offeryn sydd ar gael i ni:
- Canolfan Bwer Informatica - system hynod o wasgaredig, hynod gynhyrchiol, gyda'i chaledwedd ei hun, ei fersiwn ei hun. Defnyddiais Dduw yn gwahardd 1% o'i alluoedd. Pam? Wel, yn gyntaf oll, mae'r rhyngwyneb hwn, rhywle o'r 380au, yn feddyliol wedi rhoi pwysau arnom ni. Yn ail, mae'r contraption hwn wedi'i gynllunio ar gyfer prosesau hynod ffansi, ailddefnyddio cydrannau ffyrnig a thriciau menter-pwysig iawn eraill. Ynglŷn â'r ffaith ei fod yn costio, fel adain yr Airbus AXNUMX / blwyddyn, ni fyddwn yn dweud unrhyw beth.
Byddwch yn ofalus, gall sgrinlun brifo pobl o dan 30 ychydig

- SQL Gweinyddwr Integreiddio Gweinydd - defnyddiwyd y cymrawd hwn yn ein llifoedd o fewn y prosiect. Wel, mewn gwirionedd: rydym eisoes yn defnyddio SQL Server, a byddai'n afresymol rhywsut i beidio â defnyddio ei offer ETL. Mae popeth ynddo yn dda: mae'r rhyngwyneb yn brydferth, ac mae'r adroddiadau cynnydd ... Ond nid dyma pam rydyn ni'n caru cynhyrchion meddalwedd, o, nid ar gyfer hyn. Fersiwn iddo
dtsx(sef XML gyda nodau siffrwd ar gadw) gallwn, ond beth yw'r pwynt? Beth am wneud pecyn tasg a fydd yn llusgo cannoedd o dablau o un gweinydd i'r llall? Ie, am gant, bydd eich mynegfys yn disgyn oddi ar ugain darn, gan glicio ar fotwm y llygoden. Ond mae'n bendant yn edrych yn fwy ffasiynol:
Yn sicr fe wnaethon ni chwilio am ffyrdd allan. Achos hyd yn oed bron daeth i gynhyrchydd pecyn SSIS hunan-ysgrifenedig ...
…ac yna daeth swydd newydd o hyd i mi. A goddiweddodd Apache Airflow fi arno.
Pan wnes i ddarganfod bod disgrifiadau proses ETL yn god Python syml, wnes i ddim dawnsio er llawenydd. Dyma sut y cafodd ffrydiau data eu fersiwnio a'u gwahaniaethu, a daeth arllwys tablau gydag un strwythur o gannoedd o gronfeydd data i mewn i un targed yn fater o god Python mewn un a hanner neu ddwy sgrin 13”.
Cydosod y clwstwr
Gadewch i ni beidio â threfnu kindergarten yn gyfan gwbl, a pheidio â siarad am bethau cwbl amlwg yma, fel gosod Airflow, y gronfa ddata o'ch dewis, Seleri ac achosion eraill a ddisgrifir yn y dociau.
Er mwyn i ni allu dechrau arbrofion ar unwaith, brasluniais docker-compose.yml lle:
- Gadewch i ni godi mewn gwirionedd Llif aer: Trefnydd, gweinydd gwe. Bydd Flower hefyd yn troelli yno i fonitro tasgau Seleri (gan ei fod eisoes wedi cael ei wthio i mewn
apache/airflow:1.10.10-python3.7, ond does dim ots gennym ni) - PostgreSQL, lle bydd Airflow yn ysgrifennu ei wybodaeth gwasanaeth (data amserlennydd, ystadegau gweithredu, ac ati), a bydd Seleri yn marcio tasgau gorffenedig;
- Redis, a fydd yn gweithredu fel brocer tasgau ar gyfer Seleri;
- Gweithiwr seleri, a fydd yn ymwneud â chyflawni tasgau'n uniongyrchol.
- I ffolder
./dagsbyddwn yn ychwanegu ein ffeiliau gyda'r disgrifiad o dags. Byddant yn cael eu codi ar y hedfan, felly nid oes angen jyglo'r pentwr cyfan ar ôl pob tisian.
Mewn rhai mannau, nid yw'r cod yn yr enghreifftiau yn cael ei ddangos yn gyfan gwbl (er mwyn peidio ag annibendod y testun), ond yn rhywle mae'n cael ei addasu yn y broses. Mae enghreifftiau o god gweithio cyflawn i'w gweld yn y gadwrfa .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerПримечания:
- Yng nghynulliad y cyfansoddiad, dibynnais i raddau helaeth ar y ddelwedd adnabyddus - gofalwch eich bod yn edrych arno. Efallai nad oes angen unrhyw beth arall yn eich bywyd.
- Mae pob gosodiad Airflow ar gael nid yn unig drwodd
airflow.cfg, ond hefyd trwy newidynnau amgylchedd (diolch i'r datblygwyr), y cymerais fantais yn faleisus. - Yn naturiol, nid yw'n barod i gynhyrchu: ni wnes i roi curiadau calon ar gynwysyddion yn fwriadol, ni wnes i drafferthu â diogelwch. Ond fe wnes i'r lleiafswm sy'n addas ar gyfer ein harbrofwyr.
- Sylwch fod:
- Rhaid i'r ffolder dag fod yn hygyrch i'r trefnydd a'r gweithwyr.
- Mae'r un peth yn wir am bob llyfrgell trydydd parti - rhaid eu gosod i gyd ar beiriannau gyda rhaglennydd a gweithwyr.
Wel, nawr mae'n syml:
$ docker-compose up --scale worker=3Ar ôl i bopeth godi, gallwch edrych ar y rhyngwynebau gwe:
- Llif aer:
- Blodyn:
Cysyniadau sylfaenol
Os nad oeddech chi'n deall unrhyw beth yn yr holl “dagiau”, yna dyma eiriadur byr:
- Scheduler - yr ewythr pwysicaf yn Airflow, sy'n rheoli bod robotiaid yn gweithio'n galed, ac nid person: yn monitro'r amserlen, yn diweddaru dagiau, yn lansio tasgau.
Yn gyffredinol, mewn fersiynau hŷn, roedd ganddo broblemau gyda'r cof (na, nid amnesia, ond gollyngiadau) ac roedd y paramedr etifeddiaeth hyd yn oed yn aros yn y cyfluniadau
run_duration- ei egwyl ailgychwyn. Ond nawr mae popeth yn iawn. - DAG (aka "dag") - "graff acyclic cyfeiriedig", ond bydd diffiniad o'r fath yn dweud ychydig o bobl, ond mewn gwirionedd mae'n gynhwysydd ar gyfer tasgau sy'n rhyngweithio â'i gilydd (gweler isod) neu analog o Pecyn yn SSIS a Llif Gwaith yn Informatica .
Yn ogystal â dagiau, efallai y bydd subdagiau o hyd, ond yn fwyaf tebygol ni fyddwn yn cyrraedd atynt.
- Rhedeg DAG - dag cychwynnol, sy'n cael ei neilltuo ei hun
execution_date. Gall dagrans o'r un dag weithio ochr yn ochr (os ydych chi wedi gwneud eich tasgau'n analluog, wrth gwrs). - Gweithredwr yn ddarnau o god sy'n gyfrifol am gyflawni gweithred benodol. Mae tri math o weithredwyr:
- gweithredufel ein ffefryn
PythonOperator, a all weithredu unrhyw god Python (dilys); - trosglwyddo, sy'n cludo data o le i le, dyweder,
MsSqlToHiveTransfer; - synhwyrydd ar y llaw arall, bydd yn caniatáu ichi ymateb neu arafu gweithrediad pellach y dag nes bod digwyddiad yn digwydd.
HttpSensoryn gallu tynnu'r pwynt terfyn penodedig, a phan fydd yr ymateb a ddymunir yn aros, dechreuwch y trosglwyddiadGoogleCloudStorageToS3Operator. Bydd meddwl chwilfrydig yn gofyn: “pam? Wedi'r cyfan, gallwch chi ailadrodd yn iawn yn y gweithredwr!" Ac yna, er mwyn peidio â chlocsio'r gronfa o dasgau gyda gweithredwyr ataliedig. Mae'r synhwyrydd yn cychwyn, yn gwirio ac yn marw cyn yr ymgais nesaf.
- gweithredufel ein ffefryn
- Gorchwyl - mae gweithredwyr datganedig, waeth beth fo'u math, ac sydd ynghlwm wrth y dag yn cael eu dyrchafu i reng y dasg.
- enghraifft tasg - pan benderfynodd y cynlluniwr cyffredinol ei bod hi'n bryd anfon tasgau i'r frwydr ar y perfformwyr-weithwyr (yn y fan a'r lle, os ydyn ni'n defnyddio
LocalExecutorneu i nod pell yn achosCeleryExecutor), mae'n aseinio cyd-destun iddynt (h.y., set o newidynnau - paramedrau gweithredu), yn ehangu templedi gorchymyn neu ymholiad, ac yn eu cyfuno.
Rydym yn cynhyrchu tasgau
Yn gyntaf, gadewch i ni amlinellu cynllun cyffredinol ein doug, ac yna byddwn yn plymio i'r manylion fwyfwy, oherwydd ein bod yn cymhwyso rhai atebion nad ydynt yn ddibwys.
Felly, yn ei ffurf symlaf, bydd dag o'r fath yn edrych fel hyn:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Gadewch i ni ddarganfod:
- Yn gyntaf, rydym yn mewnforio y libs angenrheidiol a Rhywbeth arall;
sql_server_ds- A ywList[namedtuple[str, str]]gydag enwau'r cysylltiadau o Airflow Connections a'r cronfeydd data y byddwn yn cymryd ein plât ohonynt;dag— cyhoeddiad ein doug, y mae yn rhaid fod ynddo o angenrheidrwyddglobals(), fel arall ni fydd Airflow yn dod o hyd iddo. Mae angen i Doug ddweud hefyd:- beth yw ei enw
orders- bydd yr enw hwn wedyn yn ymddangos yn y rhyngwyneb gwe, - y bydd yn gweithio o hanner nos ar yr wythfed o Orffennaf,
- a dylai redeg, tua bob 6 awr (ar gyfer dynion caled yma yn lle
timedelta()derbyniadwycron-llinell0 0 0/6 ? * * *, ar gyfer y llai oer - mynegiant fel@daily);
- beth yw ei enw
workflow()yn gwneud y prif waith, ond nid nawr. Am y tro, byddwn yn taflu ein cyd-destun i'r log.- Ac yn awr yr hud syml o greu tasgau:
- rydym yn rhedeg trwy ein ffynonellau;
- ymgychwyn
PythonOperator, a fydd yn gweithredu ein dymiworkflow(). Peidiwch ag anghofio nodi enw unigryw (o fewn y dag) y dasg a chlymu'r dag ei hun. Banerprovide_contextyn ei dro, yn arllwys dadleuon ychwanegol i'r swyddogaeth, y byddwn yn eu casglu'n ofalus gan ddefnyddio**context.
Am y tro, dyna i gyd. Yr hyn a gawsom:
- dag newydd yn y rhyngwyneb gwe,
- cant a hanner o dasgau a fydd yn cael eu cyflawni ochr yn ochr (os yw'r gosodiadau Llif Awyr, Seleri a chynhwysedd y gweinydd yn caniatáu hynny).
Wel, bron wedi ei gael.

Pwy fydd yn gosod y dibyniaethau?
I symleiddio'r holl beth hwn, fe wnes i sgriwio i mewn docker-compose.yml prosesu requirements.txt ar bob nod.
Nawr mae wedi mynd:

Mae sgwariau llwyd yn enghreifftiau o dasgau a brosesir gan y trefnydd.
Rydym yn aros ychydig, mae'r tasgau'n cael eu bachu gan y gweithwyr:

Mae'r rhai gwyrdd, wrth gwrs, wedi cwblhau eu gwaith yn llwyddiannus. Nid yw cochion yn llwyddiannus iawn.
Gyda llaw, nid oes ffolder ar ein prod
./dags, nid oes cydamseriad rhwng peiriannau - mae pob dag yn gorwedd i mewngitar ein Gitlab, ac mae Gitlab CI yn dosbarthu diweddariadau i beiriannau wrth unomaster.
Ychydig am Blodau
Tra mae'r gweithwyr yn dyrnu ein heddychwyr, gadewch i ni gofio arf arall a all ddangos rhywbeth i ni - Flower.
Y dudalen gyntaf gyda gwybodaeth gryno am nodau gweithwyr:

Y dudalen fwyaf dwys gyda thasgau a aeth i'r gwaith:

Y dudalen fwyaf diflas gyda statws ein brocer:

Mae'r dudalen ddisgleiriaf gyda graffiau statws tasg a'u hamser gweithredu:

Rydym yn llwytho y underloaded
Felly, mae'r holl dasgau wedi gweithio allan, gallwch chi gario'r clwyfedig i ffwrdd.

Ac roedd yna lawer wedi'u clwyfo - am ryw reswm neu'i gilydd. Yn achos y defnydd cywir o Llif Awyr, mae'r union sgwariau hyn yn dangos na chyrhaeddodd y data yn bendant.
Mae angen i chi wylio'r log ac ailgychwyn yr achosion tasg sydd wedi cwympo.
Drwy glicio ar unrhyw sgwâr, byddwn yn gweld y camau gweithredu sydd ar gael i ni:

Gallwch chi gymryd a gwneud Clirio'r syrthio. Hynny yw, rydym yn anghofio bod rhywbeth wedi methu yno, a bydd yr un dasg enghraifft yn mynd i'r trefnydd.

Mae'n amlwg nad yw gwneud hyn gyda'r llygoden gyda'r holl sgwariau coch yn drugarog iawn - nid dyma'r hyn yr ydym yn ei ddisgwyl gan Airflow. Yn naturiol, mae gennym arfau dinistr torfol: Browse/Task Instances

Gadewch i ni ddewis popeth ar unwaith ac ailosod i sero, cliciwch ar yr eitem gywir:

Ar ôl glanhau, mae ein tacsis yn edrych fel hyn (maen nhw eisoes yn aros i'r trefnydd eu hamserlennu):

Cysylltiadau, bachau a newidynnau eraill
Mae'n bryd edrych ar y DAG nesaf, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Ydy pawb erioed wedi diweddaru adroddiad? Dyma hi eto: mae rhestr o ffynonellau o ble i gael y data; mae rhestr lle i roi; peidiwch ag anghofio honk pan ddigwyddodd neu dorrodd popeth (wel, nid yw hyn yn ymwneud â ni, na).
Gadewch i ni fynd trwy'r ffeil eto ac edrych ar y stwff aneglur newydd:
from commons.operators import TelegramBotSendMessage- nid oes dim yn ein hatal rhag gwneud ein gweithredwyr ein hunain, y gwnaethom fanteisio arno trwy wneud papur lapio bach ar gyfer anfon negeseuon i Unblocked. (Byddwn yn siarad mwy am y gweithredwr hwn isod);default_args={}— gall dag ddosbarthu yr un dadleuon i'w holl weithredwyr ;to='{{ var.value.all_the_kings_men }}'- maestoni fydd gennym god caled, ond wedi'i gynhyrchu'n ddeinamig gan ddefnyddio Jinja a newidyn gyda rhestr o negeseuon e-bost, a roddais yn ofalus i mewnAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— amod ar gyfer cychwyn y gweithredwr. Yn ein hachos ni, dim ond os yw'r holl ddibyniaethau wedi gweithio allan y bydd y llythyr yn hedfan i'r penaethiaid yn llwyddiannus;tg_bot_conn_id='tg_main'- dadleuonconn_idderbyn IDau cysylltiad rydym yn creu ynddyntAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- bydd negeseuon yn Telegram yn hedfan i ffwrdd dim ond os bydd tasgau wedi cwympo;task_concurrency=1- rydym yn gwahardd lansio sawl tasg o un dasg ar yr un pryd. Fel arall, byddwn yn cael lansiad ar yr un pryd o sawl unVerticaOperator(edrych ar un bwrdd);report_update >> [email, tg]- I gydVerticaOperatorcydgyfeirio wrth anfon llythyrau a negeseuon, fel hyn:

Ond gan fod gan weithredwyr hysbyswyr amodau lansio gwahanol, dim ond un fydd yn gweithio. Yn y Tree View, mae popeth yn edrych ychydig yn llai gweledol:

Dywedaf ychydig eiriau am macros a'u ffrindiau - newidynnau.
Mae macros yn ddalfannau Jinja a all amnewid gwybodaeth ddefnyddiol amrywiol yn ddadleuon gweithredwr. Er enghraifft, fel hyn:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} yn ehangu i gynnwys y newidyn cyd-destun execution_date mewn fformat YYYY-MM-DD: 2020-07-14. Y rhan orau yw bod newidynnau cyd-destun yn cael eu hoelio i enghraifft o dasg benodol (sgwâr yn y Tree View), a phan fyddant yn cael eu hailddechrau, bydd y dalfannau yn ehangu i'r un gwerthoedd.
Gellir gweld y gwerthoedd a neilltuwyd gan ddefnyddio'r botwm Rendro ar bob tasg. Dyma sut mae'r dasg o anfon llythyr:

Ac felly ar y dasg o anfon neges:

Mae rhestr gyflawn o macros adeiledig ar gyfer y fersiwn ddiweddaraf sydd ar gael ar gael yma:
Ar ben hynny, gyda chymorth ategion, gallwn ddatgan ein macros ein hunain, ond stori arall yw honno.
Yn ogystal â'r pethau rhagddiffiniedig, gallwn amnewid gwerthoedd ein newidynnau (defnyddiais hyn eisoes yn y cod uchod). Gadewch i ni greu i mewn Admin/Variables cwpl o bethau:

Popeth y gallwch ei ddefnyddio:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Gall y gwerth fod yn sgalar, neu gall fod yn JSON hefyd. Yn achos JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}defnyddiwch y llwybr i'r allwedd a ddymunir: {{ var.json.bot_config.bot.token }}.
Byddaf yn llythrennol yn dweud un gair ac yn dangos un sgrinlun amdano cysylltiadau. Mae popeth yn elfennol yma: ar y dudalen Admin/Connections rydym yn creu cysylltiad, yn ychwanegu ein mewngofnodi / cyfrineiriau a pharamedrau mwy penodol yno. Fel hyn:

Gellir amgryptio cyfrineiriau (yn fwy trylwyr na'r rhagosodedig), neu gallwch adael y math o gysylltiad allan (fel y gwnes ar gyfer tg_main) - y ffaith yw bod y rhestr o fathau yn galed mewn modelau Airflow ac ni ellir ei ehangu heb fynd i mewn i'r codau ffynhonnell (os yn sydyn ni wnes i google rhywbeth, cywirwch fi), ond ni fydd dim yn ein hatal rhag cael credydau yn unig enw.
Gallwch hefyd wneud sawl cysylltiad gyda'r un enw: yn yr achos hwn, y dull BaseHook.get_connection(), sy'n cael cysylltiadau i ni yn ôl enw, yn rhoi ar hap o nifer o enwau (byddai'n fwy rhesymegol i wneud Rownd Robin, ond gadewch i ni ei adael ar gydwybod y datblygwyr Airflow).
Mae Newidynnau a Chysylltiadau yn sicr yn offer cŵl, ond mae'n bwysig peidio â cholli'r cydbwysedd: pa rannau o'ch llifau rydych chi'n eu storio yn y cod ei hun, a pha rannau rydych chi'n eu rhoi i Airflow i'w storio. Ar y naill law, gall fod yn gyfleus newid y gwerth yn gyflym, er enghraifft, blwch postio, trwy'r UI. Ar y llaw arall, mae hwn yn dal i fod yn dychwelyd i'r clic llygoden, yr oeddem ni (fi) am gael gwared ohono.
Gweithio gyda chysylltiadau yw un o'r tasgau bachau. Yn gyffredinol, mae bachau Llif Awyr yn bwyntiau ar gyfer ei gysylltu â gwasanaethau a llyfrgelloedd trydydd parti. Ee, JiraHook yn agor cleient i ni ryngweithio â Jira (gallwch symud tasgau yn ôl ac ymlaen), a gyda chymorth SambaHook gallwch chi wthio ffeil leol i smb-pwynt.
Dosrannu'r gweithredwr arferiad
Ac fe ddaethon ni'n agos at edrych ar sut mae'n cael ei wneud TelegramBotSendMessage
Cod commons/operators.py gyda'r gweithredwr gwirioneddol:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Yma, fel popeth arall yn Airflow, mae popeth yn syml iawn:
- Etifeddwyd o
BaseOperator, sy'n gweithredu cryn dipyn o bethau llif aer penodol (edrychwch ar eich hamdden) - Meysydd a ddatganwyd
template_fields, lle bydd Jinja yn chwilio am macros i'w prosesu. - Wedi trefnu'r dadleuon cywir o blaid
__init__(), gosodwch y rhagosodiadau lle bo angen. - Wnaethon ni ddim anghofio am ddechreuad yr hynafiad chwaith.
- Wedi agor y bachyn cyfatebol
TelegramBotHookwedi derbyn gwrthrych cleient ohono. - Dull wedi'i ddiystyru (ailddiffinio).
BaseOperator.execute(), a fydd Airfow yn twitch pan ddaw'r amser i lansio'r gweithredwr - ynddo byddwn yn gweithredu'r prif weithred, gan anghofio mewngofnodi. (Rydyn ni'n mewngofnodi, gyda llaw, reit i mewnstdoutиstderr- Bydd llif aer yn rhyng-gipio popeth, yn ei lapio'n hyfryd, yn ei ddadelfennu lle bo angen.)
Gawn ni weld beth sydd gennym ni commons/hooks.py. Rhan gyntaf y ffeil, gyda'r bachyn ei hun:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientNid wyf hyd yn oed yn gwybod beth i'w esbonio yma, byddaf yn nodi'r pwyntiau pwysig:
- Rydyn ni'n etifeddu, meddyliwch am y dadleuon - yn y rhan fwyaf o achosion bydd yn un:
conn_id; - Diystyru dulliau safonol: cyfyngais fy hun
get_conn(), yr wyf yn cael y paramedrau cysylltiad yn ôl enw a dim ond yn cael yr adranextra(maes JSON yw hwn), lle rhoddais i (yn ôl fy nghyfarwyddiadau fy hun!) y tocyn Telegram bot:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Rwy'n creu enghraifft o'n
TelegramBot, gan roi tocyn penodol iddo.
Dyna i gyd. Gallwch gael cleient o fachyn gan ddefnyddio TelegramBotHook().clent neu TelegramBotHook().get_conn().
Ac ail ran y ffeil, lle rwy'n gwneud microlapiwr ar gyfer y Telegram REST API, er mwyn peidio â llusgo'r un peth am un dull sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Y ffordd gywir yw ychwanegu'r cyfan i fyny:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- yn yr ategyn, rhowch mewn ystorfa gyhoeddus, a'i roi i Ffynhonnell Agored.
Tra roeddem yn astudio hyn i gyd, llwyddodd ein diweddariadau adroddiad i fethu'n llwyddiannus ac anfon neges gwall ataf yn y sianel. Rydw i'n mynd i wirio i weld a yw'n anghywir ...

Torrodd rhywbeth yn ein ci! Onid dyna yr oeddem yn ei ddisgwyl? Yn union!
Ydych chi'n mynd i arllwys?
Ydych chi'n teimlo fy mod wedi colli rhywbeth? Mae'n ymddangos ei fod wedi addo trosglwyddo data o SQL Server i Vertica, ac yna fe'i cymerodd a symud oddi ar y pwnc, y scoundrel!
Roedd yr erchyllter hwn yn fwriadol, yn syml iawn roedd yn rhaid imi ddehongli rhywfaint o derminoleg i chi. Nawr gallwch chi fynd ymhellach.
Ein cynllun oedd hyn:
- Gwna dag
- Cynhyrchu tasgau
- Gweld pa mor brydferth yw popeth
- Neilltuo rhifau sesiwn i lenwi
- Cael data o SQL Server
- Rhowch ddata yn Vertica
- Casglu ystadegau
Felly, i gael hyn i gyd ar waith, fe wnes i ychwanegiad bach i'n docker-compose.yml:
docwr-cyfansoddi.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyYno rydym yn codi:
- Vertica fel gwesteiwr
dwhgyda'r gosodiadau mwyaf diofyn, - tri achos o SQL Server,
- rydym yn llenwi'r cronfeydd data yn yr olaf gyda rhywfaint o ddata (peidiwch ag ymchwilio i unrhyw achos
mssql_init.py!)
Rydyn ni'n lansio'r holl dda gyda chymorth gorchymyn ychydig yn fwy cymhleth na'r tro diwethaf:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Yr hyn a gynhyrchwyd gan ein hapiwr gwyrthiol, gallwch ddefnyddio'r eitem Data Profiling/Ad Hoc Query:

Y prif beth yw peidio â'i ddangos i ddadansoddwyr
ymhelaethu ar Sesiynau ETL Wn i ddim, mae popeth yn ddibwys yno: rydyn ni'n gwneud sylfaen, mae arwydd ynddo, rydyn ni'n lapio popeth gyda rheolwr cyd-destun, a nawr rydyn ni'n gwneud hyn:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15sesiwn.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passMae'r amser wedi dod casglu ein data o'n byrddau cant a hanner. Gadewch i ni wneud hyn gyda chymorth llinellau diymhongar iawn:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Gyda chymorth bachyn a gawn o Airflow
pymssql-cysylltu - Gadewch i ni amnewid cyfyngiad ar ffurf dyddiad yn y cais - bydd yn cael ei daflu i'r swyddogaeth gan y peiriant templed.
- Bwydo ein cais
pandaspwy fydd yn ein caelDataFrame- bydd yn ddefnyddiol i ni yn y dyfodol.
Rwy'n defnyddio amnewid
{dt}yn lle paramedr cais%snid oherwydd fy mod yn Pinocchio drwg, ond oherwyddpandasmethu trinpymssqlac yn llithro yr un olafparams: Lister ei fod wir eisiautuple.
Sylwch hefyd fod y datblygwrpymssqlpenderfynodd beidio â'i gefnogi mwyach, ac mae'n bryd symud allanpyodbc.
Gawn ni weld beth wnaeth Airflow stwffio dadleuon ein swyddogaethau gyda:

Os nad oes data, yna nid oes diben parhau. Ond rhyfedd hefyd yw ystyried y llenwad yn llwyddiannus. Ond nid camgymeriad yw hyn. A-ah-ah, beth i'w wneud?! A dyma beth:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException yn dweud wrth Airflow nad oes unrhyw wallau, ond rydym yn hepgor y dasg. Ni fydd gan y rhyngwyneb sgwâr gwyrdd neu goch, ond pinc.
Gadewch i ni daflu ein data colofnau lluosog:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Sef
- Y gronfa ddata y cymerwyd yr archebion ohoni,
- ID ein sesiwn llifogydd (bydd yn wahanol ar gyfer pob tasg),
- Stwnsh o'r ID ffynhonnell a threfn - fel bod gennym ni ID archeb unigryw yn y gronfa ddata derfynol (lle mae popeth yn cael ei arllwys i un bwrdd).
Erys y cam olaf ond un: arllwyswch bopeth i Vertica. Ac, yn rhyfedd ddigon, un o'r ffyrdd mwyaf trawiadol ac effeithlon o wneud hyn yw trwy CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Rydym yn gwneud derbynnydd arbennig
StringIO. pandasBydd garedig rhoi einDataFramear y ffurfCSV-llinellau.- Gadewch i ni agor cysylltiad â'n hoff Vertica gyda bachyn.
- Ac yn awr gyda chymorth
copy()anfon ein data yn uniongyrchol i Vertika!
Byddwn yn cymryd gan y gyrrwr faint o linellau a lenwyd, ac yn dweud wrth y rheolwr sesiwn bod popeth yn iawn:
session.loaded_rows = cursor.rowcount
session.successful = TrueDyna i gyd.
Ar y gwerthiant, rydym yn creu'r plât targed â llaw. Yma fe wnes i ganiatáu peiriant bach i mi fy hun:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Rwy'n defnyddio
VerticaOperator()Rwy'n creu sgema cronfa ddata a thabl (os nad ydynt yn bodoli eisoes, wrth gwrs). Y prif beth yw trefnu'r dibyniaethau yn gywir:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadCrynhoi
— Wel, — meddai y llygoden fach, — onid ydyw, yn awr
A ydych yn argyhoeddedig mai fi yw'r anifail mwyaf ofnadwy yn y goedwig?
Julia Donaldson, Y Gryffalo
Rwy'n meddwl pe bai gan fy nghydweithwyr a minnau gystadleuaeth: pwy fydd yn creu ac yn lansio proses ETL yn gyflym o'r dechrau: maen nhw gyda'u SSIS a llygoden a fi gyda Airflow ... Ac yna byddem hefyd yn cymharu rhwyddineb cynnal a chadw ... Waw, dwi'n meddwl y byddwch chi'n cytuno y byddaf yn eu curo ar bob ffrynt!
Os ychydig yn fwy difrifol, yna Apache Airflow - trwy ddisgrifio prosesau ar ffurf cod rhaglen - a wnaeth fy swydd llawer yn fwy cyfforddus a phleserus.
Mae ei estynadwyedd diderfyn, o ran ategion a rhagdueddiad i scalability, yn rhoi'r cyfle i chi ddefnyddio Llif Awyr mewn bron unrhyw faes: hyd yn oed yn y cylch llawn o gasglu, paratoi a phrosesu data, hyd yn oed wrth lansio rocedi (i'r blaned Mawrth, o cwrs).
Rhan derfynol, cyfeirnod a gwybodaeth
Y rhaca rydym wedi casglu i chi
start_date. Ydy, mae hwn eisoes yn feme lleol. Trwy brif ddadl dougstart_datei gyd yn pasio. Yn fyr, os nodwch ynstart_datedyddiad presennol, aschedule_interval- un diwrnod, yna bydd DAG yn dechrau yfory dim cynt.start_date = datetime(2020, 7, 7, 0, 1, 2)A dim mwy o broblemau.
Mae gwall amser rhedeg arall yn gysylltiedig ag ef:
Task is missing the start_date parameter, sy'n nodi amlaf eich bod wedi anghofio rhwymo'r gweithredwr dag.- I gyd ar un peiriant. Ie, a seiliau (Airflow ei hun a'n cotio), a gweinydd gwe, a scheduler, a gweithwyr. Ac fe weithiodd hyd yn oed. Ond dros amser, tyfodd nifer y tasgau ar gyfer gwasanaethau, a phan ddechreuodd PostgreSQL ymateb i'r mynegai mewn 20 s yn lle 5 ms, fe wnaethom ei gymryd a'i gario i ffwrdd.
- Gweithredwr Lleol. Ydym, rydym yn dal i eistedd arno, ac rydym eisoes wedi dod i ymyl yr affwys. Mae LocalExecutor wedi bod yn ddigon i ni hyd yn hyn, ond nawr mae'n bryd ehangu gydag o leiaf un gweithiwr, a bydd yn rhaid i ni weithio'n galed i symud i CeleryExecutor. Ac o ystyried y ffaith y gallwch chi weithio gydag ef ar un peiriant, nid oes dim yn eich atal rhag defnyddio Seleri hyd yn oed ar weinydd, na fydd “wrth gwrs, byth yn mynd i mewn i gynhyrchu, a dweud y gwir!”
- Di-ddefnydd offer adeiledig:
- Cysylltiadau i storio manylion gwasanaeth,
- CLG Misses ymateb i dasgau nad oeddent yn gweithio allan ar amser,
- xcom ar gyfer cyfnewid metadata (dywedais metadata!) rhwng tasgau dag.
- Camddefnydd post. Wel, beth alla i ddweud? Gosodwyd rhybuddion ar gyfer pob ailadroddiad o dasgau cwympo. Nawr mae gan Gmail fy ngwaith >90k o e-byst gan Airflow, ac mae'r we mail mail yn gwrthod codi a dileu mwy na 100 ar y tro.
Mwy o beryglon:
Mwy o offer awtomeiddio
Er mwyn i ni weithio hyd yn oed yn fwy gyda'n pennau ac nid gyda'n dwylo, mae Airflow wedi paratoi hyn i ni:
- - mae ganddo statws Arbrofol o hyd, nad yw'n ei atal rhag gweithio. Ag ef, gallwch nid yn unig gael gwybodaeth am dagiau a thasgau, ond hefyd stopio / cychwyn dag, creu DAG Run neu bwll.
- - mae llawer o offer ar gael trwy'r llinell orchymyn nad ydynt yn anghyfleus yn unig i'w defnyddio trwy'r WebUI, ond sy'n absennol yn gyffredinol. Er enghraifft:
backfillangen ailgychwyn achosion tasg.
Er enghraifft, daeth dadansoddwyr a dweud: “Ac mae gennych chi, gymrawd, nonsens yn y data o Ionawr 1 i 13! Trwsio, trwsio, ei drwsio, ei drwsio!" Ac rydych chi'n gymaint o hob:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Gwasanaeth sylfaenol:
initdb,resetdb,upgradedb,checkdb. run, sy'n eich galluogi i redeg un dasg enghraifft, a hyd yn oed sgorio ar bob dibyniaeth. Ar ben hynny, gallwch chi ei redeg trwyLocalExecutor, hyd yn oed os oes gennych glwstwr Seleri.- Yn gwneud yr un peth fwy neu lai
test, yn unig hefyd mewn gwaelodion yn ysgrifennu dim. connectionscaniatáu creu màs o gysylltiadau o'r gragen.
- - ffordd greiddiol braidd o ryngweithio, a fwriedir ar gyfer ategion, ac nid heidio ynddo gyda dwylo bach. Ond pwy sydd i'n rhwystro ni rhag mynd
/home/airflow/dags, rhedegipythona dechrau chwarae o gwmpas? Gallwch, er enghraifft, allforio pob cysylltiad â'r cod canlynol:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Cysylltu â chronfa ddata meta-lif Air. Nid wyf yn argymell ysgrifennu ato, ond gall cael cyflyrau tasg ar gyfer amrywiol fetrigau penodol fod yn llawer cyflymach a haws na thrwy unrhyw un o'r APIs.
Gadewch i ni ddweud nad yw pob un o'n tasgau yn analluog, ond gallant ddisgyn weithiau, ac mae hyn yn normal. Ond mae rhai rhwystrau eisoes yn amheus, a byddai angen gwirio.
Byddwch yn ofalus SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
cyfeiriadau
Ac wrth gwrs, y deg dolen gyntaf o gyhoeddiad Google yw cynnwys y ffolder Airflow o'm nodau tudalen.
- - wrth gwrs, mae'n rhaid i ni ddechrau gyda'r swyddfa. dogfennaeth, ond pwy sy'n darllen y cyfarwyddiadau?
- - Wel, o leiaf darllenwch yr argymhellion gan y crewyr.
- - y cychwyn cyntaf: y rhyngwyneb defnyddiwr mewn lluniau
- - mae'r cysyniadau sylfaenol wedi'u disgrifio'n dda, os (yn sydyn!) nad oeddech chi'n deall rhywbeth gen i.
- - canllaw byr ar gyfer sefydlu clwstwr Llif Awyr.
- - bron yr un erthygl ddiddorol, ac eithrio efallai mwy o ffurfioldeb, a llai o enghreifftiau.
- — am weithio ar y cyd â Seleri.
- - am analluedd tasgau, llwytho trwy ID yn lle dyddiad, trawsnewid, strwythur ffeiliau a phethau diddorol eraill.
- - dibyniaethau tasgau a Rheol Sbardun, a grybwyllais yn unig wrth fynd heibio.
- - sut i oresgyn rhai "gweithiau fel y bwriadwyd" yn y rhaglennydd, llwytho data coll a blaenoriaethu tasgau.
- — Ymholiadau SQL defnyddiol i fetadata Airflow.
- - mae adran ddefnyddiol am greu synhwyrydd personol.
- — nodyn byr diddorol am adeiladu seilwaith ar AWS ar gyfer Gwyddor Data.
- - camgymeriadau cyffredin (pan nad yw rhywun yn darllen y cyfarwyddiadau o hyd).
- - gwenwch sut mae pobl yn bagio storio cyfrineiriau, er y gallwch chi ddefnyddio Connections yn unig.
- - anfon ymlaen DAG ymhlyg, taflu swyddogaethau cyd-destun, eto am ddibyniaethau, a hefyd am sgipio lansiadau tasg.
- - am y defnydd
default argumentsиparamsmewn templedi, yn ogystal â Newidynnau a Chysylltiadau. - - stori am sut mae'r cynlluniwr yn paratoi ar gyfer Airflow 2.0.
- - erthygl ychydig yn hen ffasiwn am leoli ein clwstwr i mewn
docker-compose. - - tasgau deinamig gan ddefnyddio templedi a blaenyrru cyd-destun.
- - Hysbysiadau safonol ac arfer trwy'r post a Slack.
- - Tasgau canghennog, macros ac XCom.
A'r dolenni a ddefnyddir yn yr erthygl:
- - dalfannau ar gael i'w defnyddio mewn templedi.
- — Camgymeriadau cyffredin wrth greu dagiau.
- -
docker-composear gyfer arbrofi, dadfygio a mwy. - - Deunydd lapio Python ar gyfer Telegram REST API.
Ffynhonnell: hab.com




