Apache Airflow: Gør ETL lettere

Hej, jeg hedder Dmitry Logvinenko - dataingeniør i analyseafdelingen i Vezet-gruppen af ​​virksomheder.

Jeg vil fortælle dig om et vidunderligt værktøj til udvikling af ETL-processer - Apache Airflow. Men Airflow er så alsidigt og mangefacetteret, at du bør se nærmere på det, selvom du ikke er involveret i datastrømme, men har et behov for periodisk at starte eventuelle processer og overvåge deres eksekvering.

Og ja, jeg vil ikke kun fortælle, men også vise: programmet har en masse kode, skærmbilleder og anbefalinger.

Apache Airflow: Gør ETL lettere
Hvad du normalt ser, når du googler ordet Airflow / Wikimedia Commons

indholdsfortegnelse

Indledning

Apache Airflow er ligesom Django:

  • skrevet i python
  • der er et fantastisk admin panel,
  • kan udvides på ubestemt tid

- kun bedre, og den blev lavet til helt andre formål, nemlig (som det er skrevet før kataen):

  • køre og overvåge opgaver på et ubegrænset antal maskiner (som mange selleri / Kubernetes og din samvittighed vil tillade dig)
  • med dynamisk workflowgenerering fra meget let at skrive og forstå Python-kode
  • og muligheden for at forbinde alle databaser og API'er med hinanden ved hjælp af både færdige komponenter og hjemmelavede plugins (hvilket er ekstremt enkelt).

