Apache loftflæði: Gerir ETL auðveldara

Hæ, ég er Dmitry Logvinenko - Gagnaverkfræðingur greiningardeildar Vezet fyrirtækjasamsteypunnar.

Ég mun segja þér frá frábæru tæki til að þróa ETL ferla - Apache Airflow. En Airflow er svo fjölhæft og margþætt að þú ættir að skoða það betur jafnvel þótt þú sért ekki í gagnaflæði, heldur þurfir þú reglulega að ræsa hvaða ferla sem er og fylgjast með framkvæmd þeirra.

Og já, ég mun ekki aðeins segja, heldur einnig sýna: forritið hefur mikið af kóða, skjámyndum og ráðleggingum.

Apache loftflæði: Gerir ETL auðveldara
Það sem þú sérð venjulega þegar þú gúglar orðið Airflow / Wikimedia Commons

efnisyfirlit

Inngangur

Apache Airflow er alveg eins og Django:

  • skrifað í python
  • það er frábært stjórnendaborð,
  • stækkanlegt endalaust

- aðeins betra, og það var gert í allt öðrum tilgangi, nefnilega (eins og það er skrifað á undan kat):

  • keyra og fylgjast með verkefnum á ótakmarkaðan fjölda véla (eins og mörg sellerí / Kubernetes og samviska þín mun leyfa þér)
  • með kraftmiklu verkflæðisframleiðslu frá mjög auðvelt að skrifa og skilja Python kóða
  • og getu til að tengja hvaða gagnagrunna og API sem er við hvert annað með því að nota bæði tilbúna íhluti og heimagerða viðbætur (sem er mjög einfalt).

Við notum Apache Airflow svona:

  • við söfnum gögnum frá ýmsum aðilum (mörgum SQL Server og PostgreSQL tilvikum, ýmsum API með forritamælingum, jafnvel 1C) í DWH og ODS (við höfum Vertica og Clickhouse).
  • hversu háþróaður cron, sem byrjar gagnasamþjöppunarferli á ODS, og fylgist einnig með viðhaldi þeirra.

Þar til nýlega var þörfum okkar fullnægt af einum litlum netþjóni með 32 kjarna og 50 GB af vinnsluminni. Í Airflow virkar þetta:

  • meira 200 dags (reyndar verkflæði, þar sem við fylltum verkefni),
  • í hverjum að meðaltali 70 verkefni,
  • þetta góðgæti byrjar (einnig að meðaltali) einu sinni á klukkustund.

Og um hvernig við stækkuðum mun ég skrifa hér að neðan, en nú skulum við skilgreina yfirvandann sem við munum leysa:

Það eru þrír uppruna-SQL-þjónar, hver með 50 gagnagrunnum - tilvik af einu verkefni, hver um sig, þeir hafa sömu uppbyggingu (nánast alls staðar, mua-ha-ha), sem þýðir að hver hefur pöntunartöflu (sem betur fer töflu með því) nafn er hægt að ýta inn í hvaða fyrirtæki sem er). Við tökum gögnin með því að bæta við þjónustusviðum (upprunaþjóni, frumgagnagrunni, ETL verkefnaauðkenni) og henda þeim á barnalegan hátt inn í, segjum, Vertica.

Við skulum fara!

Aðalhlutinn, verklegur (og svolítið fræðilegur)

Af hverju er það fyrir okkur (og fyrir þig)

Þegar trén voru stór og ég var einföld SQL-schik í einni rússneskri smásölu, við svikum ETL ferla aka gagnaflæði með því að nota tvö verkfæri sem eru tiltæk fyrir okkur:

  • Informatica Power Center - ákaflega dreift kerfi, afar afkastamikið, með eigin vélbúnaði, eigin útgáfu. Ég notaði Guð forði 1% af getu þess. Hvers vegna? Jæja, fyrst af öllu, þetta viðmót, einhvers staðar frá 380, setti andlega pressu á okkur. Í öðru lagi er þessi búnaður hannaður fyrir einstaklega flotta ferla, tryllta endurnotkun íhluta og önnur mjög mikilvæg fyrirtækisbrellur. Um þá staðreynd að það kostar, eins og væng Airbus AXNUMX / ár, munum við ekki segja neitt.

    Varist, skjáskot getur skaðað fólk undir 30 aðeins

    Apache loftflæði: Gerir ETL auðveldara

  • SQL Server samþættingarþjónn - við notuðum þennan félaga í innra verkefnaflæði okkar. Jæja, í raun: við notum nú þegar SQL Server og það væri einhvern veginn ósanngjarnt að nota ekki ETL verkfæri hans. Allt í henni er gott: bæði viðmótið er fallegt og framvinduskýrslurnar ... En þetta er ekki ástæðan fyrir því að við elskum hugbúnaðarvörur, ó, ekki fyrir þetta. Útgáfa það dtsx (sem er XML með hnútum sem eru stokkaðir á vistun) við getum það, en hver er tilgangurinn? Hvað með að búa til verkefnapakka sem mun draga hundruð borða frá einum netþjóni til annars? Já, hvaða hundrað, vísifingur þinn mun detta af tuttugu bitum, með því að smella á músarhnappinn. En það lítur örugglega meira út í tísku:

    Apache loftflæði: Gerir ETL auðveldara

