Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.
I'm going to tell you about a wonderful tool for developing ETL processes - Apache Airflow. But Airflow is so versatile and multifaceted that it deserves a closer look even if you are not involved in data flows at all, but need to launch any kind of process periodically and monitor its execution.
And yes, I will not only tell but also show: the article has a lot of code, screenshots and recommendations.

What you usually see when you google the word Airflow / Wikimedia Commons
Introduction
Apache Airflow is just like Django:
- written in Python,
- has a great admin panel,
- infinitely extensible,
- only better, and built for entirely different purposes, namely (as written before the cut):
- running and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience will allow),
- with dynamic workflow generation from Python code that is very easy to write and understand,
- and the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which is extremely simple).
We use Apache Airflow like this:
- we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
- as an advanced cron, which starts the data consolidation processes on the ODS and also monitors their maintenance.
Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this runs:
- more than 200 DAGs (the actual workflows, which we stuff with tasks),
- each with 70 tasks on average,
- and all this goodness starts (also on average) once an hour.
How we expanded I will describe below, but for now let's define the über-problem we are going to solve:
There are three source SQL Servers, each with 50 databases - instances of one project. Accordingly, they have the same structure (almost everywhere, mua-ha-ha), which means each of them has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively throw it into, say, Vertica.
Let's go!
Main part, practical (and a little theoretical)
Why we (and you) need it
When the trees were tall and I was a simple SQL guy at a Russian retail company, we cobbled together ETL processes, aka data flows, using the two tools available to us:
- Informatica Power Center - an extremely widespread system, extremely productive, with its own hardware and its own versioning. I used, God help me, 1% of its capabilities. Why? Well, first of all, that interface, from somewhere in the 2000s, weighed on us mentally. Secondly, this contraption is designed for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. As for the fact that it costs about as much as the wing of an Airbus A380 per year, we will say nothing.
Careful, the screenshot may slightly hurt people under 30

- SQL Server Integration Services - we used this comrade in our intra-project flows. Well, really: we were already using SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything about it is good: the interface is pretty, and so are the progress reports... But that is not why we love software products, oh no, not for that. We can version its dtsx files (which are XML with the nodes shuffled on every save), but what's the point? How about building a package of tasks that drags hundreds of tables from one server to another? Never mind a hundred, your index finger will fall off after twenty from clicking the mouse button. But it certainly looks more fashionable:
We certainly looked for ways out. It even almost came to a self-written SSIS package generator...
...and then a new job found me. And Apache Airflow caught up with me there.
When I found out that ETL process descriptions are plain Python code, I all but danced for joy. That is how data flows became versioned and diffable, and loading tables with a single structure from hundreds of databases into one target became a matter of Python code that fits on one and a half or two 13" screens.
Assembling the cluster
Let's not turn this into a complete kindergarten and talk about perfectly obvious things such as installing Airflow, your database of choice, Celery and other matters described in the docs.
So that we can start experimenting right away, I sketched a docker-compose.yml in which:
- we actually bring up Airflow: Scheduler and Webserver. Flower will also be spinning there to monitor Celery tasks (because it has already been pushed into apache/airflow:1.10.10-python3.7, and we don't mind);
- PostgreSQL, in which Airflow will write its service information (scheduler data, execution statistics, etc.) and Celery will mark completed tasks;
- Redis, which will act as the task broker for Celery;
- a Celery worker, which will do the actual execution of tasks;
- into the ./dags folder we will put our files with the DAG descriptions. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.
In some places the code in the examples is not shown in full (so as not to clutter the text), and in some places it is modified along the way. Complete working code examples can be found in the repository.
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt
services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine
  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - ./db:/var/lib/postgresql/data
  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base
    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
    depends_on:
      - airflow-db
      - broker
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint initdb &&
      (/entrypoint webserver &) &&
      (/entrypoint flower &) &&
      /entrypoint scheduler"
    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080
  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base
    environment:
      <<: *airflow-config
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint worker"
    depends_on:
      - airflow
      - airflow-db
      - broker
Notes:
- In assembling the composition I relied heavily on a well-known image - be sure to check it out. You may not need anything else in your life.
- All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I maliciously took advantage of.
- Naturally, this is not production-ready: I deliberately did not put health checks on the containers and did not bother with security. But I did the minimum that suits our experiments.
- Note that:
  - The DAG folder must be accessible to both the scheduler and the workers.
  - The same applies to all third-party libraries - they must all be installed on the machines running the scheduler and the workers.
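The environment variables in the compose file follow Airflow's `AIRFLOW__{SECTION}__{KEY}` naming scheme, where the section and key are the ones used in airflow.cfg. A minimal sketch of that mapping; the helper name `airflow_env_name` is invented for illustration and is not part of Airflow:

```python
def airflow_env_name(section: str, key: str) -> str:
    """Build the environment variable name that overrides `key` in `[section]` of airflow.cfg."""
    # Double underscores separate the fixed prefix, the section and the key.
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


if __name__ == "__main__":
    print(airflow_env_name("core", "executor"))
    print(airflow_env_name("celery", "broker_url"))
```

Reading the compose file with this in mind, `AIRFLOW__CORE__EXECUTOR` is simply the `executor` option of the `[core]` section.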
Well, now it's simple:
$ docker-compose up --scale worker=3
After everything comes up, you can look at the web interfaces:
- Airflow (published on port 8080 in the compose file above)
- Flower (published on port 5555)
Basic concepts
If you did not understand anything in all these "dags", here is a short dictionary:
- Scheduler - the most important guy in Airflow, who makes sure that robots do the hard work rather than a person: it watches the schedule, updates DAGs, launches tasks.
  In older versions it generally had memory problems (no, not amnesia, leaks), and a legacy parameter even remained in the configs, run_duration - its restart interval. But now everything is fine.
- DAG (aka "dag") - a "directed acyclic graph". Such a definition tells few people anything; in practice it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.
  Besides DAGs there may also be subdags, but we most likely won't get to them.
- DAG Run - an initialized DAG that is assigned its own execution_date. DAG runs of the same DAG can work in parallel (if you made your tasks idempotent, of course).
- Operator - a piece of code responsible for performing a specific action. There are three types of operators:
  - action, like our favorite PythonOperator, which can execute any (valid) Python code;
  - transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
  - sensor, which lets you react to an event or hold back further execution of the DAG until the event occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, start the GoogleCloudStorageToS3Operator transfer. An inquisitive mind will ask: "Why? After all, you can do the retries right in the operator!" The answer: so as not to clog the task pool with suspended operators. The sensor starts, checks, and dies until the next attempt.
- Task - declared operators, regardless of type, are promoted to the rank of task once they are attached to a DAG.
- Task instance - when the general-scheduler decides it is time to send tasks into battle on the worker-performers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e., a set of variables - the execution parameters), expands the command or query templates, and puts them into a pool.
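To make "directed acyclic graph" a little more tangible, here is a minimal sketch, in plain Python and without Airflow, of the property a scheduler relies on: the tasks can always be put in an order where each one comes after everything upstream of it, and a cycle makes that impossible. The function name and the task names are made up for illustration; this is not how Airflow's scheduler is implemented.

```python
from collections import deque
from typing import Dict, List


def execution_order(downstream: Dict[str, List[str]]) -> List[str]:
    """Return tasks in an order where every task comes after its upstream tasks."""
    # Count the unfinished upstream tasks (incoming edges) of every task.
    pending = {task: 0 for task in downstream}
    for children in downstream.values():
        for child in children:
            pending[child] = pending.get(child, 0) + 1

    # Tasks with nothing upstream can start immediately.
    ready = deque(sorted(task for task, count in pending.items() if count == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in downstream.get(task, []):
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)

    # Anything left unscheduled sits on a cycle, so the graph is not a DAG.
    if len(order) != len(pending):
        raise ValueError("cycle detected: this graph is not a DAG")
    return order


if __name__ == "__main__":
    graph = {"extract": ["transform"], "transform": ["load", "notify"],
             "load": [], "notify": []}
    print(execution_order(graph))
```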
Generating tasks
First, let's outline the general scheme of our DAG, and then we will dive deeper and deeper into the details, because we apply some non-trivial solutions.
So, in its simplest form, such a DAG will look like this:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
Let's go through it:
- First, we import the necessary libs and something else;
- sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table;
- dag is the declaration of our DAG, which must end up in globals(), otherwise Airflow will not find it. The DAG also needs to be told:
  - what its name is, orders - this name will then appear in the web interface,
  - that it will work starting from midnight on the eighth of July,
  - and that it should run roughly every 6 hours (tough guys may use a cron line such as 0 */6 * * * here instead of timedelta(); the less cool, an expression like @daily);
- workflow() will do the main job, but not now. For now we just dump our context into the log.
- And now the simple magic of creating tasks:
  - we run through our sources;
  - we initialize a PythonOperator that will execute our dummy workflow(). Don't forget to give the task a unique (within the DAG) name and to attach the DAG itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect with **context.
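The article does not show `commons/datasources.py`, so the following is only a guess at its shape: a list of named tuples, one per (connection, database) pair. The connection ids `mssql_0`..`mssql_2` and the database names `db_<server>_<n>` are assumptions made up for this sketch.

```python
from collections import namedtuple
from typing import List

# Hypothetical shape of one source: an Airflow connection id plus a database name.
DataSource = namedtuple("DataSource", "conn_id schema")


def build_sources(servers: int = 3, databases_per_server: int = 50) -> List[DataSource]:
    """Enumerate every (connection, database) pair the DAG should create a task for."""
    return [
        DataSource(conn_id=f"mssql_{server}", schema=f"db_{server}_{db:02d}")
        for server in range(servers)
        for db in range(databases_per_server)
    ]


sql_server_ds = build_sources()

if __name__ == "__main__":
    print(len(sql_server_ds))
    print(sql_server_ds[0])
```

Because the DAG uses `schema` as the `task_id`, the database names have to be unique across all servers; in this sketch the server number is baked into the name for that reason.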
That's all for now. What we got:
- a new DAG in the web interface,
- one and a half hundred tasks that will be executed in parallel (if the Airflow and Celery settings and the server capacity allow it).
Well, almost got.

Who will install the dependencies?
To simplify all this, I wired the processing of requirements.txt on all nodes into docker-compose.yml.
Now it's off and running:

The gray squares are task instances processed by the scheduler.
We wait a little, and the workers snap up the tasks:

The green ones, of course, have completed their work successfully. The red ones, not so successfully.
By the way, on our prod there is no ./dags folder synchronized between machines - all DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.
A little about Flower
While the workers are chewing on our dummies, let's remember another tool that can show us something - Flower.
The very first page, with summary information on the worker nodes:

The most intense page, with the tasks that went to work:

The most boring page, with the status of our broker:

The brightest page, with graphs of task statuses and their execution times:

Reloading what failed to load
So, all the tasks have run, and you can carry off the wounded.

And there were many wounded - for one reason or another. When Airflow is used correctly, these very squares indicate that the data definitely did not arrive.
You need to look at the log and restart the fallen task instances.
By clicking on any square, we see the actions available to us:

You can take the fallen and Clear them. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Clearly, doing this with the mouse for every red square is not very humane - that is not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

Let's select everything at once and reset it to zero by clicking the right item:

After cleaning, our tasks look like this (they are already waiting for the scheduler to schedule them):

Connections, hooks and other variables
It's time to look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
    Natasha, wake up, we dropped {{ dag.dag_id }}
    """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]
    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)
    report_update >> [email, tg]
Has everyone done a report update at some point? Here it is again: there is a list of sources to take the data from; there is a list of places to put it; and don't forget to honk when everything has happened or broken (well, that's not about us, no).
Let's go through the file again and look at the new obscure things:
- from commons.operators import TelegramBotSendMessage - nothing prevents us from making our own operators, which we took advantage of by writing a small wrapper for sending messages to Telegram. (We will talk more about this operator below);
- default_args={} - a DAG can hand the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}' - the to field is not hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put into Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS - the condition for starting the operator. In our case, the letter flies off to the bosses only if all dependencies have completed successfully;
- tg_bot_conn_id='tg_main' - conn_id arguments accept the IDs of connections that we create in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED - messages in Telegram fly off only if there are fallen tasks;
- task_concurrency=1 - we forbid the simultaneous launch of several task instances of one task. Otherwise we would get several VerticaOperator instances running at once (against the same table);
- report_update >> [email, tg] - all the VerticaOperator tasks converge on sending letters and messages, like this:

But since the notification operators have different trigger rules, only one of them will fire. In the Tree View everything looks a little less visual.
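Why only one of the two notifiers fires can be sketched in a few lines. This is a deliberately simplified model of the two trigger rules used above, evaluated once all upstream tasks have finished; Airflow's real evaluation handles more states and more rules. The function name `should_run` is invented for this sketch, while the strings `all_success` and `one_failed` are the values behind `TriggerRule.ALL_SUCCESS` and `TriggerRule.ONE_FAILED`.

```python
from typing import Iterable


def should_run(trigger_rule: str, upstream_states: Iterable[str]) -> bool:
    """Decide whether a task fires, given the final states of its upstream tasks."""
    states = list(upstream_states)
    if trigger_rule == "all_success":
        # The email task: every report update must have succeeded.
        return all(state == "success" for state in states)
    if trigger_rule == "one_failed":
        # The Telegram task: a single failed report update is enough.
        return any(state == "failed" for state in states)
    raise ValueError(f"unsupported trigger rule in this sketch: {trigger_rule}")


if __name__ == "__main__":
    good_night = ["success"] * 5
    bad_night = ["success", "failed", "success", "success", "success"]
    for states in (good_night, bad_night):
        print(should_run("all_success", states), should_run("one_failed", states))
```

For any set of finished upstream states, the two rules are mutually exclusive in this model, which is exactly why the email and the Telegram message never go out together.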

I will say a few words about macros and their friends - variables.
Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), so on restart the placeholders expand to the same values.
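The "same values on restart" property follows from `ds` being derived from the run's `execution_date` rather than from the wall clock. A minimal sketch of that idea, with a plain string replacement standing in for Jinja; the helper `render_ds` is made up for illustration:

```python
from datetime import datetime


def render_ds(template: str, execution_date: datetime) -> str:
    """Expand the {{ ds }} placeholder the way the text describes: YYYY-MM-DD of execution_date."""
    ds = execution_date.strftime("%Y-%m-%d")
    # A plain replace is enough for this sketch; Airflow renders templates with Jinja.
    return template.replace("{{ ds }}", ds)


QUERY = "SELECT id FROM orders.payments WHERE payment_dtm::DATE = '{{ ds }}'::DATE"

# The execution date belongs to the DAG run, so a retry days later renders identically.
run_date = datetime(2020, 7, 14, 6, 0)
first_attempt = render_ds(QUERY, run_date)
retry = render_ds(QUERY, run_date)

if __name__ == "__main__":
    print(first_attempt)
```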
The assigned values can be viewed with the Rendered button on each task instance. Here is how it looks for the task that sends the letter:

And here for the task that sends the message:

A complete list of built-in macros for the latest available version can be found in the Macros reference of the Airflow documentation.
Moreover, with the help of plugins we can declare our own macros, but that's another story.
Besides the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of items in Admin/Variables:

That's it, you can use them:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
The value can be a scalar, or it can be JSON. In the case of JSON:
bot_config
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
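What the dotted path does can be sketched with the standard library alone: the variable's text is parsed as JSON and the path is walked key by key. The `lookup` helper is made up for this illustration and is not Airflow's implementation. Note that the token has to be a quoted string, otherwise the value does not parse as JSON at all.

```python
import json
from functools import reduce

# The text stored in the variable, as it would be typed into Admin/Variables.
raw_value = '{"bot": {"token": "881hskdfASDA16641", "name": "Verter"}, "service": "TG"}'


def lookup(document: dict, dotted_path: str):
    """Walk a nested dict along a dotted path such as 'bot.token'."""
    return reduce(lambda node, key: node[key], dotted_path.split("."), document)


bot_config = json.loads(raw_value)

if __name__ == "__main__":
    print(lookup(bot_config, "bot.token"))
    print(lookup(bot_config, "service"))
```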
I will say literally one word and show one screenshot about connections. Everything here is elementary: on the Admin/Connections page we create a connection and add our logins / passwords and more specific parameters there. Like this:

Passwords can be encrypted (more thoroughly than the default), and you can also leave out the connection type (as I did for tg_main) - the fact is that the list of types is hardwired into the Airflow models and cannot be extended without digging into the source code (if I simply failed to google something, please correct me), but nothing stops us from fetching credentials just by name.
You can also create several connections with the same name: in this case the BaseHook.get_connection() method, which gets us connections by name, will return a random one of the several namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
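The difference between the two selection strategies can be sketched without Airflow. The registry below is a plain dict standing in for the connections table, and all names in it are invented for the example; it is not how Airflow stores or picks connections internally.

```python
import itertools
import random
from typing import Callable, Dict, List

# Hypothetical registry: one connection id mapped to several hosts (the "namesakes").
CONNECTIONS: Dict[str, List[str]] = {
    "dwh": ["dwh-node-1", "dwh-node-2", "dwh-node-3"],
}


def pick_random(conn_id: str) -> str:
    """Random choice among namesakes, the behaviour the text describes."""
    return random.choice(CONNECTIONS[conn_id])


def round_robin_picker(conn_id: str) -> Callable[[], str]:
    """Return a function that cycles through the namesakes in order."""
    rotation = itertools.cycle(CONNECTIONS[conn_id])
    return lambda: next(rotation)


if __name__ == "__main__":
    pick = round_robin_picker("dwh")
    print([pick() for _ in range(4)])
    print(pick_random("dwh"))
```

A rotation like this only works inside one process. Workers are separate processes, often on separate machines, so a true Round Robin would need shared state, which may be one reason a random pick is the simpler choice.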
Variables and Connections are certainly cool tools, but it is important not to lose the balance: which parts of your flows you keep in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailing address, through the UI. On the other hand, this is still a return to mouse clicking, which we (I) wanted to get rid of.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are the points where it connects to third-party services and libraries. For example, JiraHook opens a client for us to interact with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb share.
Taking apart the custom operator
And so we have come close to looking at how TelegramBotSendMessage is made.
The code of commons/operators.py with the operator itself:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook
    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)
Here, as with everything else in Airflow, it is all very simple:
- We inherited from BaseOperator, which implements quite a few Airflow-specific things (take a look at your leisure).
- We declared template_fields, the fields in which Jinja will look for macros to process.
- We arranged the right arguments for __init__() and set defaults where needed.
- We didn't forget to initialize the ancestor either.
- We opened the corresponding hook, TelegramBotHook, and got a client object from it.
- We overrode the BaseOperator.execute() method, which Airflow will call when the time comes to run the operator - in it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr - Airflow will intercept everything, wrap it nicely and put it where it belongs.)
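What declaring `template_fields` buys us can be sketched with a stand-in class. Nothing below is Airflow code: `SketchOperator`, `render` and `render_fields` are invented for illustration, and a small regular expression replaces Jinja. The point is only that attributes named in `template_fields` get rendered before `execute()` runs, while the others are left alone.

```python
import re


def render(value: str, context: dict) -> str:
    """Replace {{ name }} placeholders with values from context (a tiny stand-in for Jinja)."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), value)


class SketchOperator:
    # Only attributes listed here are treated as templates.
    template_fields = ["chat_id", "message"]

    def __init__(self, chat_id, message, tg_bot_conn_id="tg_bot_default"):
        self.chat_id = chat_id
        self.message = message
        self.tg_bot_conn_id = tg_bot_conn_id

    def render_fields(self, context: dict) -> None:
        """Render every templated attribute in place, shortly before execution."""
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, str):
                setattr(self, name, render(value, context))


if __name__ == "__main__":
    op = SketchOperator(chat_id="{{ chat }}", message="{{ dag_id }} failed")
    op.render_fields({"chat": "-100123", "dag_id": "update_reports"})
    print(op.chat_id, "|", op.message)
```

This is also why `tg_bot_conn_id` in the real operator cannot contain a macro: it is not listed in `template_fields`.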
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook
    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:
        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)
        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client
I don't even know what to explain here, so I'll just note the important points:
- We inherit and think about the arguments - in most cases there will be just one: conn_id;
- We override the standard methods: I limited myself to get_conn(), in which I fetch the connection parameters by name and take only the extra section (this is a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
- I create an instance of our TelegramBot and give it the specific token.
That's all. You can get the client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().
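The hook leans on the connection's Extra field being valid JSON with a `bot_token` key. A minimal sketch of that parsing step in plain Python, roughly what the `extra_dejson` property provides, plus a clearer error than a bare `KeyError`; both helper names are made up for illustration:

```python
import json


def extra_as_dict(extra) -> dict:
    """Parse a connection's Extra text; empty or invalid input yields an empty dict."""
    if not extra:
        return {}
    try:
        parsed = json.loads(extra)
    except ValueError:
        # json.JSONDecodeError is a subclass of ValueError.
        return {}
    return parsed if isinstance(parsed, dict) else {}


def bot_token_from_extra(extra) -> str:
    """Fetch the bot token, failing with a readable message if it is missing."""
    try:
        return extra_as_dict(extra)["bot_token"]
    except KeyError:
        raise ValueError("connection Extra must contain a 'bot_token' key") from None


if __name__ == "__main__":
    print(bot_token_from_extra('{"bot_token": "YOuRAwEsomeBOtToKen"}'))
```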
And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in a whole library for the sake of one method, sendMessage.
class TelegramBot:
    """Telegram Bot API wrapper
    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'
        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}
        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
The right thing to do is to put it all together - TelegramBotSendMessage, TelegramBotHook, TelegramBot - into a plugin, publish it in a public repository, and give it to Open Source.
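One practical detail of the wrapper above: it sends messages with `parse_mode` set to `MarkdownV2`, and in that mode the Bot API expects a fixed set of punctuation characters to be escaped with a backslash when they are meant literally. A DAG id such as `update_reports` contains an underscore, so an unescaped message can be rejected. The helper below is a sketch that is not part of the author's code; it escapes the characters listed in Telegram's formatting documentation.

```python
import re

# Characters that MarkdownV2 requires to be escaped when used literally.
_SPECIAL_CHARS = r"_*[]()~`>#+-=|{}.!"
_PATTERN = re.compile("([" + re.escape(_SPECIAL_CHARS) + "])")


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 special character with a backslash."""
    return _PATTERN.sub(r"\\\1", text)


if __name__ == "__main__":
    print(escape_markdown_v2("update_reports failed :("))
```

The escaping would be applied to the plain-text parts of a message before sending; text that is meant to be formatted (bold, links) must of course be left as markup.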
While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'm going to check whether that's really so...

Something broke in our DAG! Isn't that what we were expecting? Exactly!
So, are we going to load anything?
Do you feel I skipped something? He seemingly promised to transfer data from SQL Server to Vertica, and then he went and wandered off topic, the scoundrel!
This atrocity was intentional: I simply had to decode some terminology for you first. Now we can move on.
Our plan was this:
- Make a DAG
- Generate tasks
- See how beautiful everything is
- Assign session numbers to the loads
- Get data from SQL Server
- Put data into Vertica
- Collect statistics
So, to get all of this up and running, I made a small addition to our docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024
services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
  mssql_0:
    <<: *mssql-base
  mssql_1:
    <<: *mssql-base
  mssql_2:
    <<: *mssql-base
  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py
There we bring up:
- Vertica as the host dwh with the most default settings,
- three instances of SQL Server,
- and we fill the databases in the latter with some data (under no circumstances look into mssql_init.py!)
We launch all this goodness with a slightly more complicated command than last time:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
What our miracle randomizer generated can be viewed with the Data Profiling/Ad Hoc Query item:

The main thing is not to show it to the analysts
I won't go into detail on ETL sessions, everything there is trivial: we create a database with a table in it, wrap everything in a context manager, and now we do this:
with Session(connection, task_name) as session:
    print('Load', session.id, 'started')
    # Load workflow
    ...
    session.successful = True
    session.loaded_rows = 15
session.py
from sys import stderr

class Session:
    """ETL workflow session
    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """
    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True
        self._task_name = task_name
        self._id = None
        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # any() takes a single iterable, so the three values go in as a tuple
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # Statements such as CREATE TABLE produce no result set
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,
                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,
                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
The time has come to collect our data from our one and a half hundred tables. Let's do it with some very unpretentious lines:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """
df = pd.read_sql_query(query, source_conn)
- Using a hook, we get a pymssql connection from Airflow.
- We substitute a date restriction into the query; the template engine passes it into the function.
- We feed the query to pandas, which returns a DataFrame; it will come in handy later.
I use the substitution {dt} instead of the query parameter %s not because I am an evil Pinocchio, but because pandas cannot cope with pymssql and hands it params: List, although it really wants a tuple.
Also note that the developer of pymssql has decided to stop supporting it, so it is time to move to pyodbc.
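For comparison, here is a minimal sketch of binding the date as a query parameter rather than formatting it into the SQL text. It uses the standard-library sqlite3 driver purely as a stand-in (an assumption, not the MSSQL setup described here), and the orders table with its rows is made up for illustration. sqlite3 uses ? placeholders; other DB-API drivers may use a different placeholder style.

```python
import sqlite3

# In-memory database standing in for the source system (illustrative only)
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (id INTEGER, start_time TEXT, type INTEGER)')
conn.executemany(
    'INSERT INTO orders VALUES (?, ?, ?)',
    [(1, '2020-07-07 10:00:00', 1),
     (2, '2020-07-07 18:30:00', 2),
     (3, '2020-07-08 09:15:00', 1)])

dt = '2020-07-07'  # in the DAG this value would come from the '{{ ds }}' template

# The driver binds the value itself; note that the parameters are a tuple
query = 'SELECT id, type FROM orders WHERE date(start_time) = ? ORDER BY id'
rows = conn.execute(query, (dt,)).fetchall()
print(rows)
```

Binding values this way keeps quoting and escaping in the driver's hands, which matters as soon as the value is not a trusted, templated date.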
Let's see what Airflow put into the arguments of our function:

If there is no data, there is no point in continuing. But it is also odd to count the load as successful. Yet it is not an error either. A-a-ah, what to do?! Here is what:
if df.empty:
    raise AirflowSkipException('No rows to load')
AirflowSkipException tells Airflow that there are no errors, but that we are skipping the task. In the interface the square will be neither green nor red, but pink.
Let's add a few columns to our data:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
Namely:
- the database we took the orders from,
- the ID of our load session (it will be different for each task),
- a hash of the source and the order ID, so that the final database (where everything is poured into one table) has a unique order ID.
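The idea behind hash_id can be sketched without pandas. The helper below is a hypothetical stand-in, not hash_pandas_object: it derives a stable 64-bit integer from the source name and the order ID with hashlib, so the same pair always maps to the same key, while equal IDs from different sources get different keys (barring hash collisions). The schema names are invented.

```python
import hashlib

def make_hash_id(etl_source: str, order_id: int) -> int:
    """Hypothetical helper: stable signed 64-bit key for (source, order id)."""
    payload = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.sha256(payload).digest()
    # Take the first 8 bytes as a signed integer so it fits a 64-bit INT column
    return int.from_bytes(digest[:8], byteorder='big', signed=True)

# The same order ID coming from two different source schemas (made-up names)
a = make_hash_id('schema_a', 42)
b = make_hash_id('schema_b', 42)
print(a, b)
```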
The penultimate step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is through CSV!
import csv
from io import StringIO

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
# Join the column names: interpolating the list itself would put
# Python's "['id', ...]" repr into the SQL
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- We create a special receiver, StringIO.
- pandas kindly puts our DataFrame into it as CSV lines.
- We open a connection to our beloved Vertica with a hook.
- And now, with copy(), we send our data straight to Vertica!
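The buffer technique itself does not depend on pandas or Vertica. Below is a small sketch using only the standard library: rows are written to an in-memory StringIO with the same '|' delimiter and 'NUL' marker, and then read back the way a bulk loader would consume them. The sample rows are invented for illustration.

```python
import csv
from io import StringIO

rows = [(1, 'a|b', None), (2, 'plain', 3.5)]  # invented sample data

buffer = StringIO()
writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                    lineterminator='\n')
for row in rows:
    # Replace missing values with the same NULL marker the COPY statement expects
    writer.writerow(['NUL' if value is None else value for value in row])

buffer.seek(0)  # rewind so the consumer reads from the start
parsed = list(csv.reader(buffer, delimiter='|'))
print(parsed)
```

Note that the field containing the delimiter is quoted on the way out and restored intact on the way in, which is why the COPY statement also has to name the enclosing character.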
We take from the driver how many rows were loaded and tell the session manager that everything is OK:
session.loaded_rows = cursor.rowcount
session.successful = True
That's all.
In production we create the target table manually. Here I allowed myself a little automation:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
        id         INT,
        start_time TIMESTAMP,
        end_time   TIMESTAMP,
        type       INT,
        data       VARCHAR(32),
        etl_source VARCHAR(200),
        etl_id     INT,
        hash_id    INT PRIMARY KEY
    );"""
create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)
I use VerticaOperator() to create the database schema and the table (if they do not exist yet, of course). The main thing is to arrange the dependencies correctly:
for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)
    # Every load task must wait for the target table to exist
    create_table >> load
Summing up
- Well, - said the little mouse, - is it not so: now
you are convinced that I am the most terrible animal in the forest?
Julia Donaldson, The Gruffalo
I think that if my colleagues and I held a competition on who could create and launch an ETL process from scratch faster, they with their SSIS and a mouse and me with Airflow... Wow, I think you would agree that I would beat them on all fronts!
A little more seriously, Apache Airflow, by describing processes as program code, has made my work much more comfortable and enjoyable.
Its unlimited extensibility, both in terms of plug-ins and its predisposition to scaling, lets you use Airflow in almost any area: from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).
Final part, references and information
Pitfalls we have collected for you
- start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start no earlier than tomorrow.
start_date = datetime(2020, 7, 7, 0, 1, 2)
And no more problems.
There is another runtime error related to it: Task is missing the start_date parameter, which most often means that you forgot to bind the operator to the DAG.
- Everything on one machine. Yes, the databases (Airflow's own and our wrapper layer), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL began to answer an index query in 20 s instead of 5 ms, we picked it up and moved it elsewhere.
- LocalExecutor. Yes, we are still sitting on it, and we have already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand by at least one worker, and we will have to work hard to move to CeleryExecutor. And given that you can work with it on a single machine, nothing stops you from using Celery even on a server that "of course will never go into production, honestly!"
- Not using the built-in tools:
- Connections to store service credentials,
- SLA Misses to react to tasks that did not finish on time,
- XCom to exchange metadata (I said metadata!) between the tasks of a DAG.
- Mail abuse. Well, what can I say? Alerts were set up for every retry of a failed task. Now my work Gmail holds more than 90k emails from Airflow, and the web mail interface refuses to select and delete more than 100 at a time.
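The start_date pitfall from the list above comes from the fact that a scheduled run covers an interval and is launched only once that interval has ended. The sketch below models just this rule with plain datetime arithmetic; it is a simplification for illustration, not Airflow's scheduler code, and first_run_time is a hypothetical helper.

```python
from datetime import datetime, timedelta

def first_run_time(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """Hypothetical helper: when the first scheduled run is actually launched.

    The first run covers [start_date, start_date + interval) and is
    triggered only after that interval is over.
    """
    return start_date + schedule_interval

start = datetime(2020, 7, 7, 0, 1, 2)
launch = first_run_time(start, timedelta(days=1))
print(launch)
```

With a start_date of "now" and a daily interval, the first launch therefore lands a full day later, which is why a fixed date in the past is the usual choice.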
More pitfalls:
More automation tools
To let us work even more with our heads rather than our hands, Airflow has prepared the following for us:
- - it still has Experimental status, which does not stop it from working. With it you can not only get information about DAGs and tasks, but also stop or start a DAG, create a DAG Run or a pool.
- - many tools are available through the command line that are not merely inconvenient to use through the WebUI but are absent from it altogether. For example:
- backfill is needed to rerun task instances.
For example, analysts come and say: "Comrade, you have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you just go:
airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- Database maintenance: initdb, resetdb, upgradedb, checkdb.
- run, which lets you run a single task instance and even ignore all of its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
- test does much the same thing, except that it also writes nothing to the databases.
- connections allows mass creation of connections from the shell.
- - a rather hardcore way of interacting, intended for plugins rather than for poking around in it with little hands. But who is going to stop us from going to /home/airflow/dags, running ipython and starting to mess around? You can, for example, export all connections with the following code:
from airflow import settings
from airflow.models import Connection

fields = 'conn_id conn_type host port schema login password extra'.split()

session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)
- Connecting to the Airflow metadatabase. I do not recommend writing to it, but getting task states for various specific metrics can be much faster and easier than through any of the APIs.
Let's say that not all of our tasks are idempotent, so they can fail from time to time, and that is normal. But several failures in a row are already suspicious, and it would be worth checking.
Beware, SQL!
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
            THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE
        state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
    task_id,
    dag_id
HAVING
    count(last_fail_seq) > 0
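For readers who find the window functions hard to follow, the same idea (counting the uninterrupted streak of the most recent unsuccessful runs per task) can be sketched in plain Python. The records below are invented, failure_streaks is a hypothetical helper, and this illustrates the logic rather than replacing a query against the metadatabase.

```python
from itertools import groupby, takewhile

UNSUCCESSFUL = {'failed', 'up_for_retry'}

# (dag_id, task_id, execution_date, state) -- invented sample records
runs = [
    ('orders', 'load', '2020-01-10', 'success'),
    ('orders', 'load', '2020-01-11', 'failed'),
    ('orders', 'load', '2020-01-12', 'up_for_retry'),
    ('orders', 'check', '2020-01-11', 'failed'),
    ('orders', 'check', '2020-01-12', 'success'),
]

def failure_streaks(records):
    """Count the latest uninterrupted run of unsuccessful states per task."""
    key = lambda r: (r[0], r[1])
    streaks = {}
    for task, group in groupby(sorted(records, key=key), key=key):
        newest_first = sorted(group, key=lambda r: r[2], reverse=True)
        streak = list(takewhile(lambda r: r[3] in UNSUCCESSFUL, newest_first))
        if streak:  # mirrors HAVING count(last_fail_seq) > 0
            streaks[task] = len(streak)
    return streaks

result = failure_streaks(runs)
print(result)
```

The check task does not appear in the result because its most recent run succeeded, which is exactly the case the rn comparison filters out in the SQL.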
References
Of course, the first ten links in Google's results are the contents of the Airflow folder in my bookmarks.
- - Of course, we should start with the official documentation, but who reads the instructions?
- - Well, at least read the recommendations of the creators.
- - The very beginning: the user interface in pictures.
- - The basic concepts are well explained, in case (suddenly!) you did not understand something from me.
- - A short guide to setting up an Airflow cluster.
- - Almost the same interesting article, except perhaps with more formalism and fewer examples.
- - About working with Celery.
- - About task idempotency, loading by ID instead of by date, transformation, file structure and other interesting things.
- - Task dependencies and Trigger Rules, which I mentioned only in passing.
- - How to overcome some "works as intended" behavior in the scheduler, load lost data and prioritize tasks.
- - Useful SQL queries against the Airflow metadatabase.
- - There is a useful section on creating a custom sensor.
- - A short, interesting note on building an AWS infrastructure for Data Science.
- - Common mistakes (for when someone still has not read the instructions).
- - Smile at how people bend over backwards to store passwords, even though you can use Connections.
- - Implicit DAG passing, context in functions, dependencies once again, and also skipping task launches.
- - About using default arguments and params in templates, as well as Variables and Connections.
- - A story about how the scheduler is getting ready for Airflow 2.0.
- - A slightly outdated article about deploying a cluster in docker-compose.
- - Dynamic tasks using templates and context passing.
- - Standard and custom notifications to email and Slack.
- - Branching tasks, macros and XCom.
And the links used in the article:
- - Placeholders ready to use in templates.
- - Common mistakes when creating DAGs.
- - docker-compose, testing, debugging and more.
- - A Python wrapper for the Telegram REST API.
Source: www.habr.com




