Апацхе Аирфлов: Олакшавање ЕТЛ-а

Здраво, ја сам Дмитриј Логвиненко - инжењер података одељења аналитике групе компанија Везет.

Рећи ћу вам о дивном алату за развој ЕТЛ процеса - Апацхе Аирфлов. Али Аирфлов је толико разноврстан и вишеструк да би требало да га боље погледате чак и ако нисте укључени у токове података, али имате потребу да повремено покрећете било који процес и надгледате њихово извршење.

И да, не само да ћу рећи, већ и показати: програм има пуно кода, снимака екрана и препорука.

Апацхе Аирфлов: Олакшавање ЕТЛ-а
Оно што обично видите када прогуглате реч Аирфлов / Викимедиа Цоммонс

Преглед садржаја

Увод

Апацхе Аирфлов је као Дјанго:

  • написан у питону
  • постоји одличан админ панел,
  • прошириво на неодређено време

- само боље, а направљен је за сасвим друге сврхе, наиме (како пише пре ката):

  • покретање и надгледање задатака на неограниченом броју машина (колико ће вам Целери / Кубернетес и ваша савест дозволити)
  • са динамичким генерисањем тока посла из Питхон кода који је врло једноставан за писање и разумевање
  • и могућност међусобног повезивања било које базе података и АПИ-ја користећи и готове компоненте и домаће додатке (што је изузетно једноставно).

Користимо Апацхе Аирфлов овако:

  • прикупљамо податке из различитих извора (много СКЛ Сервер и ПостгреСКЛ инстанци, разни АПИ-ји са метриком апликације, чак и 1Ц) у ДВХ и ОДС (имамо Вертица и Цлицкхоусе).
  • колико напредно cron, који покреће процесе консолидације података на ОДС-у, а такође прати њихово одржавање.

Наше потребе је донедавно покривао један мали сервер са 32 језгра и 50 ГБ РАМ-а. У Аирфлов-у ово функционише:

  • више 200 дагс (заправо токови посла, у које смо ставили задатке),
  • у сваком у просеку 70 задатака,
  • ова доброта почиње (такође у просеку) једном на сат.

А о томе како смо се проширили, писаћу у наставку, али сада хајде да дефинишемо убер-проблем који ћемо решити:

Постоје три изворна СКЛ сервера, сваки са 50 база података – инстанце једног пројекта, респективно, имају исту структуру (скоро свуда, муа-ха-ха), што значи да сваки има табелу наруџби (на срећу, табелу са тим име се може угурати у било који посао). Узимамо податке додавањем сервисних поља (изворни сервер, изворна база података, ИД ЕТЛ задатка) и наивно их бацамо у, рецимо, Вертику.

Идемо!

Главни део, практичан (и мало теоријски)

Зашто ми (и ви)

Кад је дрвеће било велико, а ја једноставан SQL-сцхик у једној руској малопродаји, преварили смо ЕТЛ процесе или токове података користећи два алата која су нам доступна:

  • Информатица Повер Центер - систем који се изузетно шири, изузетно продуктиван, са сопственим хардвером, сопственим верзијама. Користио сам не дај Боже 1% његових могућности. Зашто? Па, пре свега, овај интерфејс, негде из 380-их, психички је извршио притисак на нас. Друго, ова справа је дизајнирана за изузетно фенси процесе, бесну поновну употребу компоненти и друге веома важне трикове за предузећа. О томе колико кошта, попут крила Аирбус АXNUMX / године, нећемо ништа рећи.

    Пазите, снимак екрана може мало да повреди људе млађе од 30 година

    Апацхе Аирфлов: Олакшавање ЕТЛ-а

  • СКЛ Сервер Интеграциони Сервер - користили смо овог друга у нашим токовима унутар пројекта. Па, у ствари: већ користимо СКЛ Сервер, и било би некако неразумно не користити његове ЕТЛ алате. Све у њему је добро: и интерфејс је леп, и извештаји о напретку... Али не волимо софтверске производе зато, ох, не због овога. Версион ит dtsx (што је КСМЛ са измешаним чворовима при чувању) можемо, али у чему је поента? Шта кажете на израду пакета задатака који ће повући стотине табела са једног сервера на други? Да шта сто, отпашће ти кажипрст са двадесет комада, кликнувши на дугме миша. Али дефинитивно изгледа модерније:

    Апацхе Аирфлов: Олакшавање ЕТЛ-а

