Apache Airflow: Kurahisisha ETL

Hujambo, mimi ni Dmitry Logvinenko - Mhandisi wa Data wa Idara ya Uchanganuzi wa kundi la kampuni za Vezet.

Nitakuambia juu ya zana nzuri ya kukuza michakato ya ETL - Apache Airflow. Lakini Airflow ni nyingi sana na ina mambo mengi kiasi kwamba unapaswa kuiangalia kwa karibu hata kama hauhusiki katika mtiririko wa data, lakini una hitaji la kuzindua michakato yoyote mara kwa mara na kufuatilia utekelezaji wao.

Na ndiyo, sitasema tu, lakini pia kuonyesha: programu ina kanuni nyingi, viwambo vya skrini na mapendekezo.

Apache Airflow: Kurahisisha ETL
Unachoona kwa kawaida unapo google neno Airflow / Wikimedia Commons

Meza ya yaliyomo

Utangulizi

Apache Airflow ni kama Django:

  • iliyoandikwa kwa chatu
  • kuna jopo kubwa la admin,
  • inayoweza kupanuliwa kwa muda usiojulikana

- bora tu, na ilitengenezwa kwa madhumuni tofauti kabisa, ambayo ni (kama ilivyoandikwa kabla ya kat):

  • kuendesha na kufuatilia kazi kwenye idadi isiyo na kikomo ya mashine (kama Celery / Kubernetes nyingi na dhamiri yako itakuruhusu)
  • na kizazi chenye nguvu cha mtiririko wa kazi kutoka rahisi sana kuandika na kuelewa nambari ya Python
  • na uwezo wa kuunganisha hifadhidata na API zozote kwa kutumia vijenzi vilivyotengenezwa tayari na programu-jalizi zilizotengenezwa nyumbani (ambayo ni rahisi sana).

Tunatumia Apache Airflow kama hii:

  • tunakusanya data kutoka vyanzo mbalimbali (matukio mengi ya Seva ya SQL na PostgreSQL, API mbalimbali zilizo na vipimo vya programu, hata 1C) katika DWH na ODS (tuna Vertica na Clickhouse).
  • jinsi ya juu cron, ambayo huanza michakato ya ujumuishaji wa data kwenye ODS, na pia inasimamia matengenezo yao.

Hadi hivi majuzi, mahitaji yetu yalifunikwa na seva moja ndogo na cores 32 na 50 GB ya RAM. Katika Airflow, hii inafanya kazi:

  • zaidi Dakika 200 (kwa kweli mtiririko wa kazi, ambao tuliingiza kazi),
  • katika kila wastani 70 kazi,
  • wema huu huanza (pia kwa wastani) mara moja kwa saa.

Na kuhusu jinsi tulivyopanuka, nitaandika hapa chini, lakini sasa hebu tufafanue ΓΌber-tatizo ambalo tutasuluhisha:

Kuna vyanzo vitatu vya Seva za SQL, kila moja ikiwa na hifadhidata 50 - mifano ya mradi mmoja, mtawaliwa, zina muundo sawa (karibu kila mahali, mua-ha-ha), ambayo inamaanisha kuwa kila moja ina jedwali la Maagizo (kwa bahati nzuri, meza iliyo na hiyo). jina linaweza kusukuma katika biashara yoyote). Tunachukua data kwa kuongeza sehemu za huduma (seva chanzo, hifadhidata ya chanzo, Kitambulisho cha kazi cha ETL) na kuzitupa kwa ujinga, tuseme, Vertica.

Hebu kwenda!

Sehemu kuu, ya vitendo (na ya kinadharia kidogo)

Kwa nini sisi (na wewe)

Wakati miti ilikuwa kubwa na nilikuwa rahisi SQL-schik katika rejareja moja ya Kirusi, tulilaghai michakato ya ETL aka mtiririko wa data kwa kutumia zana mbili zinazopatikana kwetu:

  • Kituo cha Nguvu cha Informatica - mfumo unaoenea sana, unaozalisha sana, na vifaa vyake, matoleo yake mwenyewe. Nilitumia Mungu apishe 1% ya uwezo wake. Kwa nini? Kweli, kwanza kabisa, kiolesura hiki, mahali fulani kutoka miaka ya 380, kiliweka shinikizo juu yetu kiakili. Pili, ukandamizaji huu umeundwa kwa ajili ya michakato ya dhana sana, utumiaji wa sehemu ya hasira na mbinu zingine muhimu sana za biashara. Kuhusu kile kinachogharimu, kama mrengo wa Airbus AXNUMX / mwaka, hatutasema chochote.

    Tahadhari, picha ya skrini inaweza kuumiza watu walio chini ya miaka 30 kidogo

    Apache Airflow: Kurahisisha ETL

  • Seva ya Ujumuishaji wa Seva ya SQL - tulimtumia mwenzetu huyu katika mtiririko wetu wa ndani ya mradi. Kweli, kwa kweli: tayari tunatumia Seva ya SQL, na itakuwa isiyo na maana kwa njia fulani kutotumia zana zake za ETL. Kila kitu ndani yake ni nzuri: interface zote mbili ni nzuri, na ripoti ya maendeleo ... Lakini hii sio kwa nini tunapenda bidhaa za programu, oh, si kwa hili. Toa toleo dtsx (ambayo ni XML iliyo na nodi zilizochanganyika kwenye kuokoa) tunaweza, lakini ni nini uhakika? Vipi kuhusu kutengeneza kifurushi cha kazi ambacho kitaburuta mamia ya meza kutoka kwa seva moja hadi nyingine? Ndio, ni mia gani, kidole chako cha index kitaanguka kutoka kwa vipande ishirini, kubonyeza kitufe cha panya. Lakini hakika inaonekana mtindo zaidi:

    Apache Airflow: Kurahisisha ETL

