Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.

I'll tell you about a great tool for developing ETL processes: Apache Airflow. Airflow is also so versatile and feature-rich that it is worth a closer look even if you don't work with data flows at all but need to launch processes periodically and monitor their execution.

And yes, I won't just tell, I'll also show: the post has plenty of code, screenshots and recommendations.

[Image] What you usually see when you google the word Airflow / Wikimedia Commons

Introduction

Apache Airflow is like Django:

  • written in Python,
  • has a great admin panel,
  • infinitely extensible,

only better, and built for completely different purposes, namely (as I wrote before the cut):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience will allow);
  • dynamic workflow generation from Python code that is very easy to write and understand;
  • and the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which is extremely simple).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and ClickHouse);
  • as an advanced cron, it starts the data consolidation processes in the ODS and also keeps an eye on their maintenance.

Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow terms, that amounts to:

  • more than 200 DAGs (in fact, workflows into which we packed our tasks),
  • about 70 tasks in each, on average,
  • this goodness starts (also on average) once an hour.

How we scaled is something I'll write about below. For now, let's define the über-problem we are going to solve:

There are three source SQL Servers, each with 50 databases. These are instances of one project, so they share the same structure (almost everywhere, mua-ha-ha), which means each of them has an Orders table (luckily, a table with that name fits into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively dump it into, say, Vertica.
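The "add service fields" step is easy to picture in plain Python. Here is a minimal sketch, assuming rows arrive as dicts. The function name and the column names source_server, source_db and etl_task_id are illustrative, not the article's actual schema:

```python
from typing import Any, Dict, Iterable, Iterator


# Illustrative column names (source_server, source_db, etl_task_id);
# the real target schema is not shown in the article.
def with_service_fields(rows: Iterable[Dict[str, Any]],
                        source_server: str,
                        source_db: str,
                        etl_task_id: int) -> Iterator[Dict[str, Any]]:
    """Yield copies of source rows extended with lineage (service) columns."""
    for row in rows:
        enriched = dict(row)  # copy, so the caller's rows stay untouched
        enriched.update(source_server=source_server,
                        source_db=source_db,
                        etl_task_id=etl_task_id)
        yield enriched


orders = [{'id': 1, 'amount': 250}, {'id': 2, 'amount': 90}]
loaded = list(with_service_fields(orders, 'mssql_0', 'project_01', 42))
print(loaded[0])
```

Because it is a generator, the same helper also works with rows streamed from a database cursor, so a large table never has to be held in memory in full.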

Let's go!

The main, practical part (and a little theory)

Why we (and you)

Back when the trees were big and I was a simple SQL guy at a Russian retailer, we cranked out ETL processes, aka data flows, with the two tools available to us:

  • Informatica PowerCenter: an extremely sprawling, very productive system with its own hardware and its own versioning. I used maybe 1% of its capabilities, at best. Why? First, the interface, straight out of the '80s, was a mental burden. Second, the contraption is built for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. As for the fact that it costs as much as an Airbus wing per year, we will say nothing.

    Beware, the screenshot may slightly hurt people under 30

    [Screenshot: the Informatica PowerCenter interface]

  • SQL Server Integration Services (SSIS): we used this comrade for our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything about it is fine: the interface is pretty, and so are the progress reports... But that is not why we love software products, oh no, not for that. You can version the dtsx files (which are XML whose nodes get shuffled on every save), but what's the point? And what about building a package that drags hundreds of tables from one server to another? Hundreds, really? After twenty components your index finger will fall off from all the mouse clicking. It certainly looks more fashionable, though:

    [Screenshot: an SSIS package in the designer]

We were certainly looking for a way out. It almost came to a self-written SSIS package generator...

…and then I found a new job. And Apache Airflow caught up with me there.

When I found out that ETL process descriptions are plain Python code, I nearly danced for joy. That is how data flows became versioned and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code on one and a half to two 13-inch screens.

Assembling the cluster

Let's not turn this into a kindergarten and talk about completely obvious things here, such as installing Airflow, your chosen database, Celery and the other matters described in the docs.

To start experimenting right away, I sketched a docker-compose.yml in which:

  • We actually bring up Airflow: the Scheduler and the Webserver. Flower will also run there to monitor Celery tasks (it already comes bundled in apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, where Airflow will write its service information (scheduler data, execution statistics, etc.) and where Celery will mark completed tasks;
  • Redis, which will act as the task broker for Celery;
  • a Celery worker, which will do the actual execution of tasks;
  • the ./dags folder, where we will put our files with DAG descriptions. They are picked up on the fly, so there is no need to rebuild the whole stack after every sneeze.

In some places the code in the examples is not shown in full (so as not to clutter the text), and in others it changes along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • When assembling the compose file, I relied heavily on the well-known puckel/docker-airflow image. Be sure to check it out. Maybe you don't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), and I shamelessly took advantage of that.
  • Naturally, it is not production-ready: I deliberately did not add health checks to the containers and did not bother with security. But I did the minimum needed for our experiments.
  • Note that:
    • The DAG folder must be accessible to both the scheduler and the workers.
    • The same applies to all third-party libraries: they must all be installed on the machines running the scheduler and the workers.

Well, now it's simple:

$ docker-compose up --scale worker=3

Once everything is up, you can look at the web interfaces (ports as mapped in docker-compose.yml above):

  • Airflow Webserver: http://localhost:8080/
  • Celery Flower: http://localhost:5555/

Basic concepts

If you didn't understand a thing in all these "DAGs", here is a short glossary:

  • Scheduler: the most important uncle in Airflow, making sure that robots do the hard work instead of a person. It monitors the schedule, updates DAGs and launches tasks.

    In older versions it had memory problems (no, not amnesia, leaks), and the legacy run_duration parameter, its restart interval, is still around in the configs. But now everything is fine.

  • DAG (aka "dag"): a "directed acyclic graph". That definition won't mean much to most people. In practice it is a container for tasks that interact with each other (see below), an analogue of a Package in SSIS or a Workflow in Informatica.

    Besides DAGs there can also be subDAGs, but we most likely won't get to them.

  • DAG Run: an initialized DAG that has been assigned its own execution_date. DAG Runs of the same DAG can run in parallel (provided you have made your tasks idempotent, of course).
  • Operator: a piece of code responsible for performing a specific action. There are three types of operators:
    • action, like our favorite PythonOperator, which can execute any (valid) Python code;
    • transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
    • sensor, which lets you react to an event or hold back further execution of the DAG until the event occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, let the GoogleCloudStorageToS3Operator transfer start. An inquisitive mind will ask: "Why? You can do retries right in the operator!" Well, so as not to clog the task pool with suspended operators. In reschedule mode (mode='reschedule'), a sensor starts, checks and exits until the next attempt. In the default poke mode it keeps its worker slot while waiting.
  • Task: declared operators of any type become tasks once they are attached to a DAG.
  • Task instance: when the general scheduler decides it is time to send tasks into battle on the workers (right there if we use LocalExecutor, or on a remote node with CeleryExecutor), it assigns them a context (that is, a set of variables: the execution parameters), expands the command or query templates and puts them into the queue.
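To make the "directed acyclic graph" part concrete, here is a small sketch that uses the standard library's graphlib (Python 3.9+), not Airflow. It shows the rule the scheduler has to respect: a task may start only after everything upstream of it is done, and tasks with no path between them may run in parallel. The task names are illustrative and only mimic the shape of the update_reports DAG discussed later, where five report refreshes fan in to two notifications:

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+

# Illustrative task names; each task maps to the set of upstream tasks it waits for.
reports = ['city_orders', 'client_calls', 'client_rates',
           'daily_orders', 'order_duration']
deps = {report: set() for report in reports}
deps['email_success'] = set(reports)
deps['telegram_fail'] = set(reports)


def waves(graph):
    """Group tasks into batches that could run in parallel, in dependency order."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError if the graph is not acyclic
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)  # mark the whole batch as finished
    return batches


def is_acyclic(graph):
    try:
        TopologicalSorter(graph).prepare()
        return True
    except CycleError:
        return False


for batch in waves(deps):
    print(batch)
```

The "acyclic" part matters. With a cycle there is no valid order at all, which is why graphlib refuses such a graph with CycleError. Airflow likewise refuses to load a DAG that contains a cycle.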

Generating tasks

First, let's outline the general scheme of our DAG. Then we'll go deeper and deeper into the details, because some of our solutions are non-trivial.

So, in its simplest form, such a DAG looks like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's break it down:

  • First, we import the necessary libs and a few other things;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table;
  • dag is the declaration of our DAG, which must be in globals(), otherwise Airflow will not find it. The DAG also needs to know:
    • its name, orders; this name will then appear in the web interface,
    • that it runs starting from midnight on July 8, 2020,
    • and that it should run every 6 hours (hardcore users can pass a cron string such as 0 */6 * * * instead of timedelta(), and the cool ones a preset such as @daily);
  • workflow() will do the main job, but not yet. For now, we'll just dump our context into the log.
  • And now the simple magic of creating tasks:
    • we loop through our sources;
    • we initialize a PythonOperator that will execute our dummy workflow(). Don't forget to give the task a unique name (within the DAG) and bind it to the DAG itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect with **context.
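The article does not show commons/datasources.py, so here is only a guess at its shape. This sketch assumes one Airflow connection per SQL Server instance and 50 databases per server. The conn_ids mssql_0 to mssql_2 mirror the compose service names used later. The DataSource type and the database naming scheme are hypothetical. Because the DAG uses schema as task_id, the names must be unique within the DAG:

```python
from collections import namedtuple
from typing import List

# Hypothetical shape of commons/datasources.py: (Airflow conn_id, database name) pairs
DataSource = namedtuple('DataSource', 'conn_id schema')

SERVERS = ['mssql_0', 'mssql_1', 'mssql_2']  # assumed Airflow connection ids
DBS_PER_SERVER = 50

sql_server_ds: List[DataSource] = [
    # Hypothetical naming; prefixing with the server keeps task_ids unique in the DAG
    DataSource(conn_id, f'{conn_id}_db_{n:02d}')
    for conn_id in SERVERS
    for n in range(DBS_PER_SERVER)
]

for conn_id, schema in sql_server_ds[:3]:
    print(conn_id, schema)
```

Three servers times 50 databases gives the 150 tasks that the DAG above generates in a single loop.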

That's all for now. What we got:

  • a new DAG in the web interface,
  • 150 tasks that will be executed in parallel (if Airflow, the Celery settings and the server capacity allow it).

Well, almost.

[Screenshot]
Who is going to install the dependencies?

To make all of this simpler, I wired the installation of requirements.txt on all nodes into docker-compose.yml.

Now it's off and running:

[Screenshot: task instances of the orders DAG in the Tree View]

Grey squares are task instances processed by the scheduler.

We wait a bit, and the workers snap up the tasks:

[Screenshot: the same tasks being executed]

The green ones, of course, completed their work successfully. The red ones, not so much.

By the way, there is no ./dags folder on our prod and no synchronization between machines: all the DAGs live in git on our GitLab, and GitLab CI distributes updates to the machines on merge into master.

A little about Flower

While the workers are crunching through our dummy tasks, let's recall another tool that can show us a thing or two: Flower.

The very first page, with summary information on the worker nodes:

[Screenshot: Flower, worker overview]

The most intense page, with the tasks that went to work:

[Screenshot: Flower, task list]

The most boring page, with the state of our broker:

[Screenshot: Flower, broker status]

The brightest page, with task status graphs and execution times:

[Screenshot: Flower, monitoring graphs]

Loading the underloaded

So, all the tasks have run, and you can carry away the wounded.

[Screenshot: Tree View with failed (red) task instances]

And there were plenty of wounded, for one reason or another. When Airflow is used correctly, these very squares indicate that the data definitely did not arrive.

We need to look at the log and restart the fallen task instances.

By clicking on any square, we see the actions available to us:

[Screenshot: the task instance context menu]

You can take the fallen one and Clear it. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

[Screenshot: the Clear confirmation]

Clearly, doing this with the mouse for every red square is not very humane, and that is not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

[Screenshot: the Browse/Task Instances page]

Let's select everything at once and reset it to zero by clicking the right item:

[Screenshot: the Clear action applied to the selected task instances]

After clearing, our tasks look like this (they are already waiting for the scheduler to schedule them):

[Screenshot: the cleared task instances]

Connections, hooks and other variables

Time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Gentlemen, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Everyone has done a report update at some point, right? Here it is again: there is a list of sources to take the data from, there is a list of places to put it, and don't forget to honk when everything is done or broken (well, that's not about us, no).

Let's go through the file again and look at the new, less obvious things:

  • from commons.operators import TelegramBotSendMessage: nothing prevents us from writing our own operators, and we took advantage of that by making a small wrapper for sending messages to Telegram. (We will talk more about this operator below);
  • default_args={}: a DAG can distribute the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}': the to field is not hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put into Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the email flies to the bosses only if all the dependencies have completed successfully;
  • tg_bot_conn_id='tg_main': conn_id arguments accept the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED: messages to Telegram fly off only if there are failed tasks;
  • task_concurrency=1: we forbid running several instances of the same task at once. Otherwise we could get several simultaneous runs of the same VerticaOperator (all working on one table);
  • report_update >> [email, tg]: all the VerticaOperator tasks converge on sending the email and the message, like this:
    [Screenshot: Graph View, every report task feeding into email_success and telegram_fail]

    But since the notifier operators have different trigger conditions, only one of them will run. In the Tree View, everything looks a little less clear:
    [Screenshot: the update_reports DAG in the Tree View]
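The notification logic boils down to two predicates over the states of the upstream tasks. Here is a simplified sketch, not Airflow's implementation. It looks only at final states. The real ONE_FAILED rule fires as soon as any parent fails, without waiting for the others, and the real scheduler also tracks states such as skipped:

```python
from typing import Iterable, List

# Simplified model of two Airflow trigger rules, evaluated on final upstream states only
SUCCESS, FAILED, UPSTREAM_FAILED = 'success', 'failed', 'upstream_failed'


def all_success(upstream_states: Iterable[str]) -> bool:
    return all(state == SUCCESS for state in upstream_states)


def one_failed(upstream_states: Iterable[str]) -> bool:
    return any(state in (FAILED, UPSTREAM_FAILED) for state in upstream_states)


def notifiers_to_run(upstream_states: List[str]) -> List[str]:
    """Which of the two notification tasks of update_reports would start."""
    fired = []
    if all_success(upstream_states):
        fired.append('email_success')
    if one_failed(upstream_states):
        fired.append('telegram_fail')
    return fired


print(notifiers_to_run(['success'] * 5))
print(notifiers_to_run(['success', 'failed', 'success', 'success', 'success']))
```

For any mix of success and failed states, exactly one of the two predicates holds. That is why only one notifier runs per DAG Run.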

A few words about macros and their friends, variables.

Macros are Jinja placeholders that substitute various useful information into operator arguments. For example:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expands to the value of the execution_date context variable in the YYYY-MM-DD format, e.g. 2020-07-14. The best part is that context variables are pinned to a specific task instance (a square in the Tree View), so when the instance is restarted, the placeholders expand to the same values.
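Here is a sketch of where ds comes from and why a restarted instance sees the same value: the value is derived only from the instance's execution_date. The sketch assumes Airflow 1.x scheduling semantics, where a run covering an interval starts only after that interval has ended. render() is a deliberately tiny stand-in for Jinja that only handles bare {{ name }} placeholders:

```python
import re
from datetime import datetime, timedelta


def template_context(execution_date: datetime) -> dict:
    # Two of Airflow's built-in macros, both derived from execution_date
    return {'ds': execution_date.strftime('%Y-%m-%d'),
            'ds_nodash': execution_date.strftime('%Y%m%d')}


def render(template: str, context: dict) -> str:
    # Simplified stand-in for Jinja: only bare {{ name }} placeholders are supported
    return re.sub(r'\{\{\s*(\w+)\s*\}\}',
                  lambda m: str(context[m.group(1)]), template)


def scheduled_runs(start_date: datetime, interval: timedelta, count: int):
    """(execution_date, earliest start) pairs: in Airflow 1.x a run starts
    only after the interval it covers has ended."""
    return [(start_date + n * interval, start_date + (n + 1) * interval)
            for n in range(count)]


query = "SELECT id FROM orders.payments WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
for execution_date, starts_at in scheduled_runs(datetime(2020, 7, 14), timedelta(days=1), 2):
    print(starts_at, render(query, template_context(execution_date)))
```

Note the one-interval lag: the run stamped 2020-07-14 starts no earlier than 2020-07-15, and its query still filters on 2020-07-14.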

The rendered values can be viewed with the Rendered button on each task instance. This is how the email task looks:

[Screenshot: the Rendered view of the email_success task]

And this is the task that sends the message:

[Screenshot: the Rendered view of the telegram_fail task]

The full list of built-in macros for the latest available version is here: macros reference

Moreover, plugins let us declare our own macros, but that's another story.

Besides the predefined values, we can substitute the values of our own variables (I already did this in the code above). Let's create a couple of entries in Admin/Variables:

[Screenshot: the Admin/Variables page]

Now you can use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the key you need: {{ var.json.bot_config.bot.token }}.
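Conceptually, var.json deserializes the variable, and then each dot walks one key deeper. Here is a plain-Python sketch of that walk (not Airflow's code). It also shows why the token value has to be a quoted JSON string: an unquoted 881hskdfASDA16641 is not valid JSON at all:

```python
import json
from functools import reduce

raw_value = '{"bot": {"token": "881hskdfASDA16641", "name": "Verter"}, "service": "TG"}'


def json_path(raw: str, dotted_path: str):
    """Follow a dotted path through a JSON document, one key per segment."""
    document = json.loads(raw)
    return reduce(lambda node, key: node[key], dotted_path.split('.'), document)


def is_valid_json(raw: str) -> bool:
    try:
        json.loads(raw)
        return True
    except json.JSONDecodeError:
        return False


print(json_path(raw_value, 'bot.token'))
```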

About connections I'll say literally one word and show one screenshot. Everything is elementary here: on the Admin/Connections page we create a connection and add our logins, passwords and other specific parameters. Like this:

[Screenshot: creating a connection in Admin/Connections]

Passwords can be encrypted (more thoroughly than by default). You can also leave the connection type empty, as I did for tg_main. The list of types is hardwired into the Airflow models and cannot be extended without digging into the source code (if I failed to google something, please correct me). But nothing prevents us from fetching the credentials by name alone.

You can also create several connections with the same name. In this case BaseHook.get_connection(), which fetches connections by name, returns a random one of the namesakes (Round Robin would be more logical, but let's leave that on the conscience of the Airflow developers).
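For the curious, here is the difference between what the text describes and the round robin it wishes for. The Connection tuple below is a hypothetical stand-in, not airflow.models.Connection, and pick_random() only models the described behaviour:

```python
import random
from collections import namedtuple
from itertools import cycle

# Hypothetical stand-in for several Airflow connections sharing one conn_id
Connection = namedtuple('Connection', 'conn_id host')

namesakes = [Connection('dwh', 'vertica-1'),
             Connection('dwh', 'vertica-2'),
             Connection('dwh', 'vertica-3')]


def pick_random(candidates):
    # Models the behaviour described for BaseHook.get_connection()
    return random.choice(candidates)


class RoundRobin:
    """Hand out the candidates in turn, wrapping around at the end."""

    def __init__(self, candidates):
        self._candidates = cycle(candidates)

    def pick(self):
        return next(self._candidates)


rr = RoundRobin(namesakes)
print([rr.pick().host for _ in range(4)])
```

One caveat for anyone tempted to patch this: every Airflow task runs in its own process, so an in-memory counter like this one starts over from the first connection every time. Real round robin across tasks would need state shared between processes.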

Variables and Connections are certainly cool tools, but it is important not to lose your balance: decide which parts of your flows live in the code itself and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailing address, through the UI. On the other hand, that is a return to mouse clicking, which is exactly what we (I) wanted to get rid of.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are the points where it connects to third-party services and libraries. For example, JiraHook opens a client for interacting with Jira (you can move issues back and forth), and SambaHook lets you push a local file to an smb share.

Dissecting the custom operator

And now we get to how TelegramBotSendMessage is made.

The code of commons/operators.py with the operator itself:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything else in Airflow, everything is very simple:

  • We inherit from BaseOperator, which implements quite a few Airflow-specific things (look through it at your leisure);
  • We declare template_fields, the fields in which Jinja will look for macros to process;
  • We arrange the right arguments for __init__() and set defaults where needed;
  • We don't forget to initialize the ancestor either;
  • We open the corresponding hook, TelegramBotHook, and get a client object from it;
  • We override BaseOperator.execute(), which Airflow calls when the time comes to run the operator. In it we implement the main action, not forgetting to log. (We log straight to stdout and stderr, by the way: Airflow intercepts everything, wraps it up nicely and puts it where it belongs.)
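To see what template_fields actually buys us, here is a toy model of the mechanism. Before execute() runs, only the attributes named in template_fields are rendered against the context, and everything else passes through untouched. All class and function names here are hypothetical, and render() is a minimal stand-in for Jinja that resolves dotted names through nested dicts:

```python
import re


def render(template, context):
    """Simplified stand-in for Jinja: resolves {{ a.b.c }} through nested dicts."""
    def resolve(match):
        node = context
        for key in match.group(1).split('.'):
            node = node[key]
        return str(node)
    return re.sub(r'\{\{\s*([\w.]+)\s*\}\}', resolve, template)


class SketchOperator:
    """Hypothetical mini base class mimicking how template_fields are used."""
    template_fields = ()

    def run(self, context):
        # Render only the attributes listed in template_fields, then execute
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))
        return self.execute(context)

    def execute(self, context):
        raise NotImplementedError


class SketchSendMessage(SketchOperator):
    template_fields = ('chat_id', 'message')

    def __init__(self, chat_id, message, tg_bot_conn_id='tg_bot_default'):
        self.chat_id = chat_id
        self.message = message
        self.tg_bot_conn_id = tg_bot_conn_id  # not templated: left as-is

    def execute(self, context):
        return f'Send "{self.message}" to the chat {self.chat_id}'


context = {'dag': {'dag_id': 'update_reports'},
           'var': {'value': {'failures_chat': '-1001234567890'}}}
op = SketchSendMessage('{{ var.value.failures_chat }}', '{{ dag.dag_id }} failed :(',
                       tg_bot_conn_id='{{ not_rendered }}')
print(op.run(context))
```

This is also why a macro placed in an argument that is missing from template_fields reaches the operator as a literal string.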

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here, so I'll just note the important points:

  • We inherit and think about the arguments. In most cases there will be just one: conn_id;
  • We override the standard methods. I limited myself to get_conn(), which gets the connection parameters by name and simply takes the extra section (a JSON field). That is where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • We create an instance of our TelegramBot and pass it that token.

That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().
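The hook's get_conn() relies on the Extra field actually containing bot_token. If someone forgets to add it, the task fails with a bare KeyError. Here is a sketch of a friendlier lookup. It assumes a hypothetical FakeConnection that imitates only the extra and extra_dejson attributes of an Airflow connection:

```python
import json
from dataclasses import dataclass


@dataclass
class FakeConnection:
    """Hypothetical stand-in for an Airflow connection (only what is used here)."""
    conn_id: str
    extra: str = ''

    @property
    def extra_dejson(self) -> dict:
        # Parse the Extra field; treat an empty field as an empty dict
        return json.loads(self.extra) if self.extra else {}


def bot_token_from(conn: FakeConnection) -> str:
    """Fail with a readable message instead of a bare KeyError."""
    try:
        return conn.extra_dejson['bot_token']
    except KeyError:
        raise ValueError(
            f'Connection {conn.conn_id!r} has no "bot_token" in Extra') from None


print(bot_token_from(FakeConnection('tg_main', '{"bot_token": "YOuRAwEsomeBOtToKen"}')))
```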

And the second part of the file, where I write a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of one sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

The right way would be to package all of this (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, put it in a public repository and give it to Open Source.

While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. Let me go and check what's wrong...

[Screenshot: the failure notification in Telegram]
Something broke in our DAG! Isn't that what we were waiting for? Exactly!

So, are we pouring?

Do you feel I missed something? Didn't he promise to transfer data from SQL Server to Vertica, and then just wander off-topic, the scoundrel!

This atrocity was intentional: I simply had to decipher some terminology for you. Now we can move on.

Our plan was this:

  1. Create a DAG
  2. Generate tasks
  3. See how beautiful everything is
  4. Assign session numbers to the loads
  5. Get data from SQL Server
  6. Put data into Vertica
  7. Collect statistics

So, to get all this up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we bring up:

  • Vertica as the dwh host with the most default settings,
  • three SQL Server instances,
  • and we fill the databases on the latter with some data (under no circumstances look into mssql_init.py!)

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

To see what our miracle randomizer generated, use the Data Profiling/Ad Hoc Query item:

[Screenshot: Data Profiling/Ad Hoc Query]
The main thing is not to show it to analysts

I won't elaborate on ETL sessions, everything is trivial there: we create a database with a table in it, wrap everything in a context manager, and now we do this:

with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15
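If you want to play with the session idea without Postgres, here is a hypothetical SQLite analogue of the Session class below. Opening inserts a row, leaving the with-block updates it, and an exception marks the session as failed. It uses only the standard sqlite3 module and simplifies the schema (TEXT timestamps, no separate _create() step). It is a sketch for experiments, not the code from the repository:

```python
import sqlite3


class SqliteSession:
    """Hypothetical SQLite analogue of Session, for local experiments only."""

    def __init__(self, connection, task_name):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None
        self.comment = None
        self.connection.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                task_name   TEXT NOT NULL,
                started     TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                finished    TEXT,
                successful  INTEGER,
                loaded_rows INTEGER,
                comment     TEXT)""")

    def __enter__(self):
        # Open the session: one row per ETL run, finished stays NULL for now
        cursor = self.connection.execute(
            'INSERT INTO sessions (task_name) VALUES (?)', (self.task_name,))
        self.id = cursor.lastrowid
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # The with-block raised: record the failure
            self.successful = False
            self.comment = f'{exc_type.__name__}: {exc_val}'
        self.connection.execute(
            """UPDATE sessions
               SET finished = CURRENT_TIMESTAMP, successful = ?,
                   loaded_rows = ?, comment = ?
               WHERE id = ?""",
            (self.successful, self.loaded_rows, self.comment, self.id))
        self.connection.commit()
        return False  # never swallow the original exception


conn = sqlite3.connect(':memory:')

with SqliteSession(conn, 'orders') as session:
    session.loaded_rows = 15
    session.successful = True

try:
    with SqliteSession(conn, 'orders') as session:
        raise RuntimeError('source is down')
except RuntimeError:
    pass

rows = conn.execute('SELECT task_name, successful, loaded_rows, comment '
                    'FROM sessions ORDER BY id').fetchall()
print(rows)
```

Returning False from __exit__ is the important design choice here: the session records the failure, but the exception still propagates, so Airflow marks the task as failed as well.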

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
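To try the context-manager logic without a PostgreSQL server, here is a minimal sketch of the same idea on top of the standard-library sqlite3 module. It is a stand-in under stated assumptions, not the author's session.py: the table layout is simplified and SQLite's CURRENT_TIMESTAMP replaces the TIMESTAMPTZ defaults.

```python
import sqlite3
import sys


class Session:
    """Simplified, sqlite3-backed stand-in for session.py (assumption: SQLite, not PostgreSQL)."""

    def __init__(self, connection, task_name):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        # Create the bookkeeping table on first use, then register a new session row
        self.connection.execute(
            "CREATE TABLE IF NOT EXISTS sessions ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " task_name TEXT NOT NULL,"
            " started TEXT DEFAULT CURRENT_TIMESTAMP,"
            " finished TEXT,"
            " successful INTEGER,"
            " loaded_rows INTEGER,"
            " comment TEXT)"
        )
        cursor = self.connection.execute(
            "INSERT INTO sessions (task_name) VALUES (?)", (self.task_name,)
        )
        self.id = cursor.lastrowid
        self.connection.commit()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # Any exception inside the with-block marks the session as failed
            self.successful = False
            self.comment = f"{exc_type.__name__}: {exc_val}"
            print(self.comment, file=sys.stderr)
        self.connection.execute(
            "UPDATE sessions SET finished = CURRENT_TIMESTAMP, successful = ?,"
            " loaded_rows = ?, comment = ? WHERE id = ?",
            (self.successful, self.loaded_rows, self.comment, self.id),
        )
        self.connection.commit()
        return False  # never swallow the exception
```

With a connection from sqlite3.connect(':memory:'), a clean exit stores successful = 1 and the row count, while an exception stores 0 plus the exception text and is then re-raised to the caller.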

Time to collect our data from our hundred and fifty tables. Let's do it with a few very modest lines:

import pandas as pd
from airflow.hooks.mssql_hook import MsSqlHook  # Airflow 1.10 import path

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

  1. Using the hook, we get a pymssql connection from Airflow.
  2. We substitute a date restriction into the query; the template engine will pass the date into the function.
  3. We feed our query to pandas, which gets us a DataFrame; it will come in handy later.

I substitute {dt} directly instead of using the query parameter %s not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and passes it params as a List, whereas pymssql really wants a tuple.
Also note that the pymssql developer decided to stop supporting it, so it's time to move to pyodbc.
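Because the date is spliced straight into the SQL text, it is worth making sure that nothing except a real date can end up there. A minimal sketch, assuming dt arrives as the YYYY-MM-DD string that Airflow's {{ ds }} macro renders; render_orders_query is a hypothetical helper name, not part of the original code:

```python
from datetime import date


def render_orders_query(dt: str) -> str:
    """Build the Orders query for one day, refusing anything that isn't a date."""
    # date.fromisoformat raises ValueError on malformed input, and isoformat()
    # re-renders the parsed value, so only a real calendar date reaches the SQL
    day = date.fromisoformat(dt)
    return (
        "SELECT id, start_time, end_time, type, data\n"
        "FROM dbo.Orders\n"
        f"WHERE CONVERT(DATE, start_time) = '{day.isoformat()}'"
    )
```

Anything like a quote-and-semicolon payload fails at parsing time, before a connection is even touched.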

Let's see what Airflow stuffed into the arguments of our functions:


If there's no data, there's no point in continuing. But it would also be strange to count the load as successful. Yet it isn't an error either. Argh, what to do?! Here's what:

from airflow.exceptions import AirflowSkipException

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there was no error, but the task is being skipped. In the interface its square will be neither green nor red, but pink.
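The three-way outcome can be illustrated without Airflow at all. In this toy sketch SkipTask and run_task are hypothetical stand-ins for AirflowSkipException and the scheduler; real Airflow does far more (retries, callbacks, state in the metadatabase), and only the success/skipped/failed distinction is modelled here:

```python
class SkipTask(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowSkipException."""


def run_task(callable_, *args):
    """Toy runner mapping a task's outcome to a state name."""
    try:
        callable_(*args)
    except SkipTask:
        return "skipped"  # not an error, just nothing to do
    except Exception:
        return "failed"
    return "success"


def load(rows):
    # Mirrors the df.empty check: no data means skip, not fail
    if not rows:
        raise SkipTask("No rows to load")
    return len(rows)
```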

Let's add a few columns to our data:

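from pandas.util import hash_pandas_object

df['etl_source'] = src_schema
df['etl_id'] = session.id
# index=False: hash only the column values, not the DataFrame's row index
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']], index=False)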

That is:

  • the database we took the orders from,
  • the ID of our load session (it will be different for every task),
  • a hash of the source and the order ID, so that in the final database (where everything goes into a single table) we have a unique order ID.
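If the hash has to fit a signed 64-bit INT column (as hash_id does in the target table below), keep in mind that hash_pandas_object returns unsigned 64-bit values, which may not fit. One possible alternative, not the author's approach, is a deterministic per-row hash built with the standard library; order_hash_id is a hypothetical name:

```python
import hashlib


def order_hash_id(etl_source: str, order_id: int) -> int:
    """Deterministic signed 64-bit key for (source database, order id)."""
    # The separator keeps ("ab", 1) and ("a", "b1")-style inputs from colliding
    payload = f"{etl_source}|{order_id}".encode("utf-8")
    digest = hashlib.blake2b(payload, digest_size=8).digest()
    # Interpret the 8 bytes as a signed integer so it fits a 64-bit INT column
    return int.from_bytes(digest, "big", signed=True)
```

The value depends only on the source and the order ID, so the same order always gets the same key, whichever load produced it. In pandas it could be applied row-wise, at the cost of being slower than the vectorized hash.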

The penultimate step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is via CSV!

import csv
from io import StringIO

from airflow.contrib.hooks.vertica_hook import VerticaHook  # Airflow 1.10 import path

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# Join the column names into plain SQL (a Python list repr is not valid SQL)
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

  1. We create a special receiver, a StringIO.
  2. pandas kindly puts our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with copy(), we send our data straight into Vertica!
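The same CSV dialect can be reproduced with the standard csv module, which is handy for checking the format without pandas or a Vertica server. A sketch, assuming None should be written as the NUL marker declared in the COPY statement; rows_to_copy_buffer and build_copy_statement are hypothetical helpers:

```python
import csv
from io import StringIO

NULL_MARKER = "NUL"


def rows_to_copy_buffer(rows):
    """Serialize rows into the pipe-delimited format the COPY statement expects."""
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter="|", quoting=csv.QUOTE_MINIMAL,
                        doublequote=False, escapechar="\\", lineterminator="\n")
    for row in rows:
        # None becomes the NULL marker declared in the COPY statement
        writer.writerow([NULL_MARKER if value is None else value for value in row])
    buffer.seek(0)  # rewind so the consumer reads from the start
    return buffer


def build_copy_statement(table, columns):
    """Render the column list as plain SQL, not as a Python list repr."""
    column_list = ", ".join(columns)
    return (f"COPY {table} ({column_list}) FROM STDIN "
            f"DELIMITER '|' ENCLOSED '\"' ABORT ON ERROR NULL '{NULL_MARKER}'")
```

A field containing the delimiter gets wrapped in double quotes, which is what ENCLOSED '"' tells Vertica to expect.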

We'll ask the driver how many rows were loaded and tell the session manager that everything is fine:

session.loaded_rows = cursor.rowcount
session.successful = True

That's it.

In production we create the target table by hand. Here I allowed myself a little automation:

from airflow.contrib.operators.vertica_operator import VerticaOperator  # Airflow 1.10 import path

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

I use VerticaOperator() to create the database schema and the table (if they don't exist yet, of course). The main thing is to set up the dependencies correctly:

from airflow.operators.python_operator import PythonOperator  # Airflow 1.10 import path

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load
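The loop builds a simple fan-out: one create_target task followed by one load task per source database. A sketch that models only that graph (not Airflow itself) with the standard-library graphlib module, which needs Python 3.9+; plan_runs is a hypothetical helper:

```python
from graphlib import TopologicalSorter


def plan_runs(sources):
    """Model `create_table >> load` for every source and return a valid run order."""
    graph = TopologicalSorter()
    for _conn_id, schema in sources:
        # Each load task (named after its schema) depends on create_target
        graph.add(schema, "create_target")
    return list(graph.static_order())
```

create_target always comes first; the loads after it have no order among themselves, which is why Airflow is free to run them in parallel.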

Summing up

"So," said the little mouse, "now
are you sure that I'm the scariest beast in the forest?"

Julia Donaldson, The Gruffalo

I think that if my colleagues and I held a competition to see who can create and launch an ETL process from scratch faster, they with their SSIS and a mouse and me with Airflow... and then also compared ease of maintenance... Wow, I think you'll agree that I'd beat them on all fronts!

More seriously, Apache Airflow, by describing processes as program code, has made my work much more comfortable and enjoyable.

Its unlimited extensibility, both through plug-ins and through its predisposition to scaling, lets you use Airflow in almost any area: the full cycle of collecting, preparing and processing data, or even launching rockets (to Mars, of course).

The final part: reference and information

The rakes we've stepped on so you don't have to

  • start_date. Yes, this is already a local meme. Everyone trips over start_date, the DAG's main argument. In short: if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow at the earliest. So set a fixed date instead:
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is another runtime error related to it: Task is missing the start_date parameter, which most often means you forgot to bind the operator to the dag.

  • Everything on one machine. Yes: the databases (Airflow's own and our own), the web server, the scheduler and the workers. And it worked. But over time the number of tasks grew, and when PostgreSQL started answering index queries in 20 s instead of 5 ms, we picked it up and moved it off that machine.
  • LocalExecutor. Yes, we are still on it, and we're already close to its limits. LocalExecutor has been enough so far, but now it's time to add at least one worker, and we'll have to work hard to move to CeleryExecutor. And since Celery can also run on a single machine, nothing stops you from using it even on a server that "of course, will never go to production, honestly!"
  • Not using the built-in tools:
    • Connections for storing service credentials,
    • SLA Misses for reacting to tasks that didn't finish on time,
    • XCom for exchanging metadata (I said metadata!) between DAG tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of every failed task. Now my work Gmail holds more than 90k emails from Airflow, and the web mail interface refuses to select and delete more than 100 at a time.
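The start_date rule is easy to check with plain datetime arithmetic. A minimal sketch, assuming the Airflow 1.x behaviour described above and a timedelta schedule_interval (cron-style schedules align execution dates to the cron grid, which this ignores); first_run_at is a hypothetical helper:

```python
from datetime import datetime, timedelta


def first_run_at(start_date: datetime, interval: timedelta):
    """Return (execution_date, earliest trigger moment) of the first DAG run."""
    # The run labelled with execution_date covers
    # [execution_date, execution_date + interval) and is only triggered
    # once that interval has ended
    execution_date = start_date
    return execution_date, execution_date + interval
```

For datetime(2020, 7, 7, 0, 1, 2) with a one-day interval, the first run is labelled July 7 but starts on July 8. If start_date were datetime.now(), the trigger moment would keep sliding forward as the DAG file is re-parsed, which is why a fixed date in the past is the safer choice.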

More pitfalls: Apache Airflow Pitfails

More automation tools

To let us work even more with our heads and less with our hands, Airflow has prepared the following:

  • REST API: it still has Experimental status, which doesn't stop it from working. With it you can not only get information about DAGs and tasks, but also pause/unpause a DAG, create a DAG Run or a pool.
  • CLI: many tools are available from the command line that are not just inconvenient to use through the WebUI, but are mostly absent there altogether. For example:
    • backfill, needed to re-run task instances.
      For example, the analysts come and say: "Comrade, you've got garbage in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you go:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Database maintenance: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance and even ignore all of its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does pretty much the same, except that it writes nothing to the database.
    • connections allows mass creation of connections from the shell.
  • Python API: a rather hardcore way of interacting, meant for plugins rather than for poking around in by hand. But who's going to stop us from going to /home/airflow/dags, running ipython and messing around? You can, for example, export all connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadatabase. I don't recommend writing to it, but reading task states for various specific metrics can be much faster and easier than through any of the APIs.

    Say not all of our tasks are idempotent, but they may occasionally fail, and that's normal. Several failures in a row, however, are already suspicious and worth checking.

    Beware, SQL!

    WITH last_executions AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
        FROM public.task_instance
        WHERE
            execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
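To make the query's logic easier to follow, here is the same "trailing streak of failures" computation in plain Python: a task is reported only if its most recent runs are failures, and only that uninterrupted streak is counted. A sketch, assuming the rows are (task_id, dag_id, execution_date, state) tuples already filtered to the last two days; failure_streaks is a hypothetical helper:

```python
from collections import defaultdict

FAIL_STATES = {"failed", "up_for_retry"}


def failure_streaks(task_instances):
    """Return {(task_id, dag_id): (unsuccessful, failed, up_for_retry)}.

    Only tasks whose newest runs are failures are included, like the
    HAVING count(last_fail_seq) > 0 clause of the query.
    """
    history = defaultdict(list)
    for task_id, dag_id, execution_date, state in task_instances:
        history[(task_id, dag_id)].append((execution_date, state))

    result = {}
    for key, runs in history.items():
        failed = up_for_retry = 0
        # Walk from the newest run backwards and stop at the first non-failure
        for _, state in sorted(runs, reverse=True):
            if state not in FAIL_STATES:
                break
            if state == "failed":
                failed += 1
            else:
                up_for_retry += 1
        if failed + up_for_retry > 0:
            result[key] = (failed + up_for_retry, failed, up_for_retry)
    return result
```

The SQL reaches the same result differently: a failed row belongs to the trailing streak exactly when its position among all runs (rn) equals its position among failed runs only.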

References

And of course, the first ten links in Google's results: the contents of the Airflow folder in my bookmarks.

And the links used in the article:

Source: www.habr.com
