د هوا جریان یوه وسیله ده چې په اسانۍ او ګړندۍ توګه د بیچ ډیټا پروسس کولو پروسې رامینځته کولو او ساتلو لپاره

د هوا جریان یوه وسیله ده چې په اسانۍ او ګړندۍ توګه د بیچ ډیټا پروسس کولو پروسې رامینځته کولو او ساتلو لپاره

سلام، حبر! پدې مقاله کې زه غواړم د بیچ ډیټا پروسس کولو پروسې رامینځته کولو لپاره د یوې عالي وسیلې په اړه وغږیږم ، د مثال په توګه ، د کارپوریټ DWH یا ستاسو ډیټا لیک زیربنا کې. موږ به د اپاچي ایر فلو په اړه وغږیږو (له دې وروسته د ایر فلو په نوم یادیږي). دا په غیر عادلانه ډول د هابری په اړه له پاملرنې څخه محروم دی ، او په اصلي برخه کې به زه هڅه وکړم تاسو ته قانع کړم چې لږترلږه د ایر فلو د کتلو ارزښت لري کله چې ستاسو د ETL/ELT پروسو لپاره مهالویش غوره کړئ.

مخکې، ما د DWH موضوع په اړه یو لړ مقالې لیکلي کله چې ما په ټینکوف بانک کې کار کاوه. اوس زه د Mail.Ru ګروپ ټیم برخه شوی یم او د لوبو په ساحه کې د معلوماتو تحلیل لپاره یو پلیټ فارم رامینځته کوم. په حقیقت کې ، لکه څنګه چې خبرونه او په زړه پوري حلونه څرګندیږي ، زما ټیم او زه به دلته د ډیټا تحلیلونو لپاره زموږ د پلیټ فارم په اړه وغږیږو.

پرولوګ

نو، راځئ چې پیل وکړو. د هوا جریان څه شی دی؟ دا یو کتابتون دی (یا د کتابتونونو ټولګه) د کاري پروسو پراختیا، پالن او نظارت کول. د ایر فلو اصلي ځانګړتیا: د پایتون کوډ د پروسې تشریح (پرمختګ) لپاره کارول کیږي. دا ستاسو د پروژې تنظیم کولو او پراختیا لپاره ډیری ګټې لري: په اصل کې، ستاسو (د مثال په توګه) د ETL پروژه یوازې د Python پروژه ده، او تاسو کولی شئ دا د خپلې خوښې سره سم تنظیم کړئ، د زیربنا ځانګړتیاوې، د ټیم اندازه او په پام کې نیولو سره. نورې اړتیاوې. په وسیله هرڅه ساده دي. د مثال په توګه وکاروئ PyCharm + Git. دا په زړه پوری او خورا اسانه دی!

اوس راځئ چې د هوا جریان اصلي ادارو ته وګورو. د دوی د جوهر او هدف په پوهیدو سره، تاسو کولی شئ د خپل پروسې جوړښت په ښه توګه تنظیم کړئ. شاید اصلي اداره مستقیم اکسلیک ګراف وي (له دې وروسته د DAG په نوم یادیږي).

DAG

DAG ستاسو د دندو یو څه معنی لرونکی اتحادیه ده چې تاسو غواړئ د یو ځانګړي مهالویش سره سم په کلکه تعریف شوي ترتیب کې بشپړ کړئ. ایر فلو د DAGs او نورو ادارو سره کار کولو لپاره یو مناسب ویب انٹرفیس چمتو کوي:

د هوا جریان یوه وسیله ده چې په اسانۍ او ګړندۍ توګه د بیچ ډیټا پروسس کولو پروسې رامینځته کولو او ساتلو لپاره

DAG کیدای شي داسې ښکاري:

د هوا جریان یوه وسیله ده چې په اسانۍ او ګړندۍ توګه د بیچ ډیټا پروسس کولو پروسې رامینځته کولو او ساتلو لپاره

پراختیا کونکی، کله چې د DAG ډیزاین کوي، د آپریټرانو یوه ټولګه وړاندې کوي چې د DAG دننه دندې به رامینځته شي. دلته موږ بل مهم ارګان ته راځو: د هوا جریان آپریټر.

عملیات