Сигурно смо тражили излазе. Чак и случај скоро дошао до самог генератора ССИС пакета ...

…и онда ме је пронашао нови посао. И Апацхе Аирфлов ме је претекао на њему.

Када сам сазнао да су описи ЕТЛ процеса једноставан Питхон код, једноставно нисам плесао од радости. Овако су токови података верзионисани и диференцирани, а сипање табела са једном структуром из стотина база података у један циљ постало је ствар Питхон кода на један и по или два екрана од 13 инча.

Састављање кластера

Хајде да не уредимо комплетно обданиште, и да не причамо о потпуно очигледним стварима, попут инсталирања Аирфлов-а, одабране базе података, Целера и других случајева описаних у доковима.

Да бисмо одмах могли да почнемо са експериментима, скицирао сам docker-compose.yml у којима:

  • Хајде да заправо подигнемо Проток ваздуха: Планер, Веб сервер. Цвет ће се такође окретати тамо да надгледа задатке целера (јер је већ гурнут apache/airflow:1.10.10-python3.7, али нам не смета)
  • ПостгреСКЛ, у који ће Аирфлов уписати своје сервисне информације (податке о распореду, статистику извршења, итд.), а Целери ће означити завршене задатке;
  • Редис, који ће деловати као посредник задатака за Целери;
  • Радник целера, која ће бити ангажована на непосредном извршавању задатака.
  • У фолдер ./dags ми ћемо додати наше датотеке са описом дагс. Они ће се покупити у ходу, тако да нема потребе да жонглирате целом гомилом након сваког кихања.

На неким местима код у примерима није у потпуности приказан (да не би затрпао текст), али се негде мења у процесу. Комплетни примери радног кода могу се наћи у спремишту https://github.com/dm-logv/airflow-tutorial.

доцкер-цомпосе.имл

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Напомене:

  • У склапању композиције у великој мери сам се ослањао на познату слику пуцкел/доцкер-аирфлов - обавезно проверите. Можда вам ништа више не треба у животу.
  • Сва подешавања протока ваздуха су доступна не само преко airflow.cfg, али и кроз променљиве окружења (захваљујући програмерима), које сам злонамерно искористио.
  • Наравно, није спреман за производњу: намерно нисам стављао откуцаје срца на контејнере, нисам се бавио обезбеђењем. Али урадио сам минимум прикладан за наше експериментаторе.
  • Напоменути да:
    • Фасцикла даг мора бити доступна и планеру и радницима.
    • Исто важи и за све библиотеке независних произвођача – све оне морају бити инсталиране на машинама са планером и радницима.

Па, сада је једноставно:

$ docker-compose up --scale worker=3

Након што се све подигне, можете погледати веб интерфејсе:

Основни концепти

