Airflow - bu ommaviy ma'lumotlarni qayta ishlash jarayonlarini qulay va tez ishlab chiqish va saqlash uchun vosita

Airflow - bu ommaviy ma'lumotlarni qayta ishlash jarayonlarini qulay va tez ishlab chiqish va saqlash uchun vosita

Salom, Xabr! Ushbu maqolada men, masalan, korporativ DWH yoki DataLake infratuzilmasida ommaviy ma'lumotlarni qayta ishlash jarayonlarini ishlab chiqish uchun ajoyib vosita haqida gapirmoqchiman. Biz Apache Airflow (keyingi o'rinlarda havo oqimi deb yuritiladi) haqida gapiramiz. Bu Habré-da nohaq e'tibordan mahrum va asosiy qismda men sizni ETL/ELT jarayonlari uchun rejalashtiruvchi tanlashda hech bo'lmaganda Airflow-ga qarashga arziydiganligiga ishontirishga harakat qilaman.

Ilgari men Tinkoff bankida ishlaganimda DWH mavzusida bir qator maqolalar yozganman. Endi men Mail.Ru Group jamoasining bir qismiga aylandim va o'yin sohasida ma'lumotlarni tahlil qilish platformasini ishlab chiqyapman. Aslida, yangiliklar va qiziqarli echimlar paydo bo'lganda, men va mening jamoam bu erda ma'lumotlar tahlili platformamiz haqida gaplashamiz.

Prologiya