Við leituðum svo sannarlega leiða út. Mál jafnvel næstum kom að sjálfskrifuðum SSIS pakka rafall ...

…og svo fann ég nýtt starf. Og Apache Airflow náði mér á því.

Þegar ég komst að því að ETL ferlalýsingar eru einfaldur Python kóða dansaði ég bara ekki af gleði. Svona voru gagnastraumar útfærðir og breyttir og að hella töflum með einni uppbyggingu úr hundruðum gagnagrunna í eitt skotmark varð spurning um Python kóða á einum og hálfum eða tveimur 13” skjám.

Að setja saman klasann

Við skulum ekki skipuleggja algjörlega leikskóla og ekki tala um algjörlega augljósa hluti hér, eins og að setja upp Airflow, gagnagrunninn sem þú valdir, sellerí og önnur tilvik sem lýst er í bryggjunni.

Svo að við getum strax hafið tilraunir, skissaði ég docker-compose.yml þar sem:

  • Við skulum reyndar hækka Loftstreymi: Tímaáætlun, vefþjónn. Blóm mun einnig snúast þar til að fylgjast með selleríverkefnum (vegna þess að það hefur þegar verið ýtt inn apache/airflow:1.10.10-python3.7, en okkur er sama)
  • PostgreSQL, þar sem Airflow mun skrifa þjónustuupplýsingar sínar (dagskrárgögn, framkvæmdatölfræði osfrv.), og Celery mun merkja lokið verkefnum;
  • Redis, sem mun starfa sem verkefnamiðlari fyrir sellerí;
  • Sellerí verkamaður, sem mun sinna beinni framkvæmd verkefna.
  • Til möppu ./dags við munum bæta við skrám okkar með lýsingu dags. Þeir verða teknir upp á flugu, þannig að það er engin þörf á að leika allan stafla eftir hverja hnerra.

Sums staðar er kóðinn í dæmunum ekki alveg sýndur (til að gera textann ekki í ruglinu), en einhvers staðar er honum breytt í ferlinu. Heildar dæmi um vinnukóða má finna í geymslunni https://github.com/dm-logv/airflow-tutorial.

Docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Skýringar:

  • Við samsetningu tónverksins treysti ég að miklu leyti á hina þekktu mynd pukkel/docker-loftflæði - endilega kíkið á það. Kannski þarftu ekki neitt annað í lífi þínu.
  • Allar loftflæðisstillingar eru fáanlegar ekki aðeins í gegnum airflow.cfg, en einnig í gegnum umhverfisbreytur (þökk sé þróunaraðilum), sem ég nýtti mér illgjarnt.
  • Auðvitað er það ekki tilbúið til framleiðslu: ég setti vísvitandi ekki hjartslátt á gáma, ég nennti ekki öryggi. En ég gerði það lágmark sem hentaði tilraunamönnum okkar.
  • Athugaðu að:
    • Dag mappan verður að vera aðgengileg bæði fyrir tímaritara og starfsmenn.
    • Sama á við um öll þriðja aðila bókasöfn - þau verða öll að vera uppsett á vélum með tímaáætlun og starfsmönnum.

Jæja, nú er það einfalt:

$ docker-compose up --scale worker=3

Eftir að allt hækkar geturðu skoðað vefviðmótin:

Grunnhugtök

Ef þú skildir ekkert í öllum þessum „dögum“, þá er hér stutt orðabók:

  • Tímaáætlun - mikilvægasti frændi í Airflow, sem stjórnar því að vélmenni vinni hörðum höndum, en ekki manneskja: fylgist með dagskrá, uppfærir dags, setur verkefni af stað.

    Almennt séð, í eldri útgáfum, átti hann í vandræðum með minni (nei, ekki minnisleysi, heldur leki) og arfleifðarfæribreytan hélst jafnvel í stillingunum run_duration — endurræsingartímabil þess. En nú er allt í lagi.

  • DAG (aka "dag") - "stýrt óhringlaga graf", en slík skilgreining mun segja fáum, en í raun er það ílát fyrir verkefni sem hafa samskipti sín á milli (sjá hér að neðan) eða hliðstæða pakka í SSIS og Workflow í Informatica .

    Auk dags geta enn verið undirdagar, en við munum líklegast ekki ná þeim.

  • DAG Run - frumstillt dag, sem er úthlutað sínum eigin execution_date. Dagrans sama dags getur virkað samhliða (ef þú hefur gert verkefni þín auðmjúk, auðvitað).
  • Flugrekandi eru stykki af kóða sem bera ábyrgð á að framkvæma tiltekna aðgerð. Það eru þrjár gerðir af rekstraraðilum:
    • aðgerðeins og uppáhaldið okkar PythonOperator, sem getur framkvæmt hvaða (gildan) Python kóða sem er;
    • flytja, sem flytja gögn frá stað til stað, td, MsSqlToHiveTransfer;
    • skynjari á hinn bóginn mun það leyfa þér að bregðast við eða hægja á frekari framkvæmd dagsins þar til atburður á sér stað. HttpSensor getur dregið tilgreindan endapunkt, og þegar æskilegt svar bíður, byrjaðu flutninginn GoogleCloudStorageToS3Operator. Forvitinn hugur mun spyrja: „af hverju? Þegar öllu er á botninn hvolft geturðu gert endurtekningar beint í símanum!“ Og svo, í því skyni að stífla ekki laug verkefna með stöðvuðum rekstraraðilum. Skynjarinn fer í gang, athugar og deyr fyrir næstu tilraun.
  • Verkefni - yfirlýstir rekstraraðilar, óháð tegund, og tengdir daginum eru færðir í stöðu verkefnis.
  • verkefni dæmi - þegar aðalskipuleggjandi ákvað að það væri kominn tími til að senda verkefni í bardaga á listamenn (rétt á staðnum, ef við notum LocalExecutor eða í fjarlægan hnút ef um er að ræða CeleryExecutor), það úthlutar þeim samhengi (þ.e. mengi af breytum - framkvæmdarbreytur), stækkar skipana- eða fyrirspurnarsniðmát og sameinar þau.

Við búum til verkefni

Í fyrsta lagi skulum við gera grein fyrir almennu kerfi dougsins okkar, og síðan munum við kafa í smáatriðin meira og meira, vegna þess að við notum nokkrar ekki léttvægar lausnir.

Svo, í sinni einföldustu mynd, mun slíkur dag líta svona út:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Við skulum reikna það út:

  • Fyrst flytjum við inn nauðsynlegar libs og eitthvað annað;
  • sql_server_ds - Er List[namedtuple[str, str]] með nöfnum á tengingum frá Airflow Connections og gagnagrunnum sem við munum taka diskinn okkar úr;
  • dag - tilkynning dagsins okkar, sem verður endilega að vera í globals(), annars finnur Airflow það ekki. Doug þarf líka að segja:
    • hvað heitir hann orders - þetta nafn mun þá birtast í vefviðmótinu,
    • að hann vinni frá miðnætti þann áttunda júlí,
    • og það ætti að keyra, á um það bil 6 klukkustunda fresti (fyrir harða krakka hér í staðinn fyrir timedelta() leyfilegt cron-lína 0 0 0/6 ? * * *, fyrir þá sem minna kúl - tjáning eins og @daily);
  • workflow() mun sinna aðalstarfinu, en ekki núna. Í bili munum við bara henda samhengi okkar inn í loggbókina.
  • Og nú er einfaldi galdurinn við að búa til verkefni:
    • við rennum í gegnum heimildir okkar;
    • frumstilla PythonOperator, sem mun aflífa dúlluna okkar workflow(). Ekki gleyma að tilgreina einstakt (innan dag) heiti verkefnisins og binda daginn sjálfan. Fáni provide_context aftur á móti mun hella fleiri rökum í fallið, sem við munum safna vandlega með því að nota **context.

Í bili er það allt. Það sem við fengum:

  • nýr dag í vefviðmótinu,
  • eitt og hálft hundrað verkefni sem verða unnin samhliða (ef Airflow, Sellerí stillingar og getu netþjóns leyfa það).

Jæja, náði því næstum.

Apache loftflæði: Gerir ETL auðveldara
Hver mun setja upp ósjálfstæðin?