یو آپریټر یوه اداره ده چې پر بنسټ یې د دندې مثالونه رامینځته کیږي، کوم چې تشریح کوي چې د دندې مثال اجرا کولو پرمهال به څه پیښیږي. د GitHub څخه د هوا جریان خپریږي دمخه د کارونې لپاره چمتو یو چلونکي سیټ لري. بېلګې:

  • BashOperator - د باش کمانډ اجرا کولو لپاره آپریټر.
  • PythonOperator - د Python کوډ زنګ وهلو لپاره آپریټر.
  • د بریښنالیک آپریټر - د بریښنالیک لیږلو لپاره آپریټر.
  • HTTPOperator - د HTTP غوښتنو سره کار کولو لپاره آپریټر.
  • SqlOperator - د SQL کوډ اجرا کولو لپاره آپریټر.
  • سینسر د پیښې انتظار کولو لپاره یو آپریټر دی (د اړتیا وخت رارسیدل ، د اړین فایل څرګندیدل ، په ډیټابیس کې یوه کرښه ، د API څخه ځواب ، او داسې نور).

نور ځانګړي آپریټرونه شتون لري: DockerOperator، HiveOperator، S3FileTransferOperator، PrestoToMysqlOperator، SlackOperator.

تاسو کولی شئ د خپلو ځانګړتیاو پراساس چلونکي رامینځته کړئ او په خپله پروژه کې یې وکاروئ. د مثال په توګه، موږ د MongoDBToHiveViaHdfsTransfer، د MongoDB څخه Hive ته د اسنادو صادرولو لپاره یو آپریټر، او د کار کولو لپاره څو آپریټرونه جوړ کړل. ټک هاوس: CHLoadFromHiveOperator او CHTableLoaderOperator. په لازمي ډول ، هرڅومره ژر چې یوه پروژه په مکرر ډول په لومړني بیاناتو کې جوړ کوډ کارولی وي ، تاسو کولی شئ دا په نوي بیان کې رامینځته کولو په اړه فکر وکړئ. دا به نور پرمختګ ساده کړي، او تاسو به په پروژه کې ستاسو د آپریټرانو کتابتون پراخ کړئ.

بیا، د دندو دا ټول مثالونه باید اجرا شي، او اوس به موږ د مهالویش په اړه وغږیږو.

مهالویش کوونکی

د ایر فلو کاري مهالویش جوړ شوی دی ژړا. سیلري د Python کتابتون دی چې تاسو ته اجازه درکوي قطار تنظیم کړئ او د دندو غیر متناسب او توزیع شوي اجرا کول. د هوا جریان اړخ کې ، ټولې دندې په حوضونو ویشل شوي. حوضونه په لاسي ډول جوړ شوي دي. عموما، د دوی هدف د سرچینې سره د کار کولو کاري بار محدودول یا په DWH کې د دندو ټایپ کول دي. حوضونه د ویب انٹرفیس له لارې اداره کیدی شي:

د هوا جریان یوه وسیله ده چې په اسانۍ او ګړندۍ توګه د بیچ ډیټا پروسس کولو پروسې رامینځته کولو او ساتلو لپاره

هر حوض د سلاټونو په شمیر محدودیت لري. کله چې DAG جوړ کړئ، دا یو حوض ورکول کیږي:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

د DAG په کچه تعریف شوی حوض د دندې په کچه کې له پامه غورځول کیدی شي.
یو جلا پروسه، مهالویش کونکی، د هوا جریان کې د ټولو دندو مهالویش کولو مسولیت لري. په حقیقت کې ، مهالویش د اجرا کولو لپاره د دندو تنظیم کولو ټولو میخانیکونو سره معامله کوي. دنده د اجرا کیدو دمخه د څو مرحلو څخه تیریږي:

  1. په DAG کې پخوانۍ دندې بشپړې شوې؛ یو نوی په کتار کې کیدی شي.
  2. کتار د دندو د لومړیتوب پراساس ترتیب شوی (لومړیتوبونه هم کنټرول کیدی شي) ، او که چیرې په حوض کې وړیا سلاټ شتون ولري ، نو دنده په عمل کې نیول کیدی شي.
  3. که یو وړیا کارګر سیلری وي، دنده ورته لیږل کیږي؛ هغه کار چې تاسو په ستونزه کې برنامه کړی د یو یا بل آپریټر په کارولو سره پیل کیږي.

کافي ساده.

مهالویش کونکی د ټولو DAGs او د DAGs دننه ټولې دندې پرمخ وړي.

د DAG سره د کار پیل کولو لپاره، DAG باید یو مهال ویش ترتیب کړي:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

