Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

¡Hola Habr! Hoy construiremos un sistema que procesará flujos de mensajes de Apache Kafka utilizando Spark Streaming y escribirá los resultados del procesamiento en la base de datos en la nube de AWS RDS.

Imaginemos que una determinada entidad de crédito nos encarga la tarea de procesar las transacciones entrantes “sobre la marcha” en todas sus sucursales. Esto se puede hacer con el fin de calcular rápidamente una posición monetaria abierta para la tesorería, los límites o los resultados financieros de las transacciones, etc.

Cómo implementar este negocio sin el uso de magia y hechizos mágicos: ¡lea debajo del corte! ¡Ir!

Apache Kafka y el procesamiento de datos en streaming con Spark Streaming
(Fuente de imagen)

introducción

Por supuesto, el procesamiento de una gran cantidad de datos en tiempo real ofrece amplias oportunidades de uso en sistemas modernos. Una de las combinaciones más populares para esto es el tándem de Apache Kafka y Spark Streaming, donde Kafka crea un flujo de paquetes de mensajes entrantes y Spark Streaming procesa estos paquetes en un intervalo de tiempo determinado.

Para aumentar la tolerancia a fallos de la aplicación, utilizaremos puntos de control. Con este mecanismo, cuando el motor Spark Streaming necesita recuperar datos perdidos, solo necesita volver al último punto de control y reanudar los cálculos desde allí.

Arquitectura del sistema desarrollado.

Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Componentes utilizados:

  • Apache Kafka es un sistema distribuido de mensajería de publicación-suscripción. Adecuado para el consumo de mensajes en línea y fuera de línea. Para evitar la pérdida de datos, los mensajes de Kafka se almacenan en el disco y se replican dentro del clúster. El sistema Kafka está construido sobre el servicio de sincronización ZooKeeper;
  • Transmisión de Apache Spark - Componente Spark para procesar datos en streaming. El módulo Spark Streaming está construido utilizando una arquitectura de microlotes, donde el flujo de datos se interpreta como una secuencia continua de pequeños paquetes de datos. Spark Streaming toma datos de diferentes fuentes y los combina en pequeños paquetes. Se crean nuevos paquetes a intervalos regulares. Al comienzo de cada intervalo de tiempo, se crea un nuevo paquete y todos los datos recibidos durante ese intervalo se incluyen en el paquete. Al final del intervalo, el crecimiento del paquete se detiene. El tamaño del intervalo está determinado por un parámetro llamado intervalo de lote;
  • Apache chispa SQL - combina el procesamiento relacional con la programación funcional Spark. Los datos estructurados son datos que tienen un esquema, es decir, un único conjunto de campos para todos los registros. Spark SQL admite entradas de una variedad de fuentes de datos estructurados y, gracias a la disponibilidad de información del esquema, puede recuperar de manera eficiente solo los campos de registros requeridos y también proporciona API de DataFrame;
  • RDS de AWS es un servicio web de base de datos relacional basado en la nube relativamente económico que simplifica la configuración, el funcionamiento y el escalado, y es administrado directamente por Amazon.

Instalación y ejecución del servidor Kafka

Antes de usar Kafka directamente, debes asegurarte de tener Java, porque... JVM se utiliza para el trabajo:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Creemos un nuevo usuario para trabajar con Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

A continuación, descargue la distribución desde el sitio web oficial de Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Desempaquete el archivo descargado:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

El siguiente paso es opcional. El hecho es que la configuración predeterminada no le permite utilizar plenamente todas las capacidades de Apache Kafka. Por ejemplo, elimine un tema, categoría o grupo en el que se puedan publicar mensajes. Para cambiar esto, editemos el archivo de configuración:

vim ~/kafka/config/server.properties

Agregue lo siguiente al final del archivo:

delete.topic.enable = true

Antes de iniciar el servidor Kafka, debe iniciar el servidor ZooKeeper; usaremos el script auxiliar que viene con la distribución de Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Después de que ZooKeeper se haya iniciado exitosamente, inicie el servidor Kafka en una terminal separada:

bin/kafka-server-start.sh config/server.properties

Creemos un nuevo tema llamado Transacción:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Asegurémonos de que se haya creado un tema con la cantidad requerida de particiones y replicación:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Perdamos los momentos de probar al productor y al consumidor sobre el tema recién creado. Más detalles sobre cómo puede probar el envío y la recepción de mensajes están escritos en la documentación oficial: enviar algunos mensajes. Bueno, pasamos a escribir un productor en Python usando la API KafkaProducer.

Escritura del productor

El productor generará datos aleatorios: 100 mensajes por segundo. Por datos aleatorios nos referimos a un diccionario que consta de tres campos:

  • Rama — nombre del punto de venta de la entidad de crédito;
  • Moneda — moneda de transacción;
  • Cantidad - cantidad de transacción. El monto será un número positivo si se trata de una compra de moneda por parte del Banco, y un número negativo si se trata de una venta.

El código del productor se ve así:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

A continuación, usando el método send, enviamos un mensaje al servidor, al tema que necesitamos, en formato JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Al ejecutar el script, recibimos los siguientes mensajes en la terminal:

Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Esto significa que todo funciona como queríamos: el productor genera y envía mensajes al tema que necesitamos.
El siguiente paso es instalar Spark y procesar este flujo de mensajes.

Instalación de Apache Spark

Apache Spark es una plataforma informática de clúster universal y de alto rendimiento.

Spark funciona mejor que las implementaciones populares del modelo MapReduce y, al mismo tiempo, admite una gama más amplia de tipos de cálculo, incluidas consultas interactivas y procesamiento de secuencias. La velocidad juega un papel importante a la hora de procesar grandes cantidades de datos, ya que es la velocidad la que permite trabajar de forma interactiva sin perder minutos u horas de espera. Una de las mayores fortalezas de Spark que lo hace tan rápido es su capacidad para realizar cálculos en memoria.

Este marco está escrito en Scala, por lo que primero debes instalarlo:

sudo apt-get install scala

Descargue la distribución Spark desde el sitio web oficial:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Descomprimir el archivo:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Agregue la ruta a Spark al archivo bash:

vim ~/.bashrc

Agregue las siguientes líneas a través del editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Ejecute el siguiente comando después de realizar cambios en bashrc:

source ~/.bashrc

Implementación de AWS PostgreSQL

Todo lo que queda es implementar la base de datos en la que cargaremos la información procesada de las transmisiones. Para ello utilizaremos el servicio AWS RDS.

Vaya a la consola de AWS -> AWS RDS -> Bases de datos -> Crear base de datos:
Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Seleccione PostgreSQL y haga clic en Siguiente:
Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Porque Este ejemplo es solo para fines educativos; usaremos un servidor gratuito “como mínimo” (Nivel gratuito):
Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

A continuación, marcamos el bloque Nivel gratuito y luego se nos ofrecerá automáticamente una instancia de la clase t2.micro; aunque débil, es gratuita y bastante adecuada para nuestra tarea:
Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Luego vienen cosas muy importantes: el nombre de la instancia de la base de datos, el nombre del usuario maestro y su contraseña. Llamemos a la instancia: myHabrTest, usuario maestro: Hablar, contraseña: habr12345 y haga clic en el botón Siguiente:
Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

En la página siguiente hay parámetros responsables de la accesibilidad de nuestro servidor de base de datos desde el exterior (Accesibilidad pública) y la disponibilidad del puerto:

Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Creemos una nueva configuración para el grupo de seguridad de VPC, que permitirá el acceso externo a nuestro servidor de base de datos a través del puerto 5432 (PostgreSQL).
Vayamos a la consola de AWS en una ventana separada del navegador a la sección Panel de control de VPC -> Grupos de seguridad -> Crear grupo de seguridad:
Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Configuramos el nombre para el grupo de seguridad - PostgreSQL, una descripción, indicamos a qué VPC debe asociarse este grupo y hacemos clic en el botón Crear:
Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Complete las reglas de entrada para el puerto 5432 para el grupo recién creado, como se muestra en la siguiente imagen. No puede especificar el puerto manualmente, pero seleccione PostgreSQL en la lista desplegable Tipo.

Estrictamente hablando, el valor ::/0 significa la disponibilidad de tráfico entrante al servidor desde todo el mundo, lo cual canónicamente no es del todo cierto, pero para analizar el ejemplo, permitámonos usar este enfoque:
Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Volvemos a la página del navegador, donde tenemos abierto “Configurar ajustes avanzados” y seleccionamos en la sección Grupos de seguridad de VPC -> Elegir grupos de seguridad de VPC existentes -> PostgreSQL:
Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

