سلام، من دیمیتری لوگویننکو هستم - مهندس داده بخش تجزیه و تحلیل گروه شرکت های Vezet.
من در مورد یک ابزار فوق العاده برای توسعه فرآیندهای ETL - Apache Airflow به شما خواهم گفت. اما Airflow به قدری همه کاره و چندوجهی است که حتی اگر درگیر جریان داده ها نیستید، اما نیاز به راه اندازی دوره ای هر فرآیند و نظارت بر اجرای آنها دارید، باید نگاه دقیق تری به آن بیندازید.
و بله، من نه تنها می گویم، بلکه نشان می دهم: این برنامه کد، اسکرین شات و توصیه های زیادی دارد.

چیزی که معمولاً هنگام جستجوی کلمه Airflow / Wikimedia Commons در گوگل مشاهده می کنید
فهرست مندرجات
معرفی
Apache Airflow دقیقاً مانند جنگو است:
- نوشته شده در پایتون
- یک پنل مدیریت عالی وجود دارد،
- قابل گسترش به طور نامحدود
- فقط بهتر است، و برای اهداف کاملاً متفاوت ساخته شده است، یعنی (همانطور که قبل از کاتا نوشته شده است):
- اجرای و نظارت بر وظایف بر روی تعداد نامحدودی از ماشین ها (همانطور که بسیاری از Celery / Kubernetes و وجدان شما به شما اجازه می دهد)
- با تولید گردش کار پویا از نوشتن و درک کد پایتون بسیار آسان است
- و امکان اتصال هر پایگاه داده و API با یکدیگر با استفاده از اجزای آماده و پلاگین های خانگی (که بسیار ساده است).
ما از Apache Airflow به صورت زیر استفاده می کنیم:
- ما داده ها را از منابع مختلف (بسیاری از نمونه های SQL Server و PostgreSQL، API های مختلف با معیارهای برنامه، حتی 1C) در DWH و ODS جمع آوری می کنیم (ما Vertica و Clickhouse را داریم).
- چقدر پیشرفته
cron، که فرآیندهای یکپارچه سازی داده ها را در ODS شروع می کند و همچنین نگهداری آنها را نظارت می کند.
تا همین اواخر، نیازهای ما توسط یک سرور کوچک با 32 هسته و 50 گیگابایت رم پوشش داده می شد. در جریان هوا، این کار می کند:
- بیش 200 دگ (در واقع گردش کار، که در آن وظایف را پر کرده ایم)،
- در هر یک به طور متوسط 70 کار,
- این خوبی شروع می شود (همچنین به طور متوسط) یک بار در ساعت.
و در مورد نحوه گسترش ما، در زیر می نویسم، اما اکنون اجازه دهید مشکل über را که حل خواهیم کرد، تعریف کنیم:
سه منبع SQL Server وجود دارد که هر کدام دارای 50 پایگاه داده هستند - نمونه هایی از یک پروژه، به ترتیب، ساختار یکسانی دارند (تقریبا در همه جا، mua-ha-ha)، به این معنی که هر کدام یک جدول Orders (خوشبختانه، یک جدول با آن نام را می توان به هر کسب و کاری فشار داد). ما داده ها را با افزودن فیلدهای سرویس (سرور منبع، پایگاه داده منبع، شناسه وظیفه ETL) می گیریم و ساده لوحانه آنها را مثلاً به Vertica می اندازیم.
بریم!
بخش اصلی، عملی (و کمی تئوری)
چرا ما (و شما)
وقتی درخت ها بزرگ بودند و من ساده بودم SQLدر یکی از خردهفروشیهای روسی، ما با استفاده از دو ابزاری که در دسترس ما بود، از فرآیندهای ETL که به جریانهای داده معروف است کلاهبرداری کردیم:
- مرکز برق انفورماتیکا - یک سیستم بسیار گسترده، بسیار سازنده، با سخت افزار خاص خود، نسخه سازی خاص خود. خدای نکرده 1 درصد از قابلیت هاش استفاده کردم. چرا؟ خوب، اول از همه، این رابط، جایی از دهه 380، از نظر ذهنی ما را تحت فشار قرار داد. ثانیا، این ابزار برای فرآیندهای بسیار فانتزی، استفاده مجدد از اجزای خشمگین و دیگر ترفندهای بسیار مهم سازمانی طراحی شده است. در مورد هزینه آن، مانند بال ایرباس AXNUMX / سال، ما چیزی نخواهیم گفت.
مراقب باشید، یک اسکرین شات می تواند به افراد زیر 30 سال کمی آسیب برساند

