اسپارڪ اسٽريمنگ سان اپاچي ڪافڪا ۽ اسٽريمنگ ڊيٽا پروسيسنگ
هيلو، حبر! اڄ اسان هڪ سسٽم ٺاهينداسين جيڪو اسپارڪ اسٽريمنگ استعمال ڪندي Apache Kafka ميسيج اسٽريمز تي عمل ڪندو ۽ پروسيسنگ جا نتيجا AWS RDS ڪلائوڊ ڊيٽابيس ۾ لکندو.
اچو ته تصور ڪريون ته هڪ خاص ڪريڊٽ ادارو اسان کي ان جي سڀني شاخن ۾ ايندڙ ٽرانزيڪشن کي پروسيسنگ ڪرڻ جو ڪم مقرر ڪري ٿو. اهو ٿي سگهي ٿو فوري طور تي حساب ڪرڻ جي مقصد لاءِ هڪ کليل ڪرنسي جي پوزيشن لاءِ خزاني، حدن يا مالي نتيجن لاءِ ٽرانزيڪشن وغيره.
جادو ۽ جادو منتر جي استعمال کان سواء هن ڪيس کي ڪيئن لاڳو ڪرڻ - ڪٽ جي هيٺان پڙهو! وڃ!
يقينن، حقيقي وقت ۾ ڊيٽا جي وڏي مقدار کي پروسيسنگ جديد سسٽم ۾ استعمال لاء ڪافي موقعا فراهم ڪري ٿي. ان لاءِ سڀ کان وڌيڪ مشهور مجموعن مان هڪ آهي اپاچي ڪافڪا ۽ اسپارڪ اسٽريمنگ جو ٽينڊم، جتي ڪافڪا ايندڙ پيغامن جي پيڪٽن جو هڪ وهڪرو ٺاهي ٿو، ۽ اسپارڪ اسٽريمنگ انهن پيڪن کي هڪ مقرر وقت جي وقفي تي پروسيس ڪري ٿو.
ايپليڪيشن جي غلطي رواداري کي وڌائڻ لاء، اسان چيڪ پوسٽون استعمال ڪنداسين. هن ميکانيزم سان، جڏهن اسپارڪ اسٽريمنگ انجڻ کي وڃايل ڊيٽا بحال ڪرڻ جي ضرورت آهي، ان کي صرف آخري چيڪ پوائنٽ تي واپس وڃڻو پوندو ۽ اتان کان حساب ڪتاب ٻيهر شروع ڪرڻو پوندو.
ترقي يافته نظام جو فن تعمير
استعمال ٿيل اجزاء:
ايپيڪي ڪيفيڪا هڪ تقسيم پبلش-سبسڪرائب ميسيجنگ سسٽم آهي. ٻنهي آف لائن ۽ آن لائن پيغام واپرائڻ لاء مناسب. ڊيٽا جي نقصان کي روڪڻ لاء، ڪافڪا پيغام ڊسڪ تي ذخيرو ٿيل آهن ۽ ڪلستر جي اندر نقل ٿيل آهن. ڪافڪا سسٽم ZooKeeper synchronization service جي چوٽي تي ٺهيل آهي؛
Apache Spark اسٽريمنگ - اسپارڪ جزو پروسيسنگ اسٽريمنگ ڊيٽا لاءِ. اسپارڪ اسٽريمنگ ماڊل هڪ مائڪرو بيچ آرڪيٽيڪچر استعمال ڪندي ٺاهيو ويو آهي، جتي ڊيٽا اسٽريم کي ننڍڙن ڊيٽا پيڪٽس جي لڳاتار تسلسل طور تشريح ڪئي ويندي آهي. اسپارڪ اسٽريمنگ مختلف ذريعن کان ڊيٽا وٺي ٿي ۽ ان کي ننڍڙن پيڪيجز ۾ گڏ ڪري ٿي. نوان پيڪيجز باقاعده وقفن تي ٺاهيا ويا آهن. هر وقت جي وقفي جي شروعات ۾، هڪ نئون پيڪٽ ٺاهيو ويندو آهي، ۽ انهي وقفي دوران حاصل ڪيل ڊيٽا کي پيڪٽ ۾ شامل ڪيو ويندو آهي. وقفي جي آخر ۾، پيٽ جي واڌ کي روڪي ٿو. وقفي جي ماپ جو اندازو لڳايو ويندو آهي هڪ پيٽرول ذريعي جنهن کي بيچ وقفو سڏيو ويندو آهي.
Apache Spark SQL - اسپارڪ فنڪشنل پروگرامنگ سان لاڳاپيل پروسيسنگ کي گڏ ڪري ٿو. ٺهيل ڊيٽا جو مطلب آهي ڊيٽا جنهن ۾ هڪ اسڪيما آهي، اهو آهي، سڀني رڪارڊ لاء فيلڊ جو هڪ واحد سيٽ. Spark SQL مختلف قسم جي منظم ڊيٽا ذريعن مان ان پٽ کي سپورٽ ڪري ٿو ۽، اسڪيما جي معلومات جي دستيابي جي مهرباني، اهو صرف رڪارڊ جي گهربل شعبن کي موثر طريقي سان ٻيهر حاصل ڪري سگهي ٿو، ۽ ڊيٽا فريم APIs پڻ مهيا ڪري ٿو.
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
ايندڙ قدم اختياري آهي. حقيقت اها آهي ته ڊفالٽ سيٽنگون توهان کي Apache Kafka جي سڀني صلاحيتن کي مڪمل طور تي استعمال ڪرڻ جي اجازت نه ڏيندا آهن. مثال طور، حذف ڪريو ھڪڙو موضوع، ڪيٽيگري، گروپ جنھن ۾ پيغام شايع ڪري سگھجن ٿا. ھن کي تبديل ڪرڻ لاء، اچو ته ترتيب واري فائل کي تبديل ڪريو:
vim ~/kafka/config/server.properties
فائل جي آخر ۾ ھيٺيون شامل ڪريو:
delete.topic.enable = true
ڪافڪا سرور شروع ڪرڻ کان پهريان، توهان کي ZooKeeper سرور شروع ڪرڻ جي ضرورت آهي؛ اسان مددگار اسڪرپٽ استعمال ڪنداسين جيڪو ڪافڪا جي تقسيم سان اچي ٿو:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper ڪاميابيءَ سان شروع ٿيڻ کان پوءِ، ڪافڪا سرور کي الڳ ٽرمينل ۾ لانچ ڪريو:
اڳيون، موڪلڻ جو طريقو استعمال ڪندي، اسان سرور ڏانهن پيغام موڪليندا آهيون، جنهن موضوع تي اسان کي ضرورت آهي، JSON فارميٽ ۾:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
اسپارڪ MapReduce ماڊل جي مقبول عملن کان بھتر ڪم ڪري ٿو، جڏھن ته حسابن جي قسمن جي وسيع رينج کي سپورٽ ڪري ٿو، بشمول انٽرايڪٽو سوالن ۽ اسٽريم پروسيسنگ. اسپيڊ هڪ اهم ڪردار ادا ڪري ٿي جڏهن ڊيٽا جي وڏي مقدار کي پروسيس ڪري ٿي، ڇو ته اها رفتار آهي جيڪا توهان کي ڪم ڪرڻ جي اجازت ڏئي ٿي بغير ڪنهن منٽ يا ڪلاڪ جي انتظار جي. اسپارڪ جي سڀ کان وڏي طاقت جيڪا ان کي تمام تيز بڻائي ٿي ان جي ميموري حساب ڪتاب ڪرڻ جي صلاحيت آهي.
هي فريم ورڪ اسڪالا ۾ لکيل آهي، تنهنڪري توهان کي پهريان ان کي انسٽال ڪرڻو پوندو:
ڇاڪاڻ ته هي مثال صرف تعليمي مقصدن لاءِ آهي؛ اسان استعمال ڪنداسين مفت سرور ”گهٽ ۾ گهٽ“ (مفت درجو):
اڳيون، اسان مفت ٽائر بلاڪ ۾ هڪ ٽڪ لڳايو، ۽ ان کان پوء اسان کي خودڪار طور تي t2.micro ڪلاس جو هڪ مثال پيش ڪيو ويندو - جيتوڻيڪ ڪمزور، اهو مفت آهي ۽ اسان جي ڪم لاء بلڪل مناسب آهي:
اڳيان اچي ٿو تمام ضروري شيون: ڊيٽابيس جو نالو مثال طور، ماسٽر استعمال ڪندڙ جو نالو ۽ سندس پاسورڊ. اچو ته مثال جو نالو ڏيو: myHabrTest، ماسٽر استعمال ڪندڙ: هبر، پاسورڊ: habr12345 ۽ ايندڙ بٽڻ تي ڪلڪ ڪريو:
ايندڙ صفحي تي اسان جي ڊيٽابيس سرور جي ٻاهران (عوامي رسائي) ۽ بندرگاهن جي دستيابي لاءِ ذميوار آهن:
نئين ٺاهيل گروپ لاءِ پورٽ 5432 لاءِ اندريون ضابطا ڀريو، جيئن هيٺ ڏنل تصوير ۾ ڏيکاريل آهي. توهان دستي طور تي بندرگاهن جي وضاحت نٿا ڪري سگهو، پر قسم جي ڊراپ-ڊائون لسٽ مان PostgreSQL چونڊيو.
سختي سان ڳالهائڻ، قدر ::/0 جو مطلب آهي سرور ڏانهن ايندڙ ٽرئفڪ جي دستيابي سڄي دنيا مان، جيڪو صحيح طور تي مڪمل طور تي درست ناهي، پر مثال جي تجزيو ڪرڻ لاء، اچو ته پاڻ کي هن طريقي سان استعمال ڪرڻ جي اجازت ڏيو:
اڳيون، ڊيٽابيس جي اختيارن ۾ -> ڊيٽابيس جو نالو -> نالو مقرر ڪريو - habrDB.
اسان ڇڏي سگھون ٿا باقي پيرا ميٽرز، سواءِ بيڪ اپ کي غير فعال ڪرڻ (بيڪ اپ برقرار رکڻ جي مدت - 0 ڏينهن)، مانيٽرنگ ۽ پرفارمنس بصيرت، ڊفالٽ طور. بٽڻ تي ڪلڪ ڪريو ڊيٽابيس ٺاهيو:
ڌاڳو سنڀاليندڙ
آخري مرحلي ۾ اسپارڪ جاب جي ترقي ٿيندي، جيڪا هر ٻن سيڪنڊن ۾ ڪافڪا کان ايندڙ نئين ڊيٽا کي پروسيس ڪندي ۽ نتيجن کي ڊيٽابيس ۾ داخل ڪندي.
جيئن مٿي ذڪر ڪيو ويو آهي، چيڪ پوائنٽس اسپارڪ اسٽريمنگ ۾ هڪ بنيادي ميکانيزم آهن جيڪي غلطي رواداري کي يقيني بڻائڻ لاءِ ترتيب ڏيڻ گهرجن. اسان چيڪ پوائنٽس استعمال ڪنداسين ۽، جيڪڏهن طريقيڪار ناڪام ٿئي ٿو، اسپارڪ اسٽريمنگ ماڊل کي صرف آخري چيڪ پوائنٽ تي واپس وڃڻ جي ضرورت پوندي ۽ گم ٿيل ڊيٽا کي بحال ڪرڻ لاءِ ان کان حساب ڪتاب ٻيهر شروع ڪرڻو پوندو.
چيڪ پوائنٽنگ کي ڊاريڪٽري ترتيب ڏيڻ سان چالو ڪري سگھجي ٿو غلطي برداشت ڪندڙ، قابل اعتماد فائل سسٽم (جهڙوڪ HDFS، S3، وغيره) جنهن ۾ چيڪ پوائنٽ جي معلومات محفوظ ڪئي ويندي. اهو استعمال ڪيو ويندو آهي، مثال طور:
streamingContext.checkpoint(checkpointDirectory)
اسان جي مثال ۾، اسان هيٺين طريقي کي استعمال ڪنداسين، يعني، جيڪڏهن checkpointDirectory موجود آهي، ته پوءِ حوالو چيڪ پوائنٽ ڊيٽا مان ٻيهر ٺاهيو ويندو. جيڪڏهن ڊاريڪٽري موجود نه آهي (يعني پهريون ڀيرو عمل ڪيو ويو)، پوء فنڪشن ToCreateContext کي سڏيو ويندو آهي هڪ نئون حوالو ٺاهڻ ۽ ڊي اسٽريمز کي ترتيب ڏيڻ لاءِ:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
اسان KafkaUtils لائبريري جي createDirectStream طريقي کي استعمال ڪندي "ٽرانزيڪشن" موضوع سان ڳنڍڻ لاءِ هڪ DirectStream اعتراض ٺاهيندا آهيون:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
سوال جو متن حاصل ڪرڻ ۽ ان کي اسپارڪ SQL ذريعي هلائڻ:
۽ پوءِ اسان نتيجو مجموعي ڊيٽا کي AWS RDS ۾ ٽيبل ۾ محفوظ ڪريون ٿا. مجموعي نتيجن کي ڊيٽابيس جي ٽيبل تي محفوظ ڪرڻ لاء، اسان ڊيٽا فريم اعتراض جي لکڻ جو طريقو استعمال ڪنداسين:
AWS RDS سان ڪنيڪشن قائم ڪرڻ بابت ڪجھ لفظ. اسان "AWS PostgreSQL کي ترتيب ڏيڻ" قدم تي ان لاءِ يوزر ۽ پاسورڊ ٺاهيو. توھان کي استعمال ڪرڻ گھرجي Endpoint کي ڊيٽابيس سرور url طور، جيڪو ڏيکاريل آھي ڪنيڪشن ۽ سيڪيورٽي سيڪشن ۾:
اسپارڪ ۽ ڪافڪا کي صحيح طريقي سان ڳنڍڻ لاءِ، توهان کي ڪم هلائڻ گهرجي smark-submit ذريعي artifact استعمال ڪندي اسپارڪ-اسٽريمنگ-ڪافڪا-0-8_2.11. اضافي طور تي، اسان پوسٽ گري ايس ايس ايل ڊيٽابيس سان رابطي لاءِ هڪ نمونو پڻ استعمال ڪنداسين؛ اسان انهن کي --packages ذريعي منتقل ڪنداسين.
اسڪرپٽ جي لچڪداريءَ لاءِ، اسان ان پٽ پيراميٽر جي طور تي پيغام جي سرور جو نالو ۽ موضوع جنهن مان ڊيٽا حاصل ڪرڻ چاهيون ٿا شامل ڪنداسين.
تنهن ڪري، اهو وقت شروع ڪرڻ ۽ سسٽم جي ڪارڪردگي کي جانچڻ جو وقت آهي:
سڀ ڪجھ ڪم ڪيو! جيئن توهان هيٺ ڏنل تصوير ۾ ڏسي سگهو ٿا، جڏهن ته ايپليڪيشن هلندي آهي، نئين مجموعي جا نتيجا هر 2 سيڪنڊن ۾ نڪرندا آهن، ڇاڪاڻ ته اسان بيچنگ وقفي کي 2 سيڪنڊن تي سيٽ ڪيو جڏهن اسان StreamingContext اعتراض ٺاهيو:
اڳيون، اسان ڊيٽابيس ۾ هڪ سادي سوال ٺاهيو ته ٽيبل ۾ رڪارڊ جي موجودگي کي جانچڻ لاء ٽرانزيڪشن_flow:
ٿڪل
هي آرٽيڪل اسپارڪ اسٽريمنگ استعمال ڪندي معلومات جي اسٽريمنگ پروسيسنگ جو هڪ مثال ڏٺو Apache Kafka ۽ PostgreSQL سان گڏ. مختلف ذريعن کان ڊيٽا جي واڌ سان، اسپارڪ اسٽريمنگ جي عملي قدر کي وڌائڻ ڏکيو آهي اسٽريمنگ ۽ ريئل ٽائيم ايپليڪيشن ٺاهڻ لاءِ.