Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

สวัสดีฮับ! วันนี้เราจะสร้างระบบที่จะประมวลผลสตรีมข้อความ Apache Kafka โดยใช้ Spark Streaming และเขียนผลการประมวลผลไปยังฐานข้อมูลระบบคลาวด์ AWS RDS

ลองจินตนาการว่าสถาบันสินเชื่อบางแห่งกำหนดให้เรามีหน้าที่ในการประมวลผลธุรกรรมที่เข้ามา "ทันที" ในทุกสาขา ซึ่งสามารถทำได้เพื่อวัตถุประสงค์ในการคำนวณตำแหน่งสกุลเงินเปิดสำหรับคลัง ขีดจำกัด หรือผลลัพธ์ทางการเงินสำหรับธุรกรรม ฯลฯ ทันที

วิธีใช้กรณีนี้โดยไม่ต้องใช้เวทย์มนตร์และคาถา - อ่านให้ละเอียด! ไป!

Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming
(แหล่งรูปภาพ)

การแนะนำ

แน่นอนว่าการประมวลผลข้อมูลจำนวนมากแบบเรียลไทม์ให้โอกาสมากมายสำหรับใช้ในระบบสมัยใหม่ หนึ่งในการผสมผสานที่ได้รับความนิยมมากที่สุดสำหรับสิ่งนี้คือการควบคู่ของ Apache Kafka และ Spark Streaming โดยที่ Kafka สร้างสตรีมของแพ็กเก็ตข้อความขาเข้า และ Spark Streaming ประมวลผลแพ็กเก็ตเหล่านี้ในช่วงเวลาที่กำหนด

เพื่อเพิ่มความทนทานต่อข้อผิดพลาดของแอปพลิเคชัน เราจะใช้จุดตรวจสอบ ด้วยกลไกนี้ เมื่อเอ็นจิ้น Spark Streaming ต้องการกู้คืนข้อมูลที่สูญหาย ก็เพียงแค่ต้องกลับไปที่จุดตรวจสอบสุดท้ายและดำเนินการคำนวณต่อจากจุดนั้น

สถาปัตยกรรมของระบบที่พัฒนาแล้ว

Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

ส่วนประกอบที่ใช้:

  • Apache Kafka เป็นระบบส่งข้อความเผยแพร่และสมัครสมาชิกแบบกระจาย เหมาะสำหรับการใช้ข้อความทั้งออฟไลน์และออนไลน์ เพื่อป้องกันข้อมูลสูญหาย ข้อความ Kafka จะถูกจัดเก็บไว้ในดิสก์และจำลองแบบภายในคลัสเตอร์ ระบบ Kafka สร้างขึ้นจากบริการซิงโครไนซ์ ZooKeeper;
  • การสตรีม Apache Spark - องค์ประกอบ Spark สำหรับการประมวลผลข้อมูลสตรีมมิ่ง โมดูล Spark Streaming สร้างขึ้นโดยใช้สถาปัตยกรรมไมโครแบทช์ โดยที่สตรีมข้อมูลจะถูกตีความว่าเป็นลำดับต่อเนื่องของแพ็กเก็ตข้อมูลขนาดเล็ก Spark Streaming นำข้อมูลจากแหล่งต่างๆ และรวมเป็นแพ็คเกจขนาดเล็ก แพ็คเกจใหม่จะถูกสร้างขึ้นตามช่วงเวลาปกติ เมื่อเริ่มต้นแต่ละช่วงเวลา จะมีการสร้างแพ็กเก็ตใหม่ และข้อมูลใดๆ ที่ได้รับระหว่างช่วงเวลานั้นจะรวมอยู่ในแพ็กเก็ต เมื่อสิ้นสุดช่วงเวลา การเติบโตของแพ็กเก็ตจะหยุดลง ขนาดของช่วงเวลาถูกกำหนดโดยพารามิเตอร์ที่เรียกว่าช่วงแบทช์
  • อาปาเช่ สปาร์ค SQL - รวมการประมวลผลเชิงสัมพันธ์เข้ากับการเขียนโปรแกรมฟังก์ชัน Spark ข้อมูลที่มีโครงสร้างหมายถึงข้อมูลที่มีสคีมา ซึ่งก็คือชุดช่องเดียวสำหรับระเบียนทั้งหมด Spark SQL รองรับอินพุตจากแหล่งข้อมูลที่มีโครงสร้างที่หลากหลาย และด้วยความพร้อมใช้งานของข้อมูลสคีมา ทำให้สามารถดึงข้อมูลเฉพาะฟิลด์บันทึกที่จำเป็นได้อย่างมีประสิทธิภาพ และยังมี DataFrame API อีกด้วย
  • AWS RDS เป็นฐานข้อมูลเชิงสัมพันธ์บนคลาวด์ที่มีราคาไม่แพงนัก ซึ่งเป็นบริการบนเว็บที่ทำให้การตั้งค่า การดำเนินการ และการปรับขนาดง่ายขึ้น และบริหารจัดการโดย Amazon โดยตรง

