ಹಲೋ, ನಾನು ಡಿಮಿಟ್ರಿ ಲೋಗ್ವಿನೆಂಕೊ - ವೆಜೆಟ್ ಗ್ರೂಪ್ ಆಫ್ ಕಂಪನಿಗಳ ವಿಶ್ಲೇಷಣಾ ವಿಭಾಗದ ಡೇಟಾ ಇಂಜಿನಿಯರ್.
ಇಟಿಎಲ್ ಪ್ರಕ್ರಿಯೆಗಳನ್ನು ಅಭಿವೃದ್ಧಿಪಡಿಸಲು ಉತ್ತಮ ಸಾಧನದ ಬಗ್ಗೆ ನಾನು ನಿಮಗೆ ಹೇಳುತ್ತೇನೆ - ಅಪಾಚೆ ಏರ್ಫ್ಲೋ. ಆದರೆ ಗಾಳಿಯ ಹರಿವು ಬಹುಮುಖ ಮತ್ತು ಬಹುಮುಖಿಯಾಗಿದ್ದು, ನೀವು ಡೇಟಾ ಹರಿವುಗಳಲ್ಲಿ ಭಾಗಿಯಾಗದಿದ್ದರೂ ಸಹ ನೀವು ಅದನ್ನು ಹತ್ತಿರದಿಂದ ನೋಡಬೇಕು, ಆದರೆ ನಿಯತಕಾಲಿಕವಾಗಿ ಯಾವುದೇ ಪ್ರಕ್ರಿಯೆಗಳನ್ನು ಪ್ರಾರಂಭಿಸುವ ಮತ್ತು ಅವುಗಳ ಕಾರ್ಯಗತಗೊಳಿಸುವಿಕೆಯನ್ನು ಮೇಲ್ವಿಚಾರಣೆ ಮಾಡುವ ಅವಶ್ಯಕತೆಯಿದೆ.
ಮತ್ತು ಹೌದು, ನಾನು ನಿಮಗೆ ಹೇಳುವುದಿಲ್ಲ, ಆದರೆ ನಿಮಗೆ ತೋರಿಸುತ್ತೇನೆ: ಪ್ರೋಗ್ರಾಂ ಬಹಳಷ್ಟು ಕೋಡ್, ಸ್ಕ್ರೀನ್ಶಾಟ್ಗಳು ಮತ್ತು ಶಿಫಾರಸುಗಳನ್ನು ಒಳಗೊಂಡಿದೆ.

ನೀವು ಏರ್ಫ್ಲೋ / ವಿಕಿಮೀಡಿಯಾ ಕಾಮನ್ಸ್ ಪದವನ್ನು ಗೂಗಲ್ ಮಾಡಿದಾಗ ನೀವು ಸಾಮಾನ್ಯವಾಗಿ ಏನನ್ನು ನೋಡುತ್ತೀರಿ
ಪರಿವಿಡಿ
ಪರಿಚಯ
ಅಪಾಚೆ ಗಾಳಿಯ ಹರಿವು ಜಾಂಗೊದಂತೆಯೇ ಇದೆ:
- ಪೈಥಾನ್ನಲ್ಲಿ ಬರೆಯಲಾಗಿದೆ,
- ಅತ್ಯುತ್ತಮ ನಿರ್ವಾಹಕರು ಇದ್ದಾರೆ,
- ಅನಿಯಮಿತವಾಗಿ ವಿಸ್ತರಿಸಬಹುದಾಗಿದೆ
- ಕೇವಲ ಉತ್ತಮ, ಮತ್ತು ಸಂಪೂರ್ಣವಾಗಿ ವಿಭಿನ್ನ ಉದ್ದೇಶಗಳಿಗಾಗಿ ಮಾಡಲ್ಪಟ್ಟಿದೆ, ಅವುಗಳೆಂದರೆ (ಕಟಾ ಮೊದಲು ಬರೆದಂತೆ):
- ಅನಿಯಮಿತ ಸಂಖ್ಯೆಯ ಯಂತ್ರಗಳಲ್ಲಿ ಕಾರ್ಯಗಳನ್ನು ಪ್ರಾರಂಭಿಸುವುದು ಮತ್ತು ಮೇಲ್ವಿಚಾರಣೆ ಮಾಡುವುದು (ಅನೇಕ ಸೆಲೆರಿ/ಕುಬರ್ನೆಟ್ಗಳು ಮತ್ತು ನಿಮ್ಮ ಆತ್ಮಸಾಕ್ಷಿಯು ನಿಮಗೆ ಅನುಮತಿಸುವಂತೆ)
- ಪೈಥಾನ್ ಕೋಡ್ ಅನ್ನು ಸುಲಭವಾಗಿ ಬರೆಯಲು ಮತ್ತು ಅರ್ಥಮಾಡಿಕೊಳ್ಳಲು ಡೈನಾಮಿಕ್ ವರ್ಕ್ಫ್ಲೋ ಉತ್ಪಾದನೆಯೊಂದಿಗೆ
- ಮತ್ತು ಸಿದ್ಧ-ಸಿದ್ಧ ಘಟಕಗಳು ಮತ್ತು ಮನೆಯಲ್ಲಿ ತಯಾರಿಸಿದ ಪ್ಲಗಿನ್ಗಳನ್ನು ಬಳಸಿಕೊಂಡು ಯಾವುದೇ ಡೇಟಾಬೇಸ್ಗಳು ಮತ್ತು API ಗಳನ್ನು ಪರಸ್ಪರ ಸಂಪರ್ಕಿಸುವ ಸಾಮರ್ಥ್ಯ (ಇದು ಮಾಡಲು ತುಂಬಾ ಸುಲಭ).
ನಾವು ಅಪಾಚೆ ಏರ್ಫ್ಲೋ ಅನ್ನು ಈ ರೀತಿ ಬಳಸುತ್ತೇವೆ:
- ನಾವು ವಿವಿಧ ಮೂಲಗಳಿಂದ ಡೇಟಾವನ್ನು ಸಂಗ್ರಹಿಸುತ್ತೇವೆ (ಅನೇಕ SQL ಸರ್ವರ್ ಮತ್ತು PostgreSQL ನಿದರ್ಶನಗಳು, ಅಪ್ಲಿಕೇಶನ್ ಮೆಟ್ರಿಕ್ಗಳೊಂದಿಗೆ ವಿವಿಧ APIಗಳು, 1C ಸಹ) DWH ಮತ್ತು ODS ನಲ್ಲಿ (ನಮಗೆ ಇದು ವರ್ಟಿಕಾ ಮತ್ತು ಕ್ಲಿಕ್ಹೌಸ್ ಆಗಿದೆ).
- ಮುಂದುವರಿದಂತೆ
cron, ಇದು ODS ನಲ್ಲಿ ಡೇಟಾ ಬಲವರ್ಧನೆ ಪ್ರಕ್ರಿಯೆಗಳನ್ನು ನಡೆಸುತ್ತದೆ ಮತ್ತು ಅವುಗಳ ನಿರ್ವಹಣೆಯನ್ನು ಸಹ ಮೇಲ್ವಿಚಾರಣೆ ಮಾಡುತ್ತದೆ.
ಇತ್ತೀಚಿನವರೆಗೂ, ನಮ್ಮ ಅಗತ್ಯಗಳನ್ನು 32 ಕೋರ್ಗಳು ಮತ್ತು 50 GB RAM ಹೊಂದಿರುವ ಒಂದು ಸಣ್ಣ ಸರ್ವರ್ನಿಂದ ಮುಚ್ಚಲಾಗಿದೆ. ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿ ಇದು ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ:
- ಹೆಚ್ಚು 200 ದಿನಗಳು (ವಾಸ್ತವವಾಗಿ ನಾವು ಕಾರ್ಯಗಳನ್ನು ತುಂಬಿದ ಕೆಲಸದ ಹರಿವುಗಳು),
- ಪ್ರತಿಯೊಂದರಲ್ಲೂ ಸರಾಸರಿ 70 ಕಾರ್ಯಗಳು,
- ಈ ವಿಷಯವು ಪ್ರಾರಂಭವಾಗುತ್ತದೆ (ಸರಾಸರಿ ಸಹ) ಗಂಟೆಗೆ ಒಮ್ಮೆ.
ನಾವು ಹೇಗೆ ವಿಸ್ತರಿಸಿದ್ದೇವೆ ಎಂಬುದರ ಕುರಿತು ನಾನು ಕೆಳಗೆ ಬರೆಯುತ್ತೇನೆ, ಆದರೆ ಈಗ ನಾವು ಪರಿಹರಿಸುವ ಉಬರ್-ಕಾರ್ಯವನ್ನು ವ್ಯಾಖ್ಯಾನಿಸೋಣ:
ಮೂರು ಮೂಲ SQL ಸರ್ವರ್ಗಳಿವೆ, ಪ್ರತಿಯೊಂದೂ 50 ಡೇಟಾಬೇಸ್ಗಳನ್ನು ಹೊಂದಿದೆ - ಕ್ರಮವಾಗಿ ಒಂದು ಯೋಜನೆಯ ನಿದರ್ಶನಗಳು, ಅವುಗಳ ರಚನೆಯು ಒಂದೇ ಆಗಿರುತ್ತದೆ (ಬಹುತೇಕ ಎಲ್ಲೆಡೆ, ಮುವಾ-ಹ-ಹ), ಅಂದರೆ ಪ್ರತಿಯೊಂದೂ ಆರ್ಡರ್ಗಳ ಕೋಷ್ಟಕವನ್ನು ಹೊಂದಿದೆ (ಅದೃಷ್ಟವಶಾತ್, ನೀವು ಹೊಂದಬಹುದು ಆ ಹೆಸರಿನ ಟೇಬಲ್ ಯಾವುದೇ ವ್ಯವಹಾರಕ್ಕೆ ತಳ್ಳುತ್ತದೆ). ನಾವು ಸೇವಾ ಕ್ಷೇತ್ರಗಳನ್ನು (ಮೂಲ ಸರ್ವರ್, ಮೂಲ ಡೇಟಾಬೇಸ್, ಇಟಿಎಲ್ ಕಾರ್ಯ ಗುರುತಿಸುವಿಕೆ) ಸೇರಿಸುವ ಮೂಲಕ ಡೇಟಾವನ್ನು ತೆಗೆದುಕೊಳ್ಳುತ್ತೇವೆ ಮತ್ತು ಅವುಗಳನ್ನು ನಿಷ್ಕಪಟವಾಗಿ ವರ್ಟಿಕಾಗೆ ಎಸೆಯುತ್ತೇವೆ.
ಹೋಗೋಣ!
ಭಾಗವು ಮೂಲಭೂತ, ಪ್ರಾಯೋಗಿಕ (ಮತ್ತು ಸ್ವಲ್ಪ ಸೈದ್ಧಾಂತಿಕ)
ನಮಗೆ (ಮತ್ತು ನಿಮಗೆ) ಇದು ಏಕೆ ಬೇಕು?
ಮರಗಳು ದೊಡ್ಡದಾಗಿದ್ದಾಗ ಮತ್ತು ನಾನು ಸರಳನಾಗಿದ್ದೆ SQLಒಂದು ರಷ್ಯನ್ ಚಿಲ್ಲರೆ ವ್ಯಾಪಾರದಲ್ಲಿ ನಿರ್ವಾಹಕರಾಗಿ, ನಮಗೆ ಲಭ್ಯವಿರುವ ಎರಡು ಪರಿಕರಗಳನ್ನು ಬಳಸಿಕೊಂಡು ನಾವು ETL ಪ್ರಕ್ರಿಯೆಗಳನ್ನು ಅಕಾ ಡೇಟಾ ಹರಿವುಗಳನ್ನು ಸ್ಕೇಲಿಂಗ್ ಮಾಡುತ್ತಿದ್ದೇವೆ:
- ಇನ್ಫರ್ಮ್ಯಾಟಿಕಾ ಪವರ್ ಸೆಂಟರ್ - ಅತ್ಯಂತ ಬಹುಮುಖ ವ್ಯವಸ್ಥೆ, ಅತ್ಯಂತ ಉತ್ಪಾದಕ, ತನ್ನದೇ ಆದ ಯಂತ್ರಾಂಶ, ತನ್ನದೇ ಆದ ಆವೃತ್ತಿಯೊಂದಿಗೆ. ದೇವರ ಇಚ್ಛೆ, ನಾನು ಅದರ 1% ಸಾಮರ್ಥ್ಯಗಳನ್ನು ಬಳಸಿದ್ದೇನೆ. ಏಕೆ? ಒಳ್ಳೆಯದು, ಮೊದಲನೆಯದಾಗಿ, ಈ ಇಂಟರ್ಫೇಸ್ 380 ರ ದಶಕದಲ್ಲಿ ಎಲ್ಲೋ ನಮ್ಮ ಮೇಲೆ ಮಾನಸಿಕ ಒತ್ತಡವನ್ನು ಹಾಕಿತು. ಎರಡನೆಯದಾಗಿ, ಈ ವಿಷಯವನ್ನು ಅತ್ಯಂತ ಅತ್ಯಾಧುನಿಕ ಪ್ರಕ್ರಿಯೆಗಳು, ಘಟಕಗಳ ಉಗ್ರ ಮರುಬಳಕೆ ಮತ್ತು ಇತರ ಪ್ರಮುಖ-ಉದ್ಯಮ ವೈಶಿಷ್ಟ್ಯಗಳಿಗಾಗಿ ವಿನ್ಯಾಸಗೊಳಿಸಲಾಗಿದೆ. ವರ್ಷಕ್ಕೆ ಏರ್ಬಸ್ AXNUMX ವಿಂಗ್ನಷ್ಟು ವೆಚ್ಚವಾಗುತ್ತದೆ ಎಂಬ ಅಂಶದ ಬಗ್ಗೆ ನಾವು ಏನನ್ನೂ ಹೇಳುವುದಿಲ್ಲ.
ಎಚ್ಚರದಿಂದಿರಿ, ಸ್ಕ್ರೀನ್ಶಾಟ್ 30 ವರ್ಷಕ್ಕಿಂತ ಕಡಿಮೆ ವಯಸ್ಸಿನವರಿಗೆ ಸ್ವಲ್ಪ ನೋವುಂಟು ಮಾಡಬಹುದು

