Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Ciao, Habr! Oggi creeremo un sistema che elaborerà i flussi di messaggi Apache Kafka utilizzando Spark Streaming e scriverà i risultati dell'elaborazione nel database cloud AWS RDS.

Immaginiamo che un certo istituto di credito ci affidi il compito di elaborare le transazioni in entrata “al volo” in tutte le sue filiali. Ciò può essere fatto allo scopo di calcolare tempestivamente una posizione valutaria aperta per la tesoreria, limiti o risultati finanziari per le transazioni, ecc.

Come implementare questo caso senza l'uso di magie e incantesimi: leggi sotto il taglio! Andare!

Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming
(Fonte immagine)

Introduzione

Naturalmente, l’elaborazione di una grande quantità di dati in tempo reale offre ampie opportunità di utilizzo nei sistemi moderni. Una delle combinazioni più popolari a questo scopo è il tandem di Apache Kafka e Spark Streaming, in cui Kafka crea un flusso di pacchetti di messaggi in entrata e Spark Streaming elabora questi pacchetti in un determinato intervallo di tempo.

Per aumentare la tolleranza agli errori dell'applicazione, utilizzeremo i checkpoint. Con questo meccanismo, quando il motore Spark Streaming ha bisogno di recuperare i dati persi, deve solo tornare all'ultimo checkpoint e riprendere i calcoli da lì.

Architettura del sistema sviluppato

Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Componenti utilizzati:

  • Apache Kafka è un sistema di messaggistica distribuito-sottoscrivi. Adatto sia per il consumo di messaggi offline che online. Per prevenire la perdita di dati, i messaggi Kafka vengono archiviati su disco e replicati all'interno del cluster. Il sistema Kafka è basato sul servizio di sincronizzazione ZooKeeper;
  • Streaming di Apache Spark - Componente Spark per l'elaborazione dei dati in streaming. Il modulo Spark Streaming è costruito utilizzando un'architettura micro-batch, in cui il flusso di dati viene interpretato come una sequenza continua di piccoli pacchetti di dati. Spark Streaming prende i dati da diverse fonti e li combina in piccoli pacchetti. Nuovi pacchetti vengono creati a intervalli regolari. All'inizio di ogni intervallo di tempo viene creato un nuovo pacchetto e tutti i dati ricevuti durante tale intervallo vengono inclusi nel pacchetto. Al termine dell'intervallo, la crescita dei pacchetti si interrompe. La dimensione dell'intervallo è determinata da un parametro chiamato intervallo batch;
  • Apache SparkSql - combina l'elaborazione relazionale con la programmazione funzionale Spark. Per dati strutturati si intendono i dati che hanno uno schema, ovvero un singolo set di campi per tutti i record. Spark SQL supporta input da una varietà di origini dati strutturate e, grazie alla disponibilità di informazioni sullo schema, può recuperare in modo efficiente solo i campi richiesti dei record e fornisce anche API DataFrame;
  • RDS AWS è un database relazionale basato su cloud relativamente economico, un servizio Web che semplifica la configurazione, il funzionamento e la scalabilità ed è amministrato direttamente da Amazon.

Installazione ed esecuzione del server Kafka

Prima di utilizzare direttamente Kafka, devi assicurarti di avere Java, perché... JVM viene utilizzato per il lavoro:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Creiamo un nuovo utente per lavorare con Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Successivamente, scarica la distribuzione dal sito Web ufficiale Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Decomprimere l'archivio scaricato:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Il passaggio successivo è facoltativo. Il fatto è che le impostazioni predefinite non consentono di utilizzare appieno tutte le funzionalità di Apache Kafka. Ad esempio, elimina un argomento, una categoria o un gruppo in cui è possibile pubblicare i messaggi. Per cambiarlo, modifichiamo il file di configurazione:

vim ~/kafka/config/server.properties

Aggiungi quanto segue alla fine del file:

delete.topic.enable = true

Prima di avviare il server Kafka, è necessario avviare il server ZooKeeper; utilizzeremo lo script ausiliario fornito con la distribuzione Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Dopo che ZooKeeper è stato avviato correttamente, avvia il server Kafka in un terminale separato:

bin/kafka-server-start.sh config/server.properties

Creiamo un nuovo argomento chiamato Transazione:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Assicuriamoci che sia stato creato un argomento con il numero richiesto di partizioni e repliche:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Perdiamo i momenti di test del produttore e del consumatore per l'argomento appena creato. Maggiori dettagli su come testare l'invio e la ricezione dei messaggi sono scritti nella documentazione ufficiale - Invia alcuni messaggi. Bene, passiamo alla scrittura di un produttore in Python utilizzando l'API KafkaProducer.