Til að einfalda þetta allt saman þá skrúfaði ég inn docker-compose.yml vinnslu requirements.txt á öllum hnútum.

Nú er það horfið:

Apache loftflæði: Gerir ETL auðveldara

Gráir reitir eru verktilvik sem unnin eru af tímaáætlunarmanni.

Við bíðum aðeins, verkefnin eru tekin upp af starfsmönnum:

Apache loftflæði: Gerir ETL auðveldara

Þeir grænu hafa að sjálfsögðu lokið sínu starfi. Rauðir eru ekki mjög vel heppnaðir.

Við the vegur, það er engin mappa á vörunum okkar ./dags, það er engin samstilling á milli véla - allir dagar liggja í git á Gitlab okkar, og Gitlab CI dreifir uppfærslum á vélar við sameiningu master.

Smá um blóm

Á meðan verkamennirnir eru að troða snuðunum okkar skulum við muna eftir öðru tæki sem getur sýnt okkur eitthvað - Blóm.

Fyrsta síða með samantektarupplýsingum um starfshnúta:

Apache loftflæði: Gerir ETL auðveldara

Ákaflegasta síðan með verkefnum sem fóru í vinnuna:

Apache loftflæði: Gerir ETL auðveldara

Leiðinlegasta síðan með stöðu miðlara okkar:

Apache loftflæði: Gerir ETL auðveldara

Bjartasta síðan er með verkefnastöðugröf og framkvæmdartíma þeirra:

Apache loftflæði: Gerir ETL auðveldara

Við hleðjum undirhlaðna

Svo, öll verkefnin hafa gengið upp, þú getur borið burt særða.

Apache loftflæði: Gerir ETL auðveldara

Og það voru margir særðir - af einni eða annarri ástæðu. Ef um er að ræða rétta notkun á Airflow benda einmitt þessir reitir til þess að gögnin hafi örugglega ekki borist.

Þú þarft að fylgjast með skránni og endurræsa tilvikin sem hafa fallið.

Með því að smella á hvaða ferning sem er sjáum við þær aðgerðir sem okkur standa til boða:

Apache loftflæði: Gerir ETL auðveldara

Þú getur tekið og gert Clear the fallen. Það er, við gleymum að eitthvað hefur mistekist þar og sama tilviksverkefni mun fara í tímaáætlunarmanninn.

Apache loftflæði: Gerir ETL auðveldara

Það er ljóst að það er ekki mjög mannúðlegt að gera þetta með músinni með öllum rauðu ferningunum - þetta er ekki það sem við búumst við frá Airflow. Auðvitað eigum við gereyðingarvopn: Browse/Task Instances

Apache loftflæði: Gerir ETL auðveldara

Við skulum velja allt í einu og núllstilla, smelltu á réttan hlut:

Apache loftflæði: Gerir ETL auðveldara

Eftir hreinsun líta leigubílarnir okkar svona út (þeir eru nú þegar að bíða eftir að tímaáætlunarmaðurinn skipuleggi þá):

Apache loftflæði: Gerir ETL auðveldara

Tengingar, krókar og aðrar breytur

Það er kominn tími til að líta á næsta DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Hafa allir gert skýrsluuppfærslu? Þetta er hún aftur: það er listi yfir heimildir þaðan sem hægt er að fá gögnin; það er listi hvar á að setja; ekki gleyma að tuða þegar allt gerðist eða bilaði (jæja, þetta snýst ekki um okkur, nei).

Við skulum fara í gegnum skrána aftur og skoða nýju óljósu dótið:

  • from commons.operators import TelegramBotSendMessage - ekkert kemur í veg fyrir að við búum til okkar eigin símafyrirtæki, sem við nýttum okkur með því að búa til litla umbúðir til að senda skilaboð til Unblocked. (Við munum tala meira um þennan rekstraraðila hér að neðan);
  • default_args={} - dag getur dreift sömu rökum til allra rekstraraðila;
  • to='{{ var.value.all_the_kings_men }}' - sviði to við munum ekki hafa harðkóða, heldur myndað á kraftmikinn hátt með Jinja og breytu með lista yfir tölvupósta, sem ég setti vandlega inn Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — skilyrði fyrir því að ræsa rekstraraðila. Í okkar tilviki mun bréfið aðeins fljúga til yfirmannanna ef öll ósjálfstæði hafa gengið upp með góðum árangri;
  • tg_bot_conn_id='tg_main' - rök conn_id samþykkja auðkenni tenginga sem við búum til í Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - skilaboð í Telegram munu aðeins fljúga í burtu ef það eru fallin verkefni;
  • task_concurrency=1 - við bönnum samtímis kynningu á nokkrum verkefnatilvikum af einu verkefni. Annars munum við fá samtímis kynningu á nokkrum VerticaOperator (horft á eitt borð);
  • report_update >> [email, tg] - allt VerticaOperator sameinast við að senda bréf og skilaboð, eins og þetta:
    Apache loftflæði: Gerir ETL auðveldara

    En þar sem rekstraraðilar tilkynnenda hafa mismunandi ræsingarskilyrði mun aðeins einn virka. Í trésýninni lítur allt aðeins minna út:
    Apache loftflæði: Gerir ETL auðveldara