Ако ништа нисте разумели у свим овим „даговима“, ево кратког речника:

  • Планер - најважнији ујак у Аирфлов-у, који контролише да роботи напорно раде, а не особа: прати распоред, ажурира дагове, покреће задатке.

    Генерално, у старијим верзијама, имао је проблема са меморијом (не, не амнезија, већ цурење) и легаци параметар је чак остао у конфигурацијама run_duration — његов интервал поновног покретања. Али сада је све у реду.

  • ДАГ (ака "даг") - "усмерени ациклични граф", али таква дефиниција ће рећи мало људи, али у ствари је то контејнер за задатке који међусобно комуницирају (погледајте доле) или аналог пакета у ССИС-у и тока рада у Информатици .

    Поред дагова, можда још постоје поддагови, али до њих највероватније нећемо доћи.

  • ДАГ Рун - иницијализовани даг, коме је додељен сопствени execution_date. Дагранови истог дага могу радити паралелно (ако сте своје задатке учинили идемпотентним, наравно).
  • оператор су делови кода одговорни за обављање одређене радње. Постоје три типа оператора:
    • акцијакао наш омиљени PythonOperator, који може да изврши било који (важећи) Питхон код;
    • пренос, који преносе податке од места до места, нпр. MsSqlToHiveTransfer;
    • сензор с друге стране, омогућиће вам да реагујете или успорите даље извршавање даг-а док се не догоди неки догађај. HttpSensor може повући наведену крајњу тачку, а када жељени одговор сачека, започните пренос GoogleCloudStorageToS3Operator. Радознали ум ће питати: „Зашто? На крају крајева, можете да радите понављања директно у оператеру!“ А онда, да не би закрчили базен задатака суспендованим оператерима. Сензор се покреће, проверава и угаси пре следећег покушаја.
  • Задатак - декларисани оператери, без обзира на врсту, и прикључени на даг, унапређују се у ранг задатка.
  • инстанца задатка - када је генерални планер одлучио да је време да се задаци пошаљу у борбу на извођаче-раднике (одмах на лицу места, ако користимо LocalExecutor или удаљеном чвору у случају CeleryExecutor), додељује им контекст (тј. скуп променљивих - параметара извршења), проширује шаблоне команди или упита и обједињује их.

Ми генеришемо задатке

Прво, хајде да оцртамо општу шему нашег доуга, а затим ћемо све више урањати у детаље, јер примењујемо нека нетривијална решења.

Дакле, у свом најједноставнијем облику, такав даг ће изгледати овако:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Хајде да схватимо:

  • Прво увозимо потребне библиотеке и нешто друго;
  • sql_server_ds - Је List[namedtuple[str, str]] са називима веза из Аирфлов Цоннецтионс и базама података из којих ћемо узети нашу плочу;
  • dag - саопштење нашег дага, које обавезно мора бити у globals(), иначе га Аирфлов неће пронаћи. Доуг такође треба да каже:
    • како се он зове orders - ово име ће се тада појавити у веб интерфејсу,
    • да ће осмог јула радити од поноћи,
    • и требало би да ради, отприлике сваких 6 сати (за јаке момке овде уместо timedelta() дозвољено cron-лине 0 0 0/6 ? * * *, за мање кул - израз као @daily);
  • workflow() обавиће главни посао, али не сада. За сада ћемо само избацити наш контекст у дневник.
  • А сада једноставна магија креирања задатака:
    • пролазимо кроз наше изворе;
    • иницијализовати PythonOperator, који ће извршити нашу лутку workflow(). Не заборавите да наведете јединствено (унутар дага) име задатка и вежите сам даг. Застава provide_context заузврат ће у функцију улити додатне аргументе, које ћемо пажљиво прикупити користећи **context.

За сада, то је све. Шта имамо:

  • нови даг у веб интерфејсу,
  • стотину и по задатака који ће се извршавати паралелно (ако то дозвољавају Аирфлов, Целери подешавања и капацитет сервера).

Па, скоро сам схватио.

Апацхе Аирфлов: Олакшавање ЕТЛ-а
Ко ће инсталирати зависности?

Да поједноставим целу ову ствар, зезнуо сам docker-compose.yml обрада requirements.txt на свим чворовима.

сада га нема:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Сиви квадрати су инстанце задатака које обрађује планер.

Чекамо мало, задатке похватају радници:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Зелени су, наравно, успешно завршили посао. Црвени нису баш успешни.

Иначе, на нашем производу нема фасцикле ./dags, нема синхронизације између машина - сви дагови леже у git на нашем Гитлабу, а Гитлаб ЦИ дистрибуира ажурирања машинама приликом спајања master.

Мало о цвету

Док нам радници млатарају цуцле, сетимо се још једног алата који нам нешто може показати – Цвета.

Прва страница са сажетим информацијама о радничким чворовима:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Најинтензивнија страница са задацима који су успели:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Најдосаднија страница са статусом нашег брокера:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Најсјајнија страница је са графиконима статуса задатака и временом њиховог извршавања:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Учитавамо недовољно оптерећење

Дакле, сви задаци су успели, можете да однесете рањене.

Апацхе Аирфлов: Олакшавање ЕТЛ-а

