O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real" Olá, residentes de Khabro! Este livro é adequado para qualquer desenvolvedor que queira entender o processamento de threads. Compreender a programação distribuída ajudará você a entender melhor o Kafka e o Kafka Streams. Seria bom conhecer o framework Kafka em si, mas não é necessário: vou te contar tudo que você precisa. Desenvolvedores experientes e novatos do Kafka aprenderão como criar aplicativos interessantes de processamento de fluxo usando a biblioteca Kafka Streams neste livro. Desenvolvedores Java intermediários e avançados já familiarizados com conceitos como serialização aprenderão a aplicar suas habilidades para criar aplicativos Kafka Streams. O código-fonte do livro é escrito em Java 8 e faz uso significativo da sintaxe de expressão lambda do Java 8, portanto, saber como trabalhar com funções lambda (mesmo em outra linguagem de programação) será útil.

Excerto. 5.3. Operações de agregação e janelas

Nesta seção, exploraremos as partes mais promissoras do Kafka Streams. Até agora, cobrimos os seguintes aspectos do Kafka Streams:

  • criando uma topologia de processamento;
  • usando estado em aplicativos de streaming;
  • realizar conexões de fluxo de dados;
  • diferenças entre fluxos de eventos (KStream) e fluxos de atualização (KTable).

Nos exemplos a seguir reuniremos todos esses elementos. Você também aprenderá sobre janelas, outro ótimo recurso dos aplicativos de streaming. Nosso primeiro exemplo será uma agregação simples.

5.3.1. Agregação das vendas de ações por setor industrial

Agregação e agrupamento são ferramentas vitais ao trabalhar com streaming de dados. O exame dos registos individuais à medida que são recebidos é muitas vezes insuficiente. Para extrair informações adicionais dos dados, é necessário agrupá-los e combiná-los.

Neste exemplo, você se fantasiará de day trader que precisa acompanhar o volume de vendas de ações de empresas de diversos setores. Especificamente, você está interessado nas cinco empresas com maior participação nas vendas em cada setor.

Essa agregação exigirá as seguintes etapas para traduzir os dados na forma desejada (falando em termos gerais).

  1. Crie uma fonte baseada em tópicos que publique informações brutas sobre negociação de ações. Teremos que mapear um objeto do tipo StockTransaction para um objeto do tipo ShareVolume. A questão é que o objeto StockTransaction contém metadados de vendas, mas precisamos apenas de dados sobre o número de ações vendidas.
  2. Agrupe dados do ShareVolume por símbolo de ação. Uma vez agrupados por símbolo, você pode recolher esses dados em subtotais de volumes de vendas de ações. É importante notar que o método KStream.groupBy retorna uma instância do tipo KGroupedStream. E você pode obter uma instância KTable chamando ainda o método KGroupedStream.reduce.

Qual é a interface do KGroupedStream

Os métodos KStream.groupBy e KStream.groupByKey retornam uma instância de KGroupedStream. KGroupedStream é uma representação intermediária de um fluxo de eventos após agrupamento por chaves. Não se destina de forma alguma ao trabalho direto com ele. Em vez disso, KGroupedStream é usado para operações de agregação, que sempre resultam em uma KTable. E como o resultado das operações de agregação é uma KTable e eles usam um armazenamento de estado, é possível que nem todas as atualizações resultantes sejam enviadas posteriormente no pipeline.

O método KTable.groupBy retorna um KGroupedTable semelhante - uma representação intermediária do fluxo de atualizações, reagrupado por chave.

Vamos fazer uma pequena pausa e observar a Fig. 5.9, que mostra o que conseguimos. Esta topologia já deve ser muito familiar para você.

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Vejamos agora o código desta topologia (ele pode ser encontrado no arquivo src/main/java/bbejeck/chapter_5/AggregationsAndReduceExample.java) (Listagem 5.2).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
O código fornecido se diferencia pela brevidade e pelo grande volume de ações realizadas em diversas linhas. Você pode notar algo novo no primeiro parâmetro do método builder.stream: um valor do tipo enum AutoOffsetReset.EARLIEST (há também um LATEST), definido usando o método Consumed.withOffsetResetPolicy. Este tipo de enumeração pode ser usado para especificar uma estratégia de redefinição de deslocamento para cada KStream ou KTable e tem precedência sobre a opção de redefinição de deslocamento da configuração.

GroupByKey e GroupBy

