Здравейте, аз съм Дмитрий Логвиненко - инженер по данни от отдела за анализ на групата компании Vezet.
Ще ви разкажа за един прекрасен инструмент за разработване на ETL процеси - Apache Airflow. Но Airflow е толкова гъвкав и многостранен, че трябва да го разгледате по-отблизо, дори ако не участвате в потоци от данни, но имате нужда периодично да стартирате всякакви процеси и да наблюдавате тяхното изпълнение.
И да, не само ще разкажа, но и ще покажа: програмата има много код, екранни снимки и препоръки.

Това, което обикновено виждате, когато търсите в Google думата Airflow / Wikimedia Commons
Таблица на съдържанието
въведение
Apache Airflow е точно като Django:
- написан на python
- има страхотен админ панел,
- разширяема за неопределено време
- само по-добре и е направено за съвсем други цели, а именно (както е написано преди кат):
- изпълнение и наблюдение на задачи на неограничен брой машини (колкото Celery / Kubernetes и съвестта ви позволяват)
- с динамично генериране на работен поток от много лесен за писане и разбиране код на Python
- и възможност за свързване на всякакви бази данни и API помежду си, като се използват както готови компоненти, така и домашно направени плъгини (което е изключително просто).
Използваме Apache Airflow така:
- събираме данни от различни източници (много екземпляри на SQL Server и PostgreSQL, различни API с показатели на приложенията, дори 1C) в DWH и ODS (имаме Vertica и Clickhouse).
- колко напреднал
cron, който стартира процесите на консолидация на данни в ODS, а също така следи тяхната поддръжка.
До скоро нуждите ни се покриваха от един малък сървър с 32 ядра и 50 GB RAM. В Airflow това работи:
- още 200 dag (всъщност работни потоци, в които напъхахме задачи),
- във всеки средно 70 задачи,
- тази доброта започва (също средно) веднъж на час.
А за това как се разширихме, ще пиша по-долу, но сега нека дефинираме über-проблема, който ще решим:
Има три изходни SQL сървъра, всеки с 50 бази данни - екземпляри на един проект, съответно те имат една и съща структура (почти навсякъде, муа-ха-ха), което означава, че всеки има таблица Orders (за щастие, таблица с това името може да бъде прокарано във всеки бизнес). Вземаме данните, като добавяме сервизни полета (изходен сървър, изходна база данни, ETL идентификатор на задача) и наивно ги хвърляме, да речем, във Vertica.
Да вървим!
Основната част, практическа (и малко теоретична)
Защо ние (и вие)
Когато дърветата бяха големи, а аз бях прост SQL-schik в един руски магазин на дребно, ние измамихме ETL процеси, известни още като потоци от данни, използвайки два налични инструмента:
- Informatica Power Center - изключително разпространена система, изключително продуктивна, със собствен хардуер, собствена версия. Използвах дай Боже 1% от възможностите му. Защо? Ами, първо, този интерфейс, някъде от 380-те години, психически ни натовари. Второ, тази измишльотина е предназначена за изключително фантастични процеси, яростно повторно използване на компоненти и други много важни трикове за предприятието. За това какво струва, като крилото на Airbus AXNUMX / година, няма да кажем нищо.
Внимавайте, екранна снимка може да нарани малко хора под 30 години

