Apache Airflow: ETL atviegloŔana

Sveiki, es esmu Dmitrijs Logviņenko ā€” uzņēmumu grupas Vezet Analytics nodaļas datu inženieris.

PastāstÄ«Å”u par brÄ«niŔķīgu rÄ«ku ETL procesu izstrādei - Apache Airflow. Taču Airflow ir tik daudzpusÄ«ga un daudzpusÄ«ga, ka jums vajadzētu to aplÅ«kot vērÄ«gāk pat tad, ja neesat iesaistÄ«ts datu plÅ«smās, bet jums ir nepiecieÅ”ams periodiski palaist kādus procesus un uzraudzÄ«t to izpildi.

Un jā, es ne tikai pastāstÄ«Å”u, bet arÄ« parādÄ«Å”u: programmā ir daudz koda, ekrānuzņēmumu un ieteikumu.

Apache Airflow: ETL atviegloŔana
Ko parasti redzat, meklējot Google vārdu Airflow / Wikimedia Commons

Satura

Ievads

Apache Airflow ir gluži kā Django:

  • rakstÄ«ts python valodā
  • ir lielisks administratora panelis,
  • paplaÅ”ināms uz nenoteiktu laiku

- tikai labāk, un tas tika izgatavots pavisam citiem mērķiem, proti (kā rakstīts pirms kata):

  • uzdevumu izpilde un uzraudzÄ«ba neierobežotā skaitā iekārtu (cik daudz Selery/Kubernetes un jÅ«su sirdsapziņa to ļaus)
  • ar dinamisku darbplÅ«smas Ä£enerÄ“Å”anu no ļoti viegli rakstāma un saprotama Python koda
  • un iespēja savienot jebkuras datu bāzes un API savā starpā, izmantojot gan gatavus komponentus, gan paÅ”taisÄ«tus spraudņus (kas ir ārkārtÄ«gi vienkārÅ”i).

Mēs izmantojam Apache Airflow Ŕādi:

  • mēs apkopojam datus no dažādiem avotiem (daudziem SQL Server un PostgreSQL gadÄ«jumiem, dažādām API ar lietojumprogrammu metriku, pat 1C) DWH un ODS (mums ir Vertica un Clickhouse).
  • cik attÄ«stÄ«ts cron, kas uzsāk datu konsolidācijas procesus ODS, kā arÄ« uzrauga to uzturÄ“Å”anu.

Vēl nesen mūsu vajadzības apmierināja viens neliels serveris ar 32 kodoliem un 50 GB RAM. Gaisa plūsmā tas darbojas:

  • vairāk 200 dienas (faktiski darbplÅ«smas, kurās mēs ievietojām uzdevumus),
  • katrā vidēji 70 uzdevumi,
  • Å”is labums sākas (arÄ« vidēji) reizi stundā.

Un par to, kā mēs paplaÅ”inājāmies, es rakstÄ«Å”u zemāk, bet tagad definēsim Ć¼ber-problēmu, kuru mēs atrisināsim:

Ir trīs avota SQL serveri, katrs ar 50 datu bāzēm - attiecīgi viena projekta gadījumi, tiem ir vienāda struktūra (gandrīz visur, mua-ha-ha), kas nozīmē, ka katram ir pasūtījumu tabula (par laimi, tabula ar to vārdu var iespiest jebkurā biznesā). Mēs ņemam datus, pievienojot servisa laukus (avota serveris, avota datu bāze, ETL uzdevuma ID) un naivi iemetam tos, piemēram, Vertica.

Iesim!

Galvenā daļa, praktiskā (un nedaudz teorētiska)

Kāpēc mēs (un jūs)

Kad koki bija lieli un es biju vienkārÅ”s SQL-schik vienā Krievijas mazumtirdzniecÄ«bā mēs izkrāpām ETL procesus jeb datu plÅ«smas, izmantojot divus mums pieejamos rÄ«kus:

  • Informācijas enerÄ£ijas centrs - ārkārtÄ«gi izplatÄ«ta sistēma, ārkārtÄ«gi produktÄ«va, ar savu aparatÅ«ru, savu versiju veidoÅ”anu. Es izmantoju 1% no tās iespējām. Kāpēc? Pirmkārt, Ŕī saskarne kaut kur no 380. gadiem radÄ«ja mums garÄ«gu spiedienu. Otrkārt, Ŕī ierÄ«ce ir paredzēta ārkārtÄ«gi smalkiem procesiem, niknai komponentu atkārtotai izmantoÅ”anai un citiem ļoti svarÄ«giem uzņēmuma trikiem. Par to, ka tas maksā, tāpat kā Airbus AXNUMX spārns / gadā, mēs neko neteiksim.

    Uzmanieties, ekrānuzņēmums var nedaudz ievainot cilvēkus, kas jaunāki par 30 gadiem

    Apache Airflow: ETL atviegloŔana

  • SQL servera integrācijas serveris - mēs izmantojām Å”o biedru mÅ«su iekŔējās projekta plÅ«smās. Nu, patiesÄ«bā: mēs jau izmantojam SQL Server, un bÅ«tu kaut kā nesaprātÄ«gi neizmantot tā ETL rÄ«kus. Viss tajā ir labs: gan interfeiss ir skaists, gan progresa ziņojumi... Bet ne tāpēc mēs mÄ«lam programmatÅ«ras produktus, ak, ne tāpēc. Versija to dtsx (kas ir XML ar mezgliem, kas sajaukti saglabāŔanas laikā) mēs varam, bet kāda jēga? Kā bÅ«tu ar uzdevumu pakotnes izveidi, kas vilks simtiem tabulu no viena servera uz otru? Jā, kāds simts, rādÄ«tājpirksts nokritÄ«s no divdesmit gabaliņiem, noklikŔķinot uz peles pogas. Bet tas noteikti izskatās modernāk:

    Apache Airflow: ETL atviegloŔana

