Прывітанне, я Зміцер Лагвіненка – Data Engineer аддзела аналітыкі групы кампаній «Вязе».
Я раскажу вам пра выдатную прыладу для распрацоўкі ETL-працэсаў — Apache Airflow. Але Airflow настолькі ўніверсальны і шматгранны, што вам варта дагледзецца да яго нават калі вы не займаецеся струменямі дадзеных, а маеце запатрабаванне перыядычна запускаць якія-небудзь працэсы і сачыць за іх выкананнем.
І так, я буду не толькі расказваць, але і паказваць: у праграме шмат кода, скрыншотаў і рэкамендацый.

Што звычайна бачыш, калі гугліш слова Airflow / Wikimedia Commons
Змест
Увядзенне
Apache Airflow – ён прама як Django:
- напісаны на Python,
- ёсць выдатная адмінка,
- неабмежавана пашыраем,
— толькі лепш, ды і зроблены зусім для іншых мэт, а менавіта (як напісана да ката):
- запуск і маніторынг задач на неабмежаванай колькасці машын (колькі вам дазволіць Celery/Kubernetes і ваша сумленне)
- з дынамічнай генерацыяй workflow з вельмі лёгкага для напісання і ўспрыманні Python-кода
- і магчымасцю звязваць сябар з сябра любыя базы дадзеных і API з дапамогай як гатовых кампанентаў, так і самаробных плагінаў (што робіцца надзвычай проста).
Мы выкарыстоўваем Apache Airflow так:
- збіраем дадзеныя з розных крыніц (мноства інстансаў SQL Server і PostgreSQL, розныя API з метрыкамі прыкладанняў, нават 1С) у DWH і ODS (у нас гэта Vertica і Clickhouse).
- як прасунуты
cron, які запускае працэсы кансалідацыі дадзеных на ODS, а таксама сочыць за іх абслугоўваннем.
Да нядаўняга часу нашы запатрабаванні пакрываў адзін невялікі сервер на 32 ядрах і 50 GB аператыўкі. У Airflow пры гэтым працуе:
- больш 200 дагоў (Уласна workflows, у якія мы набілі задачы),
- у кожным у сярэднім па 70 цягак,
- запускаецца гэта дабро (таксама ў сярэднім) раз у гадзіну.
А пра тое, як мы пашыраліся, я напішу ніжэй, а зараз давайце вызначым über-задачу, якую мы будзем вырашаць:
Ёсць тры зыходных SQL Server'а, на кожным па 50 баз дадзеных - інстансаў аднаго праекта, адпаведна, структура ў іх аднолькавая (амаль усюды, муа-ха-ха), а значыць у кожнай ёсць табліца Orders (балазе табліцу з такой назвай можна заштурхаць у любы бізнэс). Мы забіраем дадзеныя, дадаючы службовыя палі (сервер-крыніца, база-крыніца, ідэнтыфікатар ETL-задачы) і наіўнай выявай кінем іх у, скажам, Vertica.
Паехалі!
Частка асноўная, практычная (і крыху тэарэтычная)
Навошта яно нам (і вам)
Калі дрэвы былі вялікімі, а я быў простым SQL-шчыкам у адным расійскім рытэйле, мы шпарылі ETL-працэсы aka струмені дадзеных з дапамогай двух даступных нам сродкаў:
- Informatica Power Center - вельмі раскідзістая сістэма, надзвычай прадукцыйная, са сваімі жалязякамі, уласным версіяваннем. Выкарыстоўваў я дай бог 1% яе магчымасцяў. Чаму? Ну, па-першае, гэты інтэрфейс недзе з нулявых псіхічна ціснуў на нас. Па-другое, гэтая штуковіна заменчаная пад надзвычай наварочаныя працэсы, лютае перавыкарыстанне кампанентаў і іншыя вельмі-важныя-энтэрпрайз-фішачкі. Пра тое што варта яна, як крыло Airbus A380/год, мы прамаўчым.
Асцярожна, скрыншот можа зрабіць людзям малодшай 30 крыху балюча

- SQL Server Integration Server — гэтым таварышам мы карысталіся ў сваіх унутрыпраектных патоках. Ну а на самой справе: SQL Server мы ўжо выкарыстоўваем, і не карыстацца яго ETL-тулзы было б неяк неразумна. Усё ў ім добра: і інтэрфейс прыгожы, і справаздачы выканання… Але не за гэта мы любім праграмныя прадукты, ох не за гэта. Версіянаваць яго
dtsx(які ўяўляе сабой XML з якія змешваюцца пры захаванні нодамі) мы можам, а толку? А зрабіць пакет цягоў, які перацягне сотню табліц з аднаго сервера на іншы? Ды што сотню, у вас ад дваццаці штук адваліцца паказальны палец, які пстрыкае па мышынай кнопцы. Але выглядае ён, вызначана, больш модна:
Мы безумоўна шукалі выхады. Справа нават амаль дайшло да самапіснага генератара SSIS-пакетаў…
… а потым мяне знайшла новая праца. А на ёй мяне нагнаў Apache Airflow.
Калі я даведаўся, што апісанні ETL-працэсаў - гэта просты Python-код, я толькі што не скакаў ад радасці. Вось так плыні дадзеных падвергліся версіянаванню і дыфу, а ссыпаць табліцы з адзінай структурай з сотні баз дадзеных у адзін таргет стала справай Python-кода ў паўтара-два 13” экрана.
Збіраны кластар
Давайце не ўладкоўваць зусім ужо дзіцячы сад, і не казаць тут пра зусім відавочныя рэчы, накшталт усталёўкі Airflow, абранай вамі БД, Celery і іншых спраў, апісаных у доках.
Каб мы маглі адразу прыступіць да эксперыментаў, я накідаў docker-compose.yml у якім:
- Падымем уласна Паветраны паток: Scheduler, Webserver. Там жа будзе круціцца Flower для маніторынгу Celery-задач (таму што яго ўжо заштурхалі ў
apache/airflow:1.10.10-python3.7, а мы і не супраць); - PostgreSQL, у які Airflow будзе пісаць сваю службовую інфармацыю (дадзеныя планавальніка, статыстыка выканання і т. д.), а Celery – адзначаць завершаныя цягі;
- Redis, які будзе выступаць брокерам задач для Celery;
- Celery worker, які і зоймецца непасрэдным выкананнем задачак.
- У тэчку
./dagsмы будзем складаць нашы файлы з апісаннем дагоў. Яны будуць падхапляцца на лета, таму ператоргваць увесь стэк пасля кожнага чыха не трэба.
Дзе-нідзе код у прыкладах прыведзены не цалкам (каб не загрувашчваць тэкст), а дзесьці ён мадыфікуецца ў працэсе. Суцэльныя працавальныя прыклады кода можна паглядзець у рэпазітары .
докер-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerЗаўвагі:
- У зборцы кампозу я шмат у чым абапіраўся на вядомую выяву - абавязкова паглядзіце. Можа вам у жыцці больш нічога і не спатрэбіцца.
- Усе наладкі Airflow даступныя не толькі праз
airflow.cfg, Але і праз зменныя асяроддзі (слава распрацоўнікам), чым я злосна скарыстаўся. - Натуральна, ён не production-ready: я наўмысна не ставіў heartbeats на кантэйнеры, не затлумляўся з бяспекай. Але мінімум, прыдатны для нашых эксперыментыкаў я зрабіў.
- Звярніце ўвагу, што:
- Тэчка з дагамі павінна быць даступная як планавальніку, так і воркерам.
- Тое ж самае тычыцца і ўсіх іншых бібліятэк - яны ўсе павінны быць устаноўлены на машыны з шэдулерам і воркерамі.
Ну а зараз проста:
$ docker-compose up --scale worker=3Пасля таго, як усё паднімецца, можна глядзець на вэб-інтэрфейсы:
- Паток паветра:
- Кветка:
асноўныя паняцці
Калі вы нічога не зразумелі ва ўсіх гэтых "дагах", то вось кароткі слоўнічак:
- Планавальнік – самы галоўны дзядзька ў Airflow, які кантралюе, каб укалывалі робаты, а не чалавек: сочыць за раскладам, абнаўляе дагі, запускае цягі.
Наогул, у старых версіях, у яго былі праблемы з памяццю (не, не амнезія, а ўцечкі) і ў канфігах нават застаўся легасі-параметр
run_duration- Інтэрвал яго перазапуску. Але зараз усё добра. - ДЗЕНЬ (ён жа "даг") - "накіраваны ацыклічны граф", але такое вызначэнне мала каму што скажа, а па сутнасці гэта кантэйнер для ўзаемадзейнічаюць адзін з адным цягачаў (гл. ніжэй) або аналаг Package ў SSIS і Workflow ў Informatica.
Апроч дагаў яшчэ могуць быць сабдагі, але мы да іх хутчэй за ўсё не дабяромся.
- DAG Run - ініцыялізаваны даг, якому прысвоены свой
execution_date. Даграны аднаго дага могуць суцэль працаваць раўналежна (калі вы, вядома, зрабілі свае цягі ідэмпатэнтнымі). - аператар - Гэта кавалачкі кода, адказныя за выкананне якога-небудзь канкрэтнага дзеяння. Ёсць тры тыпу аператараў:
- дзеянне, як напрыклад наш каханы
PythonOperator, які ў сілах выканаць любы (валідны) Python-код; - пераклад, якія перавозяць дадзеныя з месца на месца, скажам,
MsSqlToHiveTransfer; - датчык ж дазволіць рэагаваць або прытармазіць далейшае выкананне дага да наступлення якой-небудзь падзеі.
HttpSensorможа тузаць паказаны эндпойнт, і калі дачакаецца патрэбны адказ, запусціць трансферGoogleCloudStorageToS3Operator. Дапытлівы розум спытае: «навошта? Бо можна рабіць паўторы прама ў аператары!» А затым, каб не забіваць пул цягоў падвіслым аператарамі. Сэнсар запускаецца, правярае і памірае да наступнай спробы.
- дзеянне, як напрыклад наш каханы
- Задача - Абвешчаныя аператары па-за залежнасці ад тыпу і прымацаваныя да дагу павышаюцца да чыну цяга.
- Task instance - калі генерал-планавальнік вырашыў, што цягі сітавіна адпраўляць у бой на выканаўцы-воркеры (прама на месцы, калі мы выкарыстаем
LocalExecutorці на выдаленую ноду ў выпадку зCeleryExecutor), ён прызначае ім кантэкст (т. е. камплект зменных - параметраў выканання), разгортвае шаблоны каманд або запытаў і складае іх у пул.
Генеруем цягі
Перш абазначым агульную схему нашага дага, а затым будзем усё больш і больш апускацца ў дэталі, таму што мы ўжываем некаторыя нетрывіяльныя рашэнні.
Такім чынам, у найпростым выглядзе падобны даг будзе выглядаць так:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Давайце разбірацца:
- Спачатку імпартуем патрэбныя лісы і сёе-тое яшчэ;
sql_server_ds- ГэтаList[namedtuple[str, str]]з імёнамі канэктаў з Airflow Connections і базамі дадзеных з якіх мы будзем забіраць нашу таблічку;dag- Аб'ява нашага дага, якое абавязкова павінна ляжаць уglobals(), інакш Airflow яго не знойдзе. Дагу таксама трэба сказаць:- што яго клічуць
orders– гэтае імя потым будзе маячыць у вэб-інтэрфейсе, - што працаваць ён будзе, пачынаючы з паўночы восьмага ліпеня,
- а запускаць ён павінен, прыкладна кожныя 6 гадзін (для крутых хлопцаў тут замест
timedelta()дапушчальнаяcron-радок0 0 0/6 ? * * *, для менш крутых - выраз накшталт@daily);
- што яго клічуць
workflow()будзе рабіць асноўную працу, але не зараз. Цяпер мы проста высыпем наш кантэкст у лог.- А зараз простая магія стварэння цягал:
- прабягаем па нашых крыніцах;
- ініцыялізуем
PythonOperator, які будзе выконваць нашу пустышкуworkflow(). Не забывайце ўказваць унікальнае (у рамках дага) імя цяга і падвязваць сам даг. Сцягprovide_contextу сваю чаргу насыпець у функцыю дадатковых аргументаў, якія мы беражліва збяром з дапамогай**context.
Пакуль на гэтым усё. Што мы атрымалі:
- новы даг у вэб-інтэрфейсе,
- паўтары сотні цягінаў, якія будуць выконвацца раўналежна (калі то дазволяць налады Airflow, Celery і магутнасці сервераў).
Ну амаль атрымалі.

