Здраво, ја сам Дмитриј Логвиненко - инжењер података одељења аналитике групе компанија Везет.
Рећи ћу вам о дивном алату за развој ЕТЛ процеса - Апацхе Аирфлов. Али Аирфлов је толико разноврстан и вишеструк да би требало да га боље погледате чак и ако нисте укључени у токове података, али имате потребу да повремено покрећете било који процес и надгледате њихово извршење.
И да, не само да ћу рећи, већ и показати: програм има пуно кода, снимака екрана и препорука.

Оно што обично видите када прогуглате реч Аирфлов / Викимедиа Цоммонс
Преглед садржаја
Увод
Апацхе Аирфлов је као Дјанго:
- написан у питону
- постоји одличан админ панел,
- прошириво на неодређено време
- само боље, а направљен је за сасвим друге сврхе, наиме (како пише пре ката):
- покретање и надгледање задатака на неограниченом броју машина (колико ће вам Целери / Кубернетес и ваша савест дозволити)
- са динамичким генерисањем тока посла из Питхон кода који је врло једноставан за писање и разумевање
- и могућност међусобног повезивања било које базе података и АПИ-ја користећи и готове компоненте и домаће додатке (што је изузетно једноставно).
Користимо Апацхе Аирфлов овако:
- прикупљамо податке из различитих извора (много СКЛ Сервер и ПостгреСКЛ инстанци, разни АПИ-ји са метриком апликације, чак и 1Ц) у ДВХ и ОДС (имамо Вертица и Цлицкхоусе).
- колико напредно
cron, који покреће процесе консолидације података на ОДС-у, а такође прати њихово одржавање.
Наше потребе је донедавно покривао један мали сервер са 32 језгра и 50 ГБ РАМ-а. У Аирфлов-у ово функционише:
- више 200 дагс (заправо токови посла, у које смо ставили задатке),
- у сваком у просеку 70 задатака,
- ова доброта почиње (такође у просеку) једном на сат.
А о томе како смо се проширили, писаћу у наставку, али сада хајде да дефинишемо убер-проблем који ћемо решити:
Постоје три изворна СКЛ сервера, сваки са 50 база података – инстанце једног пројекта, респективно, имају исту структуру (скоро свуда, муа-ха-ха), што значи да сваки има табелу наруџби (на срећу, табелу са тим име се може угурати у било који посао). Узимамо податке додавањем сервисних поља (изворни сервер, изворна база података, ИД ЕТЛ задатка) и наивно их бацамо у, рецимо, Вертику.
Идемо!
Главни део, практичан (и мало теоријски)
Зашто ми (и ви)
Кад је дрвеће било велико, а ја једноставан SQL-сцхик у једној руској малопродаји, преварили смо ЕТЛ процесе или токове података користећи два алата која су нам доступна:
- Информатица Повер Центер - систем који се изузетно шири, изузетно продуктиван, са сопственим хардвером, сопственим верзијама. Користио сам не дај Боже 1% његових могућности. Зашто? Па, пре свега, овај интерфејс, негде из 380-их, психички је извршио притисак на нас. Друго, ова справа је дизајнирана за изузетно фенси процесе, бесну поновну употребу компоненти и друге веома важне трикове за предузећа. О томе колико кошта, попут крила Аирбус АXNUMX / године, нећемо ништа рећи.
Пазите, снимак екрана може мало да повреди људе млађе од 30 година

