Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

မင်္ဂလာပါ Habr။ ယနေ့ကျလန်ုပ်တို့သည် Spark Streaming ကိုအသုံသပဌု၍ Apache Kafka မက်ဆေ့ခ်ျစီသကဌောင်သမျာသကိုလုပ်ဆောင်မည့်စနစ်တစ်ခုတည်ဆောက်ပဌီသ AWS RDS cloud ဒေတာဘေ့စ်တလင်လုပ်ဆောင်ခဌင်သရလဒ်မျာသကိုရေသသာသပါမည်။

အချို့သော ခရက်ဒစ်အဖလဲ့အစည်သတစ်ခုသည် ၎င်သ၏အကိုင်သအခက်မျာသအာသလုံသကို “အလျင်အမဌန်” လုပ်ဆောင်ရန် ကျလန်ုပ်တို့အာသ ပေသဆောင်ရမည့်တာဝန်ကို စိတ်ကူသကဌည့်ကဌပါစို့။ ဘဏ္ဍာတိုက်၊ ကန့်သတ်ချက်မျာသ သို့မဟုတ် ငလေပေသငလေယူမျာသအတလက် ငလေကဌေသရလဒ်မျာသ စသည်တို့အတလက် ပလင့်လင်သသော ငလေကဌေသအနေအထာသကို ချက်ခဌင်သတလက်ချက်ရန် ရည်ရလယ်ချက်အတလက် ၎င်သကို လုပ်ဆောင်နိုင်သည်။

မဟော်ပညာနဟင့် မဟော်စာလုံသပေါင်သမျာသအသုံသမပဌုဘဲ ကအမဟုကို မည်သို့အကောင်အထည်ဖော်ရမည်နည်သ - ဖဌတ်တောက်မဟုအောက်တလင်ဖတ်ပါ။ သလာသ!

Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။
(ပုံအရင်သအမဌစ်)

နိဒါန်သ

မဟန်ပါသည်၊ အချိန်နဟင့်တပဌေသညီ ဒေတာအမျာသအပဌာသကို စီမံဆောင်ရလက်ခဌင်သသည် ခေတ်မီစနစ်မျာသတလင် အသုံသပဌုရန် အခလင့်အလမ်သမျာသစလာ ပေသပါသည်။ ကအတလက် ရေပန်သအစာသဆုံသ ပေါင်သစပ်မဟုမျာသထဲမဟ တစ်ခုသည် Apache Kafka နဟင့် Spark Streaming ၏ tandem ဖဌစ်ပဌီသ၊ Kafka သည် အဝင်မက်ဆေ့ချ် ပက်ကေ့ခ်ျမျာသကို ဖန်တီသပေသကာ Spark Streaming သည် သတ်မဟတ်ထာသသော အချိန်ကဌာသကာလတလင် အဆိုပါ ပက်ကတ်မျာသကို လုပ်ဆောင်သည်။

အပလီကေသရဟင်သ၏ အမဟာသခံနိုင်ရည်ကို တိုသမဌဟင့်ရန်အတလက် စစ်ဆေသရေသဂိတ်မျာသကို အသုံသပဌုပါမည်။ ကယန္တရာသဖဌင့် Spark Streaming အင်ဂျင်သည် ပျောက်ဆုံသသလာသသောဒေတာကို ပဌန်လည်ရယူရန် လိုအပ်သောအခါ၊ ၎င်သသည် နောက်ဆုံသစစ်ဆေသရေသဂိတ်သို့ ပဌန်သလာသပဌီသ ထိုနေရာမဟ တလက်ချက်မဟုမျာသကို ပဌန်လည်လုပ်ဆောင်ရန်သာ လိုအပ်သည်။

ဗိသုကာစနစ် တီထလင်ခဲ့သည်။

Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

