Silav, ez Dmitry Logvinenko me - Endezyarê Daneyên Beşa Analytics ya koma pargîdaniyên Vezet.
Ez ê ji we re li ser amûrek ecêb ji bo pêşkeftina pêvajoyên ETL - Apache Airflow vebêjim. Lê Airflow ew qas pirreng û piralî ye ku divê hûn ji nêz ve lê binihêrin heke hûn di herikîna daneyan de nebin jî, lê hewcedariya we heye ku hûn pêvajoyek periyodîk bidin destpêkirin û pêkanîna wan bişopînin.
Erê, ez ê ne tenê bibêjim, lê di heman demê de destnîşan bikim: bername gelek kod, dîmen û pêşniyar hene.

Dema ku hûn peyva Airflow / Wikimedia Commons di google de bi gelemperî hûn dibînin
Table of Contents
Pîrozbahiyê
Apache Airflow mîna Django ye:
- bi python hatiye nivîsandin
- panelek mezin a rêveberiyê heye,
- bêdawî berfireh dibe
- tenê çêtir e, û ew ji bo mebestên bi tevahî cûda hate çêkirin, ango (wek ku berî katê hatî nivîsandin):
- peywirên xebitandin û çavdêrîkirina li ser hejmarek bêsînor makîneyan (wek ku gelek Celery / Kubernetes û wijdanê we dê destûrê bidin we)
- bi hilberîna xebata dînamîkî ya ji nivîsandin û fêmkirina koda Python pir hêsan e
- û şiyana girêdana her databas û API-yê bi hev re bi karanîna hem hêmanên amade û hem jî pêvekên malê-çêkirî (ku zehf hêsan e).
Em Apache Airflow bi vî rengî bikar tînin:
- em daneyan ji çavkaniyên cihêreng (gelek mînakên SQL Server û PostgreSQL, API-yên cihêreng ên bi pîvanên serîlêdanê, tewra 1C) di DWH û ODS de berhev dikin (me Vertica û Clickhouse hene).
- çiqas pêşketî
cron, ku pêvajoyên hevgirtina daneyan li ser ODS dest pê dike, û di heman demê de lênihêrîna wan jî dişopîne.
Heya vê dawiyê, hewcedariyên me ji hêla serverek piçûk ve bi 32 core û 50 GB RAM ve dihatin vegirtin. Di Airflow de, ev kar dike:
- более 200 dar (bi rastî pêlên xebatê, ku me tê de peywir pêk anî),
- di her navencî de 70 wezîfe,
- ev qencî dest pê dike (di heman demê de bi navînî) saetê carekê.
Û li ser ka me çawa berfireh kir, ez ê li jêr binivîsim, lê naha em pirsgirêka über-a ku em ê çareser bikin diyar bikin:
Sê Pêşkêşkerên SQL-ya çavkaniyê hene, her yek bi 50 databasên - mînakên yek projeyê, bi rêzê ve, wan xwedî heman avahî ne (hema hema li her deverê, mua-ha-ha), ku tê vê wateyê ku her yekê tabloyek Order heye (bextane, tabloyek bi wê nav dikare di her karsaziyê de were avêtin). Em daneyan bi lêzêdekirina qadên karûbarê (pêşkêşkara çavkaniyê, databasa çavkaniyê, Nasnameya peywirê ya ETL) digirin û wan bi dilşikestî diavêjin, bêje, Vertica.
Em herin!
Beşa sereke, pratîk (û piçek teorîk)
Çima em (û hûn)
Dema ku darên mezin bûn û ez sade bûm SQL-di yek firotgehek rûsî de, me bi du amûrên ku ji me re hene bi pêvajoyên ETL yên wekî herikîna daneyê xapandin:
- Navenda Hêza Informatica - pergalek pir belavbûyî, zehf hilber, bi hardware xwe, guhertoya xwe. Min Xwedê nehêle 1% ji kapasîteyên wê bikar anîn. Çima? Belê, berî her tiştî, ev navber, ji salên 380-an vir ve, bi zihniyet zext li me kir. Ya duyemîn, ev tevlihevî ji bo pêvajoyên pir xweşik, ji nû ve karanîna hêmanên hêrs û hîleyên din ên pargîdanî yên pir girîng hatî çêkirin. Li ser lêçûna wê, mîna baskê Airbus AXNUMX / sal, em ê tiştek nebêjin.
Hişyar bin, dîmenek dikare hinekî zirarê bide mirovên di bin 30 salî de

