Агаарын урсгал нь багц өгөгдөл боловсруулах процессыг хялбар бөгөөд хурдан хөгжүүлэх, хадгалах хэрэгсэл юм

Агаарын урсгал нь багц өгөгдөл боловсруулах процессыг хялбар бөгөөд хурдан хөгжүүлэх, хадгалах хэрэгсэл юм

Сайн уу, Хабр! Энэ нийтлэлд би жишээлбэл, корпорацийн DWH эсвэл DataLake-ийн дэд бүтцэд багц өгөгдөл боловсруулах процессыг хөгжүүлэх нэг гайхалтай хэрэгслийн талаар ярихыг хүсч байна. Бид Apache Airflow (цаашид Агаарын урсгал гэх) тухай ярих болно. Энэ нь Habré дээр шударга бусаар анхаарал хандуулахгүй байгаа бөгөөд үндсэн хэсэгт би ETL/ELT процессын хуваарьлагчийг сонгохдоо ядаж Airflow-ийг анхаарч үзэх хэрэгтэй гэдэгт итгүүлэхийг хичээх болно.

Өмнө нь би Tinkoff банкинд ажиллаж байхдаа DWH сэдвээр цуврал нийтлэл бичиж байсан. Одоо би Mail.Ru группын нэг хэсэг болж, тоглоомын талбарт өгөгдөлд дүн шинжилгээ хийх платформ боловсруулж байна. Үнэндээ мэдээ, сонирхолтой шийдлүүд гарч ирэх тусам манай баг бид хоёр өгөгдлийн аналитик платформын талаар энд ярих болно.

Оршил

За ингээд эхэлцгээе. Агаарын урсгал гэж юу вэ? Энэ бол номын сан (эсвэл номын сангийн багц) ажлын явцыг боловсруулах, төлөвлөх, хянах. Airflow-ийн гол онцлог нь: Python кодыг процессуудыг тайлбарлахад (хөгжүүлэх) ашигладаг. Энэ нь таны төсөл, хөгжлийг зохион байгуулахад маш олон давуу талтай: үндсэндээ таны (жишээлбэл) ETL төсөл бол зүгээр л Python төсөл бөгөөд та үүнийг дэд бүтцийн онцлог, багийн хэмжээ, багийн хэмжээ зэргийг харгалзан өөрийн хүссэнээр зохион байгуулж болно. бусад шаардлага. Хэрэгслийн хувьд бүх зүйл энгийн байдаг. Жишээ нь PyCharm + Git ашиглана уу. Энэ бол гайхалтай бөгөөд маш тохиромжтой!

Одоо Airflow-ийн үндсэн байгууллагуудыг харцгаая. Тэдний мөн чанар, зорилгыг ойлгосноор та процессын архитектурыг оновчтой зохион байгуулж чадна. Магадгүй гол байгууллага нь чиглэгдсэн циклик график (цаашид DAG гэх) байж болох юм.

DAG

DAG гэдэг нь тодорхой хуваарийн дагуу нарийн тодорхой дарааллаар гүйцэтгэхийг хүсч буй ажлуудынхаа утга учиртай холбоо юм. Airflow нь DAG болон бусад байгууллагуудтай ажиллахад тохиромжтой вэб интерфейсээр хангадаг.

Агаарын урсгал нь багц өгөгдөл боловсруулах процессыг хялбар бөгөөд хурдан хөгжүүлэх, хадгалах хэрэгсэл юм

DAG дараах байдлаар харагдаж болно.

Агаарын урсгал нь багц өгөгдөл боловсруулах процессыг хялбар бөгөөд хурдан хөгжүүлэх, хадгалах хэрэгсэл юм

Хөгжүүлэгч нь DAG-ийг зохион бүтээхдээ DAG доторх даалгавруудыг гүйцэтгэх операторуудын багцыг тавьдаг. Энд бид өөр нэг чухал байгууллага руу орлоо: Агаарын урсгалын оператор.

Операторууд

