Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, Data Engineer in the Analytics Department of the Vezet group of companies.

I'll tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you aren't involved in data flows but periodically need to launch processes and monitor their execution.

And yes, I won't just tell, I'll also show: the program includes lots of code, screenshots and recommendations.

What you usually see when you google the word Airflow / Wikimedia Commons

Introduction

Apache Airflow is just like Django:

  • it's written in Python,
  • it has a great admin panel,
  • it can be extended indefinitely

- only better, and it was built for entirely different purposes, namely (as stated above the cut):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience will allow)
  • dynamic workflow generation from Python code that is very easy to write and understand
  • the ability to connect any databases and APIs with each other using both ready-made components and home-made plugins (which is extremely simple).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
  • as an advanced cron that starts the data consolidation processes on the ODS and also monitors their maintenance.

Until recently, our needs were covered by a single small server with 32 cores and 50 GB of RAM. In Airflow, it runs:

  • more than 200 DAGs (essentially workflows into which we stuffed tasks),
  • 70 tasks in each on average,
  • and this goodness starts (also on average) once an hour.

As for how we scaled out, I'll write about that below; for now, let's define the über-problem we're going to solve:

There are three source SQL Servers, each with 50 databases: instances of a single project. Accordingly, they share the same structure (almost everywhere, mua-ha-ha), which means each one has an Orders table (luckily, a table with that name can be shoehorned into any business). We take this data, add service fields (source server, source database, ETL task ID) and dump it into, say, Vertica.
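To make the "service fields" part concrete, here is a minimal pure-Python sketch of the enrichment step. The function add_service_fields and the column names source_server, source_db and etl_job_id are hypothetical; the article only says that the source server, source database and ETL task ID are added to every row.

```python
from typing import Any, Dict, Iterable, Iterator

Row = Dict[str, Any]


def add_service_fields(rows: Iterable[Row], source_server: str,
                       source_db: str, etl_job_id: int) -> Iterator[Row]:
    """Yield copies of rows extended with service (lineage) columns.

    The column names are illustrative, not the article's real schema.
    """
    for row in rows:
        enriched = dict(row)  # copy, so the caller's rows stay untouched
        enriched['source_server'] = source_server
        enriched['source_db'] = source_db
        enriched['etl_job_id'] = etl_job_id
        yield enriched


# Hypothetical Orders rows from one of the 150 source databases
orders = [{'id': 1, 'amount': 250}, {'id': 2, 'amount': 90}]
loaded = list(add_service_fields(orders, 'mssql_0', 'project_07', 42))
print(loaded[0])
# {'id': 1, 'amount': 250, 'source_server': 'mssql_0', 'source_db': 'project_07', 'etl_job_id': 42}
```

Using a generator keeps memory flat when a source table is large: rows are enriched one at a time on their way to the target.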

Let's go!

The main part: practical (and a little theoretical)

Why us (and why you)

Back when the trees were big and I was a simple SQL guy at a Russian retailer, we cobbled together ETL processes, aka data flows, using the two tools available to us:

  • Informatica Power Center: an extremely sprawling, extremely productive system with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? Well, first of all, its interface, straight out of the past, weighs on you psychologically. Second, this contraption is designed for extremely elaborate processes, furious component reuse and other very-important-enterprise-tricks. As for the fact that it costs as much as the wing of an Airbus A380 per year, we'll say nothing.

    Careful: the screenshot may slightly hurt people under 30

  • SQL Server Integration Services: we used this comrade in our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything about it is good: the interface is pretty, and so are the progress reports... But that's not why we love software products, oh no, not for that. We can version it as dtsx (XML whose nodes get shuffled on every save), but what's the point? And how about building a task package that pulls hundreds of tables from one server to another? Hundreds, you say? Your index finger will fall off after twenty of them from clicking the mouse button. But it certainly looks more fashionable:

Of course we looked for ways out. It even almost came to a home-made SSIS package generator...

...and then a new job found me. And Apache Airflow caught up with me there.

When I found out that ETL process descriptions are plain Python code, I all but danced for joy. That's how data flows became versionable and diffable, and pouring tables with identical structure from hundreds of databases into a single target became a matter of Python code spanning one and a half or two 13-inch screens.

Assembling the cluster

Let's not run a full-blown kindergarten here or talk about obvious things like installing Airflow, your chosen database, Celery and the other matters described in the docs.

So that we can start experimenting right away, I sketched out a docker-compose.yml in which:

  • We bring up Airflow itself: Scheduler and Webserver. Flower will also run there to monitor Celery tasks (it's already bundled into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, where Airflow will write its service information (scheduler data, execution statistics, etc.) and Celery will mark completed tasks;
  • Redis, which will act as the task broker for Celery;
  • Celery worker, which will do the actual execution of tasks;
  • Into the ./dags folder we'll add our files with DAG descriptions. They are picked up on the fly, so there's no need to restart the whole stack after every sneeze.

In some places the code in the examples isn't shown in full (so as not to clutter the text), and in some places it gets modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • When assembling the compose file, I relied heavily on the well-known puckel/docker-airflow image, so be sure to check it out. Maybe you won't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I gladly took advantage of.
  • Naturally, this is not production-ready: I deliberately didn't add heartbeats to the containers and didn't bother with security. But I did the minimum suitable for our experiments.
  • Note that:
    • The dags folder must be accessible to both the scheduler and the workers.
    • The same applies to all third-party libraries: they must be installed on every machine running the scheduler or a worker.
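Since all the settings above are passed as AIRFLOW__... environment variables, it may help to see the naming convention spelled out. The sketch below only illustrates how a name like AIRFLOW__CORE__EXECUTOR maps to the executor key of the [core] section of airflow.cfg. The function airflow_env_overrides is a hypothetical helper, not part of Airflow, and it ignores variants such as the _CMD suffix.

```python
from typing import Dict, Mapping, Tuple

PREFIX = 'AIRFLOW__'


def airflow_env_overrides(environ: Mapping[str, str]) -> Dict[Tuple[str, str], str]:
    """Map AIRFLOW__{SECTION}__{KEY} variables to (section, key) pairs.

    Illustrates the naming convention only: no type conversion and no
    special suffixes are handled.
    """
    overrides = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue
        # Split off the section at the first double underscore; keys may
        # themselves contain single underscores (DAGS_FOLDER, BROKER_URL)
        parts = name[len(PREFIX):].split('__', 1)
        if len(parts) != 2:
            continue
        section, key = parts
        overrides[(section.lower(), key.lower())] = value
    return overrides


sample = {
    'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor',
    'AIRFLOW__CELERY__BROKER_URL': 'redis://broker:6379/0',
    'PATH': '/usr/bin',
}
print(airflow_env_overrides(sample))
```

Passing os.environ instead of sample would show which settings a given container actually overrides.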

Well, now it's simple:

$ docker-compose up --scale worker=3

Once everything is up, you can look at the web interfaces: Airflow at http://localhost:8080 and Flower at http://localhost:5555.

Basic concepts

If you didn't understand anything about all these "DAGs", here's a short glossary:

  • Scheduler: the most important uncle in Airflow, who makes sure that robots work hard instead of people. It monitors the schedule, updates DAGs and launches tasks.

    In older versions it had memory problems (no, not amnesia, leaks), and the legacy parameter run_duration, its restart interval, even remained in the configs. But now everything is fine.

  • DAG (aka "dag"): a "directed acyclic graph", but that definition tells few people anything. In practice it is a container for tasks that interact with each other (see below), an analogue of a Package in SSIS or a Workflow in Informatica.

    Besides DAGs there can also be subDAGs, but we most likely won't get to them.

  • DAG Run: an initialized DAG that is assigned its own execution_date. DAG runs of the same DAG can run in parallel (provided you've made your tasks idempotent, of course).
  • Operator: a piece of code responsible for performing a specific action. There are three types of operators:
    • action, like our beloved PythonOperator, which can execute any (valid) Python code;
    • transfer, which carries data from place to place, say, MsSqlToHiveTransfer;
    • sensor, on the other hand, lets you react to an event or postpone further execution of the DAG until it happens. HttpSensor can poll a given endpoint and, once the desired response arrives, start the GoogleCloudStorageToS3Operator transfer. An inquisitive mind will ask: "Why? You can do retries right in the operator!" The answer: so as not to clog the task pool with suspended operators. The sensor starts, checks and dies until the next attempt.
  • Task: declared operators, regardless of type, attached to a DAG are promoted to the rank of task.
  • Task instance: when the general scheduler decides it's time to send tasks into battle on the performer-workers (locally if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e., a set of variables: the execution parameters), expands command or query templates and queues them.
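The argument for sensors (don't keep a slot busy while waiting) is easier to see with a toy poke loop. This is a hypothetical single-process model, not Airflow's BaseSensorOperator: poke_until calls a condition every poke_interval seconds until it succeeds or the timeout runs out. The time spent in time.sleep() below is exactly what you don't want a worker slot held for; as far as I know, Airflow 1.10 sensors address this with mode='reschedule'.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is still false at the deadline."""


def poke_until(condition: Callable[[], bool],
               poke_interval: float = 1.0,
               timeout: float = 60.0) -> int:
    """Call condition() until it is true and return the number of pokes.

    A toy model of a sensor's poke loop, not Airflow's implementation.
    """
    deadline = time.monotonic() + timeout
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if time.monotonic() + poke_interval > deadline:
            raise SensorTimeout(f'condition not met after {pokes} pokes')
        # A "reschedule"-style sensor would give its slot back here
        time.sleep(poke_interval)


# Simulated endpoint that answers "ready" on the third poll
answers = iter([False, False, True])
print(poke_until(lambda: next(answers), poke_interval=0.01, timeout=1.0))  # 3
```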

Generating tasks

First, let's look at the general layout of our DAG, and then we'll dive deeper and deeper into the details, because we use some non-trivial solutions.

So, in its simplest form, such a DAG looks like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's break it down:

  • First, we import the necessary libs and a few other things;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of connections from Airflow Connections and the databases from which we'll take our table;
  • dag is the declaration of our DAG, which must be in globals(), otherwise Airflow won't find it. The DAG also needs to know:
    • that its name is orders; this name will then appear in the web interface,
    • that it will run from midnight on the eighth of July,
    • and that it should run roughly every 6 hours (for tough guys, a cron string such as 0 */6 * * * is acceptable here instead of timedelta(); for the less cool, an expression like @daily);
  • workflow() will do the main job, but not yet. For now we'll just dump our context into the log.
  • And now the simple magic of creating tasks:
    • we loop through our sources;
    • we initialize a PythonOperator that will execute our dummy workflow(). Don't forget to give the task a unique (within the DAG) name and bind the DAG itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect with **context.
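The commons.datasources module itself isn't shown, only the type of sql_server_ds. A plausible shape, with made-up connection IDs and database names, could look like the sketch below. The duplicate check matters because the DAG uses task_id=schema, and task names must be unique within a DAG.

```python
from collections import Counter, namedtuple
from typing import List

# Hypothetical record type; the real module only promises
# List[namedtuple[str, str]] of (connection id, database name)
DataSource = namedtuple('DataSource', 'conn_id schema')


def build_sources(conn_ids: List[str], dbs_per_server: int) -> List[DataSource]:
    """Enumerate every (connection, database) pair; the naming is made up."""
    return [DataSource(conn_id, f'{conn_id}_db_{i:02d}')
            for conn_id in conn_ids
            for i in range(dbs_per_server)]


sql_server_ds = build_sources(['mssql_0', 'mssql_1', 'mssql_2'], 50)

# Fail fast if two servers ever expose a database with the same name,
# since that name becomes the task_id
task_ids = [schema for _, schema in sql_server_ds]
duplicates = [t for t, n in Counter(task_ids).items() if n > 1]
if duplicates:
    raise ValueError(f'duplicate task_id(s): {sorted(duplicates)}')

print(len(sql_server_ds), sql_server_ds[0])
# 150 DataSource(conn_id='mssql_0', schema='mssql_0_db_00')
```

Because DataSource is a namedtuple, the DAG's "for conn_id, schema in sql_server_ds" unpacking works unchanged.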

That's all for now. What we got:

  • a new DAG in the web interface,
  • one hundred and fifty tasks that will be executed in parallel (if the Airflow and Celery settings and server capacity allow).
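A detail that often surprises newcomers: with start_date=datetime(2020, 7, 8, 0) and schedule_interval=timedelta(hours=6), the first run does not start at midnight. In Airflow 1.10 a run is labelled with the start of its interval (execution_date) and is triggered only after that interval has ended. The simplified model below (the runs generator is hypothetical) prints the first few pairs. Note also that the compose file sets AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT to 'False', so intervals missed before the DAG was switched on are not backfilled one by one.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def runs(start_date: datetime, interval: timedelta,
         count: int) -> Iterator[Tuple[datetime, datetime]]:
    """Yield (execution_date, earliest trigger time) pairs.

    Simplified model of Airflow 1.10 scheduling: each run is stamped with
    the start of its interval and triggered only once the interval closes.
    """
    execution_date = start_date
    for _ in range(count):
        yield execution_date, execution_date + interval
        execution_date += interval


for ds, fires_at in runs(datetime(2020, 7, 8, 0), timedelta(hours=6), 3):
    print(ds.isoformat(), '->', fires_at.isoformat())
# 2020-07-08T00:00:00 -> 2020-07-08T06:00:00
# 2020-07-08T06:00:00 -> 2020-07-08T12:00:00
# 2020-07-08T12:00:00 -> 2020-07-08T18:00:00
```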

Well, almost.

Who's going to install the dependencies?

To simplify all this, I wired the processing of requirements.txt on all nodes into docker-compose.yml.

Now off we go:

The gray squares are task instances processed by the scheduler.

We wait a bit, and the tasks are picked up by the workers:

The green ones, of course, have completed their work successfully. The red ones were not so successful.

By the way, on our prod there's no ./dags folder and no synchronization between machines: all the DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.

A little about Flower

While the workers are churning through our dummy tasks, let's recall another tool that can show us something: Flower.

The very first page, with summary information about the worker nodes:

The most intense page, with the tasks that went to work:

The most boring page, with the status of our broker:

The brightest page, with task status graphs and their execution times:

Loading what didn't load

So, all the tasks have run, and now we can carry away the wounded.

And there were many wounded, for one reason or another. When Airflow is used properly, these very squares indicate that the data definitely didn't arrive.

We need to look at the log and restart the fallen task instances.

By clicking on any square, we see the actions available to us:

We can take the fallen one and Clear it. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Obviously, doing this with the mouse for every red square is not very humane, and it's not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

Let's select everything at once and reset it to zero by clicking the right item:

After clearing, our tasks look like this (they are already waiting for the scheduler to schedule them):

Connections, hooks and other variables

Time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Dear gentlemen, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone ever built a report update? Here it is again: there's a list of sources to get the data from; there's a list of where to put it; and don't forget to honk when everything has happened or broken (well, this isn't about us, no).

Let's go through the file again and look at the new, less obvious things:

  • from commons.operators import TelegramBotSendMessage: nothing prevents us from writing our own operators, which we took advantage of by making a small wrapper for sending messages to Telegram. (We'll talk more about this operator below);
  • default_args={}: a DAG can hand out the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}': we won't hardcode the to field; instead we generate it dynamically using Jinja and a variable holding the list of emails, which I carefully put into Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the email flies to the bosses only if all dependencies have completed successfully;
  • tg_bot_conn_id='tg_main': conn_id arguments take the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED: messages in Telegram fly off only if there are fallen tasks;
  • task_concurrency=1: we forbid launching several task instances of the same task at once. Otherwise we could get several VerticaOperators running simultaneously (against the same table);
  • report_update >> [email, tg]: all the VerticaOperators converge on sending the email and the message, like this:

    But since the notifier operators have different launch conditions, only one of them will run. In the Tree View, everything looks a little less obvious:
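Why only one notifier fires can be modelled in a few lines. The sketch below is a simplified, hypothetical should_fire function over the final upstream states. The string values match Airflow's TriggerRule constants, but the real scheduler also tracks skipped states and evaluates the rules while upstream tasks are still running.

```python
from typing import Iterable

# String values match Airflow's TriggerRule constants
ALL_SUCCESS = 'all_success'
ONE_FAILED = 'one_failed'


def should_fire(rule: str, upstream_states: Iterable[str]) -> bool:
    """Simplified trigger-rule check over the final upstream states."""
    states = list(upstream_states)
    if rule == ALL_SUCCESS:
        return all(state == 'success' for state in states)
    if rule == ONE_FAILED:
        return any(state in ('failed', 'upstream_failed') for state in states)
    raise ValueError(f'unsupported trigger rule: {rule}')


all_ok = ['success'] * 5                  # all five report updates succeeded
one_broken = ['success'] * 4 + ['failed']
print(should_fire(ALL_SUCCESS, all_ok), should_fire(ONE_FAILED, all_ok))          # True False
print(should_fire(ALL_SUCCESS, one_broken), should_fire(ONE_FAILED, one_broken))  # False True
```

For any combination of final states, the email (ALL_SUCCESS) and the Telegram alert (ONE_FAILED) are mutually exclusive, which is what the Tree View shows.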

A few words about macros and their friends, variables.

Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} will expand to the value of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), so when it is restarted, the placeholders expand to the same values.
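Roughly speaking, {{ ds }} is just execution_date formatted as YYYY-MM-DD, and because execution_date belongs to the task instance rather than to the wall clock, a rerun renders the same string. The sketch below mimics that with a regex-based render function; it is a toy stand-in for Jinja, and only ds, ds_nodash and execution_date are modelled.

```python
import re
from datetime import datetime
from typing import Any, Dict


def ds_context(execution_date: datetime) -> Dict[str, Any]:
    """A few of Airflow's date macros, all derived from execution_date."""
    return {
        'ds': execution_date.strftime('%Y-%m-%d'),
        'ds_nodash': execution_date.strftime('%Y%m%d'),
        'execution_date': execution_date,
    }


def render(template: str, context: Dict[str, Any]) -> str:
    """Replace {{ name }} placeholders; a toy stand-in for Jinja."""
    return re.sub(r'\{\{\s*(\w+)\s*\}\}',
                  lambda match: str(context[match.group(1)]), template)


sql = "WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
print(render(sql, ds_context(datetime(2020, 7, 14))))
# WHERE payment_dtm::DATE = '2020-07-14'::DATE
```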

The rendered values can be viewed with the Rendered button on each task instance. This is how the email task looks:

And this is the task that sends the message:

A complete list of built-in macros for the latest available version is here: macros reference

Moreover, with the help of plugins we can declare our own macros, but that's another story.

Besides the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of them in Admin/Variables:

That's it, now you can use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
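The dotted access in {{ var.json.bot_config.bot.token }} amounts to deserializing the variable and walking its keys. A minimal sketch with a hypothetical resolve_path helper; note that the token has to be a quoted string for the variable to be valid JSON at all.

```python
import json
from typing import Any


def resolve_path(document: Any, path: str) -> Any:
    """Walk a dotted path such as 'bot.token' through nested dicts."""
    node = document
    for key in path.split('.'):
        node = node[key]  # raises KeyError on a missing key
    return node


bot_config = json.loads('''
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
''')
print(resolve_path(bot_config, 'bot.token'))  # 881hskdfASDA16641
```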

I'll say literally one word about connections and show one screenshot. Everything is elementary here: on the Admin/Connections page we create a connection and add our logins/passwords and more specific parameters there. Like this:

Passwords can be encrypted (more thoroughly than by default), and you can leave out the connection type (as I did for tg_main). The thing is, the list of types is hardwired into Airflow's models and can't be extended without getting into the source code (if I somehow failed to google something, please correct me), but nothing stops us from fetching credentials just by name.

You can also create several connections with the same name. In that case, the BaseHook.get_connection() method, which fetches connections by name, returns a random one of the namesakes (Round Robin would be more logical, but let's leave that to the conscience of the Airflow developers).
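The difference between a random pick among namesakes and the Round Robin wished for above, in a hypothetical sketch (the host names are made up):

```python
import itertools
import random
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar('T')


def pick_random(namesakes: Sequence[T], rng: random.Random) -> T:
    """Random choice among same-named connections, as described above."""
    return rng.choice(namesakes)


def round_robin(namesakes: Sequence[T]) -> Iterator[T]:
    """The alternative: hand out namesakes in a fixed rotating order."""
    return itertools.cycle(namesakes)


hosts: List[str] = ['mssql_0', 'mssql_1', 'mssql_2']
rr = round_robin(hosts)
print([next(rr) for _ in range(4)])  # ['mssql_0', 'mssql_1', 'mssql_2', 'mssql_0']
print(pick_random(hosts, random.Random(7)) in hosts)  # True
```

Keep in mind that a cycle like this lives in a single process. Airflow tasks run in separate worker processes, so a real round robin would need shared state, which is probably one reason a random pick is the simpler choice.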

Variables and Connections are certainly cool tools, but it's important to keep a balance: which parts of your flows you keep in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to change a value quickly through the UI, for example a mailing address. On the other hand, that is still a return to mouse clicking, which we (I) wanted to get rid of.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for connecting to third-party services and libraries. For example, JiraHook opens a client for interacting with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb share.

Dissecting the custom operator

And now we've come close to seeing how TelegramBotSendMessage is made.

The code of commons/operators.py with the operator itself:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything else in Airflow, it's all very simple:

  • Inherited from BaseOperator, which implements quite a few Airflow-specific things (look through them at your leisure)
  • Declared template_fields, the fields in which Jinja will look for macros to process.
  • Arranged the right arguments for __init__() and set defaults where needed.
  • Didn't forget to initialize the ancestor either.
  • Opened the corresponding hook, TelegramBotHook, and got a client object from it.
  • Overrode (redefined) the BaseOperator.execute() method, which Airflow calls when it's time to launch the operator; in it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr: Airflow will intercept everything, wrap it nicely and decompose it where needed.)
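The template_fields mechanism boils down to this: before execute() runs, each attribute named in template_fields is rendered against the task context, and everything else is left alone. The toy below shows just that idea. ToyOperator, render_fields and the regex-based render are hypothetical stand-ins, not Airflow's BaseOperator or Jinja, and the context is a plain nested dict.

```python
import re
from typing import Any, Dict, Tuple


def render(template: str, context: Dict[str, Any]) -> str:
    """Toy stand-in for Jinja: resolve {{ a.b.c }} through nested dicts."""
    def lookup(match):
        node: Any = context
        for key in match.group(1).split('.'):
            node = node[key]
        return str(node)
    return re.sub(r'\{\{\s*([\w.]+)\s*\}\}', lookup, template)


class ToyOperator:
    """Hypothetical operator illustrating the template_fields idea only."""
    template_fields: Tuple[str, ...] = ('chat_id', 'message')

    def __init__(self, chat_id: str, message: str, conn_id: str) -> None:
        self.chat_id = chat_id
        self.message = message
        self.conn_id = conn_id  # not listed in template_fields

    def render_fields(self, context: Dict[str, Any]) -> None:
        # Only attributes named in template_fields go through the renderer
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


op = ToyOperator(chat_id='{{ var.value.failures_chat }}',
                 message='{{ dag.dag_id }} failed :(',
                 conn_id='{{ not_rendered }}')
op.render_fields({'var': {'value': {'failures_chat': '-100123'}},
                  'dag': {'dag_id': 'update_reports'}})
print(op.chat_id, op.message, op.conn_id)
# -100123 update_reports failed :( {{ not_rendered }}
```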

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here, so I'll just note the important points:

  • We inherit and think about the arguments; in most cases there will be just one: conn_id;
  • We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and simply take the extra section (a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, passing it the actual token.

That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().
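The hook trusts that Extra contains bot_token; if it doesn't, extra['bot_token'] fails with a bare KeyError. A hypothetical helper with a friendlier error might look like this. It works on the raw JSON string of the Extra field, so it runs without Airflow; MissingTokenError and token_from_extra are illustrative names only.

```python
import json
from typing import Optional


class MissingTokenError(KeyError):
    """Raised when the connection's Extra has no bot_token."""


def token_from_extra(extra: Optional[str]) -> str:
    """Extract bot_token from a connection's Extra field (a JSON string)."""
    data = json.loads(extra) if extra else {}
    try:
        return data['bot_token']
    except KeyError:
        raise MissingTokenError(
            'connection Extra must contain {"bot_token": "..."}') from None


print(token_from_extra('{"bot_token": "YOuRAwEsomeBOtToKen"}'))  # YOuRAwEsomeBOtToKen
```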

And the second part of the file, where I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of a single sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
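One thing to watch with 'parse_mode': 'MarkdownV2': according to the Telegram Bot API formatting rules, characters such as _ * [ ] ( ) ~ and the backtick, as well as > # + - = | { } . !, must be backslash-escaped in that mode, so a message containing a DAG id like update_reports may be rejected and end up as a TelegramBotException. A minimal escaping helper (the name escape_markdown_v2 is hypothetical) could be:

```python
# Characters Telegram's MarkdownV2 mode treats as markup; the backslash is
# included too, since it is the escape character itself
MARKDOWN_V2_SPECIAL = set('_*[]()~`>#+-=|{}.!\\')


def escape_markdown_v2(text: str) -> str:
    """Escape text so Telegram shows it literally with parse_mode='MarkdownV2'."""
    return ''.join('\\' + ch if ch in MARKDOWN_V2_SPECIAL else ch for ch in text)


print(escape_markdown_v2('update_reports failed :('))  # update\_reports failed :\(
```

Escaping the whole message also disables any intentional formatting, so whether to apply it to message before building the payload is a design choice.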

The right thing to do is to put all of this (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, publish it in a public repository and give it to Open Source.

While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. Let me check whether something's wrong...

Something broke in our DAG! Isn't that what we were expecting? Exactly!

Are you going to pour, then?

Do you feel like I've missed something? He promised to transfer data from SQL Server to Vertica, and then went off on a tangent, the scoundrel!

This atrocity was intentional: I simply had to decipher some terminology for you. Now we can move on.

Our plan was this:

  1. Make a DAG
  2. Generate tasks
  3. See how beautiful everything is
  4. Assign session numbers to the loads
  5. Get data from SQL Server
  6. Put data into Vertica
  7. Collect statistics

So, to get all this going, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we bring up:

  • Vertica as the dwh host with the most default settings,
  • three SQL Server instances,
  • and we fill the databases on the latter with some data (under no circumstances look into mssql_init.py!)

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

To see what our miracle randomizer generated, you can use the Data Profiling/Ad Hoc Query item:

The main thing is not to show this to the analysts

I won't go into detail about ETL sessions, everything is trivial there: we create a database, there's a table in it, we wrap everything in a context manager, and now we do this:

with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15
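To try the session pattern without PostgreSQL, here is a self-contained sketch on an in-memory SQLite database. SqliteSession is a hypothetical, cut-down variant of the Session class below (fewer columns, sqlite3 instead of a Postgres connection). What carries over is the idea: open a row on enter, and on exit record success, the row count and, if an exception escaped the block, its text, without swallowing the exception.

```python
import sqlite3


class SqliteSession:
    """Record one ETL run in a sessions table (sketch of the Session idea)."""

    def __init__(self, connection: sqlite3.Connection, task_name: str) -> None:
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None
        self.comment = None
        connection.execute(
            'CREATE TABLE IF NOT EXISTS sessions ('
            ' id INTEGER PRIMARY KEY AUTOINCREMENT,'
            ' task_name TEXT NOT NULL,'
            ' successful INTEGER,'
            ' loaded_rows INTEGER,'
            ' comment TEXT)')

    def __enter__(self) -> 'SqliteSession':
        cursor = self.connection.execute(
            'INSERT INTO sessions (task_name) VALUES (?)', (self.task_name,))
        self.id = cursor.lastrowid
        self.connection.commit()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        if exc_type is not None:
            # Any exception inside the with-block marks the run as failed
            self.successful = False
            self.comment = f'{exc_type.__name__}: {exc_val}'
        self.connection.execute(
            'UPDATE sessions SET successful = ?, loaded_rows = ?, comment = ?'
            ' WHERE id = ?',
            (self.successful, self.loaded_rows, self.comment, self.id))
        self.connection.commit()
        return False  # let the exception propagate so the task still fails


conn = sqlite3.connect(':memory:')

with SqliteSession(conn, 'orders') as session:
    session.successful = True
    session.loaded_rows = 15

try:
    with SqliteSession(conn, 'orders') as session:
        raise ValueError('source is down')
except ValueError:
    pass

print(conn.execute('SELECT id, successful, loaded_rows, comment '
                   'FROM sessions ORDER BY id').fetchall())
# [(1, 1, 15, None), (2, 0, None, 'ValueError: source is down')]
```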

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # DDL such as CREATE TABLE returns no result set, so only fetch when there is one
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        # Make sure the sessions table exists before the first insert
        self._create()
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
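To see what the context manager buys us without any database, here is a minimal DB-free sketch of the same pattern. InMemorySession and its dict store are hypothetical stand-ins for Session and the sessions table: on a clean exit the final state is recorded, and if the body raises, the run is marked as failed and the exception still propagates to the caller.

```python
class InMemorySession:
    """Hypothetical stand-in for Session: writes to a dict instead of a table."""

    store = {}      # hypothetical replacement for the sessions table
    _next_id = 1    # plays the role of the SERIAL id column

    def __init__(self, task_name):
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None
        self.comment = None

    def __enter__(self):
        # Like Session.open(): allocate an id and record the start
        self.id = InMemorySession._next_id
        InMemorySession._next_id += 1
        self.store[self.id] = {'task_name': self.task_name, 'finished': False}
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # The body raised: mark the run as failed and keep the message
            self.successful = False
            self.comment = f'{exc_type.__name__}: {exc_val}'
        # Like Session.close(): persist the final state
        self.store[self.id].update(finished=True, successful=self.successful,
                                   loaded_rows=self.loaded_rows,
                                   comment=self.comment)
        return False  # do not swallow the exception


with InMemorySession('orders') as ok:
    ok.loaded_rows = 15
    ok.successful = True

try:
    with InMemorySession('broken') as failed:
        raise ValueError('no connection')
except ValueError:
    pass  # the exception still reaches the caller
```

Returning False from __exit__ is the design choice that matters here: the failure is recorded and Airflow still sees the task fail.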

The time has come to collect our data from our one and a half hundred tables. Let's do it with a few very unpretentious lines:

import pandas as pd
from airflow.hooks.mssql_hook import MsSqlHook

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. With a hook, we get a pymssql connection from Airflow.
  2. We substitute a date restriction into the query; the template engine passes the date into the function.
  3. We feed our query to pandas, which gets us a DataFrame; it will come in handy later.

I use the {dt} substitution instead of a %s query parameter not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and passes it params: List, although pymssql really wants a tuple.
Also note that the pymssql developer decided not to support it anymore, and it's time to move over to pyodbc.
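Because {dt} is interpolated straight into the SQL text, it is worth making sure that only a real date can get there. A minimal sketch, assuming the value arrives as an ISO date string such as Airflow's {{ ds }}; build_orders_query is a hypothetical helper name, not part of the article's code:

```python
from datetime import date


def build_orders_query(dt: str) -> str:
    """Build the one-day extraction query (hypothetical helper).

    The date is parsed first, so anything that is not a well-formed
    ISO date raises ValueError instead of ending up in the SQL text.
    """
    day = date.fromisoformat(dt)  # e.g. '2020-07-07'
    return f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{day.isoformat()}'
    """
```

Calling build_orders_query('2020-07-07') yields the same query as above, while an input like "2020-07-07'; DROP TABLE dbo.Orders; --" is rejected before it reaches the driver.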

Let's see what Airflow filled our functions' arguments with:


If there turns out to be no data, there is no point in continuing. But it's also strange to consider the load successful. Yet it's not an error either. Aaah, what to do?! Here's what:

from airflow.exceptions import AirflowSkipException

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there was no error, but that we are skipping the task. The interface will show neither a green nor a red square, but a pink one.

Let's add a few more columns to our data:

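from pandas.util import hash_pandas_object

df['etl_source'] = src_schema
df['etl_id'] = session.id
# index=False: hash only the source and order id, not the row position;
# the uint64 hashes are reinterpreted as int64 to fit Vertica's signed INT
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']], index=False).to_numpy().view('int64')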

Namely:

  • the database we took the orders from,
  • the ID of our load session (it will be different for each task),
  • a hash of the source and the order ID, so that in the final database (where everything is poured into one table) we have a unique order ID.
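Two details of the hash are easy to miss: pandas' hash_pandas_object also mixes the row index into the hash unless index=False is passed, and it returns unsigned 64-bit values while Vertica's INT is signed. A stdlib-only sketch of the same idea, swapping in hashlib.blake2b (our substitution, not what the article uses); stable_hash_id is a hypothetical helper:

```python
import hashlib
import struct


def stable_hash_id(etl_source: str, order_id: int) -> int:
    """Deterministic signed 64-bit id for (source database, order id).

    Only the two key columns go into the hash, so the result does not
    depend on row order or on which run loaded the row.
    """
    key = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.blake2b(key, digest_size=8).digest()
    # Read the 8 bytes as a signed big-endian integer so the value
    # always fits a signed 64-bit INT column
    return struct.unpack('>q', digest)[0]
```

The same order from two different source databases gets two different ids, while re-loading the same order always reproduces the same one.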

One last step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is via CSV!

import csv
from io import StringIO

from airflow.contrib.hooks.vertica_hook import VerticaHook

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# The column list is rendered as plain SQL: "id, start_time, ..."
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. We create a special StringIO buffer to receive the data.
  2. pandas kindly writes our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with copy(), we send our data straight into Vertica!
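The CSV-plus-COPY step can be sketched with only the standard library, which makes it easy to inspect exactly what Vertica receives. This mirrors the options used above ('|' delimiter, 'NUL' as the NULL marker, backslash escaping) with the csv module instead of pandas; rows_to_copy_payload is a hypothetical helper:

```python
import csv
from io import StringIO


def rows_to_copy_payload(table, columns, rows):
    """Return (COPY statement, CSV buffer) for COPY ... FROM STDIN."""
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                        doublequote=False, escapechar='\\',
                        lineterminator='\n')
    for row in rows:
        # None becomes the NULL marker that COPY is told about below
        writer.writerow(['NUL' if value is None else value for value in row])
    buffer.seek(0)

    # The column list must be plain SQL, not the repr of a Python list
    copy_stmt = (f"COPY {table}({', '.join(columns)}) FROM STDIN "
                 "DELIMITER '|' ENCLOSED BY '\"' ABORT ON ERROR NULL 'NUL'")
    return copy_stmt, buffer


stmt, buf = rows_to_copy_payload('orders.orders', ['id', 'data', 'type'],
                                 [(1, 'x', None), (2, 'a|b', 3)])
```

Here buf holds two lines, 1|x|NUL and 2|"a|b"|3: a value containing the delimiter is enclosed in double quotes, which is what the ENCLOSED option tells COPY to expect.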

We get from the driver how many rows were loaded and tell the session manager that everything is OK:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production we create the target table by hand. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

from airflow.contrib.operators.vertica_operator import VerticaOperator

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

With VerticaOperator() I create the database schema and the table (if they don't exist yet, of course). The main thing is to set up the dependencies correctly:

from airflow.operators.python_operator import PythonOperator

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load
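The dependency wiring in the loop above is a fan-out: one create_target task first, then one load task per source, which are free to run in parallel. A toy model of that shape; Task and run_order are hypothetical and not Airflow classes, although in Airflow a >> b likewise makes b downstream of a and returns b:

```python
class Task:
    """Toy stand-in for an Airflow operator (hypothetical, not Airflow's class)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b: b may only start after a has finished
        self.downstream.append(other)
        return other


def run_order(root):
    """Breadth-first order of task ids, starting from root."""
    order, queue = [], [root]
    while queue:
        task = queue.pop(0)
        if task.task_id not in order:
            order.append(task.task_id)
            queue.extend(task.downstream)
    return order


create_table = Task('create_target')
# Hypothetical (conn_id, schema) pairs in place of the real sql_server_ds
sql_server_ds = [('mssql_0', 'db_0'), ('mssql_1', 'db_1'), ('mssql_2', 'db_2')]
loads = []
for conn_id, schema in sql_server_ds:
    load = Task(schema)
    create_table >> load  # every load depends only on the table creation
    loads.append(load)
```

Since the loads do not depend on each other, one slow or failing source does not hold up the rest.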

Summing up

- Well, - said the little mouse, - aren't you convinced now
that I am the most terrifying beast in the forest?

Julia Donaldson, The Gruffalo

I think that if my colleagues and I held a competition over who could put together and launch an ETL process from scratch faster, them with their SSIS and a mouse, and me with Airflow... and then we also compared ease of maintenance... Wow, I think you'll agree that I would beat them on every front!

On a slightly more serious note, Apache Airflow, by describing processes as program code, has made my work much more convenient and enjoyable.

Its unlimited extensibility, both in terms of plugins and in its readiness to scale, lets you use Airflow in almost any field: in the full cycle of collecting, preparing and processing data, or even in launching rockets (to Mars, of course).

The final part: reference and information

Rakes we've stepped on for you

  • start_date. Yes, this is already a local meme. Everyone trips over a DAG's main argument, start_date. In short: if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow at the earliest.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is another runtime error related to it: Task is missing the start_date parameter, which most often means you forgot to bind the operator to a DAG.

  • Everything on one machine. Yes: the databases (Airflow's own and our wrapper's), the web server, the scheduler and the workers. And it even worked. But over time the number of service tasks grew, and when PostgreSQL started answering an index lookup in 20 s instead of 5 ms, we picked it up and moved it to a separate machine.
  • LocalExecutor. Yes, we are still sitting on it, and we have already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to expand with at least one more worker, and we will have to work hard to move to CeleryExecutor. And since you can run it on a single machine too, nothing stops you from using Celery even on a server that "of course, will never go into production, honestly!"
  • Not using the built-in tools:
    • Connections to store service credentials,
    • SLA Misses to react to tasks that didn't finish on time,
    • XCom to exchange metadata (I said metadata!) between DAG tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of every failed task. Now my work Gmail holds >90k emails from Airflow, and the web mail interface refuses to select and delete more than 100 at a time.
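The start_date rake is easier to see with the arithmetic written out. A simplified model, assuming start_date is aligned with the schedule: a run covering the interval [execution_date, execution_date + schedule_interval) is triggered only after that interval has ended, so a start_date of today with a daily schedule produces its first run tomorrow. first_run is a hypothetical helper, not an Airflow function:

```python
from datetime import datetime, timedelta


def first_run(start_date, schedule_interval):
    """Return (execution_date, trigger moment) of the first scheduled run.

    Simplified model of the scheduler: the run for an interval fires
    once that interval is over, not when it begins.
    """
    execution_date = start_date
    return execution_date, execution_date + schedule_interval


# start_date set to "today" with a daily schedule
today = datetime(2020, 7, 7)
ex, fires_at = first_run(today, timedelta(days=1))
```

The first run is labelled 2020-07-07 but only fires on 2020-07-08, which is why a start_date fixed in the past avoids the surprise.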

More pitfalls: Apache Airflow Pitfails

More automation tools

So that we work even more with our heads and not with our hands, Airflow has prepared the following for us:

  • REST API - it still has Experimental status, which doesn't stop it from working. With it you can not only get information about DAGs and tasks, but also stop/start a DAG, create a DAG Run or a pool.
  • CLI - many tools are available from the command line that are not merely inconvenient to use through the WebUI but are mostly missing from it altogether. For example:
    • backfill is needed to re-run task instances.
      For example, the analysts come and say: "And you, comrade, have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you just go:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Basic maintenance: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance and even ignore all its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does much the same thing, only it writes nothing to the databases.
    • connections allows bulk creation of connections from the shell.
  • python api - a rather hardcore way of interacting, intended for plugins rather than for poking at with your bare hands. But who will stop us from going to /home/airflow/dags, running ipython and starting to mess around? You can, for example, export all connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadata database. I don't recommend writing to it, but fetching task states for various specific metrics can be much faster and easier than through any of the APIs.

    Let's say not all of our tasks are idempotent, but they can occasionally fail, and that's normal. But several failures in a row are already suspicious, and it would be worth checking.

    Beware, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
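The same "consecutive failures at the end of the history" logic as a small Python sketch, for anyone who prefers to post-process rows fetched from task_instance. The input shape (dag_id, task_id, execution_date, state) is an assumption, and only the unsuccessful count is reproduced; trailing_failures is a hypothetical helper:

```python
from collections import defaultdict


def trailing_failures(runs):
    """Count failures at the tail of each task's run history.

    runs: iterable of (dag_id, task_id, execution_date, state) tuples.
    Walks back from the newest run and stops at the first run that is
    neither 'failed' nor 'up_for_retry', like the rn comparison in SQL.
    """
    history = defaultdict(list)
    for dag_id, task_id, execution_date, state in runs:
        history[(dag_id, task_id)].append((execution_date, state))

    result = {}
    for key, items in history.items():
        count = 0
        for _, state in sorted(items, reverse=True):  # newest first
            if state not in ('failed', 'up_for_retry'):
                break
            count += 1
        if count:
            result[key] = count
    return result


runs = [
    ('orders', 'db_0', 1, 'success'),
    ('orders', 'db_0', 2, 'failed'),
    ('orders', 'db_0', 3, 'up_for_retry'),
    ('orders', 'db_1', 1, 'failed'),
    ('orders', 'db_1', 2, 'success'),
]
```

Here db_0 has two failures in a row after its last success, while db_1 recovered, so only db_0 is reported.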

Links

And of course, the first ten links from Google's search results make up the contents of the Airflow folder in my bookmarks.

And the links used in the article:

Source: www.habr.com