- СКЛ Сервер Интеграциони Сервер - користили смо овог друга у нашим токовима унутар пројекта. Па, у ствари: већ користимо СКЛ Сервер, и било би некако неразумно не користити његове ЕТЛ алате. Све у њему је добро: и интерфејс је леп, и извештаји о напретку... Али не волимо софтверске производе зато, ох, не због овога. Версион ит
dtsx(што је КСМЛ са измешаним чворовима при чувању) можемо, али у чему је поента? Шта кажете на израду пакета задатака који ће повући стотине табела са једног сервера на други? Да шта сто, отпашће ти кажипрст са двадесет комада, кликнувши на дугме миша. Али дефинитивно изгледа модерније:
Сигурно смо тражили излазе. Чак и случај скоро дошао до самог генератора ССИС пакета ...
…и онда ме је пронашао нови посао. И Апацхе Аирфлов ме је претекао на њему.
Када сам сазнао да су описи ЕТЛ процеса једноставан Питхон код, једноставно нисам плесао од радости. Овако су токови података верзионисани и диференцирани, а сипање табела са једном структуром из стотина база података у један циљ постало је ствар Питхон кода на један и по или два екрана од 13 инча.
Састављање кластера
Хајде да не уредимо комплетно обданиште, и да не причамо о потпуно очигледним стварима, попут инсталирања Аирфлов-а, одабране базе података, Целера и других случајева описаних у доковима.
Да бисмо одмах могли да почнемо са експериментима, скицирао сам docker-compose.yml у којима:
- Хајде да заправо подигнемо Проток ваздуха: Планер, Веб сервер. Цвет ће се такође окретати тамо да надгледа задатке целера (јер је већ гурнут
apache/airflow:1.10.10-python3.7, али нам не смета) - ПостгреСКЛ, у који ће Аирфлов уписати своје сервисне информације (податке о распореду, статистику извршења, итд.), а Целери ће означити завршене задатке;
- Редис, који ће деловати као посредник задатака за Целери;
- Радник целера, која ће бити ангажована на непосредном извршавању задатака.
- У фолдер
./dagsми ћемо додати наше датотеке са описом дагс. Они ће се покупити у ходу, тако да нема потребе да жонглирате целом гомилом након сваког кихања.
На неким местима код у примерима није у потпуности приказан (да не би затрпао текст), али се негде мења у процесу. Комплетни примери радног кода могу се наћи у спремишту .
доцкер-цомпосе.имл
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerНапомене:
- У склапању композиције у великој мери сам се ослањао на познату слику - обавезно проверите. Можда вам ништа више не треба у животу.
- Сва подешавања протока ваздуха су доступна не само преко
airflow.cfg, али и кроз променљиве окружења (захваљујући програмерима), које сам злонамерно искористио. - Наравно, није спреман за производњу: намерно нисам стављао откуцаје срца на контејнере, нисам се бавио обезбеђењем. Али урадио сам минимум прикладан за наше експериментаторе.
- Напоменути да:
- Фасцикла даг мора бити доступна и планеру и радницима.
- Исто важи и за све библиотеке независних произвођача – све оне морају бити инсталиране на машинама са планером и радницима.
Па, сада је једноставно:
$ docker-compose up --scale worker=3Након што се све подигне, можете погледати веб интерфејсе:
- Проток ваздуха:
- Цвет:
Основни концепти
Ако ништа нисте разумели у свим овим „даговима“, ево кратког речника:
- Планер - најважнији ујак у Аирфлов-у, који контролише да роботи напорно раде, а не особа: прати распоред, ажурира дагове, покреће задатке.
Генерално, у старијим верзијама, имао је проблема са меморијом (не, не амнезија, већ цурење) и легаци параметар је чак остао у конфигурацијама
run_duration— његов интервал поновног покретања. Али сада је све у реду. - ДАГ (ака "даг") - "усмерени ациклични граф", али таква дефиниција ће рећи мало људи, али у ствари је то контејнер за задатке који међусобно комуницирају (погледајте доле) или аналог пакета у ССИС-у и тока рада у Информатици .
Поред дагова, можда још постоје поддагови, али до њих највероватније нећемо доћи.
- ДАГ Рун - иницијализовани даг, коме је додељен сопствени
execution_date. Дагранови истог дага могу радити паралелно (ако сте своје задатке учинили идемпотентним, наравно). - оператор су делови кода одговорни за обављање одређене радње. Постоје три типа оператора:
- акцијакао наш омиљени
PythonOperator, који може да изврши било који (важећи) Питхон код; - пренос, који преносе податке од места до места, нпр.
MsSqlToHiveTransfer; - сензор с друге стране, омогућиће вам да реагујете или успорите даље извршавање даг-а док се не догоди неки догађај.
HttpSensorможе повући наведену крајњу тачку, а када жељени одговор сачека, започните преносGoogleCloudStorageToS3Operator. Радознали ум ће питати: „Зашто? На крају крајева, можете да радите понављања директно у оператеру!“ А онда, да не би закрчили базен задатака суспендованим оператерима. Сензор се покреће, проверава и угаси пре следећег покушаја.
- акцијакао наш омиљени
- Задатак - декларисани оператери, без обзира на врсту, и прикључени на даг, унапређују се у ранг задатка.
- инстанца задатка - када је генерални планер одлучио да је време да се задаци пошаљу у борбу на извођаче-раднике (одмах на лицу места, ако користимо
LocalExecutorили удаљеном чвору у случајуCeleryExecutor), додељује им контекст (тј. скуп променљивих - параметара извршења), проширује шаблоне команди или упита и обједињује их.
Ми генеришемо задатке
Прво, хајде да оцртамо општу шему нашег доуга, а затим ћемо све више урањати у детаље, јер примењујемо нека нетривијална решења.
Дакле, у свом најједноставнијем облику, такав даг ће изгледати овако:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Хајде да схватимо:
- Прво увозимо потребне библиотеке и нешто друго;
sql_server_ds- ЈеList[namedtuple[str, str]]са називима веза из Аирфлов Цоннецтионс и базама података из којих ћемо узети нашу плочу;dag- саопштење нашег дага, које обавезно мора бити уglobals(), иначе га Аирфлов неће пронаћи. Доуг такође треба да каже:- како се он зове
orders- ово име ће се тада појавити у веб интерфејсу, - да ће осмог јула радити од поноћи,
- и требало би да ради, отприлике сваких 6 сати (за јаке момке овде уместо
timedelta()дозвољеноcron-лине0 0 0/6 ? * * *, за мање кул - израз као@daily);
- како се он зове
workflow()обавиће главни посао, али не сада. За сада ћемо само избацити наш контекст у дневник.- А сада једноставна магија креирања задатака:
- пролазимо кроз наше изворе;
- иницијализовати
PythonOperator, који ће извршити нашу луткуworkflow(). Не заборавите да наведете јединствено (унутар дага) име задатка и вежите сам даг. Заставаprovide_contextзаузврат ће у функцију улити додатне аргументе, које ћемо пажљиво прикупити користећи**context.
За сада, то је све. Шта имамо:
- нови даг у веб интерфејсу,
- стотину и по задатака који ће се извршавати паралелно (ако то дозвољавају Аирфлов, Целери подешавања и капацитет сервера).
Па, скоро сам схватио.

