Apache Kafka i Data Streaming amb Spark Streaming

Hola Habr! Avui construirem un sistema que processarà els fluxos de missatges d'Apache Kafka mitjançant Spark Streaming i escriurà el resultat del processament a la base de dades del núvol AWS RDS.

Imaginem que una determinada entitat de crèdit ens proposa la tasca de processar les transaccions entrants "sobre la marxa" per a totes les seves oficines. Això es pot fer amb el propòsit de calcular ràpidament la posició de moneda oberta per a la tresoreria, els límits o el resultat financer de les transaccions, etc.

Com implementar aquest cas sense l'ús de màgia i encanteris màgics: llegiu sota el tall! Va!

Apache Kafka i Data Streaming amb Spark Streaming
(Font de la imatge)

Introducció

Per descomptat, el processament en temps real d'una gran quantitat de dades ofereix àmplies oportunitats d'ús en sistemes moderns. Una de les combinacions més populars per a això és el tàndem d'Apache Kafka i Spark Streaming, on Kafka crea un flux de paquets de missatges entrants i Spark Streaming processa aquests paquets en un interval de temps especificat.

Per millorar la tolerància a errors de l'aplicació, utilitzarem punts de control - punts de control. Amb aquest mecanisme, quan el mòdul Spark Streaming necessita recuperar dades perdudes, només necessita tornar a l'últim punt de control i reprendre els càlculs des d'allà.

L'arquitectura del sistema desenvolupat

Apache Kafka i Data Streaming amb Spark Streaming

Components utilitzats:

  • Apatxe Kafka és un sistema de missatgeria distribuït de publicació i subscripció. Apte per al consum de missatges fora de línia i en línia. Per evitar la pèrdua de dades, els missatges de Kafka s'emmagatzemen al disc i es reprodueixen dins del clúster. El sistema Kafka està construït sobre el servei de sincronització ZooKeeper;
  • Transmissió d'Apache Spark - un component Spark per processar dades de streaming. El mòdul Spark Streaming es construeix mitjançant una arquitectura de micro-lots, quan un flux de dades s'interpreta com una seqüència contínua de petits paquets de dades. Spark Streaming agafa dades de diferents fonts i les combina en petits lots. Es creen nous paquets a intervals regulars. Al començament de cada interval de temps, es crea un paquet nou i les dades rebudes durant aquest interval s'inclouen al paquet. Al final de l'interval, el creixement del paquet s'atura. La mida de l'interval ve determinada per un paràmetre anomenat interval per lots;
  • Apache Spark SQL - Combina el processament relacional amb la programació funcional de Spark. Les dades estructurades fan referència a les dades que tenen un esquema, és a dir, un únic conjunt de camps per a tots els registres. Spark SQL admet l'entrada d'una varietat de fonts de dades estructurades i, a causa de la presència d'informació d'esquema, només pot recuperar de manera eficient els camps necessaris dels registres i també proporciona API de DataFrame;
  • AWS RDS és una base de dades relacional basada en núvol relativament econòmica, un servei web que simplifica la configuració, el funcionament i l'escalat, administrat directament per Amazon.

Instal·lació i execució del servidor Kafka

Abans d'utilitzar Kafka directament, heu d'assegurar-vos que teniu Java, perquè JVM s'utilitza per treballar:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Creem un usuari nou per treballar amb Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

A continuació, descarregueu el kit de distribució des del lloc web oficial d'Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Descomprimir l'arxiu descarregat:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

El següent pas és opcional. El fet és que la configuració predeterminada no us permet utilitzar plenament totes les funcions d'Apache Kafka. Per exemple, suprimiu un tema, categoria, grup en què es poden publicar missatges. Per canviar-ho, editem el fitxer de configuració:

vim ~/kafka/config/server.properties

Afegiu el següent al final del fitxer:

delete.topic.enable = true

Abans d'iniciar el servidor Kafka, heu d'iniciar el servidor ZooKeeper, utilitzarem l'script d'ajuda que ve amb la distribució Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Després que ZooKeeper s'hagi iniciat correctament, iniciem el servidor Kafka en un terminal independent:

bin/kafka-server-start.sh config/server.properties

Creem un tema nou anomenat Transacció:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Assegurem-nos que s'ha creat un tema amb el nombre necessari de particions i rèplica:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka i Data Streaming amb Spark Streaming

Perdem els moments de prova del productor i consumidor del tema de nova creació. Més detalls sobre com podeu provar l'enviament i la recepció de missatges estan escrits a la documentació oficial: Envieu alguns missatges. Bé, estem passant a escriure un productor a Python mitjançant l'API KafkaProducer.

