Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.

I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you are not involved in data flows at all but simply need to launch some processes on a schedule and monitor how they run.

And yes, I will not only tell but also show: the article has plenty of code, screenshots and recommendations.

[Image: what you usually see when you google the word "Airflow" / Wikimedia Commons]

Introduction

Apache Airflow is just like Django:

  • it is written in Python,
  • it has a great admin panel,
  • it is extensible without limit

- only better, and made for completely different purposes, namely (as written before the cut):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience will allow)
  • with dynamic workflow generation from Python code that is very easy to write and understand
  • and the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which are extremely simple to write).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
  • as an advanced cron, which starts the data consolidation processes on the ODS and also monitors their maintenance.

Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this runs:

  • more than 200 DAGs (workflows, really, into which we stuffed the tasks),
  • each with 70 tasks on average,
  • and all this goodness starts (again on average) once an hour.

How we scaled out is described further below; for now, let's define the über-problem that we are going to solve:

There are three source SQL Servers, each with 50 databases. The databases are instances of one project, so they all have the same structure (almost everywhere, mua-ha-ha), which means that each of them has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively throw it into, say, Vertica.
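The "add service fields" step can be pictured in a few lines of plain Python. This is only a sketch under assumptions: the field names `source_server`, `source_db` and `etl_id`, the helper `add_service_fields` and the sample rows are all made up for illustration and are not taken from the project.

```python
from typing import Dict, Iterable, Iterator


def add_service_fields(rows: Iterable[Dict], source_server: str,
                       source_db: str, etl_id: int) -> Iterator[Dict]:
    """Yield each source row extended with the three service fields."""
    for row in rows:
        # Hypothetical field names; a new dict is built so the
        # original row is left untouched.
        yield {**row,
               'source_server': source_server,
               'source_db': source_db,
               'etl_id': etl_id}


orders = [{'id': 1, 'amount': 10.0}, {'id': 2, 'amount': 25.5}]
tagged = list(add_service_fields(orders, 'mssql_0', 'project_07', etl_id=42))
print(tagged[0])
```

With these fields in place, rows from 150 identically shaped tables can land in one target table and still be traced back to their origin and to the load that brought them.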

Let's go!

The main part, practical (and a little theoretical)

Why we (and you) need it

When the trees were big and I was a simple SQL guy at a Russian retailer, we cranked out ETL processes, aka data flows, using the two tools available to us:

  • Informatica Power Center - an extremely widespread, extremely productive system, with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? Well, first of all, its interface, somewhere out of the 2000s, put mental pressure on us. Secondly, this contraption is designed for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. About the fact that it costs like the wing of an Airbus A380 per year, we will say nothing.

    Beware, the screenshot may slightly hurt people under 30

    [Screenshot: the Informatica interface]

  • SQL Server Integration Services - we used this comrade in our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything in it is good: the interface is pretty, and so are the progress reports... But that is not why we love software products, oh, not for that. We can version a dtsx file (which is XML with nodes shuffled on every save), but what's the point? How about building a package that drags a hundred tables from one server to another? Never mind a hundred: your index finger will fall off after twenty, from clicking the mouse button. But it definitely looks more fashionable:

    [Screenshot: an SSIS package]

We certainly looked for ways out. It even almost came to a self-written SSIS package generator...

...and then a new job found me. And Apache Airflow caught up with me there.

When I found out that ETL process descriptions are plain Python code, I nearly danced for joy. This is how data flows became versioned and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code that fits on one and a half or two 13" screens.

Assembling the cluster

Let's not set up a complete kindergarten here and talk about the obvious things, like installing Airflow, your database of choice, Celery and other cases described in the docs.

So that we can start experimenting right away, I sketched a docker-compose.yml in which:

  • We actually bring up Airflow: the Scheduler and the Webserver. Flower will also be spinning there to monitor Celery tasks (because it is already bundled into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, where Airflow will write its service information (scheduler data, execution statistics, etc.) and Celery will mark completed tasks;
  • Redis, which will act as the task broker for Celery;
  • A Celery worker, which will do the actual execution of tasks.
  • Into the folder ./dags we will put our files with the DAG descriptions. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.

In some places the code in the examples is not shown in full (so as not to clutter the text), and in some places it is modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • When putting the compose file together, I relied heavily on the well-known image puckel/docker-airflow - be sure to check it out. Maybe you don't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I shamelessly took advantage of.
  • Naturally, it is not production-ready: I deliberately did not put health checks on the containers, and I did not bother with security. But I did the minimum suitable for our experiments.
  • Note that:
    • The DAG folder must be accessible to both the scheduler and the workers.
    • The same applies to all third-party libraries - they must be installed on every machine that runs the scheduler or a worker.
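The variable names in the compose file follow Airflow's `AIRFLOW__{SECTION}__{KEY}` convention, where the section and key correspond to entries in airflow.cfg. The helper below is a hypothetical illustration of that naming rule only; it is not how Airflow itself reads its configuration.

```python
from typing import Optional, Tuple


def parse_airflow_env(name: str) -> Optional[Tuple[str, str]]:
    """Split AIRFLOW__{SECTION}__{KEY} into (section, key), lower-cased."""
    prefix = 'AIRFLOW__'
    if not name.startswith(prefix):
        return None
    # The first double underscore after the prefix separates section and key.
    section, sep, key = name[len(prefix):].partition('__')
    if not sep or not section or not key:
        return None
    return section.lower(), key.lower()


for var in ('AIRFLOW__CORE__EXECUTOR', 'AIRFLOW__CELERY__BROKER_URL'):
    print(var, '->', parse_airflow_env(var))
```

So `AIRFLOW__CORE__EXECUTOR: CeleryExecutor` plays the role of `executor = CeleryExecutor` in the `[core]` section of airflow.cfg.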

Well, now it's simple:

$ docker-compose up --scale worker=3

Once everything is up, you can look at the web interfaces: the compose file above publishes the Airflow webserver on port 8080 and Flower on port 5555.

Basic concepts

If you didn't understand anything in all these "dags", here is a short glossary:

  • Scheduler - the most important guy in Airflow, who makes sure that the robots work hard, and not a person: it watches the schedule, refreshes the DAGs and launches tasks.

    In older versions it had problems with memory (no, not amnesia, but leaks), and the legacy parameter run_duration, its restart interval, even remained in the configs. But now everything is fine.

  • DAG (aka "dag") - a "directed acyclic graph", but that definition will mean little to most people; in practice it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.

    Besides DAGs there may also be subdags, but we most likely won't get to them.

  • DAG Run - an initialized DAG that has been assigned its own execution_date. DAG runs of the same DAG can work in parallel (if you made your tasks idempotent, of course).
  • Operator - a piece of code responsible for performing a specific action. There are three types of operators:
    • action, like our favorite PythonOperator, which can execute any (valid) Python code;
    • transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
    • sensor, on the other hand, lets you react to an event or hold back further execution of the DAG until it occurs. HttpSensor can poll a given endpoint and, once the desired response arrives, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? After all, you can do the retries right in the operator!" The answer: so as not to clog the task pool with suspended operators. The sensor starts, checks and dies before the next attempt.
  • Task - declared operators, regardless of type, are promoted to the rank of task once they are attached to a DAG.
  • Task instance - when the general (the scheduler) decides it is time to send tasks into battle on the worker nodes (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (that is, a set of variables - the execution parameters), expands the command or query templates, and pools them.
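To make "directed acyclic graph" a little more tangible, here is a small sketch that uses only the standard library (Python 3.9+) and has nothing to do with Airflow's own classes. The task names are invented; the point is only that dependencies between tasks determine a valid execution order.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key is a task; its value is the set of tasks it has to wait for.
# The task names are made up for illustration.
dependencies = {
    'extract_orders': set(),
    'load_to_dwh': {'extract_orders'},
    'update_report': {'load_to_dwh'},
    'notify': {'update_report'},
}

# static_order() yields the tasks so that every task comes after
# everything it depends on.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

If the dependencies contained a loop, `TopologicalSorter` would raise `graphlib.CycleError`, which is the "acyclic" part of the name: a workflow that waits on itself can never start.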

Generating tasks

First, let's outline the general shape of our DAG, and then we will dive deeper and deeper into the details, because we apply some non-trivial solutions.

So, in its simplest form, such a DAG will look like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's figure it out:

  • First, we import the necessary libs and something else;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table;
  • dag - the declaration of our DAG, which must end up in globals(), otherwise Airflow won't find it. The DAG also needs to say:
    • what its name is, orders - this name will appear in the web interface,
    • that it will work starting from midnight on the eighth of July,
    • and that it should run roughly every 6 hours (tough guys can use a cron line such as 0 */6 * * * here instead of timedelta(); the less cool can use an expression like @daily);
  • workflow() will do the main job, but not now. For now, we'll just dump our context into the log.
  • And now the simple magic of creating tasks:
    • we run through our sources;
    • we initialize a PythonOperator, which will execute our dummy workflow(). Don't forget to specify a unique (within the DAG) task name and to bind the DAG itself. The flag provide_context, in turn, pours additional arguments into the function, which we carefully collect using **context.
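The module commons.datasources is not shown in this article, so the following is only a guess at its shape: a sketch that builds a list of named tuples for three servers with fifty databases each. The tuple name `DataSource`, the connection ids and the schema naming pattern are all assumptions.

```python
from collections import namedtuple

# Hypothetical layout: three connection ids and fifty schemas per server.
DataSource = namedtuple('DataSource', 'conn_id schema')

SERVERS = ['mssql_0', 'mssql_1', 'mssql_2']
SCHEMAS_PER_SERVER = 50

sql_server_ds = [
    DataSource(conn_id, f'{conn_id}_db_{n:02d}')
    for conn_id in SERVERS
    for n in range(SCHEMAS_PER_SERVER)
]

# task_id must be unique within a DAG, so schema names must not repeat.
assert len({ds.schema for ds in sql_server_ds}) == len(sql_server_ds)

# Named tuples unpack just like the loop in the DAG file expects.
for conn_id, schema in sql_server_ds[:3]:
    print(conn_id, schema)
```

Because the DAG file uses the schema as the task_id, any such list has to keep schema names unique across servers, which is why the sketch prefixes them with the connection id.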

That's all for now. What we got:

  • a new DAG in the web interface,
  • a hundred and fifty tasks that will be executed in parallel (if the Airflow and Celery settings and the server capacity allow it).

Well, almost got it.

[Screenshot, captioned: "Who is going to install the dependencies for you?"]

To simplify the whole thing, I wired the processing of requirements.txt on all nodes into docker-compose.yml.

Now we're off:

[Screenshot]

Gray squares are task instances processed by the scheduler.

We wait a bit, and the tasks are snapped up by the workers:

[Screenshot]

The green ones, of course, have completed their work successfully. The red ones were not so successful.

By the way, there is no ./dags folder on our prod and no synchronization between machines: all DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.

A little about Flower

While the workers are thrashing our dummies, let's remember another tool that can show us something - Flower.

The very first page, with summary information on the worker nodes:

[Screenshot]

The most intense page, with the tasks that went to work:

[Screenshot]

The most boring page, with the status of our broker:

[Screenshot]

The brightest page, with graphs of task statuses and their execution time:

[Screenshot]

Loading the underloaded

So, all the tasks have run, and the wounded can be carried away.

[Screenshot]

And there were many wounded, for one reason or another. When Airflow is used correctly, these very squares mean that the data definitely did not arrive.

You need to look at the log and restart the fallen task instances.

By clicking on any square, we see the actions available to us:

[Screenshot]

You can take the fallen and Clear them. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

[Screenshot]

Clearly, doing this with the mouse for every red square is not very humane, and it is not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

[Screenshot]

Let's select everything at once and reset it to zero by clicking the right item:

[Screenshot]

After clearing, our tasks look like this (they are already waiting for the scheduler to schedule them):

[Screenshot]

Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone done a report update at some point? Here it is again: there is a list of sources to take the data from; there is a list of places to put it; and don't forget to honk when everything has happened or broken (well, the latter is not about us, no).

Let's go through the file again and look at the new obscure things:

  • from commons.operators import TelegramBotSendMessage - nothing prevents us from making our own operators, which we took advantage of by writing a small wrapper for sending messages to Telegram. (We'll talk more about this operator below);
  • default_args={} - a DAG can hand the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}' - the to field is not hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put into Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - the condition for starting the operator. In our case, the letter flies to the bosses only if all dependencies finished successfully;
  • tg_bot_conn_id='tg_main' - conn_id arguments accept the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - messages in Telegram fly out only if there are fallen tasks;
  • task_concurrency=1 - we prohibit the simultaneous launch of several task instances of one task. Otherwise, we would get several VerticaOperator instances running at once (against the same table);
  • report_update >> [email, tg] - all the VerticaOperator tasks converge on sending the letter and the message, like this:
    [Screenshot: Graph View]

    But since the notifier operators have different launch conditions, only one of them will fire. In the Tree View, everything looks a little less visual:
    [Screenshot: Tree View]
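Why only one of the two notification tasks fires can be shown with a toy model of the two trigger rules. This is a plain-Python sketch of the idea, not Airflow's scheduler logic; the state strings and function names are chosen for illustration.

```python
from typing import Iterable


def all_success(upstream_states: Iterable[str]) -> bool:
    """True when every upstream task finished with state 'success'."""
    return all(state == 'success' for state in upstream_states)


def one_failed(upstream_states: Iterable[str]) -> bool:
    """True as soon as at least one upstream task has state 'failed'."""
    return any(state == 'failed' for state in upstream_states)


good_run = ['success'] * 5
bad_run = ['success', 'success', 'failed', 'success', 'success']

for name, states in (('good', good_run), ('bad', bad_run)):
    print(name, 'email:', all_success(states), 'telegram:', one_failed(states))
```

For any set of finished upstream tasks the two conditions are mutually exclusive: either everything succeeded and the email goes out, or something failed and Telegram gets the message.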

I will say a few words about macros and their friends - variables.

Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), so when the instance is restarted, the placeholders expand to the same values.
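The property that matters here, that a re-run renders the same query, can be illustrated without Airflow or Jinja at all. In the sketch below a plain string replacement stands in for the real template engine, and `render_ds` is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime

QUERY = "SELECT id FROM orders.payments WHERE payment_dtm::DATE = '{{ ds }}'::DATE"


def render_ds(template: str, execution_date: datetime) -> str:
    """Replace the {{ ds }} placeholder with execution_date as YYYY-MM-DD."""
    ds = execution_date.strftime('%Y-%m-%d')
    # str.replace is a stand-in for Jinja rendering in this sketch.
    return template.replace('{{ ds }}', ds)


execution_date = datetime(2020, 7, 14, 6, 0)
first_try = render_ds(QUERY, execution_date)
retry = render_ds(QUERY, execution_date)  # a later re-run of the same instance

print(first_try)
```

Because the value comes from the task instance's execution_date rather than from the wall clock, clearing and re-running a task from last week reloads last week's slice of data, not today's.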

The substituted values can be viewed using the Rendered button on each task instance. This is how it looks for the task that sends the letter:

[Screenshot]

And this is how it looks for the task that sends the message:

[Screenshot]

The complete list of built-in macros for the latest available version is here: macros reference

Moreover, with the help of plugins we can declare our own macros, but that's another story.

In addition to the predefined things, we can substitute the values of our own variables (I have already used this in the code above). Let's create a couple of them in Admin/Variables:

[Screenshot]

That's it, they can be used:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the JSON case:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
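What a dotted path like `bot.token` means for a JSON value can be sketched with the standard json module. The `resolve` helper is hypothetical and only mimics the lookup; in a real DAG the template engine does this for you.

```python
import json
from functools import reduce

raw_value = '{"bot": {"token": "881hskdfASDA16641", "name": "Verter"}, "service": "TG"}'


def resolve(path: str, document: dict):
    """Follow a dotted path such as 'bot.token' through nested dicts."""
    return reduce(lambda node, key: node[key], path.split('.'), document)


bot_config = json.loads(raw_value)
print(resolve('bot.name', bot_config))
print(resolve('service', bot_config))
```

Note that the stored value has to be valid JSON for this to work, so string values such as the token must be quoted.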

I will say literally one word and show one screenshot about connections. Everything is elementary here: on the page Admin/Connections we create a connection and add our logins / passwords and more specific parameters there. Like this:

[Screenshot]

Passwords can be encrypted (more thoroughly than by default), and you can leave the connection type out (as I did for tg_main): the list of types is hardwired into the Airflow models and cannot be extended without digging into the source code (if I failed to google something, please correct me), but nothing stops us from getting the credentials simply by name.

You can also create several connections with the same name: in that case the method BaseHook.get_connection(), which fetches connections by name, will return a random one of the namesakes (Round Robin would be more logical, but let's leave that on the conscience of the Airflow developers).
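The "random one of several namesakes" behaviour is easy to picture with a toy registry. The sketch below imitates the behaviour described above with made-up hosts; it is not the code of BaseHook.get_connection().

```python
import random
from collections import Counter, namedtuple

Connection = namedtuple('Connection', 'conn_id host')

# Two hypothetical connections registered under the same conn_id.
connections = [
    Connection('dwh', 'vertica-node-1'),
    Connection('dwh', 'vertica-node-2'),
    Connection('tg_main', 'api.telegram.org'),
]


def get_connection(conn_id: str) -> Connection:
    """Return a random connection among those sharing conn_id."""
    candidates = [c for c in connections if c.conn_id == conn_id]
    if not candidates:
        raise LookupError(f"The conn_id '{conn_id}' isn't defined")
    return random.choice(candidates)


random.seed(0)  # only to make the demo repeatable
hits = Counter(get_connection('dwh').host for _ in range(1000))
print(hits)
```

Over many calls the load spreads roughly evenly between the namesakes, but nothing guarantees that two consecutive calls alternate, which is the difference from Round Robin.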

Variables and Connections are certainly cool tools, but it's important not to lose the balance: which parts of your flows you keep in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to change a value quickly through the UI, a mailbox for example. On the other hand, this is still a return to the mouse click, which we (I) wanted to get rid of.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for connecting it to third-party services and libraries. For example, JiraHook will open a client for us to interact with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb share.

Dissecting a custom operator

And now we have come close to looking at how TelegramBotSendMessage is made.

The code in commons/operators.py with the operator itself:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, like everything else in Airflow, everything is very simple:

  • We inherited from BaseOperator, which implements quite a few Airflow-specific things (take a look at your leisure)
  • We declared the fields in template_fields, in which Jinja will look for macros to process.
  • We arranged the right arguments for __init__() and set defaults where necessary.
  • We didn't forget to initialize the ancestor either.
  • We opened the corresponding hook TelegramBotHook and received a client object from it.
  • We overrode the method BaseOperator.execute(), which Airflow will call when the time comes to launch the operator - in it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr - Airflow will intercept everything, wrap it nicely and put it where it belongs.)

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here, so I'll just note the important points:

  • We inherit and think about the arguments - in most cases there will be just one: conn_id;
  • We override the standard methods: I limited myself to get_conn(), in which I fetch the connection parameters by name and take only the extra section (this is a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, giving it that token.

That's all. You can get the client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().

And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of the single method sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

The right thing to do is to put it all together: TelegramBotSendMessage, TelegramBotHook, TelegramBot - into a plugin, publish it in a public repository and give it to Open Source.
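One small detail of the TelegramBot wrapper above is worth a quick check before packaging it: the trailing slash in API_ENDPOINT. As far as I know, BaseUrlSession resolves relative paths against its base URL with urljoin semantics, so the standard library is enough to see why the slash matters. The token below is the same placeholder used in the docstrings.

```python
from urllib.parse import urljoin

API_ENDPOINT = 'https://api.telegram.org/bot{}/'

base_url = API_ENDPOINT.format('YOuRAwEsomeBOtToKen')

# With the trailing slash the method name is appended to the bot path.
with_slash = urljoin(base_url, 'sendMessage')
print(with_slash)

# Without it, the last path segment (the one holding the token) is replaced.
without_slash = urljoin(base_url.rstrip('/'), 'sendMessage')
print(without_slash)
```

Dropping the slash would silently send requests to a URL without the token, so it is a good candidate for a unit test if the wrapper ever becomes a plugin.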

While we were studying all this, our report update managed to fail successfully and send me an error message in the channel. I'll go and check what is wrong...

[Screenshot, captioned: "Something broke in our DAG! Isn't that what we were expecting? Exactly!"]

Are you going to load anything?

Do you feel that I missed something? He seemed to promise to transfer data from SQL Server to Vertica, and then he went and drifted off topic, the scoundrel!

This atrocity was intentional: I simply had to decipher some terminology for you. Now we can go further.

Our plan was this:

  1. Make a DAG
  2. Generate tasks
  3. See how beautiful everything is
  4. Assign session numbers to the loads
  5. Get the data from SQL Server
  6. Put the data into Vertica
  7. Collect statistics

So, to get all of this running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Where we bring up:

  • Vertica as the host dwh with the most default settings,
  • three instances of SQL Server,
  • and we fill the databases in the latter with some data (do not look into mssql_init.py under any circumstances!)

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

What our miracle randomizer generated can be viewed via the Data Profiling/Ad Hoc Query item:

[Screenshot, captioned: "The main thing is not to show it to the analysts"]

I won't elaborate on ETL sessions; everything is trivial there: we create a database, there is a table in it, we wrap everything in a context manager, and now we do this:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15
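The pattern itself can be tried without any database server. The sketch below is a stand-in that keeps the same idea (open a session row on enter, record the outcome on exit) but swaps the real storage for an in-memory SQLite database; the class name `SqliteSession` and the reduced table layout are assumptions made for this illustration.

```python
import sqlite3


class SqliteSession:
    """A stand-in for the ETL session, backed by in-memory SQLite."""

    def __init__(self, connection, task_name):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None
        connection.execute(
            'CREATE TABLE IF NOT EXISTS sessions ('
            'id INTEGER PRIMARY KEY, task_name TEXT NOT NULL, '
            'successful INTEGER, loaded_rows INTEGER)')

    def __enter__(self):
        # Opening the session registers a row and remembers its id.
        cursor = self.connection.execute(
            'INSERT INTO sessions (task_name) VALUES (?)', (self.task_name,))
        self.id = cursor.lastrowid
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Any exception inside the with-block marks the session as failed.
        if exc_type is not None:
            self.successful = False
        self.connection.execute(
            'UPDATE sessions SET successful = ?, loaded_rows = ? WHERE id = ?',
            (self.successful, self.loaded_rows, self.id))
        self.connection.commit()
        return False  # never swallow the exception


connection = sqlite3.connect(':memory:')

with SqliteSession(connection, 'orders') as session:
    session.successful = True
    session.loaded_rows = 15

try:
    with SqliteSession(connection, 'orders') as session:
        raise RuntimeError('source is down')
except RuntimeError:
    pass

rows = connection.execute(
    'SELECT id, successful, loaded_rows FROM sessions ORDER BY id').fetchall()
print(rows)
```

The useful property is that a failed load still leaves a row behind, so the statistics show both what was loaded and what was attempted.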

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None when the block exits without an exception
        if exc_type is not None:
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # Statements such as CREATE TABLE return no rows
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
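
To see how the context manager reports a failed load without a database at hand, here is a minimal sketch of the same enter/exit protocol. DemoSession and its in-memory log list are hypothetical stand-ins for Session and the sessions table; they are not part of the article's code.

```python
class DemoSession:
    """In-memory stand-in for Session: records outcomes in a list, not a table."""

    log = []  # hypothetical replacement for the `sessions` table

    def __init__(self, task_name):
        self.task_name = task_name
        self.successful = None
        self.loaded_rows = None
        self.comment = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None when the with-block finished without raising
        if exc_type is not None:
            self.successful = False
            self.comment = f'{exc_type.__name__}: {exc_val}'
        DemoSession.log.append(
            (self.task_name, self.successful, self.loaded_rows, self.comment))
        # Returning None (falsy) lets the exception propagate to the caller
        return None


def run_demo():
    """Run one successful and one failing load and return what was recorded."""
    DemoSession.log.clear()
    with DemoSession('good_load') as session:
        session.loaded_rows = 15
        session.successful = True
    try:
        with DemoSession('bad_load') as session:
            raise ValueError('source is down')
    except ValueError:
        pass  # the failure was already recorded by __exit__
    return list(DemoSession.log)
```

The point of the design is that the task body only has to set successful and loaded_rows on the happy path; a failure is recorded automatically and the exception still reaches Airflow.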

The time has come to collect our data from our one and a half hundred tables. Let's do it with a few very unpretentious lines:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

  1. With the help of a hook we get a pymssql connection from Airflow.
  2. We substitute a date restriction into the query; the template engine will pass it into the function.
  3. We feed our query to pandas, which gives us a DataFrame; it will come in handy later.

I use the {dt} substitution instead of a %s query parameter not because I am an evil Pinocchio, but because pandas cannot cope with pymssql and slips it params: List, although it really wants a tuple.
Also note that the developer of pymssql decided to stop supporting it, and it is high time to move to pyodbc.
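
For drivers that do accept bound parameters, the date can be passed separately from the SQL text instead of being formatted into it. A minimal sketch using the standard-library sqlite3 module as a stand-in for the source database; the table contents are invented for illustration, and sqlite3 uses ? placeholders where pymssql uses %s.

```python
import sqlite3


def orders_for_day(conn, dt):
    """Return the orders that started on the given ISO date."""
    query = """
        SELECT id, start_time, type
        FROM orders
        WHERE date(start_time) = ?
        ORDER BY id
        """
    # Parameters are passed as a tuple; the driver takes care of quoting
    return conn.execute(query, (dt,)).fetchall()


# Hypothetical sample data standing in for dbo.Orders
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (id INTEGER, start_time TEXT, type INTEGER)')
conn.executemany(
    'INSERT INTO orders VALUES (?, ?, ?)',
    [(1, '2020-07-07 10:00:00', 1),
     (2, '2020-07-08 11:30:00', 2),
     (3, '2020-07-07 23:59:59', 1)])

rows = orders_for_day(conn, '2020-07-07')
```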

Let's see what Airflow has stuffed into the arguments of our functions:


If there is no data, there is no point in continuing. But it is also odd to count the load as successful. Yet this is not an error either. Ah-ah-ah, what to do?! Here is what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there is no error, but that we are skipping the task. In the interface the square will be neither green nor red, but pink.

Let's add a few columns to our data:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:

  • the database from which we took the orders,
  • the ID of our load session (it will be different for every task),
  • a hash of the source and the order ID, so that in the final database (where everything is poured into one table) we have a unique order ID.
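
The idea behind hash_id is that the same order id arriving from two sources must not collide in the shared table. A sketch of that idea with hashlib from the standard library; the make_hash_id helper and the source names are hypothetical, and the values it produces are not those of hash_pandas_object.

```python
import hashlib


def make_hash_id(etl_source, order_id):
    """Derive a deterministic signed 64-bit key from the source name and order id."""
    payload = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.sha256(payload).digest()
    # Take the first 8 bytes as a signed integer so it fits a 64-bit INT column
    return int.from_bytes(digest[:8], byteorder='big', signed=True)


# The same order id from two hypothetical sources yields two different keys
a = make_hash_id('orders_0', 42)
b = make_hash_id('orders_1', 42)
```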

The penultimate step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is via CSV!

import csv
from io import StringIO

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# Column names must be a plain comma-separated list, not a Python list repr
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

  1. We create a special receiver, StringIO.
  2. pandas kindly puts our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with the help of copy(), we send our data straight to Vertica!
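
What ends up in the buffer can be illustrated without pandas. A sketch with the standard csv module that mirrors the settings above (| separator, NUL for missing values, minimal quoting); the rows_to_csv_buffer helper and the sample rows are invented for illustration.

```python
import csv
from io import StringIO


def rows_to_csv_buffer(rows, null_marker='NUL'):
    """Serialize rows into an in-memory pipe-delimited CSV buffer."""
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                        doublequote=False, escapechar='\\',
                        lineterminator='\n')
    for row in rows:
        # Replace missing values with the marker that COPY ... NULL expects
        writer.writerow([null_marker if value is None else value
                         for value in row])
    buffer.seek(0)  # rewind so the consumer reads from the start
    return buffer


buffer = rows_to_csv_buffer([
    (1, 'plain', None),
    (2, 'with|pipe', 3.5),
])
content = buffer.read()
```

A field that contains the separator is wrapped in double quotes, which is why the COPY statement declares ENCLOSED '"'.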

From the driver we take the number of rows that were loaded, and tell the session manager that everything is OK:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production we create the target table by hand. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator(), I create the database schema and the table (if they do not exist yet, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up

- Well, - said the little mouse, - isn't it so, now
Are you convinced that I am the scariest animal in the forest?

Julia Donaldson, The Gruffalo

I think that if my colleagues and I held a competition to see who could create and launch an ETL process from scratch faster, they with their SSIS and a mouse and me with Airflow... And then we also compared ease of maintenance... Wow, I think you will agree that I would beat them on all fronts!

To be a little more serious, Apache Airflow, by describing processes as program code, has made my work much more comfortable and enjoyable.

Its unlimited extensibility, both in terms of plug-ins and its predisposition to scalability, lets you use Airflow in almost any area: from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).

Final part, for reference and information

The rakes we have stepped on for you

  • start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow at the earliest.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is another runtime error associated with it: Task is missing the start_date parameter, which most often means that you forgot to bind the operator to the DAG.

  • Everything on one machine. Yes, the databases (of Airflow itself and of our own tooling), the web server, the scheduler, and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL began responding to an index lookup in 20 s instead of 5 ms, we took it and moved it away.
  • LocalExecutor. Yes, we are still sitting on it, and we have already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand by at least one worker, and we will have to work hard to move to CeleryExecutor. And since you can work with it on a single machine, nothing stops you from using Celery even on a server that "will never go into production, honestly!"
  • Not using the built-in tools:
    • Connections to store service credentials,
    • SLA Misses to respond to tasks that did not finish on time,
    • XCom for exchanging metadata (I said metadata!) between the tasks of a DAG.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of failed tasks. Now my work Gmail holds more than 90k emails from Airflow, and the web mail client refuses to select and delete more than 100 at a time.
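
The start_date pitfall from the list above comes down to simple arithmetic: a run that covers an interval is triggered only once that interval has ended. A sketch of that rule with plain datetime, assuming a fixed timedelta schedule; first_trigger_time is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta


def first_trigger_time(start_date, schedule_interval):
    """The first run covers [start_date, start_date + interval) and fires at its end."""
    return start_date + schedule_interval


start = datetime(2020, 7, 7, 0, 1, 2)
first_run = first_trigger_time(start, timedelta(days=1))
```

So a DAG with today's start_date and a daily schedule has nothing to run until tomorrow.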

More pitfalls: Apache Airflow Pitfails

Even more automation tools

So that we can work even more with our heads and less with our hands, Airflow has prepared the following for us:

  • REST API: it still has Experimental status, which does not stop it from working. With it you can not only get information about DAGs and tasks, but also stop or start a DAG, and create a DAG Run or a pool.
  • CLI: many tools are available from the command line that are not merely inconvenient to use through the WebUI, but absent from it altogether. For example:
    • backfill, needed to re-run task instances.
      For example, the analysts come and say: "Comrade, your data from January 1 to 13 is nonsense! Fix it, fix it, fix it, fix it!" And you just go:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Database maintenance: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance and even ignore all dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does much the same thing, except that it writes nothing to the databases.
    • connections allows mass creation of connections from the shell.
  • Python API: a rather hardcore way of interacting, intended for plugins rather than for poking around with your own little hands. But who is going to stop us from going to /home/airflow/dags, launching ipython and starting to mess around? You can, for example, export all connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadatabase. I do not recommend writing to it, but getting task states for various specific metrics this way can be much faster and easier than through any of the APIs.

    Let's say that not all of our tasks are idempotent, so they can fail from time to time, and that is normal. But several failures in a row are already suspicious and need to be checked.

    Beware, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
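
The query counts, for each task, the unbroken run of failures at the head of its recent history: a row belongs to the streak only while its position among all runs equals its position among the failed runs. The same logic in Python, as a sketch over invented in-memory rows; the function name and the sample data are assumptions, and the two-day window from the query is left out.

```python
from collections import defaultdict
from itertools import takewhile

FAILED_STATES = {'failed', 'up_for_retry'}


def latest_failure_streaks(task_instances):
    """Map (task_id, dag_id) to the number of consecutive most recent failures.

    task_instances: iterable of (task_id, dag_id, execution_date, state).
    """
    history = defaultdict(list)
    for task_id, dag_id, execution_date, state in task_instances:
        history[(task_id, dag_id)].append((execution_date, state))

    streaks = {}
    for key, runs in history.items():
        runs.sort(reverse=True)  # newest first, like ORDER BY ... DESC
        streak = sum(1 for _ in takewhile(
            lambda run: run[1] in FAILED_STATES, runs))
        if streak > 0:  # mirrors the HAVING clause
            streaks[key] = streak
    return streaks


# Hypothetical task history: 'orders' is failing right now, 'events' recovered
rows = [
    ('load', 'orders', '2020-07-05', 'success'),
    ('load', 'orders', '2020-07-06', 'failed'),
    ('load', 'orders', '2020-07-07', 'up_for_retry'),
    ('load', 'events', '2020-07-06', 'failed'),
    ('load', 'events', '2020-07-07', 'success'),
]
result = latest_failure_streaks(rows)
```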

Links

And of course, the first ten links from Google's results are the contents of the Airflow folder in my bookmarks.

And the links used in the article:

Source: www.habr.com