«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար» Ողջույն Խաբրո բնակիչներ։ Այս գիրքը հարմար է ցանկացած մշակողի համար, ով ցանկանում է հասկանալ թելերի մշակումը: Բաշխված ծրագրավորման ըմբռնումը կօգնի ձեզ ավելի լավ հասկանալ Kafka-ն և Kafka Streams-ը: Լավ կլիներ իմանալ հենց Կաֆկայի շրջանակը, բայց դա անհրաժեշտ չէ. ես ձեզ կասեմ այն ​​ամենը, ինչ ձեզ հարկավոր է: Փորձառու Kafka մշակողները և սկսնակները կսովորեն, թե ինչպես ստեղծել հետաքրքիր հոսքային մշակման հավելվածներ՝ օգտագործելով Kafka Streams գրադարանն այս գրքում: Միջանկյալ և առաջադեմ Java ծրագրավորողները, որոնք արդեն ծանոթ են սերիալացման հասկացություններին, կսովորեն կիրառել իրենց հմտությունները Kafka Streams հավելվածներ ստեղծելու համար: Գրքի սկզբնական կոդը գրված է Java 8-ով և զգալի օգտագործում է Java 8 lambda արտահայտությունների շարահյուսությունը, այնպես որ իմանալը, թե ինչպես աշխատել լամբդա ֆունկցիաների հետ (նույնիսկ այլ ծրագրավորման լեզվով) օգտակար կլինի:

Հատված. 5.3. Ագրեգացման և պատուհանագծման գործողություններ

Այս բաժնում մենք կշարունակենք ուսումնասիրել Kafka Streams-ի ամենախոստումնալից հատվածները: Մինչ այժմ մենք լուսաբանել ենք Kafka Streams-ի հետևյալ ասպեկտները.

  • մշակման տոպոլոգիայի ստեղծում;
  • վիճակի օգտագործում հոսքային հավելվածներում;
  • տվյալների հոսքի միացումների իրականացում;
  • Տարբերությունները իրադարձությունների հոսքերի (KStream) և թարմացման հոսքերի (KTable) միջև:

Հետևյալ օրինակներում մենք միասին կբերենք այս բոլոր տարրերը: Դուք նաև կսովորեք պատուհանապատման՝ հոսքային հավելվածների մեկ այլ հիանալի հատկանիշի մասին: Մեր առաջին օրինակը կլինի պարզ ագրեգացիա:

5.3.1. Բաժնետոմսերի վաճառքի համախմբում ըստ արդյունաբերության ոլորտի

Ագրեգացումը և խմբավորումը կենսական գործիքներ են հոսքային տվյալների հետ աշխատելիս: Անհատական ​​գրառումների ուսումնասիրությունը, երբ դրանք ստացվում են, հաճախ անբավարար են: Տվյալներից լրացուցիչ տեղեկատվություն հանելու համար անհրաժեշտ է դրանք խմբավորել և համատեղել։

Այս օրինակում դուք կհագնեք ամենօրյա վաճառողի զգեստը, ով պետք է հետևի մի քանի ոլորտների ընկերությունների բաժնետոմսերի վաճառքի ծավալին: Մասնավորապես, ձեզ հետաքրքրում են յուրաքանչյուր ոլորտում ամենամեծ բաժնետոմսերի վաճառքով հինգ ընկերությունները:

Նման համախմբումը կպահանջի հետևյալ մի քանի քայլերը՝ տվյալները ցանկալի ձևով թարգմանելու համար (խոսելով ընդհանուր տերմիններով):

  1. Ստեղծեք թեմայի վրա հիմնված աղբյուր, որը հրապարակում է չմշակված բաժնետոմսերի առևտրային տեղեկատվությունը: Մենք ստիպված կլինենք StockTransaction տեսակի օբյեկտը քարտեզագրել ShareVolume տիպի օբյեկտի վրա: Բանն այն է, որ StockTransaction օբյեկտը պարունակում է վաճառքի մետատվյալներ, սակայն մեզ անհրաժեշտ են միայն վաճառվող բաժնետոմսերի քանակի մասին տվյալներ։
  2. Խմբավորել ShareVolume տվյալները ըստ բաժնետոմսերի խորհրդանիշի: Ըստ խորհրդանիշների խմբավորվելուց հետո դուք կարող եք այս տվյալները փլուզել բաժնետոմսերի վաճառքի ծավալների ենթագումարների մեջ: Հարկ է նշել, որ KStream.groupBy մեթոդը վերադարձնում է KGroupedStream տիպի օրինակ: Եվ դուք կարող եք ստանալ KTable օրինակ՝ հետագայում զանգահարելով KGroupedStream.reduce մեթոդը:

Ինչ է KGroupedStream ինտերֆեյսը

KStream.groupBy և KStream.groupByKey մեթոդները վերադարձնում են KGroupedStream-ի օրինակ: KGroupedStream-ը իրադարձությունների հոսքի միջանկյալ ներկայացումն է՝ ըստ բանալիների խմբավորումից հետո: Այն ամենևին նախատեսված չէ դրա հետ անմիջական աշխատանքի համար։ Փոխարենը, KGroupedStream-ն օգտագործվում է ագրեգացման գործողությունների համար, որոնք միշտ հանգեցնում են KTable-ի: Եվ քանի որ ագրեգացման գործողությունների արդյունքը KTable է, և նրանք օգտագործում են պետական ​​խանութ, հնարավոր է, որ արդյունքում ոչ բոլոր թարմացումներն ուղարկվեն խողովակաշարի ներքև:

KTable.groupBy մեթոդը վերադարձնում է նմանատիպ KGroupedTable - թարմացումների հոսքի միջանկյալ ներկայացում, որը վերախմբավորվում է ըստ բանալիի:

