Apache Airflow: Olakšavanje ETL-a

Bok, ja sam Dmitry Logvinenko - inženjer podataka odjela analitike grupe tvrtki Vezet.

Reći ću vam o prekrasnom alatu za razvoj ETL procesa - Apache Airflow. Ali Airflow je toliko svestran i višestruk da bi ga trebali bolje pogledati čak i ako niste uključeni u protok podataka, ali imate potrebu povremeno pokretati bilo koje procese i pratiti njihovo izvršenje.

I da, neću samo reći, već i pokazati: program ima puno koda, snimaka zaslona i preporuka.

Apache Airflow: Olakšavanje ETL-a
Ono što obično vidite kada guglate riječ Airflow / Wikimedia Commons

pregled sadržaja

Uvod

Apache Airflow je poput Djanga:

  • napisano u pythonu
  • postoji odlična administratorska ploča,
  • proširivo na neodređeno vrijeme

- samo bolja, a napravljena je za sasvim druge svrhe, naime (kako piše prije kate):

  • pokretanje i praćenje zadataka na neograničenom broju strojeva (koliko Celery / Kubernetea i vaša savjest vam dopuste)
  • s dinamičkim generiranjem tijeka rada iz Python koda koji je vrlo jednostavan za pisanje i razumijevanje
  • i mogućnost međusobnog povezivanja bilo koje baze podataka i API-ja koristeći i gotove komponente i dodatke napravljene kod kuće (što je iznimno jednostavno).

Apache Airflow koristimo ovako:

  • prikupljamo podatke iz raznih izvora (mnogi SQL Server i PostgreSQL instance, razni API-ji s metrikom aplikacija, čak i 1C) u DWH i ODS (imamo Verticu i Clickhouse).
  • koliko napredan cron, koji pokreće procese konsolidacije podataka na ODS-u, te prati njihovo održavanje.

Donedavno je naše potrebe pokrivao jedan mali poslužitelj s 32 jezgre i 50 GB RAM-a. U Airflowu ovo funkcionira:

  • više 200 dag (zapravo tijek rada, u koji smo ubacili zadatke),
  • u svakom u prosjeku 70 zadataka,
  • ova dobrota počinje (također u prosjeku) jednom na sat.

A o tome kako smo se širili, pisat ću u nastavku, ali sada definirajmo über-problem koji ćemo rješavati:

Postoje tri izvorna SQL poslužitelja, svaki s 50 baza podataka - instanci jednog projekta, odnosno imaju istu strukturu (skoro posvuda, mua-ha-ha), što znači da svaki ima tablicu Narudžbe (srećom, tablicu s tim ime se može ugurati u bilo koji posao). Podatke uzimamo dodavanjem servisnih polja (source server, source database, ETL task ID) i naivno ih bacamo u, recimo, Verticu.

Idemo!

Glavni dio, praktični (i malo teorijski)

Zašto mi (i vi)

Kad je drveće bilo veliko, a ja jednostavan SQL-schik u jednoj ruskoj maloprodaji, prevarili smo ETL procese poznate kao tokovi podataka koristeći dva alata koja su nam dostupna:

  • Informatica Power Center - iznimno raširen sustav, iznimno produktivan, s vlastitim hardverom, vlastitim verzijama. Iskoristio sam ne daj Bože 1% njegovih mogućnosti. Zašto? Pa, prije svega, ovo sučelje, tamo negdje iz 380-ih, psihički nas je pritisnulo. Drugo, ova je naprava dizajnirana za iznimno otmjene procese, bijesnu ponovnu upotrebu komponenti i druge vrlo važne trikove za poduzeća. O tome koliko košta, poput krila Airbusa AXNUMX / godišnje, nećemo ništa reći.

    Pazite, screenshot može malo povrijediti osobe mlađe od 30 godina

    Apache Airflow: Olakšavanje ETL-a

  • SQL Server integracijski poslužitelj - koristili smo ovog druga u našim unutarprojektnim tokovima. Pa zapravo: već koristimo SQL Server i bilo bi nekako nerazumno ne koristiti njegove ETL alate. Sve je u njemu dobro: i sučelje je lijepo, i izvješća o napretku ... Ali to nije razlog zašto volimo softverske proizvode, oh, ne zbog ovoga. Verzija je dtsx (što je XML s čvorovima promiješanim prilikom spremanja) možemo, ali koja je svrha? Što kažete na izradu paketa zadataka koji će vući stotine tablica s jednog poslužitelja na drugi? Da, kakva stotka, kažiprst će vam otpasti od dvadeset komada, klikajući na tipku miša. Ali definitivno izgleda modernije:

    Apache Airflow: Olakšavanje ETL-a

