เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชนเชพเชฏ, เชนเซเช‚ เชฆเชฟเชฎเชฟเชคเซเชฐเซ€ เชฒเซ‹เช—เชตเชฟเชจเซ‡เชจเซเช•เซ‹ เช›เซเช‚ - เชตเซ‡เชเซ‡เชŸ เชœเซ‚เชฅเชจเซ€ เช•เช‚เชชเชจเซ€เช“เชจเชพ เชเชจเชพเชฒเชฟเชŸเชฟเช•เซเชธ เชตเชฟเชญเชพเช—เชจเซ‹ เชกเซ‡เชŸเชพ เชเชจเซเชœเชฟเชจเชฟเชฏเชฐ.

เชนเซเช‚ เชคเชฎเชจเซ‡ ETL เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเช“ เชตเชฟเช•เชธเชพเชตเชตเชพ เชฎเชพเชŸเซ‡ เชเช• เช…เชฆเซเชญเซเชค เชธเชพเชงเชจ เชตเชฟเชถเซ‡ เช•เชนเซ€เชถ - Apache Airflow. เชชเชฐเช‚เชคเซ เชเชฐเชซเซเชฒเซ‹ เชเชŸเชฒเซ‹ เชธเชฐเซเชตเชคเซ‹เชฎเซเช–เซ€ เช…เชจเซ‡ เชฌเชนเซเชชเช•เซเชทเซ€เชฏ เช›เซ‡ เช•เซ‡ เชคเชฎเซ‡ เชกเซ‡เชŸเชพ เชชเซเชฐเชตเชพเชนเชฎเชพเช‚ เชธเชพเชฎเซ‡เชฒ เชจ เชนเซ‹เชต เชคเซ‹ เชชเชฃ เชคเชฎเชพเชฐเซ‡ เชคเซ‡เชจเซ‡ เชจเชœเซ€เช•เชฅเซ€ เชœเซ‹เชตเซเช‚ เชœเซ‹เชˆเช, เชชเชฐเช‚เชคเซ เชธเชฎเชฏเชพเช‚เชคเชฐเซ‡ เช•เซ‹เชˆเชชเชฃ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเช“ เชถเชฐเซ‚ เช•เชฐเชตเชพเชจเซ€ เช…เชจเซ‡ เชคเซ‡เชจเชพ เช…เชฎเชฒ เชชเชฐ เชฆเซ‡เช–เชฐเซ‡เช– เชฐเชพเช–เชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡.

เช…เชจเซ‡ เชนเชพ, เชนเซเช‚ เชฎเชพเชคเซเชฐ เช•เชนเซ€เชถ เชจเชนเซ€เช‚, เชชเชฃ เชฌเชคเชพเชตเซ€เชถ: เชชเซเชฐเซ‹เช—เซเชฐเชพเชฎเชฎเชพเช‚ เช˜เชฃเชพ เชฌเชงเชพ เช•เซ‹เชก, เชธเซเช•เซเชฐเซ€เชจเชถเซ‰เชŸเซเชธ เช…เชจเซ‡ เชญเชฒเชพเชฎเชฃเซ‹ เช›เซ‡.

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚
เชœเซเชฏเชพเชฐเซ‡ เชคเชฎเซ‡ เชเชฐเชซเซเชฒเซ‹ / เชตเชฟเช•เชฟเชฎเซ€เชกเชฟเชฏเชพ เช•เซ‹เชฎเชจเซเชธ เชถเชฌเซเชฆเชจเซ‡ เช—เซ‚เช—เชฒ เช•เชฐเซ‹ เช›เซ‹ เชคเซเชฏเชพเชฐเซ‡ เชคเชฎเซ‡ เชธเชพเชฎเชพเชจเซเชฏ เชฐเซ€เชคเซ‡ เชถเซเช‚ เชœเซเช“ เช›เซ‹

เชธเชฎเชพเชตเชฟเชทเซเชŸเซ‹เชจเซเช‚ เช•เซ‹เชทเซเชŸเช•

เชชเชฐเชฟเชšเชฏ

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹ เชกเซ€เชœเซ‡เช‚เช—เซ‹เชจเซ€ เชœเซ‡เชฎ เชœ เช›เซ‡:

  • เช…เชœเช—เชฐเชฎเชพเช‚ เชฒเช–เซ‡เชฒเซเช‚
  • เชเช• เชธเชฐเชธ เชเชกเชฎเชฟเชจ เชชเซ‡เชจเชฒ เช›เซ‡,
  • เช…เชจเชฟเชถเซเชšเชฟเชค เชธเชฎเชฏ เชฎเชพเชŸเซ‡ เชตเชฟเชธเซเชคเซƒเชค

- เชซเช•เซเชค เชตเชงเซ เชธเชพเชฐเซเช‚, เช…เชจเซ‡ เชคเซ‡ เชธเช‚เชชเซ‚เชฐเซเชฃเชชเชฃเซ‡ เช…เชฒเช— เชนเซ‡เชคเซเช“ เชฎเชพเชŸเซ‡ เชฌเชจเชพเชตเชตเชพเชฎเชพเช‚ เช†เชตเซเชฏเซเช‚ เชนเชคเซเช‚, เชเชŸเชฒเซ‡ เช•เซ‡ (เชœเซ‡เชฎ เช•เซ‡ เชคเซ‡ เช•เชพเชคเชพ เชชเชนเซ‡เชฒเชพ เชฒเช–เชพเชฏเซ‡เชฒ เช›เซ‡):

  • เช…เชฎเชฐเซเชฏเชพเชฆเชฟเชค เชธเช‚เช–เซเชฏเชพเชฎเชพเช‚ เชฎเชถเซ€เชจเซ‹ เชชเชฐ เช•เชพเชฐเซเชฏ เชšเชฒเชพเชตเชตเซเช‚ เช…เชจเซ‡ เชคเซ‡เชจเซเช‚ เชจเชฟเชฐเซ€เช•เซเชทเชฃ เช•เชฐเชตเซเช‚ (เชœเซ‡เชŸเชฒเชพ เชธเซ‡เชฒเชฐเซ€ / เช•เซเชฌเชฐเชจเซ‡เชŸเซเชธ เช…เชจเซ‡ เชคเชฎเชพเชฐเซ‹ เช…เช‚เชคเชฐเชพเชคเซเชฎเชพ เชคเชฎเชจเซ‡ เชชเชฐเชตเชพเชจเช—เซ€ เช†เชชเชถเซ‡)
  • เชชเชพเชฏเชฅเซ‹เชจ เช•เซ‹เชก เชฒเช–เชตเชพ เช…เชจเซ‡ เชธเชฎเชœเชตเชพ เชฎเชพเชŸเซ‡ เช–เซ‚เชฌ เชœ เชธเชฐเชณ เชฅเซ€ เชกเชพเชฏเชจเซ‡เชฎเชฟเช• เชตเชฐเซเช•เชซเซเชฒเซ‹ เชœเชจเชฐเซ‡เชถเชจ เชธเชพเชฅเซ‡
  • เช…เชจเซ‡ เช•เซ‹เชˆเชชเชฃ เชกเซ‡เชŸเชพเชฌเซ‡เชธเซ‡เชธ เช…เชจเซ‡ API เชจเซ‡ เชคเซˆเชฏเชพเชฐ เช˜เชŸเช•เซ‹ เช…เชจเซ‡ เชนเซ‹เชฎเชฎเซ‡เช‡เชก เชชเซเชฒเช—เชˆเชจเซเชธ เชฌเช‚เชจเซ‡เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เชเช•เชฌเซ€เชœเชพ เชธเชพเชฅเซ‡ เช•เชจเซ‡เช•เซเชŸ เช•เชฐเชตเชพเชจเซ€ เช•เซเชทเชฎเชคเชพ (เชœเซ‡ เช…เชคเซเชฏเช‚เชค เชธเชฐเชณ เช›เซ‡).

เช…เชฎเซ‡ เช† เชฐเซ€เชคเซ‡ เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เช เช›เซ€เช:

  • เช…เชฎเซ‡ DWH เช…เชจเซ‡ ODS (เช…เชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ เชตเชฐเซเชŸเซ€เช•เชพ เช…เชจเซ‡ เช•เซเชฒเชฟเช•เชนเชพเช‰เชธ เช›เซ‡) เชฎเชพเช‚ เชตเชฟเชตเชฟเชง เชธเซเชคเซเชฐเซ‹เชคเซ‹ (เช˜เชฃเชพ SQL เชธเชฐเซเชตเชฐ เช…เชจเซ‡ PostgreSQL เช‰เชฆเชพเชนเชฐเชฃเซ‹, เชเชชเซเชฒเชฟเช•เซ‡เชถเชจ เชฎเซ‡เชŸเซเชฐเชฟเช•เซเชธ เชธเชพเชฅเซ‡เชจเชพ เชตเชฟเชตเชฟเชง API, 1C เชชเชฃ) เชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชเช•เชคเซเชฐเชฟเชค เช•เชฐเซ€เช เช›เซ€เช.
  • เช•เซ‡เชตเซ€ เชฐเซ€เชคเซ‡ เช…เชฆเซเชฏเชคเชจ cron, เชœเซ‡ ODS เชชเชฐ เชกเซ‡เชŸเชพ เชเช•เชคเซเชฐเซ€เช•เชฐเชฃ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเช“ เชถเชฐเซ‚ เช•เชฐเซ‡ เช›เซ‡, เช…เชจเซ‡ เชคเซ‡เชฎเชจเซ€ เชœเชพเชณเชตเชฃเซ€ เชชเชฐ เชชเชฃ เชจเชœเชฐ เชฐเชพเช–เซ‡ เช›เซ‡.

เชคเชพเชœเซ‡เชคเชฐเชฎเชพเช‚ เชธเซเชงเซ€, เช…เชฎเชพเชฐเซ€ เชœเชฐเซ‚เชฐเชฟเชฏเชพเชคเซ‹ 32 เช•เซ‹เชฐเซ‹ เช…เชจเซ‡ 50 GB RAM เชธเชพเชฅเซ‡ เชเช• เชจเชพเชจเชพ เชธเชฐเซเชตเชฐ เชฆเซเชตเชพเชฐเชพ เช†เชตเชฐเซ€ เชฒเซ‡เชตเชพเชฎเชพเช‚ เช†เชตเซ€ เชนเชคเซ€. เชเชฐเชซเซเชฒเซ‹เชฎเชพเช‚, เช† เช•เชพเชฎ เช•เชฐเซ‡ เช›เซ‡:

  • เชตเชงเซ 200 เชกเซ…เช—เซเชธ (เช–เชฐเซ‡เช–เชฐ เชตเชฐเซเช•เชซเซเชฒเซ‹, เชœเซ‡เชฎเชพเช‚ เช†เชชเชฃเซ‡ เช•เชพเชฐเซเชฏเซ‹ เชญเชฐเซเชฏเชพ เช›เซ‡),
  • เชฆเชฐเซ‡เช•เชฎเชพเช‚ เชธเชฐเซ‡เชฐเชพเชถ 70 เช•เชพเชฐเซเชฏเซ‹,
  • เช† เชญเชฒเชพเชˆ เชถเชฐเซ‚ เชฅเชพเชฏ เช›เซ‡ (เชธเชฐเซ‡เชฐเชพเชถ เชชเชฃ) เช•เชฒเชพเช•เชฎเชพเช‚ เชเช•เชตเชพเชฐ.

เช…เชจเซ‡ เช…เชฎเซ‡ เช•เซ‡เชตเซ€ เชฐเซ€เชคเซ‡ เชตเชฟเชธเซเชคเชฐเชฃ เช•เชฐเซเชฏเซเช‚ เชคเซ‡ เชตเชฟเชถเซ‡, เชนเซเช‚ เชจเซ€เชšเซ‡ เชฒเช–เซ€เชถ, เชชเชฐเช‚เชคเซ เชนเชตเซ‡ เชšเชพเชฒเซ‹ รผber-เชธเชฎเชธเซเชฏเชพเชจเซ‡ เชตเซเชฏเชพเช–เซเชฏเชพเชฏเชฟเชค เช•เชฐเซ€เช เชœเซ‡ เช†เชชเชฃเซ‡ เชนเชฒ เช•เชฐเซ€เชถเซเช‚:

เชคเซเชฏเชพเช‚ เชคเซเชฐเชฃ เชฎเซ‚เชณ SQL เชธเชฐเซเชตเชฐ เช›เซ‡, เชฆเชฐเซ‡เช•เชฎเชพเช‚ 50 เชกเซ‡เชŸเชพเชฌเซ‡เชธเซ‡เชธ เช›เซ‡ - เช…เชจเซเช•เซเชฐเชฎเซ‡ เชเช• เชชเซเชฐเซ‹เชœเซ‡เช•เซเชŸเชจเชพ เช‰เชฆเชพเชนเชฐเชฃเซ‹, เชคเซ‡เชฎเชจเซ€ เชชเชพเชธเซ‡ เชธเชฎเชพเชจ เชฎเชพเชณเช–เซเช‚ เช›เซ‡ (เชฒเช—เชญเช— เชฆเชฐเซ‡เช• เชœเช—เซเชฏเชพเช, เชฎเซเช†-เชนเชพ-เชนเชพ), เชœเซ‡เชจเซ‹ เช…เชฐเซเชฅ เช›เซ‡ เช•เซ‡ เชฆเชฐเซ‡เช• เชชเชพเชธเซ‡ เช“เชฐเซเชกเชฐ เชŸเซ‡เชฌเชฒ เช›เซ‡ (เชธเชฆเชจเชธเซ€เชฌเซ‡, เชคเซ‡ เชธเชพเชฅเซ‡เชจเซเช‚ เชŸเซ‡เชฌเชฒ เชจเชพเชฎ เช•เซ‹เชˆเชชเชฃ เชตเซเชฏเชตเชธเชพเชฏเชฎเชพเช‚ เชฆเชฌเชพเชฃ เช•เชฐเซ€ เชถเช•เชพเชฏ เช›เซ‡). เช…เชฎเซ‡ เชธเชฐเซเชตเชฟเชธ เชซเซ€เชฒเซเชกเซเชธ (เชธเซ‹เชฐเซเชธ เชธเชฐเซเชตเชฐ, เชธเซ‹เชฐเซเชธ เชกเซ‡เชŸเชพเชฌเซ‡เช, ETL เชŸเชพเชธเซเช• เช†เชˆเชกเซ€) เช‰เชฎเซ‡เชฐเซ€เชจเซ‡ เชกเซ‡เชŸเชพ เชฒเชˆเช เช›เซ€เช เช…เชจเซ‡ เชจเชฟเช–เชพเชฒเชธเชชเชฃเซ‡ เชคเซ‡เชจเซ‡ เชตเชฐเซเชŸเชฟเช•เชพเชฎเชพเช‚ เชจเชพเช–เซ€เช เช›เซ€เช.

เชšเชพเชฒเซ‹ เชœเชˆเช!

เชฎเซเช–เซเชฏ เชญเชพเช—, เชตเซเชฏเชตเชนเชพเชฐเซ (เช…เชจเซ‡ เชฅเซ‹เชกเซ‹ เชธเซˆเชฆเซเชงเชพเช‚เชคเชฟเช•)

เชถเชพ เชฎเชพเชŸเซ‡ เช…เชฎเซ‡ (เช…เชจเซ‡ เชคเชฎเซ‡)

เชœเซเชฏเชพเชฐเซ‡ เชตเซƒเช•เซเชทเซ‹ เชฎเซ‹เชŸเชพ เชนเชคเชพ เช…เชจเซ‡ เชนเซเช‚ เชธเชพเชฆเซ‹ เชนเชคเซ‹ SQL- เชเช• เชฐเชถเชฟเชฏเชจ เชฐเชฟเชŸเซ‡เชฒเชฎเชพเช‚, เช…เชฎเซ‡ เช…เชฎเชพเชฐเชพ เชฎเชพเชŸเซ‡ เช‰เชชเชฒเชฌเซเชง เชฌเซ‡ เชŸเซ‚เชฒเซเชธเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ ETL เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเช“ เช‰เชฐเซเชซเซ‡ เชกเซ‡เชŸเชพ เชซเซเชฒเซ‹ เชธเชพเชฅเซ‡ เช•เซŒเชญเชพเช‚เชก เช•เชฐเซเชฏเซเช‚ เช›เซ‡:

  • เช‡เชจเซเชซเซ‹เชฐเซเชฎเซ‡เชŸเชฟเช•เชพ เชชเชพเชตเชฐ เชธเซ‡เชจเซเชŸเชฐ - เชเช• เช…เชคเซเชฏเช‚เชค เชซเซ‡เชฒเชพเชคเซ€ เชธเชฟเชธเซเชŸเชฎ, เช…เชคเซเชฏเช‚เชค เช‰เชคเซเชชเชพเชฆเช•, เชคเซ‡เชจเชพ เชชเซ‹เชคเชพเชจเชพ เชนเชพเชฐเซเชกเชตเซ‡เชฐ เชธเชพเชฅเซ‡, เชคเซ‡เชจเซเช‚ เชชเซ‹เชคเชพเชจเซเช‚ เชตเชฐเซเชเชจเชฟเช‚เช—. เชฎเซ‡เช‚ เชคเซ‡เชจเซ€ เช•เซเชทเชฎเชคเชพเช“เชจเชพ 1% เชญเช—เชตเชพเชจเชจเซ‡ เชชเซเชฐเชคเชฟเชฌเช‚เชงเชฟเชค เช•เชฐเชตเชพเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซเชฏเซ‹. เชถเชพ เชฎเชพเชŸเซ‡? เชธเชพเชฐเซเช‚, เชธเซŒ เชชเซเชฐเชฅเชฎ, เช† เช‡เชจเซเชŸเชฐเชซเซ‡เชธ, เช•เซเชฏเชพเช‚เช• 380 เชจเชพ เชฆเชพเชฏเช•เชพเชฅเซ€, เชฎเชพเชจเชธเชฟเช• เชฐเซ€เชคเซ‡ เช†เชชเชฃเชพ เชชเชฐ เชฆเชฌเชพเชฃ เชฒเชพเชตเซ‡ เช›เซ‡. เชฌเซ€เชœเซเช‚, เช† เช•เซ‹เชจเซเชŸเซเชฐเชพเชชเชถเชจ เช…เชคเซเชฏเช‚เชค เชซเซ‡เชจเซเชธเซ€ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเช“, เช—เซเชธเซเชธเซ‡ เช˜เชŸเช•เซ‹เชจเชพ เชชเซเชจเชƒเช‰เชชเชฏเซ‹เช— เช…เชจเซ‡ เช…เชจเซเชฏ เช…เชคเซเชฏเช‚เชค-เชฎเชนเชคเซเชตเชชเซ‚เชฐเซเชฃ-เช‰เชฆเซเชฏเซ‹เช—-เชฏเซเช•เซเชคเชฟเช“ เชฎเชพเชŸเซ‡ เชฐเชšเชพเชฏเซ‡เชฒ เช›เซ‡. เชเชฐเชฌเชธ AXNUMX / เชตเชฐเซเชท เชจเซ€ เชชเชพเช‚เช–เชจเซ€ เชœเซ‡เชฎ เชคเซ‡เชจเซ€ เช•เชฟเช‚เชฎเชค เช›เซ‡ เชคเซ‡ เชนเช•เซ€เช•เชค เชตเชฟเชถเซ‡, เช…เชฎเซ‡ เช•เช‚เชˆ เช•เชนเซ€เชถเซเช‚ เชจเชนเซ€เช‚.

    เชธเชพเชตเชšเซ‡เชค เชฐเชนเซ‹, เชธเซเช•เซเชฐเซ€เชจเชถเซ‰เชŸ 30 เชตเชฐเซเชทเชฅเซ€ เช“เช›เซ€ เช‰เช‚เชฎเชฐเชจเชพ เชฒเซ‹เช•เซ‹เชจเซ‡ เชฅเซ‹เชกเซเช‚ เชจเซเช•เชธเชพเชจ เชชเชนเซ‹เช‚เชšเชพเชกเซ€ เชถเช•เซ‡ เช›เซ‡

    เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

  • SQL เชธเชฐเซเชตเชฐ เชเช•เซ€เช•เชฐเชฃ เชธเชฐเซเชตเชฐ - เช…เชฎเซ‡ เช…เชฎเชพเชฐเชพ เช‡เชจเซเชŸเซเชฐเชพ-เชชเซเชฐเซ‹เชœเซ‡เช•เซเชŸ เชชเซเชฐเชตเชพเชนเชฎเชพเช‚ เช† เชธเชพเชฅเซ€เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซเชฏเซ‹. เช เซ€เช• เช›เซ‡, เชนเช•เซ€เช•เชคเชฎเชพเช‚: เช…เชฎเซ‡ เชชเชนเซ‡เชฒเชพเชฅเซ€ เชœ SQL เชธเชฐเซเชตเชฐเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เช เช›เซ€เช, เช…เชจเซ‡ เชคเซ‡เชจเชพ ETL เชธเชพเชงเชจเซ‹เชจเซ‹ เช‰เชชเชฏเซ‹เช— เชจ เช•เชฐเชตเซ‹ เชคเซ‡ เช•เซ‹เชˆเช• เชฐเซ€เชคเซ‡ เช—เซ‡เชฐเชตเชพเชœเชฌเซ€ เชนเชถเซ‡. เชคเซ‡เชฎเชพเช‚ เชฌเชงเซเช‚ เชธเชพเชฐเซเช‚ เช›เซ‡: เชฌเช‚เชจเซ‡ เช‡เชจเซเชŸเชฐเชซเซ‡เชธ เชธเซเช‚เชฆเชฐ เช›เซ‡, เช…เชจเซ‡ เชชเซเชฐเซ‹เช—เซเชฐเซ‡เชธ เชฐเชฟเชชเซ‹เชฐเซเชŸเซเชธ... เชชเชฐเช‚เชคเซ เช† เชฎเชพเชŸเซ‡ เช…เชฎเซ‡ เชธเซ‹เชซเซเชŸเชตเซ‡เชฐ เชชเซเชฐเซ‹เชกเช•เซเชŸเซเชธเชจเซ‡ เชชเชธเช‚เชฆ เชจเชฅเซ€ เช•เชฐเชคเชพ, เช“เชน, เช† เชฎเชพเชŸเซ‡ เชจเชนเซ€เช‚. เชคเซ‡เชจเซเช‚ เชธเช‚เชธเซเช•เชฐเชฃ dtsx (เชœเซ‡ เชธเซ‡เชต เชชเชฐ เชถเชซเชฒเซเชก เชจเซ‹เชกเซเชธ เชธเชพเชฅเซ‡ XML เช›เซ‡) เช…เชฎเซ‡ เช•เชฐเซ€ เชถเช•เซ€เช เช›เซ€เช, เชชเชฐเช‚เชคเซ เชฎเซเชฆเซเชฆเซ‹ เชถเซเช‚ เช›เซ‡? เชเช• เชŸเชพเชธเซเช• เชชเซ‡เช•เซ‡เชœ เชฌเชจเชพเชตเชตเชพ เชตเชฟเชถเซ‡ เช•เซ‡ เชœเซ‡ เชธเซ‡เช‚เช•เชกเซ‹ เช•เซ‹เชทเซเชŸเช•เซ‹เชจเซ‡ เชเช• เชธเชฐเซเชตเชฐเชฅเซ€ เชฌเซ€เชœเชพ เชธเชฐเซเชตเชฐ เชชเชฐ เช–เซ‡เช‚เชšเชถเซ‡? เชนเชพ, เชถเซเช‚ เชธเซ‹, เชคเชฎเชพเชฐเซ€ เชคเชฐเซเชœเชจเซ€ เช†เช‚เช—เชณเซ€ เชตเซ€เชธ เชŸเซเช•เชกเชพเช“เชฎเชพเช‚เชฅเซ€ เชชเชกเซ€ เชœเชถเซ‡, เชฎเชพเช‰เชธ เชฌเชŸเชจ เชชเชฐ เช•เซเชฒเชฟเช• เช•เชฐเซ‹. เชชเชฐเช‚เชคเซ เชคเซ‡ เชšเซ‹เช•เซเช•เชธเชชเชฃเซ‡ เชตเชงเซ เชซเซ‡เชถเชจเซ‡เชฌเชฒ เชฒเชพเช—เซ‡ เช›เซ‡:

    เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เช…เชฎเซ‡ เชšเซ‹เช•เซเช•เชธเชชเชฃเซ‡ เชฐเชธเซเชคเชพเช“ เชถเซ‹เชงเซ€ เชฐเชนเซเชฏเชพ เช›เซ€เช. เช•เซ‡เชธ เชชเชฃ เชฒเช—เชญเช— เชธเซเชต-เชฒเซ‡เช–เชฟเชค SSIS เชชเซ‡เช•เซ‡เชœ เชœเชจเชฐเซ‡เชŸเชฐ เชชเชฐ เช†เชตเซเชฏเชพ ...