Hakika tulitafuta njia za kutoka. Kesi hata karibu ilikuja kwa jenereta ya kifurushi cha SSIS iliyojiandikisha ...

... na kisha kazi mpya ikanipata. Na Apache Airflow ilinipata juu yake.

Nilipogundua kuwa maelezo ya mchakato wa ETL ni nambari rahisi ya Python, sikucheza kwa furaha. Hivi ndivyo mitiririko ya data ilitolewa na kutofautishwa, na kumwaga jedwali zilizo na muundo mmoja kutoka kwa mamia ya hifadhidata hadi lengo moja ikawa suala la msimbo wa Python katika skrini moja na nusu au mbili 13 ”.

Kukusanya nguzo

Wacha tusitengeneze chekechea kabisa, na tusizungumze juu ya vitu dhahiri kabisa hapa, kama kusakinisha Airflow, hifadhidata uliyochagua, Celery na kesi zingine zilizoelezewa kwenye kizimbani.

Ili tuweze kuanza majaribio mara moja, nilichora docker-compose.yml ambayo:

  • Hebu tuinue Airflow: Mratibu, Webserver. Flower pia itakuwa inazunguka huko ili kufuatilia kazi za Seli (kwa sababu tayari imesukumwa ndani apache/airflow:1.10.10-python3.7, lakini hatujali)
  • PostgreSQL, ambapo Airflow itaandika maelezo yake ya huduma (data ya kiratibu, takwimu za utekelezaji, n.k.), na Celery itaashiria kazi zilizokamilishwa;
  • Rejea, ambayo itafanya kama wakala wa kazi kwa Celery;
  • Mfanyikazi wa celery, ambayo itahusika katika utekelezaji wa moja kwa moja wa kazi.
  • Kwa folda ./dags tutaongeza faili zetu na maelezo ya dags. Watachukuliwa kuruka, kwa hivyo hakuna haja ya kugeuza rundo zima baada ya kila kupiga chafya.

Katika maeneo mengine, msimbo katika mifano hauonyeshwa kabisa (ili usiingie maandishi), lakini mahali fulani hurekebishwa katika mchakato. Mifano kamili ya nambari za kufanya kazi inaweza kupatikana kwenye ghala https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • Katika mkusanyiko wa utungaji, nilitegemea sana picha inayojulikana puckel/docker-airflow - hakikisha kuiangalia. Labda hauitaji kitu kingine chochote katika maisha yako.
  • Mipangilio yote ya Airflow inapatikana sio tu kupitia airflow.cfg, lakini pia kupitia anuwai za mazingira (shukrani kwa watengenezaji), ambayo nilichukua faida yake kwa ubaya.
  • Kwa kawaida, haiko tayari kwa uzalishaji: kwa makusudi sikuweka mapigo ya moyo kwenye vyombo, sikujisumbua na usalama. Lakini nilifanya kiwango cha chini kinachofaa kwa majaribio yetu.
  • Kumbuka kuwa:
    • Folda ya dag lazima ipatikane na kipanga ratiba na wafanyikazi.
    • Vile vile hutumika kwa maktaba zote za watu wengine - lazima zote zisanikishwe kwenye mashine zilizo na mpangilio na wafanyikazi.

Kweli, sasa ni rahisi:

$ docker-compose up --scale worker=3

Baada ya kila kitu kuongezeka, unaweza kuangalia miingiliano ya wavuti:

Dhana za kimsingi

