Apache Airflow: осонтар кардани ETL

Салом, ман Дмитрий Логвиненко - муҳандиси маълумоти шӯъбаи таҳлили гурӯҳи ширкатҳои Vezet.

Ман ба шумо дар бораи як воситаи олиҷаноб барои таҳияи равандҳои ETL нақл мекунам - Apache Airflow. Аммо Airflow он қадар гуногунҷанба ва гуногунҷанба аст, ки шумо бояд онро бодиққат аз назар гузаронед, ҳатто агар шумо дар ҷараёни маълумот иштирок надоред, аммо лозим аст, ки давра ба давра ҳама гуна равандҳоро оғоз кунед ва иҷрои онҳоро назорат кунед.

Ва ҳа, ман на танҳо мегӯям, балки нишон медиҳам: барнома бисёр код, скриншотҳо ва тавсияҳо дорад.

Apache Airflow: осонтар кардани ETL
Он чизеро, ки шумо одатан ҳангоми ҷустуҷӯи Google калимаи Airflow / Wikimedia Commons мебинед

Мундариҷа

Муқаддима

Apache Airflow ба монанди Django аст:

  • дар python навишта шудааст
  • як панели бузурги администратор вуҷуд дорад,
  • ба таври номуайян васеъ карда мешавад

- танҳо беҳтар аст ва он барои мақсадҳои тамоман дигар сохта шудааст, яъне (чунон ки пеш аз кат навишта шудааст):

  • иҷро ва мониторинги вазифаҳо дар шумораи номаҳдуди мошинҳо (чунон ки бисёре аз Celery / Kubernetes ва виҷдони шумо ба шумо имкон медиҳанд)
  • бо тавлиди ҷараёни кории динамикӣ аз навиштан ва фаҳмидани рамзи Python хеле осон аст
  • ва қобилияти пайваст кардани ҳама гуна пойгоҳи додаҳо ва API бо ҳам бо истифода аз ҷузъҳои тайёр ва плагинҳои хонагӣ (ки хеле содда аст).

Мо Apache Airflow-ро чунин истифода мебарем:

  • мо маълумотро аз сарчашмаҳои гуногун ҷамъ меорем (бисёр мисолҳои SQL Server ва PostgreSQL, API-ҳои гуногун бо ченакҳои барнома, ҳатто 1C) дар DWH ва ODS (мо Vertica ва Clickhouse дорем).
  • чи кадар пешкадам cron, ки равандҳои муттаҳидсозии маълумотро дар ODS оғоз мекунад ва инчунин нигоҳдории онҳоро назорат мекунад.

То ба наздикӣ эҳтиёҷоти моро як сервери хурд бо 32 ядро ​​ва 50 ГБ хотираи оперативӣ қонеъ мекард. Дар Airflow, ин кор мекунад:

  • более 200 дак (воқеан ҷараёнҳои корӣ, ки дар онҳо мо вазифаҳоро пур кардаем),
  • дар ҳар як ба ҳисоби миёна 70 вазифа,
  • ин некӣ оғоз мешавад (инчунин ба ҳисоби миёна) як маротиба дар як соат.

Ва дар бораи он ки мо чӣ гуна васеъ кардем, ман дар зер менависам, аммо ҳоло биёед мушкилоти über-ро муайян кунем, ки мо онро ҳал мекунем:

Се сервери аслии SQL мавҷуд аст, ки ҳар яки онҳо 50 пойгоҳи додаҳо доранд - намунаҳои як лоиҳа, мутаносибан, онҳо сохтори якхела доранд (қариб дар ҳама ҷо, муа-ха-ха), яъне ҳар кадоми онҳо ҷадвали фармоишҳо доранд (хушбахтона, ҷадвал бо он ном метавонад ба ҳама гуна тиҷорат тела дода шавад). Мо маълумотро тавассути илова кардани майдонҳои хидматрасонӣ (сервери манбаъ, пойгоҳи додаҳои манбаъ, ID вазифаи ETL) мегирем ва соддалавҳона онҳоро ба Vertica мепартоем.

Биёед биравем!

Қисми асосӣ, амалӣ (ва каме назариявӣ)

Чаро мо (ва шумо)

Вақте ки дарахтон калон буданд ва ман содда будам SQL-schik дар як чаканаи Русия, мо бо истифода аз ду асбоби дастраси мо равандҳои ETL-ро, яъне ҷараёни маълумотро фиреб додем:

  • Маркази энергетикии информатика - системаи бениҳоят паҳншаванда, бениҳоят самаранок, бо сахтафзори худ, версияи худ. Худо накунад 1% имконияташро истифода бурдам. Чаро? Хуб, пеш аз ҳама, ин интерфейс, дар ҷое аз солҳои 380-ум, рӯҳан ба мо фишор овард. Дуюм, ин зиддият барои равандҳои бениҳоят зебо, истифодаи такрории ҷузъҳои хашмгин ва дигар ҳиллаҳои хеле муҳими корхона пешбинӣ шудааст. Дар бораи он, ки он ба монанди боли Airbus AXNUMX дар як сол арзиш дорад, мо чизе намегӯем.

    Эҳтиёт бошед, скриншот метавонад ба одамони то 30-сола каме осеб расонад

    Apache Airflow: осонтар кардани ETL

  • Сервери ҳамгироии SQL Server — мо ин рафикро дар чараёни дохили-лоихавии худ истифода бурдем. Хуб, дар асл: мо аллакай SQL Server-ро истифода мебарем ва истифода накардани абзорҳои ETL-и он то андозае беасос мебуд. Ҳама чиз дар он хуб аст: ҳам интерфейс зебо аст ва ҳам гузоришҳои пешрафт ... Аммо аз ин сабаб нест, ки мо маҳсулоти нармафзорро дӯст медорем, оҳ, на барои ин. Версияи он dtsx (ки XML бо гиреҳҳои омехта дар нигоҳдорӣ аст) мо метавонем, аммо чӣ маъно дорад? Дар бораи сохтани бастаи вазифаҳо, ки садҳо ҷадвалҳоро аз як сервер ба сервери дигар кашола мекунанд, чӣ гуфтан мумкин аст? Бале, чй сад, ангушти ишорати ту аз бист порча меафтад, тугмаи мушро пахш мекунад. Аммо он бешубҳа мӯдтар ба назар мерасад:

    Apache Airflow: осонтар кардани ETL

