Airflow - бул партиялык маалыматтарды иштетүү процесстерин ыңгайлуу жана тез иштеп чыгуу жана колдоо үчүн курал

Airflow - бул партиялык маалыматтарды иштетүү процесстерин ыңгайлуу жана тез иштеп чыгуу жана колдоо үчүн курал

Салам, Хабр! Бул макалада мен, мисалы, корпоративдик DWH же сиздин DataLake инфраструктурасында пакеттик маалыматтарды иштеп чыгуу процесстерин өнүктүрүү үчүн эң сонун курал жөнүндө айткым келет. Биз Apache Airflow (мындан ары Airflow деп аталат) жөнүндө сүйлөшөбүз. Habréге көңүл бурулбай калган жана негизги бөлүгүндө мен ETL/ELT процесстери үчүн пландаштыргычты тандоодо жок дегенде Airflow көңүл бурууга арзырлык экенине ишендирүүгө аракет кылам.

Мурда мен Tinkoff банкында иштеп жүргөндө DWH темасында бир катар макалаларды жазгам. Азыр мен Mail.Ru Group командасынын бир бөлүгү болуп калдым жана оюн чөйрөсүндө маалыматтарды талдоо үчүн платформа иштеп жатам. Чынында, жаңылыктар жана кызыктуу чечимдер пайда болгондо, менин командам экөөбүз бул жерде маалымат аналитикасы үчүн платформабыз жөнүндө сүйлөшөбүз.

Кириш сөз

Ошентип, баштайлы. Аба агымы деген эмне? Бул китепкана (же китепканалардын жыйындысы) иш процесстерин иштеп чыгуу, пландаштыруу жана көзөмөлдөө. Airflow негизги өзгөчөлүгү: Python коду процесстерди сүрөттөө (иштеп чыгуу) үчүн колдонулат. Бул сиздин долбооруңузду жана өнүгүүңүздү уюштуруу үчүн көптөгөн артыкчылыктарга ээ: сиздин (мисалы) ETL проектиңиз жөн гана Python долбоору жана сиз аны инфраструктуранын өзгөчөлүктөрүн, команданын көлөмүн жана башка талаптар. Инструменталдык жактан баары жөнөкөй. Мисалы, PyCharm + Git колдонуңуз. Бул сонун жана абдан ыңгайлуу!

Эми Airflow негизги объектилерин карап көрөлү. Алардын маңызын жана максатын түшүнүү менен сиз процессиңиздин архитектурасын оптималдуу түрдө уюштура аласыз. Балким, негизги объект Багытталган Ациклдик График (мындан ары DAG) болуп саналат.

ДАГ

DAG - бул белгилүү бир графикке ылайык так аныкталган ырааттуулукта бүтүргүңүз келген тапшырмаларыңыздын кандайдыр бир маанилүү бирикмеси. Airflow DAG жана башка объектилер менен иштөө үчүн ыңгайлуу веб-интерфейсти камсыз кылат:

Airflow - бул партиялык маалыматтарды иштетүү процесстерин ыңгайлуу жана тез иштеп чыгуу жана колдоо үчүн курал

DAG мындай көрүнүшү мүмкүн:

Airflow - бул партиялык маалыматтарды иштетүү процесстерин ыңгайлуу жана тез иштеп чыгуу жана колдоо үчүн курал

Иштеп чыгуучу DAGды долбоорлоодо, DAG алкагында милдеттер курула турган операторлордун топтомун белгилейт. Бул жерде биз дагы бир маанилүү объектке келдик: Аба агымынын оператору.

операторлор

Оператор - бул жумуш инстанциялары түзүлгөн, анын негизинде жумуш инстанциясын аткаруу учурунда эмне болоорун сүрөттөгөн субъект. GitHub'дан аба агымы чыгарылат колдонууга даяр операторлордун топтомун камтыйт. Мисалдар:

  • BashOperator - bash буйругун аткаруу үчүн оператор.
  • PythonOperator - Python кодун чакыруу үчүн оператор.
  • EmailOperator — электрондук кат жөнөтүү үчүн оператор.
  • HTTPOperator - http сурамдары менен иштөө үчүн оператор.
  • SqlOperator - SQL кодун аткаруу үчүн оператор.
  • Сенсор – бул окуяны күтүү оператору (талап кылынган убакыттын келиши, талап кылынган файлдын көрүнүшү, маалымат базасындагы сызык, APIден жооп ж.б.у.с.).

Конкреттүү операторлор бар: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Сиз ошондой эле өзүңүздүн өзгөчөлүктөрүңүздүн негизинде операторлорду иштеп чыгып, аларды долбооруңузда колдоно аласыз. Мисалы, MongoDBToHiveViaHdfsTransfer, MongoDBден Hiveге документтерди экспорттоо операторун жана алар менен иштөө үчүн бир нече операторлорду түздүк. Clickhouse: CHLoadFromHiveOperator жана CHTableLoaderOperator. Негизи, долбоор негизги билдирүүлөргө негизделген кодду көп колдонгондо, аны жаңы билдирүүгө куруу жөнүндө ойлонсоңуз болот. Бул андан аркы өнүгүүнү жөнөкөйлөтөт жана сиз долбоордогу операторлоруңуздун китепканасын кеңейтесиз.

