Apache Airflow: Olakšavanje ETL-a

Zdravo, ja sam Dmitrij Logvinenko - inženjer podataka odjela za analitiku grupe kompanija Vezet.

Reći ću vam o divnom alatu za razvoj ETL procesa - Apache Airflow. Ali Airflow je toliko svestran i višestruk da biste ga trebali bolje pogledati čak i ako niste uključeni u tokove podataka, ali imate potrebu da povremeno pokrećete bilo koji proces i nadgledate njihovo izvršenje.

I da, ne samo da ću reći, već i pokazati: program ima puno koda, snimaka ekrana i preporuka.

Apache Airflow: Olakšavanje ETL-a
Ono što obično vidite kada proguglate riječ Airflow / Wikimedia Commons

Sadržaj

Uvod

Apache Airflow je kao Django:

  • napisano u pythonu
  • postoji odličan admin panel,
  • proširivo na neodređeno vreme

- samo bolji, a napravljen je za potpuno druge svrhe, naime (kako piše prije kat):

  • pokretanje i praćenje zadataka na neograničenom broju mašina (koliko će vam Celery/Kubernetes i vaša savjest dozvoliti)
  • sa generisanjem dinamičkog toka posla iz Python koda koji je vrlo jednostavan za pisanje i razumevanje
  • i mogućnost međusobnog povezivanja bilo koje baze podataka i API-ja koristeći i gotove komponente i domaće dodatke (što je izuzetno jednostavno).

Koristimo Apache Airflow ovako:

  • prikupljamo podatke iz različitih izvora (mnogo SQL Server i PostgreSQL instanci, razni API-ji sa metrikom aplikacije, čak i 1C) u DWH i ODS (imamo Vertica i Clickhouse).
  • kako napredno cron, koji pokreće procese konsolidacije podataka na ODS-u, a također prati njihovo održavanje.

Naše potrebe je donedavno pokrivao jedan mali server sa 32 jezgra i 50 GB RAM-a. U Airflow ovo radi:

  • više 200 dags (zapravo tokovi posla, u koje smo stavili zadatke),
  • u svakom u prosjeku 70 zadataka,
  • ova dobrota počinje (također u prosjeku) jednom na sat.

A o tome kako smo se proširili, pisaću u nastavku, ali sada hajde da definiramo über-problem koji ćemo riješiti:

Postoje tri izvorna SQL servera, svaki sa 50 baza podataka - instance jednog projekta, odnosno, imaju istu strukturu (skoro svuda, mua-ha-ha), što znači da svaki ima tabelu narudžbi (na sreću, tabelu sa tim ime se može ubaciti u bilo koji posao). Podatke uzimamo dodavanjem servisnih polja (izvorni server, izvorna baza podataka, ID ETL zadatka) i naivno ih bacamo u, recimo, Verticu.

Idemo!

Glavni dio, praktični (i malo teoretski)

Zašto mi (i vi)

Kad je drveće bilo veliko, a ja sam bio jednostavan SQL-schik u jednoj ruskoj maloprodaji, prevarili smo ETL procese ili tokove podataka koristeći dva alata koja su nam dostupna:

  • Informatica Power Center - sistem koji se izuzetno širi, izuzetno produktivan, sa sopstvenim hardverom, sopstvenim verzijama. Koristio sam ne daj Bože 1% njegovih mogućnosti. Zašto? Pa, prije svega, ovaj interfejs, negdje iz 380-ih, psihički je izvršio pritisak na nas. Drugo, ova sprava je dizajnirana za izuzetno fensi procese, besnu ponovnu upotrebu komponenti i druge veoma važne trikove za preduzeća. O tome koliko košta, poput krila Airbusa AXNUMX / godine, nećemo ništa reći.

    Pazite, snimak ekrana može malo povrijediti ljude mlađe od 30 godina

    Apache Airflow: Olakšavanje ETL-a

  • Server integracije SQL Servera - koristili smo ovog druga u našim tokovima unutar projekta. Pa, u stvari: već koristimo SQL Server, i bilo bi nekako nerazumno ne koristiti njegove ETL alate. Sve u njemu je dobro: i interfejs je prelep, i izveštaji o napretku... Ali nije razlog zašto volimo softverske proizvode, oh, ne zbog toga. Verzija dtsx (što je XML sa čvorovima izmešanim pri čuvanju) možemo, ali koja je poenta? Šta kažete na stvaranje paketa zadataka koji će povući stotine tabela sa jednog servera na drugi? Da, kakva sto, kažiprst će vam otpasti sa dvadeset komada, klikom na dugme miša. Ali definitivno izgleda modernije:

    Apache Airflow: Olakšavanje ETL-a