Ikiwa haukuelewa chochote katika "dagi" hizi zote, basi hapa kuna kamusi fupi:

  • Kipanya - mjomba muhimu zaidi katika Airflow, kudhibiti kwamba roboti hufanya kazi kwa bidii, na si mtu: hufuatilia ratiba, kusasisha dags, kuzindua kazi.

    Kwa ujumla, katika matoleo ya zamani, alikuwa na shida na kumbukumbu (hapana, sio amnesia, lakini uvujaji) na parameta ya urithi hata ilibaki kwenye usanidi. run_duration - muda wake wa kuanza upya. Lakini sasa kila kitu ni sawa.

  • DAG (aka "dag") - "grafu ya acyclic iliyoelekezwa", lakini ufafanuzi kama huo utaambia watu wachache, lakini kwa kweli ni chombo cha kazi zinazoingiliana (tazama hapa chini) au analog ya Package katika SSIS na Workflow katika Informatica. .

    Mbali na dags, bado kunaweza kuwa na subdags, lakini uwezekano mkubwa hatutafika kwao.

  • Mbio za DAG - dag iliyoanzishwa, ambayo imepewa yake mwenyewe execution_date. Dagrans ya dag sawa inaweza kufanya kazi kwa sambamba (ikiwa ulifanya kazi zako kuwa zisizo na maana, bila shaka).
  • Opereta ni vipande vya kanuni vinavyohusika na kutekeleza kitendo maalum. Kuna aina tatu za waendeshaji:
    • hatuakama mpendwa wetu PythonOperator, ambayo inaweza kutekeleza msimbo wowote (sahihi) wa Python;
    • kuhamisha, ambayo husafirisha data kutoka mahali hadi mahali, sema, MsSqlToHiveTransfer;
    • sensor kwa upande mwingine, itawawezesha kuguswa au kupunguza kasi ya utekelezaji zaidi wa dag mpaka tukio hutokea. HttpSensor inaweza kuvuta sehemu ya mwisho iliyoainishwa, na wakati jibu linalohitajika linangojea, anza uhamishaji GoogleCloudStorageToS3Operator. Akili ya kudadisi itauliza: β€œKwa nini? Baada ya yote, unaweza kufanya marudio moja kwa moja kwenye opereta! Na kisha, ili si kuziba bwawa la kazi na waendeshaji kusimamishwa. Sensor huanza, hukagua na kufa kabla ya jaribio linalofuata.
  • Kazi - waendeshaji waliotangazwa, bila kujali aina, na kushikamana na dag wanapandishwa cheo cha kazi.
  • mfano wa kazi - wakati mpangaji mkuu aliamua kuwa ni wakati wa kutuma kazi vitani kwa watendaji-watendaji (papo hapo, ikiwa tunatumia LocalExecutor au kwa nodi ya mbali katika kesi ya CeleryExecutor), inawapa muktadha (yaani, seti ya vigezo - vigezo vya utekelezaji), huongeza amri au violezo vya hoja, na kuziweka.

Tunatengeneza kazi

Kwanza, hebu tuonyeshe mpango wa jumla wa doug yetu, na kisha tutaingia kwenye maelezo zaidi na zaidi, kwa sababu tunatumia ufumbuzi usio na maana.

Kwa hivyo, kwa fomu yake rahisi, dag kama hii itaonekana kama hii:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Hebu tufikirie:

  • Kwanza, tunaagiza libs muhimu na kitu kingine;
  • sql_server_ds - Je, List[namedtuple[str, str]] na majina ya viunganisho kutoka kwa Viunganisho vya Airflow na hifadhidata ambazo tutachukua sahani yetu;
  • dag - tangazo la dag yetu, ambayo lazima iwe ndani globals(), vinginevyo Airflow haitaipata. Doug pia anahitaji kusema:
    • jina lake ni nani orders - jina hili litaonekana kwenye kiolesura cha wavuti,
    • kwamba atafanya kazi kuanzia usiku wa manane tarehe nane Julai,
    • na inapaswa kukimbia, takriban kila masaa 6 (kwa watu wagumu hapa badala ya timedelta() inayokubalika cron- mstari 0 0 0/6 ? * * *, kwa baridi kidogo - usemi kama @daily);
  • workflow() itafanya kazi kuu, lakini sio sasa. Kwa sasa, tutatupa tu muktadha wetu kwenye logi.
  • Na sasa uchawi rahisi wa kuunda kazi:
    • tunapitia vyanzo vyetu;
    • anzisha PythonOperator, ambayo itatekeleza dummy yetu workflow(). Usisahau kutaja jina la pekee (ndani ya dag) la kazi na kumfunga dag yenyewe. Bendera provide_context kwa upande wake, itamimina hoja za ziada kwenye kazi, ambayo tutakusanya kwa uangalifu kwa kutumia **context.

Kwa sasa, ni hayo tu. Tulichopata:

  • dag mpya kwenye kiolesura cha wavuti,
  • majukumu mia moja na nusu ambayo yatatekelezwa kwa sambamba (ikiwa Airflow, mipangilio ya Celery na uwezo wa seva inaruhusu).

Naam, karibu kupata.

Apache Airflow: Kurahisisha ETL
Nani ataweka tegemezi?

