Hola Habr! Avui construirem un sistema que processarà els fluxos de missatges d'Apache Kafka mitjançant Spark Streaming i escriurà el resultat del processament a la base de dades del núvol AWS RDS.
Imaginem que una determinada entitat de crèdit ens proposa la tasca de processar les transaccions entrants "sobre la marxa" per a totes les seves oficines. Això es pot fer amb el propòsit de calcular ràpidament la posició de moneda oberta per a la tresoreria, els límits o el resultat financer de les transaccions, etc.
Com implementar aquest cas sense l'ús de màgia i encanteris màgics: llegiu sota el tall! Va!
Per descomptat, el processament en temps real d'una gran quantitat de dades ofereix àmplies oportunitats d'ús en sistemes moderns. Una de les combinacions més populars per a això és el tàndem d'Apache Kafka i Spark Streaming, on Kafka crea un flux de paquets de missatges entrants i Spark Streaming processa aquests paquets en un interval de temps especificat.
Per millorar la tolerància a errors de l'aplicació, utilitzarem punts de control - punts de control. Amb aquest mecanisme, quan el mòdul Spark Streaming necessita recuperar dades perdudes, només necessita tornar a l'últim punt de control i reprendre els càlculs des d'allà.
L'arquitectura del sistema desenvolupat
Components utilitzats:
Apatxe Kafka és un sistema de missatgeria distribuït de publicació i subscripció. Apte per al consum de missatges fora de línia i en línia. Per evitar la pèrdua de dades, els missatges de Kafka s'emmagatzemen al disc i es reprodueixen dins del clúster. El sistema Kafka està construït sobre el servei de sincronització ZooKeeper;
Transmissió d'Apache Spark - un component Spark per processar dades de streaming. El mòdul Spark Streaming es construeix mitjançant una arquitectura de micro-lots, quan un flux de dades s'interpreta com una seqüència contínua de petits paquets de dades. Spark Streaming agafa dades de diferents fonts i les combina en petits lots. Es creen nous paquets a intervals regulars. Al començament de cada interval de temps, es crea un paquet nou i les dades rebudes durant aquest interval s'inclouen al paquet. Al final de l'interval, el creixement del paquet s'atura. La mida de l'interval ve determinada per un paràmetre anomenat interval per lots;
Apache Spark SQL - Combina el processament relacional amb la programació funcional de Spark. Les dades estructurades fan referència a les dades que tenen un esquema, és a dir, un únic conjunt de camps per a tots els registres. Spark SQL admet l'entrada d'una varietat de fonts de dades estructurades i, a causa de la presència d'informació d'esquema, només pot recuperar de manera eficient els camps necessaris dels registres i també proporciona API de DataFrame;
AWS RDS és una base de dades relacional basada en núvol relativament econòmica, un servei web que simplifica la configuració, el funcionament i l'escalat, administrat directament per Amazon.
Instal·lació i execució del servidor Kafka
Abans d'utilitzar Kafka directament, heu d'assegurar-vos que teniu Java, perquè JVM s'utilitza per treballar:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
El següent pas és opcional. El fet és que la configuració predeterminada no us permet utilitzar plenament totes les funcions d'Apache Kafka. Per exemple, suprimiu un tema, categoria, grup en què es poden publicar missatges. Per canviar-ho, editem el fitxer de configuració:
vim ~/kafka/config/server.properties
Afegiu el següent al final del fitxer:
delete.topic.enable = true
Abans d'iniciar el servidor Kafka, heu d'iniciar el servidor ZooKeeper, utilitzarem l'script d'ajuda que ve amb la distribució Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Després que ZooKeeper s'hagi iniciat correctament, iniciem el servidor Kafka en un terminal independent:
Perdem els moments de prova del productor i consumidor del tema de nova creació. Més detalls sobre com podeu provar l'enviament i la recepció de missatges estan escrits a la documentació oficial: Envieu alguns missatges. Bé, estem passant a escriure un productor a Python mitjançant l'API KafkaProducer.
Redacció del productor
El productor generarà dades aleatòries: 100 missatges cada segon. Per dades aleatòries entenem un diccionari format per tres camps:
Branca — nom del punt de venda de l'entitat de crèdit;
Moneda - moneda de transacció;
quantitat - import de la transacció. L'import serà positiu si es tracta d'una compra de divises per part del Banc, i negatiu si es tracta d'una venda.
A continuació, mitjançant el mètode send, enviem un missatge al servidor, al tema que necessitem, en format JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Quan executem l'script, ens apareixen els missatges següents al terminal:
Això vol dir que tot funciona com volíem: el productor genera i envia missatges al tema que necessitem.
El següent pas és instal·lar Spark i processar aquest flux de missatges.
Instal·lació d'Apache Spark
Apache Spark és una plataforma informàtica de clúster versàtil i d'alt rendiment.
Spark supera les implementacions populars del model MapReduce en termes de rendiment, alhora que ofereix suport per a una gamma més àmplia de tipus de càlcul, incloses consultes interactives i streaming. La velocitat té un paper important a l'hora de processar grans quantitats de dades, ja que és la velocitat la que permet treballar de manera interactiva sense passar minuts o hores d'espera. Un dels majors punts forts de Spark per oferir aquesta velocitat és la seva capacitat per realitzar càlculs a la memòria.
Aquest marc està escrit a Scala, de manera que primer cal instal·lar-lo:
sudo apt-get install scala
Baixeu la distribució de Spark des del lloc web oficial:
Executeu l'ordre següent després de fer canvis a bashrc:
source ~/.bashrc
Implementació d'AWS PostgreSQL
Queda per ampliar la base de dades, on omplirem la informació processada dels fluxos. Per fer-ho, utilitzarem el servei AWS RDS.
Aneu a la consola AWS -> AWS RDS -> Bases de dades -> Crea base de dades:
Seleccioneu PostgreSQL i feu clic al botó Següent:
Perquè Aquest exemple s'analitza únicament amb finalitats educatives, farem servir un servidor gratuït "com a mínim" (Free Tier):
A continuació, comproveu el bloc de nivell gratuït i, després d'això, automàticament se'ns oferirà una instància de la classe t2.micro, encara que feble, però gratuïta i bastant adequada per a la nostra tasca:
Segueixen coses molt importants: el nom de la instància de base de dades, el nom de l'usuari mestre i la seva contrasenya. Anem a cridar la instància: myHabrTest, usuari mestre: habr, contrasenya: habr12345 i feu clic al botó Següent:
La pàgina següent conté els paràmetres responsables de l'accessibilitat del nostre servidor de bases de dades des de l'exterior (Accessibilitat pública) i la disponibilitat de ports:
Creem una nova configuració per al grup de seguretat VPC, que permetrà l'accés extern al nostre servidor de bases de dades a través del port 5432 (PostgreSQL).
Anem en una finestra del navegador independent a la consola d'AWS a la secció VPC Dashboard -> Security Groups -> Create security group:
Definim el nom del grup de seguretat - PostgreSQL, una descripció, especifiquem a quina VPC s'ha d'associar aquest grup i feu clic al botó Crea:
Omplim les regles d'entrada del grup recentment creat per al port 5432, tal com es mostra a la imatge següent. No podeu especificar el port manualment, però seleccioneu PostgreSQL a la llista desplegable Tipus.
En sentit estricte, el valor ::/0 significa la disponibilitat de trànsit entrant per al servidor d'arreu del món, cosa que no és del tot cert canònicament, però per analitzar l'exemple, utilitzem aquest enfocament:
Tornem a la pàgina del navegador, on tenim obert “Configurar la configuració avançada” i seleccionem Grups de seguretat de VPC -> Escolliu grups de seguretat de VPC existents -> PostgreSQL a la secció:
A continuació, a la secció Opcions de la base de dades -> Nom de la base de dades -> estableix el nom - habrDB.
Podem deixar la resta de paràmetres, a excepció de la desactivació de la còpia de seguretat (període de retenció de còpia de seguretat - 0 dies), la supervisió i Performance Insights, per defecte. Feu clic al botó Crea una base de dades:
Gestor de fluxos
L'etapa final serà el desenvolupament d'un treball Spark, que processarà noves dades de Kafka cada dos segons i introduirà el resultat a la base de dades.
Com s'ha indicat anteriorment, els punts de control són el mecanisme principal de SparkStreaming que s'ha de configurar per proporcionar tolerància a errors. Utilitzarem punts de control i, en cas d'error del procediment, el mòdul Spark Streaming només haurà de tornar a l'últim punt de control i reprendre els càlculs des d'aquest per recuperar les dades perdudes.
Es pot habilitar un punt de control configurant un directori en un sistema de fitxers fiable i tolerant a errors (per exemple, HDFS, S3, etc.) on s'emmagatzemarà la informació del punt de control. Això es fa amb, per exemple:
streamingContext.checkpoint(checkpointDirectory)
En el nostre exemple, utilitzarem el següent enfocament, és a dir, si el checkpointDirectory existeix, el context es recrearà a partir de les dades del punt de control. Si el directori no existeix (és a dir, s'està executant per primera vegada), es crida a la funció functionToCreateContext per crear un context nou i configurar DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Creem un objecte DirectStream per connectar-nos al tema "transacció" mitjançant el mètode createDirectStream de la biblioteca KafkaUtils:
Utilitzant Spark SQL, fem una agrupació senzilla i donem el resultat a la consola:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Obtenir el cos de la consulta i executar-lo mitjançant Spark SQL:
I després desem les dades agregades rebudes en una taula a AWS RDS. Per desar els resultats de l'agregació en una taula de base de dades, utilitzarem el mètode d'escriptura de l'objecte DataFrame:
Unes quantes paraules sobre la configuració d'una connexió a AWS RDS. Vam crear l'usuari i la contrasenya al pas "Implementació d'AWS PostgreSQL". Com a URL del servidor de bases de dades, hauríeu d'utilitzar el punt final, que es mostra a la secció Connectivitat i seguretat:
Per connectar correctament Spark i Kafka, hauríeu d'executar el treball mitjançant smark-submit mitjançant l'artefacte spark-streaming-kafka-0-8_2.11. A més, també utilitzarem un artefacte per interactuar amb la base de dades PostgreSQL, els passarem per --packages.
Per a la flexibilitat de l'script, també treurem el nom del servidor de missatges i el tema del qual volem rebre dades com a paràmetres d'entrada.
Tot va funcionar! Com podeu veure a la imatge següent, mentre l'aplicació s'executa, es mostren nous resultats d'agregació cada 2 segons, perquè establim l'interval d'agrupació en 2 segons quan vam crear l'objecte StreamingContext:
A continuació, fem una consulta senzilla a la base de dades per comprovar si hi ha registres a la taula flux_transaccions:
Conclusió
En aquest article, es va considerar un exemple de processament d'informació en streaming mitjançant Spark Streaming juntament amb Apache Kafka i PostgreSQL. Amb el creixement del volum de dades de diverses fonts, és difícil sobreestimar el valor pràctic de Spark Streaming per crear aplicacions en temps real i de streaming.
Podeu trobar el codi font complet al meu repositori a GitHub.
Estic encantat de parlar d'aquest article, espero els vostres comentaris i també espero crítiques constructives de tots els lectors interessats.
Els desitjo molt èxit!
Pg. Originalment estava previst utilitzar una base de dades PostgreSQL local, però donat el meu amor per AWS, vaig decidir moure la base de dades al núvol. Al següent article sobre aquest tema, us mostraré com implementar tot el sistema descrit anteriorment a AWS mitjançant AWS Kinesis i AWS EMR. Segueix les notícies!