Apache Airflow: ETLди жеңилдетүү

Салам, мен Дмитрий Логвиненко - Vezet компаниялар тобунун аналитика бөлүмүнүн инженери.

Мен сизге ETL процесстерин өнүктүрүү үчүн сонун курал - Apache Airflow жөнүндө айтып берем. Бирок Airflow ушунчалык ар тараптуу жана көп кырдуу болгондуктан, сиз маалымат агымына катышпасаңыз дагы, аны жакшылап карап чыгышыңыз керек, бирок мезгил-мезгили менен ар кандай процесстерди ишке киргизип, алардын аткарылышын көзөмөлдөө керек.

Ооба, мен айтып эле койбостон, көрсөтөм: программада көптөгөн коддор, скриншоттор жана сунуштар бар.

Apache Airflow: ETLди жеңилдетүү
Google'да Airflow / Wikimedia Commons деген сөздү көбүнчө көрөсүз

Мазмуну

тааныштыруу

Apache Airflow Django сыяктуу:

  • питондо жазылган
  • сонун администратор панели бар,
  • чексиз кеңейтилет

- бир гана жакшыраак жана ал такыр башка максаттар үчүн жасалган, атап айтканда (ката алдында жазылгандай):

  • чексиз сандагы машиналарда тапшырмаларды аткаруу жана көзөмөлдөө (көптөгөн Сельдерей / Кубернетес жана абийириңиз сизге уруксат берсе)
  • Python кодун жазуу жана түшүнүү үчүн абдан жеңилден динамикалык иштөө процесси менен
  • жана даяр компоненттерди жана үйдө жасалган плагиндерди колдонуу менен ар кандай маалымат базаларын жана API'лерди бири-бири менен туташтыруу мүмкүнчүлүгү (бул өтө жөнөкөй).

Биз Apache Airflow колдонобуз:

  • биз DWH жана ODS (бизде Vertica жана Clickhouse бар) ар кандай булактардан (көптөгөн SQL Server жана PostgreSQL инстанциялары, колдонмо көрсөткүчтөрү менен ар кандай API'лер, жада калса 1C) маалыматтарды чогултабыз.
  • канчалык өнүккөн cron, ал ODS боюнча маалыматтарды консолидациялоо процесстерин баштайт, ошондой эле алардын сакталышын көзөмөлдөйт.

Жакынкы убакка чейин биздин муктаждыктарыбызды 32 өзөктүү жана 50 ГБ оперативдүү эс тутум менен бир кичинекей сервер камсыздап келген. Аба агымында бул иштейт:

  • дагы 200 даг (чындыгында биз тапшырмаларды толтурган иш процесстери),
  • ар биринде орточо 70 тапшырма,
  • бул жакшылык башталат (ошондой эле орточо) саатына бир жолу.

Кантип кеңейгенибиз жөнүндө мен төмөндө жазам, бирок эми биз чече турган über-проблеманы аныктайлы:

Үч оригиналдуу SQL сервери бар, алардын ар биринде 50 маалымат базасы бар - бир долбоордун инстанциялары, тиешелүүлүгүнө жараша, алар бирдей структурага ээ (дээрлик бардык жерде, муа-ха-ха), бул ар биринде Буйрутмалар таблицасы (бактыга жараша, таблица бар) дегенди билдирет. аты каалаган бизнеске түртүлүшү мүмкүн). Биз кызмат талааларын (булак сервери, булак маалымат базасы, ETL тапшырма идентификатору) кошуу менен маалыматтарды алабыз жана аларды, айталы, Verticaга ыргытабыз.

Кеттик!

Негизги бөлүгү, практикалык (жана бир аз теориялык)

Эмне үчүн биз (жана сен)

Бак-дарактар ​​чоң болуп, мен жөнөкөй элем SQL-schik бир орус чекене соодасында, биз ETL процесстерин, башкача айтканда, маалымат агымдарын, бизге жеткиликтүү болгон эки куралды колдонуп алдадык:

  • Информатика энергия борбору - өтө жайылган система, өтө өндүрүмдүү, өзүнүн аппараттык каражаттары, өзүнүн версиясы бар. Мен анын мүмкүнчүлүктөрүн Кудай сактасын 1% колдондум. Неге? Биринчиден, 380-жылдардагы бул интерфейс бизге психикалык жактан басым жасады. Экинчиден, бул карама-каршылык абдан кооз процесстерге, ачууланган компоненттерди кайра колдонууга жана башка абдан маанилүү ишкана-трюктарга арналган. Airbus AXNUMX канаты сыяктуу, анын баасы жөнүндө биз эч нерсе айтпайбыз.

    Абайлаңыз, скриншот 30 жашка чейинки адамдарга бир аз зыян келтириши мүмкүн

    Apache Airflow: ETLди жеңилдетүү

  • SQL Server Integration Server — бул жолдошту биз езубуздун про-екттин ичиндеги агымдарда колдондук. Ооба, чындыгында: биз буга чейин SQL Server колдонобуз жана анын ETL куралдарын колдонбоо кандайдыр бир негизсиз болмок. Андагы бардыгы жакшы: интерфейс да сонун, прогресс боюнча отчеттор да... Бирок биз программалык продуктыларды жакшы көргөнүбүз үчүн эмес, бул үчүн эмес. Анын версиясы dtsx (сактоодо аралаштырылган түйүндөр менен XML) биз жасай алабыз, бирок мунун мааниси эмнеде? Жүздөгөн таблицаларды бир серверден экинчисине сүйрө турган тапшырмалар пакетин жасоо жөнүндө эмне айтууга болот? Ооба, кандай жүз, сөөмөйүңүз чычкан баскычын басып, жыйырма бөлүктөн түшүп калат. Бирок, албетте, модалуураак көрүнөт:

    Apache Airflow: ETLди жеңилдетүү

