Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

හෙලෝ, හබ්ර්! අද අපි Spark Streaming භාවිතයෙන් Apache Kafka පණිවිඩ ප්‍රවාහයන් සකසන පද්ධතියක් ගොඩනඟා AWS RDS ක්ලවුඩ් දත්ත ගබඩාවට සැකසුම් ප්‍රතිඵල ලියන්නෙමු.

යම් ණය ආයතනයක් එහි සියලුම ශාඛා හරහා "පියාඹන විට" පැමිණෙන ගනුදෙනු සැකසීමේ කාර්යය අපට සකසන බව අපි සිතමු. භාණ්ඩාගාරය සඳහා විවෘත මුදල් තත්ත්වය, ගනුදෙනු සඳහා සීමාවන් හෝ මූල්‍ය ප්‍රතිඵල ආදිය ක්ෂණිකව ගණනය කිරීමේ අරමුණින් මෙය කළ හැකිය.

මැජික් සහ මැජික් අක්ෂර භාවිතයෙන් තොරව මෙම නඩුව ක්රියාත්මක කරන්නේ කෙසේද - කප්පාදුව යටතේ කියවන්න! යන්න!

Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing
(පින්තූර මූලාශ්‍රය)

හැඳින්වීම

ඇත්ත වශයෙන්ම, තථ්‍ය කාලය තුළ දත්ත විශාල ප්‍රමාණයක් සැකසීම නවීන පද්ධතිවල භාවිතය සඳහා ප්‍රමාණවත් අවස්ථාවන් සපයයි. මේ සඳහා වඩාත් ජනප්‍රිය සංයෝජනයක් වන්නේ Apache Kafka සහ Spark Streaming වල ටැන්ඩම් එකයි, එහිදී Kafka විසින් ලැබෙන පණිවිඩ පැකට් ප්‍රවාහයක් නිර්මාණය කරයි, සහ Spark Streaming මෙම පැකට් ලබා දෙන කාල පරතරයකින් සකසයි.

යෙදුමේ වැරදි ඉවසීම වැඩි කිරීම සඳහා, අපි මුරපොලවල් භාවිතා කරන්නෙමු. මෙම යාන්ත්‍රණය සමඟින්, Spark Streaming එන්ජිමට නැතිවූ දත්ත නැවත ලබා ගැනීමට අවශ්‍ය වූ විට, එයට අවශ්‍ය වන්නේ අවසාන මුරපොල වෙත ආපසු ගොස් එහි සිට ගණනය කිරීම් නැවත ආරම්භ කිරීම පමණි.

සංවර්ධිත පද්ධතියේ ගෘහ නිර්මාණ ශිල්පය

Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