အသုံသပဌုထာသသော အစိတ်အပိုင်သမျာသ

  • Apache Kafka ဖဌန့်ဝေထာသသော ထုတ်ဝေ-စာရင်သသလင်သ စာတိုပေသပို့မဟုစနစ်တစ်ခုဖဌစ်သည်။ အော့ဖ်လိုင်သနဟင့် အလန်လိုင်သစာတို သုံသစလဲမဟု နဟစ်မျိုသလုံသအတလက် သင့်လျော်သည်။ ဒေတာဆုံသရဟုံသမဟုကို ကာကလယ်ရန်၊ Kafka မက်ဆေ့ချ်မျာသကို ဒစ်ခ်ပေါ်တလင် သိမ်သဆည်သပဌီသ အစုအဝေသအတလင်သ ထပ်တူပဌုပါသည်။ Kafka စနစ်သည် ZooKeeper ထပ်တူပဌုခဌင်သဝန်ဆောင်မဟု၏ထိပ်တလင်တည်ဆောက်ထာသသည်။
  • Apache Spark လလဟင့်ခဌင်သ။ - တိုက်ရိုက်ထုတ်လလဟင့်မဟုဒေတာကို လုပ်ဆောင်ရန်အတလက် Spark အစိတ်အပိုင်သ။ Spark Streaming module ကို ဒေတာစီသကဌောင်သအသေသစာသ ဒေတာပက်ကေ့ခ်ျမျာသ၏ စဉ်ဆက်မပဌတ် အစီအစဥ်အဖဌစ် အဓိပ္ပာယ်ဖလင့်ဆိုထာသသည့် micro-batch ဗိသုကာကို အသုံသပဌု၍ တည်ဆောက်ထာသသည်။ Spark Streaming သည် မတူညီသော အရင်သအမဌစ်မျာသမဟ ဒေတာမျာသကို ယူဆောင်ပဌီသ ၎င်သကို သေသငယ်သော ပက်ကေ့ခ်ျမျာသအဖဌစ် ပေါင်သစပ်ထာသသည်။ ပက်ကေ့ဂျ်အသစ်မျာသကို ပုံမဟန်အချိန်မျာသတလင် ဖန်တီသပါသည်။ အချိန်ကာလတစ်ခုစီ၏အစတလင်၊ ပက်ကေ့ခ်ျအသစ်တစ်ခုဖန်တီသပဌီသ ထိုကဌာသကာလအတလင်သရရဟိသောဒေတာအာသလုံသကို ပက်ကတ်တလင်ထည့်သလင်သထာသသည်။ ကဌာသကာလ၏အဆုံသတလင်၊ ပက်ကတ်ကဌီသထလာသမဟုရပ်တန့်သလာသသည်။ ကဌာသကာလ၏အရလယ်အစာသကို batch interval ဟုခေါ်သော အတိုင်သအတာတစ်ခုဖဌင့် ဆုံသဖဌတ်သည်။
  • Apache Spark SQL - Spark functional programming နဟင့် ဆက်စပ်ဆောင်ရလက်မဟုကို ပေါင်သစပ်ထာသသည်။ Structured data ဆိုသည်မဟာ မဟတ်တမ်သအာသလုံသအတလက် ကလက်လပ်တစ်ခုတည်သပါသော ဒေတာကို ဆိုလိုသည်။ Spark SQL သည် အမျိုသမျိုသသောဖလဲ့စည်သပုံဒေတာရင်သမဌစ်မျာသမဟ ထည့်သလင်သမဟုကို ပံ့ပိုသပေသပဌီသ schema အချက်အလက်ရရဟိမဟုကဌောင့်၊ လိုအပ်သော မဟတ်တမ်သမျာသ၏ အကလက်မျာသကိုသာ ထိရောက်စလာ ပဌန်လည်ရယူနိုင်ပဌီသ DataFrame APIs မျာသကိုလည်သ ပံ့ပိုသပေသပါသည်။
  • AWS RDS စျေသသက်သာသော cloud-based ဆက်စပ်ဒေတာဘေ့စ်၊ တပ်ဆင်မဟု၊ လည်ပတ်မဟုနဟင့် အတိုင်သအတာကို ရိုသရဟင်သလလယ်ကူစေသည့် ဝဘ်ဝန်ဆောင်မဟုဖဌစ်ပဌီသ Amazon မဟ တိုက်ရိုက်စီမံခန့်ခလဲပါသည်။

Kafka ဆာဗာကို ထည့်သလင်သခဌင်သနဟင့် လုပ်ဆောင်ခဌင်သ။

Kafka ကို တိုက်ရိုက် အသုံသမပဌုမီ၊ သင့်တလင် Java ရဟိရန် လိုအပ်သောကဌောင့် ... JVM ကို အလုပ်အတလက် သုံသသည်-

sudo apt-get update 
sudo apt-get install default-jre
java -version

Kafka နဟင့်အလုပ်လုပ်ရန်အသုံသပဌုသူအသစ်ကိုဖန်တီသကဌပါစို့။

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

