"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා" ආයුබෝවන්, Khabro පදිංචිකරුවන්! මෙම පොත නූල් සැකසුම් තේරුම් ගැනීමට කැමති ඕනෑම සංවර්ධකයෙකු සඳහා සුදුසු වේ. බෙදා හරින ලද ක්‍රමලේඛනය අවබෝධ කර ගැනීම ඔබට කෆ්කා සහ කෆ්කා ප්‍රවාහ වඩාත් හොඳින් අවබෝධ කර ගැනීමට උපකාරී වේ. කෆ්කා රාමුව දැන ගැනීම සතුටක්, නමුත් මෙය අවශ්‍ය නොවේ: ඔබට අවශ්‍ය සියල්ල මම ඔබට කියමි. පළපුරුදු කෆ්කා සංවර්ධකයින් සහ නවකයන් මෙම පොතේ කෆ්කා ස්ට්‍රීම්ස් පුස්තකාලය භාවිතයෙන් රසවත් ප්‍රවාහ සැකසුම් යෙදුම් නිර්මාණය කරන්නේ කෙසේදැයි ඉගෙන ගනු ඇත. අනුක්‍රමිකකරණය වැනි සංකල්ප සමඟ දැනටමත් හුරුපුරුදු අතරමැදි සහ උසස් ජාවා සංවර්ධකයින් Kafka Streams යෙදුම් නිර්මාණය කිරීමට ඔවුන්ගේ කුසලතා යෙදීමට ඉගෙන ගනු ඇත. පොතේ මූලාශ්‍ර කේතය Java 8 හි ලියා ඇති අතර Java 8 lambda ප්‍රකාශන වාක්‍ය ඛණ්ඩය සැලකිය යුතු ලෙස භාවිතා කරයි, එබැවින් lambda ශ්‍රිත සමඟ වැඩ කරන්නේ කෙසේදැයි දැන ගැනීම (වෙනත් ක්‍රමලේඛන භාෂාවකින් පවා) ප්‍රයෝජනවත් වනු ඇත.

උපුටා ගැනීමකි. 5.3 එකතු කිරීමේ සහ කවුළු මෙහෙයුම්

මෙම කොටසේදී, අපි කෆ්කා ප්‍රවාහවල වඩාත් බලාපොරොත්තු සහගත කොටස් ගවේෂණය කිරීමට ඉදිරියට යන්නෙමු. මෙතෙක් අපි කෆ්කා ප්‍රවාහයේ පහත සඳහන් අංග ආවරණය කර ඇත:

  • සැකසුම් ස්ථලකය නිර්මාණය කිරීම;
  • ප්‍රවාහ යෙදුම්වල තත්වය භාවිතා කිරීම;
  • දත්ත ප්රවාහ සම්බන්ධතා සිදු කිරීම;
  • සිදුවීම් ප්‍රවාහ (KStream) සහ යාවත්කාලීන ප්‍රවාහ (KTable) අතර වෙනස්කම්.

පහත උදාහරණ වලින් අපි මෙම සියලු අංග එකට ගෙන එනු ඇත. ප්‍රවාහ යෙදුම්වල තවත් විශිෂ්ට අංගයක් වන windowing ගැනද ඔබ ඉගෙන ගනු ඇත. අපගේ පළමු උදාහරණය සරල එකතු කිරීමක් වනු ඇත.

5.3.1. කර්මාන්ත අංශය අනුව කොටස් අලෙවිය එකතු කිරීම

ප්‍රවාහ දත්ත සමඟ වැඩ කිරීමේදී එකතු කිරීම සහ කණ්ඩායම් කිරීම අත්‍යවශ්‍ය මෙවලම් වේ. එක් එක් වාර්තා ලැබෙන පරිදි ඒවා පරීක්ෂා කිරීම බොහෝ විට ප්රමාණවත් නොවේ. දත්ත වලින් අමතර තොරතුරු උකහා ගැනීම සඳහා, ඒවා කණ්ඩායම් කිරීම සහ ඒකාබද්ධ කිරීම අවශ්ය වේ.

මෙම උදාහරණයේදී, ඔබ කර්මාන්ත කිහිපයක සමාගම්වල කොටස්වල විකුණුම් පරිමාව නිරීක්ෂණය කිරීමට අවශ්‍ය දින වෙළෙන්දෙකුගේ ඇඳුම අඳිනු ඇත. විශේෂයෙන්, ඔබ එක් එක් කර්මාන්තයේ විශාලතම කොටස් විකුණුම් ඇති සමාගම් පහ ගැන උනන්දු වෙයි.

එවැනි එකතු කිරීමකට අවශ්‍ය පෝරමයට දත්ත පරිවර්තනය කිරීම සඳහා පහත පියවර කිහිපයක් අවශ්‍ය වේ (සාමාන්‍ය වචන වලින් කථා කිරීම).

  1. අමු කොටස් වෙළඳ තොරතුරු ප්‍රකාශයට පත් කරන මාතෘකා පදනම් වූ මූලාශ්‍රයක් සාදන්න. අපිට StockTransaction වර්ගයේ වස්තුවක් ShareVolume වර්ගයේ වස්තුවකට සිතියම්ගත කිරීමට සිදුවේ. කාරණය වන්නේ StockTransaction වස්තුවේ විකුණුම් පාර-දත්ත අඩංගු වන නමුත් අපට අවශ්‍ය වන්නේ විකුණනු ලබන කොටස් ගණන පිළිබඳ දත්ත පමණි.
  2. කොටස් සංකේතය අනුව ShareVolume දත්ත සමූහ කරන්න. සංකේතය අනුව කාණ්ඩ කළ පසු, ඔබට මෙම දත්ත කොටස් විකුණුම් වෙළුම්වල උප එකතුවලට හකුළ හැක. KStream.groupBy ක්‍රමය KGroupedStream ආකාරයේ අවස්ථාවක් ලබා දෙන බව සඳහන් කිරීම වටී. තවද KGroupedStream.reduce ක්‍රමය ඇමතීමෙන් ඔබට KTable අවස්ථාවක් ලබා ගත හැක.

KGroupedStream අතුරුමුහුණත යනු කුමක්ද?

KStream.groupBy සහ KStream.groupByKey ක්‍රම මගින් KGroupedStream හි අවස්ථාවක් ලබා දේ. KGroupedStream යනු යතුරු මගින් සමූහගත කිරීමෙන් පසු සිදුවීම් ප්‍රවාහයක අතරමැදි නිරූපණයකි. එය සමඟ සෘජු වැඩ කිරීම සඳහා එය කිසිසේත්ම අදහස් නොකෙරේ. ඒ වෙනුවට, KGroupedStream එකතු කිරීමේ මෙහෙයුම් සඳහා භාවිතා කරයි, එය සෑම විටම KTable එකක් ඇති කරයි. එකතු කිරීමේ මෙහෙයුම්වල ප්‍රතිඵලය KTable එකක් වන අතර ඔවුන් රාජ්‍ය ගබඩාවක් භාවිතා කරන බැවින්, එහි ප්‍රතිඵලයක් ලෙස සියලුම යාවත්කාලීන කිරීම් තවදුරටත් නල මාර්ගයේ නොයැවීමට ඉඩ ඇත.

KTable.groupBy ක්‍රමය සමාන KGroupedTable එකක් ආපසු ලබා දෙයි - යතුර මගින් නැවත සමූහගත කරන ලද යාවත්කාලීන ප්‍රවාහයේ අතරමැදි නිරූපණයකි.