Оператор гэдэг нь ажлын жишээг гүйцэтгэх явцад юу тохиолдохыг тодорхойлсон ажлын байрны жишээг бий болгодог аж ахуйн нэгж юм. GitHub-аас агаарын урсгалыг гаргадаг ашиглахад бэлэн операторуудын багцыг аль хэдийн агуулж байна. Жишээ нь:

  • BashOperator - bash командыг гүйцэтгэх оператор.
  • PythonOperator - Python код руу залгах оператор.
  • EmailOperator — имэйл илгээх оператор.
  • HTTPOperator - http хүсэлттэй ажиллах оператор.
  • SqlOperator - SQL кодыг гүйцэтгэх оператор.
  • Мэдрэгч нь үйл явдлыг хүлээх оператор юм (шаардлагатай цаг ирэх, шаардлагатай файлын харагдах байдал, мэдээллийн сан дахь мөр, API-ийн хариу гэх мэт).

Илүү тодорхой операторууд байдаг: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Мөн та өөрийн онцлогт тулгуурлан операторуудыг хөгжүүлж, төсөлдөө ашиглах боломжтой. Жишээлбэл, бид MongoDBToHiveViaHdfsTransfer, MongoDB-ээс Hive руу бичиг баримт экспортлох оператор болон түүнтэй ажиллах хэд хэдэн операторуудыг үүсгэсэн. clickhouse: CHLoadFromHiveOperator болон CHTableLoaderOperator. Үндсэндээ, төсөл нь үндсэн мэдэгдлүүд дээр суурилсан кодыг байнга ашигладаг болмогц та үүнийг шинэ мэдэгдэл болгон бүтээх талаар бодож болно. Энэ нь цаашдын хөгжлийг хялбарчлах бөгөөд та төслийн операторуудын номын санг өргөжүүлэх болно.

Дараа нь эдгээр бүх даалгавруудыг гүйцэтгэх шаардлагатай бөгөөд одоо бид төлөвлөгчийн талаар ярих болно.

Хуваарьлагч

Агаарын урсгалын ажлын хуваарь дээр суурилсан селөдерей. Celery бол дарааллыг зохион байгуулах, мөн асинхрон, хуваарилагдсан даалгавар гүйцэтгэх боломжийг олгодог Python номын сан юм. Агаарын урсгалын тал дээр бүх ажлыг усан санд хуваадаг. Усан сангуудыг гараар бүтээдэг. Ерөнхийдөө тэдний зорилго нь эх сурвалжтай ажиллах ажлын ачааллыг хязгаарлах эсвэл DWH доторх даалгавруудыг төрөлжүүлэх явдал юм. Усан сангуудыг вэб интерфэйсээр удирдаж болно:

Агаарын урсгал нь багц өгөгдөл боловсруулах процессыг хялбар бөгөөд хурдан хөгжүүлэх, хадгалах хэрэгсэл юм

Усан сан бүр үүрний тоонд хязгаарлалт тавьдаг. DAG үүсгэх үед түүнд сан өгдөг:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG түвшинд тодорхойлсон санг даалгаврын түвшинд хүчингүй болгож болно.
Тусдаа процесс, Scheduler нь Airflow дахь бүх ажлыг төлөвлөх үүрэгтэй. Үнэн хэрэгтээ, Scheduler нь гүйцэтгэх даалгавруудыг тохируулах бүх механизмтай ажилладаг. Даалгаврыг гүйцэтгэхийн өмнө хэд хэдэн үе шатыг дамждаг:

  1. Өмнөх даалгавруудыг DAG-д гүйцэтгэсэн; шинээр нь дараалалд оруулах боломжтой.
  2. Дараалал нь даалгаврын тэргүүлэх чиглэлээс хамаарч эрэмблэгддэг (тэргүүлэх чиглэлийг мөн хянах боломжтой) бөгөөд хэрэв усан санд чөлөөтэй зай байвал даалгаврыг ажиллуулж болно.
  3. Хэрэв үнэ төлбөргүй ажилчин селөдерей байгаа бол даалгаврыг түүнд илгээдэг; асуудалд программчилсан ажил нэг эсвэл өөр операторыг ашиглан эхэлнэ.

Хангалттай энгийн.

Хуваарьлагч нь бүх DAG болон DAG доторх бүх ажлуудын багц дээр ажилладаг.

Хуваарьлагч нь DAG-тай ажиллаж эхлэхийн тулд DAG нь дараах хуваарийг тогтоох шаардлагатай.

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Бэлэн тохируулгын багц байдаг: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Та мөн cron илэрхийллийг ашиглаж болно:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Гүйцэтгэлийн огноо

Агаарын урсгал хэрхэн ажилладагийг ойлгохын тулд DAG-ийн гүйцэтгэлийн огноо гэж юу болохыг ойлгох нь чухал юм. Агаарын урсгалд DAG нь Гүйцэтгэлийн огноо хэмжигдэхүүнтэй байдаг, өөрөөр хэлбэл, DAG-ын ажлын хуваариас хамааран Гүйцэтгэх огноо бүрд даалгаврын жишээг үүсгэдэг. Гүйцэтгэлийн огноо бүрийн хувьд даалгавруудыг дахин гүйцэтгэж болно, эсвэл жишээлбэл, DAG нь хэд хэдэн Гүйцэтгэх огноонд нэгэн зэрэг ажиллах боломжтой. Үүнийг энд тодорхой харуулав:

Агаарын урсгал нь багц өгөгдөл боловсруулах процессыг хялбар бөгөөд хурдан хөгжүүлэх, хадгалах хэрэгсэл юм

Харамсалтай нь (эсвэл азаар: энэ нь нөхцөл байдлаас шалтгаална), хэрэв DAG дахь даалгаврын хэрэгжилтийг зассан бол өмнөх Гүйцэтгэлийн огнооны гүйцэтгэл нь тохируулгыг харгалзан үргэлжилнэ. Хэрэв та шинэ алгоритм ашиглан өнгөрсөн үеийн өгөгдлийг дахин тооцоолох шаардлагатай бол энэ нь сайн боловч үр дүнгийн давтагдах чадвар алдагдсан тул муу юм (Мэдээжийн хэрэг, Git-ээс эх кодын шаардлагатай хувилбарыг буцааж, юу болохыг тооцоолоход хэн ч танд саад болохгүй. танд хэрэгтэй байдлаараа нэг удаа хэрэгтэй болно).

Даалгавруудыг бий болгож байна

DAG-ийн хэрэгжилт нь Python-д код байдаг тул жишээ нь, хэлтэрхий эх сурвалжтай ажиллахдаа кодын хэмжээг багасгах маш тохиромжтой арга бий. Танд гурван MySQL хэлтэрхий байгаа гэж бодъё, та тус бүрд нь орж зарим мэдээлэл авах хэрэгтэй. Түүнээс гадна, бие даан, зэрэгцээ. DAG дахь Python код дараах байдалтай байж болно.

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG дараах байдлаар харагдаж байна.

Агаарын урсгал нь багц өгөгдөл боловсруулах процессыг хялбар бөгөөд хурдан хөгжүүлэх, хадгалах хэрэгсэл юм

Энэ тохиолдолд та зүгээр л тохиргоог хийж, DAG-г шинэчлэх замаар хэлтэрхий нэмэх эсвэл устгах боломжтой. Тав тухтай!

Та илүү нарийн төвөгтэй код үүсгэх аргыг ашиглаж болно, жишээлбэл, мэдээллийн сан хэлбэрээр эх сурвалжтай ажиллах эсвэл хүснэгтийн бүтэц, хүснэгттэй ажиллах алгоритмыг тайлбарлах, DWH дэд бүтцийн онцлогийг харгалзан процесс үүсгэх боломжтой. N хүснэгтийг хадгалах сандаа ачаалахад зориулагдсан. Эсвэл жишээ нь жагсаалт хэлбэрээр параметртэй ажиллахыг дэмждэггүй API-тай ажиллахдаа энэ жагсаалтаас DAG-д N даалгавруудыг үүсгэж, API дахь хүсэлтүүдийн параллель байдлыг сан руу хязгаарлаж, хусах боломжтой. API-аас шаардлагатай өгөгдөл. Уян хатан!

хадгалах газар

Airflow нь өөрийн гэсэн мэдээллийн сантай (MySQL эсвэл Postgres байж болно, бидэнд Postgres байдаг) даалгаврын төлөв, DAG, холболтын тохиргоо, глобал хувьсагч гэх мэтийг хадгалдаг. Энд би хэлмээр байна. Airflow дахь агуулах нь маш энгийн (ойролцоогоор 20 хүснэгт) бөгөөд хэрэв та үүн дээр өөрийн процессыг бүтээхийг хүсвэл тохиромжтой. Informatica репозитор дахь 100500 хүснэгтийг би санаж байна, энэ нь асуулга хэрхэн бүтээхийг ойлгохын өмнө удаан хугацааны туршид судлах шаардлагатай байсан.