Ég ætla að fara nokkrum orðum um fjölvi og vinir þeirra - breytum.

Fjölvi eru Jinja staðgenglar sem geta skipt út ýmsum gagnlegum upplýsingum í rekstrarrök. Til dæmis, svona:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} mun stækka við innihald samhengisbreytunnar execution_date með sniðinu YYYY-MM-DD: 2020-07-14. Það besta er að samhengisbreytur eru negldar við tiltekið verktilvik (ferningur í trésýninni), og þegar endurræst er munu staðgenglar stækka í sömu gildi.

Úthlutað gildi er hægt að skoða með því að nota Rendered hnappinn á hverju verktilviki. Svona er verkefnið við að senda bréf:

Apache loftflæði: Gerir ETL auðveldara

Og svo við verkefnið með því að senda skilaboð:

Apache loftflæði: Gerir ETL auðveldara

Heildarlisti yfir innbyggða fjölvi fyrir nýjustu tiltæku útgáfuna er fáanlegur hér: tilvísun fjölva

Þar að auki, með hjálp viðbætur, getum við lýst yfir eigin fjölvi, en það er önnur saga.

Til viðbótar við fyrirfram skilgreinda hluti, getum við skipt út gildum breytanna okkar (ég notaði þetta nú þegar í kóðanum hér að ofan). Við skulum búa til Admin/Variables nokkur atriði:

Apache loftflæði: Gerir ETL auðveldara

Allt sem þú getur notað:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Gildið getur verið stigstærð, eða það getur líka verið JSON. Ef um JSON er að ræða:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

notaðu bara slóðina að viðkomandi lykli: {{ var.json.bot_config.bot.token }}.

Ég ætla bókstaflega að segja eitt orð og sýna eitt skjáskot um соединения. Allt er grunnatriði hér: á síðunni Admin/Connections við búum til tengingu, bætum við innskráningum / lykilorðum okkar og nákvæmari breytum þar. Svona:

Apache loftflæði: Gerir ETL auðveldara

Lykilorð geta verið dulkóðuð (rækilegar en sjálfgefið), eða þú getur sleppt tengingargerðinni (eins og ég gerði fyrir tg_main) - Staðreyndin er sú að listinn yfir tegundir er tengdur í Airflow módelum og er ekki hægt að stækka hann án þess að komast inn í frumkóðann (ef ég var skyndilega ekki að googla eitthvað, vinsamlegast leiðréttið mig), en ekkert mun koma í veg fyrir að við fáum inneignir bara kl. nafn.

Þú getur líka gert nokkrar tengingar með sama nafni: í þessu tilfelli, aðferðin BaseHook.get_connection(), sem fær okkur tengingar með nafni, mun gefa handahófi frá nokkrum nafna (það væri rökréttara að gera Round Robin, en við skulum skilja það eftir á samvisku Airflow þróunaraðila).

Breytur og tengingar eru vissulega flott verkfæri, en það er mikilvægt að missa ekki jafnvægið: hvaða hluta flæðisins þíns geymir þú í kóðanum sjálfum og hvaða hluta þú gefur Airflow til geymslu. Annars vegar getur verið þægilegt að breyta gildinu fljótt, til dæmis póstkassa, í gegnum HÍ. Á hinn bóginn er þetta enn afturhvarf til músarsmellsins, sem við (ég) vildum losna við.

Vinna með tengingar er eitt af verkefnunum krókar. Almennt séð eru Airflow krókar punktar til að tengja það við þjónustu og bókasöfn þriðja aðila. Td JiraHook mun opna viðskiptavin fyrir okkur til að hafa samskipti við Jira (þú getur fært verkefni fram og til baka), og með hjálp SambaHook þú getur ýtt staðbundinni skrá til smb-punktur.

