Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдирдорд╕реНрддреЗ, рдореИрдВ рджрд┐рдорд┐рддреНрд░реА рд▓реЛрдЧрд╡рд┐рдиреЗрдВрдХреЛ рд╣реВрдВ - рд╡реЗрдЬрд╝реЗрдЯ рд╕рдореВрд╣ рдХреА рдХрдВрдкрдирд┐рдпреЛрдВ рдХреЗ рдПрдирд╛рд▓рд┐рдЯрд┐рдХреНрд╕ рд╡рд┐рднрд╛рдЧ рдХрд╛ рдбреЗрдЯрд╛ рдЗрдВрдЬреАрдирд┐рдпрд░ред

рдореИрдВ рдЖрдкрдХреЛ рдИрдЯреАрдПрд▓ рдкреНрд░рдХреНрд░рд┐рдпрд╛рдУрдВ рдХреЛ рд╡рд┐рдХрд╕рд┐рдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдПрдХ рдЕрджреНрднреБрдд рдЙрдкрдХрд░рдг - рдЕрдкрд╛рдЪреЗ рдПрдпрд░рдлреНрд▓реЛ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдмрддрд╛рдКрдВрдЧрд╛ред рд▓реЗрдХрд┐рди рдПрдпрд░рдлрд╝реНрд▓реЛ рдЗрддрдирд╛ рдмрд╣реБрдореБрдЦреА рдФрд░ рдмрд╣реБрдореБрдЦреА рд╣реИ рдХрд┐ рдЖрдкрдХреЛ рдЗрд╕ рдкрд░ рдХрд░реАрдм рд╕реЗ рдирдЬрд╝рд░ рдбрд╛рд▓рдиреА рдЪрд╛рд╣рд┐рдП, рднрд▓реЗ рд╣реА рдЖрдк рдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣ рдореЗрдВ рд╢рд╛рдорд┐рд▓ рди рд╣реЛрдВ, рд▓реЗрдХрд┐рди рд╕рдордп-рд╕рдордп рдкрд░ рдХрд┐рд╕реА рднреА рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХреЛ рд▓реЙрдиреНрдЪ рдХрд░рдиреЗ рдФрд░ рдЙрдирдХреЗ рдирд┐рд╖реНрдкрд╛рджрди рдХреА рдирд┐рдЧрд░рд╛рдиреА рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реЛред

рдФрд░ рд╣рд╛рдВ, рдореИрдВ рди рдХреЗрд╡рд▓ рдмрддрд╛рдКрдВрдЧрд╛, рдмрд▓реНрдХрд┐ рджрд┐рдЦрд╛рдКрдВрдЧрд╛: рдХрд╛рд░реНрдпрдХреНрд░рдо рдореЗрдВ рдмрд╣реБрдд рд╕рд╛рд░реЗ рдХреЛрдб, рд╕реНрдХреНрд░реАрдирд╢реЙрдЯ рдФрд░ рд╕рд┐рдлрд╛рд░рд┐рд╢реЗрдВ рд╣реИрдВред

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛
рдЬрдм рдЖрдк рдЧреВрдЧрд▓ рдкрд░ рдПрдпрд░рдлрд╝реНрд▓реЛ/рд╡рд┐рдХрд┐рдореАрдбрд┐рдпрд╛ рдХреЙрдордиреНрд╕ рд╢рдмреНрдж рджреЗрдЦрддреЗ рд╣реИрдВ рддреЛ рдЖрдк рдЖрдорддреМрд░ рдкрд░ рдХреНрдпрд╛ рджреЗрдЦрддреЗ рд╣реИрдВ

рд▓реЗрдЦ-рд╕реВрдЪреА

рдкрд░рд┐рдЪрдп

рдЕрдкрд╛рдЪреЗ рдПрдпрд░рдлрд╝реНрд▓реЛ рдмрд┐рд▓реНрдХреБрд▓ Django рдХреА рддрд░рд╣ рд╣реИ:

  • рдкрд╛рдпрдерди рдореЗрдВ рд▓рд┐рдЦрд╛ рдЧрдпрд╛ рд╣реИ
  • рдПрдХ рдмреЗрд╣рддрд░реАрди рдПрдбрдорд┐рди рдкреИрдирд▓ рд╣реИ,
  • рдЕрдирд┐рд╢реНрдЪрд┐рдд рдХрд╛рд▓ рддрдХ рд╡рд┐рд╕реНрддрд╛рд░ рдпреЛрдЧреНрдп

- рдХреЗрд╡рд▓ рдмреЗрд╣рддрд░, рдФрд░ рдЗрд╕реЗ рдкреВрд░реА рддрд░рд╣ рд╕реЗ рдЕрд▓рдЧ рдЙрджреНрджреЗрд╢реНрдпреЛрдВ рдХреЗ рд▓рд┐рдП рдмрдирд╛рдпрд╛ рдЧрдпрд╛ рдерд╛, рдЕрд░реНрдерд╛рддреН (рдЬреИрд╕рд╛ рдХрд┐ рдХреИрдЯ рд╕реЗ рдкрд╣рд▓реЗ рд▓рд┐рдЦрд╛ рдЧрдпрд╛ рд╣реИ):

  • рдЕрд╕реАрдорд┐рдд рд╕рдВрдЦреНрдпрд╛ рдореЗрдВ рдорд╢реАрдиреЛрдВ рдкрд░ рдХрд╛рд░реНрдп рдЪрд▓рд╛рдирд╛ рдФрд░ рдирд┐рдЧрд░рд╛рдиреА рдХрд░рдирд╛ (рдЬрд┐рддрдиреА рд╕реЗрд▓реЗрд░реА/рдХреБрдмреЗрд░рдиреЗрдЯреНрд╕ рдФрд░ рдЖрдкрдХрд╛ рд╡рд┐рд╡реЗрдХ рдЖрдкрдХреЛ рдЕрдиреБрдорддрд┐ рджреЗрдЧрд╛)
  • рдкрд╛рдЗрдерди рдХреЛрдб рдХреЛ рд▓рд┐рдЦрдиреЗ рдФрд░ рд╕рдордЭрдиреЗ рдореЗрдВ рдмрд╣реБрдд рдЖрд╕рд╛рди рд╕реЗ рдЧрддрд┐рд╢реАрд▓ рд╡рд░реНрдХрдлрд╝реНрд▓реЛ рдкреАрдврд╝реА рдХреЗ рд╕рд╛рде
  • рдФрд░ рддреИрдпрд╛рд░ рдШрдЯрдХреЛрдВ рдФрд░ рдШрд░реЗрд▓реВ рдкреНрд▓рдЧрдЗрдиреНрд╕ (рдЬреЛ рдмреЗрд╣рдж рд╕рд░рд▓ рд╣реИ) рджреЛрдиреЛрдВ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдХрд┐рд╕реА рднреА рдбреЗрдЯрд╛рдмреЗрд╕ рдФрд░ рдПрдкреАрдЖрдИ рдХреЛ рдПрдХ рджреВрд╕рд░реЗ рд╕реЗ рдЬреЛрдбрд╝рдиреЗ рдХреА рдХреНрд╖рдорддрд╛ред

рд╣рдо Apache Airflow рдХрд╛ рдЙрдкрдпреЛрдЧ рдЗрд╕ рдкреНрд░рдХрд╛рд░ рдХрд░рддреЗ рд╣реИрдВ:

  • рд╣рдо DWH рдФрд░ ODS (рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рд╡рд░реНрдЯрд┐рдХрд╛ рдФрд░ рдХреНрд▓рд┐рдХрд╣рд╛рдЙрд╕ рд╣реИрдВ) рдореЗрдВ рд╡рд┐рднрд┐рдиреНрди рд╕реНрд░реЛрддреЛрдВ (рдХрдИ SQL рд╕рд░реНрд╡рд░ рдФрд░ PostgreSQL рдЙрджрд╛рд╣рд░рдг, рдПрдкреНрд▓рд┐рдХреЗрд╢рди рдореЗрдЯреНрд░рд┐рдХреНрд╕ рдХреЗ рд╕рд╛рде рд╡рд┐рднрд┐рдиреНрди API, рдпрд╣рд╛рдВ рддрдХ тАЛтАЛрдХрд┐ 1C) рд╕реЗ рдбреЗрдЯрд╛ рдПрдХрддреНрд░ рдХрд░рддреЗ рд╣реИрдВред
  • рдХрд┐рддрдирд╛ рдЙрдиреНрдирдд cron, рдЬреЛ рдУрдбреАрдПрд╕ рдкрд░ рдбреЗрдЯрд╛ рд╕рдореЗрдХрди рдкреНрд░рдХреНрд░рд┐рдпрд╛ рд╢реБрд░реВ рдХрд░рддрд╛ рд╣реИ, рдФрд░ рдЙрдирдХреЗ рд░рдЦрд░рдЦрд╛рд╡ рдХреА рдирд┐рдЧрд░рд╛рдиреА рднреА рдХрд░рддрд╛ рд╣реИред

рдХреБрдЫ рд╕рдордп рдкрд╣рд▓реЗ рддрдХ, рд╣рдорд╛рд░реА рдЬрд╝рд░реВрд░рддреЗрдВ 32 рдХреЛрд░ рдФрд░ 50 рдЬреАрдмреА рд░реИрдо рд╡рд╛рд▓реЗ рдПрдХ рдЫреЛрдЯреЗ рд╕рд░реНрд╡рд░ рджреНрд╡рд╛рд░рд╛ рдкреВрд░реА рдХреА рдЬрд╛рддреА рдереАрдВред рдПрдпрд░рдлреНрд▓реЛ рдореЗрдВ, рдпрд╣ рдХрд╛рдо рдХрд░рддрд╛ рд╣реИ:

  • рдЕрдзрд┐рдХ рд╣реИ 200 рдбреЗрдЧ (рд╡рд╛рд╕реНрддрд╡ рдореЗрдВ рд╡рд░реНрдХрдлрд╝реНрд▓реЛ, рдЬрд┐рд╕рдореЗрдВ рд╣рдордиреЗ рдХрд╛рд░реНрдп рднрд░реЗ рд╣реИрдВ),
  • рдкреНрд░рддреНрдпреЗрдХ рдореЗрдВ рдФрд╕рддрди 70 рдХрд╛рд░реНрдп,
  • рдпрд╣ рдЕрдЪреНрдЫрд╛рдИ рд╢реБрд░реВ рд╣реЛрддреА рд╣реИ (рдФрд╕рддрди рднреА) рдПрдХ рдШрдВрдЯреЗ рдореЗрдВ рдПрдХ рдмрд╛рд░.

рдФрд░ рд╣рдордиреЗ рдХреИрд╕реЗ рд╡рд┐рд╕реНрддрд╛рд░ рдХрд┐рдпрд╛, рдЗрд╕рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдореИрдВ рдиреАрдЪреЗ рд▓рд┐рдЦреВрдВрдЧрд╛, рд▓реЗрдХрд┐рди рдЕрдм рдЙрдмрд░-рд╕рдорд╕реНрдпрд╛ рдХреЛ рдкрд░рд┐рднрд╛рд╖рд┐рдд рдХрд░рддреЗ рд╣реИрдВ рдЬрд┐рд╕реЗ рд╣рдо рд╣рд▓ рдХрд░реЗрдВрдЧреЗ:

рддреАрди рд╕реНрд░реЛрдд SQL рд╕рд░реНрд╡рд░ рд╣реИрдВ, рдкреНрд░рддреНрдпреЗрдХ рдореЗрдВ 50 рдбреЗрдЯрд╛рдмреЗрд╕ рд╣реИрдВ - рдХреНрд░рдорд╢рдГ рдПрдХ рдкрд░рд┐рдпреЛрдЬрдирд╛ рдХреЗ рдЙрджрд╛рд╣рд░рдг, рдЙрдирдХреА рд╕рдВрд░рдЪрдирд╛ рд╕рдорд╛рди рд╣реИ (рд▓рдЧрднрдЧ рд╣рд░ рдЬрдЧрд╣, рдореБрдЖ-рд╣рд╛-рд╣рд╛), рдЬрд┐рд╕рдХрд╛ рдЕрд░реНрде рд╣реИ рдХрд┐ рдкреНрд░рддреНрдпреЗрдХ рдореЗрдВ рдПрдХ рдСрд░реНрдбрд░ рддрд╛рд▓рд┐рдХрд╛ рд╣реИ (рд╕реМрднрд╛рдЧреНрдп рд╕реЗ, рдЙрд╕рдХреЗ рд╕рд╛рде рдПрдХ рддрд╛рд▓рд┐рдХрд╛) рдирд╛рдо рдХреЛ рдХрд┐рд╕реА рднреА рд╡реНрдпрд╡рд╕рд╛рдп рдореЗрдВ рдзрдХреЗрд▓рд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ)ред рд╣рдо рд╕реЗрд╡рд╛ рдлрд╝реАрд▓реНрдб (рд╕реНрд░реЛрдд рд╕рд░реНрд╡рд░, рд╕реНрд░реЛрдд рдбреЗрдЯрд╛рдмреЗрд╕, рдИрдЯреАрдПрд▓ рдХрд╛рд░реНрдп рдЖрдИрдбреА) рдЬреЛрдбрд╝рдХрд░ рдбреЗрдЯрд╛ рд▓реЗрддреЗ рд╣реИрдВ рдФрд░ рдЙрдиреНрд╣реЗрдВ рд╡рд░реНрдЯрд┐рдХрд╛ рдХрд╣рддреЗ рд╣реИрдВ, рдореЗрдВ рдлреЗрдВрдХ рджреЗрддреЗ рд╣реИрдВред

рдЪрд▓реЛ рдЪрд▓реЗрдВ!

рдореБрдЦреНрдп рднрд╛рдЧ, рд╡реНрдпрд╛рд╡рд╣рд╛рд░рд┐рдХ (рдФрд░ рдереЛрдбрд╝рд╛ рд╕реИрджреНрдзрд╛рдВрддрд┐рдХ)

рд╣рдо (рдФрд░ рдЖрдк) рдРрд╕рд╛ рдХреНрдпреЛрдВ рдХрд░рддреЗ рд╣реИрдВ

рдЬрдм рдкреЗрдбрд╝ рдмрдбрд╝реЗ рдереЗ рдФрд░ рдореИрдВ рд╕рд░рд▓ рдерд╛ SQL-рдПрдХ рд░реВрд╕реА рд░рд┐рдЯреЗрд▓ рдореЗрдВ, рд╣рдордиреЗ рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдЙрдкрд▓рдмреНрдз рджреЛ рдЙрдкрдХрд░рдгреЛрдВ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдИрдЯреАрдПрд▓ рдкреНрд░рдХреНрд░рд┐рдпрд╛рдУрдВ рдЙрд░реНрдл тАЛтАЛтАЛтАЛрдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣ рдореЗрдВ рдШреЛрдЯрд╛рд▓рд╛ рдХрд┐рдпрд╛:

  • рдЗрдВрдлреЙрд░реНрдореЗрдЯрд┐рдХрд╛ рдкрд╛рд╡рд░ рд╕реЗрдВрдЯрд░ - рдПрдХ рдмреЗрд╣рдж рдлреИрд▓рдиреЗ рд╡рд╛рд▓реА рдкреНрд░рдгрд╛рд▓реА, рдмреЗрд╣рдж рдЙрддреНрдкрд╛рджрдХ, рдЕрдкрдиреЗ рд╕реНрд╡рдпрдВ рдХреЗ рд╣рд╛рд░реНрдбрд╡реЗрдпрд░, рдЕрдкрдиреЗ рд╕реНрд╡рдпрдВ рдХреЗ рд╕рдВрд╕реНрдХрд░рдг рдХреЗ рд╕рд╛рдеред рдореИрдВрдиреЗ рднрдЧрд╡рд╛рди рди рдХрд░реЗ рдЗрд╕рдХреА рдХреНрд╖рдорддрд╛рдУрдВ рдХрд╛ 1% рдЙрдкрдпреЛрдЧ рдХрд┐рдпрд╛ред рдХреНрдпреЛрдВ? рдЦреИрд░, рд╕рдмрд╕реЗ рдкрд╣рд▓реЗ, рдпрд╣ рдЗрдВрдЯрд░рдлрд╝реЗрд╕, рд▓рдЧрднрдЧ 380 рдХреЗ рджрд╢рдХ рдХрд╛, рдорд╛рдирд╕рд┐рдХ рд░реВрдк рд╕реЗ рд╣рдо рдкрд░ рджрдмрд╛рд╡ рдбрд╛рд▓рддрд╛ рд╣реИред рджреВрд╕рд░реЗ, рдпрд╣ рдЙрдкрдХрд░рдг рдЕрддреНрдпрдзрд┐рдХ рдлреИрдВрд╕реА рдкреНрд░рдХреНрд░рд┐рдпрд╛рдУрдВ, рдЙрдЧреНрд░ рдШрдЯрдХ рдкреБрди: рдЙрдкрдпреЛрдЧ рдФрд░ рдЕрдиреНрдп рдмрд╣реБрдд рдорд╣рддреНрд╡рдкреВрд░реНрдг-рдЙрджреНрдпрдо-рдЪрд╛рд▓реЛрдВ рдХреЗ рд▓рд┐рдП рдбрд┐рдЬрд╝рд╛рдЗрди рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИред рдЗрд╕ рддрдереНрдп рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдХрд┐ рдЗрд╕рдХреА рд▓рд╛рдЧрдд рдПрдпрд░рдмрд╕ рдПXNUMX/рд╡рд░реНрд╖ рдХреЗ рд╡рд┐рдВрдЧ рдХреА рддрд░рд╣ рд╣реИ, рд╣рдо рдХреБрдЫ рдирд╣реАрдВ рдХрд╣реЗрдВрдЧреЗред

    рд╕рд╛рд╡рдзрд╛рди, рдПрдХ рд╕реНрдХреНрд░реАрдирд╢реЙрдЯ 30 рд╡рд░реНрд╖ рд╕реЗ рдХрдо рдЙрдореНрд░ рдХреЗ рд▓реЛрдЧреЛрдВ рдХреЛ рдереЛрдбрд╝рд╛ рдиреБрдХрд╕рд╛рди рдкрд╣реБрдВрдЪрд╛ рд╕рдХрддрд╛ рд╣реИ

    Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

  • SQL рд╕рд░реНрд╡рд░ рдПрдХреАрдХрд░рдг рд╕рд░реНрд╡рд░ - рд╣рдордиреЗ рдЗрд╕ рдХреЙрдорд░реЗрдб рдХрд╛ рдЙрдкрдпреЛрдЧ рдЕрдкрдиреЗ рдЗрдВрдЯреНрд░рд╛-рдкреНрд░реЛрдЬреЗрдХреНрдЯ рдлреНрд▓реЛ рдореЗрдВ рдХрд┐рдпрд╛ред рдЦреИрд░, рд╡рд╛рд╕реНрддрд╡ рдореЗрдВ: рд╣рдо рдкрд╣рд▓реЗ рд╕реЗ рд╣реА SQL рд╕рд░реНрд╡рд░ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддреЗ рд╣реИрдВ, рдФрд░ рдЗрд╕рдХреЗ ETL рдЯреВрд▓ рдХрд╛ рдЙрдкрдпреЛрдЧ рди рдХрд░рдирд╛ рдХрд┐рд╕реА рддрд░рд╣ рд╕реЗ рдЕрдиреБрдЪрд┐рдд рд╣реЛрдЧрд╛ред рдЗрд╕рдореЗрдВ рд╕рдм рдХреБрдЫ рдЕрдЪреНрдЫрд╛ рд╣реИ: рдЗрдВрдЯрд░рдлрд╝реЗрд╕ рд╕реБрдВрджрд░ рд╣реИ, рдФрд░ рдкреНрд░рдЧрддрд┐ рд░рд┐рдкреЛрд░реНрдЯ ... рд▓реЗрдХрд┐рди рдпрд╣реА рдХрд╛рд░рдг рд╣реИ рдХрд┐ рд╣рдо рд╕реЙрдлрд╝реНрдЯрд╡реЗрдпрд░ рдЙрддреНрдкрд╛рджреЛрдВ рдХреЛ рдкрд╕рдВрдж рдХрд░рддреЗ рд╣реИрдВ, рдУрд╣, рдЗрд╕рдХреЗ рд▓рд┐рдП рдирд╣реАрдВред рдЗрд╕рдХрд╛ рд╕рдВрд╕реНрдХрд░рдг dtsx (рдЬреЛ XML рд╣реИ рдЬрд┐рд╕рдореЗрдВ рд╕реЗрд╡ рдкрд░ рдиреЛрдбреНрд╕ рдХреЛ рдлреЗрд░рдмрджрд▓ рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИ) рд╣рдо рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ, рд▓реЗрдХрд┐рди рдЗрд╕рдХрд╛ рдорддрд▓рдм рдХреНрдпрд╛ рд╣реИ? рдПрдХ рдХрд╛рд░реНрдп рдкреИрдХреЗрдЬ рдмрдирд╛рдиреЗ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдХреНрдпрд╛ рдЦреНрдпрд╛рд▓ рд╣реИ рдЬреЛ рд╕реИрдХрдбрд╝реЛрдВ рддрд╛рд▓рд┐рдХрд╛рдУрдВ рдХреЛ рдПрдХ рд╕рд░реНрд╡рд░ рд╕реЗ рджреВрд╕рд░реЗ рд╕рд░реНрд╡рд░ рддрдХ рдЦреАрдВрдЪ рд▓реЗрдЧрд╛? рд╣рд╛рдБ, рд╕реМ рдХреНрдпрд╛, рдорд╛рдЙрд╕ рдмрдЯрди рдкрд░ рдХреНрд▓рд┐рдХ рдХрд░рддреЗ рд╣реА рдЖрдкрдХреА рддрд░реНрдЬрдиреА рдХреЗ рдмреАрд╕ рдЯреБрдХрдбрд╝реЗ рд╣реЛ рдЬрд╛рдпреЗрдВрдЧреЗред рд▓реЗрдХрд┐рди рдпрд╣ рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдк рд╕реЗ рдЕрдзрд┐рдХ рдлреИрд╢рдиреЗрдмрд▓ рджрд┐рдЦрддрд╛ рд╣реИ:

    Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рд╣рдордиреЗ рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдк рд╕реЗ рдмрд╛рд╣рд░ рдирд┐рдХрд▓рдиреЗ рдХреЗ рд░рд╛рд╕реНрддреЗ рддрд▓рд╛рд╢реЗред рдорд╛рдорд▓рд╛ рднреА рд▓рдЧрднрдЧ рдПрдХ рд╕реНрд╡-рд▓рд┐рдЦрд┐рдд рдПрд╕рдПрд╕рдЖрдИрдПрд╕ рдкреИрдХреЗрдЬ рдЬрдирд░реЗрдЯрд░ рдЖрдпрд╛ ...

