Ҷараёни ҳаво як абзорест барои ба осонӣ ва зуд таҳия ва нигоҳ доштани равандҳои коркарди маълумотҳои партия

Ҷараёни ҳаво як абзорест барои ба осонӣ ва зуд таҳия ва нигоҳ доштани равандҳои коркарди маълумотҳои партия

Салом, Хабр! Дар ин мақола ман мехоҳам дар бораи як воситаи олиҷаноб барои таҳияи равандҳои коркарди маълумотҳо, масалан, дар инфрасохтори DWH корпоративӣ ё DataLake-и шумо сӯҳбат кунам. Мо дар бораи Apache Airflow (минбаъд Airflow номида мешавад) сӯҳбат хоҳем кард. Он ба Ҳабре беадолатона аз таваҷҷӯҳ маҳрум аст ва дар қисми асосӣ ман кӯшиш мекунам, ки шуморо итминон диҳам, ки ҳадди аққал Airflow ҳангоми интихоби нақша барои равандҳои ETL/ELT-и худ ба назар гирифтан лозим аст.

Қаблан, вақте ки ман дар Tinkoff Bank кор мекардам, дар мавзӯи DWH як қатор мақолаҳо навишта будам. Ҳоло ман як қисми дастаи Mail.Ru Group шудам ва платформаи таҳлили маълумотро дар майдони бозӣ таҳия мекунам. Воқеан, вақте ки хабарҳо ва ҳалли ҷолиб пайдо мешаванд, ман ва дастаи ман дар ин ҷо дар бораи платформаи худ барои таҳлили додаҳо сӯҳбат хоҳем кард.

Prologue

Пас, биёед оғоз кунем. Ҷараёни ҳаво чист? Ин китобхона аст (ё маҷмӯи китобхонаҳо) таҳия, ба нақша гирифтан ва назорат кардани равандҳои кор. Хусусияти асосии Airflow: Рамзи Python барои тавсифи (таҳияи) равандҳо истифода мешавад. Ин барои ташкили лоиҳа ва таҳияи шумо бартариҳои зиёд дорад: дар асл, лоиҳаи шумо (масалан) ETL танҳо як лоиҳаи Python аст ва шумо метавонед онро бо назардошти хусусиятҳои инфрасохтор, андозаи даста ва талаботи дигар. Асбоб ҳама чиз оддӣ аст. Масалан, PyCharm + Git-ро истифода баред. Ин аҷиб ва хеле қулай аст!

Акнун биёед ба объектҳои асосии Airflow назар кунем. Бо фаҳмидани моҳият ва ҳадафи онҳо, шумо метавонед меъмории равандҳои худро оптималӣ ташкил кунед. Эҳтимол, объекти асосӣ Графикаи мустақими даврӣ (минбаъд DAG номида мешавад) бошад.

DAG

DAG ин як иттиҳодияи пурмазмуни вазифаҳои шумост, ки шумо мехоҳед бо пайдарпаии қатъии муайяншуда мувофиқи ҷадвали мушаххас иҷро кунед. Airflow веб-интерфейси қулайро барои кор бо DAG ва дигар объектҳо таъмин мекунад:

Ҷараёни ҳаво як абзорест барои ба осонӣ ва зуд таҳия ва нигоҳ доштани равандҳои коркарди маълумотҳои партия

DAG метавонад чунин бошад:

Ҷараёни ҳаво як абзорест барои ба осонӣ ва зуд таҳия ва нигоҳ доштани равандҳои коркарди маълумотҳои партия

Таҳиягар ҳангоми тарҳрезии DAG як қатор операторҳоро муқаррар мекунад, ки дар онҳо вазифаҳо дар дохили DAG сохта мешаванд. Дар ин ҷо мо ба як ниҳоди муҳими дигар меоем: Оператори ҷараёни ҳаво.

Операторҳо

