„Apache“ oro srautas: palengvinti ETL

Sveiki, esu Dmitrijus Logvinenko – įmonių grupės „Vezet“ analitikos skyriaus duomenų inžinierius.

Papasakosiu apie nuostabų ETL procesų kūrimo įrankį – Apache Airflow. Tačiau „Airflow“ yra toks universalus ir daugialypis, kad turėtumėte į jį atidžiau pažvelgti, net jei nesate susiję su duomenų srautais, tačiau turite periodiškai paleisti bet kokius procesus ir stebėti jų vykdymą.

Ir taip, aš ne tik pasakysiu, bet ir parodysiu: programoje yra daug kodo, ekrano kopijų ir rekomendacijų.

„Apache“ oro srautas: palengvinti ETL
Tai, ką paprastai matote, kai „Google“ ieškote žodį „Airflow“ / „Wikimedia Commons“.

Turinys

įvedimas

„Apache Airflow“ yra kaip „Django“:

  • parašyta python
  • yra puikus administratoriaus skydelis,
  • plečiamas neribotą laiką

- tik geriau, ir jis buvo pagamintas visai kitiems tikslams, būtent (kaip parašyta prieš katą):

  • vykdyti ir stebėti užduotis neribotame skaičiuje mašinų (kiek leis daug Salierų / Kubernetes ir jūsų sąžinė)
  • su dinamine darbo eigos generavimu iš labai lengvai rašomo ir suprantamo Python kodo
  • ir galimybė sujungti bet kokias duomenų bazes ir API tarpusavyje naudojant paruoštus komponentus ir namuose sukurtus įskiepius (o tai labai paprasta).

Mes naudojame „Apache Airflow“ taip:

  • renkame duomenis iš įvairių šaltinių (daug SQL Server ir PostgreSQL egzempliorių, įvairių API su taikomųjų programų metrika, net 1C) DWH ir ODS (turime Vertica ir Clickhouse).
  • kiek pažengęs cron, kuris pradeda duomenų konsolidavimo procesus ODS, taip pat stebi jų priežiūrą.

Dar visai neseniai mūsų poreikius tenkino vienas mažas serveris su 32 branduoliais ir 50 GB RAM. Oro sraute tai veikia:

  • daugiau 200 dagų (iš tikrųjų darbo eigos, į kurias įdėjome užduotis),
  • kiekviename vidutiniškai 70 užduočių,
  • šis gėris prasideda (taip pat vidutiniškai) kartą per valandą.

O apie tai, kaip išsiplėtėme, parašysiu žemiau, bet dabar apibrėžkime über problemą, kurią išspręsime:

Yra trys originalūs SQL serveriai, kurių kiekviename yra 50 duomenų bazių - atitinkamai vieno projekto egzemplioriai, jie turi tą pačią struktūrą (beveik visur, mua-ha-ha), o tai reiškia, kad kiekvienas turi užsakymų lentelę (laimei, lentelę su tokia). pavadinimas gali būti įtrauktas į bet kurį verslą). Duomenis paimame pridėdami paslaugų laukus (šaltinio serveris, šaltinio duomenų bazė, ETL užduoties ID) ir naiviai metame į, tarkime, Vertica.

Eikime!

Pagrindinė dalis, praktinė (ir šiek tiek teorinė)

Kodėl mes (ir jūs)

Kai medžiai buvo dideli, o aš paprastas SQL-schik vienoje Rusijos mažmeninėje prekyboje apgaudinėjome ETL procesus, dar vadinamus duomenų srautais, naudodami du mums prieinamus įrankius:

  • Informatikos energijos centras - itin plinta sistema, itin produktyvi, su savo technine įranga, savo versijomis. Aš panaudojau 1% jos galimybių, neduok Dieve. Kodėl? Na, visų pirma, ši sąsaja, kilusi iš 380-ųjų, darė mums psichikos spaudimą. Antra, šis įtaisas skirtas itin įmantriam procesui, įnirtingam komponentų pakartotiniam naudojimui ir kitiems labai svarbiems įmonės triukams. Apie tai, kiek kainuoja, pavyzdžiui, „Airbus AXNUMX“ sparnas per metus, nieko nesakysime.

    Atsargiai, ekrano kopija gali šiek tiek pakenkti jaunesniems nei 30 metų žmonėms

    „Apache“ oro srautas: palengvinti ETL

  • SQL serverio integravimo serveris - naudojome šį draugą savo projektų srautuose. Na, iš tikrųjų: mes jau naudojame SQL Server, ir būtų kažkaip neprotinga nenaudoti jo ETL įrankių. Viskas jame gerai: ir sąsaja graži, ir pažangos ataskaitos... Bet ne dėl to mes mėgstame programinės įrangos produktus, o, ne dėl to. Versija tai dtsx (tai yra XML, kai mazgai sumaišomi išsaugant) galime, bet kokia prasmė? O kaip sukurti užduočių paketą, kuris nutemps šimtus lentelių iš vieno serverio į kitą? Taip, koks šimtas, rodomasis pirštas nukris nuo dvidešimties gabaliukų, spustelėjus pelės mygtuką. Bet tikrai atrodo madingiau:

    „Apache“ oro srautas: palengvinti ETL

Tikrai ieškojome išeičių. Atvejis net beveik atėjo į savarankiškai parašytą SSIS paketų generatorių ...

…ir tada mane surado naujas darbas. Ir Apache Airflow mane aplenkė.

Kai sužinojau, kad ETL procesų aprašymai yra paprastas Python kodas, aš tiesiog nešokau iš džiaugsmo. Taip duomenų srautai buvo versijuojami ir diferencijuojami, o vienos struktūros lentelių suliejimas iš šimtų duomenų bazių į vieną taikinį tapo Python kodo reikalu pusantro ar dviejuose 13 colių ekranuose.

Klasterio surinkimas

Netvarkykime visiškai vaikų darželio ir nekalbėkime čia apie visiškai akivaizdžius dalykus, tokius kaip Airflow įdiegimas, jūsų pasirinkta duomenų bazė, Salierai ir kiti doke aprašyti atvejai.

Kad galėtume nedelsiant pradėti eksperimentus, nubraižiau eskizą docker-compose.yml kuriame:

  • Iš tikrųjų pakelkime Oro srautas: planuoklis, žiniatinklio serveris. Gėlė taip pat suksis ten, kad stebėtų salierų užduotis (nes ji jau buvo įstumta apache/airflow:1.10.10-python3.7, bet mes neprieštaraujame)
  • PostgreSQL, kuriame Airflow įrašys savo paslaugų informaciją (planuotojo duomenis, vykdymo statistiką ir kt.), o Celery pažymės atliktas užduotis;
  • Redis, kuri veiks kaip „Selery“ užduočių tarpininkas;
  • Salierų darbuotojas, kuri užsiims tiesioginiu užduočių vykdymu.
  • Į aplanką ./dags mes pridėsime savo failus su dags aprašymu. Jie bus paimti skrendant, todėl po kiekvieno čiaudėjimo nereikia žongliruoti visa krūva.

Kai kur pavyzdžiuose esantis kodas ne iki galo parodytas (kad tekstas nebūtų perkrautas), bet kai kur jis proceso metu modifikuojamas. Išsamių darbo kodų pavyzdžių galima rasti saugykloje https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Pastabos:

  • Surinkdama kompoziciją daugiausia rėmiausi gerai žinomu įvaizdžiu puckel / Docker-oro srautas - būtinai patikrink. Galbūt tau gyvenime nieko daugiau ir nereikia.
  • Visi oro srauto nustatymai pasiekiami ne tik per airflow.cfg, bet ir per aplinkos kintamuosius (ačiū kūrėjams), kuriais piktybiškai pasinaudojau.
  • Natūralu, kad jis nėra paruoštas gamybai: aš sąmoningai nedėjau širdies plakimo ant konteinerių, nesivarginau dėl saugumo. Bet aš padariau minimumą, tinkantį mūsų eksperimentuotojams.
  • Prisimink tai:
    • Aplankas dag turi būti pasiekiamas ir planuotojui, ir darbuotojams.
    • Tas pats pasakytina apie visas trečiųjų šalių bibliotekas – jos visos turi būti įdiegtos įrenginiuose su planuokliu ir darbuotojais.

Na, dabar viskas paprasta:

$ docker-compose up --scale worker=3

Kai viskas pakyla, galite pažvelgti į žiniatinklio sąsajas:

Pagrindinės sąvokos