د چمتو شوي پریسټونو سیټ شتون لري: @once, @hourly, @daily, @weekly, @monthly, @yearly.

تاسو کولی شئ د کرون څرګندونې هم وکاروئ:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

د اعدام نیټه

د دې لپاره چې پوه شي چې د هوا جریان څنګه کار کوي، دا مهمه ده چې پوه شئ چې د DAG لپاره د اجرا نیټه څه ده. په ایر فلو کې، DAG د اجرا کولو نیټې ابعاد لري، د بیلګې په توګه، د DAG کاري مهال ویش پورې اړه لري، د هرې اعدام نیټې لپاره د کار مثالونه رامینځته کیږي. او د هرې اعدام نیټې لپاره، دندې بیا اجرا کیدی شي - یا، د بیلګې په توګه، DAG کولی شي د اعدام په څو نیټو کې یوځل کار وکړي. دا په واضح ډول دلته ښودل شوي:

د هوا جریان یوه وسیله ده چې په اسانۍ او ګړندۍ توګه د بیچ ډیټا پروسس کولو پروسې رامینځته کولو او ساتلو لپاره

له بده مرغه (یا شاید خوشبختانه: دا په وضعیت پورې اړه لري)، که چیرې په DAG کې د دندې پلي کول سم شي، نو د تیر اجرا نیټه کې اجرا کول به د سمونونو په پام کې نیولو سره پرمخ ځي. دا ښه دی که تاسو د نوي الګوریتم په کارولو سره په تیرو دورو کې د معلوماتو بیا محاسبه کولو ته اړتیا لرئ ، مګر دا خراب دی ځکه چې د پایلې بیا تولید وړتیا له لاسه ورکوي (البته ، هیڅ څوک به تاسو ته د Git څخه د سرچینې کوډ اړین نسخه بیرته راستنولو ته اړ نه کړي او څه محاسبه کړئ. تاسو یو ځل ته اړتیا لرئ، هغه ډول چې تاسو ورته اړتیا لرئ).

د کارونو پیدا کول

د DAG پلي کول په Python کې کوډ دی، نو موږ د کار کولو پر مهال د کوډ مقدار کمولو لپاره خورا اسانه لار لرو، د بیلګې په توګه، د شارډ سرچینو سره. راځئ چې ووایو تاسو د سرچینې په توګه درې MySQL شارډونه لرئ، تاسو اړتیا لرئ چې هر یو ته ورشئ او یو څه ډاټا واخلئ. سربیره پردې، په خپلواکه توګه او په موازي توګه. په DAG کې د Python کوډ ممکن داسې ښکاري:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG داسې ښکاري:

د هوا جریان یوه وسیله ده چې په اسانۍ او ګړندۍ توګه د بیچ ډیټا پروسس کولو پروسې رامینځته کولو او ساتلو لپاره

پدې حالت کې ، تاسو کولی شئ په ساده ډول د تنظیماتو تنظیم کولو او DAG تازه کولو سره شارډ اضافه یا لرې کړئ. راحته!

تاسو کولی شئ ډیر پیچلي کوډ تولید هم وکاروئ، د بیلګې په توګه، د ډیټابیس په بڼه د سرچینو سره کار وکړئ یا د میز جوړښت تشریح کړئ، د میز سره کار کولو لپاره الګوریتم، او د DWH زیربنا ځانګړتیاوې په پام کې نیولو سره، یو پروسې رامینځته کړئ. ستاسو په ذخیره کې د N میزونو بارولو لپاره. یا، د بیلګې په توګه، د API سره کار کول چې د لیست په بڼه د پیرامیټر سره کار کولو ملاتړ نه کوي، تاسو کولی شئ د دې لیست څخه په DAG کې د N دندې تولید کړئ، په API کې د غوښتنو موازي محدودیت حوض ته محدود کړئ او سکریپ کړئ. د API څخه اړین معلومات. انعطاف وړ!

ذخیره

د ایر فلو خپل بیکینډ ذخیره لري، یو ډیټابیس (کیدای شي MySQL یا Postgres وي، موږ Postgres لرو)، کوم چې د دندو ریاستونه، DAGs، د اتصال ترتیبات، نړیوال تغیرات، او داسې نور ذخیره کوي. دلته زه غواړم ووایم چې په ایر فلو کې ذخیره خورا ساده ده (شاوخوا 20 جدولونه) او اسانه دي که تاسو غواړئ د دې په سر کې خپله کومه پروسه جوړه کړئ. زه د انفارمیټیکا ذخیره کې 100500 جدولونه په یاد لرم ، کوم چې د پوښتنې رامینځته کولو څرنګوالي پوهیدو دمخه د اوږدې مودې لپاره مطالعه کړې وه.

