Apache Kafka și procesarea datelor în flux cu Spark Streaming

Bună, Habr! Astăzi vom construi un sistem care va procesa fluxurile de mesaje Apache Kafka folosind Spark Streaming și vom scrie rezultatele procesării în baza de date cloud AWS RDS.

Să ne imaginăm că o anumită instituție de credit ne stabilește sarcina de a procesa tranzacțiile primite „din mers” în toate sucursalele sale. Acest lucru se poate face în scopul calculării prompte a unei poziții valutare deschise pentru trezorerie, limite sau rezultate financiare pentru tranzacții etc.

Cum să implementați acest caz fără a folosi magie și vrăji magice - citiți sub tăietură! Merge!

Apache Kafka și procesarea datelor în flux cu Spark Streaming
(Sursa imagine)

Introducere

Desigur, procesarea unei cantități mari de date în timp real oferă oportunități ample de utilizare în sistemele moderne. Una dintre cele mai populare combinații pentru aceasta este tandemul Apache Kafka și Spark Streaming, unde Kafka creează un flux de pachete de mesaje primite, iar Spark Streaming procesează aceste pachete la un interval de timp dat.

Pentru a crește toleranța la erori a aplicației, vom folosi puncte de control. Cu acest mecanism, atunci când motorul Spark Streaming trebuie să recupereze datele pierdute, trebuie doar să se întoarcă la ultimul punct de control și să reia calculele de acolo.

Arhitectura sistemului dezvoltat

Apache Kafka și procesarea datelor în flux cu Spark Streaming

Componente folosite:

  • Apache Kafka este un sistem de mesagerie de publicare-abonare distribuit. Potrivit atât pentru consumul de mesaje offline, cât și online. Pentru a preveni pierderea datelor, mesajele Kafka sunt stocate pe disc și replicate în cluster. Sistemul Kafka este construit pe deasupra serviciului de sincronizare ZooKeeper;
  • Apache Spark Streaming - Componentă Spark pentru procesarea datelor în flux. Modulul Spark Streaming este construit folosind o arhitectură micro-loturi, în care fluxul de date este interpretat ca o secvență continuă de pachete de date mici. Spark Streaming preia date din diferite surse și le combină în pachete mici. Noi pachete sunt create la intervale regulate. La începutul fiecărui interval de timp, se creează un nou pachet și toate datele primite în acel interval sunt incluse în pachet. La sfârșitul intervalului, creșterea pachetelor se oprește. Mărimea intervalului este determinată de un parametru numit interval batch;
  • Apache Spark SQL - combină procesarea relațională cu programarea funcțională Spark. Datele structurate înseamnă date care au o schemă, adică un singur set de câmpuri pentru toate înregistrările. Spark SQL acceptă introducerea dintr-o varietate de surse de date structurate și, datorită disponibilității informațiilor despre schemă, poate prelua eficient doar câmpurile necesare de înregistrări și oferă, de asemenea, API-uri DataFrame;
  • AWS RDS este o bază de date relațională bazată pe cloud, un serviciu web relativ ieftin, care simplifică configurarea, operarea și scalarea și este administrată direct de Amazon.

Instalarea și rularea serverului Kafka

Înainte de a folosi Kafka direct, trebuie să vă asigurați că aveți Java, deoarece... JVM este folosit pentru lucru:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Să creăm un utilizator nou care să lucreze cu Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Apoi, descărcați distribuția de pe site-ul oficial Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Despachetați arhiva descărcată:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Următorul pas este opțional. Faptul este că setările implicite nu vă permit să utilizați pe deplin toate caracteristicile Apache Kafka. De exemplu, ștergeți un subiect, o categorie, un grup în care pot fi publicate mesaje. Pentru a schimba acest lucru, să edităm fișierul de configurare:

vim ~/kafka/config/server.properties

Adăugați următoarele la sfârșitul fișierului:

delete.topic.enable = true

Înainte de a porni serverul Kafka, trebuie să porniți serverul ZooKeeper; vom folosi scriptul auxiliar care vine cu distribuția Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

După ce ZooKeeper a pornit cu succes, lansați serverul Kafka într-un terminal separat:

bin/kafka-server-start.sh config/server.properties

Să creăm un subiect nou numit Tranzacție:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Să ne asigurăm că a fost creat un subiect cu numărul necesar de partiții și replicare:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka și procesarea datelor în flux cu Spark Streaming

Sa ratam momentele de testare a producatorului si consumatorului pentru tema nou creata. Mai multe detalii despre cum puteți testa trimiterea și primirea mesajelor sunt scrise în documentația oficială - Trimite câteva mesaje. Ei bine, trecem la scrierea unui producător în Python folosind API-ul KafkaProducer.

Scrierea producătorului

Producătorul va genera date aleatorii - 100 de mesaje în fiecare secundă. Prin date aleatorii înțelegem un dicționar format din trei câmpuri:

  • Branch firma — denumirea punctului de vânzare al instituției de credit;
  • Monedă — moneda tranzacției;
  • Sumă - suma tranzacției. Suma va fi un număr pozitiv dacă este o achiziție de monedă de către Bancă și un număr negativ dacă este o vânzare.

Codul producătorului arată astfel:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

În continuare, folosind metoda send, trimitem un mesaj către server, la subiectul de care avem nevoie, în format JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Când rulăm scriptul, primim următoarele mesaje în terminal:

Apache Kafka și procesarea datelor în flux cu Spark Streaming

Aceasta înseamnă că totul funcționează așa cum ne-am dorit - producătorul generează și trimite mesaje la subiectul de care avem nevoie.
Următorul pas este să instalați Spark și să procesați acest flux de mesaje.

Instalarea Apache Spark

Apache Spark este o platformă de calcul cluster universală și de înaltă performanță.

Spark are performanțe mai bune decât implementările populare ale modelului MapReduce, în timp ce acceptă o gamă mai largă de tipuri de calcul, inclusiv interogări interactive și procesare flux. Viteza joacă un rol important atunci când procesați cantități mari de date, deoarece viteza este cea care vă permite să lucrați interactiv fără să petreceți minute sau ore de așteptare. Unul dintre cele mai mari puncte forte ale lui Spark care îl face atât de rapid este capacitatea sa de a efectua calcule în memorie.

Acest cadru este scris în Scala, deci trebuie să îl instalați mai întâi:

sudo apt-get install scala

Descărcați distribuția Spark de pe site-ul oficial:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Despachetați arhiva:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Adăugați calea către Spark în fișierul bash:

vim ~/.bashrc

Adăugați următoarele rânduri prin editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Rulați comanda de mai jos după ce faceți modificări în bashrc:

source ~/.bashrc

Implementarea AWS PostgreSQL

Tot ce rămâne este să implementăm baza de date în care vom încărca informațiile procesate din fluxuri. Pentru aceasta vom folosi serviciul AWS RDS.

Accesați consola AWS -> AWS RDS -> Baze de date -> Creare bază de date:
Apache Kafka și procesarea datelor în flux cu Spark Streaming

Selectați PostgreSQL și faceți clic pe Următorul:
Apache Kafka și procesarea datelor în flux cu Spark Streaming

Deoarece Acest exemplu este doar în scopuri educaționale; vom folosi un server gratuit „cel puțin” (nivel gratuit):
Apache Kafka și procesarea datelor în flux cu Spark Streaming

Apoi, punem o bifă în blocul Free Tier, iar după aceea ni se va oferi automat o instanță a clasei t2.micro - deși slabă, este gratuită și destul de potrivită pentru sarcina noastră:
Apache Kafka și procesarea datelor în flux cu Spark Streaming

Urmează lucruri foarte importante: numele instanței bazei de date, numele utilizatorului principal și parola acestuia. Să numim instanța: myHabrTest, utilizator principal: habr, parola: habr12345 și faceți clic pe butonul Următorul:
Apache Kafka și procesarea datelor în flux cu Spark Streaming

Pe pagina următoare sunt parametrii responsabili pentru accesibilitatea serverului nostru de baze de date din exterior (Accesibilitatea publică) și disponibilitatea portului:

Apache Kafka și procesarea datelor în flux cu Spark Streaming

Să creăm o nouă setare pentru grupul de securitate VPC, care va permite accesul extern la serverul nostru de baze de date prin portul 5432 (PostgreSQL).
Să mergem la consola AWS într-o fereastră separată a browserului la tabloul de bord VPC -> Grupuri de securitate -> secțiunea Creare grup de securitate:
Apache Kafka și procesarea datelor în flux cu Spark Streaming

Setăm numele grupului de securitate - PostgreSQL, o descriere, indicăm cu ce VPC ar trebui să fie asociat acest grup și facem clic pe butonul Creare:
Apache Kafka și procesarea datelor în flux cu Spark Streaming

Completați regulile de intrare pentru portul 5432 pentru grupul nou creat, așa cum se arată în imaginea de mai jos. Nu puteți specifica portul manual, dar selectați PostgreSQL din lista derulantă Tip.

Strict vorbind, valoarea ::/0 înseamnă disponibilitatea traficului de intrare către server din întreaga lume, ceea ce canonic nu este în întregime adevărat, dar pentru a analiza exemplul, să ne permitem să folosim această abordare:
Apache Kafka și procesarea datelor în flux cu Spark Streaming

Revenim la pagina browserului, unde avem deschisă „Configurarea setărilor avansate” și selectăm în secțiunea grupuri de securitate VPC -> Alegeți grupurile de securitate VPC existente -> PostgreSQL:
Apache Kafka și procesarea datelor în flux cu Spark Streaming

Apoi, în Opțiuni bază de date -> Nume bază de date -> setați numele - habrDB.

Putem lăsa parametrii rămași, cu excepția dezactivării backupului (perioada de păstrare a backupului - 0 zile), monitorizării și Performance Insights, în mod implicit. Faceți clic pe butonul Creați o bază de date:
Apache Kafka și procesarea datelor în flux cu Spark Streaming

Manipulator de fire

Etapa finală va fi dezvoltarea unui job Spark, care va procesa date noi venite de la Kafka la fiecare două secunde și va introduce rezultatul în baza de date.

După cum sa menționat mai sus, punctele de control sunt un mecanism de bază în SparkStreaming care trebuie configurat pentru a asigura toleranța la erori. Vom folosi puncte de control și, dacă procedura eșuează, modulul Spark Streaming va trebui doar să revină la ultimul punct de control și să reia calculele de pe acesta pentru a recupera datele pierdute.

Punctul de control poate fi activat prin setarea unui director pe un sistem de fișiere fiabil și tolerant la erori (cum ar fi HDFS, S3 etc.) în care vor fi stocate informațiile despre punctul de control. Acest lucru se face folosind, de exemplu:

streamingContext.checkpoint(checkpointDirectory)

În exemplul nostru, vom folosi următoarea abordare, și anume, dacă checkpointDirectory există, atunci contextul va fi recreat din datele punctului de control. Dacă directorul nu există (adică executat pentru prima dată), atunci functionToCreateContext este apelat pentru a crea un nou context și a configura DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Creăm un obiect DirectStream pentru a vă conecta la subiectul „tranzacție” folosind metoda createDirectStream a bibliotecii KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analizarea datelor primite în format JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Folosind Spark SQL, facem o grupare simplă și afișăm rezultatul în consolă:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Obținerea textului de interogare și rularea acestuia prin Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Și apoi salvăm datele agregate rezultate într-un tabel în AWS RDS. Pentru a salva rezultatele agregării într-un tabel al bazei de date, vom folosi metoda de scriere a obiectului DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Câteva cuvinte despre configurarea unei conexiuni la AWS RDS. Am creat utilizatorul și parola pentru acesta la pasul „Implementarea AWS PostgreSQL”. Ar trebui să utilizați Endpoint ca adresă URL a serverului de baze de date, care este afișată în secțiunea Conectivitate și securitate:

Apache Kafka și procesarea datelor în flux cu Spark Streaming

Pentru a conecta corect Spark și Kafka, ar trebui să rulați jobul prin smark-submit folosind artefactul spark-streaming-kafka-0-8_2.11. În plus, vom folosi și un artefact pentru a interacționa cu baza de date PostgreSQL; le vom transfera prin --packages.

Pentru flexibilitatea scriptului vom include ca parametri de intrare si numele serverului de mesaje si subiectul de la care dorim sa primim date.

Deci, este timpul să lansați și să verificați funcționalitatea sistemului:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Totul a mers! După cum puteți vedea în imaginea de mai jos, în timp ce aplicația rulează, noi rezultate de agregare sunt afișate la fiecare 2 secunde, deoarece am setat intervalul de loturi la 2 secunde când am creat obiectul StreamingContext:

Apache Kafka și procesarea datelor în flux cu Spark Streaming

Apoi, facem o interogare simplă la baza de date pentru a verifica prezența înregistrărilor în tabel flux_ tranzacție:

Apache Kafka și procesarea datelor în flux cu Spark Streaming

Concluzie

Acest articol a analizat un exemplu de procesare în flux a informațiilor folosind Spark Streaming împreună cu Apache Kafka și PostgreSQL. Odată cu creșterea datelor din diverse surse, este dificil să supraestimați valoarea practică a Spark Streaming pentru crearea de streaming și aplicații în timp real.

Puteți găsi codul sursă complet în depozitul meu la GitHub.

Mă bucur să discut despre acest articol, aștept cu nerăbdare comentariile voastre și, de asemenea, sper la critici constructive din partea tuturor cititorilor grijulii.

Vă urez succes!

Ps. Inițial a fost planificat să folosesc o bază de date locală PostgreSQL, dar având în vedere dragostea mea pentru AWS, am decis să mut baza de date în cloud. În următorul articol despre acest subiect, voi arăta cum să implementez întregul sistem descris mai sus în AWS folosind AWS Kinesis și AWS EMR. Urmăriți știrile!

Sursa: www.habr.com

Adauga un comentariu