ProHoster > Blog > administração > Apache Kafka e processamento de dados de streaming com Spark Streaming
Apache Kafka e processamento de dados de streaming com Spark Streaming
Olá, Habr! Hoje construiremos um sistema que processará fluxos de mensagens Apache Kafka usando Spark Streaming e gravará os resultados do processamento no banco de dados em nuvem AWS RDS.
Imaginemos que uma determinada instituição de crédito nos atribui a tarefa de processar as transações recebidas “on the fly” em todas as suas sucursais. Isso pode ser feito com a finalidade de calcular prontamente uma posição cambial aberta para o tesouro, limites ou resultados financeiros para transações, etc.
Como implementar este caso sem o uso de magia e feitiços mágicos - leia abaixo! Ir!
É claro que o processamento de uma grande quantidade de dados em tempo real oferece amplas oportunidades para uso em sistemas modernos. Uma das combinações mais populares para isso é o conjunto de Apache Kafka e Spark Streaming, onde Kafka cria um fluxo de pacotes de mensagens recebidas e o Spark Streaming processa esses pacotes em um determinado intervalo de tempo.
Para aumentar a tolerância a falhas da aplicação, usaremos checkpoints. Com esse mecanismo, quando o mecanismo Spark Streaming precisa recuperar dados perdidos, ele só precisa voltar ao último ponto de verificação e retomar os cálculos a partir daí.
Arquitetura do sistema desenvolvido
Componentes usados:
Apache Kafka é um sistema distribuído de mensagens de publicação e assinatura. Adequado para consumo de mensagens offline e online. Para evitar a perda de dados, as mensagens Kafka são armazenadas em disco e replicadas no cluster. O sistema Kafka é baseado no serviço de sincronização ZooKeeper;
Transmissão Apache Spark - Componente Spark para processamento de dados de streaming. O módulo Spark Streaming é construído usando uma arquitetura de microlotes, onde o fluxo de dados é interpretado como uma sequência contínua de pequenos pacotes de dados. Spark Streaming pega dados de diferentes fontes e os combina em pequenos pacotes. Novos pacotes são criados em intervalos regulares. No início de cada intervalo de tempo, um novo pacote é criado e quaisquer dados recebidos durante esse intervalo são incluídos no pacote. No final do intervalo, o crescimento do pacote é interrompido. O tamanho do intervalo é determinado por um parâmetro denominado intervalo de lote;
SQL do Apache Spark - combina processamento relacional com programação funcional Spark. Dados estruturados significam dados que possuem um esquema, ou seja, um único conjunto de campos para todos os registros. O Spark SQL oferece suporte à entrada de uma variedade de fontes de dados estruturados e, graças à disponibilidade de informações de esquema, pode recuperar com eficiência apenas os campos obrigatórios dos registros e também fornece APIs DataFrame;
AWS RDS é um banco de dados relacional baseado em nuvem relativamente barato, um serviço da web que simplifica a configuração, a operação e o dimensionamento e é administrado diretamente pela Amazon.
Instalando e executando o servidor Kafka
Antes de usar o Kafka diretamente, você precisa ter certeza de que possui Java, porque... JVM é usado para trabalhar:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
A próxima etapa é opcional. O fato é que as configurações padrão não permitem o uso completo de todos os recursos do Apache Kafka. Por exemplo, exclua um tópico, categoria ou grupo no qual as mensagens podem ser publicadas. Para mudar isso, vamos editar o arquivo de configuração:
vim ~/kafka/config/server.properties
Adicione o seguinte ao final do arquivo:
delete.topic.enable = true
Antes de iniciar o servidor Kafka, você precisa iniciar o servidor ZooKeeper; usaremos o script auxiliar que acompanha a distribuição Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Depois que o ZooKeeper for iniciado com sucesso, inicie o servidor Kafka em um terminal separado:
Vamos perder os momentos de testar o produtor e o consumidor para o tema recém-criado. Mais detalhes sobre como você pode testar o envio e recebimento de mensagens estão escritos na documentação oficial - Envie algumas mensagens. Bem, passamos a escrever um produtor em Python usando a API KafkaProducer.
Escrita do produtor
O produtor irá gerar dados aleatórios – 100 mensagens por segundo. Por dados aleatórios entendemos um dicionário que consiste em três campos:
Ramo — nome do ponto de venda da instituição de crédito;
Moeda - moeda de transação;
Valor — valor da transação. O valor será um número positivo se for uma compra de moeda pelo Banco, e um número negativo se for uma venda.
A seguir, através do método send, enviamos uma mensagem ao servidor, para o tópico que necessitamos, no formato JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Ao executar o script, recebemos as seguintes mensagens no terminal:
Isso significa que tudo funciona como queríamos - o produtor gera e envia mensagens sobre o tema que precisamos.
A próxima etapa é instalar o Spark e processar esse fluxo de mensagens.
Instalando o Apache Spark
Apache Spark é uma plataforma de computação em cluster universal e de alto desempenho.
O Spark tem um desempenho melhor do que as implementações populares do modelo MapReduce, ao mesmo tempo que oferece suporte a uma ampla variedade de tipos de computação, incluindo consultas interativas e processamento de fluxo. A velocidade desempenha um papel importante no processamento de grandes quantidades de dados, pois é a velocidade que permite trabalhar de forma interativa sem perder minutos ou horas de espera. Um dos maiores pontos fortes do Spark que o torna tão rápido é a capacidade de realizar cálculos na memória.
Este framework foi escrito em Scala, então você precisa instalá-lo primeiro:
Execute o comando abaixo após fazer alterações no bashrc:
source ~/.bashrc
Implantação do AWS PostgreSQL
Resta implantar o banco de dados no qual carregaremos as informações processadas dos fluxos. Para isso utilizaremos o serviço AWS RDS.
Vá para o console AWS -> AWS RDS -> Bancos de dados -> Criar banco de dados:
Selecione PostgreSQL e clique em Avançar:
Porque Este exemplo é apenas para fins educacionais; usaremos um servidor gratuito “no mínimo” (Free Tier):
Em seguida, marcamos o bloco Free Tier e, em seguida, será oferecida automaticamente uma instância da classe t2.micro - embora fraca, é gratuita e bastante adequada para nossa tarefa:
A seguir vêm coisas muito importantes: o nome da instância do banco de dados, o nome do usuário mestre e sua senha. Vamos nomear a instância: myHabrTest, usuário mestre: ter, senha: habr12345 e clique no botão Avançar:
Na próxima página estão os parâmetros responsáveis pela acessibilidade externa do nosso servidor de banco de dados (Acessibilidade pública) e disponibilidade de portas:
Vamos criar uma nova configuração para o grupo de segurança VPC, que permitirá acesso externo ao nosso servidor de banco de dados através da porta 5432 (PostgreSQL).
Vamos para o console AWS em uma janela separada do navegador para a seção VPC Dashboard -> Security Groups -> Create security group:
Definimos o nome do grupo de segurança - PostgreSQL, uma descrição, indicamos a qual VPC este grupo deve estar associado e clicamos no botão Criar:
Preencha as regras de entrada para a porta 5432 para o grupo recém-criado, conforme mostrado na imagem abaixo. Você não pode especificar a porta manualmente, mas selecione PostgreSQL na lista suspensa Tipo.
A rigor, o valor ::/0 significa a disponibilidade de tráfego de entrada para o servidor vindo de todo o mundo, o que canonicamente não é totalmente verdade, mas para analisar o exemplo, vamos nos permitir usar esta abordagem:
Voltamos à página do navegador, onde temos “Definir configurações avançadas” aberto e selecionamos na seção Grupos de segurança VPC -> Escolher grupos de segurança VPC existentes -> PostgreSQL:
A seguir, nas opções do banco de dados -> Nome do banco de dados -> defina o nome - habrDB.
Podemos deixar os demais parâmetros, com exceção da desativação do backup (período de retenção do backup - 0 dias), monitoramento e Performance Insights, por padrão. Clique no botão Criar banco de dados:
Manipulador de thread
A etapa final será o desenvolvimento de um job Spark, que processará novos dados vindos do Kafka a cada dois segundos e inserirá o resultado no banco de dados.
Conforme observado acima, os pontos de verificação são um mecanismo central no SparkStreaming que deve ser configurado para garantir tolerância a falhas. Utilizaremos checkpoints e, caso o procedimento falhe, o módulo Spark Streaming precisará apenas retornar ao último checkpoint e retomar os cálculos a partir dele para recuperar os dados perdidos.
O ponto de verificação pode ser ativado definindo um diretório em um sistema de arquivos confiável e tolerante a falhas (como HDFS, S3, etc.) no qual as informações do ponto de verificação serão armazenadas. Isso é feito usando, por exemplo:
streamingContext.checkpoint(checkpointDirectory)
Em nosso exemplo, usaremos a seguinte abordagem, ou seja, se checkpointDirectory existir, então o contexto será recriado a partir dos dados do ponto de verificação. Se o diretório não existir (ou seja, executado pela primeira vez), então functionToCreateContext será chamado para criar um novo contexto e configurar DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Criamos um objeto DirectStream para conectar ao tópico “transação” usando o método createDirectStream da biblioteca KafkaUtils:
Usando Spark SQL, fazemos um agrupamento simples e exibimos o resultado no console:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Obtendo o texto da consulta e executando-o por meio do Spark SQL:
E então salvamos os dados agregados resultantes em uma tabela no AWS RDS. Para salvar os resultados da agregação em uma tabela de banco de dados, usaremos o método write do objeto DataFrame:
Algumas palavras sobre como configurar uma conexão com AWS RDS. Criamos o usuário e a senha para ele na etapa “Implantando AWS PostgreSQL”. Você deve usar Endpoint como URL do servidor de banco de dados, que é exibido na seção Conectividade e segurança:
Para conectar corretamente o Spark e o Kafka, você deve executar o trabalho via smark-submit usando o artefato spark-streaming-kafka-0-8_2.11. Além disso, também usaremos um artefato para interagir com o banco de dados PostgreSQL; iremos transferi-los via --packages.
Para flexibilidade do script, incluiremos também como parâmetros de entrada o nome do servidor de mensagens e o tópico do qual queremos receber os dados.
Então é hora de iniciar e verificar a funcionalidade do sistema:
Deu tudo certo! Como você pode ver na imagem abaixo, enquanto o aplicativo está em execução, novos resultados de agregação são gerados a cada 2 segundos, porque definimos o intervalo de lote para 2 segundos quando criamos o objeto StreamingContext:
A seguir, fazemos uma consulta simples ao banco de dados para verificar a presença de registros na tabela fluxo_transação:
Conclusão
Este artigo analisou um exemplo de processamento de fluxo de informações usando Spark Streaming em conjunto com Apache Kafka e PostgreSQL. Com o crescimento dos dados de diversas fontes, é difícil superestimar o valor prático do Spark Streaming para a criação de aplicativos de streaming e em tempo real.
Você pode encontrar o código-fonte completo em meu repositório em GitHub.
Fico feliz em discutir este artigo, aguardo seus comentários e também espero críticas construtivas de todos os leitores atenciosos.
Desejo-lhe sucesso!
Ps. Inicialmente foi planejado usar um banco de dados PostgreSQL local, mas devido ao meu amor pela AWS, decidi migrar o banco de dados para a nuvem. No próximo artigo sobre este tema, mostrarei como implementar todo o sistema descrito acima na AWS usando AWS Kinesis e AWS EMR. Acompanhe as novidades!