Апачи проток на воздух: Олеснување на ETL

Здраво, јас сум Дмитриј Логвиненко - инженер за податоци на Одделот за аналитика на групата компании Везет.

Ќе ви кажам за прекрасна алатка за развој на ETL процеси - Apache Airflow. Но, Airflow е толку разновиден и повеќеслоен што треба внимателно да го погледнете дури и ако не сте вклучени во протокот на податоци, но имате потреба периодично да стартувате какви било процеси и да го следите нивното извршување.

И да, не само што ќе кажам, туку и ќе покажам: програмата има многу код, слики од екранот и препораки.

Апачи проток на воздух: Олеснување на ETL
Она што обично го гледате кога ќе го пребарувате на гугл зборот Airflow / Wikimedia Commons

Содржина

Вовед

Apache Airflow е исто како Џанго:

  • напишано во питон
  • има одличен административен панел,
  • може да се прошири на неодредено време

- само подобро, а направен е за сосема други цели, имено (како што пишува пред ката):

  • работи и следење задачи на неограничен број машини (колку што ќе ви дозволат целер/Кубернети и вашата совест)
  • со генерирање на динамичен работен тек од многу лесен за пишување и разбирање Python код
  • и можност за поврзување на било какви бази на податоци и API-и едни со други користејќи готови компоненти и домашни приклучоци (што е исклучително едноставно).

Ние користиме Apache Airflow вака:

  • собираме податоци од различни извори (многу примероци на SQL Server и PostgreSQL, различни API со метрика на апликацијата, дури и 1C) во DWH и ODS (имаме Vertica и Clickhouse).
  • колку напредна cron, кој ги започнува процесите на консолидација на податоците на ODS, а исто така го следи нивното одржување.

До неодамна, нашите потреби ги покриваше еден мал сервер со 32 јадра и 50 GB RAM. Во протокот на воздух, ова функционира:

  • повеќе 200 дари (всушност работни текови, во кои ги наполнивме задачите),
  • во секоја во просек 70 задачи,
  • оваа добрина започнува (исто така во просек) еднаш на час.

А за тоа како се проширивме, ќе напишам подолу, но сега да го дефинираме über-проблемот што ќе го решиме:

Има три оригинални SQL сервери, секој со 50 бази на податоци - примероци од еден проект, соодветно, имаат иста структура (скоро секаде, mua-ha-ha), што значи дека секој има табела Orders (за среќа, табела со тоа името може да се втурне во секој бизнис). Ги земаме податоците со додавање полиња за услуги (изворниот сервер, изворната база на податоци, ИД на задачата ETL) и наивно ги фрламе во, да речеме, Вертика.

Ајде да одиме!

Главниот дел, практичен (и малку теоретски)

Зошто ние (и вие)

Кога дрвјата беа големи, а јас бев едноставен SQL-Шик во една руска малопродажба, ги измамивме процесите на ETL ака текови на податоци користејќи две алатки кои ни се достапни:

  • Центар за напојување на информатика - екстремно распространет систем, исклучително продуктивен, со сопствен хардвер, сопствена верзија. Искористив не дај Боже 1% од неговите можности. Зошто? Па, пред се, овој интерфејс, некаде од 380-тите, ментално нè притиска. Второ, оваа алатка е дизајнирана за екстремно фенси процеси, бесна повторна употреба на компоненти и други многу важни-претпријатија-трикови. За фактот дека чини, како крилото на Ербас АXNUMX / година, нема да кажеме ништо.

    Внимавајте, скриншот може малку да ги повреди луѓето под 30 години

    Апачи проток на воздух: Олеснување на ETL

  • Сервер за интеграција на SQL Server - го користевме овој другар во нашите интра-проектни текови. Па, всушност: ние веќе користиме SQL Server, и би било некако неразумно да не ги користиме неговите ETL алатки. Сè во него е добро: и интерфејсот е убав, и извештаите за напредокот... Но, ова не е причината зошто ги сакаме софтверските производи, о, не за ова. Верзија на тоа dtsx (што е XML со мешани јазли на зачувај) можеме, но која е поентата? Како да направите пакет со задачи што ќе влече стотици табели од еден сервер на друг? Да, колку сто, ќе ти падне показалецот од дваесет парчиња, со кликнување на копчето на глувчето. Но, дефинитивно изгледа помодерно:

    Апачи проток на воздух: Олеснување на ETL

