Tere, mina olen Dmitri Logvinenko – Vezeti ettevõtete grupi analüüsiosakonna andmeinsener.
Räägin teile suurepärasest tööriistast ETL protsesside arendamiseks - Apache Airflow. Kuid Airflow on nii mitmekülgne ja mitmetahuline, et peaksite seda lähemalt uurima ka siis, kui te pole andmevoogudega seotud, kuid teil on vajadus perioodiliselt mingeid protsesse käivitada ja nende täitmist jälgida.
Ja jah, ma mitte ainult ei ütle, vaid ka näitan: programmis on palju koodi, ekraanipilte ja soovitusi.

Mida tavaliselt näete, kui googeldate sõna Airflow / Wikimedia Commons
Sisukord
Sissejuhatus
Apache Airflow on täpselt nagu Django:
- kirjutatud pythonis
- seal on suurepärane administraatoripaneel,
- piiramatult laiendatav
- ainult parem, ja see tehti täiesti erinevatel eesmärkidel, nimelt (nagu kati ees on kirjutatud):
- ülesannete käitamine ja jälgimine piiramatul arvul masinatel (nii palju kui palju sellerit/kuberneteid ja teie südametunnistus lubavad)
- dünaamilise töövoo genereerimisega Pythoni koodist, mida on väga lihtne kirjutada ja mõista
- ja võimalus ühendada omavahel mis tahes andmebaase ja API-sid, kasutades nii valmiskomponente kui ka kodus valmistatud pluginaid (mis on äärmiselt lihtne).
Me kasutame Apache Airflow'i järgmiselt:
- kogume andmeid erinevatest allikatest (paljud SQL Serveri ja PostgreSQL-i eksemplarid, erinevad API-d koos rakenduste mõõdikutega, isegi 1C) DWH-s ja ODS-is (meil on Vertica ja Clickhouse).
- kui arenenud
cron, mis käivitab ODS-i andmete koondamise protsessid ja jälgib ka nende hooldust.
Kuni viimase ajani kattis meie vajadused üks väike server 32 tuuma ja 50 GB muutmäluga. Airflow puhul töötab see:
- rohkem 200 päeva (tegelikult töövood, millesse me ülesandeid toppisime),
- igas keskmiselt 70 ülesannet,
- see headus algab (ka keskmiselt) kord tunnis.
Ja sellest, kuidas me laienesime, kirjutan allpool, kuid nüüd määratleme überprobleemi, mille me lahendame:
Algupäraseid SQL-servereid on kolm, millest igaühel on 50 andmebaasi - vastavalt ühe projekti eksemplarid, neil on sama struktuur (peaaegu igal pool, mua-ha-ha), mis tähendab, et igal neist on Tellimuste tabel (õnneks tabel sellega nime saab igasse ärisse suruda). Me võtame andmed, lisades teenuseväljad (allikaserver, lähteandmebaas, ETL-i ülesande ID) ja viskame need naiivselt näiteks Verticasse.
Mine!
Põhiosa, praktiline (ja natuke teoreetiline)
Miks meie (ja teie)
Kui puud olid suured ja mina olin lihtne SQL-schik ühes Venemaa jaemüügis petsime ETL-i protsesse ehk andmevooge, kasutades kahte meile saadaolevat tööriista:
- Informaatika toitekeskus - äärmiselt laialivalguv süsteem, äärmiselt produktiivne, oma riistvaraga, oma versioonidega. Kasutasin jumal hoidku 1% selle võimalustest. Miks? Noh, esiteks avaldas see kuskil 380ndatest pärit liides meile vaimselt survet. Teiseks on see tööriist mõeldud ülimalt uhketeks protsessideks, raevukaks komponentide taaskasutamiseks ja muudeks väga tähtsateks ettevõtte nippideks. Selle kohta, et see maksab, nagu Airbus AXNUMX tiib aastas, ei ütle me midagi.
Ettevaatust, ekraanipilt võib alla 30-aastastele inimestele pisut haiget teha

