Talofa, o aʻu o Dmitry Logvinenko - Inisinia Faʻamatalaga o le Matagaluega o Faʻamaumauga a le Vezet vaega o kamupani.
O le a ou taʻu atu ia te oe se meafaigaluega matagofie mo le atinaʻeina o faiga ETL - Apache Airflow. Ae o le Airflow e matua tele lava ma tele e tatau ona e vaʻai totoʻa i ai e tusa lava pe e te le o aʻafia i faʻamaumauga, ae e iai se manaʻoga e faʻalauiloa i lea taimi ma lea taimi soʻo se faiga ma mataʻituina a latou faʻatinoga.
Ma ioe, o le a le gata ina ou taʻu atu, ae faʻaalia foi: o le polokalama e tele naua code, screenshots ma fautuaga.
O mea e masani ona e va'aia pe a e google le upu Airflow / Wikimedia Commons
- naʻo le sili atu, ma na faia mo faʻamoemoega eseese, e pei ona tusia i luma o le kat):
tamoe ma mataʻituina galuega i luga o se numera e le faʻatapulaʻaina o masini (e pei o le tele o Celery / Kubernetes ma lou lotofuatiaifo o le a faʻatagaina oe)
faʻatasi ai ma le faʻagasologa o galuega faʻavae mai le faigofie tele e tusi ma malamalama i le Python code
ma le mafai ona faʻafesoʻotaʻi soʻo se faʻamaumauga ma API i le tasi ma le isi e faʻaaoga uma vaega uma ua saunia ma faʻapipiʻi fale (lea e matua faigofie lava).
Matou te faʻaaogaina Apache Airflow pei o lenei:
matou te aoina faʻamatalaga mai faʻamatalaga eseese (tele SQL Server ma PostgreSQL faʻataʻitaʻiga, API eseese faʻatasi ai ma fua faʻatatau, e oʻo lava i le 1C) i le DWH ma le ODS (e iai a matou Vertica ma Clickhouse).
o le a le maualuga cron, lea e amata ai le faʻamautuina o faʻamaumauga i luga o le ODS, ma mataʻituina foi a latou tausiga.
Seia oʻo mai talu ai nei, o matou manaʻoga na ufiufi e se tasi tamaʻi server ma 32 cores ma 50 GB o RAM. I le Airflow, e aoga lenei mea:
sili atu 200 aso (o le mea moni o galuega, lea na matou faʻatumu ai galuega),
i le averesi taitasi 70 galuega,
amata lenei lelei (fa'apena foi i le averesi) faatasi i le itula.
Ma e uiga i le auala na tatou faʻalauteleina ai, o le a ou tusia i lalo, ae o lenei seʻi o tatou faʻamalamalamaina le über-faʻafitauli o le a tatou foia:
E tolu uluai SQL Servers, e taʻitasi ma 50 faʻamaumauga - faʻataʻitaʻiga o le tasi poloketi, i le faasologa, latou te tutusa le fausaga (toetoe lava i soo se mea, mua-ha-ha), o lona uiga e tofu i latou ma se laulau o Poloaiga (lelei, o se laulau ma lena. igoa e mafai ona tulei i soo se pisinisi). Matou te ave faʻamaumauga e ala i le faʻaopoopoina o faʻalapotopotoga o auaunaga (server source, source database, ETL task ID) ma lafo i totonu, fai mai, Vertica.
Tatou o!
O le vaega autu, faʻatinoga (ma sina faʻamatalaga)
Aisea tatou (ma oe)
Ina ua tetele laau ma sa ou faigofie SQL-schik i se tasi o faleoloa a Rusia, na matou faʻaseseina faiga ETL aka faʻamatalaga faʻamatalaga e faʻaaoga ai meafaigaluega e lua o loʻo avanoa mo i matou:
Informatica Power Center - o se faiga e matua salalau, e matua'i aoga, ma ana lava meafaigaluega, lona lava fa'aliliuga. Na ou faʻaaogaina le Atua faʻasa 1% o ona gafatia. Aisea? Ia, muamua lava, o lenei atinaʻe, i se mea mai le 380s, na faʻamalosia ai i matou i le mafaufau. Lona lua, o lenei mea faʻapipiʻi ua mamanuina mo faiga sili ona manaia, toe faʻaaogaina vaega ita ma isi mea taua-pisinisi-togafiti. E uiga i le mea moni e tau, pei o le apaau o le Airbus AXNUMX / tausaga, matou te le fai atu se mea.
Faʻaeteete, o le faʻamalama e mafai ona afaina ai tagata i lalo ole 30
SQL Server Integration Server - na matou fa'aogaina lenei uo i totonu oa matou galuega faatino. Ia, o le mea moni: ua uma ona matou faʻaogaina le SQL Server, ma o le a le talafeagai le le faʻaaogaina o ana meafaigaluega ETL. O mea uma i totonu e lelei: o le atinaʻe uma e matagofie, ma le alualu i luma lipoti ... Ae e le o le mea lea matou te fiafia ai i mea faʻapipiʻi, oi, e le mo lenei. Version it dtsx (lea o le XML ma nodes shuffled i luga o sefe) tatou mafai, ae o le a le uiga? E fa'afefea le faia o se pusa galuega e toso atu ai le faitau selau o laulau mai le tasi server i le isi? Ioe, o le a le selau, o lou tamatamailima lima o le a pa'u ese mai le luasefulu fasi, kiliki i luga o le kiore. Ae e mautinoa lava e sili atu ona faʻalelei:
E mautinoa lava sa matou vaavaai mo auala e alu ese ai. Tulaga tutusa toetoe lava na oʻo mai i se faʻapipiʻi pusa SSIS na tusia e ia lava ...
… ona maua ai lea o a'u e se galuega fou. Ma na maua a'u e Apache Airflow.
Ina ua ou iloa o faʻamatalaga o le ETL o le Python code faigofie, ou te leʻi siva ma le fiafia. O le auala lea na faʻaliliuina ma eseese ai faʻamaumauga, ma sasaa laulau ma se fausaga e tasi mai le faitau selau o faʻamaumauga i totonu o le tasi sini na avea ma mataupu o le Python code i le tasi ma le afa pe lua 13 "screens.
Fa'apotopotoina le fuifui
Aua neʻi o tatou faʻatulagaina se aʻoga atoa, ma aua le talanoa e uiga i mea manino atoatoa iinei, e pei o le faʻapipiʻiina o Airflow, lau faʻamaumauga filifilia, Seleli ma isi mataupu o loʻo faʻamatalaina i totonu o faʻailoga.
Ina ia mafai ona vave amata faʻataʻitaʻiga, na ou tusia ai docker-compose.yml lea:
Tatou siitia moni lava Felaʻuaiga o le ea: Fa'atonu, Webserver. Fugalaau o le a taamilo foi iina e mataʻituina galuega Celery (aua ua uma ona tulei i totonu apache/airflow:1.10.10-python3.7, ae matou te le afaina)
PostgreSQL, lea o le a tusia ai e le Airflow ana faʻamatalaga auʻaunaga (faʻamaumauga faʻatulagaina, faʻamaumauga o faʻatinoga, ma isi), ma o le a faʻailogaina e Seleri galuega ua maeʻa;
Redis, lea o le a galue o se tagata e faia galuega mo Seleri;
Tagata fai seleni, lea o le a auai i le faʻatinoina saʻo o galuega.
I le faila ./dags o le a matou faʻaopoopoina a matou faila ma faʻamatalaga o dags. O le a pikiina i luga o le lele, o lea e le manaʻomia ai le faʻafefeteina o le faaputuga atoa pe a uma le mafatua.
I nisi o nofoaga, o le code i faʻataʻitaʻiga e leʻo faʻaalia atoatoa (ina ia le faʻafefeteina le tusitusiga), ae o se mea e suia i le faagasologa. E mafai ona maua faʻataʻitaʻiga faʻataʻitaʻiga faʻataʻitaʻiga atoatoa ile fale teu oloa https://github.com/dm-logv/airflow-tutorial.
I le faʻapotopotoga o le fatuga, na ou faʻalagolago tele i le ata lauiloa puckel/docker-airflow - ia mautinoa e siaki i fafo. Atonu e te le manaʻomia se isi mea i lou olaga.
E maua uma faatulagaga Airflow e le gata i airflow.cfg, ae faʻapea foʻi i suiga o le siosiomaga (faʻafetai i le au atinaʻe), lea na ou faʻaogaina ma le leaga.
E masani lava, e leʻo saunia: Ou te leʻi tuʻuina le fatu fatu i luga o pusa, ou te leʻi faʻalavelave i le saogalemu. Ae na ou faia le mea aupito maualalo talafeagai mo a matou tagata suʻesuʻe.
Manatua:
E tatau ona avanoa le faila dag i le fa'atulagaina ma le aufaigaluega.
E fa'apea fo'i i faletusi uma a isi vaega - e tatau ona fa'apipi'i uma i masini e iai se fa'asologa ma tagata faigaluega.
Ia, o lea ua faigofie:
$ docker-compose up --scale worker=3
A maeʻa mea uma, e mafai ona e vaʻavaʻai i fesoʻotaʻiga i luga ole laiga:
Afai e te leʻi malamalama i se mea i nei "dags" uma, o se lomifefiloi puupuu lenei:
Faʻatulaga - o le tuagane sili ona taua i le Airflow, o loʻo pulea le galue malosi o robots, ae le o se tagata: mataʻituina le faʻatulagaga, faʻafouina aso, faʻalauiloa galuega.
I se tulaga lautele, i lomiga tuai, sa i ai ni faafitauli i le mafaufau (leai, e le o le amnesia, ae o le gaogao) ma o le talatuu na tumau pea i le configs. run_duration - le va o lona toe amata. Ae o lea ua lelei mea uma.
DAG (aka "dag") - "faʻatonu acyclic kalafi", ae o sea faʻamatalaga o le a taʻuina atu i nai tagata, ae o le mea moni o se pusa mo galuega e fegalegaleai ai le tasi ma le isi (silasila i lalo) poʻo se faʻataʻitaʻiga o Package i SSIS ma Workflow i Informatica .
I le faaopoopo atu i dags, atonu o loʻo i ai pea subdags, ae atonu o le a tatou le oʻo atu ia i latou.
DAG Run - amataga aso, lea ua tofia i ai lava execution_date. Dagrans o le aso lava e tasi e mafai ona galulue tutusa (pe a fai na e faia au galuega e le mafai, ioe).
Pule o vaega o tulafono laiti e nafa ma le faia o se gaioiga patino. E tolu ituaiga o tagata faigaluega:
gaoioigapei o le matou fiafia PythonOperator, lea e mafai ona faʻatinoina soʻo se (aoga) Python code;
faaliliuina atu, lea e felauai fa'amatalaga mai lea nofoaga i lea nofoaga, fai mai, MsSqlToHiveTransfer;
sensor i le isi itu, o le a faʻatagaina oe e tali pe faʻagesegese le faʻataunuʻuina atili o le aso seia oʻo ina tupu se mea na tupu. HttpSensor e mafai ona tosoina le faʻaiʻuga faʻamaonia, ma a faʻatali le tali manaʻomia, amata le fesiitaiga GoogleCloudStorageToS3Operator. O le a fesili se mafaufau suʻesuʻe: “Aisea? A uma mea uma, e mafai ona e faia toe fai saʻo i totonu o le faʻalapotopotoga! Ona sosoo ai lea, ina ia aua neʻi faʻapipiʻi le vaitaele o galuega ma tagata faʻamalolo le tumau. E amata le masini, siaki ma mate aʻo leʻi faia le isi taumafaiga.
Task - ta'uta'u fa'agaioiga, tusa lava po'o le a le ituaiga, ma fa'apipi'i i le aso e si'itia i le tulaga o galuega.
fa'ata'ita'iga o galuega - ina ua filifili le fuafuaga lautele ua oʻo i le taimi e tuʻuina atu ai galuega i le taua i luga o tagata faʻatino-tagata faigaluega (saʻo i le taimi, pe a tatou faʻaogaina LocalExecutor pe i se node mamao i le tulaga o CeleryExecutor), na te tuʻuina atu se faʻamatalaga ia i latou (e pei o se seti o fesuiaiga - faʻataunuʻu tapulaʻa), faʻalauteleina faʻatonuga poʻo faʻataʻitaʻiga fesili, ma faʻaputuina.
Matou te gaosia galuega
Muamua, seʻi o tatou faʻavasegaina le fuafuaga lautele o la tatou doug, ona tatou faʻasalalau atili lea i faʻamatalaga atili, aua tatou te faʻaaogaina ni fofo e le taua.
O lea la, i lona foliga sili ona faigofie, o se aso e foliga mai e pei o lenei:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Sei o tatou mafaufau i ai:
Muamua, matou te faʻaulufale mai le libs manaʻomia ma se isi mea;
sql_server_ds - o lea lava List[namedtuple[str, str]] fa'atasi ai ma igoa o feso'ota'iga mai le Airflow Connections ma fa'amaumauga o le a ave ai la matou ipu;
dag - le faʻaaliga o la tatou aso, lea e tatau ona i ai i totonu globals(), a leai o le a le maua e le Airflow. E tatau foi ona fai mai Doug:
o ai lona igoa orders - o lenei igoa o le a aliali mai i luga o le upega tafaʻilagi,
o le a ia galue mai le vaeluapo i le aso valu o Iulai,
ma e tatau ona tamoʻe, e tusa ma le 6 itula uma (mo tagata faigata iinei nai lo timedelta() fa'atagaina cron-laina 0 0 0/6 ? * * *, mo le to'afilemu - o se fa'aaliga pei @daily);
workflow() o le a faia le galuega autu, ae le o le taimi nei. Mo le taimi nei, o le a na'o le lafoa'i o la tatou tala i totonu o le ogalaau.
Ma o lenei o le togafiti faigofie o le fatuina o galuega:
tatou te taufetuli i o tatou punavai;
amatalia PythonOperator, lea o le a faʻataunuʻuina a tatou faʻataʻitaʻiga workflow(). Aua nei galo e faʻamaonia se igoa tulaga ese (i totonu o le aso) o le galuega ma nonoa le aso lava ia. Fu'a provide_context i le isi itu, o le a sasaa atu finauga faaopoopo i le galuega tauave, lea o le a tatou aoina ma le faaeteete faaaogaina **context.
Mo le taimi nei, na o le pau lena. O mea na matou maua:
aso fou i luga o le upega tafaʻilagi,
tasi ma le afa selau galuega o le a faia i se tulaga tutusa (pe afai e faatagaina e le Airflow, Celery faatulagaga ma le gafatia o le server).
Ia, toeitiiti lava maua.
O ai e fa'apipi'i fa'alagolago?
Ina ia faafaigofieina lenei mea atoa, sa ou faaseseina docker-compose.yml faiga requirements.txt i nodes uma.
Ua leai nei:
O sikuea lanu efuefu o fa'ata'ita'iga o galuega e fa'agasolo e le fa'atulagaina.
Matou te faʻatali mo sina taimi, o galuega e faʻapipiʻiina e le aufaigaluega:
O lanu meamata, ioe, ua maeʻa lelei a latou galuega. Reds e le manuia tele.
I le ala, e leai se faila i luga o le matou prod ./dags, e leai se feso'ota'iga i le va o masini - o aso uma e taoto i totonu git i luga o la matou Gitlab, ma Gitlab CI tufatufa faʻafouga i masini pe a tuʻufaʻatasia i totonu master.
O sina mea itiiti e uiga i Fugalaau
A o soli e le aufaigaluega a tatou pacifiers, tatou manatua se isi meafaigaluega e mafai ona faʻaalia mai ia i tatou se mea - Fugalaau.
Le itulau muamua o lo'o iai fa'amatalaga otooto i nodes a tagata faigaluega:
Le itulau sili ona malosi ma galuega na alu i le galuega:
Le itulau pito sili ona manaia ma le tulaga o le matou faioloa:
O le itulau sili ona susulu o loʻo iai faʻataʻitaʻiga tulaga o galuega ma latou taimi faʻatino:
Matou te utaina le mea o loʻo i lalo o le uta
O lea la, o galuega uma ua maeʻa, e mafai ona e ave ese le manua.
Ma sa i ai le tele o manuʻa - mo se tasi mafuaaga poʻo se isi. I le tulaga o le faʻaogaina saʻo o le Airflow, o nei sikuea e faʻaalia ai e mautinoa lava e leʻi oʻo mai faʻamatalaga.
E te manaʻomia le matamata i le ogalaau ma toe amata faʻataʻitaʻiga galuega paʻu.
O le kiliki i luga o soʻo se sikuea, o le a matou vaʻai i gaioiga o loʻo avanoa mo i matou:
E mafai ona e ave ma fa'amama le pa'u. O lona uiga, ua galo ia i tatou ua i ai se mea ua le manuia iina, ma o le galuega lava e tasi o le a alu i le scheduler.
E manino lava o le faia o lenei mea i le isumu ma sikuea mumu uma e le o se tagata - e le o le mea lea tatou te faʻamoemoeina mai le Airflow. E masani lava, e iai a tatou auupega o le tele o faʻaumatiaga: Browse/Task Instances
Sei o tatou filifilia mea uma i le taimi e tasi ma toe seti i le zero, kiliki le mea saʻo:
A maeʻa ona faʻamamā, e faʻapea a matou taxis (ua leva ona latou faʻatali mo le faʻatulagaina e faʻatulagaina i latou):
So'oga, matau ma isi fesuiaiga
Ua oʻo i le taimi e vaʻai ai i le DAG e sosoo ai, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Pe na faia e tagata uma se fa'afouga lipoti? O ia foi lenei: o loʻo i ai se lisi o punaoa mai le mea e maua ai faʻamatalaga; o loʻo i ai se lisi e tuʻu ai; aua nei galo ona pu pe a tupu mea uma pe malepe (ia, e le o se mea e uiga ia i tatou, leai).
Seʻi o tatou toe suʻesuʻe le faila ma vaʻai i mea fou le manino:
from commons.operators import TelegramBotSendMessage - e leai se mea e taofia ai i matou mai le faia o a matou lava tagata faʻatautaia, lea na matou faʻaogaina e ala i le faia o se afifi laʻititi mo le auina atu o feʻau i le Unblocked. (O le a matou talanoa atili e uiga i lenei faʻalapotopotoga i lalo);
default_args={} - e mafai e le aso ona tufatufa fa'amatalaga tutusa i ana fa'alapotopotoga uma;
to='{{ var.value.all_the_kings_men }}' - fanua to o le a tatou le maua hardcoded, ae dynamically gaosia e faaaoga Jinja ma se fesuiaiga ma se lisi o imeli, lea ou te tuu ma le faaeteete i totonu. Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS - tulaga mo le amataina o le tagata fa'afoe. I la matou tulaga, o le tusi o le a lele atu i le pule pe a uma ona faʻalagolago manuia;
tg_bot_conn_id='tg_main' - finauga conn_id talia ID feso'ota'iga matou te faia i totonu Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - feʻau i Telegram o le a lele ese pe a iai ni galuega pa'ū;
task_concurrency=1 - matou te faʻasaina le faʻalauiloaina faʻatasi o le tele o faʻataʻitaʻiga o galuega o le tasi galuega. A leai, o le a tatou maua le faʻalauiloaina faʻatasi o nisi VerticaOperator (vaavaai i le laulau e tasi);
report_update >> [email, tg] - uma VerticaOperator fa'atasi i le lafoina o tusi ma fe'au, pei o lenei:
Ae talu ai e eseese tulaga fa'alauiloa a le au fa'asalalau, e na'o le tasi e galue. I le Tree View, o mea uma e foliga mai e itiiti ifo le vaʻaia:
O le a ou fai atu ni nai upu e uiga i macros ma a latou uo- fesuiaiga.
Macros o Jinja placeholders e mafai ona suitulaga faʻamatalaga aoga eseese i finauga a le aufaipisinisi. Mo se faʻataʻitaʻiga, pei o lenei:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} o le a fa'alautele atu i mea o lo'o i totonu o le fa'aliliuga fa'amatalaga execution_date i le faatulagaga YYYY-MM-DD: 2020-07-14. O le vaega pito sili ona lelei o le faʻaogaina o fesuiaiga o tala i se faʻataʻitaʻiga faʻapitoa (se sikuea i le Tree View), ma pe a toe amataina, o le a faʻalauteleina le faʻaogaina i tulaga tutusa.
E mafai ona va'aia fa'atatauga fa'atatau e fa'aaoga ai le fa'amau Fa'aliliuina i fa'ata'ita'iga o galuega ta'itasi. O le auala lenei o le galuega i le lafoina o se tusi:
Ma o lea i le galuega i le auina atu o se savali:
O lo'o maua se lisi atoa o macros fa'apipi'i mo le lomiga fou o lo'o maua iinei: fa'amatalaga macros
E le gata i lea, faatasi ai ma le fesoasoani a plugins, e mafai ona tatou faʻaalia a tatou lava macros, ae o se isi tala.
I le faaopoopo atu i mea na muai faʻamalamalamaina, e mafai ona tatou suitulaga i tau oa tatou fesuiaiga (ua uma ona ou faʻaogaina i le code i luga). Tatou fatu i totonu Admin/Variables lua mea:
na'o le fa'aoga o le ala i le ki mana'omia: {{ var.json.bot_config.bot.token }}.
O le a ou fai atu moni lava se upu e tasi ma faaali atu se ata e tasi e uiga i sootaga. O mea uma e tulagalua iinei: i luga o le itulau Admin/Connections matou te fatuina se fesoʻotaʻiga, faʻaopoopo a matou logins / passwords ma isi faʻamaufaʻailoga patino iina. Faapei o lea:
E mafai ona faʻailogaina upu faʻamaonia (sili atu ona maeʻa nai lo le faaletonu), pe mafai foi ona e tuʻua le ituaiga fesoʻotaʻiga (e pei ona ou faia mo tg_main) - o le mea moni o le lisi o ituaiga o loʻo faʻapipiʻiina i Airflow faʻataʻitaʻiga ma e le mafai ona faʻalauteleina e aunoa ma le oʻo i totonu o faʻamaumauga autu (afai faʻafuaseʻi ou te leʻi google se mea, faʻamolemole faʻasaʻo aʻu), ae leai se mea e taofia ai i matou mai le mauaina o faʻatagaga naʻo le igoa.
E mafai foi ona e faia ni feso'ota'iga i le igoa lava e tasi: i lenei tulaga, o le metotia BaseHook.get_connection(), lea e maua ai tatou sootaga i igoa, o le a tuuina atu fa'afuase'i mai le tele o igoa (e sili atu ona talafeagai le faia o Round Robin, ae seʻi o tatou tuʻuina atu i luga o le lotofuatiaifo o le au atinaʻe Airflow).
Fesuia'i ma Feso'ota'iga e mautinoa lava e manaia meafaigaluega, ae e taua le aua ne'i leiloa le paleni: o fea vaega o au tafega e te teuina i totonu o le code lava ia, ma o fea vaega e te tu'uina atu i le Airflow mo le teuina. I le tasi itu, vave suia le tau, mo se faʻataʻitaʻiga, pusa meli, e mafai ona faigofie e ala i le UI. I le isi itu, o le toe foʻi atu lea i le kiliki o le isumu, lea na matou (I) manaʻo e faʻaumatia.
O le galue ma feso'ota'iga o se tasi lea o galuega matau. I se tulaga lautele, Airflow matau o vaega ia mo le faʻafesoʻotaʻi i auaunaga a isi vaega ma faletusi. Faataitaiga, JiraHook o le a tatalaina se tagata fa'atau mo matou e fegalegaleai ma Jira (e mafai ona e fa'agasolo galuega i tua ma luma), ma le fesoasoani a SambaHook e mafai ona e tuleia se faila i le lotoifale i smb-manatu.
Fa'avasega le fa'agaioiga masani
Ma na matou latalata i le tilotilo i le auala na faia ai TelegramBotSendMessage
kote commons/operators.py fa'atasi ma le tagata fa'afoe moni:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
O iinei, pei o isi mea uma i le Airflow, e matua faigofie lava mea uma:
Tufaa mai BaseOperator, lea e fa'atino ai ni nai mea fa'apitoa ole Airflow (va'ai i lou taimi paganoa)
Fa'ailoa fanua template_fields, lea o le a suʻe ai e Jinja ni macros e faʻatautaia.
Fa'atulaga finauga sa'o mo __init__(), seti le faaletonu pe a tatau ai.
E lei galo foi ia i matou le amataga o le tuaa.
Tatala le matau talafeagai TelegramBotHookmaua mai ai se mea a le tagata o tausia.
Metotia ua sui (toe faauigaina). BaseOperator.execute(), lea o le a faʻafefeteina e Airfow pe a oʻo mai le taimi e faʻalauiloa ai le tagata faʻapipiʻi - i totonu o le a matou faʻatinoina ai le gaioiga autu, galo e saini i totonu. (Matou te ulufale i totonu, i le ala, i totonu stdout и stderr - O le a faʻalavelaveina e le ea mea uma, afifi matagofie, faʻaumatia pe a manaʻomia.)
Sei o tatou vaai po o le a le mea ua tatou maua commons/hooks.py. Le vaega muamua o le faila, ma le matau lava ia:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Ou te le iloa foi pe o le a le mea e faʻamatalaina iinei, o le a ou matauina mea taua:
Matou te mautofi, mafaufau e uiga i finauga - i le tele o tulaga o le a tasi: conn_id;
Fa'asili auala masani: Sa fa'atapula'aina a'u get_conn(), lea ou te maua ai le fesoʻotaʻiga laina i le igoa ma naʻo le mauaina o le vaega extra (o se fanua JSON lenei), lea na ou (e tusa ai ma aʻu lava faatonuga!) tuʻuina le Telegram bot faʻailoga: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Ou te fatuina se faataitaiga o la matou TelegramBot, tuuina atu i ai se faailoga patino.
Pau lava lena. E mafai ona e mauaina se tagata fa'atau mai se matau e fa'aaoga TelegramBotHook().clent poʻo TelegramBotHook().get_conn().
Ma le vaega lona lua o le faila, lea ou te faia ai se microwrapper mo le Telegram REST API, ina ia aua nei toso tutusa python-telegram-bot mo se tasi auala sendMessage.
O le auala saʻo o le faʻaopoopoina uma: TelegramBotSendMessage, TelegramBotHook, TelegramBot - i totonu o le faʻapipiʻi, tuʻu i totonu o se faleoloa lautele, ma tuʻuina atu i le Open Source.
A'o matou su'esu'eina nei mea uma, na mafai ona le manuia a matou lipoti fa'afouga ma lafo mai ia te a'u se fe'au sese i le alalaupapa. O le a ou siaki pe ua sese...
Sa i ai se mea na gau i la matou taifau! Pe le o le mea ea lena sa tatou faatalitalia? E sa'o lava!
E te sasaa atu?
E te lagona na ou misia se mea? E foliga mai na ia folafola e faʻafeiloaʻi faʻamatalaga mai le SQL Server i Vertica, ona ia ave lea ma alu ese mai le autu, le ulavale!
O lenei faʻalavelave na faʻamoemoeina, naʻo loʻu faʻamalamalamaina o ni faʻamatalaga mo oe. O lea e mafai ona e alu atili.
O le matou fuafuaga lenei:
Fai aso
Fausia galuega
Vaai i le matagofie o mea uma
Tofi numera o vasega e fa'atumu
Maua fa'amatalaga mai le SQL Server
Tu'u fa'amaumauga ile Vertica
Ao mai fuainumera
O lea la, ina ia faʻaleleia uma nei mea, sa ou faia se faʻaopoopoga itiiti i la matou docker-compose.yml:
Vertica e avea ma talimalo dwh fa'atasi ai ma fa'atonuga sili ona leaga,
tolu faʻataʻitaʻiga o le SQL Server,
matou te faʻatumu faʻamaumauga i le vaega mulimuli ma nisi faʻamatalaga (e leai se mea e vaʻai i totonu mssql_init.py!)
Matou te faʻalauiloaina mea lelei uma ma le fesoasoani a se faʻatonuga sili atu ona faigata nai lo le taimi mulimuli:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Le mea na faia e le matou vavega randomizer, e mafai ona e faʻaogaina le mea Data Profiling/Ad Hoc Query:
O le mea autu e le o le faʻaalia i tagata suʻesuʻe
fa'amatala auiliili ETL sauniga Ou te le faia, o mea uma e le taua iina: matou te faia se faʻavae, o loʻo i ai se faʻailoga i totonu, matou te afifiina mea uma i se pule faʻamatalaga, ma o lenei matou te faia lenei mea:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Ua oo mai le taimi aoina a matou faʻamatalaga mai a matou laulau e tasi ma le afa selau. Sei o tatou faia lenei mea ma le fesoasoani a laina sili ona le lelei:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Faatasi ai ma le fesoasoani a se matau matou te maua mai le Airflow pymssql-feso'ota'i
Sei o tatou sui se tapulaʻa i le tulaga o se aso i totonu o le talosaga - o le a lafoina i totonu o le galuega e le masini mamanu.
Fafaga le matou talosaga pandaso ai na te mauaina i tatou DataFrame - o le a aoga ia i tatou i le lumanaʻi.
O lo'o ou fa'aogaina le suitulaga {dt} nai lo se parakalafa talosaga %s e le ona o aʻu o se Pinocchio leaga, ae ona pandas le mafai ona taulimaina pymssql ma se'e le mea mulimuli params: Liste ui ina mana'o moni lava o ia tuple.
Ia maitauina foi o le tagata atiae pymssql na filifili e le toe lagolagoina o ia, ma ua oo i le taimi e alu ese ai pyodbc.
Sei o tatou vaʻai pe o le a le mea na faʻapipiʻiina e le Airflow finauga oa tatou galuega i:
Afai e leai se faʻamatalaga, e leai se aoga e faʻaauau ai. Ae e ese foi le mafaufau i le faatumuina o le manuia. Ae e le o se mea sese lea. A-ah-ah, o le a le mea e fai?! Ma o le mea lenei:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException ta'u atu i le Airflow e leai ni mea sese, ae matou te faamisi le galuega. O le a leai se sikuea lanu meamata pe mumu, ae piniki.
ID o le matou sauniga lolovaia (o le a ese mo galuega uma),
O se hash mai le punavai ma le faʻatonu ID - ina ia i totonu o le faʻamaumauga mulimuli (lea e sasaa ai mea uma i totonu o le laulau e tasi) matou te maua se ID faʻatonu tulaga ese.
O loʻo tumau pea le laasaga mulimuli: sasaa mea uma i Vertica. Ma, o le mea e ese ai, o se tasi o auala sili ona mataʻina ma lelei e fai ai lenei mea e ala i le CSV!
I luga o le faʻatau, matou te fatuina ma le lima le ipu faʻatatau. O iinei na ou faʻatagaina ai aʻu lava i se tamai masini:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
O lo'o ou fa'aaogaina VerticaOperator() Ou te fatuina se faʻamaumauga faʻamaumauga ma se laulau (pe afai latou te leʻi i ai, ioe). O le mea autu o le faʻatulagaina saʻo o faʻalagolago:
- Ia, - fai mai le tamai isumu, - e le o lea, i le taimi nei
Ua e talitonu o aʻu o le manu sili ona mataʻutia i le vaomatua?
Julia Donaldson, Le Gruffalo
Ou te manatu afai e i ai se tauvaga ma aʻu uo: o ai o le a vave faia ma faʻalauiloa se faiga ETL mai le amataga: latou ma a latou SSIS ma se isumu ma aʻu ma Airflow ... Ona matou faʻatusatusaina lea o le faigofie o le tausiga ... Oka, ou te manatu o le a e malilie o le a ou sasaina i latou i luma uma!
Afai e sili atu le ogaoga, o Apache Airflow - e ala i le faʻamatalaina o faiga i le tulaga o le polokalame code - na faia laʻu galuega tele sili atu le mafanafana ma le fiafia.
O lona faʻalauteleina e le faʻatapulaʻaina, e le gata i le faʻaogaina o plug-ins ma le predisposition i scalability, e te maua ai le avanoa e faʻaoga ai le Airflow i toetoe lava o soʻo se eria: e oʻo lava i le taamilosaga atoa o le aoina, saunia ma le faʻaogaina o faʻamatalaga, e oʻo lava i le faʻalauiloaina o rockets (i Mars, o vasega).
Vaega mulimuli, fa'amatalaga ma fa'amatalaga
Le salu ua matou aoina mo oe
start_date. Ioe, o lea ua leva ona avea ma meme i le lotoifale. E ala i le finauga autu a Doug start_date pasi uma. I se faapuupuuga, afai e te faʻamaonia i totonu start_date aso nei, ma schedule_interval - e tasi le aso, ona amata loa lea o le DAG taeao e leai se taimi muamua.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Ma e le toe iai ni fa'afitauli.
O lo'o i ai se isi fa'alavelave fa'alavelave e feso'ota'i ma ia: Task is missing the start_date parameter, lea e masani ona taʻu mai ai ua galo ia te oe e fusifusia i le operator dag.
O mea uma i luga ole masini e tasi. Ioe, ma faʻavae (Airflow lava ia ma lo tatou faʻapipiʻiina), ma se upega tafaʻilagi, ma se faʻatulagaina, ma tagata faigaluega. Ma sa aoga foi. Ae i le aluga o taimi, o le numera o galuega mo auaunaga na faʻatupulaia, ma ina ua amata ona tali atu PostgreSQL i le faasino igoa i le 20 s nai lo le 5 ms, na matou ave ma ave ese.
LocalExecutor. Ioe, o loo tatou nonofo pea i luga, ma ua uma ona tatou oo i le faatausiusiuga o le to e le gata. LocalExecutor ua lava mo i matou i le taimi nei, ae o le taimi nei e faʻalautele ma le itiiti ifo ma le tasi le tagata faigaluega, ma e tatau ona matou galulue malosi e siitia atu i CeleryExecutor. Ma i le manatu i le mea moni e mafai ona e galue i luga o le masini e tasi, e leai se mea e taofia oe mai le faʻaaogaina o le Seleri e oʻo lava i luga o se 'auʻaunaga, lea "ioe, e le mafai lava ona alu i le gaosiga, faʻamaoni!"
Le fa'aaogaina mea faigaluega faufale:
fesootaiga e teu ai fa'amatalaga tautua,
SLA Misi e tali atu i galuega e leʻi manuia i le taimi,
xcom mo fefaʻatauaʻiga metadata (sa ou fai atu metafaʻamatalaga!) i le va o galuega.
Fa'aleagaina meli. Ia, o le a sa'u tala e fai atu? Na fa'atūina fa'aaliga mo le toe faia uma o galuega pa'u. O la'u galuega Gmail ua i ai> 90k imeli mai Airflow, ma o le upega o meli meli e musu e piki ma tape le sili atu i le 100 i le taimi.
Ina ia mafai ona tatou galulue atili ma o tatou ulu ae le o tatou lima, ua saunia e le Airflow mo i tatou lenei:
malolo API - o loʻo i ai pea le tulaga o le Experimental, lea e le taofia ai o ia mai le galue. Faatasi ai ma ia, e le gata e mafai ona e mauaina faʻamatalaga e uiga i aso ma galuega, ae faʻapea foi ona taofi / amata se aso, fatuina se DAG Run poʻo se vaitaele.
CLI - tele meafaigaluega o loʻo maua i le laina faʻatonu e le naʻo le faʻaogaina e ala i le WebUI, ae e masani ona toesea. Faataitaiga:
backfill mana'omia e toe amata ai galuega.
Mo se faʻataʻitaʻiga, na o mai le au suʻesuʻe ma fai mai: "Ma o oe, uo, e i ai le valea i faʻamaumauga mai ia Ianuari 1 i le 13! Fa'alelei, fa'aleleia, fa'aleleia, fa'alelei!" Ma o oe o se mea fiafia:
run, lea e mafai ai e oe ona faʻatino se tasi faʻataʻitaʻiga galuega, ma e oʻo lava i togi i luga o faʻalagolago uma. E le gata i lea, e mafai ona e faʻaogaina e ala i LocalExecutor, tusa lava pe iai sau fuifui Seleri.
E fai tutusa lava test, na'o fa'avae fo'i e lē tusia ai se mea.
connections fa'ataga le fa'atupu tele o feso'ota'iga mai le atigi.
python api - o se auala sili ona faigata e fegalegaleai ai, lea e faʻamoemoe mo plugins, ae le o le faʻafefe i totonu ma lima laiti. Ae o ai e taofia i tatou mai le o atu i ai /home/airflow/dags, tamoe ipython ma amata ona fai mea leaga? E mafai, mo se faʻataʻitaʻiga, auina atu i fafo soʻotaga uma ma le code lea:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Feso'ota'i ile fa'amaumauga ole Airflow. Ou te le fautuaina e tusi i ai, ae o le mauaina o galuega mo tulaga eseese e mafai ona sili atu le vave ma faigofie nai lo soʻo se API.
Seʻi tatou fai atu e le o a tatou galuega uma e le mafai, ae e mafai ona paʻu i nisi taimi, ma e masani lava. Ae o nai poloka poloka ua leva ona masalomia, ma e tatau ona siaki.
Fa'aeteete SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
mau
Ma o le mea moni, o sootaga muamua e sefulu mai le tuʻuina atu o Google o mea o loʻo i totonu o le Airflow folder mai aʻu faʻailoga.
Apache Airflow Documentation - ioe, e tatau ona tatou amata i le ofisa. pepa, ae o ai na te faitauina faatonuga?
Ea: Pe a mamao lau DAG i tua o le faʻasologa - faʻafefea ona faʻatoʻilaloina nisi "galue e pei ona faʻamoemoeina" i le faʻatulagaina, utaina faʻamaumauga leiloa ma faʻamuamua galuega.
O le Zen o le Python ma le Apache Airflow - fa'asinomaga DAG fa'ase'e, fa'asolo fa'amatalaga i galuega, toe fa'atatau i fa'alagolago, fa'apea fo'i e fa'ase'e galuega fa'alauiloa.
Ea: Fautuaga, Togafiti, ma Faiga Sili - e uiga i le faʻaaogaina default arguments и params i faʻataʻitaʻiga, faʻapea foʻi ma Fesuiaiga ma Fesoʻotaʻiga.