Apache Airflow: ETL:n helpottaminen

Hei, olen Dmitry Logvinenko - Vezet-yritysryhmän Analytics-osaston tietoinsinööri.

Kerron sinulle upeasta työkalusta ETL-prosessien kehittämiseen - Apache Airflow. Mutta Airflow on niin monipuolinen ja monipuolinen, että sitä kannattaa tarkastella lähemmin, vaikka et olisikaan mukana tietovirroissa, mutta joutuisit ajoittain käynnistämään prosesseja ja seuraamaan niiden toteutumista.

Ja kyllä, en vain kerro, vaan myös näytän: ohjelmassa on paljon koodia, kuvakaappauksia ja suosituksia.

Apache Airflow: ETL:n helpottaminen
Mitä yleensä näet, kun googletat sanaa Airflow / Wikimedia Commons

sisällysluettelo

Esittely

Apache Airflow on aivan kuin Django:

  • kirjoitettu pythonilla
  • siellä on loistava hallintapaneeli,
  • laajennettavissa toistaiseksi

- vain parempi, ja se tehtiin täysin eri tarkoituksiin, nimittäin (kuten ennen kattia on kirjoitettu):

  • tehtävien suorittaminen ja valvonta rajoittamattomalla määrällä koneita (niin monta selleriä/kubernettiä ja omatunto sallii)
  • dynaamisen työnkulun luomisen kanssa erittäin helposti kirjoitettavasta ja ymmärrettävästä Python-koodista
  • ja kyky yhdistää tietokannat ja API:t toisiinsa käyttämällä sekä valmiita komponentteja että kotitekoisia laajennuksia (mikä on erittäin yksinkertaista).

Käytämme Apache Airflowta seuraavasti:

  • keräämme tietoja eri lähteistä (monet SQL Server- ja PostgreSQL-instanssit, erilaiset API:t sovellusmittareineen, jopa 1C) DWH:ssa ja ODS:ssä (meillä on Vertica ja Clickhouse).
  • kuinka edistynyt cron, joka käynnistää ODS:n tietojen konsolidointiprosessit ja valvoo myös niiden ylläpitoa.

Viime aikoihin asti tarpeemme kattoi yksi pieni palvelin, jossa oli 32 ydintä ja 50 Gt RAM-muistia. Airflow:ssa tämä toimii:

  • lisää 200 päivää (itse asiassa työnkulkuja, joihin täyttimme tehtäviä),
  • jokaisessa keskimäärin 70 tehtävää,
  • tämä hyvyys alkaa (myös keskimäärin) kerran tunnissa.

Ja kuinka laajennimme, kirjoitan alla, mutta nyt määritellään se yber-ongelma, jonka ratkaisemme:

Alkuperäisiä SQL-palvelimia on kolme, joissa kussakin on 50 tietokantaa - yhden projektin esiintymiä, vastaavasti, niillä on sama rakenne (melkein kaikkialla, mua-ha-ha), mikä tarkoittaa, että jokaisessa on Tilaukset-taulukko (onneksi taulukko, jossa se on nimi voidaan työntää mihin tahansa liiketoimintaan). Otamme tiedot lisäämällä palvelukenttiä (lähdepalvelin, lähdetietokanta, ETL-tehtävätunnus) ja heitämme ne naiivisti vaikkapa Verticaan.

Mennään!

Pääosa, käytännöllinen (ja hieman teoreettinen)

Miksi me (ja sinä)

Kun puut olivat isoja ja minä olin yksinkertainen SQL-schik yhdessä venäläisessä vähittäiskaupassa, huijasimme ETL-prosesseja eli tietovirtoja käyttämällä kahta käytettävissämme olevaa työkalua:

  • Informatica Power Center - erittäin levittävä järjestelmä, erittäin tuottava, omalla laitteistollaan, omalla versioillaan. Käytin Jumala varjelkoon 1% sen kyvyistä. Miksi? Ensinnäkin tämä käyttöliittymä, jostain 380-luvulta, painoi meitä henkisesti. Toiseksi tämä konsti on suunniteltu erittäin hienoihin prosesseihin, raivokkaaseen komponenttien uudelleenkäyttöön ja muihin erittäin tärkeisiin yritystemppuihin. Siitä tosiasiasta, että se maksaa, kuten Airbus AXNUMX:n siipi / vuosi, emme sano mitään.

    Varo, kuvakaappaus voi vahingoittaa hieman alle 30-vuotiaita

    Apache Airflow: ETL:n helpottaminen

  • SQL Server Integration Server - Käytimme tätä toveria projektin sisäisissä virroissamme. No itse asiassa: käytämme jo SQL Serveriä, ja olisi jotenkin kohtuutonta olla käyttämättä sen ETL-työkaluja. Kaikki siinä on hyvää: sekä käyttöliittymä on kaunis, että edistymisraportit... Mutta emme siksi rakasta ohjelmistotuotteita, oi, ei tätä varten. Versio se dtsx (joka on XML, jossa solmut sekoitetaan tallennuksen yhteydessä) voimme, mutta mitä järkeä on? Mitä jos tekisit tehtäväpaketin, joka vetää satoja taulukoita palvelimelta toiselle? Kyllä, mikä sata, etusormesi putoaa kahdestakymmenestä palasta hiiren painiketta napsauttamalla. Mutta hän näyttää ehdottomasti muodikkaammalta:

    Apache Airflow: ETL:n helpottaminen

