Օդային հոսքը գործիք է խմբաքանակային տվյալների մշակման գործընթացները հարմար և արագ մշակելու և պահպանելու համար

Օդային հոսքը գործիք է խմբաքանակային տվյալների մշակման գործընթացները հարմար և արագ մշակելու և պահպանելու համար

Բարև, Հաբր: Այս հոդվածում ես ուզում եմ խոսել խմբաքանակային տվյալների մշակման գործընթացների մշակման մեկ հիանալի գործիքի մասին, օրինակ՝ կորպորատիվ DWH-ի կամ ձեր DataLake-ի ենթակառուցվածքում: Մենք կխոսենք Apache Airflow-ի մասին (այսուհետ՝ Airflow): Այն անարդարացիորեն զրկված է Habré-ի ուշադրությունից, և հիմնականում ես կփորձեմ ձեզ համոզել, որ գոնե Airflow-ին արժե նայել ձեր ETL/ELT գործընթացների ժամանակացույց ընտրելիս:

Նախկինում ես գրել էի մի շարք հոդվածներ DWH թեմայով, երբ աշխատում էի Tinkoff Bank-ում: Այժմ ես դարձել եմ Mail.Ru Group-ի թիմի անդամ և մշակում եմ խաղերի ոլորտում տվյալների վերլուծության հարթակ: Իրականում, երբ հայտնվում են նորություններ և հետաքրքիր լուծումներ, ես և իմ թիմը այստեղ կխոսենք տվյալների վերլուծության մեր հարթակի մասին:

Նախաբան

Այսպիսով, եկեք սկսենք: Ի՞նչ է օդային հոսքը: Սա գրադարան է (կամ գրադարանների հավաքածու) մշակել, պլանավորել և վերահսկել աշխատանքային գործընթացները: Airflow-ի հիմնական առանձնահատկությունը. Python կոդը օգտագործվում է գործընթացները նկարագրելու (մշակելու): Սա շատ առավելություններ ունի ձեր նախագիծը և զարգացումը կազմակերպելու համար. ըստ էության, ձեր (օրինակ) ETL նախագիծը պարզապես Python նախագիծ է, և դուք կարող եք այն կազմակերպել այնպես, ինչպես ցանկանում եք՝ հաշվի առնելով ենթակառուցվածքի առանձնահատկությունները, թիմի չափը և այլ պահանջներ: Գործիքային առումով ամեն ինչ պարզ է. Օգտագործեք օրինակ PyCharm + Git: Հրաշալի է և շատ հարմար։

Հիմա եկեք դիտարկենք Airflow-ի հիմնական սուբյեկտները: Հասկանալով դրանց էությունն ու նպատակը, դուք կարող եք օպտիմալ կերպով կազմակերպել ձեր գործընթացի ճարտարապետությունը: Թերևս հիմնական սուբյեկտը Ուղղորդված ացիկլիկ գրաֆիկն է (այսուհետ՝ DAG):

DAG

DAG-ը ձեր առաջադրանքների որոշակի իմաստալից ասոցիացիա է, որը դուք ցանկանում եք կատարել խիստ սահմանված հաջորդականությամբ՝ ըստ որոշակի ժամանակացույցի: Airflow-ն ապահովում է հարմար վեբ ինտերֆեյս DAG-ների և այլ կազմակերպությունների հետ աշխատելու համար.

Օդային հոսքը գործիք է խմբաքանակային տվյալների մշակման գործընթացները հարմար և արագ մշակելու և պահպանելու համար

DAG-ն կարող է այսպիսի տեսք ունենալ.

Օդային հոսքը գործիք է խմբաքանակային տվյալների մշակման գործընթացները հարմար և արագ մշակելու և պահպանելու համար

Մշակողը, երբ նախագծում է DAG, սահմանում է օպերատորների մի շարք, որոնց վրա կկառուցվեն DAG-ի շրջանակներում առաջադրանքները: Այստեղ մենք հասնում ենք մեկ այլ կարևոր սուբյեկտի՝ օդային հոսքի օպերատորին:

Օպերատորներ