Shunday ekan, boshlaylik. Havo oqimi nima? Bu kutubxona (yoki kutubxonalar to'plami) ish jarayonlarini ishlab chiqish, rejalashtirish va monitoring qilish. Airflow ning asosiy xususiyati: Python kodi jarayonlarni tavsiflash (ishlab chiqish) uchun ishlatiladi. Bu sizning loyihangizni va ishlanmangizni tashkil qilish uchun juda ko'p afzalliklarga ega: mohiyatiga ko'ra, sizning (masalan) ETL loyihangiz shunchaki Python loyihasi bo'lib, uni infratuzilmaning o'ziga xosligi, jamoa hajmi va hajmini hisobga olgan holda o'zingiz xohlagan tarzda tashkil qilishingiz mumkin. boshqa talablar. Instrumental ravishda hamma narsa oddiy. Masalan, PyCharm + Git-dan foydalaning. Bu ajoyib va ​​juda qulay!

Endi havo oqimining asosiy ob'ektlarini ko'rib chiqaylik. Ularning mohiyati va maqsadini tushunib, siz jarayon arxitekturasini optimal tarzda tashkil qilishingiz mumkin. Ehtimol, asosiy ob'ekt yo'naltirilgan asiklik grafik (keyingi o'rinlarda DAG deb yuritiladi).

DAG

DAG - bu ma'lum bir jadvalga muvofiq qat'iy belgilangan ketma-ketlikda bajarishni istagan vazifalaringizning mazmunli birlashmasi. Airflow DAG va boshqa ob'ektlar bilan ishlash uchun qulay veb-interfeysni taqdim etadi:

Airflow - bu ommaviy ma'lumotlarni qayta ishlash jarayonlarini qulay va tez ishlab chiqish va saqlash uchun vosita

DAG quyidagicha ko'rinishi mumkin:

Airflow - bu ommaviy ma'lumotlarni qayta ishlash jarayonlarini qulay va tez ishlab chiqish va saqlash uchun vosita

Ishlab chiquvchi DAGni loyihalashda DAG doirasida vazifalar quriladigan operatorlar to'plamini belgilaydi. Bu erda biz yana bir muhim ob'ektga keldik: Havo oqimi operatori.

Operatorlar

Operator - bu ish misollari yaratiladigan ob'ekt bo'lib, u ish misolini bajarish paytida nima sodir bo'lishini tavsiflaydi. GitHub-dan havo oqimi chiqariladi allaqachon foydalanishga tayyor operatorlar to'plamini o'z ichiga oladi. Misollar:

  • BashOperator - bash buyrug'ini bajarish uchun operator.
  • PythonOperator - Python kodini chaqirish uchun operator.
  • EmailOperator - elektron pochta xabarlarini yuborish operatori.
  • HTTPOperator - http so'rovlari bilan ishlash operatori.
  • SqlOperator - SQL kodini bajarish operatori.
  • Sensor - bu hodisani kutish operatori (kerakli vaqtning kelishi, kerakli faylning paydo bo'lishi, ma'lumotlar bazasidagi chiziq, APIdan javob va boshqalar).

Aniqroq operatorlar mavjud: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Shuningdek, siz o'zingizning xususiyatlaringiz asosida operatorlarni ishlab chiqishingiz va ularni loyihangizda ishlatishingiz mumkin. Masalan, biz MongoDBToHiveViaHdfsTransfer, hujjatlarni MongoDB-dan Hive-ga eksport qilish operatori va ular bilan ishlash uchun bir nechta operatorlarni yaratdik. ClickHouse: CHLoadFromHiveOperator va CHTableLoaderOperator. Aslini olganda, loyiha asosiy iboralar asosida tuzilgan koddan tez-tez foydalansa, uni yangi bayonotga aylantirish haqida o'ylashingiz mumkin. Bu keyingi rivojlanishni soddalashtiradi va siz loyihadagi operatorlar kutubxonangizni kengaytirasiz.

Keyinchalik, ushbu topshiriqlarning barchasi bajarilishi kerak va endi biz rejalashtiruvchi haqida gaplashamiz.

Rejalashtiruvchi

Havo oqimining vazifalarni rejalashtiruvchisi qurilgan Seldr. Selderey - bu Python kutubxonasi bo'lib, u navbatni tashkil qilish, shuningdek, vazifalarning asinxron va taqsimlangan bajarilishini tashkil qilish imkonini beradi. Havo oqimi tomonida barcha vazifalar hovuzlarga bo'lingan. Hovuzlar qo'lda yaratilgan. Odatda, ularning maqsadi manba bilan ishlashning ish yukini cheklash yoki DWH ichidagi vazifalarni tiplashtirishdir. Hovuzlarni veb-interfeys orqali boshqarish mumkin:

Airflow - bu ommaviy ma'lumotlarni qayta ishlash jarayonlarini qulay va tez ishlab chiqish va saqlash uchun vosita

Har bir hovuzda slotlar soni bo'yicha cheklov mavjud. DAG yaratishda unga hovuz beriladi:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG darajasida belgilangan hovuz vazifa darajasida bekor qilinishi mumkin.
Alohida jarayon, Scheduler, Airflow'dagi barcha vazifalarni rejalashtirish uchun javobgardir. Aslida, Scheduler bajarish uchun vazifalarni belgilashning barcha mexanikasi bilan shug'ullanadi. Vazifa bajarilishidan oldin bir necha bosqichlardan o'tadi:

  1. DAGda oldingi vazifalar bajarilgan, yangisini navbatga qo'yish mumkin.
  2. Navbat vazifalarning ustuvorligiga qarab tartiblanadi (ustuvorliklar ham nazorat qilinishi mumkin) va agar hovuzda bo'sh joy bo'lsa, vazifa ishga tushirilishi mumkin.
  3. Agar bepul ishchi selderey bo'lsa, vazifa unga yuboriladi; muammoda dasturlashtirgan ish u yoki bu operator yordamida boshlanadi.

Yetarlicha oddiy.

Rejalashtiruvchi barcha DAGlar to'plamida va DAG ichidagi barcha vazifalarda ishlaydi.

Scheduler DAG bilan ishlashni boshlashi uchun DAG jadvalni o'rnatishi kerak:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Tayyor sozlamalar to'plami mavjud: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Siz cron ifodalarini ham ishlatishingiz mumkin:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Amalga oshirish sanasi

Havo oqimi qanday ishlashini tushunish uchun DAG uchun bajarilish sanasi nima ekanligini tushunish muhimdir. Havo oqimida DAG ijro etilish sanasi o'lchamiga ega, ya'ni DAG ish jadvaliga qarab, har bir Ijro sanasi uchun vazifa misollari yaratiladi. Va har bir Ijro sanasi uchun vazifalar qayta bajarilishi mumkin - yoki, masalan, DAG bir vaqtning o'zida bir nechta Ijro sanalarida ishlashi mumkin. Bu bu erda aniq ko'rsatilgan:

Airflow - bu ommaviy ma'lumotlarni qayta ishlash jarayonlarini qulay va tez ishlab chiqish va saqlash uchun vosita

Afsuski (yoki xayriyatki: bu vaziyatga bog'liq), agar DAGda topshiriqning bajarilishi tuzatilgan bo'lsa, avvalgi Ijro sanasida bajarish tuzatishlarni hisobga olgan holda davom etadi. Agar siz yangi algoritm yordamida o'tgan davrlardagi ma'lumotlarni qayta hisoblashingiz kerak bo'lsa, bu yaxshi, lekin bu yomon, chunki natijaning takrorlanishi yo'qoladi (albatta, hech kim sizni Git'dan manba kodining kerakli versiyasini qaytarib berish va nima qilishini hisoblash uchun bezovta qilmaydi. sizga bir marta kerak, sizga kerak bo'lgan tarzda).

Vazifalarni yaratish

DAG-ni amalga oshirish Python-da koddir, shuning uchun bizda, masalan, parchalangan manbalar bilan ishlashda kod miqdorini kamaytirishning juda qulay usuli mavjud. Aytaylik, sizda manba sifatida uchta MySQL parchasi bor, ularning har biriga kirib, ba'zi ma'lumotlarni olishingiz kerak. Bundan tashqari, mustaqil ravishda va parallel ravishda. DAGdagi Python kodi quyidagicha ko'rinishi mumkin:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG quyidagicha ko'rinadi:

Airflow - bu ommaviy ma'lumotlarni qayta ishlash jarayonlarini qulay va tez ishlab chiqish va saqlash uchun vosita

Bunday holda, siz shunchaki sozlamalarni o'zgartirish va DAGni yangilash orqali parcha qo'shishingiz yoki olib tashlashingiz mumkin. Qulay!

Shuningdek, siz murakkabroq kod ishlab chiqarishdan foydalanishingiz mumkin, masalan, ma'lumotlar bazasi ko'rinishidagi manbalar bilan ishlashingiz yoki jadval tuzilishini, jadval bilan ishlash algoritmini tavsiflashingiz va DWH infratuzilmasi xususiyatlarini hisobga olgan holda jarayonni yaratishingiz mumkin. xotirangizga N ta jadvalni yuklash uchun. Yoki, masalan, ro'yxat ko'rinishidagi parametr bilan ishlashni qo'llab-quvvatlamaydigan API bilan ishlash, siz ushbu ro'yxatdan DAGda N ta vazifani yaratishingiz, APIdagi so'rovlarning parallelligini hovuzga cheklashingiz va API dan kerakli ma'lumotlar. Moslashuvchan!

ombori

Airflow o'zining zaxira omboriga, ma'lumotlar bazasiga ega (MySQL yoki Postgres bo'lishi mumkin, bizda Postgres mavjud), u vazifalar holatini, DAG'larni, ulanish sozlamalarini, global o'zgaruvchilarni va hokazolarni va hokazolarni saqlaydi. Bu erda shuni aytishim mumkinki, Airflow-dagi ombor juda oddiy (taxminan 20 ta jadval) va agar siz o'zingizning biron bir jarayoningizni uning ustiga qurmoqchi bo'lsangiz, qulay. Informatica omboridagi 100500 XNUMX ta jadvalni eslayman, ular so'rovni qanday tuzishni tushunishdan oldin uzoq vaqt davomida o'rganilishi kerak edi.

Monitoring

Omborning soddaligini hisobga olgan holda, siz o'zingiz uchun qulay bo'lgan vazifani kuzatish jarayonini yaratishingiz mumkin. Biz Zeppelin-da bloknotdan foydalanamiz, u erda biz vazifalarning holatini ko'rib chiqamiz:

Airflow - bu ommaviy ma'lumotlarni qayta ishlash jarayonlarini qulay va tez ishlab chiqish va saqlash uchun vosita

Bu shuningdek, Airflow veb-interfeysi bo'lishi mumkin:

Airflow - bu ommaviy ma'lumotlarni qayta ishlash jarayonlarini qulay va tez ishlab chiqish va saqlash uchun vosita

Havo oqimi kodi ochiq manba, shuning uchun biz Telegram’ga ogohlantirish qo‘shdik. Vazifaning har bir ishlayotgan namunasi, agar xatolik yuzaga kelsa, butun ishlab chiqish va qo'llab-quvvatlash jamoasidan iborat Telegramdagi guruhga spam yuboradi.

Biz Telegram orqali tezkor javob olamiz (agar kerak bo'lsa) va Zeppelin orqali biz Airflow-dagi vazifalarning umumiy rasmini olamiz.

jami

Havo oqimi birinchi navbatda ochiq manba bo'lib, undan mo''jizalar kutmasligingiz kerak. Ishlaydigan yechimni yaratish uchun vaqt va kuch sarflashga tayyor bo'ling. Maqsadga erishish mumkin, ishoning, bunga arziydi. Rivojlanish tezligi, moslashuvchanlik, yangi jarayonlarni qo'shish qulayligi - bu sizga yoqadi. Albatta, siz loyihani tashkil etishga, Havo oqimining barqarorligiga katta e'tibor berishingiz kerak: mo''jizalar sodir bo'lmaydi.

Endi bizda havo oqimi har kuni ishlaydi 6,5 mingga yaqin vazifa. Ular xarakter jihatidan juda farq qiladi. Asosiy DWH-ga turli xil va juda aniq manbalardan ma'lumotlarni yuklash vazifalari mavjud, asosiy DWH ichidagi do'konlarni hisoblash vazifalari mavjud, ma'lumotlarni tezkor DWHga nashr qilish vazifalari mavjud, juda ko'p turli xil vazifalar mavjud - va havo oqimi ularni kundan-kunga chaynadi. Raqamlarda gapiradigan bo'lsak, bu 2,3 ta DWH (Hadoop) ichida turli xil murakkablikdagi ELT vazifalari, taxminan. 2,5 yuzta ma'lumotlar bazasi manbalar, bu bir jamoa 4 ETL ishlab chiquvchisi, ular DWHda ETL ma'lumotlarini qayta ishlash va DWH ichida ELT ma'lumotlarini qayta ishlash va, albatta, boshqalarga bo'linadi. bitta admin, kim xizmat infratuzilmasi bilan shug'ullanadi.

Kelajak uchun rejalar

Jarayonlar soni muqarrar ravishda o'sib bormoqda va biz havo oqimi infratuzilmasi nuqtai nazaridan qiladigan asosiy narsa - bu miqyoslash. Biz Airflow klasterini qurmoqchimiz, Selderey ishchilari uchun bir juft oyoq ajratmoqchimiz va ishni rejalashtirish jarayonlari va ombori bilan o'z-o'zini ko'paytiradigan bosh yasamoqchimiz.

Epilog

Bu, albatta, men Airflow haqida aytmoqchi bo'lgan hamma narsa emas, lekin men asosiy fikrlarni ta'kidlashga harakat qildim. Ishtaha ovqatlanish bilan birga keladi, sinab ko'ring va sizga yoqadi :)

Manba: www.habr.com

a Izoh qo'shish