Ողջույն, ես Դմիտրի Լոգվինենկոն եմ՝ Vezet ընկերությունների խմբի Վերլուծության բաժնի տվյալների ինժեներ:
Ես ձեզ կպատմեմ ETL պրոցեսների մշակման հրաշալի գործիքի՝ Apache Airflow-ի մասին։ Բայց Airflow-ն այնքան բազմակողմանի և բազմակողմանի է, որ դուք պետք է ավելի ուշադիր նայեք դրան, նույնիսկ եթե դուք ներգրավված չեք տվյալների հոսքերի մեջ, այլ անհրաժեշտություն ունեք պարբերաբար գործարկել որևէ գործընթաց և վերահսկել դրանց կատարումը:
Եվ այո, ես ոչ միայն կասեմ, այլ նաև ցույց կտամ՝ ծրագիրն ունի շատ կոդ, սքրինշոթ և առաջարկություններ։
Այն, ինչ դուք սովորաբար տեսնում եք, երբ google-ում եք «Օդ հոսք» / Wikimedia Commons բառը
- միայն ավելի լավ, և այն պատրաստված է բոլորովին այլ նպատակների համար, մասնավորապես (ինչպես գրված է կատվի առաջ).
առաջադրանքներ գործարկել և վերահսկել անսահմանափակ թվով մեքենաների վրա (քանի որ շատ Celery/Kubernetes և ձեր խիղճը թույլ կտա ձեզ)
դինամիկ աշխատանքային հոսքի ստեղծմամբ՝ շատ հեշտ գրելու և հասկանալու Python կոդը
և ցանկացած տվյալների բազաներ և API-ներ միմյանց հետ կապելու հնարավորություն՝ օգտագործելով և՛ պատրաստի բաղադրիչները, և՛ տնական փլագինները (ինչը չափազանց պարզ է):
Մենք օգտագործում ենք Apache Airflow-ը այսպես.
մենք հավաքում ենք տվյալներ տարբեր աղբյուրներից (բազմաթիվ SQL Server և PostgreSQL օրինակներ, տարբեր API-ներ՝ կիրառական չափանիշներով, նույնիսկ 1C) DWH-ում և ODS-ում (մենք ունենք Vertica և Clickhouse):
որքան առաջադեմ cron, որը սկսում է տվյալների համախմբման գործընթացները ODS-ի վրա, ինչպես նաև վերահսկում է դրանց պահպանումը:
Մինչև վերջերս մեր կարիքները ծածկում էր մեկ փոքր սերվեր՝ 32 միջուկով և 50 ԳԲ օպերատիվ հիշողությամբ: Airflow-ում սա աշխատում է.
ավելի շատ 200 դագ (իրականում աշխատանքային հոսքեր, որոնցում մենք լրացրել ենք առաջադրանքներ),
յուրաքանչյուրում միջինը 70 առաջադրանք,
այս բարությունը սկսվում է (նաև միջին հաշվով) ժամը մեկ անգամ.
Իսկ այն մասին, թե ինչպես ենք մենք ընդլայնվել, ես կգրեմ ստորև, բայց հիմա եկեք սահմանենք über-խնդիրը, որը մենք կլուծենք.
Կան երեք աղբյուր SQL սերվերներ, որոնցից յուրաքանչյուրը ունի 50 տվյալների բազա՝ համապատասխանաբար մեկ նախագծի օրինակներ, նրանք ունեն նույն կառուցվածքը (գրեթե ամենուր, mua-ha-ha), ինչը նշանակում է, որ յուրաքանչյուրն ունի Պատվերների աղյուսակ (բարեբախտաբար, աղյուսակը դրանով): անունը կարող է մղվել ցանկացած բիզնեսի մեջ): Մենք վերցնում ենք տվյալները՝ ավելացնելով սպասարկման դաշտերը (աղբյուրի սերվեր, աղբյուրի տվյալների բազա, ETL առաջադրանքի ID) և միամտաբար նետում ենք դրանք, ասենք, Vertica:
Եկեք գնանք.
Հիմնական մասը՝ գործնական (և մի փոքր տեսական)
Ինչու է դա մեզ համար (և ձեզ համար)
Երբ ծառերը մեծ էին, իսկ ես պարզ SQLՌուսական մանրածախ առևտրի մեկում մենք խաբել ենք ETL գործընթացները՝ որպես տվյալների հոսքեր՝ օգտագործելով մեզ հասանելի երկու գործիքներ.
Informatica Power Center - չափազանց տարածվող համակարգ, չափազանց արդյունավետ, իր ապարատով, իր սեփական տարբերակով: Աստված մի արասցե դրա հնարավորությունների 1%-ն օգտագործեցի։ Ինչո՞ւ։ Դե, առաջին հերթին, այս ինտերֆեյսը, ինչ-որ տեղ 380-ականներից, մտավոր ճնշում էր մեզ վրա: Երկրորդ, այս գործիքը նախատեսված է չափազանց շքեղ գործընթացների, կատաղի բաղադրիչների վերաօգտագործման և այլ շատ կարևոր ձեռնարկատիրական հնարքների համար: Այն մասին, որ այն արժե, ինչպես Airbus AXNUMX-ի թևը / տարի, մենք ոչինչ չենք ասի:
Զգուշացեք, սքրինշոթը կարող է մի փոքր վնասել 30 տարեկանից ցածր մարդկանց
SQL Server ինտեգրման սերվեր -Այս ընկերոջը մենք օգտագործել ենք մեր ներնախագծային հոսքերում։ Դե, փաստորեն, մենք արդեն օգտագործում ենք SQL Server, և ինչ-որ կերպ անհիմն կլինի չօգտագործել նրա ETL գործիքները: Դրանում ամեն ինչ լավ է. և՛ ինտերֆեյսը գեղեցիկ է, և՛ առաջընթացի մասին հաշվետվություններ... Բայց սա չէ պատճառը, որ մենք սիրում ենք ծրագրային արտադրանք, ախ, ոչ դրա համար: Տարբերակ այն dtsx (որն է XML-ը, որտեղ հանգույցները խառնվում են պահպանման վրա) մենք կարող ենք, բայց ո՞րն է իմաստը: Ի՞նչ կասեք առաջադրանքների փաթեթ ստեղծելու մասին, որը հարյուրավոր աղյուսակներ կքաշի մի սերվերից մյուսը: Այո, ինչ հարյուր, ձեր ցուցամատը քսան կտորից կընկնի, սեղմելով մկնիկի կոճակը։ Բայց դա հաստատ ավելի նորաձև է թվում.
Մենք, անշուշտ, ելքեր էինք փնտրում։ Գործը նույնիսկ գրեթե եկել է ինքնուրույն գրված SSIS փաթեթի գեներատոր ...
…և հետո նոր աշխատանք գտա ինձ: Եվ Apache Airflow-ը գերազանցեց ինձ դրա վրա:
Երբ ես իմացա, որ ETL գործընթացի նկարագրությունները պարզ Python կոդ են, ես պարզապես ուրախությունից չպարեցի: Ահա թե ինչպես են տվյալների հոսքերը տարբերակվել և տարբերվել, և հարյուրավոր տվյալների բազաներից մեկ թիրախի մեջ մեկ կառուցվածքով աղյուսակներ լցնելը դարձել է Python-ի կոդի խնդիր մեկուկես կամ երկու 13” էկրաններում:
Կլաստերի հավաքում
Եկեք ամբողջությամբ մանկապարտեզ չկազմակերպենք և այստեղ չխոսենք բոլորովին ակնհայտ բաների մասին, օրինակ՝ տեղադրել Airflow-ը, ձեր ընտրած տվյալների բազան, Celery-ը և նավահանգիստներում նկարագրված այլ դեպքեր:
Որպեսզի մենք անմիջապես սկսենք փորձերը, ես ուրվագծեցի docker-compose.yml որի մեջ:
Եկեք իրականում բարձրացնենք ՕդափոխումըԺամանակացույց, Վեբսերվեր: Ծաղիկը նույնպես կպտտվի այնտեղ՝ վերահսկելու Նեխուրի առաջադրանքները (քանի որ այն արդեն դրված է apache/airflow:1.10.10-python3.7, բայց մենք դեմ չենք)
PostgreSQL, որում Airflow-ը կգրի իր սպասարկման տեղեկատվությունը (ժամանակացույցի տվյալներ, կատարման վիճակագրություն և այլն), իսկ Celery-ը կնշի կատարված առաջադրանքները.
Redis, որը հանդես կգա որպես առաջադրանքների բրոքեր Celery-ի համար;
Նեխուրի բանվոր, որը կզբաղվի առաջադրանքների անմիջական կատարմամբ։
Թղթապանակին ./dags մենք կավելացնենք մեր ֆայլերը դագերի նկարագրությամբ: Դրանք կվերցվեն անմիջապես, այնպես որ կարիք չկա ամեն փռշտալուց հետո ամբողջ կույտը ձեռնամուխ լինել:
Որոշ տեղերում օրինակների ծածկագիրը ամբողջությամբ չի ցուցադրվում (որպեսզի տեքստը չխառնվի), բայց ինչ-որ տեղ այն փոփոխվում է գործընթացում։ Աշխատանքային կոդի ամբողջական օրինակները կարելի է գտնել պահեստում https://github.com/dm-logv/airflow-tutorial.
Կոմպոզիցիայի հավաքման ժամանակ ես մեծապես ապավինում էի հայտնի կերպարին puckel/docker-airflow - Համոզվեք, որ ստուգեք այն: Միգուցե ձեր կյանքում ուրիշ ոչինչ պետք չէ:
Օդի հոսքի բոլոր կարգավորումները հասանելի են ոչ միայն միջոցով airflow.cfg, այլ նաև շրջակա միջավայրի փոփոխականների միջոցով (շնորհիվ մշակողների), որոնցից ես չարամտորեն օգտվեցի։
Բնականաբար, այն պատրաստ չէ արտադրությանը. ես միտումնավոր սրտի զարկեր չեմ դրել տարաների վրա, չեմ անհանգստացրել անվտանգությանը։ Բայց ես արեցի մեր փորձարարներին հարմար նվազագույնը։
Նշենք, որ.
Dag թղթապանակը պետք է հասանելի լինի ինչպես ժամանակացույցի, այնպես էլ աշխատողների համար:
Նույնը վերաբերում է բոլոր երրորդ կողմի գրադարաններին. դրանք բոլորը պետք է տեղադրվեն ժամանակացույցով և աշխատողներով մեքենաների վրա:
Դե, հիմա պարզ է.
$ docker-compose up --scale worker=3
Ամեն ինչ բարձրանալուց հետո կարող եք դիտել վեբ միջերեսները.
Եթե այս բոլոր «դագերում» ոչինչ չհասկացաք, ապա ահա կարճ բառարան.
Scheduler - Airflow-ի ամենակարևոր հորեղբայրը, որը վերահսկում է, որ ռոբոտները քրտնաջան աշխատեն, և ոչ թե մարդը.
Ընդհանրապես, հին տարբերակներում նա խնդիրներ ուներ հիշողության հետ (ոչ, ոչ թե ամնեզիա, այլ արտահոսք) և ժառանգության պարամետրը նույնիսկ մնաց կոնֆիգուրացիաներում: run_duration - դրա վերագործարկման միջակայքը: Բայց հիմա ամեն ինչ լավ է։
DAG (aka «dag») - «ուղղորդված ացիկլիկ գրաֆիկ», բայց նման սահմանումը քչերին կպատմի, բայց իրականում այն միմյանց հետ փոխազդող առաջադրանքների համար նախատեսված կոնտեյներ է (տես ստորև) կամ SSIS-ի փաթեթի և Informatica-ի աշխատանքային հոսքի անալոգը: .
Բացի դագերից, դեռ կարող են լինել ենթադասեր, բայց մենք, ամենայն հավանականությամբ, չենք հասնի դրանց:
DAG Run - սկզբնականացված դագ, որը նշանակված է իր սեփականը execution_date. Նույն դագի դագրանները կարող են զուգահեռ աշխատել (եթե ձեր առաջադրանքները, իհարկե, անիմաստ եք դարձրել):
օպերատոր կոդի կտորներ են, որոնք պատասխանատու են կոնկրետ գործողություն կատարելու համար: Օպերատորների երեք տեսակ կա.
գործողությունմեր սիրելիի նման PythonOperator, որը կարող է կատարել Python-ի ցանկացած (վավեր) կոդ;
փոխանցել, որոնք տեղափոխում են տվյալներ տեղից տեղ, ասենք. MsSqlToHiveTransfer;
սենսոր մյուս կողմից, դա ձեզ թույլ կտա արձագանքել կամ դանդաղեցնել դագի հետագա կատարումը, մինչև որևէ իրադարձություն տեղի ունենա: HttpSensor կարող է քաշել նշված վերջնակետը, և երբ ցանկալի պատասխանը սպասում է, սկսիր փոխանցումը GoogleCloudStorageToS3Operator. Հետաքրքրասեր միտքը կհարցնի. «Ինչո՞ւ. Ի վերջո, դուք կարող եք կրկնություններ անել հենց օպերատորում»: Եվ հետո, կասեցված օպերատորներով առաջադրանքների լողավազանը չխցանելու համար: Սենսորը սկսում է, ստուգում և մահանում է հաջորդ փորձից առաջ:
Խնդիր - հայտարարված օպերատորները, անկախ տեսակից, և կցված դագին, բարձրացվում են առաջադրանքի աստիճան:
առաջադրանքի օրինակ - երբ գլխավոր պլանավորողը որոշեց, որ ժամանակն է առաջադրանքները մարտ ուղարկելու կատարող-աշխատողներին (հենց տեղում, եթե մենք օգտագործում ենք. LocalExecutor կամ հեռավոր հանգույցի դեպքում CeleryExecutor), այն վերագրում է նրանց համատեքստ (այսինքն՝ փոփոխականների մի շարք՝ կատարման պարամետրեր), ընդլայնում է հրամանների կամ հարցումների ձևանմուշները և միավորում դրանք։
Մենք ստեղծում ենք առաջադրանքներ
Նախ, եկեք ուրվագծենք մեր խմորի ընդհանուր սխեման, այնուհետև ավելի ու ավելի կխորանանք մանրամասների մեջ, քանի որ մենք կիրառում ենք որոշ ոչ տրիվիալ լուծումներ:
Այսպիսով, իր ամենապարզ ձևով նման դագը կունենա հետևյալ տեսքը.
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Եկեք պարզենք դա.
Նախ, մենք ներմուծում ենք անհրաժեշտ լիբները և ինչ որ այլ բան;
sql_server_ds - Ից List[namedtuple[str, str]] Airflow Connections-ի միացումների անուններով և տվյալների բազաներով, որոնցից մենք կվերցնենք մեր ափսեը.
dag - մեր դագի հայտարարությունը, որն անպայման պետք է լինի globals(), հակառակ դեպքում Airflow-ը չի գտնի այն: Դագը նաև պետք է ասի.
ինչ է նրա անունը orders - այս անունը այնուհետև կհայտնվի վեբ ինտերֆեյսում,
որ նա աշխատելու է հուլիսի XNUMX-ի կեսգիշերից,
և այն պետք է աշխատի մոտավորապես 6 ժամը մեկ (այստեղ կոշտ տղաների փոխարեն timedelta() թույլատրելի cron- տող 0 0 0/6 ? * * *, ավելի քիչ զովի համար - նման արտահայտություն @daily);
workflow() հիմնական գործը կանի, բայց ոչ հիմա։ Առայժմ մենք պարզապես կթափենք մեր համատեքստը գրանցամատյանում:
Եվ հիմա առաջադրանքներ ստեղծելու պարզ կախարդանքը.
մենք անցնում ենք մեր աղբյուրներով.
սկզբնավորել PythonOperator, որը կկատարի մեր խաբեբայությունը workflow(). Մի մոռացեք նշել առաջադրանքի եզակի անվանումը (դագի սահմաններում) և կապել դագը: Դրոշ provide_context իր հերթին լրացուցիչ արգումենտներ կլցնի ֆունկցիայի մեջ, որոնք մենք զգուշորեն կհավաքենք՝ օգտագործելով **context.
Առայժմ այսքանը: Այն, ինչ մենք ստացանք.
նոր դաջվածք վեբ ինտերֆեյսում,
մեկուկես հարյուր առաջադրանք, որոնք կիրականացվեն զուգահեռ (եթե դա թույլ են տալիս Airflow, Celery կարգավորումները և սերվերի հզորությունը):
Դե, գրեթե հասկացա:
Ո՞վ է տեղադրելու կախվածությունները:
Այս ամբողջը պարզեցնելու համար ես պտտվեցի docker-compose.yml վերամշակում requirements.txt բոլոր հանգույցների վրա:
Հիմա այն գնացել է.
Մոխրագույն քառակուսիները առաջադրանքների օրինակներ են, որոնք մշակվում են ժամանակացույցի կողմից:
Մենք մի փոքր սպասում ենք, առաջադրանքները խզվում են աշխատողների կողմից.
Կանաչները, իհարկե, հաջողությամբ ավարտել են իրենց գործը։ Կարմիրներն այնքան էլ հաջող չեն:
Ի դեպ, մեր արդյունահանման վրա թղթապանակ չկա ./dagsՄեքենաների միջև համաժամացում չկա, բոլոր դաջերը գտնվում են git մեր Gitlab-ում, իսկ Gitlab CI-ն միաձուլվելիս թարմացումներ է բաժանում մեքենաներին master.
Մի փոքր Flower-ի մասին
Մինչ աշխատողները ծեծում են մեր ծծակները, եկեք հիշենք մեկ այլ գործիք, որը կարող է մեզ ինչ-որ բան ցույց տալ՝ Ծաղիկը:
Առաջին էջը աշխատող հանգույցների վերաբերյալ ամփոփ տեղեկություններով.
Ամենաինտենսիվ էջը առաջադրանքներով, որոնք գործի են անցել.
Մեր բրոքերի կարգավիճակով ամենաձանձրալի էջը.
Ամենապայծառ էջը առաջադրանքի կարգավիճակի գծապատկերներով և դրանց կատարման ժամանակով է.
Մենք բեռնում ենք թերբեռնվածը
Այսպիսով, բոլոր առաջադրանքները մշակված են, դուք կարող եք տանել վիրավորներին։
Ու կային բազմաթիվ վիրավորներ՝ այս կամ այն պատճառով։ Airflow-ի ճիշտ օգտագործման դեպքում այս նույն քառակուսիները ցույց են տալիս, որ տվյալները հաստատ չեն հասել։
Դուք պետք է դիտեք գրանցամատյանը և վերագործարկեք ձախողված առաջադրանքների դեպքերը:
Սեղմելով ցանկացած քառակուսի, մենք կտեսնենք մեզ հասանելի գործողությունները.
Դուք կարող եք վերցնել և մաքրել ընկածներին: Այսինքն, մենք մոռանում ենք, որ այնտեղ ինչ-որ բան ձախողվել է, և նույն օրինակի առաջադրանքը գնալու է ժամանակացույցին:
Հասկանալի է, որ մկնիկի հետ դա անելը բոլոր կարմիր քառակուսիներով այնքան էլ մարդասիրական չէ. սա այն չէ, ինչ մենք ակնկալում ենք Airflow-ից: Բնականաբար, մենք ունենք զանգվածային ոչնչացման զենք. Browse/Task Instances
Եկեք միանգամից ընտրենք ամեն ինչ և վերակայենք զրոյի, սեղմեք ճիշտ կետը.
Մաքրումից հետո մեր տաքսիներն այսպիսի տեսք ունեն (նրանք արդեն սպասում են, որ ժամանակացույցը պլանավորի).
Միացումներ, կեռիկներ և այլ փոփոխականներ
Ժամանակն է նայելու հաջորդ DAG-ին, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Բոլորը երբևէ կատարել են հաշվետվության թարմացում: Սա կրկին նա է. կա աղբյուրների ցանկ, որտեղից կարելի է ստանալ տվյալները. կա ցուցակ, որտեղ դնել; մի մոռացեք ձայն տալ, երբ ամեն ինչ տեղի ունեցավ կամ կոտրվեց (դե, սա մեր մասին չէ, ոչ):
Եկեք նորից անցնենք ֆայլի միջով և նայենք նոր անհասկանալի նյութերին.
from commons.operators import TelegramBotSendMessage - ոչինչ չի խանգարում մեզ ստեղծել մեր սեփական օպերատորները, ինչից մենք օգտվեցինք՝ պատրաստելով փոքրիկ փաթաթան Unblocked-ին հաղորդագրություններ ուղարկելու համար: (Այս օպերատորի մասին ավելին կխոսենք ստորև);
default_args={} - dag-ը կարող է նույն փաստարկները տարածել իր բոլոր օպերատորներին.
to='{{ var.value.all_the_kings_men }}' - դաշտ to մենք չենք ունենա կոշտ կոդավորված, այլ դինամիկ ձևով գեներացվել՝ օգտագործելով Jinja և փոփոխական՝ նամակների ցանկով, որը ես ուշադիր տեղադրել եմ Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — օպերատորը գործարկելու պայման. Մեր դեպքում նամակը շեֆերին կթռչի միայն այն դեպքում, եթե բոլոր կախվածությունները լուծվեն հաջողությամբ;
tg_bot_conn_id='tg_main' - փաստարկներ conn_id ընդունել կապի ID-ները, որոնցում մենք ստեղծում ենք Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - Telegram-ի հաղորդագրությունները կթռչեն միայն այն դեպքում, եթե կան ձախողված առաջադրանքներ.
task_concurrency=1 - մենք արգելում ենք մեկ առաջադրանքի մի քանի առաջադրանքների միաժամանակյա գործարկումը: Հակառակ դեպքում մենք կստանանք մի քանիսի միաժամանակյա գործարկում VerticaOperator (նայելով մեկ սեղանին);
report_update >> [email, tg] - Բոլորը VerticaOperator զուգակցվում են նամակներ և հաղորդագրություններ ուղարկելիս, ինչպես հետևյալը.
Բայց քանի որ ծանուցող օպերատորները գործարկման տարբեր պայմաններ ունեն, միայն մեկը կաշխատի: Ծառի տեսքում ամեն ինչ մի փոքր ավելի քիչ տեսողական է թվում.
Մի քանի խոսք կասեմ դրա մասին մակրոներ և նրանց ընկերները - փոփոխականներ.
Մակրոները Jinja տեղապահներ են, որոնք կարող են տարբեր օգտակար տեղեկություններ փոխարինել օպերատորի փաստարկներով: Օրինակ, այսպես.
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} կընդլայնվի մինչև համատեքստի փոփոխականի բովանդակությունը execution_date ձեւաչափով YYYY-MM-DD: 2020-07-14. Լավագույնն այն է, որ համատեքստի փոփոխականները գամված են առաջադրանքի կոնկրետ օրինակին (քառակուսի ծառի տեսքում), և երբ վերագործարկվեն, տեղապահները կընդլայնվեն մինչև նույն արժեքները:
Նշանակված արժեքները կարելի է դիտել՝ օգտագործելով Rendered կոճակը յուրաքանչյուր առաջադրանքի օրինակում: Նամակ ուղարկելու առաջադրանքը հետևյալն է.
Եվ այսպես, հաղորդագրություն ուղարկելու առաջադրանքում.
Ներկառուցված մակրոների ամբողջական ցանկը վերջին հասանելի տարբերակի համար հասանելի է այստեղ. մակրո հղում
Ավելին, փլագինների օգնությամբ մենք կարող ենք հայտարարել մեր սեփական մակրոները, բայց դա այլ պատմություն է։
Բացի նախապես սահմանված բաներից, մենք կարող ենք փոխարինել մեր փոփոխականների արժեքները (ես արդեն օգտագործել եմ սա վերևի կոդում): Եկեք ստեղծենք ներսում Admin/Variables մի քանի բան.
պարզապես օգտագործեք ցանկալի բանալին տանող ճանապարհը. {{ var.json.bot_config.bot.token }}.
Ես բառացիորեն կասեմ մեկ բառ և ցույց կտամ մեկ սքրինշոթ մասին կապեր. Այստեղ ամեն ինչ տարրական է՝ էջում Admin/Connections մենք ստեղծում ենք կապ, այնտեղ ավելացնում ենք մեր մուտքերը/գաղտնաբառերը և ավելի կոնկրետ պարամետրեր: Սրա նման:
Գաղտնաբառերը կարող են գաղտնագրվել (ավելի մանրամասն, քան լռելյայն), կամ կարող եք բաց թողնել կապի տեսակը (ինչպես ես արեցի tg_main) - Փաստն այն է, որ տեսակների ցանկը ամրագրված է Airflow մոդելներում և չի կարող ընդլայնվել առանց սկզբնաղբյուրի կոդերի մեջ մտնելու (եթե հանկարծ ինչ-որ բան չեմ փնտրել google-ում, խնդրում եմ ուղղեք ինձ), բայց ոչինչ չի խանգարի մեզ միայն վարկեր ստանալ: Անուն.
Կարող եք նաև մի քանի կապ հաստատել նույն անունով՝ այս դեպքում՝ մեթոդը BaseHook.get_connection(), որը մեզ կապեր է ստանում անունով, կտա պատահական մի քանի անվանակիցներից (ավելի տրամաբանական կլիներ Round Robin անել, բայց թողնենք դա Airflow մշակողների խղճին):
Փոփոխականները և միացումները, իհարկե, հիանալի գործիքներ են, բայց կարևոր է չկորցնել հավասարակշռությունը. ձեր հոսքերի որ մասերն եք պահում հենց կոդի մեջ, և որ մասերը տրամադրում եք Airflow-ին պահեստավորման համար: Մի կողմից, արժեքը արագ փոխելը, օրինակ, փոստային արկղը, կարող է հարմար լինել UI-ի միջոցով: Մյուս կողմից, սա դեռ վերադարձ է մկնիկի սեղմմանը, որից մենք (ես) ուզում էինք ազատվել։
Կապերի հետ աշխատելը խնդիրներից մեկն է կեռիկներ. Ընդհանուր առմամբ, Airflow կեռիկները այն երրորդ կողմի ծառայություններին և գրադարաններին միացնելու կետեր են: Օրինակ, JiraHook մեզ համար հաճախորդ կբացի Ժիրայի հետ շփվելու համար (դուք կարող եք առաջադրանքները ետ ու առաջ տեղափոխել) և օգնությամբ SambaHook դուք կարող եք մղել տեղական ֆայլը smb- կետ.
Պատվերով օպերատորի վերլուծություն
Եվ մենք մոտեցանք նայելուն, թե ինչպես է այն պատրաստված TelegramBotSendMessage
Code commons/operators.py փաստացի օպերատորի հետ՝
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Այստեղ, ինչպես և ամեն ինչ Airflow-ում, ամեն ինչ շատ պարզ է.
ժառանգել է BaseOperator, որն իրականացնում է օդային հոսքին հատուկ բաներ (նայեք ձեր ժամանցին)
Հայտարարված դաշտեր template_fields, որում Ջինջան կփնտրի մակրոներ՝ մշակելու համար։
Դասավորեց ճիշտ փաստարկները __init__(), անհրաժեշտության դեպքում սահմանեք կանխադրվածները:
Չմոռացանք նաև նախահայրի սկզբնավորման մասին։
Բացել է համապատասխան կեռիկը TelegramBotHookդրանից ստացել է հաճախորդի օբյեկտ:
Վերագրանցված (վերասահմանված) մեթոդ BaseOperator.execute(), որը Airfow-ը կկտրվի, երբ գա օպերատորը գործարկելու ժամանակը, դրանում մենք կիրականացնենք հիմնական գործողությունը՝ մոռանալով մուտք գործել: (Մենք մուտք ենք գործում, ի դեպ, հենց ներս stdout и stderr - Օդային հոսքը կխափանի ամեն ինչ, գեղեցիկ կփաթաթի, անհրաժեշտության դեպքում կքայքայվի:)
Եկեք տեսնենք, թե ինչ ունենք commons/hooks.py. Ֆայլի առաջին մասը՝ ինքնին կեռիկով.
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Ես նույնիսկ չգիտեմ, թե ինչ բացատրել այստեղ, ես պարզապես նշեմ կարևոր կետերը.
Մենք ժառանգում ենք, մտածում փաստարկների մասին, շատ դեպքերում դա կլինի մեկը. conn_id;
Գերակայող ստանդարտ մեթոդներ. ես սահմանափակեցի ինձ get_conn(), որում ես ստանում եմ կապի պարամետրերը անունով և պարզապես ստանում եմ բաժինը extra (սա JSON դաշտ է), որում ես (ըստ իմ հրահանգների!) տեղադրել եմ Telegram բոտի նշանը. {"bot_token": "YOuRAwEsomeBOtToKen"}.
Ես ստեղծում եմ մեր օրինակը TelegramBot, դրան տալով կոնկրետ նշան։
Այսքանը: Դուք կարող եք հաճախորդ ստանալ մանգաղից, օգտագործելով TelegramBotHook().clent կամ TelegramBotHook().get_conn().
Եվ ֆայլի երկրորդ մասը, որում ես միկրոփաթաթում եմ պատրաստում Telegram REST API-ի համար, որպեսզի նույնը չքաշեմ python-telegram-bot մեկ մեթոդի համար sendMessage.
Ճիշտ ճանապարհը այս ամենը գումարելն է. TelegramBotSendMessage, TelegramBotHook, TelegramBot - plugin-ում, դրեք հանրային պահոց և տվեք այն Open Source-ին:
Մինչ մենք ուսումնասիրում էինք այս ամենը, մեր զեկույցի թարմացումները հաջողությամբ ձախողվեցին և ինձ ալիքում սխալ հաղորդագրություն ուղարկեցին: Ես պատրաստվում եմ ստուգել, թե արդյոք դա սխալ է ...
Ինչ-որ բան կոտրվեց մեր դոգում: Մի՞թե դա այն չէ, ինչ մենք սպասում էինք: Ճիշտ!
Դուք պատրաստվում եք լցնել:
Դուք զգում եք, որ ես ինչ-որ բան բաց եմ թողել: Կարծես խոստացել էր SQL Server-ից տվյալները փոխանցել Vertica, հետո վերցրեց ու թեմայից շեղեց, սրիկա՛։
Այս վայրագությունը միտումնավոր էր, ես պարզապես ստիպված էի վերծանել ձեզ համար ինչ-որ տերմինաբանություն: Այժմ կարող եք ավելի հեռուն գնալ:
Մեր ծրագիրը հետևյալն էր.
Դարձիր
Ստեղծեք առաջադրանքներ
Տեսեք, թե որքան գեղեցիկ է ամեն ինչ
Նշանակե՛ք նիստերի համարները լրացումներին
Ստացեք տվյալներ SQL Server-ից
Տեղադրեք տվյալները Vertica-ում
Հավաքել վիճակագրություն
Այսպիսով, այս ամենը գործի դնելու համար ես մի փոքր հավելում արեցի մեր docker-compose.yml:
Vertica որպես հյուրընկալող dwh առավել լռելյայն կարգավորումներով,
SQL Server-ի երեք օրինակ,
վերջիններիս շտեմարանները լրացնում ենք որոշ տվյալներով (ոչ մի դեպքում չդիտեք mssql_init.py!)
Մենք գործարկում ենք ամեն լավը մի փոքր ավելի բարդ հրամանի օգնությամբ, քան նախորդ անգամ.
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Այն, ինչ ստեղծել է մեր հրաշք պատահական սարքը, կարող եք օգտագործել նյութը Data Profiling/Ad Hoc Query:
Գլխավորը դա վերլուծաբաններին ցույց չտալն է
մանրամասնել ETL նիստեր Ես չեմ անի, այնտեղ ամեն ինչ մանրուք է. մենք հիմք ենք ստեղծում, դրա մեջ նշան կա, մենք ամեն ինչ փաթաթում ենք համատեքստի կառավարիչով, և հիմա մենք անում ենք սա.
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Ժամանակը եկել է հավաքել մեր տվյալները մեր մեկուկես հարյուր սեղաններից։ Եկեք դա անենք շատ ոչ հավակնոտ տողերի օգնությամբ.
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Կեռիկի օգնությամբ մենք ստանում ենք Airflow-ից pymssql- միացնել
Եկեք հարցման մեջ փոխարինենք ամսաթվի ձևով սահմանափակում. այն կներդրվի ֆունկցիայի մեջ կաղապարի շարժիչի կողմից:
Սնուցելով մեր խնդրանքը pandasով մեզ կբերի DataFrame - դա մեզ ապագայում օգտակար կլինի։
Ես օգտագործում եմ փոխարինում {dt} հարցման պարամետրի փոխարեն %s ոչ թե այն պատճառով, որ ես չար Պինոքիո եմ, այլ որովհետև pandas չի կարողանում գլուխ հանել pymssql և սայթաքում է վերջինը params: Listչնայած նա իսկապես ցանկանում է tuple.
Նաև նշեք, որ մշակողը pymssql որոշել է այլևս չաջակցել նրան, և ժամանակն է տեղափոխվել pyodbc.
Տեսնենք, թե Airflow-ն ինչով է լցրել մեր գործառույթների փաստարկները.
Եթե տվյալներ չկան, ուրեմն շարունակելն իմաստ չունի։ Բայց նաեւ տարօրինակ է լցոնումը հաջողված համարելը։ Բայց սա սխալ չէ։ A-ah-ah, ինչ անել?! Եվ ահա թե ինչ.
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException Airflow-ին կասի, որ սխալներ չկան, բայց մենք բաց ենք թողնում առաջադրանքը: Ինտերֆեյսը չի ունենա կանաչ կամ կարմիր քառակուսի, այլ վարդագույն:
Վաճառքի ժամանակ մենք ձեռքով ստեղծում ենք թիրախային ափսե: Այստեղ ես ինձ թույլ տվեցի փոքրիկ մեքենա.
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
ես օգտագործում եմ VerticaOperator() Ես ստեղծում եմ տվյալների բազայի սխեման և աղյուսակ (եթե դրանք արդեն չկան, իհարկե): Հիմնական բանը կախվածությունները ճիշտ դասավորելն է.
-Դե,- ասաց փոքրիկ մուկը,- չէ՞, հիմա
Համոզվա՞ծ ես, որ ես անտառի ամենասարսափելի կենդանին եմ։
Ջուլիա Դոնալդսոն, Գրուֆալոն
Կարծում եմ, եթե ես և իմ գործընկերները ունենայինք մրցակցություն. Վայ, կարծում եմ կհամաձայնեք, որ ես կհաղթեմ նրանց բոլոր ճակատներում։
Եթե մի փոքր ավելի լուրջ, ապա Apache Airflow-ը, գործընթացները նկարագրելով ծրագրի կոդի տեսքով, արեց իմ գործը շատ ավելի հարմարավետ և հաճելի:
Նրա անսահմանափակ ընդարձակելիությունը, ինչպես plug-ins-ի, այնպես էլ մասշտաբայնության նախատրամադրվածության առումով, հնարավորություն է տալիս օգտագործել Airflow գրեթե ցանկացած ոլորտում՝ նույնիսկ տվյալների հավաքագրման, պատրաստման և մշակման ամբողջական ցիկլում, նույնիսկ հրթիռների արձակման ժամանակ (դեպի Մարս, դասընթաց).
Մասի վերջնական, հղում և տեղեկատվություն
Փոցխը, որը մենք հավաքել ենք ձեզ համար
start_date. Այո, սա արդեն տեղական մեմ է: Դագի հիմնական փաստարկի միջոցով start_date բոլորն անցնում են. Հակիրճ, եթե նշեք start_date ընթացիկ ամսաթիվը, և schedule_interval -Մի օր, հետո ԴԱԳ-ն վաղը կսկսի ոչ շուտ։
start_date = datetime(2020, 7, 7, 0, 1, 2)
Եվ այլևս ոչ մի խնդիր:
Դրա հետ կապված գործարկման ժամանակի մեկ այլ սխալ կա. Task is missing the start_date parameter, որն ամենից հաճախ ցույց է տալիս, որ մոռացել եք կապել դագ օպերատորին։
Ամեն ինչ մեկ մեքենայի վրա: Այո, և հիմքեր (ինքն օդային հոսքը և մեր ծածկույթը), և վեբ սերվեր, և ժամանակացույց և աշխատողներ: Եվ դա նույնիսկ աշխատեց: Բայց ժամանակի ընթացքում ծառայությունների համար առաջադրանքների թիվն աճեց, և երբ PostgreSQL-ը սկսեց ինդեքսին արձագանքել 20 ms-ի փոխարեն 5 վայրկյանում, մենք վերցրեցինք այն և տարանք:
Տեղական կատարող. Այո, մենք դեռ նստած ենք դրա վրա, և արդեն հասել ենք անդունդի եզրին։ LocalExecutor-ը մեզ մինչ այժմ բավական էր, բայց հիմա ժամանակն է ընդլայնվելու առնվազն մեկ աշխատողով, և մենք պետք է շատ աշխատենք CeleryExecutor-ին անցնելու համար: Եվ հաշվի առնելով այն փաստը, որ դուք կարող եք աշխատել դրա հետ մեկ մեքենայի վրա, ոչինչ չի խանգարում ձեզ օգտագործել Celery նույնիսկ սերվերի վրա, որը «իհարկե, երբեք չի արտադրվի, ազնվորեն»:
Չօգտագործում ներկառուցված գործիքներ:
Կապեր սպասարկման հավատարմագրերը պահելու համար,
SLA Misses ժամանակին չստացված առաջադրանքներին արձագանքել,
xcom մետատվյալների փոխանակման համար (ես ասացի մետատվյալների!) dag առաջադրանքների միջև:
Փոստի չարաշահում. Դե ինչ ասեմ։ Զգուշացումներ են սահմանվել տապալված առաջադրանքների բոլոր կրկնությունների համար: Այժմ իմ աշխատանքային Gmail-ն ունի ավելի քան 90 հազար նամակներ Airflow-ից, և վեբ փոստի դնչկալը հրաժարվում է միաժամանակ վերցնել և ջնջել ավելի քան 100 նամակ:
Որպեսզի մենք ավելի շատ աշխատենք մեր գլխով և ոչ թե ձեռքերով, Airflow-ը մեզ համար պատրաստել է սա.
REST API -Նա դեռևս ունի Էքսպերիմենտալի կարգավիճակ, ինչը չի խանգարում նրան աշխատել։ Դրա միջոցով դուք կարող եք ոչ միայն տեղեկատվություն ստանալ դագերի և առաջադրանքների մասին, այլև դադարեցնել/սկսել դագը, ստեղծել DAG Run կամ լողավազան:
CLI - հրամանի տողի միջոցով հասանելի են բազմաթիվ գործիքներ, որոնք ոչ միայն անհարմար են WebUI-ի միջոցով օգտագործելու համար, այլ ընդհանրապես բացակայում են: Օրինակ:
backfill անհրաժեշտ է առաջադրանքների օրինակները վերագործարկելու համար:
Օրինակ, վերլուծաբաններ եկան ու ասացին. «Իսկ դու, ընկեր, հունվարի 1-ից 13-ի տվյալների մեջ հիմարություն ունես։ Շտկե՛ք, շտկե՛ք, շտկե՛ք, շտկե՛ք»։ Եվ դուք այդպիսի օջախ եք.
run, որը թույլ է տալիս կատարել մեկ օրինակով առաջադրանք և նույնիսկ միավորներ հավաքել բոլոր կախվածությունների վրա: Ավելին, դուք կարող եք այն գործարկել միջոցով LocalExecutor, նույնիսկ եթե դուք ունեք նեխուրի կլաստեր:
Գրեթե նույն բանն է անում test, միայն նաև հիմքերում ոչինչ չի գրում։
connections թույլ է տալիս զանգվածային կապեր ստեղծել պատյանից:
python api - շփվելու բավականին հարդքոր եղանակ, որը նախատեսված է պլագինների համար, այլ ոչ թե փոքրիկ ձեռքերով լցվելու մեջ: Բայց ով է խանգարելու մեզ գնալ /home/airflow/dags, վազել ipython ու սկսե՞ս խառնվել: Դուք կարող եք, օրինակ, արտահանել բոլոր կապերը հետևյալ կոդով.
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Միացում Airflow մետատվյալների բազային: Ես խորհուրդ չեմ տալիս գրել դրան, բայց տարբեր կոնկրետ չափումների համար առաջադրանքների վիճակներ ստանալը կարող է շատ ավելի արագ և հեշտ լինել, քան API-ներից որևէ մեկի օգտագործումը:
Ասենք, որ մեր բոլոր առաջադրանքները չէ, որ անզոր են, բայց դրանք երբեմն կարող են ընկնել, և դա նորմալ է։ Բայց մի քանի խցանումներ արդեն կասկածելի են, և հարկ կլինի ստուգել։
Զգուշացեք SQL-ից:
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
Սայլակ
Եվ իհարկե, Google-ի թողարկման առաջին տասը հղումները իմ էջանիշներից Airflow թղթապանակի բովանդակությունն են:
Python-ի և Apache օդային հոսքի Զենը - DAG-ի անուղղակի վերահասցեավորում, գործառույթների մեջ համատեքստի գցում, կրկին կախվածության մասին, ինչպես նաև առաջադրանքների մեկնարկից բաց թողնելու մասին: