Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบชเบฐเบšเบฒเบเบ”เบต, Habr! เบกเบทเป‰เบ™เบตเป‰เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบชเป‰เบฒเบ‡เบฅเบฐเบšเบปเบšเบ—เบตเปˆเบˆเบฐเบ›เบฐเบกเบงเบ™เบœเบปเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ‚เปเป‰เบ„เบงเบฒเบก Apache Kafka เป‚เบ”เบเปƒเบŠเป‰ Spark Streaming เปเบฅเบฐเบ‚เบฝเบ™เบœเบปเบ™เบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เปƒเบชเปˆเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™ AWS RDS cloud.

เบ‚เปเปƒเบซเป‰เบˆเบดเบ™เบ•เบฐเบ™เบฒเบเบฒเบ™เบงเปˆเบฒเบชเบฐเบ–เบฒเบšเบฑเบ™เบชเบดเบ™เป€เบŠเบทเปˆเบญเบชเบฐเป€เบžเบฒเบฐเปƒเบ”เบซเบ™เบถเปˆเบ‡เบเปเบฒเบ™เบปเบ”เปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒเบ›เบฐเบ•เบดเบšเบฑเบ”เบซเบ™เป‰เบฒเบ—เบธเบฅเบฐเบเปเบฒเบ—เบตเปˆเป€เบ‚เบปเป‰เบฒเบกเบฒ "เปƒเบ™เบ—เบฑเบ™เบ—เบต" เปƒเบ™เบ—เบปเปˆเบงเบชเบฒเบ‚เบฒเบ‚เบญเบ‡เบกเบฑเบ™. เบ™เบตเป‰เบชเบฒเบกเบฒเบ”เป€เบฎเบฑเบ”เป„เบ”เป‰เป€เบžเบทเปˆเบญเบˆเบธเบ”เบ›เบฐเบชเบปเบ‡เบ‚เบญเบ‡เบเบฒเบ™เบ„เบดเบ”เป„เบฅเปˆเบ—เบฑเบ™เบ—เบตเบ—เบฑเบ™เปƒเบ”เบ•เปเบฒเปเบซเบ™เปˆเบ‡เป€เบ‡เบดเบ™เบ•เบฒเป€เบ›เบตเบ”เบชเปเบฒเบฅเบฑเบšเบ„เบฑเบ‡เป€เบ‡เบดเบ™, เบ‚เบญเบšเป€เบ‚เบ”เบˆเปเบฒเบเบฑเบ”เบซเบผเบทเบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบšเบ—เบฒเบ‡เบ”เป‰เบฒเบ™เบเบฒเบ™เป€เบ‡เบดเบ™เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป€เบฎเบฑเบ”เบ—เบธเบฅเบฐเบเปเบฒ, เปเบฅเบฐเบญเบทเปˆเบ™เป†.

เบงเบดเบ—เบตเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบเปเบฅเบฐเบ™เบตเบ™เบตเป‰เป‚เบ”เบเบšเปเปˆเบกเบตเบเบฒเบ™เปƒเบŠเป‰ magic เปเบฅเบฐ magic เบเบฒเบ™เบชเบฐเบเบปเบ”เบ„เปเบฒ - เบญเปˆเบฒเบ™เบžเบฒเบเปƒเบ•เป‰เบเบฒเบ™เบ•เบฑเบ”! เป„เบ›!

Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming
(เปเบซเบผเปˆเบ‡เบฎเบนเบš)

เบเบฒเบ™เบ™เปเบฒเบชเบฐเป€เบซเบ™เบต

เปเบ™เปˆเบ™เบญเบ™, เบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบˆเปเบฒเบ™เบงเบ™เบซเบผเบงเบ‡เบซเบผเบฒเบเปƒเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเปเบ—เป‰เบˆเบดเบ‡เปƒเบซเป‰เป‚เบญเบเบฒเบ”เบญเบฑเบ™เบžเบฝเบ‡เบžเปเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เปƒเบ™เบฅเบฐเบšเบปเบšเบ—เบตเปˆเบ—เบฑเบ™เบชเบฐเป„เบซเบก. เบซเบ™เบถเปˆเบ‡เปƒเบ™เบเบฒเบ™เบ›เบฐเบชเบปเบกเบ›เบฐเบชเบฒเบ™เบ—เบตเปˆเบ™เบดเบเบปเบกเบ—เบตเปˆเบชเบธเบ”เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ™เบตเป‰เปเบกเปˆเบ™ tandem เบ‚เบญเบ‡ Apache Kafka เปเบฅเบฐ Spark Streaming, เบ—เบตเปˆ Kafka เบชเป‰เบฒเบ‡เบเบฐเปเบชเบ‚เบญเบ‡เบŠเบญเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเบ—เบตเปˆเป€เบ‚เบปเป‰เบฒเบกเบฒ, เปเบฅเบฐ Spark Streaming เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบŠเบญเบ‡เป€เบซเบผเบปเปˆเบฒเบ™เบตเป‰เปƒเบ™เบŠเปˆเบงเบ‡เป€เบงเบฅเบฒเปƒเบ”เบซเบ™เบถเปˆเบ‡.

เป€เบžเบทเปˆเบญเป€เบžเบตเปˆเบกเบ„เบงเบฒเบกเบ—เบปเบ™เบ—เบฒเบ™เบ„เบงเบฒเบกเบœเบดเบ”เบ‚เบญเบ‡เบ„เปเบฒเบฎเป‰เบญเบ‡เบชเบฐเบซเบกเบฑเบ, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบ™เปเบฒเปƒเบŠเป‰เบˆเบธเบ”เบเบงเบ”เบเบฒ. เบ”เป‰เบงเบเบเบปเบ™เป„เบเบ™เบตเป‰, เป€เบกเบทเปˆเบญเป€เบ„เบทเปˆเบญเบ‡เบˆเบฑเบ Spark Streaming เบ•เป‰เบญเบ‡เบเบฒเบ™เบเบนเป‰เบ„เบทเบ™เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบชเบนเบ™เบซเบฒเบ, เบกเบฑเบ™เบžเบฝเบ‡เปเบ•เปˆเบ•เป‰เบญเบ‡เบเบฒเบ™เบเบฑเบšเบ„เบทเบ™เป„เบ›เบซเบฒเบˆเบธเบ”เบเบงเบ”เบเบฒเบชเบธเบ”เบ—เป‰เบฒเบเปเบฅเบฐเบชเบทเบšเบ•เปเปˆเบเบฒเบ™เบ„เบดเบ”เป„เบฅเปˆเบˆเบฒเบเบšเปˆเบญเบ™เบ™เบฑเป‰เบ™.

