Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рд╣рд╛рдп, рдореА рджрд┐рдорд┐рддреНрд░реА рд▓реЙрдЧрд╡рд┐рдиреЗрдВрдХреЛ рдЖрд╣реЗ - рд╡реНрд╣реЗрдЭреЗрдЯ рдЧреНрд░реБрдк рдСрдл рдХрдВрдкрдиреНрдпрд╛рдВрдЪреНрдпрд╛ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рд╡рд┐рднрд╛рдЧрд╛рдЪрд╛ рдбреЗрдЯрд╛ рдЕрднрд┐рдпрдВрддрд╛.

рдореА рддреБрдореНрд╣рд╛рд▓рд╛ ETL рдкреНрд░рдХреНрд░рд┐рдпрд╛ рд╡рд┐рдХрд╕рд┐рдд рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдПрдХ рдЕрджреНрднреБрдд рд╕рд╛рдзрди рд╕рд╛рдВрдЧреЗрди - Apache Airflow. рдкрд░рдВрддреБ рдПрдЕрд░рдлреНрд▓реЛ рдЗрддрдХрд╛ рдЕрд╖реНрдЯрдкреИрд▓реВ рдЖрдгрд┐ рдмрд╣реБрдЖрдпрд╛рдореА рдЖрд╣реЗ рдХреА рддреБрдореНрд╣реА рдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣рд╛рдд рдЧреБрдВрддрд▓реЗрд▓реЗ рдирд╕рд▓реЗ рддрд░реАрд╣реА рддреБрдореНрд╣реА рддреНрдпрд╛рд╡рд░ рдмрд╛рд░рдХрд╛рдИрдиреЗ рд▓рдХреНрд╖ рджрд┐рд▓реЗ рдкрд╛рд╣рд┐рдЬреЗ, рдкрд░рдВрддреБ рд╡реЗрд│реЛрд╡реЗрд│реА рдХреЛрдгрддреАрд╣реА рдкреНрд░рдХреНрд░рд┐рдпрд╛ рд╕реБрд░реВ рдХрд░рдгреЗ рдЖрдгрд┐ рддреНрдпрд╛рдВрдЪреНрдпрд╛ рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреАрдЪреЗ рдирд┐рд░реАрдХреНрд╖рдг рдХрд░рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.

рдЖрдгрд┐ рд╣реЛрдп, рдореА рдлрдХреНрдд рд╕рд╛рдВрдЧрдгрд╛рд░ рдирд╛рд╣реА, рддрд░ рджрд░реНрд╢рд╡рд┐рддреЛ: рдкреНрд░реЛрдЧреНрд░рд╛рдордордзреНрдпреЗ рдмрд░реЗрдЪ рдХреЛрдб, рд╕реНрдХреНрд░реАрдирд╢реЙрдЯ рдЖрдгрд┐ рд╢рд┐рдлрд╛рд░рд╕реА рдЖрд╣реЗрдд.

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ
рдПрдЕрд░рдлреНрд▓реЛ / рд╡рд┐рдХрд┐рдореАрдбрд┐рдпрд╛ рдХреЙрдордиреНрд╕ рд╣рд╛ рд╢рдмреНрдж рдЧреБрдЧрд▓ рдХрд░рддрд╛рдирд╛ рддреБрдореНрд╣реА рд╕рд╣рд╕рд╛ рдХрд╛рдп рдкрд╛рд╣рддрд╛

рд╕рд╛рдордЧреНрд░реА рд╕рд╛рд░рдгреА

рдкрд░рд┐рдЪрдп

рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛ рдЬреЕрдВрдЧреЛрдкреНрд░рдорд╛рдгреЗрдЪ рдЖрд╣реЗ:

  • рдЕрдЬрдЧрд░рд╛рдд рд▓рд┐рд╣рд┐рд▓реЗрд▓реЗ
  • рдПрдХ рдЙрддреНрддрдо рдЕреЕрдбрдорд┐рди рдкреЕрдирд▓ рдЖрд╣реЗ,
  • рдЕрдирд┐рд╢реНрдЪрд┐рдд рдХрд╛рд│рд╛рд╕рд╛рдареА рд╡рд┐рд╕реНрддрд╛рд░рдгреНрдпрд╛рдпреЛрдЧреНрдп

- рдлрдХреНрдд рдЪрд╛рдВрдЧрд▓реЗ, рдЖрдгрд┐ рддреЗ рдкреВрд░реНрдгрдкрдгреЗ рднрд┐рдиреНрди рд╣реЗрддреВрдВрд╕рд╛рдареА рдмрдирд╡рд▓реЗ рдЧреЗрд▓реЗ рд╣реЛрддреЗ, рдореНрд╣рдгрдЬреЗ (рдЬрд╕реЗ рддреЗ рдХреЕрдЯрдЪреНрдпрд╛ рдЖрдзреА рд▓рд┐рд╣рд┐рд▓реЗрд▓реЗ рдЖрд╣реЗ):

  • рдЕрдорд░реНрдпрд╛рджрд┐рдд рдорд╢реАрдирд╡рд░ рдХрд╛рд░реНрдпреЗ рдЪрд╛рд▓рд╡рдгреЗ рдЖрдгрд┐ рджреЗрдЦрд░реЗрдЦ рдХрд░рдгреЗ (рдмрд░реЗрдЪ рд╕реЗрд▓реЗрд░реА / рдХреБрдмрд░реНрдиреЗрдЯреНрд╕ рдЖрдгрд┐ рддреБрдордЪрд╛ рд╡рд┐рд╡реЗрдХ рддреБрдореНрд╣рд╛рд▓рд╛ рдЕрдиреБрдорддреА рджреЗрдИрд▓)
  • рдбрд╛рдпрдиреЕрдорд┐рдХ рд╡рд░реНрдХрдлреНрд▓реЛ рдирд┐рд░реНрдорд┐рддреАрд╕рд╣ рдкрд╛рдпрдерди рдХреЛрдб рд▓рд┐рд╣рд┐рдгреНрдпрд╛рд╕ рдЖрдгрд┐ рд╕рдордЬрдгреНрдпрд╛рд╕ рдЕрдЧрджреА рд╕реЛрдкреЗ рдЖрд╣реЗ
  • рдЖрдгрд┐ рддрдпрд╛рд░ рдХреЗрд▓реЗрд▓реЗ рдШрдЯрдХ рдЖрдгрд┐ рд╣реЛрдордореЗрдб рдкреНрд▓рдЧрдЗрди (рдЬреЗ рдЕрддреНрдпрдВрдд рд╕реЛрдкреЗ рдЖрд╣реЗ) рд╡рд╛рдкрд░реВрди рдХреЛрдгрддрд╛рд╣реА рдбреЗрдЯрд╛рдмреЗрд╕ рдЖрдгрд┐ API рдПрдХрдореЗрдХрд╛рдВрд╢реА рдХрдиреЗрдХреНрдЯ рдХрд░рдгреНрдпрд╛рдЪреА рдХреНрд╖рдорддрд╛.

рдЖрдореНрд╣реА Apache Airflow рдпрд╛рдкреНрд░рдорд╛рдгреЗ рд╡рд╛рдкрд░рддреЛ:

  • рдЖрдореНрд╣реА рдбреАрдбрдмреНрд▓реНрдпреВрдПрдЪ рдЖрдгрд┐ рдУрдбреАрдПрд╕ (рдЖрдордЪреНрдпрд╛рдХрдбреЗ рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛ рдЖрдгрд┐ рдХреНрд▓рд┐рдХрд╣рд╛рдКрд╕ рдЖрд╣реЗрдд) рдордзреАрд▓ рд╡рд┐рд╡рд┐рдз рд╕реНрддреНрд░реЛрддрд╛рдВрдХрдбреВрди (рдЕрдиреЗрдХ SQL рд╕рд░реНрд╡реНрд╣рд░ рдЖрдгрд┐ рдкреЛрд╕реНрдЯрдЧреНрд░реЗрдПрд╕рдХреНрдпреВрдПрд▓ рдЙрджрд╛рд╣рд░рдгреЗ, рдЕрдиреБрдкреНрд░рдпреЛрдЧ рдореЗрдЯреНрд░рд┐рдХрд╕рд╣ рд╡рд┐рд╡рд┐рдз API, рдЕрдЧрджреА 1C) рдбреЗрдЯрд╛ рдЧреЛрд│рд╛ рдХрд░рддреЛ.
  • рдХрд┐рддреА рдкреНрд░рдЧрдд cron, рдЬреЗ ODS рд╡рд░ рдбреЗрдЯрд╛ рдПрдХрддреНрд░реАрдХрд░рдг рдкреНрд░рдХреНрд░рд┐рдпрд╛ рд╕реБрд░реВ рдХрд░рддреЗ рдЖрдгрд┐ рддреНрдпрд╛рдВрдЪреНрдпрд╛ рджреЗрдЦрднрд╛рд▓реАрд╡рд░ рджреЗрдЦреАрд▓ рд▓рдХреНрд╖ рдареЗрд╡рддреЗ.

рдЕрд▓реАрдХрдбреЗ рдкрд░реНрдпрдВрдд, рдЖрдордЪреНрдпрд╛ рдЧрд░рдЬрд╛ 32 рдХреЛрд░ рдЖрдгрд┐ 50 GB RAM рдЕрд╕рд▓реЗрд▓реНрдпрд╛ рдПрдХрд╛ рдЫреЛрдЯреНрдпрд╛ рд╕рд░реНрд╡реНрд╣рд░рджреНрд╡рд╛рд░реЗ рдкреВрд░реНрдг рдХреЗрд▓реНрдпрд╛ рдЬрд╛рдд рд╣реЛрддреНрдпрд╛. рдПрдЕрд░рдлреНрд▓реЛрдордзреНрдпреЗ, рд╣реЗ рдХрд╛рд░реНрдп рдХрд░рддреЗ:

  • рдЕрдзрд┐рдХ 200 рдбреЕрдЧреНрдЬ (рд╡рд╛рд╕реНрддрд╡рд┐рдХ рдХрд╛рд░реНрдпрдкреНрд░рд╡рд╛рд╣, рдЬреНрдпрд╛рдордзреНрдпреЗ рдЖрдореНрд╣реА рдХрд╛рд░реНрдпреЗ рднрд░рд▓реА рдЖрд╣реЗрдд)
  • рдкреНрд░рддреНрдпреЗрдХрд╛рдордзреНрдпреЗ рд╕рд░рд╛рд╕рд░реА 70 рдХрд╛рд░реНрдпреЗ,
  • рд╣рд╛ рдЪрд╛рдВрдЧреБрд▓рдкрдгрд╛ рд╕реБрд░реВ рд╣реЛрддреЛ (рд╕рд░рд╛рд╕рд░реА рджреЗрдЦреАрд▓) рддрд╛рд╕рд╛рддреВрди рдПрдХрджрд╛.

рдЖрдгрд┐ рдЖрдореНрд╣реА рдХрд╕реЗ рд╡рд┐рд╕реНрддрд╛рд░рд┐рдд рдХреЗрд▓реЗ рдпрд╛рдмрджреНрджрд▓, рдореА рдЦрд╛рд▓реА рд▓рд┐рд╣реАрди, рдкрд░рдВрддреБ рдЖрддрд╛ рдЖрдкрдг ├╝ber-рд╕рдорд╕реНрдпрд╛ рдкрд░рд┐рднрд╛рд╖рд┐рдд рдХрд░реВрдпрд╛ рдЬреА рдЖрдкрдг рд╕реЛрдбрд╡реВ:

рддреАрди рдореВрд│ SQL рд╕рд░реНрд╡реНрд╣рд░ рдЖрд╣реЗрдд, рдкреНрд░рддреНрдпреЗрдХрд╛рдордзреНрдпреЗ 50 рдбреЗрдЯрд╛рдмреЗрд╕реЗрд╕ рдЖрд╣реЗрдд - рдЕрдиреБрдХреНрд░рдореЗ рдПрдХрд╛ рдкреНрд░рдХрд▓реНрдкрд╛рдЪреА рдЙрджрд╛рд╣рд░рдгреЗ, рддреНрдпрд╛рдВрдЪреА рд░рдЪрдирд╛ рд╕рдорд╛рди рдЖрд╣реЗ (рдЬрд╡рд│рдЬрд╡рд│ рд╕рд░реНрд╡рддреНрд░, mua-ha-ha), рдореНрд╣рдгрдЬреЗ рдкреНрд░рддреНрдпреЗрдХрд╛рдХрдбреЗ рдСрд░реНрдбрд░ рдЯреЗрдмрд▓ рдЖрд╣реЗ (рд╕реБрджреИрд╡рд╛рдиреЗ, рддреНрдпрд╛рд╕рд╣ рдПрдХ рдЯреЗрдмрд▓ рдирд╛рд╡ рдХреЛрдгрддреНрдпрд╛рд╣реА рд╡реНрдпрд╡рд╕рд╛рдпрд╛рдд рдврдХрд▓рд▓реЗ рдЬрд╛рдК рд╢рдХрддреЗ). рдЖрдореНрд╣реА рд╕рд░реНрд╡реНрд╣рд┐рд╕ рдлреАрд▓реНрдб (рд╕реНрд░реЛрдд рд╕рд░реНрд╡реНрд╣рд░, рд╕реЛрд░реНрд╕ рдбреЗрдЯрд╛рдмреЗрд╕, рдИрдЯреАрдПрд▓ рдЯрд╛рд╕реНрдХ рдЖрдпрдбреА) рдЬреЛрдбреВрди рдбреЗрдЯрд╛ рдШреЗрддреЛ рдЖрдгрд┐ рдЕрдЧрджреА рд╕рд╣рдЬрддреЗрдиреЗ рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛рдордзреНрдпреЗ рдЯрд╛рдХрддреЛ.

рдЪрд▓рд╛ рдЬрд╛рдКрдпрд╛!

рдореБрдЦреНрдп рднрд╛рдЧ, рд╡реНрдпрд╛рд╡рд╣рд╛рд░рд┐рдХ (рдЖрдгрд┐ рдереЛрдбрд╛ рд╕реИрджреНрдзрд╛рдВрддрд┐рдХ)

рдЖрдореНрд╣реА (рдЖрдгрд┐ рддреБрдореНрд╣реА) рдХрд╛?

рдЬреЗрд╡реНрд╣рд╛ рдЭрд╛рдбрдВ рдореЛрдареА рд╣реЛрддреА рдЖрдгрд┐ рдореА рд╕рд╛рдзрд╛ рд╣реЛрддреЛ SQL-рдПрдХ рд░рд╢рд┐рдпрди рдХрд┐рд░рдХреЛрд│ рд╡рд┐рдХреНрд░реАрдордзреНрдпреЗ, рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛рдХрдбреЗ рдЙрдкрд▓рдмреНрдз рджреЛрди рд╕рд╛рдзрдирд╛рдВрдЪрд╛ рд╡рд╛рдкрд░ рдХрд░реВрди рдИрдЯреАрдПрд▓ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдЙрд░реНрдл тАЛтАЛрдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣рд╛рдЪрд╛ рдШреЛрдЯрд╛рд│рд╛ рдХреЗрд▓рд╛:

  • рдЗрдиреНрдлреЙрд░реНрдореЗрдЯрд┐рдХрд╛ рдкреЙрд╡рд░ рд╕реЗрдВрдЯрд░ - рдПрдХ рдЕрддреНрдпрдВрдд рдкрд╕рд░рдгрд╛рд░реА рдкреНрд░рдгрд╛рд▓реА, рдЕрддреНрдпрдВрдд рдЙрддреНрдкрд╛рджрдХ, рд╕реНрд╡рддрдГрдЪреНрдпрд╛ рд╣рд╛рд░реНрдбрд╡реЗрдЕрд░рд╕рд╣, рд╕реНрд╡рддрдГрдЪреА рдЖрд╡реГрддреНрддреА. рдореА рддреНрдпрд╛рдЪреНрдпрд╛ рдХреНрд╖рдорддреЗрдкреИрдХреА 1% рдЧреЙрдб рдлреЙрд░рдмрд┐рдб рд╡рд╛рдкрд░рд▓рд╛. рдХрд╛? рдмрд░рдВ, рд╕рд░реНрд╡ рдкреНрд░рдердо, 380 рдЪреНрдпрд╛ рджрд╢рдХрд╛рдкрд╛рд╕реВрди рдпрд╛ рдЗрдВрдЯрд░рдлреЗрд╕рдиреЗ рдЖрдкрд▓реНрдпрд╛рд╡рд░ рдорд╛рдирд╕рд┐рдХ рджрдмрд╛рд╡ рдЖрдгрд▓рд╛. рджреБрд╕рд░реЗ рдореНрд╣рдгрдЬреЗ, рд╣реЗ рдХреЙрдиреНрдЯреНрд░реЕрдкреНрд╢рди рдЕрддреНрдпрдВрдд рдлреЕрдиреНрд╕реА рдкреНрд░рдХреНрд░рд┐рдпрд╛рдВрд╕рд╛рдареА рдбрд┐рдЭрд╛рдЗрди рдХреЗрд▓реЗрд▓реЗ рдЖрд╣реЗ, рддреАрд╡реНрд░ рдШрдЯрдХрд╛рдВрдЪрд╛ рдкреБрдирд░реНрд╡рд╛рдкрд░ рдЖрдгрд┐ рдЗрддрд░ рдЕрддреНрдпрдВрдд-рдорд╣рддреНрддреНрд╡рд╛рдЪреНрдпрд╛-рдПрдВрдЯрд░рдкреНрд░рд╛рдЗрдЭ-рдпреБрдХреНрддреНрдпрд╛. рдПрдЕрд░рдмрд╕ AXNUMX / рд╡рд░реНрд╖рд╛рдЪреНрдпрд╛ рдкрдВрдЦрд╛рдкреНрд░рдорд╛рдгреЗ рддреНрдпрд╛рдЪреА рдХрд┐рдВрдордд рдХрд┐рддреА рдЖрд╣реЗ рдпрд╛рдмрджреНрджрд▓ рдЖрдореНрд╣реА рдХрд╛рд╣реАрд╣реА рдмреЛрд▓рдгрд╛рд░ рдирд╛рд╣реА.

    рд╕рд╛рд╡рдз рд░рд╣рд╛, рд╕реНрдХреНрд░реАрдирд╢реЙрдЯ 30 рд╡рд░реНрд╖рд╛рдВрдкреЗрдХреНрд╖рд╛ рдХрдореА рд╡рдпрд╛рдЪреНрдпрд╛ рд▓реЛрдХрд╛рдВрдирд╛ рддреНрд░рд╛рд╕ рджреЗрдК рд╢рдХрддреЛ

    Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

  • SQL рд╕рд░реНрд╡реНрд╣рд░ рдПрдХрддреНрд░реАрдХрд░рдг рд╕рд░реНрд╡реНрд╣рд░ - рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛ рдЗрдВрдЯреНрд░рд╛-рдкреНрд░реЛрдЬреЗрдХреНрдЯ рдлреНрд▓реЛрдордзреНрдпреЗ рдпрд╛ рдХреЙрдорд░реЗрдбрдЪрд╛ рд╡рд╛рдкрд░ рдХреЗрд▓рд╛. рдмрд░рдВ, рдЦрд░рдВ рддрд░: рдЖрдореНрд╣реА рдЖрдзреАрдЪ рдПрд╕рдХреНрдпреВрдПрд▓ рд╕рд░реНрд╡реНрд╣рд░ рд╡рд╛рдкрд░рддреЛ рдЖрдгрд┐ рддреНрдпрд╛рдЪреА рдИрдЯреАрдПрд▓ рд╕рд╛рдзрдиреЗ рди рд╡рд╛рдкрд░рдгреЗ рдХрд╛рд╣реАрд╕реЗ рдЕрд╡рд╛рд╕реНрддрд╡ рдард░реЗрд▓. рддреНрдпрд╛рдд рд╕рд░реНрд╡ рдХрд╛рд╣реА рдЪрд╛рдВрдЧрд▓реЗ рдЖрд╣реЗ: рдЗрдВрдЯрд░рдлреЗрд╕ рд╕реБрдВрджрд░ рдЖрд╣реЗ рдЖрдгрд┐ рдкреНрд░рдЧрддреА рдЕрд╣рд╡рд╛рд▓ ... рдкрд░рдВрддреБ рдпрд╛рдореБрд│реЗ рдЖрдореНрд╣рд╛рд▓рд╛ рд╕реЙрдлреНрдЯрд╡реЗрдЕрд░ рдЙрддреНрдкрд╛рджрдиреЗ рдЖрд╡рдбрдд рдирд╛рд╣реАрдд, рдЕрд░реЗрд░реЗ, рдпрд╛рд╕рд╛рдареА рдирд╛рд╣реА. рддреНрдпрд╛рдЪреА рдЖрд╡реГрддреНрддреА dtsx (рдЬреЗ рдПрдХреНрд╕рдПрдордПрд▓ рдЖрд╣реЗ рдЬреНрдпрд╛рдордзреНрдпреЗ рд╕реЗрд╡реНрд╣рд╡рд░ рдиреЛрдбреНрд╕ рдмрджрд▓рд▓реЗ рдЖрд╣реЗрдд) рдЖрдореНрд╣реА рдХрд░реВ рд╢рдХрддреЛ, рдкрдг рдореБрджреНрджрд╛ рдХрд╛рдп рдЖрд╣реЗ? рд╢реЗрдХрдбреЛ рдЯреЗрдмрд▓реНрд╕ рдПрдХрд╛ рд╕рд░реНрд╡реНрд╣рд░рд╡рд░реВрди рджреБрд╕рд▒реНрдпрд╛ рд╕рд░реНрд╡реНрд╣рд░рд╡рд░ рдбреНрд░реЕрдЧ рдХрд░рдгрд╛рд░рдВ рдЯрд╛рд╕реНрдХ рдкреЕрдХреЗрдЬ рдмрдирд╡рдгреНрдпрд╛рдмрджреНрджрд▓ рдХрд╛рдп? рд╣реЛрдп, рд╢рдВрднрд░ рдХрд╛рдп, рдорд╛рдКрд╕рдЪреНрдпрд╛ рдмрдЯрдгрд╛рд╡рд░ рдХреНрд▓рд┐рдХ рдХрд░реВрди рд╡реАрд╕ рддреБрдХрдбреНрдпрд╛рдВрдордзреВрди рддреБрдордЪреА рддрд░реНрдЬрдиреА рдЦрд╛рд▓реА рдкрдбреЗрд▓. рдкрд░рдВрддреБ рд╣реЗ рдирд┐рд╢реНрдЪрд┐рддрдкрдгреЗ рдЕрдзрд┐рдХ рдлреЕрд╢рдиреЗрдмрд▓ рджрд┐рд╕рддреЗ:

    Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдЖрдореНрд╣реА рдирдХреНрдХреАрдЪ рдорд╛рд░реНрдЧ рд╢реЛрдзрдд рд╣реЛрддреЛ. рдХреЗрд╕ рдЕрдЧрджреА рдЬрд╡рд│рдкрд╛рд╕ рд╕реНрд╡-рд▓рд┐рдЦрд┐рдд SSIS рдкреЕрдХреЗрдЬ рдЬрдирд░реЗрдЯрд░рд╡рд░ рдЖрд▓реЗ ...

тАжрдЖрдгрд┐ рдордЧ рдорд▓рд╛ рдирд╡реАрди рдиреЛрдХрд░реА рдорд┐рд│рд╛рд▓реА. рдЖрдгрд┐ рддреНрдпрд╛рд╡рд░ рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛрдиреЗ рдорд▓рд╛ рдорд╛рдЧреЗ рдЯрд╛рдХрд▓реЗ.

рдЬреЗрд╡реНрд╣рд╛ рдорд▓рд╛ рдХрд│рд▓реЗ рдХреА ETL рдкреНрд░рдХреНрд░рд┐рдпреЗрдЪреЗ рд╡рд░реНрдгрди рд╕реЛрдкреЗ рдкрд╛рдпрдерди рдХреЛрдб рдЖрд╣реЗрдд, рддреЗрд╡реНрд╣рд╛ рдореА рдЖрдирдВрджрд╛рдиреЗ рдирд╛рдЪрд▓реЛ рдирд╛рд╣реА. рдЕрд╢рд╛рдкреНрд░рдХрд╛рд░реЗ рдбреЗрдЯрд╛ рд╕реНрдЯреНрд░реАрдореНрд╕рдЪреЗ рд╡реНрд╣рд░реНрдЬрди рдЖрдгрд┐ рдлрд░рдХ рдХреЗрд▓реЗ рдЧреЗрд▓реЗ рдЖрдгрд┐ рд╢реЗрдХрдбреЛ рдбреЗрдЯрд╛рдмреЗрд╕реЗрд╕рдордзреВрди рдПрдХрд╛рдЪ рд╕реНрдЯреНрд░рдХреНрдЪрд░рд╕рд╣ рдЯреЗрдмрд▓реНрд╕ рдПрдХрд╛ рдЯрд╛рд░реНрдЧреЗрдЯрдордзреНрдпреЗ рдЯрд╛рдХрдгреЗ рд╣реА рджреАрдб рдХрд┐рдВрд╡рд╛ рджреЛрди 13тАЭ рд╕реНрдХреНрд░реАрдирдордзреНрдпреЗ рдкрд╛рдпрдерди рдХреЛрдбрдЪреА рдмрд╛рдм рдмрдирд▓реА.

рдХреНрд▓рд╕реНрдЯрд░ рдПрдХрддреНрд░ рдХрд░рдгреЗ

рдЪрд▓рд╛ рдкреВрд░реНрдгрдкрдгреЗ рдмрд╛рд▓рд╡рд╛рдбреАрдЪреА рд╡реНрдпрд╡рд╕реНрдерд╛ рдХрд░реВ рдирдХрд╛ рдЖрдгрд┐ рдпреЗрдереЗ рдкреВрд░реНрдгрдкрдгреЗ рд╕реНрдкрд╖реНрдЯ рдЧреЛрд╖реНрдЯреАрдВрдмрджреНрджрд▓ рдмреЛрд▓реВ рдирдХрд╛, рдЬрд╕реЗ рдХреА рдПрдЕрд░рдлреНрд▓реЛ рд╕реНрдерд╛рдкрд┐рдд рдХрд░рдгреЗ, рддреБрдордЪрд╛ рдирд┐рд╡рдбрд▓реЗрд▓рд╛ рдбреЗрдЯрд╛рдмреЗрд╕, рд╕реЗрд▓реЗрд░реА рдЖрдгрд┐ рдбреЙрдХреНрд╕рдордзреНрдпреЗ рд╡рд░реНрдгрди рдХреЗрд▓реЗрд▓реНрдпрд╛ рдЗрддрд░ рдХреЗрд╕реЗрд╕.

рдЬреЗрдгреЗрдХрд░реВрди рдЖрдореНрд╣реА рддрд╛рдмрдбрддреЛрдм рдкреНрд░рдпреЛрдЧ рд╕реБрд░реВ рдХрд░реВ рд╢рдХреВ, рдореА рд░реЗрдЦрд╛рдЯрди рдХреЗрд▓реЗ docker-compose.yml рдЬреНрдпрд╛рдордзреНрдпреЗ:

  • рдЪрд▓рд╛ рдкреНрд░рддреНрдпрдХреНрд╖рд╛рдд рд╡рд╛рдврд╡реВ рд╡рд╛рдпреБрдкреНрд░рд╡рд╛рд╣: рд╢реЗрдбреНрдпреБрд▓рд░, рд╡реЗрдмрд╕рд░реНрд╡реНрд╣рд░. рд╕реЗрд▓рд░реАрдЪреНрдпрд╛ рдХрд╛рдорд╛рдВрдЪреЗ рдирд┐рд░реАрдХреНрд╖рдг рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдлреНрд▓реЙрд╡рд░ рджреЗрдЦреАрд▓ рддреЗрдереЗ рдлрд┐рд░рдд рдЕрд╕реЗрд▓ (рдХрд╛рд░рдг рддреЗ рдЖрдзреАрдЪ рдврдХрд▓рд▓реЗ рдЧреЗрд▓реЗ рдЖрд╣реЗ apache/airflow:1.10.10-python3.7рдкрдг рдЖрдордЪреА рд╣рд░рдХрдд рдирд╛рд╣реА)
  • рдкреЛрд╕реНрдЯрдЧреНрд░реЗ рдПрд╕рдХреНрдпреВрдПрд▓, рдЬреНрдпрд╛рдордзреНрдпреЗ рдПрдЕрд░рдлреНрд▓реЛ рддреНрдпрд╛рдЪреА рд╕реЗрд╡рд╛ рдорд╛рд╣рд┐рддреА (рд╢реЗрдбреНрдпреВрд▓рд░ рдбреЗрдЯрд╛, рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреА рдЖрдХрдбреЗрд╡рд╛рд░реА рдЗ.) рд▓рд┐рд╣реАрд▓ рдЖрдгрд┐ рд╕реЗрд▓реЗрд░реА рдкреВрд░реНрдг рдЭрд╛рд▓реЗрд▓реНрдпрд╛ рдХрд╛рд░реНрдпрд╛рдВрдирд╛ рдЪрд┐рдиреНрд╣рд╛рдВрдХрд┐рдд рдХрд░реЗрд▓;
  • Redis, рдЬреЗ рд╕реЗрд▓реЗрд░реАрд╕рд╛рдареА рдЯрд╛рд╕реНрдХ рдмреНрд░реЛрдХрд░ рдореНрд╣рдгреВрди рдХрд╛рдо рдХрд░реЗрд▓;
  • рд╕реЗрд▓реЗрд░реА рдХрд╛рдордЧрд╛рд░, рдЬреЗ рдХрд╛рд░реНрдпрд╛рдВрдЪреНрдпрд╛ рдереЗрдЯ рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреАрдордзреНрдпреЗ рдЧреБрдВрддрд▓реЗрд▓реЗ рдЕрд╕реЗрд▓.
  • рдлреЛрд▓реНрдбрд░ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА ./dags рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛ рдлрд╛рдпрд▓реА рдбреЕрдЧреНрдЬрдЪреНрдпрд╛ рд╡рд░реНрдгрдирд╛рд╕рд╣ рдЬреЛрдбреВ. рддреЗ рдорд╛рд╢реАрд╡рд░ рдЙрдЪрд▓рд▓реЗ рдЬрд╛рддреАрд▓, рдореНрд╣рдгреВрди рдкреНрд░рддреНрдпреЗрдХ рд╢рд┐рдВрдХрд▓реНрдпрд╛рдирдВрддрд░ рд╕рдВрдкреВрд░реНрдг рд╕реНрдЯреЕрдХрд▓рд╛ рд╣рд╛рдд рд▓рд╛рд╡рдгреНрдпрд╛рдЪреА рдЧрд░рдЬ рдирд╛рд╣реА.

рдХрд╛рд╣реА рдард┐рдХрд╛рдгреА, рдЙрджрд╛рд╣рд░рдгрд╛рдВрдордзреАрд▓ рдХреЛрдб рдкреВрд░реНрдгрдкрдгреЗ рджрд░реНрд╢рд╡рд┐рд▓рд╛ рдЬрд╛рдд рдирд╛рд╣реА (рдЬреЗрдгреЗрдХрд░реБрди рдордЬрдХреВрд░ рдЧреЛрдВрдзрд│реВрди рдЬрд╛рдК рдирдпреЗ), рдкрд░рдВрддреБ рдХреБрдареЗрддрд░реА рдкреНрд░рдХреНрд░рд┐рдпреЗрдд рддреЛ рд╕реБрдзрд╛рд░рд┐рдд рдХреЗрд▓рд╛ рдЬрд╛рддреЛ. рд╕рдВрдкреВрд░реНрдг рдХрд╛рд░реНрдп рдХреЛрдб рдЙрджрд╛рд╣рд░рдгреЗ рд░реЗрдкреЙрдЬрд┐рдЯрд░реАрдордзреНрдпреЗ рдЖрдврд│реВ рд╢рдХрддрд╛рдд https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

рдЯрд┐рдкрд╛:

  • рд░рдЪрдиреЗрдЪреНрдпрд╛ рдЕрд╕реЗрдВрдмреНрд▓реАрдордзреНрдпреЗ, рдореА рдореЛрдареНрдпрд╛ рдкреНрд░рдорд╛рдгрд╛рд╡рд░ рд╕реБрдкреНрд░рд╕рд┐рджреНрдз рдкреНрд░рддрд┐рдореЗрд╡рд░ рдЕрд╡рд▓рдВрдмреВрди рд╣реЛрддреЛ рдкреБрдХрд▓/рдбреЙрдХрд░-рдПрдЕрд░рдлреНрд▓реЛ - рд╣реЗ рдирдХреНрдХреА рдкрд╣рд╛. рдХрджрд╛рдЪрд┐рдд рддреБрдореНрд╣рд╛рд▓рд╛ рддреБрдордЪреНрдпрд╛ рдЖрдпреБрд╖реНрдпрд╛рдд рдЗрддрд░ рдХрд╢рд╛рдЪреАрд╣реА рдЧрд░рдЬ рдирд╕реЗрд▓.
  • рд╕рд░реНрд╡ рдПрдЕрд░рдлреНрд▓реЛ рд╕реЗрдЯрд┐рдВрдЧреНрдЬ рдХреЗрд╡рд│ рджреНрд╡рд╛рд░реЗ рдЙрдкрд▓рдмреНрдз рдирд╛рд╣реАрдд airflow.cfg, рдкрд░рдВрддреБ рдкрд░реНрдпрд╛рд╡рд░рдгреАрдп рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕рджреНрд╡рд╛рд░реЗ рджреЗрдЦреАрд▓ (рд╡рд┐рдХрд╛рд╕рдХрд╛рдВрдирд╛ рдзрдиреНрдпрд╡рд╛рдж), рдЬреНрдпрд╛рдЪрд╛ рдореА рджреБрд░реНрднрд╛рд╡рдирд╛рдкреВрд░реНрдгрдкрдгреЗ рдлрд╛рдпрджрд╛ рдШреЗрддрд▓рд╛.
  • рд╕реНрд╡рд╛рднрд╛рд╡рд┐рдХрдЪ, рддреЗ рдЙрддреНрдкрд╛рджрдирд╛рд╕рд╛рдареА рддрдпрд╛рд░ рдирд╛рд╣реА: рдореА рдЬрд╛рдгреАрд╡рдкреВрд░реНрд╡рдХ рдХрдВрдЯреЗрдирд░рд╡рд░ рд╣реГрджрдпрд╛рдЪреЗ рдареЛрдХреЗ рдареЗрд╡рд▓реЗ рдирд╛рд╣реАрдд, рдорд▓рд╛ рд╕реБрд░рдХреНрд╖рд┐рддрддреЗрдЪрд╛ рддреНрд░рд╛рд╕ рдЭрд╛рд▓рд╛ рдирд╛рд╣реА. рдкрдг рдореА рдЖрдордЪреНрдпрд╛ рдкреНрд░рдпреЛрдЧрдХрд░реНрддреНрдпрд╛рдВрд╕рд╛рдареА рдХрд┐рдорд╛рди рдпреЛрдЧреНрдп рддреЗ рдХреЗрд▓реЗ.
  • рд▓рдХреНрд╖рд╛рдд рдареЗрд╡рд╛ рдХреА:
    • рдбреЕрдЧ рдлреЛрд▓реНрдбрд░ рд╢реЗрдбреНрдпреВрд▓рд░ рдЖрдгрд┐ рдХрд╛рдордЧрд╛рд░ рджреЛрдШрд╛рдВрдирд╛рд╣реА рдкреНрд░рд╡реЗрд╢рдпреЛрдЧреНрдп рдЕрд╕рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.
    • рд╣реЗ рд╕рд░реНрд╡ рддреГрддреАрдп-рдкрдХреНрд╖ рд▓рд╛рдпрдмреНрд░рд░реАрдВрдирд╛ рд▓рд╛рдЧреВ рд╣реЛрддреЗ - рддреЗ рд╕рд░реНрд╡ рд╢реЗрдбреНрдпреВрд▓рд░ рдЖрдгрд┐ рдХрд╛рдордЧрд╛рд░рд╛рдВрд╕рд╣ рдорд╢реАрдирд╡рд░ рд╕реНрдерд╛рдкрд┐рдд рдХреЗрд▓реЗ рдЬрд╛рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.

рдмрд░рдВ, рдЖрддрд╛ рд╣реЗ рд╕реЛрдкреЗ рдЖрд╣реЗ:

$ docker-compose up --scale worker=3

рд╕рд░реНрд╡рдХрд╛рд╣реА рд╡рд╛рдврд▓реНрдпрд╛рдирдВрддрд░, рдЖрдкрдг рд╡реЗрдм рдЗрдВрдЯрд░рдлреЗрд╕ рдкрд╛рд╣реВ рд╢рдХрддрд╛:

рдореВрд▓рднреВрдд рд╕рдВрдХрд▓реНрдкрдирд╛