- SQL ಸರ್ವರ್ ಇಂಟಿಗ್ರೇಷನ್ ಸರ್ವರ್ - ನಾವು ಈ ವ್ಯಕ್ತಿಯನ್ನು ನಮ್ಮ ಆಂತರಿಕ ಯೋಜನೆಯ ಹರಿವಿನಲ್ಲಿ ಬಳಸಿದ್ದೇವೆ. ಒಳ್ಳೆಯದು, ವಾಸ್ತವವಾಗಿ: ನಾವು ಈಗಾಗಲೇ SQL ಸರ್ವರ್ ಅನ್ನು ಬಳಸುತ್ತೇವೆ ಮತ್ತು ಅದರ ETL ಪರಿಕರಗಳನ್ನು ಬಳಸದಿರುವುದು ಹೇಗಾದರೂ ಅಸಮಂಜಸವಾಗಿದೆ. ಅದರ ಬಗ್ಗೆ ಎಲ್ಲವೂ ಒಳ್ಳೆಯದು: ಇಂಟರ್ಫೇಸ್ ಸುಂದರವಾಗಿದೆ, ಮತ್ತು ಪ್ರಗತಿ ವರದಿಗಳು ... ಆದರೆ ನಾವು ಸಾಫ್ಟ್ವೇರ್ ಉತ್ಪನ್ನಗಳನ್ನು ಪ್ರೀತಿಸುವ ಕಾರಣಕ್ಕಾಗಿ ಅಲ್ಲ, ಓಹ್, ಅದಕ್ಕಾಗಿ ಅಲ್ಲ. ಅದನ್ನು ಆವೃತ್ತಿ ಮಾಡಿ
dtsx(ಉಳಿಸಿದಾಗ ಮಿಶ್ರಿತ ನೋಡ್ಗಳೊಂದಿಗೆ XML) ನಾವು ಮಾಡಬಹುದು, ಆದರೆ ಏನು ಪಾಯಿಂಟ್? ಒಂದು ಸರ್ವರ್ನಿಂದ ಇನ್ನೊಂದಕ್ಕೆ ನೂರು ಕೋಷ್ಟಕಗಳನ್ನು ಎಳೆಯುವ ಕಾರ್ಯಗಳ ಪ್ಯಾಕೇಜ್ ಅನ್ನು ಹೇಗೆ ತಯಾರಿಸುವುದು? ಏಕೆ, ನೂರು, ಇಪ್ಪತ್ತು ಅವುಗಳಲ್ಲಿ ಮೌಸ್ ಬಟನ್ ಕ್ಲಿಕ್ ಮಾಡುವಾಗ ನಿಮ್ಮ ತೋರುಬೆರಳು ಬೀಳುವಂತೆ ಮಾಡುತ್ತದೆ. ಆದರೆ ಇದು ಖಂಡಿತವಾಗಿಯೂ ಹೆಚ್ಚು ಫ್ಯಾಶನ್ ಆಗಿ ಕಾಣುತ್ತದೆ:
ನಾವು ಖಂಡಿತವಾಗಿಯೂ ದಾರಿಗಳನ್ನು ಹುಡುಕುತ್ತಿದ್ದೇವೆ. ಇದು ಸಮ ಬಹುತೇಕ ಇದು ಸ್ವಯಂ-ಬರೆದ SSIS ಪ್ಯಾಕೇಜ್ ಜನರೇಟರ್ಗೆ ಬಂದಿದೆ...
... ತದನಂತರ ಹೊಸ ಕೆಲಸ ನನ್ನನ್ನು ಕಂಡುಹಿಡಿದಿದೆ. ಮತ್ತು ಅದರ ಮೇಲೆ ಅಪಾಚೆ ಏರ್ಫ್ಲೋ ನನ್ನನ್ನು ಹಿಂದಿಕ್ಕಿತು.
ETL ಪ್ರಕ್ರಿಯೆಯ ವಿವರಣೆಗಳು ಕೇವಲ ಸರಳ ಪೈಥಾನ್ ಕೋಡ್ ಎಂದು ನಾನು ತಿಳಿದಾಗ, ನಾನು ಸಂತೋಷಕ್ಕಾಗಿ ನೃತ್ಯ ಮಾಡುವುದಕ್ಕಿಂತ ಹೆಚ್ಚಿನದನ್ನು ಮಾಡಲಾಗಲಿಲ್ಲ. ಈ ರೀತಿಯಾಗಿ ಡೇಟಾ ಸ್ಟ್ರೀಮ್ಗಳು ಆವೃತ್ತಿ ಮತ್ತು ಡಿಫಿಂಗ್ಗೆ ಒಳಪಟ್ಟಿವೆ ಮತ್ತು ನೂರಾರು ಡೇಟಾಬೇಸ್ಗಳಿಂದ ಒಂದೇ ರಚನೆಯೊಂದಿಗೆ ಟೇಬಲ್ಗಳನ್ನು ಒಂದೇ ಗುರಿಗೆ ಸುರಿಯುವುದು ಒಂದೂವರೆ ರಿಂದ ಎರಡು 13" ಸ್ಕ್ರೀನ್ಗಳಲ್ಲಿ ಪೈಥಾನ್ ಕೋಡ್ನ ವಿಷಯವಾಯಿತು.
ಒಂದು ಕ್ಲಸ್ಟರ್ ಅನ್ನು ಜೋಡಿಸುವುದು
ನಾವು ಇದನ್ನು ಸಂಪೂರ್ಣ ಶಿಶುವಿಹಾರವನ್ನಾಗಿ ಮಾಡಬಾರದು ಮತ್ತು ಏರ್ಫ್ಲೋ ಅನ್ನು ಸ್ಥಾಪಿಸುವುದು, ನೀವು ಆಯ್ಕೆ ಮಾಡಿದ ಡೇಟಾಬೇಸ್, ಸೆಲೆರಿ ಮತ್ತು ಡಾಕ್ಸ್ನಲ್ಲಿ ವಿವರಿಸಿರುವ ಇತರ ವಿಷಯಗಳ ಬಗ್ಗೆ ಇಲ್ಲಿ ಸಂಪೂರ್ಣವಾಗಿ ಸ್ಪಷ್ಟವಾದ ವಿಷಯಗಳ ಬಗ್ಗೆ ಮಾತನಾಡಬಾರದು.
ಆದ್ದರಿಂದ ನಾವು ಈಗಿನಿಂದಲೇ ಪ್ರಯೋಗವನ್ನು ಪ್ರಾರಂಭಿಸಬಹುದು, ನಾನು ಸ್ಕೆಚ್ ಔಟ್ ಮಾಡಿದೆ docker-compose.yml ಯಾವುದರಲ್ಲಿ:
- ವಾಸ್ತವವನ್ನು ಹೆಚ್ಚಿಸೋಣ ಹವೇಯ ಚಲನ: ಶೆಡ್ಯೂಲರ್, ವೆಬ್ ಸರ್ವರ್. ಸೆಲರಿ ಕಾರ್ಯಗಳನ್ನು ಮೇಲ್ವಿಚಾರಣೆ ಮಾಡಲು ಹೂವು ಸಹ ಅಲ್ಲಿ ಓಡುತ್ತದೆ (ಏಕೆಂದರೆ ಅದನ್ನು ಈಗಾಗಲೇ ತಳ್ಳಲಾಗಿದೆ
apache/airflow:1.10.10-python3.7, ಮತ್ತು ನಾವು ತಲೆಕೆಡಿಸಿಕೊಳ್ಳುವುದಿಲ್ಲ); - PostgreSQL, ಇದರಲ್ಲಿ ಏರ್ಫ್ಲೋ ತನ್ನ ಸೇವಾ ಮಾಹಿತಿಯನ್ನು ಬರೆಯುತ್ತದೆ (ಶೆಡ್ಯೂಲರ್ ಡೇಟಾ, ಎಕ್ಸಿಕ್ಯೂಶನ್ ಅಂಕಿಅಂಶಗಳು, ಇತ್ಯಾದಿ), ಮತ್ತು ಸೆಲೆರಿ ಪೂರ್ಣಗೊಂಡ ಕಾರ್ಯಗಳನ್ನು ಗುರುತಿಸುತ್ತದೆ;
- ಕೆಂಪು, ಸೆಲೆರಿಗಾಗಿ ಟಾಸ್ಕ್ ಬ್ರೋಕರ್ ಆಗಿ ಯಾರು ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತಾರೆ;
- ಸೆಲರಿ ಕೆಲಸಗಾರ, ಯಾರು ನೇರವಾಗಿ ಕಾರ್ಯಗಳನ್ನು ನಿರ್ವಹಿಸುತ್ತಾರೆ.
- ಫೋಲ್ಡರ್ಗೆ
./dagsನಾವು ನಮ್ಮ ಫೈಲ್ಗಳನ್ನು ಅಗೆದ ವಿವರಣೆಗಳೊಂದಿಗೆ ಒಟ್ಟುಗೂಡಿಸುತ್ತೇವೆ. ಅವರು ಫ್ಲೈನಲ್ಲಿ ಎತ್ತಿಕೊಂಡು ಹೋಗುತ್ತಾರೆ, ಆದ್ದರಿಂದ ಪ್ರತಿ ಸೀನುವಿಕೆಯ ನಂತರ ಸಂಪೂರ್ಣ ಸ್ಟಾಕ್ ಅನ್ನು ಸರಿಸಲು ಅಗತ್ಯವಿಲ್ಲ.
ಕೆಲವು ಸ್ಥಳಗಳಲ್ಲಿ ಉದಾಹರಣೆಗಳಲ್ಲಿನ ಕೋಡ್ ಅನ್ನು ಪೂರ್ಣವಾಗಿ ನೀಡಲಾಗಿಲ್ಲ (ಪಠ್ಯವನ್ನು ಅಸ್ತವ್ಯಸ್ತಗೊಳಿಸದಂತೆ), ಮತ್ತು ಕೆಲವು ಸ್ಥಳಗಳಲ್ಲಿ ಅದನ್ನು ಪ್ರಕ್ರಿಯೆಯಲ್ಲಿ ಮಾರ್ಪಡಿಸಲಾಗಿದೆ. ಸಂಪೂರ್ಣ ವರ್ಕಿಂಗ್ ಕೋಡ್ ಉದಾಹರಣೆಗಳನ್ನು ರೆಪೊಸಿಟರಿಯಲ್ಲಿ ಕಾಣಬಹುದು .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerಪ್ರಾರ್ಥನೆ:
- ಸಂಯೋಜನೆಯನ್ನು ಜೋಡಿಸುವಲ್ಲಿ, ನಾನು ಹೆಚ್ಚಾಗಿ ಪ್ರಸಿದ್ಧ ಚಿತ್ರವನ್ನು ಅವಲಂಬಿಸಿದೆ - ಅದನ್ನು ಪರೀಕ್ಷಿಸಲು ಮರೆಯದಿರಿ. ಬಹುಶಃ ನಿಮಗೆ ಜೀವನದಲ್ಲಿ ಬೇರೆ ಏನೂ ಅಗತ್ಯವಿಲ್ಲ.
- ಎಲ್ಲಾ ಏರ್ಫ್ಲೋ ಸೆಟ್ಟಿಂಗ್ಗಳು ಮಾತ್ರ ಲಭ್ಯವಿಲ್ಲ
airflow.cfg, ಆದರೆ ಪರಿಸರ ವೇರಿಯಬಲ್ಗಳ ಮೂಲಕ (ಡೆವಲಪರ್ಗಳಿಗೆ ವೈಭವ), ನಾನು ದುರುದ್ದೇಶಪೂರ್ವಕವಾಗಿ ಪ್ರಯೋಜನವನ್ನು ಪಡೆದುಕೊಂಡಿದ್ದೇನೆ. - ಸ್ವಾಭಾವಿಕವಾಗಿ, ಇದು ಉತ್ಪಾದನೆಗೆ ಸಿದ್ಧವಾಗಿಲ್ಲ: ನಾನು ಉದ್ದೇಶಪೂರ್ವಕವಾಗಿ ಕಂಟೇನರ್ಗಳ ಮೇಲೆ ಹೃದಯ ಬಡಿತಗಳನ್ನು ಹಾಕಲಿಲ್ಲ ಮತ್ತು ಭದ್ರತೆಗೆ ತಲೆಕೆಡಿಸಿಕೊಳ್ಳಲಿಲ್ಲ. ಆದರೆ ನಮ್ಮ ಪ್ರಯೋಗಕಾರರಿಗೆ ಸೂಕ್ತವಾದ ಕನಿಷ್ಠವನ್ನು ನಾನು ಮಾಡಿದ್ದೇನೆ.
- ಇದನ್ನು ಗಮನಿಸಿ:
- ಡ್ಯಾಗ್ಗಳನ್ನು ಹೊಂದಿರುವ ಫೋಲ್ಡರ್ ಅನ್ನು ಶೆಡ್ಯೂಲರ್ ಮತ್ತು ಕೆಲಸಗಾರರಿಗೆ ಪ್ರವೇಶಿಸಬಹುದು.
- ಎಲ್ಲಾ ಥರ್ಡ್-ಪಾರ್ಟಿ ಲೈಬ್ರರಿಗಳಿಗೂ ಇದು ಅನ್ವಯಿಸುತ್ತದೆ - ಅವೆಲ್ಲವನ್ನೂ ಶೆಡ್ಯೂಲರ್ ಮತ್ತು ಕೆಲಸಗಾರರಿರುವ ಯಂತ್ರಗಳಲ್ಲಿ ಸ್ಥಾಪಿಸಬೇಕು.
ಸರಿ, ಈಗ ಇದು ಸರಳವಾಗಿದೆ:
$ docker-compose up --scale worker=3ಎಲ್ಲವೂ ಮುಗಿದ ನಂತರ, ನೀವು ವೆಬ್ ಇಂಟರ್ಫೇಸ್ಗಳನ್ನು ನೋಡಬಹುದು:
- ಹವೇಯ ಚಲನ:
- ಹೂ:
ಮೂಲ ಪರಿಕಲ್ಪನೆಗಳು
ಈ ಎಲ್ಲಾ "ಡ್ಯಾಗ್ಗಳಲ್ಲಿ" ನಿಮಗೆ ಏನೂ ಅರ್ಥವಾಗದಿದ್ದರೆ, ಇಲ್ಲಿ ಒಂದು ಸಣ್ಣ ಗ್ಲಾಸರಿ ಇದೆ:
- ವೇಳಾಪಟ್ಟಿ - ಏರ್ಫ್ಲೋನಲ್ಲಿನ ಪ್ರಮುಖ ವ್ಯಕ್ತಿ, ರೋಬೋಟ್ಗಳು ಕಷ್ಟಪಟ್ಟು ಕೆಲಸ ಮಾಡುತ್ತವೆ ಮತ್ತು ಜನರಲ್ಲ ಎಂದು ಖಚಿತಪಡಿಸಿಕೊಳ್ಳುತ್ತಾರೆ: ಅವರು ವೇಳಾಪಟ್ಟಿಯನ್ನು ಮೇಲ್ವಿಚಾರಣೆ ಮಾಡುತ್ತಾರೆ, ಡೇಟಾವನ್ನು ನವೀಕರಿಸುತ್ತಾರೆ, ಕಾರ್ಯಗಳನ್ನು ನಿರ್ವಹಿಸುತ್ತಾರೆ.
ಸಾಮಾನ್ಯವಾಗಿ, ಹಳೆಯ ಆವೃತ್ತಿಗಳಲ್ಲಿ, ಇದು ಮೆಮೊರಿಯೊಂದಿಗೆ ಸಮಸ್ಯೆಗಳನ್ನು ಹೊಂದಿತ್ತು (ಇಲ್ಲ, ವಿಸ್ಮೃತಿ ಅಲ್ಲ, ಆದರೆ ಸೋರಿಕೆಗಳು) ಮತ್ತು ಸಂರಚನೆಗಳಲ್ಲಿ ಪರಂಪರೆಯ ನಿಯತಾಂಕವೂ ಇತ್ತು
run_duration- ಅದರ ಮರುಪ್ರಾರಂಭದ ಮಧ್ಯಂತರ. ಆದರೆ ಈಗ ಎಲ್ಲವೂ ಸರಿಯಾಗಿದೆ. - DAG (ಅಕಾ "ಡಾಗ್") "ನಿರ್ದೇಶಿತ ಅಸಿಕ್ಲಿಕ್ ಗ್ರಾಫ್" ಆಗಿದೆ, ಆದರೆ ಅಂತಹ ವ್ಯಾಖ್ಯಾನವು ಯಾರಿಗಾದರೂ ಕಡಿಮೆ ಅರ್ಥವನ್ನು ನೀಡುತ್ತದೆ, ಆದರೆ ಮೂಲಭೂತವಾಗಿ ಇದು ಪರಸ್ಪರ ಸಂವಹನ ಮಾಡುವ ಕಾರ್ಯಗಳಿಗಾಗಿ ಕಂಟೇನರ್ ಆಗಿದೆ (ಕೆಳಗೆ ನೋಡಿ) ಅಥವಾ SSIS ಮತ್ತು ವರ್ಕ್ಫ್ಲೋನಲ್ಲಿ ಪ್ಯಾಕೇಜ್ನ ಅನಲಾಗ್ ಇನ್ಫರ್ಮ್ಯಾಟಿಕಾದಲ್ಲಿ.
ಡ್ಯಾಗ್ಗಳ ಜೊತೆಗೆ, ಸಬ್ಡಾಗ್ಗಳು ಸಹ ಇರಬಹುದು, ಆದರೆ ನಾವು ಅವುಗಳನ್ನು ಹೆಚ್ಚಾಗಿ ಪಡೆಯುವುದಿಲ್ಲ.
- DAG ರನ್ - ಪ್ರಾರಂಭಿಕ ಡಾಗ್, ಇದು ತನ್ನದೇ ಆದ ನಿಗದಿಪಡಿಸಲಾಗಿದೆ
execution_date. ಒಂದು ಡಾಗ್ನ ಡಾಗ್ರಾನ್ಗಳು ಸಮಾನಾಂತರವಾಗಿ ಕೆಲಸ ಮಾಡಬಹುದು (ಸಹಜವಾಗಿ, ನಿಮ್ಮ ಕಾರ್ಯಗಳನ್ನು ನೀವು ಅಸಮರ್ಥಗೊಳಿಸಿದ್ದರೆ). - ಆಪರೇಟರ್ - ಇವು ನಿರ್ದಿಷ್ಟ ಕ್ರಿಯೆಯನ್ನು ನಿರ್ವಹಿಸುವ ಜವಾಬ್ದಾರಿಯುತ ಕೋಡ್ನ ತುಣುಕುಗಳಾಗಿವೆ. ಮೂರು ವಿಧದ ಆಪರೇಟರ್ಗಳಿವೆ:
- ಕ್ರಮ, ನಮ್ಮ ಪ್ರೀತಿಯ ಹಾಗೆ
PythonOperator, ಇದು ಯಾವುದೇ (ಮಾನ್ಯ) ಪೈಥಾನ್ ಕೋಡ್ ಅನ್ನು ಕಾರ್ಯಗತಗೊಳಿಸಬಹುದು; - ವರ್ಗಾವಣೆ, ಇದು ಸ್ಥಳದಿಂದ ಸ್ಥಳಕ್ಕೆ ಡೇಟಾವನ್ನು ಸಾಗಿಸುತ್ತದೆ, ಹೇಳಿ
MsSqlToHiveTransfer; - ಸಂವೇದಕ ಯಾವುದೇ ಘಟನೆ ಸಂಭವಿಸುವ ಮೊದಲು ಡ್ಯಾಗ್ನ ಮತ್ತಷ್ಟು ಕಾರ್ಯಗತಗೊಳಿಸುವಿಕೆಯನ್ನು ಪ್ರತಿಕ್ರಿಯಿಸಲು ಅಥವಾ ನಿಧಾನಗೊಳಿಸಲು ಇದು ನಿಮ್ಮನ್ನು ಅನುಮತಿಸುತ್ತದೆ.
HttpSensorನಿರ್ದಿಷ್ಟಪಡಿಸಿದ ಅಂತಿಮ ಬಿಂದುವನ್ನು ಎಳೆಯಬಹುದು ಮತ್ತು ಬಯಸಿದ ಪ್ರತಿಕ್ರಿಯೆಯನ್ನು ಸ್ವೀಕರಿಸಿದಾಗ, ವರ್ಗಾವಣೆಯನ್ನು ಪ್ರಾರಂಭಿಸಿGoogleCloudStorageToS3Operator. ಜಿಜ್ಞಾಸೆಯ ಮನಸ್ಸು ಕೇಳುತ್ತದೆ: "ಯಾಕೆ? ಎಲ್ಲಾ ನಂತರ, ನೀವು ಆಪರೇಟರ್ನಲ್ಲಿಯೇ ಪುನರಾವರ್ತನೆಗಳನ್ನು ಮಾಡಬಹುದು! ತದನಂತರ, ಅಂಟಿಕೊಂಡಿರುವ ನಿರ್ವಾಹಕರೊಂದಿಗೆ ಟಾಸ್ಕ್ ಪೂಲ್ ಅನ್ನು ಮುಚ್ಚಿಹಾಕದಂತೆ. ಸಂವೇದಕವು ಪ್ರಾರಂಭವಾಗುತ್ತದೆ, ಪರೀಕ್ಷೆಗಳು ಮತ್ತು ಮುಂದಿನ ಪ್ರಯತ್ನದವರೆಗೆ ಸಾಯುತ್ತದೆ.
- ಕ್ರಮ, ನಮ್ಮ ಪ್ರೀತಿಯ ಹಾಗೆ
- ಕಾರ್ಯ - ಘೋಷಿತ ನಿರ್ವಾಹಕರು, ಪ್ರಕಾರವನ್ನು ಲೆಕ್ಕಿಸದೆ, ಮತ್ತು ಡ್ಯಾಗ್ಗೆ ಲಗತ್ತಿಸಲಾದ ಕಾರ್ಯದ ಶ್ರೇಣಿಗೆ ಬಡ್ತಿ ನೀಡಲಾಗುತ್ತದೆ.
- ಕಾರ್ಯ ನಿದರ್ಶನ - ಪ್ರದರ್ಶಕ-ಕಾರ್ಮಿಕರ ವಿರುದ್ಧ ಕಾರ್ಯಗಳನ್ನು ಯುದ್ಧಕ್ಕೆ ಕಳುಹಿಸುವ ಸಮಯ ಎಂದು ಸಾಮಾನ್ಯ ಯೋಜಕರು ನಿರ್ಧರಿಸಿದಾಗ (ಸ್ಥಳದಲ್ಲಿಯೇ, ನಾವು ಬಳಸಿದರೆ).
LocalExecutorಅಥವಾ ಸಂದರ್ಭದಲ್ಲಿ ರಿಮೋಟ್ ನೋಡ್ಗೆCeleryExecutor), ಇದು ಅವರಿಗೆ ಸಂದರ್ಭವನ್ನು ನಿಯೋಜಿಸುತ್ತದೆ (ಅಂದರೆ, ವೇರಿಯೇಬಲ್ಗಳ ಒಂದು ಸೆಟ್ - ಎಕ್ಸಿಕ್ಯೂಶನ್ ಪ್ಯಾರಾಮೀಟರ್ಗಳು), ಕಮಾಂಡ್ ಅಥವಾ ವಿನಂತಿ ಟೆಂಪ್ಲೇಟ್ಗಳನ್ನು ವಿಸ್ತರಿಸುತ್ತದೆ ಮತ್ತು ಅವುಗಳನ್ನು ಪೂಲ್ನಲ್ಲಿ ಇರಿಸುತ್ತದೆ.
ಕಾರ್ಯಗಳನ್ನು ರಚಿಸುವುದು
ಮೊದಲಿಗೆ, ನಮ್ಮ ಡ್ಯಾಗ್ನ ಸಾಮಾನ್ಯ ಯೋಜನೆಯನ್ನು ರೂಪಿಸೋಣ, ಮತ್ತು ನಂತರ ನಾವು ಹೆಚ್ಚು ಹೆಚ್ಚು ವಿವರಗಳಿಗೆ ಧುಮುಕುತ್ತೇವೆ, ಏಕೆಂದರೆ ನಾವು ಕೆಲವು ಕ್ಷುಲ್ಲಕವಲ್ಲದ ಪರಿಹಾರಗಳನ್ನು ಬಳಸುತ್ತಿದ್ದೇವೆ.
ಆದ್ದರಿಂದ, ಅದರ ಸರಳ ರೂಪದಲ್ಲಿ, ಅಂತಹ ಡ್ಯಾಗ್ ಈ ರೀತಿ ಕಾಣುತ್ತದೆ:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)ಅದನ್ನು ಲೆಕ್ಕಾಚಾರ ಮಾಡೋಣ:
- ಮೊದಲಿಗೆ, ಅಗತ್ಯವಿರುವ ಲಿಬ್ಗಳನ್ನು ಆಮದು ಮಾಡಿಕೊಳ್ಳಿ ಮತ್ತು ಬೇರೆ ಏನೋ;
sql_server_ds- ಇದುList[namedtuple[str, str]]ಏರ್ಫ್ಲೋ ಸಂಪರ್ಕಗಳಿಂದ ಸಂಪರ್ಕಗಳ ಹೆಸರುಗಳು ಮತ್ತು ನಾವು ನಮ್ಮ ಪ್ಲೇಟ್ ಅನ್ನು ತೆಗೆದುಕೊಳ್ಳುವ ಡೇಟಾಬೇಸ್ಗಳೊಂದಿಗೆ;dag- ನಮ್ಮ ಡ್ಯಾಗ್ನಿಂದ ಪ್ರಕಟಣೆ, ಅದು ಇರಬೇಕುglobals(), ಇಲ್ಲದಿದ್ದರೆ ಗಾಳಿಯ ಹರಿವು ಅದನ್ನು ಕಂಡುಹಿಡಿಯುವುದಿಲ್ಲ. ಡೌಗ್ ಸಹ ಹೇಳಬೇಕಾಗಿದೆ:- ಅವನ ಹೆಸರೇನು
orders- ಈ ಹೆಸರು ನಂತರ ವೆಬ್ ಇಂಟರ್ಫೇಸ್ನಲ್ಲಿ ಕಾಣಿಸಿಕೊಳ್ಳುತ್ತದೆ, - ಜುಲೈ XNUMX ರ ಮಧ್ಯರಾತ್ರಿಯಿಂದ ಇದು ಕೆಲಸ ಮಾಡುತ್ತದೆ,
- ಮತ್ತು ಇದು ಸರಿಸುಮಾರು ಪ್ರತಿ 6 ಗಂಟೆಗಳಿಗೊಮ್ಮೆ ಪ್ರಾರಂಭಿಸಬೇಕು (ತಂಪಾದ ಹುಡುಗರಿಗಾಗಿ, ಬದಲಿಗೆ ಇಲ್ಲಿ
timedelta()ಸ್ವೀಕಾರಾರ್ಹcron- ಸಾಲು0 0 0/6 ? * * *, ಕಡಿಮೆ ತಂಪು - ಒಂದು ರೀತಿಯ ಅಭಿವ್ಯಕ್ತಿ@daily);
- ಅವನ ಹೆಸರೇನು
workflow()ಮುಖ್ಯ ಕೆಲಸವನ್ನು ಮಾಡುತ್ತದೆ, ಆದರೆ ಈಗ ಅಲ್ಲ. ಈಗ ನಾವು ನಮ್ಮ ಸಂದರ್ಭವನ್ನು ಲಾಗ್ಗೆ ಸರಳವಾಗಿ ಡಂಪ್ ಮಾಡುತ್ತೇವೆ.- ಮತ್ತು ಈಗ ಕಾರ್ಯಗಳನ್ನು ರಚಿಸುವ ಸರಳ ಮ್ಯಾಜಿಕ್:
- ನಮ್ಮ ಮೂಲಗಳ ಮೂಲಕ ಹೋಗೋಣ;
- ಆರಂಭಿಸಲು
PythonOperator, ಇದು ನಮ್ಮ ಡಮ್ಮಿಯನ್ನು ನಿರ್ವಹಿಸುತ್ತದೆworkflow(). ಕಾರ್ಯದ ವಿಶಿಷ್ಟವಾದ (ಡ್ಯಾಗ್ನೊಳಗೆ) ಹೆಸರನ್ನು ಸೂಚಿಸಲು ಮರೆಯಬೇಡಿ ಮತ್ತು ಡ್ಯಾಗ್ ಅನ್ನು ಲಗತ್ತಿಸಿ. ಧ್ವಜprovide_contextಪ್ರತಿಯಾಗಿ, ಕಾರ್ಯಕ್ಕೆ ಹೆಚ್ಚುವರಿ ವಾದಗಳನ್ನು ಸುರಿಯುತ್ತಾರೆ, ಅದನ್ನು ನಾವು ಎಚ್ಚರಿಕೆಯಿಂದ ಬಳಸಿ ಸಂಗ್ರಹಿಸುತ್ತೇವೆ**context.
ಈಗ ಅಷ್ಟೆ. ನಮಗೆ ಏನು ಸಿಕ್ಕಿತು:
- ವೆಬ್ ಇಂಟರ್ಫೇಸ್ನಲ್ಲಿ ಹೊಸ ಡ್ಯಾಗ್,
- ಒಂದೂವರೆ ನೂರು ಕಾರ್ಯಗಳನ್ನು ಸಮಾನಾಂತರವಾಗಿ ಕಾರ್ಯಗತಗೊಳಿಸಲಾಗುತ್ತದೆ (ಗಾಳಿಯ ಹರಿವು, ಸೆಲರಿ ಮತ್ತು ಸರ್ವರ್ ಪವರ್ ಸೆಟ್ಟಿಂಗ್ಗಳು ಅದನ್ನು ಅನುಮತಿಸಿದರೆ).
ಸರಿ, ನಾವು ಅದನ್ನು ಬಹುತೇಕ ಪಡೆದುಕೊಂಡಿದ್ದೇವೆ.