Svakako smo tražili izlaze. Slučaj čak skoro došao do generatora SSIS paketa koji je sam napisao ...

…a onda me pronašao novi posao. I na njemu me pretekao Apache Airflow.

Kad sam saznao da su opisi ETL procesa jednostavan Python kod, jednostavno nisam plesao od sreće. Ovo je način na koji su se tokovi podataka verzionirali i razlikovali, a izlijevanje tablica s jednom strukturom iz stotina baza podataka u jedan cilj postalo je stvar Python koda na jednom i pol ili dva 13” zaslona.

Sastavljanje klastera

Nemojmo organizirati potpuno dječji vrtić, i ne govorimo o potpuno očiglednim stvarima ovdje, poput instaliranja Airflowa, odabrane baze podataka, Celeryja i drugih slučajeva opisanih u dokovima.

Kako bismo odmah mogli početi s eksperimentima, skicirao sam docker-compose.yml u kojem:

  • Hajdemo zapravo povisiti Protok zraka: Planer, Web poslužitelj. Flower će se također vrtjeti tamo kako bi nadgledao Celery zadatke (jer je već gurnut u apache/airflow:1.10.10-python3.7, ali nemamo ništa protiv)
  • PostgreSQL, u koji će Airflow upisivati ​​svoje servisne informacije (podatke o rasporedu, statistiku izvršenja itd.), a Celery označavati izvršene zadatke;
  • Redis, koji će djelovati kao broker zadataka za Celery;
  • Radnik celera, koji će biti angažiran u neposrednom izvršavanju zadataka.
  • U mapu ./dags mi ćemo dodati naše datoteke s opisom dagova. Oni će se pokupiti u hodu, tako da nema potrebe žonglirati cijelom hrpom nakon svakog kihanja.

Na nekim mjestima kod u primjerima nije u potpunosti prikazan (da ne zatrpavamo tekst), ali negdje je modificiran u procesu. Kompletni primjeri radnog koda mogu se pronaći u repozitoriju https://github.com/dm-logv/airflow-tutorial.

doker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Bilješke:

  • U sklapanju kompozicije uvelike sam se oslanjao na dobro poznatu sliku puckel/docker-protok zraka - svakako provjerite. Možda ti više ništa u životu i ne treba.
  • Sve postavke protoka zraka dostupne su ne samo putem airflow.cfg, ali i kroz varijable okoline (zahvaljujući programerima), što sam zlonamjerno iskoristio.
  • Naravno, nije spreman za proizvodnju: namjerno nisam stavio otkucaje srca na kontejnere, nisam se zamarao sigurnošću. Ali napravio sam minimum koji je prikladan za naše eksperimentatore.
  • Napomena:
    • Mapa dag mora biti dostupna i planeru i radnicima.
    • Isto vrijedi i za sve biblioteke trećih strana - sve moraju biti instalirane na strojevima s planerom i radnicima.

Pa, sada je jednostavno:

$ docker-compose up --scale worker=3

Nakon što se sve podigne, možete pogledati web sučelja:

Osnovni pojmovi

