рд╣рд╛рдп, рдореА рджрд┐рдорд┐рддреНрд░реА рд▓реЙрдЧрд╡рд┐рдиреЗрдВрдХреЛ рдЖрд╣реЗ - рд╡реНрд╣реЗрдЭреЗрдЯ рдЧреНрд░реБрдк рдСрдл рдХрдВрдкрдиреНрдпрд╛рдВрдЪреНрдпрд╛ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рд╡рд┐рднрд╛рдЧрд╛рдЪрд╛ рдбреЗрдЯрд╛ рдЕрднрд┐рдпрдВрддрд╛.
рдореА рддреБрдореНрд╣рд╛рд▓рд╛ ETL рдкреНрд░рдХреНрд░рд┐рдпрд╛ рд╡рд┐рдХрд╕рд┐рдд рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдПрдХ рдЕрджреНрднреБрдд рд╕рд╛рдзрди рд╕рд╛рдВрдЧреЗрди - Apache Airflow. рдкрд░рдВрддреБ рдПрдЕрд░рдлреНрд▓реЛ рдЗрддрдХрд╛ рдЕрд╖реНрдЯрдкреИрд▓реВ рдЖрдгрд┐ рдмрд╣реБрдЖрдпрд╛рдореА рдЖрд╣реЗ рдХреА рддреБрдореНрд╣реА рдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣рд╛рдд рдЧреБрдВрддрд▓реЗрд▓реЗ рдирд╕рд▓реЗ рддрд░реАрд╣реА рддреБрдореНрд╣реА рддреНрдпрд╛рд╡рд░ рдмрд╛рд░рдХрд╛рдИрдиреЗ рд▓рдХреНрд╖ рджрд┐рд▓реЗ рдкрд╛рд╣рд┐рдЬреЗ, рдкрд░рдВрддреБ рд╡реЗрд│реЛрд╡реЗрд│реА рдХреЛрдгрддреАрд╣реА рдкреНрд░рдХреНрд░рд┐рдпрд╛ рд╕реБрд░реВ рдХрд░рдгреЗ рдЖрдгрд┐ рддреНрдпрд╛рдВрдЪреНрдпрд╛ рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреАрдЪреЗ рдирд┐рд░реАрдХреНрд╖рдг рдХрд░рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.
рдЖрдгрд┐ рд╣реЛрдп, рдореА рдлрдХреНрдд рд╕рд╛рдВрдЧрдгрд╛рд░ рдирд╛рд╣реА, рддрд░ рджрд░реНрд╢рд╡рд┐рддреЛ: рдкреНрд░реЛрдЧреНрд░рд╛рдордордзреНрдпреЗ рдмрд░реЗрдЪ рдХреЛрдб, рд╕реНрдХреНрд░реАрдирд╢реЙрдЯ рдЖрдгрд┐ рд╢рд┐рдлрд╛рд░рд╕реА рдЖрд╣реЗрдд.
рдПрдЕрд░рдлреНрд▓реЛ / рд╡рд┐рдХрд┐рдореАрдбрд┐рдпрд╛ рдХреЙрдордиреНрд╕ рд╣рд╛ рд╢рдмреНрдж рдЧреБрдЧрд▓ рдХрд░рддрд╛рдирд╛ рддреБрдореНрд╣реА рд╕рд╣рд╕рд╛ рдХрд╛рдп рдкрд╛рд╣рддрд╛
рд╕рд╛рдордЧреНрд░реА рд╕рд╛рд░рдгреА
рдкрд░рд┐рдЪрдп рдореБрдЦреНрдп рднрд╛рдЧ, рд╡реНрдпрд╛рд╡рд╣рд╛рд░рд┐рдХ (рдЖрдгрд┐ рдереЛрдбрд╛ рд╕реИрджреНрдзрд╛рдВрддрд┐рдХ) рдЖрдореНрд╣реА (рдЖрдгрд┐ рддреБрдореНрд╣реА) рдХрд╛? рдХреНрд▓рд╕реНрдЯрд░ рдПрдХрддреНрд░ рдХрд░рдгреЗ рдореВрд▓рднреВрдд рд╕рдВрдХрд▓реНрдкрдирд╛ рдЖрдореНрд╣реА рдХрд╛рд░реНрдпреЗ рд╡реНрдпреБрддреНрдкрдиреНрди рдХрд░рддреЛ рдлреНрд▓реЙрд╡рд░ рдмрджреНрджрд▓ рдереЛрдбреЗ рдЖрдореНрд╣реА рдЕрдВрдбрд░рд▓реЛрдбреЗрдб рд▓реЛрдб рдХрд░рддреЛ рдХрдиреЗрдХреНрд╢рди, рд╣реБрдХ рдЖрдгрд┐ рдЗрддрд░ рдЪрд▓ рд╕рд╛рдиреБрдХреВрд▓ рдСрдкрд░реЗрдЯрд░рдЪреЗ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдУрддрдгрд╛рд░ рдЖрд╣рд╛рдд рдХрд╛? рдЧреЛрд│рд╛ рдХрд░реАрдд рдЖрд╣реЗ
рднрд╛рдЧ рдЕрдВрддрд┐рдо, рд╕рдВрджрд░реНрдн рдЖрдгрд┐ рдорд╛рд╣рд┐рддреА рд╕рдВрджрд░реНрдн
рдкрд░рд┐рдЪрдп
рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛ рдЬреЕрдВрдЧреЛрдкреНрд░рдорд╛рдгреЗрдЪ рдЖрд╣реЗ:
- рдЕрдЬрдЧрд░рд╛рдд рд▓рд┐рд╣рд┐рд▓реЗрд▓реЗ
- рдПрдХ рдЙрддреНрддрдо рдЕреЕрдбрдорд┐рди рдкреЕрдирд▓ рдЖрд╣реЗ,
- рдЕрдирд┐рд╢реНрдЪрд┐рдд рдХрд╛рд│рд╛рд╕рд╛рдареА рд╡рд┐рд╕реНрддрд╛рд░рдгреНрдпрд╛рдпреЛрдЧреНрдп
- рдлрдХреНрдд рдЪрд╛рдВрдЧрд▓реЗ, рдЖрдгрд┐ рддреЗ рдкреВрд░реНрдгрдкрдгреЗ рднрд┐рдиреНрди рд╣реЗрддреВрдВрд╕рд╛рдареА рдмрдирд╡рд▓реЗ рдЧреЗрд▓реЗ рд╣реЛрддреЗ, рдореНрд╣рдгрдЬреЗ (рдЬрд╕реЗ рддреЗ рдХреЕрдЯрдЪреНрдпрд╛ рдЖрдзреА рд▓рд┐рд╣рд┐рд▓реЗрд▓реЗ рдЖрд╣реЗ):
- рдЕрдорд░реНрдпрд╛рджрд┐рдд рдорд╢реАрдирд╡рд░ рдХрд╛рд░реНрдпреЗ рдЪрд╛рд▓рд╡рдгреЗ рдЖрдгрд┐ рджреЗрдЦрд░реЗрдЦ рдХрд░рдгреЗ (рдмрд░реЗрдЪ рд╕реЗрд▓реЗрд░реА / рдХреБрдмрд░реНрдиреЗрдЯреНрд╕ рдЖрдгрд┐ рддреБрдордЪрд╛ рд╡рд┐рд╡реЗрдХ рддреБрдореНрд╣рд╛рд▓рд╛ рдЕрдиреБрдорддреА рджреЗрдИрд▓)
- рдбрд╛рдпрдиреЕрдорд┐рдХ рд╡рд░реНрдХрдлреНрд▓реЛ рдирд┐рд░реНрдорд┐рддреАрд╕рд╣ рдкрд╛рдпрдерди рдХреЛрдб рд▓рд┐рд╣рд┐рдгреНрдпрд╛рд╕ рдЖрдгрд┐ рд╕рдордЬрдгреНрдпрд╛рд╕ рдЕрдЧрджреА рд╕реЛрдкреЗ рдЖрд╣реЗ
- рдЖрдгрд┐ рддрдпрд╛рд░ рдХреЗрд▓реЗрд▓реЗ рдШрдЯрдХ рдЖрдгрд┐ рд╣реЛрдордореЗрдб рдкреНрд▓рдЧрдЗрди (рдЬреЗ рдЕрддреНрдпрдВрдд рд╕реЛрдкреЗ рдЖрд╣реЗ) рд╡рд╛рдкрд░реВрди рдХреЛрдгрддрд╛рд╣реА рдбреЗрдЯрд╛рдмреЗрд╕ рдЖрдгрд┐ API рдПрдХрдореЗрдХрд╛рдВрд╢реА рдХрдиреЗрдХреНрдЯ рдХрд░рдгреНрдпрд╛рдЪреА рдХреНрд╖рдорддрд╛.
рдЖрдореНрд╣реА Apache Airflow рдпрд╛рдкреНрд░рдорд╛рдгреЗ рд╡рд╛рдкрд░рддреЛ:
- рдЖрдореНрд╣реА рдбреАрдбрдмреНрд▓реНрдпреВрдПрдЪ рдЖрдгрд┐ рдУрдбреАрдПрд╕ (рдЖрдордЪреНрдпрд╛рдХрдбреЗ рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛ рдЖрдгрд┐ рдХреНрд▓рд┐рдХрд╣рд╛рдКрд╕ рдЖрд╣реЗрдд) рдордзреАрд▓ рд╡рд┐рд╡рд┐рдз рд╕реНрддреНрд░реЛрддрд╛рдВрдХрдбреВрди (рдЕрдиреЗрдХ SQL рд╕рд░реНрд╡реНрд╣рд░ рдЖрдгрд┐ рдкреЛрд╕реНрдЯрдЧреНрд░реЗрдПрд╕рдХреНрдпреВрдПрд▓ рдЙрджрд╛рд╣рд░рдгреЗ, рдЕрдиреБрдкреНрд░рдпреЛрдЧ рдореЗрдЯреНрд░рд┐рдХрд╕рд╣ рд╡рд┐рд╡рд┐рдз API, рдЕрдЧрджреА 1C) рдбреЗрдЯрд╛ рдЧреЛрд│рд╛ рдХрд░рддреЛ.
- рдХрд┐рддреА рдкреНрд░рдЧрдд
cron
, рдЬреЗ ODS рд╡рд░ рдбреЗрдЯрд╛ рдПрдХрддреНрд░реАрдХрд░рдг рдкреНрд░рдХреНрд░рд┐рдпрд╛ рд╕реБрд░реВ рдХрд░рддреЗ рдЖрдгрд┐ рддреНрдпрд╛рдВрдЪреНрдпрд╛ рджреЗрдЦрднрд╛рд▓реАрд╡рд░ рджреЗрдЦреАрд▓ рд▓рдХреНрд╖ рдареЗрд╡рддреЗ.
рдЕрд▓реАрдХрдбреЗ рдкрд░реНрдпрдВрдд, рдЖрдордЪреНрдпрд╛ рдЧрд░рдЬрд╛ 32 рдХреЛрд░ рдЖрдгрд┐ 50 GB RAM рдЕрд╕рд▓реЗрд▓реНрдпрд╛ рдПрдХрд╛ рдЫреЛрдЯреНрдпрд╛ рд╕рд░реНрд╡реНрд╣рд░рджреНрд╡рд╛рд░реЗ рдкреВрд░реНрдг рдХреЗрд▓реНрдпрд╛ рдЬрд╛рдд рд╣реЛрддреНрдпрд╛. рдПрдЕрд░рдлреНрд▓реЛрдордзреНрдпреЗ, рд╣реЗ рдХрд╛рд░реНрдп рдХрд░рддреЗ:
- рдЕрдзрд┐рдХ 200 рдбреЕрдЧреНрдЬ (рд╡рд╛рд╕реНрддрд╡рд┐рдХ рдХрд╛рд░реНрдпрдкреНрд░рд╡рд╛рд╣, рдЬреНрдпрд╛рдордзреНрдпреЗ рдЖрдореНрд╣реА рдХрд╛рд░реНрдпреЗ рднрд░рд▓реА рдЖрд╣реЗрдд)
- рдкреНрд░рддреНрдпреЗрдХрд╛рдордзреНрдпреЗ рд╕рд░рд╛рд╕рд░реА 70 рдХрд╛рд░реНрдпреЗ,
- рд╣рд╛ рдЪрд╛рдВрдЧреБрд▓рдкрдгрд╛ рд╕реБрд░реВ рд╣реЛрддреЛ (рд╕рд░рд╛рд╕рд░реА рджреЗрдЦреАрд▓) рддрд╛рд╕рд╛рддреВрди рдПрдХрджрд╛.
рдЖрдгрд┐ рдЖрдореНрд╣реА рдХрд╕реЗ рд╡рд┐рд╕реНрддрд╛рд░рд┐рдд рдХреЗрд▓реЗ рдпрд╛рдмрджреНрджрд▓, рдореА рдЦрд╛рд▓реА рд▓рд┐рд╣реАрди, рдкрд░рдВрддреБ рдЖрддрд╛ рдЖрдкрдг ├╝ber-рд╕рдорд╕реНрдпрд╛ рдкрд░рд┐рднрд╛рд╖рд┐рдд рдХрд░реВрдпрд╛ рдЬреА рдЖрдкрдг рд╕реЛрдбрд╡реВ:
рддреАрди рдореВрд│ SQL рд╕рд░реНрд╡реНрд╣рд░ рдЖрд╣реЗрдд, рдкреНрд░рддреНрдпреЗрдХрд╛рдордзреНрдпреЗ 50 рдбреЗрдЯрд╛рдмреЗрд╕реЗрд╕ рдЖрд╣реЗрдд - рдЕрдиреБрдХреНрд░рдореЗ рдПрдХрд╛ рдкреНрд░рдХрд▓реНрдкрд╛рдЪреА рдЙрджрд╛рд╣рд░рдгреЗ, рддреНрдпрд╛рдВрдЪреА рд░рдЪрдирд╛ рд╕рдорд╛рди рдЖрд╣реЗ (рдЬрд╡рд│рдЬрд╡рд│ рд╕рд░реНрд╡рддреНрд░, mua-ha-ha), рдореНрд╣рдгрдЬреЗ рдкреНрд░рддреНрдпреЗрдХрд╛рдХрдбреЗ рдСрд░реНрдбрд░ рдЯреЗрдмрд▓ рдЖрд╣реЗ (рд╕реБрджреИрд╡рд╛рдиреЗ, рддреНрдпрд╛рд╕рд╣ рдПрдХ рдЯреЗрдмрд▓ рдирд╛рд╡ рдХреЛрдгрддреНрдпрд╛рд╣реА рд╡реНрдпрд╡рд╕рд╛рдпрд╛рдд рдврдХрд▓рд▓реЗ рдЬрд╛рдК рд╢рдХрддреЗ). рдЖрдореНрд╣реА рд╕рд░реНрд╡реНрд╣рд┐рд╕ рдлреАрд▓реНрдб (рд╕реНрд░реЛрдд рд╕рд░реНрд╡реНрд╣рд░, рд╕реЛрд░реНрд╕ рдбреЗрдЯрд╛рдмреЗрд╕, рдИрдЯреАрдПрд▓ рдЯрд╛рд╕реНрдХ рдЖрдпрдбреА) рдЬреЛрдбреВрди рдбреЗрдЯрд╛ рдШреЗрддреЛ рдЖрдгрд┐ рдЕрдЧрджреА рд╕рд╣рдЬрддреЗрдиреЗ рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛рдордзреНрдпреЗ рдЯрд╛рдХрддреЛ.
рдЪрд▓рд╛ рдЬрд╛рдКрдпрд╛!
рдореБрдЦреНрдп рднрд╛рдЧ, рд╡реНрдпрд╛рд╡рд╣рд╛рд░рд┐рдХ (рдЖрдгрд┐ рдереЛрдбрд╛ рд╕реИрджреНрдзрд╛рдВрддрд┐рдХ)
рдЖрдореНрд╣реА (рдЖрдгрд┐ рддреБрдореНрд╣реА) рдХрд╛?
рдЬреЗрд╡реНрд╣рд╛ рдЭрд╛рдбрдВ рдореЛрдареА рд╣реЛрддреА рдЖрдгрд┐ рдореА рд╕рд╛рдзрд╛ рд╣реЛрддреЛ SQL
-рдПрдХ рд░рд╢рд┐рдпрди рдХрд┐рд░рдХреЛрд│ рд╡рд┐рдХреНрд░реАрдордзреНрдпреЗ, рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛рдХрдбреЗ рдЙрдкрд▓рдмреНрдз рджреЛрди рд╕рд╛рдзрдирд╛рдВрдЪрд╛ рд╡рд╛рдкрд░ рдХрд░реВрди рдИрдЯреАрдПрд▓ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдЙрд░реНрдл тАЛтАЛрдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣рд╛рдЪрд╛ рдШреЛрдЯрд╛рд│рд╛ рдХреЗрд▓рд╛:
- рдЗрдиреНрдлреЙрд░реНрдореЗрдЯрд┐рдХрд╛ рдкреЙрд╡рд░ рд╕реЗрдВрдЯрд░ - рдПрдХ рдЕрддреНрдпрдВрдд рдкрд╕рд░рдгрд╛рд░реА рдкреНрд░рдгрд╛рд▓реА, рдЕрддреНрдпрдВрдд рдЙрддреНрдкрд╛рджрдХ, рд╕реНрд╡рддрдГрдЪреНрдпрд╛ рд╣рд╛рд░реНрдбрд╡реЗрдЕрд░рд╕рд╣, рд╕реНрд╡рддрдГрдЪреА рдЖрд╡реГрддреНрддреА. рдореА рддреНрдпрд╛рдЪреНрдпрд╛ рдХреНрд╖рдорддреЗрдкреИрдХреА 1% рдЧреЙрдб рдлреЙрд░рдмрд┐рдб рд╡рд╛рдкрд░рд▓рд╛. рдХрд╛? рдмрд░рдВ, рд╕рд░реНрд╡ рдкреНрд░рдердо, 380 рдЪреНрдпрд╛ рджрд╢рдХрд╛рдкрд╛рд╕реВрди рдпрд╛ рдЗрдВрдЯрд░рдлреЗрд╕рдиреЗ рдЖрдкрд▓реНрдпрд╛рд╡рд░ рдорд╛рдирд╕рд┐рдХ рджрдмрд╛рд╡ рдЖрдгрд▓рд╛. рджреБрд╕рд░реЗ рдореНрд╣рдгрдЬреЗ, рд╣реЗ рдХреЙрдиреНрдЯреНрд░реЕрдкреНрд╢рди рдЕрддреНрдпрдВрдд рдлреЕрдиреНрд╕реА рдкреНрд░рдХреНрд░рд┐рдпрд╛рдВрд╕рд╛рдареА рдбрд┐рдЭрд╛рдЗрди рдХреЗрд▓реЗрд▓реЗ рдЖрд╣реЗ, рддреАрд╡реНрд░ рдШрдЯрдХрд╛рдВрдЪрд╛ рдкреБрдирд░реНрд╡рд╛рдкрд░ рдЖрдгрд┐ рдЗрддрд░ рдЕрддреНрдпрдВрдд-рдорд╣рддреНрддреНрд╡рд╛рдЪреНрдпрд╛-рдПрдВрдЯрд░рдкреНрд░рд╛рдЗрдЭ-рдпреБрдХреНрддреНрдпрд╛. рдПрдЕрд░рдмрд╕ AXNUMX / рд╡рд░реНрд╖рд╛рдЪреНрдпрд╛ рдкрдВрдЦрд╛рдкреНрд░рдорд╛рдгреЗ рддреНрдпрд╛рдЪреА рдХрд┐рдВрдордд рдХрд┐рддреА рдЖрд╣реЗ рдпрд╛рдмрджреНрджрд▓ рдЖрдореНрд╣реА рдХрд╛рд╣реАрд╣реА рдмреЛрд▓рдгрд╛рд░ рдирд╛рд╣реА.
рд╕рд╛рд╡рдз рд░рд╣рд╛, рд╕реНрдХреНрд░реАрдирд╢реЙрдЯ 30 рд╡рд░реНрд╖рд╛рдВрдкреЗрдХреНрд╖рд╛ рдХрдореА рд╡рдпрд╛рдЪреНрдпрд╛ рд▓реЛрдХрд╛рдВрдирд╛ рддреНрд░рд╛рд╕ рджреЗрдК рд╢рдХрддреЛ
- SQL рд╕рд░реНрд╡реНрд╣рд░ рдПрдХрддреНрд░реАрдХрд░рдг рд╕рд░реНрд╡реНрд╣рд░ - рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛ рдЗрдВрдЯреНрд░рд╛-рдкреНрд░реЛрдЬреЗрдХреНрдЯ рдлреНрд▓реЛрдордзреНрдпреЗ рдпрд╛ рдХреЙрдорд░реЗрдбрдЪрд╛ рд╡рд╛рдкрд░ рдХреЗрд▓рд╛. рдмрд░рдВ, рдЦрд░рдВ рддрд░: рдЖрдореНрд╣реА рдЖрдзреАрдЪ рдПрд╕рдХреНрдпреВрдПрд▓ рд╕рд░реНрд╡реНрд╣рд░ рд╡рд╛рдкрд░рддреЛ рдЖрдгрд┐ рддреНрдпрд╛рдЪреА рдИрдЯреАрдПрд▓ рд╕рд╛рдзрдиреЗ рди рд╡рд╛рдкрд░рдгреЗ рдХрд╛рд╣реАрд╕реЗ рдЕрд╡рд╛рд╕реНрддрд╡ рдард░реЗрд▓. рддреНрдпрд╛рдд рд╕рд░реНрд╡ рдХрд╛рд╣реА рдЪрд╛рдВрдЧрд▓реЗ рдЖрд╣реЗ: рдЗрдВрдЯрд░рдлреЗрд╕ рд╕реБрдВрджрд░ рдЖрд╣реЗ рдЖрдгрд┐ рдкреНрд░рдЧрддреА рдЕрд╣рд╡рд╛рд▓ ... рдкрд░рдВрддреБ рдпрд╛рдореБрд│реЗ рдЖрдореНрд╣рд╛рд▓рд╛ рд╕реЙрдлреНрдЯрд╡реЗрдЕрд░ рдЙрддреНрдкрд╛рджрдиреЗ рдЖрд╡рдбрдд рдирд╛рд╣реАрдд, рдЕрд░реЗрд░реЗ, рдпрд╛рд╕рд╛рдареА рдирд╛рд╣реА. рддреНрдпрд╛рдЪреА рдЖрд╡реГрддреНрддреА
dtsx
(рдЬреЗ рдПрдХреНрд╕рдПрдордПрд▓ рдЖрд╣реЗ рдЬреНрдпрд╛рдордзреНрдпреЗ рд╕реЗрд╡реНрд╣рд╡рд░ рдиреЛрдбреНрд╕ рдмрджрд▓рд▓реЗ рдЖрд╣реЗрдд) рдЖрдореНрд╣реА рдХрд░реВ рд╢рдХрддреЛ, рдкрдг рдореБрджреНрджрд╛ рдХрд╛рдп рдЖрд╣реЗ? рд╢реЗрдХрдбреЛ рдЯреЗрдмрд▓реНрд╕ рдПрдХрд╛ рд╕рд░реНрд╡реНрд╣рд░рд╡рд░реВрди рджреБрд╕рд▒реНрдпрд╛ рд╕рд░реНрд╡реНрд╣рд░рд╡рд░ рдбреНрд░реЕрдЧ рдХрд░рдгрд╛рд░рдВ рдЯрд╛рд╕реНрдХ рдкреЕрдХреЗрдЬ рдмрдирд╡рдгреНрдпрд╛рдмрджреНрджрд▓ рдХрд╛рдп? рд╣реЛрдп, рд╢рдВрднрд░ рдХрд╛рдп, рдорд╛рдКрд╕рдЪреНрдпрд╛ рдмрдЯрдгрд╛рд╡рд░ рдХреНрд▓рд┐рдХ рдХрд░реВрди рд╡реАрд╕ рддреБрдХрдбреНрдпрд╛рдВрдордзреВрди рддреБрдордЪреА рддрд░реНрдЬрдиреА рдЦрд╛рд▓реА рдкрдбреЗрд▓. рдкрд░рдВрддреБ рд╣реЗ рдирд┐рд╢реНрдЪрд┐рддрдкрдгреЗ рдЕрдзрд┐рдХ рдлреЕрд╢рдиреЗрдмрд▓ рджрд┐рд╕рддреЗ:
рдЖрдореНрд╣реА рдирдХреНрдХреАрдЪ рдорд╛рд░реНрдЧ рд╢реЛрдзрдд рд╣реЛрддреЛ. рдХреЗрд╕ рдЕрдЧрджреА рдЬрд╡рд│рдкрд╛рд╕ рд╕реНрд╡-рд▓рд┐рдЦрд┐рдд SSIS рдкреЕрдХреЗрдЬ рдЬрдирд░реЗрдЯрд░рд╡рд░ рдЖрд▓реЗ ...
тАжрдЖрдгрд┐ рдордЧ рдорд▓рд╛ рдирд╡реАрди рдиреЛрдХрд░реА рдорд┐рд│рд╛рд▓реА. рдЖрдгрд┐ рддреНрдпрд╛рд╡рд░ рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛрдиреЗ рдорд▓рд╛ рдорд╛рдЧреЗ рдЯрд╛рдХрд▓реЗ.
рдЬреЗрд╡реНрд╣рд╛ рдорд▓рд╛ рдХрд│рд▓реЗ рдХреА ETL рдкреНрд░рдХреНрд░рд┐рдпреЗрдЪреЗ рд╡рд░реНрдгрди рд╕реЛрдкреЗ рдкрд╛рдпрдерди рдХреЛрдб рдЖрд╣реЗрдд, рддреЗрд╡реНрд╣рд╛ рдореА рдЖрдирдВрджрд╛рдиреЗ рдирд╛рдЪрд▓реЛ рдирд╛рд╣реА. рдЕрд╢рд╛рдкреНрд░рдХрд╛рд░реЗ рдбреЗрдЯрд╛ рд╕реНрдЯреНрд░реАрдореНрд╕рдЪреЗ рд╡реНрд╣рд░реНрдЬрди рдЖрдгрд┐ рдлрд░рдХ рдХреЗрд▓реЗ рдЧреЗрд▓реЗ рдЖрдгрд┐ рд╢реЗрдХрдбреЛ рдбреЗрдЯрд╛рдмреЗрд╕реЗрд╕рдордзреВрди рдПрдХрд╛рдЪ рд╕реНрдЯреНрд░рдХреНрдЪрд░рд╕рд╣ рдЯреЗрдмрд▓реНрд╕ рдПрдХрд╛ рдЯрд╛рд░реНрдЧреЗрдЯрдордзреНрдпреЗ рдЯрд╛рдХрдгреЗ рд╣реА рджреАрдб рдХрд┐рдВрд╡рд╛ рджреЛрди 13тАЭ рд╕реНрдХреНрд░реАрдирдордзреНрдпреЗ рдкрд╛рдпрдерди рдХреЛрдбрдЪреА рдмрд╛рдм рдмрдирд▓реА.
рдХреНрд▓рд╕реНрдЯрд░ рдПрдХрддреНрд░ рдХрд░рдгреЗ
рдЪрд▓рд╛ рдкреВрд░реНрдгрдкрдгреЗ рдмрд╛рд▓рд╡рд╛рдбреАрдЪреА рд╡реНрдпрд╡рд╕реНрдерд╛ рдХрд░реВ рдирдХрд╛ рдЖрдгрд┐ рдпреЗрдереЗ рдкреВрд░реНрдгрдкрдгреЗ рд╕реНрдкрд╖реНрдЯ рдЧреЛрд╖реНрдЯреАрдВрдмрджреНрджрд▓ рдмреЛрд▓реВ рдирдХрд╛, рдЬрд╕реЗ рдХреА рдПрдЕрд░рдлреНрд▓реЛ рд╕реНрдерд╛рдкрд┐рдд рдХрд░рдгреЗ, рддреБрдордЪрд╛ рдирд┐рд╡рдбрд▓реЗрд▓рд╛ рдбреЗрдЯрд╛рдмреЗрд╕, рд╕реЗрд▓реЗрд░реА рдЖрдгрд┐ рдбреЙрдХреНрд╕рдордзреНрдпреЗ рд╡рд░реНрдгрди рдХреЗрд▓реЗрд▓реНрдпрд╛ рдЗрддрд░ рдХреЗрд╕реЗрд╕.
рдЬреЗрдгреЗрдХрд░реВрди рдЖрдореНрд╣реА рддрд╛рдмрдбрддреЛрдм рдкреНрд░рдпреЛрдЧ рд╕реБрд░реВ рдХрд░реВ рд╢рдХреВ, рдореА рд░реЗрдЦрд╛рдЯрди рдХреЗрд▓реЗ docker-compose.yml
рдЬреНрдпрд╛рдордзреНрдпреЗ:
- рдЪрд▓рд╛ рдкреНрд░рддреНрдпрдХреНрд╖рд╛рдд рд╡рд╛рдврд╡реВ рд╡рд╛рдпреБрдкреНрд░рд╡рд╛рд╣: рд╢реЗрдбреНрдпреБрд▓рд░, рд╡реЗрдмрд╕рд░реНрд╡реНрд╣рд░. рд╕реЗрд▓рд░реАрдЪреНрдпрд╛ рдХрд╛рдорд╛рдВрдЪреЗ рдирд┐рд░реАрдХреНрд╖рдг рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдлреНрд▓реЙрд╡рд░ рджреЗрдЦреАрд▓ рддреЗрдереЗ рдлрд┐рд░рдд рдЕрд╕реЗрд▓ (рдХрд╛рд░рдг рддреЗ рдЖрдзреАрдЪ рдврдХрд▓рд▓реЗ рдЧреЗрд▓реЗ рдЖрд╣реЗ
apache/airflow:1.10.10-python3.7
рдкрдг рдЖрдордЪреА рд╣рд░рдХрдд рдирд╛рд╣реА) - рдкреЛрд╕реНрдЯрдЧреНрд░реЗ рдПрд╕рдХреНрдпреВрдПрд▓, рдЬреНрдпрд╛рдордзреНрдпреЗ рдПрдЕрд░рдлреНрд▓реЛ рддреНрдпрд╛рдЪреА рд╕реЗрд╡рд╛ рдорд╛рд╣рд┐рддреА (рд╢реЗрдбреНрдпреВрд▓рд░ рдбреЗрдЯрд╛, рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреА рдЖрдХрдбреЗрд╡рд╛рд░реА рдЗ.) рд▓рд┐рд╣реАрд▓ рдЖрдгрд┐ рд╕реЗрд▓реЗрд░реА рдкреВрд░реНрдг рдЭрд╛рд▓реЗрд▓реНрдпрд╛ рдХрд╛рд░реНрдпрд╛рдВрдирд╛ рдЪрд┐рдиреНрд╣рд╛рдВрдХрд┐рдд рдХрд░реЗрд▓;
- Redis, рдЬреЗ рд╕реЗрд▓реЗрд░реАрд╕рд╛рдареА рдЯрд╛рд╕реНрдХ рдмреНрд░реЛрдХрд░ рдореНрд╣рдгреВрди рдХрд╛рдо рдХрд░реЗрд▓;
- рд╕реЗрд▓реЗрд░реА рдХрд╛рдордЧрд╛рд░, рдЬреЗ рдХрд╛рд░реНрдпрд╛рдВрдЪреНрдпрд╛ рдереЗрдЯ рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреАрдордзреНрдпреЗ рдЧреБрдВрддрд▓реЗрд▓реЗ рдЕрд╕реЗрд▓.
- рдлреЛрд▓реНрдбрд░ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА
./dags
рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛ рдлрд╛рдпрд▓реА рдбреЕрдЧреНрдЬрдЪреНрдпрд╛ рд╡рд░реНрдгрдирд╛рд╕рд╣ рдЬреЛрдбреВ. рддреЗ рдорд╛рд╢реАрд╡рд░ рдЙрдЪрд▓рд▓реЗ рдЬрд╛рддреАрд▓, рдореНрд╣рдгреВрди рдкреНрд░рддреНрдпреЗрдХ рд╢рд┐рдВрдХрд▓реНрдпрд╛рдирдВрддрд░ рд╕рдВрдкреВрд░реНрдг рд╕реНрдЯреЕрдХрд▓рд╛ рд╣рд╛рдд рд▓рд╛рд╡рдгреНрдпрд╛рдЪреА рдЧрд░рдЬ рдирд╛рд╣реА.
рдХрд╛рд╣реА рдард┐рдХрд╛рдгреА, рдЙрджрд╛рд╣рд░рдгрд╛рдВрдордзреАрд▓ рдХреЛрдб рдкреВрд░реНрдгрдкрдгреЗ рджрд░реНрд╢рд╡рд┐рд▓рд╛ рдЬрд╛рдд рдирд╛рд╣реА (рдЬреЗрдгреЗрдХрд░реБрди рдордЬрдХреВрд░ рдЧреЛрдВрдзрд│реВрди рдЬрд╛рдК рдирдпреЗ), рдкрд░рдВрддреБ рдХреБрдареЗрддрд░реА рдкреНрд░рдХреНрд░рд┐рдпреЗрдд рддреЛ рд╕реБрдзрд╛рд░рд┐рдд рдХреЗрд▓рд╛ рдЬрд╛рддреЛ. рд╕рдВрдкреВрд░реНрдг рдХрд╛рд░реНрдп рдХреЛрдб рдЙрджрд╛рд╣рд░рдгреЗ рд░реЗрдкреЙрдЬрд┐рдЯрд░реАрдордзреНрдпреЗ рдЖрдврд│реВ рд╢рдХрддрд╛рдд
https://github.com/dm-logv/airflow-tutorial .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- broker
рдЯрд┐рдкрд╛:
- рд░рдЪрдиреЗрдЪреНрдпрд╛ рдЕрд╕реЗрдВрдмреНрд▓реАрдордзреНрдпреЗ, рдореА рдореЛрдареНрдпрд╛ рдкреНрд░рдорд╛рдгрд╛рд╡рд░ рд╕реБрдкреНрд░рд╕рд┐рджреНрдз рдкреНрд░рддрд┐рдореЗрд╡рд░ рдЕрд╡рд▓рдВрдмреВрди рд╣реЛрддреЛ
рдкреБрдХрд▓/рдбреЙрдХрд░-рдПрдЕрд░рдлреНрд▓реЛ - рд╣реЗ рдирдХреНрдХреА рдкрд╣рд╛. рдХрджрд╛рдЪрд┐рдд рддреБрдореНрд╣рд╛рд▓рд╛ рддреБрдордЪреНрдпрд╛ рдЖрдпреБрд╖реНрдпрд╛рдд рдЗрддрд░ рдХрд╢рд╛рдЪреАрд╣реА рдЧрд░рдЬ рдирд╕реЗрд▓. - рд╕рд░реНрд╡ рдПрдЕрд░рдлреНрд▓реЛ рд╕реЗрдЯрд┐рдВрдЧреНрдЬ рдХреЗрд╡рд│ рджреНрд╡рд╛рд░реЗ рдЙрдкрд▓рдмреНрдз рдирд╛рд╣реАрдд
airflow.cfg
, рдкрд░рдВрддреБ рдкрд░реНрдпрд╛рд╡рд░рдгреАрдп рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕рджреНрд╡рд╛рд░реЗ рджреЗрдЦреАрд▓ (рд╡рд┐рдХрд╛рд╕рдХрд╛рдВрдирд╛ рдзрдиреНрдпрд╡рд╛рдж), рдЬреНрдпрд╛рдЪрд╛ рдореА рджреБрд░реНрднрд╛рд╡рдирд╛рдкреВрд░реНрдгрдкрдгреЗ рдлрд╛рдпрджрд╛ рдШреЗрддрд▓рд╛. - рд╕реНрд╡рд╛рднрд╛рд╡рд┐рдХрдЪ, рддреЗ рдЙрддреНрдкрд╛рджрдирд╛рд╕рд╛рдареА рддрдпрд╛рд░ рдирд╛рд╣реА: рдореА рдЬрд╛рдгреАрд╡рдкреВрд░реНрд╡рдХ рдХрдВрдЯреЗрдирд░рд╡рд░ рд╣реГрджрдпрд╛рдЪреЗ рдареЛрдХреЗ рдареЗрд╡рд▓реЗ рдирд╛рд╣реАрдд, рдорд▓рд╛ рд╕реБрд░рдХреНрд╖рд┐рддрддреЗрдЪрд╛ рддреНрд░рд╛рд╕ рдЭрд╛рд▓рд╛ рдирд╛рд╣реА. рдкрдг рдореА рдЖрдордЪреНрдпрд╛ рдкреНрд░рдпреЛрдЧрдХрд░реНрддреНрдпрд╛рдВрд╕рд╛рдареА рдХрд┐рдорд╛рди рдпреЛрдЧреНрдп рддреЗ рдХреЗрд▓реЗ.
- рд▓рдХреНрд╖рд╛рдд рдареЗрд╡рд╛ рдХреА:
- рдбреЕрдЧ рдлреЛрд▓реНрдбрд░ рд╢реЗрдбреНрдпреВрд▓рд░ рдЖрдгрд┐ рдХрд╛рдордЧрд╛рд░ рджреЛрдШрд╛рдВрдирд╛рд╣реА рдкреНрд░рд╡реЗрд╢рдпреЛрдЧреНрдп рдЕрд╕рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.
- рд╣реЗ рд╕рд░реНрд╡ рддреГрддреАрдп-рдкрдХреНрд╖ рд▓рд╛рдпрдмреНрд░рд░реАрдВрдирд╛ рд▓рд╛рдЧреВ рд╣реЛрддреЗ - рддреЗ рд╕рд░реНрд╡ рд╢реЗрдбреНрдпреВрд▓рд░ рдЖрдгрд┐ рдХрд╛рдордЧрд╛рд░рд╛рдВрд╕рд╣ рдорд╢реАрдирд╡рд░ рд╕реНрдерд╛рдкрд┐рдд рдХреЗрд▓реЗ рдЬрд╛рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.
рдмрд░рдВ, рдЖрддрд╛ рд╣реЗ рд╕реЛрдкреЗ рдЖрд╣реЗ:
$ docker-compose up --scale worker=3
рд╕рд░реНрд╡рдХрд╛рд╣реА рд╡рд╛рдврд▓реНрдпрд╛рдирдВрддрд░, рдЖрдкрдг рд╡реЗрдм рдЗрдВрдЯрд░рдлреЗрд╕ рдкрд╛рд╣реВ рд╢рдХрддрд╛:
- рдПрдЕрд░рдлреНрд▓реЛ:
http://127.0.0.1:8080/admin/ - рдлреНрд▓реЙрд╡рд░:
http://127.0.0.1:5555/dashboard
рдореВрд▓рднреВрдд рд╕рдВрдХрд▓реНрдкрдирд╛
рдЬрд░ рддреБрдореНрд╣рд╛рд▓рд╛ рдпрд╛ рд╕рд░реНрд╡ "рдбреЕрдЧреНрдЬ" рдордзреАрд▓ рдХрд╛рд╣реАрд╣реА рд╕рдордЬрд▓реЗ рдирд╕реЗрд▓, рддрд░ рдпреЗрдереЗ рдПрдХ рд▓рд╣рд╛рди рд╢рдмреНрджрдХреЛрд╢ рдЖрд╣реЗ:
- рд╢реЗрдбреНрдпреВрд▓рд░ - рдПрдЕрд░рдлреНрд▓реЛрдордзреАрд▓ рд╕рд░реНрд╡рд╛рдд рдорд╣рддреНрд╡рд╛рдЪреЗ рдХрд╛рдХрд╛, рдЬреЗ рд░реЛрдмреЛрдЯ рдХрдареЛрд░ рдкрд░рд┐рд╢реНрд░рдо рдХрд░рддрд╛рдд рдпрд╛рд╡рд░ рдирд┐рдпрдВрддреНрд░рдг рдареЗрд╡рддрд╛рдд, рд╡реНрдпрдХреНрддреА рдирд╛рд╣реА: рд╡реЗрд│рд╛рдкрддреНрд░рдХрд╛рдЪреЗ рдирд┐рд░реАрдХреНрд╖рдг рдХрд░рддрд╛рдд, рдбреЕрдЧреНрдЬ рдЕрдкрдбреЗрдЯ рдХрд░рддрд╛рдд, рдХрд╛рд░реНрдпреЗ рд▓рд╛рдБрдЪ рдХрд░рддрд╛рдд.
рд╕рд░реНрд╡рд╕рд╛рдзрд╛рд░рдгрдкрдгреЗ, рдЬреБрдиреНрдпрд╛ рдЖрд╡реГрддреНрддреНрдпрд╛рдВрдордзреНрдпреЗ, рддреНрдпрд╛рд▓рд╛ рд╕реНрдорд░рдгрд╢рдХреНрддреАрдЪреА рд╕рдорд╕реНрдпрд╛ рд╣реЛрддреА (рдирд╛рд╣реА, рд╕реНрдореГрддрд┐рднреНрд░рдВрд╢ рдирд╛рд╣реА, рдкрд░рдВрддреБ рд▓реАрдХ) рдЖрдгрд┐ рд▓реЗрдЧрд╕реА рдкреЕрд░рд╛рдореАрдЯрд░ рдЕрдЧрджреА рдХреЙрдиреНрдлрд┐рдЧрдордзреНрдпреЗ рд░рд╛рд╣рд┐рд▓реЗ.
run_duration
- рддреНрдпрд╛рдЪреЗ рд░реАрд╕реНрдЯрд╛рд░реНрдЯ рдордзреНрдпрд╛рдВрддрд░. рдкрдг рдЖрддрд╛ рд╕рд░реНрд╡ рдХрд╛рд╣реА рдареАрдХ рдЖрд╣реЗ. - рджрд┐рд╡рд╕ (рдЙрд░реНрдл "рдбреЕрдЧ") - "рджрд┐рдЧреНрджрд░реНрд╢рд┐рдд рдЕреЕрд╕рд╛рдпрдХреНрд▓рд┐рдХ рдЖрд▓реЗрдЦ", рдкрд░рдВрддреБ рдЕрд╢реА рд╡реНрдпрд╛рдЦреНрдпрд╛ рдХрд╛рд╣реА рд▓реЛрдХрд╛рдВрдирд╛ рд╕рд╛рдВрдЧреЗрд▓, рдкрд░рдВрддреБ рдкреНрд░рддреНрдпрдХреНрд╖рд╛рдд рддреЗ рдПрдХрдореЗрдХрд╛рдВрд╢реА рд╕рдВрд╡рд╛рдж рд╕рд╛рдзрдгрд╛рд░реНтАНрдпрд╛ рдХрд╛рд░реНрдпрд╛рдВрд╕рд╛рдареА рдХрдВрдЯреЗрдирд░ рдЖрд╣реЗ (рдЦрд╛рд▓реА рдкрд╣рд╛) рдХрд┐рдВрд╡рд╛ SSIS рдордзреАрд▓ рдкреЕрдХреЗрдЬ рдЖрдгрд┐ рдЗрдиреНрдлреЙрд░реНрдореЗрдЯрд┐рдХрд╛ рдордзреАрд▓ рд╡рд░реНрдХрдлреНрд▓реЛрдЪреЗ рдЕреЕрдирд╛рд▓реЙрдЧ .
рдбреЕрдЧреНрдЬ рд╡реНрдпрддрд┐рд░рд┐рдХреНрдд, рдЕрдЬреВрдирд╣реА рд╕рдмрдбреЕрдЧ рдЕрд╕реВ рд╢рдХрддрд╛рдд, рдкрд░рдВрддреБ рдмрд╣реБрдзрд╛ рдЖрдореНрд╣рд╛рд▓рд╛ рддреЗ рдорд┐рд│рдгрд╛рд░ рдирд╛рд╣реАрдд.
- DAG рдзрд╛рд╡ - рдЖрд░рдВрдн рдХреЗрд▓реЗрд▓рд╛ рдбреЕрдЧ, рдЬреЛ рд╕реНрд╡рддрдГрдЪрд╛ рдирд┐рдпреБрдХреНрдд рдХреЗрд▓рд╛ рдЬрд╛рддреЛ
execution_date
. рд╕рдорд╛рди рдбреЗрдЧрдЪреЗ рдбреЕрдЧрди рд╕рдорд╛рдВрддрд░рдкрдгреЗ рдХрд╛рд░реНрдп рдХрд░реВ рд╢рдХрддрд╛рдд (рдЬрд░ рддреБрдореНрд╣реА рддреБрдордЪреА рдХрд╛рд░реНрдпреЗ рдирд┐рдГрд╕рдВрд╢рдпрдкрдгреЗ рдХреЗрд▓реА рдЕрд╕рддреАрд▓ рддрд░). - рдСрдкрд░реЗрдЯрд░ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдХреНрд░рд┐рдпрд╛ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЬрдмрд╛рдмрджрд╛рд░ рдЕрд╕рд▓реЗрд▓реНрдпрд╛ рдХреЛрдбрдЪреЗ рддреБрдХрдбреЗ рдЖрд╣реЗрдд. рддреАрди рдкреНрд░рдХрд╛рд░рдЪреЗ рдСрдкрд░реЗрдЯрд░ рдЖрд╣реЗрдд:
- рдХрд╛рд░рд╡рд╛рдИрдЖрдордЪреНрдпрд╛ рдЖрд╡рдбрддреНрдпрд╛рдкреНрд░рдорд╛рдгреЗ
PythonOperator
, рдЬреЛ рдХреЛрдгрддрд╛рд╣реА (рд╡реИрдз) рдкрд╛рдпрдерди рдХреЛрдб рдХрд╛рд░реНрдпрд╛рдиреНрд╡рд┐рдд рдХрд░реВ рд╢рдХрддреЛ; - рд╣рд╕реНрддрд╛рдВрддрд░рдг, рдЬреЗ рдард┐рдХрд╛рдгрд╛рд╣реВрди рдбреЗрдЯрд╛ рд╡рд╛рд╣рддреВрдХ рдХрд░рддрд╛рдд, рдореНрд╣рдгрддрд╛рдд,
MsSqlToHiveTransfer
; - рд╕реЗрдиреНрд╕рд░ рджреБрд╕рд░реАрдХрдбреЗ, рдПрдЦрд╛рджреА рдШрдЯрдирд╛ рдШрдбреЗрдкрд░реНрдпрдВрдд рддреЗ рддреБрдореНрд╣рд╛рд▓рд╛ рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛ рджреЗрдгреНрдпрд╛рд╕ рдХрд┐рдВрд╡рд╛ рдбреЕрдЧрдЪреНрдпрд╛ рдкреБрдвреАрд▓ рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреАрдЪреА рдЧрддреА рдХрдореА рдХрд░рдгреНрдпрд╛рд╕ рдЕрдиреБрдорддреА рджреЗрдИрд▓.
HttpSensor
рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдПрдВрдбрдкреЙрдЗрдВрдЯ рдЦреЗрдЪреВ рд╢рдХрддреЛ, рдЖрдгрд┐ рдЬреЗрд╡реНрд╣рд╛ рдЗрдЪреНрдЫрд┐рдд рдкреНрд░рддрд┐рд╕рд╛рдж рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░рдд рдЕрд╕реЗрд▓ рддреЗрд╡реНрд╣рд╛ рд╣рд╕реНрддрд╛рдВрддрд░рдг рд╕реБрд░реВ рдХрд░рд╛GoogleCloudStorageToS3Operator
. рдПрдХ рдЬрд┐рдЬреНрдЮрд╛рд╕реВ рдорди рд╡рд┐рдЪрд╛рд░реЗрд▓: тАЬрдХрд╛? рд╢реЗрд╡рдЯреА, рддреБрдореНрд╣реА рдСрдкрд░реЗрдЯрд░рдордзреНрдпреЗрдЪ рдкреБрдирд░рд╛рд╡реГрддреНрддреА рдХрд░реВ рд╢рдХрддрд╛!тАЭ рдЖрдгрд┐ рдордЧ, рдирд┐рд▓рдВрдмрд┐рдд рдСрдкрд░реЗрдЯрд░рд╕рд╣ рдХрд╛рд░реНрдпрд╛рдВрдЪрд╛ рдкреВрд▓ рд░реЛрдЦреВ рдирдпреЗ рдореНрд╣рдгреВрди. рдкреБрдвреАрд▓ рдкреНрд░рдпрддреНрдирд╛рдкреВрд░реНрд╡реА рд╕реЗрдиреНрд╕рд░ рд╕реБрд░реВ рд╣реЛрддреЛ, рддрдкрд╛рд╕рддреЛ рдЖрдгрд┐ рдорд░рддреЛ.
- рдХрд╛рд░рд╡рд╛рдИрдЖрдордЪреНрдпрд╛ рдЖрд╡рдбрддреНрдпрд╛рдкреНрд░рдорд╛рдгреЗ
- рдХрд╛рд░реНрдп - рдШреЛрд╖рд┐рдд рдСрдкрд░реЗрдЯрд░, рдкреНрд░рдХрд╛рд░рд╛рдХрдбреЗ рджреБрд░реНрд▓рдХреНрд╖ рдХрд░реВрди, рдЖрдгрд┐ рдбреЕрдЧрд╢реА рд╕рдВрд▓рдЧреНрди рдХреЗрд▓реЗрд▓реНрдпрд╛ рдХрд╛рд░реНрдпрд╛рдЪреНрдпрд╛ рд░рдБрдХрд╡рд░ рдкрджреЛрдиреНрдирдд рдХреЗрд▓реЗ рдЬрд╛рддрд╛рдд.
- рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдг - рдЬреЗрд╡реНрд╣рд╛ рд╕рд╛рдорд╛рдиреНрдп рдирд┐рдпреЛрдЬрдХрд╛рдиреЗ рдард░рд╡рд▓реЗ рдХреА рдХрд╛рдордЧрд┐рд░реА рдХрд░рдгрд╛рд░реНтАНрдпрд╛ рдХрд╛рдордЧрд╛рд░рд╛рдВрд╡рд░ рд▓рдврд╛рдИрдд рдХрд╛рд░реНрдпреЗ рдкрд╛рдард╡рдгреНрдпрд╛рдЪреА рд╡реЗрд│ рдЖрд▓реА рдЖрд╣реЗ (рдЬрд░ рдЖрдкрдг рд╡рд╛рдкрд░рдд рдЕрд╕рд╛рд▓ рддрд░
LocalExecutor
рдХрд┐рдВрд╡рд╛ рдЪреНрдпрд╛ рдмрд╛рдмрддреАрдд рд░рд┐рдореЛрдЯ рдиреЛрдбрд▓рд╛CeleryExecutor
), рддреЗ рддреНрдпрд╛рдВрдирд╛ рд╕рдВрджрд░реНрдн рдирд┐рдпреБрдХреНрдд рдХрд░рддреЗ (рдореНрд╣рдгрдЬреЗ, рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕рдЪрд╛ рдПрдХ рд╕рдВрдЪ - рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреА рдкреЕрд░рд╛рдореАрдЯрд░реНрд╕), рдХрдорд╛рдВрдб рдХрд┐рдВрд╡рд╛ рдХреНрд╡реЗрд░реА рдЯреЗрдореНрдкрд▓реЗрдЯреНрд╕ рд╡рд┐рд╕реНрддреГрдд рдХрд░рддреЗ рдЖрдгрд┐ рддреНрдпрд╛рдВрдирд╛ рдПрдХрддреНрд░ рдХрд░рддреЗ.
рдЖрдореНрд╣реА рдХрд╛рд░реНрдпреЗ рд╡реНрдпреБрддреНрдкрдиреНрди рдХрд░рддреЛ
рдкреНрд░рдердо, рдЖрдордЪреНрдпрд╛ рдбрдЧрдЪреНрдпрд╛ рд╕рд╛рдорд╛рдиреНрдп рдпреЛрдЬрдиреЗрдЪреА рд░реВрдкрд░реЗрд╖рд╛ рдХрд╛рдвреВрдпрд╛, рдЖрдгрд┐ рдирдВрддрд░ рдЖрдореНрд╣реА рдЕрдзрд┐рдХрд╛рдзрд┐рдХ рддрдкрд╢реАрд▓рд╛рдВрдордзреНрдпреЗ рдЬрд╛рдК, рдХрд╛рд░рдг рдЖрдореНрд╣реА рдХрд╛рд╣реА рдХреНрд╖реБрд▓реНрд▓рдХ рдЙрдкрд╛рдп рд▓рд╛рдЧреВ рдХрд░рддреЛ.
рддрд░, рддреНрдпрд╛рдЪреНрдпрд╛ рд╕рд░реНрд╡рд╛рдд рд╕реЛрдкреНрдпрд╛ рд╕реНрд╡рд░реВрдкрд╛рдд, рдЕрд╕рд╛ рдбрдЧ рдпрд╛рд╕рд╛рд░рдЦрд╛ рджрд┐рд╕реЗрд▓:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
рдЪрд▓рд╛ рддреЗ рд╢реЛрдзреВрдпрд╛:
- рдкреНрд░рдердо, рдЖрдореНрд╣реА рдЖрд╡рд╢реНрдпрдХ libs рдЖрдпрд╛рдд рдХрд░рддреЛ рдЖрдгрд┐ рдХрд╛рд╣реАрддрд░реА;
sql_server_ds
- рд╣реЗ рдЖрд╣реЗList[namedtuple[str, str]]
рдПрдЕрд░рдлреНрд▓реЛ рдХрдиреЗрдХреНрд╢рдиреНрд╕рдЪреНрдпрд╛ рдХрдиреЗрдХреНрд╢рдирдЪреНрдпрд╛ рдирд╛рд╡рд╛рдВрд╕рд╣ рдЖрдгрд┐ рдбреЗрдЯрд╛рдмреЗрд╕реЗрд╕ рдЬреНрдпрд╛рдордзреВрди рдЖрдореНрд╣реА рдЖрдордЪреА рдкреНрд▓реЗрдЯ рдШреЗрдК;dag
- рдЖрдордЪреНрдпрд╛ рдбреЗрдЧрдЪреА рдШреЛрд╖рдгрд╛, рдЬреА рдЕрдкрд░рд┐рд╣рд╛рд░реНрдпрдкрдгреЗ рдЕрд╕рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗglobals()
, рдЕрдиреНрдпрдерд╛ Airflow рд▓рд╛ рддреЗ рд╕рд╛рдкрдбрдгрд╛рд░ рдирд╛рд╣реА. рдбрдЧ рд╣реЗ рджреЗрдЦреАрд▓ рдореНрд╣рдгрд╛рдпрдЪреЗ рдЖрд╣реЗ:- рддреНрдпрд╛рдЪреЗ рдирд╛рд╡ рдХрд╛рдп
orders
- рд╣реЗ рдирд╛рд╡ рдирдВрддрд░ рд╡реЗрдм рдЗрдВрдЯрд░рдлреЗрд╕рдордзреНрдпреЗ рджрд┐рд╕реЗрд▓, - рддреЛ рдЖрда рдЬреБрд▓реИрдЪреНрдпрд╛ рдордзреНрдпрд░рд╛рддреНрд░реАрдкрд╛рд╕реВрди рдХрд╛рдо рдХрд░реЗрд▓,
- рдЖрдгрд┐ рддреЗ рдЪрд╛рд▓рд▓реЗ рдкрд╛рд╣рд┐рдЬреЗ, рдЕрдВрджрд╛рдЬреЗ рджрд░ 6 рддрд╛рд╕рд╛рдВрдиреА (рддреНрдпрд╛рдРрд╡рдЬреА рдпреЗрдереЗ рдХрдареАрдг рдореБрд▓рд╛рдВрд╕рд╛рдареА
timedelta()
рд╕реНрд╡реАрдХрд╛рд░реНрдпcron
-рдУрд│0 0 0/6 ? * * *
, рдХрдореА рдердВрдб рд╕рд╛рдареА - рдПрдХ рдЕрднрд┐рд╡реНрдпрдХреНрддреА рдЬрд╕реЗ@daily
);
- рддреНрдпрд╛рдЪреЗ рдирд╛рд╡ рдХрд╛рдп
workflow()
рдореБрдЦреНрдп рдХрд╛рдо рдХрд░реЗрд▓, рдкрдг рдЖрддрд╛ рдирд╛рд╣реА. рдЖрддреНрддрд╛рд╕рд╛рдареА, рдЖрдореНрд╣реА рдЖрдордЪрд╛ рд╕рдВрджрд░реНрдн рд▓реЙрдЧрдордзреНрдпреЗ рдЯрд╛рдХреВ.- рдЖрдгрд┐ рдЖрддрд╛ рдХрд╛рд░реНрдпреЗ рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рдЪреА рд╕рд╛рдзреА рдЬрд╛рджреВ:
- рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛ рд╕реНрддреНрд░реЛрддрд╛рдВрдордзреВрди рдзрд╛рд╡рддреЛ;
- рдЖрд░рдВрдн рдХрд░рд╛
PythonOperator
, рдЬреЗ рдЖрдордЪреНрдпрд╛ рдбрдореАрдЪреА рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреА рдХрд░реЗрд▓workflow()
. рдЯрд╛рд╕реНрдХрдЪреЗ рдЕрдирдиреНрдп (рдбреЗрдЧрдордзреНрдпреЗ) рдирд╛рд╡ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░рдгреНрдпрд╛рд╕ рд╡рд┐рд╕рд░реВ рдирдХрд╛ рдЖрдгрд┐ рдбреЕрдЧ рд╕реНрд╡рддрдГрдЪ рдмрд╛рдВрдзрд╛. рдЭреЗрдВрдбрд╛provide_context
рдпрд╛рдордзреВрди, рдлрдВрдХреНрд╢рдирдордзреНрдпреЗ рдЕрддрд┐рд░рд┐рдХреНрдд рдпреБрдХреНрддрд┐рд╡рд╛рдж рдУрддрддреЛ, рдЬреНрдпрд╛рдЪрд╛ рд╡рд╛рдкрд░ рдХрд░реВрди рдЖрдореНрд╣реА рдХрд╛рд│рдЬреАрдкреВрд░реНрд╡рдХ рдЧреЛрд│рд╛ рдХрд░реВ**context
.
рдЖрддрд╛рд╕рд╛рдареА, рдПрд╡рдвреЗрдЪ. рдЖрдореНрд╣рд╛рд▓рд╛ рдХрд╛рдп рдорд┐рд│рд╛рд▓реЗ:
- рд╡реЗрдм рдЗрдВрдЯрд░рдлреЗрд╕рдордзреНрдпреЗ рдирд╡реАрди рдбреЗрдЧ,
- рджреАрдбрд╢реЗ рдХрд╛рд░реНрдпреЗ рд╕рдорд╛рдВрддрд░рдкрдгреЗ рдкрд╛рд░ рдкрд╛рдбрд▓реА рдЬрд╛рддреАрд▓ (рдЬрд░ рдПрдЕрд░рдлреНрд▓реЛ, рд╕реЗрд▓реЗрд░реА рд╕реЗрдЯрд┐рдВрдЧреНрдЬ рдЖрдгрд┐ рд╕рд░реНрд╡реНрд╣рд░рдЪреА рдХреНрд╖рдорддрд╛ рдкрд░рд╡рд╛рдирдЧреА рджреЗрдд тАЛтАЛрдЕрд╕реЗрд▓).
рдмрд░рдВ, рдЬрд╡рд│рдЬрд╡рд│ рд╕рдордЬрд▓рдВ.
рдЕрд╡рд▓рдВрдмрд┐рддреНрд╡ рдХреЛрдг рд╕реНрдерд╛рдкрд┐рдд рдХрд░реЗрд▓?
рд╣реА рд╕рдВрдкреВрд░реНрдг рдЧреЛрд╖реНрдЯ рд╕реЛрдкреА рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА, рдореА рд╕реНрдХреНрд░реВ рдХреЗрд▓реЗ docker-compose.yml
рдкреНрд░рдХреНрд░рд┐рдпрд╛ requirements.txt
рд╕рд░реНрд╡ рдиреЛрдбреНрд╕ рд╡рд░.
рдЖрддрд╛ рддреЗ рдЧреЗрд▓реЗ рдЖрд╣реЗ:
рдЧреНрд░реЗ рд╕реНрдХреНрд╡реЗрдЕрд░ рд╣реЗ рд╢реЗрдбреНрдпреБрд▓рд░рджреНрд╡рд╛рд░реЗ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХреЗрд▓реЗрд▓реЗ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдг рдЖрд╣реЗрдд.
рдЖрдореНрд╣реА рдереЛрдбреА рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░рддреЛ, рдХрд╛рд░реНрдпреЗ рдХрд╛рдордЧрд╛рд░рд╛рдВрдиреА рдкреВрд░реНрдг рдХреЗрд▓реА рдЖрд╣реЗрдд:
рд╣рд┐рд░рд╡реЗ, рдЕрд░реНрдерд╛рддрдЪ, рддреНрдпрд╛рдВрдЪреЗ рдХрд╛рдо рдпрд╢рд╕реНрд╡реАрд░рд┐рддреНрдпрд╛ рдкреВрд░реНрдг рдХреЗрд▓реЗ рдЖрд╣реЗ. рд░реЗрдбреНрд╕ рдлрд╛рд░рд╕реЗ рдпрд╢рд╕реНрд╡реА рдирд╛рд╣реАрдд.
рддрд╕реЗ, рдЖрдордЪреНрдпрд╛ рдЙрддреНрдкрд╛рджрдирд╛рд╡рд░ рдХреЛрдгрддреЗрд╣реА рдлреЛрд▓реНрдбрд░ рдирд╛рд╣реА
./dags
, рдорд╢реАрдиреНрд╕рдордзреНрдпреЗ рдХреЛрдгрддреЗрд╣реА рд╕рд┐рдВрдХреНрд░реЛрдирд╛рдЗрдЭреЗрд╢рди рдирд╛рд╣реА - рд╕рд░реНрд╡ рдбреЕрдЧреНрдЬ рдЖрдд рдЖрд╣реЗрддgit
рдЖрдордЪреНрдпрд╛ Gitlab рд╡рд░, рдЖрдгрд┐ Gitlab CI рдорд╢рд┐рдирдордзреНрдпреЗ рд╡рд┐рд▓реАрди рдЭрд╛рд▓реНрдпрд╛рд╡рд░ рдЕрдкрдбреЗрдЯ рд╡рд┐рддрд░рд┐рдд рдХрд░рддреЗmaster
.
рдлреНрд▓реЙрд╡рд░ рдмрджреНрджрд▓ рдереЛрдбреЗ
рдХрд╛рдордЧрд╛рд░ рдЖрдордЪреНрдпрд╛ рдкреЕрд╕рд┐рдлрд╛рдпрд░рд▓рд╛ рдорд╛рд░рдд рдЕрд╕рддрд╛рдирд╛, рдЖрдкрдг рдЖрдгрдЦреА рдПрдХ рд╕рд╛рдзрди рд▓рдХреНрд╖рд╛рдд рдареЗрд╡реВ рдЬреЗ рдЖрдкрд▓реНрдпрд╛рд▓рд╛ рдХрд╛рд╣реАрддрд░реА рджрд░реНрд╢рд╡реВ рд╢рдХрддреЗ - рдлреНрд▓реЙрд╡рд░.
рдХрд╛рдордЧрд╛рд░ рдиреЛрдбреНрд╕рд╡рд░реАрд▓ рд╕рд╛рд░рд╛рдВрд╢ рдорд╛рд╣рд┐рддреАрд╕рд╣ рдкрд╣рд┐рд▓реЗ рдкреГрд╖реНрда:
рдХрд╛рд░реНрдп рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЧреЗрд▓реЗрд▓реНрдпрд╛ рдХрд╛рд░реНрдпрд╛рдВрд╕рд╣ рд╕рд░реНрд╡рд╛рдд рддреАрд╡реНрд░ рдкреГрд╖реНрда:
рдЖрдордЪреНрдпрд╛ рдмреНрд░реЛрдХрд░рдЪреНрдпрд╛ рд╕реНрдерд┐рддреАрд╕рд╣ рд╕рд░реНрд╡рд╛рдд рдХрдВрдЯрд╛рд│рд╡рд╛рдгрд╛ рдкреГрд╖реНрда:
рд╕рд░реНрд╡рд╛рдд рдЙрдЬрд│ рдкреГрд╖реНрда рд╣реЗ рдХрд╛рд░реНрдп рд╕реНрдерд┐рддреА рдЖрд▓реЗрдЦ рдЖрдгрд┐ рддреНрдпрд╛рдВрдЪреНрдпрд╛ рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреАрдЪреНрдпрд╛ рд╡реЗрд│реЗрд╕рд╣ рдЖрд╣реЗ:
рдЖрдореНрд╣реА рдЕрдВрдбрд░рд▓реЛрдбреЗрдб рд▓реЛрдб рдХрд░рддреЛ
рддрд░, рд╕рд░реНрд╡ рдХрд╛рд░реНрдпреЗ рдкреВрд░реНрдг рдЭрд╛рд▓реА рдЖрд╣реЗрдд, рдЖрдкрдг рдЬрдЦрдореАрдВрдирд╛ рдШреЗрдКрди рдЬрд╛рдК рд╢рдХрддрд╛.
рдЖрдгрд┐ рддреЗрдереЗ рдмрд░реЗрдЪ рдЬрдЦрдореА рдЭрд╛рд▓реЗ - рдПрдХ рдХрд┐рдВрд╡рд╛ рджреБрд╕рд░реНрдпрд╛ рдХрд╛рд░рдгрд╛рд╕реНрддрд╡. рдПрдЕрд░рдлреНрд▓реЛрдЪреНрдпрд╛ рдпреЛрдЧреНрдп рд╡рд╛рдкрд░рд╛рдЪреНрдпрд╛ рдмрд╛рдмрддреАрдд, рд╣реЗ рдЪреМрд░рд╕ рд╕реВрдЪрд┐рдд рдХрд░рддрд╛рдд рдХреА рдбреЗрдЯрд╛ рдирд┐рд╢реНрдЪрд┐рддрдкрдгреЗ рдЖрд▓рд╛ рдирд╛рд╣реА.
рддреБрдореНрд╣рд╛рд▓рд╛ рд▓реЙрдЧ рдкрд╛рд╣рдгреНрдпрд╛рдЪреА рдЖрдгрд┐ рдкрдбрд▓реЗрд▓реНрдпрд╛ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгреЗ рд░реАрд╕реНрдЯрд╛рд░реНрдЯ рдХрд░рдгреНрдпрд╛рдЪреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рдЖрд╣реЗ.
рдХреЛрдгрддреНрдпрд╛рд╣реА рд╕реНрдХреНрд╡реЗрдЕрд░рд╡рд░ рдХреНрд▓рд┐рдХ рдХрд░реВрди, рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛рд╕рд╛рдареА рдЙрдкрд▓рдмреНрдз рдХреНрд░рд┐рдпрд╛ рдкрд╛рд╣реВ:
рддреБрдореНрд╣реА рдШреЗрдК рд╢рдХрддрд╛ рдЖрдгрд┐ рдХреНрд▓рд┐рдЕрд░ рдж рдлреЙрд▓рди рдХрд░реВ рд╢рдХрддрд╛. рдореНрд╣рдгрдЬреЗрдЪ, рдЖрдореНрд╣реА рд╡рд┐рд╕рд░рддреЛ рдХреА рддреЗрдереЗ рдХрд╛рд╣реАрддрд░реА рдЕрдпрд╢рд╕реНрд╡реА рдЭрд╛рд▓реЗ рдЖрд╣реЗ рдЖрдгрд┐ рддреЗрдЪ рдЙрджрд╛рд╣рд░рдг рдХрд╛рд░реНрдп рд╢реЗрдбреНрдпреВрд▓рд░рдХрдбреЗ рдЬрд╛рдИрд▓.
рд╣реЗ рд╕реНрдкрд╖реНрдЯ рдЖрд╣реЗ рдХреА рд╕рд░реНрд╡ рд▓рд╛рд▓ рдЪреМрд░рд╕рд╛рдВрд╕рд╣ рдорд╛рдКрд╕рд╕рд╣ рд╣реЗ рдХрд░рдгреЗ рдлрд╛рд░рд╕реЗ рдорд╛рдирд╡реАрдп рдирд╛рд╣реА - рд╣реЗ рдЖрдореНрд╣реА рдПрдЕрд░рдлреНрд▓реЛрдХрдбреВрди рдЕрдкреЗрдХреНрд╖рд┐рдд рдирд╛рд╣реА. рд╕рд╛рд╣рдЬрд┐рдХрдЪ, рдЖрдордЪреНрдпрд╛рдХрдбреЗ рдореЛрдареНрдпрд╛ рдкреНрд░рдорд╛рдгрд╛рд╡рд░ рд╡рд┐рдирд╛рд╢ рдХрд░рдгрд╛рд░реА рд╢рд╕реНрддреНрд░реЗ рдЖрд╣реЗрдд: Browse/Task Instances
рдЪрд▓рд╛ рд╕рд░реНрд╡ рдХрд╛рд╣реА рдПрдХрд╛рдЪ рд╡реЗрд│реА рдирд┐рд╡рдбрд╛ рдЖрдгрд┐ рд╢реВрдиреНрдпрд╛рд╡рд░ рд░реАрд╕реЗрдЯ рдХрд░реВ, рдпреЛрдЧреНрдп рдЖрдпрдЯрдорд╡рд░ рдХреНрд▓рд┐рдХ рдХрд░рд╛:
рд╕рд╛рдлрд╕рдлрд╛рдИ рдХреЗрд▓реНрдпрд╛рдирдВрддрд░, рдЖрдордЪреНрдпрд╛ рдЯреЕрдХреНрд╕реА рдЕрд╢рд╛ рджрд┐рд╕рддрд╛рдд (рддреЗ рд╢реЗрдбреНрдпреВрд▓рд░ рд╢реЗрдбреНрдпреВрд▓ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЖрдзреАрдЪ рд╡рд╛рдЯ рдкрд╛рд╣рдд рдЖрд╣реЗрдд):
рдХрдиреЗрдХреНрд╢рди, рд╣реБрдХ рдЖрдгрд┐ рдЗрддрд░ рдЪрд▓
рдкреБрдвреАрд▓ рдбреАрдПрдЬреА рдкрд╛рд╣рдгреНрдпрд╛рдЪреА рд╡реЗрд│ рдЖрд▓реА рдЖрд╣реЗ, update_reports.py
:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""╨У╨╛╤Б╨┐╨╛╨┤╨░ ╤Е╨╛╤А╨╛╤И╨╕╨╡, ╨╛╤В╤З╨╡╤В╤Л ╨╛╨▒╨╜╨╛╨▓╨╗╨╡╨╜╤Л"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
╨Э╨░╤В╨░╤И, ╨┐╤А╨╛╤Б╤Л╨┐╨░╨╣╤Б╤П, ╨╝╤Л {{ dag.dag_id }} ╤Г╤А╨╛╨╜╨╕╨╗╨╕
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
рдкреНрд░рддреНрдпреЗрдХрд╛рдиреЗ рдХрдзреА рдЕрд╣рд╡рд╛рд▓ рдЕрдкрдбреЗрдЯ рдХреЗрд▓рд╛ рдЖрд╣реЗ рдХрд╛? рд╣реА рдкреБрдиреНрд╣рд╛ рддрд┐рдЪреА рдЖрд╣реЗ: рдбреЗрдЯрд╛ рдХреЛрдареВрди рдорд┐рд│рд╡рд╛рдпрдЪрд╛ рдпрд╛рдЪреА рд╕реНрддреНрд░реЛрддрд╛рдВрдЪреА рдпрд╛рджреА рдЖрд╣реЗ; рдХреБрдареЗ рдареЗрд╡рд╛рдпрдЪреЗ рдпрд╛рдЪреА рдпрд╛рджреА рдЖрд╣реЗ; рдЬреЗрд╡реНрд╣рд╛ рд╕рд░реНрд╡рдХрд╛рд╣реА рдШрдбрд▓реЗ рдХрд┐рдВрд╡рд╛ рддреБрдЯрд▓реЗ рддреЗрд╡реНрд╣рд╛ рд╣реЙрдВрдХ рдХрд░рдгреНрдпрд╛рд╕ рд╡рд┐рд╕рд░реВ рдирдХрд╛ (рдареАрдХ рдЖрд╣реЗ, рд╣реЗ рдЖрдкрд▓реНрдпрд╛рдмрджреНрджрд▓ рдирд╛рд╣реА, рдирд╛рд╣реА).
рдЪрд▓рд╛ рдкреБрдиреНрд╣рд╛ рдлрд╛рдЗрд▓рдордзреНрдпреЗ рдЬрд╛рдК рдЖрдгрд┐ рдирд╡реАрди рдЕрд╕реНрдкрд╖реНрдЯ рд╕рд╛рдордЧреНрд░реА рдкрд╛рд╣реВ:
from commons.operators import TelegramBotSendMessage
- рдЖрдореНрд╣рд╛рд▓рд╛ рдЖрдордЪреЗ рд╕реНрд╡рддрдГрдЪреЗ рдСрдкрд░реЗрдЯрд░ рдмрдирд╡рдгреНрдпрд╛рдкрд╛рд╕реВрди рдХрд╛рд╣реАрд╣реА рдкреНрд░рддрд┐рдмрдВрдзрд┐рдд рдХрд░рдд рдирд╛рд╣реА, рдЬреНрдпрд╛рдЪрд╛ рдЖрдореНрд╣реА рдЕрдирдмреНрд▓реЙрдХ рдХреЗрд▓реЗрд▓реЗ рд╕рдВрджреЗрд╢ рдкрд╛рдард╡рдгреНрдпрд╛рд╕рд╛рдареА рдПрдХ рдЫреЛрдЯрд╛ рдЖрд╡рд░рдг рдмрдирд╡реВрди рдлрд╛рдпрджрд╛ рдШреЗрддрд▓рд╛. (рдЖрдореНрд╣реА рдЦрд╛рд▓реА рдпрд╛ рдСрдкрд░реЗрдЯрд░рдмрджреНрджрд▓ рдЕрдзрд┐рдХ рдмреЛрд▓реВ);default_args={}
- dag рддреНрдпрд╛рдЪреНрдпрд╛ рд╕рд░реНрд╡ рдСрдкрд░реЗрдЯрд░рдирд╛ рд╕рдорд╛рди рдпреБрдХреНрддрд┐рд╡рд╛рдж рд╡рд┐рддрд░рд┐рдд рдХрд░реВ рд╢рдХрддреЗ;to='{{ var.value.all_the_kings_men }}'
- рдлреАрд▓реНрдбto
рдЖрдордЪреНрдпрд╛рдХрдбреЗ рд╣рд╛рд░реНрдбрдХреЛрдб рдирд╕реЗрд▓, рдкрд░рдВрддреБ рдЬрд┐рдиреНрдЬрд╛ рд╡рд╛рдкрд░реВрди рдбрд╛рдпрдиреЕрдорд┐рдХрд▓реА рд╡реНрдпреБрддреНрдкрдиреНрди рдХреЗрд▓реЗ рдЬрд╛рдИрд▓ рдЖрдгрд┐ рдИрдореЗрд▓рдЪреНрдпрд╛ рд╕реВрдЪреАрд╕рд╣ рдПрдХ рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓, рдЬреЗ рдореА рдХрд╛рд│рдЬреАрдкреВрд░реНрд╡рдХ рдареЗрд╡рд▓реЗ рдЖрд╣реЗAdmin/Variables
;trigger_rule=TriggerRule.ALL_SUCCESS
тАФ рдСрдкрд░реЗрдЯрд░ рд╕реБрд░реВ рдХрд░рдгреНрдпрд╛рдЪреА рдЕрдЯ. рдЖрдордЪреНрдпрд╛ рдмрд╛рдмрддреАрдд, рд╕рд░реНрд╡ рдЕрд╡рд▓рдВрдмрд┐рддреНрд╡ рдкреВрд░реНрдг рдЭрд╛рд▓реЗ рдЕрд╕рд▓реНрдпрд╛рд╕рдЪ рдкрддреНрд░ рдмреЙрд╕рдХрдбреЗ рдЬрд╛рдИрд▓ рдпрд╢рд╕реНрд╡реАрд░рд┐рддреНрдпрд╛;tg_bot_conn_id='tg_main'
- рдпреБрдХреНрддрд┐рд╡рд╛рджconn_id
рдЖрдореНрд╣реА рддрдпрд╛рд░ рдХреЗрд▓реЗрд▓реЗ рдХрдиреЗрдХреНрд╢рди рдЖрдпрдбреА рд╕реНрд╡реАрдХрд╛рд░рддреЛAdmin/Connections
;trigger_rule=TriggerRule.ONE_FAILED
- рдкрдбрд▓реЗрд▓реНрдпрд╛ рдХрд╛рд░реНрдпреЗ рдЕрд╕рд▓реНрдпрд╛рд╕рдЪ рдЯреЗрд▓рд┐рдЧреНрд░рд╛рдордордзреАрд▓ рд╕рдВрджреЗрд╢ рдЙрдбреВрди рдЬрд╛рддреАрд▓;task_concurrency=1
- рдЖрдореНрд╣реА рдПрдХрд╛ рдХрд╛рд░реНрдпрд╛рдЪреНрдпрд╛ рдЕрдиреЗрдХ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгреЗ рдПрдХрд╛рдЪ рд╡реЗрд│реА рд▓рд╛рдБрдЪ рдХрд░рдгреНрдпрд╛рд╕ рдкреНрд░рддрд┐рдмрдВрдзрд┐рдд рдХрд░рддреЛ. рдЕрдиреНрдпрдерд╛, рдЖрдореНрд╣рд╛рд▓рд╛ рдЕрдиреЗрдХрд╛рдВрдЪреЗ рдПрдХрд╛рдЪрд╡реЗрд│реА рдкреНрд░рдХреНрд╖реЗрдкрдг рдорд┐рд│реЗрд▓VerticaOperator
(рдПрдХрд╛ тАЛтАЛрдЯреЗрдмрд▓рдХрдбреЗ рдкрд╣рд╛рдд рдЖрд╣реЗ);report_update >> [email, tg]
- рд╕рд░реНрд╡VerticaOperator
рдпрд╛рдкреНрд░рдорд╛рдгреЗ рдкрддреНрд░реЗ рдЖрдгрд┐ рд╕рдВрджреЗрд╢ рдкрд╛рдард╡рд┐рдгреНрдпрд╛рдд рдПрдХрддреНрд░ рдпреЗрдгреЗ:
рдкрд░рдВрддреБ рдиреЛрдЯрд┐рдлрд╛рдпрд░ рдСрдкрд░реЗрдЯрд░реНрд╕рдЪреА рд▓реЙрдиреНрдЪ рдкрд░рд┐рд╕реНрдерд┐рддреА рднрд┐рдиреНрди рдЕрд╕рд▓реНрдпрд╛рдиреЗ, рдлрдХреНрдд рдПрдХрдЪ рдХрд╛рд░реНрдп рдХрд░реЗрд▓. рдЯреНрд░реА рд╡реНрд╣реНрдпреВрдордзреНрдпреЗ, рд╕рд░реНрд╡ рдХрд╛рд╣реА рдереЛрдбреЗ рдХрдореА рд╡реНрд╣рд┐рдЬреНрдпреБрдЕрд▓ рджрд┐рд╕рддреЗ:
рдореА рдпрд╛рдмрджреНрджрд▓ рдХрд╛рд╣реА рд╢рдмреНрдж рд╕рд╛рдВрдЧреЗрди рдореЕрдХреНрд░реЛ рдЖрдгрд┐ рддреНрдпрд╛рдВрдЪреЗ рдорд┐рддреНрд░ - рдЪрд▓.
рдореЕрдХреНрд░реЛ рд╣реЗ рдЬрд┐рдиреНрдЬрд╛ рдкреНрд▓реЗрд╕рд╣реЛрд▓реНрдбрд░ рдЖрд╣реЗрдд рдЬреЗ рдСрдкрд░реЗрдЯрд░ рд╡рд┐рддрд░реНрдХрд╛рдВрдордзреНрдпреЗ рд╡рд┐рд╡рд┐рдз рдЙрдкрдпреБрдХреНрдд рдорд╛рд╣рд┐рддреА рдмрджрд▓реВ рд╢рдХрддрд╛рдд. рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рдпрд╛рд╕рд╛рд░рдЦреЗ:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }}
рд╕рдВрджрд░реНрдн рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓рдЪреНрдпрд╛ рд╕рд╛рдордЧреНрд░реАрдордзреНрдпреЗ рд╡рд┐рд╕реНрддрд╛рд░рд┐рдд рд╣реЛрдИрд▓ execution_date
рд╕реНрд╡рд░реВрдкрд╛рдд YYYY-MM-DD
: 2020-07-14
. рд╕рд░реНрд╡реЛрддреНрдХреГрд╖реНрдЯ рднрд╛рдЧ рдЕрд╕рд╛ рдЖрд╣реЗ рдХреА рд╕рдВрджрд░реНрдн рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕ рдПрдХрд╛ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгрд╛рд╡рд░ (рдЯреНрд░реА рд╡реНрд╣реНрдпреВрдордзреАрд▓ рд╕реНрдХреНрд╡реЗрдЕрд░) рдиреЗрд▓ рдХреЗрд▓реЗ рдЬрд╛рддрд╛рдд рдЖрдгрд┐ рд░реАрд╕реНрдЯрд╛рд░реНрдЯ рдХреЗрд▓реНрдпрд╛рд╡рд░, рдкреНрд▓реЗрд╕рд╣реЛрд▓реНрдбрд░ рд╕рдорд╛рди рдореВрд▓реНрдпрд╛рдВрдордзреНрдпреЗ рд╡рд┐рд╕реНрддреГрдд рд╣реЛрддреАрд▓.
рдирд┐рдпреБрдХреНрдд рдХреЗрд▓реЗрд▓реА рдореВрд▓реНрдпреЗ рдкреНрд░рддреНрдпреЗрдХ рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгрд╛рд╡рд░ рдкреНрд░рд╕реНрддреБрдд рдмрдЯрдг рд╡рд╛рдкрд░реВрди рдкрд╛рд╣рд┐рд▓реА рдЬрд╛рдК рд╢рдХрддрд╛рдд. рдкрддреНрд░ рдкрд╛рдард╡рдгреНрдпрд╛рдЪреЗ рдХрд╛рд░реНрдп рдЕрд╕реЗ рдЖрд╣реЗ:
рдЖрдгрд┐ рдореНрд╣рдгреВрди рд╕рдВрджреЗрд╢ рдкрд╛рдард╡рдгреНрдпрд╛рдЪреНрдпрд╛ рдХрд╛рд░реНрдпрд╛рд╡рд░:
рдирд╡реАрдирддрдо рдЙрдкрд▓рдмреНрдз рдЖрд╡реГрддреНрддреАрд╕рд╛рдареА рдЕрдВрдЧрднреВрдд рдореЕрдХреНрд░реЛрдЪреА рд╕рдВрдкреВрд░реНрдг рдпрд╛рджреА рдпреЗрдереЗ рдЙрдкрд▓рдмреНрдз рдЖрд╣реЗ:
рд╢рд┐рд╡рд╛рдп, рдкреНрд▓рдЧрдЗрдирдЪреНрдпрд╛ рдорджрддреАрдиреЗ, рдЖрдореНрд╣реА рдЖрдордЪреЗ рд╕реНрд╡рддрдГрдЪреЗ рдореЕрдХреНрд░реЛ рдШреЛрд╖рд┐рдд рдХрд░реВ рд╢рдХрддреЛ, рдкрд░рдВрддреБ рддреА рджреБрд╕рд░реА рдЧреЛрд╖реНрдЯ рдЖрд╣реЗ.
рдкреВрд░реНрд╡рдирд┐рд░реНрдзрд╛рд░рд┐рдд рдЧреЛрд╖реНрдЯреАрдВрд╡реНрдпрддрд┐рд░рд┐рдХреНрдд, рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛ рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕рдЪреА рдореВрд▓реНрдпреЗ рдмрджрд▓реВ рд╢рдХрддреЛ (рдореА рд╣реЗ рдЖрдзреАрдЪ рд╡рд░реАрд▓ рдХреЛрдбрдордзреНрдпреЗ рд╡рд╛рдкрд░рд▓реЗ рдЖрд╣реЗ). рдЪреНрдпрд╛ рдордзреНрдпреЗ рддрдпрд╛рд░ рдХрд░реВ Admin/Variables
рдХрд╛рд╣реА рдЧреЛрд╖реНрдЯреА:
рдЖрдкрдг рд╡рд╛рдкрд░реВ рд╢рдХрддрд╛ рд╕рд░реНрд╡рдХрд╛рд╣реА:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
рдореВрд▓реНрдп рд╕реНрдХреЗрд▓рд░ рдЕрд╕реВ рд╢рдХрддреЗ рдХрд┐рдВрд╡рд╛ рддреЗ JSON рджреЗрдЦреАрд▓ рдЕрд╕реВ рд╢рдХрддреЗ. JSON рдЪреНрдпрд╛ рдмрд╛рдмрддреАрдд:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}
рдлрдХреНрдд рдЗрдЪреНрдЫрд┐рдд рдХреАрдЪрд╛ рдорд╛рд░реНрдЧ рд╡рд╛рдкрд░рд╛: {{ var.json.bot_config.bot.token }}
.
рдореА рдЕрдХреНрд╖рд░рд╢рдГ рдПрдХ рд╢рдмреНрдж рдмреЛрд▓реЗрди рдЖрдгрд┐ рдПрдХ рд╕реНрдХреНрд░реАрдирд╢реЙрдЯ рджрд╛рдЦрд╡реЗрди рдХрдиреЗрдХреНрд╢рди. рдпреЗрдереЗ рд╕рд░реНрд╡ рдХрд╛рд╣реА рдкреНрд░рд╛рдердорд┐рдХ рдЖрд╣реЗ: рдкреГрд╖реНрдард╛рд╡рд░ Admin/Connections
рдЖрдореНрд╣реА рдПрдХ рдХрдиреЗрдХреНрд╢рди рддрдпрд╛рд░ рдХрд░рддреЛ, рдЖрдордЪреЗ рд▓реЙрдЧрд┐рди / рдкрд╛рд╕рд╡рд░реНрдб рдЖрдгрд┐ рдЕрдзрд┐рдХ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдкреЕрд░рд╛рдореАрдЯрд░реНрд╕ рддреЗрдереЗ рдЬреЛрдбрддреЛ. рдпрд╛рдкреНрд░рдорд╛рдгреЗ:
рдкрд╛рд╕рд╡рд░реНрдб рдПрдиреНрдХреНрд░рд┐рдкреНрдЯ рдХреЗрд▓реЗ рдЬрд╛рдК рд╢рдХрддрд╛рдд (рдбреАрдлреЙрд▓реНрдЯрдкреЗрдХреНрд╖рд╛ рдЕрдзрд┐рдХ рдкреВрд░реНрдгрдкрдгреЗ), рдХрд┐рдВрд╡рд╛ рддреБрдореНрд╣реА рдХрдиреЗрдХреНрд╢рди рдкреНрд░рдХрд╛рд░ рд╕реЛрдбреВ рд╢рдХрддрд╛ (рдЬрд╕реЗ рдореА рдХреЗрд▓реЗ tg_main
) - рд╡рд╕реНрддреБрд╕реНрдерд┐рддреА рдЕрд╢реА рдЖрд╣реЗ рдХреА рдкреНрд░рдХрд╛рд░рд╛рдВрдЪреА рдпрд╛рджреА рдПрдЕрд░рдлреНрд▓реЛ рдореЙрдбреЗрд▓реНрд╕рдордзреНрдпреЗ рд╣рд╛рд░реНрдбрд╡рд╛рдпрд░ рдХреЗрд▓реЗрд▓реА рдЖрд╣реЗ рдЖрдгрд┐ рд╕реНрддреНрд░реЛрдд рдХреЛрдбрдордзреНрдпреЗ рдкреНрд░рд╡реЗрд╢ рдХреЗрд▓реНрдпрд╛рд╢рд┐рд╡рд╛рдп рд╡рд┐рд╕реНрддрд╛рд░рд┐рдд рдХреЗрд▓реА рдЬрд╛рдК рд╢рдХрдд рдирд╛рд╣реА (рдЕрдЪрд╛рдирдХ рдореА рдХрд╛рд╣реАрддрд░реА рдЧреБрдЧрд▓ рдХреЗрд▓реЗ рдирд╛рд╣реА рддрд░, рдХреГрдкрдпрд╛ рдорд▓рд╛ рджреБрд░реБрд╕реНрдд рдХрд░рд╛), рдкрд░рдВрддреБ рдХрд╛рд╣реАрд╣реА рдЖрдореНрд╣рд╛рд▓рд╛ рдХреНрд░реЗрдбрд┐рдЯреНрд╕ рдорд┐рд│рдгреНрдпрд╛рдкрд╛рд╕реВрди рд░реЛрдЦрдгрд╛рд░ рдирд╛рд╣реА рдирд╛рд╡
рдЖрдкрдг рдПрдХрд╛рдЪ рдирд╛рд╡рд╛рдиреЗ рдЕрдиреЗрдХ рдХрдиреЗрдХреНрд╢рди рджреЗрдЦреАрд▓ рдХрд░реВ рд╢рдХрддрд╛: рдпрд╛ рдкреНрд░рдХрд░рдгрд╛рдд, рдкрджреНрдзрдд BaseHook.get_connection()
, рдЬреЗ рдЖрдореНрд╣рд╛рд▓рд╛ рдирд╛рд╡рд╛рдиреЗ рдХрдиреЗрдХреНрд╢рди рдорд┐рд│рд╡рддреЗ, рджреЗрдИрд▓ рдпрд╛рджреГрдЪреНрдЫрд┐рдХ рдЕрдиреЗрдХ рдирд╛рд╡рд╛рдВрдордзреВрди (рд░рд╛рдКрдВрдб рд░реЙрдмрд┐рди рдмрдирд╡рдгреЗ рдЕрдзрд┐рдХ рддрд░реНрдХрд╕рдВрдЧрдд рдЕрд╕реЗрд▓, рдкрд░рдВрддреБ рдПрдЕрд░рдлреНрд▓реЛ рдбреЗрд╡реНрд╣рд▓рдкрд░рдЪреНрдпрд╛ рд╡рд┐рд╡реЗрдХрд╛рд╡рд░ рд╕реЛрдбреВрдпрд╛).
рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕ рдЖрдгрд┐ рдХрдиреЗрдХреНрд╢рди рд╣реА рдирдХреНрдХреАрдЪ рдЫрд╛рди рд╕рд╛рдзрдиреЗ рдЖрд╣реЗрдд, рдкрд░рдВрддреБ рд╢рд┐рд▓реНрд▓рдХ рди рдЧрдорд╛рд╡рдгреЗ рдорд╣рддреНрд╡рд╛рдЪреЗ рдЖрд╣реЗ: рддреБрдордЪреНрдпрд╛ рдкреНрд░рд╡рд╛рд╣рд╛рдЪреЗ рдХреЛрдгрддреЗ рднрд╛рдЧ рддреБрдореНрд╣реА рдХреЛрдбрдордзреНрдпреЗрдЪ рд╕рд╛рдард╡рддрд╛ рдЖрдгрд┐ рд╕реНрдЯреЛрд░реЗрдЬрд╕рд╛рдареА рддреБрдореНрд╣реА рдПрдЕрд░рдлреНрд▓реЛрд▓рд╛ рдХреЛрдгрддреЗ рднрд╛рдЧ рджреЗрддрд╛. рдПрдХреАрдХрдбреЗ, UI рджреНрд╡рд╛рд░реЗ рддреНрд╡рд░реАрдд рдореВрд▓реНрдп рдмрджрд▓рдгреЗ рд╕реЛрдпреАрдЪреЗ рдЕрд╕реВ рд╢рдХрддреЗ, рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рдореЗрд▓рд┐рдВрдЧ рдмреЙрдХреНрд╕. рджреБрд╕рд░реАрдХрдбреЗ, рд╣реЗ рдЕрдЬреВрдирд╣реА рдорд╛рдКрд╕ рдХреНрд▓рд┐рдХрд╡рд░ рдкрд░рдд рдпреЗрдгреЗ рдЖрд╣реЗ, рдЬреНрдпрд╛рдкрд╛рд╕реВрди рдЖрдореНрд╣рд╛рд▓рд╛ (рдореА) рд╕реБрдЯрдХрд╛ рд╣рд╡реА рд╣реЛрддреА.
рдХрдиреЗрдХреНрд╢рдирд╕рд╣ рдХрд╛рд░реНрдп рдХрд░рдгреЗ рд╣реЗ рдХрд╛рд░реНрдпрд╛рдВрдкреИрдХреА рдПрдХ рдЖрд╣реЗ рд╣реБрдХ. рд╕рд░реНрд╡рд╕рд╛рдзрд╛рд░рдгрдкрдгреЗ, рдПрдЕрд░рдлреНрд▓реЛ рд╣реБрдХ рд╣реЗ рддреГрддреАрдп-рдкрдХреНрд╖ рд╕реЗрд╡рд╛ рдЖрдгрд┐ рд▓рд╛рдпрдмреНрд░рд░реАрд╢реА рдХрдиреЗрдХреНрдЯ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдкреЙрдЗрдВрдЯ рдЖрд╣реЗрдд. рдЙрджрд╛. JiraHook
рдЬрд┐рд░рд╛рд╢реА рд╕рдВрд╡рд╛рдж рд╕рд╛рдзрдгреНрдпрд╛рд╕рд╛рдареА рдЖрдордЪреНрдпрд╛рд╕рд╛рдареА рдХреНрд▓рд╛рдпрдВрдЯ рдЙрдШрдбреЗрд▓ (рддреБрдореНрд╣реА рдХрд╛рдореЗ рдкреБрдвреЗ-рдорд╛рдЧреЗ рд╣рд▓рд╡реВ рд╢рдХрддрд╛), рдЖрдгрд┐ рддреНрдпрд╛рдЪреНрдпрд╛ рдорджрддреАрдиреЗ SambaHook
рддреБрдореНрд╣реА рд╕реНрдерд╛рдирд┐рдХ рдлрд╛рдЗрд▓ рдкреБрд╢ рдХрд░реВ рд╢рдХрддрд╛ smb
-рдмрд┐рдВрджреВ.
рд╕рд╛рдиреБрдХреВрд▓ рдСрдкрд░реЗрдЯрд░рдЪреЗ рд╡рд┐рд╢реНрд▓реЗрд╖рдг
рдЖрдгрд┐ рдЖрдореНрд╣реА рддреЗ рдХрд╕реЗ рдмрдирд╡рд▓реЗ рдЖрд╣реЗ рддреЗ рдкрд╛рд╣рдгреНрдпрд╛рдЪреНрдпрд╛ рдЬрд╡рд│ рдкреЛрд╣реЛрдЪрд▓реЛ TelegramBotSendMessage
рдХреЛрдб commons/operators.py
рд╡рд╛рд╕реНрддрд╡рд┐рдХ рдСрдкрд░реЗрдЯрд░рд╕рд╣:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
рдпреЗрдереЗ, рдПрдЕрд░рдлреНрд▓реЛрдордзреАрд▓ рдЗрддрд░ рд╕рд░реНрд╡ рдЧреЛрд╖реНрдЯреАрдВрдкреНрд░рдорд╛рдгреЗ, рд╕рд░реНрд╡рдХрд╛рд╣реА рдЕрдЧрджреА рд╕реЛрдкреЗ рдЖрд╣реЗ:
- рдХрдбреВрди рд╡рд╛рд░рд╕рд╛ рдорд┐рд│рд╛рд▓рд╛
BaseOperator
, рдЬреЗ рдХрд╛рд╣реА рдПрдЕрд░рдлреНрд▓реЛ-рд╡рд┐рд╢рд┐рд╖реНрдЯ рдЧреЛрд╖реНрдЯреА рд▓рд╛рдЧреВ рдХрд░рддреЗ (рддреБрдордЪреНрдпрд╛ рд╡рд┐рд╢реНрд░рд╛рдВрддреАрдХрдбреЗ рдкрд╣рд╛) - рдШреЛрд╖рд┐рдд рдлреАрд▓реНрдб
template_fields
, рдЬреНрдпрд╛рдордзреНрдпреЗ Jinja рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдореЕрдХреНрд░реЛ рд╢реЛрдзреЗрд▓. - рд╕рд╛рдареА рдпреЛрдЧреНрдп рдпреБрдХреНрддрд┐рд╡рд╛рдж рдорд╛рдВрдбрд▓реЗ
__init__()
, рдЖрд╡рд╢реНрдпрдХ рддреЗрдереЗ рдбреАрдлреЙрд▓реНрдЯ рд╕реЗрдЯ рдХрд░рд╛. - рдЖрдореНрд╣реА рдкреВрд░реНрд╡рдЬрд╛рдВрдЪреНрдпрд╛ рдкреНрд░рд╛рд░рдВрднрд╛рдмрджреНрджрд▓ рджреЗрдЦреАрд▓ рд╡рд┐рд╕рд░рд▓реЛ рдирд╛рд╣реА.
- рд╕рдВрдмрдВрдзрд┐рдд рд╣реБрдХ рдЙрдШрдбрд▓рд╛
TelegramBotHook
рддреНрдпрд╛рддреВрди рдХреНрд▓рд╛рдпрдВрдЯ рдСрдмреНрдЬреЗрдХреНрдЯ рдкреНрд░рд╛рдкреНрдд рдЭрд╛рд▓рд╛. - рдЕрдзрд┐рд▓рд┐рдЦрд┐рдд (рдкреБрдиреНрд╣рд╛ рдкрд░рд┐рднрд╛рд╖рд┐рдд) рдкрджреНрдзрдд
BaseOperator.execute()
, рдЬреЗрд╡реНрд╣рд╛ рдСрдкрд░реЗрдЯрд░ рд▓реЙрдиреНрдЪ рдХрд░рдгреНрдпрд╛рдЪреА рд╡реЗрд│ рдпреЗрдИрд▓ рддреЗрд╡реНрд╣рд╛ рдХреЛрдгрддрд╛ Airfow рдЯреНрд╡рд┐рдЪ рдХрд░реЗрд▓ - рддреНрдпрд╛рдд рдЖрдореНрд╣реА рд▓реЙрдЧ рдЗрди рдХрд░рдгреЗ рд╡рд┐рд╕рд░реВрди рдореБрдЦреНрдп рдХреНрд░рд┐рдпрд╛ рдЕрдВрдорд▓рд╛рдд рдЖрдгреВ. (рдЖрдореНрд╣реА рд▓реЙрдЧ рдЗрди рдХрд░рддреЛ, рддрд╕реЗ, рдЕрдЧрджреА рдЖрддstdout
╨╕stderr
- рдПрдЕрд░рдлреНрд▓реЛ рд╕рд░реНрд╡рдХрд╛рд╣реА рд░реЛрдЦреЗрд▓, рддреЗ рд╕реБрдВрджрд░рдкрдгреЗ рдЧреБрдВрдбрд╛рд│реЗрд▓, рдЖрд╡рд╢реНрдпрдХ рдЕрд╕реЗрд▓ рддреЗрдереЗ рддреЗ рд╡рд┐рдШрдЯрд┐рдд рдХрд░реЗрд▓.)
рдЖрдкрдг рдХрд╛рдп рдЖрд╣реЗ рддреЗ рдкрд╛рд╣реВрдпрд╛ commons/hooks.py
. рдлрд╛рдЗрд▓рдЪрд╛ рдкрд╣рд┐рд▓рд╛ рднрд╛рдЧ, рд╣реБрдХрд╕рд╣:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
рдорд▓рд╛ рдпреЗрдереЗ рдХрд╛рдп рд╕реНрдкрд╖реНрдЯ рдХрд░рд╛рд╡реЗ рд╣реЗ рджреЗрдЦреАрд▓ рдорд╛рд╣рд┐рдд рдирд╛рд╣реА, рдореА рдлрдХреНрдд рдорд╣рддреНрд╡рд╛рдЪреЗ рдореБрджреНрджреЗ рд▓рдХреНрд╖рд╛рдд рдареЗрд╡рддреЛ:
- рдЖрдореНрд╣реА рд╡рд╛рд░рд╕рд╛ рдШреЗрддреЛ, рд╡рд┐рддрд░реНрдХрд╛рдВрдЪрд╛ рд╡рд┐рдЪрд╛рд░ рдХрд░рд╛ - рдмрд╣реБрддреЗрдХ рдкреНрд░рдХрд░рдгрд╛рдВрдордзреНрдпреЗ рддреЗ рдПрдХ рдЕрд╕реЗрд▓:
conn_id
; - рдорд╛рдирдХ рдкрджреНрдзрддреА рдУрд╡реНрд╣рд░рд░рд╛рдЗрдб рдХрд░рдгреЗ: рдореА рд╕реНрд╡рддрдГрд▓рд╛ рдорд░реНрдпрд╛рджрд┐рдд рдХреЗрд▓реЗ
get_conn()
, рдЬреНрдпрд╛рдордзреНрдпреЗ рдорд▓рд╛ рдирд╛рд╡рд╛рдиреБрд╕рд╛рд░ рдХрдиреЗрдХреНрд╢рди рдкреЕрд░рд╛рдореАрдЯрд░реНрд╕ рдорд┐рд│рддрд╛рдд рдЖрдгрд┐ рдлрдХреНрдд рд╡рд┐рднрд╛рдЧ рдорд┐рд│рддреЛextra
(рд╣реЗ рдПрдХ JSON рдлреАрд▓реНрдб рдЖрд╣реЗ), рдЬреНрдпрд╛рдордзреНрдпреЗ рдореА (рдорд╛рдЭреНрдпрд╛ рд╕реНрд╡рддрдГрдЪреНрдпрд╛ рд╕реВрдЪрдирд╛рдВрдиреБрд╕рд╛рд░!) рдЯреЗрд▓реАрдЧреНрд░рд╛рдо рдмреЙрдЯ рдЯреЛрдХрди рдареЗрд╡рд▓реЗ рдЖрд╣реЗ:{"bot_token": "YOuRAwEsomeBOtToKen"}
. - рдореА рдЖрдордЪреЗ рдПрдХ рдЙрджрд╛рд╣рд░рдг рддрдпрд╛рд░ рдХрд░рддреЛ
TelegramBot
, рддреНрдпрд╛рд▓рд╛ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдЯреЛрдХрди рджреЗрдКрди.
рдЗрддрдХрдВрдЪ. рдЖрдкрдг рд╡рд╛рдкрд░реВрди рд╣реБрдХ рдкрд╛рд╕реВрди рдХреНрд▓рд╛рдпрдВрдЯ рдорд┐рд│рд╡реВ рд╢рдХрддрд╛ TelegramBotHook().clent
рдХрд┐рдВрд╡рд╛ TelegramBotHook().get_conn()
.
рдЖрдгрд┐ рдлрд╛рдЗрд▓рдЪрд╛ рджреБрд╕рд░рд╛ рднрд╛рдЧ, рдЬреНрдпрд╛рдордзреНрдпреЗ рдореА рдЯреЗрд▓реАрдЧреНрд░рд╛рдо REST API рд╕рд╛рдареА рдорд╛рдпрдХреНрд░реЛрд░реЕрдкрд░ рдмрдирд╡рддреЛ, рдЬреЗрдгреЗрдХрд░реВрди рддреЗ рдбреНрд░реЕрдЧ рд╣реЛрдК рдирдпреЗ. python-telegram-bot
sendMessage
.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))
рд╣реЗ рд╕рд░реНрд╡ рдЬреЛрдбрдгреНрдпрд╛рдЪрд╛ рдпреЛрдЧреНрдп рдорд╛рд░реНрдЧ рдЖрд╣реЗ:
TelegramBotSendMessage
,TelegramBotHook
,TelegramBot
- рдкреНрд▓рдЧрдЗрдирдордзреНрдпреЗ, рд╕рд╛рд░реНрд╡рдЬрдирд┐рдХ рднрд╛рдВрдбрд╛рд░рд╛рдд рдареЗрд╡рд╛ рдЖрдгрд┐ рддреЗ рдореБрдХреНрдд рд╕реНрддреНрд░реЛрддрд╛рд▓рд╛ рджреНрдпрд╛.
рдЖрдореНрд╣реА рдпрд╛ рд╕рд░реНрд╡рд╛рдВрдЪрд╛ рдЕрднреНрдпрд╛рд╕ рдХрд░рдд рдЕрд╕рддрд╛рдирд╛, рдЖрдордЪреЗ рдЕрд╣рд╡рд╛рд▓ рдЕрдкрдбреЗрдЯ рдпрд╢рд╕реНрд╡реАрд░рд┐рддреНрдпрд╛ рдЕрдпрд╢рд╕реНрд╡реА рдЭрд╛рд▓реЗ рдЖрдгрд┐ рдорд▓рд╛ рдЪреЕрдиреЗрд▓рдордзреНрдпреЗ рдПрдХ рддреНрд░реБрдЯреА рд╕рдВрджреЗрд╢ рдкрд╛рдард╡рд▓рд╛. рддреЗ рдЪреБрдХреАрдЪреЗ рдЖрд╣реЗ рдХрд╛ рддреЗ рдореА рддрдкрд╛рд╕рдгрд╛рд░ рдЖрд╣реЗ...
рдЖрдордЪреНрдпрд╛ рдХреБрддреНрд░реНрдпрд╛рдд рдХрд╛рд╣реАрддрд░реА рддреБрдЯрд▓реЗ! рдЖрдореНрд╣рд╛рд▓рд╛ рддреЗрдЪ рдЕрдкреЗрдХреНрд╖рд┐рдд рд╣реЛрддрдВ рдирд╛? рдирдХреНрдХреА!
рдУрддрдгрд╛рд░ рдЖрд╣рд╛рдд рдХрд╛?
рдорд╛рдЭреЗ рдХрд╛рд╣реА рдЪреБрдХрд▓реЗ рдЕрд╕реЗ рддреБрдореНрд╣рд╛рд▓рд╛ рд╡рд╛рдЯрддреЗ рдХрд╛? рдЕрд╕реЗ рджрд┐рд╕рддреЗ рдХреА рддреНрдпрд╛рдиреЗ рдПрд╕рдХреНрдпреВрдПрд▓ рд╕рд░реНрд╡реНрд╣рд░рд╡рд░реВрди рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛рдордзреНрдпреЗ рдбреЗрдЯрд╛ рд╣рд╕реНрддрд╛рдВрддрд░рд┐рдд рдХрд░рдгреНрдпрд╛рдЪреЗ рд╡рдЪрди рджрд┐рд▓реЗ рд╣реЛрддреЗ, рдЖрдгрд┐ рдирдВрддрд░ рддреНрдпрд╛рдиреЗ рддреЛ рдШреЗрддрд▓рд╛ рдЖрдгрд┐ рд╡рд┐рд╖рдп рд╕реЛрдбреВрди рджрд┐рд▓рд╛, рдмрджрдорд╛рд╢!
рд╣рд╛ рдЕрддреНрдпрд╛рдЪрд╛рд░ рд╣реЗрддреБрдкреБрд░рд╕реНрд╕рд░ рд╣реЛрддрд╛, рдорд▓рд╛ рддреБрдордЪреНрдпрд╛рд╕рд╛рдареА рдХрд╛рд╣реА рд╢рдмреНрджрд╛рд╡рд▓реА рдЙрд▓рдЧрдбрд╛рдпрдЪреА рд╣реЛрддреА. рдЖрддрд╛ рдЖрдкрдг рдкреБрдвреЗ рдЬрд╛рдК рд╢рдХрддрд╛.
рдЖрдордЪреА рдпреЛрдЬрдирд╛ рдЕрд╢реА рд╣реЛрддреА:
- рдбреЗрдЧ рдХрд░рд╛
- рдХрд╛рд░реНрдпреЗ рд╡реНрдпреБрддреНрдкрдиреНрди рдХрд░рд╛
- рд╕рд░реНрд╡ рдХрд╛рд╣реА рдХрд┐рддреА рд╕реБрдВрджрд░ рдЖрд╣реЗ рддреЗ рдкрд╣рд╛
- рднрд░рдгреНрдпрд╛рд╕рд╛рдареА рд╕рддреНрд░ рдХреНрд░рдорд╛рдВрдХ рдирд┐рдпреБрдХреНрдд рдХрд░рд╛
- SQL рд╕рд░реНрд╡реНрд╣рд░ рд╡рд░реВрди рдбреЗрдЯрд╛ рдорд┐рд│рд╡рд╛
- рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛ рдордзреНрдпреЗ рдбреЗрдЯрд╛ рдареЗрд╡рд╛
- рдЖрдХрдбреЗрд╡рд╛рд░реА рдЧреЛрд│рд╛ рдХрд░рд╛
рдореНрд╣рдгреВрди, рд╣реЗ рд╕рд░реНрд╡ рдЪрд╛рд▓реВ рдареЗрд╡рдгреНрдпрд╛рд╕рд╛рдареА, рдореА рдЖрдордЪреНрдпрд╛рдордзреНрдпреЗ рдПрдХ рдЫреЛрдЯреАрд╢реА рднрд░ рдЯрд╛рдХрд▓реА docker-compose.yml
:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.py
рддреЗрдереЗ рдЖрдореНрд╣реА рд╡рд╛рдврд╡рддреЛ:
- рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛ рдпрдЬрдорд╛рди рдореНрд╣рдгреВрди
dwh
рд╕рд░реНрд╡рд╛рдд рдбреАрдлреЙрд▓реНрдЯ рд╕реЗрдЯрд┐рдВрдЧреНрдЬрд╕рд╣, - SQL рд╕рд░реНрд╡реНрд╣рд░рдЪреА рддреАрди рдЙрджрд╛рд╣рд░рдгреЗ,
- рдЖрдореНрд╣реА рдирдВрддрд░рдЪреЗ рдбреЗрдЯрд╛рдмреЗрд╕ рдХрд╛рд╣реА рдбреЗрдЯрд╛рд╕рд╣ рднрд░рддреЛ (рдХреЛрдгрддреНрдпрд╛рд╣реА рдкрд░рд┐рд╕реНрдерд┐рддреАрдд рдкрд╛рд╣реВ рдирдХрд╛
mssql_init.py
!)
рдЖрдореНрд╣реА рдорд╛рдЧреАрд▓ рд╡реЗрд│реЗрдкреЗрдХреНрд╖рд╛ рдХрд┐рдВрдЪрд┐рдд рдЕрдзрд┐рдХ рдХреНрд▓рд┐рд╖реНрдЯ рдХрдорд╛рдВрдбрдЪреНрдпрд╛ рдорджрддреАрдиреЗ рд╕рд░реНрд╡ рдЪрд╛рдВрдЧрд▓реЗ рд▓реЙрдиреНрдЪ рдХрд░рддреЛ:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
рдЖрдордЪреНрдпрд╛ рдЪрдорддреНрдХрд╛рд░реА рдпрд╛рджреГрдЪреНрдЫрд┐рдХрддреЗрдиреЗ рдХрд╛рдп рддрдпрд╛рд░ рдХреЗрд▓реЗ, рдЖрдкрдг рдЖрдпрдЯрдо рд╡рд╛рдкрд░реВ рд╢рдХрддрд╛ Data Profiling/Ad Hoc Query
:
рдореБрдЦреНрдп рдЧреЛрд╖реНрдЯ рд╡рд┐рд╢реНрд▓реЗрд╖рдХрд╛рдВрдирд╛ рджрд╛рдЦрд╡рдгреЗ рдирд╛рд╣реА
рдЪреНрдпрд╛ рд╡рд░ рддрдкрд╢реАрд▓рд╡рд╛рд░ ETL рд╕рддреНрд░реЗ рдореА рдХрд░рдгрд╛рд░ рдирд╛рд╣реА, рддрд┐рдереЗ рд╕рд░реНрд╡ рдХрд╛рд╣реА рдХреНрд╖реБрд▓реНрд▓рдХ рдЖрд╣реЗ: рдЖрдореНрд╣реА рдПрдХ рдЖрдзрд╛рд░ рдмрдирд╡рддреЛ, рддреНрдпрд╛рдд рдПрдХ рдЪрд┐рдиреНрд╣ рдЖрд╣реЗ, рдЖрдореНрд╣реА рдкреНрд░рддреНрдпреЗрдХ рдЧреЛрд╖реНрдЯ рд╕рдВрджрд░реНрдн рд╡реНрдпрд╡рд╕реНрдерд╛рдкрдХрд╛рд╕рд╣ рдЧреБрдВрдбрд╛рд│рддреЛ рдЖрдгрд┐ рдЖрддрд╛ рдЖрдореНрд╣реА рд╣реЗ рдХрд░рддреЛ:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
рд╡реЗрд│ рдЖрд▓реА рдЖрд╣реЗ рдЖрдордЪрд╛ рдбреЗрдЯрд╛ рдЧреЛрд│рд╛ рдХрд░рд╛ рдЖрдордЪреНрдпрд╛ рджреАрдбрд╢реЗ рдЯреЗрдмрд▓рд╛рдВрд╡рд░реВрди. рдЪрд▓рд╛ рд╣реЗ рдЕрдЧрджреА рдирдореНрд░ рдУрд│реАрдВрдЪреНрдпрд╛ рдорджрддреАрдиреЗ рдХрд░реВрдпрд╛:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
- рд╣реБрдХрдЪреНрдпрд╛ рдорджрддреАрдиреЗ рдЖрдореНрд╣реА рдПрдЕрд░рдлреНрд▓реЛрдордзреВрди рдорд┐рд│рд╡рддреЛ
pymssql
- рдХрдиреЗрдХреНрдЯ рдХрд░рд╛ - рдЪрд▓рд╛ рд╡рд┐рдирдВрддреАрдордзреНрдпреЗ рддрд╛рд░рдЦреЗрдЪреНрдпрд╛ рд░реВрдкрд╛рдд рдкреНрд░рддрд┐рдмрдВрдз рдмрджрд▓реВ - рддреЗ рдЯреЗрдореНрдкрд▓реЗрдЯ рдЗрдВрдЬрд┐рдирджреНрд╡рд╛рд░реЗ рдлрдВрдХреНрд╢рдирдордзреНрдпреЗ рдЯрд╛рдХрд▓реЗ рдЬрд╛рдИрд▓.
- рдЖрдордЪреНрдпрд╛ рд╡рд┐рдирдВрддреАрд▓рд╛ рдЖрд╣рд╛рд░ рджреЗрдгреЗ
pandas
рдХреЛрдг рдЖрдореНрд╣рд╛рд▓рд╛ рдорд┐рд│реЗрд▓DataFrame
- рднрд╡рд┐рд╖реНрдпрд╛рдд рддреЗ рдЖрдореНрд╣рд╛рд▓рд╛ рдЙрдкрдпреБрдХреНрдд рдард░реЗрд▓.
рдореА рдкреНрд░рддрд┐рд╕реНрдерд╛рдкрди рд╡рд╛рдкрд░рдд рдЖрд╣реЗ
{dt}
рд╡рд┐рдирдВрддреА рдкреЕрд░рд╛рдореАрдЯрд░ рдРрд╡рдЬреА%s
рдореА рдПрдХ рджреБрд╖реНрдЯ рдкрд┐рдиреЛрдЪрд┐рдпреЛ рдЖрд╣реЗ рдореНрд╣рдгреВрди рдирд╛рд╣реА рддрд░pandas
рд╣рд╛рддрд╛рд│реВ рд╢рдХрдд рдирд╛рд╣реАpymssql
рдЖрдгрд┐ рд╢реЗрд╡рдЯрдЪрд╛ рд╕рд░рдХрддреЛparams: List
рдЬрд░реА рддреНрдпрд╛рд▓рд╛ рдЦрд░реЛрдЦрд░ рд╣рд╡реЗ рдЖрд╣реЗtuple
.
рд╣реЗ рджреЗрдЦреАрд▓ рд▓рдХреНрд╖рд╛рдд рдареЗрд╡рд╛ рдХреА рд╡рд┐рдХрд╕рдХpymssql
рддреНрдпрд╛рд▓рд╛ рдпрд╛рдкреБрдвреЗ рдкрд╛рдард┐рдВрдмрд╛ рди рджреЗрдгреНрдпрд╛рдЪрд╛ рдирд┐рд░реНрдгрдп рдШреЗрддрд▓рд╛ рдЖрдгрд┐ рдЖрддрд╛ рдмрд╛рд╣реЗрд░ рдкрдбрдгреНрдпрд╛рдЪреА рд╡реЗрд│ рдЖрд▓реА рдЖрд╣реЗpyodbc
.
рдПрдЕрд░рдлреНрд▓реЛрдиреЗ рдЖрдордЪреНрдпрд╛ рдлрдВрдХреНрд╢рдиреНрд╕рдЪреЗ рдпреБрдХреНрддрд┐рд╡рд╛рдж рдХрд╢рд╛рд╕рд╣ рднрд░рд▓реЗ рддреЗ рдкрд╛рд╣реВрдпрд╛:
рдЬрд░ рдбреЗрдЯрд╛ рдирд╕реЗрд▓, рддрд░ рдкреБрдвреЗ рдЪрд╛рд▓реВ рдареЗрд╡рдгреНрдпрд╛рдд рдХрд╛рд╣реА рдЕрд░реНрде рдирд╛рд╣реА. рдкрд░рдВрддреБ рднрд░рдгреЗ рдпрд╢рд╕реНрд╡реА рдорд╛рдирдгреЗ рджреЗрдЦреАрд▓ рд╡рд┐рдЪрд┐рддреНрд░ рдЖрд╣реЗ. рдкрдг рд╣реА рдЪреВрдХ рдирд╛рд╣реА. рдП-рдЖрд╣-рдЖрд╣, рдХрд╛рдп рдХрд░рд╛рд╡реЗ ?! рдЖрдгрд┐ рдпреЗрдереЗ рдХрд╛рдп рдЖрд╣реЗ:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException
рдПрдЕрд░рдлреНрд▓реЛрд▓рд╛ рд╕рд╛рдВрдЧрддреЗ рдХреА рдХреЛрдгрддреНрдпрд╛рд╣реА рддреНрд░реБрдЯреА рдирд╛рд╣реАрдд, рдкрд░рдВрддреБ рдЖрдореНрд╣реА рдХрд╛рд░реНрдп рд╡рдЧрд│рддреЛ. рдЗрдВрдЯрд░рдлреЗрд╕рдордзреНрдпреЗ рд╣рд┐рд░рд╡рд╛ рдХрд┐рдВрд╡рд╛ рд▓рд╛рд▓ рдЪреМрд░рд╕ рдирд╕реВрди рдЧреБрд▓рд╛рдмреА рдЕрд╕реЗрд▓.
рдЪрд▓рд╛ рдЖрдордЪрд╛ рдбреЗрдЯрд╛ рдЯрд╛рдХреВрдпрд╛ рдПрдХрд╛рдзрд┐рдХ рд╕реНрддрдВрдн:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
рдореНрд╣рдгрдЬреЗ:
- рдЖрдореНрд╣реА рдЬреНрдпрд╛ рдбреЗрдЯрд╛рдмреЗрд╕рдордзреВрди рдСрд░реНрдбрд░ рдШреЗрддрд▓реА,
- рдЖрдордЪреНрдпрд╛ рдкреВрд░ рд╕рддреНрд░рд╛рдЪрд╛ рдЖрдпрдбреА (рддреЛ рд╡реЗрдЧрд│рд╛ рдЕрд╕реЗрд▓ рдкреНрд░рддреНрдпреЗрдХ рдХрд╛рдорд╛рд╕рд╛рдареА),
- рд╕реНрддреНрд░реЛрдд рдЖрдгрд┐ рдСрд░реНрдбрд░ рдЖрдпрдбреА рд╡рд░реВрди рд╣реЕрд╢ - рдЬреЗрдгреЗрдХрд░реВрди рдЕрдВрддрд┐рдо рдбреЗрдЯрд╛рдмреЗрд╕рдордзреНрдпреЗ (рдЬреЗрдереЗ рд╕рд░реНрд╡ рдХрд╛рд╣реА рдПрдХрд╛ рдЯреЗрдмрд▓рдордзреНрдпреЗ рдУрддрд▓реЗ рдЬрд╛рддреЗ) рдЖрдордЪреНрдпрд╛рдХрдбреЗ рдПрдХ рдЕрджреНрд╡рд┐рддреАрдп рдСрд░реНрдбрд░ рдЖрдпрдбреА рдЖрд╣реЗ.
рд╢реЗрд╡рдЯрдЪреА рдкрд╛рдпрд░реА рд╢рд┐рд▓реНрд▓рдХ рдЖрд╣реЗ: рд╕рд░реНрд╡ рдХрд╛рд╣реА рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛рдордзреНрдпреЗ рдШрд╛рд▓рд╛. рдЖрдгрд┐, рд╡рд┐рдЪрд┐рддреНрд░рдкрдгреЗ, рд╣реЗ рдХрд░рдгреНрдпрд╛рдЪрд╛ рд╕рд░реНрд╡рд╛рдд рдиреЗрддреНрд░рджреАрдкрдХ рдЖрдгрд┐ рдХрд╛рд░реНрдпрдХреНрд╖рдо рдорд╛рд░реНрдЧрд╛рдВрдкреИрдХреА рдПрдХ рдореНрд╣рдгрдЬреЗ CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- рдЖрдореНрд╣реА рдПрдХ рд╡рд┐рд╢реЗрд╖ рд░рд┐рд╕реАрд╡реНрд╣рд░ рдмрдирд╡рдд рдЖрд╣реЛрдд
StringIO
. pandas
рдХреГрдкрдпрд╛ рдЖрдордЪреНрдпрд╛ рдареЗрд╡реВDataFrame
рдлреЙрд░реНрдо рдордзреНрдпреЗCSV
-рд░реЗрд╖рд╛.- рдЪрд▓рд╛ рдЖрдордЪреНрдпрд╛ рдЖрд╡рдбрддреНрдпрд╛ рд╡реНрд╣рд░реНрдЯрд┐рдХрд╛рд▓рд╛ рд╣реБрдХрд╕рд╣ рдХрдиреЗрдХреНрд╢рди рдЙрдШрдбреВрдпрд╛.
- рдЖрдгрд┐ рдЖрддрд╛ рдорджрддреАрдиреЗ
copy()
рдЖрдордЪрд╛ рдбреЗрдЯрд╛ рдереЗрдЯ Vertika рд╡рд░ рдкрд╛рдард╡рд╛!
рдЖрдореНрд╣реА рдбреНрд░рд╛рдпрд╡реНрд╣рд░рдХрдбреВрди рдХрд┐рддреА рдУрд│реА рднрд░рд▓реНрдпрд╛ рдЖрд╣реЗрдд рддреЗ рдШреЗрдК рдЖрдгрд┐ рд╕рддреНрд░ рд╡реНрдпрд╡рд╕реНрдерд╛рдкрдХрд╛рд▓рд╛ рд╕рд╛рдВрдЧреВ рдХреА рд╕рд░реНрд╡ рдХрд╛рд╣реА рдареАрдХ рдЖрд╣реЗ:
session.loaded_rows = cursor.rowcount
session.successful = True
рдПрд╡рдвреЗрдЪ.
рд╡рд┐рдХреНрд░реАрд╡рд░, рдЖрдореНрд╣реА рд▓рдХреНрд╖реНрдп рдкреНрд▓реЗрдЯ рд╕реНрд╡рддрдГ рддрдпрд╛рд░ рдХрд░рддреЛ. рдпреЗрдереЗ рдореА рд╕реНрд╡рдд: рд▓рд╛ рдПрдХ рд▓рд╣рд╛рди рдорд╢реАрди рдкрд░рд╡рд╛рдирдЧреА рджрд┐рд▓реА:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
рдореА рд╡рд╛рдкрд░рдд рдЖрд╣реЗ
VerticaOperator()
рдореА рдбреЗрдЯрд╛рдмреЗрд╕ рд╕реНрдХреАрдорд╛ рдЖрдгрд┐ рдЯреЗрдмрд▓ рддрдпрд╛рд░ рдХрд░рддреЛ (рдЬрд░ рддреЗ рдЖрдзреАрдкрд╛рд╕реВрди рдЕрд╕реНрддрд┐рддреНрд╡рд╛рдд рдирд╕реЗрд▓ рддрд░ рдирдХреНрдХреАрдЪ). рдореБрдЦреНрдп рдЧреЛрд╖реНрдЯ рдореНрд╣рдгрдЬреЗ рдЕрд╡рд▓рдВрдмрдирд╛рдВрдЪреА рдпреЛрдЧреНрдпрд░рд┐рддреНрдпрд╛ рд╡реНрдпрд╡рд╕реНрдерд╛ рдХрд░рдгреЗ:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> load
рдЧреЛрд│рд╛ рдХрд░реАрдд рдЖрд╣реЗ
- рдмрд░рдВ, - рд▓рд╣рд╛рди рдЙрдВрджреАрд░ рдореНрд╣рдгрд╛рд▓рд╛, - рдЖрддрд╛ рдирд╛рд╣реА рдХрд╛
рдореА рдЬрдВрдЧрд▓рд╛рддреАрд▓ рд╕рд░реНрд╡рд╛рдд рднрдпрдВрдХрд░ рдкреНрд░рд╛рдгреА рдЖрд╣реЗ рдпрд╛рдЪреА рддреБрдореНрд╣рд╛рд▓рд╛ рдЦрд╛рддреНрд░реА рдкрдЯрд▓реА рдЖрд╣реЗ рдХрд╛?
рдЬреНрдпреБрд▓рд┐рдпрд╛ рдбреЛрдирд╛рд▓реНрдбрд╕рди, рдж рдЧреНрд░реБрдлреЗрд▓реЛ
рдорд▓рд╛ рд╡рд╛рдЯрддреЗ рдХреА рдорд╛рдЭреНрдпрд╛ рд╕рд╣рдХрд╛рд░реНтАНрдпрд╛рдВрдордзреНрдпреЗ рдЖрдгрд┐ рдорд╛рдЭреНрдпрд╛рдд рд╕реНрдкрд░реНрдзрд╛ рдЕрд╕реЗрд▓ рддрд░: рдХреЛрдг рдкрдЯрдХрди рд╕реБрд░рд╡рд╛рддреАрдкрд╛рд╕реВрди ETL рдкреНрд░рдХреНрд░рд┐рдпрд╛ рддрдпрд╛рд░ рдХрд░реЗрд▓ рдЖрдгрд┐ рд▓реЙрдиреНрдЪ рдХрд░реЗрд▓: рддреЗ рддреНрдпрд╛рдВрдЪреНрдпрд╛ SSIS рдЖрдгрд┐ рдорд╛рдЙрд╕рд╕рд╣ рдЖрдгрд┐ рдореА рдПрдЕрд░рдлреНрд▓реЛрд╕рд╣ ... рдЖрдгрд┐ рдордЧ рдЖрдореНрд╣реА рджреЗрдЦрднрд╛рд▓ рд╕реБрд▓рднрддреЗрдЪреА рддреБрд▓рдирд╛ рджреЗрдЦреАрд▓ рдХрд░реВ ... рд╡реНрд╡рд╛, рдорд▓рд╛ рд╡рд╛рдЯрддреЗ рдХреА рдореА рддреНрдпрд╛рдВрдирд╛ рд╕рд░реНрд╡ рдЖрдШрд╛рдбреНрдпрд╛рдВрд╡рд░ рдкрд░рд╛рднреВрдд рдХрд░реЗрди рд╣реЗ рддреБрдореНрд╣реА рдорд╛рдиреНрдп рдХрд░рд╛рд▓!
рдЬрд░ рдереЛрдбреЗ рдЕрдзрд┐рдХ рдЧрдВрднреАрд░рдкрдгреЗ рдкрд╛рд╣рд┐рд▓реЗ рддрд░, рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛ - рдкреНрд░реЛрдЧреНрд░рд╛рдо рдХреЛрдбрдЪреНрдпрд╛ рд░реВрдкрд╛рдд рдкреНрд░рдХреНрд░рд┐рдпрд╛рдВрдЪреЗ рд╡рд░реНрдгрди рдХрд░реВрди - рдорд╛рдЭреЗ рдХрд╛рдо рдХреЗрд▓реЗ рдЦреВрдк рдЕрдзрд┐рдХ рдЖрд░рд╛рдорджрд╛рдпрдХ рдЖрдгрд┐ рдЖрдирдВрджрджрд╛рдпрдХ.
рддреНрдпрд╛рдЪреА рдЕрдорд░реНрдпрд╛рджрд┐рдд рд╡рд┐рд╕реНрддрд╛рд░рдХреНрд╖рдорддрд╛, рдкреНрд▓рдЧ-рдЗрди рдЖрдгрд┐ рд╕реНрдХреЗрд▓реЗрдмрд┐рд▓рд┐рдЯреАрдЪреА рдкреВрд░реНрд╡рд╕реНрдерд┐рддреА рдпрд╛ рджреЛрдиреНрд╣реА рдмрд╛рдмрддреАрдд, рдЖрдкрд▓реНрдпрд╛рд▓рд╛ рдЬрд╡рд│рдЬрд╡рд│ рдХреЛрдгрддреНрдпрд╛рд╣реА рдХреНрд╖реЗрддреНрд░рд╛рдд рдПрдЕрд░рдлреНрд▓реЛ рд╡рд╛рдкрд░рдгреНрдпрд╛рдЪреА рд╕рдВрдзреА рджреЗрддреЗ: рдЕрдЧрджреА рдбреЗрдЯрд╛ рдЧреЛрд│рд╛ рдХрд░рдгреЗ, рддрдпрд╛рд░ рдХрд░рдгреЗ рдЖрдгрд┐ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХрд░рдгреЗ рдпрд╛ рд╕рдВрдкреВрд░реНрдг рдЪрдХреНрд░рд╛рдд, рдЕрдЧрджреА рд░реЙрдХреЗрдЯреНрд╕ (рдордВрдЧрд│рд╛рд╡рд░, рдЪреНрдпрд╛ рдкреНрд░рдХреНрд╖реЗрдкрдгрд╛рдд) рдЕрд░реНрдерд╛рдд).
рднрд╛рдЧ рдЕрдВрддрд┐рдо, рд╕рдВрджрд░реНрдн рдЖрдгрд┐ рдорд╛рд╣рд┐рддреА
рдЖрдореНрд╣реА рддреБрдордЪреНрдпрд╛рд╕рд╛рдареА рдЧреЛрд│рд╛ рдХреЗрд▓реЗрд▓рд╛ рд░реЗрдХ
start_date
. рд╣реЛрдп, рд╣реЗ рдЖрдзреАрдЪ рд╕реНрдерд╛рдирд┐рдХ рдореЗрдо рдЖрд╣реЗ. рд╡рд╛рдпрд╛ рдбрдЧрдЪрд╛ рдореБрдЦреНрдп рдпреБрдХреНрддрд┐рд╡рд╛рджstart_date
рд╕рд░реНрд╡ рдкрд╛рд╕. рдереЛрдбрдХреНрдпрд╛рдд, рдЖрдкрдг рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХреЗрд▓реНрдпрд╛рд╕start_date
рд╡рд░реНрддрдорд╛рди рддрд╛рд░реАрдЦ, рдЖрдгрд┐schedule_interval
- рдПрдХ рджрд┐рд╡рд╕, рдирдВрддрд░ рдбреАрдПрдЬреА рдЙрджреНрдпрд╛ рд╕реБрд░реВ рд╣реЛрдИрд▓.start_date = datetime(2020, 7, 7, 0, 1, 2)
рдЖрдгрд┐ рдЖрдгрдЦреА рд╕рдорд╕реНрдпрд╛ рдирд╛рд╣реАрдд.
рддреНрдпрд╛рдЪреНрдпрд╛рд╢реА рд╕рдВрдмрдВрдзрд┐рдд рдЖрдгрдЦреА рдПрдХ рд░рдирдЯрд╛рдЗрдо рддреНрд░реБрдЯреА рдЖрд╣реЗ:
Task is missing the start_date parameter
, рдЬреЗ рдмрд╣реБрддреЗрдХрджрд╛ рд╕реВрдЪрд┐рдд рдХрд░рддреЗ рдХреА рдЖрдкрдг рдбреЕрдЧ рдСрдкрд░реЗрдЯрд░рд▓рд╛ рдмрд╛рдВрдзрд╛рдпрд▓рд╛ рд╡рд┐рд╕рд░рд▓рд╛рдд.- рд╕рд░реНрд╡ рдПрдХрд╛рдЪ рдорд╢реАрдирд╡рд░. рд╣реЛрдп, рдЖрдгрд┐ рдмреЗрд╕ (рдПрдЕрд░рдлреНрд▓реЛ рд╕реНрд╡рддрдГ рдЖрдгрд┐ рдЖрдордЪреЗ рдХреЛрдЯрд┐рдВрдЧ), рдЖрдгрд┐ рдПрдХ рд╡реЗрдм рд╕рд░реНрд╡реНрд╣рд░, рдЖрдгрд┐ рд╢реЗрдбреНрдпреВрд▓рд░ рдЖрдгрд┐ рдХрд╛рдордЧрд╛рд░. рдЖрдгрд┐ рддреЗ рдХрд╛рдорд╣реА рдХреЗрд▓реЗ. рдкрд░рдВрддреБ рдХрд╛рд▓рд╛рдВрддрд░рд╛рдиреЗ, рд╕реЗрд╡рд╛рдВрд╕рд╛рдареА рдХрд╛рд░реНрдпрд╛рдВрдЪреА рд╕рдВрдЦреНрдпрд╛ рд╡рд╛рдврд▓реА, рдЖрдгрд┐ рдЬреЗрд╡реНрд╣рд╛ PostgreSQL рдиреЗ 20 ms рдРрд╡рдЬреА 5 s рдордзреНрдпреЗ рдирд┐рд░реНрджреЗрд╢рд╛рдВрдХрд╛рд▓рд╛ рдкреНрд░рддрд┐рд╕рд╛рдж рджреЗрдгреНрдпрд╛рд╕ рд╕реБрд░реБрд╡рд╛рдд рдХреЗрд▓реА, рддреЗрд╡реНрд╣рд╛ рдЖрдореНрд╣реА рддреЗ рдШреЗрддрд▓реЗ рдЖрдгрд┐ рджреВрд░ рдиреЗрд▓реЗ.
- LocalExecutor. рд╣реЛрдп, рдЖрдореНрд╣реА рдЕрдЬреВрдирд╣реА рддреНрдпрд╛рд╡рд░ рдмрд╕рд▓реЛ рдЖрд╣реЛрдд рдЖрдгрд┐ рдЖрдореНрд╣реА рдЖрдзреАрдЪ рдЕрдерд╛рдВрдЧ рдбреЛрд╣рд╛рдЪреНрдпрд╛ рдХрд╛рдард╛рд╡рд░ рдЖрд▓реЛ рдЖрд╣реЛрдд. LocalExecutor рдЖрддрд╛рдкрд░реНрдпрдВрдд рдЖрдордЪреНрдпрд╛рд╕рд╛рдареА рдкреБрд░реЗрд╕реЗ рдЖрд╣реЗ, рдкрд░рдВрддреБ рдЖрддрд╛ рдХрд┐рдорд╛рди рдПрдХрд╛ рдХрд╛рдордЧрд╛рд░рд╛рд╕рд╣ рд╡рд┐рд╕реНрддрд╛рд░рд┐рдд рдХрд░рдгреНрдпрд╛рдЪреА рд╡реЗрд│ рдЖрд▓реА рдЖрд╣реЗ рдЖрдгрд┐ рдЖрдореНрд╣рд╛рд▓рд╛ CeleryExecutor рд╡рд░ рдЬрд╛рдгреНрдпрд╛рд╕рд╛рдареА рдХрдареЛрд░ рдкрд░рд┐рд╢реНрд░рдо рдХрд░рд╛рд╡реЗ рд▓рд╛рдЧрддреАрд▓. рдЖрдгрд┐ рдЖрдкрдг рдПрдХрд╛ рдорд╢реАрдирд╡рд░ рддреНрдпрд╛рдЪреНрдпрд╛рд╕рд╣ рдХрд╛рд░реНрдп рдХрд░реВ рд╢рдХрддрд╛ рд╣реЗ рд▓рдХреНрд╖рд╛рдд рдШреЗрддрд╛, рд╕рд░реНрд╡реНрд╣рд░рд╡рд░ рджреЗрдЦреАрд▓ рд╕реЗрд▓реЗрд░реА рд╡рд╛рдкрд░рдгреНрдпрд╛рдкрд╛рд╕реВрди рдХрд╛рд╣реАрд╣реА рдерд╛рдВрдмрд╡рдд рдирд╛рд╣реА, рдЬреЗ "рдЕрд░реНрдерд╛рдд, рдкреНрд░рд╛рдорд╛рдгрд┐рдХрдкрдгреЗ рдХрдзреАрд╣реА рдЙрддреНрдкрд╛рджрдирд╛рдд рдЬрд╛рдгрд╛рд░ рдирд╛рд╣реА!"
- рди рд╡рд╛рдкрд░рдгреЗ рдЕрдВрдЧрднреВрдд рд╕рд╛рдзрдиреЗ:
- рдЬреЛрдбрдгреНрдпрд╛ рд╕реЗрд╡рд╛ рдХреНрд░реЗрдбреЗрдиреНрд╢рд┐рдпрд▓реНрд╕ рд╕рд╛рдард╡рдгреНрдпрд╛рд╕рд╛рдареА,
- SLA рдЪреБрдХрддреЛ рд╡реЗрд│реЗрд╡рд░ рдкреВрд░реНрдг рди рдЭрд╛рд▓реЗрд▓реНрдпрд╛ рдХрд╛рдорд╛рдВрдирд╛ рдкреНрд░рддрд┐рд╕рд╛рдж рджреЗрдгреЗ,
- xcom рдореЗрдЯрд╛рдбреЗрдЯрд╛ рдПрдХреНрд╕рдЪреЗрдВрдЬрд╕рд╛рдареА (рдореА рдореНрд╣рдгрд╛рд▓реЛ рдореЗрдЯрд╛рдбреЗрдЯрд╛!) рдбреЕрдЧ рдЯрд╛рд╕реНрдХ рджрд░рдореНрдпрд╛рди.
- рдореЗрд▓рдЪрд╛ рдЧреИрд░рд╡рд╛рдкрд░. рдмрд░рдВ, рдореА рдХрд╛рдп рд╕рд╛рдВрдЧреВ? рдкрдбрд▓реЗрд▓реНрдпрд╛ рдХрд╛рдорд╛рдВрдЪреНрдпрд╛ рд╕рд░реНрд╡ рдкреБрдирд░рд╛рд╡реГрддреНрддреАрд╕рд╛рдареА рдЕрд▓рд░реНрдЯ рд╕реЗрдЯ рдХреЗрд▓реЗ рдЧреЗрд▓реЗ. рдЖрддрд╛ рдорд╛рдЭреЗ рдХрд╛рд░реНрдп Gmail рдордзреНрдпреЗ Airflow рдХрдбреВрди >90k рдИрдореЗрд▓ рдЖрд╣реЗрдд рдЖрдгрд┐ рд╡реЗрдм рдореЗрд▓ рдереВрдерди рдПрдХрд╛ рд╡реЗрд│реА 100 рд╣реВрди рдЕрдзрд┐рдХ рдЙрдЪрд▓рдгреНрдпрд╛рд╕ рдЖрдгрд┐ рд╣рдЯрд╡рд┐рдгреНрдпрд╛рд╕ рдирдХрд╛рд░ рджреЗрддреЗ.
рдЖрдгрдЦреА рддреЛрдЯреЗ:
рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛ рдкрд┐рдЯрдлреЗрд▓реНрд╕
рдЕрдзрд┐рдХ рдСрдЯреЛрдореЗрд╢рди рд╕рд╛рдзрдиреЗ
рдЖрдкрд▓реНрдпрд╛ рд╣рд╛рддрд╛рдВрдиреА рдирд╡реНрд╣реЗ рддрд░ рдЖрдкрд▓реНрдпрд╛ рдбреЛрдХреНрдпрд╛рдиреЗ рдЕрдзрд┐рдХ рдХрд╛рд░реНрдп рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА, рдПрдЕрд░рдлреНрд▓реЛрдиреЗ рдЖрдкрд▓реНрдпрд╛рд╕рд╛рдареА рд╣реЗ рддрдпрд╛рд░ рдХреЗрд▓реЗ рдЖрд╣реЗ:
рдЖрд░рдИрдПрд╕рдЯреА API - рддреНрдпрд╛рдЪреНрдпрд╛рдХрдбреЗ рдЕрдЬреВрдирд╣реА рдкреНрд░рд╛рдпреЛрдЧрд┐рдХ рд╕реНрдерд┐рддреА рдЖрд╣реЗ, рдЬреА рддреНрдпрд╛рд▓рд╛ рдХрд╛рдо рдХрд░рдгреНрдпрд╛рдкрд╛рд╕реВрди рд░реЛрдЦрдд рдирд╛рд╣реА. рддреНрдпрд╛рджреНрд╡рд╛рд░реЗ, рддреБрдореНрд╣реА рдХреЗрд╡рд│ рдбреЕрдЧреНрдЬ рдЖрдгрд┐ рдЯрд╛рд╕реНрдХрдЪреА рдорд╛рд╣рд┐рддреА рдорд┐рд│рд╡реВ рд╢рдХрдд рдирд╛рд╣реА, рддрд░ рдбреЕрдЧ рдерд╛рдВрдмрд╡реВ/рд╕реБрд░реВ рдХрд░реВ рд╢рдХрддрд╛, рдбреАрдПрдЬреА рд░рди рдХрд┐рдВрд╡рд╛ рдкреВрд▓ рддрдпрд╛рд░ рдХрд░реВ рд╢рдХрддрд╛.CLI - рдХрдорд╛рдВрдб рд▓рд╛рдЗрдирджреНрд╡рд╛рд░реЗ рдЕрдиреЗрдХ рд╕рд╛рдзрдиреЗ рдЙрдкрд▓рдмреНрдз рдЖрд╣реЗрдд рдЬреА рдлрдХреНрдд WebUI рджреНрд╡рд╛рд░реЗ рд╡рд╛рдкрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЧреИрд░рд╕реЛрдпреАрдЪреА рдирд╛рд╣реАрдд, рдкрд░рдВрддреБ рд╕рд╛рдорд╛рдиреНрдпрддрдГ рдЕрдиреБрдкрд╕реНрдерд┐рдд рдЖрд╣реЗрдд. рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде:backfill
рдХрд╛рд░реНрдп рдЙрджрд╛рд╣рд░рдгреЗ рд░реАрд╕реНрдЯрд╛рд░реНрдЯ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.
рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рд╡рд┐рд╢реНрд▓реЗрд╖рдХ рдЖрд▓реЗ рдЖрдгрд┐ рдореНрд╣рдгрд╛рд▓реЗ: тАЬрдЖрдгрд┐ рдХреЙрдореНрд░реЗрдб, рддреБрдордЪреНрдпрд╛рдХрдбреЗ 1 рддреЗ 13 рдЬрд╛рдиреЗрд╡рд╛рд░реАрдЪреНрдпрд╛ рдбреЗрдЯрд╛рдордзреНрдпреЗ рдореВрд░реНрдЦрдкрдгрд╛ рдЖрд╣реЗ! рджреБрд░реБрд╕реНрдд рдХрд░рд╛, рджреБрд░реБрд╕реНрдд рдХрд░рд╛, рджреБрд░реБрд╕реНрдд рдХрд░рд╛, рджреБрд░реБрд╕реНрдд рдХрд░рд╛!" рдЖрдгрд┐ рдЖрдкрдг рдЕрд╕реЗ рд╣реЙрдм рдЖрд╣рд╛рдд:airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- рдмреЗрд╕ рд╕реЗрд╡рд╛:
initdb
,resetdb
,upgradedb
,checkdb
. run
, рдЬреЗ рддреБрдореНрд╣рд╛рд▓рд╛ рдПрдХ рдЙрджрд╛рд╣рд░рдг рдХрд╛рд░реНрдп рдЪрд╛рд▓рд╡рд┐рдгреНрдпрд╛рд╕ рдЖрдгрд┐ рд╕рд░реНрд╡ рдЕрд╡рд▓рдВрдмрдирд╛рдВрд╡рд░ рд╕реНрдХреЛрдЕрд░ рдХрд░рдгреНрдпрд╛рд╕ рдЕрдиреБрдорддреА рджреЗрддреЗ. рд╢рд┐рд╡рд╛рдп, рдЖрдкрдг рддреЗ рджреНрд╡рд╛рд░реЗ рдЪрд╛рд▓рд╡реВ рд╢рдХрддрд╛LocalExecutor
, рддреБрдордЪреНрдпрд╛рдХрдбреЗ рд╕реЗрд▓реЗрд░реА рдХреНрд▓рд╕реНрдЯрд░ рдЕрд╕рд▓рд╛ рддрд░реАрд╣реА.- рдЬрд╡рд│рдЬрд╡рд│ рд╕рдорд╛рди рдЧреЛрд╖реНрдЯ рдХрд░рддреЗ
test
, рдлрдХреНрдд рдмреЗрд╕ рдордзреНрдпреЗ рджреЗрдЦреАрд▓ рдХрд╛рд╣реАрд╣реА рд▓рд┐рд╣рд┐рдд рдирд╛рд╣реА. connections
рд╢реЗрд▓рдордзреВрди рдореЛрдареНрдпрд╛ рдкреНрд░рдорд╛рдгрд╛рдд рдХрдиреЗрдХреНрд╢рди рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рд╕ рдЕрдиреБрдорддреА рджреЗрддреЗ.
рдкрд╛рдпрдерди рдПрдкреАрдЖрдп - рд╕рдВрд╡рд╛рдж рд╕рд╛рдзрдгреНрдпрд╛рдЪрд╛ рдПрдХ рдРрд╡рдЬреА рд╣рд╛рд░реНрдбрдХреЛрд░ рдорд╛рд░реНрдЧ, рдЬреЛ рдкреНрд▓рдЧрдЗрдиреНрд╕рд╕рд╛рдареА рдЖрд╣реЗ рдЖрдгрд┐ рддреНрдпрд╛рдд рд▓рд╣рд╛рди рд╣рд╛рддрд╛рдВрдиреА рдЭреБрдВрдбреВ рджреЗрдд рдирд╛рд╣реА. рдкрдг рдЖрдореНрд╣рд╛рд▓рд╛ рдЬрд╛рдгреНрдпрд╛рдкрд╛рд╕реВрди рдХреЛрдг рд░реЛрдЦрдгрд╛рд░/home/airflow/dags
, рдзрд╛рд╡рдгреЗipython
рдЖрдгрд┐ рдЧреЛрдВрдзрд│ рд╕реБрд░реВ? рдЖрдкрдг, рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рдЦрд╛рд▓реАрд▓ рдХреЛрдбрд╕рд╣ рд╕рд░реНрд╡ рдХрдиреЗрдХреНрд╢рди рдирд┐рд░реНрдпрд╛рдд рдХрд░реВ рд╢рдХрддрд╛:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d)
- рдПрдЕрд░рдлреНрд▓реЛ рдореЗрдЯрд╛рдбреЗрдЯрд╛рдмреЗрд╕рд╢реА рдХрдиреЗрдХреНрдЯ рдХрд░рдд рдЖрд╣реЗ. рдореА рддреНрдпрд╛рд╡рд░ рд▓рд┐рд╣рд┐рдгреНрдпрд╛рдЪреА рд╢рд┐рдлрд╛рд░рд╕ рдХрд░рдд рдирд╛рд╣реА, рдкрд░рдВрддреБ рд╡рд┐рд╡рд┐рдз рд╡рд┐рд╢рд┐рд╖реНрдЯ рдореЗрдЯреНрд░рд┐рдХреНрд╕рд╕рд╛рдареА рдХрд╛рд░реНрдп рд╕реНрдерд┐рддреА рдорд┐рд│рд╡рдгреЗ рдХреЛрдгрддреНрдпрд╛рд╣реА API рд╡рд╛рдкрд░рдгреНрдпрд╛рдкреЗрдХреНрд╖рд╛ рдмрд░реЗрдЪ рдЬрд▓рдж рдЖрдгрд┐ рд╕реЛрдкреЗ рдЕрд╕реВ рд╢рдХрддреЗ.
рдЖрдкрдг рдЕрд╕реЗ рдореНрд╣рдгреВрдпрд╛ рдХреА рдЖрдкрд▓реА рд╕рд░реНрд╡ рдХрд╛рд░реНрдпреЗ рдЙрджрд╛рддреНрдд рдирд╕рддрд╛рдд, рдкрд░рдВрддреБ рддреА рдХрдзреАрдХрдзреА рдкрдбреВ рд╢рдХрддрд╛рдд рдЖрдгрд┐ рд╣реЗ рд╕рд╛рдорд╛рдиреНрдп рдЖрд╣реЗ. рдкрд░рдВрддреБ рдХрд╛рд╣реА рдЕрдбрдерд│реЗ рдЖрдзреАрдЪ рд╕рдВрд╢рдпрд╛рд╕реНрдкрдж рдЖрд╣реЗрдд рдЖрдгрд┐ рддреЗ рддрдкрд╛рд╕рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.
рдПрд╕рдХреНрдпреВрдПрд▓ рд╕рд╛рд╡рдз рд░рд╣рд╛!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
рд╕рдВрджрд░реНрдн
рдЖрдгрд┐ рдЕрд░реНрдерд╛рддрдЪ, Google рдЬрд╛рд░реА рдХреЗрд▓реНрдпрд╛рдкрд╛рд╕реВрди рдкрд╣рд┐рд▓реЗ рджрд╣рд╛ рджреБрд╡реЗ рдорд╛рдЭреНрдпрд╛ рдмреБрдХрдорд╛рд░реНрдХрдордзреАрд▓ рдПрдЕрд░рдлреНрд▓реЛ рдлреЛрд▓реНрдбрд░рдордзреАрд▓ рд╕рд╛рдордЧреНрд░реА рдЖрд╣реЗрдд.
рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛ рджрд╕реНрддрдРрд╡рдЬреАрдХрд░рдг - рдЕрд░реНрдерд╛рддрдЪ, рдЖрдкрдг рдХрд╛рд░реНрдпрд╛рд▓рдпрд╛рдкрд╛рд╕реВрди рд╕реБрд░реБрд╡рд╛рдд рдХреЗрд▓реА рдкрд╛рд╣рд┐рдЬреЗ. рджрд╕реНрддрдРрд╡рдЬреАрдХрд░рдг, рдкрдг рд╕реВрдЪрдирд╛ рдХреЛрдг рд╡рд╛рдЪрддреЛ?рдЪрд╛рдВрдЧрд▓рд╛ рд╕рд░рд╛рд╡ - рдмрд░рдВ, рдХрдореАрддрдХрдореА рдирд┐рд░реНрдорд╛рддреНрдпрд╛рдВрдЪреНрдпрд╛ рд╢рд┐рдлрд╛рд░рд╕реА рд╡рд╛рдЪрд╛.рдПрдЕрд░рдлреНрд▓реЛ UI - рдЕрдЧрджреА рд╕реБрд░реБрд╡рд╛рдд: рдЪрд┐рддреНрд░рд╛рдВрдордзреАрд▓ рд╡рд╛рдкрд░рдХрд░реНрддрд╛ рдЗрдВрдЯрд░рдлреЗрд╕Apache Airflow рдЪреНрдпрд╛ рдкреНрд░рдореБрдЦ рд╕рдВрдХрд▓реНрдкрдирд╛ рд╕рдордЬреВрди рдШреЗрдгреЗ - рдореВрд▓рднреВрдд рд╕рдВрдХрд▓реНрдкрдирд╛ рдЪрд╛рдВрдЧрд▓реНрдпрд╛ рдкреНрд░рдХрд╛рд░реЗ рд╡рд░реНрдгрди рдХреЗрд▓реНрдпрд╛ рдЖрд╣реЗрдд, рдЬрд░ (рдЕрдЪрд╛рдирдХ!) рддреБрдореНрд╣рд╛рд▓рд╛ рдорд╛рдЭреНрдпрд╛рдХрдбреВрди рдХрд╛рд╣реАрддрд░реА рд╕рдордЬрд▓реЗ рдирд╛рд╣реА.Tianlong's Blog тАФ рдПрдЕрд░рдлреНрд▓реЛ рд╕рд░реНрд╡реНрд╣рд░/рдХреНрд▓рд╕реНрдЯрд░ рдХрд╕реЗ рддрдпрд╛рд░ рдХрд░рд╛рд╡реЗ рдпрд╛рдмрджреНрджрд▓ рдорд╛рд░реНрдЧрджрд░реНрд╢рдХ - рдПрдЕрд░рдлреНрд▓реЛ рдХреНрд▓рд╕реНрдЯрд░ рд╕реЗрдЯ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдПрдХ рд▓рд╣рд╛рди рдорд╛рд░реНрдЧрджрд░реНрд╢рдХ.Lyft рд╡рд░ Apache Airflow рдЪрд╛рд▓реВ рдЖрд╣реЗ - рдЬрд╡рд│рдЬрд╡рд│ рд╕рдорд╛рди рдордиреЛрд░рдВрдЬрдХ рд▓реЗрдЦ, рдХрджрд╛рдЪрд┐рдд рдЕрдзрд┐рдХ рдФрдкрдЪрд╛рд░рд┐рдХрддрд╛ рдЖрдгрд┐ рдХрдореА рдЙрджрд╛рд╣рд░рдгреЗ рд╡рдЧрд│рддрд╛.рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛ рд╕реЗрд▓реЗрд░реА рдХрд╛рдордЧрд╛рд░рд╛рдВрд╡рд░ рдиреЛрдХрд▒реНрдпрд╛рдВрдЪреЗ рд╡рд┐рддрд░рдг рдХрд╕реЗ рдХрд░рддреЗ - рд╕реЗрд▓реЗрд░реАрдЪреНрдпрд╛ рд╕рдВрдпреЛрдЧрд╛рдиреЗ рдХрд╛рдо рдХрд░рдгреНрдпрд╛рдмрджреНрджрд▓.рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛрдордзреНрдпреЗ DAG рд▓реЗрдЦрди рд╕рд░реНрд╡реЛрддреНрддрдо рдкрджреНрдзрддреА - рдХрд╛рд░реНрдпрд╛рдВрдЪреА рдХреНрд╖рдорддрд╛, рддрд╛рд░рдЦреЗрдРрд╡рдЬреА рдЖрдпрдбреАрджреНрд╡рд╛рд░реЗ рд▓реЛрдб рдХрд░рдгреЗ, рдкрд░рд┐рд╡рд░реНрддрди, рдлрд╛рдЗрд▓ рд╕рдВрд░рдЪрдирд╛ рдЖрдгрд┐ рдЗрддрд░ рдордиреЛрд░рдВрдЬрдХ рдЧреЛрд╖реНрдЯреА.рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛрдордзреНрдпреЗ рдЕрд╡рд▓рдВрдмрд┐рддреНрд╡ рд╡реНрдпрд╡рд╕реНрдерд╛рдкрд┐рдд рдХрд░рдгреЗ - рдХрд╛рд░реНрдпрд╛рдВрдЪреЗ рдЕрд╡рд▓рдВрдмрди рдЖрдгрд┐ рдЯреНрд░рд┐рдЧрд░ рдирд┐рдпрдо, рдЬреНрдпрд╛рдЪрд╛ рдореА рдлрдХреНрдд рдкрд╛рд╕рд┐рдВрдЧрдордзреНрдпреЗ рдЙрд▓реНрд▓реЗрдЦ рдХреЗрд▓рд╛ рдЖрд╣реЗ.рдПрдЕрд░рдлреНрд▓реЛ: рдЬреЗрд╡реНрд╣рд╛ рддреБрдордЪрд╛ DAG рд╢реЗрдбреНрдпреВрд▓рдЪреНрдпрд╛ рдорд╛рдЧреЗ рдЕрд╕рддреЛ - рд╢реЗрдбреНрдпреБрд▓рд░рдордзреАрд▓ рдХрд╛рд╣реА "рдЙрджреНрджреЗрд╢рд╛рдиреБрд╕рд╛рд░ рдХрд╛рд░реНрдп" рдХрд╕реЗ рдорд╛рдд рдХрд░рд╛рд╡реЗ, рдЧрдорд╛рд╡рд▓реЗрд▓рд╛ рдбреЗрдЯрд╛ рд▓реЛрдб рдХрд░рд╛ рдЖрдгрд┐ рдХрд╛рд░реНрдпрд╛рдВрдирд╛ рдкреНрд░рд╛рдзрд╛рдиреНрдп рдХрд╕реЗ рджреНрдпрд╛.Apache Airflow рд╕рд╛рдареА рдЙрдкрдпреБрдХреНрдд SQL рдХреНрд╡реЗрд░реА тАФ рдПрдЕрд░рдлреНрд▓реЛ рдореЗрдЯрд╛рдбреЗрдЯрд╛рд▓рд╛ рдЙрдкрдпреБрдХреНрдд SQL рдХреНрд╡реЗрд░реА.Apache Airflow рд╕рд╣ рд╡рд░реНрдХрдлреНрд▓реЛ рд╡рд┐рдХрд╕рд┐рдд рдХрд░рдгреНрдпрд╛рд╕ рд╕реБрд░реБрд╡рд╛рдд рдХрд░рд╛ - рд╕рд╛рдиреБрдХреВрд▓ рд╕реЗрдиреНрд╕рд░ рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рдмрджреНрджрд▓ рдПрдХ рдЙрдкрдпреБрдХреНрдд рд╡рд┐рднрд╛рдЧ рдЖрд╣реЗ.Presto рдЖрдгрд┐ Airflow рд╕рд╣ AWS рд╡рд░ Fetchr рдбреЗрдЯрд╛ рд╕рд╛рдпрдиреНрд╕ рдЗрдиреНрдлреНрд░рд╛ рддрдпрд╛рд░ рдХрд░рдгреЗ тАФ рдбреЗрдЯрд╛ рд╕рд╛рдпрдиреНрд╕рд╕рд╛рдареА AWS рд╡рд░ рдкрд╛рдпрд╛рднреВрдд рд╕реБрд╡рд┐рдзрд╛ рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рдмрджреНрджрд▓ рдПрдХ рдордиреЛрд░рдВрдЬрдХ рдЫреЛрдЯреА рдЯреАрдк.рдПрдЕрд░рдлреНрд▓реЛ DAGs рдбреАрдмрдЧ рдХрд░рддрд╛рдирд╛ рддрдкрд╛рд╕рдгреНрдпрд╛рд╕рд╛рдареА 7 рд╕рд╛рдорд╛рдиреНрдп рддреНрд░реБрдЯреА - рд╕рд╛рдорд╛рдиреНрдп рдЪреБрдХрд╛ (рдЬреЗрд╡реНрд╣рд╛ рдХреЛрдгреАрддрд░реА рд╕реВрдЪрдирд╛ рд╡рд╛рдЪрдд рдирд╛рд╣реА).Apache Airflow рд╡рд╛рдкрд░реВрди рдкрд╛рд╕рд╡рд░реНрдб рд╕рд╛рдард╡рд╛ рдЖрдгрд┐ рдкреНрд░рд╡реЗрд╢ рдХрд░рд╛ - рддреБрдореНрд╣реА рдлрдХреНрдд рдХрдиреЗрдХреНрд╢рди рд╡рд╛рдкрд░реВ рд╢рдХрддрд╛ рддрд░реАрд╣реА рд▓реЛрдХ рдкрд╛рд╕рд╡рд░реНрдб рд╕рд╛рдард╡реВрди рдареЗрд╡рддрд╛рдд рдХрд╕реЗ рддреЗ рд╕реНрдорд┐рдд рдХрд░рд╛.рдкрд╛рдпрдерди рдЖрдгрд┐ рдЕрдкрд╛рдЪреЗ рдПрдЕрд░рдлреНрд▓реЛрдЪрд╛ рдЭреЗрди - рдЕрдВрддрд░реНрдирд┐рд╣рд┐рдд рдбреАрдПрдЬреА рдлреЙрд░рд╡рд░реНрдбрд┐рдВрдЧ, рдлрдВрдХреНрд╢рдиреНрд╕рдордзреНрдпреЗ рдХреЙрдиреНрдЯреЗрдХреНрд╕реНрдЯ рдереНрд░реЛрдЗрдВрдЧ, рдкреБрдиреНрд╣рд╛ рдЕрд╡рд▓рдВрдмрдирд╛рдВрдмрджреНрджрд▓ рдЖрдгрд┐ рдЯрд╛рд╕реНрдХ рд▓реЙрдиреНрдЪ рд╡рдЧрд│рдгреНрдпрд╛рдмрджреНрджрд▓.рд╡рд╛рдпреБрдкреНрд░рд╡рд╛рд╣: рдХрдореА рдЬреНрдЮрд╛рдд рдЯрд┐рдкрд╛, рдпреБрдХреНрддреНрдпрд╛ рдЖрдгрд┐ рд╕рд░реНрд╡реЛрддреНрддрдо рдкрджреНрдзрддреА - рд╡рд╛рдкрд░рд╛рдмрджреНрджрд▓default arguments
╨╕params
рдЯреЗрдореНрдкрд▓реЗрдЯреНрд╕, рддрд╕реЗрдЪ рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕ рдЖрдгрд┐ рдХрдиреЗрдХреНрд╢рдиреНрд╕рдордзреНрдпреЗ.рдПрдЕрд░рдлреНрд▓реЛ рд╢реЗрдбреНрдпреВрд▓рд░ рдкреНрд░реЛрдлрд╛рдЗрд▓ рдХрд░рдгреЗ - рдкреНрд▓реЕрдирд░ рдПрдЕрд░рдлреНрд▓реЛ 2.0 рд╕рд╛рдареА рдХрд╢реА рддрдпрд╛рд░реА рдХрд░рдд рдЖрд╣реЗ рдпрд╛рдЪреА рдПрдХ рдХрдерд╛.рдбреЙрдХрд░-рдХрдВрдкреЛрдЬрдордзреНрдпреЗ 3 рд╕реЗрд▓реЗрд░реА рдХрд╛рдордЧрд╛рд░рд╛рдВрд╕рд╣ Apache Airflow - рдордзреНрдпреЗ рдЖрдордЪреЗ рдХреНрд▓рд╕реНрдЯрд░ рддреИрдирд╛рдд рдХрд░рдгреНрдпрд╛рдмрджреНрджрд▓ рдереЛрдбрд╛ рдЬреБрдирд╛ рд▓реЗрдЦdocker-compose
.рдПрдЕрд░рдлреНрд▓реЛ рд╕рдВрджрд░реНрдн рд╡рд╛рдкрд░реВрди 4 рдЯреЗрдореНрдкреНрд▓реЗрдЯрд┐рдВрдЧ рдХрд╛рд░реНрдпреЗ - рдЯреЗрдореНрдкрд▓реЗрдЯреНрд╕ рдЖрдгрд┐ рд╕рдВрджрд░реНрдн рдлреЙрд░рд╡рд░реНрдбрд┐рдВрдЧ рд╡рд╛рдкрд░реВрди рдбрд╛рдпрдиреЕрдорд┐рдХ рдХрд╛рд░реНрдпреЗ.рдПрдЕрд░рдлреНрд▓реЛрдордзреНрдпреЗ рддреНрд░реБрдЯреА рд╕реВрдЪрдирд╛ тАФ рдореЗрд▓ рдЖрдгрд┐ рд╕реНрд▓реЕрдХрджреНрд╡рд╛рд░реЗ рдорд╛рдирдХ рдЖрдгрд┐ рд╕рд╛рдиреБрдХреВрд▓ рд╕реВрдЪрдирд╛.рдПрдЕрд░рдлреНрд▓реЛ рд╡рд░реНрдХрд╢реЙрдк: рдХреНрд░реЕрдЪрд╢рд┐рд╡рд╛рдп рдЬрдЯрд┐рд▓ DAGs - рд╢рд╛рдЦрд╛ рдХрд╛рд░реНрдпреЗ, рдореЕрдХреНрд░реЛ рдЖрдгрд┐ XCom.
рдЖрдгрд┐ рд▓реЗрдЦрд╛рдд рд╡рд╛рдкрд░рд▓реЗрд▓реЗ рджреБрд╡реЗ:
рдореЕрдХреНрд░реЛ рд╕рдВрджрд░реНрдн - рдЯреЗрдореНрдкрд▓реЗрдЯреНрд╕рдордзреНрдпреЗ рд╡рд╛рдкрд░рдгреНрдпрд╛рд╕рд╛рдареА рдкреНрд▓реЗрд╕рд╣реЛрд▓реНрдбрд░реНрд╕ рдЙрдкрд▓рдмреНрдз рдЖрд╣реЗрдд.рд╕рд╛рдорд╛рдиреНрдп рдиреБрдХрд╕рд╛рди - рд╣рд╡реЗрдЪрд╛ рдкреНрд░рд╡рд╛рд╣ - рдбреЕрдЧ рддрдпрд╛рд░ рдХрд░рддрд╛рдирд╛ рд╕рд╛рдорд╛рдиреНрдп рдЪреБрдХрд╛.puckel/docker-airflow: Docker Apache Airflow -docker-compose
рдкреНрд░рдпреЛрдЧ, рдбреАрдмрдЧрд┐рдВрдЧ рдЖрдгрд┐ рдЕрдзрд┐рдХрд╕рд╛рдареА.python-telegram-bot/python-telegram-bot: рдЖрдореНрд╣реА рддреБрдореНрд╣рд╛рд▓рд╛ рдПрдХ рдЖрд╡рд░рдг рдмрдирд╡рд▓реЗ рдЖрд╣реЗ рдЬреЗ рддреБрдореНрд╣реА рдирд╛рдХрд╛рд░реВ рд╢рдХрдд рдирд╛рд╣реА тАФ рдЯреЗрд▓рд┐рдЧреНрд░рд╛рдо REST API рд╕рд╛рдареА рдкрд╛рдпрдерди рд░реЕрдкрд░.
рд╕реНрддреНрд░реЛрдд: www.habr.com