Ili kurahisisha jambo hili lote, nilijiingiza docker-compose.yml usindikaji requirements.txt kwenye nodi zote.

Sasa imepita:

Apache Airflow: Kurahisisha ETL

miraba ya kijivu ni matukio ya kazi yanayochakatwa na kipanga ratiba.

Tunasubiri kidogo, kazi zinachukuliwa na wafanyikazi:

Apache Airflow: Kurahisisha ETL

Wale wa kijani, bila shaka, wamekamilisha kazi yao kwa ufanisi. Nyekundu hazifanikiwa sana.

Kwa njia, hakuna folda kwenye prod yetu ./dags, hakuna maingiliano kati ya mashine - dags zote ziko ndani git kwenye Gitlab yetu, na Gitlab CI inasambaza masasisho kwa mashine wakati wa kuunganishwa master.

Kidogo kuhusu Maua

Wakati wafanyikazi wanapiga viboreshaji vyetu, tukumbuke zana nyingine ambayo inaweza kutuonyesha kitu - Maua.

Ukurasa wa kwanza kabisa ulio na habari ya muhtasari kwenye nodi za wafanyikazi:

Apache Airflow: Kurahisisha ETL

Ukurasa mkali zaidi na kazi ambazo zilifanya kazi:

Apache Airflow: Kurahisisha ETL

Ukurasa wa kuchosha zaidi na hali ya wakala wetu:

Apache Airflow: Kurahisisha ETL

Ukurasa mkali zaidi una grafu za hali ya kazi na muda wa utekelezaji wake:

Apache Airflow: Kurahisisha ETL

Tunapakia iliyopakuliwa

Kwa hivyo, kazi zote zimefanyika, unaweza kubeba waliojeruhiwa.

Apache Airflow: Kurahisisha ETL

Na kulikuwa na wengi waliojeruhiwa - kwa sababu moja au nyingine. Katika kesi ya matumizi sahihi ya Airflow, miraba hii inaonyesha kuwa data hakika haikufika.

Unahitaji kutazama logi na uanze tena matukio ya kazi iliyoanguka.

Kwa kubofya mraba wowote, tutaona vitendo vinavyopatikana kwetu:

Apache Airflow: Kurahisisha ETL

Unaweza kuchukua na kufanya Wazi walioanguka. Hiyo ni, tunasahau kuwa kuna kitu kimeshindwa hapo, na kazi sawa ya mfano itaenda kwa mpangaji.

Apache Airflow: Kurahisisha ETL

Ni wazi kuwa kufanya hivi na panya na miraba nyekundu sio ya kibinadamu sana - hii sio tunayotarajia kutoka kwa Airflow. Kwa kawaida, tuna silaha za maangamizi makubwa: Browse/Task Instances

Apache Airflow: Kurahisisha ETL

Wacha tuchague kila kitu mara moja na tuweke upya hadi sifuri, bofya kipengee sahihi:

Apache Airflow: Kurahisisha ETL

Baada ya kusafisha, teksi zetu zinaonekana kama hii (tayari wanangojea mpangaji kupanga ratiba):

Apache Airflow: Kurahisisha ETL

Viunganisho, ndoano na vigezo vingine

Ni wakati wa kuangalia DAG ijayo, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа Ρ…ΠΎΡ€ΠΎΡˆΠΈΠ΅, ΠΎΡ‚Ρ‡Π΅Ρ‚Ρ‹ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Ρ‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ΠΠ°Ρ‚Π°Ρˆ, просыпайся, ΠΌΡ‹ {{ dag.dag_id }} ΡƒΡ€ΠΎΠ½ΠΈΠ»ΠΈ
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Je, kila mtu amewahi kusasisha ripoti? Huyu ndiye tena: kuna orodha ya vyanzo kutoka mahali pa kupata data; kuna orodha ambapo kuweka; usisahau kupiga honi wakati kila kitu kilifanyika au kuvunja (vizuri, hii sio juu yetu, hapana).

