Apache Kafka e procesamento de datos en streaming con Spark Streaming

Ola, Habr! Hoxe imos construír un sistema que procesará fluxos de mensaxes de Apache Kafka mediante Spark Streaming e escribirá os resultados do procesamento na base de datos na nube de AWS RDS.

Imaxinemos que unha determinada entidade de crédito nos encarga a tarefa de procesar transaccións entrantes "sobre a marcha" en todas as súas oficinas. Isto pódese facer co propósito de calcular rapidamente unha posición de moeda aberta para o tesouro, límites ou resultados financeiros para transaccións, etc.

Como implementar este caso sen o uso de maxias e feitizos máxicos: le debaixo do corte! Vaia!

Apache Kafka e procesamento de datos en streaming con Spark Streaming
(Fonte da imaxe)

Introdución

Por suposto, procesar unha gran cantidade de datos en tempo real ofrece amplas oportunidades de uso nos sistemas modernos. Unha das combinacións máis populares para iso é o tándem de Apache Kafka e Spark Streaming, onde Kafka crea un fluxo de paquetes de mensaxes entrantes e Spark Streaming procesa estes paquetes nun intervalo de tempo determinado.

Para aumentar a tolerancia a fallos da aplicación, utilizaremos puntos de control. Con este mecanismo, cando o motor de Spark Streaming necesita recuperar datos perdidos, só ten que volver ao último punto de control e retomar os cálculos desde alí.

Arquitectura do sistema desenvolvido

Apache Kafka e procesamento de datos en streaming con Spark Streaming

Compoñentes utilizados:

  • Apache Kafka é un sistema de mensaxería distribuído de publicación-subscrición. Adecuado tanto para o consumo de mensaxes fóra de liña como en liña. Para evitar a perda de datos, as mensaxes de Kafka almacénanse no disco e replícanse dentro do clúster. O sistema Kafka está construído sobre o servizo de sincronización ZooKeeper;
  • Transmisión de Apache Spark - Compoñente Spark para procesar datos de transmisión. O módulo Spark Streaming está construído mediante unha arquitectura de micro-lotes, onde o fluxo de datos se interpreta como unha secuencia continua de pequenos paquetes de datos. Spark Streaming toma datos de diferentes fontes e combínaos en pequenos paquetes. Os novos paquetes créanse a intervalos regulares. Ao comezo de cada intervalo de tempo, créase un novo paquete e os datos recibidos durante ese intervalo inclúense no paquete. Ao final do intervalo, o crecemento do paquete detense. O tamaño do intervalo está determinado por un parámetro chamado intervalo por lotes;
  • Apache Spark SQL - combina o procesamento relacional coa programación funcional de Spark. Os datos estruturados son datos que teñen un esquema, é dicir, un único conxunto de campos para todos os rexistros. Spark SQL admite a entrada dunha variedade de fontes de datos estruturados e, grazas á dispoñibilidade de información do esquema, pode recuperar de forma eficiente só os campos necesarios dos rexistros e tamén ofrece API de DataFrame;
  • AWS RDS é unha base de datos relacional baseada na nube relativamente barata, un servizo web que simplifica a configuración, o funcionamento e a escala, e que é administrado directamente por Amazon.

Instalación e execución do servidor Kafka

Antes de usar Kafka directamente, debes asegurarte de ter Java, porque... JVM úsase para traballar:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Imos crear un novo usuario para traballar con Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

A continuación, descarga a distribución desde o sitio web oficial de Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Descomprimir o arquivo descargado:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

O seguinte paso é opcional. O feito é que a configuración predeterminada non che permite utilizar todas as funcións de Apache Kafka. Por exemplo, elimina un tema, categoría ou grupo no que se poden publicar mensaxes. Para cambialo, editemos o ficheiro de configuración:

vim ~/kafka/config/server.properties

Engade o seguinte ao final do ficheiro:

delete.topic.enable = true

Antes de iniciar o servidor Kafka, cómpre iniciar o servidor ZooKeeper; usaremos o script auxiliar que inclúe a distribución Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Despois de iniciar ZooKeeper correctamente, inicie o servidor Kafka nun terminal separado:

bin/kafka-server-start.sh config/server.properties

Imos crear un novo tema chamado Transacción:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Asegurémonos de que se creou un tema co número necesario de particións e replicación:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka e procesamento de datos en streaming con Spark Streaming

Perdamos os momentos de probas ao produtor e ao consumidor para o tema de nova creación. Máis detalles sobre como pode probar o envío e a recepción de mensaxes están escritos na documentación oficial - Envía algunhas mensaxes. Ben, pasamos a escribir un produtor en Python usando a API KafkaProducer.

