Apache Airflow: Nagħmlu ETL aktar faċli

Hi, jien Dmitry Logvinenko - Inġinier tad-Data tad-Dipartiment tal-Analitika tal-grupp ta 'kumpaniji Vezet.

Jien ngħidlek dwar għodda mill-isbaħ għall-iżvilupp ta 'proċessi ETL - Apache Airflow. Iżda Airflow huwa tant versatili u multidimensjonali li għandek tagħti ħarsa aktar mill-qrib lejha anki jekk m'intix involut fi flussi tad-dejta, iżda jkollok bżonn li perjodikament tniedi kwalunkwe proċess u tissorvelja l-eżekuzzjoni tagħhom.

U iva, mhux biss ngħid, imma nuri wkoll: il-programm għandu ħafna kodiċi, screenshots u rakkomandazzjonijiet.

Apache Airflow: Nagħmlu ETL aktar faċli
Dak li s-soltu tara meta tpoġġi fuq Google l-kelma Airflow / Wikimedia Commons

Tabella tal-kontenut

Introduzzjoni

Apache Airflow huwa bħal Django:

  • miktuba bil-python
  • hemm panel amministrattiv kbir,
  • tespandi indefinittivament

- aħjar biss, u sar għal skopijiet kompletament differenti, jiġifieri (kif miktub qabel il-kata):

  • it-tmexxija u l-monitoraġġ tal-kompiti fuq numru illimitat ta 'magni (kif ħafna Karfus / Kubernetes u l-kuxjenza tiegħek jippermettulek)
  • bil-ġenerazzjoni tal-fluss tax-xogħol dinamiku minn faċli ħafna biex tikteb u tifhem il-kodiċi Python
  • u l-abbiltà li tikkonnettja kwalunkwe databases u APIs ma 'xulxin billi tuża kemm komponenti lesti kif ukoll plugins magħmulin mid-dar (li huwa estremament sempliċi).

Aħna nużaw Apache Airflow bħal dan:

  • aħna niġbru data minn sorsi varji (ħafna istanzi ta 'SQL Server u PostgreSQL, diversi APIs b'metriċi ta' applikazzjoni, anke 1C) f'DWH u ODS (għandna Vertica u Clickhouse).
  • kemm avvanzat cron, li jibda l-proċessi ta' konsolidazzjoni tad-dejta fuq l-ODS, u jimmonitorja wkoll il-manutenzjoni tagħhom.

Sa ftit ilu, il-bżonnijiet tagħna kienu koperti minn server żgħir wieħed bi 32 core u 50 GB RAM. Fl-Airflow, dan jaħdem:

  • aktar 200 dag (fil-fatt flussi tax-xogħol, li fihom nimtlew kompiti),
  • f'kull medja 70 biċċa xogħol,
  • din it-tjubija tibda (ukoll bħala medja) darba fis-siegħa.

U dwar kif espanna, se nikteb hawn taħt, imma issa ejja niddefinixxu l-über-problema li se nsolvu:

Hemm tliet SQL Servers oriġinali, kull wieħed b'50 database - każijiet ta 'proġett wieħed, rispettivament, għandhom l-istess struttura (kważi kullimkien, mua-ha-ha), li jfisser li kull wieħed għandu tabella tal-Ordnijiet (fortunatament, tabella b'dik isem jista 'jiġi push fi kwalunkwe negozju). Aħna nieħdu d-dejta billi nżidu oqsma tas-servizz (server tas-sors, database tas-sors, ID tal-kompitu ETL) u b'mod naiv nitfgħuhom, ngħidu aħna, Vertica.

Ejja ħa mmorru!

Il-parti prinċipali, prattika (u ftit teoretika)

Għaliex aħna (u int)

Meta s-siġar kienu kbar u kont sempliċi SQL-schik f'bejgħ bl-imnut Russu wieħed, aħna scammed proċessi ETL aka flussi tad-dejta bl-użu ta 'żewġ għodod disponibbli għalina:

  • Ċentru tal-Enerġija tal-Informatika - sistema li tinfirex ħafna, estremament produttiva, bil-ħardwer tagħha stess, il-verżjoni tagħha stess. Jien użajt Alla jipprojbixxi 1% tal-kapaċitajiet tiegħu. Għaliex? Ukoll, l-ewwelnett, din l-interface, x'imkien mis-snin 380, mentalment poġġiet pressjoni fuqna. It-tieni nett, dan il-kontraption huwa ddisinjat għal proċessi estremament fancy, użu mill-ġdid ta 'komponenti furious u tricks oħra importanti ħafna ta' intrapriża. Dwar il-fatt li tiswa, bħall-ġwienaħ tal-Airbus AXNUMX / sena, mhux se ngħidu xejn.

    Oqgħod attent, screenshot jista 'jweġġa' ftit nies taħt it-30

    Apache Airflow: Nagħmlu ETL aktar faċli

  • SQL Server Integrazzjoni Server - użajna dan il-kompa fil-flussi intra-proġett tagħna. Ukoll, fil-fatt: aħna diġà nużaw SQL Server, u jkun b'xi mod irraġonevoli li ma tużax l-għodod ETL tagħha. Kollox fih huwa tajjeb: kemm l-interface hija sabiħa, kif ukoll ir-rapporti tal-progress ... Iżda dan mhux għaliex inħobbu l-prodotti tas-softwer, oh, mhux għal dan. Verżjoni dan dtsx (li huwa XML b'nodes shuffled fuq issalva) nistgħu, imma x'inhu l-punt? Kif dwar li tagħmel pakkett ta 'kompitu li se jkaxkru mijiet ta' tabelli minn server għal ieħor? Iva, liema mija, subgħajk l-indiċi se taqa 'minn għoxrin biċċa, tikklikkja fuq il-buttuna tal-maws. Imma żgur tidher aktar moda:

    Apache Airflow: Nagħmlu ETL aktar faċli

