Hello, Habr! Today we will build a system that uses Spark Streaming to process Apache Kafka message streams and writes the processing results to an AWS RDS cloud database.
Imagine that a certain credit institution sets us the task of processing incoming transactions "on the fly" across all of its branches. This could be done, for example, in order to quickly calculate the treasury's open currency position, transaction limits, or financial results.
How to implement this case without magic and magic spells — read under the cut! Go!
Introduction
Of course, processing large volumes of data in real time offers ample opportunities for use in modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming: Kafka creates a stream of incoming message packets, and Spark Streaming processes these packets at a specified time interval.
To increase the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume its computations from there.
Architecture of the system being developed
Components used:
- Apache Kafka is a distributed publish/subscribe messaging system, suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. The Kafka system is built on top of the ZooKeeper synchronization service.
- Apache Spark Streaming is the Spark component for processing streaming data. The Spark Streaming module is built on a micro-batch architecture, in which the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small packets, which are created at regular intervals: at the start of each time interval a new packet is created, and any data received during that interval is added to it; at the end of the interval the packet stops growing. The size of the interval is determined by a parameter called the batch interval.
- Apache Spark SQL combines relational processing with Spark functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, thanks to the availability of schema information, can efficiently retrieve only the required fields of records; it also provides the DataFrame API.
- AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation, and scaling, and is administered directly by Amazon.
Installing and running the Kafka server
Before working with Kafka directly, you need to make sure that Java is installed, since the JVM is used for its operation:
sudo apt-get update
sudo apt-get install default-jre
java -version
Let's create a new user to work with Kafka:
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
Next, download the distribution from the official Apache Kafka website:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
Unpack the downloaded archive:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
The next step is optional. The point is that the default settings do not let you use all of Apache Kafka's features in full. For example, you cannot delete a topic, category, or group to which messages can be published. To change this, let's edit the configuration file:
vim ~/kafka/config/server.properties
Add the following to the end of the file:
delete.topic.enable = true
Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that comes with the Kafka distribution:
cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
After ZooKeeper has started successfully, launch the Kafka server in a separate terminal:
bin/kafka-server-start.sh config/server.properties
Let's create a new topic called "transaction":
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
Make sure that a topic with the required number of partitions and replication has been created:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
We will skip testing the producer and consumer for the newly created topic; the details of how to test sending and receiving messages are described in the official documentation.
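For a quick manual check, the console scripts bundled with the Kafka distribution can be used, for example:
# In one terminal: write test messages to the topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic transaction
# In another terminal: read them back from the beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic transaction --from-beginning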
Writing the producer
The producer will generate random data (100 messages per second). By random data we mean a dictionary consisting of three fields:
- branch — the name of the credit institution's point of sale;
- currency — the transaction currency;
- amount — the transaction amount. The amount is a positive number if it is a purchase of currency by the bank, and a negative number if it is a sale.
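For example, a single generated message could look like this (the values are illustrative):
{"branch": "Kazan", "currency": "RUB", "amount": 42}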
The producer code looks like this:
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
Then, using the send method, we send a message in JSON format to the required topic on the server:
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)
    print('--> The message has been sent to a topic: {}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))
finally:
    producer.flush()
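The snippet above sends a single message. To reach the rate of 100 messages per second mentioned earlier, the send can be wrapped in a loop — a minimal sketch, where the sleep interval is an assumption:
from time import sleep

while True:
    data = get_random_value()
    producer.send(topic=my_topic, value=data)
    sleep(0.01)  # roughly 100 messages per second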
When you run the script, the following messages appear in the terminal:
This means that everything works as we wanted: the producer generates messages and sends them to the topic we need.
The next step is to install Spark and process this message stream.
Installing Apache Spark
Apache Spark is a universal, high-performance cluster computing platform.
Spark delivers better performance than typical implementations of the MapReduce model, while supporting a wider range of computation types, including interactive queries and stream processing. Speed plays an important role when processing large amounts of data, since it is speed that lets you work interactively instead of waiting minutes or hours. One of Spark's greatest strengths, and what makes it so fast, is its ability to perform computations in memory.
The framework is written in Scala, so Scala needs to be installed first:
sudo apt-get install scala
Download the Spark distribution from the official website:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
Unpack the archive:
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
Add the Spark path to the bash profile:
vim ~/.bashrc
Add the following lines with the editor:
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
After making the changes to bashrc, run:
source ~/.bashrc
Deploying AWS PostgreSQL
All that remains is to deploy the database into which we will upload the processed information from the streams. We will use the AWS RDS service for this.
Go to the AWS console -> AWS RDS -> Databases -> Create database:
Select PostgreSQL and click Next:
Since this example is purely educational, we will use a free server, at the bare minimum (Free Tier):
Next, check the Free Tier box; after that, a t2.micro class instance will be offered automatically. It is weak, but it is free and quite suitable for our task. Then come some very important items: the name of the database instance, the name of the master user, and the password. Let's name the instance myHabrTest, the master user habr, and the password habr12345, and click the Next button:
The next page contains the parameters that control the accessibility of the database server from outside (Public accessibility) and port availability:
Let's create a new setting for the VPC security group that will allow external access to the database server via port 5432 (PostgreSQL):
In a separate browser window, go to the AWS console, to the VPC Dashboard -> Security Groups -> Create security group section:
Set the name of the security group — PostgreSQL, add a description, specify which VPC this group should be associated with, and click the Create button:
Fill in the inbound rule for port 5432 for the newly created group, as shown in the picture below. You do not have to specify the port manually; just select PostgreSQL from the Type drop-down list.
Strictly speaking, the value ::/0 means that incoming traffic to the server is allowed from anywhere in the world, which canonically is not entirely correct, but for the purposes of this example let's allow ourselves this approach:
Return to the browser page where "Configure advanced settings" is open and select, in the VPC security groups section -> Choose existing VPC security groups -> PostgreSQL:
Next, in Database options -> Database name -> set the name — habrDB.
The remaining parameters, apart from disabling backups (backup retention period — 0 days), monitoring, and Performance Insights, can be left at their defaults. Click the Create database button:
Stream handler
The final stage is the development of the Spark job, which will process the new data coming from Kafka every 2 seconds and put the result into the database.
As noted above, checkpoints are a core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints: if the procedure fails, the Spark Streaming module only has to return to the last checkpoint and resume its calculations from there in order to recover the lost data.
Checkpointing is enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS or S3) where the checkpoint information will be stored. This is done, for example, with:
streamingContext.checkpoint(checkpointDirectory)
In our example we use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (that is, this is the first run), functionToCreateContext is called to create a new context and configure the DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
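For completeness, here is a minimal sketch of what functionToCreateContext could look like in this setup; its body simply mirrors the stream construction shown below and is an assumption rather than the original code:
def functionToCreateContext():
    # Create a fresh context and register the checkpoint directory
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint(checkpointDirectory)
    # ... build the DStreams here (see createDirectStream below) ...
    return ssc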
We create a DirectStream object connected to the "transaction" topic using the createDirectStream method of the KafkaUtils library:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# conf was not defined in the original snippet; a minimal SparkConf is assumed
conf = SparkConf().setAppName("kafka-transaction-stream")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                  [topic],
                                                  {"metadata.broker.list": broker_list})
Parse the data received in JSON format:
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
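For context, here is a minimal sketch of how this mapping might be wired to the stream; the process_batch name and the json.loads step are assumptions, since createDirectStream delivers each record as a (key, value) pair whose value is the JSON string produced earlier:
import json
from pyspark.sql import Row, SparkSession

def process_batch(rdd):
    if rdd.isEmpty():
        return
    # Deserialize the JSON payload of every Kafka record in this micro-batch
    parsed = rdd.map(lambda kv: json.loads(kv[1]))
    rowRdd = parsed.map(lambda w: Row(branch=w['branch'],
                                      currency=w['currency'],
                                      amount=w['amount']))
    spark = SparkSession.builder.getOrCreate()
    testDataFrame = spark.createDataFrame(rowRdd)
    testDataFrame.createOrReplaceTempView("treasury_stream")

# Invoke the handler once per batch interval
directKafkaStream.foreachRDD(process_batch)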
Using Spark SQL, we perform a simple grouping and output the result to the console:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
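The get_sql_query() call used in the next snippet is assumed to simply return the query text above as a string, for example:
def get_sql_query():
    # Returns the aggregation query shown above as a plain string
    return """
        select from_unixtime(unix_timestamp()) as curr_time,
               t.branch as branch_name,
               t.currency as currency_code,
               sum(amount) as batch_value
        from treasury_stream t
        group by t.branch, t.currency
    """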
We get the query text and run it through Spark SQL:
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
Then we save the resulting aggregated data to a table in AWS RDS. To save the aggregation results to the database table, we use the write method of the DataFrame object:
testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()
A few words about setting up the connection to AWS RDS: we created the user and the password at the "Deploying AWS PostgreSQL" step. As the database server URL you should use the Endpoint, which is displayed in the Connectivity & security section.
To connect Spark and Kafka correctly, the job should be run via spark-submit using the artifact spark-streaming-kafka-0-8_2.11. We also pass an artifact for interacting with the PostgreSQL database; both are passed via --packages.
For flexibility, the script also takes the name of the message server and the topic from which we want to receive data as input parameters.
So, it is time to launch the system and check how it works:
spark-submit \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
    spark_job.py localhost:9092 transaction
Everything worked out! As you can see in the picture below, new aggregation results are output every 2 seconds while the application is running, because we set the batch interval to 2 seconds when creating the StreamingContext object.
Next, we run a simple query against the database to check for the presence of records in the transaction_flow table:
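For example, a query along these lines could be used (the column names follow the aggregation above):
select curr_time, branch_name, currency_code, batch_value
from transaction_flow
order by curr_time desc
limit 10;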
Conclusion
In this article we looked at an example of stream processing of information using Spark Streaming together with Apache Kafka and PostgreSQL. With data from various sources constantly growing, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.
The complete source code can be found in my repository.
I am happy to discuss this article, I look forward to your comments, and I also hope for constructive criticism from all caring readers.
I wish you success!
P.S. Initially I planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the whole system described above in AWS using AWS Kinesis and AWS EMR. Follow the news!
Source: habr.com