Airflow – пакеттік деректерді өңдеу процестерін ыңғайлы және жылдам әзірлеуге және қолдауға арналған құрал

Airflow – пакеттік деректерді өңдеу процестерін ыңғайлы және жылдам әзірлеуге және қолдауға арналған құрал

Сәлем, Хабр! Бұл мақалада мен, мысалы, корпоративтік DWH немесе сіздің DataLake инфрақұрылымында пакеттік деректерді өңдеу процестерін дамытуға арналған тамаша құрал туралы айтқым келеді. Біз Apache Airflow (бұдан әрі - Airflow) туралы айтатын боламыз. Бұл Habré-де әділетсіз түрде назардан тыс қалды, және негізгі бөлімде мен сізді ETL/ELT процестері үшін жоспарлаушыны таңдаған кезде кем дегенде Airflow назар аударуға тұрарлық екеніне сендіруге тырысамын.

Бұрын мен Tinkoff банкінде жұмыс істеген кезде DWH тақырыбына бірқатар мақалалар жазған болатынмын. Қазір мен Mail.Ru Group командасының бір бөлігі болдым және ойын аймағында деректерді талдау платформасын әзірлеудемін. Шын мәнінде, жаңалықтар мен қызықты шешімдер пайда болған кезде, мен және менің командам осы жерде деректерді талдау платформасы туралы сөйлесетін боламыз.

Проглог

Сонымен, бастайық. Ауа ағыны дегеніміз не? Бұл кітапхана (немесе кітапханалар жиынтығы) жұмыс процестерін әзірлеу, жоспарлау және бақылау. Airflow негізгі ерекшелігі: Python коды процестерді сипаттау (дамыту) үшін пайдаланылады. Бұл сіздің жобаңызды және әзірлеуіңізді ұйымдастыру үшін көптеген артықшылықтарға ие: мәні бойынша, сіздің (мысалы) ETL жобаңыз жай ғана Python жобасы болып табылады және оны инфрақұрылымның ерекшеліктерін, команда өлшемін және басқа талаптар. Аспаптық тұрғыдан бәрі қарапайым. Мысалы, PyCharm + Git пайдаланыңыз. Бұл керемет және өте ыңғайлы!

Енді Airflow негізгі нысандарын қарастырайық. Олардың мәні мен мақсатын түсіну арқылы сіз процестің архитектурасын оңтайлы ұйымдастыра аласыз. Мүмкін, негізгі нысан Бағытталған циклдік график (бұдан әрі - DAG) болып табылады.

ГПДР

DAG - бұл белгілі бір кестеге сәйкес қатаң анықталған дәйектілікпен орындағыңыз келетін тапсырмаларыңыздың мағыналы бірлестігі. Airflow DAG және басқа нысандармен жұмыс істеу үшін ыңғайлы веб-интерфейсті қамтамасыз етеді:

Airflow – пакеттік деректерді өңдеу процестерін ыңғайлы және жылдам әзірлеуге және қолдауға арналған құрал

DAG келесідей болуы мүмкін:

Airflow – пакеттік деректерді өңдеу процестерін ыңғайлы және жылдам әзірлеуге және қолдауға арналған құрал

Әзірлеуші ​​DAG жобалау кезінде DAG ішіндегі міндеттер құрастырылатын операторлар жинағын белгілейді. Мұнда біз тағы бір маңызды нысанға келдік: Ауа ағынының операторы.

Операторлар

Оператор - тапсырма данасын орындау кезінде не болатынын сипаттайтын, оның негізінде жұмыс даналары жасалатын нысан. GitHub-тен ауа ағыны шығарылады қазірдің өзінде пайдалануға дайын операторлар жиынын қамтиды. Мысалдар:

  • BashOperator – bash командасын орындауға арналған оператор.
  • PythonOperator – Python кодын шақыру операторы.
  • EmailOperator — электрондық поштаны жіберу операторы.
  • HTTPOperator – http сұрауларымен жұмыс істеу операторы.
  • SqlOperator – SQL кодын орындауға арналған оператор.
  • Датчик – оқиғаны күту операторы (қажетті уақыттың келуі, қажетті файлдың пайда болуы, деректер қорындағы жол, API-дан жауап және т.б.).

Нақтырақ операторлар бар: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Сондай-ақ, сіз өзіңіздің сипаттамаларыңызға негізделген операторларды жасай аласыз және оларды жобаңызда пайдалана аласыз. Мысалы, біз MongoDBToHiveViaHdfsTransfer, құжаттарды MongoDB-тен Hive-ге экспорттауға арналған операторды және олармен жұмыс істеуге арналған бірнеше операторларды жасадық. кликхаус: CHLoadFromHiveOperator және CHTableLoaderOperator. Негізінде, жоба негізгі мәлімдемелерге негізделген кодты жиі пайдаланған кезде, оны жаңа мәлімдемеге құру туралы ойлануға болады. Бұл одан әрі дамуды жеңілдетеді және сіз жобадағы операторлар кітапханасын кеңейтесіз.

