Воздухот е алатка за практично и брзо развивање и одржување на процесите на сериска обработка на податоци

Воздухот е алатка за практично и брзо развивање и одржување на процесите на сериска обработка на податоци

Здраво, Хабр! Во оваа статија сакам да зборувам за една одлична алатка за развој на процеси на сериска обработка на податоци, на пример, во инфраструктурата на корпоративното DWH или на вашето DataLake. Ќе зборуваме за Apache Airflow (во натамошниот текст како проток на воздух). Неправедно е лишено од внимание на Habré, и во главниот дел ќе се обидам да ве убедам дека барем вреди да се погледне Airflow при изборот на распоредувач за вашите ETL/ELT процеси.

Претходно напишав серија написи на тема DWH кога работев во Tinkoff Bank. Сега станав дел од тимот на Mail.Ru Group и развивам платформа за анализа на податоци во областа на игри. Всушност, како што се појавуваат вести и интересни решенија, јас и мојот тим овде ќе зборуваме за нашата платформа за анализа на податоци.

Пролог

Значи, да започнеме. Што е проток на воздух? Ова е библиотека (или збир на библиотеки) да развива, планира и следи работни процеси. Главната карактеристика на протокот на воздух: Пајтон кодот се користи за опишување (развивање) на процесите. Ова има многу предности за организирање на вашиот проект и развој: во суштина, вашиот (на пример) проект ETL е само проект на Python и можете да го организирате како што сакате, земајќи ги предвид спецификите на инфраструктурата, големината на тимот и други барања. Инструментално сè е едноставно. Користете на пример PyCharm + Git. Прекрасно е и многу погодно!

Сега да ги погледнеме главните ентитети на Airflow. Со разбирање на нивната суштина и цел, можете оптимално да ја организирате архитектурата на вашиот процес. Можеби главниот ентитет е насочен ацикличен график (во натамошниот текст ДАГ).

ДАГ

ДАГ е некоја значајна асоцијација на вашите задачи што сакате да ги завршите во строго дефинирана низа според одреден распоред. Воздухот обезбедува удобен веб-интерфејс за работа со DAG и други ентитети:

Воздухот е алатка за практично и брзо развивање и одржување на процесите на сериска обработка на податоци

ДАГ може да изгледа вака:

Воздухот е алатка за практично и брзо развивање и одржување на процесите на сериска обработка на податоци

Развивачот, при дизајнирање на ДАГ, поставува збир на оператори на кои ќе се градат задачи во рамките на ДАГ. Овде доаѓаме до уште еден важен субјект: Оператор на проток на воздух.

Оператори

Операторот е ентитет врз основа на кој се креираат инстанци за работа, што опишува што ќе се случи за време на извршувањето на примерот за работа. Протокот на воздух се ослободува од GitHub веќе содржи збир на оператори подготвени за употреба. Примери:

  • BashOperator - оператор за извршување на команда bash.
  • PythonOperator - оператор за повикување Python код.
  • EmailOperator — оператор за испраќање е-пошта.
  • HTTPOператор - оператор за работа со http барања.
  • SqlOperator - оператор за извршување на SQL код.
  • Сензор е оператор за чекање настан (доаѓање на потребното време, појава на потребната датотека, линија во базата на податоци, одговор од API, итн., итн.).

Постојат поспецифични оператори: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Можете исто така да развиете оператори врз основа на вашите сопствени карактеристики и да ги користите во вашиот проект. На пример, создадовме MongoDBToHiveViaHdfsTransfer, оператор за извоз на документи од MongoDB во Hive и неколку оператори за работа со Кликни куќа: CHLoadFromHiveOperator и CHTableLoaderOperator. Во суштина, штом проектот често користи код изграден на основни изјави, можете да размислите да го изградите во нова изјава. Ова ќе го поедностави понатамошниот развој и ќе ја проширите вашата библиотека на оператори во проектот.

Следно, сите овие примери на задачи треба да се извршат, а сега ќе зборуваме за распоредувачот.

Распоредувач

Распоредот на задачи на протокот на воздух е изграден на Целер. Celery е библиотека на Python која ви овозможува да организирате редица плус асинхроно и дистрибуирано извршување на задачите. На страната на протокот на воздух, сите задачи се поделени на базени. Базените се создаваат рачно. Обично, нивната цел е да го ограничат обемот на работа со изворот или да типизираат задачи во рамките на DWH. Со базените може да се управува преку веб-интерфејсот:

Воздухот е алатка за практично и брзо развивање и одржување на процесите на сериска обработка на податоци

Секој базен има ограничување на бројот на слотови. Кога креирате ДАГ, му се дава базен:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Базен дефиниран на ниво на ДАГ може да се отфрли на ниво на задача.
Посебен процес, Распоредувач, е одговорен за закажување на сите задачи во Воздухот. Всушност, Распоредувачот се занимава со сета механика на поставување задачи за извршување. Задачата поминува низ неколку фази пред да се изврши:

  1. Претходните задачи се завршени во ДАГ, може да се чека нова.
  2. Редот се подредува во зависност од приоритетот на задачите (приоритетите исто така може да се контролираат), а ако има слободен слот во базенот, задачата може да се стави во функција.
  3. Ако има бесплатен работник целер, задачата му се испраќа; започнува работата што сте ја програмирале во проблемот, користејќи еден или друг оператор.

Доволно едноставно.

Распоредувачот работи на множеството од сите DAG и сите задачи во рамките на DAG.

