El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real" ¡Hola residentes de Khabro! Este libro es adecuado para cualquier desarrollador que quiera comprender el procesamiento de subprocesos. Comprender la programación distribuida le ayudará a comprender mejor Kafka y Kafka Streams. Sería bueno conocer el marco de Kafka en sí, pero no es necesario: te diré todo lo que necesitas. En este libro, tanto los desarrolladores experimentados como los principiantes de Kafka aprenderán cómo crear interesantes aplicaciones de procesamiento de flujo utilizando la biblioteca Kafka Streams. Los desarrolladores de Java intermedios y avanzados que ya estén familiarizados con conceptos como la serialización aprenderán a aplicar sus habilidades para crear aplicaciones Kafka Streams. El código fuente del libro está escrito en Java 8 y hace un uso significativo de la sintaxis de expresiones lambda de Java 8, por lo que será útil saber cómo trabajar con funciones lambda (incluso en otro lenguaje de programación).

Extracto. 5.3. Operaciones de agregación y ventanas.

En esta sección, continuaremos explorando las partes más prometedoras de Kafka Streams. Hasta ahora hemos cubierto los siguientes aspectos de Kafka Streams:

  • crear una topología de procesamiento;
  • usar el estado en aplicaciones de streaming;
  • realizar conexiones de flujo de datos;
  • diferencias entre flujos de eventos (KStream) y flujos de actualización (KTable).

En los siguientes ejemplos reuniremos todos estos elementos. También aprenderá sobre las ventanas, otra gran característica de las aplicaciones de transmisión por secuencias. Nuestro primer ejemplo será una agregación simple.

5.3.1. Agregación de ventas de acciones por sector industrial

La agregación y la agrupación son herramientas vitales cuando se trabaja con datos en streaming. El examen de los registros individuales a medida que se reciben suele ser insuficiente. Para extraer información adicional de los datos, es necesario agruparlos y combinarlos.

En este ejemplo, se pondrá el disfraz de un comerciante intradía que necesita realizar un seguimiento del volumen de ventas de acciones de empresas de varias industrias. Específicamente, le interesan las cinco empresas con las mayores ventas de acciones en cada industria.

Dicha agregación requerirá los siguientes pasos para traducir los datos a la forma deseada (en términos generales).

  1. Cree una fuente basada en temas que publique información bruta sobre el comercio de acciones. Tendremos que asignar un objeto de tipo StockTransaction a un objeto de tipo ShareVolume. El punto es que el objeto StockTransaction contiene metadatos de ventas, pero solo necesitamos datos sobre la cantidad de acciones que se venden.
  2. Agrupe los datos de ShareVolume por símbolo bursátil. Una vez agrupados por símbolo, puede contraer estos datos en subtotales de volúmenes de ventas de acciones. Vale la pena señalar que el método KStream.groupBy devuelve una instancia de tipo KGroupedStream. Y puede obtener una instancia de KTable llamando al método KGroupedStream.reduce.

¿Qué es la interfaz KGroupedStream?

Los métodos KStream.groupBy y KStream.groupByKey devuelven una instancia de KGroupedStream. KGroupedStream es una representación intermedia de un flujo de eventos después de agruparlos por claves. No está pensado en absoluto para trabajar directamente con él. En cambio, KGroupedStream se utiliza para operaciones de agregación, que siempre dan como resultado una KTable. Y dado que el resultado de las operaciones de agregación es una KTable y utilizan un almacén de estado, es posible que no todas las actualizaciones como resultado se envíen más adelante en el proceso.

El método KTable.groupBy devuelve una KGroupedTable similar: una representación intermedia del flujo de actualizaciones, reagrupada por clave.