څارنه

د ذخیره سادګۍ په پام کې نیولو سره، تاسو کولی شئ د دندې څارنې پروسه جوړه کړئ چې ستاسو لپاره مناسب وي. موږ په زپیلین کې یو نوټ پیډ کاروو، چیرې چې موږ د دندو وضعیت ګورو:

د هوا جریان یوه وسیله ده چې په اسانۍ او ګړندۍ توګه د بیچ ډیټا پروسس کولو پروسې رامینځته کولو او ساتلو لپاره

دا پخپله د ایر فلو ویب انٹرفیس هم کیدی شي:

د هوا جریان یوه وسیله ده چې په اسانۍ او ګړندۍ توګه د بیچ ډیټا پروسس کولو پروسې رامینځته کولو او ساتلو لپاره

د هوا جریان کوډ خلاص سرچینه ده ، نو موږ په ټیلیګرام کې خبرداری اضافه کړی. د کار هره روانه بیلګه، که کومه تېروتنه رامنځ ته شي، په ټیلیګرام کې ګروپ سپیم کوي، چیرې چې د پراختیا او ملاتړ ټول ټیم ​​شتون لري.

موږ د ټیلیګرام له لارې سمدستي ځواب ترلاسه کوو (که اړتیا وي) ، او د زپیلین له لارې موږ په ایر فلو کې د دندو عمومي عکس ترلاسه کوو.

ټول

د هوا جریان اساسا خلاص سرچینه ده ، او تاسو باید له دې څخه د معجزو تمه ونه کړئ. د داسې حل جوړولو لپاره چې کار کوي د وخت او هڅې کولو لپاره چمتو اوسئ. هدف د لاسته راوړلو وړ دی، باور وکړئ، دا د دې ارزښت لري. د پرمختګ سرعت ، انعطاف ، د نوي پروسو اضافه کولو اسانتیا - تاسو به یې خوښ کړئ. البته، تاسو اړتیا لرئ د پروژې تنظیم ته ډیره پاملرنه وکړئ، پخپله د هوا جریان ثبات: معجزې نه کیږي.

اوس موږ د هوا جریان هره ورځ کار کوو شاوخوا 6,5 زره دندې. دوی په کرکټر کې خورا توپیر لري. د ډیری مختلف او خورا مشخصو سرچینو څخه اصلي DWH ته د ډیټا بارولو دندې شتون لري ، د اصلي DWH دننه د سټور فرنټ محاسبه کولو دندې شتون لري ، په ګړندۍ DWH کې د معلوماتو خپرولو دندې شتون لري ، ډیری ، ډیری مختلف دندې شتون لري - او هوایی جریان دا ټول ورځ په ورځ ژاړي. په شمیرو کې خبرې کول، دا دی ۴۳ زره د DWH (Hadoop) دننه د مختلف پیچلتیاو ELT دندې، نږدې. 2,5 سوه ډیټابیسونه سرچینې، دا یو ټیم دی 4 د ETL پراختیا کونکي، کوم چې په DWH کې د ETL ډیټا پروسس کولو او د DWH دننه د ELT ډیټا پروسس کولو کې ویشل شوي او البته نور یو اډمین، څوک چې د خدماتو زیربنا سره معامله کوي.

د راتلونکي لپاره پلانونه

د پروسو شمیر په ناڅاپي ډول وده کوي ، او اصلي شی چې موږ به یې د هوایی جریان زیربنا شرایطو کې ترسره کړو اندازه کول دي. موږ غواړو د ایر فلو کلستر جوړ کړو، د سیلري کارګرانو لپاره یوه جوړه پښې تخصیص کړو، او د دندې مهالویش پروسو او ذخیره کولو سره د ځان نقل کولو سر جوړ کړو.

اییلیلوم

دا، البته، هر څه نه دي چې زه غواړم د هوا فلو په اړه ووایم، مګر ما هڅه وکړه چې اصلي ټکي روښانه کړم. اشتها د خوړلو سره راځي، هڅه وکړئ او تاسو به یې خوښ کړئ :)

سرچینه: www.habr.com

Add a comment