ืฉืืื, ืืืจ! ืืืื ื ืื ื ืืขืจืืช ืฉืชืขืื ืืช ืืจืื ืืืืืขืืช ืฉื Apache Kafka ืืืืฆืขืืช Spark Streaming ืืชืืชืื ืืช ืชืืฆืืืช ืืขืืืื ืืืกื ืื ืชืื ืื ืืขื ื AWS RDS.
ืืืื ื ืืืืื ืฉืืืกื ืืฉืจืื ืืกืืื ืืืื ืขืืื ื ืืช ืืืฉืืื ืฉื ืขืืืื ืขืกืงืืืช ื ืื ืกืืช "ืืชื ืืขื" ืืื ืกื ืืคืื. ื ืืชื ืืขืฉืืช ืืืช ืืฆืืจื ืืืฉืื ืืืืื ืฉื ืคืืืืฆืืืช ืืืืข ืคืชืืื ืืืืฆืจ, ืืืืืืช ืื ืชืืฆืืืช ืคืื ื ืกืืืช ืืขืกืงืืืช ืืื'.
ืืื ืืืืฉื ืืช ืืืงืจื ืืื ืืื ืฉืืืืฉ ืืืืฉืืคื ืงืกืืื ืืืฉืคืื - ืงืจื ืืชืืช ืืืืืจื! ืืืืช!
ืืืื
ืืืืื, ืขืืืื ืืืืช ืืืืื ืฉื ื ืชืื ืื ืืืื ืืืช ืืกืคืง ืืืืื ืืืืช ืจืืืช ืืฉืืืืฉ ืืืขืจืืืช ืืืืจื ืืืช. ืืื ืืฉืืืืืื ืืคืืคืืืจืืื ืืืืชืจ ืืื ืืื ืืื ืื ืฉื Apache Kafka ื-Spark Streaming, ืฉืื ืงืคืงื ืืืฆืจ ืืจื ืฉื ืืืืืืช ืืืืขืืช ื ืื ืกืืช, ื-Spark Streaming ืืขืื ืื ืืช ืืื ืืืจืืื ืืื ื ืชืื.
ืืื ืืืืืืจ ืืช ืกืืืืืช ืืชืงืืืช ืฉื ืืืคืืืงืฆืื, ื ืฉืชืืฉ ืื ืงืืืืช ืืืงืืจืช. ืขื ืืื ืื ืื ืืื, ืืืฉืจ ืื ืืข ื-Spark Streaming ืฆืจืื ืืฉืืืจ ื ืชืื ืื ืฉืืืื, ืืื ืจืง ืฆืจืื ืืืืืจ ืื ืงืืืช ืืืืกืื ืืืืจืื ื ืืืืืฉ ืืช ืืืืฉืืืื ืืฉื.
ืืจืืืืงืืืจื ืฉื ืืืขืจืืช ืฉืคืืชืื
ืจืืืืื ืืฉืืืืฉ:
ืืคืืฆ'ื ืงืคืงื ืืื ืืขืจืืช ืืืืืจืช ืืคืจืกืื-ืืจืฉืื ืืืืืขืืช. ืืชืืื ืื ืืฆืจืืืช ืืืืขืืช ืืืฆื ืื ืืงืืื ืืื ืืฆืจืืืช ืืืืขืืช ืืงืืื ืช. ืืื ืืื ืืข ืืืืื ื ืชืื ืื, ืืืืขืืช ืงืคืงื ืืืืืกื ืืช ืืืืกืง ืืืฉืืืคืืืช ืืชืื ืืืฉืืื. ืืขืจืืช ืงืคืงื ืื ืืื ืขื ืืื ืฉืืจืืช ืืกื ืืจืื ZooKeeper;ืืืจืืช ืืคืืฆ'ื ืกืคืืจืง - ืจืืื Spark ืืขืืืื ื ืชืื ืื ืืืจืืื. ืืืืื Spark Streaming ืื ืื ืืืืฆืขืืช ืืจืืืืงืืืจืช ืืืงืจื-ืืฆืืื, ืฉืื ืืจื ืื ืชืื ืื ืืชืคืจืฉ ืืจืฆืฃ ืจืฆืืฃ ืฉื ืื ืืช ื ืชืื ืื ืงืื ืืช. Spark Streaming ืืืงื ื ืชืื ืื ืืืงืืจืืช ืฉืื ืื ืืืฉืื ืืืชื ืืืืืืืช ืงืื ืืช. ืืืืืืช ืืืฉืืช ื ืืฆืจืืช ืืืจืืืื ืืื ืงืืืขืื. ืืชืืืืช ืื ืืจืืื ืืื, ื ืืฆืจืช ืืืืื ืืืฉื, ืืื ืื ืชืื ืื ืืืชืงืืืื ืืืืื ืืืชื ืืจืืื ืืื ื ืืืืื ืืืืืื. ืืชืื ืืืจืืื, ืฆืืืืช ืืืืืื ื ืขืฆืจืช. ืืืื ืืืจืืื ื ืงืืข ืขื ืืื ืคืจืืืจ ืื ืงืจื ืืจืืื ืืฆืืื;Apache Spark SQL - ืืฉืื ืขืืืื ืืืกื ืขื ืชืื ืืช ืคืื ืงืฆืืื ืื ืฉื Spark. ื ืชืื ืื ืืืื ืื ืคืืจืืฉื ื ืชืื ืื ืฉืืฉ ืืื ืกืืืื, ืืืืืจ ืงืืืฆื ืืืช ืฉื ืฉืืืช ืขืืืจ ืื ืืจืฉืืืืช. Spark SQL ืชืืื ืืงืื ืืืืืื ืืงืืจืืช ื ืชืื ืื ืืืื ืื, ืืืืืืช ืืืืื ืืช ืฉื ืืืืข ืกืืืื, ืืื ืืืื ืืืืืจ ืืืขืืืืช ืจืง ืืช ืฉืืืช ืืจืฉืืืืช ืื ืืจืฉืื, ืืื ืืกืคืง ืืืฉืงื DataFrame API;AWS RDS ืืื ืืกื ื ืชืื ืื ืืืกื ืืืืกืก ืขื ื ืืื ืืืกืืช, ืฉืืจืืช ืืื ืืจื ื ืืืคืฉื ืืืืจื, ืชืคืขืื ืืงื ื ืืืื, ืืื ืืื ืืฉืืจืืช ืขื ืืื ืืืืื.
ืืชืงื ื ืืืจืฆื ืฉื ืฉืจืช ืงืคืงื
ืืคื ื ืืฉืืืืฉ ืืฉืืจืืช ืืงืคืงื, ืขืืื ืืืืื ืฉืืฉ ืื Java, ืื... JVM ืืฉืืฉ ืืขืืืื:
sudo apt-get update
sudo apt-get install default-jre
java -version
ืืืื ื ืืฆืืจ ืืฉืชืืฉ ืืืฉ ืฉืืขืืื ืขื ืงืคืงื:
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
ืืืืจ ืืื, ืืืจื ืืช ืืืคืฆื ืืืืชืจ ืืจืฉืื ืฉื Apache Kafka:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
ืคืจืง ืืช ืืืจืืืื ืฉืืืจื:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
ืืฉืื ืืื ืืื ืืืคืฆืืื ืื. ืืขืืืื ืืื ืฉืืืืจืืช ืืจืืจืช ืืืืื ืืื ื ืืืคืฉืจืืช ืื ืืืฉืชืืฉ ืืืืืื ืืื ืืชืืื ืืช ืฉื Apache Kafka. ืืืืืื, ืืืง ื ืืฉื, ืงืืืืจืื, ืงืืืฆื ืฉืืืื ื ืืชื ืืคืจืกื ืืืืขืืช. ืืื ืืฉื ืืช ืืืช, ืืืื ื ืขืจืื ืืช ืงืืืฅ ืืชืฆืืจื:
vim ~/kafka/config/server.properties
ืืืกืฃ ืืช ืืืืจืื ืืืืื ืืกืืฃ ืืงืืืฅ:
delete.topic.enable = true
ืืคื ื ืืคืขืืช ืฉืจืช Kafka, ืขืืื ืืืคืขืื ืืช ืฉืจืช ZooKeeper; ืื ื ื ืฉืชืืฉ ืืกืงืจืืคื ืืขืืจ ืฉืืืืข ืขื ืืคืฆืช Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ืืืืจ ืืคืขืืช ZooKeeper ืืืฆืืื, ืืคืขื ืืช ืฉืจืช Kafka ืืืกืืฃ ื ืคืจื:
bin/kafka-server-start.sh config/server.properties
ืืืื ื ืืฆืืจ ื ืืฉื ืืืฉ ืืฉื ืขืกืงื:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
ืืืื ื ืืืื ืฉื ืืฆืจ ื ืืฉื ืขื ืืืกืคืจ ืื ืืจืฉ ืฉื ืืืืฆืืช ืืฉืืคืื:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
ืืืื ื ืชืืขืืข ืืจืืขืื ืฉื ืืืืงืช ืืืฆืจื ืืืฆืจืื ืืืื ืื ืืฉื ืืืืฉ ืฉื ืืฆืจ. ืคืจืืื ื ืืกืคืื ืขื ืืืคื ืืืืืงื ืฉื ืฉืืืืช ืืงืืืช ืืืืขืืช ืืชืืืื ืืชืืขืื ืืจืฉืื -
ืืืชื ืืคืืง
ืืืคืืง ืืคืืง ื ืชืื ืื ืืงืจืืืื - 100 ืืืืขืืช ืืื ืฉื ืืื. ืื ืชืื ืื ืืงืจืืืื ืื ื ืืชืืืื ืื ืืืืืื ืืืืจืื ืืฉืืืฉื ืฉืืืช:
- ืกื ืืฃ - ืฉื ื ืงืืืช ืืืืืจื ืฉื ืืืกื ืืืฉืจืื;
- ืึทืึฐืึผึตืขึท - ืืืืข ืขืกืงื;
- ืืืืช - ืกืืื ืืขืกืงื. ืืกืืื ืืืื ืืกืคืจ ืืืืื ืื ืืืืืจ ืืจืืืฉืช ืืืืข ืขื ืืื ืืื ืง, ืืืกืคืจ ืฉืืืื ืื ืืืืืจ ืืืืืจื.
ืืงืื ืฉื ืืืคืืง ื ืจืื ืื:
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
ืืืืจ ืืื, ืืืืฆืขืืช ืฉืืืช ืืฉืืืื, ืื ื ืฉืืืืื ืืืืขื ืืฉืจืช, ืื ืืฉื ืฉืื ื ืฆืจืืืื, ืืคืืจืื JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
ืืขืช ืืคืขืืช ืืกืงืจืืคื, ืื ื ืืงืืืื ืืช ืืืืืขืืช ืืืืืช ืืืจืืื ื:
ืื ืืืืจ ืฉืืื ืขืืื ืืื ืฉืจืฆืื ื โ ืืืคืืง ืืืืฆืจ ืืฉืืื ืืกืจืื ืื ืืฉื ืฉืื ืื ื ืฆืจืืืื.
ืืฉืื ืืื ืืื ืืชืงื ืช Spark ืืขืืืื ืืจื ืืืืืขืืช ืืื.
ืืชืงื ืช Apache Spark
ืืคืืฆ 'ื ืกืคืืจืง ืืื ืคืืืคืืจืืช ืืืฉืื ืืฉืืืืืช ืืื ืืืจืกืืืช ืืืขืืช ืืืฆืืขืื ืืืืืื.
Spark ืืชืคืงื ืืื ืืืชืจ ืืืืืขืืช ืคืืคืืืจืืืช ืฉื ืืืื MapReduce ืชืื ืชืืืื ืืืืืื ืจืื ืืืชืจ ืฉื ืกืืื ืืืฉืื, ืืืื ืฉืืืืชืืช ืืื ืืจืืงืืืืืืช ืืขืืืื ืืจื. ืืืืืจืืช ืืฉ ืชืคืงืื ืืฉืื ืืขืืืื ืืืืืืช ืืืืืืช ืฉื ื ืชืื ืื, ืฉืื ืืืืืจืืช ืืื ืืืืคืฉืจืช ืื ืืขืืื ืืืืคื ืืื ืืจืืงืืืื ืืืื ืืืืื ืืงืืช ืื ืฉืขืืช ืืืชื ื. ืืืช ืืืชืจืื ืืช ืืืืืืื ืฉื Spark ืฉืืืคืืช ืืืชื ืืืืืจ ืื ืื ืืื ืืืืืืช ืฉืื ืืืฆืข ืืืฉืืืื ืืืืืจืื.
ืืืกืืจืช ืืื ืืชืืื ื-Scala, ืื ืชืืืื ืขืืื ืืืชืงืื ืืืชื:
sudo apt-get install scala
ืืืจื ืืช ืืคืฆืช Spark ืืืืชืจ ืืจืฉืื:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
ืคืจืง ืืช ืืืจืืืื:
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
ืืืกืฃ ืืช ืื ืชืื ื-Spark ืืงืืืฅ bash:
vim ~/.bashrc
ืืืกืฃ ืืช ืืฉืืจืืช ืืืืืช ืืจื ืืขืืจื:
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
ืืคืขื ืืช ืืคืงืืื ืืืื ืืืืจ ืืืฆืืข ืฉืื ืืืื ื-bashrc:
source ~/.bashrc
ืคืจืืกืช AWS PostgreSQL
ืื ืฉื ืืชืจ ืืื ืืคืจืืก ืืช ืืกืืก ืื ืชืื ืื ืืืื ื ืขืื ืืช ืืืืืข ืืืขืืื ืืืืจืืื. ืืฉื ืื ื ืฉืชืืฉ ืืฉืืจืืช AWS RDS.
ืขืืืจ ืื ืืกืืฃ AWS -> AWS RDS -> ืืกืื ื ืชืื ืื -> ืฆืืจ ืืกื ื ืชืื ืื:
ืืืจ PostgreSQL ืืืืฅ ืขื ืืื:
ืื ืืืืื ืื ืืื ืืืืจืืช ืืื ืืืืืช ืืืื; ืื ื ื ืฉืชืืฉ ืืฉืจืช ืืื ืื "ืืคืืืช" (Free Tier):
ืืืืจ ืืื, ืฉืืื ืกืืืื ืืืืฉ Free Tier, ืืืืืจ ืืื ืืืฆืข ืื ื ืืืืืืืืช ืืืคืข ืฉื ืืืืงืช t2.micro - ืืืจืืช ืฉืืื ืืืฉื, ืืื ืืื ืืืช ืืื ืืชืืืื ืืืฉืืื ืฉืื ื:
ืืืืฉื ืืืืขืื ืืืจืื ืืฉืืืื ืืืื: ืฉื ืืืคืข ืืกืืก ืื ืชืื ืื, ืฉื ืืืฉืชืืฉ ืืจืืฉื ืืืกืืกืื ืฉืื. ืืืื ื ืงืจื ืืืืคืข: myHabrTest, ืืฉืชืืฉ ืจืืฉื: ืืืืจ, ืกืืกืื: habr12345 ืืืืฅ ืขื ืืคืชืืจ ืืื:
ืืขืืื ืืื ืืฉ ืคืจืืืจืื ืืืืจืืื ืขื ืื ืืืฉืืช ืฉื ืฉืจืช ืืกื ืื ืชืื ืื ืฉืื ื ืืืืืฅ (Public accessibility) ืืืืื ืืช ืืคืืจืืื:
ืืืื ื ืืฆืืจ ืืืืจื ืืืฉื ืืงืืืฆืช ืืืืืื VPC, ืฉืชืืคืฉืจ ืืืฉื ืืืฆืื ืืช ืืฉืจืช ืืกื ืื ืชืื ืื ืฉืื ื ืืจื ืืฆืืื 5432 (PostgreSQL).
ืืืื ื ืขืืืจ ืืืกืืฃ AWS ืืืืื ืืคืืคื ื ืคืจื ืืืื ืืืืืื ืื ืฉื VPC -> ืงืืืฆืืช ืืืืื -> ืืฆืืจืช ืงืืืฆืช ืืืืื:
ืื ื ืืืืืจืื ืืช ืืฉื ืืงืืืฆืช ืืืืืื - PostgreSQL, ืชืืืืจ, ืืฆืืื ืื ืืืืื VPC ืงืืืฆื ืื ืฆืจืืื ืืืืืช ืืฉืืืืช ืืืืฅ ืขื ืืคืชืืจ ืฆืืจ:
ืืื ืืช ืืืืืื ืื ืื ืกืื ืขืืืจ ืืฆืืื 5432 ืขืืืจ ืืงืืืฆื ืืืืฉื ืฉื ืืฆืจื, ืืคื ืฉืืืฆื ืืชืืื ื ืืืื. ืืชื ืื ืืืื ืืฆืืื ืืช ืืืฆืืื ืืืืคื ืืื ื, ืืื ืืืจ PostgreSQL ืืืจืฉืืื ืื ืคืชืืช ืกืื.
ืืืืคื ืงืคืื ื, ืืขืจื ::/0 ืคืืจืืฉื ืืืื ืืช ืฉื ืชืขืืืจื ื ืื ืกืช ืืฉืจืช ืืื ืจืืื ืืขืืื, ืื ืฉืงื ืื ื ืื ืืืืจื ื ืืื, ืืื ืืื ืื ืชื ืืช ืืืืืื, ืืืื ื ืืคืฉืจ ืืขืฆืื ื ืืืฉืชืืฉ ืืืืฉื ืืื:
ืื ื ืืืืจืื ืืืฃ ืืืคืืคื, ืฉื ื ืคืชื "ืืืืจ ืืืืจืืช ืืชืงืืืืช" ืืืืจ ืืกืขืืฃ ืงืืืฆืืช ืืืืืื ืฉื VPC -> ืืืจ ืงืืืฆืืช ืืืืื ืงืืืืืช ืฉื VPC -> PostgreSQL:
ืืืืจ ืืื, ืืืคืฉืจืืืืช ืืกื ืื ืชืื ืื -> ืฉื ืืกื ืื ืชืื ืื -> ืืืืจ ืืช ืืฉื - habrDB.
ืื ื ืืืืืื ืืืฉืืืจ ืืช ืฉืืจ ืืคืจืืืจืื, ืืืขื ืืืืื ืืืืื (ืชืงืืคืช ืฉืืืจืช ืืืืื - 0 ืืืื), ื ืืืืจ ืืชืืื ืืช ืืืฆืืขืื, ืืืจืืจืช ืืืื. ืืืฅ ืขื ืืืคืชืืจ ืฆืืจ ืืกืืก ื ืชืื ืื:
ืืืคื ืืืืืื
ืืฉืื ืืืืจืื ืืืื ืคืืชืื ืขืืืื ืฉื Spark, ืฉืชืขืื ื ืชืื ืื ืืืฉืื ืืืืืขืื ืืงืคืงื ืื ืฉืชื ืฉื ืืืช ืืชืื ืืก ืืช ืืชืืฆืื ืืืกื ืื ืชืื ืื.
ืืคื ืฉืฆืืื ืืขืื, ืืืกืืืื ืื ืื ืื ืื ืืืื ื-SparkStreaming ืฉืืฉ ืืืืืืจ ืืืชื ืืื ืืืืืื ืกืืืื ืืช ืืชืงืืืช. ืื ื ื ืฉืชืืฉ ืื ืงืืืืช ืืืงืืจืช, ืืื ืืืืื ื ืืฉื, ืืืืื Spark Streaming ืืฆืืจื ืจืง ืืืืืจ ืื ืงืืืช ืืืืืืง ืืืืจืื ื ืืืืืฉ ืืื ื ืืืฉืืืื ืืื ืืฉืืืจ ืืช ืื ืชืื ืื ืฉืืืื.
ื ืืชื ืืืคืขืื ืืช ืืืืกืื ืขื ืืื ืืืืจืช ืกืคืจืืื ืืืขืจืืช ืงืืฆืื ืขืืืื ืืชืงืืืช ืืืืื ื (ืืืื HDFS, S3 ืืื') ืฉืื ืืืฉืืจ ืคืจืื ืืืืกืื. ืื ื ืขืฉื ืืืืฆืขืืช, ืืืฉื:
streamingContext.checkpoint(checkpointDirectory)
ืืืืืื ืฉืื ื, ื ืฉืชืืฉ ืืืืฉื ืืืื, ืืืืืจ, ืื checkpointDirectory ืงืืื, ืื ืืืงืฉืจ ืืืืืฆืจ ืืืืฉ ืื ืชืื ื ืืืืกืื. ืื ืืกืคืจืืื ืื ืงืืืืช (ืืืืืจ ืืืืฆืขืช ืืคืขื ืืจืืฉืื ื), ืืื ื ืงืจื functionToCreateContext ืืื ืืืฆืืจ ืืงืฉืจ ืืืฉ ืืืืืืืจ ืืช DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
ืื ื ืืืฆืจืื ืืืืืืงื DirectStream ืืื ืืืชืืืจ ืื ืืฉื "ืืขืกืงื" ืืืืฆืขืืช ืฉืืืช createDirectStream ืฉื ืกืคืจืืืช KafkaUtils:
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
broker_list = 'localhost:9092'
topic = 'transaction'
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": broker_list})
ื ืืชืื ื ืชืื ืื ื ืื ืกืื ืืคืืจืื JSON:
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
ืืืืฆืขืืช Spark SQL, ืื ื ืขืืฉืื ืงืืืืฅ ืคืฉืื ืืืฆืืืื ืืช ืืชืืฆืื ืืืกืืฃ:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
ืงืืืช ืืงืกื ืืฉืืืืชื ืืืจืฆืชื ืืืืฆืขืืช Spark SQL:
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
ืืื ืื ื ืฉืืืจืื ืืช ืื ืชืื ืื ืืืฆืืืจืื ืืืชืงืืืื ืืืืื ื-AWS RDS. ืืื ืืฉืืืจ ืืช ืชืืฆืืืช ืืฆืืืจื ืืืืืช ืืกื ื ืชืื ืื, ื ืฉืชืืฉ ืืฉืืืช ืืืชืืื ืฉื ืืืืืืืงื DataFrame:
testResultDataFrame.write
.format("jdbc")
.mode("append")
.option("driver", 'org.postgresql.Driver')
.option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
.option("dbtable", "transaction_flow")
.option("user", "habr")
.option("password", "habr12345")
.save()
ืืื ืืืืื ืขื ืืืืจืช ืืืืืจ ื-AWS RDS. ืืฆืจื ื ืืช ืืืฉืชืืฉ ืืืกืืกืื ืขืืืจื ืืฉืื "ืคืจืืกืช AWS PostgreSQL". ืขืืื ืืืฉืชืืฉ ื- Endpoint ืืืชืืืช ืืืชืจ ืฉื ืฉืจืช ืืกื ืื ืชืื ืื, ืืืืฆืืช ืืกืขืืฃ ืงืืฉืืจืืืช ืืืืืื:
ืขื ืื ืช ืืืืจ ื ืืื ืืช ืกืคืืจืง ืืงืคืงื, ืขืืื ืืืจืืฅ ืืช ืืขืืืื ืืืืฆืขืืช smark-submit ืืืืฆืขืืช ืืืคืฅ spark-streaming-kafka-0-8_2.11. ืื ืืกืฃ, ื ืฉืชืืฉ ืื ืืืคืฅ ืืืื ืืจืืงืฆืื ืขื ืืกื ืื ืชืื ืื PostgreSQL; ื ืขืืืจ ืืืชื ืืืืฆืขืืช --packages.
ืืฆืืจื ืืืืฉืืช ืืกืงืจืืคื ื ืืืื ืืคืจืืืจื ืงืื ืื ืืช ืฉื ืฉืจืช ืืืืืขืืช ืืื ืืฉื ืืื ื ื ืจืฆื ืืงืื ื ืชืื ืื.
ืื, ืืืืข ืืืื ืืืคืขืื ืืืืืืง ืืช ืืคืื ืงืฆืืื ืืืืช ืฉื ืืืขืจืืช:
spark-submit
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207
spark_job.py localhost:9092 transaction
ืืื ืืกืชืืจ! ืืคื ืฉื ืืชื ืืจืืืช ืืชืืื ื ืืืื, ืืืื ืฉืืืคืืืงืฆืื ืคืืขืืช, ืชืืฆืืืช ืฆืืืจื ืืืฉืืช ืืืฆืืืช ืื 2 ืฉื ืืืช, ืืืืืื ืฉืืืืจื ื ืืช ืืจืืื ืืืฆืืื ื-2 ืฉื ืืืช ืืืฉืจ ืืฆืจื ื ืืช ืืืืืืืงื StreamingContext:
ืืืืจ ืืื, ืื ื ืืืฆืขืื ืฉืืืืชื ืคืฉืืื ืืืกื ืื ืชืื ืื ืืื ืืืืืง ืืช ื ืืืืืชื ืฉื ืจืฉืืืืช ืืืืื ืืจืืืช_ืขืกืงื:
ืืกืงื ื
ืืืืจ ืื ืืื ืืืืื ืืขืืืื ืืจื ืฉื ืืืืข ืืืืฆืขืืช Spark Streaming ืืฉืืืื ืขื Apache Kafka ื- PostgreSQL. ืขื ืฆืืืืช ืื ืชืื ืื ืืืงืืจืืช ืฉืื ืื, ืงืฉื ืืืคืจืื ืืขืจืื ืืืขืฉื ืฉื Spark Streaming ืืืฆืืจืช ืกืืจืืืื ื ืืืคืืืงืฆืืืช ืืืื ืืืช.
ืืชื ืืืื ืืืฆืื ืืช ืงืื ืืืงืืจ ืืืื ืืืืืจ ืฉืื ืืืชืืืช
ืื ื ืฉืื ืืืื ืืืืืจ ืื, ืื ื ืืฆืคื ืืืขืจืืชืืื, ืืืงืืื ืื ืืืืงืืจืช ืืื ื ืืื ืืงืืจืืื ืืืืคืชืืื.
ืื ื ืืืืืช ืืื ืืฆืืื!
ืชืืืื. ืืชืืืื ืชืืื ื ืืืฉืชืืฉ ืืืกื ื ืชืื ืื ืืงืืื PostgreSQL, ืื ืืืืจ ืืืืชื ื-AWS, ืืืืืชื ืืืขืืืจ ืืช ืืกื ืื ืชืื ืื ืืขื ื. ืืืืืจ ืืื ืื ืืฉื ืื, ืืจืื ืืืฆื ืืืืฉื ืืช ืื ืืืขืจืืช ืืืชืืืจืช ืืขืื ื-AWS ืืืืฆืขืืช AWS Kinesis ื-AWS EMR. ืขืงืื ืืืจ ืืืืฉืืช!
ืืงืืจ: www.habr.com