การติดตั้งและใช้งานเซิร์ฟเวอร์ Kafka

ก่อนที่จะใช้ Kafka โดยตรง คุณต้องตรวจสอบให้แน่ใจว่าคุณมี Java เพราะ... JVM ใช้สำหรับการทำงาน:

sudo apt-get update 
sudo apt-get install default-jre
java -version

มาสร้างผู้ใช้ใหม่เพื่อทำงานกับ Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

จากนั้นดาวน์โหลดการแจกจ่ายจากเว็บไซต์ Apache Kafka อย่างเป็นทางการ:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

คลายไฟล์เก็บถาวรที่ดาวน์โหลด:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

ขั้นตอนต่อไปเป็นทางเลือก ความจริงก็คือการตั้งค่าเริ่มต้นไม่อนุญาตให้คุณใช้คุณสมบัติทั้งหมดของ Apache Kafka ได้อย่างเต็มที่ เช่น ลบหัวข้อ หมวดหมู่ กลุ่มที่สามารถเผยแพร่ข้อความได้ หากต้องการเปลี่ยนแปลง ให้แก้ไขไฟล์การกำหนดค่า:

vim ~/kafka/config/server.properties

เพิ่มสิ่งต่อไปนี้ที่ส่วนท้ายของไฟล์:

delete.topic.enable = true

ก่อนที่จะเริ่มเซิร์ฟเวอร์ Kafka คุณต้องเริ่มเซิร์ฟเวอร์ ZooKeeper เราจะใช้สคริปต์เสริมที่มาพร้อมกับการกระจาย Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

หลังจากที่ ZooKeeper เริ่มต้นได้สำเร็จ ให้เปิดเซิร์ฟเวอร์ Kafka ในเทอร์มินัลอื่น:

bin/kafka-server-start.sh config/server.properties

มาสร้างหัวข้อใหม่ที่เรียกว่า Transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

ตรวจสอบให้แน่ใจว่าได้สร้างหัวข้อที่มีพาร์ติชันและการจำลองตามจำนวนที่ต้องการแล้ว:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

พลาดช่วงเวลาแห่งการทดสอบผู้ผลิตและผู้บริโภคสำหรับหัวข้อที่สร้างขึ้นใหม่ รายละเอียดเพิ่มเติมเกี่ยวกับวิธีทดสอบการส่งและรับข้อความระบุไว้ในเอกสารอย่างเป็นทางการ - ส่งข้อความบ้าง. เรามาต่อกันที่การเขียนโปรดิวเซอร์ใน Python โดยใช้ KafkaProducer API

การเขียนของผู้ผลิต

ผู้ผลิตจะสร้างข้อมูลแบบสุ่ม - 100 ข้อความทุกวินาที โดยการสุ่มข้อมูลเราหมายถึงพจนานุกรมที่ประกอบด้วยสามช่อง:

  • สาขา — ชื่อจุดขายของสถาบันสินเชื่อ
  • เงินตรา — สกุลเงินของการทำธุรกรรม
  • ราคา — จำนวนธุรกรรม จำนวนเงินจะเป็นจำนวนบวกหากเป็นการซื้อสกุลเงินโดยธนาคาร และเป็นจำนวนลบหากเป็นการขาย

รหัสสำหรับผู้ผลิตมีลักษณะดังนี้:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

ต่อไปโดยใช้วิธีการส่ง เราจะส่งข้อความไปยังเซิร์ฟเวอร์ไปยังหัวข้อที่เราต้องการ ในรูปแบบ JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

