Kia ora, ko Dmitry Logvinenko ahau - Kaihanga Raraunga o te Tari Tatari o te roopu kamupene Vezet.
Ka korero ahau ki a koe mo tetahi taputapu whakamiharo mo te whakawhanake i nga tikanga ETL - Apache Airflow. Engari ko te Airflow he tino mohio me te maha o nga waahanga me ata titiro koe ahakoa kaore koe e uru ki nga rerenga raraunga, engari me whakarewahia e koe nga tikanga me te aro turuki i o raatau mahi.
Ae, kaore au e korero noa, engari ka whakaatu ano hoki: he maha nga tohu o te kaupapa, nga whakaahua me nga taunakitanga.

Ko nga mea ka kitea e koe ina google koe i te kupu Airflow / Wikimedia Commons
Ripanga o nga ihirangi
Whakataki
He rite tonu a Apache Airflow ki a Django:
- tuhia ki te python
- he papa whakahaere pai,
- whakawhänui ake ake
- he pai ake, he mea hanga mo nga kaupapa rereke, ara (i tuhia ki mua i te kata):
- te whakahaere me te aro turuki i nga mahi i runga i te maha o nga mihini (he maha nga herewi / Kubernetes me to hinengaro ka whakaae koe)
- me te whakaputa rerengamahi hihiri mai i te tino ngawari ki te tuhi me te mohio ki te waehere Python
- me te kaha ki te hono i nga papaa raraunga me nga API ki a raatau ano ma te whakamahi i nga waahanga kua rite me nga taputapu hanga-whare (he tino ngawari).
Ka whakamahia e matou a Apache Airflow penei:
- ka kohia e matou nga raraunga mai i nga momo puna (he maha nga waahanga SQL Server me PostgreSQL, nga momo API me nga inenga tono, tae noa ki te 1C) i DWH me ODS (kei a matou a Vertica me Clickhouse).
- pehea te matatau
cron, ka timata i nga tukanga whakakotahi raraunga i runga i te ODS, me te aroturuki hoki i to raatau tiaki.
Tae noa ki tenei wa, i hipokina o matou hiahia e tetahi tūmau iti me te 32 cores me te 50 GB o te RAM. I roto i te Airflow, ka mahi tenei:
- atu 200 tau (ko nga rerengamahi, i whakakiia e matou nga mahi),
- i ia toharite 70 nga mahi,
- ka timata tenei pai (me te toharite) kotahi te haora.
A mo te pehea i whakawhānuihia ai, ka tuhia e ahau ki raro, engari inaianei me tautuhi te raruraru-über ka whakatauhia e tatou:
E toru nga punanga SQL Servers, e 50 nga papaaarangi o ia waahanga - he tauira o te kaupapa kotahi, he rite tonu te hanganga (tata ki nga waahi katoa, mua-ha-ha), ko te tikanga he ripanga Whakataua kei ia tangata (waimarie, he tepu me tera. Ka taea te pana te ingoa ki tetahi umanga). Ka tangohia e matou nga raraunga ma te taapiri i nga mara ratonga (tumau puna, puna raraunga, ID mahi ETL) ka maka ki roto, kii, Vertica.
Haere tatou!
Ko te waahanga matua, mahi (me te iti o te kaupapa)
He aha tatou (me koe)
I te wa e nui ana nga rakau, he maamaa ahau SQL-schik i roto i tetahi hokohoko Ruhia, i tinihangatia e matou nga tukanga ETL aka rere raraunga ma te whakamahi i nga taputapu e rua e waatea ana ki a maatau:
- Pokapū Hiko Informatica - he punaha tino horahanga, tino whai hua, me ona ake taputapu, tana ake whakaputanga. I whakamahia e ahau te Atua kia kore e 1% o ona kaha. He aha? Kaati, ko te tuatahi, ko tenei atanga, mai i te tau 380, ka pehi a hinengaro ki a tatou. Tuarua, kua hangaia tenei taputapu mo nga mahi tino ataahua, te whakamahi ano i nga waahanga riri me etahi atu mahi tino-nui-ahumahi-tinihanga. Mo te meka he utu, penei i te parirau o te Airbus AXNUMX / tau, kaore matou e kii i tetahi mea.
Kia tupato, ka taea e te whakaahua te whara i nga tangata kei raro iho i te 30 tau

