ايئر فلو هڪ اوزار آهي آساني سان ۽ جلدي ترقي ۽ برقرار رکڻ لاءِ بيچ ڊيٽا پروسيسنگ پروسيسنگ

ايئر فلو هڪ اوزار آهي آساني سان ۽ جلدي ترقي ۽ برقرار رکڻ لاءِ بيچ ڊيٽا پروسيسنگ پروسيسنگ

هيلو، حبر! هن آرٽيڪل ۾ مان هڪ بهترين اوزار بابت ڳالهائڻ چاهيان ٿو بيچ ڊيٽا پروسيسنگ پروسيسنگ کي ترقي ڪرڻ لاءِ، مثال طور، ڪارپوريٽ DWH يا توهان جي DataLake جي انفراسٽرڪچر ۾. اسان Apache Airflow بابت ڳالهائينداسين (هتان کان پوء ايئر فلو جو حوالو ڏنو ويو). اهو غير منصفانه طور تي Habré تي ڌيان کان محروم آهي، ۽ مکيه حصي ۾ آئون توهان کي قائل ڪرڻ جي ڪوشش ڪندس ته گهٽ ۾ گهٽ ايئر فلو ڏسڻ جي قابل آهي جڏهن توهان جي ETL / ELT پروسيس لاء شيڊولر چونڊيو.

اڳي، مون DWH جي موضوع تي مضمونن جو هڪ سلسلو لکيو جڏهن مون ٽينڪوف بئنڪ ۾ ڪم ڪيو. ھاڻي مان Mail.Ru گروپ ٽيم جو حصو بڻجي چڪو آھيان ۽ گيمنگ ايريا ۾ ڊيٽا جي تجزيو لاءِ پليٽ فارم ٺاھي رھيو آھيان. دراصل، جيئن خبرون ۽ دلچسپ حل ظاهر ٿيندا آهن، منهنجي ٽيم ۽ مان هتي ڳالهائينداسين ڊيٽا اينالائيٽڪس لاءِ اسان جي پليٽ فارم بابت.

اڳڀرائي

سو، اچو ته شروع ڪريون. ايئر فلو ڇا آهي؟ هي هڪ لائبريري آهي (يا لائبريرين جو سيٽ) ڪم جي عمل کي ترقي، منصوبابندي ۽ نگراني ڪرڻ. ايئر فلو جي مکيه خصوصيت: پٿون ڪوڊ استعمال ڪيو ويندو آهي بيان ڪرڻ لاءِ (ترقي) عمل. توھان جي پروجيڪٽ ۽ ڊولپمينٽ کي منظم ڪرڻ لاءِ ھن جا ڪيترائي فائدا آھن: اصل ۾، توھان جو (مثال طور) ETL پروجيڪٽ صرف ھڪڙو پائٿون پروجيڪٽ آھي، ۽ توھان ان کي منظم ڪري سگھوٿا جيئن توھان چاھيو، بنيادي ڍانچي جي خصوصيتن کي مدنظر رکندي، ٽيم جي سائيز ۽ ٻيون گهرجون. instrumentally سڀڪنھن شيء کي سادي آهي. مثال طور استعمال ڪريو PyCharm + Git. اهو شاندار ۽ تمام آسان آهي!

هاڻي اچو ته Airflow جي مکيه ادارن تي نظر. انهن جي جوهر ۽ مقصد کي سمجهڻ سان، توهان پنهنجي عمل جي فن تعمير کي بهتر طور تي منظم ڪري سگهو ٿا. ٿي سگهي ٿو ته مکيه ادارو آهي سڌو Acyclic گراف (هاڻي بعد ۾ DAG طور حوالو ڏنو ويو).

ڊيگ

هڪ DAG توهان جي ڪمن جي ڪجهه بامعني انجمن آهي جنهن کي توهان هڪ خاص شيڊول جي مطابق هڪ سخت بيان ڪيل ترتيب ۾ مڪمل ڪرڻ چاهيو ٿا. ايئر فلو DAGs ۽ ٻين ادارن سان ڪم ڪرڻ لاءِ هڪ آسان ويب انٽرفيس فراهم ڪري ٿو:

ايئر فلو هڪ اوزار آهي آساني سان ۽ جلدي ترقي ۽ برقرار رکڻ لاءِ بيچ ڊيٽا پروسيسنگ پروسيسنگ

DAG شايد هن طرح نظر اچن ٿا:

ايئر فلو هڪ اوزار آهي آساني سان ۽ جلدي ترقي ۽ برقرار رکڻ لاءِ بيچ ڊيٽا پروسيسنگ پروسيسنگ

ڊولپر، ڊي اي جي ڊزائين ڪرڻ وقت، آپريٽرز جو هڪ سيٽ رکي ٿو جن تي ڊي اي جي اندر ڪم ڪيا ويندا. هتي اسان هڪ ٻي اهم اداري ڏانهن اچون ٿا: ايئر فلو آپريٽر.

آپريٽرز

هڪ آپريٽر هڪ ادارو آهي جنهن جي بنياد تي نوڪري جا مثال ٺاهيا ويندا آهن، جيڪو بيان ڪري ٿو ته نوڪري جي مثال جي عمل دوران ڇا ٿيندو. GitHub مان ايئر فلو جاري ڪري ٿو اڳ ۾ ئي استعمال ڪرڻ لاء تيار آپريٽرز جي هڪ سيٽ تي مشتمل آهي. مثال:

  • BashOperator - بيش ڪمانڊ تي عمل ڪرڻ لاءِ آپريٽر.
  • PythonOperator - Python ڪوڊ ڪال ڪرڻ لاءِ آپريٽر.
  • اي ميل آپريٽر - اي ميل موڪلڻ لاءِ آپريٽر.
  • HTTPOperator - HTTP درخواستن سان ڪم ڪرڻ لاءِ آپريٽر.
  • SqlOperator - SQL ڪوڊ تي عمل ڪرڻ لاءِ آپريٽر.
  • سينسر هڪ آپريٽر آهي جيڪو واقعي جي انتظار ۾ آهي (گهربل وقت جي آمد، گهربل فائل جي ظاهر ٿيڻ، ڊيٽابيس ۾ هڪ لائن، API کان جواب، وغيره وغيره).

هتي وڌيڪ مخصوص آپريٽرز آهن: DockerOperator، HiveOperator، S3FileTransferOperator، PrestoToMysqlOperator، SlackOperator.

توهان پڻ ترقي ڪري سگهو ٿا آپريٽرز توهان جي پنهنجي خاصيتن جي بنياد تي ۽ انهن کي پنهنجي منصوبي ۾ استعمال ڪريو. مثال طور، اسان MongoDBToHiveViaHdfsTransfer، MongoDB کان Hive ڏانهن دستاويز برآمد ڪرڻ لاءِ هڪ آپريٽر، ۽ ڪم ڪرڻ لاءِ ڪيترائي آپريٽر ٺاهيا آهن. ڪلڪ ڪريو هائوس: CHLoadFromHiveOperator ۽ CHTableLoaderOperator. لازمي طور تي، جيترو جلدي هڪ پروجيڪٽ بنيادي بيانن تي ٺهيل ڪوڊ استعمال ڪيو آهي، توهان ان کي نئين بيان ۾ تعمير ڪرڻ بابت سوچي سگهو ٿا. اهو وڌيڪ ترقي کي آسان بڻائيندو، ۽ توهان پروجيڪٽ ۾ آپريٽرز جي لائبريري کي وڌايو.

اڳيون، ڪم جي انهن سڀني مثالن تي عمل ڪرڻ جي ضرورت آهي، ۽ هاڻي اسان شيڊولر بابت ڳالهائينداسين.

شيڊيولر

ايئر فلو جي ٽاسڪ شيڊولر تي ٺهيل آهي سريلا. Celery هڪ Python لائبريري آهي جيڪا توهان کي هڪ قطار کي منظم ڪرڻ جي اجازت ڏئي ٿي ۽ ڪمن جي غير مطابقت ۽ تقسيم تي عملدرآمد. ايئر فلو پاسي تي، سڀئي ڪم تلاء ۾ ورهايل آهن. پول دستي طور تي ٺاهيا ويا آهن. عام طور تي، انهن جو مقصد ذريعو سان ڪم ڪرڻ جي ڪم جي لوڊ کي محدود ڪرڻ يا DWH اندر ڪمن کي ٽائپ ڪرڻ آهي. پول ويب انٽرفيس ذريعي منظم ڪري سگهجي ٿو:

ايئر فلو هڪ اوزار آهي آساني سان ۽ جلدي ترقي ۽ برقرار رکڻ لاءِ بيچ ڊيٽا پروسيسنگ پروسيسنگ

هر تلاءَ ۾ سلاٽ جي تعداد تي حد آهي. جڏهن هڪ DAG ٺاهي، اهو هڪ تلاء ڏنو ويو آهي:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG سطح تي بيان ڪيل هڪ تلاء کي ڪم جي سطح تي ختم ڪري سگهجي ٿو.
هڪ الڳ عمل، شيڊولر، ايئر فلو ۾ سڀني ڪمن کي شيڊول ڪرڻ جو ذميوار آهي. دراصل، شيڊيولر عمل ڪرڻ لاءِ ڪمن کي ترتيب ڏيڻ جي سڀني ميڪنڪس سان واسطو رکي ٿو. ڪم تي عمل ڪرڻ کان اڳ ڪيترن ئي مرحلن مان گذري ٿو:

  1. اڳوڻو ڪم DAG ۾ مڪمل ٿي چڪو آهي، هڪ نئون ڪم ڪري سگهجي ٿو.
  2. قطار کي ترتيب ڏنل ڪمن جي ترجيحن جي بنياد تي (ترجيح پڻ ڪنٽرول ڪري سگهجي ٿي)، ۽ جيڪڏهن تلاء ۾ هڪ مفت سلاٽ آهي، ڪم کي آپريشن ۾ وٺي سگهجي ٿو.
  3. جيڪڏهن ڪو مفت ڪم ڪندڙ celery آهي، اهو ڪم ان ڏانهن موڪليو ويندو آهي؛ اهو ڪم جيڪو توهان مسئلي ۾ پروگرام ڪيو آهي، هڪ يا ٻيو آپريٽر استعمال ڪندي شروع ٿئي ٿو.

ڪافي سادو.

شيڊيولر سڀني DAGs جي سيٽ تي هلندو آهي ۽ DAGs اندر سڀ ڪم.

شيڊيولر لاءِ DAG سان ڪم شروع ڪرڻ لاءِ، DAG کي هڪ شيڊول مقرر ڪرڻ جي ضرورت آهي:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

هتي تيار ڪيل سيٽن جو هڪ سيٽ آهي: @once, @hourly, @daily, @weekly, @monthly, @yearly.

توھان پڻ استعمال ڪري سگھو ٿا ڪرون اظهار:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

عملدرآمد جي تاريخ

سمجھڻ لاءِ ته ايئر فلو ڪيئن ڪم ڪري ٿو، اھو سمجھڻ ضروري آھي ته ڊي اي جي لاءِ Execution Date ڇا آھي. ايئر فلو ۾، ڊي اي جي هڪ عمل جي تاريخ جو طول و عرض آهي، يعني، ڊي اي جي جي ڪم جي شيڊول تي منحصر آهي، هر عمل جي تاريخ لاء ٽاسڪ مثال ٺاهيا ويندا آهن. ۽ هر عمل جي تاريخ لاءِ، ڪمن کي ٻيهر عمل ۾ آڻي سگهجي ٿو - يا، مثال طور، هڪ DAG هڪ ئي وقت ڪيترن ئي عمل جي تاريخن ۾ ڪم ڪري سگهي ٿو. اهو واضح طور تي هتي ڏيکاريل آهي:

ايئر فلو هڪ اوزار آهي آساني سان ۽ جلدي ترقي ۽ برقرار رکڻ لاءِ بيچ ڊيٽا پروسيسنگ پروسيسنگ

بدقسمتي سان (يا شايد خوش قسمتي سان: اهو صورتحال تي منحصر آهي)، جيڪڏهن DAG ۾ ڪم جي عمل کي درست ڪيو ويو آهي، ته پوءِ اڳئين عمل جي تاريخ ۾ عمل کي ترتيب ڏيڻ جي حساب سان اڳتي وڌندو. اهو سٺو آهي جيڪڏهن توهان کي نئين الگورتھم استعمال ڪندي گذريل دورن ۾ ڊيٽا کي ٻيهر ڳڻپ ڪرڻ جي ضرورت آهي، پر اهو خراب آهي ڇو ته نتيجن جي ٻيهر پيداوار وڃائي وئي آهي (يقينا، ڪو به توهان کي گٽ مان سورس ڪوڊ جو گهربل نسخو واپس ڪرڻ جي تڪليف نه ڪندو آهي ۽ حساب ڪريو ڇا. توهان کي هڪ وقت جي ضرورت آهي، جنهن طريقي سان توهان کي ضرورت آهي).

