Apache Kafka è Data Streaming cù Spark Streaming

Salutami, Habr! Oghje custruiremu un sistema chì processerà i flussi di missaghju Apache Kafka usendu Spark Streaming è scrive i risultati di trasfurmazioni in a basa di dati di nuvola AWS RDS.

Fighjemu chì una certa istituzione di creditu ci stabilisce u compitu di trasfurmà e transazzioni entranti "à a mosca" in tutti i so rami. Questu pò esse fattu per u scopu di calculà prontamente una pusizione di valuta aperta per u tesoru, limiti o risultati finanziarii per transacciones, etc.

Cumu implementà stu casu senza l'usu di magia è magia magica - leghje sottu u cut! Vai !

Apache Kafka è Data Streaming cù Spark Streaming
(Fonte di l'imaghjini)

Introduzione

Di sicuru, trasfurmà una grande quantità di dati in tempu reale furnisce assai opportunità per l'usu in i sistemi muderni. Una di e combinazioni più populari per questu hè u tandem di Apache Kafka è Spark Streaming, induve Kafka crea un flussu di pacchetti di messagi entranti, è Spark Streaming processa questi pacchetti in un intervallu di tempu determinatu.

Per aumentà a tolleranza di difetti di l'applicazione, useremu i punti di cuntrollu. Cù stu miccanisimu, quandu u mutore Spark Streaming hà bisognu di ricuperà i dati persi, solu deve vultà à l'ultimu puntu di cuntrollu è ripiglià i calculi da quì.

L'architettura di u sistema sviluppatu

Apache Kafka è Data Streaming cù Spark Streaming

Cumpunenti utilizati:

  • Apache Kafka hè un sistema di messageria di pubblicazione-abbonamentu distribuitu. Adatta per u cunsumu di messagi offline è in linea. Per prevene a perdita di dati, i missaghji Kafka sò almacenati in discu è replicati in u cluster. U sistema Kafka hè custruitu nantu à u serviziu di sincronizazione ZooKeeper;
  • Streaming Apache Spark - Componente Spark per u processu di dati in streaming. U modulu Spark Streaming hè custruitu cù una architettura micro-batch, induve u flussu di dati hè interpretatu cum'è una sequenza cuntinua di picculi pacchetti di dati. Spark Streaming piglia dati da diverse fonti è li combina in picculi pacchetti. I pacchetti novi sò creati à intervalli regulari. À u principiu di ogni intervallu di tempu, un novu pacchettu hè creatu, è qualsiasi dati ricevuti durante quellu intervallu hè inclusu in u pacchettu. À a fine di l'intervallu, a crescita di pacchetti si ferma. A dimensione di l'intervallu hè determinata da un paràmetru chjamatu intervallu batch;
  • Apache Spark SQL - unisce u prucessu relazionale cù a prugrammazione funziunale Spark. I dati strutturati significanu dati chì anu un schema, vale à dì un unicu settore di campi per tutti i registri. Spark SQL supporta l'input da una varietà di fonti di dati strutturati è, grazia à a dispunibilità di l'infurmazioni di schema, pò ricuperà in modu efficace solu i campi necessarii di registri, è furnisce ancu API di DataFrame;
  • AWS RDS hè una basa di dati relazionale basata in nuvola relativamente pocu prezzu, serviziu web chì simplificà a stallazione, l'operazione è a scala, è hè amministrata direttamente da Amazon.

Installazione è esecuzione di u servitore Kafka

Prima di utilizà Kafka direttamente, avete bisognu di assicurà chì avete Java, perchè ... JVM hè utilizatu per u travagliu:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Creemu un novu utilizatore per travaglià cù Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Dopu, scaricate a distribuzione da u situ ufficiale Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack l'archiviu scaricatu:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

U prossimu passu hè facultativu. U fattu hè chì i paràmetri predeterminati ùn permettenu micca di utilizà tutte e funzioni di Apache Kafka. Per esempiu, sguassate un tema, categuria, gruppu à quale i missaghji ponu esse publicati. Per cambià questu, editàmu u schedariu di cunfigurazione:

vim ~/kafka/config/server.properties

Aghjunghjite i seguenti à a fine di u schedariu:

delete.topic.enable = true

Prima di inizià u servitore Kafka, avete bisognu di inizià u servitore ZooKeeper; useremu u scrittore ausiliariu chì vene cù a distribuzione Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Dopu chì ZooKeeper hà iniziatu cù successu, lanciate u servitore Kafka in un terminal separatu:

bin/kafka-server-start.sh config/server.properties

Creemu un novu tema chjamatu Transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Assicuratevi chì un tema cù u numeru necessariu di partizioni è replicazione hè statu creatu:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka è Data Streaming cù Spark Streaming

Mandemu i mumenti di pruvà u pruduttore è u cunsumadore per u tema novu creatu. Più dettagli nantu à cumu pudete pruvà l'inviu è a ricezione di messagi sò scritti in a documentazione ufficiale - Mandate qualchi missaghji. Ebbè, andemu à scrive un pruduttore in Python cù l'API KafkaProducer.

Scrittura di pruduttore

U pruduttore generà dati aleatorii - 100 missaghji ogni secondu. Per dati aleatorii intendemu un dizziunariu custituitu di trè campi:

  • Branch - nome di u puntu di vendita di l'istituzione di creditu;
  • Currency - valuta di transazzione;
  • muntanti - quantità di transazzione. A quantità serà un numeru pusitivu s'ellu hè una compra di munita da u Bancu, è un numeru negativu s'ellu hè una vendita.

U codice per u pruduttore hè cusì:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Dopu, utilizendu u metudu di mandatu, mandemu un missaghju à u servitore, à u tema chì avemu bisognu, in formatu JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Quandu eseguisce u script, ricevemu i seguenti missaghji in u terminal:

Apache Kafka è Data Streaming cù Spark Streaming

Questu significa chì tuttu funziona cum'è vulemu - u pruduttori genera è manda messagi à u tema chì avemu bisognu.
U prossimu passu hè di stallà Spark è processà stu flussu di missaghju.

Installazione di Apache Spark

Apache Spark hè una piattaforma di computing cluster universale è di altu rendiment.

Spark rende megliu cà l'implementazioni populari di u mudellu MapReduce mentre sustene una gamma più larga di tippi di calculu, cumprese dumande interattive è trasfurmazioni di flussu. A velocità ghjoca un rolu impurtante in u processu di grande quantità di dati, postu chì hè a velocità chì vi permette di travaglià interattivamente senza passà minuti o ore aspittendu. Unu di i più grandi punti di forza di Spark chì a rende cusì veloce hè a so capacità di fà calculi in memoria.

Stu quadru hè scrittu in Scala, cusì avete bisognu di stallà prima:

sudo apt-get install scala

Scaricate a distribuzione Spark da u situ ufficiale:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack l'archiviu:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Aghjunghjite u percorsu à Spark à u schedariu bash:

vim ~/.bashrc

Aghjunghjite e seguenti linee attraversu l'editore:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Eseguite u cumandimu sottu dopu avè fattu cambiamenti à bashrc:

source ~/.bashrc

Implementazione di AWS PostgreSQL

Tuttu ciò chì resta hè di implementà a basa di dati in quale caricheremu l'infurmazioni processate da i flussi. Per questu useremu u serviziu AWS RDS.

Andate à a cunsola AWS -> AWS RDS -> Database -> Crea una basa di dati:
Apache Kafka è Data Streaming cù Spark Streaming

Selezziunate PostgreSQL è cliccate Next:
Apache Kafka è Data Streaming cù Spark Streaming

Perchè Questu esempiu hè solu per scopi educativi; useremu un servitore gratuitu "à u minimu" (Free Tier):
Apache Kafka è Data Streaming cù Spark Streaming

In seguitu, mettemu un tick in u bloccu Free Tier, è dopu ci sarà automaticamente offrittu una istanza di a classa t2.micro - ancu s'ellu hè debule, hè liberu è abbastanza adattatu per u nostru compitu:
Apache Kafka è Data Streaming cù Spark Streaming

Dopu venenu cose assai impurtanti: u nome di l'istanza di a basa di dati, u nome di l'utilizatore maestru è a so password. Chjamemu l'istanza: myHabrTest, utilizatore maestru: abr, codice: abru12345 è cliccate nant'à u buttone Next:
Apache Kafka è Data Streaming cù Spark Streaming

In a pagina dopu ci sò paràmetri rispunsevuli di l'accessibilità di u nostru servitore di basa di dati da l'esternu (accessibilità publica) è a dispunibilità di u portu:

Apache Kafka è Data Streaming cù Spark Streaming

Creemu un novu paràmetru per u gruppu di sicurità VPC, chì permetterà l'accessu esternu à u nostru servitore di basa di dati via u portu 5432 (PostgreSQL).
Andemu à a cunsola AWS in una finestra di navigatore separata à u Dashboard VPC -> Gruppi di sicurezza -> Crea a sezione di gruppu di sicurezza:
Apache Kafka è Data Streaming cù Spark Streaming

Avemu stabilitu u nome per u gruppu di Sicurezza - PostgreSQL, una descrizzione, indicà quale VPC stu gruppu deve esse assuciatu è cliccate u buttone Crea:
Apache Kafka è Data Streaming cù Spark Streaming

Riempite e regule Inbound per u portu 5432 per u gruppu novu creatu, cum'è mostra in a stampa sottu. Ùn pudete micca specificà u portu manualmente, ma selezziunate PostgreSQL da a lista di Tipu.

Strictly speaking, u valore ::/0 significa a dispunibilità di u trafficu entrante à u servitore da tuttu u mondu, chì ùn hè micca canonicamente vera, ma per analizà l'esempiu, permettemu di utilizà stu approcciu:
Apache Kafka è Data Streaming cù Spark Streaming

Riturnemu à a pagina di u navigatore, induve avemu "Configurate paràmetri avanzati" aperti è selezziunate in a sezione di gruppi di sicurezza VPC -> Sceglite i gruppi di sicurezza VPC esistenti -> PostgreSQL:
Apache Kafka è Data Streaming cù Spark Streaming

Dopu, in l'opzioni di basa di dati -> Nome di basa di dati -> stabilisce u nome - habrDB.

Pudemu lascià i paràmetri rimanenti, cù l'eccezzioni di disattivà a copia di salvezza (periodu di conservazione di salvezza - 0 ghjorni), monitoraghju è Performance Insights, per difettu. Cliccate nant'à u buttone Crea una basa di dati:
Apache Kafka è Data Streaming cù Spark Streaming

Gestore di filu

L'ultima tappa serà u sviluppu di un travagliu Spark, chì prucederà novi dati chì venenu da Kafka ogni dui seconde è entre u risultatu in a basa di dati.

Comu nutatu sopra, i punti di cuntrollu sò un mecanismu core in SparkStreaming chì deve esse cunfiguratu per assicurà a tolleranza di difetti. Avemu aduprà i punti di cuntrollu è, se a prucedura falla, u modulu Spark Streaming solu bisognu di vultà à l'ultimu puntu di cuntrollu è ripiglià i calculi da ellu per ricuperà i dati persi.

U checkpointing pò esse attivatu mettendu un repertoriu nantu à un sistema di fugliale affidabile è tolerante à i difetti (cum'è HDFS, S3, etc.) in quale l'infurmazioni di u puntu di cuntrollu seranu guardati. Questu hè fattu cù, per esempiu:

streamingContext.checkpoint(checkpointDirectory)

In u nostru esempiu, useremu l'approcciu seguente, vale à dì, se checkpointDirectory esiste, allora u cuntestu serà ricreatu da i dati di checkpoint. Se u repertoriu ùn esiste micca (vale à dì eseguitu per a prima volta), allora functionToCreateContext hè chjamatu per creà un novu cuntestu è cunfigurà DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Creemu un oggettu DirectStream per cunnette à u tema "transazzione" utilizendu u metudu createDirectStream di a biblioteca KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analisi di dati in entrata in formatu JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Utilizendu Spark SQL, facemu un raggruppamentu simplice è mostra u risultatu in a cunsola:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Ottene u testu di a dumanda è eseguisce attraversu Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

E poi salvemu i dati aggregati risultanti in una tabella in AWS RDS. Per salvà i risultati di l'aggregazione in una tabella di basa di dati, useremu u metudu di scrittura di l'ughjettu DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Uni pochi parolle nantu à a creazione di una cunnessione à AWS RDS. Avemu creatu l'utilizatore è a password per questu in u passu "Implementazione di AWS PostgreSQL". Duvete aduprà Endpoint cum'è l'url di u servitore di basa di dati, chì hè visualizatu in a sezione Connettività è sicurità:

Apache Kafka è Data Streaming cù Spark Streaming

Per cunnette currettamente Spark è Kafka, duvete eseguisce u travagliu via smark-submit usendu l'artefattu. spark-streaming-kafka-0-8_2.11. Inoltre, useremu ancu un artefattu per interagisce cù a basa di dati PostgreSQL; li trasfereremu via --packages.

Per a flessibilità di u script, includeremu ancu cum'è parametri di input u nome di u servitore di messagiu è u tema da quale vulemu riceve dati.

Dunque, hè ora di lancià è verificà e funziunalità di u sistema:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Tuttu hà travagliatu! Comu pudete vede in a stampa quì sottu, mentre l'applicazione hè in esecuzione, i novi risultati di aggregazione sò emessi ogni 2 seconde, perchè avemu stabilitu l'intervallu di batching à 2 seconde quandu avemu creatu l'oggettu StreamingContext:

Apache Kafka è Data Streaming cù Spark Streaming

In seguitu, facemu una dumanda simplice à a basa di dati per verificà a presenza di registri in a tavula flussu di transazzione:

Apache Kafka è Data Streaming cù Spark Streaming

cunchiusioni

Questu articulu hà guardatu un esempiu di trasfurmazioni in flussu di l'infurmazioni cù Spark Streaming in cunjunzione cù Apache Kafka è PostgreSQL. Cù a crescita di dati da diverse fonti, hè difficiule di sopravvalutà u valore praticu di Spark Streaming per creà streaming è applicazioni in tempu reale.

Pudete truvà u codice fonte sanu in u mo repository à GitHub.

Sò cuntentu di discutiri stu articulu, aghju aspittatu i vostri cumenti, è speru ancu di critiche constructive da tutti i lettori attenti.

Ju ti vògliu successu!

. Inizialmente era previstu di utilizà una basa di dati PostgreSQL lucale, ma datu u mo amore per AWS, decisu di trasfurmà a basa di dati à u nuvulu. In u prossimu articulu nantu à questu tema, vi mustrarà cumu implementà tuttu u sistema descrittu sopra in AWS utilizendu AWS Kinesis è AWS EMR. Segui a nutizia !

Source: www.habr.com

Add a comment