Żgur li fittixna modi ta’ ħruġ. Kawża anke kważi wasal għal ġeneratur tal-pakkett SSIS li nkiteb waħdu...

…u mbagħad sabni impjieg ġdid. U Apache Airflow qabeżni fuqha.

Meta sibt li d-deskrizzjonijiet tal-proċess ETL huma kodiċi Python sempliċi, jien biss ma żfinx għall-ferħ. Dan huwa kif il-flussi tad-dejta ġew verżjoni u differenti, u t-tferrigħ ta 'tabelli bi struttura waħda minn mijiet ta' databases f'mira waħda saret kwistjoni ta 'kodiċi Python fi skrins wieħed u nofs jew żewġ 13 ".

Assemblaġġ tal-cluster

Ejja ma nirranġawx kindergarten kompletament, u ma nitkellmux dwar affarijiet kompletament ovvji hawn, bħall-installazzjoni tal-Airflow, id-database magħżula tiegħek, Karfus u każijiet oħra deskritti fil-baċiri.

Sabiex inkunu nistgħu immedjatament nibdew esperimenti, abbozzajt docker-compose.yml F'liema:

  • Ejja fil-fatt ngħollu Fluss tal-arja: Scheduler, Webserver. Fjura se tkun qed iddur hemm ukoll biex tissorvelja l-kompiti tal-karfus (għax diġà ġiet imbuttata fiha apache/airflow:1.10.10-python3.7, imma ma niddejqux)
  • PostgreSQL, li fiha Airflow se tikteb l-informazzjoni tas-servizz tagħha (dejta tal-iskeduler, statistika tal-eżekuzzjoni, eċċ.), u Karfus se jimmarka l-kompiti lesti;
  • Ddistribwit mill-, li se jaġixxi bħala sensar tal-kompitu għal Karfus;
  • Ħaddiem tal-karfus, li se jkunu involuti fl-eżekuzzjoni diretta tal-kompiti.
  • Biex folder ./dags se nżidu l-fajls tagħna bid-deskrizzjoni tad-dags. Dawn se jinġabru fuq il-fly, u għalhekk m'hemmx għalfejn juggle mal-munzell kollu wara kull għatis.

F'xi postijiet, il-kodiċi fl-eżempji ma jintwerax kompletament (sabiex ma jħarrekx it-test), iżda x'imkien jiġi modifikat fil-proċess. Eżempji kompluti tal-kodiċi tax-xogħol jistgħu jinstabu fir-repożitorju https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Noti:

  • Fl-assemblaġġ tal-kompożizzjoni, kont invokat l-aktar fuq l-immaġni magħrufa puckel/docker-airflow - kun żgur li tiċċekkjaha. Forsi m’għandek bżonn xi ħaġa oħra f’ħajtek.
  • Is-settings kollha Airflow huma disponibbli mhux biss permezz airflow.cfg, iżda wkoll permezz ta 'varjabbli ambjentali (grazzi għall-iżviluppaturi), li b'mod malizzjuż ħadt vantaġġ minnhom.
  • Naturalment, mhuwiex lest għall-produzzjoni: ma poġġiex deliberatament taħbit tal-qalb fuq kontenituri, ma ddejjaqtx bis-sigurtà. Imma għamilt il-minimu adattat għall-esperimentaturi tagħna.
  • Innota li:
    • Il-folder tad-dag għandu jkun aċċessibbli kemm għall-iskedatur kif ukoll għall-ħaddiema.
    • L-istess japplika għal-libreriji kollha ta 'partijiet terzi - għandhom kollha jiġu installati fuq magni bi scheduler u ħaddiema.

Ukoll, issa huwa sempliċi:

$ docker-compose up --scale worker=3

Wara li kollox jogħla, tista 'tħares lejn l-interfaces tal-web:

Kunċetti bażiċi

