Հատված. 5.3. Ագրեգացման և պատուհանագծման գործողություններ
Այս բաժնում մենք կշարունակենք ուսումնասիրել Kafka Streams-ի ամենախոստումնալից հատվածները: Մինչ այժմ մենք լուսաբանել ենք Kafka Streams-ի հետևյալ ասպեկտները.
- մշակման տոպոլոգիայի ստեղծում;
- վիճակի օգտագործում հոսքային հավելվածներում;
- տվյալների հոսքի միացումների իրականացում;
- Տարբերությունները իրադարձությունների հոսքերի (KStream) և թարմացման հոսքերի (KTable) միջև:
Հետևյալ օրինակներում մենք միասին կբերենք այս բոլոր տարրերը: Դուք նաև կսովորեք պատուհանապատման՝ հոսքային հավելվածների մեկ այլ հիանալի հատկանիշի մասին: Մեր առաջին օրինակը կլինի պարզ ագրեգացիա:
5.3.1. Բաժնետոմսերի վաճառքի համախմբում ըստ արդյունաբերության ոլորտի
Ագրեգացումը և խմբավորումը կենսական գործիքներ են հոսքային տվյալների հետ աշխատելիս: Անհատական գրառումների ուսումնասիրությունը, երբ դրանք ստացվում են, հաճախ անբավարար են: Տվյալներից լրացուցիչ տեղեկատվություն հանելու համար անհրաժեշտ է դրանք խմբավորել և համատեղել։
Այս օրինակում դուք կհագնեք ամենօրյա վաճառողի զգեստը, ով պետք է հետևի մի քանի ոլորտների ընկերությունների բաժնետոմսերի վաճառքի ծավալին: Մասնավորապես, ձեզ հետաքրքրում են յուրաքանչյուր ոլորտում ամենամեծ բաժնետոմսերի վաճառքով հինգ ընկերությունները:
Նման համախմբումը կպահանջի հետևյալ մի քանի քայլերը՝ տվյալները ցանկալի ձևով թարգմանելու համար (խոսելով ընդհանուր տերմիններով):
- Ստեղծեք թեմայի վրա հիմնված աղբյուր, որը հրապարակում է չմշակված բաժնետոմսերի առևտրային տեղեկատվությունը: Մենք ստիպված կլինենք StockTransaction տեսակի օբյեկտը քարտեզագրել ShareVolume տիպի օբյեկտի վրա: Բանն այն է, որ StockTransaction օբյեկտը պարունակում է վաճառքի մետատվյալներ, սակայն մեզ անհրաժեշտ են միայն վաճառվող բաժնետոմսերի քանակի մասին տվյալներ։
- Խմբավորել ShareVolume տվյալները ըստ բաժնետոմսերի խորհրդանիշի: Ըստ խորհրդանիշների խմբավորվելուց հետո դուք կարող եք այս տվյալները փլուզել բաժնետոմսերի վաճառքի ծավալների ենթագումարների մեջ: Հարկ է նշել, որ KStream.groupBy մեթոդը վերադարձնում է KGroupedStream տիպի օրինակ: Եվ դուք կարող եք ստանալ KTable օրինակ՝ հետագայում զանգահարելով KGroupedStream.reduce մեթոդը:
Ինչ է KGroupedStream ինտերֆեյսը
KStream.groupBy և KStream.groupByKey մեթոդները վերադարձնում են KGroupedStream-ի օրինակ: KGroupedStream-ը իրադարձությունների հոսքի միջանկյալ ներկայացումն է՝ ըստ բանալիների խմբավորումից հետո: Այն ամենևին նախատեսված չէ դրա հետ անմիջական աշխատանքի համար։ Փոխարենը, KGroupedStream-ն օգտագործվում է ագրեգացման գործողությունների համար, որոնք միշտ հանգեցնում են KTable-ի: Եվ քանի որ ագրեգացման գործողությունների արդյունքը KTable է, և նրանք օգտագործում են պետական խանութ, հնարավոր է, որ արդյունքում ոչ բոլոր թարմացումներն ուղարկվեն խողովակաշարի ներքև:
KTable.groupBy մեթոդը վերադարձնում է նմանատիպ KGroupedTable - թարմացումների հոսքի միջանկյալ ներկայացում, որը վերախմբավորվում է ըստ բանալիի:
Եկեք մի փոքր ընդմիջում կատարենք և նայենք Նկ. 5.9, որը ցույց է տալիս, թե ինչի ենք հասել։ Այս տոպոլոգիան ձեզ արդեն շատ ծանոթ պետք է լինի:
Այժմ նայենք այս տոպոլոգիայի կոդը (այն կարելի է գտնել src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java ֆայլում) (Ցուցակ 5.2):
Տվյալ ծածկագիրը առանձնանում է իր հակիրճությամբ և մի քանի տողերում կատարված գործողությունների մեծ ծավալով։ Դուք կարող եք նկատել ինչ-որ նոր բան builder.stream մեթոդի առաջին պարամետրում. Enum տեսակի AutoOffsetReset.EARLIEST (կա նաև ՎԵՐՋԻՆ) արժեք, որը սահմանված է Consumed.withOffsetResetPolicy մեթոդով: Թվարկման այս տեսակը կարող է օգտագործվել յուրաքանչյուր KStream-ի կամ KTable-ի համար օֆսեթ վերակայման ռազմավարություն սահմանելու համար և գերակայություն է ստանում կազմաձևից օֆսեթ վերակայման տարբերակից:
GroupByKey և GroupBy
KStream ինտերֆեյսն ունի գրառումների խմբավորման երկու եղանակ՝ GroupByKey և GroupBy: Երկուսն էլ վերադարձնում են KGroupedTable, այնպես որ դուք կարող եք մտածել, թե որն է տարբերությունը նրանց միջև և երբ օգտագործել որն է:
GroupByKey մեթոդն օգտագործվում է, երբ KStream-ի ստեղները արդեն դատարկ չեն: Եվ ամենակարևորը, «վերաբաշխում է պահանջում» դրոշը երբեք չի դրվել:
GroupBy մեթոդը ենթադրում է, որ դուք փոխել եք խմբավորման ստեղները, ուստի վերաբաշխման դրոշակը սահմանվում է true-ի: GroupBy մեթոդից հետո միացումներ, ագրեգացիաներ և այլն կատարելը կհանգեցնի ավտոմատ վերաբաժանման:
Ամփոփում. Հնարավորության դեպքում դուք պետք է օգտագործեք GroupByKey-ը, քան GroupBy-ը:
Հասկանալի է, թե ինչ են անում mapValues-ը և groupBy մեթոդները, ուստի եկեք նայենք sum() մեթոդին (գտնվում է src/main/java/bbejeck/model/ShareVolume.java-ում) (Ցուցակ 5.3):
ShareVolume.sum մեթոդը վերադարձնում է բաժնետոմսերի վաճառքի ծավալի ընթացիկ ընդհանուր գումարը, և հաշվարկների ամբողջ շղթայի արդյունքը KTable օբյեկտ է: . Այժմ դուք հասկանում եք KTable-ի դերը: Երբ ShareVolume օբյեկտները ժամանում են, համապատասխան KTable օբյեկտը պահում է վերջին ընթացիկ թարմացումը: Կարևոր է հիշել, որ բոլոր թարմացումներն արտացոլված են նախորդ shareVolumeKTable-ում, բայց ոչ բոլորն են ուղարկվում հետագա:
Այնուհետև մենք օգտագործում ենք այս KTable-ը` միավորելու համար (ըստ վաճառվող բաժնետոմսերի քանակի), որպեսզի հասնենք յուրաքանչյուր ոլորտում վաճառվող բաժնետոմսերի ամենաբարձր ծավալներով հինգ ընկերություններին: Մեր գործողություններն այս դեպքում նման կլինեն առաջին ագրեգացման գործողություններին:
- Կատարեք մեկ այլ groupBy գործողություն՝ առանձին ShareVolume օբյեկտները խմբավորելու համար՝ ըստ ոլորտի:
- Սկսեք ամփոփել ShareVolume օբյեկտները: Այս անգամ ագրեգացման օբյեկտը ֆիքսված չափի առաջնահերթ հերթ է: Այս ֆիքսված չափի հերթում պահպանվում են միայն վաճառված բաժնետոմսերի ամենամեծ քանակով հինգ ընկերությունները:
- Նախորդ պարբերության հերթերը գծեք լարային արժեքի վրա և վերադարձրեք ամենավաճառվող բաժնետոմսերի լավագույն հնգյակը ըստ թվերի ըստ ոլորտի:
- Արդյունքները տողային ձևով գրեք թեմային:
Նկ. Նկար 5.10-ը ցույց է տալիս տվյալների հոսքի տոպոլոգիայի գրաֆիկը: Ինչպես տեսնում եք, վերամշակման երկրորդ փուլը բավականին պարզ է:
Այժմ, երբ մենք հստակ հասկանում ենք մշակման այս երկրորդ փուլի կառուցվածքը, կարող ենք դիմել դրա սկզբնաղբյուրին (այն կգտնեք src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java ֆայլում) (Ցուցակ 5.4) .
Այս սկզբնավորիչը պարունակում է fixedQueue փոփոխական: Սա հատուկ օբյեկտ է, որը ադապտեր է java.util.TreeSet-ի համար, որն օգտագործվում է վաճառվող բաժնետոմսերի նվազման կարգով վերին N արդյունքներին հետևելու համար:
Դուք արդեն տեսել եք groupBy-ի և mapValues-ի զանգերը, ուստի մենք չենք անդրադառնա դրանց (մենք անվանում ենք KTable.toStream մեթոդ, քանի որ KTable.print մեթոդը հնացած է): Բայց դուք դեռ չեք տեսել aggregate()-ի KTable տարբերակը, այնպես որ մենք մի փոքր ժամանակ կծախսենք դրա քննարկմանը:
Ինչպես հիշում եք, KTable-ը տարբերվում է նրանով, որ նույն ստեղներով գրառումները համարվում են թարմացումներ: KTable-ը փոխարինում է հին մուտքը նորով: Ագրեգացումը տեղի է ունենում նույն ձևով. նույն բանալիով վերջին գրառումները ագրեգացվում են: Երբ գրառումը գալիս է, այն ավելացվում է FixedSizePriorityQueue դասի օրինակին, օգտագործելով ավելորդ (երկրորդ պարամետր ագրեգատ մեթոդի կանչում), բայց եթե մեկ այլ գրառում արդեն գոյություն ունի նույն բանալիով, ապա հին գրառումը հանվում է հանմանչի միջոցով (երրորդ պարամետրը. ագրեգատ մեթոդի կանչը):
Այս ամենը նշանակում է, որ մեր ագրեգատորը՝ FixedSizePriorityQueue-ը, չի համախմբում բոլոր արժեքները մեկ բանալիով, այլ պահպանում է բաժնետոմսերի N առավել վաճառվող տեսակների քանակությունների շարժական գումարը: Յուրաքանչյուր մուտքային գրառում պարունակում է մինչ այժմ վաճառված բաժնետոմսերի ընդհանուր թիվը: KTable-ը ձեզ կտրամադրի տեղեկատվություն այն մասին, թե որ ընկերությունների բաժնետոմսերն են ներկայումս ամենաշատ վաճառվողները՝ առանց յուրաքանչյուր թարմացման շարժական ագրեգացման պահանջի:
Մենք սովորեցինք անել երկու կարևոր բան.
- խմբավորել արժեքները KTable-ում ընդհանուր բանալիով;
- կատարել օգտակար գործողություններ, ինչպիսիք են հավաքագրումը և համախմբումը այս խմբավորված արժեքների վրա:
Իմանալը, թե ինչպես կատարել այս գործողությունները, կարևոր է Kafka Streams հավելվածի միջոցով շարժվող տվյալների իմաստը հասկանալու և այն տեղեկության փոխանցման համար:
Մենք նաև հավաքել ենք այս գրքում ավելի վաղ քննարկված որոշ հիմնական հասկացություններ: Գլուխ 4-ում մենք քննարկեցինք, թե որքան սխալ հանդուրժող, տեղական վիճակը կարևոր է հոսքային հավելվածի համար: Այս գլխի առաջին օրինակը ցույց տվեց, թե ինչու է տեղական պետությունն այդքան կարևոր. այն ձեզ հնարավորություն է տալիս հետևելու, թե ինչ տեղեկատվություն եք արդեն տեսել: Տեղական մուտքը խուսափում է ցանցի հետաձգումներից՝ հավելվածը դարձնելով ավելի արդյունավետ և սխալների նկատմամբ:
Ցանկացած հավաքման կամ ագրեգացման գործողություն կատարելիս պետք է նշեք պետական խանութի անունը: Համախմբման և համախմբման գործողությունները վերադարձնում են KTable օրինակ, իսկ KTable-ն օգտագործում է վիճակի պահեստավորում՝ հին արդյունքները նորերով փոխարինելու համար: Ինչպես տեսաք, ոչ բոլոր թարմացումներն են ուղարկվում խողովակաշարով, և դա կարևոր է, քանի որ ագրեգացման գործողությունները նախատեսված են ամփոփ տեղեկատվություն ստանալու համար: Եթե դուք չեք կիրառում տեղական վիճակը, KTable-ը կուղարկի բոլոր ագրեգացման և համախմբման արդյունքները:
Այնուհետև մենք կանդրադառնանք որոշակի ժամանակահատվածում այնպիսի գործողությունների կատարմանը, ինչպիսին է ագրեգացումը, այսպես կոչված, պատուհանների կիրառումը:
5.3.2. Պատուհանների գործողություններ
Նախորդ բաժնում մենք ներկայացրեցինք սահող կոնվոլյուցիան և ագրեգացումը: Հավելվածը իրականացրել է բաժնետոմսերի վաճառքի շարունակական համախմբում, որին հաջորդել է բորսայում հինգ ամենավաճառվող բաժնետոմսերի միավորումը:
Երբեմն անհրաժեշտ է լինում արդյունքների նման շարունակական համախմբում և համախմբում: Եվ երբեմն անհրաժեշտ է գործողություններ կատարել միայն որոշակի ժամանակահատվածում: Օրինակ՝ հաշվեք, թե վերջին 10 րոպեում քանի բորսայական գործարք է կատարվել կոնկրետ ընկերության բաժնետոմսերով։ Կամ քանի օգտատեր է կտտացրել նոր գովազդային դրոշի վրա վերջին 15 րոպեի ընթացքում: Հավելվածը կարող է նման գործողություններ կատարել մի քանի անգամ, բայց արդյունքներով, որոնք վերաբերում են միայն որոշակի ժամանակահատվածներին (ժամանակային պատուհաններ):
Փոխանակման գործարքների հաշվում գնորդի կողմից
Հաջորդ օրինակում մենք կհետևենք բաժնետոմսերի գործարքներին մի քանի թրեյդերների միջև՝ խոշոր կազմակերպություններ կամ խելացի անհատ ֆինանսիստներ:
Այս հետևելու երկու հնարավոր պատճառ կա. Դրանցից մեկն այն է, որ պետք է իմանալ, թե շուկայի առաջատարներն ինչ են գնում/վաճառում: Եթե այս խոշոր խաղացողները և բարդ ներդրողները հնարավորություն են տեսնում, իմաստ ունի հետևել նրանց ռազմավարությանը: Երկրորդ պատճառը ապօրինի ինսայդերական առևտրի հնարավոր նշաններ հայտնաբերելու ցանկությունն է: Դա անելու համար ձեզ հարկավոր է վերլուծել վաճառքի մեծ աճերի հարաբերակցությունը կարևոր մամուլի հաղորդագրությունների հետ:
Նման հետևելը բաղկացած է հետևյալ քայլերից.
- բաժնետոմսերի գործարքների թեմայից կարդալու հոսքի ստեղծում;
- մուտքային գրառումների խմբավորում ըստ գնորդի ID-ի և բաժնետոմսի խորհրդանիշի: GroupBy մեթոդը կանչելը վերադարձնում է KGroupedStream դասի օրինակը;
- KGroupedStream.windowedBy մեթոդը վերադարձնում է տվյալների հոսք, որը սահմանափակվում է ժամանակային պատուհանով, որը թույլ է տալիս պատուհաններով ագրեգացնել: Կախված պատուհանի տեսակից, վերադարձվում է TimeWindowedKStream կամ SessionWindowedKStream;
- գործարքների հաշվարկը միավորման գործողության համար: Պատուհաններով տվյալների հոսքը որոշում է, թե արդյոք որոշակի գրառումը հաշվի է առնվել այս հաշվարկում.
- արդյունքները գրել թեմայում կամ դրանք մշակելիս ելքացնել վահանակին:
Այս հավելվածի տոպոլոգիան պարզ է, բայց դրա հստակ պատկերը օգտակար կլինի: Եկեք նայենք Նկ. 5.11.
Հաջորդը, մենք կանդրադառնանք պատուհանի գործառնությունների ֆունկցիոնալությանը և համապատասխան կոդը:
Պատուհանների տեսակները
Kafka Streams-ում կան երեք տեսակի պատուհաններ.
- նիստային;
- «գլուխկոտրում»;
- սահում / ցատկում.
Որն ընտրել կախված է ձեր բիզնեսի պահանջներից: Շրջանառվող և թռչող պատուհանները ժամանակով սահմանափակված են, մինչդեռ նստաշրջանի պատուհանները սահմանափակվում են օգտվողի ակտիվությամբ. նիստ(ներ)ի տևողությունը որոշվում է բացառապես օգտատերի ակտիվությամբ: Հիմնական բանը, որ պետք է հիշել, այն է, որ բոլոր պատուհանների տեսակները հիմնված են մուտքերի ամսաթվի/ժամային դրոշմանիշերի վրա, ոչ թե համակարգի ժամանակի վրա:
Հաջորդը, մենք իրականացնում ենք մեր տոպոլոգիան պատուհանների տեսակներից յուրաքանչյուրի հետ: Ամբողջական կոդը կտրվի միայն առաջին օրինակում, այլ տեսակի պատուհանների համար ոչինչ չի փոխվի, բացի պատուհանի գործողության տեսակից:
Նիստի պատուհաններ
Նիստի պատուհանները շատ տարբերվում են բոլոր այլ տեսակի պատուհաններից: Դրանք սահմանափակված են ոչ այնքան ժամանակով, որքան օգտատիրոջ ակտիվությամբ (կամ այն սուբյեկտի ակտիվությամբ, որին դուք կցանկանայիք հետևել): Նիստի պատուհանները սահմանազատված են անգործության ժամանակաշրջաններով:
Նկար 5.12-ը ցույց է տալիս նիստերի պատուհանների հայեցակարգը: Փոքր նիստը կմիավորվի ձախ կողմում գտնվող նստաշրջանի հետ: Իսկ աջ կողմում նիստը լինելու է առանձին, քանի որ այն հետևում է երկարատև անգործության։ Աշխատաշրջանի պատուհանները հիմնված են օգտատերերի գործունեության վրա, սակայն մուտքագրումներից օգտվում են ամսաթվի/ժամային դրոշմակնիքներից՝ որոշելու համար, թե որ նիստին է պատկանում գրառումը:
Օգտագործելով նստաշրջանի պատուհանները՝ բաժնետոմսերի գործարքներին հետևելու համար
Եկեք օգտագործենք նիստերի պատուհանները՝ բորսայական գործարքների մասին տեղեկությունները հավաքելու համար: Սեսիայի պատուհանների իրականացումը ցուցադրված է Ցուցակ 5.5-ում (որը կարելի է գտնել src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java-ում):
Դուք արդեն տեսել եք այս տոպոլոգիայի գործողությունների մեծ մասը, ուստի կարիք չկա դրանք նորից նայել այստեղ: Բայց այստեղ կան նաև մի քանի նոր տարրեր, որոնք մենք հիմա կքննարկենք:
GroupBy-ի ցանկացած գործողություն սովորաբար կատարում է ինչ-որ ագրեգացման գործողություն (հավաքում, հավաքում կամ հաշվում): Դուք կարող եք կատարել կա՛մ կուտակային ագրեգացիա ընթացիկ ընդհանուր գումարով, կա՛մ պատուհանների ագրեգացիա, որը հաշվի է առնում նշված ժամանակային պատուհանի ընթացքում գրանցված գրառումները:
Ցուցակ 5.5-ի կոդը հաշվում է սեսիայի պատուհաններում կատարված գործարքների քանակը: Նկ. 5.13 Այս գործողությունները վերլուծվում են քայլ առ քայլ:
Կանչելով windowedBy(SessionWindows.with(twentySeconds).until(fifteenMiutes)) մենք ստեղծում ենք նիստի պատուհան՝ 20 վայրկյան անգործության միջակայքով և 15 րոպե կայունության միջակայքով: 20 վայրկյան պարապ ընդմիջումը նշանակում է, որ հավելվածը կներառի ցանկացած մուտք, որը կհասնի ընթացիկ նստաշրջանի ավարտից կամ սկզբից 20 վայրկյանի ընթացքում ընթացիկ (ակտիվ) նստաշրջան:
Հաջորդը, մենք նշում ենք, թե որ ագրեգացման գործողությունը պետք է կատարվի նիստի պատուհանում. այս դեպքում՝ հաշվում: Եթե մուտքային մուտքն ընկնում է անգործության պատուհանից դուրս (ամսաթվի/ժամանակի դրոշմակնիքի երկու կողմերում), հավելվածը ստեղծում է նոր նստաշրջան: Պահպանման միջակայքը նշանակում է նիստի պահպանում որոշակի ժամանակով և թույլ է տալիս ուշացած տվյալներ, որոնք անցնում են նիստի անգործության շրջանից, բայց դեռ կարող են կցվել: Բացի այդ, միաձուլման արդյունքում առաջացած նոր նստաշրջանի սկիզբը և ավարտը համապատասխանում են ամենավաղ և ամենավերջին ամսաթվի/ժամային դրոշմակնիքին:
Եկեք նայենք մի քանի գրառումների հաշվարկի մեթոդից՝ տեսնելու, թե ինչպես են աշխատում նիստերը (Աղյուսակ 5.1):
Երբ գրառումները հասնում են, մենք փնտրում ենք գոյություն ունեցող սեսիաները նույն բանալիով, ավարտի ժամանակը պակաս է ընթացիկ ամսաթվի/ժամանակի դրոշմանիշից՝ անգործության միջակայքը, և մեկնարկի ժամանակը ավելի մեծ է, քան ընթացիկ ամսաթիվը/ժամը + անգործության ինտերվալը: Հաշվի առնելով դա՝ չորս գրառում աղյուսակից։ 5.1-ը միավորվում են մեկ նստաշրջանի մեջ հետևյալ կերպ.
1. Գրառման 1-ը գալիս է առաջինը, ուստի մեկնարկի ժամը հավասար է ավարտի ժամանակին և 00:00:00 է:
2. Հաջորդը գալիս է մուտք 2, և մենք փնտրում ենք նիստեր, որոնք ավարտվում են 23:59:55-ից ոչ շուտ և սկսվում են ոչ ուշ, քան 00:00:35: Մենք գտնում ենք ձայնագրությունը 1 և միավորում ենք 1-ին և 2-րդ նիստերը: Մենք վերցնում ենք 1-ին նստաշրջանի մեկնարկի ժամը (ավելի վաղ) և 2-րդ նստաշրջանի ավարտի ժամը (ավելի ուշ), այնպես որ մեր նոր նիստը սկսվում է 00:00:00-ին և ավարտվում 00-ին: 00:15.
3. Գրառման 3-ը հասնում է, մենք փնտրում ենք նիստեր 00:00:30-ից 00:01:10-ի միջև և չենք գտնում: Ավելացրեք երկրորդ նստաշրջան 123-345-654,FFBE բանալու համար, որը սկսվում և ավարտվում է 00:00:50-ին:
4. Գրառման 4-ը գալիս է, և մենք փնտրում ենք սեանսներ 23:59:45-ից 00:00:25: Այս անգամ գտնվել են 1-ին և 2-րդ նիստերը: Բոլոր երեք նիստերը միավորված են մեկի մեջ՝ մեկնարկի ժամը 00:00:00 և ավարտի ժամը 00:00:15:
Այս բաժնում նկարագրվածից արժե հիշել հետևյալ կարևոր նրբերանգները.
- նիստերը ֆիքսված չափի պատուհաններ չեն: Նիստի տեւողությունը որոշվում է տվյալ ժամանակահատվածում կատարվող ակտիվությամբ.
- Տվյալների մեջ ամսաթվի/ժամային դրոշմանիշերը որոշում են՝ արդյոք իրադարձությունը պատկանում է գոյություն ունեցող նիստին, թե պարապ ժամանակաշրջանում:
Հաջորդիվ կքննարկենք պատուհանների հաջորդ տեսակը՝ «թափվող» պատուհանները:
«Շողացող» պատուհաններ
Շողացող պատուհանները ֆիքսում են իրադարձությունները, որոնք ընկնում են որոշակի ժամանակահատվածում: Պատկերացրեք, որ դուք պետք է յուրաքանչյուր 20 վայրկյանը մեկ նկարահանեք որոշակի ընկերության բաժնետոմսերի բոլոր գործարքները, այնպես որ դուք հավաքեք բոլոր իրադարձությունները այդ ժամանակահատվածում: 20 վայրկյան ընդմիջման վերջում պատուհանը պտտվում է և տեղափոխվում 20 վայրկյան դիտարկման նոր ընդմիջում: Նկար 5.14-ը ցույց է տալիս այս իրավիճակը:
Ինչպես տեսնում եք, վերջին 20 վայրկյանում ստացված բոլոր իրադարձությունները ներառված են պատուհանում: Այս ժամանակահատվածի վերջում ստեղծվում է նոր պատուհան:
Ցուցակ 5.6-ը ցույց է տալիս ծածկագիրը, որը ցույց է տալիս շրջվող պատուհանների օգտագործումը բաժնետոմսերի գործարքները յուրաքանչյուր 20 վայրկյանը մեկ գրանցելու համար (գտնվում է src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-ում):
TimeWindows.of մեթոդի կանչի այս փոքր փոփոխությամբ դուք կարող եք օգտագործել շրջվող պատուհան: Այս օրինակը չի կանչում մինչև() մեթոդը, ուստի կօգտագործվի 24 ժամվա կանխադրված պահպանման միջակայքը:
Վերջապես, ժամանակն է անցնել պատուհանի տարբերակներից վերջինին` «ցատկել» պատուհաններին:
Լոգարիթմական («ցատկող») պատուհաններ
Լոգարիթմական/թափվող պատուհանները նման են շրջվող պատուհաններին, բայց մի փոքր տարբերությամբ: Լոգարիթմական պատուհանները չեն սպասում ժամանակի ավարտին, նախքան նոր պատուհան ստեղծելը՝ վերջին իրադարձությունները մշակելու համար: Նրանք սկսում են նոր հաշվարկներ պատուհանի տևողությունից պակաս սպասման ընդմիջումից հետո:
Շողացող և ցատկող պատուհանների տարբերությունները պատկերացնելու համար վերադառնանք բորսայական գործարքների հաշվման օրինակին: Մեր նպատակը դեռևս գործարքների քանակը հաշվելն է, բայց մենք չենք ցանկանում սպասել ամբողջ ժամանակ մինչև հաշվիչը թարմացնելը: Փոխարենը, մենք կթարմացնենք հաշվիչը ավելի կարճ ընդմիջումներով: Օրինակ, մենք դեռ կհաշվենք գործարքների քանակը յուրաքանչյուր 20 վայրկյանը մեկ, բայց հաշվիչը կթարմացնենք յուրաքանչյուր 5 վայրկյանը մեկ, ինչպես ցույց է տրված Նկ. 5.15. Այս դեպքում մենք հայտնվում ենք երեք արդյունքի պատուհաններով՝ համընկնող տվյալների հետ:
5.7 ցուցակը ցույց է տալիս լոգարիթմական պատուհանների սահմանման կոդը (գտնվում է src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-ում):
Շողացող պատուհանը կարող է վերածվել թռչող պատուհանի՝ ավելացնելով զանգ advanceBy() մեթոդին: Ցուցադրված օրինակում խնայողության միջակայքը 15 րոպե է:
Դուք տեսաք այս բաժնում, թե ինչպես կարելի է սահմանափակել ագրեգացման արդյունքները ժամանակային պատուհաններով: Մասնավորապես, ուզում եմ, որ այս բաժնից հիշեք հետևյալ երեք բաները.
- նստաշրջանի պատուհանների չափը սահմանափակվում է ոչ թե ժամանակաշրջանով, այլ օգտագործողի ակտիվությամբ.
- «թափվող» պատուհանները տրամադրում են իրադարձությունների ակնարկ տվյալ ժամանակահատվածում.
- Ցատկվող պատուհանների տեւողությունը ֆիքսված է, սակայն դրանք հաճախ թարմացվում են և կարող են պարունակել համընկնող գրառումներ բոլոր պատուհաններում:
Հաջորդը, մենք կսովորենք, թե ինչպես վերափոխել KTable-ը KStream-ի միացման համար:
5.3.3. KStream և KTable օբյեկտների միացում
4-րդ գլխում մենք քննարկեցինք KStream երկու օբյեկտների միացումը: Այժմ մենք պետք է սովորենք, թե ինչպես միացնել KTable-ը և KStream-ը: Սա կարող է անհրաժեշտ լինել հետևյալ պարզ պատճառով. KStream-ը գրառումների հոսք է, իսկ KTable-ը գրառումների թարմացումների հոսք է, բայց երբեմն կարող եք ցանկանալ լրացուցիչ համատեքստ ավելացնել գրառումների հոսքին՝ օգտագործելով KTable-ի թարմացումները:
Վերցնենք տվյալներ ֆոնդային բորսայական գործարքների քանակի վերաբերյալ և դրանք համատեղենք համապատասխան ճյուղերի բորսայական նորությունների հետ։ Ահա թե ինչ պետք է անեք դրան հասնելու համար՝ հաշվի առնելով այն կոդը, որն արդեն ունեք:
- Փոխակերպեք KTable օբյեկտը բաժնետոմսերի գործարքների քանակի վերաբերյալ տվյալների հետ KStream-ի, որից հետո բանալին փոխարինեք այս բաժնետոմսերի խորհրդանիշին համապատասխանող արդյունաբերության ոլորտը ցույց տվող բանալիով:
- Ստեղծեք KTable օբյեկտ, որը կարդում է տվյալներ ֆոնդային բորսայի նորություններով: Այս նոր KTable-ը դասակարգվելու է ըստ արդյունաբերության ոլորտի:
- Միացրեք նորությունների թարմացումները ըստ արդյունաբերության ոլորտի ֆոնդային բորսայական գործարքների քանակի մասին տեղեկատվության:
Այժմ տեսնենք, թե ինչպես իրականացնել այս գործողությունների ծրագիրը:
Փոխարկել KTable-ը KStream-ի
KTable-ը KStream-ի փոխարկելու համար անհրաժեշտ է անել հետևյալը.
- Զանգահարեք KTable.toStream() մեթոդը:
- Կանչելով KStream.map մեթոդը, բանալին փոխարինեք ոլորտի անունով, այնուհետև առբերեք TransactionSummary օբյեկտը Windowed օրինակից:
Մենք կապելու ենք այս գործողությունները հետևյալ կերպ (կոդը կարելի է գտնել src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java ֆայլում) (Ցուցակ 5.8):
Քանի որ մենք կատարում ենք KStream.map գործողություն, վերադարձված KStream օրինակը նորից բաժանվում է ավտոմատ կերպով, երբ այն օգտագործվում է կապի մեջ:
Մենք ավարտել ենք փոխակերպման գործընթացը, այնուհետև մենք պետք է ստեղծենք KTable օբյեկտ՝ ֆոնդային նորություններ կարդալու համար:
KTable-ի ստեղծում ֆոնդային նորությունների համար
Բարեբախտաբար, KTable օբյեկտ ստեղծելը պահանջում է ընդամենը մեկ տող կոդ (կոդը կարելի է գտնել src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-ում) (Ցուցակ 5.9):
Հարկ է նշել, որ ոչ մի Serde օբյեկտ չի պահանջվում նշել, քանի որ կարգավորումներում օգտագործվում են լարային Serde-ներ: Նաև, օգտագործելով ԱՄԵՆԱՎԱՂ թվարկումը, աղյուսակը հենց սկզբում լրացվում է գրառումներով:
Այժմ մենք կարող ենք անցնել վերջին քայլին՝ կապին:
Նորությունների թարմացումների միացում գործարքների քանակի տվյալների հետ
Կապ ստեղծելը դժվար չէ։ Մենք կօգտագործենք ձախ միացում, եթե համապատասխան ոլորտի համար բաժնետոմսերի նորություններ չլինեն (անհրաժեշտ կոդը կարելի է գտնել src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java ֆայլում) (Ցուցակ 5.10):
Այս leftJoin օպերատորը բավականին պարզ է: Ի տարբերություն 4-րդ գլխի միացումների, JoinWindow մեթոդը չի օգտագործվում, քանի որ KStream-KTable միացում կատարելիս KTable-ում կա միայն մեկ մուտք յուրաքանչյուր ստեղնի համար: Նման կապը ժամանակով սահմանափակված չէ. գրառումը կա՛մ KTable-ում է, կա՛մ բացակայում է: Հիմնական եզրակացությունը. KTable օբյեկտների միջոցով դուք կարող եք հարստացնել KStream-ը ավելի քիչ հաճախակի թարմացվող տեղեկատու տվյալներով:
Այժմ մենք կդիտարկենք KStream-ի իրադարձությունները հարստացնելու ավելի արդյունավետ միջոց:
5.3.4. GlobalKTable օբյեկտներ
Ինչպես տեսնում եք, իրադարձությունների հոսքերը հարստացնելու կամ դրանց համատեքստ ավելացնելու անհրաժեշտություն կա: 4-րդ գլխում դուք տեսաք KStream երկու օբյեկտների միջև կապը, իսկ նախորդ բաժնում տեսաք KStream-ի և KTable-ի կապը: Այս բոլոր դեպքերում անհրաժեշտ է վերաբաժանել տվյալների հոսքը՝ բանալիները նոր տեսակի կամ արժեքի քարտեզագրելիս: Երբեմն վերաբաժանումը կատարվում է բացահայտորեն, և երբեմն Kafka Streams-ը դա անում է ավտոմատ կերպով: Կրկին բաժանումն անհրաժեշտ է, քանի որ ստեղները փոխվել են, և գրառումները պետք է ավարտվեն նոր բաժիններում, հակառակ դեպքում կապն անհնար կլինի (սա քննարկվել է 4-րդ գլխում, 4.2.4 ենթաբաժնի «Տվյալների վերաբաշխում» բաժնում):
Կրկին բաժանումն ունի ծախս
Վերաբաշխումը պահանջում է ծախսեր՝ միջանկյալ թեմաներ ստեղծելու համար լրացուցիչ ռեսուրսային ծախսեր, կրկնօրինակ տվյալներ այլ թեմայում պահելու համար. դա նաև նշանակում է հետաձգման ավելացում այս թեմայից գրելու և կարդալու պատճառով: Բացի այդ, եթե ձեզ անհրաժեշտ է միանալ մեկից ավելի ասպեկտներով կամ հարթություններով, դուք պետք է կապեք միացումները, քարտեզագրեք գրառումները նոր ստեղներով և նորից գործարկեք բաժանման գործընթացը:
Միացում ավելի փոքր տվյալների հավաքածուներին
Որոշ դեպքերում կապակցվող հղման տվյալների ծավալը համեմատաբար փոքր է, ուստի դրանց ամբողջական պատճենները կարող են հեշտությամբ տեղավորվել տեղական յուրաքանչյուր հանգույցի վրա: Նման իրավիճակների համար Kafka Streams-ը տրամադրում է GlobalKTable դասը:
GlobalKTable-ի օրինակները եզակի են, քանի որ հավելվածը վերարտադրում է բոլոր տվյալները հանգույցներից յուրաքանչյուրին: Եվ քանի որ բոլոր տվյալները առկա են յուրաքանչյուր հանգույցում, կարիք չկա իրադարձության հոսքը բաժանել հղումային տվյալների բանալիով, որպեսզի այն հասանելի լինի բոլոր բաժանմունքներին: Կարող եք նաև առանց բանալի միացումներ կատարել՝ օգտագործելով GlobalKTable օբյեկտները: Եկեք վերադառնանք նախորդ օրինակներից մեկին՝ այս հատկությունը ցույց տալու համար:
KStream օբյեկտների միացում GlobalKTable օբյեկտներին
5.3.2 ենթաբաժնում մենք իրականացրել ենք գնորդների կողմից փոխանակման գործարքների պատուհանի ագրեգացիա: Այս ագրեգացիայի արդյունքները մոտավորապես այսպիսի տեսք ունեին.
{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16
Թեև այս արդյունքները ծառայում էին նպատակին, ավելի օգտակար կլիներ, եթե ցուցադրվեին հաճախորդի անունը և ընկերության ամբողջական անվանումը: Հաճախորդի անունը և ընկերության անվանումը ավելացնելու համար կարող եք սովորական միացումներ կատարել, բայց ձեզ հարկավոր է կատարել երկու հիմնական քարտեզագրում և նորից բաժանում: GlobalKTable-ի միջոցով դուք կարող եք խուսափել նման գործողությունների ծախսերից:
Դա անելու համար մենք կօգտագործենք countStream օբյեկտը Ցուցակ 5.11-ից (համապատասխան կոդը կարելի է գտնել src/main/java/bbejeck/chapter_5/GlobalKTableExample.java-ում) և միացնել այն երկու GlobalKTable օբյեկտի:
Մենք նախկինում արդեն քննարկել ենք դա, ուստի չեմ կրկնի: Բայց ես նշում եմ, որ toStream().map ֆունկցիայի կոդը աբստրակտացվում է ֆունկցիայի օբյեկտի մեջ՝ ներկառուցված լամբդա արտահայտության փոխարեն՝ ընթեռնելիության համար:
Հաջորդ քայլը GlobalKTable-ի երկու օրինակ հայտարարելն է (ցուցադրված կոդը կարելի է գտնել src/main/java/bbejeck/chapter_5/GlobalKTableExample.java ֆայլում) (Ցուցակ 5.12):
Խնդրում ենք նկատի ունենալ, որ թեմաների անվանումները նկարագրված են թվարկված տեսակների միջոցով:
Այժմ, երբ մենք պատրաստ ենք բոլոր բաղադրիչները, մնում է գրել միացման կոդը (որը կարելի է գտնել src/main/java/bbejeck/chapter_5/GlobalKTableExample.java ֆայլում) (Ցուցակ 5.13):
Թեև այս կոդում կա երկու միացում, դրանք շղթայված են, քանի որ դրանց արդյունքներից ոչ մեկը չի օգտագործվում առանձին: Արդյունքները ցուցադրվում են ամբողջ գործողության ավարտին:
Երբ գործարկեք վերը նշված միացման գործողությունը, դուք կստանաք այսպիսի արդյունքներ.
{customer='Barney, Smith' company="Exxon", transactions= 17}
Էությունը չի փոխվել, բայց այս արդյունքներն ավելի պարզ են թվում։
Եթե դուք հետ հաշվում եք մինչև Գլուխ 4, դուք արդեն տեսել եք մի քանի տեսակի կապեր գործողության մեջ: Դրանք թվարկված են աղյուսակում: 5.2. Այս աղյուսակը արտացոլում է Kafka Streams-ի 1.0.0 տարբերակի միացման հնարավորությունները. Ինչ-որ բան կարող է փոխվել հետագա թողարկումներում:
Ամեն ինչ ամփոփելու համար եկեք ամփոփենք հիմունքները. դուք կարող եք միացնել իրադարձությունների հոսքերը (KStream) և թարմացնել հոսքերը (KTable)՝ օգտագործելով տեղական վիճակը: Որպես այլընտրանք, եթե հղման տվյալների չափը չափազանց մեծ չէ, կարող եք օգտագործել GlobalKTable օբյեկտը: GlobalKTables-ը կրկնօրինակում է բոլոր բաժինները Kafka Streams հավելվածի յուրաքանչյուր հանգույցում՝ ապահովելով, որ բոլոր տվյալները հասանելի են՝ անկախ նրանից, թե որ բաժնին է համապատասխանում բանալին:
Հաջորդը մենք կտեսնենք Kafka Streams ֆունկցիան, որի շնորհիվ մենք կարող ենք դիտարկել վիճակի փոփոխությունները՝ առանց Կաֆկայի թեմայի տվյալների սպառման:
5.3.5. Հարցման ենթակա վիճակ
Մենք արդեն կատարել ենք մի քանի գործողություններ, որոնք ներառում են վիճակը և արդյունքները միշտ թողարկվում են վահանակի վրա (մշակման նպատակներով) կամ դրանք գրում են թեմայում (արտադրական նպատակներով): Արդյունքները թեմայի վրա գրելիս պետք է օգտագործեք Կաֆկա սպառողը՝ դրանք դիտելու համար:
Այս թեմաներից տվյալների ընթերցումը կարելի է համարել նյութականացված տեսակետների տեսակ։ Մեր նպատակների համար մենք կարող ենք օգտագործել Վիքիպեդիայից նյութականացված տեսակետի սահմանումը. «...հարցման արդյունքներ պարունակող տվյալների բազայի ֆիզիկական օբյեկտ։ Օրինակ, դա կարող է լինել հեռավոր տվյալների տեղական պատճեն, կամ աղյուսակի տողերի և/կամ սյունակների ենթաբազմություն կամ միացման արդյունքներ, կամ ամփոփ աղյուսակ, որը ստացվել է ագրեգացիայի միջոցով» (https://en.wikipedia.org/wiki /Naterialized_view):
Kafka Streams-ը նաև թույլ է տալիս ինտերակտիվ հարցումներ կատարել պետական խանութներում՝ թույլ տալով ուղղակիորեն կարդալ այս նյութականացված տեսակետները: Կարևոր է նշել, որ պետական խանութին ուղղված հարցումը միայն կարդալու գործողություն է: Սա երաշխավորում է, որ դուք ստիպված չեք լինի անհանգստանալ պատահաբար վիճակի անհամապատասխանության մասին, մինչ ձեր հավելվածը մշակում է տվյալները:
Կարևոր է պետական խանութներին ուղղակիորեն հարցումներ անելու ունակությունը: Սա նշանակում է, որ դուք կարող եք ստեղծել վահանակի հավելվածներ՝ առանց Kafka-ի սպառողից նախապես տվյալներ վերցնելու: Այն նաև մեծացնում է հավելվածի արդյունավետությունը՝ պայմանավորված այն հանգամանքով, որ կարիք չկա նորից գրել տվյալները.
- տվյալների տեղայնության շնորհիվ դրանք կարող են արագ մուտք գործել.
- Տվյալների կրկնօրինակումը վերացված է, քանի որ դրանք գրված չեն արտաքին պահեստում:
Հիմնական բանը, որ ես ուզում եմ, որ դուք հիշեք, այն է, որ դուք կարող եք ուղղակիորեն հարցումներ կատարել պետությանը ձեր դիմումի ներսում: Այն հնարավորությունները, որոնք դա ձեզ տալիս է, չի կարելի գերագնահատել: Կաֆկայից ստացված տվյալները սպառելու և հավելվածի համար տվյալների բազայում գրառումներ պահելու փոխարեն, դուք կարող եք նույն արդյունքով հարցումներ կատարել պետական խանութներից: Պետական խանութներին ուղղված ուղղակի հարցումները նշանակում են ավելի քիչ կոդ (առանց սպառողի) և ավելի քիչ ծրագրակազմ (արդյունքները պահելու համար տվյալների բազայի աղյուսակի կարիք չկա):
Այս գլխում մենք բավականին շատ բան ենք անդրադարձել, ուստի պետական խանութների դեմ ինտերակտիվ հարցումների մեր քննարկումն առայժմ կթողնենք: Բայց մի անհանգստացեք. 9-րդ գլխում մենք կստեղծենք ինտերակտիվ հարցումներով պարզ վահանակի հավելված: Այն կօգտագործի այս և նախորդ գլուխների որոշ օրինակներ՝ ցուցադրելու ինտերակտիվ հարցումները և ինչպես կարող եք դրանք ավելացնել Kafka Streams հավելվածներին:
Ամփոփում
- KStream օբյեկտները ներկայացնում են իրադարձությունների հոսքեր, որոնք համեմատելի են տվյալների բազայում ներդիրների հետ: KTable օբյեկտները ներկայացնում են թարմացման հոսքեր, որոնք ավելի շատ նման են տվյալների բազայի թարմացումներին: KTable օբյեկտի չափը չի մեծանում, հին գրառումները փոխարինվում են նորերով:
- KTable օբյեկտները պահանջվում են ագրեգացման գործողությունների համար:
- Օգտագործելով պատուհանների գործառնությունները, դուք կարող եք համախմբված տվյալները բաժանել ժամանակի դույլերի:
- GlobalKTable օբյեկտների շնորհիվ դուք կարող եք մուտք գործել հղման տվյալներ հավելվածի ցանկացած կետում՝ անկախ բաժանումից:
- Հնարավոր են միացումներ KStream, KTable և GlobalKTable օբյեկտների միջև:
Մինչ այժմ մենք կենտրոնացել ենք Kafka Streams հավելվածների ստեղծման վրա՝ օգտագործելով բարձր մակարդակի KStream DSL: Չնայած բարձր մակարդակի մոտեցումը թույլ է տալիս ստեղծել կոկիկ և հակիրճ ծրագրեր, դրա օգտագործումը փոխզիջում է: DSL KStream-ի հետ աշխատելը նշանակում է բարձրացնել ձեր կոդի հակիրճությունը՝ նվազեցնելով կառավարման աստիճանը: Հաջորդ գլխում մենք կանդրադառնանք ցածր մակարդակի կարգավորիչ հանգույցի API-ին և կփորձենք այլ փոխզիջումներ: Ծրագրերն ավելի երկար կլինեն, քան նախկինում էին, բայց մենք կկարողանանք ստեղծել գրեթե ցանկացած կարգավորիչ հանգույց, որը մեզ կարող է անհրաժեշտ լինել:
→ Գրքի մասին ավելի մանրամասն կարող եք գտնել այստեղ
→ Habrozhiteli-ի համար 25% զեղչ՝ օգտագործելով կտրոնը - Կաֆկայի հոսքեր
→ Գրքի թղթային տարբերակի համար վճարելուց հետո էլեկտրոնային գիրք կուղարկվի էլեկտրոնային փոստով:
Source: www.habr.com