Apache ауа ағыны: ETL жеңілдету

Сәлеметсіз бе, мен Дмитрий Логвиненкомын - Vezet компаниялар тобының аналитика бөлімінің инженері.

Мен сізге ETL процестерін дамытуға арналған тамаша құрал - Apache Airflow туралы айтып беремін. Бірақ ауа ағыны соншалықты жан-жақты және көп қырлы, сондықтан сіз деректер ағынына қатыспасаңыз да, оны мұқият қарауыңыз керек, бірақ кез келген процестерді кезеңді түрде іске қосып, олардың орындалуын бақылау қажет.

Иә, мен айтып қана қоймай, көрсетемін: бағдарламада көптеген кодтар, скриншоттар мен ұсыныстар бар.

Apache ауа ағыны: ETL жеңілдету
Google-де Airflow / Wikimedia Commons сөзін іздегенде әдетте көретін нәрсе

Мазмұны

Кіріспе

Apache Airflow Джанго сияқты:

  • питон тілінде жазылған
  • тамаша әкімші панелі бар,
  • шексіз кеңеюде

- тек жақсырақ және ол мүлде басқа мақсаттар үшін жасалған, атап айтқанда (кат алдында жазылғандай):

  • машиналарда шектеусіз тапсырмаларды орындау және бақылау (көптеген балдыркөк / Кубернете және сіздің ар-ұжданыңыз мүмкіндік береді)
  • Python кодын жазуға және түсінуге оңайдан динамикалық жұмыс ағынының генерациясымен
  • және дайын компоненттерді де, үйде жасалған плагиндерді де пайдалана отырып, кез келген дерекқорлар мен API интерфейстерін бір-бірімен қосу мүмкіндігі (бұл өте қарапайым).

Біз Apache Airflow қолданамыз:

  • біз әртүрлі көздерден деректерді жинаймыз (көптеген SQL Server және PostgreSQL даналары, қолданба өлшемдері бар әртүрлі API интерфейстері, тіпті 1C) DWH және ODS (бізде Vertica және Clickhouse бар).
  • қаншалықты озық cron, ол ODS бойынша деректерді біріктіру процестерін бастайды, сондай-ақ олардың сақталуын бақылайды.

Соңғы уақытқа дейін біздің қажеттіліктер 32 ядросы және 50 ГБ жедел жады бар бір шағын сервермен жабылды. Ауа ағынында бұл жұмыс істейді:

  • более 200 тг (шын мәнінде біз тапсырмаларды толтырған жұмыс процестері),
  • әрқайсысында орташа 70 тапсырма,
  • бұл жақсылық басталады (сонымен қатар орташа) сағатына бір рет.

Біз қалай кеңейткеніміз туралы мен төменде жазамын, бірақ енді біз шешетін über-проблемасын анықтайық:

Үш бастапқы SQL сервері бар, олардың әрқайсысында 50 дерекқор бар - бір жобаның даналары, сәйкесінше, олардың құрылымы бірдей (барлық жерде дерлік, муа-ха-ха), яғни әрқайсысында Тапсырыстар кестесі бар (бақытымызға орай, бұл кесте бар). атауды кез келген бизнеске енгізуге болады). Біз деректерді қызмет өрістерін (бастапқы сервер, бастапқы дерекқор, ETL тапсырма идентификаторы) қосу арқылы аламыз және оларды, айталық, Vertica-ға жібереміз.

Барайық!

Негізгі бөлім, практикалық (және аздап теориялық)

Неліктен біз (және сіз)

Ағаштар үлкен, мен қарапайым болған кезде SQLБір ресейлік бөлшек саудада -schik, біз қол жетімді екі құралды пайдаланып ETL процестерін, яғни деректер ағындарын алдап алдық:

  • Ақпараттық қуат орталығы - өте кең таралған жүйе, өте өнімді, өзінің аппараттық құралдары, өзіндік нұсқасы бар. Мен оның мүмкіндіктерінің 1% құдай сақтасын пайдаландым. Неліктен? Ең алдымен, 380-шы жылдардағы бұл интерфейс бізге ойша қысым жасады. Екіншіден, бұл контрацепция өте сәнді процестерге, компоненттерді ашулы қайта пайдалануға және басқа да өте маңызды-кәсіпорын-трюктерге арналған. Оның құны туралы, мысалы, Airbus AXNUMX қанаты / жылына, біз ештеңе айтпаймыз.

    Сақ болыңыз, скриншот 30 жасқа толмаған адамдарға аздап зиян тигізуі мүмкін

    Apache ауа ағыны: ETL жеңілдету

  • SQL серверімен біріктіру сервері - біз бұл жолдасты жобаішілік ағындарымызда пайдаландық. Шындығында: біз SQL серверін қолданамыз және оның ETL құралдарын пайдаланбау қандай да бір негізсіз болар еді. Ондағы барлығы жақсы: интерфейс әдемі де, орындалу туралы есептер де ... Бірақ біз бағдарламалық өнімдерді жақсы көретініміз емес, ол үшін емес. Оның нұсқасы dtsx (сақтау кезінде араластырылған түйіндері бар XML дегеніміз) біз жасай аламыз, бірақ мұның мәні неде? Жүздеген кестелерді бір серверден екіншісіне апаратын тапсырмалар пакетін жасау туралы не айтуға болады? Иә, не жүз, сұқ саусағың жиырма бөліктен түсіп қалады, тышқанның түймесін басып. Бірақ ол әлдеқайда сәнді көрінеді:

    Apache ауа ағыны: ETL жеңілдету