โ€ฆเช…เชจเซ‡ เชชเช›เซ€ เชฎเชจเซ‡ เชจเชตเซ€ เชจเซ‹เช•เชฐเซ€ เชฎเชณเซ€. เช…เชจเซ‡ เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹ เชคเซ‡เชจเชพ เชชเชฐ เชฎเชจเซ‡ เช†เช—เชณ เชจเซ€เช•เชณเซ€ เช—เชฏเซ‹.

เชœเซเชฏเชพเชฐเซ‡ เชฎเชจเซ‡ เชœเชพเชฃเชตเชพ เชฎเชณเซเชฏเซเช‚ เช•เซ‡ ETL เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เชตเชฐเซเชฃเชจเซ‹ เชธเชฐเชณ เชชเชพเชฏเชฅเซ‹เชจ เช•เซ‹เชก เช›เซ‡, เชคเซเชฏเชพเชฐเซ‡ เชฎเซ‡เช‚ เช†เชจเช‚เชฆ เชฎเชพเชŸเซ‡ เชจเซƒเชคเซเชฏ เช•เชฐเซเชฏเซเช‚ เชจเชนเซ€เช‚. เช† เชฐเซ€เชคเซ‡ เชกเซ‡เชŸเชพ เชธเซเชŸเซเชฐเซ€เชฎเชจเซเช‚ เชตเชฐเซเชเชจ เช…เชจเซ‡ เชญเชฟเชจเซเชจเชคเชพ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซ€ เชนเชคเซ€, เช…เชจเซ‡ เชธเซ‡เช‚เช•เชกเซ‹ เชกเซ‡เชŸเชพเชฌเซ‡เชเชฎเชพเช‚เชฅเซ€ เชเช• เชœ เชธเซเชŸเซเชฐเช•เซเชšเชฐ เชธเชพเชฅเซ‡ เช•เซ‹เชทเซเชŸเช•เซ‹เชจเซ‡ เชเช• เชฒเช•เซเชทเซเชฏเชฎเชพเช‚ เชฐเซ‡เชกเชตเซเช‚ เช เชฆเซ‹เชข เช•เซ‡ เชฌเซ‡ 13โ€ เชธเซเช•เซเชฐเซ€เชจเชฎเชพเช‚ เชชเชพเชฏเชฅเซ‹เชจ เช•เซ‹เชกเชจเซ€ เชฌเชพเชฌเชค เชฌเชจเซ€ เช—เชˆ เชนเชคเซ€.

เช•เซเชฒเชธเซเชŸเชฐ เชเชธเซ‡เชฎเซเชฌเชฒเซ€เช‚เช—

เชšเชพเชฒเซ‹ เชธเช‚เชชเซ‚เชฐเซเชฃเชชเชฃเซ‡ เชฌเชพเชฒเชฎเช‚เชฆเชฟเชฐเชจเซ€ เชตเซเชฏเชตเชธเซเชฅเชพ เชจ เช•เชฐเซ€เช, เช…เชจเซ‡ เช…เชนเซ€เช‚ เชธเช‚เชชเซ‚เชฐเซเชฃเชชเชฃเซ‡ เชธเซเชชเชทเซเชŸ เชตเชธเซเชคเซเช“ เชตเชฟเชถเซ‡ เชตเชพเชค เชจ เช•เชฐเซ€เช, เชœเซ‡เชฎ เช•เซ‡ เชเชฐเชซเซเชฒเซ‹, เชคเชฎเชพเชฐเชพ เชชเชธเช‚เชฆ เช•เชฐเซ‡เชฒเชพ เชกเซ‡เชŸเชพเชฌเซ‡เช, เชธเซ‡เชฒเชฐเซ€ เช…เชจเซ‡ เชกเซ‹เช•เซเชธเชฎเชพเช‚ เชตเชฐเซเชฃเชตเซ‡เชฒ เช…เชจเซเชฏ เช•เซ‡เชธเซ‹ เช‡เชจเซเชธเซเชŸเซ‹เชฒ เช•เชฐเชตเชพ.

เชœเซ‡เชฅเซ€ เช…เชฎเซ‡ เชคเชฐเชค เชœ เชชเซเชฐเชฏเซ‹เช—เซ‹ เชถเชฐเซ‚ เช•เชฐเซ€ เชถเช•เซ€เช, เชฎเซ‡เช‚ เชธเซเช•เซ‡เชš เชฌเชจเชพเชตเซเชฏเซเช‚ docker-compose.yml เชœเซ‡เชฎเชพเช‚:

  • เชšเชพเชฒเซ‹ เช–เชฐเซ‡เช–เชฐ เชตเชงเชพเชฐเซ€เช เชนเชตเชพ เชชเซเชฐเชตเชพเชน: เชถเซ‡เชกเซเชฏเซ‚เชฒเชฐ, เชตเซ‡เชฌเชธเชฐเซเชตเชฐ. เชธเซ‡เชฒเชฐเซ€เชจเชพ เช•เชพเชฐเซเชฏเซ‹เชจเซ‡ เชฎเซ‹เชจเชฟเชŸเชฐ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เชซเซเชฒเชพเชตเชฐ เชชเชฃ เชคเซเชฏเชพเช‚ เชซเชฐเชคเซเช‚ เชนเชถเซ‡ (เช•เชพเชฐเชฃ เช•เซ‡ เชคเซ‡ เชชเชนเซ‡เชฒเชพเชฅเซ€ เชœ apache/airflow:1.10.10-python3.7, เชชเชฃ เช…เชฎเชจเซ‡ เชตเชพเช‚เชงเซ‹ เชจเชฅเซ€)
  • เชชเซ‹เชธเซเชŸเช—เซเชฐเซ‡ เชเชธเช•เซเชฏเซเชเชฒ, เชœเซ‡เชฎเชพเช‚ เชเชฐเชซเซเชฒเซ‹ เชคเซ‡เชจเซ€ เชธเซ‡เชตเชพ เชฎเชพเชนเชฟเชคเซ€ (เชถเซ‡เชกเซเชฏเซเชฒเชฐ เชกเซ‡เชŸเชพ, เชเช•เซเชเซ‡เช•เซเชฏเซเชถเชจเชจเชพ เช†เช‚เช•เชกเชพ, เชตเช—เซ‡เชฐเซ‡) เชฒเช–เชถเซ‡ เช…เชจเซ‡ เชธเซ‡เชฒเชฐเซ€ เชชเซ‚เชฐเซเชฃ เชฅเชฏเซ‡เชฒเชพ เช•เชพเชฐเซเชฏเซ‹เชจเซ‡ เชšเชฟเชนเซเชจเชฟเชค เช•เชฐเชถเซ‡;
  • Redis, เชœเซ‡ เชธเซ‡เชฒเชฐเซ€ เชฎเชพเชŸเซ‡ เชŸเชพเชธเซเช• เชฌเซเชฐเซ‹เช•เชฐ เชคเชฐเซ€เช•เซ‡ เช•เชพเชฎ เช•เชฐเชถเซ‡;
  • เชธเซ‡เชฒเชฐเซ€ เช•เชพเชฎเชฆเชพเชฐ, เชœเซ‡ เช•เชพเชฐเซเชฏเซ‹เชจเชพ เชธเซ€เชงเชพ เช…เชฎเชฒเชฎเชพเช‚ เชฐเซ‹เช•เชพเชฏเซ‡เชฒ เชนเชถเซ‡.
  • เชซเซ‹เชฒเซเชกเชฐเชฎเชพเช‚ ./dags เช…เชฎเซ‡ เชกเซ‡เช—เซเชธเชจเชพ เชตเชฐเซเชฃเชจ เชธเชพเชฅเซ‡ เช…เชฎเชพเชฐเซ€ เชซเชพเช‡เชฒเซ‹ เช‰เชฎเซ‡เชฐเซ€เชถเซเช‚. เชคเซ‡เช“ เชซเซเชฒเชพเชฏ เชชเชฐ เชฒเซ‡เชตเชพเชฎเชพเช‚ เช†เชตเชถเซ‡, เชคเซ‡เชฅเซ€ เชฆเชฐเซ‡เช• เช›เซ€เช‚เช• เชชเช›เซ€ เชธเชฎเช—เซเชฐ เชธเซเชŸเซ‡เช•เชจเซ‡ เชœเช—เชฒ เช•เชฐเชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เชจเชฅเซ€.

เช•เซ‡เชŸเชฒเชพเช• เชธเซเชฅเชณเซ‹เช, เช‰เชฆเชพเชนเชฐเชฃเซ‹เชฎเชพเช‚เชจเซ‹ เช•เซ‹เชก เชธเช‚เชชเซ‚เชฐเซเชฃ เชฐเซ€เชคเซ‡ เชฌเชคเชพเชตเชตเชพเชฎเชพเช‚ เช†เชตเชคเซ‹ เชจเชฅเซ€ (เชœเซ‡เชฅเซ€ เชŸเซ‡เช•เซเชธเซเชŸเชฎเชพเช‚ เช—เชกเชฌเชก เชจ เชฅเชพเชฏ), เชชเชฐเช‚เชคเซ เช•เซเชฏเชพเช‚เช• เชคเซ‡ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเชฎเชพเช‚ เชธเช‚เชถเซ‹เชงเชฟเชค เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡. เชธเช‚เชชเซ‚เชฐเซเชฃ เชตเชฐเซเช•เชฟเช‚เช— เช•เซ‹เชก เช‰เชฆเชพเชนเชฐเชฃเซ‹ เชฐเซ€เชชเซ‹เชเซ€เชŸเชฐเซ€เชฎเชพเช‚ เชฎเชณเซ€ เชถเช•เซ‡ เช›เซ‡ https://github.com/dm-logv/airflow-tutorial.

เชกเซ‹เช•เชฐ-เช•เช‚เชชเซ‹เช.เชเชฎเชเชฎเชเชฒ

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

เชจเซ‹เช‚เชง:

  • เชฐเชšเชจเชพเชจเซ€ เชเชธเซ‡เชฎเซเชฌเชฒเซ€เชฎเชพเช‚, เชนเซเช‚ เชฎเซ‹เชŸเซ‡ เชญเชพเช—เซ‡ เชœเชพเชฃเซ€เชคเซ€ เช›เชฌเซ€ เชชเชฐ เช†เชงเชพเชฐ เชฐเชพเช–เชคเซ‹ เชนเชคเซ‹ เชชเช•เชฒ/เชกเซ‹เช•เชฐ-เชเชฐเชซเซเชฒเซ‹ - เชคเซ‡เชจเซ‡ เชคเชชเชพเชธเชตเชพเชจเซ€ เช–เชพเชคเชฐเซ€ เช•เชฐเซ‹. เช•เชฆเชพเชš เชคเชฎเชพเชฐเซ‡ เชคเชฎเชพเชฐเชพ เชœเซ€เชตเชจเชฎเชพเช‚ เชฌเซ€เชœเซ€ เช•เซ‹เชˆ เชตเชธเซเชคเซเชจเซ€ เชœเชฐเซ‚เชฐ เชจเชฅเซ€.
  • เชคเชฎเชพเชฎ เชเชฐเชซเซเชฒเซ‹ เชธเซ‡เชŸเชฟเช‚เช—เซเชธ เชฎเชพเชคเซเชฐ เชฎเชพเชฐเชซเชคเซ‡ เชœ เช‰เชชเชฒเชฌเซเชง เชจเชฅเซ€ airflow.cfg, เชชเชฃ เชชเชฐเซเชฏเชพเชตเชฐเชฃ เชšเชฒเซ‹ เชฆเซเชตเชพเชฐเชพ (เชตเชฟเช•เชพเชธเช•เชฐเซเชคเชพเช“เชจเซ‡ เช†เชญเชพเชฐ), เชœเซ‡เชจเซ‹ เชฎเซ‡เช‚ เชฆเซ‚เชทเชฟเชค เชฐเซ€เชคเซ‡ เชฒเชพเชญ เชฒเซ€เชงเซ‹.
  • เชธเซเชตเชพเชญเชพเชตเชฟเช• เชฐเซ€เชคเซ‡, เชคเซ‡ เช‰เชคเซเชชเชพเชฆเชจ เชฎเชพเชŸเซ‡ เชคเซˆเชฏเชพเชฐ เชจเชฅเซ€: เชฎเซ‡เช‚ เช‡เชฐเชพเชฆเชพเชชเซ‚เชฐเซเชตเช• เชนเซƒเชฆเชฏเชจเชพ เชงเชฌเช•เชพเชฐเชพ เช•เชจเซเชŸเซ‡เชจเชฐ เชชเชฐ เชฎเซ‚เช•เซเชฏเชพ เชจเชฅเซ€, เชฎเชจเซ‡ เชธเซเชฐเช•เซเชทเชพเชจเซ€ เชšเชฟเช‚เชคเชพ เชจเชฅเซ€. เชชเชฐเช‚เชคเซ เชฎเซ‡เช‚ เช…เชฎเชพเชฐเชพ เชชเซเชฐเชฏเซ‹เช—เช•เชพเชฐเซ‹ เชฎเชพเชŸเซ‡ เช“เช›เชพเชฎเชพเช‚ เช“เช›เซเช‚ เชฏเซ‹เช—เซเชฏ เช•เชฐเซเชฏเซเช‚.
  • เชจเซ‹เช‚เชง เช•เชฐเซ‹ เช•เซ‡:
    • เชกเซ‡เช— เชซเซ‹เชฒเซเชกเชฐ เชถเซ‡เชกเซเชฏเซ‚เชฒเชฐ เช…เชจเซ‡ เช•เชพเชฎเชฆเชพเชฐเซ‹ เชฌเช‚เชจเซ‡ เชฎเชพเชŸเซ‡ เชธเซเชฒเชญ เชนเซ‹เชตเซเช‚ เชœเซ‹เชˆเช.
    • เช† เชœ เชฌเชงเซ€ เชคเซƒเชคเซ€เชฏ-เชชเช•เซเชท เชฒเชพเช‡เชฌเซเชฐเซ‡เชฐเซ€เช“เชจเซ‡ เชฒเชพเช—เซ เชชเชกเซ‡ เช›เซ‡ - เชคเซ‡ เชฌเชงเชพ เชถเซ‡เชกเซเชฏเซ‚เชฒเชฐ เช…เชจเซ‡ เช•เชพเชฎเชฆเชพเชฐเซ‹ เชธเชพเชฅเซ‡ เชฎเชถเซ€เชจเซ‹ เชชเชฐ เช‡เชจเซเชธเซเชŸเซ‹เชฒ เช•เชฐเซ‡เชฒเชพ เชนเซ‹เชตเชพ เชœเซ‹เชˆเช.

เชธเชพเชฐเซเช‚, เชนเชตเซ‡ เชคเซ‡ เชธเชฐเชณ เช›เซ‡:

$ docker-compose up --scale worker=3

เชฌเชงเซเช‚ เชตเชงเซ€ เชœเชพเชฏ เชชเช›เซ€, เชคเชฎเซ‡ เชตเซ‡เชฌ เชˆเชจเซเชŸเชฐเชซเซ‡เชธ เชœเซ‹เชˆ เชถเช•เซ‹ เช›เซ‹:

เชฎเซ‚เชณเชญเซ‚เชค เช–เซเชฏเชพเชฒเซ‹

