Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдирдорд╕реНрддреЗ, рдо рджрд┐рдорд┐рддреНрд░реА рд▓рдЧрднрд┐рдиреЗрдиреНрдХреЛ рд╣реБрдБ - рдХрдореНрдкрдиреАрд╣рд░реВрдХреЛ рднреЗрдЬреЗрдЯ рд╕рдореВрд╣рдХреЛ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рд╡рд┐рднрд╛рдЧрдХреЛ рдбрд╛рдЯрд╛ рдЗрдиреНрдЬрд┐рдирд┐рдпрд░ред

рдо рддрдкрд╛рдИрдВрд▓рд╛рдИ ETL рдкреНрд░рдХреНрд░рд┐рдпрд╛рд╣рд░реВ рд╡рд┐рдХрд╛рд╕ рдЧрд░реНрдирдХреЛ рд▓рд╛рдЧрд┐ рдПрдХ рдЕрджреНрднреБрдд рдЙрдкрдХрд░рдгрдХреЛ рдмрд╛рд░реЗрдорд╛ рдмрддрд╛рдЙрдиреЗрдЫреБ - Apache Airflowред рддрд░ рдПрдпрд░рдлреНрд▓реЛ рдпрддрд┐ рдмрд╣реБрдореБрдЦреА рд░ рдмрд╣реБрдореБрдЦреА рдЫ рдХрд┐ рддрдкрд╛рдИрд▓реЗ рдпрд╕рд▓рд╛рдИ рдирдЬрд┐рдХрдмрд╛рдЯ рд╣реЗрд░реНрдиреБ рдкрд░реНрдЫ рдпрджрд┐ рддрдкрд╛рдИ рдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣рдорд╛ рд╕рдВрд▓рдЧреНрди рд╣реБрдиреБрд╣реБрдиреНрди рднрдиреЗ рдкрдирд┐, рддрд░ рдЖрд╡рдзрд┐рдХ рд░реВрдкрдорд╛ рдХреБрдиреИ рдкрдирд┐ рдкреНрд░рдХреНрд░рд┐рдпрд╛рд╣рд░реВ рд╕реБрд░реВ рдЧрд░реНрди рд░ рддрд┐рдиреАрд╣рд░реВрдХреЛ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрдирдХреЛ рдирд┐рдЧрд░рд╛рдиреА рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рдЫред

рд░ рд╣реЛ, рдо рдорд╛рддреНрд░ рдмрддрд╛рдЙрдиреЗ рдЫреИрди, рддрд░ рдкрдирд┐ рджреЗрдЦрд╛рдЙрдБрдЫреБ: рдХрд╛рд░реНрдпрдХреНрд░рдордорд╛ рдзреЗрд░реИ рдХреЛрдб, рд╕реНрдХреНрд░рд┐рдирд╕рдЯ рд░ рд╕рд┐рдлрд╛рд░рд┐рд╕рд╣рд░реВ рдЫрдиреНред

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ
рддрдкрд╛рдИрд▓реЗ рд╕рд╛рдорд╛рдиреНрдпрддрдпрд╛ рдХреЗ рджреЗрдЦреНрдиреБрд╣реБрдиреНрдЫ рдЬрдм рддрдкрд╛рдИрд▓реЗ Airflow / Wikimedia Commons рд╢рдмреНрдж рдЧреБрдЧрд▓ рдЧрд░реНрдиреБрд╣реБрдиреНрдЫ

рд╕рд╛рдордЧреНрд░реАрдХреЛ рддрд╛рд▓рд┐рдХрд╛

рдкрд░рд┐рдЪрдп

Apache Airflow Django рдЬрд╕реНрддреИ рдЫ:

  • python рдорд╛ рд▓реЗрдЦрд┐рдПрдХреЛ
  • рддреНрдпрд╣рд╛рдБ рдПрдХ рдорд╣рд╛рди рдкреНрд░рд╢рд╛рд╕рдХ рдкреНрдпрд╛рдирд▓ рдЫ,
  • рдЕрдирд┐рд╢реНрдЪрд┐рдд рдХрд╛рд▓рдХрд╛ рд▓рд╛рдЧрд┐ рд╡рд┐рд╕реНрддрд╛рд░ рдпреЛрдЧреНрдп

- рдХреЗрд╡рд▓ рд░рд╛рдореНрд░реЛ, рд░ рдпреЛ рдкреВрд░реНрдг рд░реВрдкрдорд╛ рдлрд░рдХ рдЙрджреНрджреЗрд╢реНрдпрдХрд╛ рд▓рд╛рдЧрд┐ рдмрдирд╛рдЗрдПрдХреЛ рдерд┐рдпреЛ, рдЕрд░реНрдерд╛рддреН (рдЬрд╕реНрддреИ рдпреЛ kat рдЕрдШрд┐ рд▓реЗрдЦрд┐рдПрдХреЛ рдЫ):

  • рдЕрд╕реАрдорд┐рдд рд╕рдВрдЦреНрдпрд╛рдорд╛ рдореЗрд╢рд┐рдирд╣рд░реВрдорд╛ рдЪрд▓рд┐рд░рд╣реЗрдХреЛ рд░ рдЕрдиреБрдЧрдорди рдХрд╛рд░реНрдпрд╣рд░реВ (рдЬрддрд┐ рдзреЗрд░реИ рд╕реЗрд▓реЗрд░реА / рдХреБрдмрд░реНрдиреЗрдЯреНрд╕ рд░ рддрдкрд╛рдИрдВрдХреЛ рдЕрдиреНрддрд╕реНрдХрд░рдгрд▓реЗ рддрдкрд╛рдИрдВрд▓рд╛рдИ рдЕрдиреБрдорддрд┐ рджрд┐рдиреЗрдЫ)
  • рдкрд╛рдЗрдерди рдХреЛрдб рд▓реЗрдЦреНрди рд░ рдмреБрдЭреНрди рдзреЗрд░реИ рд╕рдЬрд┐рд▓реЛ рджреЗрдЦрд┐ рдЧрддрд┐рд╢реАрд▓ рдХрд╛рд░реНрдпрдкреНрд░рд╡рд╛рд╣ рдЙрддреНрдкрд╛рджрди рд╕рдВрдЧ
  • рд░ рдХреБрдиреИ рдкрдирд┐ рдбрд╛рдЯрд╛рдмреЗрд╕ рд░ API рд╣рд░реВ рджреБрд╡реИ рддрдпрд╛рд░-рдирд┐рд░реНрдорд┐рдд рдХрдореНрдкреЛрдиреЗрдиреНрдЯрд╣рд░реВ рд░ рдШрд░-рдирд┐рд░реНрдорд┐рдд рдкреНрд▓рдЧрдЗрдирд╣рд░реВ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рдПрдХрдЕрд░реНрдХрд╛рд╕рдБрдЧ рдЬрдбрд╛рди рдЧрд░реНрдиреЗ рдХреНрд╖рдорддрд╛ (рдЬреБрди рдЕрддреНрдпрдиреНрдд рд╕рд░рд▓ рдЫ)ред

рд╣рд╛рдореА рдпрд╕рд░реА Apache Airflow рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдЫреМрдВ:

  • рд╣рд╛рдореА DWH рд░ ODS (рд╣рд╛рдореАрд╕рдБрдЧ Vertica рд░ Clickhouse рдЫ) рдорд╛ рд╡рд┐рднрд┐рдиреНрди рд╕реНрд░реЛрддрд╣рд░реВ (рдзреЗрд░реИ SQL рд╕рд░реНрднрд░ рд░ PostgreSQL рдЙрджрд╛рд╣рд░рдгрд╣рд░реВ, рдЕрдиреБрдкреНрд░рдпреЛрдЧ рдореЗрдЯреНрд░рд┐рдХрд╣рд░реВ рд╕рд╣рд┐рдд рд╡рд┐рднрд┐рдиреНрди API, 1C рдкрдирд┐) рдмрд╛рдЯ рдбреЗрдЯрд╛ рд╕рдЩреНрдХрд▓рди рдЧрд░реНрдЫреМрдВред
  • рдХрддрд┐ рдЙрдиреНрдирдд cron, рдЬрд╕рд▓реЗ ODS рдорд╛ рдбрд╛рдЯрд╛ рд╕рдореЗрдХрди рдкреНрд░рдХреНрд░рд┐рдпрд╛рд╣рд░реВ рд╕реБрд░реБ рдЧрд░реНрджрдЫ, рд░ рддрд┐рдиреАрд╣рд░реВрдХреЛ рдорд░реНрдорддрд╕рдореНрднрд╛рд░ рдкрдирд┐ рдирд┐рдЧрд░рд╛рдиреА рдЧрд░реНрджрдЫред

рднрд░реНрдЦрд░реИ рд╕рдореНрдо, рд╣рд╛рдореНрд░рд╛ рдЖрд╡рд╢реНрдпрдХрддрд╛рд╣рд░реВ 32 рдХреЛрд░ рд░ 50 GB RAM рднрдПрдХреЛ рдПрдЙрдЯрд╛ рд╕рд╛рдиреЛ рд╕рд░реНрднрд░рджреНрд╡рд╛рд░рд╛ рдХрднрд░ рдЧрд░рд┐рдПрдХреЛ рдерд┐рдпреЛред Airflow рдорд╛, рдпреЛ рдХрд╛рдо рдЧрд░реНрджрдЫ:

  • рдЕрдзрд┐рдХ 200 рдбрдЧрд╣рд░реВ (рд╡рд╛рд╕реНрддрд╡рдорд╛ рдХрд╛рд░реНрдпрдкреНрд░рд╡рд╛рд╣, рдЬрд╕рдорд╛ рд╣рд╛рдореАрд▓реЗ рдХрд╛рд░реНрдпрд╣рд░реВ рднрд░реНрдпреМрдВ),
  • рдкреНрд░рддреНрдпреЗрдХрдорд╛ рдФрд╕рддрдорд╛ 70 рдХрд╛рд░реНрдпрд╣рд░реВ,
  • рдпреЛ рднрд▓рд╛рдЗ рд╕реБрд░реБ рд╣реБрдиреНрдЫ (рдФрд╕рддрдорд╛ рдкрдирд┐) рдПрдХ рдШрдгреНрдЯрд╛ рдПрдХ рдкрдЯрдХ.

рд░ рд╣рд╛рдореАрд▓реЗ рдХрд╕рд░реА рд╡рд┐рд╕реНрддрд╛рд░ рдЧрд░реНрдпреМрдВ рднрдиреНрдиреЗ рдмрд╛рд░реЗ, рдо рддрд▓ рд▓реЗрдЦреНрдиреЗрдЫреБ, рддрд░ рдЕрдм рд╣рд╛рдореА ├╝ber-рд╕рдорд╕реНрдпрд╛рд▓рд╛рдИ рдкрд░рд┐рднрд╛рд╖рд┐рдд рдЧрд░реМрдВ рдЬреБрди рд╣рд╛рдореА рд╕рдорд╛рдзрд╛рди рдЧрд░реНрдиреЗрдЫреМрдВ:

рддреНрдпрд╣рд╛рдБ рддреАрдирд╡рдЯрд╛ рд╕реНрд░реЛрдд SQL рд╕рд░реНрднрд░рд╣рд░реВ рдЫрдиреН, рдкреНрд░рддреНрдпреЗрдХрдорд╛ 50 рдбрд╛рдЯрд╛рдмреЗрд╕рд╣рд░реВ рдЫрдиреН - рдХреНрд░рдорд╢рдГ рдПрдЙрдЯрд╛ рдкрд░рд┐рдпреЛрдЬрдирд╛рдХреЛ рдЙрджрд╛рд╣рд░рдгрд╣рд░реВ, рддрд┐рдиреАрд╣рд░реВрд╕рдБрдЧ рдПрдЙрдЯреИ рд╕рдВрд░рдЪрдирд╛ рдЫ (рд▓рдЧрднрдЧ рд╕рдмреИ рдард╛рдЙрдБрдорд╛, mua-ha-ha), рдЬрд╕рдХреЛ рдорддрд▓рдм рдкреНрд░рддреНрдпреЗрдХрд╕рдБрдЧ рдЕрд░реНрдбрд░ рддрд╛рд▓рд┐рдХрд╛ рдЫ (рднрд╛рдЧреНрдпрд╡рд╢, рддреНрдпрд╕рд╕рдБрдЧрдХреЛ рддрд╛рд▓рд┐рдХрд╛ред рдирд╛рдо рдХреБрдиреИ рдкрдирд┐ рд╡реНрдпрд╡рд╕рд╛рдпрдорд╛ рдзрдХреЗрд▓реНрди рд╕рдХрд┐рдиреНрдЫ)ред рд╣рд╛рдореА рд╕реЗрд╡рд╛ рдХреНрд╖реЗрддреНрд░рд╣рд░реВ (рд╕реНрд░реЛрдд рд╕рд░реНрднрд░, рд╕реНрд░реЛрдд рдбрд╛рдЯрд╛рдмреЗрд╕, ETL рдЯрд╛рд╕реНрдХ рдЖрдИрдбреА) рдердкреЗрд░ рдбрд╛рдЯрд╛ рд▓рд┐рдиреНрдЫреМрдВ рд░ рддрд┐рдиреАрд╣рд░реВрд▓рд╛рдИ рднрд░реНрдЯрд┐рдХрд╛рдорд╛ рдлреНрдпрд╛рдБрдХреНрдЫреМрдВред

рдЪрд▓реЛ рдЬрд╛рдиреБрд╣реЛрд╕реН!

рдореБрдЦреНрдп рднрд╛рдЧ, рд╡реНрдпрд╛рд╡рд╣рд╛рд░рд┐рдХ (рд░ рдереЛрд░реИ рд╕реИрджреНрдзрд╛рдиреНрддрд┐рдХ)

рдХрд┐рди рд╣рд╛рдореА (рд░ рддрдкрд╛рдИрдВ)

рдЬрдм рд░реБрдЦрд╣рд░реВ рдареВрд▓рд╛ рдерд┐рдП рд░ рдо рд╕рд╛рдзрд╛рд░рдг рдерд┐рдПрдБ SQL-рдПрдХ рд░реВрд╕реА рдЦреБрджреНрд░рд╛ рдорд╛ schik, рд╣рд╛рдореАрд▓реЗ рд╣рд╛рдореАрд▓рд╛рдИ рдЙрдкрд▓рдмреНрдз рджреБрдИ рдЙрдкрдХрд░рдгрд╣рд░реВ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ ETL рдкреНрд░рдХреНрд░рд┐рдпрд╛рд╣рд░реВ рдЙрд░реНрдл тАЛтАЛрдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣ рдШреЛрдЯрд╛рд▓рд╛ рдЧрд░реНрдпреМрдВ:

  • рд╕реВрдЪрдирд╛ рд╢рдХреНрддрд┐ рдХреЗрдиреНрджреНрд░ - рдПрдХ рдЕрддреНрдпрдиреНрдд рдлреИрд▓рд╛рдЙрдиреЗ рдкреНрд░рдгрд╛рд▓реА, рдЕрддреНрдпрдиреНрдд рдЙрддреНрдкрд╛рджрдХ, рдпрд╕рдХреЛ рдЖрдлреНрдиреИ рд╣рд╛рд░реНрдбрд╡реЗрдпрд░, рдпрд╕рдХреЛ рдЖрдлреНрдиреИ рд╕рдВрд╕реНрдХрд░рдгред рдореИрд▓реЗ рдпрд╕рдХреЛ рдХреНрд╖рдорддрд╛рдХреЛ рез% рднрдЧрд╡рд╛рди рдирд┐рд╖реЗрдз рдЧрд░реЗрдВред рдХрд┐рди? рдареАрдХ рдЫ, рд╕рдмреИ рднрдиреНрджрд╛ рдкрд╣рд┐рд▓реЗ, рдпреЛ рдЗрдиреНрдЯрд░рдлреЗрд╕, 1 рдХреЛ рджрд╢рдХ рджреЗрдЦрд┐, рдорд╛рдирд╕рд┐рдХ рд░реВрдкрдорд╛ рд╣рд╛рдореАрд▓рд╛рдИ рджрдмрд╛рдм рджрд┐рдпреЛред рджреЛрд╕реНрд░реЛ, рдпреЛ рд╕рдВрдХреБрдЪрди рдЕрддреНрдпрдиреНрддреИ рдлреИрдВрд╕реА рдкреНрд░рдХреНрд░рд┐рдпрд╛рд╣рд░реВ, рдЙрдЧреНрд░ рдШрдЯрдХ рдкреБрди: рдкреНрд░рдпреЛрдЧ рд░ рдЕрдиреНрдп рдзреЗрд░реИ-рдорд╣рддреНрд╡рдкреВрд░реНрдг-рдЙрджреНрдпрдо-рдЯреНрд░рд┐рдХрд╣рд░реВрдХреЛ рд▓рд╛рдЧрд┐ рдбрд┐рдЬрд╛рдЗрди рдЧрд░рд┐рдПрдХреЛ рд╣реЛред рдпрд╕рдХреЛ рд▓рд╛рдЧрдд рдХреЗ рдмрд╛рд░реЗ рдорд╛, рдПрдпрд░рдмрд╕ A380 / рд╡рд░реНрд╖ рдХреЛ рдкрдЦреЗрдЯрд╛ рдЬрд╕реНрддреИ, рд╣рд╛рдореА рдХреЗрд╣рд┐ рднрдиреНрди рдЫреИрдиред

    рд╕рд╛рд╡рдзрд╛рди, рд╕реНрдХреНрд░рд┐рдирд╕рдЯрд▓реЗ рейреж рд╡рд░реНрд╖рднрдиреНрджрд╛ рдХрдо рдЙрдореЗрд░рдХрд╛ рдорд╛рдирд┐рд╕рд╣рд░реВрд▓рд╛рдИ рдЕрд▓рд┐рдХрддрд┐ рдЪреЛрдЯ рдкреБрд░реНрдпрд╛рдЙрди рд╕рдХреНрдЫ

    Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

  • SQL рд╕рд░реНрднрд░ рдПрдХреАрдХрд░рдг рд╕рд░реНрднрд░ - рд╣рд╛рдореАрд▓реЗ рдпреЛ рд╕рд╛рдереАрд▓рд╛рдИ рд╣рд╛рдореНрд░реЛ рдЕрдиреНрддрд░реНрджреЗрд╢реАрдп рдкрд░рд┐рдпреЛрдЬрдирд╛ рдкреНрд░рд╡рд╛рд╣рдорд╛ рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдпреМрдВред рдареАрдХ рдЫ, рд╡рд╛рд╕реНрддрд╡рдорд╛: рд╣рд╛рдореАрд▓реЗ рдкрд╣рд┐рд▓реЗ рдиреИ SQL рд╕рд░реНрднрд░ рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдЫреМрдВ, рд░ рдпрд╕рдХреЛ ETL рдЙрдкрдХрд░рдгрд╣рд░реВ рдкреНрд░рдпреЛрдЧ рдирдЧрд░реНрдиреБ рдХреБрдиреИ рди рдХреБрдиреИ рд░реВрдкрдорд╛ рдЕрдиреБрдЪрд┐рдд рд╣реБрдиреЗрдЫред рдпрд╕рдорд╛ рд╕рдмреИ рдХреБрд░рд╛ рд░рд╛рдореНрд░реЛ рдЫ: рджреБрдмреИ рдЗрдиреНрдЯрд░рдлреЗрд╕ рд╕реБрдиреНрджрд░ рдЫ, рд░ рдкреНрд░рдЧрддрд┐ рд░рд┐рдкреЛрд░реНрдЯрд╣рд░реВ ... рддрд░ рд╣рд╛рдореА рдХрд┐рди рд╕рдлреНрдЯрд╡реЗрдпрд░ рдЙрддреНрдкрд╛рджрдирд╣рд░реВ рдорди рдкрд░рд╛рдЙрдБрджреИрдиреМрдВ, рдУрд╣, рдпрд╕рдХреЛ рд▓рд╛рдЧрд┐ рд╣реЛрдЗрдиред рдпрд╕рд▓рд╛рдИ рд╕рдВрд╕реНрдХрд░рдг dtsx (рдЬрд╕рдХреЛ XML рдиреЛрдбрд╣рд░реВ рдмрдЪрддрдорд╛ рдлреЗрд░рдмрджрд▓ рдЧрд░рд┐рдПрдХреЛ рдЫ) рд╣рд╛рдореА рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ, рддрд░ рдпрд╕рдХреЛ рдорддрд▓рдм рдХреЗ рд╣реЛ? рдПрдХ рд╕рд░реНрднрд░рдмрд╛рдЯ рдЕрд░реНрдХреЛ рд╕рд░реНрднрд░рдорд╛ рд╕рдпреМрдВ рддрд╛рд▓рд┐рдХрд╛рд╣рд░реВ рддрд╛рдиреНрдиреЗ рдХрд╛рд░реНрдп рдкреНрдпрд╛рдХреЗрдЬ рдХрд╕рд░реА рдмрдирд╛рдЙрдиреЗ? рд╣реЛ, рдХреЗ рдПрдХ рд╕рдп, рддрдкрд╛рдИрдХреЛ рдФрдВрд▓рд╛ рдмреАрд╕ рдЯреБрдХреНрд░рд╛рдмрд╛рдЯ рдЦрд╕реНрдиреЗрдЫ, рдорд╛рдЙрд╕рдХреЛ рдмрдЯрдирдорд╛ рдХреНрд▓рд┐рдХ рдЧрд░реНрдиреБрд╣реЛрд╕реНред рддрд░ рдпреЛ рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдкрдорд╛ рдЕрдзрд┐рдХ рдлреИрд╢рдиреЗрдмрд▓ рджреЗрдЦрд┐рдиреНрдЫ:

    Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рд╣рд╛рдореАрд▓реЗ рдкрдХреНрдХреИ рдкрдирд┐ рдмрд╛рдЯреЛ рдЦреЛрдЬреНрдпреМрдВред рдорд╛рдорд▓рд╛ рдкрдирд┐ рд▓рдЧрднрдЧ рд╕реНрд╡-рд▓рд┐рдЦрд┐рдд SSIS рдкреНрдпрд╛рдХреЗрдЬ рдЬреЗрдирд░реЗрдЯрд░рдорд╛ рдЖрдпреЛ ...

... рд░ рддреНрдпрд╕рдкрдЫрд┐ рдорд▓рд╛рдИ рдирдпрд╛рдБ рдХрд╛рдо рднреЗрдЯрд┐рдпреЛред рд░ рдЕрдкрд╛рдЪреЗ рдПрдпрд░рдлреНрд▓реЛрд▓реЗ рдорд▓рд╛рдИ рдпрд╕рдорд╛ рдУрднрд░рдЯреЗрдХ рдЧрд░реНрдпреЛред

рдЬрдм рдореИрд▓реЗ рдкрддреНрддрд╛ рд▓рдЧрд╛рдПрдБ рдХрд┐ ETL рдкреНрд░рдХреНрд░рд┐рдпрд╛ рд╡рд┐рд╡рд░рдгрд╣рд░реВ рд╕рд╛рдзрд╛рд░рдг рдкрд╛рдЗрдерди рдХреЛрдб рд╣реБрдиреН, рдореИрд▓реЗ рдЖрдирдиреНрджрдХреЛ рд▓рд╛рдЧрд┐ рдирд╛рдЪрд┐рдиред рдпрд╕рд░реА рдбрд╛рдЯрд╛ рд╕реНрдЯреНрд░рд┐рдорд╣рд░реВ рд╕рдВрд╕реНрдХрд░рдг рд░ рднрд┐рдиреНрди рдерд┐рдП, рд░ рд╕рдпреМрдВ рдбрд╛рдЯрд╛рдмреЗрд╕рд╣рд░реВрдмрд╛рдЯ рдПрдЙрдЯреИ рд▓рдХреНрд╖реНрдпрдорд╛ рдПрдЙрдЯреИ рд╕рдВрд░рдЪрдирд╛рд╕рд╣рд┐рддрдХреЛ рддрд╛рд▓рд┐рдХрд╛рд╣рд░реВ рдбреЗрдв рд╡рд╛ рджреБрдИ 13тАЭ рд╕реНрдХреНрд░рд┐рдирд╣рд░реВрдорд╛ рдкрд╛рдЗрдерди рдХреЛрдбрдХреЛ рдХреБрд░рд╛ рднрдпреЛред

рдХреНрд▓рд╕реНрдЯрд░ рдЬрдореНрдорд╛ рдЧрд░реНрджреИ

рдкреВрд░реНрдгрддрдпрд╛ рдХрд┐рдиреНрдбрд░рдЧрд╛рд░реНрдЯрдирд▓рд╛рдИ рд╡реНрдпрд╡рд╕реНрдерд┐рдд рдирдЧрд░реМрдВ, рд░ рдпрд╣рд╛рдБ рдкреВрд░реНрдг рд░реВрдкрдорд╛ рд╕реНрдкрд╖реНрдЯ рдЪреАрдЬрд╣рд░реВрдХреЛ рдмрд╛рд░реЗрдорд╛ рдХреБрд░рд╛ рдирдЧрд░реМрдВ, рдЬрд╕реНрддреИ рдПрдпрд░рдлреНрд▓реЛ рд╕реНрдерд╛рдкрдирд╛ рдЧрд░реНрдиреЗ, рддрдкрд╛рдИрдВрдХреЛ рдЫрдиреМрдЯ рдЧрд░рд┐рдПрдХреЛ рдбрд╛рдЯрд╛рдмреЗрд╕, рд╕реЗрд▓реЗрд░реА рд░ рдбрдХреНрд╕рд╣рд░реВрдорд╛ рд╡рд░реНрдгрди рдЧрд░рд┐рдПрдХрд╛ рдЕрдиреНрдп рдХреЗрд╕рд╣рд░реВред

рддрд╛рдХрд┐ рд╣рд╛рдореА рддреБрд░реБрдиреНрддреИ рдкреНрд░рдпреЛрдЧрд╣рд░реВ рд╕реБрд░реБ рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ, рдореИрд▓реЗ рд╕реНрдХреЗрдЪ рдЧрд░реЗрдВ docker-compose.yml рдЬрд╕рдорд╛:

  • рд╡рд╛рд╕реНрддрд╡рдорд╛ рдЙрдареМрдВ рд╣рд╛рд╡рд╛ рдкреНрд░рд╡рд╛рд╣: рдЕрдиреБрд╕реВрдЪреАрдХрд░реНрддрд╛, рд╡реЗрдмрд╕рд░реНрднрд░ред рд╕реЗрд▓рд░реА рдХрд╛рд░реНрдпрд╣рд░реВ рдЕрдиреБрдЧрдорди рдЧрд░реНрди рдлреВрд▓ рдкрдирд┐ рддреНрдпрд╣рд╛рдБ рдХрддрд╛рдИ рд╣реБрдиреЗрдЫ (рдХрд┐рдирдХрд┐ рдпреЛ рдкрд╣рд┐рд▓реЗ рдиреИ рднрд┐рддреНрд░ рдзрдХреЗрд▓рд┐рдПрдХреЛ рдЫред apache/airflow:1.10.10-python3.7рддрд░ рд╣рд╛рдореАрд▓рд╛рдИ рдХреБрдиреИ рдЖрдкрддреНрддрд┐ рдЫреИрди)
  • PostgreSQL, рдЬрд╕рдорд╛ Airflow рд▓реЗ рдЖрдлреНрдиреЛ рд╕реЗрд╡рд╛ рдЬрд╛рдирдХрд╛рд░реА (рд╢реЗрдбреНрдпреБрд▓рд░ рдбреЗрдЯрд╛, рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рддрдереНрдпрд╛рдЩреНрдХ, рдЖрджрд┐) рд▓реЗрдЦреНрдиреЗрдЫ, рд░ Celery рд▓реЗ рд╕рдореНрдкрдиреНрди рдХрд╛рд░реНрдпрд╣рд░реВ рдЪрд┐рдиреНрд╣ рд▓рдЧрд╛рдЙрдиреЗрдЫ;
  • Redis, рдЬрд╕рд▓реЗ рд╕реЗрд▓рд░реАрдХреЛ рд▓рд╛рдЧрд┐ рдЯрд╛рд╕реНрдХ рдмреНрд░реЛрдХрд░рдХреЛ рд░реВрдкрдорд╛ рдХрд╛рдо рдЧрд░реНрдиреЗрдЫ;
  • рд╕реЗрд▓реЗрд░реА рдХрд╛рдорджрд╛рд░, рдЬреБрди рдХрд╛рд░реНрдпрд╣рд░реВрдХреЛ рдкреНрд░рддреНрдпрдХреНрд╖ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрдирдорд╛ рд╕рдВрд▓рдЧреНрди рд╣реБрдиреЗрдЫред
  • рдлреЛрд▓реНрдбрд░рдорд╛ ./dags рд╣рд╛рдореА dags рдХреЛ рд╡рд┐рд╡рд░рдг рд╕рдВрдЧ рд╣рд╛рдореНрд░реЛ рдлрд╛рдЗрд▓рд╣рд░реВ рдердкреНрдиреЗрдЫреМрдВред рддрд┐рдиреАрд╣рд░реВ рдЙрдбреНрдиреЗ рдХреНрд░рдордорд╛ рдЙрдард╛рдЗрдиреЗрдЫрдиреН, рддреНрдпрд╕реИрд▓реЗ рдкреНрд░рддреНрдпреЗрдХ рд╣рд╛рдЫреНрдпреБрдБ рдкрдЫрд┐ рд╕рдореНрдкреВрд░реНрдг рд╕реНрдЯреНрдпрд╛рдХрд▓рд╛рдИ рдЬрдЧрд▓ рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рдЫреИрдиред

рдХреЗрд╣реА рдард╛рдЙрдБрд╣рд░реВрдорд╛, рдЙрджрд╛рд╣рд░рдгрд╣рд░реВрдорд╛ рдХреЛрдб рдкреВрд░реНрдг рд░реВрдкрдорд╛ рджреЗрдЦрд╛рдЗрдПрдХреЛ рдЫреИрди (рдкрд╛рда рдЕрд╡реНрдпрд╡рд╕реНрдерд┐рдд рдирд╣реЛрд╕реН) рддрд░ рдХрддреИ рдпреЛ рдкреНрд░рдХреНрд░рд┐рдпрд╛рдорд╛ рдкрд░рд┐рдорд╛рд░реНрдЬрди рдЧрд░рд┐рдПрдХреЛ рдЫред рдкреВрд░реНрдг рдХрд╛рд░реНрдп рдХреЛрдб рдЙрджрд╛рд╣рд░рдгрд╣рд░реВ рднрдгреНрдбрд╛рд░рдорд╛ рдлреЗрд▓рд╛ рдкрд╛рд░реНрди рд╕рдХрд┐рдиреНрдЫ https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

рдЯрд┐рдкреНрдкрдгреАрд╣рд░реВ:

  • рд░рдЪрдирд╛ рдХреЛ рд╡рд┐рдзрд╛рдирд╕рднрд╛ рдорд╛, рдо рдзреЗрд░реИ рд╣рджрд╕рдореНрдо рдкреНрд░рд╕рд┐рджреНрдз рдЫрд╡рд┐ рдорд╛ рдирд┐рд░реНрднрд░ рдерд┐рдП puckel/docker- airflow - рдпрд╕рд▓рд╛рдИ рдЬрд╛рдБрдЪ рдЧрд░реНрди рдирд┐рд╢реНрдЪрд┐рдд рд╣реБрдиреБрд╣реЛрд╕реНред рд╕рд╛рдпрдж рддрдкрд╛рдИрд▓рд╛рдИ рддрдкрд╛рдИрдХреЛ рдЬреАрд╡рдирдорд╛ рдЕрд░реБ рдХреЗрд╣рд┐ рдЪрд╛рд╣рд┐рджреИрдиред
  • рд╕рдмреИ рдПрдпрд░рдлреНрд▓реЛ рд╕реЗрдЯрд┐рдЩрд╣рд░реВ рдорд╛рд░реНрдлрдд рдорд╛рддреНрд░ рдЙрдкрд▓рдмреНрдз рдЫреИрдирдиреН airflow.cfg, рддрд░ рд╡рд╛рддрд╛рд╡рд░рдг рдЪрд░ рдорд╛рд░реНрдлрдд рдкрдирд┐ (рд╡рд┐рдХрд╛рд╕рдХрд░реНрддрд╛рд╣рд░реВрд▓рд╛рдИ рдзрдиреНрдпрд╡рд╛рдж), рдЬреБрди рдореИрд▓реЗ рджреБрд░реНрднрд╛рд╡рдирд╛рдкреВрд░реНрдг рд░реВрдкрдорд╛ рдлрд╛рдЗрджрд╛ рдЙрдард╛рдПрдБред
  • рд╕реНрд╡рд╛рднрд╛рд╡рд┐рдХ рд░реВрдкрдорд╛, рдпреЛ рдЙрддреНрдкрд╛рджрди-рддрдпрд╛рд░ рдЫреИрди: рдореИрд▓реЗ рдЬрд╛рдирд╛рдЬрд╛рдиреА рдХрдиреНрдЯреЗрдирд░рд╣рд░реВрдорд╛ рдореБрдЯреБрдХреЛ рдзрдбреНрдХрдирд╣рд░реВ рд░рд╛рдЦрд┐рди, рдореИрд▓реЗ рд╕реБрд░рдХреНрд╖рд╛рдХреЛ рд╕рд╛рде рдЪрд┐рдиреНрддрд╛ рдЧрд░реЗрдиред рддрд░ рдореИрд▓реЗ рд╣рд╛рдореНрд░рд╛ рдкреНрд░рдпреЛрдЧрдХрд░реНрддрд╛рд╣рд░реВрдХреЛ рд▓рд╛рдЧрд┐ рдиреНрдпреВрдирддрдо рдЙрдкрдпреБрдХреНрдд рдЧрд░реЗрдВред
  • рдиреЛрдЯ рдЧрд░реНрдиреБрд╣реЛрд╕реН рдХрд┐:
    • рдбреЗрдЧ рдлреЛрд▓реНрдбрд░ рдЕрдиреБрд╕реВрдЪрдХ рд░ рдХрд╛рд░реНрдпрдХрд░реНрддрд╛ рджреБрд╡реИрдХреЛ рд▓рд╛рдЧрд┐ рдкрд╣реБрдБрдЪрдпреЛрдЧреНрдп рд╣реБрдиреБрдкрд░реНрдЫред
    • рдпреЛ рд╕рдмреИ рддреЗрд╕реНрд░реЛ-рдкрдХреНрд╖ рдкреБрд╕реНрддрдХрд╛рд▓рдпрд╣рд░реВрдорд╛ рд▓рд╛рдЧреВ рд╣реБрдиреНрдЫ - рддрд┐рдиреАрд╣рд░реВ рд╕рдмреИ рдПрдХ рдЕрдиреБрд╕реВрдЪрдХ рд░ рдХрд╛рдорджрд╛рд░рд╣рд░реВ рд╕рдВрдЧ рдореЗрд╢рд┐рди рдорд╛ рд╕реНрдерд╛рдкрд┐рдд рд╣реБрдиреБрдкрд░реНрдЫред

рдЦреИрд░, рдЕрдм рдпреЛ рд╕рд░рд▓ рдЫ:

$ docker-compose up --scale worker=3

рд╕рдмреИ рдХреБрд░рд╛ рдЙрдареЗ рдкрдЫрд┐, рддрдкрд╛рдЗрдБ рд╡реЗрдм рдЗрдиреНрдЯрд░рдлреЗрд╕рд╣рд░реВ рд╣реЗрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ:

рдЖрдзрд╛рд░рднреВрдд рдЕрд╡рдзрд╛рд░рдгрд╛рд╣рд░реВ