Jekk ma fhimt xejn f’dawn id-“dags”, allura hawn dizzjunarju qasir:

  • skedar - l-iktar ziju importanti fl-Airflow, li jikkontrolla li r-robots jaħdmu iebes, u mhux persuna: jimmonitorja l-iskeda, jaġġorna d-dags, iniedi l-kompiti.

    B'mod ġenerali, f'verżjonijiet anzjani, kellu problemi bil-memorja (le, mhux amnesija, iżda tnixxijiet) u l-parametru tal-legat saħansitra baqa 'fil-konfigurazzjonijiet run_duration — l-intervall mill-ġdid tiegħu. Imma issa kollox tajjeb.

  • DAG (magħruf ukoll bħala "dag") - "graff aċikliku dirett", iżda definizzjoni bħal din tgħid ftit nies, iżda fil-fatt hija kontenitur għal kompiti li jinteraġixxu ma 'xulxin (ara hawn taħt) jew analogu ta' Pakkett f'SSIS u Workflow f'Informatica .

    Minbarra d-dags, xorta jista 'jkun hemm subdags, iżda x'aktarx mhux se naslu għalihom.

  • DAG Run - dag inizjalizzat, li huwa assenjat tiegħu stess execution_date. Dagrans ta 'l-istess dag jistgħu jaħdmu b'mod parallel (jekk għamilt il-kompiti tiegħek idempotent, ovvjament).
  • operatur huma biċċiet ta' kodiċi responsabbli għat-twettiq ta' azzjoni speċifika. Hemm tliet tipi ta’ operaturi:
    • azzjonibħall-favorit tagħna PythonOperator, li jista' jesegwixxi kwalunkwe kodiċi Python (validu);
    • trasferiment, li jittrasportaw data minn post għal post, ngħidu aħna, MsSqlToHiveTransfer;
    • senser min-naħa l-oħra, se jippermettilek tirreaġixxi jew tnaqqas il-mod l-eżekuzzjoni ulterjuri tad-dag sakemm iseħħ avveniment. HttpSensor jista 'jiġbed l-endpoint speċifikat, u meta r-rispons mixtieq ikun qed jistenna, ibda t-trasferiment GoogleCloudStorageToS3Operator. Moħħ kurżjuż jistaqsi: “għaliex? Wara kollox, tista’ tagħmel repetizzjonijiet eżatt fl-operatur!” U mbagħad, sabiex ma jinstaddux il-ġabra ta 'kompiti ma' operaturi sospiżi. Is-sensor jibda, jiċċekkja u jmut qabel l-attentat li jmiss.
  • Segretarjali u klerikali. - operaturi ddikjarati, irrispettivament mit-tip, u mehmuża mad-dag jiġu promossi għall-grad ta' kompitu.
  • eżempju tal-kompitu - meta l-pjanifikatur ġenerali ddeċieda li kien wasal iż-żmien li jintbagħtu l-kompiti fil-battalja fuq l-artisti-ħaddiema (eżatt fuq il-post, jekk nużaw LocalExecutor jew għal node remot fil-każ ta CeleryExecutor), jassenja lilhom kuntest (jiġifieri, sett ta 'varjabbli - parametri ta' eżekuzzjoni), jespandi mudelli ta 'kmand jew mistoqsija, u jiġborhom.

Aħna niġġeneraw kompiti

L-ewwel, ejja tiddeskrivi l-iskema ġenerali ta 'doug tagħna, u mbagħad aħna se adsa fid-dettalji aktar u aktar, minħabba li napplikaw xi soluzzjonijiet mhux trivjali.

Allura, fil-forma l-aktar sempliċi tagħha, tali dag se tidher bħal din:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Ejja nsemmu:

  • L-ewwel, aħna jimportaw il-libs meħtieġa u xi haga ohra;
  • sql_server_ds - dan hu List[namedtuple[str, str]] bl-ismijiet tal-konnessjonijiet minn Airflow Connections u d-databases li minnhom se nieħdu l-platt tagħna;
  • dag - it-tħabbira tad-dag tagħna, li bilfors irid ikun fi globals(), inkella Airflow ma ssibhiex. Doug jeħtieġ ukoll jgħid:
    • X'jismu orders - dan l-isem imbagħad jidher fl-interface tal-web,
    • li se jaħdem minn nofs il-lejl fit-tmienja ta’ Lulju,
    • u għandha taħdem, bejn wieħed u ieħor kull 6 sigħat (għal guys iebsa hawn minflok timedelta() permissibbli cron-linja 0 0 0/6 ? * * *, għall-inqas jibred - espressjoni simili @daily);
  • workflow() se tagħmel ix-xogħol ewlieni, iżda mhux issa. Għalissa, aħna ser biss dump kuntest tagħna fil-log.
  • U issa l-maġija sempliċi tal-ħolqien tal-kompiti:
    • aħna għaddejjin minn sorsi tagħna;
    • initialize PythonOperator, li se jesegwixxi l-manikin tagħna workflow(). Tinsiex li tispeċifika isem uniku (fi ħdan id-dag) tal-kompitu u torbot id-dag innifsu. Bandiera provide_context imbagħad, se pour argumenti addizzjonali fil-funzjoni, li aħna se jiġbru bir-reqqa bl-użu **context.

Għalissa, dak kollu. Dak li ksibna:

  • dag ġdid fl-interface tal-web,
  • mitt u nofs xogħol li se jiġu esegwiti b'mod parallel (jekk il-Fluss tal-Ajru, is-settings tal-Kfus u l-kapaċità tas-server jippermettu dan).