Svakako smo tražili izlaze. Čak i slučaj gotovo došao do samog generatora SSIS paketa...

…i onda me pronašao novi posao. I Apache Airflow me je pretekao na njemu.

Kada sam saznao da su opisi ETL procesa jednostavan Python kod, jednostavno nisam plesao od radosti. Ovako su tokovi podataka verzionisani i različiti, a sipanje tabela sa jednom strukturom iz stotina baza podataka u jedan cilj postalo je stvar Python koda na jedan i po ili dva ekrana od 13 inča.

Sastavljanje klastera

Hajde da ne uredimo kompletan vrtić, i da ne pričamo o potpuno očiglednim stvarima, poput instaliranja Airflow-a, odabrane baze podataka, Celery i drugih slučajeva opisanih u dokovima.

Da bismo odmah mogli početi eksperimente, skicirao sam docker-compose.yml u kojem:

  • Hajde da zapravo podignemo Airflow: Planer, Web server. Flower će se također vrtjeti tamo kako bi nadgledao Celery zadatke (jer je već gurnut apache/airflow:1.10.10-python3.7, ali nama ne smeta)
  • PostgreSQL, u koji će Airflow upisati svoje servisne informacije (podaci planera, statistika izvršenja, itd.), a Celery će označiti završene zadatke;
  • Redis, koji će djelovati kao posrednik zadataka za Celery;
  • Radnik celera, koji će biti angažovan na neposrednom izvršavanju zadataka.
  • U folder ./dags mi ćemo dodati naše fajlove sa opisom dags. Oni će se pokupiti u hodu, tako da nema potrebe za žongliranjem cijelom hrpom nakon svakog kihanja.

Na nekim mjestima kod u primjerima nije u potpunosti prikazan (kako ne bi zatrpao tekst), ali se negdje mijenja u procesu. Kompletni primjeri radnog koda mogu se naći u spremištu https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Napomene:

  • U sklapanju kompozicije u velikoj meri sam se oslanjao na dobro poznatu sliku puckel/docker-airflow - obavezno provjerite. Možda ti ništa više ne treba u životu.
  • Sve postavke protoka zraka dostupne su ne samo putem airflow.cfg, ali i kroz varijable okruženja (zahvaljujući programerima), koje sam zlonamjerno iskoristio.
  • Naravno, nije spreman za proizvodnju: namjerno nisam stavljao otkucaje srca na kontejnere, nisam se zamarao osiguranjem. Ali uradio sam minimum prikladan za naše eksperimentatore.
  • Zapiši to:
    • Dag folder mora biti dostupan i planeru i radnicima.
    • Isto važi i za sve biblioteke trećih strana - sve one moraju biti instalirane na mašinama sa planerom i radnicima.

Pa, sad je jednostavno:

$ docker-compose up --scale worker=3

Nakon što se sve podigne, možete pogledati web sučelja:

Osnovni pojmovi