- Tūmau Whakauru SQL Server - i whakamahia e matou tenei hoa i roto i a maatau rerenga kaupapa-roto. Ae ra: kua whakamahia kētia e matou a SQL Server, a he mea poauau te kore e whakamahi i ana taputapu ETL. He pai nga mea katoa: he ataahua nga atanga e rua, me nga korero o te ahunga whakamua ... Engari ehara tenei i te mea e pai ana matou ki nga hua rorohiko, aue, ehara mo tenei. Putanga reira
dtsx(ko te XML me nga pona kua riwhi i te tiaki) ka taea, engari he aha te take? Me pehea te hanga i tetahi kete mahi ka toia nga rau ripanga mai i tetahi tūmau ki tetahi atu? Ae, he aha te rau, ka taka to maihao tohu mai i nga wahanga e rua tekau, ka paato i te paatene kiore. Engari he ahua ahua ake:
I rapua e matou he huarahi ki waho. Te take ahakoa tata i tae mai ki tetahi kaihanga putea SSIS kua tuhia e ia ake ...
…katahi ka kitea he mahi hou i ahau. Na ka mauhia ahau e Apache Airflow.
I taku kitenga ko nga whakaahuatanga tukanga ETL he waehere Python ngawari, kaore au i kanikani mo te koa. Koinei te ahua o te whakaputanga me te rereke o nga rerenga raraunga, me te ringihia nga teepu me te hanganga kotahi mai i nga rau o nga papaa raraunga ki te whaainga kotahi ka waiho hei take mo te waehere Python i roto i te kotahi me te hawhe, e rua ranei nga mata 13 ".
Huihui i te tautau
Kaua e whakarite i tetahi whare wananga katoa, kaua hoki e korero mo nga mea tino kitea i konei, penei i te whakauru i te Rererangi, to papaarangi kua tohua, Celery me etahi atu keehi e whakaahuatia ana i roto i nga tauranga.
Kia taea ai e tatou te timata tonu i nga whakamatautau, i tuhi ahau docker-compose.yml kei roto:
- Kia piki ake airflow: Kaihōtaka, Tukutuku. Ka huri ano te Puawai ki reira ki te aro turuki i nga mahi herewi (na te mea kua panaia ki roto
apache/airflow:1.10.10-python3.7, engari kaore matou e whakaaro) - PostgreSQL, ka tuhia e Airflow ana korero ratonga (raraunga kairangi, tatauranga mahi, me etahi atu), ka tohuhia e te Celery nga mahi kua oti;
- Redis, ka mahi hei kaihokohoko mahi mo te Celery;
- Kaimahi herewi, ka uru ki te mahi tika i nga mahi.
- Ki te kōpaki
./dagska taapirihia o maatau konae me te whakamaarama o dags. Ka kohia i runga i te rere, no reira kaore he take ki te huri i te puranga katoa i muri i ia tihe.
I etahi waahi, kaore i tino whakaatuhia te waehere i roto i nga tauira (kia kore ai e pakaru te tuhinga), engari i tetahi waahi ka whakarereketia i roto i te tukanga. Ka kitea nga tauira waehere mahi oti i roto i te putunga .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerTuhipoka:
- I roto i te huihuinga o te hanganga, i whakawhirinaki ahau ki te ahua rongonui - kia mohio koe ki te tirotiro. Kaore pea koe e hiahia ki tetahi atu mea i roto i to oranga.
- Ko nga tautuhinga Rererangi katoa e waatea ana ehara i te mea anake
airflow.cfg, engari na roto i nga taurangi taiao (he mihi ki nga kaiwhakawhanake), i kino ahau ki te whakamahi. - Ko te tikanga, ehara i te mea kua rite: Kaore au i te tuku i nga ngakau ki runga i nga ipu, kaore au i raru ki te haumaru. Engari i mahia e ahau te iti rawa e tika ana mo o maatau whakamatautau.
- Kia mahara ko:
- Me uru te kōpaki dag ki te kaihōtaka me nga kaimahi.
- Ka pa ano ki nga whare pukapuka tuatoru katoa - me whakauru katoa ki runga i nga miihini me te raarangi me nga kaimahi.
Inaianei he ngawari noa:
$ docker-compose up --scale worker=3Ka ara ake nga mea katoa, ka taea e koe te titiro ki nga hononga tukutuku:
- Whakawhiti Air
- Puawai:
Ngā ariā taketake
Mena kaore koe i mohio ki tetahi mea i roto i enei "dags", koinei te papakupu poto:
- Kaihoahoa - te matua keke nui i roto i te Airflow, e whakahaere ana i nga mahi a nga karetao, ehara i te tangata: ka aro turuki i te raarangi, ka whakahou i nga ra, ka whakarewahia nga mahi.
I te nuinga o nga wa, i nga waahanga tawhito, he raruraru ki a ia mo te mahara (kao, ehara i te amnesia, engari he turuturu) a ka noho tonu te tawhā tuku iho ki nga whirihora.
run_duration— tona wa whakaara ano. Inaianei kua pai nga mea katoa. - DAG (aka "dag") - "whakahaere kauwhata acyclic", engari ko te whakamaarama penei he iti noa nga taangata, engari he ipu mo nga mahi e mahi tahi ana (tirohia ki raro) he taapiri ranei mo te Package in SSIS me te Rerengamahi i Informatica .
I tua atu i nga dags, tera pea he subdags, engari kaore pea e tae atu ki a raatau.
- Rere DAG - te ra kua tohua, kua tohua ki a ia ake
execution_date. Ka taea e Dagrans o taua ra te mahi whakarara (mehemea kua whakatauhia e koe o mahi, o te akoranga). - Tohutūmahi he waahanga waehere te kawenga mo te mahi i tetahi mahi motuhake. E toru nga momo kaiwhakahaere:
- mahirite to tatou tino pai
PythonOperator, ka taea te mahi i tetahi (mana) waehere Python; - whakawhiti, e kawe ana i nga raraunga mai i tetahi waahi ki tetahi waahi, penei,
MsSqlToHiveTransfer; - pūoko i tetahi atu taha, ka taea e koe te urupare, te whakaroa ranei i te mahi o te ra kia puta ra ano tetahi huihuinga.
HttpSensorka taea te toia te pito mutunga kua tohua, a ka tatari ana te whakautu e hiahiatia ana, timata te whakawhitiGoogleCloudStorageToS3Operator. Ka patai te hinengaro rapu: “he aha? I muri i nga mea katoa, ka taea e koe te mahi tukurua tika i roto i te kaiwhakahaere! Na, kia kore ai e purua te puna o nga mahi me nga kaiwhakahaere kua whakatarewahia. Ka tīmata te pūoko, ka taki, ka mate i mua i te nganatanga e whai ake nei.
- mahirite to tatou tino pai
- Tūmahi - Ko nga kaiwhakahaere kua whakahuahia, ahakoa he aha te momo, ka piri ki te ra ka whakatairangahia ki te taumata o te mahi.
- tauira mahi - i te wa i whakatauhia e te kaiwhakatakoto mahere kua tae ki te wa ki te tuku mahi ki te whawhai ki nga kaihaka-kaimahi (i runga tonu i te waahi, mena ka whakamahia e matou
LocalExecutorranei ki te pona mamao i roto i te take oCeleryExecutor), ka tautapahia he horopaki ki a ratou (arā, he huinga taurangi - tawhā mahi), ka whakawhanuihia nga tauira tono, patai ranei, ka kohia.
Ka whakaputa mahi maatau
Tuatahi, me whakaatu i te kaupapa whanui o to tatou doug, katahi ka rukuhia nga korero mo nga korero, na te mea ka whakamahia e matou etahi otinga kore-iti.
Na, i roto i tona ahua ngawari, ka penei te ahua o te ra:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Kia mohio tatou:
- Tuatahi, kawemai tatou i nga libs e tika ana me tetahi atu mea;
sql_server_dsKoList[namedtuple[str, str]]me nga ingoa o nga hononga mai i Airflow Connections me nga papaunga raraunga ka tangohia e matou to maatau pereti;dag- te panui o to tatou ra, me uru maiglobals(), ki te kore e kore e kitea e Airflow. Me kii ano a Doug:- ko wai tona ingoa
orders- ka puta tenei ingoa ki te atanga tukutuku, - ka mahi ia i waenganui po i te waru o Hurae,
- me oma, tata ia 6 haora (mo nga tangata uaua i konei hei utu mo
timedelta()whakaaetiacron-raina0 0 0/6 ? * * *, mo te iti ake te hauhautanga - he korero penei@daily);
- ko wai tona ingoa
workflow()ka mahi i te mahi matua, engari kaua inaianei. Mo tenei wa, ka maka noa to tatou horopaki ki te raarangi.- Na inaianei ko te makutu ngawari o te hanga mahi:
- rere tatou i roto i to tatou puna;
- arawhiti
PythonOperator, ka mahi i ta tatou poheheworkflow(). Kaua e wareware ki te tohu i tetahi ingoa ahurei (i roto i te ra) o te mahi me te here i te ra ano. Karaprovide_contexti roto i te tahuri, ka ringihia atu tohenga ki roto i te mahi, e āta kohikohi tatou te whakamahi**context.
Mo tenei wa, heoi ano. He aha ta matou i whiwhi:
- Da hou i roto i te atanga tukutuku,
- kotahi me te haurua rau nga mahi ka mahia i roto i te whakarara (mehemea ka whakaaetia e te Rererangi, nga tautuhinga Celery me te kaha o te tūmau).
Kaati, tata ka mau.