Залежнасці хто будзе ставіць?
Каб усю гэтую справу спрасціць я ўкарачыў у docker-compose.yml апрацоўку requirements.txt на ўсіх нодах.
Вось зараз панеслася:

Шэрыя квадрацікі - task instances, апрацаваныя планавальнікам.
Трохі чакаем, задачы расхопліваюць воркеры:

Зялёныя, зразумелая справа, - паспяхова адпрацавалі. Чырвоныя - не вельмі паспяхова.
Дарэчы, на нашым продзе ніякай тэчкі
./dags, якая сінхранізуецца паміж машынамі няма - усе дагі ляжаць уgitна нашым Gitlab, а Gitlab CI раскладвае абнаўлення на машыны пры мэрджы ўmaster.
Трохі аб Flower
Пакуль воркеры малоцяць нашы тасачкі-пустышкі, успомнім пра яшчэ адзін інструмент, які можа нам сёе-тое паказаць – Flower.
Самая першая старонка з сумарнай інфармацыяй па нодам-воркерам:

Самая насычаная старонка з задачамі, якія адправіліся ў працу:

Самая сумная старонка са станам нашага брокера:

Самая яркая старонка - з графікамі стану цягліц і іх часам выканання:

Дагружаем недагружанае
Такім чынам, усе цягі адпрацавалі, можна выносіць параненых.

