اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

هيلو، حبر! اڄ اسان هڪ سسٽم ٺاهينداسين جيڪو اسپارڪ اسٽريمنگ استعمال ڪندي Apache Kafka ميسيج اسٽريمز تي عمل ڪندو ۽ پروسيسنگ جا نتيجا AWS RDS ڪلائوڊ ڊيٽابيس ۾ لکندو.

اچو ته تصور ڪريون ته هڪ خاص ڪريڊٽ ادارو اسان کي ان جي سڀني شاخن ۾ ايندڙ ٽرانزيڪشن کي پروسيسنگ ڪرڻ جو ڪم مقرر ڪري ٿو. اهو ٿي سگهي ٿو فوري طور تي حساب ڪرڻ جي مقصد لاءِ هڪ کليل ڪرنسي جي پوزيشن لاءِ خزاني، حدن يا مالي نتيجن لاءِ ٽرانزيڪشن وغيره.

جادو ۽ جادو منتر جي استعمال کان سواء هن ڪيس کي ڪيئن لاڳو ڪرڻ - ڪٽ جي هيٺان پڙهو! وڃ!

اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ
(تصوير جو ذريعو)

تعارف

يقينن، حقيقي وقت ۾ ڊيٽا جي وڏي مقدار کي پروسيسنگ جديد سسٽم ۾ استعمال لاء ڪافي موقعا فراهم ڪري ٿي. ان لاءِ سڀ کان وڌيڪ مشهور مجموعن مان هڪ آهي اپاچي ڪافڪا ۽ اسپارڪ اسٽريمنگ جو ٽينڊم، جتي ڪافڪا ايندڙ پيغامن جي پيڪٽن جو هڪ وهڪرو ٺاهي ٿو، ۽ اسپارڪ اسٽريمنگ انهن پيڪن کي هڪ مقرر وقت جي وقفي تي پروسيس ڪري ٿو.

ايپليڪيشن جي غلطي رواداري کي وڌائڻ لاء، اسان چيڪ پوسٽون استعمال ڪنداسين. هن ميکانيزم سان، جڏهن اسپارڪ اسٽريمنگ انجڻ کي وڃايل ڊيٽا بحال ڪرڻ جي ضرورت آهي، ان کي صرف آخري چيڪ پوائنٽ تي واپس وڃڻو پوندو ۽ اتان کان حساب ڪتاب ٻيهر شروع ڪرڻو پوندو.

ترقي يافته نظام جو فن تعمير

اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

استعمال ٿيل اجزاء:

  • ايپيڪي ڪيفيڪا هڪ تقسيم پبلش-سبسڪرائب ميسيجنگ سسٽم آهي. ٻنهي آف لائن ۽ آن لائن پيغام واپرائڻ لاء مناسب. ڊيٽا جي نقصان کي روڪڻ لاء، ڪافڪا پيغام ڊسڪ تي ذخيرو ٿيل آهن ۽ ڪلستر جي اندر نقل ٿيل آهن. ڪافڪا سسٽم ZooKeeper synchronization service جي چوٽي تي ٺهيل آهي؛
  • Apache Spark اسٽريمنگ - اسپارڪ جزو پروسيسنگ اسٽريمنگ ڊيٽا لاءِ. اسپارڪ اسٽريمنگ ماڊل هڪ مائڪرو بيچ آرڪيٽيڪچر استعمال ڪندي ٺاهيو ويو آهي، جتي ڊيٽا اسٽريم کي ننڍڙن ڊيٽا پيڪٽس جي لڳاتار تسلسل طور تشريح ڪئي ويندي آهي. اسپارڪ اسٽريمنگ مختلف ذريعن کان ڊيٽا وٺي ٿي ۽ ان کي ننڍڙن پيڪيجز ۾ گڏ ڪري ٿي. نوان پيڪيجز باقاعده وقفن تي ٺاهيا ويا آهن. هر وقت جي وقفي جي شروعات ۾، هڪ نئون پيڪٽ ٺاهيو ويندو آهي، ۽ انهي وقفي دوران حاصل ڪيل ڊيٽا کي پيڪٽ ۾ شامل ڪيو ويندو آهي. وقفي جي آخر ۾، پيٽ جي واڌ کي روڪي ٿو. وقفي جي ماپ جو اندازو لڳايو ويندو آهي هڪ پيٽرول ذريعي جنهن کي بيچ وقفو سڏيو ويندو آهي.
  • Apache Spark SQL - اسپارڪ فنڪشنل پروگرامنگ سان لاڳاپيل پروسيسنگ کي گڏ ڪري ٿو. ٺهيل ڊيٽا جو مطلب آهي ڊيٽا جنهن ۾ هڪ اسڪيما آهي، اهو آهي، سڀني رڪارڊ لاء فيلڊ جو هڪ واحد سيٽ. Spark SQL مختلف قسم جي منظم ڊيٽا ذريعن مان ان پٽ کي سپورٽ ڪري ٿو ۽، اسڪيما جي معلومات جي دستيابي جي مهرباني، اهو صرف رڪارڊ جي گهربل شعبن کي موثر طريقي سان ٻيهر حاصل ڪري سگهي ٿو، ۽ ڊيٽا فريم APIs پڻ مهيا ڪري ٿو.
  • AWS RDS هڪ نسبتا سستو ڪلائوڊ تي ٻڌل لاڳاپو ڊيٽابيس، ويب سروس آهي جيڪا سيٽ اپ، آپريشن ۽ اسڪيلنگ کي آسان بڻائي ٿي، ۽ سڌو سنئون Amazon پاران ترتيب ڏنل آهي.

ڪافڪا سرور کي انسٽال ڪرڻ ۽ هلائڻ

ڪافڪا کي سڌو استعمال ڪرڻ کان پهريان، توهان کي پڪ ڪرڻ جي ضرورت آهي ته توهان وٽ جاوا آهي، ڇاڪاڻ ته ... JVM ڪم لاء استعمال ڪيو ويندو آهي:

sudo apt-get update 
sudo apt-get install default-jre
java -version

اچو ته ڪافڪا سان ڪم ڪرڻ لاءِ نئون يوزر ٺاهيون:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

اڳيون، تقسيم ڊائون لوڊ ڪريو سرڪاري Apache Kafka ويب سائيٽ تان:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

ڊائون لوڊ ٿيل آرڪائيو کي کوليو:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

ايندڙ قدم اختياري آهي. حقيقت اها آهي ته ڊفالٽ سيٽنگون توهان کي Apache Kafka جي سڀني صلاحيتن کي مڪمل طور تي استعمال ڪرڻ جي اجازت نه ڏيندا آهن. مثال طور، حذف ڪريو ھڪڙو موضوع، ڪيٽيگري، گروپ جنھن ۾ پيغام شايع ڪري سگھجن ٿا. ھن کي تبديل ڪرڻ لاء، اچو ته ترتيب واري فائل کي تبديل ڪريو:

vim ~/kafka/config/server.properties

فائل جي آخر ۾ ھيٺيون شامل ڪريو:

delete.topic.enable = true

ڪافڪا سرور شروع ڪرڻ کان پهريان، توهان کي ZooKeeper سرور شروع ڪرڻ جي ضرورت آهي؛ اسان مددگار اسڪرپٽ استعمال ڪنداسين جيڪو ڪافڪا جي تقسيم سان اچي ٿو:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper ڪاميابيءَ سان شروع ٿيڻ کان پوءِ، ڪافڪا سرور کي الڳ ٽرمينل ۾ لانچ ڪريو:

bin/kafka-server-start.sh config/server.properties

اچو ته ٽرانزيڪشن نالي هڪ نئون موضوع ٺاهيون:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

اچو ته پڪ ڪريون ته هڪ موضوع ٺاهيو ويو آهي گهربل تعداد جي ورهاڱي ۽ نقل سان:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

اچو ته نئين ٺاهيل موضوع لاء پروڊڪٽر ۽ صارف جي جانچ ڪرڻ جي لمحن کي وڃايو. وڌيڪ تفصيل بابت توهان ڪيئن ٽيسٽ ڪري سگهو ٿا پيغام موڪلڻ ۽ وصول ڪرڻ جي سرڪاري دستاويزن ۾ لکيل آهن - ڪجھ پيغام موڪليو. خير، اسان KafkaProducer API استعمال ڪندي Python ۾ هڪ پروڊيوسر لکڻ تي اڳتي وڌون ٿا.

ٺاهيندڙ لکڻ

ٺاهيندڙ بي ترتيب ڊيٽا ٺاهيندو - 100 پيغام هر سيڪنڊ. بي ترتيب ڊيٽا مان اسان جو مطلب آهي هڪ لغت جنهن ۾ ٽن شعبن تي مشتمل آهي:

  • شاخ - ڪريڊٽ اداري جي وڪري جو نالو؛
  • ڪرنسي - ٽرانزيڪشن ڪرنسي؛
  • رقم - ٽرانزيڪشن جي رقم. رقم هڪ مثبت نمبر هوندو جيڪڏهن اها بئنڪ طرفان ڪرنسي جي خريداري آهي، ۽ هڪ منفي نمبر جيڪڏهن اها وڪرو آهي.

پروسيسر لاء ڪوڊ هن طرح ڏسڻ ۾ اچي ٿو:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

اڳيون، موڪلڻ جو طريقو استعمال ڪندي، اسان سرور ڏانهن پيغام موڪليندا آهيون، جنهن موضوع تي اسان کي ضرورت آهي، JSON فارميٽ ۾:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

جڏهن اسڪرپٽ هلائيندا آهيون، اسان ٽرمينل ۾ هيٺيون پيغام وصول ڪندا آهيون:

اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

ان جو مطلب اهو آهي ته هر شي ڪم ڪري ٿي جيئن اسان چاهيون ٿا - پروڊيوسر پيدا ڪري ٿو ۽ پيغام موڪلي ٿو انهي موضوع تي جيڪو اسان کي گهربل آهي.
ايندڙ قدم اسپارڪ کي انسٽال ڪرڻ ۽ هن پيغام واري وهڪرو کي پروسيس ڪرڻ آهي.

Apache Spark انسٽال ڪرڻ

Apache Apache هڪ آفاقي ۽ اعليٰ ڪارڪردگي وارو ڪلسٽر ڪمپيوٽنگ پليٽ فارم آهي.

اسپارڪ MapReduce ماڊل جي مقبول عملن کان بھتر ڪم ڪري ٿو، جڏھن ته حسابن جي قسمن جي وسيع رينج کي سپورٽ ڪري ٿو، بشمول انٽرايڪٽو سوالن ۽ اسٽريم پروسيسنگ. اسپيڊ هڪ اهم ڪردار ادا ڪري ٿي جڏهن ڊيٽا جي وڏي مقدار کي پروسيس ڪري ٿي، ڇو ته اها رفتار آهي جيڪا توهان کي ڪم ڪرڻ جي اجازت ڏئي ٿي بغير ڪنهن منٽ يا ڪلاڪ جي انتظار جي. اسپارڪ جي سڀ کان وڏي طاقت جيڪا ان کي تمام تيز بڻائي ٿي ان جي ميموري حساب ڪتاب ڪرڻ جي صلاحيت آهي.

هي فريم ورڪ اسڪالا ۾ لکيل آهي، تنهنڪري توهان کي پهريان ان کي انسٽال ڪرڻو پوندو:

sudo apt-get install scala

ڊائون لوڊ ڪريو اسپارڪ تقسيم سرڪاري ويب سائيٽ تان:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

آرڪائيو کي کوليو:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

بش فائل ۾ اسپارڪ جو رستو شامل ڪريو:

vim ~/.bashrc

ايڊيٽر ذريعي ھيٺيون لائينون شامل ڪريو:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrc ۾ تبديليون ڪرڻ کان پوءِ ھيٺ ڏنل حڪم ھلايو:

source ~/.bashrc

AWS PostgreSQL کي ترتيب ڏيڻ

باقي اهو آهي ته ڊيٽابيس کي ترتيب ڏيو جنهن ۾ اسين پروسيس ٿيل معلومات کي اسٽريم مان اپ لوڊ ڪنداسين. ان لاءِ اسان استعمال ڪنداسين AWS RDS سروس.

AWS ڪنسول ڏانھن وڃو -> AWS RDS -> ڊيٽابيس -> ڊيٽابيس ٺاھيو:
اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

منتخب ڪريو PostgreSQL ۽ ڪلڪ ڪريو اڳيون:
اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

ڇاڪاڻ ته هي مثال صرف تعليمي مقصدن لاءِ آهي؛ اسان استعمال ڪنداسين مفت سرور ”گهٽ ۾ گهٽ“ (مفت درجو):
اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

اڳيون، اسان مفت ٽائر بلاڪ ۾ هڪ ٽڪ لڳايو، ۽ ان کان پوء اسان کي خودڪار طور تي t2.micro ڪلاس جو هڪ مثال پيش ڪيو ويندو - جيتوڻيڪ ڪمزور، اهو مفت آهي ۽ اسان جي ڪم لاء بلڪل مناسب آهي:
اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

اڳيان اچي ٿو تمام ضروري شيون: ڊيٽابيس جو نالو مثال طور، ماسٽر استعمال ڪندڙ جو نالو ۽ سندس پاسورڊ. اچو ته مثال جو نالو ڏيو: myHabrTest، ماسٽر استعمال ڪندڙ: هبر، پاسورڊ: habr12345 ۽ ايندڙ بٽڻ تي ڪلڪ ڪريو:
اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

ايندڙ صفحي تي اسان جي ڊيٽابيس سرور جي ٻاهران (عوامي رسائي) ۽ بندرگاهن جي دستيابي لاءِ ذميوار آهن:

اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

اچو ته VPC سيڪيورٽي گروپ لاءِ هڪ نئين سيٽنگ ٺاهي، جيڪا اسان جي ڊيٽابيس سرور تائين بندرگاهه 5432 (PostgreSQL) ذريعي ٻاهرين رسائي جي اجازت ڏئي ٿي.
اچو ته AWS ڪنسول ڏانهن هڪ الڳ برائوزر ونڊو ۾ VPC ڊيش بورڊ ڏانهن وڃو -> سيڪيورٽي گروپ -> سيڪيورٽي گروپ سيڪشن ٺاهيو:
اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

اسان سيڪيورٽي گروپ جو نالو مقرر ڪيو - PostgreSQL، هڪ وضاحت، ظاهر ڪيو ته ڪهڙي VPC هن گروپ سان لاڳاپيل هجڻ گهرجي ۽ ڪلڪ ڪريو ٺاهيو بٽڻ:
اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

نئين ٺاهيل گروپ لاءِ پورٽ 5432 لاءِ اندريون ضابطا ڀريو، جيئن هيٺ ڏنل تصوير ۾ ڏيکاريل آهي. توهان دستي طور تي بندرگاهن جي وضاحت نٿا ڪري سگهو، پر قسم جي ڊراپ-ڊائون لسٽ مان PostgreSQL چونڊيو.

سختي سان ڳالهائڻ، قدر ::/0 جو مطلب آهي سرور ڏانهن ايندڙ ٽرئفڪ جي دستيابي سڄي دنيا مان، جيڪو صحيح طور تي مڪمل طور تي درست ناهي، پر مثال جي تجزيو ڪرڻ لاء، اچو ته پاڻ کي هن طريقي سان استعمال ڪرڻ جي اجازت ڏيو:
اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

اسان برائوزر جي صفحي ڏانھن واپس وڃون ٿا، جتي اسان وٽ آھي "ڳوڙھي سيٽنگون ترتيب ڏيو" کوليو ۽ چونڊيو VPC سيڪيورٽي گروپ سيڪشن ۾ -> موجوده VPC سيڪيورٽي گروپ چونڊيو -> PostgreSQL:
اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

اڳيون، ڊيٽابيس جي اختيارن ۾ -> ڊيٽابيس جو نالو -> نالو مقرر ڪريو - habrDB.

اسان ڇڏي سگھون ٿا باقي پيرا ميٽرز، سواءِ بيڪ اپ کي غير فعال ڪرڻ (بيڪ اپ برقرار رکڻ جي مدت - 0 ڏينهن)، مانيٽرنگ ۽ پرفارمنس بصيرت، ڊفالٽ طور. بٽڻ تي ڪلڪ ڪريو ڊيٽابيس ٺاهيو:
اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

ڌاڳو سنڀاليندڙ

آخري مرحلي ۾ اسپارڪ جاب جي ترقي ٿيندي، جيڪا هر ٻن سيڪنڊن ۾ ڪافڪا کان ايندڙ نئين ڊيٽا کي پروسيس ڪندي ۽ نتيجن کي ڊيٽابيس ۾ داخل ڪندي.

جيئن مٿي ذڪر ڪيو ويو آهي، چيڪ پوائنٽس اسپارڪ اسٽريمنگ ۾ هڪ بنيادي ميکانيزم آهن جيڪي غلطي رواداري کي يقيني بڻائڻ لاءِ ترتيب ڏيڻ گهرجن. اسان چيڪ پوائنٽس استعمال ڪنداسين ۽، جيڪڏهن طريقيڪار ناڪام ٿئي ٿو، اسپارڪ اسٽريمنگ ماڊل کي صرف آخري چيڪ پوائنٽ تي واپس وڃڻ جي ضرورت پوندي ۽ گم ٿيل ڊيٽا کي بحال ڪرڻ لاءِ ان کان حساب ڪتاب ٻيهر شروع ڪرڻو پوندو.

چيڪ پوائنٽنگ کي ڊاريڪٽري ترتيب ڏيڻ سان چالو ڪري سگھجي ٿو غلطي برداشت ڪندڙ، قابل اعتماد فائل سسٽم (جهڙوڪ HDFS، S3، وغيره) جنهن ۾ چيڪ پوائنٽ جي معلومات محفوظ ڪئي ويندي. اهو استعمال ڪيو ويندو آهي، مثال طور:

streamingContext.checkpoint(checkpointDirectory)

اسان جي مثال ۾، اسان هيٺين طريقي کي استعمال ڪنداسين، يعني، جيڪڏهن checkpointDirectory موجود آهي، ته پوءِ حوالو چيڪ پوائنٽ ڊيٽا مان ٻيهر ٺاهيو ويندو. جيڪڏهن ڊاريڪٽري موجود نه آهي (يعني پهريون ڀيرو عمل ڪيو ويو)، پوء فنڪشن ToCreateContext کي سڏيو ويندو آهي هڪ نئون حوالو ٺاهڻ ۽ ڊي اسٽريمز کي ترتيب ڏيڻ لاءِ:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

اسان KafkaUtils لائبريري جي createDirectStream طريقي کي استعمال ڪندي "ٽرانزيڪشن" موضوع سان ڳنڍڻ لاءِ هڪ DirectStream اعتراض ٺاهيندا آهيون:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

JSON فارميٽ ۾ ايندڙ ڊيٽا کي پارس ڪرڻ:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL استعمال ڪندي، اسان هڪ سادي گروهه ڪندا آهيون ۽ نتيجو ڪنسول ۾ ڏيکاري ٿو:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

سوال جو متن حاصل ڪرڻ ۽ ان کي اسپارڪ SQL ذريعي هلائڻ:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