Scrittura del produttore

Il produttore genererà dati casuali: 100 messaggi al secondo. Per dati casuali intendiamo un dizionario composto da tre campi:

  • Branch di società — nome del punto vendita dell’ente creditizio;
  • Valuta - valuta di transazione;
  • Quantità - Importo della transazione. L'importo sarà un numero positivo se si tratta di un acquisto di valuta da parte della Banca, e un numero negativo se si tratta di una vendita.

Il codice per il produttore è simile al seguente:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Successivamente, utilizzando il metodo send, inviamo un messaggio al server, all'argomento di cui abbiamo bisogno, in formato JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Durante l'esecuzione dello script, riceviamo i seguenti messaggi nel terminale:

Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Ciò significa che tutto funziona come volevamo: il produttore genera e invia messaggi sull'argomento di cui abbiamo bisogno.
Il passaggio successivo consiste nell'installare Spark ed elaborare questo flusso di messaggi.

Installazione di Apache Spark

Apache Spark è una piattaforma di cluster computing universale e ad alte prestazioni.

Spark offre prestazioni migliori rispetto alle implementazioni più diffuse del modello MapReduce, supportando al tempo stesso una gamma più ampia di tipi di calcolo, comprese query interattive ed elaborazione di flussi. La velocità gioca un ruolo importante quando si elaborano grandi quantità di dati, poiché è la velocità che consente di lavorare in modo interattivo senza trascorrere minuti o ore in attesa. Uno dei maggiori punti di forza di Spark che lo rende così veloce è la sua capacità di eseguire calcoli in memoria.

Questo framework è scritto in Scala, quindi devi prima installarlo:

sudo apt-get install scala

Scarica la distribuzione Spark dal sito ufficiale:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Decomprimi l'archivio:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Aggiungi il percorso di Spark al file bash:

vim ~/.bashrc

Aggiungi le seguenti righe tramite l'editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Esegui il comando seguente dopo aver apportato modifiche a bashrc:

source ~/.bashrc

Distribuzione di AWS PostgreSQL

Non resta che distribuire il database in cui caricheremo le informazioni elaborate dai flussi. Per questo utilizzeremo il servizio AWS RDS.

Vai alla console AWS -> AWS RDS -> Database -> Crea database:
Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Seleziona PostgreSQL e fai clic su Avanti:
Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Perché Questo esempio è solo a scopo didattico; utilizzeremo un server gratuito “al minimo” (Livello gratuito):
Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Successivamente, inseriamo un segno di spunta nel blocco Livello gratuito, dopodiché ci verrà offerta automaticamente un'istanza della classe t2.micro: sebbene debole, è gratuita e abbastanza adatta al nostro compito:
Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Poi arrivano cose molto importanti: il nome dell'istanza del database, il nome dell'utente master e la sua password. Diamo un nome all'istanza: myHabrTest, utente master: habr, parola d'ordine: habr12345 e fare clic sul pulsante Avanti:
Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Nella pagina successiva ci sono i parametri responsabili dell'accessibilità del nostro server database dall'esterno (Accessibilità pubblica) e della disponibilità delle porte:

Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Creiamo una nuova impostazione per il gruppo di sicurezza VPC, che consentirà l'accesso esterno al nostro server database tramite la porta 5432 (PostgreSQL).
Andiamo alla console AWS in una finestra del browser separata nella dashboard VPC -> Gruppi di sicurezza -> sezione Crea gruppo di sicurezza:
Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Impostiamo il nome per il gruppo di sicurezza - PostgreSQL, una descrizione, indichiamo a quale VPC questo gruppo deve essere associato e facciamo clic sul pulsante Crea:
Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Compila le regole in entrata per la porta 5432 per il gruppo appena creato, come mostrato nell'immagine seguente. Non è possibile specificare la porta manualmente, ma selezionare PostgreSQL dall'elenco a discesa Tipo.

A rigor di termini, il valore ::/0 indica la disponibilità di traffico in entrata al server da tutto il mondo, il che canonicamente non è del tutto vero, ma per analizzare l'esempio, permettiamoci di utilizzare questo approccio:
Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Torniamo alla pagina del browser, dove abbiamo aperto “Configura impostazioni avanzate” e selezioniamo nella sezione Gruppi di sicurezza VPC -> Scegli gruppi di sicurezza VPC esistenti -> PostgreSQL:
Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Successivamente, nelle Opzioni database -> Nome database -> imposta il nome - habrDB.

