Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.

I'll tell you about a wonderful tool for building ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you have nothing to do with data flows and simply need to launch some processes periodically and monitor how they run.

And yes, I won't just tell, I'll also show: there is plenty of code, screenshots and recommendations ahead.

[Image. Caption: What you usually see when you google the word Airflow / Wikimedia Commons]

Introduction

Apache Airflow is just like Django:

  • written in Python,
  • has a great admin panel,
  • endlessly extensible,

only better, and it was made for entirely different purposes, namely (as stated before the cut):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience will allow),
  • dynamic workflow generation from Python code that is very easy to write and understand,
  • the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which are extremely simple to write).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
  • as a very advanced cron that starts the data consolidation processes on the ODS and also monitors their maintenance.

Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, that runs:

  • more than 200 DAGs (workflows proper, which we stuffed with tasks),
  • each with 70 tasks on average,
  • and all this goodness starts (also on average) once an hour.

I'll write about how we scaled below, but for now let's define the über-problem we are going to solve:

There are three source SQL Servers, each with 50 databases that are instances of the same project, so they all have the same structure (almost everywhere, mua-ha-ha). That means each one has an Orders table (fortunately, a table with that name can be squeezed into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively dump it into, say, Vertica.
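To make the "service fields" idea concrete, here is a minimal sketch of tagging each extracted row with its origin before loading. The column names and sample values are assumptions made up for illustration; the article does not name them.

```python
from typing import Dict, Iterable, Iterator


def add_service_fields(rows: Iterable[Dict],
                       source_server: str,
                       source_database: str,
                       etl_task_id: int) -> Iterator[Dict]:
    """Yield copies of the rows extended with the three service fields."""
    for row in rows:
        yield {**row,
               'source_server': source_server,      # hypothetical column name
               'source_database': source_database,  # hypothetical column name
               'etl_task_id': etl_task_id}          # hypothetical column name


# Made-up sample rows standing in for the Orders table
orders = [{'id': 1, 'amount': 100}, {'id': 2, 'amount': 250}]
enriched = list(add_service_fields(orders, 'mssql_0', 'project_07', 42))
print(enriched[0])
```

With these fields in place, rows from all 150 databases can share one target table and still be traced back to where they came from.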

Let's go!

The main part, practical (and a little theoretical)

Why we (and you) need it

Back when the trees were tall and I was a humble SQL guy at a Russian retail chain, we ran ETL processes, aka data flows, using the two tools available to us:

  • Informatica Power Center - an extremely sprawling system, extremely productive, with its own hardware and its own versioning. I used, God willing, 1% of its capabilities. Why? Well, first of all, its interface, straight out of the 2000s, weighed on us psychologically. Secondly, this contraption is designed for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. As for the fact that it costs about as much as the wing of an Airbus A380 per year, we'll say nothing.

    Careful, the screenshot may slightly hurt people under 30

    [Screenshot]

  • SQL Server Integration Services - we used this comrade in our intra-project flows. Well, really: we already use SQL Server, so it would be somewhat unreasonable not to use its ETL tools. Everything about it is nice: the interface is pretty, and there are progress reports... But that's not why we love software products, oh no, not for that. We can version its dtsx (which is XML with nodes shuffled on every save), but what's the point? And how about building a package of tasks that drags hundreds of tables from one server to another? Never mind hundreds, your index finger will fall off after twenty from clicking the mouse button. But it definitely looks more fashionable:

    [Screenshot]

Of course we looked for a way out. It even almost came to a home-made SSIS package generator...

...and then a new job found me. And Apache Airflow caught up with me there.

When I found out that ETL process descriptions are just plain Python code, I very nearly danced for joy. That is how data flows became versionable and diffable, and pouring tables with the same structure from hundreds of databases into a single target became a matter of Python code that fits on one and a half to two 13" screens.

Assembling the cluster

Let's not turn this into a complete kindergarten and talk about utterly obvious things like installing Airflow, your database of choice, Celery and the other cases described in the docs.

So that we can start experimenting right away, I sketched out a docker-compose.yml in which:

  • we bring up Airflow itself: Scheduler and Webserver. Flower will also be spinning there to monitor Celery tasks (because it is already bundled into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, where Airflow will write its service information (scheduler data, execution statistics, etc.) and Celery will mark completed tasks;
  • Redis, which will act as the task broker for Celery;
  • a Celery worker, which will do the actual execution of tasks;
  • into the ./dags folder we will put our files with DAG descriptions. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.

In some places the code in the examples is not shown in full (so as not to clutter the text), and in others it gets modified along the way. Complete working examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In putting together the composition I relied heavily on the well-known image puckel/docker-airflow, so be sure to check it out. Maybe you won't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I gleefully took advantage of.
  • Naturally, this is not production-ready: I deliberately did not put heartbeats on the containers and did not bother with security. But I did the minimum that suits our experiments.
  • Note that:
    • the DAG folder must be accessible to both the scheduler and the workers;
    • the same goes for all third-party libraries: they must all be installed on the machines running the scheduler and the workers.
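The variable names in the compose file follow the AIRFLOW__{SECTION}__{KEY} pattern, where the section and key correspond to entries in airflow.cfg. The helper below is only an illustrative sketch of that naming convention (parse_airflow_env is a made-up name, not an Airflow function):

```python
from typing import Dict, Tuple

PREFIX = 'AIRFLOW__'


def parse_airflow_env(env: Dict[str, str]) -> Dict[Tuple[str, str], str]:
    """Map AIRFLOW__{SECTION}__{KEY} names to (section, key) pairs."""
    settings = {}
    for name, value in env.items():
        if not name.startswith(PREFIX):
            continue  # not an Airflow setting
        section, sep, key = name[len(PREFIX):].partition('__')
        if sep and section and key:
            settings[(section.lower(), key.lower())] = value
    return settings


env = {
    'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor',
    'AIRFLOW__CELERY__BROKER_URL': 'redis://broker:6379/0',
    'PATH': '/usr/bin',
}
settings = parse_airflow_env(env)
print(settings[('core', 'executor')])
```

So AIRFLOW__CORE__EXECUTOR plays the role of the executor option in the [core] section of airflow.cfg.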

Well, now it's simple:

$ docker-compose up --scale worker=3

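Once everything is up, you can take a look at the web interfaces: the Airflow Webserver on port 8080 and Flower on port 5555 (the ports published in the compose file above).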

Basic concepts

If none of these "DAGs" made any sense to you, here is a short glossary:

  • Scheduler - the most important guy in Airflow, who makes sure that the robots do the hard work rather than a human: he watches the schedule, updates DAGs and launches tasks.

    In older versions he generally had memory problems (no, not amnesia, leaks), and a legacy parameter even remained in the configs: run_duration, his restart interval. But now everything is fine.

  • DAG (aka "dag") - a "directed acyclic graph", but that definition tells few people anything. In essence it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.

    Besides DAGs there may also be subdags, but we most likely won't get to them.

  • DAG Run - an initialized DAG that is assigned its own execution_date. DAG runs of the same DAG can work in parallel (if you made your tasks idempotent, of course).
  • Operator - a piece of code responsible for performing a specific action. There are three types of operators:
    • action, like our beloved PythonOperator, which can execute any (valid) Python code;
    • transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
    • sensor, which lets you react to an event or hold back further execution of the DAG until it occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? After all, you can do the retries right in the operator!" The answer: so as not to clog the task pool with suspended operators. The sensor starts, checks and dies until the next attempt.
  • Task - declared operators, regardless of type, that are attached to a DAG get promoted to the rank of task.
  • Task instance - when the scheduler-general decides it is time to send tasks into battle on the workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e. a set of variables, the execution parameters), expands the command or query templates, and puts them in a pool.
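To show what "directed acyclic graph" buys us in practice, here is a small standalone sketch that orders tasks by their dependencies using Kahn's algorithm. This is not how Airflow's scheduler is implemented. It only illustrates why acyclicity matters: without it there is no valid order in which to run the tasks. The task names are invented.

```python
from collections import deque
from typing import Dict, List, Set


def execution_order(upstream: Dict[str, Set[str]]) -> List[str]:
    """Return tasks in an order that respects dependencies (Kahn's algorithm).

    `upstream` maps each task to the set of tasks it waits for.
    Raises ValueError if the graph contains a cycle, i.e. is not a DAG.
    """
    pending = {task: set(deps) for task, deps in upstream.items()}
    ready = deque(sorted(t for t, deps in pending.items() if not deps))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        del pending[task]
        # Release every task that was waiting only for the one just finished
        for other in sorted(pending):
            deps = pending[other]
            if task in deps:
                deps.remove(task)
                if not deps:
                    ready.append(other)
    if pending:
        raise ValueError(f'cycle detected among: {sorted(pending)}')
    return order


# Hypothetical task names, for illustration only
tasks = {'extract': set(), 'transform': {'extract'},
         'load': {'transform'}, 'notify': {'load'}}
print(execution_order(tasks))
```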

Generating tasks

First, let's outline the general shape of our DAG, and then we'll dive deeper and deeper into the details, because we use some non-trivial solutions.

So, in its simplest form, such a DAG looks like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's break it down:

  • First, we import the necessary libs and a little something else;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of connections from Airflow Connections and the databases from which we will take our table;
  • dag is the declaration of our DAG, which must be in globals(), otherwise Airflow won't find it. The DAG also needs to be told:
    • that its name is orders - this name will then show up in the web interface,
    • that it will work starting from midnight on July 8,
    • and that it should run roughly every 6 hours (the tough guys can use a cron line such as 0 */6 * * * here instead of timedelta(), the less tough an expression like @daily);
  • workflow() will do the main job, but not now. For now we just dump our context into the log.
  • And now the simple magic of creating tasks:
    • we iterate over our sources;
    • we initialize a PythonOperator that will execute our dummy workflow(). Don't forget to give the task a unique (within the DAG) name and to attach the DAG itself. The provide_context flag, in turn, pours extra arguments into the function, which we carefully collect with **context.
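The contents of commons/datasources.py are not shown here, so the following is only a guess at the shape of sql_server_ds: a list of two-field named tuples, one per database on each of the three servers. The field names, connection IDs and database naming scheme are all assumptions.

```python
from collections import namedtuple
from typing import List

# Hypothetical field names; the text only says the tuple holds two strings
DataSource = namedtuple('DataSource', 'conn_id schema')


def build_datasources(servers: int = 3, databases: int = 50) -> List[DataSource]:
    """Build one (conn_id, schema) pair per database on every server."""
    return [DataSource(f'mssql_{server}', f'db_{server}_{db:02d}')  # made-up naming scheme
            for server in range(servers)
            for db in range(databases)]


sql_server_ds = build_datasources()

# The loop in the DAG unpacks each pair and uses `schema` as the task_id,
# so schema names have to be unique across all servers.
for conn_id, schema in sql_server_ds[:2]:
    print(conn_id, schema)
```

Three servers times fifty databases gives the 150 pairs that the DAG loop turns into tasks.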

That's all for now. What we got:

  • a new DAG in the web interface,
  • a hundred and fifty tasks that will run in parallel (if the Airflow and Celery settings and the server capacity allow it).

Well, almost got it.

[Screenshot. Caption: Who is going to install the dependencies?]

To simplify the whole thing, I wired the processing of requirements.txt on all nodes into docker-compose.yml.

Now it's off:

[Screenshot]

The gray squares are task instances being processed by the scheduler.

We wait a little, and the tasks get snapped up by the workers:

[Screenshot]

The green ones have, of course, successfully finished their work. The red ones were not so successful.

By the way, on our prod there is no ./dags folder synchronized between machines: all DAGs live in git on our Gitlab, and Gitlab CI rolls out updates to the machines on merge into master.

A little about Flower

While the workers are thrashing our dummies, let's recall another tool that can show us something: Flower.

The very first page, with summary information on the worker nodes:

[Screenshot]

The most intense page, with the tasks that went to work:

[Screenshot]

The most boring page, with the status of our broker:

[Screenshot]

The brightest page, with task status graphs and their execution times:

[Screenshot]

Loading what was left unloaded

So, all the tasks have run their course, and we can carry away the wounded.

[Screenshot]

And there were quite a few wounded, for one reason or another. When Airflow is used correctly, these very squares mean that the data definitely did not arrive.

You need to look at the log and restart the fallen task instances.

By clicking on any square, we see the actions available to us:

[Screenshot]

You can go ahead and Clear the fallen one. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

[Screenshot]

Clearly, doing this with the mouse for all the red squares is not very humane, and it's not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

[Screenshot]

Let's select everything at once and reset it, clicking the right item:

[Screenshot]

After the cleanup our tasks look like this (they are already waiting for the scheduler to schedule them):

[Screenshot]

Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good gentlemen, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone done a report update at some point? Here it is again: there is a list of sources to take data from; there is a list of places to put it; and don't forget to honk when everything has happened or broken (well, that's not about us, no).

Let's go through the file again and look at the new unfamiliar bits:

  • from commons.operators import TelegramBotSendMessage - nothing prevents us from writing our own operators, which we took advantage of by making a small wrapper for sending messages to Telegram. (We'll talk more about this operator below);
  • default_args={} - a DAG can hand the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}' - the to field won't be hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put into Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - the condition for starting the operator. In our case the letter flies off to the bosses only if all dependencies completed successfully;
  • tg_bot_conn_id='tg_main' - conn_id arguments accept the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - messages in Telegram fly out only if there are fallen tasks;
  • task_concurrency=1 - we forbid several task instances of the same task from running at once. Otherwise we would get several VerticaOperator instances launched simultaneously (looking at the same table);
  • report_update >> [email, tg] - all the VerticaOperator tasks converge on sending the letter and the message, like this:
    [Screenshot]

    But since the notifier operators have different trigger rules, only one of them will fire. In the Tree View everything looks a bit less clear:
    [Screenshot]
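Why only one of the two notifiers fires can be seen in a toy model of the two trigger rules. This is a simplified sketch: the real scheduler tracks more states (skipped, upstream_failed, still running and so on), and the function names here are made up rather than taken from Airflow.

```python
from typing import Iterable


def all_success(upstream_states: Iterable[str]) -> bool:
    """ALL_SUCCESS: fire only when every upstream task succeeded."""
    return all(state == 'success' for state in upstream_states)


def one_failed(upstream_states: Iterable[str]) -> bool:
    """ONE_FAILED: fire as soon as at least one upstream task failed."""
    return any(state == 'failed' for state in upstream_states)


# Final states of the five report_update tasks on two imaginary days
good_day = ['success'] * 5
bad_day = ['success', 'success', 'failed', 'success', 'success']

for states in (good_day, bad_day):
    print('email:', all_success(states), 'telegram:', one_failed(states))
```

Once all five upstream tasks have finished, exactly one of the two conditions holds, so either the email or the Telegram message goes out, never both.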

I'll say a few words about macros and their friends, variables.

Macros are Jinja placeholders that can substitute all sorts of useful information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), and on restart the placeholders expand to the same values.
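As a rough illustration of that last point, the sketch below imitates the substitution with plain string replacement instead of Jinja. The render function is a stand-in written for this example, not an Airflow API.

```python
from datetime import datetime


def render(template: str, execution_date: datetime) -> str:
    """Stand-in for Jinja: substitute only the {{ ds }} placeholder."""
    ds = execution_date.strftime('%Y-%m-%d')
    return template.replace('{{ ds }}', ds)


sql = "SELECT id FROM orders.payments WHERE payment_dtm::DATE = '{{ ds }}'::DATE"

first_try = render(sql, datetime(2020, 7, 14, 6, 0))
# A later restart of the same task instance still carries the same execution_date
retry = render(sql, datetime(2020, 7, 14, 6, 0))

print(first_try)
```

Because the date comes from the task instance rather than from the wall clock, a rerun a week later still queries the same day.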

The assigned values can be viewed via the Rendered button on each task instance. Here is what it looks like for the task that sends the letter:

[Screenshot]

And here for the task that sends the message:

[Screenshot]

A complete list of built-in macros for the latest available version can be found here: macros reference

Moreover, with the help of plugins we can declare our own macros, but that's a whole other story.

Besides the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of entries in Admin/Variables:

[Screenshot]

Then simply use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
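The dotted path is simply a walk through the parsed JSON. A minimal sketch of the same lookup in plain Python (lookup is a hypothetical helper, not part of Airflow):

```python
import json
from functools import reduce
from typing import Any


def lookup(raw_json: str, path: str) -> Any:
    """Follow a dotted path such as 'bot.token' through a JSON document."""
    return reduce(lambda node, key: node[key], path.split('.'), json.loads(raw_json))


bot_config = '{"bot": {"token": "881hskdfASDA16641", "name": "Verter"}, "service": "TG"}'

print(lookup(bot_config, 'bot.token'))
print(lookup(bot_config, 'service'))
```

Note that this only works when the stored value is valid JSON, so the token has to be a quoted string.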

I'll say literally one word and show one screenshot about connections. Everything here is elementary: on the Admin/Connections page we create a connection and add our logins / passwords and more specific parameters there. Like this:

[Screenshot]

Passwords can be encrypted (more thoroughly than by default), or you can leave out the connection type (as I did for tg_main). The thing is that the list of types is hardwired into the Airflow models and cannot be extended without digging into the source code (if I simply failed to google something, please correct me), but nothing stops us from fetching the credentials just by name.

You can also create several connections with the same name: in that case the BaseHook.get_connection() method, which fetches connections by name, will return a random one of the namesakes (round robin would be more logical, but let's leave that on the conscience of the Airflow developers).
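The random pick among namesakes can be pictured with a toy registry. Everything below is a simplified stand-in for illustration: the dictionary replaces the connections table, and the host names are invented.

```python
import random
from collections import defaultdict
from typing import Dict, List

# Hypothetical in-memory stand-in for the connections table
connections: Dict[str, List[str]] = defaultdict(list)
connections['dwh'] += ['vertica-node-1', 'vertica-node-2', 'vertica-node-3']  # made-up hosts


def get_connection(conn_id: str) -> str:
    """Pick one of the connections registered under conn_id at random."""
    candidates = connections.get(conn_id)
    if not candidates:
        raise KeyError(f"The conn_id `{conn_id}` isn't defined")
    return random.choice(candidates)


picked = [get_connection('dwh') for _ in range(10)]
print(picked)
```

A random choice spreads load only on average, which is why round robin would be the more predictable option.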

Variables and Connections are certainly cool tools, but it's important not to lose the balance: which parts of your flows you keep in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailbox, through the UI. On the other hand, it is still a return to the mouse click, which we (I) wanted to get rid of.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are the points where it plugs into third-party services and libraries. For example, JiraHook opens a client for us to interact with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb share.

Dissecting the custom operator

And so we've come close to looking at how TelegramBotSendMessage is made.

The code of commons/operators.py with the operator itself:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything else in Airflow, it's all very simple:

  • We inherited from BaseOperator, which implements quite a few Airflow-specific things (take a look at your leisure).
  • We declared template_fields, the fields in which Jinja will look for macros to process.
  • We set up the right arguments for __init__() and gave them defaults where needed.
  • We didn't forget to initialize the ancestor either.
  • We opened the corresponding hook, TelegramBotHook, and got a client object from it.
  • We overrode the BaseOperator.execute() method, which Airflow will call when the time comes to run the operator. In it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr: Airflow intercepts everything, wraps it nicely and files it where needed.)
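What template_fields does can be imitated without Airflow: before execute() is called, the attributes named in the list get their placeholders filled in, and everything else is left alone. The toy below uses plain string replacement in place of Jinja. ToyOperator, render_template_fields and the context values are made up for this sketch and are not Airflow's actual implementation.

```python
from typing import Dict


class ToyOperator:
    """Minimal stand-in for an operator with templated attributes."""
    template_fields = ['chat_id', 'message']

    def __init__(self, chat_id: str, message: str, note: str):
        self.chat_id = chat_id
        self.message = message
        self.note = note  # not listed in template_fields, so left untouched


def render_template_fields(operator, context: Dict[str, str]) -> None:
    """Replace '{{ name }}' placeholders in every attribute named in template_fields."""
    for field in operator.template_fields:
        value = getattr(operator, field)
        for name, replacement in context.items():
            value = value.replace('{{ ' + name + ' }}', replacement)
        setattr(operator, field, value)


op = ToyOperator(chat_id='{{ chat }}', message='{{ dag_id }} failed :(', note='{{ dag_id }}')
render_template_fields(op, {'chat': '-100123', 'dag_id': 'update_reports'})  # made-up values
print(op.chat_id, '|', op.message, '|', op.note)
```

This is also why the operator stores chat_id and message as plain attributes in __init__() and only uses them in execute(): at construction time they still contain the raw template text.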

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what there is to explain here, so I'll just note the important points:

  • We inherit and think about the arguments. In most cases there will be just one: conn_id;
  • We override the standard methods: I limited myself to get_conn(), in which I fetch the connection parameters by name and just take the extra section (a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, handing it the specific token.

That's all. You can get the client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().

And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of a single sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
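One detail in the wrapper that is easy to miss is the trailing slash in API_ENDPOINT. As far as I know, BaseUrlSession resolves the relative method name against the base URL with urljoin semantics, so the slash decides whether the bot token survives. A quick standalone check using only the standard library:

```python
from urllib.parse import urljoin

API_ENDPOINT = 'https://api.telegram.org/bot{}/'
token = 'YOuRAwEsomeBOtToKen'  # placeholder token from the docstring above

with_slash = urljoin(API_ENDPOINT.format(token), 'sendMessage')
without_slash = urljoin(API_ENDPOINT.format(token).rstrip('/'), 'sendMessage')

print(with_slash)
print(without_slash)
```

Without the trailing slash the last path segment (the one carrying the token) is replaced by the method name, and the request goes to the wrong URL.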

The right thing to do is to put it all together (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, put it in a public repository and give it to Open Source.

While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'll go check what went wrong...

[Screenshot. Caption: Something broke in our DAG! Isn't that what we were expecting? Exactly!]

So are you going to load anything?

Do you feel I've skipped something? He promised to transfer data from SQL Server to Vertica, and then went off on a tangent, the scoundrel!

This atrocity was intentional: I simply had to decipher some terminology for you. Now we can move on.

Our plan was this:

  1. Make a DAG
  2. Generate tasks
  3. See how beautiful everything is
  4. Assign session numbers to the loads
  5. Get data from SQL Server
  6. Put the data into Vertica
  7. Collect statistics

So, to get all of this up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we bring up:

  • Vertica as the host dwh with the most default settings,
  • three instances of SQL Server,
  • and we fill the databases in the latter with some data (under no circumstances look into mssql_init.py!)

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

What our miracle randomizer generated can be viewed via the Data Profiling/Ad Hoc Query item:

[Screenshot. Caption: The main thing is not to show this to the analysts]

I won't go into detail about ETL sessions, everything there is trivial: we create a database with a table in it, wrap everything in a context manager, and now we do this:

with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # any() takes a single iterable, so the three values go in a tuple
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # DDL statements such as CREATE TABLE return no rows
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
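
How the context manager records the outcome of a load can be seen in a stripped-down sketch. This is not the class above: DemoSession and its in-memory log list are hypothetical stand-ins for the sessions table, so the example runs without a database.

```python
class DemoSession:
    """Hypothetical stand-in for Session that logs to a list instead of PostgreSQL."""

    def __init__(self, log, task_name):
        self.log = log
        self.task_name = task_name
        self.successful = None
        self.comment = None

    def __enter__(self):
        self.log.append(('opened', self.task_name))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # An exception escaped the with-block: mark the load as failed
            self.successful = False
            self.comment = f'{exc_type.__name__}: {exc_val}'
        self.log.append(('closed', self.task_name, self.successful, self.comment))
        # Falling through returns None, so the exception still reaches the caller


log = []

with DemoSession(log, 'orders') as session:
    session.successful = True

try:
    with DemoSession(log, 'broken') as session:
        raise ValueError('no such table')
except ValueError:
    pass  # the failure has already been recorded in the log
```

Because __exit__ does not return a truthy value, the failed load is both written to the log and reported to whoever called it, which is what lets Airflow mark the task as failed.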

The time has come to collect our data from our one and a half hundred tables. Let's do this with the help of a few very unpretentious lines:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. With the help of the hook we get a pymssql connection from Airflow.
  2. We substitute a date restriction into the query; the template engine will pass it into the function.
  3. We feed our query to pandas, which returns a DataFrame; it will be useful to us later.

I use the {dt} substitution instead of the query parameter %s not because I am an evil Pinocchio, but because pandas cannot cope with pymssql and slips it params: List, although it really wants a tuple.
Also note that the developer of pymssql has decided to stop supporting it, and it is time to move to pyodbc.
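
Because the date is interpolated into the query text rather than passed as a parameter, it is worth making sure the value really is a date. A minimal sketch, assuming dt arrives as an ISO string such as the one rendered from {{ ds }}; the helper name build_orders_query is hypothetical:

```python
from datetime import date


def build_orders_query(dt: str) -> str:
    """Return the Orders query for one day, rejecting anything that is not an ISO date."""
    # fromisoformat raises ValueError on malformed input
    day = date.fromisoformat(dt)
    return f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{day.isoformat()}'
    """


query = build_orders_query('2020-01-13')

try:
    build_orders_query("2020-01-13' OR 1=1 --")
    rejected = False
except ValueError:
    rejected = True
```

The query text stays the same as before; the only addition is that a string which does not parse as a date never reaches the database.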

Let's see what Airflow has stuffed into the arguments of our function:

[Screenshot: the arguments passed to the task]

If there is no data, there is no point in continuing. But it is also strange to consider the load successful. Yet this is not an error. A-a-ah, what to do?! Here is what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there are no errors, but that we are skipping the task. The interface will show neither a green nor a red square, but a pink one.

Let's add a few columns to our data:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:

  • The database from which we took the orders,
  • The ID of our load session (it will be different for every task),
  • A hash of the source and the order ID, so that the final database (where everything is poured into one table) has a unique order ID.
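
The idea of a combined key can be illustrated without pandas. The sketch below is not hash_pandas_object: it swaps in hashlib from the standard library, and both the function name stable_row_id and the source names are hypothetical. It only shows why hashing the source together with the ID keeps rows from different databases apart:

```python
import hashlib


def stable_row_id(source: str, order_id: int) -> int:
    """Derive a deterministic 63-bit integer from the source name and the order ID."""
    payload = f'{source}|{order_id}'.encode('utf-8')
    digest = hashlib.sha256(payload).digest()
    # Keep 8 bytes and clear the sign bit so the value fits a signed 64-bit column
    return int.from_bytes(digest[:8], 'big') & 0x7FFFFFFFFFFFFFFF


a = stable_row_id('mssql_0', 42)
b = stable_row_id('mssql_1', 42)
```

The same order ID coming from two sources yields two different keys, while re-loading the same row always yields the same key.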

The final step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is via CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# Column names must be a comma-separated list, not a Python list repr
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. We create a special receiver, StringIO.
  2. pandas kindly writes our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with the help of copy(), we send our data straight to Vertica!

From the driver we take the number of rows that were loaded, and tell the session manager that everything is OK:
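
The first steps can be tried out with the standard library alone. This sketch uses csv.writer in place of DataFrame.to_csv and stops short of the actual cursor.copy() call; the rows, the column names and the table name demo.orders are made up for illustration:

```python
import csv
from io import StringIO

columns = ['id', 'type', 'data']            # hypothetical column names
rows = [(1, 7, 'plain'), (2, None, 'a|b')]  # hypothetical rows

buffer = StringIO()
writer = csv.writer(buffer, delimiter='|', quotechar='"',
                    quoting=csv.QUOTE_MINIMAL, doublequote=False,
                    escapechar='\\', lineterminator='\n')
for row in rows:
    # Mirror na_rep='NUL': missing values become the literal NUL marker
    writer.writerow(['NUL' if value is None else value for value in row])
buffer.seek(0)

# The column list is joined into plain SQL, not rendered as a Python list
copy_stmt = f"""
    COPY demo.orders({', '.join(columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

csv_text = buffer.getvalue()
```

Note how the value containing the delimiter is wrapped in quotes, which is what the ENCLOSED clause of the COPY statement is there to undo.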

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production, we create the target table manually. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator(), I create a database schema and a table (if they don't already exist, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up

- Well, - said the little mouse, - is it not so, now
Are you convinced that I am the most terrible animal in this forest?

Julia Donaldson, The Gruffalo

I think that if my colleagues and I held a competition to see who could create and launch an ETL process from scratch faster, they with their SSIS and a mouse and I with Airflow... Wow, I think you will agree that I would beat them on all fronts!

A little more seriously, Apache Airflow, by describing processes as program code, has made my work much more comfortable and enjoyable.

Its unlimited extensibility, both through plugins and through its predisposition to scaling, lets you use Airflow in almost any area: in the full cycle of collecting, preparing and processing data, and even in launching rockets (to Mars, of course).

Final part, reference and information

The rakes we have collected for you

  • start_date. Yes, this is already a local meme. Everyone stumbles over start_date, the main argument of a DAG. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow and no earlier.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is another runtime error associated with it: Task is missing the start_date parameter, which most often means that you forgot to bind the operator to the dag.

  • Everything on one machine. Yes, the databases (Airflow's own and our wrapper's), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL began to answer an index lookup in 20 s instead of 5 ms, we picked it up and moved it away.
  • LocalExecutor. Yes, we are still sitting on it, and we have already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand by at least one worker, and we will have to work hard to move to CeleryExecutor. And given that you can work with it on one machine, nothing stops you from using Celery even on a server that "of course, will never go to production, honestly!"
  • Not using the built-in tools:
    • Connections to store service credentials,
    • SLA Misses to react to tasks that did not finish on time,
    • XCom for exchanging metadata (I said metadata!) between dag tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of failed tasks. Now my work Gmail holds more than 90k emails from Airflow, and the webmail interface refuses to select and delete more than 100 at a time.
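
The start_date rule from the first item can be checked with plain datetime arithmetic. This is only a sketch of the scheduling rule for a timedelta schedule_interval, not Airflow code: a run covering an interval is triggered once that interval has ended, so the first run fires at start_date plus schedule_interval. The function name first_trigger is hypothetical:

```python
from datetime import datetime, timedelta


def first_trigger(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """Earliest moment the first scheduled run can start."""
    # The first interval begins at start_date and must finish before the run fires
    return start_date + schedule_interval


start = datetime(2020, 7, 7, 0, 1, 2)
trigger = first_trigger(start, timedelta(days=1))
```

With a fixed start_date in the past, that first trigger time is already behind you, which is why the hard-coded date makes the problem go away.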

More pitfalls: Apache Airflow Pitfails

More automation tools

In order for us to work more with our heads and less with our hands, Airflow has prepared the following for us:

  • REST API - it still has Experimental status, which does not stop it from working. With it you can not only get information about dags and tasks, but also stop/start a dag, create a DAG Run or a pool.
  • CLI - the command line gives access to many tools that are not merely inconvenient to use through the WebUI but are absent from it altogether. For example:
    • backfill is needed to re-run task instances.
      For example, the analysts come and say: "And you, comrade, have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you go like this:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Base service commands: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance and even ignore all its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does much the same, except that it also writes nothing to the database.
    • connections allows mass creation of connections from the shell.
  • Python API - a rather hardcore way of interacting, intended for plugins and not for poking around with your bare hands. But who is going to stop us from going to /home/airflow/dags, running ipython and starting to mess around? You can, for example, export all connections with this code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadatabase. I do not recommend writing to it, but reading task states for various specific metrics can be much faster and easier than going through any of the APIs.

    Let's say that not all of our tasks are idempotent, so they can fail from time to time, and that is normal. But several failures in a row are already suspicious, and it would be worth checking.

    Beware, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
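
For readers who prefer to see the logic outside SQL, the same idea can be sketched in plain Python: for every task, count the unbroken streak of failures that ends at the most recent run. The rows below are invented, the function name trailing_failures is hypothetical, and the two-day window of the query is left out for brevity:

```python
from itertools import takewhile

FAILED_STATES = {'failed', 'up_for_retry'}

# Hypothetical rows: (dag_id, task_id, execution_date, state)
runs = [
    ('orders', 'mssql_0', '2020-01-11', 'success'),
    ('orders', 'mssql_0', '2020-01-12', 'failed'),
    ('orders', 'mssql_0', '2020-01-13', 'up_for_retry'),
    ('orders', 'mssql_1', '2020-01-12', 'failed'),
    ('orders', 'mssql_1', '2020-01-13', 'success'),
]


def trailing_failures(rows):
    """Count, per task, the unbroken streak of failures ending at the latest run."""
    by_task = {}
    for dag_id, task_id, execution_date, state in rows:
        by_task.setdefault((dag_id, task_id), []).append((execution_date, state))

    report = {}
    for key, history in by_task.items():
        # ISO date strings sort chronologically, so reverse order is newest first
        newest_first = sorted(history, reverse=True)
        streak = [state for _, state in
                  takewhile(lambda run: run[1] in FAILED_STATES, newest_first)]
        if streak:
            report[key] = {
                'unsuccessful': len(streak),
                'failed': streak.count('failed'),
                'up_for_retry': streak.count('up_for_retry'),
            }
    return report


report = trailing_failures(runs)
```

A task whose latest run succeeded drops out of the report even if it failed earlier, which mirrors the comparison of the two row_number() values in the query.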

References

And of course, the first ten links from Google's search results are the contents of the Airflow folder in my bookmarks.


Source: www.habr.com