Ukoll, kważi ltqajna.

Apache Airflow: Nagħmlu ETL aktar faċli
Min se jinstalla d-dipendenzi?

Biex tissimplifika din il-ħaġa kollha, daħħalt docker-compose.yml ipproċessar requirements.txt fuq in-nodi kollha.

Issa marret:

Apache Airflow: Nagħmlu ETL aktar faċli

Il-kwadri griżi huma każijiet ta' xogħol ipproċessati mill-iskedar.

Nistennew ftit, il-kompiti jinqabdu mill-ħaddiema:

Apache Airflow: Nagħmlu ETL aktar faċli

Dawk ħodor, ovvjament, temmew b'suċċess ix-xogħol tagħhom. Il-Ħomor ma tantx għandhom suċċess.

Mill-mod, m'hemm l-ebda folder fuq il-prod tagħna ./dags, m'hemm l-ebda sinkronizzazzjoni bejn il-magni - id-dags kollha jinsabu git fuq Gitlab tagħna, u Gitlab CI jiddistribwixxi aġġornamenti lill-magni meta jingħaqdu master.

Ftit dwar Fjura

Waqt li l-ħaddiema qed jaqtgħu l-paċifikati tagħna, ejja niftakru għodda oħra li tista’ turina xi ħaġa – Fjura.

L-ewwel paġna b'informazzjoni fil-qosor dwar in-nodi tal-ħaddiema:

Apache Airflow: Nagħmlu ETL aktar faċli

L-aktar paġna intensa b'kompiti li marru jaħdmu:

Apache Airflow: Nagħmlu ETL aktar faċli

L-aktar paġna boring bl-istatus tas-sensar tagħna:

Apache Airflow: Nagħmlu ETL aktar faċli

L-isbaħ paġna hija bil-grafiċi tal-istatus tal-kompitu u l-ħin tal-eżekuzzjoni tagħhom:

Apache Airflow: Nagħmlu ETL aktar faċli

Aħna tagħbija l-mgħabija

Allura, il-kompiti kollha ħadmu, tista 'twettaq il-midruba.

Apache Airflow: Nagħmlu ETL aktar faċli

U kien hemm ħafna midruba - għal xi raġuni jew oħra. Fil-każ tal-użu korrett tal-Airflow, dawn il-kwadri stess jindikaw li d-dejta żgur ma waslitx.

Ikollok bżonn tara l-log u terġa 'tibda l-istanzi tal-kompitu waqgħu.

Billi tikklikkja fuq kwalunkwe kwadru, naraw l-azzjonijiet disponibbli għalina:

Apache Airflow: Nagħmlu ETL aktar faċli

Tista 'tieħu u tagħmel ċari l-waqa'. Jiġifieri, ninsew li xi ħaġa falliet hemmhekk, u l-istess kompitu ta 'istanza se jmur għand l-iskedar.

Apache Airflow: Nagħmlu ETL aktar faċli

Huwa ċar li tagħmel dan bil-maws bil-kwadri ħomor kollha mhux uman ħafna - dan mhux dak li nistennew mill-Airflow. Naturalment, għandna armi tal-qerda tal-massa: Browse/Task Instances

Apache Airflow: Nagħmlu ETL aktar faċli

Ejja nagħżlu kollox f'daqqa u reset għal żero, ikklikkja l-oġġett it-tajjeb:

Apache Airflow: Nagħmlu ETL aktar faċli

Wara t-tindif, it-taxis tagħna jidhru bħal dawn (diġà qed jistennew li l-iskedatur jiskedahom):

Apache Airflow: Nagħmlu ETL aktar faċli

Konnessjonijiet, ganċijiet u varjabbli oħra

Wasal iż-żmien li nħarsu lejn id-DAG li jmiss, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Kulħadd qatt għamel aġġornament tar-rapport? Din hija għal darb'oħra tagħha: hemm lista ta 'sorsi minn fejn tikseb id-data; hemm lista fejn tpoġġi; ma ninsewx li tfaċċar meta kollox ġara jew inkiser (tajjeb, dan mhux dwarna, le).

