Apache օդային հոսք. ETL-ի հեշտացում

Ողջույն, ես Դմիտրի Լոգվինենկոն եմ՝ Vezet ընկերությունների խմբի Վերլուծության բաժնի տվյալների ինժեներ:

Ես ձեզ կպատմեմ ETL պրոցեսների մշակման հրաշալի գործիքի՝ Apache Airflow-ի մասին։ Բայց Airflow-ն այնքան բազմակողմանի և բազմակողմանի է, որ դուք պետք է ավելի ուշադիր նայեք դրան, նույնիսկ եթե դուք ներգրավված չեք տվյալների հոսքերի մեջ, այլ անհրաժեշտություն ունեք պարբերաբար գործարկել որևէ գործընթաց և վերահսկել դրանց կատարումը:

Եվ այո, ես ոչ միայն կասեմ, այլ նաև ցույց կտամ՝ ծրագիրն ունի շատ կոդ, սքրինշոթ և առաջարկություններ։

Apache օդային հոսք. ETL-ի հեշտացում
Այն, ինչ դուք սովորաբար տեսնում եք, երբ google-ում եք «Օդ հոսք» / Wikimedia Commons բառը

Պահեստավորված նյութեր

Ներածություն

Apache Airflow-ը նման է Django-ին.

  • գրված է python-ով
  • կա մեծ ադմինիստրատորի վահանակ,
  • ընդլայնվում է անորոշ ժամանակով

- միայն ավելի լավ, և այն պատրաստված է բոլորովին այլ նպատակների համար, մասնավորապես (ինչպես գրված է կատվի առաջ).

  • առաջադրանքներ գործարկել և վերահսկել անսահմանափակ թվով մեքենաների վրա (քանի որ շատ Celery/Kubernetes և ձեր խիղճը թույլ կտա ձեզ)
  • դինամիկ աշխատանքային հոսքի ստեղծմամբ՝ շատ հեշտ գրելու և հասկանալու Python կոդը
  • և ցանկացած տվյալների բազաներ և API-ներ միմյանց հետ կապելու հնարավորություն՝ օգտագործելով և՛ պատրաստի բաղադրիչները, և՛ տնական փլագինները (ինչը չափազանց պարզ է):

Մենք օգտագործում ենք Apache Airflow-ը այսպես.

  • մենք հավաքում ենք տվյալներ տարբեր աղբյուրներից (բազմաթիվ SQL Server և PostgreSQL օրինակներ, տարբեր API-ներ՝ կիրառական չափանիշներով, նույնիսկ 1C) DWH-ում և ODS-ում (մենք ունենք Vertica և Clickhouse):
  • որքան առաջադեմ cron, որը սկսում է տվյալների համախմբման գործընթացները ODS-ի վրա, ինչպես նաև վերահսկում է դրանց պահպանումը:

Մինչև վերջերս մեր կարիքները ծածկում էր մեկ փոքր սերվեր՝ 32 միջուկով և 50 ԳԲ օպերատիվ հիշողությամբ: Airflow-ում սա աշխատում է.

  • ավելի շատ 200 դագ (իրականում աշխատանքային հոսքեր, որոնցում մենք լրացրել ենք առաջադրանքներ),
  • յուրաքանչյուրում միջինը 70 առաջադրանք,
  • այս բարությունը սկսվում է (նաև միջին հաշվով) ժամը մեկ անգամ.

Իսկ այն մասին, թե ինչպես ենք մենք ընդլայնվել, ես կգրեմ ստորև, բայց հիմա եկեք սահմանենք über-խնդիրը, որը մենք կլուծենք.

Կան երեք աղբյուր SQL սերվերներ, որոնցից յուրաքանչյուրը ունի 50 տվյալների բազա՝ համապատասխանաբար մեկ նախագծի օրինակներ, նրանք ունեն նույն կառուցվածքը (գրեթե ամենուր, mua-ha-ha), ինչը նշանակում է, որ յուրաքանչյուրն ունի Պատվերների աղյուսակ (բարեբախտաբար, աղյուսակը դրանով): անունը կարող է մղվել ցանկացած բիզնեսի մեջ): Մենք վերցնում ենք տվյալները՝ ավելացնելով սպասարկման դաշտերը (աղբյուրի սերվեր, աղբյուրի տվյալների բազա, ETL առաջադրանքի ID) և միամտաբար նետում ենք դրանք, ասենք, Vertica:

Եկեք գնանք.

Հիմնական մասը՝ գործնական (և մի փոքր տեսական)

Ինչու է դա մեզ համար (և ձեզ համար)