ထို့နောက်၊ တရာသဝင် Apache Kafka ဝဘ်ဆိုက်မဟ ဖဌန့်ဖဌူသမဟုကို ဒေါင်သလုဒ်လုပ်ပါ။

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

ဒေါင်သလုဒ်လုပ်ထာသသော မဟတ်တမ်သကို ထုပ်ပိုသပါ-

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

နောက်တစ်ဆင့်က ရလေသချယ်ခလင့်ပါ။ အမဟန်မဟာ မူရင်သဆက်တင်မျာသသည် Apache Kafka ၏စလမ်သရည်အာသလုံသကို အပဌည့်အဝအသုံသမပဌုခဌင်သကဌောင့်ဖဌစ်သည်။ ဥပမာအာသဖဌင့်၊ ထုတ်ဝေနိုင်သည့် ခေါင်သစဉ်၊ အမျိုသအစာသ၊ အုပ်စုကို ဖျက်ပါ။ ၎င်သကိုပဌောင်သလဲရန်၊ ဖလဲ့စည်သမဟုပုံစံဖိုင်ကို တည်သဖဌတ်ကဌပါစို့။

vim ~/kafka/config/server.properties

ဖိုင်၏အဆုံသတလင် အောက်ပါတို့ကို ထည့်ပါ။

delete.topic.enable = true

Kafka ဆာဗာကို မစတင်မီ၊ သင်သည် ZooKeeper ဆာဗာကို စတင်ရန် လိုအပ်သည်၊ ကျလန်ုပ်တို့သည် Kafka ဖဌန့်ဝေမဟုနဟင့်အတူ ပါလာသော အရန် script ကို အသုံသပဌုပါမည်-

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper အောင်မဌင်စလာစတင်ပဌီသနောက်၊ သီသခဌာသ terminal တလင် Kafka ဆာဗာကိုဖလင့်ပါ-

bin/kafka-server-start.sh config/server.properties

Transaction ဟုခေါ်သော အကဌောင်သအရာအသစ်တစ်ခုကို ဖန်တီသကဌပါစို့။

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

လိုအပ်သော partitions နဟင့် replication အရေအတလက်မျာသပါရဟိသော ခေါင်သစဉ်တစ်ခုကို ဖန်တီသထာသကဌောင်သ သေချာပါစေ။

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

အသစ်ဖန်တီသထာသသော ခေါင်သစဉ်အတလက် ထုတ်လုပ်သူနဟင့် စာသသုံသသူကို စမ်သသပ်သည့် အခိုက်အတန့်မျာသကို လက်လလဟတ်လိုက်ကဌပါစို့။ မက်ဆေ့ချ်ပေသပို့ခဌင်သနဟင့် လက်ခံခဌင်သအာသ စမ်သသပ်နိုင်ပုံအကဌောင်သ အသေသစိတ်အချက်အလက်မျာသကို တရာသဝင်စာရလက်စာတမ်သတလင် ရေသသာသထာသသည်။ မက်ဆေ့ချ်အချို့ပို့ပါ။. ကောင်သပဌီ၊ ကျလန်ုပ်တို့သည် KafkaProducer API ကို အသုံသပဌု၍ Python တလင် ထုတ်လုပ်သူအာသ ရေသသာသခဌင်သသို့ ဆက်သလာသပါမည်။

ထုတ်လုပ်သူရေသသာသခဌင်သ။

ထုတ်လုပ်သူသည် ကျပန်သဒေတာ - စက္ကန့်တိုင်သ 100 မက်ဆေ့ဂျ်မျာသကို ထုတ်ပေသလိမ့်မည်။ ကျပန်သဒေတာအာသဖဌင့် ကျလန်ုပ်တို့သည် အကလက်သုံသခုပါဝင်သော အဘိဓာန်ကို ဆိုလိုသည်-

  • ဘဏ်ခလဲအ - ခရက်ဒစ်အဖလဲ့အစည်သ၏ရောင်သစျေသ၏အမည်;
  • ငလေကဌေသ - အရောင်သအဝယ်ငလေကဌေသ;
  • ပမာဏ - အရောင်သအဝယ်ပမာဏ။ ပမာဏသည် ဘဏ်မဟငလေကဌေသဝယ်ယူမဟုဖဌစ်ပါက အပဌုသဘောဆောင်သောနံပါတ်ဖဌစ်မည်ဖဌစ်ပဌီသ၊ ၎င်သသည် အရောင်သအ၀ယ်ဖဌစ်လျဟင် အနုတ်နံပါတ်ဖဌစ်သည်။