Jei visuose šiuose „daguose“ nieko nesupratote, čia yra trumpas žodynas:

  • Tvarkaraštis - svarbiausias dėdė Airflow, kontroliuojantis, kad robotai sunkiai dirbtų, o ne žmogus: stebi tvarkaraštį, atnaujina dagius, paleidžia užduotis.

    Apskritai, senesnėse versijose jis turėjo problemų su atmintimi (ne, ne amnezija, o nutekėjimai), o senasis parametras net išliko konfigūracijose run_duration - jo paleidimo iš naujo intervalas. Bet dabar viskas gerai.

  • DAG (dar žinomas kaip "dag") - "nukreiptas aciklinis grafikas", tačiau toks apibrėžimas pasakys nedaugeliui žmonių, tačiau iš tikrųjų tai yra vienas su kitu sąveikaujančių užduočių konteineris (žr. toliau) arba SSIS paketo ir Informatikos darbo eigos analogas. .

    Be dagių, dar gali būti ir subdagų, bet greičiausiai jų nepasieksime.

  • DAG bėgimas - inicijuotas dag, kuriam priskiriamas savas execution_date. To paties dago dagranai gali dirbti lygiagrečiai (jei savo užduotis padarėte idealias, žinoma).
  • operatorius yra kodo dalys, atsakingos už konkretaus veiksmo atlikimą. Yra trijų tipų operatoriai:
    • veiksmaskaip mūsų mėgstamiausia PythonOperator, kuris gali vykdyti bet kokį (galiojantį) Python kodą;
    • perkėlimas, kurie perkelia duomenis iš vienos vietos į kitą, tarkime, MsSqlToHiveTransfer;
    • jutiklis kita vertus, tai leis jums reaguoti arba sulėtinti tolesnį dag vykdymą, kol įvyks įvykis. HttpSensor gali patraukti nurodytą galinį tašką, o kai laukia norimas atsakymas, pradėti perkėlimą GoogleCloudStorageToS3Operator. Smalsus protas paklaus: „Kodėl? Juk pakartojimus galite daryti tiesiog operatoriuje! Ir tada, kad neužkimštų užduočių telkinio su sustabdytais operatoriais. Jutiklis įsijungia, patikrina ir užges prieš kitą bandymą.
  • užduotis - deklaruoti operatoriai, neatsižvelgiant į tipą, ir prijungti prie dag, pakeliami į užduoties rangą.
  • užduoties pavyzdys - kai generalinis planuotojas nusprendė, kad laikas pasiųsti užduotis į mūšį atlikėjams-darbininkams (iš karto, jei naudosime LocalExecutor arba į nuotolinį mazgą, jei CeleryExecutor), priskiria jiems kontekstą (t. y. kintamųjų rinkinį – vykdymo parametrus), išplečia komandų ar užklausų šablonus ir sujungia juos.

Mes generuojame užduotis

Iš pradžių apibūdinkime bendrą mūsų tešlos schemą, o tada vis labiau pasinersime į smulkmenas, nes taikome keletą nebanalių sprendimų.

Taigi paprasčiausia forma toks dag atrodys taip:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Išsiaiškinkime:

  • Pirma, importuojame reikiamus libs ir kažkas kito;
  • sql_server_ds - yra List[namedtuple[str, str]] su jungčių iš Airflow Connections pavadinimais ir duomenų bazėmis, iš kurių paimsime savo plokštelę;
  • dag - mūsų dienos skelbimas, kuris būtinai turi būti globals(), kitaip Airflow jo neras. Dougas taip pat turi pasakyti:
    • koks jo vardas orders - tada šis pavadinimas bus rodomas žiniatinklio sąsajoje,
    • kad dirbs nuo liepos aštuntosios vidurnakčio,
    • ir jis turėtų veikti maždaug kas 6 valandas (kietiems vaikinams čia vietoj timedelta() leistina cron- linija 0 0 0/6 ? * * *, mažiau šauniems - posakis kaip @daily);
  • workflow() atliks pagrindinį darbą, bet ne dabar. Kol kas mes tiesiog įtrauksime kontekstą į žurnalą.
  • O dabar paprasta užduočių kūrimo magija:
    • bėgame per savo šaltinius;
    • inicijuoti PythonOperator, kuris įvykdys mūsų manekeną workflow(). Nepamirškite nurodyti unikalaus (dagoje) užduoties pavadinimo ir susieti patį dag. Vėliava provide_context savo ruožtu į funkciją įlies papildomų argumentų, kuriuos naudodami atsargiai rinksime **context.

Kol kas tai viskas. Ką gavome:

  • naujas dag žiniatinklio sąsajoje,
  • pusantro šimto užduočių, kurios bus vykdomos lygiagrečiai (jei tai leis Airflow, Celery nustatymai ir serverio talpa).

Na, beveik supratau.

