ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಹಲೋ, ಹಬ್ರ್! ಇಂದು ನಾವು ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಅನ್ನು ಬಳಸಿಕೊಂಡು ಅಪಾಚೆ ಕಾಫ್ಕಾ ಸಂದೇಶ ಸ್ಟ್ರೀಮ್‌ಗಳನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವ ವ್ಯವಸ್ಥೆಯನ್ನು ನಿರ್ಮಿಸುತ್ತೇವೆ ಮತ್ತು ಪ್ರಕ್ರಿಯೆ ಫಲಿತಾಂಶಗಳನ್ನು AWS RDS ಕ್ಲೌಡ್ ಡೇಟಾಬೇಸ್‌ಗೆ ಬರೆಯುತ್ತೇವೆ.

ಒಂದು ನಿರ್ದಿಷ್ಟ ಕ್ರೆಡಿಟ್ ಸಂಸ್ಥೆಯು ಅದರ ಎಲ್ಲಾ ಶಾಖೆಗಳಲ್ಲಿ ಒಳಬರುವ ವಹಿವಾಟುಗಳನ್ನು "ಫ್ಲೈನಲ್ಲಿ" ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವ ಕಾರ್ಯವನ್ನು ನಮಗೆ ಹೊಂದಿಸುತ್ತದೆ ಎಂದು ಊಹಿಸೋಣ. ಖಜಾನೆ, ಮಿತಿಗಳು ಅಥವಾ ವಹಿವಾಟುಗಳಿಗೆ ಹಣಕಾಸಿನ ಫಲಿತಾಂಶಗಳು ಇತ್ಯಾದಿಗಳಿಗೆ ಮುಕ್ತ ಕರೆನ್ಸಿ ಸ್ಥಾನವನ್ನು ತ್ವರಿತವಾಗಿ ಲೆಕ್ಕಾಚಾರ ಮಾಡುವ ಉದ್ದೇಶಕ್ಕಾಗಿ ಇದನ್ನು ಮಾಡಬಹುದು.

ಮ್ಯಾಜಿಕ್ ಮತ್ತು ಮ್ಯಾಜಿಕ್ ಮಂತ್ರಗಳ ಬಳಕೆಯಿಲ್ಲದೆ ಈ ಪ್ರಕರಣವನ್ನು ಹೇಗೆ ಕಾರ್ಯಗತಗೊಳಿಸುವುದು - ಕಟ್ ಅಡಿಯಲ್ಲಿ ಓದಿ! ಹೋಗು!

ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ
(ಚಿತ್ರ ಮೂಲ)

ಪರಿಚಯ

ಸಹಜವಾಗಿ, ನೈಜ ಸಮಯದಲ್ಲಿ ಹೆಚ್ಚಿನ ಪ್ರಮಾಣದ ಡೇಟಾವನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವುದು ಆಧುನಿಕ ವ್ಯವಸ್ಥೆಗಳಲ್ಲಿ ಬಳಕೆಗೆ ಸಾಕಷ್ಟು ಅವಕಾಶಗಳನ್ನು ಒದಗಿಸುತ್ತದೆ. ಇದಕ್ಕಾಗಿ ಅತ್ಯಂತ ಜನಪ್ರಿಯ ಸಂಯೋಜನೆಗಳೆಂದರೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನ ಸಂಯೋಜನೆಯಾಗಿದೆ, ಅಲ್ಲಿ ಕಾಫ್ಕಾ ಒಳಬರುವ ಸಂದೇಶ ಪ್ಯಾಕೆಟ್‌ಗಳ ಸ್ಟ್ರೀಮ್ ಅನ್ನು ರಚಿಸುತ್ತದೆ ಮತ್ತು ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಈ ಪ್ಯಾಕೆಟ್‌ಗಳನ್ನು ನಿರ್ದಿಷ್ಟ ಸಮಯದ ಮಧ್ಯಂತರದಲ್ಲಿ ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುತ್ತದೆ.

ಅಪ್ಲಿಕೇಶನ್‌ನ ದೋಷ ಸಹಿಷ್ಣುತೆಯನ್ನು ಹೆಚ್ಚಿಸಲು, ನಾವು ಚೆಕ್‌ಪಾಯಿಂಟ್‌ಗಳನ್ನು ಬಳಸುತ್ತೇವೆ. ಈ ಕಾರ್ಯವಿಧಾನದೊಂದಿಗೆ, ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಎಂಜಿನ್ ಕಳೆದುಹೋದ ಡೇಟಾವನ್ನು ಮರುಪಡೆಯಲು ಅಗತ್ಯವಿರುವಾಗ, ಅದು ಕೊನೆಯ ಚೆಕ್‌ಪಾಯಿಂಟ್‌ಗೆ ಹಿಂತಿರುಗಿ ಮತ್ತು ಅಲ್ಲಿಂದ ಲೆಕ್ಕಾಚಾರಗಳನ್ನು ಪುನರಾರಂಭಿಸಬೇಕಾಗುತ್ತದೆ.