Wacha tupitie faili tena na tuangalie vitu vipya visivyo wazi:

  • from commons.operators import TelegramBotSendMessage - hakuna kinachotuzuia kutengeneza waendeshaji wetu wenyewe, ambao tulichukua faida kwa kutengeneza karatasi ndogo ya kutuma ujumbe kwa Haijazuiwa. (Tutazungumza zaidi juu ya mwendeshaji huyu hapa chini);
  • default_args={} - dag inaweza kusambaza hoja sawa kwa waendeshaji wake wote;
  • to='{{ var.value.all_the_kings_men }}' - shamba to hatutakuwa na misimbo ngumu, lakini itatolewa kwa nguvu kwa kutumia Jinja na kibadilishaji kilicho na orodha ya barua pepe, ambazo niliweka kwa uangalifu. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - hali ya kuanzisha opereta. Kwa upande wetu, barua itaruka kwa wakubwa tu ikiwa utegemezi wote umefanya kazi kwa mafanikio;
  • tg_bot_conn_id='tg_main' - hoja conn_id ukubali vitambulisho vya muunganisho ambavyo tunaunda ndani Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - ujumbe katika Telegram utaondoka tu ikiwa kuna kazi zilizoanguka;
  • task_concurrency=1 - tunakataza uzinduzi wa wakati mmoja wa matukio kadhaa ya kazi ya kazi moja. Vinginevyo, tutapata uzinduzi wa wakati mmoja wa kadhaa VerticaOperator (kutazama meza moja);
  • report_update >> [email, tg] - wote VerticaOperator ungana katika kutuma barua na ujumbe, kama hii:
    Apache Airflow: Kurahisisha ETL

    Lakini kwa kuwa waendeshaji wa arifa wana hali tofauti za uzinduzi, moja tu itafanya kazi. Katika Mwonekano wa Mti, kila kitu kinaonekana kidogo kidogo:
    Apache Airflow: Kurahisisha ETL

Nitasema maneno machache kuhusu makro na marafiki zao - vigezo.

Macro ni vishikilia nafasi vya Jinja ambavyo vinaweza kubadilisha taarifa mbalimbali muhimu katika hoja za waendeshaji. Kwa mfano, kama hii:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} itapanuka hadi yaliyomo katika utofauti wa muktadha execution_date katika umbizo YYYY-MM-DD: 2020-07-14. Sehemu bora zaidi ni kwamba viambatisho vya muktadha vimetundikwa kwenye tukio maalum la kazi (mraba katika Mwonekano wa Mti), na inapowashwa upya, vishikilia nafasi vitapanuka hadi thamani sawa.

Thamani zilizokabidhiwa zinaweza kutazamwa kwa kutumia kitufe Iliyotolewa kwenye kila tukio la kazi. Hivi ndivyo kazi ya kutuma barua:

Apache Airflow: Kurahisisha ETL

Na kwa hivyo katika kazi ya kutuma ujumbe:

Apache Airflow: Kurahisisha ETL

Orodha kamili ya macros iliyojengwa ndani kwa toleo la hivi karibuni linalopatikana inapatikana hapa: kumbukumbu ya macros

Zaidi ya hayo, kwa msaada wa programu-jalizi, tunaweza kutangaza macros yetu wenyewe, lakini hiyo ni hadithi nyingine.

Kwa kuongezea vitu vilivyoainishwa, tunaweza kubadilisha maadili ya anuwai zetu (tayari nilitumia hii kwenye nambari iliyo hapo juu). Wacha tuunde Admin/Variables mambo kadhaa:

Apache Airflow: Kurahisisha ETL

Kila kitu unaweza kutumia:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Thamani inaweza kuwa scalar, au inaweza pia kuwa JSON. Katika kesi ya JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

tumia tu njia ya ufunguo unaotaka: {{ var.json.bot_config.bot.token }}.

Nitasema neno moja na kuonyesha picha moja ya skrini kuhusu viunganisho. Kila kitu ni cha msingi hapa: kwenye ukurasa Admin/Connections tunaunda muunganisho, ongeza maingizo / nywila zetu na vigezo maalum zaidi hapo. Kama hii:

Apache Airflow: Kurahisisha ETL

Nywila zinaweza kusimbwa (kwa undani zaidi kuliko chaguo-msingi), au unaweza kuacha aina ya unganisho (kama nilivyofanya kwa tg_main) - ukweli ni kwamba orodha ya aina ni ngumu katika mifano ya Airflow na haiwezi kupanuliwa bila kuingia kwenye misimbo ya chanzo (ikiwa ghafla sikuingia kwenye google kitu, tafadhali nirekebishe), lakini hakuna kitu kitakachotuzuia kupata mikopo tu kwa jina.

Unaweza pia kufanya viunganisho kadhaa kwa jina moja: katika kesi hii, njia BaseHook.get_connection(), ambayo hutupatia miunganisho kwa jina, itatoa nasibu kutoka kwa majina kadhaa (itakuwa ya busara zaidi kufanya Round Robin, lakini wacha tuiache kwa dhamiri ya watengenezaji wa Airflow).

Vigezo na Viunganisho hakika ni zana nzuri, lakini ni muhimu usipoteze salio: ni sehemu gani za mitiririko yako unazohifadhi katika msimbo wenyewe, na ni sehemu gani unazotoa kwa Airflow kwa hifadhi. Kwa upande mmoja, inaweza kuwa rahisi kubadili haraka thamani, kwa mfano, sanduku la barua, kupitia UI. Kwa upande mwingine, hii bado ni kurudi kwa kubofya kwa panya, ambayo sisi (mimi) tulitaka kujiondoa.