„Apache“ oro srautas: palengvinti ETL
Kas įdiegs priklausomybes?

Kad visa tai būtų supaprastinta, įsukau docker-compose.yml apdorojimas requirements.txt visuose mazguose.

Dabar jo nebėra:

„Apache“ oro srautas: palengvinti ETL

Pilki kvadratai yra planavimo priemonės apdorojami užduočių atvejai.

Šiek tiek palaukiame, užduotis laužo darbininkai:

„Apache“ oro srautas: palengvinti ETL

Žalieji, žinoma, sėkmingai baigė savo darbą. Raudonos spalvos nėra labai sėkmingos.

Beje, mūsų gaminyje nėra aplanko ./dags, nėra sinchronizavimo tarp mašinų - visi dagiai guli git mūsų „Gitlab“, o „Gitlab CI“ platina atnaujinimus įrenginiams, kai susijungia master.

Šiek tiek apie Gėlę

Kol darbininkai tranko mūsų čiulptukus, prisiminkime dar vieną įrankį, galintį mums kažką parodyti – Gėlę.

Pats pirmasis puslapis su suvestinės informacijos apie darbuotojų mazgus:

„Apache“ oro srautas: palengvinti ETL

Intensyviausias puslapis su atliktomis užduotimis:

„Apache“ oro srautas: palengvinti ETL

Pats nuobodžiausias puslapis su mūsų brokerio statusu:

„Apache“ oro srautas: palengvinti ETL

Ryškiausias puslapis yra su užduočių būsenos diagramomis ir jų vykdymo laiku:

„Apache“ oro srautas: palengvinti ETL

Krauname per mažai pakrautą

Taigi, visos užduotys buvo įvykdytos, galite išvežti sužeistuosius.

„Apache“ oro srautas: palengvinti ETL

O sužeistųjų buvo daug – dėl vienokių ar kitokių priežasčių. Teisingai naudojant „Airflow“, šie kvadratai rodo, kad duomenys tikrai nebuvo gauti.

Turite žiūrėti žurnalą ir iš naujo paleisti nukritusias užduotis.

Spustelėję bet kurį kvadratą pamatysime mums galimus veiksmus:

„Apache“ oro srautas: palengvinti ETL

Galite paimti ir padaryti Išvalyti kritusius. Tai yra, pamirštame, kad ten kažkas nepavyko, ir ta pati egzemplioriaus užduotis atiteks planuokliui.

„Apache“ oro srautas: palengvinti ETL

Akivaizdu, kad tai daryti su pele su visais raudonais kvadratėliais nėra labai humaniška – iš Airflow to nesitikime. Natūralu, kad turime masinio naikinimo ginklų: Browse/Task Instances

„Apache“ oro srautas: palengvinti ETL

Pasirinkime viską iš karto ir iš naujo nustatykime į nulį, spustelėkite tinkamą elementą:

„Apache“ oro srautas: palengvinti ETL

Po valymo mūsų taksi atrodo taip (jau laukia, kol juos suplanuos tvarkaraštis):

„Apache“ oro srautas: palengvinti ETL

Jungtys, kabliukai ir kiti kintamieji

Atėjo laikas pažvelgti į kitą DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Ar visi kada nors atnaujino ataskaitą? Tai vėl ji: yra šaltinių, iš kurių gauti duomenis, sąrašas; yra sąrašas, kur įdėti; nepamirškite pagirti, kai viskas atsitiko ar sugedo (na, čia ne apie mus, ne).

Dar kartą peržiūrėkime failą ir pažvelkime į naujus neaiškius dalykus:

  • from commons.operators import TelegramBotSendMessage - niekas netrukdo mums pasidaryti savo operatorių, kuriais pasinaudojome sukūrę nedidelį įpakavimą žinutėms siųsti į Unblocked. (Apie šį operatorių plačiau pakalbėsime žemiau);
  • default_args={} - dag gali paskirstyti tuos pačius argumentus visiems savo operatoriams;
  • to='{{ var.value.all_the_kings_men }}' - laukas to neturėsime koduotų, o dinamiškai sugeneruotų naudojant Jinja ir kintamąjį su el. laiškų sąrašu, kurį atsargiai įdėjau Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — operatoriaus paleidimo sąlyga. Mūsų atveju laiškas nuskris į viršininkus tik tuo atveju, jei visos priklausomybės išsispręs sėkmingai;
  • tg_bot_conn_id='tg_main' - argumentai conn_id priimti prisijungimo ID, kuriuos sukuriame Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - žinutės „Telegram“ nuskris tik tuo atveju, jei bus užduočių;
  • task_concurrency=1 - Draudžiame vienu metu paleisti kelis vienos užduoties užduočių atvejus. Priešingu atveju vienu metu bus paleista keletas VerticaOperator (žiūri į vieną stalą);
  • report_update >> [email, tg] - viskas VerticaOperator susilieja siunčiant laiškus ir žinutes, pavyzdžiui:
    „Apache“ oro srautas: palengvinti ETL

    Bet kadangi pranešėjų operatoriai turi skirtingas paleidimo sąlygas, veiks tik viena. Medžio rodinyje viskas atrodo šiek tiek mažiau vizualiai:
    „Apache“ oro srautas: palengvinti ETL

