Apache Airflow: olajšanje ETL

Živijo, sem Dmitry Logvinenko - podatkovni inženir oddelka za analitiko skupine podjetij Vezet.

Povedal vam bom o čudovitem orodju za razvoj ETL procesov - Apache Airflow. Toda Airflow je tako vsestranski in večplasten, da si ga morate podrobneje ogledati, tudi če niste vključeni v podatkovne tokove, vendar morate občasno zagnati kakršne koli procese in spremljati njihovo izvajanje.

In ja, ne bom samo povedal, ampak tudi pokazal: program ima veliko kode, posnetkov zaslona in priporočil.

Apache Airflow: olajšanje ETL
Kaj običajno vidite, ko v Googlu poiščete besedo Airflow / Wikimedia Commons

Kazalo

Predstavitev

Apache Airflow je tako kot Django:

  • napisano v pythonu
  • obstaja odlična skrbniška plošča,
  • razširljiv za nedoločen čas

- samo boljša in je bila narejena za čisto druge namene in sicer (kot piše pred kat):

  • izvajanje in spremljanje opravil na neomejenem številu strojev (kolikor vam Celery / Kubernetes in vaša vest dopuščata)
  • z dinamičnim generiranjem poteka dela iz Python kode, ki jo je zelo enostavno napisati in razumeti
  • in zmožnost medsebojnega povezovanja poljubnih podatkovnih baz in API-jev z uporabo že pripravljenih komponent in doma narejenih vtičnikov (kar je izjemno preprosto).

Apache Airflow uporabljamo takole:

  • podatke zbiramo iz različnih virov (veliko SQL Server in PostgreSQL instanc, razni API-ji z ​​metriko aplikacij, tudi 1C) v DWH in ODS (imamo Vertico in Clickhouse).
  • kako napreden cron, ki zažene procese konsolidacije podatkov na ODS in spremlja njihovo vzdrževanje.

Do nedavnega je naše potrebe pokrival en majhen strežnik z 32 jedri in 50 GB RAM-a. V Airflowu to deluje:

  • bolj 200 dag (pravzaprav delovni tokovi, v katere smo stlačili naloge),
  • v vsakem povprečno 70 nalog,
  • ta dobrota se začne (tudi v povprečju) enkrat na uro.

In o tem, kako smo se širili, bom pisal spodaj, zdaj pa definirajmo über-problem, ki ga bomo rešili:

Obstajajo trije izvirni strežniki SQL, vsak s 50 bazami podatkov – primerki enega projekta, imajo enako strukturo (skoraj povsod, mua-ha-ha), kar pomeni, da ima vsak tabelo Naročila (na srečo tabelo s tem ime se lahko potisne v katero koli podjetje). Podatke vzamemo tako, da dodamo servisna polja (source server, source database, ETL task ID) in jih naivno vržemo v, recimo, Vertico.

Gremo!

Glavni del, praktični (in malo teoretični)

Zakaj mi (in ti)

Ko so bila drevesa velika in sem bil preprost SQL-schik v eni ruski trgovini na drobno smo prevarali procese ETL, imenovane podatkovne tokove, z uporabo dveh orodij, ki sta nam na voljo:

  • Informatica Power Center - izjemno razpršen sistem, izjemno produktiven, z lastno strojno opremo, lastno različico. Izkoristil sem bog ne daj 1% njegovih zmogljivosti. Zakaj? No, najprej nas je ta vmesnik, nekje iz leta 380, mentalno pritiskal. Drugič, ta naprava je zasnovana za izjemno modne procese, besno ponovno uporabo komponent in druge zelo pomembne podjetniške trike. O tem, koliko stane, kot krilo Airbus AXNUMX / leto, ne bomo povedali ničesar.

    Pozor, slika zaslona lahko malce prizadene osebe, mlajše od 30 let

    Apache Airflow: olajšanje ETL

  • Integracijski strežnik SQL Server - tega tovariša smo uporabili v naših tokovih znotraj projekta. No, pravzaprav: SQL Server že uporabljamo in bilo bi nekako nesmiselno, da ne bi uporabljali njegovih ETL orodij. Vse v njem je dobro: vmesnik je lep in poročila o napredku ... Ampak to ni razlog, zakaj imamo radi programske izdelke, oh, ne zaradi tega. Različica dtsx (kar je XML z vozlišči, premešanimi ob shranjevanju) lahko, ampak kaj je smisel? Kaj pa izdelava paketa opravil, ki bo povlekel na stotine tabel z enega strežnika na drugega? Ja, kakšnih sto, od dvajsetih kosov ti bo odpadel kazalec, klikni na gumb miške. Ampak vsekakor izgleda bolj modno:

    Apache Airflow: olajšanje ETL