- Pêşkêşkara Yekbûna Servera SQL - Me ev rêheval di nava projeyên xwe de bi kar anî. Welê, bi rastî: em jixwe SQL Server bikar tînin, û ew ê hinekî ne maqûl be ku em amûrên wê ETL bikar neynin. Her tişt di wê de baş e: hem navbeynkar xweşik e, hem jî raporên pêşkeftinê ... Lê ne ji ber vê yekê em ji hilberên nermalavê hez dikin, oh, ne ji bo vê yekê. Guhertoya wê
dtsx(ku XML bi girêkên ku li ser hilanînê veqetandî ye) em dikarin, lê mesele çi ye? Ma hûn pakêtek peywirê çêbikin ku dê bi sedan tablo ji serverek berbi ya din bikişîne? Erê, çi sed e, tiliya te ya nîşanê dê ji bîst perçeyan bikeve, bişkoka mişkê bike. Lê bê guman ew modatir xuya dike:
Bê guman em li rêyên derketinê geriyan. Case even hema hema hat ber jeneratorek pakêtê ya SSIS-a xwe-nivîskî ...
…û paşê karekî nû min dît. Û Apache Airflow min li ser wê girt.
Gava ku min fêhm kir ku danasînên pêvajoya ETL koda Python-ê hêsan in, min tenê ji şahiyê dans nekir. Bi vî rengî herikên daneyê hatin guhertokirin û cûda kirin, û rijandina tabloyên bi avahiyek yekane ji bi sedan databasan di nav yek armancê de bû mijara koda Python di yek û nîv an du ekranên 13 ”an de.
Komkirina komê
Werin em bi tevahî zarokxaneyek saz nekin, û li vir li ser tiştên bi tevahî eşkere neaxivin, mîna sazkirina Airflow, databasa weya bijartî, Celery û rewşên din ên ku di doşkeyan de têne diyar kirin.
Ji bo ku em tavilê dest bi ceribandinan bikin, min xêz kir docker-compose.yml di kîjanê de:
- Bi rastî em rabin şibaka: Bername, Webserver. Kulîlk dê li wir jî dizivire da ku karên Celery bişopîne (ji ber ku ew ji berê ve hatî avêtin
apache/airflow:1.10.10-python3.7, lê em xem nakin) - PostgreSQL, ku tê de Airflow dê agahdariya karûbarê xwe binivîse (daneyên plansazker, statîstîkên darvekirinê, hwd.), û Celery dê karên qedandî nîşan bide;
- Redis, ku dê ji bo Celery wekî brokerek peywirê tevbigere;
- Karkerê sêlê, ku dê bi cîbicîkirina rasterast a karan ve mijûl bibe.
- Ji bo peldankê
./dagsem ê pelên xwe bi danasîna dags lê zêde bikin. Ew ê di firînê de werin hilanîn, ji ber vê yekê ne hewce ye ku piştî her birûskê bi tevahî stêrk biqelînin.
Li hin deveran, koda di mînakan de bi tevahî nayê xuyang kirin (da ku nivîsê tevlihev neke), lê li cîhek ew di pêvajoyê de tê guheztin. Nimûneyên kodên xebatê yên bêkêmasî dikarin di depoyê de werin dîtin .
docker-berhevkirin.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNotes:
- Di civîna kompozîsyonê de, min bi piranî xwe spart wêneyê naskirî - Bê guman wê kontrol bikin. Dibe ku hûn di jiyana xwe de ne hewce ne tiştek din.
- Hemî mîhengên hewayê ne tenê bi navgîniyê têne peyda kirin
airflow.cfg, di heman demê de bi navgîniya guhêrbarên hawîrdorê (spas ji pêşdebiran re), ku min bi xerabî jê sûd girt. - Bi xwezayî, ew ne amade ye ji bo hilberînê: Min bi qestî lêdanên dil nexist ser konteyneran, min ji ewlehiyê aciz nekir. Lê min ji bo ceribandinên me kêmtirîn guncan kir.
- Têbînî ku:
- Peldanka dag divê hem ji plansazker û hem jî ji xebatkaran re bigihîje.
- Heman tişt ji bo hemî pirtûkxaneyên partiya sêyemîn jî derbas dibe - divê ew hemî li ser makîneyên bi plansazker û xebatkaran werin saz kirin.
Belê, niha ew hêsan e:
$ docker-compose up --scale worker=3Piştî ku her tişt rabe, hûn dikarin li navgînên malperê binêrin:
- Airflow:
- Kûlîlk:
Têgehên bingehîn
Ger we di van hemî "dag"an de tiştek fam nekiribe, wê hingê li vir ferhengek kurt heye:
- Scheduler - Mamê herî girîng ê di Airflow de, kontrol dike ku robot bi dijwarî dixebitin, û ne kesek: nexşeyê dişopîne, dakêşan nûve dike, karan dest pê dike.
Bi gelemperî, di guhertoyên kevn de, wî bi bîranînê re pirsgirêkên wî hebûn (na, ne amnesia, lê diherikin) û pîvana mîrasê jî di mîhengan de ma.
run_duration- navbera wê ya ji nû ve destpêkirinê. Lê niha her tişt baş e. - DAG (aka "dag") - "grafiya asîklîk a rêvekirî", lê pênaseyek wusa dê ji çend kesan re vebêje, lê bi rastî ew konteynir e ji bo peywirên ku bi hevûdu re têkildar in (li jêr binêre) an analogek Pakêtê di SSIS û Workflow di Informatica de .
Ji xeynî dagan, dibe ku hîn jî binerd hebin, lê bi îhtîmalek mezin em ê negihîjin wan.
- DAG Run - dag destpêkî, ku bi xwe ve hatî destnîşan kirin
execution_date. Dagranên heman dagê dikarin paralel bixebitin (heke we karên xwe bêhêz kirin, bê guman). - Makînevan parçeyên kodê ne ku ji bo pêkanîna çalakiyek taybetî berpirsiyar in. Sê celeb operator hene:
- çalakîwek favorite me
PythonOperator, ku dikare her kodek Python (derbasdar) bixebite; - derbaskirin, ku daneyan ji cihekî vediguhezîne cîh, bêje,
MsSqlToHiveTransfer; - sensor ji hêla din ve, ew ê bihêle ku hûn bertek nîşan bidin an jî înfazkirina din a dagê hêdî bikin heya ku bûyerek çêbibe.
HttpSensordikare xala dawiya diyarkirî bikişîne, û gava ku bersiva xwestî li bendê ye, veguheztinê dest pê bikeGoogleCloudStorageToS3Operator. Hişê lêkolîner dê bipirse: "Çima? Beriya her tiştî, hûn dikarin di operatorê de rast dubare bikin!” Û dûv re, ji bo ku hewza peywiran bi operatorên rawestandî re neyê girtin. Sensor dest pê dike, kontrol dike û berî hewildana din dimire.
- çalakîwek favorite me
- Karî - Operatorên diyarkirî, bêyî ku celeb be, û bi dag ve girêdayî ne di rêza peywirê de têne pêşve xistin.
- mînaka peywirê - gava ku plansazê giştî biryar da ku ew dem e ku peywiran bişîne şer li ser karker-karkeran (rast di cih de, ger em bikar bînin
LocalExecutoran ji bo node dûr di dozaCeleryExecutor), ew ji wan re çarçoveyek destnîşan dike (ango, komek guhêrbar - parametreyên darvekirinê), şablonên ferman an pirsê berfireh dike, û wan berhev dike.
Em peywiran çêdikin
Pêşî, bila em nexşeya giştî ya doçka xwe xêz bikin, û dûv re jî em ê bêtir û bêtir hûrguliyan bişopînin, ji ber ku em hin çareseriyên ne-pîvan bicîh tînin.
Ji ber vê yekê, di forma xweya herî hêsan de, dagek wusa dê bi vî rengî xuya bike:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Ka em wê bihesibînin:
- Pêşî, em libên pêwîst û tiştekî din;
sql_server_dsYeList[namedtuple[str, str]]bi navên girêdanên ji Girêdanên Airflow û databasên ku em ê plakaya xwe jê bigirin;dag- Ragihandina dagê me, ku pêdivî ye ku tê de beglobals(), Wekî din Airflow wê nabîne. Doug jî hewce dike ku bêje:- navê wî çi ye
orders- Dê ev nav paşê di navgîniya malperê de xuya bibe, - ku ew ê ji nîvê şeva heştê Tîrmehê bixebite,
- û divê ew bimeşe, hema hema her 6 demjimêran (ji bo xortên dijwar li vir li şûna
timedelta()dibecron-xet0 0 0/6 ? * * *, ji bo kêm sar - îfadeyek mîna@daily);
- navê wî çi ye
workflow()dê karê sereke bike, lê ne niha. Heya nuha, em ê tenê naveroka xwe bavêjin têketinê.- Û naha sêrbaziya hêsan a afirandina karan:
- em bi çavkaniyên xwe diherikin;
- destpêk kirin
PythonOperator, ku dê dumê me îdam bikeworkflow(). Ji bîr nekin ku navek yekta (di nav dagê de) ya peywirê diyar bikin û dag bixwe girêdin. Alprovide_contextdi encamê de, dê argumanên din jî birijîne nav fonksiyonê, ku em ê bi baldarî bikar bînin berhev bikin**context.
Ji bo niha, ew hemû. Tiştê ku me girt:
- xala nû di navgîniya malperê de,
- sed û nîv peywirên ku dê bi paralelî bêne darve kirin (heke mîhengên Airflow, Celery û kapasîteya serverê destûrê bidin).
Baş e, hema ew girt.