Redacción do produtor

O produtor xerará datos aleatorios: 100 mensaxes por segundo. Por datos aleatorios entendemos un dicionario formado por tres campos:

  • Rama - nome do punto de venda da entidade de crédito;
  • Moeda - moeda de transacción;
  • cantidade - importe da transacción. O importe será un número positivo se se trata dunha compra de moeda por parte do Banco, e un número negativo se se trata dunha venda.

O código para o produtor é o seguinte:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

A continuación, mediante o método send, enviamos unha mensaxe ao servidor, ao tema que necesitamos, en formato JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Ao executar o script, recibimos as seguintes mensaxes no terminal:

Apache Kafka e procesamento de datos en streaming con Spark Streaming

Isto significa que todo funciona como queriamos: o produtor xera e envía mensaxes ao tema que necesitamos.
O seguinte paso é instalar Spark e procesar este fluxo de mensaxes.

Instalación de Apache Spark

Apache Spark é unha plataforma de computación en clúster universal e de alto rendemento.

Spark funciona mellor que as implementacións populares do modelo MapReduce mentres admite unha gama máis ampla de tipos de cálculo, incluíndo consultas interactivas e procesamento de fluxos. A velocidade xoga un papel importante ao procesar grandes cantidades de datos, xa que é a velocidade a que che permite traballar de forma interactiva sen pasar minutos ou horas de espera. Un dos maiores puntos fortes de Spark que o fai tan rápido é a súa capacidade para realizar cálculos na memoria.

Este framework está escrito en Scala, polo que primeiro debes instalalo:

sudo apt-get install scala

Descarga a distribución de Spark desde o sitio web oficial:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Descomprimir o arquivo:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Engade o camiño a Spark ao ficheiro bash:

vim ~/.bashrc

Engade as seguintes liñas a través do editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Executa o seguinte comando despois de facer cambios en bashrc:

source ~/.bashrc

Implementación de AWS PostgreSQL

Só queda despregar a base de datos na que cargaremos a información procesada dos fluxos. Para iso utilizaremos o servizo AWS RDS.

Vaia á consola de AWS -> AWS RDS -> Bases de datos -> Crear base de datos:
Apache Kafka e procesamento de datos en streaming con Spark Streaming

Seleccione PostgreSQL e prema Seguinte:
Apache Kafka e procesamento de datos en streaming con Spark Streaming

Porque Este exemplo é só para fins educativos; utilizaremos un servidor gratuíto "como mínimo" (Nivel gratuíto):
Apache Kafka e procesamento de datos en streaming con Spark Streaming

A continuación, marcamos o bloque Free Tier e, despois diso, ofreceranos automaticamente unha instancia da clase t2.micro, aínda que débil, é gratuíto e moi axeitado para a nosa tarefa:
Apache Kafka e procesamento de datos en streaming con Spark Streaming

A continuación veñen cousas moi importantes: o nome da instancia da base de datos, o nome do usuario mestre e o seu contrasinal. Poñemos o nome da instancia: myHabrTest, usuario mestre: habr, contrasinal: habr12345 e fai clic no botón Seguinte:
Apache Kafka e procesamento de datos en streaming con Spark Streaming

Na seguinte páxina aparecen os parámetros responsables da accesibilidade do noso servidor de bases de datos desde o exterior (Accesibilidade pública) e da dispoñibilidade do porto:

Apache Kafka e procesamento de datos en streaming con Spark Streaming

Imos crear unha nova configuración para o grupo de seguridade VPC, que permitirá o acceso externo ao noso servidor de bases de datos a través do porto 5432 (PostgreSQL).
Imos á consola de AWS nunha xanela separada do navegador ao panel de control de VPC -> Grupos de seguranza -> sección Crear grupo de seguridade:
Apache Kafka e procesamento de datos en streaming con Spark Streaming

Establecemos o nome do grupo de seguranza - PostgreSQL, unha descrición, indicamos a que VPC debe asociarse este grupo e prememos no botón Crear:
Apache Kafka e procesamento de datos en streaming con Spark Streaming

Encha as regras de entrada para o porto 5432 para o grupo recén creado, como se mostra na imaxe de abaixo. Non pode especificar o porto manualmente, pero seleccione PostgreSQL na lista despregable Tipo.

En rigor, o valor ::/0 significa a dispoñibilidade de tráfico entrante ao servidor de todo o mundo, o que canonicamente non é totalmente certo, pero para analizar o exemplo, permitímonos utilizar este enfoque:
Apache Kafka e procesamento de datos en streaming con Spark Streaming