Mēs noteikti meklējām izejas. Lieta pat gandrÄ«z nonācis pie paÅ”rakstÄ«ta SSIS pakotņu Ä£eneratora ...

ā€¦un tad mani atrada jauns darbs. Un Apache Airflow mani apsteidza tajā.

Kad uzzināju, ka ETL procesu apraksti ir vienkārÅ”s Python kods, es vienkārÅ”i nedejoju aiz prieka. Tādā veidā datu straumes tika versētas un diferencētas, un tabulu ar vienu struktÅ«ru ielieÅ”ana no simtiem datu bāzu vienā mērÄ·Ä« kļuva par Python koda jautājumu pusotra vai divos 13 collu ekrānos.

Klastera salikŔana

Nekārtosim pilnÄ«gi bērnudārzu un nerunāsim Å”eit par pilnÄ«gi paÅ”saprotamām lietām, piemēram, Airflow instalÄ“Å”anu, jÅ«su izvēlēto datubāzi, Selerijas un citiem dokos aprakstÄ«tajiem gadÄ«jumiem.

Lai mēs nekavējoties varētu sākt eksperimentus, es ieskicēju docker-compose.yml kurā:

  • PatiesÄ«bā paaugstināsim Airflow: plānotājs, tÄ«mekļa serveris. Flower arÄ« tur griezÄ«sies, lai uzraudzÄ«tu Selerijas uzdevumus (jo tas jau ir iespiests apache/airflow:1.10.10-python3.7, bet mums nav nekas pretÄ«)
  • PostgreSQL, kurā Airflow ierakstÄ«s savu servisa informāciju (plānotāja datus, izpildes statistiku utt.), bet Selery atzÄ«mēs izpildÄ«tos uzdevumus;
  • Redis, kas darbosies kā Selerijas uzdevumu brokeris;
  • Selerijas strādnieks, kas nodarbosies ar tieÅ”u uzdevumu izpildi.
  • Uz mapi ./dags mēs pievienosim savus failus ar dags aprakstu. Tie tiks savākti lidojuma laikā, tāpēc pēc katras ŔķaudÄ«Å”anas nav nepiecieÅ”ams žonglēt ar visu kaudzÄ«ti.

Dažās vietās kods piemēros nav pilnībā parādīts (lai nepārblīvētu tekstu), bet kaut kur tas tiek modificēts procesā. Pilnus darba kodu piemērus var atrast repozitorijā https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Piezīmes:

  • KompozÄ«cijas montāžā lielā mērā paļāvos uz labi zināmo tēlu puckel/docker-gaisa plÅ«sma - noteikti pārbaudiet to. VarbÅ«t tev dzÄ«vē nekas cits nav vajadzÄ«gs.
  • Visi gaisa plÅ«smas iestatÄ«jumi ir pieejami ne tikai caur airflow.cfg, bet arÄ« ar vides mainÄ«gajiem (paldies izstrādātājiem), kurus es ļaunprātÄ«gi izmantoju.
  • Protams, tas nav gatavs ražoÅ”anai: es apzināti neliku sirdspukstus uz konteineriem, es neuztraucos ar droŔību. Bet es izdarÄ«ju mÅ«su eksperimentētājiem piemēroto minimumu.
  • Pieraksti to:
    • Mapei dag ir jābÅ«t pieejamai gan plānotājam, gan darbiniekiem.
    • Tas pats attiecas uz visām treÅ”o puÅ”u bibliotēkām ā€” tām visām jābÅ«t instalētām iekārtās ar plānotāju un darbiniekiem.

Nu, tagad tas ir vienkārŔi:

$ docker-compose up --scale worker=3

Kad viss ir pacēlies, varat apskatīt tīmekļa saskarnes:

Pamatjēdzieni