A interface KStream possui dois métodos para agrupar registros: GroupByKey e GroupBy. Ambos retornam um KGroupedTable, então você deve estar se perguntando qual é a diferença entre eles e quando usar qual deles?

O método GroupByKey é usado quando as chaves no KStream já não estão vazias. E o mais importante, o sinalizador “requer reparticionamento” nunca foi definido.

O método GroupBy pressupõe que você alterou as chaves de agrupamento, portanto, o sinalizador de repartição é definido como verdadeiro. A realização de junções, agregações, etc. após o método GroupBy resultará em reparticionamento automático.
Resumo: Sempre que possível, você deve usar GroupByKey em vez de GroupBy.

Está claro o que os métodos mapValues ​​e groupBy fazem, então vamos dar uma olhada no método sum() (encontrado em src/main/java/bbejeck/model/ShareVolume.java) (Listagem 5.3).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
O método ShareVolume.sum retorna o total acumulado do volume de vendas de ações e o resultado de toda a cadeia de cálculos é um objeto KTable . Agora você entende o papel que o KTable desempenha. Quando os objetos ShareVolume chegam, o objeto KTable correspondente armazena a atualização atual mais recente. É importante lembrar que todas as atualizações são refletidas no shareVolumeKTable anterior, mas nem todas são enviadas posteriormente.

Em seguida, usamos esta KTable para agregar (por número de ações negociadas) para chegar às cinco empresas com os maiores volumes de ações negociadas em cada setor. Nossas ações neste caso serão semelhantes às da primeira agregação.

  1. Execute outra operação groupBy para agrupar objetos ShareVolume individuais por setor.
  2. Comece a resumir objetos ShareVolume. Desta vez, o objeto de agregação é uma fila de prioridade de tamanho fixo. Nessa fila de tamanho fixo, ficam retidas apenas as cinco empresas com maior quantidade de ações vendidas.
  3. Mapeie as filas do parágrafo anterior para um valor de string e retorne as cinco ações mais negociadas por número por setor.
  4. Escreva os resultados em formato de string no tópico.

Na Fig. A Figura 5.10 mostra o gráfico da topologia do fluxo de dados. Como você pode ver, a segunda rodada de processamento é bastante simples.

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Agora que temos uma compreensão clara da estrutura dessa segunda rodada de processamento, podemos voltar ao seu código-fonte (você o encontrará no arquivo src/main/java/bbejeck/chapter_5/AggregationsAndReduceExample.java) (Listagem 5.4) .

Este inicializador contém uma variável fixaQueue. Este é um objeto personalizado que é um adaptador para java.util.TreeSet usado para rastrear os N principais resultados em ordem decrescente de ações negociadas.

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Você já viu as chamadas groupBy e mapValues, então não entraremos nelas (estamos chamando o método KTable.toStream porque o método KTable.print está obsoleto). Mas você ainda não viu a versão KTable de agregate(), então passaremos um pouco de tempo discutindo isso.

Como você lembra, o que torna o KTable diferente é que registros com as mesmas chaves são considerados atualizações. KTable substitui a entrada antiga por uma nova. A agregação ocorre de maneira semelhante: os registros mais recentes com a mesma chave são agregados. Quando um registro chega, ele é adicionado à instância da classe FixedSizePriorityQueue usando um somador (segundo parâmetro na chamada do método agregado), mas se já existir outro registro com a mesma chave, então o registro antigo é removido usando um subtrator (terceiro parâmetro em a chamada do método agregado).

Tudo isso significa que nosso agregador, FixedSizePriorityQueue, não agrega todos os valores com uma chave, mas armazena uma soma móvel das quantidades dos N tipos de ações mais negociadas. Cada entrada recebida contém o número total de ações vendidas até o momento. KTable fornecerá informações sobre quais ações das empresas são atualmente mais negociadas, sem exigir agregação contínua de cada atualização.

Aprendemos a fazer duas coisas importantes:

  • agrupar valores no KTable por uma chave comum;
  • executar operações úteis, como rollup e agregação nesses valores agrupados.

Saber como realizar essas operações é importante para compreender o significado dos dados que se movem através de um aplicativo Kafka Streams e entender quais informações eles carregam.

Também reunimos alguns dos principais conceitos discutidos anteriormente neste livro. No Capítulo 4, discutimos como o estado local tolerante a falhas é importante para uma aplicação de streaming. O primeiro exemplo deste capítulo demonstrou por que o estado local é tão importante – ele lhe dá a capacidade de acompanhar as informações que você já viu. O acesso local evita atrasos na rede, tornando a aplicação mais performática e resistente a erros.