Volvemos á páxina do navegador, onde temos "Configurar configuración avanzada" aberta e seleccionamos na sección Grupos de seguranza VPC -> Escolla os grupos de seguranza VPC existentes -> PostgreSQL:
Apache Kafka e procesamento de datos en streaming con Spark Streaming

A continuación, nas opcións da base de datos -> Nome da base de datos -> establece o nome - habrDB.

Podemos deixar o resto de parámetros, a excepción da desactivación da copia de seguridade (período de retención de copia de seguranza - 0 días), monitorización e Performance Insights, por defecto. Fai clic no botón Crear base de datos:
Apache Kafka e procesamento de datos en streaming con Spark Streaming

Manexador de fíos

A fase final será o desenvolvemento dun traballo de Spark, que procesará novos datos procedentes de Kafka cada dous segundos e introducirá o resultado na base de datos.

Como se indicou anteriormente, os puntos de control son un mecanismo central en SparkStreaming que debe configurarse para garantir a tolerancia a fallos. Utilizaremos puntos de control e, se o procedemento falla, o módulo Spark Streaming só terá que volver ao último punto de control e retomar os cálculos a partir del para recuperar os datos perdidos.

Pódese activar o checkpoint configurando un directorio nun sistema de ficheiros fiable e tolerante a fallos (como HDFS, S3, etc.) no que se almacenará a información do punto de control. Isto faise usando, por exemplo:

streamingContext.checkpoint(checkpointDirectory)

No noso exemplo, usaremos o seguinte enfoque, é dicir, se existe checkpointDirectory, entón o contexto recrearase a partir dos datos do punto de control. Se o directorio non existe (é dicir, executado por primeira vez), entón chámase a functionToCreateContext para crear un novo contexto e configurar DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Creamos un obxecto DirectStream para conectarse ao tema "transacción" usando o método createDirectStream da biblioteca KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analizando datos entrantes en formato JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Usando Spark SQL, facemos unha agrupación sinxela e mostramos o resultado na consola:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Obter o texto da consulta e executalo a través de Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

E despois gardamos os datos agregados resultantes nunha táboa en AWS RDS. Para gardar os resultados da agregación nunha táboa de base de datos, utilizaremos o método de escritura do obxecto DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Algunhas palabras sobre a configuración dunha conexión a AWS RDS. Creamos o usuario e o contrasinal para el no paso "Implementación de AWS PostgreSQL". Debes usar Endpoint como URL do servidor de base de datos, que se mostra na sección Conectividade e seguridade:

Apache Kafka e procesamento de datos en streaming con Spark Streaming

Para conectar correctamente Spark e Kafka, debes executar o traballo mediante smark-submit usando o artefacto spark-streaming-kafka-0-8_2.11. Ademais, tamén usaremos un artefacto para interactuar coa base de datos PostgreSQL; transferirémolos mediante --packages.

Para a flexibilidade do script, tamén incluiremos como parámetros de entrada o nome do servidor de mensaxes e o tema do que queremos recibir os datos.

Entón, é hora de iniciar e comprobar a funcionalidade do sistema:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Todo funcionou! Como podes ver na imaxe de abaixo, mentres a aplicación está en execución, xorden novos resultados de agregación cada 2 segundos, porque establecemos o intervalo de lotes en 2 segundos cando creamos o obxecto StreamingContext:

Apache Kafka e procesamento de datos en streaming con Spark Streaming

A continuación, realizamos unha consulta sinxela á base de datos para comprobar a presenza de rexistros na táboa fluxo_transacción:

Apache Kafka e procesamento de datos en streaming con Spark Streaming

Conclusión

Este artigo analizou un exemplo de procesamento de información mediante Spark Streaming xunto con Apache Kafka e PostgreSQL. Co crecemento dos datos de varias fontes, é difícil sobreestimar o valor práctico de Spark Streaming para crear aplicacións en tempo real e en tempo real.

Podes atopar o código fonte completo no meu repositorio en GitHub.

Estou encantado de comentar este artigo, espero os teus comentarios e tamén espero críticas construtivas de todos os lectores interesados.

Desexo vostede éxito!

p.d. Inicialmente estaba previsto usar unha base de datos PostgreSQL local, pero dado o meu amor por AWS, decidín mover a base de datos á nube. No seguinte artigo sobre este tema, mostrarei como implementar todo o sistema descrito anteriormente en AWS usando AWS Kinesis e AWS EMR. Segue as noticias!

Fonte: www.habr.com

Engadir un comentario