Apache Kafka and Streaming Data Processing with Spark Streaming

Hello, Habr! Today we will build a system that uses Spark Streaming to process Apache Kafka message streams and writes the processing results to an AWS RDS cloud database.

Suppose a credit institution has asked us to process incoming transactions from all of its branches "on the fly". This might be needed to promptly calculate the treasury's open currency position, transaction limits, financial results and so on.

How to implement this case without magic or spells? Read on. Let's go!


Introduction

Processing large volumes of data in real time opens up plenty of opportunities for modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming: Kafka produces a stream of incoming message packets, and Spark Streaming processes those packets at a specified time interval.

To increase the application's fault tolerance, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume the computation from there.

Architecture of the system


Components used:

  • Apache Kafka - a distributed publish-subscribe messaging system. It suits both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service.
  • Apache Spark Streaming - the Spark component for processing streaming data. The Spark Streaming module uses a micro-batch architecture, in which a data stream is treated as a continuous sequence of small data batches. Spark Streaming takes data from various sources and combines it into small batches, which are created at regular intervals: at the start of each time interval a new batch is created, and all data received during that interval is added to it. At the end of the interval the batch stops growing. The length of the interval is set by a parameter called the batch interval.
  • Apache Spark SQL - combines relational processing with Spark's functional programming. Structured data means data that has a schema, that is, a single set of fields shared by all records. Spark SQL supports input from a variety of structured data sources and, thanks to the available schema information, can efficiently retrieve only the record fields it needs. It also provides the DataFrame API.
  • AWS RDS - a relatively inexpensive cloud-based relational database: a web service, managed directly by Amazon, that simplifies setup, operation and scaling.
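The micro-batch idea described above can be modelled in a few lines of plain Python. This is only an illustration of how a batch interval slices a stream, not Spark's scheduler; split_into_batches is a hypothetical helper, and timestamps are assumed to be seconds counted from the start of the stream.

```python
from collections import defaultdict


def split_into_batches(events, batch_interval):
    """Group (timestamp, payload) events into consecutive micro-batches.

    Batch k holds every event whose timestamp t satisfies
    k * batch_interval <= t < (k + 1) * batch_interval.
    Timestamps are assumed to be seconds since the start of the stream.
    """
    batches = defaultdict(list)
    for timestamp, payload in events:
        batches[int(timestamp // batch_interval)].append(payload)
    if not batches:
        return []
    # Keep a (possibly empty) batch for every interval, as a streaming engine would
    return [batches.get(k, []) for k in range(max(batches) + 1)]


events = [(0.1, "a"), (1.9, "b"), (2.0, "c"), (3.5, "d"), (6.2, "e")]
print(split_into_batches(events, batch_interval=2))
# [['a', 'b'], ['c', 'd'], [], ['e']]
```

Note the empty third batch: the interval from 4 to 6 seconds received nothing, but it still exists as a batch.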

Installing and running the Kafka server

Kafka runs on the JVM, so before using it we need to make sure Java is installed:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user for working with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

๋‹ค์Œ์œผ๋กœ ๊ณต์‹ Apache Kafka ์›น์‚ฌ์ดํŠธ์—์„œ ๋ฐฐํฌํŒ์„ ๋‹ค์šด๋กœ๋“œํ•ฉ๋‹ˆ๋‹ค.

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz -C /YOUR_PATH
ln -s /YOUR_PATH/kafka_2.12-2.2.0 ~/kafka

๋‹ค์Œ ๋‹จ๊ณ„๋Š” ์„ ํƒ์‚ฌํ•ญ์ž…๋‹ˆ๋‹ค. ์‚ฌ์‹ค ๊ธฐ๋ณธ ์„ค์ •์œผ๋กœ๋Š” Apache Kafka์˜ ๋ชจ๋“  ๊ธฐ๋Šฅ์„ ์™„์ „ํžˆ ์‚ฌ์šฉํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ๋ฉ”์‹œ์ง€๋ฅผ ๊ฒŒ์‹œํ•  ์ˆ˜ ์žˆ๋Š” ์ฃผ์ œ, ์นดํ…Œ๊ณ ๋ฆฌ, ๊ทธ๋ฃน์„ ์‚ญ์ œํ•ฉ๋‹ˆ๋‹ค. ์ด๋ฅผ ๋ณ€๊ฒฝํ•˜๋ ค๋ฉด ๊ตฌ์„ฑ ํŒŒ์ผ์„ ํŽธ์ง‘ํ•ด ๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

vim ~/kafka/config/server.properties

ํŒŒ์ผ ๋์— ๋‹ค์Œ์„ ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค.

delete.topic.enable = true

Before starting the Kafka server, we need to start a ZooKeeper server, using the helper script that ships with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Once ZooKeeper has started successfully, start the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic named transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's check that the topic has been created with the required number of partitions and replicas:

bin/kafka-topics.sh --describe --zookeeper localhost:2181


Don't skip testing a producer and a consumer on the newly created topic. Details on how to test sending and receiving messages are in the official documentation (the "Send some messages" step of the quickstart). Now let's move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

์ƒ์‚ฐ์ž๋Š” ๋ฌด์ž‘์œ„ ๋ฐ์ดํ„ฐ(๋งค์ดˆ 100๊ฐœ์˜ ๋ฉ”์‹œ์ง€)๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค. ๋ฌด์ž‘์œ„ ๋ฐ์ดํ„ฐ๋ž€ ์„ธ ๊ฐ€์ง€ ํ•„๋“œ๋กœ ๊ตฌ์„ฑ๋œ ์‚ฌ์ „์„ ์˜๋ฏธํ•ฉ๋‹ˆ๋‹ค.

  • branch - the name of the credit institution's point of sale;
  • currency - the transaction currency;
  • amount - the transaction amount. The amount is positive when the bank buys currency and negative when it sells.

์ƒ์‚ฐ์ž์˜ ์ฝ”๋“œ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict
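To make the wire format concrete: the producer code that follows serializes each dictionary to UTF-8 encoded JSON, and any consumer has to reverse that step. The sketch below uses only the standard library (random.Random instead of NumPy, an assumption made for portability), and the helper names serialize and deserialize are hypothetical.

```python
import json
import random

BRANCHES = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
CURRENCIES = ['RUB', 'USD', 'EUR', 'GBP']


def get_random_value(rng=random):
    # Same three fields as the producer above, built with the standard library.
    # randrange(-100, 100) excludes 100, like numpy.random.randint(-100, 100).
    return {
        'branch': rng.choice(BRANCHES),
        'currency': rng.choice(CURRENCIES),
        'amount': rng.randrange(-100, 100),
    }


def serialize(message):
    # Mirrors the producer's value_serializer: dict -> JSON text -> UTF-8 bytes
    return json.dumps(message).encode('utf-8')


def deserialize(raw):
    # What a consumer must do to get the dict back
    return json.loads(raw.decode('utf-8'))


message = get_random_value(random.Random(42))
payload = serialize(message)
print(payload)
assert deserialize(payload) == message
```

The same decode step reappears on the Spark side, where each Kafka record's value has to be parsed with json.loads before its fields can be used.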

๋‹ค์Œ์œผ๋กœ, send ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ JSON ํ˜•์‹์œผ๋กœ ํ•„์š”ํ•œ ์ฃผ์ œ์— ๋Œ€ํ•œ ๋ฉ”์‹œ์ง€๋ฅผ ์„œ๋ฒ„์— ๋ณด๋ƒ…๋‹ˆ๋‹ค.

from json import dumps

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    # Block until the broker acknowledges the message (or 10 seconds pass)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
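The snippet above sends a single message, while the producer is meant to emit about 100 messages per second. One way to reach that rate is a paced loop. This is a sketch, not the author's code: send_at_rate is a hypothetical helper that accepts any send callable, so it can be tried without a broker.

```python
import time


def send_at_rate(send, make_message, count, rate_per_second):
    """Call send(make_message()) `count` times at roughly `rate_per_second`."""
    interval = 1.0 / rate_per_second
    next_deadline = time.monotonic()
    for _ in range(count):
        send(make_message())
        # Schedule relative to the plan, not to "now", so small delays don't accumulate
        next_deadline += interval
        pause = next_deadline - time.monotonic()
        if pause > 0:
            time.sleep(pause)


sent = []
send_at_rate(sent.append, lambda: {'amount': 1}, count=5, rate_per_second=100)
print(len(sent))  # 5
```

With the producer above, a call such as send_at_rate(lambda m: producer.send(my_topic, value=m), get_random_value, count=6000, rate_per_second=100) would stream for roughly a minute before the final producer.flush().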

์Šคํฌ๋ฆฝํŠธ๋ฅผ ์‹คํ–‰ํ•˜๋ฉด ํ„ฐ๋ฏธ๋„์— ๋‹ค์Œ ๋ฉ”์‹œ์ง€๊ฐ€ ํ‘œ์‹œ๋ฉ๋‹ˆ๋‹ค.


This means everything works as intended: the producer generates messages and sends them to the topic we need.
The next step is to install Spark and process this message stream.

Installing Apache Spark

Apache Spark is a general-purpose, high-performance cluster computing platform.

Spark outperforms popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed matters when processing large volumes of data, because it is speed that lets you work interactively instead of waiting minutes or hours. One of the main reasons Spark is so fast is its ability to perform computations in memory.

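The framework is written in Scala, so we install Scala first (strictly speaking, the pre-built Spark distribution already bundles the Scala libraries it needs, so this step matters mainly if you plan to write Scala code):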

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive into /usr/local/spark:

sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark --strip-components=1

Add the Spark path to the bash configuration file:

vim ~/.bashrc

Add the following lines in the editor:

export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

After editing .bashrc, run the command below:

source ~/.bashrc

Deploying PostgreSQL on AWS

๋‚จ์€ ๊ฒƒ์€ ์ŠคํŠธ๋ฆผ์—์„œ ์ฒ˜๋ฆฌ๋œ ์ •๋ณด๋ฅผ ์—…๋กœ๋“œํ•  ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๋ฅผ ๋ฐฐํฌํ•˜๋Š” ๊ฒƒ๋ฟ์ž…๋‹ˆ๋‹ค. ์ด๋ฅผ ์œ„ํ•ด AWS RDS ์„œ๋น„์Šค๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

Go to AWS Console -> AWS RDS -> Databases -> Create database.

Select PostgreSQL and click Next.

Since this example is for educational purposes only, we will use a minimal free server (Free Tier).

๋‹ค์Œ์œผ๋กœ, ๋ฌด๋ฃŒ ๊ณ„์ธต ๋ธ”๋ก์— ์ฒดํฌ ํ‘œ์‹œ๋ฅผ ํ•˜๋ฉด t2.micro ํด๋ž˜์Šค์˜ ์ธ์Šคํ„ด์Šค๊ฐ€ ์ž๋™์œผ๋กœ ์ œ๊ณต๋ฉ๋‹ˆ๋‹ค. ๋น„๋ก ์•ฝํ•˜๊ธฐ๋Š” ํ•˜์ง€๋งŒ ๋ฌด๋ฃŒ์ด๋ฉฐ ์šฐ๋ฆฌ ์ž‘์—…์— ๋งค์šฐ ์ ํ•ฉํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ์€ ๋งค์šฐ ์ค‘์š”ํ•œ ์‚ฌํ•ญ์ž…๋‹ˆ๋‹ค. ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ธ์Šคํ„ด์Šค ์ด๋ฆ„, ๋งˆ์Šคํ„ฐ ์‚ฌ์šฉ์ž ์ด๋ฆ„ ๋ฐ ๋น„๋ฐ€๋ฒˆํ˜ธ์ž…๋‹ˆ๋‹ค. ์ธ์Šคํ„ด์Šค ์ด๋ฆ„์„ myHabrTest, ๋งˆ์Šคํ„ฐ ์‚ฌ์šฉ์ž๋กœ ์ง€์ •ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค. ํ•˜๋ธŒ๋ฅด, ๋น„๋ฐ€๋ฒˆํ˜ธ: ํ•˜๋ธŒ๋ฅด12345 ๋‹ค์Œ ๋ฒ„ํŠผ์„ ํด๋ฆญํ•˜์„ธ์š”.

๋‹ค์Œ ํŽ˜์ด์ง€์—๋Š” ์™ธ๋ถ€์—์„œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์„œ๋ฒ„์— ๋Œ€ํ•œ ์ ‘๊ทผ์„ฑ(๊ณต์šฉ ์ ‘๊ทผ์„ฑ) ๋ฐ ํฌํŠธ ๊ฐ€์šฉ์„ฑ์„ ๋‹ด๋‹นํ•˜๋Š” ๋งค๊ฐœ๋ณ€์ˆ˜๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.


Let's create a new VPC security group setting that allows external access to the database server on port 5432 (PostgreSQL).
In a separate browser window, open the AWS console and go to VPC Dashboard -> Security Groups -> Create security group.

Set the security group name to PostgreSQL, add a description, specify the VPC the group should be associated with, and click the Create button.

For the newly created group, add an inbound rule for port 5432, as shown in the screenshot. You cannot enter the port manually; instead, select PostgreSQL in the Type drop-down list.

Strictly speaking, the value ::/0 opens the server to incoming traffic from anywhere in the world. That is not good security practice, but for the purposes of this example we will accept it.
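A quick way to see what such a source value admits is Python's standard ipaddress module. Note that ::/0 is the IPv6 "anywhere" range; an IPv4 client is matched only by 0.0.0.0/0 (the console's "Anywhere" source option typically adds both). The addresses below come from the documentation ranges, and admits is a hypothetical helper.

```python
import ipaddress


def admits(cidr, client_ip):
    # True if an inbound rule with source `cidr` matches a connection from `client_ip`
    return ipaddress.ip_address(client_ip) in ipaddress.ip_network(cidr)


client_v4 = '198.51.100.10'   # hypothetical IPv4 client (documentation range)
client_v6 = '2001:db8::10'    # hypothetical IPv6 client (documentation range)

print(admits('::/0', client_v6))            # True: ::/0 matches every IPv6 source
print(admits('::/0', client_v4))            # False: it says nothing about IPv4
print(admits('0.0.0.0/0', client_v4))       # True: the IPv4 "anywhere" range
print(admits('203.0.113.7/32', client_v4))  # False: a /32 rule admits one host only
```

For anything beyond a throwaway demo, a /32 rule for your own address is the safer choice.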
Back on the browser page with "Configure advanced settings" open, in the VPC security groups section choose "Choose existing VPC security groups" -> PostgreSQL.

๋‹ค์Œ์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์˜ต์…˜ -> ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ด๋ฆ„ -> ์ด๋ฆ„ ์„ค์ • - habrDB.

The remaining parameters can stay at their defaults, except that we disable backups (backup retention period: 0 days), monitoring and Performance Insights. Click the Create database button.

Stream handler

The final step is to develop a Spark job that processes the new data arriving from Kafka every two seconds and writes the results to the database.

As mentioned above, checkpoints are a core Spark Streaming mechanism that must be configured to ensure fault tolerance. We will use them so that, if the job fails, the Spark Streaming module only needs to return to the last checkpoint and resume computation from it to recover the lost data.

์ฒดํฌํฌ์ธํŠธ ์ •๋ณด๊ฐ€ ์ €์žฅ๋  ๋‚ด๊ฒฐํ•จ์„ฑ, ์‹ ๋ขฐ์„ฑ ์žˆ๋Š” ํŒŒ์ผ ์‹œ์Šคํ…œ(์˜ˆ: HDFS, S3 ๋“ฑ)์— ๋””๋ ‰ํ„ฐ๋ฆฌ๋ฅผ ์„ค์ •ํ•˜์—ฌ ์ฒดํฌํฌ์ธํŠธ๋ฅผ ํ™œ์„ฑํ™”ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Š” ์˜ˆ๋ฅผ ๋“ค์–ด ๋‹ค์Œ์„ ์‚ฌ์šฉํ•˜์—ฌ ์ˆ˜ํ–‰๋ฉ๋‹ˆ๋‹ค.

streamingContext.checkpoint(checkpointDirectory)

In our example we use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (that is, on the first run), functionToCreateContext is called to create a new context and set up the DStreams.

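from pyspark.streaming import StreamingContext

# functionToCreateContext is user code: it builds a new StreamingContext,
# sets up the DStreams and calls checkpoint(checkpointDirectory) on the context
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)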
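The recovery decision behind getOrCreate can be modelled without Spark. The sketch below is an illustration under loud assumptions, not Spark's implementation: a JSON file stands in for real checkpoint data, and get_or_create and state.json are hypothetical names.

```python
import json
import os
import tempfile


def get_or_create(checkpoint_dir, create_state):
    """Restore state from checkpoint_dir if present, otherwise build it with create_state()."""
    state_file = os.path.join(checkpoint_dir, 'state.json')
    if os.path.exists(state_file):
        # Recovery path: the factory is NOT called
        with open(state_file) as f:
            return json.load(f), 'restored'
    # First run: build fresh state and persist it as the "checkpoint"
    state = create_state()
    os.makedirs(checkpoint_dir, exist_ok=True)
    with open(state_file, 'w') as f:
        json.dump(state, f)
    return state, 'created'


with tempfile.TemporaryDirectory() as tmp:
    checkpoint = os.path.join(tmp, 'checkpoint')
    print(get_or_create(checkpoint, lambda: {'batches_done': 0}))   # ({'batches_done': 0}, 'created')
    print(get_or_create(checkpoint, lambda: {'batches_done': 99}))  # ({'batches_done': 0}, 'restored')
```

The second call shows a practical consequence of this pattern: once a checkpoint exists, the factory is skipped, so changes to the stream setup code are not picked up until the checkpoint directory is cleared.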

We create a DirectStream object connected to the "transaction" topic using the createDirectStream method of KafkaUtils:

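from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # Python API in Spark 2.x; the JVM jar comes via --packages

conf = SparkConf().setAppName("TransactionStream")  # the application name is arbitrary
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)  # batch interval: 2 seconds

broker_list = 'localhost:9092'
topic = 'transaction'

# Receiver-less "direct" stream that reads the topic's partitions itself
directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})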

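Parse the incoming JSON data. Kafka records arrive as (key, value) pairs whose value is the producer's JSON string, and each micro-batch is handled by a function passed to foreachRDD:

import json
from pyspark.sql import Row, SparkSession

def process(time, rdd):
    # Runs once per micro-batch; `rdd` holds that batch's decoded messages
    if rdd.isEmpty():
        return  # schema inference fails on an empty batch
    spark = SparkSession.builder.config(conf=rdd.context.getConf()).getOrCreate()
    rowRdd = rdd.map(lambda w: Row(branch=w['branch'], currency=w['currency'], amount=w['amount']))
    testDataFrame = spark.createDataFrame(rowRdd)
    testDataFrame.createOrReplaceTempView("treasury_stream")

directKafkaStream.map(lambda kv: json.loads(kv[1])).foreachRDD(process)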

Using Spark SQL, we perform a simple grouping and print the result to the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency
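To check the query's logic on a handful of messages without a cluster, here is an equivalent in plain Python. It is a hypothetical test helper: it omits the curr_time column and sorts the groups only to make the output deterministic (Spark makes no ordering promise for GROUP BY results).

```python
from collections import defaultdict


def aggregate_batch(messages):
    """Sum `amount` per (branch, currency), like the GROUP BY query above."""
    totals = defaultdict(int)
    for m in messages:
        totals[(m['branch'], m['currency'])] += m['amount']
    return [
        {'branch_name': branch, 'currency_code': currency, 'batch_value': value}
        for (branch, currency), value in sorted(totals.items())
    ]


batch = [
    {'branch': 'Kazan', 'currency': 'USD', 'amount': 40},
    {'branch': 'Kazan', 'currency': 'USD', 'amount': -15},
    {'branch': 'SPB', 'currency': 'EUR', 'amount': 7},
]
for row in aggregate_batch(batch):
    print(row)
# {'branch_name': 'Kazan', 'currency_code': 'USD', 'batch_value': 25}
# {'branch_name': 'SPB', 'currency_code': 'EUR', 'batch_value': 7}
```

Because purchases are positive and sales negative, batch_value is the net position change for each branch and currency within one batch.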

We fetch the query text and execute it through Spark SQL:

# get_sql_query() is a helper (not shown here) that returns the SQL text above
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Then we save the aggregated results to a table in AWS RDS, using the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()

A few words about the connection settings for AWS RDS. We created the user and password for it at the "Deploying PostgreSQL on AWS" step. As the database server URL, use the Endpoint shown in the Connectivity & security section.
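The write call above hard-codes the password in the job. A minimal sketch of assembling the same JDBC options from the endpoint and the environment instead; jdbc_options and the variable names PG_USER and PG_PASSWORD are hypothetical choices, not part of the original job.

```python
import os


def jdbc_options(endpoint, database, port=5432, env=None):
    """Build options for DataFrame.write.format("jdbc") without hard-coded credentials."""
    env = os.environ if env is None else env
    return {
        'driver': 'org.postgresql.Driver',
        'url': 'jdbc:postgresql://{}:{}/{}'.format(endpoint, port, database),
        'user': env['PG_USER'],          # hypothetical variable name
        'password': env['PG_PASSWORD'],  # hypothetical variable name
    }


# An explicit env dict is passed here only for demonstration; normally os.environ is used
opts = jdbc_options('myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com', 'habrDB',
                    env={'PG_USER': 'habr', 'PG_PASSWORD': 'habr12345'})
print(opts['url'])
# jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB
```

In the Spark job the dictionary could then be passed as testResultDataFrame.write.format("jdbc").mode("append").options(dbtable="transaction_flow", **opts).save().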


To connect Spark and Kafka correctly, run the job via spark-submit with the spark-streaming-kafka-0-8_2.11 artifact. We also need an artifact for working with the PostgreSQL database; both are passed via --packages. The Scala suffix (_2.11 here) and the version of the Kafka connector should match the Scala and Spark versions of your Spark build.

์Šคํฌ๋ฆฝํŠธ์˜ ์œ ์—ฐ์„ฑ์„ ์œ„ํ•ด ๋ฉ”์‹œ์ง€ ์„œ๋ฒ„์˜ ์ด๋ฆ„๊ณผ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์‹ ํ•˜๋ ค๋Š” ์ฃผ์ œ๋„ ์ž…๋ ฅ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ํฌํ•จํ•ฉ๋‹ˆ๋‹ค.

Now it's time to launch the system and check that it works:

spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction

Everything works! As the screenshot shows, new aggregation results are printed every 2 seconds while the application is running, because we set the batch interval to 2 seconds when creating the StreamingContext object.


๋‹ค์Œ์œผ๋กœ, ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ๊ฐ„๋‹จํ•œ ์ฟผ๋ฆฌ๋ฅผ ๋งŒ๋“ค์–ด ํ…Œ์ด๋ธ”์— ๋ ˆ์ฝ”๋“œ๊ฐ€ ์žˆ๋Š”์ง€ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค. transaction_flow:


Conclusion

In this article we walked through an example of stream processing with Spark Streaming together with Apache Kafka and PostgreSQL. As data from diverse sources keeps growing, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

The full source code is available in my GitHub repository.

I'll be glad to discuss this article, look forward to your comments, and welcome constructive criticism from all interested readers.

I wish you success!

P.S. I originally planned to use a local PostgreSQL database, but given my fondness for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above on AWS using AWS Kinesis and AWS EMR. Stay tuned!

Source: habr.com