Ako ništa niste razumjeli u svim ovim "dagovima", evo kratkog rječnika:

  • Planer - najvažniji ujak u Airflowu, koji kontrolira da roboti naporno rade, a ne osoba: prati raspored, ažurira dagove, pokreće zadatke.

    Općenito, u starijim verzijama imao je problema s memorijom (ne, nije amnezija, već curenje), a legacy parametar je čak ostao u konfiguracijama run_duration — njegov interval ponovnog pokretanja. Ali sada je sve u redu.

  • DAG (aka "dag") - "usmjereni aciklički graf", ali takva definicija će reći malo ljudi, ali u stvari je to kontejner za zadatke koji međusobno komuniciraju (vidi dolje) ili analog paketa u SSIS-u i toka rada u Informatici .

    Osim dagova, možda još postoje poddagovi, ali do njih najvjerovatnije nećemo doći.

  • DAG Run - inicijalizirani dag, kojem je dodijeljen vlastiti execution_date. Dagranovi istog daga mogu raditi paralelno (ako ste svoje zadatke učinili idempotentnim, naravno).
  • operator su dijelovi koda odgovorni za izvođenje određene radnje. Postoje tri tipa operatora:
    • akcijakao naš omiljeni PythonOperator, koji može izvršiti bilo koji (važeći) Python kod;
    • transfer, koji prenose podatke od mjesta do mjesta, recimo, MsSqlToHiveTransfer;
    • senzor s druge strane, omogućit će vam da reagirate ili usporite dalje izvršavanje dag-a sve dok se ne dogodi neki događaj. HttpSensor može povući navedenu krajnju tačku i kada čeka željeni odgovor, započnite prijenos GoogleCloudStorageToS3Operator. Radoznali um će se zapitati: „Zašto? Na kraju krajeva, možete raditi ponavljanja direktno u operateru!” A onda, kako ne bi začepili skup zadataka suspendovanim operaterima. Senzor se pokreće, provjerava i ugasi prije sljedećeg pokušaja.
  • zadatak - deklarisani operateri, bez obzira na vrstu, i priključeni na dag, unapređuju se u rang zadatka.
  • instanca zadatka - kada je generalni planer odlučio da je vrijeme za slanje zadataka u borbu na izvođače-radnike (odmah na licu mjesta, ako koristimo LocalExecutor ili udaljenom čvoru u slučaju CeleryExecutor), dodjeljuje im kontekst (tj. skup varijabli - parametara izvršenja), proširuje predloške naredbi ili upita i objedinjuje ih.

Mi generišemo zadatke

Prvo, ocrtajmo opću shemu našeg douga, a zatim ćemo se sve više upuštati u detalje, jer primjenjujemo neka netrivijalna rješenja.

Dakle, u svom najjednostavnijem obliku, takav dag će izgledati ovako:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Hajde da shvatimo:

  • Prvo uvozimo potrebne biblioteke i nešto drugo;
  • sql_server_ds Je List[namedtuple[str, str]] sa nazivima veza iz Airflow Connections i bazama podataka iz kojih ćemo uzeti našu ploču;
  • dag - najava našeg daga, koja obavezno mora biti u globals(), inače ga Airflow neće pronaći. Doug takođe treba da kaže:
    • kako se on zove orders - ovo ime će se tada pojaviti u web interfejsu,
    • da će raditi od ponoći osmog jula,
    • i trebalo bi da radi, otprilike svakih 6 sati (za jake momke ovdje umjesto timedelta() prihvatljivo cron-line 0 0 0/6 ? * * *, za manje kul - izraz poput @daily);
  • workflow() obaviće glavni posao, ali ne sada. Za sada ćemo samo izbaciti naš kontekst u dnevnik.
  • A sada jednostavna magija kreiranja zadataka:
    • prolazimo kroz naše izvore;
    • inicijalizirati PythonOperator, koji će izvršiti našu lutku workflow(). Ne zaboravite navesti jedinstveno (unutar daga) ime zadatka i vezati sam dag. Zastava provide_context zauzvrat će u funkciju uliti dodatne argumente, koje ćemo pažljivo prikupiti koristeći **context.

Za sada, to je sve. šta smo dobili:

  • novi dag u web interfejsu,
  • sto i pol zadataka koji će se izvršavati paralelno (ako to dozvoljavaju Airflow, Celery postavke i kapacitet servera).

Pa, skoro sam dobio.

Apache Airflow: Olakšavanje ETL-a
Ko će instalirati zavisnosti?