Ко ће инсталирати зависности?
Да поједноставим целу ову ствар, зезнуо сам docker-compose.yml обрада requirements.txt на свим чворовима.
сада га нема:

Сиви квадрати су инстанце задатака које обрађује планер.
Чекамо мало, задатке похватају радници:

Зелени су, наравно, успешно завршили посао. Црвени нису баш успешни.
Иначе, на нашем производу нема фасцикле
./dags, нема синхронизације између машина - сви дагови леже уgitна нашем Гитлабу, а Гитлаб ЦИ дистрибуира ажурирања машинама приликом спајањаmaster.
Мало о цвету
Док нам радници млатарају цуцле, сетимо се још једног алата који нам нешто може показати – Цвета.
Прва страница са сажетим информацијама о радничким чворовима:

Најинтензивнија страница са задацима који су успели:

Најдосаднија страница са статусом нашег брокера:

Најсјајнија страница је са графиконима статуса задатака и временом њиховог извршавања:

Учитавамо недовољно оптерећење
Дакле, сви задаци су успели, можете да однесете рањене.

А рањених је било много – из ових или оних разлога. У случају правилне употребе Аирфлов-а, управо ови квадрати указују на то да подаци дефинитивно нису стигли.
Морате да погледате дневник и поново покренете пале инстанце задатака.
Кликом на било који квадрат, видећемо акције које су нам доступне:

Можете узети и очистити пале. То јест, заборављамо да је тамо нешто покварено, а исти задатак инстанце ће ићи у планер.

Јасно је да ово са мишем са свим црвеним квадратима није баш хумано - то није оно што очекујемо од Аирфлов-а. Наравно, имамо оружје за масовно уништење: Browse/Task Instances

Одаберимо све одједном и вратимо на нулу, кликните на тачну ставку:

Након чишћења наши таксији изгледају овако (већ чекају да их распореди распоред):