Ma wai e whakauru nga whakawhirinakitanga?
Hei whakamaarama i tenei mea katoa, i wiri ahau docker-compose.yml tukatuka requirements.txt i runga i nga pona katoa.
Inaianei kua ngaro:

Ko nga tapawha hina he waahi mahi i tukatukahia e te kaihōtaka.
Ka tatari tatou, ka mau nga mahi e nga kaimahi:

Ko nga mea kaariki, ko te tikanga, kua tutuki pai a raatau mahi. Ko nga Whero kaore i tino angitu.
Ma te ara, karekau he kōpaki kei runga i ta maatau hua
./dags, karekau he tukutahitanga i waenga i nga miihini - kei roto katoa nga dagsgiti runga i ta maatau Gitlab, a ka tohatohahia e Gitlab CI nga whakahoutanga ki nga miihini i te wa e hanumi anamaster.
He iti mo te Puawai
I te wa e patupatu ana nga kaimahi i a tatou pacifiers, kia maumahara tatou ki tetahi atu taputapu hei whakaatu mai i tetahi mea - Puawai.
Ko te wharangi tuatahi me nga korero whakarāpopototanga mō ngā kōpuku kaimahi:

Ko te wharangi tino kaha me nga mahi i haere ki te mahi:

Ko te wharangi tino hoha me te mana o to maatau kaihokohoko:

Ko te wharangi kanapa me nga kauwhata mana mahi me te waa mahi:

Ka utaina e matou nga mea kua kore e utaina
Na, kua mahi nga mahi katoa, ka taea e koe te kawe atu i nga taotu.

A he maha nga taotu - mo tetahi take, mo tetahi atu. I roto i te take o te whakamahi tika o Airflow, ko enei tapawha e tohu ana kaore nga raraunga i tae mai.
Me maataki koe i te raarangi me te whakaara ano i nga waa mahi kua hinga.
Ma te panui i tetahi tapawha, ka kite tatou i nga mahi e waatea ana ki a maatau:

Ka taea e koe te tango me te Whakakore i te hunga kua hinga. Arā, ka warewarehia kua rahua tetahi mea ki reira, ka haere ano te mahi tauira ki te kaitakataka.

E marama ana ko te mahi i tenei me te kiore me nga tapawha whero katoa kaore i te tino tangata - ehara tenei i te mea e tumanakohia ana e Airflow. Ko te tikanga, kei a tatou nga patu patu tangata: Browse/Task Instances

Me kowhiria nga mea katoa i te wa kotahi ka tautuhi ano ki te kore, pawhiria te mea tika:

Whai muri i te horoi, he penei te ahua o a maatau taxi (kei te tatari kee te kaiwhakariterite ki te whakarite i a raatau):

Hononga, matau me etahi atu taurangi
Kua tae ki te wa ki te titiro ki te DAG e whai ake nei, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Kua mahia e te katoa he whakahou purongo? Koia ano tenei: he rarangi o nga puna korero mai i hea ka whiwhi raraunga; he rarangi kei hea hei whakatakoto; kaua e wareware ki te whakatangi i te wa i pa ai nga mea katoa, i pakaru ranei (kaore tenei mo tatou, kaore).
Ka huri ano tatou i te konae ka titiro ki nga mea kerekere hou:
from commons.operators import TelegramBotSendMessage- kaore he mea e arai i a matou ki te hanga i a matou ake kaiwhakahaere, i whakamahia e matou ma te hanga takai iti mo te tuku karere ki te Wewete. (Ka korerohia e matou mo tenei kaiwhakahaere i raro nei);default_args={}- ka taea e dag te toha i nga tohenga rite ki ona kaiwhakahaere katoa;to='{{ var.value.all_the_kings_men }}'- maratoe kore matou e whai tohu pakeke, engari i hangaia ma te whakamahi i a Jinja me tetahi taurangi me te raarangi imeera, ka ata whakauruhia e au.Admin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— tikanga mo te tiimata i te kaiwhakahaere. I roto i to maatau, ka rere te reta ki nga rangatira mena kua mahi nga whakawhirinaki katoa angitu;tg_bot_conn_id='tg_main'- tohengaconn_idwhakaae ki nga TT hononga ka hangaia e matouAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- ka rere atu nga karere i roto i Telegram mena he mahi kua hinga;task_concurrency=1- ka aukatihia e matou te whakarewanga o nga wa maha o nga mahi kotahi. Ki te kore, ka whiwhi tatou i te whakarewanga tukutahi o etahiVerticaOperator(te titiro ki tetahi tepu);report_update >> [email, tg]- nga mea katoaVerticaOperatorwhakakotahi ki te tuku reta me nga karere, penei:

Engari i te mea he rereke nga tikanga whakarewanga o nga kaiwhakatakoto korero, kotahi anake ka mahi. I te Tirohanga Rakau, he iti ake te ahua o nga mea katoa:

Ka korero ahau i etahi kupu mo tonotono me o ratou hoa- taurangi.
Ko nga tonotono he kaiwhakatakoto waahi a Jinja ka taea te whakakapi i nga momo korero whaihua ki nga tohenga a te kaiwhakahaere. Hei tauira, penei:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} ka whakawhänui atu ki nga ihirangi o te taurangi horopaki execution_date i roto i te whakatakotoranga YYYY-MM-DD: 2020-07-14. Ko te mea pai rawa atu ko nga taurangi horopaki ka whana ki tetahi tauira mahi motuhake (he tapawha i te Tirohanga Rakau), a ka timata ano, ka whakawhānuihia nga waahi ki nga uara ano.
Ka taea te tiro i nga uara kua tohua ma te whakamahi i te paatene Rendered ki ia tauira mahi. He penei te mahi me te tuku reta:

Na i te mahi ki te tuku karere:

Kei konei te rarangi katoa o nga tonotono whakauru mo te putanga hou e waatea ana:
I tua atu, ma te awhina o nga monomai, ka taea e tatou te whakaatu i o tatou ake tonotono, engari he korero ano tera.
I tua atu i nga mea kua tautuhia, ka taea e taatau te whakakapi i nga uara o a maatau taurangi (kua whakamahia e au i te waehere i runga ake nei). Kia hanga tatou ki roto Admin/Variables e rua nga mea:

Ko nga mea katoa ka taea e koe te whakamahi:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Ko te uara he scalar, he JSON hoki. Mena mo JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}whakamahia noa te ara ki te kī e hiahiatia ana: {{ var.json.bot_config.bot.token }}.
Ka korero ahau i tetahi kupu ka whakaatu i tetahi whakaahua mo hononga. He mea timatanga nga mea katoa i konei: kei te wharangi Admin/Connections ka hangaia he hononga, taapiri i a maatau takiuru / kupuhipa me etahi atu taapiri motuhake ki reira. Pēnei:

Ka taea te whakamunatia nga kupuhipa (he nui ake i te taunoa), ka taea ranei e koe te whakarere i te momo hononga (penei i ahau mo tg_main) - Ko te meka ko te rarangi o nga momo he maataki i roto i nga tauira Airflow me te kore e taea te whakawhanui me te kore e uru ki nga waehere puna (mehemea karekau au i google i tetahi mea, tena koa whakatikahia ahau), engari kaore he mea e aukati i a maatau ki te whiwhi whiwhinga noa. ingoa.
Ka taea hoki e koe te hanga hononga maha me te ingoa kotahi: i tenei keehi, ko te tikanga BaseHook.get_connection(), e whiwhi hononga ki a tatou ma te ingoa, ka hoatu matapōkeretia mai i te maha o nga ingoa (he pai ake te hanga i a Round Robin, engari me waiho ma te hinengaro o nga kaihanga Airflow).
Ko nga Taurangi me nga Hononga he taputapu hauhautanga, engari he mea nui kia kaua e ngaro te toenga: ko nga waahanga o au rerenga ka penapenahia e koe i roto i te waehere ake, me nga waahanga ka hoatu e koe ki te Airflow hei rokiroki. I tetahi taha, he watea ki te whakarereke tere i te uara, hei tauira, he pouaka mēra, ma te UI. I tetahi atu taha, he hokinga tonu tenei ki te paato kiore, i hiahia matou (I) ki te whakakore atu.
Ko te mahi me nga hononga tetahi o nga mahi matau. I te nuinga o te waa, ko nga matau Airflow he tohu mo te hono atu ki nga ratonga tuatoru me nga whare pukapuka. Hei tauira, JiraHook ka whakatuwheratia he kiritaki mo tatou ki te taunekeneke ki a Jira (ka taea e koe te neke i nga mahi ki muri me muri), me te awhina o SambaHook ka taea e koe te pana i tetahi konae rohe ki smb-tohu.
Paring i te kaiwhakahaere ritenga
Na ka tata matou ki te titiro ki te ahua o te hanga TelegramBotSendMessage
Waehere commons/operators.py me te kaiwhakahaere pono:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)I konei, penei i nga mea katoa o Airflow, he tino ngawari nga mea katoa:
- I tuku iho mai
BaseOperator, e whakatinana ana i etahi mea motuhake Rererangi (tirohia to waatea) - Nga mara kua whakapuakina
template_fields, ka titiro a Jinja mo nga tonotono hei tukatuka. - Kua whakaritea nga tohenga tika mo
__init__(), tautuhia nga taunoa ki nga waahi e tika ana. - Kare hoki matou i wareware ki te timatanga o te tipuna.
- I whakatuwherahia te matau e rite ana
TelegramBotHooki whiwhi taonga kiritaki mai i a ia. - Aratuka whakakorea (tautuhi ano).
BaseOperator.execute(), ka huri a Airfow ka tae mai te wa ki te whakarewa i te kaiwhakahaere - kei roto ka whakatinanahia e matou te mahi matua, ka wareware ki te takiuru. (Ka takiuru matou, na te ara, tika ki rotostdoutиstderr- Ka haukoti te rere o te hau i nga mea katoa, ka takai ataahua, ka pirau i nga waahi e tika ana.)
Kia kite tatou he aha ta tatou commons/hooks.py. Ko te wahanga tuatahi o te konae, me te matau ano:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientKaore au i te mohio he aha te whakamarama i konei, ka tuhi noa ahau i nga mea nui:
- Ka whakawhiwhia tatou, whakaarohia nga tautohetohe - i te nuinga o te waa ka kotahi:
conn_id; - Ko nga tikanga paerewa: I whakawhäiti ahau i ahau
get_conn(), ka whiwhi ahau i nga tawhā hononga ma te ingoa me te tiki noa i te waahangaextra(he mara JSON tenei), i tuhia e au (e ai ki aku tohutohu!) te tohu karetao Telegram:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Ka hanga e ahau he tauira o to maatau
TelegramBot, ka hoatu he tohu motuhake.
Heoi ano. Ka taea e koe te tiki kaihoko mai i te matau ma te whakamahi TelegramBotHook().clent ranei TelegramBotHook().get_conn().
A ko te waahanga tuarua o te konae, ka mahia e au he miihini miihini mo te Telegram REST API, kia kore ai e toia ano. mo tetahi tikanga sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Ko te huarahi tika ko te taapiri katoa:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- i roto i te monomai, hoatu ki roto i te putunga a te iwi, ka hoatu ki te Open Source.
I a matou e ako ana i enei mea katoa, i tutuki pai a matou whakahounga purongo me te tuku karere hapa ki ahau i te hongere. Ka tirohia e au mena kei te he...