Da pojednostavim cijelu ovu stvar, zeznuo sam docker-compose.yml obrada requirements.txt na svim čvorovima.

sada je nestalo:

Apache Airflow: Olakšavanje ETL-a

Sivi kvadrati su instance zadataka koje obrađuje planer.

Čekamo malo, poslove pohvataju radnici:

Apache Airflow: Olakšavanje ETL-a

Zeleni su, naravno, uspješno završili svoj posao. Crveni nisu baš uspješni.

Inače, na našem proizvodu nema foldera ./dags, nema sinhronizacije između mašina - svi dagovi leže git na našem Gitlabu, a Gitlab CI distribuira ažuriranja mašinama prilikom spajanja master.

Malo o cvijetu

Dok nam radnici mlataraju cucle, sjetimo se još jednog alata koji nam može nešto pokazati - Cvijeća.

Prva stranica sa sažetim informacijama o radničkim čvorovima:

Apache Airflow: Olakšavanje ETL-a

Najintenzivnija stranica sa zadacima koji su uspjeli:

Apache Airflow: Olakšavanje ETL-a

Najdosadnija stranica sa statusom našeg brokera:

Apache Airflow: Olakšavanje ETL-a

Najsjajnija stranica je sa grafikonima statusa zadataka i vremenom njihovog izvršavanja:

Apache Airflow: Olakšavanje ETL-a

Učitavamo nedovoljno

Dakle, svi zadaci su uspjeli, možete odnijeti ranjenike.

Apache Airflow: Olakšavanje ETL-a

I bilo je mnogo ranjenih - iz ovih ili onih razloga. U slučaju ispravne upotrebe Airflowa, upravo ovi kvadrati pokazuju da podaci definitivno nisu stigli.

Morate pogledati dnevnik i ponovo pokrenuti pale instance zadataka.

Klikom na bilo koji kvadrat, vidjet ćemo akcije koje su nam dostupne:

Apache Airflow: Olakšavanje ETL-a

Možete uzeti i očistiti pale. Odnosno, zaboravljamo da je tamo nešto nije uspjelo, a isti zadatak instance će ići u planer.

Apache Airflow: Olakšavanje ETL-a

Jasno je da ovo raditi mišem sa svim crvenim kvadratima nije baš humano – to nije ono što očekujemo od Airflowa. Naravno, imamo oružje za masovno uništenje: Browse/Task Instances

Apache Airflow: Olakšavanje ETL-a

Odaberimo sve odjednom i vratimo na nulu, kliknite na ispravnu stavku:

Apache Airflow: Olakšavanje ETL-a

Nakon čišćenja naši taksiji izgledaju ovako (već čekaju da ih rasporedi rasporedi):

Apache Airflow: Olakšavanje ETL-a

Veze, kuke i druge varijable

Vrijeme je da pogledamo sljedeći DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Da li su svi ikada ažurirali izvještaj? Ovo je opet ona: postoji lista izvora odakle se mogu dobiti podaci; postoji lista gde da se stavi; ne zaboravite da zatrubite kada se sve desilo ili puklo (pa, ne radi se o nama, ne).

Prođimo ponovo kroz fajl i pogledajmo nove opskurne stvari:

  • from commons.operators import TelegramBotSendMessage - ništa nas ne sprečava da napravimo sopstvene operatere, što smo iskoristili tako što smo napravili mali omot za slanje poruka na Unblocked. (O ovom operateru ćemo više govoriti u nastavku);
  • default_args={} - dag može distribuirati iste argumente svim svojim operaterima;
  • to='{{ var.value.all_the_kings_men }}' - polje to nećemo imati tvrdo kodiranu, već dinamički generiranu koristeći Jinja i varijablu sa listom emailova, koju sam pažljivo unio Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — uslov za pokretanje operatera. U našem slučaju, pismo će doletjeti šefovima samo ako su sve ovisnosti riješene uspješno;
  • tg_bot_conn_id='tg_main' - argumenti conn_id prihvatiti ID-ove veze u kojima kreiramo Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - poruke u Telegramu će odleteti samo ako ima palih zadataka;
  • task_concurrency=1 - zabranjujemo istovremeno pokretanje više instanci zadatka jednog zadatka. U suprotnom ćemo dobiti istovremeno lansiranje nekoliko VerticaOperator (gleda u jedan sto);
  • report_update >> [email, tg] - sve VerticaOperator konvergiraju u slanju pisama i poruka, poput ove:
    Apache Airflow: Olakšavanje ETL-a

    Ali pošto operateri notifier imaju različite uslove pokretanja, samo jedan će raditi. U prikazu stabla sve izgleda malo manje vizuelno:
    Apache Airflow: Olakšavanje ETL-a