А рањених је било много – из ових или оних разлога. У случају правилне употребе Аирфлов-а, управо ови квадрати указују на то да подаци дефинитивно нису стигли.

Морате да погледате дневник и поново покренете пале инстанце задатака.

Кликом на било који квадрат, видећемо акције које су нам доступне:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Можете узети и очистити пале. То јест, заборављамо да је тамо нешто покварено, а исти задатак инстанце ће ићи у планер.

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Јасно је да ово са мишем са свим црвеним квадратима није баш хумано - то није оно што очекујемо од Аирфлов-а. Наравно, имамо оружје за масовно уништење: Browse/Task Instances

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Одаберимо све одједном и вратимо на нулу, кликните на тачну ставку:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Након чишћења наши таксији изгледају овако (већ чекају да их распореди распоред):

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Везе, куке и друге варијабле

Време је да погледамо следећи ДАГ, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Да ли су сви икада урадили ажурирање извештаја? Ово је опет она: постоји списак извора одакле се добијају подаци; постоји листа где да се стави; не заборавите да затрубите када се све догодило или покварило (па, не ради се о нама, не).

Хајде да поново прођемо кроз фајл и погледамо нове нејасне ствари:

  • from commons.operators import TelegramBotSendMessage - ништа нас не спречава да направимо сопствене оператере, што смо искористили тако што смо направили мали омот за слање порука у Унблоцкед. (О овом оператеру ћемо више говорити у наставку);
  • default_args={} - даг може дистрибуирати исте аргументе свим својим оператерима;
  • to='{{ var.value.all_the_kings_men }}' - поље to нећемо имати хардкодиран, већ динамички генерисан користећи Јиња и променљиву са листом имејлова, коју сам пажљиво унео Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — услов за покретање оператера. У нашем случају, писмо ће долетети до шефова само ако су све зависности решене успешно;
  • tg_bot_conn_id='tg_main' - аргументи conn_id прихватите ИД-ове везе у којима креирамо Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - поруке у Телеграму ће одлетети само ако има палих задатака;
  • task_concurrency=1 - забрањујемо истовремено покретање више инстанци задатка једног задатка. У супротном, добићемо истовремено лансирање неколико VerticaOperator (гледа у један сто);
  • report_update >> [email, tg] - све VerticaOperator конвергирају у слању писама и порука, овако:
    Апацхе Аирфлов: Олакшавање ЕТЛ-а

    Али пошто оператери обавештавача имају различите услове покретања, само један ће радити. У приказу дрвета све изгледа мало мање визуелно:
    Апацхе Аирфлов: Олакшавање ЕТЛ-а

Рећи ћу неколико речи о мацрос и њихови пријатељи - Променљиве.

Макрои су Јиња чувари места који могу заменити различите корисне информације у аргументима оператора. На пример, овако:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} прошириће се на садржај променљиве контекста execution_date у формату YYYY-MM-DD: 2020-07-14. Најбољи део је то што су променљиве контекста приковане за одређену инстанцу задатка (квадрат у приказу стабла), а када се поново покрене, чувари места ће се проширити на исте вредности.

Додељене вредности се могу видети помоћу дугмета Рендеред на свакој инстанци задатка. Овако је задатак са слањем писма:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

И тако на задатку са слањем поруке:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Комплетна листа уграђених макроа за најновију доступну верзију доступна је овде: референца макроа

Штавише, уз помоћ додатака можемо декларисати сопствене макрое, али то је друга прича.

Поред унапред дефинисаних ствари, можемо да заменимо вредности наших променљивих (ја сам то већ користио у коду изнад). Хајде да стварамо Admin/Variables пар ствари:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Све што можете да користите:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Вредност може бити скалар, или такође може бити ЈСОН. У случају ЈСОН-а:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

само користите путању до жељеног кључа: {{ var.json.bot_config.bot.token }}.

Буквално ћу рећи једну реч и показати један снимак екрана везе. Овде је све елементарно: на страници Admin/Connections креирамо везу, додајемо наше логин / лозинке и конкретније параметре тамо. Овако:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Лозинке се могу шифровати (темељније од подразумеваних) или можете изоставити тип везе (као што сам ја урадио за tg_main) - чињеница је да је листа типова уграђена у Аирфлов моделе и да се не може проширити без уласка у изворне кодове (ако одједном нисам нешто прогуглао, исправите ме), али ништа нас неће спречити да добијемо кредите само тако што ћете име.