...рдФрд░ рдлрд┐рд░ рдореБрдЭреЗ рдПрдХ рдирдИ рдиреМрдХрд░реА рдорд┐рд▓ рдЧрдИред рдФрд░ Apache Airflow рдЗрд╕ рдорд╛рдорд▓реЗ рдореЗрдВ рдореБрдЭрд╕реЗ рдЖрдЧреЗ рдирд┐рдХрд▓ рдЧрдпрд╛ред

рдЬрдм рдореБрдЭреЗ рдкрддрд╛ рдЪрд▓рд╛ рдХрд┐ рдИрдЯреАрдПрд▓ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рд╡рд┐рд╡рд░рдг рд╕рд░рд▓ рдкрд╛рдпрдерди рдХреЛрдб рд╣реИрдВ, рддреЛ рдореИрдВ рдЦреБрд╢реА рд╕реЗ рдирд╛рдЪ рдирд╣реАрдВ рдкрд╛рдпрд╛ред рдЗрд╕ рдкреНрд░рдХрд╛рд░ рдбреЗрдЯрд╛ рд╕реНрдЯреНрд░реАрдо рдХреЛ рд╕рдВрд╕реНрдХрд░рдгрд┐рдд рдФрд░ рднрд┐рдиреНрди рдХрд┐рдпрд╛ рдЧрдпрд╛, рдФрд░ рд╕реИрдХрдбрд╝реЛрдВ рдбреЗрдЯрд╛рдмреЗрд╕ рд╕реЗ рдПрдХ рд╣реА рд╕рдВрд░рдЪрдирд╛ рдХреЗ рд╕рд╛рде рддрд╛рд▓рд┐рдХрд╛рдУрдВ рдХреЛ рдПрдХ рд▓рдХреНрд╖реНрдп рдореЗрдВ рдбрд╛рд▓рдирд╛ рдбреЗрдврд╝ рдпрд╛ рджреЛ 13 тАЭрд╕реНрдХреНрд░реАрди рдореЗрдВ рдкрд╛рдпрдерди рдХреЛрдб рдХрд╛ рдорд╛рдорд▓рд╛ рдмрди рдЧрдпрд╛ред

рдХреНрд▓рд╕реНрдЯрд░ рдХреЛ рдЕрд╕реЗрдВрдмрд▓ рдХрд░рдирд╛

рдЖрдЗрдП рдкреВрд░реА рддрд░рд╣ рд╕реЗ рдХрд┐рдВрдбрд░рдЧрд╛рд░реНрдЯрди рдХреА рд╡реНрдпрд╡рд╕реНрдерд╛ рди рдХрд░реЗрдВ, рдФрд░ рдпрд╣рд╛рдВ рдкреВрд░реА рддрд░рд╣ рд╕реЗ рд╕реНрдкрд╖реНрдЯ рдЪреАрдЬреЛрдВ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдмрд╛рдд рди рдХрд░реЗрдВ, рдЬреИрд╕реЗ рдПрдпрд░рдлреНрд▓реЛ рд╕реНрдерд╛рдкрд┐рдд рдХрд░рдирд╛, рдЖрдкрдХрд╛ рдЪреБрдирд╛ рд╣реБрдЖ рдбреЗрдЯрд╛рдмреЗрд╕, рд╕реЗрд▓реЗрд░реА рдФрд░ рдбреЙрдХ рдореЗрдВ рд╡рд░реНрдгрд┐рдд рдЕрдиреНрдп рдорд╛рдорд▓реЗред

рддрд╛рдХрд┐ рд╣рдо рддреБрд░рдВрдд рдкреНрд░рдпреЛрдЧ рд╢реБрд░реВ рдХрд░ рд╕рдХреЗрдВ, рдореИрдВрдиреЗ рд░реЗрдЦрд╛рдЪрд┐рддреНрд░ рдмрдирд╛рдпрд╛ docker-compose.yml рдЬрд┐рд╕рдореЗрдВ:

  • рдЖрдЗрдП рд╡рд╛рд╕реНрддрд╡ рдореЗрдВ рдмрдврд╝рд╛рдПрдВ Airflow: рд╢реЗрдбреНрдпреВрд▓рд░, рд╡реЗрдмрд╕рд░реНрд╡рд░ред рд╕реЗрд▓реЗрд░реА рдХрд╛рд░реНрдпреЛрдВ рдХреА рдирд┐рдЧрд░рд╛рдиреА рдХреЗ рд▓рд┐рдП рдлреНрд▓рд╛рд╡рд░ рднреА рд╡рд╣рд╛рдВ рдШреВрдореЗрдЧрд╛ (рдХреНрдпреЛрдВрдХрд┐ рдЗрд╕реЗ рдкрд╣рд▓реЗ рд╣реА рдЗрд╕рдореЗрдВ рд╢рд╛рдорд┐рд▓ рдХрд┐рдпрд╛ рдЬрд╛ рдЪреБрдХрд╛ рд╣реИ)ред apache/airflow:1.10.10-python3.7, рд▓реЗрдХрд┐рди рд╣рдореЗрдВ рдХреЛрдИ рдЖрдкрддреНрддрд┐ рдирд╣реАрдВ рд╣реИ)
  • рдкреЛрд╕реНрдЯрдЧреНрд░реЗрдПрд╕рдХреНрдпреВрдПрд▓, рдЬрд┐рд╕рдореЗрдВ рдПрдпрд░рдлрд╝реНрд▓реЛ рдЕрдкрдиреА рд╕реЗрд╡рд╛ рдЬрд╛рдирдХрд╛рд░реА (рдЕрдиреБрд╕реВрдЪрдХ рдбреЗрдЯрд╛, рдирд┐рд╖реНрдкрд╛рджрди рдЖрдБрдХрдбрд╝реЗ, рдЖрджрд┐) рд▓рд┐рдЦреЗрдЧрд╛, рдФрд░ рд╕реЗрд▓реЗрд░реА рдкреВрд░реНрдг рдХрд┐рдП рдЧрдП рдХрд╛рд░реНрдпреЛрдВ рдХреЛ рдЪрд┐рд╣реНрдирд┐рдд рдХрд░реЗрдЧрд╛;
  • Redis, рдЬреЛ рд╕реЗрд▓реЗрд░реА рдХреЗ рд▓рд┐рдП рдПрдХ рдХрд╛рд░реНрдп рджрд▓рд╛рд▓ рдХреЗ рд░реВрдк рдореЗрдВ рдХрд╛рд░реНрдп рдХрд░реЗрдЧрд╛;
  • рдЕрдЬрд╡рд╛рдЗрди рдХрд╛рд░реНрдпрдХрд░реНрддрд╛рдЬреЛ рд╕реАрдзреЗ рдХрд╛рд░реНрдпреЛрдВ рдХреЗ рдирд┐рд╖реНрдкрд╛рджрди рдореЗрдВ рд╕рдВрд▓рдЧреНрди рд╣реЛрдЧрд╛ред
  • рдлреЛрд▓реНрдбрд░ рдХреЛ ./dags рд╣рдо рдбреИрдЧреНрд╕ рдХреЗ рд╡рд┐рд╡рд░рдг рдХреЗ рд╕рд╛рде рдЕрдкрдиреА рдлрд╝рд╛рдЗрд▓реЗрдВ рдЬреЛрдбрд╝реЗрдВрдЧреЗред рдЙрдиреНрд╣реЗрдВ рддреБрд░рдВрдд рдЙрдард╛ рд▓рд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛, рдЗрд╕рд▓рд┐рдП рдкреНрд░рддреНрдпреЗрдХ рдЫреАрдВрдХ рдХреЗ рдмрд╛рдж рдкреВрд░реЗ рдвреЗрд░ рдХреЛ рд╣рдерд┐рдпрд╛рдиреЗ рдХреА рдХреЛрдИ рдЬрд░реВрд░рдд рдирд╣реАрдВ рд╣реИред

рдХреБрдЫ рд╕реНрдерд╛рдиреЛрдВ рдкрд░, рдЙрджрд╛рд╣рд░рдгреЛрдВ рдореЗрдВ рдХреЛрдб рдкреВрд░реА рддрд░рд╣ рд╕реЗ рдирд╣реАрдВ рджрд┐рдЦрд╛рдпрд╛ рдЧрдпрд╛ рд╣реИ (рддрд╛рдХрд┐ рдкрд╛рда рдЕрд╡реНрдпрд╡рд╕реНрдерд┐рдд рди рд╣реЛ), рд▓реЗрдХрд┐рди рдХрд╣реАрдВ-рдХрд╣реАрдВ рдЗрд╕реЗ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдореЗрдВ рд╕рдВрд╢реЛрдзрд┐рдд рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИред рд╕рдВрдкреВрд░реНрдг рдХрд╛рд░реНрдп рдХреЛрдб рдЙрджрд╛рд╣рд░рдг рд░рд┐рдкреЙрдЬрд┐рдЯрд░реА рдореЗрдВ рдкрд╛рдП рдЬрд╛ рд╕рдХрддреЗ рд╣реИрдВ https://github.com/dm-logv/airflow-tutorial.

рдбреЛрдХрд░-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

рдиреЛрдЯ:

  • рд░рдЪрдирд╛ рдХреЗ рд╕рдВрдпреЛрдЬрди рдореЗрдВ, рдореИрдВ рдХрд╛рдлреА рд╣рдж рддрдХ рд╕реБрдкреНрд░рд╕рд┐рджреНрдз рдЫрд╡рд┐ рдкрд░ рдирд┐рд░реНрднрд░ рдерд╛ рдкрдХреЗрд▓/рдбреЙрдХрд░-рдПрдпрд░рдлреНрд▓реЛ - рдпрд╣ рд╕реБрдирд┐рд╢реНрдЪрд┐рдд рдХрд░реЗрдВ рдХрд┐ рдЖрдкрдиреЗ рдЗрд╕реЗ рджреЗрдЦ рдХрд┐рдпрд╛ред рд╢рд╛рдпрдж рдЖрдкрдХреЛ рдЕрдкрдиреЗ рдЬреАрд╡рди рдореЗрдВ рдХрд┐рд╕реА рдФрд░ рдЪреАрдЬрд╝ рдХреА рдЬрд╝рд░реВрд░рдд рдирд╣реАрдВ рд╣реИред
  • рд╕рднреА рдПрдпрд░рдлреНрд▓реЛ рд╕реЗрдЯрд┐рдВрдЧреНрд╕ рди рдХреЗрд╡рд▓ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдЙрдкрд▓рдмреНрдз рд╣реИрдВ airflow.cfg, рд▓реЗрдХрд┐рди рдкрд░реНрдпрд╛рд╡рд░рдг рдЪрд░ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рднреА (рдбреЗрд╡рд▓рдкрд░реНрд╕ рдХреЛ рдзрдиреНрдпрд╡рд╛рдж), рдЬрд┐рд╕рдХрд╛ рдореИрдВрдиреЗ рджреБрд░реНрднрд╛рд╡рдирд╛рдкреВрд░реНрд╡рдХ рдлрд╛рдпрджрд╛ рдЙрдард╛рдпрд╛ред
  • рд╕реНрд╡рд╛рднрд╛рд╡рд┐рдХ рд░реВрдк рд╕реЗ, рдпрд╣ рдЙрддреНрдкрд╛рджрди рдХреЗ рд▓рд┐рдП рддреИрдпрд╛рд░ рдирд╣реАрдВ рд╣реИ: рдореИрдВрдиреЗ рдЬрд╛рдирдмреВрдЭрдХрд░ рдХрдВрдЯреЗрдирд░реЛрдВ рдкрд░ рджрд┐рд▓ рдХреА рдзрдбрд╝рдХрди рдирд╣реАрдВ рдбрд╛рд▓реА, рдореИрдВрдиреЗ рд╕реБрд░рдХреНрд╖рд╛ рдХреА рдкрд░рд╡рд╛рд╣ рдирд╣реАрдВ рдХреАред рд▓реЗрдХрд┐рди рдореИрдВрдиреЗ рд╣рдорд╛рд░реЗ рдкреНрд░рдпреЛрдЧрдХрд░реНрддрд╛рдУрдВ рдХреЗ рд▓рд┐рдП рдиреНрдпреВрдирддрдо рдЙрдкрдпреБрдХреНрдд рдХрд╛рд░реНрдп рдХрд┐рдпрд╛ред
  • рдзреНрдпрд╛рди рджреЗрдВ рдХрд┐:
    • рдбреЗрдЧ рдлрд╝реЛрд▓реНрдбрд░ рд╢реЗрдбреНрдпреВрд▓рд░ рдФрд░ рд╢реНрд░рдорд┐рдХреЛрдВ рджреЛрдиреЛрдВ рдХреЗ рд▓рд┐рдП рдкрд╣реБрдВрдЪ рдпреЛрдЧреНрдп рд╣реЛрдирд╛ рдЪрд╛рд╣рд┐рдПред
    • рдпрд╣реА рдмрд╛рдд рд╕рднреА рддреГрддреАрдп-рдкрдХреНрд╖ рдкреБрд╕реНрддрдХрд╛рд▓рдпреЛрдВ рдкрд░ рд▓рд╛рдЧреВ рд╣реЛрддреА рд╣реИ - рдЙрди рд╕рднреА рдХреЛ рд╢реЗрдбреНрдпреВрд▓рд░ рдФрд░ рд╢реНрд░рдорд┐рдХреЛрдВ рд╡рд╛рд▓реА рдорд╢реАрдиреЛрдВ рдкрд░ рд╕реНрдерд╛рдкрд┐рдд рдХрд┐рдпрд╛ рдЬрд╛рдирд╛ рдЪрд╛рд╣рд┐рдПред

рдЦреИрд░, рдЕрдм рдпрд╣ рдЖрд╕рд╛рди рд╣реИ:

$ docker-compose up --scale worker=3

рд╕рдм рдХреБрдЫ рд╕рд╛рдордиреЗ рдЖрдиреЗ рдХреЗ рдмрд╛рдж, рдЖрдк рд╡реЗрдм рдЗрдВрдЯрд░рдлреЗрд╕ рджреЗрдЦ рд╕рдХрддреЗ рд╣реИрдВ:

рдмреБрдирд┐рдпрд╛рджреА рдЕрд╡рдзрд╛рд░рдгрд╛рдУрдВ

рдпрджрд┐ рдЖрдкрдХреЛ рдЗрди рд╕рднреА "рдбреИрдЧреНрд╕" рдореЗрдВ рдХреБрдЫ рднреА рд╕рдордЭ рдореЗрдВ рдирд╣реАрдВ рдЖрдпрд╛, рддреЛ рдпрд╣рд╛рдВ рдПрдХ рд╕рдВрдХреНрд╖рд┐рдкреНрдд рд╢рдмреНрджрдХреЛрд╢ рд╣реИ:

  • рд╕рдордпрдмрджреНрдзрдХ - рдПрдпрд░рдлрд╝реНрд▓реЛ рдореЗрдВ рд╕рдмрд╕реЗ рдорд╣рддреНрд╡рдкреВрд░реНрдг рдЪрд╛рдЪрд╛, рдЬреЛ рдирд┐рдпрдВрддреНрд░рд┐рдд рдХрд░рддреЗ рд╣реИрдВ рдХрд┐ рд░реЛрдмреЛрдЯ рдХрдбрд╝реА рдореЗрд╣рдирдд рдХрд░рддреЗ рд╣реИрдВ, рди рдХрд┐ рдПрдХ рд╡реНрдпрдХреНрддрд┐: рд╢реЗрдбреНрдпреВрд▓ рдкрд░ рдирдЬрд╝рд░ рд░рдЦрддрд╛ рд╣реИ, рдбреИрдЧ рдЕрдкрдбреЗрдЯ рдХрд░рддрд╛ рд╣реИ, рдХрд╛рд░реНрдп рд▓реЙрдиреНрдЪ рдХрд░рддрд╛ рд╣реИред

    рд╕рд╛рдорд╛рдиреНрдп рддреМрд░ рдкрд░, рдкреБрд░рд╛рдиреЗ рд╕рдВрд╕реНрдХрд░рдгреЛрдВ рдореЗрдВ, рдЙрдиреНрд╣реЗрдВ рдореЗрдореЛрд░реА рдХреА рд╕рдорд╕реНрдпрд╛ рдереА (рдирд╣реАрдВ, рднреВрд▓рдиреЗ рдХреА рдмреАрдорд╛рд░реА рдирд╣реАрдВ, рдмрд▓реНрдХрд┐ рд▓реАрдХ) рдФрд░ рд▓реАрдЧреЗрд╕реА рдкреИрд░рд╛рдореАрдЯрд░ рдХреЙрдиреНрдлрд╝рд┐рдЧрд░реЗрд╢рди рдореЗрдВ рднреА рдмрдирд╛ рд░рд╣рд╛ run_duration - рдЗрд╕рдХрд╛ рдкреБрдирд░рд╛рд░рдВрдн рдЕрдВрддрд░рд╛рд▓ред рд▓реЗрдХрд┐рди рдЕрдм рд╕рдм рдХреБрдЫ рдареАрдХ рд╣реИ.

  • рдбреЗрдЧ (рдЙрд░реНрдл "рдбреЗрдЧ") - "рдирд┐рд░реНрджреЗрд╢рд┐рдд рдПрд╕рд╛рдЗрдХреНрд▓рд┐рдХ рдЧреНрд░рд╛рдл", рд▓реЗрдХрд┐рди рдРрд╕реА рдкрд░рд┐рднрд╛рд╖рд╛ рдХреБрдЫ рд▓реЛрдЧреЛрдВ рдХреЛ рдмрддрд╛рдПрдЧреА, рд▓реЗрдХрд┐рди рд╡рд╛рд╕реНрддрд╡ рдореЗрдВ рдпрд╣ рдПрдХ рджреВрд╕рд░реЗ рдХреЗ рд╕рд╛рде рдмрд╛рддрдЪреАрдд рдХрд░рдиреЗ рд╡рд╛рд▓реЗ рдХрд╛рд░реНрдпреЛрдВ рдХреЗ рд▓рд┐рдП рдПрдХ рдХрдВрдЯреЗрдирд░ рд╣реИ (рдиреАрдЪреЗ рджреЗрдЦреЗрдВ) рдпрд╛ рдПрд╕рдПрд╕рдЖрдИрдПрд╕ рдореЗрдВ рдкреИрдХреЗрдЬ рдФрд░ рдЗрдВрдлреЙрд░реНрдореЗрдЯрд┐рдХрд╛ рдореЗрдВ рд╡рд░реНрдХрдлрд╝реНрд▓реЛ рдХрд╛ рдПрдХ рдПрдирд╛рд▓реЙрдЧ .

    рдбреИрдЧ рдХреЗ рдЕрд▓рд╛рд╡рд╛, рдЕрднреА рднреА рд╕рдмрдбреИрдЧ рд╣реЛ рд╕рдХрддреЗ рд╣реИрдВ, рд▓реЗрдХрд┐рди рд╕рдмрд╕реЗ рдЕрдзрд┐рдХ рд╕рдВрднрд╛рд╡рдирд╛ рд╣реИ рдХрд┐ рд╣рдо рдЙрди рддрдХ рдирд╣реАрдВ рдкрд╣реБрдВрдЪ рдкрд╛рдПрдВрдЧреЗред

  • рдбреАрдПрдЬреА рд░рди - рдкреНрд░рд╛рд░рдВрднрд┐рдХ рдбреИрдЧ, рдЬрд┐рд╕реЗ рдЕрдкрдирд╛ рд╕реНрд╡рдпрдВ рдХрд╛ рдЕрд╕рд╛рдЗрди рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИ execution_date. рдПрдХ рд╣реА рджрд┐рди рдХреЗ рдбреИрдЧреНрд░реЗрди рд╕рдорд╛рдирд╛рдВрддрд░ рдореЗрдВ рдХрд╛рдо рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ (рдпрджрд┐ рдЖрдкрдиреЗ рдЕрдкрдиреЗ рдХрд╛рд░реНрдпреЛрдВ рдХреЛ рдирд┐рд╖реНрдХреНрд░рд┐рдп рдмрдирд╛ рджрд┐рдпрд╛ рд╣реИ, рддреЛ рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдк рд╕реЗ)ред
  • рдСрдкрд░реЗрдЯрд░ рдХрд┐рд╕реА рд╡рд┐рд╢рд┐рд╖реНрдЯ рдХрд╛рд░реНрдп рдХреЛ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдЬрд┐рдореНрдореЗрджрд╛рд░ рдХреЛрдб рдХреЗ рдЯреБрдХрдбрд╝реЗ рд╣реИрдВред рдСрдкрд░реЗрдЯрд░ рддреАрди рдкреНрд░рдХрд╛рд░ рдХреЗ рд╣реЛрддреЗ рд╣реИрдВ:
    • рдХрд╛рд░реНрдпрд╣рдорд╛рд░реЗ рдкрд╕рдВрджреАрджрд╛ рдХреА рддрд░рд╣ PythonOperator, рдЬреЛ рдХрд┐рд╕реА рднреА (рд╡реИрдз) рдкрд╛рдпрдерди рдХреЛрдб рдХреЛ рдирд┐рд╖реНрдкрд╛рджрд┐рдд рдХрд░ рд╕рдХрддрд╛ рд╣реИ;
    • рд╕реНрдерд╛рдирд╛рдВрддрд░рдг, рдЬреЛ рдбреЗрдЯрд╛ рдХреЛ рдПрдХ рд╕реНрдерд╛рди рд╕реЗ рджреВрд╕рд░реЗ рд╕реНрдерд╛рди рддрдХ рдкрд╣реБрдБрдЪрд╛рддрд╛ рд╣реИ, рдорд╛рди рд▓реАрдЬрд┐рдП, MsSqlToHiveTransfer;
    • рд╕реЗрдВрд╕рд░ рджреВрд╕рд░реА рдУрд░, рдпрд╣ рдЖрдкрдХреЛ рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛ рджреЗрдиреЗ рдпрд╛ рдХрд┐рд╕реА рдШрдЯрдирд╛ рдХреЗ рдШрдЯрд┐рдд рд╣реЛрдиреЗ рддрдХ рдбреИрдЧ рдХреЗ рдЖрдЧреЗ рдХреЗ рдирд┐рд╖реНрдкрд╛рджрди рдХреЛ рдзреАрдорд╛ рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрдЧрд╛ред HttpSensor рдирд┐рд░реНрджрд┐рд╖реНрдЯ рд╕рдорд╛рдкрди рдмрд┐рдВрджреБ рдХреЛ рдЦреАрдВрдЪ рд╕рдХрддрд╛ рд╣реИ, рдФрд░ рдЬрдм рд╡рд╛рдВрдЫрд┐рдд рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛ рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░ рд░рд╣реА рд╣реЛ, рддреЛ рд╕реНрдерд╛рдирд╛рдВрддрд░рдг рдкреНрд░рд╛рд░рдВрдн рдХрд░реЗрдВ GoogleCloudStorageToS3Operator. рдПрдХ рдЬрд┐рдЬреНрдЮрд╛рд╕реБ рдорди рдкреВрдЫреЗрдЧрд╛: тАЬрдХреНрдпреЛрдВ? рдЖрдЦрд╝рд┐рд░рдХрд╛рд░, рдЖрдк рд╕реАрдзреЗ рдСрдкрд░реЗрдЯрд░ рдореЗрдВ рд╣реА рджреЛрд╣рд░рд╛рд╡ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ!тАЭ рдФрд░ рдлрд┐рд░, рдирд┐рд▓рдВрдмрд┐рдд рдСрдкрд░реЗрдЯрд░реЛрдВ рдХреЗ рд╕рд╛рде рдХрд╛рд░реНрдпреЛрдВ рдХреЗ рдкреВрд▓ рдХреЛ рдЕрд╡рд░реБрджреНрдз рди рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдПред рд╕реЗрдВрд╕рд░ рд╢реБрд░реВ рд╣реЛрддрд╛ рд╣реИ, рдЬрд╛рдБрдЪ рдХрд░рддрд╛ рд╣реИ рдФрд░ рдЕрдЧрд▓реЗ рдкреНрд░рдпрд╛рд╕ рд╕реЗ рдкрд╣рд▓реЗ рдЦрд╝рддреНрдо рд╣реЛ рдЬрд╛рддрд╛ рд╣реИред
  • рдХрд╛рд░реНрдп - рдШреЛрд╖рд┐рдд рдСрдкрд░реЗрдЯрд░реЛрдВ рдХреЛ, рдкреНрд░рдХрд╛рд░ рдХреА рдкрд░рд╡рд╛рд╣ рдХрд┐рдП рдмрд┐рдирд╛, рдФрд░ рдбреИрдЧ рд╕реЗ рдЬреБрдбрд╝реЗ рд▓реЛрдЧреЛрдВ рдХреЛ рдХрд╛рд░реНрдп рдХреЗ рдкрдж рдкрд░ рдкрджреЛрдиреНрдирдд рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИред
  • рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдг - рдЬрдм рд╕рд╛рдорд╛рдиреНрдп рдпреЛрдЬрдирд╛рдХрд╛рд░ рдиреЗ рдирд┐рд░реНрдгрдп рд▓рд┐рдпрд╛ рдХрд┐ рдЕрдм рдХрд╛рд░реНрдпреЛрдВ рдХреЛ рдирд┐рд╖реНрдкрд╛рджрдХ-рд╢реНрд░рдорд┐рдХреЛрдВ рдкрд░ рднреЗрдЬрдиреЗ рдХрд╛ рд╕рдордп рдЖ рдЧрдпрд╛ рд╣реИ (рд╕рд╣реА рдореМрдХреЗ рдкрд░, рдпрджрд┐ рд╣рдо рдЗрд╕рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддреЗ рд╣реИрдВ) LocalExecutor рдпрд╛ рдХрд┐рд╕реА рджреВрд░рд╕реНрде рдиреЛрдб рдХреЗ рдорд╛рдорд▓реЗ рдореЗрдВ CeleryExecutor), рдпрд╣ рдЙрдиреНрд╣реЗрдВ рдПрдХ рд╕рдВрджрд░реНрдн рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░рддрд╛ рд╣реИ (рдпрд╛рдиреА, рдЪрд░ рдХрд╛ рдПрдХ рд╕реЗрдЯ - рдирд┐рд╖реНрдкрд╛рджрди рдкреИрд░рд╛рдореАрдЯрд░), рдХрдорд╛рдВрдб рдпрд╛ рдХреНрд╡реЗрд░реА рдЯреЗрдореНрдкрд▓реЗрдЯреНрд╕ рдХрд╛ рд╡рд┐рд╕реНрддрд╛рд░ рдХрд░рддрд╛ рд╣реИ, рдФрд░ рдЙрдиреНрд╣реЗрдВ рдкреВрд▓ рдХрд░рддрд╛ рд╣реИред

рд╣рдо рдХрд╛рд░реНрдп рдЙрддреНрдкрдиреНрди рдХрд░рддреЗ рд╣реИрдВ

рд╕рдмрд╕реЗ рдкрд╣рд▓реЗ, рдЖрдЗрдП рд╣рдорд╛рд░реЗ рдбреМрдЧ рдХреА рд╕рд╛рдорд╛рдиреНрдп рдпреЛрдЬрдирд╛ рдХреА рд░реВрдкрд░реЗрдЦрд╛ рддреИрдпрд╛рд░ рдХрд░реЗрдВ, рдФрд░ рдлрд┐рд░ рд╣рдо рдЕрдзрд┐рдХ рд╕реЗ рдЕрдзрд┐рдХ рд╡рд┐рд╡рд░рдгреЛрдВ рдореЗрдВ рдЙрддрд░реЗрдВрдЧреЗ, рдХреНрдпреЛрдВрдХрд┐ рд╣рдо рдХреБрдЫ рдЧреИрд░-рддреБрдЪреНрдЫ рд╕рдорд╛рдзрд╛рди рд▓рд╛рдЧреВ рдХрд░рддреЗ рд╣реИрдВред

рддреЛ, рдЕрдкрдиреЗ рд╕рд░рд▓рддрдо рд░реВрдк рдореЗрдВ, рдРрд╕рд╛ рдбреИрдЧ рдЗрд╕ рддрд░рд╣ рджрд┐рдЦреЗрдЧрд╛:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

рдЖрдЗрдП рдЗрд╕рдХрд╛ рдкрддрд╛ рд▓рдЧрд╛рдПрдВ:

  • рд╕рдмрд╕реЗ рдкрд╣рд▓реЗ, рд╣рдо рдЖрд╡рд╢реНрдпрдХ libs рдЖрдпрд╛рдд рдХрд░рддреЗ рд╣реИрдВ рдФрд░ рдХреБрдЫ рдФрд░;
  • sql_server_ds - рдХреНрдпрд╛ List[namedtuple[str, str]] рдПрдпрд░рдлреНрд▓реЛ рдХрдиреЗрдХреНрд╢рдВрд╕ рд╕реЗ рдХрдиреЗрдХреНрд╢рдиреЛрдВ рдХреЗ рдирд╛рдо рдФрд░ рдЙрди рдбреЗрдЯрд╛рдмреЗрд╕ рдХреЗ рд╕рд╛рде рдЬрд┐рдирд╕реЗ рд╣рдо рдЕрдкрдиреА рдкреНрд▓реЗрдЯ рд▓реЗрдВрдЧреЗ;
  • dag - рд╣рдорд╛рд░реЗ рдбреИрдЧ рдХреА рдШреЛрд╖рдгрд╛, рдЬреЛ рдЕрдирд┐рд╡рд╛рд░реНрдп рд░реВрдк рд╕реЗ рд╣реЛрдиреА рдЪрд╛рд╣рд┐рдП globals(), рдЕрдиреНрдпрдерд╛ рдПрдпрд░рдлрд╝реНрд▓реЛ рдЗрд╕реЗ рдирд╣реАрдВ рдвреВрдВрдв рдкрд╛рдПрдЧрд╛ред рдбреМрдЧ рдХреЛ рдпрд╣ рднреА рдХрд╣рдирд╛ рд╣реЛрдЧрд╛:
    • рдЗрд╕рдХрд╛ рдирд╛рдо рдХреНрдпрд╛ рд╣реИ orders - рдпрд╣ рдирд╛рдо рддрдм рд╡реЗрдм рдЗрдВрдЯрд░рдлрд╝реЗрд╕ рдореЗрдВ рджрд┐рдЦрд╛рдИ рджреЗрдЧрд╛,
    • рдХрд┐ рд╡рд╣ рдЖрда рдЬреБрд▓рд╛рдИ рдХреА рдЖрдзреА рд░рд╛рдд рд╕реЗ рдХрд╛рдо рдХрд░реЗрдВрдЧреЗ,
    • рдФрд░ рдЗрд╕реЗ рд▓рдЧрднрдЧ рд╣рд░ 6 рдШрдВрдЯреЗ рдореЗрдВ рдЪрд▓рдирд╛ рдЪрд╛рд╣рд┐рдП (рдпрд╣рд╛рдБ рдЗрд╕рдХреЗ рдмрдЬрд╛рдп рд╕рдЦреНрдд рд▓реЛрдЧреЛрдВ рдХреЗ рд▓рд┐рдП)ред timedelta() рд╕реНрд╡реАрдХрд╛рд░реНрдп cron-рдкрдВрдХреНрддрд┐ 0 0 0/6 ? * * *, рдХрдо рд╢рд╛рдВрдд рдХреЗ рд▓рд┐рдП - рдПрдХ рдЕрднрд┐рд╡реНрдпрдХреНрддрд┐ рдХреА рддрд░рд╣ @daily);
  • workflow() рдореБрдЦреНрдп рдХрд╛рдо рдХрд░реЗрдВрдЧреЗ, рд▓реЗрдХрд┐рди рдЕрднреА рдирд╣реАрдВ. рдЕрднреА рдХреЗ рд▓рд┐рдП, рд╣рдо рдЕрдкрдирд╛ рд╕рдВрджрд░реНрдн рдХреЗрд╡рд▓ рд▓реЙрдЧ рдореЗрдВ рдбрд╛рд▓реЗрдВрдЧреЗред
  • рдФрд░ рдЕрдм рдХрд╛рд░реНрдп рдмрдирд╛рдиреЗ рдХрд╛ рд╕рд░рд▓ рдЬрд╛рджреВ:
    • рд╣рдо рдЕрдкрдиреЗ рд╕реНрд░реЛрддреЛрдВ рд╕реЗ рдЪрд▓рддреЗ рд╣реИрдВ;
    • рдЗрдирд┐рд╢рд┐рдпрд▓рд╛рдЗрдЬрд╝ PythonOperator, рдЬреЛ рд╣рдорд╛рд░реА рдбрдореА рдХреЛ рдирд┐рд╖реНрдкрд╛рджрд┐рдд рдХрд░реЗрдЧрд╛ workflow(). рдХрд╛рд░реНрдп рдХрд╛ рдПрдХ рдЕрджреНрд╡рд┐рддреАрдп (рдбреЗрдЧ рдХреЗ рднреАрддрд░) рдирд╛рдо рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░рдирд╛ рдФрд░ рдбреЗрдЧ рдХреЛ рд╕реНрд╡рдпрдВ рдмрд╛рдВрдзрдирд╛ рди рднреВрд▓реЗрдВред рдЭрдВрдбрд╛ provide_context рдмрджрд▓реЗ рдореЗрдВ, рдлрд╝рдВрдХреНрд╢рди рдореЗрдВ рдЕрддрд┐рд░рд┐рдХреНрдд рддрд░реНрдХ рдбрд╛рд▓реЗрдВрдЧреЗ, рдЬрд┐рдиреНрд╣реЗрдВ рд╣рдо рд╕рд╛рд╡рдзрд╛рдиреАрдкреВрд░реНрд╡рдХ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдПрдХрддреНрд░ рдХрд░реЗрдВрдЧреЗ **context.

рдЕрднреА рдХреЗ рд▓рд┐рдП, рдмрд╕ рдЗрддрдирд╛ рд╣реАред рд╣рдореЗрдВ рдХреНрдпрд╛ рдорд┐рд▓рд╛:

  • рд╡реЗрдм рдЗрдВрдЯрд░рдлрд╝реЗрд╕ рдореЗрдВ рдирдпрд╛ рдбреЗрдЧ,
  • рдбреЗрдврд╝ рд╕реМ рдХрд╛рд░реНрдп рдЬреЛ рд╕рдорд╛рдирд╛рдВрддрд░ рдореЗрдВ рдирд┐рд╖реНрдкрд╛рджрд┐рдд рдХрд┐рдП рдЬрд╛рдПрдВрдЧреЗ (рдпрджрд┐ рдПрдпрд░рдлреНрд▓реЛ, рд╕реЗрд▓реЗрд░реА рд╕реЗрдЯрд┐рдВрдЧреНрд╕ рдФрд░ рд╕рд░реНрд╡рд░ рдХреНрд╖рдорддрд╛ рдЗрд╕рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддреА рд╣реИ)ред

рдЦрд╝реИрд░, рд▓рдЧрднрдЧ рдорд┐рд▓ рдЧрдпрд╛ред

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛
рдирд┐рд░реНрднрд░рддрд╛рдПрдБ рдХреМрди рд╕реНрдерд╛рдкрд┐рдд рдХрд░реЗрдЧрд╛?

рдЗрд╕ рдкреВрд░реА рдЪреАрдЬрд╝ рдХреЛ рд╕рд░рд▓ рдмрдирд╛рдиреЗ рдХреЗ рд▓рд┐рдП, рдореИрдВрдиреЗ рдЗрд╕рдореЗрдВ рдЧрдбрд╝рдмрдбрд╝реА рдХреА docker-compose.yml рдкреНрд░рд╕рдВрд╕реНрдХрд░рдг requirements.txt рд╕рднреА рдиреЛрдбреНрд╕ рдкрд░.

рдЕрдм рдпрд╣ рдЪрд▓рд╛ рдЧрдпрд╛ рд╣реИ:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдЧреНрд░реЗ рд╡рд░реНрдЧ рд╢реЗрдбреНрдпреВрд▓рд░ рджреНрд╡рд╛рд░рд╛ рд╕рдВрд╕рд╛рдзрд┐рдд рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдг рд╣реИрдВред

рд╣рдо рдереЛрдбрд╝рд╛ рдЗрдВрддрдЬрд╛рд░ рдХрд░рддреЗ рд╣реИрдВ, рдХрд╛рд░реНрдп рд╢реНрд░рдорд┐рдХреЛрдВ рджреНрд╡рд╛рд░рд╛ рдирд┐рдкрдЯрд╛ рд▓рд┐рдП рдЬрд╛рддреЗ рд╣реИрдВ:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдмреЗрд╢рдХ, рд╣рд░реЗ рд▓реЛрдЧреЛрдВ рдиреЗ рдЕрдкрдирд╛ рдХрд╛рдо рд╕рдлрд▓рддрд╛рдкреВрд░реНрд╡рдХ рдкреВрд░рд╛ рдХрд░ рд▓рд┐рдпрд╛ рд╣реИред рд░реЗрдбреНрд╕ рдмрд╣реБрдд рд╕рдлрд▓ рдирд╣реАрдВ рд╣реИрдВ.

рд╡реИрд╕реЗ, рд╣рдорд╛рд░реЗ рдЙрддреНрдкрд╛рдж рдкрд░ рдХреЛрдИ рдлрд╝реЛрд▓реНрдбрд░ рдирд╣реАрдВ рд╣реИ ./dags, рдорд╢реАрдиреЛрдВ рдХреЗ рдмреАрдЪ рдХреЛрдИ рддрд╛рд▓рдореЗрд▓ рдирд╣реАрдВ рд╣реИ - рд╕рднреА рдбреИрдЧ рдЭреВрда рдмреЛрд▓рддреЗ рд╣реИрдВ git рд╣рдорд╛рд░реЗ Gitlab рдкрд░, рдФрд░ Gitlab CI рд╡рд┐рд▓рдп рд╣реЛрдиреЗ рдкрд░ рдорд╢реАрдиреЛрдВ рдХреЛ рдЕрдкрдбреЗрдЯ рд╡рд┐рддрд░рд┐рдд рдХрд░рддрд╛ рд╣реИ master.

рдлреВрд▓ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдереЛрдбрд╝рд╛

рдЬрдмрдХрд┐ рдХрд╛рд░реНрдпрдХрд░реНрддрд╛ рд╣рдорд╛рд░реЗ рд╢рд╛рдВрддрдЪрд┐рддреНрддреЛрдВ рдХреЛ рдкреАрдЯ рд░рд╣реЗ рд╣реИрдВ, рдЖрдЗрдП рдПрдХ рдФрд░ рдЙрдкрдХрд░рдг рдпрд╛рдж рд░рдЦреЗрдВ рдЬреЛ рд╣рдореЗрдВ рдХреБрдЫ рджрд┐рдЦрд╛ рд╕рдХрддрд╛ рд╣реИ - рдлреВрд▓ред

рдХрд╛рд░реНрдпрдХрд░реНрддрд╛ рдиреЛрдбреНрд╕ рдкрд░ рд╕рд╛рд░рд╛рдВрд╢ рдЬрд╛рдирдХрд╛рд░реА рд╡рд╛рд▓рд╛ рдкрд╣рд▓рд╛ рдкреГрд╖реНрда:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдХрд╛рдо рдкрд░ рдЧрдП рдХрд╛рд░реНрдпреЛрдВ рд╡рд╛рд▓рд╛ рд╕рдмрд╕реЗ рдЧрд╣рди рдкреГрд╖реНрда:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рд╣рдорд╛рд░реЗ рдмреНрд░реЛрдХрд░ рдХреА рд╕реНрдерд┐рддрд┐ рд╡рд╛рд▓рд╛ рд╕рдмрд╕реЗ рдЙрдмрд╛рдК рдкреЗрдЬ:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рд╕рдмрд╕реЗ рдЪрдордХрджрд╛рд░ рдкреГрд╖реНрда рдХрд╛рд░реНрдп рд╕реНрдерд┐рддрд┐ рдЧреНрд░рд╛рдлрд╝ рдФрд░ рдЙрдирдХреЗ рдирд┐рд╖реНрдкрд╛рджрди рд╕рдордп рдХреЗ рд╕рд╛рде рд╣реИ:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рд╣рдо рдЕрдВрдбрд░рд▓реЛрдбреЗрдб рд▓реЛрдб рдХрд░рддреЗ рд╣реИрдВ

рддреЛ, рд╕рднреА рдХрд╛рд░реНрдп рдкреВрд░реЗ рд╣реЛ рдЧрдП рд╣реИрдВ, рдЖрдк рдШрд╛рдпрд▓реЛрдВ рдХреЛ рд▓реЗ рдЬрд╛ рд╕рдХрддреЗ рд╣реИрдВред

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдФрд░ рдмрд╣реБрдд рд╕реЗ рд▓реЛрдЧ рдШрд╛рдпрд▓ рд╣реБрдП - рдХрд┐рд╕реА рди рдХрд┐рд╕реА рдХрд╛рд░рдг рд╕реЗред рдПрдпрд░рдлрд╝реНрд▓реЛ рдХреЗ рд╕рд╣реА рдЙрдкрдпреЛрдЧ рдХреЗ рдорд╛рдорд▓реЗ рдореЗрдВ, рдпреЗ рд╡рд╣реА рд╡рд░реНрдЧ рдЗрдВрдЧрд┐рдд рдХрд░рддреЗ рд╣реИрдВ рдХрд┐ рдбреЗрдЯрд╛ рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдк рд╕реЗ рдирд╣реАрдВ рдЖрдпрд╛ рд╣реИред

рдЖрдкрдХреЛ рд▓реЙрдЧ рджреЗрдЦрдиреЗ рдФрд░ рдЧрд┐рд░реЗ рд╣реБрдП рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгреЛрдВ рдХреЛ рдкреБрдирдГ рдЖрд░рдВрдн рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред

рдХрд┐рд╕реА рднреА рд╡рд░реНрдЧ рдкрд░ рдХреНрд▓рд┐рдХ рдХрд░рдХреЗ, рд╣рдо рд╣рдорд╛рд░реЗ рд▓рд┐рдП рдЙрдкрд▓рдмреНрдз рдХреНрд░рд┐рдпрд╛рдПрдБ рджреЗрдЦреЗрдВрдЧреЗ:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдЖрдк рдЧрд┐рд░реЗ рд╣реБрдП рдХреЛ рд▓реЗ рд╕рдХрддреЗ рд╣реИрдВ рдФрд░ рд╕рд╛рдлрд╝ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВред рдпрд╛рдиреА рд╣рдо рднреВрд▓ рдЬрд╛рддреЗ рд╣реИрдВ рдХрд┐ рд╡рд╣рд╛рдВ рдХреБрдЫ рдлреЗрд▓ рд╣реЛ рдЧрдпрд╛ рд╣реИ рдФрд░ рд╡рд╣реА рдЗрдВрд╕реНрдЯреЗрдВрд╕ рдЯрд╛рд╕реНрдХ рд╢реЗрдбреНрдпреВрд▓рд░ рдХреЗ рдкрд╛рд╕ рдЪрд▓рд╛ рдЬрд╛рдПрдЧрд╛ред

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдпрд╣ рд╕реНрдкрд╖реНрдЯ рд╣реИ рдХрд┐ рд╕рднреА рд▓рд╛рд▓ рд╡рд░реНрдЧреЛрдВ рдХреЗ рд╕рд╛рде рдорд╛рдЙрд╕ рдХреЗ рд╕рд╛рде рдРрд╕рд╛ рдХрд░рдирд╛ рдмрд╣реБрдд рдорд╛рдирд╡реАрдп рдирд╣реАрдВ рд╣реИ - рдпрд╣ рд╡рд╣ рдирд╣реАрдВ рд╣реИ рдЬреЛ рд╣рдо рдПрдпрд░рдлреНрд▓реЛ рд╕реЗ рдЙрдореНрдореАрдж рдХрд░рддреЗ рд╣реИрдВред рд╕реНрд╡рд╛рднрд╛рд╡рд┐рдХ рд░реВрдк рд╕реЗ, рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рд╕рд╛рдореВрд╣рд┐рдХ рд╡рд┐рдирд╛рд╢ рдХреЗ рд╣рдерд┐рдпрд╛рд░ рд╣реИрдВ: Browse/Task Instances

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдЖрдЗрдП рдПрдХ рдмрд╛рд░ рдореЗрдВ рд╕рдм рдХреБрдЫ рдЪреБрдиреЗрдВ рдФрд░ рд╢реВрдиреНрдп рдкрд░ рд░реАрд╕реЗрдЯ рдХрд░реЗрдВ, рд╕рд╣реА рдЖрдЗрдЯрдо рдкрд░ рдХреНрд▓рд┐рдХ рдХрд░реЗрдВ:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рд╕рдлрд╛рдИ рдХреЗ рдмрд╛рдж, рд╣рдорд╛рд░реА рдЯреИрдХреНрд╕рд┐рдпрд╛рдБ рдЗрд╕ рддрд░рд╣ рджрд┐рдЦрддреА рд╣реИрдВ (рд╡реЗ рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рд╢реЗрдбреНрдпреВрд▓рд░ рджреНрд╡рд╛рд░рд╛ рдЙрдиреНрд╣реЗрдВ рд╢реЗрдбреНрдпреВрд▓ рдХрд░рдиреЗ рдХреА рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░ рд░рд╣реА рд╣реИрдВ):

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдХрдиреЗрдХреНрд╢рди, рд╣реБрдХ рдФрд░ рдЕрдиреНрдп рдЪрд░

рдЕрдм рдЕрдЧрд▓реЗ рдбреАрдПрдЬреА рдХреЛ рджреЗрдЦрдиреЗ рдХрд╛ рд╕рдордп рдЖ рдЧрдпрд╛ рд╣реИ, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""╨У╨╛╤Б╨┐╨╛╨┤╨░ ╤Е╨╛╤А╨╛╤И╨╕╨╡, ╨╛╤В╤З╨╡╤В╤Л ╨╛╨▒╨╜╨╛╨▓╨╗╨╡╨╜╤Л"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ╨Э╨░╤В╨░╤И, ╨┐╤А╨╛╤Б╤Л╨┐╨░╨╣╤Б╤П, ╨╝╤Л {{ dag.dag_id }} ╤Г╤А╨╛╨╜╨╕╨╗╨╕
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

рдХреНрдпрд╛ рд╣рд░ рдХрд┐рд╕реА рдиреЗ рдХрднреА рд░рд┐рдкреЛрд░реНрдЯ рдЕрдкрдбреЗрдЯ рдХрд┐рдпрд╛ рд╣реИ? рдпрд╣ рдлрд┐рд░ рд╕реЗ рд╣реИ: рдЙрди рд╕реНрд░реЛрддреЛрдВ рдХреА рдПрдХ рд╕реВрдЪреА рд╣реИ рдЬрд╣рд╛рдВ рд╕реЗ рдбреЗрдЯрд╛ рдкреНрд░рд╛рдкреНрдд рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ; рдХрд╣рд╛рдВ рд░рдЦрдирд╛ рд╣реИ рдЗрд╕рдХреА рдПрдХ рд╕реВрдЪреА рд╣реИ; рдЬрдм рд╕рдм рдХреБрдЫ рдШрдЯрд┐рдд рд╣реЛ рдпрд╛ рдЯреВрдЯ рдЬрд╛рдП рддреЛ рд╣реЙрд░реНрди рдмрдЬрд╛рдирд╛ рди рднреВрд▓реЗрдВ (рдЦреИрд░, рдпрд╣ рд╣рдорд╛рд░реЗ рдмрд╛рд░реЗ рдореЗрдВ рдирд╣реАрдВ рд╣реИ, рдирд╣реАрдВ)ред

рдЖрдЗрдП рдлрд╝рд╛рдЗрд▓ рдХреЛ рдлрд┐рд░ рд╕реЗ рджреЗрдЦреЗрдВ рдФрд░ рдирдИ рдЕрд╕реНрдкрд╖реНрдЯ рд╕рд╛рдордЧреНрд░реА рджреЗрдЦреЗрдВ:

  • from commons.operators import TelegramBotSendMessage - рд╣рдореЗрдВ рдЕрдкрдирд╛ рд╕реНрд╡рдпрдВ рдХрд╛ рдСрдкрд░реЗрдЯрд░ рдмрдирд╛рдиреЗ рд╕реЗ рдХреЛрдИ рдирд╣реАрдВ рд░реЛрдХрддрд╛, рдЬрд┐рд╕рдХрд╛ рд▓рд╛рдн рд╣рдордиреЗ рдЕрдирдмреНрд▓реЙрдХреНрдб рдкрд░ рд╕рдВрджреЗрд╢ рднреЗрдЬрдиреЗ рдХреЗ рд▓рд┐рдП рдПрдХ рдЫреЛрдЯрд╛ рд╕рд╛ рд░реИрдкрд░ рдмрдирд╛рдХрд░ рдЙрдард╛рдпрд╛ред (рд╣рдо рдиреАрдЪреЗ рдЗрд╕ рдСрдкрд░реЗрдЯрд░ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдЕрдзрд┐рдХ рдмрд╛рдд рдХрд░реЗрдВрдЧреЗ);
  • default_args={} - рдбреИрдЧ рдЕрдкрдиреЗ рд╕рднреА рдСрдкрд░реЗрдЯрд░реЛрдВ рдХреЛ рд╕рдорд╛рди рддрд░реНрдХ рд╡рд┐рддрд░рд┐рдд рдХрд░ рд╕рдХрддрд╛ рд╣реИ;
  • to='{{ var.value.all_the_kings_men }}' - рдореИрджрд╛рди to рд╣рдордиреЗ рд╣рд╛рд░реНрдбрдХреЛрдб рдирд╣реАрдВ рдХрд┐рдпрд╛ рд╣реЛрдЧрд╛, рд▓реЗрдХрд┐рди рдЬрд┐рдВрдЬрд╛ рдФрд░ рдИрдореЗрд▓ рдХреА рдПрдХ рд╕реВрдЪреА рдХреЗ рд╕рд╛рде рдПрдХ рд╡реЗрд░рд┐рдПрдмрд▓ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдЧрддрд┐рд╢реАрд▓ рд░реВрдк рд╕реЗ рдЬреЗрдирд░реЗрдЯ рдХрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛, рдЬрд┐рд╕реЗ рдореИрдВрдиреЗ рд╕рд╛рд╡рдзрд╛рдиреАрдкреВрд░реНрд╡рдХ рдбрд╛рд▓рд╛ рд╣реИ Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - рдСрдкрд░реЗрдЯрд░ рд╢реБрд░реВ рдХрд░рдиреЗ рдХреА рд╢рд░реНрддред рд╣рдорд╛рд░реЗ рдорд╛рдорд▓реЗ рдореЗрдВ, рдкрддреНрд░ рдорд╛рд▓рд┐рдХреЛрдВ рдХреЛ рддрднреА рднреЗрдЬрд╛ рдЬрд╛рдПрдЧрд╛ рдЬрдм рд╕рднреА рдирд┐рд░реНрднрд░рддрд╛рдПрдБ рдкреВрд░реА рд╣реЛ рдЬрд╛рдПрдБрдЧреА рд╕рдлрд▓рддрд╛рдкреВрд░реНрд╡рдХ;
  • tg_bot_conn_id='tg_main' - рддрд░реНрдХ conn_id рд╣рдорд╛рд░реЗ рджреНрд╡рд╛рд░рд╛ рдмрдирд╛рдИ рдЧрдИ рдХрдиреЗрдХреНрд╢рди рдЖрдИрдбреА рд╕реНрд╡реАрдХрд╛рд░ рдХрд░реЗрдВ Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - рдЯреЗрд▓реАрдЧреНрд░рд╛рдо рдореЗрдВ рд╕рдВрджреЗрд╢ рддрднреА рдЙрдбрд╝реЗрдВрдЧреЗ рдЬрдм рдЧрд┐рд░реЗ рд╣реБрдП рдХрд╛рд░реНрдп рд╣реЛрдВрдЧреЗ;
  • task_concurrency=1 - рд╣рдо рдПрдХ рдХрд╛рд░реНрдп рдХреЗ рдХрдИ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгреЛрдВ рдХреЛ рдПрдХ рд╕рд╛рде рд▓реЙрдиреНрдЪ рдХрд░рдиреЗ рдкрд░ рд░реЛрдХ рд▓рдЧрд╛рддреЗ рд╣реИрдВред рдЕрдиреНрдпрдерд╛, рд╣рдореЗрдВ рдПрдХ рд╕рд╛рде рдХрдИ рд▓реЙрдиреНрдЪ рдорд┐рд▓реЗрдВрдЧреЗ VerticaOperator (рдПрдХ рдЯреЗрдмрд▓ рдХреЛ рджреЗрдЦрддреЗ рд╣реБрдП);
  • report_update >> [email, tg] - рд╕рднреА VerticaOperator рдЗрд╕ рддрд░рд╣ рдкрддреНрд░ рдФрд░ рд╕рдВрджреЗрд╢ рднреЗрдЬрдиреЗ рдореЗрдВ рдЬреБрдЯреЗрдВ:
    Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

    рд▓реЗрдХрд┐рди рдЪреВрдБрдХрд┐ рдиреЛрдЯрд┐рдлрд╝рд╛рдпрд░ рдСрдкрд░реЗрдЯрд░реЛрдВ рдХреА рд▓реЙрдиреНрдЪ рд╕реНрдерд┐рддрд┐рдпрд╛рдБ рдЕрд▓рдЧ-рдЕрд▓рдЧ рд╣реЛрддреА рд╣реИрдВ, рдЗрд╕рд▓рд┐рдП рдХреЗрд╡рд▓ рдПрдХ рд╣реА рдХрд╛рдо рдХрд░реЗрдЧрд╛ред рд╡реГрдХреНрд╖ рджреГрд╢реНрдп рдореЗрдВ, рд╕рдм рдХреБрдЫ рдереЛрдбрд╝рд╛ рдХрдо рджреГрд╢реНрдпрдорд╛рди рджрд┐рдЦрддрд╛ рд╣реИ:
    Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдореИрдВ рдЗрд╕рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдХреБрдЫ рд╢рдмреНрдж рдХрд╣реВрдВрдЧрд╛ рдореИрдХреНрд░реЛ рдФрд░ рдЙрдирдХреЗ рджреЛрд╕реНрдд - рдЪрд░.

