āĻšāĻžāĻ, āĻāĻŽāĻŋ āĻĻāĻŋāĻŽāĻŋāϤā§āϰāĻŋ āϞāĻāĻāĻŋāύā§āĻā§āĻā§ - āĻā§āĻā§āĻ āĻā§āϰā§āĻĒ āĻ āĻĢ āĻā§āĻŽā§āĻĒāĻžāύāĻŋāϰ āĻŦāĻŋāĻļā§āϞā§āώāĻŖ āĻŦāĻŋāĻāĻžāĻā§āϰ āĻĄā§āĻāĻž āĻāĻā§āĻāĻŋāύāĻŋāϝāĻŧāĻžāϰāĨ¤
āĻāĻŽāĻŋ āĻāĻĒāύāĻžāĻā§ ETL āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻžāĻā§āϞāĻŋ āĻŦāĻŋāĻāĻžāĻļā§āϰ āĻāύā§āϝ āĻāĻāĻāĻŋ āĻĻā§āϰā§āĻĻāĻžāύā§āϤ āϏāϰāĻā§āĻāĻžāĻŽ āϏāĻŽā§āĻĒāϰā§āĻā§ āĻŦāϞāĻŦ - āĻ ā§āϝāĻžāĻĒāĻžāĻāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞā§āĨ¤ āϤāĻŦā§ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻāϤāĻ āĻŦāĻšā§āĻŽā§āĻā§ āĻāĻŦāĻ āĻŦāĻšā§āĻŽā§āĻā§ āϝ⧠āĻāĻĒāύāĻŋ āĻĄā§āĻāĻž āĻĒā§āϰāĻŦāĻžāĻšā§āϰ āϏāĻžāĻĨā§ āĻāĻĄāĻŧāĻŋāϤ āύāĻž āĻšāϞā§āĻ āĻāĻāĻŋāĻā§ āĻāϰāĻ āĻāύāĻŋāώā§āĻ āĻāĻžāĻŦā§ āĻĻā§āĻā§ āύā§āĻāϝāĻŧāĻž āĻāĻāĻŋāϤ, āϤāĻŦā§ āĻĒāϰā§āϝāĻžāϝāĻŧāĻā§āϰāĻŽā§ āϝ⧠āĻā§āύāĻ āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻž āĻāĻžāϞ⧠āĻāϰāĻž āĻāĻŦāĻ āϤāĻžāĻĻā§āϰ āϏāĻŽā§āĻĒāĻžāĻĻāύ āύāĻŋāϰā§āĻā§āώāĻŖ āĻāϰāĻžāϰ āĻĒā§āϰāϝāĻŧā§āĻāύ āϰāϝāĻŧā§āĻā§āĨ¤
āĻāĻŦāĻ āĻšā§āϝāĻžāĻ, āĻāĻŽāĻŋ āĻā§āĻŦāϞ āĻŦāϞāĻŦ āύāĻž, āϤāĻŦā§ āĻĻā§āĻāĻžāĻŦ: āĻĒā§āϰā§āĻā§āϰāĻžāĻŽāĻāĻŋāϤ⧠āĻĒā§āϰāĻā§āϰ āĻā§āĻĄ, āϏā§āĻā§āϰāĻŋāύāĻļāĻ āĻāĻŦāĻ āϏā§āĻĒāĻžāϰāĻŋāĻļ āϰāϝāĻŧā§āĻā§āĨ¤

āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠/ āĻāĻāĻāĻŋāĻŽāĻŋāĻĄāĻŋāϝāĻŧāĻž āĻāĻŽāύā§āϏ āĻļāĻŦā§āĻĻāĻāĻŋ āĻā§āĻāϞ āĻāϰāϞ⧠āĻāĻĒāύāĻŋ āϏāĻžāϧāĻžāϰāĻŖāϤ āϝāĻž āĻĻā§āĻāϤ⧠āĻĒāĻžāύ
āĻŦāĻŋāώāϝāĻŧāĻŦāϏā§āϤ⧠āϏā§āĻāĻŋ
āĻā§āĻŽāĻŋāĻāĻž
āĻ ā§āϝāĻžāĻĒāĻžāĻāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻ āĻŋāĻ āĻā§āϝāĻžāĻā§āĻā§āϰ āĻŽāϤā§:
- āĻĒāĻžāĻāĻĨāύ⧠āϞā§āĻāĻž
- āĻāĻāĻāĻŋ āĻŽāĻšāĻžāύ āĻ ā§āϝāĻžāĻĄāĻŽāĻŋāύ āĻĒā§āϝāĻžāύā§āϞ āĻāĻā§,
- āĻ āύāĻŋāϰā§āĻĻāĻŋāώā§āĻāĻāĻžāϞā§āϰ āĻāύā§āϝ āĻĒā§āϰāϏāĻžāϰāĻŖāϝā§āĻā§āϝ
- āĻļā§āϧā§āĻŽāĻžāϤā§āϰ āĻāĻžāϞ, āĻāĻŦāĻ āĻāĻāĻŋ āϏāĻŽā§āĻĒā§āϰā§āĻŖ āĻāĻŋāύā§āύ āĻāĻĻā§āĻĻā§āĻļā§āϝ⧠āϤā§āϰāĻŋ āĻāϰāĻž āĻšāϝāĻŧā§āĻāĻŋāϞ, āϝāĻĨāĻž (āϝā§āĻŽāύ āĻāĻāĻŋ āĻā§āϝāĻžāĻā§āϰ āĻāĻā§ āϞā§āĻāĻž āĻāĻā§):
- āϏā§āĻŽāĻžāĻšā§āύ āϏāĻāĻā§āϝāĻ āĻŽā§āĻļāĻŋāύ⧠āĻāĻžāĻ āĻāĻžāϞāĻžāύ⧠āĻāĻŦāĻ āĻĒāϰā§āϝāĻŦā§āĻā§āώāĻŖ āĻāϰāĻž (āϝāϤ āĻŦā§āĻļāĻŋ āϏā§āϞāĻžāϰāĻŋ/āĻā§āĻŦāĻžāϰāύā§āĻāϏ āĻāĻŦāĻ āĻāĻĒāύāĻžāϰ āĻŦāĻŋāĻŦā§āĻ āĻāĻĒāύāĻžāĻā§ āĻ āύā§āĻŽāϤāĻŋ āĻĻā§āĻŦā§)
- āĻĄāĻžāϝāĻŧāύāĻžāĻŽāĻŋāĻ āĻāϝāĻŧāĻžāϰā§āĻāĻĢā§āϞ⧠āĻā§āύāĻžāϰā§āĻļāύā§āϰ āϏāĻžāĻĨā§ āĻĒāĻžāĻāĻĨāύ āĻā§āĻĄ āϞāĻŋāĻāϤ⧠āĻāĻŦāĻ āĻŦā§āĻāϤ⧠āĻā§āĻŦ āϏāĻšāĻ āĻĨā§āĻā§
- āĻāĻŦāĻ āϰā§āĻĄāĻŋāĻŽā§āĻĄ āĻāĻŽā§āĻĒā§āύā§āύā§āĻ āĻāĻŦāĻ āĻāϰ⧠āϤā§āϰāĻŋ āĻĒā§āϞāĻžāĻāĻāύ āĻāĻāϝāĻŧ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰ⧠āϝā§āĻā§āύ āĻĄāĻžāĻāĻžāĻŦā§āϏ āĻāĻŦāĻ āĻāĻĒāĻŋāĻāĻ āĻāĻā§ āĻ āĻĒāϰā§āϰ āϏāĻžāĻĨā§ āϏāĻāϝā§āĻ āĻāϰāĻžāϰ āĻā§āώāĻŽāϤāĻž (āϝāĻž āĻ āϤā§āϝāύā§āϤ āϏāĻšāĻ)āĨ¤
āĻāĻŽāϰāĻž āĻāĻ āĻŽāϤ Apache Airflow āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰāĻŋ:
- āĻāĻŽāϰāĻž āĻŦāĻŋāĻāĻŋāύā§āύ āĻā§āϏ āĻĨā§āĻā§ āĻĄā§āĻāĻž āϏāĻāĻā§āϰāĻš āĻāϰāĻŋ (āĻ āύā§āĻ SQL āϏāĻžāϰā§āĻāĻžāϰ āĻāĻŦāĻ PostgreSQL āĻāĻĻāĻžāĻšāϰāĻŖ, āĻ ā§āϝāĻžāĻĒā§āϞāĻŋāĻā§āĻļāύ āĻŽā§āĻā§āϰāĻŋāĻā§āϏ āϏāĻš āĻŦāĻŋāĻāĻŋāύā§āύ API, āĻāĻŽāύāĻāĻŋ 1C) DWH āĻāĻŦāĻ ODS (āĻāĻŽāĻžāĻĻā§āϰ Vertica āĻāĻŦāĻ Clickhouse āĻāĻā§)āĨ¤
- āĻāϤ āĻāύā§āύāϤ
cron, āϝāĻž āĻāĻĄāĻŋāĻāϏ-āĻ āĻĄā§āĻāĻž āĻāĻāϤā§āϰā§āĻāϰāĻŖ āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻž āĻļā§āϰ⧠āĻāϰ⧠āĻāĻŦāĻ āϤāĻžāĻĻā§āϰ āϰāĻā§āώāĻŖāĻžāĻŦā§āĻā§āώāĻŖāĻ āĻĒāϰā§āϝāĻŦā§āĻā§āώāĻŖ āĻāϰā§āĨ¤
āϏāĻŽā§āĻĒā§āϰāϤāĻŋ āĻ āĻŦāϧāĻŋ, āĻāĻŽāĻžāĻĻā§āϰ āĻāĻžāĻšāĻŋāĻĻāĻžāĻā§āϞāĻŋ 32 āĻā§āϰ āĻāĻŦāĻ 50 GB RAM āϏāĻš āĻāĻāĻāĻŋ āĻā§āĻ āϏāĻžāϰā§āĻāĻžāϰ āĻĻā§āĻŦāĻžāϰāĻž āĻāĻā§āĻāĻžāĻĻāĻŋāϤ āĻāĻŋāϞ⧎ āĻāϝāĻŧāĻžāϰāĻĢā§āϞā§āϤā§, āĻāĻāĻŋ āĻāĻžāĻ āĻāϰā§:
- āĻ āϧāĻŋāĻ 200 āĻĄā§āϝāĻžāĻ (āĻāϏāϞ⧠āĻāϝāĻŧāĻžāϰā§āĻāĻĢā§āϞā§, āϝā§āĻāĻžāύ⧠āĻāĻŽāϰāĻž āĻāĻžāĻāĻā§āϞāĻŋ āϏā§āĻāĻžāĻĢ āĻāϰā§āĻāĻŋ)
- āĻāĻĄāĻŧā§ āĻĒā§āϰāϤāĻŋāĻāĻŋāϤ⧠70āĻāĻŋ āĻāĻžāĻ,
- āĻāĻ āϧāĻžāϰā§āĻŽāĻŋāĻāϤāĻž āĻļā§āϰ⧠āĻšāϝāĻŧ (āĻāĻĄāĻŧā§āĻ) āĻāύā§āĻāĻžāϝāĻŧ āĻāĻāĻŦāĻžāϰ.
āĻāĻŦāĻ āĻāĻŽāϰāĻž āĻā§āĻāĻžāĻŦā§ āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻāϰā§āĻāĻŋ āϏ⧠āϏāĻŽā§āĻĒāϰā§āĻā§, āĻāĻŽāĻŋ āύā§āĻā§ āϞāĻŋāĻāĻŦ, āĻāĻŋāύā§āϤ⧠āĻāĻāύ āĻāϏā§āύ Ãŧber-āϏāĻŽāϏā§āϝāĻžāĻāĻŋ āϏāĻāĻā§āĻāĻžāϝāĻŧāĻŋāϤ āĻāϰāĻŋ āϝāĻž āĻāĻŽāϰāĻž āϏāĻŽāĻžāϧāĻžāύ āĻāϰāĻŦ:
āϤāĻŋāύāĻāĻŋ āĻā§āϏ āĻāϏāĻāĻŋāĻāĻāϞ āϏāĻžāϰā§āĻāĻžāϰ āϰāϝāĻŧā§āĻā§, āĻĒā§āϰāϤāĻŋāĻāĻŋāϤ⧠50āĻāĻŋ āĻĄāĻžāĻāĻžāĻŦā§āϏ āϰāϝāĻŧā§āĻā§ - āϝāĻĨāĻžāĻā§āϰāĻŽā§ āĻāĻāĻāĻŋ āĻĒā§āϰāĻāϞā§āĻĒā§āϰ āĻāĻĻāĻžāĻšāϰāĻŖ, āϤāĻžāĻĻā§āϰ āĻāĻāĻ āĻāĻžāĻ āĻžāĻŽā§ āϰāϝāĻŧā§āĻā§ (āĻĒā§āϰāĻžāϝāĻŧ āϏāϰā§āĻŦāϤā§āϰ, āĻŽā§āĻ-āĻšāĻž-āĻšāĻž), āϝāĻžāϰ āĻ āϰā§āĻĨ āĻĒā§āϰāϤāĻŋāĻāĻŋāϤ⧠āĻāĻāĻāĻŋ āĻ āϰā§āĻĄāĻžāϰ āĻā§āĻŦāĻŋāϞ āϰāϝāĻŧā§āĻā§ (āϏā§āĻāĻžāĻā§āϝāĻā§āϰāĻŽā§, āĻāĻāĻŋ āϏāĻš āĻāĻāĻāĻŋ āĻā§āĻŦāĻŋāϞ āύāĻžāĻŽ āϝā§āĻā§āύ āĻŦā§āϝāĻŦāϏāĻžāϝāĻŧ āĻĒā§āĻļ āĻāϰāĻž āϝā§āϤ⧠āĻĒāĻžāϰā§)āĨ¤ āĻāĻŽāϰāĻž āĻĒāϰāĻŋāώā§āĻŦāĻžāϰ āĻā§āώā§āϤā§āϰāĻā§āϞāĻŋ (āϏā§āϰā§āϏ āϏāĻžāϰā§āĻāĻžāϰ, āϏā§āϰā§āϏ āĻĄāĻžāĻāĻžāĻŦā§āϏ, ETL āĻāĻžāϏā§āĻ āĻāĻāĻĄāĻŋ) āϝā§āĻ āĻāϰ⧠āĻĄā§āĻāĻž āύāĻŋāϝāĻŧā§ āĻĨāĻžāĻāĻŋ āĻāĻŦāĻ āϏāĻšāĻā§ āϏā§āĻā§āϞāĻŋāĻā§ āĻāĻžāϰā§āĻāĻŋāĻāĻžāϰ āĻŽāϧā§āϝ⧠āĻĢā§āϞ⧠āĻĻāĻŋāĻāĨ¤
āĻāϞ āϝāĻžāĻ!
āĻĒā§āϰāϧāĻžāύ āĻ āĻāĻļ, āĻŦā§āϝāĻŦāĻšāĻžāϰāĻŋāĻ (āĻāĻŦāĻ āĻāĻāĻā§ āϤāĻžāϤā§āϤā§āĻŦāĻŋāĻ)
āĻā§āύ āĻāĻŽāϰāĻž (āĻāĻŦāĻ āĻāĻĒāύāĻŋ)
āϝāĻāύ āĻāĻžāĻāĻā§āϞ⧠āĻŦāĻĄāĻŧ āĻāĻŋāϞ āĻāϰ āĻāĻŽāĻŋ āϏāϰāϞ āĻāĻŋāϞāĻžāĻŽ SQL-āĻāĻāĻāĻŋ āϰāĻžāĻļāĻŋāϝāĻŧāĻžāύ āĻā§āĻāϰā§āϤā§, āĻāĻŽāϰāĻž āĻāĻŽāĻžāĻĻā§āϰ āĻāĻžāĻā§ āĻāĻĒāϞāĻŦā§āϧ āĻĻā§āĻāĻŋ āϏāϰāĻā§āĻāĻžāĻŽ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰ⧠ETL āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻž āĻāϰāĻĢā§ āĻĄā§āĻāĻž āĻĒā§āϰāĻŦāĻžāĻšāĻā§ āϏā§āĻā§āϝāĻžāĻŽ āĻāϰā§āĻāĻŋ:
- āĻāύāĻĢāϰāĻŽā§āĻāĻŋāĻāĻž ââāĻĒāĻžāĻāϝāĻŧāĻžāϰ āϏā§āύā§āĻāĻžāϰ - āĻāĻāĻāĻŋ āĻ
āϤā§āϝāύā§āϤ āĻāĻĄāĻŧāĻŋāϝāĻŧā§ āĻĒāĻĄāĻŧāĻž āϏāĻŋāϏā§āĻā§āĻŽ, āĻ
āϤā§āϝāύā§āϤ āĻāϤā§āĻĒāĻžāĻĻāύāĻļā§āϞ, āύāĻŋāĻāϏā§āĻŦ āĻšāĻžāϰā§āĻĄāĻāϝāĻŧā§āϝāĻžāϰ āϏāĻš, āĻāϰ āύāĻŋāĻāϏā§āĻŦ āϏāĻāϏā§āĻāϰāĻŖāĨ¤ āĻāĻŽāĻŋ āĻāϰ āĻā§āώāĻŽāϤāĻžāϰ 1% āĻāĻļā§āĻŦāϰ āύāĻŋāώā§āϧ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰā§āĻāĻŋāĨ¤ āĻā§āύ? āĻ āĻŋāĻ āĻāĻā§, āĻĒā§āϰāĻĨāĻŽāϤ, āĻāĻ āĻāύā§āĻāĻžāϰāĻĢā§āϏāĻāĻŋ 380 āĻāϰ āĻĻāĻļāĻā§āϰ āĻā§āĻĨāĻžāĻ āĻŽāĻžāύāϏāĻŋāĻāĻāĻžāĻŦā§ āĻāĻŽāĻžāĻĻā§āϰ āĻāĻĒāϰ āĻāĻžāĻĒ āϏā§āώā§āĻāĻŋ āĻāϰā§āĻā§āĨ¤ āĻĻā§āĻŦāĻŋāϤā§āϝāĻŧāϤ, āĻāĻ āĻāύāĻā§āϰāĻžāĻĒāĻļāύāĻāĻŋ āĻ
āϤā§āϝāύā§āϤ āĻ
āĻāĻŋāύāĻŦ āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻž, āĻāĻā§āϰ āĻāĻĒāĻžāĻĻāĻžāύ āĻĒā§āύāĻāĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāĻŦāĻ āĻ
āύā§āϝāĻžāύā§āϝ āĻ
āϤā§āϝāύā§āϤ-āĻā§āϰā§āϤā§āĻŦāĻĒā§āϰā§āĻŖ-āĻāύā§āĻāĻžāϰāĻĒā§āϰāĻžāĻāĻ-āĻā§āĻļāϞā§āϰ āĻāύā§āϝ āĻĄāĻŋāĻāĻžāĻāύ āĻāϰāĻž āĻšāϝāĻŧā§āĻā§āĨ¤ āĻāϝāĻŧāĻžāϰāĻŦāĻžāϏ AXNUMX / āĻŦāĻāϰā§āϰ āĻāĻāĻ āĻāϰ āĻŽāϤ āĻāĻāĻŋāϰ āĻĻāĻžāĻŽ āĻā§ āϤāĻž āϏāĻŽā§āĻĒāϰā§āĻā§ āĻāĻŽāϰāĻž āĻāĻŋāĻā§ āĻŦāϞāĻŦ āύāĻžāĨ¤
āϏāĻžāĻŦāϧāĻžāύ, āĻāĻāĻāĻŋ āϏā§āĻā§āϰāĻŋāύāĻļāĻ 30 āĻŦāĻāϰā§āϰ āĻāĻŽ āĻŦāϝāĻŧāϏ⧠āϞā§āĻā§āĻĻā§āϰ āĻāĻŋāĻā§āĻāĻž āĻā§āώāϤāĻŋ āĻāϰāϤ⧠āĻĒāĻžāϰā§

- SQL āϏāĻžāϰā§āĻāĻžāϰ āĻāύā§āĻāĻŋāĻā§āϰā§āĻļāύ āϏāĻžāϰā§āĻāĻžāϰ - āĻāĻŽāϰāĻž āĻāĻŽāĻžāĻĻā§āϰ āĻāύā§āϤāĻāĻĒā§āϰāĻāϞā§āĻĒ āĻĒā§āϰāĻŦāĻžāĻšā§ āĻāĻ āĻāĻŽāϰā§āĻĄ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰā§āĻāĻŋāĨ¤ āĻ āĻŋāĻ āĻāĻā§, āĻāϏāϞā§: āĻāĻŽāϰāĻž āĻāϤāĻŋāĻŽāϧā§āϝā§āĻ SQL āϏāĻžāϰā§āĻāĻžāϰ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰāĻŋ, āĻāĻŦāĻ āĻāĻāĻŋāϰ ETL āϏāϰāĻā§āĻāĻžāĻŽāĻā§āϞāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āύāĻž āĻāϰāĻž āĻāĻāϰāĻāĻŽ āĻ
āϝā§āĻā§āϤāĻŋāĻ āĻšāĻŦā§āĨ¤ āĻāϰ āĻŽāϧā§āϝ⧠āϏāĻŦāĻāĻŋāĻā§āĻ āĻāĻžāϞā§: āĻāύā§āĻāĻžāϰāĻĢā§āϏ āĻĻā§āĻāĻŋāĻ āϏā§āύā§āĻĻāϰ, āĻāĻŦāĻ āĻ
āĻā§āϰāĻāϤāĻŋ āĻĒā§āϰāϤāĻŋāĻŦā§āĻĻāύ... āĻāĻŋāύā§āϤ⧠āĻāĻ āĻāĻžāϰāĻŖā§āĻ āĻāĻŽāϰāĻž āϏāĻĢā§āĻāĻāϝāĻŧā§āϝāĻžāϰ āĻĒāĻŖā§āϝ āĻĒāĻāύā§āĻĻ āĻāϰāĻŋ āύāĻž, āĻāĻš, āĻāϰ āĻāύā§āϝ āύāϝāĻŧāĨ¤ āĻāĻāĻž āϏāĻāϏā§āĻāϰāĻŖ
dtsx(āϝā§āĻāĻŋ XML āύā§āĻĄāĻā§āϞāĻŋāĻā§ āϏā§āĻ āĻāϰāĻžāϰ āϏāĻŽāϝāĻŧ āĻāϞā§āĻŽā§āϞ⧠āĻāϰ⧠āĻĻā§āĻāϝāĻŧāĻž āĻšāϝāĻŧ) āĻāĻŽāϰāĻž āĻĒāĻžāϰāĻŋ, āĻāĻŋāύā§āϤ⧠āĻā§ āϞāĻžāĻ? āĻā§āĻāĻžāĻŦā§ āĻāĻāĻāĻŋ āĻāĻžāϏā§āĻ āĻĒā§āϝāĻžāĻā§āĻ āϤā§āϰāĻŋ āĻāϰāĻŦā§āύ āϝāĻž āĻāĻ āϏāĻžāϰā§āĻāĻžāϰ āĻĨā§āĻā§ āĻ āύā§āϝ āϏāĻžāϰā§āĻāĻžāϰ⧠āĻļāϤ āĻļāϤ āĻā§āĻŦāĻŋāϞ āĻā§āύ⧠āĻāύāĻŦā§? āĻšā§āϝāĻžāĻ, āĻāĻŋ āĻāĻāĻļ, āĻāĻĒāύāĻžāϰ āϤāϰā§āĻāύ⧠āĻŦāĻŋāĻļ āĻā§āĻāϰāĻž āĻĨā§āĻā§ āĻĒāĻĄāĻŧā§ āϝāĻžāĻŦā§, āĻŽāĻžāĻāϏ āĻŦā§āϤāĻžāĻŽā§ āĻā§āϞāĻŋāĻ āĻāϰā§āύāĨ¤ āϤāĻŦā§ āĻāĻāĻŋ āĻ āĻŦāĻļā§āϝāĻ āĻāϰāĻ āĻĢā§āϝāĻžāĻļāύā§āĻŦāϞ āĻĻā§āĻāĻžāϝāĻŧ:
āĻāĻŽāϰāĻž āĻ āĻŦāĻļā§āϝāĻ āĻāĻĒāĻžāϝāĻŧ āĻā§āĻāĻāĻāĻŋ. āĻŽāĻžāĻŽāϞāĻž āĻāĻŽāύāĻāĻŋ āĻĒā§āϰāĻžāϝāĻŧ āĻāĻāĻāĻŋ āϏā§āĻŦ-āϞāĻŋāĻāĻŋāϤ SSIS āĻĒā§āϝāĻžāĻā§āĻ āĻā§āύāĻžāϰā§āĻāϰ⧠āĻāϏā§āĻā§ ...
âĻāĻāĻŦāĻ āϤāĻžāϰāĻĒāϰ āĻāĻāĻāĻŋ āύāϤā§āύ āĻāĻžāĻāϰāĻŋ āĻāĻŽāĻžāĻā§ āĻā§āĻāĻā§ āĻĒā§āϝāĻŧā§āĻāĻŋāϞāĨ¤ āĻāĻŦāĻ āĻ ā§āϝāĻžāĻĒāĻžāĻāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻāĻāĻŋāϤ⧠āĻāĻŽāĻžāĻā§ āĻāĻžāĻĄāĻŧāĻŋāϝāĻŧā§ āĻā§āĻā§āĨ¤
āϝāĻāύ āĻāĻŽāĻŋ āĻāĻžāύāϤ⧠āĻĒāĻžāϰāϞāĻžāĻŽ āϝ⧠ETL āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻžāϰ āĻŦāĻŋāĻŦāϰāĻŖāĻā§āϞāĻŋ āĻšāϞ āϏāĻžāϧāĻžāϰāĻŖ āĻĒāĻžāĻāĻĨāύ āĻā§āĻĄ, āϤāĻāύ āĻāĻŽāĻŋ āĻāύāύā§āĻĻā§āϰ āĻāύā§āϝ āύāĻžāĻ āĻāϰāĻŋāύāĻŋāĨ¤ āĻāĻāĻžāĻŦā§āĻ āĻĄā§āĻāĻž āϏā§āĻā§āϰāĻŋāĻŽāĻā§āϞāĻŋāĻā§ āϏāĻāϏā§āĻāϰāĻŖ āĻāϰāĻž āĻšāϝāĻŧā§āĻāĻŋāϞ āĻāĻŦāĻ āĻāϞāĻžāĻĻāĻž āĻāϰāĻž āĻšāϝāĻŧā§āĻāĻŋāϞ āĻāĻŦāĻ āĻļāϤ āĻļāϤ āĻĄāĻžāĻāĻžāĻŦā§āϏ āĻĨā§āĻā§ āĻāĻāĻāĻŋ āĻāĻžāϰā§āĻā§āĻā§ āĻāĻāĻ āĻāĻžāĻ āĻžāĻŽā§āϰ āϏāĻžāĻĨā§ āĻā§āĻŦāĻŋāϞ āĻĸā§āϞ⧠āĻĻā§āĻāϝāĻŧāĻž āĻĻā§āĻĄāĻŧ āĻŦāĻž āĻĻā§āĻ 13â āϏā§āĻā§āϰāĻŋāύ⧠āĻĒāĻžāĻāĻĨāύ āĻā§āĻĄā§āϰ āĻŦāĻŋāώāϝāĻŧ āĻšāϝāĻŧā§ āĻĻāĻžāĻāĻĄāĻŧāĻŋāϝāĻŧā§āĻāĻŋāϞāĨ¤
āĻā§āϞāĻžāϏā§āĻāĻžāϰ āĻāĻāϤā§āϰāĻŋāϤ āĻāϰāĻž
āĻāϏā§āύ āĻāĻāĻāĻŋ āϏāĻŽā§āĻĒā§āϰā§āĻŖ āĻāĻŋāύā§āĻĄāĻžāϰāĻāĻžāϰā§āĻā§āύā§āϰ āĻŦā§āϝāĻŦāϏā§āĻĨāĻž āĻāϰāĻŋ āύāĻž, āĻāĻŦāĻ āĻāĻāĻžāύ⧠āϏāĻŽā§āĻĒā§āϰā§āĻŖ āϏā§āϏā§āĻĒāώā§āĻ āĻāĻŋāύāĻŋāϏāĻā§āϞāĻŋ āϏāĻŽā§āĻĒāϰā§āĻā§ āĻāĻĨāĻž āĻŦāϞāĻŋ āύāĻž, āϝā§āĻŽāύ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻāύāϏā§āĻāϞ āĻāϰāĻž, āĻāĻĒāύāĻžāϰ āύāĻŋāϰā§āĻŦāĻžāĻāĻŋāϤ āĻĄāĻžāĻāĻžāĻŦā§āϏ, āϏā§āϞāĻžāϰāĻŋ āĻāĻŦāĻ āĻĄāĻāĻā§āϞāĻŋāϤ⧠āĻŦāϰā§āĻŖāĻŋāϤ āĻ āύā§āϝāĻžāύā§āϝ āĻā§āώā§āϤā§āϰā§āĨ¤
āϝāĻžāϤ⧠āĻāĻŽāϰāĻž āĻ
āĻŦāĻŋāϞāĻŽā§āĻŦā§ āĻĒāϰā§āĻā§āώāĻž āĻļā§āϰ⧠āĻāϰāϤ⧠āĻĒāĻžāϰāĻŋ, āĻāĻŽāĻŋ āϏā§āĻā§āĻ āĻāϰā§āĻāĻŋ docker-compose.yml āϝāĻž:
- āĻāϰ āĻāϏāϞ⧠āĻŦāĻžāĻĄāĻŧāĻžāϤ⧠āϝāĻžāĻ āĻŦāĻžāϤāĻžāϏā§āϰ āĻĒā§āϰāĻŦāĻžāĻš: āĻļāĻŋāĻĄāĻŋāĻāϞāĻžāϰ, āĻāϝāĻŧā§āĻŦ āϏāĻžāϰā§āĻāĻžāϰāĨ¤ āϏā§āϞāĻžāϰāĻŋ āĻāĻžāĻāĻā§āϞāĻŋ āύāĻŋāϰā§āĻā§āώāĻŖā§āϰ āĻāύā§āϝ āĻĢā§āϞ āϏā§āĻāĻžāύ⧠āĻā§āϰāĻŦā§ (āĻāĻžāϰāĻŖ āĻāĻāĻŋ āĻāϤāĻŋāĻŽāϧā§āϝā§āĻ āĻ ā§āϞ⧠āĻĻā§āĻāϝāĻŧāĻž āĻšāϝāĻŧā§āĻā§
apache/airflow:1.10.10-python3.7āĻāĻŋāύā§āϤ⧠āĻāĻŽāϰāĻž āĻāĻŋāĻā§ āĻŽāύ⧠āĻāϰāĻŋ āύāĻž) - āĻĒā§āϏā§āĻāĻā§āϰāĻŋ, āϝāĻžāϤ⧠āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āϤāĻžāϰ āĻĒāϰāĻŋāώā§āĻŦāĻžāϰ āϤāĻĨā§āϝ āϞāĻŋāĻāĻŦā§ (āĻļāĻŋāĻĄāĻŋāĻāϞāĻžāϰ āĻĄā§āĻāĻž, āĻāĻā§āϏāĻŋāĻāĻŋāĻāĻļāύ āĻĒāϰāĻŋāϏāĻāĻā§āϝāĻžāύ, āĻāϤā§āϝāĻžāĻĻāĻŋ), āĻāĻŦāĻ āϏā§āϞāĻžāϰāĻŋ āϏāĻŽā§āĻĒā§āϰā§āĻŖ āĻāĻžāĻāĻā§āϞāĻŋ āĻāĻŋāĻšā§āύāĻŋāϤ āĻāϰāĻŦā§;
- Redis, āϝāĻž āϏā§āϞāĻžāϰāĻŋāϰ āĻāύā§āϝ āĻāĻāĻāĻŋ āĻāĻžāϏā§āĻ āĻŦā§āϰā§āĻāĻžāϰ āĻšāĻŋāϏāĻžāĻŦā§ āĻāĻžāĻ āĻāϰāĻŦā§;
- āϏā§āϞāĻžāϰāĻŋ āĻāϰā§āĻŽā§, āϝāĻž āϏāϰāĻžāϏāϰāĻŋ āĻāĻžāϰā§āϝ āϏāĻŽā§āĻĒāĻžāĻĻāύ⧠āύāĻŋāϝā§āĻā§āϤ āĻĨāĻžāĻāĻŦā§āĨ¤
- āĻĢā§āϞā§āĻĄāĻžāϰā§
./dagsāĻāĻŽāϰāĻž āĻāĻŽāĻžāĻĻā§āϰ āĻĢāĻžāĻāϞāĻā§āϞāĻŋāĻā§ āĻĄā§āϝāĻžāĻāĻā§āϞāĻŋāϰ āĻŦāĻŋāĻŦāϰāĻŖā§āϰ āϏāĻžāĻĨā§ āϝā§āĻā§āϤ āĻāϰāĻŦāĨ¤ āĻāĻā§āϞāĻŋ āĻāĻĄāĻŧā§ āϝāĻžāĻāϝāĻŧāĻžāϰ āϏāĻŽāϝāĻŧ āϤā§āϞ⧠āύā§āĻāϝāĻŧāĻž āĻšāĻŦā§, āϤāĻžāĻ āĻĒā§āϰāϤāĻŋāĻāĻŋ āĻšāĻžāĻāĻāĻŋāϰ āĻĒāϰ⧠āĻĒā§āϰ⧠āϏā§āĻā§āϝāĻžāĻāĻāĻŋ āĻā§āϰāĻžāĻā§āϰāĻŋ āĻāϰāĻžāϰ āĻĻāϰāĻāĻžāϰ āύā§āĻāĨ¤
āĻāĻŋāĻā§ āĻāĻžāϝāĻŧāĻāĻžāϝāĻŧ, āĻāĻĻāĻžāĻšāϰāĻŖāĻā§āϞāĻŋāϰ āĻā§āĻĄāĻāĻŋ āϏāĻŽā§āĻĒā§āϰā§āĻŖāϰā§āĻĒā§ āĻĻā§āĻāĻžāύ⧠āĻšāϝāĻŧ āύāĻž (āϝāĻžāϤ⧠āĻĒāĻžāĻ ā§āϝāĻāĻŋ āĻŦāĻŋāĻļā§āĻā§āĻāϞ āύāĻž āĻšāϝāĻŧ), āϤāĻŦā§ āĻā§āĻĨāĻžāĻ āĻāĻāĻŋ āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻžāϝāĻŧ āĻĒāϰāĻŋāĻŦāϰā§āϤāύ āĻāϰāĻž āĻšāϝāĻŧāĨ¤ āϏāĻŽā§āĻĒā§āϰā§āĻŖ āĻāĻžāĻā§āϰ āĻā§āĻĄ āĻāĻĻāĻžāĻšāϰāĻŖ āϏāĻāĻā§āϰāĻšāϏā§āĻĨāϞ āĻĒāĻžāĻāϝāĻŧāĻž āϝāĻžāĻŦā§ .
Docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerāĻŽāύā§āϤāĻŦā§āϝ āϏāĻŽā§āĻš:
- āϰāĻāύāĻžāĻāĻŋāϰ āϏāĻŽāĻžāĻŦā§āĻļā§, āĻāĻŽāĻŋ āĻŽā§āϞāϤ āϏā§āĻĒāϰāĻŋāĻāĻŋāϤ āĻāĻŋāϤā§āϰā§āϰ āĻāĻĒāϰ āύāĻŋāϰā§āĻāϰ āĻāϰā§āĻāĻŋāϞāĻžāĻŽ - āĻāĻāĻŋ āĻā§āĻ āĻāĻāĻ āĻāϰāϤ⧠āĻā§āϞāĻŦā§āύ āύāĻž. āĻšāϝāĻŧāϤ⧠āϤā§āĻŽāĻžāϰ āĻā§āĻŦāύ⧠āĻāϰ āĻāĻŋāĻā§āϰ āĻĻāϰāĻāĻžāϰ āύā§āĻāĨ¤
- āϏāĻŽāϏā§āϤ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āϏā§āĻāĻŋāĻāϏ āύāĻž āĻļā§āϧā§āĻŽāĻžāϤā§āϰ āĻŽāĻžāϧā§āϝāĻŽā§ āĻāĻĒāϞāĻŦā§āϧ
airflow.cfg, āĻāĻŋāύā§āϤ⧠āĻĒāϰāĻŋāĻŦā§āĻļ āĻā§āϰāĻŋāϝāĻŧā§āĻŦāϞā§āϰ āĻŽāĻžāϧā§āϝāĻŽā§āĻ (āĻĄā§āĻā§āϞāĻĒāĻžāϰāĻĻā§āϰ āϧāύā§āϝāĻŦāĻžāĻĻ), āϝāĻž āĻāĻŽāĻŋ āĻĻā§āώāĻŋāϤāĻāĻžāĻŦā§ āϏā§āĻŦāĻŋāϧāĻž āύāĻŋāϝāĻŧā§āĻāĻŋāĨ¤ - āϏā§āĻŦāĻžāĻāĻžāĻŦāĻŋāĻāĻāĻžāĻŦā§āĻ, āĻāĻāĻŋ āĻāϤā§āĻĒāĻžāĻĻāύ-āĻĒā§āϰāϏā§āϤā§āϤ āύāϝāĻŧ: āĻāĻŽāĻŋ āĻāĻā§āĻāĻžāĻā§āϤāĻāĻžāĻŦā§ āĻĒāĻžāϤā§āϰ⧠āĻšāĻžāϰā§āĻāĻŦāĻŋāĻ āϰāĻžāĻāĻŋāύāĻŋ, āĻāĻŽāĻŋ āύāĻŋāϰāĻžāĻĒāϤā§āϤāĻž āύāĻŋāϝāĻŧā§ āĻŽāĻžāĻĨāĻž āĻāĻžāĻŽāĻžāĻāύāĻŋāĨ¤ āĻāĻŋāύā§āϤ⧠āĻāĻŽāĻŋ āĻāĻŽāĻžāĻĻā§āϰ āĻĒāϰā§āĻā§āώāĻžāĻāĻžāϰā§āĻĻā§āϰ āĻāύā§āϝ āύā§āϝā§āύāϤāĻŽ āĻāĻĒāϝā§āĻā§āϤ āĻāϰā§āĻāĻŋāĨ¤
- āĻŽāύ⧠āϰāĻžāĻāĻŦā§āύ āϝā§:
- āĻĄā§āĻ āĻĢā§āϞā§āĻĄāĻžāϰāĻāĻŋ āĻļāĻŋāĻĄāĻŋāĻāϞāĻāĻžāϰ⧠āĻāĻŦāĻ āĻāϰā§āĻŽā§āĻĻā§āϰ āĻāĻāϝāĻŧā§āϰ āĻāĻžāĻā§āĻ āĻ ā§āϝāĻžāĻā§āϏā§āϏāϝā§āĻā§āϝ āĻšāϤ⧠āĻšāĻŦā§āĨ¤
- āĻāĻāĻ āϏāĻŽāϏā§āϤ āϤā§āϤā§āϝāĻŧ āĻĒāĻā§āώā§āϰ āϞāĻžāĻāĻŦā§āϰā§āϰāĻŋāϰ āĻā§āώā§āϤā§āϰ⧠āĻĒā§āϰāϝā§āĻā§āϝ - āϏā§āĻā§āϞāĻŋ āĻ āĻŦāĻļā§āϝāĻ āĻāĻāĻāĻŋ āĻļāĻŋāĻĄāĻŋāĻāϞ āĻāĻŦāĻ āĻāϰā§āĻŽā§āĻĻā§āϰ āϏāĻžāĻĨā§ āĻŽā§āĻļāĻŋāύ⧠āĻāύāϏā§āĻāϞ āĻāϰāĻž āĻāĻāĻŋāϤāĨ¤
āĻāĻā§āĻāĻž, āĻāĻāύ āĻāĻāĻž āϏāĻšāĻ:
$ docker-compose up --scale worker=3āϏāĻŦāĻāĻŋāĻā§ āĻāĻ āĻžāϰ āĻĒāϰā§, āĻāĻĒāύāĻŋ āĻāϝāĻŧā§āĻŦ āĻāύā§āĻāĻžāϰāĻĢā§āϏāĻā§āϞāĻŋ āĻĻā§āĻāϤ⧠āĻĒāĻžāϰā§āύ:
- āĻŦāĻžāϤāĻžāϏā§āϰ āĻĒā§āϰāĻŦāĻžāĻš:
- āĻĢā§āϞāĻžāĻāϝāĻŧāĻžāϰ:
āĻŽā§āϞāĻŋāĻ āϧāĻžāϰāĻŖāĻž
āĻāĻĒāύāĻŋ āϝāĻĻāĻŋ āĻāĻ āϏāĻŽāϏā§āϤ "āĻĄā§āϝāĻžāĻāϏ" āĻāϰ āĻŽāϧā§āϝ⧠āĻāĻŋāĻā§ āύāĻž āĻŦā§āĻā§ āĻĨāĻžāĻā§āύ āϤāĻŦā§ āĻāĻāĻžāύ⧠āĻāĻāĻāĻŋ āϏāĻāĻā§āώāĻŋāĻĒā§āϤ āĻ āĻāĻŋāϧāĻžāύ āϰāϝāĻŧā§āĻā§:
- āύāĻŋāϰā§āϧāĻžāϰāĻŖāĻāĻžāϰ⧠- āĻāϝāĻŧāĻžāϰāĻĢā§āϞā§āϤ⧠āϏāĻŦāĻā§āϝāĻŧā§ āĻā§āϰā§āϤā§āĻŦāĻĒā§āϰā§āĻŖ āĻāĻžāĻāĻž, āϝāĻŋāύāĻŋ āύāĻŋāϝāĻŧāύā§āϤā§āϰāĻŖ āĻāϰā§āύ āϝ⧠āϰā§āĻŦāĻ āĻāĻ ā§āϰ āĻĒāϰāĻŋāĻļā§āϰāĻŽ āĻāϰā§, āĻāĻāĻāύ āĻŦā§āϝāĻā§āϤāĻŋ āύāϝāĻŧ: āϏāĻŽāϝāĻŧāϏā§āĻā§ āύāĻŋāϰā§āĻā§āώāĻŖ āĻāϰā§, āĻĄā§āĻ āĻāĻĒāĻĄā§āĻ āĻāϰā§, āĻāĻžāĻāĻā§āϞāĻŋ āĻāĻžāϞ⧠āĻāϰā§āĨ¤
āϏāĻžāϧāĻžāϰāĻŖāĻāĻžāĻŦā§, āĻĒā§āϰāĻžāύ⧠āϏāĻāϏā§āĻāϰāĻŖāĻā§āϞāĻŋāϤā§, āϤāĻžāϰ āϏā§āĻŽā§āϤāĻŋāϤ⧠āϏāĻŽāϏā§āϝāĻž āĻāĻŋāϞ (āύāĻž, āĻ ā§āϝāĻžāĻŽāύā§āϏāĻŋāϝāĻŧāĻž āύāϝāĻŧ, āϤāĻŦā§ āĻĢāĻžāĻāϏ) āĻāĻŦāĻ āϞāĻŋāĻā§āϝāĻžāϏāĻŋ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāĻāĻžāϰāĻāĻŋ āĻāĻŽāύāĻāĻŋ āĻāύāĻĢāĻŋāĻāĻžāϰ⧠āϰāϝāĻŧā§ āĻā§āĻā§
run_duration- āĻāĻāĻŋ āĻĒā§āύāϰāĻžāϝāĻŧ āĻāĻžāϞ⧠āĻāϰāĻžāϰ āĻŦā§āϝāĻŦāϧāĻžāύāĨ¤ āĻāĻŋāύā§āϤ⧠āĻāĻāύ āϏāĻŦ āĻ āĻŋāĻ āĻāĻā§āĨ¤ - DAG (āĻāϰāĻĢā§ "āĻĄā§āϝāĻžāĻ") - "āύāĻŋāϰā§āĻĻā§āĻļāĻŋāϤ āĻ
ā§āϝāĻžāϏāĻžāĻāĻā§āϞāĻŋāĻ āĻā§āϰāĻžāĻĢ", āĻāĻŋāύā§āϤ⧠āĻāĻ āϧāϰāύā§āϰ āϏāĻāĻā§āĻāĻž āĻā§āĻŦ āĻāĻŽ āϞā§āĻāĻā§ āĻŦāϞāϤ⧠āĻĒāĻžāϰāĻŦā§, āĻāĻŋāύā§āϤ⧠āĻāϏāϞ⧠āĻāĻāĻŋ āĻāĻā§ āĻ
āĻĒāϰā§āϰ āϏāĻžāĻĨā§ āĻāύā§āĻāĻžāϰāĻ
ā§āϝāĻžāĻā§āĻ āĻāϰāĻžāϰ āĻāύā§āϝ āĻāĻāĻāĻŋ āϧāĻžāϰāĻ (āύā§āĻā§ āĻĻā§āĻā§āύ) āĻŦāĻž SSIS-āĻ āĻĒā§āϝāĻžāĻā§āĻ āĻāĻŦāĻ āĻāύāĻĢāϰāĻŽā§āϝāĻžāĻāĻŋāĻāĻžāϝāĻŧ āĻāϝāĻŧāĻžāϰā§āĻāĻĢā§āϞā§-āĻāϰ āĻāĻāĻāĻŋ āĻ
ā§āϝāĻžāύāĻžāϞāĻāĨ¤ .
āĻĄā§āϝāĻžāĻāĻā§āϞāĻŋ āĻāĻžāĻĄāĻŧāĻžāĻ, āĻāĻāύāĻ āϏāĻžāĻŦāĻĄā§āϝāĻžāĻ āĻĨāĻžāĻāϤ⧠āĻĒāĻžāϰā§, āϤāĻŦā§ āĻāĻŽāϰāĻž āϏāĻŽā§āĻāĻŦāϤ āϏā§āĻā§āϞāĻŋ āĻĒāĻžāĻŦ āύāĻžāĨ¤
- DAG āϰāĻžāύ - āĻĒā§āϰāĻžāϰāĻŽā§āĻāĻŋāĻ āĻĄā§āĻ, āϝāĻž āϤāĻžāϰ āύāĻŋāĻāϏā§āĻŦ āĻŦāϰāĻžāĻĻā§āĻĻ āĻāϰāĻž āĻšāϝāĻŧ
execution_date. āĻāĻāĻ āĻĄā§āϝāĻžāĻā§āϰ āĻĄā§āϝāĻžāĻāϰāĻž āϏāĻŽāĻžāύā§āϤāϰāĻžāϞāĻāĻžāĻŦā§ āĻāĻžāĻ āĻāϰāϤ⧠āĻĒāĻžāϰ⧠(āϝāĻĻāĻŋ āĻāĻĒāύāĻŋ āĻ āĻŦāĻļā§āϝāĻ āĻāĻĒāύāĻžāϰ āĻāĻžāĻāĻā§āϞāĻŋāĻā§ āĻ āĻĻāĻŽā§āϝ āĻāϰ⧠āĻĨāĻžāĻā§āύ)āĨ¤ - āĻ
āĻĒāĻžāϰā§āĻāϰ āĻāĻāĻāĻŋ āύāĻŋāϰā§āĻĻāĻŋāώā§āĻ āĻā§āϰāĻŋāϝāĻŧāĻž āϏāĻŽā§āĻĒāĻžāĻĻāύā§āϰ āĻāύā§āϝ āĻĻāĻžāϝāĻŧā§ āĻā§āĻĄā§āϰ āĻā§āĻāϰā§āĨ¤ āϤāĻŋāύ āϧāϰāύā§āϰ āĻ
āĻĒāĻžāϰā§āĻāϰ āĻāĻā§:
- āĻāϰā§āĻŽāĻāĻŽāĻžāĻĻā§āϰ āĻĒā§āϰāĻŋāϝāĻŧ āĻŽāϤ
PythonOperator, āϝāĻž āϝā§āĻā§āύ⧠(āĻŦā§āϧ) āĻĒāĻžāĻāĻĨāύ āĻā§āĻĄ āĻāĻžāϞāĻžāϤ⧠āĻĒāĻžāϰā§; - āĻšāϏā§āϤāĻžāύā§āϤāϰ, āϝāĻž āϏā§āĻĨāĻžāύ āĻĨā§āĻā§ āĻ
āύā§āϝ āĻāĻžāϝāĻŧāĻāĻžāϝāĻŧ āĻĄā§āĻāĻž āĻĒāϰāĻŋāĻŦāĻšāύ āĻāϰā§, āĻŦāϞā§,
MsSqlToHiveTransfer; - āϏā§āύā§āϏāϰ āĻ
āύā§āϝāĻĻāĻŋāĻā§, āĻāĻāĻŋ āĻāĻĒāύāĻžāĻā§ āĻĒā§āϰāϤāĻŋāĻā§āϰāĻŋāϝāĻŧāĻž āĻāĻžāύāĻžāϤ⧠āĻŦāĻž āĻāĻāĻāĻŋ āĻāĻā§āύā§āĻ āύāĻž āĻšāĻāϝāĻŧāĻž āĻĒāϰā§āϝāύā§āϤ āĻĄā§āϝāĻžāĻāĻāĻŋāϰ āĻĒāϰāĻŦāϰā§āϤ⧠āϏāĻŽā§āĻĒāĻžāĻĻāύāĻā§ āϧā§āϰ āĻāϰāĻžāϰ āĻ
āύā§āĻŽāϤāĻŋ āĻĻā§āĻŦā§āĨ¤
HttpSensorāύāĻŋāϰā§āĻĻāĻŋāώā§āĻ āĻāύā§āĻĄāĻĒāϝāĻŧā§āύā§āĻ āĻāĻžāύāϤ⧠āĻĒāĻžāϰā§, āĻāĻŦāĻ āϝāĻāύ āĻāĻžāĻā§āĻā§āώāĻŋāϤ āĻĒā§āϰāϤāĻŋāĻā§āϰāĻŋāϝāĻŧāĻž āĻ āĻĒā§āĻā§āώāĻž āĻāϰāĻā§, āϏā§āĻĨāĻžāύāĻžāύā§āϤāϰ āĻļā§āϰ⧠āĻāϰā§āύGoogleCloudStorageToS3Operator. āĻāĻāĻāĻŋ āĻ āύā§āϏāύā§āϧāĻŋā§āϏ⧠āĻŽāύ āĻāĻŋāĻā§āĻāĻžāϏāĻž āĻāϰāĻŦā§: "āĻā§āύ? āϏāϰā§āĻŦā§āĻĒāϰāĻŋ, āĻāĻĒāύāĻŋ āĻ āĻŋāĻ āĻ āĻĒāĻžāϰā§āĻāϰ⧠āĻĒā§āύāϰāĻžāĻŦā§āϤā§āϤāĻŋ āĻāϰāϤ⧠āĻĒāĻžāϰā§āύ!" āĻāĻŦāĻ āϤāĻžāϰāĻĒāϰ, āϏā§āĻĨāĻāĻŋāϤ āĻ āĻĒāĻžāϰā§āĻāϰāĻĻā§āϰ āϏāĻžāĻĨā§ āĻāĻžāĻā§āϰ āĻĒā§āϞ āĻāĻāĻā§ āύāĻž āĻĻā§āĻāϝāĻŧāĻžāϰ āĻāύā§āϝāĨ¤ āϏā§āύā§āϏāϰ āĻļā§āϰ⧠āĻšāϝāĻŧ, āĻā§āĻ āĻāϰ⧠āĻāĻŦāĻ āĻĒāϰāĻŦāϰā§āϤ⧠āĻĒā§āϰāĻā§āώā§āĻāĻžāϰ āĻāĻā§ āĻŽāĻžāϰāĻž āϝāĻžāϝāĻŧāĨ¤
- āĻāϰā§āĻŽāĻāĻŽāĻžāĻĻā§āϰ āĻĒā§āϰāĻŋāϝāĻŧ āĻŽāϤ
- āĻāĻžāϰā§āϝ - āĻā§āώāĻŋāϤ āĻ āĻĒāĻžāϰā§āĻāϰ, āĻĒā§āϰāĻāĻžāϰ āύāĻŋāϰā§āĻŦāĻŋāĻļā§āώā§, āĻāĻŦāĻ āĻĄā§āϝāĻžāĻā§āϰ āϏāĻžāĻĨā§ āϏāĻāϝā§āĻā§āϤ āĻāϰā§āĻŽā§āϰ āĻĒāĻĻā§ āĻāύā§āύā§āϤ āĻšāϝāĻŧāĨ¤
- āĻāĻžāϏā§āĻ āĻāĻĻāĻžāĻšāϰāĻŖ - āϝāĻāύ āϏāĻžāϧāĻžāϰāĻŖ āĻĒāϰāĻŋāĻāϞā§āĻĒāύāĻžāĻāĻžāϰ⧠āϏāĻŋāĻĻā§āϧāĻžāύā§āϤ āύāĻŋāϝāĻŧā§āĻāĻŋāϞā§āύ āϝ⧠āĻĒāĻžāϰāĻĢāϰā§āĻŽāĻžāϰ-āĻāϰā§āĻŽā§āĻĻā§āϰ āϝā§āĻĻā§āϧ⧠āĻāĻžāĻ āĻĒāĻžāĻ āĻžāύā§āϰ āϏāĻŽāϝāĻŧ āĻāϏā§āĻā§ (āĻ āĻŋāĻ āĻāĻžāϝāĻŧāĻāĻžāϝāĻŧ, āϝāĻĻāĻŋ āĻāĻŽāϰāĻž āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰāĻŋ
LocalExecutorāĻŦāĻž āĻā§āώā§āϤā§āϰ⧠āĻāĻāĻāĻŋ āĻĻā§āϰāĻŦāϰā§āϤ⧠āύā§āĻĄCeleryExecutor), āĻāĻāĻŋ āϤāĻžāĻĻā§āϰ āĻāĻāĻāĻŋ āĻĒā§āϰāϏāĻā§āĻ āĻŦāϰāĻžāĻĻā§āĻĻ āĻāϰ⧠(āĻ āϰā§āĻĨāĻžā§, āĻā§āϰāĻŋāϝāĻŧā§āĻŦāϞā§āϰ āĻāĻāĻāĻŋ āϏā§āĻ - āĻāĻā§āϏāĻŋāĻāĻŋāĻāĻļāύ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāĻāĻžāϰ), āĻāĻŽāĻžāύā§āĻĄ āĻŦāĻž āĻā§āϝā§āϝāĻŧāĻžāϰ⧠āĻā§āĻŽāĻĒā§āϞā§āĻ āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻāϰ⧠āĻāĻŦāĻ āϤāĻžāĻĻā§āϰ āĻĒā§āϞ āĻāϰā§āĨ¤
āĻāĻŽāϰāĻž āĻāĻžāĻ āϤā§āϰāĻŋ āĻāϰāĻŋ
āĻĒā§āϰāĻĨāĻŽā§, āĻāϏā§āύ āĻāĻŽāĻžāĻĻā§āϰ āĻĄāĻā§āϰ āϏāĻžāϧāĻžāϰāĻŖ āϏā§āĻāĻŋāĻŽāĻāĻŋāϰ āϰā§āĻĒāϰā§āĻāĻž āĻĻāĻŋāĻ, āĻāĻŦāĻ āϤāĻžāϰāĻĒāϰ⧠āĻāĻŽāϰāĻž āĻāϰāĻ āĻŦā§āĻļāĻŋ āĻāϰ⧠āĻŦāĻŋāĻļāĻĻāĻāĻŋāϤ⧠āĻĄā§āĻŦ āĻĻā§āĻŦ, āĻāĻžāϰāĻŖ āĻāĻŽāϰāĻž āĻāĻŋāĻā§ āĻ -āϤā§āĻā§āĻ āϏāĻŽāĻžāϧāĻžāύ āĻĒā§āϰāϝāĻŧā§āĻ āĻāϰāĻŋāĨ¤
āϏā§āϤāϰāĻžāĻ, āĻāϰ āϏāĻšāĻāϤāĻŽ āĻāĻāĻžāϰā§, āĻāĻ āĻāĻžāϤā§āϝāĻŧ āĻĄā§āϝāĻžāĻ āĻāϰ āĻŽāϤ⧠āĻĻā§āĻāĻžāĻŦā§:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)āĻāϏā§āύ āĻāĻāĻŋ āĻŦā§āϰ āĻāϰāĻž āϝāĻžāĻ:
- āĻĒā§āϰāĻĨāĻŽāϤ, āĻāĻŽāϰāĻž āĻĒā§āϰāϝāĻŧā§āĻāύā§āϝāĻŧ libs āĻāĻŦāĻ āĻāĻŽāĻĻāĻžāύāĻŋ āĻāϰāĻŋ āĻ āύā§āϝāĻāĻŋāĻā§;
sql_server_ds- āĻāĻāĻžList[namedtuple[str, str]]āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻāĻžāύā§āĻāĻļāύ āĻĨā§āĻā§ āϏāĻāϝā§āĻā§āϰ āύāĻžāĻŽ āĻāĻŦāĻ āĻĄāĻžāĻāĻžāĻŦā§āϏ āϝāĻž āĻĨā§āĻā§ āĻāĻŽāϰāĻž āĻāĻŽāĻžāĻĻā§āϰ āĻĒā§āϞā§āĻ āύā§āĻŦ;dag- āĻāĻŽāĻžāĻĻā§āϰ āĻĄā§āĻ āĻā§āώāĻŖāĻž, āϝāĻž āĻ āĻāϤā§āϝāĻž āĻŽāϧā§āϝ⧠āĻšāϤ⧠āĻšāĻŦā§globals(), āĻ āύā§āϝāĻĨāĻžāϝāĻŧ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻāĻāĻŋ āĻā§āĻāĻā§ āĻĒāĻžāĻŦā§ āύāĻžāĨ¤ āĻĄāĻāĻā§āĻ āĻŦāϞāϤ⧠āĻšāĻŦā§:- āϤāĻžāϰ āύāĻžāĻŽ āĻāĻŋ
orders- āĻāĻ āύāĻžāĻŽāĻāĻŋ āϤāĻāύ āĻāϝāĻŧā§āĻŦ āĻāύā§āĻāĻžāϰāĻĢā§āϏ⧠āĻĒā§āϰāĻĻāϰā§āĻļāĻŋāϤ āĻšāĻŦā§, - āϝ⧠āϤāĻŋāύāĻŋ āĻāĻāĻ āĻā§āϞāĻžāĻ āĻŽāϧā§āϝāϰāĻžāϤ āĻĨā§āĻā§ āĻāĻžāĻ āĻāϰāĻŦā§āύ,
- āĻāĻŦāĻ āĻāĻāĻŋ āĻāĻžāϞāĻžāύ⧠āĻāĻāĻŋāϤ, āĻĒā§āϰāĻžāϝāĻŧ āĻĒā§āϰāϤāĻŋ 6 āĻāύā§āĻāĻž (āĻāϰ āĻĒāϰāĻŋāĻŦāϰā§āϤ⧠āĻāĻāĻžāύ⧠āĻāĻ āĻŋāύ āϞā§āĻāĻĻā§āϰ āĻāύā§āϝ
timedelta()āĻā§āϰāĻšāĻŖāϝā§āĻā§āϝcron-āϞāĻžāĻāύ0 0 0/6 ? * * *, āĻāĻŽ āĻ āĻžāύā§āĻĄāĻž āĻāύā§āϝ - āĻŽāϤ āĻāĻāĻāĻŋ āĻ āĻāĻŋāĻŦā§āϝāĻā§āϤāĻŋ@daily);
- āϤāĻžāϰ āύāĻžāĻŽ āĻāĻŋ
workflow()āĻŽā§āϞ āĻāĻžāĻ āĻāϰāĻŦā§, āĻāĻŋāύā§āϤ⧠āĻāĻāύ āύāϝāĻŧāĨ¤ āĻāĻĒāĻžāϤāϤ, āĻāĻŽāϰāĻž āĻļā§āϧ⧠āϞāĻā§ āĻāĻŽāĻžāĻĻā§āϰ āĻĒā§āϰāϏāĻā§āĻ āĻĄāĻžāĻŽā§āĻĒ āĻāϰāĻŦāĨ¤- āĻāĻŦāĻ āĻāĻāύ āĻāĻžāĻ āϤā§āϰāĻŋ āĻāϰāĻžāϰ āϏāĻšāĻ āϝāĻžāĻĻā§:
- āĻāĻŽāϰāĻž āĻāĻŽāĻžāĻĻā§āϰ āĻāϤā§āϏ āĻŽāĻžāϧā§āϝāĻŽā§ āĻāĻžāϞāĻžāύ;
- āĻāϰāĻŽā§āĻ āĻāϰāĻž
PythonOperator, āϝāĻž āĻāĻŽāĻžāĻĻā§āϰ āĻĄāĻžāĻŽāĻŋ āĻāĻžāϰā§āϝāĻāϰ āĻāϰāĻŦā§workflow(). āĻāĻžāϏā§āĻā§āϰ āĻāĻāĻāĻŋ āĻ āύāύā§āϝ (āĻĄā§āĻā§āϰ āĻŽāϧā§āϝā§) āύāĻžāĻŽ āĻāϞā§āϞā§āĻ āĻāϰāϤ⧠āĻā§āϞāĻŦā§āύ āύāĻž āĻāĻŦāĻ āĻĄā§āϝāĻžāĻāĻāĻŋ āύāĻŋāĻā§āĻ āĻŦā§āĻāϧ⧠āĻĻāĻŋāύāĨ¤ āĻĒāϤāĻžāĻāĻžprovide_contextāĻĒāϰāĻŋāĻŦāϰā§āϤā§, āĻĢāĻžāĻāĻļāύ⧠āĻ āϤāĻŋāϰāĻŋāĻā§āϤ āĻāϰā§āĻā§āĻŽā§āύā§āĻ āĻĸāĻžāϞāĻž āĻšāĻŦā§, āϝāĻž āĻāĻŽāϰāĻž āϏāĻžāĻŦāϧāĻžāύ⧠āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰ⧠āϏāĻāĻā§āϰāĻš āĻāϰāĻŦ**context.
āĻāĻĒāĻžāϤāϤ āĻāĻāĻā§āĻā§āĻāĨ¤ āĻāĻŽāϰāĻž āϝāĻž āĻĒā§āϝāĻŧā§āĻāĻŋ:
- āĻāϝāĻŧā§āĻŦ āĻāύā§āĻāĻžāϰāĻĢā§āϏ⧠āύāϤā§āύ āĻĄā§āĻ,
- āĻĻā§āĻĄāĻŧ āĻļāϤāĻžāϧāĻŋāĻ āĻāĻžāĻ āϝāĻž āϏāĻŽāĻžāύā§āϤāϰāĻžāϞāĻāĻžāĻŦā§ āĻāĻžāϰā§āϝāĻāϰ āĻāϰāĻž āĻšāĻŦā§ (āϝāĻĻāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞā§, āϏā§āϞāĻžāϰāĻŋ āϏā§āĻāĻŋāĻāϏ āĻāĻŦāĻ āϏāĻžāϰā§āĻāĻžāϰā§āϰ āĻā§āώāĻŽāϤāĻž āĻ āύā§āĻŽāϤāĻŋ āĻĻā§āϝāĻŧ)āĨ¤
āĻāϝāĻŧā§āϞ, āĻĒā§āϰāĻžāϝāĻŧ āĻĒā§āϝāĻŧā§āĻāĻŋāϞāĻžāĻŽ.

āĻā§ āύāĻŋāϰā§āĻāϰāϤāĻž āĻāύāϏā§āĻāϞ āĻāϰāĻŦā§?
āĻāĻ āĻĒā§āϰ⧠āĻāĻŋāύāĻŋāϏāĻāĻŋ āϏāĻšāĻ āĻāϰāĻžāϰ āĻāύā§āϝ, āĻāĻŽāĻŋ āϏā§āĻā§āϰ⧠āĻāϰā§āĻāĻŋ docker-compose.yml āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻžāĻāϰāĻŖ requirements.txt āϏāĻŦ āύā§āĻĄā§āϰ āĻāĻĒāϰāĨ¤
āĻāĻāύ āĻāĻāĻŋ āĻāϞ⧠āĻā§āĻā§:

āϧā§āϏāϰ āϏā§āĻā§āϝāĻŧāĻžāϰāĻā§āϞāĻŋ āĻšāϞ āϏāĻŽāϝāĻŧāϏā§āĻā§ āĻĻā§āĻŦāĻžāϰāĻž āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻžāĻā§āϤ āĻāĻžāϏā§āĻ āĻĻā§āώā§āĻāĻžāύā§āϤāĨ¤
āĻāĻŽāϰāĻž āĻāĻāĻā§ āĻ āĻĒā§āĻā§āώāĻž āĻāϰāĻŋ, āĻāϰā§āĻŽā§āϰāĻž āĻāĻžāĻāĻā§āϞ⧠āĻŦāύā§āϧ āĻāϰ⧠āĻĻā§āύ:

āϏāĻŦā§āĻāϰāĻž āĻ āĻŦāĻļā§āϝāĻ āϤāĻžāĻĻā§āϰ āĻāĻžāĻ āϏāĻĢāϞāĻāĻžāĻŦā§ āϏāĻŽā§āĻĒāύā§āύ āĻāϰā§āĻā§āĨ¤ āϞāĻžāϞ āĻā§āĻŦ āϏāĻĢāϞ āĻšāϝāĻŧ āύāĻžāĨ¤
āϝāĻžāĻāĻšā§āĻ, āĻāĻŽāĻžāĻĻā§āϰ āĻĒā§āϰā§āĻĄā§ āĻā§āύ āĻĢā§āϞā§āĻĄāĻžāϰ āύā§āĻ
./dags, āĻŽā§āĻļāĻŋāύā§āϰ āĻŽāϧā§āϝ⧠āĻā§āύ āϏāĻŋāĻā§āĻā§āϰā§āύāĻžāĻāĻā§āĻļāύ āύā§āĻ - āϏāĻŦ āĻĄā§āϝāĻžāĻ āĻļā§āϝāĻŧā§ āĻāĻā§gitāĻāĻŽāĻžāĻĻā§āϰ āĻāĻŋāĻāϞā§āϝāĻžāĻŦā§, āĻāĻŦāĻ āĻāĻŋāĻāϞā§āϝāĻžāĻŦ āϏāĻŋāĻāĻ āĻŽā§āĻļāĻŋāύ⧠āĻāĻāϤā§āϰāĻŋāϤ āĻšāĻāϝāĻŧāĻžāϰ āϏāĻŽāϝāĻŧ āĻāĻĒāĻĄā§āĻāĻā§āϞāĻŋ āĻŦāĻŋāϤāϰāĻŖ āĻāϰā§master.
āĻĢā§āϞ āϏāĻŽā§āĻĒāϰā§āĻā§ āĻāĻāĻā§
āĻļā§āϰāĻŽāĻŋāĻāϰāĻž āϝāĻāύ āĻāĻŽāĻžāĻĻā§āϰ āĻĒā§āϰāĻļāĻžāύā§āϤāĻŋāĻā§ āĻŽāĻžāϰāĻā§, āϤāĻāύ āĻāϏā§āύ āĻāϰā§āĻāĻāĻŋ āĻšāĻžāϤāĻŋāϝāĻŧāĻžāϰ āĻŽāύ⧠āϰāĻžāĻāĻŋ āϝāĻž āĻāĻŽāĻžāĻĻā§āϰ āĻāĻŋāĻā§ āĻĻā§āĻāĻžāϤ⧠āĻĒāĻžāϰ⧠- āĻĢā§āϞāĻžāĻāϝāĻŧāĻžāϰāĨ¤
āĻāϰā§āĻŽā§ āύā§āĻĄā§āϰ āϏāĻāĻā§āώāĻŋāĻĒā§āϤ āϤāĻĨā§āϝ āϏāĻš āĻĒā§āϰāĻĨāĻŽ āĻĒā§āώā§āĻ āĻž:

āĻāĻžāĻāĻā§āϞāĻŋ āϏāĻš āϏāĻŦāĻā§āϝāĻŧā§ āϤā§āĻŦā§āϰ āĻĒā§āώā§āĻ āĻž āϝāĻž āĻāĻžāĻ āĻāϰāϤ⧠āĻāĻŋāϝāĻŧā§āĻāĻŋāϞ:

āĻāĻŽāĻžāĻĻā§āϰ āĻŦā§āϰā§āĻāĻžāϰā§āϰ āϏā§āĻā§āϝāĻžāĻāĻžāϏ āϏāĻš āϏāĻŦāĻā§āϝāĻŧā§ āĻŦāĻŋāϰāĻā§āϤāĻŋāĻāϰ āĻĒā§āώā§āĻ āĻž:

āϏāĻŦāĻā§āϝāĻŧā§ āĻāĻā§āĻā§āĻŦāϞ āĻĒā§āώā§āĻ āĻžāĻāĻŋ āĻāĻžāϏā§āĻ āϏā§āĻā§āϝāĻžāĻāĻžāϏ āĻā§āϰāĻžāĻĢ āĻāĻŦāĻ āϤāĻžāĻĻā§āϰ āĻāĻžāϰā§āϝāĻāϰ āĻāϰāĻžāϰ āϏāĻŽāϝāĻŧ āϏāĻš:

āĻāĻŽāϰāĻž underloaded āϞā§āĻĄ
āϏā§āϤāϰāĻžāĻ, āϏāĻŽāϏā§āϤ āĻāĻžāĻ āĻļā§āώ āĻšāϝāĻŧā§āĻā§, āĻāĻĒāύāĻŋ āĻāĻšāϤāĻĻā§āϰ āĻŦāĻšāύ āĻāϰāϤ⧠āĻĒāĻžāϰā§āύāĨ¤

āĻāĻŦāĻ āϏā§āĻāĻžāύ⧠āĻ āύā§āĻ āĻāĻšāϤ āĻšāϝāĻŧā§āĻāĻŋāϞ - āĻāĻ āĻŦāĻž āĻ āύā§āϝ āĻāĻžāϰāĻŖā§āĨ¤ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āϏāĻ āĻŋāĻ āĻŦā§āϝāĻŦāĻšāĻžāϰā§āϰ āĻā§āώā§āϤā§āϰā§, āĻāĻ āĻā§āĻŦ āϏā§āĻā§āϝāĻŧāĻžāϰāĻā§āϞāĻŋ āύāĻŋāϰā§āĻĻā§āĻļ āĻāϰ⧠āϝ⧠āĻĄā§āĻāĻž āύāĻŋāĻļā§āĻāĻŋāϤāĻāĻžāĻŦā§ āĻāϏā§āύāĻŋāĨ¤
āĻāĻĒāύāĻžāĻā§ āϞāĻāĻāĻŋ āĻĻā§āĻāϤ⧠āĻšāĻŦā§ āĻāĻŦāĻ āĻĒāϤāĻŋāϤ āĻāĻžāϏā§āĻ āĻāύāϏā§āĻā§āϝāĻžāύā§āϏāĻā§āϞāĻŋ āĻĒā§āύāϰāĻžāϝāĻŧ āĻāĻžāϞ⧠āĻāϰāϤ⧠āĻšāĻŦā§āĨ¤
āϝā§āĻā§āύ⧠āϏā§āĻā§āϝāĻŧāĻžāϰ⧠āĻā§āϞāĻŋāĻ āĻāϰā§, āĻāĻŽāϰāĻž āĻāĻŽāĻžāĻĻā§āϰ āĻāύā§āϝ āĻāĻĒāϞāĻŦā§āϧ āĻ ā§āϝāĻžāĻāĻļāύ āĻĻā§āĻāϤ⧠āĻĒāĻžāĻŦ:

āĻāĻĒāύāĻŋ āύāĻŋāϤ⧠āĻĒāĻžāϰā§āύ āĻāĻŦāĻ āĻĒāϤāĻŋāϤ āĻĒāϰāĻŋāώā§āĻāĻžāϰ āĻāϰāϤ⧠āĻĒāĻžāϰā§āύ. āĻ āϰā§āĻĨāĻžā§, āĻāĻŽāϰāĻž āĻā§āϞ⧠āϝāĻžāĻ āϝ⧠āϏā§āĻāĻžāύ⧠āĻāĻŋāĻā§ āĻŦā§āϝāϰā§āĻĨ āĻšāϝāĻŧā§āĻā§ āĻāĻŦāĻ āĻāĻāĻ āĻāĻĻāĻžāĻšāϰāĻŖ āĻāĻžāϏā§āĻ āĻļāĻŋāĻĄāĻŋāĻāϞāĻžāϰā§āϰ āĻāĻžāĻā§ āϝāĻžāĻŦā§āĨ¤

āĻāĻāĻž āϏā§āĻĒāώā§āĻ āϝ⧠āϏāĻŽāϏā§āϤ āϞāĻžāϞ āϏā§āĻā§āϝāĻŧāĻžāϰā§āϰ āϏāĻžāĻĨā§ āĻŽāĻžāĻāϏ āĻĻāĻŋāϝāĻŧā§ āĻāĻāĻŋ āĻāϰāĻž āĻā§āĻŦ āĻŽāĻžāύāĻŦāĻŋāĻ āύāϝāĻŧ - āĻāĻāĻŋ āĻāĻŽāϰāĻž āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻĨā§āĻā§ āĻāĻļāĻž āĻāϰāĻŋ āύāĻžāĨ¤ āϏā§āĻŦāĻžāĻāĻžāĻŦāĻŋāĻāĻāĻžāĻŦā§āĻ, āĻāĻŽāĻžāĻĻā§āϰ āĻāĻžāĻā§ āĻāĻŖāĻŦāĻŋāϧā§āĻŦāĻāϏ⧠āĻ
āϏā§āϤā§āϰ āϰāϝāĻŧā§āĻā§: Browse/Task Instances

āĻāϏā§āύ āĻāĻāĻŦāĻžāϰ⧠āϏāĻŦāĻāĻŋāĻā§ āύāĻŋāϰā§āĻŦāĻžāĻāύ āĻāϰāĻŋ āĻāĻŦāĻ āĻļā§āύā§āϝ⧠āϰāĻŋāϏā§āĻ āĻāϰāĻŋ, āϏāĻ āĻŋāĻ āĻāĻāĻā§āĻŽāĻāĻŋāϤ⧠āĻā§āϞāĻŋāĻ āĻāϰā§āύ:

āĻĒāϰāĻŋāώā§āĻāĻžāϰ āĻāϰāĻžāϰ āĻĒāϰā§, āĻāĻŽāĻžāĻĻā§āϰ āĻā§āϝāĻžāĻā§āϏāĻŋāĻā§āϞāĻŋ āĻāĻāϰāĻāĻŽ āĻĻā§āĻāĻžāϝāĻŧ (āϤāĻžāϰāĻž āĻāϤāĻŋāĻŽāϧā§āϝ⧠āĻļāĻŋāĻĄāĻŋāĻāϞāĻžāϰā§āϰ āĻāύā§āϝ āϤāĻžāĻĻā§āϰ āϏāĻŽāϝāĻŧāϏā§āĻā§āϰ āĻāύā§āϝ āĻ āĻĒā§āĻā§āώāĻž āĻāϰāĻā§):

āϏāĻāϝā§āĻ, āĻšā§āĻ āĻāĻŦāĻ āĻ āύā§āϝāĻžāύā§āϝ āĻā§āϰāĻŋāϝāĻŧā§āĻŦāϞ
āĻāĻāĻž āĻĒāϰāĻŦāϰā§āϤ⧠DAG āĻĻā§āĻāĻžāϰ āϏāĻŽāϝāĻŧ, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ĐĐžŅĐŋОда Ņ
ĐžŅĐžŅиĐĩ, ĐžŅŅĐĩŅŅ ĐžĐąĐŊОвĐģĐĩĐŊŅ"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ĐаŅаŅ, ĐŋŅĐžŅŅĐŋаКŅŅ, ĐŧŅ {{ dag.dag_id }} ŅŅĐžĐŊиĐģи
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]āϏāĻŦāĻžāĻ āĻāĻŋ āĻāĻāύ⧠āϰāĻŋāĻĒā§āϰā§āĻ āĻāĻĒāĻĄā§āĻ āĻāϰā§āĻā§? āĻāĻāĻŋ āĻāĻŦāĻžāϰ āϤāĻžāϰ: āϤāĻĨā§āϝ āĻā§āĻĨāĻž āĻĨā§āĻā§ āĻĒā§āϤ⧠āĻšāĻŦā§ āϤāĻžāϰ āĻāĻāĻāĻŋ āϤāĻžāϞāĻŋāĻāĻž āϰāϝāĻŧā§āĻā§; āϝā§āĻāĻžāύ⧠āϰāĻžāĻāĻž āĻšāĻŦā§ āĻāĻāĻāĻŋ āϤāĻžāϞāĻŋāĻāĻž āĻāĻā§; āϝāĻāύ āϏāĻŦāĻāĻŋāĻā§ āĻāĻā§ āĻŦāĻž āĻā§āĻā§ āϝāĻžāϝāĻŧ āϤāĻāύ āĻšāϰā§āύ āĻĻāĻŋāϤ⧠āĻā§āϞāĻŦā§āύ āύāĻž (āĻāĻžāϞ, āĻāĻāĻŋ āĻāĻŽāĻžāĻĻā§āϰ āϏāĻŽā§āĻĒāϰā§āĻā§ āύāϝāĻŧ, āύāĻž)āĨ¤
āĻāϏā§āύ āĻāĻŦāĻžāϰ āĻĢāĻžāĻāϞā§āϰ āĻŽāϧā§āϝ āĻĻāĻŋāϝāĻŧā§ āϝāĻžāύ āĻāĻŦāĻ āύāϤā§āύ āĻ āϏā§āĻĒāώā§āĻ āĻāĻŋāύāĻŋāϏāĻā§āϞāĻŋ āĻĻā§āĻā§āύ:
from commons.operators import TelegramBotSendMessage- āĻā§āύ⧠āĻāĻŋāĻā§āĻ āĻāĻŽāĻžāĻĻā§āϰ āύāĻŋāĻāϏā§āĻŦ āĻ āĻĒāĻžāϰā§āĻāϰ āϤā§āϰāĻŋ āĻāϰāϤ⧠āĻŦāĻžāϧāĻž āĻĻā§āϝāĻŧ āύāĻž, āϝāĻžāϰ āϏā§āĻŦāĻŋāϧāĻž āĻāĻŽāϰāĻž āĻāύāĻŦā§āϞāĻāĻĄ-āĻ āĻŦāĻžāϰā§āϤāĻž āĻĒāĻžāĻ āĻžāύā§āϰ āĻāύā§āϝ āĻāĻāĻāĻŋ āĻā§āĻ āĻŽā§āĻĄāĻŧāĻ āϤā§āϰāĻŋ āĻāϰ⧠āύāĻŋāϝāĻŧā§āĻāĻŋāĨ¤ (āĻāĻŽāϰāĻž āύā§āĻā§ āĻāĻ āĻ āĻĒāĻžāϰā§āĻāϰ āϏāĻŽā§āĻĒāϰā§āĻā§ āĻāϰāĻ āĻāĻĨāĻž āĻŦāϞāĻŦ);default_args={}- dag āϤāĻžāϰ āϏāĻŽāϏā§āϤ āĻ āĻĒāĻžāϰā§āĻāϰāĻā§ āĻāĻāĻ āĻāϰā§āĻā§āĻŽā§āύā§āĻ āĻŦāĻŋāϤāϰāĻŖ āĻāϰāϤ⧠āĻĒāĻžāϰā§;to='{{ var.value.all_the_kings_men }}'- āĻā§āώā§āϤā§āϰtoāĻāĻŽāĻžāĻĻā§āϰ āĻšāĻžāϰā§āĻĄāĻā§āĻĄ āĻāϰāĻž āĻšāĻŦā§ āύāĻž, āϤāĻŦā§ āĻāĻŋāύāĻāĻž āĻāĻŦāĻ āĻāĻŽā§āϞā§āϰ āϤāĻžāϞāĻŋāĻāĻž āϏāĻš āĻāĻāĻāĻŋ āĻā§āϰāĻŋāϝāĻŧā§āĻŦāϞ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰ⧠āĻāϤāĻŋāĻļā§āϞāĻāĻžāĻŦā§ āϤā§āϰāĻŋ āĻāϰāĻž āĻšāϝāĻŧā§āĻā§, āϝāĻž āĻāĻŽāĻŋ āϏāĻžāĻŦāϧāĻžāύ⧠āϰā§āĻā§āĻāĻŋAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- āĻ āĻĒāĻžāϰā§āĻāϰ āĻļā§āϰ⧠āĻāϰāĻžāϰ āĻļāϰā§āϤāĨ¤ āĻāĻŽāĻžāĻĻā§āϰ āĻā§āώā§āϤā§āϰā§, āϏāĻŽāϏā§āϤ āύāĻŋāϰā§āĻāϰāϤāĻž āĻāĻžāĻ āĻāϰāϞā§āĻ āĻāĻŋāĻ āĻŋāĻāĻŋ āĻŦāϏāĻĻā§āϰ āĻāĻžāĻā§ āϝāĻžāĻŦā§ āϏāĻžāĻĢāϞā§āϝā§āϰ āϏāĻžāĻĨā§;tg_bot_conn_id='tg_main'- āϝā§āĻā§āϤāĻŋconn_idāĻāĻŽāϰāĻž āϝ⧠āϏāĻāϝā§āĻ āĻāĻāĻĄāĻŋ āϤā§āϰāĻŋ āĻāϰāĻŋ āϤāĻž āĻā§āϰāĻšāĻŖ āĻāϰāĻŋAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- āĻĒāϤāĻŋāϤ āĻāĻžāĻ āĻĨāĻžāĻāϞā§āĻ āĻā§āϞāĻŋāĻā§āϰāĻžāĻŽā§āϰ āĻŦāĻžāϰā§āϤāĻžāĻā§āϞāĻŋ āĻāĻĄāĻŧā§ āϝāĻžāĻŦā§;task_concurrency=1- āĻāĻŽāϰāĻž āĻāĻāĻāĻŋ āĻāĻžāϏā§āĻā§āϰ āĻāĻāĻžāϧāĻŋāĻ āĻāĻžāϏā§āĻ āĻāύāϏā§āĻā§āϝāĻžāύā§āϏā§āϰ āĻāĻāϝā§āĻā§ āϞāĻā§āĻ āύāĻŋāώāĻŋāĻĻā§āϧ āĻāϰāĻŋ⧎ āϤāĻž āύāĻž āĻšāϞ⧠āĻāĻŽāϰāĻž āĻāĻāϏāĻā§āĻā§ āĻŦā§āĻļ āĻāϝāĻŧā§āĻāĻāĻŋ āϞāĻā§āĻ āĻĒāĻžāĻŦVerticaOperator(āĻāĻāĻāĻŋ āĻā§āĻŦāĻŋāϞā§āϰ āĻĻāĻŋāĻā§ āϤāĻžāĻāĻŋāϝāĻŧā§);report_update >> [email, tg]- āϏāĻŦVerticaOperatorāĻāĻŋāĻ āĻŋ āĻāĻŦāĻ āĻŦāĻžāϰā§āϤāĻž āĻĒā§āϰā§āϰāĻŖā§ āĻāĻāϤā§āϰāĻŋāϤ āĻšāύ, āĻāĻāĻāĻžāĻŦā§:

āĻāĻŋāύā§āϤ⧠āϝā§āĻšā§āϤ⧠āύā§āĻāĻŋāĻĢāĻžāϝāĻŧāĻžāϰ āĻ āĻĒāĻžāϰā§āĻāϰāĻĻā§āϰ āϞāĻā§āĻā§āϰ āĻŦāĻŋāĻāĻŋāύā§āύ āĻļāϰā§āϤ āϰāϝāĻŧā§āĻā§, āĻļā§āϧā§āĻŽāĻžāϤā§āϰ āĻāĻāĻāĻŋ āĻāĻžāĻ āĻāϰāĻŦā§āĨ¤ āĻā§āϰāĻŋ āĻāĻŋāĻāϤā§, āϏāĻŦāĻāĻŋāĻā§ āĻāĻāĻā§ āĻāĻŽ āĻāĻŋāĻā§āϝā§āϝāĻŧāĻžāϞ āĻĻā§āĻāĻžāϝāĻŧ:

āĻāĻŽāĻŋ āϏāĻŽā§āĻĒāϰā§āĻā§ āĻāϝāĻŧā§āĻāĻāĻŋ āĻļāĻŦā§āĻĻ āĻŦāϞāĻŦ āĻŽā§āϝāĻžāĻā§āϰ⧠āĻāĻŦāĻ āϤāĻžāĻĻā§āϰ āĻŦāύā§āϧā§āϰāĻž - āĻā§āϰāĻŋāϝāĻŧā§āĻŦāϞ.
āĻŽā§āϝāĻžāĻā§āϰ⧠āĻšāϞ āĻāĻŋāύāĻāĻž āĻĒā§āϞā§āϏāĻšā§āϞā§āĻĄāĻžāϰ āϝāĻž āĻ āĻĒāĻžāϰā§āĻāϰ āĻāϰā§āĻā§āĻŽā§āύā§āĻā§ āĻŦāĻŋāĻāĻŋāύā§āύ āĻĻāϰāĻāĻžāϰ⧠āϤāĻĨā§āϝ āĻĒā§āϰāϤāĻŋāϏā§āĻĨāĻžāĻĒāύ āĻāϰāϤ⧠āĻĒāĻžāϰā§āĨ¤ āĻāĻĻāĻžāĻšāϰāĻŖāϏā§āĻŦāϰā§āĻĒ, āĻāĻ āĻŽāϤ:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} āĻĒā§āϰāϏāĻā§āĻ āĻā§āϰāĻŋāϝāĻŧā§āĻŦāϞā§āϰ āĻŦāĻŋāώāϝāĻŧāĻŦāϏā§āϤā§āϤ⧠āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻšāĻŦā§ execution_date āĻŦāĻŋāύā§āϝāĻžāϏ⧠YYYY-MM-DD: 2020-07-14. āϏāĻŦāĻā§āϝāĻŧā§ āĻāĻžāϞ⧠āĻĻāĻŋāĻ āĻšāϞ āϝ⧠āĻĒā§āϰāϏāĻā§āĻ āĻā§āϰāĻŋāϝāĻŧā§āĻŦāϞāĻā§āϞāĻŋ āĻāĻāĻāĻŋ āύāĻŋāϰā§āĻĻāĻŋāώā§āĻ āĻāĻžāϏā§āĻ āĻāύāϏā§āĻā§āϝāĻžāύā§āϏ⧠(āĻā§āϰāĻŋ āĻāĻŋāĻāϤ⧠āĻāĻāĻāĻŋ āĻŦāϰā§āĻāĻā§āώā§āϤā§āϰ) āĻĒā§āϰā§āĻ āĻĻā§āĻāϝāĻŧāĻž āĻšāϝāĻŧ āĻāĻŦāĻ āϝāĻāύ āĻĒā§āύāϰāĻžāϝāĻŧ āĻāĻžāϞ⧠āĻāϰāĻž āĻšāϝāĻŧ, āϏā§āĻĨāĻžāύāϧāĻžāϰāĻāĻā§āϞāĻŋ āĻāĻāĻ āĻŽāĻžāύāĻā§āϞāĻŋāϤ⧠āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻšāĻŦā§āĨ¤
āύāĻŋāϰā§āϧāĻžāϰāĻŋāϤ āĻŽāĻžāύāĻā§āϞāĻŋ āĻĒā§āϰāϤāĻŋāĻāĻŋ āĻāĻžāϏā§āĻ āĻāύāϏā§āĻā§āϝāĻžāύā§āϏ⧠āϰā§āύā§āĻĄāĻžāϰ āĻāϰāĻž āĻŦā§āϤāĻžāĻŽ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰ⧠āĻĻā§āĻāĻž āϝā§āϤ⧠āĻĒāĻžāϰā§āĨ¤ āĻāĻāĻāĻžāĻŦā§ āĻāĻāĻāĻŋ āĻāĻŋāĻ āĻŋ āĻĒāĻžāĻ āĻžāύā§āϰ āĻāĻžāĻ:

āĻāĻŦāĻ āϤāĻžāĻ āĻāĻāĻāĻŋ āĻŦāĻžāϰā§āϤāĻž āĻĒāĻžāĻ āĻžāύā§āϰ āĻāĻžāĻā§:

āϏāϰā§āĻŦāĻļā§āώ āĻāĻĒāϞāĻŦā§āϧ āϏāĻāϏā§āĻāϰāĻŖā§āϰ āĻāύā§āϝ āĻ āύā§āϤāϰā§āύāĻŋāϰā§āĻŽāĻŋāϤ āĻŽā§āϝāĻžāĻā§āϰā§āĻā§āϞāĻŋāϰ āĻāĻāĻāĻŋ āϏāĻŽā§āĻĒā§āϰā§āĻŖ āϤāĻžāϞāĻŋāĻāĻž āĻāĻāĻžāύ⧠āĻāĻĒāϞāĻŦā§āϧ:
āϤāĻžāĻāĻžāĻĄāĻŧāĻž, āĻĒā§āϞāĻžāĻāĻāύāĻā§āϞāĻŋāϰ āϏāĻžāĻšāĻžāϝā§āϝā§, āĻāĻŽāϰāĻž āĻāĻŽāĻžāĻĻā§āϰ āύāĻŋāĻāϏā§āĻŦ āĻŽā§āϝāĻžāĻā§āϰ⧠āĻā§āώāĻŖāĻž āĻāϰāϤ⧠āĻĒāĻžāϰāĻŋ, āϤāĻŦā§ āĻāĻāĻŋ āĻ āύā§āϝ āĻāϞā§āĻĒāĨ¤
āĻĒā§āϰā§āĻŦāύāĻŋāϰā§āϧāĻžāϰāĻŋāϤ āĻāĻŋāύāĻŋāϏāĻā§āϞāĻŋ āĻāĻžāĻĄāĻŧāĻžāĻ, āĻāĻŽāϰāĻž āĻāĻŽāĻžāĻĻā§āϰ āĻā§āϰāĻŋāϝāĻŧā§āĻŦāϞā§āϰ āĻŽāĻžāύāĻā§āϞāĻŋāĻā§ āĻĒā§āϰāϤāĻŋāϏā§āĻĨāĻžāĻĒāύ āĻāϰāϤ⧠āĻĒāĻžāϰāĻŋ (āĻāĻŽāĻŋ āĻāϤāĻŋāĻŽāϧā§āϝ⧠āĻāĻĒāϰā§āϰ āĻā§āĻĄā§ āĻāĻāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰā§āĻāĻŋ)āĨ¤ āĻāϰ āĻŽāϧā§āϝ⧠āϤā§āϰāĻŋ āĻāϰāĻž āϝāĻžāĻ Admin/Variables āĻāĻŋāĻā§ āĻāĻŋāύāĻŋāϏ:

āĻāĻĒāύāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰāϤ⧠āĻĒāĻžāϰā§āύ āϏāĻŦāĻāĻŋāĻā§:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')āĻŽāĻžāύ āĻāĻāĻāĻŋ āϏā§āĻā§āϞāĻžāϰ āĻšāϤ⧠āĻĒāĻžāϰā§, āĻ āĻĨāĻŦāĻž āĻāĻāĻŋ JSONāĻ āĻšāϤ⧠āĻĒāĻžāϰā§āĨ¤ JSON āĻāϰ āĻā§āώā§āϤā§āϰā§:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}āĻļā§āϧ⧠āĻĒāĻāύā§āĻĻāϏāĻ āĻā§āĻāĻŋāϰ āĻĒāĻĨāĻāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰā§āύ: {{ var.json.bot_config.bot.token }}.
āĻāĻŽāĻŋ āĻāĻā§āώāϰāĻŋāĻ āĻ
āϰā§āĻĨā§ āĻāĻāĻāĻŋ āĻļāĻŦā§āĻĻ āĻŦāϞāĻŦ āĻāĻŦāĻ āĻāĻāĻāĻŋ āϏā§āĻā§āϰāĻŋāύāĻļāĻ āĻĻā§āĻāĻžāĻŦ āϏāĻāϝā§āĻ. āϏāĻŦāĻāĻŋāĻā§ āĻāĻāĻžāύ⧠āĻĒā§āϰāĻžāĻĨāĻŽāĻŋāĻ: āĻĒā§āώā§āĻ āĻžāϝāĻŧ Admin/Connections āĻāĻŽāϰāĻž āĻāĻāĻāĻŋ āϏāĻāϝā§āĻ āϤā§āϰāĻŋ āĻāϰāĻŋ, āϏā§āĻāĻžāύ⧠āĻāĻŽāĻžāĻĻā§āϰ āϞāĻāĻāύ/āĻĒāĻžāϏāĻāϝāĻŧāĻžāϰā§āĻĄ āĻāĻŦāĻ āĻāϰāĻ āύāĻŋāϰā§āĻĻāĻŋāώā§āĻ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāĻāĻžāϰ āϝā§āĻ āĻāϰāĻŋāĨ¤ āĻāĻāĻžāϰ āĻŽāϤ:

āĻĒāĻžāϏāĻāϝāĻŧāĻžāϰā§āĻĄāĻā§āϞāĻŋ āĻāύāĻā§āϰāĻŋāĻĒā§āĻ āĻāϰāĻž āϝā§āϤ⧠āĻĒāĻžāϰ⧠(āĻĄāĻŋāĻĢāϞā§āĻā§āϰ āĻā§āϝāĻŧā§ āĻāϰāĻ āĻĒā§āĻā§āĻāĻžāύā§āĻĒā§āĻā§āĻāĻāĻžāĻŦā§), āĻ
āĻĨāĻŦāĻž āĻāĻĒāύāĻŋ āϏāĻāϝā§āĻā§āϰ āϧāϰāύāĻāĻŋ āĻā§āĻĄāĻŧā§ āĻĻāĻŋāϤ⧠āĻĒāĻžāϰā§āύ (āϝā§āĻŽāύ āĻāĻŽāĻŋ āĻāϰā§āĻāĻŋ tg_main) - āĻāϏāϞ āĻŦāĻŋāώāϝāĻŧāĻāĻŋ āĻšāϞ āϝ⧠āĻĒā§āϰāĻāĻžāϰā§āϰ āϤāĻžāϞāĻŋāĻāĻžāĻāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻŽāĻĄā§āϞāĻā§āϞāĻŋāϤ⧠āĻšāĻžāϰā§āĻĄāĻāϝāĻŧā§āϝāĻžāϰāϝā§āĻā§āϤ āĻāĻŦāĻ āϏā§āϰā§āϏ āĻā§āĻĄāĻā§āϞāĻŋāϤ⧠āύāĻž āĻāĻŋāϝāĻŧā§ āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻāϰāĻž āϝāĻžāϝāĻŧ āύāĻž (āϝāĻĻāĻŋ āĻšāĻ āĻžā§ āĻāĻŽāĻŋ āĻāĻŋāĻā§ āĻā§āĻāϞ āύāĻž āĻāϰāĻŋ āϤāĻŦā§ āĻĻāϝāĻŧāĻž āĻāϰ⧠āĻāĻŽāĻžāĻā§ āϏāĻāĻļā§āϧāύ āĻāϰā§āύ), āϤāĻŦā§ āĻāĻŋāĻā§āĻ āĻāĻŽāĻžāĻĻā§āϰ āĻā§āϰā§āĻĄāĻŋāĻ āĻĒā§āϤ⧠āĻŦāĻžāϧāĻž āĻĻā§āĻŦā§ āύāĻž āύāĻžāĻŽ
āĻāĻĒāύāĻŋ āĻāĻāĻ āύāĻžāĻŽā§āϰ āϏāĻžāĻĨā§ āĻŦā§āĻļ āĻāϝāĻŧā§āĻāĻāĻŋ āϏāĻāϝā§āĻāĻ āĻāϰāϤ⧠āĻĒāĻžāϰā§āύ: āĻāĻ āĻā§āώā§āϤā§āϰā§, āĻĒāĻĻā§āϧāϤāĻŋ BaseHook.get_connection(), āϝāĻž āĻāĻŽāĻžāĻĻā§āϰ āύāĻžāĻŽā§ āϏāĻāϝā§āĻ āĻĒāĻžāϝāĻŧ, āĻĻā§āĻŦā§ āĻāϞā§āĻŽā§āϞ⧠āĻŦāĻŋāĻāĻŋāύā§āύ āύāĻžāĻŽāĻāϰāĻŖ āĻĨā§āĻā§ (āϰāĻžāĻāύā§āĻĄ āϰāĻŦāĻŋāύ āϤā§āϰāĻŋ āĻāϰāĻž āĻāϰāĻ āϝā§āĻā§āϤāĻŋāĻ āĻšāĻŦā§, āϤāĻŦā§ āĻāĻāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻŦāĻŋāĻāĻžāĻļāĻāĻžāϰā§āĻĻā§āϰ āĻŦāĻŋāĻŦā§āĻā§āϰ āĻāĻĒāϰ āĻā§āĻĄāĻŧā§ āĻĻā§āĻāϝāĻŧāĻž āϝāĻžāĻ)āĨ¤
āĻā§āϰāĻŋāϝāĻŧā§āĻŦāϞ āĻāĻŦāĻ āϏāĻāϝā§āĻāĻā§āϞāĻŋ āĻ āĻŦāĻļā§āϝāĻ āĻĻā§āϰā§āĻĻāĻžāύā§āϤ āϏāϰāĻā§āĻāĻžāĻŽ, āϤāĻŦā§ āĻāĻžāϰāϏāĻžāĻŽā§āϝ āύāĻž āĻšāĻžāϰāĻžāύ⧠āĻā§āϰā§āϤā§āĻŦāĻĒā§āϰā§āĻŖ: āĻāĻĒāύāĻžāϰ āĻĒā§āϰāĻŦāĻžāĻšā§āϰ āĻā§āύ āĻ āĻāĻļāĻā§āϞāĻŋ āĻāĻĒāύāĻŋ āĻā§āĻĄā§ āϏāĻā§āĻāϝāĻŧ āĻāϰā§āύ āĻāĻŦāĻ āĻāĻĒāύāĻŋ āϏā§āĻā§āϰā§āĻā§āϰ āĻāύā§āϝ āĻā§āύ āĻ āĻāĻļāĻā§āϞāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞā§āĻā§ āĻĻā§āύ⧎ āĻāĻāĻĻāĻŋāĻā§, āĻĻā§āϰā§āϤ āĻŽāĻžāύ āĻĒāϰāĻŋāĻŦāϰā§āϤāύ āĻāϰāĻž āϏā§āĻŦāĻŋāϧāĻžāĻāύāĻ āĻšāϤ⧠āĻĒāĻžāϰā§, āĻāĻĻāĻžāĻšāϰāĻŖāϏā§āĻŦāϰā§āĻĒ, āĻāĻāĻāĻŋ āĻŽā§āĻāϞāĻŋāĻ āĻŦāĻā§āϏ, UI āĻāϰ āĻŽāĻžāϧā§āϝāĻŽā§āĨ¤ āĻ āύā§āϝāĻĻāĻŋāĻā§, āĻāĻāĻŋ āĻāĻāύāĻ āĻŽāĻžāĻāϏ āĻā§āϞāĻŋāĻā§ āĻĢāĻŋāϰ⧠āĻāϏāĻž, āϝāĻž āĻĨā§āĻā§ āĻāĻŽāϰāĻž (āĻāĻŽāĻŋ) āĻĒāϰāĻŋāϤā§āϰāĻžāĻŖ āĻĒā§āϤ⧠āĻā§āϝāĻŧā§āĻāĻŋāϞāĻžāĻŽāĨ¤
āϏāĻāϝā§āĻā§āϰ āϏāĻžāĻĨā§ āĻāĻžāĻ āĻāϰāĻž āĻāĻžāĻāĻā§āϞāĻŋāϰ āĻŽāϧā§āϝ⧠āĻāĻāĻāĻŋ āĻšā§āĻ. āϏāĻžāϧāĻžāϰāĻŖāĻāĻžāĻŦā§, āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻšā§āĻāĻā§āϞāĻŋ āĻāĻāĻŋāĻā§ āϤā§āϤā§āϝāĻŧ āĻĒāĻā§āώā§āϰ āĻĒāϰāĻŋāώā§āĻŦāĻž āĻāĻŦāĻ āϞāĻžāĻāĻŦā§āϰā§āϰāĻŋāϰ āϏāĻžāĻĨā§ āϏāĻāϝā§āĻā§āϤ āĻāϰāĻžāϰ āĻāύā§āϝ āĻĒāϝāĻŧā§āύā§āĻāĨ¤ āϝā§āĻŽāύ, JiraHook āĻāĻŋāϰāĻžāϰ āϏāĻžāĻĨā§ āϝā§āĻāĻžāϝā§āĻ āĻāϰāĻžāϰ āĻāύā§āϝ āĻāĻŽāĻžāĻĻā§āϰ āĻāύā§āϝ āĻāĻāĻāĻŋ āĻā§āϞāĻžāϝāĻŧā§āύā§āĻ āĻā§āϞāĻŦā§ (āĻāĻĒāύāĻŋ āĻāĻžāĻāĻā§āϞāĻŋ āϏāĻžāĻŽāύ⧠āĻāĻŦāĻ āĻĒāĻŋāĻāύ⧠āϏāϰāĻžāϤ⧠āĻĒāĻžāϰā§āύ), āĻāĻŦāĻ āĻāϰ āϏāĻžāĻšāĻžāϝā§āϝ⧠SambaHook āĻāĻĒāύāĻŋ āĻāĻāĻāĻŋ āϏā§āĻĨāĻžāύā§āϝāĻŧ āĻĢāĻžāĻāϞ āϧāĻžāĻā§āĻāĻž āĻĻāĻŋāϤ⧠āĻĒāĻžāϰā§āύ smb-āĻŦāĻŋāύā§āĻĻā§āĨ¤
āĻāĻžāϏā§āĻāĻŽ āĻ āĻĒāĻžāϰā§āĻāϰ āĻĒāĻžāϰā§āϏāĻŋāĻ
āĻāĻŦāĻ āĻāĻŽāϰāĻž āĻāĻāĻŋ āĻāĻŋāĻāĻžāĻŦā§ āϤā§āϰāĻŋ āĻāϰāĻž āĻšāϝāĻŧ āϤāĻž āĻĻā§āĻāϤ⧠āĻāĻžāĻāĻžāĻāĻžāĻāĻŋ āĻĒā§āϝāĻŧā§āĻāĻŋāϞāĻžāĻŽ TelegramBotSendMessage
āĻā§āĻĄ commons/operators.py āĻĒā§āϰāĻā§āϤ āĻ
āĻĒāĻžāϰā§āĻāϰā§āϰ āϏāĻžāĻĨā§:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)āĻāĻāĻžāύā§, āĻāϝāĻŧāĻžāϰāĻĢā§āϞā§āϤ⧠āĻ āύā§āϝ āϏāĻŦāĻāĻŋāĻā§āϰ āĻŽāϤā§, āϏāĻŦāĻāĻŋāĻā§ āĻā§āĻŦ āϏāĻšāĻ:
- āĻĨā§āĻā§ āĻāϤā§āϤāϰāĻžāϧāĻŋāĻāĻžāϰāϏā§āϤā§āϰ⧠āĻĒā§āϰāĻžāĻĒā§āϤ
BaseOperator, āϝāĻž āĻŦā§āĻļ āĻāϝāĻŧā§āĻāĻāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞā§-āύāĻŋāϰā§āĻĻāĻŋāώā§āĻ āĻāĻŋāύāĻŋāϏ āĻĒā§āϰāϝāĻŧā§āĻ āĻāϰ⧠(āĻāĻĒāύāĻžāϰ āĻ āĻŦāϏāϰā§āϰ āĻĻāĻŋāĻā§ āϤāĻžāĻāĻžāύ) - āĻā§āώāĻŋāϤ āĻā§āώā§āϤā§āϰ
template_fields, āϝā§āĻāĻžāύ⧠āĻāĻŋāύāĻāĻž āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻž āĻāϰāĻžāϰ āĻāύā§āϝ āĻŽā§āϝāĻžāĻā§āϰā§āĻā§āϞāĻŋ āϏāύā§āϧāĻžāύ āĻāϰāĻŦā§ā§ˇ - āĻāύā§āϝ āϏāĻ āĻŋāĻ āϝā§āĻā§āϤāĻŋ āϏāĻžāĻāĻŋāϝāĻŧā§āĻā§āύ
__init__(), āϝā§āĻāĻžāύ⧠āĻĒā§āϰāϝāĻŧā§āĻāύ āϏā§āĻāĻžāύ⧠āĻĄāĻŋāĻĢāϞā§āĻ āϏā§āĻ āĻāϰā§āύāĨ¤ - āĻāĻŽāϰāĻž āĻĒā§āϰā§āĻŦāĻĒā§āϰā§āώā§āϰ āϏā§āĻāύāĻž āϏāĻŽā§āĻĒāϰā§āĻā§āĻ āĻā§āϞ⧠āϝāĻžāĻāύāĻŋāĨ¤
- āϏāĻāĻļā§āϞāĻŋāώā§āĻ āĻšā§āĻ āĻā§āϞāϞā§āύ
TelegramBotHookāĻāĻāĻŋ āĻĨā§āĻā§ āĻāĻāĻāĻŋ āĻā§āϞāĻžāϝāĻŧā§āύā§āĻ āĻ āĻŦāĻā§āĻā§āĻ āĻĒā§āϝāĻŧā§āĻāĻŋāĨ¤ - āĻāĻāĻžāϰāϰāĻžāĻāĻĄ (āĻĒā§āύāϰāĻžāϝāĻŧ āϏāĻāĻā§āĻāĻžāϝāĻŧāĻŋāϤ) āĻĒāĻĻā§āϧāϤāĻŋ
BaseOperator.execute(), āĻ āĻĒāĻžāϰā§āĻāϰ āĻāĻžāϞ⧠āĻāϰāĻžāϰ āϏāĻŽāϝāĻŧ āĻāϞ⧠āϝ⧠āĻāϝāĻŧāĻžāϰāĻĢā§ āĻā§āĻāĻ āĻāϰāĻŦā§ - āĻāϤ⧠āĻāĻŽāϰāĻž āϞāĻ āĻāύ āĻāϰāϤ⧠āĻā§āϞ⧠āĻāĻŋāϝāĻŧā§ āĻŽā§āϞ āĻā§āϰāĻŋāϝāĻŧāĻžāĻāĻŋ āĻŦāĻžāϏā§āϤāĻŦāĻžāϝāĻŧāύ āĻāϰāĻŦāĨ¤ (āĻāĻŽāϰāĻž āϞāĻ āĻāύ āĻāϰāĻŋ, āϝāĻžāĻāĻšā§āĻ, āĻĄāĻžāύ āĻāύstdoutиstderr- āĻŦāĻžāϝāĻŧā§āĻĒā§āϰāĻŦāĻžāĻš āϏāĻŦāĻāĻŋāĻā§āĻā§ āĻāĻāĻāĻžāĻŦā§, āϏā§āύā§āĻĻāϰāĻāĻžāĻŦā§ āĻŽā§āĻĄāĻŧāĻžāύ⧠āĻšāĻŦā§, āĻĒā§āϰāϝāĻŧā§āĻāύ⧠āĻāĻāĻŋ āĻĒāĻāĻŋāϝāĻŧā§ āĻĢā§āϞāĻŦā§āĨ¤)
āĻĻā§āĻāĻž āϝāĻžāĻ āĻāĻŽāĻžāĻĻā§āϰ āĻāĻŋ āĻāĻā§ commons/hooks.py. āĻĢāĻžāĻāϞā§āϰ āĻĒā§āϰāĻĨāĻŽ āĻ
āĻāĻļ, āĻšā§āĻ āύāĻŋāĻā§āĻ:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientāĻāĻŽāĻŋ āĻāĻāĻžāύ⧠āĻā§ āĻŦā§āϝāĻžāĻā§āϝāĻž āĻāϰāĻŦ āϤāĻžāĻ āĻāĻžāύāĻŋ āύāĻž, āĻāĻŽāĻŋ āĻļā§āϧ⧠āĻā§āϰā§āϤā§āĻŦāĻĒā§āϰā§āĻŖ āĻĒāϝāĻŧā§āύā§āĻāĻā§āϞāĻŋ āύā§āĻ āĻāϰāĻŦ:
- āĻāĻŽāϰāĻž āĻāϤā§āϤāϰāĻžāϧāĻŋāĻāĻžāϰāϏā§āϤā§āϰ⧠āĻĒāĻžāĻ, āϝā§āĻā§āϤāĻŋāĻā§āϞāĻŋ āϏāĻŽā§āĻĒāϰā§āĻā§ āĻāĻŋāύā§āϤāĻž āĻāϰāĻŋ - āĻŦā§āĻļāĻŋāϰāĻāĻžāĻ āĻā§āώā§āϤā§āϰā§āĻ āĻāĻāĻŋ āĻāĻ āĻšāĻŦā§:
conn_id; - āĻāĻāĻžāϰāϰāĻžāĻāĻĄāĻŋāĻ āϏā§āĻā§āϝāĻžāύā§āĻĄāĻžāϰā§āĻĄ āĻĒāĻĻā§āϧāϤāĻŋ: āĻāĻŽāĻŋ āύāĻŋāĻā§āĻā§ āϏā§āĻŽāĻŋāϤ āĻāϰā§āĻāĻŋ
get_conn(), āϝāĻžāϰ āĻŽāϧā§āϝ⧠āĻāĻŽāĻŋ āύāĻžāĻŽ āĻĻā§āĻŦāĻžāϰāĻž āϏāĻāϝā§āĻā§āϰ āĻĒāϰāĻžāĻŽāĻŋāϤāĻŋāĻā§āϞāĻŋ āĻĒāĻžāĻ āĻāĻŦāĻ āĻā§āĻŦāϞ āĻŦāĻŋāĻāĻžāĻāĻāĻŋ āĻĒāĻžāĻextra(āĻāĻāĻŋ āĻāĻāĻāĻŋ JSON āĻā§āώā§āϤā§āϰ), āϝā§āĻāĻžāύ⧠āĻāĻŽāĻŋ (āĻāĻŽāĻžāϰ āύāĻŋāĻā§āϰ āύāĻŋāϰā§āĻĻā§āĻļ āĻ āύā§āϏāĻžāϰā§!) āĻā§āϞāĻŋāĻā§āϰāĻžāĻŽ āĻŦāĻ āĻā§āĻā§āύ āϰā§āĻā§āĻāĻŋ:{"bot_token": "YOuRAwEsomeBOtToKen"}. - āĻāĻŽāĻŋ āĻāĻŽāĻžāĻĻā§āϰ āĻāĻāĻāĻŋ āĻāĻĻāĻžāĻšāϰāĻŖ āϤā§āϰāĻŋ
TelegramBot, āĻāĻāĻŋ āĻāĻāĻāĻŋ āύāĻŋāϰā§āĻĻāĻŋāώā§āĻ āĻā§āĻā§āύ āĻĒā§āϰāĻĻāĻžāύ.
āĻāĻāĻžāύā§āĻ āĻļā§āώ. āĻāĻĒāύāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰ⧠āĻāĻāĻāĻŋ āĻšā§āĻ āĻĨā§āĻā§ āĻāĻāĻāĻŋ āĻā§āϞāĻžāϝāĻŧā§āύā§āĻ āĻĒā§āϤ⧠āĻĒāĻžāϰā§āύ TelegramBotHook().clent āĻŦāĻž TelegramBotHook().get_conn().
āĻāĻŦāĻ āĻĢāĻžāĻāϞā§āϰ āĻĻā§āĻŦāĻŋāϤā§āϝāĻŧ āĻ
āĻāĻļ, āϝā§āĻāĻžāύ⧠āĻāĻŽāĻŋ āĻā§āϞāĻŋāĻā§āϰāĻžāĻŽ REST API-āĻāϰ āĻāύā§āϝ āĻāĻāĻāĻŋ āĻŽāĻžāĻāĻā§āϰā§āĻāϰā§āϝāĻžāĻĒāĻžāϰ āϤā§āϰāĻŋ āĻāϰāĻŋ, āϝāĻžāϤ⧠āĻāĻāĻ āĻā§āύ⧠āύāĻž āĻāύ⧠āĻāĻāĻāĻŋ āĻĒāĻĻā§āϧāϤāĻŋāϰ āĻāύā§āϝ sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))āϏāĻ āĻŋāĻ āĻāĻĒāĻžāϝāĻŧ āĻšāϞ āĻāĻāĻŋ āϏāĻŦ āϝā§āĻ āĻāϰāĻž:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- āĻĒā§āϞāĻžāĻāĻāύā§, āĻāĻāĻāĻŋ āĻĒāĻžāĻŦāϞāĻŋāĻ āϰāĻŋāĻĒā§āĻāĻŋāĻāϰāĻŋ āϰāĻžāĻā§āύ āĻāĻŦāĻ āĻāĻĒā§āύ āϏā§āϰā§āϏ⧠āĻĻāĻŋāύāĨ¤
āϝāĻāύ āĻāĻŽāϰāĻž āĻāĻ āϏāĻŦ āĻ āϧā§āϝāϝāĻŧāύ āĻāϰāĻāĻŋāϞāĻžāĻŽ, āĻāĻŽāĻžāĻĻā§āϰ āϰāĻŋāĻĒā§āϰā§āĻ āĻāĻĒāĻĄā§āĻ āϏāĻĢāϞāĻāĻžāĻŦā§ āĻŦā§āϝāϰā§āĻĨ āĻšāϝāĻŧā§āĻā§ āĻāĻŦāĻ āĻāĻŽāĻžāĻā§ āĻā§āϝāĻžāύā§āϞ⧠āĻāĻāĻāĻŋ āϤā§āϰā§āĻāĻŋ āĻŦāĻžāϰā§āϤāĻž āĻĒāĻžāĻ āĻžāϤ⧠āϏāĻā§āώāĻŽ āĻšāϝāĻŧā§āĻā§ā§ˇ āĻāĻŽāĻŋ āĻāĻāĻž āĻā§āϞ āĻāĻŋāύāĻž āĻĻā§āĻāϤ⧠āĻā§āĻ āĻāϰāϤ⧠āϝāĻžāĻā§āĻāĻŋ...

āĻāĻŽāĻžāĻĻā§āϰ āĻā§āĻā§āϰā§āϰ āĻŽāϧā§āϝ⧠āĻāĻŋāĻā§ āĻā§āĻā§ āĻā§āĻā§! āĻāĻāĻžāĻ āĻāĻŋ āĻāĻŽāϰāĻž āĻāĻļāĻž āĻāϰāĻāĻŋāϞāĻžāĻŽ āύāĻž? āĻšā§āĻŦāĻšā§ !
āĻāĻĒāύāĻŋ āĻĸāĻžāϞāĻž āϝāĻžāĻā§āĻā§āύ?
āϤā§āĻŽāĻžāϰ āĻāĻŋ āĻŽāύ⧠āĻšāϝāĻŧ āĻāĻŽāĻŋ āĻāĻŋāĻā§ āĻŽāĻŋāϏ āĻāϰā§āĻāĻŋ? āĻŽāύ⧠āĻšāĻā§āĻā§ āϤāĻŋāύāĻŋ āĻāϏāĻāĻŋāĻāĻāϞ āϏāĻžāϰā§āĻāĻžāϰ āĻĨā§āĻā§ āĻāĻžāϰā§āĻāĻŋāĻāĻžāϝāĻŧ āĻĄā§āĻāĻž āϏā§āĻĨāĻžāύāĻžāύā§āϤāϰ āĻāϰāĻžāϰ āĻĒā§āϰāϤāĻŋāĻļā§āϰā§āϤāĻŋ āĻĻāĻŋāϝāĻŧā§āĻāĻŋāϞā§āύ, āĻāĻŦāĻ āϤāĻžāϰāĻĒāϰ⧠āϤāĻŋāύāĻŋ āϤāĻž āύāĻŋāϝāĻŧā§āĻāĻŋāϞā§āύ āĻāĻŦāĻ āĻŦāĻŋāώāϝāĻŧāĻāĻŋ āĻĨā§āĻā§ āϏāϰ⧠āĻāĻŋāϝāĻŧā§āĻāĻŋāϞā§āύ, āĻŦāĻāĻžāĻā§!
āĻāĻ āύā§āĻļāĻāϏāϤāĻž āĻāĻā§āĻāĻžāĻā§āϤ āĻāĻŋāϞ, āĻāĻŽāĻžāĻā§ āĻā§āĻŦāϞ āĻāĻĒāύāĻžāϰ āĻāύā§āϝ āĻāĻŋāĻā§ āĻĒāϰāĻŋāĻāĻžāώāĻž āĻŦā§āĻāĻžāϤ⧠āĻšāϝāĻŧā§āĻāĻŋāϞāĨ¤ āĻāĻāύ āĻāĻĒāύāĻŋ āĻāϰāĻ āϝā§āϤ⧠āĻĒāĻžāϰā§āύāĨ¤
āĻāĻŽāĻžāĻĻā§āϰ āĻĒāϰāĻŋāĻāϞā§āĻĒāύāĻž āĻāĻŋāϞ āĻāĻ:
- āĻĻāĻžāĻ āĻāϰ
- āĻāĻžāĻ āϤā§āϰāĻŋ āĻāϰā§āύ
- āĻĻā§āĻā§āύ āϏāĻŦāĻāĻŋāĻā§ āĻāϤ āϏā§āύā§āĻĻāϰ
- āĻĒā§āϰāĻŖ āĻāϰāĻžāϰ āĻāύā§āϝ āϏā§āĻļāύ āύāĻŽā§āĻŦāϰ āĻŦāϰāĻžāĻĻā§āĻĻ āĻāϰā§āύ
- SQL āϏāĻžāϰā§āĻāĻžāϰ āĻĨā§āĻā§ āĻĄā§āĻāĻž āĻĒāĻžāύ
- āĻāĻžāϰā§āĻāĻŋāĻāĻžāϝāĻŧ āĻĄā§āĻāĻž āϰāĻžāĻā§āύ
- āĻĒāϰāĻŋāϏāĻāĻā§āϝāĻžāύ āϏāĻāĻā§āϰāĻš āĻāϰā§āύ
āϏā§āϤāϰāĻžāĻ, āĻāĻ āϏāĻŦ āĻĒā§āϤ⧠āĻāĻŦāĻ āĻāϞāĻŽāĻžāύ, āĻāĻŽāĻŋ āĻāĻŽāĻžāĻĻā§āϰ āĻāĻāĻāĻŋ āĻā§āĻ āϏāĻāϝā§āĻāύ āĻāϰāĻž docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyāϏā§āĻāĻžāύ⧠āĻāĻŽāϰāĻž āĻāϤā§āĻĨāĻžāĻĒāύ āĻāϰāĻŋ:
- āĻšā§āϏā§āĻ āĻšāĻŋāϏāĻžāĻŦā§ āĻāĻžāϰā§āĻāĻŋāĻāĻž
dwhāϏāĻŦāĻā§āϝāĻŧā§ āĻĄāĻŋāĻĢāϞā§āĻ āϏā§āĻāĻŋāĻāϏ āϏāĻš, - SQL āϏāĻžāϰā§āĻāĻžāϰā§āϰ āϤāĻŋāύāĻāĻŋ āĻāĻĻāĻžāĻšāϰāĻŖ,
- āĻāĻŽāϰāĻž āĻĒāϰāĻŦāϰā§āϤā§āϤ⧠āĻāĻŋāĻā§ āĻĄā§āĻāĻž āĻĻāĻŋāϝāĻŧā§ āĻĄā§āĻāĻžāĻŦā§āϏāĻā§āϞāĻŋ āĻĒā§āϰāĻŖ āĻāϰāĻŋ (āĻā§āύ āĻā§āώā§āϤā§āϰā§āĻ āϤāĻž āĻĻā§āĻāĻŦā§āύ āύāĻž
mssql_init.py!)
āĻāĻŽāϰāĻž āĻāϤāĻŦāĻžāϰā§āϰ āĻā§āϝāĻŧā§ āĻāĻŋāĻā§āĻāĻž āĻāĻāĻŋāϞ āĻāĻŽāĻžāύā§āĻĄā§āϰ āϏāĻžāĻšāĻžāϝā§āϝ⧠āϏāĻŽāϏā§āϤ āĻāĻžāϞ āĻāĻžāϞ⧠āĻāϰāĻŋ:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3āĻāĻŽāĻžāĻĻā§āϰ āĻ
āϞā§āĻāĻŋāĻ āϰā§āϝāĻžāύā§āĻĄāĻŽāĻžāĻāĻāĻžāϰ āĻāĻŋ āϤā§āϰāĻŋ āĻāϰā§āĻā§, āĻāĻĒāύāĻŋ āĻāĻāĻā§āĻŽāĻāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰāϤ⧠āĻĒāĻžāϰā§āύ Data Profiling/Ad Hoc Query:

āĻŽā§āϞ āĻāĻŋāύāĻŋāϏāĻāĻŋ āĻŦāĻŋāĻļā§āϞā§āώāĻāĻĻā§āϰ āĻāĻžāĻā§ āĻāĻāĻŋ āĻĻā§āĻāĻžāύ⧠āύāϝāĻŧ
āϏāĻŽā§āĻĒā§āϰāϏāĻžāϰāĻŋāϤ ETL āϏā§āĻļāύ āĻāĻŽāĻŋ āĻāϰāĻŦ āύāĻž, āϏā§āĻāĻžāύ⧠āϏāĻŦāĻāĻŋāĻā§āĻ āϤā§āĻā§āĻ: āĻāĻŽāϰāĻž āĻāĻāĻāĻŋ āĻāĻŋāϤā§āϤāĻŋ āϤā§āϰāĻŋ āĻāϰāĻŋ, āĻāϤ⧠āĻāĻāĻāĻŋ āĻāĻŋāĻšā§āύ āϰāϝāĻŧā§āĻā§, āĻāĻŽāϰāĻž āĻāĻāĻāĻŋ āĻĒā§āϰāϏāĻā§āĻ āĻĒāϰāĻŋāĻāĻžāϞāĻā§āϰ āϏāĻžāĻĨā§ āϏāĻŦāĻāĻŋāĻā§ āĻŽā§āĻĄāĻŧā§ āĻĢā§āϞāĻŋ āĻāĻŦāĻ āĻāĻāύ āĻāĻŽāϰāĻž āĻāĻāĻŋ āĻāϰāĻŋ:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passāϏāĻŽāϝāĻŧ āĻāϏā§āĻā§ āĻāĻŽāĻžāĻĻā§āϰ āϤāĻĨā§āϝ āϏāĻāĻā§āϰāĻš āĻāϰā§āύ āĻāĻŽāĻžāĻĻā§āϰ āĻĻā§āĻĄāĻŧ āĻļ āĻā§āĻŦāĻŋāϞ āĻĨā§āĻā§āĨ¤ āĻāϏā§āύ āĻā§āĻŦ āύāĻāĻŋāϰāĻŦāĻŋāĻšā§āύ āϞāĻžāĻāύā§āϰ āϏāĻžāĻšāĻžāϝā§āϝ⧠āĻāĻāĻŋ āĻāϰāĻŋ:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- āĻāĻāĻāĻŋ āĻšā§āĻā§āϰ āϏāĻžāĻšāĻžāϝā§āϝ⧠āĻāĻŽāϰāĻž āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻĨā§āĻā§ āĻĒāĻžāĻ
pymssql- āϏāĻāϝā§āĻ āĻāϰā§āύ - āĻāϏā§āύ āĻ āύā§āϰā§āϧ⧠āϤāĻžāϰāĻŋāĻā§āϰ āĻāĻāĻžāϰ⧠āĻāĻāĻāĻŋ āϏā§āĻŽāĻžāĻŦāĻĻā§āϧāϤāĻž āĻĒā§āϰāϤāĻŋāϏā§āĻĨāĻžāĻĒāύ āĻāϰāĻŋ - āĻāĻāĻŋ āĻā§āĻŽāĻĒā§āϞā§āĻ āĻāĻā§āĻāĻŋāύ āĻĻā§āĻŦāĻžāϰāĻž āĻĢāĻžāĻāĻļāύ⧠āύāĻŋāĻā§āώā§āĻĒ āĻāϰāĻž āĻšāĻŦā§āĨ¤
- āĻāĻŽāĻžāĻĻā§āϰ āĻ
āύā§āϰā§āϧ āĻāĻžāĻāϝāĻŧāĻžāύā§
pandasāĻā§ āĻāĻŽāĻžāĻĻā§āϰ āĻĒāĻžāĻŦā§DataFrame- āĻāĻāĻž āĻāĻŦāĻŋāώā§āϝāϤ⧠āĻāĻŽāĻžāĻĻā§āϰ āĻāύā§āϝ āĻĻāϰāĻāĻžāϰ⧠āĻšāĻŦā§.
āĻāĻŽāĻŋ āĻĒā§āϰāϤāĻŋāϏā§āĻĨāĻžāĻĒāύ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰāĻāĻŋ
{dt}āĻāĻāĻāĻŋ āĻ āύā§āϰā§āϧ āĻĒāϰāĻžāĻŽāĻŋāϤāĻŋ āĻĒāϰāĻŋāĻŦāϰā§āϤā§%sāĻāĻ āĻāύā§āϝ āύāϝāĻŧ āϝ⧠āĻāĻŽāĻŋ āĻāĻāĻāύ āĻĻā§āώā§āĻ āĻĒāĻŋāύā§āĻāĻŋāĻ, āĻāĻŋāύā§āϤ⧠āĻāĻžāϰāĻŖpandasāϏāĻžāĻŽāϞāĻžāϤ⧠āĻĒāĻžāϰ⧠āύāĻžpymssqlāĻāĻŦāĻ āĻļā§āώ āĻāĻāĻāĻŋ āϏā§āϞāĻŋāĻĒparams: ListāϝāĻĻāĻŋāĻ āϏ⧠āϏāϤā§āϝāĻŋāĻ āĻāĻžāϝāĻŧtuple.
āĻāĻāĻžāĻĄāĻŧāĻžāĻ āĻŦāĻŋāĻāĻžāĻļāĻāĻžāϰ⧠āύā§āĻ āĻāϰā§āύpymssqlāϤāĻžāĻā§ āĻāϰ āϏāĻŽāϰā§āĻĨāύ āύāĻž āĻāϰāĻžāϰ āϏāĻŋāĻĻā§āϧāĻžāύā§āϤ āύāĻŋāϝāĻŧā§āĻā§, āĻāĻŦāĻ āĻāĻāĻŋ āϏāϰāĻžāύā§āϰ āϏāĻŽāϝāĻŧpyodbc.
āĻāϏā§āύ āĻĻā§āĻāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻāĻŽāĻžāĻĻā§āϰ āĻĢāĻžāĻāĻļāύāĻā§āϞāĻŋāϰ āĻāϰā§āĻā§āĻŽā§āύā§āĻāĻā§āϞāĻŋāĻā§ āĻā§ āĻĻāĻŋāϝāĻŧā§ āϏā§āĻāĻžāĻĢ āĻāϰā§āĻā§:

āϝāĻĻāĻŋ āĻā§āύ āϤāĻĨā§āϝ āύāĻž āĻĨāĻžāĻā§, āϤāĻžāĻšāϞ⧠āĻāĻžāϞāĻŋāϝāĻŧā§ āϝāĻžāĻāϝāĻŧāĻžāϰ āĻā§āύ āĻŽāĻžāύ⧠āύā§āĻāĨ¤ āĻāĻŋāύā§āϤ⧠āĻāĻāĻŋ āĻĒā§āϰāĻŖ āĻāϰāĻž āϏāĻĢāϞ āĻŦāĻŋāĻŦā§āĻāύāĻž āĻāϰāĻžāĻ āĻ āĻĻā§āĻā§āϤāĨ¤ āϤāĻŦā§ āĻāĻāĻŋ āĻāĻāĻāĻŋ āĻā§āϞ āύāϝāĻŧāĨ¤ āĻ-āĻāĻš-āĻāĻš, āĻāĻŋ āĻāϰāĻŦ?! āĻāĻŦāĻ āĻāĻāĻžāύ⧠āĻāĻŋ:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException Airflow āĻā§ āĻŦāϞāĻŦā§ āϝ⧠āĻā§āύ āϤā§āϰā§āĻāĻŋ āύā§āĻ, āĻāĻŋāύā§āϤ⧠āĻāĻŽāϰāĻž āĻāĻžāĻāĻāĻŋ āĻāĻĄāĻŧāĻŋāϝāĻŧā§ āϝāĻžāĻāĨ¤ āĻāύā§āĻāĻžāϰāĻĢā§āϏ⧠āϏāĻŦā§āĻ āĻŦāĻž āϞāĻžāϞ āĻŦāϰā§āĻāĻā§āώā§āϤā§āϰ āĻĨāĻžāĻāĻŦā§ āύāĻž, āĻāĻŋāύā§āϤ⧠āĻā§āϞāĻžāĻĒā§āĨ¤
āĻāϏā§āύ āĻāĻŽāĻžāĻĻā§āϰ āĻĄā§āĻāĻž āĻāϏ āĻāϰāĻŋ āĻāĻāĻžāϧāĻŋāĻ āĻāϞāĻžāĻŽ:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])āϝāĻĨāĻž:
- āϝ⧠āĻĄāĻžāĻāĻžāĻŦā§āϏ āĻĨā§āĻā§ āĻāĻŽāϰāĻž āĻ āϰā§āĻĄāĻžāϰ āύāĻŋāϝāĻŧā§āĻāĻŋāϞāĻžāĻŽ,
- āĻāĻŽāĻžāĻĻā§āϰ āĻŦāύā§āϝāĻž āϏā§āĻļāύā§āϰ āĻāĻāĻĄāĻŋ (āĻāĻāĻŋ āĻāĻŋāύā§āύ āĻšāĻŦā§ āĻĒā§āϰāϤāĻŋāĻāĻŋ āĻāĻžāĻā§āϰ āĻāύā§āϝ),
- āĻāϤā§āϏ āĻāĻŦāĻ āĻ āϰā§āĻĄāĻžāϰ āĻāĻāĻĄāĻŋ āĻĨā§āĻā§ āĻāĻāĻāĻŋ āĻšā§āϝāĻžāĻļ - āϝāĻžāϤ⧠āĻā§āĻĄāĻŧāĻžāύā§āϤ āĻĄāĻžāĻāĻžāĻŦā§āϏ⧠(āϝā§āĻāĻžāύ⧠āϏāĻŦāĻāĻŋāĻā§ āĻāĻāĻāĻŋ āĻā§āĻŦāĻŋāϞ⧠āĻĸā§āϞ⧠āĻĻā§āĻāϝāĻŧāĻž āĻšāϝāĻŧ) āĻāĻŽāĻžāĻĻā§āϰ āĻāĻāĻāĻŋ āĻ āύāύā§āϝ āĻ āϰā§āĻĄāĻžāϰ āĻāĻāĻĄāĻŋ āϰāϝāĻŧā§āĻā§āĨ¤
āĻļā§āώ āϧāĻžāĻĒ āĻŦāĻžāĻāĻŋ: āĻāĻžāϰā§āĻāĻŋāĻāĻžāϰ āĻŽāϧā§āϝ⧠āϏāĻŦāĻāĻŋāĻā§ āĻĸāĻžāϞāĻž. āĻāĻŦāĻ, āĻ āĻĻā§āĻā§āϤāĻāĻžāĻŦā§ āϝāĻĨā§āώā§āĻ, āĻāĻāĻŋ āĻāϰāĻžāϰ āϏāĻŦāĻā§āϝāĻŧā§ āĻĻāϰā§āĻļāύā§āϝāĻŧ āĻāĻŦāĻ āĻāĻžāϰā§āϝāĻāϰ āĻāĻĒāĻžāϝāĻŧāĻā§āϞāĻŋāϰ āĻŽāϧā§āϝ⧠āĻāĻāĻāĻŋ āĻšāϞ CSV āĻāϰ āĻŽāĻžāϧā§āϝāĻŽā§!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- āĻāĻŽāϰāĻž āĻāĻāĻāĻŋ āĻŦāĻŋāĻļā§āώ āϰāĻŋāϏāĻŋāĻāĻžāϰ āϤā§āϰāĻŋ āĻāϰāĻāĻŋ
StringIO. pandasāĻĻāϝāĻŧāĻž āĻāϰ⧠āĻāĻŽāĻžāĻĻā§āϰ āĻāϰāĻž āĻšāĻŦā§DataFrameāĻāĻāĻžāϰā§CSV-āϞāĻžāĻāύ- āĻāϏā§āύ āĻāĻāĻāĻŋ āĻšā§āĻ āĻĻāĻŋāϝāĻŧā§ āĻāĻŽāĻžāĻĻā§āϰ āĻĒā§āϰāĻŋāϝāĻŧ āĻāĻžāϰā§āĻāĻŋāĻāĻžāϰ āϏāĻžāĻĨā§ āĻāĻāĻāĻŋ āϏāĻāϝā§āĻ āĻā§āϞāĻŋāĨ¤
- āĻāĻŦāĻ āĻāĻāύ āϏāĻžāĻšāĻžāϝā§āϝā§āϰ āϏāĻžāĻĨā§
copy()āĻāĻžāϰā§āĻāĻŋāĻāĻžāϝāĻŧ āϏāϰāĻžāϏāϰāĻŋ āĻāĻŽāĻžāĻĻā§āϰ āĻĄā§āĻāĻž āĻĒāĻžāĻ āĻžāύ!
āĻāĻŽāϰāĻž āĻĄā§āϰāĻžāĻāĻāĻžāϰā§āϰ āĻāĻžāĻ āĻĨā§āĻā§ āύā§āĻŦ āĻāϤāĻā§āϞāĻŋ āϞāĻžāĻāύ āĻĒā§āϰā§āĻŖ āĻšāϝāĻŧā§āĻā§, āĻāĻŦāĻ āϏā§āĻļāύ āĻŽā§āϝāĻžāύā§āĻāĻžāϰāĻā§ āĻŦāϞāĻŦ āϝ⧠āϏāĻŦāĻāĻŋāĻā§ āĻ āĻŋāĻ āĻāĻā§:
session.loaded_rows = cursor.rowcount
session.successful = TrueāĻāĻāĻžāύā§āĻ āĻļā§āώ.
āĻŦāĻŋāĻā§āϰāϝāĻŧā§, āĻāĻŽāϰāĻž āĻŽā§āϝāĻžāύā§āϝāĻŧāĻžāϞāĻŋ āĻāĻžāϰā§āĻā§āĻ āĻĒā§āϞā§āĻ āϤā§āϰāĻŋ āĻāϰāĻŋāĨ¤ āĻāĻāĻžāύ⧠āĻāĻŽāĻŋ āύāĻŋāĻā§āĻā§ āĻāĻāĻāĻŋ āĻā§āĻ āĻŽā§āĻļāĻŋāύā§āϰ āĻ āύā§āĻŽāϤāĻŋ āĻĻāĻŋāϞāĻžāĻŽ:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)āĻāĻŽāĻŋ āĻŦā§āϝāĻžāĻŦāĻšāĻžāϰ āĻāϰāĻāĻŋ
VerticaOperator()āĻāĻŽāĻŋ āĻāĻāĻāĻŋ āĻĄāĻžāĻāĻžāĻŦā§āϏ āϏā§āĻāĻŋāĻŽāĻž āĻāĻŦāĻ āĻāĻāĻāĻŋ āĻā§āĻŦāĻŋāϞ āϤā§āϰāĻŋ āĻāϰāĻŋ (āϝāĻĻāĻŋ āϏā§āĻā§āϞāĻŋ āĻāϤāĻŋāĻŽāϧā§āϝ⧠āĻŦāĻŋāĻĻā§āϝāĻŽāĻžāύ āύāĻž āĻĨāĻžāĻā§, āĻ āĻŦāĻļā§āϝāĻ)āĨ¤ āĻĒā§āϰāϧāĻžāύ āĻāĻŋāύāĻŋāϏ āϏāĻ āĻŋāĻāĻāĻžāĻŦā§ āύāĻŋāϰā§āĻāϰāϤāĻž āĻŦā§āϝāĻŦāϏā§āĻĨāĻž āĻāϰāĻž āĻšāϝāĻŧ:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadāĻŦā§āĻĻā§āϧāĻŋāĻŽāĻžāύ
- āĻāĻā§āĻāĻž, - āĻā§āĻ āĻāĻāĻĻā§āϰ āĻŦāϞāϞ, - āĻāĻāύ āϤāĻžāĻ āύāĻž
āĻāĻĒāύāĻŋ āĻāĻŋ āύāĻŋāĻļā§āĻāĻŋāϤ āϝ⧠āĻāĻŽāĻŋ āĻŦāύā§āϰ āϏāĻŦāĻā§āϝāĻŧā§ āĻāϝāĻŧāĻā§āĻāϰ āĻĒā§āϰāĻžāĻŖā§?
āĻā§āϞāĻŋāϝāĻŧāĻž āĻĄā§āύāĻžāϞā§āĻĄāϏāύ, āĻĻā§āϝ āĻā§āϰā§āĻĢāĻžāϞā§
āĻāĻŽāĻŋ āĻŽāύ⧠āĻāϰāĻŋ āϝāĻĻāĻŋ āĻāĻŽāĻžāϰ āϏāĻšāĻāϰā§āĻŽā§ āĻāĻŦāĻ āĻāĻŽāĻžāϰ āĻŽāϧā§āϝ⧠āĻāĻāĻāĻŋ āĻĒā§āϰāϤāĻŋāϝā§āĻāĻŋāϤāĻž āĻĨāĻžāĻāϤ: āĻā§ āĻĻā§āϰā§āϤ āϏā§āĻā§āϰā§āϝāĻžāĻ āĻĨā§āĻā§ āĻāĻāĻāĻŋ ETL āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻž āϤā§āϰāĻŋ āĻāϰāĻŦā§ āĻāĻŦāĻ āĻāĻžāϞ⧠āĻāϰāĻŦā§: āϤāĻžāϰāĻž āϤāĻžāĻĻā§āϰ SSIS āĻāĻŦāĻ āĻāĻāĻāĻŋ āĻŽāĻžāĻāϏ āĻāĻŦāĻ āĻāĻŽāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āϏāĻš ... āĻāĻŦāĻ āϤāĻžāϰāĻĒāϰ⧠āĻāĻŽāϰāĻž āϰāĻā§āώāĻŖāĻžāĻŦā§āĻā§āώāĻŖā§āϰ āϏāĻšāĻāϤāĻžāϰ āϏāĻžāĻĨā§ āϤā§āϞāύāĻž āĻāϰāĻŦ ... āĻŦāĻžāĻš, āĻāĻŽāĻŋ āĻŽāύ⧠āĻāϰāĻŋ āĻāĻĒāύāĻŋ āϏāĻŽā§āĻŽāϤ āĻšāĻŦā§āύ āϝ⧠āĻāĻŽāĻŋ āϤāĻžāĻĻā§āϰ āϏāĻŦ āĻĢā§āϰāύā§āĻā§ āĻĒāϰāĻžāĻāĻŋāϤ āĻāϰāĻŦ!
āϝāĻĻāĻŋ āĻāĻāĻā§ āĻŦā§āĻļāĻŋ āĻā§āϰā§āϤā§āĻŦ āϏāĻšāĻāĻžāϰā§, āϤāĻŦā§ āĻ ā§āϝāĻžāĻĒāĻžāĻāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠- āĻĒā§āϰā§āĻā§āϰāĻžāĻŽ āĻā§āĻĄ āĻāĻāĻžāϰ⧠āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻžāĻā§āϞāĻŋ āĻŦāϰā§āĻŖāύāĻž āĻāϰ⧠- āĻāĻŽāĻžāϰ āĻāĻžāĻ āĻāϰā§āĻā§ āĻ āϧāĻŋāĻ āĻāϰ⧠āĻāϰāĻžāĻŽāĻĻāĻžāϝāĻŧāĻ āĻāĻŦāĻ āĻāĻĒāĻā§āĻā§āϝāĨ¤
āĻĒā§āϞāĻžāĻ-āĻāύ āĻāĻŦāĻ āϏā§āĻā§āϞā§āĻŦāĻŋāϞāĻŋāĻāĻŋāϰ āĻĒā§āϰāĻŦāĻŖāϤāĻž āĻāĻāϝāĻŧ āĻā§āώā§āϤā§āϰā§āĻ āĻāϰ āϏā§āĻŽāĻžāĻšā§āύ āĻāĻā§āϏāĻā§āύāϏāĻŋāĻŦāĻŋāϞāĻŋāĻāĻŋ, āĻāĻĒāύāĻžāĻā§ āĻĒā§āϰāĻžāϝāĻŧ āϝā§āĻā§āύ⧠āĻā§āώā§āϤā§āϰā§āĻ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰāĻžāϰ āϏā§āϝā§āĻ āĻĻā§āϝāĻŧ: āĻāĻŽāύāĻāĻŋ āĻĄā§āĻāĻž āϏāĻāĻā§āϰāĻš, āĻĒā§āϰāϏā§āϤā§āϤ āĻ āĻĒā§āϰāĻā§āϰāĻŋāϝāĻŧāĻžāĻāϰāĻŖā§āϰ āϏāĻŽā§āĻĒā§āϰā§āĻŖ āĻāĻā§āϰā§, āĻāĻŽāύāĻāĻŋ āϰāĻā§āĻ āĻā§āĻā§āώā§āĻĒāĻŖā§āϰ āĻā§āώā§āϤā§āϰā§āĻ (āĻŽāĻā§āĻāϞ āĻā§āϰāĻšā§, āĻāϰ āĻ āĻŦāĻļā§āϝāĻ)āĨ¤
āĻĒāĻžāϰā§āĻ āĻĢāĻžāĻāύāĻžāϞ, āϰā§āĻĢāĻžāϰā§āύā§āϏ āĻāĻŦāĻ āϤāĻĨā§āϝ
āĻāĻŽāϰāĻž āĻāĻĒāύāĻžāϰ āĻāύā§āϝ āϏāĻāĻā§āϰāĻš āĻāϰā§āĻāĻŋ āϰā§āĻ
start_date. āĻšā§āϝāĻžāĻ, āĻāĻāĻŋ āĻāϤāĻŋāĻŽāϧā§āϝ⧠āĻāĻāĻāĻŋ āϏā§āĻĨāĻžāύā§āϝāĻŧ āĻŽā§āĻŽāĨ¤ āĻĄāĻ āĻāϰ āĻĒā§āϰāϧāĻžāύ āϝā§āĻā§āϤāĻŋ āĻŽāĻžāϧā§āϝāĻŽā§start_dateāϏāĻŦ āĻĒāĻžāϏ. āϏāĻāĻā§āώā§āĻĒā§, āϝāĻĻāĻŋ āĻāĻĒāύāĻŋ āĻāϞā§āϞā§āĻ āĻāϰā§āύstart_dateāĻŦāϰā§āϤāĻŽāĻžāύ āϤāĻžāϰāĻŋāĻ, āĻāĻŦāĻschedule_interval- āĻāĻāĻĻāĻŋāύ, āϤāĻžāĻšāϞ⧠āĻĄāĻŋāĻāĻāĻŋ āĻāĻžāϞ āĻļā§āϰ⧠āĻšāĻŦā§ āĻāĻā§ āύāĻžāĨ¤start_date = datetime(2020, 7, 7, 0, 1, 2)āĻāĻŦāĻ āĻāϰ āĻā§āύ āϏāĻŽāϏā§āϝāĻž āύā§āĻāĨ¤
āĻāϰ āϏāĻžāĻĨā§ āϝā§āĻā§āϤ āĻāϰā§āĻāĻāĻŋ āϰāĻžāύāĻāĻžāĻāĻŽ āϤā§āϰā§āĻāĻŋ āϰāϝāĻŧā§āĻā§:
Task is missing the start_date parameter, āϝāĻž āĻĒā§āϰāĻžāϝāĻŧāĻļāĻ āύāĻŋāϰā§āĻĻā§āĻļ āĻāϰ⧠āϝ⧠āĻāĻĒāύāĻŋ dag āĻ āĻĒāĻžāϰā§āĻāϰā§āϰ āϏāĻžāĻĨā§ āĻāĻŦāĻĻā§āϧ āĻāϰāϤ⧠āĻā§āϞ⧠āĻā§āĻā§āύāĨ¤- āϏāĻŦ āĻāĻ āĻŽā§āĻļāĻŋāύā§āĨ¤ āĻšā§āϝāĻžāĻ, āĻāĻŦāĻ āĻāĻžāĻāĻāĻŋ (āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āύāĻŋāĻā§āĻ āĻāĻŦāĻ āĻāĻŽāĻžāĻĻā§āϰ āĻāĻŦāϰāĻŖ), āĻāĻŦāĻ āĻāĻāĻāĻŋ āĻāϝāĻŧā§āĻŦ āϏāĻžāϰā§āĻāĻžāϰ, āĻāĻŦāĻ āĻāĻāĻāĻŋ āϏāĻŽāϝāĻŧāϏā§āĻā§, āĻāĻŦāĻ āĻāϰā§āĻŽā§āϰāĻžāĨ¤ āĻāĻŦāĻ āĻāĻāĻž āĻāĻŽāύāĻāĻŋ āĻāĻžāĻ. āĻāĻŋāύā§āϤ⧠āϏāĻŽāϝāĻŧā§āϰ āϏāĻžāĻĨā§ āϏāĻžāĻĨā§, āĻĒāϰāĻŋāώā§āĻŦāĻžāĻā§āϞāĻŋāϰ āĻāύā§āϝ āĻāĻžāĻā§āϰ āϏāĻāĻā§āϝāĻž āĻŦāĻžāĻĄāĻŧāϤ⧠āĻĨāĻžāĻā§, āĻāĻŦāĻ āϝāĻāύ PostgreSQL 20 ms āĻāϰ āĻĒāϰāĻŋāĻŦāϰā§āϤ⧠5 āϏā§āĻā§āύā§āĻĄā§ āϏā§āĻāĻā§ āϏāĻžāĻĄāĻŧāĻž āĻĻāĻŋāϤ⧠āĻļā§āϰ⧠āĻāϰā§, āϤāĻāύ āĻāĻŽāϰāĻž āĻāĻāĻŋ āύāĻŋāϝāĻŧā§ āĻāĻŋāϝāĻŧā§āĻāĻŋāϞāĻžāĻŽ āĻāĻŦāĻ āϏāϰāĻŋāϝāĻŧā§ āύāĻŋāϝāĻŧā§āĻāĻŋāϞāĻžāĻŽāĨ¤
- āϞā§āĻāĻžāϞ āĻāĻā§āϏāĻŋāĻāĻŋāĻāĻāϰāĨ¤ āĻšā§āϝāĻžāĻ, āĻāĻŽāϰāĻž āĻāĻāύāĻ āĻāĻāĻŋāϰ āĻāĻĒāϰ āĻŦāϏ⧠āĻāĻāĻŋ āĻāĻŦāĻ āĻāĻŽāϰāĻž āĻāϤāĻŋāĻŽāϧā§āϝ⧠āĻ āϤāϞ āĻāĻšā§āĻŦāϰā§āϰ āϧāĻžāϰ⧠āĻāϞ⧠āĻāϏā§āĻāĻŋāĨ¤ LocalExecutor āĻāĻāύ āĻĒāϰā§āϝāύā§āϤ āĻāĻŽāĻžāĻĻā§āϰ āĻāύā§āϝ āϝāĻĨā§āώā§āĻ āĻāĻŋāϞ, āĻāĻŋāύā§āϤ⧠āĻāĻāύ āϏāĻŽāϝāĻŧ āĻāϏā§āĻā§ āĻ āύā§āϤāϤ āĻāĻāĻāύ āĻāϰā§āĻŽā§ āύāĻŋāϝāĻŧā§ āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻāϰāĻžāϰ, āĻāĻŦāĻ CeleryExecutor-āĻ āϝāĻžāĻāϝāĻŧāĻžāϰ āĻāύā§āϝ āĻāĻŽāĻžāĻĻā§āϰ āĻāĻ ā§āϰ āĻĒāϰāĻŋāĻļā§āϰāĻŽ āĻāϰāϤ⧠āĻšāĻŦā§āĨ¤ āĻāĻŦāĻ āĻāĻĒāύāĻŋ āĻāĻāĻŋāϰ āϏāĻžāĻĨā§ āĻāĻāĻāĻŋ āĻŽā§āĻļāĻŋāύ⧠āĻāĻžāĻ āĻāϰāϤ⧠āĻĒāĻžāϰā§āύ āĻāĻ āĻŦāĻŋāώāϝāĻŧāĻāĻŋāϰ āĻĒāϰāĻŋāĻĒā§āϰā§āĻā§āώāĻŋāϤā§, āĻā§āύāĻ āϏāĻžāϰā§āĻāĻžāϰā§āĻ āϏā§āϞāĻžāϰāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰāϤ⧠āĻāĻĒāύāĻžāĻā§ āĻŦāĻžāϧāĻž āĻĻā§āϝāĻŧ āύāĻž, āϝāĻž "āĻ āĻŦāĻļā§āϝāĻ, āϏāϤāϤāĻžāϰ āϏāĻžāĻĨā§ āĻāĻāύāĻ āĻāϤā§āĻĒāĻžāĻĻāύ⧠āϝāĻžāĻŦā§ āύāĻž!"
- āĻ
-āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ
āύā§āϤāϰā§āύāĻŋāϰā§āĻŽāĻŋāϤ āϏāϰāĻā§āĻāĻžāĻŽ:
- āϏāĻāϝā§āĻ āĻĒāϰāĻŋāώā§āĻŦāĻžāϰ āĻļāĻāϏāĻžāĻĒāϤā§āϰ āϏāĻāϰāĻā§āώāĻŖ āĻāϰāϤā§,
- SLA āĻŽāĻŋāϏ āϝ⧠āĻāĻžāĻāĻā§āϞ⧠āϏāĻŽāϝāĻŧāĻŽāϤ⧠āĻšāϝāĻŧāύāĻŋ āϏā§āĻā§āϞā§āϰ āĻāĻŦāĻžāĻŦ āĻĻāĻŋāϤā§,
- xcom āĻŽā§āĻāĻžāĻĄā§āĻāĻž āĻŦāĻŋāύāĻŋāĻŽāϝāĻŧā§āϰ āĻāύā§āϝ (āĻāĻŽāĻŋ āĻŦāϞāϞāĻžāĻŽ āĻŽā§āĻāĻžāĻĄā§āĻāĻž!) āĻĄā§āĻ āĻāĻžāĻā§āϰ āĻŽāϧā§āϝā§āĨ¤
- āĻŽā§āϞ āĻ āĻĒāĻŦā§āϝāĻŦāĻšāĻžāϰ. āĻāϝāĻŧā§āϞ āĻāĻŽāĻŋ āĻāĻŋ āĻŦāϞāϤ⧠āĻĒāĻžāϰā§āύ? āĻĒāϤāĻŋāϤ āĻāĻžāĻā§āϰ āĻĒā§āύāϰāĻžāĻŦā§āϤā§āϤāĻŋāϰ āĻāύā§āϝ āϏāϤāϰā§āĻāϤāĻž āϏā§āĻ āĻāĻĒ āĻāϰāĻž āĻšāϝāĻŧā§āĻāĻŋāϞāĨ¤ āĻāĻāύ āĻāĻŽāĻžāϰ āĻāĻžāĻ Gmail-āĻ Airflow āĻĨā§āĻā§ >90k āĻāĻŽā§āϞ āĻāĻā§, āĻāĻŦāĻ āĻāϝāĻŧā§āĻŦ āĻŽā§āĻāϞ ââāĻŽāĻā§āϞ āĻāĻāĻŦāĻžāϰ⧠100 āĻāĻŋāϰāĻ āĻŦā§āĻļāĻŋ āĻĒāĻŋāĻ āĻāĻĒ āĻāϰāϤ⧠āĻāĻŦāĻ āĻŽā§āĻāϤ⧠āĻ āϏā§āĻŦā§āĻāĻžāϰ āĻāϰā§āĨ¤
āĻāϰ⧠āĻā§āώāϤāĻŋ:
āĻāϰāĻ āĻ āĻā§āĻŽā§āĻļāύ āĻā§āϞ
āĻāĻŽāĻžāĻĻā§āϰ āĻšāĻžāϤ āĻĻāĻŋāϝāĻŧā§ āύāϝāĻŧ āĻāĻŽāĻžāĻĻā§āϰ āĻŽāĻžāĻĨāĻž āĻĻāĻŋāϝāĻŧā§ āĻāϰāĻ āĻŦā§āĻļāĻŋ āĻāĻžāĻ āĻāϰāĻžāϰ āĻāύā§āϝ, āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻāĻŽāĻžāĻĻā§āϰ āĻāύā§āϝ āĻāĻāĻŋ āĻĒā§āϰāϏā§āϤā§āϤ āĻāϰā§āĻā§:
- - āϤāĻžāϰ āĻāĻāύāĻ āĻĒāϰā§āĻā§āώāĻžāĻŽā§āϞāĻ āĻŽāϰā§āϝāĻžāĻĻāĻž āϰāϝāĻŧā§āĻā§, āϝāĻž āϤāĻžāĻā§ āĻāĻžāĻ āĻāϰāϤ⧠āĻŦāĻžāϧāĻž āĻĻā§āϝāĻŧ āύāĻžāĨ¤ āĻāĻāĻŋāϰ āϏāĻžāĻšāĻžāϝā§āϝā§, āĻāĻĒāύāĻŋ āĻļā§āϧā§āĻŽāĻžāϤā§āϰ āĻĄā§āϝāĻžāĻ āĻāĻŦāĻ āĻāĻžāĻ āϏāĻŽā§āĻĒāϰā§āĻā§ āϤāĻĨā§āϝ āĻĒā§āϤ⧠āĻĒāĻžāϰā§āύ āύāĻž, āϤāĻŦā§ āĻāĻāĻāĻŋ āĻĄā§āϝāĻžāĻ āĻŦāύā§āϧ/āĻļā§āϰ⧠āĻāϰāϤā§, āĻāĻāĻāĻŋ DAG āϰāĻžāύ āĻŦāĻž āĻāĻāĻāĻŋ āĻĒā§āϞ āϤā§āϰāĻŋ āĻāϰāϤ⧠āĻĒāĻžāϰā§āύ⧎
- - āĻāĻŽāĻžāύā§āĻĄ āϞāĻžāĻāύā§āϰ āĻŽāĻžāϧā§āϝāĻŽā§ āĻ
āύā§āĻ āĻā§āϞ āĻĒāĻžāĻāϝāĻŧāĻž āϝāĻžāϝāĻŧ āϝā§āĻā§āϞ⧠āĻļā§āϧ⧠WebUI āĻāϰ āĻŽāĻžāϧā§āϝāĻŽā§ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰāĻž āĻ
āϏā§āĻŦāĻŋāϧāĻžāĻāύāĻ āύāϝāĻŧ, āĻāĻŋāύā§āϤ⧠āϏāĻžāϧāĻžāϰāĻŖāϤ āĻ
āύā§āĻĒāϏā§āĻĨāĻŋāϤāĨ¤ āĻāĻĻāĻžāĻšāϰāĻŖ āϏā§āĻŦāϰā§āĻĒ:
backfillāĻāĻžāϏā§āĻ āĻĻā§āώā§āĻāĻžāύā§āϤ āĻĒā§āύāϰāĻžāϝāĻŧ āĻāϰāĻŽā§āĻ āĻāϰāϤ⧠āĻĒā§āϰāϝāĻŧā§āĻāύ.
āĻāĻĻāĻžāĻšāϰāĻŖ āϏā§āĻŦāϰā§āĻĒ, āĻŦāĻŋāĻļā§āϞā§āώāĻāϰāĻž āĻāϏ⧠āĻŦāϞāϞā§āύ: âāĻāĻŦāĻ āĻāĻŽāϰā§āĻĄ, āĻāĻžāύā§āϝāĻŧāĻžāϰ⧠1 āĻĨā§āĻā§ 13 āϤāĻžāϰāĻŋāĻā§āϰ āĻĄā§āĻāĻžāϤ⧠āĻāĻĒāύāĻŋ āĻāĻā§āĻŦāĻžāĻā§ āĻāĻĨāĻž āĻŦāϞā§āĻā§āύ! āĻ āĻŋāĻ āĻāϰ, āĻ āĻŋāĻ āĻāϰ, āĻ āĻŋāĻ āĻāϰ, āĻ āĻŋāĻ āĻāϰ!" āĻāĻŦāĻ āĻāĻĒāύāĻŋ āϝā§āĻŽāύ āĻāĻāĻāĻŋ āĻšāĻŦ:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- āĻŦā§āϏ āϏāĻžāϰā§āĻāĻŋāϏ:
initdb,resetdb,upgradedb,checkdb. run, āϝāĻž āĻāĻĒāύāĻžāĻā§ āĻāĻāĻāĻŋ āĻāύāϏā§āĻā§āϝāĻžāύā§āϏ āĻāĻžāϏā§āĻ āĻāĻžāϞāĻžāύā§āϰ āĻāĻŦāĻ āĻāĻŽāύāĻāĻŋ āϏāĻŽāϏā§āϤ āύāĻŋāϰā§āĻāϰāϤāĻžāϰ āĻāĻĒāϰ āϏā§āĻā§āϰ āĻāϰāϤ⧠āĻĻā§āϝāĻŧāĨ¤ āϤāĻžāĻāĻžāĻĄāĻŧāĻž, āĻāĻĒāύāĻŋ āĻāϰ āĻŽāĻžāϧā§āϝāĻŽā§ āĻāĻāĻŋ āĻāĻžāϞāĻžāϤ⧠āĻĒāĻžāϰā§āύLocalExecutor, āĻāĻŽāύāĻāĻŋ āϝāĻĻāĻŋ āĻāĻĒāύāĻžāϰ āϏā§āϞāĻžāϰāĻŋ āĻā§āϞāĻžāϏā§āĻāĻžāϰ āĻĨāĻžāĻā§āĨ¤- āĻĒā§āϰāĻžāϝāĻŧ āĻāĻāĻ āĻāĻŋāύāĻŋāϏ āĻāϰā§
test, āĻļā§āϧā§āĻŽāĻžāϤā§āϰ āĻāĻžāĻāĻāĻŋāϤā§āĻ āĻāĻŋāĻā§āĻ āϞā§āĻā§ āύāĻžāĨ¤ connectionsāĻļā§āϞ āĻĨā§āĻā§ āϏāĻāϝā§āĻ āϤā§āϰāĻŋ āĻāϰāϤ⧠āĻĻā§āϝāĻŧāĨ¤
- - āĻŽāĻŋāĻĨāϏā§āĻā§āϰāĻŋāϝāĻŧāĻž āĻāϰāĻžāϰ āĻāĻāĻāĻŋ āĻŦāϰāĻ āĻšāĻžāϰā§āĻĄāĻā§āϰ āĻāĻĒāĻžāϝāĻŧ, āϝāĻž āĻĒā§āϞāĻžāĻāĻāύāĻā§āϞāĻŋāϰ āĻāύā§āϝ āĻāĻĻā§āĻĻāĻŋāώā§āĻ, āĻāĻŦāĻ āϏāĻžāĻŽāĻžāύā§āϝ āĻšāĻžāϤ āĻĻāĻŋāϝāĻŧā§ āĻāϤ⧠āĻāĻžāĻāĻā§āύāĻŋ āύāĻžāĨ¤ āĻāĻŋāύā§āϤ⧠āĻāĻŽāĻžāĻĻā§āϰ āϝā§āϤ⧠āĻŦāĻžāϧāĻž āĻĻā§āĻŦā§ āĻā§
/home/airflow/dags, āĻāĻžāϞāĻžāύipythonāĻāĻŦāĻ āĻāĻžāϰāĻĒāĻžāĻļā§ āĻāĻāĻžāĻāĻŋāĻā§āĻĄāĻŧāĻŋ āĻļā§āϰā§? āĻāĻĒāύāĻŋ, āĻāĻĻāĻžāĻšāϰāĻŖāϏā§āĻŦāϰā§āĻĒ, āύāĻŋāĻŽā§āύāϞāĻŋāĻāĻŋāϤ āĻā§āĻĄ āϏāĻš āϏāĻŽāϏā§āϤ āϏāĻāϝā§āĻ āϰāĻĒā§āϤāĻžāύāĻŋ āĻāϰāϤ⧠āĻĒāĻžāϰā§āύ:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻŽā§āĻāĻžāĻĄā§āĻāĻžāĻŦā§āϏā§āϰ āϏāĻžāĻĨā§ āϏāĻāϝā§āĻ āĻāϰāĻž āĻšāĻā§āĻā§āĨ¤ āĻāĻŽāĻŋ āĻāĻāĻŋāϤ⧠āϞā§āĻāĻžāϰ āĻĒāϰāĻžāĻŽāϰā§āĻļ āĻĻāĻŋāĻ āύāĻž, āϤāĻŦā§ āĻŦāĻŋāĻāĻŋāύā§āύ āύāĻŋāϰā§āĻĻāĻŋāώā§āĻ āĻŽā§āĻā§āϰāĻŋāĻā§āϏā§āϰ āĻāύā§āϝ āĻāĻžāϏā§āĻ āϏā§āĻā§āĻ āĻĒāĻžāĻāϝāĻŧāĻž āϝ⧠āĻā§āύāĻ API āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰāĻžāϰ āĻā§āϝāĻŧā§ āĻ
āύā§āĻ āĻĻā§āϰā§āϤ āĻāĻŦāĻ āϏāĻšāĻ āĻšāϤ⧠āĻĒāĻžāϰā§āĨ¤
āĻāϏā§āύ āĻāĻŽāϰāĻž āĻŦāϞāĻŋ āϝ⧠āĻāĻŽāĻžāĻĻā§āϰ āϏāĻŽāϏā§āϤ āĻāĻžāĻāĻ āĻ āĻĻāĻŽā§āϝ āύāϝāĻŧ, āϤāĻŦā§ āϏā§āĻā§āϞāĻŋ āĻāĻāύāĻ āĻāĻāύāĻ āĻĒāĻĄāĻŧā§ āϝā§āϤ⧠āĻĒāĻžāϰ⧠āĻāĻŦāĻ āĻāĻāĻŋ āϏā§āĻŦāĻžāĻāĻžāĻŦāĻŋāĻāĨ¤ āĻāĻŋāύā§āϤ⧠āĻāϝāĻŧā§āĻāĻāĻŋ āĻŦā§āϞāĻā§āĻ āĻāϤāĻŋāĻŽāϧā§āϝā§āĻ āϏāύā§āĻĻā§āĻšāĻāύāĻ, āĻāĻŦāĻ āĻāĻāĻŋ āĻĒāϰā§āĻā§āώāĻž āĻāϰāĻž āĻĒā§āϰāϝāĻŧā§āĻāύāĨ¤
āĻāϏāĻāĻŋāĻāĻāϞ āϏāĻžāĻŦāϧāĻžāύ!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
āϰā§āĻĢāĻžāϰā§āύā§āϏ
āĻāĻŦāĻ āĻ āĻŦāĻļā§āϝāĻ, āĻā§āĻāϞ āĻāϏā§āϝ⧠āĻāϰāĻžāϰ āĻĒā§āϰāĻĨāĻŽ āĻĻāĻļāĻāĻŋ āϞāĻŋāĻā§āĻ āĻāĻŽāĻžāϰ āĻŦā§āĻāĻŽāĻžāϰā§āĻ āĻĨā§āĻā§ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻĢā§āϞā§āĻĄāĻžāϰā§āϰ āĻŦāĻŋāώāϝāĻŧāĻŦāϏā§āϤā§āĨ¤
- - āĻ āĻŦāĻļā§āϝāĻ āĻ āĻĢāĻŋāϏ āĻĻāĻŋāϝāĻŧā§ āĻļā§āϰ⧠āĻāϰāϤ⧠āĻšāĻŦā§āĨ¤ āĻĄāĻā§āĻŽā§āύā§āĻā§āĻļāύ, āĻāĻŋāύā§āϤ⧠āύāĻŋāϰā§āĻĻā§āĻļāĻžāĻŦāϞ⧠āĻā§ āĻĒāĻĄāĻŧā§?
- - āĻ āĻŋāĻ āĻāĻā§, āĻ āύā§āϤāϤ āύāĻŋāϰā§āĻŽāĻžāϤāĻžāĻĻā§āϰ āĻāĻžāĻ āĻĨā§āĻā§ āϏā§āĻĒāĻžāϰāĻŋāĻļ āĻĒāĻĄāĻŧā§āύāĨ¤
- - āĻāĻā§āĻŦāĻžāϰ⧠āĻļā§āϰā§: āĻāĻŦāĻŋāϤ⧠āĻāĻāĻāĻžāϰ āĻāύā§āĻāĻžāϰāĻĢā§āϏ
- - āĻŽā§āϞāĻŋāĻ āϧāĻžāϰāĻŖāĻžāĻā§āϞāĻŋ āĻāĻžāϞāĻāĻžāĻŦā§ āĻŦāϰā§āĻŖāύāĻž āĻāϰāĻž āĻšāϝāĻŧā§āĻā§, āϝāĻĻāĻŋ (āĻšāĻ āĻžā§!) āĻāĻĒāύāĻŋ āĻāĻŽāĻžāϰ āĻāĻžāĻ āĻĨā§āĻā§ āĻāĻŋāĻā§ āĻŦā§āĻāϤ⧠āύāĻž āĻĒāĻžāϰā§āύāĨ¤
- - āĻāĻāĻāĻŋ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻā§āϞāĻžāϏā§āĻāĻžāϰ āϏā§āĻ āĻāĻĒ āĻāϰāĻžāϰ āĻāύā§āϝ āĻāĻāĻāĻŋ āϏāĻāĻā§āώāĻŋāĻĒā§āϤ āύāĻŋāϰā§āĻĻā§āĻļāĻŋāĻāĻžāĨ¤
- - āĻĒā§āϰāĻžāϝāĻŧ āĻāĻāĻ āĻāĻāϰā§āώāĻŖā§āϝāĻŧ āύāĻŋāĻŦāύā§āϧ, āϏāĻŽā§āĻāĻŦāϤ āĻāϰāĻ āĻāύā§āώā§āĻ āĻžāύāĻŋāĻāϤāĻž āĻāĻŦāĻ āĻāĻŽ āĻāĻĻāĻžāĻšāϰāĻŖ āĻāĻžāĻĄāĻŧāĻžāĨ¤
- - āϏā§āϞāĻžāϰāĻŋāϰ āϏāĻžāĻĨā§ āĻāĻāϝā§āĻā§ āĻāĻžāĻ āĻāϰāĻžāϰ āĻŦāĻŋāώāϝāĻŧā§āĨ¤
- - āĻāĻžāĻā§āϰ āĻ āĻĻāĻŽā§āϝāϤāĻž āϏāĻŽā§āĻĒāϰā§āĻā§, āϤāĻžāϰāĻŋāĻā§āϰ āĻĒāϰāĻŋāĻŦāϰā§āϤ⧠āĻāĻāĻĄāĻŋ āĻĻā§āĻŦāĻžāϰāĻž āϞā§āĻĄ āĻāϰāĻž, āϰā§āĻĒāĻžāύā§āϤāϰ, āĻĢāĻžāĻāϞā§āϰ āĻāĻžāĻ āĻžāĻŽā§ āĻāĻŦāĻ āĻ āύā§āϝāĻžāύā§āϝ āĻāĻāϰā§āώāĻŖā§āϝāĻŧ āĻāĻŋāύāĻŋāϏāĨ¤
- - āĻāĻžāĻ āĻāĻŦāĻ āĻā§āϰāĻŋāĻāĻžāϰ āύāĻŋāϝāĻŧāĻŽā§āϰ āύāĻŋāϰā§āĻāϰāϤāĻž, āϝāĻž āĻāĻŽāĻŋ āĻļā§āϧā§āĻŽāĻžāϤā§āϰ āĻĒāĻžāϏ āĻāϰāĻžāϰ āϏāĻŽāϝāĻŧ āĻāϞā§āϞā§āĻ āĻāϰā§āĻāĻŋāĨ¤
- - āĻāĻŋāĻāĻžāĻŦā§ āĻļāĻŋāĻĄāĻŋāĻāϞāĻžāϰā§āϰ āĻāĻŋāĻā§ "āĻāĻĻā§āĻĻā§āĻļā§āϝ āĻ āύā§āϝāĻžāϝāĻŧā§ āĻāĻžāĻ" āĻāĻžāĻāĻŋāϝāĻŧā§ āĻāĻ āϤ⧠āĻšāϝāĻŧ, āĻšāĻžāϰāĻŋāϝāĻŧā§ āϝāĻžāĻāϝāĻŧāĻž āĻĄā§āĻāĻž āϞā§āĻĄ āĻāϰāϤ⧠āĻšāϝāĻŧ āĻāĻŦāĻ āĻāĻžāĻāĻā§āϞāĻŋāĻā§ āĻ āĻā§āϰāĻžāϧāĻŋāĻāĻžāϰ āĻĻāĻŋāϤ⧠āĻšāϝāĻŧāĨ¤
- â āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠āĻŽā§āĻāĻžāĻĄā§āĻāĻžāϤ⧠āĻĻāϰāĻāĻžāϰ⧠āĻāϏāĻāĻŋāĻāĻāϞ āĻĒā§āϰāĻļā§āύāĨ¤
- - āĻāĻāĻāĻŋ āĻāĻžāϏā§āĻāĻŽ āϏā§āύā§āϏāϰ āϤā§āϰāĻŋ āϏāĻŽā§āĻĒāϰā§āĻā§ āĻāĻāĻāĻŋ āĻĻāϰāĻāĻžāϰ⧠āĻŦāĻŋāĻāĻžāĻ āĻāĻā§āĨ¤
- â āĻĄā§āĻāĻž āϏāĻžāϝāĻŧā§āύā§āϏā§āϰ āĻāύā§āϝ AWS-āĻ āĻāĻāĻāĻŋ āĻĒāϰāĻŋāĻāĻžāĻ āĻžāĻŽā§ āύāĻŋāϰā§āĻŽāĻžāĻŖā§āϰ āĻŦāĻŋāώāϝāĻŧā§ āĻāĻāĻāĻŋ āĻāĻāϰā§āώāĻŖā§āϝāĻŧ āϏāĻāĻā§āώāĻŋāĻĒā§āϤ āύā§āĻāĨ¤
- - āϏāĻžāϧāĻžāϰāĻŖ āĻā§āϞ (āϝāĻāύ āĻā§āĻ āĻāĻāύāĻ āύāĻŋāϰā§āĻĻā§āĻļāĻžāĻŦāϞ⧠āĻĒāĻĄāĻŧā§ āύāĻž)āĨ¤
- - āĻšāĻžāϏā§āύ āϞā§āĻā§āϰāĻž āĻā§āĻāĻžāĻŦā§ āĻĒāĻžāϏāĻāϝāĻŧāĻžāϰā§āĻĄ āϏāĻāϰāĻā§āώāĻŖ āĻāϰā§, āϝāĻĻāĻŋāĻ āĻāĻĒāύāĻŋ āĻā§āĻŦāϞ āϏāĻāϝā§āĻāĻā§āϞāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰāϤ⧠āĻĒāĻžāϰā§āύ⧎
- - āĻ āύā§āϤāϰā§āύāĻŋāĻšāĻŋāϤ DAG āĻĢāϰāĻāϝāĻŧāĻžāϰā§āĻĄāĻŋāĻ, āĻĢāĻžāĻāĻļāύ⧠āĻāύāĻā§āĻā§āϏāĻ āĻĨā§āϰā§āϝāĻŧāĻŋāĻ, āĻāĻŦāĻžāϰ āύāĻŋāϰā§āĻāϰāϤāĻž āϏāĻŽā§āĻĒāϰā§āĻā§, āĻāĻŦāĻ āĻāĻžāϏā§āĻ āϞāĻā§āĻ āĻāĻĄāĻŧāĻŋāϝāĻŧā§ āϝāĻžāĻāϝāĻŧāĻž āϏāĻŽā§āĻĒāϰā§āĻā§āĻāĨ¤
- - āĻŦā§āϝāĻŦāĻšāĻžāϰ āϏāĻŽā§āĻĒāϰā§āĻā§
default argumentsиparamsāĻā§āĻŽāĻĒā§āϞā§āĻ, āϏā§āĻāϏāĻžāĻĨā§ āĻā§āϰāĻŋāϝāĻŧā§āĻŦāϞ āĻāĻŦāĻ āϏāĻāϝā§āĻāĻā§āϞāĻŋāϤā§āĨ¤ - - āĻĒāϰāĻŋāĻāϞā§āĻĒāύāĻžāĻāĻžāϰ⧠āĻā§āĻāĻžāĻŦā§ āĻāϝāĻŧāĻžāϰāĻĢā§āϞ⧠2.0 āĻāϰ āĻāύā§āϝ āĻĒā§āϰāϏā§āϤā§āϤāĻŋ āύāĻŋāĻā§āĻā§āύ āϏ⧠āϏāĻŽā§āĻĒāϰā§āĻā§ āĻāĻāĻāĻŋ āĻāϞā§āĻĒāĨ¤
- - āĻāĻŽāĻžāĻĻā§āϰ āĻā§āϞāĻžāϏā§āĻāĻžāϰ āϏā§āĻĨāĻžāĻĒāύ āϏāĻŽā§āĻĒāϰā§āĻā§ āĻāĻāĻāĻŋ āϏāĻžāĻŽāĻžāύā§āϝ āĻĒā§āϰāĻžāύ⧠āύāĻŋāĻŦāύā§āϧ
docker-compose. - - āĻā§āĻŽāĻĒā§āϞā§āĻ āĻāĻŦāĻ āĻĒā§āϰāϏāĻā§āĻ āĻĢāϰāĻāϝāĻŧāĻžāϰā§āĻĄāĻŋāĻ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻāϰ⧠āĻāϤāĻŋāĻļā§āϞ āĻāĻžāĻāĨ¤
- â āĻŽā§āϞ āĻāĻŦāĻ āϏā§āϞā§āϝāĻžāĻā§āϰ āĻŽāĻžāϧā§āϝāĻŽā§ āϏā§āĻā§āϝāĻžāύā§āĻĄāĻžāϰā§āĻĄ āĻāĻŦāĻ āĻāĻžāϏā§āĻāĻŽ āĻŦāĻŋāĻā§āĻāĻĒā§āϤāĻŋāĨ¤
- - āĻŦā§āϰāĻžāĻā§āĻāĻŋāĻ āĻāĻžāϏā§āĻ, āĻŽā§āϝāĻžāĻā§āϰ⧠āĻāĻŦāĻ āĻāĻā§āϏāĻāĻŽāĨ¤
āĻāĻŦāĻ āύāĻŋāĻŦāύā§āϧ⧠āĻŦā§āϝāĻŦāĻšā§āϤ āϞāĻŋāĻā§āĻāĻā§āϞāĻŋ:
- - āĻā§āĻŽāĻĒā§āϞā§āĻ āĻŦā§āϝāĻŦāĻšāĻžāϰā§āϰ āĻāύā§āϝ āϏā§āĻĨāĻžāύāϧāĻžāϰāĻ āĻāĻĒāϞāĻŦā§āϧāĨ¤
- - āĻĄā§āϝāĻžāĻ āϤā§āϰāĻŋ āĻāϰāĻžāϰ āϏāĻŽāϝāĻŧ āϏāĻžāϧāĻžāϰāĻŖ āĻā§āϞāĨ¤
- -
docker-composeāĻĒāϰā§āĻā§āώāĻž, āĻĄāĻŋāĻŦāĻžāĻāĻŋāĻ āĻāĻŦāĻ āĻāϰāĻ āĻ āύā§āĻ āĻāĻŋāĻā§āϰ āĻāύā§āϝāĨ¤ - â āĻā§āϞāĻŋāĻā§āϰāĻžāĻŽ REST API-āĻāϰ āĻāύā§āϝ āĻĒāĻžāĻāĻĨāύ āϰâā§āϝāĻžāĻĒāĻžāϰāĨ¤
āĻāϤā§āϏ: www.habr.com