Біз, әрине, шығу жолдарын іздедік. Іс тіпті дерлік өздігінен жазылған SSIS пакетінің генераторына келді ...

...сосын маған жаңа жұмыс табылды. Ал Apache Airflow мені басып озды.

Мен ETL процесінің сипаттамасы қарапайым Python коды екенін білгенде, мен жай ғана қуаныштан билемедім. Деректер ағындары осылайша нұсқаланды және дифференцияланды және жүздеген дерекқорлардан бір мақсатқа бір құрылымды кестелерді құю бір жарым немесе екі 13 ”экранда Python кодының мәселесі болды.

Кластерді құрастыру

Толық балабақшаны ұйымдастырмай-ақ қояйық және мұнда Airflow орнату, таңдаған дерекқор, балдыркөк және доктарда сипатталған басқа да жағдайлар туралы айтпай-ақ қояйық.

Тәжірибелерді бірден бастау үшін мен эскиз жасадым docker-compose.yml онда:

  • Шын мәнінде көтерейік Ауа шығыны: Жоспарлағыш, веб-сервер. Гүл де сол жерде балдыркөк тапсырмаларын бақылау үшін айналады (өйткені ол итерілген apache/airflow:1.10.10-python3.7, бірақ қарсы емеспіз)
  • PostgreSQL, онда Airflow өзінің қызмет ақпаратын (жоспарлаушы деректері, орындалу статистикасы, т.б.) жазады және балдыркөк аяқталған тапсырмаларды белгілейді;
  • Редис, ол балдыркөк үшін тапсырма брокері ретінде әрекет етеді;
  • Балдыркөк жұмысшысытапсырмаларды тікелей орындаумен айналысатын болады.
  • Қалтаға ./dags біз файлдарды дагтардың сипаттамасымен қосамыз. Оларды ұшқанда алып кетеді, сондықтан әрбір түшкіргеннен кейін бүкіл стекпен жонглёрлеудің қажеті жоқ.

Кейбір жерлерде мысалдардағы код толық көрсетілмеген (мәтінді шатастырмау үшін), бірақ бір жерде ол процесте өзгертілген. Толық жұмыс кодының мысалдарын репозиторийден табуға болады https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Ескерту:

  • Композицияны құрастыруда мен көбіне белгілі образға сүйендім пукель/докер-ауа ағыны - міндетті түрде тексеріңіз. Мүмкін сізге өмірде басқа ештеңе керек емес шығар.
  • Барлық ауа ағыны параметрлері арқылы ғана емес қол жетімді airflow.cfg, сонымен қатар қоршаған ортаның айнымалы мәндері арқылы (әзірлеушілерге рахмет), мен оны қасақана пайдаландым.
  • Әрине, ол өндіріске дайын емес: мен әдейі жүрек соғысын контейнерлерге қоймадым, қауіпсіздікпен алаңдамадым. Бірақ мен экспериментшілерімізге қолайлы минимумды жасадым.
  • Ескертіп қой:
    • Dag қалтасы жоспарлаушыға да, жұмысшыларға да қолжетімді болуы керек.
    • Бұл барлық үшінші тарап кітапханаларына қатысты - олардың барлығы жоспарлаушы мен жұмысшылары бар машиналарға орнатылуы керек.

Енді бәрі қарапайым:

$ docker-compose up --scale worker=3

Барлығы көтерілгеннен кейін сіз веб-интерфейстерді көре аласыз:

Негізгі ұғымдар