Vsekakor smo iskali izhode. Primer celo skoraj prišel do samonapisanega generatorja paketov SSIS ...

…in potem me je našla nova služba. In Apache Airflow me je na njem prehitel.

Ko sem ugotovil, da so opisi procesov ETL preprosta koda Python, preprosto nisem plesal od veselja. Tako so se podatkovni tokovi spreminjali in razlikovali, zlivanje tabel z eno samo strukturo iz več sto baz podatkov v en cilj pa je postalo stvar kode Python na enem in pol ali dveh 13-palčnih zaslonih.

Sestavljanje grozda

Ne uredimo popolnoma otroškega vrtca in ne govorimo o povsem očitnih stvareh, kot je namestitev Airflowa, vaše izbrane baze podatkov, Celeryja in drugih primerov, opisanih v dokih.

Da lahko takoj začnemo s poskusi, sem skiciral docker-compose.yml v katerem:

  • Dejansko dvignimo Pretok zraka: razporejevalnik, spletni strežnik. Tam se bo vrtel tudi Flower, ki bo spremljal naloge Celeryja (ker je bil že potisnjen v apache/airflow:1.10.10-python3.7, vendar nas ne moti)
  • PostgreSQL, v katerega bo Airflow zapisal svoje storitvene informacije (podatke o razporejevalniku, statistiko izvajanja itd.), Celery pa bo označil opravljene naloge;
  • Redis, ki bo deloval kot posrednik opravil za Celery;
  • Delavec zelene, ki se bo ukvarjal z neposrednim izvajanjem nalog.
  • V mapo ./dags dodali bomo naše datoteke z opisom dagov. Pobrali jih bodo sproti, tako da vam po vsakem kihanju ni treba žonglirati s celotnim kupom.

Ponekod koda v primerih ni v celoti prikazana (da ne bi obremenjevala besedila), ponekod pa je v procesu spremenjena. Celotne primere delovne kode lahko najdete v repozitoriju https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Opombe:

  • Pri sestavljanju kompozicije sem se v veliki meri oprl na znano podobo puckel/docker-airflow - obvezno preverite. Mogoče v življenju ne potrebuješ ničesar drugega.
  • Vse nastavitve pretoka zraka so na voljo ne samo prek airflow.cfg, temveč tudi preko spremenljivk okolja (zahvaljujoč razvijalcem), kar sem zlonamerno izkoristil.
  • Seveda ni pripravljen za proizvodnjo: namenoma nisem dal srčnih utripov na posode, nisem se obremenjeval z varnostjo. Toda naredil sem minimum, ki je primeren za naše eksperimentatorje.
  • Upoštevajte, da:
    • Mapa dag mora biti dostopna tako razporejevalniku kot delavcem.
    • Enako velja za vse knjižnice tretjih oseb - vse morajo biti nameščene na strojih z razporejevalnikom in delavci.

No, zdaj je preprosto:

$ docker-compose up --scale worker=3

Ko se vse dvigne, si lahko ogledate spletne vmesnike:

Osnovni pojmi