Kufanya kazi na viunganishi ni moja ya kazi kulabu. Kwa ujumla, ndoano za Airflow ni pointi za kuiunganisha kwa huduma na maktaba za watu wengine. Kwa mfano, JiraHook itatufungulia mteja ili tuwasiliane na Jira (unaweza kusogeza kazi huku na huko), na kwa usaidizi wa SambaHook unaweza kushinikiza faili ya ndani kwa smb-hatua.

Kuchanganua opereta maalum

Na tukakaribia kuangalia jinsi inavyotengenezwa TelegramBotSendMessage

Kanuni commons/operators.py na opereta halisi:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Hapa, kama kila kitu kingine katika Airflow, kila kitu ni rahisi sana:

  • Imerithiwa kutoka BaseOperator, ambayo hutekelezea mambo machache mahususi ya Airflow (angalia tafrija yako)
  • Sehemu zilizotangazwa template_fields, ambayo Jinja itatafuta macros ya kuchakata.
  • Kupanga hoja sahihi kwa __init__(), weka chaguo-msingi inapobidi.
  • Hatukusahau kuhusu kuanzishwa kwa babu pia.
  • Ilifungua ndoano inayolingana TelegramBotHookkupokea kitu mteja kutoka humo.
  • Mbinu iliyobatilishwa (iliyofafanuliwa upya). BaseOperator.execute(), ambayo Airfow itazunguka wakati unakuja wa kuzindua operator - ndani yake tutatekeleza hatua kuu, kusahau kuingia. (Tunaingia, kwa njia, moja kwa moja stdout ΠΈ stderr - Mtiririko wa hewa utakatiza kila kitu, kuifunga kwa uzuri, kuoza inapohitajika.)

Hebu tuone kile tulichonacho commons/hooks.py. Sehemu ya kwanza ya faili, na ndoano yenyewe:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Sijui hata nielezee nini hapa, nitazingatia tu mambo muhimu:

  • Tunarithi, fikiria juu ya hoja - katika hali nyingi itakuwa moja: conn_id;
  • Kupitisha mbinu za kawaida: Nilijizuia get_conn(), ambayo mimi hupata vigezo vya uunganisho kwa jina na kupata sehemu tu extra (hii ni sehemu ya JSON), ambamo mimi (kulingana na maagizo yangu!) niliweka ishara ya bot ya Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ninaunda mfano wetu TelegramBot, akiipa ishara maalum.

Ni hayo tu. Unaweza kupata mteja kutoka kwa ndoano kwa kutumia TelegramBotHook().clent au TelegramBotHook().get_conn().

Na sehemu ya pili ya faili, ambayo mimi hutengeneza microwrapper kwa Telegraph REST API, ili sio kuvuta sawa. python-telegram-bot kwa mbinu moja sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Njia sahihi ni kuongeza yote: TelegramBotSendMessage, TelegramBotHook, TelegramBot - kwenye programu-jalizi, weka hifadhi ya umma, na uipe Open Source.

Tulipokuwa tukijifunza haya yote, masasisho yetu ya ripoti yalifaulu kushindwa na kunitumia ujumbe wa hitilafu kwenye kituo. Nitaangalia kama ni makosa...

Apache Airflow: Kurahisisha ETL
Kitu kilivunjwa ndani ya mbwa wetu! Si ndivyo tulivyokuwa tukitarajia? Hasa!

Je, unaenda kumwaga?

Unahisi nimekosa kitu? Inaonekana kwamba aliahidi kuhamisha data kutoka SQL Server hadi Vertica, na kisha akaichukua na kuondoka kwenye mada, mhuni!

Ukatili huu ulikuwa wa kukusudia, ilibidi nikueleze istilahi fulani. Sasa unaweza kwenda mbali zaidi.

Mpango wetu ulikuwa hivi:

  1. Kufanya dag
  2. Tengeneza majukumu
  3. Tazama jinsi kila kitu kilivyo nzuri
  4. Weka nambari za kipindi ili ujaze
  5. Pata data kutoka kwa Seva ya SQL
  6. Weka data kwenye Vertica
  7. Kusanya takwimu

Kwa hivyo, ili kupata haya yote na kukimbia, nilifanya nyongeza ndogo kwa yetu docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Hapo tunainua:

  • Vertica ni mwenyeji bingwa dwh na mipangilio chaguo-msingi zaidi,
  • matukio matatu ya SQL Server,
  • tunajaza hifadhidata katika mwisho na data fulani (kwa hali yoyote usiangalie mssql_init.py!)

Tunazindua mema yote kwa msaada wa amri ngumu zaidi kuliko mara ya mwisho:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Kile randomizer yetu ya muujiza ilizalisha, unaweza kutumia bidhaa Data Profiling/Ad Hoc Query:

Apache Airflow: Kurahisisha ETL
Jambo kuu sio kuionyesha kwa wachambuzi

