Apache Airflow: Улесняване на ETL

Здравейте, аз съм Дмитрий Логвиненко - инженер по данни от отдела за анализ на групата компании Vezet.

Ще ви разкажа за един прекрасен инструмент за разработване на ETL процеси - Apache Airflow. Но Airflow е толкова гъвкав и многостранен, че трябва да го разгледате по-отблизо, дори ако не участвате в потоци от данни, но имате нужда периодично да стартирате всякакви процеси и да наблюдавате тяхното изпълнение.

И да, не само ще разкажа, но и ще покажа: програмата има много код, екранни снимки и препоръки.

Apache Airflow: Улесняване на ETL
Това, което обикновено виждате, когато търсите в Google думата Airflow / Wikimedia Commons

Таблица на съдържанието

въведение

Apache Airflow е точно като Django:

  • написан на python
  • има страхотен админ панел,
  • разширяема за неопределено време

- само по-добре и е направено за съвсем други цели, а именно (както е написано преди кат):

  • изпълнение и наблюдение на задачи на неограничен брой машини (колкото Celery / Kubernetes и съвестта ви позволяват)
  • с динамично генериране на работен поток от много лесен за писане и разбиране код на Python
  • и възможност за свързване на всякакви бази данни и API помежду си, като се използват както готови компоненти, така и домашно направени плъгини (което е изключително просто).

Използваме Apache Airflow така:

  • събираме данни от различни източници (много екземпляри на SQL Server и PostgreSQL, различни API с показатели на приложенията, дори 1C) в DWH и ODS (имаме Vertica и Clickhouse).
  • колко напреднал cron, който стартира процесите на консолидация на данни в ODS, а също така следи тяхната поддръжка.

До скоро нуждите ни се покриваха от един малък сървър с 32 ядра и 50 GB RAM. В Airflow това работи:

  • още 200 dag (всъщност работни потоци, в които напъхахме задачи),
  • във всеки средно 70 задачи,
  • тази доброта започва (също средно) веднъж на час.

А за това как се разширихме, ще пиша по-долу, но сега нека дефинираме über-проблема, който ще решим:

Има три изходни SQL сървъра, всеки с 50 бази данни - екземпляри на един проект, съответно те имат една и съща структура (почти навсякъде, муа-ха-ха), което означава, че всеки има таблица Orders (за щастие, таблица с това името може да бъде прокарано във всеки бизнес). Вземаме данните, като добавяме сервизни полета (изходен сървър, изходна база данни, ETL идентификатор на задача) и наивно ги хвърляме, да речем, във Vertica.

Да вървим!

Основната част, практическа (и малко теоретична)

Защо ние (и вие)

Когато дърветата бяха големи, а аз бях прост SQL-schik в един руски магазин на дребно, ние измамихме ETL процеси, известни още като потоци от данни, използвайки два налични инструмента:

  • Informatica Power Center - изключително разпространена система, изключително продуктивна, със собствен хардуер, собствена версия. Използвах дай Боже 1% от възможностите му. Защо? Ами, първо, този интерфейс, някъде от 380-те години, психически ни натовари. Второ, тази измишльотина е предназначена за изключително фантастични процеси, яростно повторно използване на компоненти и други много важни трикове за предприятието. За това какво струва, като крилото на Airbus AXNUMX / година, няма да кажем нищо.

    Внимавайте, екранна снимка може да нарани малко хора под 30 години

    Apache Airflow: Улесняване на ETL

  • Сървър за интеграция на SQL Server - използвахме този другар в нашите вътрешнопроектни потоци. Е, всъщност: ние вече използваме SQL Server и би било някак неразумно да не използваме неговите ETL инструменти. Всичко в него е добро: и интерфейсът е красив, и отчетите за напредъка ... Но не затова обичаме софтуерните продукти, о, не за това. Версия го dtsx (което е XML с възли, разбъркани при запазване) можем, но какъв е смисълът? Какво ще кажете да създадете пакет от задачи, който ще плъзга стотици таблици от един сървър на друг? Да, какви сто, показалецът ви ще падне от двадесет парчета, като щракнете върху бутона на мишката. Но определено изглежда по-модерно:

    Apache Airflow: Улесняване на ETL