ಅಭಿವೃದ್ಧಿ ಹೊಂದಿದ ವ್ಯವಸ್ಥೆಯ ಆರ್ಕಿಟೆಕ್ಚರ್

ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಬಳಸಿದ ಘಟಕಗಳು:

  • ಅಪಾಚೆ ಕಾಫ್ಕಾ ವಿತರಿಸಿದ ಪ್ರಕಟಣೆ-ಚಂದಾದಾರ ಸಂದೇಶ ವ್ಯವಸ್ಥೆಯಾಗಿದೆ. ಆಫ್‌ಲೈನ್ ಮತ್ತು ಆನ್‌ಲೈನ್ ಸಂದೇಶ ಬಳಕೆ ಎರಡಕ್ಕೂ ಸೂಕ್ತವಾಗಿದೆ. ಡೇಟಾ ನಷ್ಟವನ್ನು ತಡೆಗಟ್ಟಲು, ಕಾಫ್ಕಾ ಸಂದೇಶಗಳನ್ನು ಡಿಸ್ಕ್‌ನಲ್ಲಿ ಸಂಗ್ರಹಿಸಲಾಗುತ್ತದೆ ಮತ್ತು ಕ್ಲಸ್ಟರ್‌ನಲ್ಲಿ ಪುನರಾವರ್ತಿಸಲಾಗುತ್ತದೆ. ಕಾಫ್ಕಾ ವ್ಯವಸ್ಥೆಯನ್ನು ZooKeeper ಸಿಂಕ್ರೊನೈಸೇಶನ್ ಸೇವೆಯ ಮೇಲೆ ನಿರ್ಮಿಸಲಾಗಿದೆ;
  • ಅಪಾಚೆ ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ - ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾವನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸಲು ಸ್ಪಾರ್ಕ್ ಘಟಕ. ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಮಾಡ್ಯೂಲ್ ಅನ್ನು ಮೈಕ್ರೋ-ಬ್ಯಾಚ್ ಆರ್ಕಿಟೆಕ್ಚರ್ ಬಳಸಿ ನಿರ್ಮಿಸಲಾಗಿದೆ, ಅಲ್ಲಿ ಡೇಟಾ ಸ್ಟ್ರೀಮ್ ಅನ್ನು ಸಣ್ಣ ಡೇಟಾ ಪ್ಯಾಕೆಟ್‌ಗಳ ನಿರಂತರ ಅನುಕ್ರಮವಾಗಿ ಅರ್ಥೈಸಲಾಗುತ್ತದೆ. ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ವಿವಿಧ ಮೂಲಗಳಿಂದ ಡೇಟಾವನ್ನು ತೆಗೆದುಕೊಳ್ಳುತ್ತದೆ ಮತ್ತು ಅದನ್ನು ಸಣ್ಣ ಪ್ಯಾಕೇಜ್‌ಗಳಾಗಿ ಸಂಯೋಜಿಸುತ್ತದೆ. ನಿಯಮಿತ ಮಧ್ಯಂತರದಲ್ಲಿ ಹೊಸ ಪ್ಯಾಕೇಜ್‌ಗಳನ್ನು ರಚಿಸಲಾಗುತ್ತದೆ. ಪ್ರತಿ ಸಮಯದ ಮಧ್ಯಂತರದ ಆರಂಭದಲ್ಲಿ, ಹೊಸ ಪ್ಯಾಕೆಟ್ ಅನ್ನು ರಚಿಸಲಾಗುತ್ತದೆ ಮತ್ತು ಆ ಮಧ್ಯಂತರದಲ್ಲಿ ಸ್ವೀಕರಿಸಿದ ಯಾವುದೇ ಡೇಟಾವನ್ನು ಪ್ಯಾಕೆಟ್‌ನಲ್ಲಿ ಸೇರಿಸಲಾಗುತ್ತದೆ. ಮಧ್ಯಂತರದ ಕೊನೆಯಲ್ಲಿ, ಪ್ಯಾಕೆಟ್ ಬೆಳವಣಿಗೆ ನಿಲ್ಲುತ್ತದೆ. ಮಧ್ಯಂತರದ ಗಾತ್ರವನ್ನು ಬ್ಯಾಚ್ ಮಧ್ಯಂತರ ಎಂಬ ನಿಯತಾಂಕದಿಂದ ನಿರ್ಧರಿಸಲಾಗುತ್ತದೆ;
  • ಅಪಾಚೆ ಸ್ಪಾರ್ಕ್ SQL - ಸ್ಪಾರ್ಕ್ ಕ್ರಿಯಾತ್ಮಕ ಪ್ರೋಗ್ರಾಮಿಂಗ್ನೊಂದಿಗೆ ಸಂಬಂಧಿತ ಸಂಸ್ಕರಣೆಯನ್ನು ಸಂಯೋಜಿಸುತ್ತದೆ. ರಚನಾತ್ಮಕ ಡೇಟಾ ಎಂದರೆ ಸ್ಕೀಮಾವನ್ನು ಹೊಂದಿರುವ ಡೇಟಾ, ಅಂದರೆ, ಎಲ್ಲಾ ದಾಖಲೆಗಳಿಗಾಗಿ ಕ್ಷೇತ್ರಗಳ ಒಂದು ಸೆಟ್. Spark SQL ವಿವಿಧ ರಚನಾತ್ಮಕ ಡೇಟಾ ಮೂಲಗಳಿಂದ ಇನ್‌ಪುಟ್ ಅನ್ನು ಬೆಂಬಲಿಸುತ್ತದೆ ಮತ್ತು ಸ್ಕೀಮಾ ಮಾಹಿತಿಯ ಲಭ್ಯತೆಗೆ ಧನ್ಯವಾದಗಳು, ಇದು ದಾಖಲೆಗಳ ಅಗತ್ಯವಿರುವ ಕ್ಷೇತ್ರಗಳನ್ನು ಮಾತ್ರ ಸಮರ್ಥವಾಗಿ ಹಿಂಪಡೆಯಬಹುದು ಮತ್ತು DataFrame API ಗಳನ್ನು ಸಹ ಒದಗಿಸುತ್ತದೆ;
  • AWS RDS ಇದು ತುಲನಾತ್ಮಕವಾಗಿ ಅಗ್ಗದ ಕ್ಲೌಡ್-ಆಧಾರಿತ ಸಂಬಂಧಿತ ಡೇಟಾಬೇಸ್ ಆಗಿದೆ, ಇದು ಸೆಟಪ್, ಕಾರ್ಯಾಚರಣೆ ಮತ್ತು ಸ್ಕೇಲಿಂಗ್ ಅನ್ನು ಸರಳಗೊಳಿಸುವ ವೆಬ್ ಸೇವೆಯಾಗಿದೆ ಮತ್ತು ನೇರವಾಗಿ Amazon ನಿಂದ ನಿರ್ವಹಿಸಲ್ಪಡುತ್ತದೆ.

ಕಾಫ್ಕಾ ಸರ್ವರ್ ಅನ್ನು ಸ್ಥಾಪಿಸುವುದು ಮತ್ತು ಚಾಲನೆ ಮಾಡುವುದು

ಕಾಫ್ಕಾವನ್ನು ನೇರವಾಗಿ ಬಳಸುವ ಮೊದಲು, ನೀವು ಜಾವಾವನ್ನು ಹೊಂದಿರುವಿರಾ ಎಂದು ಖಚಿತಪಡಿಸಿಕೊಳ್ಳಬೇಕು, ಏಕೆಂದರೆ... JVM ಅನ್ನು ಕೆಲಸಕ್ಕಾಗಿ ಬಳಸಲಾಗುತ್ತದೆ:

sudo apt-get update 
sudo apt-get install default-jre
java -version

