جریان هوا ابزاری برای توسعه راحت و سریع و حفظ فرآیندهای پردازش دسته ای داده است

جریان هوا ابزاری برای توسعه راحت و سریع و حفظ فرآیندهای پردازش دسته ای داده است

سلام، هابر! در این مقاله می خواهم در مورد یک ابزار عالی برای توسعه فرآیندهای پردازش دسته ای داده صحبت کنم، به عنوان مثال، در زیرساخت یک DWH شرکت یا DataLake شما. ما در مورد Apache Airflow صحبت خواهیم کرد (که از این پس جریان هوا نامیده می شود). به‌طور ناعادلانه‌ای از توجه به Habré محروم شده است، و در بخش اصلی سعی خواهم کرد شما را متقاعد کنم که حداقل Airflow ارزش آن را دارد که هنگام انتخاب یک زمان‌بندی برای فرآیندهای ETL/ELT خود به آن نگاه کنید.

پیش از این، زمانی که در بانک Tinkoff کار می کردم، مجموعه ای از مقالات با موضوع DWH نوشتم. اکنون بخشی از تیم Mail.Ru Group شده ام و در حال توسعه پلتفرمی برای تجزیه و تحلیل داده ها در حوزه بازی هستم. در واقع، با ظاهر شدن اخبار و راه حل های جالب، من و تیم من در اینجا در مورد پلت فرم خود برای تجزیه و تحلیل داده ها صحبت خواهیم کرد.

پیش نویس

بنابراین، بیایید شروع کنیم. جریان هوا چیست؟ این یک کتابخانه (یا مجموعه ای از کتابخانه ها) توسعه، برنامه ریزی و نظارت بر فرآیندهای کاری. ویژگی اصلی جریان هوا: کد پایتون برای توصیف (توسعه) فرآیندها استفاده می شود. این مزیت های زیادی برای سازماندهی پروژه و توسعه شما دارد: در اصل، پروژه ETL شما (به عنوان مثال) فقط یک پروژه پایتون است و می توانید با در نظر گرفتن ویژگی های زیرساخت، اندازه تیم و آن را به دلخواه سازماندهی کنید. ملزومات دیگر. از نظر ابزاری همه چیز ساده است. برای مثال از PyCharm + Git استفاده کنید. فوق العاده است و بسیار راحت!

حال بیایید به نهادهای اصلی Airflow نگاه کنیم. با درک ماهیت و هدف آنها، می توانید معماری فرآیند خود را بهینه سازماندهی کنید. شاید موجودیت اصلی گراف غیر چرخه ای جهت دار باشد (از این پس DAG نامیده می شود).

DAG

DAG یک ارتباط معنی‌دار از وظایف شما است که می‌خواهید در یک دنباله کاملاً تعریف شده طبق یک برنامه زمانی خاص تکمیل کنید. Airflow یک رابط وب مناسب برای کار با DAG و سایر نهادها فراهم می کند:

جریان هوا ابزاری برای توسعه راحت و سریع و حفظ فرآیندهای پردازش دسته ای داده است

DAG ممکن است به شکل زیر باشد:

جریان هوا ابزاری برای توسعه راحت و سریع و حفظ فرآیندهای پردازش دسته ای داده است

توسعه دهنده، هنگام طراحی یک DAG، مجموعه ای از اپراتورها را تعیین می کند که وظایف درون DAG بر روی آنها ساخته می شود. در اینجا به یک نهاد مهم دیگر می رسیم: اپراتور جریان هوا.

اپراتورها

اپراتور موجودیتی است که بر اساس آن نمونه های شغلی ایجاد می شوند، که توصیف می کند در طول اجرای یک نمونه کار چه اتفاقی می افتد. جریان هوا از GitHub منتشر می شود در حال حاضر شامل مجموعه ای از عملگرهای آماده برای استفاده است. مثال ها:

  • BashOperator - عملگر برای اجرای دستور bash.
  • PythonOperator - اپراتور برای فراخوانی کد پایتون.
  • EmailOperator - اپراتور برای ارسال ایمیل.
  • HTTPOperator - اپراتور برای کار با درخواست های http.
  • SqlOperator - اپراتور برای اجرای کد SQL.
  • Sensor عملگر انتظار برای یک رویداد (رسیدن زمان مورد نیاز، ظاهر شدن فایل مورد نیاز، یک خط در پایگاه داده، پاسخ از API و غیره و غیره) است.

اپراتورهای خاص تری وجود دارد: DockerOperator، HiveOperator، S3FileTransferOperator، PrestoToMysqlOperator، SlackOperator.

شما همچنین می توانید اپراتورها را بر اساس ویژگی های خود توسعه دهید و از آنها در پروژه خود استفاده کنید. به عنوان مثال، ما MongoDBToHiveViaHdfsTransfer را ایجاد کردیم، یک اپراتور برای صادرات اسناد از MongoDB به Hive، و چندین اپراتور برای کار با کلیک هاوس: CHLoadFromHiveOperator و CHTableLoaderOperator. اساساً، به محض اینکه یک پروژه مکرراً از کدهای ساخته شده بر روی عبارات اساسی استفاده می کند، می توانید به ساخت آن در یک عبارت جدید فکر کنید. این توسعه بیشتر را ساده می کند و شما کتابخانه اپراتورهای خود را در پروژه گسترش می دهید.