Að þátta sérsniðna rekstraraðilann

Og við komumst nálægt því að skoða hvernig það er búið til TelegramBotSendMessage

Code commons/operators.py með raunverulegum rekstraraðila:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Hér, eins og allt annað í Airflow, er allt mjög einfalt:

  • Erfði frá BaseOperator, sem útfærir töluvert af Airflow-sértækum hlutum (horfðu á tómstundir þínar)
  • Yfirlýstir reitir template_fields, þar sem Jinja mun leita að fjölvi til að vinna úr.
  • Raðaði upp réttum rökum fyrir __init__(), stilltu sjálfgefnar stillingar þar sem þörf krefur.
  • Við gleymdum ekki frumstillingu forföðursins heldur.
  • Opnaði samsvarandi krók TelegramBotHookfengið viðskiptavinahlut frá henni.
  • Hnekkt (endurskilgreind) aðferð BaseOperator.execute(), sem Airfow mun kippa sér upp við þegar kemur að því að ræsa rekstraraðilann - í henni munum við útfæra aðalaðgerðina, gleyma að skrá þig inn. (Við skráum okkur inn, við the vegur, beint inn stdout и stderr - Loftflæði mun stöðva allt, vefja það fallega, sundurliða það þar sem þörf krefur.)

Við skulum sjá hvað við höfum commons/hooks.py. Fyrsti hluti skráarinnar, með króknum sjálfum:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ég veit ekki einu sinni hvað ég á að útskýra hér, ég ætla bara að taka eftir mikilvægum atriðum:

  • Við erfum, hugsum um rökin - í flestum tilfellum verður það eitt: conn_id;
  • Yfirstíga staðlaðar aðferðir: Ég takmarkaði mig get_conn(), þar sem ég fæ tengibreytur með nafni og fæ bara hlutann extra (þetta er JSON reitur), þar sem ég (samkvæmt mínum eigin leiðbeiningum!) setti Telegram bot táknið: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ég bý til dæmi um okkar TelegramBot, gefa því sérstakan tákn.

Það er allt og sumt. Þú getur fengið viðskiptavin frá krók með því að nota TelegramBotHook().clent eða TelegramBotHook().get_conn().

Og seinni hluti skráarinnar, þar sem ég bý til örumbúðir fyrir Telegram REST API, til að draga ekki það sama python-telegram-bot fyrir eina aðferð sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Rétta leiðin er að leggja þetta allt saman: TelegramBotSendMessage, TelegramBotHook, TelegramBot - í viðbótinni, settu í opinbera geymslu og gefðu það til Open Source.

Á meðan við vorum að kynna okkur allt þetta tókst skýrsluuppfærslunum okkar að mistakast með góðum árangri og senda mér villuboð í rásinni. Ég ætla að athuga hvort það sé rangt...

Apache loftflæði: Gerir ETL auðveldara
Eitthvað brotnaði í hundinum okkar! Var það ekki það sem við áttum von á? Einmitt!

Ætlarðu að hella?

Finnst þér ég hafa misst af einhverju? Svo virðist sem hann hafi lofað að flytja gögn frá SQL Server til Vertica, og svo tók hann það og fór út fyrir efnið, skúrkur!

Þetta voðaverk var viljandi, ég varð einfaldlega að ráða einhver hugtök fyrir þig. Nú geturðu gengið lengra.

Planið okkar var þetta:

  1. Gerðu dag
  2. Búðu til verkefni
  3. Sjáðu hvað allt er fallegt
  4. Úthlutaðu lotunúmerum til fyllinga
  5. Fáðu gögn frá SQL Server
  6. Settu gögn í Vertica
  7. Safna tölfræði

Svo, til að koma þessu öllu í gang, gerði ég smá viðbót við okkar docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Þar hækkum við:

  • Vertica sem gestgjafi dwh með mest sjálfgefna stillingum,
  • þrjú tilvik af SQL Server,
  • við fyllum gagnagrunna í þeim síðarnefnda með einhverjum gögnum (í engu tilviki skoðum mssql_init.py!)

Við ræsum allt það góða með hjálp aðeins flóknari skipunar en síðast:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Það sem kraftaverka slembivalið okkar bjó til, þú getur notað hlutinn Data Profiling/Ad Hoc Query:

Apache loftflæði: Gerir ETL auðveldara
Aðalatriðið er að sýna það ekki greinendum

