Apache Airflow: Gjør ETL enklere

Hei, jeg heter Dmitry Logvinenko - dataingeniør ved analyseavdelingen i Vezet-gruppen av selskaper.

Jeg vil fortelle deg om et fantastisk verktøy for å utvikle ETL-prosesser - Apache Airflow. Men Airflow er så allsidig og mangefasettert at du bør se nærmere på det selv om du ikke er involvert i dataflyter, men har behov for med jevne mellomrom å starte eventuelle prosesser og overvåke utførelsen av dem.

Og ja, jeg vil ikke bare fortelle, men også vise: programmet har mye kode, skjermbilder og anbefalinger.

Apache Airflow: Gjør ETL enklere
Det du vanligvis ser når du googler ordet Airflow / Wikimedia Commons

innholdsfortegnelsen

Innledning

Apache Airflow er akkurat som Django:

  • skrevet i python
  • det er et flott administrasjonspanel,
  • kan utvides på ubestemt tid

- bare bedre, og den ble laget for helt andre formål, nemlig (som det er skrevet før kat):

  • kjøre og overvåke oppgaver på et ubegrenset antall maskiner (som mange selleri / Kubernetes og din samvittighet vil tillate deg)
  • med dynamisk arbeidsflytgenerering fra svært enkel å skrive og forstå Python-kode
  • og muligheten til å koble alle databaser og APIer med hverandre ved hjelp av både ferdige komponenter og hjemmelagde plugins (noe som er ekstremt enkelt).

Vi bruker Apache Airflow slik:

  • vi samler inn data fra ulike kilder (mange SQL Server- og PostgreSQL-instanser, ulike APIer med applikasjonsberegninger, til og med 1C) i DWH og ODS (vi har Vertica og Clickhouse).
  • hvor avansert cron, som starter datakonsolideringsprosessene på ODS, og også overvåker vedlikeholdet av dem.

Inntil nylig ble våre behov dekket av en liten server med 32 kjerner og 50 GB RAM. I Airflow fungerer dette:

  • mer 200 dags (egentlig arbeidsflyter, der vi fylte oppgaver),
  • i hver i gjennomsnitt 70 oppgaver,
  • denne godheten starter (også i gjennomsnitt) en gang i timen.

Og om hvordan vi utvidet, vil jeg skrive nedenfor, men la oss nå definere über-problemet som vi skal løse:

Det er tre originale SQL-servere, hver med 50 databaser - forekomster av henholdsvis ett prosjekt, de har samme struktur (nesten overalt, mua-ha-ha), noe som betyr at hver har en ordretabell (heldigvis en tabell med det) navn kan skyves inn i enhver virksomhet). Vi tar dataene ved å legge til tjenestefelt (kildeserver, kildedatabase, ETL-oppgave-ID) og kaster dem naivt inn i for eksempel Vertica.

La oss gå!

Hoveddelen, praktisk (og litt teoretisk)

Hvorfor gjør vi (og du)

Da trærne var store og jeg var enkel SQL-schik i en russisk detaljhandel, vi svindlet ETL-prosesser aka dataflyter ved å bruke to verktøy tilgjengelig for oss:

  • Informatica Power Center - et ekstremt spredningssystem, ekstremt produktivt, med egen maskinvare, egen versjonering. Jeg brukte Gud forby 1% av dens evner. Hvorfor? Vel, først av alt, dette grensesnittet, et sted fra 380-tallet, satte mentalt press på oss. For det andre er denne innretningen designet for ekstremt fancy prosesser, rasende gjenbruk av komponenter og andre veldig viktige bedriftstriks. Om det faktum at det koster, som vingen til Airbus AXNUMX / år, vil vi ikke si noe.

    Pass på, et skjermbilde kan skade folk under 30 litt

    Apache Airflow: Gjør ETL enklere

  • SQL Server Integration Server – Vi brukte denne kameraten i våre interne prosjektflyter. Vel, faktisk: vi bruker allerede SQL Server, og det ville på en eller annen måte være urimelig å ikke bruke ETL-verktøyene. Alt i den er bra: både grensesnittet er vakkert, og fremdriftsrapportene ... Men det er ikke derfor vi elsker programvareprodukter, åh, ikke for dette. Versjon den dtsx (som er XML med noder blandet på lagre) vi kan, men hva er vitsen? Hva med å lage en oppgavepakke som vil dra hundrevis av tabeller fra en server til en annen? Ja, hva hundre, pekefingeren vil falle av fra tjue stykker ved å klikke på museknappen. Men det ser definitivt mer fasjonabelt ut:

    Apache Airflow: Gjør ETL enklere

