Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.
I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a close look even if you don't work with data flows but simply need to launch some processes periodically and monitor their execution.
And yes, I won't just tell, I'll also show: the program includes plenty of code, screenshots and recommendations.

What you usually see when you google the word Airflow / Wikimedia Commons
Introduction
Apache Airflow is like Django:
- it's written in Python,
- it has a great admin panel,
- it's infinitely extensible,
- only better, and it was made for completely different purposes, namely (as written before the cut):
  - running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience allow);
  - with dynamic generation of workflows from Python code that is very easy to write and understand;
  - and with the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which is extremely easy to do).
We use Apache Airflow like this:
- we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and the ODS (for us, that's Vertica and ClickHouse);
- as an advanced cron that launches data consolidation processes on the ODS and also monitors their maintenance.
Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. On it, Airflow runs:
- more than 200 DAGs (the actual workflows into which we packed our tasks),
- with 70 tasks each on average,
- and all this goodness starts (also on average) once an hour.
As for how we scaled out, I'll write about that below; for now, let's define the über-task we are going to solve:
There are three source SQL Servers, each with 50 databases: instances of a single project, so they all share the same structure (almost everywhere, mua-ha-ha), which means each of them has an Orders table (luckily, a table with that name can be shoehorned into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively dump it into, say, Vertica.
Let's go!
The main part: practical (with a bit of theory)
Why we (and you) need this
Back when the trees were tall and I was a humble SQL developer at a Russian retailer, we churned out ETL processes, aka data flows, with the two tools available to us:
- Informatica Power Center: an extremely sprawling system, extremely productive, with its own hardware and its own versioning. I used maybe 1% of its capabilities, God willing. Why? Well, first of all, its dated interface put psychological pressure on us. Second, the thing is built for extremely elaborate processes, furious component reuse and other very-important-enterprise-tricks. As for the fact that it costs as much as an Airbus A380 wing per year, we'll keep quiet.
Caution: this screenshot may cause some pain to people under 30

- SQL Server Integration Server: we used this comrade for our intra-project flows. And really: we were already using SQL Server, so not using its ETL tools would have been somehow unreasonable. Everything about it seems fine: the interface is pretty, and there are execution reports... But that's not what we love software products for, oh no. We can put its dtsx files under version control (they are XML whose nodes get shuffled on every save), but what's the point? And how do you build a task package that drags a hundred tables from one server to another? Forget a hundred: by the twentieth, the index finger clicking the mouse will fall off. Still, it definitely looks more fashionable:
Of course, we looked for ways out. It almost came to a home-grown SSIS package generator...
…and then a new job found me. And there, Apache Airflow caught up with me.
When I learned that ETL process descriptions are plain Python code, I all but danced for joy. That's how data flows got versioning and diffs, and pouring tables with an identical structure from a hundred databases into a single target became a matter of Python code spanning one and a half to two 13" screens.
Assembling the cluster
Let's not run a full kindergarten here and talk about completely obvious things like installing Airflow, your database of choice, Celery and the other matters described in the docs.
So that we can start experimenting right away, I put together a docker-compose.yml in which:
- We bring up Airflow itself: the Scheduler and the Webserver. Flower will also run there to monitor Celery tasks (because it's already bundled into apache/airflow:1.10.10-python3.7, and we don't mind);
- PostgreSQL, into which Airflow will write its service information (scheduler data, execution statistics, etc.) and where Celery will mark completed tasks;
- Redis, which will act as the task broker for Celery;
- A Celery worker, which will do the actual work of executing tasks.
- We'll put our DAG description files into the ./dags folder. They are picked up on the fly, so there's no need to restart the whole stack after every sneeze.
In some places the code in the examples is not shown in full (so as not to clutter the text), and in some places it changes along the way. Complete working code examples can be found in the repository.
docker-compose.yml

```yaml
version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base
    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
    depends_on:
      - airflow-db
      - broker
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint initdb &&
      (/entrypoint webserver &) &&
      (/entrypoint flower &) &&
      /entrypoint scheduler"
    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base
    environment:
      <<: *airflow-config
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint worker"
    depends_on:
      - airflow
      - airflow-db
      - broker
```

Notes:
- In putting the composition together, I relied heavily on the well-known community image; be sure to check it out. Maybe you won't need anything else in your life.
- All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I took malicious advantage of.
- Naturally, it isn't production-ready: I deliberately didn't put heartbeats on the containers and didn't bother with security. But I did the minimum suitable for our little experiments.
- Note that:
  - The DAG folder must be accessible to both the scheduler and the workers.
  - The same goes for all third-party libraries: they must all be installed on the machines running the scheduler and the workers.
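The environment-variable route works because of a naming convention: AIRFLOW__{SECTION}__{KEY}, with double underscores separating the parts, overrides option KEY in section SECTION of airflow.cfg. The sketch below maps such names back to (section, key) pairs, which is handy for checking what a compose file actually overrides. Both function names are hypothetical helpers for illustration, not part of Airflow (which builds the environment name from the section and key, the other way round).

```python
import os
from typing import Dict, Mapping, Optional, Tuple


def env_to_cfg(name: str) -> Tuple[str, str]:
    """Split an AIRFLOW__SECTION__KEY variable name into (section, key)."""
    prefix, sep, rest = name.partition('__')
    if prefix != 'AIRFLOW' or not sep:
        raise ValueError(f'not an Airflow config variable: {name}')
    section, sep, key = rest.partition('__')
    if not sep or not section or not key:
        raise ValueError(f'expected AIRFLOW__SECTION__KEY, got {name}')
    # airflow.cfg uses lower-case section and option names
    return section.lower(), key.lower()


def airflow_overrides(environ: Optional[Mapping[str, str]] = None
                      ) -> Dict[Tuple[str, str], str]:
    """Collect every AIRFLOW__* variable as {(section, key): value}."""
    environ = os.environ if environ is None else environ
    return {env_to_cfg(k): v for k, v in environ.items()
            if k.startswith('AIRFLOW__')}


print(airflow_overrides({'AIRFLOW__CORE__PARALLELISM': '128', 'PATH': '/bin'}))
# {('core', 'parallelism'): '128'}
```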
Now it's simple:

```
$ docker-compose up --scale worker=3
```

Once everything is up, you can look at the web interfaces:
- Airflow: the webserver, on port 8080;
- Flower: on port 5555.
Basic concepts
If none of these "DAGs" made any sense to you, here is a short glossary:
- Scheduler: the most important guy in Airflow, making sure that robots, not humans, do the hard work: it watches the schedule, updates DAGs and launches tasks.
  In older versions it had memory problems (no, not amnesia, but leaks), and a legacy parameter still survives in the configs: run_duration, the interval between its restarts. But now everything is fine.
- DAG (aka "dag"): a "directed acyclic graph", but that definition means little to most people; in essence, it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS or a Workflow in Informatica.
  Besides DAGs, there can also be subDAGs, but we probably won't get to them.
- DAG Run: an initialized DAG that has been assigned its own execution_date. DAG runs of the same DAG can work in parallel (as long as you made your tasks idempotent, of course).
- Operator: a piece of code responsible for performing one specific action. There are three types of operators:
  - action, like our beloved PythonOperator, which can execute any (valid) Python code;
  - transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
  - sensor, which lets you react to an event or hold back further execution of the DAG until it happens. HttpSensor can poll a specified endpoint and, once it gets the desired response, start a GoogleCloudStorageToS3Operator transfer. An inquiring mind will ask: "Why? You can just do retries right inside the operator!" Because then the task pool would get clogged with hung operators. A sensor starts, checks, and exits until the next attempt (at least with mode='reschedule'; in the default poke mode it keeps its slot while sleeping between checks).
- Task: declared operators, whatever their type, get promoted to the rank of task once they are attached to a DAG.
- Task instance: when the scheduler-general decides it's time to send tasks into battle on the executor-workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (that is, a set of variables: the execution parameters), expands the command or query templates, and puts them into the pool.
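The sensor entry in the glossary is easier to grasp in code. Below is a simplified, Airflow-free sketch of a sensor's poke loop: check a condition, sleep for poke_interval, give up after timeout. The parameter names and defaults mirror BaseSensorOperator's poke_interval and timeout, but the function itself is hypothetical and is not Airflow's implementation; the clock and sleep functions are injected only so the loop can be exercised without real waiting.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is not met within the timeout."""


def poke_until(condition: Callable[[], bool],
               poke_interval: float = 60.0,
               timeout: float = 7 * 24 * 3600,
               clock: Callable[[], float] = time.monotonic,
               sleep: Callable[[float], None] = time.sleep) -> int:
    """Call condition() every poke_interval seconds until it returns True.

    Returns the number of pokes made and raises SensorTimeout once the
    deadline has passed. While this loop sleeps it keeps occupying its
    worker slot, which is what a reschedule-style sensor avoids.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started >= timeout:
            raise SensorTimeout(f'gave up after {pokes} pokes')
        sleep(poke_interval)
```

In a real DAG you would use a ready-made sensor such as HttpSensor instead; the sketch only shows why a waiting sensor should not be confused with retries inside an operator.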
Generating tasks
First, let's sketch the overall structure of our DAG, and then we'll dive deeper and deeper into the details, because we use some non-trivial solutions.
So, in its simplest form, such a DAG looks like this:
```python
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))


def workflow(**context):
    print(context)


for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
```

Let's break it down:
- First, we import the libs we need and a few other things;
- sql_server_ds is a List[namedtuple[str, str]] holding the connection names from Airflow Connections and the databases from which we will take our table;
- dag is the declaration of our DAG, which must end up in globals(), otherwise Airflow won't find it. The DAG also needs to be told:
  - what its name is: orders; this name will then show up in the web interface,
  - that it will work from midnight on the eighth of July,
  - and that it should run roughly every 6 hours (for the tough guys, a cron string such as 0 */6 * * * is accepted here instead of timedelta(); for the less tough, a preset expression like @daily);
- workflow() will do the main work, but not now. For now, it will just dump our context into the log.
- And now the simple magic of creating tasks:
  - we loop over our sources;
  - we initialize a PythonOperator that will run our dummy workflow(). Don't forget to give the task a unique (within the DAG) name and to bind the DAG itself. The provide_context flag, in turn, will pour extra arguments into the function, which we carefully collect with **context.
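To make the schedule concrete: in Airflow 1.10, a DAG run is stamped with the start of its interval (its execution_date) and is triggered only after that interval has ended. The sketch below is a simplified model for a fixed timedelta schedule like ours; it deliberately ignores cron expressions, catchup settings and time zones, and the function name is made up.

```python
from datetime import datetime, timedelta


def dag_runs(start_date: datetime, interval: timedelta, now: datetime):
    """Yield (execution_date, earliest_trigger_time) pairs up to `now`.

    Simplified model of Airflow 1.10 with a timedelta schedule_interval:
    the run covering [d, d + interval) fires once d + interval is reached.
    """
    execution_date = start_date
    while execution_date + interval <= now:
        yield execution_date, execution_date + interval
        execution_date += interval


runs = list(dag_runs(datetime(2020, 7, 8, 0), timedelta(hours=6),
                     now=datetime(2020, 7, 8, 19)))
for execution_date, fires_at in runs:
    print(execution_date, '->', fires_at)
# three runs, with execution dates 00:00, 06:00 and 12:00 on 2020-07-08
```

This is why the run stamped "midnight, 8 July" only appears in the interface after 06:00 that day.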
That's it for now. Here's what we got:
- a new DAG in the web interface;
- a hundred and fifty tasks that will run in parallel (if the Airflow and Celery settings and the server capacity allow it).
Well, almost got it.
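The hundred and fifty tasks come straight from sql_server_ds. Its real contents aren't shown in the article, so the following is a purely hypothetical sketch of what commons/datasources.py could look like for three servers with 50 databases each; the connection and database names are invented. Since each database name doubles as a task_id, the sketch also checks that the names are unique within the DAG.

```python
from collections import namedtuple

# Hypothetical layout: one Airflow connection per SQL Server,
# 50 project databases on each of them.
DataSource = namedtuple('DataSource', 'conn_id schema')

sql_server_ds = [
    DataSource(f'mssql_{server}', f'project_{server}_{db:02d}')
    for server in range(3)
    for db in range(50)
]

# The DAG uses task_id=schema, so database names must not repeat
assert len({ds.schema for ds in sql_server_ds}) == len(sql_server_ds)
```

Because DataSource is a namedtuple, the DAG's `for conn_id, schema in sql_server_ds` loop unpacks each entry directly.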

And who's going to install the dependencies?
To simplify this whole thing, I wired the processing of requirements.txt on all nodes into docker-compose.yml.
Now off it goes:

The gray squares are task instances processed by the scheduler.
We wait a little, and the workers snap up the tasks:

The green ones, of course, finished successfully. The red ones, not so successfully.
By the way, there is no ./dags folder synchronized between machines on our prod: all the DAGs live in git on our GitLab, and GitLab CI rolls out updates to the machines on merge into master.
A little about Flower
While the workers are thrashing our dummies, let's recall another tool that can show us a thing or two: Flower.
The very first page, with summary information on the worker nodes:

The busiest page, with the tasks that have gone to work:

The most interesting page, with the state of our broker:

The brightest page, with charts of task states and their execution times:

Loading what wasn't loaded
So, all the tasks have finished; time to carry off the wounded.

And there were quite a few wounded, for one reason or another. When Airflow is used correctly, these very squares indicate that the data definitely didn't arrive.
You need to look at the log and restart the failed task instances.
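Restarting failed instances (and running DAG runs in parallel) is only safe if each task is idempotent: running it twice for the same execution_date must leave the same result. Below is a minimal sketch of the common delete-then-insert pattern for one date partition. It uses sqlite3 only to stay self-contained; the orders table, its columns and load_partition() are hypothetical.

```python
import sqlite3


def load_partition(conn: sqlite3.Connection, ds: str, rows: list) -> int:
    """Replace the `ds` partition of `orders` with `rows` in one transaction.

    Re-running for the same ds leaves the same table contents, so a
    cleared and restarted task instance cannot create duplicates.
    """
    with conn:  # commits on success, rolls back if anything raises
        conn.execute('DELETE FROM orders WHERE load_date = ?', (ds,))
        conn.executemany(
            'INSERT INTO orders (load_date, id, amount) VALUES (?, ?, ?)',
            [(ds, order_id, amount) for order_id, amount in rows])
    return len(rows)


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (load_date TEXT, id INTEGER, amount REAL)')
load_partition(conn, '2020-07-14', [(1, 10.0), (2, 20.5)])
load_partition(conn, '2020-07-14', [(1, 10.0), (2, 20.5)])  # the restart
print(conn.execute('SELECT COUNT(*) FROM orders').fetchone()[0])  # 2
```

The same idea, scoped to a whole table, is what a TRUNCATE followed by INSERT achieves.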
Clicking any square shows the actions available to us:

You can take a failed one and Clear it. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Clearly, doing this with the mouse on every red square is not very humane; that's not what we expect from Airflow. Naturally, we have a weapon of mass destruction: Browse/Task Instances

Let's select everything at once and reset it to zero by clicking the right item:

After clearing, our taxis look like this (they're already eagerly waiting for the scheduler to schedule them):

Connections, hooks and other variables
It's time to look at the next DAG, update_reports.py:
```python
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Gentlemen, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
        Natasha, wake up, we broke {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]
```

Everyone has built a report updater at some point, right? Here it is again: there is a list of sources to take the data from; there is a list of places to put it; and don't forget to honk when everything has worked or broken (well, that's not about us, no).
Let's go through the file again and look at the new, unclear things:
- from commons.operators import TelegramBotSendMessage: nothing stops us from making our own operators, which we took advantage of by writing a small wrapper for sending messages to the (recently unblocked) Telegram. (We'll talk more about this operator below);
- default_args={}: a DAG can hand out the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}': the to field won't be hardcoded but generated dynamically with Jinja from a variable holding a list of emails, which I carefully put in Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS: the condition for launching the operator. In our case, the email flies off to the bosses only if all dependencies have completed successfully;
- tg_bot_conn_id='tg_main': conn_id arguments take the IDs of connections that we create in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED: the Telegram message flies off only if there are failed tasks;
- task_concurrency=1: we forbid running several task instances of the same task at once. Otherwise we could get several VerticaOperators running simultaneously (looking at the same table);
- report_update >> [email, tg]: all the VerticaOperators converge on sending the email and the message, like this:

But since the notifier operators have different launch conditions, only one of them will run. In the Tree View, everything looks a bit less clear:

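How the two notifiers in update_reports.py share the same upstream tasks, yet only one of them fires, can be modelled in a few lines. This is a toy sketch, not Airflow code: a made-up Task class implements `>>` so that it accepts either a single task or a list (as in report_update >> [email, tg]), and should_run() is a simplified evaluator for the two trigger rules used above, whose string values match Airflow's 'all_success' and 'one_failed'. Real Airflow has many more states and rules; notably, one_failed fires as soon as any upstream fails, without waiting for the rest.

```python
from typing import Dict, List, Union


class Task:
    """Toy task: a name, upstream links and a trigger rule."""

    def __init__(self, task_id: str, trigger_rule: str = 'all_success'):
        self.task_id = task_id
        self.trigger_rule = trigger_rule
        self.upstream: List['Task'] = []

    def __rshift__(self, other: Union['Task', List['Task']]):
        # a >> b and a >> [b, c] both make `a` an upstream of the targets
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.upstream.append(self)
        return other


def should_run(task: Task, states: Dict[str, str]) -> bool:
    """Decide whether `task` fires, given final upstream states."""
    upstream_states = [states[t.task_id] for t in task.upstream]
    if task.trigger_rule == 'all_success':
        return all(s == 'success' for s in upstream_states)
    if task.trigger_rule == 'one_failed':
        return any(s == 'failed' for s in upstream_states)
    raise ValueError(f'rule not modelled: {task.trigger_rule}')


email = Task('email_success', 'all_success')
tg = Task('telegram_fail', 'one_failed')
for name in ['city_orders', 'client_calls', 'daily_orders']:
    Task(name) >> [email, tg]

states = {'city_orders': 'success', 'client_calls': 'failed',
          'daily_orders': 'success'}
print(should_run(email, states), should_run(tg, states))  # False True
```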
I'll say a few words about macros and their friends, variables.
Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:
```sql
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE
```

{{ ds }} expands to the value of the execution_date context variable in the YYYY-MM-DD format: 2020-07-14. The nicest part is that context variables are nailed to a specific task instance (a square in the Tree View), so on a restart the placeholders expand to exactly the same values.
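What {{ ds }} and its siblings expand to can be reproduced by hand. A rough sketch, limited to three of Airflow's date macros; the regex-based render() is a hypothetical stand-in for Jinja (which Airflow actually uses) and supports nothing beyond plain {{ name }} lookups.

```python
import re
from datetime import datetime


def date_context(execution_date: datetime) -> dict:
    """A few of Airflow's date macros, derived from execution_date."""
    return {
        'ds': execution_date.strftime('%Y-%m-%d'),
        'ds_nodash': execution_date.strftime('%Y%m%d'),
        'ts': execution_date.isoformat(),
    }


def render(template: str, context: dict) -> str:
    """Replace {{ name }} placeholders; unknown names raise KeyError."""
    return re.sub(r'\{\{\s*(\w+)\s*\}\}',
                  lambda m: str(context[m.group(1)]), template)


sql = "WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
print(render(sql, date_context(datetime(2020, 7, 14))))
# WHERE payment_dtm::DATE = '2020-07-14'::DATE
```

Because the context is derived only from execution_date, rendering the same task instance again gives the same text, which is what makes restarts reproducible.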
The rendered values can be viewed with the Rendered button on each task instance. This is how it looks for the email-sending task:

And this is how it looks for the message-sending task:

The full list of built-in macros for the latest available version can be found in the macros reference of the Airflow documentation.
Moreover, plugins let us declare our own macros, but that's another story.
Besides the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of them in Admin/Variables:

That's it, now they can be used:

```python
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
```

The value can be a scalar, or it can be JSON. In the case of JSON, for example the variable bot_config:

```json
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
```

just use the path to the key you need: {{ var.json.bot_config.bot.token }}.
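Under the hood, var.json.bot_config.bot.token boils down to parsing the stored JSON and walking the dotted path. A minimal sketch of that lookup with the standard library; var_json() and the in-memory variables dict are hypothetical stand-ins for Airflow's Variable storage.

```python
import json
from functools import reduce


def var_json(variables: dict, path: str):
    """Resolve 'name.key.subkey' against JSON-valued variables."""
    name, *keys = path.split('.')
    value = json.loads(variables[name])
    return reduce(lambda node, key: node[key], keys, value)


variables = {'bot_config': '{"bot": {"token": "881hskdfASDA16641", '
                           '"name": "Verter"}, "service": "TG"}'}
print(var_json(variables, 'bot_config.bot.token'))  # 881hskdfASDA16641
```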
I'll say literally one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and put our logins/passwords and more specific parameters there. Like this:

Passwords can be encrypted (more thoroughly than the default option does), and you can also leave the connection type empty (as I did for tg_main): the list of types is hardwired into Airflow's models and can't be extended without digging into the source code (if I happened to miss something while googling, please correct me), but nothing stops us from getting credentials simply by name.
You can also create several connections with the same name: in that case, the BaseHook.get_connection() method, which fetches connections by name, will return a random one of the namesakes (Round Robin would be more logical, but let's leave that to the conscience of the Airflow developers).
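The difference between a random pick among namesakes and a round robin is easy to show. A small standard-library sketch; the host names, pick_random() and the RoundRobin class are invented for illustration and are not Airflow APIs.

```python
import itertools
import random
from collections import defaultdict


def pick_random(connections: list, rng: random.Random) -> str:
    """Random choice among same-named connections, as described above."""
    return rng.choice(connections)


class RoundRobin:
    """Hand out same-named connections in turn, per connection name."""

    def __init__(self, registry: dict):
        self._cycles = {name: itertools.cycle(conns)
                        for name, conns in registry.items()}

    def get(self, name: str) -> str:
        return next(self._cycles[name])


registry = {'dwh': ['vertica-1', 'vertica-2', 'vertica-3']}
rr = RoundRobin(registry)
print([rr.get('dwh') for _ in range(4)])
# ['vertica-1', 'vertica-2', 'vertica-3', 'vertica-1']

rng = random.Random(42)
hits = defaultdict(int)
for _ in range(300):
    hits[pick_random(registry['dwh'], rng)] += 1
print(dict(hits))  # roughly even over many calls, but no ordering guarantee
```

Random choice balances load only on average; round robin guarantees an even spread, but it needs shared state that lives across calls.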
Variables and Connections are certainly cool tools, but it's important not to lose the balance: which parts of your flows you keep in the code itself, and which you hand over to Airflow for storage. On the one hand, quickly changing a value, for example a mailing address, can be convenient through the UI. On the other hand, it's still a return to mouse-clicking, which we (well, I) wanted to get rid of.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are its points of connection to third-party services and libraries. For example, JiraHook will open a client for us to interact with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb share.
Dissecting the custom operator
And now we've come right up to how TelegramBotSendMessage is made.
Here is commons/operators.py with the operator itself:
```python
from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot


class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)
```

Here, like everything else in Airflow, it's all very simple:
- We inherited from BaseOperator, which implements quite a few Airflow-specific things (have a look at your leisure);
- We declared template_fields, in which Jinja will look for macros to process;
- We arranged the right arguments for __init__() and set defaults where needed;
- We didn't forget to initialize the parent class either;
- We opened the corresponding hook, TelegramBotHook, and got a client object from it;
- We overrode the BaseOperator.execute() method, which Airflow will call when it's time to run the operator: this is where we implement the main action, not forgetting to log. (By the way, we log straight to stdout and stderr: Airflow will intercept everything, wrap it nicely and put it where it belongs.)
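The role of template_fields is easiest to see on a stripped-down model. In this hypothetical sketch (MiniOperator and SendMessage are not Airflow classes), only the attributes named in template_fields are rendered before execution, and everything else is left untouched; plain str.replace stands in for Jinja.

```python
class MiniOperator:
    """Toy operator: renders only the attributes listed in template_fields."""
    template_fields = ()

    def render_template_fields(self, context: dict) -> None:
        for field in self.template_fields:
            value = getattr(self, field)
            if isinstance(value, str):
                for name, replacement in context.items():
                    value = value.replace('{{ %s }}' % name, str(replacement))
                setattr(self, field, value)


class SendMessage(MiniOperator):
    template_fields = ('chat_id', 'message')

    def __init__(self, chat_id, message, conn_id):
        self.chat_id = chat_id
        self.message = message
        self.conn_id = conn_id  # not templated: kept exactly as given


op = SendMessage(chat_id='{{ failures_chat }}',
                 message='{{ dag_id }} failed :(',
                 conn_id='{{ dag_id }}')
op.render_template_fields({'failures_chat': -100123, 'dag_id': 'update_reports'})
print(op.chat_id, op.message, op.conn_id)
# -100123 update_reports failed :( {{ dag_id }}
```

Forgetting to list an argument in template_fields is therefore a common reason why a placeholder reaches execute() unrendered.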
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
```python
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession


class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client
```

I don't even know what there is to explain here, so I'll just note the important points:
- We inherit and think about the arguments: in most cases there will be only one, conn_id;
- We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and simply take the extra section (a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"};
- I create an instance of our TelegramBot, passing it the specific token.
That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().
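get_conn() relies on extra_dejson, which is essentially the connection's Extra field parsed from JSON. A self-contained sketch of that step, with a hypothetical FakeConnection in place of Airflow's Connection model (the real one also deals with encryption and with invalid JSON), showing what the hook ends up reading and how a missing key could be reported more clearly.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeConnection:
    """Hypothetical stand-in for an Airflow connection record."""
    conn_id: str
    extra: str = ''

    @property
    def extra_dejson(self) -> dict:
        # An empty Extra behaves like an empty dict
        return json.loads(self.extra) if self.extra else {}


def bot_token(conn: FakeConnection) -> str:
    """Same lookup as the hook, with a clearer error if the key is missing."""
    try:
        return conn.extra_dejson['bot_token']
    except KeyError:
        raise KeyError(f'{conn.conn_id}: put {{"bot_token": ...}} into Extra') from None


conn = FakeConnection('tg_main', '{"bot_token": "YOuRAwEsomeBOtToKen"}')
print(bot_token(conn))  # YOuRAwEsomeBOtToKen
```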
And the second part of the file, in which I made a micro-wrapper for the Telegram REST API, so as not to drag in a whole library for the sake of a single sendMessage method.
```python
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)


class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
```

The right way would be to put all of this (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, publish it in a public repository and release it as Open Source.
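One practical caveat: send_message() always uses parse_mode='MarkdownV2', and in MarkdownV2 characters such as _ . ! - ( ) must be escaped with a backslash, otherwise the Bot API rejects the message. A DAG id like update_reports already contains an underscore. Below is a small helper that could be applied to the text before sending. The character list follows the MarkdownV2 section of the Telegram Bot API documentation; escape_markdown_v2() is our own hypothetical name.

```python
import re

# Characters that must be escaped in ordinary MarkdownV2 text
MARKDOWN_V2_SPECIAL = r'_*[]()~`>#+-=|{}.!'


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 special character (and backslash) with a backslash."""
    pattern = '([' + re.escape('\\' + MARKDOWN_V2_SPECIAL) + '])'
    return re.sub(pattern, r'\\\1', text)


print(escape_markdown_v2('update_reports failed :('))
# update\_reports failed :\(
```

Escaping should happen after templates are rendered, since the dag_id is substituted at render time.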
While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'm off to check what's wrong this time...

Something broke in our DAG! Isn't that exactly what we were waiting for? Yes, it is!
So, are you going to pour?
Do you feel like I skipped something? I seem to have promised to pour data from SQL Server into Vertica, and then I went and wandered off topic, the scoundrel!
This atrocity was intentional: I simply had to decipher some terminology for you. Now we can move on.
Our plan was this:
- Make a DAG
- Generate tasks
- See how beautiful everything is
- Assign session numbers to the loads
- Get the data from SQL Server
- Put the data into Vertica
- Collect statistics
So, to get all of this up and running, I made a small addition to our docker-compose.yml:
docker-compose.db.yml

```yaml
version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py
```

Here we bring up:
- Vertica as the dwh host, with the most default of default settings;
- three SQL Server instances;
- and we fill the databases on the latter with some data (under no circumstances look inside mssql_init.py!)
We launch all this goodness with a slightly more complex command than last time:

```
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
```

You can see what our miracle randomizer generated via the Data Profiling/Ad Hoc Query item:

The main thing is not to show this to the analysts
I won't go into detail about ETL sessions, there's nothing to it: we create a database with a table in it, wrap everything in a context manager, and now we do this:
```python
with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15
```

session.py
```python
from sys import stderr


class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # any() takes a single iterable, so the three values go in a tuple
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            # keep within the VARCHAR(500) comment column
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'[:500]
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id SERIAL NOT NULL PRIMARY KEY,
                task_name VARCHAR(200) NOT NULL,

                started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
                finished TIMESTAMPTZ DEFAULT current_timestamp,
                successful BOOL,

                loaded_rows INT,
                comment VARCHAR(500)
            );
            """
        # CREATE TABLE returns no rows, so don't go through _execute()
        with self.connection.cursor() as cursor:
            cursor.execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished = DEFAULT,
                successful = %s,
                loaded_rows = %s,
                comment = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)


class SessionError(Exception):
    pass


class SessionClosedError(SessionError):
    pass
```

It's time to collect our data from our hundred and fifty tables. Let's do it with a few very unpretentious lines:
import pandas as pd
from airflow.hooks.mssql_hook import MsSqlHook

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

- Using a hook, we get a pymssql connection from Airflow.
- We put a date restriction into the query; the template engine passes the date into the function.
- We feed our query to pandas, which gives us a DataFrame. It will come in handy later.
I use the {dt} substitution instead of a %s query parameter not because I am an evil Pinocchio. The reason is that pandas can't cope with pymssql: it passes params to it as a List, even though pymssql really wants a tuple.
Also note that the pymssql developer decided to stop supporting it, and it's time to move to pyodbc.
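For comparison, here is a minimal sketch of the parameterized alternative. It is illustrative only. The standard-library sqlite3 driver stands in for SQL Server, which is an assumption: sqlite3 takes '?' placeholders, whereas pymssql uses %s. The orders table and the orders_for_day helper are hypothetical. The point is that the date travels to the driver as a bound parameter inside a tuple, instead of being pasted into the SQL text.

```python
import sqlite3


def orders_for_day(conn, day):
    """Return orders whose start_time falls on `day`, an ISO 'YYYY-MM-DD' string.

    The date is bound as a query parameter: sqlite3 uses '?' placeholders and
    takes the values as a tuple, so the driver does the quoting, not an f-string.
    """
    query = """
        SELECT id, start_time, end_time, type, data
        FROM orders
        WHERE date(start_time) = ?
        ORDER BY id
    """
    return conn.execute(query, (day,)).fetchall()


# Hypothetical in-memory stand-in for the dbo.Orders table
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (id INTEGER, start_time TEXT, '
             'end_time TEXT, type INTEGER, data TEXT)')
conn.executemany('INSERT INTO orders VALUES (?, ?, ?, ?, ?)', [
    (1, '2020-01-01 10:00:00', '2020-01-01 10:30:00', 1, 'a'),
    (2, '2020-01-02 09:00:00', '2020-01-02 09:15:00', 2, 'b'),
])
print(orders_for_day(conn, '2020-01-01'))
```

pandas accepts the same tuple through the params argument of read_sql_query. pyodbc also uses '?' placeholders, so a move away from pymssql would look much like this.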
Let's see which arguments Airflow has stuffed into our function:

If there is no data, there is no point in continuing. But it would be strange to count such a load as successful, and it is not an error either. Argh, what to do?! Here's what:

from airflow.exceptions import AirflowSkipException

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that nothing went wrong but the task is skipped. The square will be neither green nor red, but pink.
Let's add a few columns to our data:

from pandas.util import hash_pandas_object

df['etl_source'] = src_schema
df['etl_id'] = session.id
# index=False: hash only the values, not the row position within this batch
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']], index=False)

Namely:
- the database we took the orders from,
- the ID of our loading session (it is different for every task),
- a hash of the source and the order ID, so that the final database, where everything is poured into a single table, has a unique order ID.
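There is a caveat with the hash_id column. As far as I know, hash_pandas_object returns unsigned 64-bit values, while a Vertica INT is a signed 64-bit integer, so values above 2**63 - 1 would not fit as they are. Here is a minimal standard-library sketch of the same idea that maps a stable hash of source and order ID straight into the signed range. The order_key helper is hypothetical and is not the author's code.

```python
import hashlib

INT64_MIN = -2**63
INT64_MAX = 2**63 - 1


def order_key(source, order_id):
    """Deterministic signed 64-bit key for a (source, order_id) pair.

    blake2b with an 8-byte digest is stable across processes, unlike Python's
    built-in hash() for strings, which is randomized per interpreter run.
    """
    digest = hashlib.blake2b(f'{source}|{order_id}'.encode('utf-8'),
                             digest_size=8).digest()
    # Read the 8 bytes as a signed integer so the value fits a signed INT column
    return int.from_bytes(digest, 'big', signed=True)


print(order_key('shop_a', 42))
```

The '|' separator keeps pairs such as ('a1', 2) and ('a', 12) from hashing the same input string. Vertica reportedly reserves the single value -2**63 for NULL, so a production version might remap that one value.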
One last step remains: pour everything into Vertica. Oddly enough, one of the most spectacular and efficient ways to do it is through CSV!
import csv
from io import StringIO

from airflow.contrib.hooks.vertica_hook import VerticaHook

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# COPY expects a plain comma-separated column list, not a Python list repr
columns = ', '.join(df.columns)
copy_stmt = f"""
    COPY {target_table}({columns})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

- We create a special buffer, StringIO.
- pandas kindly writes our DataFrame into it as CSV lines.
- We open a connection to our beloved Vertica with a hook.
- And now, with copy(), we send our data straight to Vertica!
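The CSV round trip can be shown without pandas or a Vertica connection. Below is a standard-library sketch of its two pieces: a '|'-delimited buffer that uses a NUL marker for missing values, and a COPY statement whose column list is joined into plain SQL. The helper names are hypothetical. The buffer and statement they produce are the kind of input handed to cursor.copy() above.

```python
import csv
from io import StringIO


def rows_to_copy_buffer(rows, null_marker='NUL'):
    """Write rows as '|'-delimited CSV into an in-memory buffer for COPY ... FROM STDIN.

    None becomes null_marker, matching the NULL 'NUL' clause of the COPY statement.
    """
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='|', quotechar='"',
                        quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
    for row in rows:
        writer.writerow([null_marker if value is None else value for value in row])
    buffer.seek(0)  # rewind so the driver reads from the beginning
    return buffer


def copy_statement(table, columns, null_marker='NUL'):
    """Build a COPY statement with a plain comma-separated column list."""
    column_list = ', '.join(columns)
    return (f"COPY {table}({column_list}) FROM STDIN "
            f"DELIMITER '|' ENCLOSED '\"' ABORT ON ERROR NULL '{null_marker}'")


buffer = rows_to_copy_buffer([(1, 'a|b', None)])
print(buffer.getvalue(), end='')
print(copy_statement('orders.orders', ['id', 'data', 'etl_id']))
```

A value that contains the delimiter, such as 'a|b', is wrapped in quotes, which is why the statement declares ENCLOSED '"'. One design caveat: a genuine string value 'NUL' in the data would also load as NULL, so the marker should be something that never occurs in real values.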
We take the number of loaded rows from the driver and tell the session manager that everything is fine:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.
In production we create the target table by hand. Here I allowed myself a little automation:
from airflow.contrib.operators.vertica_operator import VerticaOperator

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
        id         INT,
        start_time TIMESTAMP,
        end_time   TIMESTAMP,
        type       INT,
        data       VARCHAR(32),
        etl_source VARCHAR(200),
        etl_id     INT,
        hash_id    INT PRIMARY KEY
    );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

I use VerticaOperator() to create the database schema and the table (if they don't exist yet, of course). The main thing is to set up the dependencies correctly:
from airflow.operators.python_operator import PythonOperator

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    # Every load waits for the target schema and table to exist
    create_table >> load

Summing up
"Well," said the little mouse, "now
are you convinced that I am the most terrible beast in the forest?"
Julia Donaldson, The Gruffalo
Picture a competition between my colleagues and me: who can build and launch an ETL process from scratch faster, they with their SSIS and a mouse, or me with Airflow... and then we also compare how easy each is to maintain... Wow, I think you'll agree that I would beat them on every front!
More seriously: by describing processes as program code, Apache Airflow has made my work much more comfortable and enjoyable.
Its unlimited extensibility, both through plug-ins and through its built-in scalability, lets you use Airflow in almost any area: from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).
Final part: reference and information
Pitfalls we have collected for you
- start_date. Yes, this has long been a local meme. Everyone trips over the DAG's main argument, start_date. In short: if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow at the earliest. Use a fixed date in the past instead:
  start_date = datetime(2020, 7, 7, 0, 1, 2)
  And no more problems.
  There is another runtime error related to it: Task is missing the start_date parameter. It most often means you forgot to bind the operator to the DAG.
- Everything on one machine. Yes: the databases (Airflow's own and ours), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks for the services grew. When PostgreSQL started answering index queries in 20 s instead of 5 ms, we picked it up and moved it elsewhere.
- LocalExecutor. Yes, we are still sitting on it, and we have already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to expand with at least one worker, and we will have to work hard to move to CeleryExecutor. Since you can also run Celery on a single machine, nothing stops you from using it even on a server that "of course, will never go into production, honestly!"
- Not using the built-in tools:
  - Connections to store service credentials,
  - SLA Misses to react to tasks that did not finish on time,
  - XCom to exchange metadata (I said metadata!) between DAG tasks.
- Mail abuse. Well, what can I say? Alerts were set up for every retry of every failed task. Now my work Gmail holds more than 90k emails from Airflow, and the webmail interface refuses to select and delete more than 100 at a time.
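Back to the start_date pitfall at the top of this list. It comes down to how runs are scheduled: a run is stamped with the start of its interval but fires only once that interval has closed. Here is a minimal standard-library model of this rule. It simplifies things: it assumes a timedelta schedule_interval, naive datetimes and no catchup, and first_run is a hypothetical helper rather than an Airflow function.

```python
from datetime import datetime, timedelta


def first_run(start_date, schedule_interval):
    """Simplified model of when Airflow 1.10 triggers a DAG's first run.

    A run covers [execution_date, execution_date + schedule_interval) and is
    only triggered once that interval has ended. Catchup, cron alignment and
    time zones are ignored here.
    """
    execution_date = start_date
    triggered_at = execution_date + schedule_interval
    return execution_date, triggered_at


execution_date, triggered_at = first_run(datetime(2020, 7, 7), timedelta(days=1))
print(execution_date, triggered_at)
```

With start_date set to today and a one-day interval, nothing runs until tomorrow. With a fixed date in the past, the first interval has already closed, although with catchup enabled Airflow would also schedule every missed interval since then.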
More pitfalls:
More automation tools
So that we can work even more with our heads and less with our hands, Airflow has prepared the following for us:
- REST API: it still has Experimental status, which doesn't stop it from working. With it you can not only get information about DAGs and tasks, but also pause/unpause a DAG and create a DAG Run or a pool.
- CLI: many tools are available from the command line that are not just inconvenient to use through the WebUI but are missing there altogether. For example:
  - backfill is needed to re-run task instances.
    For example, the analysts come and say: "Hey, comrade, the data from January 1 to 13 is nonsense! Fix it, fix it, fix it, fix it!" And you just go:
    airflow backfill -s '2020-01-01' -e '2020-01-13' orders
  - Basic service commands: initdb, resetdb, upgradedb, checkdb.
  - run lets you run a single task instance and even ignore all of its dependencies. Moreover, you can run it through LocalExecutor even if you have a Celery cluster.
  - test does pretty much the same, except that it also writes nothing to the database.
  - connections lets you create connections in bulk from the shell.
- Python API: a rather hardcore way of interacting, meant for plugins rather than for poking around in it by hand. But who is going to stop us from going to /home/airflow/dags, starting ipython and messing around? For example, you can export all connections with the following code:
  from airflow import settings
  from airflow.models import Connection

  fields = 'conn_id conn_type host port schema login password extra'.split()

  session = settings.Session()
  for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
- Connecting to the Airflow metadatabase. I don't recommend writing to it, but reading task states for various specific metrics can be much faster and easier than through any of the APIs.
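The backfill command in the CLI list above can be made concrete. Assuming the orders DAG runs daily, -s and -e bound the execution dates, and, as far as I can tell from Airflow 1.10's behavior, the end date is included. Here is a hypothetical standard-library helper that lists those dates. It is a simplification that ignores runs that already succeeded and any cron alignment.

```python
from datetime import date, timedelta


def backfill_dates(start, end, step=timedelta(days=1)):
    """Execution dates a backfill from start to end would cover (end inclusive)."""
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += step
    return dates


runs = backfill_dates(date(2020, 1, 1), date(2020, 1, 13))
print(len(runs), runs[0], runs[-1])
```

For the January 1 to 13 complaint this means thirteen DAG runs. That is why it pays to make each run reload exactly one day of data.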
Suppose that not all of our tasks are idempotent and that they may fail from time to time, which is normal. But several failures in a row are already suspicious and worth checking.
Careful, SQL ahead!

WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
            THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE
        state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq)                                           AS unsuccessful,
    count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END)       AS failed,
    count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
    task_id,
    dag_id
HAVING
    count(last_fail_seq) > 0
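The query above relies on a gaps-and-islands trick. Inside the failed CTE, a task's rn is its position among all recent executions. That position equals its position among failed executions only while every newer execution has failed too, so last_fail_seq marks exactly the current streak of failures. Here is the same logic in plain Python, as a sketch over hypothetical (dag_id, task_id, execution_date, state) tuples. It leaves out the two-day window.

```python
from collections import defaultdict

FAILURE_STATES = ('failed', 'up_for_retry')


def failure_streaks(executions):
    """Count each task's current streak of failures.

    executions: iterable of (dag_id, task_id, execution_date, state) tuples,
    where execution_date is any sortable value. Returns
    {(task_id, dag_id): (unsuccessful, failed, up_for_retry)} only for tasks
    whose newest executions failed, like the HAVING clause of the SQL query.
    """
    history = defaultdict(list)
    for dag_id, task_id, execution_date, state in executions:
        history[(task_id, dag_id)].append((execution_date, state))

    streaks = {}
    for key, runs in history.items():
        failed = up_for_retry = 0
        # Walk from the newest execution backwards, stop at the first non-failure
        for _, state in sorted(runs, key=lambda run: run[0], reverse=True):
            if state not in FAILURE_STATES:
                break
            if state == 'failed':
                failed += 1
            else:
                up_for_retry += 1
        if failed + up_for_retry > 0:
            streaks[key] = (failed + up_for_retry, failed, up_for_retry)
    return streaks


sample = [
    ('orders', 'load', 1, 'success'),
    ('orders', 'load', 2, 'failed'),
    ('orders', 'load', 3, 'up_for_retry'),
    ('orders', 'create_target', 1, 'failed'),
    ('orders', 'create_target', 2, 'success'),
]
print(failure_streaks(sample))
```

The create_target task is not reported: its last run succeeded, so its earlier failure is no longer part of a streak.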
Links
And of course, the first ten links from the Google results: the contents of the Airflow folder in my bookmarks.
- Of course, you should start with the official documentation, but who reads instructions?
- Well, at least read the tips from the creators.
- The very beginning: a user guide in pictures.
- The core concepts, well explained, in case you (suddenly!) didn't understand something from me.
- A short guide to setting up an Airflow cluster.
- Almost the same entertaining story, just with more emoji and a few examples.
- About working with Celery.
- About the non-idempotency of tasks, loading by ID instead of by date, transformations, file structure and other interesting things.
- Task dependencies and Trigger Rules, which I only mentioned in passing.
- How to overcome some "works as intended" behavior of the scheduler, load missing data and prioritize tasks.
- Useful SQL queries against the Airflow metadata.
- There is a useful section on creating a custom operator.
- A nice short write-up on building infrastructure on AWS for Data Science.
- Common mistakes (when someone didn't read the instructions).
- A laugh at how people mess up storing secrets, even though they could use Connections.
- Skipping DAG runs, passing data between tasks, recomputing dependencies, and skipping launched tasks.
- About using default arguments and params in templates, as well as Variables and Connections.
- A story about how the plan for Airflow 2.0 is being prepared.
- An old story about running our cluster in docker-compose.
- Dynamic tasks using templates and storytelling.
- Standard and custom notifications via email and Slack.
- Branching operators, macros and XCom.
And the links used in the article:
- Free resources for use in experiments.
- Common mistakes when creating DAGs.
- docker-compose for experiments, debugging and more.
- A Python wrapper for the Telegram REST API.
Source: www.habr.com




