Airflow toplu məlumatların işlənməsi proseslərini rahat və tez inkişaf etdirmək və saxlamaq üçün bir vasitədir

Airflow toplu məlumatların işlənməsi proseslərini rahat və tez inkişaf etdirmək və saxlamaq üçün bir vasitədir

Hey Habr! Bu yazıda, məsələn, korporativ DWH və ya DataLake infrastrukturunda toplu məlumatların işlənməsi proseslərini inkişaf etdirmək üçün əla bir vasitə haqqında danışmaq istəyirəm. Apache Airflow (bundan sonra Hava axını) haqqında danışacağıq. Habré-də ədalətsiz olaraq diqqətdən məhrumdur və əsas hissədə sizi inandırmağa çalışacağam ki, ETL / ELT prosesləriniz üçün planlaşdırıcı seçərkən heç olmasa Hava axınına baxmağa dəyər.

Əvvəllər Tinkoff Bankda işlədiyim zaman DWH mövzusunda bir sıra məqalələr yazmışdım. İndi mən Mail.Ru Group komandasının bir hissəsi oldum və oyun sahəsində məlumatların təhlili üçün platforma hazırlayıram. Əslində, xəbərlər və maraqlı həllər ortaya çıxdıqca, komanda və mən burada məlumat analitikası platformamız haqqında danışacağıq.

Ön söz

Beləliklə, başlayaq. Hava axını nədir? Bu kitabxanadır (və ya kitabxanalar toplusu) iş axınını inkişaf etdirmək, planlaşdırmaq və nəzarət etmək. Airflow-un əsas xüsusiyyəti Python kodunun prosesləri təsvir etmək (inkişaf etmək) üçün istifadə olunmasıdır. Bunun layihənizi və inkişafınızı təşkil etmək üçün bir çox üstünlükləri var: əslində sizin (məsələn) ETL layihəniz sadəcə Python layihəsidir və siz onu infrastruktur xüsusiyyətlərini, komanda ölçüsünü və digər tələbləri nəzərə alaraq istədiyiniz kimi təşkil edə bilərsiniz. . Instrumental olaraq hər şey sadədir. Məsələn, PyCharm + Git istifadə edin. Əla və çox rahatdır!

İndi Airflow-un əsas obyektlərinə baxaq. Onların mahiyyətini və məqsədini başa düşərək, prosesin arxitekturasını optimal şəkildə təşkil edəcəksiniz. Ola bilsin ki, əsas obyekt yönləndirilmiş asiklik qrafikdir (bundan sonra DAG).

DAG

DAG, müəyyən bir cədvəl üzrə ciddi şəkildə müəyyən edilmiş ardıcıllıqla yerinə yetirmək istədiyiniz tapşırıqlarınızın semantik birliyidir. Airflow DAG-lar və digər qurumlarla işləmək üçün rahat veb interfeys təqdim edir:

Airflow toplu məlumatların işlənməsi proseslərini rahat və tez inkişaf etdirmək və saxlamaq üçün bir vasitədir

DAG belə görünə bilər:

Airflow toplu məlumatların işlənməsi proseslərini rahat və tez inkişaf etdirmək və saxlamaq üçün bir vasitədir

DAG dizayn edərkən, tərtibatçı DAG daxilində tapşırıqların qurulacağı bir sıra operatorlar qoyur. Burada başqa bir vacib quruma gəlirik: Hava axını operatoru.

Operatorlar

Operator, iş nümunəsinin icrası zamanı nə baş verəcəyini təsvir edən iş nümunələrinin yaradıldığı bir qurumdur. GitHub-dan hava axını buraxılır artıq istifadəyə hazır ifadələr toplusunu ehtiva edir. Nümunələr:

  • BashOperator bash əmrini yerinə yetirmək üçün operatordur.
  • PythonOperator Python kodunu çağırmaq üçün operatordur.
  • EmailOperator - e-poçt göndərmək üçün operator.
  • HTTPOperator - http sorğuları ilə işləmək üçün operator.
  • SqlOperator SQL kodunu icra etmək üçün operatordur.
  • Sensor - hadisəni gözləmək üçün operator (istənilən vaxtın gəlməsi, tələb olunan faylın görünüşü, verilənlər bazasında sıra, API-dən cavab və s. və s.).

Daha konkret operatorlar var: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Siz həmçinin ehtiyaclarınıza uyğun operatorlar hazırlayıb layihənizdə istifadə edə bilərsiniz. Məsələn, MongoDBToHiveViaHdfsTransfer, sənədləri MongoDB-dən Hive-a ixrac etmək üçün operator və onlarla işləmək üçün bir neçə operator yaratdıq. Basın Evi: CHLoadFromHiveOperator və CHTableLoaderOperator. Əslində, bir layihə əsas ifadələr üzərində qurulmuş kodu tez-tez istifadə edən kimi, onu yeni bir ifadəyə tərtib etmək barədə düşünə bilərsiniz. Bu, gələcək inkişafı asanlaşdıracaq və siz layihədəki operatorlar kitabxananıza əlavə edəcəksiniz.

Bundan əlavə, bütün bu tapşırıq nümunələri yerinə yetirilməlidir və indi planlaşdırıcı haqqında danışacağıq.

Planlayıcı

Airflow-da tapşırıq planlayıcısı üzərində qurulub Kərəviz. Kərəviz, növbə təşkil etməyə və tapşırıqların asinxron və paylanmış icrasını təşkil etməyə imkan verən Python kitabxanasıdır. Hava axını tərəfdən bütün vəzifələr hovuzlara bölünür. Hovuzlar əl ilə yaradılır. Bir qayda olaraq, onların məqsədi mənbə ilə işləmə yükünü məhdudlaşdırmaq və ya DWH daxilində tapşırıqları yazmaqdır. Hovuzları veb interfeys vasitəsilə idarə etmək olar:

Airflow toplu məlumatların işlənməsi proseslərini rahat və tez inkişaf etdirmək və saxlamaq üçün bir vasitədir

Hər hovuzda slotların sayına məhdudiyyət qoyulur. DAG yaratarkən ona hovuz verilir:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG səviyyəsində müəyyən edilmiş hovuz tapşırıq səviyyəsində ləğv edilə bilər.
Ayrı bir proses, Scheduler, Airflow-da bütün tapşırıqların planlaşdırılmasına cavabdehdir. Əslində, Planlayıcı icra üçün tapşırıqların qoyulmasının bütün mexanikası ilə məşğul olur. Tapşırıq yerinə yetirilməzdən əvvəl bir neçə mərhələdən keçir:

  1. DAG-da əvvəlki vəzifələr tamamlandı, yenisi növbəyə alına bilər.
  2. Növbə tapşırıqların prioritetindən asılı olaraq çeşidlənir (prioritetlərə də nəzarət etmək olar) və hovuzda boş yer varsa, tapşırıq işə götürülə bilər.
  3. Pulsuz işçi kərəviz varsa, tapşırıq ona göndərilir; tapşırıqda proqramlaşdırdığınız iş bu və ya digər operatordan istifadə edərək başlayır.

Kifayət qədər sadə.

Planlayıcı bütün DAG-lar və DAG-lar daxilindəki bütün tapşırıqlar toplusunda işləyir.

Planlaşdırıcının DAG ilə işləməyə başlaması üçün DAG cədvəli təyin etməlidir:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Hazır əvvəlcədən qurulmuş bir sıra var: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Siz həmçinin cron ifadələrindən istifadə edə bilərsiniz:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

İcra tarixi

Hava axınının necə işlədiyini başa düşmək üçün DAG üçün İcra Tarixinin nə olduğunu başa düşmək vacibdir. Hava axını DAG İcra Tarixi ölçüsünə malikdir, yəni DAG-ın iş qrafikindən asılı olaraq hər İcra Tarixi üçün tapşırıq nümunələri yaradılır. Və hər İcra Tarixi üçün tapşırıqlar yenidən icra oluna bilər - və ya, məsələn, DAG eyni vaxtda bir neçə İcra Tarixində işləyə bilər. Bu burada aydın şəkildə göstərilir:

Airflow toplu məlumatların işlənməsi proseslərini rahat və tez inkişaf etdirmək və saxlamaq üçün bir vasitədir

Təəssüf ki (və ya bəlkə də xoşbəxtlikdən: bu vəziyyətdən asılıdır), əgər DAG-da tapşırığın yerinə yetirilməsi düzgündürsə, əvvəlki İcra Tarixindəki icra düzəlişlərlə gedəcəkdir. Yeni alqoritmdən istifadə edərək keçmiş dövrlərdəki məlumatları yenidən hesablamaq lazımdırsa, bu yaxşıdır, lakin pisdir, çünki nəticənin təkrarlanma qabiliyyəti itir (əlbəttə ki, heç kim Git-dən mənbə kodunun tələb olunan versiyasını qaytarmaq və nə etdiyinizi hesablamaq üçün narahat olmur. lazım olduqda bir dəfə lazımdır).

Tapşırıq generasiyası

DAG tətbiqi Python kodudur, buna görə də, məsələn, parçalanmış mənbələrlə işləyərkən kodun miqdarını azaltmaq üçün çox rahat bir yolumuz var. Tutaq ki, mənbə kimi üç MySQL qırıntınız var, hər birinə dırmaşmaq və bəzi məlumatları götürməlisiniz. Və müstəqil və paralel olaraq. DAG-dakı Python kodu belə görünə bilər:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG belə görünür:

Airflow toplu məlumatların işlənməsi proseslərini rahat və tez inkişaf etdirmək və saxlamaq üçün bir vasitədir

Eyni zamanda, sadəcə parametrləri tənzimləmək və DAG-ı yeniləməklə bir parça əlavə edə və ya silə bilərsiniz. Rahat!