Vi så absolutt etter utveier. Til og med nesten kom til en selvskrevet SSIS-pakkegenerator ...

…og så fant en ny jobb meg. Og Apache Airflow overtok meg på den.

Da jeg fant ut at ETL-prosessbeskrivelser er enkel Python-kode, danset jeg bare ikke av glede. Dette er hvordan datastrømmer ble versjonert og forskjellig, og å helle tabeller med en enkelt struktur fra hundrevis av databaser inn i ett mål ble et spørsmål om Python-kode på halvannen eller to 13-tommers skjermer.

Montering av klyngen

La oss ikke ordne en helt barnehage, og ikke snakke om helt åpenbare ting her, som å installere Airflow, din valgte database, Selleri og andre tilfeller beskrevet i dokkene.

Slik at vi umiddelbart kan begynne eksperimenter, skisserte jeg docker-compose.yml der:

  • La oss faktisk heve Luftstrøm: Planlegger, webserver. Blomst vil også snurre der for å overvåke sellerioppgaver (fordi den allerede er presset inn apache/airflow:1.10.10-python3.7, men vi har ikke noe imot)
  • PostgreSQL, der Airflow vil skrive tjenesteinformasjonen sin (planleggerdata, utførelsesstatistikk osv.), og Celery vil merke fullførte oppgaver;
  • Redis, som vil fungere som en oppgavemegler for selleri;
  • Selleriarbeider, som vil være engasjert i direkte utførelse av oppgaver.
  • Til mappe ./dags vi vil legge til filene våre med beskrivelsen av dags. De vil bli plukket opp i farten, så det er ikke nødvendig å sjonglere hele stabelen etter hvert nys.

Noen steder er koden i eksemplene ikke fullstendig vist (for ikke å rote opp teksten), men et sted blir den modifisert i prosessen. Komplette eksempler på arbeidskode finner du i depotet https://github.com/dm-logv/airflow-tutorial.

Docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Merknader:

  • I sammenstillingen av komposisjonen stolte jeg i stor grad på det velkjente bildet pukkel/docker-luftstrøm - sørg for å sjekke det ut. Kanskje du ikke trenger noe annet i livet ditt.
  • Alle luftstrøminnstillinger er tilgjengelige ikke bare gjennom airflow.cfg, men også gjennom miljøvariabler (takk til utviklerne), som jeg ondsinnet utnyttet.
  • Naturligvis er den ikke produksjonsklar: Jeg satte bevisst ikke hjerteslag på containere, jeg brydde meg ikke om sikkerhet. Men jeg gjorde det minste egnet for våre eksperimentører.
  • Noter det:
    • Dag-mappen må være tilgjengelig for både planleggeren og arbeiderne.
    • Det samme gjelder alle tredjepartsbiblioteker - de må alle installeres på maskiner med planlegger og arbeidere.

Vel, nå er det enkelt:

$ docker-compose up --scale worker=3

Etter at alt har hevet seg, kan du se på nettgrensesnittene:

Grunnleggende begreper