Ako niste ništa razumjeli u svim ovim "dagovima", evo kratkog rječnika:

  • Raspored - najvažniji ujak u Airflowu, koji kontrolira da roboti marljivo rade, a ne osoba: prati raspored, ažurira dagove, pokreće zadatke.

    Općenito, u starijim verzijama imao je problema s pamćenjem (ne, ne amnezija, već curenje), a naslijeđeni parametar čak je ostao u konfiguracijama run_duration — njegov interval ponovnog pokretanja. Ali sada je sve u redu.

  • DAG (aka "dag") - "usmjereni aciklički graf", ali takva će definicija malom broju ljudi reći, ali zapravo je to spremnik za zadatke koji međusobno komuniciraju (vidi dolje) ili analog paketa u SSIS-u i tijeka rada u Informatici .

    Osim znakova, još uvijek mogu postojati podznaci, ali do njih najvjerojatnije nećemo doći.

  • DAG Trčanje - inicijalizirani dag, koji je dodijeljen vlastiti execution_date. Dagrani istog daga mogu raditi paralelno (ako ste svoje zadatke učinili idempotentnim, naravno).
  • Operator su dijelovi koda odgovorni za izvođenje određene akcije. Postoje tri vrste operatora:
    • akcijskipoput našeg favorita PythonOperator, koji može izvršiti bilo koji (važeći) Python kod;
    • Transfer, koji prenose podatke s mjesta na mjesto, recimo, MsSqlToHiveTransfer;
    • senzor s druge strane, omogućit će vam da reagirate ili usporite daljnje izvođenje daga dok se događaj ne dogodi. HttpSensor može povući navedenu krajnju točku, a kada željeni odgovor čeka, započeti prijenos GoogleCloudStorageToS3Operator. Radoznali um će se zapitati: „Zašto? Uostalom, ponavljanja možete raditi izravno u operateru!” I onda, kako ne bi začepili skup zadataka suspendiranim operaterima. Senzor se pokreće, provjerava i umire prije sljedećeg pokušaja.
  • Zadatak - deklarirani operatori, bez obzira na tip, i pridruženi dagu promiču se u rang zadatka.
  • instanca zadatka - kada je generalni planer odlučio da je vrijeme da se zadaci pošalju u bitku na izvođačima-radnicima (na licu mjesta, ako koristimo LocalExecutor ili na udaljeni čvor u slučaju CeleryExecutor), dodjeljuje im kontekst (tj. skup varijabli - parametara izvršenja), proširuje predloške naredbi ili upita i objedinjuje ih.

Generiramo zadatke

Prvo, ocrtajmo opću shemu našeg douga, a zatim ćemo sve više zaroniti u detalje jer primjenjujemo neka netrivijalna rješenja.

Dakle, u svom najjednostavnijem obliku, takav dag će izgledati ovako:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Hajde da shvatimo:

  • Prvo uvozimo potrebne libs i nešto drugo;
  • sql_server_ds - Je List[namedtuple[str, str]] s nazivima priključaka iz Airflow Connections i bazama podataka iz kojih ćemo uzeti našu ploču;
  • dag - najava našeg daga, koji mora nužno biti in globals(), inače ga Airflow neće pronaći. Doug također treba reći:
    • kako se on zove orders - ovo će se ime tada pojaviti u web sučelju,
    • da će raditi od ponoći osmog srpnja,
    • i trebao bi raditi, otprilike svakih 6 sati (za žestoke momke ovdje umjesto timedelta() dopustiv cron-crta 0 0 0/6 ? * * *, za manje cool - izraz poput @daily);
  • workflow() će obaviti glavni posao, ali ne sada. Za sada ćemo samo izbaciti naš kontekst u dnevnik.
  • A sada jednostavna čarolija stvaranja zadataka:
    • prolazimo kroz svoje izvore;
    • inicijalizirati PythonOperator, koji će izvršiti našu lutku workflow(). Ne zaboravite navesti jedinstveni (unutar daga) naziv zadatka i vezati sam dag. Zastava provide_context zauzvrat će dodati dodatne argumente u funkciju, koje ćemo pažljivo prikupiti pomoću **context.

Za sada je to sve. Što smo dobili:

  • novi dag u web sučelju,
  • stotinu i pol zadataka koji će se izvršavati paralelno (ako Airflow, Celery postavke i kapacitet poslužitelja to dopuštaju).

Pa, skoro sam shvatio.