Мо бешубҳа роҳҳои баромаданро ҷустуҷӯ кардем. Ҳолат ҳатто қариб ба генератори бастаи худнависи SSIS омад ...

...ва баъд маро кори нав пайдо кард. Ва Apache Airflow ба ман расид.

Вақте ки ман фаҳмидам, ки тавсифи раванди ETL рамзи оддии Python аст, ман танҳо аз шодӣ рақс накардам. Маҳз ҳамин тавр ҷараёнҳои додаҳо версия ва тафовут карда шуданд ва рехтани ҷадвалҳо бо сохтори ягона аз садҳо пойгоҳи додаҳо ба як ҳадаф дар якуним ё ду экрани 13 ”ба рамзи Python табдил ёфт.

Ҷамъоварии кластер

Биёед як кӯдакистонро комилан ташкил накунем ва дар ин ҷо дар бораи чизҳои комилан равшан сухан нагӯем, ба монанди насб кардани Airflow, пойгоҳи додаҳои интихобкардаи шумо, Карафс ва дигар ҳолатҳое, ки дар докҳо тавсиф шудаанд.

Барои он ки мо фавран таҷрибаҳоро оғоз кунем, ман нақша кашидам docker-compose.yml ки дар он:

  • Биёед воқеан баланд кунем Равиши ҳаво: Барномасоз, веб-сервер. Гул низ дар он ҷо барои назорат кардани вазифаҳои карафс чарх мезанад (зеро он аллакай ба он тела дода шудааст apache/airflow:1.10.10-python3.7, аммо мо зид нестем)
  • PostgreSQL, ки дар он Airflow маълумоти хидматрасонии худро (маълумоти банақшагирӣ, омори иҷро ва ғ.) менависад ва Celery вазифаҳои анҷомшударо қайд мекунад;
  • Redis, ки ҳамчун брокери вазифа барои Celery амал мекунад;
  • Коргари карафс, ки ба ичрои бевоситаи супоришхо машгул мешаванд.
  • Ба папка ./dags мо файлҳои худро бо тавсифи дагҳо илова мекунем. Онҳо дар магасе гирифта мешаванд, аз ин рӯ пас аз ҳар атса задан лозим нест, ки тамоми стекро ҷунглер кунед.

Дар баъзе ҷойҳо, коди мисолҳо пурра нишон дода нашудааст (барои он ки матнро парешон накунад), аммо дар ҷое он дар ҷараёни он тағир дода мешавад. Намунаҳои мукаммали коди кориро дар анбор пайдо кардан мумкин аст https://github.com/dm-logv/airflow-tutorial.

Доктор-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Эзоҳ:

  • Дар чамъоварии композиция ман бештар ба образи маъруф такя кардам puckel / docker-ҳаво - ҳатман онро тафтиш кунед. Шояд шумо дар ҳаёти худ ба чизи дигаре ниёз надоред.
  • Ҳама танзимоти ҷараёни ҳаво на танҳо тавассути airflow.cfg, балки инчунин тавассути тағирёбандаҳои муҳити зист (ба шарофати таҳиягарон), ки ман бадқасдона аз онҳо истифода бурдам.
  • Табиист, ки он ба истеҳсолот омода нест: ман дидаву дониста набзи дилро ба контейнерҳо наовардам, ман бо амният ташвиш надидам. Аммо ман ҳадди ақали барои таҷрибаомӯзони мо мувофиқро кардам.
  • Дар назар гиред, ки:
    • Папкаи даг бояд ҳам ба нақшакаш ва ҳам барои коргарон дастрас бошад.
    • Ин ба ҳамаи китобхонаҳои тарафи сеюм дахл дорад - ҳамаи онҳо бояд дар мошинҳо бо нақшакаш ва коргарон насб карда шаванд.

Хуб, ҳоло он оддӣ аст:

$ docker-compose up --scale worker=3

Пас аз ҳама чиз баланд мешавад, шумо метавонед ба интерфейсҳои веб нигаред:

Мафҳумҳои асосӣ