Егер сіз осы «дагтардың» бәрінде ештеңе түсінбесеңіз, міне қысқаша сөздік:

  • Жоспарлағыш - Airflow-тағы ең маңызды ағай, роботтардың адам емес, қатты жұмыс істейтінін бақылайды: кестені бақылайды, деректерді жаңартады, тапсырмаларды іске қосады.

    Жалпы, ескі нұсқаларда оның жадында проблемалар болды (жоқ, амнезия емес, ағып кетеді) және бұрынғы параметр тіпті конфигурацияларда қалды. run_duration — оның қайта іске қосылу аралығы. Бірақ қазір бәрі жақсы.

  • ГПДР (aka «dag») - «бағытталған ациклдік график», бірақ мұндай анықтама аз адамдарға айтады, бірақ іс жүзінде бұл бір-бірімен өзара әрекеттесетін тапсырмаларға арналған контейнер (төменде қараңыз) немесе SSIS ішіндегі пакеттің аналогы және Informatica жұмыс процесі .

    Дагтардан басқа, субдагтар әлі де болуы мүмкін, бірақ біз оларға жете алмаймыз.

  • DAG Run - инициализацияланған даг, оған меншікті тағайындалады execution_date. Бір дагның даграндары параллель жұмыс істей алады (егер сіз өз тапсырмаларыңызды идемпотентті етіп жасасаңыз, әрине).
  • Оператор белгілі бір әрекетті орындауға жауапты код бөліктері. Операторлардың үш түрі бар:
    • іс-шараларбіздің сүйікті сияқты PythonOperator, ол кез келген (жарамды) Python кодын орындай алады;
    • аударымдеректерді бір жерден екінші жерге тасымалдайтын , айталық, MsSqlToHiveTransfer;
    • сенсор екінші жағынан, бұл оқиға орын алғанша әрекет етуге немесе дагның одан әрі орындалуын бәсеңдетуге мүмкіндік береді. HttpSensor көрсетілген соңғы нүктені тарта алады және қажетті жауап күткенде, тасымалдауды бастаңыз GoogleCloudStorageToS3Operator. Ізденімпаз сана: «Неге? Ақыр соңында, сіз қайталауды дәл операторда жасай аласыз!» Содан кейін, тоқтатылған операторлармен тапсырмалар пулын бітеп алмау үшін. Сенсор келесі әрекетке дейін іске қосылады, тексереді және өледі.
  • тапсырма - түріне қарамастан жарияланған және дагға бекітілген операторлар тапсырма дәрежесіне көтеріледі.
  • тапсырма данасы - бас жоспарлаушы тапсырмаларды орындаушы-жұмысшыларға жіберу уақыты келді деп шешкен кезде (егер қолданатын болсақ, дәл сол жерде LocalExecutor немесе жағдайда қашықтағы түйінге CeleryExecutor), ол оларға контекст тағайындайды (яғни, айнымалылар жиыны – орындау параметрлері), пәрмен немесе сұрау үлгілерін кеңейтеді және оларды біріктіреді.

Біз тапсырмаларды жасаймыз

Алдымен, біздің қамырдың жалпы схемасын белгілеп алайық, содан кейін біз егжей-тегжейлерге көбірек енеміз, өйткені біз кейбір тривиальды емес шешімдерді қолданамыз.

Осылайша, қарапайым түрде мұндай дагжа келесідей болады:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Келіңіздер,

  • Біріншіден, біз қажетті libs және импорттаймыз басқа нәрсе;
  • sql_server_ds Мүмкін List[namedtuple[str, str]] Airflow Connections жүйесіндегі қосылымдардың атаулары және біз пластинаны алатын дерекқорлармен;
  • dag - міндетті түрде болуы керек біздің дагымыздың хабарландыруы globals(), әйтпесе Airflow оны таба алмайды. Даг сонымен қатар мынаны айтуы керек:
    • оның аты кім orders - бұл атау содан кейін веб-интерфейсте пайда болады,
    • сегізінші шілдеде түн ортасынан бастап жұмыс істейтінін,
    • және ол шамамен әр 6 сағат сайын жұмыс істеуі керек (мұнда емес, қатал жігіттер үшін timedelta() рұқсат етілген cron-түзу 0 0 0/6 ? * * *, аз салқын үшін - сияқты өрнек @daily);
  • workflow() негізгі жұмысты атқарады, бірақ қазір емес. Әзірге біз контекстімізді журналға жібереміз.
  • Ал енді тапсырмаларды жасаудың қарапайым сиқыры:
    • біз өзіміздің дереккөздеріміз арқылы жүреміз;
    • инициализациялау PythonOperator, ол біздің манекенімізді орындайды workflow(). Тапсырманың бірегей (даг ішінде) атауын көрсетуді және дагтың өзін байлауды ұмытпаңыз. Жалау provide_context өз кезегінде функцияға қосымша аргументтер құйылады, біз оларды мұқият пайдаланып жинаймыз **context.

Әзірге бәрі осы. Бізде не бар:

  • веб-интерфейстегі жаңа дақ,
  • параллель орындалатын бір жарым жүз тапсырма (егер ауа ағыны, балдыркөк параметрлері және сервер сыйымдылығы рұқсат етсе).

Түсініп қалдым.