Биз албетте чыгуунун жолдорун издедик. Жада калса дээрлик өз алдынча жазылган SSIS пакетинин генераторуна келди ...

... анан мага жаңы жумуш табылды. Ал эми Apache Airflow мени басып өттү.

Мен ETL процессинин сүрөттөмөлөрү жөнөкөй Python коду экенин билгенде, мен жөн гана кубанычтан бийлеген жокмун. Берилиш агымдары ушундайча версияланган жана айырмаланган жана жүздөгөн маалымат базаларынан бир максатка бир структурадагы таблицаларды куюу бир жарым же эки 13 ”экранда Python кодунун маселеси болуп калды.

Кластерди чогултуу

Келгиле, толугу менен бала бакчаны уюштурбайлы жана бул жерде Airflow, сиз тандаган маалымат базасын орнотуу, сельдерей жана доктарда сүрөттөлгөн башка учурлар жөнүндө айтпай эле коелу.

Ошентип, биз дароо эксперименттерди баштоо үчүн, мен эскиз docker-compose.yml анда:

  • Чындыгында көтөрөлү Аба агымы: Пландаштыруучу, Вебсервер. Гүл сельдерей тапшырмаларын көзөмөлдөө үчүн ал жерде айланат (анткени ал мурунтан эле түртүлгөн apache/airflow:1.10.10-python3.7, бирок биз каршы эмеспиз)
  • PostgreSQL, анда Airflow өзүнүн тейлөө маалыматын жазат (график маалыматтары, аткаруу статистикасы ж.б.), ал эми Сельдерей аткарылган тапшырмаларды белгилейт;
  • Redis, сельдерей үчүн тапшырма брокери катары иш алып барат;
  • Сельдерей жумушчу, тапшырмаларды тузден-туз аткаруу менен алек болот.
  • Папкага ./dags биз файлдарыбызды дагдардын сүрөттөлүшү менен кошобуз. Аларды чымындап алып кетишет, андыктан ар бир чүчкүргөндөн кийин бүт стекти жонглёрдун кереги жок.

Кээ бир жерлерде мисалдардагы код толук көрсөтүлбөйт (текстти баш аламан болбош үчүн), бирок кайсы бир жерде процессте өзгөртүлгөн. Толук жумушчу коддун мисалдарын репозиторийден тапса болот https://github.com/dm-logv/airflow-tutorial.

ютуб-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Эскертүүлөр:

  • Композицияны жыйноодо мен негизинен белгилүү образга таяндым пукель/докер-аба агымы - аны текшерип көргүлө. Балким жашооңузда башка эч нерсе керек эместир.
  • Бардык Airflow орнотуулары аркылуу гана эмес, жеткиликтүү airflow.cfg, бирок ошондой эле айлана-чөйрө өзгөрмөлөрү аркылуу (иштеп чыгуучуларга рахмат), мен аларды кара ниеттик менен пайдаландым.
  • Албетте, ал өндүрүшкө даяр эмес: мен атайылап контейнерлерге жүрөктүн согушун койгон жокмун, коопсуздук менен убара болгон жокмун. Бирок мен биздин эксперименттер үчүн ылайыктуу минимум жасадым.
  • Белгилей кетчү нерсе:
    • Даг папка пландоочуга да, жумушчуларга да жеткиликтүү болушу керек.
    • Ошол эле бардык үчүнчү тараптын китепканаларына тиешелүү - алардын бардыгы пландоочу жана жумушчулары бар машиналарга орнотулушу керек.

Ооба, азыр бул жөнөкөй:

$ docker-compose up --scale worker=3

Баары көтөрүлгөндөн кийин, веб-интерфейстерди карасаңыз болот:

негизги түшүнүктөр