Агар шумо дар ҳамаи ин "дагҳо" чизе нафаҳмида бошед, пас ин аст луғати кӯтоҳ:

  • Scheduler - амаки муҳимтарин дар Airflow, ки назорат мекунад, ки роботҳо сахт кор кунанд, на шахс: ҷадвалро назорат мекунад, дагҳоро нав мекунад, вазифаҳоро оғоз мекунад.

    Умуман, дар версияҳои кӯҳна, ӯ бо хотира мушкилот дошт (не, на амнезия, балки ихроҷ) ва параметри меросӣ ҳатто дар конфигуратсияҳо боқӣ монд. run_duration — фосилаи аз нав оғоз кардани он. Аммо ҳоло ҳама чиз хуб аст.

  • DAG (aka "dag") - "графики асикликии равонашуда", аммо чунин таъриф ба кам одамон нақл мекунад, аммо дар асл он як контейнер барои вазифаҳое мебошад, ки бо ҳамдигар ҳамкорӣ мекунанд (ба поён нигаред) ё аналоги Баста дар SSIS ва Workflow дар Informatica .

    Илова ба дагҳо, метавонанд то ҳол зердастҳо бошанд, аммо мо эҳтимол ба онҳо намерасем.

  • DAG Run - дагҳои ибтидоӣ, ки худаш таъин шудааст execution_date. Дагранҳои ҳамон даг метавонанд дар мувозӣ кор кунанд (агар шумо вазифаҳои худро idempotent карда бошед, албатта).
  • Оператор қисмҳои рамзҳо мебошанд, ки барои иҷрои амали мушаххас масъуланд. Се намуди операторҳо мавҷуданд:
    • амалмисли дӯстдоштаи мо PythonOperator, ки метавонад ҳама гуна (эътибор) рамзи Python -ро иҷро кунад;
    • интиқол додан, ки маълумотро аз як ҷо ба ҷои дигар интиқол медиҳанд, мегӯянд, MsSqlToHiveTransfer;
    • санҷандаро аз тарафи дигар, он ба шумо имкон медиҳад, ки то ба амал омадани ҳодиса вокуниш нишон диҳед ё иҷрои минбаъдаи дагро суст кунед. HttpSensor метавонад нуқтаи ниҳоии муайяншударо кашад ва вақте ки посухи дилхоҳ интизор аст, интиқолро оғоз кунед GoogleCloudStorageToS3Operator. Ақли пуртаҷриба мепурсад: «Чаро? Баъд аз ҳама, шумо метавонед такрорҳоро мустақиман дар оператор иҷро кунед! ” Ва он гоҳ, барои он ки ҳавзи вазифаҳоро бо операторони боздошташуда банд накунанд. Сенсор пеш аз кӯшиши навбатӣ оғоз меёбад, месанҷад ва мемирад.
  • Вазифаи - операторони эълоншуда, сарфи назар аз намуд ва ба даг вобаста ба рутбаи вазифа пешбарӣ карда мешаванд.
  • мисоли вазифа - вақте ки банақшагири генералӣ тасмим гирифт, ки вақти он расидааст, ки вазифаҳоро ба корбарон фиристем (дар ҳамон ҷо, агар мо истифода барем. LocalExecutor ё ба гирехи дурдаст дар сурати CeleryExecutor), он ба онҳо контекст таъин мекунад (яъне маҷмӯи тағирёбандаҳо - параметрҳои иҷро), қолабҳои фармон ё дархостро васеъ мекунад ва онҳоро ба ҳам меорад.

Мо вазифаҳоро эҷод мекунем

Аввалан, биёед схемаи умумии догамонро шарҳ диҳем ва сипас мо ба тафсилот бештар ва бештар ғарқ мешавем, зеро мо баъзе ҳалли ночизро истифода мебарем.

Ҳамин тавр, дар соддатарин шакли худ, чунин даг чунин хоҳад буд:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Биёед онро муайян кунем:

  • Аввалан, мо libs заруриро ворид мекунем ва чизи дигар;
  • sql_server_ds Оё List[namedtuple[str, str]] бо номҳои пайвастшавӣ аз Airflow Connections ва пойгоҳи додаҳое, ки мо таблиғи худро мегирем;
  • dag — эълони дагамон, ки бояд хатман дар globals(), вагарна Airflow онро намеёбад. Дуг инчунин бояд бигӯяд:
    • номи вай чист orders - он гоҳ ин ном дар интерфейси веб пайдо мешавад,
    • ки вай аз нисфи шаби хаштуми июль кор мекунад.
    • ва он бояд тақрибан ҳар 6 соат кор кунад (барои бачаҳои сахтгир дар ин ҷо ба ҷои timedelta() қобили қабул аст cron-хат 0 0 0/6 ? * * *, барои камтар сард - ифодаи монанди @daily);
  • workflow() кори асосиро ичро мекунад, вале хозир не. Дар айни замон, мо танҳо контексти худро ба гузориш мепартоем.
  • Ва ҳоло ҷодуи оддии эҷоди вазифаҳо:
    • мо аз сарчашмаҳои худ мегузарем;
    • оғоз кардан PythonOperator, ки дамми моро ичро мекунад workflow(). Фаромӯш накунед, ки номи беназири (дар дохили даг) супориш ва бастани худи даг. Парчам provide_context дар навбати худ, ба функсия далелҳои иловагӣ мерезанд, ки мо бодиққат бо истифода аз он ҷамъ хоҳем кард **context.

Ҳозирча ҳаммаси шу. Он чизе ки мо гирифтем:

  • Даги нав дар интерфейси веб,
  • якунимсад вазифа, ки дар баробари иҷро карда мешаванд (агар ҷараёни ҳаво, танзимоти карафс ва қобилияти сервер ба он имкон диҳанд).

Бале, қариб гирифтам.

Apache Airflow: осонтар кардани ETL
Вобастагиҳоро кӣ насб мекунад?

