«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер" Салам, Хабро шаарынын тургундары! Бул китеп жипти иштетүүнү түшүнгүсү келген ар бир иштеп чыгуучу үчүн ылайыктуу. Бөлүштүрүлгөн программалоону түшүнүү Кафка жана Кафка агымдарын жакшыраак түшүнүүгө жардам берет. Кафка рамкасынын өзүн билсеңиз жакшы болмок, бирок бул зарыл эмес: мен сизге керектүү нерселердин баарын айтып берем. Тажрыйбалуу Кафканы иштеп чыгуучулар жана жаңы баштагандар бул китептен Kafka Streams китепканасын колдонуу менен кызыктуу агымдарды иштетүүчү тиркемелерди түзүүнү үйрөнүшөт. Сериялаштыруу сыяктуу түшүнүктөр менен тааныш болгон орто жана алдыңкы Java иштеп чыгуучулары Kafka Streams тиркемелерин түзүү үчүн өз жөндөмдөрүн колдонууну үйрөнүшөт. Китептин баштапкы коду Java 8де жазылган жана Java 8 лямбда экспрессиясынын синтаксисин олуттуу колдонот, андыктан lambda функциялары менен иштөөнү билүү (башка программалоо тилинде да) пайдалуу болот.

үзүндү. 5.3. Агрегация жана терезелөө операциялары

Бул бөлүмдө биз Кафка агымынын эң келечектүү бөлүктөрүн изилдөөнү улантабыз. Буга чейин биз Кафка агымынын төмөнкү аспектилерин карадык:

  • иштетүү топологиясын түзүү;
  • агымдык колдонмолордо абалды колдонуу;
  • маалымат агымынын байланыштарын аткаруу;
  • окуя агымдары (KStream) жана жаңыртуу агымдары (KTable) ортосундагы айырмачылыктар.

Төмөнкү мисалдарда биз бул элементтердин баарын чогуу келтиребиз. Сиз ошондой эле агымдык тиркемелердин дагы бир сонун өзгөчөлүгүн терезелөө жөнүндө биле аласыз. Биздин биринчи мисал жөнөкөй топтоо болот.

5.3.1. Өнөр жай секторлору боюнча запастарды сатуунун жыйындысы

Агрегациялоо жана топтоо агымдык маалыматтар менен иштөөдө маанилүү инструмент болуп саналат. Кабыл алынган жеке жазууларды текшерүү көп учурда жетишсиз. Маалыматтардан кошумча маалымат алуу үчүн аларды топтоп, бириктирүү керек.

Бул мисалда сиз бир нече тармактардагы компаниялардын акцияларынын сатуу көлөмүнө көз салышы керек болгон бир күндүк соодагердин кийимин кийесиз. Тактап айтканда, сиз ар бир тармакта эң көп үлүшү сатылган беш компанияга кызыгасыз.

Мындай топтоо маалыматтарды керектүү формага которуу үчүн төмөнкү бир нече кадамдарды талап кылат (жалпы сөз менен айтканда).

  1. чийки биржа соода маалымат жарыялайт темага негизделген булагы түзүү. StockTransaction түрүндөгү объектти ShareVolume түрүндөгү объектке салыштырууга туура келет. Бул StockTransaction объектисинде сатуунун метадайындары бар, бирок бизге сатылып жаткан акциялардын саны жөнүндө гана маалымат керек.
  2. ShareVolume берилиштерин акциялардын белгиси боюнча топтоо. Символ боюнча топтоштурулгандан кийин, сиз бул маалыматтарды акцияларды сатуу көлөмүнүн орто суммаларына жыйыштыра аласыз. Белгилей кетсек, KStream.groupBy ыкмасы KGroupedStream түрүндөгү мисалды кайтарат. KGroupedStream.reduce ыкмасын андан ары чакыруу менен KTable инстанциясын ала аласыз.

KGroupedStream интерфейси деген эмне

KStream.groupBy жана KStream.groupByKey ыкмалары KGroupedStreamдин үлгүсүн кайтарат. KGroupedStream - баскычтар боюнча топтоштурулгандан кийин окуялардын агымынын ортодогу өкүлчүлүгү. Бул такыр аны менен түздөн-түз иштөө үчүн арналган эмес. Анын ордуна, KGroupedStream ар дайым KTtableге алып келген топтоо операциялары үчүн колдонулат. Жана топтоо операцияларынын натыйжасы KTable болгондуктан жана алар мамлекеттик дүкөндү колдонушкандыктан, натыйжада бардык жаңыртуулар түтүктөн ылдыйга жөнөтүлбөй калышы мүмкүн.

KTable.groupBy ыкмасы окшош KGroupedTable - ачкыч боюнча кайра топтолгон жаңыртуулардын агымынын ортодогу өкүлчүлүгүн кайтарат.

Бир аз тыныгуу алып, сүрөттү карап көрөлү. 5.9, бул биз эмнеге жетишкенибизди көрсөтөт. Бул топология сизге мурунтан эле тааныш болушу керек.

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Эми бул топологиянын кодун карап көрөлү (аны src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java файлынан тапса болот) (5.2 Листинг).

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Берилген код өзүнүн кыскалыгы жана бир нече сапта аткарылган аракеттердин чоң көлөмү менен айырмаланат. Сиз builder.stream ыкмасынын биринчи параметринде жаңы нерсени байкай аласыз: Consumed.withOffsetResetPolicy ыкмасын колдонуу менен коюлган AutoOffsetReset.EARLIEST энум түрүнүн мааниси (АКЫРКЫ да бар). Бул санап чыгуу түрү ар бир KStream же KTable үчүн офсеттик абалга келтирүү стратегиясын аныктоо үчүн колдонулушу мүмкүн жана конфигурациядагы офсеттик баштапкы абалга келтирүү опциясынан артыкчылыкка ээ.

GroupByKey жана GroupBy

KStream интерфейсинде жазууларды топтоонун эки ыкмасы бар: GroupByKey жана GroupBy. Экөө тең KGroupedTableди кайтарышат, андыктан алардын ортосунда кандай айырма бар жана кайсынысын качан колдонуу керек деп ойлонуп жатсаңыз болот?

GroupByKey ыкмасы KStreamдеги ачкычтар мурунтан эле бош эмес болгондо колдонулат. Эң негизгиси, "кайра бөлүүнү талап кылат" желек эч качан коюлган эмес.

GroupBy методу сиз топтоо баскычтарын өзгөрттүңүз деп болжолдойт, ошондуктан кайра бөлүштүрүү желеги чындыкка коюлат. GroupBy ыкмасынан кийин кошулууларды, бириктирүүлөрдү ж.б. аткаруу автоматтык түрдө кайра бөлүүгө алып келет.
Корутунду: Мүмкүн болушунча, GroupBy эмес, GroupByKey колдонушуңуз керек.

mapValues ​​жана groupBy методдору эмне кылары түшүнүктүү, андыктан sum() ыкмасын карап көрөлү (src/main/java/bbejeck/model/ShareVolume.javaда табылган) (5.3 тизмеси).

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
ShareVolume.sum ыкмасы акцияны сатуу көлөмүнүн иштеп жаткан жалпы санын кайтарып берет жана эсептөөлөрдүн бүт чынжырынын натыйжасы KTable объекти болуп саналат . Эми сиз KTable кандай роль ойноорун түшүндүңүз. ShareVolume объекттери келгенде, тиешелүү KTable объекти акыркы учурдагы жаңыртууну сактайт. Бардык жаңыртуулар мурунку shareVolumeKTableде чагылдырылганын эстен чыгарбоо керек, бирок баары андан ары жөнөтүлбөйт.

Андан кийин биз бул KTableди бириктирүү үчүн (сатылган акциялардын саны боюнча) ар бир тармакта сатылган акциялардын көлөмү эң көп беш компанияга жетүү үчүн колдонобуз. Бул учурда биздин иш-аракеттерибиз биринчи топтоодогудай болот.

  1. Жеке ShareVolume объекттерин тармак боюнча топтоо үчүн дагы groupBy операциясын аткарыңыз.
  2. ShareVolume объекттерин жыйынтыктап баштаңыз. Бул жолу топтоо объектиси белгиленген өлчөмдөгү артыкчылыктуу кезек. Бул белгиленген өлчөмдөгү кезекте эң көп акциялар сатылган беш компания гана сакталат.
  3. Мурунку абзацтагы кезектерди саптык мааниге салыштырып, тармак боюнча саны боюнча эң көп сатылган беш эң мыкты акцияларды кайтарыңыз.
  4. Натыйжаларды темага сап түрүндө жазыңыз.

Сүрөттө. 5.10-сүрөттө маалымат агымынын топологиясынын графиги көрсөтүлгөн. Көрүнүп тургандай, кайра иштетүүнүн экинчи айлампасы абдан жөнөкөй.

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Эми биз кайра иштетүүнүн экинчи раундунун структурасын так түшүнгөнүбүздөн кийин, анын баштапкы кодуна кайрылсак болот (сиз аны src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java файлынан таба аласыз) (5.4 Листинг) .

Бул инициализатор fixedQueue өзгөрмөсүн камтыйт. Бул ыңгайлаштырылган объект, ал java.util.TreeSet үчүн адаптер болуп саналат, ал соодаланган акциялардын азаюу тартибинде эң жогорку N натыйжаларына көз салуу үчүн колдонулат.

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Сиз буга чейин groupBy жана mapValues ​​чалууларын көргөнсүз, андыктан биз аларга кирбейбиз (KTable.toStream ыкмасын чакырып жатабыз, анткени KTable.print ыкмасы эскирген). Бирок сиз aggregate() нын KTable версиясын көрө элексиз, андыктан аны талкуулоого бир аз убакыт коротабыз.

Эсиңизде болсо, KTable'ди айырмалоочу нерсе, ошол эле баскычтары бар жазуулар жаңыртуулар болуп эсептелет. KTable эски жазууну жаңысына алмаштырат. Агрегация ушундай жол менен ишке ашат: бир эле ачкыч менен акыркы жазуулар бириктирилет. Жазуу келгенде, ал FixedSizePriorityQueue классынын инстанциясына суммалоочунун жардамы менен кошулат (бирдиктүү ыкма чакырыгында экинчи параметр), бирок ошол эле ачкыч менен башка жазуу мурунтан эле бар болсо, анда эски жазуу кемитүү (үчүнчү параметр жалпы ыкманы чакыруу).

Мунун баары биздин агрегаторубуз, FixedSizePriorityQueue бардык маанилерди бир ачкыч менен бириктирбестен, акциялардын эң көп сатылган N түрлөрүнүн кыймылдуу суммасын сактай турганын билдирет. Ар бир кирүүчү жазуу буга чейин сатылган акциялардын жалпы санын камтыйт. KTable сизге кайсы компаниялардын акциялары эң көп сатылып жатканы тууралуу маалымат берет, ар бир жаңыртууларды жылдырууну талап кылбастан.

Биз эки маанилүү нерсени жасоону үйрөндүк:

  • KTableдеги баалуулуктарды жалпы ачкыч менен топтоо;
  • бул топтоштурулган баалуулуктар боюнча топтоо жана топтоо сыяктуу пайдалуу операцияларды аткарыңыз.

Бул операцияларды кантип аткарууну билүү Kafka Streams тиркемеси аркылуу өтүүчү маалыматтардын маанисин түшүнүү жана ал кандай маалыматты алып келерин түшүнүү үчүн маанилүү.

Биз ошондой эле бул китепте мурда талкууланган кээ бир негизги түшүнүктөрдү чогулттук. 4-бөлүмдө биз каталарга чыдамдуу, жергиликтүү абал агымдык колдонмо үчүн канчалык маанилүү экенин талкууладык. Бул бөлүмдөгү биринчи мисал эмне үчүн жергиликтүү мамлекет мынчалык маанилүү экенин көрсөттү — ал сизге мурда көргөн маалыматыңызга көз салуу мүмкүнчүлүгүн берет. Жергиликтүү жетүү тармактын кечигүүлөрүн болтурбай, тиркемени кыйла натыйжалуу жана каталарга туруштук берет.

Кандайдыр бир топтоо же бириктирүү операциясын аткарып жатканда, мамлекеттик дүкөндүн атын көрсөтүү керек. Чогултуу жана бириктирүү операциялары KTable инстанциясын кайтарат жана KTable эски натыйжаларды жаңыларына алмаштыруу үчүн мамлекеттик сактагычты колдонот. Көрүнүп тургандай, жаңыртуулардын баары эле жөнөтүлбөйт жана бул маанилүү, анткени топтоо операциялары кыскача маалыматты чыгаруу үчүн иштелип чыккан. Эгер сиз жергиликтүү штатты колдонбосоңуз, KTable бардык топтоо жана топтоо натыйжаларын башка жакка багыттайт.

Андан кийин, биз белгилүү бир убакыттын ичинде топтоо сыяктуу операцияларды - терезелөө операциялары деп аталган операцияларды карап чыгабыз.

5.3.2. Терезе операциялары

Мурунку бөлүмдө биз жылма конволюция жана агрегацияны киргиздик. Тиркеме биржада эң көп сатылган беш акцияны бириктирүү менен акцияларды сатуунун үзгүлтүксүз жыйымын жүргүздү.

Кээде мындай үзгүлтүксүз топтоо жана жыйынтыктарды чогултуу зарыл. Ал эми кээде белгилүү бир убакыттын ичинде гана операцияларды жасоо керек. Мисалы, акыркы 10 мүнөт ичинде белгилүү бир компаниянын акциялары менен канча алмашуу операциялары жасалганын эсептеп көрүңүз. Же акыркы 15 мүнөт ичинде канча колдонуучулар жаңы жарнамалык баннерди басышты. Тиркеме мындай операцияларды бир нече жолу аткарышы мүмкүн, бирок натыйжалар белгиленген убакыт аралыгына гана тиешелүү (убакыт терезелери).

Сатып алуучу тарабынан алмашуу операцияларын эсептөө

Кийинки мисалда биз бир нече трейдерлердин — ири уюмдар же акылдуу жеке финансисттер боюнча биржа операцияларына көз салабыз.

Бул көз салуу үчүн эки мүмкүн болгон себеп бар. Алардын бири рыноктун лидерлери эмнени сатып/сатып жатканын билүү зарылчылыгы. Эгерде бул чоң оюнчулар жана татаал инвесторлор мүмкүнчүлүктөрдү көрүшсө, алардын стратегиясын аткаруунун мааниси бар. Экинчи себеп - мыйзамсыз инсайдердик сооданын мүмкүн болуучу белгилерин байкап калуу каалоосу. Бул үчүн, сиз маанилүү пресс-релиздер менен чоң сатуунун корреляциясын талдооңуз керек.

Мындай көз салуу төмөнкү кадамдардан турат:

  • биржалык операциялар темасынан окуу үчүн агым түзүү;
  • сатып алуучунун ID жана биржа белгиси боюнча кирген жазууларды топтоо. groupBy ыкмасын чакыруу KGroupedStream классынын инстанциясын кайтарат;
  • KGroupedStream.windowedBy ыкмасы убакыт терезеси менен чектелген маалымат агымын кайтарып берет, бул терезелешип бириктирүүгө мүмкүндүк берет. Терезенин түрүнө жараша TimeWindowedKStream же SessionWindowedKStream кайтарылат;
  • топтоо операциясы үчүн транзакциялардын саны. Терезелүү маалымат агымы бул эсепте белгилүү бир жазуу эске алынарын аныктайт;
  • темага жыйынтыктарды жазуу же иштеп чыгуу учурунда аларды консолго чыгаруу.