ಅವಲಂಬನೆಗಳನ್ನು ಯಾರು ಸ್ಥಾಪಿಸುತ್ತಾರೆ?
ಈ ಸಂಪೂರ್ಣ ವಿಷಯವನ್ನು ಸರಳೀಕರಿಸಲು, ನಾನು ಅದನ್ನು ಹಾಕಿದ್ದೇನೆ docker-compose.yml ಸಂಸ್ಕರಣೆ requirements.txt ಎಲ್ಲಾ ನೋಡ್ಗಳಲ್ಲಿ.
ಈಗ ನಾವು ಹೋಗುತ್ತೇವೆ:

ಬೂದು ಚೌಕಗಳು ಶೆಡ್ಯೂಲರ್ನಿಂದ ಪ್ರಕ್ರಿಯೆಗೊಳಿಸಲಾದ ಕಾರ್ಯ ನಿದರ್ಶನಗಳಾಗಿವೆ.
ನಾವು ಸ್ವಲ್ಪ ಕಾಯುತ್ತೇವೆ, ಕೆಲಸಗಾರರಿಂದ ಕಾರ್ಯಗಳನ್ನು ಸ್ನ್ಯಾಪ್ ಮಾಡಲಾಗಿದೆ:

ಹಸಿರು, ಸಹಜವಾಗಿ, ಯಶಸ್ವಿಯಾಗಿ ಕೆಲಸ ಮಾಡಿದೆ. ರೆಡ್ಸ್ ಹೆಚ್ಚು ಯಶಸ್ವಿಯಾಗುವುದಿಲ್ಲ.
ಮೂಲಕ, ನಮ್ಮ ಉತ್ಪನ್ನದಲ್ಲಿ ಯಾವುದೇ ಫೋಲ್ಡರ್ ಇಲ್ಲ
./dags, ಯಂತ್ರಗಳ ನಡುವೆ ಯಾವುದೇ ಸಿಂಕ್ರೊನೈಸೇಶನ್ ಇಲ್ಲ - ಎಲ್ಲಾ ಡೇಟಾ ಇದೆgitನಮ್ಮ Gitlab ನಲ್ಲಿ, ಮತ್ತು Gitlab CI ವಿಲೀನಗೊಳ್ಳುವಾಗ ಯಂತ್ರಗಳಿಗೆ ನವೀಕರಣಗಳನ್ನು ವಿತರಿಸುತ್ತದೆmaster.
ಹೂವಿನ ಬಗ್ಗೆ ಸ್ವಲ್ಪ
ಕೆಲಸಗಾರರು ನಮ್ಮ ಡಮ್ಮಿ ಷಫಲ್ಗಳನ್ನು ರುಬ್ಬುತ್ತಿರುವಾಗ, ನಮಗೆ ಏನನ್ನಾದರೂ ತೋರಿಸಬಹುದಾದ ಮತ್ತೊಂದು ಸಾಧನವನ್ನು ನೆನಪಿಸಿಕೊಳ್ಳೋಣ - ಹೂವು.
ವರ್ಕರ್ ನೋಡ್ಗಳ ಸಾರಾಂಶ ಮಾಹಿತಿಯೊಂದಿಗೆ ಮೊದಲ ಪುಟ:

ಕೆಲಸಕ್ಕೆ ಕಳುಹಿಸಲಾದ ಕಾರ್ಯಗಳನ್ನು ಹೊಂದಿರುವ ಅತ್ಯಂತ ಸ್ಯಾಚುರೇಟೆಡ್ ಪುಟ:

ನಮ್ಮ ಬ್ರೋಕರ್ನ ಸ್ಥಿತಿಯನ್ನು ಹೊಂದಿರುವ ಅತ್ಯಂತ ನೀರಸ ಪುಟ:

ಕಾರ್ಯಗಳ ಸ್ಥಿತಿ ಮತ್ತು ಅವುಗಳ ಕಾರ್ಯಗತಗೊಳಿಸುವ ಸಮಯದ ಗ್ರಾಫ್ಗಳೊಂದಿಗೆ ಅತ್ಯಂತ ಗಮನಾರ್ಹವಾದ ಪುಟವಾಗಿದೆ:

ನಾವು ಅಂಡರ್ಲೋಡ್ ಮಾಡಿರುವುದನ್ನು ಮರುಲೋಡ್ ಮಾಡುತ್ತೇವೆ
ಆದ್ದರಿಂದ, ಎಲ್ಲಾ ಕಾರ್ಯಗಳು ಪೂರ್ಣಗೊಂಡಿವೆ, ಗಾಯಾಳುಗಳನ್ನು ಕೊಂಡೊಯ್ಯಬಹುದು.

ಮತ್ತು ಕೆಲವು ಗಾಯಗೊಂಡರು - ಒಂದು ಕಾರಣಕ್ಕಾಗಿ ಅಥವಾ ಇನ್ನೊಂದು ಕಾರಣಕ್ಕಾಗಿ. ಗಾಳಿಯ ಹರಿವನ್ನು ಸರಿಯಾಗಿ ಬಳಸಿದರೆ, ಡೇಟಾ ಖಂಡಿತವಾಗಿಯೂ ಬಂದಿಲ್ಲ ಎಂದು ಇದೇ ಚೌಕಗಳು ಸೂಚಿಸುತ್ತವೆ.
ನೀವು ಲಾಗ್ ಅನ್ನು ನೋಡಬೇಕು ಮತ್ತು ಬಿದ್ದ ಕಾರ್ಯದ ನಿದರ್ಶನಗಳನ್ನು ಮರುಪ್ರಾರಂಭಿಸಬೇಕು.
ಯಾವುದೇ ಚೌಕದ ಮೇಲೆ ಕ್ಲಿಕ್ ಮಾಡುವ ಮೂಲಕ, ನಮಗೆ ಲಭ್ಯವಿರುವ ಕ್ರಿಯೆಗಳನ್ನು ನಾವು ನೋಡುತ್ತೇವೆ:

ನೀವು ಅದನ್ನು ತೆಗೆದುಕೊಂಡು ಬಿದ್ದವರನ್ನು ತೆರವುಗೊಳಿಸಬಹುದು. ಅಂದರೆ, ಅಲ್ಲಿ ಏನಾದರೂ ಬಿದ್ದಿದೆ ಎಂದು ನಾವು ಮರೆತುಬಿಡುತ್ತೇವೆ ಮತ್ತು ಅದೇ ಕಾರ್ಯದ ನಿದರ್ಶನವು ಶೆಡ್ಯೂಲರ್ಗೆ ಹೋಗುತ್ತದೆ.

ಎಲ್ಲಾ ಕೆಂಪು ಚೌಕಗಳೊಂದಿಗೆ ಮೌಸ್ನೊಂದಿಗೆ ಇದನ್ನು ಮಾಡುವುದು ತುಂಬಾ ಮಾನವೀಯವಲ್ಲ ಎಂಬುದು ಸ್ಪಷ್ಟವಾಗಿದೆ - ಇದು ಗಾಳಿಯ ಹರಿವಿನಿಂದ ನಾವು ನಿರೀಕ್ಷಿಸುವುದಿಲ್ಲ. ಸ್ವಾಭಾವಿಕವಾಗಿ, ನಮ್ಮಲ್ಲಿ ಸಾಮೂಹಿಕ ವಿನಾಶದ ಆಯುಧಗಳಿವೆ: Browse/Task Instances

ಎಲ್ಲವನ್ನೂ ಒಂದೇ ಬಾರಿಗೆ ಆಯ್ಕೆ ಮಾಡಿ ಮತ್ತು ಶೂನ್ಯಕ್ಕೆ ಮರುಹೊಂದಿಸೋಣ, ಸರಿಯಾದ ಐಟಂ ಅನ್ನು ಕ್ಲಿಕ್ ಮಾಡಿ:

ಸ್ವಚ್ಛಗೊಳಿಸಿದ ನಂತರ, ನಮ್ಮ ಟ್ಯಾಕ್ಸಿಗಳು ಈ ರೀತಿ ಕಾಣುತ್ತವೆ (ಶೆಡ್ಯೂಲರ್ ಅವುಗಳನ್ನು ನಿಗದಿಪಡಿಸಲು ಅವರು ಕಾಯಲು ಸಾಧ್ಯವಿಲ್ಲ):

ಸಂಪರ್ಕಗಳು, ಕೊಕ್ಕೆಗಳು ಮತ್ತು ಇತರ ಅಸ್ಥಿರಗಳು
ಇದು ಮುಂದಿನ DAG ಅನ್ನು ನೋಡುವ ಸಮಯ, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]ಪ್ರತಿಯೊಬ್ಬರೂ ತಮ್ಮ ವರದಿಗಳನ್ನು ಎಂದಾದರೂ ನವೀಕರಿಸಿದ್ದಾರೆ, ಸರಿ? ಇಲ್ಲಿ ಅದು ಮತ್ತೊಮ್ಮೆ: ಡೇಟಾವನ್ನು ಪಡೆಯಲು ಮೂಲಗಳ ಪಟ್ಟಿ ಇದೆ; ಅದನ್ನು ಎಲ್ಲಿ ಹಾಕಬೇಕೆಂಬುದರ ಪಟ್ಟಿ ಇದೆ; ಎಲ್ಲವೂ ಸಂಭವಿಸಿದಾಗ ಅಥವಾ ಮುರಿದುಹೋದಾಗ ಹಾರ್ನ್ ಮಾಡಲು ಮರೆಯಬೇಡಿ (ಅಲ್ಲದೆ, ಇದು ನಮ್ಮ ಬಗ್ಗೆ ಅಲ್ಲ, ಇಲ್ಲ).
ಮತ್ತೊಮ್ಮೆ ಫೈಲ್ ಮೂಲಕ ಹೋಗೋಣ ಮತ್ತು ಹೊಸ ವಿಚಿತ್ರ ವಿಷಯಗಳನ್ನು ನೋಡೋಣ:
from commons.operators import TelegramBotSendMessage— ಅನಿರ್ಬಂಧಿತರಿಗೆ ಸಂದೇಶಗಳನ್ನು ಕಳುಹಿಸಲು ಸಣ್ಣ ಹೊದಿಕೆಯನ್ನು ತಯಾರಿಸುವ ಮೂಲಕ ನಾವು ನಮ್ಮದೇ ಆದ ಆಪರೇಟರ್ಗಳನ್ನು ಮಾಡುವುದರಿಂದ ಯಾವುದೂ ನಮ್ಮನ್ನು ತಡೆಯುವುದಿಲ್ಲ. (ಈ ಆಪರೇಟರ್ ಬಗ್ಗೆ ನಾವು ಕೆಳಗೆ ಹೆಚ್ಚು ಮಾತನಾಡುತ್ತೇವೆ);default_args={}— dag ತನ್ನ ಎಲ್ಲಾ ಆಪರೇಟರ್ಗಳಿಗೆ ಒಂದೇ ರೀತಿಯ ಆರ್ಗ್ಯುಮೆಂಟ್ಗಳನ್ನು ವಿತರಿಸಬಹುದು;to='{{ var.value.all_the_kings_men }}'- ಕ್ಷೇತ್ರtoನಮ್ಮದನ್ನು ಹಾರ್ಡ್ಕೋಡ್ ಮಾಡಲಾಗುವುದಿಲ್ಲ, ಆದರೆ ಜಿಂಜಾ ಮತ್ತು ಇಮೇಲ್ಗಳ ಪಟ್ಟಿಯೊಂದಿಗೆ ವೇರಿಯೇಬಲ್ ಅನ್ನು ಬಳಸಿಕೊಂಡು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ರಚಿಸಲಾಗಿದೆ, ಅದನ್ನು ನಾನು ಎಚ್ಚರಿಕೆಯಿಂದ ಇರಿಸುತ್ತೇನೆAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- ಆಪರೇಟರ್ ಉಡಾವಣಾ ಸ್ಥಿತಿ. ನಮ್ಮ ಸಂದರ್ಭದಲ್ಲಿ, ಎಲ್ಲಾ ಅವಲಂಬನೆಗಳನ್ನು ಪೂರೈಸಿದರೆ ಮಾತ್ರ ಪತ್ರವನ್ನು ಮೇಲಧಿಕಾರಿಗಳಿಗೆ ಕಳುಹಿಸಲಾಗುತ್ತದೆ ಯಶಸ್ವಿಯಾಗಿ;tg_bot_conn_id='tg_main'- ವಾದಗಳುconn_idನಾವು ರಚಿಸುವ ಸಂಪರ್ಕಗಳ ಗುರುತಿಸುವಿಕೆಗಳನ್ನು ಸ್ವೀಕರಿಸಿAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- ಬಿದ್ದ ಕಾರ್ಯಗಳಿದ್ದರೆ ಮಾತ್ರ ಟೆಲಿಗ್ರಾಮ್ನಲ್ಲಿನ ಸಂದೇಶಗಳು ಹಾರಿಹೋಗುತ್ತವೆ;task_concurrency=1— ನಾವು ಒಂದು ಕಾರ್ಯದ ಹಲವಾರು ಕಾರ್ಯ ನಿದರ್ಶನಗಳ ಏಕಕಾಲಿಕ ಉಡಾವಣೆಯನ್ನು ನಿಷೇಧಿಸುತ್ತೇವೆ. ಇಲ್ಲದಿದ್ದರೆ, ನಾವು ಹಲವಾರು ಏಕಕಾಲಿಕ ಉಡಾವಣೆಗಳನ್ನು ಪಡೆಯುತ್ತೇವೆVerticaOperator(ಒಂದು ಟೇಬಲ್ ನೋಡುವುದು);report_update >> [email, tg]- ಎಲ್ಲಾVerticaOperatorಈ ರೀತಿಯ ಪತ್ರಗಳು ಮತ್ತು ಸಂದೇಶಗಳನ್ನು ಕಳುಹಿಸಲು ಒಪ್ಪಿಕೊಳ್ಳುತ್ತಾರೆ:

ಆದರೆ ನೋಟಿಫೈಯರ್ ಆಪರೇಟರ್ಗಳು ವಿಭಿನ್ನ ಉಡಾವಣಾ ಪರಿಸ್ಥಿತಿಗಳನ್ನು ಹೊಂದಿರುವುದರಿಂದ, ಒಂದು ಮಾತ್ರ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ. ಟ್ರೀ ವ್ಯೂನಲ್ಲಿ ಎಲ್ಲವೂ ಸ್ವಲ್ಪ ಕಡಿಮೆ ಸ್ಪಷ್ಟವಾಗಿ ಕಾಣುತ್ತದೆ:

ನಾನು ಬಗ್ಗೆ ಕೆಲವು ಪದಗಳನ್ನು ಹೇಳುತ್ತೇನೆ ಮ್ಯಾಕ್ರೋಗಳು ಮತ್ತು ಅವರ ಸ್ನೇಹಿತರು - ಅಸ್ಥಿರ.
ಮ್ಯಾಕ್ರೋಗಳು ಜಿಂಜಾ ಪ್ಲೇಸ್ಹೋಲ್ಡರ್ಗಳಾಗಿದ್ದು ಅದು ಆಪರೇಟರ್ ಆರ್ಗ್ಯುಮೆಂಟ್ಗಳಲ್ಲಿ ವಿವಿಧ ಉಪಯುಕ್ತ ಮಾಹಿತಿಯನ್ನು ಸೇರಿಸಬಹುದು. ಉದಾಹರಣೆಗೆ, ಈ ರೀತಿ:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} ಸಂದರ್ಭ ವೇರಿಯಬಲ್ನ ವಿಷಯಗಳಿಗೆ ವಿಸ್ತರಿಸುತ್ತದೆ execution_date ಸ್ವರೂಪದಲ್ಲಿ YYYY-MM-DD: 2020-07-14. ಉತ್ತಮ ಭಾಗವೆಂದರೆ ಸಂದರ್ಭದ ವೇರಿಯೇಬಲ್ಗಳನ್ನು ನಿರ್ದಿಷ್ಟ ಕಾರ್ಯ ನಿದರ್ಶನಕ್ಕೆ (ಟ್ರೀ ವ್ಯೂನಲ್ಲಿ ಒಂದು ಚೌಕ) ಹೊಡೆಯಲಾಗುತ್ತದೆ ಮತ್ತು ಮರುಪ್ರಾರಂಭಿಸಿದಾಗ ಪ್ಲೇಸ್ಹೋಲ್ಡರ್ಗಳು ಅದೇ ಮೌಲ್ಯಗಳಿಗೆ ವಿಸ್ತರಿಸುತ್ತವೆ.
ನಿಯೋಜಿತ ಮೌಲ್ಯಗಳನ್ನು ಪ್ರತಿ ಕಾರ್ಯ ನಿದರ್ಶನದಲ್ಲಿ ರೆಂಡರ್ಡ್ ಬಟನ್ ಬಳಸಿ ವೀಕ್ಷಿಸಬಹುದು. ಪತ್ರವನ್ನು ಕಳುಹಿಸುವ ಕಾರ್ಯವು ಈ ರೀತಿ ಕಾಣುತ್ತದೆ:

