Apache Kafka și procesarea datelor în flux cu Spark Streaming
Bună, Habr! Astăzi vom construi un sistem care va procesa fluxurile de mesaje Apache Kafka folosind Spark Streaming și vom scrie rezultatele procesării în baza de date cloud AWS RDS.
Să ne imaginăm că o anumită instituție de credit ne stabilește sarcina de a procesa tranzacțiile primite „din mers” în toate sucursalele sale. Acest lucru se poate face în scopul calculării prompte a unei poziții valutare deschise pentru trezorerie, limite sau rezultate financiare pentru tranzacții etc.
Cum să implementați acest caz fără a folosi magie și vrăji magice - citiți sub tăietură! Merge!
Desigur, procesarea unei cantități mari de date în timp real oferă oportunități ample de utilizare în sistemele moderne. Una dintre cele mai populare combinații pentru aceasta este tandemul Apache Kafka și Spark Streaming, unde Kafka creează un flux de pachete de mesaje primite, iar Spark Streaming procesează aceste pachete la un interval de timp dat.
Pentru a crește toleranța la erori a aplicației, vom folosi puncte de control. Cu acest mecanism, atunci când motorul Spark Streaming trebuie să recupereze datele pierdute, trebuie doar să se întoarcă la ultimul punct de control și să reia calculele de acolo.
Arhitectura sistemului dezvoltat
Componente folosite:
Apache Kafka este un sistem de mesagerie de publicare-abonare distribuit. Potrivit atât pentru consumul de mesaje offline, cât și online. Pentru a preveni pierderea datelor, mesajele Kafka sunt stocate pe disc și replicate în cluster. Sistemul Kafka este construit pe deasupra serviciului de sincronizare ZooKeeper;
Apache Spark Streaming - Componentă Spark pentru procesarea datelor în flux. Modulul Spark Streaming este construit folosind o arhitectură micro-loturi, în care fluxul de date este interpretat ca o secvență continuă de pachete de date mici. Spark Streaming preia date din diferite surse și le combină în pachete mici. Noi pachete sunt create la intervale regulate. La începutul fiecărui interval de timp, se creează un nou pachet și toate datele primite în acel interval sunt incluse în pachet. La sfârșitul intervalului, creșterea pachetelor se oprește. Mărimea intervalului este determinată de un parametru numit interval batch;
Apache Spark SQL - combină procesarea relațională cu programarea funcțională Spark. Datele structurate înseamnă date care au o schemă, adică un singur set de câmpuri pentru toate înregistrările. Spark SQL acceptă introducerea dintr-o varietate de surse de date structurate și, datorită disponibilității informațiilor despre schemă, poate prelua eficient doar câmpurile necesare de înregistrări și oferă, de asemenea, API-uri DataFrame;
AWS RDS este o bază de date relațională bazată pe cloud, un serviciu web relativ ieftin, care simplifică configurarea, operarea și scalarea și este administrată direct de Amazon.
Instalarea și rularea serverului Kafka
Înainte de a folosi Kafka direct, trebuie să vă asigurați că aveți Java, deoarece... JVM este folosit pentru lucru:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Următorul pas este opțional. Faptul este că setările implicite nu vă permit să utilizați pe deplin toate caracteristicile Apache Kafka. De exemplu, ștergeți un subiect, o categorie, un grup în care pot fi publicate mesaje. Pentru a schimba acest lucru, să edităm fișierul de configurare:
vim ~/kafka/config/server.properties
Adăugați următoarele la sfârșitul fișierului:
delete.topic.enable = true
Înainte de a porni serverul Kafka, trebuie să porniți serverul ZooKeeper; vom folosi scriptul auxiliar care vine cu distribuția Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
După ce ZooKeeper a pornit cu succes, lansați serverul Kafka într-un terminal separat:
Sa ratam momentele de testare a producatorului si consumatorului pentru tema nou creata. Mai multe detalii despre cum puteți testa trimiterea și primirea mesajelor sunt scrise în documentația oficială - Trimite câteva mesaje. Ei bine, trecem la scrierea unui producător în Python folosind API-ul KafkaProducer.
Scrierea producătorului
Producătorul va genera date aleatorii - 100 de mesaje în fiecare secundă. Prin date aleatorii înțelegem un dicționar format din trei câmpuri:
Branch firma — denumirea punctului de vânzare al instituției de credit;
Monedă — moneda tranzacției;
Sumă - suma tranzacției. Suma va fi un număr pozitiv dacă este o achiziție de monedă de către Bancă și un număr negativ dacă este o vânzare.
În continuare, folosind metoda send, trimitem un mesaj către server, la subiectul de care avem nevoie, în format JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Când rulăm scriptul, primim următoarele mesaje în terminal:
Aceasta înseamnă că totul funcționează așa cum ne-am dorit - producătorul generează și trimite mesaje la subiectul de care avem nevoie.
Următorul pas este să instalați Spark și să procesați acest flux de mesaje.
Instalarea Apache Spark
Apache Spark este o platformă de calcul cluster universală și de înaltă performanță.
Spark are performanțe mai bune decât implementările populare ale modelului MapReduce, în timp ce acceptă o gamă mai largă de tipuri de calcul, inclusiv interogări interactive și procesare flux. Viteza joacă un rol important atunci când procesați cantități mari de date, deoarece viteza este cea care vă permite să lucrați interactiv fără să petreceți minute sau ore de așteptare. Unul dintre cele mai mari puncte forte ale lui Spark care îl face atât de rapid este capacitatea sa de a efectua calcule în memorie.
Acest cadru este scris în Scala, deci trebuie să îl instalați mai întâi:
sudo apt-get install scala
Descărcați distribuția Spark de pe site-ul oficial:
Rulați comanda de mai jos după ce faceți modificări în bashrc:
source ~/.bashrc
Implementarea AWS PostgreSQL
Tot ce rămâne este să implementăm baza de date în care vom încărca informațiile procesate din fluxuri. Pentru aceasta vom folosi serviciul AWS RDS.
Accesați consola AWS -> AWS RDS -> Baze de date -> Creare bază de date:
Selectați PostgreSQL și faceți clic pe Următorul:
Deoarece Acest exemplu este doar în scopuri educaționale; vom folosi un server gratuit „cel puțin” (nivel gratuit):
Apoi, punem o bifă în blocul Free Tier, iar după aceea ni se va oferi automat o instanță a clasei t2.micro - deși slabă, este gratuită și destul de potrivită pentru sarcina noastră:
Urmează lucruri foarte importante: numele instanței bazei de date, numele utilizatorului principal și parola acestuia. Să numim instanța: myHabrTest, utilizator principal: habr, parola: habr12345 și faceți clic pe butonul Următorul:
Pe pagina următoare sunt parametrii responsabili pentru accesibilitatea serverului nostru de baze de date din exterior (Accesibilitatea publică) și disponibilitatea portului:
Să creăm o nouă setare pentru grupul de securitate VPC, care va permite accesul extern la serverul nostru de baze de date prin portul 5432 (PostgreSQL).
Să mergem la consola AWS într-o fereastră separată a browserului la tabloul de bord VPC -> Grupuri de securitate -> secțiunea Creare grup de securitate:
Setăm numele grupului de securitate - PostgreSQL, o descriere, indicăm cu ce VPC ar trebui să fie asociat acest grup și facem clic pe butonul Creare:
Completați regulile de intrare pentru portul 5432 pentru grupul nou creat, așa cum se arată în imaginea de mai jos. Nu puteți specifica portul manual, dar selectați PostgreSQL din lista derulantă Tip.
Strict vorbind, valoarea ::/0 înseamnă disponibilitatea traficului de intrare către server din întreaga lume, ceea ce canonic nu este în întregime adevărat, dar pentru a analiza exemplul, să ne permitem să folosim această abordare:
Revenim la pagina browserului, unde avem deschisă „Configurarea setărilor avansate” și selectăm în secțiunea grupuri de securitate VPC -> Alegeți grupurile de securitate VPC existente -> PostgreSQL:
Apoi, în Opțiuni bază de date -> Nume bază de date -> setați numele - habrDB.
Putem lăsa parametrii rămași, cu excepția dezactivării backupului (perioada de păstrare a backupului - 0 zile), monitorizării și Performance Insights, în mod implicit. Faceți clic pe butonul Creați o bază de date:
Manipulator de fire
Etapa finală va fi dezvoltarea unui job Spark, care va procesa date noi venite de la Kafka la fiecare două secunde și va introduce rezultatul în baza de date.
După cum sa menționat mai sus, punctele de control sunt un mecanism de bază în SparkStreaming care trebuie configurat pentru a asigura toleranța la erori. Vom folosi puncte de control și, dacă procedura eșuează, modulul Spark Streaming va trebui doar să revină la ultimul punct de control și să reia calculele de pe acesta pentru a recupera datele pierdute.
Punctul de control poate fi activat prin setarea unui director pe un sistem de fișiere fiabil și tolerant la erori (cum ar fi HDFS, S3 etc.) în care vor fi stocate informațiile despre punctul de control. Acest lucru se face folosind, de exemplu:
streamingContext.checkpoint(checkpointDirectory)
În exemplul nostru, vom folosi următoarea abordare, și anume, dacă checkpointDirectory există, atunci contextul va fi recreat din datele punctului de control. Dacă directorul nu există (adică executat pentru prima dată), atunci functionToCreateContext este apelat pentru a crea un nou context și a configura DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Creăm un obiect DirectStream pentru a vă conecta la subiectul „tranzacție” folosind metoda createDirectStream a bibliotecii KafkaUtils:
Folosind Spark SQL, facem o grupare simplă și afișăm rezultatul în consolă:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Obținerea textului de interogare și rularea acestuia prin Spark SQL:
Și apoi salvăm datele agregate rezultate într-un tabel în AWS RDS. Pentru a salva rezultatele agregării într-un tabel al bazei de date, vom folosi metoda de scriere a obiectului DataFrame:
Câteva cuvinte despre configurarea unei conexiuni la AWS RDS. Am creat utilizatorul și parola pentru acesta la pasul „Implementarea AWS PostgreSQL”. Ar trebui să utilizați Endpoint ca adresă URL a serverului de baze de date, care este afișată în secțiunea Conectivitate și securitate:
Pentru a conecta corect Spark și Kafka, ar trebui să rulați jobul prin smark-submit folosind artefactul spark-streaming-kafka-0-8_2.11. În plus, vom folosi și un artefact pentru a interacționa cu baza de date PostgreSQL; le vom transfera prin --packages.
Pentru flexibilitatea scriptului vom include ca parametri de intrare si numele serverului de mesaje si subiectul de la care dorim sa primim date.
Deci, este timpul să lansați și să verificați funcționalitatea sistemului:
Totul a mers! După cum puteți vedea în imaginea de mai jos, în timp ce aplicația rulează, noi rezultate de agregare sunt afișate la fiecare 2 secunde, deoarece am setat intervalul de loturi la 2 secunde când am creat obiectul StreamingContext:
Apoi, facem o interogare simplă la baza de date pentru a verifica prezența înregistrărilor în tabel flux_ tranzacție:
Concluzie
Acest articol a analizat un exemplu de procesare în flux a informațiilor folosind Spark Streaming împreună cu Apache Kafka și PostgreSQL. Odată cu creșterea datelor din diverse surse, este dificil să supraestimați valoarea practică a Spark Streaming pentru crearea de streaming și aplicații în timp real.
Puteți găsi codul sursă complet în depozitul meu la GitHub.
Mă bucur să discut despre acest articol, aștept cu nerăbdare comentariile voastre și, de asemenea, sper la critici constructive din partea tuturor cititorilor grijulii.
Vă urez succes!
Ps. Inițial a fost planificat să folosesc o bază de date locală PostgreSQL, dar având în vedere dragostea mea pentru AWS, am decis să mut baza de date în cloud. În următorul articol despre acest subiect, voi arăta cum să implementez întregul sistem descris mai sus în AWS folosind AWS Kinesis și AWS EMR. Urmăriți știrile!