Apache ауа ағыны: ETL жеңілдету
Тәуелділіктерді кім орнатады?

Осының бәрін жеңілдету үшін мен бұрылдым docker-compose.yml өңдеу requirements.txt барлық түйіндерде.

Енді кетті:

Apache ауа ағыны: ETL жеңілдету

Сұр квадраттар - жоспарлаушы өңдейтін тапсырма даналары.

Біз біраз күтеміз, жұмысшылар тапсырмаларды орындайды:

Apache ауа ағыны: ETL жеңілдету

Жасылдар, әрине, сәтті жұмыс істеді. Қызылдар өте сәтті емес.

Айтпақшы, біздің өнімде қалта жоқ ./dags, машиналар арасында синхрондау жоқ - барлық дагдар жатады git Gitlab жүйесінде және Gitlab CI біріктіру кезінде машиналарға жаңартуларды таратады master.

Гүл туралы аздап

Жұмысшылар біздің емізіктерді қағып жатқанда, бізге бірдеңе көрсете алатын тағы бір құралды еске түсірейік - Гүл.

Жұмысшы түйіндері туралы жиынтық ақпараты бар ең бірінші бет:

Apache ауа ағыны: ETL жеңілдету

Жұмысқа кеткен тапсырмалары бар ең қарқынды бет:

Apache ауа ағыны: ETL жеңілдету

Біздің брокер мәртебесі бар ең скучно бет:

Apache ауа ағыны: ETL жеңілдету

Ең жарқын бет - тапсырма күйінің графиктері және олардың орындалу уақыты:

Apache ауа ағыны: ETL жеңілдету

Біз аз жүктелгенді жүктейміз

Сонымен, барлық тапсырмалар орындалды, сіз жаралыларды алып кете аласыз.

Apache ауа ағыны: ETL жеңілдету

Және көптеген жараланғандар болды - бір себептермен немесе басқа. Ауа ағыны дұрыс пайдаланылған жағдайда, дәл осы квадраттар деректердің анық келмегенін көрсетеді.

Журналды қарап, құлаған тапсырма даналарын қайта бастау керек.

Кез келген шаршыны басу арқылы біз қол жетімді әрекеттерді көреміз:

Apache ауа ағыны: ETL жеңілдету

Сіз құлағандарды алып, Clear жасай аласыз. Яғни, біз ол жерде бірдеңе сәтсіз болғанын ұмытамыз және дәл сол даналық тапсырма жоспарлаушыға өтеді.

Apache ауа ағыны: ETL жеңілдету

Мұны барлық қызыл квадраттармен тінтуірмен жасау өте адамгершілікке жатпайтыны анық - бұл біз Airflow-тан күткен нәрсе емес. Әрине, бізде жаппай қырып-жоятын қару бар: Browse/Task Instances

Apache ауа ағыны: ETL жеңілдету

Барлығын бірден таңдап, нөлге қайтарайық, дұрыс элементті басыңыз:

Apache ауа ағыны: ETL жеңілдету

Тазалаудан кейін біздің таксилер келесідей көрінеді (олар жоспарлаушының оларды жоспарлауын күтуде):

Apache ауа ағыны: ETL жеңілдету

Қосылымдар, ілгектер және басқа айнымалылар

Келесі DAG-ға қараудың уақыты келді, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Барлығы есеп жаңартуын жасады ма? Бұл тағы да ол: деректерді қайдан алуға болатын көздердің тізімі бар; қоюға болатын тізім бар; бәрі болған немесе бұзылған кезде сигнал беруді ұмытпаңыз (жақсы, бұл біз туралы емес, жоқ).

Файлды қайта қарап шығып, жаңа түсініксіз нәрселерді қарастырайық:

  • from commons.operators import TelegramBotSendMessage - блоктан шығарылған хабарламаларды жіберуге арналған шағын қаптама жасау арқылы біз өзіміздің операторларымызды жасауға ештеңе кедергі келтірмейді. (Бұл оператор туралы төменде толығырақ айтатын боламыз);
  • default_args={} - dag өзінің барлық операторларына бірдей аргументтерді тарата алады;
  • to='{{ var.value.all_the_kings_men }}' - өріс to бізде қатты кодталмаған, бірақ Jinja көмегімен динамикалық түрде жасалған және мен мұқият енгізген электрондық пошталар тізімі бар айнымалы болады. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — операторды іске қосу шарты. Біздің жағдайда хат барлық тәуелділіктер шешілген жағдайда ғана бастықтарға ұшады сәтті;
  • tg_bot_conn_id='tg_main' - дәлелдер conn_id біз жасайтын қосылым идентификаторларын қабылдаңыз Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegram-дағы хабарламалар орындалмаған тапсырмалар болған жағдайда ғана ұшып кетеді;
  • task_concurrency=1 - біз бір тапсырманың бірнеше тапсырма данасын бір уақытта іске қосуға тыйым саламыз. Әйтпесе, біз бір уақытта бірнеше ұшыруды аламыз VerticaOperator (бір үстелге қарап);
  • report_update >> [email, tg] - барлық VerticaOperator келесідей хаттар мен хабарламаларды жіберуде біріктіріңіз:
    Apache ауа ағыны: ETL жеңілдету

    Бірақ хабарландырушы операторлардың іске қосу шарттары әртүрлі болғандықтан, тек біреуі ғана жұмыс істейді. Ағаш көрінісінде бәрі азырақ көрнекі көрінеді:
    Apache ауа ағыны: ETL жеңілдету