Hvis du ikke forsto noe i alle disse "dagene", så her er en kort ordbok:

  • Scheduler - den viktigste onkelen i Airflow, som kontrollerer at roboter jobber hardt, og ikke en person: overvåker timeplanen, oppdaterer dag, lanserer oppgaver.

    Generelt, i eldre versjoner, hadde han problemer med hukommelsen (nei, ikke hukommelsestap, men lekkasjer) og den gamle parameteren forble til og med i konfigurasjonene run_duration – omstartsintervallet. Men nå er alt bra.

  • DAG (aka "dag") - "rettet asyklisk graf", men en slik definisjon vil fortelle få mennesker, men faktisk er det en beholder for oppgaver som samhandler med hverandre (se nedenfor) eller en analog av Package in SSIS og Workflow i Informatica .

    I tillegg til dags kan det fortsatt være underdager, men vi kommer mest sannsynlig ikke til dem.

  • DAG Løp - initialisert dag, som er tildelt sin egen execution_date. Dagrans av samme dag kan fungere parallelt (hvis du har gjort oppgavene dine idempotente, selvfølgelig).
  • operatør er kodebiter som er ansvarlige for å utføre en spesifikk handling. Det er tre typer operatører:
    • handlingsom vår favoritt PythonOperator, som kan kjøre hvilken som helst (gyldig) Python-kode;
    • overføre, som transporterer data fra sted til sted, for eksempel, MsSqlToHiveTransfer;
    • sensor på den annen side vil det tillate deg å reagere eller bremse den videre utførelsen av dagen til en hendelse inntreffer. HttpSensor kan trekke det angitte endepunktet, og når ønsket respons venter, start overføringen GoogleCloudStorageToS3Operator. Et nysgjerrig sinn vil spørre: "hvorfor? Tross alt kan du gjøre repetisjoner rett i operatøren!» Og så, for ikke å tette bassenget av oppgaver med suspenderte operatører. Sensoren starter, sjekker og dør før neste forsøk.
  • Oppgave - erklærte operatører, uavhengig av type, og knyttet til dagen forfremmes til rang av oppgave.
  • oppgaveforekomst - da generalplanleggeren bestemte at det var på tide å sende oppgaver i kamp på utøvere-arbeidere (rett på stedet, hvis vi bruker LocalExecutor eller til en ekstern node i tilfelle CeleryExecutor), den tildeler en kontekst til dem (dvs. et sett med variabler - utførelsesparametere), utvider kommando- eller spørringsmaler og samler dem.

Vi genererer oppgaver

Først, la oss skissere det generelle opplegget til dougen vår, og så vil vi dykke inn i detaljene mer og mer, fordi vi bruker noen ikke-trivielle løsninger.

Så, i sin enkleste form, vil en slik dag se slik ut:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

La oss finne ut av det:

  • Først importerer vi de nødvendige libs og noe annet;
  • sql_server_ds - Er List[namedtuple[str, str]] med navnene på tilkoblingene fra Airflow Connections og databasene som vi tar tallerkenen vår fra;
  • dag - kunngjøringen av vår dag, som nødvendigvis må være inne globals(), ellers finner ikke Airflow det. Doug må også si:
    • hva heter han orders - dette navnet vil da vises i nettgrensesnittet,
    • at han skal jobbe fra midnatt den åttende juli,
    • og den skal kjøre, omtrent hver 6. time (for tøffe gutter her i stedet for timedelta() tillatelig cron-linje 0 0 0/6 ? * * *, for de mindre kule - et uttrykk som @daily);
  • workflow() vil gjøre hovedjobben, men ikke nå. Foreløpig vil vi bare dumpe konteksten vår inn i loggen.
  • Og nå den enkle magien med å lage oppgaver:
    • vi går gjennom kildene våre;
    • initialisere PythonOperator, som vil henrette dummyen vår workflow(). Ikke glem å spesifisere et unikt (innen dag) navn på oppgaven og knytte selve dagen. Flagg provide_context på sin side vil helle ytterligere argumenter inn i funksjonen, som vi nøye samler inn ved hjelp av **context.

For nå er det alt. Hva vi fikk:

  • ny dag i webgrensesnittet,
  • ett og et halvt hundre oppgaver som vil bli utført parallelt (hvis Airflow, Selleri-innstillingene og serverkapasiteten tillater det).

