ಪ್ರೊಹೋಸ್ಟರ್ > Блог > ಆಡಳಿತ > ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ
ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ನೊಂದಿಗೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಸಂಸ್ಕರಣೆ
ಹಲೋ, ಹಬ್ರ್! ಇಂದು ನಾವು ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಅನ್ನು ಬಳಸಿಕೊಂಡು ಅಪಾಚೆ ಕಾಫ್ಕಾ ಸಂದೇಶ ಸ್ಟ್ರೀಮ್ಗಳನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವ ವ್ಯವಸ್ಥೆಯನ್ನು ನಿರ್ಮಿಸುತ್ತೇವೆ ಮತ್ತು ಪ್ರಕ್ರಿಯೆ ಫಲಿತಾಂಶಗಳನ್ನು AWS RDS ಕ್ಲೌಡ್ ಡೇಟಾಬೇಸ್ಗೆ ಬರೆಯುತ್ತೇವೆ.
ಒಂದು ನಿರ್ದಿಷ್ಟ ಕ್ರೆಡಿಟ್ ಸಂಸ್ಥೆಯು ಅದರ ಎಲ್ಲಾ ಶಾಖೆಗಳಲ್ಲಿ ಒಳಬರುವ ವಹಿವಾಟುಗಳನ್ನು "ಫ್ಲೈನಲ್ಲಿ" ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವ ಕಾರ್ಯವನ್ನು ನಮಗೆ ಹೊಂದಿಸುತ್ತದೆ ಎಂದು ಊಹಿಸೋಣ. ಖಜಾನೆ, ಮಿತಿಗಳು ಅಥವಾ ವಹಿವಾಟುಗಳಿಗೆ ಹಣಕಾಸಿನ ಫಲಿತಾಂಶಗಳು ಇತ್ಯಾದಿಗಳಿಗೆ ಮುಕ್ತ ಕರೆನ್ಸಿ ಸ್ಥಾನವನ್ನು ತ್ವರಿತವಾಗಿ ಲೆಕ್ಕಾಚಾರ ಮಾಡುವ ಉದ್ದೇಶಕ್ಕಾಗಿ ಇದನ್ನು ಮಾಡಬಹುದು.
ಮ್ಯಾಜಿಕ್ ಮತ್ತು ಮ್ಯಾಜಿಕ್ ಮಂತ್ರಗಳ ಬಳಕೆಯಿಲ್ಲದೆ ಈ ಪ್ರಕರಣವನ್ನು ಹೇಗೆ ಕಾರ್ಯಗತಗೊಳಿಸುವುದು - ಕಟ್ ಅಡಿಯಲ್ಲಿ ಓದಿ! ಹೋಗು!
ಸಹಜವಾಗಿ, ನೈಜ ಸಮಯದಲ್ಲಿ ಹೆಚ್ಚಿನ ಪ್ರಮಾಣದ ಡೇಟಾವನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವುದು ಆಧುನಿಕ ವ್ಯವಸ್ಥೆಗಳಲ್ಲಿ ಬಳಕೆಗೆ ಸಾಕಷ್ಟು ಅವಕಾಶಗಳನ್ನು ಒದಗಿಸುತ್ತದೆ. ಇದಕ್ಕಾಗಿ ಅತ್ಯಂತ ಜನಪ್ರಿಯ ಸಂಯೋಜನೆಗಳೆಂದರೆ ಅಪಾಚೆ ಕಾಫ್ಕಾ ಮತ್ತು ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ನ ಸಂಯೋಜನೆಯಾಗಿದೆ, ಅಲ್ಲಿ ಕಾಫ್ಕಾ ಒಳಬರುವ ಸಂದೇಶ ಪ್ಯಾಕೆಟ್ಗಳ ಸ್ಟ್ರೀಮ್ ಅನ್ನು ರಚಿಸುತ್ತದೆ ಮತ್ತು ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಈ ಪ್ಯಾಕೆಟ್ಗಳನ್ನು ನಿರ್ದಿಷ್ಟ ಸಮಯದ ಮಧ್ಯಂತರದಲ್ಲಿ ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುತ್ತದೆ.
ಅಪ್ಲಿಕೇಶನ್ನ ದೋಷ ಸಹಿಷ್ಣುತೆಯನ್ನು ಹೆಚ್ಚಿಸಲು, ನಾವು ಚೆಕ್ಪಾಯಿಂಟ್ಗಳನ್ನು ಬಳಸುತ್ತೇವೆ. ಈ ಕಾರ್ಯವಿಧಾನದೊಂದಿಗೆ, ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಎಂಜಿನ್ ಕಳೆದುಹೋದ ಡೇಟಾವನ್ನು ಮರುಪಡೆಯಲು ಅಗತ್ಯವಿರುವಾಗ, ಅದು ಕೊನೆಯ ಚೆಕ್ಪಾಯಿಂಟ್ಗೆ ಹಿಂತಿರುಗಿ ಮತ್ತು ಅಲ್ಲಿಂದ ಲೆಕ್ಕಾಚಾರಗಳನ್ನು ಪುನರಾರಂಭಿಸಬೇಕಾಗುತ್ತದೆ.
ಅಭಿವೃದ್ಧಿ ಹೊಂದಿದ ವ್ಯವಸ್ಥೆಯ ಆರ್ಕಿಟೆಕ್ಚರ್
ಬಳಸಿದ ಘಟಕಗಳು:
ಅಪಾಚೆ ಕಾಫ್ಕಾ ವಿತರಿಸಿದ ಪ್ರಕಟಣೆ-ಚಂದಾದಾರ ಸಂದೇಶ ವ್ಯವಸ್ಥೆಯಾಗಿದೆ. ಆಫ್ಲೈನ್ ಮತ್ತು ಆನ್ಲೈನ್ ಸಂದೇಶ ಬಳಕೆ ಎರಡಕ್ಕೂ ಸೂಕ್ತವಾಗಿದೆ. ಡೇಟಾ ನಷ್ಟವನ್ನು ತಡೆಗಟ್ಟಲು, ಕಾಫ್ಕಾ ಸಂದೇಶಗಳನ್ನು ಡಿಸ್ಕ್ನಲ್ಲಿ ಸಂಗ್ರಹಿಸಲಾಗುತ್ತದೆ ಮತ್ತು ಕ್ಲಸ್ಟರ್ನಲ್ಲಿ ಪುನರಾವರ್ತಿಸಲಾಗುತ್ತದೆ. ಕಾಫ್ಕಾ ವ್ಯವಸ್ಥೆಯನ್ನು ZooKeeper ಸಿಂಕ್ರೊನೈಸೇಶನ್ ಸೇವೆಯ ಮೇಲೆ ನಿರ್ಮಿಸಲಾಗಿದೆ;
ಅಪಾಚೆ ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ - ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾವನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸಲು ಸ್ಪಾರ್ಕ್ ಘಟಕ. ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಮಾಡ್ಯೂಲ್ ಅನ್ನು ಮೈಕ್ರೋ-ಬ್ಯಾಚ್ ಆರ್ಕಿಟೆಕ್ಚರ್ ಬಳಸಿ ನಿರ್ಮಿಸಲಾಗಿದೆ, ಅಲ್ಲಿ ಡೇಟಾ ಸ್ಟ್ರೀಮ್ ಅನ್ನು ಸಣ್ಣ ಡೇಟಾ ಪ್ಯಾಕೆಟ್ಗಳ ನಿರಂತರ ಅನುಕ್ರಮವಾಗಿ ಅರ್ಥೈಸಲಾಗುತ್ತದೆ. ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ವಿವಿಧ ಮೂಲಗಳಿಂದ ಡೇಟಾವನ್ನು ತೆಗೆದುಕೊಳ್ಳುತ್ತದೆ ಮತ್ತು ಅದನ್ನು ಸಣ್ಣ ಪ್ಯಾಕೇಜ್ಗಳಾಗಿ ಸಂಯೋಜಿಸುತ್ತದೆ. ನಿಯಮಿತ ಮಧ್ಯಂತರದಲ್ಲಿ ಹೊಸ ಪ್ಯಾಕೇಜ್ಗಳನ್ನು ರಚಿಸಲಾಗುತ್ತದೆ. ಪ್ರತಿ ಸಮಯದ ಮಧ್ಯಂತರದ ಆರಂಭದಲ್ಲಿ, ಹೊಸ ಪ್ಯಾಕೆಟ್ ಅನ್ನು ರಚಿಸಲಾಗುತ್ತದೆ ಮತ್ತು ಆ ಮಧ್ಯಂತರದಲ್ಲಿ ಸ್ವೀಕರಿಸಿದ ಯಾವುದೇ ಡೇಟಾವನ್ನು ಪ್ಯಾಕೆಟ್ನಲ್ಲಿ ಸೇರಿಸಲಾಗುತ್ತದೆ. ಮಧ್ಯಂತರದ ಕೊನೆಯಲ್ಲಿ, ಪ್ಯಾಕೆಟ್ ಬೆಳವಣಿಗೆ ನಿಲ್ಲುತ್ತದೆ. ಮಧ್ಯಂತರದ ಗಾತ್ರವನ್ನು ಬ್ಯಾಚ್ ಮಧ್ಯಂತರ ಎಂಬ ನಿಯತಾಂಕದಿಂದ ನಿರ್ಧರಿಸಲಾಗುತ್ತದೆ;
ಅಪಾಚೆ ಸ್ಪಾರ್ಕ್ SQL - ಸ್ಪಾರ್ಕ್ ಕ್ರಿಯಾತ್ಮಕ ಪ್ರೋಗ್ರಾಮಿಂಗ್ನೊಂದಿಗೆ ಸಂಬಂಧಿತ ಸಂಸ್ಕರಣೆಯನ್ನು ಸಂಯೋಜಿಸುತ್ತದೆ. ರಚನಾತ್ಮಕ ಡೇಟಾ ಎಂದರೆ ಸ್ಕೀಮಾವನ್ನು ಹೊಂದಿರುವ ಡೇಟಾ, ಅಂದರೆ, ಎಲ್ಲಾ ದಾಖಲೆಗಳಿಗಾಗಿ ಕ್ಷೇತ್ರಗಳ ಒಂದು ಸೆಟ್. Spark SQL ವಿವಿಧ ರಚನಾತ್ಮಕ ಡೇಟಾ ಮೂಲಗಳಿಂದ ಇನ್ಪುಟ್ ಅನ್ನು ಬೆಂಬಲಿಸುತ್ತದೆ ಮತ್ತು ಸ್ಕೀಮಾ ಮಾಹಿತಿಯ ಲಭ್ಯತೆಗೆ ಧನ್ಯವಾದಗಳು, ಇದು ದಾಖಲೆಗಳ ಅಗತ್ಯವಿರುವ ಕ್ಷೇತ್ರಗಳನ್ನು ಮಾತ್ರ ಸಮರ್ಥವಾಗಿ ಹಿಂಪಡೆಯಬಹುದು ಮತ್ತು DataFrame API ಗಳನ್ನು ಸಹ ಒದಗಿಸುತ್ತದೆ;
AWS RDS ಇದು ತುಲನಾತ್ಮಕವಾಗಿ ಅಗ್ಗದ ಕ್ಲೌಡ್-ಆಧಾರಿತ ಸಂಬಂಧಿತ ಡೇಟಾಬೇಸ್ ಆಗಿದೆ, ಇದು ಸೆಟಪ್, ಕಾರ್ಯಾಚರಣೆ ಮತ್ತು ಸ್ಕೇಲಿಂಗ್ ಅನ್ನು ಸರಳಗೊಳಿಸುವ ವೆಬ್ ಸೇವೆಯಾಗಿದೆ ಮತ್ತು ನೇರವಾಗಿ Amazon ನಿಂದ ನಿರ್ವಹಿಸಲ್ಪಡುತ್ತದೆ.
ಕಾಫ್ಕಾ ಸರ್ವರ್ ಅನ್ನು ಸ್ಥಾಪಿಸುವುದು ಮತ್ತು ಚಾಲನೆ ಮಾಡುವುದು
ಕಾಫ್ಕಾವನ್ನು ನೇರವಾಗಿ ಬಳಸುವ ಮೊದಲು, ನೀವು ಜಾವಾವನ್ನು ಹೊಂದಿರುವಿರಾ ಎಂದು ಖಚಿತಪಡಿಸಿಕೊಳ್ಳಬೇಕು, ಏಕೆಂದರೆ... JVM ಅನ್ನು ಕೆಲಸಕ್ಕಾಗಿ ಬಳಸಲಾಗುತ್ತದೆ:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
ಮುಂದಿನ ಹಂತವು ಐಚ್ಛಿಕವಾಗಿರುತ್ತದೆ. ವಾಸ್ತವವೆಂದರೆ ಡೀಫಾಲ್ಟ್ ಸೆಟ್ಟಿಂಗ್ಗಳು ಅಪಾಚೆ ಕಾಫ್ಕಾದ ಎಲ್ಲಾ ಸಾಮರ್ಥ್ಯಗಳನ್ನು ಸಂಪೂರ್ಣವಾಗಿ ಬಳಸಲು ನಿಮಗೆ ಅನುಮತಿಸುವುದಿಲ್ಲ. ಉದಾಹರಣೆಗೆ, ಸಂದೇಶಗಳನ್ನು ಪ್ರಕಟಿಸಬಹುದಾದ ವಿಷಯ, ವರ್ಗ, ಗುಂಪನ್ನು ಅಳಿಸಿ. ಇದನ್ನು ಬದಲಾಯಿಸಲು, ಕಾನ್ಫಿಗರೇಶನ್ ಫೈಲ್ ಅನ್ನು ಸಂಪಾದಿಸೋಣ:
vim ~/kafka/config/server.properties
ಫೈಲ್ನ ಕೊನೆಯಲ್ಲಿ ಈ ಕೆಳಗಿನವುಗಳನ್ನು ಸೇರಿಸಿ:
delete.topic.enable = true
ಕಾಫ್ಕಾ ಸರ್ವರ್ ಅನ್ನು ಪ್ರಾರಂಭಿಸುವ ಮೊದಲು, ನೀವು ZooKeeper ಸರ್ವರ್ ಅನ್ನು ಪ್ರಾರಂಭಿಸಬೇಕು; ನಾವು ಕಾಫ್ಕಾ ವಿತರಣೆಯೊಂದಿಗೆ ಬರುವ ಸಹಾಯಕ ಸ್ಕ್ರಿಪ್ಟ್ ಅನ್ನು ಬಳಸುತ್ತೇವೆ:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper ಯಶಸ್ವಿಯಾಗಿ ಪ್ರಾರಂಭಿಸಿದ ನಂತರ, ಪ್ರತ್ಯೇಕ ಟರ್ಮಿನಲ್ನಲ್ಲಿ ಕಾಫ್ಕಾ ಸರ್ವರ್ ಅನ್ನು ಪ್ರಾರಂಭಿಸಿ:
ಹೊಸದಾಗಿ ರಚಿಸಲಾದ ವಿಷಯಕ್ಕಾಗಿ ನಿರ್ಮಾಪಕ ಮತ್ತು ಗ್ರಾಹಕರನ್ನು ಪರೀಕ್ಷಿಸುವ ಕ್ಷಣಗಳನ್ನು ಕಳೆದುಕೊಳ್ಳೋಣ. ಸಂದೇಶಗಳನ್ನು ಕಳುಹಿಸುವುದು ಮತ್ತು ಸ್ವೀಕರಿಸುವುದನ್ನು ನೀವು ಹೇಗೆ ಪರೀಕ್ಷಿಸಬಹುದು ಎಂಬುದರ ಕುರಿತು ಹೆಚ್ಚಿನ ವಿವರಗಳನ್ನು ಅಧಿಕೃತ ದಾಖಲೆಯಲ್ಲಿ ಬರೆಯಲಾಗಿದೆ - ಕೆಲವು ಸಂದೇಶಗಳನ್ನು ಕಳುಹಿಸಿ. ಸರಿ, ನಾವು ಕಾಫ್ಕಾಪ್ರೊಡ್ಯೂಸರ್ API ಅನ್ನು ಬಳಸಿಕೊಂಡು ಪೈಥಾನ್ನಲ್ಲಿ ನಿರ್ಮಾಪಕರನ್ನು ಬರೆಯಲು ಮುಂದುವರಿಯುತ್ತೇವೆ.
ನಿರ್ಮಾಪಕ ಬರವಣಿಗೆ
ನಿರ್ಮಾಪಕರು ಯಾದೃಚ್ಛಿಕ ಡೇಟಾವನ್ನು ರಚಿಸುತ್ತಾರೆ - ಪ್ರತಿ ಸೆಕೆಂಡಿಗೆ 100 ಸಂದೇಶಗಳು. ಯಾದೃಚ್ಛಿಕ ಡೇಟಾದಿಂದ ನಾವು ಮೂರು ಕ್ಷೇತ್ರಗಳನ್ನು ಒಳಗೊಂಡಿರುವ ನಿಘಂಟನ್ನು ಅರ್ಥೈಸುತ್ತೇವೆ:
ಶಾಖೆ - ಕ್ರೆಡಿಟ್ ಸಂಸ್ಥೆಯ ಮಾರಾಟದ ಬಿಂದುವಿನ ಹೆಸರು;
ಕರೆನ್ಸಿ - ವಹಿವಾಟು ಕರೆನ್ಸಿ;
ಪ್ರಮಾಣ - ವಹಿವಾಟು ಮೊತ್ತ. ಬ್ಯಾಂಕಿನಿಂದ ಕರೆನ್ಸಿಯ ಖರೀದಿಯಾಗಿದ್ದರೆ ಮೊತ್ತವು ಧನಾತ್ಮಕ ಸಂಖ್ಯೆಯಾಗಿರುತ್ತದೆ ಮತ್ತು ಅದು ಮಾರಾಟವಾಗಿದ್ದರೆ ಋಣಾತ್ಮಕ ಸಂಖ್ಯೆಯಾಗಿದೆ.
ಮುಂದೆ, ಕಳುಹಿಸುವ ವಿಧಾನವನ್ನು ಬಳಸಿಕೊಂಡು, ನಾವು ಸರ್ವರ್ಗೆ ಸಂದೇಶವನ್ನು ಕಳುಹಿಸುತ್ತೇವೆ, ನಮಗೆ ಅಗತ್ಯವಿರುವ ವಿಷಯಕ್ಕೆ, JSON ಸ್ವರೂಪದಲ್ಲಿ:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
ಸ್ಕ್ರಿಪ್ಟ್ ಅನ್ನು ಚಾಲನೆ ಮಾಡುವಾಗ, ನಾವು ಟರ್ಮಿನಲ್ನಲ್ಲಿ ಈ ಕೆಳಗಿನ ಸಂದೇಶಗಳನ್ನು ಸ್ವೀಕರಿಸುತ್ತೇವೆ:
ಇದರರ್ಥ ಎಲ್ಲವೂ ನಾವು ಬಯಸಿದಂತೆ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ - ನಿರ್ಮಾಪಕರು ನಮಗೆ ಅಗತ್ಯವಿರುವ ವಿಷಯಕ್ಕೆ ಸಂದೇಶಗಳನ್ನು ಉತ್ಪಾದಿಸುತ್ತಾರೆ ಮತ್ತು ಕಳುಹಿಸುತ್ತಾರೆ.
ಮುಂದಿನ ಹಂತವೆಂದರೆ ಸ್ಪಾರ್ಕ್ ಅನ್ನು ಸ್ಥಾಪಿಸುವುದು ಮತ್ತು ಈ ಸಂದೇಶ ಸ್ಟ್ರೀಮ್ ಅನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವುದು.
ಅಪಾಚೆ ಸ್ಪಾರ್ಕ್ ಅನ್ನು ಸ್ಥಾಪಿಸಲಾಗುತ್ತಿದೆ
ಅಪಾಚೆ ಸ್ಪಾರ್ಕ್ ಸಾರ್ವತ್ರಿಕ ಮತ್ತು ಉನ್ನತ-ಕಾರ್ಯಕ್ಷಮತೆಯ ಕ್ಲಸ್ಟರ್ ಕಂಪ್ಯೂಟಿಂಗ್ ವೇದಿಕೆಯಾಗಿದೆ.
ಸ್ಪಾರ್ಕ್ ಮ್ಯಾಪ್ರೆಡ್ಯೂಸ್ ಮಾದರಿಯ ಜನಪ್ರಿಯ ಅಳವಡಿಕೆಗಳಿಗಿಂತ ಉತ್ತಮವಾಗಿ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ ಮತ್ತು ಸಂವಾದಾತ್ಮಕ ಪ್ರಶ್ನೆಗಳು ಮತ್ತು ಸ್ಟ್ರೀಮ್ ಪ್ರಕ್ರಿಯೆ ಸೇರಿದಂತೆ ವ್ಯಾಪಕ ಶ್ರೇಣಿಯ ಕಂಪ್ಯೂಟೇಶನ್ ಪ್ರಕಾರಗಳನ್ನು ಬೆಂಬಲಿಸುತ್ತದೆ. ಹೆಚ್ಚಿನ ಪ್ರಮಾಣದ ಡೇಟಾವನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವಾಗ ವೇಗವು ಪ್ರಮುಖ ಪಾತ್ರವನ್ನು ವಹಿಸುತ್ತದೆ, ಏಕೆಂದರೆ ಇದು ನಿಮಿಷಗಳು ಅಥವಾ ಗಂಟೆಗಳ ಕಾಲ ಕಾಯದೆ ಸಂವಾದಾತ್ಮಕವಾಗಿ ಕೆಲಸ ಮಾಡಲು ನಿಮಗೆ ಅನುವು ಮಾಡಿಕೊಡುತ್ತದೆ. ಸ್ಪಾರ್ಕ್ನ ದೊಡ್ಡ ಸಾಮರ್ಥ್ಯವೆಂದರೆ ಅದನ್ನು ವೇಗವಾಗಿ ಮಾಡುವ ಸಾಮರ್ಥ್ಯವೆಂದರೆ ಇನ್-ಮೆಮೊರಿ ಲೆಕ್ಕಾಚಾರಗಳನ್ನು ನಿರ್ವಹಿಸುವ ಸಾಮರ್ಥ್ಯ.
ಈ ಚೌಕಟ್ಟನ್ನು ಸ್ಕಾಲಾದಲ್ಲಿ ಬರೆಯಲಾಗಿದೆ, ಆದ್ದರಿಂದ ನೀವು ಅದನ್ನು ಮೊದಲು ಸ್ಥಾಪಿಸಬೇಕಾಗಿದೆ:
sudo apt-get install scala
ಅಧಿಕೃತ ವೆಬ್ಸೈಟ್ನಿಂದ ಸ್ಪಾರ್ಕ್ ವಿತರಣೆಯನ್ನು ಡೌನ್ಲೋಡ್ ಮಾಡಿ:
ಏಕೆಂದರೆ ಈ ಉದಾಹರಣೆಯು ಶೈಕ್ಷಣಿಕ ಉದ್ದೇಶಗಳಿಗಾಗಿ ಮಾತ್ರ; ನಾವು ಉಚಿತ ಸರ್ವರ್ ಅನ್ನು "ಕನಿಷ್ಠ" (ಉಚಿತ ಶ್ರೇಣಿ) ಬಳಸುತ್ತೇವೆ:
ಮುಂದೆ, ನಾವು ಫ್ರೀ ಟೈರ್ ಬ್ಲಾಕ್ನಲ್ಲಿ ಟಿಕ್ ಅನ್ನು ಹಾಕುತ್ತೇವೆ ಮತ್ತು ಅದರ ನಂತರ ನಮಗೆ ಸ್ವಯಂಚಾಲಿತವಾಗಿ t2.micro ವರ್ಗದ ಉದಾಹರಣೆಯನ್ನು ನೀಡಲಾಗುವುದು - ದುರ್ಬಲವಾಗಿದ್ದರೂ, ಇದು ಉಚಿತ ಮತ್ತು ನಮ್ಮ ಕಾರ್ಯಕ್ಕೆ ಸಾಕಷ್ಟು ಸೂಕ್ತವಾಗಿದೆ:
ಮುಂದೆ ಬಹಳ ಮುಖ್ಯವಾದ ವಿಷಯಗಳು ಬರುತ್ತವೆ: ಡೇಟಾಬೇಸ್ ನಿದರ್ಶನದ ಹೆಸರು, ಮಾಸ್ಟರ್ ಬಳಕೆದಾರರ ಹೆಸರು ಮತ್ತು ಅವರ ಪಾಸ್ವರ್ಡ್. ನಿದರ್ಶನವನ್ನು ಹೆಸರಿಸೋಣ: myHabrTest, ಮಾಸ್ಟರ್ ಬಳಕೆದಾರ: ಹಬ್ರ್, ಗುಪ್ತಪದ: habr12345 ಮತ್ತು ಮುಂದಿನ ಬಟನ್ ಕ್ಲಿಕ್ ಮಾಡಿ:
ಮುಂದಿನ ಪುಟದಲ್ಲಿ ಹೊರಗಿನಿಂದ ನಮ್ಮ ಡೇಟಾಬೇಸ್ ಸರ್ವರ್ನ ಪ್ರವೇಶಕ್ಕೆ ಜವಾಬ್ದಾರರಾಗಿರುವ ನಿಯತಾಂಕಗಳಿವೆ (ಸಾರ್ವಜನಿಕ ಪ್ರವೇಶ) ಮತ್ತು ಪೋರ್ಟ್ ಲಭ್ಯತೆ:
VPC ಭದ್ರತಾ ಗುಂಪಿಗೆ ಹೊಸ ಸೆಟ್ಟಿಂಗ್ ಅನ್ನು ರಚಿಸೋಣ, ಇದು ಪೋರ್ಟ್ 5432 (PostgreSQL) ಮೂಲಕ ನಮ್ಮ ಡೇಟಾಬೇಸ್ ಸರ್ವರ್ಗೆ ಬಾಹ್ಯ ಪ್ರವೇಶವನ್ನು ಅನುಮತಿಸುತ್ತದೆ.
VPC ಡ್ಯಾಶ್ಬೋರ್ಡ್ಗೆ ಪ್ರತ್ಯೇಕ ಬ್ರೌಸರ್ ವಿಂಡೋದಲ್ಲಿ AWS ಕನ್ಸೋಲ್ಗೆ ಹೋಗೋಣ -> ಭದ್ರತಾ ಗುಂಪುಗಳು -> ಭದ್ರತಾ ಗುಂಪು ವಿಭಾಗವನ್ನು ರಚಿಸಿ:
ನಾವು ಭದ್ರತಾ ಗುಂಪಿಗೆ ಹೆಸರನ್ನು ಹೊಂದಿಸಿದ್ದೇವೆ - PostgreSQL, ವಿವರಣೆ, ಈ ಗುಂಪನ್ನು ಯಾವ VPC ಯೊಂದಿಗೆ ಸಂಯೋಜಿಸಬೇಕು ಎಂಬುದನ್ನು ಸೂಚಿಸಿ ಮತ್ತು ರಚಿಸಿ ಬಟನ್ ಕ್ಲಿಕ್ ಮಾಡಿ:
ಕೆಳಗಿನ ಚಿತ್ರದಲ್ಲಿ ತೋರಿಸಿರುವಂತೆ ಹೊಸದಾಗಿ ರಚಿಸಲಾದ ಗುಂಪಿಗೆ ಪೋರ್ಟ್ 5432 ಗಾಗಿ ಒಳಬರುವ ನಿಯಮಗಳನ್ನು ಭರ್ತಿ ಮಾಡಿ. ನೀವು ಪೋರ್ಟ್ ಅನ್ನು ಹಸ್ತಚಾಲಿತವಾಗಿ ನಿರ್ದಿಷ್ಟಪಡಿಸಲು ಸಾಧ್ಯವಿಲ್ಲ, ಆದರೆ ಟೈಪ್ ಡ್ರಾಪ್-ಡೌನ್ ಪಟ್ಟಿಯಿಂದ PostgreSQL ಅನ್ನು ಆಯ್ಕೆ ಮಾಡಿ.
ಕಟ್ಟುನಿಟ್ಟಾಗಿ ಹೇಳುವುದಾದರೆ, ಮೌಲ್ಯ ::/0 ಎಂದರೆ ಪ್ರಪಂಚದಾದ್ಯಂತದ ಸರ್ವರ್ಗೆ ಒಳಬರುವ ದಟ್ಟಣೆಯ ಲಭ್ಯತೆ, ಇದು ಅಂಗೀಕೃತವಾಗಿ ಸಂಪೂರ್ಣವಾಗಿ ನಿಜವಲ್ಲ, ಆದರೆ ಉದಾಹರಣೆಯನ್ನು ವಿಶ್ಲೇಷಿಸಲು, ಈ ವಿಧಾನವನ್ನು ಬಳಸಲು ನಾವೇ ಅನುಮತಿಸೋಣ:
ನಾವು ಬ್ರೌಸರ್ ಪುಟಕ್ಕೆ ಹಿಂತಿರುಗುತ್ತೇವೆ, ಅಲ್ಲಿ ನಾವು "ಸುಧಾರಿತ ಸೆಟ್ಟಿಂಗ್ಗಳನ್ನು ಕಾನ್ಫಿಗರ್ ಮಾಡಿ" ಅನ್ನು ತೆರೆಯುತ್ತೇವೆ ಮತ್ತು VPC ಭದ್ರತಾ ಗುಂಪುಗಳ ವಿಭಾಗದಲ್ಲಿ ಆಯ್ಕೆಮಾಡಿ -> ಅಸ್ತಿತ್ವದಲ್ಲಿರುವ VPC ಭದ್ರತಾ ಗುಂಪುಗಳನ್ನು ಆರಿಸಿ -> PostgreSQL:
ಮುಂದೆ, ಡೇಟಾಬೇಸ್ ಆಯ್ಕೆಗಳಲ್ಲಿ -> ಡೇಟಾಬೇಸ್ ಹೆಸರು -> ಹೆಸರನ್ನು ಹೊಂದಿಸಿ - habrDB.
ಡೀಫಾಲ್ಟ್ ಆಗಿ ಬ್ಯಾಕಪ್ (ಬ್ಯಾಕಪ್ ಧಾರಣ ಅವಧಿ - 0 ದಿನಗಳು), ಮೇಲ್ವಿಚಾರಣೆ ಮತ್ತು ಕಾರ್ಯಕ್ಷಮತೆಯ ಒಳನೋಟಗಳನ್ನು ನಿಷ್ಕ್ರಿಯಗೊಳಿಸುವುದನ್ನು ಹೊರತುಪಡಿಸಿ ಉಳಿದ ನಿಯತಾಂಕಗಳನ್ನು ನಾವು ಬಿಡಬಹುದು. ಬಟನ್ ಮೇಲೆ ಕ್ಲಿಕ್ ಮಾಡಿ ಡೇಟಾಬೇಸ್ ರಚಿಸಿ:
ಥ್ರೆಡ್ ಹ್ಯಾಂಡ್ಲರ್
ಅಂತಿಮ ಹಂತವು ಸ್ಪಾರ್ಕ್ ಕೆಲಸದ ಅಭಿವೃದ್ಧಿಯಾಗಿರುತ್ತದೆ, ಇದು ಕಾಫ್ಕಾದಿಂದ ಪ್ರತಿ ಎರಡು ಸೆಕೆಂಡಿಗೆ ಬರುವ ಹೊಸ ಡೇಟಾವನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುತ್ತದೆ ಮತ್ತು ಫಲಿತಾಂಶವನ್ನು ಡೇಟಾಬೇಸ್ಗೆ ನಮೂದಿಸುತ್ತದೆ.
ಮೇಲೆ ತಿಳಿಸಿದಂತೆ, ಚೆಕ್ಪಾಯಿಂಟ್ಗಳು ಸ್ಪಾರ್ಕ್ಸ್ಟ್ರೀಮಿಂಗ್ನಲ್ಲಿ ಪ್ರಮುಖ ಕಾರ್ಯವಿಧಾನವಾಗಿದ್ದು, ದೋಷ ಸಹಿಷ್ಣುತೆಯನ್ನು ಖಚಿತಪಡಿಸಿಕೊಳ್ಳಲು ಅದನ್ನು ಕಾನ್ಫಿಗರ್ ಮಾಡಬೇಕು. ನಾವು ಚೆಕ್ಪಾಯಿಂಟ್ಗಳನ್ನು ಬಳಸುತ್ತೇವೆ ಮತ್ತು ಕಾರ್ಯವಿಧಾನವು ವಿಫಲವಾದಲ್ಲಿ, ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಮಾಡ್ಯೂಲ್ ಕಳೆದ ಚೆಕ್ಪಾಯಿಂಟ್ಗೆ ಹಿಂತಿರುಗಬೇಕಾಗುತ್ತದೆ ಮತ್ತು ಕಳೆದುಹೋದ ಡೇಟಾವನ್ನು ಮರುಪಡೆಯಲು ಅದರಿಂದ ಲೆಕ್ಕಾಚಾರಗಳನ್ನು ಪುನರಾರಂಭಿಸಬೇಕಾಗುತ್ತದೆ.
ಚೆಕ್ಪಾಯಿಂಟ್ ಮಾಹಿತಿಯನ್ನು ಸಂಗ್ರಹಿಸಲಾಗುವ ದೋಷ-ಸಹಿಷ್ಣು, ವಿಶ್ವಾಸಾರ್ಹ ಫೈಲ್ ಸಿಸ್ಟಮ್ (HDFS, S3, ಇತ್ಯಾದಿ) ನಲ್ಲಿ ಡೈರೆಕ್ಟರಿಯನ್ನು ಹೊಂದಿಸುವ ಮೂಲಕ ಚೆಕ್ಪಾಯಿಂಟಿಂಗ್ ಅನ್ನು ಸಕ್ರಿಯಗೊಳಿಸಬಹುದು. ಇದನ್ನು ಬಳಸಿ ಮಾಡಲಾಗುತ್ತದೆ, ಉದಾಹರಣೆಗೆ:
streamingContext.checkpoint(checkpointDirectory)
ನಮ್ಮ ಉದಾಹರಣೆಯಲ್ಲಿ, ನಾವು ಈ ಕೆಳಗಿನ ವಿಧಾನವನ್ನು ಬಳಸುತ್ತೇವೆ, ಅವುಗಳೆಂದರೆ, ಚೆಕ್ಪಾಯಿಂಟ್ ಡೈರೆಕ್ಟರಿ ಅಸ್ತಿತ್ವದಲ್ಲಿದ್ದರೆ, ನಂತರ ಚೆಕ್ಪಾಯಿಂಟ್ ಡೇಟಾದಿಂದ ಸಂದರ್ಭವನ್ನು ಮರುಸೃಷ್ಟಿಸಲಾಗುತ್ತದೆ. ಡೈರೆಕ್ಟರಿ ಅಸ್ತಿತ್ವದಲ್ಲಿಲ್ಲದಿದ್ದರೆ (ಅಂದರೆ ಮೊದಲ ಬಾರಿಗೆ ಕಾರ್ಯಗತಗೊಳಿಸಿದರೆ), ನಂತರ ಹೊಸ ಸಂದರ್ಭವನ್ನು ರಚಿಸಲು ಮತ್ತು ಡಿಸ್ಟ್ರೀಮ್ಗಳನ್ನು ಕಾನ್ಫಿಗರ್ ಮಾಡಲು ಫಂಕ್ಷನ್ಟುಕ್ರಿಯೇಟ್ ಕಾಂಟೆಕ್ಸ್ಟ್ ಅನ್ನು ಕರೆಯಲಾಗುತ್ತದೆ:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
KafkaUtils ಲೈಬ್ರರಿಯ createDirectStream ವಿಧಾನವನ್ನು ಬಳಸಿಕೊಂಡು "ವಹಿವಾಟು" ವಿಷಯಕ್ಕೆ ಸಂಪರ್ಕಿಸಲು ನಾವು ಡೈರೆಕ್ಟ್ಸ್ಟ್ರೀಮ್ ವಸ್ತುವನ್ನು ರಚಿಸುತ್ತೇವೆ:
ಸ್ಪಾರ್ಕ್ SQL ಅನ್ನು ಬಳಸಿಕೊಂಡು, ನಾವು ಸರಳವಾದ ಗುಂಪನ್ನು ಮಾಡುತ್ತೇವೆ ಮತ್ತು ಫಲಿತಾಂಶವನ್ನು ಕನ್ಸೋಲ್ನಲ್ಲಿ ಪ್ರದರ್ಶಿಸುತ್ತೇವೆ:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
ಪ್ರಶ್ನೆ ಪಠ್ಯವನ್ನು ಪಡೆಯುವುದು ಮತ್ತು ಸ್ಪಾರ್ಕ್ SQL ಮೂಲಕ ಚಾಲನೆ ಮಾಡುವುದು:
ತದನಂತರ ನಾವು ಒಟ್ಟುಗೂಡಿದ ಡೇಟಾವನ್ನು AWS RDS ನಲ್ಲಿ ಟೇಬಲ್ಗೆ ಉಳಿಸುತ್ತೇವೆ. ಒಟ್ಟುಗೂಡಿಸುವಿಕೆಯ ಫಲಿತಾಂಶಗಳನ್ನು ಡೇಟಾಬೇಸ್ ಟೇಬಲ್ಗೆ ಉಳಿಸಲು, ನಾವು ಡೇಟಾಫ್ರೇಮ್ ಆಬ್ಜೆಕ್ಟ್ನ ಬರೆಯುವ ವಿಧಾನವನ್ನು ಬಳಸುತ್ತೇವೆ:
AWS RDS ಗೆ ಸಂಪರ್ಕವನ್ನು ಹೊಂದಿಸುವ ಕುರಿತು ಕೆಲವು ಪದಗಳು. "AWS PostgreSQL ಅನ್ನು ನಿಯೋಜಿಸಲಾಗುತ್ತಿದೆ" ಹಂತದಲ್ಲಿ ನಾವು ಬಳಕೆದಾರ ಮತ್ತು ಪಾಸ್ವರ್ಡ್ ಅನ್ನು ರಚಿಸಿದ್ದೇವೆ. ನೀವು ಎಂಡ್ಪಾಯಿಂಟ್ ಅನ್ನು ಡೇಟಾಬೇಸ್ ಸರ್ವರ್ url ಆಗಿ ಬಳಸಬೇಕು, ಇದನ್ನು ಕನೆಕ್ಟಿವಿಟಿ ಮತ್ತು ಸೆಕ್ಯುರಿಟಿ ವಿಭಾಗದಲ್ಲಿ ಪ್ರದರ್ಶಿಸಲಾಗುತ್ತದೆ:
ಸ್ಪಾರ್ಕ್ ಮತ್ತು ಕಾಫ್ಕಾವನ್ನು ಸರಿಯಾಗಿ ಸಂಪರ್ಕಿಸಲು, ನೀವು ಕಲಾಕೃತಿಯನ್ನು ಬಳಸಿಕೊಂಡು ಸ್ಮಾರ್ಕ್ ಸಲ್ಲಿಸುವ ಮೂಲಕ ಕೆಲಸವನ್ನು ಚಲಾಯಿಸಬೇಕು ಸ್ಪಾರ್ಕ್-ಸ್ಟ್ರೀಮಿಂಗ್-ಕಾಫ್ಕಾ-0-8_2.11. ಹೆಚ್ಚುವರಿಯಾಗಿ, PostgreSQL ಡೇಟಾಬೇಸ್ನೊಂದಿಗೆ ಸಂವಹನ ನಡೆಸಲು ನಾವು ಕಲಾಕೃತಿಯನ್ನು ಸಹ ಬಳಸುತ್ತೇವೆ; ನಾವು ಅವುಗಳನ್ನು --packages ಮೂಲಕ ವರ್ಗಾಯಿಸುತ್ತೇವೆ.
ಸ್ಕ್ರಿಪ್ಟ್ನ ನಮ್ಯತೆಗಾಗಿ, ನಾವು ಇನ್ಪುಟ್ ಪ್ಯಾರಾಮೀಟರ್ಗಳಾಗಿ ಸಂದೇಶ ಸರ್ವರ್ನ ಹೆಸರು ಮತ್ತು ನಾವು ಡೇಟಾವನ್ನು ಸ್ವೀಕರಿಸಲು ಬಯಸುವ ವಿಷಯವನ್ನು ಸಹ ಸೇರಿಸುತ್ತೇವೆ.
ಆದ್ದರಿಂದ, ಸಿಸ್ಟಮ್ನ ಕಾರ್ಯವನ್ನು ಪ್ರಾರಂಭಿಸಲು ಮತ್ತು ಪರಿಶೀಲಿಸಲು ಇದು ಸಮಯ:
ಎಲ್ಲವೂ ಕೆಲಸ ಮಾಡಿದೆ! ಕೆಳಗಿನ ಚಿತ್ರದಲ್ಲಿ ನೀವು ನೋಡುವಂತೆ, ಅಪ್ಲಿಕೇಶನ್ ಚಾಲನೆಯಲ್ಲಿರುವಾಗ, ಹೊಸ ಒಟ್ಟುಗೂಡಿಸುವಿಕೆಯ ಫಲಿತಾಂಶಗಳು ಪ್ರತಿ 2 ಸೆಕೆಂಡುಗಳಿಗೆ ಔಟ್ಪುಟ್ ಆಗುತ್ತವೆ, ಏಕೆಂದರೆ ನಾವು StreamingContext ಆಬ್ಜೆಕ್ಟ್ ಅನ್ನು ರಚಿಸಿದಾಗ ನಾವು ಬ್ಯಾಚಿಂಗ್ ಮಧ್ಯಂತರವನ್ನು 2 ಸೆಕೆಂಡುಗಳಿಗೆ ಹೊಂದಿಸಿದ್ದೇವೆ:
ಮುಂದೆ, ಟೇಬಲ್ನಲ್ಲಿ ದಾಖಲೆಗಳ ಉಪಸ್ಥಿತಿಯನ್ನು ಪರಿಶೀಲಿಸಲು ನಾವು ಡೇಟಾಬೇಸ್ಗೆ ಸರಳವಾದ ಪ್ರಶ್ನೆಯನ್ನು ಮಾಡುತ್ತೇವೆ ವಹಿವಾಟು_ಹರಿವು:
ತೀರ್ಮಾನಕ್ಕೆ
ಈ ಲೇಖನವು Apache Kafka ಮತ್ತು PostgreSQL ಜೊತೆಗೆ ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಅನ್ನು ಬಳಸಿಕೊಂಡು ಮಾಹಿತಿಯ ಸ್ಟ್ರೀಮ್ ಪ್ರಕ್ರಿಯೆಯ ಉದಾಹರಣೆಯನ್ನು ನೋಡಿದೆ. ವಿವಿಧ ಮೂಲಗಳಿಂದ ಡೇಟಾದ ಬೆಳವಣಿಗೆಯೊಂದಿಗೆ, ಸ್ಟ್ರೀಮಿಂಗ್ ಮತ್ತು ನೈಜ-ಸಮಯದ ಅಪ್ಲಿಕೇಶನ್ಗಳನ್ನು ರಚಿಸಲು ಸ್ಪಾರ್ಕ್ ಸ್ಟ್ರೀಮಿಂಗ್ನ ಪ್ರಾಯೋಗಿಕ ಮೌಲ್ಯವನ್ನು ಅತಿಯಾಗಿ ಅಂದಾಜು ಮಾಡುವುದು ಕಷ್ಟ.
ನನ್ನ ರೆಪೊಸಿಟರಿಯಲ್ಲಿ ನೀವು ಪೂರ್ಣ ಮೂಲ ಕೋಡ್ ಅನ್ನು ಕಾಣಬಹುದು GitHub.
ಈ ಲೇಖನವನ್ನು ಚರ್ಚಿಸಲು ನನಗೆ ಸಂತೋಷವಾಗಿದೆ, ನಿಮ್ಮ ಕಾಮೆಂಟ್ಗಳಿಗಾಗಿ ನಾನು ಎದುರು ನೋಡುತ್ತಿದ್ದೇನೆ ಮತ್ತು ಎಲ್ಲಾ ಕಾಳಜಿಯುಳ್ಳ ಓದುಗರಿಂದ ರಚನಾತ್ಮಕ ಟೀಕೆಗಳನ್ನು ಸಹ ನಾನು ಭಾವಿಸುತ್ತೇನೆ.
ನಾನು ನಿಮಗೆ ಯಶಸ್ಸನ್ನು ಬಯಸುತ್ತೇನೆ!
ಪಿ.ಎಸ್. ಆರಂಭದಲ್ಲಿ ಸ್ಥಳೀಯ PostgreSQL ಡೇಟಾಬೇಸ್ ಅನ್ನು ಬಳಸಲು ಯೋಜಿಸಲಾಗಿತ್ತು, ಆದರೆ AWS ಗಾಗಿ ನನ್ನ ಪ್ರೀತಿಯನ್ನು ಗಮನಿಸಿದರೆ, ನಾನು ಡೇಟಾಬೇಸ್ ಅನ್ನು ಕ್ಲೌಡ್ಗೆ ಸರಿಸಲು ನಿರ್ಧರಿಸಿದೆ. ಈ ವಿಷಯದ ಕುರಿತು ಮುಂದಿನ ಲೇಖನದಲ್ಲಿ, AWS ಕಿನೆಸಿಸ್ ಮತ್ತು AWS EMR ಅನ್ನು ಬಳಸಿಕೊಂಡು AWS ನಲ್ಲಿ ಮೇಲೆ ವಿವರಿಸಿದ ಸಂಪೂರ್ಣ ವ್ಯವಸ್ಥೆಯನ್ನು ಹೇಗೆ ಕಾರ್ಯಗತಗೊಳಿಸಬೇಕೆಂದು ನಾನು ತೋರಿಸುತ್ತೇನೆ. ಸುದ್ದಿಯನ್ನು ಅನುಸರಿಸಿ!