А параненых аказалася нямала — па тых ці іншых прычынах. У выпадку правільнага выкарыстання Airflow вось гэтыя самыя квадраты кажуць аб тым, што дадзеныя дакладна не даехалі.
Трэба глядзець лог і перазапускаць падалі task instances.
Кінуўшы на любы квадрат, убачым даступныя нам дзеянні:

Можна ўзяць, і зрабіць Clear які ўпаў. Гэта значыць, мы забываемся пра тое, што там нешта завалілася, і той жа самы інстанс цяга сыдзе планавальніку.

Зразумела, што рабіць так мышкай з усімі чырвонымі квадратамі не вельмі гуманна не гэтага мы чакаем ад Airflow. Натуральна, у нас ёсць зброя масавай паразы: Browse/Task Instances

Выберам усё зараз і абнулім націснем правільны пункт:

Пасля ачысткі нашы таксі выглядаюць так (яны ўжо чакаюць не дачакаюцца, калі шэдулер іх заплануе):

Злучэнні, хукі і іншыя зменныя
Самы час паглядзець на наступны DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Усё ж калі-небудзь рабілі абнаўлялку справаздач? Гэта зноў яна: ёсць спіс крыніц, адкуль забраць дадзеныя; ёсць спіс, куды пакласці; не забываем пасігналіць, калі ўсё здарылася ці зламалася (ну гэта не пра нас, не).
Давайце зноў пройдземся па файле і паглядзім на новыя незразумелыя штукі:
from commons.operators import TelegramBotSendMessage- нам нішто не перашкаджае рабіць свае аператары, чым мы і скарысталіся, зрабіўшы невялікую абгортачку для адпраўкі паведамленняў у Разблакаваны. (Пра гэтага аператара мы яшчэ пагаворым ніжэй);default_args={}- Даг можа раздаваць адны і тыя ж аргументы ўсім сваім аператарам;to='{{ var.value.all_the_kings_men }}'- полеtoу нас будзе не захардскураным, а фармаваным дынамічна з дапамогай Jinja і зменнай са спісам email-ов, якую я клапатліва паклаў уAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- Умова запуску аператара. У нашым выпадку, ліст паляціць босам толькі калі ўсе залежнасці адпрацавалі. паспяхова;tg_bot_conn_id='tg_main'- аргументыconn_idпрымаюць у сябе ідэнтыфікатары злучэнняў, якія мы ствараем уAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- паведамленні ў Telegram паляцяць толькі пры наяўнасці ўпалых цягліц;task_concurrency=1- забараняем адначасовы запуск некалькіх task instances аднаго цяга. У адваротным выпадку, мы атрымаем адначасовы запуск некалькіхVerticaOperator(якія глядзяць на адну табліцу);report_update >> [email, tg]- усёVerticaOperatorсыдуцца ў адпраўцы ліста і паведамленні, вось так:

Але паколькі ў аператараў-натыфікатараў стаяць розныя ўмовы запуску, працаваць будзе толькі адзін. У Tree View усё выглядае некалькі меней навочна:

Скажу пару слоў аб макрасах і іх сябрах зменных.
Макрасы – гэта Jinja-плейсхолдэры, якія могуць падстаўляць розную карысную інфармацыю ў аргументы аператараў. Напрыклад, так:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} разгорнецца ў змесціва зменнай кантэксту execution_date у фармаце YYYY-MM-DD: 2020-07-14. Самае прыемнае, што зменныя кантэксты прыбіваюцца цвікамі да вызначанага інстансу цяга (квадратыку ў Tree View), і пры перазапуску плейсхолдэры расчыняцца ў тыя ж самыя значэнні.
Прысвоеныя значэнні можна глядзець з дапамогай кнопкі Rendered на кожным таск-інстансе. Вось так у цяга з адпраўкай ліста:

А так у цягі з адпраўкай паведамлення:

Поўны спіс убудаваных макрасаў для апошняй даступнай версіі даступны тут:
Больш за тое, з дапамогай плагінаў, мы можам аб'яўляць уласныя макрасы, але гэта ўжо зусім іншая гісторыя.
Апроч наканаваных штук, мы можам падстаўляць значэнні сваіх зменных (вышэй у кодзе я ўжо гэтым скарыстаўся). Створым у Admin/Variables пару штук:

Усё, можна карыстацца:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')У значэнні можа быць скаляр, а можа ляжаць і JSON. У выпадку JSON-а:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}проста выкарыстоўваем шлях да патрэбнага ключа: {{ var.json.bot_config.bot.token }}.
Скажу літаральна адно слова і пакажу адзін скрыншот пра злучэння. Тут усё элементарна: на старонцы Admin/Connections ствараем злучэнне, складаем туды нашы лагіны/паролі і больш спецыфічныя параметры. Вось так:

Паролі можна шыфраваць (больш старанна, чым у варыянце па змаўчанні), а можна не паказваць тып злучэння (як я зрабіў для tg_main) - справа ў тым, што спіс тыпаў зашыты ў мадэлях Airflow і пашырэнню без залажэння ў зыходнікі не паддаецца (калі раптам я чагосьці не дагугліў - прашу мяне паправіць), але атрымаць крэды проста па імені нам нішто не перашкодзіць.
А яшчэ можна зрабіць некалькі злучэнняў з адным імем: у такім разе метад BaseHook.get_connection(), які дастае нам злучэння па імені, будзе аддаваць выпадковага з некалькіх цёзак (было б лагічна зрабіць Round Robin, але пакінем гэта на сумленні распрацоўнікаў Airflow).
Variables і Connections, безумоўна, класныя сродкі, але важна не страціць баланс: якія часткі вашых патокаў вы захоўваеце ўласна ў кодзе, а якія - аддаяце на захоўванне Airflow. З аднаго боку хутка памяняць значэнне, напрыклад, скрыню рассылання, можа быць зручна праз UI. А з другога - гэта ўсё-ткі зварот да мышакліку, ад якога мы (я) хацелі пазбавіцца.
Праца са злучэннямі - гэта адна з задач хукаў. Наогул хукі Airflow - гэта кропкі падлучэння яго да іншых сэрвісаў і бібліятэкам. Напрыклад, JiraHook адкрые для нас кліент для ўзаемадзеяння з Jira (можна задачкі падштурхоўваць туды-сюды), а з дапамогай SambaHook можна запушыць лакальны файл на smb-кропку.
Разбіраны кастамны аператар
І мы ўшчыльную падабраліся да таго, каб паглядзець на тое, як зроблены TelegramBotSendMessage
Код commons/operators.py з уласна аператарам:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Тут, як і астатняе ў Airflow, усё вельмі проста:
- Адспадкаваліся ад
BaseOperator, Які рэалізуе даволі шмат Airflow-спецыфічных штук (паглядзіце на вольным часе) - Абвясцілі палі
template_fields, у якіх Jinja будзе шукаць макрасы для апрацоўкі. - Арганізавалі правільныя аргументы для
__init__(), расставілі змаўчанні, дзе трэба. - Аб ініцыялізацыі продка таксама не забыліся.
- Адкрылі адпаведны хук
TelegramBotHook, атрымалі ад яго аб'ект-кліент - Аверрайднулі (перавызначылі) метад
BaseOperator.execute(), які Airfow будзе паторгваць, калі наступіць час запускаць аператар – у ім мы і рэалізуем асноўнае дзеянне, на забыўшыся залагавацца. (Лагуем, дарэчы, прама ўstdoutиstderr- Airflow усё перахопіць, хораша абгарне, раскладзе, куды трэба.)
Давайце глядзець, што ў нас у commons/hooks.py. Першая частка файліка, з самім хукам:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientЯ нават не ведаю, што тут можна тлумачыць, проста адзначу важныя моманты:
- Успадкоўваемся, думаем над аргументамі - у большасці выпадкаў ён будзе адзін:
conn_id; - Перавызначаем стандартныя метады: я абмежаваўся
get_conn(), у якім я атрымліваю параметры злучэння па імені і ўсяго толькі дастаю секцыюextra(гэтае поле для JSON), у якую я (па сваёй жа інструкцыі!) паклаў токен Telegram-бота:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Ствараю асобнік нашага
TelegramBot, аддаючы яму ўжо канкрэтны токен.
Вось і ўсё. Атрымаць кліент з хука можна з дапамогай TelegramBotHook().clent або TelegramBotHook().get_conn().
І другая частка файліка, у якім я зрабіць мікраабгортачку для Telegram REST API, каб не цягнуць той жа дзеля аднаго метаду sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Правільны шлях - скласці ўсё гэта:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- у плягін, пакласці ў агульнадаступны рэпазітар, і аддаць у Open Source.
Пакуль мы ўсё гэта вывучалі, нашы абнаўленні справаздач паспелі паспяхова заваліцца і адправіць мне ў канал паведамленне аб памылцы. Пайду правяраць, што зноў не так…

