Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบชเบฐเบšเบฒเบเบ”เบต, เบ‚เป‰เบญเบเปเบกเปˆเบ™ Dmitry Logvinenko - เบงเบดเบชเบฐเบงเบฐเบเบญเบ™เบ‚เปเป‰เบกเบนเบ™เบ‚เบญเบ‡เบžเบฐเปเบ™เบเบเบฒเบ™เบงเบดเป€เบ„เบฒเบฐเบ‚เบญเบ‡เบเบธเปˆเบกเบšเปเบฅเบดเบชเบฑเบ” Vezet.

เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบˆเบฐเบšเบญเบเบ—เปˆเบฒเบ™เบเปˆเบฝเบงเบเบฑเบšเป€เบ„เบทเปˆเบญเบ‡เบกเบทเบ—เบตเปˆเบ›เบฐเป€เบชเบตเบ”เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบžเบฑเบ”เบ—เบฐเบ™เบฒเบ‚เบฐเบšเบงเบ™เบเบฒเบ™ ETL - Apache Airflow. เปเบ•เปˆ Airflow เปเบกเปˆเบ™เบกเบตเบ„เบงเบฒเบกเบซเบฅเบฒเบเบซเบฅเบฒเบเปเบฅเบฐเบกเบตเบซเบผเบฒเบเบฎเบนเบšเปเบšเบšเบ—เบตเปˆเบ—เปˆเบฒเบ™เบ„เบงเบ™เบžเบดเบˆเบฒเบฅเบฐเบ™เบฒเป€เบšเบดเปˆเบ‡เบกเบฑเบ™เบขเปˆเบฒเบ‡เปƒเบเป‰เบŠเบดเบ”เป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบ—เปˆเบฒเบ™เบˆเบฐเบšเปเปˆเบกเบตเบชเปˆเบงเบ™เบฎเปˆเบงเบกเปƒเบ™เบเบฒเบ™เป„เบซเบฅเบ‚เบญเบ‡เบ‚เปเป‰เบกเบนเบ™, เปเบ•เปˆเบ•เป‰เบญเบ‡เบกเบตเบเบฒเบ™เป€เบ›เบตเบ”เบ•เบปเบงเบ‚เบฐเบšเบงเบ™เบเบฒเบ™เบ•เปˆเบฒเบ‡เป†เป€เบ›เบฑเบ™เป„เบฅเบเบฐเปเบฅเบฐเบ•เบดเบ”เบ•เบฒเบกเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบ‚เบญเบ‡เบžเบงเบเป€เบ‚เบปเบฒ.

เปเบฅเบฐเปเบกเปˆเบ™เปเบฅเป‰เบง, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบˆเบฐเบšเปเปˆเบžเบฝเบ‡เปเบ•เปˆเบšเบญเบ, เปเบ•เปˆเบเบฑเบ‡เบชเบฐเปเบ”เบ‡เปƒเบซเป‰เป€เบซเบฑเบ™: เป‚เบ„เบ‡เบเบฒเบ™เบกเบตเบซเบผเบฒเบเบฅเบฐเบซเบฑเบ”, screenshots เปเบฅเบฐเบ„เปเบฒเปเบ™เบฐเบ™เปเบฒ.

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™
เบชเบดเปˆเบ‡เบ—เบตเปˆเบ—เปˆเบฒเบ™เบกเบฑเบเบˆเบฐเป€เบซเบฑเบ™เปƒเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบ—เปˆเบฒเบ™ google เบ„เปเบฒเบงเปˆเบฒ Airflow / Wikimedia Commons

เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เป€เบ™เบทเป‰เบญเบซเบฒ

เบเบฒเบ™เบ™เปเบฒเบชเบฐเป€เบซเบ™เบต

Apache Airflow เปเบกเปˆเบ™เบ„เบทเบเบฑเบ™เบเบฑเบš Django:

  • เบ‚เบฝเบ™เปƒเบ™ python
  • เบกเบตเบ„เบฐเบ™เบฐเบšเปเบฅเบดเบซเบฒเบ™เบ‡เบฒเบ™เบ—เบตเปˆเบ”เบต,
  • เบชเบฒเบกเบฒเบ”เบ‚เบฐเบซเบเบฒเบเป„เบ”เป‰เบขเปˆเบฒเบ‡เบšเปเปˆเบกเบตเบเบณเบ™เบปเบ”

- เบžเบฝเบ‡โ€‹เปเบ•เปˆโ€‹เบ”เบตเบโ€‹เบงเปˆเบฒโ€‹, เปเบฅเบฐโ€‹เบกเบฑเบ™โ€‹เป„เบ”เป‰โ€‹เบ–เบทเบโ€‹เบชเป‰เบฒเบ‡โ€‹เบ‚เบถเป‰เบ™โ€‹เบชเปเบฒโ€‹เบฅเบฑเบšโ€‹เบˆเบธเบ”โ€‹เบ›เบฐโ€‹เบชเบปเบ‡โ€‹เบ—เบตเปˆโ€‹เปเบ•เบโ€‹เบ•เปˆเบฒเบ‡โ€‹เบเบฑเบ™โ€‹เบซเบกเบปเบ”โ€‹, เบ„เบท (เบ”เบฑเปˆเบ‡โ€‹เบ—เบตเปˆโ€‹เบกเบฑเบ™โ€‹เป„เบ”เป‰โ€‹เบ–เบทเบโ€‹เบ‚เบฝเบ™โ€‹เป„เบงเป‰โ€‹เบเปˆเบญเบ™ katโ€‹)โ€‹:

  • เปเบฅเปˆเบ™โ€‹เปเบฅเบฐโ€‹เบ•เบดเบ”โ€‹เบ•เบฒเบกโ€‹เบเบงเบ”โ€‹เบเบฒโ€‹เบงเบฝเบโ€‹เบ‡เบฒเบ™โ€‹เปƒเบ™โ€‹เบˆเปเบฒโ€‹เบ™เบงเบ™โ€‹เบšเปเปˆโ€‹เบˆเปเบฒโ€‹เบเบฑเบ”โ€‹เบ‚เบญเบ‡โ€‹เป€เบ„เบทเปˆเบญเบ‡โ€‹เบˆเบฑเบ (เป€เบ›เบฑเบ™ Celery / Kubernetes เบซเบผเบฒเบโ€‹เปเบฅเบฐโ€‹เบˆเบดเบ”โ€‹เบชเปเบฒโ€‹เบ™เบถเบโ€‹เบ‚เบญเบ‡โ€‹เบ—เปˆเบฒเบ™โ€‹เบˆเบฐโ€‹เบญเบฐโ€‹เบ™เบธโ€‹เบเบฒเบ”โ€‹เปƒเบซเป‰โ€‹เบ—เปˆเบฒเบ™โ€‹)
  • เบ”เป‰เบงเบเบเบฒเบ™เบชเป‰เบฒเบ‡เบ‚เบฐเบšเบงเบ™เบเบฒเบ™เป€เบฎเบฑเบ”เบงเบฝเบเปเบšเบšเป€เบ„เบทเปˆเบญเบ™เป„เบซเบงเบˆเบฒเบเบเบฒเบ™เบ‚เบฝเบ™เปเบฅเบฐเป€เบ‚เบปเป‰เบฒเปƒเบˆเบฅเบฐเบซเบฑเบ” Python เบ‡เปˆเบฒเบเบซเบผเบฒเบ
  • เปเบฅเบฐเบ„เบงเบฒเบกเบชเบฒเบกเบฒเบ”เปƒเบ™เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เปเบฅเบฐ APIs เบเบฑเบšเบเบฑเบ™เปเบฅเบฐเบเบฑเบ™เป‚เบ”เบเปƒเบŠเป‰เบญเบปเบ‡เบ›เบฐเบเบญเบšเบ—เบตเปˆเบเบฝเบกเบžเป‰เบญเบกเปเบฅเบฐ plugins เบ—เบตเปˆเป€เบฎเบฑเบ”เปƒเบ™เป€เบฎเบทเบญเบ™ (เป€เบŠเบดเปˆเบ‡เบ‡เปˆเบฒเบเบ”เบฒเบเบ—เบตเปˆเบชเบธเบ”).

เบžเบงเบเป€เบฎเบปเบฒเปƒเบŠเป‰ Apache Airflow เปเบšเบšเบ™เบตเป‰:

  • เบžเบงเบเป€เบฎเบปเบฒเป€เบเบฑเบšเบเปเบฒเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเปเบซเบผเปˆเบ‡เบ•เปˆเบฒเบ‡เป† (เบซเบผเบฒเบเป† SQL Server เปเบฅเบฐ PostgreSQL instances, APIs เบ•เปˆเบฒเบ‡เป†เบ—เบตเปˆเบกเบต metrics เบ„เปเบฒเบฎเป‰เบญเบ‡เบชเบฐเบซเบกเบฑเบ, เป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒ 1C) เปƒเบ™ DWH เปเบฅเบฐ ODS (เบžเบงเบเป€เบฎเบปเบฒเบกเบต Vertica เปเบฅเบฐ Clickhouse).
  • เบเป‰เบฒเบงเบซเบ™เป‰เบฒเปเบ™เบงเปƒเบ” cron, เป€เบŠเบดเปˆเบ‡เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบ‚เบฐเบšเบงเบ™เบเบฒเบ™เบฅเบงเบกเบ‚เปเป‰เบกเบนเบ™เบขเบนเปˆเปƒเบ™ ODS, เปเบฅเบฐเบเบฑเบ‡เบ•เบดเบ”เบ•เบฒเบกเบเบฒเบ™เบšเปเบฒเบฅเบธเบ‡เบฎเบฑเบเบชเบฒเบ‚เบญเบ‡เบžเบงเบเป€เบ‚เบปเบฒ.

เบˆเบปเบ™เบเปˆเบงเบฒเบšเปเปˆเบ”เบปเบ™เบกเบฒเบ™เบตเป‰, เบ„เบงเบฒเบกเบ•เป‰เบญเบ‡เบเบฒเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบ–เบทเบเบ„เบธเป‰เบกเบ„เบญเบ‡เป‚เบ”เบเป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบเบ‚เบฐเบซเบ™เบฒเบ”เบ™เป‰เบญเบเบซเบ™เบถเปˆเบ‡เบ—เบตเปˆเบกเบต 32 cores เปเบฅเบฐ 50 GB เบ‚เบญเบ‡ RAM. เปƒเบ™ Airflow, เบ™เบตเป‰เป€เบฎเบฑเบ”เบงเบฝเบ:

  • เบซเบผเบฒเบ 200 เบ”เบฒเบ” (เบ•เบปเบงเบˆเบดเบ‡เปเบฅเป‰เบง workflows, เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒ stuffed เบงเบฝเบเบ‡เบฒเบ™),
  • เปƒเบ™โ€‹เปเบ•เปˆโ€‹เบฅเบฐโ€‹เป‚เบ”เบโ€‹เบชเบฐโ€‹เป€เบฅเปˆเบโ€‹ 70 เบงเบฝเบเบ‡เบฒเบ™,
  • เบ„เบงเบฒเบกเบ”เบตเบ™เบตเป‰เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™ (เป‚เบ”เบเบชเบฐเป€เบฅเปˆเบ) เบซเบ™เบถเปˆเบ‡เบŠเบปเปˆเบงเป‚เบกเบ‡.

เปเบฅเบฐเบเปˆเบฝเบงเบเบฑเบšเบงเบดเบ—เบตเบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบ‚เบฐเบซเบเบฒเบ, เบ‚เป‰เบญเบเบˆเบฐเบ‚เบฝเบ™เบ‚เป‰เบฒเบ‡เบฅเบธเปˆเบกเบ™เบตเป‰, เปเบ•เปˆเบ•เบญเบ™เบ™เบตเป‰เปƒเบซเป‰เบเปเบฒเบ™เบปเบ”เบšเบฑเบ™เบซเบฒ รผber เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเปเบเป‰เป„เบ‚:

เบกเบตเบชเบฒเบกเปเบซเบผเปˆเบ‡ SQL Servers, เปเบ•เปˆเบฅเบฐเบ„เบปเบ™เบกเบต 50 เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™ - เบ•เบปเบงเบขเปˆเบฒเบ‡เบ‚เบญเบ‡เป‚เบ„เบ‡เบเบฒเบ™เบซเบ™เบถเปˆเบ‡, เบ•เบฒเบกเบฅเปเบฒเบ”เบฑเบš, เบžเบงเบเป€เบ‚เบปเบฒเบกเบตเป‚เบ„เบ‡เบชเป‰เบฒเบ‡เบ”เบฝเบงเบเบฑเบ™ (เป€เบเบทเบญเบšเบ—เบธเบเบšเปˆเบญเบ™, mua-ha-ha), เบŠเบถเปˆเบ‡เบซเบกเบฒเบเบ„เบงเบฒเบกเบงเปˆเบฒเปเบ•เปˆเบฅเบฐเบ„เบปเบ™เบกเบตเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ„เปเบฒเบชเบฑเปˆเบ‡ (เป‚เบŠเบเบ”เบต, เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ—เบตเปˆเบกเบตเบ™เบฑเป‰เบ™. เบŠเบทเปˆเบชเบฒเบกเบฒเบ”เบเบปเบ”เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบ—เบธเบฅเบฐเบเบดเบ”เปƒเบ”เบเปเปˆเบ•เบฒเบก). เบžเบงเบเป€เบฎเบปเบฒเป€เบญเบปเบฒเบ‚เปเป‰เบกเบนเบ™เป‚เบ”เบเบเบฒเบ™เป€เบžเบตเปˆเบกเบŠเปˆเบญเบ‡เบšเปเบฅเบดเบเบฒเบ™ (เป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบเปเบซเบผเปˆเบ‡, เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เปเบซเบผเปˆเบ‡, ETL task ID) เปเบฅเบฐเบ–เบดเป‰เบกเบžเบงเบเบกเบฑเบ™เปƒเบชเปˆ, เป€เบงเบปเป‰เบฒ, Vertica.

เปƒเบซเป‰เป„เบ›!

เบžเบฒเบเบชเปˆเบงเบ™เบ•เบปเป‰เบ™เบ•เป, เบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ” (เปเบฅเบฐเบ—เบดเบ”เบชเบฐเบ”เบตเป€เบฅเบฑเบเบ™เป‰เบญเบ)

เป€เบ›เบฑเบ™เบซเบเบฑเบ‡เบžเบงเบเป€เบฎเบปเบฒ (เปเบฅเบฐเป€เบˆเบปเป‰เบฒ)

เปƒเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบ•เบปเป‰เบ™เป„เบกเป‰เปƒเบซเบเปˆเปเบฅเบฐเบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบ‡เปˆเบฒเบเบ”เบฒเบ SQL-schik เปƒเบ™โ€‹เบเบฒเบ™โ€‹เบ„เป‰เบฒโ€‹เบ‚เบฒเบโ€‹เบเปˆเบญเบโ€‹เบ‚เบญเบ‡โ€‹เบฅเบฑเบ”โ€‹เป€เบŠเบโ€‹เบซเบ™เบถเปˆเบ‡โ€‹, เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เบซเบฅเบญเบโ€‹เบฅเบงเบ‡โ€‹เบ‚เบฐโ€‹เบšเบงเบ™โ€‹เบเบฒเบ™ ETL aka เบเบฒเบ™โ€‹เป„เบซเบผโ€‹เบ‚เบญเบ‡โ€‹เบ‚เปเป‰โ€‹เบกเบนเบ™โ€‹เป‚เบ”เบโ€‹เบ™เปเบฒโ€‹เปƒเบŠเป‰โ€‹เบชเบญเบ‡โ€‹เป€เบ„เบทเปˆเบญเบ‡โ€‹เบกเบทโ€‹เบ—เบตเปˆโ€‹เบกเบตโ€‹เปƒเบซเป‰โ€‹เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹:

  • เบชเบนเบ™เบžเบฐเบฅเบฑเบ‡เบ‡เบฒเบ™ Informatica - เป€เบ›เบฑเบ™โ€‹เบฅเบฐโ€‹เบšเบปเบšโ€‹เบเบฒเบ™โ€‹เปเบœเปˆโ€‹เบเบฐโ€‹เบˆเบฒเบโ€‹เบ—เบตเปˆโ€‹เบชเบธเบ”โ€‹, เบœเบฐโ€‹เบฅเบดเบ”โ€‹เบ•เบฐโ€‹เบžเบฑเบ™โ€‹เบ—เบตเปˆโ€‹เบชเบธเบ”โ€‹, เบกเบตโ€‹เบฎเบฒเบ”โ€‹เปเบงโ€‹เบ‚เบญเบ‡โ€‹เบ•เบปเบ™โ€‹เป€เบญเบ‡โ€‹, เบชเบฐโ€‹เบšเบฑเบšโ€‹เบ‚เบญเบ‡โ€‹เบ•เบปเบ™โ€‹เป€เบญเบ‡โ€‹. เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเป„เบ”เป‰เปƒเบŠเป‰เบžเบฃเบฐเป€เบˆเบปเป‰เบฒเบซเป‰เบฒเบก 1% เบ‚เบญเบ‡เบ„เบงเบฒเบกเบชเบฒเบกเบฒเบ”เบ‚เบญเบ‡เบกเบฑเบ™. เป€เบ›เบฑเบ™เบซเบเบฑเบ‡? เบ”เบต, เบเปˆเบญเบ™เบญเบทเปˆเบ™ เปเบปเบ”, เบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบšเบ™เบตเป‰, เบขเบนเปˆเบšเปˆเบญเบ™เปƒเบ”เบšเปˆเบญเบ™ เปœเบถเปˆเบ‡ เบˆเบฒเบเบŠเบธเบกเบ›เบต 380, เบ„เบงเบฒเบกเบเบปเบ”เบ”เบฑเบ™เบ—เบฒเบ‡เบˆเบดเบ”เปƒเบˆเบ•เปเปˆเบžเบงเบเป€เบฎเบปเบฒ. เบญเบฑเบ™เบ—เบตเบชเบญเบ‡, contraption เบ™เบตเป‰เป„เบ”เป‰เบ–เบทเบเบญเบญเบเปเบšเบšเบชเปเบฒเบฅเบฑเบšเบ‚เบฐเบšเบงเบ™เบเบฒเบ™ fancy เบ—เบตเปˆเบชเบธเบ”, เบญเบปเบ‡เบ›เบฐเบเบญเบš furious reuse เปเบฅเบฐ tricks เบ—เบตเปˆเบชเปเบฒเบ„เบฑเบ™เบซเบผเบฒเบเบงเบดเบชเบฒเบซเบฐเบเบดเบ”เบญเบทเปˆเบ™เป†. เบเปˆเบฝเบงเบเบฑเบšเบ„เบงเบฒเบกเบˆเบดเบ‡เบ—เบตเปˆเบงเปˆเบฒเบกเบฑเบ™เบกเบตเบ„เปˆเบฒเปƒเบŠเป‰เบˆเปˆเบฒเบ, เป€เบŠเบฑเปˆเบ™เบ›เบตเบเบ‚เบญเบ‡ Airbus AXNUMX / เบ›เบต, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบšเปเปˆเป€เบงเบปเป‰เบฒเบซเบเบฑเบ‡.

    เบฅเบฐเบงเบฑเบ‡, เบžเบฒเบšเปœเป‰เบฒเบˆเปเบญเบฒเบ”เป€เบฎเบฑเบ”เปƒเบซเป‰เบ„เบปเบ™เบญเบฒเบเบธเบ•เปเปˆเบฒเบเบงเปˆเบฒ 30 เบ›เบตเบšเบฒเบ”เป€เบˆเบฑเบšเป€เบฅเบฑเบเบ™เป‰เบญเบ

    Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

  • SQL Server Integration Server - เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เป„เบ”เป‰โ€‹เบ™เปเบฒโ€‹เปƒเบŠเป‰ comrade เบ™เบตเป‰โ€‹เปƒเบ™โ€‹เบเบฒเบ™โ€‹เป„เบซเบผโ€‹เป€เบ‚เบปเป‰เบฒโ€‹เบ‚เบญเบ‡โ€‹เป‚เบ„เบ‡โ€‹เบเบฒเบ™โ€‹เบ‚เบญเบ‡โ€‹เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹. เปเบฅเป‰เบง, เปƒเบ™เบ„เบงเบฒเบกเป€เบ›เบฑเบ™เบˆเบดเบ‡: เบžเบงเบเป€เบฎเบปเบฒเปƒเบŠเป‰ SQL Server เปเบฅเป‰เบง, เปเบฅเบฐเบกเบฑเบ™เบญเบฒเบ”เบˆเบฐเบšเปเปˆเบชเบปเบกเป€เบซเบ”เบชเบปเบกเบœเบปเบ™เบ—เบตเปˆเบˆเบฐเบšเปเปˆเปƒเบŠเป‰เป€เบ„เบทเปˆเบญเบ‡เบกเบท ETL เบ‚เบญเบ‡เบกเบฑเบ™. เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เบขเบนเปˆเปƒเบ™เบกเบฑเบ™เบ”เบต: เบ—เบฑเบ‡เบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบšเปเบกเปˆเบ™เบชเบงเบเบ‡เบฒเบก, เปเบฅเบฐเบฅเบฒเบเบ‡เบฒเบ™เบ„เบงเบฒเบกเบ„เบทเบšเบซเบ™เป‰เบฒ ... เปเบ•เปˆเบ™เบตเป‰เบšเปเปˆเปเบกเปˆเบ™เป€เบซเบ”เบœเบปเบ™เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบฎเบฑเบเบœเบฐเบฅเบดเบ”เบ•เบฐเบžเบฑเบ™เบŠเบญเบšเปเบง, เป‚เบญเป‰, เบšเปเปˆเปเบกเปˆเบ™เบชเปเบฒเบฅเบฑเบšเป€เบฅเบทเปˆเบญเบ‡เบ™เบตเป‰. เบฅเบธเป‰เบ™เบกเบฑเบ™ dtsx (เป€เบŠเบดเปˆเบ‡เปเบกเปˆเบ™ XML เบเบฑเบš nodes shuffled on save) เบžเบงเบเป€เบฎเบปเบฒเบชเบฒเบกเบฒเบ”, เปเบ•เปˆเบชเบดเปˆเบ‡เบ—เบตเปˆเป€เบ›เบฑเบ™เบˆเบธเบ”? เป€เบฎเบฑเบ”เปเบ™เบงเปƒเบ”เบเปˆเบฝเบงเบเบฑเบšเบเบฒเบ™เบชเป‰เบฒเบ‡เบŠเบธเบ”เบงเบฝเบเบ—เบตเปˆเบˆเบฐเบฅเบฒเบเบซเบผเบฒเบเบฎเป‰เบญเบเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบˆเบฒเบเป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบเบซเบ™เบถเปˆเบ‡เป„เบ›เบซเบฒเบญเบทเปˆเบ™? เปเบกเปˆเบ™เปเบฅเป‰เบง, เบชเบดเปˆเบ‡เบ—เบตเปˆเป€เบ›เบฑเบ™เบฎเป‰เบญเบ, เบ™เบดเป‰เบงเบกเบทเบ”เบฑเบ”เบชเบฐเบ™เบตเบ‚เบญเบ‡เบ—เปˆเบฒเบ™เบˆเบฐเบซเบผเบธเบ”เบฅเบปเบ‡เบˆเบฒเบเบŠเบฒเบงเบ•เปˆเบญเบ™, เบ„เบฅเบดเบเปƒเบชเปˆเบ›เบธเปˆเบกเบซเบ™เบน. เปเบ•เปˆเปเบ™เปˆเบ™เบญเบ™เบกเบฑเบ™เป€เบšเบดเปˆเบ‡เบ„เบปเบ™เบญเบฑเบšเป€เบ”เบ”: เบซเบผเบฒเบ:

    Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบžเบงเบเป€เบฎเบปเบฒเปเบ™เปˆเบ™เบญเบ™เบŠเบญเบเบซเบฒเบงเบดเบ—เบตเบ—เบฒเบ‡เบญเบญเบ. เบเปเบฅเบฐเบ™เบต เป€เบเบทเบญเบš เบกเบฒเบฎเบญเบ”เป€เบ„เบทเปˆเบญเบ‡เบœเบฐเบฅเบดเบ”เบŠเบธเบ” SSIS เบ—เบตเปˆเบ‚เบฝเบ™เบ”เป‰เบงเบเบ•เบปเบ™เป€เบญเบ‡ ...

โ€ฆ เปเบฅเป‰เบงเบงเบฝเบเปƒเปเปˆเบเปเบžเบปเบšเบ‚เป‰เบญเบ. เปเบฅเบฐ Apache Airflow overtook เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบเปˆเบฝเบงเบเบฑเบšเบกเบฑเบ™.

เป€เบกเบทเปˆเบญเบ‚เป‰เบญเบเบžเบปเบšเบงเปˆเบฒเบ„เปเบฒเบญเบฐเบ—เบดเบšเบฒเบเบเปˆเบฝเบงเบเบฑเบšเบ‚เบฐเบšเบงเบ™เบเบฒเบ™ ETL เปเบกเปˆเบ™เบฅเบฐเบซเบฑเบ” Python เบ‡เปˆเบฒเบเบ”เบฒเบ, เบ‚เป‰เบญเบเบšเปเปˆเป„เบ”เป‰เป€เบ•เบฑเป‰เบ™เบฅเปเบฒเบชเปเบฒเบฅเบฑเบšเบ„เบงเบฒเบกเบชเบธเบ. เบ™เบตเป‰เปเบกเปˆเบ™เบงเบดเบ—เบตเบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ‚เปเป‰เบกเบนเบ™เปเบฅเบฐเบ„เบงเบฒเบกเปเบ•เบเบ•เปˆเบฒเบ‡, เปเบฅเบฐเบเบฒเบ™เบ–เบญเบเป€เบ—เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ—เบตเปˆเบกเบตเป‚เบ„เบ‡เบชเป‰เบฒเบ‡เบ”เบฝเบงเบˆเบฒเบเบซเบผเบฒเบเบฎเป‰เบญเบเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เป€เบ›เบปเป‰เบฒเบซเบกเบฒเบเบ”เบฝเบงเป„เบ”เป‰เบเบฒเบเป€เบ›เบฑเบ™เป€เบฅเบทเปˆเบญเบ‡เบ‚เบญเบ‡เบฅเบฐเบซเบฑเบ” Python เปƒเบ™เบซเบ™เบถเปˆเบ‡เปเบฅเบฐเป€เบ„เบดเปˆเบ‡เบซเบ™เบถเปˆเบ‡เบซเบผเบทเบชเบญเบ‡เบซเบ™เป‰เบฒเบˆเป 13 ".

เบเบฒเบ™เบ›เบฐเบเบญเบšเบเบธเปˆเบก

เบšเปเปˆเปƒเบซเป‰เบˆเบฑเบ”เปเบˆเบ‡เป‚เบฎเบ‡เบฎเบฝเบ™เบญเบฐเบ™เบธเบšเบฒเบ™เบขเปˆเบฒเบ‡เบชเบปเบกเบšเบนเบ™, เปเบฅเบฐเบšเปเปˆเป„เบ”เป‰เป€เบงเบปเป‰เบฒเบเปˆเบฝเบงเบเบฑเบšเบชเบดเปˆเบ‡เบ—เบตเปˆเบŠเบฑเบ”เป€เบˆเบ™เบขเบนเปˆเบ—เบตเปˆเบ™เบตเป‰, เป€เบŠเบฑเปˆเบ™: เบเบฒเบ™เบ•เบดเบ”เบ•เบฑเป‰เบ‡ Airflow, เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบ—เปˆเบฒเบ™เป€เบฅเบทเบญเบ, Celery เปเบฅเบฐเบเปเบฅเบฐเบ™เบตเบญเบทเปˆเบ™เป†เบ—เบตเปˆเบญเบฐเบ—เบดเบšเบฒเบเป„เบงเป‰เปƒเบ™ docks.

เป€เบžเบทเปˆเบญเปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒเบชเบฒเบกเบฒเบ”เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบเบฒเบ™เบ—เบปเบ”เบฅเบญเบ‡เป„เบ”เป‰เบ—เบฑเบ™เบ—เบต, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเป„เบ”เป‰เปเบ•เป‰เบกเบฎเบนเบš docker-compose.yml เปƒเบ™เบ™เบฑเป‰เบ™:

  • เปƒเบซเป‰เบ‚เบญเบ‡เบ•เบปเบงเบˆเบดเบ‡เบเบปเบเบชเบนเบ‡ เบเบฐเปเบชเบญเบฒเบเบฒเบ”: Scheduler, Webserver. เบ”เบญเบเป„เบกเป‰เบเบฑเบ‡เบˆเบฐเบ–เบทเบเบซเบกเบธเบ™เบขเบนเปˆเบ—เบตเปˆเบ™เบฑเป‰เบ™เป€เบžเบทเปˆเบญเบ•เบดเบ”เบ•เบฒเบกเบงเบฝเบเบ‡เบฒเบ™เบ‚เบญเบ‡ Celery (เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒเบกเบฑเบ™เป„เบ”เป‰เบ–เบทเบ pushed เปเบฅเป‰เบง apache/airflow:1.10.10-python3.7, เปเบ•เปˆโ€‹เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เบšเปเปˆโ€‹เบชเบปเบ™โ€‹เปƒเบˆโ€‹)
  • PostgreSQL, เปƒเบ™เบ—เบตเปˆ Airflow เบˆเบฐเบ‚เบฝเบ™เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบšเปเบฅเบดเบเบฒเบ™เบ‚เบญเบ‡เบกเบฑเบ™ (เบ‚เปเป‰เบกเบนเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡, เบชเบฐเบ–เบดเบ•เบดเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”, เปเบฅเบฐเบญเบทเปˆเบ™เป†), เปเบฅเบฐ Celery เบˆเบฐเบซเบกเบฒเบเบงเบฝเบเบ‡เบฒเบ™เบ—เบตเปˆเบชเปเบฒเป€เบฅเบฑเบ”;
  • Redis, เป€เบŠเบดเปˆเบ‡เบˆเบฐเป€เบฎเบฑเบ”เบซเบ™เป‰เบฒเบ—เบตเปˆเป€เบ›เบฑเบ™เบ™เบฒเบเบซเบ™เป‰เบฒเบชเปเบฒเบฅเบฑเบš Celery;
  • เบžเบฐเบ™เบฑเบเบ‡เบฒเบ™ Celery, เป€เบŠเบดเปˆเบ‡เบˆเบฐเป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เบ”เปเบฒเป€เบ™เบตเบ™เบเบฒเบ™เป‚เบ”เบเบเบปเบ‡เบ‚เบญเบ‡เบงเบฝเบเบ‡เบฒเบ™.
  • เป„เบ›เบ—เบตเปˆเป‚เบŸเบ™เป€เบ”เบต ./dags เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป€เบžเบตเปˆเบกเป„เบŸเบฅเปŒเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบ”เป‰เบงเบเบ„เปเบฒเบญเบฐเบ—เบดเบšเบฒเบเบ‚เบญเบ‡ dags. เบžเบงเบเบกเบฑเบ™เบˆเบฐเบ–เบทเบเป€เบเบฑเบšเบ‚เบถเป‰เบ™เป€เบ—เบดเบ‡เปเบกเบ‡เบงเบฑเบ™, เบชเบฐเบ™เบฑเป‰เบ™เบšเปเปˆเบˆเบณเป€เบ›เบฑเบ™เบ—เบตเปˆเบˆเบฐเบˆเบนเบ”เบเบญเบ‡เบ—เบฑเบ‡เปเบปเบ”เบซเบผเบฑเบ‡เบˆเบฒเบเบˆเบฒเบกเปเบ•เปˆเบฅเบฐเบ„เบฑเป‰เบ‡.

เปƒเบ™เบšเบฒเบ‡เบšเปˆเบญเบ™, เบฅเบฐเบซเบฑเบ”เปƒเบ™เบ•เบปเบงเบขเปˆเบฒเบ‡เบšเปเปˆเป„เบ”เป‰เบชเบฐเปเบ”เบ‡เปƒเบซเป‰เป€เบซเบฑเบ™เบขเปˆเบฒเบ‡เบชเบปเบกเบšเบนเบ™ (เป€เบžเบทเปˆเบญเบšเปเปˆเปƒเบซเป‰ clutter เบ‚เปเป‰เบ„เบงเบฒเบก), เปเบ•เปˆเบšเบฒเบ‡เบšเปˆเบญเบ™เบกเบฑเบ™เบ–เบทเบเบ”เบฑเบ”เปเบเป‰เปƒเบ™เบ‚เบฐเบšเบงเบ™เบเบฒเบ™. เบ•เบปเบงเบขเปˆเบฒเบ‡เบฅเบฐเบซเบฑเบ”เบเบฒเบ™เป€เบฎเบฑเบ”เบงเบฝเบเบ—เบตเปˆเบชเบปเบกเบšเบนเบ™เบชเบฒเบกเบฒเบ”เบžเบปเบšเป„เบ”เป‰เบขเบนเปˆเปƒเบ™เบšเปˆเบญเบ™เป€เบเบฑเบšเบกเป‰เบฝเบ™ https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

เบซเบกเบฒเบเป€เบซเบ”:

  • เปƒเบ™เบเบฒเบ™เบ›เบฐเบเบญเบšเบ‚เบญเบ‡เบญเบปเบ‡เบ›เบฐเบเบญเบš, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบชเปˆเบงเบ™เปƒเบซเบเปˆเปเบกเปˆเบ™เบญเบตเบ‡เปƒเบชเปˆเบฎเบนเบšเบžเบฒเบšเบ—เบตเปˆเบกเบตเบŠเบทเปˆเบชเบฝเบ‡ puckel/docker-airflow - เปƒเบซเป‰เปเบ™เปˆเปƒเบˆเบงเปˆเบฒเบเบงเบ”เป€เบšเบดเปˆเบ‡เบกเบฑเบ™เบญเบญเบ. เบšเบฒเบ‡เบ—เบตเป€เบˆเบปเป‰เบฒเบšเปเปˆเบ•เป‰เบญเบ‡เบเบฒเบ™เบซเบเบฑเบ‡เบญเบตเบเปƒเบ™เบŠเบตเบงเบดเบ”เบ‚เบญเบ‡เป€เบˆเบปเป‰เบฒ.
  • เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒ Airflow เบ—เบฑเบ‡เบซเบกเบปเบ”เปเบกเปˆเบ™เบกเบตเบขเบนเปˆเบšเปเปˆเบžเบฝเบ‡เปเบ•เปˆเบœเปˆเบฒเบ™ airflow.cfg, เปเบ•เปˆเบเบฑเบ‡เบœเปˆเบฒเบ™เบ•เบปเบงเปเบ›เบชเบฐเบžเบฒเบšเปเบงเบ”เบฅเป‰เบญเบก (เบ‚เปเบ‚เบญเบšเปƒเบˆเบเบฑเบšเบœเบนเป‰เบžเบฑเบ”เบ—เบฐเบ™เบฒ), เป€เบŠเบดเปˆเบ‡เบ‚เป‰เบญเบเป„เบ”เป‰เปƒเบŠเป‰เบ›เบฐเป‚เบซเบเบ”เบˆเบฒเบเบญเบฑเบ™เบ•เบฐเบฅเบฒเบ.
  • เบ•เบฒเบกเบ—เปเบฒเบกเบฐเบŠเบฒเบ”, เบกเบฑเบ™เบšเปเปˆเปเบกเปˆเบ™เบเบฒเบ™เบœเบฐเบฅเบดเบ”เบ—เบตเปˆเบเบฝเบกเบžเป‰เบญเบก: เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเป€เบˆเบ”เบ•เบฐเบ™เบฒเบšเปเปˆเป„เบ”เป‰เปƒเบชเปˆเบเบฒเบ™เป€เบ•เบฑเป‰เบ™เบ‚เบญเบ‡เบซเบปเบงเปƒเบˆเปƒเบชเปˆเบ–เบฑเบ‡, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบšเปเปˆเป„เบ”เป‰เบฅเบปเบšเบเบงเบ™เบ„เบงเบฒเบกเบ›เบญเบ”เป„เบž. เปเบ•เปˆเบ‚เป‰เบญเบเป„เบ”เป‰เป€เบฎเบฑเบ”เบ•เปเบฒเปˆเบชเบธเบ”เบ—เบตเปˆเป€เบซเบกเบฒเบฐเบชเบปเบกเบชเปเบฒเบฅเบฑเบšเบ™เบฑเบเบ—เบปเบ”เบฅเบญเบ‡เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ.
  • เปƒเบซเป‰เบชเบฑเบ‡เป€เบเบ”เบงเปˆเบฒ:
    • เป‚เบŸเบ™เป€เบ”เบต dag เบ•เป‰เบญเบ‡เบชเบฒเบกเบฒเบ”เป€เบ‚เบปเป‰เบฒเป€เบ–เบดเบ‡เป„เบ”เป‰เบ—เบฑเบ‡เบœเบนเป‰เบเบณเบ™เบปเบ”เป€เบงเบฅเบฒ เปเบฅเบฐเบ„เบปเบ™เบ‡เบฒเบ™.
    • เบ”เบฝเบงเบเบฑเบ™เปƒเบŠเป‰เบเบฑเบšเบซเป‰เบญเบ‡เบชเบฐเบซเบกเบธเบ”เบžเบฒเบเบชเปˆเบงเบ™เบ—เบตเบชเบฒเบกเบ—เบฑเบ‡เบซเบกเบปเบ” - เบžเบงเบเป€เบ‚เบปเบฒเบ—เบฑเบ‡เบซเบกเบปเบ”เบ•เป‰เบญเบ‡เป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เบ•เบดเบ”เบ•เบฑเป‰เบ‡เบขเบนเปˆเปƒเบ™เป€เบ„เบทเปˆเบญเบ‡เบˆเบฑเบเบ—เบตเปˆเบกเบตเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เปเบฅเบฐเบžเบฐเบ™เบฑเบเบ‡เบฒเบ™.

เปเบฅเป‰เบง, เบ”เบฝเบงเบ™เบตเป‰เบกเบฑเบ™เบ‡เปˆเบฒเบเบ”เบฒเบ:

$ docker-compose up --scale worker=3

เบซเบผเบฑเบ‡เบˆเบฒเบเบ—เบตเปˆเบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เป€เบžเบตเปˆเบกเบ‚เบถเป‰เบ™, เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป€เบšเบดเปˆเบ‡เปƒเบ™เบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบšเบ‚เบญเบ‡เป€เบงเบฑเบšเป„เบŠเบ•เปŒ:

เปเบ™เบงเบ„เบงเบฒเบกเบ„เบดเบ”เบžเบทเป‰เบ™เบ–เบฒเบ™

เบ–เป‰เบฒเบ—เปˆเบฒเบ™เบšเปเปˆเป€เบ‚เบปเป‰เบฒเปƒเบˆเบชเบดเปˆเบ‡เปƒเบ”เปƒเบ™ "dags", เบ™เบตเป‰เปเบกเปˆเบ™เบงเบฑเบ”เบˆเบฐเบ™เบฒเบ™เบธเบเบปเบกเบชเบฑเป‰เบ™:

  • Scheduler - เบฅเบธเบ‡เบ—เบตเปˆเบชเปเบฒเบ„เบฑเบ™เบ—เบตเปˆเบชเบธเบ”เปƒเบ™ Airflow, เบเบฒเบ™เบ„เบงเบšเบ„เบธเบกเบ—เบตเปˆเบซเบธเปˆเบ™เบเบปเบ™เป€เบฎเบฑเบ”เบงเบฝเบเบซเบ™เบฑเบ, เปเบฅเบฐเบšเปเปˆเปเบกเปˆเบ™เบšเบธเบเบ„เบปเบ™: เบ•เบดเบ”เบ•เบฒเบกเบเบงเบ”เบเบฒเบ•เบฒเบ•เบฐเบฅเบฒเบ‡, เบ›เบฑเบšเบ›เบธเบ‡ dags, เป€เบ›เบตเบ”เบ•เบปเบงเบงเบฝเบเบ‡เบฒเบ™.

    เป‚เบ”เบเบ—เบปเปˆเบงเป„เบ›เปเบฅเป‰เบง, เปƒเบ™เบฎเบธเปˆเบ™เป€เบเบปเปˆเบฒ, เบฅเบฒเบงเบกเบตเบšเบฑเบ™เบซเบฒเบเบฑเบšเบ„เบงเบฒเบกเบŠเบปเบ‡เบˆเปเบฒ (เบšเปเปˆ, เบšเปเปˆเปเบกเปˆเบ™เบ„เบงเบฒเบกเบˆเปเบฒ, เปเบ•เปˆเบเบฒเบ™เบฎเบปเปˆเบงเป„เบซเบผ) เปเบฅเบฐเบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เบกเปเบฅเบฐเบ”เบปเบเบเบฑเบ‡เบขเบนเปˆเปƒเบ™เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒ. run_duration โ€” เป„เบฅโ€‹เบเบฐโ€‹เบเบฒเบ™โ€‹เป€เบฅเบตเปˆเบกโ€‹เบ•เบปเป‰เบ™โ€‹เปƒเบซเบกเปˆโ€‹เบ‚เบญเบ‡โ€‹เบ•เบปเบ™โ€‹. เปเบ•เปˆเบ”เบฝเบงเบ™เบตเป‰เบ—เบธเบเบขเปˆเบฒเบ‡เบ”เบตเปเบฅเป‰เบง.

  • DAG (aka "dug") - "เบเบฒเบŸ acyclic เบŠเบตเป‰", เปเบ•เปˆเบ„เปเบฒเบ™เบดเบเบฒเบกเบ”เบฑเปˆเบ‡เบเปˆเบฒเบงเบˆเบฐเบšเบญเบเบ„เบปเบ™เบˆเปเบฒเบ™เบงเบ™เบซเบ™เป‰เบญเบเบซเบ™เบถเปˆเบ‡, เปเบ•เปˆเปƒเบ™เบ„เบงเบฒเบกเป€เบ›เบฑเบ™เบˆเบดเบ‡เบกเบฑเบ™เป€เบ›เบฑเบ™เบ–เบฑเบ‡เบชเปเบฒเบฅเบฑเบšเบงเบฝเบเบ‡เบฒเบ™เบ—เบตเปˆเบžเบปเบงเบžเบฑเบ™เบเบฑเบšเบเบฑเบ™เปเบฅเบฐเบเบฑเบ™ (เป€เบšเบดเปˆเบ‡เบ‚เป‰เบฒเบ‡เบฅเบธเปˆเบกเบ™เบตเป‰) เบซเบผเบทเบเบฒเบ™เบ›เบฝเบšเบ—เบฝเบšเบ‚เบญเบ‡ Package เปƒเบ™ SSIS เปเบฅเบฐ Workflow เปƒเบ™ Informatica .

    เบ™เบญเบเป€เบซเบ™เบทเบญเบˆเบฒเบ dags, เบกเบฑเบ™เบญเบฒเบ”เบˆเบฐเบเบฑเบ‡เบกเบต subdags, เปเบ•เปˆเบžเบงเบเป€เบฎเบปเบฒเบชเปˆเบงเบ™เบซเบผเบฒเบเบญเบฒเบ”เบˆเบฐเบšเปเปˆเป€เบ‚เบปเป‰เบฒเบซเบฒเบžเบงเบเบกเบฑเบ™.

  • เปเบฅเปˆเบ™ DAG - เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™ dag, เป€เบŠเบดเปˆเบ‡เบกเบญเบšเบซเบกเบฒเบเบ‚เบญเบ‡เบ•เบปเบ™เป€เบญเบ‡ execution_date. Dagrans เบ‚เบญเบ‡ dag เบ”เบฝเบงเบเบฑเบ™เบชเบฒเบกเบฒเบ”เป€เบฎเบฑเบ”เบงเบฝเบเบ‚เบฐเบซเบ™เบฒเบ™เบเบฑเบ™ (เบ–เป‰เบฒเบ—เปˆเบฒเบ™เป€เบฎเบฑเบ”เปƒเบซเป‰เบงเบฝเบเบ‡เบฒเบ™เบ‚เบญเบ‡เบ—เปˆเบฒเบ™เบšเปเปˆเบกเบตเบ„เบงเบฒเบกเป€เบ‚เบฑเป‰เบกเปเบ‚เบ‡, เปเบ™เปˆเบ™เบญเบ™).
  • Operator เปเบกเปˆเบ™เบŠเบดเป‰เบ™เบชเปˆเบงเบ™เบ‚เบญเบ‡เบฅเบฐเบซเบฑเบ”เบ—เบตเปˆเบฎเบฑเบšเบœเบดเบ”เบŠเบญเบšเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ”เปเบฒเป€เบ™เบตเบ™เบเบฒเบ™เบชเบฐเป€เบžเบฒเบฐเปƒเบ”เบซเบ™เบถเปˆเบ‡. เบกเบตเบชเบฒเบกเบ›เบฐเป€เบžเบ”เบ‚เบญเบ‡เบœเบนเป‰เบ›เบฐเบเบญเบšเบเบฒเบ™:
    • เบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบ„เบท favorite เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ PythonOperator, เป€เบŠเบดเปˆเบ‡เบชเบฒเบกเบฒเบ”เบ›เบฐเบ•เบดเบšเบฑเบ”เบฅเบฐเบซเบฑเบ” Python เปƒเบ”เป† (เบ—เบตเปˆเบ–เบทเบเบ•เป‰เบญเบ‡);
    • เบเบฒเบ™เป‚เบญเบ™, เป€เบŠเบดเปˆเบ‡เบเบฒเบ™เบ‚เบปเบ™เบชเบปเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบชเบฐเบ–เบฒเบ™เบ—เบตเปˆเป„เบ›เบซเบฒเบชเบฐเบ–เบฒเบ™เบ—เบตเปˆ, เป€เบงเบปเป‰เบฒเบงเปˆเบฒ, MsSqlToHiveTransfer;
    • sensor เปƒเบ™เบ—เบฒเบ‡เบเบปเบ‡เบเบฑเบ™เบ‚เป‰เบฒเบก, เบกเบฑเบ™เบˆเบฐเบŠเปˆเบงเบเปƒเบซเป‰เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ•เบญเบšเป‚เบ•เป‰เบซเบผเบทเบŠเป‰เบฒเบฅเบปเบ‡เบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบ•เปเปˆเป„เบ›เบ‚เบญเบ‡ dag เบˆเบปเบ™เบเปˆเบงเบฒเป€เบซเบ”เบเบฒเบ™เป€เบเบตเบ”เบ‚เบทเป‰เบ™. HttpSensor เบชเบฒเบกเบฒเบ”เบ”เบถเบ‡เบˆเบธเบ”เบชเบดเป‰เบ™เบชเบธเบ”เบ—เบตเปˆเบฅเบฐเบšเบธเป„เบงเป‰, เปเบฅเบฐเป€เบกเบทเปˆเบญเบ„เปเบฒเบ•เบญเบšเบ—เบตเปˆเบ•เป‰เบญเบ‡เบเบฒเบ™เบฅเปเบ–เป‰เบฒ, เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบเบฒเบ™เป‚เบญเบ™ GoogleCloudStorageToS3Operator. เบˆเบดเบ”เปƒเบˆเบ—เบตเปˆเบชเบปเบ‡เป„เบชเบˆเบฐเบ–เบฒเบกเบงเปˆเบฒ: โ€œเป€เบ›เบฑเบ™เบซเบเบฑเบ‡? เบซเบผเบฑเบ‡เบˆเบฒเบเบ—เบตเปˆเบ—เบฑเบ‡เบซเบกเบปเบ”, เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป€เบฎเบฑเบ”เบŠเป‰เปเบฒเป„เบ”เป‰เปƒเบ™เบ•เบปเบงเบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™!โ€ เปเบฅเบฐเบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™, เปƒเบ™เบ„เปเบฒเบชเบฑเปˆเบ‡เบ—เบตเปˆเบˆเบฐเบšเปเปˆ clog เบชเบฐเบ™เบธเบเป€เบเบตเบ‚เบญเบ‡เบงเบฝเบเบ‡เบฒเบ™เบ—เบตเปˆเบกเบตเบœเบนเป‰เบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™เป‚เบˆเบฐ. เป€เบŠเบฑเบ™เป€เบŠเบตเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™, เบเบงเบ”เบชเบญเบšเปเบฅเบฐเบ•เบฒเบเบเปˆเบญเบ™เบ—เบตเปˆเบˆเบฐเบžเบฐเบเบฒเบเบฒเบกเบ•เปเปˆเป„เบ›.
  • Task - เบœเบนเป‰เบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™เบ›เบฐเบเบฒเบ”, เบšเปเปˆเบงเปˆเบฒเบˆเบฐเป€เบ›เบฑเบ™เบ›เบฐเป€เบžเบ”เปƒเบ”, เปเบฅเบฐเบ•เบดเบ”เบเบฑเบšเบ”เบฒเบเป„เบ”เป‰เบ–เบทเบเป€เบฅเบทเปˆเบญเบ™เบŠเบฑเป‰เบ™เปƒเบ™เบซเบ™เป‰เบฒเบงเบฝเบ.
  • เบ•เบปเบงเบขเปˆเบฒเบ‡เปœเป‰เบฒเบงเบฝเบ - เป€เบกเบทเปˆเบญเบœเบนเป‰เบงเบฒเบ‡เปเบœเบ™เบ—เบปเปˆเบงเป„เบ›เบ•เบฑเบ”เบชเบดเบ™เปƒเบˆเบงเปˆเบฒเบกเบฑเบ™เปเบกเปˆเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบˆเบฐเบชเบปเปˆเบ‡เบงเบฝเบเบ‡เบฒเบ™เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบเบฒเบ™เบชเบนเป‰เบฎเบปเบšเบเบฑเบšเบ™เบฑเบเบชเบฐเปเบ”เบ‡ - เบžเบฐเบ™เบฑเบเบ‡เบฒเบ™ (เบขเบนเปˆเปƒเบ™เบˆเบธเบ”, เบ–เป‰เบฒเบžเบงเบเป€เบฎเบปเบฒเปƒเบŠเป‰ LocalExecutor เบซเบผเบทเบเบฑเบš node เบซเปˆเบฒเบ‡เป„เบเบชเบญเบเบซเบผเบตเบเปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡ CeleryExecutor), เบกเบฑเบ™เบเปเบฒเบ™เบปเบ”เบšเปเบฅเบดเบšเบปเบ”เปƒเบซเป‰เบžเบงเบเป€เบ‚เบปเบฒ (i. e., เบŠเบธเบ”เบ‚เบญเบ‡เบ•เบปเบงเปเบ› - เบžเบฒเบฅเบฒเบกเบดเป€เบ•เบตเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”), เบ‚เบฐเบซเบเบฒเบเบ„เปเบฒเบชเบฑเปˆเบ‡เบซเบผเบทเปเบšเบšเบชเบญเบšเบ–เบฒเบก, เปเบฅเบฐเบ›เบฐเบเบญเบšเปƒเบซเป‰เป€เบ‚เบปเบฒเป€เบˆเบปเป‰เบฒ.

เบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡เบงเบฝเบเบ‡เบฒเบ™

เบ—เปเบฒเบญเบดเบ”, เปƒเบซเป‰เบญเบฐเบ—เบดเบšเบฒเบเป‚เบ„เบ‡เบเบฒเบ™เบ—เบปเปˆเบงเป„เบ›เบ‚เบญเบ‡ doug เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, เปเบฅเบฐเบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบฅเบฒเบเบฅเบฐเบญเบฝเบ”เป€เบžเบตเปˆเบกเป€เบ•เบตเบกเปเบฅเบฐเบซเบผเบฒเบ, เป€เบžเบฒเบฐเบงเปˆเบฒเบžเบงเบเป€เบฎเบปเบฒเบ™เปเบฒเปƒเบŠเป‰เบšเบฒเบ‡เบงเบดเบ—เบตเปเบเป‰เป„เบ‚เบ—เบตเปˆเบšเปเปˆเปเบกเปˆเบ™ trivial.

เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เปƒเบ™เบฎเบนเบšเปเบšเบšเบ—เบตเปˆเบ‡เปˆเบฒเบเบ”เบฒเบเบ—เบตเปˆเบชเบธเบ”, เบ”เบฑเปˆเบ‡เบเปˆเบฒเบงเบˆเบฐเบกเบตเบฅเบฑเบเบชเบฐเบ™เบฐเบ™เบตเป‰:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

เปƒเบซเป‰เบ„เบดเบ”เบญเบญเบเบงเปˆเบฒ:

  • เบซเบ™เป‰เบฒเบ—เปเบฒเบญเบดเบ”, เบžเบงเบเป€เบฎเบปเบฒเบ™เปเบฒเป€เบ‚เบปเป‰เบฒ libs เบ—เบตเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เปเบฅเบฐ เบญเบฑเบ™โ€‹เบญเบทเปˆเบ™;
  • sql_server_ds - เบ™เบตเป‰โ€‹เปเบกเปˆเบ™ List[namedtuple[str, str]] เบเบฑเบšเบŠเบทเปˆเบ‚เบญเบ‡เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฒเบ Airflow Connections เปเบฅเบฐเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป€เบญเบปเบฒเปเบœเปˆเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ;
  • dag - เบเบฒเบ™เบ›เบฐเบเบฒเบ”เบ‚เบญเบ‡ doug เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, เป€เบŠเบดเปˆเบ‡เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบกเบตเบขเบนเปˆเปƒเบ™ globals()เบ–เป‰เบฒเบšเปเปˆเบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™ Airflow เบˆเบฐเบšเปเปˆเบžเบปเบšเบกเบฑเบ™. Doug เบเบฑเบ‡เบ•เป‰เบญเบ‡เบเบฒเบ™เป€เบงเบปเป‰เบฒเบงเปˆเบฒ:
    • เบฅเบฒเบงเบŠเบทเปˆเบซเบเบฑเบ‡ orders - เบซเบผเบฑเบ‡โ€‹เบˆเบฒเบโ€‹เบ™เบฑเป‰เบ™โ€‹เบŠเบทเปˆโ€‹เบ™เบตเป‰โ€‹เบˆเบฐโ€‹เบ›เบฒโ€‹เบเบปเบ”โ€‹เบขเบนเปˆโ€‹เปƒเบ™โ€‹เบเบฒเบ™โ€‹เป‚เบ•เป‰โ€‹เบ•เบญเบšโ€‹เบ‚เบญเบ‡โ€‹เป€เบงเบฑเบšโ€‹เป„เบŠโ€‹เบ•โ€‹เปŒโ€‹,
    • เบงเปˆเบฒเบฅเบฒเบงเบˆเบฐเป€เบฎเบฑเบ”เบงเบฝเบเบ•เบฑเป‰เบ‡เปเบ•เปˆเบ—เปˆเบฝเบ‡เบ„เบทเบ™เบ‚เบญเบ‡เบงเบฑเบ™เบ—เบตเปเบ›เบ”เบ‚เบญเบ‡เป€เบ”เบทเบญเบ™เบเปเบฅเบฐเบเบปเบ”,
    • เปเบฅเบฐเบกเบฑเบ™เบ„เบงเบ™เบˆเบฐเบ”เปเบฒเป€เบ™เบตเบ™เบเบฒเบ™, เบ›เบฐเบกเบฒเบ™เบ—เบธเบเป† 6 เบŠเบปเปˆเบงเป‚เบกเบ‡ (เบชเปเบฒเบฅเบฑเบš guys tough เบ—เบตเปˆเบ™เบตเป‰เปเบ—เบ™เบ—เบตเปˆเบˆเบฐเป€เบ›เบฑเบ™ timedelta() เบเบญเบกเบฎเบฑเบšเป„เบ”เป‰ cron- เบชเบฒเบ 0 0 0/6 ? * * *, เบชเปเบฒเบฅเบฑเบšเป€เบขเบฑเบ™เบซเบ™เป‰เบญเบ - เบเบฒเบ™เบชเบฐเปเบ”เบ‡เบญเบญเบเบ„เบท @daily);
  • workflow() เบˆเบฐเป€เบฎเบฑเบ”เบงเบฝเบเบ•เบปเป‰เบ™เบ•เป, เปเบ•เปˆเบšเปเปˆเปเบกเปˆเบ™เปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™. เบชเปเบฒเบฅเบฑเบšเปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบ–เบดเป‰เบกเบšเปเบฅเบดเบšเบปเบ”เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบšเบฑเบ™เบ—เบถเบ.
  • เปเบฅเบฐเปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™ magic เบ‡เปˆเบฒเบเบ”เบฒเบเบ‚เบญเบ‡เบเบฒเบ™เบชเป‰เบฒเบ‡เบงเบฝเบเบ‡เบฒเบ™:
    • เบžเบงเบเป€เบฎเบปเบฒเบ”เปเบฒเป€เบ™เบตเบ™เบเบฒเบ™เป‚เบ”เบเบœเปˆเบฒเบ™เปเบซเบผเปˆเบ‡เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ;
    • เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™ PythonOperator, เป€เบŠเบดเปˆเบ‡เบˆเบฐเบ›เบฐเบ•เบดเบšเบฑเบ” dummy เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ workflow(). เบขเปˆเบฒโ€‹เบฅเบทเบกโ€‹เบฅเบฐโ€‹เบšเบธโ€‹เบŠเบทเปˆโ€‹เบ—เบตเปˆโ€‹เป€เบ›เบฑเบ™โ€‹เป€เบญโ€‹เบเบฐโ€‹เบฅเบฑเบ (เบžเบฒเบโ€‹เปƒเบ™โ€‹เบ”เบฒโ€‹เบโ€‹) เบ‚เบญเบ‡โ€‹เบงเบฝเบโ€‹เบ‡เบฒเบ™โ€‹เปเบฅเบฐโ€‹เบœเบนเบโ€‹เบžเบฑเบ™โ€‹เบ‚เบญเบ‡โ€‹เบ•เบปเบ™โ€‹เป€เบญเบ‡โ€‹. เบ—เบธเบ‡ provide_context เปƒเบ™เบ—เบฒเบ‡เบเบฑเบšเบเบฑเบ™, เบˆเบฐเบ–เบญเบเป€เบ—เบเบฒเบ™เป‚เบ•เป‰เบ–เบฝเบ‡เป€เบžเบตเปˆเบกเป€เบ•เบตเบกเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบซเบ™เป‰เบฒเบ—เบตเปˆ, เป€เบŠเบดเปˆเบ‡เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป€เบเบฑเบšเบเปเบฒเบขเปˆเบฒเบ‡เบฅเบฐเบกเบฑเบ”เบฅเบฐเบงเบฑเบ‡เป‚เบ”เบเปƒเบŠเป‰ **context.

เบชเปเบฒเบฅเบฑเบšเปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™, เบ™เบฑเป‰เบ™เปเบกเปˆเบ™เบ—เบฑเบ‡เบซเบกเบปเบ”. เบชเบดเปˆเบ‡เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฎเบฑเบš:

  • dag เปƒเบซเบกเปˆเปƒเบ™เบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบšเป€เบงเบฑเบšเป„เบŠเบ•เปŒ,
  • เบซเบ™เบถเปˆเบ‡เปเบฅเบฐเป€เบ„เบดเปˆเบ‡เบซเบ™เบถเปˆเบ‡เป€เบ›เบฑเบ™เบฎเป‰เบญเบเบซเบ™เป‰เบฒเบงเบฝเบเบ—เบตเปˆเบˆเบฐเบ–เบทเบเบ›เบฐเบ•เบดเบšเบฑเบ”เปƒเบ™เบ‚เบฐเบซเบ™เบฒเบ™ (เบ–เป‰เบฒ Airflow, เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒ Celery เปเบฅเบฐเบ„เบงเบฒเบกเบชเบฒเบกเบฒเบ”เบ‚เบญเบ‡เป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบเบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰เบกเบฑเบ™).

เบ”เบต, เป€เบเบทเบญเบšเป„เบ”เป‰เบฎเบฑเบšเบกเบฑเบ™.

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™
เปƒเบœเบˆเบฐเบ•เบดเบ”เบ•เบฑเป‰เบ‡เบเบฒเบ™เป€เบžเบดเปˆเบ‡เบžเบฒเบญเบฒเป„เบช?

เป€เบžเบทเปˆเบญเป€เบฎเบฑเบ”เปƒเบซเป‰เบชเบดเปˆเบ‡เบ—เบฑเบ‡เบซเบกเบปเบ”เบ™เบตเป‰เบ‡เปˆเบฒเบเบ”เบฒเบ, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเป„เบ”เป‰ screwed เปƒเบ™ docker-compose.yml เบเบฒเบ™เบ›เบธเบ‡เปเบ•เปˆเบ‡ requirements.txt เปƒเบ™เบ—เบธเบ nodes.

เปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบกเบฑเบ™เบซเบกเบปเบ”เป„เบ›:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบชเบตเปˆเบซเบผเปˆเบฝเบกโ€‹เบชเบตโ€‹เบ‚เบตเป‰โ€‹เป€เบ–เบปเปˆเบฒโ€‹เปเบกเปˆเบ™โ€‹เบ•เบปเบงโ€‹เบขเปˆเบฒเบ‡โ€‹เบงเบฝเบโ€‹เบ‡เบฒเบ™โ€‹เบ›เบฐโ€‹เบกเบงเบ™โ€‹เบœเบปเบ™โ€‹เป‚เบ”เบโ€‹เบเบฒเบ™โ€‹เบ•เบฑเป‰เบ‡โ€‹เป€เบงโ€‹เบฅเบฒโ€‹.

เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เบฅเปโ€‹เบ–เป‰เบฒโ€‹เบญเบตเบโ€‹เปœเป‰เบญเบโ€‹เปœเบถเปˆเบ‡, เบงเบฝเบโ€‹เบ‡เบฒเบ™โ€‹เบ—เบตเปˆโ€‹เบเบณโ€‹เบกเบฐโ€‹เบเบญเบ™โ€‹เป„เบ”เป‰โ€‹เบ–เบทเบโ€‹เบˆเบฑเบšโ€‹เบเบธเบก:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบชเบตเบ‚เบฝเบง, เปเบ™เปˆเบ™เบญเบ™, เป„เบ”เป‰เบชเปเบฒเป€เบฅเบฑเบ”เบงเบฝเบเบ‡เบฒเบ™เบ‚เบญเบ‡เป€เบ‚เบปเบฒเป€เบˆเบปเป‰เบฒ. Reds เบšเปเปˆเบ›เบฐเบชเบปเบšเบœเบปเบ™เบชเปเบฒเป€เบฅเบฑเบ”เบซเบผเบฒเบ.

เป‚เบ”เบเบงเบดเบ—เบตเบ—เบฒเบ‡เบเบฒเบ™, เบšเปเปˆเบกเบตเป‚เบŸเบ™เป€เบ”เบตเบเปˆเบฝเบงเบเบฑเบšเบœเบฐเบฅเบดเบ”เบ•เบฐเบžเบฑเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ ./dags, เบšเปเปˆเบกเบตเบเบฒเบ™ synchronization เบฅเบฐเบซเบงเปˆเบฒเบ‡เป€เบ„เบทเปˆเบญเบ‡เบˆเบฑเบ - dags เบ—เบฑเบ‡เบซเบกเบปเบ”เบ™เบญเบ™เบขเบนเปˆเปƒเบ™ git เบขเบนเปˆเปƒเบ™ Gitlab เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, เปเบฅเบฐ Gitlab CI เปเบˆเบเบขเบฒเบเบเบฒเบ™เบญเบฑเบšเป€เบ”เบ”เปƒเบซเป‰เบเบฑเบšเป€เบ„เบทเปˆเบญเบ‡เบˆเบฑเบเป€เบกเบทเปˆเบญเบฅเบงเบกเป€เบ‚เบปเป‰เบฒเบเบฑเบ™ master.

เป€เบฅเบฑเบเบ™เป‰เบญเบเบเปˆเบฝเบงเบเบฑเบšเบ”เบญเบเป„เบกเป‰

เปƒเบ™เบ‚เบฐเบ™เบฐเบ—เบตเปˆเบ„เบปเบ™เบ‡เบฒเบ™เบเบณเบฅเบฑเบ‡เบ•เบตเป€เบ„เบทเปˆเบญเบ‡เบ›เบฑเป‰เบ™เบ”เบดเบ™เป€เบœเบปเบฒเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, เปƒเบซเป‰เป€เบฎเบปเบฒเบˆเบทเปˆเบˆเบณเป€เบ„เบทเปˆเบญเบ‡เบกเบทเปœเบถเปˆเบ‡เบ—เบตเปˆเบชเบฒเบกเบฒเบ”เบชเบฐเปเบ”เบ‡เปƒเบซเป‰เป€เบฎเบปเบฒเป€เบซเบฑเบ™เป„เบ”เป‰ - เบ”เบญเบเป„เบกเป‰.

เบซเบ™เป‰เบฒเบ—เปเบฒเบญเบดเบ”เบ—เบตเปˆเบกเบตเบ‚เปเป‰เบกเบนเบ™เบชเบฐเบซเบผเบธเบšเบเปˆเบฝเบงเบเบฑเบšเบ‚เปเป‰เบ‚เบญเบ‡เบžเบฐเบ™เบฑเบเบ‡เบฒเบ™:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เปœเป‰เบฒเบงเบฝเบเบ—เบตเปˆเป€เบ‚เบฑเป‰เบกเบ‡เบงเบ”เบ—เบตเปˆเบชเบธเบ”เบเบฑเบšเบงเบฝเบเบ—เบตเปˆเป„เบ›เป€เบฎเบฑเบ”เบงเบฝเบ:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบซเบ™เป‰เบฒเป€เบšเบทเปˆเบญเบ—เบตเปˆเบชเบธเบ”เบเบฑเบšเบชเบฐเบ–เบฒเบ™เบฐเบ‚เบญเบ‡เบ™เบฒเบเบซเบ™เป‰เบฒเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบซเบ™เป‰เบฒเบ—เบตเปˆเบชเบปเบ”เปƒเบชเบ—เบตเปˆเบชเบธเบ”เปเบกเปˆเบ™เบเบฑเบšเบเบฒเบŸเบชเบฐเบ–เบฒเบ™เบฐเบซเบ™เป‰เบฒเบงเบฝเบเปเบฅเบฐเป€เบงเบฅเบฒเบ›เบฐเบ•เบดเบšเบฑเบ”เบ‚เบญเบ‡เบžเบงเบเป€เบ‚เบปเบฒ:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบžเบงเบเป€เบฎเบปเบฒเป‚เบซเบผเบ” underloaded

เบ”เบฑเปˆเบ‡โ€‹เบ™เบฑเป‰เบ™โ€‹, เบงเบฝเบโ€‹เบ‡เบฒเบ™โ€‹เบ—เบฑเบ‡โ€‹เบซเบกเบปเบ”โ€‹เป„เบ”เป‰โ€‹เบฎเบฑเบšโ€‹เบœเบปเบ™โ€‹, เบ—เปˆเบฒเบ™โ€‹เบชเบฒโ€‹เบกเบฒเบ”โ€‹เบ›เบฐโ€‹เบ•เบดโ€‹เบšเบฑเบ”โ€‹เบเบฒเบ™โ€‹เบšเบฒเบ”โ€‹เป€เบˆเบฑเบšโ€‹เป„เบ”เป‰โ€‹.

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เปเบฅเบฐโ€‹เบกเบตโ€‹เบœเบนเป‰โ€‹เบšเบฒเบ”โ€‹เป€เบˆเบฑเบšโ€‹เบซเบผเบฒเบโ€‹เบ„เบปเบ™โ€”เบ”เป‰เบงเบโ€‹เป€เบซเบ”โ€‹เบœเบปเบ™โ€‹เบญเบฑเบ™โ€‹เปœเบถเปˆเบ‡โ€‹เบซเบผเบทโ€‹เบญเบฑเบ™โ€‹เบญเบทเปˆเบ™. เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบ—เบตเปˆเบ–เบทเบเบ•เป‰เบญเบ‡เบ‚เบญเบ‡ Airflow, เบชเบตเปˆเบซเบผเปˆเบฝเบกเป€เบซเบผเบปเปˆเบฒเบ™เบตเป‰เบŠเบตเป‰เปƒเบซเป‰เป€เบซเบฑเบ™เบงเปˆเบฒเบ‚เปเป‰เบกเบนเบ™เบšเปเปˆเป„เบ”เป‰เบกเบฒเบฎเบญเบ”เปเบ™เปˆเบ™เบญเบ™.

เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เป€เบšเบดเปˆเบ‡เบšเบฑเบ™เบ—เบถเบเปเบฅเบฐ restart เบเปเบฅเบฐเบ™เบตเบ—เบตเปˆเบซเบผเบธเบ”เบฅเบปเบ‡.

เป‚เบ”เบโ€‹เบเบฒเบ™โ€‹เบ„เบฅเบดเบโ€‹เปƒเบชเปˆโ€‹เบชเบตเปˆโ€‹เบซเบฅเปˆเบฝเบกโ€‹เปƒเบ”โ€‹เบซเบ™เบถเปˆเบ‡โ€‹, เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เบˆเบฐโ€‹เป€เบซเบฑเบ™โ€‹เบเบฒเบ™โ€‹เบ›เบฐโ€‹เบ•เบดโ€‹เบšเบฑเบ”โ€‹เบ—เบตเปˆโ€‹เบกเบตโ€‹เปƒเบซเป‰โ€‹เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบ—เปˆเบฒเบ™โ€‹เบชเบฒโ€‹เบกเบฒเบ”โ€‹เปƒเบŠเป‰โ€‹เป€เบงโ€‹เบฅเบฒโ€‹เปเบฅเบฐโ€‹เป€เบฎเบฑเบ”โ€‹เปƒเบซเป‰โ€‹เบฅเป‰เบฒเบ‡โ€‹เบเบฒเบ™โ€‹เบซเบผเบธเบ”โ€‹เบฅเบปเบ‡โ€‹เป„เบ”เป‰โ€‹. เบ™เบฑเป‰เบ™เปเบกเปˆเบ™, เบžเบงเบเป€เบฎเบปเบฒเบฅเบทเบกเบงเปˆเบฒเบšเบฒเบ‡เบชเบดเปˆเบ‡เบšเบฒเบ‡เบขเปˆเบฒเบ‡เบฅเบปเป‰เบกเป€เบซเบฅเบงเบขเบนเปˆเบ—เบตเปˆเบ™เบฑเป‰เบ™, เปเบฅเบฐเบงเบฝเบเบ‡เบฒเบ™เบ•เบปเบงเบขเปˆเบฒเบ‡เบ”เบฝเบงเบเบฑเบ™เบˆเบฐเป„เบ›เบซเบฒเบ•เบปเบงเบเปเบฒเบ™เบปเบ”เป€เบงเบฅเบฒ.

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบกเบฑเบ™เป€เบ›เบฑเบ™เบ—เบตเปˆเบŠเบฑเบ”เป€เบˆเบ™เบงเปˆเบฒเบเบฒเบ™เป€เบฎเบฑเบ”เปเบšเบšเบ™เบตเป‰เบเบฑเบšเบซเบ™เบนเบเบฑเบšเบชเบตเปˆเบซเบฅเปˆเบฝเบกเบชเบตเปเบ”เบ‡เบ—เบฑเบ‡เบซเบกเบปเบ”เปเบกเปˆเบ™เบšเปเปˆเป€เบ›เบฑเบ™เบกเบฐเบ™เบธเบ”เบซเบผเบฒเบ - เบ™เบตเป‰เบšเปเปˆเปเบกเปˆเบ™เบชเบดเปˆเบ‡เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบ„เบฒเบ”เบซเบงเบฑเบ‡เบˆเบฒเบ Airflow. เบ•เบฒเบกเบ—เปเบฒเบกเบฐเบŠเบฒเบ”, เบžเบงเบเป€เบฎเบปเบฒเบกเบตเบญเบฒเบงเบธเบ”เบ—เปเบฒเบฅเบฒเบเบกเบฐเบซเบฒเบŠเบปเบ™: Browse/Task Instances

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบกเบฒเป€เบฅเบทเบญเบเบ—เบธเบเบขเปˆเบฒเบ‡เบžเป‰เบญเบกเป†เบเบฑเบ™ เปเบฅเบฐเบฃเบตเป€เบŠเบฑเบ”เป€เบ›เบฑเบ™เบชเบนเบ™, เบ„เบฅเบดเบเบฅเบฒเบเบเบฒเบ™เบ—เบตเปˆเบ–เบทเบเบ•เป‰เบญเบ‡:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบซเบผเบฑเบ‡โ€‹เบˆเบฒเบโ€‹เบเบฒเบ™โ€‹เบ—เปเบฒโ€‹เบ„เบงเบฒเบกโ€‹เบชเบฐโ€‹เบญเบฒเบ”โ€‹, เบฅเบปเบ”โ€‹เปเบ—เบฑเบโ€‹เบŠเบตโ€‹เบ‚เบญเบ‡โ€‹เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เบกเบตโ€‹เบฅเบฑเบโ€‹เบชเบฐโ€‹เบ™เบฐโ€‹เบ™เบตเป‰ (เบžเบงเบโ€‹เป€เบ‚เบปเบฒโ€‹เป€เบˆเบปเป‰เบฒโ€‹เป„เบ”เป‰โ€‹เบฅเปโ€‹เบ–เป‰เบฒโ€‹เบชเปเบฒโ€‹เบฅเบฑเบšโ€‹เบเบฒเบ™โ€‹เบˆเบฑเบ”โ€‹เบ•เบฒโ€‹เบ•เบฐโ€‹เบฅเบฒเบ‡โ€‹เบเบฒเบ™โ€‹เปƒเบซเป‰โ€‹เป€เบ‚เบปเบฒโ€‹เป€เบˆเบปเป‰เบฒโ€‹)โ€‹:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ, hooks เปเบฅเบฐเบ•เบปเบงเปเบ›เบญเบทเปˆเบ™เป†

เบกเบฑเบ™เป€เบ›เบฑเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบˆเบฐเป€เบšเบดเปˆเบ‡ DAG เบ•เปเปˆเป„เบ›, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""ะ“ะพัะฟะพะดะฐ ั…ะพั€ะพัˆะธะต, ะพั‚ั‡ะตั‚ั‹ ะพะฑะฝะพะฒะปะตะฝั‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ะะฐั‚ะฐัˆ, ะฟั€ะพัั‹ะฟะฐะนัั, ะผั‹ {{ dag.dag_id }} ัƒั€ะพะฝะธะปะธ
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

เบ—เบธเบเบ„เบปเบ™เป€เบ„เบตเบเป€เบฎเบฑเบ”เบเบฒเบ™เบญเบฑเบšเป€เบ”เบ”เบฅเบฒเบเบ‡เบฒเบ™เบšเป? เบ™เบตเป‰เปเบกเปˆเบ™เบ‚เบญเบ‡เบ™เบฒเบ‡เบญเบตเบเป€เบ—เบทเปˆเบญเบซเบ™เบถเปˆเบ‡: เบกเบตเบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเบ‚เบญเบ‡เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบšเปˆเบญเบ™เบ—เบตเปˆเป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™; เบกเบตเบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเบ—เบตเปˆเบˆเบฐเป€เบญเบปเบฒ; เบขเปˆเบฒเบฅเบทเบก honk เปƒเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เป€เบเบตเบ”เบ‚เบถเป‰เบ™เบซเบผเบทเปเบ•เบ (เบ”เบต, เบ™เบตเป‰เบšเปเปˆเปเบกเปˆเบ™เบเปˆเบฝเบงเบเบฑเบšเบžเบงเบเป€เบฎเบปเบฒ, เบšเปเปˆ).

เปƒเบซเป‰เบœเปˆเบฒเบ™เป„เบŸเบฅเปŒเบญเบตเบเป€เบ—เบทเปˆเบญเบซเบ™เบถเปˆเบ‡เปเบฅเบฐเป€เบšเบดเปˆเบ‡เบชเบดเปˆเบ‡เบ—เบตเปˆเบšเปเปˆเบŠเบฑเบ”เป€เบˆเบ™เปƒเบซเบกเปˆ:

  • from commons.operators import TelegramBotSendMessage - เบšเปเปˆเบกเบตเบซเบเบฑเบ‡เบเบตเบ”เบ‚เบงเบฒเบ‡เบžเบงเบเป€เบฎเบปเบฒเบˆเบฒเบเบเบฒเบ™เป€เบฎเบฑเบ”เปƒเบซเป‰เบœเบนเป‰เบ›เบฐเบเบญเบšเบเบฒเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเป€เบญเบ‡, เป€เบŠเบดเปˆเบ‡เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เปƒเบŠเป‰เบ›เบฐเป‚เบซเบเบ”เบˆเบฒเบเบเบฒเบ™เบชเป‰เบฒเบ‡ wrapper เบ‚เบฐเบซเบ™เบฒเบ”เบ™เป‰เบญเบเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเป„เบ›เบซเบฒ Unblocked. (เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบชเบปเบ™เบ—เบฐเบ™เบฒเป€เบžเบตเปˆเบกเป€เบ•เบตเบกเบเปˆเบฝเบงเบเบฑเบšเบœเบนเป‰เบ›เบฐเบเบญเบšเบเบฒเบ™เบ™เบตเป‰เบ‚เป‰เบฒเบ‡เบฅเบธเปˆเบกเบ™เบตเป‰);
  • default_args={} - dag เบชเบฒเบกเบฒเบ”เปเบˆเบเบขเบฒเบเบเบฒเบ™เป‚เบ•เป‰เบ–เบฝเบ‡เบ”เบฝเบงเบเบฑเบ™เบเบฑเบšเบœเบนเป‰เบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™เบ—เบฑเบ‡เบซเบกเบปเบ”เบ‚เบญเบ‡เบกเบฑเบ™;
  • to='{{ var.value.all_the_kings_men }}' - เบžเบฒเบเบชเบฐเปœเบฒเบก to เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบšเปเปˆเบกเบต hardcoded, เปเบ•เปˆเบชเป‰เบฒเบ‡เปเบšเบšเป€เบ„เบทเปˆเบญเบ™เป„เบซเบงเป‚เบ”เบเปƒเบŠเป‰ Jinja เปเบฅเบฐเบ•เบปเบงเปเบ›เบ—เบตเปˆเบกเบตเบฅเบฒเบเบŠเบทเปˆเบญเบตเป€เบกเบง, เบ—เบตเปˆเบ‚เป‰เบญเบเป€เบญเบปเบฒเปƒเบˆเปƒเบชเปˆเบขเปˆเบฒเบ‡เบฅเบฐเบกเบฑเบ”เบฅเบฐเบงเบฑเบ‡. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS โ€” เป€เบ‡เบทเปˆเบญเบ™โ€‹เป„เบ‚โ€‹เบชเปเบฒโ€‹เบฅเบฑเบšโ€‹เบเบฒเบ™โ€‹เป€เบฅเบตเปˆเบกโ€‹เบ•เบปเป‰เบ™โ€‹เบเบฒเบ™โ€‹เบ›เบฐโ€‹เบ•เบดโ€‹เบšเบฑเบ”โ€‹เบเบฒเบ™โ€‹. เปƒเบ™โ€‹เบเปโ€‹เบฅเบฐโ€‹เบ™เบตโ€‹เบ‚เบญเบ‡โ€‹เบžเบงเบโ€‹เป€เบฎเบปเบฒ, เบˆเบปเบ”โ€‹เบซเบกเบฒเบโ€‹เบชเบฐโ€‹เบšเบฑเบšโ€‹เบˆเบฐโ€‹เบšเบดเบ™โ€‹เป„เบ›โ€‹เบซเบฒโ€‹เบ™เบฒเบโ€‹เบˆเป‰เบฒเบ‡โ€‹เบžเบฝเบ‡โ€‹เปเบ•เปˆโ€‹เบ–เป‰เบฒโ€‹เบซเบฒเบโ€‹เบงเปˆเบฒโ€‹เบเบฒเบ™โ€‹เป€เบžเบดเปˆเบ‡โ€‹เบžเบฒโ€‹เบญเบฒโ€‹เป„เบชโ€‹เบ—เบฑเบ‡โ€‹เบซเบกเบปเบ”โ€‹เป„เบ”เป‰โ€‹เป€เบฎเบฑเบ”โ€‹เบงเบฝเบโ€‹เบญเบญเบ เบชเปเบฒเป€เบฅเบฑเบ”;
  • tg_bot_conn_id='tg_main' - เบเบฒเบ™เป‚เบ•เป‰เบ–เบฝเบ‡ conn_id เบเบญเบกเบฎเบฑเบš ID เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡ Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - เบ‚เปเป‰เบ„เบงเบฒเบกเปƒเบ™ Telegram เบˆเบฐเบšเบดเบ™เป„เบ›เบžเบฝเบ‡เปเบ•เปˆเบ–เป‰เบฒเบกเบตเบงเบฝเบเบ‡เบฒเบ™เบซเบผเบธเบ”เบฅเบปเบ‡;
  • task_concurrency=1 - เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เบซเป‰เบฒเบกโ€‹เบเบฒเบ™โ€‹เป€เบ›เบตเบ”โ€‹เบ•เบปเบงโ€‹เบžเป‰เบญเบกโ€‹เบเบฑเบ™โ€‹เบ‚เบญเบ‡โ€‹เบซเบผเบฒเบโ€‹เบ•เบปเบงโ€‹เบขเปˆเบฒเบ‡โ€‹เบ‚เบญเบ‡โ€‹เบงเบฝเบโ€‹เบ‡เบฒเบ™โ€‹เบซเบ™เบถเปˆเบ‡โ€‹. เบ–เป‰เบฒเบšเปเปˆเบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เป€เบ›เบตเบ”เบ•เบปเบงเบžเป‰เบญเบกเป†เบเบฑเบ™เบ‚เบญเบ‡เบซเบผเบฒเบเป†เบ„เบปเบ™ VerticaOperator (เป€เบšเบดเปˆเบ‡เบขเบนเปˆเปƒเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบซเบ™เบถเปˆเบ‡);
  • report_update >> [email, tg] - เบ—เบธเบเบขเปˆเบฒเบ‡ VerticaOperator เบกเบฒโ€‹เบฎเปˆเบงเบกโ€‹เบเบฑเบ™โ€‹เปƒเบ™โ€‹เบเบฒเบ™โ€‹เบชเบปเปˆเบ‡โ€‹เบˆเบปเบ”โ€‹เบซเบกเบฒเบโ€‹เปเบฅเบฐโ€‹เบ‚เปเป‰โ€‹เบ„เบงเบฒเบกโ€‹, เป€เบŠเบฑเปˆเบ™โ€‹เบ™เบตเป‰โ€‹:
    Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

    เปเบ•เปˆเป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบœเบนเป‰เบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™เปเบˆเป‰เบ‡เบเบฒเบ™เบกเบตเป€เบ‡เบทเปˆเบญเบ™เป„เบ‚เบเบฒเบ™เป€เบ›เบตเบ”เบ•เบปเบงเบ—เบตเปˆเปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™, เบžเบฝเบ‡เปเบ•เปˆเบซเบ™เบถเปˆเบ‡เบˆเบฐเป€เบฎเบฑเบ”เบงเบฝเบ. เปƒเบ™ Tree View, เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เป€เบšเบดเปˆเบ‡เบซเบ™เป‰เบญเบเบฅเบปเบ‡เป€เบฅเบฑเบเบ™เป‰เบญเบ:
    Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบ‚เป‰เบญเบเบˆเบฐเป€เบงเบปเป‰เบฒเบชเบญเบ‡เบชเบฒเบกเบ„เปเบฒเบเปˆเบฝเบงเบเบฑเบš เบกเบฐเบซเบฒเบžเบฒเบ เปเบฅเบฐโ€‹เบซเบกเบนเปˆโ€‹เป€เบžเบทเปˆเบญเบ™โ€‹เบ‚เบญเบ‡โ€‹เป€เบ‚เบปเบฒโ€‹เป€เบˆเบปเป‰เบฒ - เบ•เบปเบงเปเบ›.

Macros เปเบกเปˆเบ™เบ•เบปเบงเบเบถเบ” Jinja เบ—เบตเปˆเบชเบฒเบกเบฒเบ”เบ—เบปเบ”เปเบ—เบ™เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเป€เบ›เบฑเบ™เบ›เบฐเป‚เบซเบเบ”เบ•เปˆเบฒเบ‡เป†เป€เบ‚เบปเป‰เบฒเปƒเบ™เบเบฒเบ™เป‚เบ•เป‰เบ–เบฝเบ‡เบ‚เบญเบ‡เบœเบนเป‰เบ›เบฐเบเบญเบšเบเบฒเบ™. เบ•เบปเบงเบขเปˆเบฒเบ‡เป€เบŠเบฑเปˆเบ™เบ™เบตเป‰:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} เบˆเบฐเบ‚เบฐเบซเบเบฒเบเป„เบ›เบชเบนเปˆเป€เบ™เบทเป‰เบญเปƒเบ™เบ‚เบญเบ‡เบ•เบปเบงเปเบ›เบ‚เบญเบ‡เบชเบฐเบžเบฒเบšเบเบฒเบ™ execution_date เปƒเบ™เบฎเบนเบšเปเบšเบš YYYY-MM-DD: 2020-07-14. เบชเปˆเบงเบ™เบ—เบตเปˆเบ”เบตเบ—เบตเปˆเบชเบธเบ”เปเบกเปˆเบ™เบงเปˆเบฒเบ•เบปเบงเปเบ›เบšเปเบฅเบดเบšเบปเบ”เบ–เบทเบเบ•เบดเบ”เปƒเบชเปˆเบเบฑเบšเบ•เบปเบงเบขเปˆเบฒเบ‡เบงเบฝเบเบชเบฐเป€เบžเบฒเบฐ (เบชเบตเปˆเบซเบผเปˆเบฝเบกเปƒเบ™ Tree View), เปเบฅเบฐเป€เบกเบทเปˆเบญเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เปƒเบซเบกเปˆ, เบ•เบปเบงเบเบถเบ”เบˆเบฐเบ‚เบฐเบซเบเบฒเบเป„เบ›เบชเบนเปˆเบ„เปˆเบฒเบ”เบฝเบงเบเบฑเบ™.

เบ„เปˆเบฒเบ—เบตเปˆเป„เบ”เป‰เบฎเบฑเบšเบกเบญเบšเปเบฒเบเบชเบฒเบกเบฒเบ”เป€เบšเบดเปˆเบ‡เป„เบ”เป‰เป‚เบ”เบเปƒเบŠเป‰เบ›เบธเปˆเบก Rendered เปƒเบ™เปเบ•เปˆเบฅเบฐเปœเป‰เบฒเบงเบฝเบ. เบ™เบตเป‰เปเบกเปˆเบ™เบงเบดเบ—เบตเบเบฒเบ™เบชเบปเปˆเบ‡เบˆเบปเบ”เบซเบกเบฒเบ:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เปเบฅเบฐเบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เบขเบนเปˆเปƒเบ™เบงเบฝเบเบ‡เบฒเบ™เบ—เบตเปˆเบกเบตเบเบฒเบ™เบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบก:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเบ„เบปเบšเบ–เป‰เบงเบ™เบ‚เบญเบ‡ macro เบ—เบตเปˆเบกเบตเปƒเบ™เบ•เบปเบงเบชเปเบฒเบฅเบฑเบšเบฎเบธเปˆเบ™เบซเบผเป‰เบฒเบชเบธเบ”เบ—เบตเปˆเบกเบตเบขเบนเปˆเปเบกเปˆเบ™เบขเบนเปˆเบ—เบตเปˆเบ™เบตเป‰: เบเบฒเบ™เบญเป‰เบฒเบ‡เบญเบตเบ‡เบกเบฐเบซเบฒเบžเบฒเบ

เบเบดเปˆเบ‡เป„เบ›เบเบงเปˆเบฒเบ™เบฑเป‰เบ™, เบ”เป‰เบงเบเบเบฒเบ™เบŠเปˆเบงเบเป€เบซเบผเบทเบญเบ‚เบญเบ‡ plugins, เบžเบงเบเป€เบฎเบปเบฒเบชเบฒเบกเบฒเบ”เบ›เบฐเบเบฒเบ” macro เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเป€เบญเบ‡, เปเบ•เปˆเบ™เบฑเป‰เบ™เปเบกเปˆเบ™เป€เบฅเบทเปˆเบญเบ‡เบญเบทเปˆเบ™.

เบ™เบญเบเป€เบซเบ™เบทเบญเบˆเบฒเบเบชเบดเปˆเบ‡เบ—เบตเปˆเบเปเบฒเบ™เบปเบ”เป„เบงเป‰เบฅเปˆเบงเบ‡เบซเบ™เป‰เบฒ, เบžเบงเบเป€เบฎเบปเบฒเบชเบฒเบกเบฒเบ”เบ—เบปเบ”เปเบ—เบ™เบ„เปˆเบฒเบ‚เบญเบ‡เบ•เบปเบงเปเบ›เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ (เบ‚เป‰เบญเบเป„เบ”เป‰เปƒเบŠเป‰เบกเบฑเบ™เบขเบนเปˆเปƒเบ™เบฅเบฐเบซเบฑเบ”เบ‚เป‰เบฒเบ‡เป€เบ—เบดเบ‡). เบกเบฒเบชเป‰เบฒเบ‡เปƒเบ™ Admin/Variables เบชเบญเบ‡เบชเบฒเบกเบขเปˆเบฒเบ‡:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบ—เบธเบโ€‹เบชเบดเปˆเบ‡โ€‹เบ—เบธเบโ€‹เบขเปˆเบฒเบ‡โ€‹เบ—เบตเปˆโ€‹เบ—เปˆเบฒเบ™โ€‹เบชเบฒโ€‹เบกเบฒเบ”โ€‹เบ™เปเบฒโ€‹เปƒเบŠเป‰โ€‹:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

เบ„เปˆเบฒเบชเบฒเบกเบฒเบ”เป€เบ›เบฑเบ™ Scalar, เบซเบผเบทเบกเบฑเบ™เบชเบฒเบกเบฒเบ”เป€เบ›เบฑเบ™ JSON. เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡ JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

เบžเบฝเบ‡เปเบ•เปˆเปƒเบŠเป‰เป€เบชเบฑเป‰เบ™เบ—เบฒเบ‡เป„เบ›เบซเบฒเบเบฐเปเบˆเบ—เบตเปˆเบ•เป‰เบญเบ‡เบเบฒเบ™: {{ var.json.bot_config.bot.token }}.

เบ‚เป‰เบฒโ€‹เบžเบฐโ€‹เป€เบˆเบปเป‰เบฒโ€‹เบฎเบนเป‰โ€‹เบซเบ™เบฑเบ‡โ€‹เบชเบทโ€‹เบˆเบฐโ€‹เป€เบงเบปเป‰เบฒโ€‹เบซเบ™เบถเปˆเบ‡โ€‹เบ„เปเบฒโ€‹เปเบฅเบฐโ€‹เบชเบฐโ€‹เปเบ”เบ‡โ€‹เปƒเบซเป‰โ€‹เป€เบซเบฑเบ™ screenshot เบซเบ™เบถเปˆเบ‡โ€‹เบเปˆเบฝเบงโ€‹เบเบฑเบšโ€‹เบเบฒเบ™โ€‹ เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ. เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เปเบกเปˆเบ™เบžเบทเป‰เบ™เบ–เบฒเบ™เบขเบนเปˆเบ—เบตเปˆเบ™เบตเป‰: เปƒเบ™เบซเบ™เป‰เบฒ Admin/Connections เบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ, เป€เบžเบตเปˆเบกเบเบฒเบ™เป€เบ‚เบปเป‰เบฒเบชเบนเปˆเบฅเบฐเบšเบปเบš / เบฅเบฐเบซเบฑเบ”เบœเปˆเบฒเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเปเบฅเบฐเบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เบชเบฐเป€เบžเบฒเบฐเบซเบผเบฒเบเบขเบนเปˆเบ—เบตเปˆเบ™เบฑเป‰เบ™. เปเบšเบšเบ™เบตเป‰:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบฅเบฐเบซเบฑเบ”เบœเปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ–เบทเบเป€เบ‚เบปเป‰เบฒเบฅเบฐเบซเบฑเบ” (เบขเปˆเบฒเบ‡เบฅเบฐเบญเบฝเบ”เบเบงเปˆเบฒเบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™), เบซเบผเบทเบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบญเบญเบเบˆเบฒเบเบ›เบฐเป€เบžเบ”เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ (เบ”เบฑเปˆเบ‡เบ—เบตเปˆเบ‚เป‰เบญเบเป„เบ”เป‰เป€เบฎเบฑเบ”เบชเปเบฒเบฅเบฑเบš tg_main) - เบ„เบงเบฒเบกเบˆเบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเบ‚เบญเบ‡เบ›เบฐเป€เบžเบ”เปเบกเปˆเบ™ hardwired เปƒเบ™เบฎเบนเบšเปเบšเบš Airflow เปเบฅเบฐเบšเปเปˆเบชเบฒเบกเบฒเบ”เบ‚เบฐเบซเบเบฒเบเป„เบ”เป‰เป‚เบ”เบเบšเปเปˆเบกเบตเบเบฒเบ™เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบฅเบฐเบซเบฑเบ”เปเบซเบผเปˆเบ‡ (เบ–เป‰เบฒเบซเบฒเบเบงเปˆเบฒเบ—เบฑเบ™เบ—เบตเบ—เบฑเบ™เปƒเบ”เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบšเปเปˆเป„เบ”เป‰ google เบšเบฒเบ‡เบชเบดเปˆเบ‡เบšเบฒเบ‡เบขเปˆเบฒเบ‡, เบเบฐเบฅเบธเบ™เบฒเปเบเป‰เป„เบ‚เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒ), เปเบ•เปˆเบšเปเปˆเบกเบตเบซเบเบฑเบ‡เบˆเบฐเบขเบธเบ”เบžเบงเบเป€เบฎเบปเบฒเบˆเบฒเบเบเบฒเบ™เป„เบ”เป‰เบฎเบฑเบšเบชเบดเบ™เป€เบŠเบทเปˆเบญเบžเบฝเบ‡เปเบ•เปˆเป‚เบ”เบเบเบฒเบ™. เบŠเบทเปˆ.

เบ™เบญเบเบ™เบฑเป‰เบ™เบ—เปˆเบฒเบ™เบเบฑเบ‡เบชเบฒเบกเบฒเบ”เป€เบฎเบฑเบ”เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเปเบฒเบ™เบงเบ™เบซเบ™เบถเปˆเบ‡เบ—เบตเปˆเบกเบตเบŠเบทเปˆเบ”เบฝเบงเบเบฑเบ™: เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ™เบตเป‰, เบงเบดเบ—เบตเบเบฒเบ™ BaseHook.get_connection(), เป€เบŠเบดเปˆเบ‡เป€เบฎเบฑเบ”เปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเป‚เบ”เบเบŠเบทเปˆ, เบˆเบฐเปƒเบซเป‰ เบชเบธเปˆเบก เบˆเบฒเบเบซเบผเบฒเบเป† namesakes (เบกเบฑเบ™เบˆเบฐเบกเบตเป€เบซเบ”เบœเบปเบ™เบซเบผเบฒเบเบเบงเปˆเบฒเบ—เบตเปˆเบˆเบฐเป€เบฎเบฑเบ”เปƒเบซเป‰ Round Robin, เปเบ•เปˆเปƒเบซเป‰เบกเบฑเบ™เบขเบนเปˆเปƒเบ™เบˆเบดเบ”เปƒเบˆเบ‚เบญเบ‡เบœเบนเป‰เบžเบฑเบ”เบ—เบฐเบ™เบฒ Airflow).

เบ•เบปเบงเปเบ›เปเบฅเบฐเบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเปเบ™เปˆเบ™เบญเบ™เปเบกเปˆเบ™เป€เบ„เบทเปˆเบญเบ‡เบกเบทเบ—เบตเปˆเป€เบขเบฑเบ™, เปเบ•เปˆเบกเบฑเบ™เบชเปเบฒเบ„เบฑเบ™เบ—เบตเปˆเบˆเบฐเบšเปเปˆเบชเบนเบ™เป€เบชเบเบ„เบงเบฒเบกเบชเบปเบกเบ”เบนเบ™: เบชเปˆเบงเบ™เปƒเบ”เบ‚เบญเบ‡เบเบฒเบ™เป„เบซเบผเบ‚เบญเบ‡เป€เบˆเบปเป‰เบฒเบ—เบตเปˆเบ—เปˆเบฒเบ™เป€เบเบฑเบšเป„เบงเป‰เปƒเบ™เบฅเบฐเบซเบฑเบ”เบ•เบปเบงเบกเบฑเบ™เป€เบญเบ‡, เปเบฅเบฐเบชเปˆเบงเบ™เปƒเบ”เบ—เบตเปˆเบ—เปˆเบฒเบ™เปƒเบซเป‰ Airflow เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป€เบเบฑเบšเบฎเบฑเบเบชเบฒ. เปƒเบ™เบ”เป‰เบฒเบ™เบซเบ™เบถเปˆเบ‡, เบกเบฑเบ™เบชเบฒเบกเบฒเบ”เบชเบฐเบ”เบงเบเปƒเบ™เบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡เบกเบนเบ™เบ„เปˆเบฒเบขเปˆเบฒเบ‡เป„เบงเบงเบฒ, เบ•เบปเบงเบขเปˆเบฒเบ‡, เบเปˆเบญเบ‡เบˆเบปเบ”เบซเบกเบฒเบ, เบœเปˆเบฒเบ™ UI. เปƒเบ™เบ—เบฒเบ‡เบเบปเบ‡เบเบฑเบ™เบ‚เป‰เบฒเบก, เบ™เบตเป‰เบเบฑเบ‡เป€เบ›เบฑเบ™เบเบฒเบ™เบเบฑเบšเบ„เบทเบ™เป„เบ›เบซเบฒเบเบฒเบ™เบ„เบฅเบดเบเบซเบ™เบน, เบˆเบฒเบเบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒ (เบ‚เป‰เบญเบ) เบ•เป‰เบญเบ‡เบเบฒเบ™เบเปเบฒเบˆเบฑเบ”.

เบเบฒเบ™เป€เบฎเบฑเบ”เบงเบฝเบเบเบฑเบšเบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเปเบกเปˆเบ™เบซเบ™เบถเปˆเบ‡เปƒเบ™เบงเบฝเบเบ‡เบฒเบ™ hooks. เป‚เบ”เบเบ—เบปเปˆเบงเป„เบ›, Airflow hooks เปเบกเปˆเบ™เบˆเบธเบ”เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบกเบฑเบ™เบเบฑเบšเบเบฒเบ™เบšเปเบฅเบดเบเบฒเบ™เบžเบฒเบเบชเปˆเบงเบ™เบ—เบตเบชเบฒเบกเปเบฅเบฐเบซเป‰เบญเบ‡เบชเบฐเบซเบกเบธเบ”. เบ•เบปเบงเบขเปˆเบฒเบ‡: JiraHook เบˆเบฐเป€เบ›เบตเบ”เบฅเบนเบเบ„เป‰เบฒเปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒเบžเบปเบงเบžเบฑเบ™เบเบฑเบš Jira (เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบเป‰เบฒเบเบงเบฝเบเป„เบ›เบกเบฒเป„เบ”เป‰), เปเบฅเบฐเบ”เป‰เบงเบเบเบฒเบ™เบŠเปˆเบงเบเป€เบซเบผเบทเบญเบ‚เบญเบ‡ SambaHook เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบเบนเป‰เป„เบŸเบฅเปŒเบ—เป‰เบญเบ‡เบ–เบดเปˆเบ™เป„เบ›เบซเบฒ smb- เบˆเบธเบ”.

เบเบณเบฅเบฑเบ‡เบงเบดเป€เบ„เบฒเบฐเบ•เบปเบงเบ”เบณเป€เบ™เบตเบ™เบเบฒเบ™เปเบšเบšเบเบณเบ™เบปเบ”เป€เบญเบ‡

เปเบฅเบฐโ€‹เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เป„เบ”เป‰โ€‹เป€เบ‚เบปเป‰เบฒโ€‹เป„เบ›โ€‹เปƒเบเป‰โ€‹เบเบฑเบšโ€‹เบเบฒเบ™โ€‹เป€เบšเบดเปˆเบ‡โ€‹เบงเบดโ€‹เบ—เบตโ€‹เบเบฒเบ™โ€‹เบ—เบตเปˆโ€‹เบกเบฑเบ™โ€‹เป€เบฎเบฑเบ”โ€‹เป„เบ”เป‰โ€‹ TelegramBotSendMessage

เบฅเบฐโ€‹เบซเบฑเบ” commons/operators.py เบเบฑเบšเบœเบนเป‰เบ›เบฐเบเบญเบšเบเบฒเบ™เบ•เบปเบงเบˆเบดเบ‡:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

เบ—เบตเปˆเบ™เบตเป‰, เป€เบŠเบฑเปˆเบ™เบ”เบฝเบงเบเบฑเบšเบชเบดเปˆเบ‡เบญเบทเปˆเบ™เปƒเบ™ Airflow, เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เปเบกเปˆเบ™เบ‡เปˆเบฒเบเบ”เบฒเบเบซเบผเบฒเบ:

  • เบชเบทเบšเบ—เบญเบ”เบกเบฒเบˆเบฒเบ BaseOperator, เป€เบŠเบดเปˆเบ‡เบ›เบฐเบ•เบดเบšเบฑเบ”เบšเบฒเบ‡เบชเบดเปˆเบ‡เบ—เบตเปˆเบชเบฐเป€เบžเบฒเบฐ Airflow (เป€เบšเบดเปˆเบ‡เบเบฒเบ™เบžเบฑเบเบœเปˆเบญเบ™เบ‚เบญเบ‡เบ—เปˆเบฒเบ™)
  • เบ—เบปเปˆเบ‡เบ™เบฒเบ—เบตเปˆเบ›เบฐเบเบฒเบ” template_fields, เปƒเบ™เบ—เบตเปˆ Jinja เบˆเบฐเบŠเบญเบเบซเบฒเบกเบฐเบซเบฒเบžเบฒเบเป€เบžเบทเปˆเบญเบ›เบฐเบกเบงเบ™เบœเบปเบ™.
  • เบˆเบฑเบ”เปเบˆเบ‡เบเบฒเบ™เป‚เบ•เป‰เบ–เบฝเบ‡เบ—เบตเปˆเบ–เบทเบเบ•เป‰เบญเบ‡เบชเปเบฒเบฅเบฑเบš __init__(), เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เปƒเบ™เบšเปˆเบญเบ™เบ—เบตเปˆเบกเบตเบ„เบงเบฒเบกเบˆเปเบฒเป€เบ›เบฑเบ™.
  • เบžเบงเบเป€เบฎเบปเบฒเบšเปเปˆเป„เบ”เป‰เบฅเบทเบกเบเปˆเบฝเบงเบเบฑเบšเบเบฒเบ™เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบ‚เบญเบ‡เบšเบฑเบ™เบžเบฐเบšเบธเบฅเบธเบ”.
  • เป€เบ›เบตเบ” hook เบ—เบตเปˆเบชเบญเบ”เบ„เป‰เบญเบ‡เบเบฑเบ™ TelegramBotHookเป„เบ”เป‰เบฎเบฑเบšเบงเบฑเบ”เบ–เบธเบ‚เบญเบ‡เบฅเบนเบเบ„เป‰เบฒเบˆเบฒเบเบกเบฑเบ™.
  • เบงเบดเบ—เบตเบเบฒเบ™ overridden (เบเปเบฒเบ™เบปเบ”เบ„เบทเบ™เปƒเบซเบกเปˆ). BaseOperator.execute(), เบ—เบตเปˆ Airfow เบˆเบฐ twitch เป€เบกเบทเปˆเบญเป€เบงเบฅเบฒเบ—เบตเปˆเบˆเบฐเป€เบ›เบตเบ”เบ•เบปเบงเบœเบนเป‰เบ›เบฐเบ•เบดเบšเบฑเบ”เบ‡เบฒเบ™ - เปƒเบ™เบกเบฑเบ™เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบ•เบปเป‰เบ™เบ•เป, เบฅเบทเบกเป€เบ‚เบปเป‰เบฒเบชเบนเปˆเบฅเบฐเบšเบปเบš. (เบžเบงเบเป€เบฎเบปเบฒเป€เบ‚เบปเป‰เบฒเบชเบนเปˆเบฅเบฐเบšเบปเบš, เป‚เบ”เบเบงเบดเบ—เบตเบ—เบฒเบ‡เบเบฒเบ™, เป€เบ‚เบปเป‰เบฒเป„เบ› stdout ะธ stderr - เบเบฒเบ™โ€‹เป„เบซเบผโ€‹เบงเบฝเบ™โ€‹เบ‚เบญเบ‡โ€‹เบญเบฒโ€‹เบเบฒเบ”โ€‹เบˆเบฐโ€‹เบชเบฐโ€‹เบเบฑเบ”โ€‹เบ—เบธเบโ€‹เบชเบดเปˆเบ‡โ€‹เบ—เบธเบโ€‹เบขเปˆเบฒเบ‡โ€‹, เบซเปเปˆโ€‹เบกเบฑเบ™โ€‹เบ‡เบฒเบกโ€‹, decompose เบกเบฑเบ™โ€‹เปƒเบ™โ€‹เบ—เบตเปˆโ€‹เบˆเปเบฒโ€‹เป€เบ›เบฑเบ™โ€‹.

เปƒเบซเป‰เป€เบšเบดเปˆเบ‡เบชเบดเปˆเบ‡เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบกเบต commons/hooks.py. เบชเปˆเบงเบ™เบ—เปเบฒเบญเบดเบ”เบ‚เบญเบ‡เป„เบŸเบฅเปŒ, เบเบฑเบš hook เบ•เบปเบงเบ‚เบญเบ‡เบกเบฑเบ™เป€เบญเบ‡:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

เบ‚เป‰เบญเบเบšเปเปˆเบฎเบนเป‰เบงเปˆเบฒเบˆเบฐเบญเบฐเบ—เบดเบšเบฒเบเบซเบเบฑเบ‡เบขเบนเปˆเบ—เบตเปˆเบ™เบตเป‰, เบ‚เป‰เบญเบเบžเบฝเบ‡เปเบ•เปˆเบˆเบฐเบชเบฑเบ‡เป€เบเบ”เบˆเบธเบ”เบชเปเบฒเบ„เบฑเบ™:

  • เบžเบงเบเป€เบฎเบปเบฒเบชเบทเบšเบ—เบญเบ”, เบ„เบดเบ”เบเปˆเบฝเบงเบเบฑเบšเบเบฒเบ™เป‚เบ•เป‰เบ–เบฝเบ‡ - เปƒเบ™เบเปเบฅเบฐเบ™เบตเบซเบผเบฒเบเบ—เบตเปˆเบชเบธเบ”, เบกเบฑเบ™เบˆเบฐเป€เบ›เบฑเบ™เบซเบ™เบถเปˆเบ‡: conn_id;
  • overriding เบงเบดเบ—เบตเบเบฒเบ™เบกเบฒเบ”เบ•เบฐเบ–เบฒเบ™: เบ‚เป‰เบญเบเบˆเปเบฒเบเบฑเบ”เบ•เบปเบงเป€เบญเบ‡ get_conn(), เปƒเบ™เบ—เบตเปˆเบ‚เป‰เบญเบเป„เบ”เป‰เบฎเบฑเบšเบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเป‚เบ”เบเบŠเบทเปˆเปเบฅเบฐเบžเบฝเบ‡เปเบ•เปˆเป„เบ”เป‰เบฎเบฑเบšเบชเปˆเบงเบ™ extra (เบ™เบตเป‰เปเบกเปˆเบ™เบŠเปˆเบญเบ‡ JSON), เป€เบŠเบดเปˆเบ‡เบ‚เป‰เบญเบ (เบ•เบฒเบกเบ„เปเบฒเปเบ™เบฐเบ™เปเบฒเบ‚เบญเบ‡เบ‚เป‰เบญเบเป€เบญเบ‡!) เปƒเบชเปˆ Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • เบ‚เป‰เบญเบเบชเป‰เบฒเบ‡เบ•เบปเบงเบขเปˆเบฒเบ‡เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ TelegramBot, เปƒเบซเป‰เบกเบฑเบ™เป€เบ›เบฑเบ™ token เบชเบฐเป€เบžเบฒเบฐ.

เบซเบกเบปเบ”โ€‹เป€เบ—เบปเปˆเบฒโ€‹เบ™เบตเป‰. เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป„เบ”เป‰เบฎเบฑเบšเบฅเบนเบเบ„เป‰เบฒเบˆเบฒเบ hook เป‚เบ”เบเปƒเบŠเป‰ TelegramBotHook().clent เบซเบผเบท TelegramBotHook().get_conn().

เปเบฅเบฐเบชเปˆเบงเบ™เบ—เบตเบชเบญเบ‡เบ‚เบญเบ‡เป„เบŸเบฅเปŒ, เปƒเบ™เบ—เบตเปˆเบ‚เป‰เบญเบเป€เบฎเบฑเบ” microwrapper เบชเปเบฒเบฅเบฑเบš Telegram REST API, เป€เบžเบทเปˆเบญเบšเปเปˆเปƒเบซเป‰เบฅเบฒเบเบ„เบทเบเบฑเบ™. python-telegram-bot เบชเปเบฒเบฅเบฑเบšเบงเบดเบ—เบตเบเบฒเบ™เบซเบ™เบถเปˆเบ‡ sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

เบงเบดเบ—เบตเบ—เบตเปˆเบ–เบทเบเบ•เป‰เบญเบ‡เปเบกเปˆเบ™เบเบฒเบ™เป€เบžเบตเปˆเบกเบกเบฑเบ™เบ—เบฑเบ‡เปเบปเบ”: TelegramBotSendMessage, TelegramBotHook, TelegramBot - เปƒเบ™ plugin, เปƒเบชเปˆเปƒเบ™ repository เบชเบฒเบ—เบฒเบฅเบฐเบ™เบฐ, เปเบฅเบฐเปƒเบซเป‰เบกเบฑเบ™เบเบฑเบš Open Source.

เปƒเบ™เบ‚เบฐเบ™เบฐเบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบชเบถเบเบชเบฒเบ—เบฑเบ‡เบซเบกเบปเบ”เบ™เบตเป‰, เบเบฒเบ™เบ›เบฑเบšเบ›เบธเบ‡เบšเบปเบ”เบฅเบฒเบเบ‡เบฒเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฅเบปเป‰เบกเป€เบซเบฅเบงเบขเปˆเบฒเบ‡เบชเปเบฒเป€เบฅเบฑเบ”เบœเบปเบ™เปเบฅเบฐเบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเบ‚เปเป‰เบœเบดเบ”เบžเบฒเบ”เปƒเบซเป‰เบ‚เป‰เบญเบเปƒเบ™เบŠเปˆเบญเบ‡เบ—เบฒเบ‡. เบ‚เป‰เบญเบโ€‹เบˆเบฐโ€‹เบเบงเบ”โ€‹เป€เบšเบดเปˆเบ‡โ€‹เบงเปˆเบฒโ€‹เบœเบดเบ”โ€‹เบšเป...

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™
เบšเบฒเบ‡เบชเบดเปˆเบ‡เบšเบฒเบ‡เบขเปˆเบฒเบ‡เปเบ•เบเบขเบนเปˆเปƒเบ™ doge เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ! เบกเบฑเบ™เบšเปเปˆเปเบกเปˆเบ™เบชเบดเปˆเบ‡เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบ„เบฒเบ”เบซเบงเบฑเบ‡? เบขเปˆเบฒเบ‡โ€‹เปเบ™เปˆโ€‹เบ™เบญเบ™!

เป€เบˆเบปเป‰เบฒเบˆเบฐเบ–เบญเบเบšเป?

เป€เบˆเบปเป‰เบฒเบฎเบนเป‰เบชเบถเบเบงเปˆเบฒเบ‚เป‰เบญเบเบžเบฒเบ”เบšเบฒเบ‡เบชเบดเปˆเบ‡เบšเบฒเบ‡เบขเปˆเบฒเบ‡เบšเป? เบกเบฑเบ™เป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒเบฅเบฒเบงเบชเบฑเบ™เบเบฒเบงเปˆเบฒเบˆเบฐเป‚เบญเบ™เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบ SQL Server เบเบฑเบš Vertica, เปเบฅเบฐเบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบฅเบฒเบงเป„เบ”เป‰เป€เบญเบปเบฒเบกเบฑเบ™เปเบฅเบฐเบเป‰เบฒเบเบญเบญเบเบˆเบฒเบเบซเบปเบงเบ‚เปเป‰, เบ„เบปเบ™เบ‚เบตเป‰เบ„เป‰เบฒเบ™!

เบ„เบงเบฒเบกเป‚เบซเบ”เบฎเป‰เบฒเบเบ™เบตเป‰เปเบกเปˆเบ™เบกเบตเป€เบˆเบ”เบ•เบฐเบ™เบฒ, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบžเบฝเบ‡เปเบ•เปˆเบ•เป‰เบญเบ‡เป„เบ”เป‰เบ–เบญเบ”เบฅเบฐเบซเบฑเบ”เบšเบฒเบ‡เบ„เปเบฒเบชเบฑเบšเบชเปเบฒเบฅเบฑเบšเบ—เปˆเบฒเบ™. เปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป„เบ›เบ•เบทเปˆเบกเบญเบตเบ.

เปเบœเบ™โ€‹เบเบฒเบ™โ€‹เบ‚เบญเบ‡โ€‹เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เปเบกเปˆเบ™โ€‹เบ™เบตเป‰โ€‹:

  1. เป€เบฎเบฑเบ”เบšเปเปˆ
  2. เบชเป‰เบฒเบ‡เปœเป‰เบฒเบงเบฝเบ
  3. เป€เบšเบดเปˆเบ‡เบงเปˆเบฒเบ—เบธเบเบขเปˆเบฒเบ‡เบชเบงเบเบ‡เบฒเบกเบ›เบฒเบ™เปƒเบ”
  4. เบกเบญเบšเบซเบกเบฒเบเป€เบฅเบเป€เบŠเบ”เบŠเบฑเบ™เปƒเบซเป‰เบ•เบทเปˆเบก
  5. เป€เบญเบปเบฒเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบ SQL Server
  6. เป€เบญเบปเบฒเบ‚เปเป‰เบกเบนเบ™เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™ Vertica
  7. เป€เบเบฑเบšเบเปเบฒเบชเบฐเบ–เบดเบ•เบด

เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เป€เบžเบทเปˆเบญเป€เบฎเบฑเบ”เปƒเบซเป‰เบชเบดเปˆเบ‡เบ—เบฑเบ‡เบซเบกเบปเบ”เบ™เบตเป‰เป€เบžเบตเปˆเบกเบ‚เบถเป‰เบ™, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเป„เบ”เป‰เป€เบžเบตเปˆเบกเบ‚เบฐเบซเบ™เบฒเบ”เบ™เป‰เบญเบเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

เบขเบนเปˆเบ—เบตเปˆเบ™เบฑเป‰เบ™เบžเบงเบเป€เบฎเบปเบฒเบเบปเบเบ‚เบถเป‰เบ™เบกเบฒ:

  • Vertica เป€เบ›เบฑเบ™เป€เบˆเบปเป‰เบฒเบžเบฒเบš dwh เบ”เป‰เบงเบเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบ—เบตเปˆเบชเบธเบ”,
  • เบชเบฒเบกเบ•เบปเบงเบขเปˆเบฒเบ‡เบ‚เบญเบ‡ SQL Server,
  • เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เบ•เบทเปˆเบกโ€‹เบ‚เปเป‰โ€‹เบกเบนเบ™โ€‹เปƒเบชเปˆโ€‹เบ–เบฒเบ™โ€‹เบ‚เปเป‰โ€‹เบกเบนเบ™โ€‹เปƒเบ™โ€‹เบเปโ€‹เบฅเบฐโ€‹เบ™เบตโ€‹เบ—เบตเปˆโ€‹เบกเบตโ€‹เบšเบฒเบ‡โ€‹เบ‚เปเป‰โ€‹เบกเบนเบ™ (เปƒเบ™โ€‹เบเปโ€‹เบฅเบฐโ€‹เบ™เบตโ€‹เบ—เบตเปˆโ€‹เบšเปเปˆโ€‹เบกเบตโ€‹เบšเปเปˆโ€‹เป„เบ”เป‰โ€‹เป€เบšเบดเปˆเบ‡โ€‹เป€เบ‚เบปเป‰เบฒโ€‹เป„เบ›โ€‹เปƒเบ™ mssql_init.py!)

เบžเบงเบเป€เบฎเบปเบฒเป€เบ›เบตเบ”เบ•เบปเบงเบชเบดเปˆเบ‡เบ—เบตเปˆเบ”เบตเบ—เบฑเบ‡เบซเบกเบปเบ”เบ”เป‰เบงเบเบเบฒเบ™เบŠเปˆเบงเบเป€เบซเบผเบทเบญเบ‚เบญเบ‡เบ„เปเบฒเบชเบฑเปˆเบ‡เบ—เบตเปˆเบชเบฑเบšเบชเบปเบ™เป€เบฅเบฑเบเบ™เป‰เบญเบเบเบงเปˆเบฒเบ„เบฑเป‰เบ‡เบ—เบตเปˆเบœเปˆเบฒเบ™เบกเบฒ:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

เบชเบดเปˆเบ‡เบ—เบตเปˆเบกเบฐเบซเบฑเบ”เบชเบฐเบˆเบฑเบ™ randomizer เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡, เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ™เปเบฒเปƒเบŠเป‰เบฅเบฒเบเบเบฒเบ™ Data Profiling/Ad Hoc Query:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™
เบชเบดเปˆเบ‡เบ—เบตเปˆเบชเปเบฒเบ„เบฑเบ™เปเบกเปˆเบ™เบšเปเปˆเบชเบฐเปเบ”เบ‡เปƒเบซเป‰เบ™เบฑเบเบงเบดเป€เบ„เบฒเบฐ

เบฅเบฐเบญเบฝเบ”เบเปˆเบฝเบงเบเบฑเบš เป€เบŠเบ”เบŠเบฑเบ™ ETL เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบˆเบฐเบšเปเปˆ, เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เปเบกเปˆเบ™เป€เบฅเบทเปˆเบญเบ‡เป€เบฅเบฑเบเป†เบ™เป‰เบญเบเป†เบขเบนเปˆเบ—เบตเปˆเบ™เบฑเป‰เบ™: เบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡เบžเบทเป‰เบ™เบ–เบฒเบ™, เบกเบตเป€เบ„เบทเปˆเบญเบ‡เบซเบกเบฒเบเปƒเบ™เบกเบฑเบ™, เบžเบงเบเป€เบฎเบปเบฒเบซเปเปˆเบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เบ”เป‰เบงเบเบ•เบปเบงเบˆเบฑเบ”เบเบฒเบ™เบชเบฐเบžเบฒเบšเบเบฒเบ™, เปเบฅเบฐเบ•เบญเบ™เบ™เบตเป‰เบžเบงเบเป€เบฎเบปเบฒเป€เบฎเบฑเบ”เบชเบดเปˆเบ‡เบ™เบตเป‰:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

เป€เบงเบฅเบฒเบกเบฒเบฎเบญเบ”เปเบฅเป‰เบง เป€เบเบฑเบšเบเปเบฒเบ‚เปเป‰เบกเบนเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ เบˆเบฒเบเบซเบ™เบถเปˆเบ‡เปเบฅเบฐเป€เบ„เบดเปˆเบ‡เบซเบ™เบถเปˆเบ‡เบฎเป‰เบญเบเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ. เปƒเบซเป‰เป€เบฎเบฑเบ”เบชเบดเปˆเบ‡เบ™เบตเป‰เบ”เป‰เบงเบเบเบฒเบ™เบŠเปˆเบงเบเป€เบซเบผเบทเบญเบ‚เบญเบ‡เบชเบฒเบ unpretentious เบซเบผเบฒเบ:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. เบ”เป‰เบงเบเบเบฒเบ™เบŠเปˆเบงเบเป€เบซเบผเบทเบญเบ‚เบญเบ‡ hook เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฎเบฑเบšเบˆเบฒเบ Airflow pymssql- เป€เบŠเบทเปˆเบญเบกโ€‹เบ•เปเปˆโ€‹
  2. เปƒเบซเป‰เบ—เบปเบ”เปเบ—เบ™เบ‚เปเป‰เบˆเปเบฒเบเบฑเบ”เปƒเบ™เบฎเบนเบšเปเบšเบšเบ‚เบญเบ‡เบงเบฑเบ™เบ—เบตเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบ„เปเบฒเบฎเป‰เบญเบ‡เบ‚เป - เบกเบฑเบ™เบˆเบฐเบ–เบทเบเป‚เบเบ™เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบซเบ™เป‰เบฒเบ—เบตเปˆเป‚เบ”เบเป€เบ„เบทเปˆเบญเบ‡เบˆเบฑเบเปเบกเปˆเปเบšเบš.
  3. เบเบฒเบ™เปƒเบซเป‰เบญเบฒเบซเบฒเบ™เบ„เปเบฒเบฎเป‰เบญเบ‡เบ‚เปเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ pandasเบœเบนเป‰โ€‹เบ—เบตเปˆโ€‹เบˆเบฐโ€‹เป„เบ”เป‰โ€‹เบฎเบฑเบšโ€‹เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹ DataFrame - เบกเบฑเบ™เบˆเบฐเป€เบ›เบฑเบ™เบ›เบฐเป‚เบซเบเบ”เบ•เปเปˆเบžเบงเบเป€เบฎเบปเบฒเปƒเบ™เบญเบฐเบ™เบฒเบ„เบปเบ”.

เบ‚เป‰เบญเบเบเปเบฒเบฅเบฑเบ‡เปƒเบŠเป‰เบเบฒเบ™เบ—เบปเบ”เปเบ—เบ™ {dt} เปเบ—เบ™เบ—เบตเปˆเบˆเบฐเป€เบ›เบฑเบ™เบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เป %s เบšเปเปˆเปเบกเปˆเบ™เบเป‰เบญเบ™เบงเปˆเบฒเบ‚เป‰เบญเบเป€เบ›เบฑเบ™ Pinocchio เบŠเบปเปˆเบงเบฎเป‰เบฒเบ, เปเบ•เปˆเป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒ pandas เบšเปเปˆเบชเบฒเบกเบฒเบ”เบˆเบฑเบ”เบเบฒเบ™เป„เบ”เป‰ pymssql เปเบฅเบฐเป€เบฅเบทเปˆเบญเบ™เบญเบฑเบ™เบชเบธเบ”เบ—เป‰เบฒเบ params: Listเป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบฅเบฒเบงเบ•เป‰เบญเบ‡เบเบฒเบ™เปเบ—เป‰เป† tuple.
เบเบฑเบ‡เบชเบฑเบ‡เป€เบเบ”เบงเปˆเบฒเบ™เบฑเบเบžเบฑเบ”เบ—เบฐเบ™เบฒ pymssql เป„เบ”เป‰โ€‹เบ•เบฑเบ”โ€‹เบชเบดเบ™โ€‹เปƒเบˆโ€‹เบ—เบตเปˆโ€‹เบˆเบฐโ€‹เบšเปเปˆโ€‹เบชเบฐโ€‹เบซเบ™เบฑเบšโ€‹เบชเบฐโ€‹เบซเบ™เบนเบ™โ€‹เป€เบ‚เบปเบฒโ€‹เบญเบตเบโ€‹เบ•เปเปˆโ€‹เป„เบ›โ€‹, เปเบฅเบฐโ€‹เบกเบฑเบ™โ€‹เป€เบ–เบดเบ‡โ€‹เป€เบงโ€‹เบฅเบฒโ€‹เบ—เบตเปˆโ€‹เบˆเบฐโ€‹เบเป‰เบฒเบโ€‹เบญเบญเบโ€‹ pyodbc.

เบกเบฒเป€เบšเบดเปˆเบ‡เบชเบดเปˆเบ‡เบ—เบตเปˆ Airflow เบ›เบฐเบเบญเบšเบเบฒเบ™เป‚เบ•เป‰เบ–เบฝเบ‡เบ‚เบญเบ‡เบซเบ™เป‰เบฒเบ—เบตเปˆเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบเบฑเบš:

Apache Airflow: เป€เบฎเบฑเบ”เปƒเบซเป‰ ETL เบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™

เบ–เป‰เบฒเบšเปเปˆเบกเบตเบ‚เปเป‰เบกเบนเบ™, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบšเปเปˆเบกเบตเบˆเบธเบ”เบ—เบตเปˆเบˆเบฐเบชเบทเบšเบ•เปเปˆ. เปเบ•เปˆเบกเบฑเบ™เบเปเปˆเป€เบ›เบฑเบ™เป€เบฅเบทเปˆเบญเบ‡เปเบ›เบเบ—เบตเปˆเบˆเบฐเบžเบดเบˆเบฒเบฅเบฐเบ™เบฒเบเบฒเบ™เบ•เบทเปˆเบกเบ‚เปเป‰เบกเบนเบ™เบชเบปเบšเบœเบปเบ™เบชเปเบฒเป€เบฅเบฑเบ”. เปเบ•เปˆเบ™เบตเป‰เบšเปเปˆเปเบกเปˆเบ™เบ„เบงเบฒเบกเบœเบดเบ”เบžเบฒเบ”. เบญเบฒ-เบญเบฒ-เบญเบฒ, เบˆเบฐเป€เบฎเบฑเบ”เปเบ™เบงเปƒเบ”?! เปเบฅเบฐเบ™เบตเป‰เปเบกเปˆเบ™เบชเบดเปˆเบ‡เบ—เบตเปˆ:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException เบˆเบฐเบšเบญเบ Airflow เบงเปˆเบฒเบšเปเปˆเบกเบตเบ‚เปเป‰เบœเบดเบ”เบžเบฒเบ”, เปเบ•เปˆเบžเบงเบเป€เบฎเบปเบฒเบ‚เป‰เบฒเบกเบซเบ™เป‰เบฒเบงเบฝเบ. เบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบšเบˆเบฐเบšเปเปˆเบกเบตเบชเบตเปˆเบซเบผเปˆเบฝเบกเบชเบตเบ‚เบฝเบงเบซเบผเบทเบชเบตเปเบ”เบ‡, เปเบ•เปˆเบชเบตเบšเบปเบง.

เปƒเบซเป‰เบ–เบดเป‰เบกเบ‚เปเป‰เบกเบนเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ เบซเบผเบฒเบเบ–เบฑเบ™:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

เบ„เบท:

  • เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเป€เบญเบปเบฒเบ„เปเบฒเบชเบฑเปˆเบ‡,
  • ID เบ‚เบญเบ‡เบเบญเบ‡เบ›เบฐเบŠเบธเบกเบ™เปเป‰เบฒเบ–เป‰เบงเบกเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ (เบกเบฑเบ™เบˆเบฐเปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™ เบชเปเบฒเบฅเบฑเบšเบ—เบธเบเป†เบงเบฝเบเบ‡เบฒเบ™),
  • A hash เบˆเบฒเบเปเบซเบผเปˆเบ‡เปเบฅเบฐเบ„เปเบฒเบชเบฑเปˆเบ‡ ID - เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เปƒเบ™เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบชเบธเบ”เบ—เป‰เบฒเบ (เบšเปˆเบญเบ™เบ—เบตเปˆเบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เปเบกเปˆเบ™ poured เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบซเบ™เบถเปˆเบ‡) เบžเบงเบเป€เบฎเบปเบฒเบกเบต ID เบ„เปเบฒเบชเบฑเปˆเบ‡เป€เบ›เบฑเบ™เป€เบญเบเบฐเบฅเบฑเบ.

เบ‚เบฑเป‰เบ™เบ•เบญเบ™เบชเบธเบ”เบ—เป‰เบฒเบเบเบฑเบ‡เบ„เบปเบ‡เบขเบนเปˆ: เป€เบญเบปเบฒเบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™ Vertica. เปเบฅเบฐ, oddly เบžเบฝเบ‡เบžเป, เบซเบ™เบถเปˆเบ‡เปƒเบ™เบงเบดเบ—เบตเบ—เบตเปˆเบซเบ™เป‰เบฒเบ›เบฐเบ—เบฑเบšเปƒเบˆเบ—เบตเปˆเบชเบธเบ”เปเบฅเบฐเบ›เบฐเบชเบดเบ”เบ—เบดเบžเบฒเบšเบ—เบตเปˆเบˆเบฐเป€เบฎเบฑเบ”เบ™เบตเป‰เปเบกเปˆเบ™เบœเปˆเบฒเบ™ CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. เบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡เป€เบ„เบทเปˆเบญเบ‡เบฎเบฑเบšเบžเบดเป€เบชเบ” StringIO.
  2. pandas เบ”เป‰เบงเบเบ„เบงเบฒเบกเบเบฐเบฅเบธเบ™เบฒเบˆเบฐเป€เบฎเบฑเบ”เปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒ DataFrame เป€เบ›เบฑเบ™ CSV- เบชเบฒเบ.
  3. เปƒเบซเป‰เป€เบ›เบตเบ”เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบš Vertica เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบกเบฑเบเบ”เป‰เบงเบ hook.
  4. เปเบฅเบฐเปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบ”เป‰เบงเบเบเบฒเบ™เบŠเปˆเบงเบเป€เบซเบผเบทเบญ copy() เบชเบปเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเป‚เบ”เบเบเบปเบ‡เบซเบฒ Vertika!

เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เบˆเบฐโ€‹เป€เบญเบปเบฒโ€‹เบˆเบฒเบโ€‹เบ„เบปเบ™โ€‹เบ‚เบฑเบšโ€‹เบงเปˆเบฒโ€‹เบกเบตโ€‹เปเบ–เบงโ€‹เปƒเบ”โ€‹เบ—เบตเปˆโ€‹เป€เบ•เบฑเบกโ€‹เป„เบ›โ€‹, เปเบฅเบฐโ€‹เบšเบญเบโ€‹เบœเบนเป‰โ€‹เบˆเบฑเบ”โ€‹เบเบฒเบ™โ€‹เบเบญเบ‡โ€‹เบ›เบฐโ€‹เบŠเบธเบกโ€‹เบงเปˆเบฒโ€‹เบ—เบธเบโ€‹เบชเบดเปˆเบ‡โ€‹เบ—เบธเบโ€‹เบขเปˆเบฒเบ‡โ€‹เปเบกเปˆเบ™ OKโ€‹:

session.loaded_rows = cursor.rowcount
session.successful = True

เบซเบกเบปเบ”โ€‹เป€เบ—เบปเปˆเบฒโ€‹เบ™เบตเป‰.

เปƒเบ™เบเบฒเบ™เบ‚เบฒเบ, เบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡เปเบœเปˆเบ™เป€เบ›เบปเป‰เบฒเบซเบกเบฒเบเบ”เป‰เบงเบเบ•เบปเบ™เป€เบญเบ‡. เบ—เบตเปˆเบ™เบตเป‰เบ‚เป‰เบญเบเบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰เบ•เบปเบ™เป€เบญเบ‡เบกเบตเป€เบ„เบทเปˆเบญเบ‡เบˆเบฑเบเบ‚เบฐเบซเบ™เบฒเบ”เบ™เป‰เบญเบ:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

เบ‚เป‰เบญเบเบเปเบฒเบฅเบฑเบ‡เปƒเบŠเป‰ VerticaOperator() เบ‚เป‰เบญเบเบชเป‰เบฒเบ‡ schema เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เปเบฅเบฐเบ•เบฒเบ•เบฐเบฅเบฒเบ‡ (เบ–เป‰เบฒเบžเบงเบเป€เบ‚เบปเบฒเบšเปเปˆเบกเบตเบขเบนเปˆเปเบฅเป‰เบง, เปเบ™เปˆเบ™เบญเบ™). เบชเบดเปˆเบ‡เบ—เบตเปˆ เบชเบณ เบ„เบฑเบ™เปเบกเปˆเบ™เบเบฒเบ™เบˆเบฑเบ”เปเบˆเบ‡เบเบฒเบ™เป€เบžเบดเปˆเบ‡เบžเบฒเบญเบฒเป„เบชเบขเปˆเบฒเบ‡เบ–เบทเบเบ•เป‰เบญเบ‡:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up

- เบ”เบต, - เบซเบ™เบนเบ™เป‰เบญเบเป€เบงเบปเป‰เบฒเบงเปˆเบฒ, - เบšเปเปˆเปเบกเปˆเบ™เบกเบฑเบ™, เปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™
เป€เบˆเบปเป‰เบฒเปเบฑเป‰เบ™เปƒเบˆเบšเปเบงเปˆเบฒเบ‚เป‰เบญเบเป€เบ›เบฑเบ™เบชเบฑเบ”เบ—เบตเปˆเบ‚เบตเป‰เบฎเป‰เบฒเบเบ—เบตเปˆเบชเบธเบ”เปƒเบ™เบ›เปˆเบฒ?

Julia Donaldson, The Gruffalo

เบ‚เป‰เบญเบเบ„เบดเบ”เบงเปˆเบฒเบ–เป‰เบฒเป€เบžเบทเปˆเบญเบ™เบฎเปˆเบงเบกเบ‡เบฒเบ™เบ‚เบญเบ‡เบ‚เป‰เบญเบเปเบฅเบฐเบ‚เป‰เบญเบเบกเบตเบเบฒเบ™เปเบ‚เปˆเบ‡เบ‚เบฑเบ™: เบœเบนเป‰เบ—เบตเปˆเบˆเบฐเบชเป‰เบฒเบ‡เปเบฅเบฐเป€เบ›เบตเบ”เบ•เบปเบงเบ‚เบฐเบšเบงเบ™เบเบฒเบ™ ETL เบขเปˆเบฒเบ‡เป„เบงเบงเบฒเบˆเบฒเบเบˆเบธเบ”เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™: เบžเบงเบเป€เบ‚เบปเบฒเบเบฑเบš SSIS เปเบฅเบฐเบซเบ™เบนเปเบฅเบฐเบ‚เป‰เบญเบเบเบฑเบš Airflow ... เปเบฅเบฐเบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบžเบงเบเป€เบฎเบปเบฒเบเบฑเบ‡เบˆเบฐเบ›เบฝเบšเบ—เบฝเบšเบ„เบงเบฒเบกเบ‡เปˆเบฒเบเบ‚เบญเบ‡เบเบฒเบ™เบšเปเบฒเบฅเบธเบ‡เบฎเบฑเบเบชเบฒ ... เบงเป‰เบฒเบง, เบ‚เป‰เบญเบเบ„เบดเบ”เบงเปˆเบฒเป€เบˆเบปเป‰เบฒเบˆเบฐเป€เบซเบฑเบ™เบ”เบตเบงเปˆเบฒเบ‚เป‰เบญเบเบˆเบฐเบ•เบตเบžเบงเบเป€เบ‚เบปเบฒเปƒเบ™เบ—เบธเบเบ”เป‰เบฒเบ™!

เบ–เป‰เบฒเบซเบฒเบเบงเปˆเบฒเป€เบฅเบฑเบเบ™เป‰เบญเบเบซเบผเบฒเบ, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™ Apache Airflow - เป‚เบ”เบเบเบฒเบ™เบญเบฐเบ—เบดเบšเบฒเบเบ‚เบฐเบšเบงเบ™เบเบฒเบ™เปƒเบ™เบฎเบนเบšเปเบšเบšเบ‚เบญเบ‡เบฅเบฐเบซเบฑเบ”เป‚เบ„เบ‡เบเบฒเบ™ - เป„เบ”เป‰เป€เบฎเบฑเบ”เบงเบฝเบเบ‚เบญเบ‡เบ‚เป‰เบญเบ. เบซเบผเบฒเบ เบชเบฐเบ”เบงเบเบชเบฐเบšเบฒเบ เปเบฅเบฐเบกเบตเบ„เบงเบฒเบกเบชเบธเบเบซเบผเบฒเบเบ‚เบถเป‰เบ™.

เบเบฒเบ™เบ‚เบฐเบซเบเบฒเบเบ—เบตเปˆเบšเปเปˆเบˆเปเบฒเบเบฑเบ”เบ‚เบญเบ‡เบกเบฑเบ™, เบ—เบฑเบ‡เปƒเบ™เปเบ‡เปˆเบ‚เบญเบ‡ plug-ins เปเบฅเบฐ predisposition เบเบฑเบš scalability, เป€เบฎเบฑเบ”เปƒเบซเป‰เบ—เปˆเบฒเบ™เบกเบตเป‚เบญเบเบฒเบ”เบ—เบตเปˆเบˆเบฐเบ™เปเบฒเปƒเบŠเป‰ Airflow เปƒเบ™เป€เบเบทเบญเบšเบ—เบธเบเบžเบทเป‰เบ™เบ—เบตเปˆ: เป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเปƒเบ™เบงเบปเบ‡เบˆเบญเบ™เป€เบ•เบฑเบกเบ‚เบญเบ‡เบเบฒเบ™เป€เบเบฑเบšเบเปเบฒ, เบเบฒเบ™เบเบฐเบเบฝเบกเปเบฅเบฐเบเบฒเบ™เบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™, เป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเปƒเบ™เบเบฒเบ™เป€เบ›เบตเบ”เบ•เบปเบงเบˆเบฐเบฅเบงเบ” (เป„เบ› Mars, เบ‚เบญเบ‡. เบซเบผเบฑเบเบชเบนเบ”).

เบชเปˆเบงเบ™เบชเบธเบ”เบ—เป‰เบฒเบ, เบ‚เปเป‰เบกเบนเบ™เบญเป‰เบฒเบ‡เบญเบตเบ‡ เปเบฅเบฐเบ‚เปเป‰เบกเบนเบ™

rake เบ—เบตเปˆโ€‹เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เป„เบ”เป‰โ€‹เป€เบเบฑเบšโ€‹เบเปเบฒโ€‹เบชเปเบฒโ€‹เบฅเบฑเบšโ€‹เบ—เปˆเบฒเบ™โ€‹

  • start_date. เปเบกเปˆเบ™เปเบฅเป‰เบง, เบ™เบตเป‰เปเบกเปˆเบ™ meme เบ—เป‰เบญเบ‡เบ–เบดเปˆเบ™เปเบฅเป‰เบง. เบเบฒเบ™เป‚เบ•เป‰เบ–เบฝเบ‡เบซเบผเบฑเบเบ‚เบญเบ‡ Via Doug start_date เบœเปˆเบฒเบ™เบ—เบฑเบ‡เปเบปเบ”. เป‚เบ”เบเบซเบเปเป‰, เบ–เป‰เบฒเบ—เปˆเบฒเบ™เบฅเบฐเบšเบธเปƒเบ™ start_date เบงเบฑเบ™เบ—เบตเบ›เบฐเบˆเบธเบšเบฑเบ™, เปเบฅเบฐ schedule_interval - เบกเบทเป‰เบซเบ™เบถเปˆเบ‡, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™ DAG เบˆเบฐเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เปƒเบ™เบกเบทเป‰เบญเบทเปˆเบ™เบšเปเปˆเบกเบตเบเปˆเบญเบ™เบซเบ™เป‰เบฒเบ™เบตเป‰.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    เปเบฅเบฐเบšเปเปˆเบกเบตเบšเบฑเบ™เบซเบฒเบซเบผเบฒเบ.

    เบกเบตเบ„เบงเบฒเบกเบœเบดเบ”เบžเบฒเบ” runtime เบญเบทเปˆเบ™เบ—เบตเปˆเบเปˆเบฝเบงเบ‚เป‰เบญเบ‡เบเบฑเบšเบกเบฑเบ™: Task is missing the start_date parameter, เป€เบŠเบดเปˆเบ‡เบชเปˆเบงเบ™เบซเบผเบฒเบเบกเบฑเบเบˆเบฐเบŠเบตเป‰เปƒเบซเป‰เป€เบซเบฑเบ™เบงเปˆเบฒเป€เบˆเบปเป‰เบฒเบฅเบทเบกเบœเบนเบเบกเบฑเบ”เบเบฑเบšเบ•เบปเบงเบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™ dag.

  • เบ—เบฑเบ‡เปเบปเบ”เปƒเบ™เป€เบ„เบทเปˆเบญเบ‡เบ”เบฝเบง. เปเบกเปˆเบ™เปเบฅเป‰เบง, เปเบฅเบฐเบžเบทเป‰เบ™เบ–เบฒเบ™ (Airflow เบ•เบปเบงเบ‚เบญเบ‡เบกเบฑเบ™เป€เบญเบ‡เปเบฅเบฐเบเบฒเบ™เป€เบ„เบทเบญเบšเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ), เปเบฅเบฐเป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบเป€เบงเบฑเบšเป„เบŠเบ•เปŒ, เปเบฅเบฐเบ•เบฒเบ•เบฐเบฅเบฒเบ‡, เปเบฅเบฐเบžเบฐเบ™เบฑเบเบ‡เบฒเบ™. เปเบฅเบฐเป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบกเบฑเบ™เป€เบฎเบฑเบ”เบงเบฝเบ. เปเบ•เปˆเปƒเบ™เป„เบฅเบเบฐเป€เบงเบฅเบฒ, เบˆเปเบฒเบ™เบงเบ™เบซเบ™เป‰เบฒเบงเบฝเบเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบšเปเบฅเบดเบเบฒเบ™เป€เบžเบตเปˆเบกเบ‚เบถเป‰เบ™, เปเบฅเบฐเป€เบกเบทเปˆเบญ PostgreSQL เป€เบฅเบตเปˆเบกเบ•เบญเบšเบชเบฐเบซเบ™เบญเบ‡เบเบฑเบšเบ”เบฑเบ”เบชเบฐเบ™เบตเปƒเบ™ 20 s เปเบ—เบ™เบ—เบตเปˆเบˆเบฐเป€เบ›เบฑเบ™ 5 ms, เบžเบงเบเป€เบฎเบปเบฒเป€เบญเบปเบฒเบกเบฑเบ™เป„เบ›.
  • LocalExecutor. เปเบกเปˆเบ™เปเบฅเป‰เบง, เบžเบงเบเป€เบฎเบปเบฒเบเบฑเบ‡เบ™เบฑเปˆเบ‡เบขเบนเปˆเป€เบ—เบดเบ‡เบกเบฑเบ™, เปเบฅเบฐเบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบกเบฒเบฎเบญเบ”เบ‚เบญเบšเบ‚เบญเบ‡เป€เบซเบงเป€เบฅเบดเบเปเบฅเป‰เบง. LocalExecutor เป„เบ”เป‰เบžเบฝเบ‡เบžเปเบชเปเบฒเบฅเบฑเบšเบžเบงเบเป€เบฎเบปเบฒเบˆเบปเบ™เป€เบ–เบดเบ‡เบ›เบฐเบˆเบธเบšเบฑเบ™, เปเบ•เปˆเบ•เบญเบ™เบ™เบตเป‰เบกเบฑเบ™เป€เบ–เบดเบ‡เป€เบงเบฅเบฒเบ—เบตเปˆเบˆเบฐเบ‚เบฐเบซเบเบฒเบเบเบฑเบšเบžเบฐเบ™เบฑเบเบ‡เบฒเบ™เบขเปˆเบฒเบ‡เบซเบ™เป‰เบญเบเบซเบ™เบถเปˆเบ‡เบ„เบปเบ™, เปเบฅเบฐเบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบ•เป‰เบญเบ‡เป€เบฎเบฑเบ”เบงเบฝเบเบซเบ™เบฑเบเป€เบžเบทเปˆเบญเบเป‰เบฒเบเป„เบ› CeleryExecutor. เปเบฅเบฐเปƒเบ™เบ—เบฑเบ”เบชเบฐเบ™เบฐเบ‚เบญเบ‡เบ„เบงเบฒเบกเบˆเบดเบ‡เบ—เบตเปˆเบงเปˆเบฒเบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป€เบฎเบฑเบ”เบงเบฝเบเบเบฑเบšเบกเบฑเบ™เบขเบนเปˆเปƒเบ™เป€เบ„เบทเปˆเบญเบ‡เบ”เบฝเบง, เบšเปเปˆเบกเบตเบซเบเบฑเบ‡เบขเบธเบ”เป€เบˆเบปเป‰เบฒเบˆเบฒเบเบเบฒเบ™เปƒเบŠเป‰ Celery เป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบขเบนเปˆเปƒเบ™เป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบ, เป€เบŠเบดเปˆเบ‡ "เปเบ™เปˆเบ™เบญเบ™, เบˆเบฐเบšเปเปˆเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบเบฒเบ™เบœเบฐเบฅเบดเบ”, เบ”เป‰เบงเบเบ„เบงเบฒเบกเบŠเบทเปˆเบชเบฑเบ”!"
  • เบšเปเปˆเปƒเบŠเป‰ เป€เบ„เบทเปˆเบญเบ‡โ€‹เบกเบทโ€‹เปƒเบ™โ€‹เบ•เบปเบงโ€‹:
    • เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ เป€เบžเบทเปˆเบญโ€‹เป€เบเบฑเบšโ€‹เบฎเบฑเบโ€‹เบชเบฒโ€‹เปƒเบšโ€‹เบขเบฑเป‰เบ‡โ€‹เบขเบทเบ™โ€‹เบเบฒเบ™โ€‹เบšเปโ€‹เบฅเบดโ€‹เบเบฒเบ™โ€‹,
    • SLA Misses เบ•เบญเบšโ€‹เบชเบฐโ€‹เบซเบ™เบญเบ‡โ€‹เบงเบฝเบโ€‹เบ‡เบฒเบ™โ€‹เบ—เบตเปˆโ€‹เบšเปเปˆโ€‹เป„เบ”เป‰โ€‹เบ—เบฑเบ™โ€‹เบเบฒเบ™โ€‹,
    • xcom เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เปเบฅเบเบ›เปˆเบฝเบ™ metadata (เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเป€เบงเบปเป‰เบฒ เป€เบกเบ•เบฒเบ‚เปเป‰เบกเบนเบ™!) เบฅเบฐเบซเบงเปˆเบฒเบ‡เบงเบฝเบเบ‡เบฒเบ™ dag.
  • เบเบฒเบ™เบฅเปˆเบงเบ‡เบฅเบฐเป€เบกเบตเบ”เบ—เบฒเบ‡เป„เบ›เบชเบฐเบ™เบต. เบ”เบต, เบ‚เป‰เบญเบเบชเบฒเบกเบฒเบ”เป€เบงเบปเป‰เบฒเบซเบเบฑเบ‡เป„เบ”เป‰? เบเบฒเบ™เป€เบ•เบทเบญเบ™เป„เบ”เป‰เบ–เบทเบเบชเป‰เบฒเบ‡เบ•เบฑเป‰เบ‡เบ‚เบถเป‰เบ™เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ„เป‰เบฒเบ‡เบ„เบทเบ™เบ—เบฑเบ‡เบซเบกเบปเบ”เบ‚เบญเบ‡เบงเบฝเบเบ‡เบฒเบ™เบ—เบตเปˆเบซเบผเบธเบ”เบฅเบปเบ‡. เบ”เบฝเบงเบ™เบตเป‰ Gmail เบงเบฝเบเบ‚เบญเบ‡เบ‚เป‰เบญเบเบกเบต > 90k เบญเบตเป€เบกเบงเบˆเบฒเบ Airflow, เปเบฅเบฐ web mail muzzle เบ›เบฐเบ•เบดเป€เบชเบ”เบ—เบตเปˆเบˆเบฐเป€เบญเบปเบฒเปเบฅเบฐเบฅเบถเบšเบซเบผเบฒเบเบเบงเปˆเบฒ 100 เบชเบฐเบšเบฑเบšเบ•เปเปˆเบ„เบฑเป‰เบ‡.

เบ‚เบธเบกโ€‹เบซเบผเบฒเบโ€‹: Apache Airflow Pitfails

เป€เบ„เบทเปˆเบญเบ‡เบกเบทเบญเบฑเบ”เบ•เบฐเป‚เบ™เบกเบฑเบ”เป€เบžเบตเปˆเบกเป€เบ•เบตเบก

เป€เบžเบทเปˆเบญเปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒเป€เบฎเบฑเบ”เบงเบฝเบเบซเบผเบฒเบเบ‚เบถเป‰เบ™เบ”เป‰เบงเบเบซเบปเบงเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเปเบฅเบฐเบšเปเปˆเปเบกเปˆเบ™เบ”เป‰เบงเบเบกเบทเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, Airflow เป„เบ”เป‰เบเบฐเบเบฝเบกเบชเปเบฒเบฅเบฑเบšเบžเบงเบเป€เบฎเบปเบฒเบ™เบตเป‰:

  • REST API - เบฅเบฒเบงเบเบฑเบ‡เบกเบตเบชเบฐเบ–เบฒเบ™เบฐเบ‚เบญเบ‡ Experimental, เป€เบŠเบดเปˆเบ‡เบšเปเปˆเป„เบ”เป‰เบ›เป‰เบญเบ‡เบเบฑเบ™เบšเปเปˆเปƒเบซเป‰เบฅเบฒเบงเป€เบฎเบฑเบ”เบงเบฝเบ. เบ”เป‰เบงเบเบกเบฑเบ™, เบ—เปˆเบฒเบ™เบšเปเปˆเบžเบฝเบ‡เปเบ•เปˆเบชเบฒเบกเบฒเบ”เป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบเปˆเบฝเบงเบเบฑเบš dags เปเบฅเบฐเบงเบฝเบเบ‡เบฒเบ™, เปเบ•เปˆเบเบฑเบ‡เบขเบธเบ” / เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™ dag, เบชเป‰เบฒเบ‡ DAG Run เบซเบผเบทเบชเบฐเบ™เบธเบเป€เบเบต.
  • CLI - เป€เบ„เบทเปˆเบญเบ‡เบกเบทเบˆเปเบฒเบ™เบงเบ™เบซเบผเบฒเบเบชเบฒเบกเบฒเบ”เปƒเบŠเป‰เป„เบ”เป‰เบœเปˆเบฒเบ™เป€เบชเบฑเป‰เบ™เบ„เปเบฒเบชเบฑเปˆเบ‡เบ—เบตเปˆเบšเปเปˆเบžเบฝเบ‡เปเบ•เปˆเบšเปเปˆเบชเบฐเบ”เบงเบเปƒเบ™เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบœเปˆเบฒเบ™ WebUI, เปเบ•เปˆเป‚เบ”เบเบ—เบปเปˆเบงเป„เบ›เปเบฅเป‰เบงเปเบกเปˆเบ™เบšเปเปˆเบกเบต. เบเบปเบโ€‹เบ•เบปเบงโ€‹เบขเปˆเบฒเบ‡:
    • backfill เบ•เป‰เบญเบ‡เบเบฒเบ™เป€เบžเบทเปˆเบญเบ›เบดเบ”เป€เบ›เบตเบ”เปœเป‰เบฒเบงเบฝเบเบ„เบทเบ™เปƒเปเปˆ.
      เบ•เบปเบงเบขเปˆเบฒเบ‡, เบ™เบฑเบเบงเบดเป€เบ„เบฒเบฐเบกเบฒเปเบฅเบฐเป€เบงเบปเป‰เบฒเบงเปˆเบฒ: "เปเบฅเบฐเบ—เปˆเบฒเบ™, comrade, เบกเบตเบ„เบงเบฒเบกเบšเปเปˆเบกเบตเป€เบซเบ”เบœเบปเบ™เปƒเบ™เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบงเบฑเบ™เบ—เบต 1 เบซเบฒ 13 เบกเบฑเบ‡เบเบญเบ™! เปเบเป‰เป„เบ‚, เปเบเป‰เป„เบ‚, เปเบเป‰เป„เบ‚, เปเบเป‰เป„เบ‚เบกเบฑเบ™!" เปเบฅเบฐเบ—เปˆเบฒเบ™เป€เบ›เบฑเบ™ hob เบ”เบฑเปˆเบ‡เบเปˆเบฒเบง:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • เบšเปเบฅเบดเบเบฒเบ™เบžเบทเป‰เบ™เบ–เบฒเบ™: initdb, resetdb, upgradedb, checkdb.
    • run, เป€เบŠเบดเปˆเบ‡เบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ”เปเบฒเป€เบ™เบตเบ™เบงเบฝเบเบ‡เบฒเบ™เบ•เบปเบงเบขเปˆเบฒเบ‡เบซเบ™เบถเปˆเบ‡, เปเบฅเบฐเปเบกเป‰เบเบฐเบ—เบฑเป‰เบ‡เบ„เบฐเปเบ™เบ™เบเปˆเบฝเบงเบเบฑเบšเบเบฒเบ™เบ‚เบถเป‰เบ™เบเบฑเบšเบ—เบฑเบ‡เบซเบกเบปเบ”. เบเบดเปˆเบ‡เป„เบ›เบเบงเปˆเบฒเบ™เบฑเป‰เบ™, เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ”เปเบฒเป€เบ™เบตเบ™เบเบฒเบ™เบœเปˆเบฒเบ™ LocalExecutor, เป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบ—เปˆเบฒเบ™เบกเบตเบเบธเปˆเบก Celery.
    • เป€เบฎเบฑเบ”เบชเบดเปˆเบ‡เบ”เบฝเบงเบเบฑเบ™เบซเบผเบฒเบ test, เบžเบฝเบ‡เปเบ•เปˆเบเบฑเบ‡เบขเบนเปˆเปƒเบ™เบ–เบฒเบ™เบ‚เบฝเบ™เบšเปเปˆเบกเบตเบซเบเบฑเบ‡.
    • connections เบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰เบเบฒเบ™เบชเป‰เบฒเบ‡เบกเบฐเบซเบฒเบŠเบปเบ™เบ‚เบญเบ‡เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฒเบเปเบเบฐ.
  • python api - เป€เบ›เบฑเบ™เบงเบดเบ—เบตเบ—เบตเปˆเบเบฒเบเบซเบผเบฒเบเบ‚เบญเบ‡เบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบš, เป€เบŠเบดเปˆเบ‡เบกเบตเบˆเบธเบ”เบ›เบฐเบชเบปเบ‡เบชเปเบฒเบฅเบฑเบš plugins, เปเบฅเบฐเบšเปเปˆ swarming เปƒเบ™เบกเบฑเบ™เบ”เป‰เบงเบเบกเบทเบžเบฝเบ‡เป€เบฅเบฑเบเบ™เป‰เบญเบ. เปเบ•เปˆโ€‹เปƒเบœโ€‹เบˆเบฐโ€‹เบขเบธเบ”โ€‹เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เบˆเบฒเบโ€‹เบเบฒเบ™โ€‹เป„เบ› /home/airflow/dags, เปเบฅเปˆเบ™ ipython เปเบฅเบฐเป€เบฅเบตเปˆเบก messing เบ›เบฐเบกเบฒเบ™? เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”, เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบเบปเบเบ•เบปเบงเบขเปˆเบฒเบ‡, เบชเบปเปˆเบ‡เบญเบญเบเบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ—เบฑเบ‡เบซเบกเบปเบ”เบ—เบตเปˆเบกเบตเบฅเบฐเบซเบฑเบ”เบ”เบฑเปˆเบ‡เบ•เปเปˆเป„เบ›เบ™เบตเป‰:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • เบเบณเบฅเบฑเบ‡เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบšเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เป€เบกเบ•เบฒเบ‚เบญเบ‡ Airflow. เบ‚เป‰เบญเบเบšเปเปˆเปเบ™เบฐเบ™เปเบฒเปƒเบซเป‰เบ‚เบฝเบ™เบกเบฑเบ™, เปเบ•เปˆเบเบฒเบ™เป„เบ”เป‰เบฎเบฑเบšเบฅเบฑเบ”เบงเบฝเบเบชเปเบฒเบฅเบฑเบš metrics เบชเบฐเป€เบžเบฒเบฐเบ•เปˆเบฒเบ‡เป†เบชเบฒเบกเบฒเบ”เป„เบงเปเบฅเบฐเบ‡เปˆเบฒเบเบเบงเปˆเบฒเบœเปˆเบฒเบ™ APIs เปƒเบ”.

    เปƒเบซเป‰เป€เบงเบปเป‰เบฒเบงเปˆเบฒเบšเปเปˆเปเบกเปˆเบ™เบงเบฝเบเบ‡เบฒเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบ—เบฑเบ‡เบซเบกเบปเบ”เปเบกเปˆเบ™เบšเปเปˆเบกเบตเบ„เบงเบฒเบกเป€เบ‚เบฑเป‰เบกเปเบ‚เบ‡, เปเบ•เปˆเบšเบฒเบ‡เบ„เบฑเป‰เบ‡เบžเบงเบเป€เบ‚เบปเบฒเบชเบฒเบกเบฒเบ”เบฅเบปเป‰เบกเบฅเบปเบ‡, เปเบฅเบฐเบ™เบตเป‰เปเบกเปˆเบ™เป€เบฅเบทเปˆเบญเบ‡เบ›เบปเบเบเบฐเบ•เบด. เปเบ•เปˆเบเบฒเบ™เบ‚เบฑเบ”เบ‚เบงเบฒเบ‡เบˆเปเบฒเบ™เบงเบ™เบซเบ™เป‰เบญเบเปเบกเปˆเบ™เบชเบปเบ‡เปƒเบชเปเบฅเป‰เบง, เปเบฅเบฐเบกเบฑเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบเบงเบ”เป€เบšเบดเปˆเบ‡.

    เบฅเบฐเบงเบฑเบ‡ SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

เป€เบญเบเบฐเบชเบฒเบ™

เปเบฅเบฐเปเบ™เปˆเบ™เบญเบ™, เบชเบดเบšเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ—เปเบฒเบญเบดเบ”เบˆเบฒเบเบเบฒเบ™เบญเบญเบเบ‚เบญเบ‡ Google เปเบกเปˆเบ™เป€เบ™เบทเป‰เบญเปƒเบ™เบ‚เบญเบ‡เป‚เบŸเป€เบ”เบต Airflow เบˆเบฒเบ bookmarks เบ‚เบญเบ‡เบ‚เป‰เบญเบ.

เปเบฅเบฐเบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ—เบตเปˆเปƒเบŠเป‰เปƒเบ™เบšเบปเบ”เบ„เบงเบฒเบก:

เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™: www.habr.com