Везе, куке и друге варијабле
Време је да погледамо следећи ДАГ, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Да ли су сви икада урадили ажурирање извештаја? Ово је опет она: постоји списак извора одакле се добијају подаци; постоји листа где да се стави; не заборавите да затрубите када се све догодило или покварило (па, не ради се о нама, не).
Хајде да поново прођемо кроз фајл и погледамо нове нејасне ствари:
from commons.operators import TelegramBotSendMessage- ништа нас не спречава да направимо сопствене оператере, што смо искористили тако што смо направили мали омот за слање порука у Унблоцкед. (О овом оператеру ћемо више говорити у наставку);default_args={}- даг може дистрибуирати исте аргументе свим својим оператерима;to='{{ var.value.all_the_kings_men }}'- пољеtoнећемо имати хардкодиран, већ динамички генерисан користећи Јиња и променљиву са листом имејлова, коју сам пажљиво унеоAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— услов за покретање оператера. У нашем случају, писмо ће долетети до шефова само ако су све зависности решене успешно;tg_bot_conn_id='tg_main'- аргументиconn_idприхватите ИД-ове везе у којима креирамоAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- поруке у Телеграму ће одлетети само ако има палих задатака;task_concurrency=1- забрањујемо истовремено покретање више инстанци задатка једног задатка. У супротном, добићемо истовремено лансирање неколикоVerticaOperator(гледа у један сто);report_update >> [email, tg]- свеVerticaOperatorконвергирају у слању писама и порука, овако:

Али пошто оператери обавештавача имају различите услове покретања, само један ће радити. У приказу дрвета све изгледа мало мање визуелно:

Рећи ћу неколико речи о мацрос и њихови пријатељи - Променљиве.
Макрои су Јиња чувари места који могу заменити различите корисне информације у аргументима оператора. На пример, овако:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} прошириће се на садржај променљиве контекста execution_date у формату YYYY-MM-DD: 2020-07-14. Најбољи део је то што су променљиве контекста приковане за одређену инстанцу задатка (квадрат у приказу стабла), а када се поново покрене, чувари места ће се проширити на исте вредности.
Додељене вредности се могу видети помоћу дугмета Рендеред на свакој инстанци задатка. Овако је задатак са слањем писма:

И тако на задатку са слањем поруке:

Комплетна листа уграђених макроа за најновију доступну верзију доступна је овде:
Штавише, уз помоћ додатака можемо декларисати сопствене макрое, али то је друга прича.
Поред унапред дефинисаних ствари, можемо да заменимо вредности наших променљивих (ја сам то већ користио у коду изнад). Хајде да стварамо Admin/Variables пар ствари:

Све што можете да користите:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Вредност може бити скалар, или такође може бити ЈСОН. У случају ЈСОН-а:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}само користите путању до жељеног кључа: {{ var.json.bot_config.bot.token }}.
Буквално ћу рећи једну реч и показати један снимак екрана везе. Овде је све елементарно: на страници Admin/Connections креирамо везу, додајемо наше логин / лозинке и конкретније параметре тамо. Овако:

Лозинке се могу шифровати (темељније од подразумеваних) или можете изоставити тип везе (као што сам ја урадио за tg_main) - чињеница је да је листа типова уграђена у Аирфлов моделе и да се не може проширити без уласка у изворне кодове (ако одједном нисам нешто прогуглао, исправите ме), али ништа нас неће спречити да добијемо кредите само тако што ћете име.
Такође можете направити неколико веза са истим именом: у овом случају метод BaseHook.get_connection(), који нам даје везе по имену, даће насумично од неколико имењака (било би логичније направити Роунд Робин, али оставимо то на савести програмера Аирфлов-а).
Променљиве и везе су свакако кул алати, али је важно да не изгубите равнотежу: које делове својих токова чувате у самом коду, а које делове дајете Аирфлов-у на складиштење. С једне стране, брза промена вредности, на пример, поштанско сандуче, може бити згодна преко корисничког интерфејса. С друге стране, ово је још увек повратак на клик мишем, којег смо (ја) желели да се отарасимо.
Рад са везама је један од задатака куке. Уопштено говорећи, куке за Аирфлов су тачке за повезивање са услугама и библиотекама трећих страна. На пример, JiraHook ће нам отворити клијент за интеракцију са Јира (можете да померате задатке напред-назад), и уз помоћ SambaHook можете да гурнете локалну датотеку у smb-тачка.
Рашчлањивање прилагођеног оператора
И приближили смо се томе да погледамо како је направљен TelegramBotSendMessage
Код commons/operators.py са стварним оператером:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Овде, као и све остало у Аирфлов-у, све је врло једноставно:
- Наслеђено од
BaseOperator, који имплементира доста ствари специфичних за Аирфлов (погледајте у слободно време) - Декларисана поља
template_fields, у којем ће Јиња тражити макрое за обраду. - Сложио праве аргументе за
__init__(), подесите подразумеване вредности где је потребно. - Нисмо заборавили ни на иницијализацију претка.
- Отворио одговарајућу куку
TelegramBotHookод њега примио објекат клијента. - Замењени (редефинисани) метод
BaseOperator.execute(), који ће Аирфов трзати када дође време за покретање оператера - у њему ћемо спровести главну акцију, заборављајући да се пријавимо. (Успут, пријављујемо се одмахstdoutиstderr- Проток ваздуха ће све пресрести, лепо умотати, разложити где је потребно.)
Хајде да видимо шта имамо commons/hooks.py. Први део датотеке, са самом куком:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientНе знам ни шта да објасним овде, само ћу приметити важне тачке:
- Наслеђујемо, размислите о аргументима - у већини случајева то ће бити један:
conn_id; - Превазилажење стандардних метода: ограничио сам се
get_conn(), у којој добијам параметре везе по имену и само добијам одељакextra(ово је ЈСОН поље), у које сам (према сопственим упутствима!) ставио токен Телеграм бота:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Ја стварам инстанцу нашег
TelegramBot, дајући му одређени токен.
То је све. Можете добити клијента са куке користећи TelegramBotHook().clent или TelegramBotHook().get_conn().
И други део фајла, у коме правим микро-папер за Телеграм РЕСТ АПИ, да не бих вукао исти за једну методу sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Исправан начин је да се све сабере:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- у додатку, ставите у јавно спремиште и дајте га Опен Соурце-у.
Док смо све ово проучавали, ажурирања наших извештаја су успела да пропадну и да ми на каналу пошаље поруку о грешци. Идем да проверим да ли није у реду...

Нешто је пукло у нашем дужду! Зар то није оно што смо очекивали? Баш тако!
Хоћеш ли да сипаш?
Да ли осећаш да сам нешто пропустио? Изгледа да је обећао да ће пренети податке са СКЛ сервера на Вертику, а онда је узео и скренуо са теме, нитков!
Ово зверство је било намерно, једноставно сам морао да вам дешифрујем неку терминологију. Сада можете ићи даље.
Наш план је био следећи:
- До даг
- Генеришите задатке
- Видите како је све лепо
- Доделите бројеве сесија попуњавању
- Добијте податке са СКЛ Сервера
- Ставите податке у Вертица
- Прикупите статистику
Дакле, да бих све ово покренуо, направио сам мали додатак нашем docker-compose.yml:
доцкер-цомпосе.дб.имл
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyТамо подижемо:
- Вертица као домаћин
dwhса највећим подразумеваним подешавањима, - три инстанце СКЛ Сервера,
- попуњавамо базе података у последњем неким подацима (ни у ком случају не гледајте
mssql_init.py!)
Све добро покрећемо уз помоћ мало компликованије команде него прошли пут:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Оно што је наш чудесни рандомизер генерисао, можете користити ставку Data Profiling/Ad Hoc Query:

Главна ствар је да то не покажете аналитичарима
Појасни ЕТЛ сесије Нећу, тамо је све тривијално: направимо базу, у њој је знак, све умотамо у контекст менаџер, а сада радимо ово:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15сессион.пи
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passДошло је време прикупљамо наше податке са наших сто и по столова. Урадимо то уз помоћ врло непретенциозних линија:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Уз помоћ куке добијамо од Аирфлов-а
pymssql-цоннецт - Заменимо ограничење у облику датума у захтеву - то ће бити убачено у функцију од стране машине за шаблоне.
- Храњење нашег захтева
pandasко ће нас добитиDataFrame- биће нам од користи у будућности.
Користим замену
{dt}уместо параметра захтева%sне зато што сам зао Пинокио, већ зато штоpandasне могу да поднесуpymssqlи измиче последњиparams: Listиако заиста желиtuple.
Такође имајте на уму да програмерpymssqlодлучио да га више не подржава, и време је да пређемо наpyodbc.
Хајде да видимо чиме је Аирфлов напунио аргументе наших функција:

Ако нема података, онда нема сврхе да се наставља. Али такође је чудно сматрати пуњење успешним. Али ово није грешка. А-а-ах, шта да се ради?! А ево шта:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException каже Аирфлов-у да нема грешака, али ми прескачемо задатак. Интерфејс неће имати зелени или црвени квадрат, већ розе.
Хајде да бацимо наше податке више колона:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Наиме
- База података из које смо преузели наруџбине,
- ИД наше флоодинг сесије (биће другачији за сваки задатак),
- Хеш из извора и ИД поруџбине - тако да у коначној бази података (где се све сипа у једну табелу) имамо јединствени ИД поруџбине.
Остаје претпоследњи корак: сипајте све у Вертику. И, што је чудно, један од најспектакуларнијих и најефикаснијих начина да се то уради је преко ЦСВ-а!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Правимо посебан пријемник
StringIO. pandasљубазно ће ставити нашеDataFrameу обликуCSV-линије.- Отворимо везу са нашом омиљеном Вертицом помоћу куке.
- А сада уз помоћ
copy()пошаљите наше податке директно у Вертику!
Од возача ћемо узети колико је редова попуњено и рећи менаџеру сесије да је све у реду:
session.loaded_rows = cursor.rowcount
session.successful = TrueТо је све.
На распродаји ручно креирамо циљну плочу. Овде сам себи дозволио малу машину:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)користим
VerticaOperator()Правим шему базе података и табелу (ако већ не постоје, наравно). Главна ствар је да правилно уредите зависности:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadРезиме
- Па, - рече мали миш, - зар не
Јеси ли убеђен да сам ја најстрашнија животиња у шуми?
Џулија Доналдсон, Груфало
Мислим да кад бисмо моје колеге и ја имали такмичење: ко ће брзо креирати и покренути ЕТЛ процес од нуле: они са својим ССИС-ом и мишем а ја са Аирфлов-ом... А онда бисмо упоредили и лакоћу одржавања... Вау, мислим да ћете се сложити да ћу их победити на свим фронтовима!
Ако мало озбиљније, онда је Апацхе Аирфлов – описујући процесе у виду програмског кода – урадио мој посао много удобније и пријатније.
Његова неограничена проширивост, како у погледу додатака тако и предиспозиције за скалабилност, даје вам прилику да користите Аирфлов у готово свим областима: чак иу пуном циклусу прикупљања, припреме и обраде података, чак и при лансирању ракета (на Марс, од наравно).
Део завршни, референца и информација
Грабље које смо прикупили за вас
start_date. Да, ово је већ локални мем. Преко Даговог главног аргументаstart_dateсве додавање. Укратко, ако наведете уstart_dateактуелни датум, иschedule_interval- једног дана, онда ће ДАГ почети сутра не раније.start_date = datetime(2020, 7, 7, 0, 1, 2)И нема више проблема.
Постоји још једна грешка током извршавања која је повезана са њим:
Task is missing the start_date parameter, што најчешће указује на то да сте заборавили да се повежете са даг оператором.- Све на једној машини. Да, и базе (сам Аирфлов и наш премаз), и веб сервер, и планер, и радници. И чак је успело. Али временом је број задатака за услуге растао, и када је ПостгреСКЛ почео да одговара на индекс за 20 с уместо за 5 мс, ми смо га узели и однели.
- ЛоцалЕкецутор. Да, још седимо на њему, а већ смо дошли до ивице провалије. ЛоцалЕкецутор нам је до сада био довољан, али сада је време да се проширимо са најмање једним радником, а ми ћемо морати да се потрудимо да пређемо на ЦелериЕкецутор. А с обзиром на то да са њим можете да радите на једној машини, ништа вас не спречава да користите Целери чак ни на серверу, који „наравно, никада неће ући у производњу, искрено!“
- Неупотреба уграђени алати:
- veze за чување акредитива услуге,
- СЛА Миссес да одговори на задатке који нису успели на време,
- КСЦом за размену метаподатака (рекао сам metaподатака!) између даг задатака.
- Злоупотреба поште. Па, шта да кажем? Постављена су упозорења за сва понављања палих задатака. Сада мој Гмаил на послу има >90 е-порука од Аирфлов-а, а њушка веб поште одбија да покупи и избрише више од 100 истовремено.
Више замки:
Више алата за аутоматизацију
Да бисмо још више радили главом, а не рукама, Аирфлов нам је припремио ово:
- - и даље има статус Огледног, што га не спречава да ради. Помоћу њега не можете само да добијете информације о даговима и задацима, већ и да зауставите/покренете даг, креирате ДАГ Рун или скуп.
- - многи алати су доступни преко командне линије који нису само незгодни за коришћење преко ВебУИ, већ су генерално одсутни. На пример:
backfillпотребно за поновно покретање инстанци задатка.
На пример, дошли су аналитичари и рекли: „А ти, друже, имаш глупости у подацима од 1. до 13. јануара! Поправи, поправи, поправи, поправи!" А ти си таква плоча:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Основна услуга:
initdb,resetdb,upgradedb,checkdb. run, што вам омогућава да покренете задатак једне инстанце, па чак и да оцените све зависности. Штавише, можете га покренути прекоLocalExecutor, чак и ако имате кластер целера.- Ради отприлике исту ствар
test, само иу базама не пише ништа. connectionsомогућава масовно стварање веза из љуске.
- - прилично хардкор начин интеракције, који је намењен додацима, а не ројење у њему малим рукама. Али ко ће нас спречити да идемо
/home/airflow/dags, трцатиipythonи почети да се зезаш? Можете, на пример, да извезете све везе са следећим кодом:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Повезивање са базом метаподатака Аирфлов. Не препоручујем да пишете на њега, али добијање стања задатака за различите специфичне метрике може бити много брже и лакше него преко било ког АПИ-ја.
Рецимо да нису сви наши задаци идемпотентни, али понекад могу пасти, и то је нормално. Али неколико блокада је већ сумњиво и било би неопходно проверити.
Чувајте се СКЛ!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
референце
И наравно, првих десет линкова из издавања Гугла је садржај фолдера Аирфлов из мојих обележивача.
- - наравно, морамо почети од канцеларије. документацију, али ко чита упутства?
- - Па прочитајте бар препоруке креатора.
- - сам почетак: кориснички интерфејс у сликама
- - основни појмови су добро описани, ако (одједном!) нешто од мене нисте разумели.
- - кратак водич за подешавање Аирфлов кластера.
- - скоро исти занимљив чланак, осим можда више формализма и мање примера.
- — о раду у сарадњи са Целери.
- - о идемпотентности задатака, учитавању по ИД-у уместо датуму, трансформацији, структури фајла и другим занимљивостима.
- - зависности задатака и Триггер Руле, које сам поменуо само успутно.
- - како превазићи неке "радове како је предвиђено" у планеру, учитати изгубљене податке и одредити приоритет задатака.
- — корисни СКЛ упити за метаподатке Аирфлов-а.
- - постоји користан одељак о креирању прилагођеног сензора.
- — занимљива кратка белешка о изградњи инфраструктуре на АВС-у за науку о подацима.
- - честе грешке (када неко и даље не чита упутства).
- - насмејте се како људи чувају лозинке, иако можете једноставно користити Цоннецтионс.
- - имплицитно ДАГ прослеђивање, убацивање контекста у функције, опет о зависностима, као и о прескакању покретања задатака.
- - о употреби
default argumentsиparamsу шаблонима, као и променљиве и везе. - - прича о томе како се планер припрема за Аирфлов 2.0.
- - мало застарели чланак о постављању нашег кластера
docker-compose. - - динамички задаци који користе шаблоне и прослеђивање контекста.
- — стандардна и прилагођена обавештења путем поште и Слацк-а.
- - Задаци гранања, макрои и КСЦом.
И линкови коришћени у чланку:
- - чувари места доступни за употребу у шаблонима.
- — Уобичајене грешке при стварању дагова.
- -
docker-composeза експериментисање, отклањање грешака и још много тога. - — Питхон омотач за Телеграм РЕСТ АПИ.
Извор: ввв.хабр.цом