Определено търсихме изходи. Случай дори почти стигна до самонаписан генератор на SSIS пакети ...

…и тогава нова работа ме намери. И Apache Airflow ме изпревари на него.

Когато разбрах, че описанията на ETL процеси са прост код на Python, просто не танцувах от радост. Ето как потоците от данни бяха версионирани и разграничени, а изливането на таблици с една структура от стотици бази данни в една цел стана въпрос на код на Python в един и половина или два 13-инчови екрана.

Сглобяване на клъстера

Нека не организираме напълно детска градина и не говорим за напълно очевидни неща тук, като инсталиране на Airflow, избраната от вас база данни, Celery и други случаи, описани в доковете.

За да можем веднага да започнем експерименти, скицирах docker-compose.yml в който:

  • Нека всъщност рейзваме Airflow: Планировчик, Уеб сървър. Flower също ще се върти там, за да наблюдава задачите на Celery (защото вече е натиснат в apache/airflow:1.10.10-python3.7, но ние нямаме нищо против)
  • PostgreSQL, в който Airflow ще записва сервизната си информация (данни за планировчика, статистики за изпълнение и т.н.), а Celery ще маркира изпълнени задачи;
  • Redis, който ще действа като брокер на задачи за Celery;
  • Целина работник, които ще бъдат ангажирани с прякото изпълнение на задачите.
  • Към папка ./dags ще добавим нашите файлове с описанието на dags. Те ще бъдат взети в движение, така че няма нужда да жонглирате с целия стек след всяко кихане.

На някои места кодът в примерите не е показан изцяло (за да не претрупва текста), но някъде е модифициран в процеса. Пълни примери за работещ код могат да бъдат намерени в хранилището https://github.com/dm-logv/airflow-tutorial.

докер-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Забележки:

  • При сглобяването на композицията до голяма степен заложих на добре познатия образ puckel/docker-въздушен поток - не забравяйте да го проверите. Може би нямате нужда от нищо друго в живота си.
  • Всички настройки на Airflow са достъпни не само чрез airflow.cfg, но и чрез променливи на средата (благодарение на разработчиците), от които злонамерено се възползвах.
  • Естествено, не е готов за производство: умишлено не поставих сърдечни удари на контейнери, не се занимавах със сигурността. Но направих минимума, подходящ за нашите експериментатори.
  • Отбележи, че:
    • Папката dag трябва да е достъпна както за планировчика, така и за работниците.
    • Същото важи и за всички библиотеки на трети страни - всички те трябва да бъдат инсталирани на машини с планировчик и работници.

Е, сега е просто:

$ docker-compose up --scale worker=3

След като всичко се издигне, можете да разгледате уеб интерфейсите:

Основни понятия

Ако не сте разбрали нищо във всички тези „даги“, тогава ето кратък речник:

  • Scheduler - най-важният чичо в Airflow, който контролира, че роботите работят усилено, а не човек: следи графика, актуализира dags, стартира задачи.

    Като цяло в по-старите версии той имаше проблеми с паметта (не, не амнезия, а течове) и наследеният параметър дори остана в конфигурациите run_duration — неговият интервал за рестартиране. Но сега всичко е наред.

  • DAG (известен още като "dag") - "насочена ациклична графика", но такава дефиниция ще каже на малко хора, но всъщност това е контейнер за задачи, взаимодействащи помежду си (вижте по-долу) или аналог на Package в SSIS и Workflow в Informatica .

    В допълнение към dags все още може да има subdags, но най-вероятно няма да стигнем до тях.

  • DAG Изпълнение - инициализиран dag, който се присвоява собствен execution_date. Даграни от един и същ даг могат да работят паралелно (ако сте направили задачите си идемпотентни, разбира се).
  • Оператор са части от код, отговорни за извършването на конкретно действие. Има три вида оператори:
    • действиекато нашия любим PythonOperator, който може да изпълни всеки (валиден) Python код;
    • прехвърляне, които пренасят данни от място на място, да речем MsSqlToHiveTransfer;
    • сензор от друга страна, това ще ви позволи да реагирате или да забавите по-нататъшното изпълнение на dag, докато настъпи събитие. HttpSensor може да изтегли посочената крайна точка и когато желаният отговор чака, да започне прехвърлянето GoogleCloudStorageToS3Operator. Любознателният ум ще попита: „Защо? В крайна сметка можете да правите повторения направо в оператора!“ И тогава, за да не задръстите пула от задачи със спрени оператори. Сензорът стартира, проверява и умира преди следващия опит.
  • Task - декларираните оператори, независимо от типа и прикачени към dag, се повишават в ранг на задача.
  • екземпляр на задачата - когато генералният плановик реши, че е време да изпрати задачи в битка на работници-изпълнители (точно на място, ако използваме LocalExecutor или към отдалечен възел в случай на CeleryExecutor), той им присвоява контекст (т.е. набор от променливи - параметри за изпълнение), разширява шаблони на команди или заявки и ги обединява.