У нашым дазе нешта зламалася! А ці не гэтага мы чакалі? Менавіта!
Наліваць-то будзеш?
Адчуваеце, нешта я прапусціў? Як бы абяцаў дадзеныя з SQL Server у Vertica пераліваць, і тут узяў і з'ехаў з тэмы, нягоднік!
Злачынства гэта было наўмысным, я проста абавязаны быў расшыфраваць вам сякую-такую тэрміналогію. Цяпер можна ехаць далей.
План у нас быў такі:
- Зрабіць даг
- Нагенераваць цягі
- Паглядзець, як усё прыгожа
- Прысвойваць заліўкам нумара сесій
- Забраць дадзеныя з SQL Server
- Пакласці дадзеныя ў Vertica
- Сабраць статыстыку
Такім чынам, каб усё гэта запусціць, я зрабіў маленькі дадатак да нашага docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyТам мы падымаем:
- Vertica як хост
dwhз самымі дэфолтнымі наладамі, - тры асобнікі SQL Server,
- напаўняем базы ў апошніх некаторымі дадзенымі (ні ў якім разе не зазірайце ў
mssql_init.py!)
Запускаем усё дабро з дапамогай крыху больш складанай, чым у мінулы раз, каманды:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Што нагенераваў наш цударандамайзер, можна, скарыстаўшыся пунктам Data Profiling/Ad Hoc Query:

Галоўнае, не паказваць гэта аналітыкам
Падрабязна спыняцца на ETL-сесіях я не буду, там усё трывіяльна: які робіцца базу, у ёй таблічку, абарачаем усё мэнэджарам кантэксту, і зараз які робіцца так:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passНадышоў час забраць нашы дадзеныя з нашых паўтары сотняў табліц. Зробім гэта з дапамогай вельмі немудрагелістых радкоў:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- З дапамогай хука атрымаем з Airflow
pymssql-канэкт - У запыт падставім абмежаванне ў выглядзе даты - у функцыю яе падкіне шаблонизатор.
- Скормліваем наш запыт
pandas, які дастане для насDataFrame- Ён нам спатрэбіцца ў далейшым.
Я выкарыстоўваю падстаноўку
{dt}замест параметра запыту%sне таму, што я злосны Бураціна, а таму штоpandasне можа справіцца зpymssqlі падсоўвае апошнямуparams: List, хоць той вельмі хочаtuple.
Таксама звярніце ўвагу, што распрацоўшчыкpymssqlвырашыў больш яго не падтрымліваць, і самы час з'ехаць наpyodbc.
Паглядзім, чым Airflow нашпігаваў аргументы нашых функцый:

Калі дадзеных не аказалася, то працягваць сэнсу няма. Але лічыць заліванне паспяховай таксама дзіўна. Але гэта і не памылка. А-а-а, што рабіць?! А вось што:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException скажа Airflow, што памылкі, уласна няма, а таск мы прапускаем. У інтэрфейсе будзе не зялёны і не чырвоны квадрацік, а колеры pink.
Падкінем нашым дадзеным некалькі калонак:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])А менавіта:
- БД, з якой мы забралі замовы,
- Ідэнтыфікатар нашай заліваючай сесіі (яна будзе рознай на кожны таск),
- Хэш ад крыніцы і ідэнтыфікатара замовы - каб у канчатковай базе (дзе ўсё ссыпецца ў адну табліцу) у нас быў унікальны ідэнтыфікатар замовы.
Застаўся перадапошні крок: заліць усё ў Vertica. А, як ні дзіўна, адзін з самых эфектных эфектыўных спосабаў зрабіць гэта - праз CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Мы робім спецпрыёмнік
StringIO. pandasласкава складзе ў яго нашDataFrameу выглядзеCSV-радок.- Адкрыем злучэнне да нашай каханай Vertica хукам.
- А зараз з дапамогай
copy()адправім нашы дадзеныя прама ў Вертык!
З драйвера забярэм, колькі радкоў засыпалася, і скажам мэнэджару сесіі, што ўсё ОК:
session.loaded_rows = cursor.rowcount
session.successful = TrueВось і ўсё.
На продзе мы ствараем мэтавую таблічку ўручную. Тут я дазволіў сабе невялікі аўтамат:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Я з дапамогай
VerticaOperator()ствараю схему БД і табліцу (калі іх яшчэ няма, натуральна). Галоўнае, правільна расставіць залежнасці:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadПадводзім вынікі
- Ну вось, - сказала мышаня, - ці не праўда, цяпер
Ты пераканаўся, што ў лесе я самы страшны звер?
Джулія Дональдсан, «Груфала»
Думаю, калі б мы з маімі калегамі зладзілі спаборніцтва: хто хутчэй складзе і запусціць з нуля ETL-працэс: яны са сваімі SSIS і мышкай і я з Airflow… А потым бы мы яшчэ параўналі выгоду суправаджэння… Ух, думаю, вы пагадзіцеся, што я абыду іх па ўсіх франтах!
Калі ж крыху больш сур'ёзны, то Apache Airflow – за кошт апісання працэсаў у выглядзе праграмнага кода – зрабіў маю працу значна зручней і прыемней.
Яго ж неабмежаваная пашыральнасць: як у плане плагінаў, так і схільнасць да маштабаванасці – дае вам магчымасць ужываць Airflow практычна ў любой вобласці: хоць у поўным цыкле збору, падрыхтоўкі і апрацоўкі дадзеных, хоць у запуску ракет (на Марс, вядома ж).
Частка заключная, даведачна-інфармацыйная
Граблі, якія мы сабралі за вас
start_date. Так, гэта ўжо лакальны мемасік. Праз галоўны аргумент дагаstart_dateпраходзяць усе. Коратка, калі паказаць уstart_dateбягучую дату, а ўschedule_interval- Адзін дзень, то DAG запусціцца заўтра не раней.start_date = datetime(2020, 7, 7, 0, 1, 2)І больш ніякіх праблем.
З ім жа звязана і яшчэ адна памылка выканання:
Task is missing the start_date parameter, Якая часцей за ўсё кажа аб тым, што вы забыліся прывязаць да аператара даг.- Усё на адной машыне. Так, і базы (самога Airflow і нашай абмазкі), і вэб-сервер, і планавальнік, і воркеры. І яно нават працавала. Але з часам колькасць задач у сэрвісаў расла, і калі PostgreSQL стаў аддаваць адказ па азначніку за 20 з замест 5 мс, мы яго ўзялі і панеслі.
- LocalExecutor. Так, мы сядзім на ім да гэтага часу, і мы ўжо падышлі да краю прорвы. LocalExecutor'а нам да гэтага часу хапала, але цяпер прыйшла пара пашырыцца мінімум адным воркерам, і прыйдзецца паднапружыцца, каб пераехаць на CeleryExecutor. А з прычыны таго, што з ім можна працаваць і на адной машынай, то нічога не спыняе ад выкарыстання Celery нават не серверы, які "натуральна, ніколі не пайдзе ў прод, чеслово!"
- Невыкарыстанне убудаваных сродкаў:
- Сувязі для захоўвання уліковых дадзеных сэрвісаў,
- SLA Misses для рэагавання на цягі, якія не адпрацавалі своечасова,
- XCom для абмену метададзенымі (я сказаў мэтададзенымі!) паміж цягамі дага.
- Злоўжыванне поштай. Ну што тут сказаць? Былі настроены абвесткі на ўсе паўторы паваленых цягал. Зараз у маім працоўным Gmail >90k лістоў ад Airflow, і вэб-морда пошты адмаўляецца браць і выдаляць больш за па 100 штук за раз.
Больш падводных камянёў:
Сродкі яшчэ большай аўтаматызацыі
Для таго каб нам яшчэ больш працаваць галавой, а не рукамі, Airflow нарыхтавала для нас вось што:
- - Ён да гэтага часу мае статус Experimental, што не перашкаджае яму працаваць. З яго дапамогай можна не толькі атрымліваць інфармацыю аб дагах і цягах, але спыніць/запусціць даг, стварыць DAG Run ці пул.
- - праз камандны радок даступныя многія сродкі, якія не проста нязручныя ў звароце праз WebUI, а наогул адсутнічаюць. Напрыклад:
backfillпатрэбен для паўторнага запуску інстансаў цягак.
Напрыклад, дашлі аналітыкі, кажуць: «А ў вас, таварыш, глупства ў дадзеных з 1 па 13 студзеня! Чыні-чыні-чыні-чыні!». А ты такі хоба:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Абслугоўванне базы:
initdb,resetdb,upgradedb,checkdb. run, які дазваляе запусціць адзін інстанс цяга, ды яшчэ і забіць на ўсё залежнасці. Больш за тое, можна запусціць яго празLocalExecutor, нават калі ў вас Celery-кластар.- Прыкладна тое ж самае робіць
test, толькі яшчэ і ў баз нічога не піша. connectionsдазваляе масава ствараць падлучэнні з шелла.
- - Даволі хардкорны спосаб узаемадзеяння, які прызначаны для плагінаў, а не кешкання ў ім ручанькамі. Але хто ж нам перашкодзіць пайсці ў
/home/airflow/dags, запусціцьipythonі пачаць бязмежжа? Можна, напрыклад, экспартаваць усе падлучэнні такім кодам:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Падключэнне да базы метададзеных Airflow. Пісаць у яе я не рэкамендую, а вось даставаць станы цягліц для розных спецыфічных метрык можна значна хутчэй і прасцей, чым праз любы з API.
Скажам, далёка не ўсе нашы цягі ідэмпатэнтныя, а могуць часам падаць і гэта нармальна. Але некалькі завалаў - гэта ўжо падазрона, і трэба было б праверыць.
Асцярожна, SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
Спасылкі
Ну і натуральна першыя дзесяць спасылак з выдачы гугла змесціва тэчкі Airflow з маіх закладак.
- - вядома, трэба пачаць з оф. дакументацыі, але хто ж чытае інструкцыі?
- - Ну хаця б рэкамендацыі ад стваральнікаў прачытайце.
- - самы пачатак: карыстацкі інтэрфейс у малюнках
- - добра распісаны базавыя паняцці, калі (раптам!) вы нешта не зразумелі ў мяне.
- - кароткі гайд па наладзе Airflow-кластара.
- - амаль такі ж цікавы артыкул, хіба што фармалізму пабольш, а прыкладаў паменш.
- - аб працы ў звязку з Celery.
- - пра ідэмпатэнтнасць цягінаў, загрузку па ID замест даты, трансфармацыі, структуру файлаў і іншыя цікавыя рэчы.
- - залежнасці цягачаў і Trigger Rule, якія я згадаў толькі мімаходзь.
- - як пераадольваць некаторыя "працуе, як задумана" ў планавальніка, загружаць страчаныя дадзеныя і расстаўляць прыярытэты цягоў.
- - карысныя SQL-запыты да метададзеным Airflow.
- - ёсць карысны раздзел пра стварэнне кастамнага сэнсара.
- - Цікавая кароткая нататка аб пабудове інфраструктуры на AWS для Data Science.
- - распаўсюджаныя памылкі (калі сёй-той усёткі не чытае інструкцый).
- - Усміхніцеся, як людзі мыліць захоўванне пароляў, хоць можна проста выкарыстоўваць Connections.
- - Няяўны пракід DAG, закід кантэксту ў функцыі, зноў пра залежнасці, а таксама пра пропуск запускаў цягоў.
- - аб выкарыстанні
default argumentsиparamsу шаблонах, а таксама аб Variables і Connections. - - аповяд аб тым, як планавальнік рыхтуюць да Airflow 2.0.
- - Трохі састарэлы артыкул пра дэплой нашага кластара ў
docker-compose. - - дынамічныя цяг з дапамогай шаблонаў і пракіду кантэксту.
- - стандартныя і кастамныя абвесткі поштай і Slack.
- - Галінавання цягоў, макрасы і XCom.
І спасылкі, задзейнічаныя ў артыкуле:
- - Даступныя для выкарыстання ў шаблонах плэйсхолдэры.
- - Распаўсюджаныя памылкі пры стварэнні дагоў.
- -
docker-composeдля эксперыментаў, адладкі і не толькі. - - Python-абгортка для Telegram REST API.
Крыніца: habr.com