Сигурно баравме излези. Дури и случај речиси дошол до генератор на пакети SSIS напишан ...

…и потоа ме најде нова работа. И Apache Airflow ме престигна на неа.

Кога дознав дека описите на процесот ETL се едноставни Python код, едноставно не танцував од радост. Вака се верзии и се разликуваа тековите на податоци, а прелевањето табели со една структура од стотици бази на податоци во една цел стана прашање на код на Пајтон во еден и пол или два екрани од 13 инчи.

Составување на кластерот

Да не организираме комплетна градинка и да не зборуваме за сосема очигледни работи овде, како инсталирање на Airflow, вашата избрана база на податоци, Celery и други случаи опишани во доковите.

За да можеме веднаш да започнеме со експерименти, скицирав docker-compose.yml во кој:

  • Ајде всушност да подигнеме Струење: Распоредувач, веб-сервер. Цветот, исто така, ќе се врти таму за да ги следи задачите на Целерот (бидејќи веќе е втурнат apache/airflow:1.10.10-python3.7, но не ни пречи)
  • PostgreSQL, во кој Airflow ќе ги запише своите сервисни информации (податоци за распоредувачот, статистика за извршување итн.), а Celery ќе ги означува завршените задачи;
  • Redis, кој ќе делува како брокер за задачи за Целер;
  • Работник со целер, кои ќе бидат ангажирани за директно извршување на задачите.
  • Во папката ./dags ќе ги додадеме нашите датотеки со опис на dags. Тие ќе бидат подигнати на мува, така што нема потреба да жонглирате со целиот оџак по секое кивање.

На некои места, кодот во примерите не е целосно прикажан (за да не се натрупува текстот), но некаде се менува во процесот. Целосните примери за работни кодови може да се најдат во складиштето https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Забелешки:

  • Во склопувањето на композицијата во голема мера се потпирав на добро познатата слика пукел/докер-проток на воздух - не заборавајте да го проверите. Можеби не ви треба ништо друго во вашиот живот.
  • Сите поставки за проток на воздух се достапни не само преку airflow.cfg, но и преку променливите на околината (благодарение на програмерите), кои злонамерно ги искористив.
  • Секако, не е подготвен за производство: намерно не ставав отчукувања на срцето на контејнерите, не се замарав со безбедноста. Но, го направив минимумот погоден за нашите експериментатори.
  • Забележи го тоа:
    • Папката dag мора да биде достапна и за распоредувачот и за работниците.
    • Истото важи и за сите библиотеки од трети страни - сите тие мора да се инсталираат на машини со распоредувач и работници.

Па, сега е едноставно:

$ docker-compose up --scale worker=3

Откако сè ќе се зголеми, можете да ги погледнете веб-интерфејсите:

Основни концепти

Ако не разбравте ништо во сите овие „дамки“, тогаш еве краток речник:

  • Распоредувачот - најважниот вујко во Airflow, кој контролира роботите да работат напорно, а не личноста: го следи распоредот, ажурира дамки, започнува задачи.

    Во принцип, во постарите верзии, тој имаше проблеми со меморијата (не, не амнезија, туку протекување) и параметарот на наследството остана дури и во конфигурациите run_duration — неговиот интервал за рестартирање. Но, сега се е во ред.

  • ДАГ (познато како „даг“) - „насочен ацикличен график“, но таквата дефиниција ќе им каже на неколку луѓе, но всушност тоа е контејнер за задачи кои се во интеракција едни со други (види подолу) или аналог на пакетот во SSIS и Workflow во Informatica .

    Покрај дамките, може сè уште да има поддаги, но најверојатно нема да дојдеме до нив.

  • ДАГ Испратена - иницијализиран даг, кој е доделен свој execution_date. Даграните од истата даг можат да работат паралелно (ако сте ги направиле вашите задачи идемпотентни, се разбира).
  • Оператор се парчиња код одговорни за извршување на одредена акција. Постојат три типа на оператори:
    • акцијакако нашиот омилен PythonOperator, кој може да изврши кој било (валиден) Python код;
    • пренос на, кои пренесуваат податоци од место до место, да речеме, MsSqlToHiveTransfer;
    • сензор од друга страна, ќе ви овозможи да реагирате или да го забавите понатамошното извршување на даг додека не се случи некој настан. HttpSensor може да ја повлече наведената крајна точка и кога се чека саканиот одговор, започнете го преносот GoogleCloudStorageToS3Operator. Истражувачкиот ум ќе праша: „Зошто? На крајот на краиштата, можете да правите повторувања токму во операторот!“ И тогаш, за да не се заглави базенот на задачи со суспендирани оператори. Сензорот се вклучува, проверува и умира пред следниот обид.
  • Задача - декларираните оператори, без разлика на типот, и прикачени на даг се унапредуваат во ранг на задача.
  • пример на задача - кога генералниот планер одлучи дека е време да испрати задачи во битка на изведувачите-работници (точно на лице место, ако користиме LocalExecutor или на оддалечен јазол во случај на CeleryExecutor), им доделува контекст (т.е. збир на променливи - параметри за извршување), ги проширува шаблоните за команди или барања и ги здружува.