Apache Airflow: Olakšavanje ETL-a
Tko će instalirati ovisnosti?

Da pojednostavim cijelu ovu stvar, ja sam se zajebao docker-compose.yml obrada requirements.txt na svim čvorovima.

Sada je nestalo:

Apache Airflow: Olakšavanje ETL-a

Sivi kvadratići su instance zadataka koje obrađuje planer.

Čekamo malo, zadatke razgrabe radnici:

Apache Airflow: Olakšavanje ETL-a

Zeleni su, naravno, uspješno odradili posao. Crveni nisu baš uspješni.

Usput, na našem proizvodu nema mape ./dags, nema sinkronizacije između strojeva - sve stavke leže unutra git na našem Gitlabu, a Gitlab CI distribuira ažuriranja strojevima prilikom spajanja master.

Malo o Floweru

Dok nam radnici mlataraju po dudama, prisjetimo se još jednog alata koji nam može nešto pokazati - Cvijeta.

Prva stranica sa sažetim informacijama o radnim čvorovima:

Apache Airflow: Olakšavanje ETL-a

Najintenzivnija stranica sa zadacima koji su išli na posao:

Apache Airflow: Olakšavanje ETL-a

Najdosadnija stranica sa statusom našeg brokera:

Apache Airflow: Olakšavanje ETL-a

Najsvjetlija stranica je s grafikonima statusa zadataka i njihovim vremenom izvršenja:

Apache Airflow: Olakšavanje ETL-a

Ukrcavamo nedovoljno opterećeno

Dakle, svi zadaci su odrađeni, možete nositi ranjene.

Apache Airflow: Olakšavanje ETL-a

A bilo je mnogo ranjenih – iz ovih ili onih razloga. U slučaju ispravnog korištenja Airflowa, upravo ovi kvadratići pokazuju da podaci definitivno nisu stigli.

Morate pogledati dnevnik i ponovno pokrenuti pale instance zadatka.

Klikom na bilo koji kvadrat, vidjet ćemo radnje koje su nam dostupne:

Apache Airflow: Olakšavanje ETL-a

Možete uzeti i očistiti pale. Odnosno, zaboravljamo da tamo nešto nije uspjelo, a isti zadatak instance će ići u planer.

Apache Airflow: Olakšavanje ETL-a

Jasno je da to raditi s mišem sa svim crvenim kvadratićima nije baš humano - to nije ono što očekujemo od Airflowa. Naravno, imamo oružje za masovno uništenje: Browse/Task Instances

Apache Airflow: Olakšavanje ETL-a

Odaberimo sve odjednom i vratimo na nulu, kliknite ispravnu stavku:

Apache Airflow: Olakšavanje ETL-a

Nakon čišćenja, naši taksiji izgledaju ovako (već čekaju da ih rasporedi raspored):

Apache Airflow: Olakšavanje ETL-a

Veze, kuke i druge varijable

Vrijeme je da pogledamo sljedeći DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Jesu li svi ikada izvršili ažuriranje izvješća? Ovo je opet ona: postoji popis izvora odakle se mogu dobiti podaci; postoji popis gdje staviti; ne zaboravite zatrubiti kad se sve dogodi ili pokvari (pa, ne radi se o nama, ne).

Prođimo ponovno kroz datoteku i pogledajmo nove opskurne stvari:

  • from commons.operators import TelegramBotSendMessage - ništa nas ne sprječava da napravimo vlastite operatere, što smo iskoristili tako što smo napravili mali wrapper za slanje poruka na Unblocked. (O ovom operatoru ćemo više govoriti u nastavku);
  • default_args={} - dag može distribuirati iste argumente svim svojim operatorima;
  • to='{{ var.value.all_the_kings_men }}' - polje to nećemo imati tvrdo kodirane, već dinamički generirane pomoću Jinje i varijable s popisom e-poruka koje sam pažljivo stavio u Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — uvjet za pokretanje pogona. U našem slučaju, pismo će doletjeti šefovima samo ako su sve ovisnosti uspjele uspješno;
  • tg_bot_conn_id='tg_main' - argumenti conn_id prihvaćaju ID-ove veza koje stvaramo Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - poruke u Telegramu će odletjeti samo ako postoje pali zadaci;
  • task_concurrency=1 - zabranjujemo istovremeno pokretanje nekoliko instanci jednog zadatka. U suprotnom, dobit ćemo istovremeno lansiranje nekoliko VerticaOperator (gleda u jedan stol);
  • report_update >> [email, tg] - svi VerticaOperator konvergiraju u slanju pisama i poruka, ovako:
    Apache Airflow: Olakšavanje ETL-a

    Ali budući da operateri obavijesti imaju različite uvjete pokretanja, samo će jedan raditi. U prikazu stabla sve izgleda malo manje vizualno:
    Apache Airflow: Olakšavanje ETL-a

