เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจนเฉˆเจฒเฉ‹, เจฎเฉˆเจ‚ เจฆเจฎเจฟเจคเจฐเฉ€ เจฒเฉŒเจ—เจตเจฟเจจเฉ‡เจจเจ•เฉ‹ เจนเจพเจ‚ - เจตเฉ‡เจœเจผเฉ‡เจŸ เจ—เจฐเฉเฉฑเจช เจ†เจซเจผ เจ•เฉฐเจชเจจเฉ€เจ†เจ‚ เจฆเฉ‡ เจตเจฟเจธเจผเจฒเฉ‡เจธเจผเจฃ เจตเจฟเจญเจพเจ— เจฆเจพ เจกเฉ‡เจŸเจพ เจ‡เฉฐเจœเฉ€เจจเฉ€เจ…เจฐเฅค

เจฎเฉˆเจ‚ เจคเฉเจนเจพเจจเฉ‚เฉฐ เจˆเจŸเฉ€เจเจฒ เจชเฉเจฐเจ•เจฟเจฐเจฟเจ†เจตเจพเจ‚ เจฆเฉ‡ เจตเจฟเจ•เจพเจธ เจฒเจˆ เจ‡เฉฑเจ• เจธเจผเจพเจจเจฆเจพเจฐ เจŸเฉ‚เจฒ เจฌเจพเจฐเฉ‡ เจฆเฉฑเจธเจพเจ‚เจ—เจพ - เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹. เจชเจฐ เจเจ…เจฐเจซเจฒเฉ‹ เจ‡เฉฐเจจเจพ เจฌเจนเฉเจชเฉฑเจ–เฉ€ เจ…เจคเฉ‡ เจฌเจนเฉเจชเฉฑเจ–เฉ€ เจนเฉˆ เจ•เจฟ เจคเฉเจนเจพเจจเฉ‚เฉฐ เจ‡เจธ 'เจคเฉ‡ เจกเฉ‚เฉฐเจ˜เจพเจˆ เจจเจพเจฒ เจตเจฟเจšเจพเจฐ เจ•เจฐเจจเจพ เจšเจพเจนเฉ€เจฆเจพ เจนเฉˆ เจญเจพเจตเฉ‡เจ‚ เจคเฉเจธเฉ€เจ‚ เจกเฉ‡เจŸเจพ เจฆเฉ‡ เจชเฉเจฐเจตเจพเจน เจตเจฟเฉฑเจš เจธเจผเจพเจฎเจฒ เจจเจพ เจนเฉ‹เจตเฉ‹, เจชเจฐ เจธเจฎเฉ‡เจ‚-เจธเจฎเฉ‡เจ‚ 'เจคเฉ‡ เจ•เจฟเจธเฉ‡ เจตเฉ€ เจชเฉเจฐเจ•เจฟเจฐเจฟเจ† เจจเฉ‚เฉฐ เจธเจผเฉเจฐเฉ‚ เจ•เจฐเจจ เจ…เจคเฉ‡ เจ‰เจนเจจเจพเจ‚ เจฆเฉ‡ เจเจ—เจœเจผเฉ€เจ•เจฟเจŠเจธเจผเจจ เจฆเฉ€ เจจเจฟเจ—เจฐเจพเจจเฉ€ เจ•เจฐเจจ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉเฉฐเจฆเฉ€ เจนเฉˆเฅค

เจ…เจคเฉ‡ เจนเจพเจ‚, เจฎเฉˆเจ‚ เจจเจพ เจธเจฟเจฐเจซ เจฆเฉฑเจธเจพเจ‚เจ—เจพ, เจฌเจฒเจ•เจฟ เจ‡เจน เจตเฉ€ เจฆเจฟเจ–เจพเจตเจพเจ‚เจ—เจพ: เจชเฉเจฐเฉ‹เจ—เจฐเจพเจฎ เจตเจฟเฉฑเจš เจฌเจนเฉเจค เจธเจพเจฐเฉ‡ เจ•เฉ‹เจก, เจธเจ•เฉเจฐเฉ€เจจเจธเจผเจพเจŸ เจ…เจคเฉ‡ เจธเจฟเจซเจผเจพเจฐเจธเจผเจพเจ‚ เจนเจจ.

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ
เจœเจฆเฉ‹เจ‚ เจคเฉเจธเฉ€เจ‚ เจเจ…เจฐเจซเจฒเฉ‹ / เจตเจฟเจ•เฉ€เจฎเฉ€เจกเฉ€เจ† เจ•เจพเจฎเจจเจœเจผ เจธเจผเจฌเจฆ เจจเฉ‚เฉฐ เจ—เฉ‚เจ—เจฒ เจ•เจฐเจฆเฉ‡ เจนเฉ‹ เจคเจพเจ‚ เจคเฉเจธเฉ€เจ‚ เจ†เจฎ เจคเฉŒเจฐ 'เจคเฉ‡ เจ•เฉ€ เจฆเฉ‡เจ–เจฆเฉ‡ เจนเฉ‹

เจตเจฟเจธเจผเจพ-เจธเฉ‚เจšเฉ€

เจœเจพเจฃ เจชเจ›เจพเจฃ

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹ เจœเฉˆเจ‚เจ—เฉ‹ เจตเจพเจ‚เจ— เจนเฉˆ:

  • python เจตเจฟเฉฑเจš เจฒเจฟเจ–เจฟเจ†
  • เจ‡เฉฑเจ• เจตเจงเฉ€เจ† เจเจกเจฎเจฟเจจ เจชเฉˆเจจเจฒ เจนเฉˆ,
  • เจ…เจจเจฟเจธเจผเจšเจฟเจค เจธเจฎเฉ‡เจ‚ เจฒเจˆ เจตเจฟเจธเจคเจพเจฐเจฏเฉ‹เจ—

- เจธเจฟเจฐเจซ เจฌเจฟเจนเจคเจฐ, เจ…เจคเฉ‡ เจ‡เจน เจฌเจฟเจฒเจ•เฉเจฒ เจตเฉฑเจ–เจฐเฉ‡ เจ‰เจฆเฉ‡เจธเจผเจพเจ‚ เจฒเจˆ เจฌเจฃเจพเจ‡เจ† เจ—เจฟเจ† เจธเฉ€, เจ…เจฐเจฅเจพเจค (เจœเจฟเจตเฉ‡เจ‚ เจ•เจฟ เจ‡เจน เจ•เจพเจŸเจพ เจคเฉ‹เจ‚ เจชเจนเจฟเจฒเจพเจ‚ เจฒเจฟเจ–เจฟเจ† เจ—เจฟเจ† เจนเฉˆ):

  • เจ…เจธเฉ€เจฎเจค เจ—เจฟเจฃเจคเฉ€ เจฆเฉ€เจ†เจ‚ เจฎเจธเจผเฉ€เจจเจพเจ‚ 'เจคเฉ‡ เจ•เฉฐเจฎ เจšเจฒเจพเจ‰เจฃเจพ เจ…เจคเฉ‡ เจจเจฟเจ—เจฐเจพเจจเฉ€ เจ•เจฐเจจเจพ (เจœเจฟเฉฐเจจเฉ‡ เจธเฉˆเจฒเจฐเฉ€ / เจ•เฉเจฌเจฐเจจเฉ‡เจŸเจธ เจ…เจคเฉ‡ เจคเฉเจนเจพเจกเฉ€ เจœเจผเจฎเฉ€เจฐ เจคเฉเจนเจพเจจเฉ‚เฉฐ เจ‡เจœเจพเจœเจผเจค เจฆเฉ‡เจตเฉ‡เจ—เฉ€)
  • เจชเจพเจˆเจฅเจจ เจ•เฉ‹เจก เจจเฉ‚เฉฐ เจฒเจฟเจ–เจฃ เจ…เจคเฉ‡ เจธเจฎเจเจฃ เจฒเจˆ เจฌเจนเฉเจค เจนเฉ€ เจ†เจธเจพเจจ เจคเฉ‹เจ‚ เจกเจพเจ‡เจจเจพเจฎเจฟเจ• เจตเจฐเจ•เจซเจฒเฉ‹ เจชเฉ€เฉœเฉเจนเฉ€ เจฆเฉ‡ เจจเจพเจฒ
  • เจ…เจคเฉ‡ เจ•เจฟเจธเฉ‡ เจตเฉ€ เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจ…เจคเฉ‡ API เจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจฆเฉ‚เจœเฉ‡ เจจเจพเจฒ เจคเจฟเจ†เจฐ เจ•เฉ€เจคเฉ‡ เจญเจพเจ—เจพเจ‚ เจ…เจคเฉ‡ เจ˜เจฐเฉ‡เจฒเฉ‚-เจฌเจฃเฉ‡ เจชเจฒเฉฑเจ—เจ‡เจจเจพเจ‚ (เจœเฉ‹ เจ•เจฟ เจฌเจนเฉเจค เจนเฉ€ เจธเจงเจพเจฐเจจ เจนเฉˆ) เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจ•เฉ‡ เจ‡เฉฑเจ• เจฆเฉ‚เจœเฉ‡ เจจเจพเจฒ เจœเฉ‹เฉœเจจ เจฆเฉ€ เจธเจฎเจฐเฉฑเจฅเจพ เจนเฉˆเฅค

เจ…เจธเฉ€เจ‚ เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹ เจจเฉ‚เฉฐ เจ‡เจธ เจคเจฐเฉเจนเจพเจ‚ เจตเจฐเจคเจฆเฉ‡ เจนเจพเจ‚:

  • เจ…เจธเฉ€เจ‚ DWH เจ…เจคเฉ‡ ODS (เจธเจพเจกเฉ‡ เจ•เฉ‹เจฒ Vertica เจ…เจคเฉ‡ Clickhouse เจนเฉˆ) เจตเจฟเฉฑเจš เจตเฉฑเจ–-เจตเฉฑเจ– เจธเจฐเฉ‹เจคเจพเจ‚ (เจฌเจนเฉเจค เจธเจพเจฐเฉ‡ SQL เจธเจฐเจตเจฐ เจ…เจคเฉ‡ PostgreSQL เจ‰เจฆเจพเจนเจฐเจจเจพเจ‚, เจเจชเจฒเฉ€เจ•เฉ‡เจธเจผเจจ เจฎเฉˆเจŸเฉเจฐเจฟเจ•เจธ เจฆเฉ‡ เจจเจพเจฒ เจตเฉฑเจ–-เจตเฉฑเจ– API, เจ‡เฉฑเจฅเฉ‹เจ‚ เจคเฉฑเจ• เจ•เจฟ 1C) เจคเฉ‹เจ‚ เจกเจพเจŸเจพ เจ‡เจ•เฉฑเจ เจพ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚เฅค
  • เจ•เจฟเฉฐเจจเจพ เจ‰เฉฑเจจเจค cron, เจœเฉ‹ เจ•เจฟ ODS 'เจคเฉ‡ เจกเจพเจŸเจพ เจ‡เจ•เจธเฉเจฐเจคเจพ เจชเฉเจฐเจ•เจฟเจฐเจฟเจ†เจตเจพเจ‚ เจจเฉ‚เฉฐ เจธเจผเฉเจฐเฉ‚ เจ•เจฐเจฆเจพ เจนเฉˆ, เจ…เจคเฉ‡ เจ‰เจนเจจเจพเจ‚ เจฆเฉ‡ เจฐเฉฑเจ–-เจฐเจ–เจพเจ… เจฆเฉ€ เจจเจฟเจ—เจฐเจพเจจเฉ€ เจตเฉ€ เจ•เจฐเจฆเจพ เจนเฉˆเฅค

เจนเจพเจฒ เจนเฉ€ เจตเจฟเฉฑเจš, เจธเจพเจกเฉ€เจ†เจ‚ เจฒเฉ‹เฉœเจพเจ‚ เจจเฉ‚เฉฐ 32 เจ•เฉ‹เจฐ เจ…เจคเฉ‡ 50 GB RAM เจตเจพเจฒเฉ‡ เจ‡เฉฑเจ• เจ›เฉ‹เจŸเฉ‡ เจธเจฐเจตเจฐ เจฆเฉเจ†เจฐเจพ เจ•เจตเจฐ เจ•เฉ€เจคเจพ เจ—เจฟเจ† เจธเฉ€เฅค เจเจ…เจฐเจซเจฒเฉ‹ เจตเจฟเฉฑเจš, เจ‡เจน เจ•เฉฐเจฎ เจ•เจฐเจฆเจพ เจนเฉˆ:

  • เจนเฉ‹เจฐ 200 เจกเจพเจ—เจธ (เจ…เจธเจฒ เจตเจฟเฉฑเจš เจตเจฐเจ•เจซเจฒเฉ‹, เจœเจฟเจธ เจตเจฟเฉฑเจš เจ…เจธเฉ€เจ‚ เจ•เฉฐเจฎ เจญเจฐเจฆเฉ‡ เจนเจพเจ‚),
  • เจ”เจธเจค 'เจคเฉ‡ เจนเจฐ เจตเจฟเฉฑเจš 70 เจ•เจพเจฐเจœ,
  • เจ‡เจน เจšเฉฐเจ—เจฟเจ†เจˆ เจธเจผเฉเจฐเฉ‚ เจนเฉเฉฐเจฆเฉ€ เจนเฉˆ (เจ”เจธเจคเจจ เจตเฉ€) เจ‡เฉฑเจ• เจ˜เฉฐเจŸเฉ‡ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจตเจพเจฐ.

เจ…เจคเฉ‡ เจ‡เจธ เจฌเจพเจฐเฉ‡ เจ•เจฟ เจ…เจธเฉ€เจ‚ เจ•เจฟเจตเฉ‡เจ‚ เจตเจฟเจธเจคเจพเจฐ เจ•เฉ€เจคเจพ, เจฎเฉˆเจ‚ เจนเฉ‡เจ เจพเจ‚ เจฒเจฟเจ–เจพเจ‚เจ—เจพ, เจชเจฐ เจนเฉเจฃ เจ…เจธเฉ€เจ‚ รผber-เจธเจฎเฉฑเจธเจฟเจ† เจจเฉ‚เฉฐ เจชเจฐเจฟเจญเจพเจธเจผเจฟเจค เจ•เจฐเฉ€เจ เจœเจฟเจธ เจจเฉ‚เฉฐ เจ…เจธเฉ€เจ‚ เจนเฉฑเจฒ เจ•เจฐเจพเจ‚เจ—เฉ‡:

เจ‡เฉฑเจฅเฉ‡ เจคเจฟเฉฐเจจ เจ…เจธเจฒเฉ€ SQL เจธเจฐเจตเจฐ เจนเจจ, เจนเจฐ เจ‡เฉฑเจ• เจตเจฟเฉฑเจš 50 เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจนเจจ - เจ‡เฉฑเจ• เจชเฉเจฐเฉ‹เจœเฉˆเจ•เจŸ เจฆเฉ€เจ†เจ‚ เจ‰เจฆเจพเจนเจฐเจฃเจพเจ‚, เจ•เฉเจฐเจฎเจตเจพเจฐ, เจ‰เจนเจจเจพเจ‚ เจ•เฉ‹เจฒ เจ‡เฉฑเจ•เฉ‹ เจขเจพเจ‚เจšเจพ เจนเฉˆ (เจฒเจ—เจญเจ— เจนเจฐ เจฅเจพเจ‚, เจฎเฉเจ†-เจนเจพ-เจนเจพ), เจœเจฟเจธเจฆเจพ เจฎเจคเจฒเจฌ เจนเฉˆ เจ•เจฟ เจนเจฐเฉ‡เจ• เจ•เฉ‹เจฒ เจ‡เฉฑเจ• เจ†เจฐเจกเจฐ เจŸเฉ‡เจฌเจฒ เจนเฉˆ (เจ–เฉเจธเจผเจ•เจฟเจธเจฎเจคเฉ€ เจจเจพเจฒ, เจ‰เจธ เจจเจพเจฒ เจ‡เฉฑเจ• เจธเจพเจฐเจฃเฉ€) เจจเจพเจฎ เจจเฉ‚เฉฐ เจ•เจฟเจธเฉ‡ เจตเฉ€ เจ•เจพเจฐเฉ‹เจฌเจพเจฐ เจตเจฟเฉฑเจš เจงเฉฑเจ•เจฟเจ† เจœเจพ เจธเจ•เจฆเจพ เจนเฉˆ) เจ…เจธเฉ€เจ‚ เจธเฉ‡เจตเจพ เจ–เฉ‡เจคเจฐเจพเจ‚ (เจธเจฐเฉ‹เจค เจธเจฐเจตเจฐ, เจธเจฐเฉ‹เจค เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ, ETL เจŸเจพเจธเจ• เจ†เจˆเจกเฉ€) เจจเฉ‚เฉฐ เจœเฉ‹เฉœ เจ•เฉ‡ เจกเฉ‡เจŸเจพ เจฒเฉˆเจ‚เจฆเฉ‡ เจนเจพเจ‚ เจ…เจคเฉ‡ เจ‰เจนเจจเจพเจ‚ เจจเฉ‚เฉฐ เจจเจฟเจฐเจชเฉฑเจ–เจคเจพ เจจเจพเจฒ เจตเจฐเจŸเฉ€เจ•เจพ เจตเจฟเฉฑเจš เจธเฉเฉฑเจŸ เจฆเจฟเฉฐเจฆเฉ‡ เจนเจพเจ‚เฅค

เจšเฉฑเจฒเฉ€เจ!

เจฎเฉเฉฑเจ– เจนเจฟเฉฑเจธเจพ, เจตเจฟเจนเจพเจฐเจ• (เจ…เจคเฉ‡ เจฅเฉ‹เฉœเจพ เจธเจฟเจงเจพเจ‚เจคเจ•)

เจ…เจธเฉ€เจ‚ (เจ…เจคเฉ‡ เจคเฉเจธเฉ€เจ‚) เจ•เจฟเจ‰เจ‚ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚

เจœเจฆเฉ‹เจ‚ เจฐเฉเฉฑเจ– เจตเฉฑเจกเฉ‡ เจธเจจ เจคเฉ‡ เจฎเฉˆเจ‚ เจธเจพเจฆเจพ เจธเฉ€ SQL-เจ‡เฉฑเจ• เจฐเฉ‚เจธเฉ€ เจฐเจฟเจŸเฉ‡เจฒ เจตเจฟเฉฑเจš, เจ…เจธเฉ€เจ‚ เจธเจพเจกเฉ‡ เจฒเจˆ เจ‰เจชเจฒเจฌเจง เจฆเฉ‹ เจŸเฉ‚เจฒเจธ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจ•เฉ‡ ETL เจชเฉเจฐเจ•เจฟเจฐเจฟเจ†เจตเจพเจ‚ เจ‰เจฐเจซเจผ เจกเจพเจŸเจพ เจชเฉเจฐเจตเจพเจน เจจเจพเจฒ เจ˜เจชเจฒเจพ เจ•เฉ€เจคเจพ:

  • เจธเฉ‚เจšเจจเจพ เจธเจผเจ•เจคเฉ€ เจ•เฉ‡เจ‚เจฆเจฐ - เจ‡เฉฑเจ• เจฌเจนเฉเจค เจนเฉ€ เจซเฉˆเจฒเจฃ เจตเจพเจฒเจพ เจธเจฟเจธเจŸเจฎ, เจฌเจนเฉเจค เจฒเจพเจญเจ•เจพเจฐเฉ€, เจ‡เจธเจฆเฉ‡ เจ†เจชเจฃเฉ‡ เจนเจพเจฐเจกเจตเฉ‡เจ…เจฐ เจฆเฉ‡ เจจเจพเจฒ, เจ‡เจธเจฆเจพ เจ†เจชเจฃเจพ เจธเฉฐเจธเจ•เจฐเจฃเฅค เจฎเฉˆเจ‚ เจ‡เจธเจฆเฉ€ เจธเจฎเจฐเฉฑเจฅเจพ เจฆเจพ 1% เจชเฉเจฐเจฎเจพเจคเจฎเจพ เจตเจฐเจœเจฟเจ† เจนเฉˆเฅค เจ•เจฟเจ‰เจ‚? เจ–เฉˆเจฐ, เจธเจญ เจคเฉ‹เจ‚ เจชเจนเจฟเจฒเจพเจ‚, เจ‡เจน เจ‡เฉฐเจŸเจฐเจซเฉ‡เจธ, เจ•เจฟเจคเฉ‡ 380 เจฆเฉ‡ เจฆเจนเจพเจ•เฉ‡ เจคเฉ‹เจ‚, เจฎเจพเจจเจธเจฟเจ• เจคเฉŒเจฐ 'เจคเฉ‡ เจธเจพเจกเฉ‡ 'เจคเฉ‡ เจฆเจฌเจพเจ… เจชเจพเจ‰เจ‚เจฆเจพ เจนเฉˆ. เจฆเฉ‚เจœเจพ, เจ‡เจน เจ•เฉฐเจŸเจฐเฉˆเจชเจธเจผเจจ เจฌเจนเฉเจค เจนเฉ€ เจธเจผเจพเจจเจฆเจพเจฐ เจชเฉเจฐเจ•เจฟเจฐเจฟเจ†เจตเจพเจ‚, เจ—เฉเฉฑเจธเฉ‡ เจตเจพเจฒเฉ‡ เจนเจฟเฉฑเจธเฉ‡ เจฆเฉ€ เจฎเฉเฉœ เจตเจฐเจคเฉ‹เจ‚ เจ…เจคเฉ‡ เจนเฉ‹เจฐ เจฌเจนเฉเจค เจฎเจนเฉฑเจคเจตเจชเฉ‚เจฐเจจ-เจ‰เจฆเจฎ-เจšเจพเจฒเจพเจ‚ เจฒเจˆ เจคเจฟเจ†เจฐ เจ•เฉ€เจคเจพ เจ—เจฟเจ† เจนเฉˆเฅค เจเจ…เจฐเจฌเฉฑเจธ เจXNUMX / เจธเจพเจฒ เจฆเฉ‡ เจตเจฟเฉฐเจ— เจตเจพเจ‚เจ— เจ‡เจธเจฆเฉ€ เจ•เฉ€เจฎเจค เจ•เฉ€ เจนเฉˆ, เจ…เจธเฉ€เจ‚ เจ•เฉเจ เจจเจนเฉ€เจ‚ เจ•เจนเจพเจ‚เจ—เฉ‡เฅค

    เจธเจพเจตเจงเจพเจจ, เจ‡เฉฑเจ• เจธเจ•เฉเจฐเฉ€เจจเจธเจผเฉŒเจŸ 30 เจธเจพเจฒ เจคเฉ‹เจ‚ เจ˜เฉฑเจŸ เจ‰เจฎเจฐ เจฆเฉ‡ เจฒเฉ‹เจ•เจพเจ‚ เจจเฉ‚เฉฐ เจฅเฉ‹เฉœเจพ เจœเจฟเจนเจพ เจจเฉเจ•เจธเจพเจจ เจชเจนเฉเฉฐเจšเจพ เจธเจ•เจฆเจพ เจนเฉˆ

    เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

  • SQL เจธเจฐเจตเจฐ เจเจ•เฉ€เจ•เจฐเจฃ เจธเจฐเจตเจฐ - เจ…เจธเฉ€เจ‚ เจ†เจชเจฃเฉ‡ เจ‡เฉฐเจŸเจฐเจพ-เจชเฉเจฐเฉ‹เจœเฉˆเจ•เจŸ เจชเฉเจฐเจตเจพเจน เจตเจฟเฉฑเจš เจ‡เจธ เจ•เจพเจฎเจฐเฉ‡เจก เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เฉ€เจคเฉ€เฅค เจ–เฉˆเจฐ, เจตเจพเจธเจคเจต เจตเจฟเฉฑเจš: เจ…เจธเฉ€เจ‚ เจชเจนเจฟเจฒเจพเจ‚ เจนเฉ€ SQL เจธเจฐเจตเจฐ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚, เจ…เจคเฉ‡ เจ‡เจธเจฆเฉ‡ ETL เจธเจพเจงเจจเจพเจ‚ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจจเจพ เจ•เจฐเจจเจพ เจ•เจฟเจธเฉ‡ เจคเจฐเฉเจนเจพเจ‚ เจ—เฉˆเจฐเจตเจพเจœเจฌ เจนเฉ‹เจตเฉ‡เจ—เจพ. เจ‡เจธ เจตเจฟเฉฑเจš เจธเจญ เจ•เฉเจ เจตเจงเฉ€เจ† เจนเฉˆ: เจฆเฉ‹เจตเฉ‡เจ‚ เจ‡เฉฐเจŸเจฐเจซเฉ‡เจธ เจธเฉเฉฐเจฆเจฐ เจนเจจ, เจ…เจคเฉ‡ เจชเฉเจฐเจ—เจคเฉ€ เจฐเจฟเจชเฉ‹เจฐเจŸเจพเจ‚ ... เจชเจฐ เจ‡เจน เจ‡เจธ เจฒเจˆ เจจเจนเฉ€เจ‚ เจนเฉˆ เจ•เจฟ เจ…เจธเฉ€เจ‚ เจธเจพเจซเจŸเจตเฉ‡เจ…เจฐ เจ‰เจคเจชเจพเจฆเจพเจ‚ เจจเฉ‚เฉฐ เจชเจฟเจ†เจฐ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚, เจ“, เจ‡เจธเจฆเฉ‡ เจฒเจˆ เจจเจนเฉ€เจ‚. เจ‡เจธเจฆเจพ เจธเฉฐเจธเจ•เจฐเจฃ dtsx (เจœเฉ‹ เจ•เจฟ เจจเฉ‹เจกเจธ เจจเฉ‚เฉฐ เจธเฉ‡เจต เจ•เจฐเจจ 'เจคเฉ‡ เจฌเจฆเจฒเจฟเจ† เจนเฉ‹เจ‡เจ† XML เจนเฉˆ) เจ…เจธเฉ€เจ‚ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚, เจชเจฐ เจ‡เจธ เจฆเจพ เจ•เฉ€ เจฎเจคเจฒเจฌ เจนเฉˆ? เจ‡เฉฑเจ• เจŸเจพเจธเจ• เจชเฉˆเจ•เฉ‡เจœ เจฌเจฃเจพเจ‰เจฃ เจฌเจพเจฐเฉ‡ เจ•เฉ€ เจนเฉˆ เจœเฉ‹ เจ‡เฉฑเจ• เจธเจฐเจตเจฐ เจคเฉ‹เจ‚ เจฆเฉ‚เจœเฉ‡ เจธเจฐเจตเจฐ เจตเจฟเฉฑเจš เจธเฉˆเจ‚เจ•เฉœเฉ‡ เจŸเฉ‡เจฌเจฒเจพเจ‚ เจจเฉ‚เฉฐ เจ–เจฟเฉฑเจšเฉ‡เจ—เจพ? เจนเจพเจ‚, เจ•เฉ€ เจธเฉŒ, เจคเฉเจนเจพเจกเฉ€ เจ‡เฉฐเจกเฉˆเจ•เจธ เจซเจฟเฉฐเจ—เจฐ เจตเฉ€เจน เจŸเฉเจ•เฉœเจฟเจ†เจ‚ เจคเฉ‹เจ‚ เจกเจฟเฉฑเจ— เจœเจพเจตเฉ‡เจ—เฉ€, เจฎเจพเจŠเจธ เจฌเจŸเจจ 'เจคเฉ‡ เจ•เจฒเจฟเฉฑเจ• เจ•เจฐเฉ‹. เจชเจฐ เจ‡เจน เจฏเจ•เฉ€เจจเฉ€ เจคเฉŒเจฐ 'เจคเฉ‡ เจตเจงเฉ‡เจฐเฉ‡ เจซเฉˆเจธเจผเจจเฉ‡เจฌเจฒ เจฆเจฟเจ–เจพเจˆ เจฆเจฟเฉฐเจฆเจพ เจนเฉˆ:

    เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจ…เจธเฉ€เจ‚ เจจเจฟเจธเจผเจšเจค เจคเฉŒเจฐ 'เจคเฉ‡ เจฌเจพเจนเจฐ เจฆเฉ‡ เจคเจฐเฉ€เจ•เจฟเจ†เจ‚ เจฆเฉ€ เจญเจพเจฒ เจ•เฉ€เจคเฉ€. เจ•เฉ‡เจธ เจตเฉ€ เจฒเจ—เจญเจ— เจฒเจ—เจญเจ— เจ‡เฉฑเจ• เจธเจตเฉˆ-เจฒเจฟเจ–เจค SSIS เจชเฉˆเจ•เฉ‡เจœ เจœเจจเจฐเฉ‡เจŸเจฐ เจคเฉ‡ เจ†เจ‡เจ† ...

โ€ฆเจ…เจคเฉ‡ เจซเจฟเจฐ เจฎเฉˆเจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจจเจตเฉ€เจ‚ เจจเฉŒเจ•เจฐเฉ€ เจฎเจฟเจฒเฉ€เฅค เจ…เจคเฉ‡ เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹ เจจเฉ‡ เจฎเฉˆเจจเฉ‚เฉฐ เจ‡เจธ 'เจคเฉ‡ เจชเจ›เจพเฉœ เจฆเจฟเฉฑเจคเจพ.

เจœเจฆเฉ‹เจ‚ เจฎเฉˆเจจเฉ‚เฉฐ เจชเจคเจพ เจฒเฉฑเจ—เจพ เจ•เจฟ ETL เจชเฉเจฐเจ•เจฟเจฐเจฟเจ† เจฆเฉ‡ เจตเจฐเจฃเจจ เจธเจงเจพเจฐเจจ เจชเจพเจˆเจฅเจจ เจ•เฉ‹เจก เจนเจจ, เจคเจพเจ‚ เจฎเฉˆเจ‚ เจ–เฉเจธเจผเฉ€ เจฒเจˆ เจจเฉฑเจšเจฟเจ† เจจเจนเฉ€เจ‚ เจธเฉ€. เจ‡เจธ เจคเจฐเฉเจนเจพเจ‚ เจกเฉ‡เจŸเจพ เจธเจŸเฉเจฐเฉ€เจฎเจพเจ‚ เจฆเจพ เจธเฉฐเจธเจ•เจฐเจฃ เจ…เจคเฉ‡ เจตเฉฑเจ–เจฐเจพ เจ•เฉ€เจคเจพ เจ—เจฟเจ† เจธเฉ€, เจ…เจคเฉ‡ เจธเฉˆเจ‚เจ•เฉœเฉ‡ เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจคเฉ‹เจ‚ เจ‡เฉฑเจ• เจธเจฟเฉฐเจ—เจฒ เจขเจพเจ‚เจšเฉ‡ เจฆเฉ‡ เจจเจพเจฒ เจŸเฉ‡เจฌเจฒ เจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจŸเฉ€เจšเฉ‡ เจตเจฟเฉฑเจš เจชเจพเจ‰เจฃเจพ เจกเฉ‡เจข เจœเจพเจ‚ เจฆเฉ‹ 13 โ€เจธเจ•เฉเจฐเฉ€เจจเจพเจ‚ เจตเจฟเฉฑเจš เจชเจพเจˆเจฅเจจ เจ•เฉ‹เจก เจฆเจพ เจฎเจพเจฎเจฒเจพ เจฌเจฃ เจ—เจฟเจ† เจธเฉ€เฅค

เจ•เจฒเฉฑเจธเจŸเจฐ เจจเฉ‚เฉฐ เจ‡เจ•เฉฑเจ เจพ เจ•เจฐเจจเจพ

เจ†เจ“ เจ‡เฉฑเจ• เจชเฉ‚เจฐเฉ€ เจคเจฐเฉเจนเจพเจ‚ เจ•เจฟเฉฐเจกเจฐเจ—เจพเจฐเจŸเจจ เจฆเจพ เจชเฉเจฐเจฌเฉฐเจง เจจเจพ เจ•เจฐเฉ€เจ, เจ…เจคเฉ‡ เจ‡เฉฑเจฅเฉ‡ เจชเฉ‚เจฐเฉ€ เจคเจฐเฉเจนเจพเจ‚ เจธเจชเฉฑเจธเจผเจŸ เจšเฉ€เจœเจผเจพเจ‚ เจฌเจพเจฐเฉ‡ เจ—เฉฑเจฒ เจจเจพ เจ•เจฐเฉ€เจ, เจœเจฟเจตเฉ‡เจ‚ เจ•เจฟ เจเจ…เจฐเจซเจฒเฉ‹ เจจเฉ‚เฉฐ เจธเจฅเจพเจชเจฟเจค เจ•เจฐเจจเจพ, เจคเฉเจนเจพเจกเฉ‡ เจšเฉเจฃเฉ‡ เจนเฉ‹เจ เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ, เจธเฉˆเจฒเจฐเฉ€ เจ…เจคเฉ‡ เจกเฉŒเจ•เจธ เจตเจฟเฉฑเจš เจตเจฐเจฃเจฟเจค เจนเฉ‹เจฐ เจ•เฉ‡เจธเฅค

เจคเจพเจ‚ เจœเฉ‹ เจ…เจธเฉ€เจ‚ เจคเฉเจฐเฉฐเจค เจชเฉเจฐเจฏเฉ‹เจ— เจธเจผเฉเจฐเฉ‚ เจ•เจฐ เจธเจ•เฉ€เจ, เจฎเฉˆเจ‚ เจธเจ•เฉˆเจš เจ•เฉ€เจคเจพ docker-compose.yml เจœเจฟเจธ เจตเจฟเฉฑเจš:

  • เจฆเฉ‡ เจ…เจธเจฒ เจตเจฟเฉฑเจš เจ‰เจ เจพเจ‰เจฃ เจ•เจฐเฉ€เจ airflow: เจธเจผเจกเจฟเจŠเจฒเจฐ, เจตเฉˆเจฌเจธเจฐเจตเจฐเฅค เจธเฉˆเจฒเจฐเฉ€ เจฆเฉ‡ เจ•เฉฐเจฎเจพเจ‚ เจฆเฉ€ เจจเจฟเจ—เจฐเจพเจจเฉ€ เจ•เจฐเจจ เจฒเจˆ เจซเจฒเจพเจตเจฐ เจตเฉ€ เจ‰เฉฑเจฅเฉ‡ เจ˜เฉเฉฐเจฎ เจฐเจฟเจนเจพ เจนเฉ‹เจตเฉ‡เจ—เจพ (เจ•เจฟเจ‰เจ‚เจ•เจฟ เจ‡เจน เจชเจนเจฟเจฒเจพเจ‚ เจนเฉ€ เจ‡เจธ เจตเจฟเฉฑเจš เจงเฉฑเจ•เจฟเจ† เจœเจพ เจšเฉเฉฑเจ•เจพ เจนเฉˆ apache/airflow:1.10.10-python3.7, เจชเจฐ เจธเจพเจจเฉ‚เฉฐ เจ•เฉ‹เจˆ เจ‡เจคเจฐเจพเจœเจผ เจจเจนเฉ€เจ‚ เจนเฉˆ)
  • PostgreSQL, เจœเจฟเจธ เจตเจฟเฉฑเจš เจเจ…เจฐเจซเจฒเฉ‹ เจ†เจชเจฃเฉ€ เจธเฉ‡เจตเจพ เจœเจพเจฃเจ•เจพเจฐเฉ€ (เจธเจผเจกเจฟเจŠเจฒเจฐ เจกเฉ‡เจŸเจพ, เจเจ—เจœเจผเฉ€เจ•เจฟเจŠเจธเจผเจจ เจธเจŸเฉˆเจŸเจฟเจธเจŸเจฟเจ•เจธ, เจ†เจฆเจฟ) เจฒเจฟเจ–เฉ‡เจ—เจพ, เจ…เจคเฉ‡ เจธเฉˆเจฒเจฐเฉ€ เจชเฉ‚เจฐเฉ‡ เจ•เฉ€เจคเฉ‡ เจ•เฉฐเจฎเจพเจ‚ เจจเฉ‚เฉฐ เจšเจฟเฉฐเจจเฉเจนเจฟเจค เจ•เจฐเฉ‡เจ—เฉ€;
  • เจฐเฉ‡เจกเจฟเจธ, เจœเฉ‹ เจธเฉˆเจฒเจฐเฉ€ เจฒเจˆ เจŸเจพเจธเจ• เจฌเฉเจฐเฉ‹เจ•เจฐ เจตเจœเฉ‹เจ‚ เจ•เฉฐเจฎ เจ•เจฐเฉ‡เจ—เจพ;
  • เจธเฉˆเจฒเจฐเฉ€ เจตเจฐเจ•เจฐ, เจœเฉ‹ เจ•เจฟ เจ•เฉฐเจฎเจพเจ‚ เจฆเฉ‡ เจธเจฟเฉฑเจงเฉ‡ เจเจ—เจœเจผเฉ€เจ•เจฟเจŠเจธเจผเจจ เจตเจฟเฉฑเจš เจฒเฉฑเจ—เฉ‡เจ—เจพเฅค
  • เจซเฉ‹เจฒเจกเจฐ เจจเฉ‚เฉฐ ./dags เจ…เจธเฉ€เจ‚ เจ†เจชเจฃเฉ€เจ†เจ‚ เจซเจพเจˆเจฒเจพเจ‚ เจจเฉ‚เฉฐ เจกเฉˆเจ—เจธ เจฆเฉ‡ เจตเจฐเจฃเจจ เจจเจพเจฒ เจœเฉ‹เฉœเจพเจ‚เจ—เฉ‡เฅค เจ‰เจนเจจเจพเจ‚ เจจเฉ‚เฉฐ เจ‰เฉฑเจกเจฃ 'เจคเฉ‡ เจšเฉเฉฑเจ•เจฟเจ† เจœเจพเจตเฉ‡เจ—เจพ, เจ‡เจธ เจฒเจˆ เจนเจฐ เจ›เจฟเฉฑเจ• เจฆเฉ‡ เจฌเจพเจ…เจฆ เจชเฉ‚เจฐเฉ‡ เจธเจŸเฉˆเจ• เจจเฉ‚เฉฐ เจœเฉเจ—เจฒ เจ•เจฐเจจ เจฆเฉ€ เจ•เฉ‹เจˆ เจฒเฉ‹เฉœ เจจเจนเฉ€เจ‚ เจนเฉˆเฅค

เจ•เฉเจ เจฅเจพเจตเจพเจ‚ 'เจคเฉ‡, เจ‰เจฆเจพเจนเจฐเจจเจพเจ‚ เจตเจฟเฉฑเจš เจ•เฉ‹เจก เจชเฉ‚เจฐเฉ€ เจคเจฐเฉเจนเจพเจ‚ เจจเจนเฉ€เจ‚ เจฆเจฟเจ–เจพเจ‡เจ† เจ—เจฟเจ† เจนเฉˆ (เจคเจพเจ‚ เจ•เจฟ เจŸเฉˆเจ•เจธเจŸ เจตเจฟเฉฑเจš เจ—เฉœเจฌเฉœ เจจเจพ เจนเฉ‹เจตเฉ‡), เจชเจฐ เจ•เจฟเจคเฉ‡ เจ‡เจน เจชเฉเจฐเจ•เจฟเจฐเจฟเจ† เจตเจฟเฉฑเจš เจธเฉ‹เจงเจฟเจ† เจ—เจฟเจ† เจนเฉˆเฅค เจธเฉฐเจชเฉ‚เจฐเจจ เจ•เจพเจฐเจœ เจ•เฉ‹เจก เจ‰เจฆเจพเจนเจฐเจจเจพเจ‚ เจฐเจฟเจชเฉ‹เจœเจผเจŸเจฐเฉ€ เจตเจฟเฉฑเจš เจฒเฉฑเจญเฉ€เจ†เจ‚ เจœเจพ เจธเจ•เจฆเฉ€เจ†เจ‚ เจนเจจ https://github.com/dm-logv/airflow-tutorial.

เจกเฉŒเจ•เจฐ-เจ•เฉฐเจชเฉ‹เจœเจผ.เจ†เจˆ.เจเจฎ.เจเจฒ.

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

เจŸเจฟเฉฑเจชเจฃเฉ€:

  • เจฐเจšเจจเจพ เจฆเฉ€ เจ…เจธเฉˆเจ‚เจฌเจฒเฉ€ เจตเจฟเฉฑเจš, เจฎเฉˆเจ‚ เจ•เจพเจซเจผเฉ€ เจนเฉฑเจฆ เจคเฉฑเจ• เจœเจพเจฃเฉ‡-เจชเจ›เจพเจฃเฉ‡ เจšเจฟเฉฑเจคเจฐ 'เจคเฉ‡ เจญเจฐเฉ‹เจธเจพ เจ•เฉ€เจคเจพ puckel/docker- airflow - เจ‡เจธเจฆเฉ€ เจœเจพเจ‚เจš เจ•เจฐเจจเจพ เจฏเจ•เฉ€เจจเฉ€ เจฌเจฃเจพเจ“เฅค เจนเฉ‹ เจธเจ•เจฆเจพ เจนเฉˆ เจ•เจฟ เจคเฉเจนเจพเจจเฉ‚เฉฐ เจ†เจชเจฃเฉ€ เจœเจผเจฟเฉฐเจฆเจ—เฉ€ เจตเจฟเฉฑเจš เจนเฉ‹เจฐ เจ•เจฟเจธเฉ‡ เจšเฉ€เจœเจผ เจฆเฉ€ เจฒเฉ‹เฉœ เจจเจพ เจนเฉ‹เจตเฉ‡เฅค
  • เจธเจพเจฐเฉ€เจ†เจ‚ เจเจ…เจฐเจซเจฒเฉ‹ เจธเฉˆเจŸเจฟเฉฐเจ—เจพเจ‚ เจจเจพ เจธเจฟเจฐเจซเจผ เจฐเจพเจนเฉ€เจ‚ เจ‰เจชเจฒเจฌเจง เจนเจจ airflow.cfg, เจชเจฐ เจตเจพเจคเจพเจตเจฐเจฃ เจตเฉ‡เจฐเฉ€เจเจฌเจฒ (เจกเจฟเจตเฉˆเจฒเจชเจฐเจพเจ‚ เจฆเจพ เจงเฉฐเจจเจตเจพเจฆ) เจฆเฉเจ†เจฐเจพ เจตเฉ€, เจœเจฟเจธเจฆเจพ เจฎเฉˆเจ‚ เจ—เจฒเจค เจคเจฐเฉ€เจ•เฉ‡ เจจเจพเจฒ เจซเจพเจ‡เจฆเจพ เจ‰เจ เจพเจ‡เจ†เฅค
  • เจ•เฉเจฆเจฐเจคเฉ€ เจคเฉŒเจฐ 'เจคเฉ‡, เจ‡เจน เจ‰เจคเจชเจพเจฆเจจ เจฒเจˆ เจคเจฟเจ†เจฐ เจจเจนเฉ€เจ‚ เจนเฉˆ: เจฎเฉˆเจ‚ เจœเจพเจฃเจฌเฉเฉฑเจ เจ•เฉ‡ เจ•เฉฐเจŸเฉ‡เจจเจฐเจพเจ‚ 'เจคเฉ‡ เจฆเจฟเจฒ เจฆเฉ€ เจงเฉœเจ•เจฃ เจจเจนเฉ€เจ‚ เจชเจพเจˆ, เจฎเฉˆเจ‚ เจธเฉเจฐเฉฑเจ–เจฟเจ† เจจเจพเจฒ เจชเจฐเฉ‡เจธเจผเจพเจจ เจจเจนเฉ€เจ‚ เจ•เฉ€เจคเจพ. เจชเจฐ เจฎเฉˆเจ‚ เจธเจพเจกเฉ‡ เจชเฉเจฐเจฏเฉ‹เจ— เจ•เจฐเจจ เจตเจพเจฒเจฟเจ†เจ‚ เจฒเจˆ เจ˜เฉฑเจŸเฉ‹-เจ˜เฉฑเจŸ เจขเฉเจ•เจตเจพเจ‚ เจ•เฉ€เจคเจพเฅค
  • เจจเฉ‹เจŸ เจ•เจฐเฉ‹:
    • เจกเฉˆเจ— เจซเฉ‹เจฒเจกเจฐ เจธเจผเจกเจฟเจŠเจฒเจฐ เจ…เจคเฉ‡ เจตเจฐเจ•เจฐเจพเจ‚ เจฆเฉ‹เจตเจพเจ‚ เจฒเจˆ เจชเจนเฉเฉฐเจšเจฏเฉ‹เจ— เจนเฉ‹เจฃเจพ เจšเจพเจนเฉ€เจฆเจพ เจนเฉˆเฅค
    • เจ‡เจนเฉ€ เจธเจพเจฐเฉ€เจ†เจ‚ เจคเฉ€เจœเฉ€-เจงเจฟเจฐ เจฒเจพเจ‡เจฌเฉเจฐเฉ‡เจฐเฉ€เจ†เจ‚ 'เจคเฉ‡ เจฒเจพเจ—เฉ‚ เจนเฉเฉฐเจฆเจพ เจนเฉˆ - เจ‰เจน เจธเจพเจฐเฉ€เจ†เจ‚ เจฎเจธเจผเฉ€เจจเจพเจ‚ 'เจคเฉ‡ เจ‡เฉฑเจ• เจธเจผเจกเจฟเจŠเจฒเจฐ เจ…เจคเฉ‡ เจตเจฐเจ•เจฐเจพเจ‚ เจจเจพเจฒ เจธเจฅเจพเจชเจฟเจค เจนเฉ‹เจฃเฉ€เจ†เจ‚ เจšเจพเจนเฉ€เจฆเฉ€เจ†เจ‚ เจนเจจเฅค

เจ–เฉˆเจฐ, เจนเฉเจฃ เจ‡เจน เจธเจงเจพเจฐเจจ เจนเฉˆ:

$ docker-compose up --scale worker=3

เจธเจญ เจ•เฉเจ เจตเจงเจฃ เจคเฉ‹เจ‚ เจฌเจพเจ…เจฆ, เจคเฉเจธเฉ€เจ‚ เจตเฉˆเฉฑเจฌ เจ‡เฉฐเจŸเจฐเจซเฉ‡เจธเจพเจ‚ เจจเฉ‚เฉฐ เจฆเฉ‡เจ– เจธเจ•เจฆเฉ‡ เจนเฉ‹:

เจฌเฉ‡เจธเจฟเจ• เจงเจพเจฐเจจเจพ

เจœเฉ‡ เจคเฉเจธเฉ€เจ‚ เจ‡เจนเจจเจพเจ‚ เจธเจพเจฐเฉ‡ "เจกเฉˆเจ—เจœเจผ" เจตเจฟเฉฑเจš เจ•เฉเจ เจจเจนเฉ€เจ‚ เจธเจฎเจเจฟเจ†, เจคเจพเจ‚ เจ‡เฉฑเจฅเฉ‡ เจ‡เฉฑเจ• เจ›เฉ‹เจŸเจพ เจธเจผเจฌเจฆเจ•เฉ‹เจธเจผ เจนเฉˆ:

  • เจธเฉˆเจกเจฟเจŠเจฒเจฐ - เจเจ…เจฐเจซเจฒเฉ‹ เจตเจฟเฉฑเจš เจธเจญ เจคเฉ‹เจ‚ เจฎเจนเฉฑเจคเจตเจชเฉ‚เจฐเจจ เจ…เฉฐเจ•เจฒ, เจ‡เจน เจจเจฟเจฏเฉฐเจคเจฐเจฟเจค เจ•เจฐเจจเจพ เจ•เจฟ เจฐเฉ‹เจฌเฉ‹เจŸ เจธเจ–เจค เจฎเจฟเจนเจจเจค เจ•เจฐเจฆเฉ‡ เจนเจจ, เจจเจพ เจ•เจฟ เจ‡เฉฑเจ• เจตเจฟเจ…เจ•เจคเฉ€: เจธเจฎเจพเจ‚-เจธเฉ‚เจšเฉ€ เจฆเฉ€ เจจเจฟเจ—เจฐเจพเจจเฉ€ เจ•เจฐเจฆเจพ เจนเฉˆ, เจกเฉˆเจ—เจœเจผ เจจเฉ‚เฉฐ เจ…เจชเจกเฉ‡เจŸ เจ•เจฐเจฆเจพ เจนเฉˆ, เจ•เฉฐเจฎ เจธเจผเฉเจฐเฉ‚ เจ•เจฐเจฆเจพ เจนเฉˆเฅค

    เจ†เจฎ เจคเฉŒเจฐ 'เจคเฉ‡, เจชเฉเจฐเจพเจฃเฉ‡ เจธเฉฐเจธเจ•เจฐเจฃเจพเจ‚ เจตเจฟเฉฑเจš, เจ‰เจธเจจเฉ‚เฉฐ เจฏเจพเจฆเจฆเจพเจธเจผเจค เจจเจพเจฒ เจธเจฎเฉฑเจธเจฟเจ†เจตเจพเจ‚ เจธเจจ (เจจเจนเฉ€เจ‚, เจเจฎเจจเฉ‡เจธเจผเฉ€เจ† เจจเจนเฉ€เจ‚, เจชเจฐ เจฒเฉ€เจ•) เจ…เจคเฉ‡ เจตเจฟเจฐเจพเจธเจคเฉ€ เจชเฉˆเจฐเจพเจฎเฉ€เจŸเจฐ เจธเฉฐเจฐเจšเจจเจพ เจตเจฟเฉฑเจš เจตเฉ€ เจฐเจฟเจนเจพเฅค run_duration - เจ‡เจธเจฆเจพ เจฎเฉเฉœ-เจšเจพเจฒเฉ‚ เจ…เฉฐเจคเจฐเจพเจฒเฅค เจชเจฐ เจนเฉเจฃ เจธเจญ เจ•เฉเจ เจ เฉ€เจ• เจนเฉˆเฅค

  • เจกเฉ€.เจ.เจœเฉ€. (เจ‰เจฐเจซเจผ "เจกเฉˆเจ—") - "เจกเจพเจ‡เจฐเฉˆเจ•เจŸเจก เจเจธเฉ€เจ•เจฒเจฟเจ• เจ—เฉเจฐเจพเจซ", เจชเจฐ เจ…เจœเจฟเจนเฉ€ เจชเจฐเจฟเจญเจพเจธเจผเจพ เจ•เฉเจ เจฒเฉ‹เจ•เจพเจ‚ เจจเฉ‚เฉฐ เจฆเฉฑเจธเฉ‡เจ—เฉ€, เจชเจฐ เจ…เจธเจฒ เจตเจฟเฉฑเจš เจ‡เจน เจ‡เฉฑเจ• เจฆเฉ‚เจœเฉ‡ เจจเจพเจฒ เจ‡เฉฐเจŸเจฐเฉˆเจ•เจŸ เจ•เจฐเจจ เจตเจพเจฒเฉ‡ เจ•เฉฐเจฎเจพเจ‚ เจฒเจˆ เจ‡เฉฑเจ• เจ•เฉฐเจŸเฉ‡เจจเจฐ เจนเฉˆ (เจนเฉ‡เจ เจพเจ‚ เจฆเฉ‡เจ–เฉ‹) เจœเจพเจ‚ SSIS เจตเจฟเฉฑเจš เจชเฉˆเจ•เฉ‡เจœ เจฆเจพ เจเจจเจพเจฒเจพเจ— เจ…เจคเฉ‡ เจ‡เจจเจซเจพเจฐเจฎเฉˆเจŸเจฟเจ•เจพ เจตเจฟเฉฑเจš เจตเจฐเจ•เจซเจฒเฉ‹ .

    เจกเฉˆเจ—เจธ เจคเฉ‹เจ‚ เจ‡เจฒเจพเจตเจพ, เจ…เจœเฉ‡ เจตเฉ€ เจธเจฌเจกเฉˆเจ— เจนเฉ‹ เจธเจ•เจฆเฉ‡ เจนเจจ, เจชเจฐ เจ…เจธเฉ€เจ‚ เจธเฉฐเจญเจพเจตเจค เจคเฉŒเจฐ 'เจคเฉ‡ เจ‰เจจเฉเจนเจพเจ‚ เจคเฉฑเจ• เจจเจนเฉ€เจ‚ เจชเจนเฉเฉฐเจšเจพเจ‚เจ—เฉ‡เฅค

  • เจกเฉ€เจเจœเฉ€ เจฐเจจ - เจธเจผเฉเจฐเฉ‚เจ†เจคเฉ€ เจกเฉˆเจ—, เจœเจฟเจธ เจจเฉ‚เฉฐ เจ†เจชเจฃเจพ เจจเจฟเจฐเจงเจพเจฐเจค เจ•เฉ€เจคเจพ เจ—เจฟเจ† เจนเฉˆ execution_date. เจ‡เฉฑเจ•เฉ‹ เจกเฉˆเจ— เจฆเฉ‡ เจกเฉˆเจ—เจฐเจจ เจธเจฎเจพเจจเจพเจ‚เจคเจฐ เจตเจฟเฉฑเจš เจ•เฉฐเจฎ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเจจ (เจœเฉ‡ เจคเฉเจธเฉ€เจ‚ เจ†เจชเจฃเฉ‡ เจ•เจพเจฐเจœเจพเจ‚ เจจเฉ‚เฉฐ เจฌเฉ‡เจธเจผเจ•, เจฌเฉ‡เจธเจผเจ•) เจฌเจฃเจพเจ‡เจ† เจนเฉˆเฅค
  • เจ“เจชเจฐเฉ‡เจŸเจฐ เจ‡เฉฑเจ• เจ–เจพเจธ เจ•เจพเจฐเจตเจพเจˆ เจ•เจฐเจจ เจฒเจˆ เจœเจผเจฟเฉฐเจฎเฉ‡เจตเจพเจฐ เจ•เฉ‹เจก เจฆเฉ‡ เจŸเฉเจ•เฉœเฉ‡ เจนเจจเฅค เจ“เจชเจฐเฉ‡เจŸเจฐเจพเจ‚ เจฆเฉ€เจ†เจ‚ เจคเจฟเฉฐเจจ เจ•เจฟเจธเจฎเจพเจ‚ เจนเจจ:
    • เจ•เจพเจฐเจตเจพเจˆเจธเจพเจกเฉ‡ เจฎเจจเจชเจธเฉฐเจฆ เจตเจพเจ‚เจ— PythonOperator, เจœเฉ‹ เจ•เจฟเจธเฉ‡ เจตเฉ€ (เจตเฉˆเจง) เจชเจพเจˆเจฅเจจ เจ•เฉ‹เจก เจจเฉ‚เฉฐ เจšเจฒเจพ เจธเจ•เจฆเจพ เจนเฉˆ;
    • เจŸเฉเจฐเจพเจ‚เจธเจซเจฐ, เจœเฉ‹ เจกเฉ‡เจŸเจพ เจจเฉ‚เฉฐ เจฅเจพเจ‚-เจฅเจพเจ‚ เจŸเฉเจฐเจพเจ‚เจธเจชเฉ‹เจฐเจŸ เจ•เจฐเจฆเฉ‡ เจนเจจ, เจ•เจนเจฟเฉฐเจฆเฉ‡ เจนเจจ, MsSqlToHiveTransfer;
    • เจธเฉ‚เจšเจ• เจฆเฉ‚เจœเฉ‡ เจชเจพเจธเฉ‡, เจ‡เจน เจคเฉเจนเจพเจจเฉ‚เฉฐ เจ•เฉ‹เจˆ เจ˜เจŸเจจเจพ เจตเจพเจชเจฐเจจ เจคเฉฑเจ• เจกเฉˆเจ— เจฆเฉ‡ เจ…เจ—เจฒเฉ‡ เจเจ—เจœเจผเฉ€เจ•เจฟเจŠเจธเจผเจจ เจจเฉ‚เฉฐ เจชเฉเจฐเจคเฉ€เจ•เจฟเจฐเจฟเจ† เจ•เจฐเจจ เจœเจพเจ‚ เจนเฉŒเจฒเฉ€ เจ•เจฐเจจ เจฆเฉ€ เจ‡เจœเจพเจœเจผเจค เจฆเฉ‡เจตเฉ‡เจ—เจพเฅค HttpSensor เจจเจฟเจฐเจงเจพเจฐเจค เจ…เฉฐเจคเจฎ เจฌเจฟเฉฐเจฆเฉ‚ เจจเฉ‚เฉฐ เจ–เจฟเฉฑเจš เจธเจ•เจฆเจพ เจนเฉˆ, เจ…เจคเฉ‡ เจœเจฆเฉ‹เจ‚ เจฒเฉ‹เฉœเฉ€เจ‚เจฆเจพ เจœเจตเจพเจฌ เจ‰เจกเฉ€เจ• เจ•เจฐ เจฐเจฟเจนเจพ เจนเฉ‹เจตเฉ‡, เจŸเฉเจฐเจพเจ‚เจธเจซเจฐ เจธเจผเฉเจฐเฉ‚ เจ•เจฐเฉ‹ GoogleCloudStorageToS3Operator. เจ‡เฉฑเจ• เจ–เฉ‹เจœเฉ€ เจฎเจจ เจชเฉเฉฑเจ›เฉ‡เจ—เจพ: โ€œเจ•เจฟเจ‰เจ‚? เจ†เจ–เจฐเจ•เจพเจฐ, เจคเฉเจธเฉ€เจ‚ เจ†เจชเจฐเฉ‡เจŸเจฐ เจตเจฟเฉฑเจš เจนเฉ€ เจฆเฉเจนเจฐเจพเจ“ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹!โ€ เจ…เจคเฉ‡ เจซเจฟเจฐ, เจฎเฉเจ…เฉฑเจคเจฒ เจ•เฉ€เจคเฉ‡ เจ“เจชเจฐเฉ‡เจŸเจฐเจพเจ‚ เจฆเฉ‡ เจจเจพเจฒ เจ•เจพเจฐเจœเจพเจ‚ เจฆเฉ‡ เจชเฉ‚เจฒ เจจเฉ‚เฉฐ เจฌเฉฐเจฆ เจจเจพ เจ•เจฐเจจ เจฒเจˆ. เจ…เจ—เจฒเฉ€ เจ•เฉ‹เจธเจผเจฟเจธเจผ เจคเฉ‹เจ‚ เจชเจนเจฟเจฒเจพเจ‚ เจธเฉˆเจ‚เจธเจฐ เจธเจผเฉเจฐเฉ‚ เจนเฉเฉฐเจฆเจพ เจนเฉˆ, เจœเจพเจ‚เจš เจ•เจฐเจฆเจพ เจนเฉˆ เจ…เจคเฉ‡ เจฎเจฐ เจœเจพเจ‚เจฆเจพ เจนเฉˆเฅค
  • เจŸเจพเจธเจ• - เจ˜เฉ‹เจธเจผเจฟเจค เจ“เจชเจฐเฉ‡เจŸเจฐ, เจ•เจฟเจธเจฎ เจฆเฉ€ เจชเจฐเจตเจพเจน เจ•เฉ€เจคเฉ‡ เจฌเจฟเจจเจพเจ‚, เจ…เจคเฉ‡ เจกเฉˆเจ— เจจเจพเจฒ เจœเฉเฉœเฉ‡ เจนเฉ‹เจ เจจเฉ‚เฉฐ เจ•เจพเจฐเจœ เจฆเฉ‡ เจฆเจฐเจœเฉ‡ 'เจคเฉ‡ เจคเจฐเฉฑเจ•เฉ€ เจฆเจฟเฉฑเจคเฉ€ เจœเจพเจ‚เจฆเฉ€ เจนเฉˆเฅค
  • เจ•เจพเจฐเจœ เจ‰เจฆเจพเจนเจฐเจจ - เจœเจฆเฉ‹เจ‚ เจ†เจฎ เจฏเฉ‹เจœเจจเจพเจ•เจพเจฐ เจจเฉ‡ เจซเฉˆเจธเจฒเจพ เจ•เฉ€เจคเจพ เจ•เจฟ เจ‡เจน เจ•เฉฐเจฎ เจ•เจฐเจจ เจตเจพเจฒเฉ‡-เจ•เจฐเจฎเจšเจพเจฐเฉ€เจ†เจ‚ เจจเฉ‚เฉฐ เจฒเฉœเจพเจˆ เจตเจฟเฉฑเจš เจญเฉ‡เจœเจฃ เจฆเจพ เจธเจฎเจพเจ‚ เจนเฉˆ (เจธเจนเฉ€ เจฎเฉŒเจ•เฉ‡ 'เจคเฉ‡, เจœเฉ‡ เจ…เจธเฉ€เจ‚ เจตเจฐเจคเจฆเฉ‡ เจนเจพเจ‚ LocalExecutor เจœเจพเจ‚ เจฆเฉ‡ เจฎเจพเจฎเจฒเฉ‡ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจฐเจฟเจฎเฉ‹เจŸ เจจเฉ‹เจก เจคเฉฑเจ• CeleryExecutor), เจ‡เจน เจ‰เจนเจจเจพเจ‚ เจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจธเฉฐเจฆเจฐเจญ เจจเจฟเจฐเจงเจพเจฐเจค เจ•เจฐเจฆเจพ เจนเฉˆ (เจ…เจฐเจฅเจพเจค, เจตเฉ‡เจฐเฉ€เจเจฌเจฒเจพเจ‚ เจฆเจพ เจ‡เฉฑเจ• เจธเฉˆเฉฑเจŸ - เจเจ—เจœเจผเฉ€เจ•เจฟเจŠเจธเจผเจจ เจชเฉˆเจฐเจพเจฎเฉ€เจŸเจฐ), เจ•เจฎเจพเจ‚เจก เจœเจพเจ‚ เจชเฉเฉฑเจ›เจ—เจฟเฉฑเจ› เจŸเฉˆเจ‚เจชเจฒเฉ‡เจŸเจธ เจฆเจพ เจตเจฟเจธเจคเจพเจฐ เจ•เจฐเจฆเจพ เจนเฉˆ, เจ…เจคเฉ‡ เจ‰เจนเจจเจพเจ‚ เจจเฉ‚เฉฐ เจชเฉ‚เจฒ เจ•เจฐเจฆเจพ เจนเฉˆเฅค

เจ…เจธเฉ€เจ‚ เจ•เจพเจฐเจœ เจคเจฟเจ†เจฐ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚

เจชเจนเจฟเจฒเจพเจ‚, เจ†เจ“ เจ†เจชเจฃเฉ‡ เจกเฉŒเจ— เจฆเฉ€ เจ†เจฎ เจธเจ•เฉ€เจฎ เจฆเฉ€ เจฐเฉ‚เจชเจฐเฉ‡เจ–เจพ เจ•เจฐเฉ€เจ, เจ…เจคเฉ‡ เจซเจฟเจฐ เจ…เจธเฉ€เจ‚ เจตเจงเฉ‡เจฐเฉ‡ เจ…เจคเฉ‡ เจนเฉ‹เจฐ เจตเฉ‡เจฐเจตเจฟเจ†เจ‚ เจตเจฟเฉฑเจš เจกเฉเจฌเจ•เฉ€ เจฒเจตเจพเจ‚เจ—เฉ‡, เจ•เจฟเจ‰เจ‚เจ•เจฟ เจ…เจธเฉ€เจ‚ เจ•เฉเจ เจ—เฉˆเจฐ-เจฎเจพเจฎเฉ‚เจฒเฉ€ เจนเฉฑเจฒ เจฒเจพเจ—เฉ‚ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚.

เจ‡เจธ เจฒเจˆ, เจ‡เจธเจฆเฉ‡ เจธเจฐเจฒ เจฐเฉ‚เจช เจตเจฟเฉฑเจš, เจ…เจœเจฟเจนเจพ เจกเฉˆเจ— เจ‡เจธ เจคเจฐเฉเจนเจพเจ‚ เจฆเจฟเจ–เจพเจˆ เจฆเฉ‡เจตเฉ‡เจ—เจพ:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

เจ†เจ“ เจ‡เจธเจฆเจพ เจชเจคเจพ เจ•เจฐเฉ€เจ:

  • เจชเจนเจฟเจฒเจพเจ‚, เจ…เจธเฉ€เจ‚ เจฒเฉ‹เฉœเฉ€เจ‚เจฆเฉ‡ libs เจ…เจคเฉ‡ เจ†เจฏเจพเจค เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚ เจ•เฉเจ เจนเฉ‹เจฐ;
  • sql_server_ds เจนเฉˆ List[namedtuple[str, str]] เจเจ…เจฐเจซเจฒเฉ‹ เจ•เจจเฉˆเจ•เจธเจผเจจเจพเจ‚ เจฆเฉ‡ เจ•เจจเฉˆเจ•เจธเจผเจจเจพเจ‚ เจฆเฉ‡ เจจเจพเจตเจพเจ‚ เจ…เจคเฉ‡ เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจฆเฉ‡ เจจเจพเจฒ เจœเจฟเจจเฉเจนเจพเจ‚ เจคเฉ‹เจ‚ เจ…เจธเฉ€เจ‚ เจ†เจชเจฃเฉ€ เจชเจฒเฉ‡เจŸ เจฒเจตเจพเจ‚เจ—เฉ‡;
  • dag - เจธเจพเจกเฉ‡ เจกเฉ‡เจ— เจฆเฉ€ เจ˜เฉ‹เจธเจผเจฃเจพ, เจœเฉ‹ เจœเจผเจฐเฉ‚เจฐเฉ€ เจคเฉŒเจฐ 'เจคเฉ‡ เจนเฉ‹เจฃเฉ€ เจšเจพเจนเฉ€เจฆเฉ€ เจนเฉˆ globals(), เจจเจนเฉ€เจ‚ เจคเจพเจ‚ เจเจ…เจฐเจซเจฒเฉ‹ เจ‡เจธ เจจเฉ‚เฉฐ เจจเจนเฉ€เจ‚ เจฒเฉฑเจญ เจธเจ•เฉ‡เจ—เจพเฅค เจกเฉฑเจ— เจจเฉ‚เฉฐ เจ‡เจน เจตเฉ€ เจ•เจนเจฟเจฃ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉˆ:
    • เจ‰เจธเจฆเจพ เจจเจพเจฎ เจ•เฉ€ เจนเฉˆ orders - เจ‡เจน เจจเจพเจฎ เจซเจฟเจฐ เจตเฉˆเฉฑเจฌ เจ‡เฉฐเจŸเจฐเจซเฉ‡เจธ เจตเจฟเฉฑเจš เจฆเจฟเจ–เจพเจˆ เจฆเฉ‡เจตเฉ‡เจ—เจพ,
    • เจ•เจฟ เจ‰เจน เจ…เฉฑเจ  เจœเฉเจฒเจพเจˆ เจฆเฉ€ เจ…เฉฑเจงเฉ€ เจฐเจพเจค เจคเฉ‹เจ‚ เจ•เฉฐเจฎ เจ•เจฐเฉ‡เจ—เจพ,
    • เจ…เจคเฉ‡ เจ‡เจน เจฒเจ—เจญเจ— เจนเจฐ 6 เจ˜เฉฐเจŸเจฟเจ†เจ‚ เจฌเจพเจ…เจฆ เจšเฉฑเจฒเจฃเจพ เจšเจพเจนเฉ€เจฆเจพ เจนเฉˆ (เจ‡เจธ เจฆเฉ€ เจฌเจœเจพเจ เจ‡เฉฑเจฅเฉ‡ เจธเจ–เจผเจค เจฎเฉเฉฐเจกเจฟเจ†เจ‚ เจฒเจˆ timedelta() เจ‡เจœเจพเจœเจผเจค เจนเฉˆ cron-เจฒเจพเจˆเจจ 0 0 0/6 ? * * *, เจ˜เฉฑเจŸ เจ เฉฐเจกเฉ‡ เจฒเจˆ - เจ‡เฉฑเจ• เจธเจฎเฉ€เจ•เจฐเจจ เจตเจฐเจ—เจพ @daily);
  • workflow() เจฎเฉเฉฑเจ– เจ•เฉฐเจฎ เจ•เจฐเฉ‡เจ—เจพ, เจชเจฐ เจนเฉเจฃ เจจเจนเฉ€เจ‚เฅค เจนเฉเจฃ เจฒเจˆ, เจ…เจธเฉ€เจ‚ เจ†เจชเจฃเฉ‡ เจธเฉฐเจฆเจฐเจญ เจจเฉ‚เฉฐ เจฒเฉŒเจ— เจตเจฟเฉฑเจš เจกเฉฐเจช เจ•เจฐเจพเจ‚เจ—เฉ‡เฅค
  • เจ…เจคเฉ‡ เจนเฉเจฃ เจ•เจพเจฐเจœ เจฌเจฃเจพเจ‰เจฃ เจฆเจพ เจธเจงเจพเจฐเจจ เจœเจพเจฆเฉ‚:
    • เจ…เจธเฉ€เจ‚ เจ†เจชเจฃเฉ‡ เจธเจฐเฉ‹เจคเจพเจ‚ เจฐเจพเจนเฉ€เจ‚ เจšเฉฑเจฒเจฆเฉ‡ เจนเจพเจ‚;
    • เจธเจผเฉเจฐเฉ‚เจ†เจค PythonOperator, เจœเฉ‹ เจธเจพเจกเฉ‡ เจกเจฎเฉ€ เจจเฉ‚เฉฐ เจšเจฒเจพเจเจ—เจพ workflow(). เจ•เฉฐเจฎ เจฆเจพ เจ‡เฉฑเจ• เจตเจฟเจฒเฉฑเจ–เจฃ (เจกเฉˆเจ— เจฆเฉ‡ เจ…เฉฐเจฆเจฐ) เจจเจพเจฎ เจจเจฟเจฐเจงเจพเจฐเจค เจ•เจฐเจจเจพ เจจเจพ เจญเฉเฉฑเจฒเฉ‹ เจ…เจคเฉ‡ เจกเฉˆเจ— เจจเฉ‚เฉฐ เจ†เจชเจฃเฉ‡ เจ†เจช เจฌเฉฐเจจเฉเจนเฉ‹เฅค เจเฉฐเจกเจพ provide_context เจฌเจฆเจฒเฉ‡ เจตเจฟเฉฑเจš, เจซเฉฐเจ•เจธเจผเจจ เจตเจฟเฉฑเจš เจตเจพเจงเฉ‚ เจ†เจฐเจ—เฉ‚เจฎเฉˆเจ‚เจŸ เจชเจพเจตเฉ‡เจ—เจพ, เจœเจฟเจธเจจเฉ‚เฉฐ เจ…เจธเฉ€เจ‚ เจงเจฟเจ†เจจ เจจเจพเจฒ เจตเจฐเจค เจ•เฉ‡ เจ‡เจ•เฉฑเจ เจพ เจ•เจฐเจพเจ‚เจ—เฉ‡ **context.

เจนเฉเจฃ เจฒเจˆ, เจ‡เจน เจธเจญ เจนเฉˆ. เจธเจพเจจเฉ‚เฉฐ เจ•เฉ€ เจฎเจฟเจฒเจฟเจ†:

  • เจตเฉˆเฉฑเจฌ เจ‡เฉฐเจŸเจฐเจซเฉ‡เจธ เจตเจฟเฉฑเจš เจจเจตเจพเจ‚ เจกเฉˆเจ—,
  • เจกเฉ‡เจข เจธเฉŒ เจ•เจพเจฐเจœ เจœเฉ‹ เจธเจฎเจพเจจเจพเจ‚เจคเจฐ เจคเฉŒเจฐ 'เจคเฉ‡ เจฒเจพเจ—เฉ‚ เจ•เฉ€เจคเฉ‡ เจœเจพเจฃเจ—เฉ‡ (เจœเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹, เจธเฉˆเจฒเจฐเฉ€ เจธเฉˆเจŸเจฟเฉฐเจ—เจพเจ‚ เจ…เจคเฉ‡ เจธเจฐเจตเจฐ เจธเจฎเจฐเฉฑเจฅเจพ เจ‡เจธเจฆเฉ€ เจ‡เจœเจพเจœเจผเจค เจฆเจฟเฉฐเจฆเฉ‡ เจนเจจ)เฅค

เจ–เฉˆเจฐ, เจฒเจ—เจญเจ— เจ‡เจธ เจจเฉ‚เฉฐ เจฎเจฟเจฒ เจ—เจฟเจ†.

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ
เจจเจฟเจฐเจญเจฐเจคเจพเจตเจพเจ‚ เจจเฉ‚เฉฐ เจ•เฉŒเจฃ เจธเจฅเจพเจชเจฟเจค เจ•เจฐเฉ‡เจ—เจพ?

เจ‡เจธ เจธเจพเจฐเฉ€ เจ—เฉฑเจฒ เจจเฉ‚เฉฐ เจธเจฐเจฒ เจฌเจฃเจพเจ‰เจฃ เจฒเจˆ, เจฎเฉˆเจ‚ เจ…เฉฐเจฆเจฐ เจ˜เฉเจธเจชเฉˆเจ  เจ•เฉ€เจคเฉ€ docker-compose.yml เจ•เจพเจฐเจตเจพเจˆ requirements.txt เจธเจพเจฐเฉ‡ เจจเฉ‹เจกเจพเจ‚ 'เจคเฉ‡.

เจนเฉเจฃ เจ‡เจน เจšเจฒเจพ เจ—เจฟเจ† เจนเฉˆ:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจธเจฒเฉ‡เจŸเฉ€ เจตเจฐเจ— เจธเจผเจกเจฟเจŠเจฒเจฐ เจฆเฉเจ†เจฐเจพ เจธเฉฐเจธเจพเจงเจฟเจค เจ•เฉ€เจคเฉ‡ เจ—เจ เจŸเจพเจธเจ• เจ‰เจฆเจพเจนเจฐเจจ เจนเจจเฅค

เจ…เจธเฉ€เจ‚ เจฅเฉ‹เฉœเจพ เจ‡เฉฐเจคเจœเจผเจพเจฐ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚, เจตเจฐเจ•เจฐเจพเจ‚ เจฆเฉเจ†เจฐเจพ เจ•เฉฐเจฎ เจฌเฉฐเจฆ เจ•เจฐ เจฆเจฟเฉฑเจคเฉ‡ เจœเจพเจ‚เจฆเฉ‡ เจนเจจ:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจนเจฐเฉ‡ เจตเจพเจฒเฉ‡, เจฌเฉ‡เจธเจผเฉฑเจ•, เจธเจซเจฒเจคเจพเจชเฉ‚เจฐเจตเจ• เจ†เจชเจฃเจพ เจ•เฉฐเจฎ เจชเฉ‚เจฐเจพ เจ•เจฐ เจšเฉเฉฑเจ•เฉ‡ เจนเจจ. เจฒเจพเจฒ เจฌเจนเฉเจค เจธเจซเจฒ เจจเจนเฉ€เจ‚ เจนเจจ.

เจคเจฐเฉ€เจ•เฉ‡ เจจเจพเจฒ, เจธเจพเจกเฉ‡ เจ‰เจคเจชเจพเจฆ 'เจคเฉ‡ เจ•เฉ‹เจˆ เจซเฉ‹เจฒเจกเจฐ เจจเจนเฉ€เจ‚ เจนเฉˆ ./dags, เจฎเจธเจผเฉ€เจจเจพเจ‚ เจตเจฟเจšเจ•เจพเจฐ เจ•เฉ‹เจˆ เจธเจฎเจ•เจพเจฒเฉ€เจ•เจฐเจจ เจจเจนเฉ€เจ‚ เจนเฉˆ - เจธเจพเจฐเฉ‡ เจกเจพเจ— เจ…เฉฐเจฆเจฐ เจชเจ เจนเจจ git เจธเจพเจกเฉ€ เจ—เจฟเจŸเจฒเฉˆเจฌ 'เจคเฉ‡, เจ…เจคเฉ‡ เจ—เจฟเจŸเจฒเฉˆเจฌ เจธเฉ€เจ†เจˆ เจฎเจธเจผเฉ€เจจเจพเจ‚ เจจเฉ‚เฉฐ เจ…เฉฑเจชเจกเฉ‡เจŸ เจตเฉฐเจกเจฆเจพ เจนเฉˆ เจœเจฆเฉ‹เจ‚ เจ‡เจธ เจตเจฟเฉฑเจš เจ…เจญเฉ‡เจฆ เจนเฉเฉฐเจฆเจพ เจนเฉˆ master.

เจซเฉเฉฑเจฒ เจฌเจพเจฐเฉ‡ เจฅเฉ‹เฉœเจพ เจœเจฟเจนเจพ

เจœเจฆเฉ‹เจ‚ เจ•เจฟ เจตเจฐเจ•เจฐ เจธเจพเจกเฉ‡ เจธเจผเจพเจ‚เจค เจ•เจฐเจจ เจตเจพเจฒเจฟเจ†เจ‚ เจจเฉ‚เฉฐ เจ•เฉเฉฑเจŸ เจฐเจนเฉ‡ เจนเจจ, เจ†เจ“ เจ‡เจ• เจนเฉ‹เจฐ เจธเจพเจงเจจ เจจเฉ‚เฉฐ เจฏเจพเจฆ เจ•เจฐเฉ€เจ เจœเฉ‹ เจธเจพเจจเฉ‚เฉฐ เจ•เฉเจ เจฆเจฟเจ–เจพ เจธเจ•เจฆเจพ เจนเฉˆ - เจซเจฒเจพเจตเจฐเฅค

เจตเจฐเจ•เจฐ เจจเฉ‹เจกเจพเจ‚ 'เจคเฉ‡ เจธเฉฐเจ–เฉ‡เจช เจœเจพเจฃเจ•เจพเจฐเฉ€ เจตเจพเจฒเจพ เจชเจนเจฟเจฒเจพ เจชเฉฐเจจเจพ:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจ•เฉฐเจฎ เจ•เจฐเจจ เจตเจพเจฒเฉ‡ เจ•เฉฐเจฎเจพเจ‚ เจฆเฉ‡ เจจเจพเจฒ เจธเจญ เจคเฉ‹เจ‚ เจคเฉ€เจฌเจฐ เจชเฉฐเจจเจพ:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจธเจพเจกเฉ‡ เจฌเฉเจฐเฉ‹เจ•เจฐ เจฆเฉ€ เจธเจฅเจฟเจคเฉ€ เจตเจพเจฒเจพ เจธเจญ เจคเฉ‹เจ‚ เจฌเฉ‹เจฐเจฟเฉฐเจ— เจชเฉฐเจจเจพ:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจธเจญ เจคเฉ‹เจ‚ เจšเจฎเจ•เจฆเจพเจฐ เจชเฉ‡เจœ เจŸเจพเจธเจ• เจธเจŸเฉ‡เจŸเจธ เจ—เฉเจฐเจพเจซ เจ…เจคเฉ‡ เจ‰เจนเจจเจพเจ‚ เจฆเฉ‡ เจเจ—เจœเจผเฉ€เจ•เจฟเจŠเจธเจผเจจ เจŸเจพเจˆเจฎ เจฆเฉ‡ เจจเจพเจฒ เจนเฉˆ:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจ…เจธเฉ€เจ‚ เจ…เฉฐเจกเจฐเจฒเฉ‹เจก เจฒเฉ‹เจก เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚

เจ‡เจธ เจฒเจˆ, เจธเจพเจฐเฉ‡ เจ•เฉฐเจฎ เจ•เฉ€เจคเฉ‡ เจ—เจ เจนเจจ, เจคเฉเจธเฉ€เจ‚ เจœเจผเจ–เจฎเฉ€เจ†เจ‚ เจจเฉ‚เฉฐ เจšเฉเฉฑเจ• เจธเจ•เจฆเฉ‡ เจนเฉ‹.

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจ…เจคเฉ‡ เจฌเจนเฉเจค เจธเจพเจฐเฉ‡ เจœเจผเจ–เจฎเฉ€ เจธเจจ - เจ‡เฉฑเจ• เจœเจพเจ‚ เจ•เจฟเจธเฉ‡ เจนเฉ‹เจฐ เจ•เจพเจฐเจจ เจ•เจฐเจ•เฉ‡. เจเจ…เจฐเจซเจฒเฉ‹ เจฆเฉ€ เจธเจนเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจฆเฉ‡ เจฎเจพเจฎเจฒเฉ‡ เจตเจฟเฉฑเจš, เจ‡เจน เจฌเจนเฉเจค เจนเฉ€ เจตเจฐเจ— เจฆเจฐเจธเจพเจ‰เจ‚เจฆเฉ‡ เจนเจจ เจ•เจฟ เจกเฉ‡เจŸเจพ เจฏเจ•เฉ€เจจเฉ€ เจคเฉŒเจฐ 'เจคเฉ‡ เจจเจนเฉ€เจ‚ เจ†เจ‡เจ† เจธเฉ€เฅค

เจคเฉเจนเจพเจจเฉ‚เฉฐ เจฒเฉŒเจ— เจฆเฉ‡เจ–เจฃ เจ…เจคเฉ‡ เจกเจฟเฉฑเจ—เฉ‡ เจนเฉ‹เจ เจŸเจพเจธเจ• เจ‰เจฆเจพเจนเจฐเจจเจพเจ‚ เจจเฉ‚เฉฐ เจฎเฉเฉœ เจšเจพเจฒเฉ‚ เจ•เจฐเจจ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉˆเฅค

เจ•เจฟเจธเฉ‡ เจตเฉ€ เจตเจฐเจ— 'เจคเฉ‡ เจ•เจฒเจฟเฉฑเจ• เจ•เจฐเจจ เจจเจพเจฒ, เจ…เจธเฉ€เจ‚ เจธเจพเจกเฉ‡ เจฒเจˆ เจ‰เจชเจฒเจฌเจง เจ•เจพเจฐเจตเจพเจˆเจ†เจ‚ เจฆเฉ‡เจ–เจพเจ‚เจ—เฉ‡:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจคเฉเจธเฉ€เจ‚ เจฒเฉˆ เจธเจ•เจฆเฉ‡ เจนเฉ‹ เจ…เจคเฉ‡ เจ•เจฒเฉ€เจ…เจฐ เจฆ เจซเจพเจฒเจจ เจฌเจฃเจพ เจธเจ•เจฆเฉ‡ เจนเฉ‹เฅค เจญเจพเจต, เจ…เจธเฉ€เจ‚ เจญเฉเฉฑเจฒ เจœเจพเจ‚เจฆเฉ‡ เจนเจพเจ‚ เจ•เจฟ เจ‰เฉฑเจฅเฉ‡ เจ•เฉเจ เจ…เจธเจซเจฒ เจนเฉ‹ เจ—เจฟเจ† เจนเฉˆ, เจ…เจคเฉ‡ เจ‰เจนเฉ€ เจ‰เจฆเจพเจนเจฐเจฃ เจ•เจพเจฐเจœ เจธเจผเจกเจฟเจŠเจฒเจฐ เจ•เฉ‹เจฒ เจœเจพเจตเฉ‡เจ—เจพ.

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจ‡เจน เจธเจชเฉฑเจธเจผเจŸ เจนเฉˆ เจ•เจฟ เจธเจพเจฐเฉ‡ เจฒเจพเจฒ เจตเจฐเจ—เจพเจ‚ เจฆเฉ‡ เจจเจพเจฒ เจฎเจพเจŠเจธ เจจเจพเจฒ เจ…เจœเจฟเจนเจพ เจ•เจฐเจจเจพ เจฌเจนเฉเจค เจฎเจจเฉเฉฑเจ–เฉ€ เจจเจนเฉ€เจ‚ เจนเฉˆ - เจ‡เจน เจ‰เจน เจจเจนเฉ€เจ‚ เจนเฉˆ เจœเฉ‹ เจ…เจธเฉ€เจ‚ เจเจ…เจฐเจซเจฒเฉ‹ เจคเฉ‹เจ‚ เจ‰เจฎเฉ€เจฆ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚. เจ•เฉเจฆเจฐเจคเฉ€ เจคเฉŒเจฐ 'เจคเฉ‡, เจธเจพเจกเฉ‡ เจ•เฉ‹เจฒ เจตเจฟเจ†เจชเจ• เจคเจฌเจพเจนเฉ€ เจฆเฉ‡ เจนเจฅเจฟเจ†เจฐ เจนเจจ: Browse/Task Instances

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจšเจฒเฉ‹ เจธเจญ เจ•เฉเจ เจ‡เฉฑเจ•เฉ‹ เจตเจพเจฐ เจšเฉเจฃเฉ€เจ เจ…เจคเฉ‡ เจœเจผเฉ€เจฐเฉ‹ 'เจคเฉ‡ เจฐเฉ€เจธเฉˆเจŸ เจ•เจฐเฉ€เจ, เจธเจนเฉ€ เจ†เจˆเจŸเจฎ 'เจคเฉ‡ เจ•เจฒเจฟเฉฑเจ• เจ•เจฐเฉ‹:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจธเจซเจพเจˆ เจ•เจฐเจจ เจคเฉ‹เจ‚ เจฌเจพเจ…เจฆ, เจธเจพเจกเฉ€เจ†เจ‚ เจŸเฉˆเจ•เจธเฉ€เจ†เจ‚ เจ‡เจธ เจคเจฐเฉเจนเจพเจ‚ เจฆเจฟเจ–เจพเจˆ เจฆเจฟเฉฐเจฆเฉ€เจ†เจ‚ เจนเจจ (เจ‰เจน เจชเจนเจฟเจฒเจพเจ‚ เจนเฉ€ เจ…เจจเฉเจธเฉ‚เจšเจฟเจคเจ•เจฐเจคเจพ เจฆเฉเจ†เจฐเจพ เจ‰เจนเจจเจพเจ‚ เจจเฉ‚เฉฐ เจคเจนเจฟ เจ•เจฐเจจ เจฒเจˆ เจ‰เจกเฉ€เจ• เจ•เจฐ เจฐเจนเฉ€เจ†เจ‚ เจนเจจ):

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจ•เจจเฉˆเจ•เจธเจผเจจ, เจนเฉเฉฑเจ• เจ…เจคเฉ‡ เจนเฉ‹เจฐ เจตเฉ‡เจฐเฉ€เจเจฌเจฒ

เจ‡เจน เจ…เจ—เจฒเฉ€ เจกเฉ€เจเจœเฉ€ เจจเฉ‚เฉฐ เจฆเฉ‡เจ–เจฃ เจฆเจพ เจธเจฎเจพเจ‚ เจนเฉˆ, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""ะ“ะพัะฟะพะดะฐ ั…ะพั€ะพัˆะธะต, ะพั‚ั‡ะตั‚ั‹ ะพะฑะฝะพะฒะปะตะฝั‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ะะฐั‚ะฐัˆ, ะฟั€ะพัั‹ะฟะฐะนัั, ะผั‹ {{ dag.dag_id }} ัƒั€ะพะฝะธะปะธ
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

เจ•เฉ€ เจนเจฐ เจ•เจฟเจธเฉ‡ เจจเฉ‡ เจ•เจฆเฉ‡ เจฐเจฟเจชเฉ‹เจฐเจŸ เจ…เจชเจกเฉ‡เจŸ เจ•เฉ€เจคเฉ€ เจนเฉˆ? เจ‡เจน เจ‰เจธ เจฆเฉ€ เจฆเฉเจฌเจพเจฐเจพ เจนเฉˆ: เจ‡เฉฑเจฅเฉ‡ เจธเจฐเฉ‹เจคเจพเจ‚ เจฆเฉ€ เจ‡เฉฑเจ• เจธเฉ‚เจšเฉ€ เจนเฉˆ เจœเจฟเฉฑเจฅเฉ‡ เจกเฉ‡เจŸเจพ เจชเฉเจฐเจพเจชเจค เจ•เจฐเจจเจพ เจนเฉˆ; เจ‡เฉฑเจฅเฉ‡ เจ‡เฉฑเจ• เจธเฉ‚เจšเฉ€ เจนเฉˆ เจœเจฟเฉฑเจฅเฉ‡ เจฒเจ—เจพเจ‰เจฃเจพ เจนเฉˆ; เจœเจฆเฉ‹เจ‚ เจธเจญ เจ•เฉเจ เจตเจพเจชเจฐเจฟเจ† เจœเจพเจ‚ เจŸเฉเฉฑเจŸ เจ—เจฟเจ† เจคเจพเจ‚ เจนเจพเจจเจ• เจ•เจฐเจจเจพ เจจเจพ เจญเฉเฉฑเจฒเฉ‹ (เจ เฉ€เจ• เจนเฉˆ, เจ‡เจน เจธเจพเจกเฉ‡ เจฌเจพเจฐเฉ‡ เจจเจนเฉ€เจ‚ เจนเฉˆ, เจจเจนเฉ€เจ‚)เฅค

เจšเจฒเฉ‹ เจซเจพเจˆเจฒ เจจเฉ‚เฉฐ เจฆเฉเจฌเจพเจฐเจพ เจตเฉ‡เจ–เฉ€เจ เจ…เจคเฉ‡ เจจเจตเฉ€เจ‚ เจ…เจธเจชเจธเจผเจŸ เจšเฉ€เจœเจผเจพเจ‚ เจจเฉ‚เฉฐ เจตเฉ‡เจ–เฉ€เจ:

  • from commons.operators import TelegramBotSendMessage - เจ•เฉ‹เจˆ เจตเฉ€ เจšเฉ€เจœเจผ เจธเจพเจจเฉ‚เฉฐ เจ†เจชเจฃเฉ‡ เจ†เจชเจฐเฉ‡เจŸเจฐ เจฌเจฃเจพเจ‰เจฃ เจคเฉ‹เจ‚ เจจเจนเฉ€เจ‚ เจฐเฉ‹เจ•เจฆเฉ€, เจœเจฟเจธเจฆเจพ เจ…เจธเฉ€เจ‚ เจ…เจจเจฌเจฒเฉŒเจ•เจก เจจเฉ‚เฉฐ เจธเฉฐเจฆเฉ‡เจธเจผ เจญเฉ‡เจœเจฃ เจฒเจˆ เจ‡เฉฑเจ• เจ›เฉ‹เจŸเจพ เจฐเฉˆเจชเจฐ เจฌเจฃเจพ เจ•เฉ‡ เจซเจพเจ‡เจฆเจพ เจฒเจฟเจ†เฅค (เจ…เจธเฉ€เจ‚ เจนเฉ‡เจ เจพเจ‚ เจ‡เจธ เจ†เจชเจฐเฉ‡เจŸเจฐ เจฌเจพเจฐเฉ‡ เจนเฉ‹เจฐ เจ—เฉฑเจฒ เจ•เจฐเจพเจ‚เจ—เฉ‡);
  • default_args={} - dag เจ†เจชเจฃเฉ‡ เจธเจพเจฐเฉ‡ เจ“เจชเจฐเฉ‡เจŸเจฐเจพเจ‚ เจจเฉ‚เฉฐ เจ‡เฉฑเจ•เฉ‹ เจœเจฟเจนเฉ€เจ†เจ‚ เจฆเจฒเฉ€เจฒเจพเจ‚ เจตเฉฐเจก เจธเจ•เจฆเจพ เจนเฉˆ;
  • to='{{ var.value.all_the_kings_men }}' - เจ–เฉ‡เจคเจฐ to เจธเจพเจกเฉ‡ เจ•เฉ‹เจฒ เจนเจพเจฐเจกเจ•เฉ‹เจก เจจเจนเฉ€เจ‚ เจนเฉ‹เจตเฉ‡เจ—เจพ, เจชเจฐ เจœเจฟเฉฐเจœเจพ เจ…เจคเฉ‡ เจˆเจฎเฉ‡เจฒเจพเจ‚ เจฆเฉ€ เจธเฉ‚เจšเฉ€ เจฆเฉ‡ เจจเจพเจฒ เจ‡เฉฑเจ• เจตเฉ‡เจฐเฉ€เจเจฌเจฒ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจ•เฉ‡ เจ—เจคเฉ€เจธเจผเฉ€เจฒ เจคเฉŒเจฐ 'เจคเฉ‡ เจคเจฟเจ†เจฐ เจ•เฉ€เจคเจพ เจ—เจฟเจ† เจนเฉˆ, เจœเฉ‹ เจฎเฉˆเจ‚ เจงเจฟเจ†เจจ เจจเจพเจฒ เจฐเฉฑเจ–เจฆเจพ เจนเจพเจ‚ Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS โ€” เจ†เจชเจฐเฉ‡เจŸเจฐ เจธเจผเฉเจฐเฉ‚ เจ•เจฐเจจ เจฒเจˆ เจธเจผเจฐเจคเฅค เจธเจพเจกเฉ‡ เจ•เฉ‡เจธ เจตเจฟเฉฑเจš, เจชเฉฑเจคเจฐ เจธเจฟเจฐเจซ เจคเจพเจ‚ เจนเฉ€ เจฎเจพเจฒเจ•เจพเจ‚ เจจเฉ‚เฉฐ เจœเจพเจตเฉ‡เจ—เจพ เจœเฉ‡เจ•เจฐ เจธเจพเจฐเฉ€เจ†เจ‚ เจจเจฟเจฐเจญเจฐเจคเจพเจตเจพเจ‚ เจจเฉ‡ เจ•เฉฐเจฎ เจ•เฉ€เจคเจพ เจนเฉˆ เจธเจซเจฒเจคเจพเจชเฉ‚เจฐเจตเจ•;
  • tg_bot_conn_id='tg_main' - เจฆเจฒเฉ€เจฒเจพเจ‚ conn_id เจ•เจจเฉˆเจ•เจธเจผเจจ ID เจธเจตเฉ€เจ•เจพเจฐ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚ เจœเฉ‹ เจ…เจธเฉ€เจ‚ เจฌเจฃเจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚ Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - เจŸเฉˆเจฒเฉ€เจ—เฉเจฐเจพเจฎ เจตเจฟเฉฑเจš เจธเฉเจจเฉ‡เจนเฉ‡ เจคเจพเจ‚ เจนเฉ€ เจ‰เฉฑเจก เจœเจพเจฃเจ—เฉ‡ เจœเฉ‡ เจ•เฉ‹เจˆ เจ•เฉฐเจฎ เจกเจฟเฉฑเจ—เฉ‡ เจนเฉ‹เจ เจนเจจ;
  • task_concurrency=1 - เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจ•เฉฐเจฎ เจฆเฉ‡ เจ•เจˆ เจŸเจพเจธเจ• เจ‰เจฆเจพเจนเจฐเจจเจพเจ‚ เจจเฉ‚เฉฐ เจ‡เฉฑเจ•เฉ‹ เจธเจฎเฉ‡เจ‚ เจฒเจพเจ‚เจš เจ•เจฐเจจ เจฆเฉ€ เจฎเจจเจพเจนเฉ€ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚เฅค เจจเจนเฉ€เจ‚ เจคเจพเจ‚, เจ…เจธเฉ€เจ‚ เจ•เจˆเจ†เจ‚ เจฆเฉ€ เจ‡เฉฑเจ•เฉ‹ เจธเจฎเฉ‡เจ‚ เจฒเจพเจ‚เจšเจฟเฉฐเจ— เจชเฉเจฐเจพเจชเจค เจ•เจฐเจพเจ‚เจ—เฉ‡ VerticaOperator (เจ‡เฉฑเจ• เจฎเฉ‡เจœเจผ เจตเฉฑเจฒ เจฆเฉ‡เจ– เจฐเจฟเจนเจพ เจนเฉˆ);
  • report_update >> [email, tg] - เจธเจพเจฐเฉ‡ VerticaOperator เจšเจฟเฉฑเจ เฉ€เจ†เจ‚ เจ…เจคเฉ‡ เจธเฉเจจเฉ‡เจนเจฟเจ†เจ‚ เจจเฉ‚เฉฐ เจญเฉ‡เจœเจฃ เจตเจฟเฉฑเจš เจ‡เจ•เจธเจพเจฐ เจนเฉ‹เจตเฉ‹, เจ‡เจธ เจคเจฐเฉเจนเจพเจ‚:
    เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

    เจชเจฐ เจ•เจฟเจ‰เจ‚เจ•เจฟ เจจเฉ‹เจŸเฉ€เจซเจพเจ‡เจฐ เจ†เจชเจฐเฉ‡เจŸเจฐเจพเจ‚ เจฆเฉ€เจ†เจ‚ เจตเฉฑเจ–-เจตเฉฑเจ– เจฒเจพเจ‚เจš เจธเจผเจฐเจคเจพเจ‚ เจนเจจ, เจ•เฉ‡เจตเจฒ เจ‡เฉฑเจ• เจนเฉ€ เจ•เฉฐเจฎ เจ•เจฐเฉ‡เจ—เจพเฅค เจŸเฉเจฐเฉ€ เจตเจฟเจŠ เจตเจฟเฉฑเจš, เจธเจญ เจ•เฉเจ เจฅเฉ‹เฉœเจพ เจ˜เฉฑเจŸ เจตเจฟเจœเจผเฉ‚เจ…เจฒ เจฆเจฟเจ–เจพเจˆ เจฆเจฟเฉฐเจฆเจพ เจนเฉˆ:
    เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจฌเจพเจฐเฉ‡ เจ•เฉเจ เจธเจผเจฌเจฆ เจ•เจนเจพเจ‚เจ—เจพ เจฎเฉˆเจ•เจฐเฉ‹ เจ…เจคเฉ‡ เจ‰เจนเจจเจพเจ‚ เจฆเฉ‡ เจฆเฉ‹เจธเจค - เจตเฉ‡เจฐเฉ€เจเจฌเจฒ.

เจฎเฉˆเจ•เจฐเฉ‹ เจœเจฟเฉฐเจœเจพ เจชเจฒเฉ‡เจธเจนเฉ‹เจฒเจกเจฐ เจนเจจ เจœเฉ‹ เจตเฉฑเจ–-เจตเฉฑเจ– เจ‰เจชเจฏเฉ‹เจ—เฉ€ เจœเจพเจฃเจ•เจพเจฐเฉ€ เจจเฉ‚เฉฐ เจ†เจชเจฐเฉ‡เจŸเจฐ เจ†เจฐเจ—เฉ‚เจฎเฉˆเจ‚เจŸเจพเจ‚ เจตเจฟเฉฑเจš เจฌเจฆเจฒ เจธเจ•เจฆเฉ‡ เจนเจจเฅค เจ‰เจฆเจพเจนเจฐเจจ เจฒเจˆ, เจ‡เจธ เจคเจฐเฉเจนเจพเจ‚:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} เจธเฉฐเจฆเจฐเจญ เจตเฉ‡เจฐเฉ€เจเจฌเจฒ เจฆเฉ€ เจธเจฎเฉฑเจ—เจฐเฉ€ เจคเฉฑเจ• เจตเจฟเจธเจคเจพเจฐ เจ•เจฐเฉ‡เจ—เจพ execution_date เจซเจพเจฐเจฎเฉˆเจŸ เจตเจฟเฉฑเจš YYYY-MM-DD: 2020-07-14. เจธเจญ เจคเฉ‹เจ‚ เจตเจงเฉ€เจ† เจ—เฉฑเจฒ เจ‡เจน เจนเฉˆ เจ•เจฟ เจธเฉฐเจฆเจฐเจญ เจตเฉ‡เจฐเฉ€เจเจฌเจฒเจพเจ‚ เจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจ–เจพเจธ เจŸเจพเจธเจ• เจ‰เจฆเจพเจนเจฐเจจ (เจŸเฉเจฐเฉ€ เจตเจฟเจŠ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจตเจฐเจ—) เจจเจพเจฒ เจœเฉ‹เฉœเจฟเจ† เจœเจพเจ‚เจฆเจพ เจนเฉˆ, เจ…เจคเฉ‡ เจœเจฆเฉ‹เจ‚ เจฎเฉเฉœ เจšเจพเจฒเฉ‚ เจ•เฉ€เจคเจพ เจœเจพเจ‚เจฆเจพ เจนเฉˆ, เจคเจพเจ‚ เจชเจฒเฉ‡เจธเจนเฉ‹เจฒเจกเจฐ เจ‰เจธเฉ‡ เจฎเฉเฉฑเจฒเจพเจ‚ เจคเฉฑเจ• เจซเฉˆเจฒ เจœเจพเจฃเจ—เฉ‡เฅค

เจจเจฟเจฐเจงเจพเจฐเจค เจฎเฉเฉฑเจฒเจพเจ‚ เจจเฉ‚เฉฐ เจนเจฐเฉ‡เจ• เจŸเจพเจธเจ• เจ‰เจฆเจพเจนเจฐเจจ 'เจคเฉ‡ เจฐเฉˆเจ‚เจกเจฐเจก เจฌเจŸเจจ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจ•เฉ‡ เจฆเฉ‡เจ–เจฟเจ† เจœเจพ เจธเจ•เจฆเจพ เจนเฉˆเฅค เจชเฉฑเจคเจฐ เจญเฉ‡เจœเจฃ เจฆเจพ เจ•เฉฐเจฎ เจ‡เจธ เจคเจฐเฉเจนเจพเจ‚ เจนเฉˆ:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจ…เจคเฉ‡ เจ‡เจธ เจฒเจˆ เจ‡เฉฑเจ• เจธเฉเจจเฉ‡เจนเจพ เจญเฉ‡เจœเจฃ เจฆเฉ‡ เจ•เฉฐเจฎ 'เจคเฉ‡:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจจเจตเฉ€เจจเจคเจฎ เจ‰เจชเจฒเจฌเจง เจธเฉฐเจธเจ•เจฐเจฃ เจฒเจˆ เจฌเจฟเจฒเจŸ-เจ‡เจจ เจฎเฉˆเจ•เจฐเฉ‹ เจฆเฉ€ เจ‡เฉฑเจ• เจชเฉ‚เจฐเฉ€ เจธเฉ‚เจšเฉ€ เจ‡เฉฑเจฅเฉ‡ เจ‰เจชเจฒเจฌเจง เจนเฉˆ: เจฎเฉˆเจ•เจฐเฉ‹ เจธเฉฐเจฆเจฐเจญ

เจ‡เจธ เจคเฉ‹เจ‚ เจ‡เจฒเจพเจตเจพ, เจชเจฒเฉฑเจ—เจ‡เจจ เจฆเฉ€ เจฎเจฆเจฆ เจจเจพเจฒ, เจ…เจธเฉ€เจ‚ เจ†เจชเจฃเฉ‡ เจ–เฉเจฆ เจฆเฉ‡ เจฎเฉˆเจ•เจฐเฉ‹ เจ˜เฉ‹เจธเจผเจฟเจค เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚, เจชเจฐ เจ‡เจน เจ‡เจ• เจนเฉ‹เจฐ เจ•เจนเจพเจฃเฉ€ เจนเฉˆเฅค

เจชเจนเจฟเจฒเจพเจ‚ เจคเฉ‹เจ‚ เจชเจฐเจฟเจญเจพเจธเจผเจฟเจค เจšเฉ€เจœเจผเจพเจ‚ เจคเฉ‹เจ‚ เจ‡เจฒเจพเจตเจพ, เจ…เจธเฉ€เจ‚ เจ†เจชเจฃเฉ‡ เจตเฉ‡เจฐเฉ€เจเจฌเจฒ เจฆเฉ‡ เจฎเฉเฉฑเจฒเจพเจ‚ เจจเฉ‚เฉฐ เจฌเจฆเจฒ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚ (เจฎเฉˆเจ‚ เจชเจนเจฟเจฒเจพเจ‚ เจนเฉ€ เจ‰เจชเจฐเฉ‹เจ•เจค เจ•เฉ‹เจก เจตเจฟเฉฑเจš เจ‡เจธเจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เฉ€เจคเฉ€ เจนเฉˆ)เฅค เจšเจฒเฉ‹ เจ…เฉฐเจฆเจฐ เจฌเจฃเจพเจ‰ Admin/Variables เจ•เฉเจ เจšเฉ€เจœเจผเจพเจ‚:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจนเจฐ เจšเฉ€เจœเจผ เจœเฉ‹ เจคเฉเจธเฉ€เจ‚ เจตเจฐเจค เจธเจ•เจฆเฉ‡ เจนเฉ‹:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

เจฎเฉเฉฑเจฒ เจ‡เฉฑเจ• เจธเจ•เฉ‡เจฒเจฐ เจนเฉ‹ เจธเจ•เจฆเจพ เจนเฉˆ, เจœเจพเจ‚ เจ‡เจน JSON เจตเฉ€ เจนเฉ‹ เจธเจ•เจฆเจพ เจนเฉˆเฅค JSON เจฆเฉ‡ เจฎเจพเจฎเจฒเฉ‡ เจตเจฟเฉฑเจš:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

เจธเจฟเจฐเจซเจผ เจฒเฉ‹เฉœเฉ€เจ‚เจฆเฉ€ เจ•เฉเฉฐเจœเฉ€ เจฒเจˆ เจฎเจพเจฐเจ— เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเฉ‹: {{ var.json.bot_config.bot.token }}.

เจฎเฉˆเจ‚ เจธเจผเจพเจฌเจฆเจฟเจ• เจคเฉŒเจฐ 'เจคเฉ‡ เจ‡เฉฑเจ• เจธเจผเจฌเจฆ เจ•เจนเจพเจ‚เจ—เจพ เจ…เจคเฉ‡ เจ‡เจธ เจฌเจพเจฐเฉ‡ เจ‡เฉฑเจ• เจธเจ•เฉเจฐเฉ€เจจเจธเจผเฉŒเจŸ เจฆเจฟเจ–เจพเจตเจพเจ‚เจ—เจพ เจ•เฉเจจเฉˆเจ•เจธเจผเจจ. เจ‡เฉฑเจฅเฉ‡ เจธเจญ เจ•เฉเจ เจฎเฉเฉฑเจขเจฒเฉ€ เจนเฉˆ: เจชเฉฐเจจเฉ‡ 'เจคเฉ‡ Admin/Connections เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจ•เฉเจจเฉˆเจ•เจธเจผเจจ เจฌเจฃเจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚, เจ‰เฉฑเจฅเฉ‡ เจธเจพเจกเฉ‡ เจฒเฉŒเจ—เจฟเจจ/เจชเจพเจธเจตเจฐเจก เจ…เจคเฉ‡ เจนเฉ‹เจฐ เจ–เจพเจธ เจฎเจพเจชเจฆเฉฐเจก เจœเฉ‹เฉœเจฆเฉ‡ เจนเจพเจ‚เฅค เจ‡เจธ เจคเจฐเฉเจนเจพเจ‚:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจชเจพเจธเจตเจฐเจกเจพเจ‚ เจจเฉ‚เฉฐ เจเจจเจ•เฉเจฐเจฟเจชเจŸ เจ•เฉ€เจคเจพ เจœเจพ เจธเจ•เจฆเจพ เจนเฉˆ (เจกเจฟเจซเฉŒเจฒเจŸ เจจเจพเจฒเฉ‹เจ‚ เจœเจผเจฟเจ†เจฆเจพ เจšเฉฐเจ—เฉ€ เจคเจฐเฉเจนเจพเจ‚), เจœเจพเจ‚ เจคเฉเจธเฉ€เจ‚ เจ•เฉเจจเฉˆเจ•เจธเจผเจจ เจฆเฉ€ เจ•เจฟเจธเจฎ เจจเฉ‚เฉฐ เจ›เฉฑเจก เจธเจ•เจฆเฉ‡ เจนเฉ‹ (เจœเจฟเจตเฉ‡เจ‚ เจ•เจฟ เจฎเฉˆเจ‚ เจ‡เจธ เจฒเจˆ เจ•เฉ€เจคเจพ เจธเฉ€เฅค tg_main) - เจคเฉฑเจฅ เจ‡เจน เจนเฉˆ เจ•เจฟ เจ•เจฟเจธเจฎเจพเจ‚ เจฆเฉ€ เจธเฉ‚เจšเฉ€ เจเจ…เจฐเจซเจฒเฉ‹ เจฎเจพเจกเจฒเจพเจ‚ เจตเจฟเฉฑเจš เจนเจพเจฐเจกเจตเจพเจ‡เจฐเจก เจนเฉˆ เจ…เจคเฉ‡ เจธเจฐเฉ‹เจค เจ•เฉ‹เจกเจพเจ‚ เจตเจฟเฉฑเจš เจธเจผเจพเจฎเจฒ เจ•เฉ€เจคเฉ‡ เจฌเจฟเจจเจพเจ‚ เจตเจฟเจธเจคเจพเจฐ เจจเจนเฉ€เจ‚ เจ•เฉ€เจคเจพ เจœเจพ เจธเจ•เจฆเจพ เจนเฉˆ (เจœเฉ‡ เจ…เจšเจพเจจเจ• เจฎเฉˆเจ‚ เจ•เฉเจ เจ—เฉ‚เจ—เจฒ เจจเจนเฉ€เจ‚ เจ•เฉ€เจคเจพ, เจคเจพเจ‚ เจ•เจฟเจฐเจชเจพ เจ•เจฐเจ•เฉ‡ เจฎเฉˆเจจเฉ‚เฉฐ เจ เฉ€เจ• เจ•เจฐเฉ‹), เจชเจฐ เจ•เฉเจ เจตเฉ€ เจธเจพเจจเฉ‚เฉฐ เจ•เฉเจฐเฉˆเจกเจฟเจŸ เจชเฉเจฐเจพเจชเจค เจ•เจฐเจจ เจคเฉ‹เจ‚ เจฐเฉ‹เจ• เจจเจนเฉ€เจ‚ เจธเจ•เฉ‡เจ—เจพ เจจเจพเจฎ

เจคเฉเจธเฉ€เจ‚ เจ‡เฉฑเจ•เฉ‹ เจจเจพเจฎ เจฆเฉ‡ เจจเจพเจฒ เจ•เจˆ เจ•เฉเจจเฉˆเจ•เจธเจผเจจ เจตเฉ€ เจฌเจฃเจพ เจธเจ•เจฆเฉ‡ เจนเฉ‹: เจ‡เจธ เจ•เฉ‡เจธ เจตเจฟเฉฑเจš, เจตเจฟเจงเฉ€ BaseHook.get_connection(), เจœเฉ‹ เจธเจพเจจเฉ‚เฉฐ เจจเจพเจฎ เจฆเฉเจ†เจฐเจพ เจ•เฉเจจเฉˆเจ•เจธเจผเจจ เจชเฉเจฐเจพเจชเจค เจ•เจฐเจฆเจพ เจนเฉˆ, เจฆเฉ‡เจตเฉ‡เจ—เจพ เจฌเฉ‡เจคเจฐเจคเฉ€เจฌ เจ•เจˆ เจจเจพเจฎเจพเจ‚ เจคเฉ‹เจ‚ (เจฐเจพเจŠเจ‚เจก เจฐเฉŒเจฌเจฟเจจ เจฌเจฃเจพเจ‰เจฃเจพ เจตเจงเฉ‡เจฐเฉ‡ เจคเจฐเจ•เจชเฉ‚เจฐเจจ เจนเฉ‹เจตเฉ‡เจ—เจพ, เจชเจฐ เจ†เจ“ เจ‡เจธเจจเฉ‚เฉฐ เจเจ…เจฐเจซเจฒเฉ‹ เจกเจฟเจตเฉˆเจฒเจชเจฐเจพเจ‚ เจฆเฉ€ เจœเจผเจฎเฉ€เจฐ 'เจคเฉ‡ เจ›เฉฑเจก เจฆเฉ‡เจˆเจ)เฅค

เจตเฉ‡เจฐเฉ€เจเจฌเจฒ เจ…เจคเฉ‡ เจ•เจจเฉˆเจ•เจธเจผเจจ เจจเจฟเจธเจผเจšเจฟเจค เจคเฉŒเจฐ 'เจคเฉ‡ เจตเจงเฉ€เจ† เจŸเฉ‚เจฒ เจนเจจ, เจชเจฐ เจธเฉฐเจคเฉเจฒเจจ เจจเจพ เจ—เฉเจ†เจ‰เจฃเจพ เจฎเจนเฉฑเจคเจตเจชเฉ‚เจฐเจจ เจนเฉˆ: เจคเฉเจนเจพเจกเฉ‡ เจชเฉเจฐเจตเจพเจน เจฆเฉ‡ เจ•เจฟเจนเฉœเฉ‡ เจนเจฟเฉฑเจธเฉ‡ เจคเฉเจธเฉ€เจ‚ เจ•เฉ‹เจก เจตเจฟเฉฑเจš เจธเจŸเฉ‹เจฐ เจ•เจฐเจฆเฉ‡ เจนเฉ‹, เจ…เจคเฉ‡ เจคเฉเจธเฉ€เจ‚ เจธเจŸเฉ‹เจฐเฉ‡เจœ เจฒเจˆ เจเจ…เจฐเจซเจฒเฉ‹ เจจเฉ‚เฉฐ เจ•เจฟเจนเฉœเฉ‡ เจนเจฟเฉฑเจธเฉ‡ เจฆเจฟเฉฐเจฆเฉ‡ เจนเฉ‹เฅค เจ‡เฉฑเจ• เจชเจพเจธเฉ‡, UI เจฆเฉเจ†เจฐเจพ เจฎเฉเฉฑเจฒ เจจเฉ‚เฉฐ เจคเฉ‡เจœเจผเฉ€ เจจเจพเจฒ เจฌเจฆเจฒเจฃเจพ เจธเฉเจตเจฟเจงเจพเจœเจจเจ• เจนเฉ‹ เจธเจ•เจฆเจพ เจนเฉˆ, เจ‰เจฆเจพเจนเจฐเจจ เจฒเจˆ, เจ‡เฉฑเจ• เจฎเฉ‡เจฒเจฟเฉฐเจ— เจฌเจพเจ•เจธเฅค เจฆเฉ‚เจœเฉ‡ เจชเจพเจธเฉ‡, เจ‡เจน เจ…เจœเฉ‡ เจตเฉ€ เจฎเจพเจŠเจธ เจ•เจฒเจฟเฉฑเจ• เจฆเฉ€ เจตเจพเจชเจธเฉ€ เจนเฉˆ, เจœเจฟเจธ เจคเฉ‹เจ‚ เจ…เจธเฉ€เจ‚ (เจฎเฉˆเจ‚) เจ›เฉเจŸเจ•เจพเจฐเจพ เจชเจพเจ‰เจฃเจพ เจšเจพเจนเฉเฉฐเจฆเฉ‡ เจธเฉ€เฅค

เจ•เฉเจจเฉˆเจ•เจธเจผเจจเจพเจ‚ เจจเจพเจฒ เจ•เฉฐเจฎ เจ•เจฐเจจเจพ เจ‡เฉฑเจ• เจ•เฉฐเจฎ เจนเฉˆ เจนเฉเฉฑเจ•. เจ†เจฎ เจคเฉŒเจฐ 'เจคเฉ‡, เจเจ…เจฐเจซเจฒเฉ‹ เจนเฉเฉฑเจ• เจ‡เจธ เจจเฉ‚เฉฐ เจคเฉ€เจœเฉ€-เจงเจฟเจฐ เจฆเฉ€เจ†เจ‚ เจธเฉ‡เจตเจพเจตเจพเจ‚ เจ…เจคเฉ‡ เจฒเจพเจ‡เจฌเฉเจฐเฉ‡เจฐเฉ€เจ†เจ‚ เจจเจพเจฒ เจœเฉ‹เฉœเจจ เจฒเจˆ เจชเฉเจ†เจ‡เฉฐเจŸ เจนเฉเฉฐเจฆเฉ‡ เจนเจจเฅค เจœเจฟเจตเฉ‡เจ‚, JiraHook เจœเฉ€เจฐเจพ เจจเจพเจฒ เจ—เฉฑเจฒเจฌเจพเจค เจ•เจฐเจจ เจฒเจˆ เจธเจพเจกเฉ‡ เจฒเจˆ เจ‡เฉฑเจ• เจ•เจฒเจพเจ‡เฉฐเจŸ เจ–เฉ‹เจฒเฉเจนเฉ‡เจ—เจพ (เจคเฉเจธเฉ€เจ‚ เจ•เฉฐเจฎเจพเจ‚ เจจเฉ‚เฉฐ เจ…เฉฑเจ—เฉ‡ เจ…เจคเฉ‡ เจชเจฟเฉฑเจ›เฉ‡ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹), เจ…เจคเฉ‡ เจฆเฉ€ เจฎเจฆเจฆ เจจเจพเจฒ SambaHook เจคเฉเจธเฉ€เจ‚ เจ‡เฉฑเจ• เจธเจฅเจพเจจเจ• เจซเจพเจˆเจฒ เจจเฉ‚เฉฐ เจฆเจฌเจพ เจธเจ•เจฆเฉ‡ เจนเฉ‹ smb- เจฌเจฟเฉฐเจฆเฉ‚.

เจ•เจธเจŸเจฎ เจ†เจชเจฐเฉ‡เจŸเจฐ เจจเฉ‚เฉฐ เจชเจพเจฐเจธ เจ•เจฐ เจฐเจฟเจนเจพ เจนเฉˆ

เจ…เจคเฉ‡ เจ…เจธเฉ€เจ‚ เจ‡เจน เจฆเฉ‡เจ–เจฃ เจฆเฉ‡ เจจเฉ‡เฉœเฉ‡ เจนเฉ‹ เจ—เจ เจ•เจฟ เจ‡เจน เจ•เจฟเจตเฉ‡เจ‚ เจฌเจฃเจฟเจ† เจนเฉˆ TelegramBotSendMessage

เจ•เฉ‹เจก commons/operators.py เจ…เจธเจฒ เจ†เจชเจฐเฉ‡เจŸเจฐ เจจเจพเจฒ:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

เจ‡เฉฑเจฅเฉ‡, เจเจ…เจฐเจซเจฒเฉ‹ เจตเจฟเฉฑเจš เจนเจฐ เจšเฉ€เจœเจผ เจฆเฉ€ เจคเจฐเฉเจนเจพเจ‚, เจนเจฐ เจšเฉ€เจœเจผ เจฌเจนเฉเจค เจธเจงเจพเจฐเจจ เจนเฉˆ:

  • เจคเฉ‹เจ‚ เจตเจฟเจฐเจธเฉ‡ เจตเจฟเฉฑเจš เจฎเจฟเจฒเฉ€ BaseOperator, เจœเฉ‹ เจ•เจฟ เจ•เฉเจ เจเจ…เจฐเจซเจฒเฉ‹-เจตเจฟเจธเจผเฉ‡เจธเจผ เจšเฉ€เจœเจผเจพเจ‚ เจจเฉ‚เฉฐ เจฒเจพเจ—เฉ‚ เจ•เจฐเจฆเจพ เจนเฉˆ (เจ†เจชเจฃเฉ‡ เจฎเจจเฉ‹เจฐเฉฐเจœเจจ เจจเฉ‚เฉฐ เจฆเฉ‡เจ–เฉ‹)
  • เจ˜เฉ‹เจธเจผเจฟเจค เจ–เฉ‡เจคเจฐ template_fields, เจœเจฟเจธ เจตเจฟเฉฑเจš Jinja เจชเฉเจฐเจ•เจฟเจฐเจฟเจ† เจ•เจฐเจจ เจฒเจˆ เจฎเฉˆเจ•เจฐเฉ‹ เจฆเฉ€ เจ–เฉ‹เจœ เจ•เจฐเฉ‡เจ—เจพเฅค
  • เจฒเจˆ เจธเจนเฉ€ เจฆเจฒเฉ€เจฒเจพเจ‚ เจฆเจพ เจชเฉเจฐเจฌเฉฐเจง เจ•เฉ€เจคเจพ __init__(), เจœเจฟเฉฑเจฅเฉ‡ เจฒเฉ‹เฉœ เจนเฉ‹เจตเฉ‡ เจกเจฟเจซเจพเจฒเจŸ เจธเฉˆเฉฑเจŸ เจ•เจฐเฉ‹เฅค
  • เจ…เจธเฉ€เจ‚ เจชเฉ‚เจฐเจตเจœ เจฆเฉ€ เจธเจผเฉเจฐเฉ‚เจ†เจค เจฌเจพเจฐเฉ‡ เจตเฉ€ เจจเจนเฉ€เจ‚ เจญเฉเฉฑเจฒเฉ‡เฅค
  • เจ…เจจเฉเจธเจพเจฐเฉ€ เจนเฉเฉฑเจ• เจ–เฉ‹เจฒเฉเจนเฉ€ TelegramBotHookเจ‡เจธ เจคเฉ‹เจ‚ เจ‡เฉฑเจ• เจ•เจฒเจพเจ‡เฉฐเจŸ เจ†เจฌเจœเฉˆเจ•เจŸ เจชเฉเจฐเจพเจชเจค เจ•เฉ€เจคเจพเฅค
  • เจ“เจตเจฐเจฐเจพเจˆเจก (เจฎเฉเฉœ เจชเจฐเจฟเจญเจพเจธเจผเจฟเจค) เจตเจฟเจงเฉ€ BaseOperator.execute(), เจœเจฆเฉ‹เจ‚ เจ“เจชเจฐเฉ‡เจŸเจฐ เจจเฉ‚เฉฐ เจฒเจพเจ‚เจš เจ•เจฐเจจ เจฆเจพ เจธเจฎเจพเจ‚ เจ†เจ‰เจ‚เจฆเจพ เจนเฉˆ เจคเจพเจ‚ เจเจ…เจฐเจซเฉ‹ เจจเฉ‚เฉฐ เจฎเจฐเฉ‹เฉœ เจฆเฉ‡เจตเฉ‡เจ—เจพ - เจ‡เจธ เจตเจฟเฉฑเจš เจ…เจธเฉ€เจ‚ เจฒเฉŒเจ—เจ‡เจจ เจ•เจฐเจจเจพ เจญเฉเฉฑเจฒ เจ•เฉ‡ เจฎเฉเฉฑเจ– เจ•เจพเจฐเจตเจพเจˆ เจจเฉ‚เฉฐ เจฒเจพเจ—เฉ‚ เจ•เจฐเจพเจ‚เจ—เฉ‡เฅค (เจ…เจธเฉ€เจ‚ เจฒเฉŒเจ—เจ‡เจจ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚, เจคเจฐเฉ€เจ•เฉ‡ เจจเจพเจฒ, เจฌเจฟเจฒเจ•เฉเจฒ เจ…เฉฐเจฆเจฐ stdout ะธ stderr - เจเจ…เจฐเจซเจฒเฉ‹ เจนเจฐ เจšเฉ€เจœเจผ เจจเฉ‚เฉฐ เจฐเฉ‹เจ• เจฆเฉ‡เจตเฉ‡เจ—เจพ, เจ‡เจธ เจจเฉ‚เฉฐ เจธเฉเฉฐเจฆเจฐเจคเจพ เจจเจพเจฒ เจฒเจชเฉ‡เจŸ เจฆเฉ‡เจตเฉ‡เจ—เจพ, เจœเจฟเฉฑเจฅเฉ‡ เจฒเฉ‹เฉœ เจนเฉ‹เจตเฉ‡, เจ‡เจธ เจจเฉ‚เฉฐ เจตเจฟเจ—เจพเฉœ เจฆเฉ‡เจตเฉ‡เจ—เจพเฅค)

เจ†เจ“ เจฆเฉ‡เจ–เฉ€เจ เจ•เจฟ เจธเจพเจกเฉ‡ เจ•เฉ‹เจฒ เจ•เฉ€ เจนเฉˆ commons/hooks.py. เจซเจพเจˆเจฒ เจฆเจพ เจชเจนเจฟเจฒเจพ เจนเจฟเฉฑเจธเจพ, เจนเฉเฉฑเจ• เจฆเฉ‡ เจจเจพเจฒ:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

เจฎเฉˆเจจเฉ‚เฉฐ เจ‡เจน เจตเฉ€ เจจเจนเฉ€เจ‚ เจชเจคเจพ เจ•เจฟ เจ‡เฉฑเจฅเฉ‡ เจ•เฉ€ เจธเจฎเจเจพเจ‰เจฃเจพ เจนเฉˆ, เจฎเฉˆเจ‚ เจธเจฟเจฐเจซเจผ เจฎเจนเฉฑเจคเจตเจชเฉ‚เจฐเจจ เจจเฉเจ•เจคเฉ‡ เจจเฉ‹เจŸ เจ•เจฐเจพเจ‚เจ—เจพ:

  • เจ…เจธเฉ€เจ‚ เจตเจฟเจฐเจพเจธเจค เจตเจฟเฉฑเจš เจนเจพเจ‚, เจฆเจฒเฉ€เจฒเจพเจ‚ เจฌเจพเจฐเฉ‡ เจธเฉ‹เจšเฉ‹ - เจœเจผเจฟเจ†เจฆเจพเจคเจฐ เจฎเจพเจฎเจฒเจฟเจ†เจ‚ เจตเจฟเฉฑเจš เจ‡เจน เจ‡เฉฑเจ• เจนเฉ‹เจตเฉ‡เจ—เจพ: conn_id;
  • เจฎเจฟเจ†เจฐเฉ€ เจขเฉฐเจ—เจพเจ‚ เจจเฉ‚เฉฐ เจ“เจตเจฐเจฐเจพเจˆเจก เจ•เจฐเจจเจพ: เจฎเฉˆเจ‚ เจ†เจชเจฃเฉ‡ เจ†เจช เจจเฉ‚เฉฐ เจธเฉ€เจฎเจค เจ•เจฐ เจฒเจฟเจ† เจนเฉˆ get_conn(), เจœเจฟเจธ เจตเจฟเฉฑเจš เจฎเฉˆเจ‚ เจจเจพเจฎ เจฆเฉเจ†เจฐเจพ เจ•เจจเฉˆเจ•เจธเจผเจจ เจชเฉˆเจฐเจพเจฎเฉ€เจŸเจฐ เจชเฉเจฐเจพเจชเจค เจ•เจฐเจฆเจพ เจนเจพเจ‚ เจ…เจคเฉ‡ เจ•เฉ‡เจตเจฒ เจญเจพเจ— เจชเฉเจฐเจพเจชเจค เจ•เจฐเจฆเจพ เจนเจพเจ‚ extra (เจ‡เจน เจ‡เฉฑเจ• JSON เจ–เฉ‡เจคเจฐ เจนเฉˆ), เจœเจฟเจธ เจตเจฟเฉฑเจš เจฎเฉˆเจ‚ (เจฎเฉ‡เจฐเฉ€เจ†เจ‚ เจ†เจชเจฃเฉ€เจ†เจ‚ เจนเจฆเจพเจ‡เจคเจพเจ‚ เจ…เจจเฉเจธเจพเจฐ!) เจŸเฉˆเจฒเฉ€เจ—เฉเจฐเจพเจฎ เจฌเฉ‹เจŸ เจŸเฉ‹เจ•เจจ เจชเจพ เจฆเจฟเฉฑเจคเจพ เจนเฉˆ: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • เจฎเฉˆเจ‚ เจธเจพเจกเฉ€ เจ‡เฉฑเจ• เจ‰เจฆเจพเจนเจฐเจฃ เจฌเจฃเจพเจ‰เจ‚เจฆเจพ เจนเจพเจ‚ TelegramBot, เจ‡เจธ เจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจ–เจพเจธ เจŸเฉ‹เจ•เจจ เจฆเจฟเฉฐเจฆเฉ‡ เจนเฉ‹เจเฅค

เจ‡เจน เจธเจญ เจนเฉˆ. เจคเฉเจธเฉ€เจ‚ เจตเจฐเจค เจ•เฉ‡ เจ‡เฉฑเจ• เจนเฉเฉฑเจ• เจคเฉ‹เจ‚ เจ‡เฉฑเจ• เจ•เจฒเจพเจ‡เฉฐเจŸ เจชเฉเจฐเจพเจชเจค เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹ TelegramBotHook().clent เจœ TelegramBotHook().get_conn().

เจ…เจคเฉ‡ เจซเจพเจˆเจฒ เจฆเจพ เจฆเฉ‚เจœเจพ เจนเจฟเฉฑเจธเจพ, เจœเจฟเจธ เจตเจฟเฉฑเจš เจฎเฉˆเจ‚ เจŸเฉˆเจฒเฉ€เจ—เฉเจฐเจพเจฎ REST API เจฒเจˆ เจ‡เฉฑเจ• เจฎเจพเจˆเจ•เฉเจฐเฉ‹เจตเจฐเฉˆเจชเจฐ เจฌเจฃเจพเจ‰เจ‚เจฆเจพ เจนเจพเจ‚, เจคเจพเจ‚ เจœเฉ‹ เจ‰เจธเฉ‡ เจจเฉ‚เฉฐ เจ–เจฟเฉฑเจšเจฟเจ† เจจเจพ เจœเจพเจตเฉ‡ python-telegram-bot เจ‡เฉฑเจ• เจขเฉฐเจ— เจฒเจˆ sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

เจธเจนเฉ€ เจคเจฐเฉ€เจ•เจพ เจนเฉˆ เจ‡เจน เจธเจญ เจœเฉ‹เฉœเจจเจพ: TelegramBotSendMessage, TelegramBotHook, TelegramBot - เจชเจฒเฉฑเจ—เจ‡เจจ เจตเจฟเฉฑเจš, เจ‡เฉฑเจ• เจœเจจเจคเจ• เจฐเจฟเจชเฉ‹เจœเจผเจŸเจฐเฉ€ เจตเจฟเฉฑเจš เจชเจพเจ“, เจ…เจคเฉ‡ เจ‡เจธเจจเฉ‚เฉฐ เจ“เจชเจจ เจธเฉ‹เจฐเจธ เจจเฉ‚เฉฐ เจฆเจฟเจ“เฅค

เจœเจฆเฉ‹เจ‚ เจ…เจธเฉ€เจ‚ เจ‡เจธ เจธเจญ เจฆเจพ เจ…เจงเจฟเจเจจ เจ•เจฐ เจฐเจนเฉ‡ เจธเฉ€, เจธเจพเจกเฉ€ เจฐเจฟเจชเฉ‹เจฐเจŸ เจ…เฉฑเจชเจกเฉ‡เจŸ เจธเจซเจฒเจคเจพเจชเฉ‚เจฐเจตเจ• เจ…เจธเจซเจฒ เจนเฉ‹ เจ—เจˆ เจ…เจคเฉ‡ เจฎเฉˆเจจเฉ‚เฉฐ เจšเฉˆเจจเจฒ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจ—เจฒเจคเฉ€ เจธเฉเจจเฉ‡เจนเจพ เจญเฉ‡เจœเจฟเจ† เจ—เจฟเจ†เฅค เจฎเฉˆเจ‚ เจ‡เจน เจฆเฉ‡เจ–เจฃ เจฒเจˆ เจœเจพเจ‚เจš เจ•เจฐเจจ เจœเจพ เจฐเจฟเจนเจพ เจนเจพเจ‚ เจ•เจฟ เจ•เฉ€ เจ‡เจน เจ—เจฒเจค เจนเฉˆ...

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ
เจธเจพเจกเฉ‡ เจ•เฉเฉฑเจคเฉ‡ เจตเจฟเฉฑเจš เจ•เฉเจ เจŸเฉเฉฑเจŸ เจ—เจฟเจ†! เจ•เฉ€ เจ‡เจน เจ‰เจนเฉ€ เจจเจนเฉ€เจ‚ เจนเฉˆ เจœเจฟเจธเจฆเฉ€ เจ…เจธเฉ€เจ‚ เจ‰เจฎเฉ€เจฆ เจ•เจฐ เจฐเจนเฉ‡ เจธเฉ€? เจฌเจฟเจฒเจ•เฉเจฒ!

เจ•เฉ€ เจคเฉเจธเฉ€เจ‚ เจกเฉ‹เจฒเฉเจนเจฃ เจœเจพ เจฐเจนเฉ‡ เจนเฉ‹?

เจ•เฉ€ เจคเฉเจธเฉ€เจ‚ เจฎเจนเจฟเจธเฉ‚เจธ เจ•เจฐเจฆเฉ‡ เจนเฉ‹ เจ•เจฟ เจฎเฉˆเจ‚ เจ•เฉเจ เจ—เฉเจ† เจฆเจฟเฉฑเจคเจพ เจนเฉˆ? เจ…เจœเจฟเจนเจพ เจฒเจ—เจฆเจพ เจนเฉˆ เจ•เจฟ เจ‰เจธเจจเฉ‡ SQL เจธเจฐเจตเจฐ เจคเฉ‹เจ‚ เจตเจฐเจŸเจฟเจ•เจพ เจตเจฟเฉฑเจš เจกเฉ‡เจŸเจพ เจŸเฉเจฐเจพเจ‚เจธเจซเจฐ เจ•เจฐเจจ เจฆเจพ เจตเจพเจ…เจฆเจพ เจ•เฉ€เจคเจพ เจธเฉ€, เจ…เจคเฉ‡ เจซเจฟเจฐ เจ‰เจธเจจเฉ‡ เจ‡เจธเจจเฉ‚เฉฐ เจฒเจฟเจ† เจ…เจคเฉ‡ เจตเจฟเจธเจผเฉ‡ เจจเฉ‚เฉฐ เจ›เฉฑเจก เจฆเจฟเฉฑเจคเจพ, เจฌเจฆเจจเจพเจฎ!

เจ‡เจน เจ…เฉฑเจคเจฟเจ†เจšเจพเจฐ เจœเจพเจฃเจฌเฉเฉฑเจ เจ•เฉ‡ เจ•เฉ€เจคเจพ เจ—เจฟเจ† เจธเฉ€, เจฎเฉˆเจจเฉ‚เฉฐ เจคเฉเจนเจพเจกเฉ‡ เจฒเจˆ เจ•เฉเจ เจธเจผเจฌเจฆเจพเจตเจฒเฉ€ เจธเจฎเจเจฃเฉ€ เจธเฉ€เฅค เจนเฉเจฃ เจคเฉเจธเฉ€เจ‚ เจนเฉ‹เจฐ เจ…เฉฑเจ—เฉ‡ เจœเจพ เจธเจ•เจฆเฉ‡ เจนเฉ‹เฅค

เจธเจพเจกเฉ€ เจฏเฉ‹เจœเจจเจพ เจ‡เจน เจธเฉ€:

  1. เจกเจพเจ— เจ•เจฐเฉ‹
  2. เจ•เจพเจฐเจœ เจคเจฟเจ†เจฐ เจ•เจฐเฉ‹
  3. เจฆเฉ‡เจ–เฉ‹ เจ•เจฟ เจนเจฐ เจšเฉ€เจœเจผ เจ•เจฟเฉฐเจจเฉ€ เจธเฉเฉฐเจฆเจฐ เจนเฉˆ
  4. เจญเจฐเจจ เจฒเจˆ เจธเฉˆเจธเจผเจจ เจจเฉฐเจฌเจฐ เจจเจฟเจฐเจงเจพเจฐเจค เจ•เจฐเฉ‹
  5. SQL เจธเจฐเจตเจฐ เจคเฉ‹เจ‚ เจกเจพเจŸเจพ เจชเฉเจฐเจพเจชเจค เจ•เจฐเฉ‹
  6. Vertica เจตเจฟเฉฑเจš เจกเฉ‡เจŸเจพ เจชเจพเจ“
  7. เจ…เฉฐเจ•เฉœเฉ‡ เจ‡เจ•เฉฑเจ เฉ‡ เจ•เจฐเฉ‹

เจ‡เจธ เจฒเจˆ, เจ‡เจธ เจธเจญ เจจเฉ‚เฉฐ เจชเฉเจฐเจพเจชเจค เจ•เจฐเจจ เจ…เจคเฉ‡ เจšเจฒเจพเจ‰เจฃ เจฒเจˆ, เจฎเฉˆเจ‚ เจธเจพเจกเฉ‡ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจ›เฉ‹เจŸเจพ เจœเจฟเจนเจพ เจตเจพเจงเจพ เจ•เฉ€เจคเจพ docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

เจ‰เฉฑเจฅเฉ‡ เจ…เจธเฉ€เจ‚ เจ‰เจ เจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚:

  • เจฎเฉ‡เจœเจผเจฌเจพเจจ เจตเจœเฉ‹เจ‚ Vertica dwh เจธเจญ เจคเฉ‹เจ‚ เจกเจฟเจซเฉŒเจฒเจŸ เจธเฉˆเจŸเจฟเฉฐเจ—เจพเจ‚ เจฆเฉ‡ เจจเจพเจฒ,
  • SQL เจธเจฐเจตเจฐ เจฆเฉ‡ เจคเจฟเฉฐเจจ เจฎเฉŒเจ•เฉ‡,
  • เจ…เจธเฉ€เจ‚ เจฌเจพเจ…เจฆ เจตเจฟเฉฑเจš เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจจเฉ‚เฉฐ เจ•เฉเจ เจกเฉ‡เจŸเจพ เจจเจพเจฒ เจญเจฐเจฆเฉ‡ เจนเจพเจ‚ (เจ•เจฟเจธเฉ‡ เจตเฉ€ เจธเจฅเจฟเจคเฉ€ เจตเจฟเฉฑเจš เจจเจพ เจตเฉ‡เจ–เฉ‹ mssql_init.py!)

เจ…เจธเฉ€เจ‚ เจชเจฟเจ›เจฒเฉ€ เจตเจพเจฐ เจฆเฉ‡ เจฎเฉเจ•เจพเจฌเจฒเฉ‡ เจฅเฉ‹เฉœเฉเจนเฉ€ เจœเจฟเจนเฉ€ เจ—เฉเฉฐเจเจฒเจฆเจพเจฐ เจ•เจฎเจพเจ‚เจก เจฆเฉ€ เจฎเจฆเจฆ เจจเจพเจฒ เจธเจพเจฐเฉ‡ เจšเฉฐเจ—เฉ‡ เจฒเจพเจ‚เจš เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

เจธเจพเจกเฉ‡ เจšเจฎเจคเจ•เจพเจฐ เจฐเฉˆเจ‚เจกเจฎเจพเจˆเจœเจผเจฐ เจจเฉ‡ เจ•เฉ€ เจฌเจฃเจพเจ‡เจ† เจนเฉˆ, เจคเฉเจธเฉ€เจ‚ เจ†เจˆเจŸเจฎ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹ Data Profiling/Ad Hoc Query:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ
เจฎเฉเฉฑเจ– เจ—เฉฑเจฒ เจ‡เจน เจนเฉˆ เจ•เจฟ เจ‡เจธ เจจเฉ‚เฉฐ เจตเจฟเจธเจผเจฒเฉ‡เจธเจผเจ•เจพเจ‚ เจจเฉ‚เฉฐ เจฆเจฟเจ–เจพเจ‰เจฃเจพ เจจเจนเฉ€เจ‚ เจนเฉˆ

'เจคเฉ‡ เจตเจฟเจธเจคเฉเจฐเจฟเจค ETL เจธเฉˆเจธเจผเจจ เจฎเฉˆเจ‚ เจจเจนเฉ€เจ‚ เจ•เจฐเจพเจ‚เจ—เจพ, เจ‡เฉฑเจฅเฉ‡ เจธเจญ เจ•เฉเจ เจฎเจพเจฎเฉ‚เจฒเฉ€ เจนเฉˆ: เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจ…เจงเจพเจฐ เจฌเจฃเจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚, เจ‡เจธ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจšเจฟเฉฐเจจเฉเจน เจนเฉเฉฐเจฆเจพ เจนเฉˆ, เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจชเฉเจฐเจธเฉฐเจ— เจฎเฉˆเจจเฉ‡เจœเจฐ เจจเจพเจฒ เจนเจฐ เจšเฉ€เจœเจผ เจจเฉ‚เฉฐ เจธเจฎเฉ‡เจŸเจฆเฉ‡ เจนเจพเจ‚, เจ…เจคเฉ‡ เจนเฉเจฃ เจ…เจธเฉ€เจ‚ เจ‡เจน เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

เจธเจฎเจพเจ‚ เจ† เจ—เจฟเจ† เจนเฉˆ เจธเจพเจกเจพ เจกเจพเจŸเจพ เจ‡เจ•เฉฑเจ เจพ เจ•เจฐเฉ‹ เจธเจพเจกเฉ‡ เจกเฉ‡เจข เจธเฉŒ เจฎเฉ‡เจœเจผเจพเจ‚ เจคเฉ‹เจ‚เฅค เจ†เจ‰ เจ‡เจธเจจเฉ‚เฉฐ เจฌเจนเฉเจค เจนเฉ€ เจฌเฉ‡เจฎเจฟเจธเจพเจฒ เจฒเจพเจˆเจจเจพเจ‚ เจฆเฉ€ เจฎเจฆเจฆ เจจเจพเจฒ เจ•เจฐเฉ€เจ:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. เจนเฉเฉฑเจ• เจฆเฉ€ เจฎเจฆเจฆ เจจเจพเจฒ เจ…เจธเฉ€เจ‚ เจเจ…เจฐเจซเจฒเฉ‹ เจคเฉ‹เจ‚ เจชเฉเจฐเจพเจชเจค เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚ pymssql- เจœเฉเฉœเฉ‹
  2. เจ†เจ‰ เจฌเฉ‡เจจเจคเฉ€ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจฎเจฟเจคเฉ€ เจฆเฉ‡ เจฐเฉ‚เจช เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจชเจพเจฌเฉฐเจฆเฉ€ เจจเฉ‚เฉฐ เจฌเจฆเจฒเฉ€เจ - เจ‡เจธเจจเฉ‚เฉฐ เจŸเฉˆเจ‚เจชเจฒเฉ‡เจŸ เจ‡เฉฐเจœเจฃ เจฆเฉเจ†เจฐเจพ เจซเฉฐเจ•เจธเจผเจจ เจตเจฟเฉฑเจš เจธเฉเฉฑเจŸ เจฆเจฟเฉฑเจคเจพ เจœเจพเจตเฉ‡เจ—เจพเฅค
  3. เจธเจพเจกเฉ€ เจฌเฉ‡เจจเจคเฉ€ เจจเฉ‚เฉฐ เจ–เฉเจ†เจ‰เจฃเจพ pandasเจœเฉ‹ เจธเจพเจจเฉ‚เฉฐ เจชเฉเจฐเจพเจชเจค เจ•เจฐเฉ‡เจ—เจพ DataFrame - เจ‡เจน เจญเจตเจฟเฉฑเจ– เจตเจฟเฉฑเจš เจธเจพเจกเฉ‡ เจฒเจˆ เจฒเจพเจญเจฆเจพเจ‡เจ• เจนเฉ‹เจตเฉ‡เจ—เจพเฅค

เจฎเฉˆเจ‚ เจฌเจฆเจฒ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐ เจฐเจฟเจนเจพ เจนเจพเจ‚ {dt} เจ‡เฉฑเจ• เจฌเฉ‡เจจเจคเฉ€ เจชเฉˆเจฐเจพเจฎเฉ€เจŸเจฐ เจฆเฉ€ เจฌเจœเจพเจ %s เจ‡เจธ เจฒเจˆ เจจเจนเฉ€เจ‚ เจ•เจฟ เจฎเฉˆเจ‚ เจ‡เฉฑเจ• เจฆเฉเจธเจผเจŸ เจชเจฟเจจเฉ‹เจšเจฟเจ“ เจนเจพเจ‚, เจชเจฐ เจ•เจฟเจ‰เจ‚เจ•เจฟ pandas เจธเฉฐเจญเจพเจฒ เจจเจนเฉ€เจ‚ เจธเจ•เจฆเจพ pymssql เจ…เจคเฉ‡ เจ†เจ–เจฐเฉ€ เจจเฉ‚เฉฐ เจ–เจฟเจธเจ•เจพเจ‰เจ‚เจฆเจพ เจนเฉˆ params: Listเจนเจพเจฒเจพเจ‚เจ•เจฟ เจ‰เจน เจ…เจธเจฒ เจตเจฟเฉฑเจš เจšเจพเจนเฉเฉฐเจฆเจพ เจนเฉˆ tuple.
เจ‡เจน เจตเฉ€ เจจเฉ‹เจŸ เจ•เจฐเฉ‹ เจ•เจฟ เจกเจฟเจตเฉˆเจฒเจชเจฐ pymssql เจนเฉเจฃ เจ‰เจธ เจฆเจพ เจธเจฎเจฐเจฅเจจ เจจเจพ เจ•เจฐเจจ เจฆเจพ เจซเฉˆเจธเจฒเจพ เจ•เฉ€เจคเจพ, เจ…เจคเฉ‡ เจ‡เจน เจฌเจพเจนเจฐ เจœเจพเจฃ เจฆเจพ เจธเจฎเจพเจ‚ เจนเฉˆ pyodbc.

เจ†เจ“ เจฆเฉ‡เจ–เฉ€เจ เจ•เจฟ เจเจ…เจฐเจซเจฒเฉ‹ เจจเฉ‡ เจธเจพเจกเฉ‡ เจซเฉฐเจ•เจธเจผเจจเจพเจ‚ เจฆเฉ€เจ†เจ‚ เจฆเจฒเฉ€เจฒเจพเจ‚ เจจเฉ‚เฉฐ เจ•เจฟเจธ เจจเจพเจฒ เจญเจฐเจฟเจ† เจนเฉˆ:

เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹: เจˆเจŸเฉ€เจเจฒ เจจเฉ‚เฉฐ เจ†เจธเจพเจจ เจฌเจฃเจพเจ‰เจฃเจพ

เจœเฉ‡เจ•เจฐ เจ•เฉ‹เจˆ เจกเจพเจŸเจพ เจจเจนเฉ€เจ‚ เจนเฉˆ, เจคเจพเจ‚ เจœเจพเจฐเฉ€ เจฐเฉฑเจ–เจฃ เจฆเจพ เจ•เฉ‹เจˆ เจฎเจคเจฒเจฌ เจจเจนเฉ€เจ‚ เจนเฉˆ. เจชเจฐ เจญเจฐเจจ เจจเฉ‚เฉฐ เจธเจซเจฒ เจฎเฉฐเจจเจฃเจพ เจตเฉ€ เจ…เจœเฉ€เจฌ เจนเฉˆเฅค เจชเจฐ เจ‡เจน เจ•เฉ‹เจˆ เจ—เจฒเจคเฉ€ เจจเจนเฉ€เจ‚ เจนเฉˆเฅค A-ah-ah, เจ•เฉ€ เจ•เจฐเจจเจพ เจนเฉˆ ?! เจ…เจคเฉ‡ เจ‡เฉฑเจฅเฉ‡ เจ•เฉ€ เจนเฉˆ:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException เจเจ…เจฐเจซเจฒเฉ‹ เจจเฉ‚เฉฐ เจฆเฉฑเจธเจฆเจพ เจนเฉˆ เจ•เจฟ เจ•เฉ‹เจˆ เจคเจฐเฉเฉฑเจŸเฉ€เจ†เจ‚ เจจเจนเฉ€เจ‚ เจนเจจ, เจชเจฐ เจ…เจธเฉ€เจ‚ เจ•เฉฐเจฎ เจจเฉ‚เฉฐ เจ›เฉฑเจก เจฆเจฟเฉฐเจฆเฉ‡ เจนเจพเจ‚เฅค เจ‡เฉฐเจŸเจฐเจซเฉ‡เจธ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจนเจฐเจพ เจœเจพเจ‚ เจฒเจพเจฒ เจตเจฐเจ— เจจเจนเฉ€เจ‚ เจนเฉ‹เจตเฉ‡เจ—เจพ, เจชเจฐ เจ—เฉเจฒเจพเจฌเฉ€.

เจ†เจ‰ เจธเจพเจกเฉ‡ เจกเฉ‡เจŸเจพ เจจเฉ‚เฉฐ เจŸเฉŒเจธ เจ•เจฐเฉ€เจ เจ•เจˆ เจ•เจพเจฒเจฎ:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

เจ…เจฐเจฅเจพเจค:

  • เจ‰เจน เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจœเจฟเจธ เจคเฉ‹เจ‚ เจ…เจธเฉ€เจ‚ เจ†เจฐเจกเจฐ เจฒเจ เจธเจจ,
  • เจธเจพเจกเฉ‡ เจนเฉœเฉเจน เจธเฉˆเจธเจผเจจ เจฆเฉ€ ID (เจ‡เจน เจตเฉฑเจ–เจฐเจพ เจนเฉ‹เจตเฉ‡เจ—เจพ เจนเจฐ เจ•เฉฐเจฎ เจฒเจˆ),
  • เจธเจฐเฉ‹เจค เจ…เจคเฉ‡ เจ†เจฐเจกเจฐ เจ†เจˆเจกเฉ€ เจคเฉ‹เจ‚ เจ‡เฉฑเจ• เจนเฉˆเจธเจผ - เจคเจพเจ‚ เจœเฉ‹ เจ…เฉฐเจคเจฎ เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจตเจฟเฉฑเจš (เจœเจฟเฉฑเจฅเฉ‡ เจธเจญ เจ•เฉเจ เจ‡เฉฑเจ• เจŸเฉ‡เจฌเจฒ เจตเจฟเฉฑเจš เจชเจพเจ‡เจ† เจœเจพเจ‚เจฆเจพ เจนเฉˆ) เจธเจพเจกเฉ‡ เจ•เฉ‹เจฒ เจ‡เฉฑเจ• เจตเจฟเจฒเฉฑเจ–เจฃ เจ†เจฐเจกเจฐ เจ†เจˆเจกเฉ€ เจนเฉˆ.

เจ…เฉฐเจคเจฎ เจชเฉœเจพเจ… เจฌเจพเจ•เฉ€ เจนเฉˆ: เจนเจฐ เจšเฉ€เจœเจผ เจจเฉ‚เฉฐ เจตเจฐเจŸเฉ€เจ•เจพ เจตเจฟเฉฑเจš เจกเฉ‹เจฒเฉเจน เจฆเจฟเจ“เฅค เจ…เจคเฉ‡, เจ…เจœเฉ€เจฌ เจคเฉŒเจฐ 'เจคเฉ‡, เจ…เจœเจฟเจนเจพ เจ•เจฐเจจ เจฆเฉ‡ เจธเจญ เจคเฉ‹เจ‚ เจธเจผเจพเจจเจฆเจพเจฐ เจ…เจคเฉ‡ เจ•เฉเจธเจผเจฒ เจคเจฐเฉ€เจ•เจฟเจ†เจ‚ เจตเจฟเฉฑเจšเฉ‹เจ‚ เจ‡เฉฑเจ• เจนเฉˆ CSV เจฆเฉเจ†เจฐเจพ!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจตเจฟเจธเจผเฉ‡เจธเจผ เจฐเจฟเจธเฉ€เจตเจฐ เจฌเจฃเจพ เจฐเจนเฉ‡ เจนเจพเจ‚ StringIO.
  2. pandas เจ•เจฟเจฐเจชเจพ เจ•เจฐเจ•เฉ‡ เจธเจพเจกเฉ‡ เจชเจพ เจฆเฉ‡เจตเฉ‡เจ—เจพ DataFrame เจฆเฉ‡ เจฐเฉ‚เจช เจตเจฟเฉฑเจš CSV-เจฒเจพเจˆเจจเจพเจ‚เฅค
  3. เจ†เจ‰ เจ‡เฉฑเจ• เจนเฉเฉฑเจ• เจจเจพเจฒ เจธเจพเจกเฉ‡ เจฎเจจเจชเจธเฉฐเจฆ เจตเจฐเจŸเจฟเจ•เจพ เจจเจพเจฒ เจ‡เฉฑเจ• เจ•เฉเจจเฉˆเจ•เจธเจผเจจ เจ–เฉ‹เจฒเฉเจนเฉ€เจเฅค
  4. เจ…เจคเฉ‡ เจนเฉเจฃ เจฎเจฆเจฆ เจจเจพเจฒ copy() เจธเจพเจกเจพ เจกเฉ‡เจŸเจพ เจธเจฟเฉฑเจงเจพ เจตเจฐเจŸเจฟเจ•เจพ เจจเฉ‚เฉฐ เจญเฉ‡เจœเฉ‹!

เจ…เจธเฉ€เจ‚ เจกเจฐเจพเจˆเจตเจฐ เจคเฉ‹เจ‚ เจฒเจตเจพเจ‚เจ—เฉ‡ เจ•เจฟ เจ•เจฟเฉฐเจจเฉ€เจ†เจ‚ เจฒเจพเจˆเจจเจพเจ‚ เจญเจฐเฉ€เจ†เจ‚ เจ—เจˆเจ†เจ‚ เจธเจจ, เจ…เจคเฉ‡ เจธเฉˆเจธเจผเจจ เจฎเฉˆเจจเฉ‡เจœเจฐ เจจเฉ‚เฉฐ เจฆเฉฑเจธเจพเจ‚เจ—เฉ‡ เจ•เจฟ เจธเจญ เจ•เฉเจ เจ เฉ€เจ• เจนเฉˆ:

session.loaded_rows = cursor.rowcount
session.successful = True

เจ‡เจน เจธเจญ เจนเฉˆ.

เจตเจฟเจ•เจฐเฉ€ 'เจคเฉ‡, เจ…เจธเฉ€เจ‚ เจจเจฟเจธเจผเจพเจจเจพ เจชเจฒเฉ‡เจŸ เจนเฉฑเจฅเฉ€เจ‚ เจฌเจฃเจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚เฅค เจ‡เฉฑเจฅเฉ‡ เจฎเฉˆเจ‚ เจ†เจชเจฃเฉ‡ เจ†เจช เจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจ›เฉ‹เจŸเฉ€ เจฎเจธเจผเฉ€เจจ เจฆเฉ€ เจ‡เจœเจพเจœเจผเจค เจฆเจฟเฉฑเจคเฉ€:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

เจฎเฉˆเจ‚ เจตเจฐเจค เจฐเจฟเจนเจพ เจนเจพเจ‚ VerticaOperator() เจฎเฉˆเจ‚ เจ‡เฉฑเจ• เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจธเจ•เฉ€เจฎเจพ เจ…เจคเฉ‡ เจ‡เฉฑเจ• เจธเจพเจฐเจฃเฉ€ เจฌเจฃเจพเจ‰เจ‚เจฆเจพ เจนเจพเจ‚ (เจœเฉ‡ เจ‰เจน เจชเจนเจฟเจฒเจพเจ‚ เจคเฉ‹เจ‚ เจฎเฉŒเจœเฉ‚เจฆ เจจเจนเฉ€เจ‚ เจนเจจ, เจฌเฉ‡เจธเจผเจ•)เฅค เจฎเฉเฉฑเจ– เจ—เฉฑเจฒ เจ‡เจน เจนเฉˆ เจ•เจฟ เจจเจฟเจฐเจญเจฐเจคเจพ เจจเฉ‚เฉฐ เจธเจนเฉ€ เจขเฉฐเจ— เจจเจพเจฒ เจตเจฟเจตเจธเจฅเจฟเจค เจ•เจฐเจจเจพ:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

เจธเฉฐเจ–เฉ‡เจช

- เจ เฉ€เจ• เจนเฉˆ, - เจ›เฉ‹เจŸเฉ‡ เจšเฉ‚เจนเฉ‡ เจจเฉ‡ เจ•เจฟเจนเจพ, - เจนเฉเจฃ เจ‡เจน เจจเจนเฉ€เจ‚ เจนเฉˆ
เจ•เฉ€ เจคเฉเจนเจพเจจเฉ‚เฉฐ เจฏเจ•เฉ€เจจ เจนเฉˆ เจ•เจฟ เจฎเฉˆเจ‚ เจœเฉฐเจ—เจฒ เจฆเจพ เจธเจญ เจคเฉ‹เจ‚ เจญเจฟเจ†เจจเจ• เจœเจพเจจเจตเจฐ เจนเจพเจ‚?

เจœเฉ‚เจฒเฉ€เจ† เจกเฉ‹เจจเจพเจฒเจกเจธเจจ, เจฆ เจ—เฉเจฐเจซเจพเจฒเฉ‹

เจฎเฉˆเจจเฉ‚เฉฐ เจฒเจ—เจฆเจพ เจนเฉˆ เจ•เจฟ เจœเฉ‡ เจฎเฉ‡เจฐเฉ‡ เจธเจพเจฅเฉ€เจ†เจ‚ เจ…เจคเฉ‡ เจฎเฉ‡เจฐเฉ‡ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจฎเฉเจ•เจพเจฌเจฒเจพ เจธเฉ€: เจ•เฉŒเจฃ เจธเจ•เฉเจฐเฉˆเจš เจคเฉ‹เจ‚ เจ‡เฉฑเจ• ETL เจชเฉเจฐเจ•เจฟเจฐเจฟเจ† เจจเฉ‚เฉฐ เจœเจฒเจฆเฉ€ เจฌเจฃเจพเจตเฉ‡เจ—เจพ เจ…เจคเฉ‡ เจฒเจพเจ‚เจš เจ•เจฐเฉ‡เจ—เจพ: เจ‰เจน เจ†เจชเจฃเฉ‡ SSIS เจ…เจคเฉ‡ เจ‡เฉฑเจ• เจฎเจพเจŠเจธ เจจเจพเจฒ เจ…เจคเฉ‡ เจฎเฉˆเจ‚ เจเจ…เจฐเจซเจฒเฉ‹ เจจเจพเจฒ ... เจ…เจคเฉ‡ เจซเจฟเจฐ เจ…เจธเฉ€เจ‚ เจฐเฉฑเจ–-เจฐเจ–เจพเจ… เจฆเฉ€ เจธเฉŒเจ– เจฆเฉ€ เจคเฉเจฒเจจเจพ เจตเฉ€ เจ•เจฐเจพเจ‚เจ—เฉ‡ ... เจตเจพเจน, เจฎเฉˆเจจเฉ‚เฉฐ เจฒเจ—เจฆเจพ เจนเฉˆ เจ•เจฟ เจคเฉเจธเฉ€เจ‚ เจธเจนเจฟเจฎเจค เจนเฉ‹เจตเฉ‹เจ—เฉ‡ เจ•เจฟ เจฎเฉˆเจ‚ เจ‰เจจเฉเจนเจพเจ‚ เจจเฉ‚เฉฐ เจนเจฐ เจฎเฉ‹เจฐเจšเฉ‡ 'เจคเฉ‡ เจนเจฐเจพเจตเจพเจ‚เจ—เจพ!

เจœเฉ‡ เจฅเฉ‹เฉœเจพ เจนเฉ‹เจฐ เจ—เฉฐเจญเฉ€เจฐเจคเจพ เจจเจพเจฒ, เจคเจพเจ‚ เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹ - เจชเฉเจฐเฉ‹เจ—เจฐเจพเจฎ เจ•เฉ‹เจก เจฆเฉ‡ เจฐเฉ‚เจช เจตเจฟเฉฑเจš เจชเฉเจฐเจ•เจฟเจฐเจฟเจ†เจตเจพเจ‚ เจฆเจพ เจตเจฐเจฃเจจ เจ•เจฐเจ•เฉ‡ - เจฎเฉ‡เจฐเจพ เจ•เฉฐเจฎ เจ•เฉ€เจคเจพ. เจฌเจนเฉเจค เจ•เฉเจ เจตเจงเฉ‡เจฐเฉ‡ เจ†เจฐเจพเจฎเจฆเจพเจ‡เจ• เจ…เจคเฉ‡ เจฎเจœเจผเฉ‡เจฆเจพเจฐ.

เจ‡เจธเจฆเฉ€ เจ…เจธเฉ€เจฎเจฟเจค เจตเจฟเจธเจคเจพเจฐเจฏเฉ‹เจ—เจคเจพ, เจชเจฒเฉฑเจ—-เจ‡เจจเจพเจ‚ เจ…เจคเฉ‡ เจธเจ•เฉ‡เจฒเฉ‡เจฌเจฟเจฒเจŸเฉ€ เจฆเฉ€ เจชเฉเจฐเจตเจฟเจฐเจคเฉ€ เจฆเฉ‡ เจฐเฉ‚เจช เจตเจฟเฉฑเจš, เจคเฉเจนเจพเจจเฉ‚เฉฐ เจฒเจ—เจญเจ— เจ•เจฟเจธเฉ‡ เจตเฉ€ เจ–เฉ‡เจคเจฐ เจตเจฟเฉฑเจš เจเจ…เจฐเจซเจฒเฉ‹ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจจ เจฆเจพ เจฎเฉŒเจ•เจพ เจฆเจฟเฉฐเจฆเฉ€ เจนเฉˆ: เจ‡เฉฑเจฅเฉ‹เจ‚ เจคเฉฑเจ• เจ•เจฟ เจกเฉ‡เจŸเจพ เจ‡เจ•เฉฑเจ เจพ เจ•เจฐเจจ, เจคเจฟเจ†เจฐ เจ•เจฐเจจ เจ…เจคเฉ‡ เจชเฉเจฐเฉ‹เจธเฉˆเจธ เจ•เจฐเจจ เจฆเฉ‡ เจชเฉ‚เจฐเฉ‡ เจšเฉฑเจ•เจฐ เจตเจฟเฉฑเจš, เจ‡เฉฑเจฅเฉ‹เจ‚ เจคเฉฑเจ• เจ•เจฟ เจฐเจพเจ•เฉ‡เจŸ เจฒเจพเจ‚เจš เจ•เจฐเจจ เจตเจฟเฉฑเจš เจตเฉ€ (เจฎเฉฐเจ—เจฒ เจคเฉฑเจ•, เจ•เฉ‹เจฐเจธ).

เจญเจพเจ— เจซเจพเจˆเจจเจฒ, เจนเจตเจพเจฒเจพ เจ…เจคเฉ‡ เจœเจพเจฃเจ•เจพเจฐเฉ€

เจฐเฉ‡เจ• เจ…เจธเฉ€เจ‚ เจคเฉเจนเจพเจกเฉ‡ เจฒเจˆ เจ‡เจ•เฉฑเจ เจพ เจ•เฉ€เจคเจพ เจนเฉˆ

  • start_date. เจนเจพเจ‚, เจ‡เจน เจชเจนเจฟเจฒเจพเจ‚ เจนเฉ€ เจ‡เฉฑเจ• เจธเจฅเจพเจจเจ• เจฎเฉ€เจฎ เจนเฉˆเฅค เจกเฉŒเจ— เจฆเฉ€ เจฎเฉเฉฑเจ– เจฆเจฒเฉ€เจฒ เจฐเจพเจนเฉ€เจ‚ start_date เจธเจพเจฐเฉ‡ เจชเจพเจธ เจธเฉฐเจ–เฉ‡เจช เจตเจฟเฉฑเจš, เจœเฉ‡เจ•เจฐ เจคเฉเจธเฉ€เจ‚ เจตเจฟเฉฑเจš เจจเจฟเจธเจผเจšเจฟเจค เจ•เจฐเจฆเฉ‡ เจนเฉ‹ start_date เจฎเฉŒเจœเฉ‚เจฆเจพ เจฎเจฟเจคเฉ€, เจ…เจคเฉ‡ schedule_interval - เจ‡เฉฑเจ• เจฆเจฟเจจ, เจซเจฟเจฐ เจกเฉ€เจเจœเฉ€ เจ•เฉฑเจฒเฉเจน เจคเฉ‹เจ‚ เจชเจนเจฟเจฒเจพเจ‚ เจจเจนเฉ€เจ‚ เจธเจผเฉเจฐเฉ‚ เจนเฉ‹เจตเฉ‡เจ—เจพเฅค
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    เจ…เจคเฉ‡ เจ•เฉ‹เจˆ เจนเฉ‹เจฐ เจธเจฎเฉฑเจธเจฟเจ† เจจเจนเฉ€เจ‚.

    เจ‡เจธเจฆเฉ‡ เจจเจพเจฒ เจœเฉเฉœเฉ€ เจ‡เฉฑเจ• เจนเฉ‹เจฐ เจฐเจจเจŸเจพเจˆเจฎ เจ—เจฒเจคเฉ€ เจนเฉˆ: Task is missing the start_date parameter, เจœเฉ‹ เจ…เจ•เจธเจฐ เจ‡เจน เจฆเจฐเจธเจพเจ‰เจ‚เจฆเจพ เจนเฉˆ เจ•เจฟ เจคเฉเจธเฉ€เจ‚ เจกเฉˆเจ— เจ“เจชเจฐเฉ‡เจŸเจฐ เจจเจพเจฒ เจฌเฉฐเจจเฉเจนเจฃเจพ เจญเฉเฉฑเจฒ เจ—เจ เจนเฉ‹เฅค

  • เจธเจพเจฐเฉ‡ เจ‡เฉฑเจ• เจฎเจธเจผเฉ€เจจ 'เจคเฉ‡. เจนเจพเจ‚, เจ…เจคเฉ‡ เจฌเฉ‡เจธ (เจเจ…เจฐเจซเจฒเฉ‹ เจ–เฉเจฆ เจ…เจคเฉ‡ เจธเจพเจกเฉ€ เจ•เฉ‹เจŸเจฟเฉฐเจ—), เจ…เจคเฉ‡ เจ‡เฉฑเจ• เจตเฉˆเจฌ เจธเจฐเจตเจฐ, เจ…เจคเฉ‡ เจ‡เฉฑเจ• เจธเจผเจกเจฟเจŠเจฒเจฐ, เจ…เจคเฉ‡ เจตเจฐเจ•เจฐเฅค เจ…เจคเฉ‡ เจ‡เจน เจตเฉ€ เจ•เฉฐเจฎ เจ•เฉ€เจคเจพ. เจชเจฐ เจธเจฎเฉ‡เจ‚ เจฆเฉ‡ เจจเจพเจฒ, เจธเฉ‡เจตเจพเจตเจพเจ‚ เจฒเจˆ เจ•เจพเจฐเจœเจพเจ‚ เจฆเฉ€ เจ—เจฟเจฃเจคเฉ€ เจตเจงเจฆเฉ€ เจ—เจˆ, เจ…เจคเฉ‡ เจœเจฆเฉ‹เจ‚ PostgreSQL เจจเฉ‡ 20 ms เจฆเฉ€ เจฌเจœเจพเจ 5 s เจตเจฟเฉฑเจš เจธเฉ‚เจšเจ•เจพเจ‚เจ• เจฆเจพ เจœเจตเจพเจฌ เจฆเฉ‡เจฃเจพ เจธเจผเฉเจฐเฉ‚ เจ•เฉ€เจคเจพ, เจ…เจธเฉ€เจ‚ เจ‡เจธเจจเฉ‚เฉฐ เจฒเฉˆ เจฒเจฟเจ† เจ…เจคเฉ‡ เจ‡เจธเจจเฉ‚เฉฐ เจฆเฉ‚เจฐ เจ•เจฐ เจฆเจฟเฉฑเจคเจพเฅค
  • เจฒเฉ‹เจ•เจฒ เจเจ—เจœเจผเฉ€เจ•เจฟเจŠเจŸเจฐเฅค เจนเจพเจ‚, เจ…เจธเฉ€เจ‚ เจ…เจœเฉ‡ เจตเฉ€ เจ‡เจธ 'เจคเฉ‡ เจฌเฉˆเจ เฉ‡ เจนเจพเจ‚, เจ…เจคเฉ‡ เจ…เจธเฉ€เจ‚ เจชเจนเจฟเจฒเจพเจ‚ เจนเฉ€ เจ…เจฅเจพเจน เจ•เฉเฉฐเจก เจฆเฉ‡ เจ•เจฟเจจเจพเจฐเฉ‡ เจ† เจ—เจ เจนเจพเจ‚. LocalExecutor เจนเฉเจฃ เจคเฉฑเจ• เจธเจพเจกเฉ‡ เจฒเจˆ เจ•เจพเจซเฉ€ เจฐเจฟเจนเจพ เจนเฉˆ, เจชเจฐ เจนเฉเจฃ เจธเจฎเจพเจ‚ เจ† เจ—เจฟเจ† เจนเฉˆ เจ•เจฟ เจ˜เฉฑเจŸเฉ‹-เจ˜เฉฑเจŸ เจ‡เฉฑเจ• เจตเจฐเจ•เจฐ เจจเจพเจฒ เจตเจฟเจธเจคเจพเจฐ เจ•เฉ€เจคเจพ เจœเจพเจตเฉ‡, เจ…เจคเฉ‡ เจธเจพเจจเฉ‚เฉฐ CeleryExecutor เจตเจฟเฉฑเจš เจœเจพเจฃ เจฒเจˆ เจธเจ–เจผเจค เจฎเจฟเจนเจจเจค เจ•เจฐเจจเฉ€ เจชเจตเฉ‡เจ—เฉ€เฅค เจ…เจคเฉ‡ เจ‡เจธ เจคเฉฑเจฅ เจฆเฉ‡ เจฎเฉฑเจฆเฉ‡เจจเจœเจผเจฐ เจ•เจฟ เจคเฉเจธเฉ€เจ‚ เจ‡เฉฑเจ• เจฎเจธเจผเฉ€เจจ 'เจคเฉ‡ เจ‡เจธ เจจเจพเจฒ เจ•เฉฐเจฎ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹, เจคเฉเจนเจพเจจเฉ‚เฉฐ เจธเจฐเจตเจฐ 'เจคเฉ‡ เจตเฉ€ เจธเฉˆเจฒเจฐเฉ€ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจจ เจคเฉ‹เจ‚ เจ•เฉเจ เจจเจนเฉ€เจ‚ เจฐเฉ‹เจ•เจฆเจพ, เจœเฉ‹ เจ•เจฟ "เจฌเฉ‡เจธเจผเจ•, เจ•เจฆเฉ‡ เจตเฉ€ เจ‰เจคเจชเจพเจฆเจจ เจตเจฟเฉฑเจš เจจเจนเฉ€เจ‚ เจœเจพเจตเฉ‡เจ—เจพ, เจ‡เจฎเจพเจจเจฆเจพเจฐเฉ€ เจจเจพเจฒ!"
  • เจ—เฉˆเจฐ-เจตเจฐเจคเฉ‹เจ‚ เจฌเจฟเจฒเจŸ-เจ‡เจจ เจŸเฉ‚เจฒ:
    • เจ•เฉเจจเฉˆเจ•เจธเจผเจจ เจธเฉ‡เจตเจพ เจชเฉเจฐเจฎเจพเจฃ เจชเฉฑเจคเจฐเจพเจ‚ เจจเฉ‚เฉฐ เจธเจŸเฉ‹เจฐ เจ•เจฐเจจ เจฒเจˆ,
    • SLA เจฎเจฟเจธ เจธเจฎเฉ‡เจ‚ เจธเจฟเจฐ เจ•เฉฐเจฎ เจจเจพ เจ•เจฐเจจ เจตเจพเจฒเฉ‡ เจ•เฉฐเจฎเจพเจ‚ เจฆเจพ เจœเจตเจพเจฌ เจฆเฉ‡เจฃ เจฒเจˆ,
    • xcom เจฎเฉˆเจŸเจพเจกเฉ‡เจŸเจพ เจเจ•เจธเจšเฉ‡เจ‚เจœ เจฒเจˆ (เจฎเฉˆเจ‚ เจ•เจฟเจนเจพ เจฎเฉˆเจŸเจพเจกเจพเจŸเจพ!) เจกเฉˆเจ— เจ•เฉฐเจฎเจพเจ‚ เจฆเฉ‡ เจตเจฟเจšเจ•เจพเจฐเฅค
  • เจฎเฉ‡เจฒ เจฆเฉเจฐเจตเจฟเจตเจนเจพเจฐเฅค เจ–เฉˆเจฐ, เจฎเฉˆเจ‚ เจ•เฉ€ เจ•เจนเจฟ เจธเจ•เจฆเจพ เจนเจพเจ‚? เจกเจฟเฉฑเจ—เฉ‡ เจนเฉ‹เจ เจ•เฉฐเจฎเจพเจ‚ เจฆเฉ‡ เจธเจพเจฐเฉ‡ เจฆเฉเจนเจฐเจพเจ“ เจฒเจˆ เจ…เจฒเจฐเจŸ เจธเจฅเจพเจชเจค เจ•เฉ€เจคเฉ‡ เจ—เจ เจธเจจเฅค เจนเฉเจฃ เจฎเฉ‡เจฐเจพ เจ•เฉฐเจฎ Gmail เจตเจฟเฉฑเจš เจเจ…เจฐเจซเจฒเฉ‹ เจคเฉ‹เจ‚ >90k เจˆเจฎเฉ‡เจฒเจพเจ‚ เจนเจจ, เจ…เจคเฉ‡ เจตเฉˆเจฌ เจฎเฉ‡เจฒ เจฎเจœเจผเจฒ เจ‡เฉฑเจ• เจธเจฎเฉ‡เจ‚ เจตเจฟเฉฑเจš 100 เจคเฉ‹เจ‚ เจตเฉฑเจง เจจเฉ‚เฉฐ เจšเฉเฉฑเจ•เจฃ เจ…เจคเฉ‡ เจฎเจฟเจŸเจพเจ‰เจฃ เจคเฉ‹เจ‚ เจ‡เจจเจ•เจพเจฐ เจ•เจฐเจฆเจพ เจนเฉˆเฅค

เจนเฉ‹เจฐ เจจเฉเจ•เจธเจพเจจ: เจ…เจชเจพเจšเฉ‡ เจเจ…เจฐเจซเจฒเฉ‹ เจชเจฟเจŸเจซเฉ‡เจฒเจœเจผ

เจนเฉ‹เจฐ เจ†เจŸเฉ‹เจฎเฉ‡เจธเจผเจจ เจŸเฉ‚เจฒ

เจธเจพเจกเฉ‡ เจนเฉฑเจฅเจพเจ‚ เจจเจพเจฒ เจจเจนเฉ€เจ‚, เจธเจพเจกเฉ‡ เจธเจฟเจฐเจพเจ‚ เจจเจพเจฒ เจนเฉ‹เจฐ เจตเฉ€ เจ•เฉฐเจฎ เจ•เจฐเจจ เจฒเจˆ, เจเจ…เจฐเจซเจฒเฉ‹ เจจเฉ‡ เจธเจพเจกเฉ‡ เจฒเจˆ เจ‡เจน เจคเจฟเจ†เจฐ เจ•เฉ€เจคเจพ เจนเฉˆ:

  • REST API - เจ‰เจธ เจ•เฉ‹เจฒ เจ…เจœเฉ‡ เจตเฉ€ เจชเฉเจฐเจฏเฉ‹เจ—เจพเจคเจฎเจ• เจฆเจพ เจฆเจฐเจœเจพ เจนเฉˆ, เจœเฉ‹ เจ‰เจธเจจเฉ‚เฉฐ เจ•เฉฐเจฎ เจ•เจฐเจจ เจคเฉ‹เจ‚ เจจเจนเฉ€เจ‚ เจฐเฉ‹เจ•เจฆเจพเฅค เจ‡เจธ เจฆเฉ‡ เจจเจพเจฒ, เจคเฉเจธเฉ€เจ‚ เจจเจพ เจธเจฟเจฐเจซเจผ เจกเฉˆเจ— เจ…เจคเฉ‡ เจ•เฉฐเจฎเจพเจ‚ เจฌเจพเจฐเฉ‡ เจœเจพเจฃเจ•เจพเจฐเฉ€ เจชเฉเจฐเจพเจชเจค เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹, เจธเจ—เฉ‹เจ‚ เจ‡เฉฑเจ• เจกเฉˆเจ— เจจเฉ‚เฉฐ เจฐเฉ‹เจ•/เจธเจผเฉเจฐเฉ‚ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹, เจ‡เฉฑเจ• เจกเฉ€เจเจœเฉ€ เจฐเจจ เจœเจพเจ‚ เจ‡เฉฑเจ• เจชเฉ‚เจฒ เจฌเจฃเจพ เจธเจ•เจฆเฉ‡ เจนเฉ‹เฅค
  • CLI - เจ•เจฎเจพเจ‚เจก เจฒเจพเจˆเจจ เจฐเจพเจนเฉ€เจ‚ เจฌเจนเฉเจค เจธเจพเจฐเฉ‡ เจŸเฉ‚เจฒ เจ‰เจชเจฒเจฌเจง เจนเจจ เจœเฉ‹ WebUI เจฐเจพเจนเฉ€เจ‚ เจตเจฐเจคเจฃ เจฒเจˆ เจ…เจธเฉเจตเจฟเจงเจพเจœเจจเจ• เจจเจนเฉ€เจ‚ เจนเจจ, เจชเจฐ เจ†เจฎ เจคเฉŒเจฐ 'เจคเฉ‡ เจ—เฉˆเจฐเจนเจพเจœเจผเจฐ เจนเจจเฅค เจ‰เจฆเจพเจนเจฐเจฃ เจฒเจˆ:
    • backfill เจ•เจพเจฐเจœ เจ‰เจฆเจพเจนเจฐเจจเจพเจ‚ เจจเฉ‚เฉฐ เจฎเฉเฉœ เจšเจพเจฒเฉ‚ เจ•เจฐเจจ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉˆเฅค
      เจ‰เจฆเจพเจนเจฐเจฃ เจตเจœเฉ‹เจ‚, เจตเจฟเจธเจผเจฒเฉ‡เจธเจผเจ• เจ†เจ เจ…เจคเฉ‡ เจ•เจฟเจนเจพ: โ€œเจ…เจคเฉ‡ เจคเฉเจธเฉ€เจ‚, เจ•เจพเจฎเจฐเฉ‡เจก, 1 เจคเฉ‹เจ‚ 13 เจœเจจเจตเจฐเฉ€ เจคเฉฑเจ• เจฆเฉ‡ เจ…เฉฐเจ•เฉœเจฟเจ†เจ‚ เจตเจฟเฉฑเจš เจฌเจ•เจตเจพเจธ เจนเฉˆ! เจ‡เจธเจจเฉ‚เฉฐ เจ เฉ€เจ• เจ•เจฐเฉ‹, เจ‡เจธเจจเฉ‚เฉฐ เจ เฉ€เจ• เจ•เจฐเฉ‹, เจ‡เจธเจจเฉ‚เฉฐ เจ เฉ€เจ• เจ•เจฐเฉ‹, เจ‡เจธเจจเฉ‚เฉฐ เจ เฉ€เจ• เจ•เจฐเฉ‹!" เจ…เจคเฉ‡ เจคเฉเจธเฉ€เจ‚ เจ…เจœเจฟเจนเฉ‡ เจนเฉŒเจฌ เจนเฉ‹:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • เจ…เจงเจพเจฐ เจธเฉ‡เจตเจพ: initdb, resetdb, upgradedb, checkdb.
    • run, เจœเฉ‹ เจคเฉเจนเจพเจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจ‰เจฆเจพเจนเจฐเจจ เจ•เจพเจฐเจœ เจจเฉ‚เฉฐ เจšเจฒเจพเจ‰เจฃ, เจ…เจคเฉ‡ เจธเจพเจฐเฉ€เจ†เจ‚ เจจเจฟเจฐเจญเจฐเจคเจพเจตเจพเจ‚ 'เจคเฉ‡ เจตเฉ€ เจธเจ•เฉ‹เจฐ เจ•เจฐเจจ เจฆเฉ€ เจ‡เจœเจพเจœเจผเจค เจฆเจฟเฉฐเจฆเจพ เจนเฉˆเฅค เจ‡เจธ เจคเฉ‹เจ‚ เจ‡เจฒเจพเจตเจพ, เจคเฉเจธเฉ€เจ‚ เจ‡เจธเจจเฉ‚เฉฐ เจฆเฉเจ†เจฐเจพ เจšเจฒเจพ เจธเจ•เจฆเฉ‡ เจนเฉ‹ LocalExecutor, เจญเจพเจตเฉ‡เจ‚ เจคเฉเจนเจพเจกเฉ‡ เจ•เฉ‹เจฒ เจธเฉˆเจฒเจฐเฉ€ เจ•เจฒเฉฑเจธเจŸเจฐ เจนเฉˆเฅค
    • เจชเจฐเฉˆเจŸเฉ€ เจฌเจนเฉเจค เจ•เฉเจ เจ‰เจนเฉ€ เจ•เจฐเจฆเจพ เจนเฉˆ test, เจธเจฟเจฐเจซเจผ เจ†เจงเจพเจฐเจพเจ‚ เจตเจฟเฉฑเจš เจตเฉ€ เจ•เฉเจ เจจเจนเฉ€เจ‚ เจฒเจฟเจ–เจฆเจพเฅค
    • connections เจธเจผเฉˆเฉฑเจฒ เจคเฉ‹เจ‚ เจ•เฉเจจเฉˆเจ•เจธเจผเจจ เจฌเจฃเจพเจ‰เจฃ เจฆเฉ€ เจ‡เจœเจพเจœเจผเจค เจฆเจฟเฉฐเจฆเจพ เจนเฉˆเฅค
  • เจชเจพเจˆเจฅเจจ เจเจชเฉ€เจ†เจˆ - เจชเจฐเจธเจชเจฐ เจ•เฉเจฐเจฟเจ† เจ•เจฐเจจ เจฆเจพ เจ‡เฉฑเจ• เจธเจ–เจผเจค เจคเจฐเฉ€เจ•เจพ, เจœเฉ‹ เจชเจฒเฉฑเจ—เจ‡เจจเจพเจ‚ เจฒเจˆ เจนเฉˆ, เจ…เจคเฉ‡ เจ‡เจธ เจตเจฟเฉฑเจš เจ›เฉ‹เจŸเฉ‡ เจนเฉฑเจฅเจพเจ‚ เจจเจพเจฒ เจเฉเจ•เจฃเจพ เจจเจนเฉ€เจ‚ เจนเฉˆเฅค เจชเจฐ เจธเจพเจจเฉ‚เฉฐ เจœเจพเจฃ เจคเฉ‹เจ‚ เจ•เฉŒเจฃ เจฐเฉ‹เจ•เจฆเจพ เจนเฉˆ /home/airflow/dags, เจฐเจจ ipython เจ…เจคเฉ‡ เจ†เจฒเฉ‡-เจฆเฉเจ†เจฒเฉ‡ เจ—เฉœเจฌเฉœ เจธเจผเฉเจฐเฉ‚? เจคเฉเจธเฉ€เจ‚, เจ‰เจฆเจพเจนเจฐเจจ เจฒเจˆ, เจนเฉ‡เจ เจพเจ‚ เจฆเจฟเฉฑเจคเฉ‡ เจ•เฉ‹เจก เจจเจพเจฒ เจธเจพเจฐเฉ‡ เจ•เจจเฉˆเจ•เจธเจผเจจเจพเจ‚ เจจเฉ‚เฉฐ เจจเจฟเจฐเจฏเจพเจค เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • เจเจ…เจฐเจซเจฒเฉ‹ เจฎเฉˆเจŸเจพเจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจจเจพเจฒ เจœเฉเฉœ เจฐเจฟเจนเจพ เจนเฉˆเฅค เจฎเฉˆเจ‚ เจ‡เจธ เจจเฉ‚เฉฐ เจฒเจฟเจ–เจฃ เจฆเฉ€ เจธเจฟเจซเจพเจฐเจธเจผ เจจเจนเฉ€เจ‚ เจ•เจฐเจฆเจพ เจนเจพเจ‚, เจชเจฐ เจตเฉฑเจ–-เจตเฉฑเจ– เจ–เจพเจธ เจฎเฉˆเจŸเฉเจฐเจฟเจ•เจธ เจฒเจˆ เจŸเจพเจธเจ• เจธเจŸเฉ‡เจŸเจธ เจชเฉเจฐเจพเจชเจค เจ•เจฐเจจเจพ เจ•เจฟเจธเฉ‡ เจตเฉ€ API เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจจ เจจเจพเจฒเฉ‹เจ‚ เจฌเจนเฉเจค เจคเฉ‡เจœเจผ เจ…เจคเฉ‡ เจ†เจธเจพเจจ เจนเฉ‹ เจธเจ•เจฆเจพ เจนเฉˆ.

    เจฆเฉฑเจธ เจฆเฉ‡เจˆเจ เจ•เจฟ เจธเจพเจกเฉ‡ เจธเจพเจฐเฉ‡ เจ•เฉฐเจฎ เจฌเฉ‡เจ•เจพเจฌเฉ‚ เจจเจนเฉ€เจ‚ เจนเฉเฉฐเจฆเฉ‡, เจชเจฐ เจ‰เจน เจ•เจˆ เจตเจพเจฐ เจกเจฟเฉฑเจ— เจธเจ•เจฆเฉ‡ เจนเจจ, เจ…เจคเฉ‡ เจ‡เจน เจ†เจฎ เจ—เฉฑเจฒ เจนเฉˆเฅค เจชเจฐ เจ•เฉเจ เจฐเฉเจ•เจพเจตเจŸเจพเจ‚ เจชเจนเจฟเจฒเจพเจ‚ เจนเฉ€ เจธเจผเฉฑเจ•เฉ€ เจนเจจ, เจ…เจคเฉ‡ เจ‡เจธเจฆเฉ€ เจœเจพเจ‚เจš เจ•เจฐเจจเฉ€ เจœเจผเจฐเฉ‚เจฐเฉ€ เจนเฉ‹เจตเฉ‡เจ—เฉ€เฅค

    SQL เจธเจพเจตเจงเจพเจจ!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

เจนเจตเจพเจฒเฉ‡

เจ…เจคเฉ‡ เจฌเฉ‡เจธเจผเฉฑเจ•, เจ—เฉ‚เจ—เจฒ เจฆเฉ‡ เจœเจพเจฐเฉ€ เจนเฉ‹เจฃ เจคเฉ‹เจ‚ เจชเจนเจฟเจฒเฉ‡ เจฆเจธ เจฒเจฟเฉฐเจ• เจฎเฉ‡เจฐเฉ‡ เจฌเฉเฉฑเจ•เจฎเจพเจฐเจ•เจธ เจคเฉ‹เจ‚ เจเจ…เจฐเจซเจฒเฉ‹ เจซเฉ‹เจฒเจกเจฐ เจฆเฉ€ เจธเจฎเฉฑเจ—เจฐเฉ€ เจนเจจ.

เจ…เจคเฉ‡ เจฒเฉ‡เจ– เจตเจฟเฉฑเจš เจตเจฐเจคเฉ‡ เจ—เจ เจฒเจฟเฉฐเจ•:

เจธเจฐเฉ‹เจค: www.habr.com