Ja jūs neko nesapratāt visos Ŕajos "dags", tad Ŕeit ir īsa vārdnīca:

  • Plānotājs - vissvarÄ«gākais onkulis Airflow, kas kontrolē, lai roboti smagi strādātu, nevis cilvēks: uzrauga grafiku, atjaunina dienas, palaiž uzdevumus.

    Kopumā vecākās versijās viņam bija problēmas ar atmiņu (nē, nevis amnēzija, bet noplÅ«des) un mantotais parametrs pat palika konfigurācijās run_duration ā€” tā restartÄ“Å”anas intervāls. Bet tagad viss ir kārtÄ«bā.

  • DAG (aka "dag") - "virzÄ«ts aciklisks grafiks", taču Ŕāda definÄ«cija pateiks dažiem cilvēkiem, taču patiesÄ«bā tas ir konteiners uzdevumiem, kas mijiedarbojas viens ar otru (skatÄ«t zemāk) vai paketes SSIS un darbplÅ«smas analogs informaticā. .

    Papildus dagiem joprojām var būt subdags, bet mēs, visticamāk, līdz tiem netiksim.

  • DAG Skrējiens - inicializēts dag, kuram tiek pieŔķirts savs execution_date. Tā paÅ”a dag Dagrans var strādāt paralēli (ja jÅ«s, protams, padarÄ«jāt savus uzdevumus idempotentus).
  • operators ir koda daļas, kas ir atbildÄ«gas par noteiktas darbÄ«bas veikÅ”anu. Ir trÄ«s veidu operatori:
    • rÄ«cÄ«bakā mÅ«su mīļākie PythonOperator, kas var izpildÄ«t jebkuru (derÄ«gu) Python kodu;
    • pārsÅ«tÄ«t, kas pārsÅ«ta datus no vienas vietas uz otru, piemēram, MsSqlToHiveTransfer;
    • devējs no otras puses, tas ļaus jums reaģēt vai palēnināt turpmāko dag izpildi, lÄ«dz notiek notikums. HttpSensor var izvilkt norādÄ«to beigu punktu un, kad vēlamā atbilde gaida, sākt pārsÅ«tÄ«Å”anu GoogleCloudStorageToS3Operator. ZiņkārÄ«gs prāts jautās: ā€œKāpēc? Galu galā jÅ«s varat veikt atkārtojumus tieÅ”i pie operatora! Un pēc tam, lai neaizsprostotu uzdevumu kopumu ar apturētajiem operatoriem. Sensors ieslēdzas, pārbauda un nomirst pirms nākamā mēģinājuma.
  • Uzdevums - deklarētie operatori neatkarÄ«gi no veida un pievienoti dag tiek paaugstināti uz uzdevuma pakāpi.
  • uzdevuma gadÄ«jums - kad Ä£enerālplānotājs nolēma, ka ir pienācis laiks sÅ«tÄ«t uzdevumus kaujā izpildÄ«tājiem-strādniekiem (tieÅ”i uz vietas, ja mēs izmantojam LocalExecutor vai uz attālo mezglu, ja CeleryExecutor), tas pieŔķir tiem kontekstu (t.i., mainÄ«go lielumu kopu - izpildes parametrus), paplaÅ”ina komandu vai vaicājumu veidnes un apvieno tās.

Mēs ģenerējam uzdevumus

Vispirms ieskicētu mūsu douga vispārējo shēmu, un tad arvien vairāk iedziļināsimies detaļās, jo mēs izmantojam dažus netriviālus risinājumus.

Tātad vienkārŔākajā formā Ŕāds dags izskatīsies Ŕādi:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Izdomāsim:

  • Pirmkārt, mēs importējam nepiecieÅ”amos libs un kaut kas cits;
  • sql_server_ds -Å o List[namedtuple[str, str]] ar savienojumu nosaukumiem no Airflow Connections un datu bāzēm, no kurām mēs ņemsim savu plāksni;
  • dag - mÅ«su dienas paziņojums, kuram obligāti jābÅ«t iekŔā globals(), pretējā gadÄ«jumā Airflow to neatradÄ«s. Dagam arÄ« jāsaka:
    • kāds ir viņa vārds orders - Å”is nosaukums tiks parādÄ«ts tÄ«mekļa saskarnē,
    • ka viņŔ strādās no astotā jÅ«lija pusnakts,
    • un tam vajadzētu darboties aptuveni ik pēc 6 stundām (nevis skarbajiem puiÅ”iem timedelta() pieļaujama cron- lÄ«nija 0 0 0/6 ? * * *, mazāk forÅ”ajiem - izteiciens patÄ«k @daily);
  • workflow() darÄ«s galveno darbu, bet ne tagad. Pagaidām mēs vienkārÅ”i iekļausim savu kontekstu žurnālā.
  • Un tagad vienkārÅ”a uzdevumu izveides burvÄ«ba:
    • mēs skrienam cauri saviem avotiem;
    • palaist PythonOperator, kas izpildÄ«s mÅ«su manekenu workflow(). Neaizmirstiet norādÄ«t unikālu (dag ietvaros) uzdevuma nosaukumu un piesaistÄ«t paÅ”u dag. Karogs provide_context savukārt funkcijā iebērs papildu argumentus, kurus rÅ«pÄ«gi apkoposim izmantojot **context.

Pagaidām tas arī viss. Ko mēs saņēmām:

  • jauna diena tÄ«mekļa saskarnē,
  • pusotrs simts uzdevumu, kas tiks izpildÄ«ti paralēli (ja to ļaus Airflow, Selery iestatÄ«jumi un servera jauda).

Nu, gandrīz sapratu.

Apache Airflow: ETL atviegloŔana
KurÅ” instalēs atkarÄ«bas?

Lai vienkārÅ”otu Å”o visu, es ieskrÅ«vēju docker-compose.yml apstrāde requirements.txt visos mezglos.