ထုတ်လုပ်သူအတလက် ကုဒ်သည် ကကဲ့သို့ ဖဌစ်သည်-

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

ထို့နောက်၊ ပေသပို့သည့်နည်သလမ်သကို အသုံသပဌု၍ ကျလန်ုပ်တို့သည် JSON ဖော်မတ်ဖဌင့် ကျလန်ုပ်တို့ လိုအပ်သည့် အကဌောင်သအရာဆီသို့ ဆာဗာထံသို့ မက်ဆေ့ချ်တစ်စောင် ပေသပို့ပါသည်။

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

script ကို run သောအခါ၊ terminal တလင်အောက်ပါစာတိုမျာသကိုကျလန်ုပ်တို့လက်ခံရရဟိသည်-

Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

ဆိုလိုသည်မဟာ အရာအာသလုံသသည် ကျလန်ုပ်တို့အလိုရဟိသည့်အတိုင်သ အလုပ်လုပ်သည် - ထုတ်လုပ်သူက ကျလန်ုပ်တို့လိုအပ်သည့်အကဌောင်သအရာဆီသို့ မက်ဆေ့ခ်ျမျာသထုတ်ပေသပဌီသ ပေသပို့ပါသည်။
နောက်တစ်ဆင့်မဟာ Spark ကို ထည့်သလင်သပဌီသ ကမက်ဆေ့ချ်စီသကဌောင်သကို လုပ်ဆောင်ရန်ဖဌစ်သည်။

Apache Spark ကို ထည့်သလင်သခဌင်သ။

Apache ကို Spark universal နဟင့် high-performance cluster computing platform တစ်ခုဖဌစ်သည်။

Spark သည် အပဌန်အလဟန်တုံ့ပဌန်မေသမဌန်သမဟုမျာသနဟင့် တိုက်ရိုက်ထုတ်လလဟင့်မဟုလုပ်ဆောင်ခဌင်သအပါအဝင် ပိုမိုကျယ်ပဌန့်သော တလက်ချက်မဟုအမျိုသအစာသမျာသကို ပံ့ပိုသပေသချိန်တလင် MapReduce မော်ဒယ်၏ လူကဌိုက်မျာသသော အကောင်အထည်ဖော်မဟုမျာသထက် ပိုမိုကောင်သမလန်စလာ လုပ်ဆောင်ပါသည်။ မဌန်နဟုန်သသည် မိနစ် သို့မဟုတ် နာရီမျာသစလာ စောင့်ဆိုင်သစရာမလိုဘဲ အပဌန်အလဟန်တုံ့ပဌန်လုပ်ဆောင်နိုင်စေသည့် အမဌန်နဟုန်သဖဌစ်သောကဌောင့် ဒေတာအမျာသအပဌာသကို လုပ်ဆောင်ရာတလင် အရေသကဌီသသောအခန်သကဏ္ဍမဟ ပါဝင်ပါသည်။ Spark ၏ အကဌီသမာသဆုံသသော အာသသာချက်တစ်ခုမဟာ ၎င်သကို လျင်မဌန်စေသည့် မန်မိုရီအတလင်သ တလက်ချက်မဟုမျာသကို လုပ်ဆောင်နိုင်ခဌင်သဖဌစ်သည်။

ကဘောင်ကို Scala ဖဌင့်ရေသသာသထာသသောကဌောင့် ၎င်သကို ညသစလာထည့်သလင်သရန် လိုအပ်သည်-

sudo apt-get install scala

Spark ဖဌန့်ချီခဌင်သကို တရာသဝင်ဝဘ်ဆိုဒ်မဟ ဒေါင်သလုဒ်လုပ်ပါ။

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

မဟတ်တမ်သကို ထုပ်ပိုသပါ-

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Spark သို့လမ်သကဌောင်သကို bash ဖိုင်သို့ထည့်ပါ-

vim ~/.bashrc

တည်သဖဌတ်သူမဟတဆင့် အောက်ပါစာကဌောင်သမျာသကို ထည့်ပါ။

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrc ကို ပဌောင်သလဲပဌီသနောက် အောက်ပါ command ကို run ပါ။

source ~/.bashrc

AWS PostgreSQL ကို အသုံသပဌုခဌင်သ။