Hagamos una breve pausa y miremos la Fig. 5.9, que muestra lo que hemos logrado. Esta topología ya debería resultarle muy familiar.

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Veamos ahora el código para esta topología (se puede encontrar en el archivo src/main/java/bbejeck/chapter_5/AggregationsAndRetainingExample.java) (Listado 5.2).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
El código dado se distingue por su brevedad y la gran cantidad de acciones realizadas en varias líneas. Puede notar algo nuevo en el primer parámetro del método builder.stream: un valor del tipo de enumeración AutoOffsetReset.EARLIEST (también hay un ÚLTIMO), establecido usando el método Consumed.withOffsetResetPolicy. Este tipo de enumeración se puede utilizar para especificar una estrategia de restablecimiento de compensación para cada KStream o KTable y tiene prioridad sobre la opción de restablecimiento de compensación de la configuración.

GroupByKey y GroupBy

La interfaz KStream tiene dos métodos para agrupar registros: GroupByKey y GroupBy. Ambos devuelven una KGroupedTable, por lo que quizás se pregunte cuál es la diferencia entre ellos y cuándo usar cuál.

El método GroupByKey se utiliza cuando las claves en KStream ya no están vacías. Y lo más importante, el indicador "requiere nueva partición" nunca se estableció.

El método GroupBy supone que ha cambiado las claves de agrupación, por lo que el indicador de repartición se establece en verdadero. Realizar uniones, agregaciones, etc. después del método GroupBy dará como resultado una nueva partición automática.
Resumen: Siempre que sea posible, debe utilizar GroupByKey en lugar de GroupBy.

Está claro lo que hacen los métodos mapValues ​​​​y groupBy, así que echemos un vistazo al método sum() (que se encuentra en src/main/java/bbejeck/model/ShareVolume.java) (Listado 5.3).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
El método ShareVolume.sum devuelve el total acumulado del volumen de ventas de acciones y el resultado de toda la cadena de cálculos es un objeto KTable. . Ahora comprende el papel que desempeña KTable. Cuando llegan los objetos ShareVolume, el objeto KTable correspondiente almacena la última actualización actual. Es importante recordar que todas las actualizaciones se reflejan en el shareVolumeKTable anterior, pero no todas se envían más.

Luego usamos esta KTable para agregar (por número de acciones negociadas) para llegar a las cinco empresas con los mayores volúmenes de acciones negociadas en cada industria. Nuestras acciones en este caso serán similares a las de la primera agregación.

  1. Realice otra operación groupBy para agrupar objetos ShareVolume individuales por industria.
  2. Comience a resumir los objetos ShareVolume. Esta vez el objeto de agregación es una cola de prioridad de tamaño fijo. En esta cola de tamaño fijo, sólo se retienen las cinco empresas con la mayor cantidad de acciones vendidas.
  3. Asigne las colas del párrafo anterior a un valor de cadena y devuelva las cinco acciones más negociadas por número por industria.
  4. Escriba los resultados en forma de cadena para el tema.

En la Fig. La Figura 5.10 muestra el gráfico de topología del flujo de datos. Como puede ver, la segunda ronda de procesamiento es bastante sencilla.

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Ahora que tenemos una comprensión clara de la estructura de esta segunda ronda de procesamiento, podemos pasar a su código fuente (lo encontrará en el archivo src/main/java/bbejeck/chapter_5/AggregationsAndRetainingExample.java) (Listado 5.4). .

Este inicializador contiene una variable fixQueue. Este es un objeto personalizado que es un adaptador para java.util.TreeSet que se utiliza para realizar un seguimiento de los N resultados principales en orden descendente de acciones negociadas.

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Ya ha visto las llamadas groupBy y mapValues, por lo que no las abordaremos (estamos llamando al método KTable.toStream porque el método KTable.print está en desuso). Pero aún no has visto la versión KTable de agregado(), así que dedicaremos un poco de tiempo a discutir eso.

Como recordarás, lo que hace diferente a KTable es que los registros con las mismas claves se consideran actualizaciones. KTable reemplaza la entrada anterior por una nueva. La agregación ocurre de manera similar: se agregan los últimos registros con la misma clave. Cuando llega un registro, se agrega a la instancia de clase FixedSizePriorityQueue usando un sumador (segundo parámetro en la llamada al método agregado), pero si ya existe otro registro con la misma clave, entonces el registro anterior se elimina usando un restador (tercer parámetro en la llamada al método agregado).

Todo esto significa que nuestro agregador, FixedSizePriorityQueue, no agrega todos los valores con una clave, sino que almacena una suma móvil de las cantidades de los N tipos de acciones más negociados. Cada entrada entrante contiene el número total de acciones vendidas hasta el momento. KTable le brindará información sobre las acciones de las empresas que más se negocian actualmente, sin necesidad de agregar continuamente cada actualización.

Aprendimos a hacer dos cosas importantes:

  • agrupar valores en KTable mediante una clave común;
  • realice operaciones útiles como resumen y agregación en estos valores agrupados.

Saber cómo realizar estas operaciones es importante para comprender el significado de los datos que se mueven a través de una aplicación Kafka Streams y comprender qué información transporta.

También hemos reunido algunos de los conceptos clave discutidos anteriormente en este libro. En el Capítulo 4, analizamos cuán importante es el estado local tolerante a fallas para una aplicación de transmisión por secuencias. El primer ejemplo de este capítulo demostró por qué el estado local es tan importante: le brinda la posibilidad de realizar un seguimiento de la información que ya ha visto. El acceso local evita retrasos en la red, lo que hace que la aplicación sea más eficaz y resistente a errores.

Al realizar cualquier operación de acumulación o agregación, debe especificar el nombre del almacén de estado. Las operaciones de resumen y agregación devuelven una instancia de KTable, y KTable utiliza el almacenamiento de estado para reemplazar los resultados antiguos por otros nuevos. Como ha visto, no todas las actualizaciones se envían por proceso, y esto es importante porque las operaciones de agregación están diseñadas para producir información resumida. Si no aplica el estado local, KTable reenviará todos los resultados de agregación y resumen.

A continuación, veremos cómo realizar operaciones como la agregación dentro de un período de tiempo específico: las llamadas operaciones de ventanas.

5.3.2. Operaciones de ventana

En la sección anterior, introdujimos la convolución deslizante y la agregación. La aplicación realizó un resumen continuo del volumen de ventas de acciones, seguido de la agregación de las cinco acciones más negociadas en la bolsa.

A veces es necesaria esta agregación y acumulación continua de resultados. Y a veces es necesario realizar operaciones sólo durante un período de tiempo determinado. Por ejemplo, calcule cuántas transacciones de cambio se realizaron con acciones de una empresa en particular en los últimos 10 minutos. O cuántos usuarios hicieron clic en un nuevo banner publicitario en los últimos 15 minutos. Una aplicación puede realizar dichas operaciones varias veces, pero con resultados que se aplican sólo a períodos de tiempo específicos (ventanas de tiempo).

Contando transacciones de cambio por comprador

En el siguiente ejemplo, realizaremos un seguimiento de las transacciones de acciones entre múltiples operadores, ya sean grandes organizaciones o financistas individuales inteligentes.

Hay dos posibles razones para este seguimiento. Uno de ellos es la necesidad de saber qué están comprando/vendiendo los líderes del mercado. Si estos grandes actores e inversores sofisticados ven oportunidades, tiene sentido seguir su estrategia. La segunda razón es el deseo de detectar posibles signos de uso ilegal de información privilegiada. Para hacer esto, necesitará analizar la correlación de grandes picos de ventas con comunicados de prensa importantes.

Dicho seguimiento consta de los siguientes pasos:

  • crear un flujo de lectura del tema de transacciones bursátiles;
  • agrupar registros entrantes por ID de comprador y símbolo bursátil. Llamar al método groupBy devuelve una instancia de la clase KGroupedStream;
  • El método KGroupedStream.windowedBy devuelve un flujo de datos limitado a una ventana de tiempo, lo que permite la agregación en ventanas. Dependiendo del tipo de ventana, se devuelve TimeWindowedKStream o SessionWindowedKStream;
  • recuento de transacciones para la operación de agregación. El flujo de datos en ventana determina si un registro particular se tiene en cuenta en este recuento;
  • escribir resultados para un tema o enviarlos a la consola durante el desarrollo.

La topología de esta aplicación es simple, pero sería útil tener una imagen clara de ella. Echemos un vistazo a la Fig. 5.11.

A continuación, veremos la funcionalidad de las operaciones de ventana y el código correspondiente.

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"

Tipos de ventanas

Hay tres tipos de ventanas en Kafka Streams:

  • de una sesión;
  • “cayendo” (cayendo);
  • deslizarse/saltar.

Cuál elegir depende de los requisitos de su negocio. Las ventanas de caída y salto tienen un límite de tiempo, mientras que las ventanas de sesión están limitadas por la actividad del usuario; la duración de la sesión está determinada únicamente por qué tan activo es el usuario. Lo principal que debe recordar es que todos los tipos de ventanas se basan en las marcas de fecha y hora de las entradas, no en la hora del sistema.

A continuación, implementamos nuestra topología con cada uno de los tipos de ventana. El código completo se dará sólo en el primer ejemplo; para otros tipos de ventanas nada cambiará excepto el tipo de operación de la ventana.

Ventanas de sesión

Las ventanas de sesión son muy diferentes de todos los demás tipos de ventanas. Están limitados no tanto por el tiempo sino por la actividad del usuario (o la actividad de la entidad que le gustaría rastrear). Las ventanas de sesión están delimitadas por períodos de inactividad.

La Figura 5.12 ilustra el concepto de ventanas de sesión. La sesión más pequeña se fusionará con la sesión de su izquierda. Y la sesión de la derecha será separada porque sigue a un largo período de inactividad. Las ventanas de sesión se basan en la actividad del usuario, pero utilizan marcas de fecha y hora de las entradas para determinar a qué sesión pertenece la entrada.

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"

Uso de ventanas de sesión para realizar un seguimiento de las transacciones bursátiles

Usemos ventanas de sesión para capturar información sobre transacciones de intercambio. La implementación de las ventanas de sesión se muestra en el Listado 5.5 (que se puede encontrar en src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Ya ha visto la mayoría de las operaciones en esta topología, por lo que no es necesario revisarlas aquí nuevamente. Pero aquí también hay varios elementos nuevos, que ahora analizaremos.

Cualquier operación groupBy normalmente realiza algún tipo de operación de agregación (agregación, acumulación o conteo). Puede realizar una agregación acumulativa con un total acumulado o una agregación de ventana, que tiene en cuenta los registros dentro de un período de tiempo específico.

El código del Listado 5.5 cuenta el número de transacciones dentro de las ventanas de sesión. En la Fig. 5.13 estas acciones se analizan paso a paso.

Al llamar a windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) creamos una ventana de sesión con un intervalo de inactividad de 20 segundos y un intervalo de persistencia de 15 minutos. Un intervalo de inactividad de 20 segundos significa que la aplicación incluirá cualquier entrada que llegue dentro de los 20 segundos posteriores al final o al inicio de la sesión actual en la sesión actual (activa).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
A continuación, especificamos qué operación de agregación debe realizarse en la ventana de sesión; en este caso, contar. Si una entrada entrante queda fuera de la ventana de inactividad (a cualquier lado de la marca de fecha/hora), la aplicación crea una nueva sesión. El intervalo de retención significa mantener una sesión durante un cierto período de tiempo y permite datos tardíos que se extienden más allá del período de inactividad de la sesión pero que aún se pueden adjuntar. Además, el inicio y el final de la nueva sesión resultante de la fusión corresponden a la fecha y hora más temprana y más reciente.

Veamos algunas entradas del método de conteo para ver cómo funcionan las sesiones (Tabla 5.1).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Cuando llegan los registros, buscamos sesiones existentes con la misma clave, una hora de finalización menor que la marca de fecha/hora actual - intervalo de inactividad y una hora de inicio mayor que la marca de fecha/hora actual + intervalo de inactividad. Teniendo esto en cuenta, cuatro entradas de la tabla. 5.1 se fusionan en una sola sesión de la siguiente manera.

1. El registro 1 llega primero, por lo que la hora de inicio es igual a la hora de finalización y es 00:00:00.

2. A continuación llega la entrada 2 y buscamos sesiones que finalicen no antes de las 23:59:55 y comiencen no más tarde de las 00:00:35. Encontramos el registro 1 y combinamos las sesiones 1 y 2. Tomamos la hora de inicio de la sesión 1 (antes) y la hora de finalización de la sesión 2 (más tarde), de modo que nuestra nueva sesión comience a las 00:00:00 y termine a las 00: 00:15.

3. Llega el registro 3, buscamos sesiones entre las 00:00:30 y las 00:01:10 y no encontramos ninguna. Agregue una segunda sesión para la clave 123-345-654,FFBE, comenzando y finalizando a las 00:00:50.

4. Llega el registro 4 y buscamos sesiones entre las 23:59:45 y las 00:00:25. Esta vez se encuentran las sesiones 1 y 2. Las tres sesiones se combinan en una, con una hora de inicio de 00:00:00 y una hora de finalización de 00:00:15.

De lo descrito en este apartado, conviene recordar los siguientes matices importantes:

  • Las sesiones no son ventanas de tamaño fijo. La duración de una sesión está determinada por la actividad realizada dentro de un período de tiempo determinado;
  • Las marcas de fecha y hora en los datos determinan si el evento cae dentro de una sesión existente o durante un período de inactividad.

A continuación, analizaremos el siguiente tipo de ventana: las ventanas "volcantes".

Ventanas "que caen"

Las ventanas giratorias capturan eventos que ocurren dentro de un cierto período de tiempo. Imagine que necesita capturar todas las transacciones bursátiles de una determinada empresa cada 20 segundos, de modo que recopile todos los eventos durante ese período de tiempo. Al final del intervalo de 20 segundos, la ventana se desplaza y pasa a un nuevo intervalo de observación de 20 segundos. La figura 5.14 ilustra esta situación.

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Como puede ver, todos los eventos recibidos en los últimos 20 segundos se incluyen en la ventana. Al final de este período de tiempo, se crea una nueva ventana.