Ние генерираме задачи

Прво, да ја претставиме општата шема на нашето тесто, а потоа сè повеќе ќе се нурнеме во деталите, бидејќи применуваме некои нетривијални решенија.

Значи, во наједноставна форма, таквата дамка ќе изгледа вака:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Ајде да го сфатиме:

  • Прво, ги увезуваме потребните либови и нешто друго;
  • sql_server_ds - Дали List[namedtuple[str, str]] со имињата на врските од Airflow Connections и базите на податоци од кои ќе ја земеме нашата плоча;
  • dag - објавата на нашата даг, која нужно мора да биде ин globals(), инаку Airflow нема да го најде. Даг исто така треба да каже:
    • како се вика orders - ова име потоа ќе се појави во веб-интерфејсот,
    • дека ќе работи од полноќ на осми јули,
    • и треба да работи, приближно на секои 6 часа (за тешки момци овде наместо timedelta() дозволената cron-линија 0 0 0/6 ? * * *, за помалку кул - израз како @daily);
  • workflow() ќе ја заврши главната работа, но не сега. Засега, само ќе го фрлиме нашиот контекст во дневникот.
  • И сега едноставната магија за создавање задачи:
    • трчаме низ нашите извори;
    • иницијализираат PythonOperator, кој ќе ја изврши нашата кукла workflow(). Не заборавајте да наведете уникатно (во рамките на даг) име на задачата и да ја врзете самата даг. Знаме provide_context за возврат, ќе внесе дополнителни аргументи во функцијата, кои внимателно ќе ги собереме користејќи ги **context.

Засега тоа е се. Што добивме:

  • нова дамка во веб-интерфејсот,
  • сто и пол сто задачи кои ќе се извршуваат паралелно (доколку тоа го дозволуваат поставките Airflow, Celery и капацитетот на серверот).

Па, речиси го сфатив.

Апачи проток на воздух: Олеснување на ETL
Кој ќе ги инсталира зависностите?

За да ја поедноставам целата работа, се зафркнав docker-compose.yml обработка requirements.txt на сите јазли.

Сега го нема:

Апачи проток на воздух: Олеснување на ETL

Сивите квадрати се примери на задачи обработени од распоредувачот.

Чекаме малку, работните задачи ги кршат:

Апачи проток на воздух: Олеснување на ETL

Зелените, секако, успешно си ја завршија работата. Црвените не се многу успешни.

Патем, нема папка на нашиот прод ./dags, нема синхронизација помеѓу машините - сите дамки лежат во git на нашиот Gitlab, а Gitlab CI дистрибуира ажурирања на машините кога се спојуваат master.

Малку за Цветот

Додека работниците ни ги тресат цуцлите, да се потсетиме на уште една алатка која може да ни покаже нешто - Цвет.

Првата страница со збирни информации за работничките јазли:

Апачи проток на воздух: Олеснување на ETL

Најинтензивната страница со задачи што отидоа на работа:

Апачи проток на воздух: Олеснување на ETL

Најдосадната страница со статус на нашиот брокер:

Апачи проток на воздух: Олеснување на ETL

Најсветлата страница е со графикони за статус на задачи и време на нивното извршување:

Апачи проток на воздух: Олеснување на ETL

Го товариме недоволно

Значи, сите задачи се разработени, можете да ги однесете ранетите.

Апачи проток на воздух: Олеснување на ETL

И имаше многу ранети - од една или друга причина. Во случај на правилна употреба на Airflow, токму овие квадрати покажуваат дека податоците дефинитивно не пристигнале.

Треба да го гледате дневникот и да ги рестартирате паднатите примери на задачи.