เมื่อรันสคริปต์ เราได้รับข้อความต่อไปนี้ในเทอร์มินัล:

Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

ซึ่งหมายความว่าทุกอย่างทำงานได้ตามที่เราต้องการ - ผู้ผลิตสร้างและส่งข้อความไปยังหัวข้อที่เราต้องการ
ขั้นตอนต่อไปคือการติดตั้ง Spark และประมวลผลสตรีมข้อความนี้

การติดตั้ง Apache Spark

Apache Spark เป็นแพลตฟอร์มการประมวลผลคลัสเตอร์ที่เป็นสากลและมีประสิทธิภาพสูง

Spark ทำงานได้ดีกว่าการใช้งานโมเดล MapReduce ยอดนิยม ในขณะที่รองรับประเภทการคำนวณที่หลากหลาย รวมถึงการสืบค้นเชิงโต้ตอบและการประมวลผลสตรีม ความเร็วมีบทบาทสำคัญในการประมวลผลข้อมูลจำนวนมาก เนื่องจากเป็นความเร็วที่ช่วยให้คุณทำงานแบบโต้ตอบได้โดยไม่ต้องเสียเวลารอเป็นนาทีหรือเป็นชั่วโมง จุดแข็งที่ใหญ่ที่สุดประการหนึ่งของ Spark ที่ทำให้มันเร็วมากคือความสามารถในการคำนวณในหน่วยความจำ

เฟรมเวิร์กนี้เขียนด้วยภาษา Scala ดังนั้นคุณต้องติดตั้งก่อน:

sudo apt-get install scala

ดาวน์โหลดการแจกจ่าย Spark จากเว็บไซต์อย่างเป็นทางการ:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

แกะไฟล์เก็บถาวร:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

เพิ่มเส้นทางไปยัง Spark ไปยังไฟล์ bash:

vim ~/.bashrc

เพิ่มบรรทัดต่อไปนี้ผ่านตัวแก้ไข:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

รันคำสั่งด้านล่างหลังจากทำการเปลี่ยนแปลง bashrc:

source ~/.bashrc

การปรับใช้ AWS PostgreSQL

สิ่งที่เหลืออยู่คือการปรับใช้ฐานข้อมูลซึ่งเราจะอัปโหลดข้อมูลที่ประมวลผลจากสตรีม สำหรับสิ่งนี้ เราจะใช้บริการ AWS RDS

ไปที่คอนโซล AWS -> AWS RDS -> ฐานข้อมูล -> สร้างฐานข้อมูล:
Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

เลือก PostgreSQL แล้วคลิกถัดไป:
Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

เพราะ ตัวอย่างนี้มีวัตถุประสงค์เพื่อการศึกษาเท่านั้น เราจะใช้เซิร์ฟเวอร์ฟรี "ขั้นต่ำ" (Free Tier):
Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

ต่อไป เราทำเครื่องหมายในบล็อก Free Tier และหลังจากนั้นเราจะได้รับอินสแตนซ์ของคลาส t2.micro โดยอัตโนมัติ - แม้ว่าจะอ่อนแอ แต่ก็ฟรีและค่อนข้างเหมาะสมกับงานของเรา:
Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

ถัดมาคือสิ่งที่สำคัญมาก: ชื่อของอินสแตนซ์ฐานข้อมูล ชื่อผู้ใช้หลัก และรหัสผ่านของเขา ตั้งชื่ออินสแตนซ์: myHabrTest ผู้ใช้หลัก: แฮบ, รหัสผ่าน: ฮาเบอร์12345 และคลิกที่ปุ่มถัดไป:
Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

ในหน้าถัดไป มีพารามิเตอร์ที่รับผิดชอบในการเข้าถึงเซิร์ฟเวอร์ฐานข้อมูลของเราจากภายนอก (การเข้าถึงแบบสาธารณะ) และความพร้อมใช้งานของพอร์ต:

Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

มาสร้างการตั้งค่าใหม่สำหรับกลุ่มความปลอดภัย VPC ซึ่งจะอนุญาตให้ภายนอกเข้าถึงเซิร์ฟเวอร์ฐานข้อมูลของเราผ่านพอร์ต 5432 (PostgreSQL)
ไปที่คอนโซล AWS ในหน้าต่างเบราว์เซอร์แยกต่างหากไปที่ VPC Dashboard -> กลุ่มความปลอดภัย -> สร้างกลุ่มความปลอดภัย:
Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

