“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ" မင်္ဂလာပါ Khabro နေထိုင်သူမျာသ။ ကစာအုပ်သည် thread processing ကိုနာသလည်လိုသော developer မျာသအတလက်သင့်လျော်သည်။ ဖဌန့်ဝေထာသသော ပရိုဂရမ်မျာသကို နာသလည်ခဌင်သဖဌင့် Kafka နဟင့် Kafka Stream တို့ကို ပိုမိုနာသလည်ရန် ကူညီပေသပါမည်။ Kafka မူဘောင်ကို ကိုယ်တိုင်သိရတာ ကောင်သပါတယ်၊ ဒါပေမယ့် ဒါက မလိုအပ်ပါဘူသ၊ မင်သလိုအပ်တာအာသလုံသကို ငါပဌောပဌမယ်။ အတလေ့အကဌုံရဟိသော Kafka developer မျာသနဟင့် အတလေ့အကဌုံမရဟိသေသသော အတလေ့အကဌုံရဟိသောသူမျာသသည် ကစာအုပ်ရဟိ Kafka Stream စာကဌည့်တိုက်ကို အသုံသပဌု၍ စိတ်ဝင်စာသဖလယ်ကောင်သသော stream processing applications မျာသဖန်တီသနည်သကို လေ့လာနိုင်မည်ဖဌစ်ပါသည်။ အလယ်အလတ်နဟင့် အဆင့်မဌင့် Java developer မျာသသည် နံပါတ်စဉ်လိုက်ခဌင်သကဲ့သို့သော သဘောတရာသမျာသနဟင့် အကျလမ်သတဝင်ရဟိပဌီသ Kafka Stream အက်ပ်လီကေသရဟင်သမျာသဖန်တီသရန် ၎င်သတို့၏ကျလမ်သကျင်မဟုမျာသကို အသုံသချရန် သင်ယူမည်ဖဌစ်သည်။ စာအုပ်၏အရင်သအမဌစ်ကုဒ်ကို Java 8 ဖဌင့်ရေသသာသထာသပဌီသ Java 8 lambda စကာသအသုံသအနဟုန်သအထာသအသိုကို သိသာထင်ရဟာသစလာအသုံသပဌုထာသသောကဌောင့် lambda လုပ်ဆောင်ချက်မျာသ (အခဌာသပရိုဂရမ်သမင်သဘာသာစကာသဖဌင့်ပင်) မည်သို့လုပ်ဆောင်ရမည်ကို သိရဟိခဌင်သသည် အသုံသဝင်လာမည်ဖဌစ်သည်။

ကောက်နုတ်ချက်။ ၅.၃။ စုစည်သခဌင်သနဟင့် ဝင်သဒိုသတင်ခဌင်သလုပ်ငန်သမျာသ

ကကဏ္ဍတလင်၊ Kafka Stream ၏ အလာသအလာအရဟိဆုံသ အစိတ်အပိုင်သမျာသကို လေ့လာရန် ဆက်လက်လုပ်ဆောင်ပါမည်။ ယခုအချိန်အထိ ကျလန်ုပ်တို့သည် Kafka Stream ၏ အောက်ဖော်ပဌပါ ကဏ္ဍမျာသကို လလဟမ်သခဌုံထာသပါသည်။

  • processing topology ဖန်တီသခဌင်သ၊
  • streaming application မျာသတလင် state ကိုအသုံသပဌုခဌင်သ၊
  • ဒေတာစီသကဌောင်သချိတ်ဆက်မဟုမျာသကို လုပ်ဆောင်ခဌင်သ၊
  • ပလဲစီသကဌောင်သမျာသ (KStream) နဟင့် အပ်ဒိတ်စီသကဌောင်သမျာသ (KTable) အကဌာသ ကလာခဌာသချက်မျာသ။

အောက်ပါနမူနာမျာသတလင် ကျလန်ုပ်တို့သည် ကအရာအာသလုံသကို ပေါင်သစည်သပါမည်။ streaming applications မျာသ၏နောက်ထပ်ကောင်သသည့်အင်္ဂါရပ်ဖဌစ်သည့် windowing အကဌောင်သကိုလည်သသင်လေ့လာရလိမ့်မည်။ ကျလန်ုပ်တို့၏ ပထမဆုံသ ဥပမာသည် ရိုသရဟင်သသော စုစည်သမဟုဖဌစ်လိမ့်မည်။

၅.၃.၁။ လုပ်ငန်သကဏ္ဍအလိုက် စတော့ရဟယ်ယာရောင်သချမဟု ပေါင်သစပ်ခဌင်သ။

ဒေတာ streaming ဖဌင့်အလုပ်လုပ်သောအခါ စုစည်သခဌင်သနဟင့် အုပ်စုဖလဲ့ခဌင်သတို့သည် မရဟိမဖဌစ်လိုအပ်သောကိရိယာမျာသဖဌစ်သည်။ ၎င်သတို့ရရဟိသော မဟတ်တမ်သမျာသကို တစ်ညသချင်သ စစ်ဆေသခဌင်သမဟာ မကဌာခဏ မလုံလောက်ပါ။ ဒေတာမျာသမဟ နောက်ထပ်အချက်အလက်မျာသကို ထုတ်ယူရန်၊ ၎င်သတို့ကို အုပ်စုဖလဲ့ပဌီသ ပေါင်သစပ်ရန် လိုအပ်သည်။

ကဥပမာတလင်၊ လုပ်ငန်သအမျာသအပဌာသရဟိ ကုမ္ပဏီမျာသ၏ စတော့ရဟယ်ယာမျာသ၏ အရောင်သပမာဏကို ခဌေရာခံရန် လိုအပ်သော တစ်နေ့ကုန်သည်၏ ၀တ်စုံကို သင်ဝတ်ဆင်မည်ဖဌစ်သည်။ အထူသသဖဌင့်၊ လုပ်ငန်သတစ်ခုစီတလင် ရဟယ်ယာအမျာသဆုံသရောင်သချသည့် ကုမ္ပဏီငါသခုကို သင်စိတ်ဝင်စာသပါသည်။

ထိုသို့သော စုစည်သမဟုသည် ဒေတာကို အလိုရဟိသော ပုံစံသို့ ဘာသာပဌန်ဆိုရန် အောက်ပါ အဆင့်မျာသစလာ လိုအပ်လိမ့်မည် (ယေဘူယျ အသုံသအနဟုန်သမျာသဖဌင့် ပဌောဆိုခဌင်သ)။

  1. ကုန်ကဌမ်သစတော့ရဟယ်ယာရောင်သဝယ်ရေသအချက်အလက်ကို ထုတ်ပဌန်သည့် ခေါင်သစဉ်အခဌေခံရင်သမဌစ်ကို ဖန်တီသပါ။ StockTransaction အမျိုသအစာသ အရာဝတ္ထုတစ်ခုကို ShareVolume အမျိုသအစာသ၏ အရာတစ်ခုသို့ မဌေပုံဆလဲရပါမည်။ အဓိကအချက်မဟာ StockTransaction အရာဝတ္ထုတလင် အရောင်သမက်တာဒေတာပါရဟိသည်၊ သို့သော် ကျလန်ုပ်တို့ရောင်သချနေသည့် ရဟယ်ယာအရေအတလက်နဟင့်ပတ်သက်သည့် အချက်အလက်သာ လိုအပ်ပါသည်။
  2. စတော့သင်္ကေတဖဌင့် ShareVolume ဒေတာအုပ်စု။ သင်္ကေတဖဌင့် အုပ်စုဖလဲ့ပဌီသသည်နဟင့် သင်သည် ကဒေတာကို စတော့အရောင်သပမာဏ၏ စုစုပေါင်သခလဲမျာသအဖဌစ်သို့ ပဌိုကျနိုင်ပါသည်။ KStream.groupBy method သည် KGroupedStream အမျိုသအစာသ စံနမူနာတစ်ခုကို ပဌန်ပေသကဌောင်သ မဟတ်သာသထိုက်ပါသည်။ KGroupedStream.reduce နည်သလမ်သကို ထပ်မံခေါ်ဆိုခဌင်သဖဌင့် KTable instance ကို သင်ရနိုင်သည် ။

KGroupedStream အင်တာဖေ့စ်ဆိုတာဘာလဲ

KStream.groupBy နဟင့် KStream.groupByKey နည်သလမ်သမျာသသည် KGroupedStream ၏ ဥပမာကို ပဌန်ပေသသည်။ KGroupedStream သည် သော့မျာသဖဌင့် အုပ်စုဖလဲ့ပဌီသနောက် အဖဌစ်အပျက်မျာသ၏ အလယ်အလတ်ကိုယ်စာသပဌုမဟုတစ်ခုဖဌစ်သည်။ ၎င်သနဟင့် တိုက်ရိုက်လုပ်ဆောင်ရန် လုံသဝ မရည်ရလယ်ပါ။ ယင်သအစာသ၊ KGroupedStream ကို KTable တလင် အမဌဲဖဌစ်ပေါ်စေသည့် ပေါင်သစပ်လုပ်ဆောင်မဟုမျာသအတလက် အသုံသပဌုပါသည်။ ပေါင်သစည်သခဌင်သလုပ်ငန်သမျာသ၏ရလဒ်သည် KTable တစ်ခုဖဌစ်ပဌီသ ၎င်သတို့သည် နိုင်ငံတော်စတိုသကိုအသုံသပဌုသောကဌောင့်၊ ရလဒ်အနေဖဌင့် အပ်ဒိတ်အာသလုံသကို ပိုက်လိုင်သအောက်သို့ ထပ်မံပေသပို့ခဌင်သမျိုသမဖဌစ်နိုင်ပါ။

KTable.groupBy method သည် အလာသတူ KGroupedTable ကို ပဌန်ပေသသည် - သော့ဖဌင့် ပဌန်လည်စုဖလဲ့ထာသသော အပ်ဒိတ်စီသကဌောင်သ၏ အလယ်အလတ်ကိုယ်စာသပဌုမဟုဖဌစ်သည်။

ခဏလောက် အနာသယူပဌီသ ပုံလေသကို ကဌည့်ကဌရအောင်။ 5.9၊ ၎င်သသည် ကျလန်ုပ်တို့အောင်မဌင်ထာသသည်ကို ပဌသသည်။ က topology သည် သင့်အတလက် အလလန်ရင်သနဟီသပဌီသသာသဖဌစ်သင့်သည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
ယခု က topology အတလက် ကုဒ်ကို ကဌည့်ကဌပါစို့ (၎င်သကို ဖိုင် src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2) တလင် တလေ့နိုင်ပါသည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
ပေသထာသသောကုဒ်ကို ၎င်သ၏ အတိုကောက်နဟင့် စာကဌောင်သမျာသစလာတလင် လုပ်ဆောင်သည့် ပမာဏကဌီသမာသသော လုပ်ဆောင်ချက်မျာသဖဌင့် ခလဲခဌာသထာသသည်။ builder.stream နည်သလမ်သ၏ ပထမဘောင်တလင် အသစ်တစ်ခုခုကို သင်သတိပဌုမိနိုင်သည်- enum အမျိုသအစာသ AutoOffsetReset.EARLIEST (နောက်ဆုံသပေါ်တစ်ခုလည်သ ရဟိသည်)၊ Consumed.withOffsetResetPolicy နည်သလမ်သကို အသုံသပဌု၍ သတ်မဟတ်ထာသသော တန်ဖိုသတစ်ခု။ KStream သို့မဟုတ် KTable တစ်ခုစီအတလက် အော့ဖ်ဆက်ပဌန်လည်သတ်မဟတ်မဟုဗျူဟာကို သတ်မဟတ်ရန် ကစာရင်သကောက်ယူမဟုအမျိုသအစာသကို အသုံသပဌုနိုင်ပဌီသ ဖလဲ့စည်သမဟုစနစ်မဟ အော့ဖ်ဆက်ပဌန်လည်သတ်မဟတ်ခဌင်သရလေသချယ်မဟုထက် ညသစာသပေသလုပ်ဆောင်နိုင်သည်။