Երբ ծառերը մեծ էին, իսկ ես պարզ SQLՌուսական մանրածախ առևտրի մեկում մենք խաբել ենք ETL գործընթացները՝ որպես տվյալների հոսքեր՝ օգտագործելով մեզ հասանելի երկու գործիքներ.

  • Informatica Power Center - չափազանց տարածվող համակարգ, չափազանց արդյունավետ, իր ապարատով, իր սեփական տարբերակով: Աստված մի արասցե դրա հնարավորությունների 1%-ն օգտագործեցի։ Ինչո՞ւ։ Դե, առաջին հերթին, այս ինտերֆեյսը, ինչ-որ տեղ 380-ականներից, մտավոր ճնշում էր մեզ վրա: Երկրորդ, այս գործիքը նախատեսված է չափազանց շքեղ գործընթացների, կատաղի բաղադրիչների վերաօգտագործման և այլ շատ կարևոր ձեռնարկատիրական հնարքների համար: Այն մասին, որ այն արժե, ինչպես Airbus AXNUMX-ի թևը / տարի, մենք ոչինչ չենք ասի:

    Զգուշացեք, սքրինշոթը կարող է մի փոքր վնասել 30 տարեկանից ցածր մարդկանց

    Apache օդային հոսք. ETL-ի հեշտացում

  • SQL Server ինտեգրման սերվեր -Այս ընկերոջը մենք օգտագործել ենք մեր ներնախագծային հոսքերում։ Դե, փաստորեն, մենք արդեն օգտագործում ենք SQL Server, և ինչ-որ կերպ անհիմն կլինի չօգտագործել նրա ETL գործիքները: Դրանում ամեն ինչ լավ է. և՛ ինտերֆեյսը գեղեցիկ է, և՛ առաջընթացի մասին հաշվետվություններ... Բայց սա չէ պատճառը, որ մենք սիրում ենք ծրագրային արտադրանք, ախ, ոչ դրա համար: Տարբերակ այն dtsx (որն է XML-ը, որտեղ հանգույցները խառնվում են պահպանման վրա) մենք կարող ենք, բայց ո՞րն է իմաստը: Ի՞նչ կասեք առաջադրանքների փաթեթ ստեղծելու մասին, որը հարյուրավոր աղյուսակներ կքաշի մի սերվերից մյուսը: Այո, ինչ հարյուր, ձեր ցուցամատը քսան կտորից կընկնի, սեղմելով մկնիկի կոճակը։ Բայց դա հաստատ ավելի նորաձև է թվում.

    Apache օդային հոսք. ETL-ի հեշտացում

Մենք, անշուշտ, ելքեր էինք փնտրում։ Գործը նույնիսկ գրեթե եկել է ինքնուրույն գրված SSIS փաթեթի գեներատոր ...

…և հետո նոր աշխատանք գտա ինձ: Եվ Apache Airflow-ը գերազանցեց ինձ դրա վրա:

Երբ ես իմացա, որ ETL գործընթացի նկարագրությունները պարզ Python կոդ են, ես պարզապես ուրախությունից չպարեցի: Ահա թե ինչպես են տվյալների հոսքերը տարբերակվել և տարբերվել, և հարյուրավոր տվյալների բազաներից մեկ թիրախի մեջ մեկ կառուցվածքով աղյուսակներ լցնելը դարձել է Python-ի կոդի խնդիր մեկուկես կամ երկու 13” էկրաններում:

Կլաստերի հավաքում

Եկեք ամբողջությամբ մանկապարտեզ չկազմակերպենք և այստեղ չխոսենք բոլորովին ակնհայտ բաների մասին, օրինակ՝ տեղադրել Airflow-ը, ձեր ընտրած տվյալների բազան, Celery-ը և նավահանգիստներում նկարագրված այլ դեպքեր:

Որպեսզի մենք անմիջապես սկսենք փորձերը, ես ուրվագծեցի docker-compose.yml որի մեջ:

  • Եկեք իրականում բարձրացնենք ՕդափոխումըԺամանակացույց, Վեբսերվեր: Ծաղիկը նույնպես կպտտվի այնտեղ՝ վերահսկելու Նեխուրի առաջադրանքները (քանի որ այն արդեն դրված է apache/airflow:1.10.10-python3.7, բայց մենք դեմ չենք)
  • PostgreSQL, որում Airflow-ը կգրի իր սպասարկման տեղեկատվությունը (ժամանակացույցի տվյալներ, կատարման վիճակագրություն և այլն), իսկ Celery-ը կնշի կատարված առաջադրանքները.
  • Redis, որը հանդես կգա որպես առաջադրանքների բրոքեր Celery-ի համար;
  • Նեխուրի բանվոր, որը կզբաղվի առաջադրանքների անմիջական կատարմամբ։
  • Թղթապանակին ./dags մենք կավելացնենք մեր ֆայլերը դագերի նկարագրությամբ: Դրանք կվերցվեն անմիջապես, այնպես որ կարիք չկա ամեն փռշտալուց հետո ամբողջ կույտը ձեռնամուխ լինել:

Որոշ տեղերում օրինակների ծածկագիրը ամբողջությամբ չի ցուցադրվում (որպեսզի տեքստը չխառնվի), բայց ինչ-որ տեղ այն փոփոխվում է գործընթացում։ Աշխատանքային կոդի ամբողջական օրինակները կարելի է գտնել պահեստում https://github.com/dm-logv/airflow-tutorial.

դոկտոր-կոմպոզ.իմլ

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Ծանուցում:

  • Կոմպոզիցիայի հավաքման ժամանակ ես մեծապես ապավինում էի հայտնի կերպարին puckel/docker-airflow - Համոզվեք, որ ստուգեք այն: Միգուցե ձեր կյանքում ուրիշ ոչինչ պետք չէ:
  • Օդի հոսքի բոլոր կարգավորումները հասանելի են ոչ միայն միջոցով airflow.cfg, այլ նաև շրջակա միջավայրի փոփոխականների միջոցով (շնորհիվ մշակողների), որոնցից ես չարամտորեն օգտվեցի։
  • Բնականաբար, այն պատրաստ չէ արտադրությանը. ես միտումնավոր սրտի զարկեր չեմ դրել տարաների վրա, չեմ անհանգստացրել անվտանգությանը։ Բայց ես արեցի մեր փորձարարներին հարմար նվազագույնը։
  • Նշենք, որ.
    • Dag թղթապանակը պետք է հասանելի լինի ինչպես ժամանակացույցի, այնպես էլ աշխատողների համար:
    • Նույնը վերաբերում է բոլոր երրորդ կողմի գրադարաններին. դրանք բոլորը պետք է տեղադրվեն ժամանակացույցով և աշխատողներով մեքենաների վրա:

Դե, հիմա պարզ է.

$ docker-compose up --scale worker=3

Ամեն ինչ բարձրանալուց հետո կարող եք դիտել վեբ միջերեսները.

Հիմնական հասկացություններ