Tagad tas ir pazudis:

Apache Airflow: ETL atviegloŔana

Pelēki kvadrāti ir uzdevumu gadījumi, ko apstrādā plānotājs.

Nedaudz pagaidām, darbus ķer strādnieki:

Apache Airflow: ETL atviegloŔana

Zaļie, protams, savu darbu ir veiksmīgi pabeiguŔi. Sarkanie nav īpaŔi veiksmīgi.

Starp citu, mūsu prod nav mapes ./dags, nav sinhronizācijas starp maŔīnām - visi dags atrodas iekŔā git mūsu Gitlab, un Gitlab CI izplata atjauninājumus iekārtām, kad tās tiek apvienotas master.

Mazliet par Ziedu

Kamēr strādnieki dauza mÅ«su knupÄ«Å”us, atcerēsimies vēl vienu rÄ«ku, kas var mums kaut ko parādÄ«t - Ziedu.

Pati pirmā lapa ar kopsavilkuma informāciju par darbinieku mezgliem:

Apache Airflow: ETL atviegloŔana

Visintensīvākā lapa ar uzdevumiem, kas tika veikti:

Apache Airflow: ETL atviegloŔana

Garlaicīgākā lapa ar mūsu brokera statusu:

Apache Airflow: ETL atviegloŔana

Spilgtākā lapa ir ar uzdevumu statusa grafikiem un to izpildes laiku:

Apache Airflow: ETL atviegloŔana

Mēs ielādējam nepietiekami noslogotos

Tātad, visi uzdevumi ir izpildīti, jūs varat aizvest ievainotos.

Apache Airflow: ETL atviegloŔana

Un bija daudz ievainoto - viena vai otra iemesla dēļ. Pareizas Airflow lietoÅ”anas gadÄ«jumā tieÅ”i Å”ie kvadrāti norāda, ka dati noteikti nav saņemti.

Jums jāskatās žurnāls un jārestartē krituŔās uzdevumu instances.

NoklikŔķinot uz jebkura kvadrāta, mēs redzēsim mums pieejamās darbÄ«bas:

Apache Airflow: ETL atviegloŔana

JÅ«s varat ņemt un padarÄ«t Clear krituÅ”o. Tas ir, mēs aizmirstam, ka tur kaut kas neizdevās, un tas pats instances uzdevums tiks nosÅ«tÄ«ts plānotājam.

Apache Airflow: ETL atviegloŔana

Skaidrs, ka to darÄ«t ar peli ar visiem sarkanajiem kvadrātiņiem nav Ä«paÅ”i humāni ā€“ tas nav tas, ko mēs sagaidām no Airflow. Protams, mums ir masu iznÄ«cināŔanas ieroči: Browse/Task Instances

Apache Airflow: ETL atviegloŔana

Atlasīsim visu uzreiz un atiestatīsim uz nulli, noklikŔķiniet uz pareizā vienuma:

Apache Airflow: ETL atviegloŔana

Pēc tÄ«rÄ«Å”anas mÅ«su taksometri izskatās Ŕādi (viņi jau gaida, kad plānotājs tos saplānos):

Apache Airflow: ETL atviegloŔana

Savienojumi, āķi un citi mainīgie

Ir pienācis laiks apskatīt nākamo DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Š“Š¾ŃŠæŠ¾Š“Š° хŠ¾Ń€Š¾ŃˆŠøŠµ, Š¾Ń‚чŠµŃ‚Ń‹ Š¾Š±Š½Š¾Š²Š»ŠµŠ½Ń‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ŠŠ°Ń‚Š°Ńˆ, ŠæрŠ¾ŃŃ‹ŠæŠ°Š¹ŃŃ, Š¼Ń‹ {{ dag.dag_id }} урŠ¾Š½ŠøŠ»Šø
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Vai visi kādreiz ir veikuÅ”i pārskata atjaunināŔanu? Å Ä« atkal ir viņa: ir saraksts ar avotiem, no kuriem iegÅ«t datus; ir saraksts, kur likt; neaizmirstiet paburkŔķēt, kad viss notika vai salÅ«za (nu, tas nav par mums, nē).