Андан кийин, тапшырмалардын бардык инстанциялары аткарылышы керек, эми биз пландоочу жөнүндө сүйлөшөбүз.

Пландаштыруучу

Аба агымынын тапшырма пландоочусу курулган сельдерей. Сельдерей - бул Python китепканасы, ал кезекти уюштурууга, ошондой эле асинхрондуу жана бөлүштүрүлгөн тапшырмаларды аткарууга мүмкүндүк берет. Аба агымы тарабында бардык тапшырмалар бассейндерге бөлүнөт. Бассейндер кол менен түзүлөт. Эреже катары, алардын максаты булак менен иштөөнүн жүгүн чектөө же DWH ичиндеги тапшырмаларды типтештирүү болуп саналат. Бассейндерди веб-интерфейс аркылуу башкарууга болот:

Airflow - бул партиялык маалыматтарды иштетүү процесстерин ыңгайлуу жана тез иштеп чыгуу жана колдоо үчүн курал

Ар бир бассейнде уячалардын саны боюнча чектөө бар. DAG түзүүдө ага бассейн берилет:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG деңгээлинде аныкталган бассейн тапшырма деңгээлинде жокко чыгарылышы мүмкүн.
Аба агымындагы бардык тапшырмаларды пландаштыруу үчүн өзүнчө процесс, Scheduler жооптуу. Чынында, Scheduler аткаруу үчүн милдеттерди коюунун бардык механикасы менен алектенет. Тапшырма аткарылганга чейин бир нече этаптан өтөт:

  1. ДАГда мурунку тапшырмалар аткарылды, жаңысын кезекке коюуга болот.
  2. Кезек тапшырмалардын артыкчылыктуулугуна жараша иргелет (артыкчылыктар да көзөмөлдөнүшү мүмкүн), ал эми бассейнде бош уяча бар болсо, тапшырма ишке киргизилиши мүмкүн.
  3. бекер жумушчу сельдерей бар болсо, тапшырма ага жиберилет; маселеде сиз программалаган иш тигил же бул оператордун жардамы менен башталат.

Жетиштүү жөнөкөй.

Пландаштыруучу бардык DAGлардын топтомунда жана DAG ичиндеги бардык тапшырмаларда иштейт.

Scheduler DAG менен иштей башташы үчүн, DAG графикти орнотуу керек:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Даяр алдын ала орнотуулар топтому бар: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Сиз ошондой эле cron туюнтмаларын колдоно аласыз:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Аткаруу датасы

Аба агымы кантип иштээрин түшүнүү үчүн, DAG үчүн Аткаруу датасы кандай экенин түшүнүү маанилүү. Аба агымында, DAG Аткаруу күнү өлчөмгө ээ, б.а., DAGнын иш графигине жараша, тапшырма инстанциялары ар бир Аткаруу Датасы үчүн түзүлөт. Жана ар бир Аткаруу Датасы үчүн тапшырмалар кайра аткарылышы мүмкүн - же, мисалы, DAG бир эле учурда бир нече Аткаруу Даталарында иштей алат. Бул жерде ачык көрүнүп турат:

Airflow - бул партиялык маалыматтарды иштетүү процесстерин ыңгайлуу жана тез иштеп чыгуу жана колдоо үчүн курал

Тилекке каршы (же балким, бактыга жараша: бул кырдаалга жараша болот), эгерде DAGда тапшырманы ишке ашыруу оңдолсо, анда мурунку Аткаруу күнүндө аткаруу оңдоолорду эске алуу менен уланат. Эгер жаңы алгоритмди колдонуу менен өткөн мезгилдердеги маалыматтарды кайра эсептөө керек болсо, бул жакшы, бирок бул жаман, анткени натыйжанын кайталануу мүмкүнчүлүгү жоголот (албетте, Gitтен баштапкы коддун талап кылынган версиясын кайтарып, эмне болорун эсептөө үчүн эч ким сизди убара кылбайт. сизге бир жолу керек, сизге кандай керек болсо).

Тапшырмаларды түзүү

DAGти ишке ашыруу Pythonдогу код, ошондуктан бизде коддун көлөмүн кыскартуунун абдан ыңгайлуу жолу бар, мисалы, майдаланган булактар ​​менен. Булак катары сизде үч MySQL сыныктары бар дейли, ар бирине кирип, кээ бир маалыматтарды алышыңыз керек. Мындан тышкары, өз алдынча жана параллелдүү. DAGдагы Python коду мындай көрүнүшү мүмкүн:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG төмөнкүдөй көрүнөт:

Airflow - бул партиялык маалыматтарды иштетүү процесстерин ыңгайлуу жана тез иштеп чыгуу жана колдоо үчүн курал

Бул учурда, жөн гана жөндөөлөрдү тууралоо жана DAG жаңыртуу менен сыныктарды кошуп же алып салсаңыз болот. Ыңгайлуу!