Reći ću nekoliko riječi o makronaredbe i njihovi prijatelji - varijable.

Makronaredbe su Jinja rezervirana mjesta koja mogu zamijeniti razne korisne informacije u argumentima operatora. Na primjer, ovako:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} će se proširiti na sadržaj varijable konteksta execution_date u formatu YYYY-MM-DD: 2020-07-14. Najbolji dio je što su varijable konteksta prikovane za određenu instancu zadatka (kvadrat u prikazu stabla), a kada se ponovno pokrene, rezervirana mjesta će se proširiti na iste vrijednosti.

Dodijeljene vrijednosti mogu se vidjeti pomoću gumba Renderirano na svakoj instanci zadatka. Ovako se radi zadatak sa slanjem pisma:

Apache Airflow: Olakšavanje ETL-a

I tako kod zadatka sa slanjem poruke:

Apache Airflow: Olakšavanje ETL-a

Potpuni popis ugrađenih makronaredbi za najnoviju dostupnu verziju dostupan je ovdje: referenca makronaredbe

Štoviše, uz pomoć dodataka možemo deklarirati vlastite makronaredbe, no to je već druga priča.

Osim unaprijed definiranih stvari, možemo zamijeniti vrijednosti naših varijabli (već sam to koristio u gornjem kodu). Kreirajmo u Admin/Variables par stvari:

Apache Airflow: Olakšavanje ETL-a

Sve što možete koristiti:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Vrijednost može biti skalar ili može biti i JSON. U slučaju JSON-a:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

samo koristite put do željenog ključa: {{ var.json.bot_config.bot.token }}.

Doslovno ću reći jednu riječ i pokazati jednu snimku zaslona o tome veza. Ovdje je sve elementarno: na stranici Admin/Connections stvaramo vezu, tamo dodajemo naše prijave / lozinke i više specifičnih parametara. Kao ovo:

Apache Airflow: Olakšavanje ETL-a

Lozinke se mogu šifrirati (temeljitije od zadane) ili možete izostaviti vrstu veze (kao što sam ja učinio za tg_main) - činjenica je da je popis tipova ugrađen u Airflow modele i ne može se proširiti bez ulaska u izvorne kodove (ako odjednom nešto nisam proguglao, ispravite me), ali ništa nas neće spriječiti da dobijemo kredite samo Ime.

Također možete napraviti nekoliko veza s istim imenom: u ovom slučaju, metoda BaseHook.get_connection(), koji nam dobiva veze po imenu, dat će nasumično od nekoliko istoimenjaka (logičnije bi bilo napraviti Round Robin, ali ostavimo to na savjesti Airflow developera).

Varijable i veze su svakako cool alati, ali važno je ne izgubiti ravnotežu: koje dijelove svojih tokova spremate u sam kod, a koje dijelove dajete Airflowu za pohranu. S jedne strane, može biti zgodno brzo promijeniti vrijednost, na primjer, poštanski sandučić, putem korisničkog sučelja. S druge strane, ovo je ipak povratak na klik mišem, kojeg smo se (ja) htjeli riješiti.