туралы бірнеше сөз айтайын макростар және олардың достары - айнымалылар.

Макростар әр түрлі пайдалы ақпаратты оператор дәлелдеріне алмастыра алатын Джиндя толтырғыштары болып табылады. Мысалы, келесідей:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} контекстік айнымалының мазмұнына дейін кеңейеді execution_date форматта YYYY-MM-DD: 2020-07-14. Ең жақсы бөлігі - контекстік айнымалы мәндер нақты тапсырма данасына (Ағаш көрінісіндегі шаршы) бекітіледі және қайта іске қосылғанда толтырғыштар бірдей мәндерге дейін кеңейеді.

Тағайындалған мәндерді әрбір тапсырма данасында Көрсетілген түймесі арқылы көруге болады. Хат жіберу тапсырмасы келесідей:

Apache ауа ағыны: ETL жеңілдету

Сонымен, хабарлама жіберу тапсырмасында:

Apache ауа ағыны: ETL жеңілдету

Ең соңғы қол жетімді нұсқа үшін кірістірілген макростардың толық тізімі мына жерден қолжетімді: макрос сілтемесі

Сонымен қатар, плагиндердің көмегімен біз өз макростарымызды жариялай аламыз, бірақ бұл басқа әңгіме.

Алдын ала анықталған нәрселерден басқа, біз айнымалыларымыздың мәндерін ауыстыра аламыз (мен оны жоғарыдағы кодта қолдандым). Құрайық Admin/Variables екі нәрсе:

Apache ауа ағыны: ETL жеңілдету

Сіз қолдануға болатын барлық нәрсе:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Мән скаляр болуы мүмкін немесе JSON болуы мүмкін. JSON жағдайында:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

қажетті кілтке жолды пайдаланыңыз: {{ var.json.bot_config.bot.token }}.

Мен бір сөзді айтып, бір скриншотты көрсетемін қосылымдар. Мұнда бәрі қарапайым: бетте Admin/Connections біз қосылым жасаймыз, логиндерді / парольдерді және нақты параметрлерді қосамыз. Бұл сияқты:

Apache ауа ағыны: ETL жеңілдету

Құпиясөздер шифрлануы мүмкін (әдепкіге қарағанда мұқият) немесе қосылым түрін қалдыруға болады (мен жасағандай tg_main) - шын мәнінде, түрлердің тізімі Airflow үлгілерінде бекітілген және бастапқы кодтарға кірмей кеңейтілмейді (егер кенеттен мен Google-да бірдеңе жасамаған болсам, мені түзетіңіз), бірақ бізге несие алуымызға ештеңе кедергі болмайды. аты.

Сондай-ақ бір атпен бірнеше қосылымдар жасауға болады: бұл жағдайда әдіс BaseHook.get_connection(), ол бізге аты бойынша байланыстарды береді, береді кездейсоқ бірнеше есімдерден (Раунд Робин жасау қисындырақ болар еді, бірақ оны Airflow әзірлеушілерінің ар-ожданына қалдырайық).

Айнымалылар мен қосылымдар, әрине, керемет құралдар, бірақ тепе-теңдікті жоғалтпау маңызды: ағындарыңыздың қай бөліктерін кодтың өзінде сақтайсыз және қандай бөліктерді Airflow жүйесіне сақтау үшін бересіз. Бір жағынан, мәнді жылдам өзгерту, мысалы, пошта жәшігі, UI арқылы ыңғайлы болуы мүмкін. Екінші жағынан, бұл әлі де біз (мен) құтылғымыз келген тінтуірді басуға оралу.

Байланыстармен жұмыс – тапсырмалардың бірі ілгектер. Жалпы алғанда, Airflow ілмектері оны үшінші тарап қызметтері мен кітапханаларына қосуға арналған нүктелер болып табылады. Мысалыға, JiraHook Jira-мен әрекеттесу үшін клиентті ашады (тапсырмаларды алға және артқа жылжыта аласыз) және көмегімен SambaHook жергілікті файлды түртуге болады smb-нүкте.

Пайдаланушы операторын талдау