Pasakysiu keletą žodžių apie makrokomandas ir jų draugai - kintamieji.

Makrokomandos yra Jinja vietos žymekliai, kurie gali pakeisti įvairią naudingą informaciją į operatoriaus argumentus. Pavyzdžiui, taip:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} išsiplės iki kontekstinio kintamojo turinio execution_date formatu YYYY-MM-DD: 2020-07-14. Geriausia yra tai, kad konteksto kintamieji pririšami prie konkrečios užduoties egzemplioriaus (kvadratas medžio rodinyje), o paleidus iš naujo, rezervuotos vietos išsiplės iki tų pačių verčių.

Priskirtas reikšmes galima peržiūrėti naudojant kiekvieno užduoties egzemplioriaus mygtuką Pateiktas. Štai kaip atliekama laiško siuntimo užduotis:

„Apache“ oro srautas: palengvinti ETL

Ir taip atliekant užduotį su žinutės siuntimu:

„Apache“ oro srautas: palengvinti ETL

Visą naujausios versijos integruotų makrokomandų sąrašą rasite čia: makrokomandų nuoroda

Be to, naudodami papildinius galime deklaruoti savo makrokomandas, bet tai jau kita istorija.

Be iš anksto nustatytų dalykų, galime pakeisti savo kintamųjų reikšmes (aš tai jau naudojau aukščiau esančiame kode). Kurkime Admin/Variables pora dalykų:

„Apache“ oro srautas: palengvinti ETL

Viskas, ką galite naudoti:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Vertė gali būti skaliarinė arba JSON. JSON atveju:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

tiesiog naudokite kelią į norimą raktą: {{ var.json.bot_config.bot.token }}.

Pažodžiui pasakysiu vieną žodį ir parodysiu vieną ekrano kopiją apie jungtys. Viskas čia elementaru: puslapyje Admin/Connections sukuriame ryšį, pridedame savo prisijungimus / slaptažodžius ir konkretesnius parametrus. Kaip šitas:

„Apache“ oro srautas: palengvinti ETL

Slaptažodžiai gali būti užšifruoti (labiau nei numatytasis) arba galite nenurodyti ryšio tipo (kaip aš padariau tg_main) - faktas yra tas, kad tipų sąrašas yra prijungtas prie „Airflow“ modelių ir negali būti išplėstas neįsigilinus į šaltinio kodus (jei staiga ko nors nepastebėjau „Google“, pataisykite mane), bet niekas netrukdys mums gauti kreditų. vardas.

Taip pat galite užmegzti keletą jungčių tuo pačiu pavadinimu: šiuo atveju metodas BaseHook.get_connection(), kuris suteikia mums ryšius pagal pavadinimą, duos atsitiktinis iš kelių bendravardių (logiškiau būtų padaryti Round Robin, bet palikime tai ant Airflow kūrėjų sąžinės).

Kintamieji ir jungtys tikrai yra šaunūs įrankiai, tačiau svarbu neprarasti pusiausvyros: kurias srautų dalis saugote pačiame kode, o kurias atiduodate saugoti „Airflow“. Viena vertus, per vartotojo sąsają gali būti patogu greitai pakeisti vertę, pavyzdžiui, pašto dėžutės. Kita vertus, tai vis tiek yra grįžimas prie pelės paspaudimo, nuo kurio mes (aš) norėjome atsikratyti.

Darbas su ryšiais yra viena iš užduočių kabliukai. Apskritai „Airflow“ kabliukai yra taškai, skirti prijungti jį prie trečiųjų šalių paslaugų ir bibliotekų. Pvz., JiraHook atidarys klientą, kad galėtume bendrauti su Jira (galite perkelti užduotis pirmyn ir atgal), ir padedant SambaHook galite nusiųsti vietinį failą smb-taškas.

