Today, processing large volumes of data in real time opens up plenty of opportunities for applying modern approaches. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming. Kafka produces a stream of incoming message packets, and Spark Streaming processes these packets at a fixed time interval.
To increase the application's fault tolerance, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to return to the last checkpoint and resume the computation from there.
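To make the idea concrete, here is a toy model of checkpoint-based recovery in plain Python. It is not how Spark implements checkpoints: the `CheckpointedCounter` class, the JSON checkpoint file and its fields are all hypothetical. It only shows the principle. State and the position in the input are persisted after each batch, so a restarted process resumes from the last saved point instead of recomputing everything. In PySpark's DStream API, the corresponding hooks are `ssc.checkpoint(directory)` and `StreamingContext.getOrCreate(checkpointPath, setupFunc)`.

```python
import json
import os
import tempfile


class CheckpointedCounter:
    """Toy stream processor that sums values and checkpoints after every batch.

    Hypothetical illustration only: Spark's real checkpoints store DStream
    metadata and RDD data, not a JSON file like this one.
    """

    def __init__(self, checkpoint_path):
        self.checkpoint_path = checkpoint_path
        self.next_offset = 0  # index of the next unprocessed input record
        self.total = 0        # running aggregate
        self._restore()

    def _restore(self):
        # On start-up, resume from the last checkpoint if one exists
        if os.path.exists(self.checkpoint_path):
            with open(self.checkpoint_path) as f:
                state = json.load(f)
            self.next_offset = state["next_offset"]
            self.total = state["total"]

    def _save(self):
        # Write to a temp file, then rename, so a crash never leaves a half-written checkpoint
        tmp_path = self.checkpoint_path + ".tmp"
        with open(tmp_path, "w") as f:
            json.dump({"next_offset": self.next_offset, "total": self.total}, f)
        os.replace(tmp_path, self.checkpoint_path)

    def process_batch(self, records, batch_size):
        # Take the next batch starting at the saved offset, aggregate it, then checkpoint
        batch = records[self.next_offset:self.next_offset + batch_size]
        self.total += sum(batch)
        self.next_offset += len(batch)
        self._save()
        return len(batch)


if __name__ == "__main__":
    records = [5, 3, 7, 1, 4, 2]
    path = os.path.join(tempfile.mkdtemp(), "checkpoint.json")

    first = CheckpointedCounter(path)
    first.process_batch(records, batch_size=2)  # processes 5 and 3
    # Simulate a crash: drop the object and start a new one from the checkpoint
    restarted = CheckpointedCounter(path)
    while restarted.process_batch(records, batch_size=2):
        pass
    print(restarted.total, restarted.next_offset)  # 22 6
```

The restarted processor does not reprocess 5 and 3. It picks up at offset 2, which is exactly the "return to the last checkpoint" behaviour described above.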
Architecture of the system being built
Components used:
Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
Apache Spark Streaming: the Spark component for processing streaming data. The Spark Streaming module uses a micro-batch architecture, in which a data stream is treated as a continuous sequence of small batches of data. Spark Streaming takes data from various sources and combines it into small batches. New batches are created at regular intervals. At the start of each interval a new batch is created, and any data received during that interval is added to it. At the end of the interval the batch stops growing. The length of the interval is set by a parameter called the batch interval;
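The micro-batch model can be illustrated with a small plain-Python function that assigns timestamped events to batches of a fixed batch interval. This is a simplified sketch, not Spark's scheduler. The `(timestamp, value)` event format and the function name are assumptions made for the example.

```python
from collections import defaultdict


def group_into_batches(events, batch_interval, start_time=0.0):
    """Assign (timestamp, value) events to consecutive batches of length batch_interval.

    Batch k covers the half-open interval
    [start_time + k * batch_interval, start_time + (k + 1) * batch_interval);
    once that interval ends, the batch receives no more data.
    Events are assumed to have timestamps >= start_time.
    """
    batches = defaultdict(list)
    for timestamp, value in events:
        batch_index = int((timestamp - start_time) // batch_interval)
        batches[batch_index].append(value)
    if not batches:
        return []
    # Return batches in time order, including empty intervals in between
    last = max(batches)
    return [batches.get(k, []) for k in range(last + 1)]


events = [(0.1, "a"), (1.9, "b"), (2.0, "c"), (5.5, "d")]
print(group_into_batches(events, batch_interval=2))
# [['a', 'b'], ['c'], ['d']]
```

An event arriving exactly at the interval boundary (2.0 here) already belongs to the next batch, because each interval is half-open.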
Apache Spark SQL: combines relational processing with Spark's programming model. Structured data means data that has a schema, that is, a single set of fields shared by all records. Spark SQL supports input from a variety of structured data sources. Thanks to the schema information, it can efficiently retrieve only the required fields of each record. It also provides the DataFrame API;
AWS RDS is a relatively inexpensive cloud-based relational database. It is a web service that simplifies setup, operation and scaling, and it is administered directly by Amazon.
Next, we use the send method to send a message in JSON format to the server, addressed to the topic we need:
from json import dumps

from kafka import KafkaProducer

# Serialize every message to UTF-8 JSON and compress batches with gzip
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()  # builds a random transaction; not shown in this section

try:
    future = producer.send(topic=my_topic, value=data)
    # Block until the broker acknowledges the record (or 10 seconds pass)
    record_metadata = future.get(timeout=10)
    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))
finally:
    producer.flush()
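The snippet calls `get_random_value()`, which is not shown in this section. Below is a hypothetical version, consistent with the `branch`, `currency` and `amount` fields used later in the Spark SQL query. The branch names, currency codes and amount range are made-up sample values. The `serialize` helper repeats the producer's `value_serializer` logic, so you can check what actually goes over the wire.

```python
import json
import random

# Hypothetical sample values; the article's actual lists are not shown here
BRANCHES = ["north", "south", "east", "west"]
CURRENCIES = ["RUB", "USD", "EUR", "GBP"]


def get_random_value():
    """Build one random transaction record (hypothetical implementation)."""
    return {
        "branch": random.choice(BRANCHES),
        "currency": random.choice(CURRENCIES),
        "amount": random.randint(-100, 100),  # inclusive range
    }


def serialize(value):
    # Same logic as the producer's value_serializer
    return json.dumps(value).encode("utf-8")


record = get_random_value()
print(serialize(record))
```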
When we run the script, we get the following output in the terminal:
Because this example is for educational purposes only, we will use a free "minimal" server (Free Tier):
Next, we tick the Free Tier box. After that we are offered a t2.micro-class instance: it is weak, but it is free and quite adequate for our task:
Next come the really important settings: the name of the database instance, the master user name and its password. Let's name the instance myHabrTest, with master user habr and password habr12345, and click the Next button:
The next page contains the parameters that control access to our database server from outside (Public accessibility) and port availability:
Let's create a new VPC security group setting that will allow external access to our database server on port 5432 (PostgreSQL).
In a separate browser window, open the AWS console and go to VPC Dashboard -> Security Groups -> Create security group:
We set the security group name to PostgreSQL, add a description, specify the VPC this group should be associated with, and click the Create button:
Fill in the Inbound rules for port 5432 in the newly created group, as shown in the picture below. You don't have to enter the port manually; instead, select PostgreSQL from the Type drop-down list.
Strictly speaking, the value ::/0 allows incoming traffic to the server from anywhere in the world, which is not good practice. For the sake of the example, though, let's allow ourselves this shortcut:
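You can check what these CIDR ranges mean with Python's standard `ipaddress` module. `::/0` is the IPv6 range that contains every IPv6 address, and `0.0.0.0/0` is its IPv4 counterpart. The sample client addresses below come from the documentation ranges and are purely illustrative. Outside a demo, a narrower range is the safer choice, for example a single /32 for your own public IPv4 address.

```python
import ipaddress


def is_allowed(client_ip, allowed_cidrs):
    """Return True if client_ip falls inside any of the allowed CIDR ranges."""
    addr = ipaddress.ip_address(client_ip)
    # Membership is always False across IP versions (IPv4 address vs IPv6 network)
    return any(addr in ipaddress.ip_network(cidr) for cidr in allowed_cidrs)


print(is_allowed("2001:db8::10", ["::/0"]))              # True: any IPv6 address
print(is_allowed("203.0.113.7", ["::/0"]))               # False: ::/0 is IPv6-only
print(is_allowed("203.0.113.7", ["0.0.0.0/0", "::/0"]))  # True
print(is_allowed("198.51.100.1", ["203.0.113.7/32"]))    # False: only one host allowed
```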
We return to the browser page where "Configure advanced settings" is open, and in the VPC security groups section choose Choose existing VPC security groups -> PostgreSQL:
Next, in Database options -> Database name, set the name habrDB.
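For reference, the console choices above can be expressed as parameters of boto3's `create_db_instance` call. This is a hedged sketch, not part of the original walkthrough. The security group ID is a placeholder, `AllocatedStorage=20` is an assumed value, and the instance identifier is lowercased because RDS stores identifiers in lowercase. Only `rds_instance_params` runs without AWS access; `create_instance` needs boto3 and configured credentials.

```python
def rds_instance_params(security_group_id):
    """Settings chosen in the console walkthrough (assumed values are marked)."""
    return {
        "DBInstanceIdentifier": "myhabrtest",  # RDS stores identifiers in lowercase
        "DBInstanceClass": "db.t2.micro",      # Free Tier instance class
        "Engine": "postgres",
        "MasterUsername": "habr",
        "MasterUserPassword": "habr12345",     # demo only; never hard-code real passwords
        "DBName": "habrDB",
        "AllocatedStorage": 20,                # assumed storage size in GiB
        "PubliclyAccessible": True,            # reachable from outside the VPC
        "VpcSecurityGroupIds": [security_group_id],
    }


def create_instance(security_group_id, region_name=None):
    # Imported lazily so the parameter helper works without boto3 installed
    import boto3

    rds = boto3.client("rds", region_name=region_name)
    return rds.create_db_instance(**rds_instance_params(security_group_id))


params = rds_instance_params("sg-0123456789abcdef0")  # placeholder security group ID
print(params["DBName"], params["DBInstanceClass"])
```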
The final step is to develop the Spark job, which will process new data from Kafka every two seconds and write the result to the database.
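A minimal skeleton of such a job with the DStream API might look like the following. It is a sketch under several assumptions. It targets Spark 2.x, where `pyspark.streaming.kafka.KafkaUtils` still exists (it was removed in Spark 3.0). The checkpoint directory, application name and script name `job.py` are hypothetical. The `handle_batch` callback only prints a record count as a stand-in for the real aggregation. The 2-second batch interval and checkpoint-based recovery both appear here.

```python
import json
import sys

BATCH_INTERVAL_SECONDS = 2                      # a new micro-batch every 2 seconds
CHECKPOINT_DIR = "/tmp/spark-kafka-checkpoint"  # hypothetical path


def parse_message(kafka_record):
    """Kafka direct streams yield (key, value) pairs; the value is our JSON string."""
    _key, value = kafka_record
    return json.loads(value)


def handle_batch(time, rdd):
    # Placeholder for the real per-batch work (Spark SQL aggregation + write to RDS)
    print("{}: {} records".format(time, rdd.count()))


def create_context(brokers, topic):
    # Spark 2.x imports, done lazily so parse_message can be used without Spark
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="KafkaTransactionsAggregation")  # hypothetical name
    ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)
    ssc.checkpoint(CHECKPOINT_DIR)  # enable checkpointing for recovery

    stream = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": brokers})
    stream.map(parse_message).foreachRDD(handle_batch)
    return ssc


def main(brokers, topic):
    from pyspark.streaming import StreamingContext

    # Recover from the checkpoint if one exists, otherwise build a fresh context
    ssc = StreamingContext.getOrCreate(
        CHECKPOINT_DIR, lambda: create_context(brokers, topic))
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    if len(sys.argv) == 3:
        main(sys.argv[1], sys.argv[2])
    else:
        print("Usage: job.py <brokers> <topic>", file=sys.stderr)
```

All stream setup lives inside `create_context`. On restart, `getOrCreate` rebuilds the DStream graph from the checkpoint and does not call that function.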
Using Spark SQL, we perform a simple grouping and print the result to the console:
select
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch as branch_name,
    t.currency as currency_code,
    sum(amount) as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency
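To show what the query computes for a single micro-batch, here is an equivalent aggregation in plain Python over a list of parsed messages. The sample records are invented for illustration. `curr_time` gets the same value for every row, mirroring `from_unixtime(unix_timestamp())`, which yields the current time as a `yyyy-MM-dd HH:mm:ss` string.

```python
from collections import defaultdict
from datetime import datetime


def aggregate_batch(records):
    """Plain-Python equivalent of: sum(amount) ... group by branch, currency."""
    totals = defaultdict(int)
    for r in records:
        totals[(r["branch"], r["currency"])] += r["amount"]
    # One timestamp for the whole batch, like from_unixtime(unix_timestamp())
    curr_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return [
        {"curr_time": curr_time, "branch_name": branch,
         "currency_code": currency, "batch_value": value}
        for (branch, currency), value in sorted(totals.items())
    ]


batch = [
    {"branch": "north", "currency": "USD", "amount": 10},
    {"branch": "north", "currency": "USD", "amount": -4},
    {"branch": "south", "currency": "EUR", "amount": 7},
]
for row in aggregate_batch(batch):
    print(row)
```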
Getting the query text and running it through Spark SQL:
Next, we save the aggregated data to a table in AWS RDS. To write the aggregation results to the database table, we use the write method of the DataFrame object:
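Below is a hedged sketch of a per-batch function that runs the query above through Spark SQL and appends the result over JDBC. The view name `treasury_stream` and the target table `transaction_flow` come from the article. The `SQL_QUERY` constant, the `jdbc_properties` helper and the `process_batch` signature are illustrative. `spark.sql` and `DataFrameWriter.jdbc(url, table, mode, properties)` are standard PySpark APIs. The PostgreSQL JDBC driver must be on the classpath, which is what `--packages` is for.

```python
SQL_QUERY = """
select
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch as branch_name,
    t.currency as currency_code,
    sum(t.amount) as batch_value
from treasury_stream t
group by t.branch, t.currency
"""


def jdbc_properties(user, password):
    """Connection properties for the PostgreSQL JDBC driver."""
    return {"user": user, "password": password, "driver": "org.postgresql.Driver"}


def process_batch(rdd, jdbc_url, user, password):
    """Aggregate one micro-batch of parsed dicts and append it to transaction_flow."""
    if rdd.isEmpty():
        return  # nothing arrived during this batch interval
    from pyspark.sql import Row, SparkSession  # lazy import: Spark is only needed here

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(rdd.map(lambda d: Row(**d)))
    df.createOrReplaceTempView("treasury_stream")

    result = spark.sql(SQL_QUERY)
    result.show()  # print the aggregation to the console
    result.write.jdbc(url=jdbc_url, table="transaction_flow", mode="append",
                      properties=jdbc_properties(user, password))
```

`mode="append"` adds each batch's rows to the existing table instead of replacing it, so the table accumulates one set of rows per 2-second batch.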
A few words about setting up the connection to AWS RDS. We created the user and password in the "Deploying AWS PostgreSQL" step. As the database server URL, use the Endpoint shown in the Connectivity & security section:
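The Endpoint is a plain host name, while Spark's JDBC writer expects a URL of the form `jdbc:postgresql://host:port/database`. A tiny helper makes the conversion explicit. The endpoint value below is a made-up placeholder, `5432` is the PostgreSQL port opened in the security group, and `habrDB` is the database name set earlier.

```python
def postgres_jdbc_url(endpoint, database, port=5432):
    """Build a PostgreSQL JDBC URL from an RDS endpoint host name."""
    host = endpoint.strip()  # tolerate stray whitespace from copy-pasting
    return "jdbc:postgresql://{}:{}/{}".format(host, port, database)


# Placeholder endpoint; copy the real one from Connectivity & security
url = postgres_jdbc_url("myhabrtest.abcdefghijkl.us-east-1.rds.amazonaws.com", "habrDB")
print(url)
# jdbc:postgresql://myhabrtest.abcdefghijkl.us-east-1.rds.amazonaws.com:5432/habrDB
```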
For Spark to work with Kafka, the job has to be launched with spark-submit using the spark-streaming-kafka-0-8_2.11 artifact. We will also need an artifact for working with the PostgreSQL database. We pass both of them via --packages.
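Here is a sketch of the launch command, built in Python so it can be inspected or passed to `subprocess.run`. The package versions are assumptions: pick a `spark-streaming-kafka-0-8_2.11` release that matches your Spark 2.x build, and any PostgreSQL JDBC driver version you trust. `--packages` takes a comma-separated list of Maven coordinates in `groupId:artifactId:version` form. The script name `job.py` is hypothetical.

```python
import shlex

# Assumed versions: match the Kafka artifact to your Spark 2.x / Scala 2.11 build
KAFKA_PACKAGE = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.8"
POSTGRES_PACKAGE = "org.postgresql:postgresql:42.2.18"


def spark_submit_command(script, brokers, topic):
    """Return the spark-submit argument list for the streaming job."""
    return [
        "spark-submit",
        "--packages", ",".join([KAFKA_PACKAGE, POSTGRES_PACKAGE]),
        script,
        brokers,  # becomes sys.argv[1] inside the script
        topic,    # becomes sys.argv[2] inside the script
    ]


cmd = spark_submit_command("job.py", "localhost:9092", "transaction")
print(shlex.join(cmd))  # shlex.join needs Python 3.8+
# To launch it: subprocess.run(cmd, check=True)
```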
To make the script flexible, we also pass the message server address and the topic we want to read from as input parameters.
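Two required positional arguments are enough here. The sketch below uses the standard `argparse` module instead of reading `sys.argv` directly, which gives you a usage message and an error on missing arguments for free. The argument names `brokers` and `topic` are illustrative.

```python
import argparse


def parse_args(argv=None):
    """Parse the message server address and the topic to read from."""
    parser = argparse.ArgumentParser(
        description="Aggregate Kafka transactions with Spark Streaming")
    parser.add_argument("brokers", help="Kafka broker list, e.g. localhost:9092")
    parser.add_argument("topic", help="Kafka topic to consume, e.g. transaction")
    return parser.parse_args(argv)  # argv=None means "use sys.argv[1:]"


args = parse_args(["localhost:9092", "transaction"])
print(args.brokers, args.topic)  # localhost:9092 transaction
```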
Everything worked! As you can see in the screenshot, while the application is running, new aggregation results appear every 2 seconds, because we set the batch interval to 2 seconds when we created the StreamingContext object:
Next, we run a simple query against the database to check that records have appeared in the transaction_flow table:
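You can also script this check. The function below works with any DB-API 2.0 connection. Against RDS, that would be, for example, a `psycopg2.connect(...)` connection using the endpoint, user and password from above. The column names follow the aliases in the aggregation query. The usage example uses an in-memory SQLite database with invented rows only so that it runs without a server.

```python
import sqlite3


def latest_rows(conn, limit=10):
    """Return the most recent rows of transaction_flow via any DB-API 2.0 connection."""
    cur = conn.cursor()
    try:
        # Parameter placeholders differ between drivers (? vs %s), so the
        # integer limit is formatted in directly after an explicit int() cast
        cur.execute(
            "select curr_time, branch_name, currency_code, batch_value "
            "from transaction_flow order by curr_time desc limit {}".format(int(limit)))
        return cur.fetchall()
    finally:
        cur.close()


# Stand-in database with invented rows; use a PostgreSQL connection in practice
conn = sqlite3.connect(":memory:")
conn.execute("create table transaction_flow (curr_time text, branch_name text, "
             "currency_code text, batch_value integer)")
conn.executemany("insert into transaction_flow values (?, ?, ?, ?)", [
    ("2020-01-01 10:00:00", "north", "USD", 6),
    ("2020-01-01 10:00:02", "south", "EUR", 7),
])
for row in latest_rows(conn):
    print(row)
```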
Conclusion
This article walked through an example of stream processing using Spark Streaming together with Apache Kafka and PostgreSQL. As data from many different sources keeps growing, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.