භාවිතා කරන සංරචක:

  • Apache Kafka බෙදා හරින ලද ප්‍රකාශන-දායක පණිවිඩ පද්ධතියකි. නොබැඳි සහ මාර්ගගත පණිවිඩ පරිභෝජනය සඳහා සුදුසු වේ. දත්ත නැතිවීම වැළැක්වීම සඳහා, කෆ්කා පණිවිඩ තැටියේ ගබඩා කර පොකුර තුළ ප්‍රතිවර්තනය කරනු ලැබේ. Kafka පද්ධතිය ZooKeeper සමමුහුර්ත කිරීමේ සේවාව මත ගොඩනගා ඇත;
  • Apache Spark Streaming - ප්‍රවාහ දත්ත සැකසීම සඳහා Spark සංරචකය. Spark Streaming මොඩියුලය ගොඩනගා ඇත්තේ micro-batch architecture භාවිතයෙන් වන අතර එහිදී දත්ත ප්‍රවාහය කුඩා දත්ත පැකට් වල අඛණ්ඩ අනුපිළිවෙලක් ලෙස අර්ථ දැක්වේ. Spark Streaming විවිධ මූලාශ්‍රවලින් දත්ත ලබාගෙන කුඩා පැකේජවලට ඒකාබද්ධ කරයි. නව පැකේජ නිත්‍ය කාල පරාසයන් තුළ නිර්මාණය වේ. එක් එක් කාල පරතරය ආරම්භයේදී, නව පැකට්ටුවක් සාදනු ලබන අතර, එම කාල පරතරය තුළ ලැබෙන ඕනෑම දත්තයක් පැකට්ටුවට ඇතුළත් වේ. පරතරය අවසානයේ පැකට් වර්ධනය නතර වේ. පරතරයේ විශාලත්වය තීරණය වන්නේ කාණ්ඩ පරතරය ලෙස හැඳින්වෙන පරාමිතියක් මගිනි;
  • Apache Spark SQL - ස්පාර්ක් ක්‍රියාකාරී ක්‍රමලේඛනය සමඟ සම්බන්ධතා සැකසුම් ඒකාබද්ධ කරයි. ව්‍යුහගත දත්ත යන්නෙන් අදහස් වන්නේ ක්‍රමලේඛයක් ඇති දත්ත, එනම් සියලුම වාර්තා සඳහා තනි ක්ෂේත්‍ර කට්ටලයක්. Spark SQL විවිධ ව්‍යුහගත දත්ත මූලාශ්‍රවලින් ආදානය සඳහා සහය දක්වන අතර, ක්‍රමලේඛන තොරතුරු ලබා ගැනීමට ස්තූතිවන්ත වන අතර, එයට අවශ්‍ය වාර්තා ක්ෂේත්‍ර පමණක් කාර්යක්ෂමව ලබා ගත හැකි අතර DataFrame API ද සපයයි;
  • AWS RDS යනු සාපේක්ෂව මිල අඩු වලාකුළු මත පදනම් වූ සම්බන්ධතා දත්ත ගබඩාවක් වන අතර එය සැකසීම, ක්‍රියාත්මක කිරීම සහ පරිමාණය සරල කරන වෙබ් සේවාවක් වන අතර එය ඇමසන් විසින් සෘජුවම පරිපාලනය කරයි.

Kafka සේවාදායකය ස්ථාපනය කිරීම සහ ධාවනය කිරීම

කෆ්කා කෙලින්ම භාවිතා කිරීමට පෙර, ඔබ සතුව ජාවා තිබේදැයි සහතික කර ගත යුතුය, මන්ද... JVM වැඩ සඳහා භාවිතා කරයි:

sudo apt-get update 
sudo apt-get install default-jre
java -version

කෆ්කා සමඟ වැඩ කිරීමට නව පරිශීලකයෙකු නිර්මාණය කරමු:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

ඊළඟට, නිල Apache Kafka වෙබ් අඩවියෙන් බෙදා හැරීම බාගන්න:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

බාගත කළ සංරක්ෂිතය ඉවත් කරන්න:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

ඊළඟ පියවර විකල්ප වේ. කාරණය වන්නේ පෙරනිමි සැකසුම් ඔබට Apache Kafka හි සියලුම අංග සම්පූර්ණයෙන් භාවිතා කිරීමට ඉඩ නොදෙන බවයි. උදාහරණයක් ලෙස, පණිවිඩ ප්‍රකාශ කළ හැකි මාතෘකාවක්, කාණ්ඩයක්, කණ්ඩායමක් මකන්න. මෙය වෙනස් කිරීමට, අපි වින්‍යාස ගොනුව සංස්කරණය කරමු:

vim ~/kafka/config/server.properties

ගොනුවේ අවසානයට පහත දේ එක් කරන්න:

delete.topic.enable = true

Kafka සේවාදායකය ආරම්භ කිරීමට පෙර, ඔබ ZooKeeper සේවාදායකය ආරම්භ කළ යුතුය; අපි Kafka බෙදාහැරීම සමඟ එන සහායක ස්ක්‍රිප්ට් භාවිතා කරන්නෙමු:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper සාර්ථකව ආරම්භ වූ පසු, Kafka සේවාදායකය වෙනම පර්යන්තයක දියත් කරන්න:

bin/kafka-server-start.sh config/server.properties

අපි ගනුදෙනු නමින් නව මාතෘකාවක් නිර්මාණය කරමු:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

අවශ්‍ය කොටස් සංඛ්‍යාව සහ අනුවර්තනය සහිත මාතෘකාවක් නිර්මාණය කර ඇති බව සහතික කර ගනිමු:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