ڪم پيدا ڪرڻ

DAG جو نفاذ Python ڪوڊ آهي، تنهنڪري اسان وٽ ڪم ڪرڻ وقت ڪوڊ جي مقدار کي گهٽائڻ جو هڪ تمام آسان طريقو آهي، مثال طور، شارڊ ٿيل ذريعن سان. اچو ته چئو ته توهان وٽ هڪ ماخذ طور ٽي MySQL شارڊز آهن، توهان کي هر هڪ تي چڙهڻ ۽ ڪجهه ڊيٽا کڻڻ جي ضرورت آهي. ان کان سواء، آزاد ۽ متوازي طور تي. DAG ۾ پٿون ڪوڊ هن طرح نظر اچي سگھي ٿو:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG هن طرح نظر اچي ٿو:

ايئر فلو هڪ اوزار آهي آساني سان ۽ جلدي ترقي ۽ برقرار رکڻ لاءِ بيچ ڊيٽا پروسيسنگ پروسيسنگ

انهي حالت ۾، توهان صرف سيٽنگون ترتيب ڏيڻ ۽ ڊيگ کي اپڊيٽ ڪندي هڪ شارڊ شامل يا ختم ڪري سگهو ٿا. آرام سان!

توھان وڌيڪ پيچيده ڪوڊ جنريشن پڻ استعمال ڪري سگھو ٿا، مثال طور، ڊيٽابيس جي صورت ۾ ذريعن سان ڪم ڪريو يا ٽيبل جي ڍانچي کي بيان ڪريو، ٽيبل سان ڪم ڪرڻ لاءِ ھڪڙو الگورٿم، ۽، DWH انفراسٽرڪچر جي خصوصيتن کي مدنظر رکندي، ھڪڙو عمل ٺاھيو. توهان جي اسٽوريج ۾ N ٽيبل لوڊ ڪرڻ لاء. يا، مثال طور، هڪ API سان ڪم ڪري رهيو آهي جيڪو هڪ فهرست جي صورت ۾ هڪ پيٽرولر سان ڪم ڪرڻ جي حمايت نٿو ڪري، توهان هن فهرست مان هڪ DAG ۾ N ڪم ٺاهي سگهو ٿا، API ۾ درخواستن جي متوازي کي پول تائين محدود ڪري سگهو ٿا ۽ اسڪريپ کي ختم ڪري سگهو ٿا. API کان ضروري ڊيٽا. لچڪدار!

مخزن

ايئر فلو جو پنهنجو پس منظر مخزن آهي، هڪ ڊيٽابيس (جي سگهي ٿو MySQL يا Postgres، اسان وٽ Postgres آهي)، جيڪو ڪمن جي رياستن، DAGs، ڪنيڪشن سيٽنگون، گلوبل متغيرات، وغيره وغيره کي محفوظ ڪري ٿو. هتي مان چاهيان ٿو ته آئون اهو چئي سگهان ٿو ته ايئر فلو ۾ مخزن تمام سادو آهي (اٽڪل 20 ٽيبل) ۽ آسان آهي جيڪڏهن توهان چاهيو ٿا ته ان جي مٿان پنهنجو ڪو عمل ٺاهيو. مون کي ياد آهي 100500 ٽيبل انفارميٽيڪا مخزن ۾، جن کي سمجهڻ کان اڳ هڪ ڊگهو وقت تائين مطالعو ڪرڻو پوندو هو ته هڪ سوال ڪيئن ٺاهيو وڃي.

مانيٽرنگ

مخزن جي سادگي کي نظر ۾ رکندي، توهان هڪ ٽاسڪ مانيٽرنگ عمل ٺاهي سگهو ٿا جيڪو توهان لاءِ آسان آهي. اسان Zeppelin ۾ هڪ نوٽ پيڊ استعمال ڪندا آهيون، جتي اسان ڪمن جي حالت کي ڏسون ٿا:

ايئر فلو هڪ اوزار آهي آساني سان ۽ جلدي ترقي ۽ برقرار رکڻ لاءِ بيچ ڊيٽا پروسيسنگ پروسيسنگ

اهو پڻ ٿي سگهي ٿو ويب انٽرفيس جو پاڻ Airflow:

ايئر فلو هڪ اوزار آهي آساني سان ۽ جلدي ترقي ۽ برقرار رکڻ لاءِ بيچ ڊيٽا پروسيسنگ پروسيسنگ

ايئر فلو ڪوڊ اوپن سورس آهي، ان ڪري اسان ٽيليگرام تي الرٽ شامل ڪيو آهي. ھر ھڪ ھلندڙ ٽاسڪ جو مثال، جيڪڏھن ڪا نقص ٿئي ٿي، گروپ کي ٽيليگرام ۾ اسپام ڪري ٿو، جتي پوري ڊولپمينٽ ۽ سپورٽ ٽيم آھي.

اسان ٽيليگرام (جيڪڏهن گهربل هجي) ذريعي فوري جواب حاصل ڪندا آهيون، ۽ Zeppelin ذريعي اسان ايئر فلو ۾ ڪمن جي مجموعي تصوير حاصل ڪندا آهيون.

ڪل

ايئر فلو بنيادي طور تي کليل ذريعو آهي، ۽ توهان کي ان مان معجزن جي اميد نه رکڻ گهرجي. ڪم ڪرڻ جو حل ٺاهڻ لاءِ وقت ۽ ڪوشش ۾ رکڻ لاءِ تيار ٿيو. مقصد حاصل ڪرڻ لائق آهي، مون کي يقين ڪر، اهو ان جي لائق آهي. ترقي جي رفتار، لچڪ، نون عملن کي شامل ڪرڻ ۾ آسان - توھان ان کي پسند ڪندا. يقينا، توهان کي منصوبي جي تنظيم تي تمام گهڻو ڌيان ڏيڻ جي ضرورت آهي، خود ايئر فلو جي استحڪام: معجزا نه ٿيندا آهن.

ھاڻي اسان وٽ روزانو ڪم ڪندڙ ايئر فلو آھي اٽڪل 6,5 هزار ڪم. اهي ڪردار ۾ بلڪل مختلف آهن. مکيه DWH ۾ ڪيترن ئي مختلف ۽ تمام خاص ذريعن کان ڊيٽا لوڊ ڪرڻ جا ڪم آهن، مکيه DWH اندر اسٽور فرنٽ کي ڳڻڻ جا ڪم آهن، ڊيٽا کي تيز DWH ۾ شايع ڪرڻ جا ڪم آهن، اتي ڪيترائي، ڪيترائي مختلف ڪم آهن - ۽ ايئر فلو. انهن سڀني کي ڏينهن کان پوءِ چيري ٿو. انگن ۾ ڳالهائڻ، هي آهي 2,3 هزار DWH (Hadoop) جي اندر مختلف پيچيدگي جي ELT ڪم، تقريبن. 2,5 سئو ڊيٽابيس ذريعن موجب هي هڪ ٽيم آهي 4 ETL ڊولپرز، جيڪي DWH ۾ ETL ڊيٽا پروسيسنگ ۾ ورهايل آهن ۽ DWH اندر ELT ڊيٽا پروسيسنگ ۽ يقيناً وڌيڪ. هڪ منتظم، جيڪو خدمت جي بنيادي ڍانچي سان واسطو رکي ٿو.

مستقبل لاء منصوبا

عملن جو تعداد ناگزير طور تي وڌي رهيو آهي، ۽ بنيادي شيء جيڪا اسان ڪنداسين ايئر فلو انفراسٽرڪچر جي لحاظ کان اسڪيلنگ آهي. اسان چاهيون ٿا هڪ ايئر فلو ڪلسٽر ٺاهڻ، سيلري جي ڪارڪنن لاءِ پيرن جو هڪ جوڙو مختص ڪرڻ، ۽ نوڪري جي شيڊيولنگ ​​جي عملن ۽ هڪ مخزن سان گڏ هڪ خود نقلي سر ٺاهڻ.

چرچا

اهو، يقينا، اهو سڀ ڪجهه ناهي جيڪو آئون ايئر فلو بابت ٻڌائڻ چاهيان ٿو، پر مون بنيادي نقطي کي اجاگر ڪرڻ جي ڪوشش ڪئي. بک کائڻ سان ايندي آهي، ڪوشش ڪريو ۽ توهان کي پسند ايندي :)

جو ذريعو: www.habr.com

تبصرو شامل ڪريو