สวัสดีฮับ! วันนี้เราจะสร้างระบบที่จะประมวลผลสตรีมข้อความ Apache Kafka โดยใช้ Spark Streaming และเขียนผลการประมวลผลไปยังฐานข้อมูลระบบคลาวด์ AWS RDS
ลองจินตนาการว่าสถาบันสินเชื่อบางแห่งกำหนดให้เรามีหน้าที่ในการประมวลผลธุรกรรมที่เข้ามา "ทันที" ในทุกสาขา ซึ่งสามารถทำได้เพื่อวัตถุประสงค์ในการคำนวณตำแหน่งสกุลเงินเปิดสำหรับคลัง ขีดจำกัด หรือผลลัพธ์ทางการเงินสำหรับธุรกรรม ฯลฯ ทันที
วิธีใช้กรณีนี้โดยไม่ต้องใช้เวทย์มนตร์และคาถา - อ่านให้ละเอียด! ไป!
การแนะนำ
แน่นอนว่าการประมวลผลข้อมูลจำนวนมากแบบเรียลไทม์ให้โอกาสมากมายสำหรับใช้ในระบบสมัยใหม่ หนึ่งในการผสมผสานที่ได้รับความนิยมมากที่สุดสำหรับสิ่งนี้คือการควบคู่ของ Apache Kafka และ Spark Streaming โดยที่ Kafka สร้างสตรีมของแพ็กเก็ตข้อความขาเข้า และ Spark Streaming ประมวลผลแพ็กเก็ตเหล่านี้ในช่วงเวลาที่กำหนด
เพื่อเพิ่มความทนทานต่อข้อผิดพลาดของแอปพลิเคชัน เราจะใช้จุดตรวจสอบ ด้วยกลไกนี้ เมื่อเอ็นจิ้น Spark Streaming ต้องการกู้คืนข้อมูลที่สูญหาย ก็เพียงแค่ต้องกลับไปที่จุดตรวจสอบสุดท้ายและดำเนินการคำนวณต่อจากจุดนั้น
สถาปัตยกรรมของระบบที่พัฒนาแล้ว
ส่วนประกอบที่ใช้:
Apache Kafka เป็นระบบส่งข้อความเผยแพร่และสมัครสมาชิกแบบกระจาย เหมาะสำหรับการใช้ข้อความทั้งออฟไลน์และออนไลน์ เพื่อป้องกันข้อมูลสูญหาย ข้อความ Kafka จะถูกจัดเก็บไว้ในดิสก์และจำลองแบบภายในคลัสเตอร์ ระบบ Kafka สร้างขึ้นจากบริการซิงโครไนซ์ ZooKeeper;การสตรีม Apache Spark - องค์ประกอบ Spark สำหรับการประมวลผลข้อมูลสตรีมมิ่ง โมดูล Spark Streaming สร้างขึ้นโดยใช้สถาปัตยกรรมไมโครแบทช์ โดยที่สตรีมข้อมูลจะถูกตีความว่าเป็นลำดับต่อเนื่องของแพ็กเก็ตข้อมูลขนาดเล็ก Spark Streaming นำข้อมูลจากแหล่งต่างๆ และรวมเป็นแพ็คเกจขนาดเล็ก แพ็คเกจใหม่จะถูกสร้างขึ้นตามช่วงเวลาปกติ เมื่อเริ่มต้นแต่ละช่วงเวลา จะมีการสร้างแพ็กเก็ตใหม่ และข้อมูลใดๆ ที่ได้รับระหว่างช่วงเวลานั้นจะรวมอยู่ในแพ็กเก็ต เมื่อสิ้นสุดช่วงเวลา การเติบโตของแพ็กเก็ตจะหยุดลง ขนาดของช่วงเวลาถูกกำหนดโดยพารามิเตอร์ที่เรียกว่าช่วงแบทช์อาปาเช่ สปาร์ค SQL - รวมการประมวลผลเชิงสัมพันธ์เข้ากับการเขียนโปรแกรมฟังก์ชัน Spark ข้อมูลที่มีโครงสร้างหมายถึงข้อมูลที่มีสคีมา ซึ่งก็คือชุดช่องเดียวสำหรับระเบียนทั้งหมด Spark SQL รองรับอินพุตจากแหล่งข้อมูลที่มีโครงสร้างที่หลากหลาย และด้วยความพร้อมใช้งานของข้อมูลสคีมา ทำให้สามารถดึงข้อมูลเฉพาะฟิลด์บันทึกที่จำเป็นได้อย่างมีประสิทธิภาพ และยังมี DataFrame API อีกด้วยAWS RDS เป็นฐานข้อมูลเชิงสัมพันธ์บนคลาวด์ที่มีราคาไม่แพงนัก ซึ่งเป็นบริการบนเว็บที่ทำให้การตั้งค่า การดำเนินการ และการปรับขนาดง่ายขึ้น และบริหารจัดการโดย Amazon โดยตรง
การติดตั้งและใช้งานเซิร์ฟเวอร์ Kafka
ก่อนที่จะใช้ Kafka โดยตรง คุณต้องตรวจสอบให้แน่ใจว่าคุณมี Java เพราะ... JVM ใช้สำหรับการทำงาน:
sudo apt-get update
sudo apt-get install default-jre
java -version
มาสร้างผู้ใช้ใหม่เพื่อทำงานกับ Kafka:
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
จากนั้นดาวน์โหลดการแจกจ่ายจากเว็บไซต์ Apache Kafka อย่างเป็นทางการ:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
คลายไฟล์เก็บถาวรที่ดาวน์โหลด:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
ขั้นตอนต่อไปเป็นทางเลือก ความจริงก็คือการตั้งค่าเริ่มต้นไม่อนุญาตให้คุณใช้คุณสมบัติทั้งหมดของ Apache Kafka ได้อย่างเต็มที่ เช่น ลบหัวข้อ หมวดหมู่ กลุ่มที่สามารถเผยแพร่ข้อความได้ หากต้องการเปลี่ยนแปลง ให้แก้ไขไฟล์การกำหนดค่า:
vim ~/kafka/config/server.properties
เพิ่มสิ่งต่อไปนี้ที่ส่วนท้ายของไฟล์:
delete.topic.enable = true
ก่อนที่จะเริ่มเซิร์ฟเวอร์ Kafka คุณต้องเริ่มเซิร์ฟเวอร์ ZooKeeper เราจะใช้สคริปต์เสริมที่มาพร้อมกับการกระจาย Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
หลังจากที่ ZooKeeper เริ่มต้นได้สำเร็จ ให้เปิดเซิร์ฟเวอร์ Kafka ในเทอร์มินัลอื่น:
bin/kafka-server-start.sh config/server.properties
มาสร้างหัวข้อใหม่ที่เรียกว่า Transaction:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
ตรวจสอบให้แน่ใจว่าได้สร้างหัวข้อที่มีพาร์ติชันและการจำลองตามจำนวนที่ต้องการแล้ว:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
พลาดช่วงเวลาแห่งการทดสอบผู้ผลิตและผู้บริโภคสำหรับหัวข้อที่สร้างขึ้นใหม่ รายละเอียดเพิ่มเติมเกี่ยวกับวิธีทดสอบการส่งและรับข้อความระบุไว้ในเอกสารอย่างเป็นทางการ -
การเขียนของผู้ผลิต
ผู้ผลิตจะสร้างข้อมูลแบบสุ่ม - 100 ข้อความทุกวินาที โดยการสุ่มข้อมูลเราหมายถึงพจนานุกรมที่ประกอบด้วยสามช่อง:
- สาขา — ชื่อจุดขายของสถาบันสินเชื่อ
- เงินตรา — สกุลเงินของการทำธุรกรรม
- ราคา — จำนวนธุรกรรม จำนวนเงินจะเป็นจำนวนบวกหากเป็นการซื้อสกุลเงินโดยธนาคาร และเป็นจำนวนลบหากเป็นการขาย
รหัสสำหรับผู้ผลิตมีลักษณะดังนี้:
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
ต่อไปโดยใช้วิธีการส่ง เราจะส่งข้อความไปยังเซิร์ฟเวอร์ไปยังหัวข้อที่เราต้องการ ในรูปแบบ JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
เมื่อรันสคริปต์ เราได้รับข้อความต่อไปนี้ในเทอร์มินัล:
ซึ่งหมายความว่าทุกอย่างทำงานได้ตามที่เราต้องการ - ผู้ผลิตสร้างและส่งข้อความไปยังหัวข้อที่เราต้องการ
ขั้นตอนต่อไปคือการติดตั้ง Spark และประมวลผลสตรีมข้อความนี้
การติดตั้ง Apache Spark
Apache Spark เป็นแพลตฟอร์มการประมวลผลคลัสเตอร์ที่เป็นสากลและมีประสิทธิภาพสูง
Spark ทำงานได้ดีกว่าการใช้งานโมเดล MapReduce ยอดนิยม ในขณะที่รองรับประเภทการคำนวณที่หลากหลาย รวมถึงการสืบค้นเชิงโต้ตอบและการประมวลผลสตรีม ความเร็วมีบทบาทสำคัญในการประมวลผลข้อมูลจำนวนมาก เนื่องจากเป็นความเร็วที่ช่วยให้คุณทำงานแบบโต้ตอบได้โดยไม่ต้องเสียเวลารอเป็นนาทีหรือเป็นชั่วโมง จุดแข็งที่ใหญ่ที่สุดประการหนึ่งของ Spark ที่ทำให้มันเร็วมากคือความสามารถในการคำนวณในหน่วยความจำ
เฟรมเวิร์กนี้เขียนด้วยภาษา Scala ดังนั้นคุณต้องติดตั้งก่อน:
sudo apt-get install scala
ดาวน์โหลดการแจกจ่าย Spark จากเว็บไซต์อย่างเป็นทางการ:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
แกะไฟล์เก็บถาวร:
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
เพิ่มเส้นทางไปยัง Spark ไปยังไฟล์ bash:
vim ~/.bashrc
เพิ่มบรรทัดต่อไปนี้ผ่านตัวแก้ไข:
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
รันคำสั่งด้านล่างหลังจากทำการเปลี่ยนแปลง bashrc:
source ~/.bashrc
การปรับใช้ AWS PostgreSQL
สิ่งที่เหลืออยู่คือการปรับใช้ฐานข้อมูลซึ่งเราจะอัปโหลดข้อมูลที่ประมวลผลจากสตรีม สำหรับสิ่งนี้ เราจะใช้บริการ AWS RDS
ไปที่คอนโซล AWS -> AWS RDS -> ฐานข้อมูล -> สร้างฐานข้อมูล:
เลือก PostgreSQL แล้วคลิกถัดไป:
เพราะ ตัวอย่างนี้มีวัตถุประสงค์เพื่อการศึกษาเท่านั้น เราจะใช้เซิร์ฟเวอร์ฟรี "ขั้นต่ำ" (Free Tier):
ต่อไป เราทำเครื่องหมายในบล็อก Free Tier และหลังจากนั้นเราจะได้รับอินสแตนซ์ของคลาส t2.micro โดยอัตโนมัติ - แม้ว่าจะอ่อนแอ แต่ก็ฟรีและค่อนข้างเหมาะสมกับงานของเรา:
ถัดมาคือสิ่งที่สำคัญมาก: ชื่อของอินสแตนซ์ฐานข้อมูล ชื่อผู้ใช้หลัก และรหัสผ่านของเขา ตั้งชื่ออินสแตนซ์: myHabrTest ผู้ใช้หลัก: แฮบ, รหัสผ่าน: ฮาเบอร์12345 และคลิกที่ปุ่มถัดไป:
ในหน้าถัดไป มีพารามิเตอร์ที่รับผิดชอบในการเข้าถึงเซิร์ฟเวอร์ฐานข้อมูลของเราจากภายนอก (การเข้าถึงแบบสาธารณะ) และความพร้อมใช้งานของพอร์ต:
มาสร้างการตั้งค่าใหม่สำหรับกลุ่มความปลอดภัย VPC ซึ่งจะอนุญาตให้ภายนอกเข้าถึงเซิร์ฟเวอร์ฐานข้อมูลของเราผ่านพอร์ต 5432 (PostgreSQL)
ไปที่คอนโซล AWS ในหน้าต่างเบราว์เซอร์แยกต่างหากไปที่ VPC Dashboard -> กลุ่มความปลอดภัย -> สร้างกลุ่มความปลอดภัย:
เราตั้งชื่อให้กับกลุ่มความปลอดภัย - PostgreSQL ซึ่งเป็นคำอธิบายระบุว่า VPC ใดที่กลุ่มนี้ควรเชื่อมโยงกับและคลิกปุ่มสร้าง:
กรอกกฎขาเข้าสำหรับพอร์ต 5432 สำหรับกลุ่มที่สร้างขึ้นใหม่ ดังภาพด้านล่าง คุณไม่สามารถระบุพอร์ตด้วยตนเองได้ แต่เลือก PostgreSQL จากรายการแบบเลื่อนลงประเภท
พูดอย่างเคร่งครัด ค่า ::/0 หมายถึงความพร้อมใช้งานของการรับส่งข้อมูลขาเข้าไปยังเซิร์ฟเวอร์จากทั่วทุกมุมโลก ซึ่งตามหลักบัญญัตินั้นไม่เป็นความจริงทั้งหมด แต่เพื่อวิเคราะห์ตัวอย่าง ให้เราอนุญาตให้เราใช้แนวทางนี้:
เรากลับไปที่หน้าเบราว์เซอร์ โดยที่เราเปิด “กำหนดการตั้งค่าขั้นสูง” ไว้ และเลือกในส่วนกลุ่มความปลอดภัย VPC -> เลือกกลุ่มความปลอดภัย VPC ที่มีอยู่ -> PostgreSQL:
ถัดไปในตัวเลือกฐานข้อมูล -> ชื่อฐานข้อมูล -> ตั้งชื่อ - ฮาเบอร์ดีบี.
เราสามารถคงพารามิเตอร์ที่เหลือไว้ได้ ยกเว้นการปิดใช้งานการสำรองข้อมูล (ระยะเวลาเก็บข้อมูลสำรอง - 0 วัน) การตรวจสอบ และข้อมูลเชิงลึกด้านประสิทธิภาพ ตามค่าเริ่มต้น คลิกที่ปุ่ม สร้างฐานข้อมูล:
ตัวจัดการเธรด
ขั้นตอนสุดท้ายคือการพัฒนางาน Spark ซึ่งจะประมวลผลข้อมูลใหม่ที่มาจาก Kafka ทุก ๆ สองวินาที และป้อนผลลัพธ์ลงในฐานข้อมูล
ตามที่ระบุไว้ข้างต้น จุดตรวจสอบเป็นกลไกหลักใน SparkStreaming ที่ต้องกำหนดค่าเพื่อให้แน่ใจว่าทนทานต่อข้อผิดพลาด เราจะใช้จุดตรวจสอบ และหากขั้นตอนล้มเหลว โมดูล Spark Streaming จะต้องกลับไปยังจุดตรวจสอบสุดท้ายและดำเนินการคำนวณต่อจากนั้นเพื่อกู้คืนข้อมูลที่สูญหาย
จุดตรวจสอบสามารถเปิดใช้งานได้โดยการตั้งค่าไดเร็กทอรีบนระบบไฟล์ที่ทนทานต่อข้อผิดพลาดและเชื่อถือได้ (เช่น HDFS, S3 ฯลฯ) ซึ่งข้อมูลจุดตรวจสอบจะถูกเก็บไว้ ทำได้โดยใช้ตัวอย่างเช่น:
streamingContext.checkpoint(checkpointDirectory)
ในตัวอย่างของเรา เราจะใช้วิธีการต่อไปนี้ กล่าวคือ หากมี checkpointDirectory อยู่แล้ว บริบทจะถูกสร้างขึ้นใหม่จากข้อมูลจุดตรวจสอบ หากไม่มีไดเร็กทอรี (เช่น ดำเนินการเป็นครั้งแรก) ดังนั้น functionToCreateContext จะถูกเรียกเพื่อสร้างบริบทใหม่และกำหนดค่า DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
เราสร้างออบเจ็กต์ DirectStream เพื่อเชื่อมต่อกับหัวข้อ “ธุรกรรม” โดยใช้วิธี createDirectStream ของไลบรารี KafkaUtils:
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
broker_list = 'localhost:9092'
topic = 'transaction'
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": broker_list})
แยกวิเคราะห์ข้อมูลที่เข้ามาในรูปแบบ JSON:
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
เมื่อใช้ Spark SQL เราจะจัดกลุ่มอย่างง่ายและแสดงผลลัพธ์ในคอนโซล:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
รับข้อความค้นหาและเรียกใช้ผ่าน Spark SQL:
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
จากนั้นเราจะบันทึกข้อมูลที่รวบรวมไว้ลงในตารางใน AWS RDS ในการบันทึกผลการรวมลงในตารางฐานข้อมูล เราจะใช้วิธีการเขียนของวัตถุ DataFrame:
testResultDataFrame.write
.format("jdbc")
.mode("append")
.option("driver", 'org.postgresql.Driver')
.option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
.option("dbtable", "transaction_flow")
.option("user", "habr")
.option("password", "habr12345")
.save()
คำไม่กี่คำเกี่ยวกับการตั้งค่าการเชื่อมต่อกับ AWS RDS เราสร้างผู้ใช้และรหัสผ่านในขั้นตอน “ปรับใช้ AWS PostgreSQL” คุณควรใช้ Endpoint เป็น URL เซิร์ฟเวอร์ฐานข้อมูล ซึ่งจะแสดงในส่วนการเชื่อมต่อและความปลอดภัย:
เพื่อเชื่อมต่อ Spark และ Kafka ได้อย่างถูกต้อง คุณควรรันงานผ่าน smark-submit โดยใช้สิ่งประดิษฐ์ จุดประกายสตรีมมิ่ง-kafka-0-8_2.11. นอกจากนี้ เรายังใช้สิ่งประดิษฐ์สำหรับการโต้ตอบกับฐานข้อมูล PostgreSQL โดยเราจะถ่ายโอนข้อมูลเหล่านั้นผ่าน --packages
เพื่อความยืดหยุ่นของสคริปต์เราจะรวมชื่อของเซิร์ฟเวอร์ข้อความและหัวข้อที่เราต้องการรับข้อมูลเป็นพารามิเตอร์อินพุต
ถึงเวลาที่จะเปิดตัวและตรวจสอบการทำงานของระบบ:
spark-submit
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207
spark_job.py localhost:9092 transaction
ทุกอย่างได้ผล! ดังที่คุณเห็นในภาพด้านล่าง ในขณะที่แอปพลิเคชันกำลังทำงาน ผลลัพธ์การรวมใหม่จะถูกส่งออกทุกๆ 2 วินาที เนื่องจากเราตั้งค่าช่วงเวลาแบทช์เป็น 2 วินาทีเมื่อเราสร้างออบเจ็กต์ StreamingContext:
ต่อไป เราทำแบบสอบถามง่ายๆ ในฐานข้อมูลเพื่อตรวจสอบการมีอยู่ของบันทึกในตาราง ธุรกรรม_ไหล:
ข้อสรุป
บทความนี้กล่าวถึงตัวอย่างการประมวลผลสตรีมข้อมูลโดยใช้ Spark Streaming ร่วมกับ Apache Kafka และ PostgreSQL ด้วยการเติบโตของข้อมูลจากแหล่งต่างๆ จึงเป็นเรื่องยากที่จะประเมินค่าสูงไปในทางปฏิบัติของ Spark Streaming สำหรับการสร้างแอปพลิเคชันสตรีมมิ่งและเรียลไทม์
คุณสามารถค้นหาซอร์สโค้ดแบบเต็มได้ในพื้นที่เก็บข้อมูลของฉันที่
ฉันยินดีที่จะหารือเกี่ยวกับบทความนี้ ฉันหวังว่าจะได้รับความคิดเห็นของคุณ และฉันก็หวังว่าจะได้รับคำวิจารณ์ที่สร้างสรรค์จากผู้อ่านที่ห่วงใยทุกคน
ฉันขอให้คุณประสบความสำเร็จ!
ps ในตอนแรกมีแผนจะใช้ฐานข้อมูล PostgreSQL ในเครื่อง แต่เนื่องจากฉันชอบ AWS ฉันจึงตัดสินใจย้ายฐานข้อมูลไปยังคลาวด์ ในบทความถัดไปในหัวข้อนี้ ฉันจะแสดงวิธีใช้งานระบบทั้งหมดที่อธิบายไว้ข้างต้นใน AWS โดยใช้ AWS Kinesis และ AWS EMR ติดตามข่าว!
ที่มา: will.com