flexible generation of pipelines in Python code that is easy to write and understand;
the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which are extremely easy to write).
We use Apache Airflow like this:
we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into DWH and ODS (we have Vertica and ClickHouse);
all this goodness runs roughly once an hour.
How we grew into this I'll describe below, but first let's state the über-problem we are going to solve:
There are three source SQL Servers, each with 50 databases: instances of a single project, so they all share the same structure (almost everywhere, mua-ha-ha), which means each of them has an Orders table (luckily, a table with that name fits into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively dump it into, say, Vertica.
SQL Server Integration Services (SSIS): we used this comrade for our internal ETL flows. Well, really: we already use SQL Server, and it would be unreasonable not to use its ETL tools. Everything about it is fine: the interface is pretty, and the progress reports... But that's not why we love software, oh no. You can version its .dtsx files (which are XML whose nodes get shuffled on every save), but what's the point? And what about building a task package that drags hundreds of tables from one server to another? Hundreds, indeed: your index finger will fall off after twenty of them from clicking the mouse button. But it certainly looks fashionable.
Of course, we looked for a way out. It even nearly came to a home-made SSIS package generator...
…and then a new job found me. And Apache Airflow caught up with me there.
When I found out that ETL process descriptions are plain Python code, I all but danced with joy. That is how data flows got versioned and diffed, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code filling one and a half or two 13″ screens.
Assembling the cluster
Let's not turn this into kindergarten and skip the obvious things, such as installing Airflow, your chosen database, Celery and the other matters described in the docs.
We'll bring up Airflow itself: Scheduler, Webserver. Flower will also run there to monitor Celery tasks (it's already bundled into apache/airflow:1.10.10-python3.7 anyway, but we don't mind);
PostgreSQL, into which Airflow will write its service information (scheduler data, execution statistics, etc.) and Celery will mark completed tasks;
Redis, which will act as the task broker for Celery;
When putting the compose file together, I relied heavily on the well-known puckel/docker-airflow image; be sure to check it out. Perhaps you won't need anything else in your life.
All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I took full advantage of.
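Airflow 1.10 reads any airflow.cfg option from an environment variable named AIRFLOW__{SECTION}__{KEY} (double underscores on both sides of the section). A minimal sketch of that naming rule, plus the kind of environment a compose file could pass to every container; the helper airflow_env_var, the service host names postgres and redis and the credentials are placeholders for illustration, not the author's actual setup:

```python
def airflow_env_var(section: str, key: str) -> str:
    """Name of the environment variable that overrides [section] key in airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Hypothetical settings for a CeleryExecutor stack; hosts and passwords are placeholders.
settings = {
    ("core", "executor"): "CeleryExecutor",
    ("core", "sql_alchemy_conn"): "postgresql+psycopg2://airflow:airflow@postgres:5432/airflow",
    ("celery", "broker_url"): "redis://redis:6379/1",
    ("celery", "result_backend"): "db+postgresql://airflow:airflow@postgres:5432/airflow",
}

# The same dict can be dumped into the `environment:` section of each service.
env = {airflow_env_var(section, key): value for (section, key), value in settings.items()}

for name, value in sorted(env.items()):
    print(f"{name}={value}")
```

Keeping the values in one place like this makes it easy to give the scheduler, webserver and workers an identical configuration.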
Naturally, it's not production-ready: I deliberately didn't add heartbeats to the containers and didn't bother with security. But I did the minimum that suits our experimenters.
Note that:
The dag folder must be accessible to both the scheduler and the workers.
The same applies to all third-party libraries: they must all be installed on the machines running the scheduler and the workers.
In older versions the scheduler often had memory problems (no, not amnesia, but leaks), which is why the legacy run_duration option, its restart interval, still lingers in the config. But now everything is fine.
DAG (aka "dag"): a "directed acyclic graph", but that definition will tell few people much; in essence it is a container for tasks that interact with each other (see below), the analogue of a Package in SSIS or a Workflow in Informatica.
Besides dags there can also be subdags, but we most likely won't get to them.
DAG Run: an initialized dag that has been assigned its own execution_date. Dagruns of the same dag can run in parallel (provided you've made your tasks idempotent, of course).
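Operators are pieces of code responsible for performing a specific action. There are three types of operators:
actions, such as PythonOperator, which can run any (valid) Python code;
transfers, which move data from place to place, for example MsSqlToHiveTransfer;
sensors, which hold back further execution of the dag until some event occurs, for example HttpSensor waiting for an endpoint to return the expected response.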
So, in its simplest form, such a dag will look like this:
```python
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# list of (conn_id, database) pairs
from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))


def workflow(**context):
    # for now the task only prints its context (execution_date, ds, task_instance, ...)
    print(context)


# one task per source database
for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
```
Let's break it down:
First, we import the necessary libs and a few other things;
sql_server_ds is a List[namedtuple[str, str]] with the names of connections from Airflow Connections and the databases we will take our table from;
dag is the declaration of our dag, which must be in globals(), otherwise Airflow won't find it. The dag also needs to be told:
its name, orders, which will then appear in the web interface;
that it should run from midnight on July 8 and then every 6 hours.
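The commons.datasources module is the author's own and its contents aren't shown. A hypothetical sketch of what sql_server_ds might hold, matching the List[namedtuple[str, str]] description: three source servers (the conn_ids mssql_01..03 and the database names are invented) with 50 databases each. Since every schema becomes a task_id, the names must be unique within the dag:

```python
from collections import namedtuple
from typing import List

# Hypothetical stand-in for commons.datasources; the real module is not shown.
DataSource = namedtuple("DataSource", ["conn_id", "schema"])

# Assumed Airflow connection IDs, one per source SQL Server.
SERVERS = ["mssql_01", "mssql_02", "mssql_03"]
DATABASES_PER_SERVER = 50


def build_sql_server_ds(servers: List[str], per_server: int) -> List[DataSource]:
    """One (conn_id, database) pair for every project database on every server."""
    return [
        DataSource(conn_id, f"{conn_id}_db_{n:02d}")
        for conn_id in servers
        for n in range(1, per_server + 1)
    ]


sql_server_ds = build_sql_server_ds(SERVERS, DATABASES_PER_SERVER)

# The dag loop unpacks each pair exactly like `for conn_id, schema in sql_server_ds`.
for conn_id, schema in sql_server_ds[:2]:
    print(conn_id, schema)
```

With this list the loop in the dag would create 150 PythonOperator tasks.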
By the way, on our prod there is no ./dags folder and no synchronization between machines: all the dags live in git on our GitLab, and GitLab CI rolls updates out to the machines on merge into master.
A bit about Flower
While the workers are grinding through our dummy tasks, let's recall another tool that can show us a thing or two: Flower.
The very first page shows summary information on the worker nodes.
from commons.operators import TelegramBotSendMessage: nothing stops us from writing our own operators, which we took advantage of by making a small wrapper for sending messages to Telegram (the "Unblocked" one). (We'll talk more about this operator below);
default_args={}: a dag can hand the same arguments to all of its operators;
to='{{ var.value.all_the_kings_men }}': the to field won't be hardcoded but generated dynamically with Jinja from a variable holding a list of emails, which I carefully put into Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case the letter will fly to the bosses only if all dependencies have completed successfully;
tg_bot_conn_id='tg_main': conn_id arguments take the IDs of connections we create in Admin/Connections;
But since the notification operators have mutually exclusive start conditions, only one of them will run. In Tree View everything looks somewhat less obvious.
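Only ALL_SUCCESS appears in the text; assuming the failure notifier uses TriggerRule.ONE_FAILED, here is a toy model of why exactly one notifier fires. It assumes every upstream task has already finished as "success" or "failed" and ignores states such as skipped or upstream_failed (real Airflow also fires ONE_FAILED as soon as one upstream fails, without waiting for the rest); should_run is a hypothetical helper, not Airflow code:

```python
from typing import Iterable

# Same strings as airflow.utils.trigger_rule.TriggerRule.ALL_SUCCESS / ONE_FAILED.
ALL_SUCCESS = "all_success"
ONE_FAILED = "one_failed"


def should_run(trigger_rule: str, upstream_states: Iterable[str]) -> bool:
    """Toy model: all upstream tasks are finished as "success" or "failed"."""
    states = list(upstream_states)
    if trigger_rule == ALL_SUCCESS:
        return all(state == "success" for state in states)
    if trigger_rule == ONE_FAILED:
        return any(state == "failed" for state in states)
    raise ValueError(f"trigger rule not modelled here: {trigger_rule}")


# Whatever the outcome, exactly one of the two notifiers fires.
for outcome in (["success", "success"], ["success", "failed"], ["failed", "failed"]):
    fired = [rule for rule in (ALL_SUCCESS, ONE_FAILED) if should_run(rule, outcome)]
    print(outcome, "->", fired)
```

The rule that doesn't fire leaves its task skipped, which is what makes the Tree View look patchy.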
A few words about macros and their friends, variables.
Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:
```sql
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE
```
{{ ds }} will expand to the contents of the execution_date context variable in YYYY-MM-DD format: 2020-07-14. The best part is that context variables are pinned to a specific task instance (a cell in Tree View), so when it is restarted, the placeholders expand to the same values.
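A sketch of what happens to the query above, with a deliberately tiny regex renderer standing in for Jinja (the real rendering is done by Airflow with Jinja; render and macro_context are hypothetical helpers). ds and ds_nodash are real Airflow macros, both derived from execution_date rather than from the wall clock, which is why a rerun of the same task instance yields the same SQL:

```python
import re
from datetime import datetime

SQL_TEMPLATE = """
SELECT id, payment_dtm, payment_type, client_id
FROM orders.payments
WHERE payment_dtm::DATE = '{{ ds }}'::DATE
"""


def macro_context(execution_date: datetime) -> dict:
    # Both values depend only on execution_date, never on datetime.now().
    return {
        "ds": execution_date.strftime("%Y-%m-%d"),
        "ds_nodash": execution_date.strftime("%Y%m%d"),
    }


def render(template: str, context: dict) -> str:
    """Toy replacement for Jinja: substitutes only bare {{ name }} placeholders."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: context[m.group(1)], template)


sql = render(SQL_TEMPLATE, macro_context(datetime(2020, 7, 14, 6)))
print(sql)
```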
The substituted values can be viewed with the Rendered button on any task instance, for example on the email-sending task.
A few words about connections. Everything is elementary here: on the Admin/Connections page we create a connection and put our logins/passwords and more specific parameters into it.
Passwords can be encrypted (more thoroughly than by default), or you can leave the connection type empty (as I did for tg_main): the thing is that the list of types is hardwired into Airflow's models and can't be extended without digging into the source code (if I simply failed to google something, please correct me), but nothing stops us from fetching credentials just by name.
You can also create several connections with the same name: in that case the BaseHook.get_connection() method, which gives us connections by name, returns a random one of the namesakes (Round Robin would be more logical, but let's leave that to the conscience of the Airflow developers).
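The difference between the two strategies in a few lines of plain Python. pick_random mirrors what the text describes for BaseHook.get_connection() (a random choice among namesakes); round_robin is the alternative the author wishes for. The Conn tuple and the host names are hypothetical:

```python
import itertools
import random
from collections import namedtuple

Conn = namedtuple("Conn", ["conn_id", "host"])

# Hypothetical: three connections registered under the same conn_id.
namesakes = [Conn("mssql_replica", host) for host in ("db-a", "db-b", "db-c")]


def pick_random(conns):
    """Random choice among namesakes: load spreads evenly only on average."""
    return random.choice(conns)


def round_robin(conns):
    """Cycle through namesakes in order: each host gets its turn in sequence."""
    return itertools.cycle(conns)


rr = round_robin(namesakes)
print([next(rr).host for _ in range(4)])  # ['db-a', 'db-b', 'db-c', 'db-a']
```

Note that round robin needs state shared between calls (here the iterator), which a stateless classmethod lookup doesn't have; that is one plausible reason for the random choice.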
Variables and Connections are certainly handy tools, but it's important to keep the balance: which parts of your flows you keep in the code itself and which you hand over to Airflow for storage. On the one hand, it's convenient to quickly change a value, say a mailing list, through the UI. On the other hand, that's a return to the mouse clicking we (I) wanted to get rid of.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are points of contact with third-party services and libraries. For example, JiraHook will open a client for interacting with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an SMB share.
Let's see what we have in commons/hooks.py. The first part of the file is the hook itself:
```python
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession


class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """

    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        # look the connection up by name and take the token from its Extra JSON
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        # TelegramBot is the client class from the second part of this file
        self.client = TelegramBot(self.tg_bot_token)

        return self.client
```
Of the standard methods, I overrode only get_conn(), in which I fetch the connection parameters by name and take just the extra field (a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
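The second part of commons/hooks.py, with the TelegramBot client that the hook instantiates, isn't shown. A hypothetical standard-library stand-in (the author's version is built on requests_toolbelt's BaseUrlSession): token_from_extra reads bot_token from the Extra JSON the way get_conn() does, and the client builds a call to the Telegram Bot API's sendMessage method without sending it:

```python
import json
import urllib.request

TELEGRAM_API = "https://api.telegram.org"


def token_from_extra(extra: str) -> str:
    """Read bot_token from a connection's Extra JSON, failing with a clear message."""
    data = json.loads(extra or "{}")
    if "bot_token" not in data:
        raise ValueError('connection Extra must contain {"bot_token": "..."}')
    return data["bot_token"]


class TelegramBot:
    """Hypothetical minimal client; not the author's real TelegramBot class."""

    def __init__(self, token: str):
        # Bot API methods live under https://api.telegram.org/bot<token>/<method>
        self.base_url = f"{TELEGRAM_API}/bot{token}/"

    def send_message_request(self, chat_id, text: str) -> urllib.request.Request:
        # Builds (but does not send) a sendMessage call with a JSON body.
        body = json.dumps({"chat_id": chat_id, "text": text}).encode("utf-8")
        return urllib.request.Request(
            self.base_url + "sendMessage",
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )


bot = TelegramBot(token_from_extra('{"bot_token": "YOuRAwEsomeBOtToKen"}'))
request = bot.send_message_request(chat_id=123, text="orders: all loads finished")
# urllib.request.urlopen(request) would actually send the message.
```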
I imagine a competition between my colleagues and me: who can create and launch an ETL process from scratch faster, they with their SSIS and a mouse, or me with Airflow... Wow, I think you'll agree I'd beat them on all fronts!
start_date. Yes, this is already a local meme. Sooner or later everyone trips over start_date, the dag's main argument. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow, no earlier.
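```python
from datetime import datetime

# a fixed moment in the past, never something computed from datetime.now()
start_date = datetime(2020, 7, 7, 0, 1, 2)
```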
And no more problems.
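The start_date behaviour follows from how the scheduler works: a run covers an interval and is triggered only once that interval has ended. A simplified sketch that ignores catchup and backfills; first_run is a hypothetical helper, not an Airflow function:

```python
from datetime import datetime, timedelta
from typing import Tuple


def first_run(start_date: datetime,
              schedule_interval: timedelta) -> Tuple[datetime, datetime]:
    """Return (execution_date, trigger time) of a dag's first scheduled run.

    Simplified model: the run stamped with execution_date == start_date covers
    [start_date, start_date + schedule_interval) and is triggered only after
    that interval has ended.
    """
    execution_date = start_date
    triggered_at = start_date + schedule_interval
    return execution_date, triggered_at


# start_date set to "today", daily schedule: nothing happens until tomorrow.
print(first_run(datetime(2020, 7, 8), timedelta(days=1)))

# The orders dag from the beginning: start_date=datetime(2020, 7, 8, 0), every 6 hours.
print(first_run(datetime(2020, 7, 8, 0), timedelta(hours=6)))
```

This is also why the execution_date (and therefore ds) of a run is the start of its interval, not the moment it actually ran.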
There is another runtime error related to it: Task is missing the start_date parameter, which most often means you forgot to bind the operator to the dag.
Everything on one machine. Yes: the databases (Airflow's own and that of our tooling), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks grew, and when PostgreSQL started answering index lookups in 20 s instead of 5 ms, we picked it up and moved it elsewhere.
LocalExecutor. Yes, we're still on it, and we've already reached the edge of the abyss. LocalExecutor has been enough so far, but now it's time to expand with at least one more worker, and we'll have to work hard to move to CeleryExecutor. And since Celery can also run on a single machine, nothing stops you from using it even on a server that "of course, will never go into prod, honestly!"
Not using the built-in tools:
Connections for storing service credentials;
SLA Misses for reacting to tasks that didn't finish on time;
XCom for exchanging metadata (I said metadata!) between dag tasks.
Mail abuse. Well, what can I say? Alerts were set up for every retry of every failed task. Now my work Gmail holds over 90k emails from Airflow, and the webmail interface refuses to select and delete more than 100 at a time.
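The text doesn't say how the flood was eventually stopped. One option, assuming Airflow 1.10's standard BaseOperator arguments email, email_on_failure and email_on_retry, is to alert only on the final failure; the address and the retry values below are placeholders:

```python
from datetime import timedelta

# Hypothetical default_args; the keys are standard BaseOperator arguments in Airflow 1.10.
default_args = {
    "email": ["data-alerts@example.com"],  # placeholder address
    "email_on_failure": True,   # one email once a task has exhausted its retries
    "email_on_retry": False,    # no email for each intermediate retry
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}
```

Passed as DAG(..., default_args=default_args), these values apply to every operator of the dag, so with retries=3 a task that keeps failing sends one email instead of four.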
To work more with our heads and less with our hands, Airflow offers us:
REST API: it still has Experimental status, which doesn't stop it from working. With it you can not only get information about dags and tasks, but also stop/start a dag, create a DAG Run or a pool.
CLI: many tools are available from the command line that are not merely inconvenient to use through the WebUI but mostly absent there altogether. For example:
connections lets you create connections in bulk from the shell.
python api: a rather hardcore way of interacting, intended for plugins rather than for poking at by hand. But who's going to stop us from going to /home/airflow/dags, launching ipython and starting to mess around? You can, for example, export all connections with this code:
```python
from airflow import settings
from airflow.models import Connection

# columns to dump; password and extra are returned decrypted
fields = 'conn_id conn_type host port schema login password extra'.split()

session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)
session.close()
```
Connecting to the Airflow metadatabase. I don't recommend writing to it, but fetching task states for various specific metrics can be much faster and easier than through any of the APIs.
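The REST API and the connections CLI command from the list above are both easy to script. A standard-library sketch, assuming Airflow 1.10 (both the experimental endpoints and the `airflow connections --add` syntax changed in Airflow 2), a webserver at http://localhost:8080 with API authentication left open, and invented server names and credentials; the requests are only built here, urllib.request.urlopen(req) would send them:

```python
import json
import shlex
import urllib.request
from urllib.parse import quote

AIRFLOW_URL = "http://localhost:8080"  # assumption: webserver address


def trigger_dag_request(dag_id, conf=None):
    """POST /api/experimental/dags/<dag_id>/dag_runs creates a DAG Run."""
    body = {}
    if conf:
        # the 1.10 API examples pass conf as a JSON-encoded string
        body["conf"] = json.dumps(conf)
    return urllib.request.Request(
        f"{AIRFLOW_URL}/api/experimental/dags/{dag_id}/dag_runs",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def pause_dag_request(dag_id, paused):
    """GET /api/experimental/dags/<dag_id>/paused/<true|false> pauses or unpauses a dag."""
    flag = "true" if paused else "false"
    return urllib.request.Request(
        f"{AIRFLOW_URL}/api/experimental/dags/{dag_id}/paused/{flag}")


def conn_uri(conn_type, login, password, host, port, schema):
    """URI in the conn_type://login:password@host:port/schema form Airflow parses."""
    # special characters in credentials must be percent-encoded
    return (f"{conn_type}://{quote(login, safe='')}:{quote(password, safe='')}"
            f"@{host}:{port}/{schema}")


def add_connection_cmd(conn_id, uri):
    """Airflow 1.10 CLI syntax for creating one connection."""
    return " ".join(["airflow", "connections", "--add",
                     "--conn_id", shlex.quote(conn_id),
                     "--conn_uri", shlex.quote(uri)])


# Hypothetical source servers, one connection each.
servers = {"mssql_01": "sql01.local", "mssql_02": "sql02.local"}
for conn_id, host in servers.items():
    uri = conn_uri("mssql", "etl_reader", "s3cr3t!", host, 1433, "master")
    print(add_connection_cmd(conn_id, uri))
```

The same URI can also be supplied through an environment variable named AIRFLOW_CONN_ followed by the upper-cased conn_id, which Airflow 1.10 checks before the metadatabase.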
Say not all of our tasks are idempotent, and they may occasionally fail, which is normal. Several failures in a row, however, are already suspicious and may need a look.
Beware, SQL!
```sql
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number()
        OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
            THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE
        state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq
        AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq
        AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
    task_id,
    dag_id
HAVING
    count(last_fail_seq) > 0
```
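The trick in the failed CTE is comparing two row numbers: rn counts all recent runs of a task, the second row_number() counts only its failed ones, and the two stay equal only while no other state has interrupted the sequence. The same logic in plain Python, as an illustration; last_fail_streak is a hypothetical helper and the history is invented:

```python
from collections import Counter
from typing import List

FAILED_STATES = {"failed", "up_for_retry"}


def last_fail_streak(states_newest_first: List[str]) -> Counter:
    """Count the unbroken run of failures at the head of one task's history.

    rn numbers every execution (newest = 1) and failed_rank numbers only the
    failed ones, like the two row_number() calls in the query; they are equal
    exactly as long as no other state has appeared yet.
    """
    streak = Counter()
    failed_rank = 0
    for rn, state in enumerate(states_newest_first, start=1):
        if state not in FAILED_STATES:
            continue  # filtered out by WHERE state IN (...)
        failed_rank += 1
        if rn == failed_rank:  # last_fail_seq is TRUE
            streak["unsuccessful"] += 1
            streak[state] += 1
    return streak


# Hypothetical history of one task over the last two days, newest first.
history = ["up_for_retry", "failed", "success", "failed", "failed"]
print(last_fail_streak(history))
```

The two older failures are ignored because the success between them breaks the streak, just as HAVING count(last_fail_seq) > 0 drops tasks whose latest run succeeded.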
Links
And of course, the first ten links in a Google search are exactly the contents of the Airflow folder in my bookmarks.