Եթե ​​այս բոլոր «դագերում» ոչինչ չհասկացաք, ապա ահա կարճ բառարան.

  • Scheduler - Airflow-ի ամենակարևոր հորեղբայրը, որը վերահսկում է, որ ռոբոտները քրտնաջան աշխատեն, և ոչ թե մարդը.

    Ընդհանրապես, հին տարբերակներում նա խնդիրներ ուներ հիշողության հետ (ոչ, ոչ թե ամնեզիա, այլ արտահոսք) և ժառանգության պարամետրը նույնիսկ մնաց կոնֆիգուրացիաներում: run_duration - դրա վերագործարկման միջակայքը: Բայց հիմա ամեն ինչ լավ է։

  • DAG (aka «dag») - «ուղղորդված ացիկլիկ գրաֆիկ», բայց նման սահմանումը քչերին կպատմի, բայց իրականում այն ​​միմյանց հետ փոխազդող առաջադրանքների համար նախատեսված կոնտեյներ է (տես ստորև) կամ SSIS-ի փաթեթի և Informatica-ի աշխատանքային հոսքի անալոգը: .

    Բացի դագերից, դեռ կարող են լինել ենթադասեր, բայց մենք, ամենայն հավանականությամբ, չենք հասնի դրանց:

  • DAG Run - սկզբնականացված դագ, որը նշանակված է իր սեփականը execution_date. Նույն դագի դագրանները կարող են զուգահեռ աշխատել (եթե ձեր առաջադրանքները, իհարկե, անիմաստ եք դարձրել):
  • օպերատոր կոդի կտորներ են, որոնք պատասխանատու են կոնկրետ գործողություն կատարելու համար: Օպերատորների երեք տեսակ կա.
    • գործողությունմեր սիրելիի նման PythonOperator, որը կարող է կատարել Python-ի ցանկացած (վավեր) կոդ;
    • փոխանցել, որոնք տեղափոխում են տվյալներ տեղից տեղ, ասենք. MsSqlToHiveTransfer;
    • սենսոր մյուս կողմից, դա ձեզ թույլ կտա արձագանքել կամ դանդաղեցնել դագի հետագա կատարումը, մինչև որևէ իրադարձություն տեղի ունենա: HttpSensor կարող է քաշել նշված վերջնակետը, և երբ ցանկալի պատասխանը սպասում է, սկսիր փոխանցումը GoogleCloudStorageToS3Operator. Հետաքրքրասեր միտքը կհարցնի. «Ինչո՞ւ. Ի վերջո, դուք կարող եք կրկնություններ անել հենց օպերատորում»: Եվ հետո, կասեցված օպերատորներով առաջադրանքների լողավազանը չխցանելու համար: Սենսորը սկսում է, ստուգում և մահանում է հաջորդ փորձից առաջ:
  • Խնդիր - հայտարարված օպերատորները, անկախ տեսակից, և կցված դագին, բարձրացվում են առաջադրանքի աստիճան:
  • առաջադրանքի օրինակ - երբ գլխավոր պլանավորողը որոշեց, որ ժամանակն է առաջադրանքները մարտ ուղարկելու կատարող-աշխատողներին (հենց տեղում, եթե մենք օգտագործում ենք. LocalExecutor կամ հեռավոր հանգույցի դեպքում CeleryExecutor), այն վերագրում է նրանց համատեքստ (այսինքն՝ փոփոխականների մի շարք՝ կատարման պարամետրեր), ընդլայնում է հրամանների կամ հարցումների ձևանմուշները և միավորում դրանք։

Մենք ստեղծում ենք առաջադրանքներ

Նախ, եկեք ուրվագծենք մեր խմորի ընդհանուր սխեման, այնուհետև ավելի ու ավելի կխորանանք մանրամասների մեջ, քանի որ մենք կիրառում ենք որոշ ոչ տրիվիալ լուծումներ:

Այսպիսով, իր ամենապարզ ձևով նման դագը կունենա հետևյալ տեսքը.

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Եկեք պարզենք դա.

  • Նախ, մենք ներմուծում ենք անհրաժեշտ լիբները և ինչ որ այլ բան;
  • sql_server_ds - Ից List[namedtuple[str, str]] Airflow Connections-ի միացումների անուններով և տվյալների բազաներով, որոնցից մենք կվերցնենք մեր ափսեը.
  • dag - մեր դագի հայտարարությունը, որն անպայման պետք է լինի globals(), հակառակ դեպքում Airflow-ը չի գտնի այն: Դագը նաև պետք է ասի.
    • ինչ է նրա անունը orders - այս անունը այնուհետև կհայտնվի վեբ ինտերֆեյսում,
    • որ նա աշխատելու է հուլիսի XNUMX-ի կեսգիշերից,
    • և այն պետք է աշխատի մոտավորապես 6 ժամը մեկ (այստեղ կոշտ տղաների փոխարեն timedelta() թույլատրելի cron- տող 0 0 0/6 ? * * *, ավելի քիչ զովի համար - նման արտահայտություն @daily);
  • workflow() հիմնական գործը կանի, բայց ոչ հիմա։ Առայժմ մենք պարզապես կթափենք մեր համատեքստը գրանցամատյանում:
  • Եվ հիմա առաջադրանքներ ստեղծելու պարզ կախարդանքը.
    • մենք անցնում ենք մեր աղբյուրներով.
    • սկզբնավորել PythonOperator, որը կկատարի մեր խաբեբայությունը workflow(). Մի մոռացեք նշել առաջադրանքի եզակի անվանումը (դագի սահմաններում) և կապել դագը: Դրոշ provide_context իր հերթին լրացուցիչ արգումենտներ կլցնի ֆունկցիայի մեջ, որոնք մենք զգուշորեն կհավաքենք՝ օգտագործելով **context.

