Airflow - інструмент, каб зручна і хутка распрацоўваць і падтрымліваць batch-працэсы апрацоўкі дадзеных

Airflow - інструмент, каб зручна і хутка распрацоўваць і падтрымліваць batch-працэсы апрацоўкі дадзеных

Прывітанне, Хабр! У гэтым артыкуле я жадаю распавесці пра адну выдатную прыладу для распрацоўкі batch-працэсаў апрацоўкі дадзеных, напрыклад, у інфраструктуры карпаратыўнага DWH ці вашага DataLake. Гаворка пойдзе пра Apache Airflow (далей Airflow). Ён несправядліва абдзелены ўвагай на Хабре, і ў асноўнай частцы я паспрабую пераканаць вас у тым, што прынамсі на Airflow варта глядзець пры выбары планавальніка для вашых ETL/ELT-працэсаў.

Раней я пісаў серыю артыкулаў на тэму DWH, калі працаваў у Тинькофф Банку. Цяпер я стаў часткай каманды Mail.Ru Group і займаюся развіццём платформы для аналізу дадзеных на гульнявым кірунку. Уласна, па меры з'яўлення навін і цікавых рашэнняў мы з камандай будзем расказваць тут пра нашу платформу для аналітыкі даных.

пралог

Такім чынам, пачнем. Што такое Airflow? Гэта бібліятэка (ну ці набор бібліятэк) для распрацоўкі, планавання і маніторынгу працоўных працэсаў. Асноўная асаблівасць Airflow: для апісання (распрацоўкі) працэсаў выкарыстоўваецца код на мове Python. Адсюль выцякае маса пераваг для арганізацыі вашага праекта і распрацоўкі: па сутнасці, ваш (напрыклад) ETL-праект - гэта проста Python-праект, і вы можаце яго арганізоўваць як вам зручна, улічваючы асаблівасці інфраструктуры, памер каманды і іншыя патрабаванні. Інструментальна ўсё проста. Выкарыстоўвайце, напрыклад, PyCharm + Git. Гэта цудоўна і вельмі зручна!

Цяпер разгледзім асноўныя сутнасці Airflow. Зразумеўшы іх сутнасць і прызначэнне, вы аптымальна арганізуеце архітэктуру працэсаў. Мабыць, асноўная сутнасць – гэта Directed Acyclic Graph (далей DAG).

ДЗЕНЬ

DAG - гэта некаторае сэнсавае аб'яднанне вашых задач, якія вы хочаце выканаць у строга вызначанай паслядоўнасці па вызначаным раскладзе. Airflow уяўляе зручны web-інтэрфейс для працы з DAG'амі і іншымі сутнасцямі:

Airflow - інструмент, каб зручна і хутка распрацоўваць і падтрымліваць batch-працэсы апрацоўкі дадзеных

DAG можа выглядаць такім чынам:

Airflow - інструмент, каб зручна і хутка распрацоўваць і падтрымліваць batch-працэсы апрацоўкі дадзеных

Распрацоўнік, праектуючы DAG, закладвае набор аператараў, на якіх будуць пабудаваны задачы ўсярэдзіне DAG'а. Тут мы прыходзім яшчэ да адной важнай сутнасці: Airflow Operator.

Аператары