útfæra nánar ETL fundur Ég geri það ekki, allt er léttvægt þarna: við búum til grunn, það er merki í honum, við vefjum allt með samhengisstjóra og nú gerum við þetta:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Tíminn er kominn safna gögnum okkar frá eitt og hálft hundrað borðum okkar. Við skulum gera þetta með hjálp mjög tilgerðarlausra lína:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Með hjálp króks fáum við frá Airflow pymssql-tengjast
  2. Við skulum setja takmörkun í formi dagsetningar í beiðnina - henni verður kastað inn í aðgerðina af sniðmátsvélinni.
  3. Mata beiðni okkar pandashver fær okkur DataFrame - það mun nýtast okkur í framtíðinni.

Ég er að nota staðgöngu {dt} í stað beiðnifæribreytu %s ekki vegna þess að ég er vondur Pinocchio, heldur vegna þess pandas ræður ekki við pymssql og sleppir því síðasta params: Listþó hann vilji virkilega tuple.
Athugaðu einnig að verktaki pymssql ákvað að styðja hann ekki lengur, og það er kominn tími til að flytja út pyodbc.

Við skulum sjá hvað Airflow fyllti rök aðgerða okkar með:

Apache loftflæði: Gerir ETL auðveldara

Ef það eru engin gögn, þá þýðir ekkert að halda áfram. En það er líka skrítið að telja fyllinguna vel heppnaða. En þetta eru ekki mistök. A-ah-ah, hvað á að gera?! Og hér er það:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException segir Airflow að það séu engar villur, en við sleppum verkefninu. Viðmótið verður ekki með grænum eða rauðum ferningi, heldur bleiku.

Við skulum henda gögnunum okkar margar dálkar:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

þ.e.

  • Gagnagrunnurinn sem við tókum pantanir úr,
  • Auðkenni flóðafundarins okkar (það verður öðruvísi fyrir hvert verkefni),
  • Hash frá uppruna og pöntunarauðkenni - þannig að í endanlegum gagnagrunni (þar sem öllu er hellt í eina töflu) höfum við einstakt pöntunarkenni.

Næstsíðasta skrefið er eftir: helltu öllu í Vertica. Og einkennilega er ein stórbrotnasta og skilvirkasta leiðin til að gera þetta í gegnum CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Við erum að búa til sérstakan móttakara StringIO.
  2. pandas mun vinsamlega setja okkar DataFrame í formi CSV-línur.
  3. Við skulum opna tengingu við uppáhalds Vertica okkar með krók.
  4. Og nú með hjálpinni copy() sendu gögnin okkar beint til Vertika!

Við munum taka frá bílstjóranum hversu margar línur voru fylltar upp og segja fundarstjóranum að allt sé í lagi:

session.loaded_rows = cursor.rowcount
session.successful = True

Það er allt og sumt.

Á útsölunni búum við til markplötuna handvirkt. Hér leyfði ég mér litla vél:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

ég er að nota VerticaOperator() Ég bý til gagnagrunnsskema og töflu (ef þau eru ekki þegar til, auðvitað). Aðalatriðið er að raða ósjálfstæði rétt:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Toppur upp

— Jæja, — sagði litla músin, — er það nú ekki
Ertu sannfærður um að ég sé hræðilegasta dýrið í skóginum?

Julia Donaldson, The Gruffalo

Ég held að ef félagar mínir og ég ættum samkeppni: hver mun fljótt búa til og hefja ETL ferli frá grunni: þeir með SSIS og mús og ég með Airflow ... Og þá myndum við líka bera saman auðvelt viðhald ... Vá, ég held að þú sért sammála því að ég mun sigra þá á öllum vígstöðvum!

Ef það er aðeins alvarlegra, þá gerði Apache Airflow - með því að lýsa ferlum í formi forritskóða - starf mitt mikið þægilegri og skemmtilegri.

Ótakmarkaður stækkanleiki þess, bæði hvað varðar viðbætur og tilhneigingu til sveigjanleika, gefur þér tækifæri til að nota Airflow á næstum hvaða svæði sem er: jafnvel í heildarferli söfnunar, undirbúnings og vinnslu gagna, jafnvel við að skjóta eldflaugum á loft (til Mars, eða námskeið).

Lokahluti, tilvísun og upplýsingar