เบชเบฐเบ–เบฒเบ›เบฑเบ”เบ•เบฐเบเบฐเบเปเบฒเบ‚เบญเบ‡เบฅเบฐเบšเบปเบšเบเบฒเบ™เบžเบฑเบ”เบ—เบฐเบ™เบฒ

Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบญเบปเบ‡โ€‹เบ›เบฐโ€‹เบเบญเบšโ€‹เบ—เบตเปˆโ€‹เบ™เปเบฒโ€‹เปƒเบŠเป‰โ€‹:

  • Apache Kafka เปเบกเปˆเบ™เบฅเบฐเบšเบปเบšเบเบฒเบ™เบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเบ—เบตเปˆเป€เบœเบตเบเปเบœเปˆ-เบชเบฐเปเบฑเบเบชเบฐเบกเบฒเบŠเบดเบ. เป€เบซเบกเบฒเบฐเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบšเปเบฅเบดเป‚เบžเบเบ‚เปเป‰เบ„เบงเบฒเบกเบญเบญเบšเป„เบฅเบ™เปŒเปเบฅเบฐเบญเบญเบ™เป„เบฅเบ™เปŒ. เป€เบžเบทเปˆเบญเบ›เป‰เบญเบ‡เบเบฑเบ™เบเบฒเบ™เบชเบนเบ™เป€เบชเบเบ‚เปเป‰เบกเบนเบ™, เบ‚เปเป‰เบ„เบงเบฒเบก Kafka เบ–เบทเบเป€เบเบฑเบšเป„เบงเป‰เปƒเบ™เปเบœเปˆเบ™เปเบฅเบฐ replicated เบžเบฒเบเปƒเบ™ cluster. เบฅเบฐเบšเบปเบš Kafka เบ–เบทเบเบชเป‰เบฒเบ‡เบ‚เบถเป‰เบ™เบขเบนเปˆเป€เบ—เบดเบ‡เบชเบธเบ”เบ‚เบญเบ‡เบเบฒเบ™เบšเปเบฅเบดเบเบฒเบ™ synchronization ZooKeeper;
  • Apache Spark Streaming - เบญเบปเบ‡เบ›เบฐเบเบญเบš Spark เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™ streaming. เป‚เบกเบ”เบนเบ™ Spark Streaming เบ–เบทเบเบชเป‰เบฒเบ‡เบ‚เบถเป‰เบ™เป‚เบ”เบเปƒเบŠเป‰เบชเบฐเบ–เบฒเบ›เบฑเบ”เบ•เบฐเบเบฐเบเปเบฒเบˆเบธเบ™เบฅเบฐเบžเบฒเบ, เบšเปˆเบญเบ™เบ—เบตเปˆเบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ‚เปเป‰เบกเบนเบ™เบ–เบทเบเบ•เบตเบ„เบงเบฒเบกเบงเปˆเบฒเป€เบ›เบฑเบ™เบฅเปเบฒเบ”เบฑเบšเบขเปˆเบฒเบ‡เบ•เปเปˆเป€เบ™เบทเปˆเบญเบ‡เบ‚เบญเบ‡เบŠเบธเบ”เบ‚เปเป‰เบกเบนเบ™เบ‚เบฐเบซเบ™เบฒเบ”เบ™เป‰เบญเบ. Spark Streaming เป€เบญเบปเบฒเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเปเบซเบผเปˆเบ‡เบ•เปˆเบฒเบ‡เป† เปเบฅเบฐเบฅเบงเบกเบกเบฑเบ™เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบŠเบธเบ”เบ™เป‰เบญเบเป†. เปเบžเบฑเบเป€เบเบ”เปƒเบซเบกเปˆเบ–เบทเบเบชเป‰เบฒเบ‡เบ‚เบทเป‰เบ™เปƒเบ™เบŠเปˆเบงเบ‡เป€เบงเบฅเบฒเบ›เบปเบเบเบฐเบ•เบด. เปƒเบ™เบ•เบญเบ™เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบ‚เบญเบ‡เปเบ•เปˆเบฅเบฐเบŠเปˆเบงเบ‡เป€เบงเบฅเบฒ, เปเบžเบฑเบเป€เบเบฑเบ”เปƒเปเปˆเบ–เบทเบเบชเป‰เบฒเบ‡เบ‚เบทเป‰เบ™, เปเบฅเบฐเบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเป„เบ”เป‰เบฎเบฑเบšเปƒเบ™เบฅเบฐเบซเบงเปˆเบฒเบ‡เบ™เบฑเป‰เบ™เปเบกเปˆเบ™เบฅเบงเบกเบขเบนเปˆเปƒเบ™เปเบžเบฑเบเป€เบเบฑเบ”. เปƒเบ™เบ•เบญเบ™เบ—เป‰เบฒเบเบ‚เบญเบ‡เป„เบฅเบเบฐเบซเปˆเบฒเบ‡, เบเบฒเบ™เบ‚เบฐเบซเบเบฒเบเบ•เบปเบงเบ‚เบญเบ‡เปเบžเบฑเบเป€เบเบฑเบ”เบขเบธเบ”. เบ‚เบฐเบซเบ™เบฒเบ”เบ‚เบญเบ‡เป„เบฅเบเบฐเบซเปˆเบฒเบ‡เปเบกเปˆเบ™เบ–เบทเบเบเปเบฒเบ™เบปเบ”เป‚เบ”เบเบžเบฒเบฅเบฒเบกเบดเป€เบ•เบตเบ—เบตเปˆเป€เบญเบตเป‰เบ™เบงเปˆเบฒ batch interval;
  • Apache Spark SQL - เบ›เบฐโ€‹เบชเบปเบกโ€‹เบ›เบฐโ€‹เบชเบฒเบ™โ€‹เบเบฒเบ™โ€‹เบ›เบฐโ€‹เบกเบงเบ™โ€‹เบœเบปเบ™โ€‹เบ—เบตเปˆโ€‹เบเปˆเบฝเบงโ€‹เบ‚เป‰เบญเบ‡โ€‹เบเบฑเบš Spark เบเบฒเบ™โ€‹เบ”เปเบฒโ€‹เป€เบ™เบตเบ™โ€‹เป‚เบ„เบ‡โ€‹เบเบฒเบ™โ€‹เบ—เบตเปˆโ€‹เป€เบ›เบฑเบ™โ€‹เบ›เบฐโ€‹เป‚เบซเบเบ”โ€‹. เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบกเบตเป‚เบ„เบ‡เบชเป‰เบฒเบ‡เบซเบกเบฒเบเบ„เบงเบฒเบกเบงเปˆเบฒเบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบกเบต schema, เบ™เบฑเป‰เบ™เปเบกเปˆเบ™, เบŠเบธเบ”เบ”เบฝเบงเบ‚เบญเบ‡เบžเบฒเบเบชเบฐเบซเบ™เบฒเบกเบชเปเบฒเบฅเบฑเบšเบšเบฑเบ™เบ—เบถเบเบ—เบฑเบ‡เบซเบกเบปเบ”. Spark SQL เบชเบฐเบซเบ™เบฑเบšเบชเบฐเบซเบ™เบนเบ™เบเบฒเบ™เบ›เป‰เบญเบ™เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบซเบผเบฒเบเป†เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบกเบตเป‚เบ„เบ‡เบชเป‰เบฒเบ‡เปเบฅเบฐ, เบเป‰เบญเบ™เบเบฒเบ™เบกเบตเบ‚เปเป‰เบกเบนเบ™เบ‚เบญเบ‡ schema, เบกเบฑเบ™เบชเบฒเบกเบฒเบ”เบ”เบถเบ‡เบ‚เปเป‰เบกเบนเบ™เบžเบฝเบ‡เปเบ•เปˆเบžเบฒเบเบชเบฐเบซเบ™เบฒเบกเบ—เบตเปˆเบเปเบฒเบ™เบปเบ”เป„เบงเป‰เบขเปˆเบฒเบ‡เบกเบตเบ›เบฐเบชเบดเบ”เบ—เบดเบžเบฒเบš, เปเบฅเบฐเบเบฑเบ‡เบชเบฐเบซเบ™เบญเบ‡ DataFrame APIs;
  • AWS RDS เป€เบ›เบฑเบ™เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบ„เบงเบฒเบกเบชเปเบฒเบžเบฑเบ™เบ—เบตเปˆเบญเบตเบ‡เปƒเบชเปˆเบ„เบฅเบฒเบงเบ—เบตเปˆเบกเบตเบฅเบฒเบ„เบฒเบšเปเปˆเปเบžเบ‡, เบเบฒเบ™เบšเปเบฅเบดเบเบฒเบ™เป€เบงเบฑเบšเบ—เบตเปˆเบ‡เปˆเบฒเบเปƒเบ™เบเบฒเบ™เบ•เบดเบ”เบ•เบฑเป‰เบ‡, เบเบฒเบ™เบ”เปเบฒเป€เบ™เบตเบ™เบ‡เบฒเบ™เปเบฅเบฐเบเบฒเบ™เบ›เบฑเบšเบ‚เบฐเบซเบ™เบฒเบ”, เปเบฅเบฐเบ–เบทเบเบ„เบธเป‰เบกเบ„เบญเบ‡เป‚เบ”เบเบเบปเบ‡เป‚เบ”เบ Amazon.