Reći ću nekoliko riječi o macros i njihovi prijatelji - varijable.

Makroi su Jinja čuvari mjesta koji mogu zamijeniti različite korisne informacije u argumentima operatora. Na primjer, ovako:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} proširit će se na sadržaj varijable konteksta execution_date u formatu YYYY-MM-DD: 2020-07-14. Najbolji dio je to što su varijable konteksta prikovane za određenu instancu zadatka (kvadrat u prikazu stabla), a kada se ponovo pokrene, čuvari mjesta će se proširiti na iste vrijednosti.

Dodijeljene vrijednosti mogu se vidjeti pomoću gumba Rendered na svakoj instanci zadatka. Ovako je zadatak sa slanjem pisma:

Apache Airflow: Olakšavanje ETL-a

I tako na zadatku sa slanjem poruke:

Apache Airflow: Olakšavanje ETL-a

Kompletna lista ugrađenih makroa za najnoviju dostupnu verziju dostupna je ovdje: referenca makroa

Štaviše, uz pomoć dodataka možemo deklarirati vlastite makroe, ali to je druga priča.

Osim unaprijed definiranih stvari, možemo zamijeniti vrijednosti naših varijabli (ja sam to već koristio u kodu iznad). Kreirajmo unutra Admin/Variables par stvari:

Apache Airflow: Olakšavanje ETL-a

Sve što možete koristiti:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Vrijednost može biti skalar, ili također može biti JSON. U slučaju JSON-a:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

samo koristite putanju do željenog ključa: {{ var.json.bot_config.bot.token }}.

Doslovno ću reći jednu riječ i pokazati jedan snimak ekrana veze. Ovdje je sve elementarno: na stranici Admin/Connections kreiramo vezu, dodajemo naše login/lozinke i konkretnije parametre tamo. Volim ovo:

Apache Airflow: Olakšavanje ETL-a

Lozinke se mogu šifrirati (temeljitije od zadanih) ili možete izostaviti vrstu veze (kao što sam ja učinio za tg_main) - činjenica je da je lista tipova ugrađena u Airflow modele i ne može se proširiti bez ulaska u izvorne kodove (ako odjednom nisam nešto proguglao, ispravite me), ali ništa nas neće spriječiti da dobijemo kredite samo tako što ćete ime.

Također možete napraviti nekoliko veza s istim imenom: u ovom slučaju metodu BaseHook.get_connection(), koji nam daje veze po imenu, će dati nasumično od nekoliko imenjaka (bilo bi logičnije napraviti Round Robin, ali ostavimo to na savjesti Airflow programera).

Varijable i veze su svakako cool alati, ali važno je da ne izgubite ravnotežu: koje dijelove svojih tokova pohranjujete u samom kodu, a koje dijelove dajete Airflow-u na pohranu. S jedne strane, može biti zgodno brzo promijeniti vrijednost, na primjer, poštansko sanduče, putem korisničkog sučelja. S druge strane, ovo je još uvijek povratak na klik mišem kojeg smo (ja) htjeli da se riješimo.

Rad sa vezama je jedan od zadataka kuke. Općenito, kuke Airflow su tačke za povezivanje sa uslugama i bibliotekama trećih strana. npr. JiraHook će nam otvoriti klijenta za interakciju sa Jira (možete pomicati zadatke naprijed-nazad), i uz pomoć SambaHook možete gurnuti lokalnu datoteku u smb-point.