Ние генерираме задачи

Първо, нека очертаем общата схема на нашия дъг, а след това ще се потопим в детайлите все повече и повече, защото прилагаме някои нетривиални решения.

И така, в най-простата си форма, такъв даг ще изглежда така:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Нека разберем:

  • Първо импортираме необходимите библиотеки и нещо друго;
  • sql_server_ds - List[namedtuple[str, str]] с имената на връзките от Airflow Connections и базите данни, от които ще вземем табелата си;
  • dag - обявяването на нашия даг, който задължително трябва да бъде в globals(), в противен случай Airflow няма да го намери. Дъг също трябва да каже:
    • как се казва той orders - това име ще се появи в уеб интерфейса,
    • че ще работи от полунощ на осми юли,
    • и трябва да работи приблизително на всеки 6 часа (за яки момчета тук вместо timedelta() допустимо cron- линия 0 0 0/6 ? * * *, за по-малко готините - израз като @daily);
  • workflow() ще свърши основната работа, но не сега. Засега просто ще изхвърлим нашия контекст в дневника.
  • А сега простата магия за създаване на задачи:
    • преминаваме през нашите източници;
    • инициализирам PythonOperator, който ще изпълни нашия манекен workflow(). Не забравяйте да посочите уникално (в рамките на dag) име на задачата и да завържете самия dag. Флаг provide_context на свой ред ще влее допълнителни аргументи във функцията, които внимателно ще съберем, използвайки **context.

За сега това е всичко. Какво получихме:

  • нов dag в уеб интерфейса,
  • сто и половина задачи, които ще се изпълняват паралелно (ако настройките на Airflow, Celery и капацитетът на сървъра го позволяват).

Е, почти разбрах.

Apache Airflow: Улесняване на ETL
Кой ще инсталира зависимостите?

За да опростя цялото това нещо, аз се прецаках docker-compose.yml обработка requirements.txt на всички възли.

Ето го сега:

Apache Airflow: Улесняване на ETL

Сивите квадрати са екземпляри на задачи, обработени от планировчика.

Чакаме малко, задачите се разграбват от работниците:

Apache Airflow: Улесняване на ETL

Зелените, разбира се, свършиха успешно работата си. Червените не са много успешни.

Между другото, няма папка на нашия продукт ./dags, няма синхронизация между машините - всички даги лежат git в нашия Gitlab и Gitlab CI разпространява актуализации на машини при сливане master.

Малко за Цвете

Докато работниците ни трошат залъгалките, нека си спомним още един инструмент, който може да ни покаже нещо - Цветето.

Първата страница с обобщена информация за работните възли:

Apache Airflow: Улесняване на ETL

Най-интензивната страница с успешни задачи:

Apache Airflow: Улесняване на ETL

Най-скучната страница със статуса на нашия брокер:

Apache Airflow: Улесняване на ETL

Най-ярката страница е с графики на състоянието на задачите и времето за тяхното изпълнение:

Apache Airflow: Улесняване на ETL

Зареждаме недотоварените

И така, всички задачи са отработени, можете да носите ранените.

Apache Airflow: Улесняване на ETL

И имаше много ранени – по една или друга причина. В случай на правилно използване на Airflow, точно тези квадрати показват, че данните определено не са пристигнали.