Če niste ničesar razumeli v vseh teh "dagih", potem je tukaj kratek slovar:

  • Scheduler - najpomembnejši stric v Airflowu, ki nadzoruje, da roboti trdo delajo, in ne človek: spremlja urnik, posodablja dagove, zaganja naloge.

    Na splošno je imel v starejših različicah težave s pomnilnikom (ne, ne amnezija, ampak puščanje) in parameter legacy je celo ostal v konfiguracijah run_duration — njegov interval ponovnega zagona. Ampak zdaj je vse v redu.

  • DAG (aka "dag") - "usmerjeni aciklični graf", vendar bo taka definicija povedala malo ljudem, vendar je v resnici vsebnik za naloge, ki medsebojno delujejo (glej spodaj) ali analog paketa v SSIS in poteka dela v Informatici .

    Poleg dagov lahko še vedno obstajajo poddagi, vendar do njih najverjetneje ne bomo prišli.

  • DAG Run - inicializiran dag, ki mu je dodeljen lasten execution_date. Dagrani istega daga lahko delujejo vzporedno (če ste svoje naloge naredili idempotentne, seveda).
  • Operater so deli kode, odgovorni za izvedbo določenega dejanja. Obstajajo tri vrste operaterjev:
    • ukrepanjekot naš najljubši PythonOperator, ki lahko izvede katero koli (veljavno) kodo Python;
    • prenos, ki prenašajo podatke iz kraja v kraj, recimo MsSqlToHiveTransfer;
    • senzor po drugi strani pa vam bo omogočilo, da reagirate ali upočasnite nadaljnjo izvedbo daga, dokler ne pride do dogodka. HttpSensor lahko potegne navedeno končno točko in ko čaka želeni odgovor, začne prenos GoogleCloudStorageToS3Operator. Radovedni um se bo vprašal: »Zakaj? Navsezadnje lahko delaš ponovitve kar v operaterju!« In potem, da ne bi zamašili skupine nalog s suspendiranimi operaterji. Senzor se zažene, preveri in umre pred naslednjim poskusom.
  • Naloga - deklarirani operatorji, ne glede na vrsto in pripeti na dag, so povišani v rang naloge.
  • primer naloge - ko se je generalni načrtovalec odločil, da je čas, da pošlje naloge v boj na delavce izvajalce (takoj na kraju samem, če uporabimo LocalExecutor ali v oddaljeno vozlišče v primeru CeleryExecutor), jim dodeli kontekst (tj. nabor spremenljivk - izvedbenih parametrov), razširi predloge ukazov ali poizvedb in jih združi.

Ustvarjamo naloge

Najprej orišemo splošno shemo našega douga, nato pa se bomo vse bolj poglabljali v podrobnosti, ker uporabljamo nekaj netrivialnih rešitev.

Torej, v svoji najpreprostejši obliki bo tak dag izgledal takole:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Naj razumemo:

  • Najprej uvozimo potrebne lib in nekaj drugega;
  • sql_server_ds - Je List[namedtuple[str, str]] z imeni povezav iz Airflow Connections in bazami podatkov, iz katerih bomo vzeli svoj krožnik;
  • dag - napoved našega daga, ki mora biti nujno v globals(), sicer ga Airflow ne bo našel. Doug mora tudi povedati:
    • kako mu je ime orders - to ime se bo nato pojavilo v spletnem vmesniku,
    • da bo osmi julij delal od polnoči,
    • in mora delovati približno vsakih 6 ur (za močne fante tukaj namesto timedelta() dopustno cron-linija 0 0 0/6 ? * * *, za manj kul – izraz kot @daily);
  • workflow() bo opravil glavno delo, vendar ne zdaj. Zaenkrat bomo naš kontekst samo odložili v dnevnik.
  • In zdaj preprosta čarovnija ustvarjanja nalog:
    • tečemo skozi svoje vire;
    • inicializirati PythonOperator, ki bo izvršil našo lutko workflow(). Ne pozabite določiti edinstvenega (znotraj daga) imena opravila in povezati sam dag. Zastava provide_context po drugi strani pa bo v funkcijo dodal dodatne argumente, ki jih bomo skrbno zbrali z uporabo **context.

Za zdaj je to vse. Kaj imamo:

  • nov dag v spletnem vmesniku,
  • sto in pol nalog, ki se bodo izvajale vzporedno (če to dopuščajo nastavitve Airflow, Celery in zmogljivost strežnika).