Pasirinktinis operatorius analizuojamas

Ir mes priartėjome prie to, kad pamatytume, kaip jis pagamintas TelegramBotSendMessage

Kodas commons/operators.py su tikruoju operatoriumi:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Čia, kaip ir visa kita Airflow, viskas labai paprasta:

  • Paveldėjo iš BaseOperator, kuriame įgyvendinama nemažai oro srautui būdingų dalykų (pažiūrėkite į savo laisvalaikį)
  • Deklaruoti laukai template_fields, kuriame Jinja ieškos apdorotų makrokomandų.
  • Surinko tinkamus argumentus už __init__(), kur reikia, nustatykite numatytuosius nustatymus.
  • Nepamiršome ir protėvio inicijavimo.
  • Atidarė atitinkamą kabliuką TelegramBotHookiš jos gavo kliento objektą.
  • Nepaisytas (iš naujo apibrėžtas) metodas BaseOperator.execute(), kurį Airfow trūkčios, kai ateis laikas paleisti operatorių – jame įgyvendinsime pagrindinį veiksmą, pamiršę prisijungti. (Beje, mes prisijungiame iškart stdout и stderr - Oro srautas viską sulaikys, gražiai apvynios, suskaidys, kur reikia.)

Pažiūrėkime, ką turime commons/hooks.py. Pirmoji failo dalis su pačiu kabliuku:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Net nežinau, ką čia paaiškinti, tik atkreipsiu dėmesį į svarbius dalykus:

  • Mes paveldime, galvojame apie argumentus - daugeliu atvejų tai bus vienas: conn_id;
  • Standartinių metodų nepaisymas: aš apsiribojau get_conn(), kuriame aš gaunu ryšio parametrus pagal pavadinimą ir tiesiog gaunu skyrių extra (tai yra JSON laukas), kuriame aš (pagal savo instrukcijas!) įdėjau Telegram boto prieigos raktą: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Kuriu mūsų pavyzdį TelegramBot, suteikiant jai konkretų prieigos raktą.

Tai viskas. Galite gauti klientą iš kablio naudodami TelegramBotHook().clent arba TelegramBotHook().get_conn().

Ir antroji failo dalis, kurioje darau „Telegram REST API“ mikroįvyniojimą, kad netempčiau to paties python-telegram-bot vienam metodui sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Teisingas būdas yra viską sudėti: TelegramBotSendMessage, TelegramBotHook, TelegramBot - įskiepyje įdėkite į viešą saugyklą ir suteikite ją atvirajam šaltiniui.

Kol visa tai studijavome, mūsų ataskaitų atnaujinimai sėkmingai nepavyko ir atsiuntė man kanale klaidos pranešimą. Einu paziureti ar negerai...

„Apache“ oro srautas: palengvinti ETL
Kažkas sugedo mūsų šunelyje! Ar ne to mes tikėjomės? tiksliai!

Ar ketini pilti?

Ar jauti, kad kažką praleidau? Atrodo, kad jis pažadėjo perkelti duomenis iš SQL serverio į Vertica, o tada ėmė ir nukrypo nuo temos, niekšas!

Šis žiaurumas buvo tyčinis, aš tiesiog turėjau jums iššifruoti tam tikrą terminiją. Dabar galite eiti toliau.

Mūsų planas buvo toks:

  1. Do dag
  2. Generuokite užduotis
  3. Pažiūrėkite, kaip viskas gražu
  4. Priskirkite užpildams seansų numerius
  5. Gaukite duomenis iš SQL serverio
  6. Įdėkite duomenis į Vertica
  7. Rinkti statistiką

Taigi, kad visa tai pradėtų veikti, aš padariau nedidelį mūsų papildymą docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Ten mes keliame:

  • Vertica kaip šeimininkas dwh su labiausiai numatytais nustatymais,
  • trys SQL serverio egzemplioriai,
  • Pastarosiose esančias duomenų bazes užpildome kai kuriais duomenimis (jokiu būdu nesižvalgykite mssql_init.py!)

Visą gėrį paleidžiame naudodami šiek tiek sudėtingesnę komandą nei praėjusį kartą:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Tai, ką sukūrė mūsų stebuklingas atsitiktinių imčių įrankis, galite naudoti elementą Data Profiling/Ad Hoc Query:

„Apache“ oro srautas: palengvinti ETL
Svarbiausia to nerodyti analitikams