- SQL Serveri integratsiooniserver - kasutasime seda seltsimeest oma projektisisestes voogudes. Noh, tegelikult: me kasutame juba SQL Serverit ja oleks kuidagi ebamõistlik mitte kasutada selle ETL-i tööriistu. Kõik selles on hea: nii liides on ilus kui ka edenemise aruanded ... Kuid see pole põhjus, miks me tarkvaratooteid armastame, oh, mitte selle pärast. Versioon see
dtsx(mis on salvestamisel segatud sõlmedega XML) saame, aga mis mõte sellel on? Kuidas oleks teha ülesannete pakett, mis lohistaks sadu tabeleid ühest serverist teise? Jah, mis sada, nimetissõrm kukub hiirenupule klõpsates kahekümnest tükist maha. Kuid see näeb kindlasti moodsam välja:
Kindlasti otsisime väljapääse. Juhtum isegi peaaegu jõudis ise kirjutatud SSIS-i paketigeneraatorini ...
…ja siis leidis mind uus töökoht. Ja Apache Airflow möödus minust sellel.
Kui sain teada, et ETL-i protsesside kirjeldused on lihtne Pythoni kood, siis ma lihtsalt ei tantsinud rõõmust. Nii sai andmevooge versioonitud ja diferentseeritud ning ühe struktuuriga tabelite valamine sadadest andmebaasidest ühte sihtmärki sai Pythoni koodi asjaks pooleteise või kahe 13-tollise ekraaniga.
Klastri kokkupanek
Ärgem korraldagem täiesti lasteaeda ja ärgem rääkigem siin täiesti ilmsetest asjadest, nagu Airflow installimine, teie valitud andmebaas, seller ja muud dokkides kirjeldatud juhtumid.
Et saaksime kohe katseid alustada, visandasin docker-compose.yml milles:
- Tõstame tegelikult Õhuvool: ajakava, veebiserver. Flower hakkab seal ka keerlema, et jälgida selleri ülesandeid (sest see on juba sisse lükatud
apache/airflow:1.10.10-python3.7, aga meil pole selle vastu midagi) - PostgreSQL, kuhu Airflow kirjutab oma teenuseteabe (plaanija andmed, täitmisstatistika jne) ja Selery märgib lõpetatud ülesanded;
- Redis, mis tegutseb selleri tööülesannete vahendajana;
- Selleri töötaja, mis hakkab tegelema ülesannete otsese täitmisega.
- Kausta
./dagslisame oma failid dagsi kirjeldusega. Need korjatakse üles lennult, nii et pärast iga aevastamist pole vaja kogu virnaga žongleerida.
Mõnes kohas pole näidetes olev kood lõpuni välja toodud (et mitte teksti segamini ajada), kuid kuskil muudetakse seda protsessi käigus. Täielikud töötavad koodinäited leiate hoidlast .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerMärkused:
- Kompositsiooni kokkupanemisel toetusin suuresti tuntud kuvandile - vaadake seda kindlasti. Võib-olla polegi midagi muud oma ellu vaja.
- Kõik õhuvoolu seaded on saadaval mitte ainult läbi
airflow.cfg, aga ka keskkonnamuutujate kaudu (tänu arendajatele), mida ma pahatahtlikult ära kasutasin. - Loomulikult pole see tootmisvalmis: ma ei pannud meelega konteineritele südamelööke, ei vaevanud end turvalisusega. Aga tegin meie katsetajatele sobiva miinimumi.
- Pange tähele, et:
- Kaust dag peab olema juurdepääsetav nii planeerijale kui ka töötajatele.
- Sama kehtib kõigi kolmandate osapoolte teekide kohta – need kõik peavad olema installitud ajakava ja töötajatega masinatesse.
Noh, nüüd on kõik lihtne:
$ docker-compose up --scale worker=3Kui kõik on tõusnud, saate vaadata veebiliideseid:
- Õhuvool:
- Lill:
Põhimõisted
Kui te kõigist nendest "päevadest" midagi aru ei saanud, siis siin on lühike sõnastik:
- Scheduler - Airflow'i kõige olulisem onu, kes kontrollib, et robotid töötaksid kõvasti, mitte inimene: jälgib ajakava, värskendab päevakirju, käivitab ülesandeid.
Üldiselt oli vanemates versioonides tal probleeme mäluga (ei, mitte amneesia, vaid lekked) ja pärandparameeter jäi isegi konfiguratsioonidesse.
run_duration— selle taaskäivitamise intervall. Aga nüüd on kõik hästi. - DAG (teise nimega "dag") - "suunatud atsükliline graafik", kuid selline määratlus ütleb vähestele inimestele, kuid tegelikult on see üksteisega suhtlevate ülesannete konteiner (vt allpool) või SSIS-i paketi ja Informatica töövoo analoog .
Lisaks dagidele võib siiski olla ka alamdage, kuid suure tõenäosusega me nendeni ei jõua.
- DAG Jooks - initsialiseeritud dag, millele on määratud oma
execution_date. Sama dag'i dagranid võivad töötada paralleelselt (muidugi, kui olete oma ülesanded idempotentseks muutnud). - operaator on kooditükid, mis vastutavad konkreetse toimingu sooritamise eest. Operaatoreid on kolme tüüpi:
- tegevusnagu meie lemmik
PythonOperator, mis suudab käivitada mis tahes (kehtiva) Pythoni koodi; - üle, mis transpordivad andmeid ühest kohast teise, näiteks
MsSqlToHiveTransfer; - andur teisest küljest võimaldab see reageerida või aeglustada dag edasist täitmist kuni sündmuse toimumiseni.
HttpSensorsaab määratud lõpp-punkti tõmmata ja kui soovitud vastus ootab, alustage edastamistGoogleCloudStorageToS3Operator. Uudishimulik meel küsib: "Miks? Kordusi saab ju otse operaatoris teha!” Ja siis, et mitte ummistada peatatud operaatoritega ülesannete kogumit. Andur käivitub, kontrollib ja sureb enne järgmist katset.
- tegevusnagu meie lemmik
- Ülesanne - deklareeritud operaatorid, olenemata tüübist ja lisatud dag, ülendatakse ülesande auastmele.
- ülesande eksemplar - kui üldplaneerija otsustas, et on aeg ülesanded esitajatele-töölistele lahingusse saata (kohe kohapeal, kui kasutame
LocalExecutorvõi kaugsõlme puhulCeleryExecutor), määrab see neile konteksti (st muutujate komplekti – täitmisparameetrid), laiendab käsu- või päringumalle ja koondab need kokku.
Loome ülesandeid
Esmalt visandame oma dougi üldise skeemi ja seejärel sukeldume üha enam detailidesse, sest rakendame mõningaid mittetriviaalseid lahendusi.
Nii et kõige lihtsamal kujul näeb selline dag välja järgmine:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Selgitame:
- Esiteks impordime vajalikud libid ja midagi muud;
sql_server_ds- KasList[namedtuple[str, str]]Airflow Connectionsi ühenduste nimedega ja andmebaasidega, kust me oma plaadi võtame;dag- meie päevakuulutus, mis peab tingimata sees olemaglobals(), muidu Airflow seda ei leia. Doug peab ka ütlema:- mis ta nimi on
orders- see nimi kuvatakse seejärel veebiliideses, - et ta töötab kaheksanda juuli südaööst,
- ja see peaks käima umbes iga 6 tunni järel (kõvade meeste jaoks siin selle asemel
timedelta()vastuvõetavcron- rida0 0 0/6 ? * * *, vähem lahedatele - väljend nagu@daily);
- mis ta nimi on
workflow()teeb põhitöö ära, aga mitte praegu. Praegu lihtsalt heidame oma konteksti logisse.- Ja nüüd ülesannete loomise lihtne võlu:
- jookseme läbi oma allikate;
- initsialiseerida
PythonOperator, mis hukkab meie mannekeeniworkflow(). Ärge unustage määrata ülesande kordumatut (dag-i sees) nime ja siduda dag ise. Lippprovide_contextomakorda lisab funktsiooni täiendavaid argumente, mille abil me hoolikalt kogume**context.
Praeguseks on see kõik. Mida me saime:
- uus dag veebiliideses,
- poolteist sada ülesannet, mida täidetakse paralleelselt (kui Airflow, Celery seaded ja serveri maht seda võimaldavad).
Noh, peaaegu sain aru.