۽ پوءِ اسان نتيجو مجموعي ڊيٽا کي AWS RDS ۾ ٽيبل ۾ محفوظ ڪريون ٿا. مجموعي نتيجن کي ڊيٽابيس جي ٽيبل تي محفوظ ڪرڻ لاء، اسان ڊيٽا فريم اعتراض جي لکڻ جو طريقو استعمال ڪنداسين:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS سان ڪنيڪشن قائم ڪرڻ بابت ڪجھ لفظ. اسان "AWS PostgreSQL کي ترتيب ڏيڻ" قدم تي ان لاءِ يوزر ۽ پاسورڊ ٺاهيو. توھان کي استعمال ڪرڻ گھرجي Endpoint کي ڊيٽابيس سرور url طور، جيڪو ڏيکاريل آھي ڪنيڪشن ۽ سيڪيورٽي سيڪشن ۾:

اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

اسپارڪ ۽ ڪافڪا کي صحيح طريقي سان ڳنڍڻ لاءِ، توهان کي ڪم هلائڻ گهرجي smark-submit ذريعي artifact استعمال ڪندي اسپارڪ-اسٽريمنگ-ڪافڪا-0-8_2.11. اضافي طور تي، اسان پوسٽ گري ايس ايس ايل ڊيٽابيس سان رابطي لاءِ هڪ نمونو پڻ استعمال ڪنداسين؛ اسان انهن کي --packages ذريعي منتقل ڪنداسين.

اسڪرپٽ جي لچڪداريءَ لاءِ، اسان ان پٽ پيراميٽر جي طور تي پيغام جي سرور جو نالو ۽ موضوع جنهن مان ڊيٽا حاصل ڪرڻ چاهيون ٿا شامل ڪنداسين.

تنهن ڪري، اهو وقت شروع ڪرڻ ۽ سسٽم جي ڪارڪردگي کي جانچڻ جو وقت آهي:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

سڀ ڪجھ ڪم ڪيو! جيئن توهان هيٺ ڏنل تصوير ۾ ڏسي سگهو ٿا، جڏهن ته ايپليڪيشن هلندي آهي، نئين مجموعي جا نتيجا هر 2 سيڪنڊن ۾ نڪرندا آهن، ڇاڪاڻ ته اسان بيچنگ وقفي کي 2 سيڪنڊن تي سيٽ ڪيو جڏهن اسان StreamingContext اعتراض ٺاهيو:

اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

اڳيون، اسان ڊيٽابيس ۾ هڪ سادي سوال ٺاهيو ته ٽيبل ۾ رڪارڊ جي موجودگي کي جانچڻ لاء ٽرانزيڪشن_flow:

اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ

ٿڪل

هي آرٽيڪل اسپارڪ اسٽريمنگ استعمال ڪندي معلومات جي اسٽريمنگ پروسيسنگ جو هڪ مثال ڏٺو Apache Kafka ۽ PostgreSQL سان گڏ. مختلف ذريعن کان ڊيٽا جي واڌ سان، اسپارڪ اسٽريمنگ جي عملي قدر کي وڌائڻ ڏکيو آهي اسٽريمنگ ۽ ريئل ٽائيم ايپليڪيشن ٺاهڻ لاءِ.

توھان ڳولي سگھوٿا مڪمل سورس ڪوڊ منھنجي مخزن ۾ GitHub.

مان هن مضمون تي بحث ڪندي خوش آهيان، مان توهان جي تبصرن جو انتظار ڪريان ٿو، ۽ مون کي پڻ اميد آهي ته سڀني پڙهندڙن کان تعميري تنقيد جي.

مان توهان کي ڪاميابي چاهيان ٿو!

پي ايس. شروعات ۾ ان کي مقامي PostgreSQL ڊيٽابيس استعمال ڪرڻ جي رٿابندي ڪئي وئي، پر AWS لاءِ منهنجي محبت ڏني، مون ڊيٽابيس کي ڪلائوڊ ڏانهن منتقل ڪرڻ جو فيصلو ڪيو. هن موضوع تي ايندڙ مضمون ۾، مان ڏيکاريندس ته AWS ۾ مٿي بيان ڪيل سموري سسٽم کي ڪيئن لاڳو ڪجي AWS Kinesis ۽ AWS EMR استعمال ڪندي. خبرن جي تابعداري ڪريو!

جو ذريعو: www.habr.com

تبصرو شامل ڪريو