- سرور یکپارچه سازی SQL Server - ما از این رفیق در جریان های درون پروژه ای خود استفاده کردیم. خوب، در واقع: ما در حال حاضر از SQL Server استفاده می کنیم و استفاده نکردن از ابزارهای ETL آن به نوعی غیرمنطقی است. همه چیز در آن خوب است: هم رابط کاربری زیبا است و هم گزارش های پیشرفت... اما این دلیلی نیست که ما محصولات نرم افزاری را دوست داریم، اوه، نه برای این. نسخه آن
dtsx(که XML با گرهها به هم ریخته در ذخیره است) میتوانیم، اما نکته چیست؟ در مورد ایجاد یک بسته کار که صدها جدول را از یک سرور به سرور دیگر بکشد، چطور؟ بله، چه صد، انگشت اشاره شما با کلیک بر روی دکمه ماوس از بیست تیکه می افتد. اما قطعا شیک تر به نظر می رسد:
مطمئناً به دنبال راههایی برای خروج بودیم. مورد حتی تقریبا به یک ژنراتور بسته SSIS که خود نوشته شده بود رسید ...
… و سپس یک کار جدید برای من پیدا کرد. و Apache Airflow از من سبقت گرفت.
وقتی فهمیدم که توضیحات فرآیند ETL کدهای ساده پایتون هستند، از خوشحالی نرقصیدم. اینگونه بود که جریانهای داده نسخهبندی و متفاوت شدند، و ریختن جداول با ساختار واحد از صدها پایگاه داده در یک هدف تبدیل به یک موضوع کد پایتون در یک و نیم یا دو صفحه 13 اینچی شد.
جمع آوری خوشه
بیایید یک مهدکودک کاملاً ترتیب دهیم و در اینجا در مورد چیزهای کاملاً واضح صحبت نکنیم، مانند نصب Airflow، پایگاه داده انتخابی شما، Celery و موارد دیگری که در اسکله ها توضیح داده شده است.
برای اینکه بتوانیم بلافاصله آزمایشات را شروع کنیم، طرح کردم docker-compose.yml که در آن:
- بیایید در واقع بالا ببریم جریان هوا: زمانبندی، وب سرور. گل همچنین برای نظارت بر وظایف کرفس در آنجا می چرخد (زیرا قبلاً به آن فشار داده شده است
apache/airflow:1.10.10-python3.7، اما ما مشکلی نداریم) - PostgreSQL و، که در آن Airflow اطلاعات سرویس خود را می نویسد (داده های زمان بندی، آمار اجرا و غیره) و Celery وظایف تکمیل شده را علامت گذاری می کند.
- Redis، که به عنوان یک کارگزار برای کرفس عمل می کند.
- کارگر کرفس، که به اجرای مستقیم وظایف مشغول خواهد شد.
- به پوشه
./dagsما فایل های خود را با توضیحات dags اضافه می کنیم. آنها در حال پرواز برداشته می شوند، بنابراین پس از هر عطسه نیازی به دستکاری کل پشته نیست.
در بعضی جاها، کد موجود در مثال ها به طور کامل نشان داده نمی شود (تا متن را به هم نریزد)، اما در جایی در این فرآیند اصلاح می شود. نمونه های کامل کد کار را می توان در مخزن یافت .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerیادداشت ها:
- در مونتاژ ترکیب، من تا حد زیادی به تصویر شناخته شده تکیه کردم - حتما بررسی کنید شاید شما به هیچ چیز دیگری در زندگی خود نیاز ندارید.
- همه تنظیمات جریان هوا نه تنها از طریق در دسترس هستند
airflow.cfg، بلکه از طریق متغیرهای محیطی (با تشکر از توسعه دهندگان) که من به طور مخرب از آنها استفاده کردم. - طبیعتاً آماده تولید نیست: من عمداً ضربان قلب را روی ظروف قرار ندادم ، امنیت را به زحمت نینداختم. اما من حداقل های مناسب را برای آزمایش کنندگانمان انجام دادم.
- توجه داشته باشید که:
- پوشه dag باید هم برای زمانبندی و هم برای کارگران قابل دسترسی باشد.
- همین امر در مورد تمام کتابخانه های شخص ثالث صدق می کند - همه آنها باید روی ماشین هایی با زمانبندی و کارگران نصب شوند.
خوب، حالا ساده است:
$ docker-compose up --scale worker=3بعد از اینکه همه چیز بالا رفت، می توانید به رابط های وب نگاه کنید:
- جریان هوا:
- گل:
مفاهیم پایه
اگر در تمام این "دگ ها" چیزی متوجه نشدید، در اینجا یک فرهنگ لغت کوتاه آمده است:
- زمان بند - مهمترین عمو در جریان هوا، که کنترل می کند که روبات ها سخت کار کنند، نه یک شخص: برنامه را نظارت می کند، Dags را به روز می کند، وظایف را راه اندازی می کند.
به طور کلی، در نسخه های قدیمی، او با حافظه مشکل داشت (نه، فراموشی نیست، اما نشتی) و پارامتر legacy حتی در تنظیمات باقی می ماند.
run_duration- فاصله شروع مجدد آن اما اکنون همه چیز خوب است. - DAG (با نام مستعار "داگ") - "گراف غیر چرخه ای جهت دار"، اما چنین تعریفی به افراد کمی می گوید، اما در واقع محفظه ای برای وظایفی است که با یکدیگر در تعامل هستند (پایین را ببینید) یا یک آنالوگ از Package در SSIS و Workflow در Informatica .
علاوه بر داگ ها، ممکن است هنوز هم زیرداگ ها وجود داشته باشد، اما به احتمال زیاد به آنها نخواهیم رسید.
- DAG Run - داگ مقدار دهی اولیه شده که به خود اختصاص داده می شود
execution_date. داگران های همان داگ می توانند به صورت موازی کار کنند (البته اگر وظایف خود را بی قدرت کرده باشید). - اپراتور قطعاتی از کد هستند که مسئول انجام یک عمل خاص هستند. سه نوع عملگر وجود دارد:
- اقداممانند مورد علاقه ما
PythonOperator، که می تواند هر کد (معتبر) پایتون را اجرا کند. - انتقال، که داده ها را از مکانی به مکان دیگر منتقل می کند، مثلاً،
MsSqlToHiveTransfer; - حسی از سوی دیگر، به شما این امکان را می دهد که تا زمانی که یک رویداد رخ دهد، واکنش نشان دهید یا اجرای بیشتر Dag را کاهش دهید.
HttpSensorمی تواند نقطه پایانی مشخص شده را بکشد و هنگامی که پاسخ مورد نظر در انتظار است، انتقال را شروع کندGoogleCloudStorageToS3Operator. یک ذهن کنجکاو خواهد پرسید: «چرا؟ پس از همه، شما می توانید تکرارها را درست در اپراتور انجام دهید! و سپس، برای اینکه مجموعه وظایف با اپراتورهای معلق مسدود نشود. سنسور شروع می شود، بررسی می کند و قبل از تلاش بعدی می میرد.
- اقداممانند مورد علاقه ما
- کار - اپراتورهای اعلام شده، صرف نظر از نوع، و متصل به داگ به رتبه وظیفه ارتقا می یابند.
- نمونه کار - زمانی که برنامه ریز کل تصمیم گرفت که زمان ارسال وظایف به نبرد بر روی کارگران مجری است (در صورت استفاده درست در محل
LocalExecutorیا به یک گره راه دور در موردCeleryExecutor، یک زمینه را به آنها اختصاص می دهد (به عنوان مثال، مجموعه ای از متغیرها - پارامترهای اجرا)، قالب های فرمان یا پرس و جو را گسترش می دهد و آنها را ادغام می کند.
ما وظایف تولید می کنیم
ابتدا طرح کلی دوغ خود را ترسیم می کنیم و سپس بیشتر و بیشتر به جزئیات می پردازیم، زیرا ما برخی از راه حل های غیر ضروری را اعمال می کنیم.
بنابراین، در ساده ترین شکل خود، چنین داگ به شکل زیر خواهد بود:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)بیایید آن را بفهمیم:
- ابتدا lib های لازم را وارد می کنیم و یک چیز دیگر;
sql_server_ds- آیاList[namedtuple[str, str]]با نام اتصالات از Airflow Connections و پایگاه های داده ای که صفحه خود را از آنها خواهیم گرفت.dag- اعلام دگ ما که حتما باید در باشدglobals()، در غیر این صورت Airflow آن را پیدا نمی کند. داگ همچنین باید بگوید:- اسمش چیه
orders- سپس این نام در رابط وب ظاهر می شود، - که از نیمه شب هشتم ژوئیه کار خواهد کرد،
- و باید تقریباً هر 6 ساعت یکبار اجرا شود (برای افراد سرسخت اینجا به جای
timedelta()قابل قبولcron-خط0 0 0/6 ? * * *، برای کمتر سرد - عبارتی مانند@daily);
- اسمش چیه
workflow()کار اصلی را انجام خواهد داد، اما نه اکنون. در حال حاضر، ما فقط زمینه خود را در لاگ می اندازیم.- و اکنون جادوی ساده ایجاد وظایف:
- ما از طریق منابع خود استفاده می کنیم.
- مقداردهی اولیه
PythonOperator، که ساختگی ما را اجرا می کندworkflow(). فراموش نکنید که یک نام منحصر به فرد (در داخل داگ) کار را مشخص کنید و خود داگ را ببندید. پرچمprovide_contextبه نوبه خود، آرگومان های اضافی را در تابع می ریزد که با استفاده از آنها به دقت جمع آوری می کنیم**context.
در حال حاضر، این همه است. آنچه به دست آوردیم:
- داگ جدید در رابط وب،
- صد و نیم کار که به صورت موازی اجرا می شوند (اگر تنظیمات Airflow، Celery و ظرفیت سرور اجازه دهد).
خب تقریبا فهمیدم

