Привіт, Хабре! У цій статті я хочу розповісти про один чудовий інструмент для розробки batch-процесів обробки даних, наприклад, в інфраструктурі корпоративного DWH або вашого DataLake. Йтиметься про Apache Airflow (далі Airflow). Він несправедливо обділений увагою на Хабре, і в основному я спробую переконати вас у тому, що як мінімум на Airflow варто дивитися при виборі планувальника для ETL/ELT-процесів.
Раніше я писав серію статей на тему DWH, коли працював у Тінькофф Банку. Тепер я став частиною команди Mail.Ru Group та займаюся розвитком платформи для аналізу даних на ігровому напрямку. Власне, у міру появи новин та цікавих рішень ми з командою розповідатимемо тут про нашу платформу для аналітики даних.
Пролог
Тож почнемо. Що таке Airflow? Це бібліотека (ну або
Тепер розглянемо основні сутності Airflow. Зрозумівши їхню суть і призначення, ви оптимально організуєте архітектуру процесів. Мабуть, основна сутність це Directed Acyclic Graph (далі DAG).
DAG
DAG - це деяке смислове поєднання ваших завдань, які ви хочете виконати в строго певній послідовності за розкладом. Airflow представляє зручний web-інтерфейс для роботи з DAG'ами та іншими сутностями:
DAG може виглядати таким чином:
Розробник, проектуючи DAG, закладає набір операторів, де будуть побудовані завдання всередині DAG'а. Тут ми приходимо до однієї важливої сутності: Airflow Operator.
Оператори
Оператор - це сутність, на підставі якої створюються екземпляри завдань, де описується, що відбуватиметься під час виконання екземпляра завдання.
- BashOperator - оператор для виконання bash-команди.
- PythonOperator – оператор для виклику Python-коду.
- EmailOperator - оператор для відправки email'а.
- HTTPOperator – оператор для роботи з http-запитами.
- SqlOperator - оператор для виконання SQL-коду.
- Sensor - оператор очікування події (настання потрібного часу, появи потрібного файлу, рядки в базі БД, відповіді з API - і т.д., і т.п.).
Є більш специфічні оператори: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Ви також можете розробляти оператори, орієнтуючись на свої особливості, та використовувати їх у проекті. Наприклад, ми створили MongoDBToHiveViaHdfsTransfer, оператор експорту документів з MongoDB до Hive, і кілька операторів для роботи з
Далі всі ці екземпляри завдань потрібно виконувати, і тепер мова піде про планувальника.
Планувальник
Планувальник завдань в Airflow побудований на
Кожен пул має обмеження кількості слотів. При створенні DAG'а йому задається пул:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Пул, заданий лише на рівні DAG'а, можна перевизначити лише на рівні завдання.
За планування всіх завдань Airflow відповідає окремий процес — Scheduler. Власне, Scheduler займається всією механікою постановки завдань виконання. Завдання, перш ніж потрапити на виконання, проходить кілька етапів:
- У DAG'і виконані попередні завдання, нову можна поставити у чергу.
- Черга сортується залежно від пріоритету завдань (пріоритетами також можна керувати), і, якщо в пулі є вільний слот, завдання можна взяти в роботу.
- Якщо є вільний worker celery, завдання прямує до нього; починається робота, яку ви запрограмували в задачі, використовуючи той чи інший оператор.
Досить просто.
Scheduler працює на безлічі всіх DAG'ів та всіх завдань усередині DAG'ів.
Щоб Scheduler почав роботу з DAG'ом, DAG'у потрібно задати розклад:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Є набір готових preset'ів: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Також можна використовувати cron-вирази:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Дата виконання
Щоб розібратися в тому, як працює Airflow, важливо розуміти, що таке Execution Date для DAG'а. У Airflow DAG має вимір Execution Date, т. е. залежно від розкладу роботи DAG'а створюються екземпляри завдань кожну Execution Date. І за кожну Execution Date завдання можна виконати повторно - або, наприклад, DAG може працювати одночасно в кількох Execution Date. Це наочно відображено тут:
На жаль (а можливо, і на щастя: залежить від ситуації), якщо керується реалізація завдання в DAG'і, то виконання в попередніх Execution Date піде вже з урахуванням коригувань. Це добре, якщо потрібно перерахувати дані в минулих періодах новим алгоритмом, але погано, тому що втрачається відтворюваність результату (звісно, ніхто не заважає повернути з Git'а потрібну версію вихідного коду і разово порахувати те, що потрібно так, як потрібно).
Генерація завдань
Реалізація DAG'а — код на Python, тому ми маємо дуже зручний спосіб скоротити обсяг коду при роботі, наприклад, із шардованими джерелами. Нехай у вас як джерело три шарди MySQL, вам потрібно злазити в кожен і забрати якісь дані. Причому незалежно та паралельно. Код на Python у DAG'і може виглядати так:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG виходить таким:
При цьому можна додати або прибрати шард, просто скоригувавши налаштування та оновивши DAG. Зручно!
Можна використовувати більш складну генерацію коду, наприклад працювати з джерелами у вигляді БД або описувати табличну структуру, алгоритм роботи з таблицею і з урахуванням особливостей інфраструктури DWH генерувати процес завантаження N таблиць до вас у сховище. Або, наприклад, роботу з API, яке не підтримує роботу з параметром у вигляді списку, ви можете згенерувати за цим списком N завдань у DAG'і, обмежити паралельність запитів в API пулом і вигрібти з API необхідні дані. Гнучко!
Репозиторій
В Airflow є свій бекенд-репозиторій, БД (може бути MySQL або Postgres, у нас Postgres), в якій зберігаються стани завдань, DAG'ів, налаштування з'єднань, глобальні змінні і т.д., і т.п. сказати, що репозиторій в Airflow дуже простий (близько 20 таблиць) і зручний, якщо ви хочете побудувати свій процес над ним. Згадується 100500 XNUMX таблиць у репозиторії Informatica, які потрібно було довго курити, перш ніж зрозуміти, як побудувати запит.
моніторинг
Враховуючи простоту репозиторію, ви можете самі побудувати зручний для вас процес моніторингу завдань. Ми використовуємо блокнот у Zeppelin, де дивимося стан завдань:
Це може бути і web-інтерфейс самого Airflow:
Код Airflow відкритий, тому ми додали до себе алертинг в Telegram. Кожен працюючий інстанс завдання, якщо відбувається помилка, спамить у групу Telegram, де складається вся команда розробки та підтримки.
Отримуємо через Telegram оперативне реагування (якщо таке потрібно), через Zeppelin – загальну картину із завдань Airflow.
Разом
Airflow насамперед open source, і не потрібно чекати від нього чудес. Будьте готові витратити час і сили на те, щоб вибудувати діюче рішення. Мета з розряду досяжних, повірте, воно того варте. Швидкість розробки, гнучкість, простота додавання нових процесів вам сподобається. Звісно, треба приділяти багато уваги організації проекту, стабільності роботи самого Airflow: чудес не буває.
Зараз у нас Airflow щодня відпрацьовує близько 6,5 тисяч задач. За характером вони досить різні. Є завдання завантаження даних в основне DWH з безлічі різних і дуже специфічних джерел, є завдання розрахунку вітрин всередині основного DWH, є завдання публікації даних в швидке DWH, є багато різних завдань - і Airflow всі їх пережовує день за днем. Якщо ж говорити цифрами, то це 2,3 тисячі ELT задач різної складності всередині DWH (Hadoop), навколо 2,5 сотень баз даних джерел, це команда з 4-ох ETL розробників, які діляться на ETL процесинг даних у DWH і ELT процесинг даних усередині DWH і ще одного адміна, що займається інфраструктурою сервісу.
Плани на майбутнє
Кількість процесів неминуче зростає, і основне, чим ми займатимемося в частині інфраструктури Airflow – це масштабування. Ми хочемо побудувати кластер Airflow, виділити пару ніг для worker'ов Celery і зробити голову, що дублює себе, з процесами планування завдань і репозиторієм.
Епілог
Це, звичайно, далеко не все, що хотілося б розповісти про Airflow, але основні моменти я постарався висвітлити. Апетит приходить під час їжі, спробуйте - і вам сподобається 🙂
Джерело: habr.com