เชœเซ‹ เชคเชฎเซ‡ เช† เชฌเชงเชพ "เชกเซ‡เช—เซเชธ" เชฎเชพเช‚ เช•เช‚เชˆเชชเชฃ เชธเชฎเชœเซ€ เชถเช•เซเชฏเชพ เชจเชฅเซ€, เชคเซ‹ เช…เชนเซ€เช‚ เชเช• เชจเชพเชจเซ‹ เชถเชฌเซเชฆเช•เซ‹เชถ เช›เซ‡:

  • เชถเซ‡เชกเซเชฏเซ‚เชฒเชฐ - เชเชฐเชซเซเชฒเซ‹เชฎเชพเช‚ เชธเซŒเชฅเซ€ เชฎเชนเชคเซเชตเชชเซ‚เชฐเซเชฃ เช•เชพเช•เชพ, เชœเซ‡ เชจเชฟเชฏเช‚เชคเซเชฐเชฟเชค เช•เชฐเซ‡ เช›เซ‡ เช•เซ‡ เชฐเซ‹เชฌเซ‹เชŸเซเชธ เชธเช–เชค เชฎเชนเซ‡เชจเชค เช•เชฐเซ‡ เช›เซ‡, เชตเซเชฏเช•เซเชคเชฟ เชจเชนเซ€เช‚: เชถเซ‡เชกเซเชฏเซ‚เชฒเชจเซเช‚ เชจเชฟเชฐเซ€เช•เซเชทเชฃ เช•เชฐเซ‡ เช›เซ‡, เชกเซ‡เช— เช…เชชเชกเซ‡เชŸ เช•เชฐเซ‡ เช›เซ‡, เช•เชพเชฐเซเชฏเซ‹ เชถเชฐเซ‚ เช•เชฐเซ‡ เช›เซ‡.

    เชธเชพเชฎเชพเชจเซเชฏ เชฐเซ€เชคเซ‡, เชœเซ‚เชจเชพ เชธเช‚เชธเซเช•เชฐเชฃเซ‹เชฎเชพเช‚, เชคเซ‡เชจเซ‡ เชฎเซ‡เชฎเชฐเซ€เชฎเชพเช‚ เชธเชฎเชธเซเชฏเชพ เชนเชคเซ€ (เชจเชพ, เชธเซเชฎเซƒเชคเชฟ เชญเซเชฐเช‚เชถ เชจเชฅเซ€, เชชเชฐเช‚เชคเซ เชฒเซ€เช•เซเชธ) เช…เชจเซ‡ เชฒเซ‡เช—เชธเซ€ เชชเซ‡เชฐเชพเชฎเซ€เชŸเชฐ เชชเชฃ เชฐเซ‚เชชเชฐเซ‡เช–เชพเช“เชฎเชพเช‚ เชœ เชฐเชนเซเชฏเซเช‚. run_duration - เชคเซ‡เชจเซเช‚ เชชเซเชจเชƒเชชเซเชฐเชพเชฐเช‚เชญ เช…เช‚เชคเชฐเชพเชฒ. เชชเชฐเช‚เชคเซ เชนเชตเซ‡ เชฌเชงเซเช‚ เชฌเชฐเชพเชฌเชฐ เช›เซ‡.

  • เชกเซ…เช— (เช‰เชฐเซเชซเซ‡ "เชกเซ‡เช—") - "เชจเชฟเชฐเซเชฆเซ‡เชถเชฟเชค เชเชธเชพเชฏเช•เซเชฒเชฟเช• เช—เซเชฐเชพเชซ", เชชเชฐเช‚เชคเซ เช†เชตเซ€ เชตเซเชฏเชพเช–เซเชฏเชพ เชฅเซ‹เชกเชพ เชฒเซ‹เช•เซ‹เชจเซ‡ เช•เชนเซ‡เชถเซ‡, เชชเชฐเช‚เชคเซ เชนเช•เซ€เช•เชคเชฎเชพเช‚ เชคเซ‡ เชเช•เชฌเซ€เชœเชพ เชธเชพเชฅเซ‡ เช•เซเชฐเชฟเชฏเชพเชชเซเชฐเชคเชฟเช•เซเชฐเชฟเชฏเชพ เช•เชฐเชคเชพ เช•เชพเชฐเซเชฏเซ‹ เชฎเชพเชŸเซ‡เชจเซเช‚ เช•เชจเซเชŸเซ‡เชจเชฐ เช›เซ‡ (เชจเซ€เชšเซ‡ เชœเซเช“) เช…เชฅเชตเชพ SSIS เชฎเชพเช‚ เชชเซ‡เช•เซ‡เชœ เช…เชจเซ‡ เช‡เชจเซเชซเซ‹เชฐเซเชฎเซ‡เชŸเชฟเช•เชพเชฎเชพเช‚ เชตเชฐเซเช•เชซเซเชฒเซ‹เชจเซเช‚ เชเชจเชพเชฒเซ‹เช— เช›เซ‡. .

    เชกเซ‡เช—เซเชธ เช‰เชชเชฐเชพเช‚เชค, เชนเชœเซ€ เชชเชฃ เชธเชฌเชกเซ‡เช—เซเชธ เชนเซ‹เชˆ เชถเช•เซ‡ เช›เซ‡, เชชเชฐเช‚เชคเซ เช…เชฎเซ‡ เชฎเซ‹เชŸเซ‡ เชญเชพเช—เซ‡ เชคเซ‡ เชฎเซ‡เชณเชตเซ€ เชถเช•เชคเชพ เชจเชฅเซ€.

  • เชกเซ€เชเชœเซ€ เชฐเชจ - เชชเซเชฐเชพเชฐเช‚เชญเชฟเช• เชกเซ‡เช—, เชœเซ‡ เชคเซ‡เชจเซ€ เชชเซ‹เชคเชพเชจเซ€ เชธเซ‹เช‚เชชเซ‡เชฒ เช›เซ‡ execution_date. เชธเชฎเชพเชจ เชกเซ‡เช—เชจเชพ เชกเชพเช—เซเชฐเชจเซเชธ เชธเชฎเชพเช‚เชคเชฐ เชฐเซ€เชคเซ‡ เช•เชพเชฎ เช•เชฐเซ€ เชถเช•เซ‡ เช›เซ‡ (เชœเซ‹ เชคเชฎเซ‡ เชคเชฎเชพเชฐเชพ เช•เชพเชฐเซเชฏเซ‹เชจเซ‡ เช…เชธเซเชชเชทเซเชŸ เชฌเชจเชพเชตเซเชฏเชพ เช›เซ‡, เช…เชฒเชฌเชคเซเชค).
  • เช‘เชชเชฐเซ‡เชŸเชฐ เชšเซ‹เช•เซเช•เชธ เช•เซเชฐเชฟเชฏเชพ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เชœเชตเชพเชฌเชฆเชพเชฐ เช•เซ‹เชกเชจเชพ เชŸเซเช•เชกเชพเช“ เช›เซ‡. เชคเซเชฏเชพเช‚ เชคเซเชฐเชฃ เชชเซเชฐเช•เชพเชฐเชจเชพ เช“เชชเชฐเซ‡เชŸเชฐเซ‹ เช›เซ‡:
    • เช•เซเชฐเชฟเชฏเชพเช…เชฎเชพเชฐเชพ เชชเซเชฐเชฟเชฏเชจเซ€ เชœเซ‡เชฎ PythonOperator, เชœเซ‡ เช•เซ‹เชˆเชชเชฃ (เชฎเชพเชจเซเชฏ) Python เช•เซ‹เชก เชšเชฒเชพเชตเซ€ เชถเช•เซ‡ เช›เซ‡;
    • เชŸเซเชฐเชพเชจเซเชธเชซเชฐ, เชœเซ‡ เชกเซ‡เชŸเชพเชจเซ‡ เชธเซเชฅเชณเซ‡เชฅเซ€ เชฌเซ€เชœเชพ เชธเซเชฅเชพเชจเซ‡ เชชเชฐเชฟเชตเชนเชจ เช•เชฐเซ‡ เช›เซ‡, เช•เชนเซ‡ เช›เซ‡, MsSqlToHiveTransfer;
    • เชธเซ‡เชจเซเชธเชฐ เชฌเซ€เชœเซ€ เชฌเชพเชœเซ, เชคเซ‡ เชคเชฎเชจเซ‡ เช˜เชŸเชจเชพ เชจ เชฌเชจเซ‡ เชคเซเชฏเชพเช‚ เชธเซเชงเซ€ เชกเซ‡เช—เชจเชพ เช†เช—เชณเชจเชพ เช…เชฎเชฒเชจเซ‡ เชชเซเชฐเชคเชฟเช•เซเชฐเชฟเชฏเชพ เช†เชชเชตเชพ เช…เชฅเชตเชพ เชงเซ€เชฎเซเช‚ เช•เชฐเชตเชพเชจเซ€ เชฎเช‚เชœเซ‚เชฐเซ€ เช†เชชเชถเซ‡. HttpSensor เช‰เชฒเซเชฒเซ‡เช–เชฟเชค เช…เช‚เชคเชฟเชฎ เชฌเชฟเช‚เชฆเซ เช–เซ‡เช‚เชšเซ€ เชถเช•เซ‡ เช›เซ‡, เช…เชจเซ‡ เชœเซเชฏเชพเชฐเซ‡ เช‡เชšเซเช›เชฟเชค เชชเซเชฐเชคเชฟเชธเชพเชฆเชจเซ€ เชฐเชพเชน เชœเซ‹เชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡, เชคเซเชฏเชพเชฐเซ‡ เชŸเซเชฐเชพเชจเซเชธเชซเชฐ เชถเชฐเซ‚ เช•เชฐเซ‹ GoogleCloudStorageToS3Operator. เชœเชฟเชœเซเชžเชพเชธเซ เชฎเชจ เชชเซ‚เช›เชถเซ‡: โ€œเชถเชพ เชฎเชพเชŸเซ‡? เช›เซ‡เชตเชŸเซ‡, เชคเชฎเซ‡ เช‘เชชเชฐเซ‡เชŸเชฐเชฎเชพเช‚ เชœ เชชเซเชจเชฐเชพเชตเชฐเซเชคเชจเซ‹ เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹!" เช…เชจเซ‡ เชชเช›เซ€, เชธเชธเซเชชเซ‡เชจเซเชกเซ‡เชก เช“เชชเชฐเซ‡เชŸเชฐเซ‹ เชธเชพเชฅเซ‡เชจเชพ เช•เชพเชฐเซเชฏเซ‹เชจเชพ เชชเซ‚เชฒเชจเซ‡ เชšเซ‹เช‚เชŸเซ€ เชจ เชœเชตเชพ เชฎเชพเชŸเซ‡. เชธเซ‡เชจเซเชธเชฐ เช†เช—เชฒเชพ เชชเซเชฐเชฏเชพเชธ เชชเชนเซ‡เชฒเชพ เชถเชฐเซ‚ เชฅเชพเชฏ เช›เซ‡, เชคเชชเชพเชธเซ‡ เช›เซ‡ เช…เชจเซ‡ เชฎเซƒเชคเซเชฏเซ เชชเชพเชฎเซ‡ เช›เซ‡.
  • เช•เชพเชฐเซเชฏ - เช˜เซ‹เชทเชฟเชค เช“เชชเชฐเซ‡เชŸเชฐเซ‹, เชชเซเชฐเช•เชพเชฐเชจเซ‡ เชงเซเชฏเชพเชจเชฎเชพเช‚ เชฒเซ€เชงเชพ เชตเชฟเชจเชพ, เช…เชจเซ‡ เชกเซ‡เช— เชธเชพเชฅเซ‡ เชœเซ‹เชกเชพเชฏเซ‡เชฒเชพเชจเซ‡ เช•เชพเชฐเซเชฏเชจเชพ เชชเชฆ เชชเชฐ เชฌเชขเชคเซ€ เช†เชชเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡.
  • เช•เชพเชฐเซเชฏ เช‰เชฆเชพเชนเชฐเชฃ - เชœเซเชฏเชพเชฐเซ‡ เชธเชพเชฎเชพเชจเซเชฏ เช†เชฏเซ‹เชœเช•เซ‡ เชจเช•เซเช•เซ€ เช•เชฐเซเชฏเซเช‚ เช•เซ‡ เช•เชพเชฐเซเชฏเช•เชฐเซเชคเชพ-เช•เชพเชฎเชฆเชพเชฐเซ‹ เชชเชฐ เชฏเซเชฆเซเชงเชฎเชพเช‚ เช•เชพเชฐเซเชฏเซ‹ เชฎเซ‹เช•เชฒเชตเชพเชจเซ‹ เชธเชฎเชฏ เช†เชตเซ€ เช—เชฏเซ‹ เช›เซ‡ (เชœเซ‹ เช†เชชเชฃเซ‡ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เช เชคเซ‹ เชธเซเชฅเชณ เชชเชฐ เชœ LocalExecutor เช…เชฅเชตเชพ เชจเชพ เช•เชฟเชธเซเชธเชพเชฎเชพเช‚ เชฆเซ‚เชฐเชธเซเชฅ เชจเซ‹เชก เชชเชฐ CeleryExecutor), เชคเซ‡ เชคเซ‡เชฎเชจเซ‡ เชธเช‚เชฆเชฐเซเชญ เชธเซ‹เช‚เชชเซ‡ เช›เซ‡ (เชเชŸเชฒเซ‡ โ€‹โ€‹โ€‹โ€‹เช•เซ‡, เชšเชฒเซ‹เชจเซ‹ เชธเชฎเซ‚เชน - เชเช•เซเชเซ‡เช•เซเชฏเซเชถเชจ เชชเซ‡เชฐเชพเชฎเซ€เชŸเชฐ), เช†เชฆเซ‡เชถ เช…เชฅเชตเชพ เช•เซเชตเซ‡เชฐเซ€ เชŸเซ‡เชฎเซเชชเชฒเซ‡เชŸเซเชธเชจเซ‡ เชตเชฟเชธเซเชคเซƒเชค เช•เชฐเซ‡ เช›เซ‡, เช…เชจเซ‡ เชคเซ‡เชฎเชจเซ‡ เชชเซ‚เชฒ เช•เชฐเซ‡ เช›เซ‡.

เช…เชฎเซ‡ เช•เชพเชฐเซเชฏเซ‹ เชœเชจเชฐเซ‡เชŸ เช•เชฐเซ€เช เช›เซ€เช

เชชเซเชฐเชฅเชฎ, เชšเชพเชฒเซ‹ เช†เชชเชฃเชพ เชกเช—เชจเซ€ เชธเชพเชฎเชพเชจเซเชฏ เชฏเซ‹เชœเชจเชพเชจเซ€ เชฐเซ‚เชชเชฐเซ‡เช–เชพ เช†เชชเซ€เช, เช…เชจเซ‡ เชชเช›เซ€ เช†เชชเชฃเซ‡ เชตเชงเซ เช…เชจเซ‡ เชตเชงเซ เชตเชฟเช—เชคเซ‹เชฎเชพเช‚ เชกเชพเช‡เชต เช•เชฐเซ€เชถเซเช‚, เช•เชพเชฐเชฃ เช•เซ‡ เช†เชชเชฃเซ‡ เช•เซ‡เชŸเชฒเชพเช• เชฌเชฟเชจ-เชคเซเชšเซเช› เช‰เช•เซ‡เชฒเซ‹ เชฒเชพเช—เซ เช•เชฐเซ€เช เช›เซ€เช.

เชคเซ‡เชฅเซ€, เชคเซ‡เชจเชพ เชธเชฐเชณ เชธเซเชตเชฐเซ‚เชชเชฎเชพเช‚, เช†เชตเชพ เชกเซ‡เช— เช†เชจเชพ เชœเซ‡เชตเซ‹ เชฆเซ‡เช–เชพเชถเซ‡:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

เชšเชพเชฒเซ‹ เชคเซ‡เชจเซ‡ เชถเซ‹เชงเซ€ เช•เชพเชขเซ€เช:

  • เชชเซเชฐเชฅเชฎ, เช…เชฎเซ‡ เชœเชฐเซ‚เชฐเซ€ เชฒเชฟเชฌเซเชธ เช†เชฏเชพเชค เช•เชฐเซ€เช เช›เซ€เช เช…เชจเซ‡ เช•เช‚เชˆเช• เชฌเซ€เชœเซเช‚;
  • sql_server_ds เช›เซ‡ List[namedtuple[str, str]] เชเชฐเชซเซเชฒเซ‹ เช•เชจเซ‡เช•เซเชถเชจเซเชธเชจเชพ เช•เชจเซ‡เช•เซเชถเชจเชจเชพ เชจเชพเชฎเซ‹ เช…เชจเซ‡ เชกเซ‡เชŸเชพเชฌเซ‡เชธเซ‡เชธ เช•เซ‡ เชœเซ‡เชฎเชพเช‚เชฅเซ€ เช…เชฎเซ‡ เช…เชฎเชพเชฐเซ€ เชชเซเชฒเซ‡เชŸ เชฒเชˆเชถเซเช‚;
  • dag - เช…เชฎเชพเชฐเชพ เชกเซ‡เช—เชจเซ€ เชœเชพเชนเซ‡เชฐเชพเชค, เชœเซ‡ เช†เชตเชถเซเชฏเช•เชชเชฃเซ‡ เชนเซ‹เชตเซ€ เชœเซ‹เชˆเช globals(), เช…เชจเซเชฏเชฅเชพ เชเชฐเชซเซเชฒเซ‹ เชคเซ‡เชจเซ‡ เชถเซ‹เชงเซ€ เชถเช•เชถเซ‡ เชจเชนเซ€เช‚. เชกเช—เชจเซ‡ เชชเชฃ เช•เชนเซ‡เชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡:
    • เชคเซ‡เชจเซเช‚ เชจเชพเชฎ เชถเซเช‚ เช›เซ‡ orders - เช† เชจเชพเชฎ เชชเช›เซ€ เชตเซ‡เชฌ เชˆเชจเซเชŸเชฐเชซเซ‡เชธเชฎเชพเช‚ เชฆเซ‡เช–เชพเชถเซ‡,
    • เช•เซ‡ เชคเซ‡ เช†เช เชฎเซ€ เชœเซเชฒเชพเชˆเชจเซ€ เชฎเชงเซเชฏเชฐเชพเชคเซเชฐเชฟเชฅเซ€ เช•เชพเชฎ เช•เชฐเชถเซ‡,
    • เช…เชจเซ‡ เชคเซ‡ เชฒเช—เชญเช— เชฆเชฐ 6 เช•เชฒเชพเช•เซ‡ เชšเชพเชฒเชตเซเช‚ เชœเซ‹เชˆเช (เช…เชนเซ€เช‚ เชคเซ‡เชจเชพ เชฌเชฆเชฒเซ‡ เช…เช˜เชฐเชพ เชฒเซ‹เช•เซ‹ เชฎเชพเชŸเซ‡ timedelta() เชธเซเชตเซ€เช•เชพเชฐเซเชฏ cron-เชฒเชพเช‡เชจ 0 0 0/6 ? * * *, เช“เช›เซ€ เช เช‚เชกเซ€ เชฎเชพเชŸเซ‡ - เชœเซ‡เชตเซ€ เช…เชญเชฟเชตเซเชฏเช•เซเชคเชฟ @daily);
  • workflow() เชฎเซเช–เซเชฏ เช•เชพเชฎ เช•เชฐเชถเซ‡, เชชเชฃ เช…เชคเซเชฏเชพเชฐเซ‡ เชจเชนเซ€เช‚. เชนเชฎเชฃเชพเช‚ เชฎเชพเชŸเซ‡, เช…เชฎเซ‡ เชซเช•เซเชค เช…เชฎเชพเชฐเชพ เชธเช‚เชฆเชฐเซเชญเชจเซ‡ เชฒเซ‹เช—เชฎเชพเช‚ เชกเชฎเซเชช เช•เชฐเซ€เชถเซเช‚.
  • เช…เชจเซ‡ เชนเชตเซ‡ เช•เชพเชฐเซเชฏเซ‹ เชฌเชจเชพเชตเชตเชพเชจเซ‹ เชธเชฐเชณ เชœเชพเชฆเซ:
    • เช…เชฎเซ‡ เช…เชฎเชพเชฐเชพ เชธเซเชคเซเชฐเซ‹เชคเซ‹ เชฆเซเชตเชพเชฐเชพ เชšเชฒเชพเชตเซ€เช เช›เซ€เช;
    • เชชเซเชฐเชพเชฐเช‚เชญ เช•เชฐเซ‹ PythonOperator, เชœเซ‡ เช…เชฎเชพเชฐเชพ เชกเชฎเซ€เชจเซ‡ เช…เชฎเชฒเชฎเชพเช‚ เชฎเซ‚เช•เชถเซ‡ workflow(). เช•เชพเชฐเซเชฏเชจเซเช‚ เชตเชฟเชถเชฟเชทเซเชŸ เชจเชพเชฎ (เชกเซ‡เช—เชจเซ€ เช…เช‚เชฆเชฐ) เชธเซเชชเชทเซเชŸ เช•เชฐเชตเชพเชจเซเช‚ เชญเซ‚เชฒเชถเซ‹ เชจเชนเซ€เช‚ เช…เชจเซ‡ เชกเซ‡เช—เชจเซ‡ เชœ เชฌเชพเช‚เชงเซ‹. เชงเซเชตเชœ provide_context เชฌเชฆเชฒเชพเชฎเชพเช‚, เชซเช‚เช•เซเชถเชจเชฎเชพเช‚ เชตเชงเชพเชฐเชพเชจเซ€ เชฆเชฒเซ€เชฒเซ‹ เชฐเซ‡เชกเชถเซ‡, เชœเซ‡เชจเซ‹ เช…เชฎเซ‡ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เช•เชพเชณเชœเซ€เชชเซ‚เชฐเซเชตเช• เชเช•เชคเซเชฐเชฟเชค เช•เชฐเซ€เชถเซเช‚ **context.