Kes paigaldab sõltuvused?
Kogu selle asja lihtsustamiseks keerasin asja sisse docker-compose.yml töötlemine requirements.txt kõigil sõlmedel.
Nüüd on see kadunud:

Hallid ruudud on plaanija poolt töödeldud ülesannete eksemplarid.
Ootame veidi, ülesanded napsavad töölised:

Rohelised on loomulikult oma töö edukalt lõpetanud. Punased ei ole eriti edukad.
Muide, meie tootel pole kausta
./dags, masinate vahel ei ole sünkroonimist – kõik dagid peituvadgitmeie Gitlabis ja Gitlab CI jagab liitumisel masinatele värskendusimaster.
Natuke Lillest
Samal ajal kui töömehed meie lutte peksavad, meenutagem veel üht tööriista, mis võib meile midagi näidata – Lille.
Kõige esimene leht töötajate sõlmede kokkuvõtliku teabega:

Kõige ägedam leht töösse läinud ülesannetega:

Kõige igavam leht meie maakleri staatusega:

Eredaim leht on ülesannete olekugraafikute ja nende täitmise ajaga:

Laadime alakoormatud
Niisiis, kõik ülesanded on lahendatud, võite haavatud ära viia.

Ja haavatuid oli palju – ühel või teisel põhjusel. Airflow õige kasutamise korral näitavad just need ruudud, et andmed kindlasti kohale ei jõudnud.
Peate jälgima logi ja taaskäivitama langenud ülesannete eksemplare.
Klõpsates mis tahes ruudul, näeme meile saadaolevaid toiminguid:

Sa võid võtta ja teha Clear langenud. See tähendab, et unustame, et seal on midagi ebaõnnestunud, ja sama eksemplari ülesanne läheb planeerijasse.

Selge on see, et kõigi punaste ruutudega hiirega seda teha ei ole kuigi humaanne – seda me Airflow’lt ei oota. Loomulikult on meil massihävitusrelvi: Browse/Task Instances

Valime kõik korraga ja lähtestame nullile, klõpsake õiget üksust:

Peale puhastamist näevad meie taksod välja sellised (nad juba ootavad, et sõiduplaani koostaja need sõiduplaani koostaks):

Ühendused, konksud ja muud muutujad
On aeg vaadata järgmist DAG-i, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Kas kõik on kunagi aruannet värskendanud? See on jälle tema: seal on nimekiri allikatest, kust andmeid hankida; on nimekiri, kuhu panna; ärge unustage helinata, kui kõik juhtus või katki läks (noh, see ei puuduta meid, ei).
Vaatame faili uuesti läbi ja vaatame uusi ebaselgeid asju:
from commons.operators import TelegramBotSendMessage- miski ei takista meil oma operaatoreid tegemast, mida kasutasime ära, tehes Unblocked sõnumite saatmiseks väikese ümbrise. (Sellest operaatorist räägime lähemalt allpool);default_args={}- dag saab jagada samu argumente kõigile oma operaatoritele;to='{{ var.value.all_the_kings_men }}'- välitomeil ei ole kõvakodeeritud, vaid dünaamiliselt genereeritud, kasutades Jinjat ja muutujat koos e-kirjade loendiga, mille ma hoolikalt sisestasinAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— operaatori käivitamise tingimus. Meie puhul lendab kiri ülemustele alles siis, kui kõik sõltuvused on korda läinud edukalt;tg_bot_conn_id='tg_main'- argumendidconn_idaktsepteerige meie loodud ühenduse ID-sidAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- Telegrami sõnumid lendavad minema ainult siis, kui on langenud ülesandeid;task_concurrency=1- keelame ühe ülesande mitme ülesande eksemplari samaaegse käivitamise. Vastasel juhul käivitame korraga mituVerticaOperator(vaatab ühte lauda);report_update >> [email, tg]- kõikVerticaOperatorkoonduvad kirjade ja sõnumite saatmisel, näiteks järgmiselt:

Kuid kuna teavitajate operaatoritel on erinevad käivitamistingimused, töötab ainult üks. Puuvaates näeb kõik veidi vähem visuaalne:

Ma ütlen paar sõna selle kohta makrod ja nende sõbrad - muutujad.
Makrod on Jinja kohahoidjad, mis võivad operaatori argumentidesse mitmesuguse kasuliku teabe asendada. Näiteks nii:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} laieneb kontekstimuutuja sisule execution_date vormingus YYYY-MM-DD: 2020-07-14. Parim osa on see, et kontekstimuutujad naelutatakse konkreetse ülesande eksemplari külge (puuvaates ruut) ja taaskäivitamisel laienevad kohahoidjad samadele väärtustele.
Määratud väärtusi saab vaadata iga ülesande eksemplari nupu Renderdatud abil. Kirja saatmise ülesanne on järgmine:

Ja nii sõnumi saatmise ülesande juures:

Siit leiate uusima saadaoleva versiooni sisseehitatud makrode täieliku loendi:
Pealegi saame pistikprogrammide abil oma makrosid deklareerida, aga see on juba teine lugu.
Lisaks eelmääratletud asjadele saame asendada oma muutujate väärtused (kasutasin seda juba ülalolevas koodis). Loome sisse Admin/Variables paar asja:

Kõik, mida saate kasutada:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Väärtus võib olla skalaar või ka JSON. JSON-i puhul:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}kasutage lihtsalt soovitud võtme teed: {{ var.json.bot_config.bot.token }}.
Ütlen sõna otseses mõttes ühe sõna ja näitan selle kohta üht ekraanipilti ühendused. Siin on kõik elementaarne: lehel Admin/Connections loome ühenduse, lisame sinna oma sisselogimised/paroolid ja täpsemad parameetrid. Nagu nii:

Paroole saab krüpteerida (vaikimisi põhjalikumalt) või ühenduse tüübi välja jätta (nagu ma tegin tg_main) - tõsiasi on see, et tüüpide loend on Airflow mudelites kõvasti ühendatud ja seda ei saa laiendada ilma lähtekoodidesse sisenemata (kui ma äkki midagi googeldades ei leidnud, parandage mind), kuid miski ei takista meil krediite saamast. nimi.
Sama nimega saate luua ka mitu ühendust: antud juhul meetod BaseHook.get_connection(), mis toob meile nimepidi ühendused, annab juhuslik mitmelt nimekaimult (loogilisem oleks teha Round Robin, aga jätame selle Airflow arendajate südametunnistusele).
Muutujad ja ühendused on kindlasti lahedad tööriistad, kuid oluline on mitte kaotada tasakaalu: milliseid oma voogude osi salvestate koodi endasse ja millised osad annate Airflow'le talletamiseks. Ühest küljest võib kasutajaliidese kaudu olla mugav väärtust, näiteks postkasti, kiiresti muuta. Teisest küljest on see ikkagi tagasipöördumine hiireklõpsu juurde, millest me (mina) tahtsime vabaneda.
Töö seostega on üks ülesandeid konksud. Üldiselt on Airflow konksud punktid selle ühendamiseks kolmandate osapoolte teenuste ja raamatukogudega. Nt, JiraHook avab meile Jiraga suhtlemiseks kliendi (saate ülesandeid edasi-tagasi liigutada) ja abiga SambaHook saate kohaliku faili edasi lükata smb-punkt.
Kohandatud operaatori sõelumine
Ja jõudsime selle valmistamise uurimisele lähedale TelegramBotSendMessage
Kood commons/operators.py tegeliku operaatoriga:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Siin, nagu kõik muu Airflow'is, on kõik väga lihtne:
- Päritud alates
BaseOperator, mis rakendab üsna palju Airflow-spetsiifilisi asju (vaadake oma vaba aega) - Deklareeritud väljad
template_fields, milles Jinja otsib töötlemiseks makrosid. - Korraldati õiged argumendid poolt
__init__(), määrake vajadusel vaikeseaded. - Me ei unustanud ka esivanema initsialiseerimist.
- Avas vastava konksu
TelegramBotHooksai sellelt kliendiobjekti. - Alistatud (uuesti määratletud) meetod
BaseOperator.execute(), mida Airfow tõmbleb, kui saabub aeg operaatori käivitamiseks - selles teostame põhitoimingu, unustades sisse logida. (Logime sisse, muide, kohe sissestdoutиstderr- Õhuvool püüab kõik kinni, mähib selle ilusti kokku, vajadusel lagundab.)
Vaatame, mis meil on commons/hooks.py. Faili esimene osa koos konksuga:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientMa isegi ei tea, mida siin seletada, märgin lihtsalt olulised punktid:
- Me pärime, mõtleme argumentidele - enamikul juhtudel on see üks:
conn_id; - Standardmeetodite ülekaal: piirasin ennast
get_conn(), milles saan ühenduse parameetrid nime järgi ja saan lihtsalt jaotiseextra(see on JSON-väli), kuhu panin (vastavalt enda juhistele!) Telegrami roboti märgi:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Loon meie näite
TelegramBot, andes sellele konkreetse märgi.
See on kõik. Kliendi saad konksust kasutades TelegramBotHook().clent või TelegramBotHook().get_conn().
Ja faili teine osa, milles teen Telegram REST API jaoks mikroümbrise, et mitte sama lohistada ühe meetodi jaoks sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Õige viis on see kõik kokku liita:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- lisage pistikprogrammi avalikku hoidlasse ja andke see avatud lähtekoodile.
Sel ajal, kui me seda kõike uurisime, õnnestus meie aruannete värskendused edukalt ebaõnnestuda ja saatsid mulle kanalis veateate. Ma lähen kontrollin, kas see on valesti...