เบเบฒเบ™เบ•เบดเบ”เบ•เบฑเป‰เบ‡เปเบฅเบฐเปเบฅเปˆเบ™เป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบ Kafka

เบเปˆเบญเบ™เบ—เบตเปˆเบˆเบฐเปƒเบŠเป‰ Kafka เป‚เบ”เบเบเบปเบ‡, เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เปƒเบซเป‰เปเบ™เปˆเปƒเบˆเบงเปˆเบฒเบ—เปˆเบฒเบ™เบกเบต Java, เป€เบžเบฒเบฐเบงเปˆเบฒ ... JVM เบ–เบทเบเบ™เปเบฒเปƒเบŠเป‰เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป€เบฎเบฑเบ”เบงเบฝเบ:

sudo apt-get update 
sudo apt-get install default-jre
java -version

เบกเบฒเบชเป‰เบฒเบ‡เบœเบนเป‰เปƒเบŠเป‰เปƒเปเปˆเป€เบžเบทเปˆเบญเป€เบฎเบฑเบ”เบงเบฝเบเบเบฑเบš Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

เบ•เปเปˆเป„เบ›, เบ”เบฒเบงเป‚เบซเบฅเบ”เบเบฒเบ™เปเบˆเบเบขเบฒเบเบˆเบฒเบเป€เบงเบฑเบšเป„เบŠเบ—เปŒเบ—เบฒเบ‡เบเบฒเบ™ Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

เบ–เบญเบ”เบŠเบญเบ‡เปเบŸเป‰เบกเบ—เบตเปˆเบ”เบฒเบงเป‚เบซเบฅเบ”เบกเบฒ:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

เบ‚เบฑเป‰เบ™เบ•เบญเบ™เบ•เปเปˆเป„เบ›เปเบกเปˆเบ™เบ—เบฒเบ‡เป€เบฅเบทเบญเบ. เบ„เบงเบฒเบกเบˆเบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบšเปเปˆเบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰เบ—เปˆเบฒเบ™เปƒเบŠเป‰เบ„เบงเบฒเบกเบชเบฒเบกเบฒเบ”เบ—เบฑเบ‡เบซเบกเบปเบ”เบ‚เบญเบ‡ Apache Kafka. เบ•เบปเบงเบขเปˆเบฒเบ‡, เบฅเบถเบšเบซเบปเบงเบ‚เปเป‰, เปเบงเบ”เปเบนเปˆ, เบเบธเปˆเบกเบ—เบตเปˆเบ‚เปเป‰เบ„เบงเบฒเบกเบชเบฒเบกเบฒเบ”เป€เบœเบตเบเปเบœเปˆเป„เบ”เป‰. เป€เบžเบทเปˆเบญเบ›เปˆเบฝเบ™เบญเบฑเบ™เบ™เบตเป‰, เปƒเบซเป‰เป€เบฎเบปเบฒเปเบเป‰เป„เบ‚เป„เบŸเบฅเปŒเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒ:

vim ~/kafka/config/server.properties

เป€เบžเบตเปˆเบกเบชเบดเปˆเบ‡เบ•เปเปˆเป„เบ›เบ™เบตเป‰เปƒเบชเปˆเบ—เป‰เบฒเบเป„เบŸเบฅเปŒ:

delete.topic.enable = true

เบเปˆเบญเบ™เบ—เบตเปˆเบˆเบฐเป€เบฅเบตเปˆเบกเป€เบŠเบตเบšเป€เบงเบต Kafka, เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เป€เบฅเบตเปˆเบกเป€เบŠเบตเบšเป€เบงเบต ZooKeeper; เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเปƒเบŠเป‰เบชเบฐเบ„เบดเบšเป€เบชเบตเบกเบ—เบตเปˆเบกเบฒเบžเป‰เบญเบกเบเบฑเบšเบเบฒเบ™เปเบˆเบเบขเบฒเบ Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

เบซเบผเบฑเบ‡เบˆเบฒเบ ZooKeeper เป„เบ”เป‰เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบขเปˆเบฒเบ‡เบชเปเบฒเป€เบฅเบฑเบ”เบœเบปเบ™, เป€เบ›เบตเบ”เป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบ Kafka เปƒเบ™ terminal เปเบเบเบ•เปˆเบฒเบ‡เบซเบฒเบ:

bin/kafka-server-start.sh config/server.properties

เปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡เบซเบปเบงเบ‚เปเป‰เปƒเบซเบกเปˆเบ—เบตเปˆเป€เบญเบตเป‰เบ™เบงเปˆเบฒเบเบฒเบ™เป€เบฎเบฑเบ”เบ—เบธเบฅเบฐเบเปเบฒ:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

เปƒเบซเป‰เปเบ™เปˆเปƒเบˆเบงเปˆเบฒเบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบกเบตเบˆเปเบฒเบ™เบงเบ™เบžเบฒเบ—เบดเบŠเบฑเบ™เบ—เบตเปˆเบ•เป‰เบญเบ‡เบเบฒเบ™เปเบฅเบฐเบเบฒเบ™เบˆเปเบฒเบฅเบญเบ‡เป„เบ”เป‰เบ–เบทเบเบชเป‰เบฒเบ‡เบ‚เบทเป‰เบ™:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เปƒเบซเป‰เบžเบฒเบ”เป‚เบญเบเบฒเบ”เบ‚เบญเบ‡เบเบฒเบ™เบ—เบปเบ”เบชเบญเบšเบœเบนเป‰เบœเบฐเบฅเบดเบ”เปเบฅเบฐเบœเบนเป‰เบšเปเบฅเบดเป‚เบžเบเบชเปเบฒเบฅเบฑเบšเบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบชเป‰เบฒเบ‡เปƒเบซเบกเปˆ. เบฅเบฒเบเบฅเบฐเบญเบฝเบ”เป€เบžเบตเปˆเบกเป€เบ•เบตเบกเบเปˆเบฝเบงเบเบฑเบšเบงเบดเบ—เบตเบ—เบตเปˆเบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ—เบปเบ”เบชเบญเบšเบเบฒเบ™เบชเบปเปˆเบ‡เปเบฅเบฐเบฎเบฑเบšเบ‚เปเป‰เบ„เบงเบฒเบกเบ–เบทเบเบ‚เบฝเบ™เป„เบงเป‰เปƒเบ™เป€เบญเบเบฐเบชเบฒเบ™เบ—เบฒเบ‡เบเบฒเบ™ - เบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเบšเบฒเบ‡เบญเบฑเบ™. เบ”เบต, เบžเบงเบเป€เบฎเบปเบฒเบเป‰เบฒเบเป„เบ›เบ‚เบฝเบ™เบœเบนเป‰เบœเบฐเบฅเบดเบ”เปƒเบ™ Python เป‚เบ”เบเปƒเบŠเป‰ KafkaProducer API.

เบœเบนเป‰เบœเบฐเบฅเบดเบ”เบ‚เบฝเบ™