- Сървър за интеграция на SQL Server - използвахме този другар в нашите вътрешнопроектни потоци. Е, всъщност: ние вече използваме SQL Server и би било някак неразумно да не използваме неговите ETL инструменти. Всичко в него е добро: и интерфейсът е красив, и отчетите за напредъка ... Но не затова обичаме софтуерните продукти, о, не за това. Версия го
dtsx(което е XML с възли, разбъркани при запазване) можем, но какъв е смисълът? Какво ще кажете да създадете пакет от задачи, който ще плъзга стотици таблици от един сървър на друг? Да, какви сто, показалецът ви ще падне от двадесет парчета, като щракнете върху бутона на мишката. Но определено изглежда по-модерно:
Определено търсихме изходи. Случай дори почти стигна до самонаписан генератор на SSIS пакети ...
…и тогава нова работа ме намери. И Apache Airflow ме изпревари на него.
Когато разбрах, че описанията на ETL процеси са прост код на Python, просто не танцувах от радост. Ето как потоците от данни бяха версионирани и разграничени, а изливането на таблици с една структура от стотици бази данни в една цел стана въпрос на код на Python в един и половина или два 13-инчови екрана.
Сглобяване на клъстера
Нека не организираме напълно детска градина и не говорим за напълно очевидни неща тук, като инсталиране на Airflow, избраната от вас база данни, Celery и други случаи, описани в доковете.
За да можем веднага да започнем експерименти, скицирах docker-compose.yml в който:
- Нека всъщност рейзваме Airflow: Планировчик, Уеб сървър. Flower също ще се върти там, за да наблюдава задачите на Celery (защото вече е натиснат в
apache/airflow:1.10.10-python3.7, но ние нямаме нищо против) - PostgreSQL, в който Airflow ще записва сервизната си информация (данни за планировчика, статистики за изпълнение и т.н.), а Celery ще маркира изпълнени задачи;
- Redis, който ще действа като брокер на задачи за Celery;
- Целина работник, които ще бъдат ангажирани с прякото изпълнение на задачите.
- Към папка
./dagsще добавим нашите файлове с описанието на dags. Те ще бъдат взети в движение, така че няма нужда да жонглирате с целия стек след всяко кихане.
На някои места кодът в примерите не е показан изцяло (за да не претрупва текста), но някъде е модифициран в процеса. Пълни примери за работещ код могат да бъдат намерени в хранилището .
докер-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerЗабележки:
- При сглобяването на композицията до голяма степен заложих на добре познатия образ - не забравяйте да го проверите. Може би нямате нужда от нищо друго в живота си.
- Всички настройки на Airflow са достъпни не само чрез
airflow.cfg, но и чрез променливи на средата (благодарение на разработчиците), от които злонамерено се възползвах. - Естествено, не е готов за производство: умишлено не поставих сърдечни удари на контейнери, не се занимавах със сигурността. Но направих минимума, подходящ за нашите експериментатори.
- Отбележи, че:
- Папката dag трябва да е достъпна както за планировчика, така и за работниците.
- Същото важи и за всички библиотеки на трети страни - всички те трябва да бъдат инсталирани на машини с планировчик и работници.
Е, сега е просто:
$ docker-compose up --scale worker=3След като всичко се издигне, можете да разгледате уеб интерфейсите:
- Въздушно течение:
- цветя:
Основни понятия
Ако не сте разбрали нищо във всички тези „даги“, тогава ето кратък речник:
- Scheduler - най-важният чичо в Airflow, който контролира, че роботите работят усилено, а не човек: следи графика, актуализира dags, стартира задачи.
Като цяло в по-старите версии той имаше проблеми с паметта (не, не амнезия, а течове) и наследеният параметър дори остана в конфигурациите
run_duration— неговият интервал за рестартиране. Но сега всичко е наред. - DAG (известен още като "dag") - "насочена ациклична графика", но такава дефиниция ще каже на малко хора, но всъщност това е контейнер за задачи, взаимодействащи помежду си (вижте по-долу) или аналог на Package в SSIS и Workflow в Informatica .
В допълнение към dags все още може да има subdags, но най-вероятно няма да стигнем до тях.
- DAG Изпълнение - инициализиран dag, който се присвоява собствен
execution_date. Даграни от един и същ даг могат да работят паралелно (ако сте направили задачите си идемпотентни, разбира се). - Оператор са части от код, отговорни за извършването на конкретно действие. Има три вида оператори:
- действиекато нашия любим
PythonOperator, който може да изпълни всеки (валиден) Python код; - прехвърляне, които пренасят данни от място на място, да речем
MsSqlToHiveTransfer; - сензор от друга страна, това ще ви позволи да реагирате или да забавите по-нататъшното изпълнение на dag, докато настъпи събитие.
HttpSensorможе да изтегли посочената крайна точка и когато желаният отговор чака, да започне прехвърлянетоGoogleCloudStorageToS3Operator. Любознателният ум ще попита: „Защо? В крайна сметка можете да правите повторения направо в оператора!“ И тогава, за да не задръстите пула от задачи със спрени оператори. Сензорът стартира, проверява и умира преди следващия опит.
- действиекато нашия любим
- Task - декларираните оператори, независимо от типа и прикачени към dag, се повишават в ранг на задача.
- екземпляр на задачата - когато генералният плановик реши, че е време да изпрати задачи в битка на работници-изпълнители (точно на място, ако използваме
LocalExecutorили към отдалечен възел в случай наCeleryExecutor), той им присвоява контекст (т.е. набор от променливи - параметри за изпълнение), разширява шаблони на команди или заявки и ги обединява.
Ние генерираме задачи
Първо, нека очертаем общата схема на нашия дъг, а след това ще се потопим в детайлите все повече и повече, защото прилагаме някои нетривиални решения.
И така, в най-простата си форма, такъв даг ще изглежда така:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Нека разберем:
- Първо импортираме необходимите библиотеки и нещо друго;
sql_server_ds-List[namedtuple[str, str]]с имената на връзките от Airflow Connections и базите данни, от които ще вземем табелата си;dag- обявяването на нашия даг, който задължително трябва да бъде вglobals(), в противен случай Airflow няма да го намери. Дъг също трябва да каже:- как се казва той
orders- това име ще се появи в уеб интерфейса, - че ще работи от полунощ на осми юли,
- и трябва да работи приблизително на всеки 6 часа (за яки момчета тук вместо
timedelta()допустимоcron- линия0 0 0/6 ? * * *, за по-малко готините - израз като@daily);
- как се казва той
workflow()ще свърши основната работа, но не сега. Засега просто ще изхвърлим нашия контекст в дневника.- А сега простата магия за създаване на задачи:
- преминаваме през нашите източници;
- инициализирам
PythonOperator, който ще изпълни нашия манекенworkflow(). Не забравяйте да посочите уникално (в рамките на dag) име на задачата и да завържете самия dag. Флагprovide_contextна свой ред ще влее допълнителни аргументи във функцията, които внимателно ще съберем, използвайки**context.
За сега това е всичко. Какво получихме:
- нов dag в уеб интерфейса,
- сто и половина задачи, които ще се изпълняват паралелно (ако настройките на Airflow, Celery и капацитетът на сървъра го позволяват).
Е, почти разбрах.

