เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดนเดพเดฏเต, เดžเดพเตป เดฆเดฟเดฎเดฟเดคเตเดฐเดฟ เดฒเต‹เด—เตโ€Œเดตเดฟเดจเต†เด™เตเด•เต‹ - เดตเต†เดธเต†เดฑเตเดฑเต เด—เตเดฐเต‚เดชเตเดชเต เด“เดซเต เด•เดฎเตเดชเดจเดฟเด•เดณเตเดŸเต† เด…เดจเดฒเดฟเดฑเตเดฑเดฟเด•เตโ€Œเดธเต เดกเดฟเดชเตเดชเดพเตผเดŸเตเดŸเตโ€Œเดฎเต†เดจเตเดฑเดฟเดจเตเดฑเต† เดกเดพเดฑเตเดฑเดพ เดŽเดžเตเดšเดฟเดจเต€เดฏเตผ.

ETL เดชเตเดฐเด•เตเดฐเดฟเดฏเด•เตพ เดตเดฟเด•เดธเดฟเดชเตเดชเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเดณเตเดณ เด’เดฐเต เด…เดคเตเดญเตเดคเด•เดฐเดฎเดพเดฏ เด‰เดชเด•เดฐเดฃเดคเตเดคเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเต เดžเดพเตป เดจเดฟเด™เตเด™เดณเต‹เดŸเต เดชเดฑเดฏเตเด‚ - เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹. เดŽเดจเตเดจเดพเตฝ เดŽเดฏเตผเดซเตเดฒเต‹ เดตเดณเดฐเต† เดตเตˆเดตเดฟเดงเตเดฏเดฎเดพเตผเดจเตเดจเดคเตเด‚ เดฌเดนเตเดฎเตเด–เดตเตเดฎเดพเดฃเต, เดจเดฟเด™เตเด™เตพ เดกเดพเดฑเตเดฑเดพ เดซเตเดฒเต‹เด•เดณเดฟเตฝ เดเตผเดชเตเดชเต†เดŸเตเดŸเดฟเดŸเตเดŸเดฟเดฒเตเดฒเต†เด™เตเด•เดฟเดฒเตเด‚ เดจเดฟเด™เตเด™เตพ เด…เดคเต เดธเต‚เด•เตเดทเตเดฎเดฎเดพเดฏเดฟ เดชเดฐเดฟเดถเต‹เดงเดฟเด•เตเด•เดฃเด‚, เดŽเดจเตเดจเดพเตฝ เดเดคเต†เด™เตเด•เดฟเดฒเตเด‚ เดชเตเดฐเด•เตเดฐเดฟเดฏเด•เตพ เด‡เดŸเดฏเตเด•เตเด•เดฟเดŸเต† เดธเดฎเดพเดฐเด‚เดญเดฟเด•เตเด•เตเด•เดฏเตเด‚ เด…เดตเดฏเตเดŸเต† เดจเดฟเตผเดตเตเดตเดนเดฃเด‚ เดจเดฟเดฐเต€เด•เตเดทเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเต‡เดฃเตเดŸเดคเต เด†เดตเดถเตเดฏเดฎเดพเดฃเต.

เด…เดคเต†, เดžเดพเตป เดชเดฑเดฏเตเด• เดฎเดพเดคเตเดฐเดฎเดฒเตเดฒ, เด•เดพเดฃเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเด‚: เดชเตเดฐเต‹เด—เตเดฐเดพเดฎเดฟเดจเต เดงเดพเดฐเดพเดณเด‚ เด•เต‹เดกเตเด•เดณเตเด‚ เดธเตเด•เตเดฐเต€เตปเดทเต‹เดŸเตเดŸเตเด•เดณเตเด‚ เดถเตเดชเดพเตผเดถเด•เดณเตเด‚ เด‰เดฃเตเดŸเต.

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต
เดŽเดฏเตผเดซเตเดฒเต‹ / เดตเดฟเด•เตเด•เดฟเดฎเต€เดกเดฟเดฏ เด•เต‹เดฎเตบเดธเต เดŽเดจเตเดจ เดตเดพเด•เตเด•เต เด—เต‚เด—เดฟเตพ เดšเต†เดฏเตเดฏเตเดฎเตเดชเต‹เตพ เดจเดฟเด™เตเด™เตพ เดธเดพเดงเดพเดฐเดฃเดฏเดพเดฏเดฟ เด•เดพเดฃเตเดจเตเดจเดคเต

เด‰เดณเตเดณเดŸเด•เตเด• เดชเดŸเตเดŸเดฟเด•

เด†เดฎเตเด–เด‚

เดœเดพเด™เตเด•เต‹ เดชเต‹เดฒเต†เดฏเดพเดฃเต เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹:

  • เดชเตˆเดคเตเดคเดฃเดฟเตฝ เดŽเดดเตเดคเดฟเดฏเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต
  • เด’เดฐเต เดตเดฒเดฟเดฏ เด…เดกเตเดฎเดฟเตป เดชเดพเดจเตฝ เด‰เดฃเตเดŸเต,
  • เด…เดจเดฟเดถเตเดšเดฟเดคเดฎเดพเดฏเดฟ เดตเดฟเด•เดธเดฟเดชเตเดชเดฟเด•เตเด•เดพเดตเตเดจเตเดจ

- เดจเดฒเตเดฒเดคเต เดฎเดพเดคเตเดฐเด‚, เด‡เดคเต เดคเดฟเด•เดšเตเดšเตเด‚ เดตเตเดฏเดคเตเดฏเดธเตเดคเดฎเดพเดฏ เด‰เดฆเตเดฆเต‡เดถเตเดฏเด™เตเด™เตพเด•เตเด•เดพเดฏเดฟ เดจเดฟเตผเดฎเตเดฎเดฟเดšเตเดšเดคเดพเดฃเต, เด…เดคเดพเดฏเดคเต (เด•เดพเดฑเตเดฑเดฟเดจเต เดฎเตเดฎเตเดชเต เดŽเดดเตเดคเดฟเดฏเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเดคเตเดชเต‹เดฒเต†):

  • เดชเดฐเดฟเดงเดฟเดฏเดฟเดฒเตเดฒเดพเดคเตเดค เดฎเต†เดทเต€เดจเตเด•เดณเดฟเตฝ เดŸเดพเดธเตโ€Œเด•เตเด•เตเด•เตพ เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเด•เตเด•เตเดจเตเดจเดคเตเด‚ เดจเดฟเดฐเต€เด•เตเดทเดฟเด•เตเด•เตเดจเตเดจเดคเตเด‚ (เดจเดฟเดฐเดตเดงเดฟ เดธเต†เดฒเดฑเดฟ / เด•เตเดฌเตผเดจเต†เดฑเตเดฑเตเด•เดณเตเด‚ เดจเดฟเด™เตเด™เดณเตเดŸเต† เดฎเดจเดธเตเดธเดพเด•เตเดทเดฟเดฏเตเด‚ เดจเดฟเด™เตเด™เดณเต† เด…เดจเตเดตเดฆเดฟเด•เตเด•เตเด‚)
  • เดชเตˆเดคเตเดคเตบ เด•เต‹เดกเต เดŽเดดเตเดคเดพเดจเตเด‚ เดฎเดจเดธเตเดธเดฟเดฒเดพเด•เตเด•เดพเดจเตเด‚ เดตเดณเดฐเต† เดŽเดณเตเดชเตเดชเดฎเตเดณเตเดณ เดกเตˆเดจเดพเดฎเดฟเด•เต เดตเตผเด•เตเด•เตเดซเตเดฒเต‹ เดœเดจเดฑเต‡เดทเตป
  • เด•เต‚เดŸเดพเดคเต† เดฑเต†เดกเดฟเดฎเต†เดฏเตเดกเต เด˜เดŸเด•เด™เตเด™เดณเตเด‚ เดนเต‹เด‚-เดฎเต†เดฏเตเดกเต เดชเตเดฒเด—เดฟเดจเตเด•เดณเตเด‚ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดเดคเต†เด™เตเด•เดฟเดฒเตเด‚ เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเตเด•เดณเตเด‚ API-เด•เดณเตเด‚ เดชเดฐเดธเตเดชเดฐเด‚ เดฌเดจเตเดงเดฟเดชเตเดชเดฟเด•เตเด•เดพเดจเตเดณเตเดณ เด•เดดเดฟเดตเต (เด…เดคเต เดตเดณเดฐเต† เดฒเดณเดฟเดคเดฎเดพเดฃเต).

เดžเด™เตเด™เตพ เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเดจเตเดจเดคเต เด‡เดคเตเดชเต‹เดฒเต†เดฏเดพเดฃเต:

  • DWH-เดฒเตเด‚ ODS-เดฒเตเด‚ (เดžเด™เตเด™เตพเด•เตเด•เต เดตเต†เตผเดŸเตเดŸเดฟเด•เตเด•เดฏเตเด‚ เด•เตเดฒเดฟเด•เตเด•เตเดนเต—เดธเตเด‚ เด‰เดฃเตเดŸเต) เดตเดฟเดตเดฟเดง เด‰เดฑเดตเดฟเดŸเด™เตเด™เดณเดฟเตฝ เดจเดฟเดจเตเดจเต (เดจเดฟเดฐเดตเดงเดฟ SQL เดธเต†เตผเดตเตผ, PostgreSQL เดธเด‚เดญเดตเด™เตเด™เตพ, เด†เดชเตเดฒเดฟเด•เตเด•เต‡เดทเตป เดฎเต†เดŸเตเดฐเดฟเด•เตเด•เตเด•เดณเตเดณเตเดณ เดตเดฟเดตเดฟเดง API-เด•เตพ, 1C เดชเต‹เดฒเตเด‚) เดžเด™เตเด™เตพ เดกเดพเดฑเตเดฑ เดถเต‡เด–เดฐเดฟเด•เตเด•เตเดจเตเดจเต.
  • เดŽเดคเตเดฐ เดชเตเดฐเต‹เด—เดฎเดฟเดšเตเดšเต cron, เด‡เดคเต ODS-เตฝ เดกเดพเดฑเตเดฑ เดเด•เต€เด•เดฐเดฃ เดชเตเดฐเด•เตเดฐเดฟเดฏเด•เตพ เด†เดฐเด‚เดญเดฟเด•เตเด•เตเด•เดฏเตเด‚ เด…เดตเดฏเตเดŸเต† เดชเดฐเดฟเดชเดพเดฒเดจเด‚ เดจเดฟเดฐเต€เด•เตเดทเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเต.

เด…เดŸเตเดคเตเดค เด•เดพเดฒเด‚ เดตเดฐเต†, เดžเด™เตเด™เดณเตเดŸเต† เด†เดตเดถเตเดฏเด™เตเด™เตพ 32 เด•เต‹เดฑเตเด•เดณเตเด‚ 50 GB เดฑเดพเดฎเตเด‚ เด‰เดณเตเดณ เด’เดฐเต เดšเต†เดฑเดฟเดฏ เดธเต†เตผเดตเดฑเดพเดฃเต. เดŽเดฏเตผเดซเตเดฒเต‹เดฏเดฟเตฝ, เด‡เดคเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเดจเตเดจเต:

  • ะฑะพะปะตะต 200 เดฆเดฟเดตเดธเด‚ (เดฏเดฅเดพเตผเดคเตเดฅเดคเตเดคเดฟเตฝ เดตเตผเด•เตเด•เตเดซเตเดฒเต‹เด•เตพ, เด…เดคเดฟเตฝ เดžเด™เตเด™เตพ เดœเต‹เดฒเดฟเด•เตพ เดธเตเดฑเตเดฑเดซเต เดšเต†เดฏเตเดฏเตเดจเตเดจเต)
  • เด“เดฐเต‹เดจเตเดจเดฟเดฒเตเด‚ เดถเดฐเดพเดถเดฐเดฟ 70 เดœเต‹เดฒเดฟเด•เตพ,
  • เดˆ เดจเดจเตเดฎ เด†เดฐเด‚เดญเดฟเด•เตเด•เตเดจเตเดจเต (เดถเดฐเดพเดถเดฐเดฟเดฏเดฟเดฒเตเด‚) เดฎเดฃเดฟเด•เตเด•เต‚เดฑเดฟเตฝ เด’เดฐเดฟเด•เตเด•เตฝ.

เดžเด™เตเด™เตพ เดŽเด™เตเด™เดจเต† เดตเดฟเดชเตเดฒเต€เด•เดฐเดฟเดšเตเดšเต เดŽเดจเตเดจเดคเดฟเดจเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเต, เดžเดพเตป เดšเตเดตเดŸเต† เดŽเดดเตเดคเดพเด‚, เดŽเดจเตเดจเดพเตฝ เด‡เดชเตเดชเต‹เตพ เดจเดฎเตเดฎเตพ เดชเดฐเดฟเดนเดฐเดฟเด•เตเด•เตเดจเตเดจ เด‰เดฌเตผ-เดชเตเดฐเดถเตเดจเด‚ เดจเดฟเตผเดตเดšเดฟเด•เตเด•เดพเด‚:

เดฎเต‚เดจเตเดจเต เดฏเดฅเดพเตผเดคเตเดฅ SQL เดธเต†เตผเดตเดฑเตเด•เตพ เด‰เดฃเตเดŸเต, เด“เดฐเต‹เดจเตเดจเดฟเดจเตเด‚ 50 เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเตเด•เตพ เด‰เดฃเตเดŸเต - เดฏเดฅเดพเด•เตเดฐเดฎเด‚ เด’เดฐเต เดชเตเดฐเต‹เดœเด•เตเดฑเตเดฑเดฟเดจเตเดฑเต† เด‰เดฆเดพเดนเดฐเดฃเด™เตเด™เตพ, เด…เดตเดฏเตเด•เตเด•เต เด’เดฐเต‡ เด˜เดŸเดจเดฏเตเดฃเตเดŸเต (เดเดคเดพเดฃเตเดŸเต เดŽเดฒเตเดฒเดพเดฏเดฟเดŸเดคเตเดคเตเด‚, mua-ha-ha), เด…เดคเดพเดฏเดคเต เด“เดฐเต‹เดจเตเดจเดฟเดจเตเด‚ เด’เดฐเต เด“เตผเดกเตผ เดชเดŸเตเดŸเดฟเด•เดฏเตเดฃเตเดŸเต (เดญเดพเด—เตเดฏเดตเดถเดพเตฝ, เด…เดคเดฟเดจเตเดณเตเดณ เด’เดฐเต เดชเดŸเตเดŸเดฟเด• เดเดคเต เดฌเดฟเดธเดฟเดจเดธเตเดธเดฟเดฒเต‡เด•เตเด•เตเด‚ เดชเต‡เดฐเต เดคเดณเตเดณเดพเด‚). เดธเต‡เดตเดจ เดซเต€เตฝเดกเตเด•เตพ (เด‰เดฑเดตเดฟเดŸ เดธเต†เตผเดตเตผ, เด‰เดฑเดตเดฟเดŸ เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเต, ETL เดŸเดพเดธเตโ€Œเด•เต เดเดกเดฟ) เดšเต‡เตผเดคเตเดคเต เดžเด™เตเด™เตพ เดกเดพเดฑเตเดฑ เดŽเดŸเตเด•เตเด•เตเด•เดฏเตเด‚ เดจเดฟเดทเตโ€Œเด•เดณเด™เตเด•เดฎเดพเดฏเดฟ เดตเต†เตผเดŸเตเดŸเดฟเด•เตเด•เดฏเดฟเดฒเต‡เด•เตเด•เต เดŽเดฑเดฟเดฏเตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเต.

เดชเต‹เด•เดพเด‚!

เดชเตเดฐเดงเดพเดจ เดญเดพเด—เด‚, เดชเตเดฐเดพเดฏเต‹เด—เดฟเด•เด‚ (เด…เดฒเตเดชเด‚ เดธเตˆเดฆเตเดงเดพเดจเตเดคเดฟเด•เดตเตเด‚)

เดŽเดจเตเดคเตเด•เตŠเดฃเตเดŸเดพเดฃเต เดžเด™เตเด™เตพ (เดจเดฟเด™เตเด™เดณเตเด‚)

เดฎเดฐเด™เตเด™เตพ เดตเดฒเตเดคเดพเดฏเดฟเดฐเตเดจเตเดจเดชเตเดชเต‹เตพ เดžเดพเตป เดฒเดณเดฟเดคเดฏเดพเดฏเดฟเดฐเตเดจเตเดจเต SQLเด’เดฐเต เดฑเดทเตเดฏเตป เดฑเต€เดŸเตเดŸเต†เดฏเดฟเดฒเดฟเตฝ -schik, เดžเด™เตเด™เตพเด•เตเด•เต เดฒเดญเตเดฏเดฎเดพเดฏ เดฐเดฃเตเดŸเต เดŸเต‚เดณเตเด•เตพ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดžเด™เตเด™เตพ ETL เดชเตเดฐเด•เตเดฐเดฟเดฏเด•เตพ เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ เดกเดพเดฑเตเดฑเดพ เดซเตเดฒเต‹เด•เตพ เดคเดŸเตเดŸเดฟเดชเตเดชเต เดจเดŸเดคเตเดคเดฟ:

  • เด‡เตปเดซเต‹เตผเดฎเดพเดฑเตเดฑเดฟเด•เตเด• เดชเดตเตผ เดธเต†เดจเตเดฑเตผ - เด…เดคเดฟเดจเตเดฑเต‡เดคเดพเดฏ เดนเดพเตผเดกเตโ€Œเดตเต†เดฏเดฑเตเด‚ เดธเตเดตเดจเตเดคเด‚ เดชเดคเดฟเดชเตเดชเตเด‚ เด‰เดณเตเดณ, เด…เด™เตเด™เต‡เดฏเดฑเตเดฑเด‚ เดตเตเดฏเดพเดชเดฟเด•เตเด•เตเดจเตเดจ, เด…เดคเตเดฏเดงเดฟเด•เด‚ เด‰เตฝเดชเตเดชเดพเดฆเดจเด•เตเดทเดฎเดคเดฏเตเดณเตเดณ เด’เดฐเต เดธเดฟเดธเตเดฑเตเดฑเด‚. เด…เดคเดฟเดจเตเดฑเต† เด•เดดเดฟเดตเดฟเดจเตเดฑเต† 1% เดฆเตˆเดตเด‚ เดตเดฟเดฒเด•เตเด•เตเดจเตเดจเตเดตเต†เดจเตเดจเต เดžเดพเตป เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต. เดŽเดจเตเดคเตเด•เตŠเดฃเตเดŸเต? เดถเดฐเดฟ, เด’เดจเตเดจเดพเดฎเดคเดพเดฏเดฟ, เดˆ เด‡เดจเตเดฑเตผเดซเต‡เดธเต, 380-เด•เดณเดฟเตฝ เดŽเดตเดฟเดŸเต†เดฏเต‹, เดฎเดพเดจเดธเดฟเด•เดฎเดพเดฏเดฟ เดžเด™เตเด™เดณเต† เดธเดฎเตเดฎเตผเดฆเตเดฆเดคเตเดคเดฟเดฒเดพเด•เตเด•เดฟ. เดฐเดฃเตเดŸเดพเดฎเดคเดพเดฏเดฟ, เดˆ เด•เต‹เตบเดŸเตเดฐเดพเดชเตเดทเตป เดตเดณเดฐเต† เดซเดพเตปเดธเดฟ เดชเตเดฐเด•เตเดฐเดฟเดฏเด•เตพ, เด‰เด—เตเดฐเดฎเดพเดฏ เด˜เดŸเด•เดคเตเดคเดฟเดจเตเดฑเต† เดชเตเดจเดฐเตเดชเดฏเต‹เด—เด‚, เดฎเดฑเตเดฑเต เดตเดณเดฐเต† เดชเตเดฐเดงเดพเดจเดชเตเดชเต†เดŸเตเดŸ-เดŽเดจเตเดฑเตผเดชเตเดฐเตˆเดธเต-เดคเดจเตเดคเตเดฐเด™เตเด™เตพ เดŽเดจเตเดจเดฟเดตเดฏเตเด•เตเด•เดพเดฏเดฟ เดฐเต‚เดชเด•เตฝเดชเตเดชเดจ เดšเต†เดฏเตเดคเดฟเดŸเตเดŸเตเดณเตเดณเดคเดพเดฃเต. เดชเตเดฐเดคเดฟเดตเตผเดทเด‚ เดŽเดฏเตผเดฌเดธเต เดŽ XNUMX เดจเตเดฑเต† เดšเดฟเดฑเด•เต เดชเต‹เดฒเต† เด‡เดคเดฟเดจเต เดšเดฟเดฒเดตเดพเด•เตเด‚ เดŽเดจเตเดจ เดตเดธเตเดคเตเดคเดฏเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเต เดžเด™เตเด™เตพ เด’เดจเตเดจเตเด‚ เดชเดฑเดฏเดฟเดฒเตเดฒ.

    เดธเต‚เด•เตเดทเดฟเด•เตเด•เตเด•, เด’เดฐเต เดธเตเด•เตเดฐเต€เตปเดทเต‹เดŸเตเดŸเต 30 เดตเดฏเดธเตเดธเดฟเดจเต เดคเดพเดดเต†เดฏเตเดณเตเดณเดตเดฐเต† เด…เตฝเดชเตเดชเด‚ เดตเต‡เดฆเดจเดฟเดชเตเดชเดฟเด•เตเด•เตเด‚

    เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

  • SQL เดธเต†เตผเดตเตผ เด‡เดจเตเดฑเด—เตเดฐเต‡เดทเตป เดธเต†เตผเดตเตผ - เดžเด™เตเด™เดณเตเดŸเต† เด‡เตปเดŸเตเดฐเดพ เดชเตเดฐเต‹เดœเด•เตเดฑเตเดฑเต เดซเตเดฒเต‹เด•เดณเดฟเตฝ เดžเด™เตเด™เตพ เดˆ เดธเด–เดพเดตเดฟเดจเต† เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต. เดถเดฐเดฟ, เดตเดพเดธเตเดคเดตเดคเตเดคเดฟเตฝ: เดžเด™เตเด™เตพ เด‡เดคเดฟเดจเด•เด‚ เดคเดจเตเดจเต† SQL เดธเต†เตผเดตเตผ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเดจเตเดจเต, เดฎเดพเดคเตเดฐเดฎเดฒเตเดฒ เด…เดคเดฟเดจเตเดฑเต† ETL เด‰เดชเด•เดฐเดฃเด™เตเด™เตพ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เดพเดคเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเดคเต เดŽเด™เตเด™เดจเต†เดฏเต†เด™เตเด•เดฟเดฒเตเด‚ เดฏเตเด•เตเดคเดฟเดฐเดนเดฟเดคเดฎเดพเดฃเต. เด‡เดคเดฟเดฒเต† เดŽเดฒเตเดฒเดพเด‚ เดจเดฒเตเดฒเดคเดพเดฃเต: เดฐเดฃเตเดŸเต เด‡เดจเตเดฑเตผเดซเต‡เดธเตเด‚ เดฎเดจเต‹เดนเดฐเดฎเดพเดฃเต, เด•เต‚เดŸเดพเดคเต† เดชเตเดฐเต‹เด—เดคเดฟ เดฑเดฟเดชเตเดชเต‹เตผเดŸเตเดŸเตเด•เดณเตเด‚ ... เดŽเดจเตเดจเดพเตฝ เดžเด™เตเด™เตพ เดธเต‹เดซเตเดฑเตเดฑเตโ€Œเดตเต†เดฏเตผ เด‰เตฝเดชเตเดชเดจเตเดจเด™เตเด™เดณเต† เด‡เดทเตเดŸเดชเตเดชเต†เดŸเตเดจเตเดจเดคเต เด…เดคเตเด•เตŠเดฃเตเดŸเดฒเตเดฒ, เด“, เด‡เดคเดฟเดจเดฒเตเดฒ. เด…เดคเดฟเดจเตเดฑเต† เดชเดคเดฟเดชเตเดชเต dtsx (เดธเด‚เดฐเด•เตเดทเดฟเด•เตเด•เตเดฎเตเดชเต‹เตพ เดจเต‹เดกเตเด•เตพ เดทเดซเดฟเตพ เดšเต†เดฏเตเดค XML เด†เดฃเต) เดจเดฎเตเด•เตเด•เต เด•เดดเดฟเดฏเตเด‚, เดŽเดจเตเดจเดพเตฝ เดŽเดจเตเดคเดพเดฃเต เด•เดพเดฐเตเดฏเด‚? เด’เดฐเต เดธเต†เตผเดตเดฑเดฟเตฝ เดจเดฟเดจเตเดจเต เดฎเดฑเตเดฑเตŠเดจเตเดจเดฟเดฒเต‡เด•เตเด•เต เดจเต‚เดฑเตเด•เดฃเด•เตเด•เดฟเดจเต เดŸเต‡เดฌเดฟเดณเตเด•เตพ เดตเดฒเดฟเดšเตเดšเดฟเดŸเตเดจเตเดจ เด’เดฐเต เดŸเดพเดธเตโ€Œเด•เต เดชเดพเด•เตเด•เต‡เดœเต เดŽเด™เตเด™เดจเต† เดจเดฟเตผเดฎเตเดฎเดฟเด•เตเด•เดพเด‚? เด…เดคเต†, เดŽเดจเตเดคเต เดจเต‚เดฑเต, เดจเดฟเด™เตเด™เดณเตเดŸเต† เดšเต‚เดฃเตเดŸเต เดตเดฟเดฐเตฝ เด‡เดฐเตเดชเดคเต เด•เดทเดฃเด™เตเด™เดณเดฟเตฝ เดจเดฟเดจเตเดจเต เดตเต€เดดเตเด‚, เดฎเต—เดธเต เดฌเดŸเตเดŸเดฃเดฟเตฝ เด•เตเดฒเดฟเด•เตเด•เต เดšเต†เดฏเตเดฏเตเด•. เดŽเดจเตเดจเดพเตฝ เด‡เดคเต เดคเต€เตผเดšเตเดšเดฏเดพเดฏเตเด‚ เด•เต‚เดŸเตเดคเตฝ เดซเดพเดทเดจเดพเดฏเดฟ เด•เดพเดฃเดชเตเดชเต†เดŸเตเดจเตเดจเต:

    เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดคเต€เตผเดšเตเดšเดฏเดพเดฏเตเด‚ เดžเด™เตเด™เตพ เด…เดคเดฟเดจเตเดณเตเดณ เดตเดดเดฟเด•เตพ เดคเต‡เดŸเดฟ. เด•เต‡เดธเต เดชเต‹เดฒเตเด‚ เดเดคเดพเดฃเตเดŸเต เดธเตเดตเดฏเด‚ เดŽเดดเตเดคเดฟเดฏ SSIS เดชเดพเด•เตเด•เต‡เดœเต เดœเดจเดฑเต‡เดฑเตเดฑเดฑเดฟเดฒเต‡เด•เตเด•เต เดตเดจเตเดจเต ...

โ€ฆเดชเดฟเดจเตเดจเต† เด’เดฐเต เดชเตเดคเดฟเดฏ เดœเต‹เดฒเดฟ เดŽเดจเตเดจเต† เด•เดฃเตเดŸเต†เดคเตเดคเดฟ. เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹ เดŽเดจเตเดจเต† เด…เดคเดฟเตฝ เดฎเดฑเดฟเด•เดŸเดจเตเดจเต.

ETL เดชเตเดฐเต‹เดธเดธเตเดธเต เดตเดฟเดตเดฐเดฃเด™เตเด™เตพ เดฒเดณเดฟเดคเดฎเดพเดฏ เดชเตˆเดคเตเดคเตบ เด•เต‹เดกเดพเดฃเต†เดจเตเดจเต เด•เดฃเตเดŸเต†เดคเตเดคเดฟเดฏเดชเตเดชเต‹เตพ, เดžเดพเตป เดธเดจเตเดคเต‹เดทเดคเตเดคเดฟเดจเดพเดฏเดฟ เดจเตƒเดคเตเดคเด‚ เดšเต†เดฏเตเดคเดฟเดฒเตเดฒ. เด‡เด™เตเด™เดจเต†เดฏเดพเดฃเต เดกเดพเดฑเตเดฑเดพ เดธเตเดŸเตเดฐเต€เดฎเตเด•เตพ เดชเดคเดฟเดชเตเดชเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดตเตเดฏเดคเตเดฏเดธเตเดคเดฎเดพเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดคเดคเต, เดจเต‚เดฑเตเด•เดฃเด•เตเด•เดฟเดจเต เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเตเด•เดณเดฟเตฝ เดจเดฟเดจเตเดจเต เด’เดฐเตŠเดฑเตเดฑ เด˜เดŸเดจเดฏเตเดณเตเดณ เดชเดŸเตเดŸเดฟเด•เด•เตพ เด’เดฐเต เดฒเด•เตเดทเตเดฏเดคเตเดคเดฟเดฒเต‡เด•เตเด•เต เดชเด•เดฐเตเดจเตเดจเดคเต เด’เดจเตเดจเดฐเดฏเต‹ เดฐเดฃเตเดŸเต‹ 13 โ€เดธเตโ€Œเด•เตเดฐเต€เดจเตเด•เดณเดฟเดฒเต† เดชเตˆเดคเตเดคเตบ เด•เต‹เดกเดฟเดจเตเดฑเต† เด•เดพเดฐเตเดฏเดฎเดพเดฏเดฟ เดฎเดพเดฑเดฟ.

เด•เตเดฒเดธเตเดฑเตเดฑเตผ เด•เต‚เดŸเตเดŸเดฟเดšเตเดšเต‡เตผเด•เตเด•เตเดจเตเดจเต

เดจเดฎเตเด•เตเด•เต เดชเต‚เตผเดฃเตเดฃเดฎเดพเดฏเตเด‚ เด•เดฟเดจเตเดฑเตผเด—เดพเตผเดŸเตเดŸเตป เด•เตเดฐเดฎเต€เด•เดฐเดฟเด•เตเด•เดฐเตเดคเต, เดŽเดฏเตผเดซเตเดฒเต‹, เดจเดฟเด™เตเด™เตพ เดคเดฟเดฐเดžเตเดžเต†เดŸเตเดคเตเดค เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเต, เดธเต†เดฒเดฑเดฟ, เดกเต‹เด•เตเด•เตเด•เดณเดฟเตฝ เดตเดฟเดตเดฐเดฟเดšเตเดšเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจ เดฎเดฑเตเดฑเต เด•เต‡เดธเตเด•เตพ เดŽเดจเตเดจเดฟเดต เด‡เตปเดธเตเดฑเตเดฑเดพเตพ เดšเต†เดฏเตเดฏเตเดจเตเดจเดคเต เดชเต‹เดฒเต†เดฏเตเดณเตเดณ เดตเตเดฏเด•เตเดคเดฎเดพเดฏ เด•เดพเดฐเตเดฏเด™เตเด™เดณเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเต เด‡เดตเดฟเดŸเต† เดธเด‚เดธเดพเดฐเดฟเด•เตเด•เดฐเตเดคเต.

เด…เดคเดฟเดจเดพเตฝ เดจเดฎเตเด•เตเด•เต เด‰เดŸเตป เดคเดจเตเดจเต† เดชเดฐเต€เด•เตเดทเดฃเด™เตเด™เตพ เด†เดฐเด‚เดญเดฟเด•เตเด•เดพเด‚, เดžเดพเตป เดธเตเด•เต†เดšเตเดšเต เดšเต†เดฏเตเดคเต docker-compose.yml เด…เดคเดฟเตฝ:

  • เดฏเดฅเดพเตผเดคเตเดฅเดคเตเดคเดฟเตฝ เด‰เดฏเตผเดคเตเดคเดพเด‚ เดตเดพเดฏเต เดชเตเดฐเดตเดพเดนเด‚: เดทเต†เดกเตเดฏเต‚เดณเตผ, เดตเต†เดฌเตโ€Œเดธเต†เตผเดตเตผ. เดธเต†เดฒเดฑเดฟ เดŸเดพเดธเตโ€Œเด•เตเด•เตเด•เตพ เดจเดฟเดฐเต€เด•เตเดทเดฟเด•เตเด•เดพเตป เดซเตเดฒเดตเดฑเตเด‚ เด…เดตเดฟเดŸเต† เด•เดฑเด™เตเด™เตเด‚ (เด•เดพเดฐเดฃเด‚ เด‡เดคเต เด‡เดคเดฟเดจเด•เด‚ เดคเดจเตเดจเต† เดคเดณเตเดณเดชเตเดชเต†เดŸเตเดŸเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต apache/airflow:1.10.10-python3.7, เดชเด•เตเดทเต‡ เดžเด™เตเด™เตพ เด•เดพเดฐเตเดฏเดฎเดพเด•เตเด•เตเดจเตเดจเดฟเดฒเตเดฒ)
  • PostgreSQL เดŽเดจเตเดจเต€, เด…เดคเดฟเตฝ เดŽเดฏเตผเดซเตเดฒเต‹ เด…เดคเดฟเดจเตเดฑเต† เดธเต‡เดตเดจ เดตเดฟเดตเดฐเด™เตเด™เตพ (เดทเต†เดกเตเดฏเต‚เดณเตผ เดกเดพเดฑเตเดฑ, เดŽเด•เตเดธเดฟเด•เตเดฏเต‚เดทเตป เดธเตเดฑเตเดฑเดพเดฑเตเดฑเดฟเดธเตเดฑเตเดฑเดฟเด•เตเดธเต เดฎเตเดคเดฒเดพเดฏเดต) เดŽเดดเตเดคเตเด•เดฏเตเด‚ เดธเต†เดฒเดฑเดฟ เดชเต‚เตผเดคเตเดคเดฟเดฏเดพเด•เตเด•เดฟเดฏ เดœเต‹เดฒเดฟเด•เตพ เด…เดŸเดฏเดพเดณเดชเตเดชเต†เดŸเตเดคเตเดคเตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเด‚;
  • เดฐเต†เดฆเดฟเดธเต, เด‡เดคเต เดธเต†เดฒเดฑเดฟเดฏเตเดŸเต† เด’เดฐเต เดŸเดพเดธเตโ€Œเด•เต เดฌเตเดฐเต‹เด•เตเด•เดฑเดพเดฏเดฟ เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเด‚;
  • เดธเต†เดฒเดฑเดฟ เดคเตŠเดดเดฟเดฒเดพเดณเดฟ, เดŸเดพเดธเตเด•เตเด•เตเด•เดณเตเดŸเต† เดจเต‡เดฐเดฟเดŸเตเดŸเตเดณเตเดณ เดจเดฟเตผเดตเตเดตเดนเดฃเดคเตเดคเดฟเตฝ เดเตผเดชเตเดชเต†เดŸเตเดŸเดฟเดฐเดฟเด•เตเด•เตเด‚.
  • เดซเต‹เตพเดกเดฑเดฟเดฒเต‡เด•เตเด•เต ./dags เดกเดพเด—เตเด•เดณเตเดŸเต† เดตเดฟเดตเดฐเดฃเดคเตเดคเต‹เดŸเตŠเดชเตเดชเด‚ เดžเด™เตเด™เตพ เดžเด™เตเด™เดณเตเดŸเต† เดซเดฏเดฒเตเด•เตพ เดšเต‡เตผเด•เตเด•เตเด‚. เดˆเดšเตเดšเดฏเดฟเตฝ เด…เดต เดŽเดŸเตเด•เตเด•เดชเตเดชเต†เดŸเตเด‚, เด…เดคเดฟเดจเดพเตฝ เด“เดฐเต‹ เดคเตเดฎเตเดฎเดฒเดฟเดจเต เดถเต‡เดทเดตเตเด‚ เดฎเตเดดเตเดตเตป เดธเตเดฑเตเดฑเดพเด•เตเด•เตเด‚ เด•เดฌเดณเดฟเดชเตเดชเดฟเด•เตเด•เต‡เดฃเตเดŸ เด†เดตเดถเตเดฏเดฎเดฟเดฒเตเดฒ.

เดšเดฟเดฒ เดธเตเดฅเดฒเด™เตเด™เดณเดฟเตฝ, เด‰เดฆเดพเดนเดฐเดฃเด™เตเด™เดณเดฟเดฒเต† เด•เต‹เดกเต เดชเต‚เตผเดฃเตเดฃเดฎเดพเดฏเดฟ เด•เดพเดฃเดฟเดšเตเดšเดฟเดŸเตเดŸเดฟเดฒเตเดฒ (เด…เดคเดฟเดจเดพเตฝ เดตเดพเดšเด•เด‚ เด…เดฒเด™เตเด•เต‹เดฒเดชเตเดชเต†เดŸเตเดคเตเดคเดพเดคเดฟเดฐเดฟเด•เตเด•เดพเตป), เดŽเดจเตเดจเดพเตฝ เดŽเดตเดฟเดŸเต†เดฏเต†เด™เตเด•เดฟเดฒเตเด‚ เด…เดคเต เดชเตเดฐเด•เตเดฐเดฟเดฏเดฏเดฟเตฝ เดชเดฐเดฟเดทเตเด•เตเด•เดฐเดฟเดšเตเดšเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต. เดชเต‚เตผเดฃเตเดฃเดฎเดพเดฏ เดตเตผเด•เตเด•เดฟเด‚เด—เต เด•เต‹เดกเต เด‰เดฆเดพเดนเดฐเดฃเด™เตเด™เตพ เดฑเดฟเดชเตเดชเต‹เดธเดฟเดฑเตเดฑเดฑเดฟเดฏเดฟเตฝ เด•เดพเดฃเดพเด‚ https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

เด•เตเดฑเดฟเดชเตเดชเตเด•เตพ:

  • เด•เต‹เดฎเตเดชเต‹เดธเดฟเดทเดจเตเดฑเต† เด…เดธเด‚เดฌเตเดฒเดฟเดฏเดฟเตฝ, เดžเดพเตป เดชเตเดฐเดงเดพเดจเดฎเดพเดฏเตเด‚ เด…เดฑเดฟเดฏเดชเตเดชเต†เดŸเตเดจเตเดจ เดšเดฟเดคเตเดฐเดคเตเดคเต† เด†เดถเตเดฐเดฏเดฟเดšเตเดšเต puckel/docker-airflow - เด…เดคเต เดชเดฐเดฟเดถเต‹เดงเดฟเด•เตเด•เตเดจเตเดจเดคเต เด‰เดฑเดชเตเดชเดพเด•เตเด•เตเด•. เด’เดฐเตเดชเด•เตเดทเต‡ เดจเดฟเด™เตเด™เดณเตเดŸเต† เดœเต€เดตเดฟเดคเดคเตเดคเดฟเตฝ เดฎเดฑเตเดฑเตŠเดจเตเดจเตเด‚ เด†เดตเดถเตเดฏเดฎเดฟเดฒเตเดฒ.
  • เดตเดดเดฟ เดฎเดพเดคเตเดฐเดฎเดฒเตเดฒ เดŽเดฒเตเดฒเดพ เดŽเดฏเตผเดซเตเดฒเต‹ เด•เตเดฐเดฎเต€เด•เดฐเดฃเด™เตเด™เดณเตเด‚ เดฒเดญเตเดฏเดฎเดพเดฃเต airflow.cfg, เดฎเดพเดคเตเดฐเดฎเดฒเตเดฒ เดชเดฐเดฟเดธเตเดฅเดฟเดคเดฟ เดตเต‡เดฐเดฟเดฏเดฌเดฟเดณเตเด•เตพ เดตเดดเดฟเดฏเตเด‚ (เดกเต†เดตเดฒเดชเตเดชเตผเดฎเดพเตผเด•เตเด•เต เดจเดจเตเดฆเดฟ), เดžเดพเตป เด…เดคเต เดฆเตเดฐเตเดฆเตเดฆเต‡เดถเตเดฏเดคเตเดคเต‹เดŸเต† เดชเตเดฐเดฏเต‹เดœเดจเดชเตเดชเต†เดŸเตเดคเตเดคเดฟ.
  • เดธเตเดตเดพเดญเดพเดตเดฟเด•เดฎเดพเดฏเตเด‚, เด‡เดคเต เด‰เตฝโ€Œเดชเดพเดฆเดจเดคเตเดคเดฟเดจเต เดคเดฏเตเดฏเดพเดฑเดฒเตเดฒ: เดžเดพเตป เดฎเดจเดƒเดชเต‚เตผเดตเด‚ เด•เดฃเตเดŸเต†เดฏเตโ€Œเดจเดฑเตเด•เดณเดฟเตฝ เดนเตƒเดฆเดฏเดฎเดฟเดŸเดฟเดชเตเดชเต เด‡เดŸเตเดŸเดฟเดฒเตเดฒ, เดธเตเดฐเด•เตเดทเดฏเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเต เดžเดพเตป เดตเดฟเดทเดฎเดฟเดšเตเดšเดฟเดฒเตเดฒ. เดŽเดจเตเดจเดพเตฝ เดžเด™เตเด™เดณเตเดŸเต† เดชเดฐเต€เด•เตเดทเดฃเด•เตเด•เดพเตผเด•เตเด•เต เดเดฑเตเดฑเดตเตเด‚ เด…เดจเตเดฏเต‹เดœเตเดฏเดฎเดพเดฏเดคเต เดžเดพเตป เดšเต†เดฏเตเดคเต.
  • เด…เดคเดฒเตเดฒ:
    • เดกเดพเด—เต เดซเต‹เตพเดกเตผ เดทเต†เดกเตเดฏเต‚เดณเตผเด•เตเด•เตเด‚ เดคเตŠเดดเดฟเดฒเดพเดณเดฟเด•เตพเด•เตเด•เตเด‚ เด†เด•เตเดธเดธเต เดšเต†เดฏเตเดฏเดพเดตเตเดจเตเดจเดคเดพเดฏเดฟเดฐเดฟเด•เตเด•เดฃเด‚.
    • เดŽเดฒเตเดฒเดพ เดฎเต‚เดจเตเดจเดพเด‚ เด•เด•เตเดทเดฟ เดฒเตˆเดฌเตเดฐเดฑเดฟเด•เตพเด•เตเด•เตเด‚ เด‡เดคเต เดฌเดพเดงเด•เดฎเดพเดฃเต - เด…เดตเดฏเต†เดฒเตเดฒเดพเด‚ เด’เดฐเต เดทเต†เดกเตเดฏเต‚เดณเดฑเตเด‚ เดคเตŠเดดเดฟเดฒเดพเดณเดฟเด•เดณเตเด‚ เด‰เดณเตเดณ เดฎเต†เดทเต€เดจเตเด•เดณเดฟเตฝ เด‡เตปเดธเตเดฑเตเดฑเดพเตพ เดšเต†เดฏเตเดคเดฟเดฐเดฟเด•เตเด•เดฃเด‚.

เดถเดฐเดฟ, เด‡เดชเตเดชเต‹เตพ เด‡เดคเต เดฒเดณเดฟเดคเดฎเดพเดฃเต:

$ docker-compose up --scale worker=3

เดŽเดฒเตเดฒเดพเด‚ เด‰เดฏเตผเดจเตเดจเดคเดฟเดจเต เดถเต‡เดทเด‚, เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดตเต†เดฌเต เด‡เดจเตเดฑเตผเดซเต‡เดธเตเด•เตพ เดจเต‹เด•เตเด•เดพเด‚:

เด…เดŸเดฟเดธเตเดฅเดพเดจ เด†เดถเดฏเด™เตเด™เตพ

เดˆ "เดกเดพเด—เตเด•เดณเดฟเตฝ" เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด’เดจเตเดจเตเด‚ เดฎเดจเดธเตเดธเดฟเดฒเดพเดฏเดฟเดฒเตเดฒเต†เด™เตเด•เดฟเตฝ, เด‡เดคเดพ เด’เดฐเต เดšเต†เดฑเดฟเดฏ เดจเดฟเด˜เดฃเตเดŸเต:

  • เดทเต†เดกเตเดฏเต‚เดณเตผ - เดŽเดฏเตผเดซเตเดฒเต‹เดฏเดฟเดฒเต† เดเดฑเตเดฑเดตเตเด‚ เดชเตเดฐเดงเดพเดจเดชเตเดชเต†เดŸเตเดŸ เด…เดฎเตเดฎเดพเดตเตป, เดฑเต‹เดฌเต‹เดŸเตเดŸเตเด•เตพ เด•เด เดฟเดจเดพเดงเตเดตเดพเดจเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเตเดตเต†เดจเตเดจเต เดจเดฟเดฏเดจเตเดคเตเดฐเดฟเด•เตเด•เตเดจเตเดจเต, เด’เดฐเต เดตเตเดฏเด•เตเดคเดฟเดฏเดฒเตเดฒ: เดทเต†เดกเตเดฏเต‚เตพ เดจเดฟเดฐเต€เด•เตเดทเดฟเด•เตเด•เตเดจเตเดจเต, เดกเดพเด—เตเด•เตพ เด…เดชเตเดกเต‡เดฑเตเดฑเต เดšเต†เดฏเตเดฏเตเดจเตเดจเต, เดŸเดพเดธเตเด•เตเด•เตเด•เตพ เดธเดฎเดพเดฐเด‚เดญเดฟเด•เตเด•เตเดจเตเดจเต.

    เดชเตŠเดคเตเดตเต‡, เดชเดดเดฏ เดชเดคเดฟเดชเตเดชเตเด•เดณเดฟเตฝ, เด…เดฆเตเดฆเต‡เดนเดคเตเดคเดฟเดจเต เดฎเต†เดฎเตเดฎเดฑเดฟเดฏเดฟเตฝ เดชเตเดฐเดถเตเดจเด™เตเด™เดณเตเดฃเตเดŸเดพเดฏเดฟเดฐเตเดจเตเดจเต (เด‡เดฒเตเดฒ, เด“เตผเดฎเตเดฎเด•เตเด•เตเดฑเดตเดฒเตเดฒ, เดšเต‹เตผเดšเตเดš) เด•เต‚เดŸเดพเดคเต† เดฒเต†เด—เดธเดฟ เดชเดพเดฐเดพเดฎเต€เดฑเตเดฑเตผ เด•เต‹เตบเดซเดฟเด—เดฑเตเด•เดณเดฟเตฝ เดชเต‹เดฒเตเด‚ เดคเตเดŸเตผเดจเตเดจเต. run_duration - เด…เดคเดฟเดจเตเดฑเต† เดชเตเดจเดฐเดพเดฐเด‚เดญเดฟเด•เตเด•เตฝ เด‡เดŸเดตเต‡เดณ. เดŽเดจเตเดจเดพเตฝ เด‡เดชเตเดชเต‹เตพ เดŽเดฒเตเดฒเดพเด‚ เดถเดฐเดฟเดฏเดพเดฃเต.

  • DAG (เด…เดคเดพเดฏเดคเต "เดกเดพเด—เต") - "เดกเดฏเดฑเด•เตเดŸเตเดกเต เด…เดธเตˆเด•เตเดฒเดฟเด•เต เด—เตเดฐเดพเดซเต", เดŽเดจเตเดจเดพเตฝ เด…เดคเตเดคเดฐเดฎเตŠเดฐเต เดจเดฟเตผเดตเดšเดจเด‚ เด•เตเดฑเดšเตเดšเต เด†เดณเตเด•เดณเต‹เดŸเต เดชเดฑเดฏเตเด‚, เดŽเดจเตเดจเดพเตฝ เดตเดพเดธเตเดคเดตเดคเตเดคเดฟเตฝ เด‡เดคเต เดชเดฐเดธเตเดชเดฐเด‚ เด‡เดŸเดชเดดเด•เตเดจเตเดจ เดœเต‹เดฒเดฟเด•เตพเด•เตเด•เดพเดฏเตเดณเตเดณ เด’เดฐเต เด•เดฃเตเดŸเต†เดฏเตเดจเดฑเดพเดฃเต (เดšเตเดตเดŸเต† เด•เดพเดฃเตเด•) เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ SSIS-เดฒเต† เดชเดพเด•เตเด•เต‡เดœเดฟเดจเตเดฑเต†เดฏเตเด‚ เด‡เตปเดซเต‹เตผเดฎเดพเดฑเตเดฑเดฟเด•เตเด•เดฏเดฟเดฒเต† เดตเตผเด•เตเด•เตเดซเตเดฒเต‹เดฏเตเดŸเต†เดฏเตเด‚ เด…เดจเดฒเต‹เด—เต .

    เดกเดพเด—เตเด•เตพเด•เตเด•เต เดชเตเดฑเดฎเต‡, เด‡เดชเตเดชเต‹เดดเตเด‚ เดธเดฌเตเดกเดพเด—เตเด•เตพ เด‰เดฃเตเดŸเดพเด•เดพเด‚, เดชเด•เตเดทเต‡ เดจเดฎเตเด•เตเด•เต เด…เดต เดฒเดญเดฟเด•เตเด•เดฟเดฒเตเดฒ.

  • DAG เดฑเตบ - เด‡เดจเต€เดทเตเดฏเดฒเตˆเดธเต เดšเต†เดฏเตเดค เดกเดพเด—เต, เด…เดคเต เดธเตเดตเดจเตเดคเดฎเดพเดฏเดฟ เดจเดฟเดฏเตเด•เตเดคเดฎเดพเด•เตเด•เดฟเดฏเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต execution_date. เด’เดฐเต‡ เดกเดพเด—เดฟเดจเตเดฑเต† เดกเดพเด—เตเดฐเตปเดธเดฟเดจเต เดธเดฎเดพเดจเตเดคเดฐเดฎเดพเดฏเดฟ เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚ (เดจเดฟเด™เตเด™เตพ เดจเดฟเด™เตเด™เดณเตเดŸเต† เดœเต‹เดฒเดฟเด•เตพ เดจเดฟเดทเตโ€Œเด•เดณเด™เตเด•เดฎเดพเด•เตเด•เดฟเดฏเต†เด™เตเด•เดฟเตฝ, เดคเต€เตผเดšเตเดšเดฏเดพเดฏเตเด‚).
  • เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผ เด’เดฐเต เดจเดฟเตผเดฆเตเดฆเดฟเดทเตโ€ŒเดŸ เดชเตเดฐเดตเตผเดคเตเดคเดจเด‚ เดจเดŸเดคเตเดคเตเดจเตเดจเดคเดฟเดจเต เด‰เดคเตเดคเดฐเดตเดพเดฆเดฟเด•เดณเดพเดฏ เด•เต‹เดกเดฟเดจเตเดฑเต† เด•เดทเดฃเด™เตเด™เดณเดพเดฃเต. เดฎเต‚เดจเตเดจเต เดคเดฐเด‚ เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผเดฎเดพเดฐเตเดฃเตเดŸเต:
    • เดจเดŸเดชเดŸเดฟเดจเดฎเตเดฎเตเดŸเต† เดชเตเดฐเดฟเดฏเดชเตเดชเต†เดŸเตเดŸ เดชเต‹เดฒเต† PythonOperator, เดเดคเต (เดธเดพเดงเตเดตเดพเดฏ) เดชเตˆเดคเตเดคเตบ เด•เต‹เดกเตเด‚ เดŽเด•เตเดธเดฟเด•เตเดฏเต‚เดŸเตเดŸเต เดšเต†เดฏเตเดฏเดพเตป เด•เดดเดฟเดฏเตเด‚;
    • เด•เตˆเดฎเดพเดฑเตเดฑเด‚ เดšเต†เดฏเตเดฏเตเด•, เดเดคเต เดธเตเดฅเดฒเดคเตเดคเตเดจเดฟเดจเตเดจเตเด‚ เดฎเดฑเตเดฑเตŠเดฐเดฟเดŸเดคเตเดคเต‡เด•เตเด•เต เดกเดพเดฑเตเดฑ เด•เตŠเดฃเตเดŸเตเดชเต‹เด•เตเดจเตเดจเต, เดชเดฑเดฏเตเด•, MsSqlToHiveTransfer;
    • เดธเต†เตปเดธเตผ เดฎเดฑเตเดตเดถเดคเตเดคเต, เด’เดฐเต เด‡เดตเดจเตเดฑเต เดธเด‚เดญเดตเดฟเด•เตเด•เตเดจเตเดจเดคเต เดตเดฐเต† เดกเดพเด—เดฟเดจเตเดฑเต† เดคเตเดŸเตผเดจเตเดจเตเดณเตเดณ เดจเดฟเตผเดตเตเดตเดนเดฃเดคเตเดคเต† เดชเตเดฐเดคเดฟเด•เดฐเดฟเด•เตเด•เดพเดจเต‹ เดฎเดจเตเดฆเด—เดคเดฟเดฏเดฟเดฒเดพเด•เตเด•เดพเดจเต‹ เด‡เดคเต เดจเดฟเด™เตเด™เดณเต† เด…เดจเตเดตเดฆเดฟเด•เตเด•เตเด‚. HttpSensor เดจเดฟเตผเดฆเตเดฆเดฟเดทเตเดŸ เด…เดตเดธเดพเดจ เดชเต‹เดฏเดฟเดจเตเดฑเต เดตเดฒเดฟเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚, เด†เดตเดถเตเดฏเดฎเตเดณเตเดณ เดชเตเดฐเดคเดฟเด•เดฐเดฃเด‚ เด•เดพเดคเตเดคเดฟเดฐเดฟเด•เตเด•เตเดฎเตเดชเต‹เตพ, เด•เตˆเดฎเดพเดฑเตเดฑเด‚ เด†เดฐเด‚เดญเดฟเด•เตเด•เตเด• GoogleCloudStorageToS3Operator. เด…เดจเตเดตเต‡เดทเดฃเดพเดคเตเดฎเด• เดฎเดจเดธเตเดธเต เดšเต‹เดฆเดฟเด•เตเด•เตเด‚: "เดŽเดจเตเดคเตเด•เตŠเดฃเตเดŸเต? เดŽเดฒเตเดฒเดพเดคเตเดคเดฟเดจเตเดฎเตเดชเดฐเดฟ, เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเดฑเดฟเตฝ เดคเดจเตเดจเต† เด†เดตเตผเดคเตเดคเดจเด™เตเด™เตพ เดšเต†เดฏเตเดฏเดพเตป เด•เดดเดฟเดฏเตเด‚! เดคเตเดŸเตผเดจเตเดจเต, เดธเดธเตเดชเต†เตปเดกเต เดšเต†เดฏเตเดค เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผเดฎเดพเดฐเตเดฎเดพเดฏเตเดณเตเดณ เดŸเดพเดธเตเด•เตเด•เตเด•เดณเตเดŸเต† เดชเต‚เตพ เดคเดŸเดธเตเดธเดชเตเดชเต†เดŸเตเดคเตเดคเดพเดคเดฟเดฐเดฟเด•เตเด•เดพเตป. เด…เดŸเตเดคเตเดค เดถเตเดฐเดฎเดคเตเดคเดฟเดจเต เดฎเตเดฎเตเดชเต เดธเต†เตปเดธเตผ เด†เดฐเด‚เดญเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดชเดฐเดฟเดถเต‹เดงเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดฎเดฐเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเต.
  • เดŸเดพเดธเตเด•เต - เดชเตเดฐเด–เตเดฏเดพเดชเดฟเดค เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผเดฎเดพเดฐเต†, เดคเดฐเด‚ เดชเดฐเดฟเด—เดฃเดฟเด•เตเด•เดพเดคเต†, เดกเดพเด—เดฟเตฝ เด˜เดŸเดฟเดชเตเดชเดฟเดšเตเดšเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเดตเดฐเต† เดšเตเดฎเดคเดฒเดฏเตเดŸเต† เดฑเดพเด™เตเด•เดฟเดฒเต‡เด•เตเด•เต เดธเตเดฅเดพเดจเด•เตเด•เดฏเดฑเตเดฑเด‚ เดจเตฝเด•เตเดจเตเดจเต.
  • เดŸเดพเดธเตเด•เต เด‰เดฆเดพเดนเดฐเดฃเด‚ - เดชเต†เตผเดซเต‹เดฎเตผ-เดตเตผเด•เตเด•เตผเดฎเดพเดฐเตเดฎเดพเดฏเดฟ เดฏเตเดฆเตเดงเดคเตเดคเดฟเดฒเต‡เด•เตเด•เต เดŸเดพเดธเตโ€Œเด•เตเด•เตเด•เตพ เด…เดฏเดฏเตโ€Œเด•เตเด•เต‡เดฃเตเดŸ เดธเดฎเดฏเดฎเดพเดฃเดฟเดคเต†เดจเตเดจเต เดœเดจเดฑเตฝ เดชเตเดฒเดพเดจเตผ เดคเต€เดฐเตเดฎเดพเดจเดฟเดšเตเดšเดชเตเดชเต‹เตพ (เดžเด™เตเด™เตพ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเด•เดฏเดพเดฃเต†เด™เตเด•เดฟเตฝ, เดธเตเดฅเดฒเดคเตเดคเตเดคเดจเตเดจเต†. LocalExecutor เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ เด•เต‡เดธเดฟเตฝ เด’เดฐเต เดฑเดฟเดฎเต‹เดŸเตเดŸเต เดจเต‹เดกเดฟเดฒเต‡เด•เตเด•เต CeleryExecutor), เด…เดคเต เด…เดตเตผเด•เตเด•เต เด’เดฐเต เดธเดจเตเดฆเตผเดญเด‚ เดจเตฝเด•เตเดจเตเดจเต (เด…เดคเดพเดฏเดคเต, เดตเต‡เดฐเดฟเดฏเดฌเดฟเดณเตเด•เดณเตเดŸเต† เด’เดฐเต เด•เต‚เดŸเตเดŸเด‚ - เดŽเด•เตเดธเดฟเด•เตเดฏเต‚เดทเตป เดชเดพเดฐเดพเดฎเต€เดฑเตเดฑเดฑเตเด•เตพ), เด•เดฎเดพเตปเดกเต เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ เด…เดจเตเดตเต‡เดทเดฃ เดŸเต†เด‚เดชเตเดฒเต‡เดฑเตเดฑเตเด•เตพ เดตเดฟเด•เดธเดฟเดชเตเดชเดฟเด•เตเด•เตเด•เดฏเตเด‚ เด…เดตเดฏเต† เดชเต‚เตพ เดšเต†เดฏเตเดฏเตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเต.

เดžเด™เตเด™เตพ เดŸเดพเดธเตเด•เตเด•เตเด•เตพ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต

เด†เดฆเตเดฏเด‚, เดจเดฎเตเดฎเตเดŸเต† เดกเต‹เด—เดฟเดจเตเดฑเต† เดชเตŠเดคเตเดตเดพเดฏ เดธเตเด•เต€เดฎเดฟเดจเตเดฑเต† เดฐเต‚เดชเดฐเต‡เด– เดจเต‹เด•เตเด•เดพเด‚, เดคเตเดŸเตผเดจเตเดจเต เดžเด™เตเด™เตพ เด•เต‚เดŸเตเดคเตฝ เด•เต‚เดŸเตเดคเตฝ เดตเดฟเดถเดฆเดพเด‚เดถเด™เตเด™เดณเดฟเดฒเต‡เด•เตเด•เต เด•เดŸเด•เตเด•เตเด‚, เด•เดพเดฐเดฃเด‚ เดžเด™เตเด™เตพ เดšเดฟเดฒ เดจเดฟเดธเตเดธเดพเดฐ เดชเดฐเดฟเดนเดพเดฐเด™เตเด™เตพ เดชเตเดฐเดฏเต‹เด—เดฟเด•เตเด•เตเดจเตเดจเต.

เด…เดคเดฟเดจเดพเตฝ, เด…เดคเดฟเดจเตเดฑเต† เดฒเดณเดฟเดคเดฎเดพเดฏ เดฐเต‚เดชเดคเตเดคเดฟเตฝ, เด…เดคเตเดคเดฐเดฎเตŠเดฐเต เดกเดพเด—เต เด‡เดคเตเดชเต‹เดฒเต† เด•เดพเดฃเดชเตเดชเต†เดŸเตเด‚:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

เดจเดฎเตเด•เตเด•เต เด…เดคเต เด•เดฃเตเดŸเตเดชเดฟเดŸเดฟเด•เตเด•เดพเด‚:

  • เด†เดฆเตเดฏเด‚, เดžเด™เตเด™เตพ เด†เดตเดถเตเดฏเดฎเดพเดฏ เดฒเดฟเดฌเตเด•เตพ เด‡เดฑเด•เตเด•เตเดฎเดคเดฟ เดšเต†เดฏเตเดฏเตเดจเตเดจเต เดตเต‡เดฑเต† เดŽเดจเตเดคเต†เด™เตเด•เดฟเดฒเตเด‚;
  • sql_server_ds เด†เดฃเต List[namedtuple[str, str]] เดŽเดฏเตผเดซเตเดฒเต‹ เด•เดฃเด•เตเดทเดจเตเด•เดณเดฟเตฝ เดจเดฟเดจเตเดจเตเดณเตเดณ เด•เดฃเด•เตเดทเดจเตเด•เดณเตเดŸเต† เดชเต‡เดฐเตเด•เดณเตเด‚ เดžเด™เตเด™เดณเตเดŸเต† เดชเตเดฒเต‡เดฑเตเดฑเต เดŽเดŸเตเด•เตเด•เตเดจเตเดจ เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเตเด•เดณเตเด‚;
  • dag - เดžเด™เตเด™เดณเตเดŸเต† เดกเดพเด—เดฟเดจเตเดฑเต† เด…เดฑเดฟเดฏเดฟเดชเตเดชเต, เด…เดคเต เดจเดฟเตผเดฌเดจเตเดงเดฎเดพเดฏเตเด‚ เด‰เดฃเตเดŸเดพเดฏเดฟเดฐเดฟเด•เตเด•เดฃเด‚ globals(), เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ เดŽเดฏเตผเดซเตเดฒเต‹ เด…เดคเต เด•เดฃเตเดŸเต†เดคเตเดคเตเด•เดฏเดฟเดฒเตเดฒ. เดกเต‹เด—เตเด‚ เดชเดฑเดฏเต‡เดฃเตเดŸเดคเตเดฃเตเดŸเต:
    • เด…เดตเดจเตเดฑเต† เดชเต‡เดฐเต†เดจเตเดคเดพเดฃเต? orders - เดˆ เดชเต‡เดฐเต เดชเดฟเดจเตเดจเต€เดŸเต เดตเต†เดฌเต เด‡เดจเตเดฑเตผเดซเต‡เดธเดฟเตฝ เดฆเตƒเดถเตเดฏเดฎเดพเด•เตเด‚,
    • เดœเต‚เดฒเตˆ เดŽเดŸเตเดŸเดพเด‚ เดคเต€เดฏเดคเดฟ เด…เตผเดฆเตเดงเดฐเดพเดคเตเดฐเดฟ เดฎเตเดคเตฝ เดคเดพเตป เดœเต‹เดฒเดฟ เดšเต†เดฏเตเดฏเตเดฎเต†เดจเตเดจเต,
    • เด‡เดคเต เดเด•เดฆเต‡เดถเด‚ เด“เดฐเต‹ 6 เดฎเดฃเดฟเด•เตเด•เต‚เดฑเดฟเดฒเตเด‚ เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เดฃเด‚ (เด‡เดตเดฟเดŸเต† เดฌเตเดฆเตเดงเดฟเดฎเตเดŸเตเดŸเตเดณเตเดณ เด†เดณเตเด•เตพเด•เตเด•เต เดชเด•เดฐเด‚ timedelta() เดธเตเดตเต€เด•เดพเดฐเตเดฏเดฎเดพเดฏ cron-เดฒเตˆเตป 0 0 0/6 ? * * *, เด•เตเดฑเดžเตเดž เดคเดฃเตเดชเตเดชเดฟเดจเต - เดชเต‹เดฒเต†เดฏเตเดณเตเดณ เด’เดฐเต เดชเดฆเดชเตเดฐเดฏเต‹เด—เด‚ @daily);
  • workflow() เดชเตเดฐเดงเดพเดจ เดœเต‹เดฒเดฟ เดšเต†เดฏเตเดฏเตเด‚, เดชเด•เตเดทเต‡ เด‡เดชเตเดชเต‹เตพ เด…เดฒเตเดฒ. เด‡เดชเตเดชเต‹เตพ, เดžเด™เตเด™เตพ เดžเด™เตเด™เดณเตเดŸเต† เดธเดจเตเดฆเตผเดญเด‚ เดฒเต‹เด—เดฟเดฒเต‡เด•เตเด•เต เดกเด‚เดชเต เดšเต†เดฏเตเดฏเตเด‚.
  • เด‡เดชเตเดชเต‹เตพ เดŸเดพเดธเตเด•เตเด•เตเด•เตพ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเดณเตเดณ เดฒเดณเดฟเดคเดฎเดพเดฏ เดฎเดพเดœเดฟเด•เต:
    • เดžเด™เตเด™เตพ เดžเด™เตเด™เดณเตเดŸเต† เด‰เดฑเดตเดฟเดŸเด™เตเด™เดณเดฟเดฒเต‚เดŸเต† เด“เดŸเตเดจเตเดจเต;
    • เดธเดฎเดพเดฐเด‚เดญเดฟเด•เตเด•เตเด• PythonOperator, เด…เดคเต เดจเดฎเตเดฎเตเดŸเต† เดกเดฎเตเดฎเดฟเดฏเต† เดจเดฟเตผเดตเตเดตเดนเดฟเด•เตเด•เตเด‚ workflow(). เดŸเดพเดธเตโ€Œเด•เตเด•เดฟเดจเตเดฑเต† เด’เดฐเต เด…เดฆเตเดตเดฟเดคเต€เดฏ (เดกเดพเด—เดฟเดจเตเดณเตเดณเดฟเตฝ) เดชเต‡เดฐเต เดตเตเดฏเด•เตเดคเดฎเดพเด•เตเด•เดพเดจเตเด‚ เดกเดพเด—เต เดคเดจเตเดจเต† เด•เต†เดŸเตเดŸเดพเดจเตเด‚ เดฎเดฑเด•เตเด•เดฐเตเดคเต. เดชเดคเดพเด• provide_context เด…เดคเดพเด•เดŸเตเดŸเต†, เดซเด‚เด—เตเดทเดจเดฟเดฒเต‡เด•เตเด•เต เด…เดงเดฟเด• เด†เตผเด—เตเดฏเตเดฎเต†เดจเตเดฑเตเด•เตพ เดชเด•เดฐเตเด‚, เด…เดคเต เดžเด™เตเด™เตพ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดถเตเดฐเดฆเตเดงเดพเดชเต‚เตผเดตเตเดตเด‚ เดถเต‡เด–เดฐเดฟเด•เตเด•เตเด‚ **context.

เดคเตฝเด•เตเด•เดพเดฒเด‚ เด…เดคเตเดฐเดฎเดพเดคเตเดฐเด‚. เดžเด™เตเด™เตพเด•เตเด•เต เดฒเดญเดฟเดšเตเดšเดคเต:

  • เดตเต†เดฌเต เด‡เดจเตเดฑเตผเดซเต‡เดธเดฟเตฝ เดชเตเดคเดฟเดฏ เดกเดพเด—เต,
  • เด’เดจเตเดจเดฐเดจเต‚เดฑเต เดœเต‹เดฒเดฟเด•เตพ เดธเดฎเดพเดจเตเดคเดฐเดฎเดพเดฏเดฟ เดจเดŸเดชเตเดชเดฟเดฒเดพเด•เตเด•เตเด‚ (เดŽเดฏเตผเดซเตเดฒเต‹, เดธเต†เดฒเดฑเดฟ เด•เตเดฐเดฎเต€เด•เดฐเดฃเด™เตเด™เตพ, เดธเต†เตผเดตเตผ เดถเต‡เดทเดฟ เดŽเดจเตเดจเดฟเดต เด…เดจเตเดตเดฆเดฟเด•เตเด•เตเด•เดฏเดพเดฃเต†เด™เตเด•เดฟเตฝ).

เดถเดฐเดฟ, เดเด•เดฆเต‡เดถเด‚ เดฎเดจเดธเตเดธเดฟเดฒเดพเดฏเดฟ.

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต
เด†เดฐเดพเดฃเต เดกเดฟเดชเตปเดกเตปเดธเดฟเด•เตพ เด‡เตปเดธเตเดฑเตเดฑเดพเตพ เดšเต†เดฏเตเดฏเตเด•?

เด‡เดคเต เดฎเตเดดเตเดตเตป เดฒเดณเดฟเดคเดฎเดพเด•เตเด•เดพเตป, เดžเดพเตป เดธเตเด•เตเดฐเต‚ เดšเต†เดฏเตเดคเต docker-compose.yml เดชเตเดฐเต‹เดธเดธเตเดธเดฟเด‚เด—เต requirements.txt เดŽเดฒเตเดฒเดพ เดจเต‹เดกเตเด•เดณเดฟเดฒเตเด‚.

เด‡เดชเตเดชเต‹เตพ เด‡เดคเดพ:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดทเต†เดกเตเดฏเต‚เดณเตผ เดชเตเดฐเต‹เดธเดธเตเดธเต เดšเต†เดฏเตเดฏเตเดจเตเดจ เดŸเดพเดธเตโ€Œเด•เต เด‡เตปเดธเตโ€Œเดฑเตเดฑเตปเดธเตเด•เดณเดพเดฃเต เด—เตเดฐเต‡ เดธเตโ€Œเด•เตเดตเดฏเดฑเตเด•เตพ.

เดžเด™เตเด™เตพ เด…เตฝเดชเตเดชเด‚ เด•เดพเดคเตเดคเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต, เดœเต‹เดฒเดฟเด•เตเด•เดพเตผ เดœเต‹เดฒเดฟเด•เตพ เดธเตเดจเดพเดชเตเดชเต เดšเต†เดฏเตเดฏเตเดจเตเดจเต:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดชเดšเตเดšเดฏเดพเดฏเดตเตผ เดคเต€เตผเดšเตเดšเดฏเดพเดฏเตเด‚ เด…เดตเดฐเตเดŸเต† เดœเต‹เดฒเดฟ เดตเดฟเดœเดฏเด•เดฐเดฎเดพเดฏเดฟ เดชเต‚เตผเดคเตเดคเดฟเดฏเดพเด•เตเด•เดฟ. เดšเตเดตเดชเตเดชเต เดตเดณเดฐเต† เดตเดฟเดœเดฏเด•เดฐเดฎเดฒเตเดฒ.

เดตเดดเดฟเดฏเดฟเตฝ, เดžเด™เตเด™เดณเตเดŸเต† เด‰เตฝเดชเตเดชเดจเตเดจเดคเตเดคเดฟเตฝ เด’เดฐเต เดซเต‹เตพเดกเดฑเตเด‚ เด‡เดฒเตเดฒ ./dags, เดฎเต†เดทเต€เดจเตเด•เตพเด•เตเด•เดฟเดŸเดฏเดฟเตฝ เดธเดฟเตปเด•เตเดฐเตŠเดฃเตˆเดธเต‡เดทเตป เด‡เดฒเตเดฒ - เดŽเดฒเตเดฒเดพ เดกเดพเด—เตเด•เดณเตเด‚ เด•เดฟเดŸเด•เตเด•เตเดจเตเดจเต git เดžเด™เตเด™เดณเตเดŸเต† Gitlab-เตฝ, เด’เดชเตเดชเด‚ Gitlab CI, เดฒเดฏเดฟเดชเตเดชเดฟเด•เตเด•เตเดฎเตเดชเต‹เตพ เดฎเต†เดทเต€เดจเตเด•เดณเดฟเดฒเต‡เด•เตเด•เต เด…เดชเตโ€Œเดกเต‡เดฑเตเดฑเตเด•เตพ เดตเดฟเดคเดฐเดฃเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเต master.

เดชเตเดทเตเดชเดคเตเดคเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเต เด•เตเดฑเดšเตเดšเต

เดคเตŠเดดเดฟเดฒเดพเดณเดฟเด•เตพ เดจเดฎเตเดฎเตเดŸเต† เดชเดธเดฟเดซเดฏเดฑเตเด•เตพ เดคเดฒเตเดฒเดฟเด•เตเด•เตŠเดฒเตเดฒเตเดฎเตเดชเต‹เตพ, เดจเดฎเตเด•เตเด•เต เดŽเดจเตเดคเต†เด™เตเด•เดฟเดฒเตเด‚ เด•เดพเดฃเดฟเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเดจเตเดจ เดฎเดฑเตเดฑเตŠเดฐเต เด‰เดชเด•เดฐเดฃเด‚ เด“เตผเด•เตเด•เดพเด‚ - เดชเตเดทเตเดชเด‚.

เดตเตผเด•เตเด•เตผ เดจเต‹เดกเตเด•เดณเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเตเดณเตเดณ เดธเด‚เด—เตเดฐเดน เดตเดฟเดตเดฐเด™เตเด™เดณเตเดณเตเดณ เด†เดฆเตเดฏ เดชเต‡เดœเต:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดœเต‹เดฒเดฟเด•เตเด•เต เดชเต‹เดฏ เดŸเดพเดธเตเด•เตเด•เตเด•เดณเตเดณเตเดณ เดเดฑเตเดฑเดตเตเด‚ เดคเต€เดตเตเดฐเดฎเดพเดฏ เดชเต‡เดœเต:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดžเด™เตเด™เดณเตเดŸเต† เดฌเตเดฐเต‹เด•เตเด•เดฑเตเดŸเต† เดชเดฆเดตเดฟเดฏเตเดณเตเดณ เดเดฑเตเดฑเดตเตเด‚ เดตเดฟเดฐเดธเดฎเดพเดฏ เดชเต‡เดœเต:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดŸเดพเดธเตโ€Œเด•เต เดธเตเดฑเตเดฑเดพเดฑเตเดฑเดธเต เด—เตเดฐเดพเดซเตเด•เดณเตเด‚ เด…เดตเดฏเตเดŸเต† เดจเดฟเตผเดตเตเดตเดนเดฃ เดธเดฎเดฏเดตเตเดฎเดพเดฃเต เดเดฑเตเดฑเดตเตเด‚ เดคเดฟเดณเด•เตเด•เดฎเตเดณเตเดณ เดชเต‡เดœเต:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดžเด™เตเด™เตพ เด…เดฃเตเดŸเตผเดฒเต‹เดกเดกเต เดฒเต‹เดกเต เดšเต†เดฏเตเดฏเตเดจเตเดจเต

เด…เดคเดฟเดจเดพเตฝ, เดŽเดฒเตเดฒเดพ เดœเต‹เดฒเดฟเด•เดณเตเด‚ เดชเต‚เตผเดคเตเดคเดฟเดฏเดพเดฏเดฟ, เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดชเดฐเดฟเด•เตเด•เต‡เดฑเตเดฑเดตเดฐเต† เด•เตŠเดฃเตเดŸเตเดชเต‹เด•เดพเตป เด•เดดเดฟเดฏเตเด‚.

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เด•เต‚เดŸเดพเดคเต† เดจเดฟเดฐเดตเดงเดฟ เดชเต‡เตผเด•เตเด•เต เดชเดฐเดฟเด•เตเด•เต‡เดฑเตเดฑเต - เด’เดฐเต เด•เดพเดฐเดฃเดคเตเดคเดพเดฒเต‹ เดฎเดฑเตเดฑเตŠเดฐเต เด•เดพเดฐเดฃเดคเตเดคเดพเดฒเต‹. เดŽเดฏเตผเดซเตเดฒเต‹เดฏเตเดŸเต† เดถเดฐเดฟเดฏเดพเดฏ เด‰เดชเดฏเต‹เด—เดคเตเดคเดฟเดจเตเดฑเต† เด•เดพเดฐเตเดฏเดคเตเดคเดฟเตฝ, เดกเดพเดฑเตเดฑ เดคเต€เตผเดšเตเดšเดฏเดพเดฏเตเด‚ เดŽเดคเตเดคเดฟเดฏเดฟเดŸเตเดŸเดฟเดฒเตเดฒเต†เดจเตเดจเต เด‡เดคเต‡ เดธเตเด•เตเดตเดฏเดฑเตเด•เตพ เดธเต‚เดšเดฟเดชเตเดชเดฟเด•เตเด•เตเดจเตเดจเต.

เดจเดฟเด™เตเด™เตพ เดฒเต‹เด—เต เด•เดพเดฃเตเด•เดฏเตเด‚ เดตเต€เดฃเตเดชเต‹เดฏ เดŸเดพเดธเตโ€Œเด•เต เดธเดจเตเดฆเตผเดญเด™เตเด™เตพ เดชเตเดจเดฐเดพเดฐเด‚เดญเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดตเต‡เดฃเด‚.

เดเดคเต†เด™เตเด•เดฟเดฒเตเด‚ เดšเดคเตเดฐเดคเตเดคเดฟเตฝ เด•เตเดฒเดฟเด•เตเด•เตเดšเต†เดฏเตเดฏเตเดจเตเดจเดคเดฟเดฒเต‚เดŸเต†, เดžเด™เตเด™เตพเด•เตเด•เต เดฒเดญเตเดฏเดฎเดพเดฏ เดชเตเดฐเดตเตผเดคเตเดคเดจเด™เตเด™เตพ เดžเด™เตเด™เตพ เด•เดพเดฃเตเด‚:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดŽเดŸเตเดคเตเดคเต เดตเต€เดฃเดคเต เด•เตเดฒเดฟเดฏเตผ เดšเต†เดฏเตเดฏเดพเด‚. เด…เดคเดพเดฏเดคเต, เด…เดตเดฟเดŸเต† เดŽเดจเตเดคเต†เด™เตเด•เดฟเดฒเตเด‚ เดชเดฐเดพเดœเดฏเดชเตเดชเต†เดŸเตเดŸเตเดตเต†เดจเตเดจเต เดžเด™เตเด™เตพ เดฎเดฑเด•เตเด•เตเดจเตเดจเต, เด…เดคเต‡ เด‡เตปเดธเตโ€Œเดฑเตเดฑเตปเดธเต เดŸเดพเดธเตโ€Œเด•เต เดทเต†เดกเตเดฏเต‚เดณเดฑเดฟเดฒเต‡เด•เตเด•เต เดชเต‹เด•เตเด‚.

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดŽเดฒเตเดฒเดพ เดšเตเดตเดจเตเดจ เดšเดคเตเดฐเด™เตเด™เดณเตเดฎเตเดณเตเดณ เดฎเต—เดธเต เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เด‡เดคเต เดšเต†เดฏเตเดฏเตเดจเตเดจเดคเต เดตเดณเดฐเต† เดฎเดพเดจเตเดทเดฟเด•เดฎเดฒเตเดฒเต†เดจเตเดจเต เดตเตเดฏเด•เตเดคเดฎเดพเดฃเต - เด‡เดคเต เดŽเดฏเตผเดซเตเดฒเต‹เดฏเดฟเตฝ เดจเดฟเดจเตเดจเต เดžเด™เตเด™เตพ เดชเตเดฐเดคเต€เด•เตเดทเดฟเด•เตเด•เตเดจเตเดจเดฟเดฒเตเดฒ. เดธเตเดตเดพเดญเดพเดตเดฟเด•เดฎเดพเดฏเตเด‚, เดจเดฎเตเด•เตเด•เต เด•เต‚เดŸเตเดŸ เดจเดถเต€เด•เดฐเดฃ เด†เดฏเตเดงเด™เตเด™เดณเตเดฃเตเดŸเต: Browse/Task Instances

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดจเดฎเตเด•เตเด•เต เดŽเดฒเตเดฒเดพเด‚ เด’เดฑเตเดฑเดฏเดŸเดฟเด•เตเด•เต เดคเดฟเดฐเดžเตเดžเต†เดŸเตเดคเตเดคเต เดชเต‚เดœเตเดฏเดคเตเดคเดฟเดฒเต‡เด•เตเด•เต เดชเตเดจเดƒเดธเดœเตเดœเดฎเดพเด•เตเด•เดพเด‚, เดถเดฐเดฟเดฏเดพเดฏ เด‡เดจเดคเตเดคเดฟเตฝ เด•เตเดฒเดฟเด•เตเด•เตเดšเต†เดฏเตเดฏเตเด•:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดตเตƒเดคเตเดคเดฟเดฏเดพเด•เตเด•เดฟเดฏ เดถเต‡เดทเด‚, เดžเด™เตเด™เดณเตเดŸเต† เดŸเดพเด•เตเดธเดฟเด•เตพ เด‡เดคเตเดชเต‹เดฒเต† เด•เดพเดฃเดชเตเดชเต†เดŸเตเดจเตเดจเต (เด…เดตเตผ เดทเต†เดกเตเดฏเต‚เดณเตผ เดทเต†เดกเตเดฏเต‚เตพ เดšเต†เดฏเตเดฏเตเดจเตเดจเดคเดฟเดจเดพเดฏเดฟ เด•เดพเดคเตเดคเดฟเดฐเดฟเด•เตเด•เตเด•เดฏเดพเดฃเต):

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เด•เดฃเด•เตเดทเดจเตเด•เตพ, เดนเตเด•เตเด•เตเด•เตพ, เดฎเดฑเตเดฑเต เดตเต‡เดฐเดฟเดฏเดฌเดฟเดณเตเด•เตพ

เด…เดŸเตเดคเตเดค DAG เดจเต‹เด•เตเด•เดพเดจเตเดณเตเดณ เดธเดฎเดฏเดฎเดพเดฃเดฟเดคเต, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""ะ“ะพัะฟะพะดะฐ ั…ะพั€ะพัˆะธะต, ะพั‚ั‡ะตั‚ั‹ ะพะฑะฝะพะฒะปะตะฝั‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ะะฐั‚ะฐัˆ, ะฟั€ะพัั‹ะฟะฐะนัั, ะผั‹ {{ dag.dag_id }} ัƒั€ะพะฝะธะปะธ
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

เดŽเดฒเตเดฒเดพเดตเดฐเตเด‚ เดŽเดชเตเดชเต‹เดดเต†เด™เตเด•เดฟเดฒเตเด‚ เด’เดฐเต เดฑเดฟเดชเตเดชเต‹เตผเดŸเตเดŸเต เด…เดชเตเดกเต‡เดฑเตเดฑเต เดšเต†เดฏเตเดคเดฟเดŸเตเดŸเตเดฃเตเดŸเต‹? เด‡เดคเต เดตเต€เดฃเตเดŸเตเด‚ เด…เดตเดณเดพเดฃเต: เดกเดพเดฑเตเดฑ เดŽเดตเดฟเดŸเต† เดจเดฟเดจเตเดจเต เดฒเดญเดฟเด•เตเด•เตเด‚ เดŽเดจเตเดจเดคเดฟเดจเตเดฑเต† เด‰เดฑเดตเดฟเดŸเด™เตเด™เดณเตเดŸเต† เด’เดฐเต เดฒเดฟเดธเตเดฑเตเดฑเต เด‰เดฃเตเดŸเต; เดŽเดตเดฟเดŸเต† เดตเดฏเตเด•เตเด•เดฃเดฎเต†เดจเตเดจเต เด’เดฐเต เดฒเดฟเดธเตเดฑเตเดฑเต เด‰เดฃเตเดŸเต; เดŽเดฒเตเดฒเดพเด‚ เดธเด‚เดญเดตเดฟเด•เตเด•เตเดฎเตเดชเต‹เดดเต‹ เดคเด•เดฐเตเดฎเตเดชเต‹เดดเต‹ เดนเต‹เตบ เดšเต†เดฏเตเดฏเดพเตป เดฎเดฑเด•เตเด•เดฐเตเดคเต (เดถเดฐเดฟ, เด‡เดคเต เดžเด™เตเด™เดณเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเดฒเตเดฒ, เด‡เดฒเตเดฒ).

เดจเดฎเตเด•เตเด•เต เดตเต€เดฃเตเดŸเตเด‚ เดซเดฏเดฒเดฟเดฒเต‚เดŸเต† เดชเต‹เดฏเดฟ เดชเตเดคเดฟเดฏ เด…เดตเตเดฏเด•เตเดคเดฎเดพเดฏ เด•เดพเดฐเตเดฏเด™เตเด™เตพ เดจเต‹เด•เตเด•เดพเด‚:

  • from commons.operators import TelegramBotSendMessage - เด…เตบเดฌเตเดฒเต‹เด•เตเด•เต เดšเต†เดฏเตเดคเดคเดฟเดฒเต‡เด•เตเด•เต เดธเดจเตเดฆเต‡เดถเด™เตเด™เตพ เด…เดฏเดฏเตโ€Œเด•เตเด•เตเดจเตเดจเดคเดฟเดจเต เด’เดฐเต เดšเต†เดฑเดฟเดฏ เดฑเดพเดชเตเดชเตผ เดจเดฟเตผเดฎเตเดฎเดฟเดšเตเดšเตเด•เตŠเดฃเตเดŸเต เดžเด™เตเด™เตพ เดชเตเดฐเดฏเต‹เดœเดจเดชเตเดชเต†เดŸเตเดคเตเดคเดฟเดฏ เดžเด™เตเด™เดณเตเดŸเต† เดธเตเดตเดจเตเดคเด‚ เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผเดฎเดพเดฐเต† เด‰เดฃเตเดŸเดพเด•เตเด•เตเดจเตเดจเดคเดฟเตฝ เดจเดฟเดจเตเดจเต เด’เดจเตเดจเตเด‚ เดžเด™เตเด™เดณเต† เดคเดŸเดฏเตเดจเตเดจเดฟเดฒเตเดฒ. (เดˆ เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเดฑเต† เด•เตเดฑเดฟเดšเตเดšเต เดžเด™เตเด™เตพ เดคเดพเดดเต† เดธเด‚เดธเดพเดฐเดฟเด•เตเด•เตเด‚);
  • default_args={} - dag-เดจเต เด…เดคเดฟเดจเตเดฑเต† เดŽเดฒเตเดฒเดพ เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผเดฎเดพเตผเด•เตเด•เตเด‚ เด’เดฐเต‡ เด†เตผเด—เตเดฏเตเดฎเต†เดจเตเดฑเตเด•เตพ เดตเดฟเดคเดฐเดฃเด‚ เดšเต†เดฏเตเดฏเดพเตป เด•เดดเดฟเดฏเตเด‚;
  • to='{{ var.value.all_the_kings_men }}' - เดซเต€เตฝเดกเต to เดžเด™เตเด™เตพ เดนเดพเตผเดกเตโ€Œเด•เต‹เดกเต เดšเต†เดฏเตเดฏเดฟเดฒเตเดฒ, เดชเด•เตเดทเต‡ เดœเดฟเตปเดœเดฏเตเด‚ เด‡เดฎเต†เดฏเดฟเดฒเตเด•เดณเตเดŸเต† เด’เดฐเต เดฒเดฟเดธเตเดฑเตเดฑเต เด‰เดณเตเดณ เด’เดฐเต เดตเต‡เดฐเดฟเดฏเดฌเดฟเดณเตเด‚ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดšเดฒเดจเดพเดคเตเดฎเด•เดฎเดพเดฏเดฟ เดœเดจเดฑเต‡เดฑเตเดฑเต เดšเต†เดฏเตโ€Œเดคเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต, เด…เดคเต เดžเดพเตป เดถเตเดฐเดฆเตเดงเดพเดชเต‚เตผเดตเตเดตเด‚ เด‡เดŸเตเดŸเต Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผ เด†เดฐเด‚เดญเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเดณเตเดณ เดตเตเดฏเดตเดธเตเดฅ. เดžเด™เตเด™เดณเตเดŸเต† เด•เดพเดฐเตเดฏเดคเตเดคเดฟเตฝ, เดŽเดฒเตเดฒเดพ เด†เดถเตเดฐเดฟเดคเดคเตเดตเด™เตเด™เดณเตเด‚ เดชเตเดฐเดตเตผเดคเตเดคเดฟเดšเตเดšเดพเตฝ เดฎเดพเดคเตเดฐเดฎเต‡ เด•เดคเตเดคเต เดฎเต‡เดฒเดงเดฟเด•เดพเดฐเดฟเด•เตพเด•เตเด•เต เดชเดฑเด•เตเด•เตเด‚ เดตเดฟเดœเดฏเด•เดฐเดฎเดพเดฏเดฟ;
  • tg_bot_conn_id='tg_main' - เดตเดพเดฆเด™เตเด™เตพ conn_id เดžเด™เตเด™เตพ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจ เด•เดฃเด•เตเดทเตป เดเดกเดฟเด•เตพ เดธเตเดตเต€เด•เดฐเดฟเด•เตเด•เตเด• Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - เดตเต€เดฃเตเดชเต‹เดฏ เดŸเดพเดธเตโ€Œเด•เตเด•เตเด•เตพ เด‰เดฃเตเดŸเต†เด™เตเด•เดฟเตฝ เดฎเดพเดคเตเดฐเดฎเต‡ เดŸเต†เดฒเดฟเด—เตเดฐเดพเดฎเดฟเดฒเต† เดธเดจเตเดฆเต‡เดถเด™เตเด™เตพ เดชเดฑเดจเตเดจเตเดชเต‹เด•เต‚;
  • task_concurrency=1 - เด’เดฐเต เดŸเดพเดธเตโ€Œเด•เตเด•เดฟเดจเตเดฑเต† เดจเดฟเดฐเดตเดงเดฟ เดŸเดพเดธเตโ€Œเด•เต เดธเดจเตเดฆเตผเดญเด™เตเด™เตพ เด’เดฐเต‡เดธเดฎเดฏเด‚ เดธเดฎเดพเดฐเด‚เดญเดฟเด•เตเด•เตเดจเตเดจเดคเต เดžเด™เตเด™เตพ เดจเดฟเดฐเต‹เดงเดฟเด•เตเด•เตเดจเตเดจเต. เด…เดฒเตเดฒเดพเดคเตเดคเดชเด•เตเดทเด‚, เดจเดฎเตเด•เตเด•เต เดชเดฒเดคเดฟเดจเตเดฑเต†เดฏเตเด‚ เด’เดฐเต‡เดธเดฎเดฏเด‚ เดฒเต‹เดžเตเดšเต เดฒเดญเดฟเด•เตเด•เตเด‚ VerticaOperator (เด’เดฐเต เดฎเต‡เดถเดฏเดฟเดฒเต‡เด•เตเด•เต เดจเต‹เด•เตเด•เตเดจเตเดจเต);
  • report_update >> [email, tg] - เดŽเดฒเตเดฒเดพเด‚ VerticaOperator เด‡เดคเตเดชเต‹เดฒเตเดณเตเดณ เด…เด•เตเดทเดฐเด™เตเด™เดณเตเด‚ เดธเดจเตเดฆเต‡เดถเด™เตเด™เดณเตเด‚ เด…เดฏเดฏเตเด•เตเด•เตเดจเตเดจเดคเดฟเตฝ เด’เดคเตเดคเตเดšเต‡เดฐเตเด•:
    เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

    เดŽเดจเตเดจเดพเตฝ เดจเต‹เดŸเตเดŸเดฟเดซเดฏเตผ เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผเดฎเดพเตผเด•เตเด•เต เดตเตเดฏเดคเตเดฏเดธเตโ€Œเดค เดฒเต‹เดžเตเดšเต เดตเตเดฏเดตเดธเตเดฅเด•เตพ เด‰เดณเตเดณเดคเดฟเดจเดพเตฝ, เด’เดฐเต†เดฃเตเดฃเด‚ เดฎเดพเดคเตเดฐเดฎเต‡ เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เต‚. เดŸเตเดฐเต€ เดตเตเดฏเต‚เดตเดฟเตฝ, เดŽเดฒเตเดฒเดพเด‚ เด•เตเดฑเดšเตเดšเตเด•เต‚เดŸเดฟ เดฆเตƒเดถเตเดฏเดชเดฐเดฎเดพเดฏเดฟ เด•เดพเดฃเดชเตเดชเต†เดŸเตเดจเตเดจเต:
    เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดžเดพเตป เด‡เดคเดฟเดจเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเต เด•เตเดฑเดšเตเดšเต เดตเดพเด•เตเด•เตเด•เตพ เดชเดฑเดฏเตเด‚ เดฎเดพเด•เตเดฐเต‹เด•เตพ เด…เดตเดฐเตเดŸเต† เดธเตเดนเตƒเดคเตเดคเตเด•เตเด•เดณเตเด‚ - เดตเต‡เดฐเดฟเดฏเดฌเดฟเดณเตเด•เตพ.

เดตเดฟเดตเดฟเดง เด‰เดชเดฏเต‹เด—เดชเตเดฐเดฆเดฎเดพเดฏ เดตเดฟเดตเดฐเด™เตเด™เตพ เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผ เด†เตผเด—เตเดฏเตเดฎเต†เดจเตเดฑเตเด•เดณเดฟเดฒเต‡เด•เตเด•เต เดชเด•เดฐเดพเตป เด•เดดเดฟเดฏเตเดจเตเดจ เดœเดฟเดžเตเดš เดชเตเดฒเต†เดฏเตโ€Œเดธเตโ€Œเดนเต‹เตพเดกเดฑเตเด•เดณเดพเดฃเต เดฎเดพเด•เตเดฐเต‹เด•เตพ. เด‰เดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต, เด‡เดคเตเดชเต‹เดฒเต†:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} เดธเดจเตเดฆเตผเดญ เดตเต‡เดฐเดฟเดฏเดฌเดฟเดณเดฟเดจเตเดฑเต† เด‰เดณเตเดณเดŸเด•เตเด•เด™เตเด™เดณเดฟเดฒเต‡เด•เตเด•เต เดตเดฟเด•เดธเดฟเดชเตเดชเดฟเด•เตเด•เตเด‚ execution_date เดซเต‹เตผเดฎเดพเดฑเตเดฑเดฟเตฝ YYYY-MM-DD: 2020-07-14. เดธเดจเตเดฆเตผเดญ เดตเต‡เดฐเดฟเดฏเดฌเดฟเดณเตเด•เตพ เด’เดฐเต เดจเดฟเตผเดฆเตเดฆเดฟเดทเตโ€ŒเดŸ เดŸเดพเดธเตโ€Œเด•เต เด‡เตปเดธเตโ€Œเดฑเตเดฑเตปเดธเดฟเดฒเต‡เด•เตเด•เต (เดŸเตเดฐเต€ เดตเตเดฏเต‚เดตเดฟเดฒเต† เด’เดฐเต เดธเตโ€Œเด•เตเดตเดฏเตผ) เดจเต†เดฏเดฟเตฝ เดšเต†เดฏเตโ€Œเดคเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต เดŽเดจเตเดจเดคเดพเดฃเต เดเดฑเตเดฑเดตเตเด‚ เดจเดฒเตเดฒ เดญเดพเด—เด‚, เดชเตเดจเดฐเดพเดฐเด‚เดญเดฟเด•เตเด•เตเดฎเตเดชเต‹เตพ, เดชเตเดฒเต†เดฏเตโ€Œเดธเตโ€Œเดนเต‹เตพเดกเดฑเตเด•เตพ เด…เดคเต‡ เดฎเต‚เดฒเตเดฏเด™เตเด™เดณเดฟเดฒเต‡เด•เตเด•เต เดตเดฟเด•เดธเดฟเด•เตเด•เตเด‚.

เด“เดฐเต‹ เดŸเดพเดธเตโ€Œเด•เต เด‡เตปเดธเตโ€Œเดฑเตเดฑเตปเดธเดฟเดฒเตเด‚ เดฑเต†เตปเดกเตผ เดšเต†เดฏเตโ€Œเดค เดฌเดŸเตเดŸเตบ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดจเดฟเดฏเตเด•เตเดค เดฎเต‚เดฒเตเดฏเด™เตเด™เตพ เด•เดพเดฃเดพเตป เด•เดดเดฟเดฏเตเด‚. เด’เดฐเต เด•เดคเตเดคเต เด…เดฏเดฏเตเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเดณเตเดณ เดšเตเดฎเดคเดฒ เด‡เด™เตเด™เดจเต†เดฏเดพเดฃเต:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เด…เด™เตเด™เดจเต† เด’เดฐเต เดธเดจเตเดฆเต‡เดถเด‚ เด…เดฏเด•เตเด•เตเดจเตเดจ เดšเตเดฎเดคเดฒเดฏเดฟเตฝ:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดฒเดญเตเดฏเดฎเดพเดฏ เดเดฑเตเดฑเดตเตเด‚ เดชเตเดคเดฟเดฏ เดชเดคเดฟเดชเตเดชเดฟเดจเดพเดฏเตเดณเตเดณ เดฌเดฟเตฝเดฑเตเดฑเต-เด‡เตป เดฎเดพเด•เตเดฐเต‹เด•เดณเตเดŸเต† เดชเต‚เตผเดฃเตเดฃเดฎเดพเดฏ เดฒเดฟเดธเตเดฑเตเดฑเต เด‡เดตเดฟเดŸเต† เดฒเดญเตเดฏเดฎเดพเดฃเต: เดฎเดพเด•เตเดฐเต‹ เดฑเดซเดฑเตปเดธเต

เดฎเดพเดคเตเดฐเดฎเดฒเตเดฒ, เดชเตเดฒเด—เดฟเดจเตเด•เดณเตเดŸเต† เดธเดนเดพเดฏเดคเตเดคเต‹เดŸเต†, เดจเดฎเตเด•เตเด•เต เดจเดฎเตเดฎเตเดŸเต† เดธเตเดตเดจเตเดคเด‚ เดฎเดพเด•เตเดฐเต‹เด•เตพ เดชเตเดฐเด–เตเดฏเดพเดชเดฟเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚, เดชเด•เตเดทเต‡ เด…เดคเต เดฎเดฑเตเดฑเตŠเดฐเต เด•เดฅเดฏเดพเดฃเต.

เดฎเตเตปเด•เต‚เดŸเตเดŸเดฟ เดจเดฟเดถเตเดšเดฏเดฟเดšเตเดš เด•เดพเดฐเตเดฏเด™เตเด™เตพเด•เตเด•เต เดชเตเดฑเดฎเต‡, เดจเดฎเตเด•เตเด•เต เดจเดฎเตเดฎเตเดŸเต† เดตเต‡เดฐเดฟเดฏเดฌเดฟเดณเตเด•เดณเตเดŸเต† เดฎเต‚เดฒเตเดฏเด™เตเด™เตพ เดฎเดพเดฑเตเดฑเดฟเดธเตเดฅเดพเดชเดฟเด•เตเด•เดพเด‚ (เดฎเตเด•เดณเดฟเดฒเตเดณเตเดณ เด•เต‹เดกเดฟเตฝ เดžเดพเตป เด‡เดคเต เด‡เดคเดฟเดจเด•เด‚ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต). เดจเดฎเตเด•เตเด•เต เดธเตƒเดทเตเดŸเดฟเด•เตเด•เดพเด‚ Admin/Variables เด’เดจเตเดจเตเดฐเดฃเตเดŸเต เด•เดพเดฐเตเดฏเด™เตเด™เตพ:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเดจเตเดจ เดŽเดฒเตเดฒเดพเด‚:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

เดฎเต‚เดฒเตเดฏเด‚ เด’เดฐเต เดธเตเด•เต†เดฏเดฟเดฒเตผ เด†เด•เดพเด‚, เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ เด…เดคเต JSON เด†เด•เดพเด‚. JSON-เดจเตเดฑเต† เด•เดพเดฐเตเดฏเดคเตเดคเดฟเตฝ:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

เด†เดตเดถเตเดฏเดฎเตเดณเตเดณ เด•เต€เดฏเดฟเดฒเต‡เด•เตเด•เตเดณเตเดณ เดชเดพเดค เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเด•: {{ var.json.bot_config.bot.token }}.

เดžเดพเตป เด…เด•เตเดทเดฐเดพเตผเดคเตเดฅเดคเตเดคเดฟเตฝ เด’เดฐเต เดตเดพเด•เตเด•เต เดชเดฑเดฏเตเด•เดฏเตเด‚ เด’เดฐเต เดธเตเด•เตเดฐเต€เตปเดทเต‹เดŸเตเดŸเต เด•เดพเดฃเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเด‚ ัะพะตะดะธะฝะตะฝะธั. เดŽเดฒเตเดฒเดพเด‚ เด‡เดตเดฟเดŸเต† เดชเตเดฐเดพเดฅเดฎเดฟเด•เดฎเดพเดฃเต: เดชเต‡เดœเดฟเตฝ Admin/Connections เดžเด™เตเด™เตพ เด’เดฐเต เด•เดฃเด•เตเดทเตป เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต, เด…เดตเดฟเดŸเต† เดžเด™เตเด™เดณเตเดŸเต† เดฒเต‹เด—เดฟเดจเตเด•เตพ / เดชเดพเดธเตโ€Œเดตเต‡เดกเตเด•เตพ, เด•เต‚เดŸเตเดคเตฝ เดจเดฟเตผเดฆเตเดฆเดฟเดทเตเดŸ เดชเดพเดฐเดพเดฎเต€เดฑเตเดฑเดฑเตเด•เตพ เดŽเดจเตเดจเดฟเดต เดšเต‡เตผเด•เตเด•เตเด•. เด‡เดคเตเดชเต‡เดพเดฒเต†:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดชเดพเดธเตโ€Œเดตเต‡เดกเตเด•เตพ เดŽเตปเด•เตเดฐเดฟเดชเตเดฑเตเดฑเต เดšเต†เดฏเตเดฏเดพเตป เด•เดดเดฟเดฏเตเด‚ (เดกเดฟเดซเต‹เตพเดŸเตเดŸเดฟเดจเต†เด•เตเด•เดพเตพ เด•เต‚เดŸเตเดคเตฝ เดจเดจเตเดจเดพเดฏเดฟ), เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด•เดฃเด•เตเดทเตป เดคเดฐเด‚ เด‰เดชเต‡เด•เตเดทเดฟเด•เตเด•เดพเด‚ (เดžเดพเตป เดšเต†เดฏเตเดคเดคเตเดชเต‹เดฒเต† tg_main) - เดŽเดฏเตผเดซเตเดฒเต‹ เดฎเต‹เดกเดฒเตเด•เดณเดฟเตฝ เดคเดฐเด™เตเด™เดณเตเดŸเต† เดฒเดฟเดธเตเดฑเตเดฑเต เดนเดพเตผเดกเต เดตเดฏเตผเดกเต เด†เดฏเดคเดฟเดจเดพเตฝ เดธเต‹เดดเตเดธเต เด•เต‹เดกเตเด•เดณเดฟเดฒเต‡เด•เตเด•เต เด•เดŸเด•เตเด•เดพเดคเต† เดตเดฟเดชเตเดฒเต€เด•เดฐเดฟเด•เตเด•เดพเตป เด•เดดเดฟเดฏเดฟเดฒเตเดฒ เดŽเดจเตเดจเดคเดพเดฃเต เดตเดธเตเดคเตเดค (เดžเดพเตป เดชเต†เดŸเตเดŸเต†เดจเตเดจเต เดŽเดจเตเดคเต†เด™เตเด•เดฟเดฒเตเด‚ เด—เต‚เด—เดฟเตพ เดšเต†เดฏเตเดคเดฟเดฒเตเดฒเต†เด™เตเด•เดฟเตฝ, เดฆเดฏเดตเดพเดฏเดฟ เดŽเดจเตเดจเต† เดถเดฐเดฟเดฏเดพเด•เตเด•เตเด•), เดŽเดจเตเดจเดพเตฝ เด•เตเดฐเต†เดกเดฟเดฑเตเดฑเตเด•เตพ เดฒเดญเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเตฝ เดจเดฟเดจเตเดจเต เด’เดจเตเดจเตเด‚ เดžเด™เตเด™เดณเต† เดคเดŸเดฏเดฟเดฒเตเดฒ เดชเต‡เดฐเต.

เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต‡ เดชเต‡เดฐเดฟเตฝ เดจเดฟเดฐเดตเดงเดฟ เด•เดฃเด•เตเดทเดจเตเด•เตพ เด‰เดฃเตเดŸเดพเด•เตเด•เดพเด‚: เดˆ เดธเดพเดนเดšเดฐเตเดฏเดคเตเดคเดฟเตฝ, เดฐเต€เดคเดฟ BaseHook.get_connection(), เดจเดฎเตเด•เตเด•เต เดชเต‡เดฐเตเด•เตพ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เด•เดฃเด•เตเดทเดจเตเด•เตพ เดฒเดญเดฟเด•เตเด•เตเดจเตเดจเดคเต, เดคเดฐเตเด‚ เด•เตเดฐเดฎเดฐเดนเดฟเดคเดฎเดพเดฏ เดจเดฟเดฐเดตเดงเดฟ เดชเต‡เดฐเตเด•เดณเดฟเตฝ เดจเดฟเดจเตเดจเต (เดฑเต—เดฃเตเดŸเต เดฑเต‹เดฌเดฟเตป เดจเดฟเตผเดฎเตเดฎเดฟเด•เตเด•เตเดจเตเดจเดคเต เด•เต‚เดŸเตเดคเตฝ เดฏเตเด•เตเดคเดฟเดธเดนเดฎเดพเดฏเดฟเดฐเดฟเด•เตเด•เตเด‚, เดชเด•เตเดทเต‡ เดจเดฎเตเด•เตเด•เต เด…เดคเต เดŽเดฏเตผเดซเตเดฒเต‹ เดกเต†เดตเดฒเดชเตเดชเตผเดฎเดพเดฐเตเดŸเต† เดฎเดจเดธเตเดธเดพเด•เตเดทเดฟเด•เตเด•เต เดตเดฟเดŸเดพเด‚).

เดตเต‡เดฐเดฟเดฏเดฌเดฟเดณเตเด•เดณเตเด‚ เด•เดฃเด•เตเดทเดจเตเด•เดณเตเด‚ เดคเต€เตผเดšเตเดšเดฏเดพเดฏเตเด‚ เดฐเดธเด•เดฐเดฎเดพเดฏ เด‰เดชเด•เดฐเดฃเด™เตเด™เดณเดพเดฃเต, เดŽเดจเตเดจเดพเตฝ เดฌเดพเดฒเตปเดธเต เดจเดทเตโ€ŒเดŸเดชเตเดชเต†เดŸเดพเดคเดฟเดฐเดฟเด•เตเด•เต‡เดฃเตเดŸเดคเต เดชเตเดฐเดงเดพเดจเดฎเดพเดฃเต: เดจเดฟเด™เตเด™เดณเตเดŸเต† เดซเตเดฒเต‹เด•เดณเตเดŸเต† เดเดคเตŠเด•เตเด•เต† เดญเดพเด—เด™เตเด™เตพ เด•เต‹เดกเดฟเตฝ เดคเดจเตเดจเต† เดธเด‚เดญเดฐเดฟเด•เตเด•เตเดจเตเดจเต, เดธเด‚เดญเดฐเดฃเดคเตเดคเดฟเดจเดพเดฏเดฟ เดŽเดฏเตผเดซเตเดฒเต‹เดฏเตโ€Œเด•เตเด•เต เดเดคเตŠเด•เตเด•เต† เดญเดพเด—เด™เตเด™เตพ เดจเตฝเด•เตเดจเตเดจเต. เด’เดฐเต เดตเดถเดคเตเดคเต, เดฎเต‚เดฒเตเดฏเด‚ เดตเต‡เด—เดคเตเดคเดฟเตฝ เดฎเดพเดฑเตเดฑเตเดจเตเดจเดคเต เดธเต—เด•เดฐเตเดฏเดชเตเดฐเดฆเดฎเดพเดฏเดฟเดฐเดฟเด•เตเด•เตเด‚, เด‰เดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต, เด’เดฐเต เดฎเต†เดฏเดฟเดฒเดฟเด‚เด—เต เดฌเต‹เด•เตเดธเต, เดฏเตเด เดตเดดเดฟ. เดฎเดฑเตเดตเดถเดคเตเดคเต, เด‡เดคเต เด‡เดชเตเดชเต‹เดดเตเด‚ เดฎเต—เดธเต เด•เตเดฒเดฟเด•เตเด•เดฟเดฒเต‡เด•เตเด•เตเดณเตเดณ เดฎเดŸเด•เตเด•เดฎเดพเดฃเต, เด…เดคเดฟเตฝ เดจเดฟเดจเตเดจเต เดžเด™เตเด™เตพ (เดžเดพเตป) เดฐเด•เตเดทเดชเตเดชเต†เดŸเดพเตป เด†เด—เตเดฐเดนเดฟเดšเตเดšเต.

เด•เดฃเด•เตเดทเดจเตเด•เตพเด•เตเด•เตŠเดชเตเดชเด‚ เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเดจเตเดจเดคเต เดšเตเดฎเดคเดฒเด•เดณเดฟเตฝ เด’เดจเตเดจเดพเดฃเต เด•เตŠเดณเตเดคเตเดคเตเด•เตพ. เดชเตŠเดคเตเดตเต‡, เดŽเดฏเตผเดซเตเดฒเต‹ เดนเตเด•เตเด•เตเด•เตพ เดฎเต‚เดจเตเดจเดพเด‚ เด•เด•เตเดทเดฟ เดธเต‡เดตเดจเด™เตเด™เดณเดฟเดฒเต‡เด•เตเด•เตเด‚ เดฒเตˆเดฌเตเดฐเดฑเดฟเด•เดณเดฟเดฒเต‡เด•เตเด•เตเด‚ เดฌเดจเตเดงเดฟเดชเตเดชเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเดณเตเดณ เดชเต‹เดฏเดฟเดจเตเดฑเตเด•เดณเดพเดฃเต. เด‰เดฆเดพ, JiraHook เดœเดฟเดฑเดฏเตเดฎเดพเดฏเดฟ เด‡เดŸเดชเดดเด•เดพเตป เดžเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต เด•เตเดฒเดฏเดจเตเดฑเต เดคเตเดฑเด•เตเด•เตเด‚ (เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดŸเดพเดธเตโ€Œเด•เตเด•เตเด•เตพ เด…เด™เตเด™เต‹เดŸเตเดŸเตเด‚ เด‡เด™เตเด™เต‹เดŸเตเดŸเตเด‚ เดจเต€เด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚), เด•เต‚เดŸเดพเดคเต† SambaHook เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต เดฒเต‹เด•เตเด•เตฝ เดซเดฏเตฝ เดชเตเดทเต เดšเต†เดฏเตเดฏเดพเด‚ smb-เดชเต‹เดฏเดฟเดจเตเดฑเต.

เด‡เดทเตโ€ŒเดŸเดพเดจเตเดธเตƒเดค เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเดฑเต† เดชเดพเดดเตโ€Œเดธเต เดšเต†เดฏเตเดฏเตเดจเตเดจเต

เด…เดคเต เดŽเด™เตเด™เดจเต† เด‰เดฃเตเดŸเดพเด•เตเด•เตเดจเตเดจเต เดŽเดจเตเดจเต เดจเต‹เด•เตเด•เดพเตป เดžเด™เตเด™เตพ เด…เดŸเตเดคเตเดคเต TelegramBotSendMessage

เด•เต‡เดพเดกเต commons/operators.py เดฏเดฅเดพเตผเดคเตเดฅ เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผเด•เตเด•เตŠเดชเตเดชเด‚:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

เด‡เดตเดฟเดŸเต†, เดŽเดฏเตผเดซเตเดฒเต‹เดฏเดฟเดฒเต† เดฎเดฑเตเดฑเต†เดฒเตเดฒเดพ เด•เดพเดฐเตเดฏเด™เตเด™เดณเตเด‚ เดชเต‹เดฒเต†, เดŽเดฒเตเดฒเดพเด‚ เดตเดณเดฐเต† เดฒเดณเดฟเดคเดฎเดพเดฃเต:

  • เดชเดพเดฐเดฎเตเดชเดฐเตเดฏเดฎเดพเดฏเดฟ เดฒเดญเดฟเดšเตเดšเดคเต BaseOperator, เด‡เดคเต เด•เตเดฑเดšเตเดšเต เดŽเดฏเตผเดซเตเดฒเต‹-เดจเดฟเตผเดฆเตเดฆเดฟเดทเตเดŸ เด•เดพเดฐเตเดฏเด™เตเด™เตพ เดจเดŸเดชเตเดชเดฟเดฒเดพเด•เตเด•เตเดจเตเดจเต (เดจเดฟเด™เตเด™เดณเตเดŸเต† เด’เดดเดฟเดตเตเดธเดฎเดฏเด‚ เดจเต‹เด•เตเด•เตเด•)
  • เดชเตเดฐเด–เตเดฏเดพเดชเดฟเดค เดซเต€เตฝเดกเตเด•เตพ template_fields, เด…เดคเดฟเตฝ เดชเตเดฐเต‹เดธเดธเตเดธเต เดšเต†เดฏเตเดฏเดพเตป เดœเดฟเตปเดœ เดฎเดพเด•เตเดฐเต‹เด•เตพ เดจเต‹เด•เตเด•เตเด‚.
  • เดถเดฐเดฟเดฏเดพเดฏ เดตเดพเดฆเด™เตเด™เตพ เด•เตเดฐเดฎเต€เด•เดฐเดฟเดšเตเดšเต __init__(), เด†เดตเดถเตเดฏเดฎเตเดณเตเดณเดฟเดŸเดคเตเดคเต เดกเดฟเดซเต‹เตพเดŸเตเดŸเตเด•เตพ เดธเดœเตเดœเดฎเดพเด•เตเด•เตเด•.
  • เดชเต‚เตผเดตเตเดตเดฟเด•เดจเตเดฑเต† เดคเตเดŸเด•เตเด•เดคเตเดคเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเตเด‚ เดžเด™เตเด™เตพ เดฎเดฑเดจเตเดจเดฟเดฒเตเดฒ.
  • เด…เดจเตเดฌเดจเตเดง เดนเตเด•เตเด•เต เดคเตเดฑเดจเตเดจเต TelegramBotHookเด…เดคเดฟเตฝ เดจเดฟเดจเตเดจเต เด’เดฐเต เด•เตเดฒเดฏเดจเตเดฑเต เด’เดฌเตเดœเด•เตเดฑเตเดฑเต เดฒเดญเดฟเดšเตเดšเต.
  • เด…เดธเดพเดงเตเดตเดพเด•เตเด•เดชเตเดชเต†เดŸเตเดŸ (เดชเตเดจเตผเดจเดฟเตผเดตเดšเดฟเด•เตเด•เดชเตเดชเต†เดŸเตเดŸ) เดฐเต€เดคเดฟ BaseOperator.execute(), เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผ เดธเดฎเดพเดฐเด‚เดญเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเดณเตเดณ เดธเดฎเดฏเด‚ เดตเดฐเตเดฎเตเดชเต‹เตพ เดเดคเต เดŽเดฏเตผเดซเต‹ เดตเดฒเดฟเด•เตเด•เตเด‚ - เด…เดคเดฟเตฝ เดžเด™เตเด™เตพ เดฒเต‹เด—เดฟเตป เดšเต†เดฏเตเดฏเดพเตป เดฎเดฑเดจเตเดจเต เดชเตเดฐเดงเดพเดจ เดชเตเดฐเดตเตผเดคเตเดคเดจเด‚ เดจเดŸเดชเตเดชเดฟเดฒเดพเด•เตเด•เตเด‚. (เดžเด™เตเด™เตพ เดจเต‡เดฐเดฟเดŸเตเดŸเต เดฒเต‹เด—เดฟเตป เดšเต†เดฏเตเดฏเตเดจเตเดจเต stdout ะธ stderr - เดตเดพเดฏเตเดชเตเดฐเดตเดพเดนเด‚ เดŽเดฒเตเดฒเดพเด‚ เดคเดŸเดธเตเดธเดชเตเดชเต†เดŸเตเดคเตเดคเตเด•เดฏเตเด‚ เดฎเดจเต‹เดนเดฐเดฎเดพเดฏเดฟ เดชเตŠเดคเดฟเดฏเตเด•เดฏเตเด‚ เด†เดตเดถเตเดฏเดฎเตเดณเตเดณเดฟเดŸเดคเตเดคเต เดตเดฟเด˜เดŸเดฟเดชเตเดชเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเด‚.)

เดจเดฎเตเด•เตเด•เต เดŽเดจเตเดคเดพเดฃเต เด‰เดณเตเดณเดคเต†เดจเตเดจเต เดจเต‹เด•เตเด•เดพเด‚ commons/hooks.py. เดซเดฏเดฒเดฟเดจเตเดฑเต† เด†เดฆเตเดฏ เดญเดพเด—เด‚, เดนเตเด•เตเด•เต เดคเดจเตเดจเต†:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

เด‡เดตเดฟเดŸเต† เดŽเดจเตเดคเดพเดฃเต เดตเดฟเดถเดฆเต€เด•เดฐเดฟเด•เตเด•เต‡เดฃเตเดŸเดคเต†เดจเตเดจเต เดŽเดจเดฟเด•เตเด•เดฑเดฟเดฏเดฟเดฒเตเดฒ, เดชเตเดฐเดงเดพเดจเดชเตเดชเต†เดŸเตเดŸ เด•เดพเดฐเตเดฏเด™เตเด™เตพ เดžเดพเตป เดถเตเดฐเดฆเตเดงเดฟเด•เตเด•เตเด‚:

  • เดžเด™เตเด™เตพเด•เตเด•เต เด…เดตเด•เดพเดถเดฎเตเดฃเตเดŸเต, เดตเดพเดฆเด™เตเด™เดณเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเต เดšเดฟเดจเตเดคเดฟเด•เตเด•เตเด• - เดฎเดฟเด•เตเด• เด•เต‡เดธเตเด•เดณเดฟเดฒเตเด‚ เด‡เดคเต เด’เดจเตเดจเดพเดฏเดฟเดฐเดฟเด•เตเด•เตเด‚: conn_id;
  • เดธเตเดฑเตเดฑเดพเตปเดกเต‡เตผเดกเต เดฐเต€เดคเดฟเด•เตพ เดฎเดฑเดฟเด•เดŸเด•เตเด•เตเดจเตเดจเต: เดžเดพเตป เดŽเดจเตเดจเต†เดคเตเดคเดจเตเดจเต† เดชเดฐเดฟเดฎเดฟเดคเดชเตเดชเต†เดŸเตเดคเตเดคเดฟ get_conn(), เด…เดคเดฟเตฝ เดŽเดจเดฟเด•เตเด•เต เด•เดฃเด•เตเดทเตป เดชเดพเดฐเดพเดฎเต€เดฑเตเดฑเดฑเตเด•เตพ เดชเต‡เดฐเต เดชเตเดฐเด•เดพเดฐเด‚ เดฒเดญเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดตเดฟเดญเดพเด—เด‚ เดจเต‡เดŸเตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเต extra (เด‡เดคเตŠเดฐเต JSON เดซเต€เตฝเดกเดพเดฃเต), เด…เดคเดฟเตฝ เดžเดพเตป (เดŽเดจเตเดฑเต† เดธเตเดตเดจเตเดคเด‚ เดจเดฟเตผเดฆเตเดฆเต‡เดถเด™เตเด™เตพ เด…เดจเตเดธเดฐเดฟเดšเตเดšเต!) เดŸเต†เดฒเดฟเด—เตเดฐเดพเด‚ เดฌเต‹เดŸเตเดŸเต เดŸเต‹เด•เตเด•เตบ เด‡เดŸเตเดŸเต: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • เดžเดพเตป เดžเด™เตเด™เดณเตเดŸเต† เด’เดฐเต เด‰เดฆเดพเดนเดฐเดฃเด‚ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต TelegramBot, เด…เดคเดฟเดจเต เด’เดฐเต เดชเตเดฐเดคเตเดฏเต‡เด• เดŸเต‹เด•เตเด•เตบ เดจเตฝเด•เตเดจเตเดจเต.

เด…เดคเตเดฐเดฏเต‡เดฏเตเดณเตเดณเต‚. เด’เดฐเต เดนเตเด•เตเด•เต เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต เด•เตเดฒเดฏเดจเตเดฑเต เดฒเดญเดฟเด•เตเด•เตเด‚ TelegramBotHook().clent เด…เดฅเดตเดพ TelegramBotHook().get_conn().

เดŸเต†เดฒเดฟเด—เตเดฐเดพเด‚ REST API-เดฏเตโ€Œเด•เตเด•เดพเดฏเดฟ เดžเดพเตป เด’เดฐเต เดฎเตˆเด•เตเดฐเต‹เดฑเดพเดชเตเดชเตผ เดจเดฟเตผเดฎเตเดฎเดฟเด•เตเด•เตเดจเตเดจ เดซเดฏเดฒเดฟเดจเตเดฑเต† เดฐเดฃเตเดŸเดพเด‚ เดญเดพเด—เด‚, เด…เดคเต เดตเดฒเดฟเดšเตเดšเดฟเดŸเดพเดคเดฟเดฐเดฟเด•เตเด•เดพเตป python-telegram-bot เด’เดฐเต เดฐเต€เดคเดฟเด•เตเด•เต sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

เดŽเดฒเตเดฒเดพเด‚ เด•เต‚เดŸเตเดŸเดฟเดšเตเดšเต‡เตผเด•เตเด•เตเด• เดŽเดจเตเดจเดคเดพเดฃเต เดถเดฐเดฟเดฏเดพเดฏ เดฎเดพเตผเด—เด‚: TelegramBotSendMessage, TelegramBotHook, TelegramBot - เดชเตเดฒเด—เดฟเดจเดฟเตฝ, เด’เดฐเต เดชเตŠเดคเต เดธเด‚เดญเดฐเดฃเดฟเดฏเดฟเตฝ เด‡เดŸเตเด•, เด…เดคเต เด“เดชเตเดชเตบ เดธเต‹เดดเตโ€Œเดธเดฟเดจเต เดจเตฝเด•เตเด•.

เดžเด™เตเด™เตพ เด‡เดคเต†เดฒเตเดฒเดพเด‚ เดชเด เดฟเดšเตเดšเตเด•เตŠเดฃเตเดŸเดฟเดฐเดฟเด•เตเด•เตเดฎเตเดชเต‹เตพ, เดžเด™เตเด™เดณเตเดŸเต† เดฑเดฟเดชเตเดชเต‹เตผเดŸเตเดŸเต เด…เดชเตโ€Œเดกเต‡เดฑเตเดฑเตเด•เตพ เดตเดฟเดœเดฏเด•เดฐเดฎเดพเดฏเดฟ เดชเดฐเดพเดœเดฏเดชเตเดชเต†เดŸเตเด•เดฏเตเด‚ เดšเดพเดจเดฒเดฟเตฝ เดŽเดจเดฟเด•เตเด•เต เด’เดฐเต เดชเดฟเดถเด•เต เดธเดจเตเดฆเต‡เดถเด‚ เด…เดฏเดฏเตเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดคเต. เด…เดคเต เดคเต†เดฑเตเดฑเดพเดฃเต‹ เดŽเดจเตเดจเต เดžเดพเตป เดชเดฐเดฟเดถเต‹เดงเดฟเด•เตเด•เตเด‚...

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต
เดžเด™เตเด™เดณเตเดŸเต† เดจเดพเดฏเดฏเดฟเตฝ เดŽเดจเตเดคเต‹ เดชเตŠเดŸเตเดŸเดฟ! เด…เดคเดฒเตเดฒเต‡ เดจเดฎเตเดฎเตพ เดชเตเดฐเดคเต€เด•เตเดทเดฟเดšเตเดšเดคเต? เด•เตƒเดคเตเดฏเดฎเดพเดฏเดฟ!

เดจเต€ เด’เดดเดฟเด•เตเด•เดพเตป เดชเต‹เดตเตเด•เดฏเดพเดฃเต‹?

เดŽเดจเดฟเด•เตเด•เต เดŽเดจเตเดคเต†เด™เตเด•เดฟเดฒเตเด‚ เดจเดทเตเดŸเดฎเดพเดฏเดคเดพเดฏเดฟ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดคเต‹เดจเตเดจเตเดจเตเดจเตเดฃเตเดŸเต‹? SQL เดธเต†เตผเดตเดฑเดฟเตฝ เดจเดฟเดจเตเดจเต เดตเต†เตผเดŸเตเดŸเดฟเด•เตเด•เดฏเดฟเดฒเต‡เด•เตเด•เต เดกเดพเดฑเตเดฑ เดŸเตเดฐเดพเตปเดธเตเดซเตผ เดšเต†เดฏเตเดฏเดพเดฎเต†เดจเตเดจเต เด…เดฆเตเดฆเต‡เดนเด‚ เดตเดพเด—เตเดฆเดพเดจเด‚ เดšเต†เดฏเตเดคเดคเดพเดฏเดฟ เดคเต‹เดจเตเดจเตเดจเตเดจเต, เดŽเดจเตเดจเดฟเดŸเตเดŸเต เด…เดฆเตเดฆเต‡เดนเด‚ เด…เดคเต เดŽเดŸเตเดคเตเดคเต เดตเดฟเดทเดฏเดคเตเดคเดฟเตฝ เดจเดฟเดจเตเดจเต เดฎเดพเดฑเดฟ, เด…เดดเดฟเดฎเดคเดฟ!

เดˆ เด•เตเดฐเต‚เดฐเดค เดฎเดจเดƒเดชเต‚เตผเดตเดฎเดพเดฏเดฟเดฐเตเดจเตเดจเต, เดŽเดจเดฟเด•เตเด•เต เดจเดฟเด™เตเด™เตพเด•เตเด•เดพเดฏเดฟ เดšเดฟเดฒ เดชเดฆเดพเดตเดฒเดฟเด•เตพ เดฎเดจเดธเตเดธเดฟเดฒเดพเด•เตเด•เต‡เดฃเตเดŸเดฟเดตเดจเตเดจเต. เด‡เดชเตเดชเต‹เตพ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด•เต‚เดŸเตเดคเตฝ เดฎเตเดจเตเดจเต‹เดŸเตเดŸเต เดชเต‹เด•เดพเด‚.

เดžเด™เตเด™เดณเตเดŸเต† เดชเดฆเตเดงเดคเดฟ เด‡เดคเดพเดฏเดฟเดฐเตเดจเตเดจเต:

  1. เดกเดพเด—เต เดšเต†เดฏเตเดฏเตเด•
  2. เดŸเดพเดธเตเด•เตเด•เตเด•เตพ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเด•
  3. เดŽเดฒเตเดฒเดพเด‚ เดŽเดคเตเดฐ เดฎเดจเต‹เดนเดฐเดฎเดพเดฃเต†เดจเตเดจเต เด•เดพเดฃเตเด•
  4. เดชเต‚เดฐเดฟเดชเตเดชเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเต เดธเต†เดทเตป เดจเดฎเตเดชเดฑเตเด•เตพ เดจเตฝเด•เตเด•
  5. SQL เดธเต†เตผเดตเดฑเดฟเตฝ เดจเดฟเดจเตเดจเต เดกเดพเดฑเตเดฑ เดจเต‡เดŸเตเด•
  6. เดตเต†เตผเดŸเตเดŸเดฟเด•เตเด•เดฏเดฟเตฝ เดกเดพเดฑเตเดฑ เด‡เดŸเตเด•
  7. เดธเตเดฅเดฟเดคเดฟเดตเดฟเดตเดฐเด•เตเด•เดฃเด•เตเด•เตเด•เตพ เดถเต‡เด–เดฐเดฟเด•เตเด•เตเด•

เด…เดคเดฟเดจเดพเตฝ, เด‡เดคเต†เดฒเตเดฒเดพเด‚ เดชเตเดฐเดตเตผเดคเตเดคเดจเด•เตเดทเดฎเดฎเดพเด•เตเด•เดพเตป, เดžเดพเตป เดžเด™เตเด™เดณเตเดŸเต† เด’เดฐเต เดšเต†เดฑเดฟเดฏ เด•เต‚เดŸเตเดŸเดฟเดšเตเดšเต‡เตผเด•เตเด•เตฝ เดจเดŸเดคเตเดคเดฟ docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

เด…เดตเดฟเดŸเต† เดžเด™เตเด™เตพ เด‰เดฏเตผเดคเตเดคเตเดจเตเดจเต:

  • เดนเต‹เดธเตเดฑเตเดฑเดพเดฏเดฟ เดตเต†เตผเดŸเตเดŸเดฟเด•เตเด• dwh เดเดฑเตเดฑเดตเตเด‚ เดธเตเดฅเดฟเดฐเดธเตเดฅเดฟเดคเดฟ เด•เตเดฐเดฎเต€เด•เดฐเดฃเด™เตเด™เตพเด•เตเด•เตŠเดชเตเดชเด‚,
  • SQL เดธเต†เตผเดตเดฑเดฟเดจเตเดฑเต† เดฎเต‚เดจเตเดจเต เดธเดจเตเดฆเตผเดญเด™เตเด™เตพ,
  • เดชเดฟเดจเตเดจเต€เดŸเตเดณเตเดณ เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเตเด•เตพ เดžเด™เตเด™เตพ เด•เตเดฑเดšเตเดšเต เดกเดพเดฑเตเดฑ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดชเต‚เดฐเดฟเดชเตเดชเดฟเด•เตเด•เตเดจเตเดจเต (เด’เดฐเต เดธเดพเดนเดšเดฐเตเดฏเดคเตเดคเดฟเดฒเตเด‚ เดจเต‹เด•เตเด•เดฐเตเดคเต mssql_init.py!)

เด•เดดเดฟเดžเตเดž เดคเดตเดฃเดคเตเดคเต‡เด•เตเด•เดพเตพ เด…เตฝเดชเตเดชเด‚ เดธเด™เตเด•เต€เตผเดฃเตเดฃเดฎเดพเดฏ เด’เดฐเต เด•เดฎเดพเตปเดกเดฟเดจเตเดฑเต† เดธเดนเดพเดฏเดคเตเดคเต‹เดŸเต† เดžเด™เตเด™เตพ เดŽเดฒเตเดฒเดพ เดจเดฒเตเดฒ เด•เดพเดฐเตเดฏเด™เตเด™เดณเตเด‚ เดธเดฎเดพเดฐเด‚เดญเดฟเด•เตเด•เตเดจเตเดจเต:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

เดžเด™เตเด™เดณเตเดŸเต† เด…เดคเตเดญเตเดค เดฑเดพเตปเดกเดฎเตˆเดธเตผ เดธเตƒเดทเตเดŸเดฟเดšเตเดšเดคเต, เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด‡เดจเด‚ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เดพเด‚ Data Profiling/Ad Hoc Query:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต
เดชเตเดฐเดงเดพเดจ เด•เดพเดฐเตเดฏเด‚ เด…เดคเต เดตเดฟเดถเด•เดฒเดจ เดตเดฟเดฆเด—เตเดงเตผเด•เตเด•เต เด•เดพเดฃเดฟเด•เตเด•เดฐเตเดคเต เดŽเดจเตเดจเดคเดพเดฃเต

เดตเดฟเดตเดฐเดฟเด•เตเด•เดพเตป ETL เดธเต†เดทเดจเตเด•เตพ เดžเดพเตป เดšเต†เดฏเตเดฏเดฟเดฒเตเดฒ, เดŽเดฒเตเดฒเดพเด‚ เด…เดตเดฟเดŸเต† เดจเดฟเดธเตเดธเดพเดฐเดฎเดพเดฃเต: เดžเด™เตเด™เตพ เด’เดฐเต เด…เดŸเดฟเดคเตเดคเดฑ เด‰เดฃเตเดŸเดพเด•เตเด•เตเดจเตเดจเต, เด…เดคเดฟเตฝ เด’เดฐเต เด…เดŸเดฏเดพเดณเดฎเตเดฃเตเดŸเต, เดžเด™เตเด™เตพ เดŽเดฒเตเดฒเดพเด‚ เด’เดฐเต เดธเดจเตเดฆเตผเดญ เดฎเดพเดจเต‡เดœเตผ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดชเตŠเดคเดฟเดฏเตเดจเตเดจเต, เด‡เดชเตเดชเต‹เตพ เดžเด™เตเด™เตพ เด‡เดคเต เดšเต†เดฏเตเดฏเตเดจเตเดจเต:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

เดธเดฎเดฏเด‚ เดตเดจเตเดจเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต เดžเด™เตเด™เดณเตเดŸเต† เดกเดพเดฑเตเดฑ เดถเต‡เด–เดฐเดฟเด•เตเด•เตเด• เดžเด™เตเด™เดณเตเดŸเต† เด’เดจเตเดจเดฐ เดจเต‚เดฑเต เดฎเต‡เดถเด•เดณเดฟเตฝ เดจเดฟเดจเตเดจเต. เดตเดณเดฐเต† เด…เดชเตเดฐเดธเด•เตเดคเดฎเดพเดฏ เดตเดฐเดฟเด•เดณเตเดŸเต† เดธเดนเดพเดฏเดคเตเดคเต‹เดŸเต† เดจเดฎเตเด•เตเด•เต เด‡เดคเต เดšเต†เดฏเตเดฏเดพเด‚:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. เด’เดฐเต เดนเตเด•เตเด•เต เดธเดนเดพเดฏเดคเตเดคเต‹เดŸเต† เดจเดฎเตเด•เตเด•เต เดŽเดฏเตผเดซเตเดฒเต‹เดฏเดฟเตฝ เดจเดฟเดจเตเดจเต เดฒเดญเดฟเด•เตเด•เตเด‚ pymssql- เดฌเดจเตเดงเดฟเดชเตเดชเดฟเด•เตเด•เตเด•
  2. เด…เดญเตเดฏเตผเดคเตเดฅเดจเดฏเดฟเดฒเต‡เด•เตเด•เต เด’เดฐเต เดคเต€เดฏเดคเดฟเดฏเตเดŸเต† เดฐเต‚เดชเดคเตเดคเดฟเตฝ เด’เดฐเต เดจเดฟเดฏเดจเตเดคเตเดฐเดฃเด‚ เดฎเดพเดฑเตเดฑเดฟเดธเตเดฅเดพเดชเดฟเด•เตเด•เดพเด‚ - เด‡เดคเต เดŸเต†เด‚เดชเตเดฒเต‡เดฑเตเดฑเต เดŽเดžเตเดšเดฟเตป เดซเด‚เด—เตเดทเดจเดฟเดฒเต‡เด•เตเด•เต เดŽเดฑเดฟเดฏเดชเตเดชเต†เดŸเตเด‚.
  3. เดžเด™เตเด™เดณเตเดŸเต† เด…เดญเตเดฏเตผเดคเตเดฅเดจ เดคเต€เตผเด•เตเด•เตเดจเตเดจเต pandasเด†เดฐเดพเดฃเต เดจเดฎเตเดฎเต† เดจเต‡เดŸเตเด• DataFrame - เด‡เดคเต เดญเดพเดตเดฟเดฏเดฟเตฝ เดžเด™เตเด™เตพเด•เตเด•เต เด‰เดชเดฏเต‹เด—เดชเตเดฐเดฆเดฎเดพเด•เตเด‚.

เดžเดพเตป เดชเด•เดฐเด‚ เดตเดฏเตเด•เตเด•เตเดจเตเดจเดคเต เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเดจเตเดจเต {dt} เด’เดฐเต เด…เดญเตเดฏเตผเดคเตเดฅเดจ เดชเดฐเดพเดฎเต€เดฑเตเดฑเดฑเดฟเดจเต เดชเด•เดฐเด‚ %s เดžเดพเดจเตŠเดฐเต เดฆเตเดทเตเดŸเดจเดพเดฏ เดชเดฟเดจเต‹เดšเตเดšเดฟเดฏเต‹ เด†เดฏเดคเตเด•เตŠเดฃเตเดŸเดฒเตเดฒ pandas เด•เตˆเด•เดพเดฐเตเดฏเด‚ เดšเต†เดฏเตเดฏเดพเตป เด•เดดเดฟเดฏเดฟเดฒเตเดฒ pymssql เด…เดตเดธเดพเดจเดคเตเดคเต‡เดคเต เดธเตเดฒเดฟเดชเตเดชเตเดšเต†เดฏเตเดฏเตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเต params: Listเด…เดตเตป เดถเดฐเดฟเด•เตเด•เตเด‚ เด†เด—เตเดฐเดนเดฟเด•เตเด•เตเดจเตเดจเตเดตเต†เด™เตเด•เดฟเดฒเตเด‚ tuple.
เดกเต†เดตเดฒเดชเตเดชเตผ เดŽเดจเตเดจเดคเตเด‚ เดถเตเดฐเดฆเตเดงเดฟเด•เตเด•เตเด• pymssql เด‡เดจเดฟ เด…เดตเดจเต† เดชเดฟเดจเตเดคเตเดฃเดฏเตเด•เตเด•เต‡เดฃเตเดŸเดคเดฟเดฒเตเดฒเต†เดจเตเดจเต เดคเต€เดฐเตเดฎเดพเดจเดฟเดšเตเดšเต, เดชเตเดฑเดคเตเดคเตเดชเต‹เด•เดพเดจเตเดณเตเดณ เดธเดฎเดฏเดฎเดพเดฃเดฟเดคเต pyodbc.

เดŽเดฏเตผเดซเตเดฒเต‹ เดžเด™เตเด™เดณเตเดŸเต† เดซเด‚เด—เตโ€Œเดทเดจเตเด•เดณเตเดŸเต† เด†เตผเด—เตเดฏเตเดฎเต†เดจเตเดฑเตเด•เตพ เดŽเดจเตเดคเต†เดฒเตเดฒเดพเดฎเดพเดฏเดพเดฃเต เดจเดฟเดฑเดšเตเดšเดคเต†เดจเตเดจเต เดจเดฎเตเด•เตเด•เต เดจเต‹เด•เตเด•เดพเด‚:

เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹: ETL เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด•เตเดจเตเดจเต

เดกเดพเดฑเตเดฑ เด‡เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ, เดคเตเดŸเดฐเตเดจเตเดจเดคเดฟเตฝ เด…เตผเดคเตเดฅเดฎเดฟเดฒเตเดฒ. เดŽเดจเตเดจเดพเตฝ เดชเต‚เดฐเดฟเดชเตเดชเดฟเด•เตเด•เตฝ เดตเดฟเดœเดฏเด•เดฐเดฎเดพเดฃเต†เดจเตเดจเต เด•เดฐเตเดคเตเดจเตเดจเดคเตเด‚ เดตเดฟเดšเดฟเดคเตเดฐเดฎเดพเดฃเต. เดŽเดจเตเดจเดพเตฝ เด‡เดคเต เด’เดฐเต เดคเต†เดฑเตเดฑเดฒเตเดฒ. เด†-เด†เดนเต, เดŽเดจเตเดคเต เดšเต†เดฏเตเดฏเดฃเด‚?! เด•เต‚เดŸเดพเดคเต† เด‡เดตเดฟเดŸเต† เดŽเดจเตเดคเดพเดฃเต:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException เดชเดฟเดถเด•เตเด•เดณเตŠเดจเตเดจเตเดฎเดฟเดฒเตเดฒเต†เดจเตเดจเต เดŽเดฏเตผเดซเตเดฒเต‹เดฏเต‹เดŸเต เดชเดฑเดฏเตเด‚, เดชเด•เตเดทเต‡ เดžเด™เตเด™เตพ เดŸเดพเดธเตเด•เต เด’เดดเดฟเดตเดพเด•เตเด•เตเดจเตเดจเต. เด‡เดจเตเดฑเตผเดซเต‡เดธเดฟเดจเต เดชเดšเตเดšเดฏเต‹ เดšเตเดตเดชเตเดชเต‹ เดšเดคเตเดฐเด‚ เด‰เดฃเตเดŸเดพเดฏเดฟเดฐเดฟเด•เตเด•เดฟเดฒเตเดฒ, เดชเด•เตเดทเต‡ เดชเดฟเด™เตเด•เต.

เดจเดฎเตเด•เตเด•เต เดจเดฎเตเดฎเตเดŸเต† เดกเดพเดฑเตเดฑ เดŸเต‹เดธเต เดšเต†เดฏเตเดฏเดพเด‚ เด’เดจเตเดจเดฟเดฒเดงเดฟเด•เด‚ เดจเดฟเดฐเด•เตพ:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

เด‡เดตเตผ:

  • เดžเด™เตเด™เตพ เด“เตผเดกเดฑเตเด•เตพ เดธเตเดตเต€เด•เดฐเดฟเดšเตเดš เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเต,
  • เดžเด™เตเด™เดณเตเดŸเต† เดตเต†เดณเตเดณเดชเตเดชเตŠเด•เตเด• เดธเต†เดทเดจเตเดฑเต† เดเดกเดฟ (เด…เดคเต เดตเตเดฏเดคเตเดฏเดธเตเดคเดฎเดพเดฏเดฟเดฐเดฟเด•เตเด•เตเด‚ เด“เดฐเต‹ เดœเต‹เดฒเดฟเด•เตเด•เตเด‚),
  • เด‰เดฑเดตเดฟเดŸเดคเตเดคเดฟเตฝ เดจเดฟเดจเตเดจเตเด‚ เด“เตผเดกเตผ เดเดกเดฟเดฏเดฟเตฝ เดจเดฟเดจเตเดจเตเด‚ เด’เดฐเต เดนเดพเดทเต - เด…เดคเดฟเดจเดพเตฝ เด…เดจเตเดคเดฟเดฎ เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเดฟเตฝ (เดŽเดฒเตเดฒเดพเด‚ เด’เดฐเต เดŸเต‡เดฌเดฟเดณเดฟเดฒเต‡เด•เตเด•เต เดชเด•เดฐเตเดจเตเดจเดฟเดŸเดคเตเดคเต) เดžเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต เด…เดฆเตเดตเดฟเดคเต€เดฏ เด“เตผเดกเตผ เดเดกเดฟ เด‰เดฃเตเดŸเต.

เด…เดตเดธเดพเดจ เด˜เดŸเตเดŸเด‚ เด…เดตเดถเต‡เดทเดฟเด•เตเด•เตเดจเตเดจเต: เดŽเดฒเตเดฒเดพเด‚ เดตเต†เตผเดŸเตเดŸเดฟเด•เตเด•เดฏเดฟเดฒเต‡เด•เตเด•เต เด’เดดเดฟเด•เตเด•เตเด•. เด•เต‚เดŸเดพเดคเต†, เดตเดฟเดšเดฟเดคเตเดฐเดฎเต†เดจเตเดจเต เดชเดฑเดฏเดŸเตเดŸเต†, เด‡เดคเต เดšเต†เดฏเตเดฏเดพเดจเตเดณเตเดณ เดเดฑเตเดฑเดตเตเด‚ เด—เด‚เดญเต€เดฐเดตเตเด‚ เด•เดพเดฐเตเดฏเด•เตเดทเดฎเดตเตเดฎเดพเดฏ เดฎเดพเตผเด—เตเด—เด™เตเด™เดณเดฟเดฒเตŠเดจเตเดจเต CSV เด†เดฃเต!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. เดžเด™เตเด™เตพ เด’เดฐเต เดชเตเดฐเดคเตเดฏเต‡เด• เดฑเดฟเดธเต€เดตเตผ เดจเดฟเตผเดฎเตเดฎเดฟเด•เตเด•เตเดจเตเดจเต StringIO.
  2. pandas เดฆเดฏเดฏเต‹เดŸเต† เดžเด™เตเด™เดณเตเดŸเต† เด‡เดŸเตเด‚ DataFrame เดฐเต‚เดชเดคเตเดคเดฟเตฝ CSV-เดฒเตˆเดจเตเด•เตพ.
  3. เด’เดฐเต เดนเตเด•เตเด•เต เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดจเดฎเตเด•เตเด•เต เดชเตเดฐเดฟเดฏเดชเตเดชเต†เดŸเตเดŸ เดตเต†เตผเดŸเตเดŸเดฟเด•เตเด•เดฏเดฟเดฒเต‡เด•เตเด•เต เด’เดฐเต เด•เดฃเด•เตเดทเตป เดคเตเดฑเด•เตเด•เดพเด‚.
  4. เด‡เดชเตเดชเต‹เตพ เดธเดนเดพเดฏเดคเตเดคเต‹เดŸเต† copy() เดžเด™เตเด™เดณเตเดŸเต† เดกเดพเดฑเตเดฑ เดตเต†เตผเดŸเตเดŸเดฟเด•เตเด•เดฏเดฟเดฒเต‡เด•เตเด•เต เดจเต‡เดฐเดฟเดŸเตเดŸเต เด…เดฏเดฏเตเด•เตเด•เตเด•!

เดกเตเดฐเตˆเดตเดฑเดฟเตฝ เดจเดฟเดจเตเดจเต เดŽเดคเตเดฐ เดฒเตˆเดจเตเด•เตพ เดชเต‚เดฐเดฟเดชเตเดชเดฟเดšเตเดšเตเดตเต†เดจเตเดจเต เดžเด™เตเด™เตพ เดŽเดŸเตเด•เตเด•เตเด‚, เดŽเดฒเตเดฒเดพเด‚ เดถเดฐเดฟเดฏเดพเดฃเต†เดจเตเดจเต เดธเต†เดทเตป เดฎเดพเดจเต‡เดœเดฐเต‹เดŸเต เดชเดฑเดฏเตเด‚:

session.loaded_rows = cursor.rowcount
session.successful = True

เด…เดคเตเดฐเดฏเต‡เดฏเตเดณเตเดณเต‚.

เดตเดฟเตฝเดชเตเดชเดจเดฏเดฟเตฝ, เดžเด™เตเด™เตพ เดŸเดพเตผเด—เต†เดฑเตเดฑเต เดชเตเดฒเต‡เดฑเตเดฑเต เดธเตเดตเดฎเต‡เดงเดฏเดพ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต. เด‡เดตเดฟเดŸเต† เดžเดพเตป เด’เดฐเต เดšเต†เดฑเดฟเดฏ เดฏเดจเตเดคเตเดฐเด‚ เด…เดจเตเดตเดฆเดฟเดšเตเดšเต:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

เดžเดพเตป เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเดจเตเดจเต VerticaOperator() เดžเดพเตป เด’เดฐเต เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเต เดธเตเด•เต€เดฎเดฏเตเด‚ เด’เดฐเต เดชเดŸเตเดŸเดฟเด•เดฏเตเด‚ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต (เด…เดต เด‡เดคเดฟเดจเด•เด‚ เดจเดฟเดฒเดตเดฟเดฒเดฟเดฒเตเดฒเต†เด™เตเด•เดฟเตฝ, เดคเต€เตผเดšเตเดšเดฏเดพเดฏเตเด‚). เดกเดฟเดชเตปเดกเตปเดธเดฟเด•เตพ เดถเดฐเดฟเดฏเดพเดฏเดฟ เด•เตเดฐเดฎเต€เด•เดฐเดฟเด•เตเด•เตเด• เดŽเดจเตเดจเดคเดพเดฃเต เดชเตเดฐเดงเดพเดจ เด•เดพเดฐเตเดฏเด‚:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

เดธเด‚เด—เตเดฐเดนเดฟเด•เตเด•เตเดจเตเดจเต

- เดถเดฐเดฟ, - เดšเต†เดฑเดฟเดฏ เดฎเต—เดธเต เดชเดฑเดžเตเดžเต, - เด…เดฒเตเดฒเต‡, เด‡เดชเตเดชเต‹เตพ
เด•เดพเดŸเตเดŸเดฟเดฒเต† เดเดฑเตเดฑเดตเตเด‚ เดญเต€เด•เดฐเดฎเดพเดฏ เดฎเตƒเด—เด‚ เดžเดพเดจเดพเดฃเต†เดจเตเดจเต เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดฌเต‹เดงเตเดฏเดฎเตเดฃเตเดŸเต‹?

เดœเต‚เดฒเดฟเดฏ เดกเตŠเดฃเดพเตพเดกเตเดธเตบ, เด—เตเดฐเตเดซเดฒเตเดฒเต‹

เดŽเดจเดฟเด•เตเด•เตเด‚ เดŽเดจเตเดฑเต† เดธเดนเดชเตเดฐเดตเตผเดคเตเดคเด•เตผเด•เตเด•เตเด‚ เด’เดฐเต เดฎเดคเตเดธเดฐเดฎเตเดฃเตเดŸเต†เด™เตเด•เดฟเตฝ: เด†เดฆเตเดฏเด‚ เดฎเตเดคเตฝ เด†เดฐเดพเดฃเต เดชเต†เดŸเตเดŸเต†เดจเตเดจเต เด’เดฐเต ETL เดชเตเดฐเต‹เดธเดธเตเดธเต เดธเตƒเดทเตโ€ŒเดŸเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดธเดฎเดพเดฐเด‚เดญเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเดคเต: เด…เดตเตผ เด…เดตเดฐเตเดŸเต† SSIS-เด‰เด‚ เด’เดฐเต เดฎเต—เดธเตเด‚ เด’เดชเตเดชเด‚ เดžเดพเดจเตเด‚ เดŽเดฏเตผเดซเตเดฒเต‹เดฏเตเด‚ ... เดคเตเดŸเตผเดจเตเดจเต เดžเด™เตเด™เตพ เด…เดฑเตเดฑเด•เตเดฑเตเดฑเดชเตเดชเดฃเดฟเดฏเตเดŸเต† เดŽเดณเตเดชเตเดชเดตเตเด‚ เดคเดพเดฐเดคเดฎเตเดฏเด‚ เดšเต†เดฏเตเดฏเตเด‚ ... เด•เตŠเดณเตเดณเดพเด‚, เดžเดพเตป เด…เดตเดฐเต† เดŽเดฒเตเดฒเดพ เดฎเตเดจเตเดจเดฃเดฟเด•เดณเดฟเดฒเตเด‚ เดคเต‹เตฝเดชเตเดชเดฟเด•เตเด•เตเดฎเต†เดจเตเดจเต เดจเดฟเด™เตเด™เตพ เดธเดฎเตเดฎเดคเดฟเด•เตเด•เตเดฎเต†เดจเตเดจเต เดžเดพเตป เด•เดฐเตเดคเตเดจเตเดจเต!

เด•เตเดฑเดšเตเดšเตเด•เต‚เดŸเดฟ เด—เต—เดฐเดตเดฎเดพเดฃเต†เด™เตเด•เดฟเตฝ, เด…เดชเตเดชเดพเดšเตเดšเต† เดŽเดฏเตผเดซเตเดฒเต‹ - เดชเตเดฐเต‹เด—เตเดฐเดพเด‚ เด•เต‹เดกเดฟเดจเตเดฑเต† เดฐเต‚เดชเดคเตเดคเดฟเตฝ เดชเตเดฐเด•เตเดฐเดฟเดฏเด•เตพ เดตเดฟเดตเดฐเดฟเดšเตเดšเตเด•เตŠเดฃเตเดŸเต - เดŽเดจเตเดฑเต† เดœเต‹เดฒเดฟ เดšเต†เดฏเตเดคเต เดตเดณเดฐเต†เดฏเดงเดฟเด•เด‚ เด•เต‚เดŸเตเดคเตฝ เดธเตเด–เด•เดฐเดตเตเด‚ เด†เดธเตเดตเดพเดฆเตเดฏเด•เดฐเดตเตเดฎเดพเดฃเต.

เดชเตเดฒเด—เต-เด‡เดจเตเดจเตเด•เดณเตเดŸเต†เดฏเตเด‚ เดธเตเด•เต‡เดฒเดฌเดฟเดฒเดฟเดฑเตเดฑเดฟเดฏเตเดŸเต† เดฎเตเตปเด•เดฐเตเดคเดฒเดฟเดจเตเดฑเต†เดฏเตเด‚ เด•เดพเดฐเตเดฏเดคเตเดคเดฟเตฝ เด…เดคเดฟเดจเตเดฑเต† เดชเดฐเดฟเดงเดฟเดฏเดฟเดฒเตเดฒเดพเดคเตเดค เดตเดฟเดชเตเดฒเต€เด•เดฐเดฃเด‚, เดฎเดฟเด•เตเด•เดตเดพเดฑเตเด‚ เดเดคเต เดฎเต‡เด–เดฒเดฏเดฟเดฒเตเด‚ เดŽเดฏเตผเดซเตเดฒเต‹ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เดพเดจเตเดณเตเดณ เด…เดตเดธเดฐเด‚ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดจเตฝเด•เตเดจเตเดจเต: เดกเดพเดฑเตเดฑ เดถเต‡เด–เดฐเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเด‚ เดคเดฏเตเดฏเดพเดฑเดพเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเด‚ เดชเตเดฐเต‹เดธเดธเตเดธเต เดšเต†เดฏเตเดฏเตเดจเตเดจเดคเดฟเดจเตเดฎเตเดณเตเดณ เดฎเตเดดเตเดตเตป เดšเด•เตเดฐเดคเตเดคเดฟเดฒเตเด‚, เดฑเต‹เด•เตเด•เดฑเตเดฑเตเด•เตพ เดตเดฟเด•เตเดทเต‡เดชเดฟเด•เตเด•เตเดฎเตเดชเต‹เดดเตเด‚ (เดšเตŠเดตเตเดตเดฏเดฟเดฒเต‡เด•เตเด•เต, เด•เต‹เดดเตเดธเต).

เดญเดพเด—เด‚ เด…เดจเตเดคเดฟเดฎเดตเตเด‚ เดฑเดซเดฑเตปเดธเตเด‚ เดตเดฟเดตเดฐเด™เตเด™เดณเตเด‚

เดžเด™เตเด™เตพ เดจเดฟเด™เตเด™เตพเด•เตเด•เดพเดฏเดฟ เดถเต‡เด–เดฐเดฟเดšเตเดš เดฑเต‡เด•เตเด•เต

  • start_date. เด…เดคเต†, เด‡เดคเต เด‡เดคเดฟเดจเด•เด‚ เด’เดฐเต เดชเตเดฐเดพเดฆเต‡เดถเดฟเด• เดฎเต†เดฎเตเดฎเดพเดฃเต. เดกเด—เดฟเดจเตเดฑเต† เดชเตเดฐเดงเดพเดจ เดตเดพเดฆเด‚ เดตเดดเดฟ start_date เดŽเดฒเตเดฒเดพเด‚ เด•เดŸเดจเตเดจเตเดชเต‹เด•เตเด•. เดšเตเดฐเตเด•เตเด•เดคเตเดคเดฟเตฝ, เดจเดฟเด™เตเด™เตพ เดตเตเดฏเด•เตเดคเดฎเดพเด•เตเด•เตเด•เดฏเดพเดฃเต†เด™เตเด•เดฟเตฝ start_date เดจเดฟเดฒเดตเดฟเดฒเต† เดคเต€เดฏเดคเดฟ, เด’เดชเตเดชเด‚ schedule_interval - เด’เดฐเต เดฆเดฟเดตเดธเด‚, เดชเดฟเดจเตเดจเต† DAG เดจเดพเดณเต† เด†เดฐเด‚เดญเดฟเด•เตเด•เตเด‚.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    เดชเดฟเดจเตเดจเต† เดชเตเดฐเดถเตโ€Œเดจเด™เตเด™เดณเตŠเดจเตเดจเตเดฎเดฟเดฒเตเดฒ.

    เด‡เดคเตเดฎเดพเดฏเดฟ เดฌเดจเตเดงเดชเตเดชเต†เดŸเตเดŸ เดฎเดฑเตเดฑเตŠเดฐเต เดฑเตบเดŸเตˆเด‚ เดชเดฟเดถเด•เต เด‰เดฃเตเดŸเต: Task is missing the start_date parameter, เด‡เดคเต เดฎเดฟเด•เตเด•เดชเตเดชเต‹เดดเตเด‚ เดธเต‚เดšเดฟเดชเตเดชเดฟเด•เตเด•เตเดจเตเดจเดคเต เดจเดฟเด™เตเด™เตพ เดกเดพเด—เต เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเดฑเตเดฎเดพเดฏเดฟ เดฌเดจเตเดงเดฟเดชเตเดชเดฟเด•เตเด•เดพเตป เดฎเดฑเดจเตเดจเตเดตเต†เดจเตเดจเดพเดฃเต.

  • เดŽเดฒเตเดฒเดพเด‚ เด’เดฐเต เดฎเต†เดทเต€เดจเดฟเตฝ. เด…เดคเต†, เด•เต‚เดŸเดพเดคเต† เดฌเต‡เดธเตเด•เดณเตเด‚ (เดŽเดฏเตผเดซเตเดฒเต‹เดฏเตเด‚ เดžเด™เตเด™เดณเตเดŸเต† เด•เต‹เดŸเตเดŸเดฟเด‚เด—เตเด‚), เด’เดฐเต เดตเต†เดฌเต เดธเต†เตผเดตเดฑเตเด‚ เด’เดฐเต เดทเต†เดกเตเดฏเต‚เดณเดฑเตเด‚ เดคเตŠเดดเดฟเดฒเดพเดณเดฟเด•เดณเตเด‚. เด…เดคเต เดชเต‹เดฒเตเด‚ เดชเตเดฐเดตเตผเดคเตเดคเดฟเดšเตเดšเต. เดŽเดจเตเดจเดพเตฝ เด•เดพเดฒเด•เตเดฐเดฎเต‡เดฃ, เดธเต‡เดตเดจเด™เตเด™เตพเด•เตเด•เดพเดฏเตเดณเตเดณ เดœเต‹เดฒเดฟเด•เดณเตเดŸเต† เดŽเดฃเตเดฃเด‚ เดตเตผเดฆเตเดงเดฟเดšเตเดšเต, PostgreSQL 20 ms-เดจเต เดชเด•เดฐเด‚ 5 เดธเต†เด•เตเด•เตปเดกเดฟเตฝ เดธเต‚เดšเดฟเด•เดฏเต‹เดŸเต เดชเตเดฐเดคเดฟเด•เดฐเดฟเด•เตเด•เดพเตป เดคเตเดŸเด™เตเด™เดฟเดฏเดชเตเดชเต‹เตพ, เดžเด™เตเด™เตพ เด…เดคเต เดŽเดŸเตเดคเตเดคเต เด•เตŠเดฃเตเดŸเตเดชเต‹เดฏเดฟ.
  • เดฒเต‹เด•เตเด•เตฝ เดŽเด•เตเดธเดฟเด•เตเดฏเต‚เดŸเตเดŸเตผ. เด…เดคเต†, เดžเด™เตเด™เตพ เด‡เดชเตเดชเต‹เดดเตเด‚ เด…เดคเดฟเตฝ เด‡เดฐเดฟเด•เตเด•เตเด•เดฏเดพเดฃเต, เดžเด™เตเด™เตพ เด‡เดคเดฟเดจเด•เด‚ เด…เด—เดพเดงเดคเตเดคเดฟเดจเตเดฑเต† เด…เดฐเดฟเด•เดฟเตฝ เดŽเดคเตเดคเดฟเดฏเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต. LocalExecutor เดžเด™เตเด™เตพเด•เตเด•เต เด‡เดคเตเดตเดฐเต† เดฎเดคเดฟเดฏเดพเดฏเดฟเดฐเตเดจเตเดจเต, เดŽเดจเตเดจเดพเตฝ เด‡เดชเตเดชเต‹เตพ เด’เดฐเต เดคเตŠเดดเดฟเดฒเดพเดณเดฟเดฏเต†เด™เตเด•เดฟเดฒเตเดฎเดพเดฏเดฟ เดตเดฟเด•เดธเดฟเดชเตเดชเดฟเด•เตเด•เดพเดจเตเดณเตเดณ เดธเดฎเดฏเดฎเดพเดฏเดฟ, CeleryExecutor-เดฒเต‡เด•เตเด•เต เดฎเดพเดฑเดพเตป เดžเด™เตเด™เตพ เด•เด เดฟเดจเดฎเดพเดฏเดฟ เดชเดฐเดฟเดถเตเดฐเดฎเดฟเด•เตเด•เต‡เดฃเตเดŸเดฟเดตเดฐเตเด‚. เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต เดฎเต†เดทเต€เดจเดฟเตฝ เด‡เดคเต เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚ เดŽเดจเตเดจ เดตเดธเตเดคเตเดค เด•เดฃเด•เตเด•เดฟเดฒเต†เดŸเตเดคเตเดคเต, เด’เดฐเต เดธเต†เตผเดตเดฑเดฟเตฝ เดชเต‹เดฒเตเด‚ เดธเต†เดฒเดฑเดฟ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเดจเตเดจเดคเดฟเตฝ เดจเดฟเดจเตเดจเต เด’เดจเตเดจเตเด‚ เดจเดฟเด™เตเด™เดณเต† เดคเดŸเดฏเตเดจเตเดจเดฟเดฒเตเดฒ, เด…เดคเต "เดคเต€เตผเดšเตเดšเดฏเดพเดฏเตเด‚, เด’เดฐเดฟเด•เตเด•เดฒเตเด‚ เด‰เตฝเดชเดพเดฆเดจเดคเตเดคเดฟเดฒเต‡เด•เตเด•เต เดชเต‹เด•เดฟเดฒเตเดฒ, เดธเดคเตเดฏเดธเดจเตเดงเดฎเดพเดฏเดฟ!"
  • เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เดพเดคเตเดคเดคเต เด…เดจเตเดคเตผเดจเดฟเตผเดฎเตเดฎเดฟเดค เด‰เดชเด•เดฐเดฃเด™เตเด™เตพ:
    • เด•เดฃเด•เตเดทเดจเตเด•เตพ เดธเต‡เดตเดจ เด•เตเดฐเต†เดกเตปเดทเตเดฏเดฒเตเด•เตพ เดธเต‚เด•เตเดทเดฟเด•เตเด•เดพเตป,
    • SLA เดฎเดฟเดธเตเดธเตเด•เตพ เด•เตƒเดคเตเดฏเดธเดฎเดฏเดคเตเดคเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เดพเดคเตเดค เดœเต‹เดฒเดฟเด•เดณเต‹เดŸเต เดชเตเดฐเดคเดฟเด•เดฐเดฟเด•เตเด•เดพเตป,
    • xcom เดฎเต†เดฑเตเดฑเดพเดกเดพเดฑเตเดฑ เด•เตˆเดฎเดพเดฑเตเดฑเดคเตเดคเดฟเดจเดพเดฏเดฟ (เดžเดพเตป เดชเดฑเดžเตเดžเต เดฎเต†เดฑเตเดฑเดพเดกเดพเดฑเตเดฑ!) เดกเดพเด—เต เดŸเดพเดธเตเด•เตเด•เตเด•เตพเด•เตเด•เดฟเดŸเดฏเดฟเตฝ.
  • เดฎเต†เดฏเดฟเตฝ เดฆเตเดฐเตเดชเดฏเต‹เด—เด‚. เดถเดฐเดฟ, เดŽเดจเดฟเด•เตเด•เต เดŽเดจเตเดคเต เดชเดฑเดฏเดพเตป เด•เดดเดฟเดฏเตเด‚? เดตเต€เดดเตเดš เดตเดฐเตเดคเตเดคเดฟเดฏ เดœเต‹เดฒเดฟเด•เดณเตเดŸเต† เดŽเดฒเตเดฒเดพ เด†เดตเตผเดคเตเดคเดจเด™เตเด™เตพเด•เตเด•เตเด‚ เด…เดฒเต‡เตผเดŸเตเดŸเตเด•เตพ เดธเดœเตเดœเต€เด•เดฐเดฟเดšเตเดšเต. เด‡เดชเตเดชเต‹เตพ เดŽเดจเตเดฑเต† เดตเตผเด•เตเด•เต Gmail-เตฝ เดŽเดฏเตผเดซเตเดฒเต‹เดฏเดฟเตฝ เดจเดฟเดจเตเดจเต 90k เด‡เดฎเต†เดฏเดฟเดฒเตเด•เตพ เด‰เดฃเตเดŸเต, เด•เต‚เดŸเดพเดคเต† เดตเต†เดฌเต เดฎเต†เดฏเดฟเตฝ เดฎเต‚เด•เตเด•เต เด’เดฐเต เดธเดฎเดฏเด‚ 100-เตฝ เด•เต‚เดŸเตเดคเตฝ เดŽเดŸเตเด•เตเด•เดพเดจเตเด‚ เด‡เดฒเตเดฒเดพเดคเดพเด•เตเด•เดพเดจเตเด‚ เดตเดฟเดธเดฎเตเดฎเดคเดฟเด•เตเด•เตเดจเตเดจเต.

เด•เต‚เดŸเตเดคเตฝ เด•เตเดดเดชเตเดชเด™เตเด™เตพ: Apache Airflow Pitfails

เด•เต‚เดŸเตเดคเตฝ เด“เดŸเตเดŸเต‹เดฎเต‡เดทเตป เดŸเต‚เดณเตเด•เตพ

เด•เตˆเด•เตŠเดฃเตเดŸเดฒเตเดฒ, เดคเดฒเด•เตŠเดฃเตเดŸเต เด•เต‚เดŸเตเดคเตฝ เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เดพเตป, เดŽเดฏเตผเดซเตเดฒเต‹ เดžเด™เตเด™เตพเด•เตเด•เดพเดฏเดฟ เด‡เดคเต เดคเดฏเตเดฏเดพเดฑเดพเด•เตเด•เดฟเดฏเดฟเดŸเตเดŸเตเดฃเตเดŸเต:

  • REST API - เด…เดฆเตเดฆเต‡เดนเดคเตเดคเดฟเดจเต เด‡เดชเตเดชเต‹เดดเตเด‚ เดชเดฐเต€เด•เตเดทเดฃเดพเดคเตเดฎเด• เดชเดฆเดตเดฟเดฏเตเดฃเตเดŸเต, เด…เดคเต เด…เดตเดจเต† เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเตฝ เดจเดฟเดจเตเดจเต เดคเดŸเดฏเตเดจเตเดจเดฟเดฒเตเดฒ. เด‡เดคเต เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต, เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดกเดพเด—เตเด•เดณเต‡เดฏเตเด‚ เดŸเดพเดธเตเด•เตเด•เตเด•เดณเต‡เดฏเตเด‚ เด•เตเดฑเดฟเดšเตเดšเตเดณเตเดณ เดตเดฟเดตเดฐเด™เตเด™เตพ เดฎเดพเดคเตเดฐเดฎเดฒเตเดฒ, เด’เดฐเต เดกเดพเด—เต เดจเดฟเตผเดคเตเดคเดพเดจเตเด‚ / เด†เดฐเด‚เดญเดฟเด•เตเด•เดพเดจเตเด‚, เด’เดฐเต DAG เดฑเตบ เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ เด’เดฐเต เดชเต‚เตพ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เดพเดจเตเด‚ เด•เดดเดฟเดฏเตเด‚.
  • CLI - WebUI เดตเดดเดฟ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เดพเตป เด…เดธเต—เด•เดฐเตเดฏเด‚ เดฎเดพเดคเตเดฐเดฎเดฒเตเดฒ, เดชเตŠเดคเตเดตเต† เด‡เดฒเตเดฒเดพเดคเตเดคเดคเตเดฎเดพเดฏ เดจเดฟเดฐเดตเดงเดฟ เดŸเต‚เดณเตเด•เตพ เด•เดฎเดพเตปเดกเต เดฒเตˆเดจเดฟเดฒเต‚เดŸเต† เดฒเดญเตเดฏเดฎเดพเดฃเต. เด‰เดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต:
    • backfill เดŸเดพเดธเตโ€Œเด•เต เดธเดจเตเดฆเตผเดญเด™เตเด™เตพ เดชเตเดจเดฐเดพเดฐเด‚เดญเดฟเด•เตเด•เต‡เดฃเตเดŸเดคเตเดฃเตเดŸเต.
      เด‰เดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต, เดตเดฟเดถเด•เดฒเดจ เดตเดฟเดฆเด—เตเดงเตผ เดตเดจเตเดจเต เดชเดฑเดžเตเดžเต: โ€œเดธเด–เดพเดตเต‡, เดœเดจเตเดตเดฐเดฟ 1 เดฎเตเดคเตฝ 13 เดตเดฐเต†เดฏเตเดณเตเดณ เดกเดพเดฑเตเดฑเดฏเดฟเตฝ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด…เดธเด‚เดฌเดจเตเดงเดฎเตเดฃเตเดŸเต! เดถเดฐเดฟเดฏเดพเด•เตเด•เตเด•, เดถเดฐเดฟเดฏเดพเด•เตเด•เตเด•, เดถเดฐเดฟเดฏเดพเด•เตเด•เตเด•, เดถเดฐเดฟเดฏเดพเด•เตเด•เตเด•!" เดจเดฟเด™เตเด™เตพ เด…เดคเตเดคเดฐเดฎเตŠเดฐเต เดนเต‹เดฌเต เด†เดฃเต:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • เด…เดŸเดฟเดธเตเดฅเดพเดจ เดธเต‡เดตเดจเด‚: initdb, resetdb, upgradedb, checkdb.
    • run, เด’เดฐเต เด‡เตปเดธเตโ€Œเดฑเตเดฑเตปเดธเต เดŸเดพเดธเตโ€Œเด•เต เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเด•เตเด•เดพเดจเตเด‚ เดŽเดฒเตเดฒเดพ เดกเดฟเดชเตปเดกเตปเดธเดฟเด•เดณเดฟเดฒเตเด‚ เดธเตโ€Œเด•เต‹เตผ เดšเต†เดฏเตเดฏเดพเดจเตเด‚ เด‡เดคเต เดจเดฟเด™เตเด™เดณเต† เด…เดจเตเดตเดฆเดฟเด•เตเด•เตเดจเตเดจเต. เด•เต‚เดŸเดพเดคเต†, เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด‡เดคเต เดตเดดเดฟ เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚ LocalExecutor, เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต เดธเต†เดฒเดฑเดฟ เด•เตเดฒเดธเตเดฑเตเดฑเตผ เด‰เดฃเตเดŸเต†เด™เตเด•เดฟเตฝ เดชเต‹เดฒเตเด‚.
    • เดเดคเดพเดฃเตเดŸเต เด’เดฐเต‡ เด•เดพเดฐเตเดฏเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเต test, เด…เดŸเดฟเดธเตเดฅเดพเดจเด™เตเด™เดณเดฟเตฝ เดฎเดพเดคเตเดฐเด‚ เด’เดจเตเดจเตเด‚ เดŽเดดเตเดคเตเดจเตเดจเดฟเดฒเตเดฒ.
    • connections เดทเต†เดฒเตเดฒเดฟเตฝ เดจเดฟเดจเตเดจเตเดณเตเดณ เด•เดฃเด•เตเดทเดจเตเด•เตพ เดตเตปเดคเต‹เดคเดฟเตฝ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เดพเตป เด…เดจเตเดตเดฆเดฟเด•เตเด•เตเดจเตเดจเต.
  • เดชเตˆเดคเตเดคเตบ API - เดธเด‚เดตเดฆเดฟเด•เตเด•เดพเดจเตเดณเตเดณ เด’เดฐเต เดนเดพเตผเดกเตโ€Œเด•เต‹เตผ เดฎเดพเตผเด—เด‚, เด…เดคเต เดชเตเดฒเด—เดฟเดจเตเด•เตพเด•เตเด•เดพเดฏเดฟ เด‰เดฆเตเดฆเต‡เดถเดฟเดšเตเดšเตเดณเตเดณเดคเดพเดฃเต, เดฎเดพเดคเตเดฐเดฎเดฒเตเดฒ เด…เดคเดฟเตฝ เดšเต†เดฑเดฟเดฏ เด•เตˆเด•เดณเดพเตฝ เด•เต‚เดŸเตเดŸเด‚ เด•เต‚เดŸเดฐเตเดคเต. เดชเด•เตเดทเต† เดžเด™เตเด™เดณเต† เดชเต‹เด•เตเดจเตเดจเดคเดฟเตฝ เดจเดฟเดจเตเดจเต เด†เดฐเดพเดฃเต เดคเดŸเดฏเตเด• /home/airflow/dags, เด“เดŸเตเด• ipython เดชเดฟเดจเตเดจเต† เด•เตเดดเดชเตเดชเด‚ เดคเตเดŸเด™เตเด™เดฃเต‹? เด‰เดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต, เด‡เดจเดฟเดชเตเดชเดฑเดฏเตเดจเตเดจ เด•เต‹เดกเต เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดŽเดฒเตเดฒเดพ เด•เดฃเด•เตเดทเดจเตเด•เดณเตเด‚ เด•เดฏเดฑเตเดฑเตเดฎเดคเดฟ เดšเต†เดฏเตเดฏเดพเตป เด•เดดเดฟเดฏเตเด‚:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • เดŽเดฏเตผเดซเตเดฒเต‹ เดฎเต†เดฑเตเดฑเดพเดกเดพเดฑเตเดฑเดฌเต‡เดธเดฟเดฒเต‡เด•เตเด•เต เดฌเดจเตเดงเดฟเดชเตเดชเดฟเด•เตเด•เตเดจเตเดจเต. เด‡เดคเดฟเดฒเต‡เด•เตเด•เต เดŽเดดเตเดคเดพเตป เดžเดพเตป เดถเตเดชเดพเตผเดถ เดšเต†เดฏเตเดฏเตเดจเตเดจเดฟเดฒเตเดฒ, เดŽเดจเตเดจเดพเตฝ เดตเดฟเดตเดฟเดง เดจเดฟเตผเดฆเตเดฆเดฟเดทเตเดŸ เดฎเต†เดŸเตเดฐเดฟเด•เตเด•เตเด•เตพเด•เตเด•เดพเดฏเดฟ เดŸเดพเดธเตโ€Œเด•เต เดธเตเดฑเตเดฑเต‡เดฑเตเดฑเตเด•เตพ เดฒเดญเดฟเด•เตเด•เตเดจเตเดจเดคเต เดเดคเต†เด™เตเด•เดฟเดฒเตเด‚ API-เด•เตพ เดตเดดเดฟเดฏเตเดณเตเดณเดคเดฟเดจเต‡เด•เตเด•เดพเตพ เดตเต‡เด—เดคเตเดคเดฟเดฒเตเด‚ เดŽเดณเตเดชเตเดชเดคเตเดคเดฟเดฒเตเด‚ เด†เดฏเดฟเดฐเดฟเด•เตเด•เตเด‚.

    เดจเดฎเตเดฎเตเดŸเต† เดŽเดฒเตเดฒเดพ เดœเต‹เดฒเดฟเด•เดณเตเด‚ เดจเดฟเตผเดœเตเดœเต€เดตเดฎเดฒเตเดฒเต†เดจเตเดจเต เดจเดฎเตเด•เตเด•เต เดชเดฑเดฏเดพเด‚, เดชเด•เตเดทเต‡ เด…เดต เดšเดฟเดฒเดชเตเดชเต‹เตพ เดตเต€เดดเดพเด‚, เด‡เดคเต เดธเดพเดงเดพเดฐเดฃเดฎเดพเดฃเต. เดŽเดจเตเดจเดพเตฝ เดšเดฟเดฒ เดคเดŸเดธเตเดธเด™เตเด™เตพ เด‡เดคเดฟเดจเด•เด‚ เดธเด‚เดถเดฏเดพเดธเตเดชเดฆเดฎเดพเดฃเต, เด…เดคเต เดชเดฐเดฟเดถเต‹เดงเดฟเด•เตเด•เต‡เดฃเตเดŸเดคเต เด†เดตเดถเตเดฏเดฎเดพเดฃเต.

    SQL เดธเต‚เด•เตเดทเดฟเด•เตเด•เตเด•!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

เดฑเต†เดซเดฑเตปเดธเตเด•เตพ

เดคเต€เตผเดšเตเดšเดฏเดพเดฏเตเด‚, Google-เดจเตเดฑเต† เด‡เดทเตเดฏเต‚เดตเดฟเตฝ เดจเดฟเดจเตเดจเตเดณเตเดณ เด†เดฆเตเดฏเดคเตเดคเต† เดชเดคเตเดคเต เดฒเดฟเด™เตเด•เตเด•เตพ เดŽเดจเตเดฑเต† เดฌเตเด•เตเด•เตโ€Œเดฎเดพเตผเด•เตเด•เตเด•เดณเดฟเตฝ เดจเดฟเดจเตเดจเตเดณเตเดณ เดŽเดฏเตผเดซเตเดฒเต‹ เดซเต‹เตพเดกเดฑเดฟเดฒเต† เด‰เดณเตเดณเดŸเด•เตเด•เด™เตเด™เดณเดพเดฃเต.

เดฒเต‡เด–เดจเดคเตเดคเดฟเตฝ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจ เดฒเดฟเด™เตเด•เตเด•เดณเตเด‚:

เด…เดตเดฒเด‚เดฌเด‚: www.habr.com