Бул колдонмонун топологиясы жөнөкөй, бирок анын так сүрөтү пайдалуу болмок. Келгиле, сүрөттү карап көрөлү. 5.11.

Андан кийин, биз терезе операцияларынын функционалдуулугун жана тиешелүү кодду карап чыгабыз.

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"

Терезе түрлөрү

Кафка агымдарында терезелердин үч түрү бар:

  • сессиялык;
  • "төңкөрүш";
  • жылма/секирүү.

Кайсынысын тандоо бизнес талаптарына жараша болот. Айлануу жана секирүү терезелери убакыт менен чектелген, ал эми сеанс терезелери колдонуучунун активдүүлүгү менен чектелет — сеанстын(лардын) узактыгы колдонуучунун канчалык активдүү экендиги менен гана аныкталат. Эсиңизде болсун, бардык терезе түрлөрү системанын убактысына эмес, жазуулардын датасы/убакыт штамптарына негизделген.

Андан кийин, биз ар бир терезе түрлөрү менен топологиябызды ишке ашырабыз. Толук код биринчи мисалда гана берилет, терезелердин башка түрлөрү үчүн терезе операциясынын түрүнөн башка эч нерсе өзгөрбөйт.

Сеанс терезелери

Сеанс терезелери башка бардык терезелерден абдан айырмаланат. Алар убакыт менен эмес, колдонуучунун аракети (же сиз көз салгыңыз келген объекттин иш-аракети) менен чектелет. Сеанс терезелери аракетсиздик мөөнөттөрү менен чектелет.

5.12-сүрөт сеанс терезелеринин түшүнүгүн көрсөтөт. Кичирээк сессия сол жагындагы сессия менен биригет. Ал эми оң жактагы сессия өзүнчө болот, анткени ал узак убакыт бою аракетсиздик менен коштолот. Сеанс терезелери колдонуучунун аракетине негизделген, бирок жазуу кайсы сессияга таандык экенин аныктоо үчүн жазуулардан алынган дата/убакыт штамптарын колдонуңуз.

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"

Биржа операцияларына көз салуу үчүн сеанс терезелерин колдонуу

Келгиле, алмашуу транзакциялары жөнүндө маалыматты алуу үчүн сессия терезелерин колдонолу. Сеанс терезелерин ишке ашыруу 5.5 Листингинде көрсөтүлгөн (аны src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.javaдан тапса болот).

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Сиз бул топологиядагы операциялардын көбүн көргөнсүз, андыктан аларды бул жерден кайра кароонун кереги жок. Бирок бул жерде дагы бир нече жаңы элементтер бар, аларды азыр талкуулайбыз.

Ар кандай groupBy операциясы, адатта, кандайдыр бир топтоо операциясын аткарат (чогултуу, топтоо же эсептөө). Сиз иштеп жаткан жалпы сумма менен кумулятивдүү бириктирүүнү, же белгиленген убакыт терезесинин ичиндеги жазууларды эсепке алган терезе бириктирүүнү аткара аласыз.

5.5 Листингиндеги код сессия терезелериндеги транзакциялардын санын эсептейт. Сүрөттө. 5.13 Бул иш-аракеттер этап-этабы менен талданат.

windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) чалуу менен биз 20 секунд аракетсиздик интервалы жана 15 мүнөттүк туруктуулук интервалы менен сеанс терезесин түзөбүз. 20 секунддук бош убакыт аралыгы тиркеме учурдагы сессия аяктагандан же башталгандан кийин 20 секунданын ичинде келген жазууну учурдагы (активдүү) сессияга камтый турганын билдирет.

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Андан кийин, биз сеанс терезесинде кандай бириктирүү операциясы аткарылышы керек экенин белгилейбиз - бул учурда сана. Эгерде кирүүчү жазуу аракетсиздик терезесинин сыртында калса (күн/убакыт штампынын эки тарабы), колдонмо жаңы сессияны түзөт. Сактоо аралыгы сеансты белгилүү бир убакытка кармап туруу дегенди билдирет жана сессиянын аракетсиздик мөөнөтүнөн ашкан, бирок дагы эле тиркелиши мүмкүн болгон кеч берилиштерге мүмкүндүк берет. Кошумчалай кетсек, бириктирүүдөн келип чыккан жаңы сессиянын башталышы жана аягы эң алгачкы жана акыркы дата/убакыт штампына туура келет.

Сеанстардын кантип иштээрин көрүү үчүн эсептөө ыкмасынан бир нече жазууларды карап көрөлү (5.1-таблица).

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Жазуулар келгенде, биз ошол эле ачкыч менен учурдагы сеанстарды издейбиз, аяктоо убактысы учурдагы дата/убакыт штампынан азыраак - аракетсиздик аралыгы жана учурдагы дата/убакыт штампынан + аракетсиздик интервалынан чоңураак башталуу убактысы. Муну эске алып, таблицадан төрт жазуу. 5.1 төмөнкүдөй бир сессияга бириктирилет.

1. 1-жазма биринчи келет, андыктан башталуу убактысы аяктоо убактысына барабар жана 00:00:00.

2. Андан кийин, 2-киргизүү келип, биз 23:59:55тен эрте бүтпөгөн жана 00:00:35тен кеч эмес башталчу сессияларды издейбиз. Биз 1 рекордду таап, 1 жана 2-сеанстарды бириктиребиз. Жаңы сессиябыз саат 1:2:00дө башталып, 00дө бүтүшү үчүн 00-сеанстын башталуу убактысын (мурда) жана 00-сеанстын аяктоо убактысын (кийин) алабыз: 00:15.

3. Рекорд 3 келет, биз 00:00:30 жана 00:01:10 ортосундагы сессияларды издеп жатабыз жана эч кимди таба албайбыз. 123-345-654,FFBE ачкычынын экинчи сессиясын кошуңуз, саат 00:00:50дө башталып, аяктайт.

4. Рекорд 4 келип, биз 23:59:45 жана 00:00:25 ортосундагы сессияларды издеп жатабыз. Бул жолу 1 жана 2 сеанстары тең табылды. Үч сессия тең бирге бириктирилип, башталышы 00:00:00 жана аяктоо убактысы 00:00:15.

Бул бөлүмдө сүрөттөлгөн нерседен, төмөнкү маанилүү нюанстарды эстен чыгарбоо керек:

  • сессиялар белгиленген өлчөмдөгү терезелер эмес. Сессиянын узактыгы белгилүү бир убакыттын ичиндеги ишмердүүлүк менен аныкталат;
  • Берилиштердеги дата/убакыт мөөрлөрү окуянын учурдагы сессияга же бош турган мезгилге туура келгенин аныктайт.

Андан кийин биз терезенин кийинки түрүн талкуулайбыз - "төңкөрүлгөн" терезелер.

Терезелер "төлгөн"

Айланган терезелер белгилүү бир убакыттын ичинде болгон окуяларды чагылдырат. Ар бир 20 секунд сайын белгилүү бир компаниянын бардык акцияларын басып турушуңуз керек деп элестетип көрүңүз, андыктан ошол убакыт аралыгындагы бардык окуяларды чогултасыз. 20 секунддук интервал аяктагандан кийин терезе оодарылып, жаңы 20 секунддук байкоо аралыгына өтөт. 5.14-сүрөт бул жагдайды көрсөтөт.

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Көрүнүп тургандай, акыркы 20 секунданын ичинде алынган бардык окуялар терезеге киргизилген. Бул убакыттын аягында жаңы терезе түзүлөт.

Листинг 5.6 ар бир 20 секунд сайын биржа транзакцияларын тартуу үчүн бурмаланган терезелерди колдонууну көрсөткөн кодду көрсөтөт (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.javaдан табылган).

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
TimeWindows.of ыкмасын чакырууга бул кичинекей өзгөртүү менен, сиз бурмаланган терезени колдоно аласыз. Бул мисал чейин() ыкмасын чакырбайт, андыктан демейки 24 сааттык сактоо аралыгы колдонулат.

Акыр-аягы, терезе параметрлеринин акыркысына өтүүгө убакыт келди - "секирүү" терезелер.

Жылдырма («секирүүчү») терезелер

Жылдыруучу/секирүү терезелер кулап түшкөн терезелерге окшош, бирок бир аз айырмасы бар. Жылдыруучу терезелер акыркы окуяларды иштетүү үчүн жаңы терезе түзүүдөн мурун убакыт аралыгынын аягына чейин күтүшпөйт. Алар жаңы эсептөөлөрдү терезенин узактыгынан азыраак күтүү аралыгынан кийин башташат.

Терезелердин секирүү жана секирүү ортосундагы айырмачылыктарды көрсөтүү үчүн, биржа операцияларын эсептөөнүн мисалына кайрылып көрөлү. Биздин максат дагы эле транзакциялардын санын эсептөө, бирок эсептегичти жаңыртуудан мурун бүткүл убакытты күткүбүз келбейт. Анын ордуна, биз эсептегичти кыска аралыкта жаңыртабыз. Мисалы, биз дагы эле транзакциялардын санын ар бир 20 секунд сайын санайбыз, бирок ар бир 5 секунд сайын эсептегичти жаңыртып турабыз. 5.15. Бул учурда, биз кайталанган маалыматтар менен үч натыйжа терезелери менен аяктайт.

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Листинг 5.7 жылма терезелерди аныктоо үчүн кодду көрсөтөт (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.javaдан табылган).

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Айлануучу терезени advanceBy() ыкмасына чалуу кошуу менен секирүү терезесине айландырса болот. Көрсөтүлгөн мисалда, сактоо аралыгы 15 мүнөт.

Сиз бул бөлүмдө топтоо натыйжаларын убакыт терезелерине кантип чектөөнү көрдүңүз. Айрыкча, бул бөлүмдөн төмөнкү үч нерсени эстеп калышыңарды каалайм:

  • сеанс терезелеринин өлчөмү убакыт аралыгы менен эмес, колдонуучунун активдүүлүгү менен чектелет;
  • "Төңкөрүш" терезелери белгилүү бир убакыттын ичинде болгон окуяларга сереп салат;
  • Секирүү терезелеринин узактыгы белгиленген, бирок алар тез-тез жаңыланып турат жана бардык терезелерде бири-бирин кайталаган жазууларды камтышы мүмкүн.

Андан кийин, биз туташуу үчүн KTableди кайра KStreamге кантип айландырууну үйрөнөбүз.

5.3.3. KStream жана KTable объекттерин туташтыруу

4-бөлүмдө биз эки KStream объектин бириктирүүнү талкууладык. Эми биз KTable менен KStreamди кантип туташтырууну үйрөнүшүбүз керек. Бул төмөнкү жөнөкөй себептерден улам керек болушу мүмкүн. KStream — бул жазуулардын агымы, ал эми KTable — рекорддук жаңыртуулардын агымы, бирок кээде сиз KTable жаңыртууларынын жардамы менен жазуу агымына кошумча контекстти кошкуңуз келиши мүмкүн.

Биржа операцияларынын саны боюнча маалыматтарды алып, аларды тиешелүү тармактар ​​үчүн биржа жаңылыктары менен айкалыштыралы. Сизде мурунтан эле бар кодду эске алуу менен, буга жетишүү үчүн эмне кылышыңыз керек.

  1. Биржа транзакцияларынын саны жөнүндө маалыматтар менен KTable объектисин KStreamге айландырыңыз, андан кийин ачкычты ушул акциянын символуна туура келген тармакты көрсөтүүчү ачкыч менен алмаштырыңыз.
  2. Биржа жаңылыктары менен темадагы маалыматтарды окуй турган KTable объектин түзүңүз. Бул жаңы KTable өнөр жай сектору боюнча категорияланат.
  3. Өнөр жай сектору боюнча биржа операцияларынын саны жөнүндө маалымат менен жаңылык жаңыртууларын байланыштырыңыз.

Эми бул иш планды кантип ишке ашырууга болорун карап көрөлү.

KTableди KStreamге айландыруу

KTableди KStreamге айландыруу үчүн сиз төмөнкүлөрдү кылышыңыз керек.

  1. KTable.toStream() ыкмасына чалыңыз.
  2. KStream.map ыкмасын чакыруу менен, ачкычты тармактын аталышы менен алмаштырып, андан кийин Windowed инстанциясынан TransactionSummary объектисин чыгарып алыңыз.

Биз бул операцияларды төмөнкүдөй бириктиребиз (кодду src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java файлынан тапса болот) (5.8 Листинг).

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Биз KStream.map операциясын аткарып жаткандыктан, кайтарылган KStream инстанциясы туташууда колдонулганда автоматтык түрдө кайра бөлүнөт.

Биз конверсия процессин аяктадык, андан кийин биржа жаңылыктарын окуу үчүн KTable объектин түзүшүбүз керек.

Биржа жаңылыктары үчүн KTable түзүү

Бактыга жараша, KTable объектисин түзүү коддун бир эле сабын алат (кодду src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.javaдан тапса болот) (5.9 Листинг).

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Белгилей кетчү нерсе, эч кандай Serde объектилерин көрсөтүү талап кылынбайт, анткени орнотууларда Serdes саптары колдонулат. Ошондой эле, EARLIEST санаууну колдонуу менен, таблица эң башында жазуулар менен толтурулат.

Эми биз акыркы кадамга өтсө болот - байланыш.

Жаңылыктар жаңыртууларын транзакциялардын саны менен байланыштыруу

Байланыш түзүү кыйын эмес. Тиешелүү тармак үчүн биржа жаңылыктары жок болгон учурда солго кошулууну колдонобуз (керектүү кодду src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java файлынан тапса болот) (Листинг 5.10).

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Бул leftJoin оператору абдан жөнөкөй. 4-бөлүмдөгү кошулуулардан айырмаланып, JoinWindow ыкмасы колдонулбайт, анткени KStream-KTable кошулуусун аткарганда, ар бир ачкыч үчүн KTableде бир гана жазуу бар. Мындай байланыш убакыт менен чектелбейт: жазуу KTableде же жок. Негизги тыянак: KTable объекттерин колдонуу менен сиз KStreamди азыраак жаңылануучу маалымдама маалыматтары менен байыта аласыз.

Эми биз KStreamден окуяларды байытуунун натыйжалуу жолун карап чыгабыз.

5.3.4. GlobalKTable объекттери

Көрүнүп тургандай, окуя агымдарын байытуу же аларга контекстти кошуу зарылчылыгы бар. 4-бөлүмдө сиз эки KStream объектинин ортосундагы байланыштарды көрдүңүз, ал эми мурунку бөлүмдө сиз KStream менен KTable ортосундагы байланышты көрдүңүз. Бул бардык учурларда, ачкычтарды жаңы түргө же мааниге түшүрүүдө маалымат агымын кайра бөлүү зарыл. Кээде бөлүү ачык жасалат, кээде Kafka Streams муну автоматтык түрдө жасайт. Кайра бөлүү зарыл, анткени ачкычтар өзгөргөн жана жазуулар жаңы бөлүмдөр менен аякташы керек, антпесе, байланыш мүмкүн болбой калат (бул 4-главада, 4.2.4-пунктчадагы “Маалыматтарды кайра бөлүү” бөлүмүндө талкууланган).

Кайра бөлүүнүн баасы бар

Кайра бөлүү чыгымдарды талап кылат - аралык темаларды түзүүгө, башка темада кайталанган маалыматтарды сактоого кошумча ресурстук чыгымдар; ошондой эле бул теманы жазуу жана окуу менен байланыштуу көбөйгөн кечиктирүүнү билдирет. Кошумчалай кетсек, эгер сиз бир нече аспект же өлчөм боюнча кошулушуңуз керек болсо, сиз бириктирүүлөрдү чынжырлап, жаңы ачкычтар менен жазууларды картага түшүрүп, кайра бөлүү процессин кайра иштетишиңиз керек.

Кичирээк маалымат топтомуна туташуу

Кээ бир учурларда, туташууга тийиш болгон маалымдама маалыматтарынын көлөмү салыштырмалуу аз, андыктан анын толук көчүрмөлөрү ар бир түйүнгө локалдык түрдө оңой туура келет. Ушул сыяктуу жагдайлар үчүн Kafka Streams GlobalKTable классын камсыз кылат.

GlobalKTable инстанциялары уникалдуу болуп саналат, анткени колдонмо бардык маалыматтарды түйүндөргө кайталайт. Жана бардык маалыматтар ар бир түйүндө болгондуктан, окуя агымын маалымдама маалымат ачкычы менен бөлүүнүн кереги жок, ошондуктан ал бардык бөлүмдөр үчүн жеткиликтүү болот. Ошондой эле GlobalKTable объекттерин колдонуу менен ачкычсыз кошулууларды жасай аласыз. Бул өзгөчөлүктү көрсөтүү үчүн мурунку мисалдардын бирине кайрылалы.

KStream объекттерин GlobalKTable объекттерине туташтыруу

5.3.2-бөлүмдө биз сатып алуучулар тарабынан алмашуу операцияларын терезеден топтоо жүргүздүк. Бул топтоонун натыйжалары төмөнкүдөй болду:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Бул жыйынтыктар максатка жооп бергени менен, кардардын аты-жөнү жана компаниянын толук аты да көрсөтүлсө пайдалуураак болмок. Кардардын атын жана компаниянын атын кошуу үчүн, сиз кадимки кошулмаларды аткарсаңыз болот, бирок сиз эки негизги картаны жана кайра бөлүүнү аткарышыңыз керек. GlobalKTable менен сиз мындай операциялардын баасын кача аласыз.

Бул үчүн, 5.11 Листингинен countStream объектисин колдонобуз (тиешелүү кодду src/main/java/bbejeck/chapter_5/GlobalKTableExample.javaдан тапса болот) жана аны эки GlobalKTable объектисине туташтырабыз.

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Биз буга чейин бул жөнүндө сүйлөшкөнбүз, ошондуктан мен аны кайталабайм. Бирок окууга ыңгайлуу болуу үчүн toStream().map функциясындагы код саптагы лямбда туюнтмасынын ордуна функция объектисине абстракцияланганын белгилеймин.

Кийинки кадам GlobalKTableнин эки нускасын жарыялоо (көрсөтүлгөн кодду src/main/java/bbejeck/chapter_5/GlobalKTableExample.java файлынан тапса болот) (5.12 Листинг).

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"

Сураныч, теманын аталыштары саналып өткөн түрлөрүн колдонуу менен сүрөттөлөт.

Эми бизде бардык компоненттер даяр болгондуктан, туташуунун кодун жазуу гана калды (аны src/main/java/bbejeck/chapter_5/GlobalKTableExample.java файлынан тапса болот) (5.13 тизмеси).

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Бул коддо эки кошулуу бар болсо да, алар чынжырланган, анткени алардын натыйжаларынын бири да өзүнчө колдонулбайт. Натыйжалар бүт операциянын аягында көрсөтүлөт.

Жогорудагы кошулуу операциясын иштеткенде, сиз төмөнкүдөй жыйынтыктарды аласыз:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Маңызы өзгөргөн жок, бирок бул жыйынтыктар дагы ачык көрүнөт.

Эгерде сиз 4-бөлүмгө чейин санасаңыз, сиз буга чейин байланыштын бир нече түрүн иш жүзүндө көргөнсүз. Алар таблицада келтирилген. 5.2. Бул таблица Kafka Streams 1.0.0 версиясындагы байланыш мүмкүнчүлүктөрүн чагылдырат; Келечектеги чыгарылыштарда бир нерсе өзгөрүшү мүмкүн.

«Кафка агымдары аракетте. Реалдуу убакытта иштөө үчүн тиркемелер жана микросервистер"
Ишти жыйынтыктоо үчүн, келгиле, негизги нерселерди кайталап көрөлү: жергиликтүү абалды колдонуп, окуя агымдарын (KStream) туташтыра аласыз жана агымдарды (KTable) жаңырта аласыз. Же болбосо, маалымдама маалыматтарынын өлчөмү өтө чоң болбосо, GlobalKTable объектисин колдонсоңуз болот. GlobalKTables бардык бөлүмдөрдү ар бир Kafka Streams тиркеме түйүнүнө кайталап, ачкыч кайсы бөлүмгө туура келгенине карабастан, бардык маалыматтар жеткиликтүү болушун камсыздайт.

Андан кийин биз Кафка агымдарынын өзгөчөлүгүн көрөбүз, анын аркасында биз Кафка темасындагы маалыматтарды колдонбостон абалдын өзгөрүшүн байкай алабыз.

5.3.5. Суралуучу абал

Биз буга чейин абалды камтыган бир нече операцияларды аткарганбыз жана натыйжаларды ар дайым консолго чыгарабыз (иштеп чыгуу максатында) же аларды темага жазабыз (өндүрүштүк максаттар үчүн). Темага жыйынтыктарды жазып жатканда, аларды көрүү үчүн Кафка керектөөчүсүн колдонушуңуз керек.

Бул темалардан маалыматтарды окуу материалдашкан көз караштардын бир түрү катары каралышы мүмкүн. Биздин максаттар үчүн биз Википедиядан материалдашкан көрүнүштүн аныктамасын колдоно алабыз: “...суроонун натыйжаларын камтыган физикалык маалымат базасынын объектиси. Мисалы, бул алыскы маалыматтардын локалдык көчүрмөсү, же таблицанын же кошулуунун натыйжаларынын саптарынын жана/же мамычаларынын чакан жыйындысы же топтоо аркылуу алынган жыйынды таблица болушу мүмкүн» (https://en.wikipedia.org/wiki) /Материалданган_көрүү).

Kafka Streams ошондой эле мамлекеттик дүкөндөрдө интерактивдүү сурамдарды жүргүзүүгө мүмкүндүк берет, бул материалдашкан көрүнүштөрдү түздөн-түз окууга мүмкүндүк берет. Мамлекеттик дүкөнгө суроо-талап окуу үчүн гана операция экенин белгилей кетүү маанилүү. Бул сиздин колдонмоңуз дайындарды иштеп жаткан учурда кокусунан абалды ыраатсыз кылып алуудан кабатыр болбоону камсыздайт.

Мамлекеттик дүкөндөрдө түздөн-түз суроо мүмкүнчүлүгү маанилүү. Бул Кафка керектөөчүсүнөн маалыматтарды алгач ала албай туруп, башкаруу панелинин тиркемелерин түзө аласыз дегенди билдирет. Ал ошондой эле маалыматтарды кайра жазуунун кереги жок болгондуктан, колдонмонун натыйжалуулугун жогорулатат:

  • маалыматтардын локалдуулугунун аркасында аларга тез жетүүгө болот;
  • маалыматтардын кайталанышы жок кылынат, анткени алар тышкы сактагычка жазылбайт.

Эсиңизде болсун деген эң негизги нерсе, сиз түздөн-түз колдонмоңуздан абалды сурай аласыз. Бул сизге берген мүмкүнчүлүктөрдү баалоого болбойт. Кафкадан алынган маалыматтарды керектөөнүн жана тиркеме үчүн маалымат базасында жазууларды сактоонун ордуна, сиз ошол эле натыйжа менен мамлекеттик дүкөндөрдү сурасаңыз болот. Мамлекеттик дүкөндөргө түз суроо азыраак кодду (керектөөчү жок) жана азыраак программалык камсыздоону билдирет (жыйынтыктарды сактоо үчүн маалымат базасынын таблицасынын кереги жок).

Бул бөлүмдө биз бир топ жерди карадык, андыктан биз мамлекеттик дүкөндөргө каршы интерактивдүү суроолорду талкуулоону азырынча калтырабыз. Бирок кабатыр болбоңуз: 9-бөлүмдө биз интерактивдүү сурамдар менен жөнөкөй панелдин тиркемесин түзөбүз. Ал интерактивдүү сурамдарды жана аларды Kafka Streams тиркемелерине кантип кошууга болорун көрсөтүү үчүн ушул жана мурунку бөлүмдөрдүн айрым мисалдарын колдонот.

на

  • KStream объекттери маалымат базасына киргизүү менен салыштырууга боло турган окуялардын агымын билдирет. KTable объекттери маалымат базасынын жаңыртуулары сыяктуу жаңыртуу агымдарын билдирет. KTable объектинин көлөмү өспөйт, эски жазуулар жаңылары менен алмаштырылат.
  • KTable объекттери бириктирүү операциялары үчүн талап кылынат.
  • Терезе операцияларын колдонуу менен сиз топтолгон маалыматтарды убакыт чакаларына бөлсөңүз болот.
  • GlobalKTable объекттеринин аркасында, бөлүүгө карабастан, колдонмонун каалаган жеринен маалымдама маалыматтарына кире аласыз.
  • KStream, KTable жана GlobalKTable объекттеринин ортосундагы байланыштар мүмкүн.

Буга чейин биз жогорку деңгээлдеги KStream DSLди колдонуу менен Kafka Streams тиркемелерин түзүүгө басым жасадык. Жогорку деңгээлдеги ыкма тыкан жана кыска программаларды түзүүгө мүмкүндүк берет да, аны колдонуу соодалашууну билдирет. DSL KStream менен иштөө башкаруу даражасын азайтуу аркылуу кодуңуздун кыскалыгын жогорулатууну билдирет. Кийинки бөлүмдө биз төмөнкү деңгээлдеги иштеткич түйүн API'син карап чыгабыз жана башка соодалашууну сынап көрөбүз. Программалар мурункуга караганда узунураак болот, бирок биз муктаж болгон бардык иштеткич түйүндөрдү түзө алабыз.

→ Китеп тууралуу кененирээк бул жерден тапса болот чыгаруучунун веб-сайты

→ Habrozhiteli үчүн купонду колдонуу менен 25% арзандатуу - Кафка агымдары

→ Китептин кагаз түрүндөгү версиясы үчүн төлөм жүргүзүлгөндө электрондук китеп электрондук почта аркылуу жөнөтүлөт.

Source: www.habr.com

Комментарий кошуу