рдЬрд░ рддреБрдореНрд╣рд╛рд▓рд╛ рдпрд╛ рд╕рд░реНрд╡ "рдбреЕрдЧреНрдЬ" рдордзреАрд▓ рдХрд╛рд╣реАрд╣реА рд╕рдордЬрд▓реЗ рдирд╕реЗрд▓, рддрд░ рдпреЗрдереЗ рдПрдХ рд▓рд╣рд╛рди рд╢рдмреНрджрдХреЛрд╢ рдЖрд╣реЗ:

  • рд╢реЗрдбреНрдпреВрд▓рд░ - рдПрдЕрд░рдлреНрд▓реЛрдордзреАрд▓ рд╕рд░реНрд╡рд╛рдд рдорд╣рддреНрд╡рд╛рдЪреЗ рдХрд╛рдХрд╛, рдЬреЗ рд░реЛрдмреЛрдЯ рдХрдареЛрд░ рдкрд░рд┐рд╢реНрд░рдо рдХрд░рддрд╛рдд рдпрд╛рд╡рд░ рдирд┐рдпрдВрддреНрд░рдг рдареЗрд╡рддрд╛рдд, рд╡реНрдпрдХреНрддреА рдирд╛рд╣реА: рд╡реЗрд│рд╛рдкрддреНрд░рдХрд╛рдЪреЗ рдирд┐рд░реАрдХреНрд╖рдг рдХрд░рддрд╛рдд, рдбреЕрдЧреНрдЬ рдЕрдкрдбреЗрдЯ рдХрд░рддрд╛рдд, рдХрд╛рд░реНрдпреЗ рд▓рд╛рдБрдЪ рдХрд░рддрд╛рдд.

    рд╕рд░реНрд╡рд╕рд╛рдзрд╛рд░рдгрдкрдгреЗ, рдЬреБрдиреНрдпрд╛ рдЖрд╡реГрддреНрддреНрдпрд╛рдВрдордзреНрдпреЗ, рддреНрдпрд╛рд▓рд╛ рд╕реНрдорд░рдгрд╢рдХреНрддреАрдЪреА рд╕рдорд╕реНрдпрд╛ рд╣реЛрддреА (рдирд╛рд╣реА, рд╕реНрдореГрддрд┐рднреНрд░рдВрд╢ рдирд╛рд╣реА, рдкрд░рдВрддреБ рд▓реАрдХ) рдЖрдгрд┐ рд▓реЗрдЧрд╕реА рдкреЕрд░рд╛рдореАрдЯрд░ рдЕрдЧрджреА рдХреЙрдиреНрдлрд┐рдЧрдордзреНрдпреЗ рд░рд╛рд╣рд┐рд▓реЗ. run_duration - рддреНрдпрд╛рдЪреЗ рд░реАрд╕реНрдЯрд╛рд░реНрдЯ рдордзреНрдпрд╛рдВрддрд░. рдкрдг рдЖрддрд╛ рд╕рд░реНрд╡ рдХрд╛рд╣реА рдареАрдХ рдЖрд╣реЗ.

  • рджрд┐рд╡рд╕ (рдЙрд░реНрдл "рдбреЕрдЧ") - "рджрд┐рдЧреНрджрд░реНрд╢рд┐рдд рдЕреЕрд╕рд╛рдпрдХреНрд▓рд┐рдХ рдЖрд▓реЗрдЦ", рдкрд░рдВрддреБ рдЕрд╢реА рд╡реНрдпрд╛рдЦреНрдпрд╛ рдХрд╛рд╣реА рд▓реЛрдХрд╛рдВрдирд╛ рд╕рд╛рдВрдЧреЗрд▓, рдкрд░рдВрддреБ рдкреНрд░рддреНрдпрдХреНрд╖рд╛рдд рддреЗ рдПрдХрдореЗрдХрд╛рдВрд╢реА рд╕рдВрд╡рд╛рдж рд╕рд╛рдзрдгрд╛рд░реНтАНрдпрд╛ рдХрд╛рд░реНрдпрд╛рдВрд╕рд╛рдареА рдХрдВрдЯреЗрдирд░ рдЖрд╣реЗ (рдЦрд╛рд▓реА рдкрд╣рд╛) рдХрд┐рдВрд╡рд╛ SSIS рдордзреАрд▓ рдкреЕрдХреЗрдЬ рдЖрдгрд┐ рдЗрдиреНрдлреЙрд░реНрдореЗрдЯрд┐рдХрд╛ рдордзреАрд▓ рд╡рд░реНрдХрдлреНрд▓реЛрдЪреЗ рдЕреЕрдирд╛рд▓реЙрдЧ .

    рдбреЕрдЧреНрдЬ рд╡реНрдпрддрд┐рд░рд┐рдХреНрдд, рдЕрдЬреВрдирд╣реА рд╕рдмрдбреЕрдЧ рдЕрд╕реВ рд╢рдХрддрд╛рдд, рдкрд░рдВрддреБ рдмрд╣реБрдзрд╛ рдЖрдореНрд╣рд╛рд▓рд╛ рддреЗ рдорд┐рд│рдгрд╛рд░ рдирд╛рд╣реАрдд.

  • DAG рдзрд╛рд╡ - рдЖрд░рдВрдн рдХреЗрд▓реЗрд▓рд╛ рдбреЕрдЧ, рдЬреЛ рд╕реНрд╡рддрдГрдЪрд╛ рдирд┐рдпреБрдХреНрдд рдХреЗрд▓рд╛ рдЬрд╛рддреЛ execution_date. рд╕рдорд╛рди рдбреЗрдЧрдЪреЗ рдбреЕрдЧрди рд╕рдорд╛рдВрддрд░рдкрдгреЗ рдХрд╛рд░реНрдп рдХрд░реВ рд╢рдХрддрд╛рдд (рдЬрд░ рддреБрдореНрд╣реА рддреБрдордЪреА рдХрд╛рд░реНрдпреЗ рдирд┐рдГрд╕рдВрд╢рдпрдкрдгреЗ рдХреЗрд▓реА рдЕрд╕рддреАрд▓ рддрд░).
  • рдСрдкрд░реЗрдЯрд░ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдХреНрд░рд┐рдпрд╛ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЬрдмрд╛рдмрджрд╛рд░ рдЕрд╕рд▓реЗрд▓реНрдпрд╛ рдХреЛрдбрдЪреЗ рддреБрдХрдбреЗ рдЖрд╣реЗрдд. рддреАрди рдкреНрд░рдХрд╛рд░рдЪреЗ рдСрдкрд░реЗрдЯрд░ рдЖрд╣реЗрдд:
    • рдХрд╛рд░рд╡рд╛рдИрдЖрдордЪреНрдпрд╛ рдЖрд╡рдбрддреНрдпрд╛рдкреНрд░рдорд╛рдгреЗ PythonOperator, рдЬреЛ рдХреЛрдгрддрд╛рд╣реА (рд╡реИрдз) рдкрд╛рдпрдерди рдХреЛрдб рдХрд╛рд░реНрдпрд╛рдиреНрд╡рд┐рдд рдХрд░реВ рд╢рдХрддреЛ;
    • рд╣рд╕реНрддрд╛рдВрддрд░рдг, рдЬреЗ рдард┐рдХрд╛рдгрд╛рд╣реВрди рдбреЗрдЯрд╛ рд╡рд╛рд╣рддреВрдХ рдХрд░рддрд╛рдд, рдореНрд╣рдгрддрд╛рдд, MsSqlToHiveTransfer;
    • рд╕реЗрдиреНрд╕рд░ рджреБрд╕рд░реАрдХрдбреЗ, рдПрдЦрд╛рджреА рдШрдЯрдирд╛ рдШрдбреЗрдкрд░реНрдпрдВрдд рддреЗ рддреБрдореНрд╣рд╛рд▓рд╛ рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛ рджреЗрдгреНрдпрд╛рд╕ рдХрд┐рдВрд╡рд╛ рдбреЕрдЧрдЪреНрдпрд╛ рдкреБрдвреАрд▓ рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреАрдЪреА рдЧрддреА рдХрдореА рдХрд░рдгреНрдпрд╛рд╕ рдЕрдиреБрдорддреА рджреЗрдИрд▓. HttpSensor рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдПрдВрдбрдкреЙрдЗрдВрдЯ рдЦреЗрдЪреВ рд╢рдХрддреЛ, рдЖрдгрд┐ рдЬреЗрд╡реНрд╣рд╛ рдЗрдЪреНрдЫрд┐рдд рдкреНрд░рддрд┐рд╕рд╛рдж рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░рдд рдЕрд╕реЗрд▓ рддреЗрд╡реНрд╣рд╛ рд╣рд╕реНрддрд╛рдВрддрд░рдг рд╕реБрд░реВ рдХрд░рд╛ GoogleCloudStorageToS3Operator. рдПрдХ рдЬрд┐рдЬреНрдЮрд╛рд╕реВ рдорди рд╡рд┐рдЪрд╛рд░реЗрд▓: тАЬрдХрд╛? рд╢реЗрд╡рдЯреА, рддреБрдореНрд╣реА рдСрдкрд░реЗрдЯрд░рдордзреНрдпреЗрдЪ рдкреБрдирд░рд╛рд╡реГрддреНрддреА рдХрд░реВ рд╢рдХрддрд╛!тАЭ рдЖрдгрд┐ рдордЧ, рдирд┐рд▓рдВрдмрд┐рдд рдСрдкрд░реЗрдЯрд░рд╕рд╣ рдХрд╛рд░реНрдпрд╛рдВрдЪрд╛ рдкреВрд▓ рд░реЛрдЦреВ рдирдпреЗ рдореНрд╣рдгреВрди. рдкреБрдвреАрд▓ рдкреНрд░рдпрддреНрдирд╛рдкреВрд░реНрд╡реА рд╕реЗрдиреНрд╕рд░ рд╕реБрд░реВ рд╣реЛрддреЛ, рддрдкрд╛рд╕рддреЛ рдЖрдгрд┐ рдорд░рддреЛ.
  • рдХрд╛рд░реНрдп - рдШреЛрд╖рд┐рдд рдСрдкрд░реЗрдЯрд░, рдкреНрд░рдХрд╛рд░рд╛рдХрдбреЗ рджреБрд░реНрд▓рдХреНрд╖ рдХрд░реВрди, рдЖрдгрд┐ рдбреЕрдЧрд╢реА рд╕рдВрд▓рдЧреНрди рдХреЗрд▓реЗрд▓реНрдпрд╛ рдХрд╛рд░реНрдпрд╛рдЪреНрдпрд╛ рд░рдБрдХрд╡рд░ рдкрджреЛрдиреНрдирдд рдХреЗрд▓реЗ рдЬрд╛рддрд╛рдд.
  • рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдг - рдЬреЗрд╡реНрд╣рд╛ рд╕рд╛рдорд╛рдиреНрдп рдирд┐рдпреЛрдЬрдХрд╛рдиреЗ рдард░рд╡рд▓реЗ рдХреА рдХрд╛рдордЧрд┐рд░реА рдХрд░рдгрд╛рд░реНтАНрдпрд╛ рдХрд╛рдордЧрд╛рд░рд╛рдВрд╡рд░ рд▓рдврд╛рдИрдд рдХрд╛рд░реНрдпреЗ рдкрд╛рдард╡рдгреНрдпрд╛рдЪреА рд╡реЗрд│ рдЖрд▓реА рдЖрд╣реЗ (рдЬрд░ рдЖрдкрдг рд╡рд╛рдкрд░рдд рдЕрд╕рд╛рд▓ рддрд░ LocalExecutor рдХрд┐рдВрд╡рд╛ рдЪреНрдпрд╛ рдмрд╛рдмрддреАрдд рд░рд┐рдореЛрдЯ рдиреЛрдбрд▓рд╛ CeleryExecutor), рддреЗ рддреНрдпрд╛рдВрдирд╛ рд╕рдВрджрд░реНрдн рдирд┐рдпреБрдХреНрдд рдХрд░рддреЗ (рдореНрд╣рдгрдЬреЗ, рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕рдЪрд╛ рдПрдХ рд╕рдВрдЪ - рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреА рдкреЕрд░рд╛рдореАрдЯрд░реНрд╕), рдХрдорд╛рдВрдб рдХрд┐рдВрд╡рд╛ рдХреНрд╡реЗрд░реА рдЯреЗрдореНрдкрд▓реЗрдЯреНрд╕ рд╡рд┐рд╕реНрддреГрдд рдХрд░рддреЗ рдЖрдгрд┐ рддреНрдпрд╛рдВрдирд╛ рдПрдХрддреНрд░ рдХрд░рддреЗ.

рдЖрдореНрд╣реА рдХрд╛рд░реНрдпреЗ рд╡реНрдпреБрддреНрдкрдиреНрди рдХрд░рддреЛ

рдкреНрд░рдердо, рдЖрдордЪреНрдпрд╛ рдбрдЧрдЪреНрдпрд╛ рд╕рд╛рдорд╛рдиреНрдп рдпреЛрдЬрдиреЗрдЪреА рд░реВрдкрд░реЗрд╖рд╛ рдХрд╛рдвреВрдпрд╛, рдЖрдгрд┐ рдирдВрддрд░ рдЖрдореНрд╣реА рдЕрдзрд┐рдХрд╛рдзрд┐рдХ рддрдкрд╢реАрд▓рд╛рдВрдордзреНрдпреЗ рдЬрд╛рдК, рдХрд╛рд░рдг рдЖрдореНрд╣реА рдХрд╛рд╣реА рдХреНрд╖реБрд▓реНрд▓рдХ рдЙрдкрд╛рдп рд▓рд╛рдЧреВ рдХрд░рддреЛ.

рддрд░, рддреНрдпрд╛рдЪреНрдпрд╛ рд╕рд░реНрд╡рд╛рдд рд╕реЛрдкреНрдпрд╛ рд╕реНрд╡рд░реВрдкрд╛рдд, рдЕрд╕рд╛ рдбрдЧ рдпрд╛рд╕рд╛рд░рдЦрд╛ рджрд┐рд╕реЗрд▓:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

рдЪрд▓рд╛ рддреЗ рд╢реЛрдзреВрдпрд╛:

  • рдкреНрд░рдердо, рдЖрдореНрд╣реА рдЖрд╡рд╢реНрдпрдХ libs рдЖрдпрд╛рдд рдХрд░рддреЛ рдЖрдгрд┐ рдХрд╛рд╣реАрддрд░реА;
  • sql_server_ds - рд╣реЗ рдЖрд╣реЗ List[namedtuple[str, str]] рдПрдЕрд░рдлреНрд▓реЛ рдХрдиреЗрдХреНрд╢рдиреНрд╕рдЪреНрдпрд╛ рдХрдиреЗрдХреНрд╢рдирдЪреНрдпрд╛ рдирд╛рд╡рд╛рдВрд╕рд╣ рдЖрдгрд┐ рдбреЗрдЯрд╛рдмреЗрд╕реЗрд╕ рдЬреНрдпрд╛рдордзреВрди рдЖрдореНрд╣реА рдЖрдордЪреА рдкреНрд▓реЗрдЯ рдШреЗрдК;
  • dag - рдЖрдордЪреНрдпрд╛ рдбреЗрдЧрдЪреА рдШреЛрд╖рдгрд╛, рдЬреА рдЕрдкрд░рд┐рд╣рд╛рд░реНрдпрдкрдгреЗ рдЕрд╕рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ globals(), рдЕрдиреНрдпрдерд╛ Airflow рд▓рд╛ рддреЗ рд╕рд╛рдкрдбрдгрд╛рд░ рдирд╛рд╣реА. рдбрдЧ рд╣реЗ рджреЗрдЦреАрд▓ рдореНрд╣рдгрд╛рдпрдЪреЗ рдЖрд╣реЗ:
    • рддреНрдпрд╛рдЪреЗ рдирд╛рд╡ рдХрд╛рдп orders - рд╣реЗ рдирд╛рд╡ рдирдВрддрд░ рд╡реЗрдм рдЗрдВрдЯрд░рдлреЗрд╕рдордзреНрдпреЗ рджрд┐рд╕реЗрд▓,
    • рддреЛ рдЖрда рдЬреБрд▓реИрдЪреНрдпрд╛ рдордзреНрдпрд░рд╛рддреНрд░реАрдкрд╛рд╕реВрди рдХрд╛рдо рдХрд░реЗрд▓,
    • рдЖрдгрд┐ рддреЗ рдЪрд╛рд▓рд▓реЗ рдкрд╛рд╣рд┐рдЬреЗ, рдЕрдВрджрд╛рдЬреЗ рджрд░ 6 рддрд╛рд╕рд╛рдВрдиреА (рддреНрдпрд╛рдРрд╡рдЬреА рдпреЗрдереЗ рдХрдареАрдг рдореБрд▓рд╛рдВрд╕рд╛рдареА timedelta() рд╕реНрд╡реАрдХрд╛рд░реНрдп cron-рдУрд│ 0 0 0/6 ? * * *, рдХрдореА рдердВрдб рд╕рд╛рдареА - рдПрдХ рдЕрднрд┐рд╡реНрдпрдХреНрддреА рдЬрд╕реЗ @daily);
  • workflow() рдореБрдЦреНрдп рдХрд╛рдо рдХрд░реЗрд▓, рдкрдг рдЖрддрд╛ рдирд╛рд╣реА. рдЖрддреНрддрд╛рд╕рд╛рдареА, рдЖрдореНрд╣реА рдЖрдордЪрд╛ рд╕рдВрджрд░реНрдн рд▓реЙрдЧрдордзреНрдпреЗ рдЯрд╛рдХреВ.
  • рдЖрдгрд┐ рдЖрддрд╛ рдХрд╛рд░реНрдпреЗ рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рдЪреА рд╕рд╛рдзреА рдЬрд╛рджреВ:
    • рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛ рд╕реНрддреНрд░реЛрддрд╛рдВрдордзреВрди рдзрд╛рд╡рддреЛ;
    • рдЖрд░рдВрдн рдХрд░рд╛ PythonOperator, рдЬреЗ рдЖрдордЪреНрдпрд╛ рдбрдореАрдЪреА рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреА рдХрд░реЗрд▓ workflow(). рдЯрд╛рд╕реНрдХрдЪреЗ рдЕрдирдиреНрдп (рдбреЗрдЧрдордзреНрдпреЗ) рдирд╛рд╡ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░рдгреНрдпрд╛рд╕ рд╡рд┐рд╕рд░реВ рдирдХрд╛ рдЖрдгрд┐ рдбреЕрдЧ рд╕реНрд╡рддрдГрдЪ рдмрд╛рдВрдзрд╛. рдЭреЗрдВрдбрд╛ provide_context рдпрд╛рдордзреВрди, рдлрдВрдХреНрд╢рдирдордзреНрдпреЗ рдЕрддрд┐рд░рд┐рдХреНрдд рдпреБрдХреНрддрд┐рд╡рд╛рдж рдУрддрддреЛ, рдЬреНрдпрд╛рдЪрд╛ рд╡рд╛рдкрд░ рдХрд░реВрди рдЖрдореНрд╣реА рдХрд╛рд│рдЬреАрдкреВрд░реНрд╡рдХ рдЧреЛрд│рд╛ рдХрд░реВ **context.

рдЖрддрд╛рд╕рд╛рдареА, рдПрд╡рдвреЗрдЪ. рдЖрдореНрд╣рд╛рд▓рд╛ рдХрд╛рдп рдорд┐рд│рд╛рд▓реЗ:

  • рд╡реЗрдм рдЗрдВрдЯрд░рдлреЗрд╕рдордзреНрдпреЗ рдирд╡реАрди рдбреЗрдЧ,
  • рджреАрдбрд╢реЗ рдХрд╛рд░реНрдпреЗ рд╕рдорд╛рдВрддрд░рдкрдгреЗ рдкрд╛рд░ рдкрд╛рдбрд▓реА рдЬрд╛рддреАрд▓ (рдЬрд░ рдПрдЕрд░рдлреНрд▓реЛ, рд╕реЗрд▓реЗрд░реА рд╕реЗрдЯрд┐рдВрдЧреНрдЬ рдЖрдгрд┐ рд╕рд░реНрд╡реНрд╣рд░рдЪреА рдХреНрд╖рдорддрд╛ рдкрд░рд╡рд╛рдирдЧреА рджреЗрдд тАЛтАЛрдЕрд╕реЗрд▓).

рдмрд░рдВ, рдЬрд╡рд│рдЬрд╡рд│ рд╕рдордЬрд▓рдВ.

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ
рдЕрд╡рд▓рдВрдмрд┐рддреНрд╡ рдХреЛрдг рд╕реНрдерд╛рдкрд┐рдд рдХрд░реЗрд▓?

рд╣реА рд╕рдВрдкреВрд░реНрдг рдЧреЛрд╖реНрдЯ рд╕реЛрдкреА рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА, рдореА рд╕реНрдХреНрд░реВ рдХреЗрд▓реЗ docker-compose.yml рдкреНрд░рдХреНрд░рд┐рдпрд╛ requirements.txt рд╕рд░реНрд╡ рдиреЛрдбреНрд╕ рд╡рд░.

рдЖрддрд╛ рддреЗ рдЧреЗрд▓реЗ рдЖрд╣реЗ:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдЧреНрд░реЗ рд╕реНрдХреНрд╡реЗрдЕрд░ рд╣реЗ рд╢реЗрдбреНрдпреБрд▓рд░рджреНрд╡рд╛рд░реЗ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХреЗрд▓реЗрд▓реЗ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдг рдЖрд╣реЗрдд.

рдЖрдореНрд╣реА рдереЛрдбреА рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░рддреЛ, рдХрд╛рд░реНрдпреЗ рдХрд╛рдордЧрд╛рд░рд╛рдВрдиреА рдкреВрд░реНрдг рдХреЗрд▓реА рдЖрд╣реЗрдд:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рд╣рд┐рд░рд╡реЗ, рдЕрд░реНрдерд╛рддрдЪ, рддреНрдпрд╛рдВрдЪреЗ рдХрд╛рдо рдпрд╢рд╕реНрд╡реАрд░рд┐рддреНрдпрд╛ рдкреВрд░реНрдг рдХреЗрд▓реЗ рдЖрд╣реЗ. рд░реЗрдбреНрд╕ рдлрд╛рд░рд╕реЗ рдпрд╢рд╕реНрд╡реА рдирд╛рд╣реАрдд.

рддрд╕реЗ, рдЖрдордЪреНрдпрд╛ рдЙрддреНрдкрд╛рджрдирд╛рд╡рд░ рдХреЛрдгрддреЗрд╣реА рдлреЛрд▓реНрдбрд░ рдирд╛рд╣реА ./dags, рдорд╢реАрдиреНрд╕рдордзреНрдпреЗ рдХреЛрдгрддреЗрд╣реА рд╕рд┐рдВрдХреНрд░реЛрдирд╛рдЗрдЭреЗрд╢рди рдирд╛рд╣реА - рд╕рд░реНрд╡ рдбреЕрдЧреНрдЬ рдЖрдд рдЖрд╣реЗрдд git рдЖрдордЪреНрдпрд╛ Gitlab рд╡рд░, рдЖрдгрд┐ Gitlab CI рдорд╢рд┐рдирдордзреНрдпреЗ рд╡рд┐рд▓реАрди рдЭрд╛рд▓реНрдпрд╛рд╡рд░ рдЕрдкрдбреЗрдЯ рд╡рд┐рддрд░рд┐рдд рдХрд░рддреЗ master.

рдлреНрд▓реЙрд╡рд░ рдмрджреНрджрд▓ рдереЛрдбреЗ

рдХрд╛рдордЧрд╛рд░ рдЖрдордЪреНрдпрд╛ рдкреЕрд╕рд┐рдлрд╛рдпрд░рд▓рд╛ рдорд╛рд░рдд рдЕрд╕рддрд╛рдирд╛, рдЖрдкрдг рдЖрдгрдЦреА рдПрдХ рд╕рд╛рдзрди рд▓рдХреНрд╖рд╛рдд рдареЗрд╡реВ рдЬреЗ рдЖрдкрд▓реНрдпрд╛рд▓рд╛ рдХрд╛рд╣реАрддрд░реА рджрд░реНрд╢рд╡реВ рд╢рдХрддреЗ - рдлреНрд▓реЙрд╡рд░.

рдХрд╛рдордЧрд╛рд░ рдиреЛрдбреНрд╕рд╡рд░реАрд▓ рд╕рд╛рд░рд╛рдВрд╢ рдорд╛рд╣рд┐рддреАрд╕рд╣ рдкрд╣рд┐рд▓реЗ рдкреГрд╖реНрда:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдХрд╛рд░реНрдп рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЧреЗрд▓реЗрд▓реНрдпрд╛ рдХрд╛рд░реНрдпрд╛рдВрд╕рд╣ рд╕рд░реНрд╡рд╛рдд рддреАрд╡реНрд░ рдкреГрд╖реНрда:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдЖрдордЪреНрдпрд╛ рдмреНрд░реЛрдХрд░рдЪреНрдпрд╛ рд╕реНрдерд┐рддреАрд╕рд╣ рд╕рд░реНрд╡рд╛рдд рдХрдВрдЯрд╛рд│рд╡рд╛рдгрд╛ рдкреГрд╖реНрда:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рд╕рд░реНрд╡рд╛рдд рдЙрдЬрд│ рдкреГрд╖реНрда рд╣реЗ рдХрд╛рд░реНрдп рд╕реНрдерд┐рддреА рдЖрд▓реЗрдЦ рдЖрдгрд┐ рддреНрдпрд╛рдВрдЪреНрдпрд╛ рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреАрдЪреНрдпрд╛ рд╡реЗрд│реЗрд╕рд╣ рдЖрд╣реЗ:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдЖрдореНрд╣реА рдЕрдВрдбрд░рд▓реЛрдбреЗрдб рд▓реЛрдб рдХрд░рддреЛ

рддрд░, рд╕рд░реНрд╡ рдХрд╛рд░реНрдпреЗ рдкреВрд░реНрдг рдЭрд╛рд▓реА рдЖрд╣реЗрдд, рдЖрдкрдг рдЬрдЦрдореАрдВрдирд╛ рдШреЗрдКрди рдЬрд╛рдК рд╢рдХрддрд╛.

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдЖрдгрд┐ рддреЗрдереЗ рдмрд░реЗрдЪ рдЬрдЦрдореА рдЭрд╛рд▓реЗ - рдПрдХ рдХрд┐рдВрд╡рд╛ рджреБрд╕рд░реНрдпрд╛ рдХрд╛рд░рдгрд╛рд╕реНрддрд╡. рдПрдЕрд░рдлреНрд▓реЛрдЪреНрдпрд╛ рдпреЛрдЧреНрдп рд╡рд╛рдкрд░рд╛рдЪреНрдпрд╛ рдмрд╛рдмрддреАрдд, рд╣реЗ рдЪреМрд░рд╕ рд╕реВрдЪрд┐рдд рдХрд░рддрд╛рдд рдХреА рдбреЗрдЯрд╛ рдирд┐рд╢реНрдЪрд┐рддрдкрдгреЗ рдЖрд▓рд╛ рдирд╛рд╣реА.

рддреБрдореНрд╣рд╛рд▓рд╛ рд▓реЙрдЧ рдкрд╛рд╣рдгреНрдпрд╛рдЪреА рдЖрдгрд┐ рдкрдбрд▓реЗрд▓реНрдпрд╛ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгреЗ рд░реАрд╕реНрдЯрд╛рд░реНрдЯ рдХрд░рдгреНрдпрд╛рдЪреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рдЖрд╣реЗ.

рдХреЛрдгрддреНрдпрд╛рд╣реА рд╕реНрдХреНрд╡реЗрдЕрд░рд╡рд░ рдХреНрд▓рд┐рдХ рдХрд░реВрди, рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛рд╕рд╛рдареА рдЙрдкрд▓рдмреНрдз рдХреНрд░рд┐рдпрд╛ рдкрд╛рд╣реВ:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рддреБрдореНрд╣реА рдШреЗрдК рд╢рдХрддрд╛ рдЖрдгрд┐ рдХреНрд▓рд┐рдЕрд░ рдж рдлреЙрд▓рди рдХрд░реВ рд╢рдХрддрд╛. рдореНрд╣рдгрдЬреЗрдЪ, рдЖрдореНрд╣реА рд╡рд┐рд╕рд░рддреЛ рдХреА рддреЗрдереЗ рдХрд╛рд╣реАрддрд░реА рдЕрдпрд╢рд╕реНрд╡реА рдЭрд╛рд▓реЗ рдЖрд╣реЗ рдЖрдгрд┐ рддреЗрдЪ рдЙрджрд╛рд╣рд░рдг рдХрд╛рд░реНрдп рд╢реЗрдбреНрдпреВрд▓рд░рдХрдбреЗ рдЬрд╛рдИрд▓.

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рд╣реЗ рд╕реНрдкрд╖реНрдЯ рдЖрд╣реЗ рдХреА рд╕рд░реНрд╡ рд▓рд╛рд▓ рдЪреМрд░рд╕рд╛рдВрд╕рд╣ рдорд╛рдКрд╕рд╕рд╣ рд╣реЗ рдХрд░рдгреЗ рдлрд╛рд░рд╕реЗ рдорд╛рдирд╡реАрдп рдирд╛рд╣реА - рд╣реЗ рдЖрдореНрд╣реА рдПрдЕрд░рдлреНрд▓реЛрдХрдбреВрди рдЕрдкреЗрдХреНрд╖рд┐рдд рдирд╛рд╣реА. рд╕рд╛рд╣рдЬрд┐рдХрдЪ, рдЖрдордЪреНрдпрд╛рдХрдбреЗ рдореЛрдареНрдпрд╛ рдкреНрд░рдорд╛рдгрд╛рд╡рд░ рд╡рд┐рдирд╛рд╢ рдХрд░рдгрд╛рд░реА рд╢рд╕реНрддреНрд░реЗ рдЖрд╣реЗрдд: Browse/Task Instances

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдЪрд▓рд╛ рд╕рд░реНрд╡ рдХрд╛рд╣реА рдПрдХрд╛рдЪ рд╡реЗрд│реА рдирд┐рд╡рдбрд╛ рдЖрдгрд┐ рд╢реВрдиреНрдпрд╛рд╡рд░ рд░реАрд╕реЗрдЯ рдХрд░реВ, рдпреЛрдЧреНрдп рдЖрдпрдЯрдорд╡рд░ рдХреНрд▓рд┐рдХ рдХрд░рд╛:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рд╕рд╛рдлрд╕рдлрд╛рдИ рдХреЗрд▓реНрдпрд╛рдирдВрддрд░, рдЖрдордЪреНрдпрд╛ рдЯреЕрдХреНрд╕реА рдЕрд╢рд╛ рджрд┐рд╕рддрд╛рдд (рддреЗ рд╢реЗрдбреНрдпреВрд▓рд░ рд╢реЗрдбреНрдпреВрд▓ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЖрдзреАрдЪ рд╡рд╛рдЯ рдкрд╛рд╣рдд рдЖрд╣реЗрдд):

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдХрдиреЗрдХреНрд╢рди, рд╣реБрдХ рдЖрдгрд┐ рдЗрддрд░ рдЪрд▓

рдкреБрдвреАрд▓ рдбреАрдПрдЬреА рдкрд╛рд╣рдгреНрдпрд╛рдЪреА рд╡реЗрд│ рдЖрд▓реА рдЖрд╣реЗ, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""╨У╨╛╤Б╨┐╨╛╨┤╨░ ╤Е╨╛╤А╨╛╤И╨╕╨╡, ╨╛╤В╤З╨╡╤В╤Л ╨╛╨▒╨╜╨╛╨▓╨╗╨╡╨╜╤Л"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ╨Э╨░╤В╨░╤И, ╨┐╤А╨╛╤Б╤Л╨┐╨░╨╣╤Б╤П, ╨╝╤Л {{ dag.dag_id }} ╤Г╤А╨╛╨╜╨╕╨╗╨╕
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

рдкреНрд░рддреНрдпреЗрдХрд╛рдиреЗ рдХрдзреА рдЕрд╣рд╡рд╛рд▓ рдЕрдкрдбреЗрдЯ рдХреЗрд▓рд╛ рдЖрд╣реЗ рдХрд╛? рд╣реА рдкреБрдиреНрд╣рд╛ рддрд┐рдЪреА рдЖрд╣реЗ: рдбреЗрдЯрд╛ рдХреЛрдареВрди рдорд┐рд│рд╡рд╛рдпрдЪрд╛ рдпрд╛рдЪреА рд╕реНрддреНрд░реЛрддрд╛рдВрдЪреА рдпрд╛рджреА рдЖрд╣реЗ; рдХреБрдареЗ рдареЗрд╡рд╛рдпрдЪреЗ рдпрд╛рдЪреА рдпрд╛рджреА рдЖрд╣реЗ; рдЬреЗрд╡реНрд╣рд╛ рд╕рд░реНрд╡рдХрд╛рд╣реА рдШрдбрд▓реЗ рдХрд┐рдВрд╡рд╛ рддреБрдЯрд▓реЗ рддреЗрд╡реНрд╣рд╛ рд╣реЙрдВрдХ рдХрд░рдгреНрдпрд╛рд╕ рд╡рд┐рд╕рд░реВ рдирдХрд╛ (рдареАрдХ рдЖрд╣реЗ, рд╣реЗ рдЖрдкрд▓реНрдпрд╛рдмрджреНрджрд▓ рдирд╛рд╣реА, рдирд╛рд╣реА).

рдЪрд▓рд╛ рдкреБрдиреНрд╣рд╛ рдлрд╛рдЗрд▓рдордзреНрдпреЗ рдЬрд╛рдК рдЖрдгрд┐ рдирд╡реАрди рдЕрд╕реНрдкрд╖реНрдЯ рд╕рд╛рдордЧреНрд░реА рдкрд╛рд╣реВ:

  • from commons.operators import TelegramBotSendMessage - рдЖрдореНрд╣рд╛рд▓рд╛ рдЖрдордЪреЗ рд╕реНрд╡рддрдГрдЪреЗ рдСрдкрд░реЗрдЯрд░ рдмрдирд╡рдгреНрдпрд╛рдкрд╛рд╕реВрди рдХрд╛рд╣реАрд╣реА рдкреНрд░рддрд┐рдмрдВрдзрд┐рдд рдХрд░рдд рдирд╛рд╣реА, рдЬреНрдпрд╛рдЪрд╛ рдЖрдореНрд╣реА рдЕрдирдмреНрд▓реЙрдХ рдХреЗрд▓реЗрд▓реЗ рд╕рдВрджреЗрд╢ рдкрд╛рдард╡рдгреНрдпрд╛рд╕рд╛рдареА рдПрдХ рдЫреЛрдЯрд╛ рдЖрд╡рд░рдг рдмрдирд╡реВрди рдлрд╛рдпрджрд╛ рдШреЗрддрд▓рд╛. (рдЖрдореНрд╣реА рдЦрд╛рд▓реА рдпрд╛ рдСрдкрд░реЗрдЯрд░рдмрджреНрджрд▓ рдЕрдзрд┐рдХ рдмреЛрд▓реВ);
  • default_args={} - dag рддреНрдпрд╛рдЪреНрдпрд╛ рд╕рд░реНрд╡ рдСрдкрд░реЗрдЯрд░рдирд╛ рд╕рдорд╛рди рдпреБрдХреНрддрд┐рд╡рд╛рдж рд╡рд┐рддрд░рд┐рдд рдХрд░реВ рд╢рдХрддреЗ;
  • to='{{ var.value.all_the_kings_men }}' - рдлреАрд▓реНрдб to рдЖрдордЪреНрдпрд╛рдХрдбреЗ рд╣рд╛рд░реНрдбрдХреЛрдб рдирд╕реЗрд▓, рдкрд░рдВрддреБ рдЬрд┐рдиреНрдЬрд╛ рд╡рд╛рдкрд░реВрди рдбрд╛рдпрдиреЕрдорд┐рдХрд▓реА рд╡реНрдпреБрддреНрдкрдиреНрди рдХреЗрд▓реЗ рдЬрд╛рдИрд▓ рдЖрдгрд┐ рдИрдореЗрд▓рдЪреНрдпрд╛ рд╕реВрдЪреАрд╕рд╣ рдПрдХ рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓, рдЬреЗ рдореА рдХрд╛рд│рдЬреАрдкреВрд░реНрд╡рдХ рдареЗрд╡рд▓реЗ рдЖрд╣реЗ Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS тАФ рдСрдкрд░реЗрдЯрд░ рд╕реБрд░реВ рдХрд░рдгреНрдпрд╛рдЪреА рдЕрдЯ. рдЖрдордЪреНрдпрд╛ рдмрд╛рдмрддреАрдд, рд╕рд░реНрд╡ рдЕрд╡рд▓рдВрдмрд┐рддреНрд╡ рдкреВрд░реНрдг рдЭрд╛рд▓реЗ рдЕрд╕рд▓реНрдпрд╛рд╕рдЪ рдкрддреНрд░ рдмреЙрд╕рдХрдбреЗ рдЬрд╛рдИрд▓ рдпрд╢рд╕реНрд╡реАрд░рд┐рддреНрдпрд╛;
  • tg_bot_conn_id='tg_main' - рдпреБрдХреНрддрд┐рд╡рд╛рдж conn_id рдЖрдореНрд╣реА рддрдпрд╛рд░ рдХреЗрд▓реЗрд▓реЗ рдХрдиреЗрдХреНрд╢рди рдЖрдпрдбреА рд╕реНрд╡реАрдХрд╛рд░рддреЛ Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - рдкрдбрд▓реЗрд▓реНрдпрд╛ рдХрд╛рд░реНрдпреЗ рдЕрд╕рд▓реНрдпрд╛рд╕рдЪ рдЯреЗрд▓рд┐рдЧреНрд░рд╛рдордордзреАрд▓ рд╕рдВрджреЗрд╢ рдЙрдбреВрди рдЬрд╛рддреАрд▓;
  • task_concurrency=1 - рдЖрдореНрд╣реА рдПрдХрд╛ рдХрд╛рд░реНрдпрд╛рдЪреНрдпрд╛ рдЕрдиреЗрдХ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгреЗ рдПрдХрд╛рдЪ рд╡реЗрд│реА рд▓рд╛рдБрдЪ рдХрд░рдгреНрдпрд╛рд╕ рдкреНрд░рддрд┐рдмрдВрдзрд┐рдд рдХрд░рддреЛ. рдЕрдиреНрдпрдерд╛, рдЖрдореНрд╣рд╛рд▓рд╛ рдЕрдиреЗрдХрд╛рдВрдЪреЗ рдПрдХрд╛рдЪрд╡реЗрд│реА рдкреНрд░рдХреНрд╖реЗрдкрдг рдорд┐рд│реЗрд▓ VerticaOperator (рдПрдХрд╛ тАЛтАЛрдЯреЗрдмрд▓рдХрдбреЗ рдкрд╣рд╛рдд рдЖрд╣реЗ);
  • report_update >> [email, tg] - рд╕рд░реНрд╡ VerticaOperator рдпрд╛рдкреНрд░рдорд╛рдгреЗ рдкрддреНрд░реЗ рдЖрдгрд┐ рд╕рдВрджреЗрд╢ рдкрд╛рдард╡рд┐рдгреНрдпрд╛рдд рдПрдХрддреНрд░ рдпреЗрдгреЗ:
    Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

    рдкрд░рдВрддреБ рдиреЛрдЯрд┐рдлрд╛рдпрд░ рдСрдкрд░реЗрдЯрд░реНрд╕рдЪреА рд▓реЙрдиреНрдЪ рдкрд░рд┐рд╕реНрдерд┐рддреА рднрд┐рдиреНрди рдЕрд╕рд▓реНрдпрд╛рдиреЗ, рдлрдХреНрдд рдПрдХрдЪ рдХрд╛рд░реНрдп рдХрд░реЗрд▓. рдЯреНрд░реА рд╡реНрд╣реНрдпреВрдордзреНрдпреЗ, рд╕рд░реНрд╡ рдХрд╛рд╣реА рдереЛрдбреЗ рдХрдореА рд╡реНрд╣рд┐рдЬреНрдпреБрдЕрд▓ рджрд┐рд╕рддреЗ:
    Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдореА рдпрд╛рдмрджреНрджрд▓ рдХрд╛рд╣реА рд╢рдмреНрдж рд╕рд╛рдВрдЧреЗрди рдореЕрдХреНрд░реЛ рдЖрдгрд┐ рддреНрдпрд╛рдВрдЪреЗ рдорд┐рддреНрд░ - рдЪрд▓.

рдореЕрдХреНрд░реЛ рд╣реЗ рдЬрд┐рдиреНрдЬрд╛ рдкреНрд▓реЗрд╕рд╣реЛрд▓реНрдбрд░ рдЖрд╣реЗрдд рдЬреЗ рдСрдкрд░реЗрдЯрд░ рд╡рд┐рддрд░реНрдХрд╛рдВрдордзреНрдпреЗ рд╡рд┐рд╡рд┐рдз рдЙрдкрдпреБрдХреНрдд рдорд╛рд╣рд┐рддреА рдмрджрд▓реВ рд╢рдХрддрд╛рдд. рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рдпрд╛рд╕рд╛рд░рдЦреЗ:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} рд╕рдВрджрд░реНрдн рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓рдЪреНрдпрд╛ рд╕рд╛рдордЧреНрд░реАрдордзреНрдпреЗ рд╡рд┐рд╕реНрддрд╛рд░рд┐рдд рд╣реЛрдИрд▓ execution_date рд╕реНрд╡рд░реВрдкрд╛рдд YYYY-MM-DD: 2020-07-14. рд╕рд░реНрд╡реЛрддреНрдХреГрд╖реНрдЯ рднрд╛рдЧ рдЕрд╕рд╛ рдЖрд╣реЗ рдХреА рд╕рдВрджрд░реНрдн рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕ рдПрдХрд╛ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгрд╛рд╡рд░ (рдЯреНрд░реА рд╡реНрд╣реНрдпреВрдордзреАрд▓ рд╕реНрдХреНрд╡реЗрдЕрд░) рдиреЗрд▓ рдХреЗрд▓реЗ рдЬрд╛рддрд╛рдд рдЖрдгрд┐ рд░реАрд╕реНрдЯрд╛рд░реНрдЯ рдХреЗрд▓реНрдпрд╛рд╡рд░, рдкреНрд▓реЗрд╕рд╣реЛрд▓реНрдбрд░ рд╕рдорд╛рди рдореВрд▓реНрдпрд╛рдВрдордзреНрдпреЗ рд╡рд┐рд╕реНрддреГрдд рд╣реЛрддреАрд▓.

рдирд┐рдпреБрдХреНрдд рдХреЗрд▓реЗрд▓реА рдореВрд▓реНрдпреЗ рдкреНрд░рддреНрдпреЗрдХ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгрд╛рд╡рд░ рдкреНрд░рд╕реНрддреБрдд рдмрдЯрдг рд╡рд╛рдкрд░реВрди рдкрд╛рд╣рд┐рд▓реА рдЬрд╛рдК рд╢рдХрддрд╛рдд. рдкрддреНрд░ рдкрд╛рдард╡рдгреНрдпрд╛рдЪреЗ рдХрд╛рд░реНрдп рдЕрд╕реЗ рдЖрд╣реЗ:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдЖрдгрд┐ рдореНрд╣рдгреВрди рд╕рдВрджреЗрд╢ рдкрд╛рдард╡рдгреНрдпрд╛рдЪреНрдпрд╛ рдХрд╛рд░реНрдпрд╛рд╡рд░:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдирд╡реАрдирддрдо рдЙрдкрд▓рдмреНрдз рдЖрд╡реГрддреНрддреАрд╕рд╛рдареА рдЕрдВрдЧрднреВрдд рдореЕрдХреНрд░реЛрдЪреА рд╕рдВрдкреВрд░реНрдг рдпрд╛рджреА рдпреЗрдереЗ рдЙрдкрд▓рдмреНрдз рдЖрд╣реЗ: рдореЕрдХреНрд░реЛ рд╕рдВрджрд░реНрдн

рд╢рд┐рд╡рд╛рдп, рдкреНрд▓рдЧрдЗрдирдЪреНрдпрд╛ рдорджрддреАрдиреЗ, рдЖрдореНрд╣реА рдЖрдордЪреЗ рд╕реНрд╡рддрдГрдЪреЗ рдореЕрдХреНрд░реЛ рдШреЛрд╖рд┐рдд рдХрд░реВ рд╢рдХрддреЛ, рдкрд░рдВрддреБ рддреА рджреБрд╕рд░реА рдЧреЛрд╖реНрдЯ рдЖрд╣реЗ.

рдкреВрд░реНрд╡рдирд┐рд░реНрдзрд╛рд░рд┐рдд рдЧреЛрд╖реНрдЯреАрдВрд╡реНрдпрддрд┐рд░рд┐рдХреНрдд, рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛ рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕рдЪреА рдореВрд▓реНрдпреЗ рдмрджрд▓реВ рд╢рдХрддреЛ (рдореА рд╣реЗ рдЖрдзреАрдЪ рд╡рд░реАрд▓ рдХреЛрдбрдордзреНрдпреЗ рд╡рд╛рдкрд░рд▓реЗ рдЖрд╣реЗ). рдЪреНрдпрд╛ рдордзреНрдпреЗ рддрдпрд╛рд░ рдХрд░реВ Admin/Variables рдХрд╛рд╣реА рдЧреЛрд╖реНрдЯреА:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдЖрдкрдг рд╡рд╛рдкрд░реВ рд╢рдХрддрд╛ рд╕рд░реНрд╡рдХрд╛рд╣реА:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

рдореВрд▓реНрдп рд╕реНрдХреЗрд▓рд░ рдЕрд╕реВ рд╢рдХрддреЗ рдХрд┐рдВрд╡рд╛ рддреЗ JSON рджреЗрдЦреАрд▓ рдЕрд╕реВ рд╢рдХрддреЗ. JSON рдЪреНрдпрд╛ рдмрд╛рдмрддреАрдд:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

рдлрдХреНрдд рдЗрдЪреНрдЫрд┐рдд рдХреАрдЪрд╛ рдорд╛рд░реНрдЧ рд╡рд╛рдкрд░рд╛: {{ var.json.bot_config.bot.token }}.

рдореА рдЕрдХреНрд╖рд░рд╢рдГ рдПрдХ рд╢рдмреНрдж рдмреЛрд▓реЗрди рдЖрдгрд┐ рдПрдХ рд╕реНрдХреНрд░реАрдирд╢реЙрдЯ рджрд╛рдЦрд╡реЗрди рдХрдиреЗрдХреНрд╢рди. рдпреЗрдереЗ рд╕рд░реНрд╡ рдХрд╛рд╣реА рдкреНрд░рд╛рдердорд┐рдХ рдЖрд╣реЗ: рдкреГрд╖реНрдард╛рд╡рд░ Admin/Connections рдЖрдореНрд╣реА рдПрдХ рдХрдиреЗрдХреНрд╢рди рддрдпрд╛рд░ рдХрд░рддреЛ, рдЖрдордЪреЗ рд▓реЙрдЧрд┐рди / рдкрд╛рд╕рд╡рд░реНрдб рдЖрдгрд┐ рдЕрдзрд┐рдХ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдкреЕрд░рд╛рдореАрдЯрд░реНрд╕ рддреЗрдереЗ рдЬреЛрдбрддреЛ. рдпрд╛рдкреНрд░рдорд╛рдгреЗ:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдкрд╛рд╕рд╡рд░реНрдб рдПрдиреНрдХреНрд░рд┐рдкреНрдЯ рдХреЗрд▓реЗ рдЬрд╛рдК рд╢рдХрддрд╛рдд (рдбреАрдлреЙрд▓реНрдЯрдкреЗрдХреНрд╖рд╛ рдЕрдзрд┐рдХ рдкреВрд░реНрдгрдкрдгреЗ), рдХрд┐рдВрд╡рд╛ рддреБрдореНрд╣реА рдХрдиреЗрдХреНрд╢рди рдкреНрд░рдХрд╛рд░ рд╕реЛрдбреВ рд╢рдХрддрд╛ (рдЬрд╕реЗ рдореА рдХреЗрд▓реЗ tg_main) - рд╡рд╕реНрддреБрд╕реНрдерд┐рддреА рдЕрд╢реА рдЖрд╣реЗ рдХреА рдкреНрд░рдХрд╛рд░рд╛рдВрдЪреА рдпрд╛рджреА рдПрдЕрд░рдлреНрд▓реЛ рдореЙрдбреЗрд▓реНрд╕рдордзреНрдпреЗ рд╣рд╛рд░реНрдбрд╡рд╛рдпрд░ рдХреЗрд▓реЗрд▓реА рдЖрд╣реЗ рдЖрдгрд┐ рд╕реНрддреНрд░реЛрдд рдХреЛрдбрдордзреНрдпреЗ рдкреНрд░рд╡реЗрд╢ рдХреЗрд▓реНрдпрд╛рд╢рд┐рд╡рд╛рдп рд╡рд┐рд╕реНрддрд╛рд░рд┐рдд рдХреЗрд▓реА рдЬрд╛рдК рд╢рдХрдд рдирд╛рд╣реА (рдЕрдЪрд╛рдирдХ рдореА рдХрд╛рд╣реАрддрд░реА рдЧреБрдЧрд▓ рдХреЗрд▓реЗ рдирд╛рд╣реА рддрд░, рдХреГрдкрдпрд╛ рдорд▓рд╛ рджреБрд░реБрд╕реНрдд рдХрд░рд╛), рдкрд░рдВрддреБ рдХрд╛рд╣реАрд╣реА рдЖрдореНрд╣рд╛рд▓рд╛ рдХреНрд░реЗрдбрд┐рдЯреНрд╕ рдорд┐рд│рдгреНрдпрд╛рдкрд╛рд╕реВрди рд░реЛрдЦрдгрд╛рд░ рдирд╛рд╣реА рдирд╛рд╡

рдЖрдкрдг рдПрдХрд╛рдЪ рдирд╛рд╡рд╛рдиреЗ рдЕрдиреЗрдХ рдХрдиреЗрдХреНрд╢рди рджреЗрдЦреАрд▓ рдХрд░реВ рд╢рдХрддрд╛: рдпрд╛ рдкреНрд░рдХрд░рдгрд╛рдд, рдкрджреНрдзрдд BaseHook.get_connection(), рдЬреЗ рдЖрдореНрд╣рд╛рд▓рд╛ рдирд╛рд╡рд╛рдиреЗ рдХрдиреЗрдХреНрд╢рди рдорд┐рд│рд╡рддреЗ, рджреЗрдИрд▓ рдпрд╛рджреГрдЪреНрдЫрд┐рдХ рдЕрдиреЗрдХ рдирд╛рд╡рд╛рдВрдордзреВрди (рд░рд╛рдКрдВрдб рд░реЙрдмрд┐рди рдмрдирд╡рдгреЗ рдЕрдзрд┐рдХ рддрд░реНрдХрд╕рдВрдЧрдд рдЕрд╕реЗрд▓, рдкрд░рдВрддреБ рдПрдЕрд░рдлреНрд▓реЛ рдбреЗрд╡реНрд╣рд▓рдкрд░рдЪреНрдпрд╛ рд╡рд┐рд╡реЗрдХрд╛рд╡рд░ рд╕реЛрдбреВрдпрд╛).

рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕ рдЖрдгрд┐ рдХрдиреЗрдХреНрд╢рди рд╣реА рдирдХреНрдХреАрдЪ рдЫрд╛рди рд╕рд╛рдзрдиреЗ рдЖрд╣реЗрдд, рдкрд░рдВрддреБ рд╢рд┐рд▓реНрд▓рдХ рди рдЧрдорд╛рд╡рдгреЗ рдорд╣рддреНрд╡рд╛рдЪреЗ рдЖрд╣реЗ: рддреБрдордЪреНрдпрд╛ рдкреНрд░рд╡рд╛рд╣рд╛рдЪреЗ рдХреЛрдгрддреЗ рднрд╛рдЧ рддреБрдореНрд╣реА рдХреЛрдбрдордзреНрдпреЗрдЪ рд╕рд╛рдард╡рддрд╛ рдЖрдгрд┐ рд╕реНрдЯреЛрд░реЗрдЬрд╕рд╛рдареА рддреБрдореНрд╣реА рдПрдЕрд░рдлреНрд▓реЛрд▓рд╛ рдХреЛрдгрддреЗ рднрд╛рдЧ рджреЗрддрд╛. рдПрдХреАрдХрдбреЗ, UI рджреНрд╡рд╛рд░реЗ рддреНрд╡рд░реАрдд рдореВрд▓реНрдп рдмрджрд▓рдгреЗ рд╕реЛрдпреАрдЪреЗ рдЕрд╕реВ рд╢рдХрддреЗ, рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рдореЗрд▓рд┐рдВрдЧ рдмреЙрдХреНрд╕. рджреБрд╕рд░реАрдХрдбреЗ, рд╣реЗ рдЕрдЬреВрдирд╣реА рдорд╛рдКрд╕ рдХреНрд▓рд┐рдХрд╡рд░ рдкрд░рдд рдпреЗрдгреЗ рдЖрд╣реЗ, рдЬреНрдпрд╛рдкрд╛рд╕реВрди рдЖрдореНрд╣рд╛рд▓рд╛ (рдореА) рд╕реБрдЯрдХрд╛ рд╣рд╡реА рд╣реЛрддреА.

рдХрдиреЗрдХреНрд╢рдирд╕рд╣ рдХрд╛рд░реНрдп рдХрд░рдгреЗ рд╣реЗ рдХрд╛рд░реНрдпрд╛рдВрдкреИрдХреА рдПрдХ рдЖрд╣реЗ рд╣реБрдХ. рд╕рд░реНрд╡рд╕рд╛рдзрд╛рд░рдгрдкрдгреЗ, рдПрдЕрд░рдлреНрд▓реЛ рд╣реБрдХ рд╣реЗ рддреГрддреАрдп-рдкрдХреНрд╖ рд╕реЗрд╡рд╛ рдЖрдгрд┐ рд▓рд╛рдпрдмреНрд░рд░реАрд╢реА рдХрдиреЗрдХреНрдЯ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдкреЙрдЗрдВрдЯ рдЖрд╣реЗрдд. рдЙрджрд╛. JiraHook рдЬрд┐рд░рд╛рд╢реА рд╕рдВрд╡рд╛рдж рд╕рд╛рдзрдгреНрдпрд╛рд╕рд╛рдареА рдЖрдордЪреНрдпрд╛рд╕рд╛рдареА рдХреНрд▓рд╛рдпрдВрдЯ рдЙрдШрдбреЗрд▓ (рддреБрдореНрд╣реА рдХрд╛рдореЗ рдкреБрдвреЗ-рдорд╛рдЧреЗ рд╣рд▓рд╡реВ рд╢рдХрддрд╛), рдЖрдгрд┐ рддреНрдпрд╛рдЪреНрдпрд╛ рдорджрддреАрдиреЗ SambaHook рддреБрдореНрд╣реА рд╕реНрдерд╛рдирд┐рдХ рдлрд╛рдЗрд▓ рдкреБрд╢ рдХрд░реВ рд╢рдХрддрд╛ smb-рдмрд┐рдВрджреВ.

рд╕рд╛рдиреБрдХреВрд▓ рдСрдкрд░реЗрдЯрд░рдЪреЗ рд╡рд┐рд╢реНрд▓реЗрд╖рдг

рдЖрдгрд┐ рдЖрдореНрд╣реА рддреЗ рдХрд╕реЗ рдмрдирд╡рд▓реЗ рдЖрд╣реЗ рддреЗ рдкрд╛рд╣рдгреНрдпрд╛рдЪреНрдпрд╛ рдЬрд╡рд│ рдкреЛрд╣реЛрдЪрд▓реЛ TelegramBotSendMessage

рдХреЛрдб commons/operators.py рд╡рд╛рд╕реНрддрд╡рд┐рдХ рдСрдкрд░реЗрдЯрд░рд╕рд╣:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

рдпреЗрдереЗ, рдПрдЕрд░рдлреНрд▓реЛрдордзреАрд▓ рдЗрддрд░ рд╕рд░реНрд╡ рдЧреЛрд╖реНрдЯреАрдВрдкреНрд░рдорд╛рдгреЗ, рд╕рд░реНрд╡рдХрд╛рд╣реА рдЕрдЧрджреА рд╕реЛрдкреЗ рдЖрд╣реЗ:

  • рдХрдбреВрди рд╡рд╛рд░рд╕рд╛ рдорд┐рд│рд╛рд▓рд╛ BaseOperator, рдЬреЗ рдХрд╛рд╣реА рдПрдЕрд░рдлреНрд▓реЛ-рд╡рд┐рд╢рд┐рд╖реНрдЯ рдЧреЛрд╖реНрдЯреА рд▓рд╛рдЧреВ рдХрд░рддреЗ (рддреБрдордЪреНрдпрд╛ рд╡рд┐рд╢реНрд░рд╛рдВрддреАрдХрдбреЗ рдкрд╣рд╛)
  • рдШреЛрд╖рд┐рдд рдлреАрд▓реНрдб template_fields, рдЬреНрдпрд╛рдордзреНрдпреЗ Jinja рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдореЕрдХреНрд░реЛ рд╢реЛрдзреЗрд▓.
  • рд╕рд╛рдареА рдпреЛрдЧреНрдп рдпреБрдХреНрддрд┐рд╡рд╛рдж рдорд╛рдВрдбрд▓реЗ __init__(), рдЖрд╡рд╢реНрдпрдХ рддреЗрдереЗ рдбреАрдлреЙрд▓реНрдЯ рд╕реЗрдЯ рдХрд░рд╛.
  • рдЖрдореНрд╣реА рдкреВрд░реНрд╡рдЬрд╛рдВрдЪреНрдпрд╛ рдкреНрд░рд╛рд░рдВрднрд╛рдмрджреНрджрд▓ рджреЗрдЦреАрд▓ рд╡рд┐рд╕рд░рд▓реЛ рдирд╛рд╣реА.
  • рд╕рдВрдмрдВрдзрд┐рдд рд╣реБрдХ рдЙрдШрдбрд▓рд╛ TelegramBotHookрддреНрдпрд╛рддреВрди рдХреНрд▓рд╛рдпрдВрдЯ рдСрдмреНрдЬреЗрдХреНрдЯ рдкреНрд░рд╛рдкреНрдд рдЭрд╛рд▓рд╛.
  • рдЕрдзрд┐рд▓рд┐рдЦрд┐рдд (рдкреБрдиреНрд╣рд╛ рдкрд░рд┐рднрд╛рд╖рд┐рдд) рдкрджреНрдзрдд BaseOperator.execute(), рдЬреЗрд╡реНрд╣рд╛ рдСрдкрд░реЗрдЯрд░ рд▓реЙрдиреНрдЪ рдХрд░рдгреНрдпрд╛рдЪреА рд╡реЗрд│ рдпреЗрдИрд▓ рддреЗрд╡реНрд╣рд╛ рдХреЛрдгрддрд╛ Airfow рдЯреНрд╡рд┐рдЪ рдХрд░реЗрд▓ - рддреНрдпрд╛рдд рдЖрдореНрд╣реА рд▓реЙрдЧ рдЗрди рдХрд░рдгреЗ рд╡рд┐рд╕рд░реВрди рдореБрдЦреНрдп рдХреНрд░рд┐рдпрд╛ рдЕрдВрдорд▓рд╛рдд рдЖрдгреВ. (рдЖрдореНрд╣реА рд▓реЙрдЧ рдЗрди рдХрд░рддреЛ, рддрд╕реЗ, рдЕрдЧрджреА рдЖрдд stdout ╨╕ stderr - рдПрдЕрд░рдлреНрд▓реЛ рд╕рд░реНрд╡рдХрд╛рд╣реА рд░реЛрдЦреЗрд▓, рддреЗ рд╕реБрдВрджрд░рдкрдгреЗ рдЧреБрдВрдбрд╛рд│реЗрд▓, рдЖрд╡рд╢реНрдпрдХ рдЕрд╕реЗрд▓ рддреЗрдереЗ рддреЗ рд╡рд┐рдШрдЯрд┐рдд рдХрд░реЗрд▓.)

рдЖрдкрдг рдХрд╛рдп рдЖрд╣реЗ рддреЗ рдкрд╛рд╣реВрдпрд╛ commons/hooks.py. рдлрд╛рдЗрд▓рдЪрд╛ рдкрд╣рд┐рд▓рд╛ рднрд╛рдЧ, рд╣реБрдХрд╕рд╣:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

рдорд▓рд╛ рдпреЗрдереЗ рдХрд╛рдп рд╕реНрдкрд╖реНрдЯ рдХрд░рд╛рд╡реЗ рд╣реЗ рджреЗрдЦреАрд▓ рдорд╛рд╣рд┐рдд рдирд╛рд╣реА, рдореА рдлрдХреНрдд рдорд╣рддреНрд╡рд╛рдЪреЗ рдореБрджреНрджреЗ рд▓рдХреНрд╖рд╛рдд рдареЗрд╡рддреЛ:

  • рдЖрдореНрд╣реА рд╡рд╛рд░рд╕рд╛ рдШреЗрддреЛ, рд╡рд┐рддрд░реНрдХрд╛рдВрдЪрд╛ рд╡рд┐рдЪрд╛рд░ рдХрд░рд╛ - рдмрд╣реБрддреЗрдХ рдкреНрд░рдХрд░рдгрд╛рдВрдордзреНрдпреЗ рддреЗ рдПрдХ рдЕрд╕реЗрд▓: conn_id;
  • рдорд╛рдирдХ рдкрджреНрдзрддреА рдУрд╡реНрд╣рд░рд░рд╛рдЗрдб рдХрд░рдгреЗ: рдореА рд╕реНрд╡рддрдГрд▓рд╛ рдорд░реНрдпрд╛рджрд┐рдд рдХреЗрд▓реЗ get_conn(), рдЬреНрдпрд╛рдордзреНрдпреЗ рдорд▓рд╛ рдирд╛рд╡рд╛рдиреБрд╕рд╛рд░ рдХрдиреЗрдХреНрд╢рди рдкреЕрд░рд╛рдореАрдЯрд░реНрд╕ рдорд┐рд│рддрд╛рдд рдЖрдгрд┐ рдлрдХреНрдд рд╡рд┐рднрд╛рдЧ рдорд┐рд│рддреЛ extra (рд╣реЗ рдПрдХ JSON рдлреАрд▓реНрдб рдЖрд╣реЗ), рдЬреНрдпрд╛рдордзреНрдпреЗ рдореА (рдорд╛рдЭреНрдпрд╛ рд╕реНрд╡рддрдГрдЪреНрдпрд╛ рд╕реВрдЪрдирд╛рдВрдиреБрд╕рд╛рд░!) рдЯреЗрд▓реАрдЧреНрд░рд╛рдо рдмреЙрдЯ рдЯреЛрдХрди рдареЗрд╡рд▓реЗ рдЖрд╣реЗ: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • рдореА рдЖрдордЪреЗ рдПрдХ рдЙрджрд╛рд╣рд░рдг рддрдпрд╛рд░ рдХрд░рддреЛ TelegramBot, рддреНрдпрд╛рд▓рд╛ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдЯреЛрдХрди рджреЗрдКрди.

рдЗрддрдХрдВрдЪ. рдЖрдкрдг рд╡рд╛рдкрд░реВрди рд╣реБрдХ рдкрд╛рд╕реВрди рдХреНрд▓рд╛рдпрдВрдЯ рдорд┐рд│рд╡реВ рд╢рдХрддрд╛ TelegramBotHook().clent рдХрд┐рдВрд╡рд╛ TelegramBotHook().get_conn().

рдЖрдгрд┐ рдлрд╛рдЗрд▓рдЪрд╛ рджреБрд╕рд░рд╛ рднрд╛рдЧ, рдЬреНрдпрд╛рдордзреНрдпреЗ рдореА рдЯреЗрд▓реАрдЧреНрд░рд╛рдо REST API рд╕рд╛рдареА рдорд╛рдпрдХреНрд░реЛрд░реЕрдкрд░ рдмрдирд╡рддреЛ, рдЬреЗрдгреЗрдХрд░реВрди рддреЗ рдбреНрд░реЕрдЧ рд╣реЛрдК рдирдпреЗ. python-telegram-bot рдПрдХрд╛ рдкрджреНрдзрддреАрд╕рд╛рдареА sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

рд╣реЗ рд╕рд░реНрд╡ рдЬреЛрдбрдгреНрдпрд╛рдЪрд╛ рдпреЛрдЧреНрдп рдорд╛рд░реНрдЧ рдЖрд╣реЗ: TelegramBotSendMessage, TelegramBotHook, TelegramBot - рдкреНрд▓рдЧрдЗрдирдордзреНрдпреЗ, рд╕рд╛рд░реНрд╡рдЬрдирд┐рдХ рднрд╛рдВрдбрд╛рд░рд╛рдд рдареЗрд╡рд╛ рдЖрдгрд┐ рддреЗ рдореБрдХреНрдд рд╕реНрддреНрд░реЛрддрд╛рд▓рд╛ рджреНрдпрд╛.

рдЖрдореНрд╣реА рдпрд╛ рд╕рд░реНрд╡рд╛рдВрдЪрд╛ рдЕрднреНрдпрд╛рд╕ рдХрд░рдд рдЕрд╕рддрд╛рдирд╛, рдЖрдордЪреЗ рдЕрд╣рд╡рд╛рд▓ рдЕрдкрдбреЗрдЯ рдпрд╢рд╕реНрд╡реАрд░рд┐рддреНрдпрд╛ рдЕрдпрд╢рд╕реНрд╡реА рдЭрд╛рд▓реЗ рдЖрдгрд┐ рдорд▓рд╛ рдЪреЕрдиреЗрд▓рдордзреНрдпреЗ рдПрдХ рддреНрд░реБрдЯреА рд╕рдВрджреЗрд╢ рдкрд╛рдард╡рд▓рд╛. рддреЗ рдЪреБрдХреАрдЪреЗ рдЖрд╣реЗ рдХрд╛ рддреЗ рдореА рддрдкрд╛рд╕рдгрд╛рд░ рдЖрд╣реЗ...

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ
рдЖрдордЪреНрдпрд╛ рдХреБрддреНрд░реНрдпрд╛рдд рдХрд╛рд╣реАрддрд░реА рддреБрдЯрд▓реЗ! рдЖрдореНрд╣рд╛рд▓рд╛ рддреЗрдЪ рдЕрдкреЗрдХреНрд╖рд┐рдд рд╣реЛрддрдВ рдирд╛? рдирдХреНрдХреА!

рдУрддрдгрд╛рд░ рдЖрд╣рд╛рдд рдХрд╛?

рдорд╛рдЭреЗ рдХрд╛рд╣реА рдЪреБрдХрд▓реЗ рдЕрд╕реЗ рддреБрдореНрд╣рд╛рд▓рд╛ рд╡рд╛рдЯрддреЗ рдХрд╛? рдЕрд╕реЗ рджрд┐рд╕рддреЗ рдХреА рддреНрдпрд╛рдиреЗ рдПрд╕рдХреНрдпреВрдПрд▓ рд╕рд░реНрд╡реНрд╣рд░рд╡рд░реВрди рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛рдордзреНрдпреЗ рдбреЗрдЯрд╛ рд╣рд╕реНрддрд╛рдВрддрд░рд┐рдд рдХрд░рдгреНрдпрд╛рдЪреЗ рд╡рдЪрди рджрд┐рд▓реЗ рд╣реЛрддреЗ, рдЖрдгрд┐ рдирдВрддрд░ рддреНрдпрд╛рдиреЗ рддреЛ рдШреЗрддрд▓рд╛ рдЖрдгрд┐ рд╡рд┐рд╖рдп рд╕реЛрдбреВрди рджрд┐рд▓рд╛, рдмрджрдорд╛рд╢!

рд╣рд╛ рдЕрддреНрдпрд╛рдЪрд╛рд░ рд╣реЗрддреБрдкреБрд░рд╕реНрд╕рд░ рд╣реЛрддрд╛, рдорд▓рд╛ рддреБрдордЪреНрдпрд╛рд╕рд╛рдареА рдХрд╛рд╣реА рд╢рдмреНрджрд╛рд╡рд▓реА рдЙрд▓рдЧрдбрд╛рдпрдЪреА рд╣реЛрддреА. рдЖрддрд╛ рдЖрдкрдг рдкреБрдвреЗ рдЬрд╛рдК рд╢рдХрддрд╛.

рдЖрдордЪреА рдпреЛрдЬрдирд╛ рдЕрд╢реА рд╣реЛрддреА:

  1. рдбреЗрдЧ рдХрд░рд╛
  2. рдХрд╛рд░реНрдпреЗ рд╡реНрдпреБрддреНрдкрдиреНрди рдХрд░рд╛
  3. рд╕рд░реНрд╡ рдХрд╛рд╣реА рдХрд┐рддреА рд╕реБрдВрджрд░ рдЖрд╣реЗ рддреЗ рдкрд╣рд╛
  4. рднрд░рдгреНрдпрд╛рд╕рд╛рдареА рд╕рддреНрд░ рдХреНрд░рдорд╛рдВрдХ рдирд┐рдпреБрдХреНрдд рдХрд░рд╛
  5. SQL рд╕рд░реНрд╡реНрд╣рд░ рд╡рд░реВрди рдбреЗрдЯрд╛ рдорд┐рд│рд╡рд╛
  6. рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛ рдордзреНрдпреЗ рдбреЗрдЯрд╛ рдареЗрд╡рд╛
  7. рдЖрдХрдбреЗрд╡рд╛рд░реА рдЧреЛрд│рд╛ рдХрд░рд╛

рдореНрд╣рдгреВрди, рд╣реЗ рд╕рд░реНрд╡ рдЪрд╛рд▓реВ рдареЗрд╡рдгреНрдпрд╛рд╕рд╛рдареА, рдореА рдЖрдордЪреНрдпрд╛рдордзреНрдпреЗ рдПрдХ рдЫреЛрдЯреАрд╢реА рднрд░ рдЯрд╛рдХрд▓реА docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

рддреЗрдереЗ рдЖрдореНрд╣реА рд╡рд╛рдврд╡рддреЛ:

  • рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛ рдпрдЬрдорд╛рди рдореНрд╣рдгреВрди dwh рд╕рд░реНрд╡рд╛рдд рдбреАрдлреЙрд▓реНрдЯ рд╕реЗрдЯрд┐рдВрдЧреНрдЬрд╕рд╣,
  • SQL рд╕рд░реНрд╡реНрд╣рд░рдЪреА рддреАрди рдЙрджрд╛рд╣рд░рдгреЗ,
  • рдЖрдореНрд╣реА рдирдВрддрд░рдЪреЗ рдбреЗрдЯрд╛рдмреЗрд╕ рдХрд╛рд╣реА рдбреЗрдЯрд╛рд╕рд╣ рднрд░рддреЛ (рдХреЛрдгрддреНрдпрд╛рд╣реА рдкрд░рд┐рд╕реНрдерд┐рддреАрдд рдкрд╛рд╣реВ рдирдХрд╛ mssql_init.py!)

рдЖрдореНрд╣реА рдорд╛рдЧреАрд▓ рд╡реЗрд│реЗрдкреЗрдХреНрд╖рд╛ рдХрд┐рдВрдЪрд┐рдд рдЕрдзрд┐рдХ рдХреНрд▓рд┐рд╖реНрдЯ рдХрдорд╛рдВрдбрдЪреНрдпрд╛ рдорджрддреАрдиреЗ рд╕рд░реНрд╡ рдЪрд╛рдВрдЧрд▓реЗ рд▓реЙрдиреНрдЪ рдХрд░рддреЛ:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

рдЖрдордЪреНрдпрд╛ рдЪрдорддреНрдХрд╛рд░реА рдпрд╛рджреГрдЪреНрдЫрд┐рдХрддреЗрдиреЗ рдХрд╛рдп рддрдпрд╛рд░ рдХреЗрд▓реЗ, рдЖрдкрдг рдЖрдпрдЯрдо рд╡рд╛рдкрд░реВ рд╢рдХрддрд╛ Data Profiling/Ad Hoc Query:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ
рдореБрдЦреНрдп рдЧреЛрд╖реНрдЯ рд╡рд┐рд╢реНрд▓реЗрд╖рдХрд╛рдВрдирд╛ рджрд╛рдЦрд╡рдгреЗ рдирд╛рд╣реА

рдЪреНрдпрд╛ рд╡рд░ рддрдкрд╢реАрд▓рд╡рд╛рд░ ETL рд╕рддреНрд░реЗ рдореА рдХрд░рдгрд╛рд░ рдирд╛рд╣реА, рддрд┐рдереЗ рд╕рд░реНрд╡ рдХрд╛рд╣реА рдХреНрд╖реБрд▓реНрд▓рдХ рдЖрд╣реЗ: рдЖрдореНрд╣реА рдПрдХ рдЖрдзрд╛рд░ рдмрдирд╡рддреЛ, рддреНрдпрд╛рдд рдПрдХ рдЪрд┐рдиреНрд╣ рдЖрд╣реЗ, рдЖрдореНрд╣реА рдкреНрд░рддреНрдпреЗрдХ рдЧреЛрд╖реНрдЯ рд╕рдВрджрд░реНрдн рд╡реНрдпрд╡рд╕реНрдерд╛рдкрдХрд╛рд╕рд╣ рдЧреБрдВрдбрд╛рд│рддреЛ рдЖрдгрд┐ рдЖрддрд╛ рдЖрдореНрд╣реА рд╣реЗ рдХрд░рддреЛ:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

рд╡реЗрд│ рдЖрд▓реА рдЖрд╣реЗ рдЖрдордЪрд╛ рдбреЗрдЯрд╛ рдЧреЛрд│рд╛ рдХрд░рд╛ рдЖрдордЪреНрдпрд╛ рджреАрдбрд╢реЗ рдЯреЗрдмрд▓рд╛рдВрд╡рд░реВрди. рдЪрд▓рд╛ рд╣реЗ рдЕрдЧрджреА рдирдореНрд░ рдУрд│реАрдВрдЪреНрдпрд╛ рдорджрддреАрдиреЗ рдХрд░реВрдпрд╛:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. рд╣реБрдХрдЪреНрдпрд╛ рдорджрддреАрдиреЗ рдЖрдореНрд╣реА рдПрдЕрд░рдлреНрд▓реЛрдордзреВрди рдорд┐рд│рд╡рддреЛ pymssql- рдХрдиреЗрдХреНрдЯ рдХрд░рд╛
  2. рдЪрд▓рд╛ рд╡рд┐рдирдВрддреАрдордзреНрдпреЗ рддрд╛рд░рдЦреЗрдЪреНрдпрд╛ рд░реВрдкрд╛рдд рдкреНрд░рддрд┐рдмрдВрдз рдмрджрд▓реВ - рддреЗ рдЯреЗрдореНрдкрд▓реЗрдЯ рдЗрдВрдЬрд┐рдирджреНрд╡рд╛рд░реЗ рдлрдВрдХреНрд╢рдирдордзреНрдпреЗ рдЯрд╛рдХрд▓реЗ рдЬрд╛рдИрд▓.
  3. рдЖрдордЪреНрдпрд╛ рд╡рд┐рдирдВрддреАрд▓рд╛ рдЖрд╣рд╛рд░ рджреЗрдгреЗ pandasрдХреЛрдг рдЖрдореНрд╣рд╛рд▓рд╛ рдорд┐рд│реЗрд▓ DataFrame - рднрд╡рд┐рд╖реНрдпрд╛рдд рддреЗ рдЖрдореНрд╣рд╛рд▓рд╛ рдЙрдкрдпреБрдХреНрдд рдард░реЗрд▓.

рдореА рдкреНрд░рддрд┐рд╕реНрдерд╛рдкрди рд╡рд╛рдкрд░рдд рдЖрд╣реЗ {dt} рд╡рд┐рдирдВрддреА рдкреЕрд░рд╛рдореАрдЯрд░ рдРрд╡рдЬреА %s рдореА рдПрдХ рджреБрд╖реНрдЯ рдкрд┐рдиреЛрдЪрд┐рдпреЛ рдЖрд╣реЗ рдореНрд╣рдгреВрди рдирд╛рд╣реА рддрд░ pandas рд╣рд╛рддрд╛рд│реВ рд╢рдХрдд рдирд╛рд╣реА pymssql рдЖрдгрд┐ рд╢реЗрд╡рдЯрдЪрд╛ рд╕рд░рдХрддреЛ params: ListрдЬрд░реА рддреНрдпрд╛рд▓рд╛ рдЦрд░реЛрдЦрд░ рд╣рд╡реЗ рдЖрд╣реЗ tuple.
рд╣реЗ рджреЗрдЦреАрд▓ рд▓рдХреНрд╖рд╛рдд рдареЗрд╡рд╛ рдХреА рд╡рд┐рдХрд╕рдХ pymssql рддреНрдпрд╛рд▓рд╛ рдпрд╛рдкреБрдвреЗ рдкрд╛рдард┐рдВрдмрд╛ рди рджреЗрдгреНрдпрд╛рдЪрд╛ рдирд┐рд░реНрдгрдп рдШреЗрддрд▓рд╛ рдЖрдгрд┐ рдЖрддрд╛ рдмрд╛рд╣реЗрд░ рдкрдбрдгреНрдпрд╛рдЪреА рд╡реЗрд│ рдЖрд▓реА рдЖрд╣реЗ pyodbc.

рдПрдЕрд░рдлреНрд▓реЛрдиреЗ рдЖрдордЪреНрдпрд╛ рдлрдВрдХреНрд╢рдиреНрд╕рдЪреЗ рдпреБрдХреНрддрд┐рд╡рд╛рдж рдХрд╢рд╛рд╕рд╣ рднрд░рд▓реЗ рддреЗ рдкрд╛рд╣реВрдпрд╛:

Apache Airflow: ETL рд╕реБрд▓рдн рдХрд░рдгреЗ

рдЬрд░ рдбреЗрдЯрд╛ рдирд╕реЗрд▓, рддрд░ рдкреБрдвреЗ рдЪрд╛рд▓реВ рдареЗрд╡рдгреНрдпрд╛рдд рдХрд╛рд╣реА рдЕрд░реНрде рдирд╛рд╣реА. рдкрд░рдВрддреБ рднрд░рдгреЗ рдпрд╢рд╕реНрд╡реА рдорд╛рдирдгреЗ рджреЗрдЦреАрд▓ рд╡рд┐рдЪрд┐рддреНрд░ рдЖрд╣реЗ. рдкрдг рд╣реА рдЪреВрдХ рдирд╛рд╣реА. рдП-рдЖрд╣-рдЖрд╣, рдХрд╛рдп рдХрд░рд╛рд╡реЗ ?! рдЖрдгрд┐ рдпреЗрдереЗ рдХрд╛рдп рдЖрд╣реЗ:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException рдПрдЕрд░рдлреНрд▓реЛрд▓рд╛ рд╕рд╛рдВрдЧрддреЗ рдХреА рдХреЛрдгрддреНрдпрд╛рд╣реА рддреНрд░реБрдЯреА рдирд╛рд╣реАрдд, рдкрд░рдВрддреБ рдЖрдореНрд╣реА рдХрд╛рд░реНрдп рд╡рдЧрд│рддреЛ. рдЗрдВрдЯрд░рдлреЗрд╕рдордзреНрдпреЗ рд╣рд┐рд░рд╡рд╛ рдХрд┐рдВрд╡рд╛ рд▓рд╛рд▓ рдЪреМрд░рд╕ рдирд╕реВрди рдЧреБрд▓рд╛рдмреА рдЕрд╕реЗрд▓.

рдЪрд▓рд╛ рдЖрдордЪрд╛ рдбреЗрдЯрд╛ рдЯрд╛рдХреВрдпрд╛ рдПрдХрд╛рдзрд┐рдХ рд╕реНрддрдВрдн:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

рдореНрд╣рдгрдЬреЗ:

  • рдЖрдореНрд╣реА рдЬреНрдпрд╛ рдбреЗрдЯрд╛рдмреЗрд╕рдордзреВрди рдСрд░реНрдбрд░ рдШреЗрддрд▓реА,
  • рдЖрдордЪреНрдпрд╛ рдкреВрд░ рд╕рддреНрд░рд╛рдЪрд╛ рдЖрдпрдбреА (рддреЛ рд╡реЗрдЧрд│рд╛ рдЕрд╕реЗрд▓ рдкреНрд░рддреНрдпреЗрдХ рдХрд╛рдорд╛рд╕рд╛рдареА),
  • рд╕реНрддреНрд░реЛрдд рдЖрдгрд┐ рдСрд░реНрдбрд░ рдЖрдпрдбреА рд╡рд░реВрди рд╣реЕрд╢ - рдЬреЗрдгреЗрдХрд░реВрди рдЕрдВрддрд┐рдо рдбреЗрдЯрд╛рдмреЗрд╕рдордзреНрдпреЗ (рдЬреЗрдереЗ рд╕рд░реНрд╡ рдХрд╛рд╣реА рдПрдХрд╛ рдЯреЗрдмрд▓рдордзреНрдпреЗ рдУрддрд▓реЗ рдЬрд╛рддреЗ) рдЖрдордЪреНрдпрд╛рдХрдбреЗ рдПрдХ рдЕрджреНрд╡рд┐рддреАрдп рдСрд░реНрдбрд░ рдЖрдпрдбреА рдЖрд╣реЗ.

рд╢реЗрд╡рдЯрдЪреА рдкрд╛рдпрд░реА рд╢рд┐рд▓реНрд▓рдХ рдЖрд╣реЗ: рд╕рд░реНрд╡ рдХрд╛рд╣реА рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛рдордзреНрдпреЗ рдШрд╛рд▓рд╛. рдЖрдгрд┐, рд╡рд┐рдЪрд┐рддреНрд░рдкрдгреЗ, рд╣реЗ рдХрд░рдгреНрдпрд╛рдЪрд╛ рд╕рд░реНрд╡рд╛рдд рдиреЗрддреНрд░рджреАрдкрдХ рдЖрдгрд┐ рдХрд╛рд░реНрдпрдХреНрд╖рдо рдорд╛рд░реНрдЧрд╛рдВрдкреИрдХреА рдПрдХ рдореНрд╣рдгрдЬреЗ CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. рдЖрдореНрд╣реА рдПрдХ рд╡рд┐рд╢реЗрд╖ рд░рд┐рд╕реАрд╡реНрд╣рд░ рдмрдирд╡рдд рдЖрд╣реЛрдд StringIO.
  2. pandas рдХреГрдкрдпрд╛ рдЖрдордЪреНрдпрд╛ рдареЗрд╡реВ DataFrame рдлреЙрд░реНрдо рдордзреНрдпреЗ CSV-рд░реЗрд╖рд╛.
  3. рдЪрд▓рд╛ рдЖрдордЪреНрдпрд╛ рдЖрд╡рдбрддреНрдпрд╛ рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛рд▓рд╛ рд╣реБрдХрд╕рд╣ рдХрдиреЗрдХреНрд╢рди рдЙрдШрдбреВрдпрд╛.
  4. рдЖрдгрд┐ рдЖрддрд╛ рдорджрддреАрдиреЗ copy() рдЖрдордЪрд╛ рдбреЗрдЯрд╛ рдереЗрдЯ Vertika рд╡рд░ рдкрд╛рдард╡рд╛!

рдЖрдореНрд╣реА рдбреНрд░рд╛рдпрд╡реНрд╣рд░рдХрдбреВрди рдХрд┐рддреА рдУрд│реА рднрд░рд▓реНрдпрд╛ рдЖрд╣реЗрдд рддреЗ рдШреЗрдК рдЖрдгрд┐ рд╕рддреНрд░ рд╡реНрдпрд╡рд╕реНрдерд╛рдкрдХрд╛рд▓рд╛ рд╕рд╛рдВрдЧреВ рдХреА рд╕рд░реНрд╡ рдХрд╛рд╣реА рдареАрдХ рдЖрд╣реЗ:

session.loaded_rows = cursor.rowcount
session.successful = True

рдПрд╡рдвреЗрдЪ.

рд╡рд┐рдХреНрд░реАрд╡рд░, рдЖрдореНрд╣реА рд▓рдХреНрд╖реНрдп рдкреНрд▓реЗрдЯ рд╕реНрд╡рддрдГ рддрдпрд╛рд░ рдХрд░рддреЛ. рдпреЗрдереЗ рдореА рд╕реНрд╡рдд: рд▓рд╛ рдПрдХ рд▓рд╣рд╛рди рдорд╢реАрди рдкрд░рд╡рд╛рдирдЧреА рджрд┐рд▓реА:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

рдореА рд╡рд╛рдкрд░рдд рдЖрд╣реЗ VerticaOperator() рдореА рдбреЗрдЯрд╛рдмреЗрд╕ рд╕реНрдХреАрдорд╛ рдЖрдгрд┐ рдЯреЗрдмрд▓ рддрдпрд╛рд░ рдХрд░рддреЛ (рдЬрд░ рддреЗ рдЖрдзреАрдкрд╛рд╕реВрди рдЕрд╕реНрддрд┐рддреНрд╡рд╛рдд рдирд╕реЗрд▓ рддрд░ рдирдХреНрдХреАрдЪ). рдореБрдЦреНрдп рдЧреЛрд╖реНрдЯ рдореНрд╣рдгрдЬреЗ рдЕрд╡рд▓рдВрдмрдирд╛рдВрдЪреА рдпреЛрдЧреНрдпрд░рд┐рддреНрдпрд╛ рд╡реНрдпрд╡рд╕реНрдерд╛ рдХрд░рдгреЗ:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

рдЧреЛрд│рд╛ рдХрд░реАрдд рдЖрд╣реЗ

- рдмрд░рдВ, - рд▓рд╣рд╛рди рдЙрдВрджреАрд░ рдореНрд╣рдгрд╛рд▓рд╛, - рдЖрддрд╛ рдирд╛рд╣реА рдХрд╛
рдореА рдЬрдВрдЧрд▓рд╛рддреАрд▓ рд╕рд░реНрд╡рд╛рдд рднрдпрдВрдХрд░ рдкреНрд░рд╛рдгреА рдЖрд╣реЗ рдпрд╛рдЪреА рддреБрдореНрд╣рд╛рд▓рд╛ рдЦрд╛рддреНрд░реА рдкрдЯрд▓реА рдЖрд╣реЗ рдХрд╛?

рдЬреНрдпреБрд▓рд┐рдпрд╛ рдбреЛрдирд╛рд▓реНрдбрд╕рди, рдж рдЧреНрд░реБрдлреЗрд▓реЛ

рдорд▓рд╛ рд╡рд╛рдЯрддреЗ рдХреА рдорд╛рдЭреНрдпрд╛ рд╕рд╣рдХрд╛рд░реНтАНрдпрд╛рдВрдордзреНрдпреЗ рдЖрдгрд┐ рдорд╛рдЭреНрдпрд╛рдд рд╕реНрдкрд░реНрдзрд╛ рдЕрд╕реЗрд▓ рддрд░: рдХреЛрдг рдкрдЯрдХрди рд╕реБрд░рд╡рд╛рддреАрдкрд╛рд╕реВрди ETL рдкреНрд░рдХреНрд░рд┐рдпрд╛ рддрдпрд╛рд░ рдХрд░реЗрд▓ рдЖрдгрд┐ рд▓реЙрдиреНрдЪ рдХрд░реЗрд▓: рддреЗ рддреНрдпрд╛рдВрдЪреНрдпрд╛ SSIS рдЖрдгрд┐ рдорд╛рдЙрд╕рд╕рд╣ рдЖрдгрд┐ рдореА рдПрдЕрд░рдлреНрд▓реЛрд╕рд╣ ... рдЖрдгрд┐ рдордЧ рдЖрдореНрд╣реА рджреЗрдЦрднрд╛рд▓ рд╕реБрд▓рднрддреЗрдЪреА рддреБрд▓рдирд╛ рджреЗрдЦреАрд▓ рдХрд░реВ ... рд╡реНрд╡рд╛, рдорд▓рд╛ рд╡рд╛рдЯрддреЗ рдХреА рдореА рддреНрдпрд╛рдВрдирд╛ рд╕рд░реНрд╡ рдЖрдШрд╛рдбреНрдпрд╛рдВрд╡рд░ рдкрд░рд╛рднреВрдд рдХрд░реЗрди рд╣реЗ рддреБрдореНрд╣реА рдорд╛рдиреНрдп рдХрд░рд╛рд▓!

рдЬрд░ рдереЛрдбреЗ рдЕрдзрд┐рдХ рдЧрдВрднреАрд░рдкрдгреЗ рдкрд╛рд╣рд┐рд▓реЗ рддрд░, рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛ - рдкреНрд░реЛрдЧреНрд░рд╛рдо рдХреЛрдбрдЪреНрдпрд╛ рд░реВрдкрд╛рдд рдкреНрд░рдХреНрд░рд┐рдпрд╛рдВрдЪреЗ рд╡рд░реНрдгрди рдХрд░реВрди - рдорд╛рдЭреЗ рдХрд╛рдо рдХреЗрд▓реЗ рдЦреВрдк рдЕрдзрд┐рдХ рдЖрд░рд╛рдорджрд╛рдпрдХ рдЖрдгрд┐ рдЖрдирдВрджрджрд╛рдпрдХ.

рддреНрдпрд╛рдЪреА рдЕрдорд░реНрдпрд╛рджрд┐рдд рд╡рд┐рд╕реНрддрд╛рд░рдХреНрд╖рдорддрд╛, рдкреНрд▓рдЧ-рдЗрди рдЖрдгрд┐ рд╕реНрдХреЗрд▓реЗрдмрд┐рд▓рд┐рдЯреАрдЪреА рдкреВрд░реНрд╡рд╕реНрдерд┐рддреА рдпрд╛ рджреЛрдиреНрд╣реА рдмрд╛рдмрддреАрдд, рдЖрдкрд▓реНрдпрд╛рд▓рд╛ рдЬрд╡рд│рдЬрд╡рд│ рдХреЛрдгрддреНрдпрд╛рд╣реА рдХреНрд╖реЗрддреНрд░рд╛рдд рдПрдЕрд░рдлреНрд▓реЛ рд╡рд╛рдкрд░рдгреНрдпрд╛рдЪреА рд╕рдВрдзреА рджреЗрддреЗ: рдЕрдЧрджреА рдбреЗрдЯрд╛ рдЧреЛрд│рд╛ рдХрд░рдгреЗ, рддрдпрд╛рд░ рдХрд░рдгреЗ рдЖрдгрд┐ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХрд░рдгреЗ рдпрд╛ рд╕рдВрдкреВрд░реНрдг рдЪрдХреНрд░рд╛рдд, рдЕрдЧрджреА рд░реЙрдХреЗрдЯреНрд╕ (рдордВрдЧрд│рд╛рд╡рд░, рдЪреНрдпрд╛ рдкреНрд░рдХреНрд╖реЗрдкрдгрд╛рдд) рдЕрд░реНрдерд╛рдд).

рднрд╛рдЧ рдЕрдВрддрд┐рдо, рд╕рдВрджрд░реНрдн рдЖрдгрд┐ рдорд╛рд╣рд┐рддреА

рдЖрдореНрд╣реА рддреБрдордЪреНрдпрд╛рд╕рд╛рдареА рдЧреЛрд│рд╛ рдХреЗрд▓реЗрд▓рд╛ рд░реЗрдХ

  • start_date. рд╣реЛрдп, рд╣реЗ рдЖрдзреАрдЪ рд╕реНрдерд╛рдирд┐рдХ рдореЗрдо рдЖрд╣реЗ. рд╡рд╛рдпрд╛ рдбрдЧрдЪрд╛ рдореБрдЦреНрдп рдпреБрдХреНрддрд┐рд╡рд╛рдж start_date рд╕рд░реНрд╡ рдкрд╛рд╕. рдереЛрдбрдХреНрдпрд╛рдд, рдЖрдкрдг рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХреЗрд▓реНрдпрд╛рд╕ start_date рд╡рд░реНрддрдорд╛рди рддрд╛рд░реАрдЦ, рдЖрдгрд┐ schedule_interval - рдПрдХ рджрд┐рд╡рд╕, рдирдВрддрд░ рдбреАрдПрдЬреА рдЙрджреНрдпрд╛ рд╕реБрд░реВ рд╣реЛрдИрд▓.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    рдЖрдгрд┐ рдЖрдгрдЦреА рд╕рдорд╕реНрдпрд╛ рдирд╛рд╣реАрдд.

    рддреНрдпрд╛рдЪреНрдпрд╛рд╢реА рд╕рдВрдмрдВрдзрд┐рдд рдЖрдгрдЦреА рдПрдХ рд░рдирдЯрд╛рдЗрдо рддреНрд░реБрдЯреА рдЖрд╣реЗ: Task is missing the start_date parameter, рдЬреЗ рдмрд╣реБрддреЗрдХрджрд╛ рд╕реВрдЪрд┐рдд рдХрд░рддреЗ рдХреА рдЖрдкрдг рдбреЕрдЧ рдСрдкрд░реЗрдЯрд░рд▓рд╛ рдмрд╛рдВрдзрд╛рдпрд▓рд╛ рд╡рд┐рд╕рд░рд▓рд╛рдд.

  • рд╕рд░реНрд╡ рдПрдХрд╛рдЪ рдорд╢реАрдирд╡рд░. рд╣реЛрдп, рдЖрдгрд┐ рдмреЗрд╕ (рдПрдЕрд░рдлреНрд▓реЛ рд╕реНрд╡рддрдГ рдЖрдгрд┐ рдЖрдордЪреЗ рдХреЛрдЯрд┐рдВрдЧ), рдЖрдгрд┐ рдПрдХ рд╡реЗрдм рд╕рд░реНрд╡реНрд╣рд░, рдЖрдгрд┐ рд╢реЗрдбреНрдпреВрд▓рд░ рдЖрдгрд┐ рдХрд╛рдордЧрд╛рд░. рдЖрдгрд┐ рддреЗ рдХрд╛рдорд╣реА рдХреЗрд▓реЗ. рдкрд░рдВрддреБ рдХрд╛рд▓рд╛рдВрддрд░рд╛рдиреЗ, рд╕реЗрд╡рд╛рдВрд╕рд╛рдареА рдХрд╛рд░реНрдпрд╛рдВрдЪреА рд╕рдВрдЦреНрдпрд╛ рд╡рд╛рдврд▓реА, рдЖрдгрд┐ рдЬреЗрд╡реНрд╣рд╛ PostgreSQL рдиреЗ 20 ms рдРрд╡рдЬреА 5 s рдордзреНрдпреЗ рдирд┐рд░реНрджреЗрд╢рд╛рдВрдХрд╛рд▓рд╛ рдкреНрд░рддрд┐рд╕рд╛рдж рджреЗрдгреНрдпрд╛рд╕ рд╕реБрд░реБрд╡рд╛рдд рдХреЗрд▓реА, рддреЗрд╡реНрд╣рд╛ рдЖрдореНрд╣реА рддреЗ рдШреЗрддрд▓реЗ рдЖрдгрд┐ рджреВрд░ рдиреЗрд▓реЗ.
  • LocalExecutor. рд╣реЛрдп, рдЖрдореНрд╣реА рдЕрдЬреВрдирд╣реА рддреНрдпрд╛рд╡рд░ рдмрд╕рд▓реЛ рдЖрд╣реЛрдд рдЖрдгрд┐ рдЖрдореНрд╣реА рдЖрдзреАрдЪ рдЕрдерд╛рдВрдЧ рдбреЛрд╣рд╛рдЪреНрдпрд╛ рдХрд╛рдард╛рд╡рд░ рдЖрд▓реЛ рдЖрд╣реЛрдд. LocalExecutor рдЖрддрд╛рдкрд░реНрдпрдВрдд рдЖрдордЪреНрдпрд╛рд╕рд╛рдареА рдкреБрд░реЗрд╕реЗ рдЖрд╣реЗ, рдкрд░рдВрддреБ рдЖрддрд╛ рдХрд┐рдорд╛рди рдПрдХрд╛ рдХрд╛рдордЧрд╛рд░рд╛рд╕рд╣ рд╡рд┐рд╕реНрддрд╛рд░рд┐рдд рдХрд░рдгреНрдпрд╛рдЪреА рд╡реЗрд│ рдЖрд▓реА рдЖрд╣реЗ рдЖрдгрд┐ рдЖрдореНрд╣рд╛рд▓рд╛ CeleryExecutor рд╡рд░ рдЬрд╛рдгреНрдпрд╛рд╕рд╛рдареА рдХрдареЛрд░ рдкрд░рд┐рд╢реНрд░рдо рдХрд░рд╛рд╡реЗ рд▓рд╛рдЧрддреАрд▓. рдЖрдгрд┐ рдЖрдкрдг рдПрдХрд╛ рдорд╢реАрдирд╡рд░ рддреНрдпрд╛рдЪреНрдпрд╛рд╕рд╣ рдХрд╛рд░реНрдп рдХрд░реВ рд╢рдХрддрд╛ рд╣реЗ рд▓рдХреНрд╖рд╛рдд рдШреЗрддрд╛, рд╕рд░реНрд╡реНрд╣рд░рд╡рд░ рджреЗрдЦреАрд▓ рд╕реЗрд▓реЗрд░реА рд╡рд╛рдкрд░рдгреНрдпрд╛рдкрд╛рд╕реВрди рдХрд╛рд╣реАрд╣реА рдерд╛рдВрдмрд╡рдд рдирд╛рд╣реА, рдЬреЗ "рдЕрд░реНрдерд╛рдд, рдкреНрд░рд╛рдорд╛рдгрд┐рдХрдкрдгреЗ рдХрдзреАрд╣реА рдЙрддреНрдкрд╛рджрдирд╛рдд рдЬрд╛рдгрд╛рд░ рдирд╛рд╣реА!"
  • рди рд╡рд╛рдкрд░рдгреЗ рдЕрдВрдЧрднреВрдд рд╕рд╛рдзрдиреЗ:
    • рдЬреЛрдбрдгреНрдпрд╛ рд╕реЗрд╡рд╛ рдХреНрд░реЗрдбреЗрдиреНрд╢рд┐рдпрд▓реНрд╕ рд╕рд╛рдард╡рдгреНрдпрд╛рд╕рд╛рдареА,
    • SLA рдЪреБрдХрддреЛ рд╡реЗрд│реЗрд╡рд░ рдкреВрд░реНрдг рди рдЭрд╛рд▓реЗрд▓реНрдпрд╛ рдХрд╛рдорд╛рдВрдирд╛ рдкреНрд░рддрд┐рд╕рд╛рдж рджреЗрдгреЗ,
    • xcom рдореЗрдЯрд╛рдбреЗрдЯрд╛ рдПрдХреНрд╕рдЪреЗрдВрдЬрд╕рд╛рдареА (рдореА рдореНрд╣рдгрд╛рд▓реЛ рдореЗрдЯрд╛рдбреЗрдЯрд╛!) рдбреЕрдЧ рдЯрд╛рд╕реНрдХ рджрд░рдореНрдпрд╛рди.
  • рдореЗрд▓рдЪрд╛ рдЧреИрд░рд╡рд╛рдкрд░. рдмрд░рдВ, рдореА рдХрд╛рдп рд╕рд╛рдВрдЧреВ? рдкрдбрд▓реЗрд▓реНрдпрд╛ рдХрд╛рдорд╛рдВрдЪреНрдпрд╛ рд╕рд░реНрд╡ рдкреБрдирд░рд╛рд╡реГрддреНрддреАрд╕рд╛рдареА рдЕрд▓рд░реНрдЯ рд╕реЗрдЯ рдХреЗрд▓реЗ рдЧреЗрд▓реЗ. рдЖрддрд╛ рдорд╛рдЭреЗ рдХрд╛рд░реНрдп Gmail рдордзреНрдпреЗ Airflow рдХрдбреВрди >90k рдИрдореЗрд▓ рдЖрд╣реЗрдд рдЖрдгрд┐ рд╡реЗрдм рдореЗрд▓ рдереВрдерди рдПрдХрд╛ рд╡реЗрд│реА 100 рд╣реВрди рдЕрдзрд┐рдХ рдЙрдЪрд▓рдгреНрдпрд╛рд╕ рдЖрдгрд┐ рд╣рдЯрд╡рд┐рдгреНрдпрд╛рд╕ рдирдХрд╛рд░ рджреЗрддреЗ.

рдЖрдгрдЦреА рддреЛрдЯреЗ: рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛ рдкрд┐рдЯрдлреЗрд▓реНрд╕

рдЕрдзрд┐рдХ рдСрдЯреЛрдореЗрд╢рди рд╕рд╛рдзрдиреЗ

рдЖрдкрд▓реНрдпрд╛ рд╣рд╛рддрд╛рдВрдиреА рдирд╡реНрд╣реЗ рддрд░ рдЖрдкрд▓реНрдпрд╛ рдбреЛрдХреНрдпрд╛рдиреЗ рдЕрдзрд┐рдХ рдХрд╛рд░реНрдп рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА, рдПрдЕрд░рдлреНрд▓реЛрдиреЗ рдЖрдкрд▓реНрдпрд╛рд╕рд╛рдареА рд╣реЗ рддрдпрд╛рд░ рдХреЗрд▓реЗ рдЖрд╣реЗ:

  • рдЖрд░рдИрдПрд╕рдЯреА API - рддреНрдпрд╛рдЪреНрдпрд╛рдХрдбреЗ рдЕрдЬреВрдирд╣реА рдкреНрд░рд╛рдпреЛрдЧрд┐рдХ рд╕реНрдерд┐рддреА рдЖрд╣реЗ, рдЬреА рддреНрдпрд╛рд▓рд╛ рдХрд╛рдо рдХрд░рдгреНрдпрд╛рдкрд╛рд╕реВрди рд░реЛрдЦрдд рдирд╛рд╣реА. рддреНрдпрд╛рджреНрд╡рд╛рд░реЗ, рддреБрдореНрд╣реА рдХреЗрд╡рд│ рдбреЕрдЧреНрдЬ рдЖрдгрд┐ рдЯрд╛рд╕реНрдХрдЪреА рдорд╛рд╣рд┐рддреА рдорд┐рд│рд╡реВ рд╢рдХрдд рдирд╛рд╣реА, рддрд░ рдбреЕрдЧ рдерд╛рдВрдмрд╡реВ/рд╕реБрд░реВ рдХрд░реВ рд╢рдХрддрд╛, рдбреАрдПрдЬреА рд░рди рдХрд┐рдВрд╡рд╛ рдкреВрд▓ рддрдпрд╛рд░ рдХрд░реВ рд╢рдХрддрд╛.
  • CLI - рдХрдорд╛рдВрдб рд▓рд╛рдЗрдирджреНрд╡рд╛рд░реЗ рдЕрдиреЗрдХ рд╕рд╛рдзрдиреЗ рдЙрдкрд▓рдмреНрдз рдЖрд╣реЗрдд рдЬреА рдлрдХреНрдд WebUI рджреНрд╡рд╛рд░реЗ рд╡рд╛рдкрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЧреИрд░рд╕реЛрдпреАрдЪреА рдирд╛рд╣реАрдд, рдкрд░рдВрддреБ рд╕рд╛рдорд╛рдиреНрдпрддрдГ рдЕрдиреБрдкрд╕реНрдерд┐рдд рдЖрд╣реЗрдд. рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде:
    • backfill рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгреЗ рд░реАрд╕реНрдЯрд╛рд░реНрдЯ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.
      рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рд╡рд┐рд╢реНрд▓реЗрд╖рдХ рдЖрд▓реЗ рдЖрдгрд┐ рдореНрд╣рдгрд╛рд▓реЗ: тАЬрдЖрдгрд┐ рдХреЙрдореНрд░реЗрдб, рддреБрдордЪреНрдпрд╛рдХрдбреЗ 1 рддреЗ 13 рдЬрд╛рдиреЗрд╡рд╛рд░реАрдЪреНрдпрд╛ рдбреЗрдЯрд╛рдордзреНрдпреЗ рдореВрд░реНрдЦрдкрдгрд╛ рдЖрд╣реЗ! рджреБрд░реБрд╕реНрдд рдХрд░рд╛, рджреБрд░реБрд╕реНрдд рдХрд░рд╛, рджреБрд░реБрд╕реНрдд рдХрд░рд╛, рджреБрд░реБрд╕реНрдд рдХрд░рд╛!" рдЖрдгрд┐ рдЖрдкрдг рдЕрд╕реЗ рд╣реЙрдм рдЖрд╣рд╛рдд:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • рдмреЗрд╕ рд╕реЗрд╡рд╛: initdb, resetdb, upgradedb, checkdb.
    • run, рдЬреЗ рддреБрдореНрд╣рд╛рд▓рд╛ рдПрдХ рдЙрджрд╛рд╣рд░рдг рдХрд╛рд░реНрдп рдЪрд╛рд▓рд╡рд┐рдгреНрдпрд╛рд╕ рдЖрдгрд┐ рд╕рд░реНрд╡ рдЕрд╡рд▓рдВрдмрдирд╛рдВрд╡рд░ рд╕реНрдХреЛрдЕрд░ рдХрд░рдгреНрдпрд╛рд╕ рдЕрдиреБрдорддреА рджреЗрддреЗ. рд╢рд┐рд╡рд╛рдп, рдЖрдкрдг рддреЗ рджреНрд╡рд╛рд░реЗ рдЪрд╛рд▓рд╡реВ рд╢рдХрддрд╛ LocalExecutor, рддреБрдордЪреНрдпрд╛рдХрдбреЗ рд╕реЗрд▓реЗрд░реА рдХреНрд▓рд╕реНрдЯрд░ рдЕрд╕рд▓рд╛ рддрд░реАрд╣реА.
    • рдЬрд╡рд│рдЬрд╡рд│ рд╕рдорд╛рди рдЧреЛрд╖реНрдЯ рдХрд░рддреЗ test, рдлрдХреНрдд рдмреЗрд╕ рдордзреНрдпреЗ рджреЗрдЦреАрд▓ рдХрд╛рд╣реАрд╣реА рд▓рд┐рд╣рд┐рдд рдирд╛рд╣реА.
    • connections рд╢реЗрд▓рдордзреВрди рдореЛрдареНрдпрд╛ рдкреНрд░рдорд╛рдгрд╛рдд рдХрдиреЗрдХреНрд╢рди рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рд╕ рдЕрдиреБрдорддреА рджреЗрддреЗ.
  • рдкрд╛рдпрдерди рдПрдкреАрдЖрдп - рд╕рдВрд╡рд╛рдж рд╕рд╛рдзрдгреНрдпрд╛рдЪрд╛ рдПрдХ рдРрд╡рдЬреА рд╣рд╛рд░реНрдбрдХреЛрд░ рдорд╛рд░реНрдЧ, рдЬреЛ рдкреНрд▓рдЧрдЗрдиреНрд╕рд╕рд╛рдареА рдЖрд╣реЗ рдЖрдгрд┐ рддреНрдпрд╛рдд рд▓рд╣рд╛рди рд╣рд╛рддрд╛рдВрдиреА рдЭреБрдВрдбреВ рджреЗрдд рдирд╛рд╣реА. рдкрдг рдЖрдореНрд╣рд╛рд▓рд╛ рдЬрд╛рдгреНрдпрд╛рдкрд╛рд╕реВрди рдХреЛрдг рд░реЛрдЦрдгрд╛рд░ /home/airflow/dags, рдзрд╛рд╡рдгреЗ ipython рдЖрдгрд┐ рдЧреЛрдВрдзрд│ рд╕реБрд░реВ? рдЖрдкрдг, рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рдЦрд╛рд▓реАрд▓ рдХреЛрдбрд╕рд╣ рд╕рд░реНрд╡ рдХрдиреЗрдХреНрд╢рди рдирд┐рд░реНрдпрд╛рдд рдХрд░реВ рд╢рдХрддрд╛:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • рдПрдЕрд░рдлреНрд▓реЛ рдореЗрдЯрд╛рдбреЗрдЯрд╛рдмреЗрд╕рд╢реА рдХрдиреЗрдХреНрдЯ рдХрд░рдд рдЖрд╣реЗ. рдореА рддреНрдпрд╛рд╡рд░ рд▓рд┐рд╣рд┐рдгреНрдпрд╛рдЪреА рд╢рд┐рдлрд╛рд░рд╕ рдХрд░рдд рдирд╛рд╣реА, рдкрд░рдВрддреБ рд╡рд┐рд╡рд┐рдз рд╡рд┐рд╢рд┐рд╖реНрдЯ рдореЗрдЯреНрд░рд┐рдХреНрд╕рд╕рд╛рдареА рдХрд╛рд░реНрдп рд╕реНрдерд┐рддреА рдорд┐рд│рд╡рдгреЗ рдХреЛрдгрддреНрдпрд╛рд╣реА API рд╡рд╛рдкрд░рдгреНрдпрд╛рдкреЗрдХреНрд╖рд╛ рдмрд░реЗрдЪ рдЬрд▓рдж рдЖрдгрд┐ рд╕реЛрдкреЗ рдЕрд╕реВ рд╢рдХрддреЗ.

    рдЖрдкрдг рдЕрд╕реЗ рдореНрд╣рдгреВрдпрд╛ рдХреА рдЖрдкрд▓реА рд╕рд░реНрд╡ рдХрд╛рд░реНрдпреЗ рдЙрджрд╛рддреНрдд рдирд╕рддрд╛рдд, рдкрд░рдВрддреБ рддреА рдХрдзреАрдХрдзреА рдкрдбреВ рд╢рдХрддрд╛рдд рдЖрдгрд┐ рд╣реЗ рд╕рд╛рдорд╛рдиреНрдп рдЖрд╣реЗ. рдкрд░рдВрддреБ рдХрд╛рд╣реА рдЕрдбрдерд│реЗ рдЖрдзреАрдЪ рд╕рдВрд╢рдпрд╛рд╕реНрдкрдж рдЖрд╣реЗрдд рдЖрдгрд┐ рддреЗ рддрдкрд╛рд╕рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.

    рдПрд╕рдХреНрдпреВрдПрд▓ рд╕рд╛рд╡рдз рд░рд╣рд╛!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

рд╕рдВрджрд░реНрдн

рдЖрдгрд┐ рдЕрд░реНрдерд╛рддрдЪ, Google рдЬрд╛рд░реА рдХреЗрд▓реНрдпрд╛рдкрд╛рд╕реВрди рдкрд╣рд┐рд▓реЗ рджрд╣рд╛ рджреБрд╡реЗ рдорд╛рдЭреНрдпрд╛ рдмреБрдХрдорд╛рд░реНрдХрдордзреАрд▓ рдПрдЕрд░рдлреНрд▓реЛ рдлреЛрд▓реНрдбрд░рдордзреАрд▓ рд╕рд╛рдордЧреНрд░реА рдЖрд╣реЗрдд.

рдЖрдгрд┐ рд▓реЗрдЦрд╛рдд рд╡рд╛рдкрд░рд▓реЗрд▓реЗ рджреБрд╡реЗ:

рд╕реНрддреНрд░реЛрдд: www.habr.com