در مرحله بعد، همه این موارد از وظایف باید اجرا شوند، و اکنون در مورد زمانبندی صحبت خواهیم کرد.

برنامه ریز

زمانبندی وظیفه جریان هوا بر اساس ساخته شده است کرفس. Celery یک کتابخانه پایتون است که به شما امکان می دهد یک صف به همراه اجرای ناهمزمان و توزیع شده وظایف را سازماندهی کنید. در سمت جریان هوا، همه وظایف به استخر تقسیم می شوند. استخرها به صورت دستی ایجاد می شوند. به طور معمول، هدف آنها محدود کردن حجم کاری کار با منبع یا مشخص کردن وظایف در DWH است. استخرها را می توان از طریق رابط وب مدیریت کرد:

جریان هوا ابزاری برای توسعه راحت و سریع و حفظ فرآیندهای پردازش دسته ای داده است

هر استخر محدودیتی در تعداد اسلات دارد. هنگام ایجاد یک DAG، یک Pool به آن داده می شود:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

یک استخر تعریف شده در سطح DAG می تواند در سطح وظیفه لغو شود.
یک فرآیند جداگانه، Scheduler، مسئول زمان بندی تمام وظایف در جریان هوا است. در واقع، Scheduler با تمام مکانیک های تنظیم وظایف برای اجرا سروکار دارد. این کار قبل از اجرا چندین مرحله را طی می کند:

  1. وظایف قبلی در DAG تکمیل شده است؛ یک مورد جدید را می توان در صف قرار داد.
  2. صف بسته به اولویت کارها مرتب می شود (اولویت ها را نیز می توان کنترل کرد) و در صورت وجود شکاف آزاد در استخر می توان کار را عملیاتی کرد.
  3. اگر کرفس کارگر مجانی وجود داشته باشد، وظیفه به آن ارسال می شود. کاری که در مشکل برنامه ریزی کرده اید با استفاده از یک یا آن عملگر شروع می شود.

به اندازه کافی ساده

Scheduler روی مجموعه تمام DAG ها و همه وظایف درون DAG اجرا می شود.

برای اینکه Scheduler شروع به کار با DAG کند، DAG باید یک برنامه زمانی تنظیم کند:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

مجموعه ای از پیش تنظیم های آماده وجود دارد: @once, @hourly, @daily, @weekly, @monthly, @yearly.

همچنین می توانید از عبارات cron استفاده کنید:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

تاریخ اجرا

برای درک اینکه جریان هوا چگونه کار می کند، مهم است که بدانیم تاریخ اجرا برای DAG چیست. در جریان هوا، DAG دارای یک بعد تاریخ اجرا است، به عنوان مثال، بسته به برنامه کاری DAG، نمونه های وظیفه برای هر تاریخ اجرا ایجاد می شود. و برای هر تاریخ اجرا، وظایف را می توان دوباره اجرا کرد - یا، برای مثال، یک DAG می تواند به طور همزمان در چندین تاریخ اجرا کار کند. این به وضوح در اینجا نشان داده شده است:

جریان هوا ابزاری برای توسعه راحت و سریع و حفظ فرآیندهای پردازش دسته ای داده است

متأسفانه (یا شاید خوشبختانه: بستگی به موقعیت دارد)، اگر اجرای وظیفه در DAG اصلاح شود، اجرا در تاریخ اجرای قبلی با در نظر گرفتن تنظیمات پیش خواهد رفت. اگر نیاز به محاسبه مجدد داده ها در دوره های گذشته با استفاده از یک الگوریتم جدید داشته باشید این خوب است، اما بد است زیرا تکرارپذیری نتیجه از بین می رود (البته هیچکس شما را اذیت نمی کند که نسخه مورد نیاز کد منبع را از Git برگردانید و محاسبه کنید شما به یک بار نیاز دارید، همانطور که به آن نیاز دارید).

ایجاد وظایف

پیاده سازی DAG کد در پایتون است، بنابراین ما یک راه بسیار راحت برای کاهش مقدار کد در هنگام کار، به عنوان مثال، با منابع خرد شده داریم. فرض کنید سه قطعه MySQL به عنوان منبع دارید، باید به هر کدام بروید و مقداری داده را جمع آوری کنید. علاوه بر این، به طور مستقل و موازی. کد پایتون در DAG ممکن است به شکل زیر باشد:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG به شکل زیر است:

جریان هوا ابزاری برای توسعه راحت و سریع و حفظ فرآیندهای پردازش دسته ای داده است

در این حالت، می توانید به سادگی با تنظیم تنظیمات و به روز رسانی DAG، یک قطعه را اضافه یا حذف کنید. راحت!