چه کسی وابستگی ها را نصب خواهد کرد؟
برای سادهتر کردن این موضوع، من را به هم زدم docker-compose.yml در حال پردازش requirements.txt روی همه گره ها
حالا رفته:

مربع های خاکستری نمونه های کار هستند که توسط زمان بندی پردازش می شوند.
ما کمی صبر می کنیم، کارها توسط کارگران انجام می شود:

سبزها البته کار خود را با موفقیت به پایان رسانده اند. قرمزها چندان موفق نیستند.
به هر حال، هیچ پوشه ای در پرود ما وجود ندارد
./dags، هیچ هماهنگی بین ماشین ها وجود ندارد - همه دگ ها در آن قرار دارندgitدر Gitlab ما، و Gitlab CI بهروزرسانیها را در صورت ادغام در ماشینها اجرا میکندmaster.
کمی در مورد گل
در حالی که کارگران در حال کوبیدن پستانک های ما هستند، بیایید ابزار دیگری را به یاد بیاوریم که می تواند چیزی را به ما نشان دهد - گل.
اولین صفحه با اطلاعات خلاصه در مورد گره های کارگر:

شدیدترین صفحه با وظایفی که به کار خود ادامه دادند:

خسته کننده ترین صفحه با وضعیت کارگزار ما:

روشن ترین صفحه با نمودارهای وضعیت کار و زمان اجرای آنها است:

ما زیر بار را بارگذاری می کنیم
بنابراین، تمام وظایف انجام شده است، شما می توانید مجروحان را با خود ببرید.

و تعداد زیادی مجروح وجود داشت - به دلایلی. در مورد استفاده صحیح از Airflow، همین مربع ها نشان می دهد که داده ها قطعاً به دست نیامده اند.
شما باید گزارش را تماشا کنید و نمونه های کار افتاده را مجدداً راه اندازی کنید.
با کلیک بر روی هر مربع، اقداماتی را که در دسترس ما هستند مشاهده خواهیم کرد:

شما می توانید بردارید و Clear the fallen را بسازید. یعنی فراموش میکنیم که چیزی در آنجا شکست خورده است و همان کار نمونه به زمانبندی میرود.

واضح است که انجام این کار با ماوس با تمام مربع های قرمز چندان انسانی نیست - این چیزی نیست که ما از Airflow انتظار داریم. طبیعتاً ما سلاح های کشتار جمعی داریم: Browse/Task Instances

بیایید همه چیز را یکباره انتخاب کنیم و به صفر بازنشانی کنیم، روی مورد صحیح کلیک کنید:

پس از تمیز کردن، تاکسیهای ما به این شکل هستند (آنها از قبل منتظر برنامهریزی هستند تا آنها را برنامه ریزی کند):

اتصالات، قلاب ها و سایر متغیرها
وقت آن است که به DAG بعدی نگاه کنیم، update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]آیا تا به حال همه یک به روز رسانی گزارش انجام داده اند؟ این دوباره اوست: فهرستی از منابع وجود دارد که از کجا می توان داده ها را دریافت کرد. لیستی وجود دارد که در آن قرار دهید. فراموش نکنید که وقتی همه چیز اتفاق افتاد یا شکست، بوق بزنید (خب، این مربوط به ما نیست، نه).
بیایید دوباره فایل را مرور کنیم و به موارد مبهم جدید نگاه کنیم:
from commons.operators import TelegramBotSendMessage- هیچ چیز ما را از ساخت اپراتورهای خودمان باز نمی دارد که با ساختن یک لفاف کوچک برای ارسال پیام به Unblocked از آن استفاده کردیم. (در ادامه در مورد این اپراتور بیشتر صحبت خواهیم کرد).default_args={}- dag می تواند همان آرگومان ها را بین تمام عملگرهای خود توزیع کند.to='{{ var.value.all_the_kings_men }}'- رشتهtoما هاردکد نخواهیم داشت، بلکه به صورت پویا با استفاده از Jinja و یک متغیر با لیستی از ایمیلها تولید میکنیم که من با دقت در آن قرار دادم.Admin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- شرط راه اندازی اپراتور. در مورد ما، نامه فقط در صورتی به رئیسان ارسال می شود که همه وابستگی ها برطرف شده باشند با موفقیت;tg_bot_conn_id='tg_main'- استدلالconn_idشناسه های اتصالی را که در آن ایجاد می کنیم بپذیریدAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- پیام ها در تلگرام تنها در صورت وجود وظایف از بین می روند.task_concurrency=1- ما راه اندازی همزمان چند نمونه کار از یک کار را ممنوع می کنیم. در غیر این صورت، ما چندین راه اندازی همزمان خواهیم داشتVerticaOperator(نگاه به یک میز)؛report_update >> [email, tg]- همهVerticaOperatorدر ارسال نامه ها و پیام ها همگرا شوند، مانند این:

اما از آنجایی که اپراتورهای اعلان کننده شرایط راه اندازی متفاوتی دارند، تنها یکی کار می کند. در نمای درختی، همه چیز کمی کمتر بصری به نظر می رسد:

من چند کلمه در مورد ماکروها و دوستانشان - متغیرها.
ماکروها مکانهایی هستند که میتوانند اطلاعات مفید مختلف را با آرگومانهای عملگر جایگزین کنند. به عنوان مثال، مانند این:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} به محتویات متغیر زمینه گسترش می یابد execution_date در قالب YYYY-MM-DD: 2020-07-14. بهترین بخش این است که متغیرهای زمینه به یک نمونه کار خاص (یک مربع در نمای درختی) میخ میشوند و وقتی دوباره راهاندازی میشوند، متغیرهای مکان به همان مقادیر گسترش مییابند.
مقادیر اختصاص داده شده را می توان با استفاده از دکمه Rendered در هر نمونه کار مشاهده کرد. وظیفه ارسال نامه به این صورت است:

و بنابراین در وظیفه ارسال پیام:

لیست کاملی از ماکروهای داخلی برای آخرین نسخه موجود در اینجا موجود است:
علاوه بر این، با کمک افزونه ها، ما می توانیم ماکروهای خود را اعلام کنیم، اما این داستان دیگری است.
علاوه بر موارد از پیش تعریف شده، می توانیم مقادیر متغیرهای خود را جایگزین کنیم (من قبلاً از این در کد بالا استفاده کردم). بیایید در ایجاد کنیم Admin/Variables یکی دو چیز:

هر چیزی که می توانید استفاده کنید:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')مقدار می تواند یک اسکالر یا JSON باشد. در مورد JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}فقط از مسیر کلید مورد نظر استفاده کنید: {{ var.json.bot_config.bot.token }}.
من به معنای واقعی کلمه یک کلمه می گویم و یک اسکرین شات در مورد آن نشان می دهم ارتباطات. اینجا همه چیز ابتدایی است: در صفحه Admin/Connections ما یک اتصال ایجاد می کنیم، لاگین / رمز عبور و پارامترهای خاص تر را در آنجا اضافه می کنیم. مثل این:

گذرواژهها را میتوان رمزگذاری کرد (بهطور کاملتر از پیشفرض)، یا میتوانید نوع اتصال را حذف کنید (همانطور که من انجام دادم tg_main) - واقعیت این است که لیست انواع در مدلهای Airflow متصل است و نمیتوان بدون وارد شدن به کدهای منبع آن را گسترش داد (اگر ناگهان چیزی را در گوگل جستجو نکردم، لطفاً من را اصلاح کنید)، اما هیچ چیز ما را از دریافت اعتبار باز نمیدارد. نام.
شما همچنین می توانید چندین اتصال با یک نام ایجاد کنید: در این مورد، روش BaseHook.get_connection()، که اتصالات را با نام به ما می دهد، خواهد داد تصادفی از چندین همنام (منطقی تر است که Round Robin بسازیم، اما اجازه دهید آن را به وجدان توسعه دهندگان Airflow واگذار کنیم).
مطمئناً متغیرها و اتصالات ابزارهای جالبی هستند، اما مهم است که تعادل را از دست ندهید: کدام قسمتهای جریان خود را در خود کد ذخیره میکنید و کدام قسمتها را برای ذخیره به Airflow میدهید. از یک طرف، تغییر سریع مقدار، به عنوان مثال، یک صندوق پستی، از طریق UI می تواند راحت باشد. از طرف دیگر، این هنوز بازگشتی به کلیک ماوس است که ما (من) می خواستیم از شر آن خلاص شویم.
کار با اتصالات یکی از وظایف است قلاب ها. به طور کلی قلاب های جریان هوا نقاطی برای اتصال آن به خدمات و کتابخانه های شخص ثالث هستند. به عنوان مثال، JiraHook یک کلاینت را برای ما باز می کند تا با Jira تعامل داشته باشیم (شما می توانید وظایف را به جلو و عقب ببرید) و با کمک SambaHook می توانید یک فایل محلی را به آن فشار دهید smb-نقطه.
تجزیه اپراتور سفارشی
و ما به بررسی نحوه ساخت آن نزدیک شدیم TelegramBotSendMessage
رمز commons/operators.py با اپراتور واقعی:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)در اینجا، مانند هر چیز دیگری در Airflow، همه چیز بسیار ساده است:
- به ارث رسیده از
BaseOperator، که چندین چیز خاص جریان هوا را اجرا می کند (به اوقات فراغت خود نگاه کنید) - فیلدهای اعلام شده
template_fields، که در آن Jinja به دنبال ماکروهایی برای پردازش خواهد بود. - آرگومان های درست را ترتیب داد
__init__()، پیش فرض ها را در صورت لزوم تنظیم کنید. - ما در مورد مقداردهی اولیه اجداد نیز فراموش نکردیم.
- قلاب مربوطه را باز کرد
TelegramBotHookیک شی مشتری از آن دریافت کرد. - روش لغو (بازتعریف شده).
BaseOperator.execute()، هنگامی که زمان راه اندازی اپراتور برسد Airfow آن را تکان می دهد - در آن ما عمل اصلی را اجرا می کنیم و فراموش می کنیم که وارد شوید. (به هر حال، دقیقاً وارد می شویمstdoutиstderr- جریان هوا همه چیز را قطع می کند، آن را به زیبایی می پیچد، در صورت لزوم تجزیه می کند.)
ببینیم چی داریم commons/hooks.py. قسمت اول فایل با خود قلاب:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientمن حتی نمی دانم در اینجا چه توضیحی بدهم، فقط به نکات مهم توجه می کنم:
- ما ارث می بریم، در مورد استدلال ها فکر می کنیم - در بیشتر موارد یکی خواهد بود:
conn_id; - نادیده گرفتن روش های استاندارد: من خودم را محدود کردم
get_conn()، که در آن پارامترهای اتصال را با نام دریافت می کنم و فقط بخش را دریافت می کنمextra(این یک فیلد JSON است)، که من (طبق دستورالعمل خودم!) توکن ربات تلگرام را در آن قرار دادم:{"bot_token": "YOuRAwEsomeBOtToKen"}. - من یک نمونه از ما ایجاد می کنم
TelegramBot، به آن یک نشانه خاص می دهد.
همین. شما می توانید یک مشتری از یک قلاب با استفاده از TelegramBotHook().clent یا TelegramBotHook().get_conn().
و قسمت دوم فایل، که در آن یک microwrapper برای تلگرام REST API درست می کنم تا همان را درگ نکنم. برای یک روش sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))راه صحیح این است که همه را جمع کنید:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- در افزونه، در یک مخزن عمومی قرار دهید و آن را به منبع باز بدهید.
در حالی که ما در حال مطالعه این همه بودیم، به روز رسانی گزارش ما با موفقیت شکست خورد و یک پیام خطا در کانال برای من ارسال کرد. میرم چک کنم ببینم اشتباهه...

