اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

ہیلو، حبر! آج ہم ایک ایسا نظام بنائیں گے جو اسپارک اسٹریمنگ کا استعمال کرتے ہوئے اپاچی کافکا میسج اسٹریمز پر کارروائی کرے گا اور پروسیسنگ کے نتائج کو AWS RDS کلاؤڈ ڈیٹا بیس میں لکھے گا۔

آئیے تصور کریں کہ ایک مخصوص کریڈٹ ادارہ ہمیں اپنی تمام برانچوں میں آنے والی ٹرانزیکشنز پر کارروائی کرنے کا کام متعین کرتا ہے۔ یہ خزانے کے لیے کھلی کرنسی کی پوزیشن، لین دین کے لیے حد یا مالیاتی نتائج وغیرہ کا فوری حساب لگانے کے مقصد سے کیا جا سکتا ہے۔

جادو اور جادو منتر کے استعمال کے بغیر اس کیس کو کیسے نافذ کیا جائے - کٹ کے نیچے پڑھیں! جاؤ!

اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ
(تصویری ماخذ)

تعارف

بلاشبہ، حقیقی وقت میں ڈیٹا کی ایک بڑی مقدار پر کارروائی جدید نظاموں میں استعمال کے لیے کافی مواقع فراہم کرتی ہے۔ اس کے لیے سب سے مشہور امتزاج میں سے ایک اپاچی کافکا اور اسپارک اسٹریمنگ کا ٹینڈم ہے، جہاں کافکا آنے والے پیغام کے پیکٹوں کا ایک سلسلہ بناتا ہے، اور اسپارک اسٹریمنگ ایک مقررہ وقت کے وقفے پر ان پیکٹوں پر کارروائی کرتی ہے۔

درخواست کی غلطی برداشت کو بڑھانے کے لیے، ہم چیک پوائنٹس کا استعمال کریں گے۔ اس طریقہ کار کے ساتھ، جب اسپارک سٹریمنگ انجن کو کھوئے ہوئے ڈیٹا کو بازیافت کرنے کی ضرورت ہوتی ہے، تو اسے صرف آخری چوکی پر واپس جانے اور وہاں سے دوبارہ حساب کتاب شروع کرنے کی ضرورت ہوتی ہے۔

ترقی یافتہ نظام کا فن تعمیر

اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

استعمال شدہ اجزاء:

  • اپاچی کافکا ایک تقسیم شدہ اشاعت-سبسکرائب پیغام رسانی کا نظام ہے۔ آف لائن اور آن لائن پیغام کے استعمال کے لیے موزوں ہے۔ ڈیٹا کے نقصان کو روکنے کے لیے، کافکا کے پیغامات ڈسک پر محفوظ کیے جاتے ہیں اور کلسٹر کے اندر نقل کیے جاتے ہیں۔ کافکا سسٹم زو کیپر سنکرونائزیشن سروس کے اوپر بنایا گیا ہے۔
  • اپاچی اسپارک اسٹریمنگ - اسٹریمنگ ڈیٹا پر کارروائی کرنے کے لیے اسپارک جزو۔ اسپارک سٹریمنگ ماڈیول ایک مائیکرو بیچ فن تعمیر کا استعمال کرتے ہوئے بنایا گیا ہے، جہاں ڈیٹا سٹریم کو چھوٹے ڈیٹا پیکٹوں کی مسلسل ترتیب سے تعبیر کیا جاتا ہے۔ اسپارک اسٹریمنگ مختلف ذرائع سے ڈیٹا لیتی ہے اور اسے چھوٹے پیکجوں میں یکجا کرتی ہے۔ نئے پیکجز باقاعدہ وقفوں پر بنائے جاتے ہیں۔ ہر وقت کے وقفے کے آغاز میں، ایک نیا پیکٹ بنایا جاتا ہے، اور اس وقفہ کے دوران موصول ہونے والا کوئی بھی ڈیٹا پیکٹ میں شامل کیا جاتا ہے۔ وقفہ کے اختتام پر، پیکٹ کی ترقی رک جاتی ہے۔ وقفہ کا سائز ایک پیرامیٹر کے ذریعہ طے کیا جاتا ہے جسے بیچ وقفہ کہا جاتا ہے۔
  • اپاچی اسپارک ایس کیو ایل - متعلقہ پروسیسنگ کو اسپارک فنکشنل پروگرامنگ کے ساتھ جوڑتا ہے۔ سٹرکچرڈ ڈیٹا کا مطلب ہے وہ ڈیٹا جس میں اسکیما ہے، یعنی تمام ریکارڈز کے لیے فیلڈز کا ایک سیٹ۔ اسپارک ایس کیو ایل مختلف قسم کے سٹرکچرڈ ڈیٹا ذرائع سے ان پٹ کو سپورٹ کرتا ہے اور، اسکیما کی معلومات کی دستیابی کی بدولت، یہ مؤثر طریقے سے ریکارڈز کے صرف مطلوبہ فیلڈز کو بازیافت کرسکتا ہے، اور ڈیٹا فریم APIs بھی فراہم کرتا ہے۔
  • AWS RDS ایک نسبتاً سستا کلاؤڈ بیسڈ ریلیشنل ڈیٹا بیس، ویب سروس ہے جو سیٹ اپ، آپریشن اور اسکیلنگ کو آسان بناتی ہے، اور براہ راست ایمیزون کے زیر انتظام ہے۔

کافکا سرور کو انسٹال اور چلانا

کافکا کو براہ راست استعمال کرنے سے پہلے، آپ کو یہ یقینی بنانا ہوگا کہ آپ کے پاس جاوا ہے، کیونکہ... JVM کام کے لیے استعمال کیا جاتا ہے:

sudo apt-get update 
sudo apt-get install default-jre
java -version

آئیے کافکا کے ساتھ کام کرنے کے لیے ایک نیا صارف بنائیں:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

اگلا، سرکاری اپاچی کافکا ویب سائٹ سے تقسیم ڈاؤن لوڈ کریں:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

ڈاؤن لوڈ کردہ آرکائیو کو کھولیں:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

اگلا مرحلہ اختیاری ہے۔ حقیقت یہ ہے کہ پہلے سے طے شدہ ترتیبات آپ کو اپاچی کافکا کی تمام صلاحیتوں کو مکمل طور پر استعمال کرنے کی اجازت نہیں دیتی ہیں۔ مثال کے طور پر، ایک موضوع، زمرہ، گروپ کو حذف کریں جس میں پیغامات شائع کیے جا سکیں۔ اسے تبدیل کرنے کے لیے، آئیے کنفیگریشن فائل میں ترمیم کریں:

vim ~/kafka/config/server.properties

فائل کے آخر میں درج ذیل کو شامل کریں:

delete.topic.enable = true

کافکا سرور شروع کرنے سے پہلے، آپ کو زو کیپر سرور شروع کرنے کی ضرورت ہے؛ ہم معاون اسکرپٹ استعمال کریں گے جو کافکا کی تقسیم کے ساتھ آتا ہے:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

زو کیپر کے کامیابی سے شروع ہونے کے بعد، کافکا سرور کو ایک الگ ٹرمینل میں لانچ کریں:

bin/kafka-server-start.sh config/server.properties

آئیے ایک نیا موضوع بناتے ہیں جسے ٹرانزیکشن کہتے ہیں:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

آئیے اس بات کو یقینی بنائیں کہ پارٹیشنز اور نقل کی مطلوبہ تعداد کے ساتھ ایک موضوع بنایا گیا ہے:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

آئیے نئے تخلیق کردہ موضوع کے لیے پروڈیوسر اور صارف کی جانچ کے لمحات کو یاد نہ کریں۔ آپ پیغامات بھیجنے اور وصول کرنے کی جانچ کیسے کر سکتے ہیں اس بارے میں مزید تفصیلات سرکاری دستاویزات میں لکھی گئی ہیں۔ کچھ پیغامات بھیجیں۔. ٹھیک ہے، ہم KafkaProducer API کا استعمال کرتے ہوئے Python میں ایک پروڈیوسر لکھنے کی طرف بڑھتے ہیں۔

پروڈیوسر تحریر

پروڈیوسر بے ترتیب ڈیٹا تیار کرے گا - ہر سیکنڈ میں 100 پیغامات۔ بے ترتیب ڈیٹا سے ہمارا مطلب تین شعبوں پر مشتمل ایک لغت ہے:

  • برانچ - کریڈٹ ادارے کے فروخت کے مقام کا نام؛
  • کرنسی - لین دین کی کرنسی؛
  • رقم - لین دین کی رقم. رقم ایک مثبت نمبر ہو گی اگر یہ بینک کی طرف سے کرنسی کی خریداری ہے، اور منفی نمبر ہو گی اگر یہ فروخت ہے۔

پروڈیوسر کا کوڈ اس طرح لگتا ہے:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

اگلا، بھیجنے کا طریقہ استعمال کرتے ہوئے، ہم JSON فارمیٹ میں سرور کو، ہمیں مطلوبہ موضوع پر ایک پیغام بھیجتے ہیں:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

سکرپٹ چلاتے وقت، ہمیں ٹرمینل میں درج ذیل پیغامات موصول ہوتے ہیں:

اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

اس کا مطلب ہے کہ سب کچھ اسی طرح کام کرتا ہے جیسا کہ ہم چاہتے ہیں - پروڈیوسر ہمیں مطلوبہ موضوع پر پیغامات تیار کرتا اور بھیجتا ہے۔
اگلا مرحلہ سپارک کو انسٹال کرنا اور اس میسج اسٹریم پر کارروائی کرنا ہے۔

اپاچی اسپارک انسٹال کرنا

اپاچی چمک ایک عالمگیر اور اعلیٰ کارکردگی والا کلسٹر کمپیوٹنگ پلیٹ فارم ہے۔

اسپارک MapReduce ماڈل کے مقبول نفاذ سے بہتر کارکردگی کا مظاہرہ کرتا ہے جبکہ کمپیوٹیشن کی اقسام کی وسیع رینج کو سپورٹ کرتا ہے، بشمول انٹرایکٹو سوالات اور اسٹریم پروسیسنگ۔ بڑی مقدار میں ڈیٹا پر کارروائی کرتے وقت رفتار ایک اہم کردار ادا کرتی ہے، کیونکہ یہ وہ رفتار ہے جو آپ کو منٹوں یا گھنٹے انتظار کیے بغیر انٹرایکٹو کام کرنے کی اجازت دیتی ہے۔ اسپارک کی سب سے بڑی طاقت جو اسے اتنی تیزی سے بناتی ہے وہ میموری میں حساب کتاب کرنے کی صلاحیت ہے۔

یہ فریم ورک Scala میں لکھا گیا ہے، لہذا آپ کو پہلے اسے انسٹال کرنے کی ضرورت ہے:

sudo apt-get install scala

آفیشل ویب سائٹ سے سپارک ڈسٹری بیوشن ڈاؤن لوڈ کریں:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

محفوظ شدہ دستاویزات کو کھولیں:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

اسپارک کا راستہ bash فائل میں شامل کریں:

vim ~/.bashrc

ایڈیٹر کے ذریعے درج ذیل لائنیں شامل کریں:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrc میں تبدیلیاں کرنے کے بعد نیچے دی گئی کمانڈ کو چلائیں:

source ~/.bashrc

AWS PostgreSQL تعینات کرنا

جو کچھ باقی ہے وہ ڈیٹا بیس کو تعینات کرنا ہے جس میں ہم اسٹریمز سے پروسیس شدہ معلومات کو اپ لوڈ کریں گے۔ اس کے لیے ہم AWS RDS سروس استعمال کریں گے۔

AWS کنسول پر جائیں -> AWS RDS -> ڈیٹا بیس -> ڈیٹا بیس بنائیں:
اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

PostgreSQL کو منتخب کریں اور اگلا پر کلک کریں:
اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

کیونکہ یہ مثال صرف تعلیمی مقاصد کے لیے ہے؛ ہم ایک مفت سرور استعمال کریں گے "کم سے کم" (مفت درجے):
اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

اس کے بعد، ہم فری ٹائر بلاک میں ایک ٹک لگاتے ہیں، اور اس کے بعد ہمیں خود بخود t2.micro کلاس کی ایک مثال پیش کی جائے گی - اگرچہ کمزور ہے، لیکن یہ ہمارے کام کے لیے مفت اور کافی موزوں ہے:
اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

اس کے بعد بہت اہم چیزیں آتی ہیں: ڈیٹا بیس مثال کا نام، ماسٹر صارف کا نام اور اس کا پاس ورڈ۔ آئیے مثال کا نام دیں: myHabrTest، master user: habrپاس ورڈ: habr12345 اور نیکسٹ بٹن پر کلک کریں:
اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

اگلے صفحے پر ہمارے ڈیٹا بیس سرور کی باہر سے رسائی (عوامی رسائی) اور بندرگاہ کی دستیابی کے لیے ذمہ دار پیرامیٹرز ہیں:

اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

آئیے VPC سیکیورٹی گروپ کے لیے ایک نئی ترتیب بنائیں، جو پورٹ 5432 (PostgreSQL) کے ذریعے ہمارے ڈیٹا بیس سرور تک بیرونی رسائی کی اجازت دے گی۔
آئیے الگ براؤزر ونڈو میں AWS کنسول پر VPC ڈیش بورڈ -> سیکیورٹی گروپس -> سیکیورٹی گروپ سیکشن بنائیں:
اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