Kî dê girêdanan saz bike?
Ji bo hêsankirina vê tevahiyê, min şikand docker-compose.yml xebitandinî requirements.txt li ser hemû girêkan.
Niha ew çû:

Qadên gewr mînakên peywirê ne ku ji hêla plansazker ve têne hilberandin.
Em hinekî li bendê dimînin, peywir ji hêla karkeran ve têne hilanîn:

Yên kesk, helbet karê xwe bi serkeftî qedandine. Sor ne pir serkeftî ne.
Bi awayê, peldankek li ser hilberê me tune
./dags, di navbera makîneyan de hevdengiyek tune - hemî dag di nav de negitli ser Gitlab-a me, û Gitlab CI dema ku tev li hev dibin nûvekirinan li makîneyan belav dikemaster.
Hinekî li ser Flower
Dema ku xebatkar pîsîkên me diqelînin, em amûrek din a ku dikare tiştek nîşanî me bide bi bîr bînin - Kulîlk.
Rûpelê yekem bi kurteya agahdariya li ser girêkên karker:

Rûpelê herî zexm bi karên ku ketine xebatê:

Rûpelê herî bêzar bi statûya brokerê me:

Rûpelê herî geş bi grafikên rewşa peywirê û dema pêkanîna wan e:

Em barkirina bin bar dikin
Ji ber vê yekê, hemî peywir bi ser ketin, hûn dikarin birîndaran rakin.