เชนเชฎเชฃเชพเช‚ เชฎเชพเชŸเซ‡, เชคเซ‡ เชฌเชงเซ เชœ เช›เซ‡. เช…เชฎเชจเซ‡ เชถเซเช‚ เชฎเชณเซเชฏเซเช‚:

  • เชตเซ‡เชฌ เชˆเชจเซเชŸเชฐเชซเซ‡เชธเชฎเชพเช‚ เชจเชตเซเช‚ เชกเซ‡เช—,
  • เชฆเซ‹เชข เชธเซ‹ เช•เชพเชฐเซเชฏเซ‹ เชœเซ‡ เชธเชฎเชพเช‚เชคเชฐ เชฐเซ€เชคเซ‡ เชšเชฒเชพเชตเชตเชพเชฎเชพเช‚ เช†เชตเชถเซ‡ (เชœเซ‹ เชเชฐเชซเซเชฒเซ‹, เชธเซ‡เชฒเชฐเซ€ เชธเซ‡เชŸเชฟเช‚เช—เซเชธ เช…เชจเซ‡ เชธเชฐเซเชตเชฐ เช•เซเชทเชฎเชคเชพ เชคเซ‡เชจเซ‡ เชฎเช‚เชœเซ‚เชฐเซ€ เช†เชชเซ‡ เช›เซ‡).

เชธเชพเชฐเซเช‚, เชฒเช—เชญเช— เชธเชฎเชœเชพเชฏเซเช‚.

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚
เช…เชตเชฒเช‚เชฌเชจ เช•เซ‹เชฃ เชธเซเชฅเชพเชชเชฟเชค เช•เชฐเชถเซ‡?

เช† เช†เช–เซ€ เชตเชธเซเชคเซเชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเชพ เชฎเชพเชŸเซ‡, เชฎเซ‡เช‚ เชธเซเช•เซเชฐเซ‚ เช•เชฐเซเชฏเซเช‚ docker-compose.yml เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ requirements.txt เชฌเชงเชพ เช—เชพเช‚เช เซ‹ เชชเชฐ.

เชนเชตเซ‡ เชคเซ‡ เช—เชฏเซ‹:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เช—เซเชฐเซ‡ เชธเซเช•เซเชตเซ‡เชฐ เช เชถเซ‡เชกเซเชฏเซ‚เชฒเชฐ เชฆเซเชตเชพเชฐเชพ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เช•เชฐเชพเชฏเซ‡เชฒเชพ เช•เชพเชฐเซเชฏ เช‰เชฆเชพเชนเชฐเชฃเซ‹ เช›เซ‡.

เช…เชฎเซ‡ เชฅเซ‹เชกเซ€ เชฐเชพเชน เชœเซ‹เชˆเช เช›เซ€เช, เช•เชพเชฎเชฆเชพเชฐเซ‹ เชฆเซเชตเชพเชฐเชพ เช•เชพเชฐเซเชฏเซ‹ เชชเซ‚เชฐเซเชฃ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชฒเซ€เชฒเชพเช“, เช…เชฒเชฌเชคเซเชค, เชธเชซเชณเชคเชพเชชเซ‚เชฐเซเชตเช• เชคเซ‡เชฎเชจเซเช‚ เช•เชพเชฐเซเชฏ เชชเซ‚เชฐเซเชฃ เช•เชฐเซเชฏเซเช‚ เช›เซ‡. เชฐเซ‡เชกเซเชธ เช–เซ‚เชฌ เชธเชซเชณ เชจเชฅเซ€.

เชฎเชพเชฐเซเช— เชฆเซเชตเชพเชฐเชพ, เช…เชฎเชพเชฐเชพ เช‰เชคเซเชชเชพเชฆเชจ เชชเชฐ เช•เซ‹เชˆ เชซเซ‹เชฒเซเชกเชฐ เชจเชฅเซ€ ./dags, เชฎเชถเซ€เชจเซ‹ เชตเชšเซเชšเซ‡ เช•เซ‹เชˆ เชธเซเชฎเซ‡เชณ เชจเชฅเซ€ - เชฌเชงเชพ เชกเซ…เช—เซเชธ เช…เช‚เชฆเชฐ เชฐเชนเซ‡ เช›เซ‡ git เช…เชฎเชพเชฐเชพ Gitlab เชชเชฐ, เช…เชจเซ‡ Gitlab CI เชœเซเชฏเชพเชฐเซ‡ เชฎเชฐเซเชœ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เชคเซเชฏเชพเชฐเซ‡ เชฎเชถเซ€เชจเซ‹เชจเซ‡ เช…เชชเชกเซ‡เชŸเซเชธเชจเซเช‚ เชตเชฟเชคเชฐเชฃ เช•เชฐเซ‡ เช›เซ‡ master.

เชซเซ‚เชฒ เชตเชฟเชถเซ‡ เชฅเซ‹เชกเซเช‚

เชœเซเชฏเชพเชฐเซ‡ เช•เชพเชฎเชฆเชพเชฐเซ‹ เช…เชฎเชพเชฐเชพ เชชเซ‡เชธเชฟเชซเชพเชฏเชฐเซเชธเชจเซ‡ เชฎเชพเชฐเชคเชพ เชนเซ‹เชฏ, เชคเซเชฏเชพเชฐเซ‡ เชšเชพเชฒเซ‹ เชฌเซ€เชœเซเช‚ เชเช• เชธเชพเชงเชจ เชฏเชพเชฆ เช•เชฐเซ€เช เชœเซ‡ เช†เชชเชฃเชจเซ‡ เช•เช‚เชˆเช• เชฌเชคเชพเชตเซ€ เชถเช•เซ‡ เช›เซ‡ - เชซเซเชฒเชพเชตเชฐ.

เชตเชฐเซเช•เชฐ เชจเซ‹เชกเซเชธ เชชเชฐ เชธเชพเชฐเชพเช‚เชถ เชฎเชพเชนเชฟเชคเซ€ เชธเชพเชฅเซ‡เชจเซเช‚ เชชเซเชฐเชฅเชฎ เชชเซƒเชทเซเช :

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เช•เชพเชฐเซเชฏเซ‹ เชธเชพเชฅเซ‡เชจเซเช‚ เชธเซŒเชฅเซ€ เชธเช˜เชจ เชชเซƒเชทเซเช  เชœเซ‡ เช•เชพเชฎ เชชเชฐ เช—เชฏเชพ เช›เซ‡:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เช…เชฎเชพเชฐเชพ เชฌเซเชฐเซ‹เช•เชฐเชจเซ€ เชธเซเชฅเชฟเชคเชฟ เชธเชพเชฅเซ‡เชจเซเช‚ เชธเซŒเชฅเซ€ เช•เช‚เชŸเชพเชณเชพเชœเชจเช• เชชเซƒเชทเซเช :

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชธเซŒเชฅเซ€ เชคเซ‡เชœเชธเซเชตเซ€ เชชเซƒเชทเซเช  เช•เชพเชฐเซเชฏ เชธเซเชฅเชฟเชคเชฟ เช—เซเชฐเชพเชซ เช…เชจเซ‡ เชคเซ‡เชฎเชจเชพ เช…เชฎเชฒเชจเชพ เชธเชฎเชฏ เชธเชพเชฅเซ‡ เช›เซ‡:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เช…เชฎเซ‡ เช…เชจเซเชกเชฐเชฒเซ‹เชกเซ‡เชก เชฒเซ‹เชก เช•เชฐเซ€เช เช›เซ€เช

เชคเซ‡เชฅเซ€, เชคเชฎเชพเชฎ เช•เชพเชฐเซเชฏเซ‹ เชชเซ‚เชฐเซเชฃ เชฅเชˆ เช—เชฏเชพ เช›เซ‡, เชคเชฎเซ‡ เช˜เชพเชฏเชฒเซ‹เชจเซ‡ เชฒเชˆ เชœเชˆ เชถเช•เซ‹ เช›เซ‹.

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เช…เชจเซ‡ เชคเซเชฏเชพเช‚ เช˜เชฃเชพ เช˜เชพเชฏเชฒ เชฅเชฏเชพ เชนเชคเชพ - เชเช• เช…เชฅเชตเชพ เชฌเซ€เชœเชพ เช•เชพเชฐเชฃเซ‹เชธเชฐ. เชเชฐเชซเซเชฒเซ‹เชจเชพ เชธเชพเชšเชพ เช‰เชชเชฏเซ‹เช—เชจเชพ เช•เชฟเชธเซเชธเชพเชฎเชพเช‚, เช† เช–เซ‚เชฌ เชœ เชšเซ‹เชฐเชธ เชธเซ‚เชšเชตเซ‡ เช›เซ‡ เช•เซ‡ เชกเซ‡เชŸเชพ เชšเซ‹เช•เซเช•เชธเชชเชฃเซ‡ เช†เชตเซเชฏเซ‹ เชจเชฅเซ€.

เชคเชฎเชพเชฐเซ‡ เชฒเซ‹เช— เชœเซ‹เชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡ เช…เชจเซ‡ เช˜เชŸเซ€ เช—เชฏเซ‡เชฒเชพ เช•เชพเชฐเซเชฏ เช‰เชฆเชพเชนเชฐเชฃเซ‹เชจเซ‡ เชซเชฐเซ€เชฅเซ€ เชชเซเชฐเชพเชฐเช‚เชญ เช•เชฐเชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡.

เช•เซ‹เชˆเชชเชฃ เชšเซ‹เชฐเชธ เชชเชฐ เช•เซเชฒเชฟเช• เช•เชฐเซ€เชจเซ‡, เช…เชฎเซ‡ เช…เชฎเชพเชฐเชพ เชฎเชพเชŸเซ‡ เช‰เชชเชฒเชฌเซเชง เช•เซเชฐเชฟเชฏเชพเช“ เชœเซ‹เชˆเชถเซเช‚:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชคเชฎเซ‡ เชฒเชˆ เชถเช•เซ‹ เช›เซ‹ เช…เชจเซ‡ เชซเซ‹เชฒเชจเชจเซ‡ เชธเชพเชซ เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹. เชเชŸเชฒเซ‡ เช•เซ‡, เช†เชชเชฃเซ‡ เชญเซ‚เชฒเซ€ เชœเชˆเช เช›เซ€เช เช•เซ‡ เชคเซเชฏเชพเช‚ เช•เช‚เชˆเช• เชจเชฟเชทเซเชซเชณ เชฅเชฏเซเช‚ เช›เซ‡, เช…เชจเซ‡ เชคเซ‡ เชœ เช‰เชฆเชพเชนเชฐเชฃ เช•เชพเชฐเซเชฏ เชถเซ‡เชกเซเชฏเซ‚เชฒเชฐ เชชเชฐ เชœเชถเซ‡.

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชคเซ‡ เชธเซเชชเชทเซเชŸ เช›เซ‡ เช•เซ‡ เชฌเชงเชพ เชฒเชพเชฒ เชšเซ‹เชฐเชธ เชธเชพเชฅเซ‡ เชฎเชพเช‰เชธ เชธเชพเชฅเซ‡ เช†เชตเซเช‚ เช•เชฐเชตเซเช‚ เช เช–เซ‚เชฌ เชฎเชพเชจเชตเซ€เชฏ เชจเชฅเซ€ - เช† เชคเซ‡ เชจเชฅเซ€ เชœเซ‡ เช†เชชเชฃเซ‡ เชเชฐเชซเซเชฒเซ‹ เชชเชพเชธเซ‡เชฅเซ€ เช…เชชเซ‡เช•เซเชทเชพ เชฐเชพเช–เซ€เช เช›เซ€เช. เชธเซเชตเชพเชญเชพเชตเชฟเช• เชฐเซ€เชคเซ‡, เช…เชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ เชธเชพเชฎเซ‚เชนเชฟเช• เชตเชฟเชจเชพเชถเชจเชพ เชถเชธเซเชคเซเชฐเซ‹ เช›เซ‡: Browse/Task Instances

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชšเชพเชฒเซ‹ เชเช• เชœ เชธเชฎเชฏเซ‡ เชฌเชงเซเช‚ เชชเชธเช‚เชฆ เช•เชฐเซ€เช เช…เชจเซ‡ เชถเซ‚เชจเซเชฏ เชชเชฐ เชซเชฐเซ€เชฅเซ€ เชธเซ‡เชŸ เช•เชฐเซ€เช, เชธเชพเชšเซ€ เช†เช‡เชŸเชฎ เชชเชฐ เช•เซเชฒเชฟเช• เช•เชฐเซ‹:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชธเชซเชพเชˆ เช•เชฐเซเชฏเชพ เชชเช›เซ€, เช…เชฎเชพเชฐเซ€ เชŸเซ‡เช•เซเชธเซ€เช“ เช†เชจเชพ เชœเซ‡เชตเซ€ เชฆเซ‡เช–เชพเชฏ เช›เซ‡ (เชคเซ‡เช“ เชชเชนเซ‡เชฒเซ‡เชฅเซ€ เชœ เชถเซ‡เชกเซเชฏเซ‚เชฒเชฐ เชฆเซเชตเชพเชฐเชพ เชคเซ‡เชฎเชจเซ‡ เชถเซ‡เชกเซเชฏเซ‚เชฒ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เชฐเชพเชน เชœเซ‹เชˆ เชฐเชนเซเชฏเชพ เช›เซ‡):

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชœเซ‹เชกเชพเชฃเซ‹, เชนเซเช•เซเชธ เช…เชจเซ‡ เช…เชจเซเชฏ เชšเชฒเซ‹

เช†เช—เชพเชฎเซ€ เชกเซ€เชเชœเซ€ เชœเซ‹เชตเชพเชจเซ‹ เชธเชฎเชฏ เช†เชตเซ€ เช—เชฏเซ‹ เช›เซ‡, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""ะ“ะพัะฟะพะดะฐ ั…ะพั€ะพัˆะธะต, ะพั‚ั‡ะตั‚ั‹ ะพะฑะฝะพะฒะปะตะฝั‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ะะฐั‚ะฐัˆ, ะฟั€ะพัั‹ะฟะฐะนัั, ะผั‹ {{ dag.dag_id }} ัƒั€ะพะฝะธะปะธ
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

เชถเซเช‚ เชฆเชฐเซ‡เช• เชตเซเชฏเช•เซเชคเชฟเช เช•เซเชฏเชพเชฐเซ‡เชฏ เชฐเชฟเชชเซ‹เชฐเซเชŸ เช…เชชเชกเซ‡เชŸ เช•เชฐเซเชฏเซเช‚ เช›เซ‡? เช† เชซเชฐเซ€เชฅเซ€ เชคเซ‡เชฃเซ€ เช›เซ‡: เชกเซ‡เชŸเชพ เช•เซเชฏเชพเช‚เชฅเซ€ เชฎเซ‡เชณเชตเชตเซ‹ เชคเซ‡ เชธเซเชคเซเชฐเซ‹เชคเซ‹เชจเซ€ เชธเซ‚เชšเชฟ เช›เซ‡; เชœเซเชฏเชพเช‚ เชฎเซ‚เช•เชตเซเช‚ เชคเซ‡เชจเซ€ เชธเซ‚เชšเชฟ เช›เซ‡; เชœเซเชฏเชพเชฐเซ‡ เชฌเชงเซเช‚ เชฅเชฏเซเช‚ เช…เชฅเชตเชพ เชคเซ‚เชŸเซ€ เช—เชฏเซเช‚ เชคเซเชฏเชพเชฐเซ‡ เชนเซ‹เช‚เช• เชฎเชพเชฐเชตเชพเชจเซเช‚ เชญเซ‚เชฒเชถเซ‹ เชจเชนเซ€เช‚ (เชธเชพเชฐเซเช‚, เช† เช†เชชเชฃเชพ เชตเชฟเชถเซ‡ เชจเชฅเซ€, เชจเชพ).

เชšเชพเชฒเซ‹ เชซเชฐเซ€เชฅเซ€ เชซเชพเช‡เชฒเชฎเชพเช‚ เชœเชˆเช เช…เชจเซ‡ เชจเชตเซ€ เช…เชธเซเชชเชทเซเชŸ เชธเชพเชฎเช—เซเชฐเซ€ เชœเซ‹เชˆเช:

  • from commons.operators import TelegramBotSendMessage - เช…เชฎเชจเซ‡ เช…เชฎเชพเชฐเชพ เชชเซ‹เชคเชพเชจเชพ เช“เชชเชฐเซ‡เชŸเชฐเซเชธ เชฌเชจเชพเชตเชตเชพเชฅเซ€ เช•เช‚เชˆ เชฐเซ‹เช•เชคเซเช‚ เชจเชฅเซ€, เชœเซ‡เชจเซ‹ เช…เชฎเซ‡ เช…เชจเชฌเซเชฒเซ‹เช• เช•เชฐเซ‡เชฒเชพเชจเซ‡ เชธเช‚เชฆเซ‡เชถเชพ เชฎเซ‹เช•เชฒเชตเชพ เชฎเชพเชŸเซ‡ เชเช• เชจเชพเชจเซเช‚ เชฐเซ‡เชชเชฐ เชฌเชจเชพเชตเซ€เชจเซ‡ เชฒเชพเชญ เชฒเซ€เชงเซ‹ เช›เซ‡. (เช…เชฎเซ‡ เชจเซ€เชšเซ‡ เช† เช“เชชเชฐเซ‡เชŸเชฐ เชตเชฟเชถเซ‡ เชตเชงเซ เชตเชพเชค เช•เชฐเซ€เชถเซเช‚);
  • default_args={} - เชกเซ‡เช— เชคเซ‡เชจเชพ เชคเชฎเชพเชฎ เช“เชชเชฐเซ‡เชŸเชฐเซ‹เชจเซ‡ เชธเชฎเชพเชจ เชฆเชฒเซ€เชฒเซ‹ เชตเชฟเชคเชฐเชฟเชค เช•เชฐเซ€ เชถเช•เซ‡ เช›เซ‡;
  • to='{{ var.value.all_the_kings_men }}' - เช•เซเชทเซ‡เชคเซเชฐ to เช…เชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ เชนเชพเชฐเซเชกเช•เซ‹เชก เชจเชนเซ€เช‚ เชนเซ‹เชฏ, เชชเชฐเช‚เชคเซ เชœเชฟเชจเซเชœเชพเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เช—เชคเชฟเชถเซ€เชฒ เชฐเซ€เชคเซ‡ เชœเชจเชฐเซ‡เชŸ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡ เช…เชจเซ‡ เช‡เชฎเซ‡เช‡เชฒเซเชธเชจเซ€ เชธเซ‚เชšเชฟ เชธเชพเชฅเซ‡เชจเชพ เชšเชฒ, เชœเซ‡ เชฎเซ‡เช‚ เช•เชพเชณเชœเซ€เชชเซ‚เชฐเซเชตเช• เชฎเซ‚เช•เซเชฏเซเช‚ เช›เซ‡ Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - เช“เชชเชฐเซ‡เชŸเชฐ เชถเชฐเซ‚ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡เชจเซ€ เชถเชฐเชค. เช…เชฎเชพเชฐเชพ เช•เชฟเชธเซเชธเชพเชฎเชพเช‚, เชœเซ‹ เชฌเชงเซ€ เชจเชฟเชฐเซเชญเชฐเชคเชพเช“ เช•เชพเชฎ เช•เชฐเซ€ เช—เชˆ เชนเซ‹เชฏ เชคเซ‹ เชœ เชชเชคเซเชฐ เชฌเซ‹เชธเชจเซ‡ เชœเชถเซ‡ เชธเชซเชณเชคเชพเชชเซ‚เชฐเซเชตเช•;
  • tg_bot_conn_id='tg_main' - เชฆเชฒเซ€เชฒเซ‹ conn_id เช…เชฎเซ‡ เชœเซ‡ เช•เชจเซ‡เช•เซเชถเชจ ID เชฌเชจเชพเชตเซ€เช เช›เซ€เช เชคเซ‡ เชธเซเชตเซ€เช•เชพเชฐเซ€เช เช›เซ€เช Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - เชŸเซ‡เชฒเชฟเช—เซเชฐเชพเชฎเชฎเชพเช‚ เชธเช‚เชฆเซ‡เชถเชพเช“ เชซเช•เซเชค เชคเซเชฏเชพเชฐเซ‡ เชœ เช‰เชกเซ€ เชœเชถเซ‡ เชœเซ‹ เชคเซเชฏเชพเช‚ เชชเชกเชคเซ€ เช•เชพเชฐเซเชฏเซ‹ เชนเซ‹เชฏ;
  • task_concurrency=1 - เช…เชฎเซ‡ เชเช• เช•เชพเชฐเซเชฏเชจเชพ เช…เชจเซ‡เช• เช•เชพเชฐเซเชฏ เช‰เชฆเชพเชนเชฐเชฃเซ‹เชจเชพ เชเช•เชธเชพเชฅเซ‡ เชฒเซ‹เช‚เชš เช•เชฐเชตเชพ เชชเชฐ เชชเซเชฐเชคเชฟเชฌเช‚เชง เชฎเซ‚เช•เซ€เช เช›เซ€เช. เชจเชนเชฟเช‚เชคเชฐ, เช…เชฎเซ‡ เชเช• เชธเชพเชฅเซ‡ เช…เชจเซ‡เช• เชฒเซ‹เชจเซเชš เชฎเซ‡เชณเชตเซ€เชถเซเช‚ VerticaOperator (เชเช• เชŸเซ‡เชฌเชฒ เชคเชฐเชซ เชœเซ‹เชตเซเช‚);
  • report_update >> [email, tg] - เชฌเชงเชพ VerticaOperator เช†เชจเชพ เชœเซ‡เชตเชพ เชชเชคเซเชฐเซ‹ เช…เชจเซ‡ เชธเช‚เชฆเซ‡เชถเชพเช“ เชฎเซ‹เช•เชฒเชตเชพเชฎเชพเช‚ เชเช•เชฐเซ‚เชช เชฅเชพเช“:
    เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

    เชชเชฐเช‚เชคเซ เชจเซ‹เชŸเชฟเชซเชพเชฏเชฐ เช“เชชเชฐเซ‡เชŸเชฐเซ‹เชจเซ€ เชฒเซ‹เช‚เชšเชจเซ€ เชถเชฐเชคเซ‹ เช…เชฒเช— เชนเซ‹เชตเชพเชฅเซ€, เชฎเชพเชคเซเชฐ เชเช• เชœ เช•เชพเชฎ เช•เชฐเชถเซ‡. เชŸเซเชฐเซ€ เชตเซเชฏเซ‚เชฎเชพเช‚, เชฌเชงเซเช‚ เชฅเซ‹เชกเซเช‚ เช“เช›เซเช‚ เชฆเซเชฐเชถเซเชฏ เชฆเซ‡เช–เชพเชฏ เช›เซ‡:
    เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชนเซเช‚ เชตเชฟเชถเซ‡ เชฅเซ‹เชกเชพ เชถเชฌเซเชฆเซ‹ เช•เชนเซ€เชถ เชฎเซ‡เช•เซเชฐเซ‹ เช…เชจเซ‡ เชคเซ‡เชฎเชจเชพ เชฎเชฟเชคเซเชฐเซ‹ - เชšเชฒเซ‹.