Etsimme varmasti ulospääsyä. Tapaus jopa melkein tuli itse kirjoitettuun SSIS-pakettigeneraattoriin ...

…ja sitten sain uuden työpaikan. Ja Apache Airflow ohitti minut siinä.

Kun sain selville, että ETL-prosessikuvaukset ovat yksinkertaista Python-koodia, en vain tanssinut ilosta. Näin tietovirrat versioitiin ja diffattiin, ja yksirakenneisten taulukoiden kaatamisesta sadoista tietokannoista yhteen kohteeseen tuli Python-koodia puolessatoista tai kahdessa 13” näytössä.

Klusterin kokoaminen

Älkäämme järjestäkö täysin päiväkotia, äläkä puhu täällä täysin itsestään selvistä asioista, kuten Airflown asentamisesta, valitsemastasi tietokannasta, Sellerin ja muista telakoissa kuvatuista tapauksista.

Jotta voimme aloittaa kokeilut välittömästi, luonnostelin docker-compose.yml jossa:

  • Nostetaan oikeasti Ilmavirta: Aikataulu, Web-palvelin. Flower tulee myös pyörimään siellä valvomaan Selleritehtäviä (koska se on jo työnnetty apache/airflow:1.10.10-python3.7, mutta emme välitä)
  • PostgreSQL, johon Airflow kirjoittaa palvelutietonsa (ajoitustiedot, suoritustilastot jne.), ja Selleri merkitsee suoritetut tehtävät;
  • Redis, joka toimii sellerin tehtävävälittäjänä;
  • Selleri työntekijä, joka osallistuu suoraan tehtävien suorittamiseen.
  • Kansioon ./dags lisäämme tiedostomme dags-kuvauksen kanssa. Ne poimitaan lennossa, joten koko pinoa ei tarvitse jongleerata jokaisen aivastauksen jälkeen.

Joissain paikoissa esimerkkien koodia ei näytetä kokonaan (jotta teksti ei sotkeutuisi), mutta jossain sitä muutetaan prosessin aikana. Täydelliset toimivat koodiesimerkit löytyvät arkistosta https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Huomautuksia:

  • Sävellyksen kokoonpanossa luotin pitkälti tunnettuun kuvaan puckel/doccker-ilmavirta - muista tarkistaa se. Ehkä et tarvitse mitään muuta elämääsi.
  • Kaikki ilmavirran asetukset ovat käytettävissä paitsi kautta airflow.cfg, mutta myös ympäristömuuttujien kautta (kiitos kehittäjille), joita käytin ilkeästi hyväkseni.
  • Se ei tietenkään ole tuotantovalmis: en tietoisesti laittanut sykettä konteille, en vaivautunut turvallisuuteen. Mutta tein kokeilijoillemme sopivan minimin.
  • Ota huomioon, että:
    • dag-kansion on oltava sekä ajoittajan että työntekijöiden käytettävissä.
    • Sama koskee kaikkia kolmannen osapuolen kirjastoja - ne kaikki on asennettava koneille, joissa on ajastin ja työntekijät.

No nyt on yksinkertaista:

$ docker-compose up --scale worker=3

Kun kaikki on noussut, voit katsoa verkkokäyttöliittymiä:

Peruskäsitteet