Possiamo lasciare i restanti parametri, ad eccezione della disabilitazione del backup (periodo di conservazione del backup - 0 giorni), del monitoraggio e di Performance Insights, per impostazione predefinita. Fare clic sul pulsante Crea database:
Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Gestore del thread

La fase finale sarà lo sviluppo di un lavoro Spark, che elaborerà i nuovi dati provenienti da Kafka ogni due secondi e inserirà il risultato nel database.

Come notato in precedenza, i checkpoint sono un meccanismo fondamentale in SparkStreaming che deve essere configurato per garantire la tolleranza agli errori. Utilizzeremo i checkpoint e, se la procedura fallisce, al modulo Spark Streaming basterà tornare all'ultimo checkpoint e riprendere i calcoli da esso per recuperare i dati persi.

Il checkpoint può essere abilitato impostando una directory su un file system affidabile e con tolleranza agli errori (come HDFS, S3, ecc.) in cui verranno archiviate le informazioni del checkpoint. Questo viene fatto utilizzando, ad esempio:

streamingContext.checkpoint(checkpointDirectory)

Nel nostro esempio, utilizzeremo il seguente approccio, vale a dire, se checkpointDirectory esiste, il contesto verrà ricreato dai dati del checkpoint. Se la directory non esiste (ovvero viene eseguita per la prima volta), viene chiamata functionToCreateContext per creare un nuovo contesto e configurare DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Creiamo un oggetto DirectStream per connettersi al topic “transazione” utilizzando il metodo createDirectStream della libreria KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analisi dei dati in entrata in formato JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Utilizzando Spark SQL, eseguiamo un semplice raggruppamento e visualizziamo il risultato nella console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Ottenere il testo della query ed eseguirlo tramite Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Quindi salviamo i dati aggregati risultanti in una tabella in AWS RDS. Per salvare i risultati dell'aggregazione in una tabella del database, utilizzeremo il metodo write dell'oggetto DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Qualche parola sulla configurazione di una connessione ad AWS RDS. Abbiamo creato l'utente e la password nella fase "Distribuzione di AWS PostgreSQL". Dovresti utilizzare Endpoint come URL del server database, che viene visualizzato nella sezione Connettività e sicurezza:

Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Per connettere correttamente Spark e Kafka, dovresti eseguire il lavoro tramite smark-submit utilizzando l'artefatto spark-streaming-kafka-0-8_2.11. Inoltre, utilizzeremo anche un artefatto per interagire con il database PostgreSQL; li trasferiremo tramite --packages.

Per flessibilità dello script, includeremo come parametri di input anche il nome del server dei messaggi e l'argomento da cui vogliamo ricevere i dati.

Quindi, è il momento di avviare e verificare la funzionalità del sistema:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Tutto ha funzionato! Come puoi vedere nell'immagine seguente, mentre l'applicazione è in esecuzione, i nuovi risultati dell'aggregazione vengono emessi ogni 2 secondi, perché abbiamo impostato l'intervallo di batch su 2 secondi quando abbiamo creato l'oggetto StreamingContext:

Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

Successivamente, eseguiamo una semplice query al database per verificare la presenza di record nella tabella flusso_transazione:

Apache Kafka ed elaborazione dei dati in streaming con Spark Streaming

conclusione

Questo articolo ha esaminato un esempio di elaborazione del flusso di informazioni utilizzando Spark Streaming insieme ad Apache Kafka e PostgreSQL. Con la crescita dei dati provenienti da varie fonti, è difficile sopravvalutare il valore pratico di Spark Streaming per la creazione di applicazioni in streaming e in tempo reale.

Puoi trovare il codice sorgente completo nel mio repository su GitHub.

Sono felice di discutere questo articolo, attendo con ansia i vostri commenti e spero anche in critiche costruttive da parte di tutti i lettori premurosi.

Vi auguro successo!

Ps. Inizialmente era previsto l'utilizzo di un database PostgreSQL locale, ma dato il mio amore per AWS, ho deciso di spostare il database sul cloud. Nel prossimo articolo su questo argomento, mostrerò come implementare l'intero sistema sopra descritto in AWS utilizzando AWS Kinesis e AWS EMR. Segui le notizie!

Fonte: habr.com

Aggiungi un commento