Аператар - гэта сутнасць, на падставе якой ствараюцца асобнікі заданняў, дзе апісваецца, што будзе адбывацца падчас выканання экзэмпляра задання. Рэлізы Airflow з GitHub ужо ўтрымоўваюць набор аператараў, гатовых да выкарыстання. Прыклады:

  • BashOperator – аператар для выканання bash-каманды.
  • PythonOperator – аператар для выкліку Python-кода.
  • EmailOperator - аператар для адпраўкі email'а.
  • HTTPOperator - аператар для працы з http-запытамі.
  • SqlOperator - аператар для выканання SQL-кода.
  • Sensor - аператар чакання падзеі (надыходу патрэбнага часу, з'яўлення патрабаванага файла, радкі ў базе БД, адказу з API - і т. д., і т. п.).

Ёсць больш спецыфічныя аператары: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Вы таксама можаце распрацоўваць аператары, арыентуючыся на свае асаблівасці, і выкарыстоўваць іх у праекце. Напрыклад, мы стварылі MongoDBToHiveViaHdfsTransfer, аператар экспарту дакументаў з MongoDB ў Hive, і некалькі аператараў для працы з ClickHouse: CHLoadFromHiveOperator і CHTableLoaderOperator. Па сутнасці, як толькі ў праекце ўзнікае часта выкарыстоўваецца код, пабудаваны на базавых аператарах, можна задумацца аб тым, каб сабраць яго ў новы аператар. Гэта спросціць далейшую распрацоўку, і вы папоўніце сваю бібліятэку аператараў у праекце.

Далей усе гэтыя асобнікі задачак трэба выконваць, і зараз прамова пайдзе аб планавальніку.

Планавальнік

Планавальнік задач у Airflow пабудаваны на салера. Celery - гэта Python-бібліятэка, якая дазваляе арганізаваць чаргу плюс асінхроннае і размеркаванае выкананне задач. З боку Airflow усе задачы падзяляюцца на пулы. Пулы ствараюцца ўручную. Як правіла, іх мэта - абмежаваць нагрузку на працу з крыніцай або тыпізаваць задачы ўнутры DWH. Пуламі можна кіраваць праз web-інтэрфейс:

Airflow - інструмент, каб зручна і хутка распрацоўваць і падтрымліваць batch-працэсы апрацоўкі дадзеных

Кожны пул мае абмежаванне па колькасці слотаў. Пры стварэнні DAG'а яму задаецца пул:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Пул, зададзены на ўзроўні DAG'а, можна перавызначыць на ўзроўні задачы.
За планіроўку ўсіх задач у Airflow адказвае асобны працэс – Scheduler. Уласна, Scheduler займаецца ўсёй механікай пастаноўкі задач на выкананне. Задача, перш чым патрапіць на выкананне, праходзіць некалькі этапаў:

  1. У DAG'е выкананы папярэднія задачы, новую можна паставіць у чаргу.
  2. Чарга сартуецца ў залежнасці ад прыярытэту задач (прыярытэтамі таксама можна кіраваць), і, калі ў пуле ёсць вольны слот, задачу можна ўзяць у працу.
  3. Калі ёсць вольны worker celery, задача накіроўваецца ў яго; пачынаецца праца, якую вы запраграмавалі ў задачцы, выкарыстаючы той ці іншы аператар.

Досыць проста.

Scheduler працуе на мностве ўсіх DAG'аў і ўсіх задач усярэдзіне DAG'ов.

Каб Scheduler пачаў працу з DAG'ам, DAG'у трэба задаць расклад:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Ёсць набор гатовых preset'аў: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Таксама можна выкарыстоўваць cron-выразы:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Дата выканання

Каб разабрацца ў тым, як працуе Airflow, важна разумець, што такое Execution Date для DAG'а. У Airflow DAG мае вымярэнне Execution Date, т. е. у залежнасці ад раскладу працы DAG'а ствараюцца асобнікі задач на кожную Execution Date. І за кожную Execution Date задачы можна выканаць паўторна - ці, напрыклад, DAG можа працаваць адначасова ў некалькіх Execution Date. Гэта наглядна адлюстравана тут:

Airflow - інструмент, каб зручна і хутка распрацоўваць і падтрымліваць batch-працэсы апрацоўкі дадзеных

Нажаль (а можа быць, і на шчасце: залежыць ад сітуацыі), калі кіруецца рэалізацыя задачы ў DAG'е, тое выкананне ў папярэдніх Execution Date пайдзе ўжо з улікам карэкціровак. Гэта добра, калі трэба пералічыць дадзеныя ў мінулых перыядах новым алгарытмам, але дрэнна, таму што губляецца ўзнаўляльнасць выніку (вядома, ніхто не мяшае вярнуць з Git'а патрэбную версію зыходніка і разава палічыць тое, што трэба, так, як трэба).

Генерацыя задач

Рэалізацыя DAG'а - код на Python, таму ў нас ёсць вельмі зручны спосаб скараціць аб'ём кода пры працы, напрыклад, з шардаванымі крыніцамі. Няхай у вас у якасці крыніцы тры шарды MySQL, вам трэба злазіць у кожны і забраць нейкія дадзеныя. Прычым незалежна і паралельна. Код на Python у DAG'е можа выглядаць так:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG атрымліваецца такім:

Airflow - інструмент, каб зручна і хутка распрацоўваць і падтрымліваць batch-працэсы апрацоўкі дадзеных

Пры гэтым можна дадаць або прыбраць шард, проста скарэктаваўшы наладу і абнавіўшы DAG. Зручна!

