Kaixo, Dmitry Logvinenko naiz - Vezet enpresa taldeko Analitika Saileko datu-ingeniaria.
ETL prozesuak garatzeko tresna zoragarri baten berri emango dizut - Apache Airflow. Baina Airflow hain polifazetikoa eta polifazetikoa da, non gehiago aztertu beharko zenuke datu-fluxuetan parte hartzen ez baduzu ere, baina aldian-aldian edozein prozesu abiarazi eta haien exekuzioa kontrolatu beharra daukazu.
Eta bai, kontatu ez ezik, erakutsi ere egingo dut: programak kode, pantaila-argazki eta gomendio asko ditu.
Airflow / Wikimedia Commons hitza Googlen ikusten duzuna
- hobea baino ez, eta helburu guztiz ezberdinetarako egin zen, hots (kat aurretik idatzita dagoen bezala):
zereginak exekutatu eta kontrolatu makina kopuru mugagabean (Apioa / Kubernetes askok eta zure kontzientziari esker)
Python kodea idazteko eta ulertzeko oso erraza den lan-fluxuaren sorkuntza dinamikoarekin
eta edozein datu-base eta API elkarren artean konektatzeko gaitasuna prest egindako osagaiak eta etxeko pluginak erabiliz (oso erraza da).
Apache Airflow honela erabiltzen dugu:
Hainbat iturritako datuak biltzen ditugu (SQL Server eta PostgreSQL instantzia asko, hainbat API aplikazio-neurriekin, baita 1C ere) DWH eta ODSn (Vertica eta Clickhouse ditugu).
zein aurreratua cron, ODSn datuak finkatzeko prozesuak hasten dituena, eta horien mantentze-lanak ere kontrolatzen ditu.
Duela gutxi arte, gure beharrak 32 nukleo eta 50 GB RAM zituen zerbitzari txiki batek estali zituen. Airflow-en, honek funtzionatzen du:
gehiago 200 egun (benetan lan-fluxuak, zeinetan zereginak bete genituen),
bakoitzean batez beste 70 zeregin,
ontasun hau hasten da (batez bestekoa ere) orduan behin.
Eta nola zabaldu ginenari buruz, jarraian idatziko dut, baina orain definitu dezagun ebatziko dugun ΓΌber-arazoa:
Jatorrizko hiru SQL zerbitzari daude, bakoitzak 50 datu-baserekin - proiektu baten instantziak, hurrenez hurren, egitura bera dute (ia nonahi, mua-ha-ha), hau da, bakoitzak Eskaerak taula bat dauka (zorionez, taula horrekin). izena edozein negoziotara bultza daiteke). Datuak zerbitzu-eremuak gehituz hartzen ditugu (iturburu-zerbitzaria, iturburu-datu-basea, ETL ataza ID) eta inozoki botatzen ditugu, esate baterako, Verticara.
Goazen!
Zati nagusia, praktikoa (eta apur bat teorikoa)
Zergatik egiten dugu (eta zu)
Zuhaitzak handiak zirenean eta ni sinplea nintzenean SQL-schik Errusiako txikizkako merkataritza batean, ETL prozesuak edo datu-fluxuak iruzur egin genituen gure eskura ditugun bi tresna erabiliz:
Informatika Power Center - Oso zabaltzen den sistema, izugarri produktiboa, bere hardwarea, bere bertsioa duena. Jainkoak debekatu nuen bere gaitasunen %1 erabili nuen. Zergatik? Bada, lehenik eta behin, interfaze honek, nonbait, 380ko hamarkadakoa, mentalki presioa egiten zigun. Bigarrenik, tramankulu hau oso prozesu dotoreetarako, osagaien berrerabilpen amorratuetarako eta beste enpresa-trikimailu oso garrantzitsuak egiteko diseinatuta dago. Zer kostatzen den, Airbus AXNUMX / urteko hegala bezala, ez dugu ezer esango.
Kontuz, pantaila-argazkiak 30 urtetik beherako pertsonei min pixka bat egin diezaieke
SQL Server Integrazio Zerbitzaria - kamarada hau gure proiektu barneko fluxuetan erabili dugu. Beno, egia esan: dagoeneko erabiltzen dugu SQL Server, eta nolabait zentzugabea izango litzateke bere ETL tresnak ez erabiltzea. Bertan dena ona da: bai interfazea ederra da, bai aurrerapen txostenak... Baina ez da horregatik maite ditugu software produktuak, oh, ez horregatik. Bertsioa dtsx (zein da XML gordetzean nodoak nahastuta) egin dezakegu, baina zertarako? Zer moduz zerbitzari batetik bestera ehunka taula arrastatuko dituen zeregin pakete bat egitea? Bai, zer ehun, zure hatz erakuslea eroriko zaizu hogei zatitatik, saguaren botoian klik eginez. Baina zalantzarik gabe modanagoa dirudi:
Zalantzarik gabe, irteerak bilatu ditugu. Kasua ere ia norberak idatzitako SSIS pakete-sorgailu batera iritsi zen...
β¦ eta gero lan berri batek aurkitu ninduen. Eta Apache Airflow-ek aurreratu ninduen.
ETL prozesuen deskribapenak Python kode sinpleak direla jakin nuenean, ez nuen pozez dantza egin. Horrela datu-korronteak bertsionatu eta desberdintzen ziren, eta ehunka datu-baseetatik egitura bakarreko taulak helburu bakarrean sartzea Python kodearen kontua bihurtu zen 13 βpantaila eta erdi edo bitan.
Klusterra muntatzea
Ez dezagun antolatu guztiz haurtzaindegia, eta ez dezagun hitz egin hemen guztiz ageriko gauzei buruz, adibidez, Airflow instalatzea, aukeratutako datu-basea, Apioa eta kaietan deskribatutako beste kasu batzuei buruz.
Esperimentuak berehala has gaitezen, zirriborratu nuen docker-compose.yml zeinetan:
Goazen benetan Airflow: Programatzailea, Web zerbitzaria. Flower ere bertan biraka egingo du Apioaren zereginak kontrolatzeko (dagoeneko sartu delako apache/airflow:1.10.10-python3.7, baina ez zaigu axola)
PostgreSQL, zeinetan Airflow-ek bere zerbitzuaren informazioa idatziko du (planifikatzailearen datuak, exekuzio-estatistikak, etab.), eta Apioak amaitutako zereginak markatuko ditu;
Birbanaketa, Apioaren zereginen bitartekari gisa jardungo duena;
Apioaren langilea, zereginen zuzeneko exekuzioan arituko dena.
Karpetara ./dags gure fitxategiak gehituko ditugu dags-en deskribapenarekin. Hegan bertan jasoko dira, beraz, ez dago zertan pila osoa malabarerik egin doministiku bakoitzaren ondoren.
Zenbait tokitan, adibideetako kodea ez da guztiz erakusten (testua ez nahasteko), baina nonbait aldatu egiten da prozesuan. Lan-kodeen adibide osoak biltegian aurki daitezke https://github.com/dm-logv/airflow-tutorial.
Konposizioaren muntaketan, irudi ezagunean oinarritu nintzen neurri handi batean pukel/docker-airflow - Ziurtatu egiaztatzea. Agian ez duzu beste ezer behar zure bizitzan.
Aire-fluxuaren ezarpen guztiak erabilgarri daude bidez ez ezik airflow.cfg, baina baita ingurune aldagaien bidez ere (garatzaileei esker), gaiztoki aprobetxatu nituenak.
Jakina, ez dago produkziorako prest: nahita ez nituen ontzietan bihotz taupadak jarri, ez nuen segurtasunarekin trabarik jarri. Baina gure esperimentatzaileentzat egokia den minimoa egin nuen.
Apuntatu hori:
Dag karpetak eskuragarri egon behar du programatzaileak zein langileak.
Gauza bera gertatzen da hirugarrenen liburutegi guztietan: guztiak programatzaile eta langileak dituzten makinetan instalatu behar dira.
Beno, orain erraza da:
$ docker-compose up --scale worker=3
Dena igo ondoren, web interfazeei begiratu dezakezu:
"Dag" guzti hauetan ezer ulertzen ez bazenuen, hona hemen hiztegi labur bat:
Antolatzaileak - Airflow-eko osabarik garrantzitsuena, robotek gogor lan egiten dutela kontrolatzen duena, eta ez pertsona batek: ordutegia kontrolatzen du, eguneratzen ditu, lanak abiarazten ditu.
Oro har, bertsio zaharretan, memoriarekin arazoak izan zituen (ez, ez amnesia, baina filtrazioak) eta ondarearen parametroa konfigurazioetan ere geratu zen. run_duration - bere berrabiarazteko tartea. Baina orain dena ondo dago.
DAG (aka "dag") - "zuzendutako grafiko aziklikoa", baina definizio horrek jende gutxiri esango dio, baina, hain zuzen ere, elkarren artean elkarreragiten duten zereginetarako edukiontzi bat da (ikus behean) edo SSIS-en paketearen eta Informatica-n Workflow-en antzeko bat da. .
Dagez gain, oraindik ere azpidagak egon daitezke, baina ziurrenik ez gara iritsiko.
DAG Korrika - hasieratutako dag, berea esleitzen zaiona execution_date. Dag bereko Dagranek paraleloan lan egin dezakete (zure zereginak idempotent egin badituzu, noski).
Operator ekintza zehatz bat egiteaz arduratzen diren kode zatiak dira. Hiru operadore mota daude:
ekintzagure gogokoena bezala PythonOperator, edozein (baliozko) Python kode exekutatu dezakeena;
transferitzeko, datuak leku batetik bestera garraiatzen dituztenak, esate baterako, MsSqlToHiveTransfer;
sentsore bestetik, erreakzionatzeko edo dagaren exekuzio gehiago moteltzeko aukera emango dizu, gertaera bat gertatu arte. HttpSensor zehaztutako amaiera-puntua atera dezake, eta nahi den erantzuna zain dagoenean, hasi transferentzia GoogleCloudStorageToS3Operator. Buru jakintsu batek galdetuko du: βzergatik? Azken finean, errepikapenak zuzenean egin ditzakezu operadorean!Β». Eta gero, esekitako operadoreekin zereginen multzoa ez estaltzeko. Sentsorea abiarazi, egiaztatu eta hil egiten da hurrengo saiakeraren aurretik.
Task - deklaratutako operadoreak, mota edozein dela ere, eta dag-ari atxikitakoak zeregin mailara igotzen dira.
ataza instantzia - Antolatzaile orokorrak erabaki zuenean, zereginak borrokara bidaltzeko garaia zela interprete-langileei (bere bertan, erabiltzen badugu LocalExecutor edo urruneko nodo batera, kasuan CeleryExecutor), testuinguru bat esleitzen die (hau da, aldagai multzo bat - exekuzio-parametroak), komando edo kontsulta txantiloiak zabaltzen ditu eta biltzen ditu.
Zereginak sortzen ditugu
Lehenik eta behin, gure dougaren eskema orokorra azalduko dugu, eta gero eta xehetasunetan murgilduko gara gero eta gehiago, irtenbide ez-hutsak aplikatzen ditugulako.
Beraz, bere forma errazenean, honelako itxura izango du:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Asma dezagun:
Lehenik eta behin, beharrezko liburuak inportatzen ditugu eta beste zerbait;
sql_server_ds - Is List[namedtuple[str, str]] Airflow Connections-en konexioen izenekin eta gure plaka hartuko dugun datu-baseekin;
dag - gure dagaren iragarkia, derrigorrez egon behar duena globals(), bestela Airflow-ek ez du aurkituko. Dougek ere esan behar du:
zein da bere izena orders - izen hau web interfazean agertuko da,
uztailaren XNUMXko gauerditik aurrera lan egingo duela,
eta exekutatu beharko luke, gutxi gorabehera, 6 orduz behin (hemengo gogorrentzat ordez timedelta() onargarria cron-lerroa 0 0 0/6 ? * * *, hain cool-a bezalako esamolde bat @daily);
workflow() egingo du lan nagusia, baina ez orain. Oraingoz, gure testuingurua erregistrora botako dugu.
Eta orain zereginak sortzeko magia sinplea:
gure iturrietatik igarotzen gara;
hasieratu PythonOperator, gure manikia exekutatuko duena workflow(). Ez ahaztu zereginaren izen esklusibo bat (dag barruan) zehaztea eta dag bera lotzea. Bandera provide_context aldi berean, argumentu gehigarriak isuriko ditu funtziora, eta arretaz bilduko ditugu erabiliz **context.
Oraingoz, hori da dena. Lortu duguna:
dag berria web interfazean,
paraleloan gauzatuko diren ehun eta erdi zeregin (Aire-fluxua, Apioaren ezarpenak eta zerbitzariaren ahalmenak ahalbidetzen badu).
Beno, ia lortu.
Nork instalatuko ditu mendekotasunak?
Hori guztia errazteko, izorratu nuen docker-compose.yml prozesatzea requirements.txt nodo guztietan.
Orain desagertu da:
Karratu grisak programatzaileak prozesatutako ataza-instantziak dira.
Pixka bat itxaron dugu, zereginak langileek hartzen dituzte:
Berdeek, noski, ongi bukatu dute euren lana. Gorriek ez dute arrakasta handirik.
Bide batez, gure produktuan ez dago karpetarik ./dags, ez dago makinen arteko sinkronizaziorik - dag guztiak daude git gure Gitlab-en, eta Gitlab CI-k eguneraketak banatzen ditu makinetan bateratzean master.
Loreari buruz pixka bat
Langileak txupeteak kolpatzen ari diren bitartean, gogoan dezagun zerbait erakutsi diezagukeen beste tresna bat: Lorea.
Langile-nodoei buruzko laburpen informazioa duen lehen orrialdea:
Lanera joandako zereginak dituen orrialderik biziena:
Gure broker-aren egoera duen orrialde aspergarriena:
Orrialde distiratsuena zereginen egoera grafikoekin eta haien exekuzio denborarekin dago:
Azpikargatutakoa kargatzen dugu
Beraz, zeregin guztiak bete dira, zaurituak eraman ditzakezu.
Eta zauritu asko egon ziren, arrazoi bategatik edo besteagatik. Airflow-aren erabilera zuzenaren kasuan, lauki horiek beraiek adierazten dute datuak behin betiko ez zirela iritsi.
Erregistroa ikusi eta eroritako ataza-instantziak berrabiarazi behar dituzu.
Edozein laukitan klik eginez gero, eskura ditugun ekintzak ikusiko ditugu:
Hartu eta egin dezakezu Clear eroria. Hau da, ahaztu egiten zaigu hor zerbaitek huts egin duela, eta instantzia-zeregin bera programatzailera joango da.
Argi dago saguarekin lauki gorri guztiekin egitea ez dela oso gizatiarra; hori ez da Airflow-tik espero duguna. Jakina, suntsipen masiboko armak ditugu: Browse/Task Instances
Hautatu dena aldi berean eta berrezarri dezagun zerora, egin klik elementu egokian:
Garbitu ondoren, gure taxiek itxura hau dute (dagoeneko programatzaileak programatzeko zain daude):
Konexioak, kakoak eta bestelako aldagaiak
Hurrengo DAG-ari begiratzeko garaia da, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ΠΠΎΡΠΏΠΎΠ΄Π° Ρ ΠΎΡΠΎΡΠΈΠ΅, ΠΎΡΡΠ΅ΡΡ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Ρ"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ΠΠ°ΡΠ°Ρ, ΠΏΡΠΎΡΡΠΏΠ°ΠΉΡΡ, ΠΌΡ {{ dag.dag_id }} ΡΡΠΎΠ½ΠΈΠ»ΠΈ
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Denek egin al dute inoiz txostenen eguneratzerik? Hau da berriro ere: datuak nondik ateratzeko iturrien zerrenda dago; zerrenda bat dago non jarri; ez ahaztu dena gertatu edo hautsi zenean bozina jotzea (beno, hau ez da guri buruz, ez).
Azter ditzagun berriro fitxategia eta ikus ditzagun gauza ilun berriak:
from commons.operators import TelegramBotSendMessage - ezerk ez digu eragozten gure operadoreak egitea, eta hori aprobetxatu genuen Unblocked-era mezuak bidaltzeko bilgarri txiki bat eginez. (Gehiago hitz egingo dugu operadore honi buruz jarraian);
default_args={} - dag-ek argumentu berdinak banatu ditzake bere operadore guztiei;
to='{{ var.value.all_the_kings_men }}' -eremua to ez dugu gogor kodetua izango, baina dinamikoki sortuko dugu Jinja eta mezu elektronikoen zerrenda duen aldagai bat erabiliz, kontu handiz jarri dudana Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS β operadorea martxan jartzeko baldintza. Gure kasuan, eskutitzak nagusiengana hegan egingo du mendekotasun guztiak funtzionatu badira bakarrik arrakastaz;
tg_bot_conn_id='tg_main' - argudioak conn_id onartzen ditugun konexio IDak Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - Telegram-eko mezuak hutsik dauden zereginak badaude soilik ihes egingo dute;
task_concurrency=1 - Zeregin bateko hainbat ataza-instantzia aldi berean abiaraztea debekatzen dugu. Bestela, hainbat aldi berean abian jartzea lortuko dugu VerticaOperator (mahai bati begira);
report_update >> [email, tg] - guztiak VerticaOperator gutunak eta mezuak bidaltzean bat egiten dute, honela:
Baina jakinarazleen operadoreek abiarazteko baldintza desberdinak dituztenez, bakarrak funtzionatuko du. Zuhaitz ikuspegian, dena apur bat gutxiago ikusten da:
Hitz batzuk esango ditut makroak eta haien lagunak - aldagaiak.
Makroak Jinja leku-markak dira, eta hainbat informazio erabilgarria ordezka dezakete operadorearen argumentuetan. Adibidez, honela:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} testuinguru aldagaiaren edukietara zabalduko da execution_date formatuan YYYY-MM-DD: 2020-07-14. Zatirik onena da testuinguru-aldagaiak ataza-instantzia zehatz batean iltzatuta daudela (Zuhaitz ikuspegiko karratu batean), eta berrabiarazten denean, leku-markak balio berdinetara zabalduko dira.
Esleitutako balioak ataza-instantzia bakoitzean Errendatutako botoia erabiliz ikus daitezke. Hau da gutun bat bidaltzeko zeregina:
Eta horrela mezu bat bidaltzeko zereginean:
Eskuragarri dagoen azken bertsiorako integratutako makroen zerrenda osoa eskuragarri dago hemen: makro erreferentzia
Gainera, pluginen laguntzaz, geure makroak deklara ditzakegu, baina hori beste istorio bat da.
Aurrez zehaztutako gauzez gain, gure aldagaien balioak ordezka ditzakegu (dagoeneko erabili nuen goiko kodean). Sortu dezagun Admin/Variables gauza pare bat:
erabili nahi duzun gakorako bidea: {{ var.json.bot_config.bot.token }}.
Hitz bat esango dut literalki eta pantaila-argazki bat erakutsiko dut konexio. Hemen dena oinarrizkoa da: orrialdean Admin/Connections konexio bat sortzen dugu, gure saio-hasiera / pasahitzak eta parametro zehatzagoak gehitzen ditugu bertan. Horrela:
Pasahitzak enkriptatu daitezke (lehenetsia baino sakonago), edo konexio mota kanpoan utzi dezakezu (nik egin nuen bezala). tg_main) - Izan ere, mota-zerrenda Airflow ereduetan kablekatuta dago eta ezin dela zabaldu iturburu-kodeetan sartu gabe (bat-batean ez badut zerbait Googlen bilatu, zuzendu mesedez), baina ezerk ez digu utziko kredituak lortzea. izena.
Izen bereko hainbat konexio ere egin ditzakezu: kasu honetan, metodoa BaseHook.get_connection(), izenez konexioak lortzen dizkigu, emango du ausaz hainbat izenetatik (logikoagoa litzateke Round Robin egitea, baina utzi dezagun Airflow garatzaileen kontzientzia).
Aldagaiak eta konexioak tresna politak dira, zalantzarik gabe, baina garrantzitsua da oreka ez galtzea: zure fluxuen zein zati gordetzen dituzun kodean bertan, eta zein zati ematen dizkiozun Airflow-i biltegiratzeko. Alde batetik, komenigarria izan daiteke balioa azkar aldatzea, adibidez, posta-kutxa bat, UI bidez. Bestalde, hau oraindik saguaren klikaren itzulera da, bertatik kendu nahi genuen (nik).
Konexioekin lan egitea da zereginetako bat amuak. Oro har, Airflow amuak hirugarrenen zerbitzu eta liburutegietara konektatzeko puntuak dira. Adibidez, JiraHook Bezero bat irekiko digu Jirarekin elkarreragiteko (zereginak aurrera eta atzera mugi ditzakezu), eta honen laguntzaz SambaHook tokiko fitxategi bat bultza dezakezu smb-puntua.
Operadore pertsonalizatua analizatzen
Eta nola egiten den ikustera gerturatu ginen TelegramBotSendMessage
Code commons/operators.py benetako operadorearekin:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Hemen, Airflow-en beste guztia bezala, dena oso erraza da:
Oinordetzan hartua BaseOperator, Airflow-eko gauza espezifiko batzuk ezartzen dituena (begiratu zure aisialdia)
Deklaratutako eremuak template_fields, eta bertan Jinjak prozesatzeko makroak bilatuko ditu.
Argudio egokiak antolatu __init__(), ezarri lehenetsiak beharrezkoa denean.
Arbasoaren hasieratzeaz ere ez genuen ahaztu.
Dagokion amua ireki TelegramBotHookbezero objektu bat jaso zuen bertatik.
Gaingabetutako (birdefinitutako) metodoa BaseOperator.execute(), Airfow-ek operadorea abiarazteko unea iristean kiskatuko duena - bertan ekintza nagusia ezarriko dugu, saioa hasteko ahaztuta. (Saioa egiten dugu, bide batez, berehala stdout ΠΈ stderr - Aire-fluxuak dena atzemango du, ederki bilduko du, beharrezkoa den lekuan deskonposatuko du.)
Ea zer daukagun commons/hooks.py. Fitxategiaren lehen zatia, kakoarekin berarekin:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Hemen ere ez dakit zer azaldu, puntu garrantzitsuak ohartaraziko ditut:
Oinordetzan hartzen dugu, pentsatu argudioak - kasu gehienetan bat izango da: conn_id;
Metodo estandarrak gainditzea: mugatu dut get_conn(), zeinetan konexio-parametroak izenez jasotzen ditudan eta atala besterik ez dut lortzen extra (JSON eremua da hau), eta bertan (nire argibideen arabera!) Telegram bot tokena jarri nuen: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Gure instantzia bat sortzen dut TelegramBot, seinale zehatz bat emanez.
Hori da dena. Bezero bat kako batetik lor dezakezu erabiliz TelegramBotHook().clent edo TelegramBotHook().get_conn().
Eta fitxategiaren bigarren zatia, Telegram REST APIrako mikrobilgarri bat egiten dudana, berdina ez arrastatu python-telegram-bot metodo baterako sendMessage.
Modu zuzena dena gehitzea da: TelegramBotSendMessage, TelegramBotHook, TelegramBot - pluginean, biltegi publiko batean jarri eta Kode Irekiari eman.
Hori guztia aztertzen ari ginen bitartean, gure txostenen eguneratzeek huts egitea lortu zuten eta kanalean errore-mezu bat bidaltzea lortu zuten. Egiaztatuko dut ea gaizki dagoen...
Gure dogean zerbait hautsi da! Ez al da hori espero genuena? Zehazki!
Botatuko duzu?
Zerbait galdu dudala sentitzen al duzu? Badirudi SQL Server-etik Vertica-ra datuak transferitzeko agindu zuela, eta orduan hartu eta gaitik aldendu zen, zitala!
Ankerkeria hau nahita izan zen, terminologia bat deszifratu besterik ez nuen egin behar zuretzat. Orain harago joan zaitezke.
Gure plana hau zen:
Egin dag
Sortu zereginak
Ikusi zein ederra den dena
Esleitu saio-zenbakiak betegarriei
Lortu datuak SQL Server-etik
Jarri datuak Vertican
Bildu estatistikak
Beraz, hau guztia martxan jartzeko, gehigarri txiki bat egin nuen gurean docker-compose.yml:
Vertica ostalari gisa dwh ezarpen lehenetsienekin,
SQL Server-en hiru instantzia,
azken honetan datu-baseak datu batzuekin betetzen ditugu (inola ere ez begiratu mssql_init.py!)
On guztia abiarazten dugu azken aldian baino komando apur bat konplikatuago baten laguntzaz:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Gure miraria ausazkoak sortu duena, elementua erabil dezakezu Data Profiling/Ad Hoc Query:
Gauza nagusia analistei ez erakustea da
landu ETL saioak Ez dut egingo, hor dena hutsala da: oinarri bat egiten dugu, bertan seinale bat dago, dena testuinguru-kudeatzaile batekin biltzen dugu, eta orain hau egiten dugu:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
saioa.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Iritsi da ordua bildu gure datuak gure ehun eta erdi mahaietatik. Egin dezagun hau oso itxurarik gabeko lerroen laguntzaz:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Amu baten laguntzaz Airflow-tik lortzen dugu pymssql-konektatu
Ordez dezagun data baten formako murrizketa eskaeran - txantiloi motorrak funtziora botako du.
Gure eskaera elikatzen pandasnork lortuko gaitu DataFrame - baliagarria izango zaigu etorkizunean.
Ordezkapena erabiltzen ari naiz {dt} eskaera-parametro baten ordez %s ez Pinotxo gaiztoa naizelako, baizik eta pandas ezin maneiatu pymssql eta azkena irrist egiten du params: Listbenetan nahi duen arren tuple.
Kontuan izan ere garatzaileak pymssql gehiago ez onartzea erabaki zuen, eta alde egiteko garaia da pyodbc.
Ikus dezagun Airflow-ek gure funtzioen argudioak zerekin betetzen dituen:
Daturik ez badago, ez du balio jarraitzeak. Baina bitxia ere bada betetzea arrakastatsua izatea. Baina hau ez da akats bat. A-ah-ah, zer egin?! Eta hona hemen zer:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException Akatsik ez dagoela esango dio Airflow-i, baina zeregina saltatzen dugu. Interfazeak ez du karratu berdea edo gorria izango, arrosa baizik.
Salmentan, xede-plaka eskuz sortzen dugu. Hona hemen makina txiki bat onartu nuen:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
erabiltzen ari naiz VerticaOperator() Datu-basearen eskema eta taula bat sortzen ditut (ez badaude, noski). Gauza nagusia mendekotasunak ondo antolatzea da:
- Beno, - esan zuen sagutxoak, - ez al da, orain
Konbentzituta al zaude basoko animaliarik ikaragarriena naizela?
Julia Donaldson, Gruffalo
Uste dut nire lankideek eta biok lehiaketa bat izango bagenu: nork azkar sortu eta abiaraziko duen ETL prozesu bat hutsetik: haiek beren SSIS eta sagu batekin eta ni Airflow-arekin ... Eta gero mantentze-erraztasuna ere alderatuko genuke ... Aupa, uste dut ados egongo zarela alde guztietan irabaziko ditudala!
Pixka bat serioago bada, orduan Apache Airflow - prozesuak programa-kode moduan deskribatuz - nire lana egin zuen asko erosoagoa eta atseginagoa.
Bere hedagarritasun mugagabeak, bai plug-inen aldetik, bai eskalagarritasunerako joerari dagokionez, Airflow ia edozein arlotan erabiltzeko aukera ematen dizu: datuak biltzeko, prestatzeko eta prozesatzeko ziklo osoan ere, baita suziriak jaurtitzerakoan ere (Martera, ikastaroa).
Azken zatia, erreferentzia eta informazioa
Zuretzat bildu dugun arrastoa
start_date. Bai, hau dagoeneko tokiko meme bat da. Dougen argudio nagusiaren bidez start_date guztiak pasatzen dira. Laburbilduz, zehazten baduzu start_date uneko data, eta schedule_interval - Egun batean, orduan DAG bihar hasiko da ez lehenago.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Eta arazorik ez.
Beste exekuzio-errore bat dago lotuta: Task is missing the start_date parameter, gehienetan dag operadoreari lotzea ahaztu zaizula adierazten duena.
Dena makina batean. Bai, eta oinarriak (Airflow bera eta gure estaldura), eta web zerbitzari bat, eta programatzaile bat, eta langileak. Eta funtzionatu ere egin zuen. Baina denborarekin, zerbitzuen ataza kopurua hazi egin zen, eta PostgreSQL indizeari 20 s-tan 5 ms beharrean XNUMX s-tan erantzuten hasi zenean, hartu eta eraman genuen.
LocalExecutor. Bai, oraindik eserita gaude, eta dagoeneko amildegiaren ertzera iritsi gara. LocalExecutor nahikoa izan zaigu orain arte, baina orain gutxienez langile batekin zabaltzeko garaia da, eta gogor lan egin beharko dugu CeleryExecutor-era pasatzeko. Eta makina batean lan egin dezakezula ikusita, ezerk ez zaitu gelditzen Apioa zerbitzari batean ere erabiltzea, "noski, ez da inoiz produkziora sartuko, zintzotasunez!"
Ez erabiltzea integratutako tresnak:
Konexioak zerbitzuaren kredentzialak gordetzeko,
SLA andereΓ±oak garaiz funtzionatu ez zuten zereginei erantzuteko,
xcom metadatuak trukatzeko (esan nuen metadatuak!) dag zereginen artean.
Posta gehiegikeria. Tira, zer esan dezaket? Alertak ezarri ziren eroritako zereginen errepikapen guztietarako. Orain nire laneko Gmail-ek Airflow-eko 90 mezu elektroniko ditu, eta web-postaren mukiak uko egiten dio aldi berean 100 baino gehiago jasotzeari eta ezabatzeari.
Gure buruekin eta ez eskuekin are gehiago lan egiteko, Airflow-ek hau prestatu digu:
REST API - oraindik Experimental estatusa du, eta horrek ez dio lan egitea eragozten. Harekin, dag eta zereginei buruzko informazioa lortu ez ezik, dag bat gelditu/hasi, DAG Run edo igerileku bat ere sortu dezakezu.
CLI - Komando-lerroaren bidez tresna asko daude eskuragarri, WebUI-ren bidez erabiltzeko deserosoak ez direnak, baina orokorrean ez daudenak. Adibidez:
backfill ataza-instantziak berrabiarazteko beharrezkoak.
Esaterako, analistak etorri ziren eta esan zuten: βEta zuk, kamarada, txorakeriak dituzu urtarrilaren 1etik 13ra bitarteko datuetan! Konpondu, konpondu, konpondu, konpondu!" Eta halako sukaldea zara:
Oinarrizko zerbitzua: initdb, resetdb, upgradedb, checkdb.
run, instantzia-zeregin bat exekutatzeko aukera ematen duena, eta baita mendekotasun guztietan puntuatzea ere. Gainera, bidez exekutatu dezakezu LocalExecutor, nahiz eta Apioaren multzoa izan.
Ia gauza bera egiten du test, soilik oinarrietan ere ez du ezer idazten.
connections shelletik konexioak masiboki sortzeko aukera ematen du.
Python APIa - Elkarreragiteko modu gogor samarra, pluginetarako pentsatuta dagoena, eta ez esku txikiekin ibiltzea. Baina norengana joatea galaraziko digu /home/airflow/dags, Korrika egin ipython eta nahasten hasi? Adibidez, konexio guztiak esporta ditzakezu kode honekin:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Airflow metadatubasera konektatzen. Ez dut idaztea gomendatzen, baina hainbat metrika zehatzetarako ataza-egoerak lortzea askoz azkarrago eta errazagoa izan daiteke APIren bidez baino.
Demagun gure zeregin guztiak ez direla idepotenteak, baina batzuetan erori daitezkeela, eta hori normala da. Baina blokeo batzuk susmagarriak dira dagoeneko, eta egiaztatu beharko litzateke.
Kontuz SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
Erreferentziak
Eta, jakina, Google-ren jaulkipeneko lehen hamar estekak nire laster-marketako Airflow karpetako edukiak dira.
Apache Airflow dokumentazioa - noski, bulegotik hasi behar dugu. dokumentazioa, baina nork irakurtzen ditu argibideak?
DAG idazketa praktika onak Apache Airflow-en - Zereginen inpotentziari buruz, IDaren arabera kargatzea dataren ordez, eraldaketa, fitxategien egitura eta beste gauza interesgarri batzuk.
Python eta Apache Airflow-en Zen - DAG birbidaltze inplizitua, funtzioak testuingurua botatzea, berriro mendekotasunei buruz, eta baita ataza abiarazteei buruz ere.