O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real" Ola, veciños de Khabro! Este libro é axeitado para calquera programador que queira comprender o procesamento de fíos. A comprensión da programación distribuída axudarache a comprender mellor Kafka e Kafka Streams. Sería bo coñecer o propio marco de Kafka, pero isto non é necesario: contarei todo o que necesites. Os desenvolvedores experimentados de Kafka e os novatos aprenderán neste libro a crear aplicacións de procesamento de fluxos interesantes utilizando a biblioteca de Kafka Streams. Os desenvolvedores de Java intermedios e avanzados que xa están familiarizados con conceptos como a serialización aprenderán a aplicar as súas habilidades para crear aplicacións Kafka Streams. O código fonte do libro está escrito en Java 8 e fai un uso significativo da sintaxe de expresión lambda de Java 8, polo que saber traballar con funcións lambda (mesmo noutra linguaxe de programación) será moi útil.

Fragmento. 5.3. Operacións de agregación e fiestras

Nesta sección, imos explorar as partes máis prometedoras de Kafka Streams. Ata agora cubrimos os seguintes aspectos de Kafka Streams:

  • creando unha topoloxía de procesamento;
  • uso do estado en aplicacións de streaming;
  • realizar conexións de fluxo de datos;
  • diferenzas entre fluxos de eventos (KStream) e fluxos de actualización (KTable).

Nos seguintes exemplos reuniremos todos estes elementos. Tamén aprenderás sobre as fiestras, outra gran característica das aplicacións de streaming. O noso primeiro exemplo será unha simple agregación.

5.3.1. Agregación das vendas de stock por sectores industriais

A agregación e a agrupación son ferramentas vitais cando se traballa con datos en tempo real. O exame dos rexistros individuais a medida que se reciben adoita ser insuficiente. Para extraer información adicional dos datos, é necesario agrupalos e combinalos.

Neste exemplo, vestirás o traxe dun comerciante de día que necesita facer un seguimento do volume de vendas das accións de empresas de varias industrias. En concreto, estás interesado nas cinco empresas con maiores vendas de accións en cada sector.

Tal agregación requirirá os seguintes pasos para traducir os datos á forma desexada (falando en termos xerais).

  1. Crea unha fonte baseada en temas que publique información de negociación de accións en bruto. Teremos que mapear un obxecto de tipo StockTransaction a un obxecto de tipo ShareVolume. A cuestión é que o obxecto StockTransaction contén metadatos de vendas, pero só necesitamos datos sobre o número de accións que se venden.
  2. Agrupar datos de volume por símbolo de accións. Unha vez agrupados por símbolo, pode contraer estes datos en subtotais de volumes de vendas de accións. Paga a pena notar que o método KStream.groupBy devolve unha instancia do tipo KGroupedStream. E podes obter unha instancia de KTable chamando ademais ao método KGroupedStream.reduce.

Que é a interface KGroupedStream

Os métodos KStream.groupBy e KStream.groupByKey devolven unha instancia de KGroupedStream. KGroupedStream é unha representación intermedia dun fluxo de eventos despois da agrupación por claves. Non está en absoluto destinado ao traballo directo con el. En cambio, KGroupedStream úsase para operacións de agregación, que sempre dan como resultado unha KTable. E dado que o resultado das operacións de agregación é unha KTable e usan unha tenda de estado, é posible que non todas as actualizacións como resultado se envíen máis abaixo na canalización.

O método KTable.groupBy devolve unha KGroupedTable similar: unha representación intermedia do fluxo de actualizacións, reagrupadas por chave.

Fagamos un pequeno descanso e vexamos a Fig. 5.9, que mostra o que conseguimos. Esta topoloxía xa debería resultarche moi familiar.

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Vexamos agora o código desta topoloxía (pódese atopar no ficheiro src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listado 5.2).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
O código dado distínguese pola súa brevidade e o gran volume de accións realizadas en varias liñas. Podes notar algo novo no primeiro parámetro do método builder.stream: un valor do tipo de enumeración AutoOffsetReset.EARLIEST (tamén hai un LATEST), definido mediante o método Consumed.withOffsetResetPolicy. Este tipo de enumeración pódese usar para especificar unha estratexia de restablecemento de compensación para cada KStream ou KTable e ten prioridade sobre a opción de restablecemento de compensación da configuración.

GroupByKey e GroupBy