Ejja nerġgħu ngħaddu mill-fajl u nħarsu lejn l-affarijiet oskurati ġodda:

  • from commons.operators import TelegramBotSendMessage - xejn ma jipprevjeni milli nagħmlu l-operaturi tagħna stess, li ħadna vantaġġ minnhom billi għamilna wrapper żgħir biex nibagħtu messaġġi lil Unblocked. (Se nitkellmu aktar dwar dan l-operatur hawn taħt);
  • default_args={} - dag tista' tqassam l-istess argumenti lill-operaturi kollha tagħha;
  • to='{{ var.value.all_the_kings_men }}' - qasam to mhux se jkollna hardcoded, iżda ġenerati b'mod dinamiku bl-użu ta 'Jinja u varjabbli b'lista ta' emails, li nressaq bir-reqqa Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — il-kondizzjoni biex jibda l-operatur. Fil-każ tagħna, l-ittra se ttir lejn il-pumijiet biss jekk id-dipendenzi kollha jkunu ħadmu b'suċċess;
  • tg_bot_conn_id='tg_main' - argumenti conn_id jaċċettaw IDs tal-konnessjoni li noħolqu fihom Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - messaġġi f'Telegram se jtiru biss jekk ikun hemm kompiti waqgħu;
  • task_concurrency=1 - nipprojbixxu t-tnedija simultanja ta' diversi każijiet ta' kompitu ta' kompitu wieħed. Inkella, aħna se tikseb it-tnedija simultanja ta 'diversi VerticaOperator (tħares lejn mejda waħda);
  • report_update >> [email, tg] - kollha VerticaOperator jikkonverġu biex jibagħtu ittri u messaġġi, bħal dan:
    Apache Airflow: Nagħmlu ETL aktar faċli

    Iżda peress li l-operaturi tan-notifikanti għandhom kundizzjonijiet ta’ tnedija differenti, wieħed biss jaħdem. Fit-Tree View, kollox jidher ftit inqas viżwali:
    Apache Airflow: Nagħmlu ETL aktar faċli

Se ngħid ftit kliem dwar makro u sħabhom - varjabbli.

Macros huma placeholders Jinja li jistgħu jissostitwixxu informazzjoni utli varji fl-argumenti tal-operatur. Per eżempju, bħal dan:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} se jespandi għall-kontenut tal-varjabbli tal-kuntest execution_date fil-format YYYY-MM-DD: 2020-07-14. L-aħjar parti hija li l-varjabbli tal-kuntest huma nailed għal eżempju ta 'kompitu speċifiku (kwadru fit-Tree View), u meta jerġgħu jibdew, il-placeholders se jespandu għall-istess valuri.

Il-valuri assenjati jistgħu jitqiesu billi tuża l-buttuna Rendered fuq kull istanza tal-kompitu. Dan huwa kif il-kompitu li tibgħat ittra:

Apache Airflow: Nagħmlu ETL aktar faċli

U għalhekk fil-kompitu li tibgħat messaġġ:

Apache Airflow: Nagħmlu ETL aktar faċli

Lista kompluta ta' macros inkorporati għall-aħħar verżjoni disponibbli hija disponibbli hawn: referenza macros

Barra minn hekk, bl-għajnuna ta 'plugins, nistgħu niddikjaraw macros tagħna stess, iżda dik hija storja oħra.