เชฎเซ‡เช•เซเชฐเซ‹ เช เชœเชฟเชจเซเชœเชพ เชชเซเชฒเซ‡เชธเชนเซ‹เชฒเซเชกเชฐเซเชธ เช›เซ‡ เชœเซ‡ เช“เชชเชฐเซ‡เชŸเชฐ เชฆเชฒเซ€เชฒเซ‹เชฎเชพเช‚ เชตเชฟเชตเชฟเชง เช‰เชชเชฏเซ‹เช—เซ€ เชฎเชพเชนเชฟเชคเซ€เชจเซ‡ เชฌเชฆเชฒเซ€ เชถเช•เซ‡ เช›เซ‡. เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡, เช†เชจเซ€ เชœเซ‡เชฎ:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} เชธเช‚เชฆเชฐเซเชญ เชตเซ‡เชฐเซ€เชเชฌเชฒเชจเซ€ เชธเชพเชฎเช—เซเชฐเซ€ เชธเซเชงเซ€ เชตเชฟเชธเซเชคเซƒเชค เชฅเชถเซ‡ execution_date เชซเซ‹เชฐเซเชฎเซ‡เชŸเชฎเชพเช‚ YYYY-MM-DD: 2020-07-14. เชถเซเชฐเซ‡เชทเซเช  เชญเชพเช— เช เช›เซ‡ เช•เซ‡ เชธเช‚เชฆเชฐเซเชญ เชšเชฒเซ‹เชจเซ‡ เชšเซ‹เช•เซเช•เชธ เช•เชพเชฐเซเชฏ เช‰เชฆเชพเชนเชฐเชฃ (เชŸเซเชฐเซ€ เชตเซเชฏเซเชฎเชพเช‚ เชเช• เชšเซ‹เชฐเชธ) เชชเชฐ เช–เซ€เชฒเซ€ เชจเชพเช–เชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡, เช…เชจเซ‡ เชœเซเชฏเชพเชฐเซ‡ เชชเซเชจเชƒเชชเซเชฐเชพเชฐเช‚เชญ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡, เชคเซเชฏเชพเชฐเซ‡ เชชเซเชฒเซ‡เชธเชนเซ‹เชฒเซเชกเชฐเซเชธ เชธเชฎเชพเชจ เชฎเซ‚เชฒเซเชฏเซ‹ เชธเซเชงเซ€ เชตเชฟเชธเซเชคเซƒเชค เชฅเชถเซ‡.

เชธเซ‹เช‚เชชเซ‡เชฒ เชฎเซ‚เชฒเซเชฏเซ‹ เชฆเชฐเซ‡เช• เช•เชพเชฐเซเชฏ เช‰เชฆเชพเชนเชฐเชฃ เชชเชฐ เชฐเซ‡เชจเซเชกเชฐ เช•เชฐเซ‡เชฒ เชฌเชŸเชจเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เชœเซ‹เชˆ เชถเช•เชพเชฏ เช›เซ‡. เช† เชฐเซ€เชคเซ‡ เชชเชคเซเชฐ เชฎเซ‹เช•เชฒเชตเชพเชจเซเช‚ เช•เชพเชฐเซเชฏ เช›เซ‡:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เช…เชจเซ‡ เชคเซ‡เชฅเซ€ เชธเช‚เชฆเซ‡เชถ เชฎเซ‹เช•เชฒเชตเชพเชจเชพ เช•เชพเชฐเซเชฏ เชชเชฐ:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชจเชตเซ€เชจเชคเชฎ เช‰เชชเชฒเชฌเซเชง เชธเช‚เชธเซเช•เชฐเชฃ เชฎเชพเชŸเซ‡ เชฌเชฟเชฒเซเชŸ-เช‡เชจ เชฎเซ‡เช•เซเชฐเซ‹เชจเซ€ เชธเช‚เชชเซ‚เชฐเซเชฃ เชธเซ‚เชšเชฟ เช…เชนเซ€เช‚ เช‰เชชเชฒเชฌเซเชง เช›เซ‡: เชฎเซ‡เช•เซเชฐเซ‹ เชธเช‚เชฆเชฐเซเชญ

เชตเชงเซเชฎเชพเช‚, เชชเซเชฒเช—เชฟเชจเซเชธเชจเซ€ เชฎเชฆเชฆเชฅเซ€, เช…เชฎเซ‡ เช…เชฎเชพเชฐเชพ เชชเซ‹เชคเชพเชจเชพ เชฎเซ‡เช•เซเชฐเซ‹เชจเซ‡ เชœเชพเชนเซ‡เชฐ เช•เชฐเซ€ เชถเช•เซ€เช เช›เซ€เช, เชชเชฐเช‚เชคเซ เชคเซ‡ เชฌเซ€เชœเซ€ เชตเชพเชฐเซเชคเชพ เช›เซ‡.

เชชเซ‚เชฐเซเชตเชตเซเชฏเชพเช–เซเชฏเชพเชฏเชฟเชค เชตเชธเซเชคเซเช“ เช‰เชชเชฐเชพเช‚เชค, เช…เชฎเซ‡ เช…เชฎเชพเชฐเชพ เชšเชฒเซ‹เชจเชพเช‚ เชฎเซ‚เชฒเซเชฏเซ‹เชจเซ‡ เชฌเชฆเชฒเซ€ เชถเช•เซ€เช เช›เซ€เช (เชฎเซ‡เช‚ เช‰เชชเชฐเชจเชพ เช•เซ‹เชกเชฎเชพเช‚ เชชเชนเซ‡เชฒเซ‡เชฅเซ€ เชœ เชคเซ‡เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซเชฏเซ‹ เช›เซ‡). เชšเชพเชฒเซ‹ เช…เช‚เชฆเชฐ เชฌเชจเชพเชตเซ€เช Admin/Variables เชฌเซ‡ เชตเชธเซเชคเซเช“:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชคเชฎเซ‡ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€ เชถเช•เซ‹ เชคเซ‡ เชฌเชงเซเช‚:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

เชฎเซ‚เชฒเซเชฏ เชเช• เชธเซเช•เซ‡เชฒเชฐ เชนเซ‹เชˆ เชถเช•เซ‡ เช›เซ‡ เช…เชฅเชตเชพ เชคเซ‡ JSON เชชเชฃ เชนเซ‹เชˆ เชถเช•เซ‡ เช›เซ‡. JSON เชจเชพ เช•เชฟเชธเซเชธเชพเชฎเชพเช‚:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

เชซเช•เซเชค เช‡เชšเซเช›เชฟเชค เช•เซ€เชจเชพ เชชเชพเชฅเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ‹: {{ var.json.bot_config.bot.token }}.

เชนเซเช‚ เชถเชพเชฌเซเชฆเชฟเช• เชฐเซ€เชคเซ‡ เชเช• เชถเชฌเซเชฆ เช•เชนเซ€เชถ เช…เชจเซ‡ เชคเซ‡เชจเชพ เชตเชฟเชถเซ‡ เชเช• เชธเซเช•เซเชฐเซ€เชจเชถเซ‰เชŸ เชฌเชคเชพเชตเซ€เชถ เชœเซ‹เชกเชพเชฃเซ‹. เช…เชนเซ€เช‚ เชฌเชงเซเช‚ เชชเซเชฐเชพเชฅเชฎเชฟเช• เช›เซ‡: เชชเซƒเชทเซเช  เชชเชฐ Admin/Connections เช…เชฎเซ‡ เช•เชจเซ‡เช•เซเชถเชจ เชฌเชจเชพเชตเซ€เช เช›เซ€เช, เชคเซเชฏเชพเช‚ เช…เชฎเชพเชฐเชพ เชฒเซ‹เช—เชฟเชจ/เชชเชพเชธเชตเชฐเซเชก เช…เชจเซ‡ เชตเชงเซ เชšเซ‹เช•เซเช•เชธ เชชเชฐเชฟเชฎเชพเชฃเซ‹ เช‰เชฎเซ‡เชฐเซ€เช เช›เซ€เช. เช†เชจเซ€ เชœเซ‡เชฎ:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชชเชพเชธเชตเชฐเซเชกเซเชธ เชเชจเช•เซเชฐเชฟเชชเซเชŸ เชฅเชˆ เชถเช•เซ‡ เช›เซ‡ (เชกเชฟเชซเซ‹เชฒเซเชŸ เช•เชฐเชคเชพเช‚ เชตเชงเซ เชธเชพเชฐเซ€ เชฐเซ€เชคเซ‡), เช…เชฅเชตเชพ เชคเชฎเซ‡ เช•เชจเซ‡เช•เซเชถเชจ เชชเซเชฐเช•เชพเชฐ เช›เซ‹เชกเซ€ เชถเช•เซ‹ เช›เซ‹ (เชœเซ‡เชฎ เชฎเซ‡เช‚ เช•เชฐเซเชฏเซเช‚ tg_main) - เชนเช•เซ€เช•เชค เช เช›เซ‡ เช•เซ‡ เชเชฐเชซเซเชฒเซ‹ เชฎเซ‹เชกเชฒเซเชธเชฎเชพเช‚ เชชเซเชฐเช•เชพเชฐเซ‹เชจเซ€ เชธเซ‚เชšเชฟ เชนเชพเชฐเซเชกเชตเชพเชฏเชฐเซเชก เช›เซ‡ เช…เชจเซ‡ เชธเซเชฐเซ‹เชค เช•เซ‹เชกเชฎเชพเช‚ เชชเซเชฐเชตเซ‡เชถเซเชฏเชพ เชตเชฟเชจเชพ เชตเชฟเชธเซเชคเซƒเชค เช•เชฐเซ€ เชถเช•เชพเชคเซ€ เชจเชฅเซ€ (เชœเซ‹ เช…เชšเชพเชจเช• เชฎเซ‡เช‚ เช•เช‚เชˆเช• Google เชจ เช•เชฐเซเชฏเซเช‚ เชนเซ‹เชฏ, เชคเซ‹ เช•เซƒเชชเชพ เช•เชฐเซ€เชจเซ‡ เชฎเชจเซ‡ เชธเซเชงเชพเชฐเซ‹), เชชเชฐเช‚เชคเซ เช•เช‚เชˆเชชเชฃ เช…เชฎเชจเซ‡ เช•เซเชฐเซ‡เชกเชฟเชŸ เชฎเซ‡เชณเชตเชตเชพเชฅเซ€ เชฐเซ‹เช•เชถเซ‡ เชจเชนเซ€เช‚ เชจเชพเชฎ

เชคเชฎเซ‡ เชธเชฎเชพเชจ เชจเชพเชฎ เชธเชพเชฅเซ‡ เช˜เชฃเชพ เชœเซ‹เชกเชพเชฃเซ‹ เชชเชฃ เชฌเชจเชพเชตเซ€ เชถเช•เซ‹ เช›เซ‹: เช† เช•เชฟเชธเซเชธเชพเชฎเชพเช‚, เชชเชฆเซเชงเชคเชฟ BaseHook.get_connection(), เชœเซ‡ เช…เชฎเชจเซ‡ เชจเชพเชฎ เชฆเซเชตเชพเชฐเชพ เชœเซ‹เชกเชพเชฃเซ‹ เชฎเซ‡เชณเชตเซ‡ เช›เซ‡, เช†เชชเชถเซ‡ เชฐเซ‡เชจเซเชกเชฎ เช•เซ‡เชŸเชฒเชพเช• เชจเชพเชฎเซ‹เชฎเชพเช‚เชฅเซ€ (เชฐเชพเช‰เชจเซเชก เชฐเซ‹เชฌเชฟเชจ เชฌเชจเชพเชตเชตเซเช‚ เชตเชงเซ เชคเชพเชฐเซเช•เชฟเช• เชนเชถเซ‡, เชชเชฐเช‚เชคเซ เชšเชพเชฒเซ‹ เชคเซ‡เชจเซ‡ เชเชฐเชซเซเชฒเซ‹ เชตเชฟเช•เชพเชธเช•เชฐเซเชคเชพเช“เชจเชพ เช…เช‚เชคเชฐเชพเชคเซเชฎเชพ เชชเชฐ เช›เซ‹เชกเซ€ เชฆเชˆเช).

เชตเซ‡เชฐเชฟเชฏเซ‡เชฌเชฒเซเชธ เช…เชจเซ‡ เช•เชจเซ‡เช•เซเชถเชจเซเชธ เชšเซ‹เช•เซเช•เชธเชชเชฃเซ‡ เชธเชฐเชธ เชธเชพเชงเชจเซ‹ เช›เซ‡, เชชเชฐเช‚เชคเซ เชธเช‚เชคเซเชฒเชจ เชจ เช—เซเชฎเชพเชตเชตเซเช‚ เชฎเชนเชคเซเชตเชชเซ‚เชฐเซเชฃ เช›เซ‡: เชคเชฎเชพเชฐเชพ เชชเซเชฐเชตเชพเชนเชจเชพ เช•เชฏเชพ เชญเชพเช—เซ‹ เชคเชฎเซ‡ เช•เซ‹เชกเชฎเชพเช‚ เชœ เชธเช‚เช—เซเชฐเชนเชฟเชค เช•เชฐเซ‹ เช›เซ‹ เช…เชจเซ‡ เชธเซเชŸเซ‹เชฐเซ‡เชœ เชฎเชพเชŸเซ‡ เชคเชฎเซ‡ เชเชฐเชซเซเชฒเซ‹เชจเซ‡ เช•เชฏเชพ เชญเชพเช—เซ‹ เช†เชชเซ‹ เช›เซ‹. เชเช• เชคเชฐเชซ, UI เชฆเซเชตเชพเชฐเชพ, เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡, เชฎเซ‡เช‡เชฒเชฟเช‚เช— เชฌเซ‹เช•เซเชธ, เชฎเซ‚เชฒเซเชฏเชจเซ‡ เชเชกเชชเชฅเซ€ เชฌเชฆเชฒเชตเซเช‚ เช…เชจเซเช•เซ‚เชณ เชนเซ‹เชˆ เชถเช•เซ‡ เช›เซ‡. เชฌเซ€เชœเซ€ เชฌเชพเชœเซ, เช† เชนเชœเซ€ เชชเชฃ เชฎเชพเช‰เชธ เช•เซเชฒเชฟเช• เชชเชฐเชจเซเช‚ เชตเชณเชคเชฐ เช›เซ‡, เชœเซ‡เชฎเชพเช‚เชฅเซ€ เช†เชชเชฃเซ‡ (เชนเซเช‚) เช›เซ‚เชŸเช•เชพเชฐเซ‹ เชฎเซ‡เชณเชตเชตเชพ เชฎเชพเช—เชคเชพ เชนเชคเชพ.

เช•เชจเซ‡เช•เซเชถเชจเซเชธ เชธเชพเชฅเซ‡ เช•เชพเชฎ เช•เชฐเชตเซเช‚ เช เช•เชพเชฐเซเชฏเซ‹เชฎเชพเช‚เชจเซเช‚ เชเช• เช›เซ‡ เชนเซเช•เซเชธ. เชธเชพเชฎเชพเชจเซเชฏ เชฐเซ€เชคเซ‡, เชเชฐเชซเซเชฒเซ‹ เชนเซเช•เซเชธ เชคเซ‡เชจเซ‡ เชคเซƒเชคเซ€เชฏ-เชชเช•เซเชท เชธเซ‡เชตเชพเช“ เช…เชจเซ‡ เชชเซเชธเซเชคเช•เชพเชฒเชฏเซ‹ เชธเชพเชฅเซ‡ เช•เชจเซ‡เช•เซเชŸ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡เชจเชพ เชฌเชฟเช‚เชฆเซเช“ เช›เซ‡. เชฆเชพ.เชค. JiraHook เช…เชฎเชพเชฐเชพ เชฎเชพเชŸเซ‡ เชœเซ€เชฐเชพ (เชคเชฎเซ‡ เช•เชพเชฐเซเชฏเซ‹เชจเซ‡ เช†เช—เชณ เช…เชจเซ‡ เชชเชพเช›เชณ เช–เชธเซ‡เชกเซ€ เชถเช•เซ‹ เช›เซ‹) เชธเชพเชฅเซ‡ เชธเช‚เชชเชฐเซเช• เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เช•เซเชฒเชพเชฏเชจเซเชŸ เช–เซ‹เชฒเชถเซ‡ เช…เชจเซ‡ เชคเซ‡เชจเซ€ เชฎเชฆเชฆเชฅเซ€ SambaHook เชคเชฎเซ‡ เชธเซเชฅเชพเชจเชฟเช• เชซเชพเช‡เชฒ เชชเชฐ เชฆเชฌเชพเชฃ เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹ smb-เชฌเชฟเช‚เชฆเซ.

เช•เชธเซเชŸเชฎ เช“เชชเชฐเซ‡เชŸเชฐเชจเซเช‚ เชชเชฆเชšเซเช›เซ‡เชฆเชจ