Vi bruger Apache Airflow sådan her:

  • vi indsamler data fra forskellige kilder (mange SQL Server- og PostgreSQL-instanser, forskellige API'er med applikationsmetrikker, endda 1C) i DWH og ODS (vi har Vertica og Clickhouse).
  • hvor avanceret cron, som starter datakonsolideringsprocesserne på ODS'en og også overvåger deres vedligeholdelse.

Indtil for nylig blev vores behov dækket af én lille server med 32 kerner og 50 GB RAM. I Airflow virker dette:

  • mere 200 dags (faktisk arbejdsgange, hvor vi fyldte opgaver),
  • i hver i gennemsnit 70 opgaver,
  • denne godhed starter (også i gennemsnit) en gang i timen.

Og om hvordan vi udvidede, vil jeg skrive nedenfor, men lad os nu definere über-problemet, som vi vil løse:

Der er tre originale SQL-servere, hver med 50 databaser - forekomster af henholdsvis et projekt, de har den samme struktur (næsten overalt, mua-ha-ha), hvilket betyder, at hver har en ordretabel (heldigvis en tabel med det navn kan skubbes ind i enhver virksomhed). Vi tager dataene ved at tilføje servicefelter (kildeserver, kildedatabase, ETL opgave-id) og smider dem naivt ind i f.eks. Vertica.

Lad os gå!

Hoveddelen, praktisk (og lidt teoretisk)

Hvorfor gør vi (og du)

Da træerne var store, og jeg var simpel SQL-schik i en russisk detailhandel snydte vi ETL-processer aka datastrømme ved hjælp af to tilgængelige værktøjer:

  • Informatica Power Center - et ekstremt spredningssystem, ekstremt produktivt, med sin egen hardware, sin egen versionering. Jeg brugte Gud forbyde 1% af dens muligheder. Hvorfor? Nå, først og fremmest satte denne grænseflade, et sted fra 380'erne, mentalt pres på os. For det andet er denne anordning designet til ekstremt smarte processer, rasende genbrug af komponenter og andre meget vigtige virksomhedstricks. Om hvad det koster, ligesom vingen på Airbus AXNUMX / år, vil vi ikke sige noget.

    Pas på, et skærmbillede kan skade folk under 30 lidt

    Apache Airflow: Gør ETL lettere

  • SQL Server Integration Server - vi brugte denne kammerat i vores interne projektstrømme. Nå, faktisk: vi bruger allerede SQL Server, og det ville på en eller anden måde være urimeligt ikke at bruge dets ETL-værktøjer. Alt i det er godt: både grænsefladen er smuk, og statusrapporterne ... Men det er ikke derfor, vi elsker softwareprodukter, åh, ikke for dette. Version det dtsx (som er XML med noder blandet på gem) kan vi, men hvad er meningen? Hvad med at lave en opgavepakke, der vil trække hundredvis af tabeller fra en server til en anden? Ja, sikke et hundrede, din pegefinger vil falde af fra tyve stykker ved at klikke på museknappen. Men det ser bestemt mere moderigtigt ud:

    Apache Airflow: Gør ETL lettere

Vi ledte bestemt efter udveje. Sag endda næsten kom til en selvskrevet SSIS-pakkegenerator ...

…og så fandt et nyt job mig. Og Apache Airflow overhalede mig på det.

Da jeg fandt ud af, at ETL-procesbeskrivelser er simpel Python-kode, dansede jeg bare ikke af glæde. Sådan blev datastrømme versioneret og adskilt, og at hælde tabeller med en enkelt struktur fra hundredvis af databaser ind i ét mål blev et spørgsmål om Python-kode på halvanden eller to 13 ”skærme.

Samling af klyngen

Lad os ikke arrangere en helt børnehave, og ikke tale om helt indlysende ting her, som at installere Airflow, din valgte database, Selleri og andre sager beskrevet i dokken.

Så vi straks kan begynde eksperimenter, skitserede jeg docker-compose.yml hvori:

  • Lad os faktisk hæve Luftmængde: Planlægger, Webserver. Blomst vil også snurre der for at overvåge selleri-opgaver (fordi den allerede er blevet skubbet ind apache/airflow:1.10.10-python3.7, men vi gider ikke)
  • PostgreSQL, hvor Airflow vil skrive sine serviceoplysninger (planlægningsdata, udførelsesstatistik osv.), og Celery vil markere afsluttede opgaver;
  • Omfor, som vil fungere som opgavemægler for Selleri;
  • Selleriarbejder, som vil være engageret i den direkte udførelse af opgaver.
  • Til mappe ./dags vi tilføjer vores filer med beskrivelsen af ​​dags. De bliver samlet op i farten, så der er ingen grund til at jonglere med hele stakken efter hvert nys.

Nogle steder er koden i eksemplerne ikke helt vist (for ikke at rode i teksten), men et eller andet sted bliver den modificeret i processen. Fuldstændige eksempler på arbejdskode kan findes i depotet https://github.com/dm-logv/airflow-tutorial.

havnearbeider-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Bemærkninger:

  • I sammensætningen af ​​kompositionen stolede jeg i høj grad på det velkendte billede pukkel/docker-luftstrøm - husk at tjekke det ud. Måske har du ikke brug for andet i dit liv.
  • Alle Airflow-indstillinger er tilgængelige ikke kun via airflow.cfg, men også gennem miljøvariabler (tak til udviklerne), som jeg ondsindet benyttede mig af.
  • Naturligvis er den ikke produktionsklar: Jeg lagde bevidst ikke hjerteslag på containere, jeg bøvlede ikke med sikkerheden. Men jeg gjorde det minimum, der var passende for vores forsøgsledere.
  • Noter det:
    • Dag-mappen skal være tilgængelig for både planlæggeren og arbejderne.
    • Det samme gælder for alle tredjepartsbiblioteker - de skal alle være installeret på maskiner med skemalægger og arbejdere.

Nå, nu er det enkelt:

$ docker-compose up --scale worker=3

Når alt er kommet op, kan du se på webgrænsefladerne:

Grundlæggende begreber

Hvis du ikke forstod noget i alle disse "dage", så er her en kort ordbog:

  • Scheduler - den vigtigste onkel i Airflow, der kontrollerer, at robotter arbejder hårdt, og ikke en person: overvåger tidsplanen, opdaterer dag, starter opgaver.

    Generelt havde han i ældre versioner problemer med hukommelsen (nej, ikke hukommelsestab, men lækager), og den gamle parameter forblev endda i konfigurationerne run_duration — dens genstartsinterval. Men nu er alt godt.

  • DAG (aka "dag") - "dirigeret acyklisk graf", men en sådan definition vil fortælle få mennesker, men faktisk er det en beholder til opgaver, der interagerer med hinanden (se nedenfor) eller en analog af Package i SSIS og Workflow i Informatica .

    Ud over dags kan der stadig være underdage, men dem når vi højst sandsynligt ikke.

  • DAG Løb - initialiseret dag, som tildeles sin egen execution_date. Dagrans af samme dag kan arbejde parallelt (hvis du har gjort dine opgaver idempotente, selvfølgelig).
  • Operatør er stykker kode, der er ansvarlige for at udføre en bestemt handling. Der er tre typer operatører:
    • Actionsom vores favorit PythonOperator, som kan udføre enhver (gyldig) Python-kode;
    • overførsel, som transporterer data fra sted til sted, f.eks. MsSqlToHiveTransfer;
    • sensor på den anden side vil det give dig mulighed for at reagere eller bremse den videre udførelse af dagen, indtil en begivenhed indtræffer. HttpSensor kan trække det angivne slutpunkt, og når det ønskede svar venter, startes overførslen GoogleCloudStorageToS3Operator. Et nysgerrigt sind vil spørge: "hvorfor? Du kan trods alt lave gentagelser direkte i operatøren!” Og så, for ikke at tilstoppe puljen af ​​opgaver med suspenderede operatører. Sensoren starter, kontrollerer og dør inden næste forsøg.
  • Opgaver - erklærede operatører, uanset type, og knyttet til dagen forfremmes til rang af opgave.
  • opgave instans - da den generelle planlægger besluttede, at det var tid til at sende opgaver i kamp på performer-arbejdere (lige på stedet, hvis vi bruger LocalExecutor eller til en fjernknude i tilfælde af CeleryExecutor), den tildeler en kontekst til dem (dvs. et sæt variabler - udførelsesparametre), udvider kommando- eller forespørgselsskabeloner og samler dem.

Vi genererer opgaver

Lad os først skitsere det generelle skema for vores doug, og så vil vi dykke ned i detaljerne mere og mere, fordi vi anvender nogle ikke-trivielle løsninger.

Så i sin enkleste form vil sådan en dag se sådan ud:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Lad os finde ud af det:

  • Først importerer vi de nødvendige libs og noget andet;
  • sql_server_ds - Er List[namedtuple[str, str]] med navnene på forbindelserne fra Airflow Connections og de databaser, som vi tager vores plade fra;
  • dag - annonceringen af ​​vores dag, som nødvendigvis skal være inde globals(), ellers finder Airflow det ikke. Doug skal også sige:
    • hvad er hans navn orders - dette navn vises derefter i webgrænsefladen,
    • at han skal arbejde fra midnat den ottende juli,
    • og den skal køre cirka hver 6. time (for hårde fyre her i stedet for timedelta() tilladt cron-linje 0 0 0/6 ? * * *, for de mindre seje - et udtryk som @daily);
  • workflow() vil gøre hovedopgaven, men ikke nu. Indtil videre dumper vi blot vores kontekst i loggen.
  • Og nu den simple magi ved at skabe opgaver:
    • vi løber gennem vores kilder;
    • initialisere PythonOperator, som vil henrette vores dummy workflow(). Glem ikke at angive et unikt (inden for dag) navn på opgaven og binde selve dagen. Flag provide_context til gengæld vil hælde yderligere argumenter ind i funktionen, som vi omhyggeligt vil indsamle ved hjælp af **context.

For nu er det alt. Hvad vi fik:

  • ny dag i webgrænsefladen,
  • halvandet hundrede opgaver, der vil blive udført parallelt (hvis Airflow, Selleri-indstillinger og serverkapacitet tillader det).

Nå, jeg fik det næsten.

Apache Airflow: Gør ETL lettere
Hvem vil installere afhængighederne?

For at forenkle det hele skruede jeg ind docker-compose.yml forarbejdning requirements.txt på alle noder.

Nu er det væk:

Apache Airflow: Gør ETL lettere

Grå firkanter er opgaveforekomster, der behandles af planlæggeren.

Vi venter lidt, opgaverne er snappet op af arbejderne:

Apache Airflow: Gør ETL lettere

De grønne har selvfølgelig fuldført deres arbejde. Røde er ikke særlig succesfulde.

Der er i øvrigt ingen folder på vores prod ./dags, der er ingen synkronisering mellem maskiner - alle dage ligger i git på vores Gitlab, og Gitlab CI distribuerer opdateringer til maskiner, når de flettes ind master.

Lidt om Blomst

Mens arbejderne tæsker vores sutter, lad os huske et andet værktøj, der kan vise os noget - Blomst.

Den allerførste side med oversigtsoplysninger om arbejderknudepunkter:

Apache Airflow: Gør ETL lettere

Den mest intense side med opgaver, der gik på arbejde:

Apache Airflow: Gør ETL lettere

Den mest kedelige side med status for vores mægler:

Apache Airflow: Gør ETL lettere

Den lyseste side er med opgavestatusgrafer og deres udførelsestid:

Apache Airflow: Gør ETL lettere

Vi læsser de underbelastede

Så alle opgaverne er løst, du kan bære de sårede væk.

Apache Airflow: Gør ETL lettere

Og der var mange sårede – af den ene eller anden grund. I tilfælde af korrekt brug af Airflow indikerer netop disse firkanter, at dataene bestemt ikke kom frem.

Du skal se loggen og genstarte de faldne opgaveforekomster.

Ved at klikke på en hvilken som helst firkant, vil vi se de handlinger, der er tilgængelige for os:

Apache Airflow: Gør ETL lettere

Du kan tage og gøre Clear the fallen. Det vil sige, vi glemmer, at noget er fejlet der, og den samme instansopgave vil gå til skemalæggeren.

Apache Airflow: Gør ETL lettere

Det er klart, at det ikke er særlig humant at gøre dette med musen med alle de røde firkanter – det er ikke det, vi forventer af Airflow. Naturligvis har vi masseødelæggelsesvåben: Browse/Task Instances

Apache Airflow: Gør ETL lettere

Lad os vælge alt på én gang og nulstille, klik på det korrekte element:

Apache Airflow: Gør ETL lettere

Efter rengøring ser vores taxaer sådan ud (de venter allerede på, at planlæggeren planlægger dem):

Apache Airflow: Gør ETL lettere

Forbindelser, kroge og andre variabler

Det er tid til at se på den næste DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Har alle nogensinde lavet en rapportopdatering? Dette er hende igen: der er en liste over kilder, hvorfra man kan hente data; der er en liste, hvor man skal sætte; glem ikke at tude, når alt skete eller gik i stykker (godt, det handler ikke om os, nej).

Lad os gennemgå filen igen og se på de nye obskure ting:

  • from commons.operators import TelegramBotSendMessage - intet forhindrer os i at lave vores egne operatører, hvilket vi udnyttede ved at lave en lille indpakning til at sende beskeder til Unblocked. (Vi vil tale mere om denne operatør nedenfor);
  • default_args={} - dag kan distribuere de samme argumenter til alle sine operatører;
  • to='{{ var.value.all_the_kings_men }}' - Mark to vi vil ikke have hårdkodet, men dynamisk genereret ved hjælp af Jinja og en variabel med en liste over e-mails, som jeg omhyggeligt satte ind Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — betingelse for at starte operatøren. I vores tilfælde vil brevet kun flyve til cheferne, hvis alle afhængigheder har fungeret succes;
  • tg_bot_conn_id='tg_main' - argumenter conn_id acceptere forbindelses-id'er, som vi opretter i Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - beskeder i Telegram vil kun flyve væk, hvis der er faldne opgaver;
  • task_concurrency=1 - vi forbyder samtidig lancering af flere opgaveforekomster af én opgave. Ellers får vi den samtidige lancering af flere VerticaOperator (ser på et bord);
  • report_update >> [email, tg] - Alle VerticaOperator konvergere i at sende breve og meddelelser, som dette:
    Apache Airflow: Gør ETL lettere

    Men da notifier-operatører har forskellige lanceringsbetingelser, vil kun én fungere. I trævisningen ser alt lidt mindre visuelt ud:
    Apache Airflow: Gør ETL lettere

Jeg vil sige et par ord om makroer og deres venner - variabler.

Makroer er Jinja-pladsholdere, der kan erstatte forskellige nyttige oplysninger med operatørargumenter. For eksempel sådan her:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} udvides til indholdet af kontekstvariablen execution_date i formatet YYYY-MM-DD: 2020-07-14. Det bedste er, at kontekstvariabler er naglet til en specifik opgaveforekomst (en firkant i trævisningen), og når de genstartes, udvides pladsholderne til de samme værdier.

