ProHoster > Blog > Pangangasiwa > Apache Kafka at Streaming Data Processing gamit ang Spark Streaming
Apache Kafka at Streaming Data Processing gamit ang Spark Streaming
Hello, Habr! Ngayon, bubuo kami ng system na magpoproseso ng mga stream ng mensahe ng Apache Kafka gamit ang Spark Streaming at isusulat ang mga resulta ng pagproseso sa cloud database ng AWS RDS.
Isipin natin na ang isang partikular na institusyon ng kredito ay nagtatakda sa atin ng gawain ng pagproseso ng mga papasok na transaksyon "on the fly" sa lahat ng sangay nito. Magagawa ito para sa layunin ng agarang pagkalkula ng isang bukas na posisyon ng pera para sa treasury, mga limitasyon o mga resulta sa pananalapi para sa mga transaksyon, atbp.
Paano ipatupad ang kasong ito nang hindi gumagamit ng magic at magic spells - basahin sa ilalim ng hiwa! Go!
Siyempre, ang pagproseso ng malaking halaga ng data sa real time ay nagbibigay ng sapat na pagkakataon para magamit sa mga modernong system. Isa sa mga pinakasikat na kumbinasyon para dito ay ang tandem ng Apache Kafka at Spark Streaming, kung saan gumagawa ang Kafka ng stream ng mga papasok na packet ng mensahe, at pinoproseso ng Spark Streaming ang mga packet na ito sa isang partikular na agwat ng oras.
Para mapataas ang fault tolerance ng application, gagamit kami ng mga checkpoint. Sa mekanismong ito, kapag kailangan ng Spark Streaming engine na mabawi ang nawalang data, kailangan lang nitong bumalik sa huling checkpoint at ipagpatuloy ang mga kalkulasyon mula doon.
Arkitektura ng binuong sistema
Mga bahaging ginamit:
Apache Kafka ay isang distributed publish-subscribe messaging system. Angkop para sa parehong offline at online na pagkonsumo ng mensahe. Upang maiwasan ang pagkawala ng data, ang mga mensahe ng Kafka ay iniimbak sa disk at ginagaya sa loob ng cluster. Ang sistema ng Kafka ay binuo sa ibabaw ng serbisyo ng pag-synchronize ng ZooKeeper;
Apache Spark Streaming - Spark component para sa pagproseso ng streaming data. Ang module ng Spark Streaming ay binuo gamit ang isang micro-batch architecture, kung saan ang stream ng data ay binibigyang-kahulugan bilang isang tuluy-tuloy na pagkakasunud-sunod ng maliliit na data packet. Ang Spark Streaming ay kumukuha ng data mula sa iba't ibang pinagmulan at pinagsasama ito sa maliliit na pakete. Ang mga bagong pakete ay nilikha sa mga regular na pagitan. Sa simula ng bawat agwat ng oras, isang bagong packet ang nilikha, at anumang data na natanggap sa pagitan ng agwat na iyon ay kasama sa packet. Sa pagtatapos ng agwat, hihinto ang paglaki ng packet. Ang laki ng pagitan ay tinutukoy ng isang parameter na tinatawag na batch interval;
Apache Spark SQL - pinagsasama ang relational processing sa Spark functional programming. Ang structured data ay nangangahulugan ng data na may schema, iyon ay, isang set ng field para sa lahat ng record. Sinusuportahan ng Spark SQL ang pag-input mula sa iba't ibang structured data source at, salamat sa pagkakaroon ng impormasyon ng schema, maaari lamang nitong makuha ang mga kinakailangang field ng record nang mahusay, at nagbibigay din ng mga DataFrame API;
AWS RDS ay isang medyo murang cloud-based na relational database, serbisyo sa web na pinapasimple ang setup, operasyon at scaling, at direktang pinangangasiwaan ng Amazon.
Pag-install at pagpapatakbo ng Kafka server
Bago gamitin ang Kafka nang direkta, kailangan mong tiyakin na mayroon kang Java, dahil... Ginagamit ang JVM para sa trabaho:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Ang susunod na hakbang ay opsyonal. Ang katotohanan ay ang mga default na setting ay hindi nagpapahintulot sa iyo na ganap na gamitin ang lahat ng mga tampok ng Apache Kafka. Halimbawa, tanggalin ang isang paksa, kategorya, pangkat kung saan maaaring mai-publish ang mga mensahe. Para baguhin ito, i-edit natin ang configuration file:
vim ~/kafka/config/server.properties
Idagdag ang sumusunod sa dulo ng file:
delete.topic.enable = true
Bago simulan ang Kafka server, kailangan mong simulan ang ZooKeeper server; gagamitin namin ang auxiliary script na kasama ng Kafka distribution:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Matapos matagumpay na magsimula ang ZooKeeper, ilunsad ang Kafka server sa isang hiwalay na terminal:
Palampasin natin ang mga sandali ng pagsubok sa producer at consumer para sa bagong likhang paksa. Higit pang mga detalye tungkol sa kung paano mo masusubok ang pagpapadala at pagtanggap ng mga mensahe ay nakasulat sa opisyal na dokumentasyon - Magpadala ng ilang mensahe. Buweno, nagpapatuloy kami sa pagsulat ng isang producer sa Python gamit ang KafkaProducer API.
Pagsusulat ng producer
Ang producer ay bubuo ng random na data - 100 mga mensahe bawat segundo. Sa pamamagitan ng random na data, ang ibig naming sabihin ay isang diksyunaryo na binubuo ng tatlong field:
Sangay β pangalan ng punto ng pagbebenta ng institusyon ng kredito;
Pera - pera ng transaksyon;
dami β halaga ng transaksyon. Ang halaga ay magiging isang positibong numero kung ito ay isang pagbili ng pera ng Bangko, at isang negatibong numero kung ito ay isang pagbebenta.
Susunod, gamit ang paraan ng pagpapadala, nagpapadala kami ng mensahe sa server, sa paksang kailangan namin, sa format na JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Kapag pinapatakbo ang script, natatanggap namin ang mga sumusunod na mensahe sa terminal:
Nangangahulugan ito na gumagana ang lahat ayon sa gusto namin - ang producer ay bumubuo at nagpapadala ng mga mensahe sa paksang kailangan namin.
Ang susunod na hakbang ay i-install ang Spark at iproseso ang stream ng mensaheng ito.
Pag-install ng Apache Spark
Apache Spark ay isang unibersal at high-performance cluster computing platform.
Mas mahusay ang performance ng Spark kaysa sa mga sikat na pagpapatupad ng modelong MapReduce habang sinusuportahan ang mas malawak na hanay ng mga uri ng pagkalkula, kabilang ang mga interactive na query at pagpoproseso ng stream. Ang bilis ay gumaganap ng isang mahalagang papel kapag nagpoproseso ng malaking halaga ng data, dahil ito ay ang bilis na nagbibigay-daan sa iyo upang gumana nang interactive nang hindi gumugugol ng ilang minuto o oras sa paghihintay. Isa sa pinakamalaking lakas ng Spark na nagpapabilis dito ay ang kakayahang magsagawa ng mga kalkulasyon sa memorya.
Ang balangkas na ito ay nakasulat sa Scala, kaya kailangan mo muna itong i-install:
sudo apt-get install scala
I-download ang pamamahagi ng Spark mula sa opisyal na website:
Patakbuhin ang utos sa ibaba pagkatapos gumawa ng mga pagbabago sa bashrc:
source ~/.bashrc
Pag-deploy ng AWS PostgreSQL
Ang natitira na lang ay i-deploy ang database kung saan namin ia-upload ang naprosesong impormasyon mula sa mga stream. Para dito gagamitin namin ang serbisyo ng AWS RDS.
Pumunta sa AWS console -> AWS RDS -> Mga Database -> Lumikha ng database:
Piliin ang PostgreSQL at i-click ang Susunod:
kasi Ang halimbawang ito ay para sa mga layuning pang-edukasyon lamang; gagamit kami ng isang libreng server "sa pinakamababa" (Libreng Tier):
Susunod, naglalagay kami ng tsek sa bloke ng Libreng Tier, at pagkatapos nito ay awtomatiko kaming bibigyan ng isang halimbawa ng klase ng t2.micro - kahit na mahina, ito ay libre at medyo angkop para sa aming gawain:
Susunod na darating ang mga napakahalagang bagay: ang pangalan ng halimbawa ng database, ang pangalan ng master user at ang kanyang password. Pangalanan natin ang instance: myHabrTest, master user: habr, password: habr12345 at i-click ang Next button:
Sa susunod na pahina mayroong mga parameter na responsable para sa accessibility ng aming database server mula sa labas (Public accessibility) at port availability:
Gumawa tayo ng bagong setting para sa pangkat ng seguridad ng VPC, na magbibigay-daan sa external na access sa aming database server sa pamamagitan ng port 5432 (PostgreSQL).
Pumunta tayo sa AWS console sa isang hiwalay na browser window sa VPC Dashboard -> Security Groups -> Lumikha ng seksyon ng security group:
Itinakda namin ang pangalan para sa pangkat ng Seguridad - PostgreSQL, isang paglalarawan, ipahiwatig kung aling VPC ang pangkat na ito ay dapat iugnay at i-click ang pindutang Lumikha:
Punan ang Inbound na mga panuntunan para sa port 5432 para sa bagong likhang grupo, tulad ng ipinapakita sa larawan sa ibaba. Hindi mo maaaring tukuyin ang port nang manu-mano, ngunit piliin ang PostgreSQL mula sa Uri ng drop-down na listahan.
Sa mahigpit na pagsasalita, ang halaga ::/0 ay nangangahulugang ang pagkakaroon ng papasok na trapiko sa server mula sa buong mundo, na hindi ganap na totoo, ngunit upang suriin ang halimbawa, hayaan natin ang ating sarili na gamitin ang diskarteng ito:
Bumalik kami sa page ng browser, kung saan nakabukas ang "I-configure ang mga advanced na setting" at pumili sa seksyon ng mga pangkat ng seguridad ng VPC -> Piliin ang mga umiiral nang pangkat ng seguridad ng VPC -> PostgreSQL:
Susunod, sa mga pagpipilian sa Database -> Pangalan ng database -> itakda ang pangalan - habrDB.
Maaari naming iwanan ang natitirang mga parameter, maliban sa hindi pagpapagana ng backup (panahon ng pagpapanatili ng backup - 0 araw), pagsubaybay at Performance Insights, bilang default. Mag-click sa pindutan Lumikha ng database:
Tagapangasiwa ng thread
Ang huling yugto ay ang pagbuo ng isang trabaho sa Spark, na magpoproseso ng bagong data na nagmumula sa Kafka bawat dalawang segundo at ipasok ang resulta sa database.
Gaya ng nabanggit sa itaas, ang mga checkpoint ay isang pangunahing mekanismo sa SparkStreaming na dapat i-configure upang matiyak ang pagpapahintulot sa pagkakamali. Gagamit kami ng mga checkpoint at, kung nabigo ang pamamaraan, kakailanganin lamang ng Spark Streaming na module na bumalik sa huling checkpoint at ipagpatuloy ang mga kalkulasyon mula dito upang mabawi ang nawalang data.
Maaaring paganahin ang checkpointing sa pamamagitan ng pagtatakda ng direktoryo sa isang fault-tolerant, maaasahang file system (tulad ng HDFS, S3, atbp.) kung saan iimbak ang impormasyon ng checkpoint. Ginagawa ito gamit ang, halimbawa:
streamingContext.checkpoint(checkpointDirectory)
Sa aming halimbawa, gagamitin namin ang sumusunod na diskarte, ibig sabihin, kung umiiral ang checkpointDirectory, muling gagawa ang konteksto mula sa data ng checkpoint. Kung ang direktoryo ay hindi umiiral (ibig sabihin, naisakatuparan sa unang pagkakataon), ang functionToCreateContext ay tinatawag upang lumikha ng bagong konteksto at i-configure ang DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Lumilikha kami ng DirectStream object upang kumonekta sa paksang "transaksyon" gamit ang paraan ng createDirectStream ng KafkaUtils library:
Gamit ang Spark SQL, gumagawa kami ng isang simpleng pagpapangkat at ipinapakita ang resulta sa console:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Pagkuha ng query text at pagpapatakbo nito sa pamamagitan ng Spark SQL:
At pagkatapos ay ise-save namin ang nagresultang pinagsama-samang data sa isang talahanayan sa AWS RDS. Upang i-save ang mga resulta ng pagsasama-sama sa isang talahanayan ng database, gagamitin namin ang paraan ng pagsulat ng object ng DataFrame:
Ilang salita tungkol sa pag-set up ng koneksyon sa AWS RDS. Ginawa namin ang user at password para dito sa hakbang na "Pag-deploy ng AWS PostgreSQL". Dapat mong gamitin ang Endpoint bilang url ng server ng database, na ipinapakita sa seksyong Pagkakakonekta at seguridad:
Upang maikonekta nang tama ang Spark at Kafka, dapat mong patakbuhin ang trabaho sa pamamagitan ng smark-submit gamit ang artifact spark-streaming-kafka-0-8_2.11. Bukod pa rito, gagamit din kami ng artifact para sa pakikipag-ugnayan sa database ng PostgreSQL; ililipat namin ang mga ito sa pamamagitan ng --packages.
Para sa flexibility ng script, isasama rin namin bilang mga parameter ng input ang pangalan ng server ng mensahe at ang paksa kung saan gusto naming makatanggap ng data.
Kaya, oras na para ilunsad at suriin ang functionality ng system:
Lahat ay nagtagumpay! Tulad ng makikita mo sa larawan sa ibaba, habang tumatakbo ang application, ang mga bagong resulta ng pagsasama-sama ay output tuwing 2 segundo, dahil itinakda namin ang pagitan ng batching sa 2 segundo noong nilikha namin ang object ng StreamingContext:
Susunod, gumawa kami ng isang simpleng query sa database upang suriin ang pagkakaroon ng mga tala sa talahanayan transaction_flow:
Konklusyon
Ang artikulong ito ay tumingin sa isang halimbawa ng pagpoproseso ng stream ng impormasyon gamit ang Spark Streaming kasabay ng Apache Kafka at PostgreSQL. Sa paglaki ng data mula sa iba't ibang pinagmumulan, mahirap i-overestimate ang praktikal na halaga ng Spark Streaming para sa paglikha ng streaming at real-time na mga application.
Mahahanap mo ang buong source code sa aking repository sa GitHub.
Natutuwa akong talakayin ang artikulong ito, inaasahan ko ang iyong mga komento, at umaasa rin ako para sa nakabubuo na pagpuna mula sa lahat ng nagmamalasakit na mambabasa.
Nais kong tagumpay ka!
Ps. Noong una ay binalak na gumamit ng lokal na database ng PostgreSQL, ngunit dahil sa pagmamahal ko sa AWS, nagpasya akong ilipat ang database sa cloud. Sa susunod na artikulo sa paksang ito, ipapakita ko kung paano ipatupad ang buong sistemang inilarawan sa itaas sa AWS gamit ang AWS Kinesis at AWS EMR. Sundan ang balita!