เช…เชจเซ‡ เช…เชฎเซ‡ เชคเซ‡ เช•เซ‡เชตเซ€ เชฐเซ€เชคเซ‡ เชฌเชจเซ‡ เช›เซ‡ เชคเซ‡ เชœเซ‹เชตเชพเชจเซ€ เชจเชœเซ€เช• เชชเชนเซ‹เช‚เชšเซ€ เช—เชฏเชพ TelegramBotSendMessage

เช•เซ‹เชก commons/operators.py เชตเชพเชธเซเชคเชตเชฟเช• เช“เชชเชฐเซ‡เชŸเชฐ เชธเชพเชฅเซ‡:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

เช…เชนเซ€เช‚, เชเชฐเชซเซเชฒเซ‹เชจเซ€ เช…เชจเซเชฏ เชฆเชฐเซ‡เช• เชตเชธเซเชคเซเชจเซ€ เชœเซ‡เชฎ, เชฌเชงเซเช‚ เช–เซ‚เชฌ เชœ เชธเชฐเชณ เช›เซ‡:

  • เชชเชพเชธเซ‡เชฅเซ€ เชตเชพเชฐเชธเชพเชฎเชพเช‚ เชฎเชณเซ‡เชฒ เช›เซ‡ BaseOperator, เชœเซ‡ เชฅเซ‹เชกเซ€เช• เชเชฐเชซเซเชฒเซ‹-เชตเชฟเชถเชฟเชทเซเชŸ เชตเชธเซเชคเซเช“เชจเซ‹ เช…เชฎเชฒ เช•เชฐเซ‡ เช›เซ‡ (เชคเชฎเชพเชฐเซ€ เชฒเซ‡เชเชฐ เชœเซเช“)
  • เช˜เซ‹เชทเชฟเชค เช•เซเชทเซ‡เชคเซเชฐเซ‹ template_fields, เชœเซ‡เชฎเชพเช‚ เชœเซ€เชจเซเชœเชพ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เชฎเซ‡เช•เซเชฐเซ‹เชจเซ€ เชถเซ‹เชง เช•เชฐเชถเซ‡.
  • เชฎเชพเชŸเซ‡ เชฏเซ‹เช—เซเชฏ เชฆเชฒเซ€เชฒเซ‹ เช—เซ‹เช เชตเซ€ __init__(), เชœเซเชฏเชพเช‚ เชœเชฐเซ‚เชฐเซ€ เชนเซ‹เชฏ เชคเซเชฏเชพเช‚ เชกเชฟเชซเซ‹เชฒเซเชŸ เชธเซ‡เชŸ เช•เชฐเซ‹.
  • เช…เชฎเซ‡ เชชเซ‚เชฐเซเชตเชœเชจเซ€ เชถเชฐเซ‚เช†เชค เชตเชฟเชถเซ‡ เชชเชฃ เชญเซ‚เชฒเซเชฏเชพ เชจเชฅเซ€.
  • เช…เชจเซเชฐเซ‚เชช เชนเซ‚เช• เช–เซ‹เชฒเซเชฏเซเช‚ TelegramBotHookเชคเซ‡เชฎเชพเช‚เชฅเซ€ เชเช• เช•เซเชฒเชพเชฏเชจเซเชŸ เช‘เชฌเซเชœเซ‡เช•เซเชŸ เชฎเซ‡เชณเชตเซเชฏเซ‹.
  • เช“เชตเชฐเชฐเชพเช‡เชก (เชชเซเชจเชƒเชตเซเชฏเชพเช–เซเชฏเชพเชฏเชฟเชค) เชชเชฆเซเชงเชคเชฟ BaseOperator.execute(), เชœเซเชฏเชพเชฐเซ‡ เช‘เชชเชฐเซ‡เชŸเชฐเชจเซ‡ เชฒเซ‰เชจเซเชš เช•เชฐเชตเชพเชจเซ‹ เชธเชฎเชฏ เช†เชตเชถเซ‡ เชคเซเชฏเชพเชฐเซ‡ เชเชฐเชซเซ‹เชต เชœเซ‡ เชŸเซเชตเซ€เชš เช•เชฐเชถเซ‡ - เชคเซ‡เชฎเชพเช‚ เช…เชฎเซ‡ เชฒเซ‹เช— เช‡เชจ เช•เชฐเชตเชพเชจเซเช‚ เชญเซ‚เชฒเซ€เชจเซ‡ เชฎเซเช–เซเชฏ เช•เซเชฐเชฟเชฏเชพเชจเซ‡ เช…เชฎเชฒเชฎเชพเช‚ เชฎเซ‚เช•เซ€เชถเซเช‚. (เช…เชฎเซ‡ เชฒเซ‰เช— เช‡เชจ เช•เชฐเซ€เช เช›เซ€เช, เชฎเชพเชฐเซเช— เชฆเซเชตเชพเชฐเชพ, เชธเซ€เชงเชพ เชœ stdout ะธ stderr - เชเชฐเชซเซเชฒเซ‹ เชฆเชฐเซ‡เช• เชตเชธเซเชคเซเชจเซ‡ เช…เชŸเช•เชพเชตเชถเซ‡, เชคเซ‡เชจเซ‡ เชธเซเช‚เชฆเชฐ เชฐเซ€เชคเซ‡ เชฒเชชเซ‡เชŸเซ€ เชฒเซ‡เชถเซ‡, เชœเซเชฏเชพเช‚ เชœเชฐเซ‚เชฐเซ€ เชนเซ‹เชฏ เชคเซเชฏเชพเช‚ เชคเซ‡เชจเซ‡ เชตเชฟเช˜เชŸเชฟเชค เช•เชฐเชถเซ‡.)

เชšเชพเชฒเซ‹ เชœเซ‹เชˆเช เช•เซ‡ เช†เชชเชฃเซ€ เชชเชพเชธเซ‡ เชถเซเช‚ เช›เซ‡ commons/hooks.py. เชซเชพเช‡เชฒเชจเซ‹ เชชเซเชฐเชฅเชฎ เชญเชพเช—, เชนเซ‚เช• เชธเชพเชฅเซ‡ เชœ:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

เชฎเชจเซ‡ เช…เชนเซ€เช‚ เชถเซเช‚ เชธเชฎเชœเชพเชตเชตเซเช‚ เชคเซ‡ เชชเชฃ เช–เชฌเชฐ เชจเชฅเซ€, เชนเซเช‚ เชซเช•เซเชค เชฎเชนเชคเซเชตเชชเซ‚เชฐเซเชฃ เชฎเซเชฆเซเชฆเชพเช“ เชจเซ‹เช‚เชงเซ€เชถ:

  • เช…เชฎเซ‡ เชตเชพเชฐเชธเชพเชฎเชพเช‚ เช›เซ€เช, เชฆเชฒเซ€เชฒเซ‹ เชตเชฟเชถเซ‡ เชตเชฟเชšเชพเชฐเซ‹ - เชฎเซ‹เชŸเชพเชญเชพเช—เชจเชพ เช•เชฟเชธเซเชธเชพเช“เชฎเชพเช‚ เชคเซ‡ เชเช• เชนเชถเซ‡: conn_id;
  • เชฎเชพเชจเช• เชชเชฆเซเชงเชคเชฟเช“เชจเซ‡ เช“เชตเชฐเชฐเชพเช‡เชกเชฟเช‚เช—: เชฎเซ‡เช‚ เชฎเชพเชฐเซ€ เชœเชพเชคเชจเซ‡ เชฎเชฐเซเชฏเชพเชฆเชฟเชค เช•เชฐเซ€ เช›เซ‡ get_conn(), เชœเซ‡เชฎเชพเช‚ เชฎเชจเซ‡ เชจเชพเชฎ เชฆเซเชตเชพเชฐเชพ เช•เชจเซ‡เช•เซเชถเชจ เชชเซ‡เชฐเชพเชฎเซ€เชŸเชฐ เชฎเชณเซ‡ เช›เซ‡ เช…เชจเซ‡ เชฎเชพเชคเซเชฐ เชตเชฟเชญเชพเช— เชฎเชณเซ‡ เช›เซ‡ extra (เช† เชเช• JSON เชซเซ€เชฒเซเชก เช›เซ‡), เชœเซ‡เชฎเชพเช‚ เชฎเซ‡เช‚ (เชฎเชพเชฐเซ€ เชชเซ‹เชคเชพเชจเซ€ เชธเซ‚เชšเชจเชพเช“ เช…เชจเซเชธเชพเชฐ!) เชŸเซ‡เชฒเชฟเช—เซเชฐเชพเชฎ เชฌเซ‹เชŸ เชŸเซ‹เช•เชจ เชฎเซ‚เช•เซเชฏเซเช‚ เช›เซ‡: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • เชนเซเช‚ เช…เชฎเชพเชฐเชพ เชเช• เช‰เชฆเชพเชนเชฐเชฃ เชฌเชจเชพเชตเซ‹ TelegramBot, เชคเซ‡เชจเซ‡ เชšเซ‹เช•เซเช•เชธ เชŸเซ‹เช•เชจ เช†เชชเซ€เชจเซ‡.

เชฌเชธ เชเชŸเชฒเซเช‚ เชœ. เชคเชฎเซ‡ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เชนเซ‚เช•เชฎเชพเช‚เชฅเซ€ เช•เซเชฒเชพเชฏเช‚เชŸ เชฎเซ‡เชณเชตเซ€ เชถเช•เซ‹ เช›เซ‹ TelegramBotHook().clent เช…เชฅเชตเชพ TelegramBotHook().get_conn().

เช…เชจเซ‡ เชซเชพเช‡เชฒเชจเซ‹ เชฌเซ€เชœเซ‹ เชญเชพเช—, เชœเซ‡เชฎเชพเช‚ เชนเซเช‚ เชŸเซ‡เชฒเชฟเช—เซเชฐเชพเชฎ REST API เชฎเชพเชŸเซ‡ เชฎเชพเช‡เช•เซเชฐเซ‹เชฐเซ‡เชชเชฐ เชฌเชจเชพเชตเซเช‚ เช›เซเช‚, เชœเซ‡เชฅเซ€ เชคเซ‡เชจเซ‡ เช–เซ‡เช‚เชšเซ€ เชจ เชถเช•เชพเชฏ. python-telegram-bot เชเช• เชชเชฆเซเชงเชคเชฟ เชฎเชพเชŸเซ‡ sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

เชธเชพเชšเซ€ เชฐเซ€เชค เช เช›เซ‡ เช•เซ‡ เชคเซ‡ เชฌเชงเซเช‚ เช‰เชฎเซ‡เชฐเชตเซเช‚: TelegramBotSendMessage, TelegramBotHook, TelegramBot - เชชเซเชฒเช—เช‡เชจเชฎเชพเช‚, เชธเชพเชฐเซเชตเชœเชจเชฟเช• เชฐเชฟเชชเซ‹เชเซ€เชŸเชฐเซ€เชฎเชพเช‚ เชฎเซ‚เช•เซ‹ เช…เชจเซ‡ เชคเซ‡เชจเซ‡ เช“เชชเชจ เชธเซ‹เชฐเซเชธเชจเซ‡ เช†เชชเซ‹.

เชœเซเชฏเชพเชฐเซ‡ เช…เชฎเซ‡ เช† เชฌเชงเซเช‚ เช…เชญเซเชฏเชพเชธ เช•เชฐเซ€ เชฐเชนเซเชฏเชพ เชนเชคเชพ, เชคเซเชฏเชพเชฐเซ‡ เช…เชฎเชพเชฐเชพ เชฐเชฟเชชเซ‹เชฐเซเชŸ เช…เชชเชกเซ‡เชŸเซเชธ เชธเชซเชณเชคเชพเชชเซ‚เชฐเซเชตเช• เชจเชฟเชทเซเชซเชณ เชฅเชตเชพเชฎเชพเช‚ เชตเซเชฏเชตเชธเซเชฅเชพเชชเชฟเชค เชฅเชฏเชพ เช…เชจเซ‡ เชฎเชจเซ‡ เชšเซ‡เชจเชฒเชฎเชพเช‚ เชเช• เชญเซ‚เชฒ เชธเช‚เชฆเซ‡เชถ เชฎเซ‹เช•เชฒเซเชฏเซ‹. เชนเซเช‚ เชคเชชเชพเชธ เช•เชฐเซ€เชถ เช•เซ‡ เชถเซเช‚ เชคเซ‡ เช–เซ‹เชŸเซเช‚ เช›เซ‡...

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚
เช…เชฎเชพเชฐเชพ เชกเซ‹เช—เชฎเชพเช‚ เช•เช‚เชˆเช• เชคเซ‚เชŸเซ€ เช—เชฏเซเช‚! เชถเซเช‚ เช†เชชเชฃเซ‡ เชเชตเซ€ เชœ เช…เชชเซ‡เช•เซเชทเชพ เชฐเชพเช–เชคเชพ เชจเชฅเซ€? เชฌเชฐเชพเชฌเชฐ!

เชถเซเช‚ เชคเชฎเซ‡ เชฐเซ‡เชกเชตเชพเชจเชพ เช›เซ‹?

เชถเซเช‚ เชคเชฎเชจเซ‡ เชฒเชพเช—เซ‡ เช›เซ‡ เช•เซ‡ เชนเซเช‚ เช•เช‚เชˆเช• เชšเซ‚เช•เซ€ เช—เชฏเซ‹ เช›เซเช‚? เชเชตเซเช‚ เชฒเชพเช—เซ‡ เช›เซ‡ เช•เซ‡ เชคเซ‡เชฃเซ‡ เชเชธเช•เซเชฏเซเชเชฒ เชธเชฐเซเชตเชฐเชฅเซ€ เชตเชฐเซเชŸเชฟเช•เชพเชฎเชพเช‚ เชกเซ‡เชŸเชพ เชŸเซเชฐเชพเชจเซเชธเชซเชฐ เช•เชฐเชตเชพเชจเซเช‚ เชตเชšเชจ เช†เชชเซเชฏเซเช‚ เชนเชคเซเช‚, เช…เชจเซ‡ เชชเช›เซ€ เชคเซ‡เชฃเซ‡ เชคเซ‡ เชฒเซ€เชงเซเช‚ เช…เชจเซ‡ เชตเชฟเชทเชฏ, เชฌเชฆเชฎเชพเชถเซ‹เชจเซ‡ เช›เซ‹เชกเซ€ เชฆเซ€เชงเซ‹!

เช† เช…เชคเซเชฏเชพเชšเชพเชฐ เช‡เชฐเชพเชฆเชพเชชเซ‚เชฐเซเชตเช• เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซเชฏเซ‹ เชนเชคเซ‹, เชฎเชพเชฐเซ‡ เชคเชฎเชพเชฐเชพ เชฎเชพเชŸเซ‡ เช•เซ‡เชŸเชฒเซ€เช• เชชเชฐเชฟเชญเชพเชทเชพ เชธเชฎเชœเชตเชพเชจเซ€ เชนเชคเซ€. เชนเชตเซ‡ เชคเชฎเซ‡ เชตเชงเซ เช†เช—เชณ เชตเชงเซ€ เชถเช•เซ‹ เช›เซ‹.

เช…เชฎเชพเชฐเซ€ เชฏเซ‹เชœเชจเชพ เช† เชนเชคเซ€:

  1. เชกเซ‡เช— เช•เชฐเซ‹
  2. เช•เชพเชฐเซเชฏเซ‹ เชฌเชจเชพเชตเซ‹
  3. เชœเซเช“ เช•เซ‡ เชฌเชงเซเช‚ เช•เซ‡เชŸเชฒเซเช‚ เชธเซเช‚เชฆเชฐ เช›เซ‡
  4. เชญเชฐเชตเชพ เชฎเชพเชŸเซ‡ เชธเชคเซเชฐ เชจเช‚เชฌเชฐเซ‹ เชธเซ‹เช‚เชชเซ‹
  5. SQL เชธเชฐเซเชตเชฐเชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชฎเซ‡เชณเชตเซ‹
  6. เชตเชฐเซเชŸเชฟเช•เชพเชฎเชพเช‚ เชกเซ‡เชŸเชพ เชฎเซ‚เช•เซ‹
  7. เช†เช‚เช•เชกเชพ เชเช•เชคเซเชฐเชฟเชค เช•เชฐเซ‹

เชคเซ‡เชฅเซ€, เช† เชฌเชงเซเช‚ เชฎเซ‡เชณเชตเชตเชพ เช…เชจเซ‡ เชšเชฒเชพเชตเชตเชพ เชฎเชพเชŸเซ‡, เชฎเซ‡เช‚ เช…เชฎเชพเชฐเชพเชฎเชพเช‚ เชเช• เชจเชพเชจเซ‹ เช‰เชฎเซ‡เชฐเซ‹ เช•เชฐเซเชฏเซ‹ docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

เชคเซเชฏเชพเช‚ เช†เชชเชฃเซ‡ เช‰เชญเชพ เช•เชฐเซ€เช เช›เซ€เช:

  • เชฏเชœเชฎเชพเชจ เชคเชฐเซ€เช•เซ‡ เชตเชฐเซเชŸเซ€เช•เชพ dwh เชธเซŒเชฅเซ€ เชฎเซ‚เชณเชญเซ‚เชค เชธเซ‡เชŸเชฟเช‚เช—เซเชธ เชธเชพเชฅเซ‡,
  • SQL เชธเชฐเซเชตเชฐเชจเชพ เชคเซเชฐเชฃ เช‰เชฆเชพเชนเชฐเชฃเซ‹,
  • เช…เชฎเซ‡ เชชเช›เซ€เชจเชพ เชกเซ‡เชŸเชพเชฌเซ‡เชธเซ‡เชธเชจเซ‡ เช•เซ‡เชŸเชฒเชพเช• เชกเซ‡เชŸเชพ เชธเชพเชฅเซ‡ เชญเชฐเซ€เช เช›เซ€เช (เช•เซ‹เชˆเชชเชฃ เช•เชฟเชธเซเชธเชพเชฎเชพเช‚ เชคเชชเชพเชธ เช•เชฐเชถเซ‹ เชจเชนเซ€เช‚ mssql_init.py!)

เช…เชฎเซ‡ เช›เซ‡เชฒเซเชฒเซ€ เชตเช–เชค เช•เชฐเชคเชพเช‚ เชธเชนเซ‡เชœ เชตเชงเซ เชœเชŸเชฟเชฒ เช†เชฆเซ‡เชถเชจเซ€ เชฎเชฆเชฆเชฅเซ€ เชคเชฎเชพเชฎ เชธเชพเชฐเชพเชจเซ‡ เชฒเซ‹เชจเซเชš เช•เชฐเซ€เช เช›เซ€เช:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

เช…เชฎเชพเชฐเชพ เชšเชฎเชคเซเช•เชพเชฐ เชฐเซ‡เชจเซเชกเชฎเชพเช‡เชเชฐเซ‡ เชถเซเช‚ เชฌเชจเชพเชตเซเชฏเซเช‚ เช›เซ‡, เชคเชฎเซ‡ เช†เช‡เชŸเชฎเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹ Data Profiling/Ad Hoc Query:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚
เชฎเซเช–เซเชฏ เชตเชธเซเชคเซ เชคเซ‡เชจเซ‡ เชตเชฟเชถเซเชฒเซ‡เชทเช•เซ‹เชจเซ‡ เชฌเชคเชพเชตเชตเชพเชจเซ€ เชจเชฅเซ€

เชตเชฟเชธเซเชคเซƒเชค เช•เชฐเซ‹ ETL เชธเชคเซเชฐเซ‹ เชนเซเช‚ เชจเชนเซ€เช‚ เช•เชฐเซเช‚, เชคเซเชฏเชพเช‚ เชฌเชงเซเช‚ เชคเซเชšเซเช› เช›เซ‡: เช…เชฎเซ‡ เชเช• เช†เชงเชพเชฐ เชฌเชจเชพเชตเซ€เช เช›เซ€เช, เชคเซ‡เชฎเชพเช‚ เชเช• เชšเชฟเชนเซเชจ เช›เซ‡, เช…เชฎเซ‡ เชธเช‚เชฆเชฐเซเชญ เชฎเซ‡เชจเซ‡เชœเชฐ เชธเชพเชฅเซ‡ เชฌเชงเซเช‚ เชฒเชชเซ‡เชŸเซ€เช เช›เซ€เช, เช…เชจเซ‡ เชนเชตเซ‡ เช…เชฎเซ‡ เช† เช•เชฐเซ€เช เช›เซ€เช:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