Эгер сиз бул "дагдардын" баарында эч нерсени түшүнбөсөңүз, анда бул жерде кыскача сөздүк:

  • Scheduler - роботтордун адам эмес, катуу иштешин көзөмөлдөгөн Airflow ичиндеги эң маанилүү агасы: графикти көзөмөлдөйт, дагдарды жаңыртып, тапшырмаларды ишке киргизет.

    Жалпысынан алганда, эски версияларда анын эс тутумунда көйгөйлөр болгон (жок, амнезия эмес, бирок агып кетүү) жана мурастык параметр конфигурацияларда да калган. run_duration — анын кайра баштоо аралыгы. Бирок азыр баары жакшы.

  • ДАГ (ака "даг") - "багытталган ациклдик график", бирок мындай аныктама аз эле адамдарга айтып берет, бирок чындыгында бул бири-бири менен өз ара аракеттенүүчү тапшырмалар үчүн контейнер (төмөндө караңыз) же SSISдеги пакеттин аналогу жана Informaticaдагы Workflow. .

    Дагдардан тышкары, дагы эле субдагдар болушу мүмкүн, бирок биз аларга жете албайбыз.

  • DAG Run - инициализацияланган даг, ал өзүнө таандык execution_date. Ошол эле дагдын даграндары параллелдүү иштей алат (эгер сиз өзүңүздүн милдеттериңизди идемпотенттүү кылып койгон болсоңуз, албетте).
  • Оператор белгилүү бир иш-аракетти аткаруу үчүн жооптуу код бөлүктөрү болуп саналат. Операторлордун үч түрү бар:
    • иш-аракеттербиздин сүйүктүү сыяктуу PythonOperator, каалаган (жарамдуу) Python кодун аткара алат;
    • ташуумаалыматтарды бир жерден экинчи жерге ташыган, айталы, MsSqlToHiveTransfer;
    • сенсор экинчи жагынан, бул окуя болгонго чейин дагдын андан ары аткарылышын реакцияга же жайлатууга мүмкүндүк берет. HttpSensor көрсөтүлгөн акыркы чекитти тарта алат жана керектүү жооп күтүп турганда, которууну баштаңыз GoogleCloudStorageToS3Operator. Кызыккан акыл сурайт: «Эмне үчүн? Кантсе да кайталоолорду оператордо эле жасай аласың!” Андан кийин, убактылуу токтотулган операторлор менен милдеттердин бассейнин жабыш үчүн эмес. Сенсор иштей баштайт, текшерет жана кийинки аракетке чейин өлөт.
  • маселе - жарыяланган операторлор, түрүнө карабастан жана дагга тиркелет.
  • тапшырма мисалы - башкы пландоочу тапшырмаларды аткаруучу-жумушчуларга салгылашууга убакыт келди деп чечкенде (эгер биз колдонсок, ошол жерде LocalExecutor же учурда алыскы түйүнгө CeleryExecutor), аларга контекстти дайындайт (б.а. өзгөрмөлөрдүн жыйындысы – аткаруу параметрлери), буйрук же суроо шаблондорун кеңейтет жана аларды бириктирет.

Биз тапшырмаларды жаратабыз

Биринчиден, келгиле, догдурдун жалпы схемасын аныктап көрөлү, андан кийин биз майда-чүйдөсүнө чейин тереңирээк карайбыз, анткени биз кээ бир маанилүү эмес чечимдерди колдонобуз.

Ошентип, жөнөкөй түрдө, мындай даг төмөнкүдөй болот:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Аны аныктап көрөлү:

  • Биринчиден, биз керектүү libs жана импорттоо башка нерсе;
  • sql_server_ds - аны List[namedtuple[str, str]] Airflow Connections'тен туташуулардын аттары жана пластинкабызды ала турган маалымат базалары менен;
  • dag - сөзсүз түрдө болушу керек болгон биздин дагыбызды жарыялоо globals(), антпесе Airflow аны таба албайт. Даг да айтышы керек:
    • анын аты ким orders - бул аталыш анда веб-интерфейсте пайда болот,
    • ал сегизинчи июлда түн ортосунан тарта иштей турганын,
    • жана ал болжол менен ар бир 6 саат сайын иштеши керек (бул жерде катаал балдар үчүн timedelta() жол берилген cron-сап 0 0 0/6 ? * * *, азыраак үчүн - сыяктуу туюнтма @daily);
  • workflow() негизги ишти аткарат, бирок азыр эмес. Азырынча биз контекстибизди журналга таштайбыз.
  • Эми тапшырмаларды түзүүнүн жөнөкөй сыйкыры:
    • биз өзүбүздүн булактар ​​аркылуу иштейбиз;
    • инициализациялоо PythonOperator, биздин муляжыбызды аткара турган workflow(). Тапшырманын уникалдуу (дагдын ичинде) атын көрсөтүүнү жана дагдын өзүн байлоону унутпаңыз. Желек provide_context өз кезегинде функцияга кошумча аргументтерди төкөт, аларды биз кылдаттык менен чогултабыз **context.

Азырынча баары ушул. Бизде эмне бар:

  • веб-интерфейсиндеги жаңы дагдар,
  • параллелдүү аткарыла турган бир жарым жүз тапшырма (эгер Аба агымы, Сельдерей орнотуулары жана сервердин сыйымдуулугу уруксат берсе).

Ооба, дээрлик түшүндүм.