За Распоредувачот да започне да работи со ДАГ, ДАГ треба да постави распоред:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Постои сет на готови претходно поставени поставки: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Можете исто така да користите cron изрази:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Датум на извршување

За да разберете како функционира протокот на воздух, важно е да се разбере кој е датумот на извршување за ДАГ. Во Airflow, DAG има димензија на датум на извршување, т.е., во зависност од работниот распоред на DAG, се креираат примероци на задачи за секој датум на извршување. И за секој датум на извршување, задачите може повторно да се извршат - или, на пример, DAG може да работи истовремено во неколку датуми на извршување. Ова е јасно прикажано овде:

Воздухот е алатка за практично и брзо развивање и одржување на процесите на сериска обработка на податоци

За жал (или можеби за среќа: зависи од ситуацијата), доколку се коригира спроведувањето на задачата во ДАГ, тогаш извршувањето во претходниот Датум на извршување ќе продолжи земајќи ги предвид прилагодувањата. Ова е добро ако треба повторно да ги пресметате податоците во минатите периоди користејќи нов алгоритам, но е лошо бидејќи се губи репродуктивноста на резултатот (се разбира, никој не ви пречи да ја вратите потребната верзија на изворниот код од Git и да пресметате што ти треба еднократно, онака како што ти треба).

Генерирање задачи

Имплементацијата на DAG е код во Python, така што имаме многу удобен начин да го намалиме количеството на код кога работиме, на пример, со исечени извори. Да речеме дека имате три MySQL фрагменти како извор, треба да се качите во секоја од нив и да земете некои податоци. Згора на тоа, независно и паралелно. Кодот на Python во DAG може да изгледа вака:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

ДАГ изгледа вака:

Воздухот е алатка за практично и брзо развивање и одржување на процесите на сериска обработка на податоци

Во овој случај, можете да додадете или отстраните парче со едноставно прилагодување на поставките и ажурирање на DAG. Удобно!

Можете исто така да користите посложени генерирање кодови, на пример, да работите со извори во форма на база на податоци или да опишете структура на табела, алгоритам за работа со табела и, земајќи ги предвид карактеристиките на инфраструктурата DWH, да генерирате процес за вчитување на N табели во вашиот склад. Или, на пример, ако работите со API што не поддржува работа со параметар во форма на листа, можете да генерирате N задачи во DAG од оваа листа, да го ограничите паралелизмот на барањата во API на базен и да го избришете потребни податоци од API. Флексибилен!

складиште

Воздухот има свое складиште за заднина, база на податоци (може да биде MySQL или Postgres, имаме Postgres), која ги складира состојбите на задачи, DAG, поставките за поврзување, глобалните променливи итн., итн. Овде би сакал да кажам дека складиштето во Airflow е многу едноставно (околу 20 табели) и практично ако сакате да изградите некој од вашите сопствени процеси врз него. Се сеќавам на 100500 табели во складиштето Informatica, кои требаше да се проучуваат долго време пред да се разбере како да се изгради барање.

Мониторинг

Со оглед на едноставноста на складиштето, можете да изградите процес на следење на задачи што е погоден за вас. Ние користиме бележник во Цепелин, каде што го разгледуваме статусот на задачите:

Воздухот е алатка за практично и брзо развивање и одржување на процесите на сериска обработка на податоци

Ова може да биде и веб-интерфејсот на самиот Airflow:

Воздухот е алатка за практично и брзо развивање и одржување на процесите на сериска обработка на податоци

Кодот за проток на воздух е со отворен код, така што додадовме предупредување во Telegram. Секој работен пример на задача, ако се појави грешка, ја испраќа групата во Телеграм, каде што се состои целиот тим за развој и поддршка.

Добиваме брз одговор преку Telegram (ако е потребно), а преку Zeppelin добиваме целокупна слика за задачите во Airflow.

Во вкупен

Воздухот е првенствено отворен извор и не треба да очекувате чуда од него. Бидете подготвени да вложите време и напор за да изградите решение кое функционира. Целта е остварлива, верувајте, вреди. Брзина на развој, флексибилност, леснотија на додавање нови процеси - ќе ви се допадне. Се разбира, треба да посветите многу внимание на организацијата на проектот, на стабилноста на самиот проток на воздух: чуда не се случуваат.

Сега имаме Airflow кој работи секојдневно околу 6,5 илјади задачи. Тие се сосема различни по карактер. Има задачи за вчитување податоци во главниот DWH од многу различни и многу специфични извори, има задачи за пресметување на излози во главниот DWH, има задачи за објавување податоци во брз DWH, има многу, многу различни задачи - и проток на воздух ги џвака сите ден по ден. Говорејќи во бројки, ова е 2,3 илјади ELT задачи со различна сложеност во рамките на DWH (Hadoop), прибл. 2,5 стотина бази на податоци извори, ова е тим од 4 ETL програмери, кои се поделени на ETL обработка на податоци во DWH и ELT обработка на податоци внатре во DWH и секако повеќе еден админ, кој се занимава со инфраструктурата на услугата.

Планови за иднината

Бројот на процеси неизбежно расте, а главната работа што ќе ја правиме во однос на инфраструктурата на Airflow е скалирање. Сакаме да изградиме кластер за проток на воздух, да одвоиме пар ногарки за работниците во Celery и да направиме глава што се дуплира самостојно со процеси на распоред на работни места и складиште.

Епилог

Ова, се разбира, не е сè што би сакал да кажам за протокот на воздух, но се обидов да ги истакнам главните точки. Апетитот доаѓа со јадење, пробајте и ќе ви се допадне :)

Извор: www.habr.com

Додадете коментар