Առայժմ այսքանը: Այն, ինչ մենք ստացանք.

  • նոր դաջվածք վեբ ինտերֆեյսում,
  • մեկուկես հարյուր առաջադրանք, որոնք կիրականացվեն զուգահեռ (եթե դա թույլ են տալիս Airflow, Celery կարգավորումները և սերվերի հզորությունը):

Դե, գրեթե հասկացա:

Apache օդային հոսք. ETL-ի հեշտացում
Ո՞վ է տեղադրելու կախվածությունները:

Այս ամբողջը պարզեցնելու համար ես պտտվեցի docker-compose.yml վերամշակում requirements.txt բոլոր հանգույցների վրա:

Հիմա այն գնացել է.

Apache օդային հոսք. ETL-ի հեշտացում

Մոխրագույն քառակուսիները առաջադրանքների օրինակներ են, որոնք մշակվում են ժամանակացույցի կողմից:

Մենք մի փոքր սպասում ենք, առաջադրանքները խզվում են աշխատողների կողմից.

Apache օդային հոսք. ETL-ի հեշտացում

Կանաչները, իհարկե, հաջողությամբ ավարտել են իրենց գործը։ Կարմիրներն այնքան էլ հաջող չեն:

Ի դեպ, մեր արդյունահանման վրա թղթապանակ չկա ./dagsՄեքենաների միջև համաժամացում չկա, բոլոր դաջերը գտնվում են git մեր Gitlab-ում, իսկ Gitlab CI-ն միաձուլվելիս թարմացումներ է բաժանում մեքենաներին master.

Մի փոքր Flower-ի մասին

Մինչ աշխատողները ծեծում են մեր ծծակները, եկեք հիշենք մեկ այլ գործիք, որը կարող է մեզ ինչ-որ բան ցույց տալ՝ Ծաղիկը:

Առաջին էջը աշխատող հանգույցների վերաբերյալ ամփոփ տեղեկություններով.

Apache օդային հոսք. ETL-ի հեշտացում

Ամենաինտենսիվ էջը առաջադրանքներով, որոնք գործի են անցել.

Apache օդային հոսք. ETL-ի հեշտացում

Մեր բրոքերի կարգավիճակով ամենաձանձրալի էջը.

Apache օդային հոսք. ETL-ի հեշտացում

Ամենապայծառ էջը առաջադրանքի կարգավիճակի գծապատկերներով և դրանց կատարման ժամանակով է.

Apache օդային հոսք. ETL-ի հեշտացում

Մենք բեռնում ենք թերբեռնվածը

Այսպիսով, բոլոր առաջադրանքները մշակված են, դուք կարող եք տանել վիրավորներին։

Apache օդային հոսք. ETL-ի հեշտացում

Ու կային բազմաթիվ վիրավորներ՝ այս կամ այն ​​պատճառով։ Airflow-ի ճիշտ օգտագործման դեպքում այս նույն քառակուսիները ցույց են տալիս, որ տվյալները հաստատ չեն հասել։

Դուք պետք է դիտեք գրանցամատյանը և վերագործարկեք ձախողված առաջադրանքների դեպքերը:

Սեղմելով ցանկացած քառակուսի, մենք կտեսնենք մեզ հասանելի գործողությունները.

Apache օդային հոսք. ETL-ի հեշտացում

Դուք կարող եք վերցնել և մաքրել ընկածներին: Այսինքն, մենք մոռանում ենք, որ այնտեղ ինչ-որ բան ձախողվել է, և նույն օրինակի առաջադրանքը գնալու է ժամանակացույցին:

Apache օդային հոսք. ETL-ի հեշտացում

Հասկանալի է, որ մկնիկի հետ դա անելը բոլոր կարմիր քառակուսիներով այնքան էլ մարդասիրական չէ. սա այն չէ, ինչ մենք ակնկալում ենք Airflow-ից: Բնականաբար, մենք ունենք զանգվածային ոչնչացման զենք. Browse/Task Instances

Apache օդային հոսք. ETL-ի հեշտացում

Եկեք միանգամից ընտրենք ամեն ինչ և վերակայենք զրոյի, սեղմեք ճիշտ կետը.

Apache օդային հոսք. ETL-ի հեշտացում

Մաքրումից հետո մեր տաքսիներն այսպիսի տեսք ունեն (նրանք արդեն սպասում են, որ ժամանակացույցը պլանավորի).

Apache օդային հոսք. ETL-ի հեշտացում

Միացումներ, կեռիկներ և այլ փոփոխականներ

Ժամանակն է նայելու հաջորդ DAG-ին, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Բոլորը երբևէ կատարել են հաշվետվության թարմացում: Սա կրկին նա է. կա աղբյուրների ցանկ, որտեղից կարելի է ստանալ տվյալները. կա ցուցակ, որտեղ դնել; մի մոռացեք ձայն տալ, երբ ամեն ինչ տեղի ունեցավ կամ կոտրվեց (դե, սա մեր մասին չէ, ոչ):