Со кликнување на кој било квадрат, ќе ги видиме дејствата што ни се достапни:

Апачи проток на воздух: Олеснување на ETL

Можете да земете и да направите Расчистување на паднатите. Односно, забораваме дека нешто не успеа таму, а истата задача за пример ќе оди кај распоредувачот.

Апачи проток на воздух: Олеснување на ETL

Јасно е дека тоа да се прави со глувчето со сите црвени квадрати не е многу хумано - тоа не е она што го очекуваме од Airflow. Нормално, имаме оружје за масовно уништување: Browse/Task Instances

Апачи проток на воздух: Олеснување на ETL

Ајде да избереме сè одеднаш и да се вратиме на нула, кликнете на точната ставка:

Апачи проток на воздух: Олеснување на ETL

По чистењето, нашите такси изгледаат вака (тие веќе чекаат распоредувачот да ги закаже):

Апачи проток на воздух: Олеснување на ETL

Врски, куки и други променливи

Време е да го погледнеме следниот ДАГ, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Дали сите некогаш направиле ажурирање на извештајот? Ова е повторно таа: има список на извори од каде да се добијат податоците; има листа каде да се стави; не заборавајте да завивате кога сè се случило или пукнало (добро, ова не е за нас, не).

Ајде повторно да ја разгледаме датотеката и да ги погледнеме новите нејасни работи:

  • from commons.operators import TelegramBotSendMessage - ништо не не спречува да направиме сопствени оператори, што го искористивме правејќи мала обвивка за испраќање пораки до Unblocked. (Подолу ќе зборуваме повеќе за овој оператор);
  • default_args={} - dag може да ги дистрибуира истите аргументи на сите свои оператори;
  • to='{{ var.value.all_the_kings_men }}' - Поле to нема да имаме хардкодирано, туку динамично генерирано со помош на Jinja и променлива со листа на е-пошта, која внимателно ја ставив Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — услов за стартување на операторот. Во нашиот случај, писмото ќе лета до газдите само ако сите зависности се решат успешно;
  • tg_bot_conn_id='tg_main' - аргументи conn_id прифатете ги идентификаторите за поврзување што ги создаваме Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - пораките во Telegram ќе одлетаат само ако има паднати задачи;
  • task_concurrency=1 - забрануваме истовремено стартување на неколку примери на задачи од една задача. Во спротивно, ќе добиеме истовремено лансирање на неколку VerticaOperator (гледајќи во една маса);
  • report_update >> [email, tg] - сè VerticaOperator се спојуваат во испраќањето писма и пораки, како ова:
    Апачи проток на воздух: Олеснување на ETL

    Но, бидејќи операторите на известувачите имаат различни услови за стартување, само еден ќе работи. Во приказот на дрвото, сè изгледа малку помалку визуелно:
    Апачи проток на воздух: Олеснување на ETL

Ќе кажам неколку зборови за макроа и нивните пријатели - променливи.

Макроата се заштитни места на Jinja кои можат да заменат различни корисни информации во аргументи на операторот. На пример, вака:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} ќе се прошири на содржината на контекстната променлива execution_date во формат YYYY-MM-DD: 2020-07-14. Најдобриот дел е што контекстуалните променливи се приковани на одреден пример на задача (квадрат во приказот на дрвото), а кога ќе се рестартираат, држачите на места ќе се прошират на истите вредности.

Доделените вредности може да се видат со помош на копчето Rendered на секој примерок на задача. Вака задачата со испраќање писмо:

Апачи проток на воздух: Олеснување на ETL

И така на задачата со испраќање порака:

Апачи проток на воздух: Олеснување на ETL

Комплетна листа на вградени макроа за најновата достапна верзија е достапна овде: референца за макроа

Згора на тоа, со помош на приклучоци, можеме да декларираме свои макроа, но тоа е друга приказна.

Покрај претходно дефинираните работи, можеме да ги замениме вредностите на нашите променливи (веќе го користев ова во кодот погоре). Ајде да создадеме внатре Admin/Variables неколку работи:

Апачи проток на воздух: Олеснување на ETL

Сè што можете да користите:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Вредноста може да биде скалар, или може да биде и JSON. Во случај на JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

само користете ја патеката до саканиот клуч: {{ var.json.bot_config.bot.token }}.