අපි කෙටි විවේකයක් ගෙන රූපය දෙස බලමු. 5.9, එයින් පෙන්නුම් කරන්නේ අප අත්කර ගෙන ඇති දේ. මෙම ස්ථලකය දැනටමත් ඔබට ඉතා හුරුපුරුදු විය යුතුය.

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
අපි දැන් බලමු මෙම ස්ථලකය සඳහා වන කේතය (එය src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java ගොනුවෙන් සොයාගත හැක) (ලැයිස්තුගත කිරීම 5.2).

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
ලබා දී ඇති කේතය එහි සංක්ෂිප්තභාවය සහ පේළි කිහිපයකින් සිදු කරන ලද විශාල ක්‍රියාවන් මගින් කැපී පෙනේ. ඔබ builder.stream ක්‍රමයේ පළමු පරාමිතිය තුළ අලුත් දෙයක් දැකිය හැක: enum වර්ගයේ අගයක් AutoOffsetReset.EARLIEST (LATEST එකක් ද ඇත), Consumed.withOffsetResetPolicy ක්‍රමය භාවිතයෙන් සකසන්න. එක් එක් KStream හෝ KTable සඳහා ඕෆ්සෙට් යළි පිහිටුවීමේ උපාය මාර්ගයක් නියම කිරීමට මෙම ගණන් කිරීමේ වර්ගය භාවිතා කළ හැකි අතර වින්‍යාසයෙන් ඕෆ්සෙට් යළි පිහිටුවීමේ විකල්පයට වඩා ප්‍රමුඛත්වය ගනී.

GroupByKey සහ GroupBy

KStream අතුරුමුහුණතට වාර්තා කාණ්ඩගත කිරීම සඳහා ක්‍රම දෙකක් ඇත: GroupByKey සහ GroupBy. දෙකම KGroupedTable එකක් ලබා දෙයි, එබැවින් ඔබ ඔවුන් අතර ඇති වෙනස කුමක්ද සහ කුමන එක භාවිතා කළ යුතුද යන්න ගැන කල්පනා කරනවා විය හැක.

KStream හි යතුරු දැනටමත් හිස් නොවන විට GroupByKey ක්‍රමය භාවිතා වේ. වැදගත්ම දෙය නම්, "නැවත කොටස් කිරීම අවශ්‍යයි" ධජය කිසි විටෙකත් සකසා නොතිබුණි.

GroupBy ක්‍රමය උපකල්පනය කරන්නේ ඔබ කණ්ඩායම් කිරීමේ යතුරු වෙනස් කර ඇති බවයි, එබැවින් නැවත කොටස් කිරීමේ ධජය සත්‍ය ලෙස සකසා ඇත. GroupBy ක්‍රමයට පසුව සම්බන්ධ කිරීම්, එකතු කිරීම් ආදිය සිදු කිරීමෙන් ස්වයංක්‍රීයව නැවත කොටස් කිරීම සිදුවේ.
සාරාංශය: හැකි සෑම විටම, ඔබ GroupBy වෙනුවට GroupByKey භාවිතා කළ යුතුය.

mapValues ​​සහ groupBy ක්‍රම මගින් කරන්නේ කුමක්ද යන්න පැහැදිලිය, එබැවින් අපි එකතුව() ක්‍රමය (src/main/java/bbejeck/model/ShareVolume.java හි දක්නට ලැබේ) (ලැයිස්තුගත කිරීම 5.3) දෙස බලමු.

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
ShareVolume.sum ක්‍රමය කොටස් විකුණුම් පරිමාවේ ධාවන එකතුව ආපසු ලබා දෙන අතර සමස්ත ගණනය කිරීම් දාමයේ ප්‍රතිඵලය KTable වස්තුවකි. . දැන් ඔබට තේරෙනවා KTable විසින් ඉටු කරන කාර්යභාරය. ShareVolume වස්තු පැමිණි විට, අනුරූප KTable වස්තුව නවතම වත්මන් යාවත්කාලීන ගබඩා කරයි. සියලුම යාවත්කාලීනයන් පෙර shareVolumeKTable හි පිළිබිඹු වන නමුත්, සියල්ල තවදුරටත් යවනු නොලැබෙන බව මතක තබා ගැනීම වැදගත්ය.

එක් එක් කර්මාන්තයේ වැඩිම කොටස් වෙළුම් සහිත සමාගම් පහ වෙත පැමිණීමට (වෙළඳාම කරන ලද කොටස් ගණන අනුව) අපි මෙම KTable භාවිතා කරමු. මෙම නඩුවේ අපගේ ක්රියාවන් පළමු එකතු කිරීම සඳහා සමාන වනු ඇත.

  1. කර්මාන්තය අනුව තනි ShareVolume වස්තු කාණ්ඩ කිරීමට තවත් groupBy මෙහෙයුමක් සිදු කරන්න.
  2. ShareVolume වස්තු සාරාංශ කිරීම ආරම්භ කරන්න. මෙවර එකතු කිරීමේ වස්තුව ස්ථාවර ප්‍රමාණයේ ප්‍රමුඛතා පෝලිමකි. මෙම ස්ථාවර ප්‍රමාණයේ පෝලිමේ රඳවා තබාගනු ලබන්නේ විශාලතම කොටස් විකුණා ඇති සමාගම් පහ පමණි.
  3. පෙර ඡේදයේ සිට පෙළ අගයකට පෝලිම් සිතියම්ගත කර කර්මාන්තය අනුව අංක අනුව වැඩිපුරම වෙළඳාම් කළ කොටස් පහ ආපසු දෙන්න.
  4. මාතෘකාවට ප්‍රතිඵල තන්තු ආකාරයෙන් ලියන්න.

රූපයේ. රූප සටහන 5.10 දත්ත ප්‍රවාහ ස්ථල විද්‍යාව ප්‍රස්ථාරය පෙන්වයි. ඔබට පෙනෙන පරිදි, සැකසීමේ දෙවන වටය තරමක් සරල ය.

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
දැන් අපට මෙම දෙවන වටයේ සැකසුම් වල ව්‍යුහය පිළිබඳව පැහැදිලි අවබෝධයක් ඇති බැවින්, අපට එහි මූල කේතය වෙත හැරවිය හැක (ඔබ එය src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java ගොනුවෙන් සොයා ගනු ඇත) (ලැයිස්තුගත කිරීම 5.4) .

මෙම ආරම්භකයෙහි ස්ථාවර පෝලිම් විචල්‍යයක් අඩංගු වේ. මෙය java.util.TreeSet සඳහා ඇඩප්ටරයක් ​​වන අභිරුචි වස්තුවක් වන අතර එය වෙළඳාම් කරන ලද කොටස්වල අවරෝහණ අනුපිළිවෙලෙහි ඉහළම N ප්රතිඵල නිරීක්ෂණය කිරීමට භාවිතා කරයි.

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
ඔබ දැනටමත් groupBy සහ mapValues ​​ඇමතුම් දැක ඇත, එබැවින් අපි ඒවාට නොයන්නෙමු (KTable.toStream ක්‍රමයට අපි අමතන්නේ KTable.print ක්‍රමය අත්හරින ලද බැවිනි). නමුත් ඔබ තවමත් aggregate() හි KTable අනුවාදය දැක නැත, එබැවින් අපි එය සාකච්ඡා කිරීමට සුළු කාලයක් ගත කරමු.

ඔබට මතක ඇති පරිදි, KTable වෙනස් වන්නේ එකම යතුරු සහිත වාර්තා යාවත්කාලීන ලෙස සැලකීමයි. KTable පැරණි ප්‍රවේශය නව එකක් සමඟ ප්‍රතිස්ථාපනය කරයි. එකතු කිරීම සමාන ආකාරයකින් සිදු වේ: එකම යතුර සහිත නවතම වාර්තා එකතු කර ඇත. වාර්තාවක් පැමිණි විට, එය එකතු කරන්නෙකු භාවිතයෙන් FixedSizePriorityQueue පන්තියේ අවස්ථාවට එක් කරනු ලැබේ (සමස්ත ක්‍රම ඇමතුමේ දෙවන පරාමිතිය), නමුත් එම යතුර සමඟ වෙනත් වාර්තාවක් දැනටමත් පවතී නම්, පැරණි වාර්තාව උපස්ථකය භාවිතයෙන් ඉවත් කරනු ලැබේ (තෙවන පරාමිතිය තුළ සමස්ථ ක්රමය ඇමතුම).

මේ සියල්ලෙන් අදහස් කරන්නේ අපගේ එකතු කරන්නා වන FixedSizePriorityQueue, සියලුම අගයන් එක් යතුරක් සමඟ එකතු නොකරන නමුත් N වඩාත්ම වෙළඳාම් කරන ලද කොටස්වල ප්‍රමාණවල චලනය වන එකතුවක් ගබඩා කරන බවයි. එන සෑම ප්‍රවේශයක්ම මෙතෙක් විකුණා ඇති මුළු කොටස් සංඛ්‍යාව අඩංගු වේ. KTable මඟින් ඔබට දැනට වැඩිපුරම ගනුදෙනු වන්නේ කුමන සමාගම්වල කොටස්ද යන්න පිළිබඳ තොරතුරු ලබා දෙනු ඇත, එක් එක් යාවත්කාලීන කිරීම් එකතු කිරීම අවශ්‍ය නොවේ.

අපි වැදගත් දේවල් දෙකක් කිරීමට ඉගෙන ගත්තා:

  • පොදු යතුරකින් KTable හි කණ්ඩායම් අගයන්;
  • මෙම සමූහ අගයන් මත පෙරළීම සහ එකතු කිරීම වැනි ප්‍රයෝජනවත් මෙහෙයුම් සිදු කරන්න.

මෙම මෙහෙයුම් සිදු කරන්නේ කෙසේදැයි දැන ගැනීම Kafka Streams යෙදුමක් හරහා ගමන් කරන දත්තවල අර්ථය අවබෝධ කර ගැනීමට සහ එය රැගෙන යන තොරතුරු මොනවාද යන්න තේරුම් ගැනීමට වැදගත් වේ.

අපි මේ පොතේ කලින් සාකච්ඡා කළ ප්‍රධාන සංකල්ප කිහිපයක් ද ගෙන ආවා. 4 වන පරිච්ඡේදයේ, අපි ප්‍රවාහ යෙදුමක් සඳහා දෝෂ-ඉවසන, ප්‍රාදේශීය රාජ්‍යය වැදගත් වන්නේ කෙසේද යන්න සාකච්ඡා කළෙමු. මෙම පරිච්ඡේදයේ පළමු උදාහරණය ප්‍රාදේශීය රාජ්‍යය එතරම් වැදගත් වන්නේ මන්දැයි පෙන්නුම් කරයි - ඔබ දැනටමත් දැක ඇති තොරතුරු මොනවාදැයි නිරීක්ෂණය කිරීමට එය ඔබට ඉඩ සලසයි. දේශීය ප්‍රවේශය ජාල ප්‍රමාදයන් වළක්වයි, යෙදුම වඩාත් ක්‍රියාකාරී සහ දෝෂ-ප්‍රතිරෝධී කරයි.

කිසියම් පෙරළීමක් හෝ එකතු කිරීමේ මෙහෙයුමක් සිදු කරන විට, ඔබ රාජ්ය ගබඩාවේ නම සඳහන් කළ යුතුය. පෙරළීම සහ එකතු කිරීමේ මෙහෙයුම් මඟින් KTable අවස්ථාවක් ලබා දෙන අතර, KTable පැරණි ප්‍රතිඵල නව ඒවා සමඟ ප්‍රතිස්ථාපනය කිරීමට රාජ්‍ය ගබඩාව භාවිත කරයි. ඔබ දැක ඇති පරිදි, සියලුම යාවත්කාලීනයන් නල මාර්ගයෙන් පහළට යවනු නොලැබේ, සහ එකතු කිරීමේ මෙහෙයුම් සාරාංශ තොරතුරු නිපදවීමට සැලසුම් කර ඇති නිසා මෙය වැදගත් වේ. ඔබ ප්‍රාදේශීය රාජ්‍යය යොදන්නේ නැතිනම්, KTable සියලු එකතු කිරීම් සහ පෙරළීමේ ප්‍රතිඵල ඉදිරිපත් කරයි.

මීලඟට, අපි නිශ්චිත කාලයක් තුළ එකතු කිරීම වැනි මෙහෙයුම් සිදු කිරීම දෙස බලමු - ඊනියා කවුළු මෙහෙයුම්.

5.3.2. කවුළු මෙහෙයුම්

පෙර කොටසේදී, අපි ස්ලයිඩින් කන්වලූෂන් සහ එකතු කිරීම හඳුන්වා දුන්නෙමු. යෙදුම කොටස් විකුණුම් පරිමාව අඛණ්ඩව පෙරළීමක් සිදු කරන ලද අතර පසුව හුවමාරුවේ වැඩිපුරම වෙළඳාම් කරන ලද කොටස් පහ එකතු කළේය.

සමහර විට එවැනි අඛණ්ඩ එකතු කිරීම සහ ප්රතිඵල පෙරළීම අවශ්ය වේ. සමහර විට ඔබට මෙහෙයුම් සිදු කිරීමට අවශ්‍ය වන්නේ යම් කාල සීමාවක් තුළ පමණි. උදාහරණයක් ලෙස, පසුගිය මිනිත්තු 10 තුළ යම් සමාගමක කොටස් සමඟ හුවමාරු ගනුදෙනු කීයක් සිදු කර ඇත්දැයි ගණනය කරන්න. නැතහොත් අවසාන මිනිත්තු 15 තුළ නව වෙළඳ ප්‍රචාරණ බැනරයක් මත පරිශීලකයින් කී දෙනෙක් ක්ලික් කළාද. යෙදුමක් එවැනි මෙහෙයුම් කිහිප වතාවක් සිදු කළ හැක, නමුත් නිශ්චිත කාල සීමාවන් සඳහා පමණක් අදාළ වන ප්‍රතිඵල සහිතව (කාල කවුළු).

ගැනුම්කරු විසින් හුවමාරු ගනුදෙනු ගණනය කිරීම

මීළඟ උදාහරණයේදී, අපි බහුවිධ වෙළඳුන් හරහා-විශාල සංවිධාන හෝ දක්ෂ තනි පුද්ගල මූල්‍යකරුවන් හරහා කොටස් ගනුදෙනු නිරීක්ෂණය කරන්නෙමු.

මෙම ලුහුබැඳීමට හේතු දෙකක් විය හැකිය. ඒවායින් එකක් වන්නේ වෙළඳපල නායකයින් මිලදී ගැනීම / විකුණන්නේ කුමක් දැයි දැනගැනීමේ අවශ්‍යතාවයයි. මෙම විශාල ක්‍රීඩකයින් සහ නවීන ආයෝජකයින් අවස්ථාවක් දකින්නේ නම්, ඔවුන්ගේ උපාය මාර්ගය අනුගමනය කිරීම අර්ථවත් කරයි. දෙවන හේතුව වන්නේ නීතිවිරෝධී අභ්‍යන්තර වෙළඳාමේ ඇති විය හැකි සලකුණු හඳුනා ගැනීමට ඇති ආශාවයි. මෙය සිදු කිරීම සඳහා, ඔබට වැදගත් මාධ්‍ය නිවේදන සමඟ විශාල විකුණුම් කරල්වල සහසම්බන්ධය විශ්ලේෂණය කිරීමට අවශ්‍ය වනු ඇත.

එවැනි ලුහුබැඳීම පහත පියවර වලින් සමන්විත වේ:

  • කොටස් ගනුදෙනු මාතෘකාවෙන් කියවීම සඳහා ධාරාවක් නිර්මාණය කිරීම;
  • ගැනුම්කරු හැඳුනුම්පත සහ කොටස් සංකේතය අනුව පැමිණෙන වාර්තා කාණ්ඩගත කිරීම. groupBy ක්‍රමය ඇමතීමෙන් KGroupedStream පන්තියේ අවස්ථාවක් ලබා දෙයි;
  • KGroupedStream.windowedBy ක්‍රමය මඟින් කාල කවුළුවකට සීමා වූ දත්ත ප්‍රවාහයක් ආපසු ලබා දෙයි, එමඟින් කවුළු එකතු කිරීමට ඉඩ ලබා දේ. කවුළු වර්ගය මත පදනම්ව, TimeWindowedKStream එකක් හෝ SessionWindowedKStream එකක් ආපසු ලබාදේ;
  • එකතු කිරීමේ මෙහෙයුම සඳහා ගනුදෙනු ගණන. ජනෙල් දත්ත ප්‍රවාහය මෙම ගණන් කිරීමේදී යම් වාර්තාවක් සැලකිල්ලට ගන්නේද යන්න තීරණය කරයි;
  • මාතෘකාවකට ප්‍රතිඵල ලිවීම හෝ සංවර්ධනය අතරතුර ඒවා කොන්සෝලයට ප්‍රතිදානය කිරීම.

මෙම යෙදුමේ ස්ථලකය සරල ය, නමුත් එය පිළිබඳ පැහැදිලි චිත්රයක් ප්රයෝජනවත් වනු ඇත. අපි රූපය දෙස බලමු. 5.11.

ඊළඟට, අපි කවුළු මෙහෙයුම් වල ක්රියාකාරිත්වය සහ අනුරූප කේතය දෙස බලමු.

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"

කවුළු වර්ග

Kafka Streams හි කවුළු වර්ග තුනක් ඇත:

  • සැසිවාරය;
  • "tumbling" (tumbling);
  • ලිස්සා යාම / පැනීම.

කුමන එකක් තෝරාගත යුතුද යන්න ඔබේ ව්‍යාපාර අවශ්‍යතා මත රඳා පවතී. ටම්බ්ලිං සහ පැනීමේ කවුළු කාල සීමා සහිත වන අතර සැසි කවුළු පරිශීලක ක්‍රියාකාරකම් මගින් සීමා වේ - සැසියේ කාලසීමාව තීරණය වන්නේ පරිශීලකයා කෙතරම් ක්‍රියාශීලීද යන්න මත පමණි. මතක තබා ගත යුතු ප්‍රධානතම දෙය නම්, සියලුම කවුළු වර්ග පදනම් වී ඇත්තේ ඇතුළත් කිරීම් වල දිනය/වේලා මුද්දර මත මිස පද්ධති වේලාව මත නොවන බවයි.

ඊළඟට, අපි එක් එක් කවුළු වර්ග සමඟ අපගේ ස්ථලකය ක්රියාත්මක කරමු. සම්පූර්ණ කේතය ලබා දෙනු ලබන්නේ පළමු උදාහරණයේ පමණි; වෙනත් වර්ගවල කවුළු සඳහා කවුළු මෙහෙයුම් වර්ගය හැර කිසිවක් වෙනස් නොවේ.

සැසි කවුළු

Session windows අනෙකුත් සියලුම ආකාරයේ windows වලට වඩා බෙහෙවින් වෙනස් වේ. ඒවා පරිශීලකයාගේ ක්‍රියාකාරකම (හෝ ඔබ නිරීක්ෂණය කිරීමට කැමති ආයතනයේ ක්‍රියාකාරකම්) අනුව කාලය අනුව සීමා නොවේ. සැසි කවුළු අක්‍රිය කාල පරිච්ඡේද මගින් සීමා කෙරේ.

රූප සටහන 5.12 සැසි කවුළු පිළිබඳ සංකල්පය විදහා දක්වයි. කුඩා සැසිය එහි වම් පස ඇති සැසිය සමඟ ඒකාබද්ධ වනු ඇත. දකුණු පස ඇති සැසිය දිගු කාලයක් අක්‍රියව පවතින බැවින් එය වෙනම වනු ඇත. සැසි කවුළු පරිශීලක ක්‍රියාකාරකම් මත පදනම් වේ, නමුත් ප්‍රවේශය අයත් වන්නේ කුමන සැසියටද යන්න තීරණය කිරීමට ඇතුළත් කිරීම් වලින් දින/වේලා මුද්දර භාවිතා කරන්න.

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"

කොටස් ගනුදෙනු නිරීක්ෂණය කිරීමට සැසි කවුළු භාවිතා කිරීම

හුවමාරු ගනුදෙනු පිළිබඳ තොරතුරු ග්‍රහණය කර ගැනීමට සැසි කවුළු භාවිතා කරමු. සැසි කවුළු ක්‍රියාත්මක කිරීම ලැයිස්තුගත කිරීම් 5.5 හි පෙන්වා ඇත (එය src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java හි සොයා ගත හැක).

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
මෙම ස්ථල විද්‍යාවේ බොහෝ මෙහෙයුම් ඔබ දැනටමත් දැක ඇත, එබැවින් ඒවා නැවත මෙහි බැලීමට අවශ්‍ය නොවේ. නමුත් මෙහි නව අංග කිහිපයක් ද ඇත, අපි දැන් සාකච්ඡා කරමු.

ඕනෑම groupBy මෙහෙයුමක් සාමාන්‍යයෙන් යම් ආකාරයක එකතු කිරීමේ මෙහෙයුමක් සිදු කරයි (එකතු කිරීම, පෙරළීම හෝ ගණන් කිරීම). ඔබට ධාවන එකතුවක් සමඟ සමුච්චිත එකතු කිරීම හෝ නිශ්චිත කාල කවුළුවක් තුළ වාර්තා සැලකිල්ලට ගන්නා කවුළු එකතු කිරීම සිදු කළ හැක.

ලැයිස්තුගත කිරීමේ 5.5 හි කේතය සැසි කවුළු තුළ ඇති ගනුදෙනු ගණන ගණනය කරයි. රූපයේ. 5.13 මෙම ක්රියාවන් පියවරෙන් පියවර විශ්ලේෂණය කෙරේ.

windowedBy(SessionWindows.with(twentySeconds) දක්වා (විනාඩි පහළොවක්)) ඇමතීමෙන් අපි තත්පර 20ක අක්‍රිය කාල පරතරයක් සහ මිනිත්තු 15ක අඛණ්ඩ කාල පරතරයක් සහිත සැසි කවුළුවක් සාදන්නෙමු. තත්පර 20ක නිෂ්ක්‍රීය විරාමයක් යන්නෙන් අදහස් වන්නේ වත්මන් සැසියේ අවසානය හෝ ආරම්භයේ සිට තත්පර 20ක් ඇතුළත වත්මන් (ක්‍රියාකාරී) සැසියට පැමිණෙන ඕනෑම ප්‍රවේශයක් යෙදුමට ඇතුළත් වන බවයි.

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
මීලඟට, අපි සැසි කවුළුව තුළ සිදු කළ යුතු එකතු කිරීමේ මෙහෙයුම නියම කරමු - මෙම අවස්ථාවේදී, ගණන් කරන්න. එන ප්‍රවේශයක් අක්‍රිය කවුළුවෙන් පිටත වැටේ නම් (දිනය/වේලා මුද්‍රාවේ දෙපස), යෙදුම නව සැසියක් නිර්මාණය කරයි. රඳවා ගැනීමේ පරතරය යනු යම් කාලයක් සඳහා සැසියක් පවත්වා ගෙන යාම සහ සැසියේ අක්‍රිය කාල සීමාවෙන් ඔබ්බට විහිදෙන නමුත් තවමත් ඇමිණිය හැකි ප්‍රමාද දත්ත සඳහා ඉඩ සලසයි. අතිරේකව, ඒකාබද්ධ වීමෙන් ඇතිවන නව සැසියේ ආරම්භය සහ අවසානය මුල්ම සහ නවතම දිනය/වේලා මුද්දරයට අනුරූප වේ.

සැසි ක්‍රියා කරන ආකාරය බැලීමට ගණන් කිරීමේ ක්‍රමයේ ඇතුළත් කිරීම් කිහිපයක් බලමු (වගුව 5.1).

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
වාර්තා පැමිණෙන විට, අපි එකම යතුර සමඟ පවතින සැසි සොයන්නෙමු, වත්මන් දිනය/වේලා මුද්‍රාවට වඩා අඩු අවසන් වේලාවක් - අකර්මන්‍ය කාල පරතරය, සහ වත්මන් දිනය/වේලා මුද්‍රාව + අක්‍රිය කාල පරතරයට වඩා වැඩි ආරම්භක වේලාවක්. මෙය සැලකිල්ලට ගනිමින්, වගුවෙන් ඇතුළත් කිරීම් හතරක්. 5.1 පහත පරිදි තනි සැසියකට ඒකාබද්ධ කර ඇත.

1. වාර්තා 1 පළමුව පැමිණේ, එබැවින් ආරම්භක වේලාව අවසන් වේලාවට සමාන වන අතර 00:00:00 වේ.

2. ඊළඟට, ප්‍රවේශය 2 පැමිණෙන අතර, අපි 23:59:55 ට පෙර අවසන් නොවන සහ 00:00:35 ට පසුව ආරම්භ නොවන සැසි සොයමු. අපි වාර්තා 1 සොයාගෙන 1 සහ 2 සැසි ඒකාබද්ධ කරමු. අපි සැසියේ 1 (පෙර) ආරම්භක වේලාව සහ 2 සැසියේ අවසන් කාලය (පසුව) ගනිමු, එවිට අපගේ නව සැසිය 00:00:00 ට ආරම්භ වී 00 ට අවසන් වේ: 00:15.

3. වාර්තා 3 පැමිණේ, අපි 00:00:30 සහ 00:01:10 අතර සැසි සොයන අතර කිසිවක් සොයා නොගනිමු. යතුර 123-345-654,FFBE සඳහා දෙවන සැසියක් එක් කරන්න, 00:00:50 ට ආරම්භ කර අවසන් කරන්න.

4. වාර්තා 4 පැමිණෙන අතර අපි 23:59:45 සහ 00:00:25 අතර සැසි සොයමින් සිටිමු. මෙවර සැසි 1 සහ 2 දෙකම හමු වේ. සියලුම සැසි තුනම එකකට ඒකාබද්ධ කර ඇති අතර ආරම්භක වේලාව 00:00:00 සහ අවසන් වේලාව 00:00:15 වේ.

මෙම කොටසේ විස්තර කර ඇති දෙයින්, පහත සඳහන් වැදගත් සූක්ෂ්ම කරුණු මතක තබා ගැනීම වටී:

  • සැසි ස්ථාවර ප්‍රමාණයේ කවුළු නොවේ. සැසියක කාලසීමාව තීරණය වන්නේ යම් කාල සීමාවක් තුළ ක්‍රියාකාරකම් මගිනි;
  • දත්තවල ඇති දිනය/වේලා මුද්දර මගින් සිදුවීම පවතින සැසියක් තුළට හෝ අක්‍රිය කාලපරිච්ඡේදයකට අයත් වේද යන්න තීරණය කරයි.

ඊළඟට අපි ඊළඟ වර්ගයේ කවුළුව ගැන සාකච්ඡා කරමු - "ටම්බල්" කවුළු.

"පෙරළෙන" කවුළු

ටම්බ්ලිං කවුළු නිශ්චිත කාල සීමාවක් තුළ සිදුවන සිදුවීම් ග්‍රහණය කරයි. සෑම තත්පර 20 කට වරක් ඔබ යම් සමාගමක සියලුම කොටස් ගනුදෙනු ග්‍රහණය කර ගත යුතු යැයි සිතන්න, එබැවින් ඔබ එම කාලය තුළ සියලුම සිදුවීම් එකතු කරයි. තත්පර 20 ක පරතරය අවසානයේ, කවුළුව පෙරළී නව තත්පර 20 නිරීක්ෂණ පරතරයකට ගමන් කරයි. රූප සටහන 5.14 මෙම තත්වය පෙන්නුම් කරයි.

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
ඔබට පෙනෙන පරිදි, අවසාන තත්පර 20 තුළ ලැබුණු සියලුම සිදුවීම් කවුළුව තුළ ඇතුළත් වේ. මෙම කාලය අවසානයේදී, නව කවුළුවක් නිර්මාණය වේ.

ලැයිස්තුගත 5.6 සෑම තත්පර 20 කට වරක් කොටස් ගනුදෙනු ග්‍රහණය කර ගැනීම සඳහා ටම්බ්ලිං කවුළු භාවිතය පෙන්නුම් කරන කේතය පෙන්වයි (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java හි දක්නට ලැබේ).

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
TimeWindows.of ක්‍රම ඇමතුමට මෙම කුඩා වෙනස සමඟ, ඔබට ටම්බ්ලිං කවුළුවක් භාවිත කළ හැක. මෙම උදාහරණය දක්වා() ක්‍රමය හඳුන්වන්නේ නැත, එබැවින් පැය 24ක පෙරනිමි රඳවා ගැනීමේ පරතරය භාවිතා වේ.

අවසාන වශයෙන්, අවසාන කවුළු විකල්පයන් වෙත යාමට කාලයයි - "පැන යන" කවුළු.

ස්ලයිඩින් ("පනින") කවුළු

ස්ලයිඩින්/පහන් ජනේල ටම්බ්ලිං කවුළුවලට සමාන නමුත් සුළු වෙනසක් ඇත. මෑත සිදුවීම් සැකසීමට නව කවුළුවක් සෑදීමට පෙර ස්ලයිඩින් කවුළු කාල පරතරය අවසන් වන තෙක් බලා නොසිටියි. ඔවුන් නව ගණනය කිරීම් ආරම්භ කරන්නේ කවුළු කාල සීමාවට වඩා අඩු පොරොත්තු විරාමයකින් පසුවය.

ජනේල පෙරලීම සහ පැනීම අතර ඇති වෙනස්කම් නිදර්ශනය කිරීම සඳහා, කොටස් හුවමාරු ගනුදෙනු ගණනය කිරීමේ උදාහරණය වෙත ආපසු යමු. අපගේ ඉලක්කය තවමත් ගණුදෙණු ගණන ගණනය කිරීම, නමුත් කවුන්ටරය යාවත්කාලීන කිරීමට පෙර මුළු කාලයම බලා සිටීමට අපට අවශ්‍ය නැත. ඒ වෙනුවට, අපි කවුන්ටරය කෙටි කාල පරතරයකින් යාවත්කාලීන කරන්නෙමු. උදාහරණයක් ලෙස, අපි තවමත් සෑම තත්පර 20 කට වරක් ගනුදෙනු ගණන ගණන් කරන්නෙමු, නමුත් රූපයේ පෙන්වා ඇති පරිදි සෑම තත්පර 5 කට වරක් කවුන්ටරය යාවත්කාලීන කරන්නෙමු. 5.15. මෙම අවස්ථාවේදී, අපි අතිච්ඡාදනය වන දත්ත සහිත ප්රතිඵල කවුළු තුනක් සමඟ අවසන් කරමු.

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
ලැයිස්තු 5.7 ස්ලයිඩින් කවුළු නිර්වචනය කිරීමේ කේතය පෙන්වයි (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java හි දක්නට ලැබේ).

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
advanceBy() ක්‍රමයට ඇමතුමක් එක් කිරීමෙන් ටම්බ්ලිං කවුළුවක් පනින කවුළුවක් බවට පරිවර්තනය කළ හැක. පෙන්වා ඇති උදාහරණයේ, ඉතිරි කිරීමේ පරතරය විනාඩි 15 කි.

එකතු කිරීමේ ප්‍රතිඵල කාල කවුළුවලට සීමා කරන්නේ කෙසේදැයි ඔබ මෙම කොටසින් දැක ඇත. විශේෂයෙන්ම, මෙම කොටසින් පහත කරුණු තුන ඔබට මතක තබා ගැනීමට මට අවශ්‍යය.

  • සැසි කවුළු වල ප්‍රමාණය සීමා වන්නේ කාල සීමාව අනුව නොව, පරිශීලක ක්‍රියාකාරකම් අනුව ය;
  • "tumbling" කවුළු යම් කාල සීමාවක් තුළ සිදුවීම් පිළිබඳ දළ විශ්ලේෂණයක් සපයයි;
  • පැනීමේ කවුළු වල කාලසීමාව නියම කර ඇත, නමුත් ඒවා නිතර යාවත්කාලීන වන අතර සියලුම කවුළු වල අතිච්ඡාදනය වන ඇතුළත් කිරීම් අඩංගු විය හැක.

මීළඟට, අපි KTable එකක් නැවත KStream එකක් වෙත සම්බන්ධතාවයක් සඳහා පරිවර්තනය කරන්නේ කෙසේදැයි ඉගෙන ගනිමු.

5.3.3. KStream සහ KTable වස්තු සම්බන්ධ කිරීම

4 වන පරිච්ඡේදයේදී, අපි KStream වස්තු දෙකක් සම්බන්ධ කිරීම ගැන සාකච්ඡා කළෙමු. දැන් KTable සහ KStream සම්බන්ධ කරන්නේ කෙසේදැයි ඉගෙන ගත යුතුය. පහත සරල හේතුව නිසා මෙය අවශ්‍ය විය හැක. KStream යනු වාර්තා ප්‍රවාහයක් වන අතර KTable යනු වාර්තා යාවත්කාලීන ප්‍රවාහයකි, නමුත් සමහර විට ඔබට KTable වෙතින් වන යාවත්කාලීන භාවිතයෙන් වාර්තා ප්‍රවාහයට අමතර සන්දර්භයක් එක් කිරීමට අවශ්‍ය විය හැක.

කොටස් හුවමාරු ගනුදෙනු සංඛ්‍යාව පිළිබඳ දත්ත ගෙන ඒවා අදාළ කර්මාන්ත සඳහා කොටස් හුවමාරු ප්‍රවෘත්ති සමඟ ඒකාබද්ධ කරමු. ඔබට දැනටමත් ඇති කේතය ලබා දී මෙය සාක්ෂාත් කර ගැනීමට ඔබ කළ යුතු දේ මෙන්න.

  1. කොටස් ගනුදෙනු සංඛ්‍යාව පිළිබඳ දත්ත සහිත KTable වස්තුවක් KStream බවට පරිවර්තනය කරන්න, ඉන්පසු මෙම කොටස් සංකේතයට අදාළ කර්මාන්ත අංශය දැක්වෙන යතුර සමඟ යතුර ප්‍රතිස්ථාපනය කරන්න.
  2. කොටස් හුවමාරු පුවත් සමඟ මාතෘකාවකින් දත්ත කියවන KTable වස්තුවක් සාදන්න. මෙම නව KTable කර්මාන්ත අංශය අනුව වර්ග කෙරේ.
  3. කර්මාන්ත අංශය විසින් කොටස් හුවමාරු ගනුදෙනු ගණන පිළිබඳ තොරතුරු සමඟ පුවත් යාවත්කාලීන සම්බන්ධ කරන්න.

දැන් අපි බලමු කොහොමද මේ ක්‍රියාකාරී සැලැස්ම ක්‍රියාත්මක කරන්නේ කියලා.

KTable to KStream බවට පරිවර්තනය කරන්න

KTable KStream බවට පරිවර්තනය කිරීමට ඔබ පහත දේ කළ යුතුය.

  1. KTable.toStream() ක්‍රමය අමතන්න.
  2. KStream.map ක්‍රමය ඇමතීමෙන්, යතුර කර්මාන්ත නාමයෙන් ප්‍රතිස්ථාපනය කරන්න, ඉන්පසු Windowed අවස්ථාවෙන් TransactionSummary වස්තුව ලබා ගන්න.

අපි මෙම මෙහෙයුම් එකට පහත පරිදි සම්බන්ධ කරන්නෙමු (කේතය src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java ගොනුවෙන් සොයාගත හැක) (ලැයිස්තුගත කිරීම 5.8).

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
අපි KStream.map මෙහෙයුමක් සිදු කරන නිසා, ආපසු ලබා දුන් KStream අවස්ථාව සම්බන්ධතාවයක භාවිතා කරන විට එය ස්වයංක්‍රීයව නැවත කොටස් කෙරේ.

අපි පරිවර්තන ක්‍රියාවලිය සම්පූර්ණ කර ඇත, ඊළඟට අපට කොටස් ප්‍රවෘත්ති කියවීම සඳහා KTable වස්තුවක් සෑදිය යුතුය.

කොටස් පුවත් සඳහා KTable නිර්මාණය කිරීම

වාසනාවකට මෙන්, KTable වස්තුවක් සෑදීමට ගත වන්නේ එක් කේත පේළියක් පමණි (කේතය src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java හි සොයා ගත හැක) (ලැයිස්තුගත කිරීම 5.9).

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
සැකසුම් තුළ Serdes string භාවිතා කරන බැවින්, Serde වස්තු කිසිවක් සඳහන් කිරීමට අවශ්‍ය නොවන බව සඳහන් කිරීම වටී. එසේම, EARLIEST ගණන් කිරීම භාවිතා කිරීමෙන්, වගුව ආරම්භයේදීම වාර්තා වලින් පිරී ඇත.

දැන් අපට අවසාන පියවර වෙත යා හැකිය - සම්බන්ධතාවය.

ගනුදෙනු ගණන් දත්ත සමඟ පුවත් යාවත්කාලීන සම්බන්ධ කිරීම

සම්බන්ධතාවයක් නිර්මාණය කිරීම අපහසු නැත. අදාළ කර්මාන්තය සඳහා කොටස් ප්‍රවෘත්ති නොමැති අවස්ථාවක අපි වම් සම්බන්ධකයක් භාවිතා කරන්නෙමු (අවශ්‍ය කේතය src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java ගොනුවෙන් සොයාගත හැකිය) (ලැයිස්තුගත කිරීම 5.10).

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
මෙම leftJoin ක්රියාකරු ඉතා සරලයි. 4 වන පරිච්ඡේදයේ ඇති Joins මෙන් නොව, JoinWindow ක්‍රමය භාවිතා නොවේ, මන්ද KStream-KTable Join කිරීමක් සිදු කරන විට, KTable එකෙහි එක් එක් යතුර සඳහා ඇත්තේ එක් ප්‍රවේශයක් පමණි. එවැනි සම්බන්ධතාවයක් කාලය තුළ සීමා නොවේ: වාර්තාව KTable හි හෝ නොපැමිණේ. ප්‍රධාන නිගමනය: KTable objects භාවිතයෙන් ඔබට අඩුවෙන් යාවත්කාලීන කරන ලද විමර්ශන දත්ත සමඟ KStream පොහොසත් කළ හැක.

දැන් අපි KStream වෙතින් සිදුවීම් පොහොසත් කිරීමට වඩාත් කාර්යක්ෂම ක්‍රමයක් දෙස බලමු.

5.3.4. GlobalKTable වස්තූන්

ඔබට පෙනෙන පරිදි, සිදුවීම් ප්‍රවාහයන් පොහොසත් කිරීමට හෝ ඒවාට සන්දර්භය එක් කිරීමට අවශ්‍ය වේ. 4 වන පරිච්ඡේදයේ ඔබ KStream වස්තු දෙකක් අතර සම්බන්ධතා දැක ඇති අතර, පෙර කොටසේදී ඔබ KStream සහ KTable අතර සම්බන්ධය දුටුවේය. මෙම සියලු අවස්ථා වලදී, යතුරු නව වර්ගයකට හෝ අගයකට සිතියම්ගත කිරීමේදී දත්ත ප්‍රවාහය නැවත කොටස් කිරීම අවශ්‍ය වේ. සමහර විට නැවත කොටස් කිරීම පැහැදිලිව සිදු කරන අතර සමහර විට Kafka Streams එය ස්වයංක්‍රීයව සිදු කරයි. යතුරු වෙනස් වී ඇති බැවින් නැවත කොටස් කිරීම අවශ්‍ය වන අතර වාර්තා නව කොටස් වලින් අවසන් විය යුතුය, එසේ නොමැතිනම් සම්බන්ධතාවය කළ නොහැකි වනු ඇත (මෙය 4 වන පරිච්ඡේදයේ, 4.2.4 උපවගන්තියේ "දත්ත නැවත කොටස් කිරීම" යන කොටසේ සාකච්ඡා කර ඇත).

නැවත කොටස් කිරීම සඳහා පිරිවැයක් ඇත

නැවත කොටස් කිරීම සඳහා වියදම් අවශ්‍ය වේ - අතරමැදි මාතෘකා නිර්මාණය කිරීම, වෙනත් මාතෘකාවක අනුපිටපත් දත්ත ගබඩා කිරීම සඳහා අමතර සම්පත් පිරිවැය; මෙම මාතෘකාවෙන් ලිවීම සහ කියවීම හේතුවෙන් ප්‍රමාද වැඩි වීම ද එයින් අදහස් වේ. මීට අමතරව, ඔබට එක් අංශයකට හෝ මානයකට වඩා සම්බන්ධ වීමට අවශ්‍ය නම්, ඔබ සම්බන්ධ කිරීම් දාමයට සම්බන්ධ කළ යුතුය, නව යතුරු සමඟ වාර්තා සිතියම්ගත කළ යුතුය, සහ නැවත කොටස් කිරීමේ ක්‍රියාවලිය නැවත ක්‍රියාත්මක කළ යුතුය.

කුඩා දත්ත කට්ටල වෙත සම්බන්ධ කිරීම

සමහර අවස්ථාවලදී, සම්බන්ධ කළ යුතු සමුද්දේශ දත්ත පරිමාව සාපේක්ෂව කුඩා වේ, එබැවින් එහි සම්පූර්ණ පිටපත් පහසුවෙන් එක් එක් නෝඩයට දේශීයව ගැලපේ. මෙවැනි අවස්ථා සඳහා, Kafka Streams GlobalKTable පන්තිය සපයයි.

GlobalKTable අවස්ථාවන් අද්විතීය වන්නේ යෙදුම එක් එක් නෝඩ් වෙත සියලු දත්ත ප්‍රතිවර්තනය කරන බැවිනි. තවද සියලුම දත්ත එක් එක් නෝඩය මත පවතින බැවින්, එය සියලු කොටස් සඳහා ලබා ගත හැකි වන පරිදි විමර්ශන දත්ත යතුර මගින් සිදුවීම් ප්‍රවාහය කොටස් කිරීමට අවශ්‍ය නොවේ. ඔබට GlobalKTable objects භාවිතයෙන් යතුරු රහිත සම්බන්ධ කිරීම්ද කළ හැක. මෙම විශේෂාංගය විදහා දැක්වීමට පෙර උදාහරණවලින් එකකට ආපසු යමු.

KStream වස්තූන් GlobalKTable වස්තූන් වෙත සම්බන්ධ කිරීම

5.3.2 උපවගන්තිය තුළ, අපි ගැනුම්කරුවන් විසින් හුවමාරු ගනුදෙනු ජනෙල් ඒකරාශී කිරීම සිදු කරන ලදී. මෙම එකතු කිරීමේ ප්‍රතිඵල මේ වගේ දෙයක් විය:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

මෙම ප්‍රතිඵල අරමුණ ඉටු කරන අතරම, පාරිභෝගිකයාගේ නම සහ සම්පූර්ණ සමාගමේ නම ද ප්‍රදර්ශනය කළේ නම් එය වඩාත් ප්‍රයෝජනවත් වනු ඇත. පාරිභෝගික නාමය සහ සමාගමේ නම එකතු කිරීම සඳහා, ඔබට සාමාන්‍ය සම්බන්ධවීම් සිදු කළ හැක, නමුත් ඔබට යතුරු සිතියම්ගත කිරීම් දෙකක් සහ නැවත කොටස් කිරීම සිදු කිරීමට සිදුවේ. GlobalKTable සමඟින් ඔබට එවැනි මෙහෙයුම්වල පිරිවැය වළක්වා ගත හැකිය.

මෙය සිදු කිරීම සඳහා, අපි Listing 5.11 වෙතින් countStream වස්තුව භාවිතා කරන්නෙමු (අනුරූප කේතය src/main/java/bbejeck/chapter_5/GlobalKTableExample.java හි සොයා ගත හැක) සහ GlobalKTable objects දෙකකට සම්බන්ධ කරන්න.

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
අපි දැනටමත් මේ ගැන සාකච්ඡා කර ඇති නිසා මම එය නැවත නොකියමි. නමුත් toStream().map ශ්‍රිතයේ ඇති කේතය කියවීමේ හැකියාව සඳහා පේළිගත ලැම්ඩා ප්‍රකාශනයක් වෙනුවට ශ්‍රිත වස්තුවක් බවට සංක්ෂිප්ත කර ඇති බව මම සටහන් කරමි.

මීලඟ පියවර වන්නේ GlobalKTable හි අවස්ථා දෙකක් ප්‍රකාශ කිරීමයි (පෙන්වන කේතය src/main/java/bbejeck/chapter_5/GlobalKTableExample.java ගොනුවේ සොයාගත හැක) (ලැයිස්තුගත කිරීම 5.12).

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"

මාතෘකා නම් විස්තර කර ඇත්තේ ගණන් කළ වර්ග භාවිතයෙන් බව කරුණාවෙන් සලකන්න.

දැන් අප සතුව සියලුම සංරචක සූදානම් කර ඇති බැවින්, ඉතිරිව ඇත්තේ සම්බන්ධතාවය සඳහා කේතය ලිවීමයි (එය src/main/java/bbejeck/chapter_5/GlobalKTableExample.java ගොනුවෙන් සොයාගත හැකිය) (ලැයිස්තුගත කිරීම 5.13).

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
මෙම කේතයේ එක්වීම් දෙකක් ඇතත්, ඒවායේ ප්‍රතිඵල කිසිවක් වෙන වෙනම භාවිතා නොකරන නිසා ඒවා දම්වැලකින් බැඳී ඇත. සම්පූර්ණ මෙහෙයුම අවසානයේ ප්රතිඵල ප්රදර්ශනය කෙරේ.

ඔබ ඉහත එක්වීමේ මෙහෙයුම ක්‍රියාත්මක කරන විට, ඔබට මෙවැනි ප්‍රතිඵල ලැබෙනු ඇත:

{customer='Barney, Smith' company="Exxon", transactions= 17}

සාරය වෙනස් වී නැත, නමුත් මෙම ප්රතිඵල වඩාත් පැහැදිලිව පෙනේ.

ඔබ 4 වන පරිච්ඡේදය දක්වා ගණන් කළහොත්, ඔබ දැනටමත් ක්‍රියාත්මක වන සම්බන්ධතා වර්ග කිහිපයක් දැක ඇත. ඒවා වගුවේ දක්වා ඇත. 5.2 මෙම වගුව Kafka Streams හි 1.0.0 අනුවාදයේ සම්බන්ධතා හැකියාවන් පිළිබිඹු කරයි; අනාගත නිකුතු වලදී යමක් වෙනස් විය හැක.

"කෆ්කා ස්ට්රීම්ස් ඉන් ඇක්ෂන්" පොත. තත්‍ය කාලීන වැඩ සඳහා යෙදුම් සහ ක්ෂුද්‍ර සේවා"
දේවල් අවසන් කිරීමට, අපි මූලික කරුණු නැවත සලකා බලමු: ඔබට ප්‍රාදේශීය තත්ත්වය භාවිතයෙන් සිදුවීම් ප්‍රවාහ (KStream) සහ යාවත්කාලීන ප්‍රවාහ (KTable) සම්බන්ධ කළ හැක. විකල්පයක් ලෙස, යොමු දත්තවල ප්‍රමාණය ඉතා විශාල නොවේ නම්, ඔබට GlobalKTable වස්තුව භාවිතා කළ හැක. GlobalKTables සෑම Kafka Streams යෙදුම් නෝඩයකටම සියලුම කොටස් ප්‍රතිනිර්මාණය කරයි, යතුර කුමන කොටසට අනුරූප වුවද සියලුම දත්ත ලබා ගත හැකි බව සහතික කරයි.

මීළඟට අපි Kafka Streams විශේෂාංගය දකිමු, එයට ස්තූතිවන්ත වන්නට Kafka මාතෘකාවකින් දත්ත පරිභෝජනය නොකර රාජ්‍ය වෙනස්කම් නිරීක්ෂණය කළ හැකිය.

5.3.5 විමසිය හැකි තත්වය

අපි දැනටමත් රාජ්‍ය සම්බන්ධ මෙහෙයුම් කිහිපයක් සිදු කර ඇති අතර සෑම විටම ප්‍රතිඵල කොන්සෝලයට (සංවර්ධන අරමුණු සඳහා) ප්‍රතිදානය කරන්න හෝ මාතෘකාවකට (නිෂ්පාදන අරමුණු සඳහා) ලියන්නෙමු. මාතෘකාවකට ප්‍රතිඵල ලිවීමේදී ඒවා බැලීම සඳහා Kafka පාරිභෝගිකයෙකු භාවිතා කිරීමට සිදුවේ.

මෙම මාතෘකා වලින් දත්ත කියවීම ද්‍රව්‍යමය දර්ශන වර්ගයක් ලෙස සැලකිය හැකිය. අපගේ අරමුණු සඳහා, අපට විකිපීඩියාවෙන් ද්‍රව්‍යමය දර්ශනයක නිර්වචනය භාවිතා කළ හැකිය: “... විමසුමක ප්‍රතිඵල අඩංගු භෞතික දත්ත සමුදා වස්තුවකි. උදාහරණයක් ලෙස, එය දුරස්ථ දත්තවල දේශීය පිටපතක්, හෝ වගුවක පේළි සහ/හෝ තීරුවල උප කුලකයක් හෝ එක්වීමේ ප්‍රතිඵලයක් හෝ එකතු කිරීම හරහා ලබාගත් සාරාංශ වගුවක් විය හැකිය” (https://en.wikipedia.org/wiki /Materialized_view).

Kafka Streams ඔබට රාජ්‍ය වෙළඳසැල් වල අන්තර්ක්‍රියාකාරී විමසුම් ධාවනය කිරීමට ඉඩ සලසයි, ඔබට මෙම ද්‍රව්‍යකරණය වූ දසුන් කෙලින්ම කියවීමට ඉඩ සලසයි. රාජ්ය ගබඩාව වෙත විමසුම කියවීමට පමණක් ක්රියාත්මක වන බව සැලකිල්ලට ගැනීම වැදගත්ය. ඔබගේ යෙදුම දත්ත සකසන අතරතුර අහම්බෙන් තත්වය අනනුකූල කිරීම ගැන කරදර විය යුතු නැති බව මෙය සහතික කරයි.

රාජ්ය ගබඩා සෘජුවම විමසීමේ හැකියාව වැදගත් වේ. මෙයින් අදහස් කරන්නේ ඔබට මුලින්ම Kafka පාරිභෝගිකයාගෙන් දත්ත ලබා ගැනීමෙන් තොරව උපකරණ පුවරු යෙදුම් නිර්මාණය කළ හැකි බවයි. නැවත දත්ත ලිවීමට අවශ්‍ය නොවන නිසා එය යෙදුමේ කාර්යක්ෂමතාව වැඩි කරයි:

  • දත්තවල ප්‍රදේශයට ස්තූතිවන්ත වන අතර, ඒවාට ඉක්මනින් ප්‍රවේශ විය හැකිය;
  • බාහිර ගබඩාවට ලියා නොමැති බැවින් දත්ත අනුපිටපත් කිරීම ඉවත් කරනු ලැබේ.

මට ඔබ මතක තබා ගැනීමට අවශ්‍ය ප්‍රධානතම දෙය නම්, ඔබට ඔබගේ යෙදුම තුළ සිට රාජ්‍යය කෙලින්ම විමසිය හැකි බවයි. මෙය ඔබට ලබා දෙන අවස්ථාවන් අධිතක්සේරු කළ නොහැක. Kafka වෙතින් දත්ත පරිභෝජනය කිරීම සහ යෙදුම සඳහා දත්ත ගබඩාවක වාර්තා ගබඩා කිරීම වෙනුවට, ඔබට එම ප්‍රතිඵලය සහිත රාජ්‍ය ගබඩාවලින් විමසිය හැක. රාජ්ය ගබඩා වෙත සෘජු විමසුම් යනු අඩු කේතයක් (පාරිභෝගිකයෙක් නැත) සහ අඩු මෘදුකාංගයක් (ප්රතිඵල ගබඩා කිරීම සඳහා දත්ත සමුදා වගුවක් අවශ්ය නොවේ).

අපි මෙම පරිච්ඡේදයේ බොහෝ කරුණු ආවරණය කර ඇත, එබැවින් අපි දැනට රාජ්‍ය වෙළඳසැල් වලට එරෙහිව අන්තර්ක්‍රියාකාරී විමසුම් පිළිබඳ අපගේ සාකච්ඡාව තබමු. නමුත් කරදර නොවන්න: 9 වන පරිච්ඡේදයේදී, අපි අන්තර්ක්‍රියාකාරී විමසුම් සහිත සරල උපකරණ පුවරු යෙදුමක් සාදන්නෙමු. අන්තර්ක්‍රියාකාරී විමසුම් සහ ඔබට ඒවා Kafka Streams යෙදුම් වෙත එක් කළ හැකි ආකාරය නිරූපණය කිරීමට මෙම සහ පෙර පරිච්ඡේදවල සමහර උදාහරණ භාවිතා කරනු ඇත.

සාරාංශය

  • KStream වස්තු දත්ත සමුදායකට ඇතුළු කිරීම් හා සැසඳිය හැකි සිදුවීම් ප්‍රවාහ නියෝජනය කරයි. KTable වස්තූන් දත්ත සමුදායකට යාවත්කාලීන කිරීම් වැනි යාවත්කාලීන ප්‍රවාහ නියෝජනය කරයි. KTable වස්තුවේ විශාලත්වය වර්ධනය නොවේ, පැරණි වාර්තා නව ඒවා මගින් ප්රතිස්ථාපනය වේ.
  • එකතු කිරීමේ මෙහෙයුම් සඳහා KTable වස්තු අවශ්ය වේ.
  • කවුළු මෙහෙයුම් භාවිතයෙන්, ඔබට එකතු කළ දත්ත කාල බකට් වලට බෙදිය හැක.
  • GlobalKTable objects වලට ස්තූතියි, ඔබට කොටස් කිරීම නොසලකා යෙදුමේ ඕනෑම තැනක විමර්ශන දත්ත වෙත ප්‍රවේශ විය හැක.
  • KStream, KTable සහ GlobalKTable වස්තූන් අතර සම්බන්ධතා ඇති විය හැක.

මෙතෙක්, අපි ඉහළ මට්ටමේ KStream DSL භාවිතයෙන් Kafka Streams යෙදුම් තැනීම කෙරෙහි අවධානය යොමු කර ඇත. ඉහළ මට්ටමේ ප්‍රවේශය ඔබට පිළිවෙලට සහ සංක්ෂිප්ත වැඩසටහන් නිර්මාණය කිරීමට ඉඩ ලබා දුන්නද, එය භාවිතා කිරීම වෙළඳාමක් නියෝජනය කරයි. DSL KStream සමඟ වැඩ කිරීම යනු පාලන මට්ටම අඩු කිරීමෙන් ඔබේ කේතයේ සංක්ෂිප්තභාවය වැඩි කිරීමයි. මීළඟ පරිච්ඡේදයේදී, අපි පහත් මට්ටමේ හසුරුවන්න නෝඩ් API දෙස බලා වෙනත් වෙළඳාම් කිරීමට උත්සාහ කරමු. වැඩසටහන් පෙරට වඩා දිගු වනු ඇත, නමුත් අපට අවශ්‍ය විය හැකි ඕනෑම හසුරුවන නෝඩයක් සෑදීමට අපට හැකි වනු ඇත.

→ පොත පිළිබඳ වැඩි විස්තර මෙතැනින් සොයාගත හැකිය ප්රකාශකයාගේ වෙබ් අඩවිය

→ Habrozhiteli සඳහා කූපනය භාවිතයෙන් 25% වට්ටමක් - කෆ්කා ධාරා

→ පොතේ කඩදාසි පිටපත සඳහා ගෙවීමෙන් පසු විද්‍යුත් පොතක් විද්‍යුත් තැපෑලෙන් එවනු ලැබේ.

මූලාශ්රය: www.habr.com

අදහස් එක් කරන්න