Сәлеметсіз бе, мен Дмитрий Логвиненкомын - Vezet компаниялар тобының аналитика бөлімінің инженері.
Мен сізге ETL процестерін дамытуға арналған тамаша құрал - Apache Airflow туралы айтып беремін. Бірақ ауа ағыны соншалықты жан-жақты және көп қырлы, сондықтан сіз деректер ағынына қатыспасаңыз да, оны мұқият қарауыңыз керек, бірақ кез келген процестерді кезеңді түрде іске қосып, олардың орындалуын бақылау қажет.
Иә, мен айтып қана қоймай, көрсетемін: бағдарламада көптеген кодтар, скриншоттар мен ұсыныстар бар.
Google-де Airflow / Wikimedia Commons сөзін іздегенде әдетте көретін нәрсе
- тек жақсырақ және ол мүлде басқа мақсаттар үшін жасалған, атап айтқанда (кат алдында жазылғандай):
машиналарда шектеусіз тапсырмаларды орындау және бақылау (көптеген балдыркөк / Кубернете және сіздің ар-ұжданыңыз мүмкіндік береді)
Python кодын жазуға және түсінуге оңайдан динамикалық жұмыс ағынының генерациясымен
және дайын компоненттерді де, үйде жасалған плагиндерді де пайдалана отырып, кез келген дерекқорлар мен API интерфейстерін бір-бірімен қосу мүмкіндігі (бұл өте қарапайым).
Біз Apache Airflow қолданамыз:
біз әртүрлі көздерден деректерді жинаймыз (көптеген SQL Server және PostgreSQL даналары, қолданба өлшемдері бар әртүрлі API интерфейстері, тіпті 1C) DWH және ODS (бізде Vertica және Clickhouse бар).
қаншалықты озық cron, ол ODS бойынша деректерді біріктіру процестерін бастайды, сондай-ақ олардың сақталуын бақылайды.
Соңғы уақытқа дейін біздің қажеттіліктер 32 ядросы және 50 ГБ жедел жады бар бір шағын сервермен жабылды. Ауа ағынында бұл жұмыс істейді:
более 200 тг (шын мәнінде біз тапсырмаларды толтырған жұмыс процестері),
әрқайсысында орташа 70 тапсырма,
бұл жақсылық басталады (сонымен қатар орташа) сағатына бір рет.
Біз қалай кеңейткеніміз туралы мен төменде жазамын, бірақ енді біз шешетін über-проблемасын анықтайық:
Үш бастапқы SQL сервері бар, олардың әрқайсысында 50 дерекқор бар - бір жобаның даналары, сәйкесінше, олардың құрылымы бірдей (барлық жерде дерлік, муа-ха-ха), яғни әрқайсысында Тапсырыстар кестесі бар (бақытымызға орай, бұл кесте бар). атауды кез келген бизнеске енгізуге болады). Біз деректерді қызмет өрістерін (бастапқы сервер, бастапқы дерекқор, ETL тапсырма идентификаторы) қосу арқылы аламыз және оларды, айталық, Vertica-ға жібереміз.
Барайық!
Негізгі бөлім, практикалық (және аздап теориялық)
Неліктен біз (және сіз)
Ағаштар үлкен, мен қарапайым болған кезде SQLБір ресейлік бөлшек саудада -schik, біз қол жетімді екі құралды пайдаланып ETL процестерін, яғни деректер ағындарын алдап алдық:
Ақпараттық қуат орталығы - өте кең таралған жүйе, өте өнімді, өзінің аппараттық құралдары, өзіндік нұсқасы бар. Мен оның мүмкіндіктерінің 1% құдай сақтасын пайдаландым. Неліктен? Ең алдымен, 380-шы жылдардағы бұл интерфейс бізге ойша қысым жасады. Екіншіден, бұл контрацепция өте сәнді процестерге, компоненттерді ашулы қайта пайдалануға және басқа да өте маңызды-кәсіпорын-трюктерге арналған. Оның құны туралы, мысалы, Airbus AXNUMX қанаты / жылына, біз ештеңе айтпаймыз.
Сақ болыңыз, скриншот 30 жасқа толмаған адамдарға аздап зиян тигізуі мүмкін
SQL серверімен біріктіру сервері - біз бұл жолдасты жобаішілік ағындарымызда пайдаландық. Шындығында: біз SQL серверін қолданамыз және оның ETL құралдарын пайдаланбау қандай да бір негізсіз болар еді. Ондағы барлығы жақсы: интерфейс әдемі де, орындалу туралы есептер де ... Бірақ біз бағдарламалық өнімдерді жақсы көретініміз емес, ол үшін емес. Оның нұсқасы dtsx (сақтау кезінде араластырылған түйіндері бар XML дегеніміз) біз жасай аламыз, бірақ мұның мәні неде? Жүздеген кестелерді бір серверден екіншісіне апаратын тапсырмалар пакетін жасау туралы не айтуға болады? Иә, не жүз, сұқ саусағың жиырма бөліктен түсіп қалады, тышқанның түймесін басып. Бірақ ол әлдеқайда сәнді көрінеді:
Біз, әрине, шығу жолдарын іздедік. Іс тіпті дерлік өздігінен жазылған SSIS пакетінің генераторына келді ...
...сосын маған жаңа жұмыс табылды. Ал Apache Airflow мені басып озды.
Мен ETL процесінің сипаттамасы қарапайым Python коды екенін білгенде, мен жай ғана қуаныштан билемедім. Деректер ағындары осылайша нұсқаланды және дифференцияланды және жүздеген дерекқорлардан бір мақсатқа бір құрылымды кестелерді құю бір жарым немесе екі 13 ”экранда Python кодының мәселесі болды.
Кластерді құрастыру
Толық балабақшаны ұйымдастырмай-ақ қояйық және мұнда Airflow орнату, таңдаған дерекқор, балдыркөк және доктарда сипатталған басқа да жағдайлар туралы айтпай-ақ қояйық.
Тәжірибелерді бірден бастау үшін мен эскиз жасадым docker-compose.yml онда:
Шын мәнінде көтерейік Ауа шығыны: Жоспарлағыш, веб-сервер. Гүл де сол жерде балдыркөк тапсырмаларын бақылау үшін айналады (өйткені ол итерілген apache/airflow:1.10.10-python3.7, бірақ қарсы емеспіз)
PostgreSQL, онда Airflow өзінің қызмет ақпаратын (жоспарлаушы деректері, орындалу статистикасы, т.б.) жазады және балдыркөк аяқталған тапсырмаларды белгілейді;
Редис, ол балдыркөк үшін тапсырма брокері ретінде әрекет етеді;
Балдыркөк жұмысшысытапсырмаларды тікелей орындаумен айналысатын болады.
Қалтаға ./dags біз файлдарды дагтардың сипаттамасымен қосамыз. Оларды ұшқанда алып кетеді, сондықтан әрбір түшкіргеннен кейін бүкіл стекпен жонглёрлеудің қажеті жоқ.
Кейбір жерлерде мысалдардағы код толық көрсетілмеген (мәтінді шатастырмау үшін), бірақ бір жерде ол процесте өзгертілген. Толық жұмыс кодының мысалдарын репозиторийден табуға болады https://github.com/dm-logv/airflow-tutorial.
Композицияны құрастыруда мен көбіне белгілі образға сүйендім пукель/докер-ауа ағыны - міндетті түрде тексеріңіз. Мүмкін сізге өмірде басқа ештеңе керек емес шығар.
Барлық ауа ағыны параметрлері арқылы ғана емес қол жетімді airflow.cfg, сонымен қатар қоршаған ортаның айнымалы мәндері арқылы (әзірлеушілерге рахмет), мен оны қасақана пайдаландым.
Әрине, ол өндіріске дайын емес: мен әдейі жүрек соғысын контейнерлерге қоймадым, қауіпсіздікпен алаңдамадым. Бірақ мен экспериментшілерімізге қолайлы минимумды жасадым.
Ескертіп қой:
Dag қалтасы жоспарлаушыға да, жұмысшыларға да қолжетімді болуы керек.
Бұл барлық үшінші тарап кітапханаларына қатысты - олардың барлығы жоспарлаушы мен жұмысшылары бар машиналарға орнатылуы керек.
Енді бәрі қарапайым:
$ docker-compose up --scale worker=3
Барлығы көтерілгеннен кейін сіз веб-интерфейстерді көре аласыз:
Егер сіз осы «дагтардың» бәрінде ештеңе түсінбесеңіз, міне қысқаша сөздік:
Жоспарлағыш - Airflow-тағы ең маңызды ағай, роботтардың адам емес, қатты жұмыс істейтінін бақылайды: кестені бақылайды, деректерді жаңартады, тапсырмаларды іске қосады.
Жалпы, ескі нұсқаларда оның жадында проблемалар болды (жоқ, амнезия емес, ағып кетеді) және бұрынғы параметр тіпті конфигурацияларда қалды. run_duration — оның қайта іске қосылу аралығы. Бірақ қазір бәрі жақсы.
ГПДР (aka «dag») - «бағытталған ациклдік график», бірақ мұндай анықтама аз адамдарға айтады, бірақ іс жүзінде бұл бір-бірімен өзара әрекеттесетін тапсырмаларға арналған контейнер (төменде қараңыз) немесе SSIS ішіндегі пакеттің аналогы және Informatica жұмыс процесі .
Дагтардан басқа, субдагтар әлі де болуы мүмкін, бірақ біз оларға жете алмаймыз.
DAG Run - инициализацияланған даг, оған меншікті тағайындалады execution_date. Бір дагның даграндары параллель жұмыс істей алады (егер сіз өз тапсырмаларыңызды идемпотентті етіп жасасаңыз, әрине).
Оператор белгілі бір әрекетті орындауға жауапты код бөліктері. Операторлардың үш түрі бар:
іс-шараларбіздің сүйікті сияқты PythonOperator, ол кез келген (жарамды) Python кодын орындай алады;
аударымдеректерді бір жерден екінші жерге тасымалдайтын , айталық, MsSqlToHiveTransfer;
сенсор екінші жағынан, бұл оқиға орын алғанша әрекет етуге немесе дагның одан әрі орындалуын бәсеңдетуге мүмкіндік береді. HttpSensor көрсетілген соңғы нүктені тарта алады және қажетті жауап күткенде, тасымалдауды бастаңыз GoogleCloudStorageToS3Operator. Ізденімпаз сана: «Неге? Ақыр соңында, сіз қайталауды дәл операторда жасай аласыз!» Содан кейін, тоқтатылған операторлармен тапсырмалар пулын бітеп алмау үшін. Сенсор келесі әрекетке дейін іске қосылады, тексереді және өледі.
тапсырма - түріне қарамастан жарияланған және дагға бекітілген операторлар тапсырма дәрежесіне көтеріледі.
тапсырма данасы - бас жоспарлаушы тапсырмаларды орындаушы-жұмысшыларға жіберу уақыты келді деп шешкен кезде (егер қолданатын болсақ, дәл сол жерде LocalExecutor немесе жағдайда қашықтағы түйінге CeleryExecutor), ол оларға контекст тағайындайды (яғни, айнымалылар жиыны – орындау параметрлері), пәрмен немесе сұрау үлгілерін кеңейтеді және оларды біріктіреді.
Біз тапсырмаларды жасаймыз
Алдымен, біздің қамырдың жалпы схемасын белгілеп алайық, содан кейін біз егжей-тегжейлерге көбірек енеміз, өйткені біз кейбір тривиальды емес шешімдерді қолданамыз.
Осылайша, қарапайым түрде мұндай дагжа келесідей болады:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Келіңіздер,
Біріншіден, біз қажетті libs және импорттаймыз басқа нәрсе;
sql_server_ds Мүмкін List[namedtuple[str, str]] Airflow Connections жүйесіндегі қосылымдардың атаулары және біз пластинаны алатын дерекқорлармен;
dag - міндетті түрде болуы керек біздің дагымыздың хабарландыруы globals(), әйтпесе Airflow оны таба алмайды. Даг сонымен қатар мынаны айтуы керек:
оның аты кім orders - бұл атау содан кейін веб-интерфейсте пайда болады,
сегізінші шілдеде түн ортасынан бастап жұмыс істейтінін,
және ол шамамен әр 6 сағат сайын жұмыс істеуі керек (мұнда емес, қатал жігіттер үшін timedelta() рұқсат етілген cron-түзу 0 0 0/6 ? * * *, аз салқын үшін - сияқты өрнек @daily);
workflow() негізгі жұмысты атқарады, бірақ қазір емес. Әзірге біз контекстімізді журналға жібереміз.
Ал енді тапсырмаларды жасаудың қарапайым сиқыры:
біз өзіміздің дереккөздеріміз арқылы жүреміз;
инициализациялау PythonOperator, ол біздің манекенімізді орындайды workflow(). Тапсырманың бірегей (даг ішінде) атауын көрсетуді және дагтың өзін байлауды ұмытпаңыз. Жалау provide_context өз кезегінде функцияға қосымша аргументтер құйылады, біз оларды мұқият пайдаланып жинаймыз **context.
Әзірге бәрі осы. Бізде не бар:
веб-интерфейстегі жаңа дақ,
параллель орындалатын бір жарым жүз тапсырма (егер ауа ағыны, балдыркөк параметрлері және сервер сыйымдылығы рұқсат етсе).
Түсініп қалдым.
Тәуелділіктерді кім орнатады?
Осының бәрін жеңілдету үшін мен бұрылдым docker-compose.yml өңдеу requirements.txt барлық түйіндерде.
Енді кетті:
Сұр квадраттар - жоспарлаушы өңдейтін тапсырма даналары.
Біз біраз күтеміз, жұмысшылар тапсырмаларды орындайды:
Жасылдар, әрине, сәтті жұмыс істеді. Қызылдар өте сәтті емес.
Айтпақшы, біздің өнімде қалта жоқ ./dags, машиналар арасында синхрондау жоқ - барлық дагдар жатады git Gitlab жүйесінде және Gitlab CI біріктіру кезінде машиналарға жаңартуларды таратады master.
Гүл туралы аздап
Жұмысшылар біздің емізіктерді қағып жатқанда, бізге бірдеңе көрсете алатын тағы бір құралды еске түсірейік - Гүл.
Жұмысшы түйіндері туралы жиынтық ақпараты бар ең бірінші бет:
Жұмысқа кеткен тапсырмалары бар ең қарқынды бет:
Біздің брокер мәртебесі бар ең скучно бет:
Ең жарқын бет - тапсырма күйінің графиктері және олардың орындалу уақыты:
Біз аз жүктелгенді жүктейміз
Сонымен, барлық тапсырмалар орындалды, сіз жаралыларды алып кете аласыз.
Және көптеген жараланғандар болды - бір себептермен немесе басқа. Ауа ағыны дұрыс пайдаланылған жағдайда, дәл осы квадраттар деректердің анық келмегенін көрсетеді.
Журналды қарап, құлаған тапсырма даналарын қайта бастау керек.
Кез келген шаршыны басу арқылы біз қол жетімді әрекеттерді көреміз:
Сіз құлағандарды алып, Clear жасай аласыз. Яғни, біз ол жерде бірдеңе сәтсіз болғанын ұмытамыз және дәл сол даналық тапсырма жоспарлаушыға өтеді.
Мұны барлық қызыл квадраттармен тінтуірмен жасау өте адамгершілікке жатпайтыны анық - бұл біз Airflow-тан күткен нәрсе емес. Әрине, бізде жаппай қырып-жоятын қару бар: Browse/Task Instances
Барлығын бірден таңдап, нөлге қайтарайық, дұрыс элементті басыңыз:
Тазалаудан кейін біздің таксилер келесідей көрінеді (олар жоспарлаушының оларды жоспарлауын күтуде):
Қосылымдар, ілгектер және басқа айнымалылар
Келесі DAG-ға қараудың уақыты келді, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Барлығы есеп жаңартуын жасады ма? Бұл тағы да ол: деректерді қайдан алуға болатын көздердің тізімі бар; қоюға болатын тізім бар; бәрі болған немесе бұзылған кезде сигнал беруді ұмытпаңыз (жақсы, бұл біз туралы емес, жоқ).
Файлды қайта қарап шығып, жаңа түсініксіз нәрселерді қарастырайық:
from commons.operators import TelegramBotSendMessage - блоктан шығарылған хабарламаларды жіберуге арналған шағын қаптама жасау арқылы біз өзіміздің операторларымызды жасауға ештеңе кедергі келтірмейді. (Бұл оператор туралы төменде толығырақ айтатын боламыз);
default_args={} - dag өзінің барлық операторларына бірдей аргументтерді тарата алады;
to='{{ var.value.all_the_kings_men }}' - өріс to бізде қатты кодталмаған, бірақ Jinja көмегімен динамикалық түрде жасалған және мен мұқият енгізген электрондық пошталар тізімі бар айнымалы болады. Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — операторды іске қосу шарты. Біздің жағдайда хат барлық тәуелділіктер шешілген жағдайда ғана бастықтарға ұшады сәтті;
tg_bot_conn_id='tg_main' - дәлелдер conn_id біз жасайтын қосылым идентификаторларын қабылдаңыз Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - Telegram-дағы хабарламалар орындалмаған тапсырмалар болған жағдайда ғана ұшып кетеді;
task_concurrency=1 - біз бір тапсырманың бірнеше тапсырма данасын бір уақытта іске қосуға тыйым саламыз. Әйтпесе, біз бір уақытта бірнеше ұшыруды аламыз VerticaOperator (бір үстелге қарап);
report_update >> [email, tg] - барлық VerticaOperator келесідей хаттар мен хабарламаларды жіберуде біріктіріңіз:
Бірақ хабарландырушы операторлардың іске қосу шарттары әртүрлі болғандықтан, тек біреуі ғана жұмыс істейді. Ағаш көрінісінде бәрі азырақ көрнекі көрінеді:
туралы бірнеше сөз айтайын макростар және олардың достары - айнымалылар.
Макростар әр түрлі пайдалы ақпаратты оператор дәлелдеріне алмастыра алатын Джиндя толтырғыштары болып табылады. Мысалы, келесідей:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} контекстік айнымалының мазмұнына дейін кеңейеді execution_date форматта YYYY-MM-DD: 2020-07-14. Ең жақсы бөлігі - контекстік айнымалы мәндер нақты тапсырма данасына (Ағаш көрінісіндегі шаршы) бекітіледі және қайта іске қосылғанда толтырғыштар бірдей мәндерге дейін кеңейеді.
Тағайындалған мәндерді әрбір тапсырма данасында Көрсетілген түймесі арқылы көруге болады. Хат жіберу тапсырмасы келесідей:
Сонымен, хабарлама жіберу тапсырмасында:
Ең соңғы қол жетімді нұсқа үшін кірістірілген макростардың толық тізімі мына жерден қолжетімді: макрос сілтемесі
Сонымен қатар, плагиндердің көмегімен біз өз макростарымызды жариялай аламыз, бірақ бұл басқа әңгіме.
Алдын ала анықталған нәрселерден басқа, біз айнымалыларымыздың мәндерін ауыстыра аламыз (мен оны жоғарыдағы кодта қолдандым). Құрайық Admin/Variables екі нәрсе:
қажетті кілтке жолды пайдаланыңыз: {{ var.json.bot_config.bot.token }}.
Мен бір сөзді айтып, бір скриншотты көрсетемін қосылымдар. Мұнда бәрі қарапайым: бетте Admin/Connections біз қосылым жасаймыз, логиндерді / парольдерді және нақты параметрлерді қосамыз. Бұл сияқты:
Құпиясөздер шифрлануы мүмкін (әдепкіге қарағанда мұқият) немесе қосылым түрін қалдыруға болады (мен жасағандай tg_main) - шын мәнінде, түрлердің тізімі Airflow үлгілерінде бекітілген және бастапқы кодтарға кірмей кеңейтілмейді (егер кенеттен мен Google-да бірдеңе жасамаған болсам, мені түзетіңіз), бірақ бізге несие алуымызға ештеңе кедергі болмайды. аты.
Сондай-ақ бір атпен бірнеше қосылымдар жасауға болады: бұл жағдайда әдіс BaseHook.get_connection(), ол бізге аты бойынша байланыстарды береді, береді кездейсоқ бірнеше есімдерден (Раунд Робин жасау қисындырақ болар еді, бірақ оны Airflow әзірлеушілерінің ар-ожданына қалдырайық).
Айнымалылар мен қосылымдар, әрине, керемет құралдар, бірақ тепе-теңдікті жоғалтпау маңызды: ағындарыңыздың қай бөліктерін кодтың өзінде сақтайсыз және қандай бөліктерді Airflow жүйесіне сақтау үшін бересіз. Бір жағынан, мәнді жылдам өзгерту, мысалы, пошта жәшігі, UI арқылы ыңғайлы болуы мүмкін. Екінші жағынан, бұл әлі де біз (мен) құтылғымыз келген тінтуірді басуға оралу.
Байланыстармен жұмыс – тапсырмалардың бірі ілгектер. Жалпы алғанда, Airflow ілмектері оны үшінші тарап қызметтері мен кітапханаларына қосуға арналған нүктелер болып табылады. Мысалыға, JiraHook Jira-мен әрекеттесу үшін клиентті ашады (тапсырмаларды алға және артқа жылжыта аласыз) және көмегімен SambaHook жергілікті файлды түртуге болады smb-нүкте.
Пайдаланушы операторын талдау
Біз оның қалай жасалғанын қарауға жақындадық TelegramBotSendMessage
код commons/operators.py нақты оператормен:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Мұнда, Airflow ішіндегі барлық басқа нәрселер сияқты, бәрі өте қарапайым:
Мұраға алынған BaseOperator, ол ауа ағынына қатысты бірнеше нәрсені жүзеге асырады (бос уақытыңызды қараңыз)
Жарияланған өрістер template_fields, онда Джинджа өңдеу үшін макростарды іздейді.
үшін дұрыс дәлелдерді ұйымдастырды __init__(), қажет жерде әдепкі мәндерді орнатыңыз.
Бабаны инициализациялауды да ұмытпадық.
Сәйкес ілгекті ашты TelegramBotHookодан клиенттік нысанды алды.
Қайта анықталған (қайта анықталған) әдіс BaseOperator.execute(), Операторды іске қосу уақыты келгенде Airfow дірілдейді - онда біз кіруді ұмытып, негізгі әрекетті орындаймыз. (Айтпақшы, біз кіреміз stdout и stderr - Ауа ағыны бәрін ұстап алады, оны әдемі орап, қажет жерде ыдыратады.)
Бізде не бар екенін көрейік commons/hooks.py. Ілмекпен бірге файлдың бірінші бөлігі:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Мен бұл жерде не түсіндірерімді білмеймін, тек маңызды тұстарды атап өтейін:
Біз мұра аламыз, дәлелдер туралы ойланамыз - көп жағдайда ол біреу болады: conn_id;
Стандартты әдістерді жоққа шығару: Мен өзімді шектедім get_conn(), онда мен қосылым параметрлерін аты бойынша аламын және жай ғана бөлімді аламын extra (бұл JSON өрісі), мен оған (өз нұсқауларым бойынша!) Telegram бот белгісін қойдым: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Мен біздің дананы жасаймын TelegramBot, оған белгілі бір белгі береді.
Осымен болды. Сіз ілмектен клиентті пайдалана аласыз TelegramBotHook().clent немесе TelegramBotHook().get_conn().
Файлдың екінші бөлігі, онда мен Telegram REST API үшін микроораманы жасаймын, осылайша сүйреп кетпеу үшін. python-telegram-bot бір әдіс үшін sendMessage.
Барлығын қосудың дұрыс жолы: TelegramBotSendMessage, TelegramBotHook, TelegramBot - плагинде жалпыға қолжетімді репозиторийге қойыңыз және оны Open Source-қа беріңіз.
Осының бәрін зерттеп жатқанда, біздің есеп жаңартулары сәтсіз аяқталды және маған арнада қате туралы хабар жіберді. Мен оның дұрыс емес екенін тексеру үшін барамын ...
Біздің итте бірдеңе бұзылды! Бұл біздің күткеніміз емес пе? Дәл!
Құйып бересіз бе?
Мен бір нәрсені сағындым деп ойлайсың ба? SQL Server-ден Vertica-ға мәліметтерді тасымалдауға уәде беріп, сосын алып, тақырыптан ауытқыған сияқты, масқара!
Бұл зұлымдық қасақана болды, мен сізге кейбір терминологияны шешуім керек болды. Енді сіз одан әрі қарай жүре аласыз.
Біздің жоспарымыз мынадай болды:
Жаса
Тапсырмаларды құру
Барлығы қаншалықты әдемі екенін қараңыз
Толтыруға сеанс нөмірлерін тағайындаңыз
SQL серверінен деректерді алыңыз
Деректерді Vertica ішіне қойыңыз
Статистиканы жинаңыз
Осының бәрін іске қосу үшін мен өзімізге шағын қосымша жасадым docker-compose.yml:
Хост ретінде Vertica dwh ең әдепкі параметрлермен,
SQL серверінің үш данасы,
біз соңғы деректер қорын кейбір деректермен толтырамыз (ешқандай жағдайда да қарастырмаңыз mssql_init.py!)
Біз барлық жақсылықты өткенге қарағанда біршама күрделі команданың көмегімен іске қосамыз:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Біздің керемет рандомизатор жасаған нәрсе, сіз элементті пайдалана аласыз Data Profiling/Ad Hoc Query:
Ең бастысы, оны талдаушыларға көрсетпеу керек
пысықтау ETL сеанстары Мен мұны істемеймін, ол жерде бәрі тривиальды: біз негіз жасаймыз, онда белгі бар, біз бәрін контекстік менеджермен орап аламыз, енді біз мұны істейміз:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Уақыт келді деректерімізді жинаймыз бір жарым жүз үстелімізден. Мұны өте қарапайым сызықтардың көмегімен жасайық:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Ілмектің көмегімен біз Airflow-тан аламыз pymssql-қосу
Сұранысқа күн түріндегі шектеуді ауыстырайық - ол шаблондық қозғалтқыш арқылы функцияға жіберіледі.
Біздің өтінішімізді тамақтандыру pandasбізді кім алады DataFrame - бұл болашақта бізге пайдалы болады.
Мен ауыстыруды қолданамын {dt} сұрау параметрінің орнына %s Мен зұлым Пиноккио болғандықтан емес, өйткені pandas көтере алмайды pymssql және соңғысын сырғытады params: Listол шынымен қаласа да tuple.
Сондай-ақ әзірлеушіге назар аударыңыз pymssql бұдан былай оны қолдамауды шешті, ал кетудің уақыты келді pyodbc.
Airflow біздің функцияларымыздың дәлелдерін немен толтырғанын көрейік:
Егер деректер болмаса, онда жалғастырудың қажеті жоқ. Бірақ толтыруды сәтті деп санау да біртүрлі. Бірақ бұл қате емес. А-а-а, не істеу керек?! Міне, мыналар:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException Airflow-қа қателер жоқ екенін айтады, бірақ біз тапсырманы өткізіп жібереміз. Интерфейс жасыл немесе қызыл шаршы емес, қызғылт болады.
Біздің су тасқыны сеансының идентификаторы (ол басқаша болады әрбір тапсырма үшін),
Дереккөзден және тапсырыс идентификаторынан алынған хэш - түпкілікті дерекқорда (бәрі бір кестеге құйылады) бізде бірегей тапсырыс идентификаторы болады.
Соңғы қадам қалады: барлығын Vertica-ға құйыңыз. Бір қызығы, мұны істеудің ең керемет және тиімді әдістерінің бірі - CSV арқылы!
Сатылымда біз мақсатты тақтаны қолмен жасаймыз. Мұнда мен өзіме шағын машинаға рұқсат бердім:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
пайдаланып жатырмын VerticaOperator() Мен дерекқор схемасын және кестені жасаймын (егер олар әлі жоқ болса, әрине). Ең бастысы - тәуелділіктерді дұрыс орналастыру:
– Жарайды, – деді кішкентай тышқан, – солай емес пе, енді
Менің ормандағы ең қорқынышты жануар екеніме сенімдісіз бе?
Джулия Дональдсон, The Gruffalo
Менің ойымша, егер менің әріптестерім мен менде бәсекелестік болса: ETL процесін нөлден бастап кім тез жасайды және іске қосады: олар SSIS және тінтуірмен және мен Airflow арқылы ... Содан кейін біз техникалық қызмет көрсетудің қарапайымдылығын салыстырар едік ... Уау, мен оларды барлық майданда жеңемін деп келісесіз деп ойлаймын!
Егер сәл байыпты болса, онда Apache Airflow - бағдарламалық код түріндегі процестерді сипаттау арқылы - менің жұмысымды жасады көп анағұрлым ыңғайлы және жағымды.
Оның қосылатын модульдер тұрғысынан да, ауқымдылыққа бейімділігі жағынан да шексіз кеңейту мүмкіндігі сізге ауа ағынын іс жүзінде кез келген салада пайдалануға мүмкіндік береді: тіпті деректерді жинаудың, дайындаудың және өңдеудің толық циклінде, тіпті зымырандарды ұшыру кезінде де (Марсқа, курс).
Қорытынды бөлім, анықтама және ақпарат
Біз сіз үшін жинаған тырма
start_date. Иә, бұл қазірдің өзінде жергілікті мем. Дагтың негізгі дәлелі арқылы start_date бәрі өтеді. Қысқаша айтқанда, егер сізде көрсетсеңіз start_date ағымдағы күн және schedule_interval - бір күні, содан кейін DAG ертең ерте емес басталады.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Және басқа проблемалар жоқ.
Онымен байланысты басқа орындалу қатесі бар: Task is missing the start_date parameter, бұл көбінесе dag операторына байланыстыруды ұмытып кеткеніңізді көрсетеді.
Барлығы бір машинада. Иә, және негіздер (Airflow өзі және біздің жабынымыз), веб-сервер, жоспарлаушы және жұмысшылар. Және бұл тіпті жұмыс істеді. Бірақ уақыт өте келе қызметтерге арналған тапсырмалар саны өсті және PostgreSQL индекске 20 мс орнына 5 секундта жауап бере бастағанда, біз оны алып, алып кеттік.
Жергілікті орындаушы. Иә, біз әлі оның үстінде отырмыз, біз шыңыраудың шетіне жеттік. Осы уақытқа дейін LocalExecutor бізге жеткілікті болды, бірақ енді кем дегенде бір жұмысшымен кеңейту уақыты келді және біз CeleryExecutor-қа көшу үшін көп жұмыс істеуіміз керек. Сіз онымен бір машинада жұмыс істей алатыныңызды ескере отырып, балдыркөкті тіпті серверде де қолдануға ештеңе кедергі келтірмейді, ол «әрине, ешқашан өндіріске кірмейді!»
Қолданбау кіріктірілген құралдар:
Қосылымдар қызмет тіркелгі деректерін сақтау үшін,
SLA қателері уақытында орындалмаған тапсырмаларға жауап беру,
xcom метадеректер алмасу үшін (мен айттым метадеректер!) Dag тапсырмалары арасында.
Поштаны теріс пайдалану. Ал, мен не айта аламын? Құлаған тапсырмалардың барлық қайталануы үшін ескертулер орнатылды. Енді менің жұмысым Gmail-де Airflow-тан >90 мың электрондық пошта бар, ал веб-поштаның тұмсығы бір уақытта 100-ден астам алудан және жоюдан бас тартады.
Қолымызбен емес, басымызбен жұмыс істеуіміз үшін Airflow бізге мынаны дайындады:
REST API - ол әлі де Эксперименттік мәртебесіне ие, бұл оның жұмыс істеуіне кедергі келтірмейді. Оның көмегімен сіз дагдар мен тапсырмалар туралы ақпаратты алып қана қоймай, сонымен қатар дагды тоқтатуға/бастауға, DAG Run немесе пулды жасауға болады.
CLI - WebUI арқылы пайдалану ыңғайсыз ғана емес, әдетте жоқ көптеген құралдар пәрмен жолы арқылы қол жетімді. Мысалы:
backfill тапсырма даналарын қайта бастау үшін қажет.
Мәселен, сарапшылар келіп: «Ал, жолдас, 1-13 қаңтар аралығындағы деректерде бос сөз бар! Түзет, жөнде, жөнде, жөнде!» Ал сіз осындай плитасыз:
Негізгі қызмет: initdb, resetdb, upgradedb, checkdb.
run, бұл бір даналық тапсырманы орындауға және тіпті барлық тәуелділіктер бойынша ұпай алуға мүмкіндік береді. Сонымен қатар, сіз оны арқылы іске қоса аласыз LocalExecutor, тіпті сізде балдыркөк кластері болса да.
Дәл сол нәрсені жасайды test, тек негіздерде де ештеңе жазбайды.
connections қабықтан байланыстарды жаппай жасауға мүмкіндік береді.
python API - плагиндерге арналған және кішкентай қолдармен араласпаған өзара әрекеттесудің өте қиын әдісі. Бірақ бізге баруға кім кедергі жасайды /home/airflow/dags, жүгіріңіз ipython және араласа бастайсыз ба? Сіз, мысалы, келесі кодпен барлық қосылымдарды экспорттай аласыз:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Airflow метадеректер базасына қосылу. Мен оған жазуды ұсынбаймын, бірақ әртүрлі нақты көрсеткіштер үшін тапсырма күйлерін алу API интерфейстерінің кез келгеніне қарағанда әлдеқайда жылдам және оңай болуы мүмкін.
Айталық, біздің барлық тапсырмаларымыз идепотентті емес, бірақ олар кейде құлап кетуі мүмкін және бұл қалыпты жағдай. Бірақ бірнеше бітелулер қазірдің өзінде күдікті және тексеру қажет.
SQL-ден сақ болыңыз!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
сілтемелер
Әрине, Google шығарған алғашқы он сілтеме - бұл менің бетбелгілерімдегі Airflow қалтасының мазмұны.
Python Zen және Apache Airflow - жасырын DAG бағыттау, функцияларды контекстке жіберу, тағы да тәуелділіктер туралы, сондай-ақ тапсырмаларды іске қосуды өткізіп жіберу туралы.