ہم نے سیکورٹی گروپ - PostgreSQL کے لیے نام سیٹ کیا، ایک تفصیل، بتاتے ہیں کہ یہ گروپ کس VPC سے منسلک ہونا چاہیے اور Create بٹن پر کلک کریں:
اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

نئے بنائے گئے گروپ کے لیے پورٹ 5432 کے لیے ان باؤنڈ قوانین کو پُر کریں، جیسا کہ ذیل کی تصویر میں دکھایا گیا ہے۔ آپ دستی طور پر پورٹ کی وضاحت نہیں کر سکتے، لیکن ٹائپ ڈراپ ڈاؤن فہرست سے PostgreSQL کو منتخب کریں۔

سخت الفاظ میں، قدر ::/0 کا مطلب پوری دنیا سے سرور پر آنے والی ٹریفک کی دستیابی ہے، جو کہ مکمل طور پر درست نہیں ہے، لیکن مثال کا تجزیہ کرنے کے لیے، آئیے اپنے آپ کو اس نقطہ نظر کو استعمال کرنے کی اجازت دیں:
اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

ہم براؤزر کے صفحے پر واپس آتے ہیں، جہاں ہمارے پاس "اعلی درجے کی ترتیبات کو ترتیب دیں" کھلا ہے اور VPC سیکیورٹی گروپس سیکشن میں منتخب کریں -> موجودہ VPC سیکیورٹی گروپس کا انتخاب کریں -> PostgreSQL:
اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

اگلا، ڈیٹا بیس کے اختیارات میں -> ڈیٹا بیس کا نام -> نام سیٹ کریں - habrDB.

ہم بقیہ پیرامیٹرز کو چھوڑ سکتے ہیں، بیک اپ کو غیر فعال کرنے (بیک اپ برقرار رکھنے کی مدت - 0 دن)، نگرانی اور کارکردگی کی بصیرت، بطور ڈیفالٹ۔ بٹن پر کلک کریں۔ ڈیٹا بیس بنائیں:
اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

تھریڈ ہینڈلر

آخری مرحلہ اسپارک جاب کی ڈیولپمنٹ ہو گا، جو ہر دو سیکنڈ میں کافکا سے آنے والے نئے ڈیٹا کو پروسیس کرے گا اور نتیجہ کو ڈیٹا بیس میں داخل کرے گا۔

جیسا کہ اوپر بیان کیا گیا ہے، چوکیاں SparkStreaming میں ایک بنیادی میکانزم ہیں جو غلطی کو برداشت کرنے کو یقینی بنانے کے لیے ترتیب دینا ضروری ہے۔ ہم چیک پوائنٹس کا استعمال کریں گے اور، اگر طریقہ کار ناکام ہو جاتا ہے، تو اسپارک سٹریمنگ ماڈیول کو صرف آخری چیک پوائنٹ پر واپس جانے اور کھوئے ہوئے ڈیٹا کو بازیافت کرنے کے لیے اس سے دوبارہ حساب کتاب کرنے کی ضرورت ہوگی۔

چیک پوائنٹنگ کو غلطی برداشت کرنے والے، قابل اعتماد فائل سسٹم (جیسے HDFS، S3، وغیرہ) پر ڈائرکٹری ترتیب دے کر فعال کیا جا سکتا ہے جس میں چیک پوائنٹ کی معلومات کو محفوظ کیا جائے گا۔ یہ استعمال کرتے ہوئے کیا جاتا ہے، مثال کے طور پر:

streamingContext.checkpoint(checkpointDirectory)

ہماری مثال میں، ہم مندرجہ ذیل طریقہ استعمال کریں گے، یعنی، اگر checkpointDirectory موجود ہے، تو پھر سیاق و سباق کو چیک پوائنٹ ڈیٹا سے دوبارہ بنایا جائے گا۔ اگر ڈائرکٹری موجود نہیں ہے (یعنی پہلی بار عمل میں آئی ہے)، تو فنکشن ٹو کریٹ کونٹکس کو ایک نیا سیاق و سباق بنانے اور DStreams کو ترتیب دینے کے لیے کہا جاتا ہے:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

ہم KafkaUtils لائبریری کے createDirectStream طریقہ کا استعمال کرتے ہوئے "لین دین" کے موضوع سے مربوط ہونے کے لیے ایک DirectStream آبجیکٹ بناتے ہیں:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

آنے والے ڈیٹا کو JSON فارمیٹ میں پارس کرنا:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

اسپارک ایس کیو ایل کا استعمال کرتے ہوئے، ہم ایک سادہ گروپ بندی کرتے ہیں اور نتیجہ کنسول میں ظاہر کرتے ہیں:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