Такође можете направити неколико веза са истим именом: у овом случају метод BaseHook.get_connection(), који нам даје везе по имену, даће насумично од неколико имењака (било би логичније направити Роунд Робин, али оставимо то на савести програмера Аирфлов-а).

Променљиве и везе су свакако кул алати, али је важно да не изгубите равнотежу: које делове својих токова чувате у самом коду, а које делове дајете Аирфлов-у на складиштење. С једне стране, брза промена вредности, на пример, поштанско сандуче, може бити згодна преко корисничког интерфејса. С друге стране, ово је још увек повратак на клик мишем, којег смо (ја) желели да се отарасимо.

Рад са везама је један од задатака куке. Уопштено говорећи, куке за Аирфлов су тачке за повезивање са услугама и библиотекама трећих страна. На пример, JiraHook ће нам отворити клијент за интеракцију са Јира (можете да померате задатке напред-назад), и уз помоћ SambaHook можете да гурнете локалну датотеку у smb-тачка.

Рашчлањивање прилагођеног оператора

И приближили смо се томе да погледамо како је направљен TelegramBotSendMessage

Код commons/operators.py са стварним оператером:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Овде, као и све остало у Аирфлов-у, све је врло једноставно:

  • Наслеђено од BaseOperator, који имплементира доста ствари специфичних за Аирфлов (погледајте у слободно време)
  • Декларисана поља template_fields, у којем ће Јиња тражити макрое за обраду.
  • Сложио праве аргументе за __init__(), подесите подразумеване вредности где је потребно.
  • Нисмо заборавили ни на иницијализацију претка.
  • Отворио одговарајућу куку TelegramBotHookод њега примио објекат клијента.
  • Замењени (редефинисани) метод BaseOperator.execute(), који ће Аирфов трзати када дође време за покретање оператера - у њему ћемо спровести главну акцију, заборављајући да се пријавимо. (Успут, пријављујемо се одмах stdout и stderr - Проток ваздуха ће све пресрести, лепо умотати, разложити где је потребно.)

Хајде да видимо шта имамо commons/hooks.py. Први део датотеке, са самом куком:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Не знам ни шта да објасним овде, само ћу приметити важне тачке:

  • Наслеђујемо, размислите о аргументима - у већини случајева то ће бити један: conn_id;
  • Превазилажење стандардних метода: ограничио сам се get_conn(), у којој добијам параметре везе по имену и само добијам одељак extra (ово је ЈСОН поље), у које сам (према сопственим упутствима!) ставио токен Телеграм бота: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ја стварам инстанцу нашег TelegramBot, дајући му одређени токен.

То је све. Можете добити клијента са куке користећи TelegramBotHook().clent или TelegramBotHook().get_conn().

И други део фајла, у коме правим микро-папер за Телеграм РЕСТ АПИ, да не бих вукао исти python-telegram-bot за једну методу sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Исправан начин је да се све сабере: TelegramBotSendMessage, TelegramBotHook, TelegramBot - у додатку, ставите у јавно спремиште и дајте га Опен Соурце-у.

Док смо све ово проучавали, ажурирања наших извештаја су успела да пропадну и да ми на каналу пошаље поруку о грешци. Идем да проверим да ли није у реду...

Апацхе Аирфлов: Олакшавање ЕТЛ-а
Нешто је пукло у нашем дужду! Зар то није оно што смо очекивали? Баш тако!

Хоћеш ли да сипаш?

Да ли осећаш да сам нешто пропустио? Изгледа да је обећао да ће пренети податке са СКЛ сервера на Вертику, а онда је узео и скренуо са теме, нитков!

Ово зверство је било намерно, једноставно сам морао да вам дешифрујем неку терминологију. Сада можете ићи даље.

Наш план је био следећи:

  1. До даг
  2. Генеришите задатке
  3. Видите како је све лепо
  4. Доделите бројеве сесија попуњавању
  5. Добијте податке са СКЛ Сервера
  6. Ставите податке у Вертица
  7. Прикупите статистику