Apache Airflow: ETLди жеңилдетүү
Көз карандылыкты ким орнотот?

Мунун баарын жөнөкөйлөтүү үчүн, мен бурулдум docker-compose.yml иштетүү requirements.txt бардык түйүндөрдө.

Эми жок болуп кетти:

Apache Airflow: ETLди жеңилдетүү

Боз чарчы - пландоочу тарабынан иштелип чыккан тапшырма инстанциялары.

Биз бир аз күтө турабыз, жумушчулар тапшырмаларды аткарышат:

Apache Airflow: ETLди жеңилдетүү

Жашылдар, албетте, ишин ийгиликтуу аякташты. Кызылдар анча ийгиликтүү эмес.

Баса, биздин продюсерде папка жок ./dags, машиналар ортосунда эч кандай синхрондоштуруу жок - бардык дагдар жатат git биздин Gitlab жана Gitlab CI биригүүдө машиналарга жаңыртууларды таратат master.

Гүл жөнүндө бир аз

Жумушчулар биздин соускаларыбызды соруп жатканда, бизге бир нерсени көрсөтө турган дагы бир куралды эстейли - Гүл.

Жумушчу түйүндөрү боюнча кыскача маалымат менен эң биринчи бет:

Apache Airflow: ETLди жеңилдетүү

Жумушка кеткен тапшырмалар менен эң күчтүү барак:

Apache Airflow: ETLди жеңилдетүү

Биздин брокердин статусу менен эң кызыксыз барак:

Apache Airflow: ETLди жеңилдетүү

Эң жаркыраган бет тапшырманын статусунун графиктери жана алардын аткарылуу убактысы:

Apache Airflow: ETLди жеңилдетүү

Биз аз жүктөлгөндү жүктөйбүз

Ошентип, бардык тапшырмалар иштелип чыкты, сиз жарадарларды алып кете аласыз.

Apache Airflow: ETLди жеңилдетүү

Жана көптөгөн жарадар болгон - тигил же бул себептерден улам. Аба агымын туура колдонгон учурда, бул төрт бурчтар маалыматтар сөзсүз түрдө келбегендигин көрсөтүп турат.

Сиз журналды көрүп, түшкөн тапшырма инстанцияларын кайра башташыңыз керек.

Каалаган квадратты чыкылдатуу менен, биз колдо болгон аракеттерди көрөбүз:

Apache Airflow: ETLди жеңилдетүү

Сиз жыгылгандарды алып, тазалай аласыз. Башкача айтканда, биз ал жерде бир нерсе ишке ашпай калганын унутуп, ошол эле тапшырма пландоочуга барат.

Apache Airflow: ETLди жеңилдетүү

Муну бардык кызыл квадраттар менен чычкан менен жасоо өтө гумандуу эмес экени түшүнүктүү - бул биз Airflowдан күткөн нерсе эмес. Албетте, бизде массалык кыргын салуучу куралдар бар: Browse/Task Instances

Apache Airflow: ETLди жеңилдетүү

Келгиле, баарын дароо тандап, нөлгө кайтаралы, туура нерсени чыкылдатыңыз:

Apache Airflow: ETLди жеңилдетүү

Тазалангандан кийин, биздин таксилер төмөнкүдөй көрүнөт (алар пландоочунун аларды пландаштырышын күтүп жатышат):

Apache Airflow: ETLди жеңилдетүү

Туташуулар, илгичтер жана башка өзгөрмөлөр

Кийинки DAGды кароого убакыт келди, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Ар бир адам качандыр бир убакта отчетту жаңыртыштыбы? Бул дагы ал: маалыматтарды кайдан алууга боло турган булактардын тизмеси бар; коюунун тизмеси бар; баары болгондо же бузулганда коңгуроону унутпаңыз (жакшы, бул биз жөнүндө эмес, жок).

Келгиле, файлды кайрадан карап чыгалы жана жаңы түшүнүксүз нерселерди карап көрөлү:

  • from commons.operators import TelegramBotSendMessage - Бизге өз операторлорубузду жасоого эч нерсе тоскоол болбойт, аны биз блоктон чыгарылганга билдирүүлөрдү жөнөтүү үчүн кичинекей капкак жасоо менен колдондук. (Төмөндө бул оператор тууралуу кененирээк сүйлөшөбүз);
  • default_args={} - даг бардык операторлоруна бирдей аргументтерди тарата алат;
  • to='{{ var.value.all_the_kings_men }}' - талаа to бизде катуу коддолгон эмес, бирок Jinja жана мен кылдаттык менен киргизген электрондук каттардын тизмеси бар өзгөрмөнүн жардамы менен динамикалык түрдө түзүлөт. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — операторду ишке киргизүүнүн шарты. Биздин учурда, кат бардык көз карандылыкты иштеп чыкканда гана жетекчилерге учуп кетет ийгиликтүү;
  • tg_bot_conn_id='tg_main' - аргументтер conn_id биз түзгөн байланыш идентификаторлорун кабыл алыңыз Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegramдагы билдирүүлөр аткарылбай калган тапшырмалар болгондо гана учуп кетет;
  • task_concurrency=1 - бир тапшырманын бир нече тапшырма инстанцияларын бир эле учурда ишке киргизүүгө тыюу салабыз. Болбосо, биз бир эле учурда бир нече ишке алабыз VerticaOperator (бир столду карап);
  • report_update >> [email, tg] - баары VerticaOperator каттарды жана билдирүүлөрдү жөнөтүүдө биригишет, мисалы:
    Apache Airflow: ETLди жеңилдетүү

    Бирок кабарлоочу операторлордун ишке киргизүү шарттары башка болгондуктан, бирөө гана иштейт. Дарак көрүнүшүндө баары бир аз визуалдык көрүнөт:
    Apache Airflow: ETLди жеңилдетүү

