Здравейте, аз съм Дмитрий Логвиненко - инженер по данни от отдела за анализ на групата компании Vezet.
Ще ви разкажа за един прекрасен инструмент за разработване на ETL процеси - Apache Airflow. Но Airflow е толкова гъвкав и многостранен, че трябва да го разгледате по-отблизо, дори ако не участвате в потоци от данни, но имате нужда периодично да стартирате всякакви процеси и да наблюдавате тяхното изпълнение.
И да, не само ще разкажа, но и ще покажа: програмата има много код, екранни снимки и препоръки.
Това, което обикновено виждате, когато търсите в Google думата Airflow / Wikimedia Commons
- само по-добре и е направено за съвсем други цели, а именно (както е написано преди кат):
изпълнение и наблюдение на задачи на неограничен брой машини (колкото Celery / Kubernetes и съвестта ви позволяват)
с динамично генериране на работен поток от много лесен за писане и разбиране код на Python
и възможност за свързване на всякакви бази данни и API помежду си, като се използват както готови компоненти, така и домашно направени плъгини (което е изключително просто).
Използваме Apache Airflow така:
събираме данни от различни източници (много екземпляри на SQL Server и PostgreSQL, различни API с показатели на приложенията, дори 1C) в DWH и ODS (имаме Vertica и Clickhouse).
колко напреднал cron, който стартира процесите на консолидация на данни в ODS, а също така следи тяхната поддръжка.
До скоро нуждите ни се покриваха от един малък сървър с 32 ядра и 50 GB RAM. В Airflow това работи:
още 200 dag (всъщност работни потоци, в които напъхахме задачи),
във всеки средно 70 задачи,
тази доброта започва (също средно) веднъж на час.
А за това как се разширихме, ще пиша по-долу, но сега нека дефинираме über-проблема, който ще решим:
Има три изходни SQL сървъра, всеки с 50 бази данни - екземпляри на един проект, съответно те имат една и съща структура (почти навсякъде, муа-ха-ха), което означава, че всеки има таблица Orders (за щастие, таблица с това името може да бъде прокарано във всеки бизнес). Вземаме данните, като добавяме сервизни полета (изходен сървър, изходна база данни, ETL идентификатор на задача) и наивно ги хвърляме, да речем, във Vertica.
Да вървим!
Основната част, практическа (и малко теоретична)
Защо ние (и вие)
Когато дърветата бяха големи, а аз бях прост SQL-schik в един руски магазин на дребно, ние измамихме ETL процеси, известни още като потоци от данни, използвайки два налични инструмента:
Informatica Power Center - изключително разпространена система, изключително продуктивна, със собствен хардуер, собствена версия. Използвах дай Боже 1% от възможностите му. Защо? Ами, първо, този интерфейс, някъде от 380-те години, психически ни натовари. Второ, тази измишльотина е предназначена за изключително фантастични процеси, яростно повторно използване на компоненти и други много важни трикове за предприятието. За това какво струва, като крилото на Airbus AXNUMX / година, няма да кажем нищо.
Внимавайте, екранна снимка може да нарани малко хора под 30 години
Сървър за интеграция на SQL Server - използвахме този другар в нашите вътрешнопроектни потоци. Е, всъщност: ние вече използваме SQL Server и би било някак неразумно да не използваме неговите ETL инструменти. Всичко в него е добро: и интерфейсът е красив, и отчетите за напредъка ... Но не затова обичаме софтуерните продукти, о, не за това. Версия го dtsx (което е XML с възли, разбъркани при запазване) можем, но какъв е смисълът? Какво ще кажете да създадете пакет от задачи, който ще плъзга стотици таблици от един сървър на друг? Да, какви сто, показалецът ви ще падне от двадесет парчета, като щракнете върху бутона на мишката. Но определено изглежда по-модерно:
Определено търсихме изходи. Случай дори почти стигна до самонаписан генератор на SSIS пакети ...
…и тогава нова работа ме намери. И Apache Airflow ме изпревари на него.
Когато разбрах, че описанията на ETL процеси са прост код на Python, просто не танцувах от радост. Ето как потоците от данни бяха версионирани и разграничени, а изливането на таблици с една структура от стотици бази данни в една цел стана въпрос на код на Python в един и половина или два 13-инчови екрана.
Сглобяване на клъстера
Нека не организираме напълно детска градина и не говорим за напълно очевидни неща тук, като инсталиране на Airflow, избраната от вас база данни, Celery и други случаи, описани в доковете.
За да можем веднага да започнем експерименти, скицирах docker-compose.yml в който:
Нека всъщност рейзваме Airflow: Планировчик, Уеб сървър. Flower също ще се върти там, за да наблюдава задачите на Celery (защото вече е натиснат в apache/airflow:1.10.10-python3.7, но ние нямаме нищо против)
PostgreSQL, в който Airflow ще записва сервизната си информация (данни за планировчика, статистики за изпълнение и т.н.), а Celery ще маркира изпълнени задачи;
Redis, който ще действа като брокер на задачи за Celery;
Целина работник, които ще бъдат ангажирани с прякото изпълнение на задачите.
Към папка ./dags ще добавим нашите файлове с описанието на dags. Те ще бъдат взети в движение, така че няма нужда да жонглирате с целия стек след всяко кихане.
На някои места кодът в примерите не е показан изцяло (за да не претрупва текста), но някъде е модифициран в процеса. Пълни примери за работещ код могат да бъдат намерени в хранилището https://github.com/dm-logv/airflow-tutorial.
При сглобяването на композицията до голяма степен заложих на добре познатия образ puckel/docker-въздушен поток - не забравяйте да го проверите. Може би нямате нужда от нищо друго в живота си.
Всички настройки на Airflow са достъпни не само чрез airflow.cfg, но и чрез променливи на средата (благодарение на разработчиците), от които злонамерено се възползвах.
Естествено, не е готов за производство: умишлено не поставих сърдечни удари на контейнери, не се занимавах със сигурността. Но направих минимума, подходящ за нашите експериментатори.
Отбележи, че:
Папката dag трябва да е достъпна както за планировчика, така и за работниците.
Същото важи и за всички библиотеки на трети страни - всички те трябва да бъдат инсталирани на машини с планировчик и работници.
Е, сега е просто:
$ docker-compose up --scale worker=3
След като всичко се издигне, можете да разгледате уеб интерфейсите:
Ако не сте разбрали нищо във всички тези „даги“, тогава ето кратък речник:
Scheduler - най-важният чичо в Airflow, който контролира, че роботите работят усилено, а не човек: следи графика, актуализира dags, стартира задачи.
Като цяло в по-старите версии той имаше проблеми с паметта (не, не амнезия, а течове) и наследеният параметър дори остана в конфигурациите run_duration — неговият интервал за рестартиране. Но сега всичко е наред.
DAG (известен още като "dag") - "насочена ациклична графика", но такава дефиниция ще каже на малко хора, но всъщност това е контейнер за задачи, взаимодействащи помежду си (вижте по-долу) или аналог на Package в SSIS и Workflow в Informatica .
В допълнение към dags все още може да има subdags, но най-вероятно няма да стигнем до тях.
DAG Изпълнение - инициализиран dag, който се присвоява собствен execution_date. Даграни от един и същ даг могат да работят паралелно (ако сте направили задачите си идемпотентни, разбира се).
Оператор са части от код, отговорни за извършването на конкретно действие. Има три вида оператори:
действиекато нашия любим PythonOperator, който може да изпълни всеки (валиден) Python код;
прехвърляне, които пренасят данни от място на място, да речем MsSqlToHiveTransfer;
сензор от друга страна, това ще ви позволи да реагирате или да забавите по-нататъшното изпълнение на dag, докато настъпи събитие. HttpSensor може да изтегли посочената крайна точка и когато желаният отговор чака, да започне прехвърлянето GoogleCloudStorageToS3Operator. Любознателният ум ще попита: „Защо? В крайна сметка можете да правите повторения направо в оператора!“ И тогава, за да не задръстите пула от задачи със спрени оператори. Сензорът стартира, проверява и умира преди следващия опит.
Task - декларираните оператори, независимо от типа и прикачени към dag, се повишават в ранг на задача.
екземпляр на задачата - когато генералният плановик реши, че е време да изпрати задачи в битка на работници-изпълнители (точно на място, ако използваме LocalExecutor или към отдалечен възел в случай на CeleryExecutor), той им присвоява контекст (т.е. набор от променливи - параметри за изпълнение), разширява шаблони на команди или заявки и ги обединява.
Ние генерираме задачи
Първо, нека очертаем общата схема на нашия дъг, а след това ще се потопим в детайлите все повече и повече, защото прилагаме някои нетривиални решения.
И така, в най-простата си форма, такъв даг ще изглежда така:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Нека разберем:
Първо импортираме необходимите библиотеки и нещо друго;
sql_server_ds - List[namedtuple[str, str]] с имената на връзките от Airflow Connections и базите данни, от които ще вземем табелата си;
dag - обявяването на нашия даг, който задължително трябва да бъде в globals(), в противен случай Airflow няма да го намери. Дъг също трябва да каже:
как се казва той orders - това име ще се появи в уеб интерфейса,
че ще работи от полунощ на осми юли,
и трябва да работи приблизително на всеки 6 часа (за яки момчета тук вместо timedelta() допустимо cron- линия 0 0 0/6 ? * * *, за по-малко готините - израз като @daily);
workflow() ще свърши основната работа, но не сега. Засега просто ще изхвърлим нашия контекст в дневника.
А сега простата магия за създаване на задачи:
преминаваме през нашите източници;
инициализирам PythonOperator, който ще изпълни нашия манекен workflow(). Не забравяйте да посочите уникално (в рамките на dag) име на задачата и да завържете самия dag. Флаг provide_context на свой ред ще влее допълнителни аргументи във функцията, които внимателно ще съберем, използвайки **context.
За сега това е всичко. Какво получихме:
нов dag в уеб интерфейса,
сто и половина задачи, които ще се изпълняват паралелно (ако настройките на Airflow, Celery и капацитетът на сървъра го позволяват).
Е, почти разбрах.
Кой ще инсталира зависимостите?
За да опростя цялото това нещо, аз се прецаках docker-compose.yml обработка requirements.txt на всички възли.
Ето го сега:
Сивите квадрати са екземпляри на задачи, обработени от планировчика.
Чакаме малко, задачите се разграбват от работниците:
Зелените, разбира се, свършиха успешно работата си. Червените не са много успешни.
Между другото, няма папка на нашия продукт ./dags, няма синхронизация между машините - всички даги лежат git в нашия Gitlab и Gitlab CI разпространява актуализации на машини при сливане master.
Малко за Цвете
Докато работниците ни трошат залъгалките, нека си спомним още един инструмент, който може да ни покаже нещо - Цветето.
Първата страница с обобщена информация за работните възли:
Най-интензивната страница с успешни задачи:
Най-скучната страница със статуса на нашия брокер:
Най-ярката страница е с графики на състоянието на задачите и времето за тяхното изпълнение:
Зареждаме недотоварените
И така, всички задачи са отработени, можете да носите ранените.
И имаше много ранени – по една или друга причина. В случай на правилно използване на Airflow, точно тези квадрати показват, че данните определено не са пристигнали.
Трябва да гледате дневника и да рестартирате падналите екземпляри на задачи.
Щраквайки върху който и да е квадрат, ще видим достъпните за нас действия:
Можете да вземете и да изчистите падналите. Тоест, забравяме, че нещо се е провалило там и същата задача на екземпляра ще отиде в планировчика.
Ясно е, че да правиш това с мишката с всички червени квадратчета не е много хуманно – не това очакваме от Airflow. Естествено, имаме оръжия за масово унищожение: Browse/Task Instances
Нека изберем всичко наведнъж и нулираме, щракнете върху правилния елемент:
След почистване нашите таксита изглеждат така (те вече чакат графика да ги назначи):
Връзки, куки и други променливи
Време е да погледнем следващия DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Всеки ли е правил актуализация на отчет? Това отново е тя: има списък с източници, откъдето да вземете данните; има списък къде да поставите; не забравяйте да клаксоните, когато всичко се случи или се счупи (е, това не е за нас, не).
Нека да прегледаме файла отново и да разгледаме новите неясни неща:
from commons.operators import TelegramBotSendMessage - нищо не ни пречи да си направим собствени оператори, от което се възползвахме, като направихме малка обвивка за изпращане на съобщения до Unblocked. (Ще говорим повече за този оператор по-долу);
default_args={} - dag може да разпространява едни и същи аргументи към всички свои оператори;
to='{{ var.value.all_the_kings_men }}' - поле to няма да имаме твърдо кодирани, а динамично генерирани с помощта на Jinja и променлива със списък от имейли, които внимателно поставих Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — условие за стартиране на оператора. В нашия случай писмото ще лети до шефовете само ако всички зависимости са работили успешно;
tg_bot_conn_id='tg_main' - аргументи conn_id приемете ID на връзката, в която създаваме Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - съобщенията в Telegram ще отлетят само ако има паднали задачи;
task_concurrency=1 - забраняваме едновременното стартиране на няколко екземпляра на една задача. В противен случай ще получим едновременно стартиране на няколко VerticaOperator (гледа една маса);
report_update >> [email, tg] - всичко VerticaOperator се събират в изпращането на писма и съобщения, като това:
Но тъй като операторите на нотификатори имат различни условия за стартиране, само един ще работи. В дървовидния изглед всичко изглежда малко по-малко визуално:
Ще кажа няколко думи за макроси и техните приятели - променливи.
Макросите са заместители на Jinja, които могат да заменят различна полезна информация в аргументите на оператора. Например така:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} ще се разшири до съдържанието на контекстната променлива execution_date във формат YYYY-MM-DD: 2020-07-14. Най-добрата част е, че контекстните променливи са приковани към конкретен екземпляр на задача (квадрат в дървовидния изглед) и когато се рестартират, контейнерите ще се разширят до същите стойности.
Присвоените стойности могат да се видят с помощта на бутона Rendered на всеки екземпляр на задача. Ето как се изпълнява задачата с изпращане на писмо:
И така при задачата с изпращане на съобщение:
Пълен списък с вградени макроси за най-новата налична версия е достъпен тук: препратка към макроси
Освен това с помощта на плъгини можем да декларираме собствени макроси, но това е друга история.
В допълнение към предварително дефинираните неща, можем да заменим стойностите на нашите променливи (вече използвах това в кода по-горе). Да създаваме в Admin/Variables няколко неща:
просто използвайте пътя до желания ключ: {{ var.json.bot_config.bot.token }}.
Буквално ще кажа една дума и ще покажа една екранна снимка за връзка. Тук всичко е елементарно: на страницата Admin/Connections ние създаваме връзка, добавяме нашите потребителски данни / пароли и по-специфични параметри там. Като този:
Паролите могат да бъдат шифровани (по-задълбочено от стандартните) или можете да пропуснете типа връзка (както направих за tg_main) - факт е, че списъкът с типове е твърдо свързан в моделите на Airflow и не може да бъде разширен, без да навляза в изходните кодове (ако внезапно не съм намерил нещо в Google, моля, поправете ме), но нищо няма да ни попречи да получим кредити само от име.
Можете също да направите няколко връзки с едно и също име: в този случай методът BaseHook.get_connection(), което ни дава връзки по име, ще даде произволен от няколко съименници (би било по-логично да се направи Round Robin, но нека го оставим на съвестта на разработчиците на Airflow).
Променливите и връзките със сигурност са страхотни инструменти, но е важно да не губите баланса: кои части от вашите потоци съхранявате в самия код и кои части давате на Airflow за съхранение. От една страна, може да е удобно бързо да промените стойността, например пощенска кутия, чрез потребителския интерфейс. От друга страна, това все още е връщане към щракането на мишката, от което (аз) искахме да се отървем.
Работата с връзки е една от задачите кукички. Като цяло, куките на Airflow са точки за свързване към услуги и библиотеки на трети страни. напр. JiraHook ще ни отвори клиент, за да взаимодействаме с Jira (можете да местите задачи напред и назад) и с помощта на SambaHook можете да натиснете локален файл към smb-точка.
Анализ на персонализирания оператор
И се доближихме до това да видим как се прави TelegramBotSendMessage
Код commons/operators.py с действителния оператор:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Тук, както всичко останало в Airflow, всичко е много просто:
Наследени от BaseOperator, който прилага доста специфични за Airflow неща (погледнете свободното си време)
Декларирани полета template_fields, в който Jinja ще търси макроси за обработка.
Подреди правилните аргументи за __init__(), задайте настройките по подразбиране, където е необходимо.
Не забравихме и инициализацията на предшественика.
Отвори съответната кука TelegramBotHookполучи клиентски обект от него.
Отменен (предефиниран) метод BaseOperator.execute(), който Airfow ще потрепне, когато дойде време да стартира оператора - в него ще приложим основното действие, забравяйки да влезем. (Ние влизаме, между другото, направо stdout и stderr - Въздушният поток ще прихване всичко, ще го увие красиво, ще го разложи, където е необходимо.)
Да видим какво имаме commons/hooks.py. Първата част на файла със самата кука:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Дори не знам какво да обясня тук, просто ще отбележа важните точки:
Ние наследяваме, помислете за аргументите - в повечето случаи ще бъде един: conn_id;
Преодоляване на стандартните методи: Ограничих се get_conn(), в който получавам параметрите на връзката по име и просто получавам секцията extra (това е JSON поле), в което аз (според собствените си инструкции!) поставих токена на Telegram bot: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Създавам екземпляр на нашия TelegramBot, давайки му конкретен токен.
Това е всичко. Можете да получите клиент от кука, като използвате TelegramBotHook().clent или TelegramBotHook().get_conn().
И втората част на файла, в която правя microwrapper за Telegram REST API, за да не влача същото python-telegram-bot за един метод sendMessage.
Правилният начин е да съберете всичко: TelegramBotSendMessage, TelegramBotHook, TelegramBot - в приставката, поставете в публично хранилище и го предайте на Open Source.
Докато изучавахме всичко това, нашите актуализации на отчета успяха да се провалят успешно и да ми изпратят съобщение за грешка в канала. Отивам да проверя дали не е наред...
Нещо се счупи в нашия додж! Не е ли това, което очаквахме? Точно!
Ще наливаш ли?
Чувстваш ли, че съм пропуснал нещо? Май е обещал да прехвърли данни от SQL Server към Vertica, а после го взе и се отклони от темата, негодника!
Това зверство беше умишлено, просто трябваше да дешифрирам малко терминология за вас. Сега можете да отидете по-далеч.
Нашият план беше следният:
Направете даг
Генериране на задачи
Вижте колко красиво е всичко
Присвояване на номера на сесии на попълвания
Вземете данни от SQL Server
Поставете данни във Vertica
Събиране на статистика
И така, за да стартирам всичко това, направих малко допълнение към нашия docker-compose.yml:
Vertica като домакин dwh с повечето настройки по подразбиране,
три екземпляра на SQL Server,
ние попълваме базите данни в последния с някои данни (в никакъв случай не разглеждайте mssql_init.py!)
Стартираме всичко добро с помощта на малко по-сложна команда от последния път:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Можете да използвате артикула, генериран от нашия чудотворен рандомизатор Data Profiling/Ad Hoc Query:
Основното нещо е да не го показвате на анализатори
разработете по-подробно ETL сесии Няма, там всичко е тривиално: правим база, има знак в нея, обгръщаме всичко с контекстен мениджър и сега правим това:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Дойде времето събирайте нашите данни от нашите сто и половина маси. Нека направим това с помощта на много непретенциозни редове:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
С помощта на кука получаваме от Airflow pymssql- свързване
Нека заменим ограничение под формата на дата в заявката - тя ще бъде хвърлена във функцията от машината за шаблони.
Подхранване на нашата молба pandasкой ще ни вземе DataFrame - ще ни бъде полезно в бъдеще.
Използвам заместване {dt} вместо параметър на заявка %s не защото съм зъл Пинокио, а защото pandas не мога да се справя pymssql и изплъзва последния params: Listвъпреки че наистина иска tuple.
Също така имайте предвид, че разработчикът pymssql реши да не го подкрепя повече и е време да се изнесе pyodbc.
Нека да видим с какво Airflow напълни аргументите на нашите функции:
Ако няма данни, няма смисъл да продължавате. Но също така е странно да се смята, че пълнежът е успешен. Но това не е грешка. А-а-а, какво да правя?! И ето какво:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException казва на Airflow, че няма грешки, но пропускаме задачата. Интерфейсът няма да има зелен или червен квадрат, а розов.
ID на нашата сесия за наводняване (той ще бъде различен за всяка задача),
Хеш от източника и ID на поръчката - така че в крайната база данни (където всичко е излято в една таблица) да имаме уникален ID на поръчката.
Остава предпоследната стъпка: изсипете всичко във Vertica. И колкото и да е странно, един от най-зрелищните и ефективни начини да направите това е чрез CSV!
При разпродажбата ние създаваме целевата плоча ръчно. Тук си позволих една малка машина:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
Използвам VerticaOperator() Създавам схема на база данни и таблица (ако все още не съществуват, разбира се). Основното нещо е да подредите правилно зависимостите:
- Е, - каза малката мишка, - нали сега
Убеден ли си, че аз съм най-ужасното животно в гората?
Джулия Доналдсън, The Gruffalo
Мисля, че ако моите колеги и аз имахме състезание: кой бързо ще създаде и стартира ETL процес от нулата: те с техните SSIS и мишка, а аз с Airflow ... И тогава бихме сравнили и лекотата на поддръжка ... Уау, мисля, че ще се съгласите, че ще ги победя на всички фронтове!
Ако малко по-сериозно, тогава Apache Airflow - като описва процесите под формата на програмен код - ми свърши работата много по-удобно и приятно.
Неговата неограничена разширяемост, както по отношение на плъгини, така и предразположение към скалируемост, ви дава възможност да използвате Airflow в почти всяка област: дори в пълния цикъл на събиране, подготовка и обработка на данни, дори при изстрелване на ракети (до Марс, на курс).
Част финална, справочно-информационна
Рейкът, който събрахме за вас
start_date. Да, това вече е местно меме. Чрез основния аргумент на Дъг start_date всички минават. Накратко, ако посочите в start_date текуща дата и schedule_interval - един ден, тогава DAG ще започне утре не по-рано.
start_date = datetime(2020, 7, 7, 0, 1, 2)
И няма повече проблеми.
Има друга грешка по време на изпълнение, свързана с него: Task is missing the start_date parameter, което най-често показва, че сте забравили да се свържете с dag оператора.
Всичко на една машина. Да, и бази (самият Airflow и нашето покритие), и уеб сървър, и планировчик, и работници. И дори проработи. Но с течение на времето броят на задачите за услуги нарасна и когато PostgreSQL започна да отговаря на индекса за 20 s вместо за 5 ms, ние го взехме и го отнесохме.
LocalExecutor. Да, ние все още седим на него и вече сме стигнали до ръба на бездната. LocalExecutor ни беше достатъчен досега, но сега е време да се разширим с поне един работник и ще трябва да работим усилено, за да преминем към CeleryExecutor. И с оглед на факта, че можете да работите с него на една машина, нищо не ви спира да използвате Celery дори на сървър, който "разбира се, никога няма да влезе в производство, честно!"
Неупотреба вградени инструменти:
Връзки за съхраняване на идентификационни данни за услугата,
SLA пропуски да отговаря на задачи, които не са успели навреме,
xcom за обмен на метаданни (казах целданни!) между dag задачи.
Злоупотреба с поща. Е, какво да кажа? Бяха настроени сигнали за всички повторения на паднали задачи. Сега служебният ми Gmail има >90 хиляди имейла от Airflow, а муцуната на уеб пощата отказва да вземе и изтрие повече от 100 наведнъж.
За да работим още повече с главите си, а не с ръцете си, Airflow ни подготви това:
REST API - все още е със статут на Опитно, което не му пречи да работи. С него можете не само да получите информация за dags и задачи, но и да спрете/стартирате dag, да създадете DAG Run или пул.
CLI - много инструменти са достъпни чрез командния ред, които не просто са неудобни за използване през WebUI, но като цяло липсват. Например:
backfill необходими за рестартиране на екземпляри на задачи.
Например дойдоха анализатори и казаха: „А вие, другарю, имате глупости в данните от 1 до 13 януари! Поправи го, поправи го, поправи го, поправи го!" И ти си такъв котлон:
Основна услуга: initdb, resetdb, upgradedb, checkdb.
run, което ви позволява да изпълнявате една задача за екземпляр и дори да оценявате всички зависимости. Освен това можете да го стартирате чрез LocalExecutor, дори ако имате клъстер Celery.
Прави почти същото test, само също в базите не пише нищо.
connections позволява масово създаване на връзки от обвивката.
API на Python - доста хардкор начин на взаимодействие, който е предназначен за плъгини, а не за роене в него с малки ръце. Но кой ни пречи да отидем /home/airflow/dags, пусни ipython и да започнеш да се бъркаш? Можете например да експортирате всички връзки със следния код:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Свързване към базата данни на Airflow. Не препоръчвам да пишете в него, но получаването на състояния на задачите за различни специфични показатели може да бъде много по-бързо и лесно, отколкото чрез който и да е API.
Да кажем, че не всички наши задачи са идемпотентни, но понякога могат да паднат и това е нормално. Но няколко запушвания вече са подозрителни и е необходимо да се провери.
Пазете се от SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
Позоваването
И разбира се, първите десет връзки от издаването на Google са съдържанието на папката Airflow от моите отметки.
Дзен на Python и Apache Airflow - имплицитно пренасочване на DAG, хвърляне на контекст във функциите, отново за зависимостите, а също и за пропускане на стартирания на задачи.