چیزی در دوج ما شکست! آیا این همان چیزی نیست که ما انتظارش را داشتیم؟ دقیقا!
میخوای بریزی؟
آیا احساس می کنید چیزی را از دست داده ام؟ گویا قول داده از SQL Server به Vertica انتقال داده و بعد برداشته و از تاپیک خارج شده رذل!
این ظلم عمدی بود، من به سادگی مجبور شدم برخی اصطلاحات را برای شما رمزگشایی کنم. حالا می توانید جلوتر بروید.
برنامه ما این بود:
- دگ کن
- وظایف ایجاد کنید
- ببین همه چی قشنگه
- شماره جلسات را به پرها اختصاص دهید
- دریافت داده از SQL Server
- داده ها را در Vertica قرار دهید
- جمع آوری آمار
بنابراین، برای اینکه همه اینها راه اندازی شود، یک افزونه کوچک به ما اضافه کردم docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyدر آنجا مطرح می کنیم:
- Vertica به عنوان میزبان
dwhبا بیشترین تنظیمات پیش فرض، - سه نمونه از SQL Server،
- ما پایگاه های داده در دومی را با برخی از داده ها پر می کنیم (به هیچ وجه به آن نگاه نکنید
mssql_init.py!)
ما همه چیزهای خوب را با کمک یک دستور کمی پیچیده تر از دفعه قبل راه اندازی می کنیم:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3آنچه تصادفی ساز معجزه ما ایجاد کرد، می توانید از مورد استفاده کنید Data Profiling/Ad Hoc Query:

نکته اصلی این است که آن را به تحلیلگران نشان ندهید
شرح دادن در جلسات ETL من نمی خواهم، همه چیز در آنجا بی اهمیت است: ما یک پایه می سازیم، یک علامت در آن وجود دارد، ما همه چیز را با یک مدیر زمینه می بندیم، و اکنون این کار را انجام می دهیم:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passزمان آن رسیده است داده های ما را جمع آوری کنید از یکصد و نیم میز ما. بیایید این کار را با کمک خطوط بسیار بی تکلف انجام دهیم:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- با کمک یک قلاب از جریان هوا می گیریم
pymssql-اتصال - بیایید یک محدودیت را به شکل تاریخ در درخواست جایگزین کنیم - توسط موتور الگو به تابع پرتاب می شود.
- تغذیه درخواست ما
pandasچه کسی ما را خواهد گرفتDataFrame- در آینده برای ما مفید خواهد بود.
من از جایگزینی استفاده می کنم
{dt}به جای پارامتر درخواست%sنه به این دلیل که من یک پینوکیوی شیطانی هستم، بلکه به این دلیلpandasنمی تواند اداره کندpymssqlو آخری را می لغزدparams: Listاگرچه او واقعاً می خواهدtuple.
همچنین توجه داشته باشید که توسعه دهندهpymssqlتصمیم گرفت که دیگر از او حمایت نکند و وقت آن رسیده است که بیرون بیاییpyodbc.
بیایید ببینیم که Airflow آرگومان های توابع ما را با چه چیزی پر کرده است:

