Сайн уу, Хабр! Энэ нийтлэлд би жишээлбэл, корпорацийн DWH эсвэл DataLake-ийн дэд бүтцэд багц өгөгдөл боловсруулах процессыг хөгжүүлэх нэг гайхалтай хэрэгслийн талаар ярихыг хүсч байна. Бид Apache Airflow (цаашид Агаарын урсгал гэх) тухай ярих болно. Энэ нь Habré дээр шударга бусаар анхаарал хандуулахгүй байгаа бөгөөд үндсэн хэсэгт би ETL/ELT процессын хуваарьлагчийг сонгохдоо ядаж Airflow-ийг анхаарч үзэх хэрэгтэй гэдэгт итгүүлэхийг хичээх болно.
Өмнө нь би Tinkoff банкинд ажиллаж байхдаа DWH сэдвээр цуврал нийтлэл бичиж байсан. Одоо би Mail.Ru группын нэг хэсэг болж, тоглоомын талбарт өгөгдөлд дүн шинжилгээ хийх платформ боловсруулж байна. Үнэндээ мэдээ, сонирхолтой шийдлүүд гарч ирэх тусам манай баг бид хоёр өгөгдлийн аналитик платформын талаар энд ярих болно.
Оршил
За ингээд эхэлцгээе. Агаарын урсгал гэж юу вэ? Энэ бол номын сан (эсвэл
Одоо Airflow-ийн үндсэн байгууллагуудыг харцгаая. Тэдний мөн чанар, зорилгыг ойлгосноор та процессын архитектурыг оновчтой зохион байгуулж чадна. Магадгүй гол байгууллага нь чиглэгдсэн циклик график (цаашид DAG гэх) байж болох юм.
DAG
DAG гэдэг нь тодорхой хуваарийн дагуу нарийн тодорхой дарааллаар гүйцэтгэхийг хүсч буй ажлуудынхаа утга учиртай холбоо юм. Airflow нь DAG болон бусад байгууллагуудтай ажиллахад тохиромжтой вэб интерфейсээр хангадаг.
DAG дараах байдлаар харагдаж болно.
Хөгжүүлэгч нь DAG-ийг зохион бүтээхдээ DAG доторх даалгавруудыг гүйцэтгэх операторуудын багцыг тавьдаг. Энд бид өөр нэг чухал байгууллага руу орлоо: Агаарын урсгалын оператор.
Операторууд
Оператор гэдэг нь ажлын жишээг гүйцэтгэх явцад юу тохиолдохыг тодорхойлсон ажлын байрны жишээг бий болгодог аж ахуйн нэгж юм.
- BashOperator - bash командыг гүйцэтгэх оператор.
- PythonOperator - Python код руу залгах оператор.
- EmailOperator — имэйл илгээх оператор.
- HTTPOperator - http хүсэлттэй ажиллах оператор.
- SqlOperator - SQL кодыг гүйцэтгэх оператор.
- Мэдрэгч нь үйл явдлыг хүлээх оператор юм (шаардлагатай цаг ирэх, шаардлагатай файлын харагдах байдал, мэдээллийн сан дахь мөр, API-ийн хариу гэх мэт).
Илүү тодорхой операторууд байдаг: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Мөн та өөрийн онцлогт тулгуурлан операторуудыг хөгжүүлж, төсөлдөө ашиглах боломжтой. Жишээлбэл, бид MongoDBToHiveViaHdfsTransfer, MongoDB-ээс Hive руу бичиг баримт экспортлох оператор болон түүнтэй ажиллах хэд хэдэн операторуудыг үүсгэсэн.
Дараа нь эдгээр бүх даалгавруудыг гүйцэтгэх шаардлагатай бөгөөд одоо бид төлөвлөгчийн талаар ярих болно.
Хуваарьлагч
Агаарын урсгалын ажлын хуваарь дээр суурилсан
Усан сан бүр үүрний тоонд хязгаарлалт тавьдаг. DAG үүсгэх үед түүнд сан өгдөг:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
DAG түвшинд тодорхойлсон санг даалгаврын түвшинд хүчингүй болгож болно.
Тусдаа процесс, Scheduler нь Airflow дахь бүх ажлыг төлөвлөх үүрэгтэй. Үнэн хэрэгтээ, Scheduler нь гүйцэтгэх даалгавруудыг тохируулах бүх механизмтай ажилладаг. Даалгаврыг гүйцэтгэхийн өмнө хэд хэдэн үе шатыг дамждаг:
- Өмнөх даалгавруудыг DAG-д гүйцэтгэсэн; шинээр нь дараалалд оруулах боломжтой.
- Дараалал нь даалгаврын тэргүүлэх чиглэлээс хамаарч эрэмблэгддэг (тэргүүлэх чиглэлийг мөн хянах боломжтой) бөгөөд хэрэв усан санд чөлөөтэй зай байвал даалгаврыг ажиллуулж болно.
- Хэрэв үнэ төлбөргүй ажилчин селөдерей байгаа бол даалгаврыг түүнд илгээдэг; асуудалд программчилсан ажил нэг эсвэл өөр операторыг ашиглан эхэлнэ.
Хангалттай энгийн.
Хуваарьлагч нь бүх DAG болон DAG доторх бүх ажлуудын багц дээр ажилладаг.
Хуваарьлагч нь DAG-тай ажиллаж эхлэхийн тулд DAG нь дараах хуваарийг тогтоох шаардлагатай.
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Бэлэн тохируулгын багц байдаг: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Та мөн cron илэрхийллийг ашиглаж болно:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Гүйцэтгэлийн огноо
Агаарын урсгал хэрхэн ажилладагийг ойлгохын тулд DAG-ийн гүйцэтгэлийн огноо гэж юу болохыг ойлгох нь чухал юм. Агаарын урсгалд DAG нь Гүйцэтгэлийн огноо хэмжигдэхүүнтэй байдаг, өөрөөр хэлбэл, DAG-ын ажлын хуваариас хамааран Гүйцэтгэх огноо бүрд даалгаврын жишээг үүсгэдэг. Гүйцэтгэлийн огноо бүрийн хувьд даалгавруудыг дахин гүйцэтгэж болно, эсвэл жишээлбэл, DAG нь хэд хэдэн Гүйцэтгэх огноонд нэгэн зэрэг ажиллах боломжтой. Үүнийг энд тодорхой харуулав:
Харамсалтай нь (эсвэл азаар: энэ нь нөхцөл байдлаас шалтгаална), хэрэв DAG дахь даалгаврын хэрэгжилтийг зассан бол өмнөх Гүйцэтгэлийн огнооны гүйцэтгэл нь тохируулгыг харгалзан үргэлжилнэ. Хэрэв та шинэ алгоритм ашиглан өнгөрсөн үеийн өгөгдлийг дахин тооцоолох шаардлагатай бол энэ нь сайн боловч үр дүнгийн давтагдах чадвар алдагдсан тул муу юм (Мэдээжийн хэрэг, Git-ээс эх кодын шаардлагатай хувилбарыг буцааж, юу болохыг тооцоолоход хэн ч танд саад болохгүй. танд хэрэгтэй байдлаараа нэг удаа хэрэгтэй болно).
Даалгавруудыг бий болгож байна
DAG-ийн хэрэгжилт нь Python-д код байдаг тул жишээ нь, хэлтэрхий эх сурвалжтай ажиллахдаа кодын хэмжээг багасгах маш тохиромжтой арга бий. Танд гурван MySQL хэлтэрхий байгаа гэж бодъё, та тус бүрд нь орж зарим мэдээлэл авах хэрэгтэй. Түүнээс гадна, бие даан, зэрэгцээ. DAG дахь Python код дараах байдалтай байж болно.
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG дараах байдлаар харагдаж байна.
Энэ тохиолдолд та зүгээр л тохиргоог хийж, DAG-г шинэчлэх замаар хэлтэрхий нэмэх эсвэл устгах боломжтой. Тав тухтай!
Та илүү нарийн төвөгтэй код үүсгэх аргыг ашиглаж болно, жишээлбэл, мэдээллийн сан хэлбэрээр эх сурвалжтай ажиллах эсвэл хүснэгтийн бүтэц, хүснэгттэй ажиллах алгоритмыг тайлбарлах, DWH дэд бүтцийн онцлогийг харгалзан процесс үүсгэх боломжтой. N хүснэгтийг хадгалах сандаа ачаалахад зориулагдсан. Эсвэл жишээ нь жагсаалт хэлбэрээр параметртэй ажиллахыг дэмждэггүй API-тай ажиллахдаа энэ жагсаалтаас DAG-д N даалгавруудыг үүсгэж, API дахь хүсэлтүүдийн параллель байдлыг сан руу хязгаарлаж, хусах боломжтой. API-аас шаардлагатай өгөгдөл. Уян хатан!
хадгалах газар
Airflow нь өөрийн гэсэн мэдээллийн сантай (MySQL эсвэл Postgres байж болно, бидэнд Postgres байдаг) даалгаврын төлөв, DAG, холболтын тохиргоо, глобал хувьсагч гэх мэтийг хадгалдаг. Энд би хэлмээр байна. Airflow дахь агуулах нь маш энгийн (ойролцоогоор 20 хүснэгт) бөгөөд хэрэв та үүн дээр өөрийн процессыг бүтээхийг хүсвэл тохиромжтой. Informatica репозитор дахь 100500 хүснэгтийг би санаж байна, энэ нь асуулга хэрхэн бүтээхийг ойлгохын өмнө удаан хугацааны туршид судлах шаардлагатай байсан.
Хяналт шинжилгээ
Хадгалах газрын энгийн байдлыг харгалзан та өөрт тохирсон ажлын хяналтын процессыг үүсгэж болно. Бид Zeppelin-д тэмдэглэлийн дэвтэр ашигладаг бөгөөд энд даалгаврын статусыг хардаг.
Энэ нь мөн Airflow-ийн вэб интерфэйс байж болно:
Агаарын урсгалын код нь нээлттэй эх сурвалж тул бид Telegram-д анхааруулга нэмсэн. Даалгаврын ажиллаж байгаа тохиолдол бүр, хэрэв алдаа гарвал бүх хөгжүүлэлт, дэмжлэг үзүүлэх баг бүрддэг Telegram дахь бүлэгт спам илгээдэг.
Бид Telegram-аар (шаардлагатай бол) шуурхай хариу хүлээн авдаг бөгөөд Zeppelin-ээр дамжуулан бид Airflow дахь ажлуудын ерөнхий дүр зургийг хүлээн авдаг.
Нийт
Агаарын урсгал нь үндсэндээ нээлттэй эх сурвалж бөгөөд үүнээс гайхамшгийг хүлээх ёсгүй. Үр дүнтэй шийдлийг бий болгохын тулд цаг хугацаа, хүчин чармайлт гаргахад бэлэн байгаарай. Зорилго нь биелэх боломжтой, надад итгээрэй, энэ нь үнэ цэнэтэй юм. Хөгжлийн хурд, уян хатан байдал, шинэ процесс нэмэхэд хялбар байдал - танд таалагдах болно. Мэдээжийн хэрэг, та төслийн зохион байгуулалт, Агаарын урсгалын тогтвортой байдалд маш их анхаарал хандуулах хэрэгтэй: гайхамшгууд тохиолддоггүй.
Одоо бид өдөр бүр Airflow ажиллаж байна 6,5 мянга орчим ажил. Тэд зан чанарын хувьд огт өөр. Олон янзын, маш тодорхой эх сурвалжаас үндсэн DWH-д өгөгдөл ачаалах, үндсэн DWH доторх дэлгүүрийн нүүрийг тооцоолох, хурдан DWH болгон өгөгдлийг нийтлэх, олон янзын даалгаварууд байдаг - болон Агаарын урсгал өдөр бүр бүгдийг нь зажилдаг. Тоогоор ярих юм бол энэ 2,3 мянга DWH (Hadoop) доторх янз бүрийн нарийн төвөгтэй ELT даалгаврууд, ойролцоогоор. 2,5 зуун мэдээллийн сан эх сурвалж, энэ нь нэг баг юм 4 ETL хөгжүүлэгч, эдгээр нь DWH дахь ETL өгөгдөл боловсруулах, DWH доторх ELT өгөгдөл боловсруулах болон мэдээж бусад зүйлд хуваагддаг. нэг админ, үйлчилгээний дэд бүтцийн асуудлыг хэн хариуцдаг.
РџР »Р ° РЅС <РЅР ° Р ± САРРґЭсарС ‰ РμРμ
Процессын тоо зайлшгүй нэмэгдэж байгаа бөгөөд агаарын урсгалын дэд бүтцийн хувьд бидний хийх гол зүйл бол масштабыг нэмэгдүүлэх явдал юм. Бид Агаарын урсгалын кластер байгуулж, Селөдерей ажилчдад зориулж хос хөл хуваарилж, ажлын хуваарь боловсруулах процесс, хадгалах газартай өөрөө хувилах толгой хийхийг хүсч байна.
Эпилог
Мэдээжийн хэрэг, энэ нь миний Агаарын урсгалын талаар хэлэхийг хүссэн бүх зүйл биш, гэхдээ би гол санааг тодруулахыг хичээсэн. Хоол идэхэд хоолны дуршил ирдэг, туршаад үзээрэй, танд таалагдах болно :)
Эх сурвалж: www.habr.com