Дакле, да бих све ово покренуо, направио сам мали додатак нашем docker-compose.yml:

доцкер-цомпосе.дб.имл

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Тамо подижемо:

  • Вертица као домаћин dwh са највећим подразумеваним подешавањима,
  • три инстанце СКЛ Сервера,
  • попуњавамо базе података у последњем неким подацима (ни у ком случају не гледајте mssql_init.py!)

Све добро покрећемо уз помоћ мало компликованије команде него прошли пут:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Оно што је наш чудесни рандомизер генерисао, можете користити ставку Data Profiling/Ad Hoc Query:

Апацхе Аирфлов: Олакшавање ЕТЛ-а
Главна ствар је да то не покажете аналитичарима

Појасни ЕТЛ сесије Нећу, тамо је све тривијално: направимо базу, у њој је знак, све умотамо у контекст менаџер, а сада радимо ово:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

сессион.пи

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Дошло је време прикупљамо наше податке са наших сто и по столова. Урадимо то уз помоћ врло непретенциозних линија:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Уз помоћ куке добијамо од Аирфлов-а pymssql-цоннецт
  2. Заменимо ограничење у облику датума у ​​захтеву - то ће бити убачено у функцију од стране машине за шаблоне.
  3. Храњење нашег захтева pandasко ће нас добити DataFrame - биће нам од користи у будућности.

Користим замену {dt} уместо параметра захтева %s не зато што сам зао Пинокио, већ зато што pandas не могу да поднесу pymssql и измиче последњи params: Listиако заиста жели tuple.
Такође имајте на уму да програмер pymssql одлучио да га више не подржава, и време је да пређемо на pyodbc.

Хајде да видимо чиме је Аирфлов напунио аргументе наших функција:

Апацхе Аирфлов: Олакшавање ЕТЛ-а

Ако нема података, онда нема сврхе да се наставља. Али такође је чудно сматрати пуњење успешним. Али ово није грешка. А-а-ах, шта да се ради?! А ево шта:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException каже Аирфлов-у да нема грешака, али ми прескачемо задатак. Интерфејс неће имати зелени или црвени квадрат, већ розе.

Хајде да бацимо наше податке више колона:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Наиме

  • База података из које смо преузели наруџбине,
  • ИД наше флоодинг сесије (биће другачији за сваки задатак),
  • Хеш из извора и ИД поруџбине - тако да у коначној бази података (где се све сипа у једну табелу) имамо јединствени ИД поруџбине.

Остаје претпоследњи корак: сипајте све у Вертику. И, што је чудно, један од најспектакуларнијих и најефикаснијих начина да се то уради је преко ЦСВ-а!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Правимо посебан пријемник StringIO.
  2. pandas љубазно ће ставити наше DataFrame у облику CSV-линије.
  3. Отворимо везу са нашом омиљеном Вертицом помоћу куке.
  4. А сада уз помоћ copy() пошаљите наше податке директно у Вертику!

Од возача ћемо узети колико је редова попуњено и рећи менаџеру сесије да је све у реду:

session.loaded_rows = cursor.rowcount
session.successful = True

То је све.

На распродаји ручно креирамо циљну плочу. Овде сам себи дозволио малу машину:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

користим VerticaOperator() Правим шему базе података и табелу (ако већ не постоје, наравно). Главна ствар је да правилно уредите зависности:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Резиме

- Па, - рече мали миш, - зар не
Јеси ли убеђен да сам ја најстрашнија животиња у шуми?

Џулија Доналдсон, Груфало

Мислим да кад бисмо моје колеге и ја имали такмичење: ко ће брзо креирати и покренути ЕТЛ процес од нуле: они са својим ССИС-ом и мишем а ја са Аирфлов-ом... А онда бисмо упоредили и лакоћу одржавања... Вау, мислим да ћете се сложити да ћу их победити на свим фронтовима!

Ако мало озбиљније, онда је Апацхе Аирфлов – описујући процесе у виду програмског кода – урадио мој посао много удобније и пријатније.