ಮತ್ತು ಸಂದೇಶವನ್ನು ಕಳುಹಿಸುವ ಕಾರ್ಯಕ್ಕಾಗಿ:

ಇತ್ತೀಚಿನ ಲಭ್ಯವಿರುವ ಆವೃತ್ತಿಗಾಗಿ ಅಂತರ್ನಿರ್ಮಿತ ಮ್ಯಾಕ್ರೋಗಳ ಸಂಪೂರ್ಣ ಪಟ್ಟಿ ಇಲ್ಲಿ ಲಭ್ಯವಿದೆ:
ಇದಲ್ಲದೆ, ಪ್ಲಗಿನ್ಗಳ ಸಹಾಯದಿಂದ, ನಾವು ನಮ್ಮದೇ ಆದ ಮ್ಯಾಕ್ರೋಗಳನ್ನು ಘೋಷಿಸಬಹುದು, ಆದರೆ ಇದು ಸಂಪೂರ್ಣವಾಗಿ ವಿಭಿನ್ನ ಕಥೆಯಾಗಿದೆ.
ಪೂರ್ವನಿರ್ಧರಿತ ವಿಷಯಗಳ ಜೊತೆಗೆ, ನಾವು ನಮ್ಮ ವೇರಿಯೇಬಲ್ಗಳ ಮೌಲ್ಯಗಳನ್ನು ಬದಲಿಸಬಹುದು (ನಾನು ಇದನ್ನು ಈಗಾಗಲೇ ಕೋಡ್ನಲ್ಲಿ ಬಳಸಿದ್ದೇನೆ). ಒಳಗೆ ರಚಿಸೋಣ Admin/Variables ಒಂದೆರಡು ತುಣುಕುಗಳು:

ಅಷ್ಟೆ, ನೀವು ಬಳಸಬಹುದು:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')ಮೌಲ್ಯವು ಸ್ಕೇಲಾರ್ ಆಗಿರಬಹುದು ಅಥವಾ ಅದು JSON ಅನ್ನು ಸಹ ಒಳಗೊಂಡಿರಬಹುದು. JSON ಸಂದರ್ಭದಲ್ಲಿ:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}ಬಯಸಿದ ಕೀಗೆ ಮಾರ್ಗವನ್ನು ಬಳಸಿ: {{ var.json.bot_config.bot.token }}.
ನಾನು ಅಕ್ಷರಶಃ ಒಂದು ಪದವನ್ನು ಹೇಳುತ್ತೇನೆ ಮತ್ತು ಅದರ ಬಗ್ಗೆ ಒಂದು ಸ್ಕ್ರೀನ್ಶಾಟ್ ತೋರಿಸುತ್ತೇನೆ соединения. ಇಲ್ಲಿ ಎಲ್ಲವೂ ಪ್ರಾಥಮಿಕವಾಗಿದೆ: ಪುಟದಲ್ಲಿ Admin/Connections ನಾವು ಸಂಪರ್ಕವನ್ನು ರಚಿಸುತ್ತೇವೆ, ನಮ್ಮ ಲಾಗಿನ್ಗಳು/ಪಾಸ್ವರ್ಡ್ಗಳು ಮತ್ತು ಹೆಚ್ಚಿನ ನಿರ್ದಿಷ್ಟ ನಿಯತಾಂಕಗಳನ್ನು ಅಲ್ಲಿ ಸೇರಿಸುತ್ತೇವೆ. ಹೀಗೆ:

ಪಾಸ್ವರ್ಡ್ಗಳನ್ನು ಎನ್ಕ್ರಿಪ್ಟ್ ಮಾಡಬಹುದು (ಡೀಫಾಲ್ಟ್ ಆಯ್ಕೆಗಿಂತ ಹೆಚ್ಚು ಎಚ್ಚರಿಕೆಯಿಂದ), ಅಥವಾ ನೀವು ಸಂಪರ್ಕದ ಪ್ರಕಾರವನ್ನು ನಿರ್ದಿಷ್ಟಪಡಿಸಲು ಸಾಧ್ಯವಿಲ್ಲ (ನಾನು ಮಾಡಿದಂತೆ tg_main) - ವಾಸ್ತವವೆಂದರೆ ಪ್ರಕಾರಗಳ ಪಟ್ಟಿಯನ್ನು ಏರ್ಫ್ಲೋ ಮಾದರಿಗಳಲ್ಲಿ ಹಾರ್ಡ್ವೈರ್ ಮಾಡಲಾಗಿದೆ ಮತ್ತು ಮೂಲ ಕೋಡ್ಗೆ ಪ್ರವೇಶಿಸದೆ ವಿಸ್ತರಿಸಲಾಗುವುದಿಲ್ಲ (ಇದ್ದಕ್ಕಿದ್ದಂತೆ ನಾನು ಏನನ್ನಾದರೂ ಗೂಗಲ್ ಮಾಡದಿದ್ದರೆ, ದಯವಿಟ್ಟು ನನ್ನನ್ನು ಸರಿಪಡಿಸಿ), ಆದರೆ ಯಾವುದೂ ನಮ್ಮನ್ನು ಸರಳವಾಗಿ ಕ್ರೆಡಿಟ್ಗಳನ್ನು ಪಡೆಯುವುದನ್ನು ತಡೆಯುವುದಿಲ್ಲ ಹೆಸರು.
ನೀವು ಅದೇ ಹೆಸರಿನೊಂದಿಗೆ ಹಲವಾರು ಸಂಪರ್ಕಗಳನ್ನು ಸಹ ಮಾಡಬಹುದು: ಈ ಸಂದರ್ಭದಲ್ಲಿ, ವಿಧಾನ BaseHook.get_connection(), ಇದು ನಮಗೆ ಹೆಸರಿನಿಂದ ಸಂಪರ್ಕಗಳನ್ನು ಪಡೆಯುತ್ತದೆ, ನೀಡುತ್ತದೆ ಯಾದೃಚ್ಛಿಕ ಹಲವಾರು ನೇಮ್ಸೇಕ್ಗಳಿಂದ (ರೌಂಡ್ ರಾಬಿನ್ ಮಾಡಲು ಇದು ಹೆಚ್ಚು ತಾರ್ಕಿಕವಾಗಿದೆ, ಆದರೆ ನಾವು ಅದನ್ನು ಏರ್ಫ್ಲೋ ಡೆವಲಪರ್ಗಳ ಆತ್ಮಸಾಕ್ಷಿಗೆ ಬಿಡುತ್ತೇವೆ).
ವೇರಿಯೇಬಲ್ಗಳು ಮತ್ತು ಸಂಪರ್ಕಗಳು ನಿಸ್ಸಂಶಯವಾಗಿ ತಂಪಾದ ಸಾಧನಗಳಾಗಿವೆ, ಆದರೆ ನಿಮ್ಮ ಹರಿವಿನ ಯಾವ ಭಾಗಗಳನ್ನು ನೀವು ಕೋಡ್ನಲ್ಲಿ ಸಂಗ್ರಹಿಸುತ್ತೀರಿ ಮತ್ತು ಯಾವ ಭಾಗಗಳನ್ನು ನೀವು ಶೇಖರಣೆಗಾಗಿ ಏರ್ಫ್ಲೋಗೆ ನೀಡುತ್ತೀರಿ ಎಂಬುದರ ಸಮತೋಲನವನ್ನು ಕಳೆದುಕೊಳ್ಳದಿರುವುದು ಮುಖ್ಯವಾಗಿದೆ. ಒಂದೆಡೆ, ಮೌಲ್ಯವನ್ನು ತ್ವರಿತವಾಗಿ ಬದಲಾಯಿಸುವುದು, ಉದಾಹರಣೆಗೆ, ಮೇಲ್ಬಾಕ್ಸ್, UI ಮೂಲಕ ಅನುಕೂಲಕರವಾಗಿರುತ್ತದೆ. ಮತ್ತೊಂದೆಡೆ, ಇದು ಇನ್ನೂ ಮೌಸ್ ಕ್ಲಿಕ್ಗೆ ಹಿಂತಿರುಗುತ್ತದೆ, ಅದನ್ನು ನಾವು (ನಾನು) ತೊಡೆದುಹಾಕಲು ಬಯಸಿದ್ದೇವೆ.
ಸಂಪರ್ಕಗಳೊಂದಿಗೆ ಕೆಲಸ ಮಾಡುವುದು ಕಾರ್ಯಗಳಲ್ಲಿ ಒಂದಾಗಿದೆ ಕೊಕ್ಕೆಗಳು. ಸಾಮಾನ್ಯವಾಗಿ, ಏರ್ಫ್ಲೋ ಕೊಕ್ಕೆಗಳು ಅದನ್ನು ಮೂರನೇ ವ್ಯಕ್ತಿಯ ಸೇವೆಗಳು ಮತ್ತು ಗ್ರಂಥಾಲಯಗಳಿಗೆ ಸಂಪರ್ಕಿಸುವ ಬಿಂದುಗಳಾಗಿವೆ. ಉದಾ, JiraHook ಜಿರಾ ಅವರೊಂದಿಗೆ ಸಂವಹನ ನಡೆಸಲು ನಮಗೆ ಕ್ಲೈಂಟ್ ಅನ್ನು ತೆರೆಯುತ್ತದೆ (ನೀವು ಕಾರ್ಯಗಳನ್ನು ಹಿಂದಕ್ಕೆ ಮತ್ತು ಮುಂದಕ್ಕೆ ಸರಿಸಬಹುದು), ಮತ್ತು ಸಹಾಯದಿಂದ SambaHook ನೀವು ಸ್ಥಳೀಯ ಫೈಲ್ ಅನ್ನು ತಳ್ಳಬಹುದು smb- ಪಾಯಿಂಟ್.
ಕಸ್ಟಮ್ ಆಪರೇಟರ್ ಅನ್ನು ವಿಶ್ಲೇಷಿಸೋಣ
ಮತ್ತು ಅದನ್ನು ಹೇಗೆ ತಯಾರಿಸಲಾಗಿದೆ ಎಂದು ನೋಡಲು ನಾವು ಹತ್ತಿರ ಬಂದಿದ್ದೇವೆ TelegramBotSendMessage
ಕೋಡ್ commons/operators.py ಆಪರೇಟರ್ನೊಂದಿಗೆ ಸ್ವತಃ:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)ಇಲ್ಲಿ, ಗಾಳಿಯ ಹರಿವಿನ ಎಲ್ಲದರಂತೆ, ಎಲ್ಲವೂ ತುಂಬಾ ಸರಳವಾಗಿದೆ:
- ನಿಂದ ಆನುವಂಶಿಕವಾಗಿ ಪಡೆದಿದೆ
BaseOperator, ಇದು ಸಾಕಷ್ಟು ಗಾಳಿಯ ಹರಿವು-ನಿರ್ದಿಷ್ಟ ವಿಷಯವನ್ನು ಕಾರ್ಯಗತಗೊಳಿಸುತ್ತದೆ (ನಿಮ್ಮ ಬಿಡುವಿನ ವೇಳೆಯಲ್ಲಿ ಇದನ್ನು ಪರಿಶೀಲಿಸಿ) - ಕ್ಷೇತ್ರಗಳನ್ನು ಘೋಷಿಸಲಾಗಿದೆ
template_fields, ಇದರಲ್ಲಿ ಜಿಂಜಾ ಮ್ಯಾಕ್ರೋಗಳನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸಲು ಹುಡುಕುತ್ತದೆ. - ಸರಿಯಾದ ವಾದಗಳನ್ನು ಆಯೋಜಿಸಲಾಗಿದೆ
__init__(), ಅಗತ್ಯವಿರುವಲ್ಲಿ ಡಿಫಾಲ್ಟ್ಗಳನ್ನು ಇರಿಸಲಾಗಿದೆ. - ಪೂರ್ವಜರನ್ನು ಪ್ರಾರಂಭಿಸುವ ಬಗ್ಗೆ ಅವರು ಮರೆಯಲಿಲ್ಲ.
- ಅನುಗುಣವಾದ ಕೊಕ್ಕೆ ತೆರೆಯಿತು
TelegramBotHook, ಅದರಿಂದ ಕ್ಲೈಂಟ್ ವಸ್ತುವನ್ನು ಸ್ವೀಕರಿಸಲಾಗಿದೆ. - ಅತಿಕ್ರಮಿಸಿದ (ಮರುವ್ಯಾಖ್ಯಾನಿತ) ವಿಧಾನ
BaseOperator.execute(), ಆಪರೇಟರ್ ಅನ್ನು ಪ್ರಾರಂಭಿಸಲು ಸಮಯ ಬಂದಾಗ ಯಾವ ಏರ್ಫೊ ಸೆಳೆಯುತ್ತದೆ - ಅದರಲ್ಲಿ ನಾವು ಲಾಗ್ ಇನ್ ಮಾಡಲು ಮರೆಯದೆ ಮುಖ್ಯ ಕ್ರಿಯೆಯನ್ನು ಕಾರ್ಯಗತಗೊಳಿಸುತ್ತೇವೆ. (ಮೂಲಕ, ನಾವು ನೇರವಾಗಿ ಲಾಗ್ ಇನ್ ಮಾಡುತ್ತೇವೆstdoutиstderr- ಗಾಳಿಯ ಹರಿವು ಎಲ್ಲವನ್ನೂ ಪ್ರತಿಬಂಧಿಸುತ್ತದೆ, ಅದನ್ನು ಸುಂದರವಾಗಿ ಸುತ್ತುತ್ತದೆ ಮತ್ತು ಅಗತ್ಯವಿರುವಲ್ಲಿ ಇರಿಸಿ.)
ನಮ್ಮಲ್ಲಿ ಏನಿದೆ ಎಂದು ನೋಡೋಣ commons/hooks.py. ಫೈಲ್ನ ಮೊದಲ ಭಾಗ, ಕೊಕ್ಕೆಯೊಂದಿಗೆ:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientಇಲ್ಲಿ ಏನು ವಿವರಿಸಬಹುದೆಂದು ನನಗೆ ತಿಳಿದಿಲ್ಲ, ನಾನು ಪ್ರಮುಖ ಅಂಶಗಳನ್ನು ಗಮನಿಸುತ್ತೇನೆ:
- ನಾವು ಆನುವಂಶಿಕವಾಗಿ ಪಡೆಯುತ್ತೇವೆ, ವಾದಗಳ ಬಗ್ಗೆ ಯೋಚಿಸಿ - ಹೆಚ್ಚಿನ ಸಂದರ್ಭಗಳಲ್ಲಿ ಒಂದು ಇರುತ್ತದೆ:
conn_id; - ಪ್ರಮಾಣಿತ ವಿಧಾನಗಳನ್ನು ಅತಿಕ್ರಮಿಸುವುದು: ನಾನು ನನ್ನನ್ನು ಸೀಮಿತಗೊಳಿಸಿದೆ
get_conn(), ಇದರಲ್ಲಿ ನಾನು ಹೆಸರಿನ ಮೂಲಕ ಸಂಪರ್ಕ ನಿಯತಾಂಕಗಳನ್ನು ಪಡೆಯುತ್ತೇನೆ ಮತ್ತು ವಿಭಾಗವನ್ನು ಪಡೆಯುತ್ತೇನೆextra(ಇದು JSON ಗಾಗಿ ಕ್ಷೇತ್ರವಾಗಿದೆ), ಇದರಲ್ಲಿ ನಾನು (ನನ್ನ ಸ್ವಂತ ಸೂಚನೆಗಳ ಪ್ರಕಾರ!) ಟೆಲಿಗ್ರಾಮ್ ಬಾಟ್ ಟೋಕನ್ ಅನ್ನು ಹಾಕಿದ್ದೇನೆ:{"bot_token": "YOuRAwEsomeBOtToKen"}. - ನಾನು ನಮ್ಮ ಉದಾಹರಣೆಯನ್ನು ರಚಿಸುತ್ತೇನೆ
TelegramBot, ಅವನಿಗೆ ನಿರ್ದಿಷ್ಟ ಟೋಕನ್ ನೀಡುವುದು.
ಅಷ್ಟೇ. ನೀವು ಬಳಸಿಕೊಂಡು ಕೊಕ್ಕೆಯಿಂದ ಕ್ಲೈಂಟ್ ಅನ್ನು ಪಡೆಯಬಹುದು TelegramBotHook().clent ಅಥವಾ TelegramBotHook().get_conn().
ಮತ್ತು ಫೈಲ್ನ ಎರಡನೇ ಭಾಗ, ಇದರಲ್ಲಿ ನಾನು ಟೆಲಿಗ್ರಾಮ್ REST API ಗಾಗಿ ಮೈಕ್ರೊ-ರ್ಯಾಪರ್ ಅನ್ನು ಮಾಡುತ್ತೇನೆ, ಆದ್ದರಿಂದ ಅದನ್ನು ಎಳೆಯುವುದಿಲ್ಲ ಒಂದು ವಿಧಾನದ ಸಲುವಾಗಿ sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))ಎಲ್ಲವನ್ನೂ ಸೇರಿಸುವುದು ಸರಿಯಾದ ಮಾರ್ಗವಾಗಿದೆ:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- ಪ್ಲಗ್ಇನ್ಗೆ, ಅದನ್ನು ಸಾರ್ವಜನಿಕ ರೆಪೊಸಿಟರಿಯಲ್ಲಿ ಇರಿಸಿ ಮತ್ತು ಅದನ್ನು ಓಪನ್ ಸೋರ್ಸ್ಗೆ ನೀಡಿ.
ನಾವು ಇದನ್ನೆಲ್ಲ ಅಧ್ಯಯನ ಮಾಡುತ್ತಿರುವಾಗ, ನಮ್ಮ ವರದಿಯ ನವೀಕರಣಗಳು ಯಶಸ್ವಿಯಾಗಿ ವಿಫಲವಾದವು ಮತ್ತು ನನ್ನ ಚಾನಲ್ಗೆ ದೋಷ ಸಂದೇಶವನ್ನು ಕಳುಹಿಸಲು ಸಾಧ್ಯವಾಯಿತು. ನಾನು ಮತ್ತೆ ಹೋಗಿ ಏನು ತಪ್ಪಾಗಿದೆ ಎಂದು ಪರಿಶೀಲಿಸುತ್ತೇನೆ ...

ನಮ್ಮ ಬಾಯಲ್ಲಿ ಏನೋ ಒಡೆದಿದೆ! ನಾವು ಕಾಯುತ್ತಿದ್ದದ್ದು ಇದೇ ಅಲ್ಲವೇ? ನಿಖರವಾಗಿ!
ನೀವು ಅದನ್ನು ಸುರಿಯಲು ಹೋಗುತ್ತೀರಾ?
ನಾನು ಏನನ್ನಾದರೂ ಕಳೆದುಕೊಂಡಿದ್ದೇನೆ ಎಂದು ನಿಮಗೆ ಅನಿಸುತ್ತದೆಯೇ? SQL ಸರ್ವರ್ನಿಂದ ವರ್ಟಿಕಾಗೆ ಡೇಟಾವನ್ನು ವರ್ಗಾಯಿಸಲು ಅವನು ಭರವಸೆ ನೀಡಿದ್ದನೆಂದು ತೋರುತ್ತದೆ, ಮತ್ತು ನಂತರ ಅವನು ಅದನ್ನು ತೆಗೆದುಕೊಂಡು ವಿಷಯವನ್ನು ಬಿಟ್ಟುಬಿಟ್ಟನು, ನೀಚ!
ಈ ಅಪರಾಧವು ಉದ್ದೇಶಪೂರ್ವಕವಾಗಿದೆ, ನಾನು ನಿಮಗಾಗಿ ಕೆಲವು ಪರಿಭಾಷೆಯನ್ನು ಅರ್ಥಮಾಡಿಕೊಳ್ಳಬೇಕಾಗಿತ್ತು. ಈಗ ನೀವು ಮುಂದುವರಿಯಬಹುದು.
ನಮ್ಮ ಯೋಜನೆ ಹೀಗಿತ್ತು:
- ಒಂದು ಡ್ಯಾಗ್ ಮಾಡಿ
- ಕಾರ್ಯಗಳನ್ನು ರಚಿಸಿ
- ಎಲ್ಲವೂ ಎಷ್ಟು ಸುಂದರವಾಗಿದೆ ಎಂದು ನೋಡಿ
- ಭರ್ತಿ ಮಾಡಲು ಸೆಷನ್ ಸಂಖ್ಯೆಗಳನ್ನು ನಿಯೋಜಿಸಿ
- SQL ಸರ್ವರ್ನಿಂದ ಡೇಟಾವನ್ನು ಪಡೆಯಿರಿ
- ಡೇಟಾವನ್ನು ವರ್ಟಿಕಾದಲ್ಲಿ ಇರಿಸಿ
- ಅಂಕಿಅಂಶಗಳನ್ನು ಸಂಗ್ರಹಿಸಿ
ಆದ್ದರಿಂದ, ಇದೆಲ್ಲವನ್ನೂ ಪಡೆಯಲು, ನಾನು ನಮ್ಮದಕ್ಕೆ ಸ್ವಲ್ಪ ಸೇರ್ಪಡೆ ಮಾಡಿದ್ದೇನೆ docker-compose.yml:
ಡಾಕರ್-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyಅಲ್ಲಿ ನಾವು ಬೆಳೆಸುತ್ತೇವೆ:
- ಹೋಸ್ಟ್ ಆಗಿ ವರ್ಟಿಕಾ
dwhಅತ್ಯಂತ ಡೀಫಾಲ್ಟ್ ಸೆಟ್ಟಿಂಗ್ಗಳೊಂದಿಗೆ, - SQL ಸರ್ವರ್ನ ಮೂರು ನಿದರ್ಶನಗಳು,
- ನಾವು ನಂತರದ ಡೇಟಾಬೇಸ್ಗಳನ್ನು ಕೆಲವು ಡೇಟಾದೊಂದಿಗೆ ತುಂಬುತ್ತೇವೆ (ಯಾವುದೇ ಸಂದರ್ಭಗಳಲ್ಲಿ ನೋಡುವುದಿಲ್ಲ
mssql_init.py!)
ಕಳೆದ ಬಾರಿಗಿಂತ ಸ್ವಲ್ಪ ಹೆಚ್ಚು ಸಂಕೀರ್ಣವಾದ ಆಜ್ಞೆಯನ್ನು ಬಳಸಿಕೊಂಡು ನಾವು ಎಲ್ಲವನ್ನೂ ಪ್ರಾರಂಭಿಸುತ್ತೇವೆ:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3ನಮ್ಮ ಪವಾಡ ಯಾದೃಚ್ಛಿಕವು ಏನನ್ನು ಉತ್ಪಾದಿಸಿದೆ ಎಂಬುದನ್ನು ಐಟಂ ಬಳಸಿ ಮಾಡಬಹುದು Data Profiling/Ad Hoc Query:

ಮುಖ್ಯ ವಿಷಯವೆಂದರೆ ಅದನ್ನು ವಿಶ್ಲೇಷಕರಿಗೆ ತೋರಿಸಬಾರದು
ವಿವರವಾಗಿ ವಾಸಿಸಿ ETL ಅವಧಿಗಳು ನಾನು ಮಾಡುವುದಿಲ್ಲ, ಎಲ್ಲವೂ ಕ್ಷುಲ್ಲಕವಾಗಿದೆ: ನಾವು ಡೇಟಾಬೇಸ್, ಅದರಲ್ಲಿ ಟೇಬಲ್ ಅನ್ನು ರಚಿಸುತ್ತೇವೆ, ಎಲ್ಲವನ್ನೂ ಸಂದರ್ಭ ನಿರ್ವಾಹಕರೊಂದಿಗೆ ಸುತ್ತಿಕೊಳ್ಳುತ್ತೇವೆ ಮತ್ತು ಈಗ ನಾವು ಇದನ್ನು ಮಾಡುತ್ತೇವೆ:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passಸಮಯ ಬಂದಿದೆ ನಮ್ಮ ಡೇಟಾವನ್ನು ತೆಗೆದುಕೊಳ್ಳಿ ನಮ್ಮ ಒಂದೂವರೆ ನೂರು ಕೋಷ್ಟಕಗಳಿಂದ. ಸರಳವಾದ ಸಾಲುಗಳೊಂದಿಗೆ ಇದನ್ನು ಮಾಡೋಣ:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- ಕೊಕ್ಕೆ ಬಳಸಿ ನಾವು ಗಾಳಿಯ ಹರಿವಿನಿಂದ ಪಡೆಯುತ್ತೇವೆ
pymssql-ಸಂಪರ್ಕಿಸಿ - ವಿನಂತಿಯಲ್ಲಿ ದಿನಾಂಕದ ರೂಪದಲ್ಲಿ ನಿರ್ಬಂಧವನ್ನು ಸೇರಿಸೋಣ - ಟೆಂಪ್ಲೇಟ್ ಎಂಜಿನ್ ಅದನ್ನು ಕಾರ್ಯಕ್ಕೆ ಎಸೆಯುತ್ತದೆ.
- ನಮ್ಮ ವಿನಂತಿಯನ್ನು ಪೋಷಿಸುವುದು
pandasಯಾರು ನಮ್ಮನ್ನು ಪಡೆಯುತ್ತಾರೆDataFrame- ಇದು ಭವಿಷ್ಯದಲ್ಲಿ ನಮಗೆ ಉಪಯುಕ್ತವಾಗಿರುತ್ತದೆ.
ನಾನು ಪರ್ಯಾಯವನ್ನು ಬಳಸುತ್ತಿದ್ದೇನೆ
{dt}ವಿನಂತಿಯ ನಿಯತಾಂಕದ ಬದಲಿಗೆ%sನಾನು ದುಷ್ಟ ಪಿನೋಚ್ಚಿಯೋ ಆಗಿರುವುದರಿಂದ ಅಲ್ಲ, ಆದರೆ ಏಕೆಂದರೆpandasನಿಭಾಯಿಸಲು ಸಾಧ್ಯವಿಲ್ಲpymssqlಮತ್ತು ಅದನ್ನು ಕೊನೆಯದಕ್ಕೆ ಸ್ಲಿಪ್ ಮಾಡುತ್ತದೆparams: List, ಅವನು ನಿಜವಾಗಿಯೂ ಬಯಸುತ್ತಿದ್ದರೂtuple.
ಡೆವಲಪರ್ ಎಂಬುದನ್ನು ಸಹ ಗಮನಿಸಿpymssqlಇನ್ನು ಮುಂದೆ ಅವನನ್ನು ಬೆಂಬಲಿಸದಿರಲು ನಿರ್ಧರಿಸಿದೆ, ಮತ್ತು ಇದು ಹೊರಹೋಗುವ ಸಮಯpyodbc.
ನಮ್ಮ ಕಾರ್ಯಗಳ ಆರ್ಗ್ಯುಮೆಂಟ್ಗಳಲ್ಲಿ ಏರ್ಫ್ಲೋ ಏನು ತುಂಬಿದೆ ಎಂಬುದನ್ನು ನೋಡೋಣ:

ಯಾವುದೇ ಡೇಟಾ ಇಲ್ಲದಿದ್ದರೆ, ಮುಂದುವರಿಯುವುದರಲ್ಲಿ ಯಾವುದೇ ಅರ್ಥವಿಲ್ಲ. ಆದರೆ ಭರ್ತಿ ಯಶಸ್ವಿಯಾಗಿದೆ ಎಂದು ಪರಿಗಣಿಸುವುದು ಸಹ ವಿಚಿತ್ರವಾಗಿದೆ. ಆದರೆ ಇದು ತಪ್ಪಲ್ಲ. ಆಹ್-ಆಹ್, ಏನು ಮಾಡಬೇಕು?! ಇಲ್ಲಿದೆ ನೋಡಿ:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException ವಾಸ್ತವವಾಗಿ ಯಾವುದೇ ದೋಷವಿಲ್ಲ ಎಂದು ಗಾಳಿಯ ಹರಿವು ನಿಮಗೆ ತಿಳಿಸುತ್ತದೆ, ಆದರೆ ನಾವು ಕೆಲಸವನ್ನು ಬಿಟ್ಟುಬಿಡುತ್ತೇವೆ. ಇಂಟರ್ಫೇಸ್ ಹಸಿರು ಅಥವಾ ಕೆಂಪು ಚೌಕವನ್ನು ಹೊಂದಿರುವುದಿಲ್ಲ, ಆದರೆ ಗುಲಾಬಿ ಬಣ್ಣವನ್ನು ಹೊಂದಿರುತ್ತದೆ.
ನಮ್ಮ ಡೇಟಾವನ್ನು ಫೀಡ್ ಮಾಡೋಣ ಹಲವಾರು ಕಾಲಮ್ಗಳು:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])ಅವುಗಳೆಂದರೆ:
- ನಾವು ಆದೇಶಗಳನ್ನು ತೆಗೆದುಕೊಂಡ ಡೇಟಾಬೇಸ್,
- ನಮ್ಮ ಅಪ್ಲೋಡ್ ಸೆಶನ್ನ ಐಡಿ (ಇದು ವಿಭಿನ್ನವಾಗಿರುತ್ತದೆ ಪ್ರತಿ ಕಾರ್ಯಕ್ಕೂ),
- ಮೂಲ ಮತ್ತು ಆರ್ಡರ್ ಐಡೆಂಟಿಫೈಯರ್ನಿಂದ ಹ್ಯಾಶ್ ಮಾಡಿ - ಆದ್ದರಿಂದ ಅಂತಿಮ ಡೇಟಾಬೇಸ್ನಲ್ಲಿ (ಎಲ್ಲವನ್ನೂ ಒಂದೇ ಟೇಬಲ್ಗೆ ಸುರಿಯಲಾಗುತ್ತದೆ) ನಾವು ಅನನ್ಯ ಆರ್ಡರ್ ಐಡೆಂಟಿಫೈಯರ್ ಅನ್ನು ಹೊಂದಿದ್ದೇವೆ.
ಅಂತಿಮ ಹಂತವು ಉಳಿದಿದೆ: ಎಲ್ಲವನ್ನೂ ವರ್ಟಿಕಾಗೆ ಸುರಿಯಿರಿ. ಮತ್ತು, ವಿಚಿತ್ರವಾಗಿ ಸಾಕಷ್ಟು, ಇದನ್ನು ಮಾಡಲು ಅತ್ಯಂತ ಅದ್ಭುತವಾದ ಮತ್ತು ಪರಿಣಾಮಕಾರಿ ಮಾರ್ಗವೆಂದರೆ CSV ಮೂಲಕ!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- ನಾವು ವಿಶೇಷ ಬಂಧನ ಕೇಂದ್ರವನ್ನು ಮಾಡುತ್ತಿದ್ದೇವೆ
StringIO. pandasದಯೆಯಿಂದ ನಮ್ಮದನ್ನು ಅದರಲ್ಲಿ ಹಾಕುತ್ತಾರೆDataFrameಎಂದುCSV- ಸಾಲುಗಳು.- ಕೊಕ್ಕೆ ಬಳಸಿ ನಮ್ಮ ಪ್ರೀತಿಯ ವರ್ಟಿಕಾಗೆ ಸಂಪರ್ಕವನ್ನು ತೆರೆಯೋಣ.
- ಮತ್ತು ಈಗ ಸಹಾಯದಿಂದ
copy()ನಮ್ಮ ಡೇಟಾವನ್ನು ನೇರವಾಗಿ ವರ್ಟಿಕಾಗೆ ಕಳುಹಿಸೋಣ!
ಎಷ್ಟು ಸಾಲುಗಳನ್ನು ತುಂಬಲಾಗಿದೆ ಎಂಬುದನ್ನು ನಾವು ಚಾಲಕರಿಂದ ತೆಗೆದುಕೊಳ್ಳುತ್ತೇವೆ ಮತ್ತು ಎಲ್ಲವೂ ಸರಿಯಾಗಿದೆ ಎಂದು ಸೆಷನ್ ಮ್ಯಾನೇಜರ್ಗೆ ಹೇಳುತ್ತೇವೆ:
session.loaded_rows = cursor.rowcount
session.successful = Trueಅಷ್ಟೇ.
ಉತ್ಪಾದನೆಯಲ್ಲಿ, ನಾವು ಗುರಿ ಫಲಕವನ್ನು ಹಸ್ತಚಾಲಿತವಾಗಿ ರಚಿಸುತ್ತೇವೆ. ಇಲ್ಲಿ ನಾನು ಸಣ್ಣ ಯಂತ್ರವನ್ನು ಅನುಮತಿಸಿದೆ:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)ನಾನು ಬಳಸುತ್ತಿದ್ದೇನೆ
VerticaOperator()ನಾನು ಡೇಟಾಬೇಸ್ ಸ್ಕೀಮಾ ಮತ್ತು ಟೇಬಲ್ ಅನ್ನು ರಚಿಸುತ್ತೇನೆ (ಅವರು ಈಗಾಗಲೇ ಅಸ್ತಿತ್ವದಲ್ಲಿಲ್ಲದಿದ್ದರೆ, ಸಹಜವಾಗಿ). ಅವಲಂಬನೆಗಳನ್ನು ಸರಿಯಾಗಿ ಜೋಡಿಸುವುದು ಮುಖ್ಯ ವಿಷಯ:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadಸಂಕ್ಷಿಪ್ತವಾಗಿ
"ಸರಿ," ಮೌಸ್ ಹೇಳಿದರು, "ಇದು ಈಗ ನಿಜವಲ್ಲವೇ?"
ನಾನು ಕಾಡಿನಲ್ಲಿ ಅತ್ಯಂತ ಭಯಾನಕ ಪ್ರಾಣಿ ಎಂದು ನಿಮಗೆ ಮನವರಿಕೆಯಾಗಿದೆಯೇ?
ಜೂಲಿಯಾ ಡೊನಾಲ್ಡ್ಸನ್, "ದಿ ಗ್ರುಫಲೋ"
ನನ್ನ ಸಹೋದ್ಯೋಗಿಗಳು ಮತ್ತು ನಾನು ಸ್ಪರ್ಧೆಯನ್ನು ಹೊಂದಿದ್ದರೆ: ಮೊದಲಿನಿಂದಲೂ ETL ಪ್ರಕ್ರಿಯೆಯನ್ನು ರಚಿಸಲು ಮತ್ತು ಪ್ರಾರಂಭಿಸಲು ಯಾರು ವೇಗವಾಗಿರುತ್ತಾರೆ ಎಂದು ನಾನು ಭಾವಿಸುತ್ತೇನೆ: ಅವರು ತಮ್ಮ SSIS ಮತ್ತು ಮೌಸ್ನೊಂದಿಗೆ ಮತ್ತು ನಾನು ಗಾಳಿಯ ಹರಿವಿನೊಂದಿಗೆ ... ಮತ್ತು ನಂತರ ನಾವು ನಿರ್ವಹಣೆಯ ಸುಲಭತೆಯನ್ನು ಹೋಲಿಸುತ್ತೇವೆ. ವಾಹ್, ನಾನು ಅವರನ್ನು ಎಲ್ಲಾ ರಂಗಗಳಲ್ಲಿ ಬೈಪಾಸ್ ಮಾಡುತ್ತೇನೆ ಎಂದು ನೀವು ಒಪ್ಪುತ್ತೀರಿ ಎಂದು ನಾನು ಭಾವಿಸುತ್ತೇನೆ!
ಸ್ವಲ್ಪ ಹೆಚ್ಚು ಗಂಭೀರವಾದ ಟಿಪ್ಪಣಿಯಲ್ಲಿ, ಅಪಾಚೆ ಏರ್ಫ್ಲೋ - ಪ್ರೋಗ್ರಾಮ್ ಕೋಡ್ನ ರೂಪದಲ್ಲಿ ಪ್ರಕ್ರಿಯೆಗಳನ್ನು ವಿವರಿಸುವ ಮೂಲಕ - ನನ್ನ ಕೆಲಸವನ್ನು ಮಾಡಿದೆ ಹೆಚ್ಚು ಹೆಚ್ಚು ಅನುಕೂಲಕರ ಮತ್ತು ಆಹ್ಲಾದಕರ.
ಇದರ ಅನಿಯಮಿತ ವಿಸ್ತರಣೆ: ಪ್ಲಗಿನ್ಗಳ ವಿಷಯದಲ್ಲಿ ಮತ್ತು ಸ್ಕೇಲೆಬಿಲಿಟಿಗೆ ಅದರ ಪೂರ್ವಭಾವಿಯಾಗಿ, ಯಾವುದೇ ಪ್ರದೇಶದಲ್ಲಿ ಗಾಳಿಯ ಹರಿವನ್ನು ಬಳಸಲು ನಿಮಗೆ ಅವಕಾಶವನ್ನು ನೀಡುತ್ತದೆ: ಡೇಟಾವನ್ನು ಸಂಗ್ರಹಿಸುವ, ಸಿದ್ಧಪಡಿಸುವ ಮತ್ತು ಸಂಸ್ಕರಿಸುವ ಸಂಪೂರ್ಣ ಚಕ್ರದಲ್ಲಿ ಅಥವಾ ರಾಕೆಟ್ಗಳನ್ನು ಉಡಾವಣೆ ಮಾಡುವಾಗ (ಮಂಗಳಕ್ಕೆ, ಸಹಜವಾಗಿ. )
ಅಂತಿಮ ಭಾಗ, ಉಲ್ಲೇಖ ಮತ್ತು ಮಾಹಿತಿ
ನಾವು ನಿಮಗಾಗಿ ಸಂಗ್ರಹಿಸಿರುವ ಕುಂಟೆ
start_date. ಹೌದು, ಇದು ಈಗಾಗಲೇ ಸ್ಥಳೀಯ ಮೆಮೆ ಆಗಿದೆ. ದಾಗ್ ಅವರ ಮುಖ್ಯ ವಾದದ ಮೂಲಕstart_dateಎಲ್ಲರೂ ಹಾದುಹೋಗುತ್ತಾರೆ. ಸಂಕ್ಷಿಪ್ತವಾಗಿ, ನೀವು ನಿರ್ದಿಷ್ಟಪಡಿಸಿದರೆstart_dateಪ್ರಸ್ತುತ ದಿನಾಂಕ, ಮತ್ತುschedule_interval- ಒಂದು ದಿನ, ನಂತರ DAG ನಾಳೆಗಿಂತ ಮುಂಚಿತವಾಗಿ ಪ್ರಾರಂಭಿಸುವುದಿಲ್ಲ.start_date = datetime(2020, 7, 7, 0, 1, 2)ಮತ್ತು ಹೆಚ್ಚಿನ ಸಮಸ್ಯೆಗಳಿಲ್ಲ.
ಮತ್ತೊಂದು ಮರಣದಂಡನೆ ದೋಷವು ಅದರೊಂದಿಗೆ ಸಂಬಂಧಿಸಿದೆ:
Task is missing the start_date parameter, ನೀವು ಆಪರೇಟರ್ಗೆ ಡ್ಯಾಗ್ ಅನ್ನು ಬಂಧಿಸಲು ಮರೆತಿದ್ದೀರಿ ಎಂದು ಇದು ಹೆಚ್ಚಾಗಿ ಸೂಚಿಸುತ್ತದೆ.- ಎಲ್ಲವೂ ಒಂದೇ ಯಂತ್ರದಲ್ಲಿ. ಹೌದು, ಮತ್ತು ಡೇಟಾಬೇಸ್ಗಳು (ಏರ್ಫ್ಲೋ ಸ್ವತಃ ಮತ್ತು ನಮ್ಮ ಲೇಪನ), ಮತ್ತು ವೆಬ್ ಸರ್ವರ್, ಮತ್ತು ಶೆಡ್ಯೂಲರ್ ಮತ್ತು ಕೆಲಸಗಾರರು. ಮತ್ತು ಅದು ಸಹ ಕೆಲಸ ಮಾಡಿದೆ. ಆದರೆ ಕಾಲಾನಂತರದಲ್ಲಿ, ಸೇವೆಗಳಿಗೆ ಕಾರ್ಯಗಳ ಸಂಖ್ಯೆಯು ಬೆಳೆಯಿತು, ಮತ್ತು PostgreSQL 20 ms ಬದಲಿಗೆ 5 ಸೆಕೆಂಡುಗಳಲ್ಲಿ ಸೂಚ್ಯಂಕಕ್ಕೆ ಪ್ರತಿಕ್ರಿಯಿಸಲು ಪ್ರಾರಂಭಿಸಿದಾಗ, ನಾವು ಅದನ್ನು ತೆಗೆದುಕೊಂಡು ಅದನ್ನು ಸಾಗಿಸಿದ್ದೇವೆ.
- ಲೋಕಲ್ ಎಕ್ಸಿಕ್ಯೂಟರ್. ಹೌದು, ನಾವು ಇನ್ನೂ ಅದರ ಮೇಲೆ ಕುಳಿತಿದ್ದೇವೆ ಮತ್ತು ನಾವು ಈಗಾಗಲೇ ಪ್ರಪಾತದ ಅಂಚಿಗೆ ಬಂದಿದ್ದೇವೆ. LocalExecutor ಇಲ್ಲಿಯವರೆಗೆ ನಮಗೆ ಸಾಕಾಗಿತ್ತು, ಆದರೆ ಈಗ ಕನಿಷ್ಠ ಒಬ್ಬ ಕೆಲಸಗಾರರೊಂದಿಗೆ ವಿಸ್ತರಿಸುವ ಸಮಯ ಬಂದಿದೆ ಮತ್ತು CeleryExecutor ಗೆ ಹೋಗಲು ನಾವು ಹೆಚ್ಚು ಶ್ರಮಿಸಬೇಕಾಗುತ್ತದೆ. ಮತ್ತು ನೀವು ಅದರೊಂದಿಗೆ ಒಂದು ಯಂತ್ರದಲ್ಲಿ ಕೆಲಸ ಮಾಡಬಹುದಾದ್ದರಿಂದ, ಸರ್ವರ್ನಲ್ಲಿಯೂ ಸಹ ಸೆಲರಿಯನ್ನು ಬಳಸುವುದನ್ನು ಯಾವುದೂ ತಡೆಯುವುದಿಲ್ಲ, ಅದು "ನೈಸರ್ಗಿಕವಾಗಿ ಎಂದಿಗೂ ಉತ್ಪಾದನೆಗೆ ಹೋಗುವುದಿಲ್ಲ, ಪ್ರಾಮಾಣಿಕವಾಗಿ!"
- ಬಳಕೆಯಾಗದಿರುವುದು ಅಂತರ್ನಿರ್ಮಿತ ಉಪಕರಣಗಳು:
- ಸಂಪರ್ಕಗಳು ಸೇವಾ ರುಜುವಾತುಗಳನ್ನು ಸಂಗ್ರಹಿಸಲು,
- SLA ಮಿಸ್ ಸಮಯಕ್ಕೆ ಪೂರ್ಣಗೊಳ್ಳದ ಕಾರ್ಯಗಳಿಗೆ ಪ್ರತಿಕ್ರಿಯಿಸಲು,
- xcom ಮೆಟಾಡೇಟಾವನ್ನು ವಿನಿಮಯ ಮಾಡಿಕೊಳ್ಳಲು (ನಾನು ಹೇಳಿದೆ ಮೆಟಾಡೇಟಾ!) ಡಾಗ್ನ ಕಾರ್ಯಗಳ ನಡುವೆ.
- ಮೇಲ್ ದುರುಪಯೋಗ. ಸರಿ ನಾನು ಏನು ಹೇಳಬಲ್ಲೆ? ಕೈಬಿಡಲಾದ ಕಾರ್ಯಗಳ ಎಲ್ಲಾ ಪುನರಾವರ್ತನೆಗಳಿಗಾಗಿ ಎಚ್ಚರಿಕೆಗಳನ್ನು ಹೊಂದಿಸಲಾಗಿದೆ. ಈಗ ನನ್ನ ಕೆಲಸ Gmail ನಲ್ಲಿ ಏರ್ಫ್ಲೋ ನಿಂದ 90k ಅಕ್ಷರಗಳಿವೆ ಮತ್ತು ವೆಬ್ ಮೇಲ್ ಮುಖವು ಒಂದು ಸಮಯದಲ್ಲಿ 100 ಕ್ಕೂ ಹೆಚ್ಚು ತುಣುಕುಗಳನ್ನು ತೆಗೆದುಕೊಳ್ಳಲು ಮತ್ತು ಅಳಿಸಲು ನಿರಾಕರಿಸುತ್ತದೆ.
ಇನ್ನಷ್ಟು ಅಪಾಯಗಳು:
ಇನ್ನೂ ಹೆಚ್ಚಿನ ಯಾಂತ್ರೀಕೃತಗೊಂಡ ಸಾಧನಗಳು
ನಮ್ಮ ಕೈಗಳಿಂದ ಅಲ್ಲ ಮತ್ತು ನಮ್ಮ ತಲೆಯಿಂದ ಇನ್ನಷ್ಟು ಕೆಲಸ ಮಾಡಲು, ಏರ್ಫ್ಲೋ ಇದನ್ನು ನಮಗಾಗಿ ಸಿದ್ಧಪಡಿಸಿದೆ:
- — ಇದು ಇನ್ನೂ ಪ್ರಾಯೋಗಿಕ ಸ್ಥಿತಿಯನ್ನು ಹೊಂದಿದೆ, ಅದು ಕೆಲಸ ಮಾಡುವುದನ್ನು ತಡೆಯುವುದಿಲ್ಲ. ಅದರ ಸಹಾಯದಿಂದ, ನೀವು ಡ್ಯಾಗ್ಗಳು ಮತ್ತು ಕಾರ್ಯಗಳ ಬಗ್ಗೆ ಮಾಹಿತಿಯನ್ನು ಮಾತ್ರ ಸ್ವೀಕರಿಸಬಹುದು, ಆದರೆ ಡ್ಯಾಗ್ ಅನ್ನು ನಿಲ್ಲಿಸಿ / ಪ್ರಾರಂಭಿಸಿ, DAG ರನ್ ಅಥವಾ ಪೂಲ್ ಅನ್ನು ರಚಿಸಿ.
- — WebUI ಮೂಲಕ ಬಳಸಲು ಅನನುಕೂಲವಾಗದಿದ್ದರೂ ಸಂಪೂರ್ಣವಾಗಿ ಇಲ್ಲದಿರುವ ಆಜ್ಞಾ ಸಾಲಿನ ಮೂಲಕ ಅನೇಕ ಉಪಕರಣಗಳು ಲಭ್ಯವಿವೆ. ಉದಾಹರಣೆಗೆ:
backfillಕಾರ್ಯ ನಿದರ್ಶನಗಳನ್ನು ಮರುಪ್ರಾರಂಭಿಸಲು ಅಗತ್ಯವಿದೆ.
ಉದಾಹರಣೆಗೆ, ವಿಶ್ಲೇಷಕರು ಬಂದು ಹೇಳಿದರು: “ಮತ್ತು ನಿಮ್ಮ ಡೇಟಾ, ಒಡನಾಡಿ, ಜನವರಿ 1 ರಿಂದ ಜನವರಿ 13 ರವರೆಗೆ ಅಸಂಬದ್ಧವಾಗಿದೆ! ಸರಿಪಡಿಸಿ, ಸರಿಪಡಿಸಿ, ಸರಿಪಡಿಸಿ, ಸರಿಪಡಿಸಿ! ಮತ್ತು ನೀವು ಅಂತಹ ಹೋಬಾ:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- ಬೇಸ್ ನಿರ್ವಹಣೆ:
initdb,resetdb,upgradedb,checkdb. run, ಇದು ಒಂದು ಕಾರ್ಯ ನಿದರ್ಶನವನ್ನು ಪ್ರಾರಂಭಿಸಲು ನಿಮಗೆ ಅನುಮತಿಸುತ್ತದೆ ಮತ್ತು ಎಲ್ಲಾ ಅವಲಂಬನೆಗಳನ್ನು ಮರೆತುಬಿಡಿ. ಇದಲ್ಲದೆ, ನೀವು ಅದನ್ನು ಚಲಾಯಿಸಬಹುದುLocalExecutor, ನೀವು ಸೆಲರಿ ಕ್ಲಸ್ಟರ್ ಅನ್ನು ಹೊಂದಿದ್ದರೂ ಸಹ.- ಸರಿಸುಮಾರು ಅದೇ ಕೆಲಸವನ್ನು ಮಾಡುತ್ತದೆ
test, ಆದರೆ ಇದು ಡೇಟಾಬೇಸ್ಗೆ ಏನನ್ನೂ ಬರೆಯುವುದಿಲ್ಲ. connectionsಶೆಲ್ನಿಂದ ದೊಡ್ಡ ಪ್ರಮಾಣದಲ್ಲಿ ಸಂಪರ್ಕಗಳನ್ನು ರಚಿಸಲು ನಿಮಗೆ ಅನುಮತಿಸುತ್ತದೆ.
- - ಪರಸ್ಪರ ಕ್ರಿಯೆಯ ಬದಲಿಗೆ ಹಾರ್ಡ್ಕೋರ್ ವಿಧಾನ, ಇದು ಪ್ಲಗಿನ್ಗಳಿಗಾಗಿ ಉದ್ದೇಶಿಸಲಾಗಿದೆ ಮತ್ತು ನಿಮ್ಮ ಕೈಗಳಿಂದ ಅದರೊಂದಿಗೆ ಟಿಂಕರ್ ಮಾಡಬಾರದು. ಆದರೆ ನಮ್ಮನ್ನು ಹೋಗದಂತೆ ತಡೆಯುವವರು ಯಾರು
/home/airflow/dags, ಓಡುipythonಮತ್ತು ಗೊಂದಲವನ್ನು ಪ್ರಾರಂಭಿಸುವುದೇ? ಉದಾಹರಣೆಗೆ, ಈ ಕೋಡ್ ಅನ್ನು ಬಳಸಿಕೊಂಡು ನೀವು ಎಲ್ಲಾ ಸಂಪರ್ಕಗಳನ್ನು ರಫ್ತು ಮಾಡಬಹುದು:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - ಏರ್ಫ್ಲೋ ಮೆಟಾಡೇಟಾ ಡೇಟಾಬೇಸ್ಗೆ ಸಂಪರ್ಕಪಡಿಸಿ. ನಾನು ಅದನ್ನು ಬರೆಯಲು ಶಿಫಾರಸು ಮಾಡುವುದಿಲ್ಲ, ಆದರೆ ನೀವು ಯಾವುದೇ API ಗಳಿಗಿಂತ ಹೆಚ್ಚು ವೇಗವಾಗಿ ಮತ್ತು ಸುಲಭವಾಗಿ ವಿವಿಧ ನಿರ್ದಿಷ್ಟ ಮೆಟ್ರಿಕ್ಗಳಿಗಾಗಿ ಕಾರ್ಯ ಸ್ಥಿತಿಗಳನ್ನು ಪಡೆಯಬಹುದು.
ನಮ್ಮ ಎಲ್ಲಾ ಕಾರ್ಯಗಳು ದುರ್ಬಲವಾಗಿಲ್ಲ ಎಂದು ಹೇಳೋಣ, ಆದರೆ ಅವು ಕೆಲವೊಮ್ಮೆ ವಿಫಲವಾಗಬಹುದು ಮತ್ತು ಇದು ಸಾಮಾನ್ಯವಾಗಿದೆ. ಆದರೆ ಕೆಲವು ಕಲ್ಲುಮಣ್ಣುಗಳು ಈಗಾಗಲೇ ಅನುಮಾನಾಸ್ಪದವಾಗಿದೆ ಮತ್ತು ನಾವು ಅದನ್ನು ಪರಿಶೀಲಿಸಬೇಕಾಗಿದೆ.
ಹುಷಾರಾಗಿರು, SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
ಉಲ್ಲೇಖಗಳು
ಸರಿ, ಸಹಜವಾಗಿ, Google ನಿಂದ ಮೊದಲ ಹತ್ತು ಲಿಂಕ್ಗಳು ನನ್ನ ಬುಕ್ಮಾರ್ಕ್ಗಳಿಂದ ಏರ್ಫ್ಲೋ ಫೋಲ್ಡರ್ನ ವಿಷಯಗಳಾಗಿವೆ.
- - ಸಹಜವಾಗಿ, ನಾವು ಕಚೇರಿಯಿಂದ ಪ್ರಾರಂಭಿಸಬೇಕಾಗಿದೆ. ದಸ್ತಾವೇಜನ್ನು, ಆದರೆ ಯಾರು ಸೂಚನೆಗಳನ್ನು ಓದುತ್ತಾರೆ?
- - ಸರಿ, ಕನಿಷ್ಠ ರಚನೆಕಾರರಿಂದ ಶಿಫಾರಸುಗಳನ್ನು ಓದಿ.
- — ಅತ್ಯಂತ ಆರಂಭ: ಚಿತ್ರಗಳಲ್ಲಿ ಬಳಕೆದಾರ ಇಂಟರ್ಫೇಸ್
- - ನೀವು ನನ್ನಿಂದ ಏನನ್ನಾದರೂ ಅರ್ಥಮಾಡಿಕೊಳ್ಳದಿದ್ದರೆ (ಇದ್ದಕ್ಕಿದ್ದಂತೆ!) ಮೂಲ ಪರಿಕಲ್ಪನೆಗಳನ್ನು ಚೆನ್ನಾಗಿ ವಿವರಿಸಲಾಗಿದೆ.
- - ಏರ್ ಫ್ಲೋ ಕ್ಲಸ್ಟರ್ ಅನ್ನು ಹೊಂದಿಸಲು ಒಂದು ಕಿರು ಮಾರ್ಗದರ್ಶಿ.
- - ಹೆಚ್ಚು ಔಪಚಾರಿಕತೆ ಮತ್ತು ಕಡಿಮೆ ಉದಾಹರಣೆಗಳನ್ನು ಹೊರತುಪಡಿಸಿ ಬಹುತೇಕ ಅದೇ ಆಸಕ್ತಿದಾಯಕ ಲೇಖನ.
- - ಸೆಲೆರಿ ಜೊತೆಯಲ್ಲಿ ಕೆಲಸ ಮಾಡುವ ಬಗ್ಗೆ.
- - ಕಾರ್ಯಗಳ ಅಸಮರ್ಥತೆ, ದಿನಾಂಕದ ಬದಲಿಗೆ ID ಮೂಲಕ ಲೋಡ್ ಮಾಡುವುದು, ರೂಪಾಂತರಗಳು, ಫೈಲ್ ರಚನೆ ಮತ್ತು ಇತರ ಆಸಕ್ತಿದಾಯಕ ವಿಷಯಗಳ ಬಗ್ಗೆ.
- — ಟಾಸ್ಕ್ ಮತ್ತು ಟ್ರಿಗ್ಗರ್ ರೂಲ್ ಅವಲಂಬನೆಗಳು, ನಾನು ಹಾದುಹೋಗುವಲ್ಲಿ ಮಾತ್ರ ಉಲ್ಲೇಖಿಸಿದ್ದೇನೆ.
- — ಶೆಡ್ಯೂಲರ್ನೊಂದಿಗೆ ಕೆಲವು "ಉದ್ದೇಶಿತ ಕೆಲಸಗಳು" ಸಮಸ್ಯೆಗಳನ್ನು ನಿವಾರಿಸುವುದು ಹೇಗೆ, ಕಳೆದುಹೋದ ಡೇಟಾವನ್ನು ಲೋಡ್ ಮಾಡುವುದು ಮತ್ತು ಕಾರ್ಯಗಳಿಗೆ ಆದ್ಯತೆ ನೀಡುವುದು.
- - ಏರ್ಫ್ಲೋ ಮೆಟಾಡೇಟಾಕ್ಕೆ ಉಪಯುಕ್ತ SQL ಪ್ರಶ್ನೆಗಳು.
- - ಕಸ್ಟಮ್ ಸಂವೇದಕವನ್ನು ರಚಿಸುವ ಬಗ್ಗೆ ಉಪಯುಕ್ತ ವಿಭಾಗವಿದೆ.
- - ಡೇಟಾ ಸೈನ್ಸ್ಗಾಗಿ AWS ನಲ್ಲಿ ಮೂಲಸೌಕರ್ಯವನ್ನು ನಿರ್ಮಿಸುವ ಬಗ್ಗೆ ಆಸಕ್ತಿದಾಯಕ ಕಿರು ಟಿಪ್ಪಣಿ.
- - ಸಾಮಾನ್ಯ ತಪ್ಪುಗಳು (ಕೆಲವರು ಇನ್ನೂ ಸೂಚನೆಗಳನ್ನು ಓದದಿದ್ದಾಗ).
- — ನೀವು ಕೇವಲ ಸಂಪರ್ಕಗಳನ್ನು ಬಳಸಬಹುದಾದರೂ ಪಾಸ್ವರ್ಡ್ಗಳನ್ನು ಸಂಗ್ರಹಿಸುವಲ್ಲಿ ಜನರು ಹೇಗೆ ಕಷ್ಟಪಡುತ್ತಾರೆ ಎಂಬುದನ್ನು ನೋಡಿ ಕಿರುನಗೆ.
- — ಸೂಚ್ಯ DAG ಫಾರ್ವರ್ಡ್ ಮಾಡುವಿಕೆ, ಕಾರ್ಯಗಳಿಗೆ ಸಂದರ್ಭವನ್ನು ಎಸೆಯುವುದು, ಮತ್ತೊಮ್ಮೆ ಅವಲಂಬನೆಗಳ ಬಗ್ಗೆ ಮತ್ತು ಟಾಸ್ಕ್ ಲಾಂಚ್ಗಳನ್ನು ಬಿಟ್ಟುಬಿಡುವುದು.
- - ಬಳಕೆಯ ಬಗ್ಗೆ
default argumentsиparamsಟೆಂಪ್ಲೇಟ್ಗಳಲ್ಲಿ, ಹಾಗೆಯೇ ವೇರಿಯೇಬಲ್ಗಳು ಮತ್ತು ಸಂಪರ್ಕಗಳ ಬಗ್ಗೆ. - — ಏರ್ಫ್ಲೋ 2.0 ಗಾಗಿ ಯೋಜಕವನ್ನು ಹೇಗೆ ಸಿದ್ಧಪಡಿಸಲಾಗುತ್ತಿದೆ ಎಂಬುದರ ಕುರಿತು ಒಂದು ಕಥೆ.
- - ನಮ್ಮ ಕ್ಲಸ್ಟರ್ನ ನಿಯೋಜನೆಯ ಕುರಿತು ಸ್ವಲ್ಪ ಹಳೆಯ ಲೇಖನ
docker-compose. - - ಟೆಂಪ್ಲೇಟ್ಗಳು ಮತ್ತು ಸಂದರ್ಭ ಫಾರ್ವರ್ಡ್ ಮಾಡುವ ಮೂಲಕ ಕ್ರಿಯಾತ್ಮಕ ಕಾರ್ಯಗಳು.
- - ಮೇಲ್ ಮತ್ತು ಸ್ಲಾಕ್ ಮೂಲಕ ಪ್ರಮಾಣಿತ ಮತ್ತು ಕಸ್ಟಮ್ ಅಧಿಸೂಚನೆಗಳು.
- — ಶಾಖೆಯ ಕಾರ್ಯಗಳು, ಮ್ಯಾಕ್ರೋಗಳು ಮತ್ತು XCom.
ಮತ್ತು ಲೇಖನದಲ್ಲಿ ಒಳಗೊಂಡಿರುವ ಲಿಂಕ್ಗಳು:
- - ಟೆಂಪ್ಲೇಟ್ಗಳಲ್ಲಿ ಬಳಸಲು ಪ್ಲೇಸ್ಹೋಲ್ಡರ್ಗಳು ಲಭ್ಯವಿದೆ.
- - ಡ್ಯಾಗ್ಗಳನ್ನು ರಚಿಸುವಾಗ ಸಾಮಾನ್ಯ ತಪ್ಪುಗಳು.
- -
docker-composeಪ್ರಯೋಗಗಳು, ಡೀಬಗ್ ಮಾಡುವಿಕೆ ಮತ್ತು ಹೆಚ್ಚಿನವುಗಳಿಗಾಗಿ. - - ಟೆಲಿಗ್ರಾಮ್ REST API ಗಾಗಿ ಪೈಥಾನ್ ಹೊದಿಕೆ.
ಮೂಲ: www.habr.com




