Hi, mwen se Dmitry Logvinenko - Enjenyè Done nan Depatman an Analytics nan gwoup la Vezet nan konpayi yo.
Mwen pral di ou sou yon zouti bèl bagay pou devlope pwosesis ETL - Apache Airflow. Men, Airflow tèlman versatile ak plizyè aspè ke ou ta dwe pran yon gade pi pre nan li menm si ou pa patisipe nan koule done, men ou gen yon bezwen detanzantan lanse nenpòt pwosesis epi kontwole ekzekisyon yo.
Ak repons lan se wi, mwen pral pa sèlman di, men tou montre: pwogram nan gen yon anpil nan kòd, Ekran ak rekòmandasyon.

Ki sa ou konn wè lè w ap gade sou Google mo Airflow / Wikimedia Commons
Table of Contents
Entwodiksyon
Apache Airflow se jis tankou Django:
- ekri nan piton
- gen yon gwo panèl admin,
- dilatabl endefiniman
- sèlman pi bon, epi li te fè pou rezon konplètman diferan, sètadi (jan sa ekri anvan kat la):
- kouri ak siveyans travay sou yon kantite machin san limit (tankou seleri / Kubernetes ak konsyans ou pral pèmèt ou)
- ak jenerasyon workflow dinamik soti nan trè fasil yo ekri ak konprann kòd Python
- ak kapasite pou konekte nenpòt baz done ak API youn ak lòt lè l sèvi avèk tou de konpozan pare yo ak grefon ki fèt lakay yo (ki se trè senp).
Nou itilize Apache Airflow tankou sa a:
- nou kolekte done ki soti nan divès sous (anpil SQL Server ak PostgreSQL, divès API ak mezi aplikasyon, menm 1C) nan DWH ak ODS (nou gen Vertica ak Clickhouse).
- ki jan avanse
cron, ki kòmanse pwosesis konsolidasyon done yo sou ODS la, epi tou kontwole antretyen yo.
Jiska dènyèman, bezwen nou yo te kouvri pa yon sèl ti sèvè ak 32 nwayo ak 50 GB RAM. Nan Airflow, sa a ap travay:
- plis 200 dag (aktyèlman workflows, kote nou boure travay),
- nan chak an mwayèn 70 travay,
- bonte sa a kòmanse (tou an mwayèn) yon fwa pa èdtan.
Ak sou ki jan nou te elaji, mwen pral ekri anba a, men kounye a ann defini ßber-pwoblèm ke nou pral rezoud:
Gen twa sous SQL Servers, yo chak ak 50 baz done - ka yon pwojè, respektivman, yo gen menm estrikti (prèske tout kote, mua-ha-ha), ki vle di ke chak gen yon tab Lòd (ererezman, yon tab ak sa a). non ka pouse nan nenpòt biznis). Nou pran done yo lè nou ajoute jaden sèvis (sèvè sous, baz done sous, ID travay ETL) ak nayiv jete yo nan, di, Vertica.
Ale!
Pati prensipal la, pratik (ak yon ti kras teyorik)
Poukisa nou (ak ou)
Lè pye bwa yo te gwo epi mwen te senp SQL-schik nan yon sèl Ris Yo Vann an Detay, nou tronpe pwosesis ETL aka koule done lè l sèvi avèk de zouti ki disponib pou nou:
- Sant pouvwa Informatica - yon sistèm trè gaye, trè pwodiktif, ak pwòp pyès ki nan konpitè, vèsyon pwòp li yo. Mwen te itilize Bondye padon 1% nan kapasite li yo. Poukisa? Oke, anvan tout bagay, koòdone sa a, yon kote nan ane 380 yo, mantalman mete presyon sou nou. Dezyèmman, kontrapsyon sa a fèt pou pwosesis trè anpenpan, reutilize eleman kòlè ak lòt ke trik nouvèl trè enpòtan. Sou lefèt ke li koute, tankou zèl la nan èrbus AXNUMX / ane a, nou pa pral di anyen.
Pran prekosyon nou, yon ekran ka fè moun ki poko gen 30 an yon ti kras mal