Եկեք նորից անցնենք ֆայլի միջով և նայենք նոր անհասկանալի նյութերին.

  • from commons.operators import TelegramBotSendMessage - ոչինչ չի խանգարում մեզ ստեղծել մեր սեփական օպերատորները, ինչից մենք օգտվեցինք՝ պատրաստելով փոքրիկ փաթաթան Unblocked-ին հաղորդագրություններ ուղարկելու համար: (Այս օպերատորի մասին ավելին կխոսենք ստորև);
  • default_args={} - dag-ը կարող է նույն փաստարկները տարածել իր բոլոր օպերատորներին.
  • to='{{ var.value.all_the_kings_men }}' - դաշտ to մենք չենք ունենա կոշտ կոդավորված, այլ դինամիկ ձևով գեներացվել՝ օգտագործելով Jinja և փոփոխական՝ նամակների ցանկով, որը ես ուշադիր տեղադրել եմ Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — օպերատորը գործարկելու պայման. Մեր դեպքում նամակը շեֆերին կթռչի միայն այն դեպքում, եթե բոլոր կախվածությունները լուծվեն հաջողությամբ;
  • tg_bot_conn_id='tg_main' - փաստարկներ conn_id ընդունել կապի ID-ները, որոնցում մենք ստեղծում ենք Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegram-ի հաղորդագրությունները կթռչեն միայն այն դեպքում, եթե կան ձախողված առաջադրանքներ.
  • task_concurrency=1 - մենք արգելում ենք մեկ առաջադրանքի մի քանի առաջադրանքների միաժամանակյա գործարկումը: Հակառակ դեպքում մենք կստանանք մի քանիսի միաժամանակյա գործարկում VerticaOperator (նայելով մեկ սեղանին);
  • report_update >> [email, tg] - Բոլորը VerticaOperator զուգակցվում են նամակներ և հաղորդագրություններ ուղարկելիս, ինչպես հետևյալը.
    Apache օդային հոսք. ETL-ի հեշտացում

    Բայց քանի որ ծանուցող օպերատորները գործարկման տարբեր պայմաններ ունեն, միայն մեկը կաշխատի: Ծառի տեսքում ամեն ինչ մի փոքր ավելի քիչ տեսողական է թվում.
    Apache օդային հոսք. ETL-ի հեշտացում

Մի քանի խոսք կասեմ դրա մասին մակրոներ և նրանց ընկերները - փոփոխականներ.

Մակրոները Jinja տեղապահներ են, որոնք կարող են տարբեր օգտակար տեղեկություններ փոխարինել օպերատորի փաստարկներով: Օրինակ, այսպես.

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} կընդլայնվի մինչև համատեքստի փոփոխականի բովանդակությունը execution_date ձեւաչափով YYYY-MM-DD: 2020-07-14. Լավագույնն այն է, որ համատեքստի փոփոխականները գամված են առաջադրանքի կոնկրետ օրինակին (քառակուսի ծառի տեսքում), և երբ վերագործարկվեն, տեղապահները կընդլայնվեն մինչև նույն արժեքները:

Նշանակված արժեքները կարելի է դիտել՝ օգտագործելով Rendered կոճակը յուրաքանչյուր առաջադրանքի օրինակում: Նամակ ուղարկելու առաջադրանքը հետևյալն է.

Apache օդային հոսք. ETL-ի հեշտացում

Եվ այսպես, հաղորդագրություն ուղարկելու առաջադրանքում.

Apache օդային հոսք. ETL-ի հեշտացում

Ներկառուցված մակրոների ամբողջական ցանկը վերջին հասանելի տարբերակի համար հասանելի է այստեղ. մակրո հղում

Ավելին, փլագինների օգնությամբ մենք կարող ենք հայտարարել մեր սեփական մակրոները, բայց դա այլ պատմություն է։

Բացի նախապես սահմանված բաներից, մենք կարող ենք փոխարինել մեր փոփոխականների արժեքները (ես արդեն օգտագործել եմ սա վերևի կոդում): Եկեք ստեղծենք ներսում Admin/Variables մի քանի բան.

Apache օդային հոսք. ETL-ի հեշտացում

Այն ամենը, ինչ կարող եք օգտագործել.

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Արժեքը կարող է լինել սկալյար, կամ կարող է լինել նաև JSON: JSON-ի դեպքում.

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

պարզապես օգտագործեք ցանկալի բանալին տանող ճանապարհը. {{ var.json.bot_config.bot.token }}.

Ես բառացիորեն կասեմ մեկ բառ և ցույց կտամ մեկ սքրինշոթ մասին կապեր. Այստեղ ամեն ինչ տարրական է՝ էջում Admin/Connections մենք ստեղծում ենք կապ, այնտեղ ավելացնում ենք մեր մուտքերը/գաղտնաբառերը և ավելի կոնկրետ պարամետրեր: Սրա նման:

Apache օդային հոսք. ETL-ի հեշտացում

Գաղտնաբառերը կարող են գաղտնագրվել (ավելի մանրամասն, քան լռելյայն), կամ կարող եք բաց թողնել կապի տեսակը (ինչպես ես արեցի tg_main) - Փաստն այն է, որ տեսակների ցանկը ամրագրված է Airflow մոդելներում և չի կարող ընդլայնվել առանց սկզբնաղբյուրի կոդերի մեջ մտնելու (եթե հանկարծ ինչ-որ բան չեմ փնտրել google-ում, խնդրում եմ ուղղեք ինձ), բայց ոչինչ չի խանգարի մեզ միայն վարկեր ստանալ: Անուն.

Կարող եք նաև մի քանի կապ հաստատել նույն անունով՝ այս դեպքում՝ մեթոդը BaseHook.get_connection(), որը մեզ կապեր է ստանում անունով, կտա պատահական մի քանի անվանակիցներից (ավելի տրամաբանական կլիներ Round Robin անել, բայց թողնենք դա Airflow մշակողների խղճին):

Փոփոխականները և միացումները, իհարկե, հիանալի գործիքներ են, բայց կարևոր է չկորցնել հավասարակշռությունը. ձեր հոսքերի որ մասերն եք պահում հենց կոդի մեջ, և որ մասերը տրամադրում եք Airflow-ին պահեստավորման համար: Մի կողմից, արժեքը արագ փոխելը, օրինակ, փոստային արկղը, կարող է հարմար լինել UI-ի միջոցով: Մյուս կողմից, սա դեռ վերադարձ է մկնիկի սեղմմանը, որից մենք (ես) ուզում էինք ազատվել։