A continuación, en las opciones de la base de datos -> Nombre de la base de datos -> establecer el nombre - habrDB.

Podemos dejar los parámetros restantes, con la excepción de deshabilitar la copia de seguridad (período de retención de la copia de seguridad: 0 días), la monitorización y Performance Insights, de forma predeterminada. Haga clic en el botón Crear base de datos:
Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

manejador de hilos

La etapa final será el desarrollo de un trabajo Spark, que procesará nuevos datos provenientes de Kafka cada dos segundos e ingresará el resultado en la base de datos.

Como se señaló anteriormente, los puntos de control son un mecanismo central en SparkStreaming que debe configurarse para garantizar la tolerancia a fallas. Usaremos puntos de control y, si el procedimiento falla, el módulo Spark Streaming solo necesitará regresar al último punto de control y reanudar los cálculos desde él para recuperar los datos perdidos.

Los puntos de control se pueden habilitar configurando un directorio en un sistema de archivos confiable y tolerante a fallas (como HDFS, S3, etc.) en el que se almacenará la información del punto de control. Esto se hace usando, por ejemplo:

streamingContext.checkpoint(checkpointDirectory)

En nuestro ejemplo, utilizaremos el siguiente enfoque, es decir, si checkpointDirectory existe, el contexto se recreará a partir de los datos del punto de control. Si el directorio no existe (es decir, se ejecuta por primera vez), se llama a functionToCreateContext para crear un nuevo contexto y configurar DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Creamos un objeto DirectStream para conectarnos al tema "transacción" usando el método createDirectStream de la biblioteca KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analizando datos entrantes en formato JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Usando Spark SQL, hacemos una agrupación simple y mostramos el resultado en la consola:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Obtener el texto de la consulta y ejecutarlo a través de Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Y luego guardamos los datos agregados resultantes en una tabla en AWS RDS. Para guardar los resultados de la agregación en una tabla de base de datos, usaremos el método de escritura del objeto DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Algunas palabras sobre cómo configurar una conexión a AWS RDS. Creamos el usuario y la contraseña en el paso "Implementación de AWS PostgreSQL". Debe utilizar Endpoint como la URL del servidor de la base de datos, que se muestra en la sección Conectividad y seguridad:

Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Para conectar correctamente Spark y Kafka, debe ejecutar el trabajo mediante smark-submit usando el artefacto transmisión-de-chispa-kafka-0-8_2.11. Además, también usaremos un artefacto para interactuar con la base de datos PostgreSQL; los transferiremos a través de --packages.

Para flexibilidad del script, también incluiremos como parámetros de entrada el nombre del servidor de mensajes y el tema del que queremos recibir datos.

Entonces, es hora de iniciar y verificar la funcionalidad del sistema:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

¡Todo salió bien! Como puede ver en la imagen a continuación, mientras la aplicación se está ejecutando, se generan nuevos resultados de agregación cada 2 segundos, porque configuramos el intervalo de procesamiento por lotes en 2 segundos cuando creamos el objeto StreamingContext:

Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

A continuación, realizamos una consulta sencilla a la base de datos para comprobar la presencia de registros en la tabla. flujo_transaccion:

Apache Kafka y el procesamiento de datos en streaming con Spark Streaming

Conclusión

Este artículo analizó un ejemplo de procesamiento de flujo de información utilizando Spark Streaming junto con Apache Kafka y PostgreSQL. Con el crecimiento de datos de diversas fuentes, es difícil sobreestimar el valor práctico de Spark Streaming para crear aplicaciones de streaming y en tiempo real.

Puedes encontrar el código fuente completo en mi repositorio en GitHub.

Me complace discutir este artículo, espero sus comentarios y también espero críticas constructivas de todos los lectores atentos.

Les deseo mucho éxito!

Ps. Inicialmente se planeó utilizar una base de datos PostgreSQL local, pero dado mi amor por AWS, decidí trasladar la base de datos a la nube. En el próximo artículo sobre este tema, mostraré cómo implementar todo el sistema descrito anteriormente en AWS utilizando AWS Kinesis y AWS EMR. ¡Sigue las novedades!

Fuente: habr.com

Añadir un comentario