Барои содда кардани ин ҳама, ман ба кор даромадам docker-compose.yml коркард requirements.txt дар ҳама гиреҳҳо.

Ҳоло он нест:

Apache Airflow: осонтар кардани ETL

Майдонҳои хокистарӣ мисолҳои вазифаҳое мебошанд, ки аз ҷониби нақшакаш коркард мешаванд.

Мо каме интизор мешавем, вазифаҳоро коргарон ба миён меоранд:

Apache Airflow: осонтар кардани ETL

Сабзавотхо, албатта, кори худро бомуваффакият анчом доданд. Сурхҳо чандон муваффақ нестанд.

Воқеан, дар маҳсулоти мо ягон папка нест ./dags, дар байни мошинҳо синхронизатсия вуҷуд надорад - ҳама дагҳо дар git дар Gitlab мо ва Gitlab CI ҳангоми якҷояшавӣ навсозиро ба мошинҳо паҳн мекунад master.

Каме дар бораи Гул

Хангоме ки коргарон сосикахои моро мезананд, биёед асбоби дигареро ба хотир орем, ки ба мо чизе нишон дода метавонад — Гул.

Саҳифаи аввал бо маълумоти мухтасар дар бораи гиреҳҳои корӣ:

Apache Airflow: осонтар кардани ETL

Саҳифаи пурқувваттарин бо вазифаҳое, ки ба кор рафтаанд:

Apache Airflow: осонтар кардани ETL

Саҳифаи дилгиркунанда бо мақоми брокери мо:

Apache Airflow: осонтар кардани ETL

Саҳифаи дурахшонтарин бо графикҳои ҳолати вазифаҳо ва вақти иҷрои онҳост:

Apache Airflow: осонтар кардани ETL

Мо борҳои камборро бор мекунем

Ҳамин тавр, ҳама вазифаҳо иҷро шуданд, шумо метавонед захмиёнро кашед.

Apache Airflow: осонтар кардани ETL

Ва бисьёр ярадорон буданд — бо ин ё он сабаб. Дар сурати истифодаи дурусти Airflow, ин квадратҳо нишон медиҳанд, ки маълумот бешубҳа нарасидааст.

Ба шумо лозим аст, ки сабтро тамошо кунед ва мисолҳои вазифаи афтодаро аз нав оғоз кунед.

Бо пахш кардани ягон мураббаъ, мо амалҳои дастрасро мебинем:

Apache Airflow: осонтар кардани ETL

Шумо метавонед бигиред ва Clear афтода. Яъне, мо фаромӯш мекунем, ки чизе дар он ҷо ноком шудааст ва ҳамон як вазифаи мисол ба нақшакаш меравад.

Apache Airflow: осонтар кардани ETL

Маълум аст, ки ин кор бо муш бо тамоми квадратҳои сурх чандон инсондӯстона нест - ин чизе нест, ки мо аз Airflow интизорем. Табиист, ки мо силоҳҳои қатли ом дорем: Browse/Task Instances

Apache Airflow: осонтар кардани ETL

Биёед ҳама чизро якбора интихоб кунем ва ба сифр барқарор кунем, банди дурустро клик кунед:

Apache Airflow: осонтар кардани ETL

Пас аз тозакунӣ, таксиҳои мо чунин ба назар мерасанд (онҳо аллакай интизоранд, ки нақшакаш онҳоро ба нақша гирад):

Apache Airflow: осонтар кардани ETL

Пайвастҳо, қалмоқҳо ва дигар тағирёбандаҳо

Вақти он расидааст, ки ба DAG оянда нигаред, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Оё ҳама ягон бор навсозии гузоришро анҷом додаанд? Ин боз вай аст: рӯйхати манбаъҳо вуҷуд дорад, ки маълумотро аз куҷо гирифтан мумкин аст; рӯйхат вуҷуд дорад, ки дар куҷо гузоштан лозим аст; Фаромӯш накунед, ки вақте ки ҳама чиз рӯй дод ё вайрон шуд (хуб, ин дар бораи мо нест, не).

Биёед бори дигар файлро аз назар гузаронем ва ба чизҳои нави норавшан назар андозем:

  • from commons.operators import TelegramBotSendMessage - ҳеҷ чиз ба мо халал намерасонад, ки операторони худамонро созем, ки мо аз он бо сохтани як бастаи хурд барои фиристодани паёмҳо ба Unblocked бартарӣ гирифтем. (Дар бораи ин оператор дар зер бештар сӯҳбат хоҳем кард);
  • default_args={} - dag метавонад як далелҳоро ба ҳамаи операторони худ тақсим кунад;
  • to='{{ var.value.all_the_kings_men }}' - майдон to мо коди сахт нахоҳем дошт, аммо бо истифода аз Jinja ва тағирёбанда бо рӯйхати мактубҳо, ки ман онҳоро бодиққат ворид мекунам, ба таври динамикӣ тавлид карда мешавад. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — шарти ба кор андохтани оператор. Дар ҳолати мо, мактуб танҳо ба сардорон парвоз мекунад, агар ҳама вобастагӣ кор кунанд бомуваффакият;
  • tg_bot_conn_id='tg_main' - далелҳо conn_id ID-ҳои пайвастшавӣ, ки мо дар он эҷод мекунем, қабул кунед Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - паёмҳо дар Telegram танҳо дар сурати мавҷуд будани вазифаҳои афтода парвоз мекунанд;
  • task_concurrency=1 - мо оғози ҳамзамон якчанд мисолҳои вазифаҳои як вазифаро манъ мекунем. Дар акси ҳол, мо дар як вақт ба кор андохтани якчанд даст меорем VerticaOperator (ба як миз нигариста);
  • report_update >> [email, tg] - ҳама VerticaOperator дар фиристодани мактубҳо ва паёмҳо, ба монанди инҳо муттаҳид шавед:
    Apache Airflow: осонтар кардани ETL

    Аммо азбаски операторони огоҳкунанда шартҳои гуногуни оғозёбӣ доранд, танҳо яке кор мекунад. Дар намуди дарахт, ҳама чиз каме визуалӣ ба назар мерасад:
    Apache Airflow: осонтар кардани ETL

Дар бораи он чанд сухан мегӯям макросҳо ва дӯстони онҳо - тағирёбандаҳо.

Макросҳо ҷойгузинҳои Ҷинжа мебошанд, ки метавонанд маълумоти гуногуни муфидро дар аргументҳои оператор иваз кунанд. Масалан, ба ин монанд:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} ба мундариҷаи тағирёбандаи контекст васеъ хоҳад шуд execution_date дар формат YYYY-MM-DD: 2020-07-14. Беҳтарин қисми он аст, ки тағирёбандаҳои контекстӣ ба як мисоли вазифаи мушаххас мехкӯб карда мешаванд (як мураббаъ дар Намоиши дарахтон) ва ҳангоми аз нав оғоз кардани ҷойнишинҳо ба ҳамон арзишҳо васеъ мешаванд.

Арзишҳои таъиншударо бо истифода аз тугмаи Rendered дар ҳар як мисоли вазифа дидан мумкин аст. Вазифаи фиристодани мактуб чунин аст:

Apache Airflow: осонтар кардани ETL

Ва ҳамин тавр дар вазифаи фиристодани паём:

Apache Airflow: осонтар кардани ETL

Рӯйхати пурраи макросҳои дарунсохт барои версияи охирини дастрас дар ин ҷо дастрас аст: истинод ба макросҳо

Гузашта аз ин, бо ёрии плагинҳо, мо метавонем макросҳои худро эълон кунем, аммо ин як ҳикояи дигар аст.

Илова ба чизҳои пешакӣ муайяншуда, мо метавонем арзишҳои тағирёбандаҳои худро иваз кунем (ман инро аллакай дар коди боло истифода кардам). биёед дар Admin/Variables чанд чиз:

Apache Airflow: осонтар кардани ETL

Ҳама чизеро, ки шумо метавонед истифода баред:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Арзиш метавонад скаляр бошад ё он метавонад JSON бошад. Дар сурати JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

танҳо роҳро ба калиди дилхоҳ истифода баред: {{ var.json.bot_config.bot.token }}.

Ман аслан як калима мегӯям ва як скриншотро дар бораи он нишон медиҳам пайвастагиҳо. Дар ин ҷо ҳама чиз ибтидоӣ аст: дар саҳифа Admin/Connections мо пайваст эҷод мекунем, логинҳо / паролҳо ва параметрҳои мушаххаси худро дар он ҷо илова мекунем. Монанди ин:

Apache Airflow: осонтар кардани ETL

Паролҳоро метавон рамзгузорӣ кард (назар ба пешфарз амиқтар) ё шумо метавонед намуди пайвастро тарк кунед (чунон ки ман барои он кардам tg_main) - далел дар он аст, ки рӯйхати намудҳо дар моделҳои Airflow пайваст карда шудаанд ва онҳоро бидуни ворид шудан ба кодҳои сарчашма васеъ кардан мумкин нест (агар ногаҳон ман чизеро дар Google нагирифта бошам, лутфан маро ислоҳ кунед), аммо ҳеҷ чиз моро аз гирифтани кредит бозмедорад. ном.

Шумо инчунин метавонед якчанд пайвастҳоро бо як ном созед: дар ин ҳолат, усул BaseHook.get_connection(), ки ба мо пайвандҳо бо номи меорад, хоҳад дод тасодуфӣ аз якчанд номҳо (сохтани Round Robin мантиқтар мебуд, аммо биёед онро ба виҷдони таҳиягарони Airflow гузорем).

Тағйирёбандаҳо ва Пайвастшавӣ албатта абзорҳои олӣ мебошанд, аммо муҳим аст, ки мувозинатро аз даст надиҳед: кадом қисмҳои ҷараёнҳои худро шумо дар худи код нигоҳ медоред ва кадом қисмҳоро ба Airflow барои нигоҳдорӣ медиҳед. Аз як тараф, метавонад ба зудӣ тағир додани арзиш, масалан, қуттии почтаи электронӣ тавассути UI қулай бошад. Аз тарафи дигар, ин ҳанӯз бозгашт ба клики муш аст, ки мо (ман) аз он халос шудан мехостем.

Кор бо пайвандҳо яке аз вазифаҳост қалмоқҳо. Умуман, қалмоқҳои Airflow нуқтаҳои барои пайваст кардани он ба хидматҳо ва китобхонаҳои тарафи сеюм мебошанд. Масалан, JiraHook барои муошират бо Jira муштарӣ мекушояд (шумо метавонед вазифаҳоро ба пеш ва пас интиқол диҳед) ва бо ёрии SambaHook шумо метавонед файли маҳаллиро ба smb-нуқта.