Raščlanjivanje prilagođenog operatora

I bili smo blizu da pogledamo kako je napravljen TelegramBotSendMessage

Kod commons/operators.py sa stvarnim operaterom:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Ovdje, kao i sve ostalo u Airflowu, sve je vrlo jednostavno:

  • Naslijeđeno od BaseOperator, koji implementira dosta stvari specifičnih za protok zraka (pogledajte u slobodno vrijeme)
  • Deklarisana polja template_fields, u kojem će Jinja tražiti makroe za obradu.
  • Organizovao prave argumente za __init__(), postavite zadane postavke gdje je potrebno.
  • Nismo zaboravili ni na inicijalizaciju pretka.
  • Otvorio odgovarajuću kuku TelegramBotHookprimio klijentski objekat od njega.
  • Poništena (redefinirana) metoda BaseOperator.execute(), koji će Airfow trzati kada dođe vrijeme za pokretanje operatera - u njemu ćemo implementirati glavnu akciju, zaboravljajući da se prijavimo. (Usput, prijavljujemo se odmah stdout и stderr - Protok zraka će sve presresti, lijepo umotati, razložiti gdje je potrebno.)

Hajde da vidimo šta imamo commons/hooks.py. Prvi dio fajla, sa samom kukom:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ne znam ni šta da objasnim ovde, samo ću napomenuti važne tačke:

  • Nasljeđujemo, razmislite o argumentima - u većini slučajeva to će biti jedan: conn_id;
  • Nadjačavanje standardnih metoda: ograničio sam se get_conn(), u kojem dobijam parametre veze po imenu i samo dobijam sekciju extra (ovo je JSON polje), u koje sam (prema svojim uputstvima!) stavio token Telegram bota: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ja kreiram našu instancu TelegramBot, dajući mu određeni token.

To je sve. Možete dobiti klijenta iz kuke koristeći TelegramBotHook().clent ili TelegramBotHook().get_conn().

I drugi dio fajla, u kojem pravim microwrapper za Telegram REST API, da ne bih prevukao isti python-telegram-bot za jednu metodu sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Ispravan način je da se sve zbroji: TelegramBotSendMessage, TelegramBotHook, TelegramBot - u dodatku, stavite u javno spremište i dajte ga Open Source-u.

Dok smo sve ovo proučavali, ažuriranja naših izvještaja uspjela su uspješno propasti i poslati mi poruku o grešci na kanalu. Idem da proverim da li nije u redu...

Apache Airflow: Olakšavanje ETL-a
Nešto se slomilo u našem duždu! Nije li to ono što smo očekivali? Upravo!

Hoćeš li sipati?

Da li mislite da sam nešto propustio? Izgleda da je obećao da će prenijeti podatke sa SQL Servera na Verticu, a onda je uzeo i skrenuo s teme, nitkov!

Ovaj zločin je bio namjeran, jednostavno sam morao da vam dešifrujem neku terminologiju. Sada možete ići dalje.

Naš plan je bio sledeći:

  1. Do dag
  2. Generirajte zadatke
  3. Pogledajte kako je sve lepo
  4. Dodijelite brojeve sesije popunjavanju
  5. Dobijte podatke sa SQL Servera
  6. Stavite podatke u Vertica
  7. Prikupite statistiku

Dakle, da bih sve ovo pokrenuo, napravio sam mali dodatak našem docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Tamo podižemo:

  • Vertica kao domaćin dwh sa najvećim zadanim postavkama,
  • tri instance SQL Servera,
  • popunjavamo baze podataka u potonjem nekim podacima (ni u kom slučaju ne gledajte u njih mssql_init.py!)

Sve dobro pokrećemo uz pomoć malo kompliciranije naredbe nego prošli put:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Ono što je naš čudesni randomizer generirao, možete koristiti predmet Data Profiling/Ad Hoc Query:

Apache Airflow: Olakšavanje ETL-a
Glavna stvar je da to ne pokazujete analitičarima