- SQL sèvè entegrasyon sèvè - nou te itilize kamarad sa a nan flux anndan pwojè nou yo. Oke, an reyalite: nou deja itilize SQL sèvè, epi li ta yon jan kanmenm rezonab pa sèvi ak zouti ETL li yo. Tout bagay nan li bon: tou de koòdone a bèl, ak rapò sou pwogrè yo ... Men, sa a se pa poukisa nou renmen pwodwi lojisyèl, o, pa pou sa. Version li
dtsx(ki se XML ak nĹuds melanje sou sove) nou kapab, men ki pwen an? Kouman sou fè yon pake travay ki pral trennen dè santèn de tab soti nan yon sèvè nan yon lòt? Wi, ki sa ki yon santèn, dwèt endèks ou pral tonbe soti nan ven moso, klike sou bouton an sourit. Men, li definitivman sanble pi alamòd:
Nou sètènman chèche fason pou sòti. Ka menm prèske te vini nan yon dèlko pake SSIS ekri pwòp tèt ou ...
âŚEpi yon nouvo travay jwenn mwen. Ak Apache Airflow depase m 'sou li.
Lè mwen te jwenn ke deskripsyon pwosesis ETL yo se senp kòd Python, mwen jis pa t 'danse pou kè kontan. Sa a se fason kouran done yo te vèsyon ak diferan, ak vide tab ak yon estrikti sèl soti nan dè santèn de baz done nan yon sèl sib te vin yon kesyon de kòd Python nan yon sèl ak yon mwatye oswa de 13 "ekran.
Rasanble gwoup la
Se pou nou pa fè aranjman pou yon jadendanfan konplètman, epi yo pa pale sou bagay konplètman evidan isit la, tankou enstale Airflow, baz done ou chwazi a, seleri ak lòt ka ki dekri nan waf yo.
Pou nou ka imedyatman kòmanse eksperyans, mwen te trase docker-compose.yml nan ki:
- Ann ogmante aktyèlman Vantilasyon: Planifikatè, sèvè Web. Flè pral tou vire la pou kontwole travay seleri (paske li te deja pouse nan
apache/airflow:1.10.10-python3.7, men nou pa gen pwoblèm) - Postgrèskl, nan ki Airflow pral ekri enfòmasyon sèvis li yo (done orè, estatistik ekzekisyon, elatriye), ak seleri pral make travay fini;
- Redis, ki pral aji kòm yon koutye travay pou seleri;
- Travayè seleri, ki pral angaje nan ekzekisyon an dirèk nan travay.
- Nan katab
./dagsnou pral ajoute dosye nou yo ak deskripsyon dags. Yo pral ranmase sou vole a, kidonk pa gen okenn nesesite pou jongle tout pil la apre chak etènye.
Nan kèk kote, kòd la nan egzanp yo pa montre konplètman (pou yo pa ankonbre tèks la), men yon kote li modifye nan pwosesis la. Ou ka jwenn egzanp konplè kòd k ap travay nan depo a .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNòt:
- Nan asanble a nan konpozisyon an, mwen lajman konte sou imaj la byen li te ye - Asire w ou tcheke li. Petèt ou pa bezwen nenpòt lòt bagay nan lavi ou.
- Tout paramèt Airflow yo disponib pa sèlman nan
airflow.cfg, men tou atravè varyab anviwònman (gras a devlopè yo), ki mwen malveyan te pwofite sou yo. - Natirèlman, li pa pare pou pwodiksyon: mwen fè espre pa mete batman kè sou resipyan, mwen pa t 'anmède ak sekirite. Men, mwen te fè minimòm ki apwopriye pou eksperyans nou yo.
- Sonje ke:
- Katab dag la dwe aksesib pou tou de pwogramè a ak travayè yo.
- Menm bagay la tou aplike nan tout bibliyotèk twazyèm pati - yo tout dwe enstale sou machin ki gen yon orè ak travayè yo.
Oke, kounye a li senp:
$ docker-compose up --scale worker=3Apre tout bagay leve, ou ka gade nan entèfas entènèt yo:
- koule lè:
- Flè:
Konsèp debaz
Si ou pa t 'konprann anyen nan tout "dags" sa yo, Lè sa a, isit la se yon diksyonè kout:
- Planifikateur - Tonton ki pi enpòtan nan Airflow, kontwole ke robo travay di, epi yo pa yon moun: kontwole orè a, mete ajou dags, lanse travay.
An jeneral, nan ansyen vèsyon, li te gen pwoblèm ak memwa (non, pa amnÊsie, men fwit) ak paramèt eritaj la menm rete nan konfigirasyon yo.
run_duration- entèval rekòmanse li yo. Men koulye a, tout bagay anfòm. - Dag (aka "dag") - "dirije graf acyclic", men yon definisyon konsa pral di kèk moun, men an reyalite li se yon veso pou travay kominike youn ak lòt (gade anba a) oswa yon analogue nan pake nan SSIS ak workflow nan Informatica. .
Anplis dag, ka toujou gen subdag, men nou gen plis chans pa pral jwenn yo.
- DAG kouri - inisyalize dag, ki se asiyen pwòp li yo
execution_date. Dagrans nan menm dag ka travay nan paralèl (si ou te fè travay ou idempotent, nan kou). - Operatè se moso kòd ki responsab pou fè yon aksyon espesifik. Gen twa kalite operatè:
- aksyontankou pi renmen nou an
PythonOperator, ki ka egzekite nenpòt (valid) kòd Python; - transfere, ki transpòte done yon kote an yon kote, di,
MsSqlToHiveTransfer; - Capteur an lòt men an, li pral pèmèt ou reyaji oswa ralanti ekzekisyon an plis nan dag la jiskaske yon evènman rive.
HttpSensorka rale pwen final la espesifye, epi lè repons lan vle ap tann, kòmanse transfè aGoogleCloudStorageToS3Operator. Yon lide fouy ap mande: "poukisa? Apre yo tout, ou ka fè repetisyon dwat nan operatè a!" Lè sa a, yo nan lòd yo pa bouche pisin lan nan travay ak operatè sispann. Capteur a kòmanse, tcheke ak mouri anvan pwochen tantativ la.
- aksyontankou pi renmen nou an
- Objektif Travay la - te deklare operatè yo, kèlkeswa kalite, ak tache ak dag la ap monte nan ran de travay.
- egzanp travay - lè planifikatè jeneral la deside ke li te tan voye travay nan batay sou pèfòmè-travayè (dwa sou plas la, si nou itilize
LocalExecutoroswa nan yon ne aleka nan ka a nanCeleryExecutor), li bay yo yon kontèks (sa vle di, yon seri varyab - paramèt ekzekisyon), elaji modèl lòd oswa demann, ak pisin yo.
Nou jenere travay
Premyèman, se pou nou dekri konplo jeneral doug nou an, epi Lè sa a, nou pral plonje nan detay yo pi plis ak plis, paske nou aplike kèk solisyon ki pa trivial.
Se konsa, nan fòm ki pi senp li yo, tankou yon dag pral sanble sa a:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Ann kalkile li:
- Premyèman, nou enpòte lib ki nesesè yo ak yon lòt bagay;
sql_server_ds- EskeList[namedtuple[str, str]]ak non koneksyon yo soti nan Airflow Connections ak baz done ki soti nan ki nou pral pran plak nou an;dag- anonsman dag nou an, ki dwe nesesèman nanglobals(), sinon Airflow pa pral jwenn li. Doug bezwen di tou:- ki jan li rele
orders- non sa a pral parèt nan koòdone entènèt la, - ke li pral travay apati minwi wit jiyè a,
- epi li ta dwe kouri, apeprè chak 6 èdtan (pou mesye difisil isit la olye pou yo
timedelta()akseptabcron-liy0 0 0/6 ? * * *, pou mwens fre - yon ekspresyon tankou@daily);
- ki jan li rele
workflow()pral fè travay prensipal la, men se pa kounye a. Pou kounye a, nou pral jis jete kontèks nou an nan boutèy la.- Epi, koulye a majik la senp nan kreye travay:
- nou kouri atravè sous nou yo;
- inisyalize
PythonOperator, ki pral egzekite enbesil nou anworkflow(). Pa bliye presize yon non inik (nan dag la) nan travay la epi mare dag nan tèt li. Drapoprovide_contextan vire, pral vide agiman adisyonèl nan fonksyon an, ki nou pral ak anpil atansyon kolekte lè l sèvi avèk**context.
Pou kounye a, se tout. Sa nou genyen:
- nouvo dag nan koòdone entènèt la,
- yon sèl ak yon demi san travay ki pral egzekite nan paralèl (si Airflow la, anviwònman seleri ak kapasite sèvè pèmèt li).
Oke, prèske jwenn li.

Ki moun ki pral enstale depandans yo?
Pou senplifye tout bagay sa a, mwen vise nan docker-compose.yml pwosesis requirements.txt sou tout nĹuds.
Kounye a li ale:

Kare gri yo se sikonstans travay yo trete pa orè a.
Nou rete tann yon ti jan, travay yo te menen pa travayè yo:

Vèt yo, nan kou, te konplete avèk siksè travay yo. Wouj yo pa gen anpil siksè.
By wout la, pa gen okenn katab sou prod nou an
./dags, pa gen okenn senkronizasyon ant machin - tout dags kouche nangitsou Gitlab nou an, ak Gitlab CI distribye mizajou nan machin lè fizyone nanmaster.
Yon ti kras sou flè
Pandan travayè yo ap bat sison nou yo, ann sonje yon lòt zouti ki ka montre nou yon bagay - Flè.
Premye paj la ak enfòmasyon rezime sou nĹuds travayè yo:

Paj ki pi entans ak travay ki te ale nan travay:

Paj ki pi raz la ak estati koutye nou an:

Paj ki pi klere a se ak graf estati travay ak tan ekzekisyon yo:

Nou chaje anba chaje a
Se konsa, tout travay yo te travay deyò, ou ka pote ale blese yo.

E te gen anpil blese - pou yon rezon oswa yon lòt. Nan ka a nan itilizasyon kòrèk Airflow, kare sa yo montre ke done yo definitivman pa t 'rive.
Ou bezwen gade boutèy demi lit la epi rekòmanse ka travay ki tonbe yo.
Lè nou klike sou nenpòt kare, nou pral wè aksyon ki disponib pou nou:

Ou ka pran epi fè klè tonbe a. Sa vle di, nou bliye ke yon bagay te echwe la, ak menm travay egzanp lan pral ale nan orè a.

Li klè ke fè sa ak sourit la ak tout kare wouj yo pa trè imen - sa a se pa sa nou espere nan Airflow. Natirèlman, nou gen zam destriksyon mas: Browse/Task Instances

Ann chwazi tout bagay nan yon fwa epi reset a zewo, klike sou atik ki kòrèk la:

Apre w fin netwaye, taksi nou yo sanble sa a (yo deja ap tann pwogramè a pran randevou yo):

Koneksyon, kwòk ak lòt varyab
Li lè pou nou gade pwochen DAG la, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ĐĐžŃпОда Ń
ĐžŃĐžŃио, ĐžŃŃĐľŃŃ ĐžĐąĐ˝ĐžĐ˛ĐťĐľĐ˝Ń"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ĐаŃаŃ, ĐżŃĐžŃŃпаКŃŃ, ĐźŃ {{ dag.dag_id }} ŃŃОниНи
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Ăske tout moun te janm fè yon ajou rapò? Sa a se li ankò: gen yon lis sous ki soti kote yo ka resevwa done yo; gen yon lis kote yo mete; pa bliye klakson lè tout bagay te pase oswa kraze (byen, sa a se pa sou nou, non).
Ann ale nan dosye a ankò epi gade nan nouvo bagay ki fènwa:
from commons.operators import TelegramBotSendMessage- pa gen anyen ki anpeche nou fè pwòp operatè nou yo, ki nou te pwofite fè yon ti anbalaj pou voye mesaj bay Unblocked. (Nou pral pale plis sou operatè sa a anba a);default_args={}- dag ka distribye menm agiman yo bay tout operatè li yo;to='{{ var.value.all_the_kings_men }}'- jadentonou pa pral gen hardcoded, men dinamik pwodwi lè l sèvi avèk Jinja ak yon varyab ak yon lis imèl, ke mwen ak anpil atansyon mete nanAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESSâ kondisyon pou kòmanse operatè a. Nan ka nou an, lèt la pral vole bay patwon yo sèlman si tout depandans yo te travay deyò avèk siksè;tg_bot_conn_id='tg_main'- agimanconn_idaksepte ID koneksyon ke nou kreye nanAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- mesaj nan Telegram pral vole ale sèlman si gen travay ki tonbe;task_concurrency=1- nou entèdi lansman similtane plizyè ka travay nan yon sèl travay. Sinon, nou pral jwenn lansman similtane plizyèVerticaOperator(gade yon tab);report_update >> [email, tg]- toutVerticaOperatorkonvèje nan voye lèt ak mesaj, tankou sa a:

Men, depi operatè notifikatè yo gen diferan kondisyon lansman, yon sèl pral travay. Nan Tree View, tout bagay sanble yon ti kras mwens vizyèl:

Mwen pral di kèk mo sou makro ak zanmi yo - varyab yo.
Makro yo se yon plas Jinja ki ka ranplase divès enfòmasyon itil nan agiman operatè yo. Pou egzanp, tankou sa a:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} pral elaji nan sa ki nan varyab kontèks la execution_date nan fòma YYYY-MM-DD: 2020-07-14. Pati ki pi bon an se ke varyab kontèks yo kloure nan yon egzanp travay espesifik (yon kare nan Tree View la), epi lè rekòmanse, placeholders yo ap elaji nan menm valè yo.
Valè yo asiyen yo ka wè lè l sèvi avèk bouton an Rann sou chak egzanp travay. Men ki jan travay la ak voye yon lèt:

Se konsa, nan travay la ak voye yon mesaj:

Yon lis konplè makro entegre pou dènye vèsyon ki disponib la disponib isit la:
Anplis, avèk èd nan grefon, nou ka deklare makro pwòp nou yo, men sa a se yon lòt istwa.
Anplis de bagay sa yo predefini, nou ka ranplase valè yo nan varyab nou yo (mwen te deja itilize sa a nan kòd ki pi wo a). Ann kreye nan Admin/Variables yon koup de bagay:

Tout sa ou ka itilize:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Valè a kapab yon escalar, oswa li kapab tou JSON. Nan ka JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}jis itilize chemen an nan kle a vle: {{ var.json.bot_config.bot.token }}.
Mwen pral literalman di yon mo epi montre yon ekran sou koneksyon. Tout bagay se elemantè isit la: sou paj la Admin/Connections nou kreye yon koneksyon, ajoute logins / modpas nou yo ak paramèt plis espesifik la. Tankou sa a:

Modpas yo ka chiffres (plis byen pase default la), oswa ou ka kite kalite koneksyon an (jan mwen te fè pou tg_main) - reyalite a se ke lis la nan kalite yo fil elektrik nan modèl Airflow epi yo pa ka elaji san yo pa antre nan kòd sous yo (si toudenkou mwen pa t 'google yon bagay, tanpri korije m'), men pa gen anyen ki pral anpeche nou jwenn kredi jis pa non.
Ou kapab tou fè plizyè koneksyon ak menm non an: nan ka sa a, metòd la BaseHook.get_connection(), ki fè nou koneksyon pa non, ap bay o aza soti nan plizyè omonim (li ta pi lojik fè Round Robin, men ann kite li sou konsyans devlopè yo Airflow).
Varyab ak Koneksyon yo se sètènman zouti fre, men li enpòtan pou pa pèdi balans lan: ki pati nan koule ou ou estoke nan kòd la li menm, ak ki pati ou bay Airflow pou depo. Sou yon bò, li ka pratik byen vit chanje valè a, pou egzanp, yon bwat postal, atravè UI la. Nan lòt men an, sa a se toujou yon retounen nan klike sou la sourit, ki soti nan ki nou (mwen) te vle debarase m de.
Travay ak koneksyon se youn nan travay yo kwòk. An jeneral, Kwòk Airflow yo se pwen pou konekte li ak sèvis twazyèm pati ak bibliyotèk. Pa egzanp, JiraHook pral louvri yon kliyan pou nou kominike avèk Jira (ou ka deplase travay ale vini), epi avèk èd nan SambaHook ou ka pouse yon dosye lokal nan smb-pwen.
Analize operatè a koutim
Epi nou te tou pre gade ki jan li te fè TelegramBotSendMessage
Kòd commons/operators.py ak operatè aktyèl la:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Isit la, tankou tout lòt bagay nan Airflow, tout bagay trè senp:
- Eritye de
BaseOperator, ki aplike byen kèk bagay espesifik Airflow (gade lwazi ou) - Jaden ki deklare
template_fields, nan ki Jinja pral gade pou makro nan pwosesis. - Ranje agiman yo dwa pou
__init__(), mete defo yo kote sa nesesè. - Nou pa t bliye sou inisyalizasyon zansèt la tou.
- Louvri zen ki koresponn lan
TelegramBotHookte resevwa yon objè kliyan nan men li. - Metòd overridden (redefini).
BaseOperator.execute(), ki Airfow pral twitch lè lè a rive lanse operatè a - nan li nou pral aplike aksyon prensipal la, bliye konekte. (Nou konekte, nan chemen an, dwa nanstdoutиstderr- Airflow pral entèsepte tout bagay, vlope li trè byen, dekonpoze li kote sa nesesè.)
Ann wè sa nou genyen commons/hooks.py. Premye pati nan dosye a, ak zen nan tèt li:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientMwen pa menm konnen ki sa yo eksplike isit la, mwen pral jis sonje pwen enpòtan yo:
- Nou eritye, reflechi sou agiman yo - nan pifò ka li pral youn:
conn_id; - Depase metòd estanda: Mwen limite tèt mwen
get_conn(), nan ki mwen jwenn paramèt yo koneksyon pa non epi jis jwenn seksyon anextra(sa a se yon jaden JSON), kote mwen (dapre pwòp enstriksyon mwen!) mete siy bot Telegram la:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Mwen kreye yon egzanp nou an
TelegramBot, bay li yon siy espesifik.
Se tout. Ou ka jwenn yon kliyan nan yon zen lè l sèvi avèk TelegramBotHook().clent oswa TelegramBotHook().get_conn().
Ak dezyèm pati a nan dosye a, nan ki mwen fè yon mikwowrapper pou Telegram REST API a, pou yo pa trennen menm bagay la. pou yon metòd sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Fason ki kòrèk la se ajoute tout bagay:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- nan Plugin la, mete nan yon depo piblik, epi bay li nan Open Source.
Pandan nou t ap etidye tout bagay sa yo, mizajou rapò nou yo te reyisi echwe epi voye yon mesaj erè pou mwen nan kanal la. Mwen pral tcheke pou wè si li mal...

