Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.

I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. But Airflow is so versatile and multifaceted that it deserves a closer look even if you don't deal with data flows at all and simply need to launch some processes periodically and monitor their execution.

And yes, I won't just tell, I'll also show: the program includes plenty of code, screenshots and recommendations.

[Image: what you usually see when you google the word Airflow / Wikimedia Commons]

Introduction

Apache Airflow is just like Django:

  • written in Python,
  • has a great admin panel,
  • is endlessly extensible,

only better, and built for entirely different purposes, namely (as stated before the cut):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience will allow),
  • with dynamic workflow generation from Python code that is very easy to write and understand,
  • and the ability to connect any databases and APIs with each other using both ready-made components and home-made plugins (which is extremely simple to do).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (for us these are Vertica and Clickhouse);
  • as an advanced cron, which starts the data consolidation processes on the ODS and also monitors their maintenance.

Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, that runs:

  • more than 200 DAGs (the workflows themselves, into which we stuffed tasks),
  • about 70 tasks in each on average,
  • and all this goodness starts (also on average) once an hour.

I'll write about how we scaled below; for now, let's define the über-task we are going to solve:

There are three source SQL Servers, each with 50 databases, which are instances of one project. Accordingly, they share the same structure (almost everywhere, mua-ha-ha), which means each one has an Orders table (fortunately, a table with that name can be shoved into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively throw it into, say, Vertica.
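To make the service fields concrete, here is a minimal sketch in plain Python. It is not the author's actual loader: the helper name add_service_fields, the column names source_server, source_database and etl_task_id, and the sample rows are all assumptions chosen for illustration.

```python
from typing import Any, Dict, Iterable, Iterator


def add_service_fields(rows: Iterable[Dict[str, Any]],
                       source_server: str,
                       source_database: str,
                       etl_task_id: int) -> Iterator[Dict[str, Any]]:
    """Yield a copy of each source row extended with the service fields."""
    for row in rows:
        yield {**row,
               'source_server': source_server,      # hypothetical column name
               'source_database': source_database,  # hypothetical column name
               'etl_task_id': etl_task_id}          # hypothetical column name


# Two made-up rows standing in for the Orders table of one database.
orders = [{'id': 1, 'total': 100.0}, {'id': 2, 'total': 250.5}]
tagged = list(add_service_fields(orders, 'mssql_0', 'project_07', 42))
print(tagged[0])
```

Copying each row instead of mutating it keeps the source data untouched, which helps when the same load has to be re-run.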

Let's go!

Main part: practical (and a little theoretical)

Why we (and you) need it

Back when the trees were tall and I was a simple SQL guy at a Russian retailer, we cranked out ETL processes, aka data flows, using the two tools available to us:

  • Informatica Power Center - an extremely sprawling system, extremely productive, with its own hardware and its own versioning. I used, God willing, 1% of its capabilities. Why? Well, first of all, that interface from somewhere in the 2000s put psychological pressure on us. Secondly, this contraption is designed for extremely fancy processes, furious component reuse and other very-important-enterprise-tricks. As for the fact that it costs about as much as the wing of an Airbus A380 per year, we will say nothing.

    [Screenshot. Careful, it may hurt people under 30 a little.]

  • SQL Server Integration Server - we used this comrade in our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything in it is good: the interface is beautiful, and so are the progress reports... But that is not why we love software products, oh, not for that. We can version its dtsx (which is XML with nodes that get shuffled on save), but what's the point? How about making a package of tasks that drags hundreds of tables from one server to another? Never mind a hundred, your index finger will fall off after twenty of them from clicking the mouse button. But it definitely looks more fashionable.

Of course, we looked for ways out. It even almost came to a home-grown SSIS package generator...

...and then a new job found me. And at it, Apache Airflow caught up with me.

When I found out that ETL process descriptions are simple Python code, I all but danced for joy. This is how data flows became versioned and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code on one and a half or two 13" screens.

Assembling the cluster

Let's not set up a complete kindergarten here and talk about completely obvious things, such as installing Airflow, your chosen database, Celery and the other matters described in the docs.

So that we can start experimenting right away, I sketched a docker-compose.yml in which:

  • We actually bring up Airflow: Scheduler, Webserver. Flower will also be spinning there to monitor Celery tasks (because it has already been pushed into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, to which Airflow will write its service information (scheduler data, execution statistics, etc.), and where Celery will mark completed tasks;
  • Redis, which will act as the task broker for Celery;
  • Celery worker, which will do the actual execution of tasks.
  • Into the ./dags folder we will put our files with DAG descriptions. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.

In some places the code in the examples is not shown in full (so as not to clutter the text), and in others it is modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In assembling the compose file, I relied largely on the well-known image puckel/docker-airflow - be sure to check it out. Maybe you won't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I shamelessly took advantage of.
  • Naturally, it is not production-ready: I deliberately did not put heartbeats on the containers and did not bother with security. But I did the minimum suitable for our experiments.
  • Note that:
    • The DAG folder must be accessible to both the scheduler and the workers.
    • The same applies to all third-party libraries - they must all be installed on the machines with the scheduler and the workers.
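The environment variables in the compose file follow Airflow's naming convention for overriding airflow.cfg: AIRFLOW__{SECTION}__{KEY}, with double underscores as separators. A small sketch of that mapping; the helper airflow_env_name is a hypothetical name used only for illustration:

```python
def airflow_env_name(section: str, key: str) -> str:
    """Build the environment variable name that overrides [section] key."""
    return f'AIRFLOW__{section.upper()}__{key.upper()}'


# Both names below appear in the compose file above.
print(airflow_env_name('core', 'executor'))
print(airflow_env_name('celery', 'broker_url'))
```

Reading the compose file with this rule in mind, AIRFLOW__CORE__PARALLELISM is simply the parallelism option of the [core] section.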

Well, now it's simple:

$ docker-compose up --scale worker=3

Once everything is up, you can look at the web interfaces: the Airflow webserver on port 8080 and Flower on port 5555 (the ports published in the compose file above).

Basic concepts

If you didn't understand anything in all these "DAGs", here is a short glossary:

  • Scheduler - the most important guy in Airflow, who makes sure that robots do the hard work, not people: it monitors the schedule, updates DAGs, launches tasks.

    In general, in older versions it had problems with memory (no, not amnesia, but leaks), and a legacy parameter even remained in the configs: run_duration, its restart interval. But now everything is fine.

  • DAG (aka "dag") - a "directed acyclic graph", but such a definition will tell few people anything; in essence it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.

    Besides DAGs there may also be subDAGs, but we most likely won't get to them.

  • DAG Run - an initialized DAG that has been assigned its own execution_date. DAG runs of the same DAG can work in parallel (if you made your tasks idempotent, of course).
  • Operator - pieces of code responsible for performing a specific action. There are three types of operators:
    • action, like our favorite PythonOperator, which can execute any (valid) Python code;
    • transfer, which move data from place to place, say, MsSqlToHiveTransfer;
    • sensor, in turn, lets you react to some event, or hold back further execution of the DAG until it occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, start the GoogleCloudStorageToS3Operator transfer. An inquisitive mind will ask: "Why? After all, you can do the retries right in the operator!" The reason is to avoid clogging the task pool with hung operators. The sensor starts, checks, and dies until the next attempt.
  • Task - declared operators, regardless of type, are promoted to the rank of task once attached to a DAG.
  • Task instance - when the general-scheduler decides that it is time to send tasks into battle on the executor-workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e. a set of variables, the execution parameters), expands command or query templates, and puts them into the pool.
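The sensor idea from the glossary (check, step aside, check again later) can be sketched in plain Python. This is an illustration of the pattern, not Airflow's implementation: the function wait_for is hypothetical, and the parameter names poke_interval and timeout merely echo the terms Airflow uses for its sensors.

```python
import time
from typing import Callable


def wait_for(poke: Callable[[], bool],
             poke_interval: float,
             timeout: float,
             sleep: Callable[[float], None] = time.sleep) -> int:
    """Call poke() until it returns True; return the number of attempts.

    Raises TimeoutError when the next wait would exceed the timeout.
    """
    attempts, waited = 0, 0.0
    while True:
        attempts += 1
        if poke():
            return attempts
        if waited + poke_interval > timeout:
            raise TimeoutError(f'condition not met after {attempts} attempts')
        sleep(poke_interval)  # nothing else is held between two checks
        waited += poke_interval


# A made-up condition that becomes true on the third check.
answers = iter([False, False, True])
attempts = wait_for(lambda: next(answers), poke_interval=60, timeout=600,
                    sleep=lambda seconds: None)  # no real waiting in the demo
print(attempts)
```

Passing the sleep function in as an argument keeps the sketch testable without actually waiting.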

Generating tasks

First, let's outline the general scheme of our DAG, and then we'll dive deeper and deeper into the details, because we apply some non-trivial solutions.

So, in its simplest form, such a DAG will look like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's figure it out:

  • First, we import the necessary libs and something else;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table;
  • dag - the declaration of our DAG, which must be in globals(), otherwise Airflow won't find it. The DAG also needs to be told:
    • that its name is orders - this name will then appear in the web interface,
    • that it will work starting from midnight on the eighth of July,
    • and that it should run approximately every 6 hours (for the tough guys, instead of timedelta() a cron string 0 0 0/6 ? * * * is acceptable here; for the less cool, an expression like @daily);
  • workflow() will do the main job, but not now. For now, we'll just dump our context into the log.
  • And now the simple magic of creating tasks:
    • we run through our sources;
    • we initialize PythonOperator, which will execute our dummy workflow(). Don't forget to specify a unique (within the DAG) task name and to attach the DAG itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect using **context.
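The contents of commons.datasources are not shown in the article, so here is a minimal sketch of what such a list could look like, assuming three servers with fifty databases each as in the task statement. The DataSource name, the mssql_N connection ids and the project_N_NN database names are invented for illustration; the real module may be built quite differently.

```python
from collections import namedtuple

# Hypothetical record type: an Airflow connection id plus a database name.
DataSource = namedtuple('DataSource', 'conn_id schema')

SERVERS = 3                # three source SQL Servers
DATABASES_PER_SERVER = 50  # fifty project databases on each

sql_server_ds = [
    DataSource(conn_id=f'mssql_{server}',                 # assumed naming
               schema=f'project_{server}_{database:02}')  # assumed naming
    for server in range(SERVERS)
    for database in range(DATABASES_PER_SERVER)
]

print(len(sql_server_ds))
print(sql_server_ds[0])
```

Because schema is used as the task_id in the loop above, the database names have to be unique across all servers, which is why the server number is part of the name in this sketch.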

That's all for now. What we got:

  • a new DAG in the web interface,
  • one and a half hundred tasks that will run in parallel (if the Airflow and Celery settings and the server capacity allow it).

Well, almost got.

[Image caption: Who's going to install the dependencies?]

To simplify this whole thing, I wired processing of requirements.txt on all nodes into docker-compose.yml.

Now off it goes. The grey squares are task instances processed by the scheduler.

We wait a bit, and the workers snap up the tasks. The green ones, of course, have completed their work successfully. The red ones, not so successfully.

By the way, there is no ./dags folder synchronized between machines on our prod: all DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines when merging into master.

A little about Flower

While the workers are threshing our dummy tasks, let's remember another tool that can show us something: Flower.

The very first page has summary information on the worker nodes.

The most intense page has the tasks that went to work.

The most boring page has the status of our broker.

The brightest page has graphs of task statuses and their execution time.

Loading what was left unloaded

So, all tasks have finished, and you can carry away the wounded.

And there were plenty of wounded, for one reason or another. When Airflow is used correctly, these very squares indicate that the data definitely did not arrive.

You need to look at the log and restart the fallen task instances.

Clicking on any square shows the actions available to us.

You can take the fallen one and Clear it. That is, we forget that something failed there, and the same task instance will go to the scheduler.

Clearly, doing this with the mouse for all the red squares is not very humane, and that is not what we expect from Airflow. Naturally, we have a weapon of mass destruction: Browse/Task Instances.

Let's select everything at once and reset it to zero by clicking the right item.

After clearing, our tasks are already waiting impatiently for the scheduler to schedule them.

Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone done a report updater at some point? This is it again: there is a list of sources to get the data from; there is a list of where to put it; and don't forget to honk when everything has happened or broken (well, that's not about us, no).

Let's go through the file again and look at the new obscure stuff:

  • from commons.operators import TelegramBotSendMessage - nothing prevents us from making our own operators, which we took advantage of by making a small wrapper for sending messages to the Unblocked One, i.e. Telegram. (We will talk more about this operator below);
  • default_args={} - a DAG can hand out the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}' - the to field is not hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put in Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - the condition for starting the operator. In our case, the letter will fly to the bosses only if all dependencies have completed successfully;
  • tg_bot_conn_id='tg_main' - conn_id arguments accept the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - messages in Telegram will fly out only if there are fallen tasks;
  • task_concurrency=1 - we prohibit the simultaneous launch of several task instances of one task. Otherwise, we would get the simultaneous launch of several VerticaOperator (looking at the same table);
  • report_update >> [email, tg] - all VerticaOperator converge on sending a letter and a message.

    But since the notifier operators have different launch conditions, only one will work. In the Tree View, everything looks somewhat less clear.
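Why only one of the two notifiers fires can be shown with a simplified model of the two trigger rules. This is a plain-Python sketch, not Airflow's scheduler code: the state names and the functions all_success and one_failed are assumptions for illustration, and real Airflow tracks more states than the three used here.

```python
from typing import Iterable

SUCCESS, FAILED, RUNNING = 'success', 'failed', 'running'


def all_success(upstream_states: Iterable[str]) -> bool:
    """True when every upstream task has finished successfully."""
    return all(state == SUCCESS for state in upstream_states)


def one_failed(upstream_states: Iterable[str]) -> bool:
    """True as soon as at least one upstream task has failed."""
    return any(state == FAILED for state in upstream_states)


good_run = [SUCCESS] * 5
bad_run = [SUCCESS, SUCCESS, FAILED, SUCCESS, RUNNING]

for name, states in (('good', good_run), ('bad', bad_run)):
    print(name, 'email:', all_success(states), 'telegram:', one_failed(states))
```

In this model the two conditions can never both hold for the same run, which matches the behaviour described above: either the letter goes out or the Telegram message does.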

I'll say a few words about macros and their friends, variables.

Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} will expand into the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), and on restart the placeholders will expand to the same values.
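A rough sketch of that substitution, assuming nothing beyond the standard library: plain string replacement stands in for Jinja here, and the function render_ds and the shortened query are made up for the example.

```python
from datetime import datetime

QUERY = "SELECT id FROM orders.payments WHERE payment_dtm::DATE = '{{ ds }}'::DATE"


def render_ds(template: str, execution_date: datetime) -> str:
    """Replace the {{ ds }} placeholder with execution_date as YYYY-MM-DD."""
    ds = execution_date.strftime('%Y-%m-%d')
    return template.replace('{{ ds }}', ds)


# The execution_date belongs to the task instance, not to the wall clock,
# so re-rendering for the same instance always gives the same text.
first = render_ds(QUERY, datetime(2020, 7, 14, 6, 0))
again = render_ds(QUERY, datetime(2020, 7, 14, 6, 0))
print(first)
```

This is what makes restarts safe: a task instance cleared a week later still queries the day it was originally scheduled for.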

The assigned values can be viewed using the Rendered button on each task instance, both for the task that sends a letter and for the task that sends a message.

A complete list of built-in macros for the latest available version can be found in the Macros Reference.

Moreover, with the help of plugins we can declare our own macros, but that's another story.

In addition to the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of them in Admin/Variables.

That's it, you can use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
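What such a dotted path does to a JSON value can be sketched with the standard library alone. The function resolve is a hypothetical stand-in for the template lookup, not Airflow code:

```python
import json
from functools import reduce

# The stored value of the variable is a JSON string.
BOT_CONFIG = '''
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
'''


def resolve(raw_json: str, path: str):
    """Parse raw_json and walk it along a dotted path such as 'bot.token'."""
    return reduce(lambda node, key: node[key], path.split('.'),
                  json.loads(raw_json))


print(resolve(BOT_CONFIG, 'bot.token'))
print(resolve(BOT_CONFIG, 'service'))
```

Note that the value only parses if it is valid JSON, so string values such as the token need their quotes.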

I'll say literally one word about connections. Everything is elementary here: on the Admin/Connections page we create a connection and add our logins / passwords and more specific parameters there.

Passwords can be encrypted (more thoroughly than by default), or you can leave out the connection type (as I did for tg_main): the thing is that the list of types is hardwired into the Airflow models and cannot be extended without getting into the source code (if I suddenly failed to google something, please correct me), but nothing stops us from getting the credentials just by name.

You can also make several connections with the same name: in this case, the method BaseHook.get_connection(), which gets us connections by name, will return a random one of the several namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
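The difference between a random pick and Round Robin is easy to see in a few lines of plain Python. Everything here is illustrative: the host names are made up, and neither function is part of Airflow.

```python
import random
from itertools import cycle

# Three made-up connections registered under one name.
namesakes = ['mssql-a:1433', 'mssql-b:1433', 'mssql-c:1433']


def pick_random(candidates):
    """Pick any of the candidates; the same one may come up repeatedly."""
    return random.choice(candidates)


def round_robin(candidates):
    """Return a function that hands out the candidates strictly in turn."""
    rotation = cycle(candidates)
    return lambda: next(rotation)


pick_next = round_robin(namesakes)
in_turn = [pick_next() for _ in range(6)]
print(in_turn)                 # every candidate is used exactly twice
print(pick_random(namesakes))  # any one of the three
```

With a random pick the load evens out only on average, while the rotation guarantees it on every full cycle.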

Variables and Connections are certainly cool tools, but it is important not to lose the balance: which parts of your flows you store in the code itself, and which parts you hand over to Airflow for storage. On the one hand, quickly changing a value, for example a mailing address, can be convenient through the UI. On the other hand, this is still a return to the mouse click, which we (I) wanted to get rid of.

Working with connections is one of the tasks of hooks. In general, Airflow hooks are points for connecting it to third-party services and libraries. For example, JiraHook will open a client for us to interact with Jira (you can move tasks back and forth), and with the help of SambaHook you can push a local file to an smb share.

Dissecting the custom operator

And we have come close to looking at how TelegramBotSendMessage is made.

The code of commons/operators.py with the operator itself:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, like everything else in Airflow, everything is very simple:

  • Inherited from BaseOperator, which implements quite a few Airflow-specific things (look at it at your leisure).
  • Declared the template_fields field, in which Jinja will look for macros to process.
  • Arranged the right arguments for __init__(), setting defaults where necessary.
  • We didn't forget about initializing the ancestor either.
  • Opened the corresponding hook, TelegramBotHook, and received a client object from it.
  • Overrode (redefined) the method BaseOperator.execute(), which Airflow will call when the time comes to launch the operator - in it we implement the main action, not forgetting to log. (We log, by the way, right to stdout and stderr - Airflow will intercept everything, wrap it beautifully, and lay it out where needed.)

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here, I'll just note the important points:

  • We inherit and think about the arguments - in most cases there will be one: conn_id;
  • We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and just take the extra section (this is a JSON field), into which I (according to my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, giving it a specific token.

That's all. You can get a client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().

And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for one sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
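One thing to keep in mind with parse_mode set to MarkdownV2: the Bot API documentation requires a set of special characters to be escaped with a backslash in ordinary text, and a DAG id such as update_reports contains one of them (the underscore). The wrapper above sends the message as-is, so the helper below is an addition of mine rather than part of the author's code; the name escape_markdown_v2 is hypothetical.

```python
import re

# Characters that the Bot API documentation lists as needing a backslash
# in MarkdownV2 text outside of formatting entities.
_SPECIAL = r'_*[]()~`>#+-=|{}.!'
_PATTERN = re.compile('([' + re.escape(_SPECIAL) + '])')


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 special character with a backslash."""
    return _PATTERN.sub(r'\\\1', text)


print(escape_markdown_v2('update_reports failed :('))
```

Escaping everything also switches off any intended formatting, so this fits plain notifications best; a message that really uses bold or links would need the escaping applied only to its variable parts.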

The right way is to put all of this: TelegramBotSendMessage, TelegramBotHook, TelegramBot - into a plugin, put it in a public repository, and give it to Open Source.

While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'll go check what's wrong again...

[Image caption: Something broke in our doge! Isn't that what we were expecting? Exactly!]

So are you going to pour or not?

Do you feel I missed something? It seems he promised to transfer data from SQL Server to Vertica, and then he up and drifted off topic, the scoundrel!

This atrocity was intentional; I simply had to decipher some terminology for you. Now we can go further.

Our plan was this:

  1. Make a DAG
  2. Generate tasks
  3. See how beautiful everything is
  4. Assign session numbers to the loads
  5. Get the data from SQL Server
  6. Put the data into Vertica
  7. Collect statistics
So, to get all this up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we bring up:

  • Vertica as the host dwh with the most default settings,
  • three instances of SQL Server,
  • we fill the databases in the latter with some data (under no circumstances look into mssql_init.py!)

We launch all this goodness with a command slightly more complicated than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

To see what our miracle randomizer generated, you can use the Data Profiling/Ad Hoc Query menu item:

The main thing is not to show it to the analysts

I won't go into detail on ETL sessions, everything is trivial there: we make a database, it has a table in it, we wrap everything in a context manager, and now we do this:

with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # any() takes a single iterable, so the three values go into a tuple
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

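    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # Statements such as CREATE TABLE return no rows to fetch
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]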

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
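
To see the bookkeeping without a database, here is a minimal sketch of the same context-manager pattern. InMemorySession and its log list are hypothetical stand-ins for Session and the sessions table; they are not part of the article's code.

```python
class InMemorySession:
    """Hypothetical stand-in for Session: records rows in a list, not a table."""

    log = []  # plays the role of the `sessions` table

    def __init__(self, task_name):
        self.task_name = task_name
        self.successful = None
        self.loaded_rows = None
        self.comment = None

    def __enter__(self):
        # Analogue of INSERT ... RETURNING id
        self.id = len(self.log) + 1
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # The body raised: mark the session as failed and keep the reason
            self.successful = False
            self.comment = f'{exc_type.__name__}: {exc_val}'
        # Analogue of the final UPDATE
        self.log.append((self.id, self.task_name, self.successful,
                         self.loaded_rows, self.comment))
        return False  # do not swallow the exception


with InMemorySession('orders') as session:
    session.loaded_rows = 15
    session.successful = True

try:
    with InMemorySession('orders') as session:
        raise ValueError('source is down')
except ValueError:
    pass  # the failure is already recorded in the log

print(InMemorySession.log)
```

The second block never sets successful itself, yet the log still gets a failed row: that is the point of putting the bookkeeping into __exit__.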

The time has come to collect our data from our one and a half hundred tables. Let's do this with the help of some very unpretentious lines:

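import pandas as pd
from airflow.hooks.mssql_hook import MsSqlHook  # import path as of Airflow 1.10

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()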

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

  1. Using a hook, we get a pymssql connection from Airflow.
  2. We substitute a restriction in the form of a date into the query - the template engine will pass it into the function.
  3. We feed our query to pandas, which gets us a DataFrame - it will be useful to us later.

I use the {dt} substitution instead of a query parameter %s not because I am an evil Pinocchio, but because pandas cannot cope with pymssql and hands it params: List, although it really wants a tuple.
Also note that the pymssql developer has decided to stop supporting it, and it is time to move to pyodbc.
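
Because {dt} is pasted straight into the SQL text, it may be worth checking that the value really is a date before building the query. The sketch below is one possible guard, not the article's code; build_orders_query is a hypothetical helper name. In the DAG itself dt comes from the {{ ds }} template, which Airflow renders as YYYY-MM-DD.

```python
from datetime import date


def build_orders_query(dt: str) -> str:
    """Return the SELECT for one day; raise ValueError if dt is not an ISO date."""
    day = date.fromisoformat(dt)  # rejects anything that is not a date
    return (
        "SELECT id, start_time, end_time, type, data "
        "FROM dbo.Orders "
        f"WHERE CONVERT(DATE, start_time) = '{day.isoformat()}'"
    )


print(build_orders_query('2020-07-07'))

try:
    build_orders_query("2020-07-07'; DROP TABLE dbo.Orders; --")
except ValueError as exc:
    print('rejected:', exc)
```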

Let's see what Airflow stuffed into the arguments of our function:


If there is no data, there is no point in continuing. But it is also strange to consider the load successful. And yet it is not an error. A-ah-ah, what to do?! Here is what:

from airflow.exceptions import AirflowSkipException

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there is no error, but we are skipping the task. The interface will show neither a green nor a red square, but a pink one.

Let's add several columns to our data:

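from pandas.util import hash_pandas_object

df['etl_source'] = src_schema
df['etl_id'] = session.id
# index=False keeps the row position out of the hash, so it depends only on source and id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']], index=False)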

Namely:

  • The database from which we took the orders,
  • The ID of our load session (it will be different for every task),
  • A hash of the source and the order ID - so that in the final database (where everything is poured into one table) we have a unique order ID.
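
To illustrate the idea behind the third column, here is a small standard-library sketch. It is not the algorithm that hash_pandas_object uses; order_hash is a hypothetical helper, and the signed 64-bit range is an assumption chosen so the value fits an integer column.

```python
import hashlib


def order_hash(etl_source: str, order_id: int) -> int:
    """Map (source, order id) to a deterministic signed 64-bit integer."""
    key = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.blake2b(key, digest_size=8).digest()
    return int.from_bytes(digest, byteorder='big', signed=True)


# The same order id coming from two different source databases
a = order_hash('mssql_0', 42)
b = order_hash('mssql_1', 42)
print(a, b)
```

The same order ID from two sources yields two different keys, while rerunning the load for the same source and ID always reproduces the same key.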

One last step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is through CSV!

import csv
from io import StringIO

from airflow.contrib.hooks.vertica_hook import VerticaHook  # path as of Airflow 1.10

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# Join the column names into "a, b, c"; a bare list would render with brackets and quotes
columns = ', '.join(df.columns)

copy_stmt = f"""
    COPY {target_table}({columns})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

  1. We create a special receiver, StringIO.
  2. pandas kindly writes our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with the help of copy(), we send our data straight to Vertica!
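
To make the format that ends up in the buffer visible without pandas or Vertica, here is a small sketch using only the standard csv module with the same delimiter, quoting and NULL marker. The rows and the table name stage.orders are hypothetical, invented for this illustration.

```python
import csv
from io import StringIO

columns = ['id', 'type', 'data']
rows = [
    (1, 10, 'plain'),
    (2, 20, 'with|pipe'),   # contains the delimiter, so it gets quoted
    (3, 30, None),          # missing value, written as the NUL marker
]

buffer = StringIO()
writer = csv.writer(buffer, delimiter='|', quotechar='"',
                    quoting=csv.QUOTE_MINIMAL, doublequote=False,
                    escapechar='\\', lineterminator='\n')
for row in rows:
    writer.writerow(['NUL' if value is None else value for value in row])
buffer.seek(0)

# Hypothetical target table name, used only for this illustration
copy_stmt = (f"COPY stage.orders({', '.join(columns)}) "
             "FROM STDIN DELIMITER '|' ENCLOSED '\"' NULL 'NUL'")

csv_text = buffer.getvalue()
print(csv_text)
print(copy_stmt)
```

Only the field that contains the delimiter is wrapped in quotes, which is what the ENCLOSED '"' clause of the COPY statement is there to undo.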

We take from the driver how many rows were loaded, and tell the session manager that everything is OK:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production we create the target table by hand. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator(), I create the database schema and the table (if they don't already exist, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up

- Well, - said the little mouse, - isn't it true that now
you are convinced that I am the most terrible animal in the forest?

Julia Donaldson, The Gruffalo

I think that if my colleagues and I had a competition to see who could build and launch an ETL process from scratch faster, they with their SSIS and a mouse and I with Airflow... And then we also compared the ease of maintenance... Wow, I think you will agree that I would beat them on all fronts!

On a slightly more serious note, Apache Airflow, by describing processes in the form of program code, made my job much more comfortable and enjoyable.

Its unlimited extensibility, both in terms of plug-ins and its predisposition to scalability, gives you the opportunity to use Airflow in almost any area: from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).

Final part, reference and information

The rakes we have collected for you

  • start_date. Yes, this is already a local meme. Everyone runs into the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow at the earliest.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is another runtime error associated with it: Task is missing the start_date parameter, which most often indicates that you forgot to bind the operator to the DAG.

  • Everything on one machine. Yes, the databases (Airflow's own and our wrapper's), the web server, the scheduler, and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL began to respond to an index lookup in 20 s instead of 5 ms, we picked it up and moved it away.
  • LocalExecutor. Yes, we are still sitting on it, and we have already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand with at least one worker, and we will have to work hard to move to CeleryExecutor. And since you can work with it on a single machine, nothing stops you from using Celery even on a server that "of course will never go to production, honest!"
  • Not using the built-in tools:
    • Connections, to store service credentials,
    • SLA Misses, to react to tasks that did not finish on time,
    • XCom, for exchanging metadata (I said metadata!) between DAG tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of failed tasks. Now my work Gmail has more than 90k emails from Airflow, and the webmail interface refuses to select and delete more than 100 at a time.
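
Going back to the start_date item above, the rule can be sketched in a few lines. This is a simplified model under the assumption that schedule_interval is a plain timedelta; first_run_time is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta


def first_run_time(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """Earliest moment the first DAG run can be triggered.

    A run covers the period [start_date, start_date + interval) and is
    started only once that period has ended.
    """
    return start_date + schedule_interval


start = datetime(2020, 7, 7, 0, 1, 2)
print(first_run_time(start, timedelta(days=1)))  # 2020-07-08 00:01:02
```

This is why a fixed start_date in the past, like the one in the list, lets the scheduler create the first run right away instead of waiting until tomorrow.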

More pitfalls: Apache Airflow Pitfalls

More automation tools

So that we can work even more with our heads and not with our hands, Airflow has prepared the following for us:

  • REST API - it still has Experimental status, which does not prevent it from working. With it you can not only get information about DAGs and tasks, but also stop/start a DAG, create a DAG Run or a pool.
  • CLI - many tools are available through the command line that are not merely inconvenient to use through the WebUI but are absent from it altogether. For example:
    • backfill, needed to rerun task instances.
      For example, analysts come and say: "Comrade, your data from January 1 to 13 is nonsense! Fix it, fix it, fix it, fix it!" And you just go:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Basic service commands: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance and even ignore all of its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does much the same thing, except that it also writes nothing to the databases.
    • connections allows mass creation of connections from the shell.
  • Python API - a rather hardcore way of interacting, which is intended for plugins and not for poking around in with your little hands. But who is going to stop us from going to /home/airflow/dags, running ipython and starting to mess around? You can, for example, export all connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadatabase. I do not recommend writing to it, but getting task states for various specific metrics can be much faster and easier than through any of the APIs.

    Let's say that not all of our tasks are idempotent, and they can fail sometimes, which is normal. But several failures in a row are already suspicious, and worth checking.

    Beware, SQL!

    WITH last_executions AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
        FROM public.task_instance
        WHERE
            execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
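
The query above keeps only the unbroken streak of failed runs at the most recent end of each task's history (a failed run counts only while its overall rank equals its rank among failures). A minimal Python sketch of the same counting rule, with hypothetical task histories, might look like this:

```python
from itertools import takewhile

FAILED_STATES = {'failed', 'up_for_retry'}


def trailing_failures(states_newest_first):
    """Count the unbroken run of failed states at the most recent end."""
    return sum(1 for _ in takewhile(lambda s: s in FAILED_STATES,
                                    states_newest_first))


# Hypothetical task histories, newest run first
history = {
    ('orders', 'mssql_0'): ['failed', 'up_for_retry', 'success', 'failed'],
    ('orders', 'mssql_1'): ['success', 'failed', 'failed'],
}

suspicious = {}
for task, states in history.items():
    streak = trailing_failures(states)
    if streak > 0:  # same role as the HAVING clause
        suspicious[task] = streak

print(suspicious)  # {('orders', 'mssql_0'): 2}
```

The second task is not reported even though it failed twice, because its latest run succeeded.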

References

And of course, the first ten links in Google's results are the contents of the Airflow folder in my bookmarks.

And the links used in the article:

Source: www.habr.com