жөнүндө бир нече сөз айта кетейин макрос жана алардын достору - өзгөрмөлөр.

Макрос - бул Jinja толтургучтары, алар ар кандай пайдалуу маалыматты оператордун аргументтерине алмаштыра алат. Мисалы, бул сыяктуу:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} контексттик өзгөрмөнүн мазмунуна кеңейет execution_date түрдө YYYY-MM-DD: 2020-07-14. Эң жакшы жери, контексттик өзгөрмөлөр конкреттүү тапшырма инстанциясына (Дарак көрүнүшүндөгү чарчы) кадалат жана кайра иштетилгенде, толтургучтар ошол эле маанилерге чейин кеңейет.

Белгиленген маанилерди ар бир тапшырма инстанциясындагы Көрсөтүлгөн баскычы аркылуу көрүүгө болот. Кат жөнөтүү милдети мына ушундай:

Apache Airflow: ETLди жеңилдетүү

Ошентип, билдирүү жөнөтүү тапшырмасында:

Apache Airflow: ETLди жеңилдетүү

Акыркы жеткиликтүү версия үчүн орнотулган макростордун толук тизмеси бул жерде жеткиликтүү: макрос шилтемеси

Анын үстүнө, плагиндердин жардамы менен биз өзүбүздүн макросторубузду жарыялай алабыз, бирок бул башка окуя.

Алдын ала аныкталган нерселерден тышкары, биз өзгөрмөлөрүбүздүн маанилерин алмаштыра алабыз (мен муну жогорудагы коддо колдонгонмун). Келгиле, киргизели Admin/Variables бир нече нерсе:

Apache Airflow: ETLди жеңилдетүү

Сиз колдоно ала турган нерселердин бардыгы:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Маани скаляр болушу мүмкүн же JSON да болушу мүмкүн. JSON учурда:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

жөн гана каалаган ачкычтын жолун колдонуу: {{ var.json.bot_config.bot.token }}.

Мен түзмө-түз бир сөз айтып, бир скриншот көрсөтөм байланыштар. Бул жерде баары жөнөкөй: бетте Admin/Connections биз байланыш түзүп, логиндерди / сырсөздөрдү жана конкреттүү параметрлерди кошобуз. Бул сыяктуу:

Apache Airflow: ETLди жеңилдетүү

