سلام، من دیمیتری لوگویننکو هستم - مهندس داده بخش تجزیه و تحلیل گروه شرکت های Vezet.
من در مورد یک ابزار فوق العاده برای توسعه فرآیندهای ETL - Apache Airflow به شما خواهم گفت. اما Airflow به قدری همه کاره و چندوجهی است که حتی اگر درگیر جریان داده ها نیستید، اما نیاز به راه اندازی دوره ای هر فرآیند و نظارت بر اجرای آنها دارید، باید نگاه دقیق تری به آن بیندازید.
و بله، من نه تنها می گویم، بلکه نشان می دهم: این برنامه کد، اسکرین شات و توصیه های زیادی دارد.
چیزی که معمولاً هنگام جستجوی کلمه Airflow / Wikimedia Commons در گوگل مشاهده می کنید
- فقط بهتر است، و برای اهداف کاملاً متفاوت ساخته شده است، یعنی (همانطور که قبل از کاتا نوشته شده است):
اجرای و نظارت بر وظایف بر روی تعداد نامحدودی از ماشین ها (همانطور که بسیاری از Celery / Kubernetes و وجدان شما به شما اجازه می دهد)
با تولید گردش کار پویا از نوشتن و درک کد پایتون بسیار آسان است
و امکان اتصال هر پایگاه داده و API با یکدیگر با استفاده از اجزای آماده و پلاگین های خانگی (که بسیار ساده است).
ما از Apache Airflow به صورت زیر استفاده می کنیم:
ما داده ها را از منابع مختلف (بسیاری از نمونه های SQL Server و PostgreSQL، API های مختلف با معیارهای برنامه، حتی 1C) در DWH و ODS جمع آوری می کنیم (ما Vertica و Clickhouse را داریم).
چقدر پیشرفته cron، که فرآیندهای یکپارچه سازی داده ها را در ODS شروع می کند و همچنین نگهداری آنها را نظارت می کند.
تا همین اواخر، نیازهای ما توسط یک سرور کوچک با 32 هسته و 50 گیگابایت رم پوشش داده می شد. در جریان هوا، این کار می کند:
بیش 200 دگ (در واقع گردش کار، که در آن وظایف را پر کرده ایم)،
در هر یک به طور متوسط 70 کار,
این خوبی شروع می شود (همچنین به طور متوسط) یک بار در ساعت.
و در مورد نحوه گسترش ما، در زیر می نویسم، اما اکنون اجازه دهید مشکل über را که حل خواهیم کرد، تعریف کنیم:
سه منبع SQL Server وجود دارد که هر کدام دارای 50 پایگاه داده هستند - نمونه هایی از یک پروژه، به ترتیب، ساختار یکسانی دارند (تقریبا در همه جا، mua-ha-ha)، به این معنی که هر کدام یک جدول Orders (خوشبختانه، یک جدول با آن نام را می توان به هر کسب و کاری فشار داد). ما داده ها را با افزودن فیلدهای سرویس (سرور منبع، پایگاه داده منبع، شناسه وظیفه ETL) می گیریم و ساده لوحانه آنها را مثلاً به Vertica می اندازیم.
بریم!
بخش اصلی، عملی (و کمی تئوری)
چرا ما (و شما)
وقتی درخت ها بزرگ بودند و من ساده بودم SQLدر یکی از خردهفروشیهای روسی، ما با استفاده از دو ابزاری که در دسترس ما بود، از فرآیندهای ETL که به جریانهای داده معروف است کلاهبرداری کردیم:
مرکز برق انفورماتیکا - یک سیستم بسیار گسترده، بسیار سازنده، با سخت افزار خاص خود، نسخه سازی خاص خود. خدای نکرده 1 درصد از قابلیت هاش استفاده کردم. چرا؟ خوب، اول از همه، این رابط، جایی از دهه 380، از نظر ذهنی ما را تحت فشار قرار داد. ثانیا، این ابزار برای فرآیندهای بسیار فانتزی، استفاده مجدد از اجزای خشمگین و دیگر ترفندهای بسیار مهم سازمانی طراحی شده است. در مورد هزینه آن، مانند بال ایرباس AXNUMX / سال، ما چیزی نخواهیم گفت.
مراقب باشید، یک اسکرین شات می تواند به افراد زیر 30 سال کمی آسیب برساند
سرور یکپارچه سازی SQL Server - ما از این رفیق در جریان های درون پروژه ای خود استفاده کردیم. خوب، در واقع: ما در حال حاضر از SQL Server استفاده می کنیم و استفاده نکردن از ابزارهای ETL آن به نوعی غیرمنطقی است. همه چیز در آن خوب است: هم رابط کاربری زیبا است و هم گزارش های پیشرفت... اما این دلیلی نیست که ما محصولات نرم افزاری را دوست داریم، اوه، نه برای این. نسخه آن dtsx (که XML با گرهها به هم ریخته در ذخیره است) میتوانیم، اما نکته چیست؟ در مورد ایجاد یک بسته کار که صدها جدول را از یک سرور به سرور دیگر بکشد، چطور؟ بله، چه صد، انگشت اشاره شما با کلیک بر روی دکمه ماوس از بیست تیکه می افتد. اما قطعا شیک تر به نظر می رسد:
مطمئناً به دنبال راههایی برای خروج بودیم. مورد حتی تقریبا به یک ژنراتور بسته SSIS که خود نوشته شده بود رسید ...
… و سپس یک کار جدید برای من پیدا کرد. و Apache Airflow از من سبقت گرفت.
وقتی فهمیدم که توضیحات فرآیند ETL کدهای ساده پایتون هستند، از خوشحالی نرقصیدم. اینگونه بود که جریانهای داده نسخهبندی و متفاوت شدند، و ریختن جداول با ساختار واحد از صدها پایگاه داده در یک هدف تبدیل به یک موضوع کد پایتون در یک و نیم یا دو صفحه 13 اینچی شد.
جمع آوری خوشه
بیایید یک مهدکودک کاملاً ترتیب دهیم و در اینجا در مورد چیزهای کاملاً واضح صحبت نکنیم، مانند نصب Airflow، پایگاه داده انتخابی شما، Celery و موارد دیگری که در اسکله ها توضیح داده شده است.
برای اینکه بتوانیم بلافاصله آزمایشات را شروع کنیم، طرح کردم docker-compose.yml که در آن:
بیایید در واقع بالا ببریم جریان هوا: زمانبندی، وب سرور. گل همچنین برای نظارت بر وظایف کرفس در آنجا می چرخد (زیرا قبلاً به آن فشار داده شده است apache/airflow:1.10.10-python3.7، اما ما مشکلی نداریم)
PostgreSQL و، که در آن Airflow اطلاعات سرویس خود را می نویسد (داده های زمان بندی، آمار اجرا و غیره) و Celery وظایف تکمیل شده را علامت گذاری می کند.
Redis، که به عنوان یک کارگزار برای کرفس عمل می کند.
کارگر کرفس، که به اجرای مستقیم وظایف مشغول خواهد شد.
به پوشه ./dags ما فایل های خود را با توضیحات dags اضافه می کنیم. آنها در حال پرواز برداشته می شوند، بنابراین پس از هر عطسه نیازی به دستکاری کل پشته نیست.
در بعضی جاها، کد موجود در مثال ها به طور کامل نشان داده نمی شود (تا متن را به هم نریزد)، اما در جایی در این فرآیند اصلاح می شود. نمونه های کامل کد کار را می توان در مخزن یافت https://github.com/dm-logv/airflow-tutorial.
در مونتاژ ترکیب، من تا حد زیادی به تصویر شناخته شده تکیه کردم puckel/docker-flow airflow - حتما بررسی کنید شاید شما به هیچ چیز دیگری در زندگی خود نیاز ندارید.
همه تنظیمات جریان هوا نه تنها از طریق در دسترس هستند airflow.cfg، بلکه از طریق متغیرهای محیطی (با تشکر از توسعه دهندگان) که من به طور مخرب از آنها استفاده کردم.
طبیعتاً آماده تولید نیست: من عمداً ضربان قلب را روی ظروف قرار ندادم ، امنیت را به زحمت نینداختم. اما من حداقل های مناسب را برای آزمایش کنندگانمان انجام دادم.
توجه داشته باشید که:
پوشه dag باید هم برای زمانبندی و هم برای کارگران قابل دسترسی باشد.
همین امر در مورد تمام کتابخانه های شخص ثالث صدق می کند - همه آنها باید روی ماشین هایی با زمانبندی و کارگران نصب شوند.
خوب، حالا ساده است:
$ docker-compose up --scale worker=3
بعد از اینکه همه چیز بالا رفت، می توانید به رابط های وب نگاه کنید:
اگر در تمام این "دگ ها" چیزی متوجه نشدید، در اینجا یک فرهنگ لغت کوتاه آمده است:
زمان بند - مهمترین عمو در جریان هوا، که کنترل می کند که روبات ها سخت کار کنند، نه یک شخص: برنامه را نظارت می کند، Dags را به روز می کند، وظایف را راه اندازی می کند.
به طور کلی، در نسخه های قدیمی، او با حافظه مشکل داشت (نه، فراموشی نیست، اما نشتی) و پارامتر legacy حتی در تنظیمات باقی می ماند. run_duration - فاصله شروع مجدد آن اما اکنون همه چیز خوب است.
DAG (با نام مستعار "داگ") - "گراف غیر چرخه ای جهت دار"، اما چنین تعریفی به افراد کمی می گوید، اما در واقع محفظه ای برای وظایفی است که با یکدیگر در تعامل هستند (پایین را ببینید) یا یک آنالوگ از Package در SSIS و Workflow در Informatica .
علاوه بر داگ ها، ممکن است هنوز هم زیرداگ ها وجود داشته باشد، اما به احتمال زیاد به آنها نخواهیم رسید.
DAG Run - داگ مقدار دهی اولیه شده که به خود اختصاص داده می شود execution_date. داگران های همان داگ می توانند به صورت موازی کار کنند (البته اگر وظایف خود را بی قدرت کرده باشید).
اپراتور قطعاتی از کد هستند که مسئول انجام یک عمل خاص هستند. سه نوع عملگر وجود دارد:
اقداممانند مورد علاقه ما PythonOperator، که می تواند هر کد (معتبر) پایتون را اجرا کند.
انتقال، که داده ها را از مکانی به مکان دیگر منتقل می کند، مثلاً، MsSqlToHiveTransfer;
حسی از سوی دیگر، به شما این امکان را می دهد که تا زمانی که یک رویداد رخ دهد، واکنش نشان دهید یا اجرای بیشتر Dag را کاهش دهید. HttpSensor می تواند نقطه پایانی مشخص شده را بکشد و هنگامی که پاسخ مورد نظر در انتظار است، انتقال را شروع کند GoogleCloudStorageToS3Operator. یک ذهن کنجکاو خواهد پرسید: «چرا؟ پس از همه، شما می توانید تکرارها را درست در اپراتور انجام دهید! و سپس، برای اینکه مجموعه وظایف با اپراتورهای معلق مسدود نشود. سنسور شروع می شود، بررسی می کند و قبل از تلاش بعدی می میرد.
کار - اپراتورهای اعلام شده، صرف نظر از نوع، و متصل به داگ به رتبه وظیفه ارتقا می یابند.
نمونه کار - زمانی که برنامه ریز کل تصمیم گرفت که زمان ارسال وظایف به نبرد بر روی کارگران مجری است (در صورت استفاده درست در محل LocalExecutor یا به یک گره راه دور در مورد CeleryExecutor، یک زمینه را به آنها اختصاص می دهد (به عنوان مثال، مجموعه ای از متغیرها - پارامترهای اجرا)، قالب های فرمان یا پرس و جو را گسترش می دهد و آنها را ادغام می کند.
ما وظایف تولید می کنیم
ابتدا طرح کلی دوغ خود را ترسیم می کنیم و سپس بیشتر و بیشتر به جزئیات می پردازیم، زیرا ما برخی از راه حل های غیر ضروری را اعمال می کنیم.
بنابراین، در ساده ترین شکل خود، چنین داگ به شکل زیر خواهد بود:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
بیایید آن را بفهمیم:
ابتدا lib های لازم را وارد می کنیم و یک چیز دیگر;
sql_server_ds - آیا List[namedtuple[str, str]] با نام اتصالات از Airflow Connections و پایگاه های داده ای که صفحه خود را از آنها خواهیم گرفت.
dag - اعلام دگ ما که حتما باید در باشد globals()، در غیر این صورت Airflow آن را پیدا نمی کند. داگ همچنین باید بگوید:
اسمش چیه orders - سپس این نام در رابط وب ظاهر می شود،
که از نیمه شب هشتم ژوئیه کار خواهد کرد،
و باید تقریباً هر 6 ساعت یکبار اجرا شود (برای افراد سرسخت اینجا به جای timedelta() قابل قبول cron-خط 0 0 0/6 ? * * *، برای کمتر سرد - عبارتی مانند @daily);
workflow() کار اصلی را انجام خواهد داد، اما نه اکنون. در حال حاضر، ما فقط زمینه خود را در لاگ می اندازیم.
و اکنون جادوی ساده ایجاد وظایف:
ما از طریق منابع خود استفاده می کنیم.
مقداردهی اولیه PythonOperator، که ساختگی ما را اجرا می کند workflow(). فراموش نکنید که یک نام منحصر به فرد (در داخل داگ) کار را مشخص کنید و خود داگ را ببندید. پرچم provide_context به نوبه خود، آرگومان های اضافی را در تابع می ریزد که با استفاده از آنها به دقت جمع آوری می کنیم **context.
در حال حاضر، این همه است. آنچه به دست آوردیم:
داگ جدید در رابط وب،
صد و نیم کار که به صورت موازی اجرا می شوند (اگر تنظیمات Airflow، Celery و ظرفیت سرور اجازه دهد).
خب تقریبا فهمیدم
چه کسی وابستگی ها را نصب خواهد کرد؟
برای سادهتر کردن این موضوع، من را به هم زدم docker-compose.yml در حال پردازش requirements.txt روی همه گره ها
حالا رفته:
مربع های خاکستری نمونه های کار هستند که توسط زمان بندی پردازش می شوند.
ما کمی صبر می کنیم، کارها توسط کارگران انجام می شود:
سبزها البته کار خود را با موفقیت به پایان رسانده اند. قرمزها چندان موفق نیستند.
به هر حال، هیچ پوشه ای در پرود ما وجود ندارد ./dags، هیچ هماهنگی بین ماشین ها وجود ندارد - همه دگ ها در آن قرار دارند git در Gitlab ما، و Gitlab CI بهروزرسانیها را در صورت ادغام در ماشینها اجرا میکند master.
کمی در مورد گل
در حالی که کارگران در حال کوبیدن پستانک های ما هستند، بیایید ابزار دیگری را به یاد بیاوریم که می تواند چیزی را به ما نشان دهد - گل.
اولین صفحه با اطلاعات خلاصه در مورد گره های کارگر:
شدیدترین صفحه با وظایفی که به کار خود ادامه دادند:
خسته کننده ترین صفحه با وضعیت کارگزار ما:
روشن ترین صفحه با نمودارهای وضعیت کار و زمان اجرای آنها است:
ما زیر بار را بارگذاری می کنیم
بنابراین، تمام وظایف انجام شده است، شما می توانید مجروحان را با خود ببرید.
و تعداد زیادی مجروح وجود داشت - به دلایلی. در مورد استفاده صحیح از Airflow، همین مربع ها نشان می دهد که داده ها قطعاً به دست نیامده اند.
شما باید گزارش را تماشا کنید و نمونه های کار افتاده را مجدداً راه اندازی کنید.
با کلیک بر روی هر مربع، اقداماتی را که در دسترس ما هستند مشاهده خواهیم کرد:
شما می توانید بردارید و Clear the fallen را بسازید. یعنی فراموش میکنیم که چیزی در آنجا شکست خورده است و همان کار نمونه به زمانبندی میرود.
واضح است که انجام این کار با ماوس با تمام مربع های قرمز چندان انسانی نیست - این چیزی نیست که ما از Airflow انتظار داریم. طبیعتاً ما سلاح های کشتار جمعی داریم: Browse/Task Instances
بیایید همه چیز را یکباره انتخاب کنیم و به صفر بازنشانی کنیم، روی مورد صحیح کلیک کنید:
پس از تمیز کردن، تاکسیهای ما به این شکل هستند (آنها از قبل منتظر برنامهریزی هستند تا آنها را برنامه ریزی کند):
اتصالات، قلاب ها و سایر متغیرها
وقت آن است که به DAG بعدی نگاه کنیم، update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
آیا تا به حال همه یک به روز رسانی گزارش انجام داده اند؟ این دوباره اوست: فهرستی از منابع وجود دارد که از کجا می توان داده ها را دریافت کرد. لیستی وجود دارد که در آن قرار دهید. فراموش نکنید که وقتی همه چیز اتفاق افتاد یا شکست، بوق بزنید (خب، این مربوط به ما نیست، نه).
بیایید دوباره فایل را مرور کنیم و به موارد مبهم جدید نگاه کنیم:
from commons.operators import TelegramBotSendMessage - هیچ چیز ما را از ساخت اپراتورهای خودمان باز نمی دارد که با ساختن یک لفاف کوچک برای ارسال پیام به Unblocked از آن استفاده کردیم. (در ادامه در مورد این اپراتور بیشتر صحبت خواهیم کرد).
default_args={} - dag می تواند همان آرگومان ها را بین تمام عملگرهای خود توزیع کند.
to='{{ var.value.all_the_kings_men }}' - رشته to ما هاردکد نخواهیم داشت، بلکه به صورت پویا با استفاده از Jinja و یک متغیر با لیستی از ایمیلها تولید میکنیم که من با دقت در آن قرار دادم. Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS - شرط راه اندازی اپراتور. در مورد ما، نامه فقط در صورتی به رئیسان ارسال می شود که همه وابستگی ها برطرف شده باشند با موفقیت;
tg_bot_conn_id='tg_main' - استدلال conn_id شناسه های اتصالی را که در آن ایجاد می کنیم بپذیرید Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - پیام ها در تلگرام تنها در صورت وجود وظایف از بین می روند.
task_concurrency=1 - ما راه اندازی همزمان چند نمونه کار از یک کار را ممنوع می کنیم. در غیر این صورت، ما چندین راه اندازی همزمان خواهیم داشت VerticaOperator (نگاه به یک میز)؛
report_update >> [email, tg] - همه VerticaOperator در ارسال نامه ها و پیام ها همگرا شوند، مانند این:
اما از آنجایی که اپراتورهای اعلان کننده شرایط راه اندازی متفاوتی دارند، تنها یکی کار می کند. در نمای درختی، همه چیز کمی کمتر بصری به نظر می رسد:
من چند کلمه در مورد ماکروها و دوستانشان - متغیرها.
ماکروها مکانهایی هستند که میتوانند اطلاعات مفید مختلف را با آرگومانهای عملگر جایگزین کنند. به عنوان مثال، مانند این:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} به محتویات متغیر زمینه گسترش می یابد execution_date در قالب YYYY-MM-DD: 2020-07-14. بهترین بخش این است که متغیرهای زمینه به یک نمونه کار خاص (یک مربع در نمای درختی) میخ میشوند و وقتی دوباره راهاندازی میشوند، متغیرهای مکان به همان مقادیر گسترش مییابند.
مقادیر اختصاص داده شده را می توان با استفاده از دکمه Rendered در هر نمونه کار مشاهده کرد. وظیفه ارسال نامه به این صورت است:
و بنابراین در وظیفه ارسال پیام:
لیست کاملی از ماکروهای داخلی برای آخرین نسخه موجود در اینجا موجود است: مرجع ماکروها
علاوه بر این، با کمک افزونه ها، ما می توانیم ماکروهای خود را اعلام کنیم، اما این داستان دیگری است.
علاوه بر موارد از پیش تعریف شده، می توانیم مقادیر متغیرهای خود را جایگزین کنیم (من قبلاً از این در کد بالا استفاده کردم). بیایید در ایجاد کنیم Admin/Variables یکی دو چیز:
فقط از مسیر کلید مورد نظر استفاده کنید: {{ var.json.bot_config.bot.token }}.
من به معنای واقعی کلمه یک کلمه می گویم و یک اسکرین شات در مورد آن نشان می دهم ارتباطات. اینجا همه چیز ابتدایی است: در صفحه Admin/Connections ما یک اتصال ایجاد می کنیم، لاگین / رمز عبور و پارامترهای خاص تر را در آنجا اضافه می کنیم. مثل این:
گذرواژهها را میتوان رمزگذاری کرد (بهطور کاملتر از پیشفرض)، یا میتوانید نوع اتصال را حذف کنید (همانطور که من انجام دادم tg_main) - واقعیت این است که لیست انواع در مدلهای Airflow متصل است و نمیتوان بدون وارد شدن به کدهای منبع آن را گسترش داد (اگر ناگهان چیزی را در گوگل جستجو نکردم، لطفاً من را اصلاح کنید)، اما هیچ چیز ما را از دریافت اعتبار باز نمیدارد. نام.
شما همچنین می توانید چندین اتصال با یک نام ایجاد کنید: در این مورد، روش BaseHook.get_connection()، که اتصالات را با نام به ما می دهد، خواهد داد تصادفی از چندین همنام (منطقی تر است که Round Robin بسازیم، اما اجازه دهید آن را به وجدان توسعه دهندگان Airflow واگذار کنیم).
مطمئناً متغیرها و اتصالات ابزارهای جالبی هستند، اما مهم است که تعادل را از دست ندهید: کدام قسمتهای جریان خود را در خود کد ذخیره میکنید و کدام قسمتها را برای ذخیره به Airflow میدهید. از یک طرف، تغییر سریع مقدار، به عنوان مثال، یک صندوق پستی، از طریق UI می تواند راحت باشد. از طرف دیگر، این هنوز بازگشتی به کلیک ماوس است که ما (من) می خواستیم از شر آن خلاص شویم.
کار با اتصالات یکی از وظایف است قلاب ها. به طور کلی قلاب های جریان هوا نقاطی برای اتصال آن به خدمات و کتابخانه های شخص ثالث هستند. به عنوان مثال، JiraHook یک کلاینت را برای ما باز می کند تا با Jira تعامل داشته باشیم (شما می توانید وظایف را به جلو و عقب ببرید) و با کمک SambaHook می توانید یک فایل محلی را به آن فشار دهید smb-نقطه.
تجزیه اپراتور سفارشی
و ما به بررسی نحوه ساخت آن نزدیک شدیم TelegramBotSendMessage
رمز commons/operators.py با اپراتور واقعی:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
در اینجا، مانند هر چیز دیگری در Airflow، همه چیز بسیار ساده است:
به ارث رسیده از BaseOperator، که چندین چیز خاص جریان هوا را اجرا می کند (به اوقات فراغت خود نگاه کنید)
فیلدهای اعلام شده template_fields، که در آن Jinja به دنبال ماکروهایی برای پردازش خواهد بود.
آرگومان های درست را ترتیب داد __init__()، پیش فرض ها را در صورت لزوم تنظیم کنید.
ما در مورد مقداردهی اولیه اجداد نیز فراموش نکردیم.
قلاب مربوطه را باز کرد TelegramBotHookیک شی مشتری از آن دریافت کرد.
روش لغو (بازتعریف شده). BaseOperator.execute()، هنگامی که زمان راه اندازی اپراتور برسد Airfow آن را تکان می دهد - در آن ما عمل اصلی را اجرا می کنیم و فراموش می کنیم که وارد شوید. (به هر حال، دقیقاً وارد می شویم stdout и stderr - جریان هوا همه چیز را قطع می کند، آن را به زیبایی می پیچد، در صورت لزوم تجزیه می کند.)
ببینیم چی داریم commons/hooks.py. قسمت اول فایل با خود قلاب:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
من حتی نمی دانم در اینجا چه توضیحی بدهم، فقط به نکات مهم توجه می کنم:
ما ارث می بریم، در مورد استدلال ها فکر می کنیم - در بیشتر موارد یکی خواهد بود: conn_id;
نادیده گرفتن روش های استاندارد: من خودم را محدود کردم get_conn()، که در آن پارامترهای اتصال را با نام دریافت می کنم و فقط بخش را دریافت می کنم extra (این یک فیلد JSON است)، که من (طبق دستورالعمل خودم!) توکن ربات تلگرام را در آن قرار دادم: {"bot_token": "YOuRAwEsomeBOtToKen"}.
من یک نمونه از ما ایجاد می کنم TelegramBot، به آن یک نشانه خاص می دهد.
همین. شما می توانید یک مشتری از یک قلاب با استفاده از TelegramBotHook().clent یا TelegramBotHook().get_conn().
و قسمت دوم فایل، که در آن یک microwrapper برای تلگرام REST API درست می کنم تا همان را درگ نکنم. python-telegram-bot برای یک روش sendMessage.
راه صحیح این است که همه را جمع کنید: TelegramBotSendMessage, TelegramBotHook, TelegramBot - در افزونه، در یک مخزن عمومی قرار دهید و آن را به منبع باز بدهید.
در حالی که ما در حال مطالعه این همه بودیم، به روز رسانی گزارش ما با موفقیت شکست خورد و یک پیام خطا در کانال برای من ارسال کرد. میرم چک کنم ببینم اشتباهه...
چیزی در دوج ما شکست! آیا این همان چیزی نیست که ما انتظارش را داشتیم؟ دقیقا!
میخوای بریزی؟
آیا احساس می کنید چیزی را از دست داده ام؟ گویا قول داده از SQL Server به Vertica انتقال داده و بعد برداشته و از تاپیک خارج شده رذل!
این ظلم عمدی بود، من به سادگی مجبور شدم برخی اصطلاحات را برای شما رمزگشایی کنم. حالا می توانید جلوتر بروید.
برنامه ما این بود:
دگ کن
وظایف ایجاد کنید
ببین همه چی قشنگه
شماره جلسات را به پرها اختصاص دهید
دریافت داده از SQL Server
داده ها را در Vertica قرار دهید
جمع آوری آمار
بنابراین، برای اینکه همه اینها راه اندازی شود، یک افزونه کوچک به ما اضافه کردم docker-compose.yml:
Vertica به عنوان میزبان dwh با بیشترین تنظیمات پیش فرض،
سه نمونه از SQL Server،
ما پایگاه های داده در دومی را با برخی از داده ها پر می کنیم (به هیچ وجه به آن نگاه نکنید mssql_init.py!)
ما همه چیزهای خوب را با کمک یک دستور کمی پیچیده تر از دفعه قبل راه اندازی می کنیم:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
آنچه تصادفی ساز معجزه ما ایجاد کرد، می توانید از مورد استفاده کنید Data Profiling/Ad Hoc Query:
نکته اصلی این است که آن را به تحلیلگران نشان ندهید
شرح دادن در جلسات ETL من نمی خواهم، همه چیز در آنجا بی اهمیت است: ما یک پایه می سازیم، یک علامت در آن وجود دارد، ما همه چیز را با یک مدیر زمینه می بندیم، و اکنون این کار را انجام می دهیم:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
زمان آن رسیده است داده های ما را جمع آوری کنید از یکصد و نیم میز ما. بیایید این کار را با کمک خطوط بسیار بی تکلف انجام دهیم:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
با کمک یک قلاب از جریان هوا می گیریم pymssql-اتصال
بیایید یک محدودیت را به شکل تاریخ در درخواست جایگزین کنیم - توسط موتور الگو به تابع پرتاب می شود.
تغذیه درخواست ما pandasچه کسی ما را خواهد گرفت DataFrame - در آینده برای ما مفید خواهد بود.
من از جایگزینی استفاده می کنم {dt} به جای پارامتر درخواست %s نه به این دلیل که من یک پینوکیوی شیطانی هستم، بلکه به این دلیل pandas نمی تواند اداره کند pymssql و آخری را می لغزد params: Listاگرچه او واقعاً می خواهد tuple.
همچنین توجه داشته باشید که توسعه دهنده pymssql تصمیم گرفت که دیگر از او حمایت نکند و وقت آن رسیده است که بیرون بیایی pyodbc.
بیایید ببینیم که Airflow آرگومان های توابع ما را با چه چیزی پر کرده است:
اگر داده ای وجود نداشته باشد، ادامه دادن فایده ای ندارد. اما این نیز عجیب است که پر کردن را موفق بدانیم. اما این یک اشتباه نیست. آخ آخ چیکار کنم؟! و این چیزی است که:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException به Airflow می گوید که هیچ خطایی وجود ندارد، اما ما از انجام وظیفه صرف نظر می کنیم. رابط کاربری مربع سبز یا قرمز نخواهد داشت، بلکه صورتی خواهد داشت.
یک هش از منبع و شناسه سفارش - به طوری که در پایگاه داده نهایی (جایی که همه چیز در یک جدول ریخته می شود) یک شناسه سفارش منحصر به فرد داشته باشیم.
مرحله ماقبل آخر باقی می ماند: همه چیز را در Vertica بریزید. و، به اندازه کافی عجیب، یکی از دیدنی ترین و کارآمدترین راه ها برای انجام این کار از طریق CSV است!
در فروش، صفحه هدف را به صورت دستی ایجاد می کنیم. در اینجا به خودم اجازه دادم یک ماشین کوچک داشته باشم:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
من با استفاده از VerticaOperator() من یک طرح پایگاه داده و یک جدول ایجاد می کنم (البته اگر قبلاً وجود نداشته باشند). نکته اصلی این است که وابستگی ها را به درستی مرتب کنید:
- خوب، - گفت موش کوچولو، - اینطور نیست، حالا
آیا متقاعد شده اید که من وحشتناک ترین حیوان جنگل هستم؟
جولیا دونالدسون، گروفالو
فکر میکنم اگر من و همکارانم رقابتی داشتیم: چه کسی به سرعت یک فرآیند ETL را از ابتدا ایجاد و راهاندازی میکند: آنها با SSIS و ماوس خود و من با Airflow... و سپس ما همچنین سهولت تعمیر و نگهداری را با هم مقایسه میکنیم... وای، فکر کنم قبول داری که در همه جبهه ها آنها را شکست خواهم داد!
اگر کمی جدی تر، Apache Airflow - با توصیف فرآیندها در قالب کد برنامه - کار من را انجام داد. خیلی راحت تر و لذت بخش تر
توسعه نامحدود آن، هم از نظر پلاگین ها و هم از نظر تمایل به مقیاس پذیری، به شما این فرصت را می دهد که از Airflow تقریبا در هر زمینه ای استفاده کنید: حتی در چرخه کامل جمع آوری، آماده سازی و پردازش داده ها، حتی در پرتاب موشک (به مریخ، دوره).
قسمت نهایی، مرجع و اطلاعات
چنگکی که برای شما جمع آوری کرده ایم
start_date. بله، این قبلاً یک میم محلی است. از طریق استدلال اصلی داگ start_date همه عبور می کنند به طور خلاصه، اگر در آن مشخص کنید start_date تاریخ فعلی، و schedule_interval - یک روز، DAG فردا زودتر شروع می شود.
start_date = datetime(2020, 7, 7, 0, 1, 2)
و دیگر مشکلی وجود ندارد.
خطای زمان اجرا دیگری نیز با آن مرتبط است: Task is missing the start_date parameter، که اغلب نشان می دهد که شما فراموش کرده اید به عملگر dag متصل شوید.
همه روی یک دستگاه بله، و پایه ها (خود جریان هوا و پوشش ما)، و وب سرور، و زمانبندی، و کارگران. و حتی کار کرد. اما با گذشت زمان، تعداد وظایف خدمات افزایش یافت و زمانی که PostgreSQL شروع به پاسخگویی به ایندکس به جای 20 میلی ثانیه در 5 ثانیه کرد، آن را برداشتیم و آن را دور بردیم.
LocalExecutor. بله، ما هنوز روی آن نشستهایم و تا به حال به لبه پرتگاه رسیدهایم. LocalExecutor تا اینجا برای ما کافی بوده است، اما اکنون زمان آن رسیده که حداقل با یک کارگر توسعه دهیم و برای انتقال به CeleryExecutor باید سخت کار کنیم. و با توجه به این واقعیت که شما می توانید با آن در یک دستگاه کار کنید، هیچ چیز شما را از استفاده از Celery حتی بر روی یک سرور باز نمی دارد، که "البته، صادقانه بگویم، هرگز تولید نخواهد شد!"
عدم استفاده ابزارهای داخلی:
اتصالات برای ذخیره اعتبار خدمات،
SLA از دست می دهد برای پاسخگویی به وظایفی که به موقع انجام نشده اند،
xcom برای تبادل ابرداده (گفتم متاداده!) بین وظایف dag.
سوء استفاده از نامه خوب چه می توانم بگویم؟ هشدارها برای همه تکرارهای کارهای افتاده تنظیم شده بودند. اکنون جیمیل کاری من بیش از 90 هزار ایمیل از Airflow دارد، و مخزن ایمیل از دریافت و حذف بیش از 100 ایمیل در یک زمان خودداری می کند.
برای اینکه حتی بیشتر با سر کار کنیم نه با دست، Airflow این را برای ما آماده کرده است:
REST API - او هنوز وضعیت تجربی را دارد که مانع از کار او نمی شود. با استفاده از آن، نه تنها می توانید اطلاعاتی در مورد داگ ها و وظایف دریافت کنید، بلکه می توانید یک داگ را متوقف/شروع کنید، یک DAG Run یا یک استخر ایجاد کنید.
CLI - ابزارهای زیادی از طریق خط فرمان در دسترس هستند که نه تنها برای استفاده از طریق WebUI ناخوشایند هستند، بلکه به طور کلی وجود ندارند. مثلا:
backfill برای راه اندازی مجدد نمونه های کار مورد نیاز است.
به عنوان مثال، تحلیلگران آمدند و گفتند: «و شما رفیق، از اول تا سیزدهم ژانویه در داده های مزخرف دارید! درستش کن، درستش کن، درستش کن، درستش کن!» و شما یک سرگرمی هستید:
run، که به شما امکان می دهد یک کار نمونه را اجرا کنید و حتی به همه وابستگی ها امتیاز دهید. علاوه بر این، می توانید آن را از طریق اجرا کنید LocalExecutor، حتی اگر خوشه کرفس داشته باشید.
تقریبا همین کار را می کند test، فقط در پایه ها هم چیزی نمی نویسد.
connections امکان ایجاد انبوه اتصالات از پوسته را فراهم می کند.
API پایتون - روشی نسبتاً سختگیر برای تعامل که برای پلاگین ها در نظر گرفته شده است و با دستان کوچک در آن ازدحام نمی کند. اما چه کسی مانع رفتن ما می شود /home/airflow/dags، اجرا کن ipython و شروع به خرابکاری کنید؟ برای مثال می توانید تمام اتصالات را با کد زیر صادر کنید:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
اتصال به پایگاه داده های فراداده Airflow. من نوشتن در آن را توصیه نمیکنم، اما دریافت وضعیتهای وظیفه برای معیارهای خاص مختلف میتواند بسیار سریعتر و آسانتر از هر یک از APIها باشد.
بیایید بگوییم که همه وظایف ما ناتوان نیست، اما گاهی اوقات ممکن است سقوط کنند، و این طبیعی است. اما چند انسداد در حال حاضر مشکوک هستند و بررسی آن ضروری است.
مراقب SQL باشید!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
مراجع
و البته ده لینک اول از صدور گوگل محتویات پوشه Airflow از بوکمارک های من است.
مستندات جریان هوای آپاچی - البته، ما باید از دفتر شروع کنیم. مستندات، اما چه کسی دستورالعمل ها را می خواند؟
بهترین روش - خوب، حداقل توصیه های سازندگان را بخوانید.