Буквално ќе кажам еден збор и ќе покажам еден скриншот за соединенија. Сè е елементарно овде: на страницата Admin/Connections создаваме врска, ги додаваме нашите најавувања / лозинки и поспецифични параметри таму. Како ова:

Апачи проток на воздух: Олеснување на ETL

Лозинките може да се шифрираат (потемелно од стандардното), или можете да го изоставите типот на врската (како што направив за tg_main) - факт е дека списокот на типови е харджичен во моделите Airflow и не може да се прошири без да се навлезе во изворните кодови (ако ненадејно не прогуглав нешто, поправете ме), но ништо нема да не спречи да добиеме кредити само со име.

Можете исто така да направите неколку врски со исто име: во овој случај, методот BaseHook.get_connection(), што ни добива врски по име, ќе даде случајно од неколку имењаци (пологично би било да се направи Round Robin, но да го оставиме тоа на совеста на развивачите на Airflow).

Променливите и врските се секако одлични алатки, но важно е да не се изгуби рамнотежата: кои делови од вашите текови ги складирате во самиот код и кои делови му ги давате на Airflow за складирање. Од една страна, може да биде погодно брзо да ја промените вредноста, на пример, поштенско сандаче, преку интерфејсот. Од друга страна, ова е сепак враќање на кликнувањето на глувчето, од кое ние (јас) сакавме да се ослободиме.

Работата со врски е една од задачите куки. Општо земено, куките за проток на воздух се точки за поврзување со услуги и библиотеки од трети страни. На пр. JiraHook ќе ни отвори клиент за да комуницираме со Jira (можете да ги преместувате задачите напред и назад), и со помош на SambaHook можете да туркате локална датотека до smb- точка.

Парсирање на прилагодениот оператор

И блиску дојдовме да погледнеме како е направен TelegramBotSendMessage

Код commons/operators.py со вистинскиот оператор:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Овде, како и сè друго во Airflow, сè е многу едноставно:

  • Наследен од BaseOperator, кој имплементира неколку работи специфични за протокот на воздух (погледнете го вашето слободно време)
  • Декларирани полиња template_fields, во која Џинџа ќе бара макроа за обработка.
  • Подреди вистинските аргументи за __init__(), поставете ги стандардните вредности каде што е потребно.
  • Не заборавивме ниту на иницијализацијата на предокот.
  • Ја отвори соодветната кука TelegramBotHookдобил клиентски објект од него.
  • Преземен (редефиниран) метод BaseOperator.execute(), кој Airfow ќе го замрси кога ќе дојде време да се стартува операторот - во него ќе ја спроведеме главната акција, заборавајќи да се најавиме. (Ние се најавуваме, патем, веднаш stdout и stderr - Протокот на воздух ќе пресретне сè, ќе го завитка убаво, ќе го разложи каде што е потребно.)

Ајде да видиме што имаме commons/hooks.py. Првиот дел од датотеката, со самата кука:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Не знам ни што да објаснам овде, само ќе ги забележам важните точки:

  • Ние наследуваме, размислете за аргументите - во повеќето случаи тоа ќе биде еден: conn_id;
  • Надминувачки стандардни методи: се ограничив get_conn(), во која ги добивам параметрите за поврзување по име и само го добивам делот extra (ова е JSON поле), во кое јас (според моите упатства!) го ставам бот токенот Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Јас создавам пример од нашата TelegramBot, давајќи му специфичен знак.

Тоа е се. Можете да добиете клиент од кука користејќи TelegramBotHook().clent или TelegramBotHook().get_conn().

И вториот дел од датотеката, во која правам микрообвивка за Telegram REST API, за да не го влечете истото python-telegram-bot за еден метод sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Правилниот начин е да се собере сето тоа: TelegramBotSendMessage, TelegramBotHook, TelegramBot - во додатокот, ставете во јавно складиште и дајте го на Open Source.

Додека го проучувавме сето ова, ажурирањата на нашите извештаи успеаја успешно да пропаднат и да ми испратат порака за грешка на каналот. Ќе проверам дали не е во ред...

Апачи проток на воздух: Олеснување на ETL
Нешто пукна во нашиот дужд! Зарем тоа не го очекувавме? Точно!

Ќе истуриш?

Дали чувствувате дека нешто пропуштив? Изгледа вети дека ќе префрли податоци од SQL Server на Vertica, а потоа ги зеде и се оттргна од темата, ѓубрето!