El Listado 5.6 muestra un código que demuestra el uso de ventanas giratorias para capturar transacciones de acciones cada 20 segundos (que se encuentra en src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Con este pequeño cambio en la llamada al método TimeWindows.of, puede utilizar una ventana giratoria. Este ejemplo no llama al método hasta(), por lo que se utilizará el intervalo de retención predeterminado de 24 horas.

Finalmente, es hora de pasar a la última de las opciones de ventana: ventanas de "salto".

Ventanas correderas ("saltantes")

Las ventanas corredizas/salto son similares a las ventanas giratorias, pero con una ligera diferencia. Las ventanas deslizantes no esperan hasta el final del intervalo de tiempo antes de crear una nueva ventana para procesar eventos recientes. Comienzan nuevos cálculos después de un intervalo de espera menor que la duración de la ventana.

Para ilustrar las diferencias entre ventanas de caída y de salto, volvamos al ejemplo del recuento de transacciones bursátiles. Nuestro objetivo sigue siendo contar el número de transacciones, pero no queremos esperar todo el tiempo antes de actualizar el contador. En su lugar, actualizaremos el contador a intervalos más cortos. Por ejemplo, seguiremos contando el número de transacciones cada 20 segundos, pero actualizaremos el contador cada 5 segundos, como se muestra en la Fig. 5.15. En este caso, terminamos con tres ventanas de resultados con datos superpuestos.

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
El Listado 5.7 muestra el código para definir ventanas deslizantes (que se encuentra en src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Una ventana de caída se puede convertir en una ventana de salto agregando una llamada al método advancedBy(). En el ejemplo mostrado, el intervalo de guardado es de 15 minutos.

En esta sección vio cómo limitar los resultados de agregación a ventanas de tiempo. En particular, quiero que recuerdes las siguientes tres cosas de esta sección:

  • el tamaño de las ventanas de sesión no está limitado por el período de tiempo, sino por la actividad del usuario;
  • las ventanas “volteantes” proporcionan una visión general de los acontecimientos dentro de un período de tiempo determinado;
  • La duración de los saltos de ventanas es fija, pero se actualizan con frecuencia y pueden contener entradas superpuestas en todas las ventanas.

A continuación, aprenderemos cómo convertir una KTable nuevamente en un KStream para una conexión.

5.3.3. Conexión de objetos KStream y KTable

En el Capítulo 4, analizamos la conexión de dos objetos KStream. Ahora tenemos que aprender cómo conectar KTable y KStream. Esto puede ser necesario por la sencilla razón siguiente. KStream es un flujo de registros y KTable es un flujo de actualizaciones de registros, pero a veces es posible que desee agregar contexto adicional al flujo de registros utilizando actualizaciones de KTable.

Tomemos datos sobre el número de transacciones bursátiles y combinémoslos con noticias bursátiles de las industrias relevantes. Esto es lo que debe hacer para lograrlo según el código que ya tiene.

  1. Convierta un objeto KTable con datos sobre el número de transacciones bursátiles en un KStream, y luego reemplace la clave con la clave que indica el sector industrial correspondiente a este símbolo bursátil.
  2. Cree un objeto KTable que lea datos de un tema con noticias bursátiles. Esta nueva KTable estará categorizada por sector industrial.
  3. Conecte actualizaciones de noticias con información sobre el número de transacciones bursátiles por sector industrial.

Ahora veamos cómo implementar este plan de acción.

Convertir KTable a KStream

Para convertir KTable a KStream, debe hacer lo siguiente.

  1. Llame al método KTable.toStream().
  2. Al llamar al método KStream.map, reemplace la clave con el nombre de la industria y luego recupere el objeto TransactionSummary de la instancia en ventana.

Encadenaremos estas operaciones de la siguiente manera (el código se puede encontrar en el archivo src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listado 5.8).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Debido a que estamos realizando una operación KStream.map, la instancia de KStream devuelta se vuelve a particionar automáticamente cuando se usa en una conexión.

Hemos completado el proceso de conversión, a continuación necesitamos crear un objeto KTable para leer noticias bursátiles.

Creación de KTable para noticias bursátiles.

Afortunadamente, crear un objeto KTable requiere solo una línea de código (el código se puede encontrar en src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listado 5.9).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Vale la pena señalar que no es necesario especificar ningún objeto Serde, ya que las cadenas Serdes se utilizan en la configuración. Además, al utilizar la enumeración EARLIEST, la tabla se llena con registros desde el principio.

Ahora podemos pasar al paso final: la conexión.

Conexión de actualizaciones de noticias con datos de recuento de transacciones

Crear una conexión no es difícil. Usaremos una combinación izquierda en caso de que no haya noticias bursátiles para la industria relevante (el código necesario se puede encontrar en el archivo src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listado 5.10).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Este operador leftJoin es bastante simple. A diferencia de las uniones del Capítulo 4, el método JoinWindow no se utiliza porque cuando se realiza una unión KStream-KTable, solo hay una entrada en KTable para cada clave. Esta conexión no está limitada en el tiempo: la entrada está en KTable o no está. La principal conclusión: utilizando objetos KTable puede enriquecer KStream con datos de referencia actualizados con menos frecuencia.

Ahora veremos una forma más eficiente de enriquecer eventos de KStream.

5.3.4. Objetos GlobalKTable

Como puede ver, es necesario enriquecer los flujos de eventos o agregarles contexto. En el Capítulo 4 vio las conexiones entre dos objetos KStream y en la sección anterior vio la conexión entre un KStream y una KTable. En todos estos casos, es necesario volver a particionar el flujo de datos al asignar las claves a un nuevo tipo o valor. A veces, el reparticionamiento se realiza explícitamente y, a veces, Kafka Streams lo hace automáticamente. La nueva partición es necesaria porque las claves han cambiado y los registros deben terminar en nuevas secciones; de lo contrario, la conexión será imposible (esto se analizó en el Capítulo 4, en la sección "Repartición de datos" en la subsección 4.2.4).

Volver a particionar tiene un costo

Volver a particionar requiere costos: costos de recursos adicionales para crear temas intermedios y almacenar datos duplicados en otro tema; también significa una mayor latencia debido a la escritura y lectura de este tema. Además, si necesita unir más de un aspecto o dimensión, debe encadenar las uniones, asignar los registros con nuevas claves y ejecutar el proceso de repartición nuevamente.

Conexión a conjuntos de datos más pequeños

En algunos casos, el volumen de datos de referencia que se van a conectar es relativamente pequeño, por lo que pueden caber fácilmente copias completas localmente en cada nodo. Para situaciones como esta, Kafka Streams proporciona la clase GlobalKTable.

Las instancias de GlobalKTable son únicas porque la aplicación replica todos los datos en cada uno de los nodos. Y dado que todos los datos están presentes en cada nodo, no es necesario particionar el flujo de eventos mediante una clave de datos de referencia para que esté disponible para todas las particiones. También puede realizar uniones sin llave utilizando objetos GlobalKTable. Volvamos a uno de los ejemplos anteriores para demostrar esta característica.

Conexión de objetos KStream a objetos GlobalKTable

En la subsección 5.3.2, realizamos la agregación en ventana de las transacciones de cambio de los compradores. Los resultados de esta agregación se parecían a esto:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Si bien estos resultados cumplieron su propósito, habrían sido más útiles si también se hubieran mostrado el nombre del cliente y el nombre completo de la empresa. Para agregar el nombre del cliente y el nombre de la empresa, puede realizar uniones normales, pero necesitará realizar dos asignaciones de claves y volver a particionar. Con GlobalKTable podrás evitar el coste de este tipo de operaciones.

Para hacer esto, usaremos el objeto countStream del Listado 5.11 (el código correspondiente se puede encontrar en src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) y lo conectaremos a dos objetos GlobalKTable.

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Ya hemos hablado de esto antes, así que no lo repetiré. Pero observo que el código de la función toStream().map se abstrae en un objeto de función en lugar de una expresión lambda en línea por razones de legibilidad.

El siguiente paso es declarar dos instancias de GlobalKTable (el código mostrado se puede encontrar en el archivo src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listado 5.12).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"

Tenga en cuenta que los nombres de los temas se describen utilizando tipos enumerados.

Ahora que tenemos todos los componentes listos, solo queda escribir el código para la conexión (que se puede encontrar en el archivo src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listado 5.13).

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Aunque hay dos combinaciones en este código, están encadenadas porque ninguno de sus resultados se utiliza por separado. Los resultados se muestran al final de toda la operación.

Cuando ejecute la operación de unión anterior, obtendrá resultados como este:

{customer='Barney, Smith' company="Exxon", transactions= 17}

La esencia no ha cambiado, pero estos resultados parecen más claros.

Si cuenta atrás hasta el Capítulo 4, ya habrá visto varios tipos de conexiones en acción. Están enumerados en la tabla. 5.2. Esta tabla refleja las capacidades de conectividad a partir de la versión 1.0.0 de Kafka Streams; Algo puede cambiar en futuras versiones.

El libro “Kafka Streams en acción. Aplicaciones y microservicios para el trabajo en tiempo real"
Para resumir, recapitulemos lo básico: puede conectar transmisiones de eventos (KStream) y actualizar transmisiones (KTable) usando el estado local. Alternativamente, si el tamaño de los datos de referencia no es demasiado grande, puede utilizar el objeto GlobalKTable. GlobalKTables replica todas las particiones en cada nodo de la aplicación Kafka Streams, asegurando que todos los datos estén disponibles independientemente de a qué partición corresponda la clave.

A continuación veremos la función Kafka Streams, gracias a la cual podemos observar cambios de estado sin consumir datos de un tema de Kafka.

5.3.5. Estado consultable

Ya hemos realizado varias operaciones que involucran el estado y siempre enviamos los resultados a la consola (para fines de desarrollo) o los escribimos en un tema (para fines de producción). Al escribir resultados sobre un tema, debe utilizar un consumidor de Kafka para verlos.

La lectura de datos de estos temas puede considerarse un tipo de vistas materializadas. Para nuestros propósitos, podemos utilizar la definición de vista materializada de Wikipedia: “...un objeto físico de base de datos que contiene los resultados de una consulta. Por ejemplo, podría ser una copia local de datos remotos, o un subconjunto de filas y/o columnas de una tabla o resultados combinados, o una tabla resumen obtenida mediante agregación” (https://en.wikipedia.org/wiki /vista_materializada).

Kafka Streams también le permite ejecutar consultas interactivas en las tiendas estatales, lo que le permite leer directamente estas vistas materializadas. Es importante tener en cuenta que la consulta al almacén de estado es una operación de solo lectura. Esto garantiza que no tenga que preocuparse por hacer que el estado sea inconsistente accidentalmente mientras su aplicación procesa datos.

La capacidad de consultar directamente las tiendas estatales es importante. Esto significa que puede crear aplicaciones de panel sin tener que obtener primero los datos del consumidor de Kafka. También aumenta la eficiencia de la aplicación, debido a que no es necesario volver a escribir datos:

  • gracias a la localidad de los datos, se puede acceder a ellos rápidamente;
  • Se elimina la duplicación de datos, ya que no se escriben en un almacenamiento externo.

Lo principal que quiero que recuerdes es que puedes consultar el estado directamente desde tu aplicación. No se pueden subestimar las oportunidades que esto le brinda. En lugar de consumir datos de Kafka y almacenar registros en una base de datos para la aplicación, puede consultar los almacenes estatales con el mismo resultado. Las consultas directas a las tiendas estatales significan menos código (sin consumidor) y menos software (sin necesidad de una tabla de base de datos para almacenar los resultados).

Hemos cubierto bastante terreno en este capítulo, por lo que dejaremos nuestra discusión sobre consultas interactivas en tiendas estatales por ahora. Pero no se preocupe: en el Capítulo 9, crearemos una aplicación de panel simple con consultas interactivas. Utilizará algunos de los ejemplos de este y de los capítulos anteriores para demostrar consultas interactivas y cómo agregarlas a las aplicaciones Kafka Streams.

Resumen

  • Los objetos KStream representan flujos de eventos, comparables a las inserciones en una base de datos. Los objetos KTable representan flujos de actualización, más como actualizaciones de una base de datos. El tamaño del objeto KTable no aumenta, los registros antiguos se reemplazan por otros nuevos.
  • Los objetos KTable son necesarios para las operaciones de agregación.
  • Mediante operaciones de ventanas, puede dividir los datos agregados en períodos de tiempo.
  • Gracias a los objetos GlobalKTable, puede acceder a datos de referencia en cualquier lugar de la aplicación, independientemente de la partición.
  • Son posibles conexiones entre objetos KStream, KTable y GlobalKTable.

Hasta ahora, nos hemos centrado en crear aplicaciones Kafka Streams utilizando KStream DSL de alto nivel. Aunque el enfoque de alto nivel le permite crear programas claros y concisos, su uso representa una compensación. Trabajar con DSL KStream significa aumentar la concisión de su código al reducir el grado de control. En el próximo capítulo, veremos la API del nodo controlador de bajo nivel y probaremos otras compensaciones. Los programas serán más largos que antes, pero podremos crear casi cualquier nodo controlador que podamos necesitar.

→ Más detalles sobre el libro se pueden encontrar en sitio web del editor

→ Para Habrozhiteli 25% de descuento usando cupón - Corrientes de Kafka

→ Una vez realizado el pago de la versión impresa del libro, se enviará un libro electrónico por correo electrónico.

Fuente: habr.com

Añadir un comentario