Օպերատորը կազմակերպություն է, որի հիման վրա ստեղծվում են աշխատատեղերի օրինակներ, որոնք նկարագրում են, թե ինչ է տեղի ունենալու աշխատանքային օրինակի կատարման ընթացքում: Օդի հոսքը թողարկվում է GitHub-ից արդեն պարունակում է օգտագործման համար պատրաստ օպերատորների մի շարք: Օրինակներ.

  • BashOperator - օպերատոր՝ bash հրամանի կատարման համար:
  • PythonOperator - Python կոդը կանչելու օպերատոր:
  • EmailOperator — էլփոստ ուղարկելու օպերատոր:
  • HTTPOoperator - http հարցումների հետ աշխատելու օպերատոր:
  • SqlOperator - SQL կոդի կատարման օպերատոր:
  • Սենսորը իրադարձության սպասման օպերատոր է (պահանջվող ժամանակի ժամանումը, անհրաժեշտ ֆայլի հայտնվելը, տվյալների բազայում տող, API-ի պատասխան և այլն և այլն):

Կան ավելի կոնկրետ օպերատորներ՝ DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator։

Դուք կարող եք նաև զարգացնել օպերատորներ՝ հիմնվելով ձեր սեփական հատկանիշների վրա և օգտագործել դրանք ձեր նախագծում: Օրինակ՝ մենք ստեղծեցինք MongoDBToHiveViaHdfsTransfer-ը՝ MongoDB-ից Hive փաստաթղթեր արտահանելու օպերատոր, և մի քանի օպերատորներ՝ հետ աշխատելու համար։ clickhouseCHLoadFromHiveOperator և CHTableLoaderOperator: Ըստ էության, հենց որ նախագիծը հաճախակի օգտագործում է հիմնական հայտարարությունների վրա կառուցված ծածկագիրը, դուք կարող եք մտածել այն նոր հայտարարության մեջ կառուցելու մասին: Սա կհեշտացնի հետագա զարգացումը, և դուք կընդլայնեք ձեր օպերատորների գրադարանը նախագծում:

Հաջորդը, առաջադրանքների այս բոլոր օրինակները պետք է կատարվեն, և այժմ մենք կխոսենք ժամանակացույցի մասին:

Ժամանակացույց

Airflow-ի առաջադրանքների ժամանակացույցը կառուցված է Կարոս. Celery-ը Python գրադարան է, որը թույլ է տալիս կազմակերպել հերթ՝ գումարած առաջադրանքների ասինխրոն և բաշխված կատարում: Օդային հոսքի կողմում բոլոր առաջադրանքները բաժանված են լողավազանների: Լողավազանները ստեղծվում են ձեռքով: Սովորաբար, դրանց նպատակն է սահմանափակել աղբյուրի հետ աշխատելու ծանրաբեռնվածությունը կամ DWH-ում առաջադրանքների ձևակերպումը: Լողավազանները կարող են կառավարվել վեբ ինտերֆեյսի միջոցով.

Օդային հոսքը գործիք է խմբաքանակային տվյալների մշակման գործընթացները հարմար և արագ մշակելու և պահպանելու համար

Յուրաքանչյուր լողավազան ունի սլոտների քանակի սահմանափակում: DAG ստեղծելիս նրան տրվում է լողավազան.

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG-ի մակարդակով սահմանված լողավազանը կարող է վերացվել առաջադրանքի մակարդակում:
Առանձին գործընթաց՝ Scheduler, պատասխանատու է Airflow-ում բոլոր առաջադրանքների պլանավորման համար: Իրականում, Scheduler-ը զբաղվում է կատարման առաջադրանքները դնելու բոլոր մեխանիզմներով: Առաջադրանքը կատարելուց առաջ անցնում է մի քանի փուլով.

  1. Նախկին առաջադրանքները կատարվել են DAG-ում, նորը կարող է հերթագրվել:
  2. Հերթը դասակարգվում է կախված առաջադրանքների առաջնահերթությունից (առաջնահերթությունները կարող են նաև վերահսկվել), և եթե լողավազանում ազատ տեղ կա, ապա առաջադրանքը կարող է գործարկվել:
  3. Եթե ​​կա անվճար բանվորական նեխուր, առաջադրանքն ուղարկվում է նրան; աշխատանքը, որը դուք ծրագրավորել եք խնդրի մեջ, սկսվում է՝ օգտագործելով այս կամ այն ​​օպերատորը:

Բավականին պարզ:

Scheduler-ն աշխատում է բոլոր DAG-ների և DAG-ի բոլոր առաջադրանքների վրա:

Որպեսզի Scheduler-ը սկսի աշխատել DAG-ի հետ, DAG-ն պետք է սահմանի ժամանակացույց.

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Կա պատրաստի նախադրյալների մի շարք. @once, @hourly, @daily, @weekly, @monthly, @yearly.