Jos et ymmärtänyt mitään kaikista näistä "päivistä", tässä on lyhyt sanakirja:

  • Scheduler - Airflown tärkein setä, joka hallitsee, että robotit työskentelevät kovasti, ei ihminen: tarkkailee aikataulua, päivittää päivämääriä, käynnistää tehtäviä.

    Yleensä vanhemmissa versioissa hänellä oli ongelmia muistin kanssa (ei, ei muistinmenetystä, vaan vuotoja) ja vanha parametri jäi jopa asetuksiin run_duration - sen uudelleenkäynnistysväli. Mutta nyt kaikki on hyvin.

  • PÄIVÄ (alias "dag") - "suunnattu asyklinen graafi", mutta tällainen määritelmä kertoo harvoille ihmisille, mutta itse asiassa se on säilö tehtäville, jotka ovat vuorovaikutuksessa toistensa kanssa (katso alla) tai analoginen paketille SSIS:ssä ja Workflow in Informaticassa .

    Dagien lisäksi voi vielä olla alidageja, mutta emme todennäköisesti pääse niihin.

  • DAG Juoksu - alustettu dag, jolle on määritetty oma execution_date. Saman dag:n Dagranit voivat toimia rinnakkain (jos olet tehnyt tehtäväsi idempotenteiksi, tietysti).
  • operaattori ovat koodinpätkiä, jotka vastaavat tietyn toiminnon suorittamisesta. Operaattoreita on kolmenlaisia:
    • toimintakuten meidän suosikkimme PythonOperator, joka voi suorittaa minkä tahansa (kelvollisen) Python-koodin;
    • siirtää, jotka kuljettavat tietoja paikasta toiseen, esim. MsSqlToHiveTransfer;
    • anturi toisaalta sen avulla voit reagoida tai hidastaa dag:n jatkoa, kunnes tapahtuma tapahtuu. HttpSensor voi vetää määritetyn päätepisteen, ja kun haluttu vastaus odottaa, aloita siirto GoogleCloudStorageToS3Operator. Utelias mieli kysyy: "miksi? Loppujen lopuksi voit tehdä toistoja suoraan operaattorissa!” Ja sitten, jotta tehtävien joukko ei tukkeutuisi keskeytetyillä operaattoreilla. Anturi käynnistyy, tarkistaa ja sammuu ennen seuraavaa yritystä.
  • Tehtävä - Ilmoitetut operaattorit, tyypistä riippumatta ja jotka on liitetty dag:iin, ylennetään tehtävän arvoon.
  • tehtävän esimerkki - kun yleissuunnittelija päätti, että on aika lähettää tehtävät taisteluun esiintyjätyöntekijöiden päälle (oikein paikan päällä, jos käytämme LocalExecutor tai etäsolmuun, jos kyseessä on CeleryExecutor), se määrittää niille kontekstin (eli joukon muuttujia - suoritusparametreja), laajentaa komento- tai kyselymalleja ja yhdistää ne.

Luomme tehtäviä

Ensin hahmotellaan dougimme yleinen kaavio, ja sitten sukeltamme yksityiskohtiin yhä enemmän, koska käytämme joitain ei-triviaaleja ratkaisuja.

Joten yksinkertaisimmassa muodossaan tällainen dag näyttää tältä:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Ymmärretään:

  • Ensin tuomme tarvittavat libsit ja jotain muuta;
  • sql_server_ds - Onko List[namedtuple[str, str]] Airflow Connectionsin yhteyksien nimet ja tietokannat, joista otamme levymme;
  • dag - ilmoitus päivästämme, jonka on välttämättä oltava mukana globals(), muuten Airflow ei löydä sitä. Dougin on myös sanottava:
    • mikä hänen nimensä on orders - tämä nimi tulee näkyviin verkkokäyttöliittymään,
    • että hän työskentelee keskiyöstä XNUMX. heinäkuuta,
    • ja sen pitäisi käydä noin 6 tunnin välein (koville miehille täällä sen sijaan timedelta() hyväksyttäväksi cron-linja 0 0 0/6 ? * * *, vähemmän cool - ilmaus kuin @daily);
  • workflow() tekee päätyön, mutta ei nyt. Toistaiseksi vain upotamme kontekstimme lokiin.
  • Ja nyt tehtävien luomisen yksinkertainen taika:
    • käymme läpi lähteemme;
    • alustaa PythonOperator, joka suorittaa nuken workflow(). Älä unohda määrittää tehtävälle yksilöivää (dag:n sisällä) nimeä ja sitoa itse dag. Lippu provide_context puolestaan ​​lisää funktioon lisäargumentteja, joita keräämme huolellisesti käyttämällä **context.

Toistaiseksi siinä kaikki. Mitä saimme:

  • uusi päivä verkkokäyttöliittymässä,
  • puolitoista sataa tehtävää, jotka suoritetaan rinnakkain (jos Airflow-, Sellery-asetukset ja palvelimen kapasiteetti sen sallivat).