Трябва да гледате дневника и да рестартирате падналите екземпляри на задачи.

Щраквайки върху който и да е квадрат, ще видим достъпните за нас действия:

Apache Airflow: Улесняване на ETL

Можете да вземете и да изчистите падналите. Тоест, забравяме, че нещо се е провалило там и същата задача на екземпляра ще отиде в планировчика.

Apache Airflow: Улесняване на ETL

Ясно е, че да правиш това с мишката с всички червени квадратчета не е много хуманно – не това очакваме от Airflow. Естествено, имаме оръжия за масово унищожение: Browse/Task Instances

Apache Airflow: Улесняване на ETL

Нека изберем всичко наведнъж и нулираме, щракнете върху правилния елемент:

Apache Airflow: Улесняване на ETL

След почистване нашите таксита изглеждат така (те вече чакат графика да ги назначи):

Apache Airflow: Улесняване на ETL

Връзки, куки и други променливи

Време е да погледнем следващия DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Всеки ли е правил актуализация на отчет? Това отново е тя: има списък с източници, откъдето да вземете данните; има списък къде да поставите; не забравяйте да клаксоните, когато всичко се случи или се счупи (е, това не е за нас, не).

Нека да прегледаме файла отново и да разгледаме новите неясни неща:

  • from commons.operators import TelegramBotSendMessage - нищо не ни пречи да си направим собствени оператори, от което се възползвахме, като направихме малка обвивка за изпращане на съобщения до Unblocked. (Ще говорим повече за този оператор по-долу);
  • default_args={} - dag може да разпространява едни и същи аргументи към всички свои оператори;
  • to='{{ var.value.all_the_kings_men }}' - поле to няма да имаме твърдо кодирани, а динамично генерирани с помощта на Jinja и променлива със списък от имейли, които внимателно поставих Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — условие за стартиране на оператора. В нашия случай писмото ще лети до шефовете само ако всички зависимости са работили успешно;
  • tg_bot_conn_id='tg_main' - аргументи conn_id приемете ID на връзката, в която създаваме Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - съобщенията в Telegram ще отлетят само ако има паднали задачи;
  • task_concurrency=1 - забраняваме едновременното стартиране на няколко екземпляра на една задача. В противен случай ще получим едновременно стартиране на няколко VerticaOperator (гледа една маса);
  • report_update >> [email, tg] - всичко VerticaOperator се събират в изпращането на писма и съобщения, като това:
    Apache Airflow: Улесняване на ETL

    Но тъй като операторите на нотификатори имат различни условия за стартиране, само един ще работи. В дървовидния изглед всичко изглежда малко по-малко визуално:
    Apache Airflow: Улесняване на ETL

Ще кажа няколко думи за макроси и техните приятели - променливи.

Макросите са заместители на Jinja, които могат да заменят различна полезна информация в аргументите на оператора. Например така:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} ще се разшири до съдържанието на контекстната променлива execution_date във формат YYYY-MM-DD: 2020-07-14. Най-добрата част е, че контекстните променливи са приковани към конкретен екземпляр на задача (квадрат в дървовидния изглед) и когато се рестартират, контейнерите ще се разширят до същите стойности.

Присвоените стойности могат да се видят с помощта на бутона Rendered на всеки екземпляр на задача. Ето как се изпълнява задачата с изпращане на писмо:

Apache Airflow: Улесняване на ETL

И така при задачата с изпращане на съобщение:

Apache Airflow: Улесняване на ETL

Пълен списък с вградени макроси за най-новата налична версия е достъпен тук: препратка към макроси

Освен това с помощта на плъгини можем да декларираме собствени макроси, но това е друга история.

В допълнение към предварително дефинираните неща, можем да заменим стойностите на нашите променливи (вече използвах това в кода по-горе). Да създаваме в Admin/Variables няколко неща:

Apache Airflow: Улесняване на ETL

Всичко, което можете да използвате:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Стойността може да бъде скаларен или може също да бъде JSON. В случай на JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

просто използвайте пътя до желания ключ: {{ var.json.bot_config.bot.token }}.