Әрі қарай, тапсырмалардың осы даналарының барлығын орындау керек, енді біз жоспарлаушы туралы айтатын боламыз.

Жоспарлаушы

Ауа ағынының тапсырмаларды жоспарлаушысы құрастырылған Балдыркөк. Балдыркөк кезекті ұйымдастыруға, сонымен қатар тапсырмалардың асинхронды және бөлінген орындалуына мүмкіндік беретін Python кітапханасы. Ауа ағыны жағында барлық тапсырмалар пулдарға бөлінген. Бассейндер қолмен жасалады. Әдетте, олардың мақсаты көзбен жұмыс істеу жүктемесін шектеу немесе DWH ішіндегі тапсырмаларды типтеу болып табылады. Бассейндерді веб-интерфейс арқылы басқаруға болады:

Airflow – пакеттік деректерді өңдеу процестерін ыңғайлы және жылдам әзірлеуге және қолдауға арналған құрал

Әрбір бассейнде слоттар санына шектеу бар. DAG құру кезінде оған пул беріледі:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG деңгейінде анықталған пулды тапсырма деңгейінде қайта анықтауға болады.
Бөлек процесс, Жоспарлағыш, Airflow бағдарламасындағы барлық тапсырмаларды жоспарлауға жауап береді. Шын мәнінде, Жоспарлағыш тапсырмаларды орындау үшін орнатудың барлық механикасымен айналысады. Тапсырма орындалмас бұрын бірнеше кезеңнен өтеді:

  1. DAG-да алдыңғы тапсырмалар орындалды, жаңасын кезекке қоюға болады.
  2. Кезек тапсырмалардың басымдығына байланысты сұрыпталады (басымдықтарды да басқаруға болады) және бассейнде бос орын болса, тапсырманы іске қосуға болады.
  3. Тегін жұмысшы балдыркөк болса, тапсырма оған жіберіледі; мәселеде бағдарламалаған жұмыс сол немесе басқа операторды пайдалана отырып басталады.

Жеткілікті қарапайым.

Жоспарлағыш барлық DAG жиынында және DAG ішіндегі барлық тапсырмаларда жұмыс істейді.

Жоспарлағыш DAG-пен жұмыс істей бастау үшін DAG кестені орнатуы керек:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Дайын алдын ала орнатулар жиынтығы бар: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Сондай-ақ cron өрнектерін пайдалануға болады:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Орындау күні

Ауа ағынының қалай жұмыс істейтінін түсіну үшін DAG үшін Орындалу күні қандай екенін түсіну маңызды. Ауа ағынында DAG орындалу күні өлшемі бар, яғни DAG жұмыс кестесіне байланысты әрбір Орындалу күні үшін тапсырма даналары жасалады. Және әрбір Орындалу күні үшін тапсырмаларды қайта орындауға болады - немесе, мысалы, DAG бірнеше Орындалу Күндерінде бір уақытта жұмыс істей алады. Бұл жерде анық көрсетілген:

Airflow – пакеттік деректерді өңдеу процестерін ыңғайлы және жылдам әзірлеуге және қолдауға арналған құрал

Өкінішке орай (немесе бақытымызға орай: бұл жағдайға байланысты), егер DAG-та тапсырманың орындалуы түзетілсе, алдыңғы Орындау күніндегі орындау түзетулерді ескере отырып жалғасады. Бұл жаңа алгоритмді пайдаланып өткен кезеңдердегі деректерді қайта есептеу қажет болса жақсы, бірақ бұл жаман, себебі нәтиженің қайталану мүмкіндігі жоғалады (әрине, Git-тен бастапқы кодтың қажетті нұсқасын қайтару және не болатынын есептеу үшін ешкім сізді алаңдатпайды. сізге бір реттік қажет, қалай қажет болса).

Тапсырмаларды құру

DAG іске асырылуы Python-дағы код болып табылады, сондықтан бізде, мысалы, ұсақталған көздермен жұмыс істегенде, код көлемін азайтудың өте ыңғайлы әдісі бар. Дереккөз ретінде сізде үш MySQL үзіндісі бар делік, әрқайсысына кіріп, кейбір деректерді алу керек. Оның үстіне, тәуелсіз және параллельді. DAG ішіндегі Python коды келесідей болуы мүмкін:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG келесідей көрінеді:

Airflow – пакеттік деректерді өңдеу процестерін ыңғайлы және жылдам әзірлеуге және қолдауға арналған құрал

Бұл жағдайда жай ғана параметрлерді реттеу және DAG жаңарту арқылы үзіндіні қосуға немесе жоюға болады. Ыңғайлы!