เราตั้งชื่อให้กับกลุ่มความปลอดภัย - PostgreSQL ซึ่งเป็นคำอธิบายระบุว่า VPC ใดที่กลุ่มนี้ควรเชื่อมโยงกับและคลิกปุ่มสร้าง:
Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

กรอกกฎขาเข้าสำหรับพอร์ต 5432 สำหรับกลุ่มที่สร้างขึ้นใหม่ ดังภาพด้านล่าง คุณไม่สามารถระบุพอร์ตด้วยตนเองได้ แต่เลือก PostgreSQL จากรายการแบบเลื่อนลงประเภท

พูดอย่างเคร่งครัด ค่า ::/0 หมายถึงความพร้อมใช้งานของการรับส่งข้อมูลขาเข้าไปยังเซิร์ฟเวอร์จากทั่วทุกมุมโลก ซึ่งตามหลักบัญญัตินั้นไม่เป็นความจริงทั้งหมด แต่เพื่อวิเคราะห์ตัวอย่าง ให้เราอนุญาตให้เราใช้แนวทางนี้:
Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

เรากลับไปที่หน้าเบราว์เซอร์ โดยที่เราเปิด “กำหนดการตั้งค่าขั้นสูง” ไว้ และเลือกในส่วนกลุ่มความปลอดภัย VPC -> เลือกกลุ่มความปลอดภัย VPC ที่มีอยู่ -> PostgreSQL:
Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

ถัดไปในตัวเลือกฐานข้อมูล -> ชื่อฐานข้อมูล -> ตั้งชื่อ - ฮาเบอร์ดีบี.

เราสามารถคงพารามิเตอร์ที่เหลือไว้ได้ ยกเว้นการปิดใช้งานการสำรองข้อมูล (ระยะเวลาเก็บข้อมูลสำรอง - 0 วัน) การตรวจสอบ และข้อมูลเชิงลึกด้านประสิทธิภาพ ตามค่าเริ่มต้น คลิกที่ปุ่ม สร้างฐานข้อมูล:
Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

ตัวจัดการเธรด

ขั้นตอนสุดท้ายคือการพัฒนางาน Spark ซึ่งจะประมวลผลข้อมูลใหม่ที่มาจาก Kafka ทุก ๆ สองวินาที และป้อนผลลัพธ์ลงในฐานข้อมูล

ตามที่ระบุไว้ข้างต้น จุดตรวจสอบเป็นกลไกหลักใน SparkStreaming ที่ต้องกำหนดค่าเพื่อให้แน่ใจว่าทนทานต่อข้อผิดพลาด เราจะใช้จุดตรวจสอบ และหากขั้นตอนล้มเหลว โมดูล Spark Streaming จะต้องกลับไปยังจุดตรวจสอบสุดท้ายและดำเนินการคำนวณต่อจากนั้นเพื่อกู้คืนข้อมูลที่สูญหาย

จุดตรวจสอบสามารถเปิดใช้งานได้โดยการตั้งค่าไดเร็กทอรีบนระบบไฟล์ที่ทนทานต่อข้อผิดพลาดและเชื่อถือได้ (เช่น HDFS, S3 ฯลฯ) ซึ่งข้อมูลจุดตรวจสอบจะถูกเก็บไว้ ทำได้โดยใช้ตัวอย่างเช่น:

streamingContext.checkpoint(checkpointDirectory)

ในตัวอย่างของเรา เราจะใช้วิธีการต่อไปนี้ กล่าวคือ หากมี checkpointDirectory อยู่แล้ว บริบทจะถูกสร้างขึ้นใหม่จากข้อมูลจุดตรวจสอบ หากไม่มีไดเร็กทอรี (เช่น ดำเนินการเป็นครั้งแรก) ดังนั้น functionToCreateContext จะถูกเรียกเพื่อสร้างบริบทใหม่และกำหนดค่า DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

เราสร้างออบเจ็กต์ DirectStream เพื่อเชื่อมต่อกับหัวข้อ “ธุรกรรม” โดยใช้วิธี createDirectStream ของไลบรารี KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