Оператор субъектест, ки дар асоси он намунаҳои корӣ эҷод карда мешаванд, ки дар ҷараёни иҷрои як мисоли корӣ чӣ рӯй медиҳад. Ҷараёни ҳаво аз GitHub мебарорад аллакай дорои маҷмӯи операторҳое, ки барои истифода омодаанд. Мисолҳо:

  • BashOperator - оператор барои иҷрои фармони bash.
  • PythonOperator - оператор барои занги рамзи Python.
  • EmailOperator — оператор барои фиристодани почтаи электронӣ.
  • HTTPOperator - оператор барои кор бо дархостҳои http.
  • SqlOperator - оператор барои иҷрои рамзи SQL.
  • Сенсор операторест барои интизории ҳодиса (расидани вақти зарурӣ, пайдоиши файли зарурӣ, сатр дар базаи маълумот, посух аз API ва ғ. ва ғ.).

Операторҳои мушаххастар мавҷуданд: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Шумо инчунин метавонед операторҳоро дар асоси хусусиятҳои худ таҳия кунед ва онҳоро дар лоиҳаи худ истифода баред. Масалан, мо MongoDBToHiveViaHdfsTransfer, оператор барои содироти ҳуҷҷатҳо аз MongoDB ба Hive ва якчанд операторҳоро барои кор бо кликхона: CHLoadFromHiveOperator ва CHTableLoaderOperator. Аслан, вақте ки лоиҳа коди дар изҳороти асосӣ сохташударо зуд-зуд истифода мебарад, шумо метавонед дар бораи сохтани он ба изҳороти нав фикр кунед. Ин рушди минбаъдаро осон мекунад ва шумо китобхонаи операторони худро дар лоиҳа васеъ хоҳед кард.

Минбаъд, ҳамаи ин мисолҳои вазифаҳо бояд иҷро шаванд ва ҳоло мо дар бораи нақшакаш гап мезанем.

Барномасоз

Нақшаи вазифаҳои Airflow дар асоси он сохта шудааст Карафс. Карафс як китобхонаи Python аст, ки ба шумо имкон медиҳад, ки навбат ва иҷрои асинхронӣ ва тақсимшудаи вазифаҳоро ташкил кунед. Дар тарафи ҷараёни ҳаво, ҳама вазифаҳо ба ҳавзҳо тақсим карда мешаванд. Ҳавзҳо дастӣ сохта мешаванд. Одатан, ҳадафи онҳо маҳдуд кардани сарбории кор бо манбаъ ё тавсифи вазифаҳо дар доираи DWH мебошад. Ҳавзҳоро тавассути интерфейси веб идора кардан мумкин аст:

Ҷараёни ҳаво як абзорест барои ба осонӣ ва зуд таҳия ва нигоҳ доштани равандҳои коркарди маълумотҳои партия

Ҳар як ҳавз дорои маҳдудияти шумораи слотҳо мебошад. Ҳангоми сохтани DAG ба он ҳавз дода мешавад:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Ҳавзи дар сатҳи DAG муайяншуда метавонад дар сатҳи вазифа бекор карда шавад.
Раванди алоҳида, Scheduler, барои банақшагирии ҳама вазифаҳо дар Airflow масъул аст. Дар асл, Scheduler бо тамоми механикаи гузоштани вазифаҳо барои иҷро сарукор дорад. Вазифа пеш аз иҷроиш аз якчанд марҳила мегузарад:

  1. Супоришҳои қаблӣ дар DAG иҷро шуданд, вазифаи навро метавон дар навбати худ гузошт.
  2. Навбат вобаста ба афзалияти вазифаҳо мураттаб карда мешавад (афзалиятҳоро низ назорат кардан мумкин аст) ва агар дар ҳавз ҷойи ройгон мавҷуд бошад, вазифаро ба кор бурдан мумкин аст.
  3. Агар карафси коргари бепул бошад, супориш ба он фиристода мешавад; коре, ки шумо дар масъала барномарезӣ кардаед, бо истифода аз ин ё он оператор оғоз мешавад.

Кофӣ оддӣ.

Банақшагир дар маҷмӯи ҳама DAGҳо ва ҳама вазифаҳои дохили DAGҳо кор мекунад.

Барои он ки Scheduler кор бо DAG оғоз кунад, DAG бояд ҷадвал муқаррар кунад:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Маҷмӯи танзимоти омода мавҷуд аст: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Шумо инчунин метавонед ифодаҳои cronро истифода баред:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Санаи иҷро

Барои фаҳмидани он ки чӣ тавр Airflow кор мекунад, фаҳмидани он муҳим аст, ки санаи иҷро барои DAG чист. Дар ҷараёни ҳаво, DAG андозагирии санаи иҷро дорад, яъне вобаста ба ҷадвали кории DAG, намунаҳои вазифаҳо барои ҳар як санаи иҷро сохта мешаванд. Ва барои ҳар як санаи иҷро, вазифаҳоро метавон дубора иҷро кард - ё, масалан, DAG метавонад дар як вақт дар якчанд санаҳои иҷро кор кунад. Ин дар ин ҷо равшан нишон дода шудааст:

Ҷараёни ҳаво як абзорест барои ба осонӣ ва зуд таҳия ва нигоҳ доштани равандҳои коркарди маълумотҳои партия

Мутаассифона (ё шояд хушбахтона: он аз вазъият вобаста аст), агар иҷрои вазифа дар DAG ислоҳ карда шавад, пас иҷроиш дар санаи қаблии иҷрошуда бо назардошти ислоҳот идома меёбад. Ин хуб аст, агар ба шумо лозим аст, ки маълумотро дар давраҳои гузашта бо истифода аз алгоритми нав дубора ҳисоб кунед, аммо ин бад аст, зеро такроршавандагии натиҷа аз даст меравад (албатта, ҳеҷ кас шуморо ташвиш намедиҳад, ки версияи лозимии коди сарчашмаро аз Git баргардонед ва ҳисоб кунед ба шумо як вақт лозим аст, ҳамон тавре ки ба шумо лозим аст).

Эҷоди вазифаҳо

Татбиқи DAG код дар Python аст, аз ин рӯ мо як роҳи хеле қулайи кам кардани миқдори кодро ҳангоми кор, масалан, бо манбаъҳои ҷудошуда дорем. Фарз мекунем, ки шумо се порчаи MySQL-ро ҳамчун сарчашма доред, шумо бояд ба ҳар яки онҳо ворид шавед ва баъзе маълумотро гиред. Илова бар ин, мустақилона ва дар баробари. Рамзи Python дар DAG метавонад чунин бошад:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG чунин менамояд:

Ҷараёни ҳаво як абзорест барои ба осонӣ ва зуд таҳия ва нигоҳ доштани равандҳои коркарди маълумотҳои партия

Дар ин ҳолат, шумо метавонед бо танҳо танзим кардани танзимот ва навсозии DAG як пораро илова ё хориҷ кунед. Бароҳат!

Шумо инчунин метавонед тавлиди коди мураккабтарро истифода баред, масалан, бо манбаъҳо дар шакли пойгоҳи додаҳо кор кунед ё сохтори ҷадвал, алгоритми кор бо ҷадвалро тавсиф кунед ва бо дарназардошти хусусиятҳои инфрасохтори DWH раванд тавлид кунед. барои бор кардани ҷадвалҳои N ба анбори шумо. Ё, масалан, кор бо API, ки кор бо параметр дар шакли рӯйхатро дастгирӣ намекунад, шумо метавонед аз ин рӯйхат дар DAG N вазифа эҷод кунед, параллелизми дархостҳоро дар API ба ҳавз маҳдуд кунед ва канда кунед маълумоти зарурӣ аз API. Фасеҳ!

анбор

Airflow дорои анбори пуштибонии худ, пойгоҳи додаҳо (метавонад MySQL ё Postgres бошад, мо Postgres дорем), ки ҳолати вазифаҳо, DAGҳо, танзимоти пайвастшавӣ, тағирёбандаҳои глобалӣ ва ғайраро нигоҳ медорад. Дар ин ҷо ман мехоҳам бигӯям, ки репозиторий дар Airflow хеле содда (тақрибан 20 ҷадвал) ва қулай аст, агар шумо хоҳед, ки ягон равандҳои худро дар болои он созед. Ман 100500 ҷадвалро дар анбори Informatica дар ёд дорам, ки пеш аз фаҳмидани тарзи сохтани пурсиш, онҳоро муддати тӯлонӣ омӯхтан лозим буд.

