Apache Kafka i Streaming obrada podataka uz Spark Streaming
Zdravo, Habr! Danas ćemo izgraditi sistem koji će obraditi Apache Kafka tokove poruka koristeći Spark Streaming i zapisivati rezultate obrade u AWS RDS cloud bazu podataka.
Zamislimo da nam određena kreditna institucija postavlja zadatak da obrađujemo pristigle transakcije „u hodu“ u svim svojim filijalama. Ovo se može učiniti u svrhu brzog izračunavanja otvorene valutne pozicije za trezor, limita ili finansijskih rezultata za transakcije itd.
Kako implementirati ovaj slučaj bez upotrebe magije i magijskih čarolija - pročitajte ispod! Idi!
Naravno, obrada velike količine podataka u realnom vremenu pruža široke mogućnosti za upotrebu u savremenim sistemima. Jedna od najpopularnijih kombinacija za ovo je tandem Apache Kafka i Spark Streaming, gdje Kafka kreira tok dolaznih paketa poruka, a Spark Streaming obrađuje te pakete u datom vremenskom intervalu.
Da bismo povećali toleranciju na greške aplikacije, koristit ćemo kontrolne točke. Sa ovim mehanizmom, kada Spark Streaming motor treba da povrati izgubljene podatke, samo treba da se vrati na poslednju kontrolnu tačku i odatle nastavi proračune.
Arhitektura razvijenog sistema
Korištene komponente:
Apache Kafka je distribuirani sistem za razmjenu poruka za objavljivanje-pretplatu. Pogodno za offline i online konzumaciju poruka. Da bi se spriječio gubitak podataka, Kafka poruke se pohranjuju na disk i repliciraju unutar klastera. Kafka sistem je izgrađen na vrhu servisa za sinhronizaciju ZooKeeper;
Apache Spark Streaming - Spark komponenta za obradu streaming podataka. Modul Spark Streaming je izgrađen korištenjem mikro-batch arhitekture, gdje se tok podataka tumači kao kontinuirani niz malih paketa podataka. Spark Streaming uzima podatke iz različitih izvora i kombinuje ih u male pakete. Novi paketi se kreiraju u redovnim intervalima. Na početku svakog vremenskog intervala kreira se novi paket, a svi podaci primljeni tokom tog intervala se uključuju u paket. Na kraju intervala, rast paketa se zaustavlja. Veličina intervala je određena parametrom koji se zove interval serije;
Apache Spark SQL - kombinuje relacionu obradu sa Spark funkcionalnim programiranjem. Strukturirani podaci označavaju podatke koji imaju šemu, odnosno jedan skup polja za sve zapise. Spark SQL podržava unos iz različitih strukturiranih izvora podataka i, zahvaljujući dostupnosti informacija o šemi, može efikasno dohvatiti samo potrebna polja zapisa, a takođe pruža API-je za DataFrame;
AWS RDS je relativno jeftina relaciona baza podataka zasnovana na oblaku, web servis koji pojednostavljuje podešavanje, rad i skaliranje, a njime direktno administrira Amazon.
Instaliranje i pokretanje Kafka servera
Prije nego što direktno koristite Kafku, morate biti sigurni da imate Javu, jer... JVM se koristi za rad:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Sljedeći korak nije obavezan. Činjenica je da vam zadane postavke ne dozvoljavaju da u potpunosti koristite sve mogućnosti Apache Kafke. Na primjer, izbrišite temu, kategoriju, grupu u kojoj se poruke mogu objavljivati. Da ovo promijenimo, uredimo konfiguracijski fajl:
vim ~/kafka/config/server.properties
Dodajte sljedeće na kraj datoteke:
delete.topic.enable = true
Prije pokretanja Kafka servera, morate pokrenuti server ZooKeeper; koristit ćemo pomoćnu skriptu koja dolazi s Kafka distribucijom:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Nakon što se ZooKeeper uspješno pokrene, pokrenite Kafka server u zasebnom terminalu:
Propustimo trenutke testiranja proizvođača i potrošača za novonastalu temu. Više detalja o tome kako možete testirati slanje i primanje poruka napisano je u službenoj dokumentaciji - Pošaljite neke poruke. Pa, prelazimo na pisanje producenta u Pythonu koristeći KafkaProducer API.
Pisanje producenta
Proizvođač će generirati nasumične podatke - 100 poruka svake sekunde. Pod slučajnim podacima podrazumijevamo rječnik koji se sastoji od tri polja:
grana — naziv prodajnog mjesta kreditne institucije;
valuta — valuta transakcije;
iznos — iznos transakcije. Iznos će biti pozitivan broj ako se radi o kupovini valute od strane Banke, a negativan ako se radi o prodaji.
Zatim, koristeći metodu send, šaljemo poruku serveru, na temu koja nam je potrebna, u JSON formatu:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Prilikom pokretanja skripte, primamo sljedeće poruke u terminalu:
To znači da sve funkcioniše kako smo želeli - proizvođač generiše i šalje poruke na temu koja nam je potrebna.
Sljedeći korak je instaliranje Spark-a i obrada ovog toka poruka.
Instaliranje Apache Sparka
Apache Spark je univerzalna i klaster računarska platforma visokih performansi.
Spark radi bolje od popularnih implementacija MapReduce modela dok podržava širi raspon tipova izračunavanja, uključujući interaktivne upite i obradu toka. Brzina igra važnu ulogu pri obradi velikih količina podataka, jer je brzina ta koja vam omogućava da radite interaktivno bez trošenja minuta ili sati na čekanje. Jedna od najvećih prednosti Spark-a koja ga čini tako brzim je njegova sposobnost izvođenja računanja u memoriji.
Ovaj okvir je napisan u Scali, tako da ga prvo morate instalirati:
sudo apt-get install scala
Preuzmite Spark distribuciju sa službene web stranice:
Pokrenite naredbu ispod nakon što izvršite promjene u bashrc:
source ~/.bashrc
Primena AWS PostgreSQL
Ostaje samo da se postavi baza podataka u koju ćemo učitati obrađene informacije iz streamova. Za ovo ćemo koristiti uslugu AWS RDS.
Idite na AWS konzolu -> AWS RDS -> Baze podataka -> Kreiraj bazu podataka:
Odaberite PostgreSQL i kliknite Dalje:
Jer Ovaj primjer je samo u obrazovne svrhe; koristit ćemo besplatni server "najmanje" (Free Tier):
Zatim stavljamo kvačicu u blok Free Tier, a nakon toga će nam automatski biti ponuđena instanca klase t2.micro - iako slaba, besplatna je i sasvim prikladna za naš zadatak:
Slijede vrlo važne stvari: ime instance baze podataka, ime glavnog korisnika i njegova lozinka. Nazovimo instancu: myHabrTest, glavni korisnik: habr, lozinka: habr12345 i kliknite na dugme Dalje:
Na sljedećoj stranici nalaze se parametri odgovorni za pristupačnost našeg servera baze podataka izvana (Public accessibility) i dostupnost porta:
Kreirajmo novu postavku za VPC sigurnosnu grupu, koja će omogućiti eksterni pristup našem serveru baze podataka preko porta 5432 (PostgreSQL).
Idemo na AWS konzolu u zasebnom prozoru pretraživača na VPC Dashboard -> Sigurnosne grupe -> Kreiraj odjeljak sigurnosne grupe:
Postavljamo naziv za Sigurnosnu grupu - PostgreSQL, opis, označavamo sa kojim VPC-om ova grupa treba biti povezana i kliknemo na dugme Kreiraj:
Popunite ulazna pravila za port 5432 za novokreiranu grupu, kao što je prikazano na slici ispod. Ne možete ručno odrediti port, već odaberite PostgreSQL sa padajuće liste Tip.
Strogo govoreći, vrijednost ::/0 znači dostupnost dolaznog prometa na server iz cijelog svijeta, što kanonski nije sasvim tačno, ali da analiziramo primjer, dozvolimo sebi da koristimo ovaj pristup:
Vraćamo se na stranicu pretraživača, gdje imamo otvorenu “Konfiguriraj napredne postavke” i odabiremo u odjeljku VPC sigurnosne grupe -> Odaberite postojeće VPC sigurnosne grupe -> PostgreSQL:
Zatim, u opcijama baze podataka -> Ime baze podataka -> postavite ime - habrDB.
Preostale parametre, osim onemogućavanja sigurnosne kopije (period zadržavanja rezervne kopije - 0 dana), nadzora i uvida u performanse, možemo ostaviti po defaultu. Kliknite na dugme Kreirajte bazu podataka:
Thread handler
Završna faza će biti razvoj Spark posla, koji će svake dvije sekunde obrađivati nove podatke koji dolaze od Kafke i unositi rezultat u bazu podataka.
Kao što je gore navedeno, kontrolne tačke su osnovni mehanizam u SparkStreaming-u koji se mora konfigurirati kako bi se osigurala tolerancija grešaka. Koristit ćemo kontrolne tačke i, ako procedura ne uspije, modul Spark Streaming će se samo trebati vratiti na posljednju kontrolnu tačku i nastaviti kalkulacije sa nje kako bi povratio izgubljene podatke.
Kontrolna tačka se može omogućiti postavljanjem direktorijuma na pouzdanom sistemu datoteka otpornom na greške (kao što je HDFS, S3, itd.) u kojem će biti pohranjene informacije o kontrolnoj tački. To se radi pomoću, na primjer:
streamingContext.checkpoint(checkpointDirectory)
U našem primjeru koristit ćemo sljedeći pristup, naime, ako checkpointDirectory postoji, onda će kontekst biti ponovo kreiran iz podataka kontrolne točke. Ako direktorij ne postoji (tj. izvršava se po prvi put), tada se poziva functionToCreateContext da kreira novi kontekst i konfigurira DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Kreiramo DirectStream objekat da se povežemo na temu "transakcije" koristeći createDirectStream metodu biblioteke KafkaUtils:
Koristeći Spark SQL, radimo jednostavno grupisanje i prikazujemo rezultat u konzoli:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Dobivanje teksta upita i pokretanje kroz Spark SQL:
A zatim spremamo rezultirajuće agregirane podatke u tabelu u AWS RDS. Za spremanje rezultata agregacije u tablicu baze podataka, koristit ćemo metodu pisanja objekta DataFrame:
Nekoliko riječi o postavljanju veze na AWS RDS. Napravili smo korisnika i lozinku za njega u koraku “Primjena AWS PostgreSQL-a”. Trebali biste koristiti Endpoint kao URL servera baze podataka, koji je prikazan u odjeljku Povezivanje i sigurnost:
Da biste ispravno povezali Spark i Kafku, trebali biste pokrenuti posao putem smark-submit koristeći artefakt spark-streaming-kafka-0-8_2.11. Osim toga, koristićemo artefakt za interakciju sa PostgreSQL bazom podataka; mi ćemo ih prenijeti putem --packagesa.
Radi fleksibilnosti skripte, kao ulazne parametre uključićemo i naziv servera poruka i temu iz koje želimo da primamo podatke.
Dakle, vrijeme je da pokrenete i provjerite funkcionalnost sistema:
Sve je uspjelo! Kao što možete vidjeti na slici ispod, dok je aplikacija pokrenuta, novi rezultati agregacije se izlaze svake 2 sekunde, jer smo postavili interval batching-a na 2 sekunde kada smo kreirali StreamingContext objekat:
Zatim pravimo jednostavan upit bazi podataka kako bismo provjerili prisustvo zapisa u tabeli transakcijski_tok:
zaključak
Ovaj članak je razmatrao primjer stream obrade informacija pomoću Spark Streaming-a u kombinaciji s Apache Kafka i PostgreSQL-om. Uz porast podataka iz različitih izvora, teško je precijeniti praktičnu vrijednost Spark Streaming-a za kreiranje streaming aplikacija i aplikacija u realnom vremenu.
Kompletan izvorni kod možete pronaći u mom spremištu na adresi GitHub.
Drago mi je da raspravljam o ovom članku, radujem se vašim komentarima, a nadam se i konstruktivnoj kritici svih brižnih čitatelja.
Želim vam uspeh!
. U početku je bilo planirano korištenje lokalne PostgreSQL baze podataka, ali s obzirom na moju ljubav prema AWS-u, odlučio sam da premjestim bazu podataka u oblak. U sljedećem članku na ovu temu pokazaću kako implementirati cijeli gore opisani sistem u AWS koristeći AWS Kinesis i AWS EMR. Pratite vijesti!