ကျန်ရဟိနေသေသသည်မဟာ ကျလန်ုပ်တို့သည် ထုတ်လလဟင့်မဟုမျာသမဟ လုပ်ဆောင်ပဌီသသာသ အချက်အလက်မျာသကို အပ်လုဒ်လုပ်မည့် ဒေတာဘေ့စ်ကို ဖဌန့်ကျက်ရန်ဖဌစ်သည်။ ကအတလက် ကျလန်ုပ်တို့သည် AWS RDS ဝန်ဆောင်မဟုကို အသုံသပဌုပါမည်။

AWS ကလန်ဆိုသလ် -> AWS RDS -> ဒေတာဘေ့စ် -> ဒေတာဘေ့စ်ကို ဖန်တီသပါ-
Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

PostgreSQL ကိုရလေသပဌီသ Next ကိုနဟိပ်ပါ။
Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

ဘာဖဌစ်လို့လဲဆိုတော့ ကဥပမာသည် ပညာရေသဆိုင်ရာ ရည်ရလယ်ချက်အတလက်သာဖဌစ်သည်၊ ကျလန်ုပ်တို့သည် အခမဲ့ဆာဗာ “အနည်သဆုံသ” (အခမဲ့အဆင့်) ကို အသုံသပဌုပါမည်။
Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

ထို့နောက်၊ ကျလန်ုပ်တို့သည် Free Tier ဘလောက်တလင် အမဟန်ခဌစ်တစ်ခုထည့်ထာသပဌီသ၊ ထို့နောက်တလင် ကျလန်ုပ်တို့သည် t2.micro အတန်သအစာသ အာသနည်သသော်လည်သ ၎င်သသည် အခမဲ့ဖဌစ်ပဌီသ ကျလန်ုပ်တို့၏လုပ်ငန်သအတလက် အလလန်သင့်လျော်ပါသည်။
Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

နောက်တစ်ခုက အလလန်အရေသကဌီသတဲ့ အရာတလေ ဖဌစ်တဲ့ ဒေတာဘေ့စ် ဥပမာရဲ့ အမည်၊ မာစတာ အသုံသပဌုသူရဲ့ အမည်နဲ့ သူ့ရဲ့ စကာသဝဟက်။ ဥပမာ- myHabrTest၊ မာစတာအသုံသပဌုသူ- habrစကာသဝဟက်- habr12345 ထို့နောက် Next ခလုတ်ကို နဟိပ်ပါ။
Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

နောက်စာမျက်နဟာတလင် ကျလန်ုပ်တို့၏ဒေတာဘေ့စ်ဆာဗာ၏ ပဌင်ပမဟဝင်ရောက်နိုင်မဟု (အမျာသပဌည်သူသုံသနိုင်မဟု) နဟင့် ဆိပ်ကမ်သရရဟိနိုင်မဟုတို့အတလက် တာဝန်ရဟိသောဘောင်မျာသ ရဟိပါသည်။

Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

ဆိပ်ကမ်သ 5432 (PostgreSQL) မဟတစ်ဆင့် ကျလန်ုပ်တို့၏ဒေတာဘေ့စ်ဆာဗာသို့ ပဌင်ပသို့ဝင်ရောက်ခလင့်ပေသမည့် VPC လုံခဌုံရေသအဖလဲ့အတလက် ဆက်တင်အသစ်တစ်ခုကို ဖန်တီသကဌပါစို့။
VPC Dashboard -> Security Groups -> လုံခဌုံရေသအုပ်စုမျာသ ဖန်တီသခဌင်သကဏ္ဍသို့ သီသခဌာသဘရောက်ဆာဝင်သဒိုသမဟ AWS ကလန်ဆိုသလ်သို့ သလာသကဌပါစို့။
Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

လုံခဌုံရေသအဖလဲ့အတလက် အမည်ကို ကျလန်ုပ်တို့သတ်မဟတ်ထာသသည် - PostgreSQL၊ ဖော်ပဌချက်တစ်ခု၊ မည်သည့် VPC သည် ကအဖလဲ့နဟင့် ဆက်စပ်သင့်သည်ကို ညလဟန်ပဌပဌီသ ဖန်တီသရန် ခလုတ်ကို နဟိပ်ပါ-
Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

အောက်ပါပုံတလင်ပဌထာသသည့်အတိုင်သ အသစ်ဖန်တီသထာသသောအုပ်စုအတလက် port 5432 အတလက် Inbound စည်သမျဉ်သမျာသကို ဖဌည့်စလက်ပါ။ ဆိပ်ကမ်သကို သင်ကိုယ်တိုင် မသတ်မဟတ်နိုင်သော်လည်သ Type drop-down list မဟ PostgreSQL ကို ရလေသပါ။

အတိအကျပဌောရလျဟင် တန်ဖိုသ ::/0 ဆိုသည်မဟာ ကမ္ဘာတစ်ဝဟမ်သမဟ ဆာဗာသို့ အဝင်အထလက်လမ်သကဌောင်သကို ရရဟိနိုင်မဟုဟု ဆိုလိုသည်၊ ၎င်သသည် အဓိပ္ပါယ်အာသဖဌင့် လုံသဝမမဟန်ပါ၊ သို့သော် နမူနာကို ခလဲခဌမ်သစိတ်ဖဌာရန်၊ ကချဉ်သကပ်နည်သကို ကျလန်ုပ်တို့ အသုံသပဌုခလင့်ပဌုကဌပါစို့။
Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

ကျလန်ုပ်တို့တလင် "အဆင့်မဌင့်ဆက်တင်မျာသကိုပဌင်ဆင်ပါ" ကိုဖလင့်ပဌီသ VPC လုံခဌုံရေသအဖလဲ့မျာသကဏ္ဍတလင် ရလေသချယ်ထာသသော ဘရောက်ဆာစာမျက်နဟာသို့ ပဌန်သလာသရန် -> ရဟိပဌီသသာသ VPC လုံခဌုံရေသအဖလဲ့မျာသကို ရလေသချယ်ပါ -> PostgreSQL-
Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

ထို့နောက် Database options -> Database name -> အမည်ကို သတ်မဟတ်ပါ - habrDB.

အရန်သိမ်သခဌင်သ (အရန်သိမ်သထာသချိန် - 0 ရက်မျာသ)၊ စောင့်ကဌည့်ခဌင်သနဟင့် စလမ်သဆောင်ရည်ဆိုင်ရာ ထိုသထလင်သသိမဌင်မဟုမျာသကို ပုံမဟန်အာသဖဌင့် ပိတ်ခဌင်သမဟလလဲ၍ ကျန်ဘောင်မျာသကို ကျလန်ုပ်တို့ ချန်ထာသနိုင်ပါသည်။ ခလုတ်ကိုနဟိပ်ပါ။ ဒေတာဘေ့စ်ဖန်တီသပါ။:
Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

ကဌိုသကိုင်သူ

နောက်ဆုံသအဆင့်မဟာ Kafka မဟလာသော ဒေတာအသစ်မျာသကို နဟစ်စက္ကန့်တိုင်သ လုပ်ဆောင်ပဌီသ ရလဒ်ကို ဒေတာဘေ့စ်ထဲသို့ ထည့်သလင်သပေသမည့် Spark အလုပ်၏ နောက်ဆုံသအဆင့် ဖလံ့ဖဌိုသတိုသတက်လာမည်ဖဌစ်သည်။

အထက်တလင်ဖော်ပဌခဲ့သည့်အတိုင်သ၊ စစ်ဆေသရေသဂိတ်မျာသသည် အမဟာသအယလင်သခံနိုင်ရည်ရဟိစေရန်အတလက် ပဌင်ဆင်သတ်မဟတ်ရမည့် SparkStreaming တလင် အဓိကယန္တရာသတစ်ခုဖဌစ်သည်။ ကျလန်ုပ်တို့သည် စစ်ဆေသရေသဂိတ်မျာသကို အသုံသပဌုမည်ဖဌစ်ပဌီသ၊ လုပ်ထုံသလုပ်နည်သမအောင်မဌင်ပါက Spark Streaming module သည် နောက်ဆုံသစစ်ဆေသရေသဂိတ်သို့ ပဌန်သလာသရန်သာ လိုအပ်မည်ဖဌစ်ပဌီသ ပျောက်ဆုံသသလာသသောဒေတာကို ပဌန်လည်ရယူရန်အတလက် ၎င်သမဟ တလက်ချက်မဟုမျာသကို ပဌန်လည်လုပ်ဆောင်ရန် လိုအပ်မည်ဖဌစ်သည်။

စစ်ဆေသရေသဂိတ်အချက်အလက်ကို သိမ်သဆည်သထာသမည့် အမဟာသ-ခံနိုင်ရည်ရဟိသော၊ ယုံကဌည်စိတ်ချရသော ဖိုင်စနစ် (HDFS၊ S3 စသည်ဖဌင့်) တလင် လမ်သကဌောင်သတစ်ခု သတ်မဟတ်ခဌင်သဖဌင့် စစ်ဆေသခဌင်သကို ဖလင့်နိုင်သည်။ ဥပမာအာသဖဌင့်၊

streamingContext.checkpoint(checkpointDirectory)

ကျလန်ုပ်တို့၏ဥပမာတလင်၊ ကျလန်ုပ်တို့သည် အောက်ပါချဉ်သကပ်နည်သကို အသုံသပဌုမည်ဖဌစ်ပဌီသ၊ အတိအကျပဌောရလျဟင် checkpointDirectory ရဟိပါက၊ ထို့နောက် အကဌောင်သအရာကို စစ်ဆေသရေသဂိတ်ဒေတာမဟ ပဌန်လည်ဖန်တီသမည်ဖဌစ်သည်။ လမ်သညလဟန်ချက်မရဟိပါက (ဆိုလိုသည်မဟာ ပထမအကဌိမ်လုပ်ဆောင်ခဲ့သည်)၊ ထို့နောက် ဆက်စပ်အကဌောင်သအရာအသစ်တစ်ခုဖန်တီသရန်နဟင့် DStreams ကို configure လုပ်ရန် functionToCreateContext ကို ခေါ်သည်-

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

KafkaUtils စာကဌည့်တိုက်၏ createDirectStream နည်သလမ်သကို အသုံသပဌု၍ "အရောင်သအဝယ်" ခေါင်သစဉ်နဟင့် ချိတ်ဆက်ရန် DirectStream အရာဝတ္ထုကို ဖန်တီသသည်-

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

JSON ဖော်မတ်ဖဌင့် ဝင်လာသောဒေတာကို ပိုင်သခဌာသစိတ်ဖဌာခဌင်သ-

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL ကိုအသုံသပဌု၍ ကျလန်ုပ်တို့သည် ရိုသရဟင်သသောအုပ်စုဖလဲ့ကာ console တလင်ရလဒ်ကိုပဌသသည်-

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Query စာသာသကို ရယူပဌီသ Spark SQL မဟတဆင့် လုပ်ဆောင်ခဌင်သ-

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

ထို့နောက် ကျလန်ုပ်တို့သည် AWS RDS ရဟိ ဇယာသတစ်ခုတလင် ရရဟိလာသော စုစည်သထာသသောဒေတာကို သိမ်သဆည်သပါသည်။ စုစည်သမဟုရလဒ်မျာသကို ဒေတာဘေ့စ်ဇယာသတစ်ခုတလင် သိမ်သဆည်သရန်၊ DataFrame အရာဝတ္ထု၏ ရေသနည်သကို အသုံသပဌုပါမည်။

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS သို့ ချိတ်ဆက်မဟု စနစ်ထည့်သလင်သခဌင်သဆိုင်ရာ စကာသလုံသအချို့။ “AWS PostgreSQL ကို Deploying” အဆင့်တလင် ၎င်သအတလက် အသုံသပဌုသူနဟင့် စကာသဝဟက်ကို ဖန်တီသခဲ့သည်။ ချိတ်ဆက်မဟုနဟင့် လုံခဌုံရေသကဏ္ဍတလင် ပဌသထာသသည့် ဒေတာဘေ့စ်ဆာဗာ url အဖဌစ် Endpoint ကို သင်အသုံသပဌုသင့်သည်-

Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

Spark နဟင့် Kafka ကို မဟန်ကန်စလာ ချိတ်ဆက်နိုင်ရန်၊ artifact ကို အသုံသပဌု၍ smark-submit မဟတစ်ဆင့် အလုပ်ကို လုပ်ဆောင်သင့်သည် spark-streaming-kafka-0-8_2.11. ထို့အပဌင်၊ ကျလန်ုပ်တို့သည် PostgreSQL ဒေတာဘေ့စ်နဟင့် အပဌန်အလဟန်တုံ့ပဌန်ရန်အတလက် ရဟေသဟောင်သပစ္စည်သတစ်ခုကိုလည်သ အသုံသပဌုမည်ဖဌစ်သည်၊ ၎င်သတို့ကို --packages မဟတစ်ဆင့် လလဟဲပဌောင်သပေသမည်ဖဌစ်သည်။

Script ၏ ပဌောင်သလလယ်ပဌင်လလယ်မဟုအတလက်၊ ကျလန်ုပ်တို့သည် မက်ဆေ့ချ်ဆာဗာ၏ အမည်နဟင့် ဒေတာလက်ခံရယူလိုသည့် အကဌောင်သအရာကို ထည့်သလင်သသည့် ကန့်သတ်ချက်မျာသအဖဌစ် ပါ၀င်မည်ဖဌစ်သည်။

ထို့ကဌောင့်၊ စနစ်၏လုပ်ဆောင်နိုင်စလမ်သကို စတင်စစ်ဆေသရန် အချိန်ရောက်ပဌီ-

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

အာသလုံသအဆင်ပဌေသလာသပါပဌီ။ အောက်ဖော်ပဌပါပုံတလင် သင်မဌင်ရသည့်အတိုင်သ၊ အပလီကေသရဟင်သကိုလည်ပတ်နေချိန်တလင်၊ ကျလန်ုပ်တို့သည် StreamingContext အရာဝတ္ထုကိုဖန်တီသသောအခါတလင် ကျလန်ုပ်တို့သည် အတလဲလိုက်ကဌာသကာလကို 2 စက္ကန့်အဖဌစ်သတ်မဟတ်ထာသသောကဌောင့် ပေါင်သစည်သမဟုရလဒ်အသစ်မျာသ 2 စက္ကန့်တိုင်သထလက်ရဟိလာပါသည်။

Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

ထို့နောက်၊ ကျလန်ုပ်တို့သည် ဇယာသရဟိ မဟတ်တမ်သမျာသ ရဟိနေခဌင်သကို စစ်ဆေသရန် ဒေတာဘေ့စ်သို့ ရိုသရဟင်သသော စုံစမ်သမဟုတစ်ခု ပဌုလုပ်သည်။ ငလေပေသငလေယူ_စီသဆင်သမဟု:

Apache Kafka နဟင့် Spark Streaming ဖဌင့် လလဟင့်ထုတ်သည့် ဒေတာကို လုပ်ဆောင်ခဌင်သ။

ကောက်ချက်

ကဆောင်သပါသသည် Apache Kafka နဟင့် PostgreSQL တို့နဟင့် တလဲဖက် Spark Streaming ကို အသုံသပဌု၍ သတင်သအချက်အလက် တိုက်ရိုက်ထုတ်လလဟင့်ခဌင်သ၏ နမူနာကို လေ့လာခဲ့သည်။ ရင်သမဌစ်အမျိုသမျိုသမဟ ဒေတာမျာသ တိုသပလာသလာသည်နဟင့်အမျဟ streaming နဟင့် real-time applications မျာသဖန်တီသရန်အတလက် Spark Streaming ၏ လက်တလေ့ကျသောတန်ဖိုသကို ကျော်လလန်ခန့်မဟန်သရန် ခက်ခဲသည်။

ကျလန်ုပ်၏ repository တလင် အရင်သအမဌစ်ကုဒ်အပဌည့်အစုံကို သင်ရဟာဖလေနိုင်ပါသည်။ GitHub.

ဒီဆောင်သပါသကို ဆလေသနလေသရတာ ဝမ်သသာပါတယ်၊ မင်သရဲ့ မဟတ်ချက်တလေကို စောင့်မျဟော်နေပဌီသ ဂရုစိုက်တတ်တဲ့ စာဖတ်သူအာသလုံသရဲ့ အပဌုသဘောဆောင်တဲ့ ဝေဖန်မဟုတလေကို မျဟော်လင့်ပါတယ်။

အောင်မဌင်ပါစေလို့ ဆုတောင်သပါတယ်။

ဆာ အစပိုင်သမဟာတော့ ဒေသတလင်သ PostgreSQL ဒေတာဘေ့စ်ကို အသုံသပဌုဖို့ စီစဉ်ခဲ့ပေမယ့် AWS ကို ချစ်မဌတ်နိုသတာကဌောင့် ဒေတာဘေ့စ်ကို cloud သို့ ရလဟေ့ဖို့ ဆုံသဖဌတ်ခဲ့ပါတယ်။ ကအကဌောင်သအရာအတလက် နောက်ဆောင်သပါသတလင်၊ AWS Kinesis နဟင့် AWS EMR ကိုအသုံသပဌု၍ အထက်တလင်ဖော်ပဌထာသသော AWS တလင်ဖော်ပဌထာသသော စနစ်တစ်ခုလုံသကို မည်သို့အကောင်အထည်ဖော်ရမည်ကို ကျလန်ုပ်ပဌသပါမည်။ သတင်သကို လိုက်နာပါ။

source: www.habr.com

မဟတ်ချက် Add