Біз оның қалай жасалғанын қарауға жақындадық TelegramBotSendMessage

код commons/operators.py нақты оператормен:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Мұнда, Airflow ішіндегі барлық басқа нәрселер сияқты, бәрі өте қарапайым:

  • Мұраға алынған BaseOperator, ол ауа ағынына қатысты бірнеше нәрсені жүзеге асырады (бос уақытыңызды қараңыз)
  • Жарияланған өрістер template_fields, онда Джинджа өңдеу үшін макростарды іздейді.
  • үшін дұрыс дәлелдерді ұйымдастырды __init__(), қажет жерде әдепкі мәндерді орнатыңыз.
  • Бабаны инициализациялауды да ұмытпадық.
  • Сәйкес ілгекті ашты TelegramBotHookодан клиенттік нысанды алды.
  • Қайта анықталған (қайта анықталған) әдіс BaseOperator.execute(), Операторды іске қосу уақыты келгенде Airfow дірілдейді - онда біз кіруді ұмытып, негізгі әрекетті орындаймыз. (Айтпақшы, біз кіреміз stdout и stderr - Ауа ағыны бәрін ұстап алады, оны әдемі орап, қажет жерде ыдыратады.)

Бізде не бар екенін көрейік commons/hooks.py. Ілмекпен бірге файлдың бірінші бөлігі:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Мен бұл жерде не түсіндірерімді білмеймін, тек маңызды тұстарды атап өтейін:

  • Біз мұра аламыз, дәлелдер туралы ойланамыз - көп жағдайда ол біреу болады: conn_id;
  • Стандартты әдістерді жоққа шығару: Мен өзімді шектедім get_conn(), онда мен қосылым параметрлерін аты бойынша аламын және жай ғана бөлімді аламын extra (бұл JSON өрісі), мен оған (өз нұсқауларым бойынша!) Telegram бот белгісін қойдым: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Мен біздің дананы жасаймын TelegramBot, оған белгілі бір белгі береді.

Осымен болды. Сіз ілмектен клиентті пайдалана аласыз TelegramBotHook().clent немесе TelegramBotHook().get_conn().

Файлдың екінші бөлігі, онда мен Telegram REST API үшін микроораманы жасаймын, осылайша сүйреп кетпеу үшін. python-telegram-bot бір әдіс үшін sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Барлығын қосудың дұрыс жолы: TelegramBotSendMessage, TelegramBotHook, TelegramBot - плагинде жалпыға қолжетімді репозиторийге қойыңыз және оны Open Source-қа беріңіз.

Осының бәрін зерттеп жатқанда, біздің есеп жаңартулары сәтсіз аяқталды және маған арнада қате туралы хабар жіберді. Мен оның дұрыс емес екенін тексеру үшін барамын ...

Apache ауа ағыны: ETL жеңілдету
Біздің итте бірдеңе бұзылды! Бұл біздің күткеніміз емес пе? Дәл!

Құйып бересіз бе?

Мен бір нәрсені сағындым деп ойлайсың ба? SQL Server-ден Vertica-ға мәліметтерді тасымалдауға уәде беріп, сосын алып, тақырыптан ауытқыған сияқты, масқара!

Бұл зұлымдық қасақана болды, мен сізге кейбір терминологияны шешуім керек болды. Енді сіз одан әрі қарай жүре аласыз.

Біздің жоспарымыз мынадай болды:

  1. Жаса
  2. Тапсырмаларды құру
  3. Барлығы қаншалықты әдемі екенін қараңыз
  4. Толтыруға сеанс нөмірлерін тағайындаңыз
  5. SQL серверінен деректерді алыңыз
  6. Деректерді Vertica ішіне қойыңыз
  7. Статистиканы жинаңыз

Осының бәрін іске қосу үшін мен өзімізге шағын қосымша жасадым docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Онда біз көтереміз:

  • Хост ретінде Vertica dwh ең әдепкі параметрлермен,
  • SQL серверінің үш данасы,
  • біз соңғы деректер қорын кейбір деректермен толтырамыз (ешқандай жағдайда да қарастырмаңыз mssql_init.py!)

Біз барлық жақсылықты өткенге қарағанда біршама күрделі команданың көмегімен іске қосамыз:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Біздің керемет рандомизатор жасаған нәрсе, сіз элементті пайдалана аласыз Data Profiling/Ad Hoc Query:

Apache ауа ағыны: ETL жеңілдету
Ең бастысы, оны талдаушыларға көрсетпеу керек

