Salutami, Habr! Oghje custruiremu un sistema chì processerà i flussi di missaghju Apache Kafka usendu Spark Streaming è scrive i risultati di trasfurmazioni in a basa di dati di nuvola AWS RDS.
Fighjemu chì una certa istituzione di creditu ci stabilisce u compitu di trasfurmà e transazzioni entranti "à a mosca" in tutti i so rami. Questu pò esse fattu per u scopu di calculà prontamente una pusizione di valuta aperta per u tesoru, limiti o risultati finanziarii per transacciones, etc.
Cumu implementà stu casu senza l'usu di magia è magia magica - leghje sottu u cut! Vai !
Di sicuru, trasfurmà una grande quantità di dati in tempu reale furnisce assai opportunità per l'usu in i sistemi muderni. Una di e combinazioni più populari per questu hè u tandem di Apache Kafka è Spark Streaming, induve Kafka crea un flussu di pacchetti di messagi entranti, è Spark Streaming processa questi pacchetti in un intervallu di tempu determinatu.
Per aumentà a tolleranza di difetti di l'applicazione, useremu i punti di cuntrollu. Cù stu miccanisimu, quandu u mutore Spark Streaming hà bisognu di ricuperà i dati persi, solu deve vultà à l'ultimu puntu di cuntrollu è ripiglià i calculi da quì.
L'architettura di u sistema sviluppatu
Cumpunenti utilizati:
Apache Kafka hè un sistema di messageria di pubblicazione-abbonamentu distribuitu. Adatta per u cunsumu di messagi offline è in linea. Per prevene a perdita di dati, i missaghji Kafka sò almacenati in discu è replicati in u cluster. U sistema Kafka hè custruitu nantu à u serviziu di sincronizazione ZooKeeper;
Streaming Apache Spark - Componente Spark per u processu di dati in streaming. U modulu Spark Streaming hè custruitu cù una architettura micro-batch, induve u flussu di dati hè interpretatu cum'è una sequenza cuntinua di picculi pacchetti di dati. Spark Streaming piglia dati da diverse fonti è li combina in picculi pacchetti. I pacchetti novi sò creati à intervalli regulari. À u principiu di ogni intervallu di tempu, un novu pacchettu hè creatu, è qualsiasi dati ricevuti durante quellu intervallu hè inclusu in u pacchettu. À a fine di l'intervallu, a crescita di pacchetti si ferma. A dimensione di l'intervallu hè determinata da un paràmetru chjamatu intervallu batch;
Apache Spark SQL - unisce u prucessu relazionale cù a prugrammazione funziunale Spark. I dati strutturati significanu dati chì anu un schema, vale à dì un unicu settore di campi per tutti i registri. Spark SQL supporta l'input da una varietà di fonti di dati strutturati è, grazia à a dispunibilità di l'infurmazioni di schema, pò ricuperà in modu efficace solu i campi necessarii di registri, è furnisce ancu API di DataFrame;
AWS RDS hè una basa di dati relazionale basata in nuvola relativamente pocu prezzu, serviziu web chì simplificà a stallazione, l'operazione è a scala, è hè amministrata direttamente da Amazon.
Installazione è esecuzione di u servitore Kafka
Prima di utilizà Kafka direttamente, avete bisognu di assicurà chì avete Java, perchè ... JVM hè utilizatu per u travagliu:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
U prossimu passu hè facultativu. U fattu hè chì i paràmetri predeterminati ùn permettenu micca di utilizà tutte e funzioni di Apache Kafka. Per esempiu, sguassate un tema, categuria, gruppu à quale i missaghji ponu esse publicati. Per cambià questu, editàmu u schedariu di cunfigurazione:
vim ~/kafka/config/server.properties
Aghjunghjite i seguenti à a fine di u schedariu:
delete.topic.enable = true
Prima di inizià u servitore Kafka, avete bisognu di inizià u servitore ZooKeeper; useremu u scrittore ausiliariu chì vene cù a distribuzione Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Dopu chì ZooKeeper hà iniziatu cù successu, lanciate u servitore Kafka in un terminal separatu:
Mandemu i mumenti di pruvà u pruduttore è u cunsumadore per u tema novu creatu. Più dettagli nantu à cumu pudete pruvà l'inviu è a ricezione di messagi sò scritti in a documentazione ufficiale - Mandate qualchi missaghji. Ebbè, andemu à scrive un pruduttore in Python cù l'API KafkaProducer.
Scrittura di pruduttore
U pruduttore generà dati aleatorii - 100 missaghji ogni secondu. Per dati aleatorii intendemu un dizziunariu custituitu di trè campi:
Branch - nome di u puntu di vendita di l'istituzione di creditu;
Currency - valuta di transazzione;
muntanti - quantità di transazzione. A quantità serà un numeru pusitivu s'ellu hè una compra di munita da u Bancu, è un numeru negativu s'ellu hè una vendita.
Dopu, utilizendu u metudu di mandatu, mandemu un missaghju à u servitore, à u tema chì avemu bisognu, in formatu JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Quandu eseguisce u script, ricevemu i seguenti missaghji in u terminal:
Questu significa chì tuttu funziona cum'è vulemu - u pruduttori genera è manda messagi à u tema chì avemu bisognu.
U prossimu passu hè di stallà Spark è processà stu flussu di missaghju.
Installazione di Apache Spark
Apache Spark hè una piattaforma di computing cluster universale è di altu rendiment.
Spark rende megliu cà l'implementazioni populari di u mudellu MapReduce mentre sustene una gamma più larga di tippi di calculu, cumprese dumande interattive è trasfurmazioni di flussu. A velocità ghjoca un rolu impurtante in u processu di grande quantità di dati, postu chì hè a velocità chì vi permette di travaglià interattivamente senza passà minuti o ore aspittendu. Unu di i più grandi punti di forza di Spark chì a rende cusì veloce hè a so capacità di fà calculi in memoria.
Stu quadru hè scrittu in Scala, cusì avete bisognu di stallà prima:
sudo apt-get install scala
Scaricate a distribuzione Spark da u situ ufficiale:
Eseguite u cumandimu sottu dopu avè fattu cambiamenti à bashrc:
source ~/.bashrc
Implementazione di AWS PostgreSQL
Tuttu ciò chì resta hè di implementà a basa di dati in quale caricheremu l'infurmazioni processate da i flussi. Per questu useremu u serviziu AWS RDS.
Andate à a cunsola AWS -> AWS RDS -> Database -> Crea una basa di dati:
Selezziunate PostgreSQL è cliccate Next:
Perchè Questu esempiu hè solu per scopi educativi; useremu un servitore gratuitu "à u minimu" (Free Tier):
In seguitu, mettemu un tick in u bloccu Free Tier, è dopu ci sarà automaticamente offrittu una istanza di a classa t2.micro - ancu s'ellu hè debule, hè liberu è abbastanza adattatu per u nostru compitu:
Dopu venenu cose assai impurtanti: u nome di l'istanza di a basa di dati, u nome di l'utilizatore maestru è a so password. Chjamemu l'istanza: myHabrTest, utilizatore maestru: abr, codice: abru12345 è cliccate nant'à u buttone Next:
In a pagina dopu ci sò paràmetri rispunsevuli di l'accessibilità di u nostru servitore di basa di dati da l'esternu (accessibilità publica) è a dispunibilità di u portu:
Creemu un novu paràmetru per u gruppu di sicurità VPC, chì permetterà l'accessu esternu à u nostru servitore di basa di dati via u portu 5432 (PostgreSQL).
Andemu à a cunsola AWS in una finestra di navigatore separata à u Dashboard VPC -> Gruppi di sicurezza -> Crea a sezione di gruppu di sicurezza:
Avemu stabilitu u nome per u gruppu di Sicurezza - PostgreSQL, una descrizzione, indicà quale VPC stu gruppu deve esse assuciatu è cliccate u buttone Crea:
Riempite e regule Inbound per u portu 5432 per u gruppu novu creatu, cum'è mostra in a stampa sottu. Ùn pudete micca specificà u portu manualmente, ma selezziunate PostgreSQL da a lista di Tipu.
Strictly speaking, u valore ::/0 significa a dispunibilità di u trafficu entrante à u servitore da tuttu u mondu, chì ùn hè micca canonicamente vera, ma per analizà l'esempiu, permettemu di utilizà stu approcciu:
Riturnemu à a pagina di u navigatore, induve avemu "Configurate paràmetri avanzati" aperti è selezziunate in a sezione di gruppi di sicurezza VPC -> Sceglite i gruppi di sicurezza VPC esistenti -> PostgreSQL:
Dopu, in l'opzioni di basa di dati -> Nome di basa di dati -> stabilisce u nome - habrDB.
Pudemu lascià i paràmetri rimanenti, cù l'eccezzioni di disattivà a copia di salvezza (periodu di conservazione di salvezza - 0 ghjorni), monitoraghju è Performance Insights, per difettu. Cliccate nant'à u buttone Crea una basa di dati:
Gestore di filu
L'ultima tappa serà u sviluppu di un travagliu Spark, chì prucederà novi dati chì venenu da Kafka ogni dui seconde è entre u risultatu in a basa di dati.
Comu nutatu sopra, i punti di cuntrollu sò un mecanismu core in SparkStreaming chì deve esse cunfiguratu per assicurà a tolleranza di difetti. Avemu aduprà i punti di cuntrollu è, se a prucedura falla, u modulu Spark Streaming solu bisognu di vultà à l'ultimu puntu di cuntrollu è ripiglià i calculi da ellu per ricuperà i dati persi.
U checkpointing pò esse attivatu mettendu un repertoriu nantu à un sistema di fugliale affidabile è tolerante à i difetti (cum'è HDFS, S3, etc.) in quale l'infurmazioni di u puntu di cuntrollu seranu guardati. Questu hè fattu cù, per esempiu:
streamingContext.checkpoint(checkpointDirectory)
In u nostru esempiu, useremu l'approcciu seguente, vale à dì, se checkpointDirectory esiste, allora u cuntestu serà ricreatu da i dati di checkpoint. Se u repertoriu ùn esiste micca (vale à dì eseguitu per a prima volta), allora functionToCreateContext hè chjamatu per creà un novu cuntestu è cunfigurà DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Creemu un oggettu DirectStream per cunnette à u tema "transazzione" utilizendu u metudu createDirectStream di a biblioteca KafkaUtils:
Utilizendu Spark SQL, facemu un raggruppamentu simplice è mostra u risultatu in a cunsola:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Ottene u testu di a dumanda è eseguisce attraversu Spark SQL:
E poi salvemu i dati aggregati risultanti in una tabella in AWS RDS. Per salvà i risultati di l'aggregazione in una tabella di basa di dati, useremu u metudu di scrittura di l'ughjettu DataFrame:
Uni pochi parolle nantu à a creazione di una cunnessione à AWS RDS. Avemu creatu l'utilizatore è a password per questu in u passu "Implementazione di AWS PostgreSQL". Duvete aduprà Endpoint cum'è l'url di u servitore di basa di dati, chì hè visualizatu in a sezione Connettività è sicurità:
Per cunnette currettamente Spark è Kafka, duvete eseguisce u travagliu via smark-submit usendu l'artefattu. spark-streaming-kafka-0-8_2.11. Inoltre, useremu ancu un artefattu per interagisce cù a basa di dati PostgreSQL; li trasfereremu via --packages.
Per a flessibilità di u script, includeremu ancu cum'è parametri di input u nome di u servitore di messagiu è u tema da quale vulemu riceve dati.
Dunque, hè ora di lancià è verificà e funziunalità di u sistema:
Tuttu hà travagliatu! Comu pudete vede in a stampa quì sottu, mentre l'applicazione hè in esecuzione, i novi risultati di aggregazione sò emessi ogni 2 seconde, perchè avemu stabilitu l'intervallu di batching à 2 seconde quandu avemu creatu l'oggettu StreamingContext:
In seguitu, facemu una dumanda simplice à a basa di dati per verificà a presenza di registri in a tavula flussu di transazzione:
cunchiusioni
Questu articulu hà guardatu un esempiu di trasfurmazioni in flussu di l'infurmazioni cù Spark Streaming in cunjunzione cù Apache Kafka è PostgreSQL. Cù a crescita di dati da diverse fonti, hè difficiule di sopravvalutà u valore praticu di Spark Streaming per creà streaming è applicazioni in tempu reale.
Pudete truvà u codice fonte sanu in u mo repository à GitHub.
Sò cuntentu di discutiri stu articulu, aghju aspittatu i vostri cumenti, è speru ancu di critiche constructive da tutti i lettori attenti.
Ju ti vògliu successu!
. Inizialmente era previstu di utilizà una basa di dati PostgreSQL lucale, ma datu u mo amore per AWS, decisu di trasfurmà a basa di dati à u nuvulu. In u prossimu articulu nantu à questu tema, vi mustrarà cumu implementà tuttu u sistema descrittu sopra in AWS utilizendu AWS Kinesis è AWS EMR. Segui a nutizia !