ایئر فلو بیچ ڈیٹا پروسیسنگ کے عمل کو آسانی سے اور تیزی سے تیار کرنے اور برقرار رکھنے کا ایک ٹول ہے۔

ایئر فلو بیچ ڈیٹا پروسیسنگ کے عمل کو آسانی سے اور تیزی سے تیار کرنے اور برقرار رکھنے کا ایک ٹول ہے۔

ہیلو، حبر! اس مضمون میں میں بیچ ڈیٹا پروسیسنگ کے عمل کو تیار کرنے کے لیے ایک بہترین ٹول کے بارے میں بات کرنا چاہتا ہوں، مثال کے طور پر، کارپوریٹ DWH یا آپ کے DataLake کے بنیادی ڈھانچے میں۔ ہم اپاچی ایئر فلو کے بارے میں بات کریں گے (اس کے بعد ایئر فلو کہا جاتا ہے)۔ یہ Habré پر غیر منصفانہ طور پر توجہ سے محروم ہے، اور مرکزی حصے میں میں آپ کو یہ باور کرانے کی کوشش کروں گا کہ آپ کے ETL/ELT عمل کے لیے شیڈولر کا انتخاب کرتے وقت کم از کم Airflow کو دیکھنے کے قابل ہے۔

اس سے پہلے، میں نے DWH کے موضوع پر مضامین کی ایک سیریز لکھی تھی جب میں نے Tinkoff بینک میں کام کیا تھا۔ اب میں Mail.Ru گروپ کی ٹیم کا حصہ بن گیا ہوں اور گیمنگ ایریا میں ڈیٹا کے تجزیہ کے لیے ایک پلیٹ فارم تیار کر رہا ہوں۔ دراصل، جیسے ہی خبریں اور دلچسپ حل ظاہر ہوتے ہیں، میں اور میری ٹیم یہاں ڈیٹا اینالیٹکس کے اپنے پلیٹ فارم کے بارے میں بات کریں گے۔

طول وعرض

تو، چلو شروع کرتے ہیں. ہوا کا بہاؤ کیا ہے؟ یہ ایک لائبریری ہے (یا لائبریریوں کا سیٹ) کام کے عمل کو تیار کرنا، منصوبہ بندی کرنا اور نگرانی کرنا۔ ایئر فلو کی اہم خصوصیت: Python کوڈ کو عمل کی وضاحت (ترقی) کے لیے استعمال کیا جاتا ہے۔ اس کے آپ کے پروجیکٹ اور ڈیولپمنٹ کو منظم کرنے کے بہت سے فوائد ہیں: خلاصہ یہ کہ آپ کا (مثال کے طور پر) ETL پروجیکٹ صرف ایک ازگر کا پروجیکٹ ہے، اور آپ اسے اپنی مرضی کے مطابق ترتیب دے سکتے ہیں، بنیادی ڈھانچے کی تفصیلات، ٹیم کے سائز اور دیگر ضروریات. آلاتی طور پر سب کچھ آسان ہے۔ مثال کے طور پر PyCharm + Git استعمال کریں۔ یہ حیرت انگیز اور بہت آسان ہے!

اب آئیے ایئر فلو کے اہم اداروں کو دیکھتے ہیں۔ ان کے جوہر اور مقصد کو سمجھ کر، آپ اپنے عمل کے فن تعمیر کو بہترین طریقے سے ترتیب دے سکتے ہیں۔ شاید مرکزی ہستی ڈائریکٹڈ Acyclic گراف ہے (اس کے بعد DAG کہا جاتا ہے)۔

ماؤنٹین

ڈی اے جی آپ کے کاموں کی کچھ معنی خیز ایسوسی ایشن ہے جسے آپ ایک مخصوص شیڈول کے مطابق سختی سے متعین ترتیب میں مکمل کرنا چاہتے ہیں۔ ایئر فلو DAGs اور دیگر اداروں کے ساتھ کام کرنے کے لیے ایک آسان ویب انٹرفیس فراہم کرتا ہے:

ایئر فلو بیچ ڈیٹا پروسیسنگ کے عمل کو آسانی سے اور تیزی سے تیار کرنے اور برقرار رکھنے کا ایک ٹول ہے۔

ڈی اے جی اس طرح نظر آسکتا ہے:

ایئر فلو بیچ ڈیٹا پروسیسنگ کے عمل کو آسانی سے اور تیزی سے تیار کرنے اور برقرار رکھنے کا ایک ٹول ہے۔

ڈویلپر، ڈی اے جی کو ڈیزائن کرتے وقت آپریٹرز کا ایک سیٹ پیش کرتا ہے جس پر ڈی اے جی کے اندر کام کیے جائیں گے۔ یہاں ہم ایک اور اہم ادارے کی طرف آتے ہیں: ایئر فلو آپریٹر۔

آپریٹرز

آپریٹر ایک ایسا ادارہ ہوتا ہے جس کی بنیاد پر ملازمت کی مثالیں تخلیق کی جاتی ہیں، جو بیان کرتی ہے کہ ملازمت کی مثال کے نفاذ کے دوران کیا ہوگا۔ GitHub سے ہوا کا بہاؤ جاری ہے۔ پہلے سے ہی آپریٹرز کا ایک سیٹ استعمال کرنے کے لیے تیار ہے۔ مثالیں:

  • BashOperator - bash کمانڈ پر عمل کرنے کے لیے آپریٹر۔
  • PythonOperator - Python کوڈ کال کرنے کا آپریٹر۔
  • EmailOperator — ای میل بھیجنے کے لیے آپریٹر۔
  • HTTPOperator - HTTP درخواستوں کے ساتھ کام کرنے کے لیے آپریٹر۔
  • SqlOperator - ایس کیو ایل کوڈ پر عمل درآمد کرنے والا آپریٹر۔
  • سینسر ایک آپریٹر ہے جو کسی ایونٹ کے انتظار کے لیے ہوتا ہے (مطلوبہ وقت کی آمد، مطلوبہ فائل کی ظاہری شکل، ڈیٹا بیس میں ایک لائن، API کا جواب، وغیرہ)۔

مزید مخصوص آپریٹرز ہیں: DockerOperator، HiveOperator، S3FileTransferOperator، PrestoToMysqlOperator، SlackOperator۔

آپ اپنی خصوصیات کی بنیاد پر آپریٹرز بھی تیار کر سکتے ہیں اور انہیں اپنے پروجیکٹ میں استعمال کر سکتے ہیں۔ مثال کے طور پر، ہم نے MongoDBToHiveViaHdfsTransfer، MongoDB سے Hive میں دستاویزات برآمد کرنے کے لیے ایک آپریٹر، اور اس کے ساتھ کام کرنے کے لیے کئی آپریٹرز بنائے۔ کلک ہاؤس: CHLoadFromHiveOperator اور CHTableLoaderOperator۔ بنیادی طور پر، جیسے ہی کسی پروجیکٹ نے بنیادی بیانات پر بنائے گئے کوڈ کو کثرت سے استعمال کیا ہے، آپ اسے ایک نئے بیان میں بنانے کے بارے میں سوچ سکتے ہیں۔ یہ مزید ترقی کو آسان بنائے گا، اور آپ پروجیکٹ میں آپریٹرز کی اپنی لائبریری کو وسعت دیں گے۔

اگلا، کاموں کی ان تمام مثالوں کو انجام دینے کی ضرورت ہے، اور اب ہم شیڈولر کے بارے میں بات کریں گے۔

شیڈولر

ایئر فلو کا ٹاسک شیڈیولر بنایا گیا ہے۔ اجمود. Celery ایک Python لائبریری ہے جو آپ کو ایک قطار کے علاوہ غیر مطابقت پذیر اور تقسیم شدہ کاموں کے عمل کو منظم کرنے کی اجازت دیتی ہے۔ ایئر فلو کی طرف، تمام کاموں کو پول میں تقسیم کیا گیا ہے۔ پول دستی طور پر بنائے جاتے ہیں۔ عام طور پر، ان کا مقصد ذریعہ کے ساتھ کام کرنے کے کام کے بوجھ کو محدود کرنا یا DWH کے اندر کاموں کو ٹائپ کرنا ہے۔ پولز کا انتظام ویب انٹرفیس کے ذریعے کیا جا سکتا ہے:

ایئر فلو بیچ ڈیٹا پروسیسنگ کے عمل کو آسانی سے اور تیزی سے تیار کرنے اور برقرار رکھنے کا ایک ٹول ہے۔

ہر پول میں سلاٹ کی تعداد کی ایک حد ہوتی ہے۔ ڈی اے جی بناتے وقت، اسے ایک پول دیا جاتا ہے:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

ڈی اے جی کی سطح پر بیان کردہ پول کو ٹاسک لیول پر اوور رائڈ کیا جا سکتا ہے۔
ایک الگ عمل، شیڈیولر، ایئر فلو میں تمام کاموں کو شیڈول کرنے کا ذمہ دار ہے۔ دراصل، شیڈیولر عمل درآمد کے لیے کاموں کو ترتیب دینے کے تمام میکانکس سے نمٹتا ہے۔ کام انجام دینے سے پہلے کئی مراحل سے گزرتا ہے:

  1. ڈی اے جی میں پچھلے کام مکمل ہوچکے ہیں، نئے کام کی قطار لگ سکتی ہے۔
  2. قطار کو کاموں کی ترجیح کے لحاظ سے ترتیب دیا جاتا ہے (ترجیحات کو بھی کنٹرول کیا جا سکتا ہے)، اور اگر پول میں مفت سلاٹ ہے تو، کام کو عمل میں لایا جا سکتا ہے۔
  3. اگر کوئی مفت ورکر سیلری ہے تو اسے ٹاسک بھیجا جاتا ہے۔ وہ کام جسے آپ نے مسئلہ میں پروگرام کیا ہے، ایک یا دوسرے آپریٹر کا استعمال کرتے ہوئے شروع ہوتا ہے۔

کافی سادہ۔

شیڈولر تمام DAGs کے سیٹ اور DAGs کے اندر تمام کاموں پر چلتا ہے۔

شیڈیولر کے لیے ڈی اے جی کے ساتھ کام شروع کرنے کے لیے، ڈی اے جی کو ایک شیڈول ترتیب دینے کی ضرورت ہے:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

تیار شدہ پیش سیٹوں کا ایک سیٹ ہے: @once, @hourly, @daily, @weekly, @monthly, @yearly.

آپ کرون اظہار بھی استعمال کر سکتے ہیں:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

پھانسی کی تاریخ

یہ سمجھنے کے لیے کہ ایئر فلو کیسے کام کرتا ہے، یہ سمجھنا ضروری ہے کہ ڈی اے جی کے لیے عمل درآمد کی تاریخ کیا ہے۔ ایئر فلو میں، ڈی اے جی کے پاس عمل درآمد کی تاریخ کا طول و عرض ہوتا ہے، یعنی، ڈی اے جی کے کام کے شیڈول کے مطابق، ہر عمل درآمد کی تاریخ کے لیے ٹاسک انسٹینس بنائے جاتے ہیں۔ اور ہر عملدرآمد کی تاریخ کے لیے، کاموں کو دوبارہ عمل میں لایا جا سکتا ہے - یا، مثال کے طور پر، ایک ڈی اے جی ایک ساتھ عمل درآمد کی کئی تاریخوں میں کام کر سکتا ہے۔ یہ یہاں واضح طور پر دکھایا گیا ہے:

ایئر فلو بیچ ڈیٹا پروسیسنگ کے عمل کو آسانی سے اور تیزی سے تیار کرنے اور برقرار رکھنے کا ایک ٹول ہے۔

بدقسمتی سے (یا شاید خوش قسمتی سے: یہ صورتحال پر منحصر ہے)، اگر ڈی اے جی میں ٹاسک کے نفاذ کو درست کیا جاتا ہے، تو پچھلی ایگزیکیوشن ڈیٹ میں عمل درآمد ایڈجسٹمنٹ کو مدنظر رکھتے ہوئے آگے بڑھے گا۔ یہ اچھا ہے اگر آپ کو ایک نئے الگورتھم کا استعمال کرتے ہوئے پچھلے ادوار میں ڈیٹا کو دوبارہ گننے کی ضرورت ہو، لیکن یہ برا ہے کیونکہ نتیجہ کی تولیدی صلاحیت ختم ہو گئی ہے (یقیناً، کوئی بھی آپ کو گٹ سے سورس کوڈ کا مطلوبہ ورژن واپس کرنے کی زحمت نہیں دیتا اور اس کا حساب لگاتا ہے۔ آپ کو ایک وقت کی ضرورت ہے، جس طرح آپ کو اس کی ضرورت ہے)۔

کام پیدا کرنا

DAG کا نفاذ Python میں کوڈ ہے، لہذا ہمارے پاس کام کرتے وقت کوڈ کی مقدار کو کم کرنے کا ایک بہت ہی آسان طریقہ ہے، مثال کے طور پر، شارڈ ذرائع کے ساتھ۔ فرض کریں کہ آپ کے پاس ایک ذریعہ کے طور پر تین MySQL شارڈز ہیں، آپ کو ہر ایک میں چڑھنے اور کچھ ڈیٹا لینے کی ضرورت ہے۔ مزید یہ کہ آزادانہ اور متوازی طور پر۔ DAG میں Python کوڈ اس طرح نظر آ سکتا ہے:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

ڈی اے جی اس طرح لگتا ہے:

ایئر فلو بیچ ڈیٹا پروسیسنگ کے عمل کو آسانی سے اور تیزی سے تیار کرنے اور برقرار رکھنے کا ایک ٹول ہے۔

اس صورت میں، آپ صرف سیٹنگز کو ایڈجسٹ کرکے اور DAG کو اپ ڈیٹ کرکے شارڈ کو شامل یا ہٹا سکتے ہیں۔ آرام دہ!

آپ زیادہ پیچیدہ کوڈ جنریشن کا بھی استعمال کر سکتے ہیں، مثال کے طور پر، ڈیٹا بیس کی شکل میں ذرائع کے ساتھ کام کرنا یا ٹیبل کے ڈھانچے کی وضاحت کرنا، ٹیبل کے ساتھ کام کرنے کے لیے الگورتھم، اور DWH انفراسٹرکچر کی خصوصیات کو مدنظر رکھتے ہوئے، ایک عمل تیار کرنا۔ اپنے اسٹوریج میں N ٹیبل لوڈ کرنے کے لیے۔ یا، مثال کے طور پر، ایسے API کے ساتھ کام کرنا جو فہرست کی شکل میں پیرامیٹر کے ساتھ کام کرنے کی حمایت نہیں کرتا ہے، آپ اس فہرست سے DAG میں N ٹاسک تیار کر سکتے ہیں، API میں درخواستوں کے متوازی کو پول تک محدود کر سکتے ہیں، اور سکریپ کر سکتے ہیں۔ API سے ضروری ڈیٹا۔ لچکدار!

ذخیرہ

ایئر فلو کا اپنا بیک اینڈ ریپوزٹری ہے، ایک ڈیٹا بیس (مائی ایس کیو ایل یا پوسٹگریس ہو سکتا ہے، ہمارے پاس پوسٹگریس ہے)، جو کاموں کی سٹیٹس، ڈی اے جی، کنکشن سیٹنگز، عالمی متغیرات، وغیرہ وغیرہ کو اسٹور کرتا ہے۔ یہاں میں یہ کہنا چاہوں گا کہ ایئر فلو میں ذخیرہ بہت آسان ہے (تقریباً 20 میزیں) اور اگر آپ اس کے اوپر اپنا کوئی عمل بنانا چاہتے ہیں تو آسان ہے۔ مجھے انفارمیٹیکا ریپوزٹری میں 100500 ٹیبلز یاد ہیں، جن کا استفسار کرنے کا طریقہ سمجھنے سے پہلے کافی دیر تک مطالعہ کرنا پڑتا تھا۔

نگرانی

مخزن کی سادگی کو دیکھتے ہوئے، آپ ٹاسک مانیٹرنگ کا عمل بنا سکتے ہیں جو آپ کے لیے آسان ہو۔ ہم Zeppelin میں ایک نوٹ پیڈ استعمال کرتے ہیں، جہاں ہم کاموں کی حیثیت کو دیکھتے ہیں:

ایئر فلو بیچ ڈیٹا پروسیسنگ کے عمل کو آسانی سے اور تیزی سے تیار کرنے اور برقرار رکھنے کا ایک ٹول ہے۔

یہ خود ایئر فلو کا ویب انٹرفیس بھی ہوسکتا ہے:

ایئر فلو بیچ ڈیٹا پروسیسنگ کے عمل کو آسانی سے اور تیزی سے تیار کرنے اور برقرار رکھنے کا ایک ٹول ہے۔

ایئر فلو کوڈ اوپن سورس ہے، اس لیے ہم نے ٹیلیگرام میں الرٹ شامل کیا ہے۔ کسی کام کی ہر چلنے والی مثال، اگر کوئی غلطی ہوتی ہے، تو ٹیلیگرام میں گروپ کو اسپام کرتا ہے، جہاں پوری ڈیولپمنٹ اور سپورٹ ٹیم ہوتی ہے۔

ہمیں ٹیلیگرام (اگر ضرورت ہو) کے ذریعے فوری جواب موصول ہوتا ہے، اور Zeppelin کے ذریعے ہمیں Airflow میں کاموں کی مجموعی تصویر موصول ہوتی ہے۔

مجموعی طور پر

ایئر فلو بنیادی طور پر اوپن سورس ہے، اور آپ کو اس سے معجزات کی توقع نہیں کرنی چاہیے۔ کام کرنے والے حل کی تعمیر کے لیے وقت اور کوشش کرنے کے لیے تیار رہیں۔ مقصد قابل حصول ہے، مجھ پر یقین کرو، یہ اس کے قابل ہے۔ ترقی کی رفتار، لچک، نئے عمل کو شامل کرنے میں آسانی - آپ کو یہ پسند آئے گا۔ یقینا، آپ کو اس منصوبے کی تنظیم، ایئر فلو کے استحکام پر بہت زیادہ توجہ دینے کی ضرورت ہے: معجزات نہیں ہوتے ہیں.

اب ہمارے پاس ایئر فلو روزانہ کام کر رہا ہے۔ تقریباً 6,5 ہزار کام. وہ کردار میں کافی مختلف ہیں۔ بہت سے مختلف اور انتہائی مخصوص ذرائع سے ڈیٹا کو مین DWH میں لوڈ کرنے کے کام ہیں، مین DWH کے اندر اسٹور فرنٹ کا حساب لگانے کے کام ہیں، ڈیٹا کو تیز DWH میں شائع کرنے کے کام ہیں، بہت سے، بہت سے مختلف کام ہیں - اور ایئر فلو ان سب کو دن بہ دن چباتا ہے۔ تعداد میں بات کرتے ہوئے، یہ ہے 2,3 ہزار DWH (Hadoop) کے اندر مختلف پیچیدگیوں کے ELT کام، تقریباً۔ 2,5 سو ڈیٹا بیس ذرائع کے مطابق یہ ایک ٹیم ہے۔ 4 ETL ڈویلپرز، جو DWH میں ETL ڈیٹا پروسیسنگ اور DWH کے اندر ELT ڈیٹا پروسیسنگ میں تقسیم ہیں اور یقینا مزید ایک ایڈمن، جو سروس کے بنیادی ڈھانچے سے نمٹتا ہے۔

مستقبل کے لئے منصوبوں

عمل کی تعداد ناگزیر طور پر بڑھ رہی ہے، اور ایئر فلو انفراسٹرکچر کے حوالے سے ہم جو اہم کام کریں گے وہ ہے اسکیلنگ۔ ہم ایک ائیر فلو کلسٹر بنانا چاہتے ہیں، سیلری ورکرز کے لیے ٹانگوں کا ایک جوڑا مختص کرنا چاہتے ہیں، اور جاب شیڈولنگ کے عمل اور ایک ذخیرہ کے ساتھ ایک سیلف ڈپلیکیٹنگ ہیڈ بنانا چاہتے ہیں۔

اپسنہار

یقیناً یہ وہ سب کچھ نہیں ہے جو میں ایئر فلو کے بارے میں بتانا چاہوں گا، لیکن میں نے اہم نکات کو اجاگر کرنے کی کوشش کی۔ بھوک کھانے سے لگتی ہے، آزمائیں آپ کو پسند آئے گی :)

ماخذ: www.habr.com

نیا تبصرہ شامل کریں