Pārskatīsim failu vēlreiz un apskatīsim jaunos neskaidros materiālus:

  • from commons.operators import TelegramBotSendMessage - nekas neliedz mums izveidot savus operatorus, ko izmantojām, izveidojot nelielu iesaiņojumu ziņojumu nosÅ«tÄ«Å”anai uz Unbloed. (Par Å”o operatoru vairāk runāsim tālāk);
  • default_args={} - dag var izplatÄ«t vienādus argumentus visiem saviem operatoriem;
  • to='{{ var.value.all_the_kings_men }}' - lauks to mums nebÅ«s kodēts, bet dinamiski Ä£enerēts, izmantojot Jinja un mainÄ«go ar e-pasta sarakstu, ko es rÅ«pÄ«gi ievietoju Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS ā€” nosacÄ«jums operatora palaiÅ”anai. MÅ«su gadÄ«jumā vēstule aizlidos pie priekÅ”niekiem tikai tad, ja visas atkarÄ«bas bÅ«s atrisinātas veiksmÄ«gi;
  • tg_bot_conn_id='tg_main' - argumenti conn_id pieņemt mÅ«su izveidotos savienojuma ID Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - ziņojumi telegrammā aizlidos tikai tad, ja bÅ«s nokrituÅ”i uzdevumi;
  • task_concurrency=1 - mēs aizliedzam vairāku viena uzdevuma uzdevumu vienlaicÄ«gu palaiÅ”anu. Pretējā gadÄ«jumā mēs saņemsim vairāku vienlaicÄ«gu palaiÅ”anu VerticaOperator (skatoties uz vienu galdu);
  • report_update >> [email, tg] - viss VerticaOperator saplÅ«st vēstuļu un ziņojumu sÅ«tÄ«Å”anā, piemēram:
    Apache Airflow: ETL atviegloŔana

    Bet, tā kā paziņotāju operatoriem ir dažādi palaiÅ”anas nosacÄ«jumi, darbosies tikai viens. Koka skatā viss izskatās mazāk vizuāli:
    Apache Airflow: ETL atviegloŔana

Es teikÅ”u dažus vārdus par makro un viņu draugi - mainÄ«gie.

Makro ir Jinja vietturi, kas operatora argumentos var aizstāt dažādu noderÄ«gu informāciju. Piemēram, Ŕādi:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} tiks paplaÅ”ināts lÄ«dz konteksta mainÄ«gā saturam execution_date formātā YYYY-MM-DD: 2020-07-14. Labākais ir tas, ka konteksta mainÄ«gie tiek pienagloti konkrētam uzdevuma gadÄ«jumam (kvadrātiņam koka skatā), un, restartējot, vietturi tiks paplaÅ”ināti lÄ«dz tādām paŔām vērtÄ«bām.

PieŔķirtās vērtÄ«bas var apskatÄ«t, izmantojot pogu Rendered katrā uzdevuma instancē. Å is ir uzdevums ar vēstules nosÅ«tÄ«Å”anu:

Apache Airflow: ETL atviegloŔana

Un tā uzdevumā ar ziņojuma nosÅ«tÄ«Å”anu:

Apache Airflow: ETL atviegloŔana

Pilns jaunākās pieejamās versijas iebÅ«vēto makro saraksts ir pieejams Å”eit: makro atsauce

Turklāt ar spraudņu palīdzību mēs varam deklarēt savus makro, bet tas ir cits stāsts.

Papildus iepriekÅ” definētajām lietām mēs varam aizstāt mÅ«su mainÄ«go vērtÄ«bas (es to jau izmantoju iepriekÅ” minētajā kodā). Izveidosim iekŔā Admin/Variables pāris lietas:

Apache Airflow: ETL atviegloŔana

Viss, ko varat izmantot:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Vērtība var būt skalārs vai arī JSON. JSON gadījumā:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

vienkārÅ”i izmantojiet ceļu uz vajadzÄ«go atslēgu: {{ var.json.bot_config.bot.token }}.

Es burtiski teikÅ”u vienu vārdu un parādÄ«Å”u vienu ekrānuzņēmumu par savienojumi. Å eit viss ir elementāri: lapā Admin/Connections mēs izveidojam savienojumu, pievienojam savus pieteikumvārdus / paroles un specifiskākus parametrus. Kā Å”is:

Apache Airflow: ETL atviegloŔana

Paroles var Å”ifrēt (pamatÄ«gāk nekā noklusējuma), vai arÄ« varat nepievienot savienojuma veidu (kā es to darÄ«ju tg_main) - fakts ir tāds, ka Airflow modeļos tipu saraksts ir savienots un to nevar paplaÅ”ināt, neiedziļinoties avota kodos (ja pēkŔņi kaut ko nemeklēju googlē, lÅ«dzu, izlabojiet mani), taču nekas netraucēs mums iegÅ«t kredÄ«tus. nosaukums.

Varat arÄ« izveidot vairākus savienojumus ar tādu paÅ”u nosaukumu: Å”ajā gadÄ«jumā metode BaseHook.get_connection(), kas iegÅ«st mums savienojumus pēc nosaukuma, dos nejauÅ”i no vairākiem vārdabrāliem (loÄ£iskāk bÅ«tu uztaisÄ«t Round Robin, bet atstāsim to uz Airflow izstrādātāju sirdsapziņas).

MainÄ«gie un savienojumi noteikti ir lieliski rÄ«ki, taču ir svarÄ«gi nezaudēt lÄ«dzsvaru: kuras plÅ«smas daļas jÅ«s saglabājat paŔā kodā un kuras daļas nododat glabāŔanai Airflow. No vienas puses, var bÅ«t ērti ātri mainÄ«t vērtÄ«bu, piemēram, pasta kastÄ«ti, izmantojot lietotāja saskarni. No otras puses, Ŕī joprojām ir atgrieÅ”anās pie peles klikŔķa, no kura mēs (es) gribējām atbrÄ«voties.