Rad s vezama jedan je od zadataka udice. Općenito, Airflow kuke su točke za povezivanje s uslugama i bibliotekama trećih strana. npr. JiraHook otvorit će nam klijenta za interakciju s Jirom (možete pomicati zadatke naprijed-natrag), a uz pomoć SambaHook možete gurnuti lokalnu datoteku smb-točka.

Raščlanjivanje prilagođenog operatora

I približili smo se tome kako se to proizvodi TelegramBotSendMessage

Šifra commons/operators.py sa stvarnim operatorom:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Ovdje je, kao i sve ostalo u Airflowu, sve vrlo jednostavno:

  • Naslijeđeno od BaseOperator, koji implementira dosta stvari specifičnih za Airflow (pogledajte svoje slobodno vrijeme)
  • Deklarirana polja template_fields, u kojem će Jinja tražiti makronaredbe za obradu.
  • Posložio prave argumente za __init__(), postavite zadane vrijednosti gdje je to potrebno.
  • Nismo zaboravili niti na inicijalizaciju pretka.
  • Otvorio odgovarajuću kuku TelegramBotHookprimio objekt klijenta od njega.
  • Nadjačana (redefinirana) metoda BaseOperator.execute(), koji će Airfow trzati kada dođe vrijeme za pokretanje operatera - u njemu ćemo provesti glavnu akciju, zaboravljajući se prijaviti. (Usput se prijavljujemo odmah stdout и stderr - Protok zraka će presresti sve, lijepo zamotati, razgraditi gdje je potrebno.)

Da vidimo što imamo commons/hooks.py. Prvi dio datoteke, sa samom kukom:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ne znam ni što bih ovdje objasnio, samo ću napomenuti važne točke:

  • Nasljeđujemo, razmislite o argumentima - u većini slučajeva to će biti jedan: conn_id;
  • Prevladavanje standardnih metoda: Ograničio sam se get_conn(), u kojem dobivam parametre veze po imenu i samo dobivam odjeljak extra (ovo je JSON polje), u koje sam (prema vlastitim uputama!) stavio Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ja stvaram naš primjerak TelegramBot, dajući mu određeni token.

To je sve. Možete dobiti klijenta iz udice pomoću TelegramBotHook().clent ili TelegramBotHook().get_conn().

I drugi dio fajla, u kojem pravim microwrapper za Telegram REST API, da ne povlačim isti python-telegram-bot za jednu metodu sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Ispravan način je zbrojiti sve: TelegramBotSendMessage, TelegramBotHook, TelegramBot - u dodatku, stavite u javni repozitorij i dajte ga Open Sourceu.

Dok smo sve ovo proučavali, naša ažuriranja izvješća uspjela su uspješno propasti i poslati mi poruku o pogrešci na kanalu. Idem provjeriti je li krivo...

Apache Airflow: Olakšavanje ETL-a
Nešto je puklo u našem duždu! Nije li to ono što smo očekivali? Točno!

Hoćeš li točiti?

Osjećaš li da sam nešto propustio? Izgleda da je obećao prebaciti podatke sa SQL Servera na Verticu, a onda uzeo i maknuo se s teme, nitkov!

Ova grozota je bila namjerna, jednostavno sam vam morao dešifrirati neku terminologiju. Sada možete ići dalje.

Naš plan je bio sljedeći:

  1. Do dag
  2. Generirajte zadatke
  3. Vidite kako je sve lijepo
  4. Dodijelite brojeve sesija ispunama
  5. Dohvatite podatke sa SQL Servera
  6. Stavite podatke u Verticu
  7. Prikupiti statistiku

Dakle, da sve ovo pokrenem, napravio sam mali dodatak našem docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Tu podižemo:

  • Vertica kao domaćin dwh s većinom zadanih postavki,
  • tri instance SQL Servera,
  • popunjavamo baze podataka u potonjem nekim podacima (ni u kojem slučaju ne gledajte u mssql_init.py!)

Sve dobro pokrećemo uz pomoć malo kompliciranije naredbe nego prošli put:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Ono što je naš čudesni randomizer generirao, možete koristiti stavku Data Profiling/Ad Hoc Query:

Apache Airflow: Olakšavanje ETL-a
Glavna stvar je ne pokazati to analitičarima