Vel, fikk det nesten.

Apache Airflow: Gjør ETL enklere
Hvem skal installere avhengighetene?

For å forenkle hele greia, skrudde jeg inn docker-compose.yml behandling requirements.txt på alle noder.

Nå er det borte:

Apache Airflow: Gjør ETL enklere

Grå firkanter er oppgaveforekomster som behandles av planleggeren.

Vi venter litt, oppgavene snappes opp av arbeiderne:

Apache Airflow: Gjør ETL enklere

De grønne har selvfølgelig fullført arbeidet sitt. Røde er ikke særlig vellykkede.

Det er forresten ingen mappe på vår prod ./dags, det er ingen synkronisering mellom maskiner - alle dager ligger i git på vår Gitlab, og Gitlab CI distribuerer oppdateringer til maskiner ved sammenslåing master.

Litt om Blomst

Mens arbeiderne banker på smokkene våre, la oss huske et annet verktøy som kan vise oss noe – Flower.

Den aller første siden med sammendragsinformasjon om arbeidernoder:

Apache Airflow: Gjør ETL enklere

Den mest intense siden med oppgaver som gikk på jobb:

Apache Airflow: Gjør ETL enklere

Den kjedeligste siden med statusen til megleren vår:

Apache Airflow: Gjør ETL enklere

Den lyseste siden er med oppgavestatusgrafer og deres utførelsestid:

Apache Airflow: Gjør ETL enklere

Vi laster underbelastet

Så, alle oppgavene har ordnet seg, du kan bære bort de sårede.

Apache Airflow: Gjør ETL enklere

Og det var mange sårede – av en eller annen grunn. Ved riktig bruk av Airflow indikerer nettopp disse firkantene at dataene definitivt ikke kom frem.

Du må se på loggen og starte de falt oppgaveforekomstene på nytt.

Ved å klikke på hvilken som helst rute, vil vi se handlingene som er tilgjengelige for oss:

Apache Airflow: Gjør ETL enklere

Du kan ta og gjøre Clear the fallen. Det vil si at vi glemmer at noe har feilet der, og den samme instansoppgaven vil gå til planleggeren.

Apache Airflow: Gjør ETL enklere

Det er tydelig at det ikke er særlig humant å gjøre dette med musen med alle de røde rutene – det er ikke det vi forventer av Airflow. Naturligvis har vi masseødeleggelsesvåpen: Browse/Task Instances

Apache Airflow: Gjør ETL enklere

La oss velge alt på en gang og tilbakestille til null, klikk på riktig element:

Apache Airflow: Gjør ETL enklere

Etter rengjøring ser taxiene våre slik ut (de venter allerede på at planleggeren skal planlegge dem):

Apache Airflow: Gjør ETL enklere

Tilkoblinger, kroker og andre variabler

Det er på tide å se på neste DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Har alle noen gang gjort en rapportoppdatering? Dette er henne igjen: det er en liste over kilder hvor du kan hente dataene; det er en liste hvor du skal plassere; ikke glem å tute når alt skjedde eller gikk i stykker (vel, dette handler ikke om oss, nei).

La oss gå gjennom filen igjen og se på de nye obskure tingene:

  • from commons.operators import TelegramBotSendMessage – ingenting hindrer oss i å lage våre egne operatører, noe vi utnyttet ved å lage en liten innpakning for å sende meldinger til Unblocked. (Vi vil snakke mer om denne operatøren nedenfor);
  • default_args={} - dag kan distribuere de samme argumentene til alle sine operatører;
  • to='{{ var.value.all_the_kings_men }}' - felt to vi vil ikke ha hardkodet, men dynamisk generert ved hjelp av Jinja og en variabel med en liste over e-poster, som jeg forsiktig legger inn Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — betingelse for å starte operatøren. I vårt tilfelle vil brevet bare fly til sjefene hvis alle avhengigheter har løst seg vellykket;
  • tg_bot_conn_id='tg_main' - argumenter conn_id godta tilkoblings-IDer som vi oppretter i Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - meldinger i Telegram vil fly bort bare hvis det er falt oppgaver;
  • task_concurrency=1 - Vi forbyr samtidig lansering av flere oppgaveforekomster av én oppgave. Ellers vil vi få samtidig lansering av flere VerticaOperator (ser på ett bord);
  • report_update >> [email, tg] - alt VerticaOperator konvergerer i å sende brev og meldinger, som dette:
    Apache Airflow: Gjør ETL enklere

    Men siden varsleroperatører har forskjellige oppskytingsbetingelser, vil bare én fungere. I trevisningen ser alt litt mindre visuelt ut:
    Apache Airflow: Gjør ETL enklere

Jeg vil si noen ord om makroer og deres venner - variabler.

Makroer er Jinja-plassholdere som kan erstatte forskjellig nyttig informasjon med operatørargumenter. For eksempel slik:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} utvides til innholdet i kontekstvariabelen execution_date i format YYYY-MM-DD: 2020-07-14. Det beste er at kontekstvariablene er spikret til en spesifikk oppgaveforekomst (en firkant i trevisningen), og når de startes på nytt, vil plassholderne utvides til de samme verdiene.

De tildelte verdiene kan sees ved å bruke Gjengitt-knappen på hver oppgaveforekomst. Slik er oppgaven med å sende et brev:

Apache Airflow: Gjør ETL enklere

Og så ved oppgaven med å sende en melding:

Apache Airflow: Gjør ETL enklere

En komplett liste over innebygde makroer for den siste tilgjengelige versjonen er tilgjengelig her: makroreferanse

Dessuten, ved hjelp av plugins, kan vi deklarere våre egne makroer, men det er en annen historie.

I tillegg til de forhåndsdefinerte tingene, kan vi erstatte verdiene til variablene våre (jeg brukte allerede dette i koden ovenfor). La oss skape inn Admin/Variables et par ting:

Apache Airflow: Gjør ETL enklere

Alt du kan bruke:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Verdien kan være en skalar, eller den kan også være JSON. I tilfelle av JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

bare bruk banen til ønsket nøkkel: {{ var.json.bot_config.bot.token }}.

Jeg vil bokstavelig talt si ett ord og vise ett skjermbilde om forbindelse. Alt er elementært her: på siden Admin/Connections vi oppretter en tilkobling, legger til våre pålogginger/passord og mer spesifikke parametere der. Som dette:

Apache Airflow: Gjør ETL enklere

Passord kan krypteres (mer grundig enn standard), eller du kan utelate tilkoblingstypen (som jeg gjorde for tg_main) - faktum er at listen over typer er fastkoblet i Airflow-modeller og kan ikke utvides uten å komme inn i kildekodene (hvis jeg plutselig ikke googlet noe, vennligst korriger meg), men ingenting vil stoppe oss fra å få kreditter bare ved å Navn.

Du kan også lage flere forbindelser med samme navn: i dette tilfellet metoden BaseHook.get_connection(), som får oss forbindelser ved navn, vil gi tilfeldig fra flere navnebrødre (det ville vært mer logisk å lage Round Robin, men la oss la det ligge på Airflow-utviklernes samvittighet).

Variables and Connections er absolutt kule verktøy, men det er viktig å ikke miste balansen: hvilke deler av strømmene dine lagrer du i selve koden, og hvilke deler du gir til Airflow for lagring. På den ene siden kan det være praktisk å raskt endre verdien, for eksempel en postboks, gjennom brukergrensesnittet. På den annen side er dette fortsatt en tilbakevending til museklikket, som vi (jeg) ønsket å bli kvitt.

Å jobbe med koblinger er en av oppgavene kroker. Generelt er Airflow-kroker punkter for å koble den til tredjepartstjenester og biblioteker. f.eks. JiraHook vil åpne en klient for oss å samhandle med Jira (du kan flytte oppgaver frem og tilbake), og ved hjelp av SambaHook du kan skyve en lokal fil til smb-punkt.

Parser den tilpassede operatoren

Og vi kom nærme på å se på hvordan den er laget TelegramBotSendMessage

Kode commons/operators.py med den faktiske operatøren:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Her, som alt annet i Airflow, er alt veldig enkelt:

  • Arvet fra BaseOperator, som implementerer ganske mange Airflow-spesifikke ting (se på fritiden din)
  • Deklarerte felt template_fields, der Jinja vil se etter makroer å behandle.
  • Ordnet de riktige argumentene for __init__(), angi standardinnstillingene der det er nødvendig.
  • Vi glemte heller ikke initialiseringen av stamfaren.
  • Åpnet den tilsvarende kroken TelegramBotHookmottatt et klientobjekt fra den.
  • Overstyrt (redefinert) metode BaseOperator.execute(), som Airfow vil rykke når tiden kommer for å starte operatøren - i den vil vi implementere hovedhandlingen, glemme å logge på. (Vi logger forresten inn rett inn stdout и stderr - Luftstrømmen vil fange opp alt, pakke det vakkert inn, bryte det ned der det er nødvendig.)

La oss se hva vi har commons/hooks.py. Den første delen av filen, med selve kroken:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Jeg vet ikke engang hva jeg skal forklare her, jeg vil bare merke meg de viktige punktene:

  • Vi arver, tenk på argumentene - i de fleste tilfeller vil det være ett: conn_id;
  • Overordnede standardmetoder: Jeg begrenset meg selv get_conn(), der jeg får tilkoblingsparameterne etter navn og bare får delen extra (dette er et JSON-felt), der jeg (i henhold til mine egne instruksjoner!) legger inn Telegram bot-token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Jeg lager en forekomst av vår TelegramBot, og gir den et spesifikt token.

Det er alt. Du kan få en klient fra en krok ved å bruke TelegramBotHook().clent eller TelegramBotHook().get_conn().

Og den andre delen av filen, der jeg lager en microwrapper for Telegram REST API, for ikke å dra det samme python-telegram-bot for én metode sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Den riktige måten er å legge alt sammen: TelegramBotSendMessage, TelegramBotHook, TelegramBot - i plugin, legg i et offentlig depot, og gi det til åpen kildekode.

Mens vi studerte alt dette, klarte rapportoppdateringene våre å mislykkes og sende meg en feilmelding i kanalen. Jeg skal sjekke om det er feil...

Apache Airflow: Gjør ETL enklere
Noe brakk i hunden vår! Var det ikke det vi ventet? Nøyaktig!

Skal du helle?

Føler du at jeg har gått glipp av noe? Det ser ut til at han lovet å overføre data fra SQL Server til Vertica, og så tok han det og flyttet fra temaet, skurken!

Denne grusomheten var med vilje, jeg måtte rett og slett tyde litt terminologi for deg. Nå kan du gå videre.

Planen vår var denne:

  1. Gjør dag
  2. Generer oppgaver
  3. Se hvor vakkert alt er
  4. Tilordne øktnummer til fyll
  5. Få data fra SQL Server
  6. Sett data inn i Vertica
  7. Samle statistikk

Så for å få alt dette i gang, laget jeg et lite tillegg til vår docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Der tar vi opp:

  • Vertica som vert dwh med de fleste standardinnstillingene,
  • tre forekomster av SQL Server,
  • vi fyller databasene i sistnevnte med noen data (ikke i noe tilfelle ikke se nærmere på mssql_init.py!)

Vi lanserer alt det gode ved hjelp av en litt mer komplisert kommando enn forrige gang:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Hva vår mirakel randomizer genererte, kan du bruke elementet Data Profiling/Ad Hoc Query:

Apache Airflow: Gjør ETL enklere
Det viktigste er ikke å vise det til analytikere

utdype om ETL økter Jeg vil ikke, alt er trivielt der: vi lager en base, det er et skilt i den, vi pakker alt inn med en kontekstleder, og nå gjør vi dette:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Tiden har kommet samle inn våre data fra våre ett og et halvt hundre bord. La oss gjøre dette ved hjelp av veldig upretensiøse linjer:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Ved hjelp av en krok får vi fra Airflow pymssql-koble
  2. La oss erstatte en begrensning i form av en dato i forespørselen - den vil bli kastet inn i funksjonen av malmotoren.
  3. Mater forespørselen vår pandashvem skal få oss DataFrame – det vil være nyttig for oss i fremtiden.

Jeg bruker substitusjon {dt} i stedet for en forespørselsparameter %s ikke fordi jeg er en ond Pinocchio, men fordi pandas kan ikke håndtere pymssql og glir den siste params: Listselv om han virkelig vil tuple.
Merk også at utvikleren pymssql bestemte seg for ikke å støtte ham lenger, og det er på tide å flytte ut pyodbc.

La oss se hva Airflow fylte argumentene for funksjonene våre med:

Apache Airflow: Gjør ETL enklere

Hvis det ikke finnes data, er det ingen vits i å fortsette. Men det er også rart å anse fyllingen som vellykket. Men dette er ikke en feil. A-ah-ah, hva skal man gjøre?! Og her er hva:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException forteller Airflow at det ikke er noen feil, men vi hopper over oppgaven. Grensesnittet vil ikke ha en grønn eller rød firkant, men rosa.

La oss kaste dataene våre flere kolonner:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

nemlig

  • Databasen som vi tok bestillingene fra,
  • ID for flomøkten vår (den vil være annerledes for hver oppgave),
  • En hash fra kilden og ordre-ID - slik at vi i den endelige databasen (der alt er hellet inn i en tabell) har en unik ordre-ID.

Det nest siste trinnet gjenstår: hell alt i Vertica. Og merkelig nok er en av de mest spektakulære og effektive måtene å gjøre dette på gjennom CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Vi lager en spesiell mottaker StringIO.
  2. pandas vil vennligst sette vår DataFrame i formen CSV-linjer.
  3. La oss åpne en forbindelse til vår favoritt Vertica med en krok.
  4. Og nå med hjelp copy() send våre data direkte til Vertika!

Vi tar fra sjåføren hvor mange linjer som ble fylt opp, og forteller sesjonslederen at alt er OK:

session.loaded_rows = cursor.rowcount
session.successful = True

Det er alt.

På salget lager vi målplaten manuelt. Her tillot jeg meg selv en liten maskin:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

jeg bruker VerticaOperator() Jeg lager et databaseskjema og en tabell (hvis de ikke allerede eksisterer, selvfølgelig). Det viktigste er å ordne avhengighetene riktig:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Oppsummering

- Vel, - sa den lille musa, - ikke sant, nå
Er du overbevist om at jeg er det mest forferdelige dyret i skogen?

Julia Donaldson, The Gruffalo

Jeg tror at hvis kollegene mine og jeg hadde en konkurranse: hvem vil raskt lage og starte en ETL-prosess fra bunnen av: de med SSIS og en mus og meg med Airflow ... Og så ville vi også sammenlignet vedlikeholdsvennligheten ... Wow, jeg tror du vil være enig i at jeg skal slå dem på alle fronter!

Om litt mer seriøst, så gjorde Apache Airflow - ved å beskrive prosesser i form av programkode - jobben min mye mer behagelig og hyggelig.

Dens ubegrensede utvidbarhet, både når det gjelder plug-ins og disposisjon for skalerbarhet, gir deg muligheten til å bruke Airflow i nesten alle områder: selv i hele syklusen med innsamling, forberedelse og behandling av data, selv ved oppskyting av raketter (til Mars, eller kurs).

Delfinale, referanse og informasjon

Raken har vi samlet inn til deg

  • start_date. Ja, dette er allerede et lokalt meme. Via Dougs hovedargument start_date alle passerer. Kort sagt, hvis du spesifiserer i start_date gjeldende dato, og schedule_interval – en dag, så starter DAG i morgen tidligst.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Og ingen flere problemer.

    Det er en annen kjøretidsfeil knyttet til den: Task is missing the start_date parameter, som oftest indikerer at du har glemt å binde til dag-operatøren.

  • Alt på én maskin. Ja, og baser (selve luftstrømmen og belegget vårt), og en webserver, og en planlegger, og arbeidere. Og det fungerte til og med. Men over tid vokste antallet oppgaver for tjenester, og da PostgreSQL begynte å svare på indeksen på 20 s i stedet for 5 ms, tok vi den og bar den bort.
  • LocalExecutor. Ja, vi sitter fortsatt på den, og vi har allerede kommet til kanten av avgrunnen. LocalExecutor har vært nok for oss så langt, men nå er det på tide å utvide med minst én arbeider, og vi må jobbe hardt for å flytte til CeleryExecutor. Og i lys av det faktum at du kan jobbe med det på én maskin, er det ingenting som hindrer deg i å bruke Selleri selv på en server, som "selvfølgelig aldri vil gå i produksjon, ærlig talt!"
  • Ikke-bruk innebygde verktøy:
    • Tilkoblinger å lagre tjenestelegitimasjon,
    • SLA frøkener å svare på oppgaver som ikke fungerte i tide,
    • xcom for metadatautveksling (sa jeg metadata!) mellom dagoppgaver.
  • Misbruk av post. Vel, hva kan jeg si? Det ble satt opp varsler for alle repetisjoner av falt oppgaver. Nå har Gmail på jobben mer enn 90 100 e-poster fra Airflow, og nettposten nekter å hente og slette mer enn XNUMX om gangen.

Flere fallgruver: Apache Airflow Pitfails

Flere automatiseringsverktøy

For at vi skal jobbe enda mer med hodet og ikke med hendene, har Airflow forberedt dette for oss:

  • REST API – han har fortsatt status som Eksperimentell, noe som ikke hindrer ham i å jobbe. Med den kan du ikke bare få informasjon om dags og oppgaver, men også stoppe/starte en dag, lage et DAG Run eller et basseng.
  • CLI - Mange verktøy er tilgjengelige via kommandolinjen som ikke bare er upraktiske å bruke gjennom WebUI, men som generelt er fraværende. For eksempel:
    • backfill nødvendig for å starte oppgaveforekomster på nytt.
      For eksempel kom analytikere og sa: «Og du, kamerat, har tull i dataene fra 1. til 13. januar! Fiks det, fiks det, fiks det, fiks det!" Og du er en slik kokeplate:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Grunntjeneste: initdb, resetdb, upgradedb, checkdb.
    • run, som lar deg kjøre én forekomstoppgave, og til og med score på alle avhengigheter. Dessuten kan du kjøre den via LocalExecutor, selv om du har en selleriklynge.
    • Gjør stort sett det samme test, bare også i baser skriver ingenting.
    • connections tillater masseoppretting av forbindelser fra skallet.
  • python api - en ganske hardcore måte å samhandle på, som er beregnet for plugins, og ikke sverme i den med små hender. Men hvem skal hindre oss i å gå til /home/airflow/dags, løpe ipython og begynne å rote? Du kan for eksempel eksportere alle tilkoblinger med følgende kode:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Kobler til Airflow-metadatabasen. Jeg anbefaler ikke å skrive til den, men å få oppgavetilstander for ulike spesifikke beregninger kan være mye raskere og enklere enn å bruke noen av APIene.

    La oss si at ikke alle oppgavene våre er idempotente, men de kan noen ganger falle, og dette er normalt. Men noen få blokkeringer er allerede mistenkelige, og det vil være nødvendig å sjekke.

    Pass på SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referanser

Og selvfølgelig er de ti første koblingene fra utstedelsen av Google innholdet i Airflow-mappen fra bokmerkene mine.

Og lenkene som brukes i artikkelen:

Kilde: www.habr.com