ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز" سلام، خابرو شهرين! هي ڪتاب ڪنهن به ڊولپر لاءِ موزون آهي جيڪو ٿريڊ پروسيسنگ کي سمجهڻ چاهي ٿو. ورهايل پروگرامنگ کي سمجهڻ توهان کي ڪافڪا ۽ ڪافڪا اسٽريمز کي بهتر سمجهڻ ۾ مدد ڏيندو. اهو سٺو لڳندو ته ڪافڪا فريم ورڪ پاڻ کي ڄاڻڻ، پر اهو ضروري ناهي: مان توهان کي سڀ ڪجهه ٻڌائيندس جيڪو توهان جي ضرورت آهي. تجربيڪار ڪافڪا ڊولپرز ۽ نوان نوان هڪجهڙا سکندا ته ڪيئن دلچسپ اسٽريم پروسيسنگ ايپليڪيشنون ٺاهيون هن ڪتاب ۾ ڪافڪا اسٽريمز لائبريري استعمال ڪندي. وچولي ۽ ترقي يافته جاوا ڊولپرز اڳ ۾ ئي سيريلائيزيشن جي تصورن کان واقف آهن ڪافڪا اسٽريمز ايپليڪيشن ٺاهڻ لاءِ پنهنجون صلاحيتون لاڳو ڪرڻ سکندا. ڪتاب جو سورس ڪوڊ جاوا 8 ۾ لکيو ويو آهي ۽ جاوا 8 ليمبڊا ايڪسپريس نحو جو اهم استعمال ڪري ٿو، تنهن ڪري اهو ڄاڻڻ ته ڪيئن ڪم ڪجي لامبڊا افعال سان (جيتوڻيڪ ڪنهن ٻئي پروگرامنگ ٻولي ۾) ڪم ۾ ايندو.

اقتباس. 5.3. مجموعو ۽ ونڊو آپريشن

هن حصي ۾، اسان اڳتي وڌنداسين ڪافڪا اسٽريمز جي سڀ کان وڌيڪ ترقي يافته حصن کي ڳولڻ لاء. هن وقت تائين اسان ڪافڪا اسٽريمز جي هيٺين پهلوئن کي ڍڪي ڇڏيو آهي:

  • پروسيسنگ ٽوپولوجي ٺاهڻ؛
  • اسٽريمنگ ايپليڪيشنن ۾ رياست استعمال ڪندي؛
  • ڪارڪردگي ڊيٽا وهڪرو ڪنيڪشن؛
  • ايونٽ اسٽريمز (KStream) ۽ اپڊيٽ اسٽريمز (KTable) جي وچ ۾ فرق.

هيٺين مثالن ۾ اسين انهن سڀني عنصرن کي گڏ ڪنداسين. توھان ونڊونگ جي باري ۾ پڻ سکندا، اسٽريمنگ ايپليڪيشنن جي ٻي وڏي خصوصيت. اسان جو پهريون مثال هڪ سادي مجموعو هوندو.

5.3.1. انڊسٽري شعبي طرفان اسٽاڪ سيلز جو مجموعو

گڏ ڪرڻ ۽ گروپنگ اهم اوزار آهن جڏهن اسٽريمنگ ڊيٽا سان ڪم ڪري رهيا آهن. انفرادي رڪارڊ جو امتحان جيئن ته اهي وصول ڪيا ويا آهن اڪثر ڪافي نه آهن. ڊيٽا مان اضافي معلومات ڪڍڻ لاء، اهو ضروري آهي ته انهن کي گروپ ۽ گڏ ڪرڻ.

هن مثال ۾، توهان هڪ ڏينهن جي واپار جي لباس تي رکون ٿا جيڪو ڪيترن ئي صنعتن ۾ ڪمپنين جي اسٽاڪ جي سيلز جي مقدار کي ٽريڪ ڪرڻ جي ضرورت آهي. خاص طور تي، توهان پنجن ڪمپنين ۾ دلچسپي وٺندا آهيو هر صنعت ۾ سڀ کان وڏي شيئر سيلز سان.

اهڙي مجموعي کي گهربل شڪل ۾ ڊيٽا کي ترجمو ڪرڻ لاءِ هيٺين ڪيترن ئي قدمن جي ضرورت پوندي (عام اصطلاحن ۾ ڳالهائڻ).

  1. هڪ موضوع تي ٻڌل ذريعو ٺاهيو جيڪو خام اسٽاڪ واپاري معلومات شايع ڪري ٿو. اسان کي ھڪڙي شئي جو نقشو ڏيڻو پوندو اسٽاڪ ٽرانزيڪشن جي ھڪڙي شئي قسم جي شئي حجم سان. نقطو اهو آهي ته اسٽاڪ ٽرانزيڪشن اعتراض سيلز ميٽا ڊيٽا تي مشتمل آهي، پر اسان کي صرف وڪرو ٿيڻ واري شيئرن جي تعداد بابت ڊيٽا جي ضرورت آهي.
  2. اسٽاڪ جي علامت طرفان گروپ شيئر حجم ڊيٽا. هڪ دفعو علامت سان گڏ ڪيل، توهان هن ڊيٽا کي ختم ڪري سگهو ٿا اسٽاڪ سيلز جي مقدار جي ذيلي مجموعي ۾. اها ڳالهه نوٽ ڪرڻ جي قابل آهي ته KStream.groupBy طريقو KGroupedStream قسم جو هڪ مثال ڏي ٿو. ۽ توهان KTable مثال حاصل ڪري سگهو ٿا وڌيڪ ڪال ڪري KGroupedStream.reduce طريقو.

KGroupedStream انٽرفيس ڇا آهي

KStream.groupBy ۽ KStream.groupByKey طريقا KGroupedStream جو هڪ مثال ڏي ٿو. KGroupedStream هڪ وچولي نمائندگي آهي واقعن جي هڪ وهڪري جي ڪنجي ذريعي گروپ ڪرڻ کان پوءِ. اهو سڀ ڪجهه ان سان سڌو ڪم ڪرڻ جو ارادو نه آهي. ان جي بدران، KGroupedStream استعمال ڪيو ويندو آهي مجموعي عملن لاءِ، جنهن جي نتيجي ۾ هميشه KTable ٿئي ٿي. ۽ جيئن ته مجموعي عملن جو نتيجو هڪ KTable آهي ۽ اهي هڪ رياستي اسٽور استعمال ڪندا آهن، اهو ممڪن آهي ته نتيجو طور تي سڀئي تازه ڪاريون پائپ لائن کي وڌيڪ نه موڪليا وڃن.

KTable.groupBy طريقو ساڳيو KGroupedTable ڏي ٿو - تازه ڪاري جي اسٽريم جي وچولي نمائندگي، ڪي جي ذريعي ٻيهر منظم ٿيل.

اچو ته هڪ مختصر وقفو وٺون ۽ تصوير کي ڏسو. 5.9، جيڪو ڏيکاري ٿو جيڪو اسان حاصل ڪيو آهي. هي ٽوپولوجي اڳ ۾ ئي توهان کي تمام گهڻو واقف هجڻ گهرجي.

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
اچو ته ھاڻي ھن ٽوپولوجي لاءِ ڪوڊ ڏسو (اھو فائل src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) ۾ ڳولي سگھجي ٿو (فھرست 5.2).

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
ڏنل ڪوڊ ان جي اختصار ۽ ڪيترن ئي لائينن ۾ ڪيل عملن جي وڏي مقدار جي لحاظ کان ممتاز آهي. توهان Builder.stream طريقي جي پهرين پيٽرولر ۾ ڪا نئين شيءِ محسوس ڪري سگهو ٿا: Enum قسم جو هڪ قدر AutoOffsetReset.EARLIEST (هڪ تازو پڻ آهي)، Consumed.withOffsetResetPolicy طريقو استعمال ڪندي سيٽ ڪريو. هن ڳڻپ جو قسم هر KStream يا KTable لاءِ هڪ آفسٽ ري سيٽ حڪمت عملي بيان ڪرڻ لاءِ استعمال ڪري سگهجي ٿو ۽ ترتيب ڏنل آفسٽ ري سيٽ آپشن تي اوليت رکي ٿو.

GroupByKey ۽ GroupBy

KStream انٽرفيس جا ٻه طريقا آهن رڪارڊ گروپ ڪرڻ لاءِ: GroupByKey ۽ GroupBy. ٻئي هڪ KGrouped Table واپس آڻيندا، تنهنڪري توهان حيران ٿي رهيا هوندا ته انهن ۾ ڪهڙو فرق آهي ۽ ڪهڙي وقت استعمال ڪجي؟

GroupByKey طريقو استعمال ڪيو ويندو آهي جڏهن KStream ۾ ڪنجيون اڳ ۾ ئي غير خالي آهن. ۽ سڀ کان اهم، "ٻيهر ورهاڱي جي ضرورت آهي" پرچم ڪڏهن به مقرر نه ڪيو ويو.

GroupBy طريقو فرض ڪري ٿو ته توھان گروپنگ ڪيز کي تبديل ڪيو آھي، تنھنڪري ورھاڱي واري جھنڊو صحيح تي مقرر ڪيو ويو آھي. گروپ بائي طريقي کان پوءِ شامل ٿيڻ، مجموعن وغيره کي انجام ڏيڻ جي نتيجي ۾ خودڪار ٻيهر ورهاڱي ٿيندي.
خلاصو: جڏهن به ممڪن هجي، توهان کي استعمال ڪرڻ گهرجي GroupByKey بجاءِ GroupBy.

اهو واضح آهي ته mapValues ​​۽ GroupBy طريقا ڇا ڪندا آهن، پوءِ اچو ته sum() طريقي تي هڪ نظر وجهون (src/main/java/bbejeck/model/ShareVolume.java) (فهرست 5.3) ۾.

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
ShareVolume.sum طريقو اسٽاڪ سيلز جي مقدار جي هلندڙ مجموعي کي واپس ڪري ٿو، ۽ حساب جي پوري زنجير جو نتيجو KTable اعتراض آهي . ھاڻي توھان سمجھو ٿا ته KTable جو ڪردار ادا ڪري ٿو. جڏهن ShareVolume شيون اچي وڃن ٿيون، ته لاڳاپيل KTable اعتراض تازو موجوده تازه ڪاري کي محفوظ ڪري ٿو. اهو ياد رکڻ ضروري آهي ته سڀئي تازه ڪاريون پوئين شيئر وولوم ڪي ٽيبل ۾ ظاهر ٿيل آهن، پر سڀ ڪجهه وڌيڪ نه موڪليا ويا آهن.

اڳيون، هن KTable کي استعمال ڪندي، اسان مجموعي طور تي پنجن ڪمپنين تائين پهچون ٿا (واپاري ٿيل شيئرز جي تعداد جي حساب سان) هر صنعت ۾ سڀ کان وڌيڪ حصص جي واپار سان. هن معاملي ۾ اسان جا عمل پهرين مجموعي لاءِ ساڳيا هوندا.

  1. ٻئي گروپ ذريعي آپريشن ڪريو انفرادي شيئر ووليم شين کي گروپ ڪرڻ لاءِ انڊسٽري طرفان.
  2. ShareVolume شين جو خلاصو شروع ڪريو. هن ڀيري مجموعي اعتراض هڪ مقرر ٿيل سائيز جي ترجيحي قطار آهي. هن مقرر ٿيل سائيز جي قطار ۾، صرف پنج ڪمپنيون رکيل آهن جن ۾ سڀ کان وڌيڪ شيئر وڪرو ڪيا ويا آهن.
  3. پوئين پيراگراف کان قطارن کي ھڪڙي اسٽرنگ ويليو ڏانھن نقشو ٺاھيو ۽ صنعت جي لحاظ کان نمبر جي لحاظ کان مٿين پنجن سڀ کان وڌيڪ واپار واري اسٽاڪ کي واپس ڪريو.
  4. نتيجن کي اسٽرنگ فارم ۾ موضوع تي لکو.