пысықтау ETL сеанстары Мен мұны істемеймін, ол жерде бәрі тривиальды: біз негіз жасаймыз, онда белгі бар, біз бәрін контекстік менеджермен орап аламыз, енді біз мұны істейміз:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Уақыт келді деректерімізді жинаймыз бір жарым жүз үстелімізден. Мұны өте қарапайым сызықтардың көмегімен жасайық:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Ілмектің көмегімен біз Airflow-тан аламыз pymssql-қосу
  2. Сұранысқа күн түріндегі шектеуді ауыстырайық - ол шаблондық қозғалтқыш арқылы функцияға жіберіледі.
  3. Біздің өтінішімізді тамақтандыру pandasбізді кім алады DataFrame - бұл болашақта бізге пайдалы болады.

Мен ауыстыруды қолданамын {dt} сұрау параметрінің орнына %s Мен зұлым Пиноккио болғандықтан емес, өйткені pandas көтере алмайды pymssql және соңғысын сырғытады params: Listол шынымен қаласа да tuple.
Сондай-ақ әзірлеушіге назар аударыңыз pymssql бұдан былай оны қолдамауды шешті, ал кетудің уақыты келді pyodbc.

Airflow біздің функцияларымыздың дәлелдерін немен толтырғанын көрейік:

Apache ауа ағыны: ETL жеңілдету

Егер деректер болмаса, онда жалғастырудың қажеті жоқ. Бірақ толтыруды сәтті деп санау да біртүрлі. Бірақ бұл қате емес. А-а-а, не істеу керек?! Міне, мыналар:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow-қа қателер жоқ екенін айтады, бірақ біз тапсырманы өткізіп жібереміз. Интерфейс жасыл немесе қызыл шаршы емес, қызғылт болады.

Деректерімізді лақтырайық бірнеше бағандар:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Атап айтқанда:

  • Біз тапсырыстар алған дерекқор,
  • Біздің су тасқыны сеансының идентификаторы (ол басқаша болады әрбір тапсырма үшін),
  • Дереккөзден және тапсырыс идентификаторынан алынған хэш - түпкілікті дерекқорда (бәрі бір кестеге құйылады) бізде бірегей тапсырыс идентификаторы болады.

Соңғы қадам қалады: барлығын Vertica-ға құйыңыз. Бір қызығы, мұны істеудің ең керемет және тиімді әдістерінің бірі - CSV арқылы!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Біз арнайы қабылдағыш жасап жатырмыз StringIO.
  2. pandas мейірімділікпен қояды DataFrame түрде CSV-сызықтар.
  3. Сүйікті Vertica-ға ілмекпен қосылымды ашайық.
  4. Ал енді көмегімен copy() деректерімізді тікелей Вертикаға жіберіңіз!

Біз жүргізушіден қанша жолдың толтырылғанын аламыз және сеанс менеджеріне бәрі дұрыс екенін айтамыз:

session.loaded_rows = cursor.rowcount
session.successful = True

Осымен болды.

Сатылымда біз мақсатты тақтаны қолмен жасаймыз. Мұнда мен өзіме шағын машинаға рұқсат бердім:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

пайдаланып жатырмын VerticaOperator() Мен дерекқор схемасын және кестені жасаймын (егер олар әлі жоқ болса, әрине). Ең бастысы - тәуелділіктерді дұрыс орналастыру:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Қорытындылау

– Жарайды, – деді кішкентай тышқан, – солай емес пе, енді
Менің ормандағы ең қорқынышты жануар екеніме сенімдісіз бе?

Джулия Дональдсон, The Gruffalo

Менің ойымша, егер менің әріптестерім мен менде бәсекелестік болса: ETL процесін нөлден бастап кім тез жасайды және іске қосады: олар SSIS және тінтуірмен және мен Airflow арқылы ... Содан кейін біз техникалық қызмет көрсетудің қарапайымдылығын салыстырар едік ... Уау, мен оларды барлық майданда жеңемін деп келісесіз деп ойлаймын!

Егер сәл байыпты болса, онда Apache Airflow - бағдарламалық код түріндегі процестерді сипаттау арқылы - менің жұмысымды жасады көп анағұрлым ыңғайлы және жағымды.

Оның қосылатын модульдер тұрғысынан да, ауқымдылыққа бейімділігі жағынан да шексіз кеңейту мүмкіндігі сізге ауа ағынын іс жүзінде кез келген салада пайдалануға мүмкіндік береді: тіпті деректерді жинаудың, дайындаудың және өңдеудің толық циклінде, тіпті зымырандарды ұшыру кезінде де (Марсқа, курс).

Қорытынды бөлім, анықтама және ақпарат

