Apache Kafka e processamento de dados de streaming com Spark Streaming

Olá, Habr! Hoje construiremos um sistema que processará fluxos de mensagens Apache Kafka usando Spark Streaming e gravará os resultados do processamento no banco de dados em nuvem AWS RDS.

Imaginemos que uma determinada instituição de crédito nos atribui a tarefa de processar as transações recebidas “on the fly” em todas as suas sucursais. Isso pode ser feito com a finalidade de calcular prontamente uma posição cambial aberta para o tesouro, limites ou resultados financeiros para transações, etc.

Como implementar este caso sem o uso de magia e feitiços mágicos - leia abaixo! Ir!

Apache Kafka e processamento de dados de streaming com Spark Streaming
(Fonte da imagem)

Introdução

É claro que o processamento de uma grande quantidade de dados em tempo real oferece amplas oportunidades para uso em sistemas modernos. Uma das combinações mais populares para isso é o conjunto de Apache Kafka e Spark Streaming, onde Kafka cria um fluxo de pacotes de mensagens recebidas e o Spark Streaming processa esses pacotes em um determinado intervalo de tempo.

Para aumentar a tolerância a falhas da aplicação, usaremos checkpoints. Com esse mecanismo, quando o mecanismo Spark Streaming precisa recuperar dados perdidos, ele só precisa voltar ao último ponto de verificação e retomar os cálculos a partir daí.

Arquitetura do sistema desenvolvido

Apache Kafka e processamento de dados de streaming com Spark Streaming

Componentes usados:

  • Apache Kafka é um sistema distribuído de mensagens de publicação e assinatura. Adequado para consumo de mensagens offline e online. Para evitar a perda de dados, as mensagens Kafka são armazenadas em disco e replicadas no cluster. O sistema Kafka é baseado no serviço de sincronização ZooKeeper;
  • Transmissão Apache Spark - Componente Spark para processamento de dados de streaming. O módulo Spark Streaming é construído usando uma arquitetura de microlotes, onde o fluxo de dados é interpretado como uma sequência contínua de pequenos pacotes de dados. Spark Streaming pega dados de diferentes fontes e os combina em pequenos pacotes. Novos pacotes são criados em intervalos regulares. No início de cada intervalo de tempo, um novo pacote é criado e quaisquer dados recebidos durante esse intervalo são incluídos no pacote. No final do intervalo, o crescimento do pacote é interrompido. O tamanho do intervalo é determinado por um parâmetro denominado intervalo de lote;
  • SQL do Apache Spark - combina processamento relacional com programação funcional Spark. Dados estruturados significam dados que possuem um esquema, ou seja, um único conjunto de campos para todos os registros. O Spark SQL oferece suporte à entrada de uma variedade de fontes de dados estruturados e, graças à disponibilidade de informações de esquema, pode recuperar com eficiência apenas os campos obrigatórios dos registros e também fornece APIs DataFrame;
  • AWS RDS é um banco de dados relacional baseado em nuvem relativamente barato, um serviço da web que simplifica a configuração, a operação e o dimensionamento e é administrado diretamente pela Amazon.

Instalando e executando o servidor Kafka

Antes de usar o Kafka diretamente, você precisa ter certeza de que possui Java, porque... JVM é usado para trabalhar:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Vamos criar um novo usuário para trabalhar com Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

A seguir, baixe a distribuição do site oficial do Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Descompacte o arquivo baixado:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

A próxima etapa é opcional. O fato é que as configurações padrão não permitem o uso completo de todos os recursos do Apache Kafka. Por exemplo, exclua um tópico, categoria ou grupo no qual as mensagens podem ser publicadas. Para mudar isso, vamos editar o arquivo de configuração:

vim ~/kafka/config/server.properties

Adicione o seguinte ao final do arquivo:

delete.topic.enable = true

Antes de iniciar o servidor Kafka, você precisa iniciar o servidor ZooKeeper; usaremos o script auxiliar que acompanha a distribuição Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Depois que o ZooKeeper for iniciado com sucesso, inicie o servidor Kafka em um terminal separado:

bin/kafka-server-start.sh config/server.properties

Vamos criar um novo tópico chamado Transação:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Vamos ter certeza de que um tópico com o número necessário de partições e replicações foi criado:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka e processamento de dados de streaming com Spark Streaming

Vamos perder os momentos de testar o produtor e o consumidor para o tema recém-criado. Mais detalhes sobre como você pode testar o envio e recebimento de mensagens estão escritos na documentação oficial - Envie algumas mensagens. Bem, passamos a escrever um produtor em Python usando a API KafkaProducer.

Escrita do produtor

O produtor irá gerar dados aleatórios – 100 mensagens por segundo. Por dados aleatórios entendemos um dicionário que consiste em três campos:

  • Ramo — nome do ponto de venda da instituição de crédito;
  • Moeda - moeda de transação;
  • Valor — valor da transação. O valor será um número positivo se for uma compra de moeda pelo Banco, e um número negativo se for uma venda.

O código do produtor é assim:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

A seguir, através do método send, enviamos uma mensagem ao servidor, para o tópico que necessitamos, no formato JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Ao executar o script, recebemos as seguintes mensagens no terminal:

Apache Kafka e processamento de dados de streaming com Spark Streaming

Isso significa que tudo funciona como queríamos - o produtor gera e envia mensagens sobre o tema que precisamos.
A próxima etapa é instalar o Spark e processar esse fluxo de mensagens.

Instalando o Apache Spark

Apache Spark é uma plataforma de computação em cluster universal e de alto desempenho.

O Spark tem um desempenho melhor do que as implementações populares do modelo MapReduce, ao mesmo tempo que oferece suporte a uma ampla variedade de tipos de computação, incluindo consultas interativas e processamento de fluxo. A velocidade desempenha um papel importante no processamento de grandes quantidades de dados, pois é a velocidade que permite trabalhar de forma interativa sem perder minutos ou horas de espera. Um dos maiores pontos fortes do Spark que o torna tão rápido é a capacidade de realizar cálculos na memória.

Este framework foi escrito em Scala, então você precisa instalá-lo primeiro:

sudo apt-get install scala

Baixe a distribuição Spark do site oficial:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Descompacte o arquivo:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Adicione o caminho do Spark ao arquivo bash:

vim ~/.bashrc

Adicione as seguintes linhas através do editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Execute o comando abaixo após fazer alterações no bashrc:

source ~/.bashrc

Implantação do AWS PostgreSQL

Resta implantar o banco de dados no qual carregaremos as informações processadas dos fluxos. Para isso utilizaremos o serviço AWS RDS.

Vá para o console AWS -> AWS RDS -> Bancos de dados -> Criar banco de dados:
Apache Kafka e processamento de dados de streaming com Spark Streaming

Selecione PostgreSQL e clique em Avançar:
Apache Kafka e processamento de dados de streaming com Spark Streaming

Porque Este exemplo é apenas para fins educacionais; usaremos um servidor gratuito “no mínimo” (Free Tier):
Apache Kafka e processamento de dados de streaming com Spark Streaming

Em seguida, marcamos o bloco Free Tier e, em seguida, será oferecida automaticamente uma instância da classe t2.micro - embora fraca, é gratuita e bastante adequada para nossa tarefa:
Apache Kafka e processamento de dados de streaming com Spark Streaming

A seguir vêm coisas muito importantes: o nome da instância do banco de dados, o nome do usuário mestre e sua senha. Vamos nomear a instância: myHabrTest, usuário mestre: ter, senha: habr12345 e clique no botão Avançar:
Apache Kafka e processamento de dados de streaming com Spark Streaming

Na próxima página estão os parâmetros responsáveis ​​pela acessibilidade externa do nosso servidor de banco de dados (Acessibilidade pública) e disponibilidade de portas:

Apache Kafka e processamento de dados de streaming com Spark Streaming

Vamos criar uma nova configuração para o grupo de segurança VPC, que permitirá acesso externo ao nosso servidor de banco de dados através da porta 5432 (PostgreSQL).
Vamos para o console AWS em uma janela separada do navegador para a seção VPC Dashboard -> Security Groups -> Create security group:
Apache Kafka e processamento de dados de streaming com Spark Streaming

Definimos o nome do grupo de segurança - PostgreSQL, uma descrição, indicamos a qual VPC este grupo deve estar associado e clicamos no botão Criar:
Apache Kafka e processamento de dados de streaming com Spark Streaming

Preencha as regras de entrada para a porta 5432 para o grupo recém-criado, conforme mostrado na imagem abaixo. Você não pode especificar a porta manualmente, mas selecione PostgreSQL na lista suspensa Tipo.

A rigor, o valor ::/0 significa a disponibilidade de tráfego de entrada para o servidor vindo de todo o mundo, o que canonicamente não é totalmente verdade, mas para analisar o exemplo, vamos nos permitir usar esta abordagem:
Apache Kafka e processamento de dados de streaming com Spark Streaming