Буквално ще кажа една дума и ще покажа една екранна снимка за връзка. Тук всичко е елементарно: на страницата Admin/Connections ние създаваме връзка, добавяме нашите потребителски данни / пароли и по-специфични параметри там. Като този:

Apache Airflow: Улесняване на ETL

Паролите могат да бъдат шифровани (по-задълбочено от стандартните) или можете да пропуснете типа връзка (както направих за tg_main) - факт е, че списъкът с типове е твърдо свързан в моделите на Airflow и не може да бъде разширен, без да навляза в изходните кодове (ако внезапно не съм намерил нещо в Google, моля, поправете ме), но нищо няма да ни попречи да получим кредити само от име.

Можете също да направите няколко връзки с едно и също име: в този случай методът BaseHook.get_connection(), което ни дава връзки по име, ще даде произволен от няколко съименници (би било по-логично да се направи Round Robin, но нека го оставим на съвестта на разработчиците на Airflow).

Променливите и връзките със сигурност са страхотни инструменти, но е важно да не губите баланса: кои части от вашите потоци съхранявате в самия код и кои части давате на Airflow за съхранение. От една страна, може да е удобно бързо да промените стойността, например пощенска кутия, чрез потребителския интерфейс. От друга страна, това все още е връщане към щракането на мишката, от което (аз) искахме да се отървем.

Работата с връзки е една от задачите кукички. Като цяло, куките на Airflow са точки за свързване към услуги и библиотеки на трети страни. напр. JiraHook ще ни отвори клиент, за да взаимодействаме с Jira (можете да местите задачи напред и назад) и с помощта на SambaHook можете да натиснете локален файл към smb-точка.

Анализ на персонализирания оператор

И се доближихме до това да видим как се прави TelegramBotSendMessage

Код commons/operators.py с действителния оператор:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Тук, както всичко останало в Airflow, всичко е много просто:

  • Наследени от BaseOperator, който прилага доста специфични за Airflow неща (погледнете свободното си време)
  • Декларирани полета template_fields, в който Jinja ще търси макроси за обработка.
  • Подреди правилните аргументи за __init__(), задайте настройките по подразбиране, където е необходимо.
  • Не забравихме и инициализацията на предшественика.
  • Отвори съответната кука TelegramBotHookполучи клиентски обект от него.
  • Отменен (предефиниран) метод BaseOperator.execute(), който Airfow ще потрепне, когато дойде време да стартира оператора - в него ще приложим основното действие, забравяйки да влезем. (Ние влизаме, между другото, направо stdout и stderr - Въздушният поток ще прихване всичко, ще го увие красиво, ще го разложи, където е необходимо.)

Да видим какво имаме commons/hooks.py. Първата част на файла със самата кука:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Дори не знам какво да обясня тук, просто ще отбележа важните точки:

  • Ние наследяваме, помислете за аргументите - в повечето случаи ще бъде един: conn_id;
  • Преодоляване на стандартните методи: Ограничих се get_conn(), в който получавам параметрите на връзката по име и просто получавам секцията extra (това е JSON поле), в което аз (според собствените си инструкции!) поставих токена на Telegram bot: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Създавам екземпляр на нашия TelegramBot, давайки му конкретен токен.

Това е всичко. Можете да получите клиент от кука, като използвате TelegramBotHook().clent или TelegramBotHook().get_conn().

И втората част на файла, в която правя microwrapper за Telegram REST API, за да не влача същото python-telegram-bot за един метод sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Правилният начин е да съберете всичко: TelegramBotSendMessage, TelegramBotHook, TelegramBot - в приставката, поставете в публично хранилище и го предайте на Open Source.

Докато изучавахме всичко това, нашите актуализации на отчета успяха да се провалят успешно и да ми изпратят съобщение за грешка в канала. Отивам да проверя дали не е наред...

Apache Airflow: Улесняване на ETL
Нещо се счупи в нашия додж! Не е ли това, което очаквахме? Точно!

Ще наливаш ли?

Чувстваш ли, че съм пропуснал нещо? Май е обещал да прехвърли данни от SQL Server към Vertica, а после го взе и се отклони от темата, негодника!