Ова ѕверство беше намерно, едноставно морав да ти дешифрирам некоја терминологија. Сега можете да одите понатаму.

Нашиот план беше овој:

  1. Дали даг
  2. Генерирајте задачи
  3. Погледнете колку е се убаво
  4. Доделете броеви на сесии за пополнување
  5. Добијте податоци од SQL Server
  6. Ставете податоци во Вертика
  7. Собира статистика

Така, за сето ова да почне да функционира, направив мал додаток на нашата docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Таму подигаме:

  • Вертика како домаќин dwh со најмногу стандардни поставки,
  • три примери на SQL Server,
  • ние ги пополнуваме базите на податоци во вторите со некои податоци (во никој случај не гледајте mssql_init.py!)

Го лансираме сето добро со помош на малку покомплицирана команда од минатиот пат:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Она што го генерира нашиот чудотворен рандомизатор, можете да ја користите ставката Data Profiling/Ad Hoc Query:

Апачи проток на воздух: Олеснување на ETL
Главната работа не е да им се покаже на аналитичарите

елаборираат на ETL сесии Нема, таму сè е тривијално: правиме основа, има знак во неа, завиткуваме сè со контекстуален менаџер и сега го правиме ова:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

сесија.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Времето дојде собирајте ги нашите податоци од нашите сто и пол стотина маси. Ајде да го направиме ова со помош на многу непретенциозни линии:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Со помош на кука добиваме од Airflow pymssql- поврзете
  2. Ајде да замениме ограничување во форма на датум во барањето - тоа ќе биде фрлено во функцијата од моторот на шаблонот.
  3. Хранење на нашето барање pandasкој ќе не добие DataFrame - ќе ни биде од корист во иднина.

Јас користам замена {dt} наместо параметар за барање %s не затоа што сум злобен Пинокио, туку затоа што pandas не може да се справи pymssql и го лизга последниот params: Listиако навистина сака tuple.
Исто така, забележете дека инвеститорот pymssql одлучи повеќе да не го поддржува, и време е да се иселиме pyodbc.

Ајде да видиме со што Airflow ги наполнил аргументите на нашите функции:

Апачи проток на воздух: Олеснување на ETL

Ако нема податоци, тогаш нема смисла да се продолжи. Но, исто така е чудно полнењето да се смета за успешно. Но, ова не е грешка. А-а-ах, што да правам?! А еве што:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException ќе му каже на Airflow дека нема грешки, но ние ја прескокнуваме задачата. Интерфејсот нема да има зелен или црвен квадрат, туку розов.

Ајде да ги фрлиме нашите податоци повеќе колони:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Имено

  • Базата на податоци од која ги земавме нарачките,
  • ID на нашата поплавна сесија (ќе биде различно за секоја задача),
  • Хеш од изворот и ID на нарачка - така што во конечната база на податоци (каде што сè е преточено во една табела) имаме единствен ID на нарачка.

Останува претпоследниот чекор: истурете сè во Вертика. И, чудно е доволно, еден од најспектакуларните и најефикасните начини да го направите ова е преку CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Правиме специјален ресивер StringIO.
  2. pandas љубезно ќе ги стави нашите DataFrame во форма CSV- линии.
  3. Ајде да отвориме врска со нашата омилена Вертика со кука.
  4. И сега со помош copy() испратете ги нашите податоци директно до Вертика!

Ќе земеме од возачот колку линии се пополнети и ќе му кажеме на менаџерот на сесијата дека сè е во ред:

session.loaded_rows = cursor.rowcount
session.successful = True

Тоа е се.

На продажба, ние рачно ја креираме целната плоча. Еве си дозволив мала машина:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

користам VerticaOperator() Јас креирам шема на база на податоци и табела (ако веќе не постојат, се разбира). Главната работа е правилно да ги распоредите зависностите:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Сумирање

- Па, - рече малото глувче, - нели, сега
Дали сте убедени дека јас сум најстрашното животно во шумата?

Џулија Доналдсон, Груфало

Мислам дека ако јас и моите колеги имавме конкуренција: кој брзо ќе создаде и ќе започне процес на ETL од нула: тие со нивниот SSIS и глушец и јас со Airflow ... И тогаш ќе ја споредиме и леснотијата на одржување ... Леле, мислам дека ќе се согласите дека ќе ги победам на сите фронтови!