Եկեք մի փոքր ընդմիջում կատարենք և նայենք Նկ. 5.9, որը ցույց է տալիս, թե ինչի ենք հասել։ Այս տոպոլոգիան ձեզ արդեն շատ ծանոթ պետք է լինի:

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Այժմ նայենք այս տոպոլոգիայի կոդը (այն կարելի է գտնել src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java ֆայլում) (Ցուցակ 5.2):

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Տվյալ ծածկագիրը առանձնանում է իր հակիրճությամբ և մի քանի տողերում կատարված գործողությունների մեծ ծավալով։ Դուք կարող եք նկատել ինչ-որ նոր բան builder.stream մեթոդի առաջին պարամետրում. Enum տեսակի AutoOffsetReset.EARLIEST (կա նաև ՎԵՐՋԻՆ) արժեք, որը սահմանված է Consumed.withOffsetResetPolicy մեթոդով: Թվարկման այս տեսակը կարող է օգտագործվել յուրաքանչյուր KStream-ի կամ KTable-ի համար օֆսեթ վերակայման ռազմավարություն սահմանելու համար և գերակայություն է ստանում կազմաձևից օֆսեթ վերակայման տարբերակից:

GroupByKey և GroupBy

KStream ինտերֆեյսն ունի գրառումների խմբավորման երկու եղանակ՝ GroupByKey և GroupBy: Երկուսն էլ վերադարձնում են KGroupedTable, այնպես որ դուք կարող եք մտածել, թե որն է տարբերությունը նրանց միջև և երբ օգտագործել որն է:

GroupByKey մեթոդն օգտագործվում է, երբ KStream-ի ստեղները արդեն դատարկ չեն: Եվ ամենակարևորը, «վերաբաշխում է պահանջում» դրոշը երբեք չի դրվել:

GroupBy մեթոդը ենթադրում է, որ դուք փոխել եք խմբավորման ստեղները, ուստի վերաբաշխման դրոշակը սահմանվում է true-ի: GroupBy մեթոդից հետո միացումներ, ագրեգացիաներ և այլն կատարելը կհանգեցնի ավտոմատ վերաբաժանման:
Ամփոփում. Հնարավորության դեպքում դուք պետք է օգտագործեք GroupByKey-ը, քան GroupBy-ը:

Հասկանալի է, թե ինչ են անում mapValues-ը և groupBy մեթոդները, ուստի եկեք նայենք sum() մեթոդին (գտնվում է src/main/java/bbejeck/model/ShareVolume.java-ում) (Ցուցակ 5.3):

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
ShareVolume.sum մեթոդը վերադարձնում է բաժնետոմսերի վաճառքի ծավալի ընթացիկ ընդհանուր գումարը, և հաշվարկների ամբողջ շղթայի արդյունքը KTable օբյեկտ է: . Այժմ դուք հասկանում եք KTable-ի դերը: Երբ ShareVolume օբյեկտները ժամանում են, համապատասխան KTable օբյեկտը պահում է վերջին ընթացիկ թարմացումը: Կարևոր է հիշել, որ բոլոր թարմացումներն արտացոլված են նախորդ shareVolumeKTable-ում, բայց ոչ բոլորն են ուղարկվում հետագա:

Այնուհետև մենք օգտագործում ենք այս KTable-ը` միավորելու համար (ըստ վաճառվող բաժնետոմսերի քանակի), որպեսզի հասնենք յուրաքանչյուր ոլորտում վաճառվող բաժնետոմսերի ամենաբարձր ծավալներով հինգ ընկերություններին: Մեր գործողություններն այս դեպքում նման կլինեն առաջին ագրեգացման գործողություններին:

  1. Կատարեք մեկ այլ groupBy գործողություն՝ առանձին ShareVolume օբյեկտները խմբավորելու համար՝ ըստ ոլորտի:
  2. Սկսեք ամփոփել ShareVolume օբյեկտները: Այս անգամ ագրեգացման օբյեկտը ֆիքսված չափի առաջնահերթ հերթ է: Այս ֆիքսված չափի հերթում պահպանվում են միայն վաճառված բաժնետոմսերի ամենամեծ քանակով հինգ ընկերությունները:
  3. Նախորդ պարբերության հերթերը գծեք լարային արժեքի վրա և վերադարձրեք ամենավաճառվող բաժնետոմսերի լավագույն հնգյակը ըստ թվերի ըստ ոլորտի:
  4. Արդյունքները տողային ձևով գրեք թեմային:

Նկ. Նկար 5.10-ը ցույց է տալիս տվյալների հոսքի տոպոլոգիայի գրաֆիկը: Ինչպես տեսնում եք, վերամշակման երկրորդ փուլը բավականին պարզ է:

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Այժմ, երբ մենք հստակ հասկանում ենք մշակման այս երկրորդ փուլի կառուցվածքը, կարող ենք դիմել դրա սկզբնաղբյուրին (այն կգտնեք src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java ֆայլում) (Ցուցակ 5.4) .

Այս սկզբնավորիչը պարունակում է fixedQueue փոփոխական: Սա հատուկ օբյեկտ է, որը ադապտեր է java.util.TreeSet-ի համար, որն օգտագործվում է վաճառվող բաժնետոմսերի նվազման կարգով վերին N արդյունքներին հետևելու համար:

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Դուք արդեն տեսել եք groupBy-ի և mapValues-ի զանգերը, ուստի մենք չենք անդրադառնա դրանց (մենք անվանում ենք KTable.toStream մեթոդ, քանի որ KTable.print մեթոդը հնացած է): Բայց դուք դեռ չեք տեսել aggregate()-ի KTable տարբերակը, այնպես որ մենք մի փոքր ժամանակ կծախսենք դրա քննարկմանը:

Ինչպես հիշում եք, KTable-ը տարբերվում է նրանով, որ նույն ստեղներով գրառումները համարվում են թարմացումներ: KTable-ը փոխարինում է հին մուտքը նորով: Ագրեգացումը տեղի է ունենում նույն ձևով. նույն բանալիով վերջին գրառումները ագրեգացվում են: Երբ գրառումը գալիս է, այն ավելացվում է FixedSizePriorityQueue դասի օրինակին, օգտագործելով ավելորդ (երկրորդ պարամետր ագրեգատ մեթոդի կանչում), բայց եթե մեկ այլ գրառում արդեն գոյություն ունի նույն բանալիով, ապա հին գրառումը հանվում է հանմանչի միջոցով (երրորդ պարամետրը. ագրեգատ մեթոդի կանչը):

Այս ամենը նշանակում է, որ մեր ագրեգատորը՝ FixedSizePriorityQueue-ը, չի համախմբում բոլոր արժեքները մեկ բանալիով, այլ պահպանում է բաժնետոմսերի N առավել վաճառվող տեսակների քանակությունների շարժական գումարը: Յուրաքանչյուր մուտքային գրառում պարունակում է մինչ այժմ վաճառված բաժնետոմսերի ընդհանուր թիվը: KTable-ը ձեզ կտրամադրի տեղեկատվություն այն մասին, թե որ ընկերությունների բաժնետոմսերն են ներկայումս ամենաշատ վաճառվողները՝ առանց յուրաքանչյուր թարմացման շարժական ագրեգացման պահանջի:

Մենք սովորեցինք անել երկու կարևոր բան.

  • խմբավորել արժեքները KTable-ում ընդհանուր բանալիով;
  • կատարել օգտակար գործողություններ, ինչպիսիք են հավաքագրումը և համախմբումը այս խմբավորված արժեքների վրա:

Իմանալը, թե ինչպես կատարել այս գործողությունները, կարևոր է Kafka Streams հավելվածի միջոցով շարժվող տվյալների իմաստը հասկանալու և այն տեղեկության փոխանցման համար:

Մենք նաև հավաքել ենք այս գրքում ավելի վաղ քննարկված որոշ հիմնական հասկացություններ: Գլուխ 4-ում մենք քննարկեցինք, թե որքան սխալ հանդուրժող, տեղական վիճակը կարևոր է հոսքային հավելվածի համար: Այս գլխի առաջին օրինակը ցույց տվեց, թե ինչու է տեղական պետությունն այդքան կարևոր. այն ձեզ հնարավորություն է տալիս հետևելու, թե ինչ տեղեկատվություն եք արդեն տեսել: Տեղական մուտքը խուսափում է ցանցի հետաձգումներից՝ հավելվածը դարձնելով ավելի արդյունավետ և սխալների նկատմամբ:

Ցանկացած հավաքման կամ ագրեգացման գործողություն կատարելիս պետք է նշեք պետական ​​խանութի անունը: Համախմբման և համախմբման գործողությունները վերադարձնում են KTable օրինակ, իսկ KTable-ն օգտագործում է վիճակի պահեստավորում՝ հին արդյունքները նորերով փոխարինելու համար: Ինչպես տեսաք, ոչ բոլոր թարմացումներն են ուղարկվում խողովակաշարով, և դա կարևոր է, քանի որ ագրեգացման գործողությունները նախատեսված են ամփոփ տեղեկատվություն ստանալու համար: Եթե ​​դուք չեք կիրառում տեղական վիճակը, KTable-ը կուղարկի բոլոր ագրեգացման և համախմբման արդյունքները:

Այնուհետև մենք կանդրադառնանք որոշակի ժամանակահատվածում այնպիսի գործողությունների կատարմանը, ինչպիսին է ագրեգացումը, այսպես կոչված, պատուհանների կիրառումը:

5.3.2. Պատուհանների գործողություններ

Նախորդ բաժնում մենք ներկայացրեցինք սահող կոնվոլյուցիան և ագրեգացումը: Հավելվածը իրականացրել է բաժնետոմսերի վաճառքի շարունակական համախմբում, որին հաջորդել է բորսայում հինգ ամենավաճառվող բաժնետոմսերի միավորումը:

Երբեմն անհրաժեշտ է լինում արդյունքների նման շարունակական համախմբում և համախմբում: Եվ երբեմն անհրաժեշտ է գործողություններ կատարել միայն որոշակի ժամանակահատվածում: Օրինակ՝ հաշվեք, թե վերջին 10 րոպեում քանի բորսայական գործարք է կատարվել կոնկրետ ընկերության բաժնետոմսերով։ Կամ քանի օգտատեր է կտտացրել նոր գովազդային դրոշի վրա վերջին 15 րոպեի ընթացքում: Հավելվածը կարող է նման գործողություններ կատարել մի քանի անգամ, բայց արդյունքներով, որոնք վերաբերում են միայն որոշակի ժամանակահատվածներին (ժամանակային պատուհաններ):

Փոխանակման գործարքների հաշվում գնորդի կողմից

Հաջորդ օրինակում մենք կհետևենք բաժնետոմսերի գործարքներին մի քանի թրեյդերների միջև՝ խոշոր կազմակերպություններ կամ խելացի անհատ ֆինանսիստներ:

Այս հետևելու երկու հնարավոր պատճառ կա. Դրանցից մեկն այն է, որ պետք է իմանալ, թե շուկայի առաջատարներն ինչ են գնում/վաճառում: Եթե ​​այս խոշոր խաղացողները և բարդ ներդրողները հնարավորություն են տեսնում, իմաստ ունի հետևել նրանց ռազմավարությանը: Երկրորդ պատճառը ապօրինի ինսայդերական առևտրի հնարավոր նշաններ հայտնաբերելու ցանկությունն է: Դա անելու համար ձեզ հարկավոր է վերլուծել վաճառքի մեծ աճերի հարաբերակցությունը կարևոր մամուլի հաղորդագրությունների հետ:

Նման հետևելը բաղկացած է հետևյալ քայլերից.

  • բաժնետոմսերի գործարքների թեմայից կարդալու հոսքի ստեղծում;
  • մուտքային գրառումների խմբավորում ըստ գնորդի ID-ի և բաժնետոմսի խորհրդանիշի: GroupBy մեթոդը կանչելը վերադարձնում է KGroupedStream դասի օրինակը;
  • KGroupedStream.windowedBy մեթոդը վերադարձնում է տվյալների հոսք, որը սահմանափակվում է ժամանակային պատուհանով, որը թույլ է տալիս պատուհաններով ագրեգացնել: Կախված պատուհանի տեսակից, վերադարձվում է TimeWindowedKStream կամ SessionWindowedKStream;
  • գործարքների հաշվարկը միավորման գործողության համար: Պատուհաններով տվյալների հոսքը որոշում է, թե արդյոք որոշակի գրառումը հաշվի է առնվել այս հաշվարկում.
  • արդյունքները գրել թեմայում կամ դրանք մշակելիս ելքացնել վահանակին:

Այս հավելվածի տոպոլոգիան պարզ է, բայց դրա հստակ պատկերը օգտակար կլինի: Եկեք նայենք Նկ. 5.11.

Հաջորդը, մենք կանդրադառնանք պատուհանի գործառնությունների ֆունկցիոնալությանը և համապատասխան կոդը:

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»

Պատուհանների տեսակները

Kafka Streams-ում կան երեք տեսակի պատուհաններ.

  • նիստային;
  • «գլուխկոտրում»;
  • սահում / ցատկում.

Որն ընտրել կախված է ձեր բիզնեսի պահանջներից: Շրջանառվող և թռչող պատուհանները ժամանակով սահմանափակված են, մինչդեռ նստաշրջանի պատուհանները սահմանափակվում են օգտվողի ակտիվությամբ. նիստ(ներ)ի տևողությունը որոշվում է բացառապես օգտատերի ակտիվությամբ: Հիմնական բանը, որ պետք է հիշել, այն է, որ բոլոր պատուհանների տեսակները հիմնված են մուտքերի ամսաթվի/ժամային դրոշմանիշերի վրա, ոչ թե համակարգի ժամանակի վրա:

Հաջորդը, մենք իրականացնում ենք մեր տոպոլոգիան պատուհանների տեսակներից յուրաքանչյուրի հետ: Ամբողջական կոդը կտրվի միայն առաջին օրինակում, այլ տեսակի պատուհանների համար ոչինչ չի փոխվի, բացի պատուհանի գործողության տեսակից:

Նիստի պատուհաններ

Նիստի պատուհանները շատ տարբերվում են բոլոր այլ տեսակի պատուհաններից: Դրանք սահմանափակված են ոչ այնքան ժամանակով, որքան օգտատիրոջ ակտիվությամբ (կամ այն ​​սուբյեկտի ակտիվությամբ, որին դուք կցանկանայիք հետևել): Նիստի պատուհանները սահմանազատված են անգործության ժամանակաշրջաններով:

Նկար 5.12-ը ցույց է տալիս նիստերի պատուհանների հայեցակարգը: Փոքր նիստը կմիավորվի ձախ կողմում գտնվող նստաշրջանի հետ: Իսկ աջ կողմում նիստը լինելու է առանձին, քանի որ այն հետևում է երկարատև անգործության։ Աշխատաշրջանի պատուհանները հիմնված են օգտատերերի գործունեության վրա, սակայն մուտքագրումներից օգտվում են ամսաթվի/ժամային դրոշմակնիքներից՝ որոշելու համար, թե որ նիստին է պատկանում գրառումը:

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»

Օգտագործելով նստաշրջանի պատուհանները՝ բաժնետոմսերի գործարքներին հետևելու համար

Եկեք օգտագործենք նիստերի պատուհանները՝ բորսայական գործարքների մասին տեղեկությունները հավաքելու համար: Սեսիայի պատուհանների իրականացումը ցուցադրված է Ցուցակ 5.5-ում (որը կարելի է գտնել src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java-ում):

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Դուք արդեն տեսել եք այս տոպոլոգիայի գործողությունների մեծ մասը, ուստի կարիք չկա դրանք նորից նայել այստեղ: Բայց այստեղ կան նաև մի քանի նոր տարրեր, որոնք մենք հիմա կքննարկենք:

GroupBy-ի ցանկացած գործողություն սովորաբար կատարում է ինչ-որ ագրեգացման գործողություն (հավաքում, հավաքում կամ հաշվում): Դուք կարող եք կատարել կա՛մ կուտակային ագրեգացիա ընթացիկ ընդհանուր գումարով, կա՛մ պատուհանների ագրեգացիա, որը հաշվի է առնում նշված ժամանակային պատուհանի ընթացքում գրանցված գրառումները:

Ցուցակ 5.5-ի կոդը հաշվում է սեսիայի պատուհաններում կատարված գործարքների քանակը: Նկ. 5.13 Այս գործողությունները վերլուծվում են քայլ առ քայլ:

Կանչելով windowedBy(SessionWindows.with(twentySeconds).until(fifteenMiutes)) մենք ստեղծում ենք նիստի պատուհան՝ 20 վայրկյան անգործության միջակայքով և 15 րոպե կայունության միջակայքով: 20 վայրկյան պարապ ընդմիջումը նշանակում է, որ հավելվածը կներառի ցանկացած մուտք, որը կհասնի ընթացիկ նստաշրջանի ավարտից կամ սկզբից 20 վայրկյանի ընթացքում ընթացիկ (ակտիվ) նստաշրջան:

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Հաջորդը, մենք նշում ենք, թե որ ագրեգացման գործողությունը պետք է կատարվի նիստի պատուհանում. այս դեպքում՝ հաշվում: Եթե ​​մուտքային մուտքն ընկնում է անգործության պատուհանից դուրս (ամսաթվի/ժամանակի դրոշմակնիքի երկու կողմերում), հավելվածը ստեղծում է նոր նստաշրջան: Պահպանման միջակայքը նշանակում է նիստի պահպանում որոշակի ժամանակով և թույլ է տալիս ուշացած տվյալներ, որոնք անցնում են նիստի անգործության շրջանից, բայց դեռ կարող են կցվել: Բացի այդ, միաձուլման արդյունքում առաջացած նոր նստաշրջանի սկիզբը և ավարտը համապատասխանում են ամենավաղ և ամենավերջին ամսաթվի/ժամային դրոշմակնիքին:

Եկեք նայենք մի քանի գրառումների հաշվարկի մեթոդից՝ տեսնելու, թե ինչպես են աշխատում նիստերը (Աղյուսակ 5.1):

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Երբ գրառումները հասնում են, մենք փնտրում ենք գոյություն ունեցող սեսիաները նույն բանալիով, ավարտի ժամանակը պակաս է ընթացիկ ամսաթվի/ժամանակի դրոշմանիշից՝ անգործության միջակայքը, և մեկնարկի ժամանակը ավելի մեծ է, քան ընթացիկ ամսաթիվը/ժամը + անգործության ինտերվալը: Հաշվի առնելով դա՝ չորս գրառում աղյուսակից։ 5.1-ը միավորվում են մեկ նստաշրջանի մեջ հետևյալ կերպ.

1. Գրառման 1-ը գալիս է առաջինը, ուստի մեկնարկի ժամը հավասար է ավարտի ժամանակին և 00:00:00 է:

2. Հաջորդը գալիս է մուտք 2, և մենք փնտրում ենք նիստեր, որոնք ավարտվում են 23:59:55-ից ոչ շուտ և սկսվում են ոչ ուշ, քան 00:00:35: Մենք գտնում ենք ձայնագրությունը 1 և միավորում ենք 1-ին և 2-րդ նիստերը: Մենք վերցնում ենք 1-ին նստաշրջանի մեկնարկի ժամը (ավելի վաղ) և 2-րդ նստաշրջանի ավարտի ժամը (ավելի ուշ), այնպես որ մեր նոր նիստը սկսվում է 00:00:00-ին և ավարտվում 00-ին: 00:15.

3. Գրառման 3-ը հասնում է, մենք փնտրում ենք նիստեր 00:00:30-ից 00:01:10-ի միջև և չենք գտնում: Ավելացրեք երկրորդ նստաշրջան 123-345-654,FFBE բանալու համար, որը սկսվում և ավարտվում է 00:00:50-ին:

4. Գրառման 4-ը գալիս է, և մենք փնտրում ենք սեանսներ 23:59:45-ից 00:00:25: Այս անգամ գտնվել են 1-ին և 2-րդ նիստերը: Բոլոր երեք նիստերը միավորված են մեկի մեջ՝ մեկնարկի ժամը 00:00:00 և ավարտի ժամը 00:00:15:

Այս բաժնում նկարագրվածից արժե հիշել հետևյալ կարևոր նրբերանգները.

  • նիստերը ֆիքսված չափի պատուհաններ չեն: Նիստի տեւողությունը որոշվում է տվյալ ժամանակահատվածում կատարվող ակտիվությամբ.
  • Տվյալների մեջ ամսաթվի/ժամային դրոշմանիշերը որոշում են՝ արդյոք իրադարձությունը պատկանում է գոյություն ունեցող նիստին, թե պարապ ժամանակաշրջանում:

Հաջորդիվ կքննարկենք պատուհանների հաջորդ տեսակը՝ «թափվող» պատուհանները:

«Շողացող» պատուհաններ

Շողացող պատուհանները ֆիքսում են իրադարձությունները, որոնք ընկնում են որոշակի ժամանակահատվածում: Պատկերացրեք, որ դուք պետք է յուրաքանչյուր 20 վայրկյանը մեկ նկարահանեք որոշակի ընկերության բաժնետոմսերի բոլոր գործարքները, այնպես որ դուք հավաքեք բոլոր իրադարձությունները այդ ժամանակահատվածում: 20 վայրկյան ընդմիջման վերջում պատուհանը պտտվում է և տեղափոխվում 20 վայրկյան դիտարկման նոր ընդմիջում: Նկար 5.14-ը ցույց է տալիս այս իրավիճակը:

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Ինչպես տեսնում եք, վերջին 20 վայրկյանում ստացված բոլոր իրադարձությունները ներառված են պատուհանում: Այս ժամանակահատվածի վերջում ստեղծվում է նոր պատուհան:

Ցուցակ 5.6-ը ցույց է տալիս ծածկագիրը, որը ցույց է տալիս շրջվող պատուհանների օգտագործումը բաժնետոմսերի գործարքները յուրաքանչյուր 20 վայրկյանը մեկ գրանցելու համար (գտնվում է src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-ում):

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
TimeWindows.of մեթոդի կանչի այս փոքր փոփոխությամբ դուք կարող եք օգտագործել շրջվող պատուհան: Այս օրինակը չի կանչում մինչև() մեթոդը, ուստի կօգտագործվի 24 ժամվա կանխադրված պահպանման միջակայքը:

Վերջապես, ժամանակն է անցնել պատուհանի տարբերակներից վերջինին` «ցատկել» պատուհաններին:

Լոգարիթմական («ցատկող») պատուհաններ

Լոգարիթմական/թափվող պատուհանները նման են շրջվող պատուհաններին, բայց մի փոքր տարբերությամբ: Լոգարիթմական պատուհանները չեն սպասում ժամանակի ավարտին, նախքան նոր պատուհան ստեղծելը՝ վերջին իրադարձությունները մշակելու համար: Նրանք սկսում են նոր հաշվարկներ պատուհանի տևողությունից պակաս սպասման ընդմիջումից հետո:

Շողացող և ցատկող պատուհանների տարբերությունները պատկերացնելու համար վերադառնանք բորսայական գործարքների հաշվման օրինակին: Մեր նպատակը դեռևս գործարքների քանակը հաշվելն է, բայց մենք չենք ցանկանում սպասել ամբողջ ժամանակ մինչև հաշվիչը թարմացնելը: Փոխարենը, մենք կթարմացնենք հաշվիչը ավելի կարճ ընդմիջումներով: Օրինակ, մենք դեռ կհաշվենք գործարքների քանակը յուրաքանչյուր 20 վայրկյանը մեկ, բայց հաշվիչը կթարմացնենք յուրաքանչյուր 5 վայրկյանը մեկ, ինչպես ցույց է տրված Նկ. 5.15. Այս դեպքում մենք հայտնվում ենք երեք արդյունքի պատուհաններով՝ համընկնող տվյալների հետ:

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
5.7 ցուցակը ցույց է տալիս լոգարիթմական պատուհանների սահմանման կոդը (գտնվում է src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-ում):

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Շողացող պատուհանը կարող է վերածվել թռչող պատուհանի՝ ավելացնելով զանգ advanceBy() մեթոդին: Ցուցադրված օրինակում խնայողության միջակայքը 15 րոպե է:

Դուք տեսաք այս բաժնում, թե ինչպես կարելի է սահմանափակել ագրեգացման արդյունքները ժամանակային պատուհաններով: Մասնավորապես, ուզում եմ, որ այս բաժնից հիշեք հետևյալ երեք բաները.

  • նստաշրջանի պատուհանների չափը սահմանափակվում է ոչ թե ժամանակաշրջանով, այլ օգտագործողի ակտիվությամբ.
  • «թափվող» պատուհանները տրամադրում են իրադարձությունների ակնարկ տվյալ ժամանակահատվածում.
  • Ցատկվող պատուհանների տեւողությունը ֆիքսված է, սակայն դրանք հաճախ թարմացվում են և կարող են պարունակել համընկնող գրառումներ բոլոր պատուհաններում:

Հաջորդը, մենք կսովորենք, թե ինչպես վերափոխել KTable-ը KStream-ի միացման համար:

5.3.3. KStream և KTable օբյեկտների միացում

4-րդ գլխում մենք քննարկեցինք KStream երկու օբյեկտների միացումը: Այժմ մենք պետք է սովորենք, թե ինչպես միացնել KTable-ը և KStream-ը: Սա կարող է անհրաժեշտ լինել հետևյալ պարզ պատճառով. KStream-ը գրառումների հոսք է, իսկ KTable-ը գրառումների թարմացումների հոսք է, բայց երբեմն կարող եք ցանկանալ լրացուցիչ համատեքստ ավելացնել գրառումների հոսքին՝ օգտագործելով KTable-ի թարմացումները:

Վերցնենք տվյալներ ֆոնդային բորսայական գործարքների քանակի վերաբերյալ և դրանք համատեղենք համապատասխան ճյուղերի բորսայական նորությունների հետ։ Ահա թե ինչ պետք է անեք դրան հասնելու համար՝ հաշվի առնելով այն կոդը, որն արդեն ունեք:

  1. Փոխակերպեք KTable օբյեկտը բաժնետոմսերի գործարքների քանակի վերաբերյալ տվյալների հետ KStream-ի, որից հետո բանալին փոխարինեք այս բաժնետոմսերի խորհրդանիշին համապատասխանող արդյունաբերության ոլորտը ցույց տվող բանալիով:
  2. Ստեղծեք KTable օբյեկտ, որը կարդում է տվյալներ ֆոնդային բորսայի նորություններով: Այս նոր KTable-ը դասակարգվելու է ըստ արդյունաբերության ոլորտի:
  3. Միացրեք նորությունների թարմացումները ըստ արդյունաբերության ոլորտի ֆոնդային բորսայական գործարքների քանակի մասին տեղեկատվության:

Այժմ տեսնենք, թե ինչպես իրականացնել այս գործողությունների ծրագիրը:

Փոխարկել KTable-ը KStream-ի

KTable-ը KStream-ի փոխարկելու համար անհրաժեշտ է անել հետևյալը.

  1. Զանգահարեք KTable.toStream() մեթոդը:
  2. Կանչելով KStream.map մեթոդը, բանալին փոխարինեք ոլորտի անունով, այնուհետև առբերեք TransactionSummary օբյեկտը Windowed օրինակից:

Մենք կապելու ենք այս գործողությունները հետևյալ կերպ (կոդը կարելի է գտնել src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java ֆայլում) (Ցուցակ 5.8):

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Քանի որ մենք կատարում ենք KStream.map գործողություն, վերադարձված KStream օրինակը նորից բաժանվում է ավտոմատ կերպով, երբ այն օգտագործվում է կապի մեջ:

Մենք ավարտել ենք փոխակերպման գործընթացը, այնուհետև մենք պետք է ստեղծենք KTable օբյեկտ՝ ֆոնդային նորություններ կարդալու համար:

KTable-ի ստեղծում ֆոնդային նորությունների համար

Բարեբախտաբար, KTable օբյեկտ ստեղծելը պահանջում է ընդամենը մեկ տող կոդ (կոդը կարելի է գտնել src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-ում) (Ցուցակ 5.9):

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Հարկ է նշել, որ ոչ մի Serde օբյեկտ չի պահանջվում նշել, քանի որ կարգավորումներում օգտագործվում են լարային Serde-ներ: Նաև, օգտագործելով ԱՄԵՆԱՎԱՂ թվարկումը, աղյուսակը հենց սկզբում լրացվում է գրառումներով:

Այժմ մենք կարող ենք անցնել վերջին քայլին՝ կապին:

Նորությունների թարմացումների միացում գործարքների քանակի տվյալների հետ

Կապ ստեղծելը դժվար չէ։ Մենք կօգտագործենք ձախ միացում, եթե համապատասխան ոլորտի համար բաժնետոմսերի նորություններ չլինեն (անհրաժեշտ կոդը կարելի է գտնել src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java ֆայլում) (Ցուցակ 5.10):

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Այս leftJoin օպերատորը բավականին պարզ է: Ի տարբերություն 4-րդ գլխի միացումների, JoinWindow մեթոդը չի օգտագործվում, քանի որ KStream-KTable միացում կատարելիս KTable-ում կա միայն մեկ մուտք յուրաքանչյուր ստեղնի համար: Նման կապը ժամանակով սահմանափակված չէ. գրառումը կա՛մ KTable-ում է, կա՛մ բացակայում է: Հիմնական եզրակացությունը. KTable օբյեկտների միջոցով դուք կարող եք հարստացնել KStream-ը ավելի քիչ հաճախակի թարմացվող տեղեկատու տվյալներով:

Այժմ մենք կդիտարկենք KStream-ի իրադարձությունները հարստացնելու ավելի արդյունավետ միջոց:

5.3.4. GlobalKTable օբյեկտներ

Ինչպես տեսնում եք, իրադարձությունների հոսքերը հարստացնելու կամ դրանց համատեքստ ավելացնելու անհրաժեշտություն կա: 4-րդ գլխում դուք տեսաք KStream երկու օբյեկտների միջև կապը, իսկ նախորդ բաժնում տեսաք KStream-ի և KTable-ի կապը: Այս բոլոր դեպքերում անհրաժեշտ է վերաբաժանել տվյալների հոսքը՝ բանալիները նոր տեսակի կամ արժեքի քարտեզագրելիս: Երբեմն վերաբաժանումը կատարվում է բացահայտորեն, և երբեմն Kafka Streams-ը դա անում է ավտոմատ կերպով: Կրկին բաժանումն անհրաժեշտ է, քանի որ ստեղները փոխվել են, և գրառումները պետք է ավարտվեն նոր բաժիններում, հակառակ դեպքում կապն անհնար կլինի (սա քննարկվել է 4-րդ գլխում, 4.2.4 ենթաբաժնի «Տվյալների վերաբաշխում» բաժնում):

Կրկին բաժանումն ունի ծախս

Վերաբաշխումը պահանջում է ծախսեր՝ միջանկյալ թեմաներ ստեղծելու համար լրացուցիչ ռեսուրսային ծախսեր, կրկնօրինակ տվյալներ այլ թեմայում պահելու համար. դա նաև նշանակում է հետաձգման ավելացում այս թեմայից գրելու և կարդալու պատճառով: Բացի այդ, եթե ձեզ անհրաժեշտ է միանալ մեկից ավելի ասպեկտներով կամ հարթություններով, դուք պետք է կապեք միացումները, քարտեզագրեք գրառումները նոր ստեղներով և նորից գործարկեք բաժանման գործընթացը:

Միացում ավելի փոքր տվյալների հավաքածուներին

Որոշ դեպքերում կապակցվող հղման տվյալների ծավալը համեմատաբար փոքր է, ուստի դրանց ամբողջական պատճենները կարող են հեշտությամբ տեղավորվել տեղական յուրաքանչյուր հանգույցի վրա: Նման իրավիճակների համար Kafka Streams-ը տրամադրում է GlobalKTable դասը:

GlobalKTable-ի օրինակները եզակի են, քանի որ հավելվածը վերարտադրում է բոլոր տվյալները հանգույցներից յուրաքանչյուրին: Եվ քանի որ բոլոր տվյալները առկա են յուրաքանչյուր հանգույցում, կարիք չկա իրադարձության հոսքը բաժանել հղումային տվյալների բանալիով, որպեսզի այն հասանելի լինի բոլոր բաժանմունքներին: Կարող եք նաև առանց բանալի միացումներ կատարել՝ օգտագործելով GlobalKTable օբյեկտները: Եկեք վերադառնանք նախորդ օրինակներից մեկին՝ այս հատկությունը ցույց տալու համար:

KStream օբյեկտների միացում GlobalKTable օբյեկտներին

5.3.2 ենթաբաժնում մենք իրականացրել ենք գնորդների կողմից փոխանակման գործարքների պատուհանի ագրեգացիա: Այս ագրեգացիայի արդյունքները մոտավորապես այսպիսի տեսք ունեին.

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Թեև այս արդյունքները ծառայում էին նպատակին, ավելի օգտակար կլիներ, եթե ցուցադրվեին հաճախորդի անունը և ընկերության ամբողջական անվանումը: Հաճախորդի անունը և ընկերության անվանումը ավելացնելու համար կարող եք սովորական միացումներ կատարել, բայց ձեզ հարկավոր է կատարել երկու հիմնական քարտեզագրում և նորից բաժանում: GlobalKTable-ի միջոցով դուք կարող եք խուսափել նման գործողությունների ծախսերից:

Դա անելու համար մենք կօգտագործենք countStream օբյեկտը Ցուցակ 5.11-ից (համապատասխան կոդը կարելի է գտնել src/main/java/bbejeck/chapter_5/GlobalKTableExample.java-ում) և միացնել այն երկու GlobalKTable օբյեկտի:

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Մենք նախկինում արդեն քննարկել ենք դա, ուստի չեմ կրկնի: Բայց ես նշում եմ, որ toStream().map ֆունկցիայի կոդը աբստրակտացվում է ֆունկցիայի օբյեկտի մեջ՝ ներկառուցված լամբդա արտահայտության փոխարեն՝ ընթեռնելիության համար:

Հաջորդ քայլը GlobalKTable-ի երկու օրինակ հայտարարելն է (ցուցադրված կոդը կարելի է գտնել src/main/java/bbejeck/chapter_5/GlobalKTableExample.java ֆայլում) (Ցուցակ 5.12):

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»

Խնդրում ենք նկատի ունենալ, որ թեմաների անվանումները նկարագրված են թվարկված տեսակների միջոցով:

Այժմ, երբ մենք պատրաստ ենք բոլոր բաղադրիչները, մնում է գրել միացման կոդը (որը կարելի է գտնել src/main/java/bbejeck/chapter_5/GlobalKTableExample.java ֆայլում) (Ցուցակ 5.13):

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Թեև այս կոդում կա երկու միացում, դրանք շղթայված են, քանի որ դրանց արդյունքներից ոչ մեկը չի օգտագործվում առանձին: Արդյունքները ցուցադրվում են ամբողջ գործողության ավարտին:

Երբ գործարկեք վերը նշված միացման գործողությունը, դուք կստանաք այսպիսի արդյունքներ.

{customer='Barney, Smith' company="Exxon", transactions= 17}

Էությունը չի փոխվել, բայց այս արդյունքներն ավելի պարզ են թվում։

Եթե ​​դուք հետ հաշվում եք մինչև Գլուխ 4, դուք արդեն տեսել եք մի քանի տեսակի կապեր գործողության մեջ: Դրանք թվարկված են աղյուսակում: 5.2. Այս աղյուսակը արտացոլում է Kafka Streams-ի 1.0.0 տարբերակի միացման հնարավորությունները. Ինչ-որ բան կարող է փոխվել հետագա թողարկումներում:

«Կաֆկան հոսում է գործողության մեջ. Ծրագրեր և միկրոծառայություններ իրական ժամանակում աշխատանքի համար»
Ամեն ինչ ամփոփելու համար եկեք ամփոփենք հիմունքները. դուք կարող եք միացնել իրադարձությունների հոսքերը (KStream) և թարմացնել հոսքերը (KTable)՝ օգտագործելով տեղական վիճակը: Որպես այլընտրանք, եթե հղման տվյալների չափը չափազանց մեծ չէ, կարող եք օգտագործել GlobalKTable օբյեկտը: GlobalKTables-ը կրկնօրինակում է բոլոր բաժինները Kafka Streams հավելվածի յուրաքանչյուր հանգույցում՝ ապահովելով, որ բոլոր տվյալները հասանելի են՝ անկախ նրանից, թե որ բաժնին է համապատասխանում բանալին:

Հաջորդը մենք կտեսնենք Kafka Streams ֆունկցիան, որի շնորհիվ մենք կարող ենք դիտարկել վիճակի փոփոխությունները՝ առանց Կաֆկայի թեմայի տվյալների սպառման:

5.3.5. Հարցման ենթակա վիճակ