Siz həmçinin daha mürəkkəb kod generasiyasından istifadə edə bilərsiniz, məsələn, verilənlər bazası şəklində mənbələrlə işləyə və ya cədvəl strukturunu, cədvəllə işləmək üçün alqoritmi təsvir edə və DWH infrastrukturunun xüsusiyyətlərini nəzərə alaraq prosesi yarada bilərsiniz. yaddaşınıza N cədvəl yükləmək. Və ya, məsələn, siyahı şəklində bir parametrlə işləməyi dəstəkləməyən API ilə işləyərkən, bu siyahıdan istifadə edərək DAG-da N tapşırıq yarada, API-dəki sorğuların paralelliyini hovuza məhdudlaşdıra və çıxara bilərsiniz. API-dən lazımi məlumatlar. Çevik!

anbar

Airflow-un öz arxa repozitoriyası var, verilənlər bazası (bəlkə MySQL və ya Postgres, bizdə Postgres var), burada tapşırıqların vəziyyətləri, DAG-lar, əlaqə parametrləri, qlobal dəyişənlər və s. və s. Airflow-da çox sadədir (təxminən 20 cədvəl) və hər hansı bir prosesinizi onun üzərində qurmaq istəyirsinizsə, rahatdır. Sorğunun necə qurulacağını başa düşməzdən əvvəl uzun müddət siqaret çəkməli olan Informatica deposunda 100500 cədvəli xatırlayıram.

Monitorinq

Anbarın sadəliyini nəzərə alaraq, tapşırıqların monitorinqi üçün sizin üçün əlverişli olan bir proses qura bilərsiniz. Tapşırıqların vəziyyətinə baxdığımız Zeppelin-də notepad istifadə edirik:

Airflow toplu məlumatların işlənməsi proseslərini rahat və tez inkişaf etdirmək və saxlamaq üçün bir vasitədir

O, həmçinin Airflow-un özünün veb interfeysi ola bilər:

Airflow toplu məlumatların işlənməsi proseslərini rahat və tez inkişaf etdirmək və saxlamaq üçün bir vasitədir

Hava axını kodu açıqdır, ona görə də biz Telegram-da xəbərdarlıq əlavə etdik. Hər çalışan tapşırıq nümunəsi, xəta baş verərsə, bütün inkişaf və dəstək qrupunun ibarət olduğu Telegram qrupuna spam göndərir.

Biz Telegram vasitəsilə (lazım olduqda), Zeppelin vasitəsilə operativ cavab alırıq - Airflow-dakı tapşırıqların ümumi şəkli.

Ümumi

Hava axını ilk növbədə açıq mənbədir və ondan möcüzə gözləməyin. İşləyən bir həll yaratmaq üçün vaxt və səy sərf etməyə hazır olun. Əldə edilə bilən kateqoriyadan bir məqsəd, inanın, buna dəyər. İnkişaf sürəti, çeviklik, yeni proseslər əlavə etmək asanlığı - bunu bəyənəcəksiniz. Əlbəttə ki, layihənin təşkilinə, Airflow-un özünün işinin sabitliyinə çox diqqət yetirmək lazımdır: möcüzələr yoxdur.

İndi bizdə Airflow gündəlik işləyir təxminən 6,5 min tapşırıq. Təbiət baxımından tamamilə fərqlidirlər. Bir çox fərqli və çox xüsusi mənbələrdən məlumatların əsas DWH-yə yüklənməsi üçün tapşırıqlar var, əsas DWH daxilində vitrinlərin hesablanması üçün tapşırıqlar var, məlumatları sürətli DWH-də dərc etmək üçün tapşırıqlar var, çoxlu sayda müxtəlif tapşırıqlar var - və Hava axını bütün gün onları çeynəyir. Rəqəmlərlə desək, belədir 2,3 min DWH (Hadoop) daxilində müxtəlif mürəkkəblikdə ELT tapşırıqları, haqqında 2,5 yüz verilənlər bazası mənbələrdən gələn bir əmrdir 4 ETL tərtibatçıları, bunlar DWH-də ETL məlumat emalına və DWH daxilində ELT məlumat emalına və əlbəttə ki, daha çoxuna bölünür. bir admin, xidmətin infrastrukturu ilə məşğul olan.

Gələcək üçün planlar

Proseslərin sayı qaçılmaz olaraq artır və Hava axını infrastrukturu baxımından edəcəyimiz əsas şey miqyaslandırmadır. Biz Hava axını klasteri qurmaq, Kərəviz işçiləri üçün bir neçə ayaq ayırmaq və iş planlaşdırma prosesləri və anbar ilə dublikat baş etmək istəyirik.

Epiloq

Bu, əlbəttə ki, Airflow haqqında danışmaq istədiyim hər şeydən uzaqdır, amma əsas məqamları vurğulamağa çalışdım. İştah yeməklə gəlir, cəhd edin, bəyənəcəksiniz 🙂

Mənbə: www.habr.com

Добавить комментарий