Hallo, ek is Dmitry Logvinenko - Data-ingenieur van die Analytics-afdeling van die Vezet-groep van maatskappye.
Ek sal jou vertel van 'n wonderlike hulpmiddel vir die ontwikkeling van ETL-prosesse - Apache Airflow. Maar Airflow is so veelsydig en veelsydig dat jy dit van nader moet bekyk, selfs al is jy nie by datavloei betrokke nie, maar het jy 'n behoefte om periodiek enige prosesse te begin en die uitvoering daarvan te monitor.
En ja, ek sal nie net vertel nie, maar ook wys: die program het baie kode, skermkiekies en aanbevelings.

Wat jy gewoonlik sien as jy die woord Airflow / Wikimedia Commons google
Inhoudsopgawe
Inleiding
Apache Airflow is net soos Django:
- in luislang geskryf
- daar is 'n wonderlike administrasiepaneel,
- onbepaald uitbrei
- net beter, en dit is gemaak vir heeltemal ander doeleindes, naamlik (soos dit voor die kata geskryf is):
- uitvoer en monitering van take op 'n onbeperkte aantal masjiene (soos baie Seldery / Kubernetes en jou gewete jou sal toelaat)
- met dinamiese werkvloeigenerering van baie maklik om Python-kode te skryf en te verstaan
- en die vermoë om enige databasisse en API's met mekaar te verbind deur beide klaargemaakte komponente en tuisgemaakte plugins (wat uiters eenvoudig is).
Ons gebruik Apache Airflow soos volg:
- ons samel data van verskeie bronne in (baie SQL Server- en PostgreSQL-gevalle, verskeie API's met toepassingsstatistieke, selfs 1C) in DWH en ODS (ons het Vertica en Clickhouse).
- hoe gevorderd
cron, wat die datakonsolidasieprosesse op die ODS begin, en ook hul instandhouding monitor.
Tot onlangs was ons behoeftes gedek deur een klein bediener met 32 kerne en 50 GB RAM. In Airflow werk dit:
- meer 200 dae (eintlik werkstrome, waarin ons take gevul het),
- in elk gemiddeld 70 take,
- hierdie goedheid begin (ook gemiddeld) een keer per uur.
En oor hoe ons uitgebrei het, sal ek hieronder skryf, maar kom ons definieer nou die über-probleem wat ons sal oplos:
Daar is drie oorspronklike SQL Servers, elk met 50 databasisse - gevalle van een projek, onderskeidelik, hulle het dieselfde struktuur (byna oral, mua-ha-ha), wat beteken dat elkeen 'n Orders-tabel het (gelukkig 'n tabel met daardie naam kan in enige besigheid ingedruk word). Ons neem die data deur diensvelde by te voeg (bronbediener, brondatabasis, ETL-taak-ID) en gooi dit naïef in, sê, Vertica.
Kom ons gaan!
Die hoofgedeelte, prakties (en 'n bietjie teoreties)
Hoekom doen ons (en jy)
Toe die bome groot was en ek eenvoudig was SQL-schik in een Russiese kleinhandel, ons het ETL-prosesse, oftewel datavloei, bedrieg deur twee gereedskap wat vir ons beskikbaar is:
- Informatica Power Center - 'n uiters verspreide stelsel, uiters produktief, met sy eie hardeware, sy eie weergawe. Ek het God verhoede 1% van sy vermoëns gebruik. Hoekom? Wel, eerstens het hierdie koppelvlak, iewers uit die 380's, geestelik druk op ons geplaas. Tweedens, hierdie kontrepsie is ontwerp vir uiters spoggerige prosesse, woedende komponent-hergebruik en ander baie-belangrike ondernemingstruuks. Oor wat dit kos, soos die vlerk van die Airbus AXNUMX / jaar, sal ons niks sê nie.
Pasop, 'n kiekie kan mense onder 30 'n bietjie seermaak

- SQL Server Integrasie Bediener - ons het hierdie kameraad in ons intra-projekvloei gebruik. Wel, in werklikheid: ons gebruik reeds SQL Server, en dit sal op een of ander manier onredelik wees om nie sy ETL-nutsmiddels te gebruik nie. Alles daarin is goed: beide die koppelvlak is pragtig en die vorderingsverslae ... Maar dit is nie hoekom ons van sagtewareprodukte hou nie, o, nie hiervoor nie. Weergawe dit
dtsx(wat XML is met nodes wat op stoor geskommel is) ons kan, maar wat is die punt? Hoe gaan dit met die maak van 'n taakpakket wat honderde tafels van een bediener na 'n ander sal sleep? Ja, wat 'n honderd, jou wysvinger sal van twintig stukke afval, deur op die muisknoppie te klik. Maar dit lyk beslis meer modieus:
Ons het beslis na uitweg gesoek. Geval selfs byna het by 'n selfgeskrewe SSIS-pakketgenerator gekom ...
…en toe kry 'n nuwe werk my. En Apache Airflow het my daarop ingehaal.
Toe ek uitvind dat ETL-prosesbeskrywings eenvoudige Python-kode is, het ek net nie gedans van vreugde nie. Dit is hoe datastrome weergawe en verskil is, en om tabelle met 'n enkele struktuur uit honderde databasisse in een teiken te gooi, het 'n kwessie van Python-kode in een en 'n half of twee 13 "skerms geword.
Die samestelling van die groep
Kom ons reël nie 'n heeltemal kleuterskool nie, en praat nie hier oor heeltemal ooglopende dinge nie, soos die installering van Airflow, jou gekose databasis, Seldery en ander gevalle wat in die dokke beskryf word.
Sodat ons dadelik met eksperimente kan begin, het ek geskets docker-compose.yml waarin:
- Kom ons verhoog eintlik Lugvloei: Skeduleerder, Webbediener. Blom sal ook daar draai om selderytake te monitor (omdat dit reeds ingedruk is
apache/airflow:1.10.10-python3.7, maar ons gee nie om nie) - PostgreSQL, waarin Airflow sy diensinligting sal skryf (skeduleerderdata, uitvoeringstatistieke, ens.), en Selery sal voltooide take merk;
- Redis, wat as 'n taakmakelaar vir Seldery sal optree;
- Seldery werker, wat betrokke sal wees by die direkte uitvoering van take.
- Na gids
./dagsons sal ons lêers byvoeg met die beskrywing van dags. Hulle sal dadelik opgetel word, so dit is nie nodig om die hele stapel na elke nies te jongleren nie.
Op sommige plekke word die kode in die voorbeelde nie heeltemal gewys nie (om nie die teks deurmekaar te maak nie), maar iewers word dit in die proses gewysig. Volledige werkende kode voorbeelde kan gevind word in die bewaarplek .
Docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNotas:
- In die samestelling van die komposisie het ek grootliks op die bekende beeld staatgemaak - maak seker dat u dit nagaan. Miskien het jy niks anders in jou lewe nodig nie.
- Alle lugvloeiinstellings is nie net beskikbaar nie
airflow.cfg, maar ook deur omgewingsveranderlikes (danksy die ontwikkelaars), wat ek kwaadwillig benut het. - Natuurlik is dit nie produksiegereed nie: ek het doelbewus nie hartklop op houers gesit nie, ek het my nie aan sekuriteit gesteur nie. Maar ek het die minimum gedoen wat geskik is vir ons eksperimenteerders.
- Let daarop dat:
- Die daglêer moet toeganklik wees vir beide die skeduleerder en die werkers.
- Dieselfde geld vir alle derdeparty-biblioteke - hulle moet almal op masjiene met 'n skeduleerder en werkers geïnstalleer word.
Wel, nou is dit eenvoudig:
$ docker-compose up --scale worker=3Nadat alles gestyg het, kan u na die webkoppelvlakke kyk:
- Lugvloei:
- Blom:
Basiese konsepte
As jy niks in al hierdie "dae" verstaan het nie, dan is hier 'n kort woordeboek:
- Skeduleerder - die belangrikste oom in Airflow, wat beheer dat robotte hard werk, en nie 'n persoon nie: monitor die skedule, werk dag op, loods take.
Oor die algemeen, in ouer weergawes, het hy probleme met geheue gehad (nee, nie geheueverlies nie, maar lekkasies) en die nalatenskapparameter het selfs in die konfigurasies gebly
run_duration- sy herbegin interval. Maar nou is alles reg. - DAG (aka "dag") - "gerigte asikliese grafiek", maar so 'n definisie sal min mense vertel, maar in werklikheid is dit 'n houer vir take wat met mekaar in wisselwerking is (sien hieronder) of 'n analoog van Package in SSIS en Workflow in Informatica .
Benewens daggies kan daar nog subdags wees, maar ons sal heel waarskynlik nie daarby uitkom nie.
- DAG hardloop - geïnisialiseer dag, wat sy eie toegeken word
execution_date. Dagrans van dieselfde dag kan parallel werk (as jy jou take natuurlik idempotent gemaak het). - operateur is stukke kode wat verantwoordelik is vir die uitvoering van 'n spesifieke aksie. Daar is drie tipes operateurs:
- aksiesoos ons gunsteling
PythonOperator, wat enige (geldige) Python-kode kan uitvoer; - oordra, wat data van plek tot plek vervoer, sê,
MsSqlToHiveTransfer; - sensor aan die ander kant sal dit jou toelaat om te reageer of die verdere uitvoering van die dag te vertraag totdat 'n gebeurtenis plaasvind.
HttpSensorkan die gespesifiseerde eindpunt trek, en wanneer die verlangde reaksie wag, begin die oordragGoogleCloudStorageToS3Operator. ’n Nuuskierige gees sal vra: “hoekom? Jy kan immers herhalings reg in die operateur doen!” En dan, om nie die poel take met geskorste operateurs te verstop nie. Die sensor begin, kontroleer en sterf voor die volgende poging.
- aksiesoos ons gunsteling
- Taak - verklaarde operateurs, ongeag tipe, en verbonde aan die dag word bevorder tot die rang van taak.
- taak instansie - toe die algemene beplanner besluit het dat dit tyd is om take in die stryd teen presterende-werkers te stuur (reg op die plek, as ons gebruik
LocalExecutorof na 'n afgeleë nodus in die geval vanCeleryExecutor), ken dit 'n konteks aan hulle toe (d.w.s. 'n stel veranderlikes - uitvoeringsparameters), brei opdrag- of navraagsjablone uit en voeg dit saam.
Ons genereer take
Laat ons eers die algemene skema van ons doug uiteensit, en dan sal ons meer en meer in die besonderhede duik, want ons pas 'n paar nie-triviale oplossings toe.
So, in sy eenvoudigste vorm, sal so 'n dag soos volg lyk:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Kom ons vind dit uit:
- Eerstens voer ons die nodige libs in en iets anders;
sql_server_ds- IsList[namedtuple[str, str]]met die name van die verbindings van Airflow Connections en die databasisse waaruit ons ons bord sal neem;dag- die aankondiging van ons dag, wat noodwendig in moet weesglobals(), anders sal Airflow dit nie vind nie. Doug moet ook sê:- wat is sy naam
orders- hierdie naam sal dan in die webkoppelvlak verskyn, - dat hy vanaf middernag op die agtste Julie sal werk,
- en dit behoort ongeveer elke 6 uur te loop (vir geharde ouens hier in plaas van
timedelta()toelaatbaarcron-lyn0 0 0/6 ? * * *, vir die minder cool - 'n uitdrukking soos@daily);
- wat is sy naam
workflow()sal die hoofwerk doen, maar nie nou nie. Vir eers sal ons net ons konteks in die log gooi.- En nou die eenvoudige magie van die skep van take:
- ons hardloop deur ons bronne;
- inisialiseer
PythonOperator, wat ons dummy sal teregstelworkflow(). Moenie vergeet om 'n unieke (binne die dag) naam van die taak te spesifiseer en die dag self te bind nie. Vlagprovide_contextop sy beurt sal addisionele argumente in die funksie gooi, wat ons versigtig sal versamel deur gebruik te maak**context.
Vir nou is dit al. Wat ons gekry het:
- nuwe dag in die webkoppelvlak,
- een en 'n half honderd take wat parallel uitgevoer sal word (as die lugvloei, seldery-instellings en bedienerkapasiteit dit toelaat).
Wel, amper het dit.

Wie sal die afhanklikhede installeer?
Om hierdie hele ding te vereenvoudig, het ek ingeskroef docker-compose.yml verwerking requirements.txt op alle nodusse.
Nou is dit weg:

Grys blokkies is taakgevalle wat deur die skeduleerder verwerk word.
Ons wag 'n bietjie, die take word opgeraap deur die werkers:

Die groenes het natuurlik hul werk suksesvol voltooi. Rooies is nie baie suksesvol nie.
Terloops, daar is geen gids op ons produk nie
./dags, daar is geen sinchronisasie tussen masjiene nie - alle dae lê ingitop ons Gitlab, en Gitlab CI versprei opdaterings na masjiene wanneer hulle saamsmeltmaster.
'n Bietjie oor Blom
Terwyl die werkers ons fopspeen slaan, laat ons nog 'n hulpmiddel onthou wat vir ons iets kan wys - Blom.
Die heel eerste bladsy met opsommende inligting oor werker nodusse:

Die mees intense bladsy met take wat te werk gegaan het:

Die verveligste bladsy met die status van ons makelaar:

Die helderste bladsy is met taakstatusgrafieke en hul uitvoeringstyd:

Ons laai die onderlaaide
So, al die take het uitgewerk, jy kan die gewondes wegdra.

En daar was baie gewondes – om een of ander rede. In die geval van die korrekte gebruik van Airflow, dui hierdie einste blokkies aan dat die data beslis nie opgedaag het nie.
Jy moet die log dophou en die gevalle taakgevalle weer begin.
Deur op enige vierkant te klik, sal ons die aksies wat vir ons beskikbaar is, sien:

Jy kan die gevallenes neem en skoonmaak. Dit wil sê, ons vergeet dat iets daar misluk het, en dieselfde voorbeeldtaak sal na die skeduleerder gaan.

Dit is duidelik dat dit nie baie menslik is om dit met die muis met al die rooi blokkies te doen nie – dit is nie wat ons van Airflow verwag nie. Natuurlik het ons massavernietigingswapens: Browse/Task Instances

Kom ons kies alles gelyktydig en herstel na nul, klik op die korrekte item:

Na skoonmaak lyk ons taxi's so (hulle wag reeds vir die skeduleerder om hulle te skeduleer):

Verbindings, hake en ander veranderlikes
Dit is tyd om na die volgende DAG te kyk, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Het almal al ooit 'n verslagopdatering gedoen? Dit is weer sy: daar is 'n lys van bronne van waar om die data te kry; daar is 'n lys waar om te plaas; moenie vergeet om te toeter wanneer alles gebeur of gebreek het nie (wel, dit gaan nie oor ons nie, nee).
Kom ons gaan weer deur die lêer en kyk na die nuwe obskure goed:
from commons.operators import TelegramBotSendMessage- niks verhinder ons om ons eie operateurs te maak nie, wat ons benut het deur 'n klein omhulsel te maak om boodskappe na Unblocked te stuur. (Ons sal hieronder meer oor hierdie operateur praat);default_args={}- dag kan dieselfde argumente aan al sy operateurs versprei;to='{{ var.value.all_the_kings_men }}'- veldtoons sal nie hardkodeer hê nie, maar dinamies gegenereer met behulp van Jinja en 'n veranderlike met 'n lys e-posse, wat ek versigtig ingesit hetAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— voorwaarde vir die aanvang van die operateur. In ons geval sal die brief net na die base vlieg as alle afhanklikhede uitgewerk het suksesvol;tg_bot_conn_id='tg_main'- argumenteconn_idaanvaar verbindings-ID's wat ons skepAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- boodskappe in Telegram sal net wegvlieg as daar gevalle take is;task_concurrency=1- ons verbied die gelyktydige bekendstelling van verskeie taakgevalle van een taak. Andersins kry ons die gelyktydige bekendstelling van verskeieVerticaOperator(kyk na een tafel);report_update >> [email, tg]- almalVerticaOperatorkonvergeer in die stuur van briewe en boodskappe, soos hierdie:

Maar aangesien kennisgewingsoperateurs verskillende bekendstellingstoestande het, sal slegs een werk. In die boomaansig lyk alles 'n bietjie minder visueel:

Ek sal 'n paar woorde sê oor makro's en hul vriende - veranderlikes.
Makro's is Jinja-plekhouers wat verskeie nuttige inligting in operateurargumente kan vervang. Byvoorbeeld, soos volg:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} sal uitbrei na die inhoud van die konteksveranderlike execution_date in die formaat YYYY-MM-DD: 2020-07-14. Die beste deel is dat konteksveranderlikes aan 'n spesifieke taakgeval vasgespyker word ('n vierkant in die boomaansig), en wanneer dit herbegin word, sal die plekhouers na dieselfde waardes uitbrei.
Die toegekende waardes kan bekyk word met die Weergegee-knoppie op elke taakgeval. Dit is hoe die taak om 'n brief te stuur:

En so by die taak met die stuur van 'n boodskap:

'n Volledige lys van ingeboude makro's vir die nuutste beskikbare weergawe is hier beskikbaar:
Boonop kan ons met behulp van plugins ons eie makro's verklaar, maar dit is 'n ander storie.
Benewens die vooraf gedefinieerde dinge, kan ons die waardes van ons veranderlikes vervang (ek het dit reeds in die kode hierbo gebruik). Kom ons skep in Admin/Variables 'n paar dinge:

Alles wat jy kan gebruik:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Die waarde kan 'n skalaar wees, of dit kan ook JSON wees. In die geval van JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}gebruik net die pad na die verlangde sleutel: {{ var.json.bot_config.bot.token }}.
Ek sal letterlik een woord sê en een kiekie oor wys verbindings. Alles is elementêr hier: op die bladsy Admin/Connections ons skep 'n verbinding, voeg ons logins / wagwoorde en meer spesifieke parameters daar by. Soos hierdie:

Wagwoorde kan geïnkripteer word (meer deeglik as die verstek), of jy kan die verbindingstipe weglaat (soos ek gedoen het vir tg_main) - die feit is dat die lys tipes hardbedraad is in Airflow-modelle en nie uitgebrei kan word sonder om in die bronkodes in te gaan nie (as ek skielik nie iets gegoogle het nie, korrigeer my asseblief), maar niks sal ons keer om krediete te kry net deur naam.
U kan ook verskeie verbindings met dieselfde naam maak: in hierdie geval die metode BaseHook.get_connection(), wat ons verbindings by die naam kry, sal gee ewekansig van verskeie naamgenote (dit sal meer logies wees om Round Robin te maak, maar kom ons los dit op die gewete van die Airflow-ontwikkelaars).
Veranderlikes en verbindings is beslis oulike hulpmiddels, maar dit is belangrik om nie die balans te verloor nie: watter dele van jou vloeie stoor jy in die kode self, en watter dele gee jy aan Airflow vir berging. Aan die een kant kan dit gerieflik wees om die waarde vinnig te verander, byvoorbeeld 'n posbus, deur die UI. Aan die ander kant is dit steeds 'n terugkeer na die muisklik, waarvan ons (ek) ontslae wou raak.
Om met verbindings te werk is een van die take hake. Oor die algemeen is Airflow-hake punte om dit aan derdeparty-dienste en biblioteke te koppel. Bv. JiraHook sal 'n kliënt vir ons oopmaak om met Jira te kommunikeer (jy kan take heen en weer skuif), en met behulp van SambaHook jy kan 'n plaaslike lêer na stoot smb-punt.
Ontleding van die pasgemaakte operateur
En ons het naby gekom om te kyk hoe dit gemaak word TelegramBotSendMessage
Kode commons/operators.py met die werklike operateur:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Hier, soos alles anders in Airflow, is alles baie eenvoudig:
- Geërf van
BaseOperator, wat 'n hele paar Airflow-spesifieke dinge implementeer (kyk na jou ontspanning) - Verklaarde velde
template_fields, waarin Jinja na makro's sal soek om te verwerk. - Die regte argumente gereël vir
__init__(), stel die verstekwaardes in waar nodig. - Ons het ook nie vergeet van die inisialisering van die voorvader nie.
- Het die ooreenstemmende haak oopgemaak
TelegramBotHook'n kliëntobjek daaruit ontvang. - Veranderde (herdefinieer) metode
BaseOperator.execute(), wat Airfow sal draai wanneer die tyd aanbreek om die operateur te begin - daarin sal ons die hoofaksie implementeer, en vergeet om aan te meld. (Ons meld terloops dadelik aanstdoutиstderr- Lugvloei sal alles onderskep, dit pragtig toevou, dit ontbind waar nodig.)
Kom ons kyk wat ons het commons/hooks.py. Die eerste deel van die lêer, met die haak self:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientEk weet nie eers wat om hier te verduidelik nie, ek let net op die belangrike punte:
- Ons erf, dink oor die argumente - in die meeste gevalle sal dit een wees:
conn_id; - Oorheersende standaardmetodes: Ek het myself beperk
get_conn(), waarin ek die verbindingsparameters by naam kry en net die afdeling kryextra(dit is 'n JSON-veld), waarin ek (volgens my eie instruksies!) die Telegram-bottoken plaas:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Ek skep 'n voorbeeld van ons
TelegramBot, gee dit 'n spesifieke teken.
Dis al. Jy kan 'n kliënt uit 'n haak met behulp van TelegramBotHook().clent of TelegramBotHook().get_conn().
En die tweede deel van die lêer, waarin ek 'n mikrowrapper vir die Telegram REST API maak, om nie dieselfde te sleep nie vir een metode sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Die korrekte manier is om dit alles bymekaar te tel:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- in die inprop, plaas 'n publieke bewaarplek en gee dit aan Open Source.
Terwyl ons dit alles bestudeer het, het ons verslagopdaterings daarin geslaag om suksesvol te misluk en vir my 'n foutboodskap in die kanaal te stuur. Ek gaan kyk of dit verkeerd is...

Iets het in ons doge gebreek! Is dit nie wat ons verwag het nie? Presies!
Gaan jy skink?
Voel jy ek het iets gemis? Dit blyk dat hy belowe het om data van SQL Server na Vertica oor te dra, en toe neem hy dit en skuif van die onderwerp af, die skelm!
Hierdie gruweldaad was opsetlik, ek moes bloot 'n paar terminologie vir jou ontsyfer. Nou kan jy verder gaan.
Ons plan was dit:
- Doen dag
- Genereer take
- Kyk hoe mooi is alles
- Ken sessienommers toe aan vullings
- Kry data vanaf SQL Server
- Plaas data in Vertica
- Versamel statistieke
Dus, om dit alles aan die gang te kry, het ek 'n klein toevoeging tot ons docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyDaar lig ons op:
- Vertika as gasheer
dwhmet die mees verstek instellings, - drie gevalle van SQL Server,
- ons vul die databasisse in laasgenoemde met sommige data (moet in geen geval nie na kyk nie
mssql_init.py!)
Ons begin al die goeie met behulp van 'n effens meer ingewikkelde opdrag as die vorige keer:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Wat ons wonderwerk-randomizer gegenereer het, kan jy die item gebruik Data Profiling/Ad Hoc Query:

Die belangrikste ding is om dit nie aan ontleders te wys nie
uitbrei oor ETL sessies Ek sal nie, alles is onbenullig daar: ons maak 'n basis, daar is 'n teken daarin, ons draai alles toe met 'n konteksbestuurder, en nou doen ons dit:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passDie tyd het aangebreek versamel ons data van ons een en 'n half honderd tafels. Kom ons doen dit met behulp van baie onpretensieuse lyne:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Met die hulp van 'n haak kry ons van Airflow
pymssql- verbind - Kom ons plaas 'n beperking in die vorm van 'n datum in die versoek - dit sal deur die sjabloonenjin in die funksie gegooi word.
- Voed ons versoek
pandaswie sal ons kryDataFrame- dit sal vir ons nuttig wees in die toekoms.
Ek gebruik substitusie
{dt}in plaas van 'n versoekparameter%snie omdat ek 'n bose Pinocchio is nie, maar omdatpandaskan hanteer niepymssqlen glip die laaste eenparams: Listalhoewel hy regtig wiltuple.
Let ook daarop dat die ontwikkelaarpymssqlbesluit om hom nie meer te ondersteun nie, en dit is tyd om uit te trekpyodbc.
Kom ons kyk waarmee Airflow die argumente van ons funksies gevul het:

As daar geen data is nie, is dit geen sin om voort te gaan nie. Maar dit is ook vreemd om die vulsel as suksesvol te beskou. Maar dit is nie 'n fout nie. A-ah-ah, wat om te doen?! En hier is wat:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException sê vir Airflow dat daar geen foute is nie, maar ons slaan die taak oor. Die koppelvlak sal nie 'n groen of rooi vierkant hê nie, maar pienk.
Kom ons gooi ons data veelvuldige kolomme:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Naamlik
- Die databasis waaruit ons die bestellings geneem het,
- ID van ons oorstromingsessie (dit sal anders wees vir elke taak),
- 'n Hash van die bron en bestelling ID - sodat ons in die finale databasis (waar alles in een tabel gegooi word) 'n unieke bestelling ID het.
Die voorlaaste stap bly: gooi alles in Vertica. En, vreemd genoeg, is een van die skouspelagtigste en doeltreffendste maniere om dit te doen deur CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Ons maak 'n spesiale ontvanger
StringIO. pandassal vriendelik plaas onsDataFramein die vormCSV-lyne.- Kom ons maak 'n verbinding met ons gunsteling Vertica met 'n haak.
- En nou met die hulp
copy()stuur ons data direk na Vertika!
Ons sal by die bestuurder neem hoeveel lyne gevul is, en vir die sessiebestuurder sê dat alles reg is:
session.loaded_rows = cursor.rowcount
session.successful = TrueDit is alles.
Op die uitverkoping skep ons die teikenplaat met die hand. Hier het ek myself 'n klein masjien toegelaat:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)ek gebruik
VerticaOperator()Ek skep 'n databasisskema en 'n tabel (as dit nie reeds bestaan nie, natuurlik). Die belangrikste ding is om die afhanklikhede korrek te rangskik:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadOpsomming
- Wel, - sê die muisie, - is dit nie nou nie
Is jy oortuig dat ek die aakligste dier in die bos is?
Julia Donaldson, The Gruffalo
Ek dink as ek en my kollegas 'n kompetisie gehad het: wie sal vinnig 'n ETL-proses van nuuts af skep en begin: hulle met hul SSIS en 'n muis en ek met Airflow ... En dan sou ons ook die gemak van onderhoud vergelyk ... Sjoe, ek dink jy sal saamstem dat ek hulle op alle fronte sal klop!
As 'n bietjie meer ernstig, dan het Apache Airflow - deur prosesse in die vorm van programkode te beskryf - my werk gedoen veel gemakliker en lekkerder.
Die onbeperkte uitbreidbaarheid daarvan, beide in terme van inproppe en geneigdheid tot skaalbaarheid, gee jou die geleentheid om Airflow in byna enige area te gebruik: selfs in die volle siklus van die insameling, voorbereiding en verwerking van data, selfs in die lansering van vuurpyle (na Mars, van kursus).
Deelfinaal, verwysing en inligting
Die hark wat ons vir jou ingesamel het
start_date. Ja, dit is reeds 'n plaaslike meme. Via Doug se hoofargumentstart_datealmal slaag. Kortliks, as jy spesifiseer instart_datehuidige datum, enschedule_interval- eendag, dan begin DAG môre nie vroeër nie.start_date = datetime(2020, 7, 7, 0, 1, 2)En nie meer probleme nie.
Daar is nog 'n looptydfout wat daarmee verband hou:
Task is missing the start_date parameter, wat meestal aandui dat jy vergeet het om aan die dagoperateur te bind.- Alles op een masjien. Ja, en basisse (Airflow self en ons coating), en 'n webbediener, en 'n skeduleerder, en werkers. En dit het selfs gewerk. Maar met verloop van tyd het die aantal take vir dienste gegroei, en toe PostgreSQL in 20 s in plaas van 5 ms op die indeks begin reageer het, het ons dit geneem en weggedra.
- LocalExecutor. Ja, ons sit nog steeds daarop, en ons het reeds op die rand van die afgrond gekom. LocalExecutor was tot dusver genoeg vir ons, maar nou is dit tyd om met ten minste een werker uit te brei, en ons sal hard moet werk om na CeleryExecutor te skuif. En in die lig van die feit dat jy daarmee op een masjien kan werk, keer niks jou om Selery selfs op 'n bediener te gebruik nie, wat "natuurlik nooit in produksie sal gaan nie, eerlikwaar!"
- Nie-gebruik ingeboude gereedskap:
- Connections om diensbewyse te stoor,
- SLA Mejuffroue om te reageer op take wat nie betyds uitgewerk het nie,
- xcom vir metadata-uitruiling (ek het gesê metadata!) tussen dagtake.
- Pos misbruik. Wel, wat kan ek sê? Waarskuwings is opgestel vir alle herhalings van gevalle take. Nou het my werk Gmail meer as 90 100 e-posse vanaf Airflow, en die webpos snuit weier om meer as XNUMX op 'n slag op te tel en uit te vee.
Nog slaggate:
Meer outomatiseringsinstrumente
Sodat ons selfs meer met ons koppe kan werk en nie met ons hande nie, het Airflow dit vir ons voorberei:
- - hy het steeds die status van Eksperimenteel, wat hom nie verhinder om te werk nie. Daarmee kan jy nie net inligting oor dag en take kry nie, maar ook 'n dag stop/begin, 'n DAG Run of 'n swembad skep.
- - Baie gereedskap is beskikbaar deur die opdragreël wat nie net ongerieflik is om deur die WebUI te gebruik nie, maar oor die algemeen afwesig is. Byvoorbeeld:
backfillnodig om taakgevalle te herbegin.
Ontleders het byvoorbeeld kom sê: “En jy, kameraad, het nonsens in die data van 1 tot 13 Januarie! Maak dit reg, maak dit reg, maak dit reg, maak dit reg!" En jy is so 'n kookplaat:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Basisdiens:
initdb,resetdb,upgradedb,checkdb. run, wat jou toelaat om een instansie taak uit te voer, en selfs op alle afhanklikhede te score. Boonop kan u dit viaLocalExecutor, selfs al het jy 'n Seldery-kluster.- Doen omtrent dieselfde ding
test, net ook in basisse skryf niks. connectionslaat massaskepping van verbindings vanaf die dop toe.
- - 'n taamlik harde manier van interaksie, wat bedoel is vir plugins, en nie met klein handjies daarin swerm nie. Maar wie gaan ons keer om na te gaan
/home/airflow/dags, hardloopipythonen begin rondmors? U kan byvoorbeeld alle verbindings met die volgende kode uitvoer:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Koppel aan die Airflow-metadatabasis. Ek beveel nie aan om daaraan te skryf nie, maar om taakstate vir verskeie spesifieke maatstawwe te kry, kan baie vinniger en makliker wees as om enige van die API's te gebruik.
Kom ons sê dat nie al ons take idempotent is nie, maar hulle kan soms val, en dit is normaal. Maar 'n paar blokkasies is reeds verdag, en dit sal nodig wees om na te gaan.
Pasop SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
verwysings
En natuurlik is die eerste tien skakels vanaf die uitreiking van Google die inhoud van die Airflow-lêergids van my boekmerke.
- - natuurlik moet ons by die kantoor begin. dokumentasie, maar wie lees die instruksies?
- - Wel, lees ten minste die aanbevelings van die skeppers.
- - die heel begin: die gebruikerskoppelvlak in foto's
- - die basiese konsepte is goed beskryf, as jy (skielik!) iets van my nie verstaan het nie.
- - 'n kort gids vir die opstel van 'n lugvloei-kluster.
- - amper dieselfde interessante artikel, behalwe miskien meer formalisme, en minder voorbeelde.
- - oor om saam met Seldery te werk.
- - oor die idempotensie van take, laai volgens ID in plaas van datum, transformasie, lêerstruktuur en ander interessante dinge.
- - afhanklikhede van take en Snellerreël, wat ek net terloops genoem het.
- - hoe om 'n paar "werke soos bedoel" in die skeduleerder te oorkom, verlore data te laai en take te prioritiseer.
- - nuttige SQL-navrae na Airflow-metadata.
- - daar is 'n nuttige afdeling oor die skep van 'n pasgemaakte sensor.
- — 'n interessante kort nota oor die bou van 'n infrastruktuur op AWS vir Data Science.
- - algemene foute (wanneer iemand steeds nie die instruksies lees nie).
- - glimlag hoe mense kruks stoor wagwoorde, alhoewel jy net verbindings kan gebruik.
- - implisiete DAG-aanstuur, konteks gooi funksies in, weer oor afhanklikhede, en ook oor die oorslaan van taakbekendstellings.
- - oor die gebruik
default argumentsиparamsin sjablone, sowel as veranderlikes en verbindings. - - 'n storie oor hoe die beplanner vir Airflow 2.0 voorberei.
- - 'n effens verouderde artikel oor die implementering van ons groepering in
docker-compose. - - dinamiese take met behulp van sjablone en konteksaanstuur.
- - standaard en pasgemaakte kennisgewings per pos en Slack.
- - Vertakkingstake, makro's en XCom.
En die skakels wat in die artikel gebruik word:
- - plekhouers beskikbaar vir gebruik in sjablone.
- — Algemene foute by die skep van dagga.
- -
docker-composevir eksperimentering, ontfouting en meer. - - Python-omhulsel vir Telegram REST API.
Bron: will.com




