āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āĻšāĻžāχ, āφāĻŽāĻŋ āĻĻāĻŋāĻŽāĻŋāĻ¤ā§āϰāĻŋ āϞāĻ—āĻ­āĻŋāύ⧇āĻ™ā§āϕ⧋ - āϭ⧇āĻœā§‡āϟ āĻ—ā§āϰ⧁āĻĒ āĻ…āĻĢ āϕ⧋āĻŽā§āĻĒāĻžāύāĻŋāϰ āĻŦāĻŋāĻļā§āϞ⧇āώāĻŖ āĻŦāĻŋāĻ­āĻžāϗ⧇āϰ āĻĄā§‡āϟāĻž āχāĻžā§āϜāĻŋāύāĻŋāϝāĻŧāĻžāϰāĨ¤

āφāĻŽāĻŋ āφāĻĒāύāĻžāϕ⧇ ETL āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻžāϗ⧁āϞāĻŋ āĻŦāĻŋāĻ•āĻžāĻļ⧇āϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āĻĻ⧁āĻ°ā§āĻĻāĻžāĻ¨ā§āϤ āϏāϰāĻžā§āϜāĻžāĻŽ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āĻŦāϞāĻŦ - āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋āĨ¤ āϤāĻŦ⧇ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻāϤāχ āĻŦāĻšā§āĻŽā§āĻ–ā§€ āĻāĻŦāĻ‚ āĻŦāĻšā§āĻŽā§āĻ–ā§€ āϝ⧇ āφāĻĒāύāĻŋ āĻĄā§‡āϟāĻž āĻĒā§āϰāĻŦāĻžāĻšā§‡āϰ āϏāĻžāĻĨ⧇ āϜāĻĄāĻŧāĻŋāϤ āύāĻž āĻšāϞ⧇āĻ“ āĻāϟāĻŋāϕ⧇ āφāϰāĻ“ āϘāύāĻŋāĻˇā§āĻ āĻ­āĻžāĻŦ⧇ āĻĻ⧇āϖ⧇ āύ⧇āĻ“āϝāĻŧāĻž āωāϚāĻŋāϤ, āϤāĻŦ⧇ āĻĒāĻ°ā§āϝāĻžāϝāĻŧāĻ•ā§āϰāĻŽā§‡ āϝ⧇ āϕ⧋āύāĻ“ āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻž āϚāĻžāϞ⧁ āĻ•āϰāĻž āĻāĻŦāĻ‚ āϤāĻžāĻĻ⧇āϰ āϏāĻŽā§āĻĒāĻžāĻĻāύ āύāĻŋāϰ⧀āĻ•ā§āώāĻŖ āĻ•āϰāĻžāϰ āĻĒā§āϰāϝāĻŧā§‹āϜāύ āϰāϝāĻŧ⧇āϛ⧇āĨ¤

āĻāĻŦāĻ‚ āĻšā§āϝāĻžāρ, āφāĻŽāĻŋ āϕ⧇āĻŦāϞ āĻŦāϞāĻŦ āύāĻž, āϤāĻŦ⧇ āĻĻ⧇āĻ–āĻžāĻŦ: āĻĒā§āϰ⧋āĻ—ā§āϰāĻžāĻŽāϟāĻŋāϤ⧇ āĻĒā§āϰāϚ⧁āϰ āϕ⧋āĻĄ, āĻ¸ā§āĻ•ā§āϰāĻŋāύāĻļāϟ āĻāĻŦāĻ‚ āϏ⧁āĻĒāĻžāϰāĻŋāĻļ āϰāϝāĻŧ⧇āϛ⧇āĨ¤

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž
āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ / āωāχāĻ•āĻŋāĻŽāĻŋāĻĄāĻŋāϝāĻŧāĻž āĻ•āĻŽāĻ¨ā§āϏ āĻļāĻŦā§āĻĻāϟāĻŋ āϗ⧁āĻ—āϞ āĻ•āϰāϞ⧇ āφāĻĒāύāĻŋ āϏāĻžāϧāĻžāϰāĻŖāϤ āϝāĻž āĻĻ⧇āĻ–āϤ⧇ āĻĒāĻžāύ

āĻŦāĻŋāώāϝāĻŧāĻŦāĻ¸ā§āϤ⧁ āϏ⧂āϚāĻŋ

āĻ­ā§‚āĻŽāĻŋāĻ•āĻž

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻ āĻŋāĻ• āĻœā§āϝāĻžāĻ™ā§āĻ—ā§‹āϰ āĻŽāϤ⧋:

  • āĻĒāĻžāχāĻĨāύ⧇ āϞ⧇āĻ–āĻž
  • āĻāĻ•āϟāĻŋ āĻŽāĻšāĻžāύ āĻ…ā§āϝāĻžāĻĄāĻŽāĻŋāύ āĻĒā§āϝāĻžāύ⧇āϞ āφāϛ⧇,
  • āĻ…āύāĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āϟāĻ•āĻžāϞ⧇āϰ āϜāĻ¨ā§āϝ āĻĒā§āϰāϏāĻžāϰāĻŖāϝ⧋āĻ—ā§āϝ

- āĻļ⧁āϧ⧁āĻŽāĻžāĻ¤ā§āϰ āĻ­āĻžāϞ, āĻāĻŦāĻ‚ āĻāϟāĻŋ āϏāĻŽā§āĻĒā§‚āĻ°ā§āĻŖ āĻ­āĻŋāĻ¨ā§āύ āωāĻĻā§āĻĻ⧇āĻļā§āϝ⧇ āϤ⧈āϰāĻŋ āĻ•āϰāĻž āĻšāϝāĻŧ⧇āĻ›āĻŋāϞ, āϝāĻĨāĻž (āϝ⧇āĻŽāύ āĻāϟāĻŋ āĻ•ā§āϝāĻžāĻŸā§‡āϰ āφāϗ⧇ āϞ⧇āĻ–āĻž āφāϛ⧇):

  • āϏ⧀āĻŽāĻžāĻšā§€āύ āϏāĻ‚āĻ–ā§āϝāĻ• āĻŽā§‡āĻļāĻŋāύ⧇ āĻ•āĻžāϜ āϚāĻžāϞāĻžāύ⧋ āĻāĻŦāĻ‚ āĻĒāĻ°ā§āϝāĻŦ⧇āĻ•ā§āώāĻŖ āĻ•āϰāĻž (āϝāϤ āĻŦ⧇āĻļāĻŋ āϏ⧇āϞāĻžāϰāĻŋ/āϕ⧁āĻŦāĻžāϰāύ⧇āϟāϏ āĻāĻŦāĻ‚ āφāĻĒāύāĻžāϰ āĻŦāĻŋāĻŦ⧇āĻ• āφāĻĒāύāĻžāϕ⧇ āĻ…āύ⧁āĻŽāϤāĻŋ āĻĻ⧇āĻŦ⧇)
  • āĻĄāĻžāϝāĻŧāύāĻžāĻŽāĻŋāĻ• āĻ“āϝāĻŧāĻžāĻ°ā§āĻ•āĻĢā§āϞ⧋ āĻœā§‡āύāĻžāϰ⧇āĻļāύ⧇āϰ āϏāĻžāĻĨ⧇ āĻĒāĻžāχāĻĨāύ āϕ⧋āĻĄ āϞāĻŋāĻ–āϤ⧇ āĻāĻŦāĻ‚ āĻŦ⧁āĻāϤ⧇ āϖ⧁āĻŦ āϏāĻšāϜ āĻĨ⧇āϕ⧇
  • āĻāĻŦāĻ‚ āϰ⧇āĻĄāĻŋāĻŽā§‡āĻĄ āĻ•āĻŽā§āĻĒā§‹āύ⧇āĻ¨ā§āϟ āĻāĻŦāĻ‚ āϘāϰ⧇ āϤ⧈āϰāĻŋ āĻĒā§āϞāĻžāĻ—āχāύ āωāĻ­āϝāĻŧ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇ āϝ⧇āϕ⧋āύ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ āĻāĻŦāĻ‚ āĻāĻĒāĻŋāφāχ āĻāϕ⧇ āĻ…āĻĒāϰ⧇āϰ āϏāĻžāĻĨ⧇ āϏāĻ‚āϝ⧋āĻ— āĻ•āϰāĻžāϰ āĻ•ā§āώāĻŽāϤāĻž (āϝāĻž āĻ…āĻ¤ā§āϝāĻ¨ā§āϤ āϏāĻšāϜ)āĨ¤

āφāĻŽāϰāĻž āĻāχ āĻŽāϤ Apache Airflow āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāĻŋ:

  • āφāĻŽāϰāĻž āĻŦāĻŋāĻ­āĻŋāĻ¨ā§āύ āĻ‰ā§ŽāϏ āĻĨ⧇āϕ⧇ āĻĄā§‡āϟāĻž āϏāĻ‚āĻ—ā§āϰāĻš āĻ•āϰāĻŋ (āĻ…āύ⧇āĻ• SQL āϏāĻžāĻ°ā§āĻ­āĻžāϰ āĻāĻŦāĻ‚ PostgreSQL āωāĻĻāĻžāĻšāϰāĻŖ, āĻ…ā§āϝāĻžāĻĒā§āϞāĻŋāϕ⧇āĻļāύ āĻŽā§‡āĻŸā§āϰāĻŋāĻ•ā§āϏ āϏāĻš āĻŦāĻŋāĻ­āĻŋāĻ¨ā§āύ API, āĻāĻŽāύāĻ•āĻŋ 1C) DWH āĻāĻŦāĻ‚ ODS (āφāĻŽāĻžāĻĻ⧇āϰ Vertica āĻāĻŦāĻ‚ Clickhouse āφāϛ⧇)āĨ¤
  • āĻ•āϤ āωāĻ¨ā§āύāϤ cron, āϝāĻž āĻ“āĻĄāĻŋāĻāϏ-āĻ āĻĄā§‡āϟāĻž āĻāĻ•āĻ¤ā§āϰ⧀āĻ•āϰāĻŖ āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻž āĻļ⧁āϰ⧁ āĻ•āϰ⧇ āĻāĻŦāĻ‚ āϤāĻžāĻĻ⧇āϰ āϰāĻ•ā§āώāĻŖāĻžāĻŦ⧇āĻ•ā§āώāĻŖāĻ“ āĻĒāĻ°ā§āϝāĻŦ⧇āĻ•ā§āώāĻŖ āĻ•āϰ⧇āĨ¤

āϏāĻŽā§āĻĒā§āϰāϤāĻŋ āĻ…āĻŦāϧāĻŋ, āφāĻŽāĻžāĻĻ⧇āϰ āϚāĻžāĻšāĻŋāĻĻāĻžāϗ⧁āϞāĻŋ 32 āϕ⧋āϰ āĻāĻŦāĻ‚ 50 GB RAM āϏāĻš āĻāĻ•āϟāĻŋ āϛ⧋āϟ āϏāĻžāĻ°ā§āĻ­āĻžāϰ āĻĻā§āĻŦāĻžāϰāĻž āφāĻšā§āĻ›āĻžāĻĻāĻŋāϤ āĻ›āĻŋāϞ⧎ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋āϤ⧇, āĻāϟāĻŋ āĻ•āĻžāϜ āĻ•āϰ⧇:

  • āĻ…āϧāĻŋāĻ• 200 āĻĄā§āϝāĻžāĻ— (āφāϏāϞ⧇ āĻ“āϝāĻŧāĻžāĻ°ā§āĻ•āĻĢā§āϞ⧋, āϝ⧇āĻ–āĻžāύ⧇ āφāĻŽāϰāĻž āĻ•āĻžāϜāϗ⧁āϞāĻŋ āĻ¸ā§āϟāĻžāĻĢ āĻ•āϰ⧇āĻ›āĻŋ)
  • āĻ—āĻĄāĻŧ⧇ āĻĒā§āϰāϤāĻŋāϟāĻŋāϤ⧇ 70āϟāĻŋ āĻ•āĻžāϜ,
  • āĻāχ āϧāĻžāĻ°ā§āĻŽāĻŋāĻ•āϤāĻž āĻļ⧁āϰ⧁ āĻšāϝāĻŧ (āĻ—āĻĄāĻŧ⧇āĻ“) āϘāĻ¨ā§āϟāĻžāϝāĻŧ āĻāĻ•āĻŦāĻžāϰ.

āĻāĻŦāĻ‚ āφāĻŽāϰāĻž āϕ⧀āĻ­āĻžāĻŦ⧇ āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻ•āϰ⧇āĻ›āĻŋ āϏ⧇ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇, āφāĻŽāĻŋ āύ⧀āĻšā§‡ āϞāĻŋāĻ–āĻŦ, āĻ•āĻŋāĻ¨ā§āϤ⧁ āĻāĻ–āύ āφāϏ⧁āύ Ãŧber-āϏāĻŽāĻ¸ā§āϝāĻžāϟāĻŋ āϏāĻ‚āĻœā§āĻžāĻžāϝāĻŧāĻŋāϤ āĻ•āϰāĻŋ āϝāĻž āφāĻŽāϰāĻž āϏāĻŽāĻžāϧāĻžāύ āĻ•āϰāĻŦ:

āϤāĻŋāύāϟāĻŋ āĻ‰ā§ŽāϏ āĻāϏāĻ•āĻŋāωāĻāϞ āϏāĻžāĻ°ā§āĻ­āĻžāϰ āϰāϝāĻŧ⧇āϛ⧇, āĻĒā§āϰāϤāĻŋāϟāĻŋāϤ⧇ 50āϟāĻŋ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ āϰāϝāĻŧ⧇āϛ⧇ - āϝāĻĨāĻžāĻ•ā§āϰāĻŽā§‡ āĻāĻ•āϟāĻŋ āĻĒā§āϰāĻ•āĻ˛ā§āĻĒ⧇āϰ āωāĻĻāĻžāĻšāϰāĻŖ, āϤāĻžāĻĻ⧇āϰ āĻāĻ•āχ āĻ•āĻžāĻ āĻžāĻŽā§‹ āϰāϝāĻŧ⧇āϛ⧇ (āĻĒā§āϰāĻžāϝāĻŧ āϏāĻ°ā§āĻŦāĻ¤ā§āϰ, āĻŽā§āφ-āĻšāĻž-āĻšāĻž), āϝāĻžāϰ āĻ…āĻ°ā§āĻĨ āĻĒā§āϰāϤāĻŋāϟāĻŋāϤ⧇ āĻāĻ•āϟāĻŋ āĻ…āĻ°ā§āĻĄāĻžāϰ āĻŸā§‡āĻŦāĻŋāϞ āϰāϝāĻŧ⧇āϛ⧇ (āϏ⧌āĻ­āĻžāĻ—ā§āϝāĻ•ā§āϰāĻŽā§‡, āĻāϟāĻŋ āϏāĻš āĻāĻ•āϟāĻŋ āĻŸā§‡āĻŦāĻŋāϞ āύāĻžāĻŽ āϝ⧇āϕ⧋āύ āĻŦā§āϝāĻŦāϏāĻžāϝāĻŧ āĻĒ⧁āĻļ āĻ•āϰāĻž āϝ⧇āϤ⧇ āĻĒāĻžāϰ⧇)āĨ¤ āφāĻŽāϰāĻž āĻĒāϰāĻŋāώ⧇āĻŦāĻžāϰ āĻ•ā§āώ⧇āĻ¤ā§āϰāϗ⧁āϞāĻŋ (āϏ⧋āĻ°ā§āϏ āϏāĻžāĻ°ā§āĻ­āĻžāϰ, āϏ⧋āĻ°ā§āϏ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ, ETL āϟāĻžāĻ¸ā§āĻ• āφāχāĻĄāĻŋ) āϝ⧋āĻ— āĻ•āϰ⧇ āĻĄā§‡āϟāĻž āύāĻŋāϝāĻŧ⧇ āĻĨāĻžāĻ•āĻŋ āĻāĻŦāĻ‚ āϏāĻšāĻœā§‡ āϏ⧇āϗ⧁āϞāĻŋāϕ⧇ āĻ­āĻžāĻ°ā§āϟāĻŋāĻ•āĻžāϰ āĻŽāĻ§ā§āϝ⧇ āĻĢ⧇āϞ⧇ āĻĻāĻŋāχāĨ¤

āϚāϞ āϝāĻžāχ!

āĻĒā§āϰāϧāĻžāύ āĻ…āĻ‚āĻļ, āĻŦā§āϝāĻŦāĻšāĻžāϰāĻŋāĻ• (āĻāĻŦāĻ‚ āĻāĻ•āϟ⧁ āϤāĻžāĻ¤ā§āĻ¤ā§āĻŦāĻŋāĻ•)

āϕ⧇āύ āφāĻŽāϰāĻž (āĻāĻŦāĻ‚ āφāĻĒāύāĻŋ)

āϝāĻ–āύ āĻ—āĻžāĻ›āϗ⧁āϞ⧋ āĻŦāĻĄāĻŧ āĻ›āĻŋāϞ āφāϰ āφāĻŽāĻŋ āϏāϰāϞ āĻ›āĻŋāϞāĻžāĻŽ SQL-āĻāĻ•āϟāĻŋ āϰāĻžāĻļāĻŋāϝāĻŧāĻžāύ āϖ⧁āϚāϰ⧋āϤ⧇, āφāĻŽāϰāĻž āφāĻŽāĻžāĻĻ⧇āϰ āĻ•āĻžāϛ⧇ āωāĻĒāϞāĻŦā§āϧ āĻĻ⧁āϟāĻŋ āϏāϰāĻžā§āϜāĻžāĻŽ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇ ETL āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻž āĻ“āϰāĻĢ⧇ āĻĄā§‡āϟāĻž āĻĒā§āϰāĻŦāĻžāĻšāϕ⧇ āĻ¸ā§āĻ•ā§āϝāĻžāĻŽ āĻ•āϰ⧇āĻ›āĻŋ:

  • āχāύāĻĢāϰāĻŽā§‡āϟāĻŋāĻ•āĻž ​​āĻĒāĻžāĻ“āϝāĻŧāĻžāϰ āϏ⧇āĻ¨ā§āϟāĻžāϰ - āĻāĻ•āϟāĻŋ āĻ…āĻ¤ā§āϝāĻ¨ā§āϤ āĻ›āĻĄāĻŧāĻŋāϝāĻŧ⧇ āĻĒāĻĄāĻŧāĻž āϏāĻŋāĻ¸ā§āĻŸā§‡āĻŽ, āĻ…āĻ¤ā§āϝāĻ¨ā§āϤ āωāĻ¤ā§āĻĒāĻžāĻĻāύāĻļā§€āϞ, āύāĻŋāϜāĻ¸ā§āĻŦ āĻšāĻžāĻ°ā§āĻĄāĻ“āϝāĻŧā§āϝāĻžāϰ āϏāĻš, āĻāϰ āύāĻŋāϜāĻ¸ā§āĻŦ āϏāĻ‚āĻ¸ā§āĻ•āϰāĻŖāĨ¤ āφāĻŽāĻŋ āĻāϰ āĻ•ā§āώāĻŽāϤāĻžāϰ 1% āψāĻļā§āĻŦāϰ āύāĻŋāώ⧇āϧ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇āĻ›āĻŋāĨ¤ āϕ⧇āύ? āĻ āĻŋāĻ• āφāϛ⧇, āĻĒā§āϰāĻĨāĻŽāϤ, āĻāχ āχāĻ¨ā§āϟāĻžāϰāĻĢ⧇āϏāϟāĻŋ 380 āĻāϰ āĻĻāĻļāϕ⧇āϰ āϕ⧋āĻĨāĻžāĻ“ āĻŽāĻžāύāϏāĻŋāĻ•āĻ­āĻžāĻŦ⧇ āφāĻŽāĻžāĻĻ⧇āϰ āωāĻĒāϰ āϚāĻžāĻĒ āϏ⧃āĻˇā§āϟāĻŋ āĻ•āϰ⧇āϛ⧇āĨ¤ āĻĻā§āĻŦāĻŋāϤ⧀āϝāĻŧāϤ, āĻāχ āĻ•āύāĻŸā§āϰāĻžāĻĒāĻļāύāϟāĻŋ āĻ…āĻ¤ā§āϝāĻ¨ā§āϤ āĻ…āĻ­āĻŋāύāĻŦ āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻž, āωāĻ—ā§āϰ āωāĻĒāĻžāĻĻāĻžāύ āĻĒ⧁āύāσāĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāĻŦāĻ‚ āĻ…āĻ¨ā§āϝāĻžāĻ¨ā§āϝ āĻ…āĻ¤ā§āϝāĻ¨ā§āϤ-āϗ⧁āϰ⧁āĻ¤ā§āĻŦāĻĒā§‚āĻ°ā§āĻŖ-āĻāĻ¨ā§āϟāĻžāϰāĻĒā§āϰāĻžāχāϜ-āĻ•ā§ŒāĻļāϞ⧇āϰ āϜāĻ¨ā§āϝ āĻĄāĻŋāϜāĻžāχāύ āĻ•āϰāĻž āĻšāϝāĻŧ⧇āϛ⧇āĨ¤ āĻāϝāĻŧāĻžāϰāĻŦāĻžāϏ AXNUMX / āĻŦāĻ›āϰ⧇āϰ āωāχāĻ‚ āĻāϰ āĻŽāϤ āĻāϟāĻŋāϰ āĻĻāĻžāĻŽ āϕ⧀ āϤāĻž āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āφāĻŽāϰāĻž āĻ•āĻŋāϛ⧁ āĻŦāϞāĻŦ āύāĻžāĨ¤

    āϏāĻžāĻŦāϧāĻžāύ, āĻāĻ•āϟāĻŋ āĻ¸ā§āĻ•ā§āϰāĻŋāύāĻļāϟ 30 āĻŦāĻ›āϰ⧇āϰ āĻ•āĻŽ āĻŦāϝāĻŧāϏ⧀ āϞ⧋āϕ⧇āĻĻ⧇āϰ āĻ•āĻŋāϛ⧁āϟāĻž āĻ•ā§āώāϤāĻŋ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇

    āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

  • SQL āϏāĻžāĻ°ā§āĻ­āĻžāϰ āχāĻ¨ā§āϟāĻŋāĻ—ā§āϰ⧇āĻļāύ āϏāĻžāĻ°ā§āĻ­āĻžāϰ - āφāĻŽāϰāĻž āφāĻŽāĻžāĻĻ⧇āϰ āφāĻ¨ā§āϤāσāĻĒā§āϰāĻ•āĻ˛ā§āĻĒ āĻĒā§āϰāĻŦāĻžāĻšā§‡ āĻāχ āĻ•āĻŽāϰ⧇āĻĄ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇āĻ›āĻŋāĨ¤ āĻ āĻŋāĻ• āφāϛ⧇, āφāϏāϞ⧇: āφāĻŽāϰāĻž āχāϤāĻŋāĻŽāĻ§ā§āϝ⧇āχ SQL āϏāĻžāĻ°ā§āĻ­āĻžāϰ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāĻŋ, āĻāĻŦāĻ‚ āĻāϟāĻŋāϰ ETL āϏāϰāĻžā§āϜāĻžāĻŽāϗ⧁āϞāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āύāĻž āĻ•āϰāĻž āĻāĻ•āϰāĻ•āĻŽ āĻ…āϝ⧌āĻ•ā§āϤāĻŋāĻ• āĻšāĻŦ⧇āĨ¤ āĻāϰ āĻŽāĻ§ā§āϝ⧇ āϏāĻŦāĻ•āĻŋāϛ⧁āχ āĻ­āĻžāϞ⧋: āχāĻ¨ā§āϟāĻžāϰāĻĢ⧇āϏ āĻĻ⧁āϟāĻŋāχ āϏ⧁āĻ¨ā§āĻĻāϰ, āĻāĻŦāĻ‚ āĻ…āĻ—ā§āϰāĻ—āϤāĻŋ āĻĒā§āϰāϤāĻŋāĻŦ⧇āĻĻāύ... āĻ•āĻŋāĻ¨ā§āϤ⧁ āĻāχ āĻ•āĻžāϰāϪ⧇āχ āφāĻŽāϰāĻž āϏāĻĢā§āϟāĻ“āϝāĻŧā§āϝāĻžāϰ āĻĒāĻŖā§āϝ āĻĒāĻ›āĻ¨ā§āĻĻ āĻ•āϰāĻŋ āύāĻž, āĻ“āĻš, āĻāϰ āϜāĻ¨ā§āϝ āύāϝāĻŧāĨ¤ āĻāϟāĻž āϏāĻ‚āĻ¸ā§āĻ•āϰāĻŖ dtsx (āϝ⧇āϟāĻŋ XML āύ⧋āĻĄāϗ⧁āϞāĻŋāϕ⧇ āϏ⧇āĻ­ āĻ•āϰāĻžāϰ āϏāĻŽāϝāĻŧ āĻāϞ⧋āĻŽā§‡āϞ⧋ āĻ•āϰ⧇ āĻĻ⧇āĻ“āϝāĻŧāĻž āĻšāϝāĻŧ) āφāĻŽāϰāĻž āĻĒāĻžāϰāĻŋ, āĻ•āĻŋāĻ¨ā§āϤ⧁ āϕ⧀ āϞāĻžāĻ­? āϕ⧀āĻ­āĻžāĻŦ⧇ āĻāĻ•āϟāĻŋ āϟāĻžāĻ¸ā§āĻ• āĻĒā§āϝāĻžāϕ⧇āϜ āϤ⧈āϰāĻŋ āĻ•āϰāĻŦ⧇āύ āϝāĻž āĻāĻ• āϏāĻžāĻ°ā§āĻ­āĻžāϰ āĻĨ⧇āϕ⧇ āĻ…āĻ¨ā§āϝ āϏāĻžāĻ°ā§āĻ­āĻžāϰ⧇ āĻļāϤ āĻļāϤ āĻŸā§‡āĻŦāĻŋāϞ āĻŸā§‡āύ⧇ āφāύāĻŦ⧇? āĻšā§āϝāĻžāρ, āĻ•āĻŋ āĻāĻ•āĻļ, āφāĻĒāύāĻžāϰ āϤāĻ°ā§āϜāύ⧀ āĻŦāĻŋāĻļ āϟ⧁āĻ•āϰāĻž āĻĨ⧇āϕ⧇ āĻĒāĻĄāĻŧ⧇ āϝāĻžāĻŦ⧇, āĻŽāĻžāωāϏ āĻŦā§‹āϤāĻžāĻŽā§‡ āĻ•ā§āϞāĻŋāĻ• āĻ•āϰ⧁āύāĨ¤ āϤāĻŦ⧇ āĻāϟāĻŋ āĻ…āĻŦāĻļā§āϝāχ āφāϰāĻ“ āĻĢā§āϝāĻžāĻļāύ⧇āĻŦāϞ āĻĻ⧇āĻ–āĻžāϝāĻŧ:

    āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āφāĻŽāϰāĻž āĻ…āĻŦāĻļā§āϝāχ āωāĻĒāĻžāϝāĻŧ āϖ⧁āρāϜāĻ›āĻŋ. āĻŽāĻžāĻŽāϞāĻž āĻāĻŽāύāĻ•āĻŋ āĻĒā§āϰāĻžāϝāĻŧ āĻāĻ•āϟāĻŋ āĻ¸ā§āĻŦ-āϞāĻŋāĻ–āĻŋāϤ SSIS āĻĒā§āϝāĻžāϕ⧇āϜ āĻœā§‡āύāĻžāϰ⧇āϟāϰ⧇ āĻāϏ⧇āϛ⧇ ...

