Airflow е инструмент за удобно и бързо разработване и поддържане на процеси за пакетна обработка на данни

Airflow е инструмент за удобно и бързо разработване и поддържане на процеси за пакетна обработка на данни

Хей Хабр! В тази статия искам да говоря за един страхотен инструмент за разработване на процеси за пакетна обработка на данни, например в инфраструктурата на корпоративна DWH или вашето DataLake. Ще говорим за Apache Airflow (наричан по-нататък Airflow). Той е несправедливо лишен от внимание на Habré и ​​в основната част ще се опитам да ви убедя, че поне Airflow си струва да разгледате, когато избирате планировчик за вашите ETL / ELT процеси.

Преди това написах поредица от статии по темата за DWH, когато работех в Tinkoff Bank. Сега станах част от екипа на Mail.Ru Group и разработвам платформа за анализ на данни в областта на игрите. Всъщност, когато се появят новини и интересни решения, ние с екипа ще говорим тук за нашата платформа за анализ на данни.

пролог

И така, да започваме. Какво е Airflow? Това е библиотека (или набор от библиотеки) за разработване, планиране и наблюдение на работни процеси. Основната характеристика на Airflow е, че кодът на Python се използва за описание (разработване) на процеси. Това има много предимства за организиране на вашия проект и разработка: всъщност вашият (например) ETL проект е просто проект на Python и можете да го организирате както желаете, като вземете предвид характеристиките на инфраструктурата, размера на екипа и други изисквания. Инструментално всичко е просто. Използвайте например PyCharm + Git. Страхотен е и много удобен!

Сега нека да разгледаме основните елементи на Airflow. След като разберете тяхната същност и предназначение, вие ще организирате оптимално архитектурата на процеса. Може би основният обект е насочената ациклична графика (наричана по-долу DAG).

DAG

DAG е някаква семантична асоциация на вашите задачи, които искате да изпълните в строго определена последователност по определен график. Airflow представя удобен уеб интерфейс за работа с DAG и други обекти:

Airflow е инструмент за удобно и бързо разработване и поддържане на процеси за пакетна обработка на данни

DAG може да изглежда така:

Airflow е инструмент за удобно и бързо разработване и поддържане на процеси за пакетна обработка на данни

Разработчикът, когато проектира DAG, определя набор от оператори, върху които ще бъдат изградени задачите в рамките на DAG. Тук стигаме до друга важна единица: операторът на въздушния поток.

оператори

Операторът е обект, на базата на който се създават екземпляри на задание, което описва какво ще се случи по време на изпълнението на екземпляр на задание. Издания на Airflow от GitHub вече съдържат набор от изрази, готови за използване. Примери:

  • BashOperator е оператор за изпълнение на bash команда.
  • PythonOperator е оператор за извикване на Python код.
  • EmailOperator - оператор за изпращане на имейл.
  • HTTPOperator - оператор за работа с http заявки.
  • SqlOperator е оператор за изпълнение на SQL код.
  • Сензорът е оператор за изчакване на събитие (пристигане на желания час, поява на необходимия файл, ред в базата данни, отговор от API и т.н. и т.н.).

Има по-специфични оператори: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Можете също така да разработите оператори, които да отговарят на вашите нужди и да ги използвате във вашия проект. Например създадохме MongoDBToHiveViaHdfsTransfer, оператор за експортиране на документи от MongoDB към Hive и няколко оператора за работа с Щракнете върху Къща: CHLoadFromHiveOperator и CHTableLoaderOperator. Всъщност, веднага щом даден проект често използва код, изграден върху основни изрази, можете да помислите за компилирането му в нов израз. Това ще опрости по-нататъшното развитие и ще добавите към вашата библиотека от оператори в проекта.

Освен това всички тези случаи на задачи трябва да бъдат изпълнени и сега ще говорим за планировчика.

Планировчик

Планировчикът на задачи в Airflow е изграден върху Целина. Celery е библиотека на Python, която ви позволява да организирате опашка плюс асинхронно и разпределено изпълнение на задачи. От страна на Airflow всички задачи са разделени на групи. Пуловете се създават ръчно. По правило тяхната цел е да ограничат натоварването при работа с източника или да въвеждат задачи вътре в DWH. Пуловете могат да се управляват чрез уеб интерфейса:

Airflow е инструмент за удобно и бързо разработване и поддържане на процеси за пакетна обработка на данни

Всеки пул има ограничение за броя на слотовете. Когато се създава DAG, му се дава пул:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Пулът, зададен на ниво DAG, може да бъде заменен на ниво задача.
Отделен процес, Scheduler, отговаря за планирането на всички задачи в Airflow. Всъщност Scheduler се занимава с цялата механика на задаване на задачи за изпълнение. Задачата преминава през няколко етапа, преди да бъде изпълнена:

  1. Предишни задачи са изпълнени в DAG, нова може да бъде поставена на опашка.
  2. Опашката се сортира в зависимост от приоритета на задачите (приоритетите също могат да се контролират) и ако има свободно място в пула, задачата може да бъде взета за работа.
  3. Ако има свободен worker celery, задачата се изпраща към него; работата, която сте програмирали в задачата, започва, използвайки един или друг оператор.

Достатъчно просто.

Планировчикът работи върху набор от всички DAG и всички задачи в DAG.