Darbs ar savienojumiem ir viens no uzdevumiem āķi. Kopumā Airflow āķi ir punkti, lai to savienotu ar treÅ”o puÅ”u pakalpojumiem un bibliotēkām. Piemēram, JiraHook atvērs klientu, lai mēs varētu sadarboties ar Jira (jÅ«s varat pārvietot uzdevumus uz priekÅ”u un atpakaļ), un ar SambaHook varat nosÅ«tÄ«t vietējo failu smb- punkts.

Pielāgotā operatora parsÄ“Å”ana

Un mēs tuvojāmies tam, lai apskatītu, kā tas ir izgatavots TelegramBotSendMessage

Kods commons/operators.py ar faktisko operatoru:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Å eit, tāpat kā viss pārējais Airflow, viss ir ļoti vienkārÅ”s:

  • Mantojums no BaseOperator, kas ievieÅ” diezgan daudz ar gaisa plÅ«smu raksturÄ«gu lietu (skatieties uz savu atpÅ«tu)
  • Deklarētie lauki template_fields, kurā Jinja meklēs makro, ko apstrādāt.
  • Sakārtoja pareizos argumentus par __init__(), iestatiet noklusējuma iestatÄ«jumus, ja nepiecieÅ”ams.
  • Neaizmirsām arÄ« par senča inicializāciju.
  • Atvēra atbilstoÅ”o āķi TelegramBotHooksaņēma no tā klienta objektu.
  • Ignorēta (pārdefinēta) metode BaseOperator.execute(), kuru Airfow raustÄ«s, kad pienāks laiks palaist operatoru - tajā mēs Ä«stenosim galveno darbÄ«bu, aizmirstot pieteikties. (Starp citu, mēs piesakāmies tieÅ”i stdout Šø stderr - Gaisa plÅ«sma visu pārtvers, skaisti iesaiņos, sadalÄ«s, kur nepiecieÅ”ams.)

Paskatīsimies, kas mums ir commons/hooks.py. Pirmā faila daļa ar paŔu āķi:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Es pat nezinu, ko Ŕeit paskaidrot, es tikai atzīmēŔu svarīgos punktus:

  • Mēs mantojam, domājam par argumentiem - vairumā gadÄ«jumu tas bÅ«s viens: conn_id;
  • Standartmetožu ignorÄ“Å”ana: es sevi ierobežoju get_conn(), kurā es iegÅ«stu savienojuma parametrus pēc nosaukuma un vienkārÅ”i iegÅ«stu sadaļu extra (tas ir JSON lauks), kurā es (saskaņā ar saviem norādÄ«jumiem!) ievietoju Telegram robota pilnvaru: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Es izveidoju mÅ«su piemēru TelegramBot, pieŔķirot tam Ä«paÅ”u pilnvaru.

Tas ir viss. Jūs varat iegūt klientu no āķa, izmantojot TelegramBotHook().clent vai TelegramBotHook().get_conn().

Un faila otrā daļa, kurā es izveidoju mikroiesaiņojumu Telegram REST API, lai nevilktu to paÅ”u python-telegram-bot vienai metodei sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Pareizais veids ir to visu saskaitīt: TelegramBotSendMessage, TelegramBotHook, TelegramBot - spraudnī ievietojiet publiskā repozitorijā un nododiet to atvērtajam pirmkodam.

Kamēr mēs to visu pētÄ«jām, mÅ«su pārskatu atjauninājumi veiksmÄ«gi neizdevās un man kanālā nosÅ«tÄ«ja kļūdas ziņojumu. Es ieÅ”u pārbaudÄ«t, vai tas nav kārtÄ«bā...

Apache Airflow: ETL atviegloŔana
MÅ«su dogā kaut kas salÅ«za! Vai tas nav tas, ko mēs gaidÄ«jām? tieÅ”i tā!

Vai tu taisies liet?

Vai tev liekas, ka es kaut ko palaidu garām? Å Ä·iet, ka viņŔ solÄ«ja pārsÅ«tÄ«t datus no SQL servera uz Vertica un tad ņēma un aizgāja no tēmas, nelietis!

Å Ä« zvērÄ«ba bija tÄ«Å”a, man vienkārÅ”i bija jāatÅ”ifrē kāda terminoloÄ£ija jÅ«su vietā. Tagad jÅ«s varat doties tālāk.

Mūsu plāns bija Ŕāds:

  1. Do dag
  2. Ģenerējiet uzdevumus
  3. Redziet, cik viss ir skaisti
  4. PieŔķiriet aizpildījumam sesijas numurus
  5. Iegūstiet datus no SQL Server
  6. Ievietojiet datus Vertica
  7. Savākt statistiku

Tāpēc, lai tas viss sāktu darboties, es veicu nelielu papildinājumu mūsu docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Tur mēs paaugstinām:

  • Vertica kā saimniekdators dwh ar visvairāk noklusējuma iestatÄ«jumiem,
  • trÄ«s SQL Server gadÄ«jumi,
  • mēs aizpildām pēdējās esoŔās datu bāzes ar dažiem datiem (nekādā gadÄ«jumā neieskatieties mssql_init.py!)

Mēs palaižam visu labo, izmantojot nedaudz sarežģītāku komandu nekā pagājuÅ”ajā reizē:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Varat izmantot Å”o vienumu, ko Ä£enerēja mÅ«su brÄ«numu nejauÅ”inātājs Data Profiling/Ad Hoc Query:

Apache Airflow: ETL atviegloŔana
Galvenais to nerādīt analītiķiem

sÄ«kāk izstrādāt ETL sesijas Es nedarÄ«Å”u, tur viss ir triviāli: mēs izveidojam pamatni, tajā ir zÄ«me, mēs visu aptinam ar konteksta pārvaldnieku, un tagad mēs darām Ŕādi:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Laiks ir pienācis apkopot mūsu datus no mūsu pusotra simta galdiņiem. Darīsim to ar ļoti nepretenciozu līniju palīdzību:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Ar āķa palīdzību tiekam no Airflow pymssql- savienot
  2. Aizstāsim pieprasÄ«jumā ierobežojumu datuma veidā ā€” to funkcijā iemetÄ«s veidnes dzinējs.
  3. Tiek izpildÄ«ts mÅ«su pieprasÄ«jums pandaskurÅ” mÅ«s dabÅ«s DataFrame - tas mums noderēs nākotnē.

Es izmantoju aizstāŔanu {dt} pieprasÄ«juma parametra vietā %s nevis tāpēc, ka es bÅ«tu ļauns Pinokio, bet gan tāpēc pandas nevar tikt galā pymssql un paslÄ«d pēdējo params: Listlai gan viņŔ ļoti vēlas tuple.
Ņemiet vērā arī to, ka izstrādātājs pymssql nolēma viņu vairs neatbalstīt, un ir pienācis laiks izvākties pyodbc.

Apskatīsim, ar ko Airflow papildināja mūsu funkciju argumentus:

Apache Airflow: ETL atviegloŔana

Ja nav datu, tad nav jēgas turpināt. Bet ir arī dīvaini uzskatīt pildījumu par veiksmīgu. Bet tā nav kļūda. A-ah-ah, ko darīt?! Un, lūk, kas:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException pateiks Airflow, ka kļūdu nav, bet mēs izlaižam uzdevumu. Interfeisam bÅ«s nevis zaļŔ vai sarkans kvadrāts, bet gan rozā.

Izmetīsim savus datus vairākas kolonnas:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Proti

  • Datubāze, no kuras mēs saņēmām pasÅ«tÄ«jumus,
  • MÅ«su plÅ«du sesijas ID (tas bÅ«s atŔķirÄ«gs katram uzdevumam),
  • Hash no avota un pasÅ«tÄ«juma ID - lai gala datu bāzē (kur viss ir saliets vienā tabulā) mums bÅ«tu unikāls pasÅ«tÄ«juma ID.

Atliek priekÅ”pēdējais solis: ielej visu Vertikā. Un, dÄ«vainā kārtā, viens no iespaidÄ«gākajiem un efektÄ«vākajiem veidiem, kā to izdarÄ«t, ir CSV fails!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Mēs izgatavojam Ä«paÅ”u uztvērēju StringIO.
  2. pandas laipni ieliks mūsu DataFrame formā CSV- līnijas.
  3. Atvērsim savienojumu ar mūsu iecienīto Vertica ar āķi.
  4. Un tagad ar palīdzību copy() nosūtiet mūsu datus tieŔi Vertika!

Mēs paņemsim no vadītāja, cik rindu ir aizpildītas, un pateiksim sesijas vadītājam, ka viss ir kārtībā:

session.loaded_rows = cursor.rowcount
session.successful = True

Tas ir viss.

PārdoÅ”anā mēs manuāli izveidojam mērÄ·a plāksni. Å eit es atļāvu sev nelielu maŔīnu:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

es lietoju VerticaOperator() Es izveidoju datu bāzes shēmu un tabulu (ja tādas vēl nav, protams). Galvenais ir pareizi sakārtot atkarības:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Apkopojot

- Nu, - sacīja mazā pele, - vai ne, tagad
Vai esat pārliecināts, ka esmu visbriesmīgākais dzīvnieks mežā?

Džūlija Donaldsone, Grufalo

Es domāju, ja man un maniem kolēģiem bÅ«tu konkurss: kurÅ” ātri izveidos un sāks ETL procesu no nulles: viņi ar savu SSIS un peli un es ar Airflow ... Un tad mēs arÄ« salÄ«dzinātu apkopes vieglumu ... Oho, es domāju, ka piekritÄ«siet, ka es viņus pārspÄ“Å”u visās frontēs!

Ja nedaudz nopietnāk, tad Apache Airflow - aprakstot procesus programmas koda veidā - izdarīja manu darbu daudz ērtāk un patīkamāk.

Tā neierobežotā paplaÅ”ināmÄ«ba gan spraudņu, gan mērogojamÄ«bas ziņā sniedz iespēju izmantot Airflow gandrÄ«z jebkurā jomā: pat pilnā datu vākÅ”anas, sagatavoÅ”anas un apstrādes ciklā, pat palaižot raÄ·etes (uz Marsu, no kurss).

Daļas noslēgums, atsauce un informācija