Кой ще инсталира зависимостите?
За да опростя цялото това нещо, аз се прецаках docker-compose.yml обработка requirements.txt на всички възли.
Ето го сега:

Сивите квадрати са екземпляри на задачи, обработени от планировчика.
Чакаме малко, задачите се разграбват от работниците:

Зелените, разбира се, свършиха успешно работата си. Червените не са много успешни.
Между другото, няма папка на нашия продукт
./dags, няма синхронизация между машините - всички даги лежатgitв нашия Gitlab и Gitlab CI разпространява актуализации на машини при сливанеmaster.
Малко за Цвете
Докато работниците ни трошат залъгалките, нека си спомним още един инструмент, който може да ни покаже нещо - Цветето.
Първата страница с обобщена информация за работните възли:

Най-интензивната страница с успешни задачи:

Най-скучната страница със статуса на нашия брокер:

Най-ярката страница е с графики на състоянието на задачите и времето за тяхното изпълнение:

Зареждаме недотоварените
И така, всички задачи са отработени, можете да носите ранените.

И имаше много ранени – по една или друга причина. В случай на правилно използване на Airflow, точно тези квадрати показват, че данните определено не са пристигнали.
Трябва да гледате дневника и да рестартирате падналите екземпляри на задачи.
Щраквайки върху който и да е квадрат, ще видим достъпните за нас действия:

Можете да вземете и да изчистите падналите. Тоест, забравяме, че нещо се е провалило там и същата задача на екземпляра ще отиде в планировчика.

Ясно е, че да правиш това с мишката с всички червени квадратчета не е много хуманно – не това очакваме от Airflow. Естествено, имаме оръжия за масово унищожение: Browse/Task Instances

Нека изберем всичко наведнъж и нулираме, щракнете върху правилния елемент:

След почистване нашите таксита изглеждат така (те вече чакат графика да ги назначи):