I pakaru tetahi mea i roto i to maatau kuri! He teka ianei ko ta matou i tumanako ai? Tika!
Ka ringihia e koe?
Kei te whakaaro koe i ngaro ahau i tetahi mea? Te ahua nei i oati ia ki te whakawhiti raraunga mai i te SQL Server ki a Vertica, katahi ka tangohia e ia, ka neke atu i te kaupapa, ko te ware!
Ko tenei mahi nanakia he mea whakaaro noa, me whakamaarama noa e au etahi kupu mo koe. Inaianei ka taea e koe te haere atu.
Ko ta matou mahere ko tenei:
- Mahi ra
- Hangaia nga mahi
- Tirohia te ataahua o nga mea katoa
- Whakaritea nga tau wahanga hei whakaki
- Tikina nga raraunga mai i te SQL Server
- Whakauruhia nga raraunga ki Vertica
- Kohikohia nga tatauranga
Na, ki te whakatika i enei mea katoa, i hanga e ahau he taapiri iti ki to maatau docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyI reira ka whakaarahia e matou:
- Ko Vertica hei kaihautu
dwhme te nuinga o nga tautuhinga taunoa, - e toru nga waahanga o te SQL Server,
- Ka whakakiia e matou nga papaaarangi i nga waahanga o muri me etahi raraunga (kaore rawa e tirohia
mssql_init.py!)
Ka whakarewahia e matou nga mea pai katoa ma te awhina o tetahi whakahau uaua ake i te waa whakamutunga:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Ko nga mea i hangaia e ta maatau mahi ohorere, ka taea e koe te whakamahi i te taonga Data Profiling/Ad Hoc Query:

Ko te mea nui kia kaua e whakaatu ki nga kaitätari
whakamaarama Nga waahanga e pa ana ki a ETL Kare au, he iti noa nga mea katoa i reira: ka hangaia e matou he turanga, he tohu kei roto, ka takai i nga mea katoa ki te kaiwhakahaere horopaki, a inaianei ka mahia e matou:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passKua tae mai te wa kohikohia o maatau raraunga mai i a matou tepu kotahi me te hawhe rau. Me mahi tenei ma te awhina o nga raina tino koretake:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Ma te awhina o te matau ka riro mai i te Airflow
pymssql-hono - Me whakakapi he here i roto i te ahua o te ra ki te tono - ka maka ki roto i te mahi e te miihini tauira.
- Te whangai i ta matou tono
pandasma wai tatou e tikiDataFrame- ka whai hua ki a tatou a meake nei.
Kei te whakamahi ahau i te whakakapi
{dt}hei utu mo te tawhā tono%sehara i te mea he Pinocchio kino ahau, engari na te meapandase kore e taea te hapaipymssqla ka paheke te whakamutungaparams: Listahakoa e tino hiahia ana iatuple.
Kia mahara ano ko te kaiwhakawhanakepymssqlka whakatau kia kaua e tautoko i a ia, kua tae ki te wa ki te wehepyodbc.
Kia kite tatou he aha te Airflow i whakakii i nga tautohetohe o a maatau mahi:

Mena kaore he raraunga, karekau he take ki te haere tonu. Engari he mea ke ki te whakaaro kua angitu te whakakii. Engari ehara tenei i te he. A-ah-ah, me aha?! Na konei te aha:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException ka korero ki a Airflow kaore he hapa, engari ka pekehia e matou te mahi. Ko te atanga kaore he tapawha matomato, whero ranei, engari he mawhero.
Kia makahia a maatau raraunga tīwae maha:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Koinei:
- Te pātengi raraunga i tangohia e mātou ngā ota,
- ID o ta maatau huihuinga waipuke (ka rereke mo ia mahi),
- He hash mai i te puna me te ota ID - kia i roto i te pātengi raraunga whakamutunga (kei reira ka ringihia nga mea katoa ki te tepu kotahi) kei a matou he ID ota ahurei.
Ko te taahiraa whakamutunga ka mau tonu: ringihia nga mea katoa ki Vertica. A, ko te mea whakamiharo, ko tetahi o nga huarahi tino whakamiharo me te pai ki te mahi i tenei ko te CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Kei te hanga e matou he kaiwhiwhi motuhake
StringIO. pandasKa pai te tuku i a maatauDataFrameTuhinga o muaCSV-raina.- Me whakatuwhera he hononga ki ta tatou Vertica tino pai ki te matau.
- Na inaianei me te awhina
copy()tukuna to maatau raraunga ki Vertika!
Ka tangohia e matou mai i te taraiwa e hia nga rarangi kua whakakiia, ka korero ki te kaiwhakahaere o te huihuinga he pai nga mea katoa:
session.loaded_rows = cursor.rowcount
session.successful = TrueHeoi ano.
I runga i te hokonga, ka hangaia e matou te pereti whaainga ma te ringa. I konei ka whakaaetia e ahau he miihini iti:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Kei te whakamahi ahau
VerticaOperator()Ka waihangahia e au he mahere papaa raraunga me tetahi ripanga (mehemea karekau ano, ko te tikanga). Ko te mea nui ko te whakarite tika i nga whakawhirinaki:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadKohia ana
- Ae, - ka kii te kiore iti, - ehara, inaianei
Kei te tino mohio koe ko ahau te kararehe tino kino o te ngahere?
Julia Donaldson, Te Gruffalo
Ki taku whakaaro mena he whakataetae au me aku hoa mahi: ko wai ka tere te hanga me te whakarewa i tetahi mahi ETL mai i te wahanga: ko ratou me a ratou SSIS me te kiore me ahau me Airflow ... Na ka whakatauritea ano e matou te ngawari o te tiaki ... Aue, ki taku whakaaro ka whakaae koe ka whiua e au ki nga taha katoa!
Mena he iti ake te whakaaro, katahi a Apache Airflow - ma te whakaahua i nga tukanga i roto i te ahua o te waehere papatono - i mahi taku mahi nui he pai ake, he ngahau hoki.
Ko tana toronga mutunga kore, e rua mo te mono-mai me te aro nui ki te tauineine, ka whai waahi koe ki te whakamahi Airflow i nga waahi katoa: ahakoa i roto i te huringa katoa o te kohi, te whakarite me te tukatuka raraunga, tae noa ki te whakarewatanga o nga toka (ki Mars, o akoranga).
He wahanga whakamutunga, he tohutoro me nga korero
Ko te rake kua kohia e matou mo koe
start_date. Ae, he meme o te rohe tenei. Ma te tohenga matua a Dougstart_datehaere katoa. He poto, mena ka tohua e koe ki rotostart_daterā o nāianei, meschedule_interval- i tetahi ra, ka timata a DAG apopo kaore i mua.start_date = datetime(2020, 7, 7, 0, 1, 2)A kore ake he raruraru.
He hapa wa whakahaere e hono ana ki tera:
Task is missing the start_date parameter, e tohu ana kua wareware koe ki te here ki te kaiwhakahaere dag.- Katoa i runga i te miihini kotahi. Ae, me nga turanga (Airflow ake me to tatou paninga), me te tūmau tukutuku, me te kaihōtaka, me nga kaimahi. A i whai hua ano. Engari i te roanga o te wa, ka piki ake te maha o nga mahi mo nga ratonga, a, i te wa i timata a PostgreSQL ki te whakautu ki te taurangi i roto i te 20 s hei utu mo te 5 ms, ka tangohia e matou, ka haria atu.
- LocalExecutor. Ae, kei te noho tonu tatou i runga, kua tae noa mai ki te pito o te rire. Kua ranea a LocalExecutor mo matou i tenei wa, engari kua tae ki te wa ki te whakawhānui me tetahi kaimahi, me whakapau kaha ki te neke ki CeleryExecutor. A, i te mea ka taea e koe te mahi ki runga i tetahi miihini, kaore he mea e aukati i a koe ki te whakamahi i te Celery ahakoa i runga i te kaimau, "ko te tikanga, kaore e uru ki te mahi, pono!"
- Whakamahi kore taputapu hanga-i roto:
- hononga ki te penapena i nga tohu ratonga,
- Ka ngaro a SLA ki te whakautu ki nga mahi kaore i tutuki i te waa,
- xcom mo te whakawhiti metadata (i kii ahau metararaunga!) i waenga i nga mahi dag.
- Tukino Mēra. Kaati, he aha taku korero? I whakaritea nga matohi mo nga tukuruatanga katoa o nga mahi kua hinga. Inaianei kei aku mahi Gmail he >90k nga imeera mai i Airflow, karekau te ngutu mēra tukutuku ki te tango me te muku neke atu i te 100 i te wa kotahi.
Ētahi atu mahanga:
He taputapu aunoa
Kia kaha ake ai te mahi ma o tatou mahunga, kaua ma o tatou ringaringa, na Airflow i whakarite ma tatou tenei:
- - kei a ia tonu te mana o te Whakamatau, e kore e aukati i a ia ki te mahi. Ma tenei, kaore e taea e koe anake te tiki korero mo nga ra me nga mahi, engari ka mutu / tiimata he ra, hanga he DAG Run, he poka wai ranei.
- - he maha nga taputapu e waatea ana ma te raina whakahau ehara i te mea ngawari ki te whakamahi ma te WebUI, engari kei te ngaro noa. Hei tauira:
backfille hiahiatia ana ki te whakaara ano i nga tauira mahi.
Hei tauira, ka haere mai nga kaitirotiro ka kii: "A ko koe, e hoa, he poauau nga korero mai i te Hanuere 1 ki te 13! Whakatika, whakatika, whakatika, whakatika!" A he hob koe:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Ratonga turanga:
initdb,resetdb,upgradedb,checkdb. run, e taea ai e koe te whakahaere i tetahi mahi tauira, me te piro i runga i nga whakawhirinakitanga katoa. I tua atu, ka taea e koe te whakahaere ma teLocalExecutor, ahakoa he kahui Herewi koe.- He rite tonu te mahi
test, anake ano hoki i roto i nga turanga kaore e tuhia. connectionska taea te hanga papatipu hononga mai i te anga.
- - he huarahi tino uaua mo te taunekeneke, he mea tika mo nga monomai, kaua e pupuhi ki roto me nga ringaringa iti. Engari ko wai hei aukati i a maatau ki te haere
/home/airflow/dags, rereipythona ka timata ki te pohehe? Ka taea e koe, hei tauira, te kaweake i nga hononga katoa me te waehere e whai ake nei:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Te hono atu ki te papaunga raraunga Rererangi. Kaore au e pai ki te tuhi ki a ia, engari ko te whiwhi i nga ahuatanga mahi mo nga momo inenga motuhake ka tere ake, ka ngawari ake i te whakamahi i tetahi o nga API.
Me kii tatou ehara i te mea he ngoikore nga mahi katoa, engari ka taka pea i etahi wa, he mea noa tenei. Engari he ruarua nga aukati kua whakapae kee, a he mea tika kia tirohia.
Kia tupato SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
tohutoro
Ae ra, ko nga hononga tuatahi tekau mai i te tukunga a Google ko nga mea kei roto i te kōpaki Airflow mai i aku tohu tohu.
- - o te akoranga, me timata tatou ki te tari. tuhinga, engari ko wai ka panui nga tohutohu?
- - Ae, panuihia nga taunakitanga mai i nga kaihanga.
- - te timatanga: te atanga kaiwhakamahi i nga pikitia
- - he pai te whakaahuatanga o nga kaupapa matua, mena (kua ohorere!) Kaore koe i mohio ki tetahi mea mai i ahau.
- - he aratohu poto mo te whakatu kahui Rererangi.
- - tata ki te taua tuhinga whakamere, haunga pea te ahua okawa, me te iti o nga tauira.
- — mo te mahi tahi me te Herewi.
- - e pa ana ki te koretake o nga mahi, te utaina ma te ID hei utu mo te ra, huringa, hanganga konae me etahi atu mea whakamere.
- - te herenga o nga mahi me te Ture Keu, i whakahua noa ahau i te paahitanga.
- - me pehea ki te hinga etahi "mahi rite ki te whakaaro" i roto i te kaihōtaka, utaina nga raraunga ngaro me te whakarite i nga mahi.
- — nga patai SQL whai hua ki te Airflow metadata.
- - he waahanga whai hua mo te hanga i tetahi pukoro ritenga.
- - he korero poto mo te hanga hanganga i runga i te AWS mo te Pūtaiao Raraunga.
- - nga hapa noa (kaore ano tetahi e panui i nga tohutohu).
- - ataata te tangata ki te pupuri i nga kupuhipa, ahakoa ka taea e koe te whakamahi Hononga.
- - te tuku whakamua DAG, te makanga horopaki i roto i nga mahi, ano mo nga whakawhirinakitanga, me te mokowhiti i nga mahi whakarewatanga.
- - mo te whakamahi
default argumentsиparamsi roto i nga tauira, me nga Taurangi me nga Hononga. - - he korero e pa ana ki te whakarite a te kaiwhakamahere mo te Airflow 2.0.
- - he tuhinga kua tawhito noa mo te tuku i ta tatou roopu ki roto
docker-compose. - - nga mahi hihiri ma te whakamahi i nga tauira me te tuku horopaki.
- - nga panui paerewa me nga tikanga ma te mēra me te Slack.
- - Nga mahi manga, tonotono me XCom.
Me nga hononga i whakamahia i roto i te tuhinga:
- - e waatea ana nga waahi hei whakamahi i nga tauira.
- — He hapa noa i te wa e hanga ana nga ra.
- -
docker-composemo te whakamatautau, te patuiro me etahi atu. - — He takai Python mo Telegram REST API.
Source: will.com