استفسار کا متن حاصل کرنا اور اسے سپارک ایس کیو ایل کے ذریعے چلانا:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

اور پھر ہم نتیجے میں جمع شدہ ڈیٹا کو AWS RDS میں ٹیبل میں محفوظ کرتے ہیں۔ ڈیٹا بیس ٹیبل میں جمع کرنے کے نتائج کو محفوظ کرنے کے لیے، ہم ڈیٹا فریم آبجیکٹ کے لکھنے کا طریقہ استعمال کریں گے:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS سے کنکشن قائم کرنے کے بارے میں چند الفاظ۔ ہم نے "AWS PostgreSQL کی تعیناتی" مرحلے پر اس کے لیے صارف اور پاس ورڈ بنایا۔ آپ کو اینڈ پوائنٹ کو ڈیٹا بیس سرور یو آر ایل کے طور پر استعمال کرنا چاہیے، جو کنیکٹیویٹی اور سیکیورٹی سیکشن میں ظاہر ہوتا ہے:

اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

اسپارک اور کافکا کو صحیح طریقے سے مربوط کرنے کے لیے، آپ کو آرٹفیکٹ کا استعمال کرتے ہوئے smark-submit کے ذریعے کام چلانا چاہیے۔ spark-streaming-kafka-0-8_2.11. مزید برآں، ہم PostgreSQL ڈیٹا بیس کے ساتھ بات چیت کے لیے ایک نمونہ بھی استعمال کریں گے؛ ہم انہیں --packages کے ذریعے منتقل کریں گے۔

اسکرپٹ کی لچک کے لیے، ہم ان پٹ پیرامیٹرز کے طور پر میسج سرور کا نام اور اس موضوع کو بھی شامل کریں گے جس سے ہم ڈیٹا حاصل کرنا چاہتے ہیں۔

لہذا، یہ نظام کی فعالیت کو شروع کرنے اور جانچنے کا وقت ہے:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

سب کچھ ہو گیا! جیسا کہ آپ نیچے دی گئی تصویر میں دیکھ سکتے ہیں، جب ایپلیکیشن چل رہی ہے، نئے مجموعی نتائج ہر 2 سیکنڈ میں آؤٹ پٹ ہوتے ہیں، کیونکہ جب ہم StreamingContext آبجیکٹ بناتے ہیں تو ہم بیچنگ کا وقفہ 2 سیکنڈ پر سیٹ کرتے ہیں:

اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

اگلا، ہم ٹیبل میں ریکارڈ کی موجودگی کو چیک کرنے کے لیے ڈیٹا بیس سے ایک سادہ سوال کرتے ہیں۔ transaction_flow:

اپاچی کافکا اور اسپارک اسٹریمنگ کے ساتھ اسٹریمنگ ڈیٹا پروسیسنگ

حاصل يہ ہوا

اس مضمون نے اپاچی کافکا اور پوسٹگری ایس کیو ایل کے ساتھ مل کر اسپارک اسٹریمنگ کا استعمال کرتے ہوئے معلومات کی اسٹریم پروسیسنگ کی ایک مثال کو دیکھا۔ مختلف ذرائع سے ڈیٹا میں اضافے کے ساتھ، اسٹریمنگ اور ریئل ٹائم ایپلی کیشنز بنانے کے لیے اسپارک اسٹریمنگ کی عملی قدر کا اندازہ لگانا مشکل ہے۔

آپ کو میرے ذخیرے میں مکمل سورس کوڈ مل سکتا ہے۔ GitHub کے.

مجھے اس مضمون پر گفتگو کرتے ہوئے خوشی ہوئی، میں آپ کے تبصروں کا منتظر ہوں، اور مجھے تمام خیال رکھنے والے قارئین سے تعمیری تنقید کی بھی امید ہے۔

میں تمہارے لیے کامیابی چاہتا ہوں!

پی ایس. ابتدائی طور پر یہ ایک مقامی PostgreSQL ڈیٹا بیس استعمال کرنے کا منصوبہ بنایا گیا تھا، لیکن AWS سے میری محبت کو دیکھتے ہوئے، میں نے ڈیٹا بیس کو کلاؤڈ پر منتقل کرنے کا فیصلہ کیا۔ اس موضوع پر اگلے مضمون میں، میں AWS Kinesis اور AWS EMR کا استعمال کرتے ہوئے AWS میں اوپر بیان کردہ پورے نظام کو کیسے لاگو کرنا ہے یہ دکھاؤں گا۔ خبروں پر عمل کریں!

ماخذ: www.habr.com

نیا تبصرہ شامل کریں