Redacció del productor

El productor generarà dades aleatòries: 100 missatges cada segon. Per dades aleatòries entenem un diccionari format per tres camps:

  • Branca — nom del punt de venda de l'entitat de crèdit;
  • Moneda - moneda de transacció;
  • quantitat - import de la transacció. L'import serà positiu si es tracta d'una compra de divises per part del Banc, i negatiu si es tracta d'una venda.

El codi del productor és el següent:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

A continuació, mitjançant el mètode send, enviem un missatge al servidor, al tema que necessitem, en format JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Quan executem l'script, ens apareixen els missatges següents al terminal:

Apache Kafka i Data Streaming amb Spark Streaming

Això vol dir que tot funciona com volíem: el productor genera i envia missatges al tema que necessitem.
El següent pas és instal·lar Spark i processar aquest flux de missatges.

Instal·lació d'Apache Spark

Apache Spark és una plataforma informàtica de clúster versàtil i d'alt rendiment.

Spark supera les implementacions populars del model MapReduce en termes de rendiment, alhora que ofereix suport per a una gamma més àmplia de tipus de càlcul, incloses consultes interactives i streaming. La velocitat té un paper important a l'hora de processar grans quantitats de dades, ja que és la velocitat la que permet treballar de manera interactiva sense passar minuts o hores d'espera. Un dels majors punts forts de Spark per oferir aquesta velocitat és la seva capacitat per realitzar càlculs a la memòria.

Aquest marc està escrit a Scala, de manera que primer cal instal·lar-lo:

sudo apt-get install scala

Baixeu la distribució de Spark des del lloc web oficial:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Descomprimir l'arxiu:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Afegiu el camí a Spark al fitxer bash:

vim ~/.bashrc

Afegiu les línies següents a través de l'editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Executeu l'ordre següent després de fer canvis a bashrc:

source ~/.bashrc

Implementació d'AWS PostgreSQL

Queda per ampliar la base de dades, on omplirem la informació processada dels fluxos. Per fer-ho, utilitzarem el servei AWS RDS.

Aneu a la consola AWS -> AWS RDS -> Bases de dades -> Crea base de dades:
Apache Kafka i Data Streaming amb Spark Streaming

Seleccioneu PostgreSQL i feu clic al botó Següent:
Apache Kafka i Data Streaming amb Spark Streaming

Perquè Aquest exemple s'analitza únicament amb finalitats educatives, farem servir un servidor gratuït "com a mínim" (Free Tier):
Apache Kafka i Data Streaming amb Spark Streaming

A continuació, comproveu el bloc de nivell gratuït i, després d'això, automàticament se'ns oferirà una instància de la classe t2.micro, encara que feble, però gratuïta i bastant adequada per a la nostra tasca:
Apache Kafka i Data Streaming amb Spark Streaming

Segueixen coses molt importants: el nom de la instància de base de dades, el nom de l'usuari mestre i la seva contrasenya. Anem a cridar la instància: myHabrTest, usuari mestre: habr, contrasenya: habr12345 i feu clic al botó Següent:
Apache Kafka i Data Streaming amb Spark Streaming

La pàgina següent conté els paràmetres responsables de l'accessibilitat del nostre servidor de bases de dades des de l'exterior (Accessibilitat pública) i la disponibilitat de ports:

Apache Kafka i Data Streaming amb Spark Streaming

Creem una nova configuració per al grup de seguretat VPC, que permetrà l'accés extern al nostre servidor de bases de dades a través del port 5432 (PostgreSQL).
Anem en una finestra del navegador independent a la consola d'AWS a la secció VPC Dashboard -> Security Groups -> Create security group:
Apache Kafka i Data Streaming amb Spark Streaming

Definim el nom del grup de seguretat - PostgreSQL, una descripció, especifiquem a quina VPC s'ha d'associar aquest grup i feu clic al botó Crea:
Apache Kafka i Data Streaming amb Spark Streaming

Omplim les regles d'entrada del grup recentment creat per al port 5432, tal com es mostra a la imatge següent. No podeu especificar el port manualment, però seleccioneu PostgreSQL a la llista desplegable Tipus.

En sentit estricte, el valor ::/0 significa la disponibilitat de trànsit entrant per al servidor d'arreu del món, cosa que no és del tot cert canònicament, però per analitzar l'exemple, utilitzem aquest enfocament:
Apache Kafka i Data Streaming amb Spark Streaming