Voltamos à página do navegador, onde temos “Definir configurações avançadas” aberto e selecionamos na seção Grupos de segurança VPC -> Escolher grupos de segurança VPC existentes -> PostgreSQL:
Apache Kafka e processamento de dados de streaming com Spark Streaming

A seguir, nas opções do banco de dados -> Nome do banco de dados -> defina o nome - habrDB.

Podemos deixar os demais parâmetros, com exceção da desativação do backup (período de retenção do backup - 0 dias), monitoramento e Performance Insights, por padrão. Clique no botão Criar banco de dados:
Apache Kafka e processamento de dados de streaming com Spark Streaming

Manipulador de thread

A etapa final será o desenvolvimento de um job Spark, que processará novos dados vindos do Kafka a cada dois segundos e inserirá o resultado no banco de dados.

Conforme observado acima, os pontos de verificação são um mecanismo central no SparkStreaming que deve ser configurado para garantir tolerância a falhas. Utilizaremos checkpoints e, caso o procedimento falhe, o módulo Spark Streaming precisará apenas retornar ao último checkpoint e retomar os cálculos a partir dele para recuperar os dados perdidos.

O ponto de verificação pode ser ativado definindo um diretório em um sistema de arquivos confiável e tolerante a falhas (como HDFS, S3, etc.) no qual as informações do ponto de verificação serão armazenadas. Isso é feito usando, por exemplo:

streamingContext.checkpoint(checkpointDirectory)

Em nosso exemplo, usaremos a seguinte abordagem, ou seja, se checkpointDirectory existir, então o contexto será recriado a partir dos dados do ponto de verificação. Se o diretório não existir (ou seja, executado pela primeira vez), então functionToCreateContext será chamado para criar um novo contexto e configurar DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Criamos um objeto DirectStream para conectar ao tópico “transação” usando o método createDirectStream da biblioteca KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analisando dados recebidos no formato JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Usando Spark SQL, fazemos um agrupamento simples e exibimos o resultado no console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Obtendo o texto da consulta e executando-o por meio do Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

E então salvamos os dados agregados resultantes em uma tabela no AWS RDS. Para salvar os resultados da agregação em uma tabela de banco de dados, usaremos o método write do objeto DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Algumas palavras sobre como configurar uma conexão com AWS RDS. Criamos o usuário e a senha para ele na etapa “Implantando AWS PostgreSQL”. Você deve usar Endpoint como URL do servidor de banco de dados, que é exibido na seção Conectividade e segurança:

Apache Kafka e processamento de dados de streaming com Spark Streaming

Para conectar corretamente o Spark e o Kafka, você deve executar o trabalho via smark-submit usando o artefato spark-streaming-kafka-0-8_2.11. Além disso, também usaremos um artefato para interagir com o banco de dados PostgreSQL; iremos transferi-los via --packages.

Para flexibilidade do script, incluiremos também como parâmetros de entrada o nome do servidor de mensagens e o tópico do qual queremos receber os dados.

Então é hora de iniciar e verificar a funcionalidade do sistema:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Deu tudo certo! Como você pode ver na imagem abaixo, enquanto o aplicativo está em execução, novos resultados de agregação são gerados a cada 2 segundos, porque definimos o intervalo de lote para 2 segundos quando criamos o objeto StreamingContext:

Apache Kafka e processamento de dados de streaming com Spark Streaming

A seguir, fazemos uma consulta simples ao banco de dados para verificar a presença de registros na tabela fluxo_transação:

Apache Kafka e processamento de dados de streaming com Spark Streaming

Conclusão

Este artigo analisou um exemplo de processamento de fluxo de informações usando Spark Streaming em conjunto com Apache Kafka e PostgreSQL. Com o crescimento dos dados de diversas fontes, é difícil superestimar o valor prático do Spark Streaming para a criação de aplicativos de streaming e em tempo real.

Você pode encontrar o código-fonte completo em meu repositório em GitHub.

Fico feliz em discutir este artigo, aguardo seus comentários e também espero críticas construtivas de todos os leitores atenciosos.

Desejo-lhe sucesso!

Ps. Inicialmente foi planejado usar um banco de dados PostgreSQL local, mas devido ao meu amor pela AWS, decidi migrar o banco de dados para a nuvem. No próximo artigo sobre este tema, mostrarei como implementar todo o sistema descrito acima na AWS usando AWS Kinesis e AWS EMR. Acompanhe as novidades!

Fonte: habr.com

Adicionar um comentário