No, skoraj sem dobil.

Apache Airflow: olajšanje ETL
Kdo bo namestil odvisnosti?

Da bi poenostavil celotno stvar, sem se zajebal docker-compose.yml obravnavati requirements.txt na vseh vozliščih.

Zdaj ga ni več:

Apache Airflow: olajšanje ETL

Sivi kvadratki so primerki opravil, ki jih obdeluje razporejevalnik.

Malo počakamo, naloge pograbijo delavci:

Apache Airflow: olajšanje ETL

Zeleni so seveda uspešno opravili svoje delo. Rdeči niso preveč uspešni.

Mimogrede, na našem produktu ni mape ./dags, ni sinhronizacije med stroji - vsi dagi ležijo noter git na našem Gitlabu, Gitlab CI pa ob združitvi distribuira posodobitve strojem master.

Nekaj ​​malega o Cvetki

Medtem ko delavci mlatijo naše dude, se spomnimo še na eno orodje, ki nam lahko nekaj pokaže – Rožo.

Čisto prva stran s povzetkom informacij o delovnih vozliščih:

Apache Airflow: olajšanje ETL

Najbolj intenzivna stran z nalogami, ki so šle na delo:

Apache Airflow: olajšanje ETL

Najbolj dolgočasna stran s statusom našega posrednika:

Apache Airflow: olajšanje ETL

Najsvetlejša stran je z grafi stanja nalog in časom njihove izvedbe:

Apache Airflow: olajšanje ETL

Naložimo premalo obremenjeno

Torej, vse naloge so uspele, lahko odnesete ranjence.

Apache Airflow: olajšanje ETL

In ranjenih je bilo veliko – iz takšnih ali drugačnih razlogov. V primeru pravilne uporabe Airflow prav ti kvadratki pomenijo, da podatki zagotovo niso prispeli.

Gledati morate dnevnik in znova zagnati padle primerke opravil.

S klikom na poljuben kvadrat bomo videli dejanja, ki so nam na voljo:

Apache Airflow: olajšanje ETL

Lahko vzamete in očistite padle. To pomeni, da pozabimo, da tam nekaj ni uspelo, in ista naloga primerka bo šla v razporejevalnik.

Apache Airflow: olajšanje ETL

Jasno je, da to početje z miško z vsemi rdečimi kvadratki ni zelo humano - tega ne pričakujemo od Airflowa. Seveda imamo orožje za množično uničevanje: Browse/Task Instances

Apache Airflow: olajšanje ETL

Izberimo vse naenkrat in ponastavimo na nič, kliknite pravilen element:

Apache Airflow: olajšanje ETL

Naši taksiji po čiščenju izgledajo tako (že čakajo na urnika, da jih razporedi):

Apache Airflow: olajšanje ETL

Povezave, kljuke in druge spremenljivke

Čas je, da pogledamo naslednji DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Ali je že vsakdo posodobil poročilo? To je spet ona: obstaja seznam virov, od koder je mogoče dobiti podatke; obstaja seznam, kam dati; ne pozabi zatrobiti, ko se je vse zgodilo ali pokvarilo (no, ne gre za nas, ne).

Ponovno preglejmo datoteko in poglejmo nove nejasne stvari:

  • from commons.operators import TelegramBotSendMessage - nič nam ne preprečuje, da bi naredili svoje operaterje, kar smo izkoristili tako, da smo naredili majhen ovoj za pošiljanje sporočil Unblocked. (Več o tem operaterju bomo govorili spodaj);
  • default_args={} - dag lahko distribuira iste argumente vsem svojim operaterjem;
  • to='{{ var.value.all_the_kings_men }}' - polje to ne bomo imeli trdo kodiranih, ampak dinamično generiranih z uporabo Jinje in spremenljivke s seznamom e-poštnih sporočil, ki sem jih skrbno vstavil Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — pogoj za zagon pogona. V našem primeru bo pismo letelo do šefov le, če bodo vse odvisnosti delovale uspešno;
  • tg_bot_conn_id='tg_main' - argumenti conn_id sprejmejo ID-je povezav, v katerih ustvarimo Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - sporočila v Telegramu bodo odletela le, če bodo padle naloge;
  • task_concurrency=1 - prepovedujemo hkratni zagon več primerkov ene naloge. V nasprotnem primeru bomo dobili hkratno lansiranje več VerticaOperator (gleda na eno mizo);
  • report_update >> [email, tg] - vse VerticaOperator konvergirajo pri pošiljanju pisem in sporočil, takole:
    Apache Airflow: olajšanje ETL

    Ker pa imajo operaterji obveščevalcev različne pogoje za zagon, bo deloval samo eden. V drevesnem pogledu je vse videti nekoliko manj vizualno:
    Apache Airflow: olajšanje ETL