تصوير ۾. شڪل 5.10 ڏيکاري ٿو ڊيٽا جي وهڪري جي ٽوپولوجي گراف. جئين توهان ڏسي سگهو ٿا، پروسيسنگ جو ٻيو دور بلڪل سادو آهي.

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
هاڻي ته اسان کي پروسيسنگ جي هن ٻئي دور جي ساخت جي واضح ڄاڻ آهي، اسان ان جي سورس ڪوڊ ڏانهن رخ ڪري سگهون ٿا (توهان ان کي فائل ۾ ڳوليندا src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (فهرست 5.4) .

هي شروعاتي ڪندڙ هڪ مقرر ڪيل قطار متغير تي مشتمل آهي. هي هڪ ڪسٽم اعتراض آهي جيڪو java.util.TreeSet لاءِ هڪ اڊاپٽر آهي جيڪو مٿين N نتيجن کي ٽريڪ ڪرڻ لاءِ استعمال ڪيو ويندو آهي واپار ڪيل شيئرز جي هيٺئين ترتيب ۾.

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
توھان اڳي ئي گروپ بائي ۽ mapValues ​​ڪالون ڏٺيون آھن، تنھنڪري اسين انھن ۾ نه وينداسين (اسين KTable.toStream طريقي کي سڏي رھيا آھيون ڇاڪاڻ ته KTable.print جو طريقو ختم ٿيل آھي). پر توهان اڃا تائين aggregate() جو KTable ورجن نه ڏٺو آهي، تنهنڪري اسان ان تي بحث ڪرڻ ۾ ٿورو وقت گذارينداسين.

جيئن توهان کي ياد آهي، جيڪا KTable کي مختلف بڻائي ٿي اها آهي ته ساڳين چاٻين سان رڪارڊ کي اپڊيٽ سمجهيو وڃي ٿو. KTable پراڻي داخلا کي نئين سان مٽائي ٿو. مجموعي طور تي ساڳئي طريقي سان ٿئي ٿي: ساڳئي چاٻي سان گڏ تازو رڪارڊ گڏ ڪيا ويا آهن. جڏهن هڪ رڪارڊ اچي ٿو، اهو هڪ اضافو استعمال ڪندي FixedSizePriorityQueue ڪلاس مثال ۾ شامل ڪيو ويندو آهي (مجموعي طريقي واري ڪال ۾ ٻيو پيٽرولر)، پر جيڪڏهن ٻيو رڪارڊ اڳ ۾ ئي موجود آهي ساڳئي ڪيچ سان، پوء پراڻي رڪارڊ کي هٽائي ڇڏيو آهي هڪ ذيلي ذخيرو استعمال ڪندي (ٽيون پيٽرولر ۾. مجموعي طريقو ڪال).

هن سڀني جو مطلب اهو آهي ته اسان جو جمع ڪندڙ، FixedSizePriorityQueue، سڀني قدرن کي هڪ ڪنجي سان گڏ نه ٿو ڪري، پر اسٽاڪ جي N سڀ کان وڌيڪ واپار واري قسم جي مقدار جي مقدار کي گڏ ڪري ٿو. هر ايندڙ داخلا شامل آهي مجموعي تعداد ۾ وڪرو ٿيل حصص جو هينئر تائين. KTable توهان کي معلومات ڏيندو ته ڪهڙن ڪمپنين جا شيئر هن وقت سڀ کان وڌيڪ واپار ڪيا ويا آهن، هر اپڊيٽ جي رولنگ ايگريگيشن جي ضرورت کان سواءِ.

اسان ٻه اهم شيون ڪرڻ سکيو:

  • KTable ۾ گروپ جي قدرن کي هڪ عام ڪنجي ذريعي؛
  • ڪارائتو عمل انجام ڏيو جيئن رول اپ ۽ گڏ ڪرڻ انهن گروهه ٿيل قدرن تي.

ڄاڻڻ ضروري آهي ته انهن عملن کي ڪيئن انجام ڏنو وڃي ڊيٽا جي معنى کي سمجھڻ لاءِ ڪافڪا اسٽريمز ايپليڪيشن ذريعي منتقل ٿي رهيو آهي ۽ اهو سمجهڻ ضروري آهي ته اها ڪهڙي معلومات رکي ٿي.

اسان هن ڪتاب ۾ اڳ ۾ بحث ڪيل ڪجهه اهم تصورن کي گڏ ڪيو آهي. باب 4 ۾، اسان بحث ڪيو ته ڪيئن غلطي برداشت ڪندڙ، مقامي رياست اسٽريمنگ ايپليڪيشن لاءِ اهم آهي. ھن باب ۾ پھريون مثال ڏيکاريو ويو آھي ڇو مقامي رياست ايتري اھم آھي- اھو توھان کي قابليت ڏئي ٿو توھان کي ٽريڪ رکڻ جي جيڪا معلومات توھان اڳ ۾ ئي ڏٺي آھي. مقامي رسائي نيٽ ورڪ جي دير کان بچي ٿي، ايپليڪيشن کي وڌيڪ ڪارڪردگي ۽ غلطي جي مزاحمتي بڻائي ٿي.

جڏهن ڪنهن به رول اپ يا مجموعي آپريشن کي انجام ڏيو، توهان کي رياستي اسٽور جو نالو بيان ڪرڻ گهرجي. رول اپ ۽ ايگريگيشن عمل KTable جو مثال ڏئي ٿو، ۽ KTable پراڻي نتيجن کي نئين سان تبديل ڪرڻ لاءِ اسٽيٽ اسٽوريج استعمال ڪري ٿو. جيئن توهان ڏٺو آهي، نه سڀئي تازه ڪاريون پائپ لائن هيٺ موڪليا ويا آهن، ۽ اهو اهم آهي ڇاڪاڻ ته مجموعي آپريشنز ٺهيل آهن خلاصو معلومات پيدا ڪرڻ لاءِ. جيڪڏهن توهان مقامي رياست کي لاڳو نه ڪيو، KTable سڀني مجموعي ۽ رول اپ نتيجن کي اڳتي وڌائيندو.

اڳيون، اسين عملن کي ڏسنداسون جيئن ته هڪ مخصوص وقت جي اندر گڏ ٿيڻ، جنهن کي ونڊونگ آپريشن سڏيو ويندو آهي.

5.3.2. ونڊو آپريشن

پوئين حصي ۾، اسان متعارف ڪرايو سلائيڊنگ ڪنوولوشن ۽ مجموعي. ائپليڪيشن اسٽاڪ سيلز جو مسلسل رول اپ ڪيو جنهن بعد ايڪسچينج تي پنجن سڀ کان وڌيڪ واپار ٿيل اسٽاڪ جو مجموعو.

ڪڏهن ڪڏهن اهڙي مسلسل مجموعي ۽ نتيجن جي رول اپ ضروري آهي. ۽ ڪڏهن ڪڏهن توهان کي آپريشن ڪرڻ جي ضرورت آهي صرف ڏنل وقت جي دوران. مثال طور، اندازو لڳايو ته گذريل 10 منٽن ۾ هڪ خاص ڪمپني جي شيئرز سان ڪيترا مٽا سٽا ٽرانزيڪشن ڪيا ويا. يا گذريل 15 منٽن ۾ ڪيترا صارفين هڪ نئين اشتهاري بينر تي ڪلڪ ڪيو. هڪ ايپليڪيشن شايد اهڙا عمل ڪيترائي ڀيرا انجام ڏئي سگهي ٿي، پر نتيجن سان جيڪي صرف مخصوص وقتن تي لاڳو ٿين ٿيون (ٽائم ونڊوز).

خريد ڪندڙ طرفان مٽا سٽا جي ڳڻپ

ايندڙ مثال ۾، اسين ڪيترن ئي واپارين جي وچ ۾ اسٽاڪ ٽرانزيڪشن کي ٽريڪ ڪنداسين- يا ته وڏيون تنظيمون يا سمارٽ انفرادي فنانسرز.

هن ٽريڪنگ جا ٻه ممڪن سبب آهن. انهن مان هڪ کي ڄاڻڻ جي ضرورت آهي ته مارڪيٽ جا اڳواڻ ڇا خريد ڪري رهيا آهن / وڪرو ڪري رهيا آهن. جيڪڏهن اهي وڏا رانديگر ۽ نفيس سيڙپڪار موقعو ڏسن ٿا، اهو انهن جي حڪمت عملي جي پيروي ڪرڻ جو احساس آهي. ٻيو سبب غير قانوني اندروني واپار جي ڪنهن به ممڪن نشانين کي ڳولڻ جي خواهش آهي. هن کي ڪرڻ لاء، توهان کي اهم پريس ريليز سان وڏي سيلز اسپيڪس جي باهمي تعلق جو تجزيو ڪرڻو پوندو.

اهڙي ٽريڪنگ هيٺين مرحلن تي مشتمل آهي:

  • اسٽاڪ ٽرانزيڪشن جي موضوع مان پڙهڻ لاء هڪ وهڪرو ٺاهڻ؛
  • خريداري جي سڃاڻپ ۽ اسٽاڪ جي علامت طرفان ايندڙ رڪارڊ گروپن. گروپ بائي طريقي کي ڪال ڪرڻ سان KGroupedStream ڪلاس جو هڪ مثال واپس اچي ٿو.
  • KGroupedStream.windowedBy طريقو هڪ ڊيٽا اسٽريم کي واپس ڏئي ٿو جيڪو ٽائم ونڊو تائين محدود آهي، جيڪا اجازت ڏئي ٿي ونڊو گڏ ڪرڻ جي. ونڊو جي قسم تي مدار رکندي، يا ته هڪ TimeWindowedKStream يا هڪ SessionWindowedKStream واپس ڪيو ويندو؛
  • مجموعي آپريشن لاءِ ٽرانزيڪشن ڳڻپ. ونڊو ٿيل ڊيٽا جو وهڪرو اهو طئي ڪري ٿو ته ڇا هڪ خاص رڪارڊ هن ڳڻپ ۾ رکيو ويو آهي؛
  • نتيجن کي هڪ موضوع تي لکڻ يا ترقي جي دوران انهن کي ڪنسول ڏانهن ڪڍڻ.

هن ايپليڪيشن جي ٽوپولوجي سادو آهي، پر ان جي هڪ واضح تصوير مددگار ثابت ٿيندي. اچو ته تصوير تي هڪ نظر وجهون. 5.11.

اڳيون، اسان ونڊو عملن جي ڪارڪردگي ۽ لاڳاپيل ڪوڊ تي نظر ڪنداسين.

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"

ونڊو جا قسم

ڪافڪا اسٽريمز ۾ ونڊوز جا ٽي قسم آهن:

  • سيشنل؛
  • "ٽمبلنگ" (ٽمبلنگ)؛
  • ڇڪڻ / ڇڪڻ.

جنهن کي چونڊڻ لاءِ توهان جي ڪاروباري گهرجن تي منحصر آهي. ٽمبلنگ ۽ جمپنگ ونڊوز وقت تائين محدود هونديون آهن، جڏهن ته سيشن ونڊوز محدود هونديون آهن صارف جي سرگرمي طرفان- سيشن جو مدو صرف ان حساب سان طئي ڪيو ويندو آهي ته صارف ڪيترو سرگرم آهي. ياد رکڻ لاءِ بنيادي شيءِ اها آهي ته سڀني ونڊو جا قسم داخلائن جي تاريخ/ٽائم اسٽيمپ تي ٻڌل آهن، نه سسٽم جي وقت تي.

اڳيون، اسان ونڊو جي هر قسم سان اسان جي ٽوپولوجي کي لاڳو ڪريون ٿا. مڪمل ڪوڊ صرف پهرين مثال ۾ ڏنو ويندو؛ ونڊوز جي ٻين قسمن لاءِ ونڊو آپريشن جي قسم کان سواءِ ڪجھ به تبديل نه ٿيندو.

سيشن ونڊوز

سيشن ونڊوز ٻين سڀني قسمن جي ونڊوز کان بلڪل مختلف آهن. اهي محدود نه آهن وقت جي لحاظ کان ايتري قدر جو صارف جي سرگرمي (يا ان اداري جي سرگرمي جنهن کي توهان ٽريڪ ڪرڻ چاهيو ٿا). سيشن ونڊوز غيرفعاليت جي مدي تائين محدود آهن.

شڪل 5.12 سيشن ونڊوز جي تصور کي بيان ڪري ٿو. ننڍو سيشن ان جي کاٻي پاسي واري سيشن سان ملندو. ۽ ساڄي پاسي واري سيشن الڳ ٿي ويندي ڇاڪاڻ ته اهو غير فعال ٿيڻ جي ڊگهي عرصي جي پٺيان آهي. سيشن ونڊوز استعمال ڪندڙ جي سرگرمي تي ٻڌل آهن، پر داخلان مان تاريخ/ٽائم اسٽام استعمال ڪريو اهو طئي ڪرڻ لاءِ ته داخلا ڪهڙي سيشن سان تعلق رکي ٿي.

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"

اسٽاڪ ٽرانزيڪشن کي ٽريڪ ڪرڻ لاء سيشن ونڊوز استعمال ڪندي

اچو ته استعمال ڪريون سيشن ونڊوز مٽا سٽا واري ٽرانزيڪشن بابت معلومات حاصل ڪرڻ لاءِ. سيشن ونڊوز تي عمل درآمد لسٽنگ 5.5 ۾ ڏيکاريو ويو آهي (جيڪو ڳولهي سگهجي ٿو src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
توھان اڳ ۾ ئي ھن ٽوپولوجي ۾ گھڻا آپريشن ڏٺا آھن، تنھنڪري انھن کي ھتي ڏسڻ جي ضرورت ناھي. پر هتي پڻ ڪيترائي نوان عنصر آهن، جن تي اسان هاڻي بحث ڪنداسين.

ڪو به گروپ بائي آپريشن عام طور تي ڪجهه قسم جي مجموعي آپريشن (مجموعي، رول اپ، يا ڳڻپ) انجام ڏئي ٿو. توھان ھلائي سگھوٿا يا ته مجموعي مجموعي سان گڏ ھلندڙ مجموعي، يا ونڊو جمع، جيڪو ھڪڙي مخصوص وقت جي ونڊو اندر اڪائونٽ رڪارڊ ۾ وٺندو آھي.

لسٽنگ 5.5 ۾ ڪوڊ سيشن ونڊوز ۾ ٽرانزيڪشن جو تعداد شمار ڪري ٿو. تصوير ۾. 5.13 انهن عملن جو تجزيو ڪيو وڃي ٿو قدم بہ قدم.

فون ڪندي windowedBy(SessionWindows.with(twentySeconds).until(20Minutes)) اسان هڪ سيشن ونڊو ٺاهيندا آهيون جنهن ۾ 15 سيڪنڊن جي غير فعالي وقفي ۽ 20 منٽن جي مسلسل وقفي سان. 20 سيڪنڊن جي بيڪار وقفي جو مطلب آهي ته ايپليڪيشن ۾ ڪا به داخلا شامل هوندي جيڪا موجوده (فعال) سيشن ۾ موجوده سيشن جي پڄاڻي يا شروعات جي XNUMX سيڪنڊن اندر اچي ٿي.

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
اڳيون، اسان بيان ڪريون ٿا ته سيشن ونڊو ۾ ڪھڙي مجموعي آپريشن کي انجام ڏيڻ جي ضرورت آھي - ھن صورت ۾، ڳڻيو. جيڪڏهن هڪ اچڻ واري داخلا غير فعال ٿيڻ واري ونڊو کان ٻاهر پوي ٿي (تاريخ/ٽائم اسٽمپ جي ٻئي پاسي)، ايپليڪيشن هڪ نئون سيشن ٺاهي ٿي. برقرار رکڻ جي وقفي جو مطلب آهي هڪ خاص وقت لاء هڪ سيشن کي برقرار رکڻ ۽ دير سان ڊيٽا جي اجازت ڏئي ٿو جيڪا سيشن جي غير فعالي جي مدت کان اڳتي وڌندي آهي پر اڃا تائين ڳنڍيل ٿي سگهي ٿو. اضافي طور تي، نئين سيشن جي شروعات ۽ پڄاڻي ضم ٿيڻ جي نتيجي ۾ ابتدائي ۽ تازي تاريخ/ٽائم اسٽمپ سان لاڳاپيل آهي.

اچو ته ڳڻپ جي طريقي مان ڪجھ داخلائن کي ڏسون ته سيشن ڪيئن ڪم ڪن ٿا (ٽيبل 5.1).

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
جڏهن رڪارڊ اچي وڃن ٿا، اسان موجوده سيشن کي ساڳئي ڪيئي سان ڳوليون ٿا، هڪ آخري وقت موجوده تاريخ/ٽائم اسٽمپ کان گهٽ آهي - غير فعال ٿيڻ وارو وقفو، ۽ هڪ شروعاتي وقت موجوده تاريخ/ٽائم اسٽيمپ + غير فعالي وقفي کان وڌيڪ. انهي کي مدنظر رکندي، ٽيبل مان چار داخلائون. 5.1 ھيٺ ڏنل ھڪڙي ھڪڙي سيشن ۾ ضم ٿي ويا آھن.

1. رڪارڊ 1 پھرين اچي ٿو، تنھنڪري شروعاتي وقت ختم ٿيڻ واري وقت جي برابر آھي ۽ 00:00:00 آھي.

2. اڳيون، داخلا 2 اچي ٿو، ۽ اسين سيشن ڳوليون ٿا جيڪي 23:59:55 کان اڳ ختم نه ٿين ۽ 00:00:35 کان اڳ نه شروع ٿين. اسان رڪارڊ 1 ڳوليون ٿا ۽ سيشن 1 ۽ 2 کي گڏ ڪريون ٿا. اسان سيشن 1 جو شروعاتي وقت (اڳوڻي) ۽ سيشن 2 جو آخري وقت (بعد ۾) وٺون ٿا، ته جيئن اسان جو نئون سيشن 00:00:00 تي شروع ٿئي ۽ 00 تي ختم ٿئي: 00:15.

3. رڪارڊ 3 اچي ٿو، اسان 00:00:30 ۽ 00:01:10 جي وچ ۾ سيشن ڳوليندا آهيون ۽ نه ڳوليندا آهيون. اهم 123-345-654،FFBE، 00:00:50 تي شروع ۽ ختم ٿيڻ لاءِ ٻيو سيشن شامل ڪريو.

4. رڪارڊ 4 اچي ٿو ۽ اسان 23:59:45 ۽ 00:00:25 جي وچ ۾ سيشن ڳولي رهيا آهيون. هن ڀيري ٻئي سيشن 1 ۽ 2 مليا آهن. سڀ ٽي سيشن هڪ ۾ ملن ٿا، 00:00:00 جي شروعاتي وقت ۽ 00:00:15 جي آخري وقت سان.

هن حصي ۾ بيان ڪيو ويو آهي، ان کي هيٺين اهم nuances ياد ڪرڻ جي قابل آهي:

  • سيشن مقرر ٿيل سائيز ونڊوز نه آهن. سيشن جو عرصو مقرر ٿيل وقت جي اندر سرگرمي جي ذريعي مقرر ڪيو ويو آهي.
  • ڊيٽا ۾ تاريخ/ٽائم اسٽامون اهو طئي ڪن ٿيون ته واقعو موجوده سيشن جي اندر ٿئي ٿو يا بيڪار عرصي دوران.

اڳيون اسان ايندڙ قسم جي ونڊو تي بحث ڪنداسين - "ٽمبلنگ" ونڊوز.

"ٽمبلنگ" ونڊوز

ٽمبلنگ ونڊوز واقعن کي پڪڙي ٿو جيڪي ڪجهه وقت جي اندر ٿين ٿا. تصور ڪريو ته توهان کي هر 20 سيڪنڊن ۾ هڪ خاص ڪمپني جي سڀني اسٽاڪ ٽرانزيڪشن کي پڪڙڻ جي ضرورت آهي، تنهنڪري توهان ان عرصي دوران سڀني واقعن کي گڏ ڪيو. 20 سيڪنڊن جي وقفي جي آخر ۾، ونڊو ڦري ٿي ۽ نئين 20 سيڪنڊن جي مشاهدي واري وقفي تي هلي ٿي. شڪل 5.14 هن صورتحال کي بيان ڪري ٿو.

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
جئين توهان ڏسي سگهو ٿا، آخري 20 سيڪنڊن ۾ حاصل ڪيل سڀئي واقعا ونڊو ۾ شامل آهن. وقت جي هن دور جي آخر ۾، هڪ نئين ونڊو ٺاهي وئي آهي.

لسٽنگ 5.6 ڪوڊ ڏيکاري ٿو جيڪو هر 20 سيڪنڊن ۾ اسٽاڪ ٽرانزيڪشن کي پڪڙڻ لاءِ ٽمبلنگ ونڊوز جي استعمال کي ظاهر ڪري ٿو (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java ۾ مليو).

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
TimeWindows.of ميٿڊ ڪال ۾ ھن ننڍڙي تبديلي سان، توھان استعمال ڪري سگھو ٿا ٽمبنگ ونڊو. هي مثال جيستائين () طريقي کي نه سڏيندو آهي، تنهنڪري 24 ڪلاڪن جو ڊفالٽ برقرار رکڻ وارو وقفو استعمال ڪيو ويندو.

آخرڪار، اهو وقت آهي ونڊو اختيارن جي آخري ڏانهن وڃڻ جو - "هاپنگ" ونڊوز.

سلائيڊنگ ("جمپنگ") ونڊوز

سلائيڊنگ/هاپنگ ونڊوز ٽمبلنگ ونڊوز جهڙيون آهن، پر ٿوري فرق سان. تازو واقعن تي عمل ڪرڻ لاءِ نئين ونڊو ٺاهڻ کان اڳ سلائيڊنگ ونڊوز وقت جي وقفي جي آخر تائين انتظار نه ڪندا آهن. اهي ونڊو جي مدي کان گهٽ انتظار جي وقفي کان پوءِ نوان حساب ڪتاب شروع ڪندا آهن.

ٽمبلنگ ۽ جمپنگ ونڊوز جي وچ ۾ فرق کي واضع ڪرڻ لاءِ، اچو ته ڳڻپ جي اسٽاڪ ايڪسچينج ٽرانزيڪشن جي مثال ڏانهن موٽون. اسان جو مقصد اڃا تائين ٽرانزيڪشن جي تعداد کي ڳڻڻ آهي، پر اسان نه ٿا چاهيون ته ڪائونٽر کي اپڊيٽ ڪرڻ کان اڳ پوري وقت جو انتظار ڪريو. ان جي بدران، اسين مختصر وقفن تي ڪائونٽر کي تازه ڪاري ڪنداسين. مثال طور، اسان اڃا تائين هر 20 سيڪنڊن ۾ ٽرانزيڪشن جو تعداد شمار ڪنداسين، پر ڪائونٽر کي هر 5 سيڪنڊن ۾ تازه ڪاري ڪندا، جيئن تصوير ۾ ڏيکاريل آهي. 5.15. انهي حالت ۾، اسان ختم ڪريون ٿا ٽن نتيجن ونڊوز سان گڏ اوورليپنگ ڊيٽا سان.

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
لسٽنگ 5.7 سلائيڊنگ ونڊوز جي وضاحت لاءِ ڪوڊ ڏيکاري ٿو (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java ۾ مليا).

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
ايڊوانس بائي () ميٿڊ ۾ ڪال شامل ڪندي ٽمبلنگ ونڊو کي هٽائي ونڊو ۾ تبديل ڪري سگھجي ٿو. ڏيکاريل مثال ۾، بچت جو وقفو 15 منٽ آهي.

توهان هن حصي ۾ ڏٺو ته ڪيئن مجموعي نتيجن کي ٽائيم ونڊوز تائين محدود ڪجي. خاص طور تي، مان چاهيان ٿو ته توهان هن حصي مان هيٺيان ٽي شيون ياد رکو:

  • سيشن ونڊوز جي سائيز وقت جي مدي سان نه، پر صارف جي سرگرمي جي لحاظ کان محدود آهي؛
  • "ٽمبلنگ" ونڊوز ڏنل وقت جي اندر واقعن جو هڪ جائزو مهيا ڪن ٿا؛
  • جمپنگ ونڊوز جو عرصو مقرر ٿيل آھي، پر اھي بار بار اپڊيٽ ڪيا ويندا آھن ۽ ٿي سگھي ٿو سڀني ونڊوز ۾ اوورليپنگ اينٽريون.

اڳيون، اسان سيکارينداسين ته ڪيئن ڪنيڪشن لاءِ KTable کي واپس KStream ۾ تبديل ڪجي.

5.3.3. KStream ۽ KTable شيون ڳنڍڻ

باب 4 ۾، اسان ٻن KStream شين کي ڳنڍڻ تي بحث ڪيو. هاڻي اسان کي سکڻو پوندو KTable ۽ KStream کي ڪيئن ڳنڍجي. اهو هيٺين سادي سبب لاء گهربل ٿي سگهي ٿو. KStream رڪارڊ جو هڪ وهڪرو آهي، ۽ KTable رڪارڊ جي تازه ڪاري جو هڪ وهڪرو آهي، پر ڪڏهن ڪڏهن توهان KTable مان تازه ڪاريون استعمال ڪندي رڪارڊ اسٽريم ۾ اضافي حوالا شامل ڪرڻ چاهيو ٿا.

اچو ته اسٽاڪ ايڪسچينج ٽرانزيڪشن جي تعداد تي ڊيٽا وٺو ۽ انهن کي لاڳاپيل صنعتن لاء اسٽاڪ ايڪسچينج جي خبرن سان گڏ ڪريو. ھتي آھي توھان کي حاصل ڪرڻ لاءِ توھان کي ڇا ڪرڻو آھي اھو ڏنل ڪوڊ جيڪو توھان وٽ اڳ ۾ ئي آھي.

  1. هڪ KTable اعتراض کي ڊيٽا سان گڏ اسٽاڪ ٽرانزيڪشن جي تعداد تي KStream ۾ تبديل ڪريو، ان کان پوءِ ڪي کي مٽائي ڪيئي سان تبديل ڪريو ان اسٽاڪ جي نشاني سان لاڳاپيل انڊسٽري شعبي جو اشارو.
  2. ھڪڙو KTable اعتراض ٺاھيو جيڪو اسٽاڪ ايڪسچينج جي خبرن سان ھڪڙي موضوع مان ڊيٽا پڙھي. هي نئون KTable صنعت جي شعبي طرفان درجه بندي ڪيو ويندو.
  3. صنعت شعبي طرفان اسٽاڪ ايڪسچينج ٽرانزيڪشن جي تعداد تي معلومات سان خبرن جي تازه ڪاري سان ڳنڍيو.

هاڻي اچو ته ڏسون ته هن ايڪشن پلان تي ڪيئن عمل ڪجي.

تبديل ڪريو KTable جي طرف KStream

KTable کي KStream ۾ تبديل ڪرڻ لاءِ توھان کي ھيٺيون ڪم ڪرڻو پوندو.

  1. ڪال ڪريو KTable.toStream() طريقو.
  2. KStream.map طريقي کي ڪال ڪندي، ڪيڏي کي انڊسٽري جي نالي سان مٽايو، ۽ پوءِ Windowed مثال مان TransactionSummary اعتراض کي واپس وٺو.

اسان انهن عملن کي هن ريت گڏ ڪنداسين (ڪوڊ فائل ۾ ڳولهي سگهجي ٿو src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (فهرست 5.8).

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
ڇاڪاڻ ته اسان هڪ KStream.map آپريشن ڪري رهيا آهيون، واپس ڪيل KStream مثال خودڪار طريقي سان ٻيهر ورهاڱي ڪئي ويندي آهي جڏهن اهو ڪنيڪشن ۾ استعمال ڪيو ويندو آهي.

اسان تبادلي جو عمل مڪمل ڪيو آهي، اڳتي اسان کي اسٽاڪ نيوز پڙهڻ لاءِ KTable اعتراض ٺاهڻو پوندو.

اسٽاڪ جي خبرن لاءِ ڪي ٽيبل ٺاهڻ

خوشقسمتيءَ سان، KTable شئي ٺاهڻ لاءِ ڪوڊ جي صرف هڪ لائن لڳندي آهي (ڪوڊ ڳولهي سگهجي ٿو src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (فهرست 5.9).

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
اها ڳالهه نوٽ ڪرڻ جي قابل آهي ته ڪنهن به Serde شين جي وضاحت ڪرڻ جي ضرورت ناهي، ڇاڪاڻ ته اسٽرنگ Serdes سيٽنگن ۾ استعمال ڪيا ويا آهن. انهي سان گڏ، EARLIEST ڳڻپ استعمال ڪندي، ٽيبل تمام شروعات ۾ رڪارڊ سان ڀريل آهي.

ھاڻي اسان آخري قدم ڏانھن منتقل ڪري سگھون ٿا - ڪنيڪشن.

ٽرانزيڪشن ڳڻپ ڊيٽا سان خبرون تازه ڪاريون ڳنڍڻ

ڪنيڪشن ٺاهڻ ڏکيو نه آهي. اسان هڪ کاٻي جوائن استعمال ڪنداسين جيڪڏهن لاڳاپيل صنعت لاءِ ڪا به اسٽاڪ خبر ناهي (ضروري ڪوڊ فائل ۾ ڳولهي سگهجي ٿو src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (فهرست 5.10).

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
هي leftJoin آپريٽر بلڪل سادو آهي. باب 4 ۾ شامل ٿيڻ جي برعڪس، JoinWindow طريقو استعمال نه ڪيو ويو آهي ڇاڪاڻ ته جڏهن هڪ KStream-KTable جوائن کي انجام ڏئي ٿو، اتي KTable ۾ هر ڪيئي لاءِ صرف هڪ داخلا آهي. اهڙي رابطي وقت ۾ محدود نه آهي: رڪارڊ يا ته KTable ۾ يا غير حاضر آهي. مکيه نتيجو: KTable شيون استعمال ڪندي توهان KStream کي بهتر ڪري سگهو ٿا گهٽ بار بار اپڊيٽ ٿيل ريفرنس ڊيٽا سان.

هاڻي اسان KStream کان واقعن کي بهتر بنائڻ لاءِ وڌيڪ ڪارائتو طريقو ڏسنداسين.

5.3.4. GlobalKTable شيون

جئين توهان ڏسي سگهو ٿا، واقعي جي اسٽريمز کي بهتر ڪرڻ يا انهن جي حوالي سان شامل ڪرڻ جي ضرورت آهي. باب 4 ۾ توهان ٻه KStream شين جي وچ ۾ ڪنيڪشن ڏٺا، ۽ پوئين حصي ۾ توهان KStream ۽ KTable جي وچ ۾ ڪنيڪشن ڏٺو. انهن سڀني صورتن ۾، ضروري آهي ته ڊيٽا جي وهڪري کي ٻيهر ورهاڱي ڪرڻ لاء جڏهن ڪنجين کي نئين قسم يا قيمت تي نقشو ڪيو وڃي. ڪڏهن ڪڏهن ٻيهر ورهاڱي واضح طور تي ڪيو ويندو آهي، ۽ ڪڏهن ڪڏهن ڪافڪا اسٽريمز اهو خودڪار طريقي سان ڪري ٿو. ٻيهر ورهاڱي ضروري آهي ڇاڪاڻ ته ڪنجيون تبديل ٿي ويون آهن ۽ رڪارڊ نئين حصن ۾ ختم ٿيڻ گهرجن، ٻي صورت ۾ ڪنيڪشن ناممڪن ٿي ويندو (اهو باب 4 ۾ بحث ڪيو ويو آهي، سيڪشن 4.2.4 ۾ "ڊيٽا ٻيهر ورهاڱي" ۾).

ٻيهر ورهاڱي جي قيمت آهي

ٻيهر ورهاڱي جي قيمتن جي ضرورت آهي - وچولي موضوع ٺاهڻ لاء اضافي وسيلن جي قيمت، ٻئي موضوع ۾ نقل واري ڊيٽا کي محفوظ ڪرڻ؛ ان جو مطلب اهو به آهي ته هن موضوع مان لکڻ ۽ پڙهڻ جي ڪري ويڪرائي وڌي وئي. اضافي طور تي، جيڪڏهن توهان کي هڪ کان وڌيڪ پاسن يا طول و عرض ۾ شامل ٿيڻ جي ضرورت آهي، توهان کي لازمي طور تي شامل ٿيڻ گهرجي، رڪارڊ کي نقشي کي نيون ڪنجين سان، ۽ ٻيهر ورهاڱي واري عمل کي ٻيهر هلائڻ گهرجي.

ننڍن ڊيٽا سيٽن سان ڳنڍڻ

ڪن حالتن ۾، ڳنڍڻ لاءِ ريفرنس ڊيٽا جو مقدار نسبتاً ننڍو هوندو آهي، تنهنڪري ان جون مڪمل ڪاپيون آساني سان مقامي طور تي هر نوڊ تي فٽ ٿي سگهن ٿيون. اهڙين حالتن لاءِ، ڪافڪا اسٽريمز مهيا ڪري ٿي GlobalKTable ڪلاس.

GlobalKTable مثال منفرد آهن ڇو ته ايپليڪيشن سڀني ڊيٽا کي هر هڪ نوڊس ڏانهن نقل ڪري ٿي. ۽ جيئن ته سمورو ڊيٽا هر نوڊ تي موجود آهي، تنهن ڪري ايونٽ اسٽريم کي ورهائڻ جي ڪا ضرورت ناهي ريفرنس ڊيٽا ڪيئي ذريعي ته جيئن اهو سڀني ورهاڱن لاءِ دستياب هجي. توهان GlobalKTable شيون استعمال ڪندي ڪيئي بيس جوائن پڻ ڪري سگهو ٿا. اچو ته ھن خصوصيت کي ڏيکارڻ لاءِ پوئين مثالن مان ھڪڙي ڏانھن واپس وڃون.

KStream آبجڪس کي GlobalKTable آبجڪس سان ڳنڍڻ

ذيلي سيڪشن 5.3.2 ۾، اسان خريد ڪندڙن طرفان تبادلي جي ٽرانزيڪشن جي ونڊو مجموعي کي انجام ڏنو. هن مجموعي جا نتيجا هن طرح ڪجهه نظر اچن ٿا:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

جڏهن ته اهي نتيجا مقصد پورا ڪيا، اهو وڌيڪ ڪارائتو ٿئي ها جيڪڏهن گراهڪ جو نالو ۽ مڪمل ڪمپني جو نالو پڻ ڏيکاريو وڃي ها. ڪسٽمر جو نالو ۽ ڪمپني جو نالو شامل ڪرڻ لاءِ، توھان عام جوائن ڪري سگھو ٿا، پر توھان کي ڪرڻو پوندو ٻه مکيه نقشا ۽ ٻيهر ورھاڱي. GlobalKTable سان توهان اهڙن عملن جي قيمت کان پاسو ڪري سگهو ٿا.

هن کي ڪرڻ لاءِ، اسان لسٽنگ 5.11 مان CountStream آبجیکٹ استعمال ڪنداسين (ساڳئي ڪوڊ src/main/java/bbejeck/chapter_5/GlobalKTableExample.java ۾ ڳولهي سگهجي ٿو) ۽ ان کي ٻن GlobalKTable شين سان ڳنڍينداسين.

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
اسان اڳ ۾ ئي هن تي بحث ڪيو آهي، تنهنڪري مان ان کي ٻيهر نه ڏيندس. پر مان نوٽ ڪريان ٿو ته ڪوڊ toStream(.map فنڪشن ۾ خلاصو ڪيو ويو آهي هڪ فنڪشن اعتراض ۾ ان لائن ليمبڊا ايڪسپريشن جي بدران پڙهڻ جي قابل.

ايندڙ قدم GlobalKTable جي ٻن مثالن جو اعلان ڪرڻ آهي (ظاهر ڪيل ڪوڊ فائل ۾ ڳولهي سگهجي ٿو src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (فهرست 5.12).

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"

مهرباني ڪري نوٽ ڪريو ته عنوان جا نالا ڳڻپيوڪر قسم استعمال ڪندي بيان ڪيا ويا آهن.

ھاڻي ته اسان وٽ سڀ حصا تيار آھن، باقي رھيو آھي ڪنيڪشن لاءِ ڪوڊ لکڻ (جيڪو فائل ۾ ڳولهي سگھجي ٿو src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (فهرست 5.13).

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
جيتوڻيڪ هن ڪوڊ ۾ ٻه شامل آهن، اهي زنجير آهن ڇو ته انهن مان ڪوبه نتيجو الڳ الڳ استعمال نه ڪيو ويو آهي. نتيجن کي پوري آپريشن جي آخر ۾ ڏيکاريل آهن.

جڏهن توهان مٿي ڏنل شامل ٿيڻ واري آپريشن کي هلائيندا آهيو، توهان هن طرح جا نتيجا حاصل ڪندا:

{customer='Barney, Smith' company="Exxon", transactions= 17}

جوهر تبديل نه ٿيو آهي، پر اهي نتيجا وڌيڪ واضح نظر اچن ٿا.

جيڪڏهن توهان باب 4 تائين ڳڻپ ڪيو ٿا، توهان اڳ ۾ ئي ڏٺو آهي ڪيترن ئي قسمن جا ڪنيڪشن عمل ۾. اهي جدول ۾ درج ٿيل آهن. 5.2. هي جدول ڪافڪا اسٽريمز جي ورزن 1.0.0 جي طور تي رابطي جي صلاحيتن کي ظاهر ڪري ٿو. مستقبل جي رليز ۾ ڪجھ تبديل ٿي سگھي ٿو.

ڪتاب ”ڪافڪا اسٽريمز ان ايڪشن. حقيقي وقت جي ڪم لاء ايپليڪيشنون ۽ مائڪرو سروسز"
شين کي ختم ڪرڻ لاءِ، اچو ته بنيادي ڳالهين کي ٻيهر ورجايو: توهان ايونٽ اسٽريمز (KStream) کي ڳنڍي سگهو ٿا ۽ مقامي رياست استعمال ڪندي اسٽريمز (KTable) کي اپڊيٽ ڪري سگهو ٿا. متبادل طور تي، جيڪڏهن ريفرنس ڊيٽا جي سائيز تمام وڏي نه آهي، توهان استعمال ڪري سگهو ٿا GlobalKTable اعتراض. GlobalKTables سڀني ڀاڱن کي نقل ڪري ٿو هر ڪافڪا اسٽريمز ايپليڪيشن نوڊ تي، انهي ڳالهه کي يقيني بڻائي ٿي ته سمورو ڊيٽا موجود آهي قطع نظر ان جي ورهاڱي جي ڪنجي سان لاڳاپيل آهي.

اڳتي اسان ڏسنداسين ڪافڪا اسٽريمز جي خصوصيت، جنهن جي مهرباني اسان ڪافڪا جي موضوع مان ڊيٽا استعمال ڪرڻ کان سواءِ رياستي تبديلين جو مشاهدو ڪري سگهون ٿا.

5.3.5. قابل سوال حالت

اسان اڳ ۾ ئي ڪيترائي آپريشن ڪيا آھن جن ۾ رياست شامل آھي ۽ ھميشه نتيجن کي ڪنسول ڏانھن (ترقي جي مقصدن لاءِ) يا انھن کي ھڪڙي موضوع تي لکو (پيداوار جي مقصدن لاءِ). جڏهن ڪنهن موضوع تي نتيجا لکي رهيا آهيو، توهان کي انهن کي ڏسڻ لاءِ ڪافڪا صارف استعمال ڪرڻو پوندو.

انهن عنوانن مان ڊيٽا پڙهڻ کي هڪ قسم جي مادي نظرين جو هڪ قسم سمجهي سگهجي ٿو. اسان جي مقصدن لاءِ، اسان وڪيپيڊيا مان مادي ٿيل ڏيک جي وصف استعمال ڪري سگھون ٿا: “...هڪ فزيڪل ڊيٽابيس شئي جنهن ۾ سوال جا نتيجا هجن. مثال طور، اهو ٿي سگهي ٿو ريموٽ ڊيٽا جي مقامي ڪاپي، يا ٽيبل جي قطارن ۽/يا ڪالمن جو ذيلي سيٽ يا نتيجن ۾ شامل ٿيڻ، يا مجموعي ذريعي حاصل ڪيل خلاصو جدول“ (https://en.wikipedia.org/wiki /Materialized_view).

ڪافڪا اسٽريمز پڻ توهان کي رياستي اسٽورن تي انٽرويو سوالن کي هلائڻ جي اجازت ڏئي ٿي، توهان کي انهن مادي ٿيل نظرين کي سڌو سنئون پڙهڻ جي اجازت ڏئي ٿي. اهو نوٽ ڪرڻ ضروري آهي ته رياستي اسٽور ڏانهن سوال صرف پڙهڻ لاءِ آپريشن آهي. اهو يقيني بڻائي ٿو ته توهان کي پريشان ٿيڻ جي ضرورت ناهي ته حادثي سان رياست کي متضاد بڻائڻ جي باري ۾ جڏهن توهان جي ايپليڪيشن ڊيٽا پروسيسنگ ڪري رهي آهي.

رياستي اسٽورن کان سڌو سوال ڪرڻ جي صلاحيت اهم آهي. هن جو مطلب اهو آهي ته توهان ڊيش بورڊ ايپليڪيشنون ٺاهي سگهو ٿا بغير پهرين ڪافڪا صارف کان ڊيٽا حاصل ڪرڻ جي. اهو پڻ ايپليڪيشن جي ڪارڪردگي وڌائي ٿو، حقيقت اها آهي ته ڊيٽا کي ٻيهر لکڻ جي ضرورت ناهي:

  • ڊيٽا جي مقاميت جي مهرباني، اهي جلدي پهچائي سگھجن ٿيون؛
  • ڊيٽا جي نقل کي ختم ڪيو ويو آهي، ڇاڪاڻ ته اهو خارجي اسٽوريج ڏانهن نه لکيو ويو آهي.

بنيادي شيء جيڪا مان توهان کي ياد رکڻ چاهيان ٿو اها آهي ته توهان پنهنجي درخواست جي اندر رياست کان سڌو سوال ڪري سگهو ٿا. اهي موقعا جيڪي توهان کي ڏئي ٿو انهن کي ختم نٿو ڪري سگهجي. ڪافڪا کان ڊيٽا استعمال ڪرڻ ۽ ايپليڪيشن لاءِ ڊيٽابيس ۾ رڪارڊ محفوظ ڪرڻ بدران، توهان ساڳئي نتيجن سان رياستي اسٽورن کان سوال ڪري سگهو ٿا. رياستي اسٽورن ڏانهن سڌو سوالن جو مطلب آهي گهٽ ڪوڊ (ڪو به صارف نه) ۽ گهٽ سافٽ ويئر (نه ضرورت آهي ڊيٽابيس ٽيبل جي نتيجن کي ذخيرو ڪرڻ لاءِ).

اسان هن باب ۾ ڪافي حد تائين زمين کي ڍڪي ڇڏيو آهي، تنهنڪري اسان رياستي اسٽورن جي خلاف انٽرايڪٽو سوالن جي بحث کي هن وقت تائين ڇڏينداسين. پر پريشان نه ٿيو: باب 9 ۾، اسان هڪ سادي ڊيش بورڊ ايپليڪيشن ٺاهينداسين جنهن ۾ انٽرايڪٽو سوالن سان گڏ. اهو هن ۽ پوئين بابن مان ڪجهه مثال استعمال ڪندو انٽرايڪٽو سوالن کي ظاهر ڪرڻ لاءِ ۽ ڪيئن توهان انهن کي ڪافڪا اسٽريمز ايپليڪيشنن ۾ شامل ڪري سگهو ٿا.

خلاصو

  • KStream شيون ايونٽس جي اسٽريمز جي نمائندگي ڪن ٿيون، ڊيٽابيس ۾ داخل ڪرڻ جي مقابلي ۾. KTable شيون تازه ڪاري اسٽريمز جي نمائندگي ڪن ٿيون، جيئن ڊيٽابيس ۾ تازه ڪاريون. KTable اعتراض جي سائيز نه وڌندي آهي، پراڻا رڪارڊ نوان سان مٽايا ويندا آهن.
  • KTable شيون گڏ ڪرڻ جي عملن لاءِ گھربل آھن.
  • ونڊونگ عملن کي استعمال ڪندي، توهان مجموعي ڊيٽا کي وقت جي بالٽ ۾ ورهائي سگهو ٿا.
  • GlobalKTable آبجیکٹس جي مهرباني، توهان اپليڪيشن ۾ ڪٿي به ريفرنس ڊيٽا تائين رسائي ڪري سگهو ٿا، ورهاڱي کان سواءِ.
  • KStream، KTable ۽ GlobalKTable شين جي وچ ۾ رابطا ممڪن آهن.

هن وقت تائين، اسان اعليٰ سطحي KStream DSL استعمال ڪندي ڪافڪا اسٽريمز ايپليڪيشنون ٺاهڻ تي ڌيان ڏنو آهي. جيتوڻيڪ اعلي سطحي طريقي سان توهان کي صاف ۽ جامع پروگرام ٺاهڻ جي اجازت ڏئي ٿي، ان کي استعمال ڪندي واپار جي نمائندگي ڪري ٿو. DSL KStream سان ڪم ڪرڻ جو مطلب آهي ڪنٽرول جي درجي کي گهٽائڻ سان توهان جي ڪوڊ جي جامعيت کي وڌائڻ. ايندڙ باب ۾، اسان ڏسنداسين گهٽ-سطح هينڊلر نوڊ API ۽ ڪوشش ڪنداسين ٻين واپاري بندن کي. پروگرام اڳي کان وڌيڪ ڊگھا هوندا، پر اسان تقريباً ڪنهن به هينڊلر نوڊ ٺاهي سگهنداسين جنهن جي اسان کي ضرورت هجي.

→ ڪتاب جي باري ۾ وڌيڪ تفصيل هتي ملي سگهي ٿو پبلشر جي ويب سائيٽ

→ Habrozhiteli لاءِ 25٪ رعايت ڪوپن استعمال ڪندي - ڪافڪا وهڪرو

→ ڪتاب جي پيپر ورزن جي ادائيگي تي، هڪ اليڪٽرانڪ ڪتاب اي ميل ذريعي موڪليو ويندو.

جو ذريعو: www.habr.com

تبصرو شامل ڪريو