Здраво, Хабр! У овом чланку желим да говорим о једном сјајном алату за развој процеса групне обраде података, на пример, у инфраструктури корпоративног ДВХ-а или вашег ДатаЛаке-а. Говорићемо о Апацхе Аирфлов-у (у даљем тексту Аирфлов). Неправедно је лишен пажње на Хабреу, а у главном делу покушаћу да вас убедим да је бар Аирфлов вредан пажње када бирате планер за ваше ЕТЛ/ЕЛТ процесе.
Претходно сам написао серију чланака на тему ДВХ-а када сам радио у Тинкофф банци. Сада сам постао део тима Маил.Ру Гроуп и развијам платформу за анализу података у области игара. Заправо, како се буду појављивале вести и занимљива решења, мој тим и ја ћемо овде причати о нашој платформи за анализу података.
Прологуе
Дакле, почнимо. Шта је проток ваздуха? Ово је библиотека (или
Сада погледајмо главне ентитете Аирфлов-а. Разумевањем њихове суштине и сврхе, можете оптимално организовати своју архитектуру процеса. Можда је главни ентитет усмерени ациклични граф (у даљем тексту ДАГ).
ДАГ
ДАГ је нека значајна асоцијација ваших задатака које желите да завршите у строго дефинисаном низу према одређеном распореду. Аирфлов пружа згодан веб интерфејс за рад са ДАГ-овима и другим ентитетима:
ДАГ би могао изгледати овако:
Програмер, када дизајнира ДАГ, поставља скуп оператора на којима ће се градити задаци унутар ДАГ-а. Овде долазимо до још једног важног ентитета: Оператор протока ваздуха.
Оператори
Оператор је ентитет на основу којег се креирају инстанце посла, који описује шта ће се десити током извршавања инстанце посла.
- БасхОператор - оператор за извршавање басх команде.
- ПитхонОператор - оператор за позивање Питхон кода.
- ЕмаилОператор — оператер за слање е-поште.
- ХТТПОператор - оператор за рад са хттп захтевима.
- СклОператор - оператор за извршавање СКЛ кода.
- Сензор је оператор за чекање догађаја (долазак траженог времена, појављивање тражене датотеке, ред у бази, одговор АПИ-ја итд. итд.).
Постоје специфичнији оператори: ДоцкерОператор, ХивеОператор, С3ФилеТрансферОператор, ПрестоТоМисклОператор, СлацкОператор.
Такође можете развити оператере на основу сопствених карактеристика и користити их у свом пројекту. На пример, креирали смо МонгоДБТоХивеВиаХдфсТрансфер, оператор за извоз докумената из МонгоДБ у Хиве, и неколико оператера за рад са
Затим, све ове инстанце задатака треба да се изврше, а сада ћемо причати о планеру.
Планер
Планер задатака Аирфлов је изграђен на
Сваки базен има ограничење броја слотова. Приликом креирања ДАГ-а, даје му се скуп:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Група дефинисана на нивоу ДАГ-а може бити замењена на нивоу задатка.
За заказивање свих задатака у Аирфлов-у одговоран је посебан процес, Сцхедулер. Заправо, Сцхедулер се бави свим механизмима постављања задатака за извршење. Задатак пролази кроз неколико фаза пре него што се изврши:
- Претходни задаци су завршени у ДАГ-у; нови се може ставити у ред чекања.
- Ред се сортира у зависности од приоритета задатака (приоритети се такође могу контролисати), а ако у пулу постоји слободан слот, задатак се може преузети у рад.
- Ако постоји слободан радник целер, задатак се шаље њему; почиње рад који сте програмирали у проблему, користећи један или други оператор.
Довољно једноставно.
Планер ради на скупу свих ДАГ-ова и свих задатака унутар ДАГ-ова.
Да би Планер почео да ради са ДАГ-ом, ДАГ треба да постави распоред:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Постоји скуп готових унапред подешених вредности: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Такође можете користити црон изразе:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Датум извршења
Да бисте разумели како функционише проток ваздуха, важно је разумети који је датум извршења за ДАГ. У Аирфлов-у, ДАГ има димензију датума извршења, односно, у зависности од распореда рада ДАГ-а, инстанце задатака се креирају за сваки датум извршења. И за сваки датум извршења, задаци се могу поново извршити - или, на пример, ДАГ може да ради истовремено у неколико датума извршења. Ово је јасно приказано овде:
Нажалост (или можда на срећу: зависи од ситуације), ако се исправи имплементација задатка у ДАГ-у, онда ће се извршење у претходном Датуму извршења наставити узимајући у обзир прилагођавања. Ово је добро ако треба да прерачунате податке у прошлим периодима користећи нови алгоритам, али је лоше јер се губи поновљивост резултата (наравно, нико вам не смета да вратите потребну верзију изворног кода из Гита и израчунате шта треба вам једно време, онако како вам треба).
Генерисање задатака
Имплементација ДАГ-а је код у Питхон-у, тако да имамо веома згодан начин да смањимо количину кода када радимо, на пример, са подељеним изворима. Рецимо да имате три МиСКЛ шарда као извор, морате се попети у сваки и покупити неке податке. Штавише, независно и паралелно. Питхон код у ДАГ-у може изгледати овако:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
ДАГ изгледа овако:
У овом случају, можете додати или уклонити фрагмент једноставним подешавањем подешавања и ажурирањем ДАГ-а. Удобан!
Такође можете користити сложеније генерисање кода, на пример, радити са изворима у облику базе података или описати структуру табеле, алгоритам за рад са табелом и, узимајући у обзир карактеристике ДВХ инфраструктуре, генерисати процес за учитавање Н табела у вашу меморију. Или, на пример, радећи са АПИ-јем који не подржава рад са параметром у облику листе, можете да генеришете Н задатака у ДАГ-у са ове листе, ограничите паралелизам захтева у АПИ-ју на групу и скрежете потребне податке из АПИ-ја. Флексибилно!
репозиторијум
Аирфлов има своје позадинско спремиште, базу података (може бити МиСКЛ или Постгрес, имамо Постгрес), која чува стања задатака, ДАГ-ова, подешавања везе, глобалне варијабле, итд, итд. Овде бих волео да кажем да Репозиторијум у Аирфлов-у је веома једноставан (око 20 табела) и погодан ако желите да направите било који од сопствених процеса на њему. Сећам се 100500 табела у репозиторијуму Информатике, које је требало дуго проучавати пре него што се схвати како да се направи упит.
Праћење
С обзиром на једноставност спремишта, можете изградити процес праћења задатака који вам одговара. Користимо бележницу у Зеппелин-у, где гледамо статус задатака:
Ово такође може бити веб интерфејс самог Аирфлов-а:
Код Аирфлов је отвореног кода, па смо додали упозорење у Телеграм. Свака покренута инстанца задатка, ако дође до грешке, шаље нежељену пошту групи у Телеграму, где се састоји цео тим за развој и подршку.
Добијамо брз одговор преко Телеграма (ако је потребно), а преко Зеппелина добијамо општу слику задатака у Аирфлов-у.
Укупно
Аирфлов је првенствено отвореног кода и од њега не треба очекивати чуда. Будите спремни да уложите време и труд да направите решење које функционише. Циљ је остварив, верујте ми, вреди. Брзина развоја, флексибилност, лакоћа додавања нових процеса - свидеће вам се. Наравно, треба посветити велику пажњу организацији пројекта, стабилности самог Аирфлов-а: чуда се не дешавају.
Сада имамо Аирфлов који ради свакодневно око 6,5 хиљада задатака. Они су прилично различити по карактеру. Постоје задаци учитавања података у главни ДВХ из много различитих и врло специфичних извора, постоје задаци израчунавања излога унутар главног ДВХ, постоје задаци објављивања података у брзом ДВХ, постоји много, много различитих задатака - и Аирфлов све их жваће дан за даном. Говорећи у бројкама, ово је 2,3 хиљаде ЕЛТ задаци различите сложености у оквиру ДВХ (Хадооп), прибл. 2,5 стотине база података извора, ово је тим из 4 ЕТЛ програмера, који се деле на ЕТЛ обраду података у ДВХ и ЕЛТ обраду података унутар ДВХ и наравно више један админ, који се бави инфраструктуром сервиса.
Планови за будућност
Број процеса неминовно расте, а главна ствар коју ћемо радити у погледу инфраструктуре Аирфлов је скалирање. Желимо да направимо Аирфлов кластер, доделимо пар ногу радницима Целери-а и направимо главу која се сама умножава са процесима заказивања послова и спремиштем.
Епилог
Ово, наравно, није све што бих желео да кажем о Аирфлов-у, али сам покушао да истакнем главне тачке. Апетит долази са јелом, пробајте и свидеће вам се :)
Извор: ввв.хабр.цом