Minbarra l-affarijiet predefiniti, nistgħu nissostitwixxu l-valuri tal-varjabbli tagħna (diġà użajt dan fil-kodiċi ta 'hawn fuq). Ejja noħolqu ġewwa Admin/Variables ftit affarijiet:

Apache Airflow: Nagħmlu ETL aktar faċli

Dak kollu li tista' tuża:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Il-valur jista 'jkun skalar, jew jista' jkun ukoll JSON. Fil-każ ta' JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

uża biss it-triq għaċ-ċavetta mixtieqa: {{ var.json.bot_config.bot.token }}.

Se litteralment ngħid kelma waħda u nuri screenshot wieħed dwar konnessjonijiet. Kollox huwa elementari hawn: fuq il-paġna Admin/Connections noħolqu konnessjoni, żid il-logins / passwords tagħna u parametri aktar speċifiċi hemmhekk. Bħal dan:

Apache Airflow: Nagħmlu ETL aktar faċli

Il-passwords jistgħu jiġu encrypted (aktar bir-reqqa mill-default), jew tista 'tħalli barra t-tip ta' konnessjoni (kif għamilt għal tg_main) - il-fatt hu li l-lista tat-tipi hija wajerjata fil-mudelli Airflow u ma tistax tiġi estiża mingħajr ma tidħol fil-kodiċijiet tas-sors (jekk f'daqqa waħda ma kontx google xi ħaġa, jekk jogħġbok ikkoreġini), iżda xejn ma jwaqqafna milli niksbu krediti biss billi isem.

Tista 'wkoll tagħmel diversi konnessjonijiet bl-istess isem: f'dan il-każ, il-metodu BaseHook.get_connection(), li jġibna konnessjonijiet bl-isem, se jagħti addoċċ minn diversi namesakes (ikun aktar loġiku li tagħmel Round Robin, iżda ejja nħallu fuq il-kuxjenza ta 'l-iżviluppaturi Airflow).

Varjabbli u Konnessjonijiet huma ċertament għodod friski, iżda huwa importanti li ma titlifx il-bilanċ: liema partijiet tal-flussi tiegħek taħżen fil-kodiċi innifsu, u liema partijiet tagħti lil Airflow għall-ħażna. Min-naħa waħda, jista 'jkun konvenjenti li jinbidel malajr il-valur, pereżempju, kaxxa postali, permezz tal-UI. Min-naħa l-oħra, dan għadu ritorn għall-klikk tal-maws, li minnha ridna (jien) neħilsu.

Il-ħidma b'konnessjonijiet hija waħda mill-kompiti ganċijiet. B'mod ġenerali, il-ganċijiet tal-Fluss tal-Ajru huma punti għall-konnessjoni ma 'servizzi u libreriji ta' partijiet terzi. Eż., JiraHook se tiftaħ klijent għalina biex jinteraġixxu ma 'Jira (tista' ċċaqlaq il-kompiti 'l quddiem u 'l quddiem), u bl-għajnuna ta' SambaHook tista 'timbotta fajl lokali biex smb-punt.

Parsing tal-operatur tad-dwana

U sirna viċin li nħarsu lejn kif issir TelegramBotSendMessage

Kodiċi commons/operators.py mal-operatur attwali:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Hawnhekk, bħal kull ħaġa oħra fl-Airflow, kollox huwa sempliċi ħafna:

  • Wiret minn BaseOperator, li timplimenta pjuttost ftit affarijiet speċifiċi għall-fluss tal-arja (ħares lejn il-ħin liberu tiegħek)
  • Oqsma ddikjarati template_fields, li fih Jinja se tfittex macros biex tipproċessa.
  • Irranġat l-argumenti t-tajba għal __init__(), issettja l-inadempjenzi fejn meħtieġ.
  • Ma ninsewx dwar l-inizjalizzazzjoni tal-antenat lanqas.
  • Infetaħ il-ganċ korrispondenti TelegramBotHookirċieva oġġett klijent mingħandu.
  • Metodu overridden (definit mill-ġdid). BaseOperator.execute(), li Airfow se twitch meta jasal iż-żmien li tniedi l-operatur - fiha se nimplimentaw l-azzjoni ewlenija, u ninsew li tidħol. (Aħna nilloggjaw, bil-mod, eżatt stdout и stderr - Il-fluss ta 'l-arja se jinterċetta kollox, ikebbeb sewwa, jiddekomponih fejn meħtieġ.)

Ejja naraw x'għandna commons/hooks.py. L-ewwel parti tal-fajl, bil-ganċ innifsu:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Lanqas naf x'għandi nispjega hawn, ser ninnota biss il-punti importanti:

  • Aħna nirtu, aħseb dwar l-argumenti - f'ħafna każijiet se jkun wieħed: conn_id;
  • Metodi standard li jipprevjenu: I llimitat ruħi get_conn(), li fiha nieħu l-parametri tal-konnessjoni bl-isem u nġib biss is-sezzjoni extra (dan huwa qasam JSON), li fih jien (skond l-istruzzjonijiet tiegħi stess!) Inpoġġi t-token tal-bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Noħloq eżempju tagħna TelegramBot, billi tagħtiha token speċifiku.

Dak kollox. Tista 'tikseb klijent minn ganċ bl-użu TelegramBotHook().clent jew TelegramBotHook().get_conn().

U t-tieni parti tal-fajl, li fiha nagħmel microwrapper għat-Telegram REST API, sabiex ma nkaxkarx l-istess python-telegram-bot għal metodu wieħed sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Il-mod korrett huwa li żżid kollox: TelegramBotSendMessage, TelegramBotHook, TelegramBot - fil-plugin, poġġi f'repożitorju pubbliku, u agħtih lil Open Source.

Waqt li konna qed nistudjaw dan kollu, l-aġġornamenti tar-rapporti tagħna rnexxielhom ifallu b'suċċess u jibagħtuli messaġġ ta' żball fil-kanal. Se niċċekkja biex nara jekk hux ħażin...

Apache Airflow: Nagħmlu ETL aktar faċli
Xi ħaġa kissret fid-doġ tagħna! Mhux hekk konna nistennew? Eżattament!

Int se tferra?

Tħoss li tlift xi ħaġa? Jidher li huwa wiegħed li jittrasferixxi d-data minn SQL Server għal Vertica, u mbagħad ħadha u mċaqlaq barra mis-suġġett, il-kanna!

Din l-atroċità kienet intenzjonata, kelli sempliċement niddeċifra xi terminoloġija għalik. Issa tista’ tmur lil hinn.

Il-pjan tagħna kien dan:

  1. Do dag
  2. Iġġenera ħidmiet
  3. Ara kemm hu sabiħ kollox
  4. Assenja numri tas-sessjoni għall-mili
  5. Ikseb data minn SQL Server
  6. Poġġi d-dejta f'Vertica
  7. Iġbor l-istatistika

Allura, biex dan kollu jibda jaħdem, għamilt żieda żgħira għal tagħna docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Hemmhekk inqajmu:

  • Vertica bħala ospitanti dwh bl-aktar settings default,
  • tliet każijiet ta' SQL Server,
  • aħna nimlew id-databases f'dawn tal-aħħar b'xi dejta (fl-ebda każ ma tħaresx lejn mssql_init.py!)

Inniedu t-tajjeb kollu bl-għajnuna ta 'kmand kemmxejn aktar ikkumplikat mill-aħħar darba:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Dak li randomizer miracle tagħna iġġenerat, tista 'tuża l-oġġett Data Profiling/Ad Hoc Query:

Apache Airflow: Nagħmlu ETL aktar faċli
Il-ħaġa prinċipali mhix li turiha lill-analisti

telabora fuq Sessjonijiet ETL Mhux se, kollox huwa trivjali hemmhekk: nagħmlu bażi, hemm sinjal fiha, nagħżlu kollox ma 'maniġer tal-kuntest, u issa nagħmlu dan:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Wasal iż-żmien jiġbru d-data tagħna minn mitt u nofs tabella tagħna. Ejja nagħmlu dan bl-għajnuna ta 'linji bla pretenzjoni ħafna:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Bl-għajnuna ta 'ganċ niksbu minn Airflow pymssql-connect
  2. Issostitwixxi restrizzjoni fil-forma ta 'data fit-talba - se tintefa' fil-funzjoni mill-magna tal-mudell.
  3. It-tmigħ tat-talba tagħna pandasmin se jġibna DataFrame - se jkun utli għalina fil-futur.

Qed nuża sostituzzjoni {dt} minflok parametru talba %s mhux għax jien Pinocchio ħażin, imma għax pandas ma jistax jimmaniġġja pymssql u tiżloq l-aħħar waħda params: Listgħalkemm verament irid tuple.
Innota wkoll li l-iżviluppatur pymssql iddeċieda li ma jappoġġjahx aktar, u wasal iż-żmien li tiċċaqlaq pyodbc.

Ejja naraw b'liema Airflow mimli l-argumenti tal-funzjonijiet tagħna:

Apache Airflow: Nagħmlu ETL aktar faċli

Jekk ma jkunx hemm dejta, allura m'hemm l-ebda punt li tkompli. Iżda hija wkoll stramba li tikkunsidra l-mili ta 'suċċess. Iżda dan mhuwiex żball. A-ah-ah, x'għandek tagħmel?! U hawn xi:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException se tgħid Airflow li m'hemm l-ebda żbalji, iżda aħna naqbżu l-kompitu. L-interface mhux se jkollu kwadru aħdar jew aħmar, iżda roża.

Ejja toss id-data tagħna kolonni multipli:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Jiġifieri:

  • Id-database li minnha ħadna l-ordnijiet,
  • ID tas-sessjoni tal-għargħar tagħna (se tkun differenti għal kull kompitu),
  • Hash mis-sors u l-ID tal-ordni - sabiex fid-database finali (fejn kollox jitferra f'tabella waħda) ikollna ID tal-ordni unika.

Il-pass ta’ qabel tal-aħħar jibqa’: ferra’ kollox ġo Vertica. U, b'mod stramb, wieħed mill-aktar modi spettakolari u effiċjenti biex isir dan huwa permezz tas-CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Qed nagħmlu riċevitur speċjali StringIO.
  2. pandas se ġentilment tpoġġi tagħna DataFrame fil - forma ta ' CSV-linji.
  3. Ejja niftħu konnessjoni mal-Vertica favorita tagħna b'ganċ.
  4. U issa bl-għajnuna copy() ibgħat id-dejta tagħna direttament lil Vertika!

Se nieħdu mingħand ix-xufier kemm imtlew linji, u ngħidu lill-maniġer tas-sessjoni li kollox huwa tajjeb:

session.loaded_rows = cursor.rowcount
session.successful = True

Dak kollox.

Fuq il-bejgħ, noħolqu l-pjanċa fil-mira manwalment. Hawnhekk ħallejt lili nnifsi magna żgħira:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Qed nuża VerticaOperator() Noħloq skema tad-database u tabella (jekk ma jeżistux diġà, ovvjament). Il-ħaġa prinċipali hija li tirranġa b'mod korrett id-dipendenzi:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Bħala sinteżi

- Ukoll, - qal il-ġurdien żgħir, - hux, issa
Int konvint li jien l-aktar annimal terribbli fil-foresta?

Julia Donaldson, Il-Gruffalo

Naħseb li kieku jien u l-kollegi tiegħi kellna kompetizzjoni: min se joħloq u jniedi malajr proċess ETL mill-bidu: huma bl-SSIS tagħhom u maws u jien bl-Airflow ... U allura nqabblu wkoll il-faċilità tal-manutenzjoni ... Ara naqra, naħseb li inti taqbel li se ngħaqqadhom minn kull naħa!

Jekk xi ftit aktar bis-serjetà, allura Apache Airflow - billi ddeskriviet proċessi fil-forma ta 'kodiċi tal-programm - għamel ix-xogħol tiegħi ħafna aktar komdu u pjaċevoli.

L-estensibbiltà illimitata tagħha, kemm f’termini ta’ plug-ins kif ukoll ta’ predispożizzjoni għall-iskalabbiltà, tagħtik l-opportunità li tuża Airflow fi kważi kull qasam: anke fiċ-ċiklu sħiħ tal-ġbir, it-tħejjija u l-ipproċessar tad-dejta, anke fit-tnedija tar-rokits (sa Mars, ta’ kors).

Parti finali, referenza u informazzjoni

Ir-rake li ġbarna għalik

  • start_date. Iva, dan diġà huwa meme lokali. Via l-argument ewlieni ta’ Doug start_date kollha jgħaddu. Fil-qosor, jekk tispeċifika fi start_date data kurrenti, u schedule_interval - jum wieħed, imbagħad DAG se jibda għada mhux qabel.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    U mhux aktar problemi.

    Hemm żball ieħor ta' runtime assoċjat miegħu: Task is missing the start_date parameter, li ħafna drabi jindika li insejt torbot mal-operatur dag.

  • Kollha fuq magna waħda. Iva, u bażijiet (Airflow innifsu u kisi tagħna), u web server, u Scheduler, u ħaddiema. U anke ħadem. Iżda maż-żmien, in-numru ta 'kompiti għas-servizzi kiber, u meta PostgreSQL beda jirrispondi għall-indiċi f'20 s minflok 5 ms, ħadna u ġarrejna.
  • Eżekutur Lokali. Iva, għadna bilqiegħda fuqha, u diġà wasalna fit-tarf tal-abbiss. LocalExecutor kien biżżejjed għalina s'issa, iżda issa wasal iż-żmien li nespandu b'mill-inqas ħaddiem wieħed, u ser ikollna naħdmu ħafna biex nimxu għal CeleryExecutor. U fid-dawl tal-fatt li tista 'taħdem magħha fuq magna waħda, xejn ma jwaqqafk milli tuża Karfus anki fuq server, li "naturalment, qatt mhu se jidħol fil-produzzjoni, onestament!"
  • Nuqqas ta' użu għodod integrati:
    • Konnessjonijiet biex taħżen il-kredenzjali tas-servizz,
    • SLA Miss biex twieġeb għall-kompiti li ma ħadmux fil-ħin,
    • xcom għall-iskambju tal-metadata (għidt metadata!) bejn il-kompiti dag.
  • Abbuż tal-posta. Ukoll, x'nista 'ngħid? Ġew stabbiliti twissijiet għar-repetizzjonijiet kollha tal-kompiti waqgħu. Issa l-Gmail tax-xogħol tiegħi għandu > 90k email mill-Airflow, u l-geddum tal-posta tal-web jirrifjuta li jiġbor u jħassar aktar minn 100 kull darba.

Aktar nases: Apache Airflow Pitfails

Aktar għodod ta 'awtomazzjoni

Sabiex naħdmu aktar b'rasna u mhux b'idejna, Airflow ħejja għalina dan:

  • SERĦAN API - għad għandu l-istatus ta' Sperimentali, li ma jwaqqafx milli jaħdem. Biha, tista 'mhux biss tikseb informazzjoni dwar dags u ħidmiet, iżda wkoll twaqqaf/tibda dag, toħloq DAG Run jew pool.
  • CLI - ħafna għodod huma disponibbli permezz tal-linja tal-kmand li mhumiex biss inkonvenjenti biex jintużaw permezz tal-WebUI, iżda ġeneralment huma assenti. Pereżempju:
    • backfill meħtieġa biex jerġgħu jibdew l-istanzi tal-kompitu.
      Per eżempju, l-analisti ġew u qalu: “U int, sħabu, għandek xi ħaġa bla sens fid-dejta mill-1 sat-13 ta’ Jannar! Waħħalha, irranġaha, irranġaha, irranġaha!” U int tali hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Servizz bażi: initdb, resetdb, upgradedb, checkdb.
    • run, li jippermettilek tmexxi kompitu ta 'istanza waħda, u anke punteġġ fuq id-dipendenzi kollha. Barra minn hekk, tista 'taħdemha permezz LocalExecutor, anki jekk għandek raggruppament Karfus.
    • Jagħmel pjuttost l-istess ħaġa test, biss ukoll fil-bażijiet ma jikteb xejn.
    • connections jippermetti ħolqien tal-massa ta 'konnessjonijiet mill-qoxra.
  • API Python - mod pjuttost iebes ta 'interazzjoni, li huwa maħsub għall-plugins, u mhux swarming fih b'idejn żgħar. Imma min iwaqqafna milli mmorru /home/airflow/dags, run ipython u tibda taqbad? Tista', pereżempju, tesporta l-konnessjonijiet kollha bil-kodiċi li ġej:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Konnessjoni mal-metadatabase Airflow. Ma nirrakkomandax li nikteb lilha, iżda li tikseb stati tal-kompitu għal diversi metriċi speċifiċi jista 'jkun ħafna aktar mgħaġġel u eħfef milli tuża kwalunkwe mill-APIs.

    Ejja ngħidu li mhux il-kompiti tagħna kollha huma idempotenti, iżda xi drabi jistgħu jaqgħu, u dan huwa normali. Iżda ftit imblukkar huma diġà suspettużi, u jkun meħtieġ li tiċċekkja.

    Oqgħod attent SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referenzi

U ovvjament, l-ewwel għaxar links mill-ħruġ ta 'Google huma l-kontenut tal-folder Airflow mill-bookmarks tiegħi.

U l-links użati fl-artiklu:

Sors: www.habr.com