Това зверство беше умишлено, просто трябваше да дешифрирам малко терминология за вас. Сега можете да отидете по-далеч.

Нашият план беше следният:

  1. Направете даг
  2. Генериране на задачи
  3. Вижте колко красиво е всичко
  4. Присвояване на номера на сесии на попълвания
  5. Вземете данни от SQL Server
  6. Поставете данни във Vertica
  7. Събиране на статистика

И така, за да стартирам всичко това, направих малко допълнение към нашия docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Там повдигаме:

  • Vertica като домакин dwh с повечето настройки по подразбиране,
  • три екземпляра на SQL Server,
  • ние попълваме базите данни в последния с някои данни (в никакъв случай не разглеждайте mssql_init.py!)

Стартираме всичко добро с помощта на малко по-сложна команда от последния път:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Можете да използвате артикула, генериран от нашия чудотворен рандомизатор Data Profiling/Ad Hoc Query:

Apache Airflow: Улесняване на ETL
Основното нещо е да не го показвате на анализатори

разработете по-подробно ETL сесии Няма, там всичко е тривиално: правим база, има знак в нея, обгръщаме всичко с контекстен мениджър и сега правим това:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Дойде времето събирайте нашите данни от нашите сто и половина маси. Нека направим това с помощта на много непретенциозни редове:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. С помощта на кука получаваме от Airflow pymssql- свързване
  2. Нека заменим ограничение под формата на дата в заявката - тя ще бъде хвърлена във функцията от машината за шаблони.
  3. Подхранване на нашата молба pandasкой ще ни вземе DataFrame - ще ни бъде полезно в бъдеще.

Използвам заместване {dt} вместо параметър на заявка %s не защото съм зъл Пинокио, а защото pandas не мога да се справя pymssql и изплъзва последния params: Listвъпреки че наистина иска tuple.
Също така имайте предвид, че разработчикът pymssql реши да не го подкрепя повече и е време да се изнесе pyodbc.

Нека да видим с какво Airflow напълни аргументите на нашите функции:

Apache Airflow: Улесняване на ETL

Ако няма данни, няма смисъл да продължавате. Но също така е странно да се смята, че пълнежът е успешен. Но това не е грешка. А-а-а, какво да правя?! И ето какво:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException казва на Airflow, че няма грешки, но пропускаме задачата. Интерфейсът няма да има зелен или червен квадрат, а розов.

Нека хвърлим нашите данни множество колони:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

А именно

  • Базата данни, от която взехме поръчките,
  • ID на нашата сесия за наводняване (той ще бъде различен за всяка задача),
  • Хеш от източника и ID на поръчката - така че в крайната база данни (където всичко е излято в една таблица) да имаме уникален ID на поръчката.

Остава предпоследната стъпка: изсипете всичко във Vertica. И колкото и да е странно, един от най-зрелищните и ефективни начини да направите това е чрез CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Правим специален приемник StringIO.
  2. pandas любезно ще постави нашите DataFrame във формата CSV-линии.
  3. Нека отворим връзка към любимата ни Vertica с кука.
  4. И сега с помощта copy() изпратете нашите данни директно на Vertika!

Ще вземем от драйвера колко реда са били запълнени и ще кажем на мениджъра на сесията, че всичко е наред:

session.loaded_rows = cursor.rowcount
session.successful = True

Това е всичко.

При разпродажбата ние създаваме целевата плоча ръчно. Тук си позволих една малка машина:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Използвам VerticaOperator() Създавам схема на база данни и таблица (ако все още не съществуват, разбира се). Основното нещо е да подредите правилно зависимостите:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Резюмиране

- Е, - каза малката мишка, - нали сега
Убеден ли си, че аз съм най-ужасното животно в гората?

Джулия Доналдсън, The Gruffalo

Мисля, че ако моите колеги и аз имахме състезание: кой бързо ще създаде и стартира ETL процес от нулата: те с техните SSIS и мишка, а аз с Airflow ... И тогава бихме сравнили и лекотата на поддръжка ... Уау, мисля, че ще се съгласите, че ще ги победя на всички фронтове!

Ако малко по-сериозно, тогава Apache Airflow - като описва процесите под формата на програмен код - ми свърши работата много по-удобно и приятно.

