áááºá¹ááá¬áá«á áá»áœááºá¯ááºááẠVezet áá¯áá¹ááá®á¡á¯ááºá á¯á Analytics áá¬áá០áá±áá¬á¡ááºáá»ááºáá®áᬠDmitry Logvinenko ááŒá áºáá«áááºá
ETL áá¯ááºáááºážá ááºáá»á¬áž áá±á¬áºáá±á¬ááºáááºá¡ááœáẠá¡á¶á·ááŒááœááºáá±á¬ááºážáá±á¬ áááááá¬áá áºáá¯ááŒá áºááá·áº Apache Airflow ááᯠááá·áºá¡á¬áž ááŒá±á¬ááŒáá«áááºá ááá¯á·áá±á¬áº Airflow ááẠá áœááºá á¯á¶áááŸáá·áº áááºá á¯á¶áá¯á¶ážáá±á¬ááŒá±á¬áá·áº áááºááẠáá±áá¬á á®ážáááºážááŸá¯ááœáẠááá«áááºáá±á¬áºáááºáž áááºážááᯠá¡áá®ážáááºááŒáá·áºááŸá¯ááá·áºáá±á¬áºáááºáž áááºááá·áºáá¯ááºáááºážá ááºááá¯áááᯠá¡áá»áááºá¡áá«á¡ááá¯áẠá áááºáá¯ááºáá±á¬ááºáááºááŸáá·áº áááºážááá¯á·ááá¯ááºáá±á¬ááºááŸá¯ááᯠá á±á¬áá·áºááŒáá·áºááẠááá¯á¡ááºáá«áááºá
áá¯ááºáá«áááºá áá«ááŒá±á¬ááŒáá¯á¶áááºáááá²á áááá¯ááááºááŸá¬ áá¯ááºááœá±á áááºáá¬ážááŒááºáá¬ááºáá¯á¶ááœá±áá²á· á¡ááŒá¶ááŒá¯áá»ááºááœá± á¡áá»á¬ážááŒá®ážáá«ááŸááá«áááºá
Airflow / Wikimedia Commons áá°áá±á¬ á
áá¬ážáá¯á¶ážááᯠGoogle ááœáẠááœá±á·ááááºáááºá
áá¬áááá¬
áááá«ááºáž á¡ááá á¡ááá¯ááºážááá±á¬á· áááºááœá±á· (áá®á¡áá¯áá® á¡áááºážáááº)á áá¬ááŒá±á¬áá·áº áá«ááá¯á· (áááºážáá²á·) á¡á á¯á¡áá±ážááᯠá á¯á ááºážááŒááºážá á¡ááŒá±áá¶ááá±á¬ááá¬áž áá»áœááºá¯ááºááá¯á·ááẠá¡áá¯ááºáá»á¬ážááᯠáá¯ááºáá±ážáá«áááºá áááºážá¡ááŒá±á¬ááºážá¡áááºážááẠáá»áœááºá¯ááºááá¯á·ááẠáááºáá¯ááºáááºááá¯ážááᯠáááºážááœááºáááºá áá»áááºáááºááŸá¯áá»á¬ážá áá»áááºáá»á¬ážááŸáá·áº á¡ááŒá¬ážááŒá±á¬ááºážáá²ááŸá¯áá»á¬áž á áááºááŒáá¯ááºá¡á±á¬áºááá±áá¬ááᯠááœá²ááŒááºážá áááºááŒá¬ááŒááºážá ááááºážááŸá¬áá¬ážá áááºááá»áŒážááŒá¯áá»
á¡ááá¯ááºážáá±á¬ááºáá¯á¶ážá á¡ááá¯ážá¡áá¬ážááŸáá·áºá¡áá»ááºá¡ááẠááá¯ážáá¬áž
áááá«ááºáž
Apache Airflow ááẠDjango ááŸáá·áºáá°áááºá
- python áá²á·áá±ážáá¬ážáááºá
- áá±á¬ááºážááœááºáá²á· admin panel áá áºáá¯ááŸááá«áááºá
- á¡ááá·áºá¡áááºáááŸá áá»á²á·ááá¯ááºáááºá
- áá¬ááá¬á ááŒá¬ážáá¬ážáá±á¬ áááºááœááºáá»ááºááŒáá·áº ááŒá¯áá¯ááºáá¬ážááŒááºážááŒá áºáááºá á¡ááá¡áá»ááá¯ááá±á¬áº (ááá¹áá®áá«áá áá±ážáá¬ážááá²á·ááá¯á·)á
- á¡ááá·áºá¡áááºáááŸá á ááºáá»á¬ážáá±á«áºááœáẠá¡áá¯ááºáá¯ááºááŒááºážááŸáá·áº á á±á¬áá·áºááŒáá·áºááŒááºážáá¯ááºáááºážáá¬áááºáá»á¬áž (áááá® / Kubernetes á¡áá»á¬ážá¡ááŒá¬ážááŸáá·áº ááá·áºá¡ááá áááºá ááá·áºááá¯ááœáá·áºááŒá¯áááº)
- á¡ááœááºááœááºáá°áá±á¬ Python áá¯ááºááŸáá±ážáá¬ážáááºááŸáá·áºáá¬ážáááºááẠdynamic workflow áá»áá¯ážáááºááŸáá·áºá¡áá°
- ááŸáá·áº á¡áááºááá·áºáá¯ááºáá¬ážáá±á¬ á¡á áááºá¡ááá¯ááºážáá»á¬ážááŸáá·áº á¡áááºáá¯ááºááááºá¡ááºáá»á¬áž (á¡ááœááºááá¯ážááŸááºážáá±á¬) ááᯠá¡áá¯á¶ážááŒá¯á áááºááá·áºáá±áá¬áá±á·á áºááŸáá·áº API áá»á¬ážááá¯áááᯠáá»áááºáááºááá¯ááºá á±áá«áááºá
áá»áœááºá¯ááºááá¯á·ááẠá€áá²á·ááá¯á·áá±á¬ Apache Airflow ááá¯á¡áá¯á¶ážááŒá¯áááº-
- áá»áœááºá¯ááºááá¯á·ááẠá¡áá»áá¯ážáá»áá¯ážáá±á¬áááºážááŒá áºáá»á¬ážá០á¡áá»ááºá¡áááºáá»á¬ážááᯠá á¯áá±á¬ááºážáá«ááẠ(áá»á¬ážá áœá¬áá±á¬ SQL Server ááŸáá·áº PostgreSQL ááŒá áºáááºáá»á¬ážá á¡ááºááºáááºááá áºáá»á¬ážáá«ááá·áº API á¡áá»áá¯ážáá»áá¯ážá DWH ááŸáá·áº ODS ááá¯á·ááœááºááẠ1C) (áá»áœááºá¯ááºááá¯á·ááœáẠVertica ááŸáá·áº Clickhouse ááŸááááº)á
- áááºáá±á¬ááºá¡ááá·áºááŒáá·áºáá²á
cron
ODS ááœáẠáá±áá¬á á¯á ááºážááŸá¯ áá¯ááºáááºážá ááºáá»á¬ážááᯠá áááºáᬠáááºážááá¯á·á ááŒá¯ááŒááºááááºážááááºážááŸá¯áá»á¬ážááá¯áááºáž á á±á¬áá·áºááŒáá·áºáááºá
áááŒá¬áá±ážáá®á¡ááá áá»áœááºá¯ááºááá¯á·áááá¯á¡ááºáá»ááºáá»á¬ážááᯠ32 cores ááŸáá·áº 50 GB RAM áá«ááŸááá±á¬ áá¬áá¬áááºáá±ážáá áºáá¯á ááŒáá·áºáááºážáá±ážáá²á·áááºá Airflow ááœááºá áááºážááẠá¡áá¯ááºáá¯ááºáááº-
- бПлее 200 ááẠ(ááááºáá±á¬á· áá»áœááºáá±á¬áºááá¯á· á¡áá¯ááºááœá±ááᯠáá¯ááºááá¯ážáá¬ážáá²á· workflows)á
- áá áºáá¯áá»ááºážá á®ááœááºáá»ááºážáá»áŸ á¡áá¯áẠáá,
- á€áá±á¬ááºážááŒááºááŸá¯ááẠ(áá»ááºážáá»áŸá¡á¬ážááŒáá·áºáááºáž) á áááºááẠáá áºáá¬áá®áá áºáá«.
áá»á²á·ááœááºáá¯á¶ááŸáá·áº áááºáááºá á¡á±á¬ááºááœáẠáá»áœááºá¯áẠáá±ážáá«áááºá ááá¯á·áá±á¬áº ááᯠááŒá±ááŸááºážááá·áº ÃŒber-problem ááᯠáááºááŸááºááŒáá«á áá¯á·á
áá°á SQL Server áá¯á¶ážáá¯ááŸáááŒá®áž áá áºáá¯á á®ááœáẠáá±áá¬áá±á·á Ạ50 áá«ááŸáááẠ- ááá±á¬áá»ááºáá áºáá¯áá¥ááá¬á¡áá®ážáá®ážááœáẠáááºážááá¯á·ááœááºáá°áá®áá±á¬ááœá²á·á ááºážáá¯á¶ (mua-ha-ha)á ááá¯ááá¯áááºááŸá¬ áá áºáá¯á á®ááœáẠá¡ááŸá¬á á¬ááá¬ážáá áºáá¯ááŸáááẠ(áá¶áá±á¬ááºážáááºááŸá¬á áááºážááœáẠááá¬ážáá áºáá¯ááŸááááºá á¡áááºááᯠáááºááá·áºáá¯ááºáááºážááœááºáááᯠááá·áºááœááºážááá¯ááºáááºá) áá»áœááºá¯ááºááá¯á·ááẠáááºáá±á¬ááºááŸá¯áááºáááºáá»á¬áž (áááºážááŒá áºáá¬áá¬á á¡áááºážá¡ááŒá áºáá±áá¬áá±á·á áºá ETL á¡áá¯áẠID) ááá¯áá±á«ááºážááá·áºááŒááºážááŒáá·áº áá±áá¬ááá¯áá°áᬠáááºážááá¯á·ááᯠVertica áᯠááááºáá¬á áœá¬ááŒá±á¬áá«á
ááœá¬ážá áá¯á·!
á¡ááá á¡ááá¯ááºážááá±á¬á· áááºááœá±á· (áá®á¡áá¯áá® á¡áááºážáááº)á
áá¬ááŒá±á¬áá·áº áá«ááá¯á· (áááºážáá²á·)
áá
áºáááºááŒá®ážááœá±á ááŒá®ážáá¬áá±á¬á· ááá¯ážááá¯ážááŸááºážááŸááºážáá²á SQL
-schik ááẠáá¯ááŸá¬ážáááºáá®á¡áá±á¬ááºážááá¯ááºáá
áºáá¯ááœááºá áá»áœááºá¯ááºááá¯á·á¡ááœááºáááŸáááá¯ááºááá·áºáááááá¬ááŸá
áºáá¯ááá¯á¡áá¯á¶ážááŒá¯á ETL áá¯ááºáááºážá
ááºáá»á¬áž (áá±á«áº) áá±áá¬á
á®ážáááºážááŸá¯ááᯠááŸáá·áºá
á¬ážáá²á·áááº-
- Informatica áá«áá«á
ááºáᬠ- áááºážáááá¯ááºááá¯ááºáá¬á·ááºáá²á áááºážáááá¯ááºááá¯ááºáá¬ážááŸááºážááŒáá·áºá á¡ááœááºá¡áá»áá¯ážááŒá
áºááœááºážáá±á¬ á¡ááœááºááŒáá·áºááœá¬ážáá±á¬á
áá
áºá áá°á·áá²á· á
áœááºážáá±á¬ááºááá¯ááºááẠ1% ááᯠáá¯áá¬ážáááẠáá¬ážááŒá
áºáá²á·áááºá á¡áááºááŒá±á¬ááºá·? áá±á¬ááºážááŒá®á áááá¡áá»ááºá áááá ááŒáá·áºááœááºááŸá
áºáá»á¬ážáá®á áá
áºáá±áá¬áá¬ááŸá¬ áá®á¡ááºáá¬áá±á·á
áºá áá»áœááºá¯ááºááá¯á·ááᯠá
áááºááá¯ááºážááá¯ááºáᬠááá¡á¬ážááŒá
áºá
á±áá«áááºá áá¯áááá¡áá±ááŸáá·áºá ဠcontraption ááẠá¡ááœááºáááºážááŒááºáá±á¬ áá¯ááºáááºážá
ááºáá»á¬ážá áá±á«áááŒá®ážáá±á¬ á¡á
áááºá¡ááá¯ááºážááᯠááŒááºáááºá¡áá¯á¶ážááŒá¯ááŒááºážááŸáá·áº á¡ááŒá¬ážáá±á¬ á¡ááœááºá¡áá±ážááŒá®ážáá±á¬ áá¯ááºáááºáž-ááŸáá·áºááœááºáá»á¬ážá¡ááœáẠáá®ááá¯ááºážáá¯ááºáá¬ážáááºá Airbus A380 / year á áá±á¬ááºáá¶áá²á·ááá¯á· áá¯ááºáá»á
ááááºááŸáá·áº áááºáááºá áá»áœááºá¯ááºááá¯á· áá¬á០áááŒá±á¬ááá¯áá«á
ááááá¬ážáá«á áááºáá¬ážááŒááºáá¬ááºáá¯á¶ááẠá¡ááẠ30 ááŸá áºá¡á±á¬áẠáá°áá»á¬ážááᯠá¡áááºážááẠááááá¯ááºááá¯ááºáááºá
- SQL Server áá±á«ááºážá
ááºážááŒááºáž áá¬áᬠ- áá»áœááºá¯ááºááá¯á·ááẠáá»áœááºá¯ááºááá¯á·á á
á®áá¶ááááºážá¡ááœááºáž á
á®ážáááºážááŸá¯ááœáẠá€áá²áá±á¬áºááᯠá¡áá¯á¶ážááŒá¯áá²á·áááºá á¡ááŸááºáá±á¬á·á áá»áœááºá¯ááºááá¯á·ááẠSQL Server ááᯠá¡áá¯á¶ážááŒá¯áá¬ážááŒá®ážááŒá
áºááŒá®áž áááºážá ETL áááááá¬áá»á¬ážááᯠá¡áá¯á¶ážáááŒá¯ááŒááºážááẠáá
áºáááºážáááºážááŒáá·áº áá¯áá¹áááááŸááá±á áááºážááœááºá¡áá¬á¡á¬ážáá¯á¶ážáá±á¬ááºážáááº- á¡ááºáá¬áá±á·á
áºááŸá
áºáá¯á
áá¯á¶ážáááºááŸááááºá ááá¯ážáááºááŸá¯á¡á
á®áááºáá¶á
á¬áá»á¬ážááŒá
áºááẠ... ááá¯á·áá±á¬áºáá»áœááºá¯ááºááá¯á·áááºáá±á¬á·ááºáá²áá¯ááºáá¯ááºáá»á¬ážááá¯ááŸá
áºáááºááŒáááºá á€á¡ááœááºááŒá±á¬áá·áºááá¯ááºáá«á áá¬ážááŸááºáž
dtsx
(ááááºážáááºážááẠnode áá»á¬ážáá«áá±á¬ XML áá°áááº) áá»áœááºá¯ááºááá¯á· áááºááá¯ááºáááºá ááá¯á·áá±á¬áº á¡áááºááŒá±á¬áá·áºáááºážá áá¬áá¬áá áºáá¯á០áá áºáá¯ááá¯á· á á¬ážááœá²áá¬áá±á«ááºážáá»á¬ážá áœá¬ááᯠááœá²áááºááá¯ááºá á±ááá·áº Task Package áá áºáá¯ááᯠáááºááá¯á·ááŒá¯áá¯ááºááááºáááºážá áá¯ááºáááºá áá±á¬ááºá áºááá¯ááºááá¯ááŸáááºááá¯ááºá áááºážáááºááá¯ážááŸá áºáááºááá± ááŒá¯ááºáá»ááœá¬ážáááá·áºáááºá áá«áá±ááá·áº áá±áá»á¬áá¬ááá±á¬á· ááá¯áááºááŸááºáá»áá¯á¶áá«áá²á
áá«ááá¯á·áá±áá»á¬áá±á«ááºááœááºáááºážááŸá¬áááºá ááŒá áºáááºááŸáẠáá®ážáá«áž ááá¯ááºááá¯ááºáá±ážáá¬ážáá²á· SSIS package generator áá áºáá¯áá±á¬ááºáá¬áááº..á
ááŒá®ážáá±á¬á· á¡áá¯ááºáá áºáá áºáá¯ááŸá¬áááºá ááŒá®ážáá±á¬á· Apache Airflow á áá»áœááºáá±á¬á·áºááᯠáá»á±á¬áºááœá¬ážáááºá
ETL áá¯ááºáááºážá ááºáá±á¬áºááŒáá»ááºáá»á¬ážááẠááá¯ážááŸááºážáá±á¬ Python áá¯ááºááŒá áºááŒá±á¬ááºáž áá»áœááºá¯ááºááœá±á·ááŸááá±á¬á¡áá«ááœáẠáá»áœááºá¯ááºááẠáá»á±á¬áºááœáŸááºá áœá¬ ááá¯ááºáá²á·áá«á á€áááºááŸá¬ áá±áá¬á á®ážááŒá±á¬ááºážáá»á¬ážááᯠáá¬ážááŸááºážááŒá±á¬ááºážáá²ááŒá®áž ááœá²ááŒá¬ážááœá¬ážáá¬á áá±áá¬áá±á·á áºáá¬áá±á«ááºážáá»á¬ážá áœá¬á០áá áºááŸááºáá áºáá¯ááá¯á· áá áºáá¯áááºážááœá²á·á ááºážáá¯á¶áá«ááŸááá±á¬ ááá¬ážáá»á¬ážááᯠ13â áááºáá¬ážááŒááºáá áºáá¯ááŸáá·áºáá áºáá¯ááœá² ááá¯á·ááá¯áẠááŸá áºáá¯ááœáẠPython áá¯ááºáááá á¹á ááŒá áºáá¬áá²á·áááºá
á¡á á¯á¡áá±ážááᯠá á¯á ááºážááŒááºážá
áá¯á¶ážá áá°áááºáááºážááᯠáá á®á ááºáá¡á±á¬ááºá Airflow ááᯠáááºáááºááŒááºážá áááºááœá±ážáá»ááºáá¬ážáá±á¬ áá±áá¬áá±á·á áºá áááá® ááŸáá·áº docks ááœááºáá±á¬áºááŒáá¬ážáá±á¬ á¡ááŒá¬ážááá á¹á áá»á¬ážáá²á·ááá¯á· á€áá±áá¬ááœáẠáá¯á¶ážááááá¬áááºááŸá¬ážáá±á¬á¡áá¬áá»á¬ážá¡ááŒá±á¬ááºáž áááŒá±á¬áá«ááŸáá·áºá
á
ááºážáááºááŸá¯ááœá±ááᯠáá»ááºáá»ááºážá
áááºááá¯ááºá
á±ááá¯á· áá»áœááºáá±á¬áº áá¯á¶ááŒááºážááœá²áá²á·áááºá docker-compose.yml
ááá¯á¡áá²ááœááº-
- á¡ááŸááºáááẠááŒáŸáá·áºáááºááŒáá«á
áá¯á· áá±á
á®ážááŒá±á¬ááºáž: á¡á
á®á¡á
ááºááœá²áá°á Webserverá áááºážááœáá·áºááẠáááá®á¡áá¯ááºáá»á¬ážááᯠá
á±á¬áá·áºááŒáá·áºááẠááá¯áá±áá¬ááœáẠááŸáá·áºáááºáá±áááá·áºááẠ(áááºážááᯠááœááºážááá¯á·áá¬ážááŒá®ážááŒá
áºáá±á¬ááŒá±á¬áá·áº ááŒá
áºáááºá
apache/airflow:1.10.10-python3.7
áá«áá±ááá·áº áá«ááá¯á· á áááºááááºá á¬ážáá°áž) - PostgreSQLAirflow ááẠáááºážááááºáá±á¬ááºááŸá¯á¡áá»ááºá¡áááºáá»á¬áž (á¡á á®á¡á ááºááœá²áá°áá±áá¬á áá¯ááºáá±á¬ááºááŸá¯á á¬áááºážá¡ááºážá áááºááŒáá·áº) ááá¯áá±ážáá¬ážáááºááŒá áºááŒá®ážá Celery ááẠááŒá®ážááŒá±á¬ááºááá·áºá¡áá¯ááºáá»á¬ážááᯠá¡ááŸááºá¡áá¬ážááŒá¯áááºááŒá áºáááºá
- Redisáááá®á¡ááœáẠá¡áá¯ááºááœá²á á¬ážá¡ááŒá Ạáá±á¬ááºááœááºááá·áºá
- ááá¯ááºáá¶áᶠá¡áá¯ááºááá¬ážááá¯ááºááá¯ááºáá¯ááºáá±á¬ááºáááá·áº áá¯ááºáááºážáá¬áááºáá»á¬ážá
- ááá¯áá«ááá¯á·
./dags
dags ááá±á¬áºááŒáá»ááºááŸáá·áºá¡áá° áá»áœááºá¯ááºááá¯á·áááá¯ááºáá»á¬ážááᯠáá±á«ááºážááá·áºáá«áááºá áááºážááá¯á·ááᯠáá»á¶áá±á«áºááœáẠáá±á¬ááºáá°ááœá¬ážáááºááŒá áºááŒá®ážá ááá¯á·ááŒá±á¬áá·áº ááŸá¬áá»á±ááŸá¯ááá¯ááºážááŒá®ážáá±á¬áẠá¡á á¯á¡áá±ážáá áºáá¯áá¯á¶ážááᯠááŸáá·áºáááºááẠáááá¯á¡ááºáá«á
á¡áá»áá¯á·áá±áá¬áá»á¬ážááœáẠá¥ááá¬áá»á¬ážááŸá áá¯ááºááᯠáá¯á¶ážááááŒááá¯ááºáá² (á á¬áá¬ážááŸá¯ááºááœáá±á á±áááº)á ááá¯á·áá±á¬áº á¡áá»áá¯á·áá±áá¬áá»á¬ážááœáẠáááºážááᯠáá¯ááºáááºážá ááºááœáẠááœááºážáá¶áá¬ážáááºá ááŒá®ážááŒáá·áºá á¯á¶áá±á¬ á¡áá¯ááºáá¯ááºááá°áá¬áá»á¬ážááᯠááá¯ááŸá±á¬ááºáá¯á¶ááœáẠááœá±á·ááá¯ááºáá«áááºá
https://github.com/dm-logv/airflow-tutorial .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- broker
ááŸááºáá»ááºááœá±:
- áá±ážáá±áž ááááááºáá²ááŸá¬ áá°áááá»á¬ážáá²á· áá¯á¶ááááºááᯠá¡á¬ážááá¯ážáááááºá
puckel/docker-airflow - á¡á²áá«ááᯠáá±áá»á¬á á áºáá±ážáá«á ááá·áºááááŸá¬ áááŒá¬ážáá¬á០áááá¯á¡ááºáá±á¬á·áá¬áááºáž ááŒá áºááá¯ááºáá«áááºá - Airflow áááºáááºáá»á¬ážá¡á¬ážáá¯á¶ážááŸáááá·áºáᬠáááŸáááá¯ááºáá«áááºá
airflow.cfg
áá«áá±ááá·áºáááºáž áá»áœááºáá±á¬áº á¡ááœáá·áºáá±á¬ááºážáá°ááŒá®áž ááá¯ážááá¯ážááœá¬ážááœá¬áž á¡áá¯á¶ážáá»áá²á·áá²á· áááºáááºážáá»áẠááááºážááŸááºáá»á¬áž ( developer áá»á¬áž áá»á±ážáá°ážáááºáá«áááº)á - ááá¬áá¡ááá¯ááºážá áááºážááẠáá¯ááºáá¯ááºááŸá¯á¡áááºááá·áºááá¯ááºáá«- áá»áœááºá¯ááºááẠááœááºááááºáá¬áá»á¬ážáá±á«áºááœáẠááŸáá¯á¶ážáá¯ááºáá¶áá»á¬ážááᯠááááºááᬠáááá·áºáá¬ážáá² áá¯á¶ááŒá¯á¶áá±ážá¡ááœáẠá áááºááááºá á¬ážáá«á áá«áá±ááá·áº áá»áœááºáá±á¬áºááá¯á·áá²á· á ááºážáááºáá°ááœá±á¡ááœáẠááá·áºáá±á¬áºáá²á· á¡áááá·áºáá¯á¶ážááᯠáá»áœááºáá±á¬áº áá¯ááºáá²á·áááºá
- ááŸááºáááº:
- Dag ááá¯ááºááœá²ááᯠá¡áá»áááºááá¬ážáá±ážááœá²áá°ááŸáá·áº á¡áá¯ááºááá¬ážáá»á¬áž ááŸá áºáŠážá áá¯á¶áž áááºáá±á¬ááºááŒáá·áºááŸá¯ááá¯ááºááá«áááºá
- Third-party á á¬ááŒáá·áºááá¯ááºáá»á¬ážá¡á¬ážáá¯á¶ážááŸáá·áº á¡áá°áá°áááºááŒá áºááẠ- áááºážááá¯á·á¡á¬ážáá¯á¶ážááᯠá¡áá»áááºááá¬ážááœá²áá°ááŸáá·áº á¡áá¯ááºááá¬ážáá»á¬ážááŒáá·áº á ááºáá»á¬ážááœáẠááá·áºááœááºážááá«áááºá
áá²á á¡áá¯á ááá¯ážááŸááºážáá«áááºá
$ docker-compose up --scale worker=3
á¡áá¬á¡á¬ážáá¯á¶ážáááºáá¬ááŒá®ážáá±á¬ááºá áááºááẠáááºá¡ááºáá¬áá±á·á áºáá»á¬ážááᯠááŒáá·áºááŸá¯ááá¯ááºáááº-
- áá±á
á®ážááŒá±á¬ááºáž:
http://127.0.0.1:8080/admin/ - áááºáž
http://127.0.0.1:5555/dashboard
á¡ááŒá±áá¶ááá±á¬ááá¬áž
ဠ"dags" áá»á¬ážá¡á¬ážáá¯á¶ážááœáẠáááºáá¬ááŸáá¬ážááááºáá«áá á€áááºááŸá¬ á¡áááá¬ááºá¡ááá¯ááŒá áºáááºá
- Scheduler ááᯠ- á
ááºáá¯ááºáá»á¬áž áááºáááºáá²áá² á¡áá¯ááºáá¯ááºá áá°áá
áºáŠážááá¯ááºáá±á¬ Airflow ááœáẠá¡áá±ážá¡áá«áá¯á¶áž áŠážáá±ážááŒá
áºáá°- á¡áá»áááºááá¬ážááᯠá
á±á¬áá·áºááŒáá·áºááŒááºážá ááááºááááºáá¯ááºááŒááºážá á¡áá¯ááºáá»á¬ážááᯠáá¯ááºáá±á¬ááºááŒááºážáá»á¬áž áá¯ááºáá±á¬ááºáááºá
áá±áá¯áá»á¡á¬ážááŒáá·áºá áá¬ážááŸááºážá¡áá±á¬ááºážáá»á¬ážááœááºá áá°ááẠááŸááºáá¬ááºááá¯ááºáᬠááŒá¿áá¬áá»á¬áž (ááá¯ááºáá«á ááááá±á·ááŒááºážááá¯ááºáá±á¬áºáááºáž áá±á«ááºááŒá¬ážááŒááºáž) ááŸáááŒá®áž á¡ááœá±á¡ááŸá áºáááºááŸááºáá»ááºááẠconfiguration ááœááºááẠáá»ááºááŸááá±áá«áááºá
run_duration
- áááºážáááŒááºáááºá áááºáá»áááºáá¬áá áá«áá±ááá·áº á¡áá¯áá±á¬á· á¡á¬ážáá¯á¶ážá¡áááºááŒá±ááœá¬ážáá«ááŒá®á - DAG (aka "dag") - "ááœáŸááºááŒá¬ážáá¬ážááá·áº acyclic ááááº"á ááá¯á·áá±á¬áº ááá¯ááá¯á·áá±á¬ á¡áááá¹áá«ááºááœáá·áºááá¯áá»ááºááẠáá°á¡áááºážáááºááᯠááŒá±á¬ááŒáááá·áºáááºá ááá¯á·áá±á¬áº á¡ááŸááºááááºááœáẠáááºážááẠá¡áá»ááºážáá»ááºáž á¡ááŒááºá¡ááŸáẠá¡áá»áá¯ážááŒá¯ááá·áº á¡áá¯ááºáá»á¬ážá¡ááœáẠááœááºááááºáá¬áá
áºáᯠ(á¡á±á¬ááºááœááºááŒáá·áºáá«) ááá¯á·ááá¯áẠSSIS ááŸá Package ááŸáá·áº Informatica ááŸá Workflow ááá¯á·á analogue áá
áºáá¯ááŒá
áºáááºá .
áá¬ážáá»á¬ážá¡ááŒááºá ááá¯ááºážáá¯ááºáá»á¬ážáá«ááŸáááá¯ááºáá±á¬áºáááºáž áááºážááá¯á·ááᯠáá»áœááºá¯ááºááá¯á· ááá±á¬ááºááá¯ááºáá«á
- DAG Run - áááºážáááá¯ááºááá¯ááºáááºááŸááºáá¬ážáá±á¬áááºááá¯á
áááºáááºá
execution_date
. áá°áá®áá±á¬ ááá¬ááá¬ááºáá»á¬ážááẠááŒáá¯ááºáá°á¡áá¯ááºáá¯ááºááá¯ááºááẠ(áááºááá¬áááºáá»á¬ážááᯠá¡á¬ážáááºážá¡á±á¬ááºááŒá¯áá¯ááºáá¬ážáá»áŸáẠáá¯ááºáá«áááº)á - á¡á±á¬áºááá±áᬠáááá»áá±á¬áá¯ááºáá±á¬ááºáá»ááºáá
áºáá¯áá¯ááºáá±á¬ááºááẠáá¬áááºááŸááá±á¬áá¯ááºá¡ááá¯ááºážá¡á
áá»á¬ážááŒá
áºáááºá á¡á±á¬áºááá±áá¬áá¯á¶ážáá»áá¯ážááŸááááºá
- ááŸá¯ááºááŸá¬ážááŸá¯áá«ááá¯á·á¡ááŒáá¯ááº
PythonOperator
áááºááá·áº (ááá¬ážáááº) Python áá¯ááºááá¯áááᯠáá¯ááºáá±á¬ááºááá¯ááºáááºá - ááœáŸá²ááŒá±á¬ááºážáá
áºáá±áá¬á០áá
áºáá±áá¬ááá¯á· ááá¯á·áá±á¬ááºáá±ážáá±á¬ áá±áá¬á
MsSqlToHiveTransfer
; - á¡á¬áá¯á¶áá¶áááááᬠá¡ááŒá¬ážáá
áºáááºááœááºá áááºážááẠááŒá
áºáááºáá
áºáá¯áááŒá
áºááœá¬ážáá®á¡áá ááá¯ááºážááá±á¬ááºáááºáá¯ááºáá±á¬ááºááŸá¯ááᯠáá¯á¶á·ááŒááºááẠááá¯á·ááá¯áẠááŸá±ážááœá±ážá
á±áááºááŒá
áºáááºá
HttpSensor
áááºááŸááºáá¬ážáá±á¬ á¡áá¯á¶ážá¡ááŸááºááᯠááœá²áá¯ááºááá¯ááºááŒá®áž á¡ááá¯ááŸááá±á¬ áá¯á¶á·ááŒááºááŸá¯ááᯠá á±á¬áá·áºááá¯ááºážáá±áá»áááºááœáẠááœáŸá²ááŒá±á¬ááºážááŸá¯ááᯠá áááºáá«áGoogleCloudStorageToS3Operator
. á á°ážá ááºážááá¯á áááºá âáá¬ááŒá±á¬áá·áºáá²á áá±á¬ááºáá¯á¶ážá¡áá±áá²á·á á¡á±á¬áºááá±áá¬ááŸá¬ áááºáá«áááºáá«áá¯ááºááá¯ááºáá«áááºáâ ááá¯á·áá±á¬áẠááá¯ááºážáá¶á·áá¬ážáá±á¬ á¡á±á¬áºááá±áá¬áá»á¬ážááŸáá·áº á¡áá¯ááºáá»á¬áž ááááºááá¯á·ááŒááºáž áááŸáá á±áá±ážá áá±á¬ááºáá áºááŒáááºáááŒáá¯ážá á¬ážáá® á¡á¬áá¯á¶áá¶áááááá¬ááẠá áááºáááºá á á áºáá±ážááŒá®áž áá±áá¯á¶ážáááºá
- ááŸá¯ááºááŸá¬ážááŸá¯áá«ááá¯á·á¡ááŒáá¯ááº
- áá¯ááºáááºáž - á¡áá»áá¯ážá¡á á¬ážáááœá²ááŒá¬ážáá² ááŒá±ááŒá¬áá¬ážáá±á¬ á¡á±á¬áºááá±áá¬áá»á¬ážá¡á¬áž á¡áá¯ááºáá¬áá°ážá¡ááá·áºááá¯á· ááá¯ážááŒáŸáá·áºáá±ážáááºá
- á¡áá¯ááºá¥ááᬠ- áá»á±á¬áºááŒá±áá°-á¡áá¯ááºááá¬ážáá»á¬ážááᯠááá¯ááºááœá²áááºááẠá¡ááœá±ááœá±á
á®á
ááºáá°á០áá¬áááºáá»á¬ážáá±ážááá¯á·ááẠá¡áá»áááºáááºááŒá®áᯠáá¯á¶ážááŒááºáá±á¬á¡áá« (áá»áœááºá¯ááºááá¯á·á¡áá¯á¶ážááŒá¯áá»áŸáẠáá±áá¬ááŸááºá
LocalExecutor
ááá¯á·ááá¯áẠáááá á¹á áááºááœáẠá¡áá±ážááááºáž Node áá áºáá¯áá®ááá¯á·CeleryExecutor
) áááºážááẠáááºážááá¯á·á¡á¬áž áááºá ááºá¡ááŒá±á¬ááºážá¡áá¬áá áºáᯠáááºááŸááºáá±ážááẠ(ááá¯ááá¯áááºááŸá¬á ááááºážááŸááºá¡á á¯áá áºáᯠ- áá¯ááºáá±á¬ááºááŸá¯áá±á¬ááºáá»á¬áž)á á¡áááá·áºáá±ážáá»áẠááá¯á·ááá¯áẠá á¯á¶á ááºážááŸá¯áá¯á¶á á¶áá»á¬ážááᯠáá»á²á·ááœááºááŒá®áž áááºážááá¯á·ááᯠáá±á«ááºážá ááºážáá±ážáááºá
áá»áœááºá¯ááºááá¯á·ááẠá¡áá¯ááºáá»á¬ážááᯠáá¯ááºáá±ážáá«áááºá
áŠážá áœá¬á áá»áœááºá¯ááºááá¯á·á doug á áá±áá°áá»á¡á á®á¡á á¥áºááᯠá¡ááŒááºážáá»ááºážáá±á¬áºááŒááŒáá«á áá¯á·á ááá¯á·áá±á¬áẠáá»áœááºá¯ááºááá¯á·ááẠá¡áá±ážá¡ááœá²ááá¯ááºáá±á¬ ááŒá±ááŸááºážáááºážá¡áá»áá¯á·ááᯠá¡áá¯á¶ážááŒá¯áá¬ážáá±á¬ááŒá±á¬áá·áº á¡áá±ážá áááºá¡áá»ááºá¡áááºáá»á¬ážááᯠááá¯áááá¯áááá¯ážá á¡áá±ážá áááºáá±á·áá¬ááŒáá·áºáá«áááºá
ááá¯á·ááŒá±á¬áá·áºá áááºážáá¡ááá¯ážááŸááºážáá¯á¶ážáá¯á¶á á¶ááŒáá·áºá ááá¯áá²á·ááá¯á·áá±á¬ áá¬ážáá¯á¶ááẠá€áá²á·ááá¯á·ááŒá áºáá±áááá·áºáááº-
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
á¡ááŒá±ááŸá¬ááŒáá·áºáá¡á±á¬ááºá
- ááááŠážá áœá¬ áá»áœááºá¯ááºááá¯á·ááẠááá¯á¡ááºáá±á¬ libs áá»á¬ážááŸáá·áº áááºááœááºážáááºá á¡ááŒá¬ážáá áºáá¯áá¯;
sql_server_ds
- áList[namedtuple[str, str]]
Airflow Connections ááŸáá»áááºáááºááŸá¯áá»á¬ážáá¡áááºáá»á¬ážááŸáá·áºáá»áœááºá¯ááºááá¯á·ááááºážáááºááá¯áá°ááá·áºáá±áá¬áá±á·á áºáá»á¬ážádag
- áá±áá»á¬áá±á«ááºááŒá áºááááºá áá»áœááºá¯ááºááá¯á·á doug áááŒá±ááŒá¬áá»ááºglobals()
ááá¯ááºááẠAirflow á ááŸá¬áááœá±á·áá°ážá Doug ááááºáž ááŒá±á¬ááá¯áá«áááºá- áá°á·áá¬áááºá áá¬áá²á
orders
- ááá¯á·áá±á¬áẠá€á¡áááºááẠáááºá¡ááºáá¬áá±á·á áºááœáẠáá±á«áºáá¬áááá·áºáááºá - áá°ááá¯ááºá ááŸá áºáááºáá±á· ááááºážáá±á«ááºá០á¡áá¯ááºáááºážááááºá
- ááá·áºááŸááºážááŒá±á¡á¬ážááŒáá·áº 6 áá¬áá®ááá¯ááºáž ááŒá±ážááá·áºáá«áááºá
timedelta()
ááœáá·áºáááŒá¯cron
-ááá¯ááºáž0 0 0/6 ? * * *
, for the less cool - ááŒáá¯ááºáá²á· expression áá áºáá¯@daily
);
- áá°á·áá¬áááºá áá¬áá²á
workflow()
á¡ááá á¡áá¯ááºááᯠáá¯ááºáááá·áºáááºá áá«áá±ááá·áº á¡áᯠááá¯ááºáá°ážá ááá¯á¡áá»áááºááœáẠáá»áœááºá¯ááºááá¯á·áá¡ááŒá±á¬ááºážá¡áá¬ááᯠááŸááºáááºážáá²ááá¯á· á áœáá·áºáá áºááá¯ááºáá«áááºá- ááá¯ááœáẠáá¯ááºáá±á¬ááºá
áá¬áá»á¬ážááᯠáááºáá®ážááŒááºážá ááá¯ážááŸááºážáá±á¬ ááŸá±á¬áºááá¬á
- áá»áœááºá¯ááºááá¯á·ááẠáá»áœááºá¯ááºááá¯á·ááááºážááŒá áºáá»á¬ážááŸáááá·áº áá¯ááºáá±á¬ááºáá«áááºá
- á
áááºááá¯ááºáá«á
PythonOperator
áá»áœááºá¯ááºááá¯á·á dummy ááᯠá¡áá±á¬ááºá¡áááºáá±á¬áºááá·áºáworkflow()
. á¡áá¯ááºááá°ážááŒá¬ážáá±á¬ (áááºá¡ááœááºáž) á¡áááºááᯠáááºááŸááºá áá¬ážááᯠáá»ááºááŸá±á¬ááºááẠááá±á·áá«ááŸáá·áºá á¡áá¶provide_context
áá áºáááºá áá»áœááºá¯ááºááá¯á· ááá¯áá áá¯áẠá á¯áá±á¬ááºážá¡áá¯á¶ážááŒá¯áá¬ážáá±á¬ function áá²ááá¯á· áá±á¬ááºááẠá¡ááŒá±á¬ááºážááŒáá»ááºáá»á¬ážááᯠáá±á¬ááºážááá·áºáá«áááºá**context
.
áá±á¬áá±á¬áááºáá±á¬á· áá®áá±á¬ááºáá«áá²á áá»áœááºá¯ááºááá¯á·áááŸááááº-
- web interface ááœáẠdag á¡áá áºá
- á¡ááŒáá¯ááºáá¯ááºáá±á¬ááºááá·áº á¡áá¯ááºáá±á«ááºáž áá áºáá¬ááœá² (Airflowá Celery áááºáááºáá»á¬ážááŸáá·áº áá¬áá¬á áœááºážáááºá ááœáá·áºááŒá¯áá«á)á
áá±á¬ááºážááŒá®á áááŒá®á
ááŸá®ááá¯ááŸá¯áá»á¬ážááᯠáááºáá°á ááá·áºááœááºážáááºáááºážá
á€á¡áá¬á¡á¬ážáá¯á¶ážááᯠááá¯ážááŸááºážá
á±áááºá¡ááœáẠáá»áœááºá¯áẠááŸáá·áºá
á¬ážáá²á·áá«áááºá docker-compose.yml
áá¯ááºáá±á¬ááºáá±ááẠrequirements.txt
node á¡á¬ážáá¯á¶ážááœááºá
á¡áá¯áá±á¬á· ááœá¬ážááŒá®-
áá®ážááá¯ážáá±á¬ááºá áá¯áááºážáá»á¬ážááẠá¡áá»áááºááá¬ážááœá²áá°á០áá¯ááºáá±á¬ááºáá±á¬ á¡áá¯ááºááŒá áºáááºáá»á¬ážááŒá áºáááºá
ááá á±á¬áá·áºáá«á á¡áá¯ááºááá¬ážááœá±á á¡áá¯ááºááœá± ááŸá¯ááºáá¯ááºáááºá
áá¯ááºáá«ááẠá¡á áááºážááœá± áᬠáá°ááá¯á·áá²á· á¡áá¯ááºááᯠá¡á±á¬ááºááŒááºá áœá¬ ááŒá®ážááŒá±á¬ááºáá²á·áጠáá«áááºá á¡áá®áá±á¬ááºááœá±á ááááºáá¡á±á¬ááºááŒááºáá«áá°ážá
á áá¬ážáá ááºá áá»áœááºá¯ááºááá¯á·ááá¯ááºáá¯ááºááœáẠfolder áááŸááá«á
./dags
á ááºáá»á¬ážááŒá¬ážááœáẠáááºáá°ááŒá¯ááŸá¯ áááŸááá« - ááá¯ááºážáá»á¬áž á¡á¬ážáá¯á¶áž áá«áááºáá«áááºágit
áá»áœááºá¯ááºááá¯á·á Gitlab ááœááºá ááŸáá·áº Gitlab CI ááẠáá±á«ááºážá ááºážááá·áºá¡áá« á ááºáá»á¬ážááá¯á· á¡ááºááááºáá»á¬ážááᯠááŒáá·áºáá±áá«áááºámaster
.
áááºážá¡ááŒá±á¬ááºážá¡áááºážáááº
á¡áá¯ááºááá¬ážááœá±á áá»áœááºáá±á¬áºááá¯á·áá²á· áááºáá»ááºá á±áá²á·á¡áá¬ááœá±ááᯠááœááºááá¯ááºáá±áá»áááºááŸá¬ áá»áœááºáá±á¬áºááá¯á·ááᯠáá áºáá¯áá¯ááŒáá±ážááá¯ááºáá²á· áá±á¬ááºáááºáááááá¬áá áºáᯠ- Flower ááᯠááááááá¯ááºááŒáá¡á±á¬ááºá
á¡áá¯ááºááá¬áž áá¯á¶ááŸááºáá»á¬ážááá¯ááºáᬠá¡áá»ááºážáá»á¯áẠá¡áá»ááºá¡áááºáá«ááŸááá±á¬ áááá á¬áá»ááºááŸá¬
á¡áá¯ááºááœá¬ážáá²á·ááá·áº á¡áá¯ááºáá»á¬ážáá« á¡ááŒááºážáááºáá¯á¶áž á á¬áá»ááºááŸá¬
áá»áœááºá¯ááºááá¯á·á ááœá²á á¬ážá á¡ááŒá±á¡áá±ááŸáá·áº á¡áá»ááºážáá¯á¶áž á á¬áá»ááºááŸá¬
á¡áá±á¬ááºááá¯á¶ážá á¬áá»ááºááŸá¬ááŸá¬ á¡áá¯ááºá¡ááŒá±á¡áá±ááááºáá»á¬ážááŸáá·áº áááºážááá¯á·ááá¯ááºáá±á¬ááºáá»áááºáá»á¬ážáá«ááŸááááº-
áá»áœááºá¯ááºááá¯á·ááẠáááºáá¯ááºáááºááá¯ážááᯠáááºážááœááºáááºá
áá«á០á¡áá¯ááºááœá± ááŒá®ážááœá¬ážááŒá®á áááºáá¬ááá°ááœá±ááᯠáááºááœá¬ážááá¯ááºáááºá
á¡ááŒá±á¬ááºážáááºážáá áºáá¯ááá¯ááºáá áºáá¯ááŒá±á¬áá·áº áááºáá¬ááá°á¡áá»á¬ážá¡ááŒá¬ážááŸááá²á·áááºá Airflow áááŸááºáááºáá±á¬á¡áá¯á¶ážááŒá¯ááŸá¯ááá á¹á ááœááºá á€á áá¯áááºážáá¯á¶áá»á¬ážááẠdata áá»á¬ážáá±áá»á¬áá±á«ááºáá±á¬ááºááá¬ááŒá±á¬ááºážáá±á¬áºááŒáááºá
ááŸááºáááºážááᯠááŒáá·áºááŸá¯ááŒá®áž áá»áááºážááœá¬ážáá±á¬ áá¯ááºáá±á¬ááºá áá¬ááŒá áºáááºáá»á¬ážááᯠááŒááºáááºá áááºááẠááá¯á¡ááºáááºá
áááºááá·áºá áá¯áááºážááᯠááŸáááºááŒááºážááŒáá·áºá áá»áœááºá¯ááºááá¯á·á¡ááœáẠáááŸáááá¯ááºáá±á¬ áá¯ááºáá±á¬ááºáá»ááºáá»á¬ážááᯠáá»áœááºá¯ááºááá¯á· ááœá±á·ááááá·áºáááº-
ááŒá¯ááºáá»áá¬ááᯠááŸááºážá¡á±á¬ááºáá¯ááºááá¯á·ááááºá ááá¯ááá¯áááºááŸá¬á ááá¯áá±áá¬ááœáẠáá áºá á¯á¶áá áºáᯠáá»ááºááœááºááœá¬ážáááºááᯠáá»áœááºá¯ááºááá¯á·áá±á·áá¬ážááŒá®áž áá°áá®áá±á¬áá¯ááºáá±á¬ááºá áá¬ááŸá¬ á¡áá»áááºááá¬ážááœá²áá°áᶠáá±á¬ááºááœá¬ážáááºááŒá áºáááºá
á¡áá®áá±á¬ááºá
áá¯áááºážá¡á¬ážáá¯á¶ážááᯠáá±á¬ááºá
áºááŒáá·áºááŒá¯áá¯ááºááŒááºážááẠáá°áá¬ážáááºááŒááºážááá¯ááºááŒá±á¬ááºáž ááŸááºážááŸááºážáááºážáááºážáááááẠ- áááºážááẠAirflow á០áá»áœááºá¯ááºááá¯á·áá»áŸá±á¬áºááá·áºáá¬ážááá·áºá¡áá¬ááá¯ááºáá«á ááá¬áá¡á¬ážááŒáá·áºá áá»áœááºá¯ááºááá¯á·ááœáẠá¡á
á¯ááá¯ááºá¡ááŒá¯á¶ááá¯áẠáá»ááºáá®ážááá¯ááºáá±á¬ áááºáááºáá»á¬ážááŸááááºá Browse/Task Instances
á¡áá¬á¡á¬ážáá¯á¶ážááᯠáá áºááŒáá¯ááºááẠááœá±ážáá»ááºááŒá®áž áá¯áááá¯á· ááŒááºáááºáááºááŸááºááŒáá«á áá¯á·á ááŸááºáááºááá·áºá¡áá¬ááᯠááŸáááºáá«-
ááá·áºááŸááºážáá±ážáá¯ááºááŒá®ážáá±á¬ááºá áá»áœááºá¯ááºááá¯á·áááá¹áá á®áá»á¬ážááẠá€áá¯á¶á á¶á¡ááá¯ááºážááŒá áºááẠ(áá°ááá¯á·á á®á ááºáá±ážááá·áºá¡á á®á¡á ááºááŸá°ážááᯠá á±á¬áá·áºáá±ááŒá®ááŒá áºáááº)á
áá»áááºáááºááŸá¯áá»á¬ážá áá»áááºáá»á¬ážááŸáá·áº á¡ááŒá¬ážááŒá±á¬ááºážáá²ááŸá¯áá»á¬áž
áá±á¬áẠDAG ááá¯ááŒáá·áºááá¯á·á¡áá»áááºáá±á¬ááºááŒá®á update_reports.py
:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ÐПÑпПЎа Ñ
ПÑПÑОе, ПÑÑеÑÑ ÐŸÐ±ÐœÐŸÐ²Ð»ÐµÐœÑ"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ÐаÑаÑ, пÑПÑÑпайÑÑ, ÐŒÑ {{ dag.dag_id }} ÑÑПМОлО
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
áá°ááá¯ááºáž á¡á á®áááºáá¶á ᬠá¡ááºááááºáá¯ááºáá°ážáá«ááá¬ážá á€áááºááŸá¬ áá°áá áá±á¬ááºáá áºááŒáááºááŒá áºáááº- áá±áá¬ááá°áááá·áºáá±áá¬á០áááºážááŒá áºáá»á¬ážá á¬áááºážáá áºáᯠááŸááá«áááºá áá¬ážáááá·áºá á¬áááºážáá áºáá¯ááŸááááºá á¡áá¬á¡á¬ážáá¯á¶ážááŒá áºâáá»ááºâááœá¬ážáá²á·á¡áá« ááœááºážáá®ážááá¯á·áâáá±á·áá«áá²á· (áá«á áá«ááá¯á·áá²á·áááá¯ááºâáá«áá°áž)á
ááá¯ááºááá¯ááááºááŒááºááŒá®áž ááááºáááŸá¬ážá¡áá¬á¡áá áºáá»á¬ážááᯠááŒáá·áºááŒáá«á áá¯á·á
from commons.operators import TelegramBotSendMessage
- Unblocked ááá¯á· áááºáá±á·áá»áºááá¯á·ááŒááºážá¡ááœáẠáá±ážáááºáá±á¬ wrapper áá áºáá¯ááŒá¯áá¯ááºááŒááºážááŒáá·áº áá»áœááºá¯ááºááá¯á·áááá¯ááºááá¯ááºá¡á±á¬áºááá±áá¬áá»á¬ážááŒá¯áá¯ááºááŒááºážá០áá»áœááºá¯ááºááá¯á·á¡á¬áž áááºááá·áºá¡áá¬áá០áá¬ážáá®ážááááºáááºáááºááá¯ááºáá«á (á€á¡á±á¬áºááá±áá¬á¡ááŒá±á¬ááºážááᯠá¡á±á¬ááºááœáẠáááºáááºááœá±ážááœá±ážáá«áááºádefault_args={}
- dag ááẠáááºážá á¡á±á¬áºááá±áá¬á¡á¬ážáá¯á¶ážááᯠáá°áá®áá±á¬ á¡ááŒá±á¬ááºážááŒáá»ááºáá»á¬ážááᯠááŒáá·áºáá±ááá¯ááºáááºáto='{{ var.value.all_the_kings_men }}'
- á¡ááœááºto
áá»áœááºá¯ááºááá¯á·ááœáẠhardcoded áá¯ááºáááºááá¯ááºáá±á¬áºáááºážá áá»áœááºá¯ááºááá¯áá áá¯ááºááá·áºááœááºážáá¬ážááá·áº á¡á®ážáá±ážááºá á¬áááºážáá«ááá·áº Jinja ááŸáá·áº variable ááá¯á·ááᯠá¡áá¯á¶ážááŒá¯á ááá¯ááºážááá áºáá¯ááºáá±ážáá«áááºáAdmin/Variables
;trigger_rule=TriggerRule.ALL_SUCCESS
- á¡á±á¬áºááá±áá¬á áááºáááºá¡ááŒá±á¡áá±á áá»áœááºá¯ááºááá¯á·áá¡ááŒá±á¡áá±ááœááºá ááŸá®ááá¯ááŸá¯á¡á¬ážáá¯á¶áž ááŒá±áááºááœá¬ážááŸáᬠáá°áá±ážáᶠá á¬ááá¯á·áá«áááºá á¡á±á¬ááºááŒááºá áœá¬;tg_bot_conn_id='tg_main'
- á¡ááŒááºážááœá¬ážááŸá¯áá»á¬ážconn_id
áá»áœááºá¯ááºááá¯á· áááºáá®ážáá¬ážáá±á¬ áá»áááºáááºááŸá¯ ID áá»á¬ážááᯠáááºáá¶áá«áAdmin/Connections
;trigger_rule=TriggerRule.ONE_FAILED
- ááŒá¯ááºáá»áá±á¬á¡áá¯ááºáá»á¬ážááŸáááŸáᬠTelegram ááŸáááºáá±á·ááºá»áá»á¬ážáááºáá±ážááœá¬áááá·áºáááºátask_concurrency=1
- áá¯ááºáááºážáá áºáá¯á áá¯ááºáááºážáá±á¬ááºáá¬áá»á¬ážá áœá¬ááᯠáá áºááŒáá¯ááºáááºáááºáž áá¯ááºáá±á¬ááºááŒááºážááᯠáá»áœááºá¯ááºááá¯á· áá¬ážááŒá áºáá¬ážáá«áááºá ááá¯ááºáá«áá áá»áœááºá¯ááºááá¯á·ááẠá¡áá»á¬ážá¡ááŒá¬ážááᯠáááŒáá¯ááºáááºáááºáž áá áºááœáŸááºááá¯ááºáááºááŒá áºáááºáVerticaOperator
(á á¬ážááœá²áá áºáá¯á¶ážááá¯ááŒáá·áºááŒááºáž);report_update >> [email, tg]
- á¡á¬ážáá¯á¶ážVerticaOperator
á€áá²á·ááá¯á·áá±á¬ á á¬áá»á¬ážááŸáá·áº áááºáá±á·áá»áºáá»á¬áž áá±ážááá¯á·ááŒááºážááœáẠáá±á«ááºážá ááºáá«á
ááá¯á·áá±á¬áº á¡ááŒá±á¬ááºážááŒá¬ážáá° á¡á±á¬áºááá±áá¬áá»á¬ážááœáẠááá°áá®áá±á¬ ááœáŸáá·áºáááºááŸá¯ á¡ááŒá±á¡áá±áá»á¬áž ááŸááá±á¬ááŒá±á¬áá·áºá áá áºáá¯áᬠá¡áá¯ááºáá¯ááºáá«áááºá Tree View ááœááºá á¡áá¬á¡á¬ážáá¯á¶ážááẠá¡áááºážáááºáá¬ááŒááºáá¬áááº-
á áá¬ážá¡áááºážáááºááŒá±á¬áá«áááºá áááºáááᯠáá°ááá¯á·áá²á· áá°áááºáá»ááºážáá»á¬áž- ááááºážááŸááºáá»á¬áž.
Macros ááẠá¡áá»áá¯ážáá»áá¯ážáá±á¬ á¡áá¯á¶ážáááºáá±á¬ á¡áá»ááºá¡áááºáá»á¬ážááᯠá¡á±á¬áºááá±áᬠá¡ááŒááºážá¡áá¯á¶áá»á¬ážá¡ááŒá Ạá¡á á¬ážááá¯ážááá¯ááºáá±á¬ Jinja placeholder ááŒá áºáááºá á¥ááá¬á á€áá²á·ááá¯á·áá±á¬á
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }}
context variable á á¡ááŒá±á¬ááºážá¡áá¬áá»á¬ážááá¯á· áá»á²á·ááœááºáá«áááºá execution_date
áá¯á¶á
á¶á¡ááœáẠYYYY-MM-DD
: 2020-07-14
. á¡áá±á¬ááºážáá¯á¶ážá¡ááá¯ááºážááŸá¬ áááºá
ááºááááºážááŸááºáá»á¬ážááᯠáááºáááºááŸááºááŸááºáá¯ááºáá±á¬ááºá
áᬠá¥ááá¬áá
áºáᯠ(áá
áºáááºááŒááºááœááºážááŸá á
áá¯áááºážáá
áºáá¯) ááœáẠáááºáááºáá¬ážááŒá®áž ááŒááºáááºá
áááºááá·áºá¡áá« áá±áá¬ááá¯ááºáá±á¬ááºáá°áá»á¬ážááẠáá°áá®áá±á¬áááºááá¯ážáá»á¬ážá¡áá áá»á²á·ááœááºáá¬áááºááŒá
áºáááºá
áá¬áááºáá áºáá¯á á®ááŸá áá¯ááºáá±á¬ááºáá»ááºáá áºáá¯á á®ááŸá Rendered ááá¯ááºááᯠá¡áá¯á¶ážááŒá¯á áááºááŸááºáá¬ážáá±á¬ áááºááá¯ážáá»á¬ážááᯠááŒáá·áºááŸá¯ááá¯ááºáá«áááºá á€áááºááŸá¬ á á¬áá áºá á±á¬ááºáá±ážááá¯á·ááŒááºážá áá¬áááºááŒá áºáááºá
áááºáá±á·áá»áºááá¯á·áá²á·á¡áá¯ááºááŸá¬á
áá±á¬ááºáá¯á¶ážáááá¯ááºáá±á¬áá¬ážááŸááºážá¡ááœáẠbuilt-in macro á
á¬áááºážá¡ááŒáá·áºá¡á
á¯á¶ááᯠá€áá±áá¬ááœáẠáááá¯ááºáá«áááº-
ááá¯á·á¡ááŒááºá plugins áá»á¬ážáá¡áá°á¡áá®ááŒáá·áºá áá»áœááºá¯ááºááá¯á·ááẠáá»áœááºá¯ááºááá¯á·áááá¯ááºááá¯áẠmacro ááá¯ááŒá±ááŒá¬ááá¯ááºáááºá ááá¯á·áá±á¬áºáááºážáááºá¡ááŒá¬ážáá¬ááºáááºážááŒá áºáááºá
ááŒáá¯áááºáááºááŸááºáá¬ážááá·áºá¡áá¬áá»á¬ážá¡ááŒááºá áá»áœááºá¯ááºááá¯á·ááẠáá»áœááºá¯ááºááá¯á·á variable áá»á¬ážááááºááá¯ážáá»á¬ážááᯠá¡á
á¬ážááá¯ážááá¯ááºááẠ(á¡áááºáá¯ááºááœáẠáááºážááᯠáá»áœááºá¯ááºá¡áá¯á¶ážááŒá¯áá¬ážááŒá®ážááŒá
áºáááº)á áááºáá®ážááŒáá¡á±á¬áẠAdmin/Variables
á¡áá¬ááŸá
áºáá¯-
áááºá¡áá¯á¶ážááŒá¯ááá¯ááºááá»áŸ-
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
áááºááá¯ážááẠá áá±ážáá áºáᯠááŒá áºááá¯ááºáááºá ááá¯á·ááá¯áẠJSON áááºáž ááŒá áºááá¯ááºáááºá JSON ááœááºá
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}
ááá¯áá»ááºáá±á¬áá®ážáá®ááá¯á· áááºážááŒá±á¬ááºážááᯠá¡áá¯á¶ážááŒá¯áá«á {{ var.json.bot_config.bot.token }}
.
á
áá¬ážáá¯á¶ážáá
áºáá¯á¶ážáááºážáá²á· áááºáá¬ážááŒááºáá¯á¶áá
áºáá¯á¶ááᯠááŒáá«áááºá áááºááœááºááŸá¯áá»á¬áž. á¡áá¬á¡á¬ážáá¯á¶ážááẠá€áá±áá¬ááœáẠá¡ááŒá±áá¶ááŒá
áºáááº- á
á¬áá»ááºááŸá¬áá±á«áºááœáẠAdmin/Connections
áá»áœááºá¯ááºááá¯á·ááẠáá»áááºáááºááŸá¯áá
áºáá¯ááᯠáááºáá®ážááŒá®ážá áá»áœááºá¯ááºááá¯á·á áá±á¬á·ááºá¡ááºáá»á¬áž/á
áá¬ážááŸááºáá»á¬ážááᯠáá±á«ááºážááá·áºáᬠááá¯áá±áá¬ááœáẠááá¯ááá¯áááá»áá±á¬ ááá·áºáááºáá±á¬ááºáá»á¬ážááᯠááá·áºáá«áááºá áá®ááá¯áá»áá¯áž:
á
áá¬ážááŸááºáá»á¬ážááᯠáá¯ááºááŸááºáá¬ážááá¯ááºááẠ(áá°áááºážááẠááá¯áá±áá»á¬áááº) ááá¯á·ááá¯áẠáá»áááºáááºááŸá¯á¡áá»áá¯ážá¡á
á¬ážááᯠáá»ááºáá¬ážááá¯ááºááẠ(áá»áœááºáá±á¬áºááŒá¯áá¯ááºáá²á·ááá·áºá¡ááá¯ááºážá tg_main
) - á¡ááŸááºááŸá¬ á¡áá»áá¯ážá¡á
á¬ážáá»á¬ážá
á¬áááºážááᯠAirflow áá±á¬áºáááºáá»á¬ážááœáẠhardwired áá¯ááºááŒá®áž á¡áááºážá¡ááŒá
áºáá¯ááºáá»á¬ážáá²ááá¯á· ááááºáá² áá»á²á·ááœááºááááá« (áá¯ááºáááẠáá»áœááºá¯áẠgoogle á០áá
áºá
á¯á¶áá
áºáᯠááá¯ááºáá±á¬ááºáá«áá áá»á±ážáá°ážááŒá¯á ááŒááºáá±ážáá«)á ááá¯á·áá±á¬áº ááááºáá
áºáá»á¬ážááá°ááŒááºážá០áá»áœááºá¯ááºááá¯á·á¡á¬áž áááºááá·áºá¡áá¬á០áá¬ážáá®ážááá¯ááºáááºááá¯ááºáá«á áá¬áááºá
áá°áá®áá±á¬á¡áááºááŒáá·áº áá»áááºáááºááŸá¯áá»á¬ážá
áœá¬ ááŒá¯áá¯ááºááá¯ááºáááº- á€ááá
á¹á
ááœááºá áááºážáááºáž BaseHook.get_connection()
á¡áááºááŒáá·áºáááºááœááºááŸá¯áá»á¬ážáááŸáááœá¬ážáá±á¬á áá»áááºáž áá¬áááºá¡áá»áá¯ážáá»áá¯ážá០( Round Robin ááá¯áá¯ááºááŒááºážááẠááá¯ááá¯áá¹ááááŸááááá·áºáááºá ááá¯á·áá±á¬áº Airflow developer áá»á¬ážáá
áááºááœááºáá¬ážáá²á·ááŒáá«á
áá¯á·)á
Variables áá»á¬ážááŸáá·áº Connections áá»á¬ážááẠá¡ááœááºáá±á¬ááºážááœááºáá±á¬ áááááá¬áá»á¬ážááŒá áºáááºá ááá¯á·áá±á¬áº áááºáá»ááºááá®ááẠá¡áá±ážááŒá®ážáááº- ááá·áºá á®ážáááºážááŸá¯á á¡á áááºá¡ááá¯ááºážáá»á¬ážááᯠáá¯ááºááá¯ááºááá¯áẠááááºážáááºážáá¬ážááŒá®áž áááºááá·áºá¡ááá¯ááºážáá»á¬ážááᯠááá¯ááŸá±á¬ááºáááºá¡ááœáẠAirflow ááá¯á· áá±ážáááºáááºážá áá áºáááºááœááºá UI ááŸáá áºááá·áºá á¥ááá¬á á á¬ááá¯á·áá±áá¹áá¬áá áºáá¯á áááºááá¯ážááᯠáá»ááºááŒááºá áœá¬ááŒá±á¬ááºážáá²ááẠá¡áááºááŒá±ááá¯ááºáááºá á¡ááŒá¬ážáá áºáááºááœááºá áááºážáááºáá»áœááºá¯ááºááá¯á· (áá»áœááºá¯ááº) ááŸáááºááŸá¬ážááá¯áá±á¬ mouse ááá¯ááŸáááºááŒááºážááá¯á·ááŒááºááœá¬ážáá²ááŒá áºáááºá
áá»áááºáááºááŸá¯ááŒáá·áº áá¯ááºáá±á¬ááºááŒááºážááẠá¡áá¯ááºáá»á¬ážáá²á០áá
áºáá¯ááŒá
áºáááºá áá»áááº. áá±áá¯áá»á¡á¬ážááŒáá·áºá Airflow áá»áááºáá»á¬ážááẠáááºážááᯠááŒááºááááºáá±á¬ááºááŸá¯áá»á¬ážááŸáá·áº á
á¬ááŒáá·áºááá¯ááºáá»á¬ážááá¯á· áá»áááºáááºáááºá¡ááœáẠá¡ááŸááºáá»á¬ážááŒá
áºáááºá á¥ááá¬- JiraHook
Jira ááŸáá·áº á¡ááŒááºá¡ááŸáẠáá¯á¶á·ááŒááºááẠáá»áœááºá¯ááºááá¯á·á¡ááœáẠclient áá
áºáá¯ááᯠááœáá·áºáá±ážááẠ(áááºáá¯ááºáá±á¬ááºá
áá¬áá»á¬ážááᯠáá±á¬ááºááŒááºááŸáá·áºááá¯ááºáááº) ááŸáá·áº áá¡áá°á¡áá®ááŒáá·áº SambaHook
local ááá¯ááºáá
áºáá¯ááá¯á· ááœááºážááá¯ááºáááºá smb
-pointá
á áááºááŒáá¯ááºá¡á±á¬áºááá±áá¬ááᯠááœá²ááŒááºážá áááºááŒá¬ááŒááºážá
á¡á²áá«ááᯠáááºááá¯áááºáá®ážáá²ááá¯áá¬ááᯠáá»áœááºáá±á¬áºááá¯á· áá®ážá
ááºáá¬áá«ááŒá®á TelegramBotSendMessage
áá¯áẠcommons/operators.py
á¡ááŸááºááááºá¡á±á¬áºááá±áá¬ááŸáá·áºá¡áá°
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
á€ááœááºá Airflow ááŸá á¡ááŒá¬ážá¡áá¬á¡á¬ážáá¯á¶ážáá²á·ááá¯á·áááºá á¡áá¬á¡á¬ážáá¯á¶ážááẠá¡ááœááºááá¯ážááŸááºážáá«áááºá
- á០á¡ááœá±áááºáá¶áááºá
BaseOperator
á¡áááºážáááºáá±á¬ Airflow áá®ážááá·áºá¡áá¬áá»á¬ážááᯠá¡áá±á¬ááºá¡áááºáá±á¬áºáá±ážáá±á¬ (ááá·áºá¡á¬ážáááºáááºááá¯ááŒáá·áºáá«) - á¡ááœááºáá»á¬áž ááŒá±ááŒá¬áááºá
template_fields
Jinja ááẠáá¯ááºáá±á¬ááºááẠáááºáááá¯áá»á¬ážááᯠááŸá¬ááœá±áááºááŒá áºáááºá - ááŸááºáááºáá±á¬ ááŒááºážáá¯á¶ááŸá¯áá»á¬ážááᯠá
á®á
ááºáá±ážáá²á·áááºá
__init__()
ááá¯á¡ááºáá«á áá¯á¶áá±áááºááŸááºáá«á - ááá¯ážáá±ážá á¡á ááŒá¯ááŒááºážááá¯áááºáž áá»áœááºá¯ááºááá¯á· ááá±á·áá²á·áá«á
- áááºááá¯ááºáᬠáá»áááºááᯠááœáá·áºááá¯ááºáááºá
TelegramBotHook
áááºážáá¶á០client object áá áºáá¯ááᯠáááºáá¶áááŸááá²á·áááºá - Overridden (ááŒááºáááºáááºááŸááº) áááºážáááºáž
BaseOperator.execute()
á¡á±á¬áºááá±áá¬á áááºáá»áááºáá»áá¬áá±á¬á¡áá« Airfow ááẠáá¯ááºááŸá¯ááºááœá¬ážáááá·áºááẠ- áááºážááœáẠáá»áœááºá¯ááºááá¯á·ááẠáá±á¬á·ááºá¡ááºáá¯ááºááẠáá±á·áá»á±á¬á·áᬠáááºááá¯ááºáá±á¬ááºáá»ááºááᯠá¡áá±á¬ááºá¡áááºáá±á¬áºáá«áááºá (áá»áœááºá¯ááºááá¯á· log in á á áá¬ážá¡á¬ážááŒáá·áºá áá»ááºáá»ááºážáááºáá«ástdout
Оstderr
- Airflow ááẠá¡áá¬á¡á¬ážáá¯á¶ážááᯠááŒá¬ážááŒááºá ááŸááŸáá áá¯ááºááá¯ážááŒá®áž ááá¯á¡ááºáá«á ááŒáá¯ááœá²ááœá¬ážáááºááŒá áºáááºá)
áá«ááá¯á·ááŸá¬ áá¬ááœá±ááŸááá² ááŒáá·áºáá¡á±á¬áẠcommons/hooks.py
. ááá¯ááºááááááá¯ááºážá áá»áááºááá¯ááºááá¯ááº-
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
áá®áá±áá¬ááŸá¬ áá¬ááŸááºážááŒáááŸááºážáá±á¬áẠááááá±á¬á·áá°ážá á¡áá±ážááŒá®ážáá²á· á¡áá»ááºááœá±ááᯠááŸááºáá¬ážáá¬ážáá«á·áááºá
- áá»áœááºá¯ááºááá¯á·ááẠá¡ááœá±áááºáá¶ááŸá¯á ááŒááºážáá¯á¶ááŸá¯áá»á¬ážááᯠá
ááºážá
á¬ážááŒáá·áºáá«- ááá
á¹á
á¡áá»á¬ážá
á¯ááœáẠáááºážááẠáá
áºáá¯ááŒá
áºáááº-
conn_id
; - á
á¶áááºážáááºážáá»á¬ážááᯠááá¬ááá¬ážááŒááºáž- áá»áœááºá¯ááºááẠááááááá¯ááºááᯠááá·áºáááºáá¬ážáááºá
get_conn()
áá»áááºáááºááŸá¯ parameters áá»á¬ážááᯠáá¬áááºááŒáá·áº ááá°ááŒá®áž á¡ááá¯ááºážááᯠááá°áá«áextra
(áá«á JSON á¡ááœááºáá áºáá¯áá«)á áá»áœááºá¯áẠ(áá»áœááºá¯ááºáááá¯ááºááá¯ááºááœáŸááºááŒá¬ážáá»ááºá¡á) ááœáẠTelegram bot ááá¯áááºááᯠááá·áºááœááºážáá²á·áááº-{"bot_token": "YOuRAwEsomeBOtToKen"}
. - áá«ááá¯á·áá²á· á¥ááá¬áá
áºáá¯ááᯠáá«áááºáá®ážáááºá
TelegramBot
áááá»áá±á¬ ááá¯áááºáá áºáá¯áá±ážáááºá
áá«áá«áá²á á¡áá¯á¶ážááŒá¯ááŒá®áž áá»áááºáá
áºáá¯á០client áá
áºáá¯ááᯠáááºáááá¯ááºáááºá TelegramBotHook().clent
ááá¯á·ááá¯áẠTelegramBotHook().get_conn()
.
ááá¯ááºááá¯áááá¡ááá¯ááºážááẠTelegram REST API á¡ááœáẠmicrowrapper áá
áºáá¯ááŒá¯áá¯ááºá áá°áá®áá±á¬ááœá²áá°ááŸá¯áááŒá
áºá
á±áááºá python-telegram-bot
sendMessage
.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))
ááŸááºáááºáá±á¬áááºážáááºážááŸá¬ á¡á¬ážáá¯á¶ážáá±á«ááºážááá·áºáááºááŒá áºáááº-
TelegramBotSendMessage
,TelegramBotHook
,TelegramBot
- plugin ááœááºá á¡áá»á¬ážáá°ááŸá¬ááá¯ááŸá±á¬ááºááŸá¯ááœááºááá·áºáá«á áááºážááᯠOpen Source ááá¯á·áá±ážáá«á
áá»áœááºá¯ááºááá¯á· á€á¡áá¬á¡á¬ážáá¯á¶ážááᯠáá±á·áá¬áá±á ááºá áá»áœááºá¯ááºááá¯á·á á¡á á®áááºáá¶á ᬠá¡ááºááááºáá»á¬ážááẠá¡á±á¬ááºááŒááºá áœá¬ áá¡á±á¬ááºááŒááºááá¯ááºáá² áá»ááºáááºááœáẠá¡ááŸá¬ážá¡ááœááºáž áááºáá±á·áá»áºáá áºáᯠáá±ážááá¯á·ááá¯ááºáá²á·áááºá ááŸá¬ážáá±áá¬áž á á áºáá±ážááŒáá·áºáááº...
áá»áœááºá¯ááºááá¯á·áááœá±ážááá±ážááœáẠáá
áºá
á¯á¶áá
áºáᯠáá»ááºááœá¬ážááẠá á¡á²áá« áá«ááá¯á·áá»áŸá±á¬áºááá·áºáá¬ážáᬠááá¯ááºáá°ážáá¬áž? á¡ááá¡áá»
ááááºážááŸá¬áá¬ážá
áá áºáá¯áá¯ááᯠááœá²áá»á±á¬áºáá±áááºááá¯á· áá¶á á¬ážááá«ááá¬ážá SQL Server á០áá±áá¬áá»á¬ážááᯠVertica ááá¯á· ááœáŸá²ááŒá±á¬ááºážáá±ážáááºáᯠáááááŒá¯áá¬ážáá¯á¶áááŒá®ážá ááá¯á·áá±á¬áẠáááºážááᯠáá°áᬠáá±á«ááºážá ááºá០áááºááá¯ááºáááºáá²á·á
á€áááºá ááºáá¯ááºáá¬ááŸá¯ááẠáááºááœááºáá»ááºááŸáááŸáá áá»áœááºá¯ááºááẠááá·áºá¡ááœáẠáá±á«áá¬áá¡áá¯á¶ážá¡ááŸá¯ááºážá¡áá»áá¯á·ááᯠááá¯ážááá¯ážááŸááºážááŸááºážáá±á¬áºááŒááááºááŒá áºáá«áááºá ááá¯áááºááá¯ááá¯ááœá¬ážááá¯ááºáááºá
áá»áœááºá¯ááºááá¯á·áá¡á á®á¡á ááºááŸá¬ á€á¡áá¬ááŒá áºáááº-
- ááá¯ááºážáá¯ááºáá«á
- á¡áá¯ááºáá»á¬ážááᯠáááºáá®ážáá«á
- á¡áá¬á¡á¬ážáá¯á¶ážá áááºáá±á¬ááºááŸáá²ááá¯áᬠááŒáá·áºááá¯ááºáá«á
- ááŒáá·áºá áœááºááẠá ááºááŸááºáá¶áá«ááºáá»á¬ážááᯠáááºááŸááºáá«á
- SQL Server ááŸáá±áá¬ááá¯ááá°áá«á
- áá±áá¬ááᯠVertica ááœááºááá·áºáá«á
- á á¬áááºážááá¬ážáá»á¬ážá á¯áá±á¬ááºážáá«á
áá«ááŒá±á¬áá·áº áá®á¡áá¬á¡á¬ážáá¯á¶ážááᯠá¡áá±á¬ááºá¡áááºáá±á¬áºááá¯ááºááá¯á·á áá»áœááºáá±á¬áºááá¯á·áá²á· á¡áá±ážá¡ááœáŸá¬ážáááºááá¯ážááŸá¯áá
áºáᯠáá¯ááºáá²á·áááºá docker-compose.yml
:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.py
á¡á²áá®ááŸá¬áááº
- á¡áááºááŸááºá¡ááŒá
ẠVertica
dwh
áá¯á¶áá±áááºááŸááºáá»ááºá¡áá»á¬ážá á¯ááŸáá·áºá¡áá°á - SQL Server áá¯á¶ážáá»áá¯ážá
- áá»áœááºá¯ááºááá¯á·ááẠáá±á¬ááºááá¯ááºážááœáẠáá±áá¬áá±á·á
áºáá»á¬ážááᯠáá±áá¬á¡áá»áá¯á·ááŒáá·áº ááŒáá·áºááœááºážááẠ(áááºááá·áºá¡ááŒá±á¡áá±ááœááºá០á
á°ážá
ááºážááá±áá«á
mssql_init.py
!)
áá»áœááºá¯ááºááá¯á·ááẠááááºá¡ááŒáááºááẠá¡áááºážáááºááá¯ááŸá¯ááºááœá±ážáá±á¬ command áá¡áá°á¡áá®ááŒáá·áº áá±á¬ááºážááœááºáá±á¬á¡áá¬á¡á¬ážáá¯á¶ážááᯠá áááºááá¯ááºáááº-
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
áá»áœááºá¯ááºááá¯á·á miracle randomizer ááŸáá¯ááºáá±ážáá±á¬á¡áá¬ááᯠáááºá¡áá¯á¶ážááŒá¯ááá¯ááºáá«áááºá Data Profiling/Ad Hoc Query
:
á¡áááááá±á¬á· á¡á²áá«ááᯠá¡áá²áááºááœá±ááᯠáááŒááá¯á·áá«á
á¡áá±ážá áááºáá±á¬áºááŒáá«á ETL á¡á ááºážá¡áá±ážáá»á¬áž ááá¯ááºáá°ážá á¡áá¬á¡á¬ážáá¯á¶ážá á¡áá±ážá¡ááœá²áá² ááá¯ááºáá°ážá áá«ááá¯á·á á¡ááŒá±áá¶áá áºáá¯áá¯ááºáááºá á¡á²áá«ááŸá¬ ááá¯ááºážáá¯ááºáá áºáá¯ááŸááááºá á¡áá¬á¡á¬ážáá¯á¶ážááᯠáááºá ááºáááºáá±áá»á¬áá²á· ááŒá¯á¶ááŒá®áž á¡áᯠáá«ááá¯áá¯ááºáááº-
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
á¡áá»áááºáá»ááŒá®á áá»áœááºá¯ááºááá¯á·ááá±áá¬ááá¯á á¯áá±á¬ááºážáá«á áá«ááá¯á·á á¬ážááœá²áá áºáá¬ááœá²ááá±á á¡ááœááºááá¬á០áááºááá±á¬ááºáá±á¬ ááá¯ááºážáá»á¬ážá¡áá°á¡áá®ááŒáá·áº á€á¡áá¬ááᯠáá¯ááºááŒáá«á áá¯á·á
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
- áá»áááºáá
áºáá¯áá¡áá°á¡áá®ááŒáá·áº Airflow ááŸáááŸááááºá
pymssql
-áá»áááºáááºáá«á - áá±á¬ááºážááá¯ááŸá¯ááœáẠáááºá áœá²áá¯á¶á á¶ááŒáá·áº ááá·áºáááºáá»ááºáá áºáá¯ááᯠá¡á á¬ážááá¯ážááŒáá«á áá¯á· - áááºážááᯠáá¯á¶á á¶ááœááºá¡ááºáá»ááºááŒáá·áº áá¯ááºáá±á¬ááºááŸá¯áá²ááá¯á· ááá·áºááœááºážáááºááŒá áºáááºá
- áá»áœááºá¯ááºááá¯á·ááá±á¬ááºážááá¯ááŸá¯ááᯠáá»áœá±ážááœá±ážááŒááºážá
pandas
áá«ááá¯á·ááᯠáááºáá°áá°ááá²áDataFrame
- á¡áá¬áááºááœáẠáá»áœááºá¯ááºááá¯á·á¡ááœáẠá¡áá¯á¶ážáááºáá«áááá·áºáááºá
á¡á á¬ážááá¯ážáá¯á¶ážáá±áááºá
{dt}
áá±á¬ááºážááá¯ááŸá¯ááá·áºáááºáá»ááºá¡á á¬áž%s
áá«á ááá±á¬ááºážááá¯ážáá«áž Pinocchio ááŒá±á¬áá·áºááá¯ááºáá²ápandas
áááá¯ááºááœááºááá¯ááºáá°ážápymssql
ááŒá®ážáá±á¬á· áá±á¬ááºáá¯á¶ážáá áºáá¯ááᯠááŒááºáá áºááá¯ááºáááºáparams: List
áá°ááááºááá¯áá»ááºáá±ááá·áºtuple
.
ááŒá¯á á¯áá°ááá¯áááºáž áááááŒá¯áá«ápymssql
áá°á·ááᯠááá±á¬ááºáá¶áá±á¬á·áá°ážááá¯á· áá¯á¶ážááŒááºááŒá®áž ááœááºááœá¬ážááá¯á· á¡áá»áááºáááºááŒá®ápyodbc
.
Airflow ááẠáá»áœááºá¯ááºááá¯á·ááá¯ááºáá±á¬ááºáá»ááºáá»á¬ážá ááŒááºážáá¯á¶áá»ááºáá»á¬ážááᯠááŒáá·áºáááºážáá±ážáááºááᯠááŒáá·áºááŒáá«á áá¯á·á
áá±áá¬áááŸáááẠáááºáá¯ááºá áᬠá¡ááŒá±á¬ááºážáááŸááá«áá°ážá áá«áá±ááá·áº ááŒáá·áºááœááºážááŸá¯ á¡á±á¬ááºááŒááºáááºááá¯á· áá°ááá¬á áá°ážáááºážáá«áááºá áá«áá±ááá·áº áá«á á¡ááŸá¬ážááá¯ááºáá«áá°ážá ááŒá±á¬áº áá¬áá¯ááºáááá²! ááŒá®ážáá±á¬á· áá«ááá¬áá²á
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException
á¡ááŸá¬ážá¡ááœááºážáá»á¬ážáááŸááᯠAirflow ááá¯ááŒá±á¬áá±á¬áºáááºáž áá»áœááºá¯ááºááá¯á·ááẠáá¯ááºáá±á¬ááºááŸá¯ááᯠáá»á±á¬áºááœá¬ážáá«áááºá á¡ááºáá¬áá±á·á
áºááœáẠá¡á
áááºážáá±á¬áẠááá¯á·ááá¯áẠá¡áá®áá±á¬ááºá
áá¯áááºážááŸááááºááá¯ááºáá±á¬áºáááºáž áááºážáá±á¬ááºááŸááááºá
áá»áœááºáá±á¬áºááá¯á·áá²á·áá±áá¬ááá¯áá áºááŒáá«á áá¯á· áá±á¬áºáá¶áá»áá¯ážá á¯á¶:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
á¡áááº:
- á¡á±á¬áºáá«áá°áá¬ážáá²á· áá±áá¬áá±á·á áºá
- áá»áœááºá¯ááºááá¯á·á áá±ááŒá®ážááŒááºážááá¯ááºáᬠá¡áá¯ááºáá® (ááœá²ááŒá¬ážáá«áááºá á¡áá¯ááºááá¯ááºážá¡ááœááº),
- á¡áááºážá¡ááŒá áºááŸáá·áº á¡ááŸá¬á ᬠID á០hash áá áºáᯠ- ááá¯á·ááŒá±á¬áá·áº áá±á¬ááºáá¯á¶ážáá±áá¬áá±á·á Ạ(ááá¬ážáá áºáá¯áá²ááá¯á· á¡áá¬á¡á¬ážáá¯á¶ážááᯠáá±á¬ááºážááá·áºááá·áº) ááœáẠáá°ážááŒá¬ážáá±á¬ á¡ááŸá¬á ᬠID áá áºáá¯ááŸááááºá
á¡áá¯á¶ážá áœááºáá±á¬á¡ááá·áºáá»ááºáááº- á¡áá¬á¡á¬ážáá¯á¶ážááᯠVertica áá²ááá¯á·áá±á¬ááºážááá·áºáá«á áá°ážáááºážáá¬áá áá«ááá¯áá¯ááºááá¯á· á¡á¶á·ááááºážáá²á· á¡áááá±á¬ááºáá¯á¶ážáááºážáááºážááœá±áá²ááá áºáá¯ááá±á¬á· CSV ááŸáááá·áºáá«á
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- áá»áœááºá¯ááºááá¯á·ááẠá¡áá°ážáááºáá¶áá°á¡á¬áž ááŒá¯áá¯ááºáá±áá«áááºá
StringIO
. pandas
ááŒááºááŒááºáá¬áᬠáá«ááá¯á·ááá¯áá¬ážáááºáDataFrame
ááá¯á¶á á¶CSV
-ááá¯ááºážáá»á¬ážá- áá»áááºáá áºáá¯ááŒáá·áº áá»áœááºá¯ááºááá¯á·áá¡ááŒáá¯ááºáá¯á¶áž Vertica ááá¯á· áá»áááºáááºááŸá¯áá áºáá¯ááœáá·áºááŒáá«á áá¯á·á
- ááá¯á¡áá°á¡áá®ááŒáá·áº
copy()
áá»áœááºá¯ááºááá¯á·ááá±áá¬ááᯠVertika ááá¯á· ááá¯ááºááá¯ááºáá±ážááá¯á·áá«á
ááá¯ááºážáááºáá»áŸááŒáá·áºáá¬ážáááºááᯠáááá¯ááºáá¬áá¶á០áá°áááºááŒá áºááŒá®áž á¡áá¬á¡á¬ážáá¯á¶áž á¡áááºááŒá±ááŒá±á¬ááºáž á ááºááŸááºáááºáá±áá»á¬á¡á¬áž ááŒá±á¬ááŒáá«áááºá
session.loaded_rows = cursor.rowcount
session.successful = True
áá«áá«áá²á
áá±á¬ááºážáá»ááŸá¯ááœááºá áá»áœááºá¯ááºááá¯á·ááẠáá áºááŸááºáááºážáááºááᯠááá¯ááºááá¯ááºáááºáá®ážáá«áááºá á€ááœáẠáá»áœááºá¯ááºááẠáá»áœááºá¯ááºá¡á¬áž á ááºáá±ážáá±ážáá±ážáá áºáá¯á¶ážááᯠááœáá·áºááŒá¯ááá¯ááºáá«áááºá
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
áá«á¡áá¯á¶ážááŒá¯áá±áááº
VerticaOperator()
áá±áá¬áá±á·á áºá áá®áá¬áá áºáá¯ááŸáá·áº ááá¬ážáá áºáá¯ááᯠáá«áááºáá®ážááẠ(áááºážááá¯á·áááŸááá±ážáá«áá áá¯ááºáá«áááº)á á¡áááá¡áá»ááºááŸá¬ ááŸá®ááá¯á¡á¬ážáá¬ážááŸá¯áá»á¬ážááᯠááŸááºáááºá áœá¬á á®á ááºáááºááŒá áºáááºá
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> load
áááºááá»áŒážááŒá¯áá»
ááŒáœááºááá±ážá ááŒá±á¬ááẠááá¯ááºáá¬ážá
áá«á áá±á¬áá²ááŸá¬ ááŒá±á¬ááºá áá¬á¡áá±á¬ááºážáá¯á¶áž ááááá á¹áá¬ááºáá²ááá¯áᬠáááºážáá¯á¶áá¬ážá
Julia Donaldson, The Gruffalo
á¡áááºá áá»áœááºá¯ááºááá¯ááºáá±á¬áºááá¯ááºáááºáá»á¬ážááŸáá·áº áá»áœááºá¯ááºááœáẠááŒáá¯ááºááá¯ááºááŸá¯áá áºáá¯ááŸááá²á·áá«áá ETL áá¯ááºáááºážá ááºááᯠá¡áá»ááºá¡ááŒááºáááºáá®ážááŒá®áž áááºáá°á á áááºááá¯ááºáááºáááºážá áááºážááá¯á·ááẠáááºážááá¯á·á SSIS ááŸáá·áº mouse ááŸáá·áº áá»áœááºá¯ááºááᯠAirflow ááŒáá·áº ááŸáá¯ááºážááŸááºááŒáá·áºáá«á ááŒá¯ááŒááºááááºážááááºážááŸá¯ ááœááºáá°ááŸá¯ááá¯áááºáž ááŸáá¯ááºážááŸááºááŒáá·áºáááºááŒá áºáá«áááº... ááá¯ážá áá«áá°ááá¯á·ááᯠá¡áááºáááºááŸá¬ á¡ááá¯ááºáá°áááºááá¯áᬠáááºážááá±á¬áá°áááºáááºáááºá
áááºážáááºážáá±ážáá±ážáááºáááºááŒá±á¬áááẠApache Airflow - áááá¯ááááºáá¯ááºáá¯á¶á á¶áá²á· áá¯ááºáááºážá ááºááœá±ááᯠáá±á¬áºááŒááŒááºážá¡á¬ážááŒáá·áº - áá«á·á¡áá¯áẠááá¯ááŒá®áž ááá¯á¡áááºááŒá±ááŒá®áž áá»á±á¬áºá áá¬áá±á¬ááºážáááºá
áááºážá á¡ááá·áºá¡áááºáááŸá áá»á²á·ááœááºááá¯ááºááŸá¯ááẠááááºá¡ááºáá»á¬ážááŸáá·áº áá»á²á·ááœááºááá¯ááºááŸá¯á¡áá±á«áº ááœááºážá¡á¬ážáá±ážááŸá¯ááá¯á·ááŒá±á¬áá·áº ááá·áºá¡á¬áž áá±áá¬ááá¯ááºážáá®ážáá«ážááœáẠAirflow ááᯠá¡áá¯á¶ážááŒá¯ááẠá¡ááœáá·áºá¡áá±ážááᯠáá±ážáááº- áá±áá¬á á¯áá±á¬ááºážááŒááºážá ááŒááºáááºááŒááºážááŸáá·áº áá¯ááºáá±á¬ááºááŒááºáž áá¶ááá¬áá áºáá¯áá¯á¶ážááœááºáááºá áá¯á¶ážáá»á¶ááœáŸááºáááºááŒááºážááœááºááẠ(á¡ááºá¹áá«ááŒáá¯ááºááá¯á·á áááºáááºáž)á
á¡ááá¯ááºážáá±á¬ááºáá¯á¶ážá á¡ááá¯ážá¡áá¬ážááŸáá·áºá¡áá»ááºá¡áááº
áááºážá¡ááœáẠáá«ááá¯á·á á¯áá±á¬ááºážáá¬ážáá²á· ááœááºáá¯á¶áž
start_date
. áá¯ááºáááºá áá«á áá±ááᶠmeme ááŒá áºáá±áá«ááŒá®á Doug áá¡áááá¡ááŒááºážá¡áá¯á¶ááŸáááá·áºstart_date
á¡á¬ážáá¯á¶áž pass. á¡ááá¯áá»á¯ááºá¡á¬ážááŒáá·áº áááºááŸááºááá»áŸááºstart_date
áááºááŸááááºá áœá²ááŸáá·áºschedule_interval
- áá áºáá±á·á ááá¯á·áá±á¬áẠDAG ááẠááááºááŒááºááœáẠá áááºáááºááŒá áºáááºástart_date = datetime(2020, 7, 7, 0, 1, 2)
ááŒá®ážáá±á¬á· ááŒá¿áá¬áááŸááá±á¬á·áá°ážá
áááºážááŸáá·áºáááºá ááºáá±áá±á¬ á¡ááŒá¬áž runtime error áá áºáá¯ááŸááá«áááº-
Task is missing the start_date parameter
Dag á¡á±á¬áºááá±áá¬ááŸáá·áº áá»áááºááẠáá±á·ááœá¬ážááŒá±á¬ááºáž áááŒá¬áá ááœáŸááºááŒáá±á¬á- á¡á¬ážáá¯á¶ážá ááºáá áºáá¯áááºážááŸá¬á áá¯ááºáá²á·á ááŸáá·áº á¡ááŒá±áá¶áá»á¬áž (Airflow ááá¯ááºááá¯ááºááŸáá·áº áá»áœááºá¯ááºááá¯á·á á¡áá±á«áºáá¶ááá¯ááºáž) ááŸáá·áº áááºáá¬áá¬áá áºáá¯á á¡áá»áááºááá¬ážááœá²áá°á á¡áá¯ááºááá¬ážáá»á¬ážá ááŒá®ážáá±á¬á· á¡áá¯ááºááŒá áºáá±ážáááºá ááá¯á·áá±á¬áº á¡áá»áááºááŒá¬áá¬áááºááŸáá·áºá¡áá»áŸá áááºáá±á¬ááºááŸá¯áá»á¬ážá¡ááœáẠáá¯ááºáá±á¬ááºá áá¬áá»á¬áž áá»á¬ážááŒá¬ážáá¬ááŒá®áž PostgreSQL ááẠá¡ááœáŸááºážááááºážááᯠ20 ms á¡á á¬áž 5 s ááœáẠá áááºáá¯á¶á·ááŒááºáá±á¬á¡áá«á áá»áœááºá¯ááºááá¯á· áááºážááá¯áá°á áááºáá±á¬ááºááœá¬ážáá²á·áááºá
- LocalExecutorá áá¯ááºáááºá áá«ááá¯á·á á¡á²áá«ááᯠááá¯ááºáá±áá¯ááºážá áá«ááá¯á· áá»á±á¬ááºáá²áá±á¬ááºáá±ááŒá®á LocalExecutor ááẠááá¯á¡áá»áááºá¡áá áá»áœááºá¯ááºááá¯á·á¡ááœáẠáá¯á¶áá±á¬ááºáá±ááŒá®ááŒá áºáá±á¬áºáááºáž ááá¯á¡áá»áááºááœáẠá¡áááºážáá¯á¶áž á¡áá¯ááºááá¬ážáá áºáŠážááŸáá·áº ááá¯ážáá»á²á·ááẠá¡áá»áááºáá»áá±á¬ááºááŒá®ááŒá áºááŒá®áž CeleryExecutor ááá¯á· ááŒá±á¬ááºážááœáŸá±á·ááẠáá»áœááºá¯ááºááá¯á· ááŒáá¯ážá á¬ážááááºááŒá áºáá«áááºá á ááºáá áºáá¯áááºážááœáẠááẠáááºážááŸáá·áºá¡áá¯ááºáá¯ááºááá¯ááºáá±á¬ááŒá±á¬áá·áºá áá¬áá¬áá áºáá¯áá±á«áºááœááºááẠCelery ááá¯á¡áá¯á¶ážááŒá¯ááŒááºážááᯠáááºááá·áºá¡áá¬áá០áááºááá·áºá á±áááºááá¯ááºáá±á
- á¡áá¯á¶ážáááŒá¯áá«á built-in tools áá»á¬áž:
- connections áááºáá±á¬ááºááŸá¯á¡áá±á¬ááºá¡áá¬ážáá»á¬ážááᯠááááºážáááºážáááºá
- SLA ááœááºážááœááºááŒááºážá á¡áá»áááºáá® áááŒá®ážááŒááºáá²á· á¡áá¯ááºááœá±ááᯠáá¯á¶á·ááŒááºááá¯á·á
- xcom metadata áááŸááºááŒááºážá¡ááœáẠ(áá»áœááºáá±á¬áºááŒá±á¬áá²á·áááºá metadata!) dag á¡áá¯ááºáá»á¬ážááŒá¬ážá
- áá±ážáẠá¡ááœá²áá¯á¶ážá á¬ážáá¯ááºááŒááºážá áá±á¬ááºážááŒá®á áá«áá¬ááŒá±á¬ááá¯ááºááá²á áá»áááºážááœá¬ážáá±á¬ áá¯ááºáá±á¬ááºá áá¬áá»á¬áž áááºáá«ááá²áá² ááŒá¯áá¯ááºááŒááºážá¡ááœáẠááááá±ážáá»ááºáá»á¬ážááᯠááá·áºááœááºážáá¬ážáá«áááºá ááᯠáá»áœááºá¯ááºáá¡áá¯áẠGmail ááœáẠAirflow á០á¡á®ážáá±ážááºáá±á«ááºáž 90k ááŸáááŒá®ážá web mail muzzle ááẠáá áºááŒáááºáá»áŸáẠ100 áá»á±á¬áºááᯠáá±á¬ááºáá°ááŒá®áž áá»ááºááẠááŒááºážááá¯áá¬ážáááºá
áá±á¬ááºááẠá¡áá¹ááá¬ááºáá»á¬áž-
Apache Airflow Pitfails
áá±á¬ááºááẠá¡ááá¯á¡áá»á±á¬áẠáááááá¬áá»á¬áž
áá»áœááºá¯ááºááá¯á·ááẠáá»áœááºá¯ááºááá¯á·ááŠážáá±á«ááºážááŸáá·áºáááºááŒáá·áºááá¯ááºáá² áá»áœááºá¯ááºááá¯á·ááŠážáá±á«ááºážááŸáá·áºááá¯áá¡áá¯ááºáá¯ááºááá¯ááºá á±áááºá¡ááœáẠAirflow ááẠáá»áœááºá¯ááºááá¯á·á¡ááœáẠá€á¡áá¬ááá¯ááŒááºáááºáá¬ážáááºá
REST API ááᯠ- áá°á·ááŸá¬ ááá¹áá¬ááá·áºáá²á· á¡áá±á¡áá¬ážááŸááá¯ááºážáá²á á¡áá¯ááºááá¯ááºá¡á±á¬áẠáá¬ážáá¬ážááá¯á·á áááºážááŸáá·áºá¡áá°á áááºááẠdags ááŸáá·áº áá¯ááºáá±á¬ááºá áá¬áá»á¬ážá¡ááŒá±á¬ááºáž á¡áá»ááºá¡áááºáá»á¬ážááá¯áá¬áá áá»áŸááºáá áºááŒááºáá áºáá¯á¡á¬áž á áááºáááºá DAG Run ááá¯á·ááá¯áẠáá±áá°ážáááºáá áºáá¯ááᯠáááºáá®ážááá¯ááºáááºáCLI - WebUI ááŸáá áºááá·áº á¡áá¯á¶ážááŒá¯ááẠá¡áááºáááŒá±áá¯á¶áá¬áá áá±áá¯áá»á¡á¬ážááŒáá·áº áááŸááá±á¬á·ááá·áº áááááá¬á¡áá»á¬ážá¡ááŒá¬ážááᯠá¡áááá·áºáá±ážá á¬ááŒá±á¬ááºážááŸáá áºááá·áº áááŸáááá¯ááºáááºá á¥ááá¬á¡á¬ážááŒááºá·:backfill
áá¯ááºáááºážáá±á¬ááºáá¬áá»á¬ážááᯠááŒááºáááºá áááºááẠááá¯á¡ááºáá«áááºá
á¥ááá¬á¡á¬ážááŒáá·áºá áá±á·áá¬áá¯á¶ážáááºáá°áá»á¬ážááẠáá¬áááŒá±á¬áááº- âá¡ááá¯ááá¯á·á áááºááá«áá® á áááºá០áá áááºá¡áá áá±áá¬áá»á¬ážááœáẠá¡áááá¹áá¬ááºáááŸááá±á ááŒááºáá«á ááŒááºáá«á ááŒááºáá«á ááŒááºáá«áâ ááŒá®ážáá±á¬á· áááºážá áá®ááᯠáá«ááá¬áá« áairflow backfill -s '2020-01-01' -e '2020-01-13' orders
- á¡ááŒá±áá¶áááºáá±á¬ááºááŸá¯-
initdb
,resetdb
,upgradedb
,checkdb
. run
áááºážááẠááá·áºá¡á¬áž instance áá¯ááºáá±á¬ááºá áá¬áá áºáá¯á¡á¬áž áá¯ááºáá±á¬ááºááá¯ááºááŒá®áž ááŸá®ááá¯ááŸá¯á¡á¬ážáá¯á¶ážááœáẠá¡ááŸááºáááá¯ááºáááºá ááá¯ááŸááá«ážá áááºáááºáááºážááá¯ááŸáááá·áº run ááá¯ááºáááºáLocalExecutor
áááá®á¡á á¯á¡áá±ážááŸááá»áŸááºáááºážá- áá±á¬áºáá±á¬áº áá°áá¬ááᯠáá¯ááºáááºá
test
ááŒá±á áœááºááŸá¬áááºáž áá¬á០ááá±ážáá°ážá connections
shell á០áá»áááºáááºááŸá¯áá»á¬ážááᯠá¡á á¯ááá¯ááºá¡ááŒá¯á¶ááá¯áẠáááºáá®ážááœáá·áºááŒá¯áááºá
python api - ááááºá¡ááºáá»á¬ážá¡ááœáẠáááºááœááºááŒá®áž áááºážááᯠáááºá¡áááºážáááºááŒáá·áº ááœáŸá±ááŸá±á¬ááºááŒááºážáááŒá¯áá² á¡ááŒááºá¡ááŸááºáááºáá¶ááŒááºážá áááºáá²áá±á¬áááºážáááºážááŒá áºáááºá áá«áá±ááá·áº áá«ááá¯á·ááᯠáááºáá°á áá¬ážááŸá¬áá²á/home/airflow/dags
ááŒá±ážipython
ááŸá¯ááºááœáá±áá±á¬á·ááá¬ážá á¥ááá¬á¡á¬ážááŒáá·áº áááºááẠá¡á±á¬ááºáá«áá¯ááºááŒáá·áº áááºááœááºááŸá¯á¡á¬ážáá¯á¶ážááᯠáá¯ááºáá°ááá¯ááºáááº-from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d)
- Airflow áááºáá¬áá±áá¬áá±á·á
áºááá¯á· áá»áááºáááºáá±áááºá áááºážááá¯á
á¬áá±ážááẠáá»áœááºá¯ááºáá¡ááŒá¶ááŒá¯áá¬ážáá±á¬áºáááºáž á¡áá»áá¯ážáá»áá¯ážáá±á¬áááá»áá±á¬áááºááá
áºáá»á¬ážá¡ááœáẠáá¯ááºáá±á¬ááºá
áá¬á¡ááŒá±á¡áá±áá»á¬ážááá¯ááá°ááŒááºážááẠAPIs áá»á¬ážááá¯á¡áá¯á¶ážááŒá¯ááŒááºážááẠáá»á¬ážá
áœá¬ááá¯ááá¯ááŒááºáááºááœááºáá°áá«áááºá
áá»áœááºá¯ááºááá¯á·áá¡áá¯ááºáá»á¬ážá¡á¬ážáá¯á¶ážááẠá¡á áœááºážá¡á áááŸááá±á¬áºáááºáž áá áºáá«áá áºáᶠááŒá¯ááºáá»ááá¯ááºááŒá®ážá áááºážááẠáá¯á¶ááŸááºááŒá áºáááºáᯠááá¯ááŒáá«á áá¯á·á ááá¯á·áá±á¬áº á¡áá»áá¯á·áá±á¬ ááááºááá¯á·ááŸá¯áá»á¬ážááẠáá¶ááááŒá áºááœááºááŸáááŒá®áž á á áºáá±ážááẠááá¯á¡ááºáááºááŒá áºáááºá
SQL ááááá¬ážáá«á
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
ááá¯ážáá¬áž
áá¯ááºáá«áááºá Google ááá¯ááºáá±ážáá²á· ááá·áºááºáááºáá¯áᬠáá»áœááºáá±á¬áºáá²á· bookmarks áá²á Airflow folder áá²á· á¡ááŒá±á¬ááºážá¡áá¬ááœá±áá«á
Apache Airflow á á¬ááœááºá á¬áááºáž - áá¯ááºáá«áááºá áá«ááá¯á·áá¯á¶ážááá±á ááááºá á á¬ááœááºá á¬áááºážá ááá¯á·áá±á¬áº ááœáŸááºááŒá¬ážáá»ááºááᯠáááºáá°áááºááááºážáá¡áá±á¬ááºážáá¯á¶ážá¡áá±á·á¡áá»ááºá· - á¡áááºážáá¯á¶ážá áááºáá®ážáá°áá»á¬ážáá¶á០á¡ááŒá¶ááŒá¯áá»ááºáá»á¬ážááᯠáááºáá«áAirflow UI - á¡á ááá¯ááºáž- áá¯á¶áá»á¬ážááœáẠá¡áá¯á¶ážááŒá¯áá°áá»ááºááŸá¬ááŒááºApache Airflow á á¡ááá ááá±á¬ááá¬ážáá»á¬ážááᯠáá¬ážáááºááŒááºážá - á¡áááºá (áá¯ááºááááº) áááºááẠáá»áœááºá¯ááºáá¶á០áá áºá á¯á¶áá áºáá¬ááᯠáá¬ážááááºáá«á á¡ááŒá±áá¶ááá±á¬ááá¬ážáá»á¬ážááᯠáá±á¬ááºážá áœá¬áá±á¬áºááŒáá¬ážáá«áááºáTianlong áááá±á¬á· â Airflow Server/Cluster áá áºáá¯áááºáá±á¬ááºáááºážáááºážááœáŸáẠ- Airflow á¡á á¯á¡áá±ážáá áºáᯠáááºááŸááºááŒááºážá¡ááœáẠá¡ááá¯áá»á¯á¶ážáááºážááœáŸááºáLyft ááœáẠApache Airflow ááᯠáá¯ááºáá±á¬ááºáá±áááºá - áá°áá®áá±á¬á áááºáááºá á¬ážá áá¬áá±á¬ááºážáá±á¬áá±á¬ááºážáá«ážá ááŒá áºááá¯ááºáááºááŸá¬ ááá¯á ááá¬ážáááºáá«áááŸáá·áº á¥ááá¬á¡áááºážáááºááŸááœá²áApache Airflow ááẠCelery áá¯ááºáá¬ážáá»á¬ážá¡áá±á«áº á¡áá¯ááºá¡ááá¯ááºáá»á¬áž ááŒáá·áºáá±áá¯á¶ - Celery ááŸáá·áºááœá²áááºáá¯ááºáá±á¬ááºááŒááºážá¡ááŒá±á¬ááºážáDAG ááẠApache Airflow ááœáẠá¡áá±á¬ááºážáá¯á¶áž á¡áá±á·á¡áá»áá·áºáá»á¬ážááᯠáá±ážáá¬ážááŒááºáž - áááºá áœá²á¡á á¬áž ID ááŒáá·áºáááºááŒááºážá á¡ááœááºááŒá±á¬ááºážááŒááºážá ááá¯ááºááœá²á·á ááºážáá¯á¶ááŸáá·áº á¡ááŒá¬ážá áááºáááºá á¬ážááœááºá¡áá¬áá»á¬ážá¡ááŒá±á¬ááºáž - á¡áá¯ááºáá»á¬ážááá¯á¶áá±á¬ááºááŒááºážá¡ááŒá±á¬ááºážáApache Airflow ááŸá ááŸá®ááá¯ááŸá¯áá»á¬ážá¡á¬áž á á®áá¶ááá·áºááœá²ááŒááºážá - áá»áœááºá¯ááºáá±á¬áºááŒáá²á·ááá·áº á¡áá¯ááºáá»á¬ážá ááŸá®ááá¯ááŸá¯ááŸáá·áº Trigger Ruleááá±áááºáá±ááœááº- áááºážáá²á· DAG áᬠá¡áá»áááºááá¬ážáá²á·áá±á¬ááºááœááºááŸá¬ ááŸááá±áá²á·á¡áá« - á¡á á®á¡á ááºááœá²áá°ááœáẠ"áááºááœááºáá¬ážááá·áºá¡ááá¯ááºážá¡áá¯ááºáá¯ááºáááº" á¡áá»áá¯á·ááá¯áá»á±á¬áºááœáŸá¬ážáááºážá áá¯á¶ážááŸá¯á¶ážááœá¬ážáá±á¬áá±áá¬ááá¯áááºááŒá®ážá¡áá¯ááºáá»á¬ážááá¯áŠážá á¬ážáá±ážáá¯ááºáá«áApache Airflow á¡ááœáẠá¡áá¯á¶ážáááºáá±á¬ SQL áá±ážááœááºážáá»á¬áž â Airflow áááºáá¬áá±áá¬á¡ááœáẠá¡áá¯á¶ážáááºáá±á¬ SQL áá±ážááŒááºážááŸá¯áá»á¬ážáApache Airflow ááŒáá·áº á¡áá¯ááºá¡ááœá¬ážá¡áá¬áá»á¬ážááᯠá áááºáááºáá®ážááá¯ááºáá«á - á áááºááŒáá¯ááºá¡á¬áá¯á¶áá¶áááááá¬áááºáá®ážááŒááºážááŸáá·áº áááºáááºá á¡áá¯á¶ážáááºáá±á¬á¡ááá¯ááºážáá áºáá¯ááŸááááºáPresto ááŸáá·áº Airflow ááŒáá·áº AWS ááœáẠFetchr Data Science Infra ááá¯áááºáá±á¬ááºááŒááºážá â Data Science á¡ááœáẠAWS ááœáẠá¡ááŒá±áá¶á¡áá±á¬ááºá¡á¡á¯á¶áááºáá±á¬ááºááŒááºážááŸáá·áºáááºáááºáá±á¬ á áááºáááºá á¬ážááœááºá¡ááá¯áá»á¯á¶ážááŸááºá á¯á7 Airflow DAGs ááᯠDebugging áá¯ááºáá±á¬á¡áá«ááœáẠááœá±á·ááá±á·ááŸááá±á¬ á¡ááŸá¬ážáá»á¬áž - áá¬áááºá¡ááŸá¬ážáá»á¬áž (áá áºá á¯á¶áá áºáá±á¬ááºá ááœáŸááºááŒá¬ážáá»ááºááᯠááááºááá±ážááá·áºá¡áá«)áApache Airflow ááᯠá¡áá¯á¶ážááŒá¯á á áá¬ážááŸááºááᯠááááºážáááºážááŒá®áž áááºáá¯á¶ážáá«á - ááẠConnections ááá¯áá¯á¶ážáá¯á¶ááŒáá·áº á áá¬ážááŸááºáá»á¬ážááᯠááááºážáááºážáá¬ážáá±á¬áºáááºáž áá°áá»á¬ážá ááŒá¯á¶ážáá±áá¯á¶áPython ááŸáá·áº Apache Airflow ááá¯á·á Zen - ááœááºááá¯ááºáá±á¬ DAG áááºááá·áºááá¯á·ááŒááºážá áá¯ááºáá±á¬ááºáá»ááºáá»á¬ážááœáẠá áá¬ážá ááºááááŒááºážá ááŸá®ááá¯ááŸá¯áá»á¬ážá¡ááŒá±á¬ááºáž áááºáá¬áááºáá¬ááŸáá·áº áá¯ááºáááºážá á¥áºáá»á¬ážááᯠáá»á±á¬áºááœá¬ážááŒááºážá¡ááŒá±á¬ááºážáAirflow- áá°áááááºážáá±á¬ á¡ááŒá¶áá¬ááºáá»á¬ážá ááŸáá·áºááœááºáá»á¬ážááŸáá·áº á¡áá±á¬ááºážáá¯á¶áž á¡áá±á·á¡áá»áá·áºáá»á¬áž - á¡áá¯á¶ážááŒá¯ááŸá¯ááŸáá·áº áááºáááºdefault arguments
Оparams
ááá°áá¬áá¯á¶á á¶áá»á¬ážá¡ááŒáẠVariables áá»á¬ážááŸáá·áº áá»áááºáááºááŸá¯áá»á¬ážáAirflow Scheduler ááᯠáááá¯ááá¯ááºáá¯ááºááŒááºážá - á á®á ááºáá°ááẠAirflow 2.0 á¡ááœáẠááŒááºáááºáá±áá¯á¶á¡ááŒá±á¬ááºáž áá¬ááºáááºážáDocker-compose ááœáẠCelery á¡áá¯ááºááá¬áž 3 áá±á¬ááºááŸáá·áºá¡áá° Apache Airflow - áá»áœááºá¯ááºááá¯á·á á¡á á¯á¡áá±ážááœáẠááŒáá·áºáá»ááºááŒááºážá¡ááŒá±á¬ááºáž á¡áááºážááẠáá±ááºáá±á¬ááºáá»áá±áá±á¬ áá±á¬ááºážáá«áždocker-compose
.4 Airflow Context ááá¯á¡áá¯á¶ážááŒá¯á Templating Tasks - áááºážáááááºáá»á¬ážááŸáá·áº á¡ááŒá±á¬ááºážá¡áᬠáááºááá·áºááá¯á·ááŒááºážááᯠá¡áá¯á¶ážááŒá¯á ááŒá±á¬ááºážáá²áá±áá±á¬ á¡áá¯ááºáá»á¬ážáAirflow ááŸá á¡ááŸá¬ážá¡ááœááºáž á¡ááŒá±á¬ááºážááŒá¬ážáá»ááºáá»á¬áž â á á¬ááá¯ááºááŸáá·áº Slack á០á á¶ááŸáá·áº á áááºááŒáá¯áẠá¡áááá±ážáá»ááºáá»á¬ážáAirflow á¡áá¯ááºáá¯á¶- áá»áá¯ááºážáá±á¬ááºááá«áá² ááŸá¯ááºááœá±ážáá±á¬ DAG áá»á¬áž - áá¯ááºáááºážáá¬áááºáá»á¬ážá áááºáááá¯áá»á¬ážááŸáá·áº XCom ááá¯ááœá²áá¯ááºááŒááºážá
ááŸáá·áº áá±á¬ááºážáá«ážááœáẠá¡áá¯á¶ážááŒá¯ááá·áº ááá·áºááºáá»á¬áž
áááºáááá¯á¡ááá¯ážá¡áá¬áž - áááºážáááááºáá»á¬ážááœáẠá¡áá¯á¶ážááŒá¯ááẠáá±áá¬áá°ááá¯ááºáááºáá¡ááŒá áºáá»á¬ážáá±á¬ á¡áá±á«ááºáá»á¬ážâáá±áááºáá±ááœáẠ- ááá¯ááºážáá»á¬ážááá¯áááºáá®ážáá¬ááœáẠá¡ááŒá áºáá»á¬ážáá±á¬á¡ááŸá¬ážáá»á¬ážápuckel/docker-airflow- Docker Apache Airflow -docker-compose
á ááºážáááºááŒááºážá á¡ááŸá¬ážááŒááºáááºááŒááºážááŸáá·áº á¡ááŒá¬ážá¡áá¬áá»á¬ážá¡ááœááºápython-telegram-bot/python-telegram-bot- áááºážááᯠáááŒááºážááá¯ááºáá²á· áá¯ááºááá¯ážááŸá¯áá áºáᯠáá¯ááºáá¬ážáááºá - Telegram REST API á¡ááœáẠPython wrapperá
source: www.habr.com