Сиз ошондой эле татаалыраак кодду жаратууну колдоно аласыз, мисалы, маалымат базасы түрүндөгү булактар ​​менен иштөө же таблица структурасын, таблица менен иштөө алгоритмин сүрөттөп, жана DWH инфраструктурасынын өзгөчөлүктөрүн эске алуу менен процессти генерациялай аласыз. сактагычыңызга N таблицаларды жүктөө үчүн. Же, мисалы, тизме түрүндөгү параметр менен иштөөнү колдоого албаган API менен иштөө, сиз бул тизмеден DAGда N тапшырманы жаратып, APIдеги суроо-талаптардын параллелдүүлүгүн бассейнге чектеп, кырып салсаңыз болот. API керектүү маалыматтар. Ийкемдүү!

репозиторий

Airflow өзүнүн резервдик репозиторийине ээ, маалымат базасы (MySQL же Postgres болушу мүмкүн, бизде Postgres бар), анда тапшырмалардын абалын, DAGларды, туташуу жөндөөлөрүн, глобалдык өзгөрмөлөрдү ж.б.у.с. сактайт. Бул жерде мен айткым келет Airflow ичиндеги репозиторий абдан жөнөкөй (болжол менен 20 таблица) жана анын үстүнө өз процесстериңизди курууну кааласаңыз, ыңгайлуу. Мен Informatica репозиторийиндеги 100500 XNUMX таблицаны эстейм, алар суроону кантип түзүүнү түшүнүш үчүн көпкө чейин изилдениши керек болчу.

Мониторинг

Репозиторийдин жөнөкөйлүгүн эске алуу менен, сиз үчүн ыңгайлуу болгон тапшырманы көзөмөлдөө процессин түзө аласыз. Биз Zeppelin-де блокнот колдонобуз, анда биз тапшырмалардын абалын карайбыз:

Airflow - бул партиялык маалыматтарды иштетүү процесстерин ыңгайлуу жана тез иштеп чыгуу жана колдоо үчүн курал

Бул ошондой эле Airflow веб-интерфейси болушу мүмкүн:

Airflow - бул партиялык маалыматтарды иштетүү процесстерин ыңгайлуу жана тез иштеп чыгуу жана колдоо үчүн курал

Аба агымынын коду ачык булак болгондуктан, биз Telegramга эскертүүлөрдү коштук. Тапшырманын ар бир иштеп жаткан нускасы, эгер ката пайда болсо, бүт иштеп чыгуу жана колдоо тобу турган Telegramдагы топту спам кылат.

Биз Telegram аркылуу ыкчам жооп алабыз (зарыл болсо) жана Zeppelin аркылуу биз Airflow'тагы тапшырмалардын жалпы сүрөтүн алабыз.

жалпы

Аба агымы биринчи кезекте ачык булак жана андан керемет күтпөш керек. Иштей турган чечимди куруу үчүн убакытты жана күч-аракетти жумшоого даяр болуңуз. Максатка жетүүгө болот, мага ишен, бул татыктуу. Өнүгүү ылдамдыгы, ийкемдүүлүк, жаңы процесстерди кошуунун оңойлугу - сизге жагат. Албетте, долбоордун уюштурулушуна, Аба агымынын туруктуулугуна көп көңүл буруу керек: кереметтер болбойт.

Азыр биз күн сайын Airflow иштеп жатабыз 6,5 минге жакын тапшырма. Алар мүнөзү боюнча такыр башкача. Негизги DWHге ар кандай жана өзгөчө булактардан маалыматтарды жүктөө тапшырмалары бар, негизги DWH ичиндеги дүкөндөрдүн маңдайкы беттерин эсептөө тапшырмалары бар, маалыматтарды тез DWHге жарыялоо тапшырмалары бар, көптөгөн, көптөгөн ар кандай тапшырмалар бар - жана аба агымы алардын бардыгын кунден-кунге чайнайт. Сандар менен айтканда, бул 2,3 DWH (Hadoop) ичинде ар кандай татаалдыктагы ELT тапшырмалары, болжол менен. 2,5 жүз маалымат базасы булактар, бул бир команда 4 ETL иштеп чыгуучулары, алар DWHде ETL маалыматтарды иштетүүгө жана DWH ичинде ELT маалыматтарды иштетүүгө бөлүнөт жана, албетте, бир админ, кызматтын инфраструктурасы менен ким алектенет.

келечек үчүн пландар

Процесстердин саны сөзсүз түрдө өсүп жатат жана биз аба агымынын инфраструктурасы боюнча жасай турган негизги нерсе - масштабдоо. Биз Airflow кластерин кургубуз келет, Сельдерей жумушчулары үчүн бир жуп бут бөлүп, жумуш пландаштыруу процесстери жана репозиторий менен өзүн-өзү кайталаган башты жасагыбыз келет.

эпилогунда

Бул, албетте, мен Airflow жөнүндө айткым келгендин баары эмес, бирок мен негизги ойлорду баса белгилегенге аракет кылдым. Аппетит тамактануу менен келет, жасап көрүңүз, сизге жагат :)

Source: www.habr.com

Комментарий кошуу