Yon bagay te kraze nan doge nou an! Ăske se pa sa nou t ap tann? Egzakteman!
Ou pral vide?
Ou santi mwen rate yon bagay? Li sanble ke li te pwomèt yo transfere done ki soti nan SQL sèvè nan Vertica, ak Lè sa a, li te pran li epi li deplase sou sijè a, kannay la!
Atwosite sa a te entansyonèl, mwen te senpleman oblije dechifre kèk tèminoloji pou ou. Koulye a, ou ka ale pi lwen.
Plan nou an te sa a:
- Fè dag
- Jenere travay
- Gade jan tout bagay bèl
- Bay nimewo sesyon yo pou ranpli
- Jwenn done ki soti nan SQL sèvè
- Mete done yo nan Vertica
- Kolekte estatistik
Se konsa, pou jwenn tout bagay sa yo ak kouri, mwen te fè yon ti adisyon nan nou an docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pySe la nou leve:
- Vertica kòm lame
dwhak paramèt ki pi default yo, - twa ka SQL sèvè,
- nou ranpli baz done yo nan lèt la ak kèk done (nan okenn ka gade nan
mssql_init.py!)
Nou lanse tout bon ak èd yon kòmandman yon ti kras pi konplike pase dènye fwa:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Ki sa ki randomize mirak nou an te pwodwi, ou ka itilize atik la Data Profiling/Ad Hoc Query:

Bagay pwensipal lan se pa montre li bay analis yo
elabore sou Sesyon ETL yo Mwen pa pral, tout bagay se trivial la: nou fè yon baz, gen yon siy nan li, nou vlope tout bagay ak yon manadjè kontèks, epi kounye a nou fè sa:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passLè a rive kolekte done nou yo soti nan yon sèl ak yon mwatye tab nou an. Ann fè sa avèk èd nan liy trè modestes:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Avèk èd nan yon zen nou jwenn nan Airflow
pymssql-konekte - Ann ranplase yon restriksyon nan fòm yon dat nan demann lan - li pral jete nan fonksyon an pa motè a modèl.
- Nouri demann nou an
pandaski moun ki pral jwenn nouDataFrame- li pral itil nou nan tan kap vini an.
Mwen sèvi ak sibstitisyon
{dt}olye de yon paramèt demann%spa paske mwen se yon Pinokyo sa ki mal, men paskepandaspa ka okipepymssqlepi li glise dènye aparams: Listbyenke li vrèman vletuple.
Epitou sonje ke pwomotè apymssqldeside pa sipòte l 'ankò, epi li lè yo deplase sotipyodbc.
Ann wè ki sa Airflow te boure agiman fonksyon nou yo:

Si pa gen okenn done, Lè sa a, pa gen okenn pwen nan kontinye. Men, li se tou etranj yo konsidere ranpli a siksè. Men, sa a se pa yon erè. A-ah-ah, kisa pou w fè?! Ak isit la se sa:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException di Airflow ke pa gen okenn erè, men nou sote travay la. Koòdone a pa pral gen yon kare vèt oswa wouj, men woz.
Ann voye done nou yo plizyè kolòn:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])savwa
- Baz done kote nou te pran lòd yo,
- ID sesyon inondasyon nou an (li pral diferan pou chak travay),
- Yon hash soti nan sous la ak ID lòd - pou ke nan baz done final la (kote tout bagay vide nan yon tab) nou gen yon ID lòd inik.
Penultyèm etap la rete: vide tout bagay nan Vertica. Epi, etranj ase, youn nan fason ki pi espektakilè ak efikas pou fè sa se atravè CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Nou ap fè yon reseptè espesyal
StringIO. pandaspral dous mete nouDataFramenan fòm lanCSV-liy.- Ann louvri yon koneksyon ak Vertica pi renmen nou an ak yon zen.
- Epi, koulye a ak èd la
copy()voye done nou yo dirèkteman nan Vertika!
Nou pral pran nan men chofè a konbyen liy yo te ranpli, epi di manadjè sesyon an ke tout bagay anfòm:
session.loaded_rows = cursor.rowcount
session.successful = TrueSe tout.
Sou vant la, nou kreye plak sib la manyèlman. Isit la mwen pèmèt tèt mwen yon ti machin:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Mwen ap itilize
VerticaOperator()Mwen kreye yon chema baz done ak yon tab (si yo pa deja egziste, nan kou). Bagay pwensipal lan se kòrèkteman fè aranjman pou depandans yo:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadAdisyon moute
- Oke, - di ti sourit la, - se pa li, kounye a
Ăske ou konvenki ke mwen se bèt ki pi terib nan forè a?
Julia Donaldson, Gruffalo a
Mwen panse ke si kòlèg mwen yo ak mwen te gen yon konpetisyon: ki moun ki pral byen vit kreye ak lanse yon pwosesis ETL soti nan grafouyen: yo ak SSIS yo ak yon sourit ak m 'ak Airflow ... Lè sa a, nou ta tou konpare fasilite nan antretyen ... Wow, mwen panse ou pral dakò ke mwen pral bat yo sou tout fwon!
Si yon ti kras pi seryezman, Lè sa a, Apache Airflow - pa dekri pwosesis nan fòm lan nan kòd pwogram - te fè travay mwen. anpil pi konfòtab ak agreyab.
Ekstansiblite san limit li yo, tou de an tèm de plug-ins ak predispozisyon pou ÊvolutivitÊ, ba ou opòtinite pou yo sèvi ak Airflow nan prèske nenpòt zòn: menm nan sik la plen nan kolekte, prepare ak trete done, menm nan lansman fize (nan Mas, nan kou).
Pati final, referans ak enfòmasyon
Rate nou te ranmase pou ou
start_date. Wi, sa a se deja yon mem lokal. Via agiman prensipal Doug lastart_datetout pase. Yon ti tan, si ou presize nanstart_datedat aktyèl la, epischedule_interval- yon jou, Lè sa a, DAG ap kòmanse demen pa pi bonè.start_date = datetime(2020, 7, 7, 0, 1, 2)E pa gen plis pwoblèm.
Gen yon lòt erè ègzekutabl ki asosye ak li:
Task is missing the start_date parameter, ki pi souvan endike ke ou bliye mare nan operatè a dag.- Tout sou yon sèl machin. Wi, ak baz (Airflow tèt li ak kouch nou an), ak yon sèvè entènèt, ak yon orè, ak travayè yo. E li menm te travay. Men, apre yon sèten tan, kantite travay pou sèvis yo te grandi, epi lè PostgreSQL te kòmanse reponn a endèks la nan 20 s olye pou yo 5 ms, nou te pran li epi pote li ale.
- LocalExecutor. Wi, nou toujou chita sou li, epi nou deja rive nan kwen an nan gwo twou san fon an. LocalExecutor te ase pou nou jiskaprezan, men kounye a li lè yo elaji ak omwen yon travayè, epi nou pral oblije travay di pou ale nan CeleryExecutor. Epi nan lefèt ke ou ka travay avèk li sou yon sèl machin, pa gen anyen ki anpeche w sèvi ak seleri menm sou yon sèvè, ki "nan kou, pa janm pral antre nan pwodiksyon, onètman!"
- Ki pa sèvi ak zouti entegre:
- Koneksyon pou estoke kalifikasyon sèvis yo,
- SLA Miss pou reponn a travay ki pa t travay alè,
- xcom pou echanj metadata (mwen te di metadone!) ant travay dag.
- Abi lapòs. Bon, kisa mwen ka di? Alèt yo te mete kanpe pou tout repetisyon nan travay tonbe. Koulye a, Gmail travay mwen an gen> 90k imèl ki soti nan Airflow, ak mizo lapòs entènèt la refize ranmase ak efase plis pase 100 nan yon moman.
Plis enkonvenyans:
Plis zouti automatisation
Pou nou travay plis toujou ak tèt nou e non pa ak men nou, Airflow te prepare pou nou sa:
- - li toujou gen estati Experimental, ki pa anpeche l travay. Avèk li, ou ka pa sèlman jwenn enfòmasyon sou dags ak travay, men tou, sispann / kòmanse yon dag, kreye yon DAG Run oswa yon pisin.
- - gen anpil zouti ki disponib atravè liy kòmand ki pa sèlman enkonvenyan pou itilize atravè WebUI a, men yo jeneralman absan. Pa egzanp:
backfillbezwen rekòmanse ka travay yo.
Pa egzanp, analis yo te vini epi yo te di: "Epi ou menm, kamarad, gen istwa san sans nan done yo soti nan 1ye rive 13 janvye! Ranje li, ranje li, ranje li, ranje li!" Epi ou se tankou yon recho:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Sèvis de baz:
initdb,resetdb,upgradedb,checkdb. run, ki pèmèt ou kouri yon sèl travay egzanp, e menm nòt sou tout depandans. Anplis, ou ka kouri li atravèLocalExecutor, menm si ou gen yon gwoup seleri.- Fè prèske menm bagay la
test, sèlman tou nan baz ekri anyen. connectionspèmèt kreyasyon an mas koneksyon soti nan koki a.
- - yon fason pito hardcore nan kominike, ki se gen entansyon pou grefon, epi yo pa foule nan li ak men ti kras. Men, kiyès ki pou anpeche nou ale
/home/airflow/dags, kouriipythonepi kòmanse dezòd? Ou ka, pou egzanp, ekspòte tout koneksyon ak kòd sa a:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Konekte ak metadatabase Airflow la. Mwen pa rekòmande pou ekri li, men jwenn eta travay pou divès mezi espesifik ka pi vit ak pi fasil pase atravè nenpòt nan API yo.
Ann di ke se pa tout travay nou yo idempotan, men yo ka pafwa tonbe, e sa a se nòmal. Men, kèk blokaj yo deja sispèk, e li ta nesesè yo tcheke.
Pran prekosyon nou SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
Referans
Ak nan kou, dis premye lyen ki soti nan emisyon an nan Google yo se sa ki nan folder nan Airflow soti nan Bookmarks mwen an.
- - nan kou, nou dwe kòmanse ak biwo a. dokiman, men ki moun ki li enstriksyon yo?
- - Oke, omwen li rekòmandasyon yo nan men crÊateur yo.
- - kòmansman an anpil: koòdone itilizatè a nan foto
- - konsèp debaz yo byen dekri, si (toudenkou!) Ou pa t 'konprann yon bagay nan men mwen.
- - yon gid kout pou mete kanpe yon gwoup Airflow.
- - prèske menm atik la enteresan, eksepte petèt plis fòmèl, ak mwens egzanp.
- â sou travay ansanm ak seleri.
- - sou idempotity nan travay, chaje pa ID olye pou yo dat, transfòmasyon, estrikti dosye ak lòt bagay enteresan.
- - Depandans nan travay ak Règ deklanche, ki mwen mansyone sèlman an pase.
- - Ki jan yo simonte kèk "travay jan yo gen entansyon" nan orè a, chaje done pèdi ak priyorite travay.
- â rekèt SQL itil nan metadata Airflow.
- - gen yon seksyon itil sou kreye yon Capteur koutim.
- â yon ti nòt enteresan sou bati yon enfrastrikti sou AWS pou Data Science.
- - erè komen (lè yon moun toujou pa li enstriksyon yo).
- - souri ki jan moun yo beki estoke modpas, byenke ou ka jis itilize Koneksyon.
- - Transmisyon implicite DAG, kontèks voye nan fonksyon, ankò sou depandans, epi tou sou sote lanse travay.
- - sou itilizasyon an
default argumentsиparamsnan modèl, osi byen ke Varyab ak Koneksyon. - - yon istwa sou fason planifikatè a ap prepare pou Airflow 2.0.
- - yon atik yon ti kras demode sou deplwaye gwoup nou an nan
docker-compose. - - travay dinamik lè l sèvi avèk modèl ak transmisyon kontèks.
- â notifikasyon estanda ak koutim pa lapòs ak Slack.
- - Branch travay, makro ak XCom.
Ak lyen yo itilize nan atik la:
- - anplasman ki disponib pou itilize nan modèl.
- â Erè komen lè kreye dags.
- -
docker-composepou eksperimantasyon, debogaj ak plis ankò. - - Python wrapper pou Telegram REST API.
Sous: www.habr.com