Неговата неограничена разширяемост, както по отношение на плъгини, така и предразположение към скалируемост, ви дава възможност да използвате Airflow в почти всяка област: дори в пълния цикъл на събиране, подготовка и обработка на данни, дори при изстрелване на ракети (до Марс, на курс).

Част финална, справочно-информационна

Рейкът, който събрахме за вас

  • start_date. Да, това вече е местно меме. Чрез основния аргумент на Дъг start_date всички минават. Накратко, ако посочите в start_date текуща дата и schedule_interval - един ден, тогава DAG ще започне утре не по-рано.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    И няма повече проблеми.

    Има друга грешка по време на изпълнение, свързана с него: Task is missing the start_date parameter, което най-често показва, че сте забравили да се свържете с dag оператора.

  • Всичко на една машина. Да, и бази (самият Airflow и нашето покритие), и уеб сървър, и планировчик, и работници. И дори проработи. Но с течение на времето броят на задачите за услуги нарасна и когато PostgreSQL започна да отговаря на индекса за 20 s вместо за 5 ms, ние го взехме и го отнесохме.
  • LocalExecutor. Да, ние все още седим на него и вече сме стигнали до ръба на бездната. LocalExecutor ни беше достатъчен досега, но сега е време да се разширим с поне един работник и ще трябва да работим усилено, за да преминем към CeleryExecutor. И с оглед на факта, че можете да работите с него на една машина, нищо не ви спира да използвате Celery дори на сървър, който "разбира се, никога няма да влезе в производство, честно!"
  • Неупотреба вградени инструменти:
    • Връзки за съхраняване на идентификационни данни за услугата,
    • SLA пропуски да отговаря на задачи, които не са успели навреме,
    • xcom за обмен на метаданни (казах целданни!) между dag задачи.
  • Злоупотреба с поща. Е, какво да кажа? Бяха настроени сигнали за всички повторения на паднали задачи. Сега служебният ми Gmail има >90 хиляди имейла от Airflow, а муцуната на уеб пощата отказва да вземе и изтрие повече от 100 наведнъж.

Още клопки: Неизправности на Apache Airflow

Още инструменти за автоматизация

За да работим още повече с главите си, а не с ръцете си, Airflow ни подготви това:

  • REST API - все още е със статут на Опитно, което не му пречи да работи. С него можете не само да получите информация за dags и задачи, но и да спрете/стартирате dag, да създадете DAG Run или пул.
  • CLI - много инструменти са достъпни чрез командния ред, които не просто са неудобни за използване през WebUI, но като цяло липсват. Например:
    • backfill необходими за рестартиране на екземпляри на задачи.
      Например дойдоха анализатори и казаха: „А вие, другарю, имате глупости в данните от 1 до 13 януари! Поправи го, поправи го, поправи го, поправи го!" И ти си такъв котлон:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Основна услуга: initdb, resetdb, upgradedb, checkdb.
    • run, което ви позволява да изпълнявате една задача за екземпляр и дори да оценявате всички зависимости. Освен това можете да го стартирате чрез LocalExecutor, дори ако имате клъстер Celery.
    • Прави почти същото test, само също в базите не пише нищо.
    • connections позволява масово създаване на връзки от обвивката.
  • API на Python - доста хардкор начин на взаимодействие, който е предназначен за плъгини, а не за роене в него с малки ръце. Но кой ни пречи да отидем /home/airflow/dags, пусни ipython и да започнеш да се бъркаш? Можете например да експортирате всички връзки със следния код:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Свързване към базата данни на Airflow. Не препоръчвам да пишете в него, но получаването на състояния на задачите за различни специфични показатели може да бъде много по-бързо и лесно, отколкото чрез който и да е API.

    Да кажем, че не всички наши задачи са идемпотентни, но понякога могат да паднат и това е нормално. Но няколко запушвания вече са подозрителни и е необходимо да се провери.

    Пазете се от SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Позоваването

И разбира се, първите десет връзки от издаването на Google са съдържанието на папката Airflow от моите отметки.

И връзките, използвани в статията:

Източник: www.habr.com