์๋ , ํ๋ธ๋ฅด! ์ค๋์ Spark Streaming์ ์ฌ์ฉํ์ฌ Apache Kafka ๋ฉ์์ง ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ๊ณ ์ฒ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ AWS RDS ํด๋ผ์ฐ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๋ ์์คํ ์ ๊ตฌ์ถํ๊ฒ ์ต๋๋ค.
ํน์ ์ ์ฉ ๊ธฐ๊ด์ด ๋ชจ๋ ์ง์ ์์ "์ฆ์" ๋ค์ด์ค๋ ๊ฑฐ๋๋ฅผ ์ฒ๋ฆฌํ๋ ์์ ์ ์ฐ๋ฆฌ์๊ฒ ์ง์ ํ๋ค๊ณ ๊ฐ์ ํด ๋ณด๊ฒ ์ต๋๋ค. ์ด๋ ์ฌ๋ฌด๋ถ์ ๊ณต๊ฐ ํตํ ํฌ์ง์ , ๊ฑฐ๋ ํ๋ ๋๋ ์ฌ๋ฌด ๊ฒฐ๊ณผ ๋ฑ์ ์ ์ํ๊ฒ ๊ณ์ฐํ๊ธฐ ์ํ ๋ชฉ์ ์ผ๋ก ์ํ๋ ์ ์์ต๋๋ค.
๋ง๋ฒ๊ณผ ๋ง๋ฒ ์ฃผ๋ฌธ์ ์ฌ์ฉํ์ง ์๊ณ ์ด ์ฌ๋ก๋ฅผ ๊ตฌํํ๋ ๋ฐฉ๋ฒ - ์๋ ๋ด์ฉ์ ์ฝ์ด๋ณด์ธ์! ๊ฐ๋ค!
์๊ฐ
๋ฌผ๋ก ๋๋์ ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ์ฒ๋ฆฌํ๋ฉด ํ๋ ์์คํ ์์ ์ฌ์ฉํ ์ ์๋ ์ถฉ๋ถํ ๊ธฐํ๊ฐ ์ ๊ณต๋ฉ๋๋ค. ๊ฐ์ฅ ๋๋ฆฌ ์ฌ์ฉ๋๋ ์กฐํฉ ์ค ํ๋๋ Apache Kafka์ Spark Streaming์ ์ง๋ ฌ ์กฐํฉ์ ๋๋ค. ์ฌ๊ธฐ์ Kafka๋ ์์ ๋ฉ์์ง ํจํท์ ์คํธ๋ฆผ์ ์์ฑํ๊ณ Spark Streaming์ ์ง์ ๋ ์๊ฐ ๊ฐ๊ฒฉ์ผ๋ก ์ด๋ฌํ ํจํท์ ์ฒ๋ฆฌํฉ๋๋ค.
์ ํ๋ฆฌ์ผ์ด์ ์ ๋ด๊ฒฐํจ์ฑ์ ๋์ด๊ธฐ ์ํด ์ฒดํฌํฌ์ธํธ๋ฅผ ์ฌ์ฉํฉ๋๋ค. ์ด ๋ฉ์ปค๋์ฆ์ ์ฌ์ฉํ๋ฉด Spark Streaming ์์ง์ด ์์ค๋ ๋ฐ์ดํฐ๋ฅผ ๋ณต๊ตฌํด์ผ ํ ๋ ๋ง์ง๋ง ์ฒดํฌํฌ์ธํธ๋ก ๋์๊ฐ์ ๊ฑฐ๊ธฐ์์ ๊ณ์ฐ์ ์ฌ๊ฐํ๊ธฐ๋ง ํ๋ฉด ๋ฉ๋๋ค.
๊ฐ๋ฐ๋ ์์คํ ์ ์ํคํ ์ฒ
์ฌ์ฉ๋ ๊ตฌ์ฑ ์์:
์ํ์น ์นดํ์นด ๋ถ์ฐ ๊ฒ์-๊ตฌ๋ ๋ฉ์์ง ์์คํ ์ ๋๋ค. ์คํ๋ผ์ธ ๋ฐ ์จ๋ผ์ธ ๋ฉ์์ง ์๋น ๋ชจ๋์ ์ ํฉํฉ๋๋ค. ๋ฐ์ดํฐ ์์ค์ ๋ฐฉ์งํ๊ธฐ ์ํด Kafka ๋ฉ์์ง๋ ๋์คํฌ์ ์ ์ฅ๋๊ณ ํด๋ฌ์คํฐ ๋ด์ ๋ณต์ ๋ฉ๋๋ค. Kafka ์์คํ ์ ZooKeeper ๋๊ธฐํ ์๋น์ค ์์ ๊ตฌ์ถ๋์์ต๋๋ค.์ํ์น ์คํํฌ ์คํธ๋ฆฌ๋ฐ - ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํ Spark ๊ตฌ์ฑ ์์์ ๋๋ค. Spark Streaming ๋ชจ๋์ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ด ์์ ๋ฐ์ดํฐ ํจํท์ ์ฐ์ ์ํ์ค๋ก ํด์๋๋ ๋ง์ดํฌ๋ก ๋ฐฐ์น ์ํคํ ์ฒ๋ฅผ ์ฌ์ฉํ์ฌ ๊ตฌ์ถ๋์์ต๋๋ค. Spark Streaming์ ๋ค์ํ ์์ค์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ ์์ ํจํค์ง๋ก ๊ฒฐํฉํฉ๋๋ค. ์๋ก์ด ํจํค์ง๋ ์ ๊ธฐ์ ์ผ๋ก ์์ฑ๋ฉ๋๋ค. ๊ฐ ์๊ฐ ๊ฐ๊ฒฉ์ด ์์๋ ๋ ์ ํจํท์ด ์์ฑ๋๊ณ ํด๋น ๊ฐ๊ฒฉ ๋์ ์์ ๋ ๋ชจ๋ ๋ฐ์ดํฐ๊ฐ ํจํท์ ํฌํจ๋ฉ๋๋ค. ๊ฐ๊ฒฉ์ด ๋๋๋ฉด ํจํท ์ฆ๊ฐ๊ฐ ์ค์ง๋ฉ๋๋ค. ๊ฐ๊ฒฉ์ ํฌ๊ธฐ๋ ๋ฐฐ์น ๊ฐ๊ฒฉ์ด๋ผ๋ ๋งค๊ฐ๋ณ์์ ์ํด ๊ฒฐ์ ๋ฉ๋๋ค.์ํ์น ์คํํฌ SQL - ๊ด๊ณํ ์ฒ๋ฆฌ์ Spark ํจ์ํ ํ๋ก๊ทธ๋๋ฐ์ ๊ฒฐํฉํฉ๋๋ค. ๊ตฌ์กฐํ๋ ๋ฐ์ดํฐ๋ ์คํค๋ง, ์ฆ ๋ชจ๋ ๋ ์ฝ๋์ ๋ํ ๋จ์ผ ํ๋ ์งํฉ์ด ์๋ ๋ฐ์ดํฐ๋ฅผ ์๋ฏธํฉ๋๋ค. Spark SQL์ ๋ค์ํ ๊ตฌ์กฐํ๋ ๋ฐ์ดํฐ ์์ค์ ์ ๋ ฅ์ ์ง์ํ๋ฉฐ, ์คํค๋ง ์ ๋ณด์ ๊ฐ์ฉ์ฑ ๋๋ถ์ ํ์ํ ๋ ์ฝ๋ ํ๋๋ง ํจ์จ์ ์ผ๋ก ๊ฒ์ํ ์ ์์ผ๋ฉฐ DataFrame API๋ ์ ๊ณตํฉ๋๋ค.AWS RDS ์๋์ ์ผ๋ก ์ ๋ ดํ ํด๋ผ์ฐ๋ ๊ธฐ๋ฐ ๊ด๊ณํ ๋ฐ์ดํฐ๋ฒ ์ด์ค, ์ค์ , ์ด์ ๋ฐ ํ์ฅ์ ๋จ์ํํ๊ณ Amazon์์ ์ง์ ๊ด๋ฆฌํ๋ ์น ์๋น์ค์ ๋๋ค.
Kafka ์๋ฒ ์ค์น ๋ฐ ์คํ
Kafka๋ฅผ ์ง์ ์ฌ์ฉํ๊ธฐ ์ ์ Java๊ฐ ์๋์ง ํ์ธํด์ผ ํฉ๋๋ค. ์๋ํ๋ฉด... JVM์ ์์ ์ ์ฌ์ฉ๋ฉ๋๋ค.
sudo apt-get update
sudo apt-get install default-jre
java -version
Kafka๋ฅผ ์ฌ์ฉํ ์ ์ฌ์ฉ์๋ฅผ ๋ง๋ค์ด ๋ณด๊ฒ ์ต๋๋ค.
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
๋ค์์ผ๋ก ๊ณต์ Apache Kafka ์น์ฌ์ดํธ์์ ๋ฐฐํฌํ์ ๋ค์ด๋ก๋ํฉ๋๋ค.
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
๋ค์ด๋ก๋ํ ์์นด์ด๋ธ์ ์์ถ์ ํผ๋ค:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
๋ค์ ๋จ๊ณ๋ ์ ํ์ฌํญ์ ๋๋ค. ์ฌ์ค ๊ธฐ๋ณธ ์ค์ ์ผ๋ก๋ Apache Kafka์ ๋ชจ๋ ๊ธฐ๋ฅ์ ์์ ํ ์ฌ์ฉํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด ๋ฉ์์ง๋ฅผ ๊ฒ์ํ ์ ์๋ ์ฃผ์ , ์นดํ ๊ณ ๋ฆฌ, ๊ทธ๋ฃน์ ์ญ์ ํฉ๋๋ค. ์ด๋ฅผ ๋ณ๊ฒฝํ๋ ค๋ฉด ๊ตฌ์ฑ ํ์ผ์ ํธ์งํด ๋ณด๊ฒ ์ต๋๋ค.
vim ~/kafka/config/server.properties
ํ์ผ ๋์ ๋ค์์ ์ถ๊ฐํฉ๋๋ค.
delete.topic.enable = true
Kafka ์๋ฒ๋ฅผ ์์ํ๊ธฐ ์ ์ ZooKeeper ์๋ฒ๋ฅผ ์์ํด์ผ ํ๋ฉฐ Kafka ๋ฐฐํฌ์ ํจ๊ป ์ ๊ณต๋๋ ๋ณด์กฐ ์คํฌ๋ฆฝํธ๋ฅผ ์ฌ์ฉํฉ๋๋ค.
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์์๋ ํ ๋ณ๋์ ํฐ๋ฏธ๋์์ Kafka ์๋ฒ๋ฅผ ์์ํฉ๋๋ค.
bin/kafka-server-start.sh config/server.properties
Transaction์ด๋ผ๋ ์ ์ฃผ์ ๋ฅผ ๋ง๋ค์ด ๋ณด๊ฒ ์ต๋๋ค.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
ํ์ํ ์์ ํํฐ์ ๊ณผ ๋ณต์ ๊ฐ ํฌํจ๋ ์ฃผ์ ๊ฐ ์์ฑ๋์๋์ง ํ์ธํด ๋ณด๊ฒ ์ต๋๋ค.
bin/kafka-topics.sh --describe --zookeeper localhost:2181
์๋ก ์์ฑ๋ ์ฃผ์ ์ ๋ํด ์์ฐ์์ ์๋น์๋ฅผ ํ
์คํธํ๋ ์๊ฐ์ ๋์น์ง ๋ง์ธ์. ๋ฉ์์ง ๋ณด๋ด๊ธฐ ๋ฐ ๋ฐ๊ธฐ๋ฅผ ํ
์คํธํ๋ ๋ฐฉ๋ฒ์ ๋ํ ์์ธํ ๋ด์ฉ์ ๊ณต์ ๋ฌธ์์ ๊ธฐ๋ก๋์ด ์์ต๋๋ค.
ํ๋ก๋์ ๊ธ์ฐ๊ธฐ
์์ฐ์๋ ๋ฌด์์ ๋ฐ์ดํฐ(๋งค์ด 100๊ฐ์ ๋ฉ์์ง)๋ฅผ ์์ฑํฉ๋๋ค. ๋ฌด์์ ๋ฐ์ดํฐ๋ ์ธ ๊ฐ์ง ํ๋๋ก ๊ตฌ์ฑ๋ ์ฌ์ ์ ์๋ฏธํฉ๋๋ค.
- ์ง์ฌ - ์ ์ฉ ๊ธฐ๊ด์ ํ๋งค ์ง์ ์ด๋ฆ
- ํ์จ โ ๊ฑฐ๋ ํตํ
- ๊ธ์ก โ ๊ฑฐ๋ ๊ธ์ก. ๊ธ์ก์ ์ํ์ด ํตํ๋ฅผ ๊ตฌ๋งคํ๋ ๊ฒฝ์ฐ ์์, ํ๋งค์ธ ๊ฒฝ์ฐ ์์๊ฐ ๋ฉ๋๋ค.
์์ฐ์์ ์ฝ๋๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
๋ค์์ผ๋ก, send ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ JSON ํ์์ผ๋ก ํ์ํ ์ฃผ์ ์ ๋ํ ๋ฉ์์ง๋ฅผ ์๋ฒ์ ๋ณด๋ ๋๋ค.
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
์คํฌ๋ฆฝํธ๋ฅผ ์คํํ๋ฉด ํฐ๋ฏธ๋์ ๋ค์ ๋ฉ์์ง๊ฐ ํ์๋ฉ๋๋ค.
์ด๋ ๋ชจ๋ ๊ฒ์ด ์ฐ๋ฆฌ๊ฐ ์ํ๋ ๋๋ก ์๋ํ๋ค๋ ๊ฒ์ ์๋ฏธํฉ๋๋ค. ์์ฐ์๋ ์ฐ๋ฆฌ๊ฐ ํ์ํ ์ฃผ์ ์ ๋ฉ์์ง๋ฅผ ์์ฑํ๊ณ ๋ณด๋
๋๋ค.
๋ค์ ๋จ๊ณ๋ Spark๋ฅผ ์ค์นํ๊ณ ์ด ๋ฉ์์ง ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ๋ ๊ฒ์
๋๋ค.
์ํ์น ์คํํฌ ์ค์น
์ํ์น ์คํํฌ ๋ฒ์ฉ ๊ณ ์ฑ๋ฅ ํด๋ฌ์คํฐ ์ปดํจํ ํ๋ซํผ์ ๋๋ค.
Spark๋ ๋ํํ ์ฟผ๋ฆฌ ๋ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ๋ฅผ ํฌํจํ์ฌ ๊ด๋ฒ์ํ ๊ณ์ฐ ์ ํ์ ์ง์ํ๋ฉด์ ๋๋ฆฌ ์ฌ์ฉ๋๋ MapReduce ๋ชจ๋ธ ๊ตฌํ๋ณด๋ค ๋ ๋์ ์ฑ๋ฅ์ ๋ฐํํฉ๋๋ค. ์๋๋ ๋์ฉ๋ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ๋ ์ค์ํ ์ญํ ์ ํฉ๋๋ค. ๋ช ๋ถ, ๋ช ์๊ฐ์ ๊ธฐ๋ค๋ฆฌ์ง ์๊ณ ๋ ๋ํํ์ผ๋ก ์์ ํ ์ ์๊ฒ ํด์ฃผ๋ ์๋์ด๊ธฐ ๋๋ฌธ์ ๋๋ค. Spark๋ฅผ ๋งค์ฐ ๋น ๋ฅด๊ฒ ๋ง๋๋ ๊ฐ์ฅ ํฐ ์ฅ์ ์ค ํ๋๋ ๋ฉ๋ชจ๋ฆฌ ๋ด ๊ณ์ฐ์ ์ํํ๋ ๋ฅ๋ ฅ์ ๋๋ค.
์ด ํ๋ ์์ํฌ๋ Scala๋ก ์์ฑ๋์์ผ๋ฏ๋ก ๋จผ์ ์ค์นํด์ผ ํฉ๋๋ค.
sudo apt-get install scala
๊ณต์ ์น์ฌ์ดํธ์์ Spark ๋ฐฐํฌํ์ ๋ค์ด๋ก๋ํ์ธ์.
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
์์นด์ด๋ธ์ ์์ถ์ ํ๋๋ค.
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
bash ํ์ผ์ Spark ๊ฒฝ๋ก๋ฅผ ์ถ๊ฐํฉ๋๋ค.
vim ~/.bashrc
ํธ์ง๊ธฐ๋ฅผ ํตํด ๋ค์ ์ค์ ์ถ๊ฐํฉ๋๋ค.
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
bashrc๋ฅผ ๋ณ๊ฒฝํ ํ ์๋ ๋ช ๋ น์ ์คํํ์ญ์์ค.
source ~/.bashrc
AWS PostgreSQL ๋ฐฐํฌ
๋จ์ ๊ฒ์ ์คํธ๋ฆผ์์ ์ฒ๋ฆฌ๋ ์ ๋ณด๋ฅผ ์ ๋ก๋ํ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ๋ฐฐํฌํ๋ ๊ฒ๋ฟ์ ๋๋ค. ์ด๋ฅผ ์ํด AWS RDS ์๋น์ค๋ฅผ ์ฌ์ฉํฉ๋๋ค.
AWS ์ฝ์ -> AWS RDS -> ๋ฐ์ดํฐ๋ฒ ์ด์ค -> ๋ฐ์ดํฐ๋ฒ ์ด์ค ์์ฑ์ผ๋ก ์ด๋ํฉ๋๋ค.
PostgreSQL์ ์ ํํ๊ณ ๋ค์์ ํด๋ฆญํฉ๋๋ค.
์๋ํ๋ฉด ์ด ์๋ ๊ต์ก ๋ชฉ์ ์ผ๋ก๋ง ์ฌ์ฉ๋๋ฉฐ "์ต์"(๋ฌด๋ฃ ๊ณ์ธต) ๋ฌด๋ฃ ์๋ฒ๋ฅผ ์ฌ์ฉํฉ๋๋ค.
๋ค์์ผ๋ก, ๋ฌด๋ฃ ๊ณ์ธต ๋ธ๋ก์ ์ฒดํฌ ํ์๋ฅผ ํ๋ฉด t2.micro ํด๋์ค์ ์ธ์คํด์ค๊ฐ ์๋์ผ๋ก ์ ๊ณต๋ฉ๋๋ค. ๋น๋ก ์ฝํ๊ธฐ๋ ํ์ง๋ง ๋ฌด๋ฃ์ด๋ฉฐ ์ฐ๋ฆฌ ์์
์ ๋งค์ฐ ์ ํฉํฉ๋๋ค.
๋ค์์ ๋งค์ฐ ์ค์ํ ์ฌํญ์
๋๋ค. ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ธ์คํด์ค ์ด๋ฆ, ๋ง์คํฐ ์ฌ์ฉ์ ์ด๋ฆ ๋ฐ ๋น๋ฐ๋ฒํธ์
๋๋ค. ์ธ์คํด์ค ์ด๋ฆ์ myHabrTest, ๋ง์คํฐ ์ฌ์ฉ์๋ก ์ง์ ํ๊ฒ ์ต๋๋ค. ํ๋ธ๋ฅด, ๋น๋ฐ๋ฒํธ: ํ๋ธ๋ฅด12345 ๋ค์ ๋ฒํผ์ ํด๋ฆญํ์ธ์.
๋ค์ ํ์ด์ง์๋ ์ธ๋ถ์์ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์๋ฒ์ ๋ํ ์ ๊ทผ์ฑ(๊ณต์ฉ ์ ๊ทผ์ฑ) ๋ฐ ํฌํธ ๊ฐ์ฉ์ฑ์ ๋ด๋นํ๋ ๋งค๊ฐ๋ณ์๊ฐ ์์ต๋๋ค.
ํฌํธ 5432(PostgreSQL)๋ฅผ ํตํด ๋ฐ์ดํฐ๋ฒ ์ด์ค ์๋ฒ์ ๋ํ ์ธ๋ถ ์ก์ธ์ค๋ฅผ ํ์ฉํ๋ VPC ๋ณด์ ๊ทธ๋ฃน์ ๋ํ ์ ์ค์ ์ ์์ฑํด ๋ณด๊ฒ ์ต๋๋ค.
๋ณ๋์ ๋ธ๋ผ์ฐ์ ์ฐฝ์์ AWS ์ฝ์๋ก ์ด๋ํ์ฌ VPC ๋์๋ณด๋ -> ๋ณด์ ๊ทธ๋ฃน -> ๋ณด์ ๊ทธ๋ฃน ์์ฑ ์น์
์ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
๋ณด์ ๊ทธ๋ฃน์ ์ด๋ฆ์ธ PostgreSQL์ ์ค๋ช
์ผ๋ก ์ค์ ํ๊ณ ์ด ๊ทธ๋ฃน์ด ์ฐ๊ฒฐ๋์ด์ผ ํ๋ VPC๋ฅผ ๋ํ๋ด๊ณ ์์ฑ ๋ฒํผ์ ํด๋ฆญํฉ๋๋ค.
์๋ ๊ทธ๋ฆผ๊ณผ ๊ฐ์ด ์๋ก ์์ฑ๋ ๊ทธ๋ฃน์ ๋ํด ํฌํธ 5432์ ๋ํ ์ธ๋ฐ์ด๋ ๊ท์น์ ์ ๋ ฅํฉ๋๋ค. ํฌํธ๋ฅผ ์๋์ผ๋ก ์ง์ ํ ์๋ ์์ง๋ง ์ ํ ๋๋กญ๋ค์ด ๋ชฉ๋ก์์ PostgreSQL์ ์ ํํฉ๋๋ค.
์๋ฐํ ๋งํ๋ฉด ::/0 ๊ฐ์ ์ ์ธ๊ณ์์ ์๋ฒ๋ก ๋ค์ด์ค๋ ํธ๋ํฝ์ ๊ฐ์ฉ์ฑ์ ์๋ฏธํฉ๋๋ค. ์ด๋ ์ ์์ผ๋ก ์์ ํ ์ฌ์ค์ ์๋์ง๋ง ์์ ๋ฅผ ๋ถ์ํ๊ธฐ ์ํด ๋ค์ ์ ๊ทผ ๋ฐฉ์์ ์ฌ์ฉํด ๋ณด๊ฒ ์ต๋๋ค.
"๊ณ ๊ธ ์ค์ ๊ตฌ์ฑ"์ด ์ด๋ ค ์๋ ๋ธ๋ผ์ฐ์ ํ์ด์ง๋ก ๋์๊ฐ์ VPC ๋ณด์ ๊ทธ๋ฃน ์น์
-> ๊ธฐ์กด VPC ๋ณด์ ๊ทธ๋ฃน ์ ํ -> PostgreSQL์ ์ ํํฉ๋๋ค.
๋ค์์ผ๋ก ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ต์ -> ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ด๋ฆ -> ์ด๋ฆ ์ค์ - habrDB.
๊ธฐ๋ณธ์ ์ผ๋ก ๋ฐฑ์
๋นํ์ฑํ(๋ฐฑ์
๋ณด์กด ๊ธฐ๊ฐ - 0์ผ), ๋ชจ๋ํฐ๋ง ๋ฐ ์ฑ๋ฅ ๊ฐ์ ๋์ฐ๋ฏธ๋ฅผ ์ ์ธํ ๋๋จธ์ง ๋งค๊ฐ ๋ณ์๋ ๊ทธ๋๋ก ๋ ์ ์์ต๋๋ค. ๋ฒํผ์ ํด๋ฆญํ์ธ์ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์์ฑ:
์ค๋ ๋ ํธ๋ค๋ฌ
๋ง์ง๋ง ๋จ๊ณ๋ XNUMX์ด๋ง๋ค Kafka์์ ๋ค์ด์ค๋ ์๋ก์ด ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ๋ ฅํ๋ Spark ์์ ์ ๊ฐ๋ฐ์ ๋๋ค.
์์์ ์ธ๊ธํ ๊ฒ์ฒ๋ผ ์ฒดํฌํฌ์ธํธ๋ ๋ด๊ฒฐํจ์ฑ์ ๋ณด์ฅํ๊ธฐ ์ํด ๊ตฌ์ฑํด์ผ ํ๋ SparkStreaming์ ํต์ฌ ๋ฉ์ปค๋์ฆ์ ๋๋ค. ์ฒดํฌํฌ์ธํธ๋ฅผ ์ฌ์ฉํ ๊ฒ์ด๋ฉฐ, ์ ์ฐจ๊ฐ ์คํจํ ๊ฒฝ์ฐ Spark Streaming ๋ชจ๋์ ๋ง์ง๋ง ์ฒดํฌํฌ์ธํธ๋ก ๋์๊ฐ ๊ณ์ฐ์ ์ฌ๊ฐํ๊ธฐ๋ง ํ๋ฉด ์์ค๋ ๋ฐ์ดํฐ๋ฅผ ๋ณต๊ตฌํ ์ ์์ต๋๋ค.
์ฒดํฌํฌ์ธํธ ์ ๋ณด๊ฐ ์ ์ฅ๋ ๋ด๊ฒฐํจ์ฑ, ์ ๋ขฐ์ฑ ์๋ ํ์ผ ์์คํ (์: HDFS, S3 ๋ฑ)์ ๋๋ ํฐ๋ฆฌ๋ฅผ ์ค์ ํ์ฌ ์ฒดํฌํฌ์ธํธ๋ฅผ ํ์ฑํํ ์ ์์ต๋๋ค. ์ด๋ ์๋ฅผ ๋ค์ด ๋ค์์ ์ฌ์ฉํ์ฌ ์ํ๋ฉ๋๋ค.
streamingContext.checkpoint(checkpointDirectory)
์ด ์์์๋ ๋ค์ ์ ๊ทผ ๋ฐฉ์์ ์ฌ์ฉํฉ๋๋ค. ์ฆ, checkpointDirectory๊ฐ ์กด์ฌํ๋ ๊ฒฝ์ฐ ์ฒดํฌํฌ์ธํธ ๋ฐ์ดํฐ์์ ์ปจํ ์คํธ๊ฐ ๋ค์ ์์ฑ๋ฉ๋๋ค. ๋๋ ํฐ๋ฆฌ๊ฐ ์กด์ฌํ์ง ์๋ ๊ฒฝ์ฐ(์ฆ, ์ฒ์์ผ๋ก ์คํ๋ ๊ฒฝ์ฐ) functionToCreateContext๊ฐ ํธ์ถ๋์ด ์ ์ปจํ ์คํธ๋ฅผ ์์ฑํ๊ณ DStreams๋ฅผ ๊ตฌ์ฑํฉ๋๋ค.
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
KafkaUtils ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ createDirectStream ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ "ํธ๋์ญ์ " ์ฃผ์ ์ ์ฐ๊ฒฐํ๊ธฐ ์ํ DirectStream ๊ฐ์ฒด๋ฅผ ๋ง๋ญ๋๋ค.
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
broker_list = 'localhost:9092'
topic = 'transaction'
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": broker_list})
JSON ํ์์ผ๋ก ๋ค์ด์ค๋ ๋ฐ์ดํฐ๋ฅผ ๊ตฌ๋ฌธ ๋ถ์ํฉ๋๋ค.
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
Spark SQL์ ์ฌ์ฉํ์ฌ ๊ฐ๋จํ ๊ทธ๋ฃนํ๋ฅผ ์ํํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ์ฝ์์ ํ์ํฉ๋๋ค.
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
์ฟผ๋ฆฌ ํ ์คํธ๋ฅผ ๊ฐ์ ธ์ Spark SQL์ ํตํด ์คํํฉ๋๋ค.
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
๊ทธ๋ฐ ๋ค์ ๊ฒฐ๊ณผ ์ง๊ณ ๋ฐ์ดํฐ๋ฅผ AWS RDS์ ํ ์ด๋ธ์ ์ ์ฅํฉ๋๋ค. ์ง๊ณ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ์ดํฐ๋ฒ ์ด์ค ํ ์ด๋ธ์ ์ ์ฅํ๊ธฐ ์ํด DataFrame ๊ฐ์ฒด์ write ๋ฉ์๋๋ฅผ ์ฌ์ฉํฉ๋๋ค.
testResultDataFrame.write
.format("jdbc")
.mode("append")
.option("driver", 'org.postgresql.Driver')
.option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
.option("dbtable", "transaction_flow")
.option("user", "habr")
.option("password", "habr12345")
.save()
AWS RDS์ ๋ํ ์ฐ๊ฒฐ ์ค์ ์ ๋ํ ๋ช ๋ง๋์ ๋๋ค. โAWS PostgreSQL ๋ฐฐํฌโ ๋จ๊ณ์์ ์ด์ ๋ํ ์ฌ์ฉ์์ ๋น๋ฐ๋ฒํธ๋ฅผ ์์ฑํ์ต๋๋ค. ์ฐ๊ฒฐ ๋ฐ ๋ณด์ ์น์ ์ ํ์๋๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์๋ฒ URL๋ก Endpoint๋ฅผ ์ฌ์ฉํด์ผ ํฉ๋๋ค.
Spark์ Kafka๋ฅผ ์ฌ๋ฐ๋ฅด๊ฒ ์ฐ๊ฒฐํ๋ ค๋ฉด ์ํฐํฉํธ๋ฅผ ์ฌ์ฉํ์ฌ smark-submit์ ํตํด ์์ ์ ์คํํด์ผ ํฉ๋๋ค. ์คํํฌ ์คํธ๋ฆฌ๋ฐ-kafka-0-8_2.11. ๋ํ PostgreSQL ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ํธ์์ฉํ๊ธฐ ์ํ ์ํฐํฉํธ๋ ์ฌ์ฉํ ๊ฒ์ด๋ฉฐ --packages๋ฅผ ํตํด ์ด๋ฅผ ์ ์กํ ๊ฒ์ ๋๋ค.
์คํฌ๋ฆฝํธ์ ์ ์ฐ์ฑ์ ์ํด ๋ฉ์์ง ์๋ฒ์ ์ด๋ฆ๊ณผ ๋ฐ์ดํฐ๋ฅผ ์์ ํ๋ ค๋ ์ฃผ์ ๋ ์ ๋ ฅ ๋งค๊ฐ๋ณ์๋ก ํฌํจํฉ๋๋ค.
์ด์ ์์คํ ๊ธฐ๋ฅ์ ์คํํ๊ณ ํ์ธํ ์ฐจ๋ก์ ๋๋ค.
spark-submit
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207
spark_job.py localhost:9092 transaction
๋ชจ๋ ๊ฒ์ด ํด๊ฒฐ๋์์ต๋๋ค! ์๋ ๊ทธ๋ฆผ์์ ๋ณผ ์ ์๋ฏ์ด, StreamingContext ๊ฐ์ฒด๋ฅผ ์์ฑํ ๋ ์ผ๊ด ์ฒ๋ฆฌ ๊ฐ๊ฒฉ์ 2์ด๋ก ์ค์ ํ๊ธฐ ๋๋ฌธ์ ์ ํ๋ฆฌ์ผ์ด์ ์ด ์คํ๋๋ ๋์ 2์ด๋ง๋ค ์๋ก์ด ์ง๊ณ ๊ฒฐ๊ณผ๊ฐ ์ถ๋ ฅ๋ฉ๋๋ค.
๋ค์์ผ๋ก, ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๊ฐ๋จํ ์ฟผ๋ฆฌ๋ฅผ ๋ง๋ค์ด ํ ์ด๋ธ์ ๋ ์ฝ๋๊ฐ ์๋์ง ํ์ธํฉ๋๋ค. transaction_flow:
๊ฒฐ๋ก
์ด ๊ธฐ์ฌ์์๋ Apache Kafka ๋ฐ PostgreSQL๊ณผ ํจ๊ป Spark Streaming์ ์ฌ์ฉํ์ฌ ์ ๋ณด ์คํธ๋ฆผ ์ฒ๋ฆฌ์ ์๋ฅผ ์ดํด๋ณด์์ต๋๋ค. ๋ค์ํ ์์ค์ ๋ฐ์ดํฐ๊ฐ ์ฆ๊ฐํจ์ ๋ฐ๋ผ ์คํธ๋ฆฌ๋ฐ ๋ฐ ์ค์๊ฐ ์ ํ๋ฆฌ์ผ์ด์ ์ ์์ ์์ด Spark Streaming์ ์ค์ง์ ์ธ ๊ฐ์น๋ฅผ ๊ณผ๋ํ๊ฐํ๊ธฐ๋ ์ด๋ ต์ต๋๋ค.
๋ด ์ ์ฅ์์์ ์ ์ฒด ์์ค ์ฝ๋๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค.
๋๋ ์ด ๊ธฐ์ฌ์ ๋ํด ํ ๋ก ํ๊ฒ ๋์ด ๊ธฐ์๊ณ , ์ฌ๋ฌ๋ถ์ ์๊ฒฌ์ ๊ธฐ๋ํ๋ฉฐ, ๋ํ ๊ด์ฌ์ ๊ฐ๊ณ ์๋ ๋ชจ๋ ๋ ์๋ค์ ๊ฑด์ค์ ์ธ ๋นํ์ ๋ฐ๋๋๋ค.
๋ ๋น์ ์๊ฒ ์ฑ๊ณต์ ๊ธฐ์ํฉ๋๋ค!
์. ์ฒ์์๋ ๋ก์ปฌ PostgreSQL ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ์ฌ์ฉํ ๊ณํ์ด์์ง๋ง AWS์ ๋ํ ์ ์ ์ ๊ณ ๋ คํ์ฌ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ํด๋ผ์ฐ๋๋ก ์ด๋ํ๊ธฐ๋ก ๊ฒฐ์ ํ์ต๋๋ค. ์ด ์ฃผ์ ์ ๋ํ ๋ค์ ๊ธฐ์ฌ์์๋ AWS Kinesis ๋ฐ AWS EMR์ ์ฌ์ฉํ์ฌ AWS์์ ์์์ ์ค๋ช
ํ ์ ์ฒด ์์คํ
์ ๊ตฌํํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ ๋๋ฆฌ๊ฒ ์ต๋๋ค. ๋ด์ค๋ฅผ ํ๋ก์ฐํ์ธ์!
์ถ์ฒ : habr.com