A interface KStream ten dous métodos para agrupar rexistros: GroupByKey e GroupBy. Ambos devolven unha KGroupedTable, polo que pode estarse preguntando cal é a diferenza entre eles e cando usar cal?

O método GroupByKey úsase cando as chaves do KStream xa non están baleiras. E o máis importante, nunca se estableceu a bandeira "require re-particionamento".

O método GroupBy asume que cambiou as claves de agrupación, polo que a marca de repartición está definida como verdadeira. A realización de unións, agregacións, etc. despois do método GroupBy dará lugar a unha nova partición automática.
Resumo: sempre que sexa posible, debería usar GroupByKey en lugar de GroupBy.

Está claro o que fan os métodos mapValues ​​e groupBy, así que vexamos o método sum() (que se atopa en src/main/java/bbejeck/model/ShareVolume.java) (Listado 5.3).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
O método ShareVolume.sum devolve o total acumulado do volume de vendas de accións e o resultado de toda a cadea de cálculos é un obxecto KTable . Agora comprendes o papel que xoga KTable. Cando chegan obxectos ShareVolume, o obxecto KTable correspondente almacena a última actualización actual. É importante lembrar que todas as actualizacións están reflectidas no shareVolumeKTable anterior, pero non todas se envían máis adiante.

Despois usamos esta KTable para agregar (por número de accións negociadas) para chegar ás cinco empresas con maior volume de accións negociadas en cada industria. As nosas accións neste caso serán similares ás da primeira agregación.

  1. Realice outra operación groupBy para agrupar obxectos ShareVolume individuais por sector.
  2. Comeza a resumir obxectos ShareVolume. Esta vez o obxecto de agregación é unha cola de prioridade de tamaño fixo. Nesta cola de tamaño fixo só se conservan as cinco empresas con maiores cantidades de accións vendidas.
  3. Asigne as filas do parágrafo anterior a un valor de cadea e devolve as cinco accións máis negociadas por número por sector.
  4. Escribe os resultados en forma de cadea para o tema.

Na Fig. A figura 5.10 mostra o gráfico de topoloxía do fluxo de datos. Como podes ver, a segunda rolda de procesamento é bastante sinxela.

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Agora que temos unha comprensión clara da estrutura desta segunda rolda de procesamento, podemos recorrer ao seu código fonte (atópao no ficheiro src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listado 5.4) .

Este inicializador contén unha variable fixedQueue. Este é un obxecto personalizado que é un adaptador para java.util.TreeSet que se usa para rastrexar os N principais resultados en orde descendente de accións negociadas.

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Xa viches as chamadas groupBy e mapValues, polo que non entraremos nelas (chamamos ao método KTable.toStream porque o método KTable.print está obsoleto). Pero aínda non viches a versión KTable de aggregate() , así que dedicaremos un pouco a discutilo.

Como lembras, o que fai diferente a KTable é que os rexistros coas mesmas claves considéranse actualizacións. KTable substitúe a entrada antiga por unha nova. A agregación prodúcese dun xeito similar: os últimos rexistros coa mesma clave son agregados. Cando chega un rexistro, engádese á instancia da clase FixedSizePriorityQueue mediante un sumador (segundo parámetro na chamada ao método agregado), pero se xa existe outro rexistro coa mesma clave, entón o rexistro antigo elimínase mediante un subtractor (terceiro parámetro en chamada ao método agregado).

Todo isto significa que o noso agregador, FixedSizePriorityQueue, non agrega todos os valores cunha soa clave, senón que almacena unha suma móbil das cantidades dos N tipos de accións máis negociados. Cada entrada entrante contén o número total de accións vendidas ata o momento. KTable darache información sobre cales son as accións das empresas que máis se negocian actualmente, sen esixir unha agregación continua de cada actualización.

Aprendemos a facer dúas cousas importantes:

  • agrupa os valores en KTable por unha chave común;
  • realizar operacións útiles como acumulación e agregación nestes valores agrupados.

Saber como realizar estas operacións é importante para comprender o significado dos datos que se moven a través dunha aplicación Kafka Streams e comprender que información contén.

Tamén reunimos algúns dos conceptos clave comentados anteriormente neste libro. No capítulo 4, discutimos como é importante o estado local tolerante a fallos para unha aplicación de streaming. O primeiro exemplo deste capítulo demostrou por que o estado local é tan importante: dáche a posibilidade de facer un seguimento da información que xa viu. O acceso local evita atrasos na rede, facendo que a aplicación sexa máis eficiente e resistente aos erros.

Ao realizar calquera operación de acumulación ou agregación, debes especificar o nome da tenda estatal. As operacións de acumulación e agregación devolven unha instancia de KTable e KTable usa o almacenamento de estado para substituír os resultados antigos por outros novos. Como viches, non todas as actualizacións se envían polo pipeline, e isto é importante porque as operacións de agregación están deseñadas para producir información resumida. Se non aplicas o estado local, KTable reenviará todos os resultados de agregación e acumulación.

A continuación, analizaremos a realización de operacións como a agregación nun período de tempo específico - as chamadas operacións de fiestras.

5.3.2. Operacións de ventá

Na sección anterior, introducimos a convolución deslizante e a agregación. A aplicación realizou unha acumulación continua do volume de vendas de accións, seguida da agregación das cinco accións máis negociadas na bolsa.

Ás veces é necesaria esa agregación continua e acumulación de resultados. E ás veces cómpre realizar operacións só durante un período de tempo determinado. Por exemplo, calcula cantas transaccións de cambio se realizaron con accións dunha determinada empresa nos últimos 10 minutos. Ou cantos usuarios fixeron clic nun novo banner publicitario nos últimos 15 minutos. Unha aplicación pode realizar tales operacións varias veces, pero con resultados que só se aplican a períodos de tempo especificados (xanelas de tempo).

Contando as transaccións de cambio por parte do comprador

No seguinte exemplo, faremos un seguimento das transaccións de accións de varios comerciantes, xa sexan grandes organizacións ou financeiros individuais intelixentes.

Hai dous posibles motivos para este seguimento. Un deles é a necesidade de saber que mercan/venden os líderes do mercado. Se estes grandes xogadores e investimentos sofisticados ven oportunidades, ten sentido seguir a súa estratexia. A segunda razón é o desexo de detectar posibles signos de información privilegiada ilegal. Para iso, terás que analizar a correlación dos grandes picos de vendas con notas de prensa importantes.

Este seguimento consta dos seguintes pasos:

  • crear un fluxo para ler a partir do tema de transaccións de accións;
  • agrupando os rexistros entrantes por ID do comprador e símbolo de accións. Ao chamar ao método groupBy devolve unha instancia da clase KGroupedStream;
  • O método KGroupedStream.windowedBy devolve un fluxo de datos limitado a unha xanela de tempo, o que permite a agregación por ventás. Dependendo do tipo de xanela, devólvese un TimeWindowedKStream ou SessionWindowedKStream;
  • conta de transaccións para a operación de agregación. O fluxo de datos en ventás determina se se ten en conta un rexistro particular neste reconto;
  • escribir resultados nun tema ou saíndoos á consola durante o desenvolvemento.

A topoloxía desta aplicación é sinxela, pero sería útil unha imaxe clara dela. Vexamos a Fig. 5.11.

A continuación, analizaremos a funcionalidade das operacións da xanela e o código correspondente.

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"

Tipos de fiestras

Hai tres tipos de ventás en Kafka Streams:

  • de sesión;
  • “bambalear” (tombalear);
  • deslizando/saltando.

Cal elixir depende das necesidades da túa empresa. As ventás de caída e salto teñen un tempo limitado, mentres que as fiestras de sesión están limitadas pola actividade do usuario; a duración das sesións determínase unicamente polo nivel de actividade do usuario. O principal que hai que lembrar é que todos os tipos de ventás baséanse nos selos de data/hora das entradas, non na hora do sistema.

A continuación, implementamos a nosa topoloxía con cada un dos tipos de xanela. O código completo darase só no primeiro exemplo; para outros tipos de fiestras non cambiará nada excepto o tipo de operación da fiestra.

Fiestras de sesión

As fiestras de sesión son moi diferentes de todos os outros tipos de ventás. Están limitados non tanto polo tempo como pola actividade do usuario (ou a actividade da entidade que desexa rastrexar). As ventás das sesións están delimitadas por períodos de inactividade.

A figura 5.12 ilustra o concepto de ventás de sesión. A sesión máis pequena fusionarase coa sesión á súa esquerda. E a sesión da dereita será separada porque segue un longo período de inactividade. As fiestras de sesión baséanse na actividade do usuario, pero usa selos de data/hora das entradas para determinar a que sesión pertence a entrada.

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"

Usando fiestras de sesión para rastrexar as transaccións de accións

Usemos as fiestras de sesión para capturar información sobre as transaccións de intercambio. A implementación das fiestras de sesión móstrase no Listado 5.5 (que se pode atopar en src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Xa viu a maioría das operacións desta topoloxía, polo que non é necesario volver a miralas aquí. Pero tamén hai varios elementos novos aquí, que agora comentaremos.

Calquera operación groupBy normalmente realiza algún tipo de operación de agregación (agregación, acumulación ou reconto). Podes realizar unha agregación acumulativa cun total acumulado ou unha agregación de ventás, que ten en conta os rexistros dentro dunha xanela de tempo especificada.

O código do Listado 5.5 conta o número de transaccións dentro das ventás de sesión. Na Fig. 5.13 Estas accións analízanse paso a paso.

Ao chamar a windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) creamos unha xanela de sesión cun intervalo de inactividade de 20 segundos e un intervalo de persistencia de 15 minutos. Un intervalo de inactividade de 20 segundos significa que a aplicación incluirá calquera entrada que chegue dentro dos 20 segundos seguintes ao final ou ao comezo da sesión actual á sesión (activa).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
A continuación, especificamos que operación de agregación debe realizarse na xanela da sesión; neste caso, conta. Se unha entrada entrante queda fóra da xanela de inactividade (a calquera lado do selo de data/hora), a aplicación crea unha nova sesión. O intervalo de retención significa manter unha sesión durante un período de tempo determinado e permite datos atrasados ​​que se estenden máis aló do período de inactividade da sesión pero que aínda se poden anexar. Ademais, o inicio e o final da nova sesión resultante da combinación corresponden ao selo de data/hora máis antigo e máis recente.

Vexamos algunhas entradas do método de reconto para ver como funcionan as sesións (táboa 5.1).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Cando chegan os rexistros, buscamos sesións existentes coa mesma clave, unha hora de finalización menor que a marca de data/hora actual - intervalo de inactividade e unha hora de inicio superior á marca de data/hora actual + intervalo de inactividade. Tendo en conta isto, catro entradas da táboa. 5.1 únense nunha única sesión do seguinte xeito.

1. O rexistro 1 chega primeiro, polo que a hora de inicio é igual á hora de finalización e é as 00:00:00.

2. A continuación, chega a entrada 2, e buscamos sesións que rematen non antes das 23:59:55 e que comecen non máis tarde das 00:00:35. Atopamos o rexistro 1 e combinamos as sesións 1 e 2. Tomamos a hora de inicio da sesión 1 (anterior) e a hora de finalización da sesión 2 (máis tarde), de xeito que a nosa nova sesión comece ás 00:00:00 e remate ás 00:00: 15:XNUMX.

3. Chega o rexistro 3, buscamos sesións entre as 00:00:30 e as 00:01:10 e non atopamos ningunha. Engade unha segunda sesión para a clave 123-345-654,FFBE, comezando e rematando ás 00:00:50.

4. Chega o rexistro 4 e buscamos sesións entre as 23:59:45 e as 00:00:25. Nesta ocasión atópanse as dúas sesións 1 e 2. As tres sesións combínanse nunha soa, cunha hora de inicio de 00:00:00 e unha hora de finalización de 00:00:15.

Polo que se describe nesta sección, convén lembrar os seguintes matices importantes:

  • as sesións non son ventás de tamaño fixo. A duración dunha sesión está determinada pola actividade nun período de tempo determinado;
  • Os selos de data/hora dos datos determinan se o evento se sitúa nunha sesión existente ou durante un período de inactividade.

A continuación discutiremos o seguinte tipo de fiestra: as fiestras "caídas".

Fiestras "Tumbling".

As fiestras que caen capturan eventos que caen nun período de tempo determinado. Imaxina que necesitas capturar todas as transaccións de accións dunha determinada empresa cada 20 segundos, polo que recolles todos os eventos durante ese período de tempo. Ao final do intervalo de 20 segundos, a xanela envólvese e pasa a un novo intervalo de observación de 20 segundos. A figura 5.14 ilustra esta situación.

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Como podes ver, todos os eventos recibidos nos últimos 20 segundos están incluídos na xanela. Ao final deste período de tempo, créase unha nova xanela.

O Listado 5.6 mostra o código que demostra o uso das ventás rotativas para capturar transaccións de accións cada 20 segundos (que se atopa en src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Con este pequeno cambio na chamada ao método TimeWindows.of, podes usar unha xanela de caída. Este exemplo non chama ao método until(), polo que se utilizará o intervalo de retención predeterminado de 24 horas.

Finalmente, é hora de pasar á última das opcións da xanela: as fiestras "saltando".

Fiestras corredizas ("saltar")

As fiestras deslizantes/saltantes son semellantes ás ventás que caen, pero cunha lixeira diferenza. As fiestras deslizantes non esperan ata o final do intervalo de tempo antes de crear unha nova ventá para procesar eventos recentes. Comezan novos cálculos despois dun intervalo de espera inferior á duración da xanela.

Para ilustrar as diferenzas entre as fiestras caer e saltar, volvamos ao exemplo de contar transaccións en bolsa. O noso obxectivo aínda é contar o número de transaccións, pero non queremos esperar todo o tempo antes de actualizar o contador. Pola contra, actualizaremos o contador a intervalos máis curtos. Por exemplo, aínda contaremos o número de transaccións cada 20 segundos, pero actualizaremos o contador cada 5 segundos, como se mostra na Fig. 5.15. Neste caso, acabamos con tres ventás de resultados con datos superpostos.

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
O Listado 5.7 mostra o código para definir fiestras deslizantes (que se atopa en src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Unha xanela de salto pódese converter nunha xanela de salto engadindo unha chamada ao método advanceBy(). No exemplo mostrado, o intervalo de gardado é de 15 minutos.

Viches nesta sección como limitar os resultados da agregación a xanelas de tempo. En particular, quero que recordes as seguintes tres cousas desta sección:

  • o tamaño das ventás da sesión está limitado non polo período de tempo, senón pola actividade do usuario;
  • as fiestras "caídas" ofrecen unha visión xeral dos eventos nun período de tempo determinado;
  • A duración das fiestras de salto é fixa, pero actualízanse con frecuencia e poden conter entradas superpostas en todas as fiestras.

A continuación, aprenderemos a converter unha KTable de novo nun KStream para unha conexión.

5.3.3. Conectando obxectos KStream e KTable

No capítulo 4, comentamos a conexión de dous obxectos KStream. Agora temos que aprender a conectar KTable e KStream. Isto pode ser necesario pola seguinte razón sinxela. KStream é un fluxo de rexistros e KTable é un fluxo de actualizacións de rexistros, pero ás veces pode querer engadir contexto adicional ao fluxo de rexistros usando actualizacións de KTable.

Tomemos datos sobre o número de transaccións bursátiles e combinémolas coas noticias da bolsa para as industrias relevantes. Aquí tes o que tes que facer para logralo tendo en conta o código que xa tes.

  1. Converte un obxecto KTable con datos sobre o número de transaccións de accións nun KStream, seguido de substituír a clave pola clave que indica o sector da industria correspondente a este símbolo de accións.
  2. Crea un obxecto KTable que le datos dun tema con noticias da bolsa. Esta nova KTable clasificarase por sectores industriais.
  3. Conecta as actualizacións de noticias con información sobre o número de operacións en bolsa por sectores industriais.

Agora vexamos como implementar este plan de acción.

Converter KTable para KStream

Para converter KTable en KStream, debes facer o seguinte.

  1. Chame ao método KTable.toStream().
  2. Ao chamar ao método KStream.map, substitúe a chave polo nome do sector e, a continuación, recupere o obxecto TransactionSummary da instancia de Windowed.

Encadearemos estas operacións do seguinte xeito (o código pódese atopar no ficheiro src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listado 5.8).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Debido a que estamos a realizar unha operación KStream.map, a instancia de KStream devolta volve particionarse automaticamente cando se usa nunha conexión.

Completamos o proceso de conversión, a continuación necesitamos crear un obxecto KTable para ler noticias de accións.

Creación de KTable para noticias de accións

Afortunadamente, a creación dun obxecto KTable leva só unha liña de código (o código pódese atopar en src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listado 5.9).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Paga a pena sinalar que non se precisa especificar ningún obxecto Serde, xa que as cadeas Serdes úsanse na configuración. Ademais, ao usar a enumeración MÁIS ANTIGUA, a táboa énchese de rexistros ao principio.

Agora podemos pasar ao paso final: conexión.

Conectando actualizacións de noticias cos datos do reconto de transaccións

Crear unha conexión non é difícil. Usaremos unha unión á esquerda no caso de que non haxa noticias de stock para o sector relevante (o código necesario pódese atopar no ficheiro src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listado 5.10).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Este operador leftJoin é bastante sinxelo. A diferenza das unións do capítulo 4, o método JoinWindow non se usa porque ao realizar unha unión KStream-KTable, só hai unha entrada na KTable para cada chave. Tal conexión non está limitada no tempo: o rexistro está ou na KTable ou está ausente. A conclusión principal: usando obxectos KTable pode enriquecer KStream con datos de referencia actualizados con menos frecuencia.

Agora veremos unha forma máis eficiente de enriquecer os eventos de KStream.

5.3.4. Obxectos GlobalKTable

Como podes ver, é necesario enriquecer os fluxos de eventos ou engadirlles contexto. No capítulo 4 viches as conexións entre dous obxectos KStream e na sección anterior viu a conexión entre un KStream e unha KTable. En todos estes casos, é necesario volver particionar o fluxo de datos ao asignar as claves a un novo tipo ou valor. Ás veces, a repartición faise de forma explícita, e ás veces Kafka Streams faino automaticamente. O re-particionamento é necesario porque as claves cambiaron e os rexistros deben acabar en novas seccións, se non, a conexión será imposible (diso comentouse no capítulo 4, no apartado "Reparticionar datos" na subsección 4.2.4).

Reparticionar ten un custo

O re-particionamento require custos: custos adicionais de recursos para crear temas intermedios, almacenar datos duplicados noutro tema; tamén supón un aumento da latencia debido á escritura e lectura deste tema. Ademais, se precisa unirse en máis dun aspecto ou dimensión, debe encadear as unións, mapear os rexistros con novas claves e executar de novo o proceso de particionamento.

Conexión a conxuntos de datos máis pequenos

Nalgúns casos, o volume de datos de referencia que se van conectar é relativamente pequeno, polo que as copias completas destes poden caber facilmente localmente en cada nodo. Para situacións como esta, Kafka Streams ofrece a clase GlobalKTable.

As instancias de GlobalKTable son únicas porque a aplicación replica todos os datos en cada un dos nodos. E dado que todos os datos están presentes en cada nodo, non hai necesidade de particionar o fluxo de eventos mediante a clave de datos de referencia para que estea dispoñible para todas as particións. Tamén pode facer unións sen chave usando obxectos GlobalKTable. Volvamos a un dos exemplos anteriores para demostrar esta característica.

Conectando obxectos KStream a obxectos GlobalKTable

Na subsección 5.3.2, realizamos a agregación de ventás de transaccións de cambio por parte dos compradores. Os resultados desta agregación foron algo así:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Aínda que estes resultados servían para o propósito, sería máis útil que tamén se mostrara o nome do cliente e o nome completo da empresa. Para engadir o nome do cliente e o nome da empresa, podes facer combinacións normais, pero terás que facer dúas asignacións de teclas e volver particionar. Con GlobalKTable pode evitar o custo deste tipo de operacións.

Para iso, utilizaremos o obxecto countStream do Listado 5.11 (o código correspondente pódese atopar en src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) e conectalo a dous obxectos GlobalKTable.

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Xa o comentamos antes, así que non o repetirei. Pero observo que o código da función toStream().map abstrúese nun obxecto de función en lugar dunha expresión lambda en liña por motivos de lexibilidade.

O seguinte paso é declarar dúas instancias de GlobalKTable (o código mostrado pódese atopar no ficheiro src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listado 5.12).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"

Teña en conta que os nomes dos temas descríbense mediante tipos enumerados.

Agora que xa temos todos os compoñentes preparados, só queda escribir o código para a conexión (que se pode atopar no ficheiro src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listado 5.13).

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Aínda que hai dúas unións neste código, están encadeadas porque ningún dos seus resultados se usa por separado. Os resultados móstranse ao final de toda a operación.

Cando executes a operación de unión anterior, obterás resultados como este:

{customer='Barney, Smith' company="Exxon", transactions= 17}

A esencia non cambiou, pero estes resultados parecen máis claros.

Se conta atrás ata o capítulo 4, xa viches varios tipos de conexións en acción. Están listados na táboa. 5.2. Esta táboa reflicte as capacidades de conectividade a partir da versión 1.0.0 de Kafka Streams; Algo pode cambiar en próximas versións.

O libro “Kafka Streams in Action. Aplicacións e microservizos para o traballo en tempo real"
Para rematar, repasemos os conceptos básicos: podes conectar fluxos de eventos (KStream) e actualizar fluxos (KTable) usando o estado local. Alternativamente, se o tamaño dos datos de referencia non é demasiado grande, pode usar o obxecto GlobalKTable. GlobalKTables replica todas as particións en cada nodo da aplicación Kafka Streams, garantindo que todos os datos estean dispoñibles independentemente da partición á que corresponda a chave.

A continuación veremos a función Kafka Streams, grazas á cal podemos observar os cambios de estado sen consumir datos dun tema de Kafka.

5.3.5. Estado consultable

Xa realizamos varias operacións que implican o estado e sempre saímos os resultados á consola (con fins de desenvolvemento) ou escribimos nun tema (para fins de produción). Ao escribir resultados nun tema, tes que usar un consumidor de Kafka para velos.

A lectura de datos destes temas pode considerarse un tipo de vistas materializadas. Para os nosos propósitos, podemos utilizar a definición dunha vista materializada da Wikipedia: “...un obxecto físico de base de datos que contén os resultados dunha consulta. Por exemplo, pode ser unha copia local de datos remotos, ou un subconxunto de filas e/ou columnas dunha táboa ou resultados de unión, ou unha táboa de resumo obtida mediante a agregación” (https://en.wikipedia.org/wiki). /vista_materializada).

Kafka Streams tamén permite realizar consultas interactivas en tendas estatais, o que lle permite ler directamente estas vistas materializadas. É importante ter en conta que a consulta á tenda de estado é unha operación de só lectura. Isto garante que non tes que preocuparte de que o estado sexa accidentalmente inconsistente mentres a túa aplicación procesa datos.

A capacidade de consultar directamente as tendas de estado é importante. Isto significa que pode crear aplicacións de panel sen ter que buscar primeiro datos do consumidor de Kafka. Tamén aumenta a eficiencia da aplicación, debido ao feito de que non hai necesidade de escribir datos de novo:

  • grazas á localidade dos datos pódese acceder rapidamente a eles;
  • Elimínase a duplicación de datos, xa que non se escriben nun almacenamento externo.

O principal que quero que lembres é que podes consultar directamente o estado desde a túa aplicación. Non se poden exagerar as oportunidades que isto che brinda. En lugar de consumir datos de Kafka e almacenar rexistros nunha base de datos para a aplicación, pode consultar as tendas de estado co mesmo resultado. As consultas directas ás tendas estatais significan menos código (sen consumidor) e menos software (sen necesidade dunha táboa de base de datos para almacenar os resultados).

Cubrimos bastante terreo neste capítulo, polo que deixaremos por agora o noso debate sobre consultas interactivas contra tendas estatais. Pero non te preocupes: no capítulo 9, crearemos unha sinxela aplicación de panel con consultas interactivas. Usará algúns dos exemplos deste capítulo e dos anteriores para demostrar consultas interactivas e como podes engadilas ás aplicacións Kafka Streams.

Resumo

  • Os obxectos KStream representan fluxos de eventos, comparables ás insercións nunha base de datos. Os obxectos KTable representan fluxos de actualización, máis como actualizacións dunha base de datos. O tamaño do obxecto KTable non crece, os rexistros antigos son substituídos por outros novos.
  • Os obxectos KTable son necesarios para as operacións de agregación.
  • Usando as operacións de ventás, pode dividir os datos agregados en grupos de tempo.
  • Grazas aos obxectos GlobalKTable, pode acceder aos datos de referencia en calquera lugar da aplicación, independentemente da partición.
  • Son posibles conexións entre obxectos KStream, KTable e GlobalKTable.

Ata agora, centrámonos na creación de aplicacións Kafka Streams usando o KStream DSL de alto nivel. Aínda que o enfoque de alto nivel permítelle crear programas claros e concisos, usalo supón unha compensación. Traballar con DSL KStream significa aumentar a concisión do teu código reducindo o grao de control. No seguinte capítulo, analizaremos a API do nodo de controlador de baixo nivel e probaremos outras compensacións. Os programas serán máis longos do que eran antes, pero poderemos crear case calquera nodo controlador que necesitemos.

→ Podes atopar máis detalles sobre o libro en sitio web da editorial

→ Para Habrozhiteli 25% de desconto usando o cupón - Kafka Streams

→ Tras o pagamento da versión en papel do libro, enviarase un libro electrónico por correo electrónico.

Fonte: www.habr.com

Engadir un comentario