Povedal bom nekaj besed o makri in njihovi prijatelji - spremenljivke.

Makri so ogradne oznake Jinja, ki lahko nadomestijo različne uporabne informacije v argumentih operaterja. Na primer takole:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} se bo razširil na vsebino spremenljivke konteksta execution_date v formatu YYYY-MM-DD: 2020-07-14. Najboljše pri tem je, da so kontekstne spremenljivke prikovane na določeno instanco opravila (kvadrat v drevesnem pogledu) in ob ponovnem zagonu se bodo ograde razširile na enake vrednosti.

Dodeljene vrednosti si lahko ogledate z gumbom Upodobljeno na vsaki instanci opravila. Takole poteka naloga s pošiljanjem pisma:

Apache Airflow: olajšanje ETL

In tako pri nalogi s pošiljanjem sporočila:

Apache Airflow: olajšanje ETL

Celoten seznam vgrajenih makrov za zadnjo razpoložljivo različico je na voljo tukaj: referenca makrov

Še več, s pomočjo vtičnikov lahko deklariramo lastne makre, vendar je to že druga zgodba.

Poleg vnaprej določenih stvari lahko nadomestimo vrednosti naših spremenljivk (to sem že uporabil v zgornji kodi). Ustvarjajmo v Admin/Variables nekaj stvari:

Apache Airflow: olajšanje ETL

Vse, kar lahko uporabite:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Vrednost je lahko skalar ali pa tudi JSON. V primeru JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

samo uporabite pot do želenega ključa: {{ var.json.bot_config.bot.token }}.

Povedal bom dobesedno eno besedo in pokazal en posnetek zaslona o tem povezave. Tukaj je vse elementarno: na strani Admin/Connections ustvarimo povezavo, tam dodamo svoje prijave / gesla in bolj specifične parametre. Všečkaj to:

Apache Airflow: olajšanje ETL

Gesla so lahko šifrirana (bolj temeljito kot privzeto) ali pa izpustite vrsto povezave (kot sem naredil za tg_main) - dejstvo je, da je seznam vrst v modelih Airflow vgrajen in ga ni mogoče razširiti, ne da bi se poglobili v izvorne kode (če nenadoma nisem česa poguglal, me prosim popravite), vendar nas nič ne bo ustavilo pri pridobivanju dobropisov ime.

Ustvarite lahko tudi več povezav z istim imenom: v tem primeru metoda BaseHook.get_connection(), ki nam dobi povezave po imenu, bo dal naključen iz več soimenjakov (bolj logično bi bilo narediti Round Robin, a pustimo to na vesti Airflow razvijalcev).

Spremenljivke in povezave so vsekakor kul orodja, vendar je pomembno, da ne izgubite ravnovesja: katere dele vaših tokov shranite v sami kodi in katere dele daste v shranjevanje Airflowu. Po eni strani je lahko priročno hitro spremeniti vrednost, na primer poštni predal, prek uporabniškega vmesnika. Po drugi strani pa je to še vedno vrnitev na klik z miško, ki smo se ga (sem) želeli znebiti.

Delo s povezavami je ena od nalog kavlji. Na splošno so kljuke Airflow točke za povezovanje s storitvami in knjižnicami tretjih oseb. npr. JiraHook nam bo odprl odjemalca za interakcijo z Jiro (naloge lahko premikate naprej in nazaj) in s pomočjo SambaHook lahko potisnete lokalno datoteko smb-točka.