เบœเบนเป‰เบœเบฐเบฅเบดเบ”เบˆเบฐเบชเป‰เบฒเบ‡เบ‚เปเป‰เบกเบนเบ™เปเบšเบšเบชเบธเปˆเบก - 100 เบ‚เปเป‰เบ„เบงเบฒเบกเบ—เบธเบเป†เบงเบดเบ™เบฒเบ—เบต. เป‚เบ”เบเบ‚เปเป‰เบกเบนเบ™เปเบšเบšเบชเบธเปˆเบกเบžเบงเบเป€เบฎเบปเบฒเบซเบกเบฒเบเป€เบ–เบดเบ‡เบงเบฑเบ”เบˆเบฐเบ™เบฒเบ™เบธเบเบปเบกเบ—เบตเปˆเบ›เบฐเบเบญเบšเบ”เป‰เบงเบเบชเบฒเบกเบŠเปˆเบญเบ‡เบ‚เปเป‰เบกเบนเบ™:

  • เบชเบฒเบ‚เบฒ - เบŠเบทเปˆเบ‚เบญเบ‡เบˆเบธเบ”เบ‚เบฒเบเบ‚เบญเบ‡เบชเบฐเบ–เบฒเบšเบฑเบ™เบชเบดเบ™เป€เบŠเบทเปˆเบญ;
  • เบชเบฐเบเบธเบ™เป€เบ‡เบดเบ™ - เป€เบ‡เบดเบ™เบ•เบฒเบเบฒเบ™เป€เบฎเบฑเบ”เบ—เบธเบฅเบฐเบเปเบฒ;
  • เบˆเปเบฒเบ™เบงเบ™ - เบˆเปเบฒโ€‹เบ™เบงเบ™โ€‹เบเบฒเบ™โ€‹เป‚เบญเบ™โ€‹. เบˆเปเบฒเบ™เบงเบ™เบ”เบฑเปˆเบ‡เบเปˆเบฒเบงเบˆเบฐเป€เบ›เบฑเบ™เบ•เบปเบงเป€เบฅเบเบšเบงเบเบ–เป‰เบฒเบกเบฑเบ™เป€เบ›เบฑเบ™เบเบฒเบ™เบŠเบทเป‰เบ‚เบญเบ‡เบชเบฐเบเบธเบ™เป€เบ‡เบดเบ™เป‚เบ”เบเบ—เบฐเบ™เบฒเบ„เบฒเบ™, เปเบฅเบฐเบ•เบปเบงเป€เบฅเบเบฅเบปเบšเบ–เป‰เบฒเบกเบฑเบ™เป€เบ›เบฑเบ™เบเบฒเบ™เบ‚เบฒเบ.

เบฅเบฐเบซเบฑเบ”เบชเปเบฒเบฅเบฑเบšเบœเบนเป‰เบœเบฐเบฅเบดเบ”เป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒ:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

เบ•เปเปˆเป„เบ›, เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบงเบดเบ—เบตเบเบฒเบ™เบชเบปเปˆเบ‡, เบžเบงเบเป€เบฎเบปเบฒเบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเป„เบ›เบซเบฒเป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบ, เป„เบ›เบซเบฒเบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบเบฒเบ™, เปƒเบ™เบฎเบนเบšเปเบšเบš JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

เป€เบกเบทเปˆเบญเปเบฅเปˆเบ™เบชเบฐเบ„เบฃเบดเบš, เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบ„เบงเบฒเบกเบ•เปเปˆเป„เบ›เบ™เบตเป‰เบขเบนเปˆเปƒเบ™ terminal:

Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบ™เบตเป‰เบซเบกเบฒเบเบ„เบงเบฒเบกเบงเปˆเบฒเบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เป€เบฎเบฑเบ”เบงเบฝเบเบ•เบฒเบกเบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบเบฒเบ™ - เบœเบนเป‰เบœเบฐเบฅเบดเบ”เบชเป‰เบฒเบ‡เปเบฅเบฐเบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเป„เบ›เบซเบฒเบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบเบฒเบ™.
เบ‚เบฑเป‰เบ™เบ•เบญเบ™เบ•เปเปˆเป„เบ›เปเบกเปˆเบ™เบเบฒเบ™เบ•เบดเบ”เบ•เบฑเป‰เบ‡ Spark เปเบฅเบฐเบ›เบฐเบกเบงเบ™เบœเบปเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ‚เปเป‰เบ„เบงเบฒเบกเบ™เบตเป‰.

เบเบฒเบ™เบ•เบดเบ”เบ•เบฑเป‰เบ‡ Apache Spark

Apache Spark เป€เบ›เบฑเบ™เปเบžเบฅเบฐเบ•เบฐเบŸเบญเบกเบ„เบญเบกเบžเบตเบงเป€เบ•เบตเบ‚เบญเบ‡เบเบธเปˆเบกเปเบšเบšเบ—เบปเปˆเบงเป†เป„เบ› เปเบฅเบฐ เบ›เบฐเบชเบดเบ”เบ—เบดเบžเบฒเบšเบชเบนเบ‡.

Spark เบ›เบฐเบ•เบดเบšเบฑเบ”เป„เบ”เป‰เบ”เบตเบเบงเปˆเบฒเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบ—เบตเปˆเบ™เบดเบเบปเบกเบ‚เบญเบ‡เบฎเบนเบšเปเบšเบš MapReduce เปƒเบ™เบ‚เบฐเบ™เบฐเบ—เบตเปˆเบชเบฐเบซเบ™เบฑเบšเบชเบฐเบซเบ™เบนเบ™เบ›เบฐเป€เบžเบ”เบเบฒเบ™เบ„เบดเบ”เป„เบฅเปˆเบ—เบตเปˆเบเบงเป‰เบฒเบ‡เบ‚เบงเบฒเบ‡, เบฅเบงเบกเบ—เบฑเบ‡เบเบฒเบ™เบชเบญเบšเบ–เบฒเบกเปเบšเบšเป‚เบ•เป‰เบ•เบญเบšเปเบฅเบฐเบเบฒเบ™เบ›เบธเบ‡เปเบ•เปˆเบ‡เบ™เป‰เปเบฒ. เบ„เบงเบฒเบกเป„เบงเบกเบตเบšเบปเบ”เบšเบฒเบ”เบชเปเบฒเบ„เบฑเบ™เปƒเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบเบฒเบ™เบ›เบธเบ‡เปเบ•เปˆเบ‡เบ‚เปเป‰เบกเบนเบ™เบˆเปเบฒเบ™เบงเบ™เบซเบฅเบฒเบ, เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒเบกเบฑเบ™เป€เบ›เบฑเบ™เบ„เบงเบฒเบกเป„เบงเบ—เบตเปˆเบŠเปˆเบงเบเปƒเบซเป‰เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป€เบฎเบฑเบ”เบงเบฝเบเปเบšเบšเป‚เบ•เป‰เบ•เบญเบšเป„เบ”เป‰เป‚เบ”เบเบšเปเปˆเบ•เป‰เบญเบ‡เปƒเบŠเป‰เป€เบงเบฅเบฒเบ™เบฒเบ—เบตเบซเบผเบทเบŠเบปเปˆเบงเป‚เบกเบ‡เบฅเปเบ–เป‰เบฒ. เบซเบ™เบถเปˆเบ‡เปƒเบ™เบˆเบธเบ”เปเบ‚เบ‡เบ—เบตเปˆเปƒเบซเบเปˆเบ—เบตเปˆเบชเบธเบ”เบ‚เบญเบ‡ Spark เบ—เบตเปˆเป€เบฎเบฑเบ”เปƒเบซเป‰เบกเบฑเบ™เป„เบงเบซเบผเบฒเบเปเบกเปˆเบ™เบ„เบงเบฒเบกเบชเบฒเบกเบฒเบ”เปƒเบ™เบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™เบ„เบดเบ”เป„เบฅเปˆเปƒเบ™เบซเบ™เปˆเบงเบเบ„เบงเบฒเบกเบˆเปเบฒ.

เบเบญเบšเบ™เบตเป‰เบ–เบทเบเบ‚เบฝเบ™เป„เบงเป‰เปƒเบ™ Scala, เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบ•เบดเบ”เบ•เบฑเป‰เบ‡เบกเบฑเบ™เบเปˆเบญเบ™:

sudo apt-get install scala

เบ”เบฒเบงเป‚เบซเบฅเบ”เบเบฒเบ™เปเบˆเบเบขเบฒเบ Spark เบˆเบฒเบเป€เบงเบฑเบšเป„เบŠเบ—เปŒเบ—เบฒเบ‡เบเบฒเบ™:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