Біз сіз үшін жинаған тырма

  • start_date. Иә, бұл қазірдің өзінде жергілікті мем. Дагтың негізгі дәлелі арқылы start_date бәрі өтеді. Қысқаша айтқанда, егер сізде көрсетсеңіз start_date ағымдағы күн және schedule_interval - бір күні, содан кейін DAG ертең ерте емес басталады.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Және басқа проблемалар жоқ.

    Онымен байланысты басқа орындалу қатесі бар: Task is missing the start_date parameter, бұл көбінесе dag операторына байланыстыруды ұмытып кеткеніңізді көрсетеді.

  • Барлығы бір машинада. Иә, және негіздер (Airflow өзі және біздің жабынымыз), веб-сервер, жоспарлаушы және жұмысшылар. Және бұл тіпті жұмыс істеді. Бірақ уақыт өте келе қызметтерге арналған тапсырмалар саны өсті және PostgreSQL индекске 20 мс орнына 5 секундта жауап бере бастағанда, біз оны алып, алып кеттік.
  • Жергілікті орындаушы. Иә, біз әлі оның үстінде отырмыз, біз шыңыраудың шетіне жеттік. Осы уақытқа дейін LocalExecutor бізге жеткілікті болды, бірақ енді кем дегенде бір жұмысшымен кеңейту уақыты келді және біз CeleryExecutor-қа көшу үшін көп жұмыс істеуіміз керек. Сіз онымен бір машинада жұмыс істей алатыныңызды ескере отырып, балдыркөкті тіпті серверде де қолдануға ештеңе кедергі келтірмейді, ол «әрине, ешқашан өндіріске кірмейді!»
  • Қолданбау кіріктірілген құралдар:
    • Қосылымдар қызмет тіркелгі деректерін сақтау үшін,
    • SLA қателері уақытында орындалмаған тапсырмаларға жауап беру,
    • xcom метадеректер алмасу үшін (мен айттым метадеректер!) Dag тапсырмалары арасында.
  • Поштаны теріс пайдалану. Ал, мен не айта аламын? Құлаған тапсырмалардың барлық қайталануы үшін ескертулер орнатылды. Енді менің жұмысым Gmail-де Airflow-тан >90 мың электрондық пошта бар, ал веб-поштаның тұмсығы бір уақытта 100-ден астам алудан және жоюдан бас тартады.

Қосымша қателіктер: Apache Airflow Pitfails

Қосымша автоматтандыру құралдары

Қолымызбен емес, басымызбен жұмыс істеуіміз үшін Airflow бізге мынаны дайындады:

  • REST API - ол әлі де Эксперименттік мәртебесіне ие, бұл оның жұмыс істеуіне кедергі келтірмейді. Оның көмегімен сіз дагдар мен тапсырмалар туралы ақпаратты алып қана қоймай, сонымен қатар дагды тоқтатуға/бастауға, DAG Run немесе пулды жасауға болады.
  • CLI - WebUI арқылы пайдалану ыңғайсыз ғана емес, әдетте жоқ көптеген құралдар пәрмен жолы арқылы қол жетімді. Мысалы:
    • backfill тапсырма даналарын қайта бастау үшін қажет.
      Мәселен, сарапшылар келіп: «Ал, жолдас, 1-13 қаңтар аралығындағы деректерде бос сөз бар! Түзет, жөнде, жөнде, жөнде!» Ал сіз осындай плитасыз:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Негізгі қызмет: initdb, resetdb, upgradedb, checkdb.
    • run, бұл бір даналық тапсырманы орындауға және тіпті барлық тәуелділіктер бойынша ұпай алуға мүмкіндік береді. Сонымен қатар, сіз оны арқылы іске қоса аласыз LocalExecutor, тіпті сізде балдыркөк кластері болса да.
    • Дәл сол нәрсені жасайды test, тек негіздерде де ештеңе жазбайды.
    • connections қабықтан байланыстарды жаппай жасауға мүмкіндік береді.
  • python API - плагиндерге арналған және кішкентай қолдармен араласпаған өзара әрекеттесудің өте қиын әдісі. Бірақ бізге баруға кім кедергі жасайды /home/airflow/dags, жүгіріңіз ipython және араласа бастайсыз ба? Сіз, мысалы, келесі кодпен барлық қосылымдарды экспорттай аласыз:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Airflow метадеректер базасына қосылу. Мен оған жазуды ұсынбаймын, бірақ әртүрлі нақты көрсеткіштер үшін тапсырма күйлерін алу API интерфейстерінің кез келгеніне қарағанда әлдеқайда жылдам және оңай болуы мүмкін.

    Айталық, біздің барлық тапсырмаларымыз идепотентті емес, бірақ олар кейде құлап кетуі мүмкін және бұл қалыпты жағдай. Бірақ бірнеше бітелулер қазірдің өзінде күдікті және тексеру қажет.

    SQL-ден сақ болыңыз!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

сілтемелер

Әрине, Google шығарған алғашқы он сілтеме - бұл менің бетбелгілерімдегі Airflow қалтасының мазмұны.

Мақалада пайдаланылған сілтемелер:

Ақпарат көзі: www.habr.com