detalizuoti ETL sesijos Nedarysiu, ten viskas yra nereikšminga: padarome pagrindą, jame yra ženklas, viską apvyniojame konteksto tvarkykle, o dabar darome taip:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Atėjo laikas rinkti mūsų duomenis nuo mūsų pusantro šimto stalų. Padarykime tai naudodami labai nepretenzingus eilutes:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Kabliuko pagalba gauname iš Airflow pymssql-Prisijungti
  2. Užklausoje pakeiskime apribojimą datos forma – šablono variklis jį įmes į funkciją.
  3. Pateikiame mūsų prašymą pandaskas mus gaus DataFrame – tai mums pravers ateityje.

Aš naudoju pakaitalą {dt} vietoj užklausos parametro %s ne todėl, kad esu piktasis Pinokis, o todėl pandas negali susitvarkyti pymssql ir paslysta paskutinis params: Listnors jis tikrai nori tuple.
Taip pat atkreipkite dėmesį, kad kūrėjas pymssql nusprendė jo neberemti, ir laikas išsikraustyti pyodbc.

Pažiūrėkime, kuo „Airflow“ užpildė mūsų funkcijų argumentus:

„Apache“ oro srautas: palengvinti ETL

Jei nėra duomenų, nėra prasmės tęsti. Tačiau taip pat keista laikyti įdarą sėkmingu. Bet tai nėra klaida. A-ah-ah, ką daryti?! Ir štai kas:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException pasakys Airflow, kad klaidų nėra, bet mes praleidžiame užduotį. Sąsaja turės ne žalią ar raudoną kvadratą, o rausvą.

Permeskime savo duomenis keli stulpeliai:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Būtent:

  • Duomenų bazė, iš kurios gavome užsakymus,
  • Mūsų potvynio seanso ID (jis bus kitoks kiekvienai užduočiai),
  • Maiša iš šaltinio ir užsakymo ID – kad galutinėje duomenų bazėje (kur viskas supilama į vieną lentelę) turėtume unikalų užsakymo ID.

Lieka priešpaskutinis žingsnis: supilkite viską į Vertica. Ir, kaip bebūtų keista, vienas įspūdingiausių ir efektyviausių būdų tai padaryti yra CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Gaminame specialų imtuvą StringIO.
  2. pandas maloniai įdės mūsų DataFrame kaip CSV-linijos.
  3. Su kabliu atidarykime ryšį su mūsų mėgstama Vertica.
  4. O dabar su pagalba copy() siųskite mūsų duomenis tiesiai Vertika!

Mes paimsime iš vairuotojo, kiek eilučių buvo užpildyta, ir pasakysime sesijos vadovui, kad viskas gerai:

session.loaded_rows = cursor.rowcount
session.successful = True

Tai viskas.

Parduodant tikslinę plokštę sukuriame rankiniu būdu. Čia leidau sau nedidelę mašinėlę:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Aš naudojuosi VerticaOperator() Sukuriu duomenų bazės schemą ir lentelę (jei jų dar nėra, žinoma). Svarbiausia yra teisingai išdėstyti priklausomybes:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Sumavimas

- Na, - tarė pelytė, - ar ne dabar
Ar tu įsitikinęs, kad aš esu baisiausias gyvūnas miške?

Julia Donaldson, Grufalas

Manau, jei su kolegomis konkuruotume: kas greitai sukurs ir pradės ETL procesą nuo nulio: jie su savo SSIS ir pele, o aš su Airflow... Ir tada dar palygintume priežiūros paprastumą... Oho, manau, sutiksite, kad aš juos įveiksiu visuose frontuose!

Jei šiek tiek rimčiau, tai Apache Airflow - aprašydamas procesus programos kodo forma - atliko mano darbą daug patogiau ir maloniau.

Jo neribotas išplėtimas tiek papildinių, tiek polinkio į mastelį atžvilgiu suteikia galimybę naudoti „Airflow“ beveik bet kurioje srityje: net per visą duomenų rinkimo, ruošimo ir apdorojimo ciklą, net paleidžiant raketas (į Marsą, kursas).

Galutinė dalis, nuoroda ir informacija