рдореИрдХреНрд░реЛрдЬрд╝ рдЬрд┐рдВрдЬрд╛ рдкреНрд▓реЗрд╕рд╣реЛрд▓реНрдбрд░ рд╣реИрдВ рдЬреЛ рд╡рд┐рднрд┐рдиреНрди рдЙрдкрдпреЛрдЧреА рдЬрд╛рдирдХрд╛рд░реА рдХреЛ рдСрдкрд░реЗрдЯрд░ рддрд░реНрдХреЛрдВ рдореЗрдВ рдкреНрд░рддрд┐рд╕реНрдерд╛рдкрд┐рдд рдХрд░ рд╕рдХрддреЗ рд╣реИрдВред рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдЗрд╕ рддрд░рд╣:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} рд╕рдВрджрд░реНрдн рдЪрд░ рдХреА рд╕рд╛рдордЧреНрд░реА рддрдХ рд╡рд┐рд╕реНрддрд╛рд░рд┐рдд рд╣реЛрдЧрд╛ execution_date рдкреНрд░рд╛рд░реВрдк рдореЗрдВ YYYY-MM-DD: 2020-07-14. рд╕рдмрд╕реЗ рдЕрдЪреНрдЫреА рдмрд╛рдд рдпрд╣ рд╣реИ рдХрд┐ рд╕рдВрджрд░реНрдн рдЪрд░ рдХреЛ рдПрдХ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдг (рдЯреНрд░реА рд╡реНрдпреВ рдореЗрдВ рдПрдХ рд╡рд░реНрдЧ) рдореЗрдВ рд░рдЦрд╛ рдЬрд╛рддрд╛ рд╣реИ, рдФрд░ рдЬрдм рдкреБрдирдГ рдЖрд░рдВрдн рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ, рддреЛ рдкреНрд▓реЗрд╕рд╣реЛрд▓реНрдбрд░ рд╕рдорд╛рди рдорд╛рдиреЛрдВ рддрдХ рд╡рд┐рд╕реНрддрд╛рд░рд┐рдд рд╣реЛ рдЬрд╛рдПрдВрдЧреЗред

рдкреНрд░рддреНрдпреЗрдХ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдг рдкрд░ рд░реЗрдВрдбрд░ рдмрдЯрди рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдорд╛рди рджреЗрдЦреЗ рдЬрд╛ рд╕рдХрддреЗ рд╣реИрдВред рдкрддреНрд░ рднреЗрдЬрдиреЗ рдХрд╛ рдХрд╛рд░реНрдп рдЗрд╕ рдкреНрд░рдХрд╛рд░ рд╣реИ:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдФрд░ рдЗрд╕рд▓рд┐рдП рд╕рдВрджреЗрд╢ рднреЗрдЬрдиреЗ рдХреЗ рдХрд╛рд░реНрдп рдореЗрдВ:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдирд╡реАрдирддрдо рдЙрдкрд▓рдмреНрдз рд╕рдВрд╕реНрдХрд░рдг рдХреЗ рд▓рд┐рдП рдЕрдВрддрд░реНрдирд┐рд╣рд┐рдд рдореИрдХреНрд░реЛрдЬрд╝ рдХреА рдкреВрд░реА рд╕реВрдЪреА рдпрд╣рд╛рдВ рдЙрдкрд▓рдмреНрдз рд╣реИ: рдореИрдХреНрд░реЛрдЬрд╝ рд╕рдВрджрд░реНрдн

рдЗрд╕рдХреЗ рдЕрд▓рд╛рд╡рд╛, рдкреНрд▓рдЧрдЗрдиреНрд╕ рдХреА рдорджрдж рд╕реЗ, рд╣рдо рдЕрдкрдиреЗ рд╕реНрд╡рдпрдВ рдХреЗ рдореИрдХреНрд░реЛрдЬрд╝ рдШреЛрд╖рд┐рдд рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ, рд▓реЗрдХрд┐рди рдпрд╣ рдПрдХ рдФрд░ рдХрд╣рд╛рдиреА рд╣реИред

рдкреВрд░реНрд╡рдирд┐рд░реНрдзрд╛рд░рд┐рдд рдЪреАрдЬрд╝реЛрдВ рдХреЗ рдЕрд▓рд╛рд╡рд╛, рд╣рдо рдЕрдкрдиреЗ рд╡реЗрд░рд┐рдПрдмрд▓реНрд╕ рдХреЗ рдорд╛рдиреЛрдВ рдХреЛ рдкреНрд░рддрд┐рд╕реНрдерд╛рдкрд┐рдд рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ (рдореИрдВрдиреЗ рдкрд╣рд▓реЗ рд╣реА рдЙрдкрд░реЛрдХреНрдд рдХреЛрдб рдореЗрдВ рдЗрд╕рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд┐рдпрд╛ рд╣реИ)ред рдЖрдЗрдпреЗ рдмрдирд╛рддреЗ рд╣реИрдВ Admin/Variables рдЪреАрдЬреЛрдВ рдХреА рдЬреЛрдбрд╝реА:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рд╡рд╣ рд╕рдм рдХреБрдЫ рдЬреЛ рдЖрдк рдЙрдкрдпреЛрдЧ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

рдорд╛рди рдПрдХ рдЕрджрд┐рд╢ рд░рд╛рд╢рд┐ рд╣реЛ рд╕рдХрддрд╛ рд╣реИ, рдпрд╛ рдпрд╣ JSON рднреА рд╣реЛ рд╕рдХрддрд╛ рд╣реИред JSON рдХреЗ рдорд╛рдорд▓реЗ рдореЗрдВ:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

рдмрд╕ рд╡рд╛рдВрдЫрд┐рдд рдХреБрдВрдЬреА рдХреЗ рдкрде рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░реЗрдВ: {{ var.json.bot_config.bot.token }}.

рдореИрдВ рд╡рд╕реНрддреБрддрдГ рдПрдХ рд╢рдмреНрдж рдХрд╣реВрдВрдЧрд╛ рдФрд░ рдЗрд╕рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдПрдХ рд╕реНрдХреНрд░реАрдирд╢реЙрдЯ рджрд┐рдЦрд╛рдКрдВрдЧрд╛ рд╕рдВрдмрдВрдз. рдпрд╣рд╛рдВ рд╕рдм рдХреБрдЫ рдкреНрд░рд╛рдердорд┐рдХ рд╣реИ: рдкреГрд╖реНрда рдкрд░ Admin/Connections рд╣рдо рдПрдХ рдХрдиреЗрдХреНрд╢рди рдмрдирд╛рддреЗ рд╣реИрдВ, рд╡рд╣рд╛рдВ рдЕрдкрдиреЗ рд▓реЙрдЧрд┐рди/рдкрд╛рд╕рд╡рд░реНрдб рдФрд░ рдЕрдзрд┐рдХ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдкреИрд░рд╛рдореАрдЯрд░ рдЬреЛрдбрд╝рддреЗ рд╣реИрдВред рдЗрд╕ рдХрджрд░:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдкрд╛рд╕рд╡рд░реНрдб рдХреЛ рдПрдиреНрдХреНрд░рд┐рдкреНрдЯ рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ (рдбрд┐рдлрд╝реЙрд▓реНрдЯ рд╕реЗ рдЕрдзрд┐рдХ рдЕрдЪреНрдЫреА рддрд░рд╣ рд╕реЗ), рдпрд╛ рдЖрдк рдХрдиреЗрдХреНрд╢рди рдкреНрд░рдХрд╛рд░ рдХреЛ рдЫреЛрдбрд╝ рд╕рдХрддреЗ рд╣реИрдВ (рдЬреИрд╕рд╛ рдХрд┐ рдореИрдВрдиреЗ рдХрд┐рдпрд╛ рдерд╛)ред tg_main) - рддрдереНрдп рдпрд╣ рд╣реИ рдХрд┐ рдкреНрд░рдХрд╛рд░реЛрдВ рдХреА рд╕реВрдЪреА рдПрдпрд░рдлреНрд▓реЛ рдореЙрдбрд▓ рдореЗрдВ рд╣рд╛рд░реНрдбрд╡рд╛рдпрд░реНрдб рд╣реИ рдФрд░ рд╕реНрд░реЛрдд рдХреЛрдб рдореЗрдВ рд╢рд╛рдорд┐рд▓ рд╣реБрдП рдмрд┐рдирд╛ рдЗрд╕рдХрд╛ рд╡рд┐рд╕реНрддрд╛рд░ рдирд╣реАрдВ рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ (рдЕрдЧрд░ рдЕрдЪрд╛рдирдХ рдореИрдВрдиреЗ рдХреБрдЫ рдЧреВрдЧрд▓ рдирд╣реАрдВ рдХрд┐рдпрд╛, рддреЛ рдХреГрдкрдпрд╛ рдореБрдЭреЗ рд╕рд╣реА рдХрд░реЗрдВ), рд▓реЗрдХрд┐рди рдХреБрдЫ рднреА рд╣рдореЗрдВ рдХреНрд░реЗрдбрд┐рдЯ рдкреНрд░рд╛рдкреНрдд рдХрд░рдиреЗ рд╕реЗ рдирд╣реАрдВ рд░реЛрдХреЗрдЧрд╛ рдирд╛рдоред

рдЖрдк рдПрдХ рд╣реА рдирд╛рдо рд╕реЗ рдХрдИ рдХрдиреЗрдХреНрд╢рди рднреА рдмрдирд╛ рд╕рдХрддреЗ рд╣реИрдВ: рдЗрд╕ рдорд╛рдорд▓реЗ рдореЗрдВ, рд╡рд┐рдзрд┐ BaseHook.get_connection(), рдЬреЛ рд╣рдореЗрдВ рдирд╛рдо рд╕реЗ рдХрдиреЗрдХреНрд╢рди рджрд┐рд▓рд╡рд╛рддрд╛ рд╣реИ, рджреЗрдЧрд╛ рдЕрдирд┐рдпрдорд┐рдд рдХрдИ рдирд╛рдореЛрдВ рд╕реЗ (рд░рд╛рдЙрдВрдб рд░реЙрдмрд┐рди рдмрдирд╛рдирд╛ рдЕрдзрд┐рдХ рддрд░реНрдХрд╕рдВрдЧрдд рд╣реЛрдЧрд╛, рд▓реЗрдХрд┐рди рдЖрдЗрдП рдЗрд╕реЗ рдПрдпрд░рдлреНрд▓реЛ рдбреЗрд╡рд▓рдкрд░реНрд╕ рдХреЗ рд╡рд┐рд╡реЗрдХ рдкрд░ рдЫреЛрдбрд╝ рджреЗрдВ)ред

рд╡реЗрд░рд┐рдПрдмрд▓реНрд╕ рдФрд░ рдХрдиреЗрдХреНрд╢рдВрд╕ рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдк рд╕реЗ рдЕрдЪреНрдЫреЗ рдЙрдкрдХрд░рдг рд╣реИрдВ, рд▓реЗрдХрд┐рди рдпрд╣ рдорд╣рддреНрд╡рдкреВрд░реНрдг рд╣реИ рдХрд┐ рд╕рдВрддреБрд▓рди рди рдЦреЛрдПрдВ: рдЖрдкрдХреЗ рдкреНрд░рд╡рд╛рд╣ рдХреЗ рдХреМрди рд╕реЗ рд╣рд┐рд╕реНрд╕реЗ рдЖрдк рдХреЛрдб рдореЗрдВ рд╣реА рд╕рдВрдЧреНрд░рд╣реАрдд рдХрд░рддреЗ рд╣реИрдВ, рдФрд░ рдХреМрди рд╕реЗ рд╣рд┐рд╕реНрд╕реЗ рдЖрдк рднрдВрдбрд╛рд░рдг рдХреЗ рд▓рд┐рдП рдПрдпрд░рдлреНрд▓реЛ рдХреЛ рджреЗрддреЗ рд╣реИрдВред рдПрдХ рдУрд░, рдпреВрдЖрдИ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдореВрд▓реНрдп рдХреЛ рдЬрд▓реНрджреА рд╕реЗ рдмрджрд▓рдирд╛ рд╕реБрд╡рд┐рдзрд╛рдЬрдирдХ рд╣реЛ рд╕рдХрддрд╛ рд╣реИ, рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдПрдХ рдореЗрд▓рд┐рдВрдЧ рдмреЙрдХреНрд╕ред рджреВрд╕рд░реА рдУрд░, рдпрд╣ рдЕрднреА рднреА рдорд╛рдЙрд╕ рдХреНрд▓рд┐рдХ рдХреА рд╡рд╛рдкрд╕реА рд╣реИ, рдЬрд┐рд╕рд╕реЗ рд╣рдо (рдореИрдВ) рдЫреБрдЯрдХрд╛рд░рд╛ рдкрд╛рдирд╛ рдЪрд╛рд╣рддреЗ рдереЗред

рдХрдиреЗрдХреНрд╢рди рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рдирд╛ рдХрд╛рд░реНрдпреЛрдВ рдореЗрдВ рд╕реЗ рдПрдХ рд╣реИ рд╣реБрдХ. рд╕рд╛рдорд╛рдиреНрдп рддреМрд░ рдкрд░, рдПрдпрд░рдлреНрд▓реЛ рд╣реБрдХ рдЗрд╕реЗ рддреГрддреАрдп-рдкрдХреНрд╖ рд╕реЗрд╡рд╛рдУрдВ рдФрд░ рдкреБрд╕реНрддрдХрд╛рд▓рдпреЛрдВ рд╕реЗ рдЬреЛрдбрд╝рдиреЗ рдХреЗ рд▓рд┐рдП рдмрд┐рдВрджреБ рд╣реИрдВред рдЬреИрд╕реЗ, JiraHook рдЬреАрд░рд╛ рдХреЗ рд╕рд╛рде рдмрд╛рддрдЪреАрдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рд╣рдорд╛рд░реЗ рд▓рд┐рдП рдПрдХ рдХреНрд▓рд╛рдЗрдВрдЯ рдЦреЛрд▓реЗрдЧрд╛ (рдЖрдк рдХрд╛рд░реНрдпреЛрдВ рдХреЛ рдЖрдЧреЗ рдФрд░ рдкреАрдЫреЗ рд▓реЗ рдЬрд╛ рд╕рдХрддреЗ рд╣реИрдВ), рдФрд░ рдЗрд╕рдХреА рдорджрдж рд╕реЗ SambaHook рдЖрдк рдХрд┐рд╕реА рд╕реНрдерд╛рдиреАрдп рдлрд╝рд╛рдЗрд▓ рдХреЛ рдЖрдЧреЗ рдмрдврд╝рд╛ рд╕рдХрддреЗ рд╣реИрдВ smb-рдмрд┐рдВрджреБред

рдХрд╕реНрдЯрдо рдСрдкрд░реЗрдЯрд░ рдХреЛ рдкрд╛рд░реНрд╕ рдХрд░рдирд╛

рдФрд░ рд╣рдо рдпрд╣ рджреЗрдЦрдиреЗ рдХреЗ рдХрд░реАрдм рдкрд╣реБрдВрдЪ рдЧрдП рдХрд┐ рдпрд╣ рдХреИрд╕реЗ рдмрдирд╛ рд╣реИ TelegramBotSendMessage

рдХреЛрдб commons/operators.py рд╡рд╛рд╕реНрддрд╡рд┐рдХ рдСрдкрд░реЗрдЯрд░ рдХреЗ рд╕рд╛рде:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