Његова неограничена проширивост, како у погледу додатака тако и предиспозиције за скалабилност, даје вам прилику да користите Аирфлов у готово свим областима: чак иу пуном циклусу прикупљања, припреме и обраде података, чак и при лансирању ракета (на Марс, од наравно).

Део завршни, референца и информација

Грабље које смо прикупили за вас

  • start_date. Да, ово је већ локални мем. Преко Даговог главног аргумента start_date све додавање. Укратко, ако наведете у start_date актуелни датум, и schedule_interval - једног дана, онда ће ДАГ почети сутра не раније.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    И нема више проблема.

    Постоји још једна грешка током извршавања која је повезана са њим: Task is missing the start_date parameter, што најчешће указује на то да сте заборавили да се повежете са даг оператором.

  • Све на једној машини. Да, и базе (сам Аирфлов и наш премаз), и веб сервер, и планер, и радници. И чак је успело. Али временом је број задатака за услуге растао, и када је ПостгреСКЛ почео да одговара на индекс за 20 с уместо за 5 мс, ми смо га узели и однели.
  • ЛоцалЕкецутор. Да, још седимо на њему, а већ смо дошли до ивице провалије. ЛоцалЕкецутор нам је до сада био довољан, али сада је време да се проширимо са најмање једним радником, а ми ћемо морати да се потрудимо да пређемо на ЦелериЕкецутор. А с обзиром на то да са њим можете да радите на једној машини, ништа вас не спречава да користите Целери чак ни на серверу, који „наравно, никада неће ући у производњу, искрено!“
  • Неупотреба уграђени алати:
    • veze за чување акредитива услуге,
    • СЛА Миссес да одговори на задатке који нису успели на време,
    • КСЦом за размену метаподатака (рекао сам metaподатака!) између даг задатака.
  • Злоупотреба поште. Па, шта да кажем? Постављена су упозорења за сва понављања палих задатака. Сада мој Гмаил на послу има >90 е-порука од Аирфлов-а, а њушка веб поште одбија да покупи и избрише више од 100 истовремено.

Више замки: Апацхе Аирфлов Питфаилс

Више алата за аутоматизацију

Да бисмо још више радили главом, а не рукама, Аирфлов нам је припремио ово:

  • РЕСТ АПИ - и даље има статус Огледног, што га не спречава да ради. Помоћу њега не можете само да добијете информације о даговима и задацима, већ и да зауставите/покренете даг, креирате ДАГ Рун или скуп.
  • ПГ - многи алати су доступни преко командне линије који нису само незгодни за коришћење преко ВебУИ, већ су генерално одсутни. На пример:
    • backfill потребно за поновно покретање инстанци задатка.
      На пример, дошли су аналитичари и рекли: „А ти, друже, имаш глупости у подацима од 1. до 13. јануара! Поправи, поправи, поправи, поправи!" А ти си таква плоча:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Основна услуга: initdb, resetdb, upgradedb, checkdb.
    • run, што вам омогућава да покренете задатак једне инстанце, па чак и да оцените све зависности. Штавише, можете га покренути преко LocalExecutor, чак и ако имате кластер целера.
    • Ради отприлике исту ствар test, само иу базама не пише ништа.
    • connections омогућава масовно стварање веза из љуске.
  • Питхон АПИ - прилично хардкор начин интеракције, који је намењен додацима, а не ројење у њему малим рукама. Али ко ће нас спречити да идемо /home/airflow/dags, трцати ipython и почети да се зезаш? Можете, на пример, да извезете све везе са следећим кодом:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Повезивање са базом метаподатака Аирфлов. Не препоручујем да пишете на њега, али добијање стања задатака за различите специфичне метрике може бити много брже и лакше него преко било ког АПИ-ја.

    Рецимо да нису сви наши задаци идемпотентни, али понекад могу пасти, и то је нормално. Али неколико блокада је већ сумњиво и било би неопходно проверити.

    Чувајте се СКЛ!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

референце

И наравно, првих десет линкова из издавања Гугла је садржај фолдера Аирфлов из мојих обележивача.

И линкови коришћени у чланку:

Извор: ввв.хабр.цом