Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.

I'll tell you about a wonderful tool for developing ETL processes: Apache Airflow. But Airflow is so versatile and multifaceted that it deserves a closer look even if you aren't involved in data flows but simply need to launch some processes periodically and monitor their execution.

And yes, I won't only tell, I'll also show: the program has plenty of code, screenshots and recommendations.

[Image: What you usually see when you google the word Airflow / Wikimedia Commons]

Introduction

Apache Airflow is just like Django:

  • written in Python,
  • has a great admin panel,
  • infinitely extensible,

only better, and made for completely different purposes, namely (as it was written before the cut):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience will allow),
  • with dynamic workflow generation from Python code that is very easy to write and understand,
  • and the ability to connect any databases and APIs with each other using both ready-made components and home-made plugins (which is extremely simple to do).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
  • as an advanced cron, which starts the data consolidation processes on the ODS and also monitors their maintenance.

Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this runs:

  • more than 200 dags (actually workflows, into which we stuffed tasks),
  • each with 70 tasks on average,
  • and all this goodness starts (also on average) once an hour.

How we expanded, I'll describe below; for now let's define the über-problem that we'll solve:

There are three source SQL Servers, each with 50 databases: instances of one project, so they all have the same structure (almost everywhere, mua-ha-ha), which means each one has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively dump it into, say, Vertica.
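To make the "add service fields" step concrete, here is a minimal sketch in plain Python. The column names (source_server, source_db, etl_session_id) and the sample rows are my own assumptions for illustration; the real loader works with whatever the target table defines.

```python
from typing import Dict, Iterable, Iterator


def add_service_fields(rows: Iterable[Dict], source_server: str,
                       source_db: str, etl_session_id: int) -> Iterator[Dict]:
    """Yield a copy of each source row extended with three service fields."""
    for row in rows:
        # Build a new dict so the original row is left untouched
        yield {**row,
               'source_server': source_server,
               'source_db': source_db,
               'etl_session_id': etl_session_id}


# Hypothetical rows read from one Orders table
orders = [{'id': 1, 'amount': 100}, {'id': 2, 'amount': 250}]
enriched = list(add_service_fields(orders, 'mssql_0', 'project_07', 42))
print(enriched[0])
```

With the origin stamped on every row, data from all the source databases can sit in one target table and still be traced back to where it came from.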

Let's go!

The main part, practical (and a little theoretical)

Why we (and you) need it

When the trees were big and I was a simple SQL guy in one Russian retail company, we ran ETL processes, aka data flows, using the two tools available to us:

  • Informatica Power Center: an extremely widespread, extremely productive system with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? Well, first of all, this interface, straight out of the 2000s, put mental pressure on us. Secondly, this contraption is designed for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. About the fact that it costs like the wing of an Airbus A380 per year, we won't say anything.

    Beware, the screenshot may slightly hurt people under 30

    [Image]

  • SQL Server Integration Services: we used this comrade in our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything in it is good: both the pretty interface and the progress reports... But that isn't why we love software products, oh, not for this. We can version dtsx (which is XML with nodes shuffled on every save), but what's the point? How about making a package of tasks that drags hundreds of tables from one server to another? Never mind a hundred: your index finger will fall off after twenty, clicking the mouse button. But it definitely looks more fashionable:

    [Image]

We certainly looked for ways out. It even almost came to a self-written SSIS package generator...

...and then a new job found me. And Apache Airflow overtook me there.

When I found out that ETL process descriptions are plain Python code, I all but danced for joy. This is how data flows became versioned and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code that fits in one and a half or two 13" screens.

Assembling the cluster

Let's not arrange a complete kindergarten and talk about completely obvious things here, like installing Airflow, your chosen database, Celery and the other cases described in the docs.

So that we can begin experimenting right away, I sketched a docker-compose.yml in which:

  • We actually bring up Airflow: the Scheduler and the Webserver. Flower will also be spinning there to monitor Celery tasks (because it is already bundled into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, into which Airflow will write its service information (scheduler data, execution statistics, etc.), and Celery will mark completed tasks;
  • Redis, which will act as a task broker for Celery;
  • A Celery worker, which will be engaged in the direct execution of tasks.
  • Into the ./dags folder we'll add our files with the dag descriptions. They'll be picked up on the fly, so there's no need to restart the whole stack after every sneeze.

In some places the example code isn't shown in full (so as not to clutter the text), and in some places it is modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In assembling the composition, I relied largely on the well-known image puckel/docker-airflow; be sure to check it out. Maybe you don't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg, but also through environment variables (thanks to the developers), which I maliciously took advantage of.
  • Naturally, it isn't production-ready: I deliberately didn't put heartbeats on the containers and didn't bother with security. But I did the minimum suitable for our experiments.
  • Note that:
    • The dag folder must be accessible to both the scheduler and the workers.
    • The same applies to all third-party libraries: they must all be installed on the machines with the scheduler and the workers.
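The environment variables in the compose file follow the AIRFLOW__{SECTION}__{KEY} naming convention, where the section and key correspond to entries in airflow.cfg. As a rough illustration only, here is how such names map back onto sections and keys; the function airflow_env_to_config is a hypothetical helper of mine, not part of Airflow.

```python
from typing import Dict, Mapping

PREFIX = 'AIRFLOW__'


def airflow_env_to_config(env: Mapping[str, str]) -> Dict[str, Dict[str, str]]:
    """Group AIRFLOW__{SECTION}__{KEY} variables as {section: {key: value}}."""
    config: Dict[str, Dict[str, str]] = {}
    for name, value in env.items():
        if not name.startswith(PREFIX):
            continue  # unrelated variable, e.g. PATH
        section, sep, key = name[len(PREFIX):].partition('__')
        if not sep or not key:
            continue  # not in SECTION__KEY form
        config.setdefault(section.lower(), {})[key.lower()] = value
    return config


env = {
    'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor',
    'AIRFLOW__CELERY__BROKER_URL': 'redis://broker:6379/0',
    'PATH': '/usr/bin',
}
config = airflow_env_to_config(env)
print(config)
```

So AIRFLOW__CORE__EXECUTOR plays the role of the executor key in the [core] section, which is why the whole cluster can be configured from docker-compose.yml without shipping an airflow.cfg.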

Well, now it's simple:

$ docker-compose up --scale worker=3

After everything has come up, you can look at the web interfaces: the Airflow Webserver on port 8080 and Celery Flower on port 5555, as mapped in docker-compose.yml.

Basic concepts

If you didn't understand anything in all these "dags", here's a short dictionary:

  • Scheduler: the most important uncle in Airflow, who makes sure that the robots work hard, and not a human: it monitors the schedule, updates dags, launches tasks.

    In general, in older versions it had problems with memory (no, not amnesia, but leaks), and a legacy parameter even remained in the configs: run_duration, its restart interval. But now everything is fine.

  • DAG (aka "dag"): a "directed acyclic graph", but such a definition will tell few people anything; in fact it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.

    In addition to dags there may also be subdags, but we most likely won't get to them.

  • DAG Run: an initialized dag that is assigned its own execution_date. Dagruns of the same dag can run in parallel (if you made your tasks idempotent, of course).
  • Operator: a piece of code responsible for performing a specific action. There are three types of operators:
    • action, like our favorite PythonOperator, which can execute any (valid) Python code;
    • transfer, which transports data from place to place, say, MsSqlToHiveTransfer;
    • sensor, which lets you react to an event or hold back the further execution of the dag until the event occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? After all, you can do the retries right in the operator!" The answer: so as not to clog the pool of tasks with suspended operators. The sensor starts, checks and dies until the next attempt.
  • Task: declared operators, regardless of type, once attached to a dag are promoted to the rank of task.
  • Task instance: when the general-scheduler decides it's time to send tasks into battle on the performer-workers (right on the spot, if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e., a set of variables, the execution parameters), expands the command or query templates, and puts them in the pool.
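If "directed acyclic graph" still sounds abstract, the sketch below shows the two properties that matter in practice: the tasks can always be put into a valid execution order, and a circular dependency is rejected. It uses only the standard library (graphlib, Python 3.9+) and has nothing to do with Airflow's own scheduler code; the task names are made up.

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+

# Each key is a task; the set holds the tasks it depends on (its upstream).
dependencies = {
    'extract': set(),
    'transform': {'extract'},
    'load': {'transform'},
    'notify': {'load'},
}

# A valid execution order: every task comes after everything it depends on.
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# "Acyclic" matters: with a circular dependency no valid order exists.
cyclic = {'a': {'b'}, 'b': {'a'}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print('cycle detected:', has_cycle)
```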

We generate tasks

First, let's outline the general scheme of our dag, and then we'll dive deeper and deeper into the details, because we apply some non-trivial solutions.

So, in its simplest form, such a dag will look like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's figure it out:

  • First, we import the necessary libs and something else;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we'll take our table;
  • dag is the declaration of our dag, which must be in globals(), otherwise Airflow won't find it. The dag also needs to be told:
    • that its name is orders; this name will appear in the web interface,
    • that it will work starting from midnight on the eighth of July,
    • and that it should run roughly every 6 hours (for tough guys, a cron line such as 0 */6 * * * is accepted here instead of timedelta(); for the less cool, an expression like @daily);
  • workflow() will do the main job, but not now. For now, we'll just dump our context into the log.
  • And now the simple magic of creating tasks:
    • we run through our sources;
    • we initialize PythonOperator, which will execute our dummy workflow(). Don't forget to specify a unique (within the dag) task name and to bind the dag itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect using **context.
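The contents of commons/datasources.py aren't shown here, so the following is only a guess at its shape: a list of named tuples that unpack as (conn_id, schema). The connection and database names are invented; the one real constraint is that schema is used as the task_id, so it has to be unique within the dag.

```python
from collections import namedtuple

DataSource = namedtuple('DataSource', 'conn_id schema')

# Hypothetical naming: three connections, fifty databases on each.
SERVERS = ['mssql_0', 'mssql_1', 'mssql_2']
DATABASES_PER_SERVER = 50

sql_server_ds = [
    DataSource(conn_id=server, schema=f'{server}_db_{n:02d}')
    for server in SERVERS
    for n in range(DATABASES_PER_SERVER)
]

# The dag unpacks each entry exactly like this
for conn_id, schema in sql_server_ds[:2]:
    print(conn_id, schema)
print(len(sql_server_ds), 'sources')
```

Three servers with fifty databases each give 150 entries, one task per entry.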

For now, that's all. What we got:

  • a new dag in the web interface,
  • one and a half hundred tasks that will be executed in parallel (if the Airflow and Celery settings and the server capacity allow it).

Well, almost got it.

[Image: Who will install the dependencies?]

To simplify this whole thing, I screwed the processing of requirements.txt on all nodes into docker-compose.yml.

Now it's off:

[Image]

Gray squares are task instances processed by the scheduler.

We wait a bit, and the tasks are snapped up by the workers:

[Image]

The green ones, of course, have completed their work successfully. The red ones weren't very successful.

By the way, there's no ./dags folder on our prod and no synchronization between machines: all dags live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.

A little about Flower

While the workers are thrashing our dummies, let's recall another tool that can show us something: Flower.

The very first page, with summary information on the worker nodes:

[Image]

The most intense page, with the tasks that went to work:

[Image]

The most boring page, with the status of our broker:

[Image]

The brightest page, with graphs of task statuses and their execution times:

[Image]

We load the underloaded

So, all the tasks have run, and you can carry away the wounded.

[Image]

And there were many wounded, for one reason or another. When Airflow is used correctly, these very squares indicate that the data definitely didn't arrive.

You need to look at the log and restart the fallen task instances.

By clicking on any square, we'll see the actions available to us:

[Image]

You can go ahead and Clear the fallen. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

[Image]

It's clear that doing this with the mouse for every red square isn't very humane, and it isn't what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

[Image]

Let's select everything at once and reset it to zero by clicking the correct item:

[Image]

After the cleanup, our tasks look like this (they're already waiting for the scheduler to schedule them):

[Image]

Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Dear gentlemen, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone done a report update at some point? This is one again: there's a list of sources to get the data from; there's a list of places to put it; and don't forget to honk when everything has happened or broken (well, this isn't about us, no).

Let's go through the file again and look at the new obscure stuff:

  • from commons.operators import TelegramBotSendMessage: nothing prevents us from making our own operators, which we took advantage of by making a small wrapper for sending messages to Telegram. (We'll talk more about this operator below);
  • default_args={}: a dag can distribute the same arguments to all its operators;
  • to='{{ var.value.all_the_kings_men }}': the to field isn't hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put in Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the letter will fly to the bosses only if all dependencies have run successfully;
  • tg_bot_conn_id='tg_main': conn_id arguments accept the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED: messages in Telegram will fly out only if there are fallen tasks;
  • task_concurrency=1: we prohibit the simultaneous launch of several task instances of one task. Otherwise, we'd get several VerticaOperator instances running at once against one table;
  • report_update >> [email, tg]: all the VerticaOperator tasks converge on sending letters and messages, like this:
    [Image]

    But since the notifier operators have different launch conditions, only one of them will fire. In the Tree View, everything looks a little less visual:
    [Image]
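Why only one of the two notifiers fires can be shown with a toy model of the two trigger rules. This is a deliberate simplification of mine, not Airflow's implementation: real task states also include values such as skipped or upstream_failed, which are ignored here.

```python
from typing import Iterable, List


def all_success(upstream_states: Iterable[str]) -> bool:
    """ALL_SUCCESS: run only when every upstream task succeeded."""
    return all(state == 'success' for state in upstream_states)


def one_failed(upstream_states: Iterable[str]) -> bool:
    """ONE_FAILED: run when at least one upstream task failed."""
    return any(state == 'failed' for state in upstream_states)


def notifiers_to_run(upstream_states: Iterable[str]) -> List[str]:
    """Return the task_ids of the notifiers whose trigger rule is satisfied."""
    states = list(upstream_states)
    fired = []
    if all_success(states):
        fired.append('email_success')
    if one_failed(states):
        fired.append('telegram_fail')
    return fired


print(notifiers_to_run(['success'] * 5))
print(notifiers_to_run(['success', 'failed', 'success', 'success', 'success']))
```

For any finished set of report updates the two conditions are mutually exclusive: either everything succeeded and the email goes out, or something failed and the Telegram message does.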

I'll say a few words about macros and their friends, variables.

Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} will expand to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), and on restart the placeholders will expand to the same values.
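The point about restarts is worth a tiny demonstration. The render() function below is a toy stand-in for Jinja that knows only the {{ ds }} macro; it is an assumption for illustration and not how Airflow renders templates internally.

```python
from datetime import datetime


def render(template: str, execution_date: datetime) -> str:
    """Toy stand-in for Jinja: substitute only the {{ ds }} macro."""
    ds = execution_date.strftime('%Y-%m-%d')
    return template.replace('{{ ds }}', ds)


query = ("SELECT id FROM orders.payments "
         "WHERE payment_dtm::DATE = '{{ ds }}'::DATE")
execution_date = datetime(2020, 7, 14, 6, 0)

first_run = render(query, execution_date)
# A cleared and restarted task instance keeps its execution_date,
# so it renders exactly the same query, whenever the restart happens.
rerun = render(query, execution_date)
print(first_run)
```

Because the date comes from the task instance and not from the wall clock, rerunning last week's failed load still selects last week's rows.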

The substituted values can be viewed using the Rendered button on each task instance. This is how it looks for the task that sends a letter:

[Image]

And this is how it looks for the task that sends a message:

[Image]

A complete list of built-in macros for the latest available version is available here: macros reference

Moreover, with the help of plugins we can declare our own macros, but that's another story.

In addition to the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of things in Admin/Variables:

[Image]

That's it, you can use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
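What the dotted path does can be sketched in a few lines of plain Python: parse the stored JSON and walk down key by key. The lookup() helper is hypothetical, and the token is quoted here so that the value parses as valid JSON.

```python
import json
from functools import reduce

# The raw text stored in the bot_config variable
raw_value = '''
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
'''


def lookup(document: dict, dotted_path: str):
    """Follow a dotted path such as 'bot.token' through nested dicts."""
    return reduce(lambda node, key: node[key], dotted_path.split('.'), document)


bot_config = json.loads(raw_value)
print(lookup(bot_config, 'bot.token'))
print(lookup(bot_config, 'service'))
```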

I'll say literally one word and show one screenshot about connections. Everything here is elementary: on the Admin/Connections page we create a connection and add our logins / passwords and more specific parameters there. Like this:

[Image]

Passwords can be encrypted (more thoroughly than by default), and you can leave out the connection type (as I did for tg_main): the fact is that the list of types is hardwired in the Airflow models and can't be extended without getting into the source code (if I suddenly failed to google something, please correct me), but nothing stops us from getting credentials just by name.

You can also create several connections with the same name: in this case the method BaseHook.get_connection(), which gets connections by name, will return a random one of the several namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
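The "random one of the namesakes" behaviour is easy to model. The registry below is a toy of my own, not Airflow code, and the host names are invented; it only illustrates what a caller should expect when one conn_id maps to several connections.

```python
import random
from collections import defaultdict, namedtuple

Connection = namedtuple('Connection', 'conn_id host')


class ConnectionRegistry:
    """Toy registry: several connections may share one conn_id."""

    def __init__(self, connections):
        self._by_id = defaultdict(list)
        for conn in connections:
            self._by_id[conn.conn_id].append(conn)

    def get_connection(self, conn_id):
        candidates = self._by_id.get(conn_id)
        if not candidates:
            raise KeyError(f'No connection named {conn_id!r}')
        # A random pick among the namesakes, not round robin
        return random.choice(candidates)


registry = ConnectionRegistry([
    Connection('dwh', 'vertica-node-1'),
    Connection('dwh', 'vertica-node-2'),
    Connection('tg_main', 'api.telegram.org'),
])
hosts_seen = {registry.get_connection('dwh').host for _ in range(100)}
print(hosts_seen)
```

This gives a crude form of load spreading for free, but nothing guarantees an even distribution, and two consecutive calls may well land on the same host.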

Variables and Connections are certainly cool tools, but it's important not to lose the balance: which parts of your flows you store in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailbox, through the UI. On the other hand, this is still a return to the mouse clicking that we (I) wanted to get rid of.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for connecting it to third-party services and libraries. For example, JiraHook will open a client for us to interact with Jira (you can move tasks back and forth), and with the help of SambaHook you can push a local file to an smb share.

Parsing the custom operator

And so we've come close to looking at how TelegramBotSendMessage is made.

The code of commons/operators.py with the actual operator:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, like everything else in Airflow, everything is very simple:

  • We inherited from BaseOperator, which implements quite a few Airflow-specific things (look at it at your leisure).
  • We declared template_fields, the fields in which Jinja will look for macros to process.
  • We arranged the right arguments for __init__() and set the defaults where necessary.
  • We didn't forget to initialize the ancestor either.
  • We opened the corresponding hook, TelegramBotHook, and received a client object from it.
  • We overrode the method BaseOperator.execute(), which Airflow will call when the time comes to launch the operator; in it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr: Airflow will intercept everything, wrap it nicely and put it where it belongs.)
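To see how template_fields and execute() fit together, here is a toy model with no Airflow dependency. ToyOperator is my own simplification: it substitutes placeholders with str.replace where Airflow uses Jinja, and the run() method is a made-up stand-in for whatever the framework does before calling execute().

```python
class ToyOperator:
    """Toy model of an operator; this is NOT Airflow's BaseOperator."""
    template_fields = []

    def render_template_fields(self, context):
        # Only attributes listed in template_fields get rendered
        for field in self.template_fields:
            value = getattr(self, field)
            if isinstance(value, str):
                for name, replacement in context.items():
                    value = value.replace('{{ ' + name + ' }}', str(replacement))
                setattr(self, field, value)

    def execute(self, context):
        raise NotImplementedError

    def run(self, context):
        # Rendering happens first, so execute() sees final values
        self.render_template_fields(context)
        return self.execute(context)


class SendMessage(ToyOperator):
    template_fields = ['chat_id', 'message']

    def __init__(self, chat_id, message, note):
        self.chat_id = chat_id
        self.message = message
        self.note = note  # not in template_fields, so never rendered

    def execute(self, context):
        return f'Send "{self.message}" to the chat {self.chat_id}'


op = SendMessage('{{ chat }}', '{{ dag_id }} failed :(', '{{ dag_id }}')
result = op.run({'chat': '@debug', 'dag_id': 'update_reports'})
print(result)
print(op.note)
```

The practical takeaway: an argument that should accept macros has to be listed in template_fields, otherwise the raw placeholder text reaches execute() untouched.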

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here, so I'll just note the important points:

  • We inherit and think about the arguments; in most cases there will be one: conn_id;
  • We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and take only the extra section (this is a JSON field), into which I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, giving it that specific token.

That's all. You can get a client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().
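The hook above will raise a bare KeyError if someone forgets to fill in Extra. A slightly friendlier variant might look like the sketch below; ToyConnection and get_bot_token are hypothetical names of mine that only imitate the extra / extra_dejson pair used in the hook, so that the example runs without Airflow.

```python
import json


class ToyConnection:
    """Toy stand-in for a stored connection; only Extra matters here."""

    def __init__(self, conn_id, extra=''):
        self.conn_id = conn_id
        self.extra = extra

    @property
    def extra_dejson(self):
        """Extra parsed as a dict, or {} when it is empty or invalid JSON."""
        if not self.extra:
            return {}
        try:
            return json.loads(self.extra)
        except json.JSONDecodeError:
            return {}


def get_bot_token(connection):
    """Return the bot token from Extra, failing with a readable message."""
    extra = connection.extra_dejson
    if 'bot_token' not in extra:
        raise ValueError(
            f'Connection {connection.conn_id!r} has no "bot_token" in Extra')
    return extra['bot_token']


conn = ToyConnection('tg_main', '{"bot_token": "YOuRAwEsomeBOtToKen"}')
print(get_bot_token(conn))
```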

And the second part of the file, in which I make a microwrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of one method, sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
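One thing to keep in mind with parse_mode set to MarkdownV2: the Telegram Bot API documentation lists a set of characters that must be escaped with a backslash in ordinary text, and a dag_id such as update_reports contains one of them (the underscore). The helper below is my own addition and not part of the wrapper above. It escapes everything, so it suits plain-text messages only; it doesn't try to preserve intentional formatting or handle backslashes already present in the text.

```python
import re

# Characters the Telegram Bot API documentation lists as requiring
# a preceding backslash in MarkdownV2 text.
_SPECIAL = r'_*[]()~`>#+-=|{}.!'
_PATTERN = re.compile('([' + re.escape(_SPECIAL) + '])')


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 special character with a backslash."""
    return _PATTERN.sub(r'\\\1', text)


escaped = escape_markdown_v2('update_reports failed!')
print(escaped)
```

A caller could pass the result as the message argument of send_message() when the text is meant to be shown literally.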

The correct way is to put it all together (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, put it in a public repository, and give it to Open Source.

While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'll go check what went wrong...

[Image: Something broke in our dag! Isn't that what we were expecting? Exactly!]

So, are you going to load any data?

Do you feel I've missed something? It seems he promised to transfer data from SQL Server to Vertica, and then he went and wandered off the topic, the scoundrel!

This atrocity was intentional: I simply had to decipher some terminology for you. Now we can move on.

Our plan was this:

  1. Make a dag
  2. Generate tasks
  3. See how beautiful everything is
  4. Assign session numbers to the loads
  5. Get data from SQL Server
  6. Put data into Vertica
  7. Collect statistics

So, to get all this up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we bring up:

  • Vertica as the host dwh with the most default settings,
  • three instances of SQL Server,
  • and we fill the databases in the latter with some data (under no circumstances look into mssql_init.py!)

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

What our miracle randomizer generated can be inspected with the Data Profiling/Ad Hoc Query item:

The main thing is not to show it to the analysts

I won't go into detail on ETL sessions, everything there is trivial: we create a database, there is a table in it, we wrap everything in a context manager, and now we do this:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # any() takes a single iterable, so the three values go in a tuple
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
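
To try the same context-manager pattern without a PostgreSQL instance, here is a minimal sketch that swaps in sqlite3 from the standard library. The class name SqliteSession and the reduced sessions table are assumptions made for this illustration; they are not part of the module above.

```python
import sqlite3


class SqliteSession:
    """Stand-in for the Session class, backed by sqlite3 (an assumption)."""

    def __init__(self, connection, task_name):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None
        self.comment = None
        self.connection.execute(
            "CREATE TABLE IF NOT EXISTS sessions ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " task_name TEXT NOT NULL,"
            " successful INTEGER,"
            " loaded_rows INTEGER,"
            " comment TEXT)")

    def __enter__(self):
        # Opening the session inserts a row and remembers its ID.
        cursor = self.connection.execute(
            "INSERT INTO sessions (task_name) VALUES (?)", (self.task_name,))
        self.id = cursor.lastrowid
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # Record the failure; returning None lets the exception propagate.
            self.successful = False
            self.comment = f"{exc_type.__name__}: {exc_val}"
        self.connection.execute(
            "UPDATE sessions SET successful = ?, loaded_rows = ?, comment = ?"
            " WHERE id = ?",
            (self.successful, self.loaded_rows, self.comment, self.id))
        self.connection.commit()


conn = sqlite3.connect(":memory:")

# A load that finishes normally.
with SqliteSession(conn, "orders") as session:
    session.loaded_rows = 15
    session.successful = True

# A load that fails: the session row is still closed and marked unsuccessful.
try:
    with SqliteSession(conn, "orders") as session:
        raise ValueError("source is unavailable")
except ValueError:
    pass

rows = conn.execute(
    "SELECT id, task_name, successful, loaded_rows, comment"
    " FROM sessions ORDER BY id").fetchall()
```

The point of the design is that the bookkeeping row gets closed on both paths, so a crashed load still leaves a trace in the sessions table.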

The time has come to collect our data from our one and a half hundred tables. Let's do it with the help of some very unpretentious lines:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

  1. With the help of the hook we get a pymssql connection from Airflow.
  2. We substitute a restriction in the form of a date into the query; the template engine will pass it into the function.
  3. We feed our query to pandas, which will get us a DataFrame; it will be useful to us later.

I use the substitution {dt} instead of the query parameter %s not because I am an evil Pinocchio, but because pandas cannot cope with pymssql and slips it params: List, although it really wants a tuple.
Also note that the developer of pymssql decided not to support it anymore, and it's time to move to pyodbc.
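
For comparison, this is roughly what driver-side parameter binding with a tuple looks like when the driver cooperates. It is only a sketch: sqlite3 stands in for SQL Server (an assumption), it uses ? placeholders where pymssql uses %s, and the orders table and its rows are made up for the illustration.

```python
import sqlite3

# sqlite3 stands in for SQL Server here (assumption); the table is made up.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, start_time TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2020-07-07 10:00:00"),
     (2, "2020-07-08 09:30:00"),
     (3, "2020-07-07 23:59:59")])

dt = "2020-07-07"  # in the DAG this value comes from the '{{ ds }}' template

# The date is bound by the driver instead of being pasted into the SQL text.
# Note that the parameters are passed as a tuple.
query = "SELECT id FROM orders WHERE date(start_time) = ? ORDER BY id"
order_ids = [row[0] for row in conn.execute(query, (dt,))]
print(order_ids)
```

With a value that Airflow itself renders as a date, the f-string substitution in the article is a pragmatic workaround; binding becomes more important as soon as the value can come from anywhere less trusted.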

Let's see what Airflow stuffed into the arguments of our functions:


If there is no data, then there is no point in continuing. But it is also strange to consider the load successful. And yet this is not an error. A-ah-ah, what to do?! Here's what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there are no errors, but we are skipping the task. The interface will show not a green or red square, but a pink one.

Let's add a few columns to our data:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:

  • The database from which we took the orders,
  • The ID of our loading session (it will be different for each task),
  • A hash of the source and the order ID, so that in the final database (where everything is poured into one table) we have a unique order ID.
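
The idea behind the last column can be sketched with the standard library alone. This is not what hash_pandas_object does internally; it is a swapped-in technique (a truncated BLAKE2b digest) that shows how a stable key can be derived from the pair (source, order ID). The function name stable_row_key and the sample source names are hypothetical.

```python
import hashlib


def stable_row_key(etl_source: str, order_id: int) -> int:
    """Derive a stable, signed 64-bit key from the source name and the order ID."""
    payload = f"{etl_source}|{order_id}".encode("utf-8")
    digest = hashlib.blake2b(payload, digest_size=8).digest()
    # signed=True keeps the value inside the range of a signed 64-bit integer.
    return int.from_bytes(digest, byteorder="big", signed=True)


# The same order ID coming from two different sources gets two different keys.
key_a = stable_row_key("mssql_0", 42)
key_b = stable_row_key("mssql_1", 42)
```

Unlike Python's built-in hash(), a digest like this does not change between interpreter runs, which matters when the key is stored in a table.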

The penultimate step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is through CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# Column names must be a plain comma-separated list, not a Python list repr
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

  1. We make a special receiver, StringIO.
  2. pandas will kindly put our DataFrame there in the form of CSV lines.
  3. Let's open a connection to our beloved Vertica with a hook.
  4. And now, with the help of copy(), we send our data straight to Vertica!
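
The CSV settings used above (pipe delimiter, backslash escape, a NUL marker for missing values) can be tried out without pandas or Vertica. The sketch below uses the standard csv module as a stand-in for DataFrame.to_csv and reads the buffer back to check that awkward values survive the round trip; the sample rows are made up.

```python
import csv
from io import StringIO

# Made-up rows: a missing value, a delimiter inside text, a quote inside text.
rows = [
    (1, "plain text", None),
    (2, "pipe | inside", 3.5),
    (3, 'quote " inside', 0.125),
]

buffer = StringIO()
writer = csv.writer(buffer, delimiter="|", quotechar='"',
                    quoting=csv.QUOTE_MINIMAL, doublequote=False,
                    escapechar="\\", lineterminator="\n")
for row in rows:
    # Mirror na_rep='NUL': missing values become the NUL marker.
    writer.writerow(["NUL" if value is None else value for value in row])
buffer.seek(0)  # rewind, exactly as before handing the buffer to copy()

# Read the buffer back with the same dialect settings.
reader = csv.reader(buffer, delimiter="|", quotechar='"',
                    doublequote=False, escapechar="\\")
parsed = list(reader)
```

A check like this is cheap insurance before pointing COPY at real data, since the delimiter, quoting and escape settings have to agree on both sides.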

We will take from the driver how many rows were loaded, and tell the session manager that everything is OK:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production we create the target table by hand. Here I allowed myself a small automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator() I create the database schema and the table (if they do not exist yet, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up

- Well, - said the little mouse, - isn't it true that now
you are convinced that I am the most terrible animal in the forest?

Julia Donaldson, The Gruffalo

I think if my colleagues and I had a competition over who can create and launch an ETL process from scratch faster, they with their SSIS and a mouse and me with Airflow... And then we would also compare the ease of maintenance... Wow, I think you will agree that I would beat them on all fronts!

On a slightly more serious note, Apache Airflow, by describing processes in the form of program code, has made my job much more comfortable and enjoyable.

Its unlimited extensibility, both in terms of plug-ins and its predisposition to scalability, gives you the opportunity to use Airflow in almost any area: in the full cycle of collecting, preparing and processing data, and even in launching rockets (to Mars, of course).

Final part: reference and information

The rakes we have collected for you

  • start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow and no earlier.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is another runtime error associated with it: Task is missing the start_date parameter, which most often means that you forgot to bind the operator to the DAG.

  • Everything on one machine. Yes, the databases (Airflow's own and our own layer on top of it), and the web server, and the scheduler, and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL started answering an index query in 20 s instead of 5 ms, we took it and moved it away.
  • LocalExecutor. Yes, we are still sitting on it, and we have already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand with at least one worker, and we will have to work hard to move to CeleryExecutor. And since you can work with it on a single machine, nothing stops you from using Celery even on a server that "of course will never go into production, honestly!"
  • Not using the built-in tools:
    • Connections to store service credentials,
    • SLA Misses to respond to tasks that did not finish on time,
    • xcom for metadata exchange (I said metadata!) between DAG tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of fallen tasks. Now my work Gmail has more than 90k emails from Airflow, and the web mail client refuses to select and delete more than 100 at a time.
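
The start_date item above comes down to simple date arithmetic: a run covers one schedule interval and is triggered only after that interval has ended. A minimal sketch of that rule, assuming a plain timedelta schedule and ignoring catchup and cron alignment (the helper name first_run_time is hypothetical, not an Airflow function):

```python
from datetime import datetime, timedelta


def first_run_time(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """Earliest moment the scheduler can trigger the first DAG run.

    The first run covers [start_date, start_date + schedule_interval)
    and is triggered only once that interval is over.
    """
    return start_date + schedule_interval


start_date = datetime(2020, 7, 7, 0, 1, 2)
print(first_run_time(start_date, timedelta(days=1)))  # 2020-07-08 00:01:02
```

So a daily DAG whose start_date is "today" has nothing to do until tomorrow, which is exactly the surprise described in the list.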

More pitfalls: Apache Airflow Pitfalls

More automation tools

So that we can work even more with our heads and not with our hands, Airflow has prepared this for us:

  • REST API - it still has Experimental status, which does not prevent it from working. With it you can not only get information about DAGs and tasks, but also stop/start a DAG, create a DAG Run or a pool.
  • CLI - many tools are available through the command line that are not just inconvenient to use through the WebUI, but are absent from it altogether. For example:
    • backfill is needed to re-run task instances.
      For example, the analysts came and said: "And you, comrade, have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you go:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Database maintenance: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance, and even ignore all of its dependencies. Moreover, you can run it via LocalExecutor, even if you have a Celery cluster.
    • test does pretty much the same thing, except that it writes nothing to the databases.
    • connections allows mass creation of connections from the shell.
  • Python API - a rather hardcore way of interacting, meant for plugins and not for poking around in with little hands. But who can stop us from going to /home/airflow/dags, running ipython and starting to mess around? For example, you can export all connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadatabase. I do not recommend writing to it, but getting task states for various specific metrics can be much faster and easier than through any of the APIs.

    Let's say not all of our tasks are idempotent, and they can sometimes fall, and that is normal. But a few failures in a row are already suspicious, and it would be worth checking.

    Beware, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
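
The trick in the SQL above is comparing two row numbers: a failed run whose position among all runs equals its position among failed runs has no successful run after it. In other words, the query counts the unbroken streak of failures at the head of each task's history. The same logic as a small Python sketch, with made-up task histories (the name failing_streak is hypothetical):

```python
from itertools import takewhile

FAILED_STATES = {"failed", "up_for_retry"}


def failing_streak(states_newest_first):
    """Count how many of the most recent runs failed in a row."""
    return sum(1 for _ in takewhile(lambda state: state in FAILED_STATES,
                                    states_newest_first))


# Made-up histories, newest run first, keyed by (dag_id, task_id).
history = {
    ("orders", "mssql_0"): ["failed", "up_for_retry", "success", "failed"],
    ("orders", "mssql_1"): ["success", "failed", "failed"],
}

streaks = {key: failing_streak(states) for key, states in history.items()}
# Same filter as HAVING count(last_fail_seq) > 0.
suspicious = {key: n for key, n in streaks.items() if n > 0}
```

Here only the first task is reported: its two latest runs failed, while the second task's old failures are hidden behind a more recent success.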

References

And of course, the first ten links from Google's results are the contents of the Airflow folder in my bookmarks.

And the links used in the article:

Source: www.habr.com