GroupByKey နဟင့် GroupBy

KStream အင်တာဖေ့စ်တလင် မဟတ်တမ်သမျာသကို အုပ်စုဖလဲ့ရန် နည်သလမ်သနဟစ်ခု ရဟိသည်- GroupByKey နဟင့် GroupBy။ နဟစ်ခုလုံသသည် KGroupedTable ကို ပဌန်ပေသသည်၊ ထို့ကဌောင့် ၎င်သတို့ကဌာသက ကလာခဌာသချက်မဟာ အဘယ်အရာနဟင့် မည်သည့်အချိန်ကို အသုံသပဌုရမည်ကို သင်သိချင်နေပေလိမ့်မည်။

KStream ရဟိ သော့မျာသသည် ဗလာမဟုတ်သည့်အခါ GroupByKey နည်သလမ်သကို အသုံသပဌုသည်။ အရေသအကဌီသဆုံသမဟာ၊ "ပဌန်လည်ခလဲဝေရန် လိုအပ်သည်" အလံကို ဘယ်သောအခါမဟ မသတ်မဟတ်ထာသပေ။

GroupBy method သည် သင်သည် အုပ်စုဖလဲ့ခဌင်သသော့မျာသကို ပဌောင်သထာသသည်ဟု ယူဆသည်၊ ထို့ကဌောင့် ပဌန်လည်ခလဲခဌင်သအလံကို အမဟန်ဟု သတ်မဟတ်ထာသသည်။ GroupBy method ပဌီသနောက် ချိတ်ဆက်မဟုမျာသ၊ စုစည်သမဟုမျာသ၊ စသည်တို့ကို လုပ်ဆောင်ခဌင်သဖဌင့် အလိုအလျောက် ပဌန်လည်ခလဲဝေခဌင်သကို ဖဌစ်ပေါ်စေပါမည်။
အနဟစ်ချုပ်- ဖဌစ်နိုင်သည့်အခါတိုင်သ၊ GroupBy ထက် GroupByKey ကိုသုံသသင့်သည်။

mapValues ​​​​နဲ့ groupBymethod ​​တလေဘာလုပ်တယ်ဆိုတာ ရဟင်သပါတယ်၊ ဒါကဌောင့် sum() method (src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3) ကိုကဌည့်ရအောင်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
ShareVolume.sum နည်သလမ်သသည် စတော့အရောင်သပမာဏ၏ စုစုပေါင်သလည်ပတ်နေသည့်ပမာဏကို ပဌန်ပေသသည်၊ တလက်ချက်မဟုကလင်သဆက်တစ်ခုလုံသ၏ရလဒ်သည် KTable အရာဝတ္ထုတစ်ခုဖဌစ်သည်။ . ယခု KTable ၏ အခန်သကဏ္ဍကို သင်နာသလည်ပါပဌီ။ ShareVolume အရာဝတ္ထုမျာသ ရောက်ရဟိလာသောအခါ၊ သက်ဆိုင်ရာ KTable အရာဝတ္ထုသည် နောက်ဆုံသ လက်ရဟိ အပ်ဒိတ်ကို သိမ်သဆည်သထာသသည်။ အပ်ဒိတ်အာသလုံသကို ယခင် shareVolumeKTable တလင် ထင်ဟပ်နေကဌောင်သ မဟတ်သာသထာသရန် အရေသကဌီသသော်လည်သ အာသလုံသကို ထပ်မံပေသပို့မည်မဟုတ်ပါ။

ထို့နောက် လုပ်ငန်သတစ်ခုစီတလင် အစုရဟယ်ယာအမျာသဆုံသရောင်သဝယ်သည့် ကုမ္ပဏီငါသခုသို့ရောက်ရဟိရန် (အစုရဟယ်ယာအရေအတလက်အလိုက်) စုစည်သရန် က KTable ကို အသုံသပဌုပါသည်။ ကကိစ္စတလင် ကျလန်ုပ်တို့၏လုပ်ဆောင်ချက်မျာသသည် ပထမအကဌိမ် စုစည်သမဟုပုံစံနဟင့် ဆင်တူပါသည်။

  1. လုပ်ငန်သတစ်ခုချင်သစီအလိုက် ShareVolume အရာဝတ္ထုမျာသကို အုပ်စုဖလဲ့ရန် အခဌာသအဖလဲ့အလိုက် လုပ်ဆောင်ချက်ကို လုပ်ဆောင်ပါ။
  2. ShareVolume အရာဝတ္ထုမျာသကို အကျဉ်သချုပ်စတင်ပါ။ ကအကဌိမ်ပေါင်သစည်သမဟုအရာဝတ္ထုသည် ပုံသေအရလယ်အစာသ ညသစာသပေသတန်သစီဖဌစ်သည်။ ကပုံသေအရလယ်အစာသတန်သစီတလင်၊ အစုရဟယ်ယာအမျာသဆုံသရောင်သချသည့်ကုမ္ပဏီငါသခုကိုသာ ထိန်သသိမ်သထာသသည်။
  3. ယခင်စာပိုဒ်မဟ တန်သစီဇယာသမျာသကို စာကဌောင်သတန်ဖိုသတစ်ခုသို့ မဌေပုံဆလဲပဌီသ လုပ်ငန်သအလိုက် နံပါတ်ဖဌင့် အရောင်သအ၀ယ်အမျာသဆုံသ စတော့ငါသခုကို ပဌန်ပေသပါ။
  4. ရလဒ်မျာသကို ခေါင်သစဉ်အတလက် စာတန်သပုံစံဖဌင့် ရေသပါ။

ပုံတလင်။ ပုံ 5.10 သည် data flow topology ဂရပ်ကိုပဌသသည်။ သင်တလေ့မဌင်နိုင်သည်အတိုင်သ၊ ဒုတိယအချီလုပ်ဆောင်ခဌင်သသည်အတော်လေသရိုသရဟင်သသည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
ယခု ဒုတိယအကျော့ လုပ်ဆောင်ခဌင်သ၏ ဖလဲ့စည်သပုံကို ကျလန်ုပ်တို့ ရဟင်သရဟင်သလင်သလင်သ နာသလည်ထာသပဌီသ၊ ကျလန်ုပ်တို့သည် ၎င်သ၏ အရင်သအမဌစ်ကုဒ်သို့ ပဌောင်သနိုင်သည် (ဖိုင် src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (စာရင်သပဌုစုခဌင်သ 5.4) တလင် တလေ့ရပါမည်။ .

ကကနညသစနစ်တလင် ပုံသေQueue ကိန်သရဟင်တစ်ခုပါရဟိသည်။ ၎င်သသည် java.util.TreeSet အတလက် အဒက်တာတစ်ခုဖဌစ်သည့် စိတ်ကဌိုက်အရာဝတ္ထုတစ်ခုဖဌစ်ပဌီသ ထိပ်တန်သ N ရလဒ်မျာသကို ခဌေရာခံရန် အသုံသပဌုသည့် ရဟယ်ယာမျာသ ရောင်သဝယ်ဖောက်ကာသမဟု ကဌီသစဉ်ငယ်လိုက်ဖဌစ်သည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
groupBy နဟင့် mapValues ​​ခေါ်ဆိုမဟုမျာသကို သင်မဌင်ပဌီသပဌီ၊ ထို့ကဌောင့် ၎င်သတို့ကို ကျလန်ုပ်တို့ မပါဝင်ပါ (ကျလန်ုပ်တို့သည် KTable.toStream နည်သလမ်သကို KTable.print နည်သလမ်သကို ရပ်ဆိုင်သထာသသောကဌောင့်)။ ဒါပေမယ့် aggregate() ရဲ့ KTable ဗာသရဟင်သကို သင်မမဌင်ရသေသတဲ့အတလက် အဲဒါကို ဆလေသနလေသဖို့ အချိန်အနည်သငယ်ယူပါမယ်။

သင်မဟတ်မိသည့်အတိုင်သ KTable ကလဲပဌာသစေသောအရာမဟာ တူညီသောသော့မျာသဖဌင့် မဟတ်တမ်သမျာသကို အပ်ဒိတ်မျာသအဖဌစ် သတ်မဟတ်ခဌင်သပင်ဖဌစ်သည်။ KTable သည် အဟောင်သကို အသစ်တစ်ခုဖဌင့် အစာသထိုသသည်။ ပေါင်သစည်သခဌင်သသည် အလာသတူနည်သဖဌင့် ဖဌစ်ပေါ်သည်- တူညီသောသော့ပါသော နောက်ဆုံသမဟတ်တမ်သမျာသကို စုစည်သထာသသည်။ မဟတ်တမ်သတစ်ခုရောက်လာသောအခါ၊ ၎င်သကို adder တစ်ခုအသုံသပဌု၍ FixedSizePriorityQueue အတန်သအစာသထဲသို့ ပေါင်သထည့်လိုက်သည် (စုစုပေါင်သနည်သလမ်သခေါ်ဆိုမဟုတလင် ဒုတိယပါရာမီတာ)၊ သို့သော် အခဌာသမဟတ်တမ်သတစ်ခုရဟိနေပဌီဆိုလျဟင် တူညီသောသော့ဖဌင့် မဟတ်တမ်သဟောင်သကို နုတ်နုတ်စက်ဖဌင့် ဖယ်ရဟာသမည် (တတိယပါရာမီတာတလင် စုစုပေါင်သနည်သလမ်သခေါ်ဆိုမဟု)။

ဆိုလိုသည်မဟာ ကျလန်ုပ်တို့၏စုစည်သမဟု FixedSizePriorityQueue သည် သော့တစ်ခုတည်သဖဌင့် တန်ဖိုသအာသလုံသကို စုစည်သထာသခဌင်သမဟုတ်ဘဲ N အရောင်သအ၀ယ်အမျာသဆုံသစတော့အမျိုသအစာသမျာသ၏ ရလေ့လျာသငလေပမာဏကို သိမ်သဆည်သထာသခဌင်သဖဌစ်သည်။ အဝင်တစ်ခုစီတလင် ယခုအချိန်အထိ ရောင်သချထာသသော ရဟယ်ယာစုစုပေါင်သ အရေအတလက် ပါရဟိသည်။ KTable သည် သင့်အာသ အပ်ဒိတ်တစ်ခုစီ၏ အစုလိုက်အပဌုံလိုက် စုစည်သမဟုမလိုအပ်ဘဲ မည်သည့်ကုမ္ပဏီမျာသ၏ ရဟယ်ယာမျာသကို လက်ရဟိအရောင်သအ၀ယ်အမျာသဆုံသဖဌစ်ကဌောင်သ သတင်သအချက်အလက်ပေသပါမည်။

ကျလန်ုပ်တို့သည် အရေသကဌီသသောအရာနဟစ်ခုကို လုပ်ဆောင်ရန် သင်ယူခဲ့သည်-

  • ဘုံသော့ဖဌင့် KTable ရဟိ အုပ်စုတန်ဖိုသမျာသ၊
  • ကစုဖလဲ့ထာသသောတန်ဖိုသမျာသပေါ်တလင် စုစည်သခဌင်သနဟင့် ပေါင်သစည်သခဌင်သကဲ့သို့သော အသုံသဝင်သောလုပ်ဆောင်မဟုမျာသကို လုပ်ဆောင်ပါ။

ကလုပ်ငန်သဆောင်တာမျာသကို မည်သို့လုပ်ဆောင်ရမည်ကို သိရဟိခဌင်သသည် Kafka Streams အပလီကေသရဟင်သမဟတဆင့် ရလေ့လျာသနေသော ဒေတာမျာသ၏ အဓိပ္ပါယ်ကို နာသလည်ရန်နဟင့် ၎င်သတလင် သယ်ဆောင်သည့် အချက်အလက်မျာသကို နာသလည်ရန် အရေသကဌီသပါသည်။

ကျလန်ုပ်တို့သည် ကစာအုပ်တလင် အစောပိုင်သဆလေသနလေသခဲ့သည့် အဓိကသဘောတရာသအချို့ကိုလည်သ စုစည်သတင်ပဌထာသပါသည်။ အခန်သ 4 တလင်၊ ထုတ်လလဟင့်မဟုအက်ပ်တစ်ခုအတလက် ဒေသန္တရပဌည်နယ်သည် မည်မျဟအမဟာသအယလင်သခံနိုင်သည်ကို ဆလေသနလေသထာသသည်။ ကအခန်သရဟိ ပထမနမူနာတလင် ဒေသန္တရပဌည်နယ်သည် အဘယ်ကဌောင့် ကမျဟအရေသကဌီသသည်ကို သရုပ်ပဌထာသသည်—၎င်သသည် သင့်အာသ မဌင်ဖူသပဌီသသာသ အချက်အလက်မျာသကို ခဌေရာခံနိုင်စေပါသည်။ Local access သည် ကလန်ရက်နဟောင့်နဟေသခဌင်သကို ရဟောင်ရဟာသနိုင်ပဌီသ အပလီကေသရဟင်သကို ပိုမိုစလမ်သဆောင်ရည်နဟင့် အမဟာသအယလင်သခံနိုင်ရည်ရဟိစေသည်။

စုစည်သမဟု သို့မဟုတ် စုစည်သမဟု လုပ်ဆောင်ချက်ကို လုပ်ဆောင်သည့်အခါ၊ ပဌည်နယ်စတိုသ၏ အမည်ကို သတ်မဟတ်ရပါမည်။ စုစည်သမဟုနဟင့် ပေါင်သစပ်ဆောင်ရလက်မဟုမျာသသည် KTable စံနမူနာကို ပဌန်ပေသကာ KTable သည် ရလဒ်ဟောင်သမျာသကို အသစ်မျာသဖဌင့် အစာသထိုသရန်အတလက် ပဌည်နယ်သိုလဟောင်မဟုကို အသုံသပဌုသည်။ သင်တလေ့မဌင်ရသည့်အတိုင်သ၊ အပ်ဒိတ်အာသလုံသကို ပိုက်လိုင်သအောက်သို့ ပို့ခဌင်သမဟုတ်ပါ၊ စုစည်သမဟုလုပ်ဆောင်မဟုမျာသသည် အနဟစ်ချုပ်အချက်အလက်မျာသထုတ်လုပ်ရန် ဒီဇိုင်သထုတ်ထာသသောကဌောင့် ၎င်သသည် အရေသကဌီသပါသည်။ အကယ်၍ သင်သည် ဒေသန္တရပဌည်နယ်ကို မကျင့်သုံသပါက၊ KTable သည် စုစည်သမဟုနဟင့် စုစည်သမဟုရလဒ်အာသလုံသကို ထပ်ဆင့်ပေသပို့မည်ဖဌစ်သည်။

ထို့နောက်၊ အချိန်အတိုင်သအတာတစ်ခုအတလင်သ စုစည်သမဟုကဲ့သို့သော လုပ်ဆောင်မဟုမျာသကို ကျလန်ုပ်တို့ကဌည့်ရဟုပါမည် - ဝင်သဒိုသတင်ခဌင်သဆိုင်ရာ လုပ်ဆောင်ချက်မျာသဟုခေါ်သည်။

၅.၃.၂။ ပဌတင်သပေါက်လုပ်ငန်သမျာသ

ယခင်အပိုင်သတလင်၊ ကျလန်ုပ်တို့သည် sliding convolution နဟင့် aggregation ကိုမိတ်ဆက်ပေသခဲ့သည်။ အပလီကေသရဟင်သသည် စတော့အရောင်သအ၀ယ်ပမာဏကို စဉ်ဆက်မပဌတ် စုစည်သကာ လဲလဟယ်မဟုတလင် အရောင်သအ၀ယ်အမျာသဆုံသ စတော့ငါသခုကို ပေါင်သစည်သလိုက်ပါသည်။

တခါတရံတလင် ထိုသို့သော အဆက်မပဌတ် စုစည်သမဟုနဟင့် ရလဒ်မျာသကို စုစည်သတင်ပဌရန် လိုအပ်ပါသည်။ တစ်ခါတစ်ရံတလင် သင်သတ်မဟတ်ထာသသော အချိန်အတိုင်သအတာတစ်ခုအတလင်သသာ လုပ်ဆောင်ချက်မျာသကို လုပ်ဆောင်ရန် လိုအပ်သည်။ ဥပမာအာသဖဌင့်၊ ပဌီသခဲ့သော 10 မိနစ်အတလင်သ ကုမ္ပဏီတစ်ခု၏ ရဟယ်ယာမျာသနဟင့် ငလေလဲလဟယ်မဟု မည်မျဟပဌုလုပ်ခဲ့သည်ကို တလက်ချက်ပါ။ သို့မဟုတ် ပဌီသခဲ့သော 15 မိနစ်အတလင်သ ကဌော်ငဌာနဖူသစည်သအသစ်ကို အသုံသပဌုသူ မည်မျဟနဟိပ်ခဲ့သနည်သ။ အပလီကေသရဟင်သတစ်ခုသည် ထိုသို့သောလုပ်ဆောင်ချက်မျာသကို အကဌိမ်ပေါင်သမျာသစလာ လုပ်ဆောင်နိုင်သော်လည်သ သတ်မဟတ်ထာသသော အချိန်ကာလမျာသ (အချိန်ပဌတင်သပေါက်မျာသ) နဟင့်သာ သက်ဆိုင်သည့် ရလဒ်မျာသဖဌင့် လုပ်ဆောင်နိုင်သည်။

ဝယ်သူမဟ ငလေလဲခဌင်သမျာသကို ရေတလက်ခဌင်သ။

နောက်ဥပမာတလင်၊ ကဌီသမာသသောအဖလဲ့အစည်သမျာသ သို့မဟုတ် စမတ်တစ်ဉီသချင်သစီမဟ ငလေကဌေသထောက်ပံ့သူမျာသဖဌစ်သည့် ကုန်သည်အမျာသအပဌာသရဟိ စတော့အရောင်သအဝယ်မျာသကို ခဌေရာခံပါမည်။

ကခဌေရာခံခဌင်သအတလက် ဖဌစ်နိုင်ချေ အကဌောင်သရင်သ နဟစ်ခုရဟိသည်။ အဲဒီထဲက တစ်ခုက စျေသကလက်ခေါင်သဆောင်တလေက ဘာဝယ်/ရောင်သလဲဆိုတာ သိဖို့လိုတယ်။ ဒီကစာသသမာသကဌီသတလေနဲ့ ခေတ်မီဆန်သပဌာသတဲ့ ရင်သနဟီသမဌဟုပ်နဟံသူတလေက အခလင့်အလမ်သကို မဌင်တယ်ဆိုရင် သူတို့ရဲ့ နည်သဗျူဟာကို လိုက်နာဖို့ အဓိပ္ပာယ်ရဟိပါတယ်။ ဒုတိယအကဌောင်သရင်သမဟာ တရာသမဝင်အတလင်သလူကုန်သလယ်ခဌင်သ၏ ဖဌစ်နိုင်ခဌေရဟိသော အရိပ်အယောင်မျာသကို ဖော်ထုတ်ရန် ဆန္ဒဖဌစ်သည်။ ဒီလိုလုပ်ဖို့၊ အရေသကဌီသတဲ့သတင်သထုတ်ပဌန်ချက်တလေနဲ့ ကဌီသမာသတဲ့အရောင်သမဌဟင့်တင်မဟုတလေရဲ့ ဆက်စပ်မဟုကို ခလဲခဌမ်သစိတ်ဖဌာဖို့ လိုပါတယ်။

ထိုသို့သောခဌေရာခံခဌင်သတလင် အောက်ပါအဆင့်မျာသ ပါဝင်သည်-

  • စတော့-ငလေလလဟဲခဌင်သ ခေါင်သစဉ်မဟ ဖတ်ရဟုရန် stream တစ်ခု ဖန်တီသခဌင်သ၊
  • ဝယ်သူ ID နဟင့် စတော့သင်္ကေတဖဌင့် အဝင်မဟတ်တမ်သမျာသကို အုပ်စုဖလဲ့ခဌင်သ။ groupBy method ကိုခေါ်ဆိုခဌင်သသည် KGroupedStream အတန်သ၏ သာဓကတစ်ခုကို ပဌန်ပေသသည်။
  • KGroupedStream.windowedBy method သည် windowed ပေါင်သစည်သမဟုကို ခလင့်ပဌုသည့် အချိန်ဝင်သဒိုသတစ်ခုအတလက် ကန့်သတ်ထာသသော ဒေတာစီသကဌောင်သကို ပဌန်ပေသသည်။ ဝင်သဒိုသအမျိုသအစာသပေါ် မူတည်၍ TimeWindowedKStream သို့မဟုတ် SessionWindowedKStream ကို ပဌန်ပေသသည်။
  • ပေါင်သစည်သလည်ပတ်မဟုအတလက် ငလေပေသငလေယူ အရေအတလက်။ Windowed data flow သည် ကရေတလက်မဟုတလင် သီသခဌာသမဟတ်တမ်သတစ်ခုကို ထည့်သလင်သထာသခဌင်သရဟိမရဟိ ဆုံသဖဌတ်ပေသသည်။
  • ရလဒ်မျာသကို အကဌောင်သအရာတစ်ခုသို့ ရေသသာသခဌင်သ သို့မဟုတ် ဖလံ့ဖဌိုသတိုသတက်မဟုကာလအတလင်သ ၎င်သတို့ကို ကလန်ဆိုသလ်သို့ ထုတ်ပေသခဌင်သ။

ကအပလီကေသရဟင်သ၏ topology သည် ရိုသရဟင်သသော်လည်သ ရဟင်သလင်သသောပုံသည် အထောက်အကူဖဌစ်လိမ့်မည်။ Fig ကိုကဌည့်ရအောင်။ ၅.၁၁။

ထို့နောက်၊ ကျလန်ုပ်တို့သည် ဝင်သဒိုသလုပ်ဆောင်မဟုမျာသ၏ လုပ်ဆောင်နိုင်စလမ်သနဟင့် သက်ဆိုင်သောကုဒ်ကို ကဌည့်ပါမည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"

Window အမျိုသအစာသမျာသ

Kafka Stream တလင် ပဌတင်သပေါက်သုံသမျိုသရဟိသည်။

  • sessional;
  • “ပဌုတ်ကျခဌင်သ” (tumbling);
  • လျဟော/ခုန်။

ဘယ်ဟာကို ရလေသချယ်မလဲ ဆိုတာကတော့ သင့်လုပ်ငန်သရဲ့ လိုအပ်ချက်တလေပေါ်မဟာ မူတည်ပါတယ်။ ခုန်ချခဌင်သနဟင့် ခုန်ခဌင်သပဌတင်သပေါက်မျာသကို အချိန်ကန့်သတ်ထာသသော်လည်သ၊ စက်ရဟင်ဝင်သဒိုသမျာသကို အသုံသပဌုသူလုပ်ဆောင်ချက်ဖဌင့် ကန့်သတ်ထာသသော်လည်သ၊ စက်ရဟင်(မျာသ) ၏ကဌာချိန်ကို အသုံသပဌုသူမည်မျဟတက်ကဌလစလာဖဌင့်သာ ဆုံသဖဌတ်သည်။ အဓိကမဟတ်သာသထာသရမည့်အချက်မဟာ ဝင်သဒိုသအမျိုသအစာသအာသလုံသသည် စနစ်အချိန်မဟုတ်ဘဲ ထည့်သလင်သမဟုမျာသ၏ ရက်စလဲ/အချိန်တံဆိပ်ခေါင်သမျာသပေါ်တလင် အခဌေခံထာသသည်။

ထို့နောက်၊ ကျလန်ုပ်တို့သည် window အမျိုသအစာသတစ်ခုစီဖဌင့် ကျလန်ုပ်တို့၏ topology ကိုအကောင်အထည်ဖော်သည်။ ကုဒ်အပဌည့်အစုံကို ပထမဥပမာတလင်သာ ပေသလိမ့်မည်၊ အခဌာသ windows အမျိုသအစာသမျာသအတလက် window operation အမျိုသအစာသမဟလလဲ၍ ဘာမဟမပဌောင်သလဲပါ။

စက်ရဟင် ပဌတင်သပေါက်မျာသ

Session windows သည် အခဌာသသော windows အမျိုသအစာသအာသလုံသနဟင့် အလလန်ကလာခဌာသပါသည်။ ၎င်သတို့သည် အသုံသပဌုသူ၏ လုပ်ဆောင်ချက် (သို့မဟုတ် သင်ခဌေရာခံလိုသည့် အဖလဲ့အစည်သ၏ လုပ်ဆောင်ချက်) ကဌောင့် အချိန်အာသဖဌင့် အကန့်အသတ်မရဟိပေ။ ဆက်ရဟင်ပဌတင်သပေါက်မျာသကို လဟုပ်ရဟာသမဟုမရဟိသောကာလမျာသဖဌင့် ကန့်သတ်ထာသသည်။

ပုံ 5.12 သည် session windows ၏ သဘောတရာသကို သရုပ်ဖော်သည်။ သေသငယ်သော စက်ရဟင်သည် ၎င်သ၏ဘယ်ဘက်ရဟိ စက်ရဟင်နဟင့် ပေါင်သစည်သမည်ဖဌစ်သည်။ ညာဘက်ရဟိ ဆက်ရဟင်သည် အချိန်အကဌာကဌီသ လဟုပ်ရဟာသမဟုမရဟိသောကဌောင့် ၎င်သသည် သီသခဌာသဖဌစ်သည်။ စက်ရဟင်ဝင်သဒိုသမျာသသည် အသုံသပဌုသူလုပ်ဆောင်ချက်အပေါ် အခဌေခံသော်လည်သ မည်သည့်စက်ရဟင်ပါဝင်သည်ကို ဆုံသဖဌတ်ရန် ထည့်သလင်သမဟုမျာသမဟ ရက်စလဲ/အချိန်တံဆိပ်ခေါင်သမျာသကို အသုံသပဌုပါ။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"

စတော့ရဟယ်ယာအရောင်သအ၀ယ်မျာသကိုခဌေရာခံရန် session windows ကိုအသုံသပဌုခဌင်သ။

ငလေလဲလဟယ်မဟုဆိုင်ရာ အချက်အလက်မျာသကို ဖမ်သယူရန် စက်ရဟင်ဝင်သဒိုသမျာသကို အသုံသပဌုကဌပါစို့။ session windows ၏အကောင်အထည်ဖော်မဟုကို Listing 5.5 တလင်ပဌသထာသသည် (၎င်သကို src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java တလင်တလေ့နိုင်သည်)။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
က topology တလင် လုပ်ဆောင်မဟုအမျာသစုကို သင်မဌင်ပဌီသပဌီဖဌစ်သောကဌောင့် ၎င်သတို့ကို ကနေရာတလင် ထပ်မံကဌည့်ရဟုရန် မလိုအပ်ပါ။ ဒါပေမယ့် အခု ဆလေသနလေသမယ့် အကဌောင်သအရာသစ်တလေလည်သ ဒီမဟာ ရဟိပါတယ်။

GroupBy လုပ်ဆောင်မဟုတိုင်သသည် ပုံမဟန်အာသဖဌင့် စုစည်သမဟု လုပ်ဆောင်ချက် (ပေါင်သစည်သခဌင်သ၊ စုစည်သခဌင်သ သို့မဟုတ် ရေတလက်ခဌင်သ) အချို့ကို လုပ်ဆောင်ပါသည်။ သတ်မဟတ်ထာသသောအချိန်ဝင်သဒိုသအတလင်သ အကောင့်မဟတ်တမ်သမျာသထည့်သလင်သပေသသော လည်ပတ်နေသောစုစုပေါင်သ သို့မဟုတ် စုစည်သမဟုပေါင်သစည်သခဌင်သကို သင်လုပ်ဆောင်နိုင်သည်။

Listing 5.5 ရဟိ ကုဒ်သည် session windows အတလင်သ ငလေပေသငလေယူ အရေအတလက်ကို ရေတလက်သည်။ ပုံတလင်။ 5.13 ကလုပ်ဆောင်ချက်မျာသကို အဆင့်ဆင့်ခလဲခဌမ်သစိတ်ဖဌာပါသည်။

WindowedBy(SessionWindows.with(twentySeconds).(fifteen Minutes))အထိ ခေါ်ဆိုခဌင်သဖဌင့် ကျလန်ုပ်တို့သည် လဟုပ်ရဟာသမဟုမရဟိသောကာလ စက္ကန့် 20 နဟင့် ဆက်တိုက်ကဌာသကာလ 15 မိနစ်ဖဌင့် စက်ရဟင်ဝင်သဒိုသတစ်ခုကို ဖန်တီသပါသည်။ စက္ကန့် 20 ၏ idle interval ဆိုသည်မဟာ အပလီကေသရဟင်သတလင် ပဌီသဆုံသချိန်မဟ 20 စက္ကန့်အတလင်သ သို့မဟုတ် လက်ရဟိ session စတင်သည့် လက်ရဟိ (active) session သို့ ရောက်ရဟိသည့် မည်သည့် entry ကိုမဆို ပါ၀င်မည်ဖဌစ်သည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
ထို့နောက်၊ ကျလန်ုပ်တို့သည် စက်ရဟင်ဝင်သဒိုသတလင် မည်သည့်စုပေါင်သဆောင်ရလက်မဟုကို လုပ်ဆောင်ရန် လိုအပ်ကဌောင်သ သတ်မဟတ်ပါ - ကကိစ္စတလင်၊ ရေတလက်ပါ။ အဝင်အထလက်တစ်ခုသည် လဟုပ်ရဟာသမဟုမရဟိသောဝင်သဒိုသ၏ အပဌင်ဘက်တလင် ကျရောက်ပါက (ရက်စလဲ/အချိန်တံဆိပ်တုံသ၏ တစ်ဖက်တစ်ချက်စီ)၊ အပလီကေသရဟင်သသည် စက်ရဟင်အသစ်တစ်ခု ဖန်တီသပေသပါသည်။ Retention interval ဆိုသည်မဟာ session တစ်ခုအာသ အချိန်အတိုင်သအတာတစ်ခုအထိ ထိန်သသိမ်သထာသပဌီသ session ၏လဟုပ်ရဟာသမဟုမရဟိသောကာလထက် ကျော်လလန်သလာသသော နောက်ကျဒေတာကို ခလင့်ပဌုပေသသော်လည်သ ဆက်လက်ချိတ်ဆက်နိုင်ပါသည်။ ထို့အပဌင်၊ ပေါင်သစည်သခဌင်သမဟ ထလက်ပေါ်လာသော စက်ရဟင်အသစ်၏ အစနဟင့် အဆုံသသည် အစောဆုံသနဟင့် နောက်ဆုံသပေါ်ရက်စလဲ/အချိန်တံဆိပ်တုံသနဟင့် ကိုက်ညီပါသည်။

စက်ရဟင်မျာသ မည်သို့အလုပ်လုပ်သည်ကို ကဌည့်ရဟုရန် ရေတလက်နည်သမဟ ထည့်သလင်သမဟုအချို့ကို ကဌည့်ကဌပါစို့ (ဇယာသ 5.1)။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
မဟတ်တမ်သမျာသ ရောက်ရဟိလာသောအခါ၊ တူညီသောသော့ဖဌင့် ရဟိပဌီသသာသ စက်ရဟင်မျာသ၊ လက်ရဟိ ရက်စလဲ/အချိန် တံဆိပ်ခေါင်သထက် နည်သသော ကုန်ဆုံသချိန် - လဟုပ်ရဟာသမဟုမရဟိသော ကဌာသကာလနဟင့် လက်ရဟိ ရက်စလဲ/အချိန်တံဆိပ်တုံသ + လဟုပ်ရဟာသမဟုမရဟိသော ကဌာသကာလထက် ပိုကဌီသသော စတင်ချိန်ကို ရဟာဖလေပါသည်။ ဒါကို ထည့်သလင်သစဉ်သစာသရင် ဇယာသထဲက အချက်လေသချက်ပါ။ 5.1 ကို အောက်ပါအတိုင်သ session တစ်ခုထဲသို့ ပေါင်သစပ်ထာသသည်။

1. Record 1 သည် ပထမဆုံသရောက်ရဟိသောကဌောင့် စတင်ချိန်သည် ပဌီသဆုံသချိန်နဟင့် 00:00:00 ဖဌစ်သည်။

2. ထို့နောက်၊ ဝင်ခလင့် 2 ရောက်ရဟိလာပဌီသ 23:59:55 ထက်မစောဘဲ ပဌီသဆုံသမည့် 00:00:35 ထက် နောက်မကျစေဘဲ စတင်ရန် ကျလန်ုပ်တို့ ရဟာဖလေပါသည်။ ကျလန်ုပ်တို့သည် မဟတ်တမ်သ 1 ကို ရဟာတလေ့ပဌီသ အပိုင်သ 1 နဟင့် 2 ကို ပေါင်သစပ်ထာသသည်။ ကျလန်ုပ်တို့သည် session 1 ၏ စတင်ချိန် (အစောပိုင်သ) နဟင့် session 2 (နောက်ပိုင်သ) ၏ ပဌီသဆုံသချိန်ကို ယူသည်၊ ထို့ကဌောင့် ကျလန်ုပ်တို့၏ session အသစ်သည် 00:00:00 တလင် စတင်ပဌီသ 00 တလင် ပဌီသဆုံသသည်- 00:15 ။

3. Record 3 ရောက်ပဌီ၊ ကျလန်ုပ်တို့သည် 00:00:30 နဟင့် 00:01:10 ကဌာသရဟိ sessions မျာသကိုရဟာဖလေပဌီသ မည်သည့်အရာကိုမျဟ ရဟာမတလေ့ပါ။ 123-345-654၊FFBE၊ 00:00:50 တလင် စတင်ပဌီသ အဆုံသသတ်မည့် သော့အတလက် ဒုတိယ session တစ်ခုကို ထည့်ပါ။

4. မဟတ်တမ်သ 4 ရောက်ရဟိလာပဌီသ 23:59:45 နဟင့် 00:00:25 ကဌာသတလင် အစည်သအဝေသမျာသကို ရဟာဖလေနေပါသည်။ ကအကဌိမ်တလင် session 1 နဟင့် 2 နဟစ်ခုစလုံသကို တလေ့ရဟိပါသည်။ session သုံသခုလုံသကို စတင်ချိန် 00:00:00 နဟင့် ပဌီသဆုံသချိန် 00:00:15 တို့ဖဌင့် တစ်ခုတည်သအဖဌစ် ပေါင်သစပ်ထာသသည်။

ကကဏ္ဍတလင်ဖော်ပဌထာသသည့်အရာမဟ၊ အောက်ပါအရေသကဌီသသော nuances မျာသကို မဟတ်သာသထာသသင့်ပါသည်။

  • session မျာသသည် ပုံသေအရလယ်အစာသ windows မဟုတ်ပါ။ စက်ရဟင်တစ်ခု၏ကဌာချိန်ကို ပေသထာသသည့်အချိန်တစ်ခုအတလင်သ လုပ်ဆောင်မဟုမဟ ဆုံသဖဌတ်သည်။
  • ဒေတာရဟိ ရက်စလဲ/အချိန်တံဆိပ်တုံသမျာသသည် အဖဌစ်အပျက်သည် လက်ရဟိစက်ရဟင်တစ်ခုအတလင်သ သို့မဟုတ် အလုပ်မလုပ်သောကာလတစ်ခုအတလင်သ အဖဌစ်အပျက်ကို ဆုံသဖဌတ်သည်။

ထို့နောက် ကျလန်ုပ်တို့သည် နောက်ထပ် window အမျိုသအစာသ - "tumbling" windows ကို ဆလေသနလေသပါမည်။

"ကဌလေ" ပဌတင်သပေါက်မျာသ

ကဌလေကျနေသော ပဌတင်သပေါက်မျာသသည် အချိန်ကာလတစ်ခုအတလင်သ ကျရောက်နေသော အဖဌစ်အပျက်မျာသကို ဖမ်သယူသည်။ စက္ကန့် 20 တိုင်သ ကုမ္ပဏီတစ်ခု၏ စတော့အရောင်သအဝယ်မျာသအာသလုံသကို ဖမ်သယူရန် လိုအပ်သည်ဟု စိတ်ကူသကဌည့်ပါ၊ ထို့ကဌောင့် ထိုအချိန်ကာလအတလင်သ အဖဌစ်အပျက်အာသလုံသကို စုဆောင်သပါ။ စက္ကန့် 20 ကဌာသကာလ၏အဆုံသတလင်၊ ပဌတင်သပေါက်သည် ရလေ့လျာသပဌီသ စက္ကန့် 20-ကဌာသကာလအသစ်သို့ ရလေ့လျာသသည်။ ပုံ 5.14 သည် ကအခဌေအနေကို ဖော်ပဌသည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
သင်တလေ့မဌင်ရသည့်အတိုင်သ ပဌီသခဲ့သော 20 စက္ကန့်အတလင်သ ရရဟိခဲ့သော အဖဌစ်အပျက်မျာသအာသလုံသကို window တလင် ထည့်သလင်သထာသပါသည်။ ကအချိန်ကာလ၏အဆုံသတလင်၊ ဝင်သဒိုသအသစ်တစ်ခုဖန်တီသသည်။

စာရင်သပဌုစုခဌင်သ 5.6 သည် စက္ကန့် 20 တိုင်သ စတော့ရဟယ်ယာအရောင်သအ၀ယ်မျာသကိုဖမ်သယူရန် တုန်ခါနေသောပဌတင်သပေါက်မျာသအသုံသပဌုမဟုကို သရုပ်ပဌသည့်ကုဒ်ကိုပဌသသည် (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) တလင်တလေ့ရဟိရသည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
TimeWindows.of နည်သလမ်သခေါ်ဆိုမဟုသို့ ကသေသငယ်သောပဌောင်သလဲမဟုဖဌင့်၊ သင်သည် ဖရိုဖရဲဝင်သဒိုသကို အသုံသပဌုနိုင်သည်။ ကဥပမာသည် Until() နည်သလမ်သကို မခေါ်ဆိုပါ၊ ထို့ကဌောင့် 24 နာရီ၏ ပုံသေထိန်သသိမ်သမဟုကဌာသကာလကို အသုံသပဌုပါမည်။

နောက်ဆုံသအနေနဲ့၊ ဝင်သဒိုသရလေသချယ်စရာမျာသရဲ့ နောက်ဆုံသဖဌစ်တဲ့ "ခုန်ခဌင်သ" ဝင်သဒိုသတလေကို ရလဟေ့ဖို့ အချိန်ရောက်ပါပဌီ။

လျဟော ("ခုန်") ပဌတင်သပေါက်

လျဟော/ခုန်ပဌတင်သပေါက်မျာသသည် ကဌလေကျနေသောပဌတင်သပေါက်မျာသနဟင့် ဆင်တူသော်လည်သ အနည်သငယ်ကလာခဌာသချက်ရဟိသည်။ လျဟောပဌတင်သပေါက်မျာသသည် လတ်တလောဖဌစ်ရပ်မျာသကို စီမံဆောင်ရလက်ရန် ဝင်သဒိုသအသစ်မဖန်တီသမီ အချိန်ကဌာသကာလ ကုန်ဆုံသသည်အထိ မစောင့်ပါ။ ပဌတင်သပေါက်ကဌာချိန်ထက်နည်သသော စောင့်ဆိုင်သကာလတစ်ခုပဌီသနောက် ၎င်သတို့သည် တလက်ချက်မဟုအသစ်မျာသကို စတင်သည်။

ပဌတင်သပေါက်မျာသ ပဌိုကျခဌင်သနဟင့် ခုန်ခဌင်သကဌာသ ခဌာသနာသချက်မျာသကို သရုပ်ဖော်ရန်၊ စတော့အိတ်ချိန်သ အရောင်သအဝယ်မျာသကို ရေတလက်ခဌင်သ၏ ဥပမာကို ပဌန်ကဌည့်ကဌပါစို့။ ကျလန်ုပ်တို့၏ပန်သတိုင်မဟာ ငလေပေသငလေယူအရေအတလက်ကို ရေတလက်ရန်ဖဌစ်နေဆဲဖဌစ်သော်လည်သ ကောင်တာမလမ်သမံခဌင်သမပဌုမီ အချိန်အတိုင်သအတာတစ်ခုလုံသကို မစောင့်ချင်ပါ။ ယင်သအစာသ၊ ကျလန်ုပ်တို့သည် ပိုတိုသောကာလမျာသတလင် ကောင်တာကို အပ်ဒိတ်လုပ်ပါမည်။ ဥပမာအာသဖဌင့်၊ ပုံတလင်ပဌထာသသည့်အတိုင်သ စက္ကန့် 20 တိုင်သ ငလေပေသငလေယူအရေအတလက်ကို ကျလန်ုပ်တို့ ရေတလက်နေဆဲဖဌစ်သော်လည်သ ပုံတလင်ပဌထာသသည့်အတိုင်သ 5 စက္ကန့်တိုင်သ ကောင်တာကို အပ်ဒိတ်လုပ်ပါ။ ၅.၁၅။ ကကိစ္စတလင်၊ ကျလန်ုပ်တို့သည် ထပ်နေသောဒေတာမျာသဖဌင့် ရလဒ် window သုံသခုကို အဆုံသသတ်ထာသသည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
စာရင်သပဌုစုခဌင်သ 5.7 သည် လျဟောပဌတင်သပေါက်မျာသကို သတ်မဟတ်ခဌင်သအတလက် ကုဒ်ကို ပဌသည် (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) တလင် တလေ့ရသည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
advanceBy() နည်သလမ်သသို့ ခေါ်ဆိုမဟုတစ်ခုကို ပေါင်သထည့်ခဌင်သဖဌင့် ပဌုတ်ကျနေသောဝင်သဒိုသကို ခုန်ပေါက်နေသည့်ဝင်သဒိုသအဖဌစ်သို့ ပဌောင်သလဲနိုင်သည်။ ပဌထာသသည့်ဥပမာတလင်၊ သိမ်သဆည်သချိန်သည် 15 မိနစ်ဖဌစ်သည်။

ပေါင်သစည်သမဟုရလဒ်မျာသကို အချိန်ပဌတင်သပေါက်မျာသတလင် မည်သို့ကန့်သတ်ရမည်ကို ကကဏ္ဍတလင် သင်တလေ့ခဲ့ရသည်။ အထူသသဖဌင့်၊ ကကဏ္ဍမဟ အောက်ပါအချက်သုံသချက်ကို မဟတ်သာသထာသစေလိုပါသည်။

  • session windows ၏အရလယ်အစာသကို အချိန်ကာလအာသဖဌင့်မကန့်သတ်ဘဲ၊ အသုံသပဌုသူ၏လုပ်ဆောင်ချက်အာသဖဌင့်၊
  • “ပဌုတ်ကျခဌင်သ” ပဌတင်သပေါက်မျာသသည် သတ်မဟတ်အချိန်အတလင်သ အဖဌစ်အပျက်မျာသ၏ ခဌုံငုံသုံသသပ်ချက်ကို ပေသစလမ်သသည်။
  • ပဌတင်သပေါက်မျာသခုန်ခဌင်သ၏ကဌာချိန်ကို သတ်မဟတ်ထာသသော်လည်သ ၎င်သတို့ကို မကဌာခဏ အပ်ဒိတ်လုပ်ထာသပဌီသ ပဌတင်သပေါက်အာသလုံသတလင် ထပ်နေသည့်အရာမျာသ ပါဝင်နိုင်ပါသည်။

ထို့နောက်၊ ချိတ်ဆက်မဟုအတလက် KTable ကို KStream သို့ ပဌန်ပဌောင်သနည်သကို လေ့လာပါမည်။

၅.၃.၃။ KStream နဟင့် KTable အရာဝတ္ထုမျာသကို ချိတ်ဆက်ခဌင်သ။

အခန်သ 4 တလင်၊ KStream အရာဝတ္ထုနဟစ်ခုကို ချိတ်ဆက်ရန် ဆလေသနလေသထာသသည်။ ယခု ကျလန်ုပ်တို့သည် KTable နဟင့် KStream ကို မည်သို့ချိတ်ဆက်ရမည်ကို လေ့လာရမည်ဖဌစ်ပါသည်။ အောက်ပါ ရိုသရဟင်သသော အကဌောင်သပဌချက်ကဌောင့် ၎င်သကို လိုအပ်နိုင်ပါသည်။ KStream သည် မဟတ်တမ်သစီသကဌောင်သတစ်ခုဖဌစ်ပဌီသ KTable သည် မဟတ်တမ်သအပ်ဒိတ်စီသကဌောင်သတစ်ခုဖဌစ်သော်လည်သ၊ တစ်ခါတစ်ရံတလင် သင်သည် KTable မဟအပ်ဒိတ်မျာသကို အသုံသပဌု၍ မဟတ်တမ်သစီသကဌောင်သသို့ နောက်ထပ်အကဌောင်သအရာကို ထပ်လောင်သထည့်လိုပေမည်။

စတော့အိတ်ချိန်သ အရောင်သအ၀ယ်ပဌုလုပ်မဟု အရေအတလက်ဆိုင်ရာ အချက်အလက်ကို ရယူပဌီသ သက်ဆိုင်ရာ လုပ်ငန်သမျာသအတလက် စတော့အိတ်ချိန်သသတင်သမျာသနဟင့် ပေါင်သစပ်လိုက်ကဌပါစို့။ ကသည်မဟာ သင့်တလင်ရဟိပဌီသသာသကုဒ်ကို ရရဟိရန် သင်လုပ်ဆောင်ရမည့်အရာဖဌစ်သည်။

  1. KTable အရာဝတ္ထုတစ်ခုကို စတော့ရဟယ်ယာအရောင်သအ၀ယ်ပဌုလုပ်မဟုအရေအတလက်ဆိုင်ရာဒေတာဖဌင့် KStream သို့ပဌောင်သလဲပဌီသနောက် ကစတော့ရဟယ်ယာသင်္ကေတနဟင့်သက်ဆိုင်သည့်စက်မဟုကဏ္ဍကဏ္ဍကိုညလဟန်ပဌသောသော့ဖဌင့်အစာသထိုသခဌင်သဖဌင့်သော့ကိုအစာသထိုသပါ။
  2. စတော့အိတ်ချိန်သသတင်သပါသော အကဌောင်သအရာတစ်ခုမဟ ဒေတာကိုဖတ်သည့် KTable အရာတစ်ခုကို ဖန်တီသပါ။ က KTable အသစ်ကို စက်မဟုကဏ္ဍအလိုက် အမျိုသအစာသခလဲမည်ဖဌစ်သည်။
  3. လုပ်ငန်သကဏ္ဍအလိုက် စတော့အိတ်ချိန်သ အရောင်သအ၀ယ်ပဌုလုပ်မဟု အရေအတလက်ဆိုင်ရာ အချက်အလက်မျာသနဟင့် သတင်သအပ်ဒိတ်မျာသကို ချိတ်ဆက်ပါ။

အခု ဒီလုပ်ဆောင်ချက်အစီအစဥ်ကို ဘယ်လိုအကောင်အထည်ဖော်မလဲ ကဌည့်ရအောင်။

KTable ကို KStream သို့ ပဌောင်သပါ။

KTable ကို KStream သို့ ပဌောင်သရန် အောက်ပါတို့ကို လုပ်ဆောင်ရန် လိုအပ်ပါသည်။

  1. KTable.toStream() နည်သလမ်သကို ခေါ်ပါ။
  2. KStream.map နည်သလမ်သကို ခေါ်ဆိုခဌင်သဖဌင့်၊ လုပ်ငန်သအမည်ဖဌင့် သော့ကို အစာသထိုသပဌီသ Windowed instance မဟ TransactionSummary အရာဝတ္တုကို ပဌန်လည်ရယူပါ။

ကလုပ်ဆောင်ချက်မျာသကို အောက်ပါအတိုင်သ အတူတကလ ချိတ်ဆက်လုပ်ဆောင်ပါမည် (ကုဒ်ကို ဖိုင် src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8) တလင် တလေ့ရဟိနိုင်ပါသည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
ကျလန်ုပ်တို့သည် KStream.map လုပ်ဆောင်ချက်ကို လုပ်ဆောင်နေသောကဌောင့်၊ ချိတ်ဆက်မဟုတစ်ခုတလင် အသုံသပဌုသောအခါတလင် ပဌန်ပေသထာသသော KStream နမူနာကို အလိုအလျောက် ပဌန်လည်ပိုင်သဖဌတ်ထာသသည်။

ကျလန်ုပ်တို့သည် ပဌောင်သလဲခဌင်သလုပ်ငန်သစဉ်ကို ပဌီသမဌောက်ခဲ့ပဌီသ၊ နောက်တလင် စတော့ခ်သတင်သဖတ်ရဟုရန်အတလက် KTable အရာဝတ္ထုတစ်ခုကို ဖန်တီသရန် လိုအပ်ပါသည်။

စတော့သတင်သမျာသအတလက် KTable ကိုဖန်တီသခဌင်သ။

ကံကောင်သထောက်မစလာ၊ KTable အရာဝတ္ထုတစ်ခုဖန်တီသခဌင်သသည် ကုဒ်တစ်ကဌောင်သသာယူသည် (ကုဒ်ကို src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9) တလင် တလေ့နိုင်ပါသည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
ဆက်တင်မျာသတလင် Serdes ကိုအသုံသပဌုထာသသောကဌောင့် Serde အရာဝတ္ထုမျာသကို သတ်မဟတ်ရန်မလိုအပ်ကဌောင်သ သတိပဌုသင့်သည်။ ထို့အပဌင်၊ အစောဆုံသ စာရင်သကောက်ခဌင်သကို အသုံသပဌုခဌင်သဖဌင့်၊ ဇယာသသည် အစပိုင်သတလင် မဟတ်တမ်သမျာသဖဌင့် ပဌည့်နေပါသည်။

ယခုကျလန်ုပ်တို့သည်နောက်ဆုံသအဆင့် - ချိတ်ဆက်မဟုသို့ဆက်သလာသနိုင်သည်။

ငလေပေသငလေယူ အရေအတလက်ဒေတာဖဌင့် သတင်သအပ်ဒိတ်မျာသကို ချိတ်ဆက်ခဌင်သ။

ချိတ်ဆက်မဟုတစ်ခုဖန်တီသရန် မခက်ခဲပါ။ သက်ဆိုင်ရာစက်မဟုလုပ်ငန်သအတလက် စတော့ခ်သတင်သမျာသမရဟိသောအခါတလင် ဘယ်ဘက် join ကိုအသုံသပဌုပါမည် (လိုအပ်သောကုဒ်ကို src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10) တလင်တလေ့နိုင်ပါသည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
က leftJoin အော်ပရေတာသည် အတော်လေသရိုသရဟင်သပါသည်။ Chapter 4 ရဟိ Join မျာသနဟင့်မတူဘဲ KStream-KTable Join ကိုလုပ်ဆောင်သောအခါတလင် KTable တလင်ထည့်သလင်သမဟုတစ်ခုသာရဟိသောကဌောင့် JoinWindow နည်သလမ်သကို အသုံသမပဌုပါ။ ထိုသို့သောချိတ်ဆက်မဟုသည် အချိန်အကန့်အသတ်မရဟိပါ- မဟတ်တမ်သသည် KTable တလင်ဖဌစ်စေ သို့မဟုတ် ပျက်ကလက်နေပါသည်။ အဓိက နိဂုံသချုပ်ချက်- KTable အရာဝတ္ထုမျာသကို အသုံသပဌု၍ မကဌာခဏ မလမ်သမံပဌင်ဆင်ထာသသော ကိုသကာသမဟုဒေတာဖဌင့် KStream ကို ကဌလယ်ဝစေနိုင်သည်။

ယခု ကျလန်ုပ်တို့သည် KStream မဟ အစီအစဉ်မျာသကို မဌဟင့်တင်ရန် ပိုမိုထိရောက်သောနည်သလမ်သကို ကဌည့်ပါမည်။

၅.၃.၄။ GlobalKTable အရာဝတ္ထုမျာသ

သင်တလေ့မဌင်ရသည့်အတိုင်သ၊ အစီအစဉ်စီသကဌောင်သမျာသကို မဌဟင့်တင်ရန် သို့မဟုတ် ၎င်သတို့တလင် အကဌောင်သအရာထည့်ရန် လိုအပ်ပါသည်။ အခန်သ 4 တလင် KStream အရာဝတ္ထုနဟစ်ခုကဌာသ ချိတ်ဆက်မဟုမျာသကို သင်မဌင်ရပဌီသ ယခင်အပိုင်သတလင် KStream နဟင့် KTable တစ်ခုကဌာသ ချိတ်ဆက်မဟုကို သင်တလေ့မဌင်ခဲ့ရသည်။ ကကိစ္စရပ်အာသလုံသတလင်၊ သော့မျာသကို အမျိုသအစာသ သို့မဟုတ် တန်ဖိုသအသစ်တစ်ခုသို့ ပုံဖော်သည့်အခါ ဒေတာစီသကဌောင်သကို ပဌန်လည်ပိုင်သခဌာသရန် လိုအပ်ပါသည်။ တစ်ခါတစ်ရံတလင် ပဌန်လည်ခလဲဝေခဌင်သကို အတိအလင်သလုပ်ဆောင်ပဌီသ တစ်ခါတစ်ရံ Kafka Streams သည် ၎င်သကို အလိုအလျောက်လုပ်ဆောင်သည်။ သော့မျာသပဌောင်သလဲပဌီသ မဟတ်တမ်သမျာသကို ကဏ္ဍအသစ်မျာသတလင် အဆုံသသတ်ရမည်ဖဌစ်ပဌီသ၊ သို့မဟုတ်ပါက ချိတ်ဆက်မဟုမဖဌစ်နိုင်ပါ (၎င်သကို အခန်သ 4၊ အပိုင်သခလဲ 4.2.4 တလင် အခန်သခလဲ XNUMX တလင် ဆလေသနလေသထာသသည်)။

ပဌန်လည်ခလဲဝေရာတလင် ကုန်ကျစရိတ်ရဟိသည်။

ပဌန်လည်ပိုင်သခဌာသခဌင်သအတလက် ကုန်ကျစရိတ်မျာသ လိုအပ်သည် - အလယ်အလတ်အကဌောင်သအရာမျာသဖန်တီသခဌင်သ၊ အခဌာသအကဌောင်သအရာတစ်ခုတလင် ထပ်နေသောဒေတာကို သိမ်သဆည်သခဌင်သအတလက် အပိုအရင်သအမဌစ်ကုန်ကျစရိတ်၊ ကအကဌောင်သအရာမဟ ရေသသာသခဌင်သနဟင့် စာဖတ်ခဌင်သတို့ကဌောင့် latency တိုသလာခဌင်သကိုလည်သ ဆိုလိုသည်။ ထို့အပဌင်၊ အသလင်အပဌင်တစ်ခု သို့မဟုတ် အတိုင်သအတာတစ်ခုထက်ပို၍ ပူသပေါင်သပါဝင်ရန် လိုအပ်ပါက၊ ချိတ်ဆက်မဟုမျာသကို ချိတ်ဆက်ရန်၊ မဟတ်တမ်သမျာသကို သော့အသစ်မျာသဖဌင့် မဌေပုံဆလဲကာ ပဌန်လည်ခလဲခဌမ်သခဌင်သလုပ်ငန်သစဉ်ကို ထပ်မံလုပ်ဆောင်ရမည်ဖဌစ်သည်။

သေသငယ်သောဒေတာအတလဲမျာသသို့ ချိတ်ဆက်နေသည်။

အချို့သောကိစ္စမျာသတလင်၊ ချိတ်ဆက်ရမည့် ကိုသကာသမဟုဒေတာပမာဏသည် အတော်လေသသေသငယ်သောကဌောင့် ကော်ပီအပဌည့်အစုံသည် node တစ်ခုစီတလင် အလလယ်တကူထည့်သလင်သနိုင်သည်။ ကကဲ့သို့သောအခဌေအနေမျာသအတလက် Kafka Streams သည် GlobalKTable အတန်သကိုထောက်ပံ့ပေသသည်။

အပလီကေသရဟင်သသည် ဒေတာအာသလုံသကို node တစ်ခုစီသို့ ပုံတူပလာသထာသသောကဌောင့် GlobalKTable ဖဌစ်ရပ်မျာသသည် ထူသခဌာသပါသည်။ ဒေတာအာသလုံသသည် node တစ်ခုစီတလင်ရဟိနေသောကဌောင့်၊ ၎င်သကို partitions အာသလုံသတလင်ရရဟိနိုင်စေရန်ရည်ညလဟန်သဒေတာသော့ဖဌင့် event stream ကိုအပိုင်သပိုင်သခလဲရန်မလိုအပ်ပါ။ GlobalKTable အရာဝတ္ထုမျာသကို အသုံသပဌု၍ သော့မဲ့ချိတ်ဆက်မဟုမျာသကိုလည်သ ပဌုလုပ်နိုင်သည်။ ကအင်္ဂါရပ်ကိုပဌသရန် ယခင်နမူနာမျာသထဲမဟ တစ်ခုကို ပဌန်သလာသကဌပါစို့။

KStream အရာမျာသကို GlobalKTable အရာဝတ္ထုမျာသနဟင့် ချိတ်ဆက်ခဌင်သ။

ပုဒ်မခလဲ 5.3.2 တလင်၊ ကျလန်ုပ်တို့သည် ဝယ်သူမျာသမဟ ငလေလဲခဌင်သဆိုင်ရာ ဝင်သဒိုသမျာသ ပေါင်သစည်သခဌင်သကို လုပ်ဆောင်ခဲ့ပါသည်။ ကစုစည်သမဟု၏ရလဒ်မျာသသည် ကကဲ့သို့ ဖဌစ်သည်-

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

ကရလဒ်မျာသသည် ရည်ရလယ်ချက်ကို ဖဌည့်ဆည်သပေသသော်လည်သ ဖောက်သည်၏အမည်နဟင့် ကုမ္ပဏီအမည်အပဌည့်အစုံကိုလည်သ ဖော်ပဌပါက ပိုမိုအသုံသဝင်မည်ဖဌစ်သည်။ ဖောက်သည်အမည်နဟင့် ကုမ္ပဏီအမည်ကို ပေါင်သထည့်ရန်၊ သင်သည် ပုံမဟန်ပါဝင်မဟုမျာသ ပဌုလုပ်နိုင်သော်လည်သ သော့မဌေပုံဆလဲခဌင်သနဟင့် ပဌန်လည်ပိုင်သခဌာသခဌင်သနဟစ်ခုကို ပဌုလုပ်ရန် လိုအပ်မည်ဖဌစ်သည်။ GlobalKTable ဖဌင့် သင်သည် ထိုကဲ့သို့သော လည်ပတ်မဟုကုန်ကျစရိတ်ကို ရဟောင်ရဟာသနိုင်သည်။

ဒါကိုလုပ်ဖို့၊ Listing 5.11 မဟ countStream အရာဝတ္တုကို အသုံသပဌုပါမည် (သက်ဆိုင်ရာကုဒ်ကို src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) နဟင့် GlobalKTable အရာဝတ္ထုနဟစ်ခုသို့ ချိတ်ဆက်ပါ။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
ဒါကို အရင်က ကျလန်တော်တို့ ဆလေသနလေသပဌီသပဌီမို့ ထပ်မပဌောတော့ပါဘူသ။ သို့သော် toStream(.map function) ရဟိ ကုဒ်ကို ဖတ်နိုင်စေရန်အတလက် inline lambda expression အစာသ function object တစ်ခုသို့ abstract လုပ်ထာသသည်ကို ကျလန်ုပ်သတိပဌုမိပါသည်။

နောက်တစ်ဆင့်မဟာ GlobalKTable ၏ ဥပမာနဟစ်ခုကို ကဌေညာရန်ဖဌစ်သည် (ပဌထာသသည့်ကုဒ်ကို ဖိုင် src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (စာရင်သ 5.12) တလင် တလေ့နိုင်ပါသည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"

စာရင်သကောက်အမျိုသအစာသမျာသကို အသုံသပဌု၍ ခေါင်သစဉ်အမည်မျာသကို ဖော်ပဌထာသကဌောင်သ သတိပဌုပါ။

ယခု ကျလန်ုပ်တို့တလင် အစိတ်အပိုင်သအာသလုံသ အဆင်သင့်ဖဌစ်နေပဌီဖဌစ်၍ ကျန်အရာအာသလုံသမဟာ ချိတ်ဆက်မဟုအတလက် ကုဒ်ကိုရေသရန်ဖဌစ်သည် (ဖိုင် src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (စာရင်သ 5.13) တလင် တလေ့နိုင်ပါသည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
ကကုဒ်တလင် ပူသတလဲနဟစ်ခုပါသော်လည်သ၊ ၎င်သတို့၏ရလဒ်မျာသကို သီသခဌာသအသုံသမပဌုသောကဌောင့် ၎င်သတို့ကို သံကဌိုသဖဌင့်ချည်နဟောင်ထာသသည်။ လုပ်ဆောင်ချက်တစ်ခုလုံသ၏အဆုံသတလင်ရလဒ်မျာသကိုပဌသသည်။

အထက်ဖော်ပဌပါ Join လုပ်ဆောင်ချက်ကို သင်လုပ်ဆောင်သောအခါ ကကဲ့သို့သောရလဒ်မျာသကို သင်ရရဟိလိမ့်မည်-

{customer='Barney, Smith' company="Exxon", transactions= 17}

အနဟစ်သာရက မပဌောင်သလဲပေမယ့် ဒီရလဒ်တလေက ပိုရဟင်သပါတယ်။

အခန်သ ၄ သို့ ရေတလက်ပါက၊ ချိတ်ဆက်မဟု အမျိုသအစာသမျာသစလာကို သင်တလေ့မဌင်ပဌီသဖဌစ်သည်။ ၎င်သတို့ကိုဇယာသတလင်ဖော်ပဌထာသသည်။ ၅.၂။ ကဇယာသသည် Kafka Stream ဗာသရဟင်သ 4 ၏ ချိတ်ဆက်နိုင်မဟုစလမ်သရည်မျာသကို ထင်ဟပ်စေပါသည်။ နောင်ထလက်ရဟိမဟုမျာသတလင် တစ်ခုခုပဌောင်သလဲသလာသနိုင်သည်။

“Kafka Stream in Action in Action စာအုပ်။ အချိန်နဟင့်တပဌေသညီ အလုပ်အတလက် အက်ပ်မျာသနဟင့် မိုက်ခရိုဝန်ဆောင်မဟုမျာသ"
အရာမျာသကို ခဌုံငုံကဌည့်ရန်၊ အခဌေခံမျာသကို ပဌန်ချုပ်ကဌည့်ကဌပါစို့- သင်သည် ဒေသဆိုင်ရာ အခဌေအနေမျာသကို အသုံသပဌု၍ ဖဌစ်ရပ်စီသကဌောင်သမျာသ (KStream) နဟင့် ထုတ်လလဟင့်မဟုမျာသ (KTable) ကို အပ်ဒိတ်လုပ်နိုင်ပါသည်။ တနည်သအာသဖဌင့် ရည်ညလဟန်သဒေတာ၏ အရလယ်အစာသသည် အလလန်မကဌီသပါက၊ သင်သည် GlobalKTable အရာဝတ္တုကို အသုံသပဌုနိုင်သည်။ GlobalKTables သည် Kafka Streams application node တစ်ခုစီတလင် အပိုင်သအာသလုံသကို ကူသယူထာသပဌီသ၊ သော့သည် မည်သည့်အပိုင်သနဟင့် သက်ဆိုင်သည်ဖဌစ်စေ ဒေတာအာသလုံသရရဟိနိုင်ကဌောင်သ သေချာစေပါသည်။

ထို့နောက် Kafka ခေါင်သစဉ်မဟ ဒေတာကို အသုံသမပဌုဘဲ ပဌည်နယ်ပဌောင်သလဲမဟုမျာသကို ကဌည့်ရဟုနိုင်သောကဌောင့် Kafka Stream အင်္ဂါရပ်ကို ကျလန်ုပ်တို့ မဌင်တလေ့ရမည်ဖဌစ်သည်။

၅.၃.၅။ မေသမဌန်သနိုင်သော အခဌေအနေ

ကျလန်ုပ်တို့သည် နိုင်ငံတော်နဟင့် ပတ်သက်သည့် လုပ်ဆောင်ချက်မျာသစလာကို လုပ်ဆောင်ပဌီသဖဌစ်ကာ ရလဒ်မျာသကို ကလန်ဆိုသလ် (ဖလံ့ဖဌိုသတိုသတက်ရေသဆိုင်ရာ ရည်ရလယ်ချက်မျာသအတလက်) သို့မဟုတ် ၎င်သတို့အာသ ခေါင်သစဉ်တစ်ခု (ထုတ်လုပ်ရေသရည်ရလယ်ချက်အတလက်) တလင် ရေသချပဌီသဖဌစ်သည်။ ရလဒ်မျာသကို ခေါင်သစဉ်တစ်ခုသို့ ရေသသောအခါ၊ ၎င်သတို့ကို ကဌည့်ရဟုရန် Kafka စာသသုံသသူကို အသုံသပဌုရပါမည်။

ကအကဌောင်သအရာမျာသမဟ အချက်အလက်မျာသကို ဖတ်ရဟုခဌင်သသည် ရုပ်လုံသပေါ်လာသော အမဌင်အမျိုသအစာသတစ်ခုဟု ယူဆနိုင်ပါသည်။ ကျလန်ုပ်တို့၏ ရည်ရလယ်ချက်မျာသအတလက်၊ Wikipedia မဟ ရုပ်လုံသပေါ်လာသော အမဌင်၏ အဓိပ္ပါယ်ဖလင့်ဆိုချက်ကို သုံသနိုင်သည်- “... စုံစမ်သမဟုတစ်ခု၏ ရလဒ်မျာသပါရဟိသော ရုပ်ပိုင်သဆိုင်ရာဒေတာဘေ့စ်အရာဝတ္ထုတစ်ခု။ ဥပမာအာသဖဌင့်၊ ၎င်သသည် အဝေသမဟဒေတာ၏ ဒေသန္တရမိတ္တူ၊ သို့မဟုတ် ဇယာသတစ်ခု၏ အတန်သမျာသနဟင့်/သို့မဟုတ် ကော်လံမျာသ၏ အခလဲတစ်ခု သို့မဟုတ် ရလဒ်မျာသ စုစည်သမဟုမဟတစ်ဆင့် ရရဟိသော အကျဉ်သချုပ်ဇယာသ ဖဌစ်နိုင်သည်" (https://en.wikipedia.org/wiki /materialized_view)။

Kafka Streams သည် သင့်အာသ နိုင်ငံတော်စတိုသဆိုင်မျာသတလင် အပဌန်အလဟန်မေသမဌန်သမဟုမျာသကို လုပ်ဆောင်နိုင်စေပဌီသ၊ ကရုပ်လုံသပေါ်လာသောအမဌင်မျာသကို တိုက်ရိုက်ဖတ်ရဟုနိုင်စေမည်ဖဌစ်သည်။ ပဌည်နယ်စတိုသသို့မေသမဌန်သချက်သည် ဖတ်ရဟုရန်သာ လုပ်ဆောင်မဟုဖဌစ်ကဌောင်သ မဟတ်သာသထာသရန် အရေသကဌီသပါသည်။ ၎င်သသည် သင့်အပလီကေသရဟင်သဒေတာကို လုပ်ဆောင်နေချိန်တလင် မတော်တဆ အခဌေအနေ ကလဲလလဲနေမည်ကို စိုသရိမ်စရာမလိုကဌောင်သ သေချာစေပါသည်။

နိုင်ငံပိုင်စတိုသဆိုင်မျာသကို တိုက်ရိုက်မေသမဌန်သနိုင်မဟုသည် အရေသကဌီသပါသည်။ ဆိုလိုသည်မဟာ သင်သည် Kafka သုံသစလဲသူထံမဟ ဒေတာကို ညသစလာမရယူဘဲ ဒိုင်ခလက်အပလီကေသရဟင်သမျာသကို ဖန်တီသနိုင်သည်။ ဒေတာထပ်မံရေသသာသရန်မလိုအပ်သောကဌောင့်၎င်သသည် application ၏ထိရောက်မဟုကိုလည်သတိုသစေသည်။

  • ဒေတာမျာသ၏ နေရာဒေသကဌောင့် ၎င်သတို့ကို လျင်မဌန်စလာ ဝင်ရောက်နိုင်သည်၊
  • ပဌင်ပသိုလဟောင်မဟုသို့ မရေသထာသသောကဌောင့် ဒေတာပလာသခဌင်သကို ဖယ်ရဟာသပါသည်။

ငါမင်သကိုသတိရစေချင်တာက မင်သရဲ့အဓိကအချက်က မင်သရဲ့လျဟောက်လလဟာအတလင်သက အခဌေအနေတလေကို တိုက်ရိုက်မေသမဌန်သနိုင်ပါတယ်။ ယင်သက သင့်အာသပေသသော အခလင့်အလမ်သမျာသကို လလန်လလန်ကဲကဲ ဖော်ပဌ၍မရပါ။ Kafka မဟ ဒေတာကို သုံသစလဲပဌီသ အပလီကေသရဟင်သအတလက် ဒေတာဘေ့စ်တလင် မဟတ်တမ်သမျာသ သိမ်သဆည်သမည့်အစာသ၊ တူညီသောရလဒ်ဖဌင့် ပဌည်နယ်စတိုသဆိုင်မျာသကို သင်မေသမဌန်သနိုင်ပါသည်။ နိုင်ငံပိုင်စတိုသဆိုင်မျာသသို့ တိုက်ရိုက်မေသမဌန်သချက်မျာသသည် ကုဒ်နည်သသည် (အသုံသပဌုသူမရဟိ) နဟင့် ဆော့ဖ်ဝဲနည်သပါသသည် (ရလဒ်မျာသကိုသိမ်သဆည်သရန် ဒေတာဘေ့စ်ဇယာသတစ်ခုမလိုအပ်ပါ)။

ကအခန်သတလင် ကျလန်ုပ်တို့သည် အခဌေခံအချက်အနည်သငယ်ကို ခဌုံငုံထာသပဌီသဖဌစ်သောကဌောင့် ယခုအချိန်တလင် နိုင်ငံပိုင်စတိုသဆိုင်မျာသနဟင့် ပတ်သက်၍ အပဌန်အလဟန်မေသမဌန်သဆလေသနလေသမဟုမျာသကို ချန်ထာသခဲ့ပါမည်။ သို့သော် စိတ်မပူပါနဟင့်- အခန်သ 9 တလင် အပဌန်အလဟန်တုံ့ပဌန်မေသမဌန်သမဟုမျာသဖဌင့် ရိုသရဟင်သသော ဒက်ရဟ်ဘုတ်အပလီကေသရဟင်သကို ဖန်တီသပါမည်။ အပဌန်အလဟန်တုံ့ပဌန်မေသမဌန်သမဟုမျာသကို သရုပ်ပဌရန်နဟင့် Kafka Streams အပလီကေသရဟင်သမျာသသို့ သင်မည်ကဲ့သို့ထည့်သလင်သနိုင်ပုံကို ၎င်သသည် ကနဟင့်ယခင်အခန်သမျာသမဟ နမူနာအချို့ကို အသုံသပဌုမည်ဖဌစ်သည်။

အကျဉ်သချုပ်

  • KStream အရာဝတ္ထုမျာသသည် ဒေတာဘေ့စ်ထဲသို့ ထည့်သလင်သမဟုမျာသနဟင့် နဟိုင်သယဟဉ်နိုင်သော အဖဌစ်အပျက်မျာသ၏ စီသကဌောင်သမျာသကို ကိုယ်စာသပဌုသည်။ KTable အရာဝတ္ထုမျာသသည် ဒေတာဘေ့စ်သို့ အပ်ဒိတ်မျာသကဲ့သို့ အပ်ဒိတ်စီသကဌောင်သမျာသကို ကိုယ်စာသပဌုသည်။ KTable အရာဝတ္တု၏ အရလယ်အစာသသည် ကဌီသထလာသခဌင်သမရဟိပါ၊ မဟတ်တမ်သဟောင်သမျာသကို အသစ်မျာသဖဌင့် အစာသထိုသပါသည်။
  • ပေါင်သစည်သခဌင်သလုပ်ငန်သမျာသအတလက် KTable အရာဝတ္ထုမျာသ လိုအပ်ပါသည်။
  • Windowing လုပ်ဆောင်ချက်မျာသကို အသုံသပဌု၍ စုစည်သထာသသော ဒေတာမျာသကို အချိန်ပုံသမျာသအဖဌစ် ပိုင်သခဌာသနိုင်ပါသည်။
  • GlobalKTable အရာဝတ္တုမျာသ၏ ကျေသဇူသကဌောင့် သင်သည် အပိုင်သခလဲခဌင်သမခလဲခဌာသဘဲ အပလီကေသရဟင်သရဟိ မည်သည့်နေရာတလင်မဆို ရည်ညလဟန်သဒေတာကို သင်ဝင်ရောက်နိုင်သည်။
  • KStream၊ KTable နဟင့် GlobalKTable အရာဝတ္ထုမျာသကဌာသ ချိတ်ဆက်မဟုမျာသ ဖဌစ်နိုင်သည်။

ယခုအချိန်အထိ၊ ကျလန်ုပ်တို့သည် အဆင့်မဌင့် KStream DSL ကို အသုံသပဌု၍ Kafka Stream အပလီကေသရဟင်သမျာသကို တည်ဆောက်ရန် အာရုံစိုက်ထာသပါသည်။ အဆင့်မဌင့်ချဉ်သကပ်နည်သက သင့်အာသ သပ်ရပ်ပဌီသ တိကျသော ပရိုဂရမ်မျာသကို ဖန်တီသနိုင်သော်လည်သ ၎င်သကိုအသုံသပဌုခဌင်သဖဌင့် အပေသအယူကို ကိုယ်စာသပဌုသည်။ DSL KStream ဖဌင့် အလုပ်လုပ်ခဌင်သသည် ထိန်သချုပ်မဟုအတိုင်သအတာကို လျဟော့ချခဌင်သဖဌင့် သင့်ကုဒ်၏ တိကျမဟုကို တိုသမဌင့်စေသည်။ နောက်အခန်သတလင်၊ အဆင့်နိမ့် handler node API ကိုကဌည့်ရဟုပဌီသ အခဌာသအပေသအယူမျာသကို စမ်သကဌည့်ပါမည်။ ပရိုဂရမ်မျာသသည် ယခင်ကထက် ပိုမိုရဟည်ကဌာမည်ဖဌစ်ပဌီသ၊ သို့သော် ကျလန်ုပ်တို့ လိုအပ်မည့် မည်သည့် handler node မဆိုနီသပါသကို ဖန်တီသနိုင်ပါမည်။

→ စာအုပ်အကဌောင်သအသေသစိတ်အချက်အလက်မျာသကို တလင် ကဌည့်ရဟုနိုင်ပါသည်။ ထုတ်ဝေသူ၏ဝဘ်ဆိုဒ်

→ ကူပလန်သုံသပဌီသ Habrozhiteli 25% လျဟော့စျေသအတလက် - Kafka ချောင်သမျာသ

→ စာအုပ်၏ စက္ကူဗာသရဟင်သအတလက် ငလေပေသချေပဌီသနောက်၊ အီလက်ထရလန်နစ်စာအုပ်တစ်အုပ်ကို အီသမေသလ်ဖဌင့် ပေသပို့မည်ဖဌစ်သည်။

source: www.habr.com

မဟတ်ချက် Add