De tildelte værdier kan ses ved at bruge knappen Gengivet på hver opgaveforekomst. Sådan er opgaven med at sende et brev:

Apache Airflow: Gør ETL lettere

Og så ved opgaven med at sende en besked:

Apache Airflow: Gør ETL lettere

En komplet liste over indbyggede makroer til den seneste tilgængelige version er tilgængelig her: makroreference

Desuden kan vi ved hjælp af plugins erklære vores egne makroer, men det er en anden historie.

Ud over de foruddefinerede ting kan vi erstatte værdierne af vores variabler (jeg brugte allerede dette i koden ovenfor). Lad os skabe ind Admin/Variables et par ting:

Apache Airflow: Gør ETL lettere

Alt hvad du kan bruge:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Værdien kan være en skalar, eller den kan også være JSON. I tilfælde af JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

brug bare stien til den ønskede nøgle: {{ var.json.bot_config.bot.token }}.

Jeg vil bogstaveligt talt sige et ord og vise et skærmbillede om tilslutning. Alt er elementært her: på siden Admin/Connections vi opretter en forbindelse, tilføjer vores logins/adgangskoder og mere specifikke parametre der. Sådan her:

Apache Airflow: Gør ETL lettere

Adgangskoder kan krypteres (mere grundigt end standard), eller du kan udelade forbindelsestypen (som jeg gjorde for tg_main) - faktum er, at listen over typer er fastkablet i Airflow-modeller og kan ikke udvides uden at komme ind i kildekoderne (hvis jeg pludselig ikke googlede noget, så ret mig venligst), men intet vil forhindre os i at få kreditter bare ved at navn.

