Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.
I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. But Airflow is so versatile and multifaceted that you should take a closer look at it even if you don't work with data flows but need to launch processes periodically and monitor how they run.
And yes, I won't just tell, I'll also show: the program has lots of code, screenshots and recommendations.

What you usually see when you google the word Airflow / Wikimedia Commons
Introduction
Apache Airflow is like Django:
- written in Python,
- has a great admin panel,
- is infinitely extensible,
- only better, and made for completely different purposes, namely (as stated in the intro):
  - running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience allow),
  - with dynamic workflow generation from Python code that is very easy to write and understand,
  - and the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which is extremely simple).
We use Apache Airflow like this:
- we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and ClickHouse);
- as an advanced cron, which starts the data consolidation processes on the ODS and also monitors their maintenance.
Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. On it, Airflow was running:
- more than 200 dags (actually workflows, into which we stuffed tasks),
- with, on average, 70 tasks in each,
- and this goodness started (also on average) once an hour.
I'll write about how we scaled out below, but for now let's define the über-task we are going to solve:
There are three source SQL Servers, each with 50 databases that are instances of one project. Accordingly, they have the same structure (almost everywhere, mua-ha-ha), which means each of them has an Orders table (fortunately, a table with that name fits into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively dump it into, say, Vertica.
Let's go!
Part one: practical (and a little theoretical)
Why we (and you)
When the trees were big and I was a simple SQL guy in Russian retail, we hacked together ETL processes, a.k.a. data flows, using the two tools available to us:
- Informatica PowerCenter: an extremely sprawling, extremely productive system with its own hardware and its own versioning. I used maybe 1% of its capabilities. Why? Well, first, its interface, straight out of the 2000s, put mental pressure on us. Second, this contraption is designed for extremely fancy processes, aggressive component reuse and other very important enterprise tricks. And we won't even mention that a year of it costs about as much as the wing of an Airbus A380.
Careful, the screenshot may slightly hurt people under 30

- SQL Server Integration Services: we used this comrade in our intra-project flows. Well, honestly, we were already using SQL Server, and it would have been somehow unreasonable not to use its ETL tools. Everything in it is good: the interface is beautiful, and so are the progress reports... But that's not why we love software products, oh, not for that. We could version its dtsx files (which are XML with nodes shuffled on every save), but what's the point? How about making a task package that drags a hundred tables from one server to another? A hundred, indeed: after twenty of them, your index finger would fall off from clicking the mouse button. But it definitely looks more fashionable:
We certainly looked for ways out. It even almost came to a home-made SSIS package generator...
...and then a new job found me. And Apache Airflow caught up with me there.
When I found out that ETL process descriptions are plain Python code, I all but danced for joy. This is how data flows became versioned and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of one and a half to two 13-inch screens of Python code.
Assembling the cluster
Let's not turn this into a kindergarten and talk about completely obvious things here, like installing Airflow, your chosen database, Celery and the other matters described in the docs.
So that we can start experimenting right away, I sketched a docker-compose.yml in which:
- We bring up Airflow itself: Scheduler and Webserver. Flower will also run there to monitor Celery tasks (because it is already baked into apache/airflow:1.10.10-python3.7, and we don't mind);
- PostgreSQL, into which Airflow will write its service information (scheduler data, execution statistics, etc.), and where Celery will mark completed tasks;
- Redis, which will act as the task broker for Celery;
- Celery worker, which will do the actual execution of tasks.
- Into the ./dags folder we'll put our files with dag descriptions. They are picked up on the fly, so there's no need to restart the whole stack after every sneeze.
In some places, the code in the examples is not shown in full (so as not to clutter the text), and in places it gets modified along the way. Complete working code examples can be found in the repository.
docker-compose.yml
version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base
    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
    depends_on:
      - airflow-db
      - broker
    command: >
      -c "sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint initdb &&
      (/entrypoint webserver &) &&
      (/entrypoint flower &) &&
      /entrypoint scheduler"
    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base
    environment:
      <<: *airflow-config
    command: >
      -c "sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint worker"
    depends_on:
      - airflow
      - airflow-db
      - broker
Notes:
- When assembling the compose file, I relied heavily on a well-known community image; be sure to check it out. Maybe you don't need anything else in your life.
- All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I maliciously took advantage of.
- Naturally, it's not production-ready: I deliberately didn't add health checks to the containers and didn't bother with security. But I did the minimum that suits our experiments.
- Note that:
  - The dags folder must be accessible to both the scheduler and the workers.
  - The same applies to all third-party libraries: they must be installed on every machine running the scheduler or a worker.
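Every setting in the compose file above is passed as an environment variable, and they all follow the same naming convention: AIRFLOW__{SECTION}__{KEY}, with double underscores as separators. Here is a minimal sketch of that convention. The helper name airflow_env_overrides is hypothetical and this is not Airflow's own parser. It only shows how a variable such as AIRFLOW__CELERY__BROKER_URL maps to the broker_url key of the [celery] section in airflow.cfg.

```python
from typing import Dict, Mapping, Tuple

# Every override starts with this prefix; section and key are separated by '__'.
PREFIX = 'AIRFLOW__'


def airflow_env_overrides(environ: Mapping[str, str]) -> Dict[Tuple[str, str], str]:
    """Map AIRFLOW__{SECTION}__{KEY} variables to (section, key) pairs.

    Names are lower-cased to match how sections and keys appear in airflow.cfg.
    """
    overrides: Dict[Tuple[str, str], str] = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue
        section, sep, key = name[len(PREFIX):].partition('__')
        if not (sep and section and key):
            continue  # not a well-formed override, so ignore it
        overrides[(section.lower(), key.lower())] = value
    return overrides
```

Pass it os.environ inside a container to see which settings the compose file actually overrides.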
Well, now it's simple:
$ docker-compose up --scale worker=3
Once everything is up, you can look at the web interfaces:
- Airflow, on port 8080;
- Flower, on port 5555.
Basic concepts
If you don't understand anything in all these "dags", here is a short glossary:
- Scheduler: the most important uncle in Airflow, who makes sure that robots, not people, do the hard work. He watches the schedule, updates dags and launches tasks.
In general, older versions had memory problems (no, not amnesia, but leaks), and the configs even kept the legacy parameter run_duration, his restart interval. But now everything is fine.
- DAG (aka "dag"): a "directed acyclic graph", but that definition tells few people much. In practice, it is a container for tasks that interact with each other (see below), an analogue of a Package in SSIS or a Workflow in Informatica.
In addition to dags, there can also be subdags, but we most likely won't get to them.
- DAG Run: an initialized dag that is assigned its own execution_date. Dag runs of the same dag can run in parallel (provided, of course, that you made your tasks idempotent).
- Operator: a piece of code responsible for performing a specific action. There are three types of operators:
  - action, like our favorite PythonOperator, which can execute any (valid) Python code;
  - transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
  - sensor, on the other hand, lets you react to an event or hold back further execution of the dag until the event occurs. HttpSensor can poll a specified endpoint and, once the desired response arrives, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? You could do the retries right in the operator!" The answer is to avoid clogging the task pool with suspended operators: the sensor starts, checks and exits before the next attempt (in reschedule mode).
- Task: declared operators, whatever their type, are promoted to the rank of task once they are attached to a dag.
- Task instance: when the general scheduler decides it's time to send tasks into battle on the worker-performers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), he assigns them a context (i.e., a set of variables: the execution parameters), expands the command or query templates, and puts them into the pool.
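Here is a generic sketch that makes "directed acyclic" a bit less abstract. It uses Kahn's algorithm, not Airflow's internal code, to show what the property buys you. If the dependencies contain no cycle, there is always an order in which every task runs after all of its upstreams. A cycle can also be detected up front. The task names in the usage note are made up.

```python
from collections import deque
from typing import Dict, Iterable, List, Tuple


def topological_order(edges: Iterable[Tuple[str, str]]) -> List[str]:
    """Order tasks so that every (upstream, downstream) edge is respected.

    Raises ValueError if the edges contain a cycle, i.e. the graph is not a DAG.
    """
    downstream: Dict[str, List[str]] = {}
    indegree: Dict[str, int] = {}
    for up, down in edges:
        downstream.setdefault(up, []).append(down)
        downstream.setdefault(down, [])
        indegree.setdefault(up, 0)
        indegree[down] = indegree.get(down, 0) + 1

    # Start with tasks that have no upstreams; sorted for a deterministic order.
    ready = deque(sorted(task for task, degree in indegree.items() if degree == 0))
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for nxt in downstream[task]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(indegree):
        raise ValueError('cycle detected: this is not a DAG')
    return order
```

For the edges create_target to db_a, create_target to db_b, db_a to report and db_b to report, it returns ['create_target', 'db_a', 'db_b', 'report']. For the edges a to b and b to a, it raises ValueError.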
Generating tasks
First, let's outline the general structure of our dag. Then we'll dive deeper and deeper into the details, because we use some non-trivial solutions.
So, in its simplest form, such a dag looks like this:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))


def workflow(**context):
    print(context)


for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
Let's figure it out:
- First, we import the necessary libs and a few other things;
- sql_server_ds is a List[namedtuple[str, str]] holding the names of the connections from Airflow Connections and the databases we'll take our table from;
- dag is the declaration of our dag, which must be in globals(), otherwise Airflow won't find it. The dag also needs to be told:
  - its name, orders, which will then appear in the web interface;
  - that it runs from midnight on July 8th;
  - and that it should run roughly every 6 hours (for the cool kids, a cron line such as 0 */6 * * * works instead of timedelta(); for the less cool, a preset like @daily);
- workflow() will do the main job, but not yet. For now, we'll just dump our context into the log.
- And now the simple magic of creating tasks:
  - we loop over our sources;
  - we initialize a PythonOperator, which will execute our dummy workflow(). Don't forget to give the task a unique name (within the dag) and to bind it to the dag. The provide_context flag, in turn, pours extra arguments into the function, which we carefully collect with **context.
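For reference, here is one hypothetical shape for commons.datasources. It would produce the hundred and fifty tasks mentioned below: three SQL Server connections with fifty databases each. The conn_ids (borrowed from the SQL Server service names in docker-compose.db.yml) and the database naming scheme are my assumptions, not the real module. The important constraint is the last check: each schema becomes a task_id, so the names have to be unique within the dag.

```python
from collections import namedtuple
from typing import List

# Hypothetical stand-in for commons.datasources: conn_id is an Airflow
# Connection name, schema is the database to read the Orders table from.
DataSource = namedtuple('DataSource', 'conn_id schema')

SERVERS = ['mssql_0', 'mssql_1', 'mssql_2']  # assumed conn_ids
DBS_PER_SERVER = 50

sql_server_ds: List[DataSource] = [
    DataSource(conn_id, f'{conn_id}_db_{n:02d}')
    for conn_id in SERVERS
    for n in range(DBS_PER_SERVER)
]

# Each schema is used as a task_id, so it must be unique within the dag.
_task_ids = [schema for _, schema in sql_server_ds]
assert len(_task_ids) == len(set(_task_ids)), 'duplicate task_id'
```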
That's all for now. What we got:
- a new dag in the web interface,
- a hundred and fifty tasks that will run in parallel (if the Airflow and Celery settings and the server capacity allow).
Well, almost got it.

Who is going to install the dependencies?
To simplify this whole thing, I added processing of requirements.txt on all nodes to docker-compose.yml.
Now off we go:

The grey squares are task instances processed by the scheduler.
We wait a bit, and the workers snap up the tasks:

The green ones, of course, finished their work successfully. The red ones were less successful.
By the way, there is no ./dags folder on our prod and no synchronization between machines. All the dags live in git on our GitLab, and GitLab CI distributes updates to the machines on every merge into master.
A little about Flower
While the workers are crunching through our dummy tasks, let's recall another tool that can show us something: Flower.
The very first page, with summary information on the worker nodes:

The most intense page, with the tasks that went to work:

The most boring page, with the status of our broker:

The brightest page, with task status graphs and execution times:

Loading the underloaded
So, all the tasks have run, and we can carry away the wounded.

And there were many wounded, for one reason or another. When Airflow is used properly, these very squares mean that the data definitely did not arrive.
You need to look at the log and restart the fallen task instances.
Clicking on any square shows the actions available to us:

You can take the fallen and Clear them. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Clearly, doing this with the mouse for every red square is not very humane, and it's not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

Let's select everything at once and reset it to zero by clicking the right item:

After cleaning, our tasks look like this (they're already waiting for the scheduler to schedule them):

Connections, hooks and other variables
It's time to look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Gentlemen, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
        Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]
Haven't we all had to update reports at some point? Here it is again: there is a list of sources to take the data from; there is a list of places to put it; and don't forget to honk when everything has happened or broken (well, that's not about us, no).
Let's go through the file again and look at the new, obscure stuff:
- from commons.operators import TelegramBotSendMessage: nothing prevents us from making our own operators, which we took advantage of by writing a small wrapper for sending messages to Telegram. (We'll talk more about this operator below.)
- default_args={}: a dag can distribute the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}': the to field isn't hardcoded but generated dynamically with Jinja from a variable holding a list of emails, which I carefully put into Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the email flies to the bosses only if all dependencies finished successfully;
- tg_bot_conn_id='tg_main': conn_id arguments accept the IDs of connections created in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED: the Telegram message flies off only if there are failed tasks;
- task_concurrency=1: we forbid running several instances of the same task at once (for example, from different dag runs). Otherwise we could get several simultaneous runs of a VerticaOperator working on the same table;
- report_update >> [email, tg]: all the VerticaOperators converge on sending the email and the message, like this:

But since the notifier operators have different trigger conditions, only one of them will run. In the Tree View, everything looks a bit less obvious:

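Why exactly one notifier fires can be shown with a deliberately simplified model of the two trigger rules used here. Airflow's real implementation handles more states and more rules. This sketch assumes that each upstream ends as 'success', 'failed' or 'upstream_failed', and it uses the string values behind TriggerRule.ALL_SUCCESS and TriggerRule.ONE_FAILED.

```python
from typing import Iterable

# Upstream states that count as "failed" for the one_failed rule in this sketch.
FAILED_STATES = {'failed', 'upstream_failed'}


def should_run(trigger_rule: str, upstream_states: Iterable[str]) -> bool:
    """Simplified check of whether a task with this trigger rule may start."""
    states = list(upstream_states)
    if trigger_rule == 'all_success':
        # email_success: every VerticaOperator must have succeeded
        return all(state == 'success' for state in states)
    if trigger_rule == 'one_failed':
        # telegram_fail: at least one upstream has failed
        return any(state in FAILED_STATES for state in states)
    raise ValueError(f'trigger rule not modelled in this sketch: {trigger_rule}')
```

With all five report tasks in 'success', should_run('all_success', ...) is True and should_run('one_failed', ...) is False. Flip one of them to 'failed' and the answers swap.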
A few words about macros and their friends, variables.
Macros are Jinja placeholders that substitute various useful information into operator arguments. For example, like this:
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} expands to the value of the context variable execution_date in YYYY-MM-DD format, e.g. 2020-07-14. The best part is that context variables are pinned to a specific task instance (a square in the Tree View), so when it is restarted, the placeholders expand to the same values.
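Here is a stdlib-only sketch of the two ideas in that paragraph. First, ds is just execution_date formatted as YYYY-MM-DD. Second, because it is derived from the task instance's execution_date rather than from "now", a rerun renders the same text. Airflow renders templates with Jinja; the regex-based render() below is a hypothetical stand-in that only understands plain {{ name }} placeholders.

```python
import re
from datetime import datetime


def ds_context(execution_date: datetime) -> dict:
    """The date macros used here, derived only from execution_date."""
    return {
        'ds': execution_date.strftime('%Y-%m-%d'),
        'ds_nodash': execution_date.strftime('%Y%m%d'),
    }


def render(template: str, context: dict) -> str:
    """Hypothetical stand-in for Jinja: replace each {{ name }} with context[name]."""
    return re.sub(r'\{\{\s*(\w+)\s*\}\}',
                  lambda match: str(context[match.group(1)]),
                  template)


query = "WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
first_run = render(query, ds_context(datetime(2020, 7, 14, 6)))
rerun = render(query, ds_context(datetime(2020, 7, 14, 6)))  # same task instance, same text
```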
The substituted values can be viewed with the Rendered button on each task instance. This is how the email task looks:

And this is the task that sends the message:

The complete list of built-in macros for the latest available version is here:
Moreover, plugins let us declare our own macros, but that's another story.
In addition to the predefined things, we can substitute the values of our own variables (I already did this in the code above). Let's create a couple of entries in Admin/Variables:

Now all of it can be used:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
The value can be a scalar, or it can be JSON. In the case of JSON:
bot_config
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
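JSON variables deserve one caveat: the stored value must be valid JSON, so a token like the one above has to be a quoted string. The hypothetical helper below shows how a dotted path such as bot.token walks into the parsed value. It is a sketch, not Airflow's implementation of var.json.

```python
import json
from typing import Any


def resolve_json_path(raw_value: str, dotted_path: str) -> Any:
    """Walk a dotted path (like bot.token) into a JSON-encoded variable value."""
    node = json.loads(raw_value)
    for key in dotted_path.split('.'):
        node = node[key]
    return node


# The bot_config value, with the token quoted so that it is valid JSON.
bot_config = '''
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
'''
```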
I'll say literally one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and put our logins/passwords and other, more specific parameters there. Like this:

Passwords can be encrypted (more thoroughly than by default). You can also leave out the connection type, as I did for tg_main. The reason is that the list of types is hardwired into the Airflow models and can't be extended without digging into the source code (if I somehow failed to google something, please correct me). But nothing stops us from getting the credentials simply by name.
You can also create several connections with the same name. In this case, BaseHook.get_connection(), which fetches connections by name, returns a random one of the namesakes (Round Robin would be more logical, but let's leave that on the conscience of the Airflow developers).
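The difference between the two behaviours is easy to see in a few lines. The host names are hypothetical. pick_random() mirrors the "any namesake may come back" behaviour described above, and round_robin() is the alternative I would have preferred. Neither function is Airflow code.

```python
import itertools
import random
from typing import Iterator, List


def pick_random(namesakes: List[str]) -> str:
    """Any of the same-named connections may come back on each call."""
    return random.choice(namesakes)


def round_robin(namesakes: List[str]) -> Iterator[str]:
    """Hand out same-named connections strictly in turn."""
    return itertools.cycle(namesakes)


hosts = ['replica-1', 'replica-2']  # hypothetical hosts behind one conn_id
rr = round_robin(hosts)
```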
Variables and Connections are certainly cool tools, but it's important to keep the balance between the parts of your flows you keep in the code itself and the parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailing address, through the UI. On the other hand, this is again a return to the mouse clicks that we (I) wanted to get rid of.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are the points where it connects to third-party services and libraries. For example, JiraHook opens a client for interacting with Jira (you can move tasks back and forth), and SambaHook lets you push a local file to an SMB share.
Dissecting the custom operator
And we've come close to seeing how TelegramBotSendMessage is made.
The code of commons/operators.py with the operator itself:
from typing import Union

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from commons.hooks import TelegramBotHook, TelegramBot


class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    # Fields rendered by Jinja before execute() is called
    template_fields = ['chat_id', 'message']

    # apply_defaults lets the dag's default_args fill these parameters too
    @apply_defaults
    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)
Here, as with everything else in Airflow, everything is very simple:
- Inherit from BaseOperator, which implements quite a few Airflow-specific things (look through it at your leisure).
- Declare template_fields, the fields in which Jinja will look for macros to process.
- Set up the right arguments for __init__(), with defaults where needed.
- Don't forget to initialize the parent either.
- Open the corresponding hook, TelegramBotHook, and get a client object from it.
- Override BaseOperator.execute(), which Airflow calls when the time comes to launch the operator. That's where we implement the main action, not forgetting to log. (We log straight to stdout and stderr, by the way: Airflow intercepts everything, wraps it nicely and files it where it belongs.)
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession


class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client
I don't even know what there is to explain here; I'll just note the important points:
- We inherit and think about the arguments; in most cases there will be just one, conn_id;
- We override standard methods. I limited myself to get_conn(), in which I get the connection parameters by name and take only the extra section (a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
- We create an instance of our TelegramBot, passing it that token.
That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().
And the second part of the file, in which I wrote a micro-wrapper for the Telegram REST API so as not to drag in a whole library for the sake of a single sendMessage method.
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)


class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
The right way is to pack all of this (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, put it in a public repository and release it as Open Source.
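One practical caveat for TelegramBot.send_message(): it posts with parse_mode set to MarkdownV2. As far as I understand the Telegram Bot API documentation, characters such as _, *, (, ), ., ! and - are reserved in that mode and must be escaped with a backslash. Otherwise the API rejects the message, and our wrapper turns that into a TelegramBotException. A dag id like update_reports or a message ending in ":(" already contains such characters. Below is a hypothetical helper that escapes them. It has to be applied to the rendered text, i.e. after Jinja has done its work, for example at the start of execute().

```python
# Characters listed as reserved in MarkdownV2 by the Telegram Bot API docs,
# plus the backslash itself, which is the escape character.
MARKDOWN_V2_RESERVED = frozenset('_*[]()~`>#+-=|{}.!\\')


def escape_markdown_v2(text: str) -> str:
    """Prefix every reserved character with a backslash."""
    return ''.join('\\' + ch if ch in MARKDOWN_V2_RESERVED else ch for ch in text)
```

Plain text passes through unchanged; only the reserved characters gain a backslash.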
While we were studying all this, our report update managed to fail successfully and send me an error message. Let me check whether something is wrong...

Something broke in our dag! Isn't that what we expected? Exactly!
So, are we going to pour some data?
Do you feel I've missed something? He seemed to promise to move data from SQL Server to Vertica, and then he went off topic, the scoundrel!
This atrocity was intentional: I simply had to explain some terminology to you first. Now we can move on.
Our plan was this:
- Make a dag
- Generate tasks
- See how beautiful everything is
- Assign session numbers to loads
- Get data from SQL Server
- Put data into Vertica
- Collect statistics
So, to get all this up and running, I made a small addition to our docker-compose.yml:
docker-compose.db.yml
version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py
There we bring up:
- Vertica as the host dwh, with the most default settings,
- three instances of SQL Server,
- and we fill the databases on the latter with some data (under no circumstances look into mssql_init.py!)
We launch all this goodness with a slightly more complicated command than last time:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
To see what our miracle randomizer generated, you can use Data Profiling/Ad Hoc Query:

The main thing is not to show it to the analysts
I won't dwell on ETL sessions in detail, because everything there is trivial: we create a database with a table in it, wrap everything in a context manager, and now we do this:
with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15
session.py
from sys import stderr


class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # The with-block raised: mark the session as failed and keep the error text
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # Statements without a result set (e.g. CREATE TABLE) have no description
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id SERIAL NOT NULL PRIMARY KEY,
                task_name VARCHAR(200) NOT NULL,
                started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
                finished TIMESTAMPTZ DEFAULT current_timestamp,
                successful BOOL,
                loaded_rows INT,
                comment VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished = DEFAULT,
                successful = %s,
                loaded_rows = %s,
                comment = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)


class SessionError(Exception):
    pass


class SessionClosedError(SessionError):
    pass
The time has come to collect our data from our hundred and fifty tables. Let's do it with these very unpretentious lines:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
- With a hook, we get a pymssql connection from Airflow.
- We substitute a date restriction into the query; the template engine passes the date into the function.
- We feed our query to pandas, which gets us a DataFrame that will come in handy later.
I'm using the {dt} substitution instead of a query parameter %s, not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and passes it params as a List, although it really wants a tuple.
Also note that the pymssql developer decided to stop supporting it, so it's time to move to pyodbc.
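Since {dt} is pasted straight into the SQL text, a cheap guard is to accept only a real ISO date before formatting it in. In our dag the value comes from {{ ds }}, so it is trustworthy in practice; the hypothetical orders_query() below just makes that assumption explicit. date.fromisoformat() requires Python 3.7 or newer, which matches the python3.7 image used here.

```python
from datetime import date


def orders_query(dt: str) -> str:
    """Build the per-day Orders query, refusing anything that is not a date."""
    day = date.fromisoformat(dt)  # raises ValueError for anything that is not an ISO date
    return (
        "SELECT id, start_time, end_time, type, data "
        "FROM dbo.Orders "
        f"WHERE CONVERT(DATE, start_time) = '{day.isoformat()}'"
    )
```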
Let's see which arguments Airflow stuffs into our functions:

If there is no data, there is no point in continuing. But it would also be strange to call the load successful, and it isn't an error either. A-ah-ah, what to do?! And here's what:
if df.empty:
    raise AirflowSkipException('No rows to load')
AirflowSkipException tells Airflow that there are no errors, but the task is skipped. The interface shows neither a green nor a red square, but a pink one.
Let's add several columns to our data:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']], index=False)
Namely:
- the database from which we took the orders,
- the ID of our loading session (different for every task),
- a hash of the source and the order ID, so that in the final database (where everything is poured into one table) we have a unique order ID. index=False keeps the row position out of the hash, so the same order always gets the same value.
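A side note on the hash column. pandas' hash_pandas_object returns unsigned 64-bit values. By default (index=True) it also mixes the row index into the hash, so the same order could get a different value on a different load. The target column is INT, which I believe is a signed 64-bit integer in Vertica, so large unsigned values would not fit. Below is a stdlib sketch of an alternative technique: BLAKE2b truncated to 8 bytes and read as a signed integer. The function name and the separator choice are my assumptions, not what our pipeline uses.

```python
import hashlib


def stable_hash_id(etl_source: str, order_id: int) -> int:
    """Deterministic signed 64-bit id for a (source database, order id) pair."""
    # '\x1f' (unit separator) keeps ('db_1', 23) and ('db_12', 3) apart as text
    key = f'{etl_source}\x1f{order_id}'.encode('utf-8')
    digest = hashlib.blake2b(key, digest_size=8).digest()
    return int.from_bytes(digest, 'big', signed=True)
```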
The penultimate step: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do it is via CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# The column list must be plain comma-separated names, not a Python list repr
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- We create a special receiver, a StringIO.
- pandas kindly puts our DataFrame into it as CSV lines.
- We open a connection to our favorite Vertica with a hook.
- And now, with copy(), we send our data straight to Vertica!
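Here is a stdlib-only sketch of the same CSV shape: pipe-separated, NUL for missing values, backslash as the escape character. It comes together with the matching COPY statement. The column list has to be plain comma-separated names; interpolating a Python list directly would put brackets and quotes into the SQL. Both function names are hypothetical.

```python
import csv
from io import StringIO
from typing import Iterable, Optional, Sequence


def to_copy_csv(rows: Iterable[Sequence[Optional[object]]]) -> StringIO:
    """Write rows as '|'-separated CSV, with None written as the NUL marker."""
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                        doublequote=False, escapechar='\\', lineterminator='\n')
    for row in rows:
        writer.writerow(['NUL' if value is None else value for value in row])
    buffer.seek(0)  # rewind so the driver reads from the beginning
    return buffer


def copy_statement(table: str, columns: Sequence[str]) -> str:
    """COPY statement matching to_copy_csv(); columns as plain comma-separated names."""
    return (f"COPY {table}({', '.join(columns)}) FROM STDIN "
            "DELIMITER '|' ENCLOSED '\"' ABORT ON ERROR NULL 'NUL'")
```

A value containing the delimiter, such as 'a|b', comes out enclosed in double quotes, which is what ENCLOSED '"' tells Vertica to expect.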
We take the number of loaded rows from the driver and tell the session manager that everything is OK:
session.loaded_rows = cursor.rowcount
session.successful = True
That's all.
On prod, we create the target table manually. Here I allowed myself a little automation:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
        id INT,
        start_time TIMESTAMP,
        end_time TIMESTAMP,
        type INT,
        data VARCHAR(32),
        etl_source VARCHAR(200),
        etl_id INT,
        hash_id INT PRIMARY KEY
    );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
dag=dag)Mo nlo
VerticaOperator()Mo ṣẹda ipilẹ data ati tabili kan (ti wọn ko ba wa tẹlẹ, dajudaju). Ohun akọkọ ni lati ṣeto awọn ti o gbẹkẹle:
for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up
"Well," said the little mouse, "are you convinced now
that I am the scariest animal in the forest?"
Julia Donaldson, The Gruffalo
I think that if my colleagues and I held a competition to see who could build and launch an ETL process from scratch faster, them with their SSIS and a mouse and me with Airflow... and then we also compared how easy each is to maintain... Wow, I think you'll agree that I would beat them on all fronts!
More seriously, Apache Airflow, by describing processes as program code, has made my work much more comfortable and enjoyable.
Its unlimited extensibility, both through plug-ins and through its built-in readiness to scale, lets you use Airflow in almost any area: from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).
Final part: references and notes
The rakes we've stepped on for you
- start_date. Yes, this is already a local meme: everyone trips over the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will not start before tomorrow. Use a fixed date instead:
    start_date = datetime(2020, 7, 7, 0, 1, 2)
  And no more problems.
  There is another runtime error associated with it: "Task is missing the start_date parameter", which most often means you forgot to bind the operator to the DAG.
- Everything on one machine. Yes: the databases (of Airflow itself and of our own tooling), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks grew, and when PostgreSQL started answering index lookups in 20 s instead of 5 ms, we picked it up and moved it out.
- LocalExecutor. Yes, we are still sitting on it, and we have come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand with at least one worker, and we will have to work hard to move to CeleryExecutor. And since Celery can run on a single machine too, nothing stops you from using it even on one server, which "of course, will never go to production, honestly!"
- Not using the built-in tools:
  - Connections to store service credentials,
  - SLA Misses to react to tasks that did not finish on time,
  - XCom to exchange metadata (I said metadata!) between DAG tasks.
- Mail abuse. Well, what can I say? Alerts were set up for every retry of every failed task. Now my work Gmail holds more than 90k emails from Airflow, and the web mail interface refuses to select and delete more than 100 at a time.
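Two of these rakes can be defused in `default_args`. The sketch below is a hypothetical example, not our production config: it pins `start_date` to a fixed date in the past and sends mail only on final failure (`email_on_retry=False`); you would still need an `email` list of recipients. The helper `first_run_trigger` is a simplified model of Airflow 1.x scheduling for a `timedelta` interval: the run stamped with execution date X starts only after X + interval has passed, which is why a `start_date` of today with a daily interval waits until tomorrow.

```python
from datetime import datetime, timedelta

# Hypothetical default_args; every key is a standard BaseOperator argument
default_args = {
    'owner': 'airflow',
    # A fixed moment in the past, never datetime.now()
    'start_date': datetime(2020, 7, 7),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    # Mail once the task has finally failed, not on every retry
    'email_on_failure': True,
    'email_on_retry': False,
}


def first_run_trigger(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """Simplified Airflow 1.x model: the run for execution date X starts after X + interval."""
    return start_date + schedule_interval


# A DAG created today with start_date = today and a daily interval
today = datetime(2020, 7, 7)
print(first_run_trigger(today, timedelta(days=1)))  # 2020-07-08 00:00:00
```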
More pitfalls:
More automation tools
So that we can work even more with our heads and less with our hands, Airflow offers us:
- REST API: it still has experimental status, which does not stop it from working. With it you can not only get information about DAGs and tasks, but also stop/start a DAG, create a DAG Run or a pool.
- CLI: many tools are available through the command line that are not just inconvenient to use through the WebUI but missing from it altogether. For example:
  - backfill is needed to re-run task instances. For example, the analysts come and say: "And you, comrade, have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you're like:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
  - Database management: initdb, resetdb, upgradedb, checkdb.
  - run, which lets you run a single task instance and even ignore all its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
  - test does pretty much the same thing, only it does not write anything to the database.
  - connections allows bulk creation of connections from the shell.
- Python API: a rather hardcore way of interacting, intended for plugins rather than for poking around with your bare hands. But who is going to stop us from going to /home/airflow/dags, starting ipython and messing around? You can, for example, export all connections with the following code:

    from airflow import settings
    from airflow.models import Connection

    fields = 'conn_id conn_type host port schema login password extra'.split()

    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
        d = {field: getattr(conn, field) for field in fields}
        print(conn.conn_id, '=', d)

- Connecting to the Airflow metadatabase. I don't recommend writing to it, but fetching task states for various specific metrics can be much faster and simpler than going through any of the APIs.
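The connection export above pairs naturally with the `connections` CLI command. The sketch below turns one exported dict into an `airflow connections --add` command line. It assumes the Airflow 1.10 flag names (`--conn_id`, `--conn_type`, `--conn_host` and so on; Airflow 2 reworked this into `airflow connections add`), so check `airflow connections --help` on your version. `add_connection_command` is a hypothetical helper, and since the password ends up on the command line, treat its output as a secret.

```python
import shlex

# Connection field -> `connections --add` flag (assumed Airflow 1.10 naming)
FLAG_BY_FIELD = {
    'conn_type': '--conn_type',
    'host': '--conn_host',
    'port': '--conn_port',
    'schema': '--conn_schema',
    'login': '--conn_login',
    'password': '--conn_password',
    'extra': '--conn_extra',
}


def add_connection_command(conn_id: str, fields: dict) -> str:
    """Hypothetical helper: build a shell-safe `airflow connections --add` line."""
    argv = ['airflow', 'connections', '--add', '--conn_id', conn_id]
    for field, flag in FLAG_BY_FIELD.items():
        value = fields.get(field)
        if value not in (None, ''):  # skip empty fields instead of passing blanks
            argv += [flag, str(value)]
    # shlex.quote protects passwords and JSON extras that contain spaces or quotes
    return ' '.join(shlex.quote(arg) for arg in argv)


exported = {'conn_type': 'vertica', 'host': 'dwh.local', 'port': 5433,
            'login': 'etl', 'password': 'p@ss word', 'extra': None}
print(add_connection_command('vertica_dwh', exported))
```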
Here is an example of querying the metadatabase. Suppose not all of our tasks are rock-solid: they can fail occasionally, and that's normal. But several failures in a row are already suspicious and worth checking.
Beware, SQL ahead!
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
        THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY task_id, dag_id
HAVING count(last_fail_seq) > 0
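The trick in the `failed` CTE is easy to miss: a failed run gets `last_fail_seq = TRUE` only when its rank among failed runs equals its rank among all runs, which holds exactly for the unbroken streak of failures at the top of the history. Here is a small Python mirror of that logic, assuming the states are already ordered from newest to oldest; `failure_streak` is a hypothetical name used only for illustration.

```python
FAILED_STATES = {'failed', 'up_for_retry'}


def failure_streak(states_newest_first):
    """Count the unbroken run of unsuccessful states at the top of a task's history."""
    failed = up_for_retry = 0
    for state in states_newest_first:
        if state not in FAILED_STATES:
            break  # the first non-failing run ends the streak, like rn != row_number()
        if state == 'failed':
            failed += 1
        else:
            up_for_retry += 1
    return {'unsuccessful': failed + up_for_retry,
            'failed': failed,
            'up_for_retry': up_for_retry}


print(failure_streak(['failed', 'up_for_retry', 'success', 'failed']))
```

The older `failed` entry after `success` is not counted, just as the SQL ignores it; the query's `HAVING` clause then keeps only tasks whose streak is non-empty.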
Links
And of course, the first ten links in Google's results are the contents of the Airflow folder in my bookmarks.
- Of course, we have to start with the official documentation, but who reads the manual?
- Well, at least read the developers' recommendations.
- The very beginning: the user interface in pictures.
- The basic concepts are described well, in case you (suddenly!) didn't understand something from me.
- A short guide to setting up an Airflow cluster.
- Almost the same interesting things, except perhaps with a bit more formalism and fewer examples.
- About working together with Celery.
- About task idempotency, loading by ID instead of by date, transformations, file structure and other interesting things.
- Task dependencies and Trigger Rules, which I only mentioned in passing.
- How to get around some "works as intended" behaviour in the scheduler, load missed data and prioritize tasks.
- Useful SQL queries against the Airflow metadata.
- Contains a useful section on creating a custom sensor.
- An interesting short note on building infrastructure on AWS for Data Science.
- Common mistakes (when someone still hasn't read the manual).
- A smile at the ways people hide passwords, when they could simply use Connections.
- Launching DAGs correctly, passing context into tasks, dependencies again, and even skipped task runs.
- Using default arguments and params in templates, as well as Variables and Connections.
- A story about how the scheduler is being prepared for Airflow 2.0.
- A slightly outdated article about deploying an Airflow cluster with docker-compose.
- Dynamic tasks using templates and parameter passing.
- Standard and custom notifications via mail and Slack.
- Branching tasks, macros and XCom.
And the links used in the article:
- Placeholders available for use in templates.
- Common mistakes when creating DAGs.
- docker-compose for experimentation, debugging and more.
- A Python wrapper for the Telegram REST API.
Source: www.habr.com