Таҳлили оператори фармоишӣ

Ва мо наздик шудем, ки бубинем, ки он чӣ гуна сохта шудааст TelegramBotSendMessage

рамз commons/operators.py бо оператори воқеӣ:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Дар ин ҷо, мисли ҳама чизи дигар дар Airflow, ҳама чиз хеле содда аст:

  • Аз мерос гирифта шудааст BaseOperator, ки як қатор чизҳои мушаххаси ҷараёни ҳаворо амалӣ мекунад (ба вақти истироҳати худ нигаред)
  • Майдонҳои эълоншуда template_fields, ки дар он Ҷинжа барои коркарди макросҳо ҷустуҷӯ хоҳад кард.
  • Далелҳои дурустро барои __init__(), дар ҷои зарурӣ пешфарзҳоро муқаррар кунед.
  • Мо дар бораи ибтидои ниёгон низ фаромӯш накардаем.
  • Қуттии мувофиқро кушод TelegramBotHookаз он объекти муштариро гирифт.
  • Усули аз нав муайяншуда (аз нав муайяншуда). BaseOperator.execute(), ки Airfow вақте ки вақти ба кор андохтани оператор фаро мерасад, қаҳва мекунад - дар он мо амали асосиро иҷро карда, ворид шуданро фаромӯш мекунем. (Дар омади гап, мо ворид мешавем stdout и stderr - Ҷараёни ҳаво ҳама чизро дарбар мегирад, онро зебо печонед ва дар ҷои зарурӣ онро таҷзия мекунад.)

Биёед бубинем, ки мо чӣ дорем commons/hooks.py. Қисми якуми файл бо худи қалмоқ:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ман ҳатто намедонам, ки дар ин ҷо чӣ шарҳ диҳам, ман танҳо нуктаҳои муҳимро қайд мекунам:

  • Мо мерос мегирем, дар бораи далелҳо фикр мекунем - дар аксари ҳолатҳо он як хоҳад буд: conn_id;
  • Бартарии усулҳои стандартӣ: Ман худамро маҳдуд кардам get_conn(), ки дар он ман параметрҳои пайвастшавӣ аз рӯи ном ва танҳо ба даст фасли extra (ин майдони JSON аст), ки дар он ман (тибқи дастурҳои худам!) аломати боти Telegram -ро гузоштам: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ман як мисоли моро эҷод мекунам TelegramBot, додани он аломати мушаххас.

Ҳамааш ҳамин. Шумо метавонед муштариро аз қалмоқ истифода баред TelegramBotHook().clent ё TelegramBotHook().get_conn().

Ва қисми дуюми файл, ки дар он ман барои API Telegram REST микропрепарат месозам, то ҳамон чизеро кашола накунам. python-telegram-bot барои як усул sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Роҳи дуруст ин аст, ки ҳама чизро илова кунед: TelegramBotSendMessage, TelegramBotHook, TelegramBot - дар плагин, дар як анбори ҷамъиятӣ ҷойгир кунед ва онро ба кушодаасос диҳед.

Ҳангоме ки мо ин ҳамаро меомӯзем, навсозии гузориши мо муяссар шуд, ки бомуваффақият ноком шаванд ва дар канал ба ман паёми хато фиристанд. Ман меравам тафтиш кунам, ки оё ин нодуруст аст ...

Apache Airflow: осонтар кардани ETL
Дар доги мо чизе шикаст! Оё ин чизе нест, ки мо интизор будем? Айнан!

Шумо рехтед?

Оё шумо ҳис мекунед, ки ман чизеро аз даст додаам? Чунин ба назар мерасад, ки вай ваъда дода буд, ки маълумотро аз SQL Server ба Vertica интиқол медиҳад ва сипас онро гирифта, аз мавзӯъ дур шуд, харом!

Ин ваҳшиёна қасдан буд, ман маҷбур будам, ки барои шумо баъзе истилоҳотро ифшо кунам. Акнун шумо метавонед бештар равед.

Нақшаи мо чунин буд:

  1. Кунед
  2. Эҷоди вазифаҳо
  3. Бубинед, ки ҳама чиз чӣ қадар зебост
  4. Рақамҳои сессияро барои пур кардан таъин кунед
  5. Маълумотро аз SQL Server гиред
  6. Маълумотро ба Vertica гузоред
  7. Ҷамъоварии омор

Ҳамин тавр, барои ба кор даровардани ин ҳама, ман ба мо як каме илова кардам docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Дар он ҷо мо баланд мекунем:

  • Vertica ҳамчун мизбон dwh бо танзимоти пешфарзтарин,
  • се мисоли SQL Server,
  • мо пойгоҳи додаҳои охиринро бо баъзе маълумотҳо пур мекунем (ба ҳеҷ ваҷҳ ба mssql_init.py!)

Мо ҳама чизҳои хубро бо ёрии фармони каме мураккабтар аз дафъаи гузашта оғоз мекунем:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Он чизе ки рандомизатори мӯъҷизаи мо тавлид кардааст, шумо метавонед ашёро истифода баред Data Profiling/Ad Hoc Query:

Apache Airflow: осонтар кардани ETL
Муҳим он аст, ки онро ба таҳлилгарон нишон надиҳед