Û gelek birîndar bûn - ji ber sedemek an sedemek din. Di doza karanîna rast a Airflow de, van çargoşeyan destnîşan dikin ku dane bê guman nehatine.
Pêdivî ye ku hûn têketinê temaşe bikin û bûyerên peywirê yên ketî ji nû ve bidin destpêkirin.
Bi tikandina li ser çargoşeyekê, em ê çalakiyên ku ji me re peyda dibin bibînin:

Hûn dikarin bikevin û paqij bikin. Ango, em ji bîr dikin ku tiştek li wir têk çûye, û heman peywira nimûneyê dê biçe ser plansazker.

Eşkere ye ku kirina vê yekê bi mişkê bi hemî çarçikên sor re ne pir mirovahî ye - ev ne ya ku em ji Airflow hêvî dikin e. Bi xwezayî, çekên me yên qirkirina komî hene: Browse/Task Instances

Ka em her tiştî bi yekcarî hilbijêrin û sifirê vegerînin, tiştê rast bikirtînin:

Piştî paqijkirinê, taksiyên me bi vî rengî xuya dikin (ew jixwe li benda plansazker in ku wan bi rê ve bibe):

Girêdan, çeng û guhêrbarên din
Wext e ku meriv li DAG-a din binêre, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Ma her kesî carî nûvekirina raporê kiriye? Ev dîsa ew e: navnîşek jêderan heye ku meriv ji ku derê daneyê bigire; lîsteyek heye ku meriv lê bixin; ji bîr nekin ku gava her tişt qewimî an şikest, xêz bikin (baş, ev ne li ser me ye, na).
Ka em dîsa li pelê bigerin û li tiştên nû yên nezelal binêrin:
from commons.operators import TelegramBotSendMessage- Tiştek me nahêle ku em operatorên xwe çêbikin, ku me jê sûd werdigire bi çêkirina pêçek piçûk ji bo şandina peyaman ji Unblocked re. (Em ê li jêr li ser vê operatorê bêtir biaxivin);default_args={}- dag dikare heman argumanan li hemî operatorên xwe belav bike;to='{{ var.value.all_the_kings_men }}'- zeviyêtoem ê ne kodkirî nebin, lê bi dînamîk bi karanîna Jinja û guhêrbarek bi navnîşek e-nameyê, ku min bi baldarî tê de danîne, têne çêkirin.Admin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- şertê destpêkirina operatorê. Di rewşa me de, nameyê tenê heke hemî girêdan bi ser ketine dê berbi patronan ve biçe bi serkeftî;tg_bot_conn_id='tg_main'- argumentênconn_idNasnameyên pêwendiyê yên ku em tê de diafirînin qebûl bikinAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- peyamên di Telegram de tenê heke peywirên ketî hebin dê bifirin;task_concurrency=1- em destpêkirina hevdemî ya çend nimûneyên peywirê yên yek peywirê qedexe dikin. Wekî din, em ê hemwext destpêkirina çendan bistîninVerticaOperator(li yek maseyê dinêre);report_update >> [email, tg]- hemîVerticaOperatordi şandina name û peyaman de li hev bicivin, bi vî rengî:

Lê ji ber ku operatorên ragihandinê xwedî şert û mercên destpêkirina cihê ne, tenê yek dê bixebite. Di Dîtina Darê de, her tişt hinekî kêmtir dîtbar xuya dike:

Ez ê li ser çend peyvan bibêjim makro û hevalên wan - variables.
Makro cîhên Jinja ne ku dikarin agahdariyên cihêreng ên kêrhatî di nav argumanên operatorê de biguhezînin. Mînakî, bi vî rengî:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} dê berbi naveroka guhêrbara kontekstê ve bibe execution_date di forma YYYY-MM-DD: 2020-07-14. Beşa çêtirîn ev e ku guhêrbarên çarçoveyê li mînakek peywirek taybetî (çargoşeyek di Nîşana Darê de) têne girêdan, û gava ku ji nû ve dest pê bikin, cîhgir dê li heman nirxan berfireh bibin.
Nirxên destnîşankirî dikarin bi karanîna bişkoka Rendered li ser her mînakek peywirê werin dîtin. Karê şandina nameyê bi vî rengî ye:

Û bi vî awayî di karê şandina peyamê de:

Navnîşek bêkêmasî ya makroyên çêkirî yên ji bo guhertoya herî dawî ya berdest li vir heye:
Wekî din, bi alîkariya pêvekan, em dikarin makroyên xwe ragihînin, lê ew çîrokek din e.
Ji bilî tiştên pêşwext, em dikarin nirxên guhêrbarên xwe biguhezînin (min berê di koda jorîn de ev bikar anî). Werin em biafirînin Admin/Variables çend tişt:

Her tiştê ku hûn dikarin bikar bînin:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Nirx dikare pîvanek be, an jî dikare JSON be. Di doza JSON de:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}tenê riya mifteya xwestinê bikar bînin: {{ var.json.bot_config.bot.token }}.
Ez ê bi rastî yek gotinê bibêjim û yek dîmenek li ser nîşan bidim girêdan. Li vir her tişt bingehîn e: li ser rûpelê Admin/Connections em têkiliyek çêdikin, têketin / şîfreyên xwe û pîvanên taybetî li wir zêde dikin. Welî evê:

Şîfre dikarin bêne şîfre kirin (ji ya xwerû bi hûrgulî), an hûn dikarin celebê girêdanê bihêlin (wek ku min ji bo tg_main) - Rastî ev e ku navnîşa celeban di modelên Airflow de hişk e û bêyî ketina kodên çavkaniyê nayê berfireh kirin (heke ji nişka ve min tiştek google nekiriye, ji kerema xwe min rast bike), lê tiştek dê me nehêle ku em tenê bi kredî bistînin. nav.
Di heman demê de hûn dikarin bi heman navî çend pêwendiyan çêbikin: di vê rewşê de, rêbaz BaseHook.get_connection(), ku ji me re girêdanên bi navê, dê bide bêpayîn ji çend navan (dê maqûltir be ku meriv Round Robin çêbike, lê bila em wê li ser wijdana pêşdebirên Airflow bihêlin).
Guherîn û Têkilî bê guman amûrên xweş in, lê girîng e ku hûn hevsengiyê winda nekin: hûn kîjan parçeyên herikên xwe di kodê de hildigirin, û hûn kîjan parçeyan ji bo hilanînê didin Airflow. Ji aliyek ve, ew dikare hêsan be ku meriv zû nirxê biguhezîne, mînakî, qutiyek e-nameyê, bi navgîniya UI-yê. Ji hêla din ve, ev hîn jî vegerek e li klîk mişkê, ya ku me (min) dixwest jê xilas bibe.
Karkirina bi girêdanan yek ji wan karan e hooks. Bi gelemperî, çîpên Airflow xalên girêdana wê bi karûbar û pirtûkxaneyên partiya sêyemîn re ne. Mînak, JiraHook dê ji me re xerîdarek veke ku em bi Jira re têkilî daynin (hûn dikarin peywiran bi paş û paş ve bikin), û bi alîkariya SambaHook hûn dikarin pelek herêmî bişopînin smb-cî.
Parsing operatorê xwerû
Û em nêzîkî dîtina wê yekê bûn ku ew çawa hatî çêkirin TelegramBotSendMessage
code commons/operators.py bi operatorê rastîn:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Li vir, mîna her tiştê din di Airflow de, her tişt pir hêsan e:
- mîrata ji
BaseOperator, ku çend tiştên taybetî yên hewayê bicîh tîne (li dema vala xwe binihêre) - Zeviyên diyar kirin
template_fields, ku tê de Jinja dê li makroyan bigere ku pêvajoyê bike. - Ji bo argumanên rast rêz kirin
__init__(), li cîhê ku pêwîst be pêşandan saz bikin. - Me destpêkirina bav û kalan jî ji bîr nekir.
- Kulika têkildar vekir
TelegramBotHookjê tiştekî muşterek standiye. - Rêbaza sergirtî (ji nû ve pênasekirin).
BaseOperator.execute(), ku Airfow ê gava ku dema destpêkirina operatorê tê biqelişe - di wê de em ê çalakiya sereke bicîh bînin, ji bîr nekin ku têkevinê. (Em bi awayê, rast têkevinstdoutиstderr- Herikîna hewayê dê her tiştî bigire, wê bi xweşikî pêça, li cîhê ku hewce bike wê hilweşîne.)
Ka em bibînin ka çi heye commons/hooks.py. Beşa yekem a pelê, bi çengelê bixwe:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientEz jî nizanim li vir çi şirove bikim, ez ê tenê xalên girîng destnîşan bikim:
- Em mîras digirin, li ser argumanan difikirin - di pir rewşan de ew ê yek be:
conn_id; - Rêbazên standard ên berbiçav: Min xwe bi sînor kir
get_conn(), ku tê de ez pîvanên pêwendiyê bi navê xwe distînim û tenê beşê distînimextra(ev zeviyek JSON e), ku ez tê de (li gorî rêwerzên xwe!) tokena botê ya Telegram danîne:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Ez mînakek me çêdikim
TelegramBot, nîşanek taybetî dide.
Navê pêger. Hûn dikarin xerîdarek ji hook bikar bînin TelegramBotHook().clent an TelegramBotHook().get_conn().
Û beşa duyemîn a pelê, ku tê de ez ji bo Telegram REST API-ya mîkrokûpê çêdikim, da ku heman yekê nekişîne ji bo yek rêbazê sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Awayê rast ev e ku hûn hemî lê zêde bikin:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- di pêvekê de, têxin depoyek giştî, û wê bidin Çavkaniya Vekirî.
Dema ku me van hemîyan dixwend, nûvekirinên raporta me bi serfirazî têk çûn û di kanalê de peyamek xeletiyek ji min re şandin. Ez ê kontrol bikim ka ew xelet e…