razraditi ETL sesije Neću, tamo je sve trivijalno: napravimo bazu, u njoj je znak, sve omotamo upraviteljem konteksta, a sada radimo ovo:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Došlo je vrijeme prikupiti naše podatke s naših stotinu i pol stolova. Učinimo to uz pomoć vrlo nepretencioznih linija:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Uz pomoć kuke dobivamo iz Airflowa pymssql-Spojiti
  2. Zamijenite ograničenje u obliku datuma u zahtjevu - motor predloška će ga ubaciti u funkciju.
  3. Hranjenje našeg zahtjeva pandastko će nas dobiti DataFrame - koristit će nam u budućnosti.

Koristim zamjenu {dt} umjesto parametra zahtjeva %s ne zato što sam zli Pinokio, nego zato što pandas ne mogu podnijeti pymssql i posklizne posljednji params: Listiako stvarno želi tuple.
Također imajte na umu da programer pymssql odlučio da ga više ne uzdržava, i vrijeme je da se iseli pyodbc.

Pogledajmo čime je Airflow napunio argumente naših funkcija:

Apache Airflow: Olakšavanje ETL-a

Ako nema podataka, onda nema smisla nastaviti. Ali također je čudno smatrati punjenje uspješnim. Ali ovo nije greška. A-ah-ah, što učiniti?! I evo što:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException govori Airflowu da nema grešaka, ali mi preskačemo zadatak. Sučelje neće imati zeleni ili crveni kvadrat, već ružičasti.

Bacimo naše podatke više stupaca:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Naime

  • Baza podataka iz koje smo preuzimali narudžbe,
  • ID naše flooding sesije (bit će drugačiji za svaki zadatak),
  • Hash iz izvora i ID narudžbe - tako da u konačnoj bazi (gdje je sve pretočeno u jednu tablicu) imamo jedinstveni ID narudžbe.

Ostaje pretposljednji korak: sve ulijte u Verticu. I, začudo, jedan od najspektakularnijih i najučinkovitijih načina za to je putem CSV-a!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Izrađujemo poseban prijemnik StringIO.
  2. pandas ljubazno će staviti naše DataFrame u obliku CSV- linije.
  3. Otvorimo vezu s našom omiljenom Verticom kukom.
  4. A sada uz pomoć copy() šaljite naše podatke izravno u Vertiku!

Od vozača ćemo uzeti koliko je redova popunjeno i reći voditelju sesije da je sve u redu:

session.loaded_rows = cursor.rowcount
session.successful = True

To je sve.

U prodaji ciljnu ploču izrađujemo ručno. Ovdje sam si dopustio mali stroj:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

koristim VerticaOperator() Kreiram shemu baze podataka i tablicu (ako već ne postoje, naravno). Glavna stvar je pravilno rasporediti ovisnosti:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Sažimanje

- Pa, - reče mali miš, - zar ne?
Jesi li uvjeren da sam ja najstrašnija životinja u šumi?

Julia Donaldson, The Gruffalo

Mislim da kad bismo se moji kolege i ja natjecali: tko će brzo kreirati i pokrenuti ETL proces od nule: oni sa svojim SSIS-om i mišem, a ja s Airflowom... A onda bismo usporedili i jednostavnost održavanja... Wow, mislim da ćete se složiti da ću ih pobijediti na svim frontama!

Ako malo ozbiljnije, onda je Apache Airflow - opisujući procese u obliku programskog koda - odradio svoj posao više udobnije i ugodnije.

Njegova neograničena proširivost, kako u smislu dodataka tako i predispozicije za skalabilnost, daje vam mogućnost korištenja Airflowa u gotovo svim područjima: čak i u punom ciklusu prikupljanja, pripreme i obrade podataka, čak i pri lansiranju raketa (na Mars, tečaj).

Završni dio, reference i informacije