муфассалтар кунед Сеансҳои ETL Ман намехоҳам, дар он ҷо ҳама чиз ночиз аст: мо пойгоҳ месозем, дар он нишона ҳаст, мо ҳама чизро бо менеҷери контекст мепӯшем ва ҳоло ин корро мекунем:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Вакти он расидааст маълумоти моро ҷамъ кунед аз якунимсад мизи мо. Биёед инро бо ёрии хатҳои хеле содда иҷро кунем:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Бо ёрии қалмоқе, ки мо аз Airflow мегирем pymssql-пайваст кунед
  2. Биёед маҳдудиятро дар шакли сана дар дархост иваз кунем - он аз ҷониби муҳаррики шаблон ба функсия партофта мешавад.
  3. Дархости мо pandasкй моро мегирад DataFrame — дар оянда ба мо фоиданок мешавад.

Ман ивазкуниро истифода мебарам {dt} ба ҷои параметри дархост %s на аз он сабаб, ки ман Пиноккио бад ҳастам, балки аз он сабаб pandas аз ӯҳдаи худ гирифта наметавонад pymssql ва охиринашро мепартояд params: Listгарчанде ки вай дар хакикат мехохад tuple.
Инчунин қайд кунед, ки таҳиякунанда pymssql тасмим гирифт, ки дигар ӯро дастгирӣ накунад ва вақти он расидааст, ки берун равед pyodbc.

Биёед бубинем, ки Airflow далелҳои функсияҳои моро бо чӣ пур кардааст:

Apache Airflow: осонтар кардани ETL

Агар ягон маълумот мавҷуд набошад, пас идома додан маъно надорад. Аммо ин ҳам аҷиб аст, ки пур кардани онро бомуваффақият ҳисоб кунед. Аммо ин хато нест. А-а-а, чй бояд кард?! Ва ин аст он чизе:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException ба Airflow мегӯяд, ки ҳеҷ гуна хатогӣ вуҷуд надорад, аммо мо супоришро мегузаронем. Интерфейс майдони сабз ё сурх надорад, аммо гулобӣ.

Биёед маълумоти худро партоем сутунҳои сершумор:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Яъне:

  • Махзани маълумоте, ки мо аз он фармоиш гирифтаем,
  • ID-и сессияи обхезии мо (он гуногун хоҳад буд барои ҳар як вазифа),
  • Хеш аз манбаъ ва ID фармоиш - ба тавре ки дар базаи ниҳоӣ (дар он ҷо ҳама чиз ба як ҷадвал рехта мешавад) мо ID-и фармоишии беназир дорем.

Қадами охирин боқӣ мемонад: ҳама чизро ба Vertica резед. Ва, аҷиб аст, ки яке аз роҳҳои ҷолибтарин ва самараноки ин тавассути CSV аст!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Мо приёмникхои махсус тайёр карда истодаем StringIO.
  2. pandas моро бо эхтиром мегузорад DataFrame дар шакли CSV-хатҳо.
  3. Биёед бо қалмоқ ба Vertica дӯстдоштаамон пайваст шавем.
  4. Ва ҳоло бо кӯмаки copy() маълумоти моро бевосита ба Вертика фиристед!

Мо аз ронанда чанд сатр пур кардашударо мегирем ва ба мудири сеанс мегӯем, ки ҳамааш хуб аст:

session.loaded_rows = cursor.rowcount
session.successful = True

Ҳамааш ҳамин.

Дар фурӯш, мо лавҳаи ҳадафро дастӣ эҷод мекунем. Дар ин ҷо ман ба худ як мошини хурд иҷозат додам:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Ман истифода мебарам VerticaOperator() Ман схемаи пойгоҳи додаҳо ва ҷадвал эҷод мекунам (агар онҳо аллакай вуҷуд надошта бошанд, албатта). Хӯроки асосии он аст, ки дуруст ба танзим даровардани вобастагӣ:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Натиҷа

— Хайр, — гуфт муши хурдакак, — не, холо
Оё шумо боварӣ доред, ки ман даҳшатноктарин ҳайвони ҷангал ҳастам?

Ҷулия Доналдсон, The Gruffalo

Ман фикр мекунам, ки агар ман ва ҳамкасбони ман рақобат дошта бошем: кӣ раванди ETL-ро аз сифр зуд эҷод мекунад ва оғоз мекунад: онҳо бо SSIS ва муш ва ман бо Airflow ... Ва он гоҳ мо инчунин осонии нигоҳубинро муқоиса мекардем ... Вой, ман фикр мекунам, ки шумо розӣ мешавед, ки ман онҳоро дар ҳама ҷабҳаҳо мезанам!

Агар каме ҷиддӣтар бошад, пас Apache Airflow - бо тавсифи равандҳо дар шакли коди барнома - кори маро иҷро кард хеле зиёд бароҳаттар ва лаззатбахштар.

Тавсеаи номаҳдуди он ҳам аз ҷиҳати плагинҳо ва ҳам майл ба миқёспазирӣ ба шумо имкон медиҳад, ки Airflow-ро тақрибан дар ҳама соҳаҳо истифода баред: ҳатто дар давраи пурраи ҷамъоварӣ, омодасозӣ ва коркарди маълумот, ҳатто ҳангоми сар додани мушакҳо (ба Миррих, курс).

Қисми ниҳоӣ, маълумотнома ва маълумот