Ao executar qualquer operação de rollup ou agregação, você deve especificar o nome do armazenamento de estado. As operações de rollup e agregação retornam uma instância KTable, e a KTable usa armazenamento de estado para substituir resultados antigos por novos. Como você viu, nem todas as atualizações são enviadas pelo pipeline, e isso é importante porque as operações de agregação são projetadas para produzir informações resumidas. Se você não aplicar o estado local, o KTable encaminhará todos os resultados de agregação e rollup.

A seguir, veremos a execução de operações como agregação dentro de um período específico de tempo - as chamadas operações de janelamento.

5.3.2. Operações de janela

Na seção anterior, introduzimos convolução e agregação deslizantes. A aplicação realizou um roll-up contínuo do volume de vendas de ações, seguido da agregação das cinco ações mais negociadas na bolsa.

Às vezes, essa agregação e acumulação contínuas de resultados são necessárias. E às vezes você precisa realizar operações apenas durante um determinado período de tempo. Por exemplo, calcule quantas transações de câmbio foram feitas com ações de uma determinada empresa nos últimos 10 minutos. Ou quantos usuários clicaram em um novo banner publicitário nos últimos 15 minutos. Um aplicativo pode executar essas operações diversas vezes, mas com resultados que se aplicam apenas a períodos de tempo especificados (janelas de tempo).

Contando transações de câmbio por comprador

No próximo exemplo, rastrearemos as transações de ações entre vários traders – sejam grandes organizações ou financiadores individuais inteligentes.

Existem duas razões possíveis para esse rastreamento. Uma delas é a necessidade de saber o que os líderes de mercado estão comprando/vendendo. Se estes grandes intervenientes e investidores sofisticados virem oportunidades, faz sentido seguir a sua estratégia. A segunda razão é o desejo de detectar quaisquer possíveis sinais de abuso de informação privilegiada. Para fazer isso, você precisará analisar a correlação de grandes picos de vendas com comunicados de imprensa importantes.

Esse rastreamento consiste nas seguintes etapas:

  • criação de um fluxo para leitura do tópico transações de ações;
  • agrupando registros recebidos por ID do comprador e símbolo de ação. Chamar o método groupBy retorna uma instância da classe KGroupedStream;
  • O método KGroupedStream.windowedBy retorna um fluxo de dados limitado a uma janela de tempo, que permite agregação em janela. Dependendo do tipo de janela, um TimeWindowedKStream ou um SessionWindowedKStream será retornado;
  • contagem de transações para a operação de agregação. O fluxo de dados em janela determina se um determinado registro é levado em consideração nesta contagem;
  • gravar resultados em um tópico ou enviá-los para o console durante o desenvolvimento.

A topologia deste aplicativo é simples, mas uma imagem clara dela seria útil. Vamos dar uma olhada na Fig. 5.11.

A seguir, veremos a funcionalidade das operações de janela e o código correspondente.

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"

Tipos de janela

Existem três tipos de janelas no Kafka Streams:

  • sessão;
  • “caindo”;
  • deslizar/saltar.

Qual escolher depende dos requisitos do seu negócio. As janelas de queda e salto são limitadas no tempo, enquanto as janelas de sessão são limitadas pela atividade do usuário – a duração da(s) sessão(ões) é determinada exclusivamente pelo quão ativo o usuário é. A principal coisa a lembrar é que todos os tipos de janelas são baseados nos carimbos de data/hora das entradas, não na hora do sistema.

A seguir, implementamos nossa topologia com cada um dos tipos de janela. O código completo será fornecido apenas no primeiro exemplo; para outros tipos de janelas nada mudará, exceto o tipo de operação da janela.

Janelas de sessão

As janelas de sessão são muito diferentes de todos os outros tipos de janelas. Eles são limitados não tanto pelo tempo, mas pela atividade do usuário (ou pela atividade da entidade que você gostaria de rastrear). As janelas de sessão são delimitadas por períodos de inatividade.

A Figura 5.12 ilustra o conceito de janelas de sessão. A sessão menor será mesclada com a sessão à sua esquerda. E a sessão da direita será separada porque segue um longo período de inatividade. As janelas de sessão são baseadas na atividade do usuário, mas usam carimbos de data/hora das entradas para determinar a qual sessão a entrada pertence.

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"

Usando janelas de sessão para rastrear transações de ações

Vamos usar janelas de sessão para capturar informações sobre transações de câmbio. A implementação das janelas de sessão é mostrada na Listagem 5.5 (que pode ser encontrada em src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Você já viu a maioria das operações nesta topologia, portanto não há necessidade de examiná-las novamente aqui. Mas também existem vários elementos novos aqui, que discutiremos agora.

Qualquer operação groupBy normalmente executa algum tipo de operação de agregação (agregação, rollup ou contagem). Você pode realizar agregação cumulativa com um total em execução ou agregação de janela, que leva em consideração registros dentro de uma janela de tempo especificada.

O código na Listagem 5.5 conta o número de transações nas janelas de sessão. Na Fig. 5.13 essas ações são analisadas passo a passo.

Chamando windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) criamos uma janela de sessão com um intervalo de inatividade de 20 segundos e um intervalo de persistência de 15 minutos. Um intervalo ocioso de 20 segundos significa que o aplicativo incluirá qualquer entrada que chegue dentro de 20 segundos após o término ou início da sessão atual na sessão atual (ativa).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
A seguir, especificamos qual operação de agregação precisa ser executada na janela de sessão – neste caso, contagem. Se uma entrada recebida estiver fora da janela de inatividade (ambos os lados do carimbo de data/hora), o aplicativo criará uma nova sessão. Intervalo de retenção significa manter uma sessão por um determinado período de tempo e permite dados atrasados ​​que se estendem além do período de inatividade da sessão, mas ainda podem ser anexados. Além disso, o início e o término da nova sessão resultante da mesclagem correspondem ao carimbo de data/hora mais antigo e mais recente.

Vejamos algumas entradas do método count para ver como as sessões funcionam (Tabela 5.1).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Quando os registros chegam, procuramos sessões existentes com a mesma chave, um horário de término menor que o carimbo de data/hora atual - intervalo de inatividade e um horário de início maior que o carimbo de data/hora atual + intervalo de inatividade. Levando isso em consideração, quatro entradas da tabela. 5.1 são mesclados em uma única sessão como segue.

1. O registro 1 chega primeiro, portanto o horário de início é igual ao horário de término e é 00:00:00.

2. Em seguida, chega a entrada 2 e procuramos sessões que terminem no máximo às 23:59:55 e comecem no máximo às 00:00:35. Encontramos o registro 1 e combinamos as sessões 1 e 2. Tomamos o horário de início da sessão 1 (anterior) e o horário de término da sessão 2 (mais tarde), de modo que nossa nova sessão comece às 00:00:00 e termine às 00:00: 15:XNUMX.

3. Chega o registro 3, procuramos sessões entre 00:00:30 e 00:01:10 e não encontramos nenhuma. Adicione uma segunda sessão para a chave 123-345-654,FFBE, começando e terminando às 00:00:50.

4. Chega o registro 4 e procuramos sessões entre 23:59:45 e 00:00:25. Desta vez, são encontradas as sessões 1 e 2. Todas as três sessões são combinadas em uma, com horário de início às 00:00:00 e horário de término às 00:00:15.

Pelo que está descrito nesta seção, vale lembrar as seguintes nuances importantes:

  • as sessões não são janelas de tamanho fixo. A duração de uma sessão é determinada pela atividade realizada num determinado período de tempo;
  • Os carimbos de data/hora nos dados determinam se o evento ocorre em uma sessão existente ou durante um período ocioso.

A seguir discutiremos o próximo tipo de janela - janelas “tombadas”.

Janelas "caindo"

As janelas giratórias capturam eventos que ocorrem dentro de um determinado período de tempo. Imagine que você precisa capturar todas as transações de ações de uma determinada empresa a cada 20 segundos, para coletar todos os eventos desse período de tempo. No final do intervalo de 20 segundos, a janela rola e passa para um novo intervalo de observação de 20 segundos. A Figura 5.14 ilustra esta situação.

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Como você pode ver, todos os eventos recebidos nos últimos 20 segundos são incluídos na janela. Ao final deste período, uma nova janela é criada.

A Listagem 5.6 mostra o código que demonstra o uso de janelas giratórias para capturar transações de ações a cada 20 segundos (encontrado em src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Com esta pequena alteração na chamada do método TimeWindows.of, você pode usar uma janela giratória. Este exemplo não chama o método Until(), portanto será usado o intervalo de retenção padrão de 24 horas.

Finalmente, é hora de passar para a última das opções de janela – janelas “saltos”.

Janelas deslizantes ("saltos")

As janelas deslizantes/saltos são semelhantes às janelas giratórias, mas com uma ligeira diferença. As janelas deslizantes não esperam até o final do intervalo de tempo antes de criar uma nova janela para processar eventos recentes. Eles iniciam novos cálculos após um intervalo de espera menor que a duração da janela.

Para ilustrar as diferenças entre janelas giratórias e saltitantes, voltemos ao exemplo da contagem de transações na bolsa de valores. Nosso objetivo ainda é contar o número de transações, mas não queremos esperar todo o tempo antes de atualizar o contador. Em vez disso, atualizaremos o contador em intervalos mais curtos. Por exemplo, ainda contaremos o número de transações a cada 20 segundos, mas atualizaremos o contador a cada 5 segundos, conforme mostrado na Fig. 5.15. Neste caso, acabamos com três janelas de resultados com dados sobrepostos.

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
A Listagem 5.7 mostra o código para definir janelas deslizantes (encontrado em src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Uma janela oscilante pode ser convertida em uma janela saltitante adicionando uma chamada ao método advanceBy(). No exemplo mostrado, o intervalo de salvamento é de 15 minutos.

Você viu nesta seção como limitar os resultados da agregação a janelas de tempo. Em particular, quero que você se lembre das três coisas a seguir desta seção:

  • o tamanho das janelas de sessão é limitado não pelo período de tempo, mas pela atividade do usuário;
  • janelas “em movimento” fornecem uma visão geral dos eventos dentro de um determinado período de tempo;
  • A duração das janelas saltantes é fixa, mas elas são atualizadas com frequência e podem conter entradas sobrepostas em todas as janelas.

A seguir, aprenderemos como converter um KTable de volta em um KStream para uma conexão.

5.3.3. Conectando objetos Kstream e KTable

No Capítulo 4, discutimos a conexão de dois objetos Kstream. Agora temos que aprender como conectar KTable e Kstream. Isso pode ser necessário pelo seguinte motivo simples. KStream é um fluxo de registros e KTable é um fluxo de atualizações de registros, mas às vezes você pode querer adicionar contexto adicional ao fluxo de registros usando atualizações do KTable.

Vamos pegar dados sobre o número de transações na bolsa de valores e combiná-los com notícias da bolsa de valores dos setores relevantes. Aqui está o que você precisa fazer para conseguir isso, considerando o código que você já possui.

  1. Converta um objeto KTable com dados sobre o número de transações de ações em um KStream, seguido da substituição da chave pela chave que indica o setor da indústria correspondente a este símbolo de ação.
  2. Crie um objeto KTable que leia dados de um tópico com notícias da bolsa de valores. Esta nova KTable será categorizada por setor industrial.
  3. Conecte atualizações de notícias com informações sobre o número de transações em bolsa de valores por setor industrial.

Agora vamos ver como implementar este plano de ação.

Converter KTable em KStream

Para converter KTable em Kstream você precisa fazer o seguinte.

  1. Chame o método KTable.toStream().
  2. Ao chamar o método KStream.map, substitua a chave pelo nome do setor e, em seguida, recupere o objeto TransactionSummary da instância Windowed.

Encadearemos essas operações como segue (o código pode ser encontrado no arquivo src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listagem 5.8).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Como estamos executando uma operação KStream.map, a instância retornada do KStream é reparticionada automaticamente quando é usada em uma conexão.

Concluímos o processo de conversão, em seguida precisamos criar um objeto KTable para leitura de notícias da bolsa.

Criação de KTable para notícias de ações

Felizmente, a criação de um objeto KTable requer apenas uma linha de código (o código pode ser encontrado em src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listagem 5.9).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
É importante notar que nenhum objeto Serde precisa ser especificado, uma vez que a string Serdes é usada nas configurações. Além disso, usando a enumeração EARLIEST, a tabela é preenchida com registros logo no início.

Agora podemos passar para a etapa final - conexão.

Conectando atualizações de notícias com dados de contagem de transações

Criar uma conexão não é difícil. Usaremos um left join caso não haja notícias de ações para o setor relevante (o código necessário pode ser encontrado no arquivo src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listagem 5.10).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Este operador leftJoin é bastante simples. Ao contrário das junções do Capítulo 4, o método JoinWindow não é usado porque ao realizar uma junção KStream-KTable, há apenas uma entrada na KTable para cada chave. Essa conexão não é limitada no tempo: o registro está no KTable ou ausente. A principal conclusão: usando objetos KTable você pode enriquecer o KStream com dados de referência atualizados com menos frequência.

Agora veremos uma maneira mais eficiente de enriquecer eventos do KStream.

5.3.4. Objetos GlobalKTable

Como você pode ver, é necessário enriquecer os fluxos de eventos ou adicionar contexto a eles. No Capítulo 4 você viu as conexões entre dois objetos KStream, e na seção anterior você viu a conexão entre um KStream e um KTable. Em todos estes casos, é necessário particionar novamente o fluxo de dados ao mapear as chaves para um novo tipo ou valor. Às vezes, o reparticionamento é feito explicitamente e, às vezes, o Kafka Streams faz isso automaticamente. O reparticionamento é necessário porque as chaves foram alteradas e os registros devem terminar em novas seções, caso contrário a conexão será impossível (isso foi discutido no Capítulo 4, na seção “Reparticionamento de dados” na subseção 4.2.4).

O reparticionamento tem um custo

O reparticionamento requer custos - custos de recursos adicionais para criar tópicos intermediários, armazenar dados duplicados em outro tópico; também significa maior latência devido à escrita e leitura deste tópico. Além disso, se precisar unir mais de um aspecto ou dimensão, você deverá encadear as junções, mapear os registros com novas chaves e executar novamente o processo de particionamento.

Conectando-se a conjuntos de dados menores

Em alguns casos, o volume de dados de referência a serem conectados é relativamente pequeno, de modo que cópias completas deles podem caber facilmente localmente em cada nó. Para situações como essa, o Kafka Streams fornece a classe GlobalKTable.

As instâncias GlobalKTable são exclusivas porque o aplicativo replica todos os dados para cada um dos nós. E como todos os dados estão presentes em cada nó, não há necessidade de particionar o fluxo de eventos por chave de dados de referência para que fique disponível para todas as partições. Você também pode fazer junções sem chave usando objetos GlobalKTable. Vamos voltar a um dos exemplos anteriores para demonstrar esse recurso.

Conectando objetos Kstream a objetos GlobalKTable

Na subseção 5.3.2, realizamos agregação de janelas de transações de câmbio por compradores. Os resultados desta agregação foram mais ou menos assim:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Embora esses resultados servissem ao propósito, teria sido mais útil se o nome do cliente e o nome completo da empresa também tivessem sido exibidos. Para adicionar o nome do cliente e o nome da empresa, você pode fazer junções normais, mas precisará fazer dois mapeamentos de chaves e reparticionamento. Com GlobalKTable você pode evitar o custo de tais operações.

Para fazer isso, usaremos o objeto countStream da Listagem 5.11 (o código correspondente pode ser encontrado em src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) e o conectaremos a dois objetos GlobalKTable.

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Já discutimos isso antes, então não vou repetir. Mas observo que o código na função toStream().map é abstraído em um objeto de função em vez de uma expressão lambda embutida para facilitar a leitura.

O próximo passo é declarar duas instâncias de GlobalKTable (o código mostrado pode ser encontrado no arquivo src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listagem 5.12).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"

Observe que os nomes dos tópicos são descritos usando tipos enumerados.

Agora que temos todos os componentes prontos, só falta escrever o código para a conexão (que pode ser encontrado no arquivo src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listagem 5.13).

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Embora existam duas junções neste código, elas são encadeadas porque nenhum de seus resultados é usado separadamente. Os resultados são exibidos ao final de toda a operação.

Ao executar a operação de junção acima, você obterá resultados como este:

{customer='Barney, Smith' company="Exxon", transactions= 17}

A essência não mudou, mas estes resultados parecem mais claros.

Se você fizer a contagem regressiva até o Capítulo 4, já verá vários tipos de conexões em ação. Eles estão listados na tabela. 5.2. Esta tabela reflete os recursos de conectividade a partir da versão 1.0.0 do Kafka Streams; Algo pode mudar em versões futuras.

O livro “Kafka Streams em Ação. Aplicativos e microsserviços para trabalho em tempo real"
Para finalizar, vamos recapitular o básico: você pode conectar streams de eventos (KStream) e atualizar streams (KTable) usando o estado local. Alternativamente, se o tamanho dos dados de referência não for muito grande, você poderá usar o objeto GlobalKTable. GlobalKTables replica todas as partições para cada nó do aplicativo Kafka Streams, garantindo que todos os dados estejam disponíveis, independentemente de qual partição a chave corresponde.

A seguir veremos o recurso Kafka Streams, graças ao qual podemos observar mudanças de estado sem consumir dados de um tópico Kafka.

5.3.5. Estado consultável

Já realizamos diversas operações envolvendo estado e sempre enviamos os resultados para o console (para fins de desenvolvimento) ou os escrevemos em um tópico (para fins de produção). Ao escrever resultados em um tópico, você deve usar um consumidor Kafka para visualizá-los.

A leitura dos dados desses tópicos pode ser considerada um tipo de visão materializada. Para nossos propósitos, podemos usar a definição de visão materializada da Wikipedia: “...um objeto de banco de dados físico contendo os resultados de uma consulta. Por exemplo, pode ser uma cópia local de dados remotos, ou um subconjunto de linhas e/ou colunas de uma tabela ou resultados de junção, ou uma tabela de resumo obtida através de agregação” (https://en.wikipedia.org/wiki /Materializado_view).

Kafka Streams também permite executar consultas interativas em armazenamentos de estado, permitindo a leitura direta dessas visualizações materializadas. É importante observar que a consulta ao armazenamento de estado é uma operação somente leitura. Isso garante que você não precise se preocupar em tornar acidentalmente o estado inconsistente enquanto seu aplicativo estiver processando dados.

A capacidade de consultar diretamente os armazenamentos estaduais é importante. Isso significa que você pode criar aplicativos de painel sem precisar primeiro buscar dados do consumidor Kafka. Também aumenta a eficiência da aplicação, pois não há necessidade de gravar dados novamente:

  • graças à localização dos dados, eles podem ser acessados ​​rapidamente;
  • a duplicação de dados é eliminada, pois não são gravados em armazenamento externo.

A principal coisa que quero que você lembre é que você pode consultar o estado diretamente de dentro do seu aplicativo. As oportunidades que isso oferece não podem ser exageradas. Em vez de consumir dados do Kafka e armazenar registros em um banco de dados para o aplicativo, você pode consultar armazenamentos de estado com o mesmo resultado. Consultas diretas aos armazenamentos estaduais significam menos código (sem consumidor) e menos software (sem necessidade de uma tabela de banco de dados para armazenar os resultados).

Abordamos bastante assunto neste capítulo, então deixaremos nossa discussão sobre consultas interativas em armazenamentos estaduais por enquanto. Mas não se preocupe: no Capítulo 9, criaremos um aplicativo de painel simples com consultas interativas. Ele usará alguns dos exemplos deste e dos capítulos anteriores para demonstrar consultas interativas e como você pode adicioná-las aos aplicativos Kafka Streams.

Resumo

  • Os objetos KStream representam fluxos de eventos, comparáveis ​​a inserções em um banco de dados. Os objetos KTable representam fluxos de atualização, mais como atualizações em um banco de dados. O tamanho do objeto KTable não aumenta, os registros antigos são substituídos por novos.
  • Os objetos KTable são necessários para operações de agregação.
  • Usando operações de janelas, você pode dividir os dados agregados em intervalos de tempo.
  • Graças aos objetos GlobalKTable, você pode acessar dados de referência em qualquer lugar do aplicativo, independentemente do particionamento.
  • Conexões entre objetos KStream, KTable e GlobalKTable são possíveis.

Até agora, nos concentramos na construção de aplicativos Kafka Streams usando o KStream DSL de alto nível. Embora a abordagem de alto nível permita criar programas simples e concisos, usá-la representa uma compensação. Trabalhar com DSL KStream significa aumentar a concisão do seu código, reduzindo o grau de controle. No próximo capítulo, veremos a API do nó manipulador de baixo nível e tentaremos outras compensações. Os programas serão mais longos do que eram antes, mas seremos capazes de criar praticamente qualquer nó manipulador que precisarmos.

→ Mais detalhes sobre o livro podem ser encontrados em site da editora

→ Para Habrozhiteli 25% de desconto usando cupom - Streams Kafka

→ Após o pagamento da versão impressa do livro, será enviado um livro eletrônico por e-mail.

Fonte: habr.com

Adicionar um comentário