Du kan også lave flere forbindelser med samme navn: i dette tilfælde metoden BaseHook.get_connection(), som får os forbindelser ved navn, vil give tilfældig fra flere navnebrødre (det ville være mere logisk at lave Round Robin, men lad os lade det være på Airflow-udviklernes samvittighed).

Variables og Connections er bestemt fede værktøjer, men det er vigtigt ikke at miste balancen: hvilke dele af dine flows du gemmer i selve koden, og hvilke dele du giver til Airflow til opbevaring. På den ene side kan det være praktisk hurtigt at ændre værdien, for eksempel en postboks, gennem brugergrænsefladen. Til gengæld er dette stadig en tilbagevenden til det museklik, som vi (jeg) gerne ville af med.

At arbejde med forbindelser er en af ​​opgaverne kroge. Generelt er Airflow-kroge punkter til at forbinde den til tredjepartstjenester og biblioteker. F.eks, JiraHook vil åbne en klient, så vi kan interagere med Jira (du kan flytte opgaver frem og tilbage), og ved hjælp af SambaHook du kan skubbe en lokal fil til smb-punkt.

Parser den tilpassede operator

Og vi kom tæt på at se på, hvordan det er lavet TelegramBotSendMessage

Kode commons/operators.py med den faktiske operatør:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Her, som alt andet i Airflow, er alt meget enkelt:

  • Nedarvet fra BaseOperator, som implementerer en del Airflow-specifikke ting (se på din fritid)
  • Deklarerede felter template_fields, hvor Jinja vil lede efter makroer at behandle.
  • Arrangeret de rigtige argumenter for __init__(), indstil standardindstillingerne, hvor det er nødvendigt.
  • Vi glemte heller ikke initialiseringen af ​​forfaderen.
  • Åbnede den tilsvarende krog TelegramBotHookmodtaget et klientobjekt fra det.
  • Tilsidesat (omdefineret) metode BaseOperator.execute(), som Airfow vil rykke, når tiden kommer til at starte operatøren - i den implementerer vi hovedhandlingen og glemmer at logge ind. (Vi logger forresten ind lige ind stdout и stderr - Luftstrømmen vil opsnappe alt, pakke det smukt ind, nedbryde det, hvor det er nødvendigt.)

