Прывітанне, Хабр! У гэтым артыкуле я жадаю распавесці пра адну выдатную прыладу для распрацоўкі batch-працэсаў апрацоўкі дадзеных, напрыклад, у інфраструктуры карпаратыўнага DWH ці вашага DataLake. Гаворка пойдзе пра Apache Airflow (далей Airflow). Ён несправядліва абдзелены ўвагай на Хабре, і ў асноўнай частцы я паспрабую пераканаць вас у тым, што прынамсі на Airflow варта глядзець пры выбары планавальніка для вашых ETL/ELT-працэсаў.
Раней я пісаў серыю артыкулаў на тэму DWH, калі працаваў у Тинькофф Банку. Цяпер я стаў часткай каманды Mail.Ru Group і займаюся развіццём платформы для аналізу дадзеных на гульнявым кірунку. Уласна, па меры з'яўлення навін і цікавых рашэнняў мы з камандай будзем расказваць тут пра нашу платформу для аналітыкі даных.
пралог
Такім чынам, пачнем. Што такое Airflow? Гэта бібліятэка (ну ці
Цяпер разгледзім асноўныя сутнасці Airflow. Зразумеўшы іх сутнасць і прызначэнне, вы аптымальна арганізуеце архітэктуру працэсаў. Мабыць, асноўная сутнасць – гэта Directed Acyclic Graph (далей DAG).
ДЗЕНЬ
DAG - гэта некаторае сэнсавае аб'яднанне вашых задач, якія вы хочаце выканаць у строга вызначанай паслядоўнасці па вызначаным раскладзе. Airflow уяўляе зручны web-інтэрфейс для працы з DAG'амі і іншымі сутнасцямі:
DAG можа выглядаць такім чынам:
Распрацоўнік, праектуючы DAG, закладвае набор аператараў, на якіх будуць пабудаваны задачы ўсярэдзіне DAG'а. Тут мы прыходзім яшчэ да адной важнай сутнасці: Airflow Operator.
Аператары
Аператар - гэта сутнасць, на падставе якой ствараюцца асобнікі заданняў, дзе апісваецца, што будзе адбывацца падчас выканання экзэмпляра задання.
- BashOperator – аператар для выканання bash-каманды.
- PythonOperator – аператар для выкліку Python-кода.
- EmailOperator - аператар для адпраўкі email'а.
- HTTPOperator - аператар для працы з http-запытамі.
- SqlOperator - аператар для выканання SQL-кода.
- Sensor - аператар чакання падзеі (надыходу патрэбнага часу, з'яўлення патрабаванага файла, радкі ў базе БД, адказу з API - і т. д., і т. п.).
Ёсць больш спецыфічныя аператары: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Вы таксама можаце распрацоўваць аператары, арыентуючыся на свае асаблівасці, і выкарыстоўваць іх у праекце. Напрыклад, мы стварылі MongoDBToHiveViaHdfsTransfer, аператар экспарту дакументаў з MongoDB ў Hive, і некалькі аператараў для працы з
Далей усе гэтыя асобнікі задачак трэба выконваць, і зараз прамова пайдзе аб планавальніку.
Планавальнік
Планавальнік задач у Airflow пабудаваны на
Кожны пул мае абмежаванне па колькасці слотаў. Пры стварэнні DAG'а яму задаецца пул:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Пул, зададзены на ўзроўні DAG'а, можна перавызначыць на ўзроўні задачы.
За планіроўку ўсіх задач у Airflow адказвае асобны працэс – Scheduler. Уласна, Scheduler займаецца ўсёй механікай пастаноўкі задач на выкананне. Задача, перш чым патрапіць на выкананне, праходзіць некалькі этапаў:
- У DAG'е выкананы папярэднія задачы, новую можна паставіць у чаргу.
- Чарга сартуецца ў залежнасці ад прыярытэту задач (прыярытэтамі таксама можна кіраваць), і, калі ў пуле ёсць вольны слот, задачу можна ўзяць у працу.
- Калі ёсць вольны worker celery, задача накіроўваецца ў яго; пачынаецца праца, якую вы запраграмавалі ў задачцы, выкарыстаючы той ці іншы аператар.
Досыць проста.
Scheduler працуе на мностве ўсіх DAG'аў і ўсіх задач усярэдзіне DAG'ов.
Каб Scheduler пачаў працу з DAG'ам, DAG'у трэба задаць расклад:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Ёсць набор гатовых preset'аў: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Таксама можна выкарыстоўваць cron-выразы:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Дата выканання
Каб разабрацца ў тым, як працуе Airflow, важна разумець, што такое Execution Date для DAG'а. У Airflow DAG мае вымярэнне Execution Date, т. е. у залежнасці ад раскладу працы DAG'а ствараюцца асобнікі задач на кожную Execution Date. І за кожную Execution Date задачы можна выканаць паўторна - ці, напрыклад, DAG можа працаваць адначасова ў некалькіх Execution Date. Гэта наглядна адлюстравана тут:
Нажаль (а можа быць, і на шчасце: залежыць ад сітуацыі), калі кіруецца рэалізацыя задачы ў DAG'е, тое выкананне ў папярэдніх Execution Date пайдзе ўжо з улікам карэкціровак. Гэта добра, калі трэба пералічыць дадзеныя ў мінулых перыядах новым алгарытмам, але дрэнна, таму што губляецца ўзнаўляльнасць выніку (вядома, ніхто не мяшае вярнуць з Git'а патрэбную версію зыходніка і разава палічыць тое, што трэба, так, як трэба).
Генерацыя задач
Рэалізацыя DAG'а - код на Python, таму ў нас ёсць вельмі зручны спосаб скараціць аб'ём кода пры працы, напрыклад, з шардаванымі крыніцамі. Няхай у вас у якасці крыніцы тры шарды MySQL, вам трэба злазіць у кожны і забраць нейкія дадзеныя. Прычым незалежна і паралельна. Код на Python у DAG'е можа выглядаць так:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG атрымліваецца такім:
Пры гэтым можна дадаць або прыбраць шард, проста скарэктаваўшы наладу і абнавіўшы DAG. Зручна!
Можна выкарыстоўваць і больш складаную генерацыю кода, напрыклад працаваць з крыніцамі ў выглядзе БД ці апісваць таблічную структуру, алгарытм працы з табліцай і з улікам асаблівасцяў інфраструктуры DWH генераваць працэс загрузкі N табліц да вас у сховішча. Ці ж, напрыклад, працу з API, якое не падтрымлівае працу з параметрам у выглядзе спісу, вы можаце згенераваць па гэтым спісе N задач у DAG'е, абмежаваць раўналежнасць запытаў у API пулам і выграсці з API неабходныя дадзеныя. Гнутка!
Рэпазітар
У Airflow ёсць свой бэкенд-рэпазітар, БД (можа быць MySQL ці Postgres, у нас Postgres), у якой захоўваюцца станы задач, DAG'ов, налады злучэнняў, глабальныя зменныя і т. д., і т. п. Тут жадалася бы сказаць, што рэпазітар у Airflow вельмі просты (каля 20 табліц) і зручны, калі вы хочаце пабудаваць які-небудзь свой працэс над ім. Успамінаецца 100500 табліц у рэпазітары Informatica, якія трэба было доўга ўкурваць, перш чым зразумець, як пабудаваць запыт.
Маніторынг
Улічваючы прастату рэпазітара, вы можаце самі пабудаваць зручны для вас працэс маніторынгу задачак. Мы выкарыстоўваем блакнот у Zeppelin, дзе глядзім стан задач:
Гэта можа быць і web-інтэрфейс самога Airflow:
Код Airflow адчынены, таму мы ў сябе дадалі алертынг ў Telegram. Кожны які працуе інстанс задачы, калі адбываецца памылка, спаміт у групу ў Telegram, дзе складаецца ўся каманда распрацоўкі і падтрымкі.
Атрымліваем праз Telegram аператыўнае рэагаванне (калі такое патрабуецца), праз Zeppelin – агульную карціну па задачах у Airflow.
Разам
Airflow у першую чаргу open source, і не трэба чакаць ад яго цудаў. Будзьце гатовыя патраціць час і сілы на тое, каб выбудаваць працуючае рашэнне. Мэта з разраду дасягальных, паверце, яно таго варта. Хуткасць распрацоўкі, гнуткасць, прастата дадання новых працэсаў - вам спадабаецца. Вядома, трэба надаваць шмат увагі арганізацыі праекту, стабільнасці працы самога Airflow: цудаў не бывае.
Цяпер у нас Airflow штодня адпрацоўвае каля 6,5 тысячы задач. Па характары яны дастаткова розныя. Ёсць задачы загрузкі дадзеных у асноўнае DWH з мноства розных і вельмі спецыфічных крыніц, ёсць задачы разліку вітрын ўнутры асноўнага DWH, ёсць задачы публікацыі дадзеных у хуткае DWH, ёсць шмат-шмат розных задач – і Airflow усе іх перажоўвае дзень за днём. Калі ж казаць лічбамі, то гэта 2,3 тысячы ELT задач рознай складанасці ўнутры DWH (Hadoop), каля 2,5 сотняў баз дадзеных крыніц, гэта каманда з 4-ох ETL распрацоўшчыкаў, якія дзеляцца на ETL працэсінг дадзеных у DWH і на ELT працэсінг дадзеных усярэдзіне DWH і вядома яшчэ аднаго адміна, Які займаецца інфраструктурай сэрвісу.
Планы на будучыню
Колькасць працэсаў непазбежна расце, і асноўнае, чым мы будзем займацца ў частцы інфраструктуры Airflow, - гэта маштабаванне. Мы жадаем пабудаваць кластар Airflow, вылучыць пару ног для worker'ов Celery і зрабіць дублюючую сябе галаву з працэсамі планіроўкі заданняў і рэпазітаром.
Эпілог
Гэта, вядома, далёка не ўсё, што хацелася б расказаць пра Airflow, але асноўныя моманты я пастараўся асвятліць. Апетыт прыходзіць падчас ежы, паспрабуйце - і вам спадабаецца 🙂
Крыніца: habr.com