ಕಾಫ್ಕಾ ಜೊತೆ ಕೆಲಸ ಮಾಡಲು ಹೊಸ ಬಳಕೆದಾರರನ್ನು ರಚಿಸೋಣ:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

ಮುಂದೆ, ಅಧಿಕೃತ ಅಪಾಚೆ ಕಾಫ್ಕಾ ವೆಬ್‌ಸೈಟ್‌ನಿಂದ ವಿತರಣೆಯನ್ನು ಡೌನ್‌ಲೋಡ್ ಮಾಡಿ:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

ಡೌನ್‌ಲೋಡ್ ಮಾಡಿದ ಆರ್ಕೈವ್ ಅನ್ನು ಅನ್ಪ್ಯಾಕ್ ಮಾಡಿ:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

ಮುಂದಿನ ಹಂತವು ಐಚ್ಛಿಕವಾಗಿರುತ್ತದೆ. ವಾಸ್ತವವೆಂದರೆ ಡೀಫಾಲ್ಟ್ ಸೆಟ್ಟಿಂಗ್‌ಗಳು ಅಪಾಚೆ ಕಾಫ್ಕಾದ ಎಲ್ಲಾ ಸಾಮರ್ಥ್ಯಗಳನ್ನು ಸಂಪೂರ್ಣವಾಗಿ ಬಳಸಲು ನಿಮಗೆ ಅನುಮತಿಸುವುದಿಲ್ಲ. ಉದಾಹರಣೆಗೆ, ಸಂದೇಶಗಳನ್ನು ಪ್ರಕಟಿಸಬಹುದಾದ ವಿಷಯ, ವರ್ಗ, ಗುಂಪನ್ನು ಅಳಿಸಿ. ಇದನ್ನು ಬದಲಾಯಿಸಲು, ಕಾನ್ಫಿಗರೇಶನ್ ಫೈಲ್ ಅನ್ನು ಸಂಪಾದಿಸೋಣ:

vim ~/kafka/config/server.properties

ಫೈಲ್‌ನ ಕೊನೆಯಲ್ಲಿ ಈ ಕೆಳಗಿನವುಗಳನ್ನು ಸೇರಿಸಿ:

delete.topic.enable = true

ಕಾಫ್ಕಾ ಸರ್ವರ್ ಅನ್ನು ಪ್ರಾರಂಭಿಸುವ ಮೊದಲು, ನೀವು ZooKeeper ಸರ್ವರ್ ಅನ್ನು ಪ್ರಾರಂಭಿಸಬೇಕು; ನಾವು ಕಾಫ್ಕಾ ವಿತರಣೆಯೊಂದಿಗೆ ಬರುವ ಸಹಾಯಕ ಸ್ಕ್ರಿಪ್ಟ್ ಅನ್ನು ಬಳಸುತ್ತೇವೆ:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper ಯಶಸ್ವಿಯಾಗಿ ಪ್ರಾರಂಭಿಸಿದ ನಂತರ, ಪ್ರತ್ಯೇಕ ಟರ್ಮಿನಲ್‌ನಲ್ಲಿ ಕಾಫ್ಕಾ ಸರ್ವರ್ ಅನ್ನು ಪ್ರಾರಂಭಿಸಿ:

bin/kafka-server-start.sh config/server.properties

ವಹಿವಾಟು ಎಂಬ ಹೊಸ ವಿಷಯವನ್ನು ರಚಿಸೋಣ:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

ಅಗತ್ಯವಿರುವ ಸಂಖ್ಯೆಯ ವಿಭಾಗಗಳು ಮತ್ತು ಪ್ರತಿಕೃತಿಯೊಂದಿಗೆ ವಿಷಯವನ್ನು ರಚಿಸಲಾಗಿದೆ ಎಂದು ಖಚಿತಪಡಿಸಿಕೊಳ್ಳೋಣ:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಹೊಸದಾಗಿ ರಚಿಸಲಾದ ವಿಷಯಕ್ಕಾಗಿ ನಿರ್ಮಾಪಕ ಮತ್ತು ಗ್ರಾಹಕರನ್ನು ಪರೀಕ್ಷಿಸುವ ಕ್ಷಣಗಳನ್ನು ಕಳೆದುಕೊಳ್ಳೋಣ. ಸಂದೇಶಗಳನ್ನು ಕಳುಹಿಸುವುದು ಮತ್ತು ಸ್ವೀಕರಿಸುವುದನ್ನು ನೀವು ಹೇಗೆ ಪರೀಕ್ಷಿಸಬಹುದು ಎಂಬುದರ ಕುರಿತು ಹೆಚ್ಚಿನ ವಿವರಗಳನ್ನು ಅಧಿಕೃತ ದಾಖಲೆಯಲ್ಲಿ ಬರೆಯಲಾಗಿದೆ - ಕೆಲವು ಸಂದೇಶಗಳನ್ನು ಕಳುಹಿಸಿ. ಸರಿ, ನಾವು ಕಾಫ್ಕಾಪ್ರೊಡ್ಯೂಸರ್ API ಅನ್ನು ಬಳಸಿಕೊಂಡು ಪೈಥಾನ್‌ನಲ್ಲಿ ನಿರ್ಮಾಪಕರನ್ನು ಬರೆಯಲು ಮುಂದುವರಿಯುತ್ತೇವೆ.

ನಿರ್ಮಾಪಕ ಬರವಣಿಗೆ

ನಿರ್ಮಾಪಕರು ಯಾದೃಚ್ಛಿಕ ಡೇಟಾವನ್ನು ರಚಿಸುತ್ತಾರೆ - ಪ್ರತಿ ಸೆಕೆಂಡಿಗೆ 100 ಸಂದೇಶಗಳು. ಯಾದೃಚ್ಛಿಕ ಡೇಟಾದಿಂದ ನಾವು ಮೂರು ಕ್ಷೇತ್ರಗಳನ್ನು ಒಳಗೊಂಡಿರುವ ನಿಘಂಟನ್ನು ಅರ್ಥೈಸುತ್ತೇವೆ:

  • ಶಾಖೆ - ಕ್ರೆಡಿಟ್ ಸಂಸ್ಥೆಯ ಮಾರಾಟದ ಬಿಂದುವಿನ ಹೆಸರು;
  • ಕರೆನ್ಸಿ - ವಹಿವಾಟು ಕರೆನ್ಸಿ;
  • ಪ್ರಮಾಣ - ವಹಿವಾಟು ಮೊತ್ತ. ಬ್ಯಾಂಕಿನಿಂದ ಕರೆನ್ಸಿಯ ಖರೀದಿಯಾಗಿದ್ದರೆ ಮೊತ್ತವು ಧನಾತ್ಮಕ ಸಂಖ್ಯೆಯಾಗಿರುತ್ತದೆ ಮತ್ತು ಅದು ಮಾರಾಟವಾಗಿದ್ದರೆ ಋಣಾತ್ಮಕ ಸಂಖ್ಯೆಯಾಗಿದೆ.