kufafanua Vipindi vya ETL Sitafanya, kila kitu ni kidogo huko: tunatengeneza msingi, kuna ishara ndani yake, tunafunga kila kitu na msimamizi wa muktadha, na sasa tunafanya hivi:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

kikao.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Wakati umefika kukusanya data zetu kutoka kwa meza zetu mia moja na nusu. Wacha tufanye hivi kwa msaada wa mistari isiyo na adabu sana:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Kwa msaada wa ndoano tunapata kutoka Airflow pymssql-unganisha
  2. Wacha tubadilishe kizuizi katika mfumo wa tarehe kwenye ombi - itatupwa kwenye kazi na injini ya template.
  3. Kulisha ombi letu pandasnani atatupata DataFrame - itakuwa na manufaa kwetu katika siku zijazo.

Ninatumia mbadala {dt} badala ya parameta ya ombi %s si kwa sababu mimi ni Pinocchio mbaya, lakini kwa sababu pandas haiwezi kushughulikia pymssql na kuteleza ya mwisho params: Listingawa anataka kweli tuple.
Pia kumbuka kuwa msanidi programu pymssql aliamua kutomuunga mkono tena, na ni wakati wa kuhama pyodbc.

Wacha tuone Airflow ilijaza hoja za kazi zetu na:

Apache Airflow: Kurahisisha ETL

Ikiwa hakuna data, basi hakuna uhakika katika kuendelea. Lakini pia ni ajabu kuzingatia kujaza kwa mafanikio. Lakini hili si kosa. A-ah-ah, nini cha kufanya?! Na hapa ni nini:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException itaambia Airflow kuwa hakuna makosa, lakini tunaruka jukumu hilo. Interface haitakuwa na mraba wa kijani au nyekundu, lakini nyekundu.

Hebu tupige data zetu safu wima nyingi:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

yaani

  • Hifadhidata ambayo tulichukua maagizo,
  • Kitambulisho cha kipindi chetu cha mafuriko (itakuwa tofauti kwa kila kazi),
  • Hashi kutoka kwa chanzo na kitambulisho cha kuagiza - ili katika hifadhidata ya mwisho (ambapo kila kitu hutiwa kwenye jedwali moja) tuna kitambulisho cha kipekee cha agizo.

Hatua ya mwisho inabaki: mimina kila kitu kwenye Vertica. Na, cha ajabu, mojawapo ya njia za kuvutia zaidi na bora za kufanya hivi ni kupitia CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Tunatengeneza kipokeaji maalum StringIO.
  2. pandas mapenzi kuweka yetu DataFrame kwa fomu CSV- mistari.
  3. Hebu tufungue muunganisho kwa Vertica yetu tuipendayo kwa ndoano.
  4. Na sasa kwa msaada copy() tuma data zetu moja kwa moja kwa Vertika!

Tutachukua kutoka kwa dereva ni mistari ngapi iliyojazwa, na kumwambia msimamizi wa kikao kuwa kila kitu ni sawa:

session.loaded_rows = cursor.rowcount
session.successful = True

Ni hayo tu.

Kwa mauzo, tunaunda sahani inayolengwa kwa mikono. Hapa nilijiruhusu mashine ndogo:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Ninatumia VerticaOperator() Ninaunda schema ya hifadhidata na meza (ikiwa hazipo tayari, kwa kweli). Jambo kuu ni kupanga kwa usahihi utegemezi:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Akihitimisha

- Naam, - alisema panya mdogo, - sivyo, sasa
Je, unasadiki kwamba mimi ndiye mnyama mbaya zaidi msituni?

Julia Donaldson, The Gruffalo

Nadhani ikiwa wenzangu na mimi tulikuwa na ushindani: ni nani atakayeunda na kuzindua mchakato wa ETL haraka kutoka mwanzo: wao na SSIS yao na panya na mimi na Airflow ... Na kisha tungelinganisha urahisi wa matengenezo ... Wow, nadhani utakubali kwamba nitawapiga pande zote!

Ikiwa kwa umakini zaidi, basi Apache Airflow - kwa kuelezea michakato katika mfumo wa nambari ya programu - ilifanya kazi yangu. sana vizuri zaidi na kufurahisha.

Upanuzi wake usio na kikomo, katika suala la programu-jalizi na utabiri wa kuongezeka, hukupa fursa ya kutumia Airflow karibu na eneo lolote: hata katika mzunguko kamili wa kukusanya, kuandaa na kuchakata data, hata katika kurusha roketi (hadi Mars, ya kozi).

Sehemu ya mwisho, kumbukumbu na habari