Razčlenjevanje operaterja po meri

In približali smo se temu, da pogledamo, kako je narejen TelegramBotSendMessage

Koda: commons/operators.py z dejanskim operaterjem:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Tukaj je, kot vse drugo v Airflowu, vse zelo preprosto:

  • Podedovano od BaseOperator, ki izvaja kar nekaj stvari, specifičnih za Airflow (poglejte svoj prosti čas)
  • Deklarirana polja template_fields, v katerem bo Jinja iskal makre za obdelavo.
  • Uredil prave argumente za __init__(), po potrebi nastavite privzete vrednosti.
  • Pozabili pa nismo niti na inicializacijo prednika.
  • Odprl ustrezen kavelj TelegramBotHookje od njega prejel predmet stranke.
  • Preglasena (redefinirana) metoda BaseOperator.execute(), ki ga bo Airfow trznil, ko bo prišel čas za zagon operaterja - v njem bomo izvedli glavno dejanje, pri čemer se bomo pozabili prijaviti. (Mimogrede se prijavimo takoj stdout и stderr - Pretok zraka bo vse prestregel, lepo zavil, razgradil, kjer je treba.)

Poglejmo, kaj imamo commons/hooks.py. Prvi del datoteke s samim kavljem:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Sploh ne vem, kaj bi tukaj razložil, omenil bom le pomembne točke:

  • Podedujemo, razmislite o argumentih - v večini primerov bo eden: conn_id;
  • Preglasitev standardnih metod: omejil sem se get_conn(), v katerem dobim parametre povezave po imenu in samo razdelek extra (to je polje JSON), v katerega sem (po lastnih navodilih!) vstavil Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ustvarim naš primerek TelegramBot, ki mu daje določen žeton.

To je vse. Stranko lahko dobite iz kljuke z uporabo TelegramBotHook().clent ali TelegramBotHook().get_conn().

In drugi del datoteke, v kateri naredim microwrapper za Telegram REST API, da ne vlečem istega python-telegram-bot za eno metodo sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Pravilen način je, da vse seštejete: TelegramBotSendMessage, TelegramBotHook, TelegramBot - v vtičniku, postavite v javno skladišče in ga dajte odprtokodnemu sistemu.

Medtem ko smo preučevali vse to, so naše posodobitve poročil uspešno spodletele in mi poslale sporočilo o napaki v kanalu. Grem preverit, če je kaj narobe...

Apache Airflow: olajšanje ETL
V našem dožu se je nekaj zalomilo! Ali ni to tisto, kar smo pričakovali? točno tako!

Boste natočili?

Se vam zdi, da sem kaj zamudil? Zdi se, da je obljubil prenos podatkov iz SQL Serverja v Vertico, potem pa je vzel in se premaknil s teme, podlež!

Ta grozodejstvo je bilo namerno, preprosto sem vam moral razvozlati nekaj terminologije. Zdaj lahko greš dlje.

Naš načrt je bil tak:

  1. Naredi dag
  2. Ustvarite naloge
  3. Poglejte, kako lepo je vse
  4. Polnilam dodelite številke sej
  5. Pridobite podatke iz strežnika SQL
  6. Vnesite podatke v Vertico
  7. Zbirajte statistiko

Torej, da bi vse to začelo delovati, sem naredil majhen dodatek k našemu docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Tam dvignemo:

  • Vertica kot gostitelj dwh z najbolj privzetimi nastavitvami,
  • trije primerki SQL Serverja,
  • baze podatkov v slednjem napolnimo z nekaterimi podatki (v nobenem primeru ne poglejte v mssql_init.py!)

Vse dobro zaženemo s pomočjo nekoliko bolj zapletenega ukaza kot zadnjič:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Uporabite lahko predmet, kar je ustvaril naš čudežni naključni razporeditelj Data Profiling/Ad Hoc Query:

Apache Airflow: olajšanje ETL
Glavna stvar je, da tega ne pokažete analitikom

podrobneje opisati ETL seje Ne bom, tam je vse trivialno: naredimo osnovo, v njej je znak, vse zavijemo z upraviteljem konteksta in zdaj naredimo tole:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Prišel je čas zbirajo naše podatke z naših sto in pol miz. Naredimo to s pomočjo zelo nezahtevnih vrstic:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. S pomočjo kljuke dobimo iz Airflow pymssql-poveži
  2. V zahtevo nadomestimo omejitev v obliki datuma - v funkcijo jo bo vrgel mehanizem predloge.
  3. Hranjenje naše zahteve pandaskdo nas bo dobil DataFrame - nam bo koristilo v prihodnosti.

Uporabljam zamenjavo {dt} namesto parametra zahteve %s ne zato, ker sem zloben Ostržek, ampak zato, ker pandas ne zmorem pymssql in zdrsne zadnji params: Listčeprav si res želi tuple.
Upoštevajte tudi, da razvijalec pymssql odločil, da ga ne bo več podpiral, in čas je, da se izseli pyodbc.

Poglejmo, s čim je Airflow napolnil argumente naših funkcij:

Apache Airflow: olajšanje ETL

Če podatkov ni, potem nima smisla nadaljevati. Čudno pa je tudi, da je polnjenje uspešno. Vendar to ni napaka. A-ah-ah, kaj storiti?! In tukaj je to:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException pove Airflow, da ni napak, vendar nalogo preskočimo. Vmesnik ne bo imel zelenega ali rdečega kvadrata, ampak roza.

Vrzimo naše podatke več stolpcev:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

In sicer:

  • Baza podatkov, iz katere smo prevzemali naročila,
  • ID naše poplavne seje (drugačen bo za vsako nalogo),
  • Hash iz vira in ID naročila - tako da imamo v končni bazi (kjer je vse zlito v eno tabelo) edinstven ID naročila.

Ostaja še predzadnji korak: vse prelijemo v Vertico. In nenavadno je, da je eden najbolj spektakularnih in učinkovitih načinov za to prek CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Izdelujemo poseben sprejemnik StringIO.
  2. pandas bo prijazno postavil naš DataFrame v obliki CSV- črte.
  3. S kavljem odpremo povezavo do naše najljubše Vertice.
  4. In zdaj s pomočjo copy() pošljite naše podatke neposredno Vertiki!

Od voznika bomo vzeli, koliko vrstic je bilo zapolnjenih, in povedali upravitelju seje, da je vse v redu:

session.loaded_rows = cursor.rowcount
session.successful = True

To je vse.

Pri prodaji tarčno ploščo izdelamo ročno. Tukaj sem si dovolil majhen stroj:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

uporabljam VerticaOperator() Ustvarim shemo baze podatkov in tabelo (če seveda še ne obstajata). Glavna stvar je pravilno urediti odvisnosti:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Seštejemo

- No, - je rekla mala miška, - kajne?
Ste prepričani, da sem najstrašnejša žival v gozdu?

Julia Donaldson, Gruffalo

Mislim, da če bi s kolegi tekmovali: kdo bo hitro ustvaril in zagnal ETL proces iz nič: oni s svojim SSIS in miško in jaz z Airflow ... In potem bi primerjali tudi enostavnost vzdrževanja ... Vau, mislim, da se boste strinjali, da jih bom premagal na vseh frontah!

Če malo bolj resno, potem je Apache Airflow - z opisom procesov v obliki programske kode - opravil svoje delo veliko bolj udobno in prijetno.

Njegova neomejena razširljivost, tako v smislu vtičnikov kot nagnjenosti k razširljivosti, vam daje možnost uporabe Airflow na skoraj vseh področjih: tudi v celotnem ciklu zbiranja, priprave in obdelave podatkov, tudi pri izstrelitvi raket (na Mars, tečaj).

Končni del, reference in informacije