Ако малку посериозно, тогаш Apache Airflow - со опишување процеси во форма на програмски код - ја заврши мојата работа многу поудобно и попријатно.

Неговата неограничена екстензивност, и во смисла на приклучоци и предиспозиција за приспособливост, ви дава можност да го користите Airflow во речиси секоја област: дури и во целиот циклус на собирање, подготовка и обработка на податоци, дури и при лансирање ракети (на Марс, на курс).

Конечниот дел, референца и информации

Греблото што го собравме за вас

  • start_date. Да, ова е веќе локален мем. Преку главниот аргумент на Даг start_date сите поминуваат. Накратко, ако наведете во start_date тековниот датум и schedule_interval - Еден ден, тогаш ДАГ ќе почне утре не порано.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    И нема повеќе проблеми.

    Постои уште една грешка во времето на траење поврзана со неа: Task is missing the start_date parameter, што најчесто покажува дека сте заборавиле да се поврзете со операторот даг.

  • Сите на една машина. Да, и бази (самиот проток на воздух и нашата обвивка), и веб-сервер, и распоредувач и работници. И дури и успеа. Но, со текот на времето, бројот на задачи за услугите растеше и кога PostgreSQL почна да одговара на индексот за 20 секунди наместо за 5 ms, ние го зедовме и го однесовме.
  • Локален извршител. Да, ние сè уште седиме на него, а веќе дојдовме до работ на бездната. Досега ни беше доволен LocalExecutor, но сега е време да се прошириме со барем еден работник и ќе треба да работиме напорно за да се преселиме во CeleryExecutor. И со оглед на фактот дека можете да работите со него на една машина, ништо не ве спречува да користите Celery дури и на сервер, кој „се разбира, никогаш нема да влезе во производство, искрено!“
  • Неупотреба вградени алатки:
    • Врски за складирање на акредитиви за услуги,
    • SLA промаши да одговори на задачи кои не успеале навреме,
    • xcom за размена на метаподатоци (реков целподатоци!) помеѓу даг задачи.
  • Злоупотреба на пошта. Па, што да кажам? Беа поставени предупредувања за сите повторувања на паднатите задачи. Сега мојот работен Gmail има повеќе од 90 илјади е-пошта од Airflow, а муцката за веб-пошта одбива да земе и избрише повеќе од 100 истовремено.

Повеќе стапици: Замки за проток на воздух на Apache

Повеќе алатки за автоматизација

За да работиме уште повеќе со глава, а не со раце, Airflow ни го подготви ова:

  • ОСТАНАТОТО API - се уште има статус на Експериментал, што не го спречува да работи. Со него, вие не само што можете да добивате информации за дамки и задачи, туку и да запрете/започнете даг, да креирате DAG Run или базен.
  • CLI - многу алатки се достапни преку командната линија кои не само што се незгодни за користење преку WebUI, туку генерално ги нема. На пример:
    • backfill потребни за рестартирање на примероци на задачи.
      На пример, дојдоа аналитичари и рекоа: „А ти, другар, имаш глупости во податоците од 1 до 13 јануари! Поправете го, поправете го, поправете го, поправете го!" И вие сте таква рингла:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Основна услуга: initdb, resetdb, upgradedb, checkdb.
    • run, што ви овозможува да извршите една задача на пример, па дури и да дадете бод за сите зависности. Покрај тоа, можете да го извршите преку LocalExecutor, дури и ако имате кластер Целер.
    • Го прави скоро истото test, само исто така во бази не пишува ништо.
    • connections овозможува масовно создавање на врски од школка.
  • АПИ на Пајтон - прилично хардкор начин на интеракција, кој е наменет за приклучоци, а не рој во него со мали раце. Но, кој ќе не спречи да одиме /home/airflow/dags, трчај ipython и да почнеш да се плеткаш? Можете, на пример, да ги извезете сите врски со следниов код:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Поврзување со метабазата на проток на воздух. Не препорачувам да пишувате на него, но добивањето состојби на задачи за различни специфични метрики може да биде многу побрзо и полесно отколку да користите некој од API-ите.

    Да речеме дека не сите наши задачи се идемотентни, но понекогаш може да паднат и тоа е нормално. Но, неколку блокади се веќе сомнителни и би требало да се провери.

    Пазете се SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

референци

И, се разбира, првите десет линкови од издавањето на Google се содржината на папката Airflow од моите обележувачи.

И врските користени во статијата:

Извор: www.habr.com