Hej, jeg hedder Dmitry Logvinenko - dataingeniør i analyseafdelingen i Vezet-gruppen af virksomheder.
Jeg vil fortælle dig om et vidunderligt værktøj til udvikling af ETL-processer - Apache Airflow. Men Airflow er så alsidigt og mangefacetteret, at du bør se nærmere på det, selvom du ikke er involveret i datastrømme, men har et behov for periodisk at starte eventuelle processer og overvåge deres eksekvering.
Og ja, jeg vil ikke kun fortælle, men også vise: programmet har en masse kode, skærmbilleder og anbefalinger.
Hvad du normalt ser, når du googler ordet Airflow / Wikimedia Commons
- kun bedre, og den blev lavet til helt andre formål, nemlig (som det er skrevet før kataen):
køre og overvåge opgaver på et ubegrænset antal maskiner (som mange selleri / Kubernetes og din samvittighed vil tillade dig)
med dynamisk workflowgenerering fra meget let at skrive og forstå Python-kode
og muligheden for at forbinde alle databaser og API'er med hinanden ved hjælp af både færdige komponenter og hjemmelavede plugins (hvilket er ekstremt enkelt).
Vi bruger Apache Airflow sådan her:
vi indsamler data fra forskellige kilder (mange SQL Server- og PostgreSQL-instanser, forskellige API'er med applikationsmetrikker, endda 1C) i DWH og ODS (vi har Vertica og Clickhouse).
hvor avanceret cron, som starter datakonsolideringsprocesserne på ODS'en og også overvåger deres vedligeholdelse.
Indtil for nylig blev vores behov dækket af én lille server med 32 kerner og 50 GB RAM. I Airflow virker dette:
mere 200 dags (faktisk arbejdsgange, hvor vi fyldte opgaver),
i hver i gennemsnit 70 opgaver,
denne godhed starter (også i gennemsnit) en gang i timen.
Og om hvordan vi udvidede, vil jeg skrive nedenfor, men lad os nu definere über-problemet, som vi vil løse:
Der er tre originale SQL-servere, hver med 50 databaser - forekomster af henholdsvis et projekt, de har den samme struktur (næsten overalt, mua-ha-ha), hvilket betyder, at hver har en ordretabel (heldigvis en tabel med det navn kan skubbes ind i enhver virksomhed). Vi tager dataene ved at tilføje servicefelter (kildeserver, kildedatabase, ETL opgave-id) og smider dem naivt ind i f.eks. Vertica.
Lad os gå!
Hoveddelen, praktisk (og lidt teoretisk)
Hvorfor gør vi (og du)
Da træerne var store, og jeg var simpel SQL-schik i en russisk detailhandel snydte vi ETL-processer aka datastrømme ved hjælp af to tilgængelige værktøjer:
Informatica Power Center - et ekstremt spredningssystem, ekstremt produktivt, med sin egen hardware, sin egen versionering. Jeg brugte Gud forbyde 1% af dens muligheder. Hvorfor? Nå, først og fremmest satte denne grænseflade, et sted fra 380'erne, mentalt pres på os. For det andet er denne anordning designet til ekstremt smarte processer, rasende genbrug af komponenter og andre meget vigtige virksomhedstricks. Om hvad det koster, ligesom vingen på Airbus AXNUMX / år, vil vi ikke sige noget.
Pas på, et skærmbillede kan skade folk under 30 lidt
SQL Server Integration Server - vi brugte denne kammerat i vores interne projektstrømme. Nå, faktisk: vi bruger allerede SQL Server, og det ville på en eller anden måde være urimeligt ikke at bruge dets ETL-værktøjer. Alt i det er godt: både grænsefladen er smuk, og statusrapporterne ... Men det er ikke derfor, vi elsker softwareprodukter, åh, ikke for dette. Version det dtsx (som er XML med noder blandet på gem) kan vi, men hvad er meningen? Hvad med at lave en opgavepakke, der vil trække hundredvis af tabeller fra en server til en anden? Ja, sikke et hundrede, din pegefinger vil falde af fra tyve stykker ved at klikke på museknappen. Men det ser bestemt mere moderigtigt ud:
Vi ledte bestemt efter udveje. Sag endda næsten kom til en selvskrevet SSIS-pakkegenerator ...
…og så fandt et nyt job mig. Og Apache Airflow overhalede mig på det.
Da jeg fandt ud af, at ETL-procesbeskrivelser er simpel Python-kode, dansede jeg bare ikke af glæde. Sådan blev datastrømme versioneret og adskilt, og at hælde tabeller med en enkelt struktur fra hundredvis af databaser ind i ét mål blev et spørgsmål om Python-kode på halvanden eller to 13 ”skærme.
Samling af klyngen
Lad os ikke arrangere en helt børnehave, og ikke tale om helt indlysende ting her, som at installere Airflow, din valgte database, Selleri og andre sager beskrevet i dokken.
Så vi straks kan begynde eksperimenter, skitserede jeg docker-compose.yml hvori:
Lad os faktisk hæve Luftmængde: Planlægger, Webserver. Blomst vil også snurre der for at overvåge selleri-opgaver (fordi den allerede er blevet skubbet ind apache/airflow:1.10.10-python3.7, men vi gider ikke)
PostgreSQL, hvor Airflow vil skrive sine serviceoplysninger (planlægningsdata, udførelsesstatistik osv.), og Celery vil markere afsluttede opgaver;
Omfor, som vil fungere som opgavemægler for Selleri;
Selleriarbejder, som vil være engageret i den direkte udførelse af opgaver.
Til mappe ./dags vi tilføjer vores filer med beskrivelsen af dags. De bliver samlet op i farten, så der er ingen grund til at jonglere med hele stakken efter hvert nys.
Nogle steder er koden i eksemplerne ikke helt vist (for ikke at rode i teksten), men et eller andet sted bliver den modificeret i processen. Fuldstændige eksempler på arbejdskode kan findes i depotet https://github.com/dm-logv/airflow-tutorial.
I sammensætningen af kompositionen stolede jeg i høj grad på det velkendte billede pukkel/docker-luftstrøm - husk at tjekke det ud. Måske har du ikke brug for andet i dit liv.
Alle Airflow-indstillinger er tilgængelige ikke kun via airflow.cfg, men også gennem miljøvariabler (tak til udviklerne), som jeg ondsindet benyttede mig af.
Naturligvis er den ikke produktionsklar: Jeg lagde bevidst ikke hjerteslag på containere, jeg bøvlede ikke med sikkerheden. Men jeg gjorde det minimum, der var passende for vores forsøgsledere.
Noter det:
Dag-mappen skal være tilgængelig for både planlæggeren og arbejderne.
Det samme gælder for alle tredjepartsbiblioteker - de skal alle være installeret på maskiner med skemalægger og arbejdere.
Nå, nu er det enkelt:
$ docker-compose up --scale worker=3
Når alt er kommet op, kan du se på webgrænsefladerne:
Hvis du ikke forstod noget i alle disse "dage", så er her en kort ordbog:
Scheduler - den vigtigste onkel i Airflow, der kontrollerer, at robotter arbejder hårdt, og ikke en person: overvåger tidsplanen, opdaterer dag, starter opgaver.
Generelt havde han i ældre versioner problemer med hukommelsen (nej, ikke hukommelsestab, men lækager), og den gamle parameter forblev endda i konfigurationerne run_duration — dens genstartsinterval. Men nu er alt godt.
DAG (aka "dag") - "dirigeret acyklisk graf", men en sådan definition vil fortælle få mennesker, men faktisk er det en beholder til opgaver, der interagerer med hinanden (se nedenfor) eller en analog af Package i SSIS og Workflow i Informatica .
Ud over dags kan der stadig være underdage, men dem når vi højst sandsynligt ikke.
DAG Løb - initialiseret dag, som tildeles sin egen execution_date. Dagrans af samme dag kan arbejde parallelt (hvis du har gjort dine opgaver idempotente, selvfølgelig).
Operatør er stykker kode, der er ansvarlige for at udføre en bestemt handling. Der er tre typer operatører:
Actionsom vores favorit PythonOperator, som kan udføre enhver (gyldig) Python-kode;
overførsel, som transporterer data fra sted til sted, f.eks. MsSqlToHiveTransfer;
sensor på den anden side vil det give dig mulighed for at reagere eller bremse den videre udførelse af dagen, indtil en begivenhed indtræffer. HttpSensor kan trække det angivne slutpunkt, og når det ønskede svar venter, startes overførslen GoogleCloudStorageToS3Operator. Et nysgerrigt sind vil spørge: "hvorfor? Du kan trods alt lave gentagelser direkte i operatøren!” Og så, for ikke at tilstoppe puljen af opgaver med suspenderede operatører. Sensoren starter, kontrollerer og dør inden næste forsøg.
Opgaver - erklærede operatører, uanset type, og knyttet til dagen forfremmes til rang af opgave.
opgave instans - da den generelle planlægger besluttede, at det var tid til at sende opgaver i kamp på performer-arbejdere (lige på stedet, hvis vi bruger LocalExecutor eller til en fjernknude i tilfælde af CeleryExecutor), den tildeler en kontekst til dem (dvs. et sæt variabler - udførelsesparametre), udvider kommando- eller forespørgselsskabeloner og samler dem.
Vi genererer opgaver
Lad os først skitsere det generelle skema for vores doug, og så vil vi dykke ned i detaljerne mere og mere, fordi vi anvender nogle ikke-trivielle løsninger.
Så i sin enkleste form vil sådan en dag se sådan ud:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Lad os finde ud af det:
Først importerer vi de nødvendige libs og noget andet;
sql_server_ds - Er List[namedtuple[str, str]] med navnene på forbindelserne fra Airflow Connections og de databaser, som vi tager vores plade fra;
dag - annonceringen af vores dag, som nødvendigvis skal være inde globals(), ellers finder Airflow det ikke. Doug skal også sige:
hvad er hans navn orders - dette navn vises derefter i webgrænsefladen,
at han skal arbejde fra midnat den ottende juli,
og den skal køre cirka hver 6. time (for hårde fyre her i stedet for timedelta() tilladt cron-linje 0 0 0/6 ? * * *, for de mindre seje - et udtryk som @daily);
workflow() vil gøre hovedopgaven, men ikke nu. Indtil videre dumper vi blot vores kontekst i loggen.
Og nu den simple magi ved at skabe opgaver:
vi løber gennem vores kilder;
initialisere PythonOperator, som vil henrette vores dummy workflow(). Glem ikke at angive et unikt (inden for dag) navn på opgaven og binde selve dagen. Flag provide_context til gengæld vil hælde yderligere argumenter ind i funktionen, som vi omhyggeligt vil indsamle ved hjælp af **context.
For nu er det alt. Hvad vi fik:
ny dag i webgrænsefladen,
halvandet hundrede opgaver, der vil blive udført parallelt (hvis Airflow, Selleri-indstillinger og serverkapacitet tillader det).
Nå, jeg fik det næsten.
Hvem vil installere afhængighederne?
For at forenkle det hele skruede jeg ind docker-compose.yml forarbejdning requirements.txt på alle noder.
Nu er det væk:
Grå firkanter er opgaveforekomster, der behandles af planlæggeren.
Vi venter lidt, opgaverne er snappet op af arbejderne:
De grønne har selvfølgelig fuldført deres arbejde. Røde er ikke særlig succesfulde.
Der er i øvrigt ingen folder på vores prod ./dags, der er ingen synkronisering mellem maskiner - alle dage ligger i git på vores Gitlab, og Gitlab CI distribuerer opdateringer til maskiner, når de flettes ind master.
Lidt om Blomst
Mens arbejderne tæsker vores sutter, lad os huske et andet værktøj, der kan vise os noget - Blomst.
Den allerførste side med oversigtsoplysninger om arbejderknudepunkter:
Den mest intense side med opgaver, der gik på arbejde:
Den mest kedelige side med status for vores mægler:
Den lyseste side er med opgavestatusgrafer og deres udførelsestid:
Vi læsser de underbelastede
Så alle opgaverne er løst, du kan bære de sårede væk.
Og der var mange sårede – af den ene eller anden grund. I tilfælde af korrekt brug af Airflow indikerer netop disse firkanter, at dataene bestemt ikke kom frem.
Du skal se loggen og genstarte de faldne opgaveforekomster.
Ved at klikke på en hvilken som helst firkant, vil vi se de handlinger, der er tilgængelige for os:
Du kan tage og gøre Clear the fallen. Det vil sige, vi glemmer, at noget er fejlet der, og den samme instansopgave vil gå til skemalæggeren.
Det er klart, at det ikke er særlig humant at gøre dette med musen med alle de røde firkanter – det er ikke det, vi forventer af Airflow. Naturligvis har vi masseødelæggelsesvåben: Browse/Task Instances
Lad os vælge alt på én gang og nulstille, klik på det korrekte element:
Efter rengøring ser vores taxaer sådan ud (de venter allerede på, at planlæggeren planlægger dem):
Forbindelser, kroge og andre variabler
Det er tid til at se på den næste DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Har alle nogensinde lavet en rapportopdatering? Dette er hende igen: der er en liste over kilder, hvorfra man kan hente data; der er en liste, hvor man skal sætte; glem ikke at tude, når alt skete eller gik i stykker (godt, det handler ikke om os, nej).
Lad os gennemgå filen igen og se på de nye obskure ting:
from commons.operators import TelegramBotSendMessage - intet forhindrer os i at lave vores egne operatører, hvilket vi udnyttede ved at lave en lille indpakning til at sende beskeder til Unblocked. (Vi vil tale mere om denne operatør nedenfor);
default_args={} - dag kan distribuere de samme argumenter til alle sine operatører;
to='{{ var.value.all_the_kings_men }}' - Mark to vi vil ikke have hårdkodet, men dynamisk genereret ved hjælp af Jinja og en variabel med en liste over e-mails, som jeg omhyggeligt satte ind Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — betingelse for at starte operatøren. I vores tilfælde vil brevet kun flyve til cheferne, hvis alle afhængigheder har fungeret succes;
tg_bot_conn_id='tg_main' - argumenter conn_id acceptere forbindelses-id'er, som vi opretter i Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - beskeder i Telegram vil kun flyve væk, hvis der er faldne opgaver;
task_concurrency=1 - vi forbyder samtidig lancering af flere opgaveforekomster af én opgave. Ellers får vi den samtidige lancering af flere VerticaOperator (ser på et bord);
report_update >> [email, tg] - Alle VerticaOperator konvergere i at sende breve og meddelelser, som dette:
Men da notifier-operatører har forskellige lanceringsbetingelser, vil kun én fungere. I trævisningen ser alt lidt mindre visuelt ud:
Jeg vil sige et par ord om makroer og deres venner - variabler.
Makroer er Jinja-pladsholdere, der kan erstatte forskellige nyttige oplysninger med operatørargumenter. For eksempel sådan her:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} udvides til indholdet af kontekstvariablen execution_date i formatet YYYY-MM-DD: 2020-07-14. Det bedste er, at kontekstvariabler er naglet til en specifik opgaveforekomst (en firkant i trævisningen), og når de genstartes, udvides pladsholderne til de samme værdier.
De tildelte værdier kan ses ved at bruge knappen Gengivet på hver opgaveforekomst. Sådan er opgaven med at sende et brev:
Og så ved opgaven med at sende en besked:
En komplet liste over indbyggede makroer til den seneste tilgængelige version er tilgængelig her: makroreference
Desuden kan vi ved hjælp af plugins erklære vores egne makroer, men det er en anden historie.
Ud over de foruddefinerede ting kan vi erstatte værdierne af vores variabler (jeg brugte allerede dette i koden ovenfor). Lad os skabe ind Admin/Variables et par ting:
brug bare stien til den ønskede nøgle: {{ var.json.bot_config.bot.token }}.
Jeg vil bogstaveligt talt sige et ord og vise et skærmbillede om tilslutning. Alt er elementært her: på siden Admin/Connections vi opretter en forbindelse, tilføjer vores logins/adgangskoder og mere specifikke parametre der. Sådan her:
Adgangskoder kan krypteres (mere grundigt end standard), eller du kan udelade forbindelsestypen (som jeg gjorde for tg_main) - faktum er, at listen over typer er fastkablet i Airflow-modeller og kan ikke udvides uden at komme ind i kildekoderne (hvis jeg pludselig ikke googlede noget, så ret mig venligst), men intet vil forhindre os i at få kreditter bare ved at navn.
Du kan også lave flere forbindelser med samme navn: i dette tilfælde metoden BaseHook.get_connection(), som får os forbindelser ved navn, vil give tilfældig fra flere navnebrødre (det ville være mere logisk at lave Round Robin, men lad os lade det være på Airflow-udviklernes samvittighed).
Variables og Connections er bestemt fede værktøjer, men det er vigtigt ikke at miste balancen: hvilke dele af dine flows du gemmer i selve koden, og hvilke dele du giver til Airflow til opbevaring. På den ene side kan det være praktisk hurtigt at ændre værdien, for eksempel en postboks, gennem brugergrænsefladen. Til gengæld er dette stadig en tilbagevenden til det museklik, som vi (jeg) gerne ville af med.
At arbejde med forbindelser er en af opgaverne kroge. Generelt er Airflow-kroge punkter til at forbinde den til tredjepartstjenester og biblioteker. F.eks, JiraHook vil åbne en klient, så vi kan interagere med Jira (du kan flytte opgaver frem og tilbage), og ved hjælp af SambaHook du kan skubbe en lokal fil til smb-punkt.
Parser den tilpassede operator
Og vi kom tæt på at se på, hvordan det er lavet TelegramBotSendMessage
Kode commons/operators.py med den faktiske operatør:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Her, som alt andet i Airflow, er alt meget enkelt:
Nedarvet fra BaseOperator, som implementerer en del Airflow-specifikke ting (se på din fritid)
Deklarerede felter template_fields, hvor Jinja vil lede efter makroer at behandle.
Arrangeret de rigtige argumenter for __init__(), indstil standardindstillingerne, hvor det er nødvendigt.
Vi glemte heller ikke initialiseringen af forfaderen.
Åbnede den tilsvarende krog TelegramBotHookmodtaget et klientobjekt fra det.
Tilsidesat (omdefineret) metode BaseOperator.execute(), som Airfow vil rykke, når tiden kommer til at starte operatøren - i den implementerer vi hovedhandlingen og glemmer at logge ind. (Vi logger forresten ind lige ind stdout и stderr - Luftstrømmen vil opsnappe alt, pakke det smukt ind, nedbryde det, hvor det er nødvendigt.)
Lad os se, hvad vi har commons/hooks.py. Den første del af filen, med selve krogen:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Jeg ved ikke engang, hvad jeg skal forklare her, jeg vil bare bemærke de vigtige punkter:
Vi arver, tænk på argumenterne - i de fleste tilfælde vil det være ét: conn_id;
Tilsidesættende standardmetoder: Jeg begrænsede mig selv get_conn(), hvor jeg får forbindelsesparametrene ved navn og får bare afsnittet extra (dette er et JSON-felt), hvori jeg (ifølge mine egne instruktioner!) sætter Telegram-bot-tokenet: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Jeg opretter en instans af vores TelegramBot, hvilket giver det et specifikt token.
Det er alt. Du kan få en klient fra en krog ved hjælp af TelegramBotHook().clent eller TelegramBotHook().get_conn().
Og den anden del af filen, hvor jeg laver en microwrapper til Telegram REST API, for ikke at trække det samme python-telegram-bot for én metode sendMessage.
Den korrekte måde er at lægge det hele sammen: TelegramBotSendMessage, TelegramBotHook, TelegramBot - i pluginnet, læg et offentligt lager og giv det til Open Source.
Mens vi studerede alt dette, lykkedes det for vores rapportopdateringer at mislykkes og sendte mig en fejlmeddelelse i kanalen. Jeg vil tjekke om det er forkert...
Noget gik i stykker i vores doge! Var det ikke det, vi havde forventet? Nemlig!
Skal du hælde?
Føler du, at jeg gik glip af noget? Det ser ud til, at han lovede at overføre data fra SQL Server til Vertica, og så tog han det og flyttede fra emnet, slyngelen!
Denne grusomhed var bevidst, jeg var simpelthen nødt til at tyde noget terminologi for dig. Nu kan du gå videre.
Vores plan var denne:
Gør dag
Generer opgaver
Se hvor smukt alt er
Tildel sessionsnumre til fyld
Hent data fra SQL Server
Læg data ind i Vertica
Saml statistik
Så for at få det hele op at køre, lavede jeg en lille tilføjelse til vores docker-compose.yml:
Vertica som vært dwh med de fleste standardindstillinger,
tre forekomster af SQL Server,
vi udfylder databaserne i sidstnævnte med nogle data (under ingen omstændigheder skal du ikke se nærmere på mssql_init.py!)
Vi starter alt det gode ved hjælp af en lidt mere kompliceret kommando end sidste gang:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Hvad vores mirakel randomizer genererede, kan du bruge varen Data Profiling/Ad Hoc Query:
Det vigtigste er ikke at vise det til analytikere
uddybe ETL sessioner Det vil jeg ikke, alt er trivielt der: vi laver en base, der er et skilt i den, vi pakker alt ind med en kontekstmanager, og nu gør vi dette:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Tiden er inde indsamle vores data fra vores halvandet hundrede borde. Lad os gøre dette ved hjælp af meget uhøjtidelige linjer:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Ved hjælp af en krog får vi fra Airflow pymssql-Opret forbindelse
Lad os erstatte en begrænsning i form af en dato i anmodningen - den vil blive kastet ind i funktionen af skabelonmotoren.
Foder vores anmodning pandashvem får os DataFrame - det vil være nyttigt for os i fremtiden.
Jeg bruger substitution {dt} i stedet for en anmodningsparameter %s ikke fordi jeg er en ond Pinocchio, men fordi pandas ikke kan klare pymssql og smutter det sidste params: Listselvom han virkelig gerne vil tuple.
Bemærk også, at udvikleren pymssql besluttede ikke at støtte ham mere, og det er tid til at flytte ud pyodbc.
Lad os se, hvad Airflow fyldte argumenterne for vores funktioner med:
Hvis der ikke er nogen data, så nytter det ikke noget at fortsætte. Men det er også mærkeligt at betragte fyldet som vellykket. Men dette er ikke en fejl. A-ah-ah, hvad skal man gøre?! Og her er hvad:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException vil fortælle Airflow, at der ikke er nogen fejl, men vi springer opgaven over. Grænsefladen vil ikke have en grøn eller rød firkant, men lyserød.
På udsalget opretter vi målpladen manuelt. Her tillod jeg mig selv en lille maskine:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
jeg bruger VerticaOperator() Jeg opretter et databaseskema og en tabel (hvis de ikke allerede eksisterer, selvfølgelig). Det vigtigste er at arrangere afhængighederne korrekt:
- Nå, - sagde den lille mus, - er det nu ikke
Er du overbevist om, at jeg er det mest forfærdelige dyr i skoven?
Julia Donaldson, The Gruffalo
Jeg tror, hvis mine kolleger og jeg havde en konkurrence: hvem vil hurtigt skabe og lancere en ETL-proces fra bunden: de med deres SSIS og en mus og mig med Airflow ... Og så ville vi også sammenligne den nemme vedligeholdelse ... Wow, jeg tror, du vil være enig i, at jeg vil slå dem på alle fronter!
Hvis det er lidt mere seriøst, så gjorde Apache Airflow - ved at beskrive processer i form af programkode - mit arbejde mere mere behageligt og behageligt.
Dens ubegrænsede udvidelsesmuligheder, både med hensyn til plug-ins og tilbøjelighed til skalerbarhed, giver dig mulighed for at bruge Airflow i næsten ethvert område: selv i den fulde cyklus med indsamling, forberedelse og behandling af data, selv ved opsendelse af raketter (til Mars, eller Rute).
Delfinale, reference og information
Raken har vi samlet til dig
start_date. Ja, dette er allerede et lokalt meme. Via Dougs hovedargument start_date alle bestå. Kort sagt, hvis du angiver i start_date nuværende dato, og schedule_interval - en dag, så starter DAG tidligst i morgen.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Og ikke flere problemer.
Der er en anden runtime fejl forbundet med det: Task is missing the start_date parameter, hvilket oftest indikerer, at du har glemt at binde til dag-operatøren.
Alt sammen på én maskine. Ja, og baser (selve luftstrømmen og vores belægning), og en webserver og en planlægger og arbejdere. Og det virkede endda. Men med tiden voksede antallet af opgaver til tjenester, og da PostgreSQL begyndte at reagere på indekset på 20 s i stedet for 5 ms, tog vi det og bar det væk.
LocalExecutor. Ja, vi sidder stadig på den, og vi er allerede nået til kanten af afgrunden. LocalExecutor har været nok for os indtil videre, men nu er det tid til at udvide med mindst én arbejder, og vi bliver nødt til at arbejde hårdt for at flytte til CeleryExecutor. Og i lyset af det faktum, at du kan arbejde med det på én maskine, forhindrer intet dig i at bruge Selleri, selv på en server, som "naturligvis aldrig vil gå i produktion, ærligt talt!"
Ikke-brug indbyggede værktøjer:
Tilslutninger at gemme serviceoplysninger,
SLA frøkener at reagere på opgaver, der ikke lykkedes til tiden,
XCom til metadataudveksling (sagde jeg metadata!) mellem dag opgaver.
Misbrug af mail. Hvad kan jeg sige? Der blev oprettet alarmer for alle gentagelser af faldne opgaver. Nu har mit arbejde Gmail >90 e-mails fra Airflow, og webmail-mundkurven nægter at hente og slette mere end 100 ad gangen.
For at vi kan arbejde endnu mere med hovedet og ikke med hænderne, har Airflow forberedt os dette:
REST-API - han har stadig status som Eksperimentel, hvilket ikke forhindrer ham i at arbejde. Med den kan du ikke kun få information om dags og opgaver, men også stoppe/starte en dag, oprette et DAG Run eller en pulje.
CLI - Mange værktøjer er tilgængelige via kommandolinjen, som ikke bare er ubelejlige at bruge gennem WebUI, men som generelt er fraværende. For eksempel:
backfill nødvendig for at genstarte opgaveforekomster.
For eksempel kom analytikere og sagde: ”Og du, kammerat, har noget sludder i dataene fra 1. til 13. januar! Reparer det, fix det, fix det, fix det!" Og du er sådan en kogeplade:
run, som giver dig mulighed for at køre én instansopgave og endda score på alle afhængigheder. Desuden kan du køre det via LocalExecutor, også selvom du har en Selleriklynge.
Gør stort set det samme test, kun også i baser skriver intet.
connections tillader masseoprettelse af forbindelser fra skallen.
python api - en ret hardcore måde at interagere på, som er beregnet til plugins, og ikke myldrer i det med små hænder. Men hvem skal forhindre os i at gå til /home/airflow/dags, løb ipython og begynde at rode rundt? Du kan f.eks. eksportere alle forbindelser med følgende kode:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Tilslutning til Airflow-metadatabasen. Jeg anbefaler ikke at skrive til det, men at få opgavetilstande for forskellige specifikke metrics kan være meget hurtigere og nemmere end at bruge nogen af API'erne.
Lad os sige, at ikke alle vores opgaver er idempotente, men de kan nogle gange falde, og det er normalt. Men et par blokeringer er allerede mistænkelige, og det ville være nødvendigt at tjekke.
Pas på SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
RЎSЃS <P "RєRё
Og selvfølgelig er de første ti links fra udstedelsen af Google indholdet af Airflow-mappen fra mine bogmærker.
Apache Airflow Dokumentation - selvfølgelig skal vi starte med kontoret. dokumentation, men hvem læser instruktionerne?
Best Practices - Nå, læs i det mindste anbefalingerne fra skaberne.
Airflow UI - selve begyndelsen: brugergrænsefladen i billeder
Zen af Python og Apache Airflow - implicit DAG-videresendelse, kontekstindsættelse af funktioner, igen om afhængigheder, og også om at springe opgavelanceringer over.