แยกวิเคราะห์ข้อมูลที่เข้ามาในรูปแบบ JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

เมื่อใช้ Spark SQL เราจะจัดกลุ่มอย่างง่ายและแสดงผลลัพธ์ในคอนโซล:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

รับข้อความค้นหาและเรียกใช้ผ่าน Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

จากนั้นเราจะบันทึกข้อมูลที่รวบรวมไว้ลงในตารางใน AWS RDS ในการบันทึกผลการรวมลงในตารางฐานข้อมูล เราจะใช้วิธีการเขียนของวัตถุ DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

คำไม่กี่คำเกี่ยวกับการตั้งค่าการเชื่อมต่อกับ AWS RDS เราสร้างผู้ใช้และรหัสผ่านในขั้นตอน “ปรับใช้ AWS PostgreSQL” คุณควรใช้ Endpoint เป็น URL เซิร์ฟเวอร์ฐานข้อมูล ซึ่งจะแสดงในส่วนการเชื่อมต่อและความปลอดภัย:

Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

เพื่อเชื่อมต่อ Spark และ Kafka ได้อย่างถูกต้อง คุณควรรันงานผ่าน smark-submit โดยใช้สิ่งประดิษฐ์ จุดประกายสตรีมมิ่ง-kafka-0-8_2.11. นอกจากนี้ เรายังใช้สิ่งประดิษฐ์สำหรับการโต้ตอบกับฐานข้อมูล PostgreSQL โดยเราจะถ่ายโอนข้อมูลเหล่านั้นผ่าน --packages

เพื่อความยืดหยุ่นของสคริปต์เราจะรวมชื่อของเซิร์ฟเวอร์ข้อความและหัวข้อที่เราต้องการรับข้อมูลเป็นพารามิเตอร์อินพุต

ถึงเวลาที่จะเปิดตัวและตรวจสอบการทำงานของระบบ:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

ทุกอย่างได้ผล! ดังที่คุณเห็นในภาพด้านล่าง ในขณะที่แอปพลิเคชันกำลังทำงาน ผลลัพธ์การรวมใหม่จะถูกส่งออกทุกๆ 2 วินาที เนื่องจากเราตั้งค่าช่วงเวลาแบทช์เป็น 2 วินาทีเมื่อเราสร้างออบเจ็กต์ StreamingContext:

Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

ต่อไป เราทำแบบสอบถามง่ายๆ ในฐานข้อมูลเพื่อตรวจสอบการมีอยู่ของบันทึกในตาราง ธุรกรรม_ไหล:

Apache Kafka และการประมวลผลข้อมูลแบบสตรีมมิ่งด้วย Spark Streaming

ข้อสรุป

บทความนี้กล่าวถึงตัวอย่างการประมวลผลสตรีมข้อมูลโดยใช้ Spark Streaming ร่วมกับ Apache Kafka และ PostgreSQL ด้วยการเติบโตของข้อมูลจากแหล่งต่างๆ จึงเป็นเรื่องยากที่จะประเมินค่าสูงไปในทางปฏิบัติของ Spark Streaming สำหรับการสร้างแอปพลิเคชันสตรีมมิ่งและเรียลไทม์

คุณสามารถค้นหาซอร์สโค้ดแบบเต็มได้ในพื้นที่เก็บข้อมูลของฉันที่ GitHub.

ฉันยินดีที่จะหารือเกี่ยวกับบทความนี้ ฉันหวังว่าจะได้รับความคิดเห็นของคุณ และฉันก็หวังว่าจะได้รับคำวิจารณ์ที่สร้างสรรค์จากผู้อ่านที่ห่วงใยทุกคน

ฉันขอให้คุณประสบความสำเร็จ!

ps ในตอนแรกมีแผนจะใช้ฐานข้อมูล PostgreSQL ในเครื่อง แต่เนื่องจากฉันชอบ AWS ฉันจึงตัดสินใจย้ายฐานข้อมูลไปยังคลาวด์ ในบทความถัดไปในหัวข้อนี้ ฉันจะแสดงวิธีใช้งานระบบทั้งหมดที่อธิบายไว้ข้างต้นใน AWS โดยใช้ AWS Kinesis และ AWS EMR ติดตามข่าว!

ที่มา: will.com

เพิ่มความคิดเห็น