Lad os se, hvad vi har commons/hooks.py. Den første del af filen, med selve krogen:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Jeg ved ikke engang, hvad jeg skal forklare her, jeg vil bare bemærke de vigtige punkter:

  • Vi arver, tænk på argumenterne - i de fleste tilfælde vil det være ét: conn_id;
  • Tilsidesættende standardmetoder: Jeg begrænsede mig selv get_conn(), hvor jeg får forbindelsesparametrene ved navn og får bare afsnittet extra (dette er et JSON-felt), hvori jeg (ifølge mine egne instruktioner!) sætter Telegram-bot-tokenet: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Jeg opretter en instans af vores TelegramBot, hvilket giver det et specifikt token.

Det er alt. Du kan få en klient fra en krog ved hjælp af TelegramBotHook().clent eller TelegramBotHook().get_conn().

Og den anden del af filen, hvor jeg laver en microwrapper til Telegram REST API, for ikke at trække det samme python-telegram-bot for én metode sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Den korrekte måde er at lægge det hele sammen: TelegramBotSendMessage, TelegramBotHook, TelegramBot - i pluginnet, læg et offentligt lager og giv det til Open Source.

Mens vi studerede alt dette, lykkedes det for vores rapportopdateringer at mislykkes og sendte mig en fejlmeddelelse i kanalen. Jeg vil tjekke om det er forkert...

