Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming
Hello, Habr! Illum se nibnu sistema li tipproċessa flussi ta' messaġġi Apache Kafka billi tuża Spark Streaming u niktbu r-riżultati tal-ipproċessar fid-database tal-cloud AWS RDS.
Ejja nimmaġinaw li ċerta istituzzjoni ta’ kreditu tagħtina l-kompitu li nipproċessaw transazzjonijiet deħlin "fuq il-ħin" fil-fergħat kollha tagħha. Dan jista’ jsir bil-għan li tiġi kkalkulata fil-pront pożizzjoni ta’ munita miftuħa għat-teżor, limiti jew riżultati finanzjarji għal tranżazzjonijiet, eċċ.
Kif timplimenta dan il-każ mingħajr l-użu ta 'maġija u perjodi ta' maġija - aqra taħt il-qatgħa! Mur!
Naturalment, l-ipproċessar ta 'ammont kbir ta' dejta f'ħin reali jipprovdi opportunitajiet biżżejjed għall-użu f'sistemi moderni. Waħda mill-kombinazzjonijiet l-aktar popolari għal dan hija t-tandem ta 'Apache Kafka u Spark Streaming, fejn Kafka joħloq fluss ta' pakketti ta 'messaġġi deħlin, u Spark Streaming jipproċessa dawn il-pakketti f'intervall ta' ħin partikolari.
Biex tiżdied it-tolleranza tal-ħsarat tal-applikazzjoni, se nużaw punti ta 'kontroll. B'dan il-mekkaniżmu, meta l-magna Spark Streaming teħtieġ tirkupra d-dejta mitlufa, jeħtieġ biss li tmur lura għall-aħħar punt ta 'kontroll u terġa' tibda l-kalkoli minn hemm.
Arkitettura tas-sistema żviluppata
Komponenti użati:
Apache Kafka hija sistema mqassma ta' messaġġi ta' pubblikazzjoni-abbonament. Adattat kemm għall-konsum tal-messaġġi offline kif ukoll onlajn. Biex jiġi evitat it-telf tad-dejta, il-messaġġi Kafka jinħażnu fuq disk u jiġu replikati fi ħdan il-cluster. Is-sistema Kafka hija mibnija fuq is-servizz ta 'sinkronizzazzjoni ZooKeeper;
Apache Spark Streaming - Komponent Spark għall-ipproċessar tad-dejta tal-istrimjar. Il-modulu Spark Streaming huwa mibni bl-użu ta 'arkitettura mikro-lott, fejn il-fluss tad-dejta huwa interpretat bħala sekwenza kontinwa ta' pakketti ta 'dejta żgħar. Spark Streaming jieħu data minn sorsi differenti u jgħaqqadha f'pakketti żgħar. Jinħolqu pakketti ġodda f'intervalli regolari. Fil-bidu ta 'kull intervall ta' ħin, jinħoloq pakkett ġdid, u kwalunkwe data riċevuta matul dak l-intervall hija inkluża fil-pakkett. Fl-aħħar tal-intervall, it-tkabbir tal-pakkett jieqaf. Id-daqs tal-intervall huwa determinat minn parametru msejjaħ l-intervall tal-lott;
Apache Spark SQL - tgħaqqad l-ipproċessar relazzjonali ma 'programmazzjoni funzjonali Spark. Data strutturata tfisser data li għandha schema, jiġifieri sett wieħed ta' oqsma għar-rekords kollha. Spark SQL jappoġġja input minn varjetà ta 'sorsi ta' dejta strutturati u, grazzi għad-disponibbiltà ta 'informazzjoni ta' skema, jista 'jirkupra b'mod effiċjenti biss l-oqsma meħtieġa ta' rekords, u jipprovdi wkoll DataFrame APIs;
AWS RDS hija database relazzjonali relattivament irħisa bbażata fuq il-cloud, servizz tal-web li jissimplifika s-setup, it-tħaddim u l-iskala, u huwa amministrat direttament mill-Amazon.
L-installazzjoni u t-tħaddim tas-server Kafka
Qabel ma tuża Kafka direttament, trid tiżgura ruħek li għandek Java, għax... JVM jintuża għax-xogħol:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Il-pass li jmiss huwa fakultattiv. Il-fatt hu li s-settings default ma jippermettux li tuża bis-sħiħ il-kapaċitajiet kollha ta 'Apache Kafka. Pereżempju, ħassar suġġett, kategorija, grupp li għalihom jistgħu jiġu ppubblikati messaġġi. Biex tibdel dan, ejja neditjaw il-fajl tal-konfigurazzjoni:
vim ~/kafka/config/server.properties
Żid dan li ġej fl-aħħar tal-fajl:
delete.topic.enable = true
Qabel ma tibda s-server Kafka, għandek bżonn tibda s-server ZooKeeper; se nużaw l-iskritt awżiljarju li jiġi mad-distribuzzjoni Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Wara li ZooKeeper beda b'suċċess, iniedi s-server Kafka f'terminal separat:
Ejja nitilfu l-mumenti tal-ittestjar tal-produttur u l-konsumatur għas-suġġett maħluq ġdid. Aktar dettalji dwar kif tista' tittestja li tibgħat u tirċievi messaġġi huma miktuba fid-dokumentazzjoni uffiċjali - Ibgħat xi messaġġi. Ukoll, ngħaddu biex niktbu produttur f'Python billi tuża l-API KafkaProducer.
Kitba tal-produttur
Il-produttur se jiġġenera data każwali - 100 messaġġ kull sekonda. B’dejta każwali nifhmu dizzjunarju li jikkonsisti fi tliet oqsma:
Fergħa — isem il-punt tal-bejgħ tal-istituzzjoni ta’ kreditu;
munita — munita tat-tranżazzjoni;
ammont — l-ammont tat-tranżazzjoni. L-ammont ikun numru pożittiv jekk ikun xiri ta’ munita mill-Bank, u numru negattiv jekk ikun bejgħ.
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Meta tmexxi l-iskrittura, nirċievu l-messaġġi li ġejjin fit-terminal:
Dan ifisser li kollox jaħdem kif ridna - il-produttur jiġġenera u jibgħat messaġġi għas-suġġett li għandna bżonn.
Il-pass li jmiss huwa li tinstalla Spark u tipproċessa dan il-fluss tal-messaġġ.
Installazzjoni ta' Apache Spark
Apache Spark hija pjattaforma tal-kompjuters cluster universali u ta 'prestazzjoni għolja.
Spark jaħdem aħjar minn implimentazzjonijiet popolari tal-mudell MapReduce filwaqt li jappoġġja firxa usa 'ta' tipi ta 'komputazzjoni, inklużi mistoqsijiet interattivi u proċessar ta' stream. Il-veloċità għandha rwol importanti meta tipproċessa ammonti kbar ta 'dejta, peress li hija l-veloċità li tippermettilek taħdem b'mod interattiv mingħajr ma tqatta' minuti jew sigħat tistenna. Waħda mill-akbar saħħiet ta 'Spark li tagħmilha daqshekk mgħaġġla hija l-abbiltà tagħha li twettaq kalkoli fil-memorja.
Dan il-qafas huwa miktub fi Scala, għalhekk għandek bżonn tinstallah l-ewwel:
Għax Dan l-eżempju huwa għal skopijiet edukattivi biss; se nużaw server b'xejn "mill-inqas" (Livell Ħieles):
Sussegwentement, inpoġġu qurdien fil-blokk Free Tier, u wara dan se nkunu awtomatikament offruti eżempju tal-klassi t2.micro - għalkemm dgħajfa, hija b'xejn u pjuttost adattata għall-kompitu tagħna:
Wara jiġu affarijiet importanti ħafna: l-isem tal-istanza tad-database, l-isem tal-utent prinċipali u l-password tiegħu. Ejja nsemmu l-istanza: myHabrTest, utent ewlieni: habr, password: habr12345 u kklikkja fuq il-buttuna Li jmiss:
Fil-paġna li jmiss hemm parametri responsabbli għall-aċċessibbiltà tas-server tad-database tagħna minn barra (aċċessibbiltà pubblika) u d-disponibbiltà tal-port:
Ejja noħolqu setting ġdid għall-grupp tas-sigurtà VPC, li se jippermetti aċċess estern għas-server tad-database tagħna permezz tal-port 5432 (PostgreSQL).
Ejja mmorru għall-console AWS f'tieqa separata tal-brawżer għad-Dashboard tal-VPC -> Gruppi tas-Sigurtà -> Oħloq taqsima tal-grupp tas-sigurtà:
Aħna nissettjaw l-isem għall-grupp tas-Sigurtà - PostgreSQL, deskrizzjoni, indika liema VPC għandu jkun assoċjat miegħu dan il-grupp u kklikkja l-buttuna Oħloq:
Imla r-regoli Inbound għall-port 5432 għall-grupp maħluq ġdid, kif muri fl-istampa hawn taħt. Ma tistax tispeċifika l-port manwalment, iżda agħżel PostgreSQL mil-lista drop-down Tip.
B'mod strett, il-valur ::/0 ifisser id-disponibbiltà tat-traffiku deħlin lejn is-server minn madwar id-dinja, li kanonikament mhux għal kollox veru, iżda biex tanalizza l-eżempju, ejja nħallu lilna nfusna nużaw dan l-approċċ:
Nirritornaw għall-paġna tal-browser, fejn għandna "Ikkonfigura s-settings avvanzati" miftuħa u agħżel fit-taqsima tal-gruppi tas-sigurtà VPC -> Agħżel gruppi ta 'sigurtà VPC eżistenti -> PostgreSQL:
Sussegwentement, fl-għażliet tad-Database -> Isem tad-Database -> issettja l-isem - habrDB.
Nistgħu nħallu l-parametri li jifdal, bl-eċċezzjoni tad-diżattivazzjoni tal-backup (perjodu ta 'żamma ta' backup - jiem 0), monitoraġġ u Performance Insights, b'mod awtomatiku. Ikklikkja fuq il-buttuna Oħloq database:
Maniġer tal-ħajt
L-istadju finali se jkun l-iżvilupp ta 'xogħol Spark, li se jipproċessa dejta ġdida li tiġi minn Kafka kull żewġ sekondi u jdaħħal ir-riżultat fid-database.
Kif innutat hawn fuq, il-punti ta 'kontroll huma mekkaniżmu ewlieni fi SparkStreaming li għandu jiġi kkonfigurat biex jiżgura t-tolleranza tal-ħsarat. Se nużaw punti ta 'kontroll u, jekk il-proċedura tfalli, il-modulu Spark Streaming ikollu bżonn biss li jerġa' lura għall-aħħar punt ta 'kontroll u jerġa' jibda l-kalkoli minnu biex jirkupra d-dejta mitlufa.
Checkpointing jista 'jiġi attivat billi jiġi stabbilit direttorju fuq sistema ta' fajls affidabbli u tolleranti għall-ħsarat (bħal HDFS, S3, eċċ.) li fiha se tinħażen l-informazzjoni dwar il-punt ta 'kontroll. Dan isir bl-użu, pereżempju:
streamingContext.checkpoint(checkpointDirectory)
Fl-eżempju tagħna, se nużaw l-approċċ li ġej, jiġifieri, jekk jeżisti checkpointDirectory, allura l-kuntest jiġi rikrejat mid-data tal-punt ta 'kontroll. Jekk id-direttorju ma jeżistix (jiġifieri esegwit għall-ewwel darba), allura functionToCreateContext tissejjaħ biex toħloq kuntest ġdid u tikkonfigura DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Aħna noħolqu oġġett DirectStream biex nikkonnettjaw mas-suġġett "transazzjoni" billi tuża l-metodu createDirectStream tal-librerija KafkaUtils:
Billi tuża Spark SQL, nagħmlu grupp sempliċi u nuru r-riżultat fil-console:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Ikseb it-test tal-mistoqsija u tħaddem permezz ta' Spark SQL:
U mbagħad nissejvjaw id-dejta aggregata li tirriżulta f'tabella f'AWS RDS. Biex issalva r-riżultati tal-aggregazzjoni f'tabella tad-database, se nużaw il-metodu tal-kitba tal-oġġett DataFrame:
Ftit kelmiet dwar it-twaqqif ta' konnessjoni ma' AWS RDS. Ħloqna l-utent u l-password għaliha fil-pass "Deploying AWS PostgreSQL". Għandek tuża Endpoint bħala l-url tas-server tad-database, li tidher fit-taqsima Konnettività u sigurtà:
Sabiex tikkonnettja b'mod korrett Spark u Kafka, għandek tħaddem ix-xogħol permezz ta' smark-submit billi tuża l-artifact spark-streaming-kafka-0-8_2.11. Barra minn hekk, se nużaw ukoll artifact għall-interazzjoni mad-database PostgreSQL; se nittrasferixxuhom permezz ta '--packages.
Għall-flessibbiltà tal-iskrittura, aħna se ninkludu wkoll bħala parametri ta’ input l-isem tas-server tal-messaġġi u s-suġġett li minnu rridu nirċievu d-dejta.
Allura, wasal iż-żmien li tniedi u tiċċekkja l-funzjonalità tas-sistema:
Kollox ħadem! Kif tistgħu taraw fl-istampa hawn taħt, waqt li l-applikazzjoni tkun qed taħdem, joħorġu riżultati ġodda ta 'aggregazzjoni kull 2 sekondi, għaliex aħna waqqafna l-intervall ta' batching għal 2 sekondi meta ħloqna l-oġġett StreamingContext:
Sussegwentement, nagħmlu mistoqsija sempliċi għad-database biex tivverifika l-preżenza tar-rekords fit-tabella fluss_transazzjoni:
Konklużjoni
Dan l-artikolu ħares lejn eżempju ta 'proċessar ta' fluss ta 'informazzjoni bl-użu ta' Spark Streaming flimkien ma 'Apache Kafka u PostgreSQL. Bit-tkabbir tad-dejta minn sorsi varji, huwa diffiċli li jiġi stmat iżżejjed il-valur prattiku ta 'Spark Streaming għall-ħolqien ta' streaming u applikazzjonijiet f'ħin reali.
Tista' ssib il-kodiċi tas-sors sħiħ fir-repożitorju tiegħi fuq GitHub.
Ninsab kuntent li niddiskuti dan l-artikolu, nistenna bil-ħerqa l-kummenti tiegħek, u nittama wkoll għal kritika kostruttiva mill-qarrejja kollha li jieħdu ħsieb.
Nixtieqek suċċess!
Ps. Inizjalment kien ippjanat li nuża database PostgreSQL lokali, iżda minħabba l-imħabba tiegħi għall-AWS, iddeċidejt li nmexxi d-database għall-cloud. Fl-artiklu li jmiss dwar dan is-suġġett, ser nuri kif nimplimenta s-sistema kollha deskritta hawn fuq f'AWS billi tuża AWS Kinesis u AWS EMR. Segwi l-aħbarijiet!