เชธเชฎเชฏ เช†เชตเซ€ เช—เชฏเซ‹ เช›เซ‡ เช…เชฎเชพเชฐเซ‹ เชกเซ‡เชŸเชพ เชเช•เชคเซเชฐเชฟเชค เช•เชฐเซ‹ เช…เชฎเชพเชฐเชพ เชฆเซ‹เชขเชธเซ‹ เชŸเซ‡เชฌเชฒเชฎเชพเช‚เชฅเซ€. เชšเชพเชฒเซ‹ เช† เช–เซ‚เชฌ เชœ เช…เชญเซ‚เชคเชชเซ‚เชฐเซเชต เชฐเซ‡เช–เชพเช“เชจเซ€ เชฎเชฆเชฆเชฅเซ€ เช•เชฐเซ€เช:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. เชนเซ‚เช•เชจเซ€ เชฎเชฆเชฆเชฅเซ€ เช†เชชเชฃเซ‡ เชเชฐเชซเซเชฒเซ‹เชฎเชพเช‚เชฅเซ€ เชฎเซ‡เชณเชตเซ€เช เช›เซ€เช pymssql-เชœเซ‹เชกเชพเชตเชพ
  2. เชšเชพเชฒเซ‹ เชตเชฟเชจเช‚เชคเซ€เชฎเชพเช‚ เชคเชพเชฐเซ€เช–เชจเชพ เชธเซเชตเชฐเซ‚เชชเชฎเชพเช‚ เชชเซเชฐเชคเชฟเชฌเช‚เชงเชจเซ‡ เชฌเชฆเชฒเซ€เช - เชคเซ‡ เชŸเซ‡เชฎเซเชชเชฒเซ‡เชŸ เชเชจเซเชœเชฟเชจ เชฆเซเชตเชพเชฐเชพ เช•เชพเชฐเซเชฏเชฎเชพเช‚ เชจเชพเช–เชตเชพเชฎเชพเช‚ เช†เชตเชถเซ‡.
  3. เช…เชฎเชพเชฐเซ€ เชตเชฟเชจเช‚เชคเซ€เชจเซ‡ เช–เซ‹เชฐเชพเช• เช†เชชเชตเซ‹ pandasเช…เชฎเชจเซ‡ เช•เซ‹เชฃ เชฎเชณเชถเซ‡ DataFrame - เชคเซ‡ เชญเชตเชฟเชทเซเชฏเชฎเชพเช‚ เช†เชชเชฃเชพ เชฎเชพเชŸเซ‡ เช‰เชชเชฏเซ‹เช—เซ€ เชฅเชถเซ‡.

เชนเซเช‚ เช…เชตเซ‡เชœเซ€เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซเช‚ เช›เซเช‚ {dt} เชตเชฟเชจเช‚เชคเซ€ เชชเชฐเชฟเชฎเชพเชฃเชจเซ‡ เชฌเชฆเชฒเซ‡ %s เชเชŸเชฒเชพ เชฎเชพเชŸเซ‡ เชจเชนเซ€เช‚ เช•เซ‡ เชนเซเช‚ เชฆเซเชทเซเชŸ เชชเชฟเชจเซ‹เชšเชฟเช“ เช›เซเช‚, เชชเชฃ เชเชŸเชฒเชพ เชฎเชพเชŸเซ‡ pandas เชธเช‚เชญเชพเชณเซ€ เชถเช•เชคเชพ เชจเชฅเซ€ pymssql เช…เชจเซ‡ เช›เซ‡เชฒเซเชฒเชพ เชเช• เชธเชฐเช•เซ€ เชœเชพเชฏ เช›เซ‡ params: Listเชœเซ‹เช•เซ‡ เชคเซ‡ เช–เชฐเซ‡เช–เชฐ เช‡เชšเซเช›เซ‡ เช›เซ‡ tuple.
เช เชชเชฃ เชจเซ‹เช‚เชง เช•เชฐเซ‹ เช•เซ‡ เชตเชฟเช•เชพเชธเช•เชฐเซเชคเชพ pymssql เชนเชตเซ‡ เชคเซ‡เชจเซ‡ เชŸเซ‡เช•เซ‹ เชจ เช†เชชเชตเชพเชจเซเช‚ เชจเช•เซเช•เซ€ เช•เชฐเซเชฏเซเช‚, เช…เชจเซ‡ เชคเซ‡ เชฌเชนเชพเชฐ เชœเชตเชพเชจเซ‹ เชธเชฎเชฏ เช›เซ‡ pyodbc.

เชšเชพเชฒเซ‹ เชœเซ‹เชˆเช เช•เซ‡ เชเชฐเชซเซเชฒเซ‹เช เช…เชฎเชพเชฐเชพ เช•เชพเชฐเซเชฏเซ‹เชจเซ€ เชฆเชฒเซ€เชฒเซ‹ เชถเซเช‚ เชญเชฐเซ‡เชฒเซ€ เช›เซ‡:

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹: ETL เชจเซ‡ เชธเชฐเชณ เชฌเชจเชพเชตเชตเซเช‚

เชœเซ‹ เชคเซเชฏเชพเช‚ เช•เซ‹เชˆ เชกเซ‡เชŸเชพ เชจเชฅเซ€, เชคเซ‹ เชชเช›เซ€ เชšเชพเชฒเซ เชฐเชพเช–เชตเชพเชจเซ‹ เช•เซ‹เชˆ เช…เชฐเซเชฅ เชจเชฅเซ€. เชชเชฐเช‚เชคเซ เชญเชฐเชฃเชจเซ‡ เชธเชซเชณ เช—เชฃเชตเซเช‚ เชชเชฃ เชตเชฟเชšเชฟเชคเซเชฐ เช›เซ‡. เชชเชฐเช‚เชคเซ เช† เช•เซ‹เชˆ เชญเซ‚เชฒ เชจเชฅเซ€. เช-เช†เชน-เช†เชน, เชถเซเช‚ เช•เชฐเชตเซเช‚ ?! เช…เชจเซ‡ เช…เชนเซ€เช‚ เชถเซเช‚ เช›เซ‡:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException เชเชฐเชซเซเชฒเซ‹เชจเซ‡ เช•เชนเซ‡ เช›เซ‡ เช•เซ‡ เชคเซเชฏเชพเช‚ เช•เซ‹เชˆ เชญเซ‚เชฒเซ‹ เชจเชฅเซ€, เชชเชฐเช‚เชคเซ เช…เชฎเซ‡ เช•เชพเชฐเซเชฏ เช›เซ‹เชกเซ€ เชฆเชˆเช เช›เซ€เช. เช‡เชจเซเชŸเชฐเชซเซ‡เชธเชฎเชพเช‚ เชฒเซ€เชฒเซ‹ เช…เชฅเชตเชพ เชฒเชพเชฒ เชšเซ‹เชฐเชธ เชจเชนเซ€เช‚, เชชเชฐเช‚เชคเซ เช—เซเชฒเชพเชฌเซ€ เชนเชถเซ‡.

เชšเชพเชฒเซ‹ เช†เชชเชฃเซ‹ เชกเซ‡เชŸเชพ เชŸเซ‰เชธ เช•เชฐเซ€เช เชฌเชนเซเชตเชฟเชง เช•เซ‰เชฒเชฎ:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

เชจเชพเชฎ:

  • เชกเซ‡เชŸเชพเชฌเซ‡เช เชœเซ‡เชฎเชพเช‚เชฅเซ€ เช…เชฎเซ‡ เช“เชฐเซเชกเชฐ เชฒเซ€เชงเซ‹ เชนเชคเซ‹,
  • เช…เชฎเชพเชฐเชพ เชชเซ‚เชฐ เชธเชคเซเชฐเชจเซเช‚ ID (เชคเซ‡ เช…เชฒเช— เชนเชถเซ‡ เชฆเชฐเซ‡เช• เช•เชพเชฐเซเชฏ เชฎเชพเชŸเซ‡),
  • เชธเซเชคเซเชฐเซ‹เชค เช…เชจเซ‡ เช“เชฐเซเชกเชฐ ID เชฎเชพเช‚เชฅเซ€ เชนเซ‡เชถ - เชœเซ‡เชฅเซ€ เช•เชฐเซ€เชจเซ‡ เช…เช‚เชคเชฟเชฎ เชกเซ‡เชŸเชพเชฌเซ‡เชเชฎเชพเช‚ (เชœเซเชฏเชพเช‚ เชฌเชงเซเช‚ เชเช• เชŸเซ‡เชฌเชฒเชฎเชพเช‚ เชฐเซ‡เชกเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡) เช…เชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ เชเช• เช…เชจเชจเซเชฏ เช“เชฐเซเชกเชฐ ID เช›เซ‡.

เช…เช‚เชคเชฟเชฎ เชชเช—เชฒเซเช‚ เชฌเชพเช•เซ€ เช›เซ‡: เชตเชฐเซเชŸเชฟเช•เชพเชฎเชพเช‚ เชฌเชงเซเช‚ เชฐเซ‡เชกเชตเซเช‚. เช…เชจเซ‡, เชตเชฟเชšเชฟเชคเซเชฐ เชฐเซ€เชคเซ‡, เช† เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡เชจเซ€ เชธเซŒเชฅเซ€ เช…เชฆเชญเซ‚เชค เช…เชจเซ‡ เช•เชพเชฐเซเชฏเช•เซเชทเชฎ เชฐเซ€เชคเซ‹เชฎเชพเช‚เชจเซ€ เชเช• CSV เชฆเซเชตเชพเชฐเชพ เช›เซ‡!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. เช…เชฎเซ‡ เชเช• เช–เชพเชธ เชฐเซ€เชธเซ€เชตเชฐ เชฌเชจเชพเชตเซ€ เชฐเชนเซเชฏเชพ เช›เซ€เช StringIO.
  2. pandas เช•เซƒเชชเชพ เช•เชฐเซ€เชจเซ‡ เช…เชฎเชพเชฐเชพ เชฎเซ‚เช•เชตเชพเชฎเชพเช‚ เช†เชตเชถเซ‡ DataFrame เชซเซ‹เชฐเซเชฎเชฎเชพเช‚ CSV-เชฒเชพเช‡เชจเซเชธ.
  3. เชšเชพเชฒเซ‹ เชนเซ‚เช• เชตเชกเซ‡ เช…เชฎเชพเชฐเชพ เชฎเชจเชชเชธเช‚เชฆ เชตเชฐเซเชŸเชฟเช•เชพเชจเซ‡ เช•เชจเซ‡เช•เซเชถเชจ เช–เซ‹เชฒเซ€เช.
  4. เช…เชจเซ‡ เชนเชตเซ‡ เชฎเชฆเชฆ เชธเชพเชฅเซ‡ copy() เช…เชฎเชพเชฐเซ‹ เชกเซ‡เชŸเชพ เชธเซ€เชงเซ‹ เชตเชฐเซเชŸเชฟเช•เชพเชฎเชพเช‚ เชฎเซ‹เช•เชฒเซ‹!

เช…เชฎเซ‡ เชกเซเชฐเชพเชˆเชตเชฐ เชชเชพเชธเซ‡เชฅเซ€ เชฒเชˆเชถเซเช‚ เช•เซ‡ เช•เซ‡เชŸเชฒเซ€ เชฒเชพเชˆเชจเซ‹ เชญเชฐเชพเชˆ เช›เซ‡, เช…เชจเซ‡ เชธเซ‡เชถเชจ เชฎเซ‡เชจเซ‡เชœเชฐเชจเซ‡ เช•เชนเซ€เชถเซเช‚ เช•เซ‡ เชฌเชงเซเช‚ เชฌเชฐเชพเชฌเชฐ เช›เซ‡:

session.loaded_rows = cursor.rowcount
session.successful = True

เชฌเชธ เชเชŸเชฒเซเช‚ เชœ.

เชตเซ‡เชšเชพเชฃ เชชเชฐ, เช…เชฎเซ‡ เชฒเช•เซเชทเซเชฏ เชชเซเชฒเซ‡เชŸ เชœเชพเชคเซ‡ เชฌเชจเชพเชตเซ€เช เช›เซ€เช. เช…เชนเซ€เช‚ เชฎเซ‡เช‚ เชฎเชพเชฐเซ€ เชœเชพเชคเชจเซ‡ เชเช• เชจเชพเชจเซ€ เชฎเชถเซ€เชจเชจเซ€ เชฎเช‚เชœเซ‚เชฐเซ€ เช†เชชเซ€:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

เชนเซเช‚ เชตเชพเชชเชฐเซเช‚ เช›เซเช‚ VerticaOperator() เชนเซเช‚ เชกเซ‡เชŸเชพเชฌเซ‡เช เชธเซเช•เซ€เชฎเชพ เช…เชจเซ‡ เชŸเซ‡เชฌเชฒ เชฌเชจเชพเชตเซเช‚ เช›เซเช‚ (เชœเซ‹ เชคเซ‡ เชชเชนเซ‡เชฒเชพเชฅเซ€ เช…เชธเซเชคเชฟเชคเซเชตเชฎเชพเช‚ เชจเชฅเซ€, เช…เชฒเชฌเชคเซเชค). เชฎเซเช–เซเชฏ เชตเชธเซเชคเซ เชจเชฟเชฐเซเชญเชฐเชคเชพเชจเซ‡ เชฏเซ‹เช—เซเชฏ เชฐเซ€เชคเซ‡ เช—เซ‹เช เชตเชตเชพเชจเซเช‚ เช›เซ‡:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

เชเช•เชคเซเชฐ เช•เชฐเชตเซเช‚

- เชธเชพเชฐเซเช‚, - เชจเชพเชจเชพ เช‰เช‚เชฆเชฐเซ‡ เช•เชนเซเชฏเซเช‚, - เชคเซ‡ เชนเชตเซ‡ เชจเชฅเซ€
เชถเซเช‚ เชคเชฎเชจเซ‡ เช–เชพเชคเชฐเซ€ เช›เซ‡ เช•เซ‡ เชนเซเช‚ เชœเช‚เช—เชฒเชจเซ‹ เชธเซŒเชฅเซ€ เชญเชฏเช‚เช•เชฐ เชชเซเชฐเชพเชฃเซ€ เช›เซเช‚?

เชœเซเชฒเชฟเชฏเชพ เชกเซ‹เชจเชพเชฒเซเชกเชธเชจ, เชง เช—เซเชฐเซเชซเชพเชฒเซ‹

เชฎเชจเซ‡ เชฒเชพเช—เซ‡ เช›เซ‡ เช•เซ‡ เชœเซ‹ เชฎเชพเชฐเชพ เชธเชพเชฅเซ€เชฆเชพเชฐเซ‹ เช…เชจเซ‡ เชฎเชพเชฐเซ€ เชตเชšเซเชšเซ‡ เชธเซเชชเชฐเซเชงเชพ เชนเซ‹เชฏ: เช•เซ‹เชฃ เชเชกเชชเชฅเซ€ เชถเชฐเซ‚เช†เชคเชฅเซ€ ETL เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เชฌเชจเชพเชตเชถเซ‡ เช…เชจเซ‡ เชฒเซ‹เชจเซเชš เช•เชฐเชถเซ‡: เชคเซ‡เช“ เชคเซ‡เชฎเชจเชพ SSIS เช…เชจเซ‡ เชฎเชพเช‰เชธ เชธเชพเชฅเซ‡ เช…เชจเซ‡ เชนเซเช‚ เชเชฐเชซเซเชฒเซ‹ เชธเชพเชฅเซ‡... เช…เชจเซ‡ เชชเช›เซ€ เช…เชฎเซ‡ เชœเชพเชณเชตเชฃเซ€เชจเซ€ เชธเชฐเชณเชคเชพเชจเซ€ เชชเชฃ เชคเซเชฒเชจเชพ เช•เชฐเซ€เชถเซเช‚ ... เชตเชพเชน, เชฎเชจเซ‡ เชฒเชพเช—เซ‡ เช›เซ‡ เช•เซ‡ เชคเชฎเซ‡ เชธเช‚เชฎเชค เชฅเชถเซ‹ เช•เซ‡ เชนเซเช‚ เชคเซ‡เชฎเชจเซ‡ เชคเชฎเชพเชฎ เชฎเซ‹เชฐเชšเซ‡ เชนเชฐเชพเชตเซ€เชถ!

เชœเซ‹ เชฅเซ‹เชกเซ€ เชตเชงเซ เช—เช‚เชญเซ€เชฐเชคเชพเชฅเซ€, เชคเซ‹ เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹ - เชชเซเชฐเซ‹เช—เซเชฐเชพเชฎ เช•เซ‹เชกเชจเชพ เชฐเซ‚เชชเชฎเชพเช‚ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเช“เชจเซเช‚ เชตเชฐเซเชฃเชจ เช•เชฐเซ€เชจเซ‡ - เชฎเชพเชฐเซเช‚ เช•เชพเชฎ เช•เชฐเซเชฏเซเช‚ เช˜เชฃเซเช‚ เชตเชงเซ เช†เชฐเชพเชฎเชฆเชพเชฏเช• เช…เชจเซ‡ เช†เชจเช‚เชฆเชชเซเชฐเชฆ.

เชคเซ‡เชจเซ€ เช…เชฎเชฐเซเชฏเชพเชฆเชฟเชค เชเช•เซเชธเซเชŸเซ‡เชจเซเชธเชฟเชฌเชฟเชฒเชฟเชŸเซ€, เชชเซเชฒเช—-เช‡เชจเซเชธเชจเซ€ เชฆเซเชฐเชทเซเชŸเชฟเช เช…เชจเซ‡ เชฎเชพเชชเชจเซ€เชฏเชคเชพเชจเชพ เชตเชฒเชฃ เชฌเช‚เชจเซ‡เชฎเชพเช‚, เชคเชฎเชจเซ‡ เชฒเช—เชญเช— เช•เซ‹เชˆเชชเชฃ เช•เซเชทเซ‡เชคเซเชฐเชฎเชพเช‚ เชเชฐเชซเซเชฒเซ‹เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเชตเชพเชจเซ€ เชคเช• เช†เชชเซ‡ เช›เซ‡: เชกเซ‡เชŸเชพ เชเช•เชคเซเชฐเชฟเชค เช•เชฐเชตเชพ, เชคเซˆเชฏเชพเชฐ เช•เชฐเชตเชพ เช…เชจเซ‡ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เช•เชฐเชตเชพเชจเชพ เชธเช‚เชชเซ‚เชฐเซเชฃ เชšเช•เซเชฐเชฎเชพเช‚ เชชเชฃ, เชฐเซ‹เช•เซ‡เชŸ เชฒเซ‹เชจเซเชš เช•เชฐเชตเชพเชฎเชพเช‚ เชชเชฃ (เชฎเช‚เช—เชณ เชชเชฐ, เช•เซ‹เชฐเซเชธ).

เชญเชพเช— เช…เช‚เชคเชฟเชฎ, เชธเช‚เชฆเชฐเซเชญ เช…เชจเซ‡ เชฎเชพเชนเชฟเชคเซ€