рдпрд╣рд╛рдВ, рдПрдпрд░рдлрд╝реНрд▓реЛ рдореЗрдВ рдмрд╛рдХреА рд╕рднреА рдЪреАрдЬрд╝реЛрдВ рдХреА рддрд░рд╣, рд╕рдм рдХреБрдЫ рдмрд╣реБрдд рд╕рд░рд▓ рд╣реИ:

  • рд╡рд┐рд░рд╛рд╕рдд рдореЗрдВ рдорд┐рд▓рд╛ BaseOperator, рдЬреЛ рдХрд╛рдлреА рдХреБрдЫ рдПрдпрд░рдлреНрд▓реЛ-рд╡рд┐рд╢рд┐рд╖реНрдЯ рдЪреАрдЬреЛрдВ рдХреЛ рд▓рд╛рдЧреВ рдХрд░рддрд╛ рд╣реИ (рдЕрдкрдиреЗ рдЕрд╡рдХрд╛рд╢ рдХреЛ рджреЗрдЦреЗрдВ)
  • рдШреЛрд╖рд┐рдд рдлрд╝реАрд▓реНрдб template_fields, рдЬрд┐рд╕рдореЗрдВ рдЬрд┐рдВрдЬрд╛ рдкреНрд░реЛрд╕реЗрд╕ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдореИрдХреНрд░реЛрдЬрд╝ рдХреА рддрд▓рд╛рд╢ рдХрд░реЗрдЧрд╛ред
  • рдХреЗ рд▓рд┐рдП рдЙрдЪрд┐рдд рддрд░реНрдХреЛрдВ рдХреА рд╡реНрдпрд╡рд╕реНрдерд╛ рдХреА __init__(), рдЬрд╣рд╛рдВ рдЖрд╡рд╢реНрдпрдХ рд╣реЛ рд╡рд╣рд╛рдВ рдбрд┐рдлрд╝реЙрд▓реНрдЯ рд╕реЗрдЯ рдХрд░реЗрдВред
  • рд╣рдо рдкреВрд░реНрд╡рдЬ рдХреЗ рдЖрд░рдВрднреАрдХрд░рдг рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рднреА рдирд╣реАрдВ рднреВрд▓реЗред
  • рд╕рдВрдмрдВрдзрд┐рдд рд╣реБрдХ рдЦреЛрд▓рд╛ TelegramBotHookрдЗрд╕рд╕реЗ рдПрдХ рдХреНрд▓рд╛рдЗрдВрдЯ рдСрдмреНрдЬреЗрдХреНрдЯ рдкреНрд░рд╛рдкреНрдд рд╣реБрдЖред
  • рдУрд╡рд░рд░рд╛рдЗрдб (рдкреБрдирд░реНрдкрд░рд┐рднрд╛рд╖рд┐рдд) рд╡рд┐рдзрд┐ BaseOperator.execute(), рдЬрдм рдСрдкрд░реЗрдЯрд░ рд▓реЙрдиреНрдЪ рдХрд░рдиреЗ рдХрд╛ рд╕рдордп рдЖрдПрдЧрд╛ рддреЛ рдПрдпрд░рдлрд╝реЙ рдЪрд┐рдХреЛрдЯреА рдХрд╛рдЯреЗрдЧрд╛ - рдЗрд╕рдореЗрдВ рд╣рдо рд▓реЙрдЧ рдЗрди рдХрд░рдирд╛ рднреВрд▓рдХрд░ рдореБрдЦреНрдп рдХреНрд░рд┐рдпрд╛ рд▓рд╛рдЧреВ рдХрд░реЗрдВрдЧреЗред (рд╡реИрд╕реЗ, рд╣рдо рд╕реАрдзреЗ рд▓реЙрдЧ рдЗрди рдХрд░рддреЗ рд╣реИрдВ stdout ╨╕ stderr - рдПрдпрд░рдлрд╝реНрд▓реЛ рд╣рд░ рдЪреАрдЬрд╝ рдХреЛ рд░реЛрдХ рджреЗрдЧрд╛, рдЙрд╕реЗ рдЦреВрдмрд╕реВрд░рддреА рд╕реЗ рд▓рдкреЗрдЯ рджреЗрдЧрд╛, рдЬрд╣рд╛рдВ рдЖрд╡рд╢реНрдпрдХ рд╣реЛ рдЙрд╕реЗ рд╡рд┐рдШрдЯрд┐рдд рдХрд░ рджреЗрдЧрд╛ред)

рдЖрдЗрдпреЗ рджреЗрдЦреЗрдВ рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдХреНрдпрд╛ рд╣реИ commons/hooks.py. рдлрд╝рд╛рдЗрд▓ рдХрд╛ рдкрд╣рд▓рд╛ рднрд╛рдЧ, рд╣реБрдХ рдХреЗ рд╕рд╛рде рд╣реА:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

рдореБрдЭреЗ рдпрд╣ рднреА рдирд╣реАрдВ рдкрддрд╛ рдХрд┐ рдпрд╣рд╛рдВ рдХреНрдпрд╛ рд╕рдордЭрд╛рдКрдВ, рдореИрдВ рд╕рд┐рд░реНрдл рдорд╣рддреНрд╡рдкреВрд░реНрдг рдмрд┐рдВрджреБрдУрдВ рдкрд░ рдзреНрдпрд╛рди рджреВрдВрдЧрд╛:

  • рд╣рдореЗрдВ рд╡рд┐рд░рд╛рд╕рдд рдорд┐рд▓реА рд╣реИ, рддрд░реНрдХреЛрдВ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рд╕реЛрдЪреЗрдВ - рдЬреНрдпрд╛рджрд╛рддрд░ рдорд╛рдорд▓реЛрдВ рдореЗрдВ рдпрд╣ рдПрдХ рд╣реА рд╣реЛрдЧрд╛: conn_id;
  • рдорд╛рдирдХ рддрд░реАрдХреЛрдВ рд╕реЗ рдЖрдЧреЗ рдирд┐рдХрд▓рдирд╛: рдореИрдВрдиреЗ рдЦреБрдж рдХреЛ рд╕реАрдорд┐рдд рдХрд░ рд▓рд┐рдпрд╛ get_conn(), рдЬрд┐рд╕рдореЗрдВ рдореБрдЭреЗ рдирд╛рдо рд╕реЗ рдХрдиреЗрдХреНрд╢рди рдкреИрд░рд╛рдореАрдЯрд░ рдорд┐рд▓рддреЗ рд╣реИрдВ рдФрд░ рдХреЗрд╡рд▓ рдЕрдиреБрднрд╛рдЧ рдорд┐рд▓рддрд╛ рд╣реИ extra (рдпрд╣ рдПрдХ JSON рдлрд╝реАрд▓реНрдб рд╣реИ), рдЬрд┐рд╕рдореЗрдВ рдореИрдВрдиреЗ (рдореЗрд░реЗ рдЕрдкрдиреЗ рдирд┐рд░реНрджреЗрд╢реЛрдВ рдХреЗ рдЕрдиреБрд╕рд╛рд░!) рдЯреЗрд▓реАрдЧреНрд░рд╛рдо рдмреЙрдЯ рдЯреЛрдХрди рдбрд╛рд▓рд╛ рд╣реИ: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • рдореИрдВ рд╣рдорд╛рд░рд╛ рдПрдХ рдЙрджрд╛рд╣рд░рдг рдмрдирд╛рддрд╛ рд╣реВрдВ TelegramBot, рдЗрд╕реЗ рдПрдХ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдЯреЛрдХрди рджреЗ рд░рд╣рд╛ рд╣реИред

рдмрд╕ рдЗрддрдирд╛ рд╣реАред рдЖрдк рд╣реБрдХ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдХреНрд▓рд╛рдЗрдВрдЯ рдкреНрд░рд╛рдкреНрдд рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ TelegramBotHook().clent рдпрд╛ TelegramBotHook().get_conn().

рдФрд░ рдлрд╝рд╛рдЗрд▓ рдХрд╛ рджреВрд╕рд░рд╛ рднрд╛рдЧ, рдЬрд┐рд╕рдореЗрдВ рдореИрдВ рдЯреЗрд▓реАрдЧреНрд░рд╛рдо рд░реЗрд╕реНрдЯ рдПрдкреАрдЖрдИ рдХреЗ рд▓рд┐рдП рдПрдХ рдорд╛рдЗрдХреНрд░реЛрд░реИрдкрд░ рдмрдирд╛рддрд╛ рд╣реВрдВ, рддрд╛рдХрд┐ рдЙрд╕реЗ рдЦреАрдВрдЪрдирд╛ рди рдкрдбрд╝реЗ python-telegram-bot рдПрдХ рд╡рд┐рдзрд┐ рдХреЗ рд▓рд┐рдП sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

рд╕рд╣реА рддрд░реАрдХрд╛ рдпрд╣ рд╣реИ рдХрд┐ рдЗрд╕реЗ рд╕рдм рдЬреЛрдбрд╝ рджрд┐рдпрд╛ рдЬрд╛рдП: TelegramBotSendMessage, TelegramBotHook, TelegramBot - рдкреНрд▓рдЧрдЗрди рдореЗрдВ, рдПрдХ рд╕рд╛рд░реНрд╡рдЬрдирд┐рдХ рд░рд┐рдкреЙрдЬрд┐рдЯрд░реА рдореЗрдВ рдбрд╛рд▓реЗрдВ, рдФрд░ рдЗрд╕реЗ рдУрдкрди рд╕реЛрд░реНрд╕ рдХреЛ рджреЗрдВред

рдЬрдм рд╣рдо рдпрд╣ рд╕рдм рдЕрдзреНрдпрдпрди рдХрд░ рд░рд╣реЗ рдереЗ, рд╣рдорд╛рд░реА рд░рд┐рдкреЛрд░реНрдЯ рдЕрдкрдбреЗрдЯ рд╕рдлрд▓рддрд╛рдкреВрд░реНрд╡рдХ рд╡рд┐рдлрд▓ рд╣реЛ рдЧрдИ рдФрд░ рдореБрдЭреЗ рдЪреИрдирд▓ рдореЗрдВ рдПрдХ рддреНрд░реБрдЯрд┐ рд╕рдВрджреЗрд╢ рднреЗрдЬрд╛ рдЧрдпрд╛ред рдореИрдВ рдпрд╣ рджреЗрдЦрдиреЗ рдЬрд╛ рд░рд╣рд╛ рд╣реВрдВ рдХрд┐ рдХреНрдпрд╛ рдпрд╣ рдЧрд▓рдд рд╣реИ...

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛
рд╣рдорд╛рд░реЗ рдХреБрддреНрддреЗ рдореЗрдВ рдХреБрдЫ рдЯреВрдЯ рдЧрдпрд╛! рдХреНрдпрд╛ рд╣рдо рдпрд╣реА рдЙрдореНрдореАрдж рдирд╣реАрдВ рдХрд░ рд░рд╣реЗ рдереЗ? рдмрд┐рд▓реНрдХреБрд▓!

рдХреНрдпрд╛ рдЖрдк рдбрд╛рд▓рдиреЗ рдЬрд╛ рд░рд╣реЗ рд╣реИрдВ?

рдХреНрдпрд╛ рдЖрдкрдХреЛ рд▓рдЧрддрд╛ рд╣реИ рдХрд┐ рдореБрдЭрд╕реЗ рдХреБрдЫ рдЪреВрдХ рдЧрдпрд╛? рдРрд╕рд╛ рд▓рдЧрддрд╛ рд╣реИ рдХрд┐ рдЙрд╕рдиреЗ SQL рд╕рд░реНрд╡рд░ рд╕реЗ рд╡рд░реНрдЯрд┐рдХрд╛ рдореЗрдВ рдбреЗрдЯрд╛ рд╕реНрдерд╛рдирд╛рдВрддрд░рд┐рдд рдХрд░рдиреЗ рдХрд╛ рд╡рд╛рджрд╛ рдХрд┐рдпрд╛ рдерд╛, рдФрд░ рдлрд┐рд░ рдЙрд╕рдиреЗ рдЗрд╕реЗ рд▓реЗ рд▓рд┐рдпрд╛ рдФрд░ рд╡рд┐рд╖рдп рд╕реЗ рд╣рдЯ рдЧрдпрд╛, рдмрджрдорд╛рд╢!

рдпрд╣ рдЕрддреНрдпрд╛рдЪрд╛рд░ рдЬрд╛рдирдмреВрдЭрдХрд░ рдХрд┐рдпрд╛ рдЧрдпрд╛ рдерд╛, рдореБрдЭреЗ рдмрд╕ рдЖрдкрдХреЗ рд▓рд┐рдП рдХреБрдЫ рд╢рдмреНрджрд╛рд╡рд▓реА рд╕рдордЭрдиреА рдереАред рдЕрдм рдЖрдк рдЖрдЧреЗ рдмрдврд╝ рд╕рдХрддреЗ рд╣реИрдВ.

рд╣рдорд╛рд░реА рдпреЛрдЬрдирд╛ рдпрд╣ рдереА:

  1. рдбреЗрдЧ рдХрд░реЛ
  2. рдХрд╛рд░реНрдп рдЙрддреНрдкрдиреНрди рдХрд░реЗрдВ
  3. рджреЗрдЦреЛ рд╕рдм рдХреБрдЫ рдХрд┐рддрдирд╛ рд╕реБрдВрджрд░ рд╣реИ
  4. рднрд░рдиреЗ рдХреЗ рд▓рд┐рдП рд╕рддреНрд░ рд╕рдВрдЦреНрдпрд╛рдПрдБ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░реЗрдВ
  5. SQL рд╕рд░реНрд╡рд░ рд╕реЗ рдбреЗрдЯрд╛ рдкреНрд░рд╛рдкреНрдд рдХрд░реЗрдВ
  6. рд╡рд░реНрдЯрд┐рдХрд╛ рдореЗрдВ рдбреЗрдЯрд╛ рдбрд╛рд▓реЗрдВ
  7. рдЖрдБрдХрдбрд╝реЗ рдПрдХрддреНрд░ рдХрд░реЗрдВ

рддреЛ, рдпрд╣ рд╕рдм рдЪрд╛рд▓реВ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рдореИрдВрдиреЗ рд╣рдорд╛рд░реЗ рдореЗрдВ рдПрдХ рдЫреЛрдЯрд╛ рд╕рд╛ рдпреЛрдЧрджрд╛рди рдХрд┐рдпрд╛ docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

рд╡рд╣рд╛рдВ рд╣рдо рдЙрдард╛рддреЗ рд╣реИрдВ:

  • рдореЗрдЬрд╝рдмрд╛рди рдХреЗ рд░реВрдк рдореЗрдВ рд╡рд░реНрдЯрд┐рдХрд╛ dwh рд╕рдмрд╕реЗ рдбрд┐рдлрд╝реЙрд▓реНрдЯ рд╕реЗрдЯрд┐рдВрдЧреНрд╕ рдХреЗ рд╕рд╛рде,
  • SQL рд╕рд░реНрд╡рд░ рдХреЗ рддреАрди рдЙрджрд╛рд╣рд░рдг,
  • рд╣рдо рдмрд╛рдж рдореЗрдВ рдбреЗрдЯрд╛рдмреЗрд╕ рдХреЛ рдХреБрдЫ рдбреЗрдЯрд╛ рд╕реЗ рднрд░ рджреЗрддреЗ рд╣реИрдВ (рдХрд┐рд╕реА рднреА рд╕реНрдерд┐рддрд┐ рдореЗрдВ рдзреНрдпрд╛рди рди рджреЗрдВ mssql_init.py!)

рд╣рдо рдкрд┐рдЫрд▓реА рдмрд╛рд░ рдХреА рддреБрд▓рдирд╛ рдореЗрдВ рдереЛрдбрд╝реЗ рдЕрдзрд┐рдХ рдЬрдЯрд┐рд▓ рдХрдорд╛рдВрдб рдХреА рдорджрдж рд╕реЗ рд╕рднреА рдЕрдЪреНрдЫреЗ рд▓реЙрдиреНрдЪ рдХрд░рддреЗ рд╣реИрдВ:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

рд╣рдорд╛рд░реЗ рдЪрдорддреНрдХрд╛рд░рд┐рдХ рд░реИрдВрдбрдорд╛рдЗрдЬрд╝рд░ рдиреЗ рдЬреЛ рдЙрддреНрдкрдиреНрди рдХрд┐рдпрд╛, рдЖрдк рдЙрд╕ рдЖрдЗрдЯрдо рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ Data Profiling/Ad Hoc Query:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛
рдореБрдЦреНрдп рдмрд╛рдд рдпрд╣ рд╣реИ рдХрд┐ рдЗрд╕реЗ рд╡рд┐рд╢реНрд▓реЗрд╖рдХреЛрдВ рдХреЛ рди рджрд┐рдЦрд╛рдпрд╛ рдЬрд╛рдП

рдЗрд╕реЗ рд╡рд┐рд╕реНрддрд╛рд░ рд╕реЗ рд╕рдордЭрд╛рдЗрдП рдИрдЯреАрдПрд▓ рд╕рддреНрд░ рдореИрдВ рдирд╣реАрдВ рдХрд░реВрдВрдЧрд╛, рд╡рд╣рд╛рдВ рд╕рдм рдХреБрдЫ рддреБрдЪреНрдЫ рд╣реИ: рд╣рдо рдПрдХ рдЖрдзрд╛рд░ рдмрдирд╛рддреЗ рд╣реИрдВ, рдЙрд╕рдореЗрдВ рдПрдХ рд╕рдВрдХреЗрдд рд╣реЛрддрд╛ рд╣реИ, рд╣рдо рд╕рдм рдХреБрдЫ рдПрдХ рд╕рдВрджрд░реНрдн рдкреНрд░рдмрдВрдзрдХ рдХреЗ рд╕рд╛рде рд▓рдкреЗрдЯрддреЗ рд╣реИрдВ, рдФрд░ рдЕрдм рд╣рдо рдпрд╣ рдХрд░рддреЗ рд╣реИрдВ:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