Կապերի հետ աշխատելը խնդիրներից մեկն է կեռիկներ. Ընդհանուր առմամբ, Airflow կեռիկները այն երրորդ կողմի ծառայություններին և գրադարաններին միացնելու կետեր են: Օրինակ, JiraHook մեզ համար հաճախորդ կբացի Ժիրայի հետ շփվելու համար (դուք կարող եք առաջադրանքները ետ ու առաջ տեղափոխել) և օգնությամբ SambaHook դուք կարող եք մղել տեղական ֆայլը smb- կետ.

Պատվերով օպերատորի վերլուծություն

Եվ մենք մոտեցանք նայելուն, թե ինչպես է այն պատրաստված TelegramBotSendMessage

Code commons/operators.py փաստացի օպերատորի հետ՝

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Այստեղ, ինչպես և ամեն ինչ Airflow-ում, ամեն ինչ շատ պարզ է.

  • ժառանգել է BaseOperator, որն իրականացնում է օդային հոսքին հատուկ բաներ (նայեք ձեր ժամանցին)
  • Հայտարարված դաշտեր template_fields, որում Ջինջան կփնտրի մակրոներ՝ մշակելու համար։
  • Դասավորեց ճիշտ փաստարկները __init__(), անհրաժեշտության դեպքում սահմանեք կանխադրվածները:
  • Չմոռացանք նաև նախահայրի սկզբնավորման մասին։
  • Բացել է համապատասխան կեռիկը TelegramBotHookդրանից ստացել է հաճախորդի օբյեկտ:
  • Վերագրանցված (վերասահմանված) մեթոդ BaseOperator.execute(), որը Airfow-ը կկտրվի, երբ գա օպերատորը գործարկելու ժամանակը, դրանում մենք կիրականացնենք հիմնական գործողությունը՝ մոռանալով մուտք գործել: (Մենք մուտք ենք գործում, ի դեպ, հենց ներս stdout и stderr - Օդային հոսքը կխափանի ամեն ինչ, գեղեցիկ կփաթաթի, անհրաժեշտության դեպքում կքայքայվի:)

