Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics department of the Vezet group of companies.

I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. But Airflow is so versatile and many-faced that you should take a closer look at it even if you don't deal with data flows at all, but have a need to periodically launch any kind of process and monitor its execution.

And yes, I won't just tell, I'll also show: the article has plenty of code, screenshots and recommendations.

Apache Airflow: Making ETL Easier
What you usually see when you google the word Airflow / Wikimedia Commons

Introduction

Apache Airflow is like Django:

  • it's written in Python,
  • it has a great admin panel,
  • it's endlessly extensible

only better, and it was made for completely different purposes, namely (as it says right on the box):

  • running and monitoring tasks on any number of machines (as many as Celery / Kubernetes and your conscience will allow)
  • with dynamic workflow generation from Python code that is very easy to write and understand
  • and the ability to connect any databases and APIs with each other, using both ready-made components and home-made plugins (which is extremely simple to do).

We use Apache Airflow like this:

  • we collect data from various sources (lots of SQL Server and PostgreSQL instances, assorted APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
  • as an advanced cron that launches the data-consolidation processes on the ODS and keeps an eye on their upkeep.

Until recently, all our needs were covered by a single small server with 32 cores and 50 GB of RAM. In Airflow terms, that runs:

  • more than 200 dags (the actual workflows we stuff tasks into),
  • about 70 tasks in each, on average,
  • with all this goodness kicking off (also on average) once an hour.

As for how we scaled out, I'll write about that below; for now, let's define the über-problem we are going to solve:

There are three source SQL Servers, each with 50 databases: instances of the same project which, accordingly, share the same structure (almost everywhere, mwa-ha-ha). That means each has an Orders table (luckily, a table with that name can be shoved into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively pour it all into, say, Vertica.

Let's go!

The main part, practical (and a little theoretical)

Why us (and maybe you)

Back when the trees were big and I was a simple SQL-guy at one Russian retailer, we handled ETL processes, a.k.a. data flows, with the two tools available to us:

  • Informatica Power Center: an extremely sprawling system, extremely productive, with its own hardware and its own versioning. God forbid I used 1% of its capabilities. Why? Well, first of all, this interface, somewhere out of the 380s, was quietly wearing us down. Secondly, this contraption is built for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. About the fact that it costs about as much as the wing of an Airbus A380 per year, we'll say nothing.

    Beware, the screenshot may hurt people under 30 a little

    Apache Airflow: Making ETL Easier

  • SQL Server Integration Services: we used this comrade in our intra-project flows. Well, indeed: we were already on SQL Server, and it would have been somehow unreasonable not to use its ETL tools. Everything about it is good: the visual flows are nice, and so are the progress reports... But that's not why we love software products, oh, not for that. Version it as dtsx (which is XML with its nodes shuffled on save) we can, but what's the point? How about making a task package that drags hundreds of tables from one server to another? Yes, forget a hundred: your index finger will fall off after twenty of them from clicking the mouse button. But it definitely looks more fashionable:

    Apache Airflow: Making ETL Easier

We honestly looked for a way out. It even nearly came down to a self-written SSIS package generator...

...and then a new job found me. And at it, Apache Airflow caught up with me.

When I found out that ETL process descriptions are just simple Python code, I all but danced for joy. This is how data flows got versioned and diffed, and pouring tables with the same structure from hundreds of databases into a single target became a matter of Python code on one and a half or two 13-inch screens.

Assembling the stack

Let's not turn this into a complete kindergarten and talk about totally obvious things here, like installing Airflow, your chosen database, Celery, and the other cases covered in the docs.

So that we can get to experiments right away, I sketched out a docker-compose.yml with:

  • the actual Airflow: Scheduler, Webserver. Flower will also be spinning there, to monitor the Celery tasks (it's already baked into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, into which Airflow writes its service data (scheduler data, execution statistics, etc.) and in which Celery marks completed tasks;
  • Redis, which will act as the task broker for Celery;
  • a Celery worker, which will handle the direct execution of tasks;
  • into the ./dags folder we will put our files with dag descriptions. They get picked up on the fly, so there's no need to bounce the whole stack after every sneeze.

In some places, the code in the examples is not shown in full (so as not to clutter the text), and in some places it gets modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In putting the composition together, I leaned heavily on the well-known image puckel/docker-airflow; be sure to check it out. Maybe you won't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also through environment variables (thank the developers), which I wickedly abused.
  • Naturally, it's not production-ready: I deliberately didn't put heartbeats on the containers, and I didn't bother with security. But I did the minimum suitable for our experiments.
  • Note that:
    • The dag folder must be accessible to both the scheduler and the workers.
    • The same goes for all third-party libraries: they must all be installed on the machines with the scheduler and with the workers.

Well, now it's simple:

$ docker-compose up --scale worker=3

Once everything is up, you can have a look at the web interfaces:
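(Judging by the ports published in the compose file above, the Airflow webserver should answer at http://localhost:8080 and Flower at http://localhost:5555.)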

Basic concepts

If you didn't catch anything in all those "dags", here's a short dictionary:

  • Scheduler: the most important uncle in Airflow, the one making sure it's robots doing the hard work rather than a human: it watches the schedule, refreshes dags, launches tasks.

    In general, in older versions it had memory problems (no, not amnesia, but leaks), and a legacy parameter even survived in the settings: run_duration, its restart interval. But now everything is fine.

  • DAG (a.k.a. "dag"): a "directed acyclic graph", but such a definition will tell few people much; in essence it's a container for tasks that interact with each other (see below), or an analogue of the Package in SSIS and the Workflow in Informatica.

    Besides dags there may also be subdags, but we most likely won't get to them.

  • DAG Run: an initialized dag, which gets its own execution_date assigned. Dagruns of one dag can perfectly well run in parallel (if, of course, you have made your tasks idempotent).
  • Operator: a piece of code responsible for performing one specific action. There are three types of operators:
    • action, like our beloved PythonOperator, which can execute any (valid) Python code;
    • transfer, which haul data from place to place, say, MsSqlToHiveTransfer;
    • sensor, which lets you react to, or throttle, further execution of the dag until an event occurs. HttpSensor can pull a given endpoint and, once the desired response is waiting there, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? You can do retries right in the operator, after all!" And then: so as not to clog up the task pool with suspended operators. A sensor starts up, checks, and dies before the next attempt (a minimal sketch of this pattern follows right after this list).
  • Task: declared operators, regardless of their type, once attached to a dag, are promoted to the rank of task.
  • Task instance: when the general-scheduler decides it's time to send tasks into battle on the performer-workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e. a set of variables, the execution parameters), expands the command or query templates, and pools them.
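To make the sensor-then-transfer pattern from the list above a bit more tangible, here is a minimal sketch. It's an illustration only: the connection ID and endpoint are made up, and the transfer is faked with a PythonOperator.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.http_sensor import HttpSensor

dag = DAG('sensor_demo',
          start_date=datetime(2020, 7, 8),
          schedule_interval='@daily')

# Pokes the (hypothetical) endpoint every minute; with mode='reschedule'
# the worker slot is freed between pokes instead of being held hostage.
wait_for_export = HttpSensor(
    task_id='wait_for_export',
    http_conn_id='partner_api',
    endpoint='exports/{{ ds }}/ready',
    poke_interval=60,
    mode='reschedule',
    dag=dag)

def fetch(**context):
    print('The export is ready, off we go')

transfer = PythonOperator(
    task_id='transfer',
    python_callable=fetch,
    provide_context=True,
    dag=dag)

wait_for_export >> transfer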

Generating tasks

First let's outline the general scheme of our dag, and then we'll dive deeper and deeper into the details, because we apply a few non-trivial solutions.

So, in its simplest form, such a dag looks like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's take it apart:

  • first, we import the necessary libraries and something else besides;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases we will take our plate from (a sketch of what this module might look like follows right after this list);
  • dag is the declaration of our dag, which must necessarily live in globals(), otherwise Airflow won't find it. The dag also needs to be told:
    • its name, orders: this name will then show up in the web interface,
    • that it runs from midnight on the eighth of July,
    • and that it should fire roughly every 6 hours (for the tough guys here, instead of timedelta() the cron line 0 */6 * * * is prettier; for the less hardcore, an expression like @daily);
  • workflow() will do the main job, but not right now. For now, our function will just dump the context into the log.
  • And now the simple magic of creating tasks:
    • we loop over our sources;
    • and initialize a PythonOperator, which will execute our dummy workflow(). Don't forget to give the task a unique (within the dag) name and tie it to the dag itself. The provide_context flag will, in turn, pour extra arguments into the function, which we'll carefully collect with **context.
That's all for now. What we've got:

  • a new dag in the web interface,
  • one and a half hundred tasks that will execute in parallel (as far as the Airflow and Celery settings and the server hardware allow).

Well, almost got.

Apache Airflow: Making ETL Easier
Who's going to install the dependencies?

To simplify this whole business, I snuck into docker-compose.yml the installation of requirements.txt on all the nodes.

Now it's off:

Apache Airflow: Making ETL Easier

The gray squares are task instances being processed by the scheduler.

We wait a bit, and the workers snap up the tasks:

Apache Airflow: Making ETL Easier

The green ones, of course, have finished their work successfully. The red ones were not so lucky.

By the way, there's no ./dags folder on our production at all, and no syncing between machines: all the dags live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.

A bit about Flower

While the workers are thrashing our little tasks, let's recall another tool that can show us a thing or two: Flower.

The very first page, with summary information on the worker nodes:

Apache Airflow: Making ETL Easier

The most intense page, with tasks that have gone off to work:

Apache Airflow: Making ETL Easier

The most interesting page, with the status of our broker:

Apache Airflow: Making ETL Easier

The brightest page, with task-status graphs and their execution times:

Apache Airflow: Making ETL Easier

Picking up the fallen

So, all the tasks have run; you can carry off the wounded.

Apache Airflow: Making ETL Easier

And there were quite a few wounded, for one reason or another. With proper use of Airflow, these very squares tell you that the data quite definitely did not arrive.

You need to watch the log and restart the fallen task instances.

Clicking on any square, we'll see the actions available to us:

Apache Airflow: Making ETL Easier

You can take a fallen one and Clear it. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Apache Airflow: Making ETL Easier

Obviously, doing this with the mouse over all the red squares isn't very humane, and it's not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

Apache Airflow: Making ETL Easier

Let's select everything at once and reset it to zero by clicking the right item:

Apache Airflow: Making ETL Easier

After the cleanup, our taxis look like this (they're already waiting for the scheduler to dispatch them):

Apache Airflow: Making ETL Easier

Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone done a report update at some point? This is her again: there's a list of sources to get the data from; there's a list of places to put it; don't forget to honk when everything succeeded or fell over (well, that's not about us, no).

Let's walk through the file again and look at the new obscure bits:

  • from commons.operators import TelegramBotSendMessage: nothing stops us from making our own operators, which we took advantage of by writing a small wrapper for sending messages to the Unblocked One. (We'll talk more about this operator below);
  • default_args={}: the dag can hand out the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}': the to field will not be hardcoded, but generated via Jinja from a variable with a list of emails, which I carefully placed into Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the letter flies off to the bosses only if all the dependencies completed successfully;
  • tg_bot_conn_id='tg_main': conn_id arguments take the IDs of the connections we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED: messages in Telegram fly out only if there are fallen tasks;
  • task_concurrency=1: we forbid several task instances of one task launching simultaneously. Otherwise we'd get several VerticaOperator running at once (staring at one table);
  • report_update >> [email, tg]: all the VerticaOperator converge on sending the letter and the message, like this:
    Apache Airflow: Making ETL Easier

    But since the notifier operators have different trigger conditions, only one will fire. In the Tree View, everything looks a little less visual:
    Apache Airflow: Making ETL Easier

I'll say a few words about macros and their friends, variables.

Macros are Jinja placeholders that can substitute various useful bits into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} will expand into the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that the context variables are nailed to a specific task instance (a square in the Tree View), and on a rerun the placeholders expand into the very same values.

You can inspect the assigned values using the Rendered button on each task instance. Here's how it is with the task that sends the letter:

Apache Airflow: Making ETL Easier

And here with the task sending the message:

Apache Airflow: Making ETL Easier

A full list of built-in macros for the latest available version is here: macro reference

Moreover, with the help of plugins we can declare macros of our own, but that's another story.
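Plugins aside, for simple cases a DAG can carry its own helpers through the user_defined_macros argument; a quick sketch (the helper itself is, of course, made up):

from datetime import datetime, timedelta

from airflow import DAG

def last_monday(ds):
    """Return the Monday of the week containing ds ('YYYY-MM-DD')."""
    day = datetime.strptime(ds, '%Y-%m-%d')
    return (day - timedelta(days=day.weekday())).strftime('%Y-%m-%d')

dag = DAG('udm_demo',
          start_date=datetime(2020, 7, 8),
          schedule_interval='@daily',
          user_defined_macros={'last_monday': last_monday})

# Any templated field of this dag's operators can now use it:
#   sql="SELECT ... WHERE dt >= '{{ last_monday(ds) }}'"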

Besides the predefined ones, we can substitute the values of our own variables (I already used this in the code above). Let's create two items in Admin/Variables:

Apache Airflow: Making ETL Easier

Which you can then use wherever you like:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
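The same variables are reachable from plain Python code through the Variable model, and deserialize_json=True will parse a JSON value for you:

from airflow.models import Variable

# A scalar variable
chat_id = Variable.get('failures_chat')

# A JSON variable, parsed into a dict
bot_config = Variable.get('bot_config', deserialize_json=True)
token = bot_config['bot']['token']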

I'll literally say one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection, pile our logins/passwords and more specific parameters into it. Like this:

Apache Airflow: Making ETL Easier

Passwords can be encrypted (better than the default), and you can even leave the connection type out (as I did for tg_main): the thing is, the list of types is hardwired into Airflow's models and can't be extended without climbing into the source code (if I suddenly failed to google something, please correct me), but nothing stops us from getting credentials simply by name.

You can also make several connections with the same name: in that case the method BaseHook.get_connection(), which fetches connections by name, will hand out a random one of the several namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).

Variables and Connections are certainly nice tools, but it's important not to lose the balance: which parts of your flows you store in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to change a value quickly, say a mailing address, through the UI. On the other hand, that's again a return to the mouse click we (I) wanted to get away from.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for plugging it into third-party services and libraries. For example, JiraHook will open us a client for interacting with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb share.
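Inside a PythonOperator callable this looks roughly like the sketch below: the hook pulls host, login and password out of Admin/Connections by name, so only the conn_id ever lives in the code (the IDs and the query here are made up):

from airflow.hooks.mssql_hook import MsSqlHook

def count_orders(**context):
    # The hook resolves credentials from Admin/Connections by conn_id
    hook = MsSqlHook(mssql_conn_id='mssql_0', schema='db_0')
    conn = hook.get_conn()

    cursor = conn.cursor()
    cursor.execute('SELECT count(*) FROM dbo.Orders')
    print('Orders so far:', cursor.fetchone()[0])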

Dissecting a custom operator

And we've come close to looking at how TelegramBotSendMessage is made.

The code of commons/operators.py with the actual operator:

from typing import Union

from airflow.models import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything in Airflow, everything is simple:

  • We inherit from BaseOperator, which implements quite a few Airflow-specific things (take a look at it at your leisure).
  • We declare the fields in template_fields, in which Jinja will look for macros to process.
  • We arrange the right arguments for __init__() and set the defaults where needed.
  • We don't forget to initialize the ancestor, either.
  • We open the corresponding hook, TelegramBotHook, and obtain a client object from it.
  • We override the (mandatory) method BaseOperator.execute(), which Airflow will jerk when the time comes to launch the operator; in it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr: Airflow intercepts everything, wraps it up nicely, and decomposes it where needed.)

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what there is to explain here, so I'll just note the important points:

  • We inherit and think about the arguments; in most cases there will be one: conn_id.
  • We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and simply pull out the extra section (a JSON field), into which I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, handing it the specific token.

That's all. You can get a client out of the hook with TelegramBotHook().client or TelegramBotHook().get_conn().

And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag the whole python-telegram-bot along for the sake of the single method sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

The right way would be to add all of it (TelegramBotSendMessage, TelegramBotHook, TelegramBot) to a plugin, put it in a public repository, and hand it over to Open Source.
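Under Airflow 1.10's plugin machinery, that packaging would be roughly this much code (a sketch, not a published package):

# plugins/telegram_plugin.py: a minimal sketch
from airflow.plugins_manager import AirflowPlugin

from commons.hooks import TelegramBotHook
from commons.operators import TelegramBotSendMessage

class TelegramPlugin(AirflowPlugin):
    name = 'telegram_plugin'
    hooks = [TelegramBotHook]
    operators = [TelegramBotSendMessage]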

While we were studying all that, our report update managed to fail successfully and sent me an error message in the channel. I'm off to check whether it's something bad...

Apache Airflow: Making ETL Easier
Something broke in our doge! Isn't that just what we were expecting? Exactly!

Are you going to pour or what?

Feel like I've missed something? He promised, it seems, to transfer data from SQL Server to Vertica, and then up and wandered off topic, the scoundrel!

This atrocity was deliberate: I simply had to decipher some terminology for you first. Now we can go further.

Our plan is this:

  1. Do the dag
  2. Generate the tasks
  3. Admire how pretty everything is
  4. Assign session numbers to the fills
  5. Get the data out of SQL Server
  6. Put the data into Vertica
  7. Collect statistics

So, to get all of this up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we raise:

  • Vertica as the dwh host, with mostly default settings,
  • three instances of SQL Server,
  • and we fill the databases in the latter with some data (do not, under any circumstances, look into mssql_init.py!)

We launch all of the goodness with the help of a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

What our miracle randomizer produced, you can examine with the item Data Profiling/Ad Hoc Query:

Apache Airflow: Making ETL Easier
The main thing is not to show it to the analysts

I won't explain the ETL sessions in detail; everything there is trivial: we make a base, there's a table in it, we wrap everything with a context manager, and now we do this:

with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

It's time to collect the data from our one-and-a-half hundred tables. Let's do it with the help of these very unpretentious lines:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. With the help of the hook we get a pymssql connection out of Airflow
  2. We substitute a date-shaped restriction into the query: the templating engine will throw it into the function.
  3. We feed our query to pandas, which gets us a DataFrame: it will come in handy later.

I use the substitution {dt} instead of a query parameter %s not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and slips it params: List, though it really wants a tuple.
Also note that the developer of pymssql decided not to support it anymore, and it's time to move out to pyodbc.
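If and when we do move to pyodbc, the same pull with a real bind parameter could look roughly like this (the driver name and connection details are assumptions):

import pandas as pd
import pyodbc

# Hypothetical connection details for one of our source servers
conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=mssql_0;DATABASE=db_0;UID=sa;PWD=...')

query = """
    SELECT id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE CONVERT(DATE, start_time) = ?
    """

# pyodbc takes ? placeholders, and pandas passes params through untouched
df = pd.read_sql_query(query, conn, params=[dt])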

Let's see what Airflow stuffed our function's arguments with:

Apache Airflow: Making ETL Easier

If there's no data, there's no point in going on. But it's also odd to count the fill as successful. And it isn't an error either. A-ah-ah, what to do?! Here's what:

from airflow.exceptions import AirflowSkipException

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow there are no errors, but we're skipping the task. The interface will show not a green or red square, but a pink one.

Let's toss a few more columns onto our data:

from pandas.util import hash_pandas_object

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:

  • the database we took the orders from,
  • the ID of our fill session (it will be different for every task),
  • a hash of the source and the order ID, so that in the final database (where everything gets poured into one table) we have a unique order ID.

The last step remains: pour it all into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do that is via CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table} ({', '.join(df.columns)})
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. We make a special receptacle, a StringIO.
  2. pandas kindly puts our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with the help of copy(), we send our data straight to Vertica!

We'll take from the driver how many rows were filled in, and tell the session manager that all is well:

session.loaded_rows = cursor.rowcount
session.successful = True

That's it.
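For the overall picture, here is roughly how the fragments above snap together into a single workflow() (a sketch: the Session bookkeeping and the etl_id column are left out for brevity, and the polished version is in the repository):

import csv
from io import StringIO

import pandas as pd
from pandas.util import hash_pandas_object

from airflow.contrib.hooks.vertica_hook import VerticaHook
from airflow.exceptions import AirflowSkipException
from airflow.hooks.mssql_hook import MsSqlHook

def workflow(src_conn_id, src_schema, dt,
             target_conn_id, target_table, **context):
    # Extract
    source_conn = MsSqlHook(mssql_conn_id=src_conn_id,
                            schema=src_schema).get_conn()
    df = pd.read_sql_query(
        f"SELECT id, start_time, end_time, type, data "
        f"FROM dbo.Orders WHERE CONVERT(DATE, start_time) = '{dt}'",
        source_conn)

    if df.empty:
        raise AirflowSkipException('No rows to load')

    # Enrich with the service fields
    df['etl_source'] = src_schema
    df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

    # Load through a CSV buffer
    buffer = StringIO()
    df.to_csv(buffer, index=False, sep='|', na_rep='NUL',
              quoting=csv.QUOTE_MINIMAL, header=False,
              float_format='%.8f', doublequote=False, escapechar='\\')
    buffer.seek(0)

    target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
    cursor = target_conn.cursor()
    cursor.copy(
        f"COPY {target_table} ({', '.join(df.columns)}) FROM STDIN "
        f"DELIMITER '|' ENCLOSED '\"' ABORT ON ERROR NULL 'NUL'",
        buffer)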

In production we create the target plate by hand. Here, though, I allowed myself a small automaton:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator() I create the target schema and table (if they don't already exist, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up

"Well," said the little mouse, "isn't it so, now,
are you convinced that I'm the scariest beast in the forest?"

Julia Donaldson, The Gruffalo

I think if my colleagues and I held a competition over who would build and launch an ETL process from scratch faster, them with their SSIS and a mouse and me with Airflow... and then we also compared ease of maintenance... wow, I think you'll agree I'd beat them on all fronts!

If a bit more seriously, then Apache Airflow, by describing processes in the form of program code, has made my job much more comfortable and enjoyable.

Its unlimited extensibility, both in terms of plug-ins and its predisposition to scalability, gives you the opportunity to use Airflow in almost any area: even in the full cycle of collecting, preparing and processing data, even in launching rockets (to Mars, of course).

The final part: reference and information

The rakes we collected for you

  • start_date. Yes, this is already a local meme. Everyone gets put through the dag's main argument, start_date. Briefly: if you set start_date to the current date and schedule_interval to one day, then the DAG starts tomorrow, not earlier.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There's another runtime error associated with it, Task is missing the start_date parameter, which most often indicates that you forgot to bind an operator to the dag.

  • Everything on one machine. Yes, the base (Airflow itself plus our wrapper), the web server, the scheduler, and the workers. And it even worked. But over time the number of tasks grew, and when PostgreSQL began answering an indexed lookup in 20 s instead of 5 ms, we picked it up and carried it away.
  • LocalExecutor. Yes, we're still sitting on it, and we've already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to expand with at least one worker, and we'll have to sweat to move to CeleryExecutor. And given that you can work with it on a single machine, nothing stops you from using Celery even on a server which "of course will never go into production, honest!"
  • Not using the built-in tools:
    • Connections for storing service credentials,
    • SLA Misses for reacting to tasks that didn't finish on time,
    • XCom for exchanging metadata (I said metadata!) between dag tasks (a tiny sketch follows right after this list).
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of a fallen task. Now my work Gmail has >90k emails from Airflow, and the web-mail muzzle refuses to pick up and delete more than 100 of them at a time.
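Since XCom made the list, a tiny sketch of what we're missing out on: a task's return value is pushed into XCom, and a downstream task pulls it by task ID (assuming a dag like the ones above):

from airflow.operators.python_operator import PythonOperator

def push_stats(**context):
    # The return value is stored in XCom under the key 'return_value'
    return {'loaded_rows': 15}

def pull_stats(**context):
    stats = context['ti'].xcom_pull(task_ids='push_stats')
    print('Upstream loaded', stats['loaded_rows'], 'rows')

push = PythonOperator(task_id='push_stats', python_callable=push_stats,
                      provide_context=True, dag=dag)
pull = PythonOperator(task_id='pull_stats', python_callable=pull_stats,
                      provide_context=True, dag=dag)
push >> pull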

More pitfalls: Apache Airflow Pitfalls

Even more automation tools

So that we work even more with our heads and not with our hands, Airflow has prepared for us:

  • REST API: it's still in Experimental status, which doesn't stop it from working. With it you can not only fetch information about dags and tasks, but also stop/start a dag, create a DAG Run or a pool.
  • CLI: plenty of tools are available through the command line that are not merely inconvenient to use through the WebUI but absent from it altogether. For example:
    • backfill, needed to restart task instances.
      For example, the analysts come and say: "And you, comrade, have nonsense in the data from January 1st to the 13th! Fix it, fix it, fix it, fix it!" And you just go:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Base services: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance, and even score on all the dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does pretty much the same thing, only additionally writes nothing to the bases.
    • connections allows mass creation of connections from the shell.
  • Python API: a rather hardcore way of interacting, intended for plugins, not for swarming into it with your little hands. But who's to stop us from going to /home/airflow/dags, firing up ipython and starting to poke around? You can, for example, export all the connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadatabase. I don't recommend writing to it, but fetching task states for various specific metrics can be much faster and simpler than through any of the APIs.

    Let's say not all of our tasks are idempotent; some may occasionally fall over, and that's normal. But a few pile-ups in a row are already suspicious, and it would be worth checking.

    Beware, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

References

And of course, the first ten links from Google's search results are the contents of the Airflow folder from my bookmarks.

And the links used in the article:

source: www.habr.com