Сырсөздөр шифрленген болушу мүмкүн (демейкиге караганда кылдаттык менен) же туташуу түрүн калтырсаңыз болот (мен кылгандай tg_main) - чындыгында, түрлөрүнүн тизмеси Airflow моделдеринде жабдылган жана баштапкы коддорго кирбестен кеңейтилбейт (эгерде күтүлбөгөн жерден мен Google'га бир нерсе киргизбей калсам, мени оңдоп коюңуз), бирок эч нерсе бизге кредиттерди алууга тоскоол болбойт. аты.

Ошондой эле бир эле ат менен бир нече байланыштарды түзө аласыз: бул учурда, ыкма BaseHook.get_connection(), бизге аты менен байланыштарды алат, берет кокустук бир нече ысымдардан (Round Robin жасоо логикалык жактан туура болмок, бирок аны Airflow иштеп чыгуучулардын абийирине калтыралы).

Өзгөрмөлөр жана Туташуулар албетте сонун инструменттер, бирок балансты жоготпоо маанилүү: агымыңыздын кайсы бөлүктөрүн коддун өзүндө сактайсыз жана кайсы бөлүктөрүн Airflow'ка сактоо үчүн бересиз. Бир жагынан алганда, UI аркылуу маанини, мисалы, почта кутусун тез өзгөртүүгө ыңгайлуу болушу мүмкүн. Башка жагынан алганда, бул дагы эле чычканды чыкылдатууга кайтуу, андан биз (мен) кутулууну кааладык.

Байланыштар менен иштөө милдеттердин бири илгичтер. Жалпысынан алганда, Airflow илгичтери аны үчүнчү тараптын кызматтарына жана китепканаларына туташтыруу үчүн пункттар болуп саналат. Мисалга, JiraHook Jira менен иштешүү үчүн бизге кардарды ачат (сиз тапшырмаларды алдыга жана артка жылдыра аласыз) жана анын жардамы менен SambaHook жергиликтүү файлды түртсөңүз болот smb-пункт.

Ыңгайлаштырылган операторду талдоо

Ал эми биз анын кантип жасалганын кароого жакындап калдык TelegramBotSendMessage

коду commons/operators.py чыныгы оператор менен:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Бул жерде, Airflow башка бардык нерселер сыяктуу эле, баары абдан жөнөкөй:

  • дан мураска алынган BaseOperator, ал бир нече аба агымына тиешелүү нерселерди ишке ашырат (бос убактыңызды караңыз)
  • Жарыяланган талаалар template_fields, анда Джиндя иштетүү үчүн макросторду издейт.
  • үчүн туура аргументтерди уюштурган __init__(), зарыл болгон жерде демейки параметрлерди коюңуз.
  • Бабаны инициализациялоону да эстен чыгарган жокпуз.
  • Тиешелүү илгичти ачты TelegramBotHookандан кардар объектисин алды.
  • Overridden (кайра аныкталган) ыкма BaseOperator.execute(), кайсы Airfow операторду ишке киргизүү убактысы келгенде кыйрайт - анда биз кирүүнү унутуп, негизги аракетти ишке ашырабыз. (Баса, биз киребиз stdout и stderr - Аба агымы бардыгын кармап, кооз ороп, керек болсо, аны майдалайт.)

Келгиле, бизде эмне бар экенин карап көрөлү commons/hooks.py. Файлдын биринчи бөлүгү, илгичтин өзү менен:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Мен бул жерде эмнени түшүндүрүүнү да билбейм, мен жөн гана маанилүү пункттарды белгилейм:

  • Биз мурастайбыз, аргументтер жөнүндө ойлонобуз - көпчүлүк учурда бул бир болот: conn_id;
  • Стандарттык ыкмаларды жокко чыгаруу: Мен өзүмдү чектедим get_conn(), анда мен байланыш параметрлерин аты менен алып, жөн гана бөлүмдү алам extra (бул JSON талаасы), анда мен (өзүмдүн көрсөтмөлөрүм боюнча!) Telegram бот белгисин койдум: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Мен биздин мисалды түзөм TelegramBot, ага конкреттүү белги берүү.

Баары болду. Сиз колдонуу менен илгичтен кардарды ала аласыз TelegramBotHook().clent же TelegramBotHook().get_conn().

Ал эми файлдын экинчи бөлүгү, мен ошол эле сүйрөп кетпеш үчүн Telegram REST API үчүн микроблокторду жасайм. python-telegram-bot бир ыкма үчүн sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Туура жол - мунун бардыгын кошуу: TelegramBotSendMessage, TelegramBotHook, TelegramBot - плагинде коомдук репозиторийге салып, аны Open Sourceге бериңиз.

Биз мунун баарын изилдеп жатканыбызда, биздин отчеттун жаңыртуулары ийгиликсиз болуп, каналга ката билдирүү жөнөттү. Мен туура эмес экенин текшерип көрөм ...

Apache Airflow: ETLди жеңилдетүү
Биздин итибизде бир нерсе сынып кетти! Биз күткөн нерсе эмеспи? Так!

куюп жатасыңбы?

Мен бир нерсени сагындым деп ойлойсуңбу? Ал SQL Serverден Verticaга маалыматтарды өткөрүп берем деп убада берген окшойт, анан аны алып, темадан алыстап кетти окшойт, шылуун!

Бул мыкаачылык атайылап жасалган, мен жөн гана сиз үчүн кээ бир терминологияны чечмелеп беришим керек болчу. Эми сиз андан ары кете аласыз.

Биздин план мындай болчу:

  1. Даг
  2. Тапшырмаларды түзүү
  3. Баары кандай сонун экенин көр
  4. Толтуруу үчүн сессия номерлерин дайындаңыз
  5. SQL серверинен маалыматтарды алуу
  6. Маалыматтарды Verticaга салыңыз
  7. Статистиканы чогултуу

Ошентип, мунун баарын ишке ашыруу үчүн, мен өзүбүзгө кичине кошумча жасадым docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Ал жерде биз көтөрөбүз:

  • Vertica хост катары dwh демейки жөндөөлөр менен,
  • SQL Server үч нускасы,
  • биз акыркы маалыматтар базасын кээ бир маалыматтар менен толтурабыз (эч кандай учурда карабагыла mssql_init.py!)

Биз бардык жакшылыктарды мурункуга караганда бир аз татаалыраак команданын жардамы менен ишке киргизебиз:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Биздин керемет рандомизатор жараткан нерсени сиз колдоно аласыз Data Profiling/Ad Hoc Query:

Apache Airflow: ETLди жеңилдетүү
Эң башкысы аны аналитиктерге көрсөтпөө

иштеп чыгуу ETL сессиялары Мен андай кылбайм, ал жерде баары анча маанилүү эмес: биз база жасайбыз, анда белги бар, биз баарын контексттик менеджер менен ороп алабыз, эми муну жасайбыз:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Убакыт келди биздин маалыматтарды чогултуу бир жарым жуз столубуздан. Келгиле, муну өтө жөнөкөй саптардын жардамы менен кылалы:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Илгичтин жардамы менен биз Аба агымынан алабыз pymssql-байланыш
  2. Сурамга дата түрүндөгү чектөөнү алмаштыралы - ал шаблон кыймылдаткычы тарабынан функцияга ыргытылат.
  3. Биздин өтүнүчүбүздү тойгузуу pandasбизди ким алат DataFrame - Бул келечекте бизге пайдалуу болот.

Мен алмаштырууну колдонуп жатам {dt} суроо параметринин ордуна %s Мен жаман Пиноккио болгондуктан эмес, анткени pandas көтөрө албайт pymssql жана акыркысын тайдырат params: Listал чындап кааласа да tuple.
Ошондой эле иштеп чыгуучу экенин белгилей кетүү керек pymssql мындан ары аны колдобоону чечти, эми көчүп кетүүгө убакыт келди pyodbc.

Келгиле, Airflow биздин функциялардын аргументтерин эмне менен толтурганын карап көрөлү:

Apache Airflow: ETLди жеңилдетүү

Эгерде маалымат жок болсо, анда улантуунун кереги жок. Бирок толтурууну ийгиликтүү деп эсептөө кызык. Бирок бул ката эмес. А-а-аа, эмне кылуу керек?! Жана бул жерде:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow эч кандай ката жок экенин айтат, бирок биз тапшырманы өткөрүп жиберебиз. Интерфейс жашыл же кызыл чарчы эмес, кызгылт түстө болот.

Келгиле, маалыматтарыбызды таштайлы бир нече тилке:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

атап айтканда:

  • Биз буйрутма алган маалымат базасы,
  • Биздин суу ташкын сессиясынын ID (ал башкача болот ар бир иш үчүн),
  • Булактан жана буйрутма идентификаторунан хэш - акыркы маалымат базасында (бардыгы бир таблицага куюлган) бизде уникалдуу заказ ID бар.

Акыркы кадам калды: бардыгын Verticaга куюңуз. Жана таң калыштуусу, муну жасоонун эң сонун жана эффективдүү жолдорунун бири CSV аркылуу!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Атайын ресивер жасап жатабыз StringIO.
  2. pandas ырайымдуулук менен коёт DataFrame түрүндө CSV-саптар.
  3. Келгиле, биздин сүйүктүү Vertica менен байланышты илгич менен ачалы.
  4. Ал эми азыр жардамы менен copy() биздин маалыматтарды түздөн-түз Vertika жөнөтүү!

Биз айдоочудан канча линия толтурулганын алып, сеанс менеджерине бардыгы жайында деп айтабыз:

session.loaded_rows = cursor.rowcount
session.successful = True

Баары болду.

сатуу боюнча, биз кол менен максаттуу табак түзүү. Бул жерде мен өзүмө кичинекей машинага уруксат бердим:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Мен колдонуп жатам VerticaOperator() Мен маалымат базасынын схемасын жана таблицасын түзөм (эгер алар буга чейин жок болсо, албетте). негизги нерсе туура көз карандылыкты уюштуруу болуп саналат:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

кошуу жолу менен

- Мейли, - деди кичинекей чычкан, - чынбы, азыр
Мен токойдогу эң коркунучтуу жаныбар экениме ишенесиңби?

Джулия Дональдсон, The Gruffalo

Менин оюмча, эгерде менин кесиптештерим менен менде атаандаштык болсо: ким ETL процессин нөлдөн баштап тез түзүп, ишке киргизет: алар SSIS жана чычкан менен, ал эми мен Airflow менен ... Анан биз тейлөөнүн жеңилдигин салыштырмакпыз ... Вау, мен аларды бардык фронттордо жеңем дегениме макул болосуңар деп ойлойм!

Эгер бир аз олуттуураак болсо, анда Apache Airflow - программалык код түрүндө процесстерди сүрөттөп, менин ишимди аткарды дагы дагы ыңгайлуу жана жагымдуу.

Анын чексиз кеңейүү мүмкүнчүлүгү, плагиндер жагынан да, масштабдалууга ыңгайлуулугу жагынан да, аба агымын дээрлик бардык чөйрөдө колдонууга мүмкүнчүлүк берет: маалыматтарды чогултуунун, даярдоонун жана иштетүүнүн толук циклинде, атүгүл ракеталарды учурууда (Марска, курс).

Бөлүм жыйынтыктоочу, маалымдама жана маалымат

Биз сиз үчүн чогулткан тырмоо

  • start_date. Ооба, бул жергиликтүү мем. Дагдын негизги аргументи аркылуу start_date баары өтөт. Кыскача айтканда, эгер сиз белгилесеңиз start_date учурдагы дата жана schedule_interval - бир күнү, анан DAG эрте эмес эртең башталат.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Анан дагы көйгөйлөр жок.

    Аны менен байланышкан дагы бир иштөө катасы бар: Task is missing the start_date parameter, бул көбүнчө сиз даг операторуна байланышууну унутуп калганыңызды билдирет.

  • Баары бир машинада. Ооба, жана базалар (Airflow өзү жана биздин каптоо), жана веб-сервер, пландоочу жана жумушчулар. Жана ал тургай иштеген. Бирок убакыттын өтүшү менен кызматтар үчүн тапшырмалардын саны өсүп, PostgreSQL индекске 20 мс эмес, 5 секундада жооп бере баштаганда, биз аны алып, алып кеттик.
  • LocalExecutor. Ооба, биз дагы эле анын үстүндө отурабыз, ансыз деле туңгуюктун четине келип калдык. LocalExecutor ушул убакка чейин биз үчүн жетиштүү болду, бирок азыр жок дегенде бир жумушчу менен кеңейтүүгө убакыт келди жана биз CeleryExecutorго өтүү үчүн көп иштешибиз керек. Жана аны менен бир машинада иштей ала турганыңызды эске алганда, Сельдерейди атүгүл серверде колдонууга эч нерсе тоскоол болбойт, ал "албетте, чынчылдык менен эч качан өндүрүшкө кирбейт!"
  • Колдонбоо орнотулган аспаптар:
    • Соолуган кызмат грамоталарын сактоо үчүн,
    • SLA Misses өз убагында аткарылбаган тапшырмаларга жооп берүү,
    • xcom метадайындар алмашуу үчүн (мен айттым максатмаалыматтар!) даг милдеттердин ортосунда.
  • Почтаны кыянаттык менен пайдалануу. Мейли, эмне десем болот? Кулаган тапшырмалардын бардык кайталанышы үчүн эскертүүлөр орнотулган. Азыр менин жумушум Gmail'де Airflow'тан> 90 миң электрондук кат бар жана веб-почта бир эле учурда 100дөн ашык каттарды алып, жок кылуудан баш тартат.

Көбүрөөк тузактар: Apache Airflow Pitfails

Дагы автоматташтыруу куралдары

Колубуз менен эмес, башыбыз менен дагы көбүрөөк иштешибиз үчүн, Airflow бизге муну даярдады:

  • REST API - ал дагы эле Эксперименттик статусуна ээ, бул анын иштөөсүнө тоскоолдук кылбайт. Анын жардамы менен сиз дагдар жана тапшырмалар жөнүндө маалымат алып тим болбостон, дагды токтотуп/баштап, DAG Run же бассейнди түзө аласыз.
  • CLI - көптөгөн инструменттер буйрук сабы аркылуу жеткиликтүү, алар жөн гана WebUI аркылуу колдонууга ыңгайсыз эмес, бирок жалпысынан жок. Мисалы:
    • backfill тапшырма учурларын кайра баштоо үчүн керек.
      Маселен, аналитиктер келип: «А сизде, жолдош, 1-январдан 13-январга чейинки маалыматтарда куру кеп бар! Оңдо, оңдо, оңдо, оңдо! А сен ушундай мешсиң:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Негизги кызмат: initdb, resetdb, upgradedb, checkdb.
    • run, бул сизге бир нуска тапшырмасын аткарууга, ал тургай, бардык көз карандылыктар боюнча упай алууга мүмкүндүк берет. Мындан тышкары, сиз аны аркылуу иштете аласыз LocalExecutor, сизде Сельдерей кластери болсо да.
    • Дээрлик бир эле нерсени жасайт test, базаларда гана эч нерсе жазбайт.
    • connections кабыктан байланыштарды массалык түрдө түзүүгө мүмкүндүк берет.
  • python api - плагиндер үчүн арналган жана кичинекей колдор менен аралашпастан, өз ара аракеттенүүнүн абдан катуу ыкмасы. Бирок бизге барууга ким тоскоол болот /home/airflow/dags, чуркагыла ipython жана баш аламандык кыла баштайсызбы? Сиз, мисалы, төмөнкү код менен бардык байланыштарды экспорттой аласыз:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Airflow метаберилиштер базасына туташууда. Мен ага жазууну сунуш кылбайм, бирок ар кандай конкреттүү көрсөткүчтөр үчүн тапшырма абалдарын алуу API'лердин бирин колдонууга караганда алда канча тез жана оңой болушу мүмкүн.

    Айталы, биздин бардык тапшырмаларыбыз идемпотенттүү эмес, бирок алар кээде кулап калышы мүмкүн жана бул нормалдуу көрүнүш. Бирок бир нече бөгөттөлүүлөр буга чейин эле шектүү, аны текшерүү керек.

    Абайлаңыз SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

шилтемелер

Жана, албетте, Google чыгарган алгачкы он шилтеме - бул менин кыстармалардагы Airflow папкасынын мазмуну.

Жана макалада колдонулган шилтемелер:

Source: www.habr.com