اگر داده ای وجود نداشته باشد، ادامه دادن فایده ای ندارد. اما این نیز عجیب است که پر کردن را موفق بدانیم. اما این یک اشتباه نیست. آخ آخ چیکار کنم؟! و این چیزی است که:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException به Airflow می گوید که هیچ خطایی وجود ندارد، اما ما از انجام وظیفه صرف نظر می کنیم. رابط کاربری مربع سبز یا قرمز نخواهد داشت، بلکه صورتی خواهد داشت.
بیایید داده هایمان را پرتاب کنیم چندین ستون:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])یعنی
- پایگاه داده ای که از آن سفارش ها را گرفته ایم،
- شناسه جلسه سیل ما (متفاوت خواهد بود برای هر کار),
- یک هش از منبع و شناسه سفارش - به طوری که در پایگاه داده نهایی (جایی که همه چیز در یک جدول ریخته می شود) یک شناسه سفارش منحصر به فرد داشته باشیم.
مرحله ماقبل آخر باقی می ماند: همه چیز را در Vertica بریزید. و، به اندازه کافی عجیب، یکی از دیدنی ترین و کارآمدترین راه ها برای انجام این کار از طریق CSV است!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- ما در حال ساخت یک گیرنده ویژه هستیم
StringIO. pandasبا مهربانی ما را قرار خواهد دادDataFrameبه شکلCSV-خطوط- بیایید یک اتصال به Vertica مورد علاقه خود را با یک قلاب باز کنیم.
- و حالا با کمک
copy()داده های ما را مستقیماً به Vertika ارسال کنید!
ما از راننده می گیریم که چند خط پر شده است و به مدیر جلسه می گوییم که همه چیز درست است:
session.loaded_rows = cursor.rowcount
session.successful = Trueفقط همین.
در فروش، صفحه هدف را به صورت دستی ایجاد می کنیم. در اینجا به خودم اجازه دادم یک ماشین کوچک داشته باشم:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)من با استفاده از
VerticaOperator()من یک طرح پایگاه داده و یک جدول ایجاد می کنم (البته اگر قبلاً وجود نداشته باشند). نکته اصلی این است که وابستگی ها را به درستی مرتب کنید:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadجمعبندی
- خوب، - گفت موش کوچولو، - اینطور نیست، حالا
آیا متقاعد شده اید که من وحشتناک ترین حیوان جنگل هستم؟
جولیا دونالدسون، گروفالو
فکر میکنم اگر من و همکارانم رقابتی داشتیم: چه کسی به سرعت یک فرآیند ETL را از ابتدا ایجاد و راهاندازی میکند: آنها با SSIS و ماوس خود و من با Airflow... و سپس ما همچنین سهولت تعمیر و نگهداری را با هم مقایسه میکنیم... وای، فکر کنم قبول داری که در همه جبهه ها آنها را شکست خواهم داد!
اگر کمی جدی تر، Apache Airflow - با توصیف فرآیندها در قالب کد برنامه - کار من را انجام داد. خیلی راحت تر و لذت بخش تر
توسعه نامحدود آن، هم از نظر پلاگین ها و هم از نظر تمایل به مقیاس پذیری، به شما این فرصت را می دهد که از Airflow تقریبا در هر زمینه ای استفاده کنید: حتی در چرخه کامل جمع آوری، آماده سازی و پردازش داده ها، حتی در پرتاب موشک (به مریخ، دوره).
قسمت نهایی، مرجع و اطلاعات
چنگکی که برای شما جمع آوری کرده ایم
start_date. بله، این قبلاً یک میم محلی است. از طریق استدلال اصلی داگstart_dateهمه عبور می کنند به طور خلاصه، اگر در آن مشخص کنیدstart_dateتاریخ فعلی، وschedule_interval- یک روز، DAG فردا زودتر شروع می شود.start_date = datetime(2020, 7, 7, 0, 1, 2)و دیگر مشکلی وجود ندارد.
خطای زمان اجرا دیگری نیز با آن مرتبط است:
Task is missing the start_date parameter، که اغلب نشان می دهد که شما فراموش کرده اید به عملگر dag متصل شوید.- همه روی یک دستگاه بله، و پایه ها (خود جریان هوا و پوشش ما)، و وب سرور، و زمانبندی، و کارگران. و حتی کار کرد. اما با گذشت زمان، تعداد وظایف خدمات افزایش یافت و زمانی که PostgreSQL شروع به پاسخگویی به ایندکس به جای 20 میلی ثانیه در 5 ثانیه کرد، آن را برداشتیم و آن را دور بردیم.
- LocalExecutor. بله، ما هنوز روی آن نشستهایم و تا به حال به لبه پرتگاه رسیدهایم. LocalExecutor تا اینجا برای ما کافی بوده است، اما اکنون زمان آن رسیده که حداقل با یک کارگر توسعه دهیم و برای انتقال به CeleryExecutor باید سخت کار کنیم. و با توجه به این واقعیت که شما می توانید با آن در یک دستگاه کار کنید، هیچ چیز شما را از استفاده از Celery حتی بر روی یک سرور باز نمی دارد، که "البته، صادقانه بگویم، هرگز تولید نخواهد شد!"
- عدم استفاده ابزارهای داخلی:
- اتصالات برای ذخیره اعتبار خدمات،
- SLA از دست می دهد برای پاسخگویی به وظایفی که به موقع انجام نشده اند،
- xcom برای تبادل ابرداده (گفتم متاداده!) بین وظایف dag.
- سوء استفاده از نامه خوب چه می توانم بگویم؟ هشدارها برای همه تکرارهای کارهای افتاده تنظیم شده بودند. اکنون جیمیل کاری من بیش از 90 هزار ایمیل از Airflow دارد، و مخزن ایمیل از دریافت و حذف بیش از 100 ایمیل در یک زمان خودداری می کند.
دام های بیشتر:
ابزارهای اتوماسیون بیشتر
برای اینکه حتی بیشتر با سر کار کنیم نه با دست، Airflow این را برای ما آماده کرده است:
- - او هنوز وضعیت تجربی را دارد که مانع از کار او نمی شود. با استفاده از آن، نه تنها می توانید اطلاعاتی در مورد داگ ها و وظایف دریافت کنید، بلکه می توانید یک داگ را متوقف/شروع کنید، یک DAG Run یا یک استخر ایجاد کنید.
- - ابزارهای زیادی از طریق خط فرمان در دسترس هستند که نه تنها برای استفاده از طریق WebUI ناخوشایند هستند، بلکه به طور کلی وجود ندارند. مثلا:
backfillبرای راه اندازی مجدد نمونه های کار مورد نیاز است.
به عنوان مثال، تحلیلگران آمدند و گفتند: «و شما رفیق، از اول تا سیزدهم ژانویه در داده های مزخرف دارید! درستش کن، درستش کن، درستش کن، درستش کن!» و شما یک سرگرمی هستید:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- خدمات پایه:
initdb,resetdb,upgradedb,checkdb. run، که به شما امکان می دهد یک کار نمونه را اجرا کنید و حتی به همه وابستگی ها امتیاز دهید. علاوه بر این، می توانید آن را از طریق اجرا کنیدLocalExecutor، حتی اگر خوشه کرفس داشته باشید.- تقریبا همین کار را می کند
test، فقط در پایه ها هم چیزی نمی نویسد. connectionsامکان ایجاد انبوه اتصالات از پوسته را فراهم می کند.
- - روشی نسبتاً سختگیر برای تعامل که برای پلاگین ها در نظر گرفته شده است و با دستان کوچک در آن ازدحام نمی کند. اما چه کسی مانع رفتن ما می شود
/home/airflow/dags، اجرا کنipythonو شروع به خرابکاری کنید؟ برای مثال می توانید تمام اتصالات را با کد زیر صادر کنید:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - اتصال به پایگاه داده های فراداده Airflow. من نوشتن در آن را توصیه نمیکنم، اما دریافت وضعیتهای وظیفه برای معیارهای خاص مختلف میتواند بسیار سریعتر و آسانتر از هر یک از APIها باشد.
بیایید بگوییم که همه وظایف ما ناتوان نیست، اما گاهی اوقات ممکن است سقوط کنند، و این طبیعی است. اما چند انسداد در حال حاضر مشکوک هستند و بررسی آن ضروری است.
مراقب SQL باشید!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
مراجع
و البته ده لینک اول از صدور گوگل محتویات پوشه Airflow از بوکمارک های من است.
- - البته، ما باید از دفتر شروع کنیم. مستندات، اما چه کسی دستورالعمل ها را می خواند؟
- - خوب، حداقل توصیه های سازندگان را بخوانید.
- - همان ابتدا: رابط کاربری در تصاویر
- - مفاهیم اساسی به خوبی توصیف شده است، اگر (ناگهان!) چیزی از من متوجه نشدید.
- - راهنمای کوتاهی برای تنظیم یک خوشه جریان هوا.
- - تقریباً همان مقاله جالب، به جز شاید فرمالیسم بیشتر، و نمونه های کمتر.
- - در مورد کار در ارتباط با کرفس.
- - در مورد ناتوانی وظایف، بارگیری با شناسه به جای تاریخ، تبدیل، ساختار فایل و چیزهای جالب دیگر.
- - وابستگی وظایف و قانون ماشه که فقط به صورت گذرا به آن اشاره کردم.
- - نحوه غلبه بر برخی "کارها همانطور که در نظر گرفته شده" در زمانبندی، بارگیری داده های از دست رفته و اولویت بندی وظایف.
- - پرس و جوهای مفید SQL برای ابرداده های جریان هوا.
- - یک بخش مفید در مورد ایجاد یک سنسور سفارشی وجود دارد.
- - یک یادداشت کوتاه جالب در مورد ایجاد زیرساخت در AWS برای علم داده.
- - اشتباهات رایج (زمانی که کسی هنوز دستورالعمل ها را نمی خواند).
- - لبخند بزنید که مردم چگونه رمزهای عبور را ذخیره می کنند، اگرچه شما فقط می توانید از Connections استفاده کنید.
- - ارسال ضمنی DAG، پرتاب متن در توابع، دوباره در مورد وابستگی ها، و همچنین در مورد پرش از راه اندازی وظایف.
- - در مورد استفاده
default argumentsиparamsدر قالب ها و همچنین متغیرها و اتصالات. - - داستانی در مورد چگونگی آماده شدن برنامه ریز برای Airflow 2.0.
- - یک مقاله کمی قدیمی در مورد استقرار خوشه ما در
docker-compose. - - وظایف پویا با استفاده از الگوها و ارسال زمینه.
- - اعلان های استاندارد و سفارشی از طریق پست و Slack.
- - وظایف شعبه، ماکروها و XCom.
و لینک های استفاده شده در مقاله:
- - متغیرهایی در دسترس برای استفاده در قالب ها.
- - اشتباهات رایج هنگام ایجاد داگ.
- -
docker-composeبرای آزمایش، اشکال زدایی و موارد دیگر. - - پوشش پایتون برای Telegram REST API.
منبع: www.habr.com




