හෙලෝ, හබ්ර්! අද අපි Spark Streaming භාවිතයෙන් Apache Kafka පණිවිඩ ප්රවාහයන් සකසන පද්ධතියක් ගොඩනඟා AWS RDS ක්ලවුඩ් දත්ත ගබඩාවට සැකසුම් ප්රතිඵල ලියන්නෙමු.
යම් ණය ආයතනයක් එහි සියලුම ශාඛා හරහා "පියාඹන විට" පැමිණෙන ගනුදෙනු සැකසීමේ කාර්යය අපට සකසන බව අපි සිතමු. භාණ්ඩාගාරය සඳහා විවෘත මුදල් තත්ත්වය, ගනුදෙනු සඳහා සීමාවන් හෝ මූල්ය ප්රතිඵල ආදිය ක්ෂණිකව ගණනය කිරීමේ අරමුණින් මෙය කළ හැකිය.
මැජික් සහ මැජික් අක්ෂර භාවිතයෙන් තොරව මෙම නඩුව ක්රියාත්මක කරන්නේ කෙසේද - කප්පාදුව යටතේ කියවන්න! යන්න!
හැඳින්වීම
ඇත්ත වශයෙන්ම, තථ්ය කාලය තුළ දත්ත විශාල ප්රමාණයක් සැකසීම නවීන පද්ධතිවල භාවිතය සඳහා ප්රමාණවත් අවස්ථාවන් සපයයි. මේ සඳහා වඩාත් ජනප්රිය සංයෝජනයක් වන්නේ Apache Kafka සහ Spark Streaming වල ටැන්ඩම් එකයි, එහිදී Kafka විසින් ලැබෙන පණිවිඩ පැකට් ප්රවාහයක් නිර්මාණය කරයි, සහ Spark Streaming මෙම පැකට් ලබා දෙන කාල පරතරයකින් සකසයි.
යෙදුමේ වැරදි ඉවසීම වැඩි කිරීම සඳහා, අපි මුරපොලවල් භාවිතා කරන්නෙමු. මෙම යාන්ත්රණය සමඟින්, Spark Streaming එන්ජිමට නැතිවූ දත්ත නැවත ලබා ගැනීමට අවශ්ය වූ විට, එයට අවශ්ය වන්නේ අවසාන මුරපොල වෙත ආපසු ගොස් එහි සිට ගණනය කිරීම් නැවත ආරම්භ කිරීම පමණි.
සංවර්ධිත පද්ධතියේ ගෘහ නිර්මාණ ශිල්පය
භාවිතා කරන සංරචක:
Apache Kafka බෙදා හරින ලද ප්රකාශන-දායක පණිවිඩ පද්ධතියකි. නොබැඳි සහ මාර්ගගත පණිවිඩ පරිභෝජනය සඳහා සුදුසු වේ. දත්ත නැතිවීම වැළැක්වීම සඳහා, කෆ්කා පණිවිඩ තැටියේ ගබඩා කර පොකුර තුළ ප්රතිවර්තනය කරනු ලැබේ. Kafka පද්ධතිය ZooKeeper සමමුහුර්ත කිරීමේ සේවාව මත ගොඩනගා ඇත;Apache Spark Streaming - ප්රවාහ දත්ත සැකසීම සඳහා Spark සංරචකය. Spark Streaming මොඩියුලය ගොඩනගා ඇත්තේ micro-batch architecture භාවිතයෙන් වන අතර එහිදී දත්ත ප්රවාහය කුඩා දත්ත පැකට් වල අඛණ්ඩ අනුපිළිවෙලක් ලෙස අර්ථ දැක්වේ. Spark Streaming විවිධ මූලාශ්රවලින් දත්ත ලබාගෙන කුඩා පැකේජවලට ඒකාබද්ධ කරයි. නව පැකේජ නිත්ය කාල පරාසයන් තුළ නිර්මාණය වේ. එක් එක් කාල පරතරය ආරම්භයේදී, නව පැකට්ටුවක් සාදනු ලබන අතර, එම කාල පරතරය තුළ ලැබෙන ඕනෑම දත්තයක් පැකට්ටුවට ඇතුළත් වේ. පරතරය අවසානයේ පැකට් වර්ධනය නතර වේ. පරතරයේ විශාලත්වය තීරණය වන්නේ කාණ්ඩ පරතරය ලෙස හැඳින්වෙන පරාමිතියක් මගිනි;Apache Spark SQL - ස්පාර්ක් ක්රියාකාරී ක්රමලේඛනය සමඟ සම්බන්ධතා සැකසුම් ඒකාබද්ධ කරයි. ව්යුහගත දත්ත යන්නෙන් අදහස් වන්නේ ක්රමලේඛයක් ඇති දත්ත, එනම් සියලුම වාර්තා සඳහා තනි ක්ෂේත්ර කට්ටලයක්. Spark SQL විවිධ ව්යුහගත දත්ත මූලාශ්රවලින් ආදානය සඳහා සහය දක්වන අතර, ක්රමලේඛන තොරතුරු ලබා ගැනීමට ස්තූතිවන්ත වන අතර, එයට අවශ්ය වාර්තා ක්ෂේත්ර පමණක් කාර්යක්ෂමව ලබා ගත හැකි අතර DataFrame API ද සපයයි;AWS RDS යනු සාපේක්ෂව මිල අඩු වලාකුළු මත පදනම් වූ සම්බන්ධතා දත්ත ගබඩාවක් වන අතර එය සැකසීම, ක්රියාත්මක කිරීම සහ පරිමාණය සරල කරන වෙබ් සේවාවක් වන අතර එය ඇමසන් විසින් සෘජුවම පරිපාලනය කරයි.
Kafka සේවාදායකය ස්ථාපනය කිරීම සහ ධාවනය කිරීම
කෆ්කා කෙලින්ම භාවිතා කිරීමට පෙර, ඔබ සතුව ජාවා තිබේදැයි සහතික කර ගත යුතුය, මන්ද... JVM වැඩ සඳහා භාවිතා කරයි:
sudo apt-get update
sudo apt-get install default-jre
java -version
කෆ්කා සමඟ වැඩ කිරීමට නව පරිශීලකයෙකු නිර්මාණය කරමු:
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
ඊළඟට, නිල Apache Kafka වෙබ් අඩවියෙන් බෙදා හැරීම බාගන්න:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
බාගත කළ සංරක්ෂිතය ඉවත් කරන්න:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
ඊළඟ පියවර විකල්ප වේ. කාරණය වන්නේ පෙරනිමි සැකසුම් ඔබට Apache Kafka හි සියලුම අංග සම්පූර්ණයෙන් භාවිතා කිරීමට ඉඩ නොදෙන බවයි. උදාහරණයක් ලෙස, පණිවිඩ ප්රකාශ කළ හැකි මාතෘකාවක්, කාණ්ඩයක්, කණ්ඩායමක් මකන්න. මෙය වෙනස් කිරීමට, අපි වින්යාස ගොනුව සංස්කරණය කරමු:
vim ~/kafka/config/server.properties
ගොනුවේ අවසානයට පහත දේ එක් කරන්න:
delete.topic.enable = true
Kafka සේවාදායකය ආරම්භ කිරීමට පෙර, ඔබ ZooKeeper සේවාදායකය ආරම්භ කළ යුතුය; අපි Kafka බෙදාහැරීම සමඟ එන සහායක ස්ක්රිප්ට් භාවිතා කරන්නෙමු:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper සාර්ථකව ආරම්භ වූ පසු, Kafka සේවාදායකය වෙනම පර්යන්තයක දියත් කරන්න:
bin/kafka-server-start.sh config/server.properties
අපි ගනුදෙනු නමින් නව මාතෘකාවක් නිර්මාණය කරමු:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
අවශ්ය කොටස් සංඛ්යාව සහ අනුවර්තනය සහිත මාතෘකාවක් නිර්මාණය කර ඇති බව සහතික කර ගනිමු:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
අලුතින් නිර්මාණය කරන ලද මාතෘකාව සඳහා නිෂ්පාදකයා සහ පාරිභෝගිකයා පරීක්ෂා කිරීමේ අවස්ථාවන් මග හරිමු. ඔබට පණිවිඩ යැවීම සහ ලැබීම පරීක්ෂා කළ හැකි ආකාරය පිළිබඳ වැඩි විස්තර නිල ලේඛනවල ලියා ඇත -
නිෂ්පාදකයාගේ ලිවීම
නිෂ්පාදකයා අහඹු දත්ත ජනනය කරනු ඇත - සෑම තත්පරයකටම පණිවිඩ 100 ක්. අහඹු දත්ත යන්නෙන් අපි අදහස් කරන්නේ ක්ෂේත්ර තුනකින් සමන්විත ශබ්දකෝෂයකි:
- ශාඛාව - ණය ආයතනයේ විකුණුම් ස්ථානයේ නම;
- මුදල් - ගනුදෙනු මුදල්;
- ප්රමාණය - ගනුදෙනු ප්රමාණය. එය බැංකුව විසින් මුදල් මිලදී ගැනීමක් නම් එම මුදල ධන අංකයක් වන අතර එය විකිණීමක් නම් සෘණ අංකයක් වනු ඇත.
නිෂ්පාදකයා සඳහා කේතය මේ වගේ ය:
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
ඊළඟට, යැවීමේ ක්රමය භාවිතා කරමින්, අපි සේවාදායකයට, අපට අවශ්ය මාතෘකාවට, JSON ආකෘතියෙන් පණිවිඩයක් යවන්නෙමු:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
ස්ක්රිප්ට් ධාවනය කරන විට, අපට ටර්මිනලයේ පහත පණිවිඩ ලැබේ:
මෙයින් අදහස් කරන්නේ සෑම දෙයක්ම අපට අවශ්ය පරිදි ක්රියාත්මක වන බවයි - නිෂ්පාදකයා අපට අවශ්ය මාතෘකාවට පණිවිඩ ජනනය කර යවයි.
මීලඟ පියවර වන්නේ Spark ස්ථාපනය කර මෙම පණිවිඩ ප්රවාහය සැකසීමයි.
Apache Spark ස්ථාපනය කිරීම
අපාචේ ස්පාර්ක් යනු විශ්වීය සහ ඉහළ කාර්යසාධනයක් සහිත පොකුරු පරිගණක වේදිකාවකි.
අන්තර්ක්රියාකාරී විමසුම් සහ ප්රවාහ සැකසුම් ඇතුළුව, පුළුල් පරාසයක පරිගණක වර්ග සඳහා සහය දක්වන අතරම, MapReduce ආකෘතියේ ජනප්රිය ක්රියාත්මක කිරීම්වලට වඩා Spark වඩා හොඳින් ක්රියා කරයි. විශාල දත්ත ප්රමාණයක් සැකසීමේදී වේගය වැදගත් කාර්යභාරයක් ඉටු කරයි, මන්ද එය මිනිත්තු හෝ පැය ගණනක් බලා සිටීමෙන් තොරව අන්තර් ක්රියාකාරීව වැඩ කිරීමට ඔබට ඉඩ සලසයි. ස්පාර්ක්ගේ විශාලතම ශක්තීන්ගෙන් එකක් එය වේගවත් කරයි, මතකයේ ගණනය කිරීම් සිදු කිරීමේ හැකියාවයි.
මෙම රාමුව Scala හි ලියා ඇත, එබැවින් ඔබ එය මුලින්ම ස්ථාපනය කළ යුතුය:
sudo apt-get install scala
නිල වෙබ් අඩවියෙන් Spark බෙදාහැරීම බාගන්න:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
සංරක්ෂිතය ඉවත් කරන්න:
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
bash ගොනුවට Spark වෙත මාර්ගය එක් කරන්න:
vim ~/.bashrc
සංස්කාරකය හරහා පහත රේඛා එක් කරන්න:
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
bashrc හි වෙනස්කම් සිදු කිරීමෙන් පසු පහත විධානය ක්රියාත්මක කරන්න:
source ~/.bashrc
AWS PostgreSQL යෙදවීම
ඉතිරිව ඇත්තේ ප්රවාහ වලින් අපි සැකසූ තොරතුරු උඩුගත කරන දත්ත සමුදාය යෙදවීම පමණි. මේ සඳහා අපි AWS RDS සේවාව භාවිතා කරමු.
AWS කොන්සෝලය වෙත යන්න -> AWS RDS -> දත්ත සමුදායන් -> දත්ත සමුදාය සාදන්න:
PostgreSQL තෝරන්න සහ ඊළඟ ක්ලික් කරන්න:
නිසා මෙම උදාහරණය අධ්යාපනික අරමුණු සඳහා පමණි; අපි "අවම වශයෙන්" නොමිලේ සේවාදායකයක් භාවිතා කරන්නෙමු (නිදහස් ස්ථරය):
මීලඟට, අපි Free Tier block එකට ටික් එකක් දමමු, ඉන්පසු අපට ස්වයංක්රීයව t2.micro පන්තියේ අවස්ථාවක් පිරිනමනු ලැබේ - දුර්වල වුවද, එය නොමිලේ සහ අපගේ කාර්යයට බෙහෙවින් සුදුසු ය:
ඊළඟට ඉතා වැදගත් දේවල් පැමිණේ: දත්ත සමුදායේ නම, ප්රධාන පරිශීලකයාගේ නම සහ ඔහුගේ මුරපදය. අපි උදාහරණය නම් කරමු: myHabrTest, ප්රධාන පරිශීලකයා: habr, මුරපදය: habr12345 සහ Next බොත්තම මත ක්ලික් කරන්න:
මීළඟ පිටුවේ පිටතින් අපගේ දත්ත සමුදා සේවාදායකයේ ප්රවේශ්යතාව (පොදු ප්රවේශ්යතාව) සහ වරාය ලබා ගැනීමේ හැකියාව සඳහා වගකියන පරාමිති ඇත:
VPC ආරක්ෂක කණ්ඩායම සඳහා නව සැකසුමක් නිර්මාණය කරමු, එමඟින් අපගේ දත්ත සමුදා සේවාදායකයට වරාය 5432 (PostgreSQL) හරහා බාහිර ප්රවේශය ලබා දෙනු ඇත.
අපි වෙනම බ්රව්සර් කවුළුවක AWS කොන්සෝලය වෙත යමු VPC උපකරණ පුවරුව -> ආරක්ෂක කණ්ඩායම් -> ආරක්ෂක කණ්ඩායම් කොටස සාදන්න:
අපි ආරක්ෂක කණ්ඩායම සඳහා නම සකසමු - PostgreSQL, විස්තරයක්, මෙම කණ්ඩායම සම්බන්ධ කළ යුත්තේ කුමන VPC ද යන්න සඳහන් කර සාදන්න බොත්තම ක්ලික් කරන්න:
පහත පින්තූරයේ පෙන්වා ඇති පරිදි, අලුතින් සාදන ලද කණ්ඩායම සඳහා 5432 වරාය සඳහා එන නීති පුරවන්න. ඔබට තොට හස්තීයව සඳහන් කළ නොහැක, නමුත් වර්ගය පතන ලැයිස්තුවෙන් PostgreSQL තෝරන්න.
හරියටම කිවහොත්, අගය ::/0 යන්නෙන් අදහස් කරන්නේ ලොව පුරා සිට සේවාදායකයට එන ගමනාගමනය ලබා ගත හැකි වීමයි, එය කැනොනිකල් වශයෙන් සම්පූර්ණයෙන්ම සත්ය නොවේ, නමුත් උදාහරණය විශ්ලේෂණය කිරීමට, මෙම ප්රවේශය භාවිතා කිරීමට අපට ඉඩ දෙමු:
අපි බ්රවුසර පිටුවට ආපසු යමු, එහිදී අපට “උසස් සැකසුම් වින්යාස කරන්න” විවෘත කර VPC ආරක්ෂක කණ්ඩායම් කොටස තෝරන්න -> පවතින VPC ආරක්ෂක කණ්ඩායම් තෝරන්න -> PostgreSQL:
ඊළඟට, දත්ත සමුදායේ විකල්ප -> දත්ත සමුදායේ නම -> නම සකසන්න - habrDB.
උපස්ථය අක්රිය කිරීම (උපස්ථ රඳවා ගැනීමේ කාලය - දින 0), අධීක්ෂණය සහ කාර්ය සාධන තීක්ෂ්ණ බුද්ධිය පෙරනිමියෙන් හැර, අපට ඉතිරි පරාමිති අත්හැරිය හැක. බොත්තම මත ක්ලික් කරන්න දත්ත සමුදාය සාදන්න:
නූල් හසුරුවන්නා
අවසාන අදියර වනුයේ Spark job එකක් සංවර්ධනය කිරීම වන අතර, එය Kafka වෙතින් සෑම තත්පර දෙකකටම නව දත්ත සකසන අතර එහි ප්රතිඵලය දත්ත සමුදායට ඇතුල් කරනු ඇත.
ඉහත සඳහන් කළ පරිදි, මුරපොල යනු SparkStreaming හි මූලික යාන්ත්රණයක් වන අතර එය දෝෂ ඉවසීම සහතික කිරීම සඳහා වින්යාස කළ යුතුය. අපි මුරපොලවල් භාවිතා කරන අතර, ක්රියා පටිපාටිය අසාර්ථක වුවහොත්, Spark Streaming මොඩියුලයට අවශ්ය වන්නේ අවසන් මුරපොල වෙත ආපසු ගොස් නැතිවූ දත්ත නැවත ලබා ගැනීම සඳහා එයින් ගණනය කිරීම් නැවත ආරම්භ කිරීමයි.
මුරපොල තොරතුරු ගබඩා කරනු ලබන දෝෂ-ඉවසන විශ්වාසදායක ගොනු පද්ධතියක් (HDFS, S3, ආදිය) මත නාමාවලියක් සැකසීමෙන් චෙක්පොයින්ට් කිරීම සක්රීය කළ හැක. මෙය භාවිතා කර ඇත, උදාහරණයක් ලෙස:
streamingContext.checkpoint(checkpointDirectory)
අපගේ උදාහරණයේදී, අපි පහත ප්රවේශය භාවිතා කරන්නෙමු, එනම්, චෙක්පොයින්ට් ඩිරෙක්ටරි තිබේ නම්, එවිට සන්දර්භය මුරපොල දත්ත වලින් ප්රතිනිර්මාණය වේ. නාමාවලිය නොපවතියි නම් (එනම් පළමු වරට ක්රියාත්මක කර ඇත), එවිට නව සන්දර්භයක් නිර්මාණය කිරීමට සහ DStreams වින්යාස කිරීමට functionToCreateContext කැඳවනු ලැබේ:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
අපි KafkaUtils පුස්තකාලයේ createDirectStream ක්රමය භාවිතයෙන් “ගනුදෙනු” මාතෘකාවට සම්බන්ධ වීමට DirectStream වස්තුවක් සාදන්නෙමු:
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
broker_list = 'localhost:9092'
topic = 'transaction'
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": broker_list})
JSON ආකෘතියෙන් ලැබෙන දත්ත විග්රහ කිරීම:
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
Spark SQL භාවිතයෙන්, අපි සරල සමූහකරණයක් සිදු කර කොන්සෝලය තුළ ප්රතිඵලය පෙන්වමු:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
විමසුම් පෙළ ලබා ගැනීම සහ එය Spark SQL හරහා ධාවනය කිරීම:
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
ඉන්පසුව අපි එකතු කළ දත්ත AWS RDS හි වගුවකට සුරකිමු. එකතු කිරීමේ ප්රතිඵල දත්ත සමුදා වගුවකට සුරැකීමට, අපි DataFrame වස්තුවේ ලිවීමේ ක්රමය භාවිතා කරමු:
testResultDataFrame.write
.format("jdbc")
.mode("append")
.option("driver", 'org.postgresql.Driver')
.option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
.option("dbtable", "transaction_flow")
.option("user", "habr")
.option("password", "habr12345")
.save()
AWS RDS වෙත සම්බන්ධතාවයක් සැකසීම ගැන වචන කිහිපයක්. අපි "AWS PostgreSQL යෙදවීම" පියවරේදී ඒ සඳහා පරිශීලක සහ මුරපදය නිර්මාණය කළෙමු. ඔබ දත්ත සමුදා සේවාදායක url ලෙස Endpoint භාවිතා කළ යුතුය, එය Connectivity & Security කොටසේ පෙන්වයි:
ස්පාර්ක් සහ කෆ්කා නිවැරදිව සම්බන්ධ කිරීම සඳහා, ඔබ කෞතුක වස්තුව භාවිතයෙන් ස්මාර්ක්-ඉදිරිපත් කිරීම හරහා කාර්යය ධාවනය කළ යුතුය. spark-streaming-kafka-0-8_2.11. මීට අමතරව, අපි PostgreSQL දත්ත සමුදාය සමඟ අන්තර්ක්රියා කිරීම සඳහා කෞතුක වස්තුවක් ද භාවිතා කරන්නෙමු; අපි ඒවා --packages හරහා මාරු කරන්නෙමු.
ස්ක්රිප්ටයේ නම්යශීලී බව සඳහා, අපි පණිවිඩ සේවාදායකයේ නම සහ අපට දත්ත ලබා ගැනීමට අවශ්ය මාතෘකාව ආදාන පරාමිති ලෙස ඇතුළත් කරන්නෙමු.
එබැවින්, පද්ධතියේ ක්රියාකාරිත්වය දියත් කිරීමට සහ පරීක්ෂා කිරීමට කාලයයි:
spark-submit
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207
spark_job.py localhost:9092 transaction
සෑම දෙයක්ම සාර්ථක විය! ඔබට පහත පින්තූරයේ පෙනෙන පරිදි, යෙදුම ක්රියාත්මක වන අතරතුර, සෑම තත්පර 2 කට වරක් නව එකතු කිරීමේ ප්රතිඵල ප්රතිදානය වේ, මන්ද අපි StreamingContext වස්තුව සාදන විට අපි බැචින් පරතරය තත්පර 2කට සකසන්නෙමු:
ඊළඟට, වගුවේ වාර්තා තිබේදැයි පරීක්ෂා කිරීම සඳහා අපි දත්ත සමුදායට සරල විමසුමක් කරන්නෙමු ගනුදෙනු_ප්රවාහය:
නිගමනය
මෙම ලිපිය Apache Kafka සහ PostgreSQL සමඟ එක්ව Spark Streaming භාවිතයෙන් තොරතුරු ප්රවාහ සැකසීමේ උදාහරණයක් දෙස බැලුවේය. විවිධ ප්රභවයන්ගෙන් දත්ත වර්ධනය වීමත් සමඟ, ප්රවාහය සහ තත්ය කාලීන යෙදුම් නිර්මාණය කිරීම සඳහා ස්පාර්ක් ප්රවාහයේ ප්රායෝගික වටිනාකම අධිතක්සේරු කිරීම දුෂ්කර ය.
ඔබට මගේ ගබඩාවේ සම්පූර්ණ මූල කේතය සොයාගත හැකිය
මෙම ලිපිය සාකච්ඡා කිරීමට මම සතුටු වෙමි, මම ඔබගේ අදහස් බලාපොරොත්තු වෙමි, එමෙන්ම සියලු සැලකිලිමත් පාඨකයන්ගෙන් නිර්මාණාත්මක විවේචන බලාපොරොත්තු වෙමි.
ඔබට සාර්ථක වේවා!
ගීතා. මුලදී එය දේශීය PostgreSQL දත්ත සමුදායක් භාවිතා කිරීමට සැලසුම් කර ඇත, නමුත් AWS සඳහා මගේ ආදරය මත, මම දත්ත සමුදාය වලාකුළට ගෙන යාමට තීරණය කළෙමි. මෙම මාතෘකාව පිළිබඳ ඊළඟ ලිපියෙන්, AWS Kinesis සහ AWS EMR භාවිතයෙන් AWS හි ඉහත විස්තර කර ඇති සම්පූර්ණ පද්ධතියම ක්රියාත්මක කරන්නේ කෙසේදැයි මම පෙන්වන්නම්. පුවත් අනුගමනය කරන්න!
මූලාශ්රය: www.habr.com