Կարող եք նաև օգտագործել cron արտահայտությունները.

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Կատարման ամսաթիվը

Հասկանալու համար, թե ինչպես է աշխատում օդային հոսքը, կարևոր է հասկանալ, թե որն է Կատարման ամսաթիվը DAG-ի համար: Օդային հոսքում DAG-ն ունի կատարման ամսաթվի չափ, այսինքն՝ կախված DAG-ի աշխատանքային գրաֆիկից, առաջադրանքների օրինակները ստեղծվում են յուրաքանչյուր Կատարման ամսաթվի համար: Եվ յուրաքանչյուր Կատարման ամսաթվի համար առաջադրանքները կարող են կրկին կատարվել, կամ, օրինակ, DAG-ը կարող է միաժամանակ աշխատել մի քանի Կատարման ամսաթվերում: Սա հստակ երևում է այստեղ.

Օդային հոսքը գործիք է խմբաքանակային տվյալների մշակման գործընթացները հարմար և արագ մշակելու և պահպանելու համար

Ցավոք (կամ գուցե բարեբախտաբար. դա կախված է իրավիճակից), եթե DAG-ում առաջադրանքի կատարումը շտկվի, ապա նախորդ Կատարման ամսաթվով կատարումը կշարունակվի՝ հաշվի առնելով ճշգրտումները: Սա լավ է, եթե ձեզ անհրաժեշտ է վերահաշվարկել անցյալ ժամանակաշրջանների տվյալները՝ օգտագործելով նոր ալգորիթմը, բայց դա վատ է, քանի որ արդյունքի վերարտադրելիությունը կորչում է (իհարկե, ոչ ոք ձեզ չի խանգարում Git-ից վերադարձնել սկզբնաղբյուրի պահանջվող տարբերակը և հաշվարկել, թե ինչն է: Ձեզ անհրաժեշտ է մեկանգամյա, այնպես, ինչպես դա ձեզ անհրաժեշտ է):

Առաջադրանքների ստեղծում

DAG-ի ներդրումը Python-ում կոդ է, ուստի մենք ունենք շատ հարմար միջոց՝ կրճատելու կոդի քանակը, երբ աշխատում ենք, օրինակ, բեկորային աղբյուրներով։ Ենթադրենք, որ դուք ունեք երեք MySQL բեկորներ որպես աղբյուր, դուք պետք է բարձրանաք յուրաքանչյուրի մեջ և հավաքեք որոշ տվյալներ: Ընդ որում, ինքնուրույն ու զուգահեռաբար։ Python կոդը DAG-ում կարող է այսպիսի տեսք ունենալ.

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG-ն ունի հետևյալ տեսքը.

Օդային հոսքը գործիք է խմբաքանակային տվյալների մշակման գործընթացները հարմար և արագ մշակելու և պահպանելու համար

Այս դեպքում դուք կարող եք ավելացնել կամ հեռացնել բեկոր՝ պարզապես կարգավորելով կարգավորումները և թարմացնելով DAG-ը: Հարմարավետ!

Կարող եք նաև օգտագործել ավելի բարդ կոդերի ստեղծում, օրինակ՝ աշխատել աղբյուրների հետ տվյալների բազայի տեսքով կամ նկարագրել աղյուսակի կառուցվածքը, աղյուսակի հետ աշխատելու ալգորիթմը և, հաշվի առնելով DWH ենթակառուցվածքի առանձնահատկությունները, ստեղծել գործընթաց։ N աղյուսակներ ձեր պահեստում բեռնելու համար: Կամ, օրինակ, աշխատելով API-ի հետ, որը չի աջակցում պարամետրի հետ աշխատել ցանկի տեսքով, դուք կարող եք ստեղծել N առաջադրանք DAG-ում այս ցուցակից, սահմանափակել API-ի հարցումների զուգահեռությունը լողավազանով և քերել անհրաժեշտ տվյալներ API-ից: Ճկուն!

պահոց

Airflow-ն ունի իր backend պահեստը, տվյալների բազան (կարող է լինել MySQL կամ Postgres, մենք ունենք Postgres), որը պահպանում է առաջադրանքների վիճակները, DAG-ները, կապի կարգավորումները, գլոբալ փոփոխականները և այլն, և այլն: Այստեղ ես կցանկանայի ասել, որ Airflow-ի պահեստը շատ պարզ է (մոտ 20 աղյուսակ) և հարմար, եթե ցանկանում եք դրա վրա կառուցել ձեր սեփական գործընթացներից որևէ մեկը: Հիշում եմ Informatica-ի շտեմարանի 100500 աղյուսակները, որոնք պետք էր երկար ուսումնասիրել՝ հասկանալու համար, թե ինչպես կարելի է հարցում կառուցել։