Apache Airflow: Gør ETL lettere
Noget gik i stykker i vores doge! Var det ikke det, vi havde forventet? Nemlig!

Skal du hælde?

Føler du, at jeg gik glip af noget? Det ser ud til, at han lovede at overføre data fra SQL Server til Vertica, og så tog han det og flyttede fra emnet, slyngelen!

Denne grusomhed var bevidst, jeg var simpelthen nødt til at tyde noget terminologi for dig. Nu kan du gå videre.

Vores plan var denne:

  1. Gør dag
  2. Generer opgaver
  3. Se hvor smukt alt er
  4. Tildel sessionsnumre til fyld
  5. Hent data fra SQL Server
  6. Læg data ind i Vertica
  7. Saml statistik

Så for at få det hele op at køre, lavede jeg en lille tilføjelse til vores docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Der rejser vi:

  • Vertica som vært dwh med de fleste standardindstillinger,
  • tre forekomster af SQL Server,
  • vi udfylder databaserne i sidstnævnte med nogle data (under ingen omstændigheder skal du ikke se nærmere på mssql_init.py!)

Vi starter alt det gode ved hjælp af en lidt mere kompliceret kommando end sidste gang:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Hvad vores mirakel randomizer genererede, kan du bruge varen Data Profiling/Ad Hoc Query:

Apache Airflow: Gør ETL lettere
Det vigtigste er ikke at vise det til analytikere