Grablje, ki smo jih zbrali za vas

  • start_date. Da, to je že lokalni meme. Preko Dougovega glavnega argumenta start_date vse mimo. Na kratko, če navedete v start_date trenutni datum in schedule_interval - en dan, potem se DAG začne jutri nič prej.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    In ni več težav.

    S tem je povezana še ena napaka med izvajanjem: Task is missing the start_date parameter, kar največkrat pomeni, da ste se pozabili povezati z operatorjem dag.

  • Vse na enem stroju. Da, in baze (sam Airflow in naš premaz), spletni strežnik in razporejevalnik ter delavci. In je celo delovalo. Toda sčasoma je število nalog za storitve naraslo in ko se je PostgreSQL začel odzivati ​​na indeks v 20 s namesto v 5 ms, smo ga vzeli in odnesli.
  • LocalExecutor. Da, še vedno sedimo na njem in smo že prišli do roba prepada. Do sedaj nam je zadostoval LocalExecutor, zdaj pa je čas, da se razširimo z vsaj enim delavcem, za prehod na CeleryExecutor pa se bomo morali kar potruditi. In glede na dejstvo, da lahko z njim delate na enem stroju, vam nič ne preprečuje, da bi uporabili Celery tudi na strežniku, ki "seveda nikoli ne bo šel v proizvodnjo, iskreno!"
  • Neuporaba vgrajena orodja:
    • povezave za shranjevanje poverilnic storitve,
    • SLA zgreši odgovoriti na naloge, ki niso uspele pravočasno,
    • xcom za izmenjavo metapodatkov (sem rekel metapodatkov!) med opravili dag.
  • Zloraba pošte. No, kaj naj rečem? Nastavljena so bila opozorila za vse ponovitve padlih nalog. Zdaj ima moj službeni Gmail >90 e-poštnih sporočil Airflowa, gobec spletne pošte pa noče pobrati in izbrisati več kot 100 naenkrat.

Več pasti: Napake Apache Airflow

Več orodij za avtomatizacijo

Da bomo še več delali z glavo in ne z rokami, je Airflow za nas pripravil tole:

  • REST API - še vedno ima status Poskusni, kar mu ne preprečuje dela. Z njim ne morete samo pridobiti informacij o dagih in nalogah, ampak tudi ustaviti/zagnati dag, ustvariti DAG Run ali bazen.
  • CLI - v ukazni vrstici je na voljo veliko orodij, ki jih ni le neprijetno uporabljati prek spletnega uporabniškega vmesnika, ampak jih na splošno ni. Na primer:
    • backfill potrebno za ponovni zagon primerkov opravil.
      Prišli so na primer analitiki in rekli: »In vi, tovariš, imate v podatkih od 1. do 13. januarja neumnosti! Popravi, popravi, popravi, popravi!" In ti si tak hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Osnovna storitev: initdb, resetdb, upgradedb, checkdb.
    • run, ki vam omogoča, da zaženete nalogo enega primerka in celo ocenite vse odvisnosti. Poleg tega ga lahko zaženete prek LocalExecutor, tudi če imate grozd zelene.
    • Dela približno enako test, samo tudi v bazah ne piše nič.
    • connections omogoča množično ustvarjanje povezav iz lupine.
  • API Python - precej hardcore način interakcije, ki je namenjen vtičnikom in ne roji vanj z majhnimi rokami. Toda kdo nam brani, da ne gremo /home/airflow/dags, teči ipython in se začeti ubadati? Vse povezave lahko na primer izvozite z naslednjo kodo:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Povezovanje z metapodatkovno bazo Airflow. Ne priporočam pisanja vanj, vendar je pridobivanje stanj nalog za različne specifične metrike lahko veliko hitrejše in enostavnejše kot uporaba katerega koli API-ja.

    Recimo, da niso vse naše naloge idempotentne, lahko pa včasih padejo in to je normalno. A nekaj blokad je že sumljivih in bi bilo treba preveriti.

    Pazite SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

reference

In seveda, prvih deset povezav iz izdaje Google je vsebina mape Airflow iz mojih zaznamkov.

In povezave, uporabljene v članku:

Vir: www.habr.com