Midagi läks meie dogis katki! Kas see pole see, mida me ootasime? Täpselt nii!
Kas kavatsete valada?
Kas sa tunned, et jäin millestki ilma? Tundub, et ta lubas andmed SQL Serverist Verticasse üle kanda ja siis võttis kätte ja läks teemast kõrvale, kelm!
See julmus oli tahtlik, ma lihtsalt pidin teie jaoks terminoloogiat lahti mõtestama. Nüüd saate minna kaugemale.
Meie plaan oli selline:
- Do dag
- Loo ülesandeid
- Vaadake, kui ilus kõik on
- Määrake täidistele seansinumbrid
- Hankige andmeid SQL Serverist
- Sisestage andmed Verticasse
- Koguge statistikat
Nii et selle kõige käivitamiseks tegin meie jaoks väikese täienduse docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pySeal tõstame:
- Vertica hostina
dwhkõige vaikeseadetega, - kolm SQL Serveri eksemplari,
- täidame viimases olevad andmebaasid teatud andmetega (ärge mingil juhul uurige
mssql_init.py!)
Käivitame kõik hea eelmisest veidi keerulisema käsu abil:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Seda, mida meie imeline randomiseerija genereeris, saate seda üksust kasutada Data Profiling/Ad Hoc Query:

Peaasi, et seda analüütikutele ei näidata
täpsustada ETL seansid Ma ei tee seda, kõik on seal triviaalne: teeme aluse, selles on silt, mähime kõik kontekstihalduriga ja nüüd teeme seda:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passAeg on tulnud koguda meie andmeid meie poolteisesajast lauast. Teeme seda väga tagasihoidlike ridade abil:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Konksu abil saame Airflowst
pymssql- ühendada - Asendame päringus piirangu kuupäeva kujul – mallimootor viskab selle funktsiooni.
- Toidame meie palvet
pandaskes meid saabDataFrame- see on meile tulevikus kasulik.
Ma kasutan asendust
{dt}päringu parameetri asemel%smitte sellepärast, et ma oleks kuri Pinocchio, vaid sellepärastpandasei saa hakkamapymssqlja libiseb viimaseparams: Listkuigi ta tõesti tahabtuple.
Pange tähele ka seda, et arendajapymssqlotsustas teda enam mitte toetada ja on aeg välja kolidapyodbc.
Vaatame, millega Airflow meie funktsioonide argumente toppis:

Kui andmeid pole, siis pole mõtet ka jätkata. Kuid imelik on ka täidist õnnestunuks pidada. Kuid see pole viga. A-ah-ah, mida teha?! Ja siin on see, mis:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException ütleb Airflow'le, et vigu pole, kuid jätame ülesande vahele. Liidesel ei ole rohelist ega punast ruutu, vaid roosa.
Loosime oma andmed mitu veergu:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Nimelt
- Andmebaas, kust tellimusi võtsime,
- Meie üleujutuse seansi ID (see on erinev iga ülesande jaoks),
- Lähte ja tellimuse ID räsi – et lõplikus andmebaasis (kus kõik ühte tabelisse valatakse) oleks meil kordumatu tellimuse ID.
Jääb eelviimane samm: valage kõik Verticasse. Kummalisel kombel on üks silmapaistvamaid ja tõhusamaid viise seda teha CSV kaudu!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Valmistame spetsiaalse vastuvõtja
StringIO. pandaspaneb lahkesti meieDataFramenaguCSV- read.- Avame konksuga ühenduse meie lemmik Verticaga.
- Ja nüüd abiga
copy()saatke meie andmed otse Vertikale!
Võtame juhilt, mitu rida täitusid, ja ütleme seansihaldurile, et kõik on korras:
session.loaded_rows = cursor.rowcount
session.successful = TrueSee on kõik.
Müügil loome sihtplaadi käsitsi. Siin lubasin endale väikese masina:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)ma kasutan
VerticaOperator()Loon andmebaasi skeemi ja tabeli (kui neid muidugi veel ei ole). Peaasi on sõltuvused õigesti korraldada:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadKokkuvõtteks
- Noh, - ütles hiireke, - kas pole nüüd
Kas olete veendunud, et ma olen metsa kõige kohutavam loom?
Julia Donaldson, Gruffalo
Ma arvan, et kui minu kolleegidega oleks konkurents: kes loob ja käivitab kiiresti nullist ETL-i protsessi: nemad oma SSIS-i ja hiirega ja mina Airflowga ... Ja siis võrdleksime ka hoolduse lihtsust ... Vau, ma arvan, et nõustute sellega, et võidan neid igal rindel!
Kui veidi tõsisemalt, siis Apache Airflow - kirjeldades protsesse programmikoodi kujul - tegi minu töö ära palju mugavam ja nauditavam.
Selle piiramatu laiendatavus nii pistikprogrammide kui ka skaleeritavuse osas annab teile võimaluse kasutada Airflow'i peaaegu igas valdkonnas: isegi kogu andmete kogumise, ettevalmistamise ja töötlemise tsüklis, isegi rakettide väljalaskmisel (Marsile, muidugi).
Osa lõpp, viide ja teave
Reha, mille oleme teile kogunud
start_date. Jah, see on juba kohalik meem. Dougi peamine argumentstart_datekõik läbivad. Lühidalt, kui täpsustatestart_datepraegune kuupäev jaschedule_interval- ühel päeval, siis DAG alustab homme mitte varem.start_date = datetime(2020, 7, 7, 0, 1, 2)Ja pole enam probleeme.
Sellega on seotud veel üks käitusaegne viga:
Task is missing the start_date parameter, mis viitab enamasti sellele, et unustasite dag operaatoriga siduda.- Kõik ühes masinas. Jah, ja alused (Airflow ise ja meie kate) ja veebiserver ja planeerija ja töötajad. Ja see isegi töötas. Kuid aja jooksul kasvas teenuste ülesannete arv ja kui PostgreSQL hakkas indeksile reageerima 20 ms asemel 5 sekundiga, võtsime selle ja viisime selle minema.
- Kohalik täitja. Jah, me istume endiselt sellel ja oleme juba jõudnud kuristiku servale. Meile on seni piisanud LocalExecutorist, kuid nüüd on aeg laieneda vähemalt ühe töötajaga ja peame CeleryExecutorisse kolimiseks kõvasti tööd tegema. Ja arvestades asjaolu, et saate sellega töötada ühes masinas, ei takista miski teid kasutamast sellerit isegi serveris, mis "loomulikult ei lähe kunagi tootmisse, ausalt!"
- Mittekasutamine sisseehitatud tööriistad:
- Side teenindusmandaatide salvestamiseks,
- SLA preilid vastata ülesannetele, mis ei õnnestunud õigel ajal,
- xcom metaandmete vahetamiseks (ma ütlesin metaandmed!) dag ülesannete vahel.
- Meili kuritarvitamine. No mis ma oskan öelda? Kõigi langenud ülesannete korduste kohta seati märguanded. Nüüd on minu töö Gmailil Airflow'st üle 90 100 meili ja veebimeilikork keeldub korraga üle XNUMX meili üles võtmast ja kustutamast.
Veel lõkse:
Rohkem automatiseerimistööriistu
Selleks, et saaksime veelgi rohkem töötada oma peaga, mitte kätega, on Airflow meile ette valmistanud järgmise:
- - tal on endiselt Eksperimendi staatus, mis ei takista tal töötamast. Sellega saate mitte ainult hankida teavet päevade ja ülesannete kohta, vaid ka peatada/käivitada päevakava, luua DAG Run või basseini.
- - käsurea kaudu on saadaval palju tööriistu, mida ei ole WebUI kaudu lihtsalt ebamugav kasutada, vaid need üldiselt puuduvad. Näiteks:
backfillvaja tegumijuhtumite taaskäivitamiseks.
Näiteks tulid analüütikud ja ütlesid: “Ja teil, seltsimees, on 1.-13. jaanuari andmetes lollus! Parandage, parandage, parandage, parandage!" Ja sa oled selline pliidiplaat:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Põhiteenus:
initdb,resetdb,upgradedb,checkdb. run, mis võimaldab teil käivitada ühe eksemplari ülesande ja isegi hinnata kõiki sõltuvusi. Lisaks saate seda käivitadaLocalExecutor, isegi kui teil on selleriklaster.- Teeb enam-vähem sama asja
test, ainult ka alustesse ei kirjuta midagi. connectionsvõimaldab massilist ühenduste loomist kestast.
- - üsna kõva suhtlemisviis, mis on mõeldud pistikprogrammidele, mitte kubisema selles väikeste kätega. Aga kes takistab meid minemast
/home/airflow/dags, jookseipythonja hakata jamama? Näiteks saate eksportida kõik ühendused järgmise koodiga:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Ühenduse loomine Airflow metaandmebaasiga. Ma ei soovita sellele kirjutada, kuid ülesande olekute hankimine erinevate konkreetsete mõõdikute jaoks võib olla palju kiirem ja lihtsam kui mis tahes API kasutamine.
Oletame, et kõik meie ülesanded ei ole idempotentsed, kuid mõnikord võivad need kukkuda ja see on normaalne. Aga paar ummistust on juba kahtlased ja oleks vaja kontrollida.
Ettevaatust SQL-iga!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
Viited
Ja loomulikult on Google'i väljastamise esimesed kümme linki minu järjehoidjate kausta Airflow sisu.
- - loomulikult peame alustama kontorist. dokumentatsiooni, aga kes juhiseid loeb?
- - Noh, lugege vähemalt loojate soovitusi.
- - päris algus: kasutajaliides piltides
- - põhimõisted on hästi kirjeldatud, kui (äkki!) Sa minust millestki aru ei saanud.
- - lühike juhend Airflow klastri seadistamiseks.
- - peaaegu sama huvitav artikkel, välja arvatud ehk rohkem formalismi ja vähem näiteid.
- — selleriga koostööst.
- - ülesannete idempotentsusest, kuupäeva asemel ID järgi laadimisest, teisendustest, failistruktuurist ja muust huvitavast.
- - ülesannete sõltuvused ja Trigger Rule, mida mainisin vaid möödaminnes.
- - kuidas ületada ajakavas mõningaid "töötab plaanipäraselt", laadida kaotatud andmeid ja seada ülesanded tähtsuse järjekorda.
- — kasulikud SQL-päringud Airflow metaandmetele.
- - seal on kasulik jaotis kohandatud anduri loomise kohta.
- — huvitav lühike märkus AWS for Data Science'i infrastruktuuri loomise kohta.
- - levinud vead (kui keegi ikka veel juhiseid ei loe).
- - naeratage, kuidas inimesed paroolide salvestamisel karku hoiavad, kuigi saate lihtsalt kasutada Ühendusi.
- - kaudne DAG-i edastamine, konteksti lisamine funktsioonidesse, jällegi sõltuvuste ja ka ülesannete käivitamise vahelejätmise kohta.
- - kasutamise kohta
default argumentsиparamsmallides, samuti muutujates ja ühendustes. - - lugu sellest, kuidas planeerija valmistub Airflow 2.0 jaoks.
- - veidi aegunud artikkel meie klastri juurutamise kohta
docker-compose. - - dünaamilised ülesanded, kasutades malle ja konteksti edastamist.
- — standardsed ja kohandatud teatised posti ja Slacki teel.
- - hargnemisülesanded, makrod ja XCom.
Ja artiklis kasutatud lingid:
- - mallides kasutamiseks saadaval kohahoidjad.
- — Levinud vead dagide loomisel.
- -
docker-composekatsetamiseks, silumiseks ja muuks. - — Pythoni ümbris Telegrami REST API jaoks.
Allikas: www.habr.com