เบ–เบญเบ™โ€‹เบเบฒเบ™โ€‹เป€เบเบฑเบšโ€‹เบฎเบฑเบโ€‹เบชเบฒโ€‹เป„เบงเป‰โ€‹:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

เป€เบžเบตเปˆเบกเป€เบชเบฑเป‰เบ™เบ—เบฒเบ‡เป„เบ› Spark เบเบฑเบšเป„เบŸเบฅเปŒ bash:

vim ~/.bashrc

เป€เบžเบตเปˆเบกเป€เบชเบฑเป‰เบ™เบ•เปเปˆเป„เบ›เบ™เบตเป‰เบœเปˆเบฒเบ™เบšเบฑเบ™เบ™เบฒเบ—เบดเบเบฒเบ™:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

เบ”เปเบฒเป€เบ™เบตเบ™เบเบฒเบ™เบ„เปเบฒเบชเบฑเปˆเบ‡เบ‚เป‰เบฒเบ‡เบฅเบธเปˆเบกเบ™เบตเป‰เบซเบผเบฑเบ‡เบˆเบฒเบเป€เบฎเบฑเบ”เบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡ bashrc:

source ~/.bashrc

เบเบฒเบ™เบ™เบณเปƒเบŠเป‰ AWS PostgreSQL

เบ—เบฑเบ‡เบซเบกเบปเบ”เบ—เบตเปˆเบเบฑเบ‡เป€เบซเบผเบทเบญเปเบกเปˆเบ™เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบญเบฑเบšเป‚เบซเบฅเบ”เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ›เบธเบ‡เปเบ•เปˆเบ‡เบˆเบฒเบเบชเบฒเบเบ™เป‰เปเบฒ. เบชเปเบฒเบฅเบฑเบšเบ™เบตเป‰เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเปƒเบŠเป‰เบšเปเบฅเบดเบเบฒเบ™ AWS RDS.

เป„เบ›เบ—เบตเปˆเบ„เบญเบ™เป‚เบŠ AWS -> AWS RDS -> เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™ -> เบชเป‰เบฒเบ‡เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™:
Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เป€เบฅเบทเบญเบ PostgreSQL เปเบฅเบฐเบ„เบฅเบดเบ Next:
Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒ เบ•เบปเบงเบขเปˆเบฒเบ‡เบ™เบตเป‰เปเบกเปˆเบ™เป€เบžเบทเปˆเบญเบเบฒเบ™เบชเบถเบเบชเบฒเป€เบ—เบปเปˆเบฒเบ™เบฑเป‰เบ™; เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเปƒเบŠเป‰เป€เบŠเบตเบšเป€เบงเบตเบŸเบฃเบต โ€œเบขเปˆเบฒเบ‡เบ•เปเปˆเบฒโ€ (Free Tier):
Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบ•เปเปˆเป„เบ›, เบžเบงเบเป€เบฎเบปเบฒเป€เบญเบปเบฒเบซเบกเบฒเบเบ•เบดเบเปƒเบชเปˆเปƒเบ™ Free Tier block, เปเบฅเบฐเบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เบชเบฐเป€เบซเบ™เบตเบญเบฑเบ”เบ•เบฐเป‚เบ™เบกเบฑเบ”เบ‚เบญเบ‡ t2.micro class - เป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบญเปˆเบญเบ™เปเบญ, เบกเบฑเบ™เปเบกเปˆเบ™เบšเปเปˆเป€เบชเบเบ„เปˆเบฒเปเบฅเบฐเบ‚เป‰เบญเบ™เบ‚เป‰เบฒเบ‡เป€เบซเบกเบฒเบฐเบชเบปเบกเบชเปเบฒเบฅเบฑเบšเบงเบฝเบเบ‡เบฒเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ:
Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบ•เปเปˆเป„เบ›เบชเบดเปˆเบ‡เบ—เบตเปˆเบกเบตเบ„เบงเบฒเบกเบชเปเบฒเบ„เบฑเบ™เบซเบผเบฒเบ: เบŠเบทเปˆเบ‚เบญเบ‡เบ•เบปเบงเบขเปˆเบฒเบ‡เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™, เบŠเบทเปˆเบ‚เบญเบ‡เบœเบนเป‰เปƒเบŠเป‰เบ•เบปเป‰เบ™เบชเบฐเบšเบฑเบšเปเบฅเบฐเบฅเบฐเบซเบฑเบ”เบœเปˆเบฒเบ™เบ‚เบญเบ‡เบฅเบฒเบง. เปƒเบซเป‰เบ•เบฑเป‰เบ‡เบŠเบทเปˆเบ•เบปเบงเบขเปˆเบฒเบ‡: myHabrTest, เบœเบนเป‰เปƒเบŠเป‰เบ•เบปเป‰เบ™เบชเบฐเบšเบฑเบš: habr, เบฅเบฐโ€‹เบซเบฑเบ”โ€‹เบœเปˆเบฒเบ™โ€‹: habr12345 เปเบฅเบฐโ€‹เปƒเบซเป‰โ€‹เบ„เบฅเบดเบโ€‹เปƒเบชเปˆโ€‹เบ›เบธเปˆเบกโ€‹เบ•เปเปˆโ€‹เป„เบ›โ€‹:
Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เปƒเบ™เบซเบ™เป‰เบฒเบ•เปเปˆเป„เบ›เบกเบตเบžเบฒเบฅเบฒเบกเบดเป€เบ•เบตเบ—เบตเปˆเบฎเบฑเบšเบœเบดเบ”เบŠเบญเบšเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป€เบ‚เบปเป‰เบฒเป€เบ–เบดเบ‡เบ‚เบญเบ‡เป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบˆเบฒเบเบžเบฒเบเบ™เบญเบ (เบเบฒเบ™เป€เบ‚เบปเป‰เบฒเป€เบ–เบดเบ‡เบชเบฒเบ—เบฒเบฅเบฐเบ™เบฐ) เปเบฅเบฐเบ„เบงเบฒเบกเบžเป‰เบญเบกเบ‚เบญเบ‡เบžเบญเบ”:

Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เปƒเบซเป‰เบชเป‰เบฒเบ‡เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเปƒเบซเบกเปˆเบชเปเบฒเบฅเบฑเบšเบเบธเปˆเบกเบ„เบงเบฒเบกเบ›เบญเบ”เป„เบž VPC, เป€เบŠเบดเปˆเบ‡เบˆเบฐเบŠเปˆเบงเบเปƒเบซเป‰เบเบฒเบ™เป€เบ‚เบปเป‰เบฒเป€เบ–เบดเบ‡เบžเบฒเบเบ™เบญเบเบเบฑเบšเป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบœเปˆเบฒเบ™เบžเบญเบ” 5432 (PostgreSQL).
เปƒเบซเป‰เป„เบ›เบ—เบตเปˆเบ„เบญเบ™เป‚เบŠ AWS เปƒเบ™เบ›เปˆเบญเบ‡เบขเป‰เบฝเบกเบ‚เบญเบ‡เบ•เบปเบงเบ—เปˆเบญเบ‡เป€เบงเบฑเบšเปเบเบเบ•เปˆเบฒเบ‡เบซเบฒเบเป„เบ›เบซเบฒ VPC Dashboard -> เบเบธเปˆเบกเบ„เบงเบฒเบกเบ›เบญเบ”เป„เบž -> เบชเป‰เบฒเบ‡เบเบธเปˆเบกเบ„เบงเบฒเบกเบ›เบญเบ”เป„เบž:
Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบžเบงเบเป€เบฎเบปเบฒเบ•เบฑเป‰เบ‡เบŠเบทเปˆเบชเปเบฒเบฅเบฑเบšเบเบธเปˆเบกเบ„เบงเบฒเบกเบ›เบญเบ”เป„เบž - PostgreSQL, เบ„เปเบฒเบญเบฐเบ—เบดเบšเบฒเบ, เบŠเบตเป‰เบšเบญเบเบงเปˆเบฒ VPC เบเบธเปˆเบกเบ™เบตเป‰เบ„เบงเบ™เบˆเบฐเบ–เบทเบเป€เบŠเบทเปˆเบญเบกเป‚เบเบ‡เบเบฑเบšเปเบฅเบฐเบเบปเบ”เบ›เบธเปˆเบกเบชเป‰เบฒเบ‡:
Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบ•เบทเปˆเบกเบ‚เปเป‰เบกเบนเบ™เปƒเบชเปˆเบเบปเบ”เบฅเบฐเบšเบฝเบšเบ‚เบฒเป€เบ‚เบปเป‰เบฒเบชเปเบฒเบฅเบฑเบšเบžเบญเบ” 5432 เบชเปเบฒเบฅเบฑเบšเบเบธเปˆเบกเบ—เบตเปˆเบชเป‰เบฒเบ‡เปƒเบซเบกเปˆ, เบ”เบฑเปˆเบ‡เบ—เบตเปˆเบชเบฐเปเบ”เบ‡เบขเบนเปˆเปƒเบ™เบฎเบนเบšเบ‚เป‰เบฒเบ‡เบฅเบธเปˆเบกเบ™เบตเป‰. เบ—เปˆเบฒเบ™เบšเปเปˆเบชเบฒเบกเบฒเบ”เบฅเบฐเบšเบธเบžเบญเบ”เบ”เป‰เบงเบเบ•เบปเบ™เป€เบญเบ‡, เปเบ•เปˆเป€เบฅเบทเบญเบ PostgreSQL เบˆเบฒเบเบฅเบฒเบเบเบฒเบ™เปเบšเบšเป€เบฅเบทเปˆเบญเบ™เบฅเบปเบ‡เบ›เบฐเป€เบžเบ”.

