Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.
I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you don't deal with data flows at all and simply need to launch processes periodically and monitor their execution.
And yes, I will not only tell but also show: there is a lot of code, screenshots and recommendations ahead.

What you usually see when you google the word Airflow / Wikimedia Commons
Introduction
Apache Airflow is just like Django:
- written in Python,
- has a great admin panel,
- is endlessly extensible,
- only better, and built for entirely different purposes, namely (as stated before the cut):
- running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience allow),
- with dynamic workflow generation from Python code that is very easy to write and understand,
- and the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which is extremely simple to do).
We use Apache Airflow like this:
- we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (for us these are Vertica and Clickhouse);
- as an advanced cron that launches the data consolidation processes on the ODS and also monitors their maintenance.
Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this runs:
- more than 200 DAGs (workflows proper, into which we stuffed the tasks),
- each with 70 tasks on average,
- and all this goodness starts (also on average) once an hour.
How we scaled up I'll describe below; for now, let's define the über-problem we are going to solve:
There are three source SQL Servers, each with 50 databases, which are instances of the same project. Accordingly, they share the same structure (almost everywhere, mua-ha-ha), which means each has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively throw it into, say, Vertica.
Let's go!
The main part, practical (and a little theoretical)
Why we (and you) need it
When the trees were tall and I was a simple SQL guy at a Russian retailer, we cranked out ETL processes, aka data flows, using the two tools available to us:
- Informatica PowerCenter: an extremely sprawling, extremely productive system with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? Well, first of all, that interface, from somewhere in the 2000s, weighed on us mentally. Second, this contraption is designed for extremely elaborate processes, furious component reuse and other very-important-enterprise tricks. As for the fact that it costs about as much as the wing of an Airbus A380 per year, we'll say nothing.
Beware, the screenshot may slightly hurt people under 30

- SQL Server Integration Services: we used this comrade for our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything about it is good: the interface is pretty, and so are the progress reports... But that's not why we love software products, oh no, not for that. We can version its dtsx (which is XML with nodes shuffled on every save), but what's the point? How about making a package of tasks that drags hundreds of tables from one server to another? Never mind a hundred: your index finger will fall off after twenty, clicking the mouse button. But it definitely looks more fashionable:
We certainly looked for ways out. It even almost came to a self-written SSIS package generator...
...and then a new job found me. And there Apache Airflow caught up with me.
When I found out that ETL process descriptions are plain Python code, I all but danced for joy. That is how data flows became versioned and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code that fits on one and a half or two 13" screens.
Assembling the cluster
Let's not turn this into a complete kindergarten and talk about entirely obvious things like installing Airflow, your database of choice, Celery and other matters described in the docs.
So that we can start experimenting right away, I sketched a docker-compose.yml in which:
- we actually bring up Airflow: Scheduler and Webserver. Flower will also be spinning there to monitor Celery tasks (because it is already bundled into apache/airflow:1.10.10-python3.7, and we don't mind);
- PostgreSQL, where Airflow will write its service information (scheduler data, execution statistics, etc.) and Celery will mark completed tasks;
- Redis, which will act as the task broker for Celery;
- Celery worker, which will do the actual execution of tasks.
- Into the ./dags folder we will put our files with DAG descriptions. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.
In some places the code in the examples is not shown in full (so as not to clutter the text), and in others it is modified along the way. Complete working code examples can be found in the repository.
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt
services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine
  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - ./db:/var/lib/postgresql/data
  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base
    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
    depends_on:
      - airflow-db
      - broker
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint initdb &&
      (/entrypoint webserver &) &&
      (/entrypoint flower &) &&
      /entrypoint scheduler"
    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080
  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base
    environment:
      <<: *airflow-config
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint worker"
    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:
- In assembling the compose file I relied heavily on a well-known image, so be sure to check it out. Maybe you won't need anything else in your life.
- All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I maliciously took advantage of.
- Naturally, it is not production-ready: I deliberately did not put heartbeats on the containers and did not bother with security. But I did the minimum suitable for our experiments.
- Note that:
  - The dag folder must be accessible to both the scheduler and the workers.
  - The same applies to all third-party libraries: they must be installed on the machines with the scheduler and the workers.
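The environment-variable convention used in the compose file can be sketched as follows. This is only an illustration: the helper name `airflow_env_name` is made up for this example, while the `AIRFLOW__{SECTION}__{KEY}` pattern is the one visible in the settings above.

```python
def airflow_env_name(section: str, key: str) -> str:
    """Build the environment variable name that overrides `key`
    in the `[section]` block of airflow.cfg (hypothetical helper)."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# [core] executor       ->  AIRFLOW__CORE__EXECUTOR
print(airflow_env_name("core", "executor"))
# [celery] broker_url   ->  AIRFLOW__CELERY__BROKER_URL
print(airflow_env_name("celery", "broker_url"))
```

Note the double underscores on both sides of the section name: they are what separates the section from the key.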
Well, now it's simple:
$ docker-compose up --scale worker=3
After everything has come up, you can look at the web interfaces:
- Airflow: port 8080 (the webserver);
- Flower: port 5555.
Basic concepts
If you didn't understand anything in all these "dags", here is a short glossary:
- Scheduler: the most important guy in Airflow, who makes sure that robots work hard, and not people: it watches the schedule, updates DAGs, launches tasks.
In general, in older versions it had memory problems (no, not amnesia, but leaks), and a legacy parameter, run_duration (its restart interval), even remained in the configs. But now everything is fine.
- DAG (aka "dag"): a "directed acyclic graph", but such a definition will tell few people anything; in essence it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.
Besides DAGs there may also be subDAGs, but we most likely won't get to them.
- DAG Run: an initialized DAG that has been assigned its own execution_date. DAG runs of the same DAG can work in parallel (if you made your tasks idempotent, of course).
- Operator: a piece of code responsible for performing a specific action. There are three types of operators:
  - action, like our favorite PythonOperator, which can execute any (valid) Python code;
  - transfer, which carries data from place to place, say, MsSqlToHiveTransfer;
  - sensor, which lets you react to some event, or delay further execution of the DAG until it occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? After all, you can do retries right in the operator!" Well, so as not to clog the pool of tasks with suspended operators. The sensor starts, checks and dies before the next attempt.
- Task: declared operators, regardless of type, are promoted to the rank of task once they are attached to a DAG.
- Task instance: when the general, the scheduler, decides it is time to send tasks into battle on the workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e. a set of variables, the execution parameters), expands command or query templates, and pools them.
Generating tasks
First, let's outline the general scheme of our DAG, and then dive deeper and deeper into the details, because we apply some non-trivial solutions.
So, in its simplest form, such a DAG will look like this:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))


def workflow(**context):
    print(context)


# One task per source database
for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's figure it out:
- First, we import the necessary libs and something else;
- sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table;
- dag is the declaration of our DAG, which must be in globals(), otherwise Airflow won't find it. The DAG also needs to be told:
  - that its name is orders; this name will then appear in the web interface,
  - that it will work starting from midnight on the eighth of July,
  - and that it should run approximately every 6 hours (for tough guys, instead of timedelta() a cron line such as 0 */6 * * * is acceptable here; for the less hardcore, an expression like @daily);
- workflow() will do the main job, but not now. For now, we'll just dump our context into the log.
- And now the simple magic of creating tasks:
  - we run through our sources;
  - we initialize a PythonOperator, which will execute our dummy workflow(). Don't forget to specify a unique (within the DAG) task name and to attach the DAG itself. The provide_context flag, in turn, will pour additional arguments into the function, which we carefully collect using **context.
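The article never shows `commons/datasources.py`, so the following is only a guess at its shape. The record type `DataSource`, the connection IDs `mssql_0` to `mssql_2` and the database names are all assumptions made for this sketch; only the idea (three servers, fifty databases each, one `(conn_id, schema)` pair per database) comes from the text.

```python
from collections import namedtuple

# Hypothetical record type: an Airflow connection ID plus a database name.
DataSource = namedtuple("DataSource", "conn_id schema")

# Assumed naming: three servers with fifty project databases each.
SERVERS = [f"mssql_{i}" for i in range(3)]
DATABASES_PER_SERVER = 50

sql_server_ds = [
    DataSource(conn_id=server, schema=f"{server}_project_{n:02d}")
    for server in SERVERS
    for n in range(DATABASES_PER_SERVER)
]

# Each element unpacks exactly as in the DAG loop:
# `for conn_id, schema in sql_server_ds`.
conn_id, schema = sql_server_ds[0]
print(len(sql_server_ds), conn_id, schema)
```

Because `schema` doubles as the `task_id`, the names have to be unique across all servers, which is why the server name is part of each database name in this sketch.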
For now, that's all. What we got:
- a new DAG in the web interface,
- one and a half hundred tasks that will be executed in parallel (if the Airflow and Celery settings and the server capacity allow it).
Well, we almost got it.

Who will install the dependencies?
To simplify this whole thing, I wired the processing of requirements.txt on all nodes into docker-compose.yml.
Now we're off:

Gray squares are task instances processed by the scheduler.
We wait a bit, and the tasks are snapped up by the workers:

The green ones, of course, have completed their work successfully. The red ones were not so successful.
By the way, there is no ./dags folder on our prod and no synchronization between machines: all DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.
A little about Flower
While the workers are thrashing our dummies, let's recall another tool that can show us something: Flower.
The very first page, with summary information on the worker nodes:

The most intense page, with the tasks that went to work:

The most boring page, with the status of our broker:

The brightest page, with task status graphs and their execution time:

We load the underloaded
So, all the tasks have run, and you can carry away the wounded.

And there were quite a few wounded, for one reason or another. When Airflow is used correctly, these very squares indicate that the data definitely did not arrive.
You need to look at the log and restart the fallen task instances.
By clicking on any square, we'll see the actions available to us:

You can take the fallen one and Clear it. That is, we forget that something failed there, and the same task instance will go back to the scheduler.

Clearly, doing this with the mouse for all the red squares is not very humane, and it's not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

Let's select everything at once and reset it to zero by clicking the right item:

After cleaning, our taxis look like this (they are already waiting for the scheduler to schedule them):

Connections, hooks and other variables
It's time to look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Dear gentlemen, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
        Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

# One refresh task per report; each feeds both notifiers
for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone done a report update at some point? Here it is again: there is a list of sources to take the data from; there is a list of places to put it; and don't forget to honk when everything has either happened or broken (well, that one is not about us, no).
Let's go through the file again and look at the new obscure stuff:
- from commons.operators import TelegramBotSendMessage: nothing prevents us from making our own operators, which we took advantage of by making a small wrapper for sending messages to Telegram. (We'll talk more about this operator below);
- default_args={}: a DAG can hand the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}': the to field is not hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put into Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the letter will fly to the bosses only if all dependencies have completed successfully;
- tg_bot_conn_id='tg_main': conn_id arguments accept the IDs of connections that we create in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED: messages in Telegram will fly off only if there are fallen tasks;
- task_concurrency=1: we forbid the simultaneous launch of several task instances of one task. Otherwise we would get several VerticaOperator instances running at once (and looking at one table);
- report_update >> [email, tg]: all the VerticaOperator tasks converge on sending letters and messages, like this:

But since the notifier operators have different launch conditions, only one of them will work. In the Tree View, everything looks a little less clear:
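Why only one notifier fires can be shown with a deliberately simplified model of the two trigger rules. This is not Airflow's actual implementation: the function `should_run` and the plain state strings are assumptions for this sketch, and it only looks at the moment when every upstream task has already finished as either success or failed.

```python
def should_run(trigger_rule: str, upstream_states: list) -> bool:
    """Simplified decision for two trigger rules, evaluated once
    every upstream task has finished (not Airflow's real code)."""
    if trigger_rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "one_failed":
        return any(state == "failed" for state in upstream_states)
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


# Five report updates, one of which has fallen.
states = ["success", "success", "failed", "success", "success"]
print(should_run("all_success", states))  # decides email_success
print(should_run("one_failed", states))   # decides telegram_fail
```

Under these assumptions the two conditions are mutually exclusive: either every report update succeeded and the email goes out, or at least one failed and the Telegram message does.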

I'll say a few words about macros and their friends, variables.
Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} will expand to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), and on a restart the placeholders will expand to the same values.
The assigned values can be viewed using the Rendered button on each task instance. Here is how it looks for the task that sends a letter:

And like this for the task that sends a message:

A complete list of built-in macros for the latest available version can be found in the macros reference of the Airflow documentation.
Moreover, with the help of plugins we can declare our own macros, but that's another story.
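To make the `{{ ds }}` expansion concrete, here is a tiny stand-in that does not use Jinja at all. The helper `render_ds` is hypothetical and handles only this one placeholder; Airflow itself renders templates with Jinja and a much richer context.

```python
from datetime import datetime


def render_ds(sql_template: str, execution_date: datetime) -> str:
    """Replace the `{{ ds }}` placeholder with the execution date
    formatted as YYYY-MM-DD (a stand-in for Jinja rendering)."""
    ds = execution_date.strftime("%Y-%m-%d")
    return sql_template.replace("{{ ds }}", ds)


template = "WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
print(render_ds(template, datetime(2020, 7, 14, 6, 0)))
```

Because the value is derived from the run's execution date rather than from the wall clock, re-rendering the same task instance later gives the same SQL.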
In addition to the predefined things, we can also substitute the values of our own variables (I already used this in the code above). Let's create a couple of items in Admin/Variables:

That's it, you can use them:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:
bot_config
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
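The dotted path in `var.json.bot_config.bot.token` is ordinary attribute-style access into the parsed JSON. A rough standard-library equivalent, with a hypothetical helper `resolve` standing in for what the template engine does, could look like this:

```python
import json
from functools import reduce

# The JSON text as it would be stored in the Variable `bot_config`.
raw_value = """
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
"""


def resolve(path: str, document: dict):
    """Walk a dotted path such as 'bot.token' through nested dicts."""
    return reduce(lambda node, key: node[key], path.split("."), document)


bot_config = json.loads(raw_value)
print(resolve("bot.token", bot_config))
print(resolve("service", bot_config))
```

Note that the token has to be a quoted string for the value to be valid JSON at all.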
I'll say literally one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and add our logins/passwords and more specific parameters there. Like this:

Passwords can be encrypted (more thoroughly than by default), or you can leave out the connection type (as I did for tg_main). The fact is that the list of types is hardwired into the Airflow models and cannot be extended without getting into the source code (if I suddenly failed to google something, please correct me), but nothing prevents us from getting the credentials simply by name.
You can also make several connections with the same name: in this case the method BaseHook.get_connection(), which gets us connections by name, will return a random one of the several namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
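The "random namesake" behaviour can be imitated in a few lines. Everything here is illustrative: the in-memory `connections` registry, the host names and this `get_connection` function are assumptions for the sketch, not Airflow code.

```python
import random
from collections import defaultdict

# Hypothetical registry: several hosts registered under one conn_id.
connections = defaultdict(list)
connections["dwh"].extend(["vertica-node-1", "vertica-node-2", "vertica-node-3"])


def get_connection(conn_id: str) -> str:
    """Return one of the connections stored under conn_id, chosen at random."""
    candidates = connections.get(conn_id)
    if not candidates:
        raise KeyError(f"The conn_id `{conn_id}` isn't defined")
    return random.choice(candidates)


print(get_connection("dwh"))
```

Random choice spreads the load only on average; a round-robin scheme would need shared state between calls, which is presumably part of why the simpler approach was taken.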
Variables and Connections are certainly cool tools, but it's important not to lose the balance: which parts of your flows you store in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailing box, through the UI. On the other hand, this is still a return to the mouse click that we (I) wanted to get rid of.
Working with connections is one of the tasks of hooks. In general, Airflow hooks are points for connecting it to third-party services and libraries. For example, JiraHook will open a client for us to interact with Jira (you can move tasks back and forth), and with the help of SambaHook you can push a local file to an smb share.
Parsing the custom operator
And now we've come close to looking at how TelegramBotSendMessage is made.
The code in commons/operators.py with the actual operator:
from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot


class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    # Fields in which Jinja macros are rendered before execute()
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, like everything else in Airflow, everything is very simple:
- We inherited from BaseOperator, which implements quite a few Airflow-specific things (look at it at your leisure).
- We declared the template_fields, in which Jinja will look for macros to process.
- We arranged the right arguments for __init__(), setting defaults where necessary.
- We didn't forget to initialize the ancestor either.
- We opened the corresponding hook, TelegramBotHook, and received a client object from it.
- We overrode the method BaseOperator.execute(), which Airflow will call when the time comes to launch the operator; in it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr: Airflow will intercept everything, wrap it beautifully and lay it out where needed.)
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession


class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        # The token lives in the JSON "Extra" field of the connection
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here, so I'll just note the important points:
- We inherit and think about the arguments; in most cases there will be just one: conn_id;
- We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and just take the extra section (this is a JSON field), into which I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
- I create an instance of our TelegramBot, giving it a specific token.
That's all. You can get a client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().
And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in a whole library for the sake of a single sendMessage method.
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        # An explicit chat_id overrides the default one set in __init__
        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)


class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

The right thing to do is to put it all together (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, place it in a public repository, and give it to Open Source.
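One practical detail of `parse_mode: MarkdownV2`: the Telegram Bot API requires a fixed set of punctuation characters to be escaped with a backslash in plain text, so a message like `{{ dag.dag_id }} failed :(` may be rejected as-is. The helper below is not part of the article's code; `escape_markdown_v2` is a hypothetical name, and the sketch assumes the whole message is plain text with no intended formatting.

```python
import re

# Characters that the Bot API requires to be escaped in MarkdownV2 text.
_MARKDOWN_V2_SPECIAL = r"_*[]()~`>#+-=|{}.!"
_ESCAPE_RE = re.compile(f"([{re.escape(_MARKDOWN_V2_SPECIAL)}])")


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 special character with a backslash."""
    return _ESCAPE_RE.sub(r"\\\1", text)


print(escape_markdown_v2("update_reports failed :("))
```

If a message is supposed to contain real formatting (bold, links), only its dynamic parts should be passed through such a function, not the whole string.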
While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'll go and see whether it's really an error...

Something broke in our DAG! Isn't that what we were expecting? Exactly!
Are you going to pour?
Do you feel that I missed something? It seems he promised to transfer data from SQL Server to Vertica, and then he went and drifted off the topic, the scoundrel!
This atrocity was intentional: I simply had to decipher some terminology for you. Now you can go further.
Our plan was this:
- Make a DAG
- Generate tasks
- See how beautiful everything is
- Assign session numbers to the loads
- Get data from SQL Server
- Put data into Vertica
- Collect statistics
So, to get all this up and running, I made a small addition to our docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024
services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
  mssql_0:
    <<: *mssql-base
  mssql_1:
    <<: *mssql-base
  mssql_2:
    <<: *mssql-base
  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we bring up:
- Vertica as the host dwh with the most default settings,
- three instances of SQL Server,
- and we fill the databases in the latter with some data (under no circumstances look into mssql_init.py!)
We launch all this goodness with a slightly more complicated command than last time:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
What our miracle randomizer generated can be inspected using the Data Profiling/Ad Hoc Query item:

The main thing is not to show it to the analysts
I won't elaborate on the ETL sessions, everything is trivial there: we make a database, there is a table in it, we wrap everything in a context manager, and now we do this:
with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py
from sys import stderr


class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # any() takes a single iterable, so the three values go in a tuple
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,

                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)


class SessionError(Exception):
    pass


class SessionClosedError(SessionError):
    pass

The time has come to collect our data from our one and a half hundred tables. Let's do this with the help of some very unpretentious lines:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

- With the help of a hook we get a pymssql connection from Airflow.
- We substitute a restriction in the form of a date into the query; the template engine will pass it into the function.
- We feed our query to pandas, which gets us a DataFrame; it will come in handy later.
Ke hoʻohana nei au i ka hoʻololi
{dt}ma kahi o ka palena noi%sʻaʻole no ka mea he Pinocchio ʻino wau, akā no ka meapandashiki ole ke lawelawepymssqla pahee i ka hopeparams: Listʻoiai makemake maoli ʻo iatuple.
E hoʻomaopopo hoʻi i ka mea hoʻomohalapymssqlua hoʻoholo e kākoʻo hou iā ia, a ua hiki i ka manawa e neʻe aipyodbc.
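For comparison, here is a minimal sketch of the bound-parameter form that the note above would have preferred. It uses the standard-library sqlite3 driver purely as a stand-in (an assumption for illustration: the article's real source is SQL Server via pymssql, whose placeholder is %s rather than ?), and the orders table, its rows and the orders_for_day helper are made up.

```python
import sqlite3

# Stand-in database: sqlite3 instead of SQL Server, with a made-up orders table.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (id INTEGER, start_time TEXT)')
conn.executemany(
    'INSERT INTO orders VALUES (?, ?)',
    [(1, '2020-07-07 10:00:00'), (2, '2020-07-08 09:30:00')],
)


def orders_for_day(connection, dt):
    """Return the ids of orders that started on the given ISO date."""
    query = 'SELECT id FROM orders WHERE date(start_time) = ? ORDER BY id'
    # The parameters go in as a tuple; the driver does the quoting, not an f-string.
    return [row[0] for row in connection.execute(query, (dt,))]


print(orders_for_day(conn, '2020-07-07'))
```

The date never becomes part of the SQL text, so no quoting or escaping has to be done by hand.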
Let's see what Airflow filled our function's arguments with:

If there is no data, there is no point in continuing. But it would also be strange to count the load as successful. And yet it is not an error. A-ah-ah, what to do?! Here is what:
if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that nothing went wrong, but that we are skipping the task. In the interface the square will be neither green nor red, but pink.
Let's add a few columns to our data:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:
- The database from which we took the orders,
- The ID of our loading session (it will be different for every task),
- A hash of the source and the order ID - so that in the final database (where everything is poured into one table) we have a unique order ID.
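To illustrate why the hash is built from the source together with the order ID, here is a small sketch. It swaps pandas' hash_pandas_object for the standard-library hashlib so that it runs without pandas; the function name row_key and the sample source names are hypothetical.

```python
import hashlib


def row_key(etl_source: str, order_id: int) -> int:
    """Derive a stable 63-bit integer key from the source name and the order id."""
    payload = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.sha256(payload).digest()
    # Keep 8 bytes and clear the sign bit so the value fits a signed 64-bit INT column.
    return int.from_bytes(digest[:8], 'big') & 0x7FFFFFFFFFFFFFFF


# The same order id coming from two different sources yields two different keys.
key_a = row_key('city_a', 42)
key_b = row_key('city_b', 42)
print(key_a != key_b)

# The key is deterministic, so reloading the same row reproduces it.
print(row_key('city_a', 42) == key_a)
```

Order IDs are only unique within one source database, so the source name has to be part of the key once all sources land in a single table.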
The final stage remains: pour everything into Vertica. And, oddly enough, one of the most efficient ways to do this is through CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
# Join the column names: a raw Python list would render as ['id', ...] inside the statement
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

- We create a special receiver, StringIO.
- pandas kindly writes our DataFrame into it as CSV lines.
- We open a connection to our beloved Vertica with a hook.
- And now, with the help of copy(), we send our data straight to Vertica!
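The buffer format described in the steps above can be tried without pandas or Vertica. The sketch below uses only the standard-library csv module with the same delimiter, NUL marker and quoting mode; the sample rows are made up, and it illustrates the file format only, not the author's loader.

```python
import csv
from io import StringIO

# Made-up rows: one with a missing value, one with the delimiter inside a field.
rows = [
    (1, 'taxi', None),
    (2, 'pipe|inside', 3.5),
]

buffer = StringIO()
writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                    doublequote=False, escapechar='\\', lineterminator='\n')
for row in rows:
    # Mirror na_rep='NUL': missing values become the literal marker NUL.
    writer.writerow(['NUL' if value is None else value for value in row])

buffer.seek(0)  # rewind so that a consumer reads from the start
print(buffer.getvalue())
```

Only the field that contains the delimiter gets quoted, which is why the COPY statement has to name both the delimiter and the enclosing character.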
We take from the driver the number of rows that were loaded, and tell the session manager that everything is fine:
session.loaded_rows = cursor.rowcount
session.successful = True

That's all.
In production, we create the target table by hand. Here I allowed myself a little automation:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator(), I create a database schema and a table (if they do not already exist, of course). The main thing is to set up the dependencies correctly:
for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)
    create_table >> load

Summing up
- Well, - said the little mouse, - isn't it clear by now
that I am the most terrible animal in the forest?
Julia Donaldson, The Gruffalo
I think that if my colleagues and I held a competition over who could build and launch an ETL process from scratch faster, they with their SSIS and a mouse and me with Airflow... And then we also compared the ease of maintenance... Well, I think you will agree that I would beat them on all fronts!
More seriously, Apache Airflow, by describing processes as program code, has made my work much more comfortable and enjoyable.
Its unlimited extensibility, both in terms of plug-ins and in its predisposition to scaling, lets you use Airflow in almost any area: from the full cycle of collecting, preparing and processing data all the way to launching rockets (to Mars, of course).
Final part: reference and information
The rakes we have collected for you
- start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short: if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow at the earliest.
start_date = datetime(2020, 7, 7, 0, 1, 2)
And no more problems.
There is another runtime error associated with it: Task is missing the start_date parameter, which most often means that you forgot to bind the operator to the dag.
- Everything on one machine. Yes, the databases (Airflow's own and our layer on top), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL began to answer an index lookup in 20 s instead of 5 ms, we picked it up and moved it elsewhere.
- LocalExecutor. Yes, we are still sitting on it, and we have already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand with at least one worker, and we will have to work hard to move to CeleryExecutor. And since you can work with it on a single machine, nothing stops you from using Celery even on a server which "of course will never go into production, honestly!"
- Not using the built-in tools:
  - Connections to store service credentials,
  - SLA Misses to react to tasks that did not finish on time,
  - xcom for exchanging metadata (I said metadata!) between the tasks of a dag.
- Mail abuse. Well, what can I say? Alerts were set up for every retry of failed tasks. Now my work Gmail holds >90k emails from Airflow, and the web mail interface refuses to select and delete more than 100 at a time.
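The start_date pitfall from the list above can be checked with plain datetime arithmetic. This is a simplified sketch of the scheduling rule in the Airflow 1.x line the article describes (a run is triggered only once its interval has ended); it ignores cron expressions, catchup and time zones, and the helper name first_trigger_time is hypothetical.

```python
from datetime import datetime, timedelta


def first_trigger_time(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """The run covering [start_date, start_date + interval) starts when that interval ends."""
    return start_date + schedule_interval


start = datetime(2020, 7, 7)
print(first_trigger_time(start, timedelta(days=1)))  # 2020-07-08 00:00:00
```

With start_date set to today and a one-day interval, the first run therefore cannot appear before tomorrow.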
More pitfalls:
More automation tools
So that we can work even more with our heads and less with our hands, Airflow has prepared the following for us:
- REST API - it still has Experimental status, which does not stop it from working. With it you can not only get information about dags and tasks, but also stop/start a dag, create a DAG Run or a pool.
- CLI - many tools are available through the command line that are not just inconvenient to use through the WebUI, but are absent there altogether. For example:
  - backfill is needed to rerun task instances. For example, analysts come and say: "Comrade, your data from January 1 to 13 is nonsense! Fix it, fix it, fix it, fix it!" And you just go: airflow backfill -s '2020-01-01' -e '2020-01-13' orders
  - Database maintenance: initdb, resetdb, upgradedb, checkdb.
  - run lets you run a single task instance and even ignore all of its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
  - test does much the same thing, except that it writes nothing to the databases.
  - connections allows mass creation of connections from the shell.
- Python API - a rather hardcore way of interacting, meant for plugins rather than for poking around in it with your bare hands. But who is going to stop us from going to /home/airflow/dags, running ipython and starting to mess around? You can, for example, export all the connections with this code:
from airflow import settings
from airflow.models import Connection

fields = 'conn_id conn_type host port schema login password extra'.split()

session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)
- Connecting to the Airflow metadatabase. I do not recommend writing to it, but getting task states for various specific metrics this way can be much faster and easier than through any of the APIs.
Let's say that not all of our tasks are idempotent, and that they can sometimes fail, which is normal. But several failures in a row are already suspicious and worth checking.
Beware, SQL!
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
        THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY task_id, dag_id
HAVING count(last_fail_seq) > 0
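The trick in the query is comparing two row numbers: a failure counts only if every more recent run of the same task also failed. The same idea can be sketched in plain Python for one task's history; the function name trailing_failures and the sample history are hypothetical, and this illustrates the logic only, not a replacement for the query.

```python
from itertools import takewhile

FAILED_STATES = {'failed', 'up_for_retry'}


def trailing_failures(runs):
    """Count the unbroken run of failures at the most recent end of a task's history.

    runs: iterable of (execution_date, state) pairs in any order.
    """
    newest_first = sorted(runs, key=lambda run: run[0], reverse=True)
    # Stop at the first run that is not a failure: older failures no longer count.
    streak = takewhile(lambda run: run[1] in FAILED_STATES, newest_first)
    return sum(1 for _ in streak)


history = [
    ('2020-07-01', 'failed'),
    ('2020-07-02', 'success'),
    ('2020-07-03', 'up_for_retry'),
    ('2020-07-04', 'failed'),
]
print(trailing_failures(history))  # 2
```

The failure on 2020-07-01 is ignored because a successful run came after it, which matches the rows the query keeps as last_fail_seq.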
References
And of course, the first ten links in Google's results are the contents of the Airflow folder from my bookmarks.
- Of course, we should start with the official documentation, but who reads instructions?
- Well, at least read the recommendations from the creators.
- The very beginning: the user interface in pictures.
- The basic concepts are well explained, in case (suddenly!) you did not understand something from me.
- A short guide to setting up an Airflow cluster.
- An almost equally interesting article, except perhaps for more formalism and fewer examples.
- About working together with Celery.
- About the idempotency of tasks, loading by ID rather than by date, transformations, file structure and other interesting things.
- Task dependencies and Trigger Rule, which I mentioned only in passing.
- How to overcome some of the "works as intended" behavior of the scheduler, load missing data and prioritize tasks.
- Useful SQL queries against the Airflow metadata.
- There is a useful section on creating a custom sensor.
- An interesting short post about building an infrastructure on AWS for Data Science.
- Common mistakes (when someone still does not read the instructions).
- Smile at how people store passwords, although you can simply use Connections.
- Implicit DAG passing, throwing context into functions, dependencies again, and also skipping tasks.
- About using default arguments and params in templates, as well as Variables and Connections.
- A story about how the scheduler was prepared for Airflow 2.0.
- A short article about deploying our cluster in docker-compose.
- Dynamic tasks using templates and context passing.
- Standard and custom notifications by email and Slack.
- Branching tasks, macros and XCom.
And the links used in the article:
- Placeholders available for use in templates.
- Common mistakes when creating dags.
- docker-compose for experimentation, debugging and more.
- Python wrapper for the Telegram REST API.
Source: www.habr.com