Раке, ки мо барои шумо ҷамъ овардаем

  • start_date. Бале, ин аллакай як хотираи маҳаллӣ аст. Тавассути далели асосии Дуг start_date ҳама мегузарад. Хулоса, агар шумо дар start_date санаи ҷорӣ ва schedule_interval — як руз, баъд ДАГ пагох на пештар сар мешавад.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Ва дигар мушкилот нест.

    Боз як хатои вақти корӣ бо он алоқаманд аст: Task is missing the start_date parameter, ки аксар вақт нишон медиҳад, ки шумо пайваст кардани операторро фаромӯш кардаед.

  • Ҳама дар як мошин. Бале, ва пойгоҳҳо (худи Airflow ва қабати мо) ва веб-сервер, нақшакаш ва коргарон. Ва он ҳатто кор мекард. Аммо бо гузашти вақт, шумораи вазифаҳо барои хидматҳо афзоиш ёфт ва вақте ки PostgreSQL ба индекс дар 20 сония ба ҷои 5 мс посух дод, мо онро гирифта бурдем.
  • Иҷрокунандаи маҳаллӣ. Бале, мо хануз дар болои он нишастаем ва аллакай ба лаби варта расидаем. LocalExecutor то ҳол барои мо кофӣ буд, аммо ҳоло вақти он расидааст, ки ҳадди аққал як коргарро васеъ кунем ва мо бояд сахт меҳнат кунем, то ба CeleryExecutor гузарем. Ва бо назардошти он, ки шумо метавонед бо он дар як мошин кор кунед, ҳеҷ чиз шуморо аз истифодаи Celery ҳатто дар сервер, ки "албатта, ҳеҷ гоҳ ба истеҳсолот ворид намешавад, ростқавлона!"
  • Истифодаи ғайриманқул асбобҳои дарунсохт:
    • Пайвастшавӣ барои нигоҳ доштани маълумоти хидматӣ,
    • Садақаҳои SLA ҷавоб додан ба вазифаҳое, ки сари вақт иҷро нашуданд,
    • xcom барои мубодилаи метамаълумот (ман гуфтам ҳадафмаълумот!) байни вазифахои даг.
  • сӯиистифода аз почта. Хуб, ман чӣ гуфта метавонам? Огоҳӣ барои ҳама такрори вазифаҳои афтода муқаррар карда шуд. Ҳоло дар Gmail кори ман зиёда аз 90 ҳазор мактубҳои Airflow дорад ва мозаи почтаи интернетӣ аз гирифтан ва нест кардани зиёда аз 100 дар як вақт худдорӣ мекунад.

Мушкилоти бештар: Pitfails Airflow Apache

Воситаҳои автоматикунонии бештар

Барои он ки мо бо сари худ бештар кор кунем, на бо дастҳоямон, Airflow барои мо инро омода кардааст:

  • оромии API — то хол макоми Эксперименталиро дорад, ки ин ба кор халал намерасонад. Бо он, шумо метавонед на танҳо дар бораи дагҳо ва вазифаҳо маълумот гиред, балки инчунин дагро қатъ/оғоз кунед, DAG Run ё ҳавз эҷод кунед.
  • CLI - асбобҳои зиёде тавассути сатри фармон дастрасанд, ки истифодаи онҳо тавассути WebUI на танҳо нороҳат аст, балки умуман вуҷуд надоранд. Барои намуна:
    • backfill барои аз нав оғоз кардани мисолҳои вазифа лозим аст.
      Масалан, тахлилгарон омада гуфтанд: «Ва шумо, рафикон, дар маълумотхои аз 1 то 13 январь сафсатае доред! Ислоҳ кунед, ислоҳ кунед, ислоҳ кунед, ислоҳ кунед! ” Ва шумо чунин як ошпаз ҳастед:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Хадамоти базавӣ: initdb, resetdb, upgradedb, checkdb.
    • run, ки ба шумо имкон медиҳад, ки як вазифаи мисолиро иҷро кунед ва ҳатто дар ҳама вобастагӣ холҳоро ба даст оред. Илова бар ин, шумо метавонед онро тавассути LocalExecutor, ҳатто агар шумо кластери карафс дошта бошед.
    • Қариб ҳамон корро мекунад test, танҳо дар базаҳо чизе наменависад.
    • connections имкон медихад, ки аз пнёх ба таври оммавй алокахо барпо карда шаванд.
  • API Python - як роҳи хеле сахти мутақобила, ки барои плагинҳо пешбинӣ шудааст ва дар он бо дастони хурд гирд намеояд. Аммо кй моро аз рафтан бозмедорад /home/airflow/dags, давидан ipython ва сар задан дар атрофи? Шумо метавонед, масалан, ҳама пайвастҳоро бо рамзи зерин содир кунед:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Пайвастшавӣ ба метабазаи Airflow. Ман ба он навиштанро тавсия намедиҳам, аммо гирифтани ҳолати вазифаҳо барои ченакҳои мушаххас метавонад нисбат ба истифодаи ҳама гуна APIҳо хеле зудтар ва осонтар бошад.

    Биёед бигӯем, ки на ҳама вазифаҳои мо беэътиноӣ мекунанд, аммо онҳо баъзан метавонанд афтоданд ва ин муқаррарӣ аст. Аммо якчанд блокҳо аллакай шубҳаноканд ва тафтиш кардан лозим аст.

    SQL-ро эҳтиёт кунед!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

мурожиат

Ва албатта, даҳ истиноди аввал аз нашри Google мундариҷаи ҷузвдони Airflow аз хатчӯбҳои ман мебошанд.

Ва истинодҳое, ки дар мақола истифода шудаанд:

Манбаъ: will.com