Мониторинг

Бо назардошти соддагии анбор, шумо метавонед як раванди мониторинги вазифаҳоро созед, ки барои шумо қулай аст. Мо блокнотро дар Zeppelin истифода мебарем, ки дар он ба ҳолати вазифаҳо назар мекунем:

Ҷараёни ҳаво як абзорест барои ба осонӣ ва зуд таҳия ва нигоҳ доштани равандҳои коркарди маълумотҳои партия

Ин инчунин метавонад интерфейси веби худи Airflow бошад:

Ҷараёни ҳаво як абзорест барои ба осонӣ ва зуд таҳия ва нигоҳ доштани равандҳои коркарди маълумотҳои партия

Рамзи Airflow манбаи кушода аст, аз ин рӯ мо ба Telegram огоҳӣ илова кардем. Ҳар як мисоли иҷрошавандаи вазифа, агар хатогӣ рух диҳад, гурӯҳро дар Telegram спам мекунад, ки дар он тамоми дастаи таҳия ва дастгирӣ иборат аст.

Мо тавассути Telegram посухи фаврӣ мегирем (агар лозим бошад) ва тавассути Zeppelin мо тасвири умумии вазифаҳоро дар Airflow мегирем.

Ҳамагӣ

Ҷараёни ҳаво пеш аз ҳама манбаи кушода аст ва шумо набояд аз он мӯъҷиза интизор шавед. Омода бошед, ки вақт ва саъю кӯшишро барои сохтани як ҳалли коркунанда сарф кунед. Мақсад иҷрошаванда аст, бовар кунед, ин меарзад. Суръати рушд, чандирӣ, осонии илова кардани равандҳои нав - ба шумо маъқул хоҳад шуд. Албатта, ба ташкили лоиха, устувории худи Хаво диккати калон додан лозим аст: муъчизахо руй намедиханд.

Ҳоло мо Airflow ҳамарӯза кор мекунем кариб 6,5 хазор супориш. Онҳо аз рӯи хислат комилан фарқ мекунанд. Вазифаҳои боркунии маълумот ба DWH асосӣ аз сарчашмаҳои гуногун ва хеле мушаххас мавҷуданд, вазифаҳои ҳисобкунии дӯконҳо дар дохили DWH асосӣ мавҷуданд, вазифаҳои интишори маълумот ба DWH зуд мавҷуданд, бисёр ва бисёр вазифаҳои гуногун мавҷуданд - ва Airflow хамаи онхоро руз то руз мехурад. Агар бо ракамхо гуем, ин аст 2,3 ҳазор Вазифаҳои ELT мураккабии гуногун дар доираи DWH (Hadoop), тақрибан. 2,5 сад базаи маълумот Манбаъҳо, ин як даста аз 4 таҳиягарони ETL, ки ба коркарди додаҳои ETL дар DWH ва коркарди додаҳои ELT дар дохили DWH ва албатта бештар тақсим мешаванд як админ, ки бо инфраструктураи хидмат сарукор дорад.

Нақшаҳои оянда

Миқдори равандҳо ногузир меафзояд ва чизи асосие, ки мо дар робита ба инфрасохтори Airflow анҷом медиҳем, ин миқёс аст. Мо мехоҳем кластери Airflow созем, барои коргарони Celery як ҷуфт пой ҷудо кунем ва бо равандҳои банақшагирии кор ва анбор сари худпешбар созем.

Эпилогонӣ

Ин, албатта, на ҳама чизест, ки ман мехостам дар бораи Airflow бигӯям, аммо ман кӯшиш кардам, ки нуктаҳои асосиро қайд кунам. Иштаҳо бо хӯрдан пайдо мешаванд, бисанҷед ва ба шумо писанд меояд :)

Манбаъ: will.com

Илова Эзоҳ