Tiştek di dojka me de şikest! Ma ev ne ya ku em li bendê bûn? Tam!
Ma hûn ê birijînin?
Ma hûn hîs dikin ku min tiştek winda kiriye? Wusa dixuye ku wî soz da ku daneyan ji SQL Serverê veguhezîne Vertica, û dûv re wî ew hilda û ji mijarê dûr ket, pîs!
Ev hovîtî bi qestî bû, min bi tenê neçar ma ku ji we re hin termînolojiyê deşîfre bikim. Niha hûn dikarin bêtir biçin.
Plana me ev bû:
- Bikin
- Karûbaran çêbikin
- Binêrin her tişt çiqas xweş e
- Ji bo dagirtin hejmarên danişînê destnîşan bikin
- Daneyên ji SQL Server bistînin
- Daneyên xwe bixin nav Vertica
- Statîstîkan berhev bikin
Ji ber vê yekê, ji bo ku ev hemî rabe û bixebite, min pêvekek piçûk ji me re çêkir docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyLi wir em bilind dikin:
- Vertica wekî mêvandar
dwhbi mîhengên herî xwerû, - sê mînakên SQL Server,
- em databasên di paşîn de bi hin daneyan tijî dikin (di tu rewşê de nenêrin
mssql_init.py!)
Em bi alîkariya fermanek hinekî tevlihevtir ji ya berê dest pê dikin:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Tiştê ku randomîzatora meya mûcîze çêkiriye, hûn dikarin tiştê bikar bînin Data Profiling/Ad Hoc Query:

Ya sereke ne ew e ku meriv wê ji analîstan re nîşan bide
berfireh kirin danişînên ETL Ez naxwazim, her tişt li wê derê piçûk e: em bingehek çêdikin, tê de nîşanek heye, em her tiştî bi rêveberek çarçovê re dipêçin, û naha em vê yekê dikin:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passDem hatiye daneyên me berhev bikin ji sed û nîv maseyên me. Ka em vê yekê bi alîkariya rêzikên pir bêkêmasî bikin:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Bi alîkariya çengek em ji Airflow distînin
pymssql-bihevgirêdan - Werin em sînorkirinek di forma tarîxê de li daxwazê biguhezînin - ew ê ji hêla motora şablonê ve were avêtin nav fonksiyonê.
- Daxwaza me têr kirin
pandaskî dê me bigireDataFrame- Di pêşerojê de dê ji me re bikêr be.
Ez cîgir bikar tînim
{dt}li şûna parametreyek daxwazê%sne ji ber ku ez Pînokyoyekî xerab im, lê ji ber kupandasnikare bi dest bixepymssqlû ya dawî diqulipîneparams: Listtevî ku ew bi rastî dixwazetuple.
Her weha bala xwe bidin ku pêşdebirpymssqlbiryar da ku êdî piştgirî nede wî, û dem dema derketina derve yepyodbc.
Ka em bibînin ka Airflow argumanên fonksiyonên me bi çi dagirtî:

Ger dane tune be, wê demê tu wateya berdewamkirinê tune. Lê di heman demê de ecêb e ku meriv dagirtina serketî were hesibandin. Lê ev ne xeletiyek e. A-ah-ah, çi bikim?! Û li vir çi ye:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException dê ji Airflow re bêje ku xeletî tune, lê em ji peywirê derdixin. Di navberê de dê çargoşeyek kesk an sor nebe, lê pembe.
Ka em daneyên xwe biavêjin stûnên piralî:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Namely:
- Databasa ku me ferman jê girt,
- Nasnameya rûniştina meya lehiyê (ew ê cûda be ji bo her karekî),
- Hashek ji çavkaniyê û Nasnameya fermanê - da ku di databasa paşîn de (ku her tişt li yek tabloyê tê rijandin) me nasnameyek fermanê ya bêhempa heye.
Pêngava paşîn dimîne: her tiştî biavêjin Vertica. Û, pir ecêb e, yek ji awayên herî balkêş û bikêrhatî ji bo kirina vê yekê bi riya CSV e!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Em wergirê taybet çêdikin
StringIO. pandasdê bi dilovanî me deyninDataFramedi formaCSV- xetên.- Werin em bi çengelê pêwendiyek bi Vertica-ya xweya bijare vekin.
- Û niha bi alîkariyê
copy()daneyên me rasterast ji Vertika re bişînin!
Em ê ji ajokerê bigirin ka çend xet tije bûne, û ji rêveberê rûniştinê re bêjin ku her tişt baş e:
session.loaded_rows = cursor.rowcount
session.successful = TrueNavê pêger.
Di firotanê de, em plakaya armancê bi destan diafirînin. Li vir min destûr da xwe makîneyek piçûk:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Ez bi kar tînim
VerticaOperator()Ez nexşeyek databas û tabloyek diafirînim (heke ew jixwe nebin, bê guman). Ya sereke ev e ku meriv pêwendiyan bi rêkûpêk saz bike:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadHilberîn
- Belê, - mişkê biçûk got, - ma ne niha
Ma hûn pê bawer in ku ez heywanê herî xedar ê daristanê me?
Julia Donaldson, The Gruffalo
Ez difikirim ku heke min û hevkarên min pêşbaziyek hebûya: kî dê zû pêvajoyek ETL ji sifrê biafirîne û bide destpêkirin: ew bi SSIS û mişkek xwe û ez bi Airflow re ... Û wê hingê em ê hêsaniya lênihêrînê jî bidin ber hev ... Wow, ez difikirim ku hûn ê bipejirînin ku ez ê wan li hemî eniyan bişkînim!
Ger hinekî ciddîtir, wê hingê Apache Airflow - bi danasîna pêvajoyên di forma koda bernameyê de - karê min kir pir rehettir û xweştir.
Berfirehbûna wê ya bêsînor, hem di warê pêvekan de û hem jî ji hêla pîvanbûnê ve, fersendê dide we ku hûn hema hema li her deverê Airflow bikar bînin: tewra di çerxa tevahî ya berhevkirin, amadekirin û hilberandina daneyan de, tewra di avêtina rokêtan de jî (ji bo Marsê, ji kûrs).
Beşa dawî, referans û agahdarî
Raqeya ku me ji we re berhev kiriye
start_date. Erê, ev jixwe memeyek herêmî ye. Bi riya argumana sereke ya Dougstart_datehemû derbas dibin. Bi kurtasî, heke hûn tê de diyar bikinstart_dateroja niha, ûschedule_interval- Rojekê, DAG wê sibe ne zûtir dest pê bike.start_date = datetime(2020, 7, 7, 0, 1, 2)Û bêtir pirsgirêk tune.
Çewtiyek din a dema xebitandinê bi wê re têkildar heye:
Task is missing the start_date parameter, ku pir caran destnîşan dike ku we ji bîr kir ku hûn bi operatorê dag ve girêdin.- Hemî li ser yek makîneyê. Erê, û bingeh (Airflow bixwe û pêlava me), û serverek malperê, û plansazker, û karker. Û ew jî xebitî. Lê bi demê re, hejmara peywirên ji bo karûbaran mezin bû, û gava PostgreSQL dest pê kir ku di 20 deqeyan de li şûna 5 ms bersivê bide navnîşê, me ew girt û hilda.
- LocalExecutor. Erê, em hê jî li ser rûniştin, û em berê xwe dane ber qurmê. LocalExecutor heta niha têra me kiriye, lê niha dem hatiye ku em bi kêmanî yek xebatkarê xwe berfireh bikin, û ji bo ku em biçin CeleryExecutor em ê pir bixebitin. Û ji ber vê yekê ku hûn dikarin bi wê re li ser yek makîneyê bixebitin, tiştek we nahêle ku Celery tewra li ser serverek bikar bînin, ku "bê guman, dê çu carî nekeve hilberînê, bi rastî!"
- Ne-bikaranîna amûrên çêkirî:
- Girêdanên ji bo hilanîna pêbaweriyên karûbarê,
- SLA Misses ji bo bersivdana karên ku di wextê xwe de nexebitin,
- xcom ji bo danûstendina metadata (min got metadata!) di navbera karên dag.
- Mail abuse. Baş e, ez çi bibêjim? Ji bo hemî dubarekirina karên ketî hişyarî hatin danîn. Naha xebata min a Gmail ji Airflow zêdetirî 90 hezar e-nameyên e-nameyê hene, û mêla e-nameyên webê red dike ku di carekê de zêdetirî 100 e-nameyê hilde û jê bibe.
Zehfên bêtir:
Zêdetir amûrên otomatîkê
Ji bo ku em hê bêtir bi serê xwe ne bi destê xwe bixebitin, Airflow ji me re ev amade kiriye:
- - Hîn jî statûya wî ya Ezmûnî heye, ku nahêle ku wî bixebite. Bi wê re, hûn ne tenê dikarin di derbarê dag û peywiran de agahdarî bistînin, lê di heman demê de dagek rawestînin/dest pê bikin, DAG Run an hewzek biafirînin.
- - Gelek amûr bi navgîniya rêzika fermanê ve têne peyda kirin ku ne tenê ji karanîna WebUI-yê nerehet in, lê bi gelemperî tune ne. Bo nimûne:
backfillji bo ji nû ve destpêkirina mînakên peywirê hewce ye.
Mînak analîst hatin û gotin: “Û hevalno, di daneyên 1-13ê Çileyê de tu bêwateyî heye! Rast bikin, rast bikin, rast bikin, rast bikin!" Û tu hobekî wisa yî:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Xizmeta bingehîn:
initdb,resetdb,upgradedb,checkdb. run, ku destûrê dide te ku hûn karekî mînakek bimeşînin, û tewra li ser hemî girêdanan jî bixin. Wekî din, hûn dikarin wê bi rê ve bibinLocalExecutor, hetta ku we komikek Celery heye.- Hema hema heman tiştî dike
test, tenê di binkeyan de jî tiştek nanivîse. connectionsdestûrê dide afirandina girseya girêdanên ji şêlê.
- - rêgezek têkiliyek zehf hişk, ku ji bo pêvekan tête armanc kirin, û ne bi destên piçûk di nav wê de diheje. Lê kî wê rê li me bigire ku em biçin
/home/airflow/dags, bireveipythonû dest bi tevliheviyê bikin? Mînakî, hûn dikarin hemî girêdan bi koda jêrîn derxînin:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Girêdana bi metadanûsa Airflow. Ez nivîsandina wê pêşniyar nakim, lê girtina dewletên peywirê ji bo metrîkên cihêreng ên taybetî dikare ji her yek ji API-yê zûtir û hêsantir be.
Em bêjin ku ne hemî peywirên me bêhêz in, lê carinan dikarin bikevin, û ev normal e. Lê çend astengî jixwe gumanbar in, û pêdivî ye ku were kontrol kirin.
Hay ji SQL hebin!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
references
Û bê guman, deh girêdanên yekem ên ji weşandina Google-ê naveroka peldanka Airflow ji nîşangirên min in.
- - Helbet, divê em ji ofîsê dest pê bikin. belgekirin, lê kî talîmatan dixwîne?
- - Belê, bi kêmanî pêşniyarên ji afirîneran bixwînin.
- - destpêka destpêkê: navgîniya bikarhêner di wêneyan de
- - têgehên bingehîn baş hatine vegotin, heke (ji nişka ve!) we tiştek ji min fêm nekir.
- - rêbernameyek kurt ji bo sazkirina komek hewayê.
- - Hema hema heman gotara balkêş, ji bilî belkî bêtir formalîzmê, û kêmtir mînakan.
- - di derbarê xebata digel Celery de.
- - li ser bêhêziya peywiran, barkirina bi nasnameyê li şûna tarîx, veguherîn, avahiya pelê û tiştên din ên balkêş.
- - girêdayîbûna peywiran û Rêbaza Trigger, ya ku min tenê di derbasbûnê de behs kir.
- - meriv çawa di nexşerê de hin "xebatên wekî ku tê xwestin" bi ser keve, daneyên winda bar bike û karan pêşî lê bigire.
- - Pirsên SQL yên kêrhatî ji metadata Airflow re.
- - di derbarê afirandina senzorek xwerû de beşek kêrhatî heye.
- - Nîşanek kurt a balkêş di derbarê avakirina binesaziyek li ser AWS ji bo Zanistiya Daneyê.
- - Çewtiyên hevpar (gava ku kesek hîn rêwerzan nexwendiye).
- - Bişirîn ka mirov çawa şîfreyan hildigire, her çend hûn tenê dikarin Têkiliyan bikar bînin.
- - şandina nepenî ya DAG, avêtina çarçoweyê di fonksiyonan de, dîsa li ser girêdayiyan, û her weha der barê destpêkirina peywirê de jî derbas kirin.
- - li ser bikaranîna
default argumentsиparamsdi şablonan de, her weha Guherbar û Girêdan. - - Çîrokek li ser ka plansaz çawa ji bo Airflow 2.0 amade dike.
- - gotarek hinekî kevnar di derbarê bicîhkirina koma me de
docker-compose. - - Karên dînamîkî bi karanîna şablon û şandina çarçovê.
- - Agahiyên standard û xwerû bi nameyê û Slack.
- - Karên şaxkirinê, makro û XCom.
Û girêdanên ku di gotarê de têne bikaranîn:
- - cîhgiran ji bo karanîna di şablonan de peyda dibin.
- - Çewtiyên hevpar ên dema çêkirina dakêşan.
- -
docker-composeji bo ceribandin, debugging û bêtir. - - Ji bo Telegram REST API-ê pêça Python.
Source: www.habr.com