razraditi ETL sesije Neću, tamo je sve trivijalno: napravimo bazu, u njoj je znak, sve omotamo kontekst menadžerom, a sada radimo ovo:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Došlo je vrijeme prikupljamo naše podatke sa naših sto i po stolova. Učinimo to uz pomoć vrlo nepretencioznih linija:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Uz pomoć kuke dobijamo od Airflow-a pymssql-connect
  2. Zamijenimo ograničenje u obliku datuma u zahtjevu - to će biti ubačeno u funkciju od strane motora šablona.
  3. Ispunjava naš zahtjev pandasko će nas uhvatiti DataFrame - biće nam od koristi u budućnosti.

Koristim zamjenu {dt} umjesto parametra zahtjeva %s ne zato što sam zao Pinokio, nego zato pandas ne mogu podnijeti pymssql i sklizne posljednju params: Listiako zaista želi tuple.
Također imajte na umu da programer pymssql odlučio da ga više ne podržavam i vrijeme je da se iseli pyodbc.

Pogledajmo čime je Airflow napunio argumente naših funkcija:

Apache Airflow: Olakšavanje ETL-a

Ako nema podataka, onda nema smisla nastaviti. Ali također je čudno smatrati da je punjenje uspjelo. Ali ovo nije greška. A-ah-ah, šta da se radi?! A evo šta:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException govori Airflow-u da nema grešaka, ali preskačemo zadatak. Interfejs neće imati zeleni ili crveni kvadrat, već roze.

Hajde da bacimo naše podatke više kolona:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Naime:

  • Baza podataka iz koje smo preuzimali narudžbe,
  • ID naše flooding sesije (bit će drugačiji za svaki zadatak),
  • Haš iz izvora i ID narudžbe - tako da u konačnoj bazi podataka (gdje se sve sipa u jednu tabelu) imamo jedinstveni ID narudžbe.

Ostaje pretposljednji korak: sve sipati u Verticu. I, što je čudno, jedan od najspektakularnijih i najefikasnijih načina da se to uradi je putem CSV-a!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Pravimo poseban prijemnik StringIO.
  2. pandas ljubazno će staviti naše DataFrame u obliku CSV-linije.
  3. Otvorimo vezu sa našom omiljenom Verticom pomoću kuke.
  4. A sada uz pomoć copy() pošaljite naše podatke direktno Vertici!

Od vozača ćemo uzeti koliko je redova popunjeno i reći menadžeru sesije da je sve u redu:

session.loaded_rows = cursor.rowcount
session.successful = True

To je sve.

Na prodaji ciljanu ploču kreiramo ručno. Evo dozvolio sam sebi malu mašinu:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

ja koristim VerticaOperator() Kreiram šemu baze podataka i tabelu (ako već ne postoje, naravno). Glavna stvar je pravilno rasporediti zavisnosti:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Sumiranje

- Pa, - reče mali miš, - zar ne
Jesi li uvjeren da sam ja najstrašnija životinja u šumi?

Julia Donaldson, The Gruffalo

Mislim da kad bismo moje kolege i ja imali konkurenciju: ko će brzo kreirati i pokrenuti ETL proces od nule: oni sa svojim SSIS-om i mišem a ja sa Airflowom... A onda bismo uporedili i lakoću održavanja... Vau, mislim da ćete se složiti da ću ih pobediti na svim frontovima!

Ako malo ozbiljnije, onda je Apache Airflow - opisujući procese u obliku programskog koda - odradio moj posao puno udobnije i prijatnije.

Njegova neograničena proširivost, kako u pogledu dodataka tako i sklonosti ka skalabilnosti, daje vam priliku da koristite Airflow u gotovo svakom području: čak iu punom ciklusu prikupljanja, pripreme i obrade podataka, čak i pri lansiranju raketa (na Mars, od kurs).

Završni dio, referenca i informacija

