as an advanced cron that launches the data-collection processes feeding the ODS and keeps track of how they run.
Until recently, our needs were covered by a single small server with 32 cores and 50 GB of RAM. In Airflow terms, that amounts to:
200+ DAGs (in fact, workflows into which we packed our tasks),
For the build, I rely on the excellent puckel/docker-airflow image. Be sure to check it out; you may not need anything else in life.
All Airflow settings are available not only through airflow.cfg but also through environment variables (hats off to the developers), which I shamelessly took advantage of.
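For reference, the mapping is mechanical: option key in section [section] of airflow.cfg can be overridden by an environment variable named AIRFLOW__{SECTION}__{KEY}, with double underscores, and the variable takes precedence over the file. A minimal sketch of that naming rule (the helper airflow_env_var is hypothetical, not part of Airflow):

```python
import os


def airflow_env_var(section: str, key: str) -> str:
    """Return the environment variable name that overrides [section] key.

    Hypothetical helper: it only encodes Airflow's documented
    AIRFLOW__{SECTION}__{KEY} convention (note the double underscores).
    """
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# [core] executor from airflow.cfg becomes AIRFLOW__CORE__EXECUTOR,
# which can be set, e.g., in a docker-compose "environment:" section.
name = airflow_env_var("core", "executor")
os.environ[name] = "LocalExecutor"  # affects only this process and its children
print(name, "=", os.environ[name])
```

This is why a Docker image like the one above can be configured entirely from the container environment, without editing airflow.cfg.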
DAG Run - a single launch of a DAG, which is assigned its own execution_date. DAG runs of the same DAG can run in parallel (provided, of course, that you made your tasks idempotent).
Operator - pieces of code responsible for performing a specific action. There are three types of operators:
action operators, like our beloved PythonOperator, which can execute any (valid) Python code;
transfer operators, which move data from one place to another, such as GoogleCloudStorageToS3Operator;
sensors, which let you react to an event or hold back further execution of the DAG until that event occurs. For example, HttpSensor can poll a given endpoint and, once it receives the expected response, start the GoogleCloudStorageToS3Operator transfer. An inquisitive mind will ask: "Why? You could just do the retries inside the operator!" The answer: so as not to clog the task pool with hung operators. A sensor starts, checks, and dies until the next attempt.
Task - declared operators, regardless of type, that are attached to a DAG are promoted to the rank of task.
Task instance - when the scheduler decides it is time to send tasks into battle on the executor workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (that is, a set of variables: the execution parameters), renders the command or query templates, and puts them into the pool.
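To make "idempotent" concrete: a task that first deletes the slice belonging to its own execution_date and then re-inserts it yields the same result however many times it is cleared and rerun, and it never touches the slices written by DAG runs for other dates. A minimal sketch using an in-memory SQLite table; the orders table, its columns and load_slice() are hypothetical:

```python
import sqlite3
from datetime import date


def load_slice(conn: sqlite3.Connection, execution_date: date, rows: list) -> None:
    """Idempotently (re)load the rows that belong to one execution_date."""
    day = execution_date.isoformat()
    with conn:  # one transaction: the delete and the insert commit together
        # Remove only this run's slice, so runs for other dates are untouched.
        conn.execute("DELETE FROM orders WHERE day = ?", (day,))
        conn.executemany(
            "INSERT INTO orders (day, order_id) VALUES (?, ?)",
            [(day, order_id) for order_id in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (day TEXT, order_id INTEGER)")

# Running the same "task instance" twice (e.g. after Clear) changes nothing.
load_slice(conn, date(2020, 7, 8), [1, 2, 3])
load_slice(conn, date(2020, 7, 8), [1, 2, 3])
load_slice(conn, date(2020, 7, 9), [4])

print(conn.execute(
    "SELECT day, COUNT(*) FROM orders GROUP BY day ORDER BY day").fetchall())
```

A plain INSERT in the same place would double the rows for 2020-07-08 on the second call, which is exactly what makes parallel or repeated DAG runs unsafe.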
Creating tasks
First, let's describe the general layout of our DAG, and then we'll dive deeper into the details, because we use a few non-trivial solutions.
So, in its simplest form, such a DAG looks like this:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
    print(context)
for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
You can simply Clear the failed ones. That is, we forget that something failed there, and the same task instance goes back to the scheduler.
Clearly, doing this with the mouse for every red square is inhumane; that is not what we expect from Airflow. Naturally, we have a weapon of mass destruction: Browse/Task Instances
Let's select everything at once and reset it to zero by clicking the right menu item:
After clearing, our tasks look like this (they can hardly wait for the scheduler to schedule them):
Connections, hooks and other variables
It's time to look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]
email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Dear sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
        Natasha, wake up, we've dropped {{ dag.dag_id }}
    """),
    trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]
    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)
    report_update >> [email, tg]
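Without Airflow installed, the loop can be dry-run in plain Python to see exactly which task_id and SQL each VerticaOperator receives. This sketch copies the DAG's own list comprehension (trimmed to two reports); plan() is a hypothetical helper, not part of the DAG:

```python
from collections import namedtuple

Report = namedtuple('Report', 'source target')

# Same comprehension as in update_reports.py, trimmed to two reports.
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls']]


def plan(reports):
    """Return (task_id, queries) pairs exactly as the DAG's loop builds them."""
    result = []
    for source, target in reports:
        queries = [f"TRUNCATE TABLE {target}",
                   f"INSERT INTO {target} SELECT * FROM {source}"]
        result.append((target.replace('reports.', ''), queries))
    return result


for task_id, queries in plan(reports):
    print(task_id, queries)
```

Note that the source view lives in the same schema as its target: reports.city_orders is refreshed from reports.city_orders_view.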
Everyone has had to update reports at some point, right? Here it is again: there is a list of sources to take the data from; there is a list of places to put it; and don't forget to send a signal when everything has happened or broken (well, that's not about us, no).
Let's go through the file again and look at the new obscure bits:
from commons.operators import TelegramBotSendMessage - nothing prevents us from writing our own operators, which we took advantage of by making a small wrapper for sending messages to Telegram (we'll talk more about this operator below);
default_args={} - a DAG can pass the same arguments to all of its operators;
to='{{ var.value.all_the_kings_men }}' - the to field is not hardcoded but generated dynamically with Jinja from a variable holding a list of emails, which I carefully put into Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS - the condition for launching the operator. In our case, the email goes out to the bosses only if all upstream dependencies completed successfully;
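The two trigger rules used in update_reports can be modelled in a few lines. This is a deliberately simplified sketch, not Airflow's implementation: it knows only all_success and one_failed and assumes every upstream task has already finished (State and should_run are hypothetical names):

```python
from enum import Enum


class State(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"


def should_run(trigger_rule: str, upstream: list) -> bool:
    """Decide whether a downstream task fires, given finished upstream states."""
    if trigger_rule == "all_success":
        # The email: every report task must have succeeded.
        return all(s == State.SUCCESS for s in upstream)
    if trigger_rule == "one_failed":
        # The Telegram alert: at least one report task failed.
        return any(s in (State.FAILED, State.UPSTREAM_FAILED) for s in upstream)
    raise ValueError(f"trigger rule not modelled here: {trigger_rule}")


all_ok = [State.SUCCESS] * 5
one_broken = [State.SUCCESS] * 4 + [State.FAILED]
print(should_run("all_success", all_ok), should_run("one_failed", all_ok))
print(should_run("all_success", one_broken), should_run("one_failed", one_broken))
```

Because both operators sit downstream of every VerticaOperator, exactly one of them fires for each outcome.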
A word and a screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and add our login/password and any more specific parameters. Like this:
Passwords can be encrypted (more thoroughly than with the default option), or you can leave the connection type empty (as I did for tg_main). The point is that the list of types is hardwired into Airflow's models and can't be extended without digging into the source code (if I failed to google something, please correct me), but nothing stops us from fetching credentials simply by name.
You can create several connections with the same name: in that case, BaseHook.get_connection(), which fetches a connection by name, returns a random one of the namesakes (Round Robin would be more logical, but let's leave that to the conscience of the Airflow developers).
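The difference between the current behavior and the Round Robin the author would prefer is easy to see in a toy model. This is not Airflow's code: Conn, the three "dwh" hosts, get_connection_random() and round_robin() are all hypothetical:

```python
import itertools
import random
from collections import namedtuple

Conn = namedtuple("Conn", "conn_id host")

# Hypothetical: three connections registered under the same conn_id.
connections = [Conn("dwh", "vertica-1"), Conn("dwh", "vertica-2"), Conn("dwh", "vertica-3")]


def get_connection_random(conn_id, pool, rng=random):
    """Behavior described in the text: pick any namesake at random."""
    return rng.choice([c for c in pool if c.conn_id == conn_id])


def round_robin(conn_id, pool):
    """The alternative: an endless iterator cycling through namesakes in order."""
    return itertools.cycle([c for c in pool if c.conn_id == conn_id])


rr = round_robin("dwh", connections)
print([next(rr).host for _ in range(4)])  # vertica-1, -2, -3, then vertica-1 again
print(get_connection_random("dwh", connections).host)  # any of the three
```

Random choice balances load only on average; a cycle guarantees an even spread, but it would have to live somewhere shared across worker processes, which is presumably why it is harder to do inside Airflow.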
Variables and connections are great tools, but it's important not to lose the balance between the parts of your flows you keep in the code itself and the parts you hand over to Airflow for storage. On the one hand, it can be convenient to change a value quickly through the UI, a mailbox for example. On the other hand, that is still a return to mouse clicking, which we (I) wanted to get rid of.
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
    """Telegram Bot API hook
    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:
        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)
        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()
    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client
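The hook relies on a TelegramBot client that is not shown here (the requests_toolbelt BaseUrlSession import hints at how the author built it). As a hypothetical stand-in, here is a stdlib-only sketch of such a client. It follows the public Telegram Bot API convention of POSTing JSON to https://api.telegram.org/bot<token>/<method>; the class and method names are assumptions:

```python
import json
import urllib.request

API_URL = "https://api.telegram.org/bot{token}/{method}"


class TelegramBot:
    """Hypothetical minimal Telegram Bot API client (stdlib only)."""

    def __init__(self, token: str):
        self.token = token  # e.g. taken from the connection's Extra JSON

    def build_request(self, method: str, payload: dict) -> urllib.request.Request:
        # Every Bot API method is an HTTPS call to /bot<token>/<method>.
        return urllib.request.Request(
            API_URL.format(token=self.token, method=method),
            data=json.dumps(payload).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )

    def send_message(self, chat_id, text: str) -> dict:
        # Performs a real network call; returns the parsed JSON response.
        request = self.build_request("sendMessage", {"chat_id": chat_id, "text": text})
        with urllib.request.urlopen(request, timeout=10) as response:
            return json.load(response)
```

build_request() is split out so the request can be inspected without network access, while send_message() performs the actual call.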
I'm not sure there's much to explain here; I'll just note the key points:
Overriding standard methods: I limit myself to get_conn(), in which I fetch the connection parameters by name and simply take the extra section (a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
We create a special file-like object, StringIO.
pandas kindly writes our DataFrame into it in CSV form.
We open a connection to our beloved Vertica through the hook.
And now, with the help of copy(), we send our data straight to Vertica!
We take from the driver the number of rows that were loaded and tell the session manager that everything is fine:
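The loading steps listed above can be sketched as follows, under stated assumptions. This sketch swaps pandas' DataFrame.to_csv() for the stdlib csv module, writes into a StringIO, and builds a Vertica COPY ... FROM STDIN statement for it. The table and column names and build_copy() are hypothetical; the actual upload through the hook's cursor is only described in a comment because it needs a live Vertica:

```python
import csv
import io


def build_copy(table: str, columns: list, rows: list):
    """Prepare a Vertica COPY statement plus an in-memory CSV file for it."""
    buffer = io.StringIO()  # file-like object; nothing touches the disk
    # "\n" line endings, so no stray "\r" ends up in the last column.
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    buffer.seek(0)  # rewind so the driver reads from the beginning

    sql = (f"COPY {table} ({', '.join(columns)}) "
           "FROM STDIN DELIMITER ',' ENCLOSED BY '\"'")
    return sql, buffer


sql, buffer = build_copy("stage.orders", ["order_id", "city"],
                         [(1, "Moscow"), (2, "Kazan")])
print(sql)
print(buffer.getvalue())

# Assumed next step (needs a live Vertica): with a vertica_python cursor
# obtained from the hook's connection, cursor.copy(sql, buffer) streams the
# file, and connection.commit() finalizes the load.
```

ENCLOSED BY '"' matches the csv module's default quoting, so values containing commas survive the round trip.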
By and large, Apache Airflow, thanks to describing processes as program code, has made my work much more convenient and enjoyable.
Its unlimited extensibility, both in terms of plugins and its predisposition to scalability, lets you use Airflow in almost any area: from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).
Finally, links and materials
The rakes we stepped on for you
start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start no earlier than tomorrow.
from datetime import datetime
start_date = datetime(2020, 7, 7, 0, 1, 2)  # a fixed moment, not the current date
And no more problems.
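The "no earlier than tomorrow" rule comes from how runs are stamped: with a timedelta schedule, the run whose execution_date equals start_date is triggered only after its interval has elapsed. A simplified model of that Airflow 1.x behavior (first_trigger is a hypothetical helper):

```python
from datetime import datetime, timedelta


def first_trigger(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """Moment the first DAG run actually starts for a timedelta schedule.

    The run stamped execution_date == start_date covers the interval
    [start_date, start_date + schedule_interval) and is triggered at its end.
    """
    return start_date + schedule_interval


start = datetime(2020, 7, 7, 0, 1, 2)
print(first_trigger(start, timedelta(days=1)))  # 2020-07-08 00:01:02

# Avoid start_date=datetime.now(): the value changes every time the DAG file
# is parsed, so the moment computed above keeps moving forward.
```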
Another error is related to it: Task is missing the start_date parameter, which most often means you forgot to bind the operator to a DAG.
Everything on one machine. Yes: the databases (Airflow's own and our tooling's), the web server, the scheduler, and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL started answering an index lookup in 20 s instead of 5 ms, we picked it up and moved it elsewhere.
LocalExecutor. Yes, we're still sitting on it, and we've already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to expand with at least one more worker, and we'll have to work hard to move to CeleryExecutor. And since you can run it on a single machine, nothing stops you from using Celery even on a server that "naturally will never go into production, honestly!"
Abuse of email. Well, what can I say? Notifications were set up for every retry of failed tasks. Now my work Gmail holds over 90k emails from Airflow, and the web interface refuses to select and delete more than 100 at a time.
For example, this snippet prints every connection stored in Airflow:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)
Connecting to the Airflow metadata database. I don't recommend writing to it, but reading task states for various custom metrics from it can be much faster and easier than going through any of the APIs.
Suppose not all of our tasks are idempotent and some of them fail now and then; that is normal. But several failures in a row are already suspicious, and we should check them.
Careful, SQL!
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number()
        OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
            THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE
        state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq
        AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq
        AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
    task_id,
    dag_id
HAVING
    count(last_fail_seq) > 0
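The trick in the failed CTE: rn numbers all recent runs, the second row_number() numbers only the failures, and the two agree only while the failures are unbroken from the newest run backwards. The same idea in a few lines of Python, for one task's states ordered newest first (failure_streak is a hypothetical helper):

```python
from itertools import takewhile

FAILURE_STATES = {"failed", "up_for_retry"}


def failure_streak(states_newest_first: list) -> dict:
    """Count the unbroken run of failures starting from the latest execution."""
    # Stop at the first state that is not a failure (success, running, ...),
    # just as rn and the filtered row_number() diverge in the SQL.
    streak = list(takewhile(lambda s: s in FAILURE_STATES, states_newest_first))
    return {
        "unsuccessful": len(streak),
        "failed": streak.count("failed"),
        "up_for_retry": streak.count("up_for_retry"),
    }


# The two latest runs broke; an older success ends the streak,
# so the failure before it is not counted.
print(failure_streak(["failed", "up_for_retry", "success", "failed"]))
```

The SQL's HAVING clause corresponds to keeping only tasks whose "unsuccessful" count is above zero.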
Links
Well, of course, the first ten Google results are simply the contents of the Airflow folder in my bookmarks.
Apache Airflow Documentation - of course, we have to start with the official documentation, but who reads manuals anyway?
Airflow: When Your DAG is Far Behind the Schedule - how to overcome some "works as intended" problems with the scheduler, loading missed data, and important tasks.