අලුතින් නිර්මාණය කරන ලද මාතෘකාව සඳහා නිෂ්පාදකයා සහ පාරිභෝගිකයා පරීක්ෂා කිරීමේ අවස්ථාවන් මග හරිමු. ඔබට පණිවිඩ යැවීම සහ ලැබීම පරීක්ෂා කළ හැකි ආකාරය පිළිබඳ වැඩි විස්තර නිල ලේඛනවල ලියා ඇත - පණිවිඩ කිහිපයක් යවන්න. හොඳයි, අපි KafkaProducer API භාවිතයෙන් Python හි නිෂ්පාදකයෙකු ලිවීමට ඉදිරියට යමු.

නිෂ්පාදකයාගේ ලිවීම

නිෂ්පාදකයා අහඹු දත්ත ජනනය කරනු ඇත - සෑම තත්පරයකටම පණිවිඩ 100 ක්. අහඹු දත්ත යන්නෙන් අපි අදහස් කරන්නේ ක්ෂේත්‍ර තුනකින් සමන්විත ශබ්දකෝෂයකි:

  • ශාඛාව - ණය ආයතනයේ විකුණුම් ස්ථානයේ නම;
  • මුදල් - ගනුදෙනු මුදල්;
  • ප්රමාණය - ගනුදෙනු ප්රමාණය. එය බැංකුව විසින් මුදල් මිලදී ගැනීමක් නම් එම මුදල ධන අංකයක් වන අතර එය විකිණීමක් නම් සෘණ අංකයක් වනු ඇත.

නිෂ්පාදකයා සඳහා කේතය මේ වගේ ය:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

ඊළඟට, යැවීමේ ක්‍රමය භාවිතා කරමින්, අපි සේවාදායකයට, අපට අවශ්‍ය මාතෘකාවට, JSON ආකෘතියෙන් පණිවිඩයක් යවන්නෙමු:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

ස්ක්‍රිප්ට් ධාවනය කරන විට, අපට ටර්මිනලයේ පහත පණිවිඩ ලැබේ:

Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

මෙයින් අදහස් කරන්නේ සෑම දෙයක්ම අපට අවශ්‍ය පරිදි ක්‍රියාත්මක වන බවයි - නිෂ්පාදකයා අපට අවශ්‍ය මාතෘකාවට පණිවිඩ ජනනය කර යවයි.
මීලඟ පියවර වන්නේ Spark ස්ථාපනය කර මෙම පණිවිඩ ප්‍රවාහය සැකසීමයි.

Apache Spark ස්ථාපනය කිරීම

අපාචේ ස්පාර්ක් යනු විශ්වීය සහ ඉහළ කාර්යසාධනයක් සහිත පොකුරු පරිගණක වේදිකාවකි.

අන්තර්ක්‍රියාකාරී විමසුම් සහ ප්‍රවාහ සැකසුම් ඇතුළුව, පුළුල් පරාසයක පරිගණක වර්ග සඳහා සහය දක්වන අතරම, MapReduce ආකෘතියේ ජනප්‍රිය ක්‍රියාත්මක කිරීම්වලට වඩා Spark වඩා හොඳින් ක්‍රියා කරයි. විශාල දත්ත ප්‍රමාණයක් සැකසීමේදී වේගය වැදගත් කාර්යභාරයක් ඉටු කරයි, මන්ද එය මිනිත්තු හෝ පැය ගණනක් බලා සිටීමෙන් තොරව අන්තර් ක්‍රියාකාරීව වැඩ කිරීමට ඔබට ඉඩ සලසයි. ස්පාර්ක්ගේ විශාලතම ශක්තීන්ගෙන් එකක් එය වේගවත් කරයි, මතකයේ ගණනය කිරීම් සිදු කිරීමේ හැකියාවයි.

මෙම රාමුව Scala හි ලියා ඇත, එබැවින් ඔබ එය මුලින්ම ස්ථාපනය කළ යුතුය:

sudo apt-get install scala

නිල වෙබ් අඩවියෙන් Spark බෙදාහැරීම බාගන්න:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

සංරක්ෂිතය ඉවත් කරන්න:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

bash ගොනුවට Spark වෙත මාර්ගය එක් කරන්න:

vim ~/.bashrc

සංස්කාරකය හරහා පහත රේඛා එක් කරන්න:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrc හි වෙනස්කම් සිදු කිරීමෙන් පසු පහත විධානය ක්‍රියාත්මක කරන්න:

source ~/.bashrc

AWS PostgreSQL යෙදවීම

ඉතිරිව ඇත්තේ ප්‍රවාහ වලින් අපි සැකසූ තොරතුරු උඩුගත කරන දත්ත සමුදාය යෙදවීම පමණි. මේ සඳහා අපි AWS RDS සේවාව භාවිතා කරමු.

AWS කොන්සෝලය වෙත යන්න -> AWS RDS -> දත්ත සමුදායන් -> දත්ත සමුදාය සාදන්න:
Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

PostgreSQL තෝරන්න සහ ඊළඟ ක්ලික් කරන්න:
Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

නිසා මෙම උදාහරණය අධ්‍යාපනික අරමුණු සඳහා පමණි; අපි "අවම වශයෙන්" නොමිලේ සේවාදායකයක් භාවිතා කරන්නෙමු (නිදහස් ස්ථරය):
Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

මීලඟට, අපි Free Tier block එකට ටික් එකක් දමමු, ඉන්පසු අපට ස්වයංක්‍රීයව t2.micro පන්තියේ අවස්ථාවක් පිරිනමනු ලැබේ - දුර්වල වුවද, එය නොමිලේ සහ අපගේ කාර්යයට බෙහෙවින් සුදුසු ය:
Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

ඊළඟට ඉතා වැදගත් දේවල් පැමිණේ: දත්ත සමුදායේ නම, ප්රධාන පරිශීලකයාගේ නම සහ ඔහුගේ මුරපදය. අපි උදාහරණය නම් කරමු: myHabrTest, ප්‍රධාන පරිශීලකයා: habr, මුරපදය: habr12345 සහ Next බොත්තම මත ක්ලික් කරන්න:
Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

මීළඟ පිටුවේ පිටතින් අපගේ දත්ත සමුදා සේවාදායකයේ ප්‍රවේශ්‍යතාව (පොදු ප්‍රවේශ්‍යතාව) සහ වරාය ලබා ගැනීමේ හැකියාව සඳහා වගකියන පරාමිති ඇත:

Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

VPC ආරක්ෂක කණ්ඩායම සඳහා නව සැකසුමක් නිර්මාණය කරමු, එමඟින් අපගේ දත්ත සමුදා සේවාදායකයට වරාය 5432 (PostgreSQL) හරහා බාහිර ප්‍රවේශය ලබා දෙනු ඇත.
අපි වෙනම බ්‍රව්සර් කවුළුවක AWS කොන්සෝලය වෙත යමු VPC උපකරණ පුවරුව -> ආරක්ෂක කණ්ඩායම් -> ආරක්ෂක කණ්ඩායම් කොටස සාදන්න:
Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

අපි ආරක්ෂක කණ්ඩායම සඳහා නම සකසමු - PostgreSQL, විස්තරයක්, මෙම කණ්ඩායම සම්බන්ධ කළ යුත්තේ කුමන VPC ද යන්න සඳහන් කර සාදන්න බොත්තම ක්ලික් කරන්න:
Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

පහත පින්තූරයේ පෙන්වා ඇති පරිදි, අලුතින් සාදන ලද කණ්ඩායම සඳහා 5432 වරාය සඳහා එන නීති පුරවන්න. ඔබට තොට හස්තීයව සඳහන් කළ නොහැක, නමුත් වර්ගය පතන ලැයිස්තුවෙන් PostgreSQL තෝරන්න.

හරියටම කිවහොත්, අගය ::/0 යන්නෙන් අදහස් කරන්නේ ලොව පුරා සිට සේවාදායකයට එන ගමනාගමනය ලබා ගත හැකි වීමයි, එය කැනොනිකල් වශයෙන් සම්පූර්ණයෙන්ම සත්‍ය නොවේ, නමුත් උදාහරණය විශ්ලේෂණය කිරීමට, මෙම ප්‍රවේශය භාවිතා කිරීමට අපට ඉඩ දෙමු:
Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

අපි බ්‍රවුසර පිටුවට ආපසු යමු, එහිදී අපට “උසස් සැකසුම් වින්‍යාස කරන්න” විවෘත කර VPC ආරක්ෂක කණ්ඩායම් කොටස තෝරන්න -> පවතින VPC ආරක්ෂක කණ්ඩායම් තෝරන්න -> PostgreSQL:
Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

ඊළඟට, දත්ත සමුදායේ විකල්ප -> දත්ත සමුදායේ නම -> නම සකසන්න - habrDB.

උපස්ථය අක්‍රිය කිරීම (උපස්ථ රඳවා ගැනීමේ කාලය - දින 0), අධීක්‍ෂණය සහ කාර්ය සාධන තීක්ෂ්ණ බුද්ධිය පෙරනිමියෙන් හැර, අපට ඉතිරි පරාමිති අත්හැරිය හැක. බොත්තම මත ක්ලික් කරන්න දත්ත සමුදාය සාදන්න:
Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

නූල් හසුරුවන්නා

අවසාන අදියර වනුයේ Spark job එකක් සංවර්ධනය කිරීම වන අතර, එය Kafka වෙතින් සෑම තත්පර දෙකකටම නව දත්ත සකසන අතර එහි ප්‍රතිඵලය දත්ත සමුදායට ඇතුල් කරනු ඇත.

ඉහත සඳහන් කළ පරිදි, මුරපොල යනු SparkStreaming හි මූලික යාන්ත්‍රණයක් වන අතර එය දෝෂ ඉවසීම සහතික කිරීම සඳහා වින්‍යාස කළ යුතුය. අපි මුරපොලවල් භාවිතා කරන අතර, ක්‍රියා පටිපාටිය අසාර්ථක වුවහොත්, Spark Streaming මොඩියුලයට අවශ්‍ය වන්නේ අවසන් මුරපොල වෙත ආපසු ගොස් නැතිවූ දත්ත නැවත ලබා ගැනීම සඳහා එයින් ගණනය කිරීම් නැවත ආරම්භ කිරීමයි.

මුරපොල තොරතුරු ගබඩා කරනු ලබන දෝෂ-ඉවසන විශ්වාසදායක ගොනු පද්ධතියක් (HDFS, S3, ආදිය) මත නාමාවලියක් සැකසීමෙන් චෙක්පොයින්ට් කිරීම සක්‍රීය කළ හැක. මෙය භාවිතා කර ඇත, උදාහරණයක් ලෙස:

streamingContext.checkpoint(checkpointDirectory)

අපගේ උදාහරණයේදී, අපි පහත ප්‍රවේශය භාවිතා කරන්නෙමු, එනම්, චෙක්පොයින්ට් ඩිරෙක්ටරි තිබේ නම්, එවිට සන්දර්භය මුරපොල දත්ත වලින් ප්‍රතිනිර්මාණය වේ. නාමාවලිය නොපවතියි නම් (එනම් පළමු වරට ක්‍රියාත්මක කර ඇත), එවිට නව සන්දර්භයක් නිර්මාණය කිරීමට සහ DStreams වින්‍යාස කිරීමට functionToCreateContext කැඳවනු ලැබේ:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

අපි KafkaUtils පුස්තකාලයේ createDirectStream ක්‍රමය භාවිතයෙන් “ගනුදෙනු” මාතෘකාවට සම්බන්ධ වීමට DirectStream වස්තුවක් සාදන්නෙමු:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

JSON ආකෘතියෙන් ලැබෙන දත්ත විග්‍රහ කිරීම:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL භාවිතයෙන්, අපි සරල සමූහකරණයක් සිදු කර කොන්සෝලය තුළ ප්‍රතිඵලය පෙන්වමු:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

විමසුම් පෙළ ලබා ගැනීම සහ එය Spark SQL හරහා ධාවනය කිරීම:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

ඉන්පසුව අපි එකතු කළ දත්ත AWS RDS හි වගුවකට සුරකිමු. එකතු කිරීමේ ප්‍රතිඵල දත්ත සමුදා වගුවකට සුරැකීමට, අපි DataFrame වස්තුවේ ලිවීමේ ක්‍රමය භාවිතා කරමු:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS වෙත සම්බන්ධතාවයක් සැකසීම ගැන වචන කිහිපයක්. අපි "AWS PostgreSQL යෙදවීම" පියවරේදී ඒ සඳහා පරිශීලක සහ මුරපදය නිර්මාණය කළෙමු. ඔබ දත්ත සමුදා සේවාදායක url ලෙස Endpoint භාවිතා කළ යුතුය, එය Connectivity & Security කොටසේ පෙන්වයි:

Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