â€ĻāĻāĻŦāĻ‚ āϤāĻžāϰāĻĒāϰ āĻāĻ•āϟāĻŋ āύāϤ⧁āύ āϚāĻžāĻ•āϰāĻŋ āφāĻŽāĻžāϕ⧇ āϖ⧁āρāĻœā§‡ āĻĒ⧇āϝāĻŧ⧇āĻ›āĻŋāϞāĨ¤ āĻāĻŦāĻ‚ āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻāϟāĻŋāϤ⧇ āφāĻŽāĻžāϕ⧇ āĻ›āĻžāĻĄāĻŧāĻŋāϝāĻŧ⧇ āϗ⧇āϛ⧇āĨ¤

āϝāĻ–āύ āφāĻŽāĻŋ āϜāĻžāύāϤ⧇ āĻĒāĻžāϰāϞāĻžāĻŽ āϝ⧇ ETL āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻžāϰ āĻŦāĻŋāĻŦāϰāĻŖāϗ⧁āϞāĻŋ āĻšāϞ āϏāĻžāϧāĻžāϰāĻŖ āĻĒāĻžāχāĻĨāύ āϕ⧋āĻĄ, āϤāĻ–āύ āφāĻŽāĻŋ āφāύāĻ¨ā§āĻĻ⧇āϰ āϜāĻ¨ā§āϝ āύāĻžāϚ āĻ•āϰāĻŋāύāĻŋāĨ¤ āĻāĻ­āĻžāĻŦ⧇āχ āĻĄā§‡āϟāĻž āĻ¸ā§āĻŸā§āϰāĻŋāĻŽāϗ⧁āϞāĻŋāϕ⧇ āϏāĻ‚āĻ¸ā§āĻ•āϰāĻŖ āĻ•āϰāĻž āĻšāϝāĻŧ⧇āĻ›āĻŋāϞ āĻāĻŦāĻ‚ āφāϞāĻžāĻĻāĻž āĻ•āϰāĻž āĻšāϝāĻŧ⧇āĻ›āĻŋāϞ āĻāĻŦāĻ‚ āĻļāϤ āĻļāϤ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ āĻĨ⧇āϕ⧇ āĻāĻ•āϟāĻŋ āϟāĻžāĻ°ā§āϗ⧇āĻŸā§‡ āĻāĻ•āĻ• āĻ•āĻžāĻ āĻžāĻŽā§‹āϰ āϏāĻžāĻĨ⧇ āĻŸā§‡āĻŦāĻŋāϞ āĻĸ⧇āϞ⧇ āĻĻ⧇āĻ“āϝāĻŧāĻž āĻĻ⧇āĻĄāĻŧ āĻŦāĻž āĻĻ⧁āχ 13” āĻ¸ā§āĻ•ā§āϰāĻŋāύ⧇ āĻĒāĻžāχāĻĨāύ āϕ⧋āĻĄā§‡āϰ āĻŦāĻŋāώāϝāĻŧ āĻšāϝāĻŧ⧇ āĻĻāĻžāρāĻĄāĻŧāĻŋāϝāĻŧ⧇āĻ›āĻŋāϞāĨ¤

āĻ•ā§āϞāĻžāĻ¸ā§āϟāĻžāϰ āĻāĻ•āĻ¤ā§āϰāĻŋāϤ āĻ•āϰāĻž

āφāϏ⧁āύ āĻāĻ•āϟāĻŋ āϏāĻŽā§āĻĒā§‚āĻ°ā§āĻŖ āĻ•āĻŋāĻ¨ā§āĻĄāĻžāϰāĻ—āĻžāĻ°ā§āĻŸā§‡āύ⧇āϰ āĻŦā§āϝāĻŦāĻ¸ā§āĻĨāĻž āĻ•āϰāĻŋ āύāĻž, āĻāĻŦāĻ‚ āĻāĻ–āĻžāύ⧇ āϏāĻŽā§āĻĒā§‚āĻ°ā§āĻŖ āϏ⧁āĻ¸ā§āĻĒāĻˇā§āϟ āϜāĻŋāύāĻŋāϏāϗ⧁āϞāĻŋ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āĻ•āĻĨāĻž āĻŦāϞāĻŋ āύāĻž, āϝ⧇āĻŽāύ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āχāύāĻ¸ā§āϟāϞ āĻ•āϰāĻž, āφāĻĒāύāĻžāϰ āύāĻŋāĻ°ā§āĻŦāĻžāϚāĻŋāϤ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ, āϏ⧇āϞāĻžāϰāĻŋ āĻāĻŦāĻ‚ āĻĄāĻ•āϗ⧁āϞāĻŋāϤ⧇ āĻŦāĻ°ā§āĻŖāĻŋāϤ āĻ…āĻ¨ā§āϝāĻžāĻ¨ā§āϝ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇āĨ¤

āϝāĻžāϤ⧇ āφāĻŽāϰāĻž āĻ…āĻŦāĻŋāϞāĻŽā§āĻŦ⧇ āĻĒāϰ⧀āĻ•ā§āώāĻž āĻļ⧁āϰ⧁ āĻ•āϰāϤ⧇ āĻĒāĻžāϰāĻŋ, āφāĻŽāĻŋ āĻ¸ā§āϕ⧇āϚ āĻ•āϰ⧇āĻ›āĻŋ docker-compose.yml āϝāĻž:

  • āĻāϰ āφāϏāϞ⧇ āĻŦāĻžāĻĄāĻŧāĻžāϤ⧇ āϝāĻžāĻ• āĻŦāĻžāϤāĻžāϏ⧇āϰ āĻĒā§āϰāĻŦāĻžāĻš: āĻļāĻŋāĻĄāĻŋāωāϞāĻžāϰ, āĻ“āϝāĻŧ⧇āĻŦ āϏāĻžāĻ°ā§āĻ­āĻžāϰāĨ¤ āϏ⧇āϞāĻžāϰāĻŋ āĻ•āĻžāϜāϗ⧁āϞāĻŋ āύāĻŋāϰ⧀āĻ•ā§āώāϪ⧇āϰ āϜāĻ¨ā§āϝ āĻĢ⧁āϞ āϏ⧇āĻ–āĻžāύ⧇ āϘ⧁āϰāĻŦ⧇ (āĻ•āĻžāϰāĻŖ āĻāϟāĻŋ āχāϤāĻŋāĻŽāĻ§ā§āϝ⧇āχ āϠ⧇āϞ⧇ āĻĻ⧇āĻ“āϝāĻŧāĻž āĻšāϝāĻŧ⧇āϛ⧇ apache/airflow:1.10.10-python3.7āĻ•āĻŋāĻ¨ā§āϤ⧁ āφāĻŽāϰāĻž āĻ•āĻŋāϛ⧁ āĻŽāύ⧇ āĻ•āϰāĻŋ āύāĻž)
  • āĻĒā§‹āĻ¸ā§āϟāĻ—ā§āϰāĻŋ, āϝāĻžāϤ⧇ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āϤāĻžāϰ āĻĒāϰāĻŋāώ⧇āĻŦāĻžāϰ āϤāĻĨā§āϝ āϞāĻŋāĻ–āĻŦ⧇ (āĻļāĻŋāĻĄāĻŋāωāϞāĻžāϰ āĻĄā§‡āϟāĻž, āĻāĻ•ā§āϏāĻŋāĻ•āĻŋāωāĻļāύ āĻĒāϰāĻŋāϏāĻ‚āĻ–ā§āϝāĻžāύ, āχāĻ¤ā§āϝāĻžāĻĻāĻŋ), āĻāĻŦāĻ‚ āϏ⧇āϞāĻžāϰāĻŋ āϏāĻŽā§āĻĒā§‚āĻ°ā§āĻŖ āĻ•āĻžāϜāϗ⧁āϞāĻŋ āϚāĻŋāĻšā§āύāĻŋāϤ āĻ•āϰāĻŦ⧇;
  • Redis, āϝāĻž āϏ⧇āϞāĻžāϰāĻŋāϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āϟāĻžāĻ¸ā§āĻ• āĻŦā§āϰ⧋āĻ•āĻžāϰ āĻšāĻŋāϏāĻžāĻŦ⧇ āĻ•āĻžāϜ āĻ•āϰāĻŦ⧇;
  • āϏ⧇āϞāĻžāϰāĻŋ āĻ•āĻ°ā§āĻŽā§€, āϝāĻž āϏāϰāĻžāϏāϰāĻŋ āĻ•āĻžāĻ°ā§āϝ āϏāĻŽā§āĻĒāĻžāĻĻāύ⧇ āύāĻŋāϝ⧁āĻ•ā§āϤ āĻĨāĻžāĻ•āĻŦ⧇āĨ¤
  • āĻĢā§‹āĻ˛ā§āĻĄāĻžāϰ⧇ ./dags āφāĻŽāϰāĻž āφāĻŽāĻžāĻĻ⧇āϰ āĻĢāĻžāχāϞāϗ⧁āϞāĻŋāϕ⧇ āĻĄā§āϝāĻžāĻ—āϗ⧁āϞāĻŋāϰ āĻŦāĻŋāĻŦāϰāϪ⧇āϰ āϏāĻžāĻĨ⧇ āϝ⧁āĻ•ā§āϤ āĻ•āϰāĻŦāĨ¤ āĻāϗ⧁āϞāĻŋ āωāĻĄāĻŧ⧇ āϝāĻžāĻ“āϝāĻŧāĻžāϰ āϏāĻŽāϝāĻŧ āϤ⧁āϞ⧇ āύ⧇āĻ“āϝāĻŧāĻž āĻšāĻŦ⧇, āϤāĻžāχ āĻĒā§āϰāϤāĻŋāϟāĻŋ āĻšāĻžāρāϚāĻŋāϰ āĻĒāϰ⧇ āĻĒ⧁āϰ⧋ āĻ¸ā§āĻŸā§āϝāĻžāĻ•āϟāĻŋ āĻ˜ā§‹āϰāĻžāϘ⧁āϰāĻŋ āĻ•āϰāĻžāϰ āĻĻāϰāĻ•āĻžāϰ āύ⧇āχāĨ¤

āĻ•āĻŋāϛ⧁ āϜāĻžāϝāĻŧāĻ—āĻžāϝāĻŧ, āωāĻĻāĻžāĻšāϰāĻŖāϗ⧁āϞāĻŋāϰ āϕ⧋āĻĄāϟāĻŋ āϏāĻŽā§āĻĒā§‚āĻ°ā§āĻŖāϰ⧂āĻĒ⧇ āĻĻ⧇āĻ–āĻžāύ⧋ āĻšāϝāĻŧ āύāĻž (āϝāĻžāϤ⧇ āĻĒāĻžāĻ ā§āϝāϟāĻŋ āĻŦāĻŋāĻļ⧃āĻ™ā§āĻ–āϞ āύāĻž āĻšāϝāĻŧ), āϤāĻŦ⧇ āϕ⧋āĻĨāĻžāĻ“ āĻāϟāĻŋ āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻžāϝāĻŧ āĻĒāϰāĻŋāĻŦāĻ°ā§āϤāύ āĻ•āϰāĻž āĻšāϝāĻŧāĨ¤ āϏāĻŽā§āĻĒā§‚āĻ°ā§āĻŖ āĻ•āĻžāĻœā§‡āϰ āϕ⧋āĻĄ āωāĻĻāĻžāĻšāϰāĻŖ āϏāĻ‚āĻ—ā§āϰāĻšāĻ¸ā§āĻĨāϞ āĻĒāĻžāĻ“āϝāĻŧāĻž āϝāĻžāĻŦ⧇ https://github.com/dm-logv/airflow-tutorial.

Docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

āĻŽāĻ¨ā§āϤāĻŦā§āϝ āϏāĻŽā§‚āĻš:

  • āϰāϚāύāĻžāϟāĻŋāϰ āϏāĻŽāĻžāĻŦ⧇āĻļ⧇, āφāĻŽāĻŋ āĻŽā§‚āϞāϤ āϏ⧁āĻĒāϰāĻŋāϚāĻŋāϤ āϚāĻŋāĻ¤ā§āϰ⧇āϰ āωāĻĒāϰ āύāĻŋāĻ°ā§āĻ­āϰ āĻ•āϰ⧇āĻ›āĻŋāϞāĻžāĻŽ āĻĒāĻžāϕ⧇āϞ/āĻĄāĻ•āĻžāϰ-āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ - āĻāϟāĻŋ āĻšā§‡āĻ• āφāωāϟ āĻ•āϰāϤ⧇ āϭ⧁āϞāĻŦ⧇āύ āύāĻž. āĻšāϝāĻŧāϤ⧋ āϤ⧋āĻŽāĻžāϰ āĻœā§€āĻŦāύ⧇ āφāϰ āĻ•āĻŋāϛ⧁āϰ āĻĻāϰāĻ•āĻžāϰ āύ⧇āχāĨ¤
  • āϏāĻŽāĻ¸ā§āϤ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āϏ⧇āϟāĻŋāĻ‚āϏ āύāĻž āĻļ⧁āϧ⧁āĻŽāĻžāĻ¤ā§āϰ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡ āωāĻĒāϞāĻŦā§āϧ airflow.cfg, āĻ•āĻŋāĻ¨ā§āϤ⧁ āĻĒāϰāĻŋāĻŦ⧇āĻļ āϭ⧇āϰāĻŋāϝāĻŧ⧇āĻŦāϞ⧇āϰ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡āĻ“ (āĻĄā§‡āϭ⧇āϞāĻĒāĻžāϰāĻĻ⧇āϰ āϧāĻ¨ā§āϝāĻŦāĻžāĻĻ), āϝāĻž āφāĻŽāĻŋ āĻĻā§‚āώāĻŋāϤāĻ­āĻžāĻŦ⧇ āϏ⧁āĻŦāĻŋāϧāĻž āύāĻŋāϝāĻŧ⧇āĻ›āĻŋāĨ¤
  • āĻ¸ā§āĻŦāĻžāĻ­āĻžāĻŦāĻŋāĻ•āĻ­āĻžāĻŦ⧇āχ, āĻāϟāĻŋ āωāĻ¤ā§āĻĒāĻžāĻĻāύ-āĻĒā§āϰāĻ¸ā§āϤ⧁āϤ āύāϝāĻŧ: āφāĻŽāĻŋ āχāĻšā§āĻ›āĻžāĻ•ā§ƒāϤāĻ­āĻžāĻŦ⧇ āĻĒāĻžāĻ¤ā§āϰ⧇ āĻšāĻžāĻ°ā§āϟāĻŦāĻŋāϟ āϰāĻžāĻ–āĻŋāύāĻŋ, āφāĻŽāĻŋ āύāĻŋāϰāĻžāĻĒāĻ¤ā§āϤāĻž āύāĻŋāϝāĻŧ⧇ āĻŽāĻžāĻĨāĻž āϘāĻžāĻŽāĻžāχāύāĻŋāĨ¤ āĻ•āĻŋāĻ¨ā§āϤ⧁ āφāĻŽāĻŋ āφāĻŽāĻžāĻĻ⧇āϰ āĻĒāϰ⧀āĻ•ā§āώāĻžāĻ•āĻžāϰ⧀āĻĻ⧇āϰ āϜāĻ¨ā§āϝ āĻ¨ā§āϝ⧂āύāϤāĻŽ āωāĻĒāϝ⧁āĻ•ā§āϤ āĻ•āϰ⧇āĻ›āĻŋāĨ¤
  • āĻŽāύ⧇ āϰāĻžāĻ–āĻŦ⧇āύ āϝ⧇:
    • āĻĄā§‡āĻ— āĻĢā§‹āĻ˛ā§āĻĄāĻžāϰāϟāĻŋ āĻļāĻŋāĻĄāĻŋāωāϞāĻ•āĻžāϰ⧀ āĻāĻŦāĻ‚ āĻ•āĻ°ā§āĻŽā§€āĻĻ⧇āϰ āωāĻ­āϝāĻŧ⧇āϰ āĻ•āĻžāϛ⧇āχ āĻ…ā§āϝāĻžāĻ•ā§āϏ⧇āϏāϝ⧋āĻ—ā§āϝ āĻšāϤ⧇ āĻšāĻŦ⧇āĨ¤
    • āĻāĻ•āχ āϏāĻŽāĻ¸ā§āϤ āϤ⧃āϤ⧀āϝāĻŧ āĻĒāĻ•ā§āώ⧇āϰ āϞāĻžāχāĻŦā§āϰ⧇āϰāĻŋāϰ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇ āĻĒā§āϰāϝ⧋āĻœā§āϝ - āϏ⧇āϗ⧁āϞāĻŋ āĻ…āĻŦāĻļā§āϝāχ āĻāĻ•āϟāĻŋ āĻļāĻŋāĻĄāĻŋāωāϞ āĻāĻŦāĻ‚ āĻ•āĻ°ā§āĻŽā§€āĻĻ⧇āϰ āϏāĻžāĻĨ⧇ āĻŽā§‡āĻļāĻŋāύ⧇ āχāύāĻ¸ā§āϟāϞ āĻ•āϰāĻž āωāϚāĻŋāϤāĨ¤

āφāĻšā§āĻ›āĻž, āĻāĻ–āύ āĻāϟāĻž āϏāĻšāϜ:

$ docker-compose up --scale worker=3

āϏāĻŦāĻ•āĻŋāϛ⧁ āωāĻ āĻžāϰ āĻĒāϰ⧇, āφāĻĒāύāĻŋ āĻ“āϝāĻŧ⧇āĻŦ āχāĻ¨ā§āϟāĻžāϰāĻĢ⧇āϏāϗ⧁āϞāĻŋ āĻĻ⧇āĻ–āϤ⧇ āĻĒāĻžāϰ⧇āύ:

āĻŽā§ŒāϞāĻŋāĻ• āϧāĻžāϰāĻŖāĻž

āφāĻĒāύāĻŋ āϝāĻĻāĻŋ āĻāχ āϏāĻŽāĻ¸ā§āϤ "āĻĄā§āϝāĻžāĻ—āϏ" āĻāϰ āĻŽāĻ§ā§āϝ⧇ āĻ•āĻŋāϛ⧁ āύāĻž āĻŦ⧁āĻā§‡ āĻĨāĻžāϕ⧇āύ āϤāĻŦ⧇ āĻāĻ–āĻžāύ⧇ āĻāĻ•āϟāĻŋ āϏāĻ‚āĻ•ā§āώāĻŋāĻĒā§āϤ āĻ…āĻ­āĻŋāϧāĻžāύ āϰāϝāĻŧ⧇āϛ⧇:

  • āύāĻŋāĻ°ā§āϧāĻžāϰāĻŖāĻ•āĻžāϰ⧀ - āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋āϤ⧇ āϏāĻŦāĻšā§‡āϝāĻŧ⧇ āϗ⧁āϰ⧁āĻ¤ā§āĻŦāĻĒā§‚āĻ°ā§āĻŖ āϚāĻžāϚāĻž, āϝāĻŋāύāĻŋ āύāĻŋāϝāĻŧāĻ¨ā§āĻ¤ā§āϰāĻŖ āĻ•āϰ⧇āύ āϝ⧇ āϰ⧋āĻŦāϟ āĻ•āĻ ā§‹āϰ āĻĒāϰāĻŋāĻļā§āϰāĻŽ āĻ•āϰ⧇, āĻāĻ•āϜāύ āĻŦā§āϝāĻ•ā§āϤāĻŋ āύāϝāĻŧ: āϏāĻŽāϝāĻŧāϏ⧂āĻšā§€ āύāĻŋāϰ⧀āĻ•ā§āώāĻŖ āĻ•āϰ⧇, āĻĄā§‡āĻ— āφāĻĒāĻĄā§‡āϟ āĻ•āϰ⧇, āĻ•āĻžāϜāϗ⧁āϞāĻŋ āϚāĻžāϞ⧁ āĻ•āϰ⧇āĨ¤

    āϏāĻžāϧāĻžāϰāĻŖāĻ­āĻžāĻŦ⧇, āĻĒ⧁āϰāĻžāύ⧋ āϏāĻ‚āĻ¸ā§āĻ•āϰāĻŖāϗ⧁āϞāĻŋāϤ⧇, āϤāĻžāϰ āĻ¸ā§āĻŽā§ƒāϤāĻŋāϤ⧇ āϏāĻŽāĻ¸ā§āϝāĻž āĻ›āĻŋāϞ (āύāĻž, āĻ…ā§āϝāĻžāĻŽāύ⧇āϏāĻŋāϝāĻŧāĻž āύāϝāĻŧ, āϤāĻŦ⧇ āĻĢāĻžāρāϏ) āĻāĻŦāĻ‚ āϞāĻŋāĻ—ā§āϝāĻžāϏāĻŋ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāϟāĻžāϰāϟāĻŋ āĻāĻŽāύāĻ•āĻŋ āĻ•āύāĻĢāĻŋāĻ—āĻžāϰ⧇ āϰāϝāĻŧ⧇ āϗ⧇āϛ⧇ run_duration - āĻāϟāĻŋ āĻĒ⧁āύāϰāĻžāϝāĻŧ āϚāĻžāϞ⧁ āĻ•āϰāĻžāϰ āĻŦā§āϝāĻŦāϧāĻžāύāĨ¤ āĻ•āĻŋāĻ¨ā§āϤ⧁ āĻāĻ–āύ āϏāĻŦ āĻ āĻŋāĻ• āφāϛ⧇āĨ¤

  • DAG (āĻ“āϰāĻĢ⧇ "āĻĄā§āϝāĻžāĻ—") - "āύāĻŋāĻ°ā§āĻĻ⧇āĻļāĻŋāϤ āĻ…ā§āϝāĻžāϏāĻžāχāĻ•ā§āϞāĻŋāĻ• āĻ—ā§āϰāĻžāĻĢ", āĻ•āĻŋāĻ¨ā§āϤ⧁ āĻāχ āϧāϰāύ⧇āϰ āϏāĻ‚āĻœā§āĻžāĻž āϖ⧁āĻŦ āĻ•āĻŽ āϞ⧋āĻ•āϕ⧇ āĻŦāϞāϤ⧇ āĻĒāĻžāϰāĻŦ⧇, āĻ•āĻŋāĻ¨ā§āϤ⧁ āφāϏāϞ⧇ āĻāϟāĻŋ āĻāϕ⧇ āĻ…āĻĒāϰ⧇āϰ āϏāĻžāĻĨ⧇ āχāĻ¨ā§āϟāĻžāϰāĻ…ā§āϝāĻžāĻ•ā§āϟ āĻ•āϰāĻžāϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āϧāĻžāϰāĻ• (āύ⧀āĻšā§‡ āĻĻ⧇āϖ⧁āύ) āĻŦāĻž SSIS-āĻ āĻĒā§āϝāĻžāϕ⧇āϜ āĻāĻŦāĻ‚ āχāύāĻĢāϰāĻŽā§āϝāĻžāϟāĻŋāĻ•āĻžāϝāĻŧ āĻ“āϝāĻŧāĻžāĻ°ā§āĻ•āĻĢā§āϞ⧋-āĻāϰ āĻāĻ•āϟāĻŋ āĻ…ā§āϝāĻžāύāĻžāϞāĻ—āĨ¤ .

    āĻĄā§āϝāĻžāĻ—āϗ⧁āϞāĻŋ āĻ›āĻžāĻĄāĻŧāĻžāĻ“, āĻāĻ–āύāĻ“ āϏāĻžāĻŦāĻĄā§āϝāĻžāĻ— āĻĨāĻžāĻ•āϤ⧇ āĻĒāĻžāϰ⧇, āϤāĻŦ⧇ āφāĻŽāϰāĻž āϏāĻŽā§āĻ­āĻŦāϤ āϏ⧇āϗ⧁āϞāĻŋ āĻĒāĻžāĻŦ āύāĻžāĨ¤

  • DAG āϰāĻžāύ - āĻĒā§āϰāĻžāϰāĻŽā§āĻ­āĻŋāĻ• āĻĄā§‡āĻ—, āϝāĻž āϤāĻžāϰ āύāĻŋāϜāĻ¸ā§āĻŦ āĻŦāϰāĻžāĻĻā§āĻĻ āĻ•āϰāĻž āĻšāϝāĻŧ execution_date. āĻāĻ•āχ āĻĄā§āϝāĻžāϗ⧇āϰ āĻĄā§āϝāĻžāĻ—āϰāĻž āϏāĻŽāĻžāĻ¨ā§āϤāϰāĻžāϞāĻ­āĻžāĻŦ⧇ āĻ•āĻžāϜ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇ (āϝāĻĻāĻŋ āφāĻĒāύāĻŋ āĻ…āĻŦāĻļā§āϝāχ āφāĻĒāύāĻžāϰ āĻ•āĻžāϜāϗ⧁āϞāĻŋāϕ⧇ āĻ…āĻĻāĻŽā§āϝ āĻ•āϰ⧇ āĻĨāĻžāϕ⧇āύ)āĨ¤
  • āĻ…āĻĒāĻžāϰ⧇āϟāϰ āĻāĻ•āϟāĻŋ āύāĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āϟ āĻ•ā§āϰāĻŋāϝāĻŧāĻž āϏāĻŽā§āĻĒāĻžāĻĻāύ⧇āϰ āϜāĻ¨ā§āϝ āĻĻāĻžāϝāĻŧā§€ āϕ⧋āĻĄā§‡āϰ āϟ⧁āĻ•āϰ⧋āĨ¤ āϤāĻŋāύ āϧāϰāύ⧇āϰ āĻ…āĻĒāĻžāϰ⧇āϟāϰ āφāϛ⧇:
    • āĻ•āĻ°ā§āĻŽāφāĻŽāĻžāĻĻ⧇āϰ āĻĒā§āϰāĻŋāϝāĻŧ āĻŽāϤ PythonOperator, āϝāĻž āϝ⧇āϕ⧋āύ⧋ (āĻŦ⧈āϧ) āĻĒāĻžāχāĻĨāύ āϕ⧋āĻĄ āϚāĻžāϞāĻžāϤ⧇ āĻĒāĻžāϰ⧇;
    • āĻšāĻ¸ā§āϤāĻžāĻ¨ā§āϤāϰ, āϝāĻž āĻ¸ā§āĻĨāĻžāύ āĻĨ⧇āϕ⧇ āĻ…āĻ¨ā§āϝ āϜāĻžāϝāĻŧāĻ—āĻžāϝāĻŧ āĻĄā§‡āϟāĻž āĻĒāϰāĻŋāĻŦāĻšāύ āĻ•āϰ⧇, āĻŦāϞ⧇, MsSqlToHiveTransfer;
    • āϏ⧇āĻ¨ā§āϏāϰ āĻ…āĻ¨ā§āϝāĻĻāĻŋāϕ⧇, āĻāϟāĻŋ āφāĻĒāύāĻžāϕ⧇ āĻĒā§āϰāϤāĻŋāĻ•ā§āϰāĻŋāϝāĻŧāĻž āϜāĻžāύāĻžāϤ⧇ āĻŦāĻž āĻāĻ•āϟāĻŋ āχāϭ⧇āĻ¨ā§āϟ āύāĻž āĻšāĻ“āϝāĻŧāĻž āĻĒāĻ°ā§āϝāĻ¨ā§āϤ āĻĄā§āϝāĻžāĻ—āϟāĻŋāϰ āĻĒāϰāĻŦāĻ°ā§āϤ⧀ āϏāĻŽā§āĻĒāĻžāĻĻāύāϕ⧇ āϧ⧀āϰ āĻ•āϰāĻžāϰ āĻ…āύ⧁āĻŽāϤāĻŋ āĻĻ⧇āĻŦ⧇āĨ¤ HttpSensor āύāĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āϟ āĻāĻ¨ā§āĻĄāĻĒāϝāĻŧ⧇āĻ¨ā§āϟ āϟāĻžāύāϤ⧇ āĻĒāĻžāϰ⧇, āĻāĻŦāĻ‚ āϝāĻ–āύ āĻ•āĻžāĻ™ā§āĻ•ā§āώāĻŋāϤ āĻĒā§āϰāϤāĻŋāĻ•ā§āϰāĻŋāϝāĻŧāĻž āĻ…āĻĒ⧇āĻ•ā§āώāĻž āĻ•āϰāϛ⧇, āĻ¸ā§āĻĨāĻžāύāĻžāĻ¨ā§āϤāϰ āĻļ⧁āϰ⧁ āĻ•āϰ⧁āύ GoogleCloudStorageToS3Operator. āĻāĻ•āϟāĻŋ āĻ…āύ⧁āϏāĻ¨ā§āϧāĻŋā§ŽāϏ⧁ āĻŽāύ āϜāĻŋāĻœā§āĻžāĻžāϏāĻž āĻ•āϰāĻŦ⧇: "āϕ⧇āύ? āϏāĻ°ā§āĻŦā§‹āĻĒāϰāĻŋ, āφāĻĒāύāĻŋ āĻ āĻŋāĻ• āĻ…āĻĒāĻžāϰ⧇āϟāϰ⧇ āĻĒ⧁āύāϰāĻžāĻŦ⧃āĻ¤ā§āϤāĻŋ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ!" āĻāĻŦāĻ‚ āϤāĻžāϰāĻĒāϰ, āĻ¸ā§āĻĨāĻ—āĻŋāϤ āĻ…āĻĒāĻžāϰ⧇āϟāϰāĻĻ⧇āϰ āϏāĻžāĻĨ⧇ āĻ•āĻžāĻœā§‡āϰ āĻĒ⧁āϞ āφāϟāϕ⧇ āύāĻž āĻĻ⧇āĻ“āϝāĻŧāĻžāϰ āϜāĻ¨ā§āϝāĨ¤ āϏ⧇āĻ¨ā§āϏāϰ āĻļ⧁āϰ⧁ āĻšāϝāĻŧ, āĻšā§‡āĻ• āĻ•āϰ⧇ āĻāĻŦāĻ‚ āĻĒāϰāĻŦāĻ°ā§āϤ⧀ āĻĒā§āϰāĻšā§‡āĻˇā§āϟāĻžāϰ āφāϗ⧇ āĻŽāĻžāϰāĻž āϝāĻžāϝāĻŧāĨ¤
  • āĻ•āĻžāĻ°ā§āϝ - āĻ˜ā§‹āώāĻŋāϤ āĻ…āĻĒāĻžāϰ⧇āϟāϰ, āĻĒā§āϰāĻ•āĻžāϰ āύāĻŋāĻ°ā§āĻŦāĻŋāĻļ⧇āώ⧇, āĻāĻŦāĻ‚ āĻĄā§āϝāĻžāϗ⧇āϰ āϏāĻžāĻĨ⧇ āϏāĻ‚āϝ⧁āĻ•ā§āϤ āĻ•āĻ°ā§āĻŽā§‡āϰ āĻĒāĻĻ⧇ āωāĻ¨ā§āύ⧀āϤ āĻšāϝāĻŧāĨ¤
  • āϟāĻžāĻ¸ā§āĻ• āωāĻĻāĻžāĻšāϰāĻŖ - āϝāĻ–āύ āϏāĻžāϧāĻžāϰāĻŖ āĻĒāϰāĻŋāĻ•āĻ˛ā§āĻĒāύāĻžāĻ•āĻžāϰ⧀ āϏāĻŋāĻĻā§āϧāĻžāĻ¨ā§āϤ āύāĻŋāϝāĻŧ⧇āĻ›āĻŋāϞ⧇āύ āϝ⧇ āĻĒāĻžāϰāĻĢāĻ°ā§āĻŽāĻžāϰ-āĻ•āĻ°ā§āĻŽā§€āĻĻ⧇āϰ āϝ⧁āĻĻā§āϧ⧇ āĻ•āĻžāϜ āĻĒāĻžāĻ āĻžāύ⧋āϰ āϏāĻŽāϝāĻŧ āĻāϏ⧇āϛ⧇ (āĻ āĻŋāĻ• āϜāĻžāϝāĻŧāĻ—āĻžāϝāĻŧ, āϝāĻĻāĻŋ āφāĻŽāϰāĻž āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāĻŋ LocalExecutor āĻŦāĻž āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇ āĻāĻ•āϟāĻŋ āĻĻā§‚āϰāĻŦāĻ°ā§āϤ⧀ āύ⧋āĻĄ CeleryExecutor), āĻāϟāĻŋ āϤāĻžāĻĻ⧇āϰ āĻāĻ•āϟāĻŋ āĻĒā§āϰāϏāĻ™ā§āĻ— āĻŦāϰāĻžāĻĻā§āĻĻ āĻ•āϰ⧇ (āĻ…āĻ°ā§āĻĨāĻžā§Ž, āϭ⧇āϰāĻŋāϝāĻŧ⧇āĻŦāϞ⧇āϰ āĻāĻ•āϟāĻŋ āϏ⧇āϟ - āĻāĻ•ā§āϏāĻŋāĻ•āĻŋāωāĻļāύ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāϟāĻžāϰ), āĻ•āĻŽāĻžāĻ¨ā§āĻĄ āĻŦāĻž āĻ•ā§āϝ⧋āϝāĻŧāĻžāϰ⧀ āĻŸā§‡āĻŽāĻĒā§āϞ⧇āϟ āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻ•āϰ⧇ āĻāĻŦāĻ‚ āϤāĻžāĻĻ⧇āϰ āĻĒ⧁āϞ āĻ•āϰ⧇āĨ¤

āφāĻŽāϰāĻž āĻ•āĻžāϜ āϤ⧈āϰāĻŋ āĻ•āϰāĻŋ

āĻĒā§āϰāĻĨāĻŽā§‡, āφāϏ⧁āύ āφāĻŽāĻžāĻĻ⧇āϰ āĻĄāϗ⧇āϰ āϏāĻžāϧāĻžāϰāĻŖ āĻ¸ā§āĻ•āĻŋāĻŽāϟāĻŋāϰ āϰ⧂āĻĒāϰ⧇āĻ–āĻž āĻĻāĻŋāχ, āĻāĻŦāĻ‚ āϤāĻžāϰāĻĒāϰ⧇ āφāĻŽāϰāĻž āφāϰāĻ“ āĻŦ⧇āĻļāĻŋ āĻ•āϰ⧇ āĻŦāĻŋāĻļāĻĻāϟāĻŋāϤ⧇ āĻĄā§āĻŦ āĻĻ⧇āĻŦ, āĻ•āĻžāϰāĻŖ āφāĻŽāϰāĻž āĻ•āĻŋāϛ⧁ āĻ…-āϤ⧁āĻšā§āĻ› āϏāĻŽāĻžāϧāĻžāύ āĻĒā§āϰāϝāĻŧā§‹āĻ— āĻ•āϰāĻŋāĨ¤

āϏ⧁āϤāϰāĻžāĻ‚, āĻāϰ āϏāĻšāϜāϤāĻŽ āφāĻ•āĻžāϰ⧇, āĻāχ āϜāĻžāϤ⧀āϝāĻŧ āĻĄā§āϝāĻžāĻ— āĻāϰ āĻŽāϤ⧋ āĻĻ⧇āĻ–āĻžāĻŦ⧇:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

āφāϏ⧁āύ āĻāϟāĻŋ āĻŦ⧇āϰ āĻ•āϰāĻž āϝāĻžāĻ•:

  • āĻĒā§āϰāĻĨāĻŽāϤ, āφāĻŽāϰāĻž āĻĒā§āϰāϝāĻŧā§‹āϜāύ⧀āϝāĻŧ libs āĻāĻŦāĻ‚ āφāĻŽāĻĻāĻžāύāĻŋ āĻ•āϰāĻŋ āĻ…āĻ¨ā§āϝāĻ•āĻŋāϛ⧁;
  • sql_server_ds - āĻāϟāĻž List[namedtuple[str, str]] āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻ•āĻžāύ⧇āĻ•āĻļāύ āĻĨ⧇āϕ⧇ āϏāĻ‚āϝ⧋āϗ⧇āϰ āύāĻžāĻŽ āĻāĻŦāĻ‚ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ āϝāĻž āĻĨ⧇āϕ⧇ āφāĻŽāϰāĻž āφāĻŽāĻžāĻĻ⧇āϰ āĻĒā§āϞ⧇āϟ āύ⧇āĻŦ;
  • dag - āφāĻŽāĻžāĻĻ⧇āϰ āĻĄā§‡āĻ— āĻ˜ā§‹āώāĻŖāĻž, āϝāĻž āĻ…āĻ—āĻ¤ā§āϝāĻž āĻŽāĻ§ā§āϝ⧇ āĻšāϤ⧇ āĻšāĻŦ⧇ globals(), āĻ…āĻ¨ā§āϝāĻĨāĻžāϝāĻŧ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻāϟāĻŋ āϖ⧁āρāĻœā§‡ āĻĒāĻžāĻŦ⧇ āύāĻžāĨ¤ āĻĄāĻ—āϕ⧇āĻ“ āĻŦāϞāϤ⧇ āĻšāĻŦ⧇:
    • āϤāĻžāϰ āύāĻžāĻŽ āĻ•āĻŋ orders - āĻāχ āύāĻžāĻŽāϟāĻŋ āϤāĻ–āύ āĻ“āϝāĻŧ⧇āĻŦ āχāĻ¨ā§āϟāĻžāϰāĻĢ⧇āϏ⧇ āĻĒā§āϰāĻĻāĻ°ā§āĻļāĻŋāϤ āĻšāĻŦ⧇,
    • āϝ⧇ āϤāĻŋāύāĻŋ āφāϟāχ āϜ⧁āϞāĻžāχ āĻŽāĻ§ā§āϝāϰāĻžāϤ āĻĨ⧇āϕ⧇ āĻ•āĻžāϜ āĻ•āϰāĻŦ⧇āύ,
    • āĻāĻŦāĻ‚ āĻāϟāĻŋ āϚāĻžāϞāĻžāύ⧋ āωāϚāĻŋāϤ, āĻĒā§āϰāĻžāϝāĻŧ āĻĒā§āϰāϤāĻŋ 6 āϘāĻ¨ā§āϟāĻž (āĻāϰ āĻĒāϰāĻŋāĻŦāĻ°ā§āϤ⧇ āĻāĻ–āĻžāύ⧇ āĻ•āĻ āĻŋāύ āϞ⧋āĻ•āĻĻ⧇āϰ āϜāĻ¨ā§āϝ timedelta() āĻ—ā§āϰāĻšāĻŖāϝ⧋āĻ—ā§āϝ cron-āϞāĻžāχāύ 0 0 0/6 ? * * *, āĻ•āĻŽ āĻ āĻžāĻ¨ā§āĻĄāĻž āϜāĻ¨ā§āϝ - āĻŽāϤ āĻāĻ•āϟāĻŋ āĻ…āĻ­āĻŋāĻŦā§āϝāĻ•ā§āϤāĻŋ @daily);
  • workflow() āĻŽā§‚āϞ āĻ•āĻžāϜ āĻ•āϰāĻŦ⧇, āĻ•āĻŋāĻ¨ā§āϤ⧁ āĻāĻ–āύ āύāϝāĻŧāĨ¤ āφāĻĒāĻžāϤāϤ, āφāĻŽāϰāĻž āĻļ⧁āϧ⧁ āϞāϗ⧇ āφāĻŽāĻžāĻĻ⧇āϰ āĻĒā§āϰāϏāĻ™ā§āĻ— āĻĄāĻžāĻŽā§āĻĒ āĻ•āϰāĻŦāĨ¤
  • āĻāĻŦāĻ‚ āĻāĻ–āύ āĻ•āĻžāϜ āϤ⧈āϰāĻŋ āĻ•āϰāĻžāϰ āϏāĻšāϜ āϝāĻžāĻĻ⧁:
    • āφāĻŽāϰāĻž āφāĻŽāĻžāĻĻ⧇āϰ āωāĻ¤ā§āϏ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡ āϚāĻžāϞāĻžāύ;
    • āφāϰāĻŽā§āĻ­ āĻ•āϰāĻž PythonOperator, āϝāĻž āφāĻŽāĻžāĻĻ⧇āϰ āĻĄāĻžāĻŽāĻŋ āĻ•āĻžāĻ°ā§āϝāĻ•āϰ āĻ•āϰāĻŦ⧇ workflow(). āϟāĻžāĻ¸ā§āϕ⧇āϰ āĻāĻ•āϟāĻŋ āĻ…āύāĻ¨ā§āϝ (āĻĄā§‡āϗ⧇āϰ āĻŽāĻ§ā§āϝ⧇) āύāĻžāĻŽ āωāĻ˛ā§āϞ⧇āĻ– āĻ•āϰāϤ⧇ āϭ⧁āϞāĻŦ⧇āύ āύāĻž āĻāĻŦāĻ‚ āĻĄā§āϝāĻžāĻ—āϟāĻŋ āύāĻŋāĻœā§‡āχ āĻŦ⧇āρāϧ⧇ āĻĻāĻŋāύāĨ¤ āĻĒāϤāĻžāĻ•āĻž provide_context āĻĒāϰāĻŋāĻŦāĻ°ā§āϤ⧇, āĻĢāĻžāĻ‚āĻļāύ⧇ āĻ…āϤāĻŋāϰāĻŋāĻ•ā§āϤ āφāĻ°ā§āϗ⧁āĻŽā§‡āĻ¨ā§āϟ āĻĸāĻžāϞāĻž āĻšāĻŦ⧇, āϝāĻž āφāĻŽāϰāĻž āϏāĻžāĻŦāϧāĻžāύ⧇ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇ āϏāĻ‚āĻ—ā§āϰāĻš āĻ•āϰāĻŦ **context.

āφāĻĒāĻžāϤāϤ āĻāχāϟ⧁āϕ⧁āχāĨ¤ āφāĻŽāϰāĻž āϝāĻž āĻĒ⧇āϝāĻŧ⧇āĻ›āĻŋ:

  • āĻ“āϝāĻŧ⧇āĻŦ āχāĻ¨ā§āϟāĻžāϰāĻĢ⧇āϏ⧇ āύāϤ⧁āύ āĻĄā§‡āĻ—,
  • āĻĻ⧇āĻĄāĻŧ āĻļāϤāĻžāϧāĻŋāĻ• āĻ•āĻžāϜ āϝāĻž āϏāĻŽāĻžāĻ¨ā§āϤāϰāĻžāϞāĻ­āĻžāĻŦ⧇ āĻ•āĻžāĻ°ā§āϝāĻ•āϰ āĻ•āϰāĻž āĻšāĻŦ⧇ (āϝāĻĻāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋, āϏ⧇āϞāĻžāϰāĻŋ āϏ⧇āϟāĻŋāĻ‚āϏ āĻāĻŦāĻ‚ āϏāĻžāĻ°ā§āĻ­āĻžāϰ⧇āϰ āĻ•ā§āώāĻŽāϤāĻž āĻ…āύ⧁āĻŽāϤāĻŋ āĻĻ⧇āϝāĻŧ)āĨ¤

āĻ“āϝāĻŧ⧇āϞ, āĻĒā§āϰāĻžāϝāĻŧ āĻĒ⧇āϝāĻŧ⧇āĻ›āĻŋāϞāĻžāĻŽ.

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž
āϕ⧇ āύāĻŋāĻ°ā§āĻ­āϰāϤāĻž āχāύāĻ¸ā§āϟāϞ āĻ•āϰāĻŦ⧇?

āĻāχ āĻĒ⧁āϰ⧋ āϜāĻŋāύāĻŋāϏāϟāĻŋ āϏāĻšāϜ āĻ•āϰāĻžāϰ āϜāĻ¨ā§āϝ, āφāĻŽāĻŋ āĻ¸ā§āĻ•ā§āϰ⧁ āĻ•āϰ⧇āĻ›āĻŋ docker-compose.yml āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻžāĻ•āϰāĻŖ requirements.txt āϏāĻŦ āύ⧋āĻĄā§‡āϰ āωāĻĒāϰāĨ¤

āĻāĻ–āύ āĻāϟāĻŋ āϚāϞ⧇ āϗ⧇āϛ⧇:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āϧ⧂āϏāϰ āĻ¸ā§āϕ⧋āϝāĻŧāĻžāϰāϗ⧁āϞāĻŋ āĻšāϞ āϏāĻŽāϝāĻŧāϏ⧂āĻšā§€ āĻĻā§āĻŦāĻžāϰāĻž āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻžāĻ•ā§ƒāϤ āϟāĻžāĻ¸ā§āĻ• āĻĻ⧃āĻˇā§āϟāĻžāĻ¨ā§āϤāĨ¤

āφāĻŽāϰāĻž āĻāĻ•āϟ⧁ āĻ…āĻĒ⧇āĻ•ā§āώāĻž āĻ•āϰāĻŋ, āĻ•āĻ°ā§āĻŽā§€āϰāĻž āĻ•āĻžāϜāϗ⧁āϞ⧋ āĻŦāĻ¨ā§āϧ āĻ•āϰ⧇ āĻĻ⧇āύ:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āϏāĻŦ⧁āϜāϰāĻž āĻ…āĻŦāĻļā§āϝāχ āϤāĻžāĻĻ⧇āϰ āĻ•āĻžāϜ āϏāĻĢāϞāĻ­āĻžāĻŦ⧇ āϏāĻŽā§āĻĒāĻ¨ā§āύ āĻ•āϰ⧇āϛ⧇āĨ¤ āϞāĻžāϞ āϖ⧁āĻŦ āϏāĻĢāϞ āĻšāϝāĻŧ āύāĻžāĨ¤

āϝāĻžāχāĻšā§‹āĻ•, āφāĻŽāĻžāĻĻ⧇āϰ āĻĒā§āϰ⧋āĻĄā§‡ āϕ⧋āύ āĻĢā§‹āĻ˛ā§āĻĄāĻžāϰ āύ⧇āχ ./dags, āĻŽā§‡āĻļāĻŋāύ⧇āϰ āĻŽāĻ§ā§āϝ⧇ āϕ⧋āύ āϏāĻŋāĻ™ā§āĻ•ā§āϰ⧋āύāĻžāχāĻœā§‡āĻļāύ āύ⧇āχ - āϏāĻŦ āĻĄā§āϝāĻžāĻ— āĻļ⧁āϝāĻŧ⧇ āφāϛ⧇ git āφāĻŽāĻžāĻĻ⧇āϰ āĻ—āĻŋāϟāĻ˛ā§āϝāĻžāĻŦ⧇, āĻāĻŦāĻ‚ āĻ—āĻŋāϟāĻ˛ā§āϝāĻžāĻŦ āϏāĻŋāφāχ āĻŽā§‡āĻļāĻŋāύ⧇ āĻāĻ•āĻ¤ā§āϰāĻŋāϤ āĻšāĻ“āϝāĻŧāĻžāϰ āϏāĻŽāϝāĻŧ āφāĻĒāĻĄā§‡āϟāϗ⧁āϞāĻŋ āĻŦāĻŋāϤāϰāĻŖ āĻ•āϰ⧇ master.

āĻĢ⧁āϞ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āĻāĻ•āϟ⧁

āĻļā§āϰāĻŽāĻŋāĻ•āϰāĻž āϝāĻ–āύ āφāĻŽāĻžāĻĻ⧇āϰ āĻĒā§āϰāĻļāĻžāĻ¨ā§āϤāĻŋāϕ⧇ āĻŽāĻžāϰāϛ⧇, āϤāĻ–āύ āφāϏ⧁āύ āφāϰ⧇āĻ•āϟāĻŋ āĻšāĻžāϤāĻŋāϝāĻŧāĻžāϰ āĻŽāύ⧇ āϰāĻžāĻ–āĻŋ āϝāĻž āφāĻŽāĻžāĻĻ⧇āϰ āĻ•āĻŋāϛ⧁ āĻĻ⧇āĻ–āĻžāϤ⧇ āĻĒāĻžāϰ⧇ - āĻĢā§āϞāĻžāĻ“āϝāĻŧāĻžāϰāĨ¤

āĻ•āĻ°ā§āĻŽā§€ āύ⧋āĻĄā§‡āϰ āϏāĻ‚āĻ•ā§āώāĻŋāĻĒā§āϤ āϤāĻĨā§āϝ āϏāĻš āĻĒā§āϰāĻĨāĻŽ āĻĒ⧃āĻˇā§āĻ āĻž:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āĻ•āĻžāϜāϗ⧁āϞāĻŋ āϏāĻš āϏāĻŦāĻšā§‡āϝāĻŧ⧇ āϤ⧀āĻŦā§āϰ āĻĒ⧃āĻˇā§āĻ āĻž āϝāĻž āĻ•āĻžāϜ āĻ•āϰāϤ⧇ āĻ—āĻŋāϝāĻŧ⧇āĻ›āĻŋāϞ:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āφāĻŽāĻžāĻĻ⧇āϰ āĻŦā§āϰ⧋āĻ•āĻžāϰ⧇āϰ āĻ¸ā§āĻŸā§āϝāĻžāϟāĻžāϏ āϏāĻš āϏāĻŦāĻšā§‡āϝāĻŧ⧇ āĻŦāĻŋāϰāĻ•ā§āϤāĻŋāĻ•āϰ āĻĒ⧃āĻˇā§āĻ āĻž:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āϏāĻŦāĻšā§‡āϝāĻŧ⧇ āωāĻœā§āĻœā§āĻŦāϞ āĻĒ⧃āĻˇā§āĻ āĻžāϟāĻŋ āϟāĻžāĻ¸ā§āĻ• āĻ¸ā§āĻŸā§āϝāĻžāϟāĻžāϏ āĻ—ā§āϰāĻžāĻĢ āĻāĻŦāĻ‚ āϤāĻžāĻĻ⧇āϰ āĻ•āĻžāĻ°ā§āϝāĻ•āϰ āĻ•āϰāĻžāϰ āϏāĻŽāϝāĻŧ āϏāĻš:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āφāĻŽāϰāĻž underloaded āϞ⧋āĻĄ

āϏ⧁āϤāϰāĻžāĻ‚, āϏāĻŽāĻ¸ā§āϤ āĻ•āĻžāϜ āĻļ⧇āώ āĻšāϝāĻŧ⧇āϛ⧇, āφāĻĒāύāĻŋ āφāĻšāϤāĻĻ⧇āϰ āĻŦāĻšāύ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύāĨ¤

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āĻāĻŦāĻ‚ āϏ⧇āĻ–āĻžāύ⧇ āĻ…āύ⧇āĻ• āφāĻšāϤ āĻšāϝāĻŧ⧇āĻ›āĻŋāϞ - āĻāĻ• āĻŦāĻž āĻ…āĻ¨ā§āϝ āĻ•āĻžāϰāϪ⧇āĨ¤ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āϏāĻ āĻŋāĻ• āĻŦā§āϝāĻŦāĻšāĻžāϰ⧇āϰ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇, āĻāχ āϖ⧁āĻŦ āĻ¸ā§āϕ⧋āϝāĻŧāĻžāϰāϗ⧁āϞāĻŋ āύāĻŋāĻ°ā§āĻĻ⧇āĻļ āĻ•āϰ⧇ āϝ⧇ āĻĄā§‡āϟāĻž āύāĻŋāĻļā§āϚāĻŋāϤāĻ­āĻžāĻŦ⧇ āφāϏ⧇āύāĻŋāĨ¤

āφāĻĒāύāĻžāϕ⧇ āϞāĻ—āϟāĻŋ āĻĻ⧇āĻ–āϤ⧇ āĻšāĻŦ⧇ āĻāĻŦāĻ‚ āĻĒāϤāĻŋāϤ āϟāĻžāĻ¸ā§āĻ• āχāύāĻ¸ā§āĻŸā§āϝāĻžāĻ¨ā§āϏāϗ⧁āϞāĻŋ āĻĒ⧁āύāϰāĻžāϝāĻŧ āϚāĻžāϞ⧁ āĻ•āϰāϤ⧇ āĻšāĻŦ⧇āĨ¤

āϝ⧇āϕ⧋āύ⧋ āĻ¸ā§āϕ⧋āϝāĻŧāĻžāϰ⧇ āĻ•ā§āϞāĻŋāĻ• āĻ•āϰ⧇, āφāĻŽāϰāĻž āφāĻŽāĻžāĻĻ⧇āϰ āϜāĻ¨ā§āϝ āωāĻĒāϞāĻŦā§āϧ āĻ…ā§āϝāĻžāĻ•āĻļāύ āĻĻ⧇āĻ–āϤ⧇ āĻĒāĻžāĻŦ:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āφāĻĒāύāĻŋ āύāĻŋāϤ⧇ āĻĒāĻžāϰ⧇āύ āĻāĻŦāĻ‚ āĻĒāϤāĻŋāϤ āĻĒāϰāĻŋāĻˇā§āĻ•āĻžāϰ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ. āĻ…āĻ°ā§āĻĨāĻžā§Ž, āφāĻŽāϰāĻž āϭ⧁āϞ⧇ āϝāĻžāχ āϝ⧇ āϏ⧇āĻ–āĻžāύ⧇ āĻ•āĻŋāϛ⧁ āĻŦā§āϝāĻ°ā§āĻĨ āĻšāϝāĻŧ⧇āϛ⧇ āĻāĻŦāĻ‚ āĻāĻ•āχ āωāĻĻāĻžāĻšāϰāĻŖ āϟāĻžāĻ¸ā§āĻ• āĻļāĻŋāĻĄāĻŋāωāϞāĻžāϰ⧇āϰ āĻ•āĻžāϛ⧇ āϝāĻžāĻŦ⧇āĨ¤

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āĻāϟāĻž āĻ¸ā§āĻĒāĻˇā§āϟ āϝ⧇ āϏāĻŽāĻ¸ā§āϤ āϞāĻžāϞ āĻ¸ā§āϕ⧋āϝāĻŧāĻžāϰ⧇āϰ āϏāĻžāĻĨ⧇ āĻŽāĻžāωāϏ āĻĻāĻŋāϝāĻŧ⧇ āĻāϟāĻŋ āĻ•āϰāĻž āϖ⧁āĻŦ āĻŽāĻžāύāĻŦāĻŋāĻ• āύāϝāĻŧ - āĻāϟāĻŋ āφāĻŽāϰāĻž āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻĨ⧇āϕ⧇ āφāĻļāĻž āĻ•āϰāĻŋ āύāĻžāĨ¤ āĻ¸ā§āĻŦāĻžāĻ­āĻžāĻŦāĻŋāĻ•āĻ­āĻžāĻŦ⧇āχ, āφāĻŽāĻžāĻĻ⧇āϰ āĻ•āĻžāϛ⧇ āĻ—āĻŖāĻŦāĻŋāĻ§ā§āĻŦāĻ‚āϏ⧀ āĻ…āĻ¸ā§āĻ¤ā§āϰ āϰāϝāĻŧ⧇āϛ⧇: Browse/Task Instances

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āφāϏ⧁āύ āĻāĻ•āĻŦāĻžāϰ⧇ āϏāĻŦāĻ•āĻŋāϛ⧁ āύāĻŋāĻ°ā§āĻŦāĻžāϚāύ āĻ•āϰāĻŋ āĻāĻŦāĻ‚ āĻļā§‚āĻ¨ā§āϝ⧇ āϰāĻŋāϏ⧇āϟ āĻ•āϰāĻŋ, āϏāĻ āĻŋāĻ• āφāχāĻŸā§‡āĻŽāϟāĻŋāϤ⧇ āĻ•ā§āϞāĻŋāĻ• āĻ•āϰ⧁āύ:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āĻĒāϰāĻŋāĻˇā§āĻ•āĻžāϰ āĻ•āϰāĻžāϰ āĻĒāϰ⧇, āφāĻŽāĻžāĻĻ⧇āϰ āĻŸā§āϝāĻžāĻ•ā§āϏāĻŋāϗ⧁āϞāĻŋ āĻāχāϰāĻ•āĻŽ āĻĻ⧇āĻ–āĻžāϝāĻŧ (āϤāĻžāϰāĻž āχāϤāĻŋāĻŽāĻ§ā§āϝ⧇ āĻļāĻŋāĻĄāĻŋāωāϞāĻžāϰ⧇āϰ āϜāĻ¨ā§āϝ āϤāĻžāĻĻ⧇āϰ āϏāĻŽāϝāĻŧāϏ⧂āĻšā§€āϰ āϜāĻ¨ā§āϝ āĻ…āĻĒ⧇āĻ•ā§āώāĻž āĻ•āϰāϛ⧇):

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āϏāĻ‚āϝ⧋āĻ—, āĻšā§āĻ• āĻāĻŦāĻ‚ āĻ…āĻ¨ā§āϝāĻžāĻ¨ā§āϝ āϭ⧇āϰāĻŋāϝāĻŧ⧇āĻŦāϞ

āĻāϟāĻž āĻĒāϰāĻŦāĻ°ā§āϤ⧀ DAG āĻĻ⧇āĻ–āĻžāϰ āϏāĻŽāϝāĻŧ, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Đ“ĐžŅĐŋОда Ņ…ĐžŅ€ĐžŅˆĐ¸Đĩ, ĐžŅ‚Ņ‡Đĩ҂ҋ ОйĐŊОвĐģĐĩĐŊŅ‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ĐĐ°Ņ‚Đ°Ņˆ, ĐŋŅ€ĐžŅŅ‹ĐŋĐ°ĐšŅŅ, ĐŧŅ‹ {{ dag.dag_id }} ŅƒŅ€ĐžĐŊиĐģи
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

āϏāĻŦāĻžāχ āĻ•āĻŋ āĻ•āĻ–āύ⧋ āϰāĻŋāĻĒā§‹āĻ°ā§āϟ āφāĻĒāĻĄā§‡āϟ āĻ•āϰ⧇āϛ⧇? āĻāϟāĻŋ āφāĻŦāĻžāϰ āϤāĻžāϰ: āϤāĻĨā§āϝ āϕ⧋āĻĨāĻž āĻĨ⧇āϕ⧇ āĻĒ⧇āϤ⧇ āĻšāĻŦ⧇ āϤāĻžāϰ āĻāĻ•āϟāĻŋ āϤāĻžāϞāĻŋāĻ•āĻž āϰāϝāĻŧ⧇āϛ⧇; āϝ⧇āĻ–āĻžāύ⧇ āϰāĻžāĻ–āĻž āĻšāĻŦ⧇ āĻāĻ•āϟāĻŋ āϤāĻžāϞāĻŋāĻ•āĻž āφāϛ⧇; āϝāĻ–āύ āϏāĻŦāĻ•āĻŋāϛ⧁ āϘāĻŸā§‡ āĻŦāĻž āϭ⧇āϙ⧇ āϝāĻžāϝāĻŧ āϤāĻ–āύ āĻšāĻ°ā§āύ āĻĻāĻŋāϤ⧇ āϭ⧁āϞāĻŦ⧇āύ āύāĻž (āĻ­āĻžāϞ, āĻāϟāĻŋ āφāĻŽāĻžāĻĻ⧇āϰ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āύāϝāĻŧ, āύāĻž)āĨ¤

āφāϏ⧁āύ āφāĻŦāĻžāϰ āĻĢāĻžāχāϞ⧇āϰ āĻŽāĻ§ā§āϝ āĻĻāĻŋāϝāĻŧ⧇ āϝāĻžāύ āĻāĻŦāĻ‚ āύāϤ⧁āύ āĻ…āĻ¸ā§āĻĒāĻˇā§āϟ āϜāĻŋāύāĻŋāϏāϗ⧁āϞāĻŋ āĻĻ⧇āϖ⧁āύ:

  • from commons.operators import TelegramBotSendMessage - āϕ⧋āύ⧋ āĻ•āĻŋāϛ⧁āχ āφāĻŽāĻžāĻĻ⧇āϰ āύāĻŋāϜāĻ¸ā§āĻŦ āĻ…āĻĒāĻžāϰ⧇āϟāϰ āϤ⧈āϰāĻŋ āĻ•āϰāϤ⧇ āĻŦāĻžāϧāĻž āĻĻ⧇āϝāĻŧ āύāĻž, āϝāĻžāϰ āϏ⧁āĻŦāĻŋāϧāĻž āφāĻŽāϰāĻž āφāύāĻŦā§āϞāĻ•āĻĄ-āĻ āĻŦāĻžāĻ°ā§āϤāĻž āĻĒāĻžāĻ āĻžāύ⧋āϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āϛ⧋āϟ āĻŽā§‹āĻĄāĻŧāĻ• āϤ⧈āϰāĻŋ āĻ•āϰ⧇ āύāĻŋāϝāĻŧ⧇āĻ›āĻŋāĨ¤ (āφāĻŽāϰāĻž āύ⧀āĻšā§‡ āĻāχ āĻ…āĻĒāĻžāϰ⧇āϟāϰ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āφāϰāĻ“ āĻ•āĻĨāĻž āĻŦāϞāĻŦ);
  • default_args={} - dag āϤāĻžāϰ āϏāĻŽāĻ¸ā§āϤ āĻ…āĻĒāĻžāϰ⧇āϟāϰāϕ⧇ āĻāĻ•āχ āφāĻ°ā§āϗ⧁āĻŽā§‡āĻ¨ā§āϟ āĻŦāĻŋāϤāϰāĻŖ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇;
  • to='{{ var.value.all_the_kings_men }}' - āĻ•ā§āώ⧇āĻ¤ā§āϰ to āφāĻŽāĻžāĻĻ⧇āϰ āĻšāĻžāĻ°ā§āĻĄāϕ⧋āĻĄ āĻ•āϰāĻž āĻšāĻŦ⧇ āύāĻž, āϤāĻŦ⧇ āϜāĻŋāύāϜāĻž āĻāĻŦāĻ‚ āχāĻŽā§‡āϞ⧇āϰ āϤāĻžāϞāĻŋāĻ•āĻž āϏāĻš āĻāĻ•āϟāĻŋ āϭ⧇āϰāĻŋāϝāĻŧ⧇āĻŦāϞ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇ āĻ—āϤāĻŋāĻļā§€āϞāĻ­āĻžāĻŦ⧇ āϤ⧈āϰāĻŋ āĻ•āϰāĻž āĻšāϝāĻŧ⧇āϛ⧇, āϝāĻž āφāĻŽāĻŋ āϏāĻžāĻŦāϧāĻžāύ⧇ āϰ⧇āϖ⧇āĻ›āĻŋ Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - āĻ…āĻĒāĻžāϰ⧇āϟāϰ āĻļ⧁āϰ⧁ āĻ•āϰāĻžāϰ āĻļāĻ°ā§āϤāĨ¤ āφāĻŽāĻžāĻĻ⧇āϰ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇, āϏāĻŽāĻ¸ā§āϤ āύāĻŋāĻ°ā§āĻ­āϰāϤāĻž āĻ•āĻžāϜ āĻ•āϰāϞ⧇āχ āϚāĻŋāĻ āĻŋāϟāĻŋ āĻŦāϏāĻĻ⧇āϰ āĻ•āĻžāϛ⧇ āϝāĻžāĻŦ⧇ āϏāĻžāĻĢāĻ˛ā§āϝ⧇āϰ āϏāĻžāĻĨ⧇;
  • tg_bot_conn_id='tg_main' - āϝ⧁āĻ•ā§āϤāĻŋ conn_id āφāĻŽāϰāĻž āϝ⧇ āϏāĻ‚āϝ⧋āĻ— āφāχāĻĄāĻŋ āϤ⧈āϰāĻŋ āĻ•āϰāĻŋ āϤāĻž āĻ—ā§āϰāĻšāĻŖ āĻ•āϰāĻŋ Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - āĻĒāϤāĻŋāϤ āĻ•āĻžāϜ āĻĨāĻžāĻ•āϞ⧇āχ āĻŸā§‡āϞāĻŋāĻ—ā§āϰāĻžāĻŽā§‡āϰ āĻŦāĻžāĻ°ā§āϤāĻžāϗ⧁āϞāĻŋ āωāĻĄāĻŧ⧇ āϝāĻžāĻŦ⧇;
  • task_concurrency=1 - āφāĻŽāϰāĻž āĻāĻ•āϟāĻŋ āϟāĻžāĻ¸ā§āϕ⧇āϰ āĻāĻ•āĻžāϧāĻŋāĻ• āϟāĻžāĻ¸ā§āĻ• āχāύāĻ¸ā§āĻŸā§āϝāĻžāĻ¨ā§āϏ⧇āϰ āĻāĻ•āϝ⧋āϗ⧇ āϞāĻžā§āϚ āύāĻŋāώāĻŋāĻĻā§āϧ āĻ•āϰāĻŋ⧎ āϤāĻž āύāĻž āĻšāϞ⧇ āφāĻŽāϰāĻž āĻāĻ•āϏāĻ™ā§āϗ⧇ āĻŦ⧇āĻļ āĻ•āϝāĻŧ⧇āĻ•āϟāĻŋ āϞāĻžā§āϚ āĻĒāĻžāĻŦ VerticaOperator (āĻāĻ•āϟāĻŋ āĻŸā§‡āĻŦāĻŋāϞ⧇āϰ āĻĻāĻŋāϕ⧇ āϤāĻžāĻ•āĻŋāϝāĻŧ⧇);
  • report_update >> [email, tg] - āϏāĻŦ VerticaOperator āϚāĻŋāĻ āĻŋ āĻāĻŦāĻ‚ āĻŦāĻžāĻ°ā§āϤāĻž āĻĒā§āϰ⧇āϰāϪ⧇ āĻāĻ•āĻ¤ā§āϰāĻŋāϤ āĻšāύ, āĻāχāĻ­āĻžāĻŦ⧇:
    āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

    āĻ•āĻŋāĻ¨ā§āϤ⧁ āϝ⧇āĻšā§‡āϤ⧁ āύ⧋āϟāĻŋāĻĢāĻžāϝāĻŧāĻžāϰ āĻ…āĻĒāĻžāϰ⧇āϟāϰāĻĻ⧇āϰ āϞāĻžā§āĻšā§‡āϰ āĻŦāĻŋāĻ­āĻŋāĻ¨ā§āύ āĻļāĻ°ā§āϤ āϰāϝāĻŧ⧇āϛ⧇, āĻļ⧁āϧ⧁āĻŽāĻžāĻ¤ā§āϰ āĻāĻ•āϟāĻŋ āĻ•āĻžāϜ āĻ•āϰāĻŦ⧇āĨ¤ āĻŸā§āϰāĻŋ āĻ­āĻŋāωāϤ⧇, āϏāĻŦāĻ•āĻŋāϛ⧁ āĻāĻ•āϟ⧁ āĻ•āĻŽ āĻ­āĻŋāĻœā§āϝ⧁āϝāĻŧāĻžāϞ āĻĻ⧇āĻ–āĻžāϝāĻŧ:
    āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āφāĻŽāĻŋ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āĻ•āϝāĻŧ⧇āĻ•āϟāĻŋ āĻļāĻŦā§āĻĻ āĻŦāϞāĻŦ āĻŽā§āϝāĻžāĻ•ā§āϰ⧋ āĻāĻŦāĻ‚ āϤāĻžāĻĻ⧇āϰ āĻŦāĻ¨ā§āϧ⧁āϰāĻž - āϭ⧇āϰāĻŋāϝāĻŧ⧇āĻŦāϞ.

āĻŽā§āϝāĻžāĻ•ā§āϰ⧋ āĻšāϞ āϜāĻŋāύāϜāĻž āĻĒā§āϞ⧇āϏāĻšā§‹āĻ˛ā§āĻĄāĻžāϰ āϝāĻž āĻ…āĻĒāĻžāϰ⧇āϟāϰ āφāĻ°ā§āϗ⧁āĻŽā§‡āĻ¨ā§āĻŸā§‡ āĻŦāĻŋāĻ­āĻŋāĻ¨ā§āύ āĻĻāϰāĻ•āĻžāϰ⧀ āϤāĻĨā§āϝ āĻĒā§āϰāϤāĻŋāĻ¸ā§āĻĨāĻžāĻĒāύ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āĨ¤ āωāĻĻāĻžāĻšāϰāĻŖāĻ¸ā§āĻŦāϰ⧂āĻĒ, āĻāχ āĻŽāϤ:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} āĻĒā§āϰāϏāĻ™ā§āĻ— āϭ⧇āϰāĻŋāϝāĻŧ⧇āĻŦāϞ⧇āϰ āĻŦāĻŋāώāϝāĻŧāĻŦāĻ¸ā§āϤ⧁āϤ⧇ āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻšāĻŦ⧇ execution_date āĻŦāĻŋāĻ¨ā§āϝāĻžāϏ⧇ YYYY-MM-DD: 2020-07-14. āϏāĻŦāĻšā§‡āϝāĻŧ⧇ āĻ­āĻžāϞ⧋ āĻĻāĻŋāĻ• āĻšāϞ āϝ⧇ āĻĒā§āϰāϏāĻ™ā§āĻ— āϭ⧇āϰāĻŋāϝāĻŧ⧇āĻŦāϞāϗ⧁āϞāĻŋ āĻāĻ•āϟāĻŋ āύāĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āϟ āϟāĻžāĻ¸ā§āĻ• āχāύāĻ¸ā§āĻŸā§āϝāĻžāĻ¨ā§āϏ⧇ (āĻŸā§āϰāĻŋ āĻ­āĻŋāωāϤ⧇ āĻāĻ•āϟāĻŋ āĻŦāĻ°ā§āĻ—āĻ•ā§āώ⧇āĻ¤ā§āϰ) āĻĒ⧇āϰ⧇āĻ• āĻĻ⧇āĻ“āϝāĻŧāĻž āĻšāϝāĻŧ āĻāĻŦāĻ‚ āϝāĻ–āύ āĻĒ⧁āύāϰāĻžāϝāĻŧ āϚāĻžāϞ⧁ āĻ•āϰāĻž āĻšāϝāĻŧ, āĻ¸ā§āĻĨāĻžāύāϧāĻžāϰāĻ•āϗ⧁āϞāĻŋ āĻāĻ•āχ āĻŽāĻžāύāϗ⧁āϞāĻŋāϤ⧇ āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻšāĻŦ⧇āĨ¤

āύāĻŋāĻ°ā§āϧāĻžāϰāĻŋāϤ āĻŽāĻžāύāϗ⧁āϞāĻŋ āĻĒā§āϰāϤāĻŋāϟāĻŋ āϟāĻžāĻ¸ā§āĻ• āχāύāĻ¸ā§āĻŸā§āϝāĻžāĻ¨ā§āϏ⧇ āϰ⧇āĻ¨ā§āĻĄāĻžāϰ āĻ•āϰāĻž āĻŦā§‹āϤāĻžāĻŽ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇ āĻĻ⧇āĻ–āĻž āϝ⧇āϤ⧇ āĻĒāĻžāϰ⧇āĨ¤ āĻāχāĻ­āĻžāĻŦ⧇ āĻāĻ•āϟāĻŋ āϚāĻŋāĻ āĻŋ āĻĒāĻžāĻ āĻžāύ⧋āϰ āĻ•āĻžāϜ:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āĻāĻŦāĻ‚ āϤāĻžāχ āĻāĻ•āϟāĻŋ āĻŦāĻžāĻ°ā§āϤāĻž āĻĒāĻžāĻ āĻžāύ⧋āϰ āĻ•āĻžāĻœā§‡:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āϏāĻ°ā§āĻŦāĻļ⧇āώ āωāĻĒāϞāĻŦā§āϧ āϏāĻ‚āĻ¸ā§āĻ•āϰāϪ⧇āϰ āϜāĻ¨ā§āϝ āĻ…āĻ¨ā§āϤāĻ°ā§āύāĻŋāĻ°ā§āĻŽāĻŋāϤ āĻŽā§āϝāĻžāĻ•ā§āϰ⧋āϗ⧁āϞāĻŋāϰ āĻāĻ•āϟāĻŋ āϏāĻŽā§āĻĒā§‚āĻ°ā§āĻŖ āϤāĻžāϞāĻŋāĻ•āĻž āĻāĻ–āĻžāύ⧇ āωāĻĒāϞāĻŦā§āϧ: āĻŽā§āϝāĻžāĻ•ā§āϰ⧋ āϰ⧇āĻĢāĻžāϰ⧇āĻ¨ā§āϏ

āϤāĻžāĻ›āĻžāĻĄāĻŧāĻž, āĻĒā§āϞāĻžāĻ—āχāύāϗ⧁āϞāĻŋāϰ āϏāĻžāĻšāĻžāĻ¯ā§āϝ⧇, āφāĻŽāϰāĻž āφāĻŽāĻžāĻĻ⧇āϰ āύāĻŋāϜāĻ¸ā§āĻŦ āĻŽā§āϝāĻžāĻ•ā§āϰ⧋ āĻ˜ā§‹āώāĻŖāĻž āĻ•āϰāϤ⧇ āĻĒāĻžāϰāĻŋ, āϤāĻŦ⧇ āĻāϟāĻŋ āĻ…āĻ¨ā§āϝ āĻ—āĻ˛ā§āĻĒāĨ¤

āĻĒā§‚āĻ°ā§āĻŦāύāĻŋāĻ°ā§āϧāĻžāϰāĻŋāϤ āϜāĻŋāύāĻŋāϏāϗ⧁āϞāĻŋ āĻ›āĻžāĻĄāĻŧāĻžāĻ“, āφāĻŽāϰāĻž āφāĻŽāĻžāĻĻ⧇āϰ āϭ⧇āϰāĻŋāϝāĻŧ⧇āĻŦāϞ⧇āϰ āĻŽāĻžāύāϗ⧁āϞāĻŋāϕ⧇ āĻĒā§āϰāϤāĻŋāĻ¸ā§āĻĨāĻžāĻĒāύ āĻ•āϰāϤ⧇ āĻĒāĻžāϰāĻŋ (āφāĻŽāĻŋ āχāϤāĻŋāĻŽāĻ§ā§āϝ⧇ āωāĻĒāϰ⧇āϰ āϕ⧋āĻĄā§‡ āĻāϟāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇āĻ›āĻŋ)āĨ¤ āĻāϰ āĻŽāĻ§ā§āϝ⧇ āϤ⧈āϰāĻŋ āĻ•āϰāĻž āϝāĻžāĻ• Admin/Variables āĻ•āĻŋāϛ⧁ āϜāĻŋāύāĻŋāϏ:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āφāĻĒāύāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ āϏāĻŦāĻ•āĻŋāϛ⧁:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

āĻŽāĻžāύ āĻāĻ•āϟāĻŋ āĻ¸ā§āϕ⧇āϞāĻžāϰ āĻšāϤ⧇ āĻĒāĻžāϰ⧇, āĻ…āĻĨāĻŦāĻž āĻāϟāĻŋ JSONāĻ“ āĻšāϤ⧇ āĻĒāĻžāϰ⧇āĨ¤ JSON āĻāϰ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

āĻļ⧁āϧ⧁ āĻĒāĻ›āĻ¨ā§āĻĻāϏāχ āϕ⧀āϟāĻŋāϰ āĻĒāĻĨāϟāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧁āύ: {{ var.json.bot_config.bot.token }}.

āφāĻŽāĻŋ āφāĻ•ā§āώāϰāĻŋāĻ• āĻ…āĻ°ā§āĻĨ⧇ āĻāĻ•āϟāĻŋ āĻļāĻŦā§āĻĻ āĻŦāϞāĻŦ āĻāĻŦāĻ‚ āĻāĻ•āϟāĻŋ āĻ¸ā§āĻ•ā§āϰāĻŋāύāĻļāϟ āĻĻ⧇āĻ–āĻžāĻŦ āϏāĻ‚āϝ⧋āĻ—. āϏāĻŦāĻ•āĻŋāϛ⧁ āĻāĻ–āĻžāύ⧇ āĻĒā§āϰāĻžāĻĨāĻŽāĻŋāĻ•: āĻĒ⧃āĻˇā§āĻ āĻžāϝāĻŧ Admin/Connections āφāĻŽāϰāĻž āĻāĻ•āϟāĻŋ āϏāĻ‚āϝ⧋āĻ— āϤ⧈āϰāĻŋ āĻ•āϰāĻŋ, āϏ⧇āĻ–āĻžāύ⧇ āφāĻŽāĻžāĻĻ⧇āϰ āϞāĻ—āχāύ/āĻĒāĻžāϏāĻ“āϝāĻŧāĻžāĻ°ā§āĻĄ āĻāĻŦāĻ‚ āφāϰāĻ“ āύāĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āϟ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāϟāĻžāϰ āϝ⧋āĻ— āĻ•āϰāĻŋāĨ¤ āĻāϟāĻžāϰ āĻŽāϤ:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āĻĒāĻžāϏāĻ“āϝāĻŧāĻžāĻ°ā§āĻĄāϗ⧁āϞāĻŋ āĻāύāĻ•ā§āϰāĻŋāĻĒā§āϟ āĻ•āϰāĻž āϝ⧇āϤ⧇ āĻĒāĻžāϰ⧇ (āĻĄāĻŋāĻĢāĻ˛ā§āĻŸā§‡āϰ āĻšā§‡āϝāĻŧ⧇ āφāϰāĻ“ āĻĒ⧁āĻ™ā§āĻ–āĻžāύ⧁āĻĒ⧁āĻ™ā§āĻ–āĻ­āĻžāĻŦ⧇), āĻ…āĻĨāĻŦāĻž āφāĻĒāύāĻŋ āϏāĻ‚āϝ⧋āϗ⧇āϰ āϧāϰāύāϟāĻŋ āϛ⧇āĻĄāĻŧ⧇ āĻĻāĻŋāϤ⧇ āĻĒāĻžāϰ⧇āύ (āϝ⧇āĻŽāύ āφāĻŽāĻŋ āĻ•āϰ⧇āĻ›āĻŋ tg_main) - āφāϏāϞ āĻŦāĻŋāώāϝāĻŧāϟāĻŋ āĻšāϞ āϝ⧇ āĻĒā§āϰāĻ•āĻžāϰ⧇āϰ āϤāĻžāϞāĻŋāĻ•āĻžāϟāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻŽāĻĄā§‡āϞāϗ⧁āϞāĻŋāϤ⧇ āĻšāĻžāĻ°ā§āĻĄāĻ“āϝāĻŧā§āϝāĻžāϰāϝ⧁āĻ•ā§āϤ āĻāĻŦāĻ‚ āϏ⧋āĻ°ā§āϏ āϕ⧋āĻĄāϗ⧁āϞāĻŋāϤ⧇ āύāĻž āĻ—āĻŋāϝāĻŧ⧇ āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻ•āϰāĻž āϝāĻžāϝāĻŧ āύāĻž (āϝāĻĻāĻŋ āĻšāĻ āĻžā§Ž āφāĻŽāĻŋ āĻ•āĻŋāϛ⧁ āϗ⧁āĻ—āϞ āύāĻž āĻ•āϰāĻŋ āϤāĻŦ⧇ āĻĻāϝāĻŧāĻž āĻ•āϰ⧇ āφāĻŽāĻžāϕ⧇ āϏāĻ‚āĻļā§‹āϧāύ āĻ•āϰ⧁āύ), āϤāĻŦ⧇ āĻ•āĻŋāϛ⧁āχ āφāĻŽāĻžāĻĻ⧇āϰ āĻ•ā§āϰ⧇āĻĄāĻŋāϟ āĻĒ⧇āϤ⧇ āĻŦāĻžāϧāĻž āĻĻ⧇āĻŦ⧇ āύāĻž āύāĻžāĻŽ

āφāĻĒāύāĻŋ āĻāĻ•āχ āύāĻžāĻŽā§‡āϰ āϏāĻžāĻĨ⧇ āĻŦ⧇āĻļ āĻ•āϝāĻŧ⧇āĻ•āϟāĻŋ āϏāĻ‚āϝ⧋āĻ—āĻ“ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ: āĻāχ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇, āĻĒāĻĻā§āϧāϤāĻŋ BaseHook.get_connection(), āϝāĻž āφāĻŽāĻžāĻĻ⧇āϰ āύāĻžāĻŽā§‡ āϏāĻ‚āϝ⧋āĻ— āĻĒāĻžāϝāĻŧ, āĻĻ⧇āĻŦ⧇ āĻāϞ⧋āĻŽā§‡āϞ⧋ āĻŦāĻŋāĻ­āĻŋāĻ¨ā§āύ āύāĻžāĻŽāĻ•āϰāĻŖ āĻĨ⧇āϕ⧇ (āϰāĻžāωāĻ¨ā§āĻĄ āϰāĻŦāĻŋāύ āϤ⧈āϰāĻŋ āĻ•āϰāĻž āφāϰāĻ“ āϝ⧌āĻ•ā§āϤāĻŋāĻ• āĻšāĻŦ⧇, āϤāĻŦ⧇ āĻāϟāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻŦāĻŋāĻ•āĻžāĻļāĻ•āĻžāϰ⧀āĻĻ⧇āϰ āĻŦāĻŋāĻŦ⧇āϕ⧇āϰ āωāĻĒāϰ āϛ⧇āĻĄāĻŧ⧇ āĻĻ⧇āĻ“āϝāĻŧāĻž āϝāĻžāĻ•)āĨ¤

āϭ⧇āϰāĻŋāϝāĻŧ⧇āĻŦāϞ āĻāĻŦāĻ‚ āϏāĻ‚āϝ⧋āĻ—āϗ⧁āϞāĻŋ āĻ…āĻŦāĻļā§āϝāχ āĻĻ⧁āĻ°ā§āĻĻāĻžāĻ¨ā§āϤ āϏāϰāĻžā§āϜāĻžāĻŽ, āϤāĻŦ⧇ āĻ­āĻžāϰāϏāĻžāĻŽā§āϝ āύāĻž āĻšāĻžāϰāĻžāύ⧋ āϗ⧁āϰ⧁āĻ¤ā§āĻŦāĻĒā§‚āĻ°ā§āĻŖ: āφāĻĒāύāĻžāϰ āĻĒā§āϰāĻŦāĻžāĻšā§‡āϰ āϕ⧋āύ āĻ…āĻ‚āĻļāϗ⧁āϞāĻŋ āφāĻĒāύāĻŋ āϕ⧋āĻĄā§‡ āϏāĻžā§āϚāϝāĻŧ āĻ•āϰ⧇āύ āĻāĻŦāĻ‚ āφāĻĒāύāĻŋ āĻ¸ā§āĻŸā§‹āϰ⧇āĻœā§‡āϰ āϜāĻ¨ā§āϝ āϕ⧋āύ āĻ…āĻ‚āĻļāϗ⧁āϞāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋āϕ⧇ āĻĻ⧇āύ⧎ āĻāĻ•āĻĻāĻŋāϕ⧇, āĻĻā§āϰ⧁āϤ āĻŽāĻžāύ āĻĒāϰāĻŋāĻŦāĻ°ā§āϤāύ āĻ•āϰāĻž āϏ⧁āĻŦāĻŋāϧāĻžāϜāύāĻ• āĻšāϤ⧇ āĻĒāĻžāϰ⧇, āωāĻĻāĻžāĻšāϰāĻŖāĻ¸ā§āĻŦāϰ⧂āĻĒ, āĻāĻ•āϟāĻŋ āĻŽā§‡āχāϞāĻŋāĻ‚ āĻŦāĻ•ā§āϏ, UI āĻāϰ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡āĨ¤ āĻ…āĻ¨ā§āϝāĻĻāĻŋāϕ⧇, āĻāϟāĻŋ āĻāĻ–āύāĻ“ āĻŽāĻžāωāϏ āĻ•ā§āϞāĻŋāϕ⧇ āĻĢāĻŋāϰ⧇ āφāϏāĻž, āϝāĻž āĻĨ⧇āϕ⧇ āφāĻŽāϰāĻž (āφāĻŽāĻŋ) āĻĒāϰāĻŋāĻ¤ā§āϰāĻžāĻŖ āĻĒ⧇āϤ⧇ āĻšā§‡āϝāĻŧ⧇āĻ›āĻŋāϞāĻžāĻŽāĨ¤

āϏāĻ‚āϝ⧋āϗ⧇āϰ āϏāĻžāĻĨ⧇ āĻ•āĻžāϜ āĻ•āϰāĻž āĻ•āĻžāϜāϗ⧁āϞāĻŋāϰ āĻŽāĻ§ā§āϝ⧇ āĻāĻ•āϟāĻŋ āĻšā§āĻ•. āϏāĻžāϧāĻžāϰāĻŖāĻ­āĻžāĻŦ⧇, āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻšā§āĻ•āϗ⧁āϞāĻŋ āĻāϟāĻŋāϕ⧇ āϤ⧃āϤ⧀āϝāĻŧ āĻĒāĻ•ā§āώ⧇āϰ āĻĒāϰāĻŋāώ⧇āĻŦāĻž āĻāĻŦāĻ‚ āϞāĻžāχāĻŦā§āϰ⧇āϰāĻŋāϰ āϏāĻžāĻĨ⧇ āϏāĻ‚āϝ⧁āĻ•ā§āϤ āĻ•āϰāĻžāϰ āϜāĻ¨ā§āϝ āĻĒāϝāĻŧ⧇āĻ¨ā§āϟāĨ¤ āϝ⧇āĻŽāύ, JiraHook āϜāĻŋāϰāĻžāϰ āϏāĻžāĻĨ⧇ āϝ⧋āĻ—āĻžāϝ⧋āĻ— āĻ•āϰāĻžāϰ āϜāĻ¨ā§āϝ āφāĻŽāĻžāĻĻ⧇āϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āĻ•ā§āϞāĻžāϝāĻŧ⧇āĻ¨ā§āϟ āϖ⧁āϞāĻŦ⧇ (āφāĻĒāύāĻŋ āĻ•āĻžāϜāϗ⧁āϞāĻŋ āϏāĻžāĻŽāύ⧇ āĻāĻŦāĻ‚ āĻĒāĻŋāĻ›āύ⧇ āϏāϰāĻžāϤ⧇ āĻĒāĻžāϰ⧇āύ), āĻāĻŦāĻ‚ āĻāϰ āϏāĻžāĻšāĻžāĻ¯ā§āϝ⧇ SambaHook āφāĻĒāύāĻŋ āĻāĻ•āϟāĻŋ āĻ¸ā§āĻĨāĻžāύ⧀āϝāĻŧ āĻĢāĻžāχāϞ āϧāĻžāĻ•ā§āĻ•āĻž āĻĻāĻŋāϤ⧇ āĻĒāĻžāϰ⧇āύ smb-āĻŦāĻŋāĻ¨ā§āĻĻ⧁āĨ¤

āĻ•āĻžāĻ¸ā§āϟāĻŽ āĻ…āĻĒāĻžāϰ⧇āϟāϰ āĻĒāĻžāĻ°ā§āϏāĻŋāĻ‚

āĻāĻŦāĻ‚ āφāĻŽāϰāĻž āĻāϟāĻŋ āĻ•āĻŋāĻ­āĻžāĻŦ⧇ āϤ⧈āϰāĻŋ āĻ•āϰāĻž āĻšāϝāĻŧ āϤāĻž āĻĻ⧇āĻ–āϤ⧇ āĻ•āĻžāĻ›āĻžāĻ•āĻžāĻ›āĻŋ āĻĒ⧇āϝāĻŧ⧇āĻ›āĻŋāϞāĻžāĻŽ TelegramBotSendMessage

āϕ⧋āĻĄ commons/operators.py āĻĒā§āϰāĻ•ā§ƒāϤ āĻ…āĻĒāĻžāϰ⧇āϟāϰ⧇āϰ āϏāĻžāĻĨ⧇:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

āĻāĻ–āĻžāύ⧇, āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋āϤ⧇ āĻ…āĻ¨ā§āϝ āϏāĻŦāĻ•āĻŋāϛ⧁āϰ āĻŽāϤ⧋, āϏāĻŦāĻ•āĻŋāϛ⧁ āϖ⧁āĻŦ āϏāĻšāϜ:

  • āĻĨ⧇āϕ⧇ āωāĻ¤ā§āϤāϰāĻžāϧāĻŋāĻ•āĻžāϰāϏ⧂āĻ¤ā§āϰ⧇ āĻĒā§āϰāĻžāĻĒā§āϤ BaseOperator, āϝāĻž āĻŦ⧇āĻļ āĻ•āϝāĻŧ⧇āĻ•āϟāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋-āύāĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āϟ āϜāĻŋāύāĻŋāϏ āĻĒā§āϰāϝāĻŧā§‹āĻ— āĻ•āϰ⧇ (āφāĻĒāύāĻžāϰ āĻ…āĻŦāϏāϰ⧇āϰ āĻĻāĻŋāϕ⧇ āϤāĻžāĻ•āĻžāύ)
  • āĻ˜ā§‹āώāĻŋāϤ āĻ•ā§āώ⧇āĻ¤ā§āϰ template_fields, āϝ⧇āĻ–āĻžāύ⧇ āϜāĻŋāύāϜāĻž āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻž āĻ•āϰāĻžāϰ āϜāĻ¨ā§āϝ āĻŽā§āϝāĻžāĻ•ā§āϰ⧋āϗ⧁āϞāĻŋ āϏāĻ¨ā§āϧāĻžāύ āĻ•āϰāĻŦ⧇⧎
  • āϜāĻ¨ā§āϝ āϏāĻ āĻŋāĻ• āϝ⧁āĻ•ā§āϤāĻŋ āϏāĻžāϜāĻŋāϝāĻŧ⧇āϛ⧇āύ __init__(), āϝ⧇āĻ–āĻžāύ⧇ āĻĒā§āϰāϝāĻŧā§‹āϜāύ āϏ⧇āĻ–āĻžāύ⧇ āĻĄāĻŋāĻĢāĻ˛ā§āϟ āϏ⧇āϟ āĻ•āϰ⧁āύāĨ¤
  • āφāĻŽāϰāĻž āĻĒā§‚āĻ°ā§āĻŦāĻĒ⧁āϰ⧁āώ⧇āϰ āϏ⧂āϚāύāĻž āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇āĻ“ āϭ⧁āϞ⧇ āϝāĻžāχāύāĻŋāĨ¤
  • āϏāĻ‚āĻļā§āϞāĻŋāĻˇā§āϟ āĻšā§āĻ• āϖ⧁āϞāϞ⧇āύ TelegramBotHookāĻāϟāĻŋ āĻĨ⧇āϕ⧇ āĻāĻ•āϟāĻŋ āĻ•ā§āϞāĻžāϝāĻŧ⧇āĻ¨ā§āϟ āĻ…āĻŦāĻœā§‡āĻ•ā§āϟ āĻĒ⧇āϝāĻŧ⧇āĻ›āĻŋāĨ¤
  • āĻ“āĻ­āĻžāϰāϰāĻžāχāĻĄ (āĻĒ⧁āύāϰāĻžāϝāĻŧ āϏāĻ‚āĻœā§āĻžāĻžāϝāĻŧāĻŋāϤ) āĻĒāĻĻā§āϧāϤāĻŋ BaseOperator.execute(), āĻ…āĻĒāĻžāϰ⧇āϟāϰ āϚāĻžāϞ⧁ āĻ•āϰāĻžāϰ āϏāĻŽāϝāĻŧ āĻāϞ⧇ āϝ⧇ āĻāϝāĻŧāĻžāϰāĻĢā§‹ āϟ⧁āχāϚ āĻ•āϰāĻŦ⧇ - āĻāϤ⧇ āφāĻŽāϰāĻž āϞāĻ— āχāύ āĻ•āϰāϤ⧇ āϭ⧁āϞ⧇ āĻ—āĻŋāϝāĻŧ⧇ āĻŽā§‚āϞ āĻ•ā§āϰāĻŋāϝāĻŧāĻžāϟāĻŋ āĻŦāĻžāĻ¸ā§āϤāĻŦāĻžāϝāĻŧāύ āĻ•āϰāĻŦāĨ¤ (āφāĻŽāϰāĻž āϞāĻ— āχāύ āĻ•āϰāĻŋ, āϝāĻžāχāĻšā§‹āĻ•, āĻĄāĻžāύ āχāύ stdout и stderr - āĻŦāĻžāϝāĻŧ⧁āĻĒā§āϰāĻŦāĻžāĻš āϏāĻŦāĻ•āĻŋāϛ⧁āϕ⧇ āφāϟāĻ•āĻžāĻŦ⧇, āϏ⧁āĻ¨ā§āĻĻāϰāĻ­āĻžāĻŦ⧇ āĻŽā§‹āĻĄāĻŧāĻžāύ⧋ āĻšāĻŦ⧇, āĻĒā§āϰāϝāĻŧā§‹āϜāύ⧇ āĻāϟāĻŋ āĻĒāϚāĻŋāϝāĻŧ⧇ āĻĢ⧇āϞāĻŦ⧇āĨ¤)

āĻĻ⧇āĻ–āĻž āϝāĻžāĻ• āφāĻŽāĻžāĻĻ⧇āϰ āĻ•āĻŋ āφāϛ⧇ commons/hooks.py. āĻĢāĻžāχāϞ⧇āϰ āĻĒā§āϰāĻĨāĻŽ āĻ…āĻ‚āĻļ, āĻšā§āĻ• āύāĻŋāĻœā§‡āχ:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

āφāĻŽāĻŋ āĻāĻ–āĻžāύ⧇ āϕ⧀ āĻŦā§āϝāĻžāĻ–ā§āϝāĻž āĻ•āϰāĻŦ āϤāĻžāĻ“ āϜāĻžāύāĻŋ āύāĻž, āφāĻŽāĻŋ āĻļ⧁āϧ⧁ āϗ⧁āϰ⧁āĻ¤ā§āĻŦāĻĒā§‚āĻ°ā§āĻŖ āĻĒāϝāĻŧ⧇āĻ¨ā§āϟāϗ⧁āϞāĻŋ āύ⧋āϟ āĻ•āϰāĻŦ:

  • āφāĻŽāϰāĻž āωāĻ¤ā§āϤāϰāĻžāϧāĻŋāĻ•āĻžāϰāϏ⧂āĻ¤ā§āϰ⧇ āĻĒāĻžāχ, āϝ⧁āĻ•ā§āϤāĻŋāϗ⧁āϞāĻŋ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āϚāĻŋāĻ¨ā§āϤāĻž āĻ•āϰāĻŋ - āĻŦ⧇āĻļāĻŋāϰāĻ­āĻžāĻ— āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇āχ āĻāϟāĻŋ āĻāĻ• āĻšāĻŦ⧇: conn_id;
  • āĻ“āĻ­āĻžāϰāϰāĻžāχāĻĄāĻŋāĻ‚ āĻ¸ā§āĻŸā§āϝāĻžāĻ¨ā§āĻĄāĻžāĻ°ā§āĻĄ āĻĒāĻĻā§āϧāϤāĻŋ: āφāĻŽāĻŋ āύāĻŋāĻœā§‡āϕ⧇ āϏ⧀āĻŽāĻŋāϤ āĻ•āϰ⧇āĻ›āĻŋ get_conn(), āϝāĻžāϰ āĻŽāĻ§ā§āϝ⧇ āφāĻŽāĻŋ āύāĻžāĻŽ āĻĻā§āĻŦāĻžāϰāĻž āϏāĻ‚āϝ⧋āϗ⧇āϰ āĻĒāϰāĻžāĻŽāĻŋāϤāĻŋāϗ⧁āϞāĻŋ āĻĒāĻžāχ āĻāĻŦāĻ‚ āϕ⧇āĻŦāϞ āĻŦāĻŋāĻ­āĻžāĻ—āϟāĻŋ āĻĒāĻžāχ extra (āĻāϟāĻŋ āĻāĻ•āϟāĻŋ JSON āĻ•ā§āώ⧇āĻ¤ā§āϰ), āϝ⧇āĻ–āĻžāύ⧇ āφāĻŽāĻŋ (āφāĻŽāĻžāϰ āύāĻŋāĻœā§‡āϰ āύāĻŋāĻ°ā§āĻĻ⧇āĻļ āĻ…āύ⧁āϏāĻžāϰ⧇!) āĻŸā§‡āϞāĻŋāĻ—ā§āϰāĻžāĻŽ āĻŦāϟ āĻŸā§‹āϕ⧇āύ āϰ⧇āϖ⧇āĻ›āĻŋ: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • āφāĻŽāĻŋ āφāĻŽāĻžāĻĻ⧇āϰ āĻāĻ•āϟāĻŋ āωāĻĻāĻžāĻšāϰāĻŖ āϤ⧈āϰāĻŋ TelegramBot, āĻāϟāĻŋ āĻāĻ•āϟāĻŋ āύāĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āϟ āĻŸā§‹āϕ⧇āύ āĻĒā§āϰāĻĻāĻžāύ.

āĻāĻ–āĻžāύ⧇āχ āĻļ⧇āώ. āφāĻĒāύāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇ āĻāĻ•āϟāĻŋ āĻšā§āĻ• āĻĨ⧇āϕ⧇ āĻāĻ•āϟāĻŋ āĻ•ā§āϞāĻžāϝāĻŧ⧇āĻ¨ā§āϟ āĻĒ⧇āϤ⧇ āĻĒāĻžāϰ⧇āύ TelegramBotHook().clent āĻŦāĻž TelegramBotHook().get_conn().

āĻāĻŦāĻ‚ āĻĢāĻžāχāϞ⧇āϰ āĻĻā§āĻŦāĻŋāϤ⧀āϝāĻŧ āĻ…āĻ‚āĻļ, āϝ⧇āĻ–āĻžāύ⧇ āφāĻŽāĻŋ āĻŸā§‡āϞāĻŋāĻ—ā§āϰāĻžāĻŽ REST API-āĻāϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āĻŽāĻžāχāĻ•ā§āϰ⧋āĻ“āĻ°ā§āϝāĻžāĻĒāĻžāϰ āϤ⧈āϰāĻŋ āĻ•āϰāĻŋ, āϝāĻžāϤ⧇ āĻāĻ•āχ āĻŸā§‡āύ⧇ āύāĻž āφāύ⧇ python-telegram-bot āĻāĻ•āϟāĻŋ āĻĒāĻĻā§āϧāϤāĻŋāϰ āϜāĻ¨ā§āϝ sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

āϏāĻ āĻŋāĻ• āωāĻĒāĻžāϝāĻŧ āĻšāϞ āĻāϟāĻŋ āϏāĻŦ āϝ⧋āĻ— āĻ•āϰāĻž: TelegramBotSendMessage, TelegramBotHook, TelegramBot - āĻĒā§āϞāĻžāĻ—āχāύ⧇, āĻāĻ•āϟāĻŋ āĻĒāĻžāĻŦāϞāĻŋāĻ• āϰāĻŋāĻĒā§‹āϜāĻŋāϟāϰāĻŋ āϰāĻžāϖ⧁āύ āĻāĻŦāĻ‚ āĻ“āĻĒ⧇āύ āϏ⧋āĻ°ā§āϏ⧇ āĻĻāĻŋāύāĨ¤

āϝāĻ–āύ āφāĻŽāϰāĻž āĻāχ āϏāĻŦ āĻ…āĻ§ā§āϝāϝāĻŧāύ āĻ•āϰāĻ›āĻŋāϞāĻžāĻŽ, āφāĻŽāĻžāĻĻ⧇āϰ āϰāĻŋāĻĒā§‹āĻ°ā§āϟ āφāĻĒāĻĄā§‡āϟ āϏāĻĢāϞāĻ­āĻžāĻŦ⧇ āĻŦā§āϝāĻ°ā§āĻĨ āĻšāϝāĻŧ⧇āϛ⧇ āĻāĻŦāĻ‚ āφāĻŽāĻžāϕ⧇ āĻšā§āϝāĻžāύ⧇āϞ⧇ āĻāĻ•āϟāĻŋ āĻ¤ā§āϰ⧁āϟāĻŋ āĻŦāĻžāĻ°ā§āϤāĻž āĻĒāĻžāĻ āĻžāϤ⧇ āϏāĻ•ā§āώāĻŽ āĻšāϝāĻŧ⧇āϛ⧇⧎ āφāĻŽāĻŋ āĻāϟāĻž āϭ⧁āϞ āĻ•āĻŋāύāĻž āĻĻ⧇āĻ–āϤ⧇ āĻšā§‡āĻ• āĻ•āϰāϤ⧇ āϝāĻžāĻšā§āĻ›āĻŋ...

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž
āφāĻŽāĻžāĻĻ⧇āϰ āϕ⧁āϕ⧁āϰ⧇āϰ āĻŽāĻ§ā§āϝ⧇ āĻ•āĻŋāϛ⧁ āϭ⧇āϙ⧇ āϗ⧇āϛ⧇! āĻāϟāĻžāχ āĻ•āĻŋ āφāĻŽāϰāĻž āφāĻļāĻž āĻ•āϰāĻ›āĻŋāϞāĻžāĻŽ āύāĻž? āĻšā§āĻŦāĻšā§ !

āφāĻĒāύāĻŋ āĻĸāĻžāϞāĻž āϝāĻžāĻšā§āϛ⧇āύ?

āϤ⧋āĻŽāĻžāϰ āĻ•āĻŋ āĻŽāύ⧇ āĻšāϝāĻŧ āφāĻŽāĻŋ āĻ•āĻŋāϛ⧁ āĻŽāĻŋāϏ āĻ•āϰ⧇āĻ›āĻŋ? āĻŽāύ⧇ āĻšāĻšā§āϛ⧇ āϤāĻŋāύāĻŋ āĻāϏāĻ•āĻŋāωāĻāϞ āϏāĻžāĻ°ā§āĻ­āĻžāϰ āĻĨ⧇āϕ⧇ āĻ­āĻžāĻ°ā§āϟāĻŋāĻ•āĻžāϝāĻŧ āĻĄā§‡āϟāĻž āĻ¸ā§āĻĨāĻžāύāĻžāĻ¨ā§āϤāϰ āĻ•āϰāĻžāϰ āĻĒā§āϰāϤāĻŋāĻļā§āϰ⧁āϤāĻŋ āĻĻāĻŋāϝāĻŧ⧇āĻ›āĻŋāϞ⧇āύ, āĻāĻŦāĻ‚ āϤāĻžāϰāĻĒāϰ⧇ āϤāĻŋāύāĻŋ āϤāĻž āύāĻŋāϝāĻŧ⧇āĻ›āĻŋāϞ⧇āύ āĻāĻŦāĻ‚ āĻŦāĻŋāώāϝāĻŧāϟāĻŋ āĻĨ⧇āϕ⧇ āϏāϰ⧇ āĻ—āĻŋāϝāĻŧ⧇āĻ›āĻŋāϞ⧇āύ, āĻŦāĻ–āĻžāĻŸā§‡!

āĻāχ āύ⧃āĻļāĻ‚āϏāϤāĻž āχāĻšā§āĻ›āĻžāĻ•ā§ƒāϤ āĻ›āĻŋāϞ, āφāĻŽāĻžāϕ⧇ āϕ⧇āĻŦāϞ āφāĻĒāύāĻžāϰ āϜāĻ¨ā§āϝ āĻ•āĻŋāϛ⧁ āĻĒāϰāĻŋāĻ­āĻžāώāĻž āĻŦā§‹āĻāĻžāϤ⧇ āĻšāϝāĻŧ⧇āĻ›āĻŋāϞāĨ¤ āĻāĻ–āύ āφāĻĒāύāĻŋ āφāϰāĻ“ āϝ⧇āϤ⧇ āĻĒāĻžāϰ⧇āύāĨ¤

āφāĻŽāĻžāĻĻ⧇āϰ āĻĒāϰāĻŋāĻ•āĻ˛ā§āĻĒāύāĻž āĻ›āĻŋāϞ āĻāχ:

  1. āĻĻāĻžāĻ— āĻ•āϰ
  2. āĻ•āĻžāϜ āϤ⧈āϰāĻŋ āĻ•āϰ⧁āύ
  3. āĻĻ⧇āϖ⧁āύ āϏāĻŦāĻ•āĻŋāϛ⧁ āĻ•āϤ āϏ⧁āĻ¨ā§āĻĻāϰ
  4. āĻĒā§‚āϰāĻŖ āĻ•āϰāĻžāϰ āϜāĻ¨ā§āϝ āϏ⧇āĻļāύ āύāĻŽā§āĻŦāϰ āĻŦāϰāĻžāĻĻā§āĻĻ āĻ•āϰ⧁āύ
  5. SQL āϏāĻžāĻ°ā§āĻ­āĻžāϰ āĻĨ⧇āϕ⧇ āĻĄā§‡āϟāĻž āĻĒāĻžāύ
  6. āĻ­āĻžāĻ°ā§āϟāĻŋāĻ•āĻžāϝāĻŧ āĻĄā§‡āϟāĻž āϰāĻžāϖ⧁āύ
  7. āĻĒāϰāĻŋāϏāĻ‚āĻ–ā§āϝāĻžāύ āϏāĻ‚āĻ—ā§āϰāĻš āĻ•āϰ⧁āύ

āϏ⧁āϤāϰāĻžāĻ‚, āĻāχ āϏāĻŦ āĻĒ⧇āϤ⧇ āĻāĻŦāĻ‚ āϚāϞāĻŽāĻžāύ, āφāĻŽāĻŋ āφāĻŽāĻžāĻĻ⧇āϰ āĻāĻ•āϟāĻŋ āϛ⧋āϟ āϏāĻ‚āϝ⧋āϜāύ āĻ•āϰāĻž docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

āϏ⧇āĻ–āĻžāύ⧇ āφāĻŽāϰāĻž āωāĻ¤ā§āĻĨāĻžāĻĒāύ āĻ•āϰāĻŋ:

  • āĻšā§‹āĻ¸ā§āϟ āĻšāĻŋāϏāĻžāĻŦ⧇ āĻ­āĻžāĻ°ā§āϟāĻŋāĻ•āĻž dwh āϏāĻŦāĻšā§‡āϝāĻŧ⧇ āĻĄāĻŋāĻĢāĻ˛ā§āϟ āϏ⧇āϟāĻŋāĻ‚āϏ āϏāĻš,
  • SQL āϏāĻžāĻ°ā§āĻ­āĻžāϰ⧇āϰ āϤāĻŋāύāϟāĻŋ āωāĻĻāĻžāĻšāϰāĻŖ,
  • āφāĻŽāϰāĻž āĻĒāϰāĻŦāĻ°ā§āϤ⧀āϤ⧇ āĻ•āĻŋāϛ⧁ āĻĄā§‡āϟāĻž āĻĻāĻŋāϝāĻŧ⧇ āĻĄā§‡āϟāĻžāĻŦ⧇āϏāϗ⧁āϞāĻŋ āĻĒā§‚āϰāĻŖ āĻ•āϰāĻŋ (āϕ⧋āύ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇āχ āϤāĻž āĻĻ⧇āĻ–āĻŦ⧇āύ āύāĻž mssql_init.py!)

āφāĻŽāϰāĻž āĻ—āϤāĻŦāĻžāϰ⧇āϰ āĻšā§‡āϝāĻŧ⧇ āĻ•āĻŋāϛ⧁āϟāĻž āϜāϟāĻŋāϞ āĻ•āĻŽāĻžāĻ¨ā§āĻĄā§‡āϰ āϏāĻžāĻšāĻžāĻ¯ā§āϝ⧇ āϏāĻŽāĻ¸ā§āϤ āĻ­āĻžāϞ āϚāĻžāϞ⧁ āĻ•āϰāĻŋ:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

āφāĻŽāĻžāĻĻ⧇āϰ āĻ…āϞ⧌āĻ•āĻŋāĻ• āĻ°ā§āϝāĻžāĻ¨ā§āĻĄāĻŽāĻžāχāϜāĻžāϰ āĻ•āĻŋ āϤ⧈āϰāĻŋ āĻ•āϰ⧇āϛ⧇, āφāĻĒāύāĻŋ āφāχāĻŸā§‡āĻŽāϟāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ Data Profiling/Ad Hoc Query:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž
āĻŽā§‚āϞ āϜāĻŋāύāĻŋāϏāϟāĻŋ āĻŦāĻŋāĻļā§āϞ⧇āώāĻ•āĻĻ⧇āϰ āĻ•āĻžāϛ⧇ āĻāϟāĻŋ āĻĻ⧇āĻ–āĻžāύ⧋ āύāϝāĻŧ

āϏāĻŽā§āĻĒā§āϰāϏāĻžāϰāĻŋāϤ ETL āϏ⧇āĻļāύ āφāĻŽāĻŋ āĻ•āϰāĻŦ āύāĻž, āϏ⧇āĻ–āĻžāύ⧇ āϏāĻŦāĻ•āĻŋāϛ⧁āχ āϤ⧁āĻšā§āĻ›: āφāĻŽāϰāĻž āĻāĻ•āϟāĻŋ āĻ­āĻŋāĻ¤ā§āϤāĻŋ āϤ⧈āϰāĻŋ āĻ•āϰāĻŋ, āĻāϤ⧇ āĻāĻ•āϟāĻŋ āϚāĻŋāĻšā§āύ āϰāϝāĻŧ⧇āϛ⧇, āφāĻŽāϰāĻž āĻāĻ•āϟāĻŋ āĻĒā§āϰāϏāĻ™ā§āĻ— āĻĒāϰāĻŋāϚāĻžāϞāϕ⧇āϰ āϏāĻžāĻĨ⧇ āϏāĻŦāĻ•āĻŋāϛ⧁ āĻŽā§āĻĄāĻŧ⧇ āĻĢ⧇āϞāĻŋ āĻāĻŦāĻ‚ āĻāĻ–āύ āφāĻŽāϰāĻž āĻāϟāĻŋ āĻ•āϰāĻŋ:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

āϏāĻŽāϝāĻŧ āĻāϏ⧇āϛ⧇ āφāĻŽāĻžāĻĻ⧇āϰ āϤāĻĨā§āϝ āϏāĻ‚āĻ—ā§āϰāĻš āĻ•āϰ⧁āύ āφāĻŽāĻžāĻĻ⧇āϰ āĻĻ⧇āĻĄāĻŧ āĻļ āĻŸā§‡āĻŦāĻŋāϞ āĻĨ⧇āϕ⧇āĨ¤ āφāϏ⧁āύ āϖ⧁āĻŦ āύāϜāĻŋāϰāĻŦāĻŋāĻšā§€āύ āϞāĻžāχāύ⧇āϰ āϏāĻžāĻšāĻžāĻ¯ā§āϝ⧇ āĻāϟāĻŋ āĻ•āϰāĻŋ:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. āĻāĻ•āϟāĻŋ āĻšā§āϕ⧇āϰ āϏāĻžāĻšāĻžāĻ¯ā§āϝ⧇ āφāĻŽāϰāĻž āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻĨ⧇āϕ⧇ āĻĒāĻžāχ pymssql- āϏāĻ‚āϝ⧋āĻ— āĻ•āϰ⧁āύ
  2. āφāϏ⧁āύ āĻ…āύ⧁āϰ⧋āϧ⧇ āϤāĻžāϰāĻŋāϖ⧇āϰ āφāĻ•āĻžāϰ⧇ āĻāĻ•āϟāĻŋ āϏ⧀āĻŽāĻžāĻŦāĻĻā§āϧāϤāĻž āĻĒā§āϰāϤāĻŋāĻ¸ā§āĻĨāĻžāĻĒāύ āĻ•āϰāĻŋ - āĻāϟāĻŋ āĻŸā§‡āĻŽāĻĒā§āϞ⧇āϟ āχāĻžā§āϜāĻŋāύ āĻĻā§āĻŦāĻžāϰāĻž āĻĢāĻžāĻ‚āĻļāύ⧇ āύāĻŋāĻ•ā§āώ⧇āĻĒ āĻ•āϰāĻž āĻšāĻŦ⧇āĨ¤
  3. āφāĻŽāĻžāĻĻ⧇āϰ āĻ…āύ⧁āϰ⧋āϧ āĻ–āĻžāĻ“āϝāĻŧāĻžāύ⧋ pandasāϕ⧇ āφāĻŽāĻžāĻĻ⧇āϰ āĻĒāĻžāĻŦ⧇ DataFrame - āĻāϟāĻž āĻ­āĻŦāĻŋāĻˇā§āϝāϤ⧇ āφāĻŽāĻžāĻĻ⧇āϰ āϜāĻ¨ā§āϝ āĻĻāϰāĻ•āĻžāϰ⧀ āĻšāĻŦ⧇.

āφāĻŽāĻŋ āĻĒā§āϰāϤāĻŋāĻ¸ā§āĻĨāĻžāĻĒāύ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāĻ›āĻŋ {dt} āĻāĻ•āϟāĻŋ āĻ…āύ⧁āϰ⧋āϧ āĻĒāϰāĻžāĻŽāĻŋāϤāĻŋ āĻĒāϰāĻŋāĻŦāĻ°ā§āϤ⧇ %s āĻāχ āϜāĻ¨ā§āϝ āύāϝāĻŧ āϝ⧇ āφāĻŽāĻŋ āĻāĻ•āϜāύ āĻĻ⧁āĻˇā§āϟ āĻĒāĻŋāύ⧋āĻ•āĻŋāĻ“, āĻ•āĻŋāĻ¨ā§āϤ⧁ āĻ•āĻžāϰāĻŖ pandas āϏāĻžāĻŽāϞāĻžāϤ⧇ āĻĒāĻžāϰ⧇ āύāĻž pymssql āĻāĻŦāĻ‚ āĻļ⧇āώ āĻāĻ•āϟāĻŋ āĻ¸ā§āϞāĻŋāĻĒ params: ListāϝāĻĻāĻŋāĻ“ āϏ⧇ āϏāĻ¤ā§āϝāĻŋāχ āϚāĻžāϝāĻŧ tuple.
āĻāĻ›āĻžāĻĄāĻŧāĻžāĻ“ āĻŦāĻŋāĻ•āĻžāĻļāĻ•āĻžāϰ⧀ āύ⧋āϟ āĻ•āϰ⧁āύ pymssql āϤāĻžāϕ⧇ āφāϰ āϏāĻŽāĻ°ā§āĻĨāύ āύāĻž āĻ•āϰāĻžāϰ āϏāĻŋāĻĻā§āϧāĻžāĻ¨ā§āϤ āύāĻŋāϝāĻŧ⧇āϛ⧇, āĻāĻŦāĻ‚ āĻāϟāĻŋ āϏāϰāĻžāύ⧋āϰ āϏāĻŽāϝāĻŧ pyodbc.

āφāϏ⧁āύ āĻĻ⧇āĻ–āĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āφāĻŽāĻžāĻĻ⧇āϰ āĻĢāĻžāĻ‚āĻļāύāϗ⧁āϞāĻŋāϰ āφāĻ°ā§āϗ⧁āĻŽā§‡āĻ¨ā§āϟāϗ⧁āϞāĻŋāϕ⧇ āϕ⧀ āĻĻāĻŋāϝāĻŧ⧇ āĻ¸ā§āϟāĻžāĻĢ āĻ•āϰ⧇āϛ⧇:

āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋: āχāϟāĻŋāĻāϞāϕ⧇ āφāϰāĻ“ āϏāĻšāϜ āĻ•āϰāĻž

āϝāĻĻāĻŋ āϕ⧋āύ āϤāĻĨā§āϝ āύāĻž āĻĨāĻžāϕ⧇, āϤāĻžāĻšāϞ⧇ āϚāĻžāϞāĻŋāϝāĻŧ⧇ āϝāĻžāĻ“āϝāĻŧāĻžāϰ āϕ⧋āύ āĻŽāĻžāύ⧇ āύ⧇āχāĨ¤ āĻ•āĻŋāĻ¨ā§āϤ⧁ āĻāϟāĻŋ āĻĒā§‚āϰāĻŖ āĻ•āϰāĻž āϏāĻĢāϞ āĻŦāĻŋāĻŦ⧇āϚāύāĻž āĻ•āϰāĻžāĻ“ āĻ…āĻĻā§āϭ⧁āϤāĨ¤ āϤāĻŦ⧇ āĻāϟāĻŋ āĻāĻ•āϟāĻŋ āϭ⧁āϞ āύāϝāĻŧāĨ¤ āφ-āφāĻš-āφāĻš, āĻ•āĻŋ āĻ•āϰāĻŦ?! āĻāĻŦāĻ‚ āĻāĻ–āĻžāύ⧇ āĻ•āĻŋ:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow āϕ⧇ āĻŦāϞāĻŦ⧇ āϝ⧇ āϕ⧋āύ āĻ¤ā§āϰ⧁āϟāĻŋ āύ⧇āχ, āĻ•āĻŋāĻ¨ā§āϤ⧁ āφāĻŽāϰāĻž āĻ•āĻžāϜāϟāĻŋ āĻāĻĄāĻŧāĻŋāϝāĻŧ⧇ āϝāĻžāχāĨ¤ āχāĻ¨ā§āϟāĻžāϰāĻĢ⧇āϏ⧇ āϏāĻŦ⧁āϜ āĻŦāĻž āϞāĻžāϞ āĻŦāĻ°ā§āĻ—āĻ•ā§āώ⧇āĻ¤ā§āϰ āĻĨāĻžāĻ•āĻŦ⧇ āύāĻž, āĻ•āĻŋāĻ¨ā§āϤ⧁ āĻ—ā§‹āϞāĻžāĻĒā§€āĨ¤

āφāϏ⧁āύ āφāĻŽāĻžāĻĻ⧇āϰ āĻĄā§‡āϟāĻž āϟāϏ āĻ•āϰāĻŋ āĻāĻ•āĻžāϧāĻŋāĻ• āĻ•āϞāĻžāĻŽ:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

āϝāĻĨāĻž:

  • āϝ⧇ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ āĻĨ⧇āϕ⧇ āφāĻŽāϰāĻž āĻ…āĻ°ā§āĻĄāĻžāϰ āύāĻŋāϝāĻŧ⧇āĻ›āĻŋāϞāĻžāĻŽ,
  • āφāĻŽāĻžāĻĻ⧇āϰ āĻŦāĻ¨ā§āϝāĻž āϏ⧇āĻļāύ⧇āϰ āφāχāĻĄāĻŋ (āĻāϟāĻŋ āĻ­āĻŋāĻ¨ā§āύ āĻšāĻŦ⧇ āĻĒā§āϰāϤāĻŋāϟāĻŋ āĻ•āĻžāĻœā§‡āϰ āϜāĻ¨ā§āϝ),
  • āωāĻ¤ā§āϏ āĻāĻŦāĻ‚ āĻ…āĻ°ā§āĻĄāĻžāϰ āφāχāĻĄāĻŋ āĻĨ⧇āϕ⧇ āĻāĻ•āϟāĻŋ āĻšā§āϝāĻžāĻļ - āϝāĻžāϤ⧇ āĻšā§‚āĻĄāĻŧāĻžāĻ¨ā§āϤ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ⧇ (āϝ⧇āĻ–āĻžāύ⧇ āϏāĻŦāĻ•āĻŋāϛ⧁ āĻāĻ•āϟāĻŋ āĻŸā§‡āĻŦāĻŋāϞ⧇ āĻĸ⧇āϞ⧇ āĻĻ⧇āĻ“āϝāĻŧāĻž āĻšāϝāĻŧ) āφāĻŽāĻžāĻĻ⧇āϰ āĻāĻ•āϟāĻŋ āĻ…āύāĻ¨ā§āϝ āĻ…āĻ°ā§āĻĄāĻžāϰ āφāχāĻĄāĻŋ āϰāϝāĻŧ⧇āϛ⧇āĨ¤

āĻļ⧇āώ āϧāĻžāĻĒ āĻŦāĻžāĻ•āĻŋ: āĻ­āĻžāĻ°ā§āϟāĻŋāĻ•āĻžāϰ āĻŽāĻ§ā§āϝ⧇ āϏāĻŦāĻ•āĻŋāϛ⧁ āĻĸāĻžāϞāĻž. āĻāĻŦāĻ‚, āĻ…āĻĻā§āϭ⧁āϤāĻ­āĻžāĻŦ⧇ āϝāĻĨ⧇āĻˇā§āϟ, āĻāϟāĻŋ āĻ•āϰāĻžāϰ āϏāĻŦāĻšā§‡āϝāĻŧ⧇ āĻĻāĻ°ā§āĻļāύ⧀āϝāĻŧ āĻāĻŦāĻ‚ āĻ•āĻžāĻ°ā§āϝāĻ•āϰ āωāĻĒāĻžāϝāĻŧāϗ⧁āϞāĻŋāϰ āĻŽāĻ§ā§āϝ⧇ āĻāĻ•āϟāĻŋ āĻšāϞ CSV āĻāϰ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. āφāĻŽāϰāĻž āĻāĻ•āϟāĻŋ āĻŦāĻŋāĻļ⧇āώ āϰāĻŋāϏāĻŋāĻ­āĻžāϰ āϤ⧈āϰāĻŋ āĻ•āϰāĻ›āĻŋ StringIO.
  2. pandas āĻĻāϝāĻŧāĻž āĻ•āϰ⧇ āφāĻŽāĻžāĻĻ⧇āϰ āĻ•āϰāĻž āĻšāĻŦ⧇ DataFrame āφāĻ•āĻžāϰ⧇ CSV-āϞāĻžāχāύ
  3. āφāϏ⧁āύ āĻāĻ•āϟāĻŋ āĻšā§āĻ• āĻĻāĻŋāϝāĻŧ⧇ āφāĻŽāĻžāĻĻ⧇āϰ āĻĒā§āϰāĻŋāϝāĻŧ āĻ­āĻžāĻ°ā§āϟāĻŋāĻ•āĻžāϰ āϏāĻžāĻĨ⧇ āĻāĻ•āϟāĻŋ āϏāĻ‚āϝ⧋āĻ— āϖ⧁āϞāĻŋāĨ¤
  4. āĻāĻŦāĻ‚ āĻāĻ–āύ āϏāĻžāĻšāĻžāĻ¯ā§āϝ⧇āϰ āϏāĻžāĻĨ⧇ copy() āĻ­āĻžāĻ°ā§āϟāĻŋāĻ•āĻžāϝāĻŧ āϏāϰāĻžāϏāϰāĻŋ āφāĻŽāĻžāĻĻ⧇āϰ āĻĄā§‡āϟāĻž āĻĒāĻžāĻ āĻžāύ!

āφāĻŽāϰāĻž āĻĄā§āϰāĻžāχāĻ­āĻžāϰ⧇āϰ āĻ•āĻžāĻ› āĻĨ⧇āϕ⧇ āύ⧇āĻŦ āĻ•āϤāϗ⧁āϞāĻŋ āϞāĻžāχāύ āĻĒā§‚āĻ°ā§āĻŖ āĻšāϝāĻŧ⧇āϛ⧇, āĻāĻŦāĻ‚ āϏ⧇āĻļāύ āĻŽā§āϝāĻžāύ⧇āϜāĻžāϰāϕ⧇ āĻŦāϞāĻŦ āϝ⧇ āϏāĻŦāĻ•āĻŋāϛ⧁ āĻ āĻŋāĻ• āφāϛ⧇:

session.loaded_rows = cursor.rowcount
session.successful = True

āĻāĻ–āĻžāύ⧇āχ āĻļ⧇āώ.

āĻŦāĻŋāĻ•ā§āϰāϝāĻŧ⧇, āφāĻŽāϰāĻž āĻŽā§āϝāĻžāύ⧁āϝāĻŧāĻžāϞāĻŋ āϟāĻžāĻ°ā§āϗ⧇āϟ āĻĒā§āϞ⧇āϟ āϤ⧈āϰāĻŋ āĻ•āϰāĻŋāĨ¤ āĻāĻ–āĻžāύ⧇ āφāĻŽāĻŋ āύāĻŋāĻœā§‡āϕ⧇ āĻāĻ•āϟāĻŋ āϛ⧋āϟ āĻŽā§‡āĻļāĻŋāύ⧇āϰ āĻ…āύ⧁āĻŽāϤāĻŋ āĻĻāĻŋāϞāĻžāĻŽ:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

āφāĻŽāĻŋ āĻŦā§āϝāĻžāĻŦāĻšāĻžāϰ āĻ•āϰāĻ›āĻŋ VerticaOperator() āφāĻŽāĻŋ āĻāĻ•āϟāĻŋ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ āĻ¸ā§āĻ•āĻŋāĻŽāĻž āĻāĻŦāĻ‚ āĻāĻ•āϟāĻŋ āĻŸā§‡āĻŦāĻŋāϞ āϤ⧈āϰāĻŋ āĻ•āϰāĻŋ (āϝāĻĻāĻŋ āϏ⧇āϗ⧁āϞāĻŋ āχāϤāĻŋāĻŽāĻ§ā§āϝ⧇ āĻŦāĻŋāĻĻā§āϝāĻŽāĻžāύ āύāĻž āĻĨāĻžāϕ⧇, āĻ…āĻŦāĻļā§āϝāχ)āĨ¤ āĻĒā§āϰāϧāĻžāύ āϜāĻŋāύāĻŋāϏ āϏāĻ āĻŋāĻ•āĻ­āĻžāĻŦ⧇ āύāĻŋāĻ°ā§āĻ­āϰāϤāĻž āĻŦā§āϝāĻŦāĻ¸ā§āĻĨāĻž āĻ•āϰāĻž āĻšāϝāĻŧ:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

āĻŦ⧁āĻĻā§āϧāĻŋāĻŽāĻžāύ

- āφāĻšā§āĻ›āĻž, - āϛ⧋āϟ āχāρāĻĻ⧁āϰ āĻŦāϞāϞ, - āĻāĻ–āύ āϤāĻžāχ āύāĻž
āφāĻĒāύāĻŋ āĻ•āĻŋ āύāĻŋāĻļā§āϚāĻŋāϤ āϝ⧇ āφāĻŽāĻŋ āĻŦāύ⧇āϰ āϏāĻŦāĻšā§‡āϝāĻŧ⧇ āĻ­āϝāĻŧāĻ™ā§āĻ•āϰ āĻĒā§āϰāĻžāĻŖā§€?

āϜ⧁āϞāĻŋāϝāĻŧāĻž āĻĄā§‹āύāĻžāĻ˛ā§āĻĄāϏāύ, āĻĻā§āϝ āĻ—ā§āϰ⧁āĻĢāĻžāϞ⧋

āφāĻŽāĻŋ āĻŽāύ⧇ āĻ•āϰāĻŋ āϝāĻĻāĻŋ āφāĻŽāĻžāϰ āϏāĻšāĻ•āĻ°ā§āĻŽā§€ āĻāĻŦāĻ‚ āφāĻŽāĻžāϰ āĻŽāĻ§ā§āϝ⧇ āĻāĻ•āϟāĻŋ āĻĒā§āϰāϤāĻŋāϝ⧋āĻ—āĻŋāϤāĻž āĻĨāĻžāĻ•āϤ: āϕ⧇ āĻĻā§āϰ⧁āϤ āĻ¸ā§āĻ•ā§āĻ°ā§āϝāĻžāϚ āĻĨ⧇āϕ⧇ āĻāĻ•āϟāĻŋ ETL āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻž āϤ⧈āϰāĻŋ āĻ•āϰāĻŦ⧇ āĻāĻŦāĻ‚ āϚāĻžāϞ⧁ āĻ•āϰāĻŦ⧇: āϤāĻžāϰāĻž āϤāĻžāĻĻ⧇āϰ SSIS āĻāĻŦāĻ‚ āĻāĻ•āϟāĻŋ āĻŽāĻžāωāϏ āĻāĻŦāĻ‚ āφāĻŽāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āϏāĻš ... āĻāĻŦāĻ‚ āϤāĻžāϰāĻĒāϰ⧇ āφāĻŽāϰāĻž āϰāĻ•ā§āώāĻŖāĻžāĻŦ⧇āĻ•ā§āώāϪ⧇āϰ āϏāĻšāϜāϤāĻžāϰ āϏāĻžāĻĨ⧇ āϤ⧁āϞāύāĻž āĻ•āϰāĻŦ ... āĻŦāĻžāĻš, āφāĻŽāĻŋ āĻŽāύ⧇ āĻ•āϰāĻŋ āφāĻĒāύāĻŋ āϏāĻŽā§āĻŽāϤ āĻšāĻŦ⧇āύ āϝ⧇ āφāĻŽāĻŋ āϤāĻžāĻĻ⧇āϰ āϏāĻŦ āĻĢā§āϰāĻ¨ā§āĻŸā§‡ āĻĒāϰāĻžāϜāĻŋāϤ āĻ•āϰāĻŦ!

āϝāĻĻāĻŋ āĻāĻ•āϟ⧁ āĻŦ⧇āĻļāĻŋ āϗ⧁āϰ⧁āĻ¤ā§āĻŦ āϏāĻšāĻ•āĻžāϰ⧇, āϤāĻŦ⧇ āĻ…ā§āϝāĻžāĻĒāĻžāϚāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ - āĻĒā§āϰ⧋āĻ—ā§āϰāĻžāĻŽ āϕ⧋āĻĄ āφāĻ•āĻžāϰ⧇ āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻžāϗ⧁āϞāĻŋ āĻŦāĻ°ā§āĻŖāύāĻž āĻ•āϰ⧇ - āφāĻŽāĻžāϰ āĻ•āĻžāϜ āĻ•āϰ⧇āϛ⧇ āĻ…āϧāĻŋāĻ• āφāϰ⧋ āφāϰāĻžāĻŽāĻĻāĻžāϝāĻŧāĻ• āĻāĻŦāĻ‚ āωāĻĒāĻ­ā§‹āĻ—ā§āϝāĨ¤

āĻĒā§āϞāĻžāĻ—-āχāύ āĻāĻŦāĻ‚ āĻ¸ā§āϕ⧇āϞ⧇āĻŦāĻŋāϞāĻŋāϟāĻŋāϰ āĻĒā§āϰāĻŦāĻŖāϤāĻž āωāĻ­āϝāĻŧ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇āχ āĻāϰ āϏ⧀āĻŽāĻžāĻšā§€āύ āĻāĻ•ā§āϏāĻŸā§‡āύāϏāĻŋāĻŦāĻŋāϞāĻŋāϟāĻŋ, āφāĻĒāύāĻžāϕ⧇ āĻĒā§āϰāĻžāϝāĻŧ āϝ⧇āϕ⧋āύ⧋ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇āχ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāĻžāϰ āϏ⧁āϝ⧋āĻ— āĻĻ⧇āϝāĻŧ: āĻāĻŽāύāĻ•āĻŋ āĻĄā§‡āϟāĻž āϏāĻ‚āĻ—ā§āϰāĻš, āĻĒā§āϰāĻ¸ā§āϤ⧁āϤ āĻ“ āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻžāĻ•āϰāϪ⧇āϰ āϏāĻŽā§āĻĒā§‚āĻ°ā§āĻŖ āϚāĻ•ā§āϰ⧇, āĻāĻŽāύāĻ•āĻŋ āϰāϕ⧇āϟ āĻ‰ā§ŽāĻ•ā§āώ⧇āĻĒāϪ⧇āϰ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇āĻ“ (āĻŽāĻ™ā§āĻ—āϞ āĻ—ā§āϰāĻšā§‡, āĻāϰ āĻ…āĻŦāĻļā§āϝāχ)āĨ¤

āĻĒāĻžāĻ°ā§āϟ āĻĢāĻžāχāύāĻžāϞ, āϰ⧇āĻĢāĻžāϰ⧇āĻ¨ā§āϏ āĻāĻŦāĻ‚ āϤāĻĨā§āϝ

āφāĻŽāϰāĻž āφāĻĒāύāĻžāϰ āϜāĻ¨ā§āϝ āϏāĻ‚āĻ—ā§āϰāĻš āĻ•āϰ⧇āĻ›āĻŋ āϰ⧇āĻ•

  • start_date. āĻšā§āϝāĻžāρ, āĻāϟāĻŋ āχāϤāĻŋāĻŽāĻ§ā§āϝ⧇ āĻāĻ•āϟāĻŋ āĻ¸ā§āĻĨāĻžāύ⧀āϝāĻŧ āĻŽā§‡āĻŽāĨ¤ āĻĄāĻ— āĻāϰ āĻĒā§āϰāϧāĻžāύ āϝ⧁āĻ•ā§āϤāĻŋ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡ start_date āϏāĻŦ āĻĒāĻžāϏ. āϏāĻ‚āĻ•ā§āώ⧇āĻĒ⧇, āϝāĻĻāĻŋ āφāĻĒāύāĻŋ āωāĻ˛ā§āϞ⧇āĻ– āĻ•āϰ⧇āύ start_date āĻŦāĻ°ā§āϤāĻŽāĻžāύ āϤāĻžāϰāĻŋāĻ–, āĻāĻŦāĻ‚ schedule_interval - āĻāĻ•āĻĻāĻŋāύ, āϤāĻžāĻšāϞ⧇ āĻĄāĻŋāĻāϜāĻŋ āĻ•āĻžāϞ āĻļ⧁āϰ⧁ āĻšāĻŦ⧇ āφāϗ⧇ āύāĻžāĨ¤
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    āĻāĻŦāĻ‚ āφāϰ āϕ⧋āύ āϏāĻŽāĻ¸ā§āϝāĻž āύ⧇āχāĨ¤

    āĻāϰ āϏāĻžāĻĨ⧇ āϝ⧁āĻ•ā§āϤ āφāϰ⧇āĻ•āϟāĻŋ āϰāĻžāύāϟāĻžāχāĻŽ āĻ¤ā§āϰ⧁āϟāĻŋ āϰāϝāĻŧ⧇āϛ⧇: Task is missing the start_date parameter, āϝāĻž āĻĒā§āϰāĻžāϝāĻŧāĻļāχ āύāĻŋāĻ°ā§āĻĻ⧇āĻļ āĻ•āϰ⧇ āϝ⧇ āφāĻĒāύāĻŋ dag āĻ…āĻĒāĻžāϰ⧇āϟāϰ⧇āϰ āϏāĻžāĻĨ⧇ āφāĻŦāĻĻā§āϧ āĻ•āϰāϤ⧇ āϭ⧁āϞ⧇ āϗ⧇āϛ⧇āύāĨ¤

  • āϏāĻŦ āĻāĻ• āĻŽā§‡āĻļāĻŋāύ⧇āĨ¤ āĻšā§āϝāĻžāρ, āĻāĻŦāĻ‚ āϘāĻžāρāϟāĻŋ (āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āύāĻŋāĻœā§‡āχ āĻāĻŦāĻ‚ āφāĻŽāĻžāĻĻ⧇āϰ āφāĻŦāϰāĻŖ), āĻāĻŦāĻ‚ āĻāĻ•āϟāĻŋ āĻ“āϝāĻŧ⧇āĻŦ āϏāĻžāĻ°ā§āĻ­āĻžāϰ, āĻāĻŦāĻ‚ āĻāĻ•āϟāĻŋ āϏāĻŽāϝāĻŧāϏ⧂āĻšā§€, āĻāĻŦāĻ‚ āĻ•āĻ°ā§āĻŽā§€āϰāĻžāĨ¤ āĻāĻŦāĻ‚ āĻāϟāĻž āĻāĻŽāύāĻ•āĻŋ āĻ•āĻžāϜ. āĻ•āĻŋāĻ¨ā§āϤ⧁ āϏāĻŽāϝāĻŧ⧇āϰ āϏāĻžāĻĨ⧇ āϏāĻžāĻĨ⧇, āĻĒāϰāĻŋāώ⧇āĻŦāĻžāϗ⧁āϞāĻŋāϰ āϜāĻ¨ā§āϝ āĻ•āĻžāĻœā§‡āϰ āϏāĻ‚āĻ–ā§āϝāĻž āĻŦāĻžāĻĄāĻŧāϤ⧇ āĻĨāĻžāϕ⧇, āĻāĻŦāĻ‚ āϝāĻ–āύ PostgreSQL 20 ms āĻāϰ āĻĒāϰāĻŋāĻŦāĻ°ā§āϤ⧇ 5 āϏ⧇āϕ⧇āĻ¨ā§āĻĄā§‡ āϏ⧂āϚāϕ⧇ āϏāĻžāĻĄāĻŧāĻž āĻĻāĻŋāϤ⧇ āĻļ⧁āϰ⧁ āĻ•āϰ⧇, āϤāĻ–āύ āφāĻŽāϰāĻž āĻāϟāĻŋ āύāĻŋāϝāĻŧ⧇ āĻ—āĻŋāϝāĻŧ⧇āĻ›āĻŋāϞāĻžāĻŽ āĻāĻŦāĻ‚ āϏāϰāĻŋāϝāĻŧ⧇ āύāĻŋāϝāĻŧ⧇āĻ›āĻŋāϞāĻžāĻŽāĨ¤
  • āϞ⧋āĻ•āĻžāϞ āĻāĻ•ā§āϏāĻŋāĻ•āĻŋāωāϟāϰāĨ¤ āĻšā§āϝāĻžāρ, āφāĻŽāϰāĻž āĻāĻ–āύāĻ“ āĻāϟāĻŋāϰ āωāĻĒāϰ āĻŦāϏ⧇ āφāĻ›āĻŋ āĻāĻŦāĻ‚ āφāĻŽāϰāĻž āχāϤāĻŋāĻŽāĻ§ā§āϝ⧇ āĻ…āϤāϞ āĻ—āĻšā§āĻŦāϰ⧇āϰ āϧāĻžāϰ⧇ āϚāϞ⧇ āĻāϏ⧇āĻ›āĻŋāĨ¤ LocalExecutor āĻāĻ–āύ āĻĒāĻ°ā§āϝāĻ¨ā§āϤ āφāĻŽāĻžāĻĻ⧇āϰ āϜāĻ¨ā§āϝ āϝāĻĨ⧇āĻˇā§āϟ āĻ›āĻŋāϞ, āĻ•āĻŋāĻ¨ā§āϤ⧁ āĻāĻ–āύ āϏāĻŽāϝāĻŧ āĻāϏ⧇āϛ⧇ āĻ…āĻ¨ā§āϤāϤ āĻāĻ•āϜāύ āĻ•āĻ°ā§āĻŽā§€ āύāĻŋāϝāĻŧ⧇ āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻ•āϰāĻžāϰ, āĻāĻŦāĻ‚ CeleryExecutor-āĻ āϝāĻžāĻ“āϝāĻŧāĻžāϰ āϜāĻ¨ā§āϝ āφāĻŽāĻžāĻĻ⧇āϰ āĻ•āĻ ā§‹āϰ āĻĒāϰāĻŋāĻļā§āϰāĻŽ āĻ•āϰāϤ⧇ āĻšāĻŦ⧇āĨ¤ āĻāĻŦāĻ‚ āφāĻĒāύāĻŋ āĻāϟāĻŋāϰ āϏāĻžāĻĨ⧇ āĻāĻ•āϟāĻŋ āĻŽā§‡āĻļāĻŋāύ⧇ āĻ•āĻžāϜ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ āĻāχ āĻŦāĻŋāώāϝāĻŧāϟāĻŋāϰ āĻĒāϰāĻŋāĻĒā§āϰ⧇āĻ•ā§āώāĻŋāϤ⧇, āϕ⧋āύāĻ“ āϏāĻžāĻ°ā§āĻ­āĻžāϰ⧇āĻ“ āϏ⧇āϞāĻžāϰāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāϤ⧇ āφāĻĒāύāĻžāϕ⧇ āĻŦāĻžāϧāĻž āĻĻ⧇āϝāĻŧ āύāĻž, āϝāĻž "āĻ…āĻŦāĻļā§āϝāχ, āϏāϤāϤāĻžāϰ āϏāĻžāĻĨ⧇ āĻ•āĻ–āύāĻ“ āωāĻ¤ā§āĻĒāĻžāĻĻāύ⧇ āϝāĻžāĻŦ⧇ āύāĻž!"
  • āĻ…-āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ…āĻ¨ā§āϤāĻ°ā§āύāĻŋāĻ°ā§āĻŽāĻŋāϤ āϏāϰāĻžā§āϜāĻžāĻŽ:
    • āϏāĻ‚āϝ⧋āĻ— āĻĒāϰāĻŋāώ⧇āĻŦāĻžāϰ āĻļāĻ‚āϏāĻžāĻĒāĻ¤ā§āϰ āϏāĻ‚āϰāĻ•ā§āώāĻŖ āĻ•āϰāϤ⧇,
    • SLA āĻŽāĻŋāϏ āϝ⧇ āĻ•āĻžāϜāϗ⧁āϞ⧋ āϏāĻŽāϝāĻŧāĻŽāϤ⧋ āĻšāϝāĻŧāύāĻŋ āϏ⧇āϗ⧁āϞ⧋āϰ āϜāĻŦāĻžāĻŦ āĻĻāĻŋāϤ⧇,
    • xcom āĻŽā§‡āϟāĻžāĻĄā§‡āϟāĻž āĻŦāĻŋāύāĻŋāĻŽāϝāĻŧ⧇āϰ āϜāĻ¨ā§āϝ (āφāĻŽāĻŋ āĻŦāϞāϞāĻžāĻŽ āĻŽā§‡āϟāĻžāĻĄā§‡āϟāĻž!) āĻĄā§‡āĻ— āĻ•āĻžāĻœā§‡āϰ āĻŽāĻ§ā§āϝ⧇āĨ¤
  • āĻŽā§‡āϞ āĻ…āĻĒāĻŦā§āϝāĻŦāĻšāĻžāϰ. āĻ“āϝāĻŧ⧇āϞ āφāĻŽāĻŋ āĻ•āĻŋ āĻŦāϞāϤ⧇ āĻĒāĻžāϰ⧇āύ? āĻĒāϤāĻŋāϤ āĻ•āĻžāĻœā§‡āϰ āĻĒ⧁āύāϰāĻžāĻŦ⧃āĻ¤ā§āϤāĻŋāϰ āϜāĻ¨ā§āϝ āϏāϤāĻ°ā§āĻ•āϤāĻž āϏ⧇āϟ āφāĻĒ āĻ•āϰāĻž āĻšāϝāĻŧ⧇āĻ›āĻŋāϞāĨ¤ āĻāĻ–āύ āφāĻŽāĻžāϰ āĻ•āĻžāϜ Gmail-āĻ Airflow āĻĨ⧇āϕ⧇ >90k āχāĻŽā§‡āϞ āφāϛ⧇, āĻāĻŦāĻ‚ āĻ“āϝāĻŧ⧇āĻŦ āĻŽā§‡āχāϞ ​​āĻŽāĻœā§‡āϞ āĻāĻ•āĻŦāĻžāϰ⧇ 100 āϟāĻŋāϰāĻ“ āĻŦ⧇āĻļāĻŋ āĻĒāĻŋāĻ• āφāĻĒ āĻ•āϰāϤ⧇ āĻāĻŦāĻ‚ āĻŽā§āĻ›āϤ⧇ āĻ…āĻ¸ā§āĻŦā§€āĻ•āĻžāϰ āĻ•āϰ⧇āĨ¤

āφāϰ⧋ āĻ•ā§āώāϤāĻŋ: Apache Airflow Pitfails

āφāϰāĻ“ āĻ…āĻŸā§‹āĻŽā§‡āĻļāύ āϟ⧁āϞ

āφāĻŽāĻžāĻĻ⧇āϰ āĻšāĻžāϤ āĻĻāĻŋāϝāĻŧ⧇ āύāϝāĻŧ āφāĻŽāĻžāĻĻ⧇āϰ āĻŽāĻžāĻĨāĻž āĻĻāĻŋāϝāĻŧ⧇ āφāϰāĻ“ āĻŦ⧇āĻļāĻŋ āĻ•āĻžāϜ āĻ•āϰāĻžāϰ āϜāĻ¨ā§āϝ, āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āφāĻŽāĻžāĻĻ⧇āϰ āϜāĻ¨ā§āϝ āĻāϟāĻŋ āĻĒā§āϰāĻ¸ā§āϤ⧁āϤ āĻ•āϰ⧇āϛ⧇:

  • āĻŦāĻŋāĻļā§āϰāĻžāĻŽ āĻāĻĒāĻŋāφāχ - āϤāĻžāϰ āĻāĻ–āύāĻ“ āĻĒāϰ⧀āĻ•ā§āώāĻžāĻŽā§‚āϞāĻ• āĻŽāĻ°ā§āϝāĻžāĻĻāĻž āϰāϝāĻŧ⧇āϛ⧇, āϝāĻž āϤāĻžāϕ⧇ āĻ•āĻžāϜ āĻ•āϰāϤ⧇ āĻŦāĻžāϧāĻž āĻĻ⧇āϝāĻŧ āύāĻžāĨ¤ āĻāϟāĻŋāϰ āϏāĻžāĻšāĻžāĻ¯ā§āϝ⧇, āφāĻĒāύāĻŋ āĻļ⧁āϧ⧁āĻŽāĻžāĻ¤ā§āϰ āĻĄā§āϝāĻžāĻ— āĻāĻŦāĻ‚ āĻ•āĻžāϜ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āϤāĻĨā§āϝ āĻĒ⧇āϤ⧇ āĻĒāĻžāϰ⧇āύ āύāĻž, āϤāĻŦ⧇ āĻāĻ•āϟāĻŋ āĻĄā§āϝāĻžāĻ— āĻŦāĻ¨ā§āϧ/āĻļ⧁āϰ⧁ āĻ•āϰāϤ⧇, āĻāĻ•āϟāĻŋ DAG āϰāĻžāύ āĻŦāĻž āĻāĻ•āϟāĻŋ āĻĒ⧁āϞ āϤ⧈āϰāĻŋ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ⧎
  • CLI - āĻ•āĻŽāĻžāĻ¨ā§āĻĄ āϞāĻžāχāύ⧇āϰ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡ āĻ…āύ⧇āĻ• āϟ⧁āϞ āĻĒāĻžāĻ“āϝāĻŧāĻž āϝāĻžāϝāĻŧ āϝ⧇āϗ⧁āϞ⧋ āĻļ⧁āϧ⧁ WebUI āĻāϰ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāĻž āĻ…āϏ⧁āĻŦāĻŋāϧāĻžāϜāύāĻ• āύāϝāĻŧ, āĻ•āĻŋāĻ¨ā§āϤ⧁ āϏāĻžāϧāĻžāϰāĻŖāϤ āĻ…āύ⧁āĻĒāĻ¸ā§āĻĨāĻŋāϤāĨ¤ āωāĻĻāĻžāĻšāϰāĻŖ āĻ¸ā§āĻŦāϰ⧂āĻĒ:
    • backfill āϟāĻžāĻ¸ā§āĻ• āĻĻ⧃āĻˇā§āϟāĻžāĻ¨ā§āϤ āĻĒ⧁āύāϰāĻžāϝāĻŧ āφāϰāĻŽā§āĻ­ āĻ•āϰāϤ⧇ āĻĒā§āϰāϝāĻŧā§‹āϜāύ.
      āωāĻĻāĻžāĻšāϰāĻŖ āĻ¸ā§āĻŦāϰ⧂āĻĒ, āĻŦāĻŋāĻļā§āϞ⧇āώāĻ•āϰāĻž āĻāϏ⧇ āĻŦāϞāϞ⧇āύ: “āĻāĻŦāĻ‚ āĻ•āĻŽāϰ⧇āĻĄ, āϜāĻžāύ⧁āϝāĻŧāĻžāϰ⧀ 1 āĻĨ⧇āϕ⧇ 13 āϤāĻžāϰāĻŋāϖ⧇āϰ āĻĄā§‡āϟāĻžāϤ⧇ āφāĻĒāύāĻŋ āφāĻœā§‡āĻŦāĻžāĻœā§‡ āĻ•āĻĨāĻž āĻŦāϞ⧇āϛ⧇āύ! āĻ āĻŋāĻ• āĻ•āϰ, āĻ āĻŋāĻ• āĻ•āϰ, āĻ āĻŋāĻ• āĻ•āϰ, āĻ āĻŋāĻ• āĻ•āϰ!" āĻāĻŦāĻ‚ āφāĻĒāύāĻŋ āϝ⧇āĻŽāύ āĻāĻ•āϟāĻŋ āĻšāĻŦ:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • āĻŦ⧇āϏ āϏāĻžāĻ°ā§āĻ­āĻŋāϏ: initdb, resetdb, upgradedb, checkdb.
    • run, āϝāĻž āφāĻĒāύāĻžāϕ⧇ āĻāĻ•āϟāĻŋ āχāύāĻ¸ā§āĻŸā§āϝāĻžāĻ¨ā§āϏ āϟāĻžāĻ¸ā§āĻ• āϚāĻžāϞāĻžāύ⧋āϰ āĻāĻŦāĻ‚ āĻāĻŽāύāĻ•āĻŋ āϏāĻŽāĻ¸ā§āϤ āύāĻŋāĻ°ā§āĻ­āϰāϤāĻžāϰ āωāĻĒāϰ āĻ¸ā§āϕ⧋āϰ āĻ•āϰāϤ⧇ āĻĻ⧇āϝāĻŧāĨ¤ āϤāĻžāĻ›āĻžāĻĄāĻŧāĻž, āφāĻĒāύāĻŋ āĻāϰ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡ āĻāϟāĻŋ āϚāĻžāϞāĻžāϤ⧇ āĻĒāĻžāϰ⧇āύ LocalExecutor, āĻāĻŽāύāĻ•āĻŋ āϝāĻĻāĻŋ āφāĻĒāύāĻžāϰ āϏ⧇āϞāĻžāϰāĻŋ āĻ•ā§āϞāĻžāĻ¸ā§āϟāĻžāϰ āĻĨāĻžāϕ⧇āĨ¤
    • āĻĒā§āϰāĻžāϝāĻŧ āĻāĻ•āχ āϜāĻŋāύāĻŋāϏ āĻ•āϰ⧇ test, āĻļ⧁āϧ⧁āĻŽāĻžāĻ¤ā§āϰ āϘāĻžāρāϟāĻŋāϤ⧇āĻ“ āĻ•āĻŋāϛ⧁āχ āϞ⧇āϖ⧇ āύāĻžāĨ¤
    • connections āĻļ⧇āϞ āĻĨ⧇āϕ⧇ āϏāĻ‚āϝ⧋āĻ— āϤ⧈āϰāĻŋ āĻ•āϰāϤ⧇ āĻĻ⧇āϝāĻŧāĨ¤
  • āĻĒāĻžāχāĻĨāύ āĻāĻĒāĻŋāφāχ - āĻŽāĻŋāĻĨāĻ¸ā§āĻ•ā§āϰāĻŋāϝāĻŧāĻž āĻ•āϰāĻžāϰ āĻāĻ•āϟāĻŋ āĻŦāϰāĻ‚ āĻšāĻžāĻ°ā§āĻĄāϕ⧋āϰ āωāĻĒāĻžāϝāĻŧ, āϝāĻž āĻĒā§āϞāĻžāĻ—āχāύāϗ⧁āϞāĻŋāϰ āϜāĻ¨ā§āϝ āωāĻĻā§āĻĻāĻŋāĻˇā§āϟ, āĻāĻŦāĻ‚ āϏāĻžāĻŽāĻžāĻ¨ā§āϝ āĻšāĻžāϤ āĻĻāĻŋāϝāĻŧ⧇ āĻāϤ⧇ āĻāĻžāρāϕ⧁āύāĻŋ āύāĻžāĨ¤ āĻ•āĻŋāĻ¨ā§āϤ⧁ āφāĻŽāĻžāĻĻ⧇āϰ āϝ⧇āϤ⧇ āĻŦāĻžāϧāĻž āĻĻ⧇āĻŦ⧇ āϕ⧇ /home/airflow/dags, āϚāĻžāϞāĻžāύ ipython āĻāĻŦāĻ‚ āϚāĻžāϰāĻĒāĻžāĻļ⧇ āϜāĻ—āĻžāĻ–āĻŋāϚ⧁āĻĄāĻŧāĻŋ āĻļ⧁āϰ⧁? āφāĻĒāύāĻŋ, āωāĻĻāĻžāĻšāϰāĻŖāĻ¸ā§āĻŦāϰ⧂āĻĒ, āύāĻŋāĻŽā§āύāϞāĻŋāĻ–āĻŋāϤ āϕ⧋āĻĄ āϏāĻš āϏāĻŽāĻ¸ā§āϤ āϏāĻ‚āϝ⧋āĻ— āϰāĻĒā§āϤāĻžāύāĻŋ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻŽā§‡āϟāĻžāĻĄā§‡āϟāĻžāĻŦ⧇āϏ⧇āϰ āϏāĻžāĻĨ⧇ āϏāĻ‚āϝ⧋āĻ— āĻ•āϰāĻž āĻšāĻšā§āϛ⧇āĨ¤ āφāĻŽāĻŋ āĻāϟāĻŋāϤ⧇ āϞ⧇āĻ–āĻžāϰ āĻĒāϰāĻžāĻŽāĻ°ā§āĻļ āĻĻāĻŋāχ āύāĻž, āϤāĻŦ⧇ āĻŦāĻŋāĻ­āĻŋāĻ¨ā§āύ āύāĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āϟ āĻŽā§‡āĻŸā§āϰāĻŋāĻ•ā§āϏ⧇āϰ āϜāĻ¨ā§āϝ āϟāĻžāĻ¸ā§āĻ• āĻ¸ā§āĻŸā§‡āϟ āĻĒāĻžāĻ“āϝāĻŧāĻž āϝ⧇ āϕ⧋āύāĻ“ API āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāĻžāϰ āĻšā§‡āϝāĻŧ⧇ āĻ…āύ⧇āĻ• āĻĻā§āϰ⧁āϤ āĻāĻŦāĻ‚ āϏāĻšāϜ āĻšāϤ⧇ āĻĒāĻžāϰ⧇āĨ¤

    āφāϏ⧁āύ āφāĻŽāϰāĻž āĻŦāϞāĻŋ āϝ⧇ āφāĻŽāĻžāĻĻ⧇āϰ āϏāĻŽāĻ¸ā§āϤ āĻ•āĻžāϜāχ āĻ…āĻĻāĻŽā§āϝ āύāϝāĻŧ, āϤāĻŦ⧇ āϏ⧇āϗ⧁āϞāĻŋ āĻ•āĻ–āύāĻ“ āĻ•āĻ–āύāĻ“ āĻĒāĻĄāĻŧ⧇ āϝ⧇āϤ⧇ āĻĒāĻžāϰ⧇ āĻāĻŦāĻ‚ āĻāϟāĻŋ āĻ¸ā§āĻŦāĻžāĻ­āĻžāĻŦāĻŋāĻ•āĨ¤ āĻ•āĻŋāĻ¨ā§āϤ⧁ āĻ•āϝāĻŧ⧇āĻ•āϟāĻŋ āĻŦā§āϞāϕ⧇āϜ āχāϤāĻŋāĻŽāĻ§ā§āϝ⧇āχ āϏāĻ¨ā§āĻĻ⧇āĻšāϜāύāĻ•, āĻāĻŦāĻ‚ āĻāϟāĻŋ āĻĒāϰ⧀āĻ•ā§āώāĻž āĻ•āϰāĻž āĻĒā§āϰāϝāĻŧā§‹āϜāύāĨ¤

    āĻāϏāĻ•āĻŋāωāĻāϞ āϏāĻžāĻŦāϧāĻžāύ!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

āϰ⧇āĻĢāĻžāϰ⧇āĻ¨ā§āϏ

āĻāĻŦāĻ‚ āĻ…āĻŦāĻļā§āϝāχ, āϗ⧁āĻ—āϞ āχāĻ¸ā§āϝ⧁ āĻ•āϰāĻžāϰ āĻĒā§āϰāĻĨāĻŽ āĻĻāĻļāϟāĻŋ āϞāĻŋāĻ™ā§āĻ• āφāĻŽāĻžāϰ āĻŦ⧁āĻ•āĻŽāĻžāĻ°ā§āĻ• āĻĨ⧇āϕ⧇ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧋ āĻĢā§‹āĻ˛ā§āĻĄāĻžāϰ⧇āϰ āĻŦāĻŋāώāϝāĻŧāĻŦāĻ¸ā§āϤ⧁āĨ¤

āĻāĻŦāĻ‚ āύāĻŋāĻŦāĻ¨ā§āϧ⧇ āĻŦā§āϝāĻŦāĻšā§ƒāϤ āϞāĻŋāĻ™ā§āĻ•āϗ⧁āϞāĻŋ:

āωāĻ¤ā§āϏ: www.habr.com

DDoS āϏ⧁āϰāĻ•ā§āώāĻž, VPS VDS āϏāĻžāĻ°ā§āĻ­āĻžāϰ āϏāĻš āϏāĻžāχāϟāϗ⧁āϞāĻŋāϰ āϜāĻ¨ā§āϝ āύāĻŋāĻ°ā§āĻ­āϰāϝ⧋āĻ—ā§āϝ āĻšā§‹āĻ¸ā§āϟāĻŋāĻ‚ āĻ•āĻŋāύ⧁āύ đŸ”Ĩ DDoS āϏ⧁āϰāĻ•ā§āώāĻž āϏāĻš āύāĻŋāĻ°ā§āĻ­āϰāϝ⧋āĻ—ā§āϝ āĻ“āϝāĻŧ⧇āĻŦāϏāĻžāχāϟ āĻšā§‹āĻ¸ā§āϟāĻŋāĻ‚ āĻ•āĻŋāύ⧁āύ, VPS VDS āϏāĻžāĻ°ā§āĻ­āĻžāϰ | ProHoster