Grėblis, kurį surinkome jums

  • start_date. Taip, tai jau vietinis memas. Pagrindinis Dougo argumentas start_date Visi išlaikyti. Trumpai, jei nurodysite start_date dabartinė data ir schedule_interval - vieną dieną, tada DAG prasidės rytoj ne anksčiau.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Ir daugiau jokių problemų.

    Su juo susijusi kita vykdymo laiko klaida: Task is missing the start_date parameter, o tai dažniausiai rodo, kad pamiršote susieti su dag operatoriumi.

  • Viskas vienoje mašinoje. Taip, ir bazės (pats „Airflow“ ir mūsų danga), ir žiniatinklio serveris, ir planuotojas, ir darbuotojai. Ir netgi pavyko. Tačiau laikui bėgant paslaugų užduočių skaičius augo ir kai PostgreSQL pradėjo reaguoti į indeksą per 20 s, o ne per 5 ms, mes jį paėmėme ir nunešėme.
  • Vietinis vykdytojas. Taip, mes vis dar sėdime ant jo ir jau priėjome prie bedugnės krašto. „LocalExecutor“ mums iki šiol pakako, bet dabar laikas plėstis bent vienu darbuotoju, o pereiti prie „CeleryExecutor“ turėsime sunkiai dirbti. Ir atsižvelgiant į tai, kad galite dirbti su juo viename įrenginyje, niekas netrukdo jums naudoti „Sellery“ net serveryje, kuris „žinoma, niekada nebus pradėtas gaminti, sąžiningai!
  • Nenaudojimo įmontuoti įrankiai:
    • Jungtys saugoti paslaugų kredencialus,
    • SLA Misses reaguoti į užduotis, kurios nepavyko laiku,
    • xcom metaduomenų mainams (sakiau metaduomenis!) tarp dag užduočių.
  • Pašto piktnaudžiavimas. Na, ką aš galiu pasakyti? Buvo nustatyti įspėjimai apie visus kritusių užduočių pasikartojimus. Dabar mano darbo „Gmail“ turi daugiau nei 90 100 el. laiškų iš „Airflow“, o žiniatinklio pašto snukis atsisako pasiimti ir ištrinti daugiau nei XNUMX laiškų vienu metu.

Daugiau spąstų: Apache Airflow Pitfails

Daugiau automatizavimo įrankių

Tam, kad dar daugiau dirbtume galva, o ne rankomis, Airflow mums paruošė štai ką:

  • POILSIO API - jis vis dar turi Eksperimento statusą, kuris jam netrukdo dirbti. Su juo galite ne tik gauti informacijos apie dags ir užduotis, bet ir sustabdyti/paleisti dag, sukurti DAG Run ar baseiną.
  • CLI - Komandinėje eilutėje yra daug įrankių, kuriuos ne tik nepatogu naudoti naudojant WebUI, bet ir apskritai jų nėra. Pavyzdžiui:
    • backfill reikalingas norint iš naujo paleisti užduočių egzempliorius.
      Pavyzdžiui, atėjo analitikai ir sako: „O tu, drauge, turi nesąmonių sausio 1–13 d. Pataisyk, pataisyk, pataisyk, pataisyk! O tu tokia kaitlentė:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Pagrindinė paslauga: initdb, resetdb, upgradedb, checkdb.
    • run, kuri leidžia vykdyti vieną egzemplioriaus užduotį ir netgi įvertinti visas priklausomybes. Be to, galite jį paleisti per LocalExecutor, net jei turite salierų grupę.
    • Veikia beveik tą patį test, tik taip pat bazėse nieko nerašo.
    • connections leidžia masiškai kurti ryšius iš apvalkalo.
  • „Python“ API - gana kietas bendravimo būdas, skirtas įskiepiams, o ne knibždantis jame mažomis rankytėmis. Bet kas mums trukdys eiti /home/airflow/dags, bėk ipython ir pradedi blaškytis? Pavyzdžiui, galite eksportuoti visus ryšius naudodami šį kodą:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Prisijungimas prie Airflow metaduomenų bazės. Nerekomenduoju į jį rašyti, tačiau gauti užduočių būsenas įvairioms konkrečioms metrikoms gali būti daug greičiau ir lengviau nei naudojant bet kurią API.

    Tarkime, ne visos mūsų užduotys yra idempotentiškos, tačiau kartais jos gali nukristi, ir tai normalu. Bet keli užsikimšimai jau kelia įtarimų, ir reikėtų patikrinti.

    Saugokitės SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Nuorodos

Ir, žinoma, pirmosios dešimt nuorodų iš „Google“ išleidimo yra aplanko „Airflow“ turinys iš mano žymių.

Ir straipsnyje naudojamos nuorodos:

Šaltinis: www.habr.com