áááºá¹ááá¬áá« Habrá ááá±á·áá»áœááºá¯ááºááá¯á·ááẠSpark Streaming ááá¯á¡áá¯á¶ážááŒá¯á Apache Kafka áááºáá±á·ááºá»á á®ážááŒá±á¬ááºážáá»á¬ážááá¯áá¯ááºáá±á¬ááºááá·áºá áá áºáá áºáá¯áááºáá±á¬ááºááŒá®áž AWS RDS cloud áá±áá¬áá±á·á áºááœááºáá¯ááºáá±á¬ááºááŒááºážááááºáá»á¬ážááá¯áá±ážáá¬ážáá«áááºá
á¡áá»áá¯á·áá±á¬ ááááºáá áºá¡ááœá²á·á¡á ááºážáá áºáá¯ááẠáááºážáá¡ááá¯ááºážá¡áááºáá»á¬ážá¡á¬ážáá¯á¶ážááᯠâá¡áá»ááºá¡ááŒááºâ áá¯ááºáá±á¬ááºááẠáá»áœááºá¯ááºááá¯á·á¡á¬áž áá±ážáá±á¬ááºáááá·áºáá¬áááºááᯠá áááºáá°ážááŒáá·áºááŒáá«á áá¯á·á ááá¹áá¬ááá¯ááºá ááá·áºáááºáá»ááºáá»á¬áž ááá¯á·ááá¯áẠááœá±áá±ážááœá±áá°áá»á¬ážá¡ááœáẠááœá±ááŒá±ážááááºáá»á¬áž á áááºááá¯á·á¡ááœáẠááœáá·áºáááºážáá±á¬ ááœá±ááŒá±ážá¡áá±á¡áá¬ážááᯠáá»ááºááŒááºážááœááºáá»ááºááẠáááºááœááºáá»ááºá¡ááœáẠáááºážááᯠáá¯ááºáá±á¬ááºááá¯ááºáááºá
ááŸá±á¬áºááá¬ááŸáá·áº ááŸá±á¬áºá á¬áá¯á¶ážáá±á«ááºážáá»á¬ážá¡áá¯á¶ážáááŒá¯áá² á€á¡ááŸá¯ááᯠáááºááá¯á·á¡áá±á¬ááºá¡áááºáá±á¬áºááááºáááºáž - ááŒááºáá±á¬ááºááŸá¯á¡á±á¬ááºááœááºáááºáá«á ááœá¬áž!
áááá«ááºáž
ááŸááºáá«áááºá á¡áá»áááºááŸáá·áºáááŒá±ážáá® áá±áá¬á¡áá»á¬ážá¡ááŒá¬ážááᯠá á®áá¶áá±á¬ááºááœááºááŒááºážááẠáá±ááºáá®á áá áºáá»á¬ážááœáẠá¡áá¯á¶ážááŒá¯ááẠá¡ááœáá·áºá¡áááºážáá»á¬ážá áœá¬ áá±ážáá«áááºá á€á¡ááœáẠáá±áááºážá¡á á¬ážáá¯á¶áž áá±á«ááºážá ááºááŸá¯áá»á¬ážáá²á០áá áºáá¯ááẠApache Kafka ááŸáá·áº Spark Streaming á tandem ááŒá áºááŒá®ážá Kafka ááẠá¡áááºáááºáá±á·áá»áº áááºáá±á·ááºá»áá»á¬ážááᯠáááºáá®ážáá±ážáᬠSpark Streaming ááẠáááºááŸááºáá¬ážáá±á¬ á¡áá»áááºááŒá¬ážáá¬áááœáẠá¡ááá¯áá« áááºáááºáá»á¬ážááᯠáá¯ááºáá±á¬ááºáááºá
á¡ááá®áá±ážááŸááºážá á¡ááŸá¬ážáá¶ááá¯ááºáááºááᯠááá¯ážááŒáŸáá·áºáááºá¡ááœáẠá á áºáá±ážáá±ážááááºáá»á¬ážááᯠá¡áá¯á¶ážááŒá¯áá«áááºá á€ááá¹ááá¬ážááŒáá·áº Spark Streaming á¡ááºáá»ááºááẠáá»á±á¬ááºáá¯á¶ážááœá¬ážáá±á¬áá±áá¬ááᯠááŒááºáááºááá°ááẠááá¯á¡ááºáá±á¬á¡áá«á áááºážááẠáá±á¬ááºáá¯á¶ážá á áºáá±ážáá±ážááááºááá¯á· ááŒááºááœá¬ážááŒá®áž ááá¯áá±áá¬á០ááœááºáá»ááºááŸá¯áá»á¬ážááᯠááŒááºáááºáá¯ááºáá±á¬ááºáááºáᬠááá¯á¡ááºáááºá
áááá¯áá¬á áá Ạáá®ááœááºáá²á·áááºá
á¡áá¯á¶ážááŒá¯áá¬ážáá±á¬ á¡á áááºá¡ááá¯ááºážáá»á¬áž
Apache Kafka ááŒáá·áºáá±áá¬ážáá±á¬ áá¯ááºáá±-á á¬áááºážááœááºáž á á¬ááá¯áá±ážááá¯á·ááŸá¯á áá áºáá áºáá¯ááŒá áºáááºá á¡á±á¬á·ááºááá¯ááºážááŸáá·áº á¡áœááºááá¯ááºážá á¬ááᯠáá¯á¶ážá áœá²ááŸá¯ ááŸá áºáá»áá¯ážáá¯á¶ážá¡ááœáẠááá·áºáá»á±á¬áºáááºá áá±áá¬áá¯á¶ážááŸá¯á¶ážááŸá¯ááᯠáá¬ááœááºáááºá Kafka áááºáá±á·áá»áºáá»á¬ážááᯠáá áºááºáá±á«áºááœáẠááááºážáááºážááŒá®áž á¡á á¯á¡áá±ážá¡ááœááºáž áááºáá°ááŒá¯áá«áááºá Kafka á áá áºááẠZooKeeper áááºáá°ááŒá¯ááŒááºážáááºáá±á¬ááºááŸá¯áááááºááœááºáááºáá±á¬ááºáá¬ážáááºáApache Spark ááœáŸáá·áºááŒááºážá - ááá¯ááºááá¯ááºáá¯ááºááœáŸáá·áºááŸá¯áá±áá¬ááᯠáá¯ááºáá±á¬ááºáááºá¡ááœáẠSpark á¡á áááºá¡ááá¯ááºážá Spark Streaming module ááᯠáá±áá¬á á®ážááŒá±á¬ááºážá¡áá±ážá á¬áž áá±áá¬áááºáá±á·ááºá»áá»á¬ážá á ááºáááºáááŒáẠá¡á á®á¡á á¥áºá¡ááŒá Ạá¡áááá¹áá¬ááºááœáá·áºááá¯áá¬ážááá·áº micro-batch áááá¯áá¬ááᯠá¡áá¯á¶ážááŒá¯á áááºáá±á¬ááºáá¬ážáááºá Spark Streaming ááẠááá°áá®áá±á¬ á¡áááºážá¡ááŒá áºáá»á¬ážá០áá±áá¬áá»á¬ážááᯠáá°áá±á¬ááºááŒá®áž áááºážááᯠáá±ážáááºáá±á¬ áááºáá±á·ááºá»áá»á¬ážá¡ááŒá Ạáá±á«ááºážá ááºáá¬ážáááºá áááºáá±á·áá»áºá¡áá áºáá»á¬ážááᯠáá¯á¶ááŸááºá¡áá»áááºáá»á¬ážááœáẠáááºáá®ážáá«áááºá á¡áá»áááºáá¬ááá áºáá¯á á®áá¡á ááœááºá áááºáá±á·ááºá»á¡áá áºáá áºáá¯áááºáá®ážááŒá®áž ááá¯ááŒá¬ážáá¬áá¡ááœááºážáááŸááá±á¬áá±áá¬á¡á¬ážáá¯á¶ážááᯠáááºáááºááœááºááá·áºááœááºážáá¬ážáááºá ááŒá¬ážáá¬ááá¡áá¯á¶ážááœááºá áááºáááºááŒá®ážááœá¬ážááŸá¯áááºááá·áºááœá¬ážáááºá ááŒá¬ážáá¬ááá¡ááœááºá¡á á¬ážááᯠbatch interval áá¯áá±á«áºáá±á¬ á¡ááá¯ááºážá¡áá¬áá áºáá¯ááŒáá·áº áá¯á¶ážááŒááºáááºáApache Spark SQL - Spark functional programming ááŸáá·áº áááºá ááºáá±á¬ááºááœááºááŸá¯ááᯠáá±á«ááºážá ááºáá¬ážáááºá Structured data ááá¯áááºááŸá¬ ááŸááºáááºážá¡á¬ážáá¯á¶ážá¡ááœáẠááœááºáááºáá áºáá¯áááºážáá«áá±á¬ áá±áá¬ááᯠááá¯ááá¯áááºá Spark SQL ááẠá¡áá»áá¯ážáá»áá¯ážáá±á¬ááœá²á·á ááºážáá¯á¶áá±áá¬áááºážááŒá áºáá»á¬ážá០ááá·áºááœááºážááŸá¯ááᯠáá¶á·ááá¯ážáá±ážááŒá®áž schema á¡áá»ááºá¡áááºáááŸáááŸá¯ááŒá±á¬áá·áºá ááá¯á¡ááºáá±á¬ ááŸááºáááºážáá»á¬ážá á¡ááœááºáá»á¬ážááá¯áᬠáááá±á¬ááºá áœá¬ ááŒááºáááºááá°ááá¯ááºááŒá®áž DataFrame APIs áá»á¬ážááá¯áááºáž áá¶á·ááá¯ážáá±ážáá«áááºáAWS RDS á á»á±ážáááºáá¬áá±á¬ cloud-based áááºá ááºáá±áá¬áá±á·á áºá áááºáááºááŸá¯á áááºáááºááŸá¯ááŸáá·áº á¡ááá¯ááºážá¡áá¬ááᯠááá¯ážááŸááºážááœááºáá°á á±ááá·áº áááºáááºáá±á¬ááºááŸá¯ááŒá áºááŒá®áž Amazon á០ááá¯ááºááá¯ááºá á®áá¶ááá·áºááœá²áá«áááºá
Kafka áá¬áá¬ááᯠááá·áºááœááºážááŒááºážááŸáá·áº áá¯ááºáá±á¬ááºááŒááºážá
Kafka ááᯠááá¯ááºááá¯áẠá¡áá¯á¶ážáááŒá¯áá®á ááá·áºááœáẠJava ááŸáááẠááá¯á¡ááºáá±á¬ááŒá±á¬áá·áº ... JVM ááᯠá¡áá¯ááºá¡ááœáẠáá¯á¶ážáááº-
sudo apt-get update
sudo apt-get install default-jre
java -version
Kafka ááŸáá·áºá¡áá¯ááºáá¯ááºáááºá¡áá¯á¶ážááŒá¯áá°á¡áá áºááá¯áááºáá®ážááŒáá«á áá¯á·á
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
ááá¯á·áá±á¬ááºá ááá¬ážááẠApache Kafka áááºááá¯ááºá០ááŒáá·áºááŒá°ážááŸá¯ááᯠáá±á«ááºážáá¯ááºáá¯ááºáá«á
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
áá±á«ááºážáá¯ááºáá¯ááºáá¬ážáá±á¬ ááŸááºáááºážááᯠáá¯ááºááá¯ážáá«-
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
áá±á¬ááºáá áºááá·áºá ááœá±ážáá»ááºááœáá·áºáá«á á¡ááŸááºááŸá¬ áá°áááºážáááºáááºáá»á¬ážááẠApache Kafka áá áœááºážáááºá¡á¬ážáá¯á¶ážááᯠá¡ááŒáá·áºá¡áá¡áá¯á¶ážáááŒá¯ááŒááºážááŒá±á¬áá·áºááŒá áºáááºá á¥ááá¬á¡á¬ážááŒáá·áºá áá¯ááºáá±ááá¯ááºááá·áº áá±á«ááºážá ááºá á¡áá»áá¯ážá¡á á¬ážá á¡á¯ááºá á¯ááᯠáá»ááºáá«á áááºážááá¯ááŒá±á¬ááºážáá²áááºá ááœá²á·á ááºážááŸá¯áá¯á¶á á¶ááá¯ááºááᯠáááºážááŒááºááŒáá«á áá¯á·á
vim ~/kafka/config/server.properties
ááá¯ááºáá¡áá¯á¶ážááœáẠá¡á±á¬ááºáá«ááá¯á·ááᯠááá·áºáá«á
delete.topic.enable = true
Kafka áá¬áá¬ááᯠáá áááºáá®á áááºááẠZooKeeper áá¬áá¬ááᯠá áááºááẠááá¯á¡ááºáááºá áá»áœááºá¯ááºááá¯á·ááẠKafka ááŒáá·áºáá±ááŸá¯ááŸáá·áºá¡áá° áá«áá¬áá±á¬ á¡ááẠscript ááᯠá¡áá¯á¶ážááŒá¯áá«áááº-
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper á¡á±á¬ááºááŒááºá áœá¬á áááºááŒá®ážáá±á¬ááºá áá®ážááŒá¬áž terminal ááœáẠKafka áá¬áá¬ááá¯ááœáá·áºáá«-
bin/kafka-server-start.sh config/server.properties
Transaction áá¯áá±á«áºáá±á¬ á¡ááŒá±á¬ááºážá¡áá¬á¡áá áºáá áºáá¯ááᯠáááºáá®ážááŒáá«á áá¯á·á
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
ááá¯á¡ááºáá±á¬ partitions ááŸáá·áº replication á¡áá±á¡ááœááºáá»á¬ážáá«ááŸááá±á¬ áá±á«ááºážá ááºáá áºáá¯ááᯠáááºáá®ážáá¬ážááŒá±á¬ááºáž áá±áá»á¬áá«á á±á
bin/kafka-topics.sh --describe --zookeeper localhost:2181
á¡áá
áºáááºáá®ážáá¬ážáá±á¬ áá±á«ááºážá
ááºá¡ááœáẠáá¯ááºáá¯ááºáá°ááŸáá·áº á
á¬ážáá¯á¶ážáá°ááᯠá
ááºážáááºááá·áº á¡ááá¯ááºá¡ááá·áºáá»á¬ážááᯠáááºááœáŸááºááá¯ááºááŒáá«á
áá¯á·á áááºáá±á·áá»áºáá±ážááá¯á·ááŒááºážááŸáá·áº áááºáá¶ááŒááºážá¡á¬áž á
ááºážáááºááá¯ááºáá¯á¶á¡ááŒá±á¬ááºáž á¡áá±ážá
áááºá¡áá»ááºá¡áááºáá»á¬ážááᯠááá¬ážáááºá
á¬ááœááºá
á¬áááºážááœáẠáá±ážáá¬ážáá¬ážáááºá
áá¯ááºáá¯ááºáá°áá±ážáá¬ážááŒááºážá
áá¯ááºáá¯ááºáá°ááẠáá»áááºážáá±áᬠ- á áá¹ááá·áºááá¯ááºáž 100 áááºáá±á·áá»áºáá»á¬ážááᯠáá¯ááºáá±ážáááá·áºáááºá áá»áááºážáá±áá¬á¡á¬ážááŒáá·áº áá»áœááºá¯ááºááá¯á·ááẠá¡ááœááºáá¯á¶ážáá¯áá«áááºáá±á¬ á¡áááá¬ááºááᯠááá¯ááá¯áááº-
- áááºááœá²á¡ - ááááºáá áºá¡ááœá²á·á¡á ááºážááá±á¬ááºážá á»á±ážáá¡áááº;
- ááœá±ááŒá±áž - á¡áá±á¬ááºážá¡áááºááœá±ááŒá±áž;
- ááá¬á - á¡áá±á¬ááºážá¡áááºááá¬áá ááá¬áááẠáááºááŸááœá±ááŒá±ážáááºáá°ááŸá¯ááŒá áºáá«á á¡ááŒá¯ááá±á¬áá±á¬ááºáá±á¬áá¶áá«ááºááŒá áºáááºááŒá áºááŒá®ážá áááºážááẠá¡áá±á¬ááºážá¡áááºááŒá áºáá»áŸáẠá¡áá¯ááºáá¶áá«ááºááŒá áºáááºá
áá¯ááºáá¯ááºáá°á¡ááœáẠáá¯ááºááẠá€áá²á·ááá¯á· ááŒá áºáááº-
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
ááá¯á·áá±á¬ááºá áá±ážááá¯á·ááá·áºáááºážáááºážááᯠá¡áá¯á¶ážááŒá¯á áá»áœááºá¯ááºááá¯á·ááẠJSON áá±á¬áºáááºááŒáá·áº áá»áœááºá¯ááºááá¯á· ááá¯á¡ááºááá·áº á¡ááŒá±á¬ááºážá¡áá¬áá®ááá¯á· áá¬áá¬áá¶ááá¯á· áááºáá±á·áá»áºáá áºá á±á¬áẠáá±ážááá¯á·áá«áááºá
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
script ááᯠrun áá±á¬á¡áá«á terminal ááœááºá¡á±á¬ááºáá«á á¬ááá¯áá»á¬ážááá¯áá»áœááºá¯ááºááá¯á·áááºáá¶áááŸááááº-
ááá¯ááá¯áááºááŸá¬ á¡áá¬á¡á¬ážáá¯á¶ážááẠáá»áœááºá¯ááºááá¯á·á¡ááá¯ááŸáááá·áºá¡ááá¯ááºáž á¡áá¯ááºáá¯ááºááẠ- áá¯ááºáá¯ááºáá°á áá»áœááºá¯ááºááá¯á·ááá¯á¡ááºááá·áºá¡ááŒá±á¬ááºážá¡áá¬áá®ááá¯á· áááºáá±á·ááºá»áá»á¬ážáá¯ááºáá±ážááŒá®áž áá±ážááá¯á·áá«áááºá
áá±á¬ááºáá
áºááá·áºááŸá¬ Spark ááᯠááá·áºááœááºážááŒá®áž á€áááºáá±á·áá»áºá
á®ážááŒá±á¬ááºážááᯠáá¯ááºáá±á¬ááºáááºááŒá
áºáááºá
Apache Spark ááᯠááá·áºááœááºážááŒááºážá
Apache ááᯠSpark universal ááŸáá·áº high-performance cluster computing platform áá áºáá¯ááŒá áºáááºá
Spark ááẠá¡ááŒááºá¡ááŸááºáá¯á¶á·ááŒááºáá±ážááŒááºážááŸá¯áá»á¬ážááŸáá·áº ááá¯ááºááá¯ááºáá¯ááºááœáŸáá·áºááŸá¯áá¯ááºáá±á¬ááºááŒááºážá¡áá«á¡ááẠááá¯ááá¯áá»ááºááŒáá·áºáá±á¬ ááœááºáá»ááºááŸá¯á¡áá»áá¯ážá¡á á¬ážáá»á¬ážááᯠáá¶á·ááá¯ážáá±ážáá»áááºááœáẠMapReduce áá±á¬áºáááºá áá°ááŒáá¯ááºáá»á¬ážáá±á¬ á¡áá±á¬ááºá¡áááºáá±á¬áºááŸá¯áá»á¬ážááẠááá¯ááá¯áá±á¬ááºážááœááºá áœá¬ áá¯ááºáá±á¬ááºáá«áááºá ááŒááºááŸá¯ááºážááẠáááá Ạááá¯á·ááá¯áẠáá¬áá®áá»á¬ážá áœá¬ á á±á¬áá·áºááá¯ááºážá áá¬áááá¯áá² á¡ááŒááºá¡ááŸááºáá¯á¶á·ááŒááºáá¯ááºáá±á¬ááºááá¯ááºá á±ááá·áº á¡ááŒááºááŸá¯ááºážááŒá áºáá±á¬ááŒá±á¬áá·áº áá±áá¬á¡áá»á¬ážá¡ááŒá¬ážááᯠáá¯ááºáá±á¬ááºáá¬ááœáẠá¡áá±ážááŒá®ážáá±á¬á¡áááºážááá¹áá០áá«áááºáá«áááºá Spark á á¡ááŒá®ážáá¬ážáá¯á¶ážáá±á¬ á¡á¬ážáá¬áá»ááºáá áºáá¯ááŸá¬ áááºážááᯠáá»ááºááŒááºá á±ááá·áº áááºááá¯áá®á¡ááœááºáž ááœááºáá»ááºááŸá¯áá»á¬ážááᯠáá¯ááºáá±á¬ááºááá¯ááºááŒááºážááŒá áºáááºá
á€áá±á¬ááºááᯠScala ááŒáá·áºáá±ážáá¬ážáá¬ážáá±á¬ááŒá±á¬áá·áº áááºážááᯠáŠážá áœá¬ááá·áºááœááºážááẠááá¯á¡ááºáááº-
sudo apt-get install scala
Spark ááŒáá·áºáá»á®ááŒááºážááᯠááá¬ážáááºáááºááá¯ááºá០áá±á«ááºážáá¯ááºáá¯ááºáá«á
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
ááŸááºáááºážááᯠáá¯ááºááá¯ážáá«-
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
Spark ááá¯á·áááºážááŒá±á¬ááºážááᯠbash ááá¯ááºááá¯á·ááá·áºáá«-
vim ~/.bashrc
áááºážááŒááºáá°ááŸáááá·áº á¡á±á¬ááºáá«á á¬ááŒá±á¬ááºážáá»á¬ážááᯠááá·áºáá«á
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
bashrc ááᯠááŒá±á¬ááºážáá²ááŒá®ážáá±á¬áẠá¡á±á¬ááºáá« command ááᯠrun áá«á
source ~/.bashrc
AWS PostgreSQL ááᯠá¡áá¯á¶ážááŒá¯ááŒááºážá
áá»ááºááŸááá±áá±ážáááºááŸá¬ áá»áœááºá¯ááºááá¯á·ááẠáá¯ááºááœáŸáá·áºááŸá¯áá»á¬ážá០áá¯ááºáá±á¬ááºááŒá®ážáá¬áž á¡áá»ááºá¡áááºáá»á¬ážááᯠá¡ááºáá¯ááºáá¯ááºááá·áº áá±áá¬áá±á·á áºááᯠááŒáá·áºáá»ááºáááºááŒá áºáááºá á€á¡ááœáẠáá»áœááºá¯ááºááá¯á·ááẠAWS RDS áááºáá±á¬ááºááŸá¯ááᯠá¡áá¯á¶ážááŒá¯áá«áááºá
AWS ááœááºááá¯ážáẠ-> AWS RDS -> áá±áá¬áá±á·á
Ạ-> áá±áá¬áá±á·á
áºááᯠáááºáá®ážáá«-
PostgreSQL ááá¯ááœá±ážááŒá®áž Next ááá¯ááŸáááºáá«á
áá¬ááŒá
áºááá¯á·áá²ááá¯áá±á¬á· á€á¥ááá¬ááẠááá¬áá±ážááá¯ááºáᬠáááºááœááºáá»ááºá¡ááœááºáá¬ááŒá
áºáááºá áá»áœááºá¯ááºááá¯á·ááẠá¡ááá²á·áá¬áᬠâá¡áááºážáá¯á¶ážâ (á¡ááá²á·á¡ááá·áº) ááᯠá¡áá¯á¶ážááŒá¯áá«áááºá
ááá¯á·áá±á¬ááºá áá»áœááºá¯ááºááá¯á·ááẠFree Tier ááá±á¬ááºááœáẠá¡ááŸááºááŒá
áºáá
áºáá¯ááá·áºáá¬ážááŒá®ážá ááá¯á·áá±á¬ááºááœáẠáá»áœááºá¯ááºááá¯á·ááẠt2.micro á¡áááºážá¡á
á¬áž á¡á¬ážáááºážáá±á¬áºáááºáž áááºážááẠá¡ááá²á·ááŒá
áºááŒá®áž áá»áœááºá¯ááºááá¯á·ááá¯ááºáááºážá¡ááœáẠá¡ááœááºááá·áºáá»á±á¬áºáá«áááºá
áá±á¬ááºáá
áºáá¯á á¡ááœááºá¡áá±ážááŒá®ážáá²á· á¡áá¬ááœá± ááŒá
áºáá²á· áá±áá¬áá±á·á
Ạá¥ááá¬áá²á· á¡áááºá áá¬á
áᬠá¡áá¯á¶ážááŒá¯áá°áá²á· á¡áááºáá²á· áá°á·áá²á· á
áá¬ážááŸááºá á¥ááá¬- myHabrTestá áá¬á
áá¬á¡áá¯á¶ážááŒá¯áá°- habrá
áá¬ážááŸááº- habr12345 ááá¯á·áá±á¬áẠNext ááá¯ááºááᯠááŸáááºáá«á
áá±á¬ááºá á¬áá»ááºááŸá¬ááœáẠáá»áœááºá¯ááºááá¯á·ááá±áá¬áá±á·á áºáá¬áá¬á ááŒááºáááŸáááºáá±á¬ááºááá¯ááºááŸá¯ (á¡áá»á¬ážááŒááºáá°áá¯á¶ážááá¯ááºááŸá¯) ááŸáá·áº ááááºáááºážáááŸáááá¯ááºááŸá¯ááá¯á·á¡ááœáẠáá¬áááºááŸááá±á¬áá±á¬ááºáá»á¬áž ááŸááá«áááºá
ááááºáááºáž 5432 (PostgreSQL) ááŸáá
áºááá·áº áá»áœááºá¯ááºááá¯á·ááá±áá¬áá±á·á
áºáá¬áá¬ááá¯á· ááŒááºáááá¯á·áááºáá±á¬ááºááœáá·áºáá±ážááá·áº VPC áá¯á¶ááŒá¯á¶áá±ážá¡ááœá²á·á¡ááœáẠáááºáááºá¡áá
áºáá
áºáá¯ááᯠáááºáá®ážááŒáá«á
áá¯á·á
VPC Dashboard -> Security Groups -> áá¯á¶ááŒá¯á¶áá±ážá¡á¯ááºá
á¯áá»á¬áž áááºáá®ážááŒááºážááá¹áááá¯á· áá®ážááŒá¬ážááá±á¬ááºáá¬áááºážááá¯ážá០AWS ááœááºááá¯ážááºááá¯á· ááœá¬ážááŒáá«á
áá¯á·á
áá¯á¶ááŒá¯á¶áá±ážá¡ááœá²á·á¡ááœáẠá¡áááºááᯠáá»áœááºá¯ááºááá¯á·áááºááŸááºáá¬ážááẠ- PostgreSQLá áá±á¬áºááŒáá»ááºáá
áºáá¯á áááºááá·áº VPC ááẠá€á¡ááœá²á·ááŸáá·áº áááºá
ááºááá·áºáááºááᯠááœáŸááºááŒááŒá®áž áááºáá®ážááẠááá¯ááºááᯠááŸáááºáá«-
á¡á±á¬ááºáá«áá¯á¶ááœááºááŒáá¬ážááá·áºá¡ááá¯ááºáž á¡áá áºáááºáá®ážáá¬ážáá±á¬á¡á¯ááºá á¯á¡ááœáẠport 5432 á¡ááœáẠInbound á ááºážáá»ááºážáá»á¬ážááᯠááŒáá·áºá áœááºáá«á ááááºáááºážááᯠáááºááá¯ááºááá¯áẠááááºááŸááºááá¯ááºáá±á¬áºáááºáž Type drop-down list á០PostgreSQL ááᯠááœá±ážáá«á
á¡ááá¡áá»ááŒá±á¬ááá»áŸáẠáááºááá¯áž ::/0 ááá¯áááºááŸá¬ ááá¹áá¬áá
áºááŸááºážá០áá¬áá¬ááá¯á· á¡áááºá¡ááœááºáááºážááŒá±á¬ááºážááᯠáááŸáááá¯ááºááŸá¯áᯠááá¯ááá¯áááºá áááºážááẠá¡áááá¹áá«ááºá¡á¬ážááŒáá·áº áá¯á¶ážááááŸááºáá«á ááá¯á·áá±á¬áº ááá°áá¬ááᯠááœá²ááŒááºážá
áááºááŒá¬áááºá á€áá»ááºážáááºáááºážááᯠáá»áœááºá¯ááºááá¯á· á¡áá¯á¶ážááŒá¯ááœáá·áºááŒá¯ááŒáá«á
áá¯á·á
áá»áœááºá¯ááºááá¯á·ááœáẠ"á¡ááá·áºááŒáá·áºáááºáááºáá»á¬ážááá¯ááŒááºáááºáá«" ááá¯ááœáá·áºááŒá®áž VPC áá¯á¶ááŒá¯á¶áá±ážá¡ááœá²á·áá»á¬ážááá¹áááœáẠááœá±ážáá»ááºáá¬ážáá±á¬ ááá±á¬ááºáá¬á
á¬áá»ááºááŸá¬ááá¯á· ááŒááºááœá¬ážááẠ-> ááŸáááŒá®ážáá¬áž VPC áá¯á¶ááŒá¯á¶áá±ážá¡ááœá²á·áá»á¬ážááᯠááœá±ážáá»ááºáá« -> PostgreSQL-
ááá¯á·áá±á¬áẠDatabase options -> Database name -> á¡áááºááᯠáááºááŸááºáá« - habrDB.
á¡áááºááááºážááŒááºáž (á¡áááºááááºážáá¬ážáá»ááẠ- 0 áááºáá»á¬áž)á á
á±á¬áá·áºááŒáá·áºááŒááºážááŸáá·áº á
áœááºážáá±á¬ááºáááºááá¯ááºáᬠááá¯ážááœááºážááááŒááºááŸá¯áá»á¬ážááᯠáá¯á¶ááŸááºá¡á¬ážááŒáá·áº ááááºááŒááºážááŸááœá²á áá»ááºáá±á¬ááºáá»á¬ážááᯠáá»áœááºá¯ááºááá¯á· áá»ááºáá¬ážááá¯ááºáá«áááºá ááá¯ááºááá¯ááŸáááºáá«á áá±áá¬áá±á·á
áºáááºáá®ážáá«á:
ááŒáá¯ážááá¯ááºáá°
áá±á¬ááºáá¯á¶ážá¡ááá·áºááŸá¬ Kafka ááŸáá¬áá±á¬ áá±áá¬á¡áá áºáá»á¬ážááᯠááŸá áºá áá¹ááá·áºááá¯ááºáž áá¯ááºáá±á¬ááºááŒá®áž ááááºááᯠáá±áá¬áá±á·á áºáá²ááá¯á· ááá·áºááœááºážáá±ážááá·áº Spark á¡áá¯ááºá áá±á¬ááºáá¯á¶ážá¡ááá·áº ááœá¶á·ááŒáá¯ážááá¯ážáááºáá¬áááºááŒá áºáááºá
á¡áááºááœááºáá±á¬áºááŒáá²á·ááá·áºá¡ááá¯ááºážá á á áºáá±ážáá±ážááááºáá»á¬ážááẠá¡ááŸá¬ážá¡ááœááºážáá¶ááá¯ááºáááºááŸáá á±áááºá¡ááœáẠááŒááºáááºáááºááŸááºáááá·áº SparkStreaming ááœáẠá¡áááááá¹ááá¬ážáá áºáá¯ááŒá áºáááºá áá»áœááºá¯ááºááá¯á·ááẠá á áºáá±ážáá±ážááááºáá»á¬ážááᯠá¡áá¯á¶ážááŒá¯áááºááŒá áºááŒá®ážá áá¯ááºáá¯á¶ážáá¯ááºáááºážáá¡á±á¬ááºááŒááºáá«á Spark Streaming module ááẠáá±á¬ááºáá¯á¶ážá á áºáá±ážáá±ážááááºááá¯á· ááŒááºááœá¬ážáááºáᬠááá¯á¡ááºáááºááŒá áºááŒá®áž áá»á±á¬ááºáá¯á¶ážááœá¬ážáá±á¬áá±áá¬ááᯠááŒááºáááºááá°áááºá¡ááœáẠáááºážá០ááœááºáá»ááºááŸá¯áá»á¬ážááᯠááŒááºáááºáá¯ááºáá±á¬ááºááẠááá¯á¡ááºáááºááŒá áºáááºá
á á áºáá±ážáá±ážááááºá¡áá»ááºá¡áááºááᯠááááºážáááºážáá¬ážááá·áº á¡ááŸá¬áž-áá¶ááá¯ááºáááºááŸááá±á¬á áá¯á¶ááŒááºá áááºáá»ááá±á¬ ááá¯ááºá áá Ạ(HDFSá S3 á áááºááŒáá·áº) ááœáẠáááºážááŒá±á¬ááºážáá áºáᯠáááºááŸááºááŒááºážááŒáá·áº á á áºáá±ážááŒááºážááᯠááœáá·áºááá¯ááºáááºá á¥ááá¬á¡á¬ážááŒáá·áºá
streamingContext.checkpoint(checkpointDirectory)
áá»áœááºá¯ááºááá¯á·áá¥ááá¬ááœááºá áá»áœááºá¯ááºááá¯á·ááẠá¡á±á¬ááºáá«áá»ááºážáááºáááºážááᯠá¡áá¯á¶ážááŒá¯áááºááŒá áºááŒá®ážá á¡ááá¡áá»ááŒá±á¬ááá»áŸáẠcheckpointDirectory ááŸááá«áá ááá¯á·áá±á¬áẠá¡ááŒá±á¬ááºážá¡áá¬ááᯠá á áºáá±ážáá±ážááááºáá±áá¬á០ááŒááºáááºáááºáá®ážáááºááŒá áºáááºá áááºážááœáŸááºáá»ááºáááŸááá«á (ááá¯ááá¯áááºááŸá¬ áááá¡ááŒáááºáá¯ááºáá±á¬ááºáá²á·áááº)á ááá¯á·áá±á¬áẠáááºá ááºá¡ááŒá±á¬ááºážá¡áá¬á¡áá áºáá áºáá¯áááºáá®ážáááºááŸáá·áº DStreams ááᯠconfigure áá¯ááºááẠfunctionToCreateContext ááᯠáá±á«áºáááº-
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
KafkaUtils á á¬ááŒáá·áºááá¯ááºá createDirectStream áááºážáááºážááᯠá¡áá¯á¶ážááŒá¯á "á¡áá±á¬ááºážá¡áááº" áá±á«ááºážá ááºááŸáá·áº áá»áááºáááºááẠDirectStream á¡áá¬ááá¹áá¯ááᯠáááºáá®ážáááº-
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
broker_list = 'localhost:9092'
topic = 'transaction'
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": broker_list})
JSON áá±á¬áºáááºááŒáá·áº áááºáá¬áá±á¬áá±áá¬ááᯠááá¯ááºážááŒá¬ážá áááºááŒá¬ááŒááºáž-
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
Spark SQL ááá¯á¡áá¯á¶ážááŒá¯á áá»áœááºá¯ááºááá¯á·ááẠááá¯ážááŸááºážáá±á¬á¡á¯ááºá á¯ááœá²á·áᬠconsole ááœááºááááºááá¯ááŒááááº-
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Query á á¬áá¬ážááᯠááá°ááŒá®áž Spark SQL ááŸáááá·áº áá¯ááºáá±á¬ááºááŒááºáž-
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
ááá¯á·áá±á¬áẠáá»áœááºá¯ááºááá¯á·ááẠAWS RDS ááŸá ááá¬ážáá áºáá¯ááœáẠáááŸááá¬áá±á¬ á á¯á ááºážáá¬ážáá±á¬áá±áá¬ááᯠááááºážáááºážáá«áááºá á á¯á ááºážááŸá¯ááááºáá»á¬ážááᯠáá±áá¬áá±á·á áºááá¬ážáá áºáá¯ááœáẠááááºážáááºážáááºá DataFrame á¡áá¬ááá¹áá¯á áá±ážáááºážááᯠá¡áá¯á¶ážááŒá¯áá«áááºá
testResultDataFrame.write
.format("jdbc")
.mode("append")
.option("driver", 'org.postgresql.Driver')
.option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
.option("dbtable", "transaction_flow")
.option("user", "habr")
.option("password", "habr12345")
.save()
AWS RDS ááá¯á· áá»áááºáááºááŸá¯ á áá áºááá·áºááœááºážááŒááºážááá¯ááºáᬠá áá¬ážáá¯á¶ážá¡áá»áá¯á·á âAWS PostgreSQL ááᯠDeployingâ á¡ááá·áºááœáẠáááºážá¡ááœáẠá¡áá¯á¶ážááŒá¯áá°ááŸáá·áº á áá¬ážááŸááºááᯠáááºáá®ážáá²á·áááºá áá»áááºáááºááŸá¯ááŸáá·áº áá¯á¶ááŒá¯á¶áá±ážááá¹áááœáẠááŒááá¬ážááá·áº áá±áá¬áá±á·á áºáá¬áᬠurl á¡ááŒá ẠEndpoint ááᯠáááºá¡áá¯á¶ážááŒá¯ááá·áºáááº-
Spark ááŸáá·áº Kafka ááᯠááŸááºáááºá áœá¬ áá»áááºáááºááá¯ááºáááºá artifact ááᯠá¡áá¯á¶ážááŒá¯á smark-submit ááŸáá áºááá·áº á¡áá¯ááºááᯠáá¯ááºáá±á¬ááºááá·áºááẠspark-streaming-kafka-0-8_2.11. ááá¯á·á¡ááŒááºá áá»áœááºá¯ááºááá¯á·ááẠPostgreSQL áá±áá¬áá±á·á áºááŸáá·áº á¡ááŒááºá¡ááŸááºáá¯á¶á·ááŒááºáááºá¡ááœáẠááŸá±ážáá±á¬ááºážáá á¹á ááºážáá áºáá¯ááá¯áááºáž á¡áá¯á¶ážááŒá¯áááºááŒá áºáááºá áááºážááá¯á·ááᯠ--packages ááŸáá áºááá·áº ááœáŸá²ááŒá±á¬ááºážáá±ážáááºááŒá áºáááºá
Script á ááŒá±á¬ááºážááœááºááŒááºááœááºááŸá¯á¡ááœááºá áá»áœááºá¯ááºááá¯á·ááẠáááºáá±á·áá»áºáá¬áá¬á á¡áááºááŸáá·áº áá±áá¬áááºáá¶ááá°ááá¯ááá·áº á¡ááŒá±á¬ááºážá¡áá¬ááᯠááá·áºááœááºážááá·áº ááá·áºáááºáá»ááºáá»á¬ážá¡ááŒá Ạáá«áááºáááºááŒá áºáááºá
ááá¯á·ááŒá±á¬áá·áºá á áá áºááá¯ááºáá±á¬ááºááá¯ááºá áœááºážááᯠá áááºá á áºáá±ážááẠá¡áá»áááºáá±á¬ááºááŒá®-
spark-submit
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207
spark_job.py localhost:9092 transaction
á¡á¬ážáá¯á¶ážá¡áááºááŒá±ááœá¬ážáá«ááŒá®á á¡á±á¬ááºáá±á¬áºááŒáá«áá¯á¶ááœáẠáááºááŒááºáááá·áºá¡ááá¯ááºážá á¡ááá®áá±ážááŸááºážááá¯áááºáááºáá±áá»áááºááœááºá áá»áœááºá¯ááºááá¯á·ááẠStreamingContext á¡áá¬ááá¹áá¯ááá¯áááºáá®ážáá±á¬á¡áá«ááœáẠáá»áœááºá¯ááºááá¯á·ááẠá¡ááœá²ááá¯ááºááŒá¬ážáá¬áááᯠ2 á áá¹ááá·áºá¡ááŒá áºáááºááŸááºáá¬ážáá±á¬ááŒá±á¬áá·áº áá±á«ááºážá ááºážááŸá¯ááááºá¡áá áºáá»á¬áž 2 á áá¹ááá·áºááá¯ááºážááœááºááŸááá¬áá«áááºá
ááá¯á·áá±á¬ááºá áá»áœááºá¯ááºááá¯á·ááẠááá¬ážááŸá ááŸááºáááºážáá»á¬áž ááŸááá±ááŒááºážááᯠá á áºáá±ážááẠáá±áá¬áá±á·á áºááá¯á· ááá¯ážááŸááºážáá±á¬ á á¯á¶á ááºážááŸá¯áá áºáᯠááŒá¯áá¯ááºáááºá ááœá±áá±ážááœá±áá°_á á®ážáááºážááŸá¯:
áá±á¬ááºáá»ááº
á€áá±á¬ááºážáá«ážááẠApache Kafka ááŸáá·áº PostgreSQL ááá¯á·ááŸáá·áº ááœá²ááẠSpark Streaming ááᯠá¡áá¯á¶ážááŒá¯á ááááºážá¡áá»ááºá¡ááẠááá¯ááºááá¯ááºáá¯ááºááœáŸáá·áºááŒááºážá ááá°áá¬ááᯠáá±á·áá¬áá²á·áááºá áááºážááŒá áºá¡áá»áá¯ážáá»áá¯ážá០áá±áá¬áá»á¬áž ááá¯ážááœá¬ážáá¬áááºááŸáá·áºá¡áá»áŸ streaming ááŸáá·áº real-time applications áá»á¬ážáááºáá®ážáááºá¡ááœáẠSpark Streaming á áááºááœá±á·áá»áá±á¬áááºááá¯ážááᯠáá»á±á¬áºááœááºááá·áºááŸááºážááẠáááºáá²áááºá
áá»áœááºá¯ááºá repository ááœáẠá¡áááºážá¡ááŒá
áºáá¯ááºá¡ááŒáá·áºá¡á
á¯á¶ááᯠáááºááŸá¬ááœá±ááá¯ááºáá«áááºá
áá®áá±á¬ááºážáá«ážááᯠááœá±ážááœá±ážááᬠáááºážáá¬áá«áááºá áááºážáá²á· ááŸááºáá»ááºááœá±ááᯠá á±á¬áá·áºáá»áŸá±á¬áºáá±ááŒá®áž ááá¯á áá¯ááºáááºáá²á· á á¬áááºáá°á¡á¬ážáá¯á¶ážáá²á· á¡ááŒá¯ááá±á¬áá±á¬ááºáá²á· áá±áááºááŸá¯ááœá±ááᯠáá»áŸá±á¬áºááá·áºáá«áááºá
á¡á±á¬ááºááŒááºáá«á á±ááá¯á· áá¯áá±á¬ááºážáá«áááºá
áᬠá¡á
ááá¯ááºážááŸá¬áá±á¬á· áá±áááœááºáž PostgreSQL áá±áá¬áá±á·á
áºááᯠá¡áá¯á¶ážááŒá¯ááá¯á· á
á®á
ááºáá²á·áá±ááá·áº AWS ááᯠáá»á
áºááŒááºááá¯ážáá¬ááŒá±á¬áá·áº áá±áá¬áá±á·á
áºááᯠcloud ááá¯á· ááœáŸá±á·ááá¯á· áá¯á¶ážááŒááºáá²á·áá«áááºá á€á¡ááŒá±á¬ááºážá¡áá¬á¡ááœáẠáá±á¬ááºáá±á¬ááºážáá«ážááœááºá AWS Kinesis ááŸáá·áº AWS EMR ááá¯á¡áá¯á¶ážááŒá¯á á¡áááºááœááºáá±á¬áºááŒáá¬ážáá±á¬ AWS ááœááºáá±á¬áºááŒáá¬ážáá±á¬ á
áá
áºáá
áºáá¯áá¯á¶ážááᯠáááºááá¯á·á¡áá±á¬ááºá¡áááºáá±á¬áºááááºááᯠáá»áœááºá¯ááºááŒááá«áááºá ááááºážááᯠááá¯ááºáá¬áá«á
source: www.habr.com