рдпрджрд┐ рддрдкрд╛рдИрдВрд▓реЗ рдпреА рд╕рдмреИ "рдбрд╛рдЧрд╣рд░реВ" рдорд╛ рдХреЗрд╣реА рдмреБрдЭреНрдиреБрднрдПрдХреЛ рдЫреИрди рднрдиреЗ, рдпрд╣рд╛рдБ рдЫреЛрдЯреЛ рд╢рдмреНрджрдХреЛрд╢ рдЫ:

  • рдЕрдиреБрд╕реВрдЪреА - рдПрдпрд░рдлреНрд▓реЛрдорд╛ рд╕рдмреИрднрдиреНрджрд╛ рдорд╣рддреНрддреНрд╡рдкреВрд░реНрдг рдЕрдВрдХрд▓, рд░реЛрдмреЛрдЯрд▓реЗ рдХрдбрд╛ рдореЗрд╣рдирдд рдЧрд░реНрдЫ рднрдиреНрдиреЗ рдирд┐рдпрдиреНрддреНрд░рдг рдЧрд░реНрджреИ, рд╡реНрдпрдХреНрддрд┐ рд╣реЛрдЗрди: рддрд╛рд▓рд┐рдХрд╛ рдЕрдиреБрдЧрдорди рдЧрд░реНрджрдЫ, рдбреНрдпрд╛рдЧрд╣рд░реВ рдЕрдкрдбреЗрдЯ рдЧрд░реНрджрдЫ, рдХрд╛рд░реНрдпрд╣рд░реВ рд╕реБрд░реВ рдЧрд░реНрджрдЫред

    рд╕рд╛рдорд╛рдиреНрдпрддрдпрд╛, рдкреБрд░рд╛рдиреЛ рд╕рдВрд╕реНрдХрд░рдгрд╣рд░реВрдорд╛, рдЙрд╕рд▓рд╛рдИ рдореЗрдореЛрд░реАрдорд╛ рд╕рдорд╕реНрдпрд╛ рдерд┐рдпреЛ (рд╣реЛрдЗрди, рдПрдореНрдиреЗрд╕рд┐рдпрд╛ рд╣реЛрдЗрди, рддрд░ рд▓реАрдХ) рд░ рд╡рд┐рд░рд╛рд╕рдд рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░ рдкрдирд┐ рдХрдиреНрдлрд┐рдЧрдорд╛ рд░рд╣реНрдпреЛред run_duration - рдпрд╕рдХреЛ рдкреБрди: рд╕реБрд░реБ рдЕрдиреНрддрд░рд╛рд▓ред рддрд░ рдЕрд╣рд┐рд▓реЗ рд╕рдмреИ рдареАрдХ рдЫред

  • DAG (рдЙрд░реНрдл "рдбреЗрдЧ") - "рдирд┐рд░реНрджреЗрд╢рд┐рдд рдПрд╕рд╛рдЗрдХреНрд▓рд┐рдХ рдЧреНрд░рд╛рдл", рддрд░ рдпрд╕реНрддреЛ рдкрд░рд┐рднрд╛рд╖рд╛рд▓реЗ рдереЛрд░реИ рдорд╛рдирд┐рд╕рд╣рд░реВрд▓рд╛рдИ рдмрддрд╛рдЙрдиреЗрдЫ, рддрд░ рд╡рд╛рд╕реНрддрд╡рдорд╛ рдпреЛ рдПрдХрдЕрд░реНрдХрд╛рд╕рдБрдЧ рдЕрдиреНрддрд░рдХреНрд░рд┐рдпрд╛ рдЧрд░реНрдиреЗ рдХрд╛рд░реНрдпрд╣рд░реВрдХрд╛ рд▓рд╛рдЧрд┐ рдХрдиреНрдЯреЗрдирд░ рд╣реЛ (рддрд▓ рд╣реЗрд░реНрдиреБрд╣реЛрд╕реН) рд╡рд╛ SSIS рдорд╛ рдкреНрдпрд╛рдХреЗрдЬрдХреЛ рдПрдирд╛рд▓рдЧ рд░ Informatica рдорд╛ Workflowред ред

    Dags рдХреЛ рдЕрддрд┐рд░рд┐рдХреНрдд, рддреНрдпрд╣рд╛рдБ рдЕрдЭреИ рдкрдирд┐ subdags рд╣реБрди рд╕рдХреНрдЫ, рддрд░ рд╣рд╛рдореА рд╕рдореНрднрд╡рддрдГ рддрд┐рдиреАрд╣рд░реВрдорд╛ рдкреБрдЧреНрди рд╕рдХреНрджреИрдиреМрдВред

  • DAG рд░рди - рдкреНрд░рд╛рд░рдореНрднрд┐рдХ рдбреЗрдЧ, рдЬреБрди рдЖрдлреНрдиреИ рддреЛрдХрд┐рдПрдХреЛ рдЫ execution_dateред рдПрдЙрдЯреИ рдбреНрдпрд╛рдЧрдХрд╛ рдбрдЧреНрд░рд╛рдирд╣рд░реВрд▓реЗ рд╕рдорд╛рдирд╛рдиреНрддрд░ рд░реВрдкрдорд╛ рдХрд╛рдо рдЧрд░реНрди рд╕рдХреНрдЫрдиреН (рдпрджрд┐ рддрдкрд╛рдИрдВрд▓реЗ рдЖрдлреНрдиреЛ рдХрд╛рд░реНрдпрд╣рд░реВ рдЕрд╢рдХреНрдд рдмрдирд╛рдЙрдиреБ рднрдПрдХреЛ рдЫ рднрдиреЗ, рдЕрд╡рд╢реНрдп)ред
  • рдЕрдкрд░реЗрдЯрд░ рдХреЛрдб рдХреЛ рдЯреБрдХреНрд░рд╛рд╣рд░реБ рдПрдХ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдХрд╛рд░реНрдп рдкреНрд░рджрд░реНрд╢рди рдХреЛ рд▓рд╛рдЧреА рдЬрд┐рдореНрдореЗрд╡рд╛рд░ рдЫрдиреНред рддреНрдпрд╣рд╛рдБ рддреАрди рдкреНрд░рдХрд╛рд░рдХрд╛ рдЕрдкрд░реЗрдЯрд░рд╣рд░реВ рдЫрдиреН:
    • рдХрд╛рд░реНрдпрд╣рд╛рдореНрд░реЛ рдордирдкрд░реНрдиреЗ рдЬрд╕реНрддреИ PythonOperator, рдЬрд╕рд▓реЗ рдХреБрдиреИ рдкрдирд┐ (рд╡реИрдз) рдкрд╛рдЗрдерди рдХреЛрдб рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдЧрд░реНрди рд╕рдХреНрдЫ;
    • рд╕реНрдерд╛рдирд╛рдиреНрддрд░рдг рдЧрд░реНрдиреБрд╣реЛрд╕реН, рдЬрд╕рд▓реЗ рдбрд╛рдЯрд╛ рдПрдХ рдард╛рдЙрдБрдмрд╛рдЯ рдЕрд░реНрдХреЛ рд╕реНрдерд╛рдирдорд╛ рдвреБрд╡рд╛рдиреА рдЧрд░реНрдЫ, рднрдиреНрдиреБрд╣реЛрд╕реН, MsSqlToHiveTransfer;
    • рд╕реЗрдиреНрд╕рд░ рдЕрд░реНрдХреЛрддрд░реНрдл, рдпрд╕рд▓реЗ рддрдкрд╛рдИрдВрд▓рд╛рдИ рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛ рджрд┐рди рд╡рд╛ рдШрдЯрдирд╛ рдирднрдПрд╕рдореНрдо рдбреЗрдЧрдХреЛ рдЕрд░реНрдХреЛ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрдирд▓рд╛рдИ рдврд┐рд▓реЛ рдЧрд░реНрди рдЕрдиреБрдорддрд┐ рджрд┐рдиреНрдЫред HttpSensor рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдЕрдиреНрддрд┐рдо рдмрд┐рдиреНрджреБ рддрд╛рдиреНрди рд╕рдХреНрдЫ, рд░ рдЬрдм рдЗрдЪреНрдЫрд┐рдд рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛ рдкрд░реНрдЦрд┐рд░рд╣реЗрдХреЛ рдЫ, рд╕реНрдерд╛рдирд╛рдиреНрддрд░рдг рд╕реБрд░реБ рдЧрд░реНрдиреБрд╣реЛрд╕реН GoogleCloudStorageToS3Operatorред рдЬрд┐рдЬреНрдЮрд╛рд╕реБ рдордирд▓реЗ рд╕реЛрдзреНрдиреЗрдЫ: "рдХрд┐рди? рдЖрдЦрд┐рд░, рддрдкрд╛рдИрд▓реЗ рдЕрдкрд░реЗрдЯрд░рдорд╛ рдкреБрдирд░рд╛рд╡реГрддреНрддрд┐рд╣рд░реВ рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ! тАЭ рд░ рддреНрдпрд╕рдкрдЫрд┐, рдирд┐рд▓рдореНрдмрд┐рдд рдЕрдкрд░реЗрдЯрд░рд╣рд░реВрд╕рдБрдЧ рдХрд╛рд░реНрдпрд╣рд░реВрдХреЛ рдкреЛрдЦрд░реА рд░реЛрдХреНрдирдХреЛ рд▓рд╛рдЧрд┐ред рд╕реЗрдиреНрд╕рд░ рд╕реБрд░реБ рд╣реБрдиреНрдЫ, рдЬрд╛рдБрдЪ рдЧрд░реНрдЫ рд░ рдЕрд░реНрдХреЛ рдкреНрд░рдпрд╛рд╕ рдЕрдШрд┐ рдорд░реНрдЫред
  • рдХрд╛рд░реНрдп - рдШреЛрд╖рд┐рдд рдЕрдкрд░реЗрдЯрд░рд╣рд░реВ, рдкреНрд░рдХрд╛рд░рдХреЛ рдкрд░реНрд╡рд╛рд╣ рдирдЧрд░реА, рд░ рдбреЗрдЧрдорд╛ рд╕рдВрд▓рдЧреНрди рдХрд╛рд░реНрдпрдХреЛ рд╢реНрд░реЗрдгреАрдорд╛ рдмрдвреБрд╡рд╛ рдЧрд░рд┐рдиреНрдЫред
  • рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдг - рдЬрдм рд╕рд╛рдорд╛рдиреНрдп рдпреЛрдЬрдирд╛рдХрд╛рд░рд▓реЗ рдирд┐рд░реНрдгрдп рдЧрд░реЗ рдХрд┐ рдпреЛ рдХрд╛рд░реНрдпрдХрд░реНрддрд╛-рдХрд╛рдорджрд╛рд░рд╣рд░реВрдорд╛ рдпреБрджреНрдзрдорд╛ рдХрд╛рд░реНрдпрд╣рд░реВ рдкрдард╛рдЙрдиреЗ рд╕рдордп рд╣реЛ (рдпрджрд┐ рд╣рд╛рдореА рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдЫреМрдВ LocalExecutor рд╡рд╛ рдХреЛ рдорд╛рдорд▓рд╛ рдорд╛ рдПрдХ рд░рд┐рдореЛрдЯ рдиреЛрдб рдорд╛ CeleryExecutor), рдпрд╕рд▓реЗ рддрд┐рдиреАрд╣рд░реВрд▓рд╛рдИ рд╕рдиреНрджрд░реНрдн рдкреНрд░рджрд╛рди рдЧрд░реНрджрдЫ (рдЕрд░реНрдерд╛рдд, рдЪрд░рд╣рд░реВрдХреЛ рд╕реЗрдЯ - рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░рд╣рд░реВ), рдЖрджреЗрд╢ рд╡рд╛ рдХреНрд╡реЗрд░реА рдЯреЗрдореНрдкреНрд▓реЗрдЯрд╣рд░реВ рд╡рд┐рд╕реНрддрд╛рд░ рдЧрд░реНрджрдЫ, рд░ рддрд┐рдиреАрд╣рд░реВрд▓рд╛рдИ рдкреВрд▓ рдЧрд░реНрджрдЫред

рд╣рд╛рдореА рдХрд╛рд░реНрдпрд╣рд░реВ рдЙрддреНрдкрдиреНрди рдЧрд░реНрдЫреМрдВ

рдкрд╣рд┐рд▓реЗ, рд╣рд╛рдореНрд░реЛ рдбрдЧрдХреЛ рд╕рд╛рдорд╛рдиреНрдп рдпреЛрдЬрдирд╛рд▓рд╛рдИ рд░реВрдкрд░реЗрдЦрд╛ рдЧрд░реМрдВ, рд░ рддреНрдпрд╕рдкрдЫрд┐ рд╣рд╛рдореА рдердк рд╡рд┐рд╡рд░рдгрд╣рд░реВрдорд╛ рдбреБрдмреНрдиреЗрдЫреМрдВ, рдХрд┐рдирднрдиреЗ рд╣рд╛рдореА рдХреЗрд╣реА рдЧреИрд░-рддреБрдЪреНрдЫ рд╕рдорд╛рдзрд╛рдирд╣рд░реВ рд▓рд╛рдЧреВ рдЧрд░реНрдЫреМрдВред

рддреНрдпрд╕реЛрднрдП, рдпрд╕рдХреЛ рд╕рд░рд▓ рд░реВрдкрдорд╛, рдпрд╕реНрддреЛ рдбреЗрдЧ рдпрд╕реНрддреЛ рджреЗрдЦрд┐рдиреНрдЫ:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

рдЖрдЙрдиреБрд╣реЛрд╕реН рдпреЛ рдкрддреНрддрд╛ рд▓рдЧрд╛рдЙрдиреБрд╣реЛрд╕реН:

  • рдкрд╣рд┐рд▓реЗ, рд╣рд╛рдореА рдЖрд╡рд╢реНрдпрдХ libs рд░ рдЖрдпрд╛рдд рдЧрд░реНрдЫреМрдВ рдЕрд░реБ рдХреЗрд╣реА;
  • sql_server_ds рд╣реЛ List[namedtuple[str, str]] рдПрдпрд░рдлреНрд▓реЛ рдЬрдбрд╛рдирд╣рд░реВрдмрд╛рдЯ рдЬрдбрд╛рдирд╣рд░реВрдХреЛ рдирд╛рдо рд░ рдбрд╛рдЯрд╛рдмреЗрд╕рд╣рд░реВ рдЬрд╕рдмрд╛рдЯ рд╣рд╛рдореАрд▓реЗ рд╣рд╛рдореНрд░реЛ рдкреНрд▓реЗрдЯ рд▓рд┐рдиреЗрдЫреМрдВ;
  • dag - рд╣рд╛рдореНрд░реЛ рдбреЗрдЧрдХреЛ рдШреЛрд╖рдгрд╛, рдЬреБрди рдЕрдирд┐рд╡рд╛рд░реНрдп рд░реВрдкрдорд╛ рд╣реБрдиреБрдкрд░реНрдЫ globals(), рдЕрдиреНрдпрдерд╛ Airflow рд▓реЗ рдлреЗрд▓рд╛ рдкрд╛рд░реНрдиреЗ рдЫреИрдиред рдбрдЧ рдкрдирд┐ рднрдиреНрди рдЖрд╡рд╢реНрдпрдХ рдЫ:
    • рдЙрд╕рдХреЛ рдирд╛рдо рдХреЗ рд╣реЛ orders - рдпреЛ рдирд╛рдо рд╡реЗрдм рдЗрдиреНрдЯрд░рдлреЗрд╕рдорд╛ рджреЗрдЦрд╛ рдкрд░реНрдиреЗрдЫ,
    • рдЙрдирд▓реЗ рд╕рд╛рдЙрди рео рдЧрддреЗ рдордзреНрдпрд░рд╛рддрдмрд╛рдЯ рдХрд╛рдо рдЧрд░реНрдиреЗ
    • рд░ рдпреЛ рдЪрд▓реНрдиреБ рдкрд░реНрдЫ, рд▓рдЧрднрдЧ рд╣рд░реЗрдХ 6 рдШрдгреНрдЯрд╛ (рдпрд╣рд╛рдБ рдХрдбрд╛ рдорд╛рдиреНрдЫреЗ рдХреЛ рд▓рд╛рдЧрд┐ рдХреЛ рд╕рдЯреНрдЯрд╛ timedelta() рд╕реНрд╡реАрдХрд╛рд░реНрдп cron-рд▓рд╛рдЗрди 0 0 0/6 ? * * *, рдХрдо рд╢рд╛рдиреНрдд рдХреЛ рд▓рд╛рдЧреА - рдПрдХ рдЕрднрд┐рд╡реНрдпрдХреНрддрд┐ рдЬрд╕реНрддреИ @daily);
  • workflow() рдореБрдЦреНрдп рдХрд╛рдо рдЧрд░реНрдиреЗрдЫ, рддрд░ рдЕрд╣рд┐рд▓реЗ рд╣реЛрдЗрдиред рдЕрд╣рд┐рд▓реЗрдХреЛ рд▓рд╛рдЧрд┐, рд╣рд╛рдореА рд╣рд╛рдореНрд░реЛ рд╕рдиреНрджрд░реНрднрд▓рд╛рдИ рд▓рдЧрдорд╛ рдбрдореНрдк рдЧрд░реНрдиреЗрдЫреМрдВред
  • рд░ рдЕрдм рдХрд╛рд░реНрдпрд╣рд░реВ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреЗ рд╕рд░рд▓ рдЬрд╛рджреВ:
    • рд╣рд╛рдореА рд╣рд╛рдореНрд░рд╛ рд╕реНрд░реЛрддрд╣рд░реВ рдорд╛рд░реНрдлрдд рдЪрд▓реНрдЫреМрдВ;
    • рдкреНрд░рд╛рд░рдореНрдн PythonOperator, рдЬрд╕рд▓реЗ рд╣рд╛рдореНрд░реЛ рдбрдореАрд▓рд╛рдИ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдЧрд░реНрдиреЗрдЫ workflow()ред рдХрд╛рд░реНрдпрдХреЛ рдПрдХ рдЕрджреНрд╡рд┐рддреАрдп (рдбреЗрдЧ рднрд┐рддреНрд░) рдирд╛рдо рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдЧрд░реНрди рдирдмрд┐рд░реНрд╕рдиреБрд╣реЛрд╕реН рд░ рдбреНрдпрд╛рдЧ рдЖрдлреИ рдмрд╛рдБрдзреНрдиреБрд╣реЛрд╕реНред рдЭрдгреНрдбрд╛ provide_context рдмрд╛рд░реАрдорд╛, рдкреНрд░рдХрд╛рд░реНрдпрдорд╛ рдердк рддрд░реНрдХрд╣рд░реВ рдЦрдиреНрдпрд╛рдЙрдиреЗрдЫ, рдЬреБрди рд╣рд╛рдореАрд▓реЗ рд╕рд╛рд╡рдзрд╛рдиреАрдкреВрд░реНрд╡рдХ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рд╕рдЩреНрдХрд▓рди рдЧрд░реНрдиреЗрдЫреМрдВ **context.

рдЕрд╣рд┐рд▓реЗрдХреЛ рд▓рд╛рдЧрд┐, рдпрддрд┐ рдорд╛рддреНрд░ рд╣реЛред рд╣рд╛рдореАрд▓реЗ рдХреЗ рдкрд╛рдпреМрдВ:

  • рд╡реЗрдм рдЗрдиреНрдЯрд░рдлреЗрд╕рдорд╛ рдирдпрд╛рдБ рдбреЗрдЧ,
  • рдбреЗрдв рд╕рдп рдХрд╛рд░реНрдпрд╣рд░реВ рд╕рдорд╛рдирд╛рдиреНрддрд░ рд░реВрдкрдорд╛ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдЧрд░рд┐рдиреЗрдЫ (рдпрджрд┐ рдПрдпрд░рдлреНрд▓реЛ, рд╕реЗрд▓рд░реА рд╕реЗрдЯрд┐рдЩрд╣рд░реВ рд░ рд╕рд░реНрднрд░ рдХреНрд╖рдорддрд╛рд▓реЗ рдЕрдиреБрдорддрд┐ рджрд┐рдиреНрдЫ)ред

рдЦреИрд░, рд▓рдЧрднрдЧ рдпреЛ рдмреБрдЭреНрдпреЛред

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ
рдХрд╕рд▓реЗ рдирд┐рд░реНрднрд░рддрд╛рд╣рд░реВ рд╕реНрдерд╛рдкрдирд╛ рдЧрд░реНрдиреЗрдЫ?

рдпреЛ рд╕рдмреИ рдХреБрд░рд╛рд▓рд╛рдИ рд╕рд░рд▓ рдмрдирд╛рдЙрди, рдо рднрд┐рддреНрд░ рдкрд╕реЗрдВ docker-compose.yml рдкреНрд░рд╢реЛрдзрди рдЧрд░реНрджреИ requirements.txt рд╕рдмреИ рдиреЛрдбрд╣рд░реВрдорд╛ред

рдЕрдм рдпреЛ рдЧрдпреЛ:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдЧреНрд░реЗ рд╕реНрдХреНрд╡рд╛рдпрд░рд╣рд░реВ рдЕрдиреБрд╕реВрдЪрдХрджреНрд╡рд╛рд░рд╛ рдкреНрд░рд╢реЛрдзрди рдЧрд░рд┐рдПрдХрд╛ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгрд╣рд░реВ рд╣реБрдиреНред

рд╣рд╛рдореА рдереЛрд░реИ рдкрд░реНрдЦрдиреНрдЫреМрдВ, рдХрд╛рд░реНрдпрд╣рд░реВ рдХрд╛рдорджрд╛рд░рд╣рд░реВ рджреНрд╡рд╛рд░рд╛ рд╕реНрдиреНрдпрд╛рдк рдЧрд░рд┐рдПрдХреЛ рдЫ:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдирд┐рд╕реНрд╕рдиреНрджреЗрд╣, рд╣рд░рд┐рдпреЛрд▓реЗ рд╕рдлрд▓рддрд╛рдкреВрд░реНрд╡рдХ рдЖрдлреНрдиреЛ рдХрд╛рдо рдкреВрд░рд╛ рдЧрд░реЗрдХреЛ рдЫред рд░рд╛рддреЛ рдзреЗрд░реИ рд╕рдлрд▓ рдЫреИрдирдиреНред

рд╡реИрд╕реЗ, рд╣рд╛рдореНрд░реЛ рдЙрддреНрдкрд╛рджрди рдорд╛ рдХреБрдиреИ рдлреЛрд▓реНрдбрд░ рдЫреИрди ./dags, рдореЗрд╢рд┐рдирд╣рд░реВ рдмреАрдЪ рдХреБрдиреИ рд╕рд┐рдВрдХреНрд░реЛрдирд╛рдЗрдЬреЗрд╕рди рдЫреИрди - рд╕рдмреИ рдбреНрдпрд╛рдЧрд╣рд░реВ рдЫрдиреН git рд╣рд╛рдореНрд░реЛ Gitlab рдорд╛, рд░ Gitlab CI рд▓реЗ рдорд░реНрдЬ рдЧрд░реНрджрд╛ рдореЗрд╕рд┐рдирд╣рд░реВрдорд╛ рдЕрджреНрдпрд╛рд╡рдзрд┐рдХрд╣рд░реВ рд╡рд┐рддрд░рдг рдЧрд░реНрджрдЫ master.

рдлреВрд▓рдХреЛ рдмрд╛рд░реЗрдорд╛ рдереЛрд░реИ

рдЬрдм рдХрд╛рдорджрд╛рд░рд╣рд░реВрд▓реЗ рд╣рд╛рдореНрд░реЛ рдкреНрдпрд╛рд╕рд┐рдлрд╛рдпрд░рд╣рд░реВ рдкрд┐рдЯрд┐рд░рд╣реЗрдХрд╛ рдЫрдиреН, рдЖрдЙрдиреБрд╣реЛрд╕реН, рд╣рд╛рдореАрд▓рд╛рдИ рдХреЗрд╣рд┐ рджреЗрдЦрд╛рдЙрди рд╕рдХреНрдиреЗ рдЕрд░реНрдХреЛ рдЙрдкрдХрд░рдг рд╕рдореНрдЭреМрдВ - рдлреВрд▓ред

рдХрд╛рд░реНрдпрдХрд░реНрддрд╛ рдиреЛрдбрд╣рд░реВрдорд╛ рд╕рд╛рд░рд╛рдВрд╢ рдЬрд╛рдирдХрд╛рд░реАрдХреЛ рд╕рд╛рде рдкрд╣рд┐рд▓реЛ рдкреГрд╖реНрда:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдХрд╛рдордорд╛ рдЧрдПрдХрд╛ рдХрд╛рд░реНрдпрд╣рд░реВрдХреЛ рд╕рд╛рде рд╕рдмреИрднрдиреНрджрд╛ рддреАрд╡реНрд░ рдкреГрд╖реНрда:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рд╣рд╛рдореНрд░реЛ рджрд▓рд╛рд▓ рдХреЛ рд╕реНрдерд┐рддрд┐ рд╕рдВрдЧ рд╕рдмреИ рднрдиреНрджрд╛ рдмреЛрд░рд┐рдВрдЧ рдкреГрд╖реНрда:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдЙрдЬреНрдпрд╛рд▓реЛ рдкреГрд╖реНрда рдХрд╛рд░реНрдп рд╕реНрдерд┐рддрд┐ рдЧреНрд░рд╛рдл рд░ рддрд┐рдиреАрд╣рд░реВрдХреЛ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рд╕рдордп рд╕рдВрдЧ рдЫ:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рд╣рд╛рдореА рдЕрдиреНрдбрд░рд▓реЛрдб рд▓реЛрдб рдЧрд░реНрдЫреМрдВ

рддреНрдпрд╕реЛрднрдП, рд╕рдмреИ рдХрд╛рд░реНрдпрд╣рд░реВ рдХрд╛рдо рдЧрд░рд┐рд╕рдХреЗрдХрд╛ рдЫрдиреН, рддрдкрд╛рдИрдВ рдШрд╛рдЗрддреЗрд╣рд░реВрд▓рд╛рдИ рдЯрд╛рдврд╛ рд▓реИрдЬрд╛рди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫред

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рд░ рддреНрдпрд╣рд╛рдБ рдзреЗрд░реИ рдШрд╛рдЗрддреЗ рдерд┐рдП - рдПрдХ рдХрд╛рд░рдг рд╡рд╛ рдЕрд░реНрдХреЛ рд▓рд╛рдЧрд┐ред Airflow рдХреЛ рд╕рд╣реА рдкреНрд░рдпреЛрдЧрдХреЛ рдорд╛рдорд▓рд╛рдорд╛, рдпреА рдзреЗрд░реИ рд╡рд░реНрдЧрд╣рд░реВрд▓реЗ рдбреЗрдЯрд╛ рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдкрдорд╛ рдЖрдЗрдкреБрдЧреЗрди рднрдиреЗрд░ рд╕рдВрдХреЗрдд рдЧрд░реНрджрдЫред

рддрдкрд╛рдИрдВрд▓реЗ рд▓рдЧ рд╣реЗрд░реНрдиреБ рдкрд░реНрдЫ рд░ рдкрддрд┐рдд рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгрд╣рд░реВ рдкреБрди: рд╕реБрд░реБ рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рдЫред

рдХреБрдиреИ рдкрдирд┐ рд╡рд░реНрдЧрдорд╛ рдХреНрд▓рд┐рдХ рдЧрд░реЗрд░, рд╣рд╛рдореА рд╣рд╛рдореНрд░реЛ рд▓рд╛рдЧрд┐ рдЙрдкрд▓рдмреНрдз рдХрд╛рд░реНрдпрд╣рд░реВ рджреЗрдЦреНрдиреЗрдЫреМрдВ:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рддрдкрд╛рдИрдВ рд▓рд┐рди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ рд░ рдкрддрди рдЦрд╛рд▓реА рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫред рддреНрдпреЛ рд╣реЛ, рд╣рд╛рдореА рдмрд┐рд░реНрд╕рдиреНрдЫреМрдВ рдХрд┐ рддреНрдпрд╣рд╛рдБ рдХреЗрд╣рд┐ рдЕрд╕рдлрд▓ рднрдПрдХреЛ рдЫ, рд░ рдЙрд╣реА рдЙрджрд╛рд╣рд░рдг рдХрд╛рд░реНрдп рдЕрдиреБрд╕реВрдЪрдХрдорд╛ рдЬрд╛рдиреНрдЫред

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдпреЛ рд╕реНрдкрд╖реНрдЯ рдЫ рдХрд┐ рд╕рдмреИ рд░рд╛рддреЛ рд╡рд░реНрдЧрд╣рд░реБ рд╕рдВрдЧ рдорд╛рдЙрд╕ рд╕рдВрдЧ рдпреЛ рдЧрд░реНрди рдзреЗрд░реИ рдорд╛рдирд╡реАрдп рдЫреИрди - рдпреЛ рд╣рд╛рдореАрд▓реЗ Airflow рдмрд╛рдЯ рдЕрдкреЗрдХреНрд╖рд╛ рдХреЗ рд╣реЛрдЗрдиред рд╕реНрд╡рд╛рднрд╛рд╡рд┐рдХ рд░реВрдкрдорд╛, рд╣рд╛рдореАрд╕рдБрдЧ рд╕рд╛рдореВрд╣рд┐рдХ рд╡рд┐рдирд╛рд╢рдХрд╛ рд╣рддрд┐рдпрд╛рд░рд╣рд░реВ рдЫрдиреН: Browse/Task Instances

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдПрдХреИрдЪреЛрдЯрд┐ рд╕рдмреИ рдХреБрд░рд╛ рдЪрдпрди рдЧрд░реМрдВ рд░ рд╢реВрдиреНрдпрдорд╛ рд░рд┐рд╕реЗрдЯ рдЧрд░реМрдВ, рд╕рд╣реА рд╡рд╕реНрддреБрдорд╛ рдХреНрд▓рд┐рдХ рдЧрд░реНрдиреБрд╣реЛрд╕реН:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рд╕рдлрд╛ рдЧрд░рд┐рд╕рдХреЗрдкрдЫрд┐, рд╣рд╛рдореНрд░рд╛ рдЯреНрдпрд╛рдХреНрд╕реАрд╣рд░реВ рдпрд╕реНрддреЛ рджреЗрдЦрд┐рдиреНрдЫрдиреН (рддрд┐рдиреАрд╣рд░реВ рдкрд╣рд┐рд▓реЗ рдиреИ рдЕрдиреБрд╕реВрдЪрд┐рддрдХрд░реНрддрд╛рд▓реЗ рддрд┐рдиреАрд╣рд░реВрд▓рд╛рдИ рддрд╛рд▓рд┐рдХрд╛ рдмрдирд╛рдЙрдирдХреЛ рд▓рд╛рдЧрд┐ рдкрд░реНрдЦрд┐рд░рд╣реЗрдХрд╛ рдЫрдиреН):

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдЬрдбрд╛рдирд╣рд░реВ, рд╣реБрдХрд╣рд░реВ рд░ рдЕрдиреНрдп рдЪрд░рд╣рд░реВ

рдпреЛ рдЕрд░реНрдХреЛ DAG рд╣реЗрд░реНрдиреЗ рд╕рдордп рд╣реЛ, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""╨У╨╛╤Б╨┐╨╛╨┤╨░ ╤Е╨╛╤А╨╛╤И╨╕╨╡, ╨╛╤В╤З╨╡╤В╤Л ╨╛╨▒╨╜╨╛╨▓╨╗╨╡╨╜╤Л"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ╨Э╨░╤В╨░╤И, ╨┐╤А╨╛╤Б╤Л╨┐╨░╨╣╤Б╤П, ╨╝╤Л {{ dag.dag_id }} ╤Г╤А╨╛╨╜╨╕╨╗╨╕
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

рдХреЗ рд╕рдмреИрд▓реЗ рдХрд╣рд┐рд▓реНрдпреИ рд░рд┐рдкреЛрд░реНрдЯ рдЕрдкрдбреЗрдЯ рдЧрд░реЗрдХрд╛ рдЫрдиреН? рдпреЛ рдлреЗрд░рд┐ рдЙрдирдХреЛ рд╣реЛ: рдбреЗрдЯрд╛ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрдиреЗ рд╕реНрд░реЛрддрд╣рд░реВрдХреЛ рд╕реВрдЪреА рдЫ; рдЬрд╣рд╛рдБ рд░рд╛рдЦреНрди рдХреЛ рд▓рд╛рдЧреА рдПрдХ рд╕реВрдЪреА рдЫ; рдЬрдм рд╕рдмреИ рднрдпреЛ рд╡рд╛ рдмрд┐рдЧреНрд░рд┐рдпреЛ (рдареАрдХ рдЫ, рдпреЛ рд╣рд╛рдореНрд░реЛ рдмрд╛рд░реЗрдорд╛ рд╣реЛрдЗрди, рд╣реЛрдЗрди) рд╣рдЩреНрдХ рдЧрд░реНрди рдирдмрд┐рд░реНрд╕рдиреБрд╣реЛрд╕реНред

рдлреЗрд░рд┐ рдлрд╛рдЗрд▓ рдорд╛рд░реНрдлрдд рдЬрд╛рдФрдВ рд░ рдирдпрд╛рдБ рдЕрд╕реНрдкрд╖реНрдЯ рд╕рд╛рдорд╛рдирд╣рд░реВ рд╣реЗрд░реМрдВ:

  • from commons.operators import TelegramBotSendMessage - рдХреБрдиреИ рдкрдирд┐ рдХреБрд░рд╛рд▓реЗ рд╣рд╛рдореАрд▓рд╛рдИ рд╣рд╛рдореНрд░реЛ рдЖрдлреНрдиреИ рдЕрдкрд░реЗрдЯрд░рд╣рд░реВ рдмрдирд╛рдЙрдирдмрд╛рдЯ рд░реЛрдХреНрджреИрди, рдЬреБрди рд╣рд╛рдореАрд▓реЗ рдЕрдирдмреНрд▓рдХ рдЧрд░рд┐рдПрдХреЛрдорд╛ рд╕рдиреНрджреЗрд╢рд╣рд░реВ рдкрдард╛рдЙрдирдХреЛ рд▓рд╛рдЧрд┐ рдПрдЙрдЯрд╛ рд╕рд╛рдиреЛ рд░реНрдпрд╛рдкрд░ рдмрдирд╛рдПрд░ рдлрд╛рдЗрджрд╛ рдЙрдард╛рдпреМрдВред (рд╣рд╛рдореА рддрд▓ рдпрд╕ рдЕрдкрд░реЗрдЯрд░рдХреЛ рдмрд╛рд░реЗрдорд╛ рдердк рдХреБрд░рд╛ рдЧрд░реНрдиреЗрдЫреМрдВ);
  • default_args={} - dag рд▓реЗ рдЖрдлреНрдирд╛ рд╕рдмреИ рдЕрдкрд░реЗрдЯрд░рд╣рд░реВрд▓рд╛рдИ рд╕рдорд╛рди рддрд░реНрдХрд╣рд░реВ рд╡рд┐рддрд░рдг рдЧрд░реНрди рд╕рдХреНрдЫ;
  • to='{{ var.value.all_the_kings_men }}' - рдХреНрд╖реЗрддреНрд░ to рд╣рд╛рдореАрд╕рдБрдЧ рд╣рд╛рд░реНрдбрдХреЛрдб рд╣реБрдиреЗрдЫреИрди, рддрд░ рдЧрддрд┐рд╢реАрд▓ рд░реВрдкрдорд╛ рдЬрд┐рдиреНрдЬрд╛ рд░ рдЗрдореЗрд▓рд╣рд░реВрдХреЛ рд╕реВрдЪреАрдХреЛ рд╕рд╛рде рдЪрд▓ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рдЙрддреНрдкрдиреНрди рдЧрд░рд┐рдПрдХреЛ рдЫ, рдЬреБрди рдореИрд▓реЗ рдзреНрдпрд╛рдирдкреВрд░реНрд╡рдХ рд░рд╛рдЦреЗрдХреЛ рдЫреБред Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - рдЕрдкрд░реЗрдЯрд░ рд╕реБрд░реБ рдЧрд░реНрди рд╕рд░реНрддред рд╣рд╛рдореНрд░реЛ рдЕрд╡рд╕реНрдерд╛рдорд╛, рд╕рдмреИ рдирд┐рд░реНрднрд░рддрд╛рд╣рд░реВ рдХрд╛рдо рдЧрд░реЗрдорд╛ рдорд╛рддреНрд░ рдкрддреНрд░ рдорд╛рд▓рд┐рдХрд╣рд░реВрдорд╛ рдЙрдбреНрдиреЗрдЫ рд╕рдлрд▓рддрд╛рдкреВрд░реНрд╡рдХ;
  • tg_bot_conn_id='tg_main' - рддрд░реНрдХрд╣рд░реВ conn_id рд╣рд╛рдореАрд▓реЗ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реЗрдХреЛ рдЬрдбрд╛рди рдЖрдИрдбреАрд╣рд░реВ рд╕реНрд╡реАрдХрд╛рд░ рдЧрд░реНрдиреБрд╣реЛрд╕реН Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - рдЯреЗрд▓рд┐рдЧреНрд░рд╛рдордорд╛ рд╕рдиреНрджреЗрд╢рд╣рд░реВ рдорд╛рддреНрд░ рдЙрдбреНрдиреЗрдЫрдиреН рдпрджрд┐ рддреНрдпрд╣рд╛рдБ рдХрд╛рд░реНрдпрд╣рд░реВ рдЫрдиреН рднрдиреЗ;
  • task_concurrency=1 - рд╣рд╛рдореА рдПрдЙрдЯреИ рдХрд╛рд░реНрдпрдХреЛ рдзреЗрд░реИ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгрд╣рд░реВрдХреЛ рдПрдХреИрд╕рд╛рде рд╕реБрд░реБрд╡рд╛рдд рдЧрд░реНрди рдирд┐рд╖реЗрдз рдЧрд░реНрдЫреМрдВред рдЕрдиреНрдпрдерд╛, рд╣рд╛рдореА рдзреЗрд░реИ рдХреЛ рдПрдХреИ рд╕рд╛рде рдкреНрд░рдХреНрд╖реЗрдкрдг рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрдиреЗрдЫреМрдВ VerticaOperator (рдПрдЙрдЯрд╛ рдЯреЗрдмрд▓ рд╣реЗрд░реНрджреИ);
  • report_update >> [email, tg] - рд╕рдмреИ VerticaOperator рдкрддреНрд░рд╣рд░реВ рд░ рд╕рдиреНрджреЗрд╢рд╣рд░реВ рдкрдард╛рдЙрдирдорд╛ рдорд┐рд▓рд╛рдЙрдиреБрд╣реЛрд╕реН, рдЬрд╕реНрддреИ:
    Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

    рддрд░ рдиреЛрдЯрд┐рдлрд╛рдпрд░ рдЕрдкрд░реЗрдЯрд░рд╣рд░реВрд╕рдБрдЧ рд╡рд┐рднрд┐рдиреНрди рд▓рдиреНрдЪ рдЕрд╡рд╕реНрдерд╛рд╣рд░реВ рднрдПрдХрд╛рд▓реЗ, рдПрдЙрдЯрд╛ рдорд╛рддреНрд░ рдХрд╛рдо рдЧрд░реНрдиреЗрдЫред рд░реВрдЦ рджреГрд╢реНрдпрдорд╛, рд╕рдмреИ рдХреБрд░рд╛ рдереЛрд░реИ рдХрдо рднрд┐рдЬреБрдЕрд▓ рджреЗрдЦрд┐рдиреНрдЫ:
    Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдо рдмрд╛рд░реЗ рдХреЗрд╣рд┐ рд╢рдмреНрджрд╣рд░реБ рднрдиреНрдиреЗрдЫреБ рдореНрдпрд╛рдХреНрд░реЛ рд░ рддрд┐рдиреАрд╣рд░реВрдХрд╛ рд╕рд╛рдереАрд╣рд░реВ - рдЪрд░.

рдореНрдпрд╛рдХреНрд░реЛрд╣рд░реВ рдЬрд┐рдиреНрдЬрд╛ рдкреНрд▓реЗрд╕рд╣реЛрд▓реНрдбрд░рд╣рд░реВ рд╣реБрдиреН рдЬрд╕рд▓реЗ рдЕрдкрд░реЗрдЯрд░ рддрд░реНрдХрд╣рд░реВрдорд╛ рд╡рд┐рднрд┐рдиреНрди рдЙрдкрдпреЛрдЧреА рдЬрд╛рдирдХрд╛рд░реАрд╣рд░реВ рдкреНрд░рддрд┐рд╕реНрдерд╛рдкрди рдЧрд░реНрди рд╕рдХреНрдЫрдиреНред рдЙрджрд╛рд╣рд░рдг рдХреЛ рд▓рд╛рдЧреА, рдпреЛ рдЬрд╕реНрддреИ:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} рд╕рдиреНрджрд░реНрдн рдЪрд░рдХреЛ рд╕рд╛рдордЧреНрд░реАрд╣рд░реВрдорд╛ рд╡рд┐рд╕реНрддрд╛рд░ рд╣реБрдиреЗрдЫ execution_date рдврд╛рдБрдЪрд╛рдорд╛ YYYY-MM-DD: 2020-07-14ред рд╕рдмреИ рднрдиреНрджрд╛ рд░рд╛рдореНрд░реЛ рдкрдХреНрд╖ рдпреЛ рд╣реЛ рдХрд┐ рд╕рдиреНрджрд░реНрдн рдЪрд░рд╣рд░реВ рдПрдХ рд╡рд┐рд╢реЗрд╖ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдг (рдЯреНрд░реА рджреГрд╢реНрдпрдорд╛ рдПрдХ рд╡рд░реНрдЧ) рдорд╛ рдиреЗрд▓ рдЧрд░рд┐рдПрдХреЛ рдЫ, рд░ рдЬрдм рдкреБрди: рд╕реБрд░реБ рд╣реБрдиреНрдЫ, рдкреНрд▓реЗрд╕рд╣реЛрд▓реНрдбрд░рд╣рд░реВ рд╕рдорд╛рди рдорд╛рдирд╣рд░реВрдорд╛ рд╡рд┐рд╕реНрддрд╛рд░ рд╣реБрдиреЗрдЫрдиреНред

рддреЛрдХрд┐рдПрдХрд╛ рдорд╛рдирд╣рд░реВ рдкреНрд░рддреНрдпреЗрдХ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгрдорд╛ рд░реЗрдиреНрдбрд░ рдЧрд░рд┐рдПрдХреЛ рдмрдЯрди рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рд╣реЗрд░реНрди рд╕рдХрд┐рдиреНрдЫред рдпрд╕реНрддреЛ рдЫ рдкрддреНрд░ рдкрдард╛рдЙрдиреЗ рдХрд╛рд░реНрдп:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рд░ рддреНрдпрд╕реИрд▓реЗ рд╕рдиреНрджреЗрд╢ рдкрдард╛рдЙрдиреЗ рдХрд╛рд░реНрдпрдорд╛:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдирд╡реАрдирддрдо рдЙрдкрд▓рдмреНрдз рд╕рдВрд╕реНрдХрд░рдгрдХреЛ рд▓рд╛рдЧрд┐ рдирд┐рд░реНрдорд┐рдд рдореНрдпрд╛рдХреНрд░реЛрд╣рд░реВрдХреЛ рдкреВрд░реНрдг рд╕реВрдЪреА рдпрд╣рд╛рдБ рдЙрдкрд▓рдмреНрдз рдЫ: рдореНрдпрд╛рдХреНрд░реЛ рд╕рдиреНрджрд░реНрдн

рдпрд╕рдмрд╛рд╣реЗрдХ, рдкреНрд▓рдЧрдЗрдирд╣рд░реВрдХреЛ рдорджреНрджрддрд▓реЗ, рд╣рд╛рдореА рд╣рд╛рдореНрд░реЛ рдЖрдлреНрдиреИ рдореНрдпрд╛рдХреНрд░реЛрд╣рд░реВ рдШреЛрд╖рдгрд╛ рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ, рддрд░ рддреНрдпреЛ рдЕрд░реНрдХреЛ рдХрдерд╛ рд╣реЛред

рдкреВрд░реНрд╡рдирд┐рд░реНрдзрд╛рд░рд┐рдд рдЪреАрдЬрд╣рд░реВрдХреЛ рдЕрддрд┐рд░рд┐рдХреНрдд, рд╣рд╛рдореА рд╣рд╛рдореНрд░рд╛ рдЪрд░рд╣рд░реВрдХреЛ рдорд╛рдирд╣рд░реВ рдкреНрд░рддрд┐рд╕реНрдерд╛рдкрди рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ (рдореИрд▓реЗ рдорд╛рдерд┐рдХреЛ рдХреЛрдбрдорд╛ рдпрд╕рд▓рд╛рдИ рдкрд╣рд┐рд▓реЗ рдиреИ рдкреНрд░рдпреЛрдЧ рдЧрд░рд┐рд╕рдХреЗрдХреЛ рдЫреБ)ред рднрд┐рддреНрд░ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реМрдВ Admin/Variables рдХреЗрд╣рд┐ рдЪреАрдЬрд╣рд░реВ:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рддрдкрд╛рдИрдВрд▓реЗ рдкреНрд░рдпреЛрдЧ рдЧрд░реНрди рд╕рдХреНрдиреЗ рд╕рдмреИ рдХреБрд░рд╛:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

рдорд╛рди рдПрдХ рд╕реНрдХреЗрд▓рд░ рд╣реБрди рд╕рдХреНрдЫ, рд╡рд╛ рдпреЛ JSON рдкрдирд┐ рд╣реБрди рд╕рдХреНрдЫред JSON рдХреЛ рдорд╛рдорд▓рд╛ рдорд╛:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

рдХреЗрд╡рд▓ рдЗрдЪреНрдЫрд┐рдд рдХреБрдЮреНрдЬреАрдорд╛ рдорд╛рд░реНрдЧ рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдиреБрд╣реЛрд╕реН: {{ var.json.bot_config.bot.token }}.

рдо рд╢рд╛рдмреНрджрд┐рдХ рд░реВрдкрдорд╛ рдПрдХ рд╢рдмреНрдж рднрдиреНрдиреЗрдЫреБ рд░ рдПрдХ рд╕реНрдХреНрд░рд┐рдирд╕рдЯ рджреЗрдЦрд╛рдЙрдиреЗрдЫреБ рдЬрдбрд╛рдирд╣рд░реВред рд╕рдмреИ рдХреБрд░рд╛ рдпрд╣рд╛рдБ рдкреНрд░рд╛рдердорд┐рдХ рдЫ: рдкреГрд╖реНрдардорд╛ Admin/Connections рд╣рд╛рдореА рдЬрдбрд╛рди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдЫреМрдВ, рддреНрдпрд╣рд╛рдБ рд╣рд╛рдореНрд░реЛ рд▓рдЧрдЗрдирд╣рд░реВ / рдкрд╛рд╕рд╡рд░реНрдбрд╣рд░реВ рд░ рдердк рд╡рд┐рд╢рд┐рд╖реНрдЯ рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░рд╣рд░реВ рдердкреНрдЫреМрдВред рдпреЛ рдЬрд╕реНрддреЛ:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдкрд╛рд╕рд╡рд░реНрдбрд╣рд░реВ рдЗрдиреНрдХреНрд░рд┐рдкреНрдЯ рдЧрд░реНрди рд╕рдХрд┐рдиреНрдЫ (рдкреВрд░реНрд╡рдирд┐рд░реНрдзрд╛рд░рд┐рдд рднрдиреНрджрд╛ рдзреЗрд░реИ рд░рд╛рдореНрд░рд░реА), рд╡рд╛ рддрдкрд╛рдИрдВрд▓реЗ рдЬрдбрд╛рди рдкреНрд░рдХрд╛рд░ рдЫреЛрдбреНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ (рдЬрд╕реНрддреИ рдореИрд▓реЗ рдЧрд░реЗрдВред tg_main) - рддрдереНрдп рдпреЛ рд╣реЛ рдХрд┐ рдкреНрд░рдХрд╛рд░рд╣рд░реВрдХреЛ рд╕реВрдЪреА рдПрдпрд░рдлреНрд▓реЛ рдореЛрдбреЗрд▓рд╣рд░реВрдорд╛ рд╣рд╛рд░реНрдбрд╡рд╛рдЗрд░ рдЧрд░рд┐рдПрдХреЛ рдЫ рд░ рд╕реНрд░реЛрдд рдХреЛрдбрд╣рд░реВрдорд╛ рдирдкрд░реЗрд░ рд╡рд┐рд╕реНрддрд╛рд░ рдЧрд░реНрди рд╕рдХрд┐рдБрджреИрди (рдпрджрд┐ рдЕрдЪрд╛рдирдХ рдореИрд▓реЗ рдХреЗрд╣рд┐ рдЧреБрдЧрд▓ рдЧрд░реЗрди рднрдиреЗ, рдХреГрдкрдпрд╛ рдорд▓рд╛рдИ рд╕рдЪреНрдпрд╛рдЙрдиреБрд╣реЛрд╕реН), рддрд░ рд╣рд╛рдореАрд▓рд╛рдИ рдХреНрд░реЗрдбрд┐рдЯрд╣рд░реВ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрдирдмрд╛рдЯ рдХреБрдиреИ рдкрдирд┐ рдХреБрд░рд╛рд▓реЗ рд░реЛрдХреНрджреИрдиред рдирд╛рдоред

рддрдкрд╛рдИрд▓реЗ рдПрдЙрдЯреИ рдирд╛рдордХреЛ рд╕рд╛рде рдзреЗрд░реИ рдЬрдбрд╛рдирд╣рд░реВ рдкрдирд┐ рдмрдирд╛рдЙрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ: рдпрд╕ рдЕрд╡рд╕реНрдерд╛рдорд╛, рд╡рд┐рдзрд┐ BaseHook.get_connection(), рдЬрд╕рд▓реЗ рд╣рд╛рдореАрд▓рд╛рдИ рдирд╛рдордмрд╛рдЯ рдЬрдбрд╛рдирд╣рд░реВ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрджрдЫ, рджрд┐рдиреЗрдЫ рдЕрдирд┐рдпрдорд┐рдд рдзреЗрд░реИ рдирд╛рдорд╣рд░реВрдмрд╛рдЯ (рдпреЛ рд░рд╛рдЙрдиреНрдб рд░реЛрдмрд┐рди рдмрдирд╛рдЙрди рдмрдвреА рддрд╛рд░реНрдХрд┐рдХ рд╣реБрдиреЗрдЫ, рддрд░ рдпрд╕рд▓рд╛рдИ рдПрдпрд░рдлреНрд▓реЛ рд╡рд┐рдХрд╛рд╕рдХрд░реНрддрд╛рд╣рд░реВрдХреЛ рд╡рд┐рд╡реЗрдХрдорд╛ рдЫреЛрдбреМрдВ)ред

рдЪрд░рд╣рд░реВ рд░ рдЬрдбрд╛рдирд╣рд░реВ рдкрдХреНрдХреИ рдкрдирд┐ рд░рд╛рдореНрд░реЛ рдЙрдкрдХрд░рдгрд╣рд░реВ рд╣реБрдиреН, рддрд░ рдмреНрдпрд╛рд▓реЗрдиреНрд╕ рдЧреБрдорд╛рдЙрдиреБ рд╣реБрдБрджреИрди: рддрдкрд╛рдИрдВрдХреЛ рдкреНрд░рд╡рд╛рд╣рдХреЛ рдХреБрди рднрд╛рдЧрд╣рд░реВ рддрдкрд╛рдИрдВрд▓реЗ рдХреЛрдбрдорд╛ рднрдгреНрдбрд╛рд░рдг рдЧрд░реНрдиреБрд╣реБрдиреНрдЫ, рд░ рдХреБрди рднрд╛рдЧрд╣рд░реВ рддрдкрд╛рдИрдВрд▓реЗ рднрдгреНрдбрд╛рд░рдгрдХреЛ рд▓рд╛рдЧрд┐ Airflow рд▓рд╛рдИ рджрд┐рдиреБрд╣реБрдиреНрдЫред рдПрдХрд╛рддрд┐рд░, рдпреЛ рдЪрд╛рдБрдбреИ рдорд╛рди рдкрд░рд┐рд╡рд░реНрддрди рдЧрд░реНрди рд╕реБрд╡рд┐рдзрд╛рдЬрдирдХ рд╣реБрди рд╕рдХреНрдЫ, рдЙрджрд╛рд╣рд░рдгрдХрд╛ рд▓рд╛рдЧрд┐, рдореЗрд▓рд┐рдЩ рдмрдХреНрд╕, UI рдорд╛рд░реНрдлрддред рдЕрд░реНрдХреЛрддрд░реНрдл, рдпреЛ рдЕрдЭреИ рдкрдирд┐ рдорд╛рдЙрд╕ рдХреНрд▓рд┐рдХрдорд╛ рдлрд┐рд░реНрддрд╛ рдЫ, рдЬрд╕рдмрд╛рдЯ рд╣рд╛рдореА (рдо) рдЫреБрдЯрдХрд╛рд░рд╛ рдкрд╛рдЙрди рдЪрд╛рд╣рдиреНрдЫреМрдВред

рдЬрдбрд╛рдирд╣рд░реВрд╕рдБрдЧ рдХрд╛рдо рдЧрд░реНрдиреБ рдХрд╛рд░реНрдпрд╣рд░реВ рдордзреНрдпреЗ рдПрдХ рд╣реЛ рд╣реБрдХрд╣рд░реВред рд╕рд╛рдорд╛рдиреНрдпрддрдпрд╛, рдПрдпрд░рдлреНрд▓реЛ рд╣реБрдХрд╣рд░реВ рдпрд╕рд▓рд╛рдИ рддреЗрд╕реНрд░реЛ-рдкрдХреНрд╖ рд╕реЗрд╡рд╛рд╣рд░реВ рд░ рдкреБрд╕реНрддрдХрд╛рд▓рдпрд╣рд░реВрдорд╛ рдЬрдбрд╛рди рдЧрд░реНрдирдХрд╛ рд▓рд╛рдЧрд┐ рдмрд┐рдиреНрджреБрд╣рд░реВ рд╣реБрдиреНред рдЬрд╕реНрддреИ, JiraHook рд╣рд╛рдореНрд░реЛ рд▓рд╛рдЧрд┐ рдЬрд┐рд░рд╛рд╕рдБрдЧ рдЕрдиреНрддрд░рдХреНрд░рд┐рдпрд╛ рдЧрд░реНрдирдХреЛ рд▓рд╛рдЧрд┐ рдЧреНрд░рд╛рд╣рдХ рдЦреЛрд▓реНрдиреЗрдЫ (рддрдкрд╛рдИрд▓реЗ рдХрд╛рд░реНрдпрд╣рд░реВ рдЕрдЧрд╛рдбрд┐ рд░ рдкрдЫрд╛рдбрд┐ рд╕рд╛рд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ), рд░ рдХреЛ рдорджреНрджрддрд▓реЗ SambaHook рддрдкрд╛рдИрдВ рд╕реНрдерд╛рдиреАрдп рдлрд╛рдЗрд▓рдорд╛ рдкреБрд╢ рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ smb- рдмрд┐рдиреНрджреБред

рдЕрдиреБрдХреВрд▓рди рдЕрдкрд░реЗрдЯрд░ рдкрд╛рд░реНрд╕ рдЧрд░реНрджреИ

рд░ рд╣рд╛рдореА рдпреЛ рдХрд╕рд░реА рдмрдиреЗрдХреЛ рдЫ рднрдиреЗрд░ рд╣реЗрд░реНрди рдирдЬрд┐рдХ рдкреБрдЧреНрдпреМрдВ TelegramBotSendMessage

рдХреЛрдб commons/operators.py рд╡рд╛рд╕реНрддрд╡рд┐рдХ рдЕрдкрд░реЗрдЯрд░ рд╕рдВрдЧ:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

рдпрд╣рд╛рдБ, Airflow рдорд╛ рд╕рдмреИ рдЪреАрдЬ рдЬрд╕реНрддреИ, рд╕рдмреИ рдзреЗрд░реИ рд╕рд░рд▓ рдЫ:

  • рдмрд╛рдЯ рд╡рдВрд╢рд╛рдиреБрдЧрдд BaseOperator, рдЬрд╕рд▓реЗ рдХреЗрд╣рд┐ рдПрдпрд░рдлреНрд▓реЛ-рд╡рд┐рд╢рд┐рд╖реНрдЯ рдЪреАрдЬрд╣рд░реВ рд▓рд╛рдЧреВ рдЧрд░реНрджрдЫ (рддрдкрд╛рдИрдВрдХреЛ рдлреБрд░реНрд╕рджрдорд╛ рд╣реЗрд░реНрдиреБрд╣реЛрд╕реН)
  • рдШреЛрд╖рд┐рдд рдХреНрд╖реЗрддреНрд░рд╣рд░реВ template_fields, рдЬрд╕рдорд╛ Jinja рд▓реЗ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдЧрд░реНрди рдореНрдпрд╛рдХреНрд░реЛрд╣рд░реВ рдЦреЛрдЬреНрдиреЗрдЫред
  • рдХреЛ рд▓рд╛рдЧрд┐ рд╕рд╣реА рддрд░реНрдХрд╣рд░реВ рдорд┐рд▓рд╛рдЗрдпреЛ __init__()рдЖрд╡рд╢реНрдпрдХ рднрдПрдорд╛ рдкреВрд░реНрд╡рдирд┐рд░реНрдзрд╛рд░рд┐рддрд╣рд░реВ рд╕реЗрдЯ рдЧрд░реНрдиреБрд╣реЛрд╕реНред
  • рд╣рд╛рдореАрд▓реЗ рдкреБрд░реНрдЦрд╛рдХреЛ рдкреНрд░рд╛рд░рдореНрднрд┐рдХрддрд╛рдХреЛ рдмрд╛рд░реЗрдорд╛ рдкрдирд┐ рдмрд┐рд░реНрд╕реЗрдХрд╛ рдЫреИрдиреМрдВред
  • рд╕рдореНрдмрдиреНрдзрд┐рдд рд╣реБрдХ рдЦреЛрд▓реНрдпреЛ TelegramBotHookрдпрд╕рдмрд╛рдЯ рдЧреНрд░рд╛рд╣рдХ рд╡рд╕реНрддреБ рдкреНрд░рд╛рдкреНрдд рднрдпреЛред
  • рдУрднрд░рд░рд╛рдЗрдб (рдкреБрди: рдкрд░рд┐рднрд╛рд╖рд┐рдд) рд╡рд┐рдзрд┐ BaseOperator.execute(), рдЬреБрди рдПрдпрд░рдлреЛрд▓реЗ рдЕрдкрд░реЗрдЯрд░ рд╕реБрд░реБ рдЧрд░реНрдиреЗ рд╕рдордп рдЖрдЙрдБрджрд╛ рдЯреНрд╡рд┐рдЪ рдЧрд░реНрдиреЗрдЫ - рдпрд╕рдорд╛ рд╣рд╛рдореА рд▓рдЧ рдЗрди рдЧрд░реНрди рдмрд┐рд░реНрд╕реЗрд░ рдореБрдЦреНрдп рдХрд╛рд░реНрдп рд▓рд╛рдЧреВ рдЧрд░реНрдиреЗрдЫреМрдВред (рд╣рд╛рдореА рд▓рдЧ рдЗрди, рд╡реИрд╕реЗ, рджрд╛рдпрд╛рдБ рдЗрди stdout ╨╕ stderr - рдПрдпрд░рдлреНрд▓реЛрд▓реЗ рд╕рдмреИ рдХреБрд░рд╛рд▓рд╛рдИ рд░реЛрдХреНрдЫ, рдпрд╕рд▓рд╛рдИ рд╕реБрдиреНрджрд░ рдврдВрдЧрд▓реЗ рдмреЗрд░реНрдЫ, рдЖрд╡рд╢реНрдпрдХ рднрдПрдорд╛ рдпрд╕рд▓рд╛рдИ рд╡рд┐рдШрдЯрди рдЧрд░реНрджрдЫред)

рд╣рд╛рдореАрд╕рдБрдЧ рдХреЗ рдЫ рд╣реЗрд░реМрдВ commons/hooks.pyред рдлрд╛рдЗрд▓рдХреЛ рдкрд╣рд┐рд▓реЛ рднрд╛рдЧ, рд╣реБрдХ рдЖрдлреИрд╕рдБрдЧ:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

рдорд▓рд╛рдИ рдпрд╣рд╛рдБ рдХреЗ рд╡реНрдпрд╛рдЦреНрдпрд╛ рдЧрд░реНрдиреЗ рдкрдирд┐ рдерд╛рд╣рд╛ рдЫреИрди, рдо рдХреЗрд╡рд▓ рдорд╣рддреНрддреНрд╡рдкреВрд░реНрдг рдмреБрдБрджрд╛рд╣рд░реВ рдиреЛрдЯ рдЧрд░реНрдиреЗрдЫреБ:

  • рд╣рд╛рдореА рд╡рд┐рд░рд╛рд╕рдд рдЧрд░реНрдЫреМрдВ, рддрд░реНрдХрд╣рд░реВрдХреЛ рдмрд╛рд░реЗрдорд╛ рд╕реЛрдЪреНрдиреБрд╣реЛрд╕реН - рдзреЗрд░реИ рдЬрд╕реЛ рдЕрд╡рд╕реНрдерд╛рдорд╛ рдпреЛ рдПрдХ рд╣реБрдиреЗрдЫ: conn_id;
  • рдУрднрд░рд░рд╛рдЗрдбрд┐рдВрдЧ рдорд╛рдирдХ рд╡рд┐рдзрд┐рд╣рд░реВ: рдореИрд▓реЗ рдЖрдлреИрд▓рд╛рдИ рд╕реАрдорд┐рдд рдЧрд░реЗрдВ get_conn(), рдЬрд╕рдорд╛ рдореИрд▓реЗ рдирд╛рдорджреНрд╡рд╛рд░рд╛ рдЬрдбрд╛рди рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░рд╣рд░реВ рдкрд╛рдЙрдБрдЫреБ рд░ рдХреЗрд╡рд▓ рдЦрдгреНрдб рдкрд╛рдЙрдБрдЫреБ extra (рдпреЛ JSON рдлрд┐рд▓реНрдб рд╣реЛ), рдЬрд╕рдорд╛ рдореИрд▓реЗ (рдореЗрд░реЛ рдЖрдлреНрдиреИ рдирд┐рд░реНрджреЗрд╢рди рдЕрдиреБрд╕рд╛рд░!) Telegram bot рдЯреЛрдХрди рд░рд╛рдЦреЗрдВ: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • рдо рд╣рд╛рдореНрд░реЛ рдЙрджрд╛рд╣рд░рдг рдмрдирд╛рдЙрдБрдЫреБ TelegramBot, рдпрд╕рд▓рд╛рдИ рдПрдХ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдЯреЛрдХрди рджрд┐рдБрджреИред

рдпрддрд┐ рдиреИред рддрдкрд╛рдИрдВ рд╣реБрдХ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рдЧреНрд░рд╛рд╣рдХ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ TelegramBotHook().clent рд╡рд╛ TelegramBotHook().get_conn().

рд░ рдлрд╛рдИрд▓рдХреЛ рджреЛрд╕реНрд░реЛ рднрд╛рдЧ, рдЬрд╕рдорд╛ рдо рдЯреЗрд▓реАрдЧреНрд░рд╛рдо REST API рдХреЛ рд▓рд╛рдЧрд┐ рдорд╛рдЗрдХреНрд░реЛрд░реНрдпрд╛рдкрд░ рдмрдирд╛рдЙрдБрдЫреБ, рддрд╛рдХрд┐ рдЙрд╣реА рддрд╛рдиреНрдиреБ рдирдкрд░реЛрд╕реНред python-telegram-bot рдПрдХ рд╡рд┐рдзрд┐ рдХреЛ рд▓рд╛рдЧреА sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

рд╕рд╣реА рддрд░рд┐рдХрд╛ рдпреЛ рд╕рдмреИ рдердкреНрди рд╣реЛ: TelegramBotSendMessage, TelegramBotHook, TelegramBot - рдкреНрд▓рдЧрдЗрдирдорд╛, рд╕рд╛рд░реНрд╡рдЬрдирд┐рдХ рднрдгреНрдбрд╛рд░рдорд╛ рд░рд╛рдЦреНрдиреБрд╣реЛрд╕реН, рд░ рдпрд╕рд▓рд╛рдИ рдЦреБрд▓рд╛ рд╕реНрд░реЛрддрдорд╛ рджрд┐рдиреБрд╣реЛрд╕реНред

рдЬрдм рд╣рд╛рдореАрд▓реЗ рдпреЛ рд╕рдмреИ рдЕрдзреНрдпрдпрди рдЧрд░рд┐рд░рд╣реЗрдХрд╛ рдерд┐рдпреМрдВ, рд╣рд╛рдореНрд░реЛ рд░рд┐рдкреЛрд░реНрдЯ рдЕрджреНрдпрд╛рд╡рдзрд┐рдХрд╣рд░реВ рд╕рдлрд▓рддрд╛рдкреВрд░реНрд╡рдХ рдЕрд╕рдлрд▓ рднрдпреЛ рд░ рдорд▓рд╛рдИ рдЪреНрдпрд╛рдирд▓рдорд╛ рддреНрд░реБрдЯрд┐ рд╕рдиреНрджреЗрд╢ рдкрдард╛рдЗрдпреЛред рдо рдпреЛ рдЧрд▓рдд рдЫ рдХрд┐ рднрдиреЗрд░ рдЬрд╛рдБрдЪ рдЧрд░реНрди рдЬрд╛рдБрджреИрдЫреБ ...

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ
рд╣рд╛рдореНрд░реЛ рдХреБрдХреБрд░рдорд╛ рдХреЗрд╣рд┐ рдмрд┐рдЧреНрд░рд┐рдпреЛ! рд╣рд╛рдореАрд▓реЗ рдЕрдкреЗрдХреНрд╖рд╛ рдЧрд░реЗрдХреЛ рддреНрдпрд╣реА рд╣реЛрдЗрди рд░ ? рдареНрдпрд╛рдХреНрдХреИ!

рдХреЗ рддрдкрд╛рдИрдВ рдЦрдиреНрдпрд╛рдЙрди рдЬрд╛рдБрджреИ рд╣реБрдиреБрд╣реБрдиреНрдЫ?

рдХреЗ рддрдкрд╛рдИрд▓рд╛рдИ рдореИрд▓реЗ рдХреЗрд╣рд┐ рдЫреБрдЯреЗрдХреЛ рдорд╣рд╕реБрд╕ рдЧрд░реНрдиреБрд╣реБрдиреНрдЫ? рдпрд╕реНрддреЛ рджреЗрдЦрд┐рдиреНрдЫ рдХрд┐ рдЙрд╕рд▓реЗ SQL Server рдмрд╛рдЯ Vertica рдорд╛ рдбрд╛рдЯрд╛ рд╕реНрдерд╛рдирд╛рдиреНрддрд░рдг рдЧрд░реНрдиреЗ рд╡рд╛рдЪрд╛ рдЧрд░реЗрдХреЛ рдерд┐рдпреЛ, рд░ рддреНрдпрд╕рдкрдЫрд┐ рдЙрд╕рд▓реЗ рдпрд╕рд▓рд╛рдИ рд▓рд┐рдпреЛ рд░ рд╡рд┐рд╖рдп, рд╕реНрдХрд╛рдЙрдиреНрдбрд░реЗрд▓рдмрд╛рдЯ рд╣рдЯреНрдпреЛ!

рдпреЛ рдХреНрд░реВрд░рддрд╛ рдЬрд╛рдирд╛рдЬрд╛рдиреА рдерд┐рдпреЛ, рдореИрд▓реЗ рддрдкрд╛рдЗрдБрдХреЛ рд▓рд╛рдЧрд┐ рдХреЗрд╣рд┐ рд╢рдмреНрджрд╛рд╡рд▓реА рдмреБрдЭрд╛рдЙрдиреБ рдерд┐рдпреЛред рдЕрдм рддрдкрд╛рдИрдВ рдЕрдЧрд╛рдбрд┐ рдмрдвреНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫред

рд╣рд╛рдореНрд░реЛ рдпреЛрдЬрдирд╛ рдпрд╕реНрддреЛ рдерд┐рдпреЛ:

  1. рдЧрд░
  2. рдХрд╛рд░реНрдпрд╣рд░реВ рдЙрддреНрдкрдиреНрди рдЧрд░реНрдиреБрд╣реЛрд╕реН
  3. рд╣реЗрд░реМ рд╕рдмреИ рдХрддрд┐ рд╕реБрдиреНрджрд░ рдЫ
  4. рднрд░реНрдирдХрд╛ рд▓рд╛рдЧрд┐ рд╕рддреНрд░ рдирдореНрдмрд░рд╣рд░реВ рдЕрд╕рд╛рдЗрди рдЧрд░реНрдиреБрд╣реЛрд╕реН
  5. SQL рд╕рд░реНрднрд░рдмрд╛рдЯ рдбрд╛рдЯрд╛ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрдиреБрд╣реЛрд╕реН
  6. Vertica рдорд╛ рдбрд╛рдЯрд╛ рд░рд╛рдЦреНрдиреБрд╣реЛрд╕реН
  7. рддрдереНрдпрд╛рдЩреНрдХ рд╕рдЩреНрдХрд▓рди рдЧрд░реНрдиреБрд╣реЛрд╕реН

рддреНрдпрд╕реЛрднрдП, рдпреЛ рд╕рдмреИ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрди рд░ рдЪрд▓рд╛рдЙрдирдХреЛ рд▓рд╛рдЧрд┐, рдореИрд▓реЗ рд╣рд╛рдореНрд░реЛрдорд╛ рдПрдЙрдЯрд╛ рд╕рд╛рдиреЛ рдердк рдЧрд░реЗрдВ docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

рддреНрдпрд╣рд╛рдБ рд╣рд╛рдореА рдЙрдард╛рдЙрдБрдЫреМрдВ:

  • Vertica рд╣реЛрд╕реНрдЯрдХреЛ рд░реВрдкрдорд╛ dwh рд╕рдмреИрднрдиреНрджрд╛ рдкреВрд░реНрд╡рдирд┐рд░реНрдзрд╛рд░рд┐рдд рд╕реЗрдЯрд┐рдЩрд╣рд░реВрд╕рдБрдЧ,
  • SQL рд╕рд░реНрднрд░ рдХреЛ рддреАрди рдЙрджрд╛рд╣рд░рдгрд╣рд░реВ,
  • рд╣рд╛рдореА рдкрдЫрд┐рд▓реНрд▓реЛрдорд╛ рдбрд╛рдЯрд╛рдмреЗрд╕рд╣рд░реВ рднрд░реНрдЫреМрдВ рдХреЗрд╣реА рдбрд╛рдЯрд╛ (рдХреБрдиреИ рдкрдирд┐ рдЕрд╡рд╕реНрдерд╛рдорд╛ рдирд╣реЗрд░реНрдиреБрд╣реЛрд╕реН mssql_init.py!)

рд╣рд╛рдореАрд▓реЗ рдкрдЫрд┐рд▓реНрд▓реЛ рдкрдЯрдХ рднрдиреНрджрд╛ рдереЛрд░реИ рдЬрдЯрд┐рд▓ рдЖрджреЗрд╢рдХреЛ рдорджреНрджрддрд▓реЗ рд╕рдмреИ рд░рд╛рдореНрд░реЛ рд╕реБрд░реБрд╡рд╛рдд рдЧрд░реНрдЫреМрдВ:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

рдХреЗ рд╣рд╛рдореНрд░реЛ рдЪрдорддреНрдХрд╛рд░ randomizer рдЙрддреНрдкрдиреНрди, рддрдкрд╛рдИрдВ рд╡рд╕реНрддреБ рдкреНрд░рдпреЛрдЧ рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ Data Profiling/Ad Hoc Query:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ
рдореБрдЦреНрдп рдХреБрд░рд╛ рдпреЛ рд╡рд┐рд╢реНрд▓реЗрд╖рдХрд╣рд░реВрд▓рд╛рдИ рджреЗрдЦрд╛рдЙрди рд╣реЛрдЗрди

рдорд╛ рд╡рд┐рд╕реНрддреГрдд ETL рд╕рддреНрд░рд╣рд░реВ рдо рдЧрд░реНрджрд┐рди, рддреНрдпрд╣рд╛рдБ рд╕рдмреИ рдХреБрд░рд╛ рдорд╛рдореВрд▓реА рдЫ: рд╣рд╛рдореА рдЖрдзрд╛рд░ рдмрдирд╛рдЙрдБрдЫреМрдВ, рддреНрдпрд╣рд╛рдБ рдПрдЙрдЯрд╛ рдЪрд┐рдиреНрд╣ рдЫ, рд╣рд╛рдореА рд╕рдмреИ рдХреБрд░рд╛ рдХрдиреНрдЯреЗрдХреНрд╕реНрдЯ рдкреНрд░рдмрдиреНрдзрдХрд╕рдБрдЧ рд▓рдкреЗрдЯреНрдЫреМрдВ, рд░ рдЕрдм рд╣рд╛рдореА рдпреЛ рдЧрд░реНрдЫреМрдВ:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

рд╕рдордп рдЖрдпреЛ рд╣рд╛рдореНрд░реЛ рдбрд╛рдЯрд╛ рд╕рдЩреНрдХрд▓рди рд╣рд╛рдореНрд░реЛ рдбреЗрдв рд╕рдп рдЯреЗрдмрд▓рдмрд╛рдЯред рдзреЗрд░реИ рдирдореНрд░ рд░реЗрдЦрд╛рд╣рд░реВрдХреЛ рдорджреНрджрддрд▓реЗ рдпреЛ рдЧрд░реМрдВ:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. рд╣реБрдХрдХреЛ рдорджреНрджрддрд▓реЗ рд╣рд╛рдореА рдПрдпрд░рдлреНрд▓реЛрдмрд╛рдЯ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрдЫреМрдВ pymssql- рдЬрдбрд╛рди рдЧрд░реНрдиреБрд╣реЛрд╕реН
  2. рдЕрдиреБрд░реЛрдзрдорд╛ рдорд┐рддрд┐рдХреЛ рд░реВрдкрдорд╛ рдкреНрд░рддрд┐рдмрдиреНрдзрд▓рд╛рдИ рдкреНрд░рддрд┐рд╕реНрдерд╛рдкрди рдЧрд░реМрдВ - рдпрд╕рд▓рд╛рдИ рдЯреЗрдореНрдкреНрд▓реЗрдЯ рдЗрдиреНрдЬрд┐рдирджреНрд╡рд╛рд░рд╛ рдкреНрд░рдХрд╛рд░реНрдпрдорд╛ рдлрд╛рд▓рд┐рдиреЗрдЫред
  3. рд╣рд╛рдореНрд░реЛ рдЕрдиреБрд░реЛрдз рдЦреБрд╡рд╛рдЙрдиреЗ pandasрдЬрд╕рд▓реЗ рд╣рд╛рдореАрд▓рд╛рдИ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрдиреЗрдЫ DataFrame - рдпреЛ рднрд╡рд┐рд╖реНрдпрдорд╛ рд╣рд╛рдореНрд░реЛ рд▓рд╛рдЧрд┐ рдЙрдкрдпреЛрдЧреА рд╣реБрдиреЗрдЫред

рдо рдкреНрд░рддрд┐рд╕реНрдерд╛рдкрди рдкреНрд░рдпреЛрдЧ рдЧрд░реНрджреИрдЫреБ {dt} рдЕрдиреБрд░реЛрдз рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░рдХреЛ рд╕рдЯреНрдЯрд╛ %s рдо рджреБрд╖реНрдЯ рдкрд┐рдиреЛрдЪрд┐рдпреЛ рд╣реБрдБ рднрдиреЗрд░ рд╣реЛрдЗрди, рддрд░ рдХрд┐рдирднрдиреЗ pandas рд╕рдореНрд╣рд╛рд▓реНрди рд╕рдХреНрджреИрди pymssql рд░ рдЕрдиреНрддрд┐рдордорд╛ рдЪрд┐рдкреНрд▓рд┐рдиреНрдЫ params: ListрдпрджреНрдпрдкрд┐ рдЙрдиреА рд╡рд╛рд╕реНрддрд╡рдореИ рдЪрд╛рд╣рдиреНрдЫрдиреН tuple.
рдпреЛ рдкрдирд┐ рдзреНрдпрд╛рди рджрд┐рдиреБрд╣реЛрд╕реН рдХрд┐ рд╡рд┐рдХрд╛рд╕рдХрд░реНрддрд╛ pymssql рдЙрд╕рд▓рд╛рдИ рдЕрдм рд╕рдорд░реНрдерди рдирдЧрд░реНрдиреЗ рдирд┐рд░реНрдгрдп рдЧрд░реЗ, рд░ рдпреЛ рдмрд╛рд╣рд┐рд░ рдЬрд╛рдиреЗ рд╕рдордп рд╣реЛ pyodbc.

рд╣реЗрд░реМрдВ рдПрдпрд░рдлреНрд▓реЛрд▓реЗ рд╣рд╛рдореНрд░реЛ рдкреНрд░рдХрд╛рд░реНрдпрд╣рд░реВрдХреЛ рддрд░реНрдХрд╣рд░реВ рдХреЗ рднрд░реНрдпреЛ:

Apache Airflow: ETL рд╕рдЬрд┐рд▓реЛ рдмрдирд╛рдЙрдБрджреИ

рдпрджрд┐ рддреНрдпрд╣рд╛рдБ рдХреБрдиреИ рдбрд╛рдЯрд╛ рдЫреИрди рднрдиреЗ, рддреНрдпрд╕рдкрдЫрд┐ рдЬрд╛рд░реА рд░рд╛рдЦреНрди рдХреЛ рд▓рд╛рдЧреА рдХреБрдиреИ рдорддрд▓рдм рдЫреИрдиред рддрд░ рдпреЛ рднрд░рд╛рдИ рд╕рдлрд▓ рдорд╛рдиреНрди рдкрдирд┐ рдЕрдиреМрдареЛ рдЫред рддрд░ рдпреЛ рдЧрд▓реНрддреА рд╣реЛрдЗрдиред рдП-рдЖрд╣, рдХреЗ рдЧрд░реНрдиреЗ?! рд░ рдпрд╣рд╛рдБ рдХреЗ рдЫ:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow рд▓рд╛рдИ рдмрддрд╛рдЙрдиреЗ рдЫ рдХрд┐ рддреНрдпрд╣рд╛рдБ рдХреБрдиреИ рддреНрд░реБрдЯрд┐рд╣рд░реВ рдЫреИрдирдиреН, рддрд░ рд╣рд╛рдореА рдХрд╛рд░реНрдп рдЫреЛрдбреНрдЫреМрдВред рдЗрдиреНрдЯрд░рдлреЗрд╕рдорд╛ рд╣рд░рд┐рдпреЛ рд╡рд╛ рд░рд╛рддреЛ рд╡рд░реНрдЧ рд╣реБрдиреЗрдЫреИрди, рддрд░ рдЧреБрд▓рд╛рдмреАред

рд╣рд╛рдореНрд░реЛ рдбрд╛рдЯрд╛ рдЯрд╕ рдЧрд░реМрдВ рдзреЗрд░реИ рд╕реНрддрдореНрднрд╣рд░реВ:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

рдЕрд░реНрдерд╛рддреН:

  • рд╣рд╛рдореАрд▓реЗ рдЖрджреЗрд╢ рд▓рд┐рдПрдХреЛ рдбрд╛рдЯрд╛рдмреЗрд╕,
  • рд╣рд╛рдореНрд░реЛ рдмрд╛рдвреА рд╕рддреНрд░рдХреЛ ID (рдпреЛ рдлрд░рдХ рд╣реБрдиреЗрдЫ рд╣рд░реЗрдХ рдХрд╛рд░реНрдпрдХреЛ рд▓рд╛рдЧрд┐),
  • рд╕реНрд░реЛрдд рд░ рдЕрд░реНрдбрд░ ID рдмрд╛рдЯ рд╣реНрдпрд╛рд╕ - рддрд╛рдХрд┐ рдЕрдиреНрддрд┐рдо рдбрд╛рдЯрд╛рдмреЗрд╕рдорд╛ (рдЬрд╣рд╛рдБ рд╕рдмреИ рдХреБрд░рд╛ рдПрдЙрдЯреИ рдЯреЗрдмрд▓рдорд╛ рд░рд╛рдЦрд┐рдПрдХреЛ рдЫ) рд╣рд╛рдореАрд╕рдБрдЧ рдПрдХ рдЕрджреНрд╡рд┐рддреАрдп рдЕрд░реНрдбрд░ рдЖрдИрдбреА рдЫред

рдЕрдиреНрддрд┐рдо рдЪрд░рдг рдмрд╛рдБрдХреА рдЫ: рд╕рдмреИ рдХреБрд░рд╛ Vertica рдорд╛ рдЦрдиреНрдпрд╛рдЙрдиреБрд╣реЛрд╕реНред рд░, рдЕрдиреМрдареЛ рдХреБрд░рд╛, рдпреЛ рдЧрд░реНрдиреЗ рд╕рдмреИрднрдиреНрджрд╛ рд╢рд╛рдирджрд╛рд░ рд░ рдкреНрд░рднрд╛рд╡рдХрд╛рд░реА рддрд░рд┐рдХрд╛рд╣рд░реВ рдордзреНрдпреЗ рдПрдХ CSV рдорд╛рд░реНрдлрдд рд╣реЛ!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. рд╣рд╛рдореА рдПрдХ рд╡рд┐рд╢реЗрд╖ рд░рд┐рд╕реАрднрд░ рдмрдирд╛рдЙрдБрджреИрдЫреМрдВ StringIO.
  2. pandas рдХреГрдкрдпрд╛ рд╣рд╛рдореНрд░реЛ рд░рд╛рдЦреНрдиреЗрдЫ DataFrame рдлрд╛рд░рдордорд╛ CSV-рд▓рд╛рдЗрдирд╣рд░реВред
  3. рд╣реБрдХрдХреЛ рд╕рд╛рде рд╣рд╛рдореНрд░реЛ рдордирдкрд░реНрдиреЗ Vertica рдорд╛ рдЬрдбрд╛рди рдЦреЛрд▓реМрдВред
  4. рд░ рдЕрдм рд╕рд╣рдпреЛрдЧ рд╕рдВрдЧ copy() рд╣рд╛рдореНрд░реЛ рдбрд╛рдЯрд╛ рд╕рд┐рдзреИ Vertika рдорд╛ рдкрдард╛рдЙрдиреБрд╣реЛрд╕реН!

рд╣рд╛рдореА рдЪрд╛рд▓рдХрдмрд╛рдЯ рд▓рд┐рдиреЗрдЫреМрдВ рдХрд┐ рдХрддрд┐ рд▓рд╛рдЗрдирд╣рд░реВ рднрд░рд┐рдПрдХрд╛ рдерд┐рдП, рд░ рд╕рддреНрд░ рдкреНрд░рдмрдиреНрдзрдХрд▓рд╛рдИ рд╕рдмреИ рдХреБрд░рд╛ рдареАрдХ рдЫ рднрдиреА рдмрддрд╛рдЙрдиреЗрдЫреМрдВ:

session.loaded_rows = cursor.rowcount
session.successful = True

рдпрддрд┐ рдиреИред

рдмрд┐рдХреНрд░реАрдорд╛, рд╣рд╛рдореА рдореНрдпрд╛рдиреБрдЕрд▓ рд░реВрдкрдорд╛ рд▓рдХреНрд╖реНрдп рдкреНрд▓реЗрдЯ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдЫреМрдВред рдпрд╣рд╛рдБ рдореИрд▓реЗ рдЖрдлреИрд▓рд╛рдИ рдПрдЙрдЯрд╛ рд╕рд╛рдиреЛ рдореЗрд╕рд┐рди рдЕрдиреБрдорддрд┐ рджрд┐рдПрдХреЛ рдЫреБ:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

рдо рдкреНрд░рдпреЛрдЧ рдЧрд░реНрджреИ рдЫреБ VerticaOperator() рдо рдбрд╛рдЯрд╛рдмреЗрд╕ рд╕реНрдХреАрдорд╛ рд░ рддрд╛рд▓рд┐рдХрд╛ рдмрдирд╛рдЙрдБрдЫреБ (рдпрджрд┐ рддрд┐рдиреАрд╣рд░реВ рдкрд╣рд┐рд▓реЗ рдиреИ рдЕрд╡рд╕реНрдерд┐рдд рдЫреИрдирдиреН рднрдиреЗ, рдЕрд╡рд╢реНрдп)ред рдореБрдЦреНрдп рдХреБрд░рд╛ рд╕рд╣реА рд░реВрдкрдорд╛ рдирд┐рд░реНрднрд░рддрд╛рд╣рд░реВ рд╡реНрдпрд╡рд╕реНрдерд┐рдд рдЧрд░реНрди рд╣реЛ:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

рд╕рдВрдХреНрд╖реЗрдк рд╣реБрдБрджреИ

- рдард┐рдХреИ рдЫ, - рд╕рд╛рдиреЛ рдореБрд╕рд╛рд▓реЗ рднрдиреНрдпреЛ, - рдпреЛ рдЫреИрди, рдЕрдм
рдЬрдВрдЧрд▓рдХреЛ рд╕рдмреИрднрдиреНрджрд╛ рдбрд░рд▓рд╛рдЧреНрджреЛ рдЬрдирд╛рд╡рд░ рдо рдиреИ рд╣реБрдБ рднрдиреНрдиреЗ рдХреБрд░рд╛рдорд╛ рддрдкрд╛рдИ рд╡рд┐рд╢реНрд╡рд╕реНрдд рд╣реБрдиреБрд╣реБрдиреНрдЫ?

рдЬреБрд▓рд┐рдпрд╛ рдбреЛрдирд╛рд▓реНрдбрд╕рди, рдж рдЧреНрд░реБрдлреЗрд▓реЛ

рдорд▓рд╛рдИ рд▓рд╛рдЧреНрдЫ рдпрджрд┐ рдореЗрд░рд╛ рд╕рд╣рдХрд░реНрдореАрд╣рд░реВ рд░ рдорд╕рдБрдЧ рдкреНрд░рддрд┐рд╕реНрдкрд░реНрдзрд╛ рдерд┐рдпреЛ: рдХрд╕рд▓реЗ рддреБрд░реБрдиреНрддреИ рд╕реБрд░реБрдмрд╛рдЯ ETL рдкреНрд░рдХреНрд░рд┐рдпрд╛ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреЗ рд░ рд╕реБрд░реВ рдЧрд░реНрдиреЗ: рддрд┐рдиреАрд╣рд░реВ рддрд┐рдиреАрд╣рд░реВрдХреЛ SSIS рд░ рдорд╛рдЙрд╕ рд░ рдо рдПрдпрд░рдлреНрд▓реЛрд╕рдБрдЧ ... рд░ рддреНрдпрд╕рдкрдЫрд┐ рд╣рд╛рдореА рдорд░реНрдорддрд╕рдореНрднрд╛рд░рдХреЛ рд╕рд╣рдЬрддрд╛ рдкрдирд┐ рддреБрд▓рдирд╛ рдЧрд░реНрдиреЗрдЫреМрдВ ... рд╡рд╛рд╣, рдорд▓рд╛рдИ рд▓рд╛рдЧреНрдЫ рдХрд┐ рддрдкрд╛рдИрдВ рд╕рд╣рдордд рд╣реБрдиреБрд╣реБрдиреЗрдЫ рдХрд┐ рдо рддрд┐рдиреАрд╣рд░реВрд▓рд╛рдИ рд╕рдмреИ рдореЛрд░реНрдЪрд╛рд╣рд░реВрдорд╛ рд╣рд░рд╛рдЙрдиреЗрдЫреБ!

рдпрджрд┐ рдЕрд▓рд┐ рдмрдвреА рдЧрдореНрднреАрд░ рд░реВрдкрдорд╛, рддреНрдпрд╕рдкрдЫрд┐ Apache Airflow - рдХрд╛рд░реНрдпрдХреНрд░рдо рдХреЛрдбрдХреЛ рд░реВрдкрдорд╛ рдкреНрд░рдХреНрд░рд┐рдпрд╛рд╣рд░реВ рд╡рд░реНрдгрди рдЧрд░реЗрд░ - рдореЗрд░реЛ рдХрд╛рдо рдЧрд░реНрдпреЛред рдзреЗрд░реИ рдердк рд╕рд╣рдЬ рд░ рд░рдорд╛рдЗрд▓реЛред

рдпрд╕рдХреЛ рдЕрд╕реАрдорд┐рдд рдПрдХреНрд╕рдЯреЗрдиреНрд╕рд┐рдмрд┐рд▓рд┐рдЯреА, рджреБрдмреИ рдкреНрд▓рдЧ-рдЗрди рд░ рд╕реНрдХреЗрд▓реЗрдмрд┐рд▓рд┐рдЯреАрдХреЛ рдкреНрд░рд╡реГрддрд┐рдХреЛ рд╕рдиреНрджрд░реНрднрдорд╛, рддрдкрд╛рдИрдВрд▓рд╛рдИ рд▓рдЧрднрдЧ рдХреБрдиреИ рдкрдирд┐ рдХреНрд╖реЗрддреНрд░рдорд╛ рдПрдпрд░рдлреНрд▓реЛ рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдиреЗ рдЕрд╡рд╕рд░ рджрд┐рдиреНрдЫ: рдбреЗрдЯрд╛ рд╕рдЩреНрдХрд▓рди, рддрдпрд╛рд░реА рд░ рдкреНрд░рд╢реЛрдзрди рдЧрд░реНрдиреЗ рдкреВрд░реНрдг рдЪрдХреНрд░рдорд╛, рд░рдХреЗрдЯрд╣рд░реВ рдкреНрд░рдХреНрд╖реЗрдкрдгрдорд╛ рд╕рдореЗрдд (рдордВрдЧрд▓ рдЧреНрд░рд╣рдорд╛, рдХреЛред рдкрд╛рдареНрдпрдХреНрд░рдо)ред

рднрд╛рдЧ рдЕрдиреНрддрд┐рдо, рд╕рдиреНрджрд░реНрдн рд░ рдЬрд╛рдирдХрд╛рд░реА

рд╣рд╛рдореАрд▓реЗ рддрдкрд╛рдИрдВрдХреЛ рд▓рд╛рдЧрд┐ рд╕рдЩреНрдХрд▓рди рдЧрд░реЗрдХреЛ рд░реЗрдХ

  • start_dateред рд╣реЛ, рдпреЛ рдкрд╣рд┐рд▓реЗ рдиреИ рд╕реНрдерд╛рдиреАрдп рдореЗрдо рд╣реЛред рдбрдЧрдХреЛ рдореБрдЦреНрдп рддрд░реНрдХ рдорд╛рд░реНрдлрдд start_date рд╕рдмреИ рдкрд╛рд╕ред рд╕рдВрдХреНрд╖реЗрдкрдорд╛, рдпрджрд┐ рддрдкрд╛рдЗрдБ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдЧрд░реНрдиреБрд╣реБрдиреНрдЫ start_date рд╡рд░реНрддрдорд╛рди рдорд┐рддрд┐, рд░ schedule_interval - рдПрдХ рджрд┐рди, рддреНрдпрд╕рдкрдЫрд┐ DAG рднреЛрд▓рд┐ рд╕реБрд░реБ рд╣реБрдиреЗрдЫред
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    рд░ рдердк рд╕рдорд╕реНрдпрд╛рд╣рд░реВ рдЫреИрдирдиреНред

    рдпрд╕рд╕рдБрдЧ рд╕рдореНрдмрдиреНрдзрд┐рдд рдЕрд░реНрдХреЛ рд░рдирдЯрд╛рдЗрдо рддреНрд░реБрдЯрд┐ рдЫ: Task is missing the start_date parameter, рдЬреБрди рдкреНрд░рд╛рдпрдГ рддрдкрд╛рдИрд▓реЗ рдбреЗрдЧ рдЕрдкрд░реЗрдЯрд░рдорд╛ рдмрд╛рдБрдзреНрди рдмрд┐рд░реНрд╕рдиреБрднрдпреЛ рднрдиреЗрд░ рд╕рдВрдХреЗрдд рдЧрд░реНрджрдЫред

  • рд╕рдмреИ рдПрдЙрдЯреИ рдореЗрд╕рд┐рдирдорд╛ред рд╣реЛ, рд░ рдЖрдзрд╛рд░рд╣рд░реВ (рдПрдпрд░рдлреНрд▓реЛ рдЖрдлреИрдВ рд░ рд╣рд╛рдореНрд░реЛ рдХреЛрдЯрд┐рдВрдЧ), рд░ рд╡реЗрдм рд╕рд░реНрднрд░, рд░ рдПрдХ рд╢реЗрдбреНрдпреВрд▓рд░, рд░ рдХрд╛рдорджрд╛рд░рд╣рд░реВред рд░ рдпрд╕рд▓реЗ рдХрд╛рдо рдкрдирд┐ рдЧрд░реНрдпреЛред рддрд░ рд╕рдордпрдХреЛ рд╕рд╛рдерд╕рд╛рдереИ, рд╕реЗрд╡рд╛рд╣рд░реВрдХреЛ рд▓рд╛рдЧрд┐ рдХрд╛рд░реНрдпрд╣рд░реВрдХреЛ рд╕рдВрдЦреНрдпрд╛ рдмрдвреНрджреИ рдЧрдпреЛ, рд░ рдЬрдм PostgreSQL рд▓реЗ 20 ms рдХреЛ рд╕рдЯреНрдЯрд╛ 5 s рдорд╛ рдЕрдиреБрдХреНрд░рдордгрд┐рдХрд╛рдорд╛ рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛ рджрд┐рди рдерд╛рд▓реНрдпреЛ, рд╣рд╛рдореАрд▓реЗ рдпрд╕рд▓рд╛рдИ рд▓рд┐рдПрд░ рд▓реИрдЬрд╛рдпреМрдВред
  • LocalExecutorред рд╣реЛ, рд╣рд╛рдореА рдЕрдЭреИ рдпрд╕рдорд╛ рдмрд╕рд┐рд░рд╣реЗрдХрд╛ рдЫреМрдВ, рд░ рд╣рд╛рдореА рдкрд╣рд┐рд▓реЗ рдиреИ рдЕрдЧрд╛рдз рдЦрд╛рдбрд▓рдХреЛ рдХрд┐рдирд╛рд░рдорд╛ рдЖрдЗрдкреБрдЧреЗрдХрд╛ рдЫреМрдВред LocalExecutor рд╣рд╛рдореНрд░реЛ рд▓рд╛рдЧрд┐ рдЕрд╣рд┐рд▓реЗрд╕рдореНрдо рдкрд░реНрдпрд╛рдкреНрдд рдЫ, рддрд░ рдЕрдм рдпреЛ рдХрдореНрддрд┐рдорд╛ рдПрдХ рдХрд╛рд░реНрдпрдХрд░реНрддрд╛рд╕рдБрдЧ рд╡рд┐рд╕реНрддрд╛рд░ рдЧрд░реНрдиреЗ рд╕рдордп рд╣реЛ, рд░ рд╣рд╛рдореАрд▓реЗ CeleryExecutor рдорд╛ рдЬрд╛рди рдХрдбрд╛ рдкрд░рд┐рд╢реНрд░рдо рдЧрд░реНрдиреБрдкрд░реНрдиреЗрдЫред рд░ рдпреЛ рддрдереНрдпрд▓рд╛рдИ рдзреНрдпрд╛рдирдорд╛ рд░рд╛рдЦреНрджреИ рдХрд┐ рддрдкрд╛рдЗрдБ рдПрдХ рдореЗрд╕рд┐рдирдорд╛ рдпрд╕рдХреЛ рд╕рд╛рде рдХрд╛рдо рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ, рддрдкрд╛рдЗрдБрд▓рд╛рдИ рд╕рд░реНрднрд░рдорд╛ рдкрдирд┐ рд╕реЗрд▓реЗрд░реА рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдирдмрд╛рдЯ рдХреБрдиреИ рдкрдирд┐ рдХреБрд░рд╛рд▓реЗ рд░реЛрдХреНрджреИрди, рдЬреБрди "рдирд┐рд╕реНрд╕рдиреНрджреЗрд╣, рдЗрдорд╛рдирджрд╛рд░реАрдкреВрд░реНрд╡рдХ рдЙрддреНрдкрд╛рджрдирдорд╛ рдХрд╣рд┐рд▓реНрдпреИ рдЬрд╛рдБрджреИрди!"
  • рдкреНрд░рдпреЛрдЧ рдирдЧрд░реНрдиреЗ рдирд┐рд░реНрдорд┐рдд рдЙрдкрдХрд░рдгрд╣рд░реВ:
    • рдЬрдбрд╛рди рд╕реЗрд╡рд╛ рдкреНрд░рдорд╛рдгрд╣рд░реВ рднрдгреНрдбрд╛рд░рдг рдЧрд░реНрди,
    • SLA рдорд┐рд╕рд╣рд░реВ рд╕рдордпрдорд╛ рдХрд╛рдо рдирдЧрд░реНрдиреЗ рдХрд╛рд░реНрдпрд╣рд░реВрдХреЛ рдЬрд╡рд╛рдл рджрд┐рди,
    • xcom рдореЗрдЯрд╛рдбрд╛рдЯрд╛ рд╡рд┐рдирд┐рдордпрдХреЛ рд▓рд╛рдЧрд┐ (рдореИрд▓реЗ рднрдиреЗ рдореЗрдЯрд╛рдбрд╛рдЯрд╛!) dag рдХрд╛рд░реНрдпрд╣рд░реВ рдмреАрдЪред
  • рдореЗрд▓ рджреБрд░реБрдкрдпреЛрдЧред рдЦреИрд░, рдо рдХреЗ рднрдиреНрди рд╕рдХреНрдЫреБ? рдЕрд▓рд░реНрдЯрд╣рд░реВ рдкрддрди рдЧрд░рд┐рдПрдХрд╛ рдХрд╛рд░реНрдпрд╣рд░реВрдХреЛ рдкреБрдирд░рд╛рд╡реГрддреНрддрд┐рдХреЛ рд▓рд╛рдЧрд┐ рд╕реЗрдЯ рдЕрдк рдЧрд░рд┐рдПрдХреЛ рдерд┐рдпреЛред рдЕрдм рдореЗрд░реЛ рдХрд╛рдо Gmail рдорд╛ Airflow рдмрд╛рдЯ 90k рдЗрдореЗрд▓рд╣рд░реВ рдЫрдиреН, рд░ рд╡реЗрдм рдореЗрд▓ рдереВрдердирд▓реЗ рдПрдХ рдкрдЯрдХрдорд╛ 100 рднрдиреНрджрд╛ рдмрдвреА рдЙрдард╛рдЙрди рд░ рдореЗрдЯрд╛рдЙрди рдЕрд╕реНрд╡реАрдХрд╛рд░ рдЧрд░реНрджрдЫред

рдердк рд╕рдорд╕реНрдпрд╛рд╣рд░реВ: Apache Airflow Pitfails

рдердк рд╕реНрд╡рдЪрд╛рд▓рди рдЙрдкрдХрд░рдгрд╣рд░реВ

рд╣рд╛рдореНрд░реЛ рд╣рд╛рддрд▓реЗ рд╣реЛрдЗрди рд╣рд╛рдореНрд░реЛ рдЯрд╛рдЙрдХреЛрд▓реЗ рдЕрдЭ рдмрдвреА рдХрд╛рдо рдЧрд░реНрдирдХреЛ рд▓рд╛рдЧрд┐, рдПрдпрд░рдлреНрд▓реЛрд▓реЗ рд╣рд╛рдореНрд░реЛ рд▓рд╛рдЧрд┐ рдпреЛ рддрдпрд╛рд░ рдЧрд░реЗрдХреЛ рдЫ:

  • REST API - рдЙрд╣рд╛рдБрд╕рдБрдЧ рдЕрдЭреИ рдкрдирд┐ рдкреНрд░рдпреЛрдЧрд╛рддреНрдордХ рд╕реНрдерд┐рддрд┐ рдЫ, рдЬрд╕рд▓реЗ рдЙрд╣рд╛рдБрд▓рд╛рдИ рдХрд╛рдо рдЧрд░реНрдирдмрд╛рдЯ рд░реЛрдХреНрджреИрдиред рдпрд╕рдХреЛ рд╕рд╛рде, рддрдкрд╛рдИрдВрд▓реЗ рдбреНрдпрд╛рдЧрд╣рд░реВ рд░ рдХрд╛рд░реНрдпрд╣рд░реВ рдмрд╛рд░реЗ рдЬрд╛рдирдХрд╛рд░реА рдорд╛рддреНрд░ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрди, рддрд░ рдбреЗрдЧ рд░реЛрдХреНрди/рд╕реБрд░реБ рдЧрд░реНрди, DAG рд░рди рд╡рд╛ рдкреВрд▓ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫред
  • CLI - рдзреЗрд░реИ рдЙрдкрдХрд░рдгрд╣рд░реВ рдХрдорд╛рдгреНрдб рд▓рд╛рдЗрди рдорд╛рд░реНрдлрдд рдЙрдкрд▓рдмреНрдз рдЫрдиреН рдЬреБрди WebUI рдорд╛рд░реНрдлрдд рдкреНрд░рдпреЛрдЧ рдЧрд░реНрди рдЕрд╕реБрд╡рд┐рдзрд╛рдЬрдирдХ рдорд╛рддреНрд░ рд╣реЛрдЗрди, рддрд░ рд╕рд╛рдорд╛рдиреНрдпрддрдпрд╛ рдЕрдиреБрдкрд╕реНрдерд┐рдд рдЫрдиреНред рдЙрджрд╛рд╣рд░рдгрдХрд╛ рд▓рд╛рдЧрд┐:
    • backfill рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгрд╣рд░реВ рдкреБрди: рд╕реБрд░реБ рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рдЫред
      рдЙрджрд╛рд╣рд░рдгрдХрд╛ рд▓рд╛рдЧрд┐, рд╡рд┐рд╢реНрд▓реЗрд╖рдХрд╣рд░реВ рдЖрдПрд░ рднрдиреЗ: "рд░, рддрдкрд╛рдИрдВ, рдХрдорд░реЗрдб, рдЬрдирд╡рд░реА 1 рджреЗрдЦрд┐ 13 рд╕рдореНрдордХреЛ рдбрд╛рдЯрд╛рдорд╛ рдмрдХрд╡рд╛рд╕ рдЫ! рдпрд╕рд▓рд╛рдИ рдареАрдХ рдЧрд░реНрдиреБрд╣реЛрд╕реН, рдпрд╕рд▓рд╛рдИ рдареАрдХ рдЧрд░реНрдиреБрд╣реЛрд╕реН, рдпрд╕рд▓рд╛рдИ рдареАрдХ рдЧрд░реНрдиреБрд╣реЛрд╕реН, рдпрд╕рд▓рд╛рдИ рдареАрдХ рдЧрд░реНрдиреБрд╣реЛрд╕реН!" рд░ рддрдкрд╛рдИрдВ рдпрд╕реНрддреЛ рд╣рдм рд╣реБрдиреБрд╣реБрдиреНрдЫ:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • рдЖрдзрд╛рд░ рд╕реЗрд╡рд╛: initdb, resetdb, upgradedb, checkdb.
    • run, рдЬрд╕рд▓реЗ рддрдкрд╛рдИрдВрд▓рд╛рдИ рдПрдЙрдЯрд╛ рдЙрджрд╛рд╣рд░рдг рдХрд╛рд░реНрдп рдЪрд▓рд╛рдЙрди рд░ рд╕рдмреИ рдирд┐рд░реНрднрд░рддрд╛рд╣рд░реВрдорд╛ рд╕реНрдХреЛрд░ рдЧрд░реНрди рдЕрдиреБрдорддрд┐ рджрд┐рдиреНрдЫред рдпрд╕рдмрд╛рд╣реЗрдХ, рддрдкрд╛рдИрдВ рдпрд╕рд▓рд╛рдИ рдорд╛рд░реНрдлрдд рдЪрд▓рд╛рдЙрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ LocalExecutor, рддрдкрд╛рдИрдВрд╕рдБрдЧ рд╕реЗрд▓реЗрд░реА рдХреНрд▓рд╕реНрдЯрд░ рднрдП рдкрдирд┐ред
    • рд▓рдЧрднрдЧ рдПрдЙрдЯреИ рдХреБрд░рд╛ рдЧрд░реНрдЫ test, рдХреЗрд╡рд▓ рдЖрдзрд╛рд░рд╣рд░реВрдорд╛ рдкрдирд┐ рдХреЗрд╣рд┐ рд▓реЗрдЦреНрджреИрдиред
    • connections рдЦреЛрд▓рдмрд╛рдЯ рдЬрдбрд╛рдирд╣рд░реВрдХреЛ рдареВрд▓реЛ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрди рдЕрдиреБрдорддрд┐ рджрд┐рдиреНрдЫред
  • рдкрд╛рдЗрдерди рдПрдкреАрдЖрдИ - рдЕрдиреНрддрд░реНрдХреНрд░рд┐рдпрд╛ рдЧрд░реНрдиреЗ рдПрдХ рдмрд░реБ рд╣рд╛рд░реНрдбрдХреЛрд░ рддрд░рд┐рдХрд╛, рдЬреБрди рдкреНрд▓рдЧрдЗрдирд╣рд░реВрдХреЛ рд▓рд╛рдЧрд┐ рд╣реЛ, рд░ рдпрд╕рдорд╛ рд╕рд╛рдирд╛ рд╣рд╛рддрд╣рд░реВрд▓реЗ рдЭреБрдиреНрдбреНрдпрд╛рдЙрдБрджреИрдиред рддрд░ рд╣рд╛рдореАрд▓рд╛рдИ рдЬрд╛рдирдмрд╛рдЯ рдХрд╕рд▓реЗ рд░реЛрдХреНрдиреЗ? /home/airflow/dags, рджреМрдб ipython рд░ рд╡рд░рдкрд░ рдЧрдбрдмрдб рд╕реБрд░реБ? рддрдкрд╛рдИрдВ, рдЙрджрд╛рд╣рд░рдгрдХрд╛ рд▓рд╛рдЧрд┐, рдирд┐рдореНрди рдХреЛрдбрдХреЛ рд╕рд╛рде рд╕рдмреИ рдЬрдбрд╛рдирд╣рд░реВ рдирд┐рд░реНрдпрд╛рдд рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Airflow рдореЗрдЯрд╛рдбреЗрдЯрд╛рдмреЗрд╕ рдЬрдбрд╛рди рдЧрд░реНрджреИред рдо рдпрд╕рдорд╛ рд▓реЗрдЦреНрди рд╕рд┐рдлрд╛рд░рд┐рд╕ рдЧрд░реНрджрд┐рди, рддрд░ рд╡рд┐рднрд┐рдиреНрди рд╡рд┐рд╢рд┐рд╖реНрдЯ рдореЗрдЯреНрд░рд┐рдХреНрд╕рдХрд╛ рд▓рд╛рдЧрд┐ рдХрд╛рд░реНрдп рд░рд╛рдЬреНрдпрд╣рд░реВ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрдиреБ рдХреБрдиреИ рдкрдирд┐ API рдорд╛рд░реНрдлрдд рднрдиреНрджрд╛ рдзреЗрд░реИ рдЫрд┐рдЯреЛ рд░ рд╕рдЬрд┐рд▓реЛ рд╣реБрди рд╕рдХреНрдЫред

    рднрдиреМрдВ рдХрд┐ рд╣рд╛рдореНрд░рд╛ рд╕рдмреИ рдХрд╛рд░реНрдпрд╣рд░реВ рдирд┐рд░реНрджреЛрд╖ рдЫреИрдирдиреН, рддрд░ рддрд┐рдиреАрд╣рд░реВ рдХрд╣рд┐рд▓реЗрдХрд╛рд╣реАрдВ рдЦрд╕реНрди рд╕рдХреНрдЫрдиреН, рд░ рдпреЛ рд╕рд╛рдорд╛рдиреНрдп рд╣реЛред рддрд░ рдХреЗрд╣реА рдЕрд╡рд░реЛрдзрд╣рд░реВ рдкрд╣рд┐рд▓реЗ рдиреИ рд╢рдВрдХрд╛рд╕реНрдкрдж рдЫрдиреН, рд░ рдпреЛ рдЬрд╛рдБрдЪ рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рд╣реБрдиреЗрдЫред

    SQL рд╕рд╛рд╡рдзрд╛рди!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

рд╕рдиреНрджрд░реНрдн

рд░ рдирд┐рд╕реНрд╕рдиреНрджреЗрд╣, Google рдХреЛ рдЬрд╛рд░реАрдмрд╛рдЯ рдкрд╣рд┐рд▓реЛ рджрд╕ рд▓рд┐рдЩреНрдХрд╣рд░реВ рдореЗрд░реЛ рдмреБрдХрдорд╛рд░реНрдХрд╣рд░реВрдмрд╛рдЯ Airflow рдлреЛрд▓реНрдбрд░рдХреЛ рд╕рд╛рдордЧреНрд░реА рд╣реБрдиреНред

рд░ рд▓реЗрдЦрдорд╛ рдкреНрд░рдпреЛрдЧ рдЧрд░рд┐рдПрдХрд╛ рд▓рд┐рдЩреНрдХрд╣рд░реВ:

рд╕реНрд░реЛрдд: www.habr.com