Grablje koje smo prikupili za vas

  • start_date. Da, ovo je već lokalni mem. Preko Dagovog glavnog argumenta start_date svi prolaze. Ukratko, ako navedete u start_date trenutni datum, i schedule_interval - jednog dana, onda će DAG početi sutra ne ranije.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    I nema više problema.

    Postoji još jedna greška tokom izvođenja koja je povezana s tim: Task is missing the start_date parameter, što najčešće ukazuje da ste se zaboravili vezati za dag operator.

  • Sve na jednoj mašini. Da, i baze (sam tok zraka i naš premaz), i web server, i planer, i radnici. I čak je upalilo. Ali s vremenom je rastao broj zadataka za usluge, a kada je PostgreSQL počeo da odgovara na indeks za 20 s umjesto za 5 ms, uzeli smo ga i odnijeli.
  • LocalExecutor. Da, još uvijek sjedimo na njemu, a već smo došli do ivice provalije. LocalExecutor nam je do sada bio dovoljan, ali sada je vrijeme da se proširimo sa barem jednim radnikom, a mi ćemo se morati potruditi da pređemo na CeleryExecutor. A s obzirom na to da s njim možete raditi na jednoj mašini, ništa vas ne sprečava da koristite Celery čak ni na serveru, koji „naravno, nikada neće ući u proizvodnju, iskreno!“
  • Neupotreba ugrađeni alati:
    • Connections pohraniti servisne akreditive,
    • SLA Misses da odgovori na zadatke koji nisu uspjeli na vrijeme,
    • xcom za razmjenu metapodataka (rekao sam metapodataka!) između dag zadataka.
  • Zloupotreba pošte. Pa, šta da kažem? Postavljena su upozorenja za sva ponavljanja palih zadataka. Sada moj poslovni Gmail ima >90 e-poruka od Airflow-a, a njuška web pošte odbija da pokupi i izbriše više od 100 odjednom.

Više zamki: Apache Airflow Pitfails

Više alata za automatizaciju

Kako bismo još više radili glavom, a ne rukama, Airflow nam je pripremio ovo:

  • REST API - i dalje ima status Eksperimentalnog, što ga ne sprečava da radi. Pomoću njega ne možete samo dobiti informacije o dagovima i zadacima, već i zaustaviti/pokrenuti dag, kreirati DAG Run ili skup.
  • CLI - mnogi alati su dostupni putem komandne linije koji nisu samo nezgodni za korištenje kroz WebUI, već su generalno odsutni. Na primjer:
    • backfill potrebno za ponovno pokretanje instanci zadatka.
      Recimo, došli su analitičari i rekli: „A ti, druže, imaš gluposti u podacima od 1. do 13. januara! Popravi, popravi, popravi, popravi!" A ti si takva ploča za kuhanje:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Osnovna usluga: initdb, resetdb, upgradedb, checkdb.
    • run, što vam omogućava da pokrenete zadatak jedne instance, pa čak i postignete rezultat na svim zavisnostima. Štaviše, možete ga pokrenuti putem LocalExecutor, čak i ako imate klaster celera.
    • Radi otprilike istu stvar test, samo iu bazama ne piše ništa.
    • connections omogućava masovno stvaranje veza iz ljuske.
  • python api - prilično hardcore način interakcije, koji je namijenjen pluginovima, a ne rojenju u njemu malim rukama. Ali ko će nas spriječiti da idemo /home/airflow/dags, trči ipython i početi se petljati? Možete, na primjer, izvesti sve veze sa sljedećim kodom:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Povezivanje sa bazom metapodataka Airflow. Ne preporučujem da pišete na njega, ali dobivanje stanja zadataka za različite specifične metrike može biti mnogo brže i lakše nego putem bilo kojeg od API-ja.

    Recimo da nisu svi naši zadaci idempotentni, ali ponekad mogu pasti, i to je normalno. Ali nekoliko blokada je već sumnjivo i bilo bi potrebno provjeriti.

    Čuvajte se SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

reference

I naravno, prvih deset linkova iz izdavanja Gugla je sadržaj foldera Airflow iz mojih bookmarka.

I linkovi korišteni u članku:

izvor: www.habr.com