Grablje koje smo prikupili za vas

  • start_date. Da, ovo je već lokalni meme. Preko Dougovog glavnog argumenta start_date sve prolazi. Ukratko, ako navedete u start_date trenutni datum i schedule_interval - jednog dana, onda će DAG početi sutra ne ranije.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    I nema više problema.

    Uz to je povezana još jedna pogreška tijekom izvođenja: Task is missing the start_date parameter, što najčešće označava da ste se zaboravili vezati na dag operator.

  • Sve na jednom stroju. Da, i baze (sam Airflow i naš premaz), i web poslužitelj, i planer, i radnici. I čak je i uspjelo. Ali s vremenom je broj zadataka za usluge rastao, a kada je PostgreSQL počeo odgovarati na indeks za 20 s umjesto za 5 ms, uzeli smo ga i odnijeli.
  • Lokalni izvršitelj. Da, još uvijek sjedimo na njemu, a već smo došli do ruba ponora. Do sada nam je LocalExecutor bio dovoljan, ali sada je vrijeme da se proširimo s barem jednim radnikom, a za prelazak na CeleryExecutor ćemo se morati dobro namučiti. A s obzirom na to da s njim možete raditi na jednom stroju, ništa vas ne sprječava da koristite Celery čak i na poslužitelju, koji "naravno, nikada neće ići u proizvodnju, iskreno!"
  • Nekorištenje ugrađeni alati:
    • veze za pohranjivanje vjerodajnica usluge,
    • SLA promašaji odgovoriti na zadatke koji nisu uspjeli na vrijeme,
    • xcom za razmjenu metapodataka (rekao sam metapodataka!) između dag zadataka.
  • Zlouporaba pošte. Pa, što da kažem? Za sva ponavljanja palih zadataka postavljena su upozorenja. Sada moj poslovni Gmail ima >90 tisuća e-poruka od Airflowa, a brnjica web pošte odbija pokupiti i izbrisati više od 100 odjednom.

Više zamki: Zamke Apache Airflow

Više alata za automatizaciju

Kako bismo još više radili glavom, a ne rukama, Airflow nam je pripremio ovo:

  • REST API - još uvijek ima status Oglednog, što ga ne sprječava u radu. Pomoću njega ne samo da možete dobiti informacije o dagovima i zadacima, već i zaustaviti/pokrenuti dag, stvoriti DAG Run ili skup.
  • CLI - mnogi alati dostupni su putem naredbenog retka koji nisu samo nezgodni za korištenje putem WebUI-ja, već ih općenito nema. Na primjer:
    • backfill potrebno za ponovno pokretanje instanci zadataka.
      Recimo, došli su analitičari i rekli: “A ti, druže, imaš gluposti u podacima od 1. do 13. siječnja! Popravi to, popravi to, popravi to, popravi to!" A ti si takav hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Osnovna usluga: initdb, resetdb, upgradedb, checkdb.
    • run, koji vam omogućuje pokretanje jednog zadatka instance, pa čak i ocjenjivanje svih ovisnosti. Štoviše, možete ga pokrenuti putem LocalExecutor, čak i ako imate grozd celera.
    • Radi skoro istu stvar test, samo također u bazama ne piše ništa.
    • connections omogućuje masovno stvaranje veza iz ljuske.
  • python api - prilično hardcore način interakcije, koji je namijenjen dodacima, a ne rojiti se u njemu malim rukama. Ali tko će nas spriječiti da odemo /home/airflow/dags, trčanje ipython i početi petljati? Možete, na primjer, izvesti sve veze sa sljedećim kodom:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Povezivanje s bazom metapodataka Airflow. Ne preporučujem pisanje na njemu, ali dobivanje stanja zadataka za različite specifične metrike može biti mnogo brže i lakše od upotrebe bilo kojeg API-ja.

    Recimo, nisu svi naši zadaci idempotentni, ali ponekad znaju pasti, i to je normalno. Ali nekoliko začepljenja je već sumnjivo, i bilo bi potrebno provjeriti.

    Čuvajte se SQL-a!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

reference

I naravno, prvih deset poveznica iz izdanja Googlea sadržaj je mape Airflow iz mojih bookmarkova.

I linkovi korišteni u članku:

Izvor: www.habr.com