เชฆเชพเช‚เชคเซ€ เช…เชฎเซ‡ เชคเชฎเชพเชฐเชพ เชฎเชพเชŸเซ‡ เชเช•เชคเซเชฐเชฟเชค เช•เชฐเซ€ เช›เซ‡

  • start_date. เชนเชพ, เช† เชชเชนเซ‡เชฒเซ‡เชฅเซ€ เชœ เชธเซเชฅเชพเชจเชฟเช• เชฎเซ‡เชฎ เช›เซ‡. เชตเชพเชฏเชพ เชกเซ‰เช—เชจเซ€ เชฎเซเช–เซเชฏ เชฆเชฒเซ€เชฒ start_date เชฌเชงเชพ เชชเชพเชธ. เชธเช‚เช•เซเชทเชฟเชชเซเชคเชฎเชพเช‚, เชœเซ‹ เชคเชฎเซ‡ เชธเซเชชเชทเซเชŸ เช•เชฐเซ‹ เช›เซ‹ start_date เชตเชฐเซเชคเชฎเชพเชจ เชคเชพเชฐเซ€เช–, เช…เชจเซ‡ schedule_interval - เชเช• เชฆเชฟเชตเชธ, เชชเช›เซ€ เชกเซ€เชเชœเซ€ เช•เชพเชฒเซ‡ เชถเชฐเซ‚ เชฅเชถเซ‡, เชตเชนเซ‡เชฒเซเช‚ เชจเชนเซ€เช‚.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    เช…เชจเซ‡ เช•เซ‹เชˆ เชตเชงเซ เชธเชฎเชธเซเชฏเชพ เชจเชฅเซ€.

    เชคเซ‡เชจเซ€ เชธเชพเชฅเซ‡ เชธเช‚เช•เชณเชพเชฏเซ‡เชฒ เชฌเซ€เชœเซ€ เชฐเชจเชŸเชพเช‡เชฎ เชญเซ‚เชฒ เช›เซ‡: Task is missing the start_date parameter, เชœเซ‡ เชฎเซ‹เชŸเซ‡ เชญเชพเช—เซ‡ เชธเซ‚เชšเชตเซ‡ เช›เซ‡ เช•เซ‡ เชคเชฎเซ‡ เชกเซ‡เช— เช“เชชเชฐเซ‡เชŸเชฐ เชธเชพเชฅเซ‡ เชœเซ‹เชกเชตเชพเชจเซเช‚ เชญเซ‚เชฒเซ€ เช—เชฏเชพ เช›เซ‹.

  • เชฌเชงเชพ เชเช• เชฎเชถเซ€เชจ เชชเชฐ. เชนเชพ, เช…เชจเซ‡ เชชเชพเชฏเชพ (เชเชฐเชซเซเชฒเซ‹ เชชเซ‹เชคเซ‡ เช…เชจเซ‡ เช…เชฎเชพเชฐเซเช‚ เช•เซ‹เชŸเชฟเช‚เช—), เช…เชจเซ‡ เชตเซ‡เชฌ เชธเชฐเซเชตเชฐ, เช…เชจเซ‡ เชถเซ‡เชกเซเชฏเซ‚เชฒเชฐ เช…เชจเซ‡ เช•เชพเชฎเชฆเชพเชฐเซ‹. เช…เชจเซ‡ เชคเซ‡ เช•เชพเชฎ เชชเชฃ เช•เชฐเซเชฏเซเช‚. เชชเชฐเช‚เชคเซ เชธเชฎเชฏ เชœเชคเชพเช‚, เชธเซ‡เชตเชพเช“ เชฎเชพเชŸเซ‡เชจเชพ เช•เชพเชฐเซเชฏเซ‹เชจเซ€ เชธเช‚เช–เซเชฏเชพเชฎเชพเช‚ เชตเชงเชพเชฐเซ‹ เชฅเชคเซ‹ เช—เชฏเซ‹, เช…เชจเซ‡ เชœเซเชฏเชพเชฐเซ‡ PostgreSQL เช 20 ms เชจเซ‡ เชฌเชฆเชฒเซ‡ 5 s เชฎเชพเช‚ เช…เชจเซเช•เซเชฐเชฎเชฃเชฟเช•เชพเชจเซ‹ เชชเซเชฐเชคเชฟเชธเชพเชฆ เช†เชชเชตเชพเชจเซเช‚ เชถเชฐเซ‚ เช•เชฐเซเชฏเซเช‚, เชคเซเชฏเชพเชฐเซ‡ เช…เชฎเซ‡ เชคเซ‡เชจเซ‡ เชฒเชˆ เชฒเซ€เชงเซเช‚ เช…เชจเซ‡ เชฒเชˆ เช—เชฏเชพ.
  • LocalExecutor. เชนเชพ, เช…เชฎเซ‡ เชนเชœเซ€ เชชเชฃ เชคเซ‡เชจเชพ เชชเชฐ เชฌเซ‡เช เชพ เช›เซ€เช, เช…เชจเซ‡ เช…เชฎเซ‡ เชชเชนเซ‡เชฒเซ‡เชฅเซ€ เชœ เชชเชพเชคเชพเชณเชจเซ€ เชงเชพเชฐ เชชเชฐ เช†เชตเซ€ เช—เชฏเชพ เช›เซ€เช. LocalExecutor เช…เชคเซเชฏเชพเชฐ เชธเซเชงเซ€ เช…เชฎเชพเชฐเชพ เชฎเชพเชŸเซ‡ เชชเซ‚เชฐเชคเซเช‚ เชนเชคเซเช‚, เชชเชฐเช‚เชคเซ เชนเชตเซ‡ เช“เช›เชพเชฎเชพเช‚ เช“เช›เชพ เชเช• เช•เชพเชฐเซเชฏเช•เชฐ เชธเชพเชฅเซ‡ เชตเชฟเชธเซเชคเชฐเชฃ เช•เชฐเชตเชพเชจเซ‹ เชธเชฎเชฏ เช†เชตเซ€ เช—เชฏเซ‹ เช›เซ‡ เช…เชจเซ‡ CeleryExecutor เชชเชฐ เชœเชตเชพ เชฎเชพเชŸเซ‡ เช…เชฎเชพเชฐเซ‡ เชธเช–เชค เชฎเชนเซ‡เชจเชค เช•เชฐเชตเซ€ เชชเชกเชถเซ‡. เช…เชจเซ‡ เช เชนเช•เซ€เช•เชคเชจเซ‡ เชงเซเชฏเชพเชจเชฎเชพเช‚ เชฐเชพเช–เซ€เชจเซ‡ เช•เซ‡ เชคเชฎเซ‡ เชคเซ‡เชจเซ€ เชธเชพเชฅเซ‡ เชเช• เชฎเชถเซ€เชจ เชชเชฐ เช•เชพเชฎ เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹, เชธเชฐเซเชตเชฐ เชชเชฐ เชชเชฃ เชธเซ‡เชฒเชฐเซ€เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเชตเชพเชฅเซ€ เชคเชฎเชจเซ‡ เช•เช‚เชˆเชชเชฃ เชฐเซ‹เช•เชคเซเช‚ เชจเชฅเซ€, เชœเซ‡ "เช…เชฒเชฌเชคเซเชค, เชชเซเชฐเชฎเชพเชฃเชฟเช•เชชเชฃเซ‡ เช•เซเชฏเชพเชฐเซ‡เชฏ เช‰เชคเซเชชเชพเชฆเชจเชฎเชพเช‚ เชœเชถเซ‡ เชจเชนเซ€เช‚!"
  • เชฌเชฟเชจ-เช‰เชชเชฏเซ‹เช— เชฌเชฟเชฒเซเชŸ-เช‡เชจ เชธเชพเชงเชจเซ‹:
    • เช•เชจเซ‡เช•เซเชถเชจเซเชธ เชธเซ‡เชตเชพ เช“เชณเช–เชชเชคเซเชฐเซ‹ เชธเช‚เช—เซเชฐเชนเชตเชพ เชฎเชพเชŸเซ‡,
    • SLA เชšเซ‚เช•เซ€ เชœเชพเชฏ เช›เซ‡ เชธเชฎเชฏเชธเชฐ เช•เชพเชฎ เชจ เช•เชฐเชคเชพ เช•เชพเชฐเซเชฏเซ‹เชจเซ‹ เชœเชตเชพเชฌ เช†เชชเชตเชพ เชฎเชพเชŸเซ‡,
    • เชเช•เซเชธเช•เซ‹เชฎ เชฎเซ‡เชŸเชพเชกเซ‡เชŸเชพ เชตเชฟเชจเชฟเชฎเชฏ เชฎเชพเชŸเซ‡ (เชฎเซ‡เช‚ เช•เชนเซเชฏเซเช‚ เชฎเซ‡เชŸเชพเชกเซ‡เชŸเชพ!) เชกเซ‡เช— เช•เชพเชฐเซเชฏเซ‹ เชตเชšเซเชšเซ‡.
  • เชฎเซ‡เชฒ เชฆเซเชฐเซเชชเชฏเซ‹เช—. เชธเชพเชฐเซเช‚, เชนเซเช‚ เชถเซเช‚ เช•เชนเซ€ เชถเช•เซเช‚? เช˜เชŸเซ€ เช—เชฏเซ‡เชฒเชพ เช•เชพเชฐเซเชฏเซ‹เชจเชพ เชคเชฎเชพเชฎ เชชเซเชจเชฐเชพเชตเชฐเซเชคเชจ เชฎเชพเชŸเซ‡ เชšเซ‡เชคเชตเชฃเซ€เช“ เช—เซ‹เช เชตเชตเชพเชฎเชพเช‚ เช†เชตเซ€ เชนเชคเซ€. เชนเชตเซ‡ เชฎเชพเชฐเซเช‚ เช•เชพเชฐเซเชฏ Gmail เชฎเชพเช‚ เชเชฐเชซเซเชฒเซ‹ เชคเชฐเชซเชฅเซ€ >90k เช‡เชฎเซ‡เช‡เชฒเซเชธ เช›เซ‡, เช…เชจเซ‡ เชตเซ‡เชฌ เชฎเซ‡เช‡เชฒ เชฎเชเชฒ เชเช• เชธเชฎเชฏเซ‡ 100 เชฅเซ€ เชตเชงเซ เช‰เชชเชพเชกเชตเชพเชจเซ‹ เช…เชจเซ‡ เช•เชพเชขเซ€ เชจเชพเช–เชตเชพเชจเซ‹ เช‡เชจเช•เชพเชฐ เช•เชฐเซ‡ เช›เซ‡.

เชตเชงเซ เชฎเซเชถเซเช•เซ‡เชฒเซ€เช“: เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹ เชชเชฟเชŸเชซเซ‡เชฒเซเชธ

เชตเชงเซ เช“เชŸเซ‹เชฎเซ‡เชถเชจ เชธเชพเชงเชจเซ‹

เช…เชฎเซ‡ เช…เชฎเชพเชฐเชพ เชนเชพเชฅเชฅเซ€ เชจเชนเซ€เช‚ เชชเชฃ เช…เชฎเชพเชฐเชพ เชฎเชพเชฅเชพเชฅเซ€ เชตเชงเซ เช•เชพเชฎ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡, เชเชฐเชซเซเชฒเซ‹เช เช…เชฎเชพเชฐเชพ เชฎเชพเชŸเซ‡ เช† เชคเซˆเชฏเชพเชฐ เช•เชฐเซเชฏเซเช‚ เช›เซ‡:

  • REST API - เชคเซ‡เชจเซ€ เชชเชพเชธเซ‡ เชนเชœเซ€ เชชเชฃ เชชเซเชฐเชพเชฏเซ‹เช—เชฟเช• เชธเซเชฅเชฟเชคเชฟ เช›เซ‡, เชœเซ‡ เชคเซ‡เชจเซ‡ เช•เชพเชฎ เช•เชฐเชคเชพ เช…เชŸเช•เชพเชตเชคเซ€ เชจเชฅเซ€. เชคเซ‡เชจเซ€ เชธเชพเชฅเซ‡, เชคเชฎเซ‡ เชฎเชพเชคเซเชฐ เชกเซ…เช—เซเชธ เช…เชจเซ‡ เช•เชพเชฐเซเชฏเซ‹ เชตเชฟเชถเซ‡ เชœ เชฎเชพเชนเชฟเชคเซ€ เชฎเซ‡เชณเชตเซ€ เชถเช•เชคเชพ เชจเชฅเซ€, เชชเชฐเช‚เชคเซ เชกเซ‡เช—เชจเซ‡ เชฌเช‚เชง/เชถเชฐเซ‚ เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹, DAG เชฐเชจ เช…เชฅเชตเชพ เชชเซ‚เชฒ เชฌเชจเชพเชตเซ€ เชถเช•เซ‹ เช›เซ‹.
  • CLI - เช†เชฆเซ‡เชถ เชตเชพเช•เซเชฏ เชฆเซเชตเชพเชฐเชพ เช˜เชฃเชพ เชธเชพเชงเชจเซ‹ เช‰เชชเชฒเชฌเซเชง เช›เซ‡ เชœเซ‡ เชซเช•เซเชค WebUI เชฆเซเชตเชพเชฐเชพ เช‰เชชเชฏเซ‹เช—เชฎเชพเช‚ เชฒเซ‡เชตเชพ เชฎเชพเชŸเซ‡ เช…เชธเซเชตเชฟเชงเชพเชœเชจเช• เชจเชฅเซ€, เชชเชฐเช‚เชคเซ เชธเชพเชฎเชพเชจเซเชฏ เชฐเซ€เชคเซ‡ เช—เซ‡เชฐเชนเชพเชœเชฐ เช›เซ‡. เชฆเชพเช–เซเชฒเชพ เชคเชฐเซ€เช•เซ‡:
    • backfill เช•เชพเชฐเซเชฏ เช‰เชฆเชพเชนเชฐเชฃเซ‹ เชชเซเชจเชƒเชชเซเชฐเชพเชฐเช‚เชญ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เชœเชฐเซ‚เชฐเซ€ เช›เซ‡.
      เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡, เชตเชฟเชถเซเชฒเซ‡เชทเช•เซ‹ เช†เชตเซเชฏเชพ เช…เชจเซ‡ เช•เชนเซเชฏเซเช‚: โ€œเช…เชจเซ‡ เชคเชฎเซ‡, เชธเชพเชฅเซ€, 1 เชฅเซ€ 13 เชœเชพเชจเซเชฏเซเช†เชฐเซ€ เชธเซเชงเซ€เชจเชพ เชกเซ‡เชŸเชพเชฎเชพเช‚ เชฌเช•เชตเชพเชธ เช›เซ‡! เชคเซ‡เชจเซ‡ เช เซ€เช• เช•เชฐเซ‹, เชคเซ‡เชจเซ‡ เช เซ€เช• เช•เชฐเซ‹, เชคเซ‡เชจเซ‡ เช เซ€เช• เช•เชฐเซ‹, เชคเซ‡เชจเซ‡ เช เซ€เช• เช•เชฐเซ‹!" เช…เชจเซ‡ เชคเชฎเซ‡ เช†เชตเชพ เชนเซ‹เชฌ เช›เซ‹:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • เช†เชงเชพเชฐ เชธเซ‡เชตเชพ: initdb, resetdb, upgradedb, checkdb.
    • run, เชœเซ‡ เชคเชฎเชจเซ‡ เชเช• เช‰เชฆเชพเชนเชฐเชฃ เช•เชพเชฐเซเชฏ เชšเชฒเชพเชตเชตเชพ เชฎเชพเชŸเซ‡ เชชเชฐเชตเชพเชจเช—เซ€ เช†เชชเซ‡ เช›เซ‡, เช…เชจเซ‡ เชคเชฎเชพเชฎ เชจเชฟเชฐเซเชญเชฐเชคเชพเช“ เชชเชฐ เชชเชฃ เชธเซเช•เซ‹เชฐ เช•เชฐเซ‡ เช›เซ‡. เชตเชงเซเชฎเชพเช‚, เชคเชฎเซ‡ เชคเซ‡เชจเซ‡ เชฎเชพเชฐเชซเชคเซ‡ เชšเชฒเชพเชตเซ€ เชถเช•เซ‹ เช›เซ‹ LocalExecutor, เชญเชฒเซ‡ เชคเชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ เชธเซ‡เชฒเชฐเซ€ เช•เซเชฒเชธเซเชŸเชฐ เชนเซ‹เชฏ.
    • เชฒเช—เชญเช— เชเช• เชœ เชตเชธเซเชคเซ เช•เชฐเซ‡ เช›เซ‡ test, เชซเช•เซเชค เชชเชพเชฏเชพเชฎเชพเช‚ เชชเชฃ เช•เช‚เชˆ เชฒเช–เชคเซเช‚ เชจเชฅเซ€.
    • connections เชถเซ‡เชฒเชฎเชพเช‚เชฅเซ€ เช•เชจเซ‡เช•เซเชถเชจ เชธเชพเชฎเซ‚เชนเชฟเช• เชฌเชจเชพเชตเชตเชพเชจเซ€ เชฎเช‚เชœเซ‚เชฐเซ€ เช†เชชเซ‡ เช›เซ‡.
  • เชชเชพเชฏเชฅเซ‹เชจ API - เช•เซเชฐเชฟเชฏเชพเชชเซเชฐเชคเชฟเช•เซเชฐเชฟเชฏเชพ เช•เชฐเชตเชพเชจเซ€ เชเช• เชœเช—เซเชฏเชพเช เชนเชพเชฐเซเชกเช•เซ‹เชฐ เชฐเซ€เชค, เชœเซ‡ เชชเซเชฒเช—เช‡เชจเซเชธ เชฎเชพเชŸเซ‡ เชฌเชจเชพเชตเชพเชฏเซ‡เชฒ เช›เซ‡, เช…เชจเซ‡ เชคเซ‡เชฎเชพเช‚ เชจเชพเชจเชพ เชนเชพเชฅเชฅเซ€ เชเซ‚เชฎเชคเชพ เชจเชฅเซ€. เชชเชฃ เช…เชฎเชจเซ‡ เชœเชคเชพ เช•เซ‹เชฃ เชฐเซ‹เช•เซ‡ /home/airflow/dags, เชšเชฒเชพเชตเซ‹ ipython เช…เชจเซ‡ เช†เชธเชชเชพเชธ เช—เชกเชฌเชก เชถเชฐเซ‚? เชคเชฎเซ‡, เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡, เชจเซ€เชšเซ‡เชจเชพ เช•เซ‹เชก เชธเชพเชฅเซ‡ เชคเชฎเชพเชฎ เช•เชจเซ‡เช•เซเชถเชจเซเชธ เชจเชฟเช•เชพเชธ เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • เชเชฐเชซเซเชฒเซ‹ เชฎเซ‡เชŸเชพเชกเซ‡เชŸเชพเชฌเซ‡เช เชธเชพเชฅเซ‡ เช•เชจเซ‡เช•เซเชŸ เชฅเชˆ เชฐเชนเซเชฏเซเช‚ เช›เซ‡. เชนเซเช‚ เชคเซ‡เชจเซ‡ เชฒเช–เชตเชพเชจเซ€ เชญเชฒเชพเชฎเชฃ เช•เชฐเชคเซ‹ เชจเชฅเซ€, เชชเชฐเช‚เชคเซ เชตเชฟเชตเชฟเชง เชตเชฟเชถเชฟเชทเซเชŸ เชฎเซ‡เชŸเซเชฐเชฟเช•เซเชธ เชฎเชพเชŸเซ‡ เช•เชพเชฐเซเชฏ เชธเซเชฅเชฟเชคเชฟเช“ เชฎเซ‡เชณเชตเชตเซ€ เช เช•เซ‹เชˆเชชเชฃ API เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเชคเชพเช‚ เชตเชงเซ เชเชกเชชเซ€ เช…เชจเซ‡ เชธเชฐเชณ เชนเซ‹เชˆ เชถเช•เซ‡ เช›เซ‡.

    เชšเชพเชฒเซ‹ เช†เชชเชฃเซ‡ เช•เชนเซ€เช เช•เซ‡ เช†เชชเชฃเชพเช‚ เชฌเชงเชพเช‚ เช•เชพเชฐเซเชฏเซ‹ เชจเชฟเชฐเชพเชงเชพเชฐ เชจเชฅเซ€, เชชเชฐเช‚เชคเซ เชคเซ‡ เช•เซเชฏเชพเชฐเซ‡เช• เชชเชกเซ€ เชถเช•เซ‡ เช›เซ‡, เช…เชจเซ‡ เช† เชธเชพเชฎเชพเชจเซเชฏ เช›เซ‡. เชชเชฐเช‚เชคเซ เชฅเซ‹เชกเชพ เช…เชตเชฐเซ‹เชงเซ‹ เชชเชนเซ‡เชฒเซ‡เชฅเซ€ เชœ เชถเช‚เช•เชพเชธเซเชชเชฆ เช›เซ‡, เช…เชจเซ‡ เชคเซ‡ เชคเชชเชพเชธเชตเซเช‚ เชœเชฐเซ‚เชฐเซ€ เช›เซ‡.

    เชเชธเช•เซเชฏเซเชเชฒ เชธเชพเชตเชšเซ‡เชค เชฐเชนเซ‹!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

เชธเช‚เชฆเชฐเซเชญเซ‹

เช…เชจเซ‡ เช…เชฒเชฌเชคเซเชค, Google เชจเชพ เช‡เชถเซเชฏเซเชจเซ€ เชชเซเชฐเชฅเชฎ เชฆเชธ เชฒเชฟเช‚เช•เซเชธ เชฎเชพเชฐเชพ เชฌเซเช•เชฎเชพเชฐเซเช•เซเชธเชฎเชพเช‚เชฅเซ€ เชเชฐเชซเซเชฒเซ‹ เชซเซ‹เชฒเซเชกเชฐเชจเซ€ เชธเชพเชฎเช—เซเชฐเซ€ เช›เซ‡.

เช…เชจเซ‡ เชฒเซ‡เช–เชฎเชพเช‚ เชตเชชเชฐเชพเชฏเซ‡เชฒ เชฒเชฟเช‚เช•เซเชธ:

เชธเซ‹เชฐเซเชธ: www.habr.com