Можна выкарыстоўваць і больш складаную генерацыю кода, напрыклад працаваць з крыніцамі ў выглядзе БД ці апісваць таблічную структуру, алгарытм працы з табліцай і з улікам асаблівасцяў інфраструктуры DWH генераваць працэс загрузкі N табліц да вас у сховішча. Ці ж, напрыклад, працу з API, якое не падтрымлівае працу з параметрам у выглядзе спісу, вы можаце згенераваць па гэтым спісе N задач у DAG'е, абмежаваць раўналежнасць запытаў у API пулам і выграсці з API неабходныя дадзеныя. Гнутка!

Рэпазітар

У Airflow ёсць свой бэкенд-рэпазітар, БД (можа быць MySQL ці Postgres, у нас Postgres), у якой захоўваюцца станы задач, DAG'ов, налады злучэнняў, глабальныя зменныя і т. д., і т. п. Тут жадалася бы сказаць, што рэпазітар у Airflow вельмі просты (каля 20 табліц) і зручны, калі вы хочаце пабудаваць які-небудзь свой працэс над ім. Успамінаецца 100500 табліц у рэпазітары Informatica, якія трэба было доўга ўкурваць, перш чым зразумець, як пабудаваць запыт.

Маніторынг

Улічваючы прастату рэпазітара, вы можаце самі пабудаваць зручны для вас працэс маніторынгу задачак. Мы выкарыстоўваем блакнот у Zeppelin, дзе глядзім стан задач:

Airflow - інструмент, каб зручна і хутка распрацоўваць і падтрымліваць batch-працэсы апрацоўкі дадзеных

Гэта можа быць і web-інтэрфейс самога Airflow:

Airflow - інструмент, каб зручна і хутка распрацоўваць і падтрымліваць batch-працэсы апрацоўкі дадзеных

Код Airflow адчынены, таму мы ў сябе дадалі алертынг ў Telegram. Кожны які працуе інстанс задачы, калі адбываецца памылка, спаміт у групу ў Telegram, дзе складаецца ўся каманда распрацоўкі і падтрымкі.

Атрымліваем праз Telegram аператыўнае рэагаванне (калі такое патрабуецца), праз Zeppelin – агульную карціну па задачах у Airflow.

Разам

Airflow у першую чаргу open source, і не трэба чакаць ад яго цудаў. Будзьце гатовыя патраціць час і сілы на тое, каб выбудаваць працуючае рашэнне. Мэта з разраду дасягальных, паверце, яно таго варта. Хуткасць распрацоўкі, гнуткасць, прастата дадання новых працэсаў - вам спадабаецца. Вядома, трэба надаваць шмат увагі арганізацыі праекту, стабільнасці працы самога Airflow: цудаў не бывае.

Цяпер у нас Airflow штодня адпрацоўвае каля 6,5 тысячы задач. Па характары яны дастаткова розныя. Ёсць задачы загрузкі дадзеных у асноўнае DWH з мноства розных і вельмі спецыфічных крыніц, ёсць задачы разліку вітрын ўнутры асноўнага DWH, ёсць задачы публікацыі дадзеных у хуткае DWH, ёсць шмат-шмат розных задач – і Airflow усе іх перажоўвае дзень за днём. Калі ж казаць лічбамі, то гэта 2,3 тысячы ELT задач рознай складанасці ўнутры DWH (Hadoop), каля 2,5 сотняў баз дадзеных крыніц, гэта каманда з 4-ох ETL распрацоўшчыкаў, якія дзеляцца на ETL працэсінг дадзеных у DWH і на ELT працэсінг дадзеных усярэдзіне DWH і вядома яшчэ аднаго адміна, Які займаецца інфраструктурай сэрвісу.

Планы на будучыню

Колькасць працэсаў непазбежна расце, і асноўнае, чым мы будзем займацца ў частцы інфраструктуры Airflow, - гэта маштабаванне. Мы жадаем пабудаваць кластар Airflow, вылучыць пару ног для worker'ов Celery і зрабіць дублюючую сябе галаву з працэсамі планіроўкі заданняў і рэпазітаром.

Эпілог

Гэта, вядома, далёка не ўсё, што хацелася б расказаць пра Airflow, але асноўныя моманты я пастараўся асвятліць. Апетыт прыходзіць падчас ежы, паспрабуйце - і вам спадабаецца 🙂

Крыніца: habr.com

Дадаць каментар