Եկեք տեսնենք, թե ինչ ունենք commons/hooks.py. Ֆայլի առաջին մասը՝ ինքնին կեռիկով.

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ես նույնիսկ չգիտեմ, թե ինչ բացատրել այստեղ, ես պարզապես նշեմ կարևոր կետերը.

  • Մենք ժառանգում ենք, մտածում փաստարկների մասին, շատ դեպքերում դա կլինի մեկը. conn_id;
  • Գերակայող ստանդարտ մեթոդներ. ես սահմանափակեցի ինձ get_conn(), որում ես ստանում եմ կապի պարամետրերը անունով և պարզապես ստանում եմ բաժինը extra (սա JSON դաշտ է), որում ես (ըստ իմ հրահանգների!) տեղադրել եմ Telegram բոտի նշանը. {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ես ստեղծում եմ մեր օրինակը TelegramBot, դրան տալով կոնկրետ նշան։

Այսքանը: Դուք կարող եք հաճախորդ ստանալ մանգաղից, օգտագործելով TelegramBotHook().clent կամ TelegramBotHook().get_conn().

Եվ ֆայլի երկրորդ մասը, որում ես միկրոփաթաթում եմ պատրաստում Telegram REST API-ի համար, որպեսզի նույնը չքաշեմ python-telegram-bot մեկ մեթոդի համար sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Ճիշտ ճանապարհը այս ամենը գումարելն է. TelegramBotSendMessage, TelegramBotHook, TelegramBot - plugin-ում, դրեք հանրային պահոց և տվեք այն Open Source-ին:

Մինչ մենք ուսումնասիրում էինք այս ամենը, մեր զեկույցի թարմացումները հաջողությամբ ձախողվեցին և ինձ ալիքում սխալ հաղորդագրություն ուղարկեցին: Ես պատրաստվում եմ ստուգել, ​​թե արդյոք դա սխալ է ...

Apache օդային հոսք. ETL-ի հեշտացում
Ինչ-որ բան կոտրվեց մեր դոգում: Մի՞թե դա այն չէ, ինչ մենք սպասում էինք: Ճիշտ!

Դուք պատրաստվում եք լցնել:

Դուք զգում եք, որ ես ինչ-որ բան բաց եմ թողել: Կարծես խոստացել էր SQL Server-ից տվյալները փոխանցել Vertica, հետո վերցրեց ու թեմայից շեղեց, սրիկա՛։

Այս վայրագությունը միտումնավոր էր, ես պարզապես ստիպված էի վերծանել ձեզ համար ինչ-որ տերմինաբանություն: Այժմ կարող եք ավելի հեռուն գնալ:

Մեր ծրագիրը հետևյալն էր.

  1. Դարձիր
  2. Ստեղծեք առաջադրանքներ
  3. Տեսեք, թե որքան գեղեցիկ է ամեն ինչ
  4. Նշանակե՛ք նիստերի համարները լրացումներին
  5. Ստացեք տվյալներ SQL Server-ից
  6. Տեղադրեք տվյալները Vertica-ում
  7. Հավաքել վիճակագրություն

Այսպիսով, այս ամենը գործի դնելու համար ես մի փոքր հավելում արեցի մեր docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Այնտեղ մենք բարձրացնում ենք.

  • Vertica որպես հյուրընկալող dwh առավել լռելյայն կարգավորումներով,
  • SQL Server-ի երեք օրինակ,
  • վերջիններիս շտեմարանները լրացնում ենք որոշ տվյալներով (ոչ մի դեպքում չդիտեք mssql_init.py!)

Մենք գործարկում ենք ամեն լավը մի փոքր ավելի բարդ հրամանի օգնությամբ, քան նախորդ անգամ.

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Այն, ինչ ստեղծել է մեր հրաշք պատահական սարքը, կարող եք օգտագործել նյութը Data Profiling/Ad Hoc Query:

Apache օդային հոսք. ETL-ի հեշտացում
Գլխավորը դա վերլուծաբաններին ցույց չտալն է

մանրամասնել ETL նիստեր Ես չեմ անի, այնտեղ ամեն ինչ մանրուք է. մենք հիմք ենք ստեղծում, դրա մեջ նշան կա, մենք ամեն ինչ փաթաթում ենք համատեքստի կառավարիչով, և հիմա մենք անում ենք սա.

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Ժամանակը եկել է հավաքել մեր տվյալները մեր մեկուկես հարյուր սեղաններից։ Եկեք դա անենք շատ ոչ հավակնոտ տողերի օգնությամբ.

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Կեռիկի օգնությամբ մենք ստանում ենք Airflow-ից pymssql- միացնել
  2. Եկեք հարցման մեջ փոխարինենք ամսաթվի ձևով սահմանափակում. այն կներդրվի ֆունկցիայի մեջ կաղապարի շարժիչի կողմից:
  3. Սնուցելով մեր խնդրանքը pandasով մեզ կբերի DataFrame - դա մեզ ապագայում օգտակար կլինի։

Ես օգտագործում եմ փոխարինում {dt} հարցման պարամետրի փոխարեն %s ոչ թե այն պատճառով, որ ես չար Պինոքիո եմ, այլ որովհետև pandas չի կարողանում գլուխ հանել pymssql և սայթաքում է վերջինը params: Listչնայած նա իսկապես ցանկանում է tuple.
Նաև նշեք, որ մշակողը pymssql որոշել է այլևս չաջակցել նրան, և ժամանակն է տեղափոխվել pyodbc.

Տեսնենք, թե Airflow-ն ինչով է լցրել մեր գործառույթների փաստարկները.

Apache օդային հոսք. ETL-ի հեշտացում

Եթե ​​տվյալներ չկան, ուրեմն շարունակելն իմաստ չունի։ Բայց նաեւ տարօրինակ է լցոնումը հաջողված համարելը։ Բայց սա սխալ չէ։ A-ah-ah, ինչ անել?! Եվ ահա թե ինչ.

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow-ին կասի, որ սխալներ չկան, բայց մենք բաց ենք թողնում առաջադրանքը: Ինտերֆեյսը չի ունենա կանաչ կամ կարմիր քառակուսի, այլ վարդագույն:

Եկեք նետենք մեր տվյալները բազմաթիվ սյունակներ:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Այսինքն

  • Տվյալների բազան, որտեղից մենք վերցրել ենք պատվերները,
  • Մեր ջրհեղեղի նստաշրջանի ID-ն (այլ կլինի յուրաքանչյուր առաջադրանքի համար),
  • Հեշ աղբյուրից և պատվերի ID-ն, այնպես որ վերջնական տվյալների բազայում (որտեղ ամեն ինչ լցվում է մեկ աղյուսակում) մենք ունենք պատվերի եզակի ID:

Մնում է նախավերջին քայլը՝ ամեն ինչ լցնել Vertica-ի մեջ։ Եվ, տարօրինակ կերպով, դա անելու ամենադիտարժան և արդյունավետ միջոցներից մեկը CSV-ն է:

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Պատրաստում ենք հատուկ ընդունիչ StringIO.
  2. pandas սիրով կդնի մեր DataFrame ձևով CSV- տողեր.
  3. Եկեք կեռիկով կապ բացենք մեր սիրելի Vertica-ի հետ։
  4. Իսկ հիմա օգնությամբ copy() ուղարկել մեր տվյալները անմիջապես Vertika-ին:

Մենք վարորդից կվերցնենք, թե քանի գիծ է լցվել, և նիստի մենեջերին կասենք, որ ամեն ինչ կարգին է.

session.loaded_rows = cursor.rowcount
session.successful = True

Այսքանը:

Վաճառքի ժամանակ մենք ձեռքով ստեղծում ենք թիրախային ափսե: Այստեղ ես ինձ թույլ տվեցի փոքրիկ մեքենա.

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

ես օգտագործում եմ VerticaOperator() Ես ստեղծում եմ տվյալների բազայի սխեման և աղյուսակ (եթե դրանք արդեն չկան, իհարկե): Հիմնական բանը կախվածությունները ճիշտ դասավորելն է.

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Ամփոփելով

-Դե,- ասաց փոքրիկ մուկը,- չէ՞, հիմա
Համոզվա՞ծ ես, որ ես անտառի ամենասարսափելի կենդանին եմ։

Ջուլիա Դոնալդսոն, Գրուֆալոն

Կարծում եմ, եթե ես և իմ գործընկերները ունենայինք մրցակցություն. Վայ, կարծում եմ կհամաձայնեք, որ ես կհաղթեմ նրանց բոլոր ճակատներում։

Եթե ​​մի փոքր ավելի լուրջ, ապա Apache Airflow-ը, գործընթացները նկարագրելով ծրագրի կոդի տեսքով, արեց իմ գործը շատ ավելի հարմարավետ և հաճելի:

Նրա անսահմանափակ ընդարձակելիությունը, ինչպես plug-ins-ի, այնպես էլ մասշտաբայնության նախատրամադրվածության առումով, հնարավորություն է տալիս օգտագործել Airflow գրեթե ցանկացած ոլորտում՝ նույնիսկ տվյալների հավաքագրման, պատրաստման և մշակման ամբողջական ցիկլում, նույնիսկ հրթիռների արձակման ժամանակ (դեպի Մարս, դասընթաց).

Մասի վերջնական, հղում և տեղեկատվություն

Փոցխը, որը մենք հավաքել ենք ձեզ համար

  • start_date. Այո, սա արդեն տեղական մեմ է: Դագի հիմնական փաստարկի միջոցով start_date բոլորն անցնում են. Հակիրճ, եթե նշեք start_date ընթացիկ ամսաթիվը, և schedule_interval -Մի օր, հետո ԴԱԳ-ն վաղը կսկսի ոչ շուտ։
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Եվ այլևս ոչ մի խնդիր:

    Դրա հետ կապված գործարկման ժամանակի մեկ այլ սխալ կա. Task is missing the start_date parameter, որն ամենից հաճախ ցույց է տալիս, որ մոռացել եք կապել դագ օպերատորին։

  • Ամեն ինչ մեկ մեքենայի վրա: Այո, և հիմքեր (ինքն օդային հոսքը և մեր ծածկույթը), և վեբ սերվեր, և ժամանակացույց և աշխատողներ: Եվ դա նույնիսկ աշխատեց: Բայց ժամանակի ընթացքում ծառայությունների համար առաջադրանքների թիվն աճեց, և երբ PostgreSQL-ը սկսեց ինդեքսին արձագանքել 20 ms-ի փոխարեն 5 վայրկյանում, մենք վերցրեցինք այն և տարանք:
  • Տեղական կատարող. Այո, մենք դեռ նստած ենք դրա վրա, և արդեն հասել ենք անդունդի եզրին։ LocalExecutor-ը մեզ մինչ այժմ բավական էր, բայց հիմա ժամանակն է ընդլայնվելու առնվազն մեկ աշխատողով, և մենք պետք է շատ աշխատենք CeleryExecutor-ին անցնելու համար: Եվ հաշվի առնելով այն փաստը, որ դուք կարող եք աշխատել դրա հետ մեկ մեքենայի վրա, ոչինչ չի խանգարում ձեզ օգտագործել Celery նույնիսկ սերվերի վրա, որը «իհարկե, երբեք չի արտադրվի, ազնվորեն»:
  • Չօգտագործում ներկառուցված գործիքներ:
    • Կապեր սպասարկման հավատարմագրերը պահելու համար,
    • SLA Misses ժամանակին չստացված առաջադրանքներին արձագանքել,
    • xcom մետատվյալների փոխանակման համար (ես ասացի մետատվյալների!) dag առաջադրանքների միջև:
  • Փոստի չարաշահում. Դե ինչ ասեմ։ Զգուշացումներ են սահմանվել տապալված առաջադրանքների բոլոր կրկնությունների համար: Այժմ իմ աշխատանքային Gmail-ն ունի ավելի քան 90 հազար նամակներ Airflow-ից, և վեբ փոստի դնչկալը հրաժարվում է միաժամանակ վերցնել և ջնջել ավելի քան 100 նամակ:

Ավելի շատ որոգայթներ. Apache Airflow Pitfails

Ավելի շատ ավտոմատացման գործիքներ

Որպեսզի մենք ավելի շատ աշխատենք մեր գլխով և ոչ թե ձեռքերով, Airflow-ը մեզ համար պատրաստել է սա.

  • REST API -Նա դեռևս ունի Էքսպերիմենտալի կարգավիճակ, ինչը չի խանգարում նրան աշխատել։ Դրա միջոցով դուք կարող եք ոչ միայն տեղեկատվություն ստանալ դագերի և առաջադրանքների մասին, այլև դադարեցնել/սկսել դագը, ստեղծել DAG Run կամ լողավազան:
  • CLI - հրամանի տողի միջոցով հասանելի են բազմաթիվ գործիքներ, որոնք ոչ միայն անհարմար են WebUI-ի միջոցով օգտագործելու համար, այլ ընդհանրապես բացակայում են: Օրինակ:
    • backfill անհրաժեշտ է առաջադրանքների օրինակները վերագործարկելու համար:
      Օրինակ, վերլուծաբաններ եկան ու ասացին. «Իսկ դու, ընկեր, հունվարի 1-ից 13-ի տվյալների մեջ հիմարություն ունես։ Շտկե՛ք, շտկե՛ք, շտկե՛ք, շտկե՛ք»։ Եվ դուք այդպիսի օջախ եք.

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Բազային ծառայություն. initdb, resetdb, upgradedb, checkdb.
    • run, որը թույլ է տալիս կատարել մեկ օրինակով առաջադրանք և նույնիսկ միավորներ հավաքել բոլոր կախվածությունների վրա: Ավելին, դուք կարող եք այն գործարկել միջոցով LocalExecutor, նույնիսկ եթե դուք ունեք նեխուրի կլաստեր:
    • Գրեթե նույն բանն է անում test, միայն նաև հիմքերում ոչինչ չի գրում։
    • connections թույլ է տալիս զանգվածային կապեր ստեղծել պատյանից:
  • python api - շփվելու բավականին հարդքոր եղանակ, որը նախատեսված է պլագինների համար, այլ ոչ թե փոքրիկ ձեռքերով լցվելու մեջ: Բայց ով է խանգարելու մեզ գնալ /home/airflow/dags, վազել ipython ու սկսե՞ս խառնվել: Դուք կարող եք, օրինակ, արտահանել բոլոր կապերը հետևյալ կոդով.
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Միացում Airflow մետատվյալների բազային: Ես խորհուրդ չեմ տալիս գրել դրան, բայց տարբեր կոնկրետ չափումների համար առաջադրանքների վիճակներ ստանալը կարող է շատ ավելի արագ և հեշտ լինել, քան API-ներից որևէ մեկի օգտագործումը:

    Ասենք, որ մեր բոլոր առաջադրանքները չէ, որ անզոր են, բայց դրանք երբեմն կարող են ընկնել, և դա նորմալ է։ Բայց մի քանի խցանումներ արդեն կասկածելի են, և հարկ կլինի ստուգել։

    Զգուշացեք SQL-ից:

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Սայլակ

Եվ իհարկե, Google-ի թողարկման առաջին տասը հղումները իմ էջանիշներից Airflow թղթապանակի բովանդակությունն են:

Իսկ հոդվածում օգտագործված հղումները.

Source: www.habr.com