เป€เบงเบปเป‰เบฒเบขเปˆเบฒเบ‡เป€เบ‚เบฑเป‰เบกเบ‡เบงเบ”, เบกเบนเบ™เบ„เปˆเบฒ ::/0 เบซเบกเบฒเบเป€เบ–เบดเบ‡เบเบฒเบ™เบกเบตเบเบฒเบ™เบˆเบฐเบฅเบฒเบˆเบญเบ™เป€เบ‚เบปเป‰เบฒเบกเบฒเบเบฑเบšเป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบเบˆเบฒเบเบ—เบปเปˆเบงเป‚เบฅเบ, เป€เบŠเบดเปˆเบ‡เบšเปเปˆเปเบกเปˆเบ™เบ„เบงเบฒเบกเบˆเบดเบ‡เบ—เบฑเบ‡เบซเบกเบปเบ”, เปเบ•เปˆเป€เบžเบทเปˆเบญเบงเบดเป€เบ„เบฒเบฐเบ•เบปเบงเบขเปˆเบฒเบ‡, เปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒเบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰เปƒเบŠเป‰เบงเบดเบ—เบตเบเบฒเบ™เบ™เบตเป‰:
Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบžเบงเบเป€เบฎเบปเบฒเบเบฑเบšเบ„เบทเบ™เป„เบ›เบซเบฒเบซเบ™เป‰เบฒเบ•เบปเบงเบ—เปˆเบญเบ‡เป€เบงเบฑเบš, เบšเปˆเบญเบ™เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบกเบต "เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ‚เบฑเป‰เบ™เบชเบนเบ‡" เป€เบ›เบตเบ”เปเบฅเบฐเป€เบฅเบทเบญเบเปƒเบ™เบžเบฒเบเบเบธเปˆเบกเบ„เบงเบฒเบกเบ›เบญเบ”เป„เบž VPC -> เป€เบฅเบทเบญเบเบเบธเปˆเบกเบ„เบงเบฒเบกเบ›เบญเบ”เป„เบž VPC เบ—เบตเปˆเบกเบตเบขเบนเปˆเปเบฅเป‰เบง -> PostgreSQL:
Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบ•เปเปˆเป„เบ›, เปƒเบ™เบ•เบปเบงเป€เบฅเบทเบญเบเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™ -> เบŠเบทเปˆเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™ -> เบ•เบฑเป‰เบ‡เบŠเบทเปˆ - habrDB.

เบžเบงเบเป€เบฎเบปเบฒเบชเบฒเบกเบฒเบ”เบญเบญเบเบˆเบฒเบเบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เบ—เบตเปˆเบเบฑเบ‡เป€เบซเบผเบทเบญ, เบเบปเบเป€เบงเบฑเป‰เบ™เบเบฒเบ™เบ›เบดเบ”เบเบฒเบ™เบชเปเบฒเบฎเบญเบ‡เบ‚เปเป‰เบกเบนเบ™ (เป„เบฅเบเบฐเป€เบงเบฅเบฒเป€เบเบฑเบšเบฎเบฑเบเบชเบฒเบชเปเบฒเบฎเบญเบ‡เบ‚เปเป‰เบกเบนเบ™ - 0 เบกเบทเป‰), เบเบฒเบ™เบ•เบดเบ”เบ•เบฒเบกเปเบฅเบฐเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ” Insights, เป‚เบ”เบเบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™. เปƒเบซเป‰เบ„เบฅเบดเบเปƒเบชเปˆเบ›เบธเปˆเบก เบชเป‰เบฒเบ‡เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™:
Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบ•เบปเบงเบˆเบฑเบ”เบเบฒเบ™เบเบฐเบ—เบนเป‰

เบ‚เบฑเป‰เบ™เบ•เบญเบ™เบชเบธเบ”เบ—เป‰เบฒเบเบˆเบฐเป€เบ›เบฑเบ™เบเบฒเบ™เบžเบฑเบ”เบ—เบฐเบ™เบฒเบงเบฝเบ Spark, เป€เบŠเบดเปˆเบ‡เบˆเบฐเบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เปƒเบซเบกเปˆเบ—เบตเปˆเบกเบฒเบˆเบฒเบ Kafka เบ—เบธเบเป†เบชเบญเบ‡เบงเบดเบ™เบฒเบ—เบตเปเบฅเบฐเปƒเบชเปˆเบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบšเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™.

เบ”เบฑเปˆเบ‡เบ—เบตเปˆเป„เบ”เป‰เบเปˆเบฒเบงเป„เบงเป‰เบ‚เป‰เบฒเบ‡เป€เบ—เบดเบ‡, เบˆเบธเบ”เบเบงเบ”เบเบฒเปเบกเปˆเบ™เบเบปเบ™เป„เบเบซเบผเบฑเบเปƒเบ™ SparkStreaming เบ—เบตเปˆเบ•เป‰เบญเบ‡เป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเป€เบžเบทเปˆเบญเบฎเบฑเบšเบ›เบฐเบเบฑเบ™เบ„เบงเบฒเบกเบ—เบปเบ™เบ—เบฒเบ™เบ•เปเปˆเบ„เบงเบฒเบกเบœเบดเบ”เบžเบฒเบ”. เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบ™เปเบฒเปƒเบŠเป‰เบˆเบธเบ”เบเบงเบ”เบเบฒเปเบฅเบฐ, เบ–เป‰เบฒเบ‚เบฑเป‰เบ™เบ•เบญเบ™เบฅเบปเป‰เบกเป€เบซเบฅเบง, เป‚เบกเบ”เบนเบ™ Spark Streaming เบžเบฝเบ‡เปเบ•เปˆเบ•เป‰เบญเบ‡เบเบฒเบ™เบเบฑเบšเบ„เบทเบ™เป„เบ›เบซเบฒเบˆเบธเบ”เบเบงเบ”เบเบฒเบชเบธเบ”เบ—เป‰เบฒเบเปเบฅเบฐเบชเบทเบšเบ•เปเปˆเบเบฒเบ™เบ„เบดเบ”เป„เบฅเปˆเบˆเบฒเบเบกเบฑเบ™เป€เบžเบทเปˆเบญเบŸเบทเป‰เบ™เบ•เบปเบงเบ‚เปเป‰เบกเบนเบ™เบชเบนเบ™เป€เบชเบ.

เบเบฒเบ™เบเบงเบ”เบชเบญเบšเบชเบฒเบกเบฒเบ”เบ–เบทเบเป€เบ›เบตเบ”เปƒเบŠเป‰เป‚เบ”เบเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเป„เบ”เป€เบฅเบเบฐเบ—เปเบฅเบตเปƒเบ™เบฅเบฐเบšเบปเบšเป„เบŸเบฅเปŒเบ—เบตเปˆเบ—เบปเบ™เบ—เบฒเบ™เบ•เปเปˆเบ„เบงเบฒเบกเบœเบดเบ”, เป€เบŠเบทเปˆเบญเบ–เบทเป„เบ”เป‰ (เป€เบŠเบฑเปˆเบ™ HDFS, S3, เปเบฅเบฐเบญเบทเปˆเบ™เป†) เป€เบŠเบดเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™เบ”เปˆเบฒเบ™เบˆเบฐเบ–เบทเบเป€เบเบฑเบšเป„เบงเป‰. เบ™เบตเป‰เปเบกเปˆเบ™เป€เบฎเบฑเบ”เป„เบ”เป‰เป‚เบ”เบเปƒเบŠเป‰เบ•เบปเบงเบขเปˆเบฒเบ‡:

streamingContext.checkpoint(checkpointDirectory)

เปƒเบ™เบ•เบปเบงเบขเปˆเบฒเบ‡เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบ™เปเบฒเปƒเบŠเป‰เบงเบดเบ—เบตเบเบฒเบ™เบ”เบฑเปˆเบ‡เบ•เปเปˆเป„เบ›เบ™เบตเป‰, เบ„เบท, เบ–เป‰เบฒ checkpointDirectory เบกเบต, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบชเบฐเบžเบฒเบšเบเบฒเบ™เบˆเบฐเบ–เบทเบเบชเป‰เบฒเบ‡เปƒเบซเบกเปˆเบˆเบฒเบเบ‚เปเป‰เบกเบนเบ™เบ”เปˆเบฒเบ™. เบ–เป‰เบฒเป„เบ”เป€เบฅเบเบฐเบ—เปเบฅเบตเบšเปเปˆเบกเบต (i. e. เบ›เบฐเบ•เบดเบšเบฑเบ”เบ„เบฑเป‰เบ‡เบ—เปเบฒเบญเบดเบ”), เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™ functionToCreateContext เบ–เบทเบเป€เบญเบตเป‰เบ™เป€เบžเบทเปˆเบญเบชเป‰เบฒเบ‡เบชเบฐเบžเบฒเบšเบเบฒเบ™เปƒเบซเบกเปˆเปเบฅเบฐเบเปเบฒเบซเบ™เบปเบ”เบ„เปˆเบฒ DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

เบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡เบงเบฑเบ”เบ–เบธ DirectStream เป€เบžเบทเปˆเบญเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบšเบซเบปเบงเบ‚เปเป‰ "เบ—เบธเบฅเบฐเบเปเบฒ" เป‚เบ”เบเปƒเบŠเป‰เบงเบดเบ—เบตเบเบฒเบ™ createDirectStream เบ‚เบญเบ‡เบซเป‰เบญเบ‡เบชเบฐเปเบธเบ” KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

เบเบณเบฅเบฑเบ‡เบงเบดเป€เบ„เบฒเบฐเบ‚เปเป‰เบกเบนเบ™เบ‚เบฒเป€เบ‚เบปเป‰เบฒเปƒเบ™เบฎเบนเบšเปเบšเบš JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰ Spark SQL, เบžเบงเบเป€เบฎเบปเบฒเป€เบฎเบฑเบ”เบเบฒเบ™เบˆเบฑเบ”เบเบธเปˆเบกเปเบšเบšเบ‡เปˆเบฒเบเบ”เบฒเบเปเบฅเบฐเบชเบฐเปเบ”เบ‡เบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบšเปƒเบ™ console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

เป€เบญเบปเบฒเบ‚เปเป‰เบ„เบงเบฒเบกเบชเบญเบšเบ–เบฒเบกเปเบฅเบฐเปเบฅเปˆเบ™เบกเบฑเบ™เบœเปˆเบฒเบ™ Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

เปเบฅเบฐเบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบžเบงเบเป€เบฎเบปเบฒเบšเบฑเบ™เบ—เบถเบเบ‚เปเป‰เบกเบนเบ™เบฅเบงเบกเบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบšเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เปƒเบ™ AWS RDS. เป€เบžเบทเปˆเบญเบšเบฑเบ™เบ—เบถเบเบœเบปเบ™เบเบฒเบ™เบฅเบงเบšเบฅเบงเบกเป„เบงเป‰เปƒเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเปƒเบŠเป‰เบงเบดเบ—เบตเบเบฒเบ™เบ‚เบฝเบ™เบ‚เบญเบ‡เบงเบฑเบ”เบ–เบธ DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

เบชเบญเบ‡เบชเบฒเบกเบ„เปเบฒเบเปˆเบฝเบงเบเบฑเบšเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบš AWS RDS. เบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡เบœเบนเป‰เปƒเบŠเป‰เปเบฅเบฐเบฅเบฐเบซเบฑเบ”เบœเปˆเบฒเบ™เบชเปเบฒเบฅเบฑเบšเบกเบฑเบ™เบขเบนเปˆเปƒเบ™เบ‚เบฑเป‰เบ™เบ•เบญเบ™ "Deploying AWS PostgreSQL". เบ—เปˆเบฒเบ™เบ„เบงเบ™เปƒเบŠเป‰ Endpoint เป€เบ›เบฑเบ™ url เป€เบŠเบตเบšเป€เบงเบตเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™, เป€เบŠเบดเปˆเบ‡เบชเบฐเปเบ”เบ‡เบขเบนเปˆเปƒเบ™เบžเบฒเบ Connectivity & security:

Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เป€เบžเบทเปˆเบญเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ Spark เปเบฅเบฐ Kafka เบขเปˆเบฒเบ‡เบ–เบทเบเบ•เป‰เบญเบ‡, เบ—เปˆเบฒเบ™เบ„เบงเบ™เป€เบฎเบฑเบ”เบงเบฝเบเบœเปˆเบฒเบ™ smark-submit เป‚เบ”เบเปƒเบŠเป‰ artifact. spark-streaming-kafka-0-8_2.11. เบ™เบญเบเบˆเบฒเบเบ™เบฑเป‰เบ™, เบžเบงเบเป€เบฎเบปเบฒเบเบฑเบ‡เบˆเบฐเปƒเบŠเป‰เบชเบดเปˆเบ‡เบ›เบฐเบ”เบดเบ”เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบžเบปเบงเบžเบฑเบ™เบเบฑเบšเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™ PostgreSQL; เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป‚เบญเบ™เบžเบงเบเบกเบฑเบ™เบœเปˆเบฒเบ™ --packages.

เบชเปเบฒเบฅเบฑเบšเบ„เบงเบฒเบกเบเบทเบ”เบซเบเบธเปˆเบ™เบ‚เบญเบ‡เบชเบฐเบ„เบดเบš, เบžเบงเบเป€เบฎเบปเบฒเบเบฑเบ‡เบˆเบฐเบ›เบฐเบเบญเบšเบกเบตเบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เบ›เป‰เบญเบ™เบ‚เปเป‰เบกเบนเบ™เบŠเบทเปˆเบ‚เบญเบ‡เป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบเบ‚เปเป‰เบ„เบงเบฒเบกเปเบฅเบฐเบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบเบฒเบ™เบ—เบตเปˆเบˆเบฐเป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™.

เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบกเบฑเบ™เป€เบ›เบฑเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบˆเบฐเป€เบ›เบตเบ”เบ•เบปเบงเปเบฅเบฐเบเบงเบ”เป€เบšเบดเปˆเบ‡เบเบฒเบ™เป€เบฎเบฑเบ”เบงเบฝเบเบ‚เบญเบ‡เบฅเบฐเบšเบปเบš:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

เบ—เบธเบเบขเปˆเบฒเบ‡เบชเบณเป€เบฅเบฑเบ”เปเบฅเป‰เบง! เบ”เบฑเปˆเบ‡เบ—เบตเปˆเบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป€เบซเบฑเบ™เป„เบ”เป‰เปƒเบ™เบฎเบนเบšเบ‚เป‰เบฒเบ‡เบฅเบธเปˆเบกเบ™เบตเป‰, เปƒเบ™เบ‚เบฐเบ™เบฐเบ—เบตเปˆเปเบญเบฑเบšเบžเบฅเบดเป€เบ„เบŠเบฑเบ™เบเปเบฒเบฅเบฑเบ‡เป€เบฎเบฑเบ”เบงเบฝเบ, เบœเบปเบ™เบเบฒเบ™เบฅเบงเบšเบฅเบงเบกเปƒเบซเบกเปˆเปเบกเปˆเบ™เบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบšเบ—เบธเบเป† 2 เบงเบดเบ™เบฒเบ—เบต, เป€เบžเบฒเบฐเบงเปˆเบฒเบžเบงเบเป€เบฎเบปเบฒเบเปเบฒเบ™เบปเบ”เบŠเปˆเบงเบ‡เป€เบงเบฅเบฒ batching เป€เบ›เบฑเบ™ 2 เบงเบดเบ™เบฒเบ—เบตเป€เบกเบทเปˆเบญเบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡เบงเบฑเบ”เบ–เบธ StreamingContext:

Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบ•เปเปˆเป„เบ›, เบžเบงเบเป€เบฎเบปเบฒเป€เบฎเบฑเบ”เปเบšเบšเบชเบญเบšเบ–เบฒเบกเบ‡เปˆเบฒเบเป†เบเบฑเบšเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เป€เบžเบทเปˆเบญเบเบงเบ”เป€เบšเบดเปˆเบ‡เบเบฒเบ™เบ›เบฐเบเบปเบ”เบ•เบปเบงเบ‚เบญเบ‡เบšเบฑเบ™เบ—เบถเบเปƒเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡ transaction_flow:

Apache Kafka เปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ”เป‰เบงเบ Spark Streaming

เบชเบฐเบซเบฅเบธเบš

เบšเบปเบ”เบ„เบงเบฒเบกเบ™เบตเป‰เป„เบ”เป‰เป€เบšเบดเปˆเบ‡เบ•เบปเบงเบขเปˆเบฒเบ‡เบ‚เบญเบ‡เบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เป‚เบ”เบเปƒเบŠเป‰ Spark Streaming เป‚เบ”เบเบชเบปเบกเบ—เบปเบšเบเบฑเบš Apache Kafka เปเบฅเบฐ PostgreSQL. เบ”เป‰เบงเบเบเบฒเบ™เบ‚เบฐเบซเบเบฒเบเบ•เบปเบงเบ‚เบญเบ‡เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเปเบซเบผเปˆเบ‡เบ•เปˆเบฒเบ‡เป†, เบกเบฑเบ™เป€เบ›เบฑเบ™เบเบฒเบ™เบเบฒเบเบ—เบตเปˆเบˆเบฐเบ›เบฐเป€เบกเบตเบ™เบกเบนเบ™เบ„เปˆเบฒเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบ‚เบญเบ‡ Spark Streaming เป€เบเบตเบ™เป„เบ›เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบชเป‰เบฒเบ‡เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เปเบฅเบฐเบ„เปเบฒเบฎเป‰เบญเบ‡เบชเบฐเบซเบกเบฑเบเปƒเบ™เป€เบงเบฅเบฒเบˆเบดเบ‡.

เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบŠเบญเบเบซเบฒเบฅเบฐเบซเบฑเบ”เปเบซเบผเปˆเบ‡เป€เบ•เบฑเบกเปƒเบ™ repository เบ‚เบญเบ‡เบ‚เป‰เบญเบเบขเบนเปˆเบ—เบตเปˆ GitHub.

เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบเบดเบ™เบ”เบตเบ—เบตเปˆเบˆเบฐเบชเบปเบ™เบ—เบฐเบ™เบฒเบšเบปเบ”เบ„เบงเบฒเบกเบ™เบตเป‰, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบซเบงเบฑเบ‡เบงเปˆเบฒเบˆเบฐเป„เบ”เป‰เบ„เปเบฒเบ„เบดเบ”เป€เบซเบฑเบ™เบ‚เบญเบ‡เบ—เปˆเบฒเบ™, เปเบฅเบฐเบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบเบฑเบ‡เบซเบงเบฑเบ‡เบงเปˆเบฒเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบงเบดเบžเบฒเบเบงเบดเบˆเบฒเบ™เบเบฒเบ™เบเปเปˆเบชเป‰เบฒเบ‡เบˆเบฒเบเบœเบนเป‰เบญเปˆเบฒเบ™เบ—เบตเปˆเป€เบ›เบฑเบ™เบซเปˆเบงเบ‡เป€เบ›เบฑเบ™เป„เบเบ—เบธเบเบ„เบปเบ™.

เบ‚เป‰เบฒโ€‹เบžเบฐโ€‹เป€เบˆเบปเป‰เบฒโ€‹เบ‚เปโ€‹เปƒเบซเป‰โ€‹เบ—เปˆเบฒเบ™โ€‹เบชเบปเบšโ€‹เบœเบปเบ™โ€‹เบชเปเบฒโ€‹เป€เบฅเบฑเบ”โ€‹!

เบ›. เปƒเบ™เป€เบšเบทเป‰เบญเบ‡เบ•เบปเป‰เบ™เบกเบฑเบ™เป„เบ”เป‰เบ–เบทเบเบงเบฒเบ‡เปเบœเบ™เบ—เบตเปˆเบˆเบฐเปƒเบŠเป‰เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™ PostgreSQL เบ—เป‰เบญเบ‡เบ–เบดเปˆเบ™, เปเบ•เปˆเบเป‰เบญเบ™เบ„เบงเบฒเบกเบฎเบฑเบเบ‚เบญเบ‡เบ‚เป‰เบญเบเบชเปเบฒเบฅเบฑเบš AWS, เบ‚เป‰เบญเบเป„เบ”เป‰เบ•เบฑเบ”เบชเบดเบ™เปƒเบˆเบเป‰เบฒเบเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เป„เบ›เบเบฑเบ‡เป€เบกเบ„. เปƒเบ™เบšเบปเบ”เบ„เบงเบฒเบกเบ•เปเปˆเป„เบ›เบเปˆเบฝเบงเบเบฑเบšเบซเบปเบงเบ‚เปเป‰เบ™เบตเป‰, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบˆเบฐเบชเบฐเปเบ”เบ‡เบงเบดเบ—เบตเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบฅเบฐเบšเบปเบšเบ—เบฑเบ‡เบซเบกเบปเบ”เบ—เบตเปˆเป„เบ”เป‰เบญเบฐเบ—เบดเบšเบฒเบเบ‚เป‰เบฒเบ‡เป€เบ—เบดเบ‡เปƒเบ™ AWS เป‚เบ”เบเปƒเบŠเป‰ AWS Kinesis เปเบฅเบฐ AWS EMR. เบ•เบดเบ”เบ•เบฒเบกเบ‚เปˆเบฒเบง!

เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™: www.habr.com

เป€เบžเบตเปˆเบกเบ„เบงเบฒเบกเบ„เบดเบ”เป€เบซเบฑเบ™