Reki tumekukusanyia

  • start_date. Ndiyo, hii tayari ni meme ya ndani. Kupitia hoja kuu ya Doug start_date wote kupita. Kwa kifupi, ikiwa utabainisha ndani start_date tarehe ya sasa, na schedule_interval - siku moja, basi DAG itaanza kesho hakuna mapema.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Na hakuna matatizo zaidi.

    Kuna hitilafu nyingine ya wakati wa utekelezaji inayohusishwa nayo: Task is missing the start_date parameter, ambayo mara nyingi inaonyesha kuwa umesahau kumfunga dag operator.

  • Kila kitu kwenye mashine moja. Ndio, na besi (Mtiririko wa hewa yenyewe na mipako yetu), na seva ya wavuti, na mpangilio, na wafanyikazi. Na hata ilifanya kazi. Lakini baada ya muda, idadi ya kazi za huduma ilikua, na wakati PostgreSQL ilianza kujibu index katika 20 s badala ya 5 ms, tuliichukua na kuichukua.
  • Mtekelezaji wa Mitaa. Ndiyo, bado tumeketi juu yake, na tayari tumefika kwenye makali ya kuzimu. LocalExecutor imetutosha hadi sasa, lakini sasa ni wakati wa kupanua na angalau mfanyakazi mmoja, na itabidi tufanye bidii kuhamia CeleryExecutor. Na kwa kuzingatia ukweli kwamba unaweza kufanya kazi nayo kwenye mashine moja, hakuna kinachokuzuia kutumia Celery hata kwenye seva, ambayo "bila shaka, haitaingia katika uzalishaji, kwa uaminifu!"
  • Kutotumia zana zilizojengwa:
    • Connections kuhifadhi hati za huduma,
    • SLA Inakosa kujibu kazi ambazo hazifanyi kazi kwa wakati,
    • xcom kwa kubadilishana metadata (nilisema metadata!) kati ya kazi za dag.
  • Matumizi mabaya ya barua. Naam, naweza kusema nini? Arifa ziliwekwa kwa marudio yote ya kazi zilizoanguka. Sasa Gmail ya kazini ina >barua pepe 90k kutoka Airflow, na kificho cha barua pepe ya wavuti kinakataa kuchukua na kufuta zaidi ya 100 kwa wakati mmoja.

Mitego zaidi: Misukosuko ya Utiririshaji wa Air Apache

Zana zaidi za otomatiki

Ili tufanye kazi zaidi kwa vichwa vyetu na sio kwa mikono yetu, Airflow imetuandalia haya:

  • API YA REST - bado ana hali ya Majaribio, ambayo haimzuii kufanya kazi. Pamoja nayo, huwezi kupata tu habari kuhusu dags na kazi, lakini pia kuacha / kuanza dag, kuunda DAG Run au bwawa.
  • CLI - zana nyingi zinapatikana kupitia mstari wa amri ambazo sio tu zisizofaa kutumia kupitia WebUI, lakini kwa ujumla hazipo. Kwa mfano:
    • backfill inahitajika kuanzisha upya matukio ya kazi.
      Kwa mfano, wachambuzi walikuja na kusema: "Na wewe, rafiki, una upuuzi katika data kutoka Januari 1 hadi 13! Irekebishe, irekebishe, irekebishe, irekebishe!" Na wewe ni hobi kama hii:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Huduma ya msingi: initdb, resetdb, upgradedb, checkdb.
    • run, ambayo hukuruhusu kuendesha kazi ya mfano mmoja, na hata alama kwenye utegemezi wote. Kwa kuongeza, unaweza kuiendesha kupitia LocalExecutor, hata kama una kikundi cha Celery.
    • Inafanya kitu sawa test, pia katika misingi haiandiki chochote.
    • connections inaruhusu uundaji wa wingi wa miunganisho kutoka kwa ganda.
  • API ya chatu - njia ngumu ya kuingiliana, ambayo imekusudiwa kwa programu-jalizi, na sio kuingia ndani yake kwa mikono kidogo. Lakini ni nani wa kutuzuia kwenda /home/airflow/dags, kukimbia ipython na kuanza kufanya fujo? Unaweza, kwa mfano, kuuza nje miunganisho yote na nambari ifuatayo:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Inaunganisha kwenye metadatabase ya Airflow. Siipendekezi kuiandikia, lakini kupata hali za kazi kwa metriki anuwai maalum inaweza kuwa haraka na rahisi zaidi kuliko kupitia API zozote.

    Wacha tuseme kwamba sio kazi zetu zote hazina uwezo, lakini wakati mwingine zinaweza kuanguka, na hii ni kawaida. Lakini blockages chache tayari ni tuhuma, na itakuwa muhimu kuangalia.

    Jihadharini na SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

marejeo

Na bila shaka, viungo kumi vya kwanza kutoka kwa utoaji wa Google ni yaliyomo kwenye folda ya Airflow kutoka kwa alamisho zangu.

Na viungo vilivyotumika katika kifungu:

Chanzo: mapenzi.com