Връзки, куки и други променливи
Време е да погледнем следващия DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Всеки ли е правил актуализация на отчет? Това отново е тя: има списък с източници, откъдето да вземете данните; има списък къде да поставите; не забравяйте да клаксоните, когато всичко се случи или се счупи (е, това не е за нас, не).
Нека да прегледаме файла отново и да разгледаме новите неясни неща:
from commons.operators import TelegramBotSendMessage- нищо не ни пречи да си направим собствени оператори, от което се възползвахме, като направихме малка обвивка за изпращане на съобщения до Unblocked. (Ще говорим повече за този оператор по-долу);default_args={}- dag може да разпространява едни и същи аргументи към всички свои оператори;to='{{ var.value.all_the_kings_men }}'- полеtoняма да имаме твърдо кодирани, а динамично генерирани с помощта на Jinja и променлива със списък от имейли, които внимателно поставихAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— условие за стартиране на оператора. В нашия случай писмото ще лети до шефовете само ако всички зависимости са работили успешно;tg_bot_conn_id='tg_main'- аргументиconn_idприемете ID на връзката, в която създавамеAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- съобщенията в Telegram ще отлетят само ако има паднали задачи;task_concurrency=1- забраняваме едновременното стартиране на няколко екземпляра на една задача. В противен случай ще получим едновременно стартиране на няколкоVerticaOperator(гледа една маса);report_update >> [email, tg]- всичкоVerticaOperatorсе събират в изпращането на писма и съобщения, като това:

Но тъй като операторите на нотификатори имат различни условия за стартиране, само един ще работи. В дървовидния изглед всичко изглежда малко по-малко визуално:

Ще кажа няколко думи за макроси и техните приятели - променливи.
Макросите са заместители на Jinja, които могат да заменят различна полезна информация в аргументите на оператора. Например така:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} ще се разшири до съдържанието на контекстната променлива execution_date във формат YYYY-MM-DD: 2020-07-14. Най-добрата част е, че контекстните променливи са приковани към конкретен екземпляр на задача (квадрат в дървовидния изглед) и когато се рестартират, контейнерите ще се разширят до същите стойности.
Присвоените стойности могат да се видят с помощта на бутона Rendered на всеки екземпляр на задача. Ето как се изпълнява задачата с изпращане на писмо:

И така при задачата с изпращане на съобщение:

Пълен списък с вградени макроси за най-новата налична версия е достъпен тук:
Освен това с помощта на плъгини можем да декларираме собствени макроси, но това е друга история.
В допълнение към предварително дефинираните неща, можем да заменим стойностите на нашите променливи (вече използвах това в кода по-горе). Да създаваме в Admin/Variables няколко неща:

Всичко, което можете да използвате:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Стойността може да бъде скаларен или може също да бъде JSON. В случай на JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}просто използвайте пътя до желания ключ: {{ var.json.bot_config.bot.token }}.
Буквално ще кажа една дума и ще покажа една екранна снимка за връзка. Тук всичко е елементарно: на страницата Admin/Connections ние създаваме връзка, добавяме нашите потребителски данни / пароли и по-специфични параметри там. Като този:

Паролите могат да бъдат шифровани (по-задълбочено от стандартните) или можете да пропуснете типа връзка (както направих за tg_main) - факт е, че списъкът с типове е твърдо свързан в моделите на Airflow и не може да бъде разширен, без да навляза в изходните кодове (ако внезапно не съм намерил нещо в Google, моля, поправете ме), но нищо няма да ни попречи да получим кредити само от име.
Можете също да направите няколко връзки с едно и също име: в този случай методът BaseHook.get_connection(), което ни дава връзки по име, ще даде произволен от няколко съименници (би било по-логично да се направи Round Robin, но нека го оставим на съвестта на разработчиците на Airflow).
Променливите и връзките със сигурност са страхотни инструменти, но е важно да не губите баланса: кои части от вашите потоци съхранявате в самия код и кои части давате на Airflow за съхранение. От една страна, може да е удобно бързо да промените стойността, например пощенска кутия, чрез потребителския интерфейс. От друга страна, това все още е връщане към щракането на мишката, от което (аз) искахме да се отървем.
Работата с връзки е една от задачите кукички. Като цяло, куките на Airflow са точки за свързване към услуги и библиотеки на трети страни. напр. JiraHook ще ни отвори клиент, за да взаимодействаме с Jira (можете да местите задачи напред и назад) и с помощта на SambaHook можете да натиснете локален файл към smb-точка.
Анализ на персонализирания оператор
И се доближихме до това да видим как се прави TelegramBotSendMessage
Код commons/operators.py с действителния оператор:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Тук, както всичко останало в Airflow, всичко е много просто:
- Наследени от
BaseOperator, който прилага доста специфични за Airflow неща (погледнете свободното си време) - Декларирани полета
template_fields, в който Jinja ще търси макроси за обработка. - Подреди правилните аргументи за
__init__(), задайте настройките по подразбиране, където е необходимо. - Не забравихме и инициализацията на предшественика.
- Отвори съответната кука
TelegramBotHookполучи клиентски обект от него. - Отменен (предефиниран) метод
BaseOperator.execute(), който Airfow ще потрепне, когато дойде време да стартира оператора - в него ще приложим основното действие, забравяйки да влезем. (Ние влизаме, между другото, направоstdoutиstderr- Въздушният поток ще прихване всичко, ще го увие красиво, ще го разложи, където е необходимо.)
Да видим какво имаме commons/hooks.py. Първата част на файла със самата кука:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientДори не знам какво да обясня тук, просто ще отбележа важните точки:
- Ние наследяваме, помислете за аргументите - в повечето случаи ще бъде един:
conn_id; - Преодоляване на стандартните методи: Ограничих се
get_conn(), в който получавам параметрите на връзката по име и просто получавам секциятаextra(това е JSON поле), в което аз (според собствените си инструкции!) поставих токена на Telegram bot:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Създавам екземпляр на нашия
TelegramBot, давайки му конкретен токен.
Това е всичко. Можете да получите клиент от кука, като използвате TelegramBotHook().clent или TelegramBotHook().get_conn().
И втората част на файла, в която правя microwrapper за Telegram REST API, за да не влача същото за един метод sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Правилният начин е да съберете всичко:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- в приставката, поставете в публично хранилище и го предайте на Open Source.
Докато изучавахме всичко това, нашите актуализации на отчета успяха да се провалят успешно и да ми изпратят съобщение за грешка в канала. Отивам да проверя дали не е наред...

Нещо се счупи в нашия додж! Не е ли това, което очаквахме? Точно!
Ще наливаш ли?
Чувстваш ли, че съм пропуснал нещо? Май е обещал да прехвърли данни от SQL Server към Vertica, а после го взе и се отклони от темата, негодника!
Това зверство беше умишлено, просто трябваше да дешифрирам малко терминология за вас. Сега можете да отидете по-далеч.
Нашият план беше следният:
- Направете даг
- Генериране на задачи
- Вижте колко красиво е всичко
- Присвояване на номера на сесии на попълвания
- Вземете данни от SQL Server
- Поставете данни във Vertica
- Събиране на статистика
И така, за да стартирам всичко това, направих малко допълнение към нашия docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyТам повдигаме:
- Vertica като домакин
dwhс повечето настройки по подразбиране, - три екземпляра на SQL Server,
- ние попълваме базите данни в последния с някои данни (в никакъв случай не разглеждайте
mssql_init.py!)
Стартираме всичко добро с помощта на малко по-сложна команда от последния път:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Можете да използвате артикула, генериран от нашия чудотворен рандомизатор Data Profiling/Ad Hoc Query:

Основното нещо е да не го показвате на анализатори
разработете по-подробно ETL сесии Няма, там всичко е тривиално: правим база, има знак в нея, обгръщаме всичко с контекстен мениджър и сега правим това:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passДойде времето събирайте нашите данни от нашите сто и половина маси. Нека направим това с помощта на много непретенциозни редове:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- С помощта на кука получаваме от Airflow
pymssql- свързване - Нека заменим ограничение под формата на дата в заявката - тя ще бъде хвърлена във функцията от машината за шаблони.
- Подхранване на нашата молба
pandasкой ще ни вземеDataFrame- ще ни бъде полезно в бъдеще.
Използвам заместване
{dt}вместо параметър на заявка%sне защото съм зъл Пинокио, а защотоpandasне мога да се справяpymssqlи изплъзва последнияparams: Listвъпреки че наистина искаtuple.
Също така имайте предвид, че разработчикътpymssqlреши да не го подкрепя повече и е време да се изнесеpyodbc.
Нека да видим с какво Airflow напълни аргументите на нашите функции:

Ако няма данни, няма смисъл да продължавате. Но също така е странно да се смята, че пълнежът е успешен. Но това не е грешка. А-а-а, какво да правя?! И ето какво:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException казва на Airflow, че няма грешки, но пропускаме задачата. Интерфейсът няма да има зелен или червен квадрат, а розов.
Нека хвърлим нашите данни множество колони:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])А именно
- Базата данни, от която взехме поръчките,
- ID на нашата сесия за наводняване (той ще бъде различен за всяка задача),
- Хеш от източника и ID на поръчката - така че в крайната база данни (където всичко е излято в една таблица) да имаме уникален ID на поръчката.
Остава предпоследната стъпка: изсипете всичко във Vertica. И колкото и да е странно, един от най-зрелищните и ефективни начини да направите това е чрез CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Правим специален приемник
StringIO. pandasлюбезно ще постави нашитеDataFrameвъв форматаCSV-линии.- Нека отворим връзка към любимата ни Vertica с кука.
- И сега с помощта
copy()изпратете нашите данни директно на Vertika!
Ще вземем от драйвера колко реда са били запълнени и ще кажем на мениджъра на сесията, че всичко е наред:
session.loaded_rows = cursor.rowcount
session.successful = TrueТова е всичко.
При разпродажбата ние създаваме целевата плоча ръчно. Тук си позволих една малка машина:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Използвам
VerticaOperator()Създавам схема на база данни и таблица (ако все още не съществуват, разбира се). Основното нещо е да подредите правилно зависимостите:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadРезюмиране
- Е, - каза малката мишка, - нали сега
Убеден ли си, че аз съм най-ужасното животно в гората?
Джулия Доналдсън, The Gruffalo
Мисля, че ако моите колеги и аз имахме състезание: кой бързо ще създаде и стартира ETL процес от нулата: те с техните SSIS и мишка, а аз с Airflow ... И тогава бихме сравнили и лекотата на поддръжка ... Уау, мисля, че ще се съгласите, че ще ги победя на всички фронтове!
Ако малко по-сериозно, тогава Apache Airflow - като описва процесите под формата на програмен код - ми свърши работата много по-удобно и приятно.
Неговата неограничена разширяемост, както по отношение на плъгини, така и предразположение към скалируемост, ви дава възможност да използвате Airflow в почти всяка област: дори в пълния цикъл на събиране, подготовка и обработка на данни, дори при изстрелване на ракети (до Марс, на курс).
Част финална, справочно-информационна
Рейкът, който събрахме за вас
start_date. Да, това вече е местно меме. Чрез основния аргумент на Дъгstart_dateвсички минават. Накратко, ако посочите вstart_dateтекуща дата иschedule_interval- един ден, тогава DAG ще започне утре не по-рано.start_date = datetime(2020, 7, 7, 0, 1, 2)И няма повече проблеми.
Има друга грешка по време на изпълнение, свързана с него:
Task is missing the start_date parameter, което най-често показва, че сте забравили да се свържете с dag оператора.- Всичко на една машина. Да, и бази (самият Airflow и нашето покритие), и уеб сървър, и планировчик, и работници. И дори проработи. Но с течение на времето броят на задачите за услуги нарасна и когато PostgreSQL започна да отговаря на индекса за 20 s вместо за 5 ms, ние го взехме и го отнесохме.
- LocalExecutor. Да, ние все още седим на него и вече сме стигнали до ръба на бездната. LocalExecutor ни беше достатъчен досега, но сега е време да се разширим с поне един работник и ще трябва да работим усилено, за да преминем към CeleryExecutor. И с оглед на факта, че можете да работите с него на една машина, нищо не ви спира да използвате Celery дори на сървър, който "разбира се, никога няма да влезе в производство, честно!"
- Неупотреба вградени инструменти:
- Връзки за съхраняване на идентификационни данни за услугата,
- SLA пропуски да отговаря на задачи, които не са успели навреме,
- xcom за обмен на метаданни (казах целданни!) между dag задачи.
- Злоупотреба с поща. Е, какво да кажа? Бяха настроени сигнали за всички повторения на паднали задачи. Сега служебният ми Gmail има >90 хиляди имейла от Airflow, а муцуната на уеб пощата отказва да вземе и изтрие повече от 100 наведнъж.
Още клопки:
Още инструменти за автоматизация
За да работим още повече с главите си, а не с ръцете си, Airflow ни подготви това:
- - все още е със статут на Опитно, което не му пречи да работи. С него можете не само да получите информация за dags и задачи, но и да спрете/стартирате dag, да създадете DAG Run или пул.
- - много инструменти са достъпни чрез командния ред, които не просто са неудобни за използване през WebUI, но като цяло липсват. Например:
backfillнеобходими за рестартиране на екземпляри на задачи.
Например дойдоха анализатори и казаха: „А вие, другарю, имате глупости в данните от 1 до 13 януари! Поправи го, поправи го, поправи го, поправи го!" И ти си такъв котлон:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Основна услуга:
initdb,resetdb,upgradedb,checkdb. run, което ви позволява да изпълнявате една задача за екземпляр и дори да оценявате всички зависимости. Освен това можете да го стартирате чрезLocalExecutor, дори ако имате клъстер Celery.- Прави почти същото
test, само също в базите не пише нищо. connectionsпозволява масово създаване на връзки от обвивката.
- - доста хардкор начин на взаимодействие, който е предназначен за плъгини, а не за роене в него с малки ръце. Но кой ни пречи да отидем
/home/airflow/dags, пусниipythonи да започнеш да се бъркаш? Можете например да експортирате всички връзки със следния код:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Свързване към базата данни на Airflow. Не препоръчвам да пишете в него, но получаването на състояния на задачите за различни специфични показатели може да бъде много по-бързо и лесно, отколкото чрез който и да е API.
Да кажем, че не всички наши задачи са идемпотентни, но понякога могат да паднат и това е нормално. Но няколко запушвания вече са подозрителни и е необходимо да се провери.
Пазете се от SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
Позоваването
И разбира се, първите десет връзки от издаването на Google са съдържанието на папката Airflow от моите отметки.
- - разбира се, трябва да започнем с офиса. документация, но кой чете инструкциите?
- - Е, поне прочетете препоръките от създателите.
- - самото начало: потребителският интерфейс в снимки
- - основните понятия са добре описани, ако (внезапно!) Не сте разбрали нещо от мен.
- - кратко ръководство за настройка на Airflow клъстер.
- - почти същата интересна статия, освен може би повече формализъм и по-малко примери.
- — относно съвместната работа с Celery.
- - за идемпотентността на задачите, зареждане по ID вместо по дата, трансформация, файлова структура и други интересни неща.
- - зависимости на задачите и Trigger Rule, които споменах само мимоходом.
- - как да се преодолеят някои "работи по предназначение" в планировчика, зареждане на изгубени данни и приоритизиране на задачите.
- — полезни SQL заявки към метаданните на Airflow.
- - има полезен раздел за създаване на персонализиран сензор.
- — интересна кратка бележка за изграждането на инфраструктура на AWS за Data Science.
- - често срещани грешки (когато някой все още не чете инструкциите).
- - усмихнете се как хората патерица съхраняват пароли, въпреки че можете просто да използвате Connections.
- - имплицитно пренасочване на DAG, хвърляне на контекст във функциите, отново за зависимостите, а също и за пропускане на стартирания на задачи.
- - относно употребата
default argumentsиparamsв шаблони, както и променливи и връзки. - - история за това как проектантът се подготвя за Airflow 2.0.
- - малко остаряла статия за внедряването на нашия клъстер в
docker-compose. - - динамични задачи, използващи шаблони и контекстно пренасочване.
- — стандартни и персонализирани известия по пощата и Slack.
- - Разклонени задачи, макроси и XCom.
И връзките, използвани в статията:
- - контейнери, налични за използване в шаблони.
- — Често срещани грешки при създаване на dags.
- -
docker-composeза експериментиране, отстраняване на грешки и др. - — Python обвивка за Telegram REST API.
Източник: www.habr.com