Tornem a la pàgina del navegador, on tenim obert “Configurar la configuració avançada” i seleccionem Grups de seguretat de VPC -> Escolliu grups de seguretat de VPC existents -> PostgreSQL a la secció:
Apache Kafka i Data Streaming amb Spark Streaming

A continuació, a la secció Opcions de la base de dades -> Nom de la base de dades -> estableix el nom - habrDB.

Podem deixar la resta de paràmetres, a excepció de la desactivació de la còpia de seguretat (període de retenció de còpia de seguretat - 0 dies), la supervisió i Performance Insights, per defecte. Feu clic al botó Crea una base de dades:
Apache Kafka i Data Streaming amb Spark Streaming

Gestor de fluxos

L'etapa final serà el desenvolupament d'un treball Spark, que processarà noves dades de Kafka cada dos segons i introduirà el resultat a la base de dades.

Com s'ha indicat anteriorment, els punts de control són el mecanisme principal de SparkStreaming que s'ha de configurar per proporcionar tolerància a errors. Utilitzarem punts de control i, en cas d'error del procediment, el mòdul Spark Streaming només haurà de tornar a l'últim punt de control i reprendre els càlculs des d'aquest per recuperar les dades perdudes.

Es pot habilitar un punt de control configurant un directori en un sistema de fitxers fiable i tolerant a errors (per exemple, HDFS, S3, etc.) on s'emmagatzemarà la informació del punt de control. Això es fa amb, per exemple:

streamingContext.checkpoint(checkpointDirectory)

En el nostre exemple, utilitzarem el següent enfocament, és a dir, si el checkpointDirectory existeix, el context es recrearà a partir de les dades del punt de control. Si el directori no existeix (és a dir, s'està executant per primera vegada), es crida a la funció functionToCreateContext per crear un context nou i configurar DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Creem un objecte DirectStream per connectar-nos al tema "transacció" mitjançant el mètode createDirectStream de la biblioteca KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analitzant les dades entrants en format JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Utilitzant Spark SQL, fem una agrupació senzilla i donem el resultat a la consola:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Obtenir el cos de la consulta i executar-lo mitjançant Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

I després desem les dades agregades rebudes en una taula a AWS RDS. Per desar els resultats de l'agregació en una taula de base de dades, utilitzarem el mètode d'escriptura de l'objecte DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Unes quantes paraules sobre la configuració d'una connexió a AWS RDS. Vam crear l'usuari i la contrasenya al pas "Implementació d'AWS PostgreSQL". Com a URL del servidor de bases de dades, hauríeu d'utilitzar el punt final, que es mostra a la secció Connectivitat i seguretat:

Apache Kafka i Data Streaming amb Spark Streaming

Per connectar correctament Spark i Kafka, hauríeu d'executar el treball mitjançant smark-submit mitjançant l'artefacte spark-streaming-kafka-0-8_2.11. A més, també utilitzarem un artefacte per interactuar amb la base de dades PostgreSQL, els passarem per --packages.

Per a la flexibilitat de l'script, també treurem el nom del servidor de missatges i el tema del qual volem rebre dades com a paràmetres d'entrada.

Per tant, és hora d'executar i provar el sistema:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Tot va funcionar! Com podeu veure a la imatge següent, mentre l'aplicació s'executa, es mostren nous resultats d'agregació cada 2 segons, perquè establim l'interval d'agrupació en 2 segons quan vam crear l'objecte StreamingContext:

Apache Kafka i Data Streaming amb Spark Streaming

A continuació, fem una consulta senzilla a la base de dades per comprovar si hi ha registres a la taula flux_transaccions:

Apache Kafka i Data Streaming amb Spark Streaming

Conclusió

En aquest article, es va considerar un exemple de processament d'informació en streaming mitjançant Spark Streaming juntament amb Apache Kafka i PostgreSQL. Amb el creixement del volum de dades de diverses fonts, és difícil sobreestimar el valor pràctic de Spark Streaming per crear aplicacions en temps real i de streaming.

Podeu trobar el codi font complet al meu repositori a GitHub.

Estic encantat de parlar d'aquest article, espero els vostres comentaris i també espero crítiques constructives de tots els lectors interessats.

Els desitjo molt èxit!

Pg. Originalment estava previst utilitzar una base de dades PostgreSQL local, però donat el meu amor per AWS, vaig decidir moure la base de dades al núvol. Al següent article sobre aquest tema, us mostraré com implementar tot el sistema descrit anteriorment a AWS mitjançant AWS Kinesis i AWS EMR. Segueix les notícies!

Font: www.habr.com

Afegeix comentari