Hrífan sem við höfum safnað fyrir þig

  • start_date. Já, þetta er nú þegar staðbundið meme. Í gegnum helstu rök Doug start_date allir standast. Í stuttu máli, ef þú tilgreinir í start_date núverandi dagsetning, og schedule_interval - einn daginn, þá byrjar DAG ekki fyrr á morgun.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Og engin vandamál lengur.

    Það er önnur keyrsluvilla tengd henni: Task is missing the start_date parameter, sem gefur oftast til kynna að þú hafir gleymt að binda þig við dag rekstraraðila.

  • Allt á einni vél. Já, og bækistöðvar (loftflæðið sjálft og húðunin okkar), og vefþjónn, og tímaáætlun, og starfsmenn. Og það virkaði meira að segja. En með tímanum jókst fjöldi verkefna fyrir þjónustu og þegar PostgreSQL byrjaði að bregðast við vísitölunni á 20 sekúndum í stað 5 ms tókum við hana og fluttum hana í burtu.
  • LocalExecutor. Já, við sitjum enn á því og erum þegar komin að brún hyldýpsins. LocalExecutor hefur dugað okkur hingað til, en nú er kominn tími til að stækka með að minnsta kosti einum starfsmanni og við verðum að leggja hart að okkur til að fara yfir í CeleryExecutor. Og í ljósi þeirrar staðreyndar að þú getur unnið með það á einni vél, kemur ekkert í veg fyrir að þú notir Sellerí jafnvel á netþjóni, sem „að sjálfsögðu mun aldrei fara í framleiðslu, satt að segja!“
  • Ekki í notkun innbyggð verkfæri:
    • Tengingar til að geyma þjónustuskilríki,
    • SLA ungfrú að bregðast við verkefnum sem gengu ekki upp á réttum tíma,
    • xcom fyrir lýsigagnaskipti (sagði ég metagögn!) milli dagverkefna.
  • Misnotkun á pósti. Jæja, hvað get ég sagt? Viðvaranir voru settar upp fyrir allar endurtekningar á föllnum verkefnum. Núna er Gmail í vinnunni minni með >90 tölvupósta frá Airflow og trýni vefpóstsins neitar að taka upp og eyða meira en 100 í einu.

Fleiri gildrur: Apache Airflow Pitfails

Fleiri sjálfvirkniverkfæri

Til þess að við getum unnið enn meira með höfuðið en ekki með höndunum hefur Airflow undirbúið þetta fyrir okkur:

  • REST API - hann hefur enn stöðuna sem tilraunamaður, sem kemur ekki í veg fyrir að hann starfi. Með honum er ekki aðeins hægt að fá upplýsingar um dags og verkefni, heldur einnig stöðva/byrja dag, búa til DAG Run eða sundlaug.
  • CLI - mörg verkfæri eru fáanleg í gegnum skipanalínuna sem eru ekki bara óþægileg í notkun í gegnum vefviðmótið heldur eru þau almennt fjarverandi. Til dæmis:
    • backfill þarf til að endurræsa verktilvik.
      Til dæmis komu sérfræðingar og sögðu: „Og þú, félagi, hefur vitleysu í gögnunum frá 1. til 13. janúar! Lagaðu það, lagaðu það, lagaðu það, lagaðu það!" Og þú ert svona helluborð:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Grunnþjónusta: initdb, resetdb, upgradedb, checkdb.
    • run, sem gerir þér kleift að keyra eitt dæmi verkefni, og jafnvel skora á öllum ósjálfstæðum. Þar að auki geturðu keyrt það í gegnum LocalExecutor, jafnvel þótt þú sért með selleríklasa.
    • Gerir nokkurn veginn það sama test, aðeins líka í stöðvum skrifar ekkert.
    • connections gerir fjöldasköpun tenginga úr skelinni.
  • python api - frekar harðkjarna leið til að hafa samskipti, sem er ætluð fyrir viðbætur, og ekki sveima í henni með litlum höndum. En hver á að hindra okkur í að fara til /home/airflow/dags, hlaupa ipython og byrja að rugla? Þú getur til dæmis flutt út allar tengingar með eftirfarandi kóða:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Tengist Airflow lýsigagnagrunninum. Ég mæli ekki með því að skrifa á það, en að fá verkefnastöðu fyrir ýmsar sérstakar mælikvarða getur verið miklu hraðari og auðveldara en í gegnum hvaða API sem er.

    Segjum að verkefni okkar séu ekki öll vanmáttug, en þau geta stundum fallið og það er eðlilegt. En nokkrar stíflur eru þegar grunsamlegar og það væri nauðsynlegt að athuga.

    Varist SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

tilvísanir

Og auðvitað eru fyrstu tíu hlekkirnir frá útgáfu Google innihald Airflow möppunnar úr bókamerkjunum mínum.

Og tenglarnir sem notaðir eru í greininni:

Heimild: www.habr.com