За да може Scheduler да започне да работи с DAG, DAG трябва да зададе график:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Има набор от готови настройки: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Можете също да използвате cron изрази:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Дата на изпълнение

За да разберете как работи Airflow, е важно да разберете какво е дата на изпълнение за DAG. Airflow DAG има измерение Дата на изпълнение, т.е. в зависимост от работния график на DAG, екземпляри на задачи се създават за всяка дата на изпълнение. И за всяка дата на изпълнение задачите могат да бъдат изпълнени повторно - или, например, DAG може да работи едновременно в няколко дати на изпълнение. Това е ясно показано тук:

Airflow е инструмент за удобно и бързо разработване и поддържане на процеси за пакетна обработка на данни

За съжаление (или може би за щастие: зависи от ситуацията), ако изпълнението на задачата в DAG е правилно, тогава изпълнението в предишната дата на изпълнение ще върви с корекциите. Това е добре, ако трябва да преизчислите данни за минали периоди, като използвате нов алгоритъм, но е лошо, защото се губи възпроизводимостта на резултата (разбира се, никой не си прави труда да върне необходимата версия на изходния код от Git и да изчисли това, което трябва веднъж, колкото е необходимо).

Генериране на задачи

Реализацията на DAG е код на Python, така че имаме много удобен начин да намалим количеството код, когато работим, например, с фрагментирани източници. Да предположим, че имате три MySQL шарда като източник, трябва да влезете във всеки и да вземете някои данни. И самостоятелно и паралелно. Кодът на Python в DAG може да изглежда така:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG изглежда така:

Airflow е инструмент за удобно и бързо разработване и поддържане на процеси за пакетна обработка на данни

В същото време можете да добавите или премахнете шард, като просто коригирате настройката и актуализирате DAG. Удобно!

Можете също така да използвате по-сложно генериране на код, например да работите с източници под формата на база данни или да опишете таблична структура, алгоритъм за работа с таблица и, като вземете предвид характеристиките на DWH инфраструктурата, генерирайте процеса за зареждане на N таблици във вашето хранилище. Или, например, работейки с API, който не поддържа работа с параметър под формата на списък, можете да генерирате N задачи в DAG, като използвате този списък, да ограничите паралелизма на заявките в API до пул и да извлечете необходимите данни от API. Гъвкав!

хранилище

Airflow има собствено бекенд хранилище, база данни (може би MySQL или Postgres, ние имаме Postgres), която съхранява състоянията на задачите, DAG, настройките за връзка, глобалните променливи и т.н. и т.н. Тук бих искал да кажа, че хранилището в Airflow е много проста (около 20 таблици) и удобна, ако искате да изградите някой от вашите процеси върху нея. Спомням си 100500 XNUMX таблици в хранилището на Informatica, които трябваше да се пушат дълго време, преди да разбера как да изградя заявка.

мониторинг

Като се има предвид простотата на хранилището, можете да изградите процес за наблюдение на задачи, който е удобен за вас. Използваме бележник в Zeppelin, където разглеждаме състоянието на задачите:

Airflow е инструмент за удобно и бързо разработване и поддържане на процеси за пакетна обработка на данни

Може да бъде и уеб интерфейсът на самия Airflow:

Airflow е инструмент за удобно и бързо разработване и поддържане на процеси за пакетна обработка на данни

Кодът на Airflow е отворен, затова добавихме предупреждение в Telegram. Всеки изпълняван екземпляр на задача, ако възникне грешка, изпраща спам към групата на Telegram, където се състои целият екип за разработка и поддръжка.

Получаваме бърз отговор чрез Telegram (при необходимост), чрез Zeppelin - цялостна картина на задачите в Airflow.

Общо

Airflow е преди всичко с отворен код и не очаквайте чудеса от него. Бъдете готови да вложите време и усилия, за да създадете работещо решение. Цел от категорията на постижимите, повярвайте ми, заслужава си. Скорост на разработка, гъвкавост, лекота на добавяне на нови процеси - ще ви хареса. Разбира се, трябва да обърнете много внимание на организацията на проекта, стабилността на работата на самия Airflow: няма чудеса.

Сега Airflow работи ежедневно около 6,5 хиляди задачи. Те са доста различни по природа. Има задачи за зареждане на данни в основния DWH от много различни и много специфични източници, има задачи за изчисляване на витрини в главния DWH, има задачи за публикуване на данни в бърз DWH, има много, много различни задачи - и Airflow дъвче ги цял ден след ден. Говорейки в цифри, това е 2,3 хиляди ELT задачи с различна сложност в DWH (Hadoop), около 2,5 стотици бази данни източници, това е команда от 4 ETL разработчици, които са разделени на ETL обработка на данни в DWH и ELT обработка на данни в рамките на DWH и разбира се повече един админ, който се занимава с инфраструктурата на услугата.

Планове за бъдещето

Броят на процесите неизбежно расте и основното нещо, което ще правим по отношение на инфраструктурата на Airflow, е мащабирането. Искаме да изградим клъстер Airflow, да разпределим няколко крака за работниците на Celery и да направим дубликат на главата с процеси за планиране на задачи и хранилище.

Епилог

Това, разбира се, далеч не е всичко, което бих искал да говоря за Airflow, но се опитах да подчертая основните точки. Апетитът идва с яденето, опитайте и ще ви хареса 🙂

Източник: www.habr.com

Добавяне на нов коментар