Մենք արդեն կատարել ենք մի քանի գործողություններ, որոնք ներառում են վիճակը և արդյունքները միշտ թողարկվում են վահանակի վրա (մշակման նպատակներով) կամ դրանք գրում են թեմայում (արտադրական նպատակներով): Արդյունքները թեմայի վրա գրելիս պետք է օգտագործեք Կաֆկա սպառողը՝ դրանք դիտելու համար:

Այս թեմաներից տվյալների ընթերցումը կարելի է համարել նյութականացված տեսակետների տեսակ։ Մեր նպատակների համար մենք կարող ենք օգտագործել Վիքիպեդիայից նյութականացված տեսակետի սահմանումը. «...հարցման արդյունքներ պարունակող տվյալների բազայի ֆիզիկական օբյեկտ։ Օրինակ, դա կարող է լինել հեռավոր տվյալների տեղական պատճեն, կամ աղյուսակի տողերի և/կամ սյունակների ենթաբազմություն կամ միացման արդյունքներ, կամ ամփոփ աղյուսակ, որը ստացվել է ագրեգացիայի միջոցով» (https://en.wikipedia.org/wiki /Naterialized_view):

Kafka Streams-ը նաև թույլ է տալիս ինտերակտիվ հարցումներ կատարել պետական ​​խանութներում՝ թույլ տալով ուղղակիորեն կարդալ այս նյութականացված տեսակետները: Կարևոր է նշել, որ պետական ​​խանութին ուղղված հարցումը միայն կարդալու գործողություն է: Սա երաշխավորում է, որ դուք ստիպված չեք լինի անհանգստանալ պատահաբար վիճակի անհամապատասխանության մասին, մինչ ձեր հավելվածը մշակում է տվյալները:

Կարևոր է պետական ​​խանութներին ուղղակիորեն հարցումներ անելու ունակությունը: Սա նշանակում է, որ դուք կարող եք ստեղծել վահանակի հավելվածներ՝ առանց Kafka-ի սպառողից նախապես տվյալներ վերցնելու: Այն նաև մեծացնում է հավելվածի արդյունավետությունը՝ պայմանավորված այն հանգամանքով, որ կարիք չկա նորից գրել տվյալները.

  • տվյալների տեղայնության շնորհիվ դրանք կարող են արագ մուտք գործել.
  • Տվյալների կրկնօրինակումը վերացված է, քանի որ դրանք գրված չեն արտաքին պահեստում:

Հիմնական բանը, որ ես ուզում եմ, որ դուք հիշեք, այն է, որ դուք կարող եք ուղղակիորեն հարցումներ կատարել պետությանը ձեր դիմումի ներսում: Այն հնարավորությունները, որոնք դա ձեզ տալիս է, չի կարելի գերագնահատել: Կաֆկայից ստացված տվյալները սպառելու և հավելվածի համար տվյալների բազայում գրառումներ պահելու փոխարեն, դուք կարող եք նույն արդյունքով հարցումներ կատարել պետական ​​խանութներից: Պետական ​​խանութներին ուղղված ուղղակի հարցումները նշանակում են ավելի քիչ կոդ (առանց սպառողի) և ավելի քիչ ծրագրակազմ (արդյունքները պահելու համար տվյալների բազայի աղյուսակի կարիք չկա):

Այս գլխում մենք բավականին շատ բան ենք անդրադարձել, ուստի պետական ​​խանութների դեմ ինտերակտիվ հարցումների մեր քննարկումն առայժմ կթողնենք: Բայց մի անհանգստացեք. 9-րդ գլխում մենք կստեղծենք ինտերակտիվ հարցումներով պարզ վահանակի հավելված: Այն կօգտագործի այս և նախորդ գլուխների որոշ օրինակներ՝ ցուցադրելու ինտերակտիվ հարցումները և ինչպես կարող եք դրանք ավելացնել Kafka Streams հավելվածներին:

Ամփոփում

  • KStream օբյեկտները ներկայացնում են իրադարձությունների հոսքեր, որոնք համեմատելի են տվյալների բազայում ներդիրների հետ: KTable օբյեկտները ներկայացնում են թարմացման հոսքեր, որոնք ավելի շատ նման են տվյալների բազայի թարմացումներին: KTable օբյեկտի չափը չի մեծանում, հին գրառումները փոխարինվում են նորերով:
  • KTable օբյեկտները պահանջվում են ագրեգացման գործողությունների համար:
  • Օգտագործելով պատուհանների գործառնությունները, դուք կարող եք համախմբված տվյալները բաժանել ժամանակի դույլերի:
  • GlobalKTable օբյեկտների շնորհիվ դուք կարող եք մուտք գործել հղման տվյալներ հավելվածի ցանկացած կետում՝ անկախ բաժանումից:
  • Հնարավոր են միացումներ KStream, KTable և GlobalKTable օբյեկտների միջև:

Մինչ այժմ մենք կենտրոնացել ենք Kafka Streams հավելվածների ստեղծման վրա՝ օգտագործելով բարձր մակարդակի KStream DSL: Չնայած բարձր մակարդակի մոտեցումը թույլ է տալիս ստեղծել կոկիկ և հակիրճ ծրագրեր, դրա օգտագործումը փոխզիջում է: DSL KStream-ի հետ աշխատելը նշանակում է բարձրացնել ձեր կոդի հակիրճությունը՝ նվազեցնելով կառավարման աստիճանը: Հաջորդ գլխում մենք կանդրադառնանք ցածր մակարդակի կարգավորիչ հանգույցի API-ին և կփորձենք այլ փոխզիջումներ: Ծրագրերն ավելի երկար կլինեն, քան նախկինում էին, բայց մենք կկարողանանք ստեղծել գրեթե ցանկացած կարգավորիչ հանգույց, որը մեզ կարող է անհրաժեշտ լինել:

→ Գրքի մասին ավելի մանրամասն կարող եք գտնել այստեղ հրատարակչի կայքը

→ Habrozhiteli-ի համար 25% զեղչ՝ օգտագործելով կտրոնը - Կաֆկայի հոսքեր

→ Գրքի թղթային տարբերակի համար վճարելուց հետո էլեկտրոնային գիրք կուղարկվի էլեկտրոնային փոստով:

Source: www.habr.com

Добавить комментарий