Hey Habr! Bu yazıda, məsələn, korporativ DWH və ya DataLake infrastrukturunda toplu məlumatların işlənməsi proseslərini inkişaf etdirmək üçün əla bir vasitə haqqında danışmaq istəyirəm. Apache Airflow (bundan sonra Hava axını) haqqında danışacağıq. Habré-də ədalətsiz olaraq diqqətdən məhrumdur və əsas hissədə sizi inandırmağa çalışacağam ki, ETL / ELT prosesləriniz üçün planlaşdırıcı seçərkən heç olmasa Hava axınına baxmağa dəyər.
Əvvəllər Tinkoff Bankda işlədiyim zaman DWH mövzusunda bir sıra məqalələr yazmışdım. İndi mən Mail.Ru Group komandasının bir hissəsi oldum və oyun sahəsində məlumatların təhlili üçün platforma hazırlayıram. Əslində, xəbərlər və maraqlı həllər ortaya çıxdıqca, komanda və mən burada məlumat analitikası platformamız haqqında danışacağıq.
Ön söz
Beləliklə, başlayaq. Hava axını nədir? Bu kitabxanadır (və ya
İndi Airflow-un əsas obyektlərinə baxaq. Onların mahiyyətini və məqsədini başa düşərək, prosesin arxitekturasını optimal şəkildə təşkil edəcəksiniz. Ola bilsin ki, əsas obyekt yönləndirilmiş asiklik qrafikdir (bundan sonra DAG).
DAG
DAG, müəyyən bir cədvəl üzrə ciddi şəkildə müəyyən edilmiş ardıcıllıqla yerinə yetirmək istədiyiniz tapşırıqlarınızın semantik birliyidir. Airflow DAG-lar və digər qurumlarla işləmək üçün rahat veb interfeys təqdim edir:
DAG belə görünə bilər:
DAG dizayn edərkən, tərtibatçı DAG daxilində tapşırıqların qurulacağı bir sıra operatorlar qoyur. Burada başqa bir vacib quruma gəlirik: Hava axını operatoru.
Operatorlar
Operator, iş nümunəsinin icrası zamanı nə baş verəcəyini təsvir edən iş nümunələrinin yaradıldığı bir qurumdur.
- BashOperator bash əmrini yerinə yetirmək üçün operatordur.
- PythonOperator Python kodunu çağırmaq üçün operatordur.
- EmailOperator - e-poçt göndərmək üçün operator.
- HTTPOperator - http sorğuları ilə işləmək üçün operator.
- SqlOperator SQL kodunu icra etmək üçün operatordur.
- Sensor - hadisəni gözləmək üçün operator (istənilən vaxtın gəlməsi, tələb olunan faylın görünüşü, verilənlər bazasında sıra, API-dən cavab və s. və s.).
Daha konkret operatorlar var: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Siz həmçinin ehtiyaclarınıza uyğun operatorlar hazırlayıb layihənizdə istifadə edə bilərsiniz. Məsələn, MongoDBToHiveViaHdfsTransfer, sənədləri MongoDB-dən Hive-a ixrac etmək üçün operator və onlarla işləmək üçün bir neçə operator yaratdıq.
Bundan əlavə, bütün bu tapşırıq nümunələri yerinə yetirilməlidir və indi planlaşdırıcı haqqında danışacağıq.
Planlayıcı
Airflow-da tapşırıq planlayıcısı üzərində qurulub
Hər hovuzda slotların sayına məhdudiyyət qoyulur. DAG yaratarkən ona hovuz verilir:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
DAG səviyyəsində müəyyən edilmiş hovuz tapşırıq səviyyəsində ləğv edilə bilər.
Ayrı bir proses, Scheduler, Airflow-da bütün tapşırıqların planlaşdırılmasına cavabdehdir. Əslində, Planlayıcı icra üçün tapşırıqların qoyulmasının bütün mexanikası ilə məşğul olur. Tapşırıq yerinə yetirilməzdən əvvəl bir neçə mərhələdən keçir:
- DAG-da əvvəlki vəzifələr tamamlandı, yenisi növbəyə alına bilər.
- Növbə tapşırıqların prioritetindən asılı olaraq çeşidlənir (prioritetlərə də nəzarət etmək olar) və hovuzda boş yer varsa, tapşırıq işə götürülə bilər.
- Pulsuz işçi kərəviz varsa, tapşırıq ona göndərilir; tapşırıqda proqramlaşdırdığınız iş bu və ya digər operatordan istifadə edərək başlayır.
Kifayət qədər sadə.
Planlayıcı bütün DAG-lar və DAG-lar daxilindəki bütün tapşırıqlar toplusunda işləyir.
Planlaşdırıcının DAG ilə işləməyə başlaması üçün DAG cədvəli təyin etməlidir:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Hazır əvvəlcədən qurulmuş bir sıra var: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Siz həmçinin cron ifadələrindən istifadə edə bilərsiniz:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
İcra tarixi
Hava axınının necə işlədiyini başa düşmək üçün DAG üçün İcra Tarixinin nə olduğunu başa düşmək vacibdir. Hava axını DAG İcra Tarixi ölçüsünə malikdir, yəni DAG-ın iş qrafikindən asılı olaraq hər İcra Tarixi üçün tapşırıq nümunələri yaradılır. Və hər İcra Tarixi üçün tapşırıqlar yenidən icra oluna bilər - və ya, məsələn, DAG eyni vaxtda bir neçə İcra Tarixində işləyə bilər. Bu burada aydın şəkildə göstərilir:
Təəssüf ki (və ya bəlkə də xoşbəxtlikdən: bu vəziyyətdən asılıdır), əgər DAG-da tapşırığın yerinə yetirilməsi düzgündürsə, əvvəlki İcra Tarixindəki icra düzəlişlərlə gedəcəkdir. Yeni alqoritmdən istifadə edərək keçmiş dövrlərdəki məlumatları yenidən hesablamaq lazımdırsa, bu yaxşıdır, lakin pisdir, çünki nəticənin təkrarlanma qabiliyyəti itir (əlbəttə ki, heç kim Git-dən mənbə kodunun tələb olunan versiyasını qaytarmaq və nə etdiyinizi hesablamaq üçün narahat olmur. lazım olduqda bir dəfə lazımdır).
Tapşırıq generasiyası
DAG tətbiqi Python kodudur, buna görə də, məsələn, parçalanmış mənbələrlə işləyərkən kodun miqdarını azaltmaq üçün çox rahat bir yolumuz var. Tutaq ki, mənbə kimi üç MySQL qırıntınız var, hər birinə dırmaşmaq və bəzi məlumatları götürməlisiniz. Və müstəqil və paralel olaraq. DAG-dakı Python kodu belə görünə bilər:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG belə görünür:
Eyni zamanda, sadəcə parametrləri tənzimləmək və DAG-ı yeniləməklə bir parça əlavə edə və ya silə bilərsiniz. Rahat!
Siz həmçinin daha mürəkkəb kod generasiyasından istifadə edə bilərsiniz, məsələn, verilənlər bazası şəklində mənbələrlə işləyə və ya cədvəl strukturunu, cədvəllə işləmək üçün alqoritmi təsvir edə və DWH infrastrukturunun xüsusiyyətlərini nəzərə alaraq prosesi yarada bilərsiniz. yaddaşınıza N cədvəl yükləmək. Və ya, məsələn, siyahı şəklində bir parametrlə işləməyi dəstəkləməyən API ilə işləyərkən, bu siyahıdan istifadə edərək DAG-da N tapşırıq yarada, API-dəki sorğuların paralelliyini hovuza məhdudlaşdıra və çıxara bilərsiniz. API-dən lazımi məlumatlar. Çevik!
anbar
Airflow-un öz arxa repozitoriyası var, verilənlər bazası (bəlkə MySQL və ya Postgres, bizdə Postgres var), burada tapşırıqların vəziyyətləri, DAG-lar, əlaqə parametrləri, qlobal dəyişənlər və s. və s. Airflow-da çox sadədir (təxminən 20 cədvəl) və hər hansı bir prosesinizi onun üzərində qurmaq istəyirsinizsə, rahatdır. Sorğunun necə qurulacağını başa düşməzdən əvvəl uzun müddət siqaret çəkməli olan Informatica deposunda 100500 cədvəli xatırlayıram.
Monitorinq
Anbarın sadəliyini nəzərə alaraq, tapşırıqların monitorinqi üçün sizin üçün əlverişli olan bir proses qura bilərsiniz. Tapşırıqların vəziyyətinə baxdığımız Zeppelin-də notepad istifadə edirik:
O, həmçinin Airflow-un özünün veb interfeysi ola bilər:
Hava axını kodu açıqdır, ona görə də biz Telegram-da xəbərdarlıq əlavə etdik. Hər çalışan tapşırıq nümunəsi, xəta baş verərsə, bütün inkişaf və dəstək qrupunun ibarət olduğu Telegram qrupuna spam göndərir.
Biz Telegram vasitəsilə (lazım olduqda), Zeppelin vasitəsilə operativ cavab alırıq - Airflow-dakı tapşırıqların ümumi şəkli.
Ümumi
Hava axını ilk növbədə açıq mənbədir və ondan möcüzə gözləməyin. İşləyən bir həll yaratmaq üçün vaxt və səy sərf etməyə hazır olun. Əldə edilə bilən kateqoriyadan bir məqsəd, inanın, buna dəyər. İnkişaf sürəti, çeviklik, yeni proseslər əlavə etmək asanlığı - bunu bəyənəcəksiniz. Əlbəttə ki, layihənin təşkilinə, Airflow-un özünün işinin sabitliyinə çox diqqət yetirmək lazımdır: möcüzələr yoxdur.
İndi bizdə Airflow gündəlik işləyir təxminən 6,5 min tapşırıq. Təbiət baxımından tamamilə fərqlidirlər. Bir çox fərqli və çox xüsusi mənbələrdən məlumatların əsas DWH-yə yüklənməsi üçün tapşırıqlar var, əsas DWH daxilində vitrinlərin hesablanması üçün tapşırıqlar var, məlumatları sürətli DWH-də dərc etmək üçün tapşırıqlar var, çoxlu sayda müxtəlif tapşırıqlar var - və Hava axını bütün gün onları çeynəyir. Rəqəmlərlə desək, belədir 2,3 min DWH (Hadoop) daxilində müxtəlif mürəkkəblikdə ELT tapşırıqları, haqqında 2,5 yüz verilənlər bazası mənbələrdən gələn bir əmrdir 4 ETL tərtibatçıları, bunlar DWH-də ETL məlumat emalına və DWH daxilində ELT məlumat emalına və əlbəttə ki, daha çoxuna bölünür. bir admin, xidmətin infrastrukturu ilə məşğul olan.
Gələcək üçün planlar
Proseslərin sayı qaçılmaz olaraq artır və Hava axını infrastrukturu baxımından edəcəyimiz əsas şey miqyaslandırmadır. Biz Hava axını klasteri qurmaq, Kərəviz işçiləri üçün bir neçə ayaq ayırmaq və iş planlaşdırma prosesləri və anbar ilə dublikat baş etmək istəyirik.
Epiloq
Bu, əlbəttə ki, Airflow haqqında danışmaq istədiyim hər şeydən uzaqdır, amma əsas məqamları vurğulamağa çalışdım. İştah yeməklə gəlir, cəhd edin, bəyənəcəksiniz 🙂
Mənbə: www.habr.com