Apache Kafka e procesamento de datos en streaming con Spark Streaming
Ola, Habr! Hoxe imos construír un sistema que procesará fluxos de mensaxes de Apache Kafka mediante Spark Streaming e escribirá os resultados do procesamento na base de datos na nube de AWS RDS.
Imaxinemos que unha determinada entidade de crédito nos encarga a tarefa de procesar transaccións entrantes "sobre a marcha" en todas as súas oficinas. Isto pódese facer co propósito de calcular rapidamente unha posición de moeda aberta para o tesouro, límites ou resultados financeiros para transaccións, etc.
Como implementar este caso sen o uso de maxias e feitizos máxicos: le debaixo do corte! Vaia!
Por suposto, procesar unha gran cantidade de datos en tempo real ofrece amplas oportunidades de uso nos sistemas modernos. Unha das combinacións máis populares para iso é o tándem de Apache Kafka e Spark Streaming, onde Kafka crea un fluxo de paquetes de mensaxes entrantes e Spark Streaming procesa estes paquetes nun intervalo de tempo determinado.
Para aumentar a tolerancia a fallos da aplicación, utilizaremos puntos de control. Con este mecanismo, cando o motor de Spark Streaming necesita recuperar datos perdidos, só ten que volver ao último punto de control e retomar os cálculos desde alí.
Arquitectura do sistema desenvolvido
Compoñentes utilizados:
Apache Kafka é un sistema de mensaxería distribuído de publicación-subscrición. Adecuado tanto para o consumo de mensaxes fóra de liña como en liña. Para evitar a perda de datos, as mensaxes de Kafka almacénanse no disco e replícanse dentro do clúster. O sistema Kafka está construído sobre o servizo de sincronización ZooKeeper;
Transmisión de Apache Spark - Compoñente Spark para procesar datos de transmisión. O módulo Spark Streaming está construído mediante unha arquitectura de micro-lotes, onde o fluxo de datos se interpreta como unha secuencia continua de pequenos paquetes de datos. Spark Streaming toma datos de diferentes fontes e combínaos en pequenos paquetes. Os novos paquetes créanse a intervalos regulares. Ao comezo de cada intervalo de tempo, créase un novo paquete e os datos recibidos durante ese intervalo inclúense no paquete. Ao final do intervalo, o crecemento do paquete detense. O tamaño do intervalo está determinado por un parámetro chamado intervalo por lotes;
Apache Spark SQL - combina o procesamento relacional coa programación funcional de Spark. Os datos estruturados son datos que teñen un esquema, é dicir, un único conxunto de campos para todos os rexistros. Spark SQL admite a entrada dunha variedade de fontes de datos estruturados e, grazas á dispoñibilidade de información do esquema, pode recuperar de forma eficiente só os campos necesarios dos rexistros e tamén ofrece API de DataFrame;
AWS RDS é unha base de datos relacional baseada na nube relativamente barata, un servizo web que simplifica a configuración, o funcionamento e a escala, e que é administrado directamente por Amazon.
Instalación e execución do servidor Kafka
Antes de usar Kafka directamente, debes asegurarte de ter Java, porque... JVM úsase para traballar:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
O seguinte paso é opcional. O feito é que a configuración predeterminada non che permite utilizar todas as funcións de Apache Kafka. Por exemplo, elimina un tema, categoría ou grupo no que se poden publicar mensaxes. Para cambialo, editemos o ficheiro de configuración:
vim ~/kafka/config/server.properties
Engade o seguinte ao final do ficheiro:
delete.topic.enable = true
Antes de iniciar o servidor Kafka, cómpre iniciar o servidor ZooKeeper; usaremos o script auxiliar que inclúe a distribución Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Despois de iniciar ZooKeeper correctamente, inicie o servidor Kafka nun terminal separado:
Perdamos os momentos de probas ao produtor e ao consumidor para o tema de nova creación. Máis detalles sobre como pode probar o envío e a recepción de mensaxes están escritos na documentación oficial - Envía algunhas mensaxes. Ben, pasamos a escribir un produtor en Python usando a API KafkaProducer.
Redacción do produtor
O produtor xerará datos aleatorios: 100 mensaxes por segundo. Por datos aleatorios entendemos un dicionario formado por tres campos:
Rama - nome do punto de venda da entidade de crédito;
Moeda - moeda de transacción;
cantidade - importe da transacción. O importe será un número positivo se se trata dunha compra de moeda por parte do Banco, e un número negativo se se trata dunha venda.
A continuación, mediante o método send, enviamos unha mensaxe ao servidor, ao tema que necesitamos, en formato JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Ao executar o script, recibimos as seguintes mensaxes no terminal:
Isto significa que todo funciona como queriamos: o produtor xera e envía mensaxes ao tema que necesitamos.
O seguinte paso é instalar Spark e procesar este fluxo de mensaxes.
Instalación de Apache Spark
Apache Spark é unha plataforma de computación en clúster universal e de alto rendemento.
Spark funciona mellor que as implementacións populares do modelo MapReduce mentres admite unha gama máis ampla de tipos de cálculo, incluíndo consultas interactivas e procesamento de fluxos. A velocidade xoga un papel importante ao procesar grandes cantidades de datos, xa que é a velocidade a que che permite traballar de forma interactiva sen pasar minutos ou horas de espera. Un dos maiores puntos fortes de Spark que o fai tan rápido é a súa capacidade para realizar cálculos na memoria.
Este framework está escrito en Scala, polo que primeiro debes instalalo:
sudo apt-get install scala
Descarga a distribución de Spark desde o sitio web oficial:
Executa o seguinte comando despois de facer cambios en bashrc:
source ~/.bashrc
Implementación de AWS PostgreSQL
Só queda despregar a base de datos na que cargaremos a información procesada dos fluxos. Para iso utilizaremos o servizo AWS RDS.
Vaia á consola de AWS -> AWS RDS -> Bases de datos -> Crear base de datos:
Seleccione PostgreSQL e prema Seguinte:
Porque Este exemplo é só para fins educativos; utilizaremos un servidor gratuíto "como mínimo" (Nivel gratuíto):
A continuación, marcamos o bloque Free Tier e, despois diso, ofreceranos automaticamente unha instancia da clase t2.micro, aínda que débil, é gratuíto e moi axeitado para a nosa tarefa:
A continuación veñen cousas moi importantes: o nome da instancia da base de datos, o nome do usuario mestre e o seu contrasinal. Poñemos o nome da instancia: myHabrTest, usuario mestre: habr, contrasinal: habr12345 e fai clic no botón Seguinte:
Na seguinte páxina aparecen os parámetros responsables da accesibilidade do noso servidor de bases de datos desde o exterior (Accesibilidade pública) e da dispoñibilidade do porto:
Imos crear unha nova configuración para o grupo de seguridade VPC, que permitirá o acceso externo ao noso servidor de bases de datos a través do porto 5432 (PostgreSQL).
Imos á consola de AWS nunha xanela separada do navegador ao panel de control de VPC -> Grupos de seguranza -> sección Crear grupo de seguridade:
Establecemos o nome do grupo de seguranza - PostgreSQL, unha descrición, indicamos a que VPC debe asociarse este grupo e prememos no botón Crear:
Encha as regras de entrada para o porto 5432 para o grupo recén creado, como se mostra na imaxe de abaixo. Non pode especificar o porto manualmente, pero seleccione PostgreSQL na lista despregable Tipo.
En rigor, o valor ::/0 significa a dispoñibilidade de tráfico entrante ao servidor de todo o mundo, o que canonicamente non é totalmente certo, pero para analizar o exemplo, permitímonos utilizar este enfoque:
Volvemos á páxina do navegador, onde temos "Configurar configuración avanzada" aberta e seleccionamos na sección Grupos de seguranza VPC -> Escolla os grupos de seguranza VPC existentes -> PostgreSQL:
A continuación, nas opcións da base de datos -> Nome da base de datos -> establece o nome - habrDB.
Podemos deixar o resto de parámetros, a excepción da desactivación da copia de seguridade (período de retención de copia de seguranza - 0 días), monitorización e Performance Insights, por defecto. Fai clic no botón Crear base de datos:
Manexador de fíos
A fase final será o desenvolvemento dun traballo de Spark, que procesará novos datos procedentes de Kafka cada dous segundos e introducirá o resultado na base de datos.
Como se indicou anteriormente, os puntos de control son un mecanismo central en SparkStreaming que debe configurarse para garantir a tolerancia a fallos. Utilizaremos puntos de control e, se o procedemento falla, o módulo Spark Streaming só terá que volver ao último punto de control e retomar os cálculos a partir del para recuperar os datos perdidos.
Pódese activar o checkpoint configurando un directorio nun sistema de ficheiros fiable e tolerante a fallos (como HDFS, S3, etc.) no que se almacenará a información do punto de control. Isto faise usando, por exemplo:
streamingContext.checkpoint(checkpointDirectory)
No noso exemplo, usaremos o seguinte enfoque, é dicir, se existe checkpointDirectory, entón o contexto recrearase a partir dos datos do punto de control. Se o directorio non existe (é dicir, executado por primeira vez), entón chámase a functionToCreateContext para crear un novo contexto e configurar DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Creamos un obxecto DirectStream para conectarse ao tema "transacción" usando o método createDirectStream da biblioteca KafkaUtils:
Usando Spark SQL, facemos unha agrupación sinxela e mostramos o resultado na consola:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Obter o texto da consulta e executalo a través de Spark SQL:
E despois gardamos os datos agregados resultantes nunha táboa en AWS RDS. Para gardar os resultados da agregación nunha táboa de base de datos, utilizaremos o método de escritura do obxecto DataFrame:
Algunhas palabras sobre a configuración dunha conexión a AWS RDS. Creamos o usuario e o contrasinal para el no paso "Implementación de AWS PostgreSQL". Debes usar Endpoint como URL do servidor de base de datos, que se mostra na sección Conectividade e seguridade:
Para conectar correctamente Spark e Kafka, debes executar o traballo mediante smark-submit usando o artefacto spark-streaming-kafka-0-8_2.11. Ademais, tamén usaremos un artefacto para interactuar coa base de datos PostgreSQL; transferirémolos mediante --packages.
Para a flexibilidade do script, tamén incluiremos como parámetros de entrada o nome do servidor de mensaxes e o tema do que queremos recibir os datos.
Entón, é hora de iniciar e comprobar a funcionalidade do sistema:
Todo funcionou! Como podes ver na imaxe de abaixo, mentres a aplicación está en execución, xorden novos resultados de agregación cada 2 segundos, porque establecemos o intervalo de lotes en 2 segundos cando creamos o obxecto StreamingContext:
A continuación, realizamos unha consulta sinxela á base de datos para comprobar a presenza de rexistros na táboa fluxo_transacción:
Conclusión
Este artigo analizou un exemplo de procesamento de información mediante Spark Streaming xunto con Apache Kafka e PostgreSQL. Co crecemento dos datos de varias fontes, é difícil sobreestimar o valor práctico de Spark Streaming para crear aplicacións en tempo real e en tempo real.
Podes atopar o código fonte completo no meu repositorio en GitHub.
Estou encantado de comentar este artigo, espero os teus comentarios e tamén espero críticas construtivas de todos os lectores interesados.
Desexo vostede éxito!
p.d. Inicialmente estaba previsto usar unha base de datos PostgreSQL local, pero dado o meu amor por AWS, decidín mover a base de datos á nube. No seguinte artigo sobre este tema, mostrarei como implementar todo o sistema descrito anteriormente en AWS usando AWS Kinesis e AWS EMR. Segue as noticias!