همچنین می‌توانید از تولید کد پیچیده‌تر استفاده کنید، به عنوان مثال، کار با منابع در قالب یک پایگاه داده یا توصیف ساختار جدول، الگوریتمی برای کار با جدول، و با در نظر گرفتن ویژگی‌های زیرساخت DWH، یک فرآیند ایجاد کنید. برای بارگیری N جدول در فضای ذخیره سازی شما. یا مثلاً با کار با یک API که از کار با پارامتر به شکل لیست پشتیبانی نمی‌کند، می‌توانید از این لیست N کار در یک DAG ایجاد کنید، موازی بودن درخواست‌ها در API را به یک استخر محدود کنید و آن را پاک کنید. داده های لازم از API قابل انعطاف!

مخزن

Airflow مخزن پشتیبان خود را دارد، یک پایگاه داده (می تواند MySQL یا Postgres باشد، ما Postgres داریم)، ​​که وضعیت وظایف، DAG ها، تنظیمات اتصال، متغیرهای جهانی و غیره و غیره را ذخیره می کند. در اینجا می خواهم بگویم که مخزن در Airflow بسیار ساده است (حدود 20 جدول) و راحت است اگر بخواهید هر یک از فرآیندهای خود را در بالای آن بسازید. 100500 جدول موجود در مخزن انفورماتیکا را به خاطر می آورم که قبل از فهمیدن نحوه ساخت یک پرس و جو باید برای مدت طولانی مطالعه می شد.

نظارت

با توجه به سادگی مخزن، می توانید یک فرآیند نظارت بر کار را ایجاد کنید که برای شما راحت باشد. ما از یک دفترچه یادداشت در Zeppelin استفاده می کنیم، جایی که به وضعیت وظایف نگاه می کنیم:

جریان هوا ابزاری برای توسعه راحت و سریع و حفظ فرآیندهای پردازش دسته ای داده است

این همچنین می تواند رابط وب خود Airflow باشد:

جریان هوا ابزاری برای توسعه راحت و سریع و حفظ فرآیندهای پردازش دسته ای داده است

کد جریان هوا منبع باز است، بنابراین ما هشدار را به تلگرام اضافه کرده ایم. هر نمونه در حال اجرا از یک کار، اگر خطایی رخ دهد، گروه را در تلگرام اسپم می کند، جایی که کل تیم توسعه و پشتیبانی شامل می شود.

ما یک پاسخ سریع از طریق تلگرام (در صورت نیاز) دریافت می کنیم و از طریق Zeppelin یک تصویر کلی از وظایف در Airflow دریافت می کنیم.

در کل

جریان هوا در اصل منبع باز است و نباید از آن انتظار معجزه داشت. آماده باشید که زمان و تلاش خود را برای ایجاد یک راه حل موثر صرف کنید. هدف دست یافتنی است، باور کنید ارزشش را دارد. سرعت توسعه، انعطاف پذیری، سهولت افزودن فرآیندهای جدید - شما آن را دوست خواهید داشت. البته، شما باید به سازماندهی پروژه، ثبات خود جریان هوا توجه زیادی داشته باشید: معجزه اتفاق نمی افتد.

اکنون Airflow داریم که روزانه کار می کند حدود 6,5 هزار کار. آنها از نظر شخصیت کاملاً متفاوت هستند. وظایف بارگیری داده ها در DWH اصلی از منابع مختلف و بسیار خاص وجود دارد، وظایف محاسبه ویترین فروشگاه ها در داخل DWH اصلی وجود دارد، وظایف انتشار داده ها در یک DWH سریع وجود دارد، وظایف بسیار بسیار متفاوتی وجود دارد - و جریان هوا همه آنها را روز به روز می جود. با اعداد صحبت کنیم، این است 2,3 هزار وظایف ELT با پیچیدگی های مختلف در DWH (Hadoop)، تقریباً 2,5 صد پایگاه داده منابع، این یک تیم از 4 توسعه دهنده ETL، که به پردازش داده ETL در DWH و پردازش داده ELT در داخل DWH و البته بیشتر تقسیم می شوند. یک ادمین، که با زیرساخت سرویس سروکار دارد.

برنامه های آینده

تعداد فرآیندها ناگزیر در حال افزایش است و اصلی‌ترین کاری که از نظر زیرساخت جریان هوا انجام خواهیم داد، مقیاس‌پذیری است. ما می‌خواهیم یک خوشه جریان هوا بسازیم، یک جفت پایه را برای کارگران Celery اختصاص دهیم، و یک سر خودکپی با فرآیندهای زمان‌بندی کار و یک مخزن بسازیم.

خاتمه

البته این همه آن چیزی نیست که بخواهم در مورد Airflow بگویم، اما سعی کردم نکات اصلی را برجسته کنم. اشتها با خوردن میاد، امتحانش کن خوشت میاد :)

منبع: www.habr.com

اضافه کردن نظر