ස්පාර්ක් සහ කෆ්කා නිවැරදිව සම්බන්ධ කිරීම සඳහා, ඔබ කෞතුක වස්තුව භාවිතයෙන් ස්මාර්ක්-ඉදිරිපත් කිරීම හරහා කාර්යය ධාවනය කළ යුතුය. spark-streaming-kafka-0-8_2.11. මීට අමතරව, අපි PostgreSQL දත්ත සමුදාය සමඟ අන්තර්ක්‍රියා කිරීම සඳහා කෞතුක වස්තුවක් ද භාවිතා කරන්නෙමු; අපි ඒවා --packages හරහා මාරු කරන්නෙමු.

ස්ක්‍රිප්ටයේ නම්‍යශීලී බව සඳහා, අපි පණිවිඩ සේවාදායකයේ නම සහ අපට දත්ත ලබා ගැනීමට අවශ්‍ය මාතෘකාව ආදාන පරාමිති ලෙස ඇතුළත් කරන්නෙමු.

එබැවින්, පද්ධතියේ ක්‍රියාකාරිත්වය දියත් කිරීමට සහ පරීක්ෂා කිරීමට කාලයයි:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

සෑම දෙයක්ම සාර්ථක විය! ඔබට පහත පින්තූරයේ පෙනෙන පරිදි, යෙදුම ක්‍රියාත්මක වන අතරතුර, සෑම තත්පර 2 කට වරක් නව එකතු කිරීමේ ප්‍රතිඵල ප්‍රතිදානය වේ, මන්ද අපි StreamingContext වස්තුව සාදන විට අපි බැචින් පරතරය තත්පර 2කට සකසන්නෙමු:

Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

ඊළඟට, වගුවේ වාර්තා තිබේදැයි පරීක්ෂා කිරීම සඳහා අපි දත්ත සමුදායට සරල විමසුමක් කරන්නෙමු ගනුදෙනු_ප්‍රවාහය:

Spark Streaming සමඟ Apache Kafka සහ Streaming Data Processing

නිගමනය

මෙම ලිපිය Apache Kafka සහ PostgreSQL සමඟ එක්ව Spark Streaming භාවිතයෙන් තොරතුරු ප්‍රවාහ සැකසීමේ උදාහරණයක් දෙස බැලුවේය. විවිධ ප්‍රභවයන්ගෙන් දත්ත වර්ධනය වීමත් සමඟ, ප්‍රවාහය සහ තත්‍ය කාලීන යෙදුම් නිර්මාණය කිරීම සඳහා ස්පාර්ක් ප්‍රවාහයේ ප්‍රායෝගික වටිනාකම අධිතක්සේරු කිරීම දුෂ්කර ය.

ඔබට මගේ ගබඩාවේ සම්පූර්ණ මූල කේතය සොයාගත හැකිය GitHub.

මෙම ලිපිය සාකච්ඡා කිරීමට මම සතුටු වෙමි, මම ඔබගේ අදහස් බලාපොරොත්තු වෙමි, එමෙන්ම සියලු සැලකිලිමත් පාඨකයන්ගෙන් නිර්මාණාත්මක විවේචන බලාපොරොත්තු වෙමි.

ඔබට සාර්ථක වේවා!

ගීතා. මුලදී එය දේශීය PostgreSQL දත්ත සමුදායක් භාවිතා කිරීමට සැලසුම් කර ඇත, නමුත් AWS සඳහා මගේ ආදරය මත, මම දත්ත සමුදාය වලාකුළට ගෙන යාමට තීරණය කළෙමි. මෙම මාතෘකාව පිළිබඳ ඊළඟ ලිපියෙන්, AWS Kinesis සහ AWS EMR භාවිතයෙන් AWS හි ඉහත විස්තර කර ඇති සම්පූර්ණ පද්ධතියම ක්‍රියාත්මක කරන්නේ කෙසේදැයි මම පෙන්වන්නම්. පුවත් අනුගමනය කරන්න!

මූලාශ්රය: www.habr.com

අදහස් එක් කරන්න