ನಿರ್ಮಾಪಕರ ಕೋಡ್ ಈ ರೀತಿ ಕಾಣುತ್ತದೆ:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

ಮುಂದೆ, ಕಳುಹಿಸುವ ವಿಧಾನವನ್ನು ಬಳಸಿಕೊಂಡು, ನಾವು ಸರ್ವರ್‌ಗೆ ಸಂದೇಶವನ್ನು ಕಳುಹಿಸುತ್ತೇವೆ, ನಮಗೆ ಅಗತ್ಯವಿರುವ ವಿಷಯಕ್ಕೆ, JSON ಸ್ವರೂಪದಲ್ಲಿ:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

ಸ್ಕ್ರಿಪ್ಟ್ ಅನ್ನು ಚಾಲನೆ ಮಾಡುವಾಗ, ನಾವು ಟರ್ಮಿನಲ್‌ನಲ್ಲಿ ಈ ಕೆಳಗಿನ ಸಂದೇಶಗಳನ್ನು ಸ್ವೀಕರಿಸುತ್ತೇವೆ:

ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಇದರರ್ಥ ಎಲ್ಲವೂ ನಾವು ಬಯಸಿದಂತೆ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ - ನಿರ್ಮಾಪಕರು ನಮಗೆ ಅಗತ್ಯವಿರುವ ವಿಷಯಕ್ಕೆ ಸಂದೇಶಗಳನ್ನು ಉತ್ಪಾದಿಸುತ್ತಾರೆ ಮತ್ತು ಕಳುಹಿಸುತ್ತಾರೆ.
ಮುಂದಿನ ಹಂತವೆಂದರೆ ಸ್ಪಾರ್ಕ್ ಅನ್ನು ಸ್ಥಾಪಿಸುವುದು ಮತ್ತು ಈ ಸಂದೇಶ ಸ್ಟ್ರೀಮ್ ಅನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವುದು.

ಅಪಾಚೆ ಸ್ಪಾರ್ಕ್ ಅನ್ನು ಸ್ಥಾಪಿಸಲಾಗುತ್ತಿದೆ

ಅಪಾಚೆ ಸ್ಪಾರ್ಕ್ ಸಾರ್ವತ್ರಿಕ ಮತ್ತು ಉನ್ನತ-ಕಾರ್ಯಕ್ಷಮತೆಯ ಕ್ಲಸ್ಟರ್ ಕಂಪ್ಯೂಟಿಂಗ್ ವೇದಿಕೆಯಾಗಿದೆ.

ಸ್ಪಾರ್ಕ್ ಮ್ಯಾಪ್‌ರೆಡ್ಯೂಸ್ ಮಾದರಿಯ ಜನಪ್ರಿಯ ಅಳವಡಿಕೆಗಳಿಗಿಂತ ಉತ್ತಮವಾಗಿ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ ಮತ್ತು ಸಂವಾದಾತ್ಮಕ ಪ್ರಶ್ನೆಗಳು ಮತ್ತು ಸ್ಟ್ರೀಮ್ ಪ್ರಕ್ರಿಯೆ ಸೇರಿದಂತೆ ವ್ಯಾಪಕ ಶ್ರೇಣಿಯ ಕಂಪ್ಯೂಟೇಶನ್ ಪ್ರಕಾರಗಳನ್ನು ಬೆಂಬಲಿಸುತ್ತದೆ. ಹೆಚ್ಚಿನ ಪ್ರಮಾಣದ ಡೇಟಾವನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವಾಗ ವೇಗವು ಪ್ರಮುಖ ಪಾತ್ರವನ್ನು ವಹಿಸುತ್ತದೆ, ಏಕೆಂದರೆ ಇದು ನಿಮಿಷಗಳು ಅಥವಾ ಗಂಟೆಗಳ ಕಾಲ ಕಾಯದೆ ಸಂವಾದಾತ್ಮಕವಾಗಿ ಕೆಲಸ ಮಾಡಲು ನಿಮಗೆ ಅನುವು ಮಾಡಿಕೊಡುತ್ತದೆ. ಸ್ಪಾರ್ಕ್‌ನ ದೊಡ್ಡ ಸಾಮರ್ಥ್ಯವೆಂದರೆ ಅದನ್ನು ವೇಗವಾಗಿ ಮಾಡುವ ಸಾಮರ್ಥ್ಯವೆಂದರೆ ಇನ್-ಮೆಮೊರಿ ಲೆಕ್ಕಾಚಾರಗಳನ್ನು ನಿರ್ವಹಿಸುವ ಸಾಮರ್ಥ್ಯ.

ಈ ಚೌಕಟ್ಟನ್ನು ಸ್ಕಾಲಾದಲ್ಲಿ ಬರೆಯಲಾಗಿದೆ, ಆದ್ದರಿಂದ ನೀವು ಅದನ್ನು ಮೊದಲು ಸ್ಥಾಪಿಸಬೇಕಾಗಿದೆ:

sudo apt-get install scala

ಅಧಿಕೃತ ವೆಬ್‌ಸೈಟ್‌ನಿಂದ ಸ್ಪಾರ್ಕ್ ವಿತರಣೆಯನ್ನು ಡೌನ್‌ಲೋಡ್ ಮಾಡಿ:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

ಆರ್ಕೈವ್ ಅನ್ನು ಅನ್ಪ್ಯಾಕ್ ಮಾಡಿ:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

ಬ್ಯಾಷ್ ಫೈಲ್‌ಗೆ ಸ್ಪಾರ್ಕ್‌ಗೆ ಮಾರ್ಗವನ್ನು ಸೇರಿಸಿ:

vim ~/.bashrc

ಸಂಪಾದಕರ ಮೂಲಕ ಈ ಕೆಳಗಿನ ಸಾಲುಗಳನ್ನು ಸೇರಿಸಿ:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrc ಗೆ ಬದಲಾವಣೆಗಳನ್ನು ಮಾಡಿದ ನಂತರ ಕೆಳಗಿನ ಆಜ್ಞೆಯನ್ನು ಚಲಾಯಿಸಿ:

source ~/.bashrc

AWS PostgreSQL ಅನ್ನು ನಿಯೋಜಿಸಲಾಗುತ್ತಿದೆ

ಸ್ಟ್ರೀಮ್‌ಗಳಿಂದ ಸಂಸ್ಕರಿಸಿದ ಮಾಹಿತಿಯನ್ನು ನಾವು ಅಪ್‌ಲೋಡ್ ಮಾಡುವ ಡೇಟಾಬೇಸ್ ಅನ್ನು ನಿಯೋಜಿಸಲು ಮಾತ್ರ ಉಳಿದಿದೆ. ಇದಕ್ಕಾಗಿ ನಾವು AWS RDS ಸೇವೆಯನ್ನು ಬಳಸುತ್ತೇವೆ.

AWS ಕನ್ಸೋಲ್‌ಗೆ ಹೋಗಿ -> AWS RDS -> ಡೇಟಾಬೇಸ್‌ಗಳು -> ಡೇಟಾಬೇಸ್ ರಚಿಸಿ:
ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

PostgreSQL ಆಯ್ಕೆಮಾಡಿ ಮತ್ತು ಮುಂದೆ ಕ್ಲಿಕ್ ಮಾಡಿ:
ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಏಕೆಂದರೆ ಈ ಉದಾಹರಣೆಯು ಶೈಕ್ಷಣಿಕ ಉದ್ದೇಶಗಳಿಗಾಗಿ ಮಾತ್ರ; ನಾವು ಉಚಿತ ಸರ್ವರ್ ಅನ್ನು "ಕನಿಷ್ಠ" (ಉಚಿತ ಶ್ರೇಣಿ) ಬಳಸುತ್ತೇವೆ:
ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಮುಂದೆ, ನಾವು ಫ್ರೀ ಟೈರ್ ಬ್ಲಾಕ್‌ನಲ್ಲಿ ಟಿಕ್ ಅನ್ನು ಹಾಕುತ್ತೇವೆ ಮತ್ತು ಅದರ ನಂತರ ನಮಗೆ ಸ್ವಯಂಚಾಲಿತವಾಗಿ t2.micro ವರ್ಗದ ಉದಾಹರಣೆಯನ್ನು ನೀಡಲಾಗುವುದು - ದುರ್ಬಲವಾಗಿದ್ದರೂ, ಇದು ಉಚಿತ ಮತ್ತು ನಮ್ಮ ಕಾರ್ಯಕ್ಕೆ ಸಾಕಷ್ಟು ಸೂಕ್ತವಾಗಿದೆ:
ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಮುಂದೆ ಬಹಳ ಮುಖ್ಯವಾದ ವಿಷಯಗಳು ಬರುತ್ತವೆ: ಡೇಟಾಬೇಸ್ ನಿದರ್ಶನದ ಹೆಸರು, ಮಾಸ್ಟರ್ ಬಳಕೆದಾರರ ಹೆಸರು ಮತ್ತು ಅವರ ಪಾಸ್ವರ್ಡ್. ನಿದರ್ಶನವನ್ನು ಹೆಸರಿಸೋಣ: myHabrTest, ಮಾಸ್ಟರ್ ಬಳಕೆದಾರ: ಹಬ್ರ್, ಗುಪ್ತಪದ: habr12345 ಮತ್ತು ಮುಂದಿನ ಬಟನ್ ಕ್ಲಿಕ್ ಮಾಡಿ:
ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಮುಂದಿನ ಪುಟದಲ್ಲಿ ಹೊರಗಿನಿಂದ ನಮ್ಮ ಡೇಟಾಬೇಸ್ ಸರ್ವರ್‌ನ ಪ್ರವೇಶಕ್ಕೆ ಜವಾಬ್ದಾರರಾಗಿರುವ ನಿಯತಾಂಕಗಳಿವೆ (ಸಾರ್ವಜನಿಕ ಪ್ರವೇಶ) ಮತ್ತು ಪೋರ್ಟ್ ಲಭ್ಯತೆ:

ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

VPC ಭದ್ರತಾ ಗುಂಪಿಗೆ ಹೊಸ ಸೆಟ್ಟಿಂಗ್ ಅನ್ನು ರಚಿಸೋಣ, ಇದು ಪೋರ್ಟ್ 5432 (PostgreSQL) ಮೂಲಕ ನಮ್ಮ ಡೇಟಾಬೇಸ್ ಸರ್ವರ್‌ಗೆ ಬಾಹ್ಯ ಪ್ರವೇಶವನ್ನು ಅನುಮತಿಸುತ್ತದೆ.
VPC ಡ್ಯಾಶ್‌ಬೋರ್ಡ್‌ಗೆ ಪ್ರತ್ಯೇಕ ಬ್ರೌಸರ್ ವಿಂಡೋದಲ್ಲಿ AWS ಕನ್ಸೋಲ್‌ಗೆ ಹೋಗೋಣ -> ಭದ್ರತಾ ಗುಂಪುಗಳು -> ಭದ್ರತಾ ಗುಂಪು ವಿಭಾಗವನ್ನು ರಚಿಸಿ:
ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ನಾವು ಭದ್ರತಾ ಗುಂಪಿಗೆ ಹೆಸರನ್ನು ಹೊಂದಿಸಿದ್ದೇವೆ - PostgreSQL, ವಿವರಣೆ, ಈ ಗುಂಪನ್ನು ಯಾವ VPC ಯೊಂದಿಗೆ ಸಂಯೋಜಿಸಬೇಕು ಎಂಬುದನ್ನು ಸೂಚಿಸಿ ಮತ್ತು ರಚಿಸಿ ಬಟನ್ ಕ್ಲಿಕ್ ಮಾಡಿ:
ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಕೆಳಗಿನ ಚಿತ್ರದಲ್ಲಿ ತೋರಿಸಿರುವಂತೆ ಹೊಸದಾಗಿ ರಚಿಸಲಾದ ಗುಂಪಿಗೆ ಪೋರ್ಟ್ 5432 ಗಾಗಿ ಒಳಬರುವ ನಿಯಮಗಳನ್ನು ಭರ್ತಿ ಮಾಡಿ. ನೀವು ಪೋರ್ಟ್ ಅನ್ನು ಹಸ್ತಚಾಲಿತವಾಗಿ ನಿರ್ದಿಷ್ಟಪಡಿಸಲು ಸಾಧ್ಯವಿಲ್ಲ, ಆದರೆ ಟೈಪ್ ಡ್ರಾಪ್-ಡೌನ್ ಪಟ್ಟಿಯಿಂದ PostgreSQL ಅನ್ನು ಆಯ್ಕೆ ಮಾಡಿ.

