Salom, Xabr! Ushbu maqolada men, masalan, korporativ DWH yoki DataLake infratuzilmasida ommaviy ma'lumotlarni qayta ishlash jarayonlarini ishlab chiqish uchun ajoyib vosita haqida gapirmoqchiman. Biz Apache Airflow (keyingi o'rinlarda havo oqimi deb yuritiladi) haqida gapiramiz. Bu Habré-da nohaq e'tibordan mahrum va asosiy qismda men sizni ETL/ELT jarayonlari uchun rejalashtiruvchi tanlashda hech bo'lmaganda Airflow-ga qarashga arziydiganligiga ishontirishga harakat qilaman.
Ilgari men Tinkoff bankida ishlaganimda DWH mavzusida bir qator maqolalar yozganman. Endi men Mail.Ru Group jamoasining bir qismiga aylandim va o'yin sohasida ma'lumotlarni tahlil qilish platformasini ishlab chiqyapman. Aslida, yangiliklar va qiziqarli echimlar paydo bo'lganda, men va mening jamoam bu erda ma'lumotlar tahlili platformamiz haqida gaplashamiz.
Prologiya
Shunday ekan, boshlaylik. Havo oqimi nima? Bu kutubxona (yoki
Endi havo oqimining asosiy ob'ektlarini ko'rib chiqaylik. Ularning mohiyati va maqsadini tushunib, siz jarayon arxitekturasini optimal tarzda tashkil qilishingiz mumkin. Ehtimol, asosiy ob'ekt yo'naltirilgan asiklik grafik (keyingi o'rinlarda DAG deb yuritiladi).
DAG
DAG - bu ma'lum bir jadvalga muvofiq qat'iy belgilangan ketma-ketlikda bajarishni istagan vazifalaringizning mazmunli birlashmasi. Airflow DAG va boshqa ob'ektlar bilan ishlash uchun qulay veb-interfeysni taqdim etadi:
DAG quyidagicha ko'rinishi mumkin:
Ishlab chiquvchi DAGni loyihalashda DAG doirasida vazifalar quriladigan operatorlar to'plamini belgilaydi. Bu erda biz yana bir muhim ob'ektga keldik: Havo oqimi operatori.
Operatorlar
Operator - bu ish misollari yaratiladigan ob'ekt bo'lib, u ish misolini bajarish paytida nima sodir bo'lishini tavsiflaydi.
- BashOperator - bash buyrug'ini bajarish uchun operator.
- PythonOperator - Python kodini chaqirish uchun operator.
- EmailOperator - elektron pochta xabarlarini yuborish operatori.
- HTTPOperator - http so'rovlari bilan ishlash operatori.
- SqlOperator - SQL kodini bajarish operatori.
- Sensor - bu hodisani kutish operatori (kerakli vaqtning kelishi, kerakli faylning paydo bo'lishi, ma'lumotlar bazasidagi chiziq, APIdan javob va boshqalar).
Aniqroq operatorlar mavjud: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Shuningdek, siz o'zingizning xususiyatlaringiz asosida operatorlarni ishlab chiqishingiz va ularni loyihangizda ishlatishingiz mumkin. Masalan, biz MongoDBToHiveViaHdfsTransfer, hujjatlarni MongoDB-dan Hive-ga eksport qilish operatori va ular bilan ishlash uchun bir nechta operatorlarni yaratdik.
Keyinchalik, ushbu topshiriqlarning barchasi bajarilishi kerak va endi biz rejalashtiruvchi haqida gaplashamiz.
Rejalashtiruvchi
Havo oqimining vazifalarni rejalashtiruvchisi qurilgan
Har bir hovuzda slotlar soni bo'yicha cheklov mavjud. DAG yaratishda unga hovuz beriladi:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
DAG darajasida belgilangan hovuz vazifa darajasida bekor qilinishi mumkin.
Alohida jarayon, Scheduler, Airflow'dagi barcha vazifalarni rejalashtirish uchun javobgardir. Aslida, Scheduler bajarish uchun vazifalarni belgilashning barcha mexanikasi bilan shug'ullanadi. Vazifa bajarilishidan oldin bir necha bosqichlardan o'tadi:
- DAGda oldingi vazifalar bajarilgan, yangisini navbatga qo'yish mumkin.
- Navbat vazifalarning ustuvorligiga qarab tartiblanadi (ustuvorliklar ham nazorat qilinishi mumkin) va agar hovuzda bo'sh joy bo'lsa, vazifa ishga tushirilishi mumkin.
- Agar bepul ishchi selderey bo'lsa, vazifa unga yuboriladi; muammoda dasturlashtirgan ish u yoki bu operator yordamida boshlanadi.
Yetarlicha oddiy.
Rejalashtiruvchi barcha DAGlar to'plamida va DAG ichidagi barcha vazifalarda ishlaydi.
Scheduler DAG bilan ishlashni boshlashi uchun DAG jadvalni o'rnatishi kerak:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Tayyor sozlamalar to'plami mavjud: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Siz cron ifodalarini ham ishlatishingiz mumkin:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Amalga oshirish sanasi
Havo oqimi qanday ishlashini tushunish uchun DAG uchun bajarilish sanasi nima ekanligini tushunish muhimdir. Havo oqimida DAG ijro etilish sanasi o'lchamiga ega, ya'ni DAG ish jadvaliga qarab, har bir Ijro sanasi uchun vazifa misollari yaratiladi. Va har bir Ijro sanasi uchun vazifalar qayta bajarilishi mumkin - yoki, masalan, DAG bir vaqtning o'zida bir nechta Ijro sanalarida ishlashi mumkin. Bu bu erda aniq ko'rsatilgan:
Afsuski (yoki xayriyatki: bu vaziyatga bog'liq), agar DAGda topshiriqning bajarilishi tuzatilgan bo'lsa, avvalgi Ijro sanasida bajarish tuzatishlarni hisobga olgan holda davom etadi. Agar siz yangi algoritm yordamida o'tgan davrlardagi ma'lumotlarni qayta hisoblashingiz kerak bo'lsa, bu yaxshi, lekin bu yomon, chunki natijaning takrorlanishi yo'qoladi (albatta, hech kim sizni Git'dan manba kodining kerakli versiyasini qaytarib berish va nima qilishini hisoblash uchun bezovta qilmaydi. sizga bir marta kerak, sizga kerak bo'lgan tarzda).
Vazifalarni yaratish
DAG-ni amalga oshirish Python-da koddir, shuning uchun bizda, masalan, parchalangan manbalar bilan ishlashda kod miqdorini kamaytirishning juda qulay usuli mavjud. Aytaylik, sizda manba sifatida uchta MySQL parchasi bor, ularning har biriga kirib, ba'zi ma'lumotlarni olishingiz kerak. Bundan tashqari, mustaqil ravishda va parallel ravishda. DAGdagi Python kodi quyidagicha ko'rinishi mumkin:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG quyidagicha ko'rinadi:
Bunday holda, siz shunchaki sozlamalarni o'zgartirish va DAGni yangilash orqali parcha qo'shishingiz yoki olib tashlashingiz mumkin. Qulay!
Shuningdek, siz murakkabroq kod ishlab chiqarishdan foydalanishingiz mumkin, masalan, ma'lumotlar bazasi ko'rinishidagi manbalar bilan ishlashingiz yoki jadval tuzilishini, jadval bilan ishlash algoritmini tavsiflashingiz va DWH infratuzilmasi xususiyatlarini hisobga olgan holda jarayonni yaratishingiz mumkin. xotirangizga N ta jadvalni yuklash uchun. Yoki, masalan, ro'yxat ko'rinishidagi parametr bilan ishlashni qo'llab-quvvatlamaydigan API bilan ishlash, siz ushbu ro'yxatdan DAGda N ta vazifani yaratishingiz, APIdagi so'rovlarning parallelligini hovuzga cheklashingiz va API dan kerakli ma'lumotlar. Moslashuvchan!
ombori
Airflow o'zining zaxira omboriga, ma'lumotlar bazasiga ega (MySQL yoki Postgres bo'lishi mumkin, bizda Postgres mavjud), u vazifalar holatini, DAG'larni, ulanish sozlamalarini, global o'zgaruvchilarni va hokazolarni va hokazolarni saqlaydi. Bu erda shuni aytishim mumkinki, Airflow-dagi ombor juda oddiy (taxminan 20 ta jadval) va agar siz o'zingizning biron bir jarayoningizni uning ustiga qurmoqchi bo'lsangiz, qulay. Informatica omboridagi 100500 XNUMX ta jadvalni eslayman, ular so'rovni qanday tuzishni tushunishdan oldin uzoq vaqt davomida o'rganilishi kerak edi.
Monitoring
Omborning soddaligini hisobga olgan holda, siz o'zingiz uchun qulay bo'lgan vazifani kuzatish jarayonini yaratishingiz mumkin. Biz Zeppelin-da bloknotdan foydalanamiz, u erda biz vazifalarning holatini ko'rib chiqamiz:
Bu shuningdek, Airflow veb-interfeysi bo'lishi mumkin:
Havo oqimi kodi ochiq manba, shuning uchun biz Telegram’ga ogohlantirish qo‘shdik. Vazifaning har bir ishlayotgan namunasi, agar xatolik yuzaga kelsa, butun ishlab chiqish va qo'llab-quvvatlash jamoasidan iborat Telegramdagi guruhga spam yuboradi.
Biz Telegram orqali tezkor javob olamiz (agar kerak bo'lsa) va Zeppelin orqali biz Airflow-dagi vazifalarning umumiy rasmini olamiz.
jami
Havo oqimi birinchi navbatda ochiq manba bo'lib, undan mo''jizalar kutmasligingiz kerak. Ishlaydigan yechimni yaratish uchun vaqt va kuch sarflashga tayyor bo'ling. Maqsadga erishish mumkin, ishoning, bunga arziydi. Rivojlanish tezligi, moslashuvchanlik, yangi jarayonlarni qo'shish qulayligi - bu sizga yoqadi. Albatta, siz loyihani tashkil etishga, Havo oqimining barqarorligiga katta e'tibor berishingiz kerak: mo''jizalar sodir bo'lmaydi.
Endi bizda havo oqimi har kuni ishlaydi 6,5 mingga yaqin vazifa. Ular xarakter jihatidan juda farq qiladi. Asosiy DWH-ga turli xil va juda aniq manbalardan ma'lumotlarni yuklash vazifalari mavjud, asosiy DWH ichidagi do'konlarni hisoblash vazifalari mavjud, ma'lumotlarni tezkor DWHga nashr qilish vazifalari mavjud, juda ko'p turli xil vazifalar mavjud - va havo oqimi ularni kundan-kunga chaynadi. Raqamlarda gapiradigan bo'lsak, bu 2,3 ta DWH (Hadoop) ichida turli xil murakkablikdagi ELT vazifalari, taxminan. 2,5 yuzta ma'lumotlar bazasi manbalar, bu bir jamoa 4 ETL ishlab chiquvchisi, ular DWHda ETL ma'lumotlarini qayta ishlash va DWH ichida ELT ma'lumotlarini qayta ishlash va, albatta, boshqalarga bo'linadi. bitta admin, kim xizmat infratuzilmasi bilan shug'ullanadi.
Kelajak uchun rejalar
Jarayonlar soni muqarrar ravishda o'sib bormoqda va biz havo oqimi infratuzilmasi nuqtai nazaridan qiladigan asosiy narsa - bu miqyoslash. Biz Airflow klasterini qurmoqchimiz, Selderey ishchilari uchun bir juft oyoq ajratmoqchimiz va ishni rejalashtirish jarayonlari va ombori bilan o'z-o'zini ko'paytiradigan bosh yasamoqchimiz.
Epilog
Bu, albatta, men Airflow haqida aytmoqchi bo'lgan hamma narsa emas, lekin men asosiy fikrlarni ta'kidlashga harakat qildim. Ishtaha ovqatlanish bilan birga keladi, sinab ko'ring va sizga yoqadi :)
Manba: www.habr.com