Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.
I'm going to tell you about a wonderful tool for building ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you don't deal with data flows at all and simply need to launch some processes periodically and keep an eye on their execution.
And yes, I won't just tell, I'll show: the program includes plenty of code, screenshots, and recommendations.

What you usually see when you google the word Airflow / Wikimedia Commons
Introduction
Apache Airflow is just like Django:
- written in Python,
- has a great admin panel,
- is infinitely extensible,
- only better, and built for completely different purposes, namely (as it says before the cut):
  - launching and monitoring tasks on an unlimited number of machines (as many Celery workers or Kubernetes nodes as your conscience allows),
  - dynamic workflow generation from Python code that is easy to write and easy to read,
  - the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which are extremely simple to write).
We use Apache Airflow like this:
- we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
- as an advanced cron that starts the data consolidation processes on the ODS and also monitors their maintenance.
Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, it runs:
- more than 200 DAGs (workflows, really, which we stuff with tasks),
- each with 70 tasks on average,
- and all this goodness starts (also on average) once an hour.
I'll describe how we scaled further down, but for now let's define the über-problem we are going to solve:
There are three source SQL Servers, each with 50 databases. These are instances of one project, so they share the same structure (almost everywhere, mua-ha-ha), which means each one has an Orders table (luckily, a table with that name fits into any business). We take the data, add service fields (source server, source database, ETL task ID), and naively dump it all into, say, Vertica.
Let's go!
The main part, practical (and a little theoretical)
Why we (and you) need it
Back when the trees were tall and I was a simple SQL guy at a Russian retailer, we ran ETL processes, aka data flows, with the two tools available to us:
- Informatica Power Center: an extremely sprawling, extremely productive system with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? Well, first, its interface, straight out of the 2000s, weighed on us psychologically. Second, this contraption is designed for extremely fancy processes, furious component reuse, and other very-important-enterprise tricks. As for the fact that it costs about as much as the wing of an Airbus A380 per year, we'll say nothing.
Beware: the screenshot may slightly hurt people under 30

- SQL Server Integration Services: we used this comrade in our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything about it is good: the interface is pretty, and so are the progress reports... But that's not why we love software products, oh, not for that. You can version dtsx (which is XML with nodes shuffled on every save), but what's the point? And how about building a package of tasks that drags a hundred tables from one server to another? Never mind a hundred: your index finger will fall off after twenty from clicking the mouse button. But it certainly looks more fashionable:
We certainly looked for ways out. It even almost came to a home-grown SSIS package generator...
...and then a new job found me. And Apache Airflow caught up with me there.
When I found out that ETL process descriptions are plain Python code, I all but danced for joy. That is how data flows came to be versioned and diffed, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code that fits on one and a half or two 13" screens.
Assembling the cluster
Let's not turn this into a kindergarten and talk about completely obvious things like installing Airflow, the database of your choice, Celery, and other cases described in the docs.
So that we can start experimenting right away, I sketched a docker-compose.yml in which:
- We bring up Airflow itself: Scheduler and Webserver. Flower will be spinning there too, to monitor Celery tasks (because it has already been packed into apache/airflow:1.10.10-python3.7, and we don't mind);
- PostgreSQL, where Airflow will write its service information (scheduler data, execution statistics, etc.) and Celery will mark completed tasks;
- Redis, which will act as the task broker for Celery;
- a Celery worker, which will do the actual task execution.
- The ./dags folder is where we will put our files with DAG descriptions. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.
In some places the code in the examples is not shown in full (so as not to clutter the text), and in others it gets modified along the way. Complete working examples can be found in the repository.
docker-compose.yml
version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base
    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
    depends_on:
      - airflow-db
      - broker
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint initdb &&
      (/entrypoint webserver &) &&
      (/entrypoint flower &) &&
      /entrypoint scheduler"
    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base
    environment:
      <<: *airflow-config
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint worker"
    depends_on:
      - airflow
      - airflow-db
      - broker
Notes:
- When putting the composition together, I relied heavily on a well-known image: be sure to take a look at it. Maybe you won't need anything else in life.
- All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I shamelessly took advantage of.
- Naturally, it is not production-ready: I deliberately did not put heartbeats on the containers and did not bother with security. But I did the minimum that suits our experiments.
- Note that:
  - The DAG folder must be accessible to both the scheduler and the workers.
  - The same goes for all third-party libraries: they must be installed on the machines running the scheduler and the workers.
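The environment-variable route follows a naming convention, AIRFLOW__{SECTION}__{KEY}, which is visible in the compose file above. As a rough illustration only (the helper names env_to_cfg and group_settings are made up for this sketch and are not part of Airflow), this is how such names map onto airflow.cfg sections and keys:

```python
from typing import Dict, Tuple

PREFIX = "AIRFLOW__"


def env_to_cfg(name: str) -> Tuple[str, str]:
    """Split AIRFLOW__SECTION__KEY into (section, key), both lower-cased."""
    if not name.startswith(PREFIX):
        raise ValueError(f"not an Airflow setting: {name}")
    section, sep, key = name[len(PREFIX):].partition("__")
    if not sep or not section or not key:
        raise ValueError(f"expected AIRFLOW__SECTION__KEY, got: {name}")
    return section.lower(), key.lower()


def group_settings(env: Dict[str, str]) -> Dict[str, Dict[str, str]]:
    """Group Airflow-style variables by section, ignoring everything else."""
    grouped: Dict[str, Dict[str, str]] = {}
    for name, value in env.items():
        if name.startswith(PREFIX):
            section, key = env_to_cfg(name)
            grouped.setdefault(section, {})[key] = value
    return grouped


# Two settings taken from the compose file plus one unrelated variable.
sample = {
    "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
    "AIRFLOW__CELERY__BROKER_URL": "redis://broker:6379/0",
    "PATH": "/usr/bin",
}
settings = group_settings(sample)
print(settings)
```

So AIRFLOW__CORE__EXECUTOR corresponds to the executor key in the [core] section of airflow.cfg, and so on for the rest of the list.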
Well, now it's simple:
$ docker-compose up --scale worker=3
Once everything is up, you can look at the web interfaces:
- Airflow: published on port 8080 (see the ports section of the compose file);
- Flower: published on port 5555.
Basic concepts
If you didn't understand anything in all these "dags", here is a short glossary:
- Scheduler: the most important guy in Airflow, the one who makes sure the robots work hard, not the people: it watches the schedule, updates DAGs, and launches tasks.
  In general, in older versions it had memory problems (no, not amnesia, leaks), and the legacy parameter run_duration, its restart interval, even remained in the configs. But now everything is fine.
- DAG (aka "dag"): a "directed acyclic graph", though such a definition tells few people anything; in essence it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.
  Besides DAGs there may also be subDAGs, but we most likely won't get to them.
- DAG Run: an initialized DAG that has been assigned its own execution_date. DAG runs of the same DAG can happily work in parallel (if you made your tasks idempotent, of course).
- Operator: a piece of code responsible for performing a specific action. There are three types of operators:
  - action, like our beloved PythonOperator, which can execute any (valid) Python code;
  - transfer, which carries data from place to place, say, MsSqlToHiveTransfer;
  - sensor, which lets you react to an event or hold back further execution of the DAG until the event occurs. HttpSensor can poll the given endpoint and, once the desired response arrives, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? You can do retries right in the operator!" The reason is not to clog the task pool with suspended operators. The sensor starts, checks, and dies until the next attempt.
- Task: declared operators, regardless of type, that are attached to a DAG get promoted to the rank of task.
- Task instance: when the scheduler decides it is time to send tasks into battle on the workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e., a set of variables, the execution parameters), expands command or query templates, and puts them in the pool.
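To make the "directed acyclic graph" entry of the glossary a bit more tangible, here is a toy model. It uses graphlib from the standard library (Python 3.9+), not Airflow, and the task names are invented for the illustration; it only shows what "acyclic" buys you, namely a valid execution order.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# task -> set of upstream tasks it depends on (names are illustrative only)
toy_dag = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}


def execution_order(dag):
    """Return one valid run order; raises graphlib.CycleError on a cycle."""
    return list(TopologicalSorter(dag).static_order())


order = execution_order(toy_dag)
print(order)
```

If someone adds a dependency that closes a loop, no such order exists, which is why a scheduler can only work with graphs that stay acyclic.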
Generating tasks
First, let's outline the general scheme of our DAG, and then we'll dig deeper and deeper into the details, because we use some non-trivial solutions.
So, in its simplest form, such a DAG looks like this:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))


def workflow(**context):
    print(context)


for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
Let's figure it out:
- First, we import the necessary libs and a little something else;
- sql_server_ds is a List[namedtuple[str, str]] with the names of connections from Airflow Connections and the databases from which we will take our table;
- dag is the declaration of our DAG, which must end up in globals(), otherwise Airflow won't find it. The DAG also needs to be told:
  - that its name is orders: this name will show up in the web interface,
  - that it will work starting from midnight on July 8,
  - and that it should run roughly every 6 hours (for the tough guys, a cron line such as 0 */6 * * * is acceptable here instead of timedelta(); for the less tough, an expression like @daily);
- workflow() will do the main job, but not now. For now, we just dump our context into the log.
- And now the simple magic of creating tasks:
  - we run through our sources;
  - we initialize PythonOperator, which will execute our dummy workflow(). Don't forget to give the task a name that is unique within the DAG and to bind the DAG itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect with **context.
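The article does not show commons/datasources.py, so here is a minimal sketch of what such a module might contain. Everything in it is an assumption made for illustration: the DataSource name, the mssql_N connection IDs, and the project_NNN database names are hypothetical; only the shape (a list of two-field named tuples) comes from the description above.

```python
from collections import namedtuple
from typing import List

# Hypothetical layout: one Airflow connection per server, 50 databases each.
DataSource = namedtuple("DataSource", "conn_id schema")


def build_sources(servers: int = 3, databases: int = 50) -> List[DataSource]:
    """Enumerate (connection id, database name) pairs for every source database."""
    return [
        # Database names are kept unique across servers because the DAG
        # uses the schema as task_id, and task ids must not repeat in a DAG.
        DataSource(conn_id=f"mssql_{s}", schema=f"project_{s * databases + d:03d}")
        for s in range(servers)
        for d in range(databases)
    ]


sql_server_ds = build_sources()

# The DAG file unpacks each entry as `for conn_id, schema in sql_server_ds`.
for conn_id, schema in sql_server_ds[:2]:
    print(conn_id, schema)
```

With three servers and fifty databases each, the loop in the DAG produces the one and a half hundred tasks mentioned below.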
That's all for now. What we got:
- a new DAG in the web interface,
- one and a half hundred tasks that will run in parallel (if the Airflow and Celery settings and the server capacity allow).
Well, almost got it.

Who is going to install the dependencies?
To simplify all this, I wired the processing of requirements.txt on all nodes into docker-compose.yml.
And off we go:

Gray squares are task instances picked up by the scheduler.
We wait a little, and the tasks get snapped up by the workers:

The green ones, of course, have finished their work successfully. The red ones, not so much.
By the way, on our prod there is no ./dags folder and no file synchronization between machines: all DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.
A little about Flower
While the workers are thrashing our dummies, let's recall one more tool that can show us something: Flower.
The very first page, with summary information on the worker nodes:

The most intense page, with the tasks that went to work:

The most boring page, with the status of our broker:

The brightest page, with task status graphs and execution times:

Loading the underloaded
So, all the tasks have run, and the wounded can be carried away.

And there turned out to be many wounded, for one reason or another. When Airflow is used correctly, these very squares mean that the data definitely did not arrive.
You need to look at the log and restart the fallen task instances.
Clicking on any square shows the actions available to us:

You can go ahead and Clear the fallen one. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Clearly, doing this with the mouse for every red square is not very humane, and it is not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

Let's select everything at once and reset it, clicking the right item:

After the cleanup, our tasks look like this (they are already waiting for the scheduler to schedule them):

Connections, hooks, and other variables
It's time to look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
        Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]
Has everyone done a report update at some point? Here it is again: there is a list of sources to take the data from; there is a list of where to put it; and don't forget to honk when everything has happened or broken (well, that's not about us, no).
Let's go through the file again and look at the new obscure bits:
- from commons.operators import TelegramBotSendMessage: nothing prevents us from making our own operators, which we took advantage of by writing a small wrapper for sending messages to Telegram. (We'll talk more about this operator below);
- default_args={}: a DAG can hand the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}': the to field is not hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put in Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the letter flies to the bosses only if all dependencies have completed successfully;
- tg_bot_conn_id='tg_main': the conn_id arguments accept the IDs of connections that we create in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED: messages in Telegram fly out only if there are fallen tasks;
- task_concurrency=1: we forbid the simultaneous launch of several task instances of one task. Otherwise we would get several VerticaOperator instances running at once (against the same table);
- report_update >> [email, tg]: all the VerticaOperator tasks converge on sending the letter and the message, like this:

But since the notifier operators have different launch conditions, only one of them will fire. In the Tree View, everything looks a little less visual:

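Why only one of the two notifiers fires can be shown with a toy model of the two trigger rules. This is a simplified sketch, not Airflow's implementation: states are plain strings, the function names are made up, and skipped or still-running upstream tasks are ignored.

```python
from typing import Iterable


def all_success(upstream: Iterable[str]) -> bool:
    """ALL_SUCCESS-style rule: fire only when every upstream task succeeded."""
    return all(state == "success" for state in upstream)


def one_failed(upstream: Iterable[str]) -> bool:
    """ONE_FAILED-style rule: fire when at least one upstream task failed."""
    return any(state == "failed" for state in upstream)


# Five report updates, as in the DAG above.
good_run = ["success"] * 5
bad_run = ["success", "failed", "success", "success", "success"]

for name, states in (("good", good_run), ("bad", bad_run)):
    print(name, "email:", all_success(states), "telegram:", one_failed(states))
```

For finished runs the two conditions are mutually exclusive: either everything succeeded and the email goes out, or something failed and the Telegram message does.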
I'll say a few words about macros and their friends, variables.
Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), and on a restart the placeholders expand to the same values.
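The "same values on restart" property is easy to see in a small stand-in. Airflow renders templated fields with Jinja; the sketch below deliberately uses plain string replacement instead, and render_ds is a made-up helper, so treat it as an illustration of the idea rather than of the real rendering code.

```python
from datetime import datetime

QUERY = "SELECT id FROM orders.payments WHERE payment_dtm::DATE = '{{ ds }}'::DATE"


def render_ds(template: str, execution_date: datetime) -> str:
    """Replace the {{ ds }} placeholder with execution_date as YYYY-MM-DD.

    Plain string replacement stands in for Jinja here.
    """
    ds = execution_date.strftime("%Y-%m-%d")
    return template.replace("{{ ds }}", ds)


# The value depends only on the execution date of the task instance,
# not on the moment the code actually runs.
first = render_ds(QUERY, datetime(2020, 7, 14, 6, 0))
again = render_ds(QUERY, datetime(2020, 7, 14, 6, 0))  # a re-run of the same instance
print(first)
```

Because the date comes from the task instance rather than from the wall clock, clearing and re-running a task for July 14 queries July 14 again.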
The substituted values can be viewed with the Rendered button on each task instance. This is how it looks for the task that sends the letter:

And this is how it looks for the task that sends the message:

The complete list of built-in macros for the latest available version can be found in the macros reference of the Airflow documentation.
Moreover, with the help of plugins we can declare our own macros, but that is a different story.
Besides the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of entries in Admin/Variables:

That's it, you can use them:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
The value can be a scalar, or it can be JSON. In the case of JSON:
bot_config
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
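What a dotted path like bot_config.bot.token amounts to can be sketched with the standard library alone. The resolve helper is hypothetical and only mimics the lookup; in Airflow the variable is fetched from the metadata database and the path is evaluated by Jinja.

```python
import json
from functools import reduce

# The stored value of the variable, as a JSON string.
RAW = '{"bot": {"token": "881hskdfASDA16641", "name": "Verter"}, "service": "TG"}'


def resolve(raw_json: str, path: str):
    """Walk a dotted path such as 'bot.token' through a parsed JSON value."""
    return reduce(lambda node, key: node[key], path.split("."), json.loads(raw_json))


print(resolve(RAW, "bot.name"))
```

Note that the stored value has to be valid JSON for this to work, so string values such as the token must be quoted.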
I'll say literally one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and add our logins, passwords, and more specific parameters there. Like this:

Passwords can be encrypted (more thoroughly than by default), or you can leave the connection type empty (as I did for tg_main): the thing is that the list of types is hardwired into the Airflow models and cannot be extended without digging into the source code (if I simply failed to google something, please correct me), but nothing stops us from fetching credentials by name alone.
You can also create several connections with the same name: in that case the method BaseHook.get_connection(), which fetches connections by name, returns a random one among the namesakes (Round Robin would be more logical, but let's leave that on the conscience of the Airflow developers).
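The difference between the two selection strategies can be sketched without Airflow. The Conn tuple, the host names, and both helper functions below are invented for the illustration; they are not Airflow's code.

```python
import itertools
import random
from collections import namedtuple

Conn = namedtuple("Conn", "conn_id host")

# Two hypothetical connections registered under the same name.
replicas = [Conn("dwh", "vertica-a"), Conn("dwh", "vertica-b")]


def pick_random(candidates, rng=random):
    """Random choice among namesakes: simple, but the load is only even on average."""
    return rng.choice(candidates)


def round_robin(candidates):
    """Endless iterator that hands out the namesakes strictly in turn."""
    return itertools.cycle(candidates)


print(pick_random(replicas).host)
rr = round_robin(replicas)
print([next(rr).host for _ in range(4)])
```

Round robin needs shared state to remember whose turn it is, which is harder to keep across separate worker processes; a random pick needs none.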
Variables and Connections are certainly cool tools, but it is important not to lose the balance: which parts of your flows you keep in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailbox, through the UI. On the other hand, this is still a return to the mouse click that we (I) wanted to get rid of.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are the points where it connects to third-party services and libraries. For example, JiraHook opens a client for us to interact with Jira (you can move tasks back and forth), and with the help of SambaHook you can push a local file to an smb share.
Dissecting a custom operator
And now we are close to looking at how TelegramBotSendMessage is made.
The code in commons/operators.py with the operator itself:
from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot


class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)
Here, like everything else in Airflow, it is all very simple:
- We inherited from BaseOperator, which implements quite a few Airflow-specific things (take a look at your leisure).
- We declared template_fields, the fields in which Jinja will look for macros to process.
- We arranged the right arguments for __init__() and set defaults where necessary.
- We did not forget to initialize the ancestor either.
- We opened the corresponding hook, TelegramBotHook, and received a client object from it.
- We overrode the method BaseOperator.execute(), which Airflow will call when the time comes to launch the operator: in it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr: Airflow intercepts everything, wraps it nicely, and files it where it belongs.)
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession


class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client
I don't even know what to explain here, so I'll just note the important points:
- We inherit and think about the arguments: in most cases there will be just one, conn_id;
- We override the standard methods: I limited myself to get_conn(), in which I fetch the connection parameters by name and take only the extra section (this is a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
- I create an instance of our TelegramBot, handing it a specific token.
That's all. You can get the client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().
And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in a whole third-party library for the sake of a single method, sendMessage.
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)


class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
The right thing to do is to put all of it (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, push it to a public repository, and give it to Open Source.
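One detail of the wrapper is worth a hedged note. It sends messages with parse_mode set to MarkdownV2, and in that mode the Telegram Bot API expects a set of reserved characters to be escaped with a backslash. The example message above contains an underscore (in a DAG id such as update_reports) and a parenthesis, so an unescaped message may be rejected. The helper below is an addition for illustration, not part of the author's code:

```python
import re

# Characters the Telegram Bot API documents as reserved in MarkdownV2.
_MDV2_RESERVED = r"_*[]()~`>#+-=|{}.!"
_MDV2_PATTERN = re.compile("([" + re.escape(_MDV2_RESERVED) + "])")


def escape_markdown_v2(text: str) -> str:
    """Prefix every reserved MarkdownV2 character with a backslash."""
    return _MDV2_PATTERN.sub(r"\\\1", text)


print(escape_markdown_v2("update_reports failed :("))
```

Escaping everything also disables intentional formatting, so a real operator might escape only the substituted parts (such as the DAG id) or switch to plain text by dropping parse_mode.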
While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'm off to check whether it's telling the truth...

Something broke in our DAG! Isn't that what we were waiting for? Exactly!
So are you going to pour or not?
Do you feel I've missed something? It seems he promised to transfer data from SQL Server to Vertica, and then he went and wandered off the topic, the scoundrel!
This atrocity was intentional: I simply had to decipher some terminology for you. Now we can move on.
Our plan was this:
- Make a DAG
- Generate tasks
- See how beautiful everything is
- Assign session numbers to the loads
- Get data from SQL Server
- Put data into Vertica
- Collect statistics
So, to get all of this up and running, I made a small addition to our docker-compose.yml:
docker-compose.db.yml
version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py
There we bring up:
- Vertica as the host dwh with the most default settings,
- three instances of SQL Server,
- and we fill the databases in the latter with some data (under no circumstances look into mssql_init.py!)
We launch all this goodness with a slightly more complicated command than last time:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
What our miracle randomizer generated can be viewed through the Data Profiling/Ad Hoc Query item:

The main thing is not to show it to the analysts
I won't dwell on ETL sessions, everything is trivial there: we create a database, put a table in it, wrap everything in a context manager, and now we do this:
with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15
session.py
from sys import stderr


class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # any() takes a single iterable, hence the tuple
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,

                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)


class SessionError(Exception):
    pass


class SessionClosedError(SessionError):
    pass
The time has come to collect our data from our one and a half hundred tables. Let's do it with some very unpretentious lines:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
- With the help of the hook we get a pymssql connection from Airflow.
- We substitute a date restriction into the query: the template engine will pass it into the function.
- We feed our query to pandas, which returns a DataFrame: it will come in handy later.
I'm using the {dt} substitution instead of a query parameter %s not because I'm an evil Pinocchio, but because pandas cannot cope with pymssql and slips it params: List, although it really wants a tuple.
Also note that the developer of pymssql decided (at the time of writing) to stop supporting it, and it's time to move to pyodbc.
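Since the date goes into the SQL text through string formatting rather than a bound parameter, it may be worth validating it first. The sketch below is an optional safeguard added for illustration; safe_date_literal and build_query are made-up names, and the article's own code passes dt through as-is.

```python
from datetime import datetime


def safe_date_literal(value: str) -> str:
    """Parse value as YYYY-MM-DD and return it re-formatted.

    strptime raises ValueError on anything that is not a date,
    including trailing text, so arbitrary SQL cannot slip through.
    """
    return datetime.strptime(value, "%Y-%m-%d").strftime("%Y-%m-%d")


def build_query(dt: str) -> str:
    """Build the Orders query with a validated date literal."""
    return (
        "SELECT id, start_time, end_time, type, data "
        "FROM dbo.Orders "
        f"WHERE CONVERT(DATE, start_time) = '{safe_date_literal(dt)}'"
    )


print(build_query("2020-07-14"))
```

In the DAG the value comes from the template engine, so it is trusted input; the check mostly protects against manual runs with a mistyped argument.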
Let's see what Airflow stuffed into the arguments of our function:

If there is no data, there is no point in continuing. But it is also strange to consider the load successful. Yet it is not an error. A-ah-ah, what to do?! Here's what:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException tidzauza Airflow kuti palibe zolakwika, koma timadumpha ntchitoyi. Mawonekedwewo sadzakhala ndi mabwalo obiriwira kapena ofiira, koma pinki.
Let's add a few columns to our data:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:
- the database from which we took the orders,
- the ID of our loading session (it will be different for each task),
- a hash of the source and the order ID, so that in the final database (where everything is poured into a single table) we have a unique order ID.
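To make the idea behind the third column concrete, here is a rough sketch of deriving one key from the source name and the order ID. It deliberately swaps pandas' hash_pandas_object for hashlib from the standard library, so the values will not match what the code above produces; the function name order_hash and the sample source names are hypothetical:

```python
import hashlib


def order_hash(etl_source: str, order_id: int) -> int:
    """Derive a stable signed 64-bit key from the source name and the order ID."""
    # The separator keeps ('a', 11) and ('a1', 1) from producing the same input.
    payload = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.blake2b(payload, digest_size=8).digest()
    # signed=True keeps the value inside a signed 64-bit integer range.
    return int.from_bytes(digest, byteorder='big', signed=True)


key_a = order_hash('moscow', 42)  # hypothetical source schema
key_b = order_hash('kazan', 42)   # same order ID, different source
```

The same order ID coming from two sources gets two different keys, which is the whole point once all sources land in one table.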
One more step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is via CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# Join the column names explicitly: interpolating the list itself
# would put Python's "['id', ...]" repr into the SQL.
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

- We make a special receiver, StringIO.
- pandas kindly lays out our DataFrame as CSV lines.
- We open a connection to our beloved Vertica with a hook.
- And now, with the help of copy(), we send our data straight to Vertica!
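To see what actually travels over the wire, the export step can be reproduced without pandas or Vertica. This is only a sketch: it uses the standard csv module in place of DataFrame.to_csv, and the column names and sample rows are made up for illustration:

```python
import csv
from io import StringIO


def rows_to_copy_buffer(rows):
    """Serialise rows into an in-memory, pipe-delimited CSV buffer."""
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                        doublequote=False, escapechar='\\', lineterminator='\n')
    for row in rows:
        # Missing values become the NUL marker that the COPY statement maps to NULL.
        writer.writerow(['NUL' if value is None else value for value in row])
    buffer.seek(0)  # rewind so the consumer reads from the start
    return buffer


columns = ['id', 'type', 'data']            # hypothetical column names
rows = [(1, 3, 'plain'), (2, None, 'a|b')]  # hypothetical sample rows
buffer = rows_to_copy_buffer(rows)
column_list = ', '.join(columns)            # what goes inside COPY table(...)
```

A value that contains the delimiter gets wrapped in double quotes, which is why the COPY statement declares ENCLOSED '"'.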
From the driver we take the number of rows that were loaded, and tell the session manager that everything is OK:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.
In production we create the target table by hand. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
        id INT,
        start_time TIMESTAMP,
        end_time TIMESTAMP,
        type INT,
        data VARCHAR(32),
        etl_source VARCHAR(200),
        etl_id INT,
        hash_id INT PRIMARY KEY
    );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator() I create the database schema and the table (if they do not exist yet, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    # Every load task waits for the target table to exist.
    create_table >> load

Summing up
"Well then," said the little mouse, "are you convinced now
that I am the scariest beast in this forest?"
Julia Donaldson, The Gruffalo
I think that if my colleagues and I held a competition to see who could build and launch an ETL process from scratch faster, they with their SSIS and a mouse and I with Airflow... Wow, I think you will agree that I would beat them on all fronts!
On a slightly more serious note, Apache Airflow, by describing processes as program code, has made my work much more convenient and enjoyable.
Its unlimited extensibility, both in terms of plugins and in its predisposition to scaling, lets you use Airflow in almost any area: whether in the full cycle of collecting, preparing and processing data, or in launching rockets (to Mars, of course).
The final part: reference and information
Rakes we have collected for you
- start_date. Yes, this is already a local meme. Everyone stumbles over the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow at the earliest.

start_date = datetime(2020, 7, 7, 0, 1, 2)

  And there are no more problems.
  There is another runtime error associated with it: Task is missing the start_date parameter, which most often means that you forgot to bind the operator to the DAG.
- Everything on one machine. Yes, the databases (Airflow's own and the one for our wrapper), the web server, the scheduler, and the workers. And it even worked. But over time the number of tasks for these services grew, and when PostgreSQL began to answer an index lookup in 20 s instead of 5 ms, we took it and moved it away.
- LocalExecutor. Yes, we are still sitting on it, and we have already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand by at least one worker, and we will have to make an effort to move to CeleryExecutor. And since you can work with it on a single machine too, nothing stops you from using Celery even on a server that "of course will never go into production, honestly!"
- Not using the built-in tools:
  - Connections for storing service credentials,
  - SLA Misses for reacting to tasks that did not finish on time,
  - XCom for exchanging metadata (I said metadata!) between the tasks of a DAG.
- Abusing mail. Well, what can I say? Alerts were set up for every retry of every failed task. Now my work Gmail holds more than 90k emails from Airflow, and the web mail interface refuses to select and delete more than 100 at a time.
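The start_date rake is easier to remember with the arithmetic written out. A small sketch, assuming the interval-based scheduling of Airflow 1.x, where a run covering a period is triggered only once that period has ended; the helper name first_run_moment is hypothetical:

```python
from datetime import datetime, timedelta


def first_run_moment(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """Earliest moment a DAG run can be triggered under interval-based scheduling."""
    # The first run covers [start_date, start_date + interval) and is
    # triggered only after that whole interval has elapsed.
    return start_date + schedule_interval


start = datetime(2020, 7, 7, 0, 1, 2)
first_run = first_run_moment(start, timedelta(days=1))
```

With a daily interval, a start_date of "today" therefore means nothing runs until tomorrow at the earliest.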
Tools for even more automation
So that we can work even more with our heads and not with our hands, Airflow has prepared the following for us:
- REST API: it still has Experimental status, which does not prevent it from working. With it you can not only get information about DAGs and tasks, but also stop or start a DAG, and create a DAG Run or a pool.
- CLI: many tools are available through the command line that are not merely inconvenient to use through the WebUI but are absent from it altogether. For example:
  - backfill is needed to re-run task instances. For example, analysts come and say: "Comrade, you have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you go, just like that:

    airflow backfill -s '2020-01-01' -e '2020-01-13' orders

  - Database maintenance: initdb, resetdb, upgradedb, checkdb.
  - run, which lets you run a single task instance and even ignore all of its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
  - test does much the same, except that it also writes nothing to the database.
  - connections allows mass creation of connections from the shell.
- Python API: a rather hardcore way of interacting, intended for plugins rather than for poking around in it by hand. But who is going to stop us from going to /home/airflow/dags, running ipython and starting to mess around? You can, for example, export all connections with this code:

from airflow import settings
from airflow.models import Connection

fields = 'conn_id conn_type host port schema login password extra'.split()

session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)

- Connecting to the Airflow metadatabase. I do not recommend writing to it, but pulling task states for various specific metrics can be much faster and easier than through any of the APIs.
  Let's say that not all of our tasks are idempotent, and sometimes they can fall over, and that is normal. But several failures in a row are already suspicious, and it would be worth checking.
  Careful, SQL!

WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
            THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE
        state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
    task_id,
    dag_id
HAVING
    count(last_fail_seq) > 0
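If the double row_number() trick looks cryptic: a failure is counted only while its rank among all recent runs equals its rank among the failed ones, that is, while nothing successful stands between it and the newest run. The same logic in plain Python might look like the sketch below; the task names and state histories are invented for illustration, and this is my reading of the query rather than anything taken from Airflow itself:

```python
from itertools import takewhile

UNSUCCESSFUL = {'failed', 'up_for_retry'}


def latest_failure_streak(states_newest_first):
    """Count how many of the most recent runs ended unsuccessfully in a row."""
    # takewhile stops at the first state that is not a failure,
    # so older failures hidden behind a success are not counted.
    return sum(1 for _ in takewhile(lambda s: s in UNSUCCESSFUL, states_newest_first))


history = {  # hypothetical task states, newest first
    'orders.moscow': ['failed', 'up_for_retry', 'success', 'failed'],
    'orders.kazan': ['success', 'failed'],
}
streaks = {task: latest_failure_streak(states) for task, states in history.items()}
suspicious = {task: n for task, n in streaks.items() if n > 0}
```

Only tasks whose newest runs are failing show up in suspicious, which mirrors the HAVING clause of the query.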
Source: www.habr.com




