ProHoster > blog > Utawala > Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark
Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark
Habari, Habr! Leo tutaunda mfumo ambao utachakata mitiririko ya ujumbe wa Apache Kafka kwa kutumia Spark Streaming na kuandika matokeo ya usindikaji kwenye hifadhidata ya wingu ya AWS RDS.
Hebu fikiria kwamba taasisi fulani ya mikopo inatuwekea kazi ya kushughulikia miamala inayoingia "kwa kuruka" kwenye matawi yake yote. Hili linaweza kufanywa kwa madhumuni ya kukokotoa mara moja nafasi ya wazi ya fedha kwa hazina, vikomo au matokeo ya kifedha ya miamala, n.k.
Jinsi ya kutekeleza kesi hii bila matumizi ya uchawi na uchawi wa uchawi - soma chini ya kukata! Nenda!
Bila shaka, usindikaji wa kiasi kikubwa cha data kwa wakati halisi hutoa fursa nyingi za matumizi katika mifumo ya kisasa. Mojawapo ya mchanganyiko maarufu zaidi wa hii ni tandem ya Apache Kafka na Spark Streaming, ambapo Kafka huunda mtiririko wa pakiti za ujumbe zinazoingia, na Spark Streaming huchakata pakiti hizi kwa muda fulani.
Ili kuongeza uvumilivu wa makosa ya programu, tutatumia vituo vya ukaguzi. Kwa utaratibu huu, wakati injini ya Spark Streaming inahitaji kurejesha data iliyopotea, inahitaji tu kurudi kwenye kituo cha ukaguzi cha mwisho na kuendelea na mahesabu kutoka hapo.
Usanifu wa mfumo ulioendelezwa
Vipengele vilivyotumika:
Apache Kafka ni mfumo wa utumaji ujumbe unaosambazwa wa uchapishaji unaofuata. Inafaa kwa matumizi ya ujumbe nje ya mtandao na mtandaoni. Ili kuzuia upotezaji wa data, ujumbe wa Kafka huhifadhiwa kwenye diski na kunakiliwa ndani ya nguzo. Mfumo wa Kafka umejengwa juu ya huduma ya ulandanishi ya ZooKeeper;
Utiririshaji wa Apache Spark - Sehemu ya Spark kwa usindikaji wa data ya utiririshaji. Moduli ya Utiririshaji wa Spark imeundwa kwa kutumia usanifu wa bechi ndogo, ambapo mtiririko wa data unafasiriwa kama mlolongo unaoendelea wa pakiti ndogo za data. Utiririshaji wa Spark huchukua data kutoka kwa vyanzo tofauti na kuichanganya kuwa vifurushi vidogo. Vifurushi vipya vinaundwa kwa vipindi vya kawaida. Mwanzoni mwa kila muda, pakiti mpya huundwa, na data yoyote iliyopokelewa wakati wa muda huo imejumuishwa kwenye pakiti. Mwishoni mwa muda, ukuaji wa pakiti huacha. Ukubwa wa muda unatambuliwa na parameter inayoitwa muda wa kundi;
SQL ya Apache Spark - inachanganya usindikaji wa uhusiano na programu ya kazi ya Spark. Data iliyopangwa inamaanisha data ambayo ina schema, yaani, seti moja ya sehemu za rekodi zote. Spark SQL inasaidia pembejeo kutoka kwa vyanzo mbalimbali vya data vilivyoundwa na, kutokana na upatikanaji wa taarifa za schema, inaweza kurejesha kwa ufanisi sehemu zinazohitajika za rekodi, na pia hutoa API za DataFrame;
AWS RDS ni hifadhidata ya uhusiano ya bei nafuu inayotegemea wingu, huduma ya tovuti ambayo hurahisisha usanidi, uendeshaji na kuongeza, na inasimamiwa moja kwa moja na Amazon.
Kufunga na kuendesha seva ya Kafka
Kabla ya kutumia Kafka moja kwa moja, unahitaji kuhakikisha kuwa una Java, kwa sababu... JVM inatumika kwa kazi:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Hatua inayofuata ni ya hiari. Ukweli ni kwamba mipangilio ya chaguo-msingi haikuruhusu kutumia kikamilifu uwezo wote wa Apache Kafka. Kwa mfano, futa mada, kategoria, kikundi ambacho ujumbe unaweza kuchapishwa. Ili kubadilisha hii, hebu tuhariri faili ya usanidi:
vim ~/kafka/config/server.properties
Ongeza yafuatayo hadi mwisho wa faili:
delete.topic.enable = true
Kabla ya kuanza seva ya Kafka, unahitaji kuanzisha seva ya ZooKeeper; tutatumia hati ya msaidizi inayokuja na usambazaji wa Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Baada ya ZooKeeper kuanza kwa mafanikio, zindua seva ya Kafka kwenye terminal tofauti:
Hebu tukose matukio ya kujaribu mzalishaji na mtumiaji kwa mada mpya iliyoundwa. Maelezo zaidi kuhusu jinsi unavyoweza kujaribu kutuma na kupokea ujumbe yameandikwa katika nyaraka rasmi - Tuma baadhi ya ujumbe. Kweli, tunaendelea kuandika mtayarishaji katika Python kwa kutumia API ya KafkaProducer.
Kuandika kwa mtayarishaji
Mtayarishaji atatoa data nasibu - ujumbe 100 kila sekunde. Kwa data nasibu tunamaanisha kamusi inayojumuisha sehemu tatu:
Tawi - jina la kituo cha uuzaji cha taasisi ya mkopo;
Sarafu - sarafu ya manunuzi;
kiasi - kiasi cha manunuzi. Kiasi hicho kitakuwa nambari chanya ikiwa ni ununuzi wa sarafu na Benki, na nambari hasi ikiwa ni mauzo.
Ifuatayo, kwa kutumia njia ya kutuma, tunatuma ujumbe kwa seva, kwa mada tunayohitaji, katika umbizo la JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Wakati wa kuendesha hati, tunapokea ujumbe ufuatao kwenye terminal:
Hii inamaanisha kuwa kila kitu hufanya kazi kama tulivyotaka - mtayarishaji hutoa na kutuma ujumbe kwa mada tunayohitaji.
Hatua inayofuata ni kusakinisha Spark na kuchakata mtiririko huu wa ujumbe.
Kufunga Apache Spark
Apache Spark ni jukwaa la jumla na la utendaji wa juu la kompyuta la nguzo.
Spark hufanya kazi vyema zaidi kuliko utekelezaji maarufu wa muundo wa MapReduce huku ikisaidia aina mbalimbali za ukokotoaji, ikiwa ni pamoja na hoja wasilianifu na uchakataji wa mtiririko. Kasi ina jukumu muhimu wakati wa kusindika idadi kubwa ya data, kwani ni kasi ambayo hukuruhusu kufanya kazi kwa maingiliano bila kutumia dakika au masaa kungojea. Mojawapo ya uwezo mkubwa wa Spark unaoifanya iwe haraka sana ni uwezo wake wa kufanya hesabu za kumbukumbu.
Mfumo huu umeandikwa katika Scala, kwa hivyo unahitaji kuisanikisha kwanza:
sudo apt-get install scala
Pakua usambazaji wa Spark kutoka kwa tovuti rasmi:
Tekeleza amri hapa chini baada ya kufanya mabadiliko kwa bashrc:
source ~/.bashrc
Inapeleka AWS PostgreSQL
Kilichosalia ni kupeleka hifadhidata ambamo tutapakia habari iliyochakatwa kutoka kwa mitiririko. Kwa hili tutatumia huduma ya AWS RDS.
Nenda kwa koni ya AWS -> AWS RDS -> Hifadhidata -> Unda hifadhidata:
Chagua PostgreSQL na ubonyeze Ifuatayo:
Kwa sababu Mfano huu ni kwa madhumuni ya kielimu pekee; tutatumia seva isiyolipishwa "kwa kiwango cha chini" (Kiwango cha Bure):
Ifuatayo, tunaweka tiki kwenye Kizuizi cha Free Tier, na baada ya hapo tutapewa kiotomatiki mfano wa darasa la t2.micro - ingawa ni dhaifu, ni la bure na linafaa kabisa kwa kazi yetu:
Kisha huja mambo muhimu sana: jina la mfano wa hifadhidata, jina la mtumiaji mkuu na nenosiri lake. Wacha tupe mfano: myHabrTest, mtumiaji mkuu: habri, nenosiri: habr12345 na ubonyeze kitufe kifuatacho:
Katika ukurasa unaofuata kuna vigezo vinavyohusika na ufikivu wa seva yetu ya hifadhidata kutoka nje (ufikivu wa Umma) na upatikanaji wa mlango:
Hebu tuunde mpangilio mpya wa kikundi cha usalama cha VPC, ambacho kitaruhusu ufikiaji wa nje kwa seva yetu ya hifadhidata kupitia bandari 5432 (PostgreSQL).
Wacha tuende kwenye koni ya AWS kwenye kidirisha tofauti cha kivinjari kwa Dashibodi ya VPC -> Vikundi vya Usalama -> Unda sehemu ya kikundi cha usalama:
Tunaweka jina la kikundi cha Usalama - PostgreSQL, maelezo, yanaonyesha ni VPC gani kikundi hiki kinapaswa kuhusishwa nacho na ubofye kitufe cha Unda:
Jaza sheria zinazoingia za bandari 5432 za kikundi kipya, kama inavyoonekana kwenye picha hapa chini. Huwezi kubainisha bandari wewe mwenyewe, lakini chagua PostgreSQL kutoka kwenye orodha kunjuzi ya Aina.
Kusema kweli, thamani ::/0 inamaanisha upatikanaji wa trafiki inayoingia kwa seva kutoka duniani kote, ambayo si kweli kabisa, lakini ili kuchanganua mfano, hebu tujiruhusu kutumia mbinu hii:
Tunarudi kwenye ukurasa wa kivinjari, ambapo tuna "Sanidi mipangilio ya kina" iliyofunguliwa na uchague katika sehemu ya vikundi vya usalama vya VPC -> Chagua vikundi vya usalama vya VPC vilivyopo -> PostgreSQL:
Ifuatayo, katika chaguzi za Hifadhidata -> Jina la Hifadhidata -> weka jina - habrDB.
Tunaweza kuacha vigezo vilivyosalia, isipokuwa kuzima kipengele cha kuhifadhi nakala (kipindi cha kuhifadhi nakala - siku 0), ufuatiliaji na Maarifa ya Utendaji, kwa chaguo-msingi. Bofya kwenye kifungo Unda hifadhidata:
Kidhibiti cha nyuzi
Hatua ya mwisho itakuwa ukuzaji wa kazi ya Spark, ambayo itashughulikia data mpya kutoka kwa Kafka kila sekunde mbili na kuingiza matokeo kwenye hifadhidata.
Kama ilivyoonyeshwa hapo juu, vituo vya ukaguzi ni njia kuu katika SparkStreaming ambayo lazima isanidiwe ili kuhakikisha uvumilivu wa makosa. Tutatumia vituo vya ukaguzi na, ikiwa utaratibu hautafaulu, moduli ya Spark Streaming itahitaji tu kurudi kwenye kituo cha ukaguzi cha mwisho na kuanza tena mahesabu kutoka kwayo ili kurejesha data iliyopotea.
Ukaguzi unaweza kuwashwa kwa kuweka saraka kwenye mfumo wa faili unaostahimili hitilafu, unaotegemewa (kama vile HDFS, S3, n.k.) ambamo maelezo ya kituo cha ukaguzi yatahifadhiwa. Hii inafanywa kwa kutumia, kwa mfano:
streamingContext.checkpoint(checkpointDirectory)
Katika mfano wetu, tutatumia mbinu ifuatayo, yaani, ikiwa checkpointDirectory ipo, basi muktadha utaundwa upya kutoka kwa data ya ukaguzi. Ikiwa saraka haipo (yaani imetekelezwa kwa mara ya kwanza), basi functionToCreateContext inaitwa kuunda muktadha mpya na kusanidi DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Tunaunda kipengee cha DirectStream ili kuungana na mada ya "muamala" kwa kutumia njia ya kuundaDirectStream ya maktaba ya KafkaUtils:
Kutumia Spark SQL, tunafanya kikundi rahisi na kuonyesha matokeo kwenye koni:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Kupata maandishi ya hoja na kuiendesha kupitia Spark SQL:
Na kisha tunahifadhi data iliyojumlishwa katika jedwali katika AWS RDS. Ili kuhifadhi matokeo ya ujumlisho kwenye jedwali la hifadhidata, tutatumia njia ya kuandika ya kitu cha DataFrame:
Maneno machache kuhusu kusanidi muunganisho kwa AWS RDS. Tuliunda mtumiaji na nenosiri lake katika hatua ya "Kutumia AWS PostgreSQL". Unapaswa kutumia Endpoint kama url ya seva ya hifadhidata, ambayo inaonyeshwa katika sehemu ya Muunganisho na usalama:
Ili kuunganisha kwa usahihi Spark na Kafka, unapaswa kuendesha kazi kupitia smark-submit kwa kutumia kisanii. utiririshaji-cheche-kafka-0-8_2.11. Zaidi ya hayo, tutatumia vizalia vya programu kwa kuingiliana na hifadhidata ya PostgreSQL; tutazihamisha kupitia --packages.
Kwa unyumbufu wa hati, tutajumuisha pia kama vigezo vya kuingiza jina la seva ya ujumbe na mada ambayo tunataka kupokea data.
Kwa hivyo, ni wakati wa kuzindua na kuangalia utendaji wa mfumo:
Kila kitu kilifanyika! Kama unavyoona kwenye picha hapa chini, wakati programu inaendeshwa, matokeo mapya ya ujumlisho hutolewa kila baada ya sekunde 2, kwa sababu tuliweka muda wa kubandika hadi sekunde 2 tulipounda kipengee cha StreamingContext:
Ifuatayo, tunafanya swala rahisi kwa hifadhidata ili kuangalia uwepo wa rekodi kwenye jedwali mtiririko_wa_muamala:
Hitimisho
Nakala hii iliangalia mfano wa usindikaji wa habari kwa kutumia Spark Streaming kwa kushirikiana na Apache Kafka na PostgreSQL. Pamoja na ukuaji wa data kutoka kwa vyanzo mbalimbali, ni vigumu kukadiria thamani ya vitendo ya Utiririshaji wa Spark kwa kuunda utiririshaji na programu za wakati halisi.
Unaweza kupata msimbo kamili wa chanzo kwenye hazina yangu kwa GitHub.
Nimefurahiya kujadili nakala hii, natarajia maoni yako, na pia natumai ukosoaji mzuri kutoka kwa wasomaji wote wanaojali.
Nakutakia mafanikio!
Zab. Hapo awali ilipangwa kutumia hifadhidata ya ndani ya PostgreSQL, lakini kwa kuzingatia upendo wangu kwa AWS, niliamua kuhamisha hifadhidata hadi kwa wingu. Katika makala inayofuata juu ya mada hii, nitaonyesha jinsi ya kutekeleza mfumo mzima ulioelezwa hapo juu katika AWS kwa kutumia AWS Kinesis na AWS EMR. Fuata habari!