ಕಟ್ಟುನಿಟ್ಟಾಗಿ ಹೇಳುವುದಾದರೆ, ಮೌಲ್ಯ ::/0 ಎಂದರೆ ಪ್ರಪಂಚದಾದ್ಯಂತದ ಸರ್ವರ್‌ಗೆ ಒಳಬರುವ ದಟ್ಟಣೆಯ ಲಭ್ಯತೆ, ಇದು ಅಂಗೀಕೃತವಾಗಿ ಸಂಪೂರ್ಣವಾಗಿ ನಿಜವಲ್ಲ, ಆದರೆ ಉದಾಹರಣೆಯನ್ನು ವಿಶ್ಲೇಷಿಸಲು, ಈ ವಿಧಾನವನ್ನು ಬಳಸಲು ನಾವೇ ಅನುಮತಿಸೋಣ:
ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ನಾವು ಬ್ರೌಸರ್ ಪುಟಕ್ಕೆ ಹಿಂತಿರುಗುತ್ತೇವೆ, ಅಲ್ಲಿ ನಾವು "ಸುಧಾರಿತ ಸೆಟ್ಟಿಂಗ್‌ಗಳನ್ನು ಕಾನ್ಫಿಗರ್ ಮಾಡಿ" ಅನ್ನು ತೆರೆಯುತ್ತೇವೆ ಮತ್ತು VPC ಭದ್ರತಾ ಗುಂಪುಗಳ ವಿಭಾಗದಲ್ಲಿ ಆಯ್ಕೆಮಾಡಿ -> ಅಸ್ತಿತ್ವದಲ್ಲಿರುವ VPC ಭದ್ರತಾ ಗುಂಪುಗಳನ್ನು ಆರಿಸಿ -> PostgreSQL:
ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಮುಂದೆ, ಡೇಟಾಬೇಸ್ ಆಯ್ಕೆಗಳಲ್ಲಿ -> ಡೇಟಾಬೇಸ್ ಹೆಸರು -> ಹೆಸರನ್ನು ಹೊಂದಿಸಿ - habrDB.

ಡೀಫಾಲ್ಟ್ ಆಗಿ ಬ್ಯಾಕಪ್ (ಬ್ಯಾಕಪ್ ಧಾರಣ ಅವಧಿ - 0 ದಿನಗಳು), ಮೇಲ್ವಿಚಾರಣೆ ಮತ್ತು ಕಾರ್ಯಕ್ಷಮತೆಯ ಒಳನೋಟಗಳನ್ನು ನಿಷ್ಕ್ರಿಯಗೊಳಿಸುವುದನ್ನು ಹೊರತುಪಡಿಸಿ ಉಳಿದ ನಿಯತಾಂಕಗಳನ್ನು ನಾವು ಬಿಡಬಹುದು. ಬಟನ್ ಮೇಲೆ ಕ್ಲಿಕ್ ಮಾಡಿ ಡೇಟಾಬೇಸ್ ರಚಿಸಿ:
ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಥ್ರೆಡ್ ಹ್ಯಾಂಡ್ಲರ್

ಅಂತಿಮ ಹಂತವು ಸ್ಪಾರ್ಕ್ ಕೆಲಸದ ಅಭಿವೃದ್ಧಿಯಾಗಿರುತ್ತದೆ, ಇದು ಕಾಫ್ಕಾದಿಂದ ಪ್ರತಿ ಎರಡು ಸೆಕೆಂಡಿಗೆ ಬರುವ ಹೊಸ ಡೇಟಾವನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುತ್ತದೆ ಮತ್ತು ಫಲಿತಾಂಶವನ್ನು ಡೇಟಾಬೇಸ್‌ಗೆ ನಮೂದಿಸುತ್ತದೆ.

ಮೇಲೆ ತಿಳಿಸಿದಂತೆ, ಚೆಕ್‌ಪಾಯಿಂಟ್‌ಗಳು ಸ್ಪಾರ್ಕ್‌ಸ್ಟ್ರೀಮಿಂಗ್‌ನಲ್ಲಿ ಪ್ರಮುಖ ಕಾರ್ಯವಿಧಾನವಾಗಿದ್ದು, ದೋಷ ಸಹಿಷ್ಣುತೆಯನ್ನು ಖಚಿತಪಡಿಸಿಕೊಳ್ಳಲು ಅದನ್ನು ಕಾನ್ಫಿಗರ್ ಮಾಡಬೇಕು. ನಾವು ಚೆಕ್‌ಪಾಯಿಂಟ್‌ಗಳನ್ನು ಬಳಸುತ್ತೇವೆ ಮತ್ತು ಕಾರ್ಯವಿಧಾನವು ವಿಫಲವಾದಲ್ಲಿ, ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಮಾಡ್ಯೂಲ್ ಕಳೆದ ಚೆಕ್‌ಪಾಯಿಂಟ್‌ಗೆ ಹಿಂತಿರುಗಬೇಕಾಗುತ್ತದೆ ಮತ್ತು ಕಳೆದುಹೋದ ಡೇಟಾವನ್ನು ಮರುಪಡೆಯಲು ಅದರಿಂದ ಲೆಕ್ಕಾಚಾರಗಳನ್ನು ಪುನರಾರಂಭಿಸಬೇಕಾಗುತ್ತದೆ.

ಚೆಕ್ಪಾಯಿಂಟ್ ಮಾಹಿತಿಯನ್ನು ಸಂಗ್ರಹಿಸಲಾಗುವ ದೋಷ-ಸಹಿಷ್ಣು, ವಿಶ್ವಾಸಾರ್ಹ ಫೈಲ್ ಸಿಸ್ಟಮ್ (HDFS, S3, ಇತ್ಯಾದಿ) ನಲ್ಲಿ ಡೈರೆಕ್ಟರಿಯನ್ನು ಹೊಂದಿಸುವ ಮೂಲಕ ಚೆಕ್ಪಾಯಿಂಟಿಂಗ್ ಅನ್ನು ಸಕ್ರಿಯಗೊಳಿಸಬಹುದು. ಇದನ್ನು ಬಳಸಿ ಮಾಡಲಾಗುತ್ತದೆ, ಉದಾಹರಣೆಗೆ:

streamingContext.checkpoint(checkpointDirectory)

ನಮ್ಮ ಉದಾಹರಣೆಯಲ್ಲಿ, ನಾವು ಈ ಕೆಳಗಿನ ವಿಧಾನವನ್ನು ಬಳಸುತ್ತೇವೆ, ಅವುಗಳೆಂದರೆ, ಚೆಕ್‌ಪಾಯಿಂಟ್ ಡೈರೆಕ್ಟರಿ ಅಸ್ತಿತ್ವದಲ್ಲಿದ್ದರೆ, ನಂತರ ಚೆಕ್‌ಪಾಯಿಂಟ್ ಡೇಟಾದಿಂದ ಸಂದರ್ಭವನ್ನು ಮರುಸೃಷ್ಟಿಸಲಾಗುತ್ತದೆ. ಡೈರೆಕ್ಟರಿ ಅಸ್ತಿತ್ವದಲ್ಲಿಲ್ಲದಿದ್ದರೆ (ಅಂದರೆ ಮೊದಲ ಬಾರಿಗೆ ಕಾರ್ಯಗತಗೊಳಿಸಿದರೆ), ನಂತರ ಹೊಸ ಸಂದರ್ಭವನ್ನು ರಚಿಸಲು ಮತ್ತು ಡಿಸ್ಟ್ರೀಮ್‌ಗಳನ್ನು ಕಾನ್ಫಿಗರ್ ಮಾಡಲು ಫಂಕ್ಷನ್‌ಟುಕ್ರಿಯೇಟ್ ಕಾಂಟೆಕ್ಸ್ಟ್ ಅನ್ನು ಕರೆಯಲಾಗುತ್ತದೆ:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

KafkaUtils ಲೈಬ್ರರಿಯ createDirectStream ವಿಧಾನವನ್ನು ಬಳಸಿಕೊಂಡು "ವಹಿವಾಟು" ವಿಷಯಕ್ಕೆ ಸಂಪರ್ಕಿಸಲು ನಾವು ಡೈರೆಕ್ಟ್‌ಸ್ಟ್ರೀಮ್ ವಸ್ತುವನ್ನು ರಚಿಸುತ್ತೇವೆ:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

JSON ಸ್ವರೂಪದಲ್ಲಿ ಒಳಬರುವ ಡೇಟಾವನ್ನು ಪಾರ್ಸಿಂಗ್ ಮಾಡುವುದು:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

ಸ್ಪಾರ್ಕ್ SQL ಅನ್ನು ಬಳಸಿಕೊಂಡು, ನಾವು ಸರಳವಾದ ಗುಂಪನ್ನು ಮಾಡುತ್ತೇವೆ ಮತ್ತು ಫಲಿತಾಂಶವನ್ನು ಕನ್ಸೋಲ್‌ನಲ್ಲಿ ಪ್ರದರ್ಶಿಸುತ್ತೇವೆ:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

ಪ್ರಶ್ನೆ ಪಠ್ಯವನ್ನು ಪಡೆಯುವುದು ಮತ್ತು ಸ್ಪಾರ್ಕ್ SQL ಮೂಲಕ ಚಾಲನೆ ಮಾಡುವುದು:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

ತದನಂತರ ನಾವು ಒಟ್ಟುಗೂಡಿದ ಡೇಟಾವನ್ನು AWS RDS ನಲ್ಲಿ ಟೇಬಲ್‌ಗೆ ಉಳಿಸುತ್ತೇವೆ. ಒಟ್ಟುಗೂಡಿಸುವಿಕೆಯ ಫಲಿತಾಂಶಗಳನ್ನು ಡೇಟಾಬೇಸ್ ಟೇಬಲ್‌ಗೆ ಉಳಿಸಲು, ನಾವು ಡೇಟಾಫ್ರೇಮ್ ಆಬ್ಜೆಕ್ಟ್‌ನ ಬರೆಯುವ ವಿಧಾನವನ್ನು ಬಳಸುತ್ತೇವೆ:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS ಗೆ ಸಂಪರ್ಕವನ್ನು ಹೊಂದಿಸುವ ಕುರಿತು ಕೆಲವು ಪದಗಳು. "AWS PostgreSQL ಅನ್ನು ನಿಯೋಜಿಸಲಾಗುತ್ತಿದೆ" ಹಂತದಲ್ಲಿ ನಾವು ಬಳಕೆದಾರ ಮತ್ತು ಪಾಸ್‌ವರ್ಡ್ ಅನ್ನು ರಚಿಸಿದ್ದೇವೆ. ನೀವು ಎಂಡ್‌ಪಾಯಿಂಟ್ ಅನ್ನು ಡೇಟಾಬೇಸ್ ಸರ್ವರ್ url ಆಗಿ ಬಳಸಬೇಕು, ಇದನ್ನು ಕನೆಕ್ಟಿವಿಟಿ ಮತ್ತು ಸೆಕ್ಯುರಿಟಿ ವಿಭಾಗದಲ್ಲಿ ಪ್ರದರ್ಶಿಸಲಾಗುತ್ತದೆ:

ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಸ್ಪಾರ್ಕ್ ಮತ್ತು ಕಾಫ್ಕಾವನ್ನು ಸರಿಯಾಗಿ ಸಂಪರ್ಕಿಸಲು, ನೀವು ಕಲಾಕೃತಿಯನ್ನು ಬಳಸಿಕೊಂಡು ಸ್ಮಾರ್ಕ್ ಸಲ್ಲಿಸುವ ಮೂಲಕ ಕೆಲಸವನ್ನು ಚಲಾಯಿಸಬೇಕು ಸ್ಪಾರ್ಕ್-ಸ್ಟ್ರೀಮಿಂಗ್-ಕಾಫ್ಕಾ-0-8_2.11. ಹೆಚ್ಚುವರಿಯಾಗಿ, PostgreSQL ಡೇಟಾಬೇಸ್‌ನೊಂದಿಗೆ ಸಂವಹನ ನಡೆಸಲು ನಾವು ಕಲಾಕೃತಿಯನ್ನು ಸಹ ಬಳಸುತ್ತೇವೆ; ನಾವು ಅವುಗಳನ್ನು --packages ಮೂಲಕ ವರ್ಗಾಯಿಸುತ್ತೇವೆ.

ಸ್ಕ್ರಿಪ್ಟ್‌ನ ನಮ್ಯತೆಗಾಗಿ, ನಾವು ಇನ್‌ಪುಟ್ ಪ್ಯಾರಾಮೀಟರ್‌ಗಳಾಗಿ ಸಂದೇಶ ಸರ್ವರ್‌ನ ಹೆಸರು ಮತ್ತು ನಾವು ಡೇಟಾವನ್ನು ಸ್ವೀಕರಿಸಲು ಬಯಸುವ ವಿಷಯವನ್ನು ಸಹ ಸೇರಿಸುತ್ತೇವೆ.

ಆದ್ದರಿಂದ, ಸಿಸ್ಟಮ್ನ ಕಾರ್ಯವನ್ನು ಪ್ರಾರಂಭಿಸಲು ಮತ್ತು ಪರಿಶೀಲಿಸಲು ಇದು ಸಮಯ:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

ಎಲ್ಲವೂ ಕೆಲಸ ಮಾಡಿದೆ! ಕೆಳಗಿನ ಚಿತ್ರದಲ್ಲಿ ನೀವು ನೋಡುವಂತೆ, ಅಪ್ಲಿಕೇಶನ್ ಚಾಲನೆಯಲ್ಲಿರುವಾಗ, ಹೊಸ ಒಟ್ಟುಗೂಡಿಸುವಿಕೆಯ ಫಲಿತಾಂಶಗಳು ಪ್ರತಿ 2 ಸೆಕೆಂಡುಗಳಿಗೆ ಔಟ್‌ಪುಟ್ ಆಗುತ್ತವೆ, ಏಕೆಂದರೆ ನಾವು StreamingContext ಆಬ್ಜೆಕ್ಟ್ ಅನ್ನು ರಚಿಸಿದಾಗ ನಾವು ಬ್ಯಾಚಿಂಗ್ ಮಧ್ಯಂತರವನ್ನು 2 ಸೆಕೆಂಡುಗಳಿಗೆ ಹೊಂದಿಸಿದ್ದೇವೆ:

ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ಮುಂದೆ, ಟೇಬಲ್ನಲ್ಲಿ ದಾಖಲೆಗಳ ಉಪಸ್ಥಿತಿಯನ್ನು ಪರಿಶೀಲಿಸಲು ನಾವು ಡೇಟಾಬೇಸ್ಗೆ ಸರಳವಾದ ಪ್ರಶ್ನೆಯನ್ನು ಮಾಡುತ್ತೇವೆ ವಹಿವಾಟು_ಹರಿವು:

ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ

ತೀರ್ಮಾನಕ್ಕೆ

ಈ ಲೇಖನವು Apache Kafka ಮತ್ತು PostgreSQL ಜೊತೆಗೆ ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಅನ್ನು ಬಳಸಿಕೊಂಡು ಮಾಹಿತಿಯ ಸ್ಟ್ರೀಮ್ ಪ್ರಕ್ರಿಯೆಯ ಉದಾಹರಣೆಯನ್ನು ನೋಡಿದೆ. ವಿವಿಧ ಮೂಲಗಳಿಂದ ಡೇಟಾದ ಬೆಳವಣಿಗೆಯೊಂದಿಗೆ, ಸ್ಟ್ರೀಮಿಂಗ್ ಮತ್ತು ನೈಜ-ಸಮಯದ ಅಪ್ಲಿಕೇಶನ್‌ಗಳನ್ನು ರಚಿಸಲು ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್‌ನ ಪ್ರಾಯೋಗಿಕ ಮೌಲ್ಯವನ್ನು ಅತಿಯಾಗಿ ಅಂದಾಜು ಮಾಡುವುದು ಕಷ್ಟ.

ನನ್ನ ರೆಪೊಸಿಟರಿಯಲ್ಲಿ ನೀವು ಪೂರ್ಣ ಮೂಲ ಕೋಡ್ ಅನ್ನು ಕಾಣಬಹುದು GitHub.

ಈ ಲೇಖನವನ್ನು ಚರ್ಚಿಸಲು ನನಗೆ ಸಂತೋಷವಾಗಿದೆ, ನಿಮ್ಮ ಕಾಮೆಂಟ್‌ಗಳಿಗಾಗಿ ನಾನು ಎದುರು ನೋಡುತ್ತಿದ್ದೇನೆ ಮತ್ತು ಎಲ್ಲಾ ಕಾಳಜಿಯುಳ್ಳ ಓದುಗರಿಂದ ರಚನಾತ್ಮಕ ಟೀಕೆಗಳನ್ನು ಸಹ ನಾನು ಭಾವಿಸುತ್ತೇನೆ.

ನಾನು ನಿಮಗೆ ಯಶಸ್ಸನ್ನು ಬಯಸುತ್ತೇನೆ!

ಪಿ.ಎಸ್. ಆರಂಭದಲ್ಲಿ ಸ್ಥಳೀಯ PostgreSQL ಡೇಟಾಬೇಸ್ ಅನ್ನು ಬಳಸಲು ಯೋಜಿಸಲಾಗಿತ್ತು, ಆದರೆ AWS ಗಾಗಿ ನನ್ನ ಪ್ರೀತಿಯನ್ನು ಗಮನಿಸಿದರೆ, ನಾನು ಡೇಟಾಬೇಸ್ ಅನ್ನು ಕ್ಲೌಡ್‌ಗೆ ಸರಿಸಲು ನಿರ್ಧರಿಸಿದೆ. ಈ ವಿಷಯದ ಕುರಿತು ಮುಂದಿನ ಲೇಖನದಲ್ಲಿ, AWS ಕಿನೆಸಿಸ್ ಮತ್ತು AWS EMR ಅನ್ನು ಬಳಸಿಕೊಂಡು AWS ನಲ್ಲಿ ಮೇಲೆ ವಿವರಿಸಿದ ಸಂಪೂರ್ಣ ವ್ಯವಸ್ಥೆಯನ್ನು ಹೇಗೆ ಕಾರ್ಯಗತಗೊಳಿಸಬೇಕೆಂದು ನಾನು ತೋರಿಸುತ್ತೇನೆ. ಸುದ್ದಿಯನ್ನು ಅನುಸರಿಸಿ!

ಮೂಲ: www.habr.com

ಕಾಮೆಂಟ್ ಅನ್ನು ಸೇರಿಸಿ