рд╕рдордп рдЖ рдЧрдпрд╛ рд╣реИ рд╣рдорд╛рд░рд╛ рдбреЗрдЯрд╛ рдПрдХрддреНрд░ рдХрд░реЗрдВ рд╣рдорд╛рд░реА рдбреЗрдврд╝ рд╕реМ рдЯреЗрдмрд▓реЛрдВ рд╕реЗ. рдЖрдЗрдП рдЗрд╕реЗ рдмрд╣реБрдд рд╣реА рд╕рд░рд▓ рдкрдВрдХреНрддрд┐рдпреЛрдВ рдХреА рд╕рд╣рд╛рдпрддрд╛ рд╕реЗ рдХрд░реЗрдВ:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. рдПрдХ рд╣реБрдХ рдХреА рдорджрдж рд╕реЗ рд╣рдо рдПрдпрд░рдлреНрд▓реЛ рд╕реЗ рдкреНрд░рд╛рдкреНрдд рдХрд░рддреЗ рд╣реИрдВ pymssql-рдЬреЛрдбрд╝рдирд╛
  2. рдЖрдЗрдП рдЕрдиреБрд░реЛрдз рдореЗрдВ рджрд┐рдирд╛рдВрдХ рдХреЗ рд░реВрдк рдореЗрдВ рдПрдХ рдкреНрд░рддрд┐рдмрдВрдз рдХреЛ рдкреНрд░рддрд┐рд╕реНрдерд╛рдкрд┐рдд рдХрд░реЗрдВ - рдЗрд╕реЗ рдЯреЗрдореНрдкрд▓реЗрдЯ рдЗрдВрдЬрди рджреНрд╡рд╛рд░рд╛ рдлрд╝рдВрдХреНрд╢рди рдореЗрдВ рдбрд╛рд▓ рджрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛ред
  3. рд╣рдорд╛рд░реЗ рдЕрдиреБрд░реЛрдз рдХреЛ рдЦрд┐рд▓рд╛рдирд╛ pandasрд╣рдореЗрдВ рдХреМрди рдорд┐рд▓реЗрдЧрд╛ DataFrame - рдпрд╣ рднрд╡рд┐рд╖реНрдп рдореЗрдВ рд╣рдорд╛рд░реЗ рдХрд╛рдо рдЖрдПрдЧрд╛ред

рдореИрдВ рдкреНрд░рддрд┐рд╕реНрдерд╛рдкрди рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░ рд░рд╣рд╛ рд╣реВрдБ {dt} рдЕрдиреБрд░реЛрдз рдкреИрд░рд╛рдореАрдЯрд░ рдХреЗ рдмрдЬрд╛рдп %s рдЗрд╕рд▓рд┐рдП рдирд╣реАрдВ рдХрд┐ рдореИрдВ рдПрдХ рджреБрд╖реНрдЯ рдкрд┐рдиреЛрдЪреНрдЪрд┐рдпреЛ рд╣реВрдВ, рдмрд▓реНрдХрд┐ рдЗрд╕рд▓рд┐рдП рдХрд┐ pandas рд╕рд╛рдордирд╛ рдирд╣реАрдВ рдХрд░ рд╕рдХрддрд╛ pymssql рдФрд░ рдЖрдЦрд┐рд░реА рдХреЛ рдЦрд┐рд╕рдХрд╛ рджреЗрддрд╛ рд╣реИ params: Listрд╣рд╛рд▓рд╛рдБрдХрд┐ рд╡рд╣ рд╡рд╛рд╕реНрддрд╡ рдореЗрдВ рдЪрд╛рд╣рддрд╛ рд╣реИ tuple.
рдпрд╣ рднреА рдзреНрдпрд╛рди рджреЗрдВ рдХрд┐ рдбреЗрд╡рд▓рдкрд░ pymssql рдЕрдм рдЙрд╕рдХрд╛ рд╕рдорд░реНрдерди рди рдХрд░рдиреЗ рдХрд╛ рдирд┐рд░реНрдгрдп рд▓рд┐рдпрд╛ рдФрд░ рдЕрдм рдмрд╛рд╣рд░ рдирд┐рдХрд▓рдиреЗ рдХрд╛ рд╕рдордп рдЖ рдЧрдпрд╛ рд╣реИ pyodbc.

рдЖрдЗрдП рджреЗрдЦреЗрдВ рдХрд┐ рдПрдпрд░рдлреНрд▓реЛ рдиреЗ рд╣рдорд╛рд░реЗ рдХрд╛рд░реНрдпреЛрдВ рдХреЗ рддрд░реНрдХреЛрдВ рдХреЛ рдХреНрдпрд╛ рднрд░ рджрд┐рдпрд╛:

Apache Airflow: ETL рдХреЛ рдЖрд╕рд╛рди рдмрдирд╛рдирд╛

рдпрджрд┐ рдХреЛрдИ рдбреЗрдЯрд╛ рдирд╣реАрдВ рд╣реИ, рддреЛ рдЬрд╛рд░реА рд░рдЦрдиреЗ рдХрд╛ рдХреЛрдИ рдорддрд▓рдм рдирд╣реАрдВ рд╣реИред рд▓реЗрдХрд┐рди рдлрд┐рд▓рд┐рдВрдЧ рдХреЛ рд╕рдлрд▓ рдорд╛рдирдирд╛ тАЛтАЛрднреА рдЕрдЬреАрдм рд╣реИ. рд▓реЗрдХрд┐рди рдпреЗ рдХреЛрдИ рдЧрд▓рддреА рдирд╣реАрдВ рд╣реИ. рдП-рдЖрд╣-рдЖрд╣, рдХреНрдпрд╛ рдХрд░реЗрдВ?! рдФрд░ рдпрд╣рд╛рдБ рдХреНрдпрд╛ рд╣реИ:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException рдПрдпрд░рдлрд╝реНрд▓реЛ рдХреЛ рдмрддрд╛рддрд╛ рд╣реИ рдХрд┐ рдХреЛрдИ рддреНрд░реБрдЯрд┐ рдирд╣реАрдВ рд╣реИ, рд▓реЗрдХрд┐рди рд╣рдо рдХрд╛рд░реНрдп рдЫреЛрдбрд╝ рджреЗрддреЗ рд╣реИрдВред рдЗрдВрдЯрд░рдлрд╝реЗрд╕ рдореЗрдВ рд╣рд░рд╛ рдпрд╛ рд▓рд╛рд▓ рд╡рд░реНрдЧ рдирд╣реАрдВ, рдмрд▓реНрдХрд┐ рдЧреБрд▓рд╛рдмреА рд░рдВрдЧ рд╣реЛрдЧрд╛ред

рдЖрдЗрдП рдЕрдкрдирд╛ рдбреЗрдЯрд╛ рдЯреЙрд╕ рдХрд░реЗрдВ рдПрдХрд╛рдзрд┐рдХ рд╕реНрддрдВрдн:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

рдЕрд░реНрдерд╛рддреН

  • рдЬрд┐рд╕ рдбреЗрдЯрд╛рдмреЗрд╕ рд╕реЗ рд╣рдордиреЗ рдСрд░реНрдбрд░ рд▓рд┐рдпрд╛,
  • рд╣рдорд╛рд░реЗ рдмрд╛рдврд╝ рд╕рддреНрд░ рдХреА рдЖрдИрдбреА (рдпрд╣ рдЕрд▓рдЧ рд╣реЛрдЧреА) рд╣рд░ рдХрд╛рд░реНрдп рдХреЗ рд▓рд┐рдП),
  • рд╕реНрд░реЛрдд рдФрд░ рдСрд░реНрдбрд░ рдЖрдИрдбреА рд╕реЗ рдПрдХ рд╣реИрд╢ - рддрд╛рдХрд┐ рдЕрдВрддрд┐рдо рдбреЗрдЯрд╛рдмреЗрд╕ рдореЗрдВ (рдЬрд╣рд╛рдВ рд╕рдм рдХреБрдЫ рдПрдХ рддрд╛рд▓рд┐рдХрд╛ рдореЗрдВ рдбрд╛рд▓рд╛ рдЬрд╛рддрд╛ рд╣реИ) рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдПрдХ рдЕрджреНрд╡рд┐рддреАрдп рдСрд░реНрдбрд░ рдЖрдИрдбреА рд╣реЛред

рдЕрдВрддрд┐рдо рдЪрд░рдг рд╢реЗрд╖ рд╣реИ: рд╕рдм рдХреБрдЫ рд╡рд░реНрдЯрд┐рдХрд╛ рдореЗрдВ рдбрд╛рд▓реЗрдВред рдФрд░, рдЕрдЬреАрдм рддрд░рд╣ рд╕реЗ, рдРрд╕рд╛ рдХрд░рдиреЗ рдХрд╛ рд╕рдмрд╕реЗ рд╢рд╛рдирджрд╛рд░ рдФрд░ рдХреБрд╢рд▓ рддрд░реАрдХреЛрдВ рдореЗрдВ рд╕реЗ рдПрдХ рд╕реАрдПрд╕рд╡реА рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рд╣реИ!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. рд╣рдо рдПрдХ рд╡рд┐рд╢реЗрд╖ рд░рд┐рд╕реАрд╡рд░ рдмрдирд╛ рд░рд╣реЗ рд╣реИрдВ StringIO.
  2. pandas рдХреГрдкрдпрд╛ рдЕрдкрдирд╛ рдбрд╛рд▓ рджреЗрдВрдЧреЗ DataFrame рдЬреИрд╕рд╛ CSV-рд▓рд╛рдЗрдиреЗрдВ.
  3. рдЖрдЗрдП рдПрдХ рд╣реБрдХ рдХреЗ рд╕рд╛рде рд╣рдорд╛рд░реЗ рдкрд╕рдВрджреАрджрд╛ рд╡рд░реНрдЯрд┐рдХрд╛ рд╕реЗ рдПрдХ рдХрдиреЗрдХреНрд╢рди рдЦреЛрд▓реЗрдВред
  4. рдФрд░ рдЕрдм рдорджрдж рд╕реЗ copy() рд╣рдорд╛рд░рд╛ рдбреЗрдЯрд╛ рд╕реАрдзреЗ рд╡рд░реНрдЯрд┐рдХрд╛ рдХреЛ рднреЗрдЬреЗрдВ!

рд╣рдо рдбреНрд░рд╛рдЗрд╡рд░ рд╕реЗ рд▓реЗрдВрдЧреЗ рдХрд┐ рдХрд┐рддрдиреА рд▓рд╛рдЗрдиреЗрдВ рднрд░реА рдЧрдИрдВ, рдФрд░ рд╕рддреНрд░ рдкреНрд░рдмрдВрдзрдХ рдХреЛ рдмрддрд╛рдПрдВрдЧреЗ рдХрд┐ рд╕рдм рдХреБрдЫ рдареАрдХ рд╣реИ:

session.loaded_rows = cursor.rowcount
session.successful = True

рдмрд╕ рдЗрддрдирд╛ рд╣реАред

рдмрд┐рдХреНрд░реА рдкрд░, рд╣рдо рдореИрдиреНрдпреБрдЕрд▓ рд░реВрдк рд╕реЗ рд▓рдХреНрд╖реНрдп рдкреНрд▓реЗрдЯ рдмрдирд╛рддреЗ рд╣реИрдВред рдпрд╣рд╛рдВ рдореИрдВрдиреЗ рдЕрдкрдиреЗ рд▓рд┐рдП рдПрдХ рдЫреЛрдЯреА рдорд╢реАрди рдХреА рдЕрдиреБрдорддрд┐ рджреА:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

рдореИрдВ рдЙрдкрдпреЛрдЧ рдХрд░ рд░рд╣рд╛ рд╣реВрдБ VerticaOperator() рдореИрдВ рдПрдХ рдбреЗрдЯрд╛рдмреЗрд╕ рд╕реНрдХреАрдорд╛ рдФрд░ рдПрдХ рддрд╛рд▓рд┐рдХрд╛ рдмрдирд╛рддрд╛ рд╣реВрдВ (рдпрджрд┐ рд╡реЗ рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдк рд╕реЗ рдкрд╣рд▓реЗ рд╕реЗ рдореМрдЬреВрдж рдирд╣реАрдВ рд╣реИрдВ)ред рдореБрдЦреНрдп рдмрд╛рдд рдирд┐рд░реНрднрд░рддрд╛рдУрдВ рдХреЛ рд╕рд╣реА рдврдВрдЧ рд╕реЗ рд╡реНрдпрд╡рд╕реНрдерд┐рдд рдХрд░рдирд╛ рд╣реИ:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

рдЙрдкрд╕рдВрд╣рд╛рд░

- рдЕрдЪреНрдЫрд╛, - рдЫреЛрдЯреЗ рдЪреВрд╣реЗ рдиреЗ рдХрд╣рд╛, - рд╣реИ рдирд╛, рдЕрднреА
рдХреНрдпрд╛ рддреБрдореНрд╣реЗрдВ рдпрдХреАрди рд╣реИ рдХрд┐ рдореИрдВ рдЬрдВрдЧрд▓ рдХрд╛ рд╕рдмрд╕реЗ рднрдпрд╛рдирдХ рдЬрд╛рдирд╡рд░ рд╣реВрдВ?

рдЬреВрд▓рд┐рдпрд╛ рдбреЛрдирд╛рд▓реНрдбрд╕рди, рдж рдЧреНрд░реБрдлрд╝рд╛рд▓реЛ

рдореБрдЭреЗ рд▓рдЧрддрд╛ рд╣реИ рдХрд┐ рдЕрдЧрд░ рдореЗрд░реЗ рд╕рд╣рдпреЛрдЧрд┐рдпреЛрдВ рдФрд░ рдореЗрд░реЗ рдмреАрдЪ рдПрдХ рдкреНрд░рддрд┐рдпреЛрдЧрд┐рддрд╛ рд╣реЛрддреА: рдХреМрди рдЬрд▓реНрджреА рд╕реЗ рдИрдЯреАрдПрд▓ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХреЛ рд╢реБрд░реВ рд╕реЗ рдмрдирд╛рдПрдЧрд╛ рдФрд░ рд▓реЙрдиреНрдЪ рдХрд░реЗрдЧрд╛: рд╡реЗ рдЕрдкрдиреЗ рдПрд╕рдПрд╕рдЖрдИрдПрд╕ рдФрд░ рдПрдХ рдорд╛рдЙрд╕ рдХреЗ рд╕рд╛рде рдФрд░ рдореИрдВ рдПрдпрд░рдлреНрд▓реЛ рдХреЗ рд╕рд╛рде ... рдФрд░ рдлрд┐рд░ рд╣рдо рд░рдЦрд░рдЦрд╛рд╡ рдореЗрдВ рдЖрд╕рд╛рдиреА рдХреА рднреА рддреБрд▓рдирд╛ рдХрд░реЗрдВрдЧреЗ ... рд╡рд╛рд╣, рдореБрдЭреЗ рд▓рдЧрддрд╛ рд╣реИ рдЖрдк рдЗрд╕ рдмрд╛рдд рд╕реЗ рд╕рд╣рдордд рд╣реЛрдВрдЧреЗ рдХрд┐ рдореИрдВ рдЙрдиреНрд╣реЗрдВ рд╕рднреА рдореЛрд░реНрдЪреЛрдВ рдкрд░ рд╣рд░рд╛ рджреВрдВрдЧрд╛!

рдпрджрд┐ рдереЛрдбрд╝рд╛ рдФрд░ рдЧрдВрднреАрд░рддрд╛ рд╕реЗ рд▓рд┐рдпрд╛ рдЬрд╛рдП, рддреЛ рдЕрдкрд╛рдЪреЗ рдПрдпрд░рдлреНрд▓реЛ - рдкреНрд░реЛрдЧреНрд░рд╛рдо рдХреЛрдб рдХреЗ рд░реВрдк рдореЗрдВ рдкреНрд░рдХреНрд░рд┐рдпрд╛рдУрдВ рдХрд╛ рд╡рд░реНрдгрди рдХрд░рдХреЗ - рдиреЗ рдореЗрд░рд╛ рдХрд╛рдо рдХрд┐рдпрд╛ рдЕрдзрд┐рдХ рдЕрдзрд┐рдХ рдЖрд░рд╛рдорджрд╛рдпрдХ рдФрд░ рдЖрдирдВрджрджрд╛рдпрдХ.

рдЗрд╕рдХреА рдЕрд╕реАрдорд┐рдд рд╡рд┐рд╕реНрддрд╛рд░рд╢реАрд▓рддрд╛, рдкреНрд▓рдЧ-рдЗрди рдФрд░ рд╕реНрдХреЗрд▓реЗрдмрд┐рд▓рд┐рдЯреА рдХреА рдкреНрд░рд╡реГрддреНрддрд┐ рджреЛрдиреЛрдВ рдХреЗ рд╕рдВрджрд░реНрдн рдореЗрдВ, рдЖрдкрдХреЛ рд▓рдЧрднрдЧ рдХрд┐рд╕реА рднреА рдХреНрд╖реЗрддреНрд░ рдореЗрдВ рдПрдпрд░рдлреНрд▓реЛ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдиреЗ рдХрд╛ рдЕрд╡рд╕рд░ рджреЗрддреА рд╣реИ: рдпрд╣рд╛рдВ рддрдХ тАЛтАЛтАЛтАЛрдХрд┐ рдбреЗрдЯрд╛ рдПрдХрддреНрд░ рдХрд░рдиреЗ, рддреИрдпрд╛рд░ рдХрд░рдиреЗ рдФрд░ рд╕рдВрд╕рд╛рдзрд┐рдд рдХрд░рдиреЗ рдХреЗ рдкреВрд░реНрдг рдЪрдХреНрд░ рдореЗрдВ, рдпрд╣рд╛рдВ рддрдХ тАЛтАЛтАЛтАЛрдХрд┐ рд░реЙрдХреЗрдЯ (рдордВрдЧрд▓ рдЧреНрд░рд╣ рдкрд░) рд▓реЙрдиреНрдЪ рдХрд░рдиреЗ рдореЗрдВ рднреА рдЕрд╡рдзрд┐)ред

рднрд╛рдЧ рдЕрдВрддрд┐рдо, рд╕рдВрджрд░реНрдн рдПрд╡рдВ рдЬрд╛рдирдХрд╛рд░реА

рд░реЗрдХ рд╣рдордиреЗ рдЖрдкрдХреЗ рд▓рд┐рдП рдПрдХрддреНрд░ рдХрд┐рдпрд╛ рд╣реИ

  • start_date. рд╣рд╛рдБ, рдпрд╣ рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рдПрдХ рд╕реНрдерд╛рдиреАрдп рдореЗрдо рд╣реИред рдбреМрдЧ рдХреЗ рдореБрдЦреНрдп рддрд░реНрдХ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ start_date рд╕рднреА рдкрд╛рд╕ред рд╕рдВрдХреНрд╖реЗрдк рдореЗрдВ, рдпрджрд┐ рдЖрдк рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░реЗрдВ start_date рд╡рд░реНрддрдорд╛рди рддрд┐рдерд┐, рдФрд░ schedule_interval - рдПрдХ рджрд┐рди, рдлрд┐рд░ рдбреАрдПрдЬреА рдХрд▓ рд╕реЗ рдкрд╣рд▓реЗ рд╢реБрд░реВ рдирд╣реАрдВ рд╣реЛрдЧрд╛ред
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    рдФрд░ рдХреЛрдИ рд╕рдорд╕реНрдпрд╛ рдирд╣реАрдВ.

    рдЗрд╕рдХреЗ рд╕рд╛рде рдПрдХ рдФрд░ рд░рдирдЯрд╛рдЗрдо рддреНрд░реБрдЯрд┐ рдЬреБрдбрд╝реА рд╣реБрдИ рд╣реИ: Task is missing the start_date parameter, рдЬреЛ рдЕрдХреНрд╕рд░ рдЗрдВрдЧрд┐рдд рдХрд░рддрд╛ рд╣реИ рдХрд┐ рдЖрдк рдбреИрдЧ рдСрдкрд░реЗрдЯрд░ рд╕реЗ рдЬреБрдбрд╝рдирд╛ рднреВрд▓ рдЧрдП рд╣реИрдВред

  • рд╕рднреА рдПрдХ рдорд╢реАрди рдкрд░. рд╣рд╛рдВ, рдФрд░ рдЖрдзрд╛рд░ (рдПрдпрд░рдлреНрд▓реЛ рд╕реНрд╡рдпрдВ рдФрд░ рд╣рдорд╛рд░реА рдХреЛрдЯрд┐рдВрдЧ), рдФрд░ рдПрдХ рд╡реЗрдм рд╕рд░реНрд╡рд░, рдФрд░ рдПрдХ рд╢реЗрдбреНрдпреВрд▓рд░, рдФрд░ рдХрд╛рд░реНрдпрдХрд░реНрддрд╛ред рдФрд░ рдпрд╣ рдХрд╛рдо рднреА рдХрд░ рдЧрдпрд╛. рд▓реЗрдХрд┐рди рд╕рдордп рдХреЗ рд╕рд╛рде, рд╕реЗрд╡рд╛рдУрдВ рдХреЗ рд▓рд┐рдП рдХрд╛рд░реНрдпреЛрдВ рдХреА рд╕рдВрдЦреНрдпрд╛ рдореЗрдВ рд╡реГрджреНрдзрд┐ рд╣реБрдИ, рдФрд░ рдЬрдм PostgreSQL рдиреЗ 20 рдПрдордПрд╕ рдХреЗ рдмрдЬрд╛рдп 5 рдПрд╕ рдореЗрдВ рд╕реВрдЪрдХрд╛рдВрдХ рдкрд░ рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛ рджреЗрдирд╛ рд╢реБрд░реВ рдХрд┐рдпрд╛, рддреЛ рд╣рдордиреЗ рдЗрд╕реЗ рд▓реЗ рд▓рд┐рдпрд╛ рдФрд░ рдЗрд╕реЗ рджреВрд░ рд▓реЗ рдЧрдПред
  • рд╕реНрдерд╛рдиреАрдп рдирд┐рд╖реНрдкрд╛рджрдХ. рд╣рд╛рдБ, рд╣рдо рдЕрднреА рднреА рдЙрд╕ рдкрд░ рдмреИрдареЗ рд╣реИрдВ, рдФрд░ рд╣рдо рдкрд╣рд▓реЗ рд╣реА рд░рд╕рд╛рддрд▓ рдХреЗ рдХрд┐рдирд╛рд░реЗ рдкрд░ рдЖ рдЪреБрдХреЗ рд╣реИрдВред LocalExecutor рдЕрдм рддрдХ рд╣рдорд╛рд░реЗ рд▓рд┐рдП рдкрд░реНрдпрд╛рдкреНрдд рд░рд╣рд╛ рд╣реИ, рд▓реЗрдХрд┐рди рдЕрдм рдХрдо рд╕реЗ рдХрдо рдПрдХ рдХрд╛рд░реНрдпрдХрд░реНрддрд╛ рдХреЗ рд╕рд╛рде рд╡рд┐рд╕реНрддрд╛рд░ рдХрд░рдиреЗ рдХрд╛ рд╕рдордп рдЖ рдЧрдпрд╛ рд╣реИ, рдФрд░ рд╣рдореЗрдВ CeleryExecutor рдореЗрдВ рдЬрд╛рдиреЗ рдХреЗ рд▓рд┐рдП рдХрдбрд╝реА рдореЗрд╣рдирдд рдХрд░рдиреА рд╣реЛрдЧреАред рдФрд░ рдЗрд╕ рддрдереНрдп рдХреЛ рдзреНрдпрд╛рди рдореЗрдВ рд░рдЦрддреЗ рд╣реБрдП рдХрд┐ рдЖрдк рдЗрд╕рдХреЗ рд╕рд╛рде рдПрдХ рдорд╢реАрди рдкрд░ рдХрд╛рдо рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ, рдЖрдкрдХреЛ рд╕рд░реНрд╡рд░ рдкрд░ рднреА рд╕реЗрд▓реЗрд░реА рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдиреЗ рд╕реЗ рдХреЛрдИ рдирд╣реАрдВ рд░реЛрдХрддрд╛ рд╣реИ, рдЬреЛ "рдмреЗрд╢рдХ, рдИрдорд╛рдирджрд╛рд░реА рд╕реЗ рдХрднреА рднреА рдЙрддреНрдкрд╛рджрди рдореЗрдВ рдирд╣реАрдВ рдЬрд╛рдПрдЧрд╛!"
  • рдЕрдиреБрдкрдпреЛрдЧреА рдЕрдВрддрд░реНрдирд┐рд░реНрдорд┐рдд рдЙрдкрдХрд░рдг:
    • рдХрдиреЗрдХреНрд╢рди рд╕реЗрд╡рд╛ рдХреНрд░реЗрдбреЗрдВрд╢рд┐рдпрд▓ рд╕рдВрдЧреНрд░рд╣реАрдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП,
    • рдПрд╕рдПрд▓рдП рдЪреВрдХ рдЧрдпрд╛ рдЙрди рдХрд╛рд░реНрдпреЛрдВ рдХрд╛ рдЙрддреНрддрд░ рджреЗрдирд╛ рдЬреЛ рд╕рдордп рдкрд░ рдкреВрд░реЗ рдирд╣реАрдВ рд╣реБрдП,
    • рдПрдХреНрд╕рдХреЙрдо рдореЗрдЯрд╛рдбреЗрдЯрд╛ рд╡рд┐рдирд┐рдордп рдХреЗ рд▓рд┐рдП (рдореИрдВрдиреЗ рдХрд╣рд╛ рдореЗрдЯрд╛рдбреЗрдЯрд╛!) рдбреЗрдЧ рдХрд╛рд░реНрдпреЛрдВ рдХреЗ рдмреАрдЪред
  • рдореЗрд▓ рджреБрд░реБрдкрдпреЛрдЧ. рдЦреИрд░, рдореЗрд░реА рдУрд░ рд╕реЗ рдХреНрдпрд╛ рдХрд╣рд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ? рдЧрд┐рд░реЗ рд╣реБрдП рдХрд╛рд░реНрдпреЛрдВ рдХреА рд╕рднреА рдкреБрдирд░рд╛рд╡реГрддреНрддрд┐рдпреЛрдВ рдХреЗ рд▓рд┐рдП рдЕрд▓рд░реНрдЯ рд╕реНрдерд╛рдкрд┐рдд рдХрд┐рдП рдЧрдП рдереЗред рдЕрдм рдореЗрд░реЗ рдХрд╛рдо рдЬреАрдореЗрд▓ рдореЗрдВ рдПрдпрд░рдлреНрд▓реЛ рд╕реЗ 90 рд╣рдЬрд╛рд░ рдИрдореЗрд▓ рд╣реИрдВ, рдФрд░ рд╡реЗрдм рдореЗрд▓ рдереВрдерди рдПрдХ рд╕рдордп рдореЗрдВ 100 рд╕реЗ рдЕрдзрд┐рдХ рдХреЛ рд▓реЗрдиреЗ рдФрд░ рд╣рдЯрд╛рдиреЗ рд╕реЗ рдЗрдирдХрд╛рд░ рдХрд░рддрд╛ рд╣реИред

рдЕрдзрд┐рдХ рдиреБрдХрд╕рд╛рди: рдЕрдкрд╛рдЪреЗ рдПрдпрд░рдлреНрд▓реЛ рдкрд┐рдЯрдлреЗрд▓

рдЕрдзрд┐рдХ рд╕реНрд╡рдЪрд╛рд▓рди рдЙрдкрдХрд░рдг

рд╣рдо рдЕрдкрдиреЗ рд╣рд╛рдереЛрдВ рд╕реЗ рдирд╣реАрдВ рдмрд▓реНрдХрд┐ рдЕрдкрдиреЗ рд╕рд┐рд░реЛрдВ рд╕реЗ рдФрд░ рднреА рдЕрдзрд┐рдХ рдХрд╛рдо рдХрд░ рд╕рдХреЗрдВ, рдЗрд╕рдХреЗ рд▓рд┐рдП рдПрдпрд░рдлреНрд▓реЛ рдиреЗ рд╣рдорд╛рд░реЗ рд▓рд┐рдП рдпрд╣ рддреИрдпрд╛рд░реА рдХреА рд╣реИ:

  • рдмрд╛рдХреА рдПрдкреАрдЖрдИ - рдЙрдирдХреЗ рдкрд╛рд╕ рдЕрднреА рднреА рдПрдХреНрд╕рдкреЗрд░рд┐рдореЗрдВрдЯрд▓ рдХрд╛ рджрд░реНрдЬрд╛ рд╣реИ, рдЬреЛ рдЙрдиреНрд╣реЗрдВ рдХрд╛рдо рдХрд░рдиреЗ рд╕реЗ рдирд╣реАрдВ рд░реЛрдХрддрд╛ рд╣реИред рдЗрд╕рдХреЗ рд╕рд╛рде, рдЖрдк рди рдХреЗрд╡рд▓ рдбреИрдЧ рдФрд░ рдХрд╛рд░реНрдпреЛрдВ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдЬрд╛рдирдХрд╛рд░реА рдкреНрд░рд╛рдкреНрдд рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ, рдмрд▓реНрдХрд┐ рдПрдХ рдбреИрдЧ рдХреЛ рд░реЛрдХ/рд╢реБрд░реВ рднреА рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ, рдПрдХ рдбреАрдПрдЬреА рд░рди рдпрд╛ рдПрдХ рдкреВрд▓ рдмрдирд╛ рд╕рдХрддреЗ рд╣реИрдВред
  • рд╕реАрдПрд▓рдЖрдИ - рдХрдИ рдЙрдкрдХрд░рдг рдХрдорд╛рдВрдб рд▓рд╛рдЗрди рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдЙрдкрд▓рдмреНрдз рд╣реИрдВ рдЬреЛ рд╡реЗрдмрдпреВрдЖрдИ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдЙрдкрдпреЛрдЧ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рди рдХреЗрд╡рд▓ рдЕрд╕реБрд╡рд┐рдзрд╛рдЬрдирдХ рд╣реИрдВ, рдмрд▓реНрдХрд┐ рдЖрдо рддреМрд░ рдкрд░ рдЕрдиреБрдкрд╕реНрдерд┐рдд рд╣реИрдВред рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП:
    • backfill рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгреЛрдВ рдХреЛ рдкреБрдирдГ рдЖрд░рдВрдн рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред
      рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рд╡рд┐рд╢реНрд▓реЗрд╖рдХреЛрдВ рдиреЗ рдЖрдХрд░ рдХрд╣рд╛: тАЬрдФрд░ рдЖрдк, рдХреЙрдорд░реЗрдб, 1 рд╕реЗ 13 рдЬрдирд╡рд░реА рддрдХ рдХреЗ рдЖрдВрдХрдбрд╝реЛрдВ рдореЗрдВ рдмрдХрд╡рд╛рд╕ рд╣реИ! рдЗрд╕реЗ рдареАрдХ рдХрд░реЛ, рдЗрд╕реЗ рдареАрдХ рдХрд░реЛ, рдЗрд╕реЗ рдареАрдХ рдХрд░реЛ, рдЗрд╕реЗ рдареАрдХ рдХрд░реЛ!" рдФрд░ рдЖрдк рдРрд╕реЗ рд╢реМрдХреАрди рд╣реИрдВ:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • рдЖрдзрд╛рд░ рд╕реЗрд╡рд╛: initdb, resetdb, upgradedb, checkdb.
    • run, рдЬреЛ рдЖрдкрдХреЛ рдПрдХ рдЗрдВрд╕реНрдЯреЗрдВрд╕ рдХрд╛рд░реНрдп рдЪрд▓рд╛рдиреЗ рдФрд░ рдпрд╣рд╛рдВ рддрдХ тАЛтАЛрдХрд┐ рд╕рднреА рдирд┐рд░реНрднрд░рддрд╛рдУрдВ рдкрд░ рд╕реНрдХреЛрд░ рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддрд╛ рд╣реИред рдЗрд╕рдХреЗ рдЕрд▓рд╛рд╡рд╛, рдЖрдк рдЗрд╕реЗ рдЗрд╕рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдЪрд▓рд╛ рд╕рдХрддреЗ рд╣реИрдВ LocalExecutor, рднрд▓реЗ рд╣реА рдЖрдкрдХреЗ рдкрд╛рд╕ рдЕрдЬрд╡рд╛рдЗрди рдХреНрд▓рд╕реНрдЯрд░ рд╣реЛред
    • рдХрд╛рдлреА рд╣рдж рддрдХ рд╡рд╣реА рдХрд╛рдо рдХрд░рддрд╛ рд╣реИ test, рдХреЗрд╡рд▓ рдЖрдзрд╛рд░реЛрдВ рдореЗрдВ рднреА рдХреБрдЫ рдирд╣реАрдВ рд▓рд┐рдЦрддрд╛ред
    • connections рд╢реЗрд▓ рд╕реЗ рдмрдбрд╝реЗ рдкреИрдорд╛рдиреЗ рдкрд░ рдХрдиреЗрдХреНрд╢рди рдмрдирд╛рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддрд╛ рд╣реИред
  • рдкрд╛рдпрдерди рдПрдкреАрдЖрдИ - рдЗрдВрдЯрд░реИрдХреНрдЯ рдХрд░рдиреЗ рдХрд╛ рдПрдХ рдХрдЯреНрдЯрд░ рддрд░реАрдХрд╛, рдЬреЛ рдкреНрд▓рдЧрдЗрдиреНрд╕ рдХреЗ рд▓рд┐рдП рд╣реИ, рди рдХрд┐ рдЗрд╕рдореЗрдВ рдЫреЛрдЯреЗ рд╣рд╛рдереЛрдВ рд╕реЗ рдЭреБрдВрдб рдмрдирд╛рдиреЗ рдХреЗ рд▓рд┐рдПред рд▓реЗрдХрд┐рди рд╣рдореЗрдВ рдЬрд╛рдиреЗ рд╕реЗ рд░реЛрдХрдиреЗ рд╡рд╛рд▓рд╛ рдХреМрди рд╣реИ /home/airflow/dags, рджреМрдбрд╝рдирд╛ ipython рдФрд░ рдЧрдбрд╝рдмрдбрд╝ рдХрд░рдирд╛ рд╢реБрд░реВ рдХрд░ рджреЛ? рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдЖрдк рдирд┐рдореНрдирд▓рд┐рдЦрд┐рдд рдХреЛрдб рдХреЗ рд╕рд╛рде рд╕рднреА рдХрдиреЗрдХреНрд╢рди рдирд┐рд░реНрдпрд╛рдд рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • рдПрдпрд░рдлреНрд▓реЛ рдореЗрдЯрд╛рдбреЗрдЯрд╛рдмреЗрд╕ рд╕реЗ рдХрдиреЗрдХреНрдЯ рд╣реЛ рд░рд╣рд╛ рд╣реИред рдореИрдВ рдЗрд╕реЗ рд▓рд┐рдЦрдиреЗ рдХреА рдЕрдиреБрд╢рдВрд╕рд╛ рдирд╣реАрдВ рдХрд░рддрд╛, рд▓реЗрдХрд┐рди рд╡рд┐рднрд┐рдиреНрди рд╡рд┐рд╢рд┐рд╖реНрдЯ рдореЗрдЯреНрд░рд┐рдХреНрд╕ рдХреЗ рд▓рд┐рдП рдХрд╛рд░реНрдп рд╕реНрдерд┐рддрд┐ рдкреНрд░рд╛рдкреНрдд рдХрд░рдирд╛ рдХрд┐рд╕реА рднреА рдПрдкреАрдЖрдИ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдиреЗ рдХреА рддреБрд▓рдирд╛ рдореЗрдВ рдмрд╣реБрдд рддреЗрдЬрд╝ рдФрд░ рдЖрд╕рд╛рди рд╣реЛ рд╕рдХрддрд╛ рд╣реИред

    рдорд╛рди рд▓реАрдЬрд┐рдП рдХрд┐ рд╣рдорд╛рд░реЗ рд╕рднреА рдХрд╛рд░реНрдп рдирд┐рд░рд░реНрдердХ рдирд╣реАрдВ рд╣реИрдВ, рд▓реЗрдХрд┐рди рдХрднреА-рдХрднреА рдЙрдирдореЗрдВ рдЧрд┐рд░рд╛рд╡рдЯ рдЖ рд╕рдХрддреА рд╣реИ, рдФрд░ рдпрд╣ рд╕рд╛рдорд╛рдиреНрдп рд╣реИред рд▓реЗрдХрд┐рди рдХреБрдЫ рд░реБрдХрд╛рд╡рдЯреЗрдВ рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рд╕рдВрджрд┐рдЧреНрдз рд╣реИрдВ, рдФрд░ рдЙрдирдХреА рдЬрд╛рдБрдЪ рдХрд░рдирд╛ рдЖрд╡рд╢реНрдпрдХ рд╣реЛрдЧрд╛ред

    рдПрд╕рдХреНрдпреВрдПрд▓ рд╕реЗ рд╕рд╛рд╡рдзрд╛рди рд░рд╣реЗрдВ!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

рд╕рдВрджрд░реНрднреЛрдВ

рдФрд░ рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдк рд╕реЗ, Google рдХреЗ рдЬрд╛рд░реА рд╣реЛрдиреЗ рдХреЗ рдкрд╣рд▓реЗ рджрд╕ рд▓рд┐рдВрдХ рдореЗрд░реЗ рдмреБрдХрдорд╛рд░реНрдХ рд╕реЗ рдПрдпрд░рдлреНрд▓реЛ рдлрд╝реЛрд▓реНрдбрд░ рдХреА рд╕рд╛рдордЧреНрд░реА рд╣реИрдВред

рдФрд░ рд▓реЗрдЦ рдореЗрдВ рдкреНрд░рдпреБрдХреНрдд рд▓рд┐рдВрдХ:

рд╕реНрд░реЛрдд: www.habr.com