Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi" Habari, wakazi wa Khabro! Kitabu hiki kinafaa kwa msanidi programu yeyote ambaye anataka kuelewa uchakataji wa nyuzi. Kuelewa programu zinazosambazwa kutakusaidia kuelewa vyema Mipasho ya Kafka na Kafka. Itakuwa nzuri kujua mfumo wa Kafka yenyewe, lakini hii sio lazima: Nitakuambia kila kitu unachohitaji. Wasanidi programu wa Kafka na wanovice walio na uzoefu watajifunza jinsi ya kuunda programu zinazovutia za kuchakata mitiririko kwa kutumia maktaba ya Kafka Streams katika kitabu hiki. Wasanidi programu wa Java wa kati na wa hali ya juu ambao tayari wanafahamu dhana kama vile utayarishaji mfululizo watajifunza kutumia ujuzi wao ili kuunda programu za Kafka Streams. Nambari ya chanzo cha kitabu imeandikwa katika Java 8 na hutumia sana syntax ya kujieleza ya Java 8, kwa hivyo kujua jinsi ya kufanya kazi na kazi za lambda (hata katika lugha nyingine ya programu) kutakusaidia.

Dondoo. 5.3. Shughuli za kujumlisha na kuweka madirisha

Katika sehemu hii, tutaendelea kuchunguza sehemu zenye matumaini zaidi za Mipasho ya Kafka. Kufikia sasa tumeshughulikia vipengele vifuatavyo vya Mipasho ya Kafka:

  • kuunda topolojia ya usindikaji;
  • kutumia hali katika programu za utiririshaji;
  • kufanya miunganisho ya mkondo wa data;
  • tofauti kati ya mitiririko ya matukio (KStream) na mipasho ya kusasisha (KTable).

Katika mifano ifuatayo tutaleta vipengele hivi vyote pamoja. Pia utajifunza kuhusu windowsing, kipengele kingine kizuri cha utiririshaji wa programu. Mfano wetu wa kwanza utakuwa mkusanyiko rahisi.

5.3.1. Mkusanyiko wa mauzo ya hisa kulingana na sekta ya tasnia

Kujumlisha na kupanga ni zana muhimu wakati wa kufanya kazi na data ya kutiririsha. Uchunguzi wa rekodi za kibinafsi jinsi zinavyopokelewa mara nyingi hautoshi. Ili kutoa maelezo ya ziada kutoka kwa data, ni muhimu kuunganisha na kuchanganya.

Katika mfano huu, utavaa vazi la mfanyabiashara wa siku ambaye anahitaji kufuatilia kiasi cha mauzo ya hisa za makampuni katika tasnia kadhaa. Hasa, unavutiwa na kampuni tano zilizo na mauzo makubwa zaidi ya hisa katika kila tasnia.

Ujumlisho kama huo utahitaji hatua kadhaa zifuatazo ili kutafsiri data katika fomu inayotakiwa (kuzungumza kwa maneno ya jumla).

  1. Unda chanzo kulingana na mada ambacho huchapisha maelezo ya biashara ya hisa ghafi. Tutalazimika kuchora kitu cha aina ya StockTransaction kwa kitu cha aina ya ShareVolume. Jambo ni kwamba kitu cha StockTransaction kina metadata ya mauzo, lakini tunahitaji tu data kuhusu idadi ya hisa zinazouzwa.
  2. Data ya kikundi cha ShareVolume kwa ishara ya hisa. Baada ya kupangwa kwa alama, unaweza kukunja data hii katika jumla ndogo za kiasi cha mauzo ya hisa. Inafaa kukumbuka kuwa njia ya KStream.groupBy inarudisha mfano wa aina ya KGroupedStream. Na unaweza kupata mfano wa KTable kwa kupiga zaidi mbinu ya KGroupedStream.reduce.

Kiolesura cha KGroupedStream ni nini

Mbinu za KStream.groupBy na KStream.groupByKey hurejesha mfano wa KGroupedStream. KGroupedStream ni uwakilishi wa kati wa mtiririko wa matukio baada ya kupangwa kwa funguo. Haikusudiwa kufanya kazi moja kwa moja nayo. Badala yake, KGroupedStream inatumika kwa shughuli za kujumlisha, ambazo kila mara husababisha KTable. Na kwa kuwa matokeo ya shughuli za kujumlisha ni KTable na wanatumia duka la serikali, inawezekana kwamba sio sasisho zote kama matokeo zinatumwa zaidi chini ya bomba.

Njia ya KTable.groupBy inarudisha KGroupedTable sawa - uwakilishi wa kati wa mtiririko wa sasisho, zilizounganishwa tena na ufunguo.

Hebu tuchukue mapumziko mafupi na tuangalie Mtini. 5.9, ambayo inaonyesha kile ambacho tumefanikiwa. Topolojia hii inapaswa kuwa tayari kujulikana sana kwako.

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Hebu sasa tuangalie msimbo wa topolojia hii (inaweza kupatikana kwenye faili src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Orodha 5.2).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Nambari iliyotolewa inatofautishwa na ufupi wake na idadi kubwa ya vitendo vinavyofanywa katika mistari kadhaa. Unaweza kugundua kitu kipya katika kigezo cha kwanza cha njia ya builder.stream: thamani ya aina ya enum AutoOffsetReset.EARLIEST (pia kuna LATEST), iliyowekwa kwa kutumia mbinu ya Consumed.withOffsetResetPolicy. Aina hii ya hesabu inaweza kutumika kubainisha mkakati wa kuweka upya kwa kila KStream au KTable na inachukua nafasi ya kwanza juu ya chaguo la kuweka upya kutoka kwa usanidi.

GroupByKey na GroupBy

Kiolesura cha KStream kina mbinu mbili za kupanga rekodi: GroupByKey na GroupBy. Zote mbili zinarudisha KGroupedTable, kwa hivyo unaweza kuwa unajiuliza ni tofauti gani kati yao na wakati wa kutumia ipi?

Mbinu ya GroupByKey inatumika wakati funguo katika KStream tayari hazina tupu. Na muhimu zaidi, bendera "inahitaji kugawanywa tena" haikuwekwa kamwe.

Njia ya GroupBy inadhania kuwa umebadilisha funguo za kupanga, kwa hivyo bendera ya ugawaji imewekwa kuwa kweli. Kutekeleza viungio, mijumuisho, n.k. baada ya mbinu ya GroupBy kutasababisha kugawanya upya kiotomatiki.
Muhtasari: Inapowezekana, unapaswa kutumia GroupByKey badala ya GroupBy.

Ni wazi ni nini mapValues ​​​​na njia za groupBy hufanya, kwa hivyo hebu tuangalie sum() njia (inayopatikana katika src/main/java/bbejeck/model/ShareVolume.java) (Orodha 5.3).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Mbinu ya ShareVolume.sum inarejesha jumla inayoendeshwa ya kiasi cha mauzo ya hisa, na matokeo ya msururu mzima wa hesabu ni kitu cha KTable. . Sasa unaelewa jukumu la KTable inacheza. Wakati vitu vya ShareVolume vinapowasili, kitu kinacholingana cha KTable huhifadhi sasisho la hivi punde. Ni muhimu kukumbuka kwamba masasisho yote yanaonyeshwa katika shareVolumeKTable ya awali, lakini sio zote zinatumwa zaidi.

Kisha tunatumia KTable hii kujumlisha (kwa idadi ya hisa zinazouzwa) kufikia kampuni tano zilizo na hisa nyingi zaidi zinazouzwa katika kila sekta. Matendo yetu katika kesi hii yatakuwa sawa na yale ya mkusanyiko wa kwanza.

  1. Tekeleza kikundi kingineKwa operesheni ya kupanga vitu vya ShareVolume binafsi kulingana na tasnia.
  2. Anza kufanya muhtasari wa vipengee vya ShareVolume. Wakati huu kitu cha kujumlisha ni foleni ya kipaumbele cha ukubwa usiobadilika. Katika foleni hii ya ukubwa usiobadilika, ni kampuni tano pekee zilizo na kiasi kikubwa cha hisa zinazouzwa ndizo zinazobaki.
  3. Ramani za foleni kutoka kwa aya iliyotangulia hadi kwa thamani ya mfuatano na urudishe hisa tano za juu zinazouzwa zaidi kulingana na idadi kwa sekta.
  4. Andika matokeo katika fomu ya kamba kwa mada.

Katika Mtini. Mchoro 5.10 unaonyesha grafu ya topolojia ya mtiririko wa data. Kama unaweza kuona, mzunguko wa pili wa usindikaji ni rahisi sana.

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Sasa kwa kuwa tuna ufahamu wazi wa muundo wa mzunguko huu wa pili wa usindikaji, tunaweza kugeukia msimbo wake wa chanzo (utaipata kwenye faili src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Orodha 5.4) .

Kianzishaji hiki kina kigeu cha FixedQueue. Hiki ni kifaa maalum ambacho ni adapta ya java.util.TreeSet ambayo hutumika kufuatilia matokeo ya juu ya N katika mpangilio wa kushuka wa hisa zinazouzwa.

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Tayari umeona simu za groupBy na mapValues, kwa hivyo hatutaingia kwenye hizo (tunaita mbinu ya KTable.toStream kwa sababu mbinu ya KTable.print imeacha kutumika). Lakini bado haujaona toleo la KTable la aggregate(), kwa hivyo tutatumia muda kidogo kujadili hilo.

Kama unavyokumbuka, kinachofanya KTable kuwa tofauti ni kwamba rekodi zilizo na funguo sawa huzingatiwa kuwa visasisho. KTable inachukua nafasi ya ingizo la zamani na mpya. Ujumlisho hutokea kwa njia sawa: rekodi za hivi karibuni zilizo na ufunguo sawa zimeunganishwa. Rekodi inapofika, huongezwa kwa mfano wa darasa la FixedSizePriorityQueue kwa kutumia kiboreshaji (parameta ya pili katika simu ya njia ya jumla), lakini ikiwa rekodi nyingine tayari iko na ufunguo sawa, basi rekodi ya zamani huondolewa kwa kutumia kiondoa (parameta ya tatu ndani). simu ya njia ya jumla).

Hii yote inamaanisha kuwa kijumlishi chetu, FixedSizePriorityQueue, hakijumlishi thamani zote kwa ufunguo mmoja, lakini huhifadhi jumla inayosonga ya kiasi cha hifadhi N zinazouzwa zaidi. Kila ingizo linaloingia lina jumla ya idadi ya hisa zilizouzwa hadi sasa. KTable itakupa habari kuhusu hisa za kampuni ambazo zinauzwa zaidi kwa sasa, bila kuhitaji ujumlishaji wa kila sasisho.

Tulijifunza kufanya mambo mawili muhimu:

  • maadili ya kikundi katika KTable kwa ufunguo wa kawaida;
  • fanya shughuli muhimu kama vile kujumuisha na kujumlisha thamani hizi zilizowekwa katika vikundi.

Kujua jinsi ya kufanya shughuli hizi ni muhimu kuelewa maana ya data inayosonga kupitia programu ya Mipasho ya Kafka na kuelewa ni taarifa gani inabeba.

Pia tumeleta pamoja baadhi ya dhana muhimu zilizojadiliwa mapema katika kitabu hiki. Katika Sura ya 4, tulijadili jinsi kustahimili makosa, jimbo la karibu ni muhimu kwa programu ya kutiririsha. Mfano wa kwanza katika sura hii ulionyesha kwa nini jimbo la karibu ni muhimu sanaβ€”inakuruhusu kufuatilia ni maelezo gani ambayo tayari umeona. Ufikiaji wa ndani huepuka ucheleweshaji wa mtandao, na kufanya programu kufanya kazi zaidi na kustahimili hitilafu.

Wakati wa kufanya operesheni yoyote ya kukunja au kujumlisha, lazima ueleze jina la duka la serikali. Shughuli za kukunja na kujumlisha hurejesha mfano wa KTable, na KTable hutumia hifadhi ya hali kuchukua nafasi ya matokeo ya zamani na mapya. Kama ulivyoona, sio masasisho yote yanayotumwa chini, na hii ni muhimu kwa sababu shughuli za ujumlishaji zimeundwa ili kutoa maelezo ya muhtasari. Ikiwa hutatumia jimbo la karibu nawe, KTable itasambaza matokeo yote ya kujumlisha na kujumuisha.

Ifuatayo, tutaangalia kufanya shughuli kama vile kujumlisha ndani ya muda maalum - kinachojulikana kama shughuli za madirisha.

5.3.2. Operesheni za dirisha

Katika sehemu iliyotangulia, tulianzisha ubadilishaji wa kuteleza na ujumuishaji. Maombi yalifanya mkusanyo wa mara kwa mara wa kiasi cha mauzo ya hisa, ikifuatiwa na ujumlishaji wa hisa tano zinazouzwa zaidi kwenye soko.

Wakati mwingine ujumlishaji unaoendelea kama huu na upangaji wa matokeo ni muhimu. Na wakati mwingine unahitaji kufanya shughuli kwa kipindi fulani cha muda. Kwa mfano, hesabu ni shughuli ngapi za kubadilishana fedha zilizofanywa na hisa za kampuni fulani katika dakika 10 zilizopita. Au ni watumiaji wangapi waliobofya kwenye bango jipya la utangazaji katika dakika 15 zilizopita. Programu inaweza kufanya shughuli kama hizi mara kadhaa, lakini kwa matokeo ambayo yanatumika kwa vipindi maalum vya muda (madirisha ya wakati).

Kuhesabu shughuli za kubadilishana na mnunuzi

Katika mfano unaofuata, tutafuatilia miamala ya hisa kati ya wafanyabiashara wengiβ€”iwe mashirika makubwa au wafadhili mahiri.

Kuna sababu mbili zinazowezekana za ufuatiliaji huu. Mojawapo ni hitaji la kujua viongozi wa soko wananunua/uza nini. Ikiwa wachezaji hawa wakubwa na wawekezaji wa hali ya juu wataona fursa, ni jambo la maana kufuata mkakati wao. Sababu ya pili ni hamu ya kuona dalili zozote zinazowezekana za biashara haramu ya ndani. Ili kufanya hivyo, utahitaji kuchambua uunganisho wa spikes kubwa za mauzo na vyombo vya habari muhimu.

Ufuatiliaji kama huo unajumuisha hatua zifuatazo:

  • kuunda mkondo wa kusoma kutoka kwa mada ya miamala ya hisa;
  • kupanga rekodi zinazoingia kwa kutumia kitambulisho cha mnunuzi na alama ya hisa. Kupiga simu kwa njia ya kikundi kunarudisha mfano wa darasa la KGroupedStream;
  • Mbinu ya KGroupedStream.windowedBy hurejesha mtiririko wa data kwa kidirisha cha muda, ambacho huruhusu ujumlishaji wa madirisha. Kulingana na aina ya dirisha, TimeWindowedKStream au SessionWindowedKStream inarejeshwa;
  • hesabu ya miamala kwa ajili ya operesheni ya kujumlisha. Mtiririko wa data uliowekwa kwenye dirisha huamua ikiwa rekodi fulani itazingatiwa katika hesabu hii;
  • kuandika matokeo kwa mada au kuyatoa kwa kiweko wakati wa utayarishaji.

Topolojia ya programu hii ni rahisi, lakini picha yake wazi inaweza kusaidia. Hebu tuangalie Mtini. 5.11.

Ifuatayo, tutaangalia utendaji wa shughuli za dirisha na msimbo unaofanana.

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"

Aina za madirisha

Kuna aina tatu za madirisha katika Mipasho ya Kafka:

  • kikao;
  • "kuanguka" (kuanguka);
  • kuteleza/kurukaruka.

Ni ipi ya kuchagua inategemea mahitaji ya biashara yako. Madirisha ya kuyumba na kuruka hayana muda, ilhali madirisha ya vipindi yanadhibitiwa na shughuli za mtumiajiβ€”muda wa (vipindi) huamuliwa pekee na jinsi mtumiaji anavyofanya kazi. Jambo kuu la kukumbuka ni kwamba aina zote za dirisha zinategemea tarehe/saa za mihuri ya maingizo, sio wakati wa mfumo.

Ifuatayo, tunatekeleza topolojia yetu na kila aina ya dirisha. Nambari kamili itatolewa tu katika mfano wa kwanza; kwa aina zingine za windows hakuna kitakachobadilika isipokuwa aina ya operesheni ya dirisha.

Dirisha la kikao

Madirisha ya kikao ni tofauti sana na aina nyingine zote za madirisha. Hazizuiliwi sana na wakati bali na shughuli ya mtumiaji (au shughuli ya huluki ambayo ungependa kufuatilia). Dirisha za kipindi hutenganishwa na vipindi vya kutofanya kazi.

Mchoro 5.12 unaonyesha dhana ya madirisha ya kikao. Kipindi kidogo kitaunganishwa na kikao upande wake wa kushoto. Na kikao cha kulia kitakuwa tofauti kwa sababu kinafuata muda mrefu wa kutokuwa na shughuli. Madirisha ya kipindi yanategemea shughuli za mtumiaji, lakini tumia mihuri ya tarehe/saa kutoka kwa maingizo ili kubaini ni kipindi gani ingizo linahusika.

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"

Kwa kutumia madirisha ya kipindi kufuatilia miamala ya hisa

Hebu tutumie madirisha ya kipindi ili kunasa taarifa kuhusu shughuli za kubadilishana fedha. Utekelezaji wa madirisha ya kikao umeonyeshwa katika Orodha ya 5.5 (ambayo inaweza kupatikana katika src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Tayari umeona shughuli nyingi katika topolojia hii, kwa hivyo hakuna haja ya kuzikagua hapa tena. Lakini pia kuna mambo kadhaa mapya hapa, ambayo sasa tutayajadili.

Operesheni yoyote ya kikundiKwa kawaida hufanya aina fulani ya operesheni ya kujumlisha (kukusanya, kukunja au kuhesabu). Unaweza kutekeleza mkusanyiko limbikizi kwa jumla inayoendelea, au ujumlisho wa dirisha, ambao unazingatia rekodi ndani ya dirisha la muda maalum.

Msimbo katika Orodha 5.5 huhesabu idadi ya miamala ndani ya madirisha ya kipindi. Katika Mtini. 5.13 vitendo hivi vinachambuliwa hatua kwa hatua.

Kwa kupiga simu windowedBy(SessionWindows.with(twentySeconds).mpaka(FifteenMinutes)) tunaunda dirisha la kipindi lenye muda wa kutofanya kazi wa sekunde 20 na muda wa kudumu wa dakika 15. Muda wa kutofanya kitu wa sekunde 20 unamaanisha kuwa programu itajumuisha ingizo lolote litakalofika ndani ya sekunde 20 baada ya mwisho au kuanza kwa kipindi cha sasa kwenye kipindi cha sasa ( kinachofanya kazi).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Ifuatayo, tunataja ni operesheni gani ya kujumlisha inahitajika kufanywa kwenye dirisha la kikao - katika kesi hii, hesabu. Ikiwa ingizo linaloingia litaanguka nje ya dirisha la kutotumika (upande wowote wa muhuri wa tarehe/saa), programu itaunda kipindi kipya. Muda wa kubaki unamaanisha kudumisha kipindi kwa muda fulani na kuruhusu data iliyochelewa ambayo huendelea zaidi ya kipindi cha kutotumika lakini bado inaweza kuambatishwa. Zaidi ya hayo, mwanzo na mwisho wa kipindi kipya kinachotokana na muunganisho unalingana na muhuri wa mapema na wa hivi punde zaidi wa tarehe/saa.

Wacha tuangalie maingizo machache kutoka kwa njia ya kuhesabu ili kuona jinsi vipindi hufanya kazi (Jedwali 5.1).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Rekodi zinapofika, tunatafuta vipindi vilivyopo kwa ufunguo sawa, muda wa mwisho chini ya tarehe/saa muhuri - muda wa kutotumika, na muda wa kuanza ulio mkubwa zaidi ya tarehe/saa muhuri + na muda wa sasa wa kutotumika. Kwa kuzingatia hili, maingizo manne kutoka kwa jedwali. 5.1 zimeunganishwa katika kipindi kimoja kama ifuatavyo.

1. Rekodi 1 hufika kwanza, kwa hivyo wakati wa kuanza ni sawa na wakati wa mwisho na ni 00:00:00.

2. Kisha, kiingilio cha 2 kinafika, na tunatafuta vipindi ambavyo haviisha mapema zaidi ya 23:59:55 na kuanza kabla ya 00:00:35. Tunapata rekodi ya 1 na kuchanganya kipindi cha 1 na 2. Tunachukua muda wa kuanza kwa kipindi cha 1 (mapema) na wakati wa mwisho wa kipindi cha 2 (baadaye), ili kipindi chetu kipya kianze saa 00:00:00 na kumalizika saa 00: 00:15.

3. Rekodi 3 inakuja, tunatafuta vipindi kati ya 00:00:30 na 00:01:10 na hatupati yoyote. Ongeza kipindi cha pili kwa ufunguo 123-345-654,FFBE, kuanzia na kuisha saa 00:00:50.

4. Rekodi 4 inafika na tunatafuta vipindi kati ya 23:59:45 na 00:00:25. Wakati huu vipindi vyote vya 1 na 2 vinapatikana. Vipindi vyote vitatu vimeunganishwa kuwa kimoja, na muda wa kuanza wa 00:00:00 na wakati wa mwisho wa 00:00:15.

Kutoka kwa kile kilichoelezewa katika sehemu hii, inafaa kukumbuka nuances zifuatazo muhimu:

  • vipindi si madirisha ya ukubwa usiobadilika. Muda wa kikao huamuliwa na shughuli ndani ya muda fulani;
  • Mihuri ya tarehe/saa katika data huamua ikiwa tukio litaanguka ndani ya kipindi kilichopo au katika kipindi cha kutofanya kitu.

Ifuatayo, tutajadili aina inayofuata ya dirisha - "kuanguka" madirisha.

"Kutetemeka" madirisha

Dirisha zinazoanguka hunasa matukio ambayo huanguka ndani ya kipindi fulani cha muda. Fikiria kuwa unahitaji kunasa shughuli zote za hisa za kampuni fulani kila baada ya sekunde 20, kwa hivyo unakusanya matukio yote katika kipindi hicho cha wakati. Mwishoni mwa muda wa sekunde 20, dirisha linazunguka na kuhamia kwa muda mpya wa uchunguzi wa sekunde 20. Mchoro 5.14 unaonyesha hali hii.

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Kama unaweza kuona, matukio yote yaliyopokelewa katika sekunde 20 zilizopita yanajumuishwa kwenye dirisha. Mwishoni mwa kipindi hiki cha muda, dirisha jipya linaundwa.

Orodha ya 5.6 inaonyesha msimbo unaoonyesha matumizi ya madirisha yanayoporomoka ili kunasa miamala ya hisa kila baada ya sekunde 20 (inapatikana katika src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Kwa mabadiliko haya madogo kwenye simu ya mbinu ya TimeWindows.of, unaweza kutumia kidirisha cha kuporomoka. Mfano huu hauitaji mbinu ya until(), kwa hivyo muda chaguomsingi wa uhifadhi wa saa 24 utatumika.

Hatimaye, ni wakati wa kuendelea hadi mwisho wa chaguzi za dirisha - madirisha ya "hopping".

Kuteleza ("kuruka") madirisha

Dirisha za kuteleza/kurukaruka ni sawa na madirisha yanayoporomoka, lakini kwa tofauti kidogo. Dirisha zinazoteleza hazisubiri hadi mwisho wa muda kabla ya kuunda dirisha jipya ili kuchakata matukio ya hivi majuzi. Wanaanza mahesabu mapya baada ya muda wa kusubiri chini ya muda wa dirisha.

Ili kuonyesha tofauti kati ya kuporomoka na kuruka madirisha, wacha turudi kwenye mfano wa kuhesabu shughuli za ubadilishaji wa hisa. Lengo letu bado ni kuhesabu idadi ya miamala, lakini hatutaki kusubiri muda wote kabla ya kusasisha kaunta. Badala yake, tutasasisha kaunta kwa vipindi vifupi. Kwa mfano, bado tutahesabu idadi ya miamala kila baada ya sekunde 20, lakini sasisha kihesabu kila baada ya sekunde 5, kama inavyoonyeshwa kwenye Mtini. 5.15. Katika kesi hii, tunaishia na madirisha matatu ya matokeo na data inayoingiliana.

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Orodha ya 5.7 inaonyesha msimbo wa kufafanua madirisha ya kutelezesha (inapatikana katika src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Dirisha linaloanguka linaweza kubadilishwa kuwa dirisha la kuruka-ruka kwa kuongeza simu kwa advanceBy() mbinu. Katika mfano ulioonyeshwa, muda wa kuokoa ni dakika 15.

Uliona katika sehemu hii jinsi ya kupunguza matokeo ya ujumlisho kwa madirisha ya saa. Hasa, nataka ukumbuke mambo matatu yafuatayo kutoka kwa sehemu hii:

  • ukubwa wa madirisha ya kikao ni mdogo si kwa muda, lakini kwa shughuli za mtumiaji;
  • madirisha ya "kuanguka" hutoa muhtasari wa matukio ndani ya muda fulani;
  • Muda wa madirisha ya kuruka umewekwa, lakini husasishwa mara kwa mara na inaweza kuwa na maingizo yanayoingiliana katika madirisha yote.

Ifuatayo, tutajifunza jinsi ya kubadilisha KTable kurudi kwa KStream kwa muunganisho.

5.3.3. Kuunganisha vitu vya KStream na KTable

Katika Sura ya 4, tulijadili kuunganisha vitu viwili vya KStream. Sasa tunapaswa kujifunza jinsi ya kuunganisha KTable na KStream. Hii inaweza kuhitajika kwa sababu zifuatazo rahisi. KStream ni mtiririko wa rekodi, na KTable ni mtiririko wa masasisho ya rekodi, lakini wakati mwingine unaweza kutaka kuongeza muktadha wa ziada kwenye mtiririko wa rekodi kwa kutumia masasisho kutoka kwa KTable.

Wacha tuchukue data juu ya idadi ya miamala ya soko la hisa na tuichanganye na habari za soko la hisa kwa tasnia husika. Hivi ndivyo unahitaji kufanya ili kufanikisha hili kutokana na nambari ambayo tayari unayo.

  1. Badilisha kitu cha KTable chenye data kuhusu idadi ya miamala ya hisa kuwa KStream, ikifuatiwa na kubadilisha ufunguo na ufunguo unaoonyesha sekta ya sekta inayolingana na ishara hii ya hisa.
  2. Unda kipengee cha KTable kinachosoma data kutoka kwa mada yenye habari za ubadilishaji wa hisa. KTable hii mpya itaainishwa na sekta ya tasnia.
  3. Unganisha masasisho ya habari na maelezo kuhusu idadi ya miamala ya soko la hisa kulingana na sekta ya tasnia.

Sasa hebu tuone jinsi ya kutekeleza mpango huu wa utekelezaji.

Badilisha KTable kuwa KStream

Ili kubadilisha KTable kuwa KStream unahitaji kufanya yafuatayo.

  1. Piga njia ya KTable.toStream().
  2. Kwa kuita mbinu ya KStream.map, badilisha ufunguo na jina la sekta, kisha urejeshe kitu cha TransactionSummary kutoka kwa mfano wa Dirisha.

Tutaunganisha shughuli hizi pamoja kama ifuatavyo (msimbo unaweza kupatikana katika faili src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Orodha 5.8).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Kwa sababu tunatekeleza operesheni ya KStream.map, mfano uliorejeshwa wa KStream hugawanywa tena kiotomatiki inapotumika kwenye muunganisho.

Tumekamilisha mchakato wa ubadilishaji, kinachofuata tunahitaji kuunda kitu cha KTable cha kusoma habari za hisa.

Uundaji wa KTable kwa habari za hisa

Kwa bahati nzuri, kuunda kitu cha KTable huchukua mstari mmoja tu wa msimbo (msimbo unaweza kupatikana katika src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Orodha 5.9).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Ni vyema kutambua kwamba hakuna vitu vya Serde vinavyohitajika kutajwa, kwa kuwa kamba Serdes hutumiwa katika mipangilio. Pia, kwa kutumia hesabu ya MAPEMA KABISA, jedwali linajazwa na rekodi mwanzoni kabisa.

Sasa tunaweza kuendelea na hatua ya mwisho - uunganisho.

Kuunganisha masasisho ya habari na data ya hesabu ya miamala

Kuunda muunganisho sio ngumu. Tutatumia kiungo cha kushoto iwapo hakuna habari za hisa kwa sekta husika (msimbo unaohitajika unaweza kupatikana katika faili src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Orodha 5.10).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Opereta hii ya leftJoin ni rahisi sana. Tofauti na viungio katika Sura ya 4, mbinu ya JoinWindow haitumiki kwa sababu wakati wa kutekeleza unganisho la KStream-KTable, kuna ingizo moja tu katika KTable kwa kila ufunguo. Muunganisho kama huo sio mdogo kwa wakati: rekodi iko kwenye KTable au haipo. Hitimisho kuu: kwa kutumia vitu vya KTable unaweza kuboresha KStream na data ya marejeleo isiyosasishwa mara kwa mara.

Sasa tutaangalia njia bora zaidi ya kuboresha matukio kutoka kwa KStream.

5.3.4. Vitu vya GlobalKTable

Kama unavyoona, kuna haja ya kuimarisha mitiririko ya matukio au kuongeza muktadha kwao. Katika Sura ya 4 uliona miunganisho kati ya vitu viwili vya KStream, na katika sehemu iliyotangulia uliona muunganisho kati ya KStream na KTable. Katika matukio haya yote, ni muhimu kugawa tena mtiririko wa data wakati wa kupanga funguo za aina mpya au thamani. Wakati mwingine ugawaji upya unafanywa kwa uwazi, na wakati mwingine Kafka Streams hufanya hivyo moja kwa moja. Kugawanya upya ni muhimu kwa sababu funguo zimebadilika na rekodi lazima ziwe katika sehemu mpya, vinginevyo uunganisho hautawezekana (hii ilijadiliwa katika Sura ya 4, katika sehemu ya "Kugawanya tena data" katika kifungu cha 4.2.4).

Kugawanya upya kuna gharama

Kugawanya upya kunahitaji gharama - gharama za ziada za rasilimali kwa kuunda mada za kati, kuhifadhi data iliyorudiwa katika mada nyingine; pia inamaanisha kuongezeka kwa muda wa kusubiri kwa sababu ya kuandika na kusoma kutoka kwa mada hii. Zaidi ya hayo, ikiwa unahitaji kujiunga katika kipengele au vipimo zaidi ya kimoja, ni lazima uweke minyororo ya viungio, upange rekodi kwa funguo mpya, na uendeshe mchakato wa kugawa tena.

Inaunganisha kwenye seti ndogo za data

Katika baadhi ya matukio, kiasi cha data ya marejeleo ya kuunganishwa ni kidogo, kwa hivyo nakala zake kamili zinaweza kutoshea kwa urahisi ndani ya kila nodi. Kwa hali kama hizi, Mipasho ya Kafka hutoa darasa la GlobalKTable.

Matukio ya GlobalKTable ni ya kipekee kwa sababu programu inakili data zote kwa kila nodi. Na kwa kuwa data yote iko kwenye kila nodi, hakuna haja ya kugawa mkondo wa tukio kwa ufunguo wa data ya kumbukumbu ili ipatikane kwa sehemu zote. Unaweza pia kufanya viungio visivyo na ufunguo kwa kutumia vitu vya GlobalKTable. Hebu turudi kwenye mojawapo ya mifano iliyotangulia ili kuonyesha kipengele hiki.

Kuunganisha vitu vya KStream kwa vitu vya GlobalKTable

Katika kifungu kidogo cha 5.3.2, tulifanya ujumlisho wa dirisha wa miamala ya kubadilishana fedha na wanunuzi. Matokeo ya mkusanyiko huu yalionekana kama hii:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Ingawa matokeo haya yalitimiza madhumuni, ingefaa zaidi ikiwa jina la mteja na jina kamili la kampuni pia lingeonyeshwa. Ili kuongeza jina la mteja na jina la kampuni, unaweza kufanya viungio vya kawaida, lakini utahitaji kufanya upangaji wa ufunguo mbili na kugawa upya. Ukiwa na GlobalKTable unaweza kuepuka gharama ya shughuli kama hizo.

Ili kufanya hivyo, tutatumia kipengele cha countStream kutoka kwa Orodha ya 5.11 (msimbo unaolingana unaweza kupatikana katika src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) na kuiunganisha kwa vitu viwili vya GlobalKTable.

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Tayari tumejadili hili hapo awali, kwa hivyo sitarudia. Lakini nakumbuka kuwa msimbo katika toStream().kitendakazi cha ramani kimetolewa kuwa kitu cha kukokotoa badala ya usemi wa ndani wa lambda kwa ajili ya kusomeka.

Hatua inayofuata ni kutangaza matukio mawili ya GlobalKTable (msimbo ulioonyeshwa unaweza kupatikana katika faili src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Orodha 5.12).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"

Tafadhali kumbuka kuwa majina ya mada yanaelezewa kwa kutumia aina zilizoorodheshwa.

Sasa kwa kuwa tuna vipengele vyote tayari, kilichobaki ni kuandika msimbo wa uunganisho (unaoweza kupatikana kwenye faili src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Orodha 5.13).

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Ingawa kuna viungio viwili katika msimbo huu, vimefungwa kwa sababu hakuna matokeo yao yanayotumika tofauti. Matokeo yanaonyeshwa mwishoni mwa operesheni nzima.

Unapoendesha operesheni ya kujiunga hapo juu, utapata matokeo kama haya:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Kiini hakijabadilika, lakini matokeo haya yanaonekana wazi zaidi.

Ukihesabu hadi Sura ya 4, tayari umeona aina kadhaa za miunganisho zikifanya kazi. Wameorodheshwa kwenye jedwali. 5.2. Jedwali hili linaonyesha uwezo wa muunganisho wa toleo la 1.0.0 la Mipasho ya Kafka; Kitu kinaweza kubadilika katika matoleo yajayo.

Kitabu "Kafka Inatiririka kwa Vitendo. Maombi na huduma ndogo kwa kazi ya wakati halisi"
Ili kukamilisha mambo, hebu turudie mambo ya msingi: unaweza kuunganisha mitiririko ya matukio (KStream) na kusasisha mitiririko (KTable) kwa kutumia jimbo la karibu. Vinginevyo, ikiwa saizi ya data ya kumbukumbu sio kubwa sana, unaweza kutumia kitu cha GlobalKTable. GlobalKTables huiga sehemu zote kwa kila nodi ya programu ya Mipasho ya Kafka, kuhakikisha kwamba data yote inapatikana bila kujali ni sehemu gani ya ufunguo inalingana.

Ifuatayo tutaona kipengele cha Mipasho ya Kafka, shukrani ambayo tunaweza kuona mabadiliko ya hali bila kutumia data kutoka kwa mada ya Kafka.

5.3.5. Hali ya kuhojiwa

Tayari tumetekeleza shughuli kadhaa zinazohusisha hali na kila mara kutoa matokeo kwa dashibodi (kwa madhumuni ya usanidi) au kuyaandika kwa mada (kwa madhumuni ya uzalishaji). Unapoandika matokeo kwa mada, lazima utumie mtumiaji wa Kafka ili kuyatazama.

Kusoma data kutoka kwa mada hizi kunaweza kuzingatiwa kama aina ya maoni yanayoonekana. Kwa madhumuni yetu, tunaweza kutumia ufafanuzi wa mtazamo unaoonekana kutoka kwa Wikipedia: β€œ...kipengee halisi cha hifadhidata kilicho na matokeo ya hoja. Kwa mfano, inaweza kuwa nakala ya ndani ya data ya mbali, au kikundi kidogo cha safu mlalo na/au safu wima za jedwali au matokeo ya uunganisho, au jedwali la muhtasari lililopatikana kwa kujumlisha” (https://en.wikipedia.org/wiki /Mwonekano_wa_nyenzo).

Mitiririko ya Kafka pia hukuruhusu kuendesha maswali wasilianifu kwenye maduka ya serikali, huku kuruhusu kusoma moja kwa moja maoni haya yaliyobadilishwa. Ni muhimu kutambua kwamba swala kwa duka la serikali ni operesheni ya kusoma tu. Hii inahakikisha kwamba huna haja ya kuwa na wasiwasi kuhusu kufanya hali isifanane kimakosa wakati programu yako inachakata data.

Uwezo wa kuuliza moja kwa moja maduka ya serikali ni muhimu. Hii inamaanisha kuwa unaweza kuunda programu za dashibodi bila kwanza kuleta data kutoka kwa mtumiaji wa Kafka. Pia huongeza ufanisi wa programu, kwa sababu ya ukweli kwamba hakuna haja ya kuandika data tena:

  • shukrani kwa eneo la data, zinaweza kupatikana kwa haraka;
  • kurudia data kunaondolewa, kwani haijaandikwa kwa hifadhi ya nje.

Jambo kuu ambalo nataka ukumbuke ni kwamba unaweza kuuliza moja kwa moja hali kutoka ndani ya programu yako. Fursa zinazokupa hii haziwezi kupitiwa. Badala ya kutumia data kutoka kwa Kafka na kuhifadhi rekodi kwenye hifadhidata ya programu, unaweza kuuliza maduka ya serikali na matokeo sawa. Maswali ya moja kwa moja kwa maduka ya serikali yanamaanisha msimbo mdogo (hakuna mtumiaji) na programu ndogo (hakuna haja ya jedwali la hifadhidata ili kuhifadhi matokeo).

Tumeshughulikia mambo mengi katika sura hii, kwa hivyo tutaacha mjadala wetu wa hoja wasilianifu dhidi ya maduka ya serikali kwa sasa. Lakini usijali: katika Sura ya 9, tutaunda programu rahisi ya dashibodi yenye maswali wasilianifu. Itatumia baadhi ya mifano kutoka kwa sura hii na iliyotangulia ili kuonyesha maswali wasilianifu na jinsi unavyoweza kuyaongeza kwenye programu za Mipasho ya Kafka.

Muhtasari

  • Vitu vya KStream vinawakilisha mikondo ya matukio, kulinganishwa na viingilio kwenye hifadhidata. Vipengee vya KTable vinawakilisha mitiririko ya sasisho, zaidi kama masasisho kwenye hifadhidata. Ukubwa wa kitu cha KTable haukua, rekodi za zamani zinabadilishwa na mpya.
  • Vitu vya KTable vinahitajika kwa shughuli za kujumlisha.
  • Kwa kutumia uendeshaji wa dirisha, unaweza kugawanya data iliyojumlishwa katika ndoo za saa.
  • Shukrani kwa vitu vya GlobalKTable, unaweza kufikia data ya marejeleo popote kwenye programu, bila kujali kugawa.
  • Muunganisho kati ya vitu vya KStream, KTable na GlobalKTable vinawezekana.

Kufikia sasa, tumeangazia kuunda programu za Kafka Streams kwa kutumia KStream DSL ya kiwango cha juu. Ingawa mbinu ya hali ya juu hukuruhusu kuunda programu nadhifu na mafupi, kuitumia inawakilisha biashara. Kufanya kazi na DSL KStream kunamaanisha kuongeza ufupi wa msimbo wako kwa kupunguza kiwango cha udhibiti. Katika sura inayofuata, tutaangalia API ya nodi ya kidhibiti cha kiwango cha chini na kujaribu ubadilishanaji mwingine. Programu zitakuwa ndefu kuliko ilivyokuwa hapo awali, lakini tutaweza kuunda karibu nodi yoyote ya kidhibiti ambayo tunaweza kuhitaji.

β†’ Maelezo zaidi kuhusu kitabu yanaweza kupatikana tovuti ya mchapishaji

β†’ Kwa Habrozhiteli punguzo la 25% kwa kutumia kuponi - Mipasho ya Kafka

β†’ Baada ya malipo ya toleo la karatasi la kitabu, kitabu cha kielektroniki kitatumwa kwa barua pepe.

Chanzo: mapenzi.com

Kuongeza maoni