Хяналт шинжилгээ

Хадгалах газрын энгийн байдлыг харгалзан та өөрт тохирсон ажлын хяналтын процессыг үүсгэж болно. Бид Zeppelin-д тэмдэглэлийн дэвтэр ашигладаг бөгөөд энд даалгаврын статусыг хардаг.

Агаарын урсгал нь багц өгөгдөл боловсруулах процессыг хялбар бөгөөд хурдан хөгжүүлэх, хадгалах хэрэгсэл юм

Энэ нь мөн Airflow-ийн вэб интерфэйс байж болно:

Агаарын урсгал нь багц өгөгдөл боловсруулах процессыг хялбар бөгөөд хурдан хөгжүүлэх, хадгалах хэрэгсэл юм

Агаарын урсгалын код нь нээлттэй эх сурвалж тул бид Telegram-д анхааруулга нэмсэн. Даалгаврын ажиллаж байгаа тохиолдол бүр, хэрэв алдаа гарвал бүх хөгжүүлэлт, дэмжлэг үзүүлэх баг бүрддэг Telegram дахь бүлэгт спам илгээдэг.

Бид Telegram-аар (шаардлагатай бол) шуурхай хариу хүлээн авдаг бөгөөд Zeppelin-ээр дамжуулан бид Airflow дахь ажлуудын ерөнхий дүр зургийг хүлээн авдаг.

Нийт

Агаарын урсгал нь үндсэндээ нээлттэй эх сурвалж бөгөөд үүнээс гайхамшгийг хүлээх ёсгүй. Үр дүнтэй шийдлийг бий болгохын тулд цаг хугацаа, хүчин чармайлт гаргахад бэлэн байгаарай. Зорилго нь биелэх боломжтой, надад итгээрэй, энэ нь үнэ цэнэтэй юм. Хөгжлийн хурд, уян хатан байдал, шинэ процесс нэмэхэд хялбар байдал - танд таалагдах болно. Мэдээжийн хэрэг, та төслийн зохион байгуулалт, Агаарын урсгалын тогтвортой байдалд маш их анхаарал хандуулах хэрэгтэй: гайхамшгууд тохиолддоггүй.

Одоо бид өдөр бүр Airflow ажиллаж байна 6,5 мянга орчим ажил. Тэд зан чанарын хувьд огт өөр. Олон янзын, маш тодорхой эх сурвалжаас үндсэн DWH-д өгөгдөл ачаалах, үндсэн DWH доторх дэлгүүрийн нүүрийг тооцоолох, хурдан DWH болгон өгөгдлийг нийтлэх, олон янзын даалгаварууд байдаг - болон Агаарын урсгал өдөр бүр бүгдийг нь зажилдаг. Тоогоор ярих юм бол энэ 2,3 мянга DWH (Hadoop) доторх янз бүрийн нарийн төвөгтэй ELT даалгаврууд, ойролцоогоор. 2,5 зуун мэдээллийн сан эх сурвалж, энэ нь нэг баг юм 4 ETL хөгжүүлэгч, эдгээр нь DWH дахь ETL өгөгдөл боловсруулах, DWH доторх ELT өгөгдөл боловсруулах болон мэдээж бусад зүйлд хуваагддаг. нэг админ, үйлчилгээний дэд бүтцийн асуудлыг хэн хариуцдаг.

РџР »Р ° РЅС <РЅР ° Р ± САРРґЭсарС ‰ РμРμ

Процессын тоо зайлшгүй нэмэгдэж байгаа бөгөөд агаарын урсгалын дэд бүтцийн хувьд бидний хийх гол зүйл бол масштабыг нэмэгдүүлэх явдал юм. Бид Агаарын урсгалын кластер байгуулж, Селөдерей ажилчдад зориулж хос хөл хуваарилж, ажлын хуваарь боловсруулах процесс, хадгалах газартай өөрөө хувилах толгой хийхийг хүсч байна.

Эпилог

Мэдээжийн хэрэг, энэ нь миний Агаарын урсгалын талаар хэлэхийг хүссэн бүх зүйл биш, гэхдээ би гол санааг тодруулахыг хичээсэн. Хоол идэхэд хоолны дуршил ирдэг, туршаад үзээрэй, танд таалагдах болно :)

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх