Apache Kafka un straumÄÅ”anas datu apstrÄde ar Spark straumÄÅ”anu
Sveiks, Habr! Å odien mÄs izveidosim sistÄmu, kas apstrÄdÄs Apache Kafka ziÅojumu straumes, izmantojot Spark Streaming, un ierakstÄ«s apstrÄdes rezultÄtus AWS RDS mÄkoÅa datubÄzÄ.
IedomÄsimies, ka noteikta kredÄ«tiestÄde mums uzdod veikt ienÄkoÅ”os darÄ«jumus "lidojuma laikÄ" visÄs tÄs filiÄlÄs. To var izdarÄ«t, lai Ätri aprÄÄ·inÄtu atklÄto valÅ«tas pozÄ«ciju valsts kasei, darÄ«jumu limitus vai finanÅ”u rezultÄtus utt.
KÄ Ä«stenot Å”o gadÄ«jumu, neizmantojot burvju un burvju burvestÄ«bas - lasiet zem griezuma! Aiziet!
Protams, liela datu apjoma apstrÄde reÄllaikÄ sniedz plaÅ”as izmantoÅ”anas iespÄjas mÅ«sdienu sistÄmÄs. Viena no populÄrÄkajÄm kombinÄcijÄm Å”im nolÅ«kam ir Apache Kafka un Spark Streaming tandÄms, kur Kafka izveido ienÄkoÅ”o ziÅojumu pakeÅ”u straumi, bet Spark Streaming apstrÄdÄ Å”Ä«s paketes noteiktÄ laika intervÄlÄ.
Lai palielinÄtu lietojumprogrammas kļūdu toleranci, mÄs izmantosim kontrolpunktus. Izmantojot Å”o mehÄnismu, kad Spark Streaming dzinÄjam ir jÄatgÅ«st zaudÄtie dati, tam tikai jÄatgriežas pÄdÄjÄ kontrolpunktÄ un jÄatsÄk aprÄÄ·ini no turienes.
IzstrÄdÄtÄs sistÄmas arhitektÅ«ra
IzmantotÄs sastÄvdaļas:
Apache Kafka ir izplatÄ«ta publicÄÅ”anas un abonÄÅ”anas ziÅojumapmaiÅas sistÄma. PiemÄrots gan bezsaistes, gan tieÅ”saistes ziÅojumu patÄriÅam. Lai novÄrstu datu zudumu, Kafka ziÅojumi tiek glabÄti diskÄ un tiek replicÄti klasterÄ«. Kafka sistÄma ir balstÄ«ta uz ZooKeeper sinhronizÄcijas pakalpojumu;
Apache Spark straumÄÅ”ana - Spark komponents straumÄÅ”anas datu apstrÄdei. Spark Streaming modulis ir izveidots, izmantojot mikropakeÅ”u arhitektÅ«ru, kur datu straume tiek interpretÄta kÄ nepÄrtraukta mazu datu pakeÅ”u secÄ«ba. Spark Streaming Åem datus no dažÄdiem avotiem un apvieno tos mazÄs pakotnÄs. Jaunas paketes tiek veidotas regulÄri. Katra laika intervÄla sÄkumÄ tiek izveidota jauna pakete, un visi Å”ajÄ intervÄlÄ saÅemtie dati tiek iekļauti paketÄ. IntervÄla beigÄs pakeÅ”u augÅ”ana apstÄjas. IntervÄla lielumu nosaka parametrs, ko sauc par partijas intervÄlu;
Apache Spark SQL - apvieno relÄciju apstrÄdi ar Spark funkcionÄlo programmÄÅ”anu. StrukturÄtie dati ir dati, kuriem ir shÄma, tas ir, viena lauku kopa visiem ierakstiem. Spark SQL atbalsta ievadi no dažÄdiem strukturÄtu datu avotiem un, pateicoties shÄmas informÄcijas pieejamÄ«bai, var efektÄ«vi izgÅ«t tikai nepiecieÅ”amos ierakstu laukus, kÄ arÄ« nodroÅ”ina DataFrame API;
AWS RDS ir salÄ«dzinoÅ”i lÄta uz mÄkoÅiem balstÄ«ta relÄciju datubÄze, tÄ«mekļa pakalpojums, kas vienkÄrÅ”o iestatÄ«Å”anu, darbÄ«bu un mÄrogoÅ”anu, un to tieÅ”i administrÄ Amazon.
Kafka servera instalÄÅ”ana un palaiÅ”ana
Pirms tieÅ”i izmantojat Kafka, jums ir jÄpÄrliecinÄs, vai jums ir Java, jo... JVM tiek izmantots darbam:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
NÄkamÄ darbÄ«ba nav obligÄta. Fakts ir tÄds, ka noklusÄjuma iestatÄ«jumi neļauj pilnÄ«bÄ izmantot visas Apache Kafka iespÄjas. PiemÄram, izdzÄsiet tÄmu, kategoriju, grupu, kurai var publicÄt ziÅojumus. Lai to mainÄ«tu, rediÄ£Äsim konfigurÄcijas failu:
vim ~/kafka/config/server.properties
Faila beigÄs pievienojiet Å”Ädu tekstu:
delete.topic.enable = true
Pirms Kafka servera palaiÅ”anas jums ir jÄstartÄ serveris ZooKeeper; mÄs izmantosim Kafka izplatÄ«Å”anas komplektÄcijÄ iekļauto palÄ«gskriptu:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Kad ZooKeeper ir veiksmÄ«gi startÄjis, palaidiet Kafka serveri atseviÅ”Ä·Ä terminÄlÄ«:
PalaidÄ«sim garÄm brīžus, kad tiek pÄrbaudÄ«ts ražotÄjs un patÄrÄtÄjs jaunizveidotajai tÄmai. SÄ«kÄka informÄcija par to, kÄ pÄrbaudÄ«t ziÅojumu sÅ«tÄ«Å”anu un saÅemÅ”anu, ir rakstÄ«ta oficiÄlajÄ dokumentÄcijÄ - NosÅ«tiet dažas ziÅas. MÄs pÄrejam pie ražotÄja rakstÄ«Å”anas Python, izmantojot KafkaProducer API.
Producentu rakstīŔana
RažotÄjs Ä£enerÄs nejauÅ”us datus - 100 ziÅas katru sekundi. Ar nejauÅ”iem datiem mÄs saprotam vÄrdnÄ«cu, kas sastÄv no trim laukiem:
FiliÄle ā kredÄ«tiestÄdes tirdzniecÄ«bas vietas nosaukums;
ValÅ«ta ā darÄ«juma valÅ«ta;
summa ā darÄ«juma summa. Summa bÅ«s pozitÄ«vs skaitlis, ja tas ir Bankas valÅ«tas pirkums, un negatÄ«vs skaitlis, ja tas ir pÄrdoÅ”ana.
PÄc tam, izmantojot sÅ«tÄ«Å”anas metodi, mÄs nosÅ«tÄm ziÅojumu uz serveri, uz mums nepiecieÅ”amo tÄmu, JSON formÄtÄ:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Tas nozÄ«mÄ, ka viss darbojas tÄ, kÄ mÄs vÄlÄjÄmies ā producents Ä£enerÄ un sÅ«ta ziÅas uz mums nepiecieÅ”amo tÄmu.
NÄkamais solis ir instalÄt Spark un apstrÄdÄt Å”o ziÅojumu straumi.
Apache Spark instalÄÅ”ana
Apache Spark ir universÄla un augstas veiktspÄjas klasteru skaitļoÅ”anas platforma.
Spark darbojas labÄk nekÄ populÄrÄs MapReduce modeļa implementÄcijas, vienlaikus atbalstot plaÅ”Äku skaitļoÅ”anas veidu klÄstu, tostarp interaktÄ«vos vaicÄjumus un straumes apstrÄdi. Ätrumam ir svarÄ«ga loma, apstrÄdÄjot lielu datu apjomu, jo tas ir Ätrums, kas ļauj strÄdÄt interaktÄ«vi, netÄrÄjot minÅ«tes vai stundas. Viena no lielÄkajÄm lietÄm, kas padara Spark tik Ätru, ir tÄs spÄja veikt aprÄÄ·inus atmiÅÄ.
Å is ietvars ir rakstÄ«ts programmÄ Scala, tÄpÄc vispirms tas jÄinstalÄ:
sudo apt-get install scala
LejupielÄdÄjiet Spark izplatÄ«Å”anu no oficiÄlÄs vietnes:
Atliek tikai izvietot datu bÄzi, kurÄ augÅ”upielÄdÄsim apstrÄdÄto informÄciju no straumÄm. Å im nolÅ«kam mÄs izmantosim AWS RDS pakalpojumu.
Dodieties uz AWS konsoli -> AWS RDS -> DatubÄzes -> Izveidot datu bÄzi:
Atlasiet PostgreSQL un noklikŔķiniet uz TÄlÄk:
Jo Å is piemÄrs ir paredzÄts tikai izglÄ«tojoÅ”iem nolÅ«kiem; mÄs izmantosim bezmaksas serveri āvismazā (bezmaksas lÄ«menis):
TÄlÄk mÄs ieliekam Ä·eksÄ«ti Free Tier blokÄ, un pÄc tam mums automÄtiski tiks piedÄvÄts t2.micro klases gadÄ«jums - lai arÄ« vÄjÅ”, tas ir bezmaksas un diezgan piemÄrots mÅ«su uzdevumam:
TÄlÄk seko ļoti svarÄ«gas lietas: datu bÄzes instances nosaukums, galvenÄ lietotÄja vÄrds un viÅa parole. Nosauksim gadÄ«jumu: myHabrTest, galvenais lietotÄjs: habr, parole: habr12345 un noklikŔķiniet uz pogas TÄlÄk:
NÄkamajÄ lapÄ ir norÄdÄ«ti parametri, kas atbild par mÅ«su datu bÄzes servera pieejamÄ«bu no Ärpuses (publiskÄ pieejamÄ«ba) un portu pieejamÄ«bu:
Izveidosim jaunu iestatÄ«jumu VPC droŔības grupai, kas ļaus ÄrÄji piekļūt mÅ«su datu bÄzes serverim caur portu 5432 (PostgreSQL).
Dosimies uz AWS konsoli atseviÅ”Ä·Ä pÄrlÅ«kprogrammas logÄ uz VPC informÄcijas paneli -> DroŔības grupas -> Izveidot droŔības grupu sadaļu:
MÄs iestatÄm droŔības grupas nosaukumu - PostgreSQL, aprakstu, norÄdiet, ar kuru VPC Ŕī grupa ir jÄsaista, un noklikŔķiniet uz pogas Izveidot:
Aizpildiet jaunizveidotÄs grupas portam 5432 ienÄkoÅ”os noteikumus, kÄ parÄdÄ«ts attÄlÄ zemÄk. Portu nevar norÄdÄ«t manuÄli, bet nolaižamajÄ sarakstÄ Type atlasiet PostgreSQL.
Stingri runÄjot, vÄrtÄ«ba ::/0 nozÄ«mÄ ienÄkoÅ”Äs trafika pieejamÄ«bu serverim no visas pasaules, kas kanoniski nav pilnÄ«gi taisnÄ«ba, taÄu, lai analizÄtu piemÄru, ļausim sev izmantot Å”Ädu pieeju:
MÄs atgriežamies pÄrlÅ«kprogrammas lapÄ, kur ir atvÄrts āPapildu iestatÄ«jumu konfigurÄÅ”anaā un sadaÄ¼Ä VPC droŔības grupas atlasiet -> IzvÄlieties esoÅ”Äs VPC droŔības grupas -> PostgreSQL:
PÄc tam datu bÄzes opcijÄs -> DatubÄzes nosaukums -> iestatiet nosaukumu - habrDB.
AtlikuÅ”os parametrus, izÅemot dublÄÅ”anas atspÄjoÅ”anu (dublÄjuma saglabÄÅ”anas periods - 0 dienas), uzraudzÄ«bu un Performance Insights, varam atstÄt pÄc noklusÄjuma. NoklikŔķiniet uz pogas Izveidot datu bÄzi:
VÄ«tÅu apstrÄdÄtÄjs
PÄdÄjais posms bÅ«s Spark darba izstrÄde, kas ik pÄc divÄm sekundÄm apstrÄdÄs jaunus Kafkas datus un ievadÄ«s rezultÄtu datubÄzÄ.
KÄ minÄts iepriekÅ”, kontrolpunkti ir SparkStreaming galvenais mehÄnisms, kas ir jÄkonfigurÄ, lai nodroÅ”inÄtu kļūdu toleranci. MÄs izmantosim kontrolpunktus, un, ja procedÅ«ra neizdosies, Spark Streaming modulim bÅ«s tikai jÄatgriežas pÄdÄjÄ kontrolpunktÄ un jÄatsÄk aprÄÄ·ini no tÄ, lai atgÅ«tu zaudÄtos datus.
Kontrolpunktu var iespÄjot, iestatot direktoriju kļūmju izturÄ«gÄ, uzticamÄ failu sistÄmÄ (piemÄram, HDFS, S3 utt.), kurÄ tiks saglabÄta kontrolpunkta informÄcija. Tas tiek darÄ«ts, izmantojot, piemÄram:
streamingContext.checkpoint(checkpointDirectory)
MÅ«su piemÄrÄ mÄs izmantosim Å”Ädu pieeju, proti, ja pastÄv checkpointDirectory, konteksts tiks izveidots no jauna no kontrolpunkta datiem. Ja direktorija neeksistÄ (t.i., tiek izpildÄ«ta pirmo reizi), tiek izsaukta funkcija functionToCreateContext, lai izveidotu jaunu kontekstu un konfigurÄtu DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
MÄs izveidojam DirectStream objektu, lai izveidotu savienojumu ar ādarÄ«jumaā tÄmu, izmantojot KafkaUtils bibliotÄkas metodi createDirectStream:
Izmantojot Spark SQL, mÄs veicam vienkÄrÅ”u grupÄÅ”anu un konsolÄ parÄdÄm rezultÄtu:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
VaicÄjuma teksta iegÅ«Å”ana un palaiÅ”ana, izmantojot Spark SQL:
Un pÄc tam mÄs saglabÄjam iegÅ«tos apkopotos datus tabulÄ AWS RDS. Lai apkopoÅ”anas rezultÄtus saglabÄtu datu bÄzes tabulÄ, mÄs izmantosim DataFrame objekta rakstÄ«Å”anas metodi:
Daži vÄrdi par savienojuma iestatÄ«Å”anu ar AWS RDS. MÄs tam izveidojÄm lietotÄju un paroli, veicot darbÄ«bu āAWS PostgreSQL izvietoÅ”anaā. KÄ datu bÄzes servera URL ir jÄizmanto Endpoint, kas tiek parÄdÄ«ts sadaÄ¼Ä SavienojamÄ«ba un droŔība:
Lai pareizi savienotu Spark un Kafka, jums vajadzÄtu palaist darbu, izmantojot smark-submit, izmantojot artefaktu spark-streaming-kafka-0-8_2.11. TurklÄt mÄs izmantosim arÄ« artefaktu, lai mijiedarbotos ar PostgreSQL datu bÄzi; mÄs tos pÄrsÅ«tÄ«sim, izmantojot paketes.
Skripta elastÄ«bas labad kÄ ievades parametrus iekļausim arÄ« ziÅojumu servera nosaukumu un tÄmu, no kuras vÄlamies saÅemt datus.
TÄtad, ir pienÄcis laiks palaist un pÄrbaudÄ«t sistÄmas funkcionalitÄti:
Viss izdevÄs! KÄ redzams zemÄk esoÅ”ajÄ attÄlÄ, lietojumprogrammas darbÄ«bas laikÄ jauni apkopoÅ”anas rezultÄti tiek izvadÄ«ti ik pÄc 2 sekundÄm, jo āāmÄs iestatÄ«jÄm pakeÅ”u intervÄlu uz 2 sekundÄm, kad veidojÄm StreamingContext objektu:
TÄlÄk mÄs veicam vienkÄrÅ”u vaicÄjumu datu bÄzei, lai pÄrbaudÄ«tu ierakstu klÄtbÅ«tni tabulÄ transakcijas_plÅ«sma:
SecinÄjums
Å ajÄ rakstÄ tika aplÅ«kots informÄcijas straumes apstrÄdes piemÄrs, izmantojot Spark Streaming kopÄ ar Apache Kafka un PostgreSQL. Pieaugot datiem no dažÄdiem avotiem, ir grÅ«ti pÄrvÄrtÄt Spark Streaming praktisko vÄrtÄ«bu straumÄÅ”anas un reÄllaika lietojumprogrammu izveidÄ.
Pilnu avota kodu varat atrast manÄ repozitorijÄ vietnÄ GitHub.
Ar prieku apspriežu Å”o rakstu, gaidu jÅ«su komentÄrus, kÄ arÄ« ceru uz konstruktÄ«vu kritiku no visiem gÄdÄ«gajiem lasÄ«tÄjiem.
Es vÄlu jums veiksmi!
Ps. SÄkotnÄji bija plÄnots izmantot vietÄjo PostgreSQL datu bÄzi, taÄu, Åemot vÄrÄ manu mÄ«lestÄ«bu pret AWS, es nolÄmu pÄrvietot datu bÄzi uz mÄkoni. NÄkamajÄ rakstÄ par Å”o tÄmu es parÄdÄ«Å”u, kÄ ieviest visu iepriekÅ” aprakstÄ«to sistÄmu AWS, izmantojot AWS Kinesis un AWS EMR. Sekojiet jaunumiem!