Сондай-ақ, күрделірек кодты генерациялауды қолдануға болады, мысалы, дерекқор түріндегі көздермен жұмыс істеуге немесе кесте құрылымын, кестемен жұмыс істеу алгоритмін сипаттауға және DWH инфрақұрылымының ерекшеліктерін ескере отырып, процесті генерациялауға болады. жадыңызға N кестелерді жүктеу үшін. Немесе, мысалы, тізім түріндегі параметрмен жұмыс істеуге қолдау көрсетпейтін API-мен жұмыс істеу, сіз осы тізімнен DAG-та N тапсырмаларды жасай аласыз, API сұрауларының параллельділігін пулға шектей аласыз және API-ден қажетті деректер. Икемді!

репозиторий

Airflow-тың өзіндік серверлік репозиторийі, деректер қоры (MySQL немесе Postgres болуы мүмкін, бізде Postgres бар), ол тапсырмалар күйлерін, DAGs, қосылым параметрлері, жаһандық айнымалылар және т.б. сақтайды. Бұл жерде мен айта аламын: Airflow репозиторийі өте қарапайым (шамамен 20 кесте) және оның үстіне кез келген өзіңіздің процестеріңізді құрғыңыз келсе ыңғайлы. Informatica репозиторийіндегі 100500 XNUMX кесте есімде, сұрауды қалай құру керектігін түсінбес бұрын оны ұзақ уақыт бойы зерделеу керек болды.

Бақылау

Репозиторийдің қарапайымдылығын ескере отырып, сіз өзіңізге ыңғайлы тапсырманы бақылау процесін құра аласыз. Біз Zeppelin-де блокнотты қолданамыз, онда біз тапсырмалардың күйін қарастырамыз:

Airflow – пакеттік деректерді өңдеу процестерін ыңғайлы және жылдам әзірлеуге және қолдауға арналған құрал

Бұл сонымен қатар Airflow веб-интерфейсі болуы мүмкін:

Airflow – пакеттік деректерді өңдеу процестерін ыңғайлы және жылдам әзірлеуге және қолдауға арналған құрал

Airflow коды ашық бастапқы код, сондықтан біз Telegram-ға ескерту қостық. Тапсырманың әрбір іске қосылған данасы, егер қате орын алса, бүкіл әзірлеу және қолдау тобы бар Telegram-дағы топқа спам жібереді.

Біз Telegram арқылы жедел жауап аламыз (қажет болса) және Zeppelin арқылы біз Airflow бағдарламасындағы тапсырмалардың жалпы суретін аламыз.

Барлығы

Ауа ағыны негізінен ашық көз болып табылады және одан ғажайыптарды күтуге болмайды. Жұмыс істейтін шешімді құру үшін уақыт пен күш салуға дайын болыңыз. Мақсатқа жетуге болады, маған сеніңіз, бұл оған тұрарлық. Даму жылдамдығы, икемділік, жаңа процестерді қосудың қарапайымдылығы - бұл сізге ұнайды. Әрине, жобаны ұйымдастыруға, Ауа ағынының тұрақтылығына көп көңіл бөлу керек: кереметтер болмайды.

Қазір бізде Airflow күнделікті жұмыс істейді шамамен 6,5 мың тапсырма. Олар мінезі жағынан мүлдем басқаша. Көптеген әртүрлі және өте нақты көздерден негізгі DWH-ге деректерді жүктеу тапсырмалары бар, негізгі DWH ішіндегі дүкен сөрелерін есептеу тапсырмалары бар, деректерді жылдам DWH-ге жариялау тапсырмалары бар, көптеген, көптеген әртүрлі тапсырмалар бар - және ауа ағыны олардың барлығын күн сайын шайнайды. Санмен айтсақ, бұл 2,3 мың DWH (Hadoop) ішінде әртүрлі күрделіліктегі ELT тапсырмалары, шамамен. 2,5 жүз деректер базасы Дереккөздер, бұл команда 4 ETL әзірлеушілері, олар DWH жүйесінде ETL деректерді өңдеуге және DWH ішінде ELT деректерді өңдеуге бөлінеді және, әрине, т.б. бір админ, қызметтің инфрақұрылымымен кім айналысады.

Болашақ жоспарлары

Процестердің саны сөзсіз өсуде және біз Airflow инфрақұрылымы тұрғысынан жасайтын ең бастысы - масштабтау. Біз Airflow кластерін құрғымыз келеді, балдыркөк жұмысшылары үшін жұп аяқтарды бөлгіміз келеді және жұмысты жоспарлау процестері мен репозиторийі бар өздігінен көшірілетін бас жасағымыз келеді.

Эпилогия

Бұл, әрине, мен Airflow туралы айтқым келетін нәрсе емес, бірақ мен негізгі ойларды бөліп көрсетуге тырыстым. Тәбет тамақтанумен бірге келеді, көріңіз және сізге ұнайды :)

Ақпарат көзі: www.habr.com

пікір қалдыру