uddybe ETL sessioner Det vil jeg ikke, alt er trivielt der: vi laver en base, der er et skilt i den, vi pakker alt ind med en kontekstmanager, og nu gør vi dette:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Tiden er inde indsamle vores data fra vores halvandet hundrede borde. Lad os gøre dette ved hjælp af meget uhøjtidelige linjer:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Ved hjælp af en krog får vi fra Airflow pymssql-Opret forbindelse
  2. Lad os erstatte en begrænsning i form af en dato i anmodningen - den vil blive kastet ind i funktionen af ​​skabelonmotoren.
  3. Foder vores anmodning pandashvem får os DataFrame - det vil være nyttigt for os i fremtiden.

Jeg bruger substitution {dt} i stedet for en anmodningsparameter %s ikke fordi jeg er en ond Pinocchio, men fordi pandas ikke kan klare pymssql og smutter det sidste params: Listselvom han virkelig gerne vil tuple.
Bemærk også, at udvikleren pymssql besluttede ikke at støtte ham mere, og det er tid til at flytte ud pyodbc.

Lad os se, hvad Airflow fyldte argumenterne for vores funktioner med:

Apache Airflow: Gør ETL lettere

Hvis der ikke er nogen data, så nytter det ikke noget at fortsætte. Men det er også mærkeligt at betragte fyldet som vellykket. Men dette er ikke en fejl. A-ah-ah, hvad skal man gøre?! Og her er hvad:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException vil fortælle Airflow, at der ikke er nogen fejl, men vi springer opgaven over. Grænsefladen vil ikke have en grøn eller rød firkant, men lyserød.

Lad os smide vores data flere kolonner:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Nemlig

  • Databasen, hvorfra vi tog ordrerne,
  • ID for vores oversvømmelsessession (det vil være anderledes til enhver opgave),
  • En hash fra kilden og ordre-id - så vi i den endelige database (hvor alt er hældt i én tabel) har et unikt ordre-id.

Næstsidste trin forbliver: hæld alt i Vertica. Og mærkeligt nok er en af ​​de mest spektakulære og effektive måder at gøre dette på gennem CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Vi laver en speciel modtager StringIO.
  2. pandas vil venligst sætte vores DataFrame som CSV-linjer.
  3. Lad os åbne en forbindelse til vores favorit Vertica med en krog.
  4. Og nu med hjælpen copy() send vores data direkte til Vertika!

Vi tager fra chaufføren, hvor mange linjer der var fyldt op, og fortæller sessionslederen, at alt er OK:

session.loaded_rows = cursor.rowcount
session.successful = True

Det er alt.

På udsalget opretter vi målpladen manuelt. Her tillod jeg mig selv en lille maskine:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

jeg bruger VerticaOperator() Jeg opretter et databaseskema og en tabel (hvis de ikke allerede eksisterer, selvfølgelig). Det vigtigste er at arrangere afhængighederne korrekt:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Opsummering

- Nå, - sagde den lille mus, - er det nu ikke
Er du overbevist om, at jeg er det mest forfærdelige dyr i skoven?

Julia Donaldson, The Gruffalo

Jeg tror, ​​hvis mine kolleger og jeg havde en konkurrence: hvem vil hurtigt skabe og lancere en ETL-proces fra bunden: de med deres SSIS og en mus og mig med Airflow ... Og så ville vi også sammenligne den nemme vedligeholdelse ... Wow, jeg tror, ​​du vil være enig i, at jeg vil slå dem på alle fronter!

Hvis det er lidt mere seriøst, så gjorde Apache Airflow - ved at beskrive processer i form af programkode - mit arbejde mere mere behageligt og behageligt.

Dens ubegrænsede udvidelsesmuligheder, både med hensyn til plug-ins og tilbøjelighed til skalerbarhed, giver dig mulighed for at bruge Airflow i næsten ethvert område: selv i den fulde cyklus med indsamling, forberedelse og behandling af data, selv ved opsendelse af raketter (til Mars, eller Rute).

Delfinale, reference og information

Raken har vi samlet til dig

  • start_date. Ja, dette er allerede et lokalt meme. Via Dougs hovedargument start_date alle bestå. Kort sagt, hvis du angiver i start_date nuværende dato, og schedule_interval - en dag, så starter DAG tidligst i morgen.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Og ikke flere problemer.

    Der er en anden runtime fejl forbundet med det: Task is missing the start_date parameter, hvilket oftest indikerer, at du har glemt at binde til dag-operatøren.

  • Alt sammen på én maskine. Ja, og baser (selve luftstrømmen og vores belægning), og en webserver og en planlægger og arbejdere. Og det virkede endda. Men med tiden voksede antallet af opgaver til tjenester, og da PostgreSQL begyndte at reagere på indekset på 20 s i stedet for 5 ms, tog vi det og bar det væk.
  • LocalExecutor. Ja, vi sidder stadig på den, og vi er allerede nået til kanten af ​​afgrunden. LocalExecutor har været nok for os indtil videre, men nu er det tid til at udvide med mindst én arbejder, og vi bliver nødt til at arbejde hårdt for at flytte til CeleryExecutor. Og i lyset af det faktum, at du kan arbejde med det på én maskine, forhindrer intet dig i at bruge Selleri, selv på en server, som "naturligvis aldrig vil gå i produktion, ærligt talt!"
  • Ikke-brug indbyggede værktøjer:
    • Tilslutninger at gemme serviceoplysninger,
    • SLA frøkener at reagere på opgaver, der ikke lykkedes til tiden,
    • XCom til metadataudveksling (sagde jeg metadata!) mellem dag opgaver.
  • Misbrug af mail. Hvad kan jeg sige? Der blev oprettet alarmer for alle gentagelser af faldne opgaver. Nu har mit arbejde Gmail >90 e-mails fra Airflow, og webmail-mundkurven nægter at hente og slette mere end 100 ad gangen.

Flere faldgruber: Apache Airflow Pitfails

Flere automatiseringsværktøjer

For at vi kan arbejde endnu mere med hovedet og ikke med hænderne, har Airflow forberedt os dette:

  • REST-API - han har stadig status som Eksperimentel, hvilket ikke forhindrer ham i at arbejde. Med den kan du ikke kun få information om dags og opgaver, men også stoppe/starte en dag, oprette et DAG Run eller en pulje.
  • CLI - Mange værktøjer er tilgængelige via kommandolinjen, som ikke bare er ubelejlige at bruge gennem WebUI, men som generelt er fraværende. For eksempel:
    • backfill nødvendig for at genstarte opgaveforekomster.
      For eksempel kom analytikere og sagde: ”Og du, kammerat, har noget sludder i dataene fra 1. til 13. januar! Reparer det, fix det, fix det, fix det!" Og du er sådan en kogeplade:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Basisservice: initdb, resetdb, upgradedb, checkdb.
    • run, som giver dig mulighed for at køre én instansopgave og endda score på alle afhængigheder. Desuden kan du køre det via LocalExecutor, også selvom du har en Selleriklynge.
    • Gør stort set det samme test, kun også i baser skriver intet.
    • connections tillader masseoprettelse af forbindelser fra skallen.
  • python api - en ret hardcore måde at interagere på, som er beregnet til plugins, og ikke myldrer i det med små hænder. Men hvem skal forhindre os i at gå til /home/airflow/dags, løb ipython og begynde at rode rundt? Du kan f.eks. eksportere alle forbindelser med følgende kode:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Tilslutning til Airflow-metadatabasen. Jeg anbefaler ikke at skrive til det, men at få opgavetilstande for forskellige specifikke metrics kan være meget hurtigere og nemmere end at bruge nogen af ​​API'erne.

    Lad os sige, at ikke alle vores opgaver er idempotente, men de kan nogle gange falde, og det er normalt. Men et par blokeringer er allerede mistænkelige, og det ville være nødvendigt at tjekke.

    Pas på SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

RЎSЃS <P "RєRё

Og selvfølgelig er de første ti links fra udstedelsen af ​​Google indholdet af Airflow-mappen fra mine bogmærker.

Og links brugt i artiklen:

Kilde: www.habr.com