Grābeklis, ko esam jums savākuŔi

  • start_date. Jā, Ŕī jau ir vietēja mēma. Via Doug galvenais arguments start_date viss pāriet. ÄŖsumā, ja norādāt start_date paÅ”reizējais datums un schedule_interval - kādu dienu, tad DAG sāksies rÄ«t ne agrāk.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Un vairs nekādu problēmu.

    Ar to ir saistīta cita izpildlaika kļūda: Task is missing the start_date parameter, kas visbiežāk norāda, ka esat aizmirsis saistīt ar dag operatoru.

  • Viss vienā maŔīnā. Jā, un bāzes (pati Airflow un mÅ«su pārklājums), un tÄ«mekļa serveris, un plānotājs, un darbinieki. Un tas pat strādāja. Bet laika gaitā pakalpojumu uzdevumu skaits pieauga, un, kad PostgreSQL sāka reaģēt uz indeksu 20 s, nevis 5 ms, mēs to paņēmām un aiznesām.
  • Vietējais izpildÄ«tājs. Jā, mēs joprojām uz tā sēžam, un jau esam nonākuÅ”i bezdibeņa malā. Ar LocalExecutor mums lÄ«dz Å”im ir bijis pietiekami, bet tagad ir pienācis laiks paplaÅ”ināties ar vismaz vienu darbinieku, un mums bÅ«s smagi jāstrādā, lai pārietu uz CeleryExecutor. Un, ņemot vērā to, ka jÅ«s varat strādāt ar to vienā maŔīnā, nekas neliedz jums izmantot Selery pat serverÄ«, kas, "protams, nekad nenonāks ražoÅ”anā, godÄ«gi sakot!"
  • NelietoÅ”ana iebÅ«vētie instrumenti:
    • savienojumi uzglabāt pakalpojumu akreditācijas datus,
    • SLA Miss reaģēt uz uzdevumiem, kas nav izdevies laikā,
    • xcom metadatu apmaiņai (es teicu mērÄ·isdati!) starp dag uzdevumiem.
  • Pasta ļaunprātÄ«ga izmantoÅ”ana. Nu ko es varu teikt? Par visiem krituÅ”o uzdevumu atkārtojumiem tika iestatÄ«ti brÄ«dinājumi. Tagad manā darba pakalpojumā Gmail ir vairāk nekā 90 100 e-pasta ziņojumu no Airflow, un tÄ«mekļa pasta uzgalis atsakās vienlaikus uztvert un dzēst vairāk nekā XNUMX.

Vairāk kļūmju: Apache Airflow Pitfails

Vairāk automatizācijas rīku

Lai mēs vēl vairāk strādātu ar galvu, nevis ar rokām, Airflow mums ir sagatavojis sekojoÅ”o:

  • REST API - viņam joprojām ir Eksperimentāla statuss, kas viņam netraucē strādāt. Ar to jÅ«s varat ne tikai iegÅ«t informāciju par dagiem un uzdevumiem, bet arÄ« apturēt/sākt dag, izveidot DAG Run vai pÅ«lu.
  • CLI - Komandrindā ir pieejami daudzi rÄ«ki, kurus ir ne tikai neērti lietot, izmantojot WebUI, bet arÄ« parasti to nav. Piemēram:
    • backfill nepiecieÅ”ams, lai restartētu uzdevumu gadÄ«jumus.
      Piemēram, atnāca analÄ«tiÄ·i un teica: ā€œUn jums, biedri, ir muļķības datos no 1. lÄ«dz 13. janvārim! Labojiet, labojiet, labojiet, labojiet!" Un tu esi tāda plÄ«ts virsma:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Bāzes pakalpojums: initdb, resetdb, upgradedb, checkdb.
    • run, kas ļauj izpildÄ«t vienu gadÄ«jumu uzdevumu un pat iegÅ«t punktus par visām atkarÄ«bām. Turklāt jÅ«s varat to palaist, izmantojot LocalExecutor, pat ja jums ir seleriju kopa.
    • Dara gandrÄ«z to paÅ”u test, tikai arÄ« bāzēs neko neraksta.
    • connections ļauj masveidā izveidot savienojumus no čaulas.
  • python api - diezgan stingrs mijiedarbÄ«bas veids, kas paredzēts spraudņiem, nevis spieto tajā ar mazām rociņām. Bet kas mums liedz iet /home/airflow/dags, palaist ipython un sākt jaukties? Varat, piemēram, eksportēt visus savienojumus ar Ŕādu kodu:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Savienojuma izveide ar Airflow metadatu bāzi. Es neiesaku tai rakstÄ«t, taču uzdevumu stāvokļu iegÅ«Å”ana dažādiem specifiskiem rādÄ«tājiem var bÅ«t daudz ātrāka un vienkārŔāka nekā ar kādu no API.

    Pieņemsim, ka ne visi mūsu uzdevumi ir idempotenti, bet dažreiz tie var nokrist, un tas ir normāli. Bet daži aizsprostojumi jau ir aizdomīgi, un tas būtu jāpārbauda.

    Uzmanieties no SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

atsauces

Un, protams, pirmās desmit saites no Google izdoÅ”anas ir mapes Airflow saturs no manām grāmatzÄ«mēm.

Un rakstā izmantotās saites:

Avots: www.habr.com