Մոնիտորինգ

Հաշվի առնելով պահեստի պարզությունը, դուք կարող եք ստեղծել առաջադրանքների մոնիտորինգի գործընթաց, որը հարմար է ձեզ համար: Մենք օգտագործում ենք նոթատետր Zeppelin-ում, որտեղ մենք նայում ենք առաջադրանքների կարգավիճակին.

Օդային հոսքը գործիք է խմբաքանակային տվյալների մշակման գործընթացները հարմար և արագ մշակելու և պահպանելու համար

Սա կարող է լինել նաև Airflow-ի վեբ ինտերֆեյսը.

Օդային հոսքը գործիք է խմբաքանակային տվյալների մշակման գործընթացները հարմար և արագ մշակելու և պահպանելու համար

Օդային հոսքի կոդը բաց կոդով է, ուստի մենք ծանուցում ենք ավելացրել Telegram-ում: Առաջադրանքի յուրաքանչյուր գործող օրինակ, եթե սխալ է տեղի ունենում, խումբը սպամ է ուղարկում Telegram-ում, որտեղ կազմված է զարգացման և աջակցման ողջ թիմը:

Մենք անհապաղ պատասխան ենք ստանում Telegram-ի միջոցով (անհրաժեշտության դեպքում), իսկ Zeppelin-ի միջոցով մենք ստանում ենք Airflow-ի առաջադրանքների ընդհանուր պատկերը:

Ընդհանուր

Օդային հոսքը հիմնականում բաց կոդով է, և դրանից հրաշքներ սպասել պետք չէ: Պատրաստ եղեք ժամանակ և ջանք գործադրել՝ արդյունավետ լուծում ստեղծելու համար: Նպատակը հասանելի է, հավատացեք, արժե այն: Զարգացման արագություն, ճկունություն, նոր գործընթացներ ավելացնելու հեշտություն՝ դա ձեզ դուր կգա: Իհարկե, պետք է մեծ ուշադրություն դարձնել նախագծի կազմակերպմանը, օդային հոսքի կայունությանը. հրաշքներ չեն լինում:

Այժմ մենք ամեն օր աշխատում ենք Airflow-ում շուրջ 6,5 հազար առաջադրանք. Նրանք բնավորությամբ բավականին տարբեր են։ Կան բազմաթիվ տարբեր և շատ հատուկ աղբյուրներից տվյալների բեռնման հիմնական DWH-ում, կան հիմնական DWH-ի ներսում ցուցափեղկերը հաշվարկելու առաջադրանքներ, կան արագ DWH տվյալների հրապարակման առաջադրանքներ, կան շատ ու շատ տարբեր առաջադրանքներ, և Airflow: ամեն օր ծամում է դրանք: Եթե ​​խոսենք թվերով, սա է 2,3 հզ Տարբեր բարդության ELT առաջադրանքներ DWH-ի շրջանակներում (Hadoop), մոտ. 2,5 հարյուր շտեմարաններ աղբյուրները, սա թիմ է 4 ETL մշակողներ, որոնք բաժանված են ETL տվյալների մշակման DWH-ում և ELT տվյալների մշակման DWH-ի ներսում և, իհարկե, ավելին մեկ ադմին, ով զբաղվում է ծառայության ենթակառուցվածքով։

Պլանները ապագայի համար

Գործընթացների թիվն անխուսափելիորեն աճում է, և գլխավորը, որ մենք անելու ենք Airflow ենթակառուցվածքի առումով, մասշտաբն է: Մենք ցանկանում ենք ստեղծել Airflow կլաստեր, հատկացնել զույգ ոտքեր Celery-ի աշխատողների համար և պատրաստել ինքնակրկնվող գլուխ՝ աշխատանքի պլանավորման գործընթացներով և պահեստով:

Վերջաբան

Սա, իհարկե, այն ամենը չէ, ինչ ես կցանկանայի պատմել Airflow-ի մասին, բայց ես փորձեցի ընդգծել հիմնական կետերը: Ախորժակը գալիս է ուտելուց, փորձեք և ձեզ դուր կգա :)

Source: www.habr.com

Добавить комментарий