No, melkein sain sen.

Apache Airflow: ETL:n helpottaminen
Kuka asentaa riippuvuudet?

Yksinkertaistaakseni tätä koko asiaa, menin asiaan docker-compose.yml käsittelyä requirements.txt kaikissa solmuissa.

Nyt se on poissa:

Apache Airflow: ETL:n helpottaminen

Harmaat neliöt ovat ajastimen käsittelemiä tehtävänäytteitä.

Odotellaan vähän, työt napsautetaan työntekijöiden toimesta:

Apache Airflow: ETL:n helpottaminen

Vihreät ovat tietysti saaneet työnsä onnistuneesti päätökseen. Punaiset eivät ole kovin menestyviä.

Muuten, tuotteessamme ei ole kansiota ./dags, koneiden välillä ei ole synkronointia - kaikki päivät ovat sisällä git Gitlabissamme, ja Gitlab CI jakaa päivityksiä koneille, kun ne yhdistetään master.

Vähän Flowerista

Sillä aikaa, kun työntekijät puskevat tuttejamme, muistetaan toinen työkalu, joka voi näyttää meille jotain - Kukka.

Aivan ensimmäinen sivu, jossa on yhteenveto työntekijöiden solmuista:

Apache Airflow: ETL:n helpottaminen

Intensiivisin sivu, jossa on suoritettuja tehtäviä:

Apache Airflow: ETL:n helpottaminen

Tylsin sivu välittäjämme statuksella:

Apache Airflow: ETL:n helpottaminen

Kirkkaimmalla sivulla on tehtävän tilakaaviot ja niiden suoritusaika:

Apache Airflow: ETL:n helpottaminen

Lataamme alikuormitetut

Joten kaikki tehtävät on suoritettu, voit viedä haavoittuneet pois.

Apache Airflow: ETL:n helpottaminen

Ja haavoittuneita oli monia - syystä tai toisesta. Jos Airflowa käytetään oikein, juuri nämä neliöt osoittavat, että tiedot eivät todellakaan saapuneet perille.

Sinun on katsottava lokia ja käynnistettävä kaatuneet tehtävät uudelleen.

Napsauta mitä tahansa neliötä, näemme käytettävissämme olevat toiminnot:

Apache Airflow: ETL:n helpottaminen

Voit ottaa ja poistaa kaatuneet. Toisin sanoen unohdamme, että jokin on epäonnistunut siellä, ja sama ilmentymätehtävä menee ajoittimeen.

Apache Airflow: ETL:n helpottaminen

On selvää, että tämän tekeminen hiirellä kaikilla punaisilla neliöillä ei ole kovin inhimillistä - tätä emme odota Airflowlta. Luonnollisesti meillä on joukkotuhoaseita: Browse/Task Instances

Apache Airflow: ETL:n helpottaminen

Valitaan kaikki kerralla ja nollataan, napsauta oikeaa kohdetta:

Apache Airflow: ETL:n helpottaminen

Siivouksen jälkeen taksimme näyttävät tältä (ne odottavat jo aikatauluttajaa aikatauluttamaan ne):

Apache Airflow: ETL:n helpottaminen

Liitännät, koukut ja muut muuttujat

On aika katsoa seuraavaa DAG:ta, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Ovatko kaikki koskaan päivittäneet raporttia? Tämä on taas hän: siellä on luettelo lähteistä, joista tiedot saa; siellä on luettelo, mihin sijoittaa; älä unohda puhua, kun kaikki tapahtui tai meni rikki (no, tämä ei koske meitä, ei).

Käydään tiedosto läpi uudelleen ja katsotaan uusia epäselviä juttuja:

  • from commons.operators import TelegramBotSendMessage - mikään ei estä meitä tekemästä omia operaattoreita, joita hyödynsimme tekemällä pienen kääreen viestien lähettämiseen Unblockedille. (Puhumme lisää tästä operaattorista alla);
  • default_args={} - dag voi jakaa samat argumentit kaikille operaattoreilleen;
  • to='{{ var.value.all_the_kings_men }}' - kenttä to meillä ei ole kovakoodattua, vaan dynaamisesti luotua käyttämällä Jinjaa ja muuttujaa sähköpostilistalla, jonka laitoin huolellisesti Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — käyttäjän käynnistämisen ehto. Meidän tapauksessamme kirje lentää pomoille vain, jos kaikki riippuvuudet ovat selvinneet onnistuneesti;
  • tg_bot_conn_id='tg_main' - argumentit conn_id hyväksyä luomamme yhteystunnukset Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegramin viestit lentävät pois vain, jos tehtäviä on kaatunut;
  • task_concurrency=1 - kielletään yhden tehtävän useiden tehtäväinstanssien samanaikainen käynnistäminen. Muuten saamme useiden samanaikaisen käynnistyksen VerticaOperator (katso yhtä pöytää);
  • report_update >> [email, tg] - kaikki VerticaOperator lähentyvät kirjeitä ja viestejä, kuten tämä:
    Apache Airflow: ETL:n helpottaminen

    Mutta koska ilmoittajaoperaattoreilla on erilaiset käynnistysehdot, vain yksi toimii. Puunäkymässä kaikki näyttää hieman vähemmän visuaalliselta:
    Apache Airflow: ETL:n helpottaminen

Sanon muutaman sanan aiheesta makroja ja heidän ystävänsä - muuttujia.

Makrot ovat Jinja-paikkamerkkejä, jotka voivat korvata erilaisia ​​hyödyllisiä tietoja operaattoriargumenteiksi. Esimerkiksi näin:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} laajenee kontekstimuuttujan sisältöön execution_date muodossa YYYY-MM-DD: 2020-07-14. Parasta on, että kontekstimuuttujat naulataan tiettyyn tehtäväesiintymään (neliöön puunäkymässä), ja kun käynnistetään uudelleen, paikkamerkit laajenevat samoihin arvoihin.

Määritettyjä arvoja voi tarkastella kunkin tehtävän ilmentymän Rendered-painikkeella. Kirjeen lähettäminen tapahtuu seuraavasti:

Apache Airflow: ETL:n helpottaminen

Ja niin tehtävässä viestin lähettämisessä:

Apache Airflow: ETL:n helpottaminen

Täydellinen luettelo uusimman saatavilla olevan version sisäänrakennetuista makroista on saatavilla täältä: makroviittaus

Lisäksi pluginien avulla voimme ilmoittaa omat makromme, mutta se on toinen tarina.

Ennalta määritettyjen asioiden lisäksi voimme korvata muuttujien arvot (käytin tätä jo yllä olevassa koodissa). Luodaan sisään Admin/Variables pari asiaa:

Apache Airflow: ETL:n helpottaminen

Kaikki mitä voit käyttää:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Arvo voi olla skalaari tai se voi olla myös JSON. JSON:n tapauksessa:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

käytä vain polkua haluttuun avaimeen: {{ var.json.bot_config.bot.token }}.

Sanon kirjaimellisesti yhden sanan ja näytän yhden kuvakaappauksen aiheesta yhteys. Kaikki on alkeellista täällä: sivulla Admin/Connections luomme yhteyden, lisäämme sisäänkirjautumistunnuksemme / salasanamme ja tarkemmat parametrit sinne. Kuten tämä:

Apache Airflow: ETL:n helpottaminen

Salasanat voidaan salata (oletusarvoa perusteellisemmin) tai voit jättää yhteystyypin pois (kuten tein tg_main) - Tosiasia on, että tyyppiluettelo on langallinen Airflow-malleissa, eikä sitä voi laajentaa ilman lähdekoodeja (jos yhtäkkiä en googlettanut jotain, korjaa minua), mutta mikään ei estä meitä saamasta krediittejä. nimi.

Voit myös muodostaa useita yhteyksiä samalla nimellä: tässä tapauksessa menetelmä BaseHook.get_connection(), joka saa meille yhteyksiä nimellä, antaa satunnainen useilta kaimailta (olisi loogisempaa tehdä Round Robin, mutta jätetään se Airflow-kehittäjien omantunnon varaan).

Muuttujat ja yhteydet ovat varmasti hienoja työkaluja, mutta on tärkeää, että et menetä tasapainoa: mitkä osat virtauksistasi tallennat itse koodiin ja mitkä osat annat Airflowlle tallennettavaksi. Toisaalta voi olla kätevää muuttaa nopeasti arvoa, esimerkiksi postilaatikkoa, käyttöliittymän kautta. Toisaalta tämä on silti paluu hiiren napsautukseen, josta halusimme (minä) päästä eroon.

Yhteyksien parissa työskenteleminen on yksi tehtävistä koukut. Yleisesti ottaen Airflow-koukut ovat pisteitä, joilla se yhdistetään kolmannen osapuolen palveluihin ja kirjastoihin. Esim, JiraHook avaa asiakkaan meille vuorovaikutukseen Jiran kanssa (voit siirtää tehtäviä edestakaisin) ja SambaHook voit työntää paikallisen tiedoston smb-kohta.

Muokatun operaattorin jäsentäminen

Ja pääsimme katsomaan kuinka se on tehty TelegramBotSendMessage

Koodi commons/operators.py varsinaisen operaattorin kanssa:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Täällä, kuten kaikessa muussakin Airflowssa, kaikki on hyvin yksinkertaista:

  • Peritty BaseOperator, joka toteuttaa aika monta Airflow-kohtaista asiaa (katso vapaa-aikaasi)
  • Ilmoitetut kentät template_fields, jossa Jinja etsii makroja käsiteltäväksi.
  • Järjestettiin oikeat argumentit puolesta __init__(), aseta oletusasetukset tarvittaessa.
  • Emme myöskään unohtaneet esi-isän alustusta.
  • Avasi vastaavan koukun TelegramBotHooksaanut siitä asiakasobjektin.
  • Ohitettu (uudelleenmääritetty) menetelmä BaseOperator.execute(), joka Airfow nykii, kun on aika käynnistää operaattori - siinä toteutamme päätoimenpiteen, unohtaen kirjautua sisään. (Kirjaudumme muuten heti sisään stdout и stderr - Ilmavirta sieppaa kaiken, kääri sen kauniisti, hajottaa tarvittaessa.)

Katsotaan mitä meillä on commons/hooks.py. Tiedoston ensimmäinen osa koukulla:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

En edes tiedä mitä tässä selittäisin, huomautan vain tärkeät seikat:

  • Perimme, ajattelemme argumentteja - useimmissa tapauksissa se on yksi: conn_id;
  • Vakiomenetelmien ohittaminen: rajoitin itseäni get_conn(), jossa saan yhteysparametrit nimen mukaan ja vain osion extra (tämä on JSON-kenttä), johon laitoin (omien ohjeideni mukaan!) Telegram-bottitunnuksen: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Luon esimerkin meidän TelegramBot, antamalla sille tietyn tunnuksen.

Siinä kaikki. Voit saada asiakkaan koukusta käyttämällä TelegramBotHook().clent tai TelegramBotHook().get_conn().

Ja tiedoston toinen osa, jossa teen mikrokääreen Telegram REST API:lle, jotta en vedä samaa python-telegram-bot yhdelle menetelmälle sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Oikea tapa on laskea kaikki yhteen: TelegramBotSendMessage, TelegramBotHook, TelegramBot - laita laajennuksessa julkiseen arkistoon ja anna se avoimelle lähteelle.

Tutkiessamme tätä kaikkea raporttipäivityksemme onnistuivat epäonnistumaan ja lähettämään minulle kanavassa virheilmoituksen. Menen katsomaan onko vika...

Apache Airflow: ETL:n helpottaminen
Jotain meni rikki koirassamme! Eikö se ole sitä mitä odotimme? Tarkalleen!

Aiotko kaataa?

Tuntuuko sinusta, että missasin jotain? Näyttää siltä, ​​​​että hän lupasi siirtää tietoja SQL Serveristä Verticaan, ja sitten hän otti sen ja siirtyi aiheesta, roisto!

Tämä julmuus oli tahallista, minun piti vain tulkita sinulle terminologiaa. Nyt voit mennä pidemmälle.

Suunnitelmamme oli tämä:

  1. Do dag
  2. Luo tehtäviä
  3. Katso kuinka kaunista kaikki on
  4. Määritä istuntonumerot täyttöihin
  5. Hae tietoja SQL Serveristä
  6. Laita tiedot Verticaan
  7. Kerää tilastoja

Joten, jotta tämä kaikki saadaan toimimaan, tein pienen lisäyksen meidän docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Siellä nostetaan:

  • Vertica isäntänä dwh kaikkein oletusasetuksilla,
  • kolme SQL Serverin esiintymää,
  • täytämme jälkimmäisen tietokannat joillakin tiedoilla (älä missään tapauksessa tutki mssql_init.py!)

Käynnistämme kaiken hyvän hieman monimutkaisemman komennon avulla kuin viime kerralla:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Voit käyttää tuotetta mitä ihmesatunnaistajamme loi Data Profiling/Ad Hoc Query:

Apache Airflow: ETL:n helpottaminen
Tärkeintä ei ole näyttää sitä analyytikoille

tarkentaa ETL-istunnot En tee, siellä kaikki on triviaalia: teemme pohjan, siinä on kyltti, käärimme kaiken kontekstinhallinnan avulla ja nyt teemme näin:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Aika on koittanut keräämme tietojamme puolentoistasadasta pöydästämme. Tehdään tämä erittäin vaatimattomien rivien avulla:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Koukun avulla saamme Airflowsta pymssql-kytkeä
  2. Korvataan pyyntöön päivämäärän muodossa oleva rajoitus - mallimoottori heittää sen funktioon.
  3. Ruokitaan pyyntöämme pandaskuka saa meidät DataFrame - siitä on meille hyötyä tulevaisuudessa.

Käytän substituutiota {dt} pyyntöparametrin sijaan %s ei siksi, että olisin paha Pinocchio, vaan siksi pandas ei voi käsitellä pymssql ja luistaa viimeisen params: Listvaikka hän todella haluaa tuple.
Huomaa myös, että kehittäjä pymssql päätti olla tukematta häntä enää, ja on aika muuttaa pois pyodbc.

Katsotaanpa, millä Airflow täytti funktioidemme argumentit:

Apache Airflow: ETL:n helpottaminen

Jos tietoja ei ole, ei ole mitään järkeä jatkaa. Mutta on myös outoa pitää täyttöä onnistuneena. Mutta tämä ei ole virhe. A-ah-ah, mitä tehdä?! Ja tässä mitä:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException kertoo Airflowlle, että virheitä ei ole, mutta ohitamme tehtävän. Käyttöliittymässä ei ole vihreää tai punaista neliötä, vaan vaaleanpunaista.

Heitetään tietomme useita sarakkeita:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Nimittäin

  • Tietokanta, josta saimme tilaukset,
  • Tulvaistuntomme tunnus (se on erilainen jokaiseen tehtävään),
  • Hajautus lähteestä ja tilaustunnus - jotta lopullisessa tietokannassa (jossa kaikki kaadetaan yhteen taulukkoon) meillä on yksilöllinen tilaustunnus.

Jäljelle jää toiseksi viimeinen vaihe: kaada kaikki Verticaan. Ja kummallista kyllä, yksi upeimmista ja tehokkaimmista tavoista tehdä tämä on CSV:n kautta!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Valmistamme erikoisvastaanottimen StringIO.
  2. pandas ystävällisesti laittaa meidän DataFrame muodossa CSV-linjat.
  3. Avataan koukulla yhteys suosikki Verticaan.
  4. Ja nyt avulla copy() lähetä tietomme suoraan Vertikaan!

Otamme kuljettajalta, kuinka monta riviä oli täytetty, ja kerromme istunnonjohtajalle, että kaikki on kunnossa:

session.loaded_rows = cursor.rowcount
session.successful = True

Siinä kaikki.

Myynnissä luomme kohdelevyn manuaalisesti. Tässä sallin itselleni pienen koneen:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

käytän VerticaOperator() Luon tietokantaskeeman ja taulukon (jos niitä ei vielä ole, tietysti). Tärkeintä on järjestää riippuvuudet oikein:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Yhteenvetona

- No, - sanoi pieni hiiri, - eikös nyt
Oletko varma, että olen metsän kauhein eläin?

Julia Donaldson, The Gruffalo

Luulen, että jos kollegoillani ja minulla olisi kilpailu: kuka nopeasti luo ja käynnistää ETL-prosessin tyhjästä: he SSIS:illään ja hiirellä ja minä Airflowlla... Ja sitten vertaisimme myös huollon helppoutta... Vau, uskon, että olet samaa mieltä siitä, että voitan heidät kaikilla rintamilla!

Jos vähän vakavammin, niin Apache Airflow - kuvailemalla prosesseja ohjelmakoodin muodossa - teki työni lisää mukavampaa ja nautinnollisempaa.

Sen rajaton laajennettavuus sekä laajennuksina että skaalautuvuuden suhteen antaa sinulle mahdollisuuden käyttää Airflow'ta lähes kaikilla alueilla: jopa koko tiedonkeruun, valmistelun ja käsittelyn aikana, jopa raketteja laukaiseessa (Marsiin, kurssi).

Osa loppu, viite ja tiedot

Keräsimme sinulle

  • start_date. Kyllä, tämä on jo paikallinen meemi. Via Dougin tärkein argumentti start_date kaikki ohi. Lyhyesti, jos määrität start_date nykyinen päivämäärä ja schedule_interval - Jonain päivänä DAG alkaa huomenna ei aikaisemmin.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Eikä enää ongelmia.

    Siihen liittyy toinen ajonaikainen virhe: Task is missing the start_date parameter, mikä useimmiten osoittaa, että olet unohtanut sitoutua dag-operaattoriin.

  • Kaikki yhdellä koneella. Kyllä, ja pohjat (itse Airflow ja pinnoitteemme), ja web-palvelin, ja ajastin ja työntekijät. Ja se jopa toimi. Mutta ajan myötä palveluiden tehtävien määrä kasvoi, ja kun PostgreSQL alkoi vastata indeksiin 20 sekunnissa 5 ms:n sijaan, otimme sen ja kantoimme sen pois.
  • LocalExecutor. Kyllä, istumme edelleen sen päällä ja olemme jo tulleet kuilun reunalle. LocalExecutor on riittänyt meille toistaiseksi, mutta nyt on aika laajentua ainakin yhdellä työntekijällä ja meidän on tehtävä lujasti töitä siirtyäksemme CeleryExecutoriin. Ja kun otetaan huomioon, että voit työskennellä sen kanssa yhdellä koneella, mikään ei estä sinua käyttämästä Selleryä edes palvelimella, joka "rehellisesti sanottuna ei tietenkään koskaan mene tuotantoon!"
  • Ei käytössä sisäänrakennetut työkalut:
    • Liitännät tallentaa huoltotiedot,
    • SLA Miss vastata tehtäviin, jotka eivät sujuneet ajoissa,
    • xcom metatietojen vaihtoa varten (sanoin metadata!) dag-tehtävien välillä.
  • Postin väärinkäyttö. No, mitä voin sanoa? Hälytykset asetettiin kaikille kaatuneiden tehtävien toistuville. Nyt työssäni Gmailissa on yli 90 100 sähköpostia Airflowsta, ja verkkosähköpostin kuono kieltäytyy poimimasta ja poistamasta yli XNUMX:ta kerrallaan.

Lisää sudenkuoppia: Apache Airflow Pitfails

Lisää automaatiotyökaluja

Jotta voisimme työskennellä entistä enemmän päällämme eikä käsillämme, Airflow on valmistanut meille seuraavan:

  • REST API - hänellä on edelleen kokeellisen asema, mikä ei estä häntä työskentelemästä. Sen avulla voit paitsi saada tietoa päivämääristä ja tehtävistä, myös pysäyttää/aloittaa päiväpäivien, luoda DAG Run tai poolin.
  • CLI - Komentorivin kautta on saatavilla monia työkaluja, jotka eivät ole vain hankalia käyttää WebUI:n kautta, mutta ne ovat yleensä poissa. Esimerkiksi:
    • backfill tarvitaan tehtäväinstanssien käynnistämiseen uudelleen.
      Esimerkiksi analyytikot tulivat ja sanoivat: "Ja sinulla, toveri, on hölynpölyä tiedoissa 1. - 13. tammikuuta! Korjaa, korjaa, korjaa, korjaa!" Ja sinä olet sellainen liesi:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Peruspalvelu: initdb, resetdb, upgradedb, checkdb.
    • run, jonka avulla voit suorittaa yhden ilmentymätehtävän ja jopa tehdä pisteitä kaikista riippuvuuksista. Lisäksi voit ajaa sen kautta LocalExecutor, vaikka sinulla olisi selleriklusteri.
    • Tekee aika lailla samaa test, vain myös pohjassa ei kirjoita mitään.
    • connections mahdollistaa yhteyksien massan luomisen kuoresta.
  • Python-sovellusliittymä - melko kova vuorovaikutustapa, joka on tarkoitettu laajennuksille, eikä kuhise siinä pienillä käsillä. Mutta kuka estää meitä menemästä /home/airflow/dags, Suorita ipython ja alkaa sotkemaan? Voit esimerkiksi viedä kaikki yhteydet seuraavalla koodilla:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Yhdistetään Airflow-metatietokantaan. En suosittele kirjoittamista sille, mutta tehtävätilojen saaminen eri mittareille voi olla paljon nopeampaa ja helpompaa kuin minkä tahansa API:n käyttäminen.

    Oletetaan, että kaikki tehtävämme eivät ole idempotentteja, mutta ne voivat joskus kaatua, ja tämä on normaalia. Mutta muutama tukos on jo epäilyttävää, ja se olisi tarpeen tarkistaa.

    Varo SQL:ää!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

viittaukset

Ja tietysti, ensimmäiset kymmenen linkkiä Googlen liikkeeseenlaskusta ovat Airflow-kansion sisältö kirjanmerkeistäni.

Ja artikkelissa käytetyt linkit:

Lähde: will.com