áááşášáááŹááŤá ááťá˝ááşáŻááşáááş Vezet ááŻáášáááŽáĄáŻááşá áŻá Analytics ááŹáááž ááąááŹáĄááşááťááşááŽáᏠDmitry Logvinenko ááźá áşááŤáááşá
ETL ááŻááşáááşá¸á ááşááťáŹá¸ ááąáŹáşááąáŹááşáááşáĄáá˝ááş áĄáśáˇááźáá˝ááşááąáŹááşá¸ááąáŹ ááááááŹáá áşááŻááźá áşáááˇáş Apache Airflow ááᯠáááˇáşáĄáŹá¸ ááźáąáŹááźááŤáááşá áááŻáˇááąáŹáş Airflow áááş á á˝ááşá áŻáśááážááˇáş áááşá áŻáśááŻáśá¸ááąáŹááźáąáŹááˇáş áááşáááş ááąááŹá áŽá¸áááşá¸áážáŻáá˝ááş áááŤáááşááąáŹáşáááşá¸ áááşá¸ááᯠáĄááŽá¸áááşááźááˇáşáážáŻáááˇáşááąáŹáşáááşá¸ áááşáááˇáşááŻááşáááşá¸á ááşáááŻáááᯠáĄááťáááşáĄááŤáĄáááŻááş á áááşááŻááşááąáŹááşáááşáážááˇáş áááşá¸áááŻáˇáááŻááşááąáŹááşáážáŻááᯠá áąáŹááˇáşááźááˇáşáááş áááŻáĄááşááŤáááşá
ááŻááşááŤáááşá ááŤááźáąáŹááźááŻáśáááşáááá˛á ááááŻááááşáážáŹ ááŻááşáá˝áąá áááşááŹá¸ááźááşááŹááşááŻáśáá˝áąáá˛áˇ áĄááźáśááźáŻááťááşáá˝áą áĄááťáŹá¸ááźáŽá¸ááŤáážáááŤáááşá

Airflow / Wikimedia Commons áá°ááąáŹ á
ááŹá¸ááŻáśá¸ááᯠGoogle áá˝ááş áá˝áąáˇááááşáááşá
ááŹááááŹ
ááááŤááşá¸
Apache Airflow áááş Django áážááˇáşáá°áááşá
- python áá˛áˇááąá¸ááŹá¸áááşá
- ááąáŹááşá¸áá˝ááşáá˛áˇ admin panel áá áşááŻáážáááŤáááşá
- áĄáááˇáşáĄáááşááážá ááťá˛áˇáááŻááşáááşá
- ááŹáááŹá ááźáŹá¸ááŹá¸ááąáŹ áááşáá˝ááşááťááşááźááˇáş ááźáŻááŻááşááŹá¸ááźááşá¸ááźá áşáááşá áĄáááĄááťáááŻáááąáŹáş (ááášááŽááŤáá ááąá¸ááŹá¸ááá˛áˇáááŻáˇ)á
- áĄáááˇáşáĄáááşááážá á ááşááťáŹá¸ááąáŤáşáá˝ááş áĄááŻááşááŻááşááźááşá¸áážááˇáş á áąáŹááˇáşááźááˇáşááźááşá¸ááŻááşáááşá¸ááŹáááşááťáŹá¸ (áááᎠ/ Kubernetes áĄááťáŹá¸áĄááźáŹá¸áážááˇáş áááˇáşáĄááá áááşá áááˇáşáááŻáá˝ááˇáşááźáŻáááş)
- áĄáá˝ááşáá˝ááşáá°ááąáŹ Python ááŻááşáážááąá¸ááŹá¸áááşáážááˇáşááŹá¸áááşáááş dynamic workflow ááťááŻá¸áááşáážááˇáşáĄáá°
- áážááˇáş áĄáááşáááˇáşááŻááşááŹá¸ááąáŹ áĄá áááşáĄáááŻááşá¸ááťáŹá¸áážááˇáş áĄáááşááŻááşááááşáĄááşááťáŹá¸ (áĄáá˝ááşáááŻá¸áážááşá¸ááąáŹ) ááᯠáĄááŻáśá¸ááźáŻá áááşáááˇáşááąááŹááąáˇá áşáážááˇáş API ááťáŹá¸áááŻáááᯠááťáááşáááşáááŻááşá áąááŤáááşá
ááťá˝ááşáŻááşáááŻáˇáááş á¤áá˛áˇáááŻáˇááąáŹ Apache Airflow áááŻáĄááŻáśá¸ááźáŻáááş-
- ááťá˝ááşáŻááşáááŻáˇáááş áĄááťááŻá¸ááťááŻá¸ááąáŹáááşá¸ááźá áşááťáŹá¸ááž áĄááťááşáĄáááşááťáŹá¸ááᯠá áŻááąáŹááşá¸ááŤáááş (ááťáŹá¸á á˝áŹááąáŹ SQL Server áážááˇáş PostgreSQL ááźá áşáááşááťáŹá¸á áĄááşááşáááşááá áşááťáŹá¸ááŤáááˇáş API áĄááťááŻá¸ááťááŻá¸á DWH áážááˇáş ODS áááŻáˇáá˝ááşáááş 1C) (ááťá˝ááşáŻááşáááŻáˇáá˝ááş Vertica áážááˇáş Clickhouse áážááááş)á
- áááşááąáŹááşáĄáááˇáşááźááˇáşáá˛á
cronODS áá˝ááş ááąááŹá áŻá ááşá¸áážáŻ ááŻááşáááşá¸á ááşááťáŹá¸ááᯠá áááşáᏠáááşá¸áááŻáˇá ááźáŻááźááşááááşá¸ááááşá¸áážáŻááťáŹá¸áááŻáááşá¸ á áąáŹááˇáşááźááˇáşáááşá
áááźáŹááąá¸ááŽáĄááá ááťá˝ááşáŻááşáááŻáˇááááŻáĄááşááťááşááťáŹá¸ááᯠ32 cores áážááˇáş 50 GB RAM ááŤáážáááąáŹ ááŹááŹáááşááąá¸áá áşááŻá ááźááˇáşáááşá¸ááąá¸áá˛áˇáááşá Airflow áá˝ááşá áááşá¸áááş áĄááŻááşááŻááşáááş-
- йОНоо 200 áááş (ááááşááąáŹáˇ ááťá˝ááşááąáŹáşáááŻáˇ áĄááŻááşáá˝áąááᯠááŻááşáááŻá¸ááŹá¸áá˛áˇ workflows)á
- áá áşááŻááťááşá¸á áŽáá˝ááşááťááşá¸ááťáž áĄááŻááş áá,
- á¤ááąáŹááşá¸ááźááşáážáŻáááş (ááťááşá¸ááťážáĄáŹá¸ááźááˇáşáááşá¸) á áááşáááş áá áşááŹááŽáá áşááŤ.
ááťá˛áˇáá˝ááşááŻáśáážááˇáş áááşáááşá áĄáąáŹááşáá˝ááş ááťá˝ááşáŻááş ááąá¸ááŤáááşá áááŻáˇááąáŹáş ááᯠááźáąáážááşá¸áááˇáş Ăźber-problem ááᯠáááşáážááşááźááŤá ááŻáˇá
áá°á SQL Server ááŻáśá¸ááŻáážáááźáŽá¸ áá áşááŻá áŽáá˝ááş ááąááŹááąáˇá áş 50 ááŤáážááááş - áááąáŹááťááşáá áşááŻááĽáááŹáĄááŽá¸ááŽá¸áá˝ááş áááşá¸áááŻáˇáá˝ááşáá°ááŽááąáŹáá˝á˛áˇá ááşá¸ááŻáś (mua-ha-ha)á áááŻáááŻáááşáážáŹ áá áşááŻá áŽáá˝ááş áĄáážáŹá áŹáááŹá¸áá áşááŻáážááááş (ááśááąáŹááşá¸áááşáážáŹá áááşá¸áá˝ááş áááŹá¸áá áşááŻáážááááşá áĄáááşááᯠáááşáááˇáşááŻááşáááşá¸áá˝ááşáááᯠáááˇáşáá˝ááşá¸áááŻááşáááşá) ááťá˝ááşáŻááşáááŻáˇáááş áááşááąáŹááşáážáŻáááşáááşááťáŹá¸ (áááşá¸ááźá áşááŹááŹá áĄáááşá¸áĄááźá áşááąááŹááąáˇá áşá ETL áĄááŻááş ID) áááŻááąáŤááşá¸áááˇáşááźááşá¸ááźááˇáş ááąááŹáááŻáá°áᏠáááşá¸áááŻáˇááᯠVertica áᯠááááşááŹá á˝áŹááźáąáŹááŤá
áá˝áŹá¸á ááŻáˇ!
áĄááá áĄáááŻááşá¸áááąáŹáˇ áááşáá˝áąáˇ (ááŽáĄááŻáᎠáĄáááşá¸áááş)á
ááŹááźáąáŹááˇáş ááŤáááŻáˇ (áááşá¸áá˛áˇ)
áá
áşáááşááźáŽá¸áá˝áąá ááźáŽá¸ááŹááąáŹáˇ áááŻá¸áááŻá¸áážááşá¸áážááşá¸áá˛á SQL-schik áááş ááŻáážáŹá¸áááşááŽáĄááąáŹááşá¸áááŻááşáá
áşááŻáá˝ááşá ááťá˝ááşáŻááşáááŻáˇáĄáá˝ááşááážááááŻááşáááˇáşááááááŹáážá
áşááŻáááŻáĄááŻáśá¸ááźáŻá ETL ááŻááşáááşá¸á
ááşááťáŹá¸ (ááąáŤáş) ááąááŹá
áŽá¸áááşá¸áážáŻááᯠáážááˇáşá
áŹá¸áá˛áˇáááş-
- Informatica ááŤááŤá
ááşáᏠ- áááşá¸ááááŻááşáááŻááşááŹáˇááşáá˛á áááşá¸ááááŻááşáááŻááşááŹá¸áážááşá¸ááźááˇáşá áĄáá˝ááşáĄááťááŻá¸ááźá
áşáá˝ááşá¸ááąáŹ áĄáá˝ááşááźááˇáşáá˝áŹá¸ááąáŹá
áá
áşá áá°áˇáá˛áˇ á
á˝ááşá¸ááąáŹááşáááŻááşáááş 1% ááᯠááŻááŹá¸ááááş ááŹá¸ááźá
áşáá˛áˇáááşá áĄáááşááźáąáŹááşáˇ? ááąáŹááşá¸ááźáŽá ááááĄááťááşá áááá ááźááˇáşáá˝ááşáážá
áşááťáŹá¸ááŽá áá
áşááąááŹááŹáážáŹ ááŽáĄááşááŹááąáˇá
áşá ááťá˝ááşáŻááşáááŻáˇááᯠá
áááşáááŻááşá¸áááŻááşáᏠáááĄáŹá¸ááźá
áşá
áąááŤáááşá ááŻááááĄááąáážááˇáşá ᤠcontraption áááş áĄáá˝ááşáááşá¸ááźááşááąáŹ ááŻááşáááşá¸á
ááşááťáŹá¸á ááąáŤáááźáŽá¸ááąáŹ áĄá
áááşáĄáááŻááşá¸ááᯠááźááşáááşáĄááŻáśá¸ááźáŻááźááşá¸áážááˇáş áĄááźáŹá¸ááąáŹ áĄáá˝ááşáĄááąá¸ááźáŽá¸ááąáŹ ááŻááşáááşá¸-áážááˇáşáá˝ááşááťáŹá¸áĄáá˝ááş ááŽáááŻááşá¸ááŻááşááŹá¸áááşá Airbus A380 / year á ááąáŹááşááśáá˛áˇáááŻáˇ ááŻááşááťá
ááááşáážááˇáş áááşáááşá ááťá˝ááşáŻááşáááŻáˇ ááŹááž áááźáąáŹáááŻááŤá
áááááŹá¸ááŤá áááşááŹá¸ááźááşááŹááşááŻáśáááş áĄáááş 30 áážá áşáĄáąáŹááş áá°ááťáŹá¸ááᯠáĄáááşá¸áááş áááááŻááşáááŻááşáááşá

- SQL Server ááąáŤááşá¸á
ááşá¸ááźááşá¸ ááŹáᏠ- ááťá˝ááşáŻááşáááŻáˇáááş ááťá˝ááşáŻááşáááŻáˇá á
áŽááśááááşá¸áĄáá˝ááşá¸ á
áŽá¸áááşá¸áážáŻáá˝ááş á¤áá˛ááąáŹáşááᯠáĄááŻáśá¸ááźáŻáá˛áˇáááşá áĄáážááşááąáŹáˇá ááťá˝ááşáŻááşáááŻáˇáááş SQL Server ááᯠáĄááŻáśá¸ááźáŻááŹá¸ááźáŽá¸ááźá
áşááźáŽá¸ áááşá¸á ETL ááááááŹááťáŹá¸ááᯠáĄááŻáśá¸áááźáŻááźááşá¸áááş áá
áşáááşá¸áááşá¸ááźááˇáş ááŻáášááááážáááąá áááşá¸áá˝ááşáĄááŹáĄáŹá¸ááŻáśá¸ááąáŹááşá¸áááş- áĄááşááŹááąáˇá
áşáážá
áşááŻá
ááŻáśá¸áááşáážááááşá áááŻá¸áááşáážáŻáĄá
áŽáááşááśá
áŹááťáŹá¸ááźá
áşáááş ... áááŻáˇááąáŹáşááťá˝ááşáŻááşáááŻáˇáááşááąáŹáˇááşáá˛ááŻááşááŻááşááťáŹá¸áááŻáážá
áşáááşááźáááşá á¤áĄáá˝ááşááźáąáŹááˇáşáááŻááşááŤá ááŹá¸áážááşá¸
dtsx(ááááşá¸áááşá¸áááş node ááťáŹá¸ááŤááąáŹ XML áá°áááş) ááťá˝ááşáŻááşáááŻáˇ áááşáááŻááşáááşá áááŻáˇááąáŹáş áĄáááşááźáąáŹááˇáşáááşá¸á ááŹááŹáá áşááŻááž áá áşááŻáááŻáˇ á áŹá¸áá˝á˛ááŹááąáŤááşá¸ááťáŹá¸á á˝áŹááᯠáá˝á˛áááşáááŻááşá áąáááˇáş Task Package áá áşááŻááᯠáááşáááŻáˇááźáŻááŻááşááááşáááşá¸á ááŻááşáááşá ááąáŹááşá áşáááŻááşáááŻáážáááşáááŻááşá áááşá¸áááşáááŻá¸áážá áşáááşáááą ááźáŻááşááťáá˝áŹá¸ááááˇáşáááşá ááŤááąáááˇáş ááąááťáŹááŹáááąáŹáˇ áááŻáááşáážááşááťááŻáśááŤáá˛á
ááŤáááŻáˇááąááťáŹááąáŤááşáá˝ááşáááşá¸áážáŹáááşá ááźá áşáááşáážááş ááŽá¸ááŤá¸ áááŻááşáááŻááşááąá¸ááŹá¸áá˛áˇ SSIS package generator áá áşááŻááąáŹááşááŹáááş..á
ááźáŽá¸ááąáŹáˇ áĄááŻááşáá áşáá áşááŻáážáŹáááşá ááźáŽá¸ááąáŹáˇ Apache Airflow á ááťá˝ááşááąáŹáˇáşááᯠááťáąáŹáşáá˝áŹá¸áááşá
ETL ááŻááşáááşá¸á ááşááąáŹáşááźááťááşááťáŹá¸áááş áááŻá¸áážááşá¸ááąáŹ Python ááŻááşááźá áşááźáąáŹááşá¸ ááťá˝ááşáŻááşáá˝áąáˇáážáááąáŹáĄááŤáá˝ááş ááťá˝ááşáŻááşáááş ááťáąáŹáşáá˝ážááşá á˝áŹ áááŻááşáá˛áˇááŤá á¤áááşáážáŹ ááąááŹá áŽá¸ááźáąáŹááşá¸ááťáŹá¸ááᯠááŹá¸áážááşá¸ááźáąáŹááşá¸áá˛ááźáŽá¸ áá˝á˛ááźáŹá¸áá˝áŹá¸ááŹá ááąááŹááąáˇá áşááŹááąáŤááşá¸ááťáŹá¸á á˝áŹááž áá áşáážááşáá áşááŻáááŻáˇ áá áşááŻáááşá¸áá˝á˛áˇá ááşá¸ááŻáśááŤáážáááąáŹ áááŹá¸ááťáŹá¸ááᯠ13â áááşááŹá¸ááźááşáá áşááŻáážááˇáşáá áşááŻáá˝á˛ áááŻáˇáááŻááş áážá áşááŻáá˝ááş Python ááŻááşáááá ášá ááźá áşááŹáá˛áˇáááşá
áĄá áŻáĄááąá¸ááᯠá áŻá ááşá¸ááźááşá¸á
ááŻáśá¸á áá°áááşáááşá¸ááᯠáá áŽá ááşááĄáąáŹááşá Airflow ááᯠáááşáááşááźááşá¸á áááşáá˝áąá¸ááťááşááŹá¸ááąáŹ ááąááŹááąáˇá áşá áááᎠáážááˇáş docks áá˝ááşááąáŹáşááźááŹá¸ááąáŹ áĄááźáŹá¸ááá ášá ááťáŹá¸áá˛áˇáááŻáˇ á¤ááąááŹáá˝ááş ááŻáśá¸áááááŹáááşáážáŹá¸ááąáŹáĄááŹááťáŹá¸áĄááźáąáŹááşá¸ áááźáąáŹááŤáážááˇáşá
á
ááşá¸áááşáážáŻáá˝áąááᯠááťááşááťááşá¸á
áááşáááŻááşá
áąáááŻáˇ ááťá˝ááşááąáŹáş ááŻáśááźááşá¸áá˝á˛áá˛áˇáááşá docker-compose.yml áááŻáĄáá˛áá˝ááş-
- áĄáážááşááááş ááźážááˇáşáááşááźááŤá
ááŻáˇ ááąá
áŽá¸ááźáąáŹááşá¸: áĄá
áŽáĄá
ááşáá˝á˛áá°á Webserverá áááşá¸áá˝ááˇáşáááş ááááŽáĄááŻááşááťáŹá¸ááᯠá
áąáŹááˇáşááźááˇáşáááş áááŻááąááŹáá˝ááş áážááˇáşáááşááąááááˇáşáááş (áááşá¸ááᯠáá˝ááşá¸áááŻáˇááŹá¸ááźáŽá¸ááźá
áşááąáŹááźáąáŹááˇáş ááźá
áşáááşá
apache/airflow:1.10.10-python3.7ááŤááąáááˇáş ááŤáááŻáˇ á áááşááááşá áŹá¸áá°á¸) - PostgreSQLAirflow áááş áááşá¸ááááşááąáŹááşáážáŻáĄááťááşáĄáááşááťáŹá¸ (áĄá áŽáĄá ááşáá˝á˛áá°ááąááŹá ááŻááşááąáŹááşáážáŻá áŹáááşá¸áĄááşá¸á áááşááźááˇáş) áááŻááąá¸ááŹá¸áááşááźá áşááźáŽá¸á Celery áááş ááźáŽá¸ááźáąáŹááşáááˇáşáĄááŻááşááťáŹá¸ááᯠáĄáážááşáĄááŹá¸ááźáŻáááşááźá áşáááşá
- RedisááááŽáĄáá˝ááş áĄááŻááşáá˝á˛á áŹá¸áĄááźá áş ááąáŹááşáá˝ááşáááˇáşá
- áááŻááşááśááś áĄááŻááşáááŹá¸áááŻááşáááŻááşááŻááşááąáŹááşááááˇáş ááŻááşáááşá¸ááŹáááşááťáŹá¸á
- áááŻááŤáááŻáˇ
./dagsdags áááąáŹáşááźááťááşáážááˇáşáĄáá° ááťá˝ááşáŻááşáááŻáˇááááŻááşááťáŹá¸ááᯠááąáŤááşá¸áááˇáşááŤáááşá áááşá¸áááŻáˇááᯠááťáśááąáŤáşáá˝ááş ááąáŹááşáá°áá˝áŹá¸áááşááźá áşááźáŽá¸á áááŻáˇááźáąáŹááˇáş áážáŹááťáąáážáŻáááŻááşá¸ááźáŽá¸ááąáŹááş áĄá áŻáĄááąá¸áá áşááŻááŻáśá¸ááᯠáážááˇáşáááşáááş ááááŻáĄááşááŤá
áĄááťááŻáˇááąááŹááťáŹá¸áá˝ááş áĽáááŹááťáŹá¸áážá ááŻááşááᯠááŻáśá¸ááááźáááŻááşáᲠ(á áŹááŹá¸áážáŻááşáá˝ááąá áąáááş)á áááŻáˇááąáŹáş áĄááťááŻáˇááąááŹááťáŹá¸áá˝ááş áááşá¸ááᯠááŻááşáááşá¸á ááşáá˝ááş áá˝ááşá¸ááśááŹá¸áááşá ááźáŽá¸ááźááˇáşá áŻáśááąáŹ áĄááŻááşááŻááşááá°ááŹááťáŹá¸ááᯠáááŻáážáąáŹááşááŻáśáá˝ááş áá˝áąáˇáááŻááşááŤáááşá .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokeráážááşááťááşáá˝áą:
- ááąá¸ááąá¸ ááááááşáá˛áážáŹ áá°ááááťáŹá¸áá˛áˇ ááŻáśááááşááᯠáĄáŹá¸áááŻá¸áááááşá - áĄá˛ááŤááᯠááąááťáŹá á áşááąá¸ááŤá áááˇáşáááážáŹ áááźáŹá¸ááŹááž ááááŻáĄááşááąáŹáˇááŹáááşá¸ ááźá áşáááŻááşááŤáááşá
- Airflow áááşáááşááťáŹá¸áĄáŹá¸ááŻáśá¸áážááááˇáşáᏠááážááááŻááşááŤáááşá
airflow.cfgááŤááąáááˇáşáááşá¸ ááťá˝ááşááąáŹáş áĄáá˝ááˇáşááąáŹááşá¸áá°ááźáŽá¸ áááŻá¸áááŻá¸áá˝áŹá¸áá˝áŹá¸ áĄááŻáśá¸ááťáá˛áˇáá˛áˇ áááşáááşá¸ááťááş ááááşá¸áážááşááťáŹá¸ ( developer ááťáŹá¸ ááťáąá¸áá°á¸áááşááŤáááş)á - áááŹááĄáááŻááşá¸á áááşá¸áááş ááŻááşááŻááşáážáŻáĄáááşáááˇáşáááŻááşááŤ- ááťá˝ááşáŻááşáááş áá˝ááşááááşááŹááťáŹá¸ááąáŤáşáá˝ááş áážááŻáśá¸ááŻááşááśááťáŹá¸ááᯠááááşááᏠááááˇáşááŹá¸áᲠááŻáśááźáŻáśááąá¸áĄáá˝ááş á áááşááááşá áŹá¸ááŤá ááŤááąáááˇáş ááťá˝ááşááąáŹáşáááŻáˇáá˛áˇ á ááşá¸áááşáá°áá˝áąáĄáá˝ááş áááˇáşááąáŹáşáá˛áˇ áĄááááˇáşááŻáśá¸ááᯠááťá˝ááşááąáŹáş ááŻááşáá˛áˇáááşá
- áážááşáááş:
- Dag áááŻááşáá˝á˛ááᯠáĄááťáááşáááŹá¸ááąá¸áá˝á˛áá°áážááˇáş áĄááŻááşáááŹá¸ááťáŹá¸ áážá áşáŚá¸á ááŻáśá¸ áááşááąáŹááşááźááˇáşáážáŻáááŻááşáááŤáááşá
- Third-party á áŹááźááˇáşáááŻááşááťáŹá¸áĄáŹá¸ááŻáśá¸áážááˇáş áĄáá°áá°áááşááźá áşáááş - áááşá¸áááŻáˇáĄáŹá¸ááŻáśá¸ááᯠáĄááťáááşáááŹá¸áá˝á˛áá°áážááˇáş áĄááŻááşáááŹá¸ááťáŹá¸ááźááˇáş á ááşááťáŹá¸áá˝ááş áááˇáşáá˝ááşá¸áááŤáááşá
áá˛á áĄááŻá áááŻá¸áážááşá¸ááŤáááşá
$ docker-compose up --scale worker=3áĄááŹáĄáŹá¸ááŻáśá¸áááşááŹááźáŽá¸ááąáŹááşá áááşáááş áááşáĄááşááŹááąáˇá áşááťáŹá¸ááᯠááźááˇáşáážáŻáááŻááşáááş-
- ááąá áŽá¸ááźáąáŹááşá¸:
- áááşá¸
áĄááźáąááśáááąáŹáááŹá¸
ᤠ"dags" ááťáŹá¸áĄáŹá¸ááŻáśá¸áá˝ááş áááşááŹáážááŹá¸ááááşááŤáá á¤áááşáážáŹ áĄááááŹááşáĄáááŻááźá áşáááşá
- Scheduler ááᯠ- á
ááşááŻááşááťáŹá¸ áááşáááşáá˛áᲠáĄááŻááşááŻááşá áá°áá
áşáŚá¸áááŻááşááąáŹ Airflow áá˝ááş áĄááąá¸áĄááŤááŻáśá¸ áŚá¸ááąá¸ááźá
áşáá°- áĄááťáááşáááŹá¸ááᯠá
áąáŹááˇáşááźááˇáşááźááşá¸á ááááşááááşááŻááşááźááşá¸á áĄááŻááşááťáŹá¸ááᯠááŻááşááąáŹááşááźááşá¸ááťáŹá¸ ááŻááşááąáŹááşáááşá
ááąááŻááťáĄáŹá¸ááźááˇáşá ááŹá¸áážááşá¸áĄááąáŹááşá¸ááťáŹá¸áá˝ááşá áá°áááş áážááşááŹááşáááŻááşáᏠááźáżááŹááťáŹá¸ (áááŻááşááŤá áááááąáˇááźááşá¸áááŻááşááąáŹáşáááşá¸ ááąáŤááşááźáŹá¸ááźááşá¸) áážáááźáŽá¸ áĄáá˝áąáĄáážá áşáááşáážááşááťááşáááş configuration áá˝ááşáááş ááťááşáážáááąááŤáááşá
run_duration- áááşá¸áááźááşáááşá áááşááťáááşááŹáá ááŤááąáááˇáş áĄááŻááąáŹáˇ áĄáŹá¸ááŻáśá¸áĄáááşááźáąáá˝áŹá¸ááŤááźáŽá - DAG (aka "dag") - "áá˝ážááşááźáŹá¸ááŹá¸áááˇáş acyclic ááááş"á áááŻáˇááąáŹáş áááŻáááŻáˇááąáŹ áĄáááášááŤááşáá˝ááˇáşáááŻááťááşáááş áá°áĄáááşá¸áááşááᯠááźáąáŹááźááááˇáşáááşá áááŻáˇááąáŹáş áĄáážááşááááşáá˝ááş áááşá¸áááş áĄááťááşá¸ááťááşá¸ áĄááźááşáĄáážááş áĄááťááŻá¸ááźáŻáááˇáş áĄááŻááşááťáŹá¸áĄáá˝ááş áá˝ááşááááşááŹáá
áşáᯠ(áĄáąáŹááşáá˝ááşááźááˇáşááŤ) áááŻáˇáááŻááş SSIS áážá Package áážááˇáş Informatica áážá Workflow áááŻáˇá analogue áá
áşááŻááźá
áşáááşá .
ááŹá¸ááťáŹá¸áĄááźááşá áááŻááşá¸ááŻááşááťáŹá¸ááŤáážááááŻááşááąáŹáşáááşá¸ áááşá¸áááŻáˇááᯠááťá˝ááşáŻááşáááŻáˇ áááąáŹááşáááŻááşááŤá
- DAG ááźáąá¸ - áááşá¸ááááŻááşáááŻááşáááşáážááşááŹá¸ááąáŹáááşáááŻá
áááşáááşá
execution_date. áá°ááŽááąáŹ áááŹáááŹááşááťáŹá¸áááş ááźááŻááşáá°áĄááŻááşááŻááşáááŻááşáááş (áááşáááŹáááşááťáŹá¸ááᯠáĄáŹá¸áááşá¸áĄáąáŹááşááźáŻááŻááşááŹá¸ááťážááş ááŻááşááŤáááş)á - áĄáąáŹáşáááąáᏠááááťááąáŹááŻááşááąáŹááşááťááşáá
áşááŻááŻááşááąáŹááşáááş ááŹáááşáážáááąáŹááŻááşáĄáááŻááşá¸áĄá
ááťáŹá¸ááźá
áşáááşá áĄáąáŹáşáááąááŹááŻáśá¸ááťááŻá¸áážááááşá
- áážáŻááşáážáŹá¸áážáŻááŤáááŻáˇáĄááźááŻááş
PythonOperatoráááşáááˇáş (áááŹá¸áááş) Python ááŻááşáááŻáááᯠááŻááşááąáŹááşáááŻááşáááşá - áá˝ážá˛ááźáąáŹááşá¸áá
áşááąááŹááž áá
áşááąááŹáááŻáˇ áááŻáˇááąáŹááşááąá¸ááąáŹ ááąááŹá
MsSqlToHiveTransfer; - áĄáŹááŻáśááśáááááᏠáĄááźáŹá¸áá
áşáááşáá˝ááşá áááşá¸áááş ááźá
áşáááşáá
áşááŻáááźá
áşáá˝áŹá¸ááŽáĄáá áááŻááşá¸áááąáŹááşáááşááŻááşááąáŹááşáážáŻááᯠááŻáśáˇááźááşáááş áááŻáˇáááŻááş áážáąá¸áá˝áąá¸á
áąáááşááźá
áşáááşá
HttpSensoráááşáážááşááŹá¸ááąáŹ áĄááŻáśá¸áĄáážááşááᯠáá˝á˛ááŻááşáááŻááşááźáŽá¸ áĄáááŻáážáááąáŹ ááŻáśáˇááźááşáážáŻááᯠá áąáŹááˇáşáááŻááşá¸ááąááťáááşáá˝ááş áá˝ážá˛ááźáąáŹááşá¸áážáŻááᯠá áááşááŤáGoogleCloudStorageToS3Operator. á á°á¸á ááşá¸áááŻá áááşá âááŹááźáąáŹááˇáşáá˛á ááąáŹááşááŻáśá¸áĄááąáá˛áˇá áĄáąáŹáşáááąááŹáážáŹ áááşááŤáááşááŤááŻááşáááŻááşááŤáááşáâ áááŻáˇááąáŹááş áááŻááşá¸ááśáˇááŹá¸ááąáŹ áĄáąáŹáşáááąááŹááťáŹá¸áážááˇáş áĄááŻááşááťáŹá¸ ááááşáááŻáˇááźááşá¸ ááážáá áąááąá¸á ááąáŹááşáá áşááźáááşáááźááŻá¸á áŹá¸áᎠáĄáŹááŻáśááśááááááŹáááş á áááşáááşá á á áşááąá¸ááźáŽá¸ ááąááŻáśá¸áááşá
- áážáŻááşáážáŹá¸áážáŻááŤáááŻáˇáĄááźááŻááş
- ááŻááşáááşá¸ - áĄááťááŻá¸áĄá áŹá¸ááá˝á˛ááźáŹá¸áᲠááźáąááźáŹááŹá¸ááąáŹ áĄáąáŹáşáááąááŹááťáŹá¸áĄáŹá¸ áĄááŻááşááŹáá°á¸áĄáááˇáşáááŻáˇ áááŻá¸ááźážááˇáşááąá¸áááşá
- áĄááŻááşáĽááᏠ- ááťáąáŹáşááźáąáá°-áĄááŻááşáááŹá¸ááťáŹá¸ááᯠáááŻááşáá˝á˛áááşáááş áĄáá˝áąáá˝áąá
áŽá
ááşáá°ááž ááŹáááşááťáŹá¸ááąá¸áááŻáˇáááş áĄááťáááşáááşááźáŽáᯠááŻáśá¸ááźááşááąáŹáĄá፠(ááťá˝ááşáŻááşáááŻáˇáĄááŻáśá¸ááźáŻááťážááş ááąááŹáážááşá
LocalExecutoráááŻáˇáááŻááş áááá ášá áááşáá˝ááş áĄááąá¸ááááşá¸ Node áá áşááŻááŽáááŻáˇCeleryExecutor) áááşá¸áááş áááşá¸áááŻáˇáĄáŹá¸ áááşá ááşáĄááźáąáŹááşá¸áĄááŹáá áşáᯠáááşáážááşááąá¸áááş (áááŻáááŻáááşáážáŹá ááááşá¸áážááşáĄá áŻáá áşáᯠ- ááŻááşááąáŹááşáážáŻááąáŹááşááťáŹá¸)á áĄááááˇáşááąá¸ááťááş áááŻáˇáááŻááş á áŻáśá ááşá¸áážáŻááŻáśá áśááťáŹá¸ááᯠááťá˛áˇáá˝ááşááźáŽá¸ áááşá¸áááŻáˇááᯠááąáŤááşá¸á ááşá¸ááąá¸áááşá
ááťá˝ááşáŻááşáááŻáˇáááş áĄááŻááşááťáŹá¸ááᯠááŻááşááąá¸ááŤáááşá
áŚá¸á á˝áŹá ááťá˝ááşáŻááşáááŻáˇá doug á ááąáá°ááťáĄá áŽáĄá áĽáşááᯠáĄááźááşá¸ááťááşá¸ááąáŹáşááźááźááŤá ááŻáˇá áááŻáˇááąáŹááş ááťá˝ááşáŻááşáááŻáˇáááş áĄááąá¸áĄáá˝á˛áááŻááşááąáŹ ááźáąáážááşá¸áááşá¸áĄááťááŻáˇááᯠáĄááŻáśá¸ááźáŻááŹá¸ááąáŹááźáąáŹááˇáş áĄááąá¸á áááşáĄááťááşáĄáááşááťáŹá¸ááᯠáááŻááááŻááááŻá¸á áĄááąá¸á áááşááąáˇááŹááźááˇáşááŤáááşá
áááŻáˇááźáąáŹááˇáşá áááşá¸ááĄáááŻá¸áážááşá¸ááŻáśá¸ááŻáśá áśááźááˇáşá áááŻáá˛áˇáááŻáˇááąáŹ ááŹá¸ááŻáśáááş á¤áá˛áˇáááŻáˇááźá áşááąááááˇáşáááş-
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)áĄááźáąáážáŹááźááˇáşááĄáąáŹááşá
- ááááŚá¸á á˝áŹ ááťá˝ááşáŻááşáááŻáˇáááş áááŻáĄááşááąáŹ libs ááťáŹá¸áážááˇáş áááşáá˝ááşá¸áááşá áĄááźáŹá¸áá áşááŻááŻ;
sql_server_ds- áList[namedtuple[str, str]]Airflow Connections áážááťáááşáááşáážáŻááťáŹá¸ááĄáááşááťáŹá¸áážááˇáşááťá˝ááşáŻááşáááŻáˇááááşá¸áááşáááŻáá°áááˇáşááąááŹááąáˇá áşááťáŹá¸ádag- ááąááťáŹááąáŤááşááźá áşááááşá ááťá˝ááşáŻááşáááŻáˇá doug áááźáąááźáŹááťááşglobals()áááŻááşáááş Airflow á áážáŹááá˝áąáˇáá°á¸á Doug ááááşá¸ ááźáąáŹáááŻááŤáááşá- áá°áˇááŹáááşá ááŹáá˛á
orders- áááŻáˇááąáŹááş á¤áĄáááşáááş áááşáĄááşááŹááąáˇá áşáá˝ááş ááąáŤáşááŹááááˇáşáááşá - áá°áááŻááşá áážá áşáááşááąáˇ ááááşá¸ááąáŤááşááž áĄááŻááşáááşá¸ááááşá
- áááˇáşáážááşá¸ááźáąáĄáŹá¸ááźááˇáş 6 ááŹááŽáááŻááşá¸ ááźáąá¸áááˇáşááŤáááşá
timedelta()áá˝ááˇáşáááźáŻcron-áááŻááşá¸0 0 0/6 ? * * *, for the less cool - ááźááŻááşáá˛áˇ expression áá áşááŻ@daily);
- áá°áˇááŹáááşá ááŹáá˛á
workflow()áĄááá áĄááŻááşááᯠááŻááşááááˇáşáááşá ááŤááąáááˇáş áĄáᯠáááŻááşáá°á¸á áááŻáĄááťáááşáá˝ááş ááťá˝ááşáŻááşáááŻáˇááĄááźáąáŹááşá¸áĄááŹááᯠáážááşáááşá¸áá˛áááŻáˇ á á˝ááˇáşáá áşáááŻááşááŤáááşá- áááŻáá˝ááş ááŻááşááąáŹááşá
ááŹááťáŹá¸ááᯠáááşááŽá¸ááźááşá¸á áááŻá¸áážááşá¸ááąáŹ áážáąáŹáşáááŹá
- ááťá˝ááşáŻááşáááŻáˇáááş ááťá˝ááşáŻááşáááŻáˇááááşá¸ááźá áşááťáŹá¸áážááááˇáş ááŻááşááąáŹááşááŤáááşá
- á
áááşáááŻááşááŤá
PythonOperatorááťá˝ááşáŻááşáááŻáˇá dummy ááᯠáĄááąáŹááşáĄáááşááąáŹáşáááˇáşáworkflow(). áĄááŻááşááá°á¸ááźáŹá¸ááąáŹ (áááşáĄáá˝ááşá¸) áĄáááşááᯠáááşáážááşá ááŹá¸ááᯠááťááşáážáąáŹááşáááş áááąáˇááŤáážááˇáşá áĄááśprovide_contextáá áşáááşá ááťá˝ááşáŻááşáááŻáˇ áááŻáá ááŻááş á áŻááąáŹááşá¸áĄááŻáśá¸ááźáŻááŹá¸ááąáŹ function áá˛áááŻáˇ ááąáŹááşáááş áĄááźáąáŹááşá¸ááźááťááşááťáŹá¸ááᯠááąáŹááşá¸áááˇáşááŤáááşá**context.
ááąáŹááąáŹáááşááąáŹáˇ ááŽááąáŹááşááŤáá˛á ááťá˝ááşáŻááşáááŻáˇááážááááş-
- web interface áá˝ááş dag áĄáá áşá
- áĄááźááŻááşááŻááşááąáŹááşáááˇáş áĄááŻááşááąáŤááşá¸ áá áşááŹáá˝á˛ (Airflowá Celery áááşáááşááťáŹá¸áážááˇáş ááŹááŹá á˝ááşá¸áááşá áá˝ááˇáşááźáŻááŤá)á
ááąáŹááşá¸ááźáŽá áááźáŽá

áážáŽáááŻáážáŻááťáŹá¸ááᯠáááşáá°á áááˇáşáá˝ááşá¸áááşáááşá¸á
á¤áĄááŹáĄáŹá¸ááŻáśá¸ááᯠáááŻá¸áážááşá¸á
áąáááşáĄáá˝ááş ááťá˝ááşáŻááş áážááˇáşá
áŹá¸áá˛áˇááŤáááşá docker-compose.yml ááŻááşááąáŹááşááąáááş requirements.txt node áĄáŹá¸ááŻáśá¸áá˝ááşá
áĄááŻááąáŹáˇ áá˝áŹá¸ááźáŽ-

ááŽá¸áááŻá¸ááąáŹááşá ááŻáááşá¸ááťáŹá¸áááş áĄááťáááşáááŹá¸áá˝á˛áá°ááž ááŻááşááąáŹááşááąáŹ áĄááŻááşááźá áşáááşááťáŹá¸ááźá áşáááşá
ááá áąáŹááˇáşááŤá áĄááŻááşáááŹá¸áá˝áąá áĄááŻááşáá˝áą áážáŻááşááŻááşáááşá

ááŻááşááŤáááş áĄá áááşá¸áá˝áą áᏠáá°áááŻáˇáá˛áˇ áĄááŻááşááᯠáĄáąáŹááşááźááşá á˝áŹ ááźáŽá¸ááźáąáŹááşáá˛áˇááź ááŤáááşá áĄááŽááąáŹááşáá˝áąá ááááşááĄáąáŹááşááźááşááŤáá°á¸á
á ááŹá¸áá ááşá ááťá˝ááşáŻááşáááŻáˇáááŻááşááŻááşáá˝ááş folder ááážáááŤá
./dagsá ááşááťáŹá¸ááźáŹá¸áá˝ááş áááşáá°ááźáŻáážáŻ ááážáá፠- áááŻááşá¸ááťáŹá¸ áĄáŹá¸ááŻáśá¸ ááŤáááşááŤáááşágitááťá˝ááşáŻááşáááŻáˇá Gitlab áá˝ááşá áážááˇáş Gitlab CI áááş ááąáŤááşá¸á ááşá¸áááˇáşáĄá፠á ááşááťáŹá¸áááŻáˇ áĄááşááááşááťáŹá¸ááᯠááźááˇáşááąááŤáááşámaster.
áááşá¸áĄááźáąáŹááşá¸áĄáááşá¸áááş
áĄááŻááşáááŹá¸áá˝áąá ááťá˝ááşááąáŹáşáááŻáˇáá˛áˇ áááşááťááşá áąáá˛áˇáĄááŹáá˝áąááᯠáá˝ááşáááŻááşááąááťáááşáážáŹ ááťá˝ááşááąáŹáşáááŻáˇááᯠáá áşááŻááŻááźááąá¸áááŻááşáá˛áˇ ááąáŹááşáááşááááááŹáá áşáᯠ- Flower ááᯠáááááááŻááşááźááĄáąáŹááşá
áĄááŻááşáááŹá¸ ááŻáśáážááşááťáŹá¸áááŻááşáᏠáĄááťááşá¸ááťáŻááş áĄááťááşáĄáááşááŤáážáááąáŹ áááá áŹááťááşáážáŹ

áĄááŻááşáá˝áŹá¸áá˛áˇáááˇáş áĄááŻááşááťáŹá¸á፠áĄááźááşá¸áááşááŻáśá¸ á áŹááťááşáážáŹ

ááťá˝ááşáŻááşáááŻáˇá áá˝á˛á áŹá¸á áĄááźáąáĄááąáážááˇáş áĄááťááşá¸ááŻáśá¸ á áŹááťááşáážáŹ

áĄááąáŹááşáááŻáśá¸á áŹááťááşáážáŹáážáŹ áĄááŻááşáĄááźáąáĄááąááááşááťáŹá¸áážááˇáş áááşá¸áááŻáˇáááŻááşááąáŹááşááťáááşááťáŹá¸ááŤáážááááş-

ááťá˝ááşáŻááşáááŻáˇáááş áááşááŻááşáááşáááŻá¸ááᯠáááşá¸áá˝ááşáááşá
ááŤááž áĄááŻááşáá˝áą ááźáŽá¸áá˝áŹá¸ááźáŽá áááşááŹááá°áá˝áąááᯠáááşáá˝áŹá¸áááŻááşáááşá

áĄááźáąáŹááşá¸áááşá¸áá áşááŻáááŻááşáá áşááŻááźáąáŹááˇáş áááşááŹááá°áĄááťáŹá¸áĄááźáŹá¸áážááá˛áˇáááşá Airflow ááážááşáááşááąáŹáĄááŻáśá¸ááźáŻáážáŻááá ášá áá˝ááşá á¤á ááŻáááşá¸ááŻáśááťáŹá¸áááş data ááťáŹá¸ááąááťáŹááąáŤááşááąáŹááşáááŹááźáąáŹááşá¸ááąáŹáşááźáááşá
áážááşáááşá¸ááᯠááźááˇáşáážáŻááźáŽá¸ ááťáááşá¸áá˝áŹá¸ááąáŹ ááŻááşááąáŹááşá ááŹááźá áşáááşááťáŹá¸ááᯠááźááşáááşá áááşáááş áááŻáĄááşáááşá
áááşáááˇáşá ááŻáááşá¸ááᯠáážáááşááźááşá¸ááźááˇáşá ááťá˝ááşáŻááşáááŻáˇáĄáá˝ááş ááážááááŻááşááąáŹ ááŻááşááąáŹááşááťááşááťáŹá¸ááᯠááťá˝ááşáŻááşáááŻáˇ áá˝áąáˇáááááˇáşáááş-

ááźáŻááşááťááŹááᯠáážááşá¸áĄáąáŹááşááŻááşáááŻáˇááááşá áááŻáááŻáááşáážáŹá áááŻááąááŹáá˝ááş áá áşá áŻáśáá áşáᯠááťááşáá˝ááşáá˝áŹá¸áááşááᯠááťá˝ááşáŻááşáááŻáˇááąáˇááŹá¸ááźáŽá¸ áá°ááŽááąáŹááŻááşááąáŹááşá ááŹáážáŹ áĄááťáááşáááŹá¸áá˝á˛áá°ááś ááąáŹááşáá˝áŹá¸áááşááźá áşáááşá

áĄááŽááąáŹááşá
ááŻáááşá¸áĄáŹá¸ááŻáśá¸ááᯠááąáŹááşá
áşááźááˇáşááźáŻááŻááşááźááşá¸áááş áá°ááŹá¸áááşááźááşá¸áááŻááşááźáąáŹááşá¸ áážááşá¸áážááşá¸áááşá¸áááşá¸ááááááş - áááşá¸áááş Airflow ááž ááťá˝ááşáŻááşáááŻáˇááťážáąáŹáşáááˇáşááŹá¸áááˇáşáĄááŹáááŻááşááŤá áááŹááĄáŹá¸ááźááˇáşá ááťá˝ááşáŻááşáááŻáˇáá˝ááş áĄá
áŻáááŻááşáĄááźáŻáśáááŻááş ááťááşááŽá¸áááŻááşááąáŹ áááşáááşááťáŹá¸áážááááşá Browse/Task Instances

áĄááŹáĄáŹá¸ááŻáśá¸ááᯠáá áşááźááŻááşáááş áá˝áąá¸ááťááşááźáŽá¸ ááŻááááŻáˇ ááźááşáááşáááşáážááşááźááŤá ááŻáˇá áážááşáááşáááˇáşáĄááŹááᯠáážáááşááŤ-

áááˇáşáážááşá¸ááąá¸ááŻááşááźáŽá¸ááąáŹááşá ááťá˝ááşáŻááşáááŻáˇáááášáá áŽááťáŹá¸áááş á¤ááŻáśá áśáĄáááŻááşá¸ááźá áşáááş (áá°áááŻáˇá áŽá ááşááąá¸áááˇáşáĄá áŽáĄá ááşáážá°á¸ááᯠá áąáŹááˇáşááąááźáŽááźá áşáááş)á

ááťáááşáááşáážáŻááťáŹá¸á ááťáááşááťáŹá¸áážááˇáş áĄááźáŹá¸ááźáąáŹááşá¸áá˛áážáŻááťáŹá¸
ááąáŹááş DAG áááŻááźááˇáşáááŻáˇáĄááťáááşááąáŹááşááźáŽá update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ĐĐžŃпОда Ń
ĐžŃĐžŃио, ĐžŃŃĐľŃŃ ĐžĐąĐ˝ĐžĐ˛ĐťĐľĐ˝Ń"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ĐаŃаŃ, ĐżŃĐžŃŃпаКŃŃ, ĐźŃ {{ dag.dag_id }} ŃŃОниНи
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]áá°áááŻááşá¸ áĄá áŽáááşááśá ᏠáĄááşááááşááŻááşáá°á¸ááŤáááŹá¸á á¤áááşáážáŹ áá°áá ááąáŹááşáá áşááźáááşááźá áşáááş- ááąááŹááá°ááááˇáşááąááŹááž áááşá¸ááźá áşááťáŹá¸á áŹáááşá¸áá áşáᯠáážáááŤáááşá ááŹá¸ááááˇáşá áŹáááşá¸áá áşááŻáážááááşá áĄááŹáĄáŹá¸ááŻáśá¸ááźá áşâááťááşâáá˝áŹá¸áá˛áˇáĄá፠áá˝ááşá¸ááŽá¸áááŻáˇáâááąáˇááŤáá˛áˇ (ááŤá ááŤáááŻáˇáá˛áˇááááŻááşâááŤáá°á¸)á
áááŻááşáááŻááááşááźááşááźáŽá¸ ááááşááážáŹá¸áĄááŹáĄáá áşááťáŹá¸ááᯠááźááˇáşááźááŤá ááŻáˇá
from commons.operators import TelegramBotSendMessage- Unblocked áááŻáˇ áááşááąáˇááťáşáááŻáˇááźááşá¸áĄáá˝ááş ááąá¸áááşááąáŹ wrapper áá áşááŻááźáŻááŻááşááźááşá¸ááźááˇáş ááťá˝ááşáŻááşáááŻáˇááááŻááşáááŻááşáĄáąáŹáşáááąááŹááťáŹá¸ááźáŻááŻááşááźááşá¸ááž ááťá˝ááşáŻááşáááŻáˇáĄáŹá¸ áááşáááˇáşáĄááŹáááž ááŹá¸ááŽá¸ááááşáááşáááşáááŻááşááŤá (á¤áĄáąáŹáşáááąááŹáĄááźáąáŹááşá¸ááᯠáĄáąáŹááşáá˝ááş áááşáááşáá˝áąá¸áá˝áąá¸ááŤáááşádefault_args={}- dag áááş áááşá¸á áĄáąáŹáşáááąááŹáĄáŹá¸ááŻáśá¸ááᯠáá°ááŽááąáŹ áĄááźáąáŹááşá¸ááźááťááşááťáŹá¸ááᯠááźááˇáşááąáááŻááşáááşáto='{{ var.value.all_the_kings_men }}'- áĄáá˝ááştoááťá˝ááşáŻááşáááŻáˇáá˝ááş hardcoded ááŻááşáááşáááŻááşááąáŹáşáááşá¸á ááťá˝ááşáŻááşáááŻáá ááŻááşáááˇáşáá˝ááşá¸ááŹá¸áááˇáş áĄáŽá¸ááąá¸ááşá áŹáááşá¸ááŤáááˇáş Jinja áážááˇáş variable áááŻáˇááᯠáĄááŻáśá¸ááźáŻá áááŻááşá¸ááá áşááŻááşááąá¸ááŤáááşáAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- áĄáąáŹáşáááąááŹá áááşáááşáĄááźáąáĄááąá ááťá˝ááşáŻááşáááŻáˇááĄááźáąáĄááąáá˝ááşá áážáŽáááŻáážáŻáĄáŹá¸ááŻáśá¸ ááźáąáááşáá˝áŹá¸áážáᏠáá°ááąá¸ááś á áŹáááŻáˇááŤáááşá áĄáąáŹááşááźááşá á˝áŹ;tg_bot_conn_id='tg_main'- áĄááźááşá¸áá˝áŹá¸áážáŻááťáŹá¸conn_idááťá˝ááşáŻááşáááŻáˇ áááşááŽá¸ááŹá¸ááąáŹ ááťáááşáááşáážáŻ ID ááťáŹá¸ááᯠáááşááśááŤáAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- ááźáŻááşááťááąáŹáĄááŻááşááťáŹá¸áážááážáᏠTelegram áážáááşááąáˇááşáťááťáŹá¸áááşááąá¸áá˝áŹááááˇáşáááşátask_concurrency=1- ááŻááşáááşá¸áá áşááŻá ááŻááşáááşá¸ááąáŹááşááŹááťáŹá¸á á˝áŹááᯠáá áşááźááŻááşáááşáááşá¸ ááŻááşááąáŹááşááźááşá¸ááᯠááťá˝ááşáŻááşáááŻáˇ ááŹá¸ááźá áşááŹá¸ááŤáááşá áááŻááşááŤáá ááťá˝ááşáŻááşáááŻáˇáááş áĄááťáŹá¸áĄááźáŹá¸ááᯠáááźááŻááşáááşáááşá¸ áá áşáá˝ážááşáááŻááşáááşááźá áşáááşáVerticaOperator(á áŹá¸áá˝á˛áá áşááŻáśá¸áááŻááźááˇáşááźááşá¸);report_update >> [email, tg]- áĄáŹá¸ááŻáśá¸VerticaOperatorá¤áá˛áˇáááŻáˇááąáŹ á áŹááťáŹá¸áážááˇáş áááşááąáˇááťáşááťáŹá¸ ááąá¸áááŻáˇááźááşá¸áá˝ááş ááąáŤááşá¸á ááşááŤá

áááŻáˇááąáŹáş áĄááźáąáŹááşá¸ááźáŹá¸áá° áĄáąáŹáşáááąááŹááťáŹá¸áá˝ááş ááá°ááŽááąáŹ áá˝ážááˇáşáááşáážáŻ áĄááźáąáĄááąááťáŹá¸ áážáááąáŹááźáąáŹááˇáşá áá áşááŻáᏠáĄááŻááşááŻááşááŤáááşá Tree View áá˝ááşá áĄááŹáĄáŹá¸ááŻáśá¸áááş áĄáááşá¸áááşááŹááźááşááŹáááş-

á ááŹá¸áĄáááşá¸áááşááźáąáŹááŤáááşá áááşáááᯠáá°áááŻáˇáá˛áˇ áá°áááşááťááşá¸ááťáŹá¸- ááááşá¸áážááşááťáŹá¸.
Macros áááş áĄááťááŻá¸ááťááŻá¸ááąáŹ áĄááŻáśá¸áááşááąáŹ áĄááťááşáĄáááşááťáŹá¸ááᯠáĄáąáŹáşáááąáᏠáĄááźááşá¸áĄááŻáśááťáŹá¸áĄááźá áş áĄá áŹá¸áááŻá¸áááŻááşááąáŹ Jinja placeholder ááźá áşáááşá áĽáááŹá á¤áá˛áˇáááŻáˇááąáŹá
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} context variable á áĄááźáąáŹááşá¸áĄááŹááťáŹá¸áááŻáˇ ááťá˛áˇáá˝ááşááŤáááşá execution_date ááŻáśá
áśáĄáá˝ááş YYYY-MM-DD: 2020-07-14. áĄááąáŹááşá¸ááŻáśá¸áĄáááŻááşá¸áážáŹ áááşá
ááşááááşá¸áážááşááťáŹá¸ááᯠáááşáááşáážááşáážááşááŻááşááąáŹááşá
áᏠáĽáááŹáá
áşáᯠ(áá
áşáááşááźááşáá˝ááşá¸áážá á
ááŻáááşá¸áá
áşááŻ) áá˝ááş áááşáááşááŹá¸ááźáŽá¸ ááźááşáááşá
áááşáááˇáşáĄá፠ááąááŹáááŻááşááąáŹááşáá°ááťáŹá¸áááş áá°ááŽááąáŹáááşáááŻá¸ááťáŹá¸áĄáá ááťá˛áˇáá˝ááşááŹáááşááźá
áşáááşá
ááŹáááşáá áşááŻá áŽáážá ááŻááşááąáŹááşááťááşáá áşááŻá áŽáážá Rendered áááŻááşááᯠáĄááŻáśá¸ááźáŻá áááşáážááşááŹá¸ááąáŹ áááşáááŻá¸ááťáŹá¸ááᯠááźááˇáşáážáŻáááŻááşááŤáááşá á¤áááşáážáŹ á áŹáá áşá áąáŹááşááąá¸áááŻáˇááźááşá¸á ááŹáááşááźá áşáááşá

áááşááąáˇááťáşáááŻáˇáá˛áˇáĄááŻááşáážáŹá

ááąáŹááşááŻáśá¸ááááŻááşááąáŹááŹá¸áážááşá¸áĄáá˝ááş built-in macro á áŹáááşá¸áĄááźááˇáşáĄá áŻáśááᯠá¤ááąááŹáá˝ááş ááááŻááşááŤáááş-
áááŻáˇáĄááźááşá plugins ááťáŹá¸ááĄáá°áĄááŽááźááˇáşá ááťá˝ááşáŻááşáááŻáˇáááş ááťá˝ááşáŻááşáááŻáˇááááŻááşáááŻááş macro áááŻááźáąááźáŹáááŻááşáááşá áááŻáˇááąáŹáşáááşá¸áááşáĄááźáŹá¸ááŹááşáááşá¸ááźá áşáááşá
ááźááŻáááşáááşáážááşááŹá¸áááˇáşáĄááŹááťáŹá¸áĄááźááşá ááťá˝ááşáŻááşáááŻáˇáááş ááťá˝ááşáŻááşáááŻáˇá variable ááťáŹá¸ááááşáááŻá¸ááťáŹá¸ááᯠáĄá
áŹá¸áááŻá¸áááŻááşáááş (áĄáááşááŻááşáá˝ááş áááşá¸ááᯠááťá˝ááşáŻááşáĄááŻáśá¸ááźáŻááŹá¸ááźáŽá¸ááźá
áşáááş)á áááşááŽá¸ááźááĄáąáŹááş Admin/Variables áĄááŹáážá
áşááŻ-

áááşáĄááŻáśá¸ááźáŻáááŻááşáááťáž-
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')áááşáááŻá¸áááş á ááąá¸áá áşáᯠááźá áşáááŻááşáááşá áááŻáˇáááŻááş JSON áááşá¸ ááźá áşáááŻááşáááşá JSON áá˝ááşá
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}áááŻááťááşááąáŹááŽá¸ááŽáááŻáˇ áááşá¸ááźáąáŹááşá¸ááᯠáĄááŻáśá¸ááźáŻááŤá {{ var.json.bot_config.bot.token }}.
á
ááŹá¸ááŻáśá¸áá
áşááŻáśá¸áááşá¸áá˛áˇ áááşááŹá¸ááźááşááŻáśáá
áşááŻáśááᯠááźááŤáááşá áááşáá˝ááşáážáŻááťáŹá¸. áĄááŹáĄáŹá¸ááŻáśá¸áááş á¤ááąááŹáá˝ááş áĄááźáąááśááźá
áşáááş- á
áŹááťááşáážáŹááąáŤáşáá˝ááş Admin/Connections ááťá˝ááşáŻááşáááŻáˇáááş ááťáááşáááşáážáŻáá
áşááŻááᯠáááşááŽá¸ááźáŽá¸á ááťá˝ááşáŻááşáááŻáˇá ááąáŹáˇááşáĄááşááťáŹá¸/á
ááŹá¸áážááşááťáŹá¸ááᯠááąáŤááşá¸áááˇáşáᏠáááŻááąááŹáá˝ááş áááŻáááŻááááťááąáŹ áááˇáşáááşááąáŹááşááťáŹá¸ááᯠáááˇáşááŤáááşá ááŽáááŻááťááŻá¸:

á
ááŹá¸áážááşááťáŹá¸ááᯠááŻááşáážááşááŹá¸áááŻááşáááş (áá°áááşá¸áááş áááŻááąááťáŹáááş) áááŻáˇáááŻááş ááťáááşáááşáážáŻáĄááťááŻá¸áĄá
áŹá¸ááᯠááťááşááŹá¸áááŻááşáááş (ááťá˝ááşááąáŹáşááźáŻááŻááşáá˛áˇáááˇáşáĄáááŻááşá¸á tg_main) - áĄáážááşáážáŹ áĄááťááŻá¸áĄá
áŹá¸ááťáŹá¸á
áŹáááşá¸ááᯠAirflow ááąáŹáşáááşááťáŹá¸áá˝ááş hardwired ááŻááşááźáŽá¸ áĄáááşá¸áĄááźá
áşááŻááşááťáŹá¸áá˛áááŻáˇ ááááşáᲠááťá˛áˇáá˝ááşáááá፠(ááŻááşááááş ááťá˝ááşáŻááş google ááž áá
áşá
áŻáśáá
áşáᯠáááŻááşááąáŹááşááŤáá ááťáąá¸áá°á¸ááźáŻá ááźááşááąá¸ááŤ)á áááŻáˇááąáŹáş ááááşáá
áşááťáŹá¸ááá°ááźááşá¸ááž ááťá˝ááşáŻááşáááŻáˇáĄáŹá¸ áááşáááˇáşáĄááŹááž ááŹá¸ááŽá¸áááŻááşáááşáááŻááşááŤá ááŹáááşá
áá°ááŽááąáŹáĄáááşááźááˇáş ááťáááşáááşáážáŻááťáŹá¸á
á˝áŹ ááźáŻááŻááşáááŻááşáááş- á¤ááá
ášá
áá˝ááşá áááşá¸áááşá¸ BaseHook.get_connection()áĄáááşááźááˇáşáááşáá˝ááşáážáŻááťáŹá¸ááážááá˝áŹá¸ááąáŹá ááťáááşá¸ ááŹáááşáĄááťááŻá¸ááťááŻá¸ááž ( Round Robin áááŻááŻááşááźááşá¸áááş áááŻáááŻáášáááážáááááˇáşáááşá áááŻáˇááąáŹáş Airflow developer ááťáŹá¸áá
áááşáá˝ááşááŹá¸áá˛áˇááźááŤá
ááŻáˇ)á
Variables ááťáŹá¸áážááˇáş Connections ááťáŹá¸áááş áĄáá˝ááşááąáŹááşá¸áá˝ááşááąáŹ ááááááŹááťáŹá¸ááźá áşáááşá áááŻáˇááąáŹáş áááşááťááşáááŽáááş áĄááąá¸ááźáŽá¸áááş- áááˇáşá áŽá¸áááşá¸áážáŻá áĄá áááşáĄáááŻááşá¸ááťáŹá¸ááᯠááŻááşáááŻááşáááŻááş ááááşá¸áááşá¸ááŹá¸ááźáŽá¸ áááşáááˇáşáĄáááŻááşá¸ááťáŹá¸ááᯠáááŻáážáąáŹááşáááşáĄáá˝ááş Airflow áááŻáˇ ááąá¸áááşáááşá¸á áá áşáááşáá˝ááşá UI áážáá áşáááˇáşá áĽáááŹá á áŹáááŻáˇááąáášááŹáá áşááŻá áááşáááŻá¸ááᯠááťááşááźááşá á˝áŹááźáąáŹááşá¸áá˛áááş áĄáááşááźáąáááŻááşáááşá áĄááźáŹá¸áá áşáááşáá˝ááşá áááşá¸áááşááťá˝ááşáŻááşáááŻáˇ (ááťá˝ááşáŻááş) áážáááşáážáŹá¸áááŻááąáŹ mouse áááŻáážáááşááźááşá¸áááŻáˇááźááşáá˝áŹá¸áá˛ááźá áşáááşá
ááťáááşáááşáážáŻááźááˇáş ááŻááşááąáŹááşááźááşá¸áááş áĄááŻááşááťáŹá¸áá˛ááž áá
áşááŻááźá
áşáááşá ááťáááş. ááąááŻááťáĄáŹá¸ááźááˇáşá Airflow ááťáááşááťáŹá¸áááş áááşá¸ááᯠááźááşááááşááąáŹááşáážáŻááťáŹá¸áážááˇáş á
áŹááźááˇáşáááŻááşááťáŹá¸áááŻáˇ ááťáááşáááşáááşáĄáá˝ááş áĄáážááşááťáŹá¸ááźá
áşáááşá áĽáááŹ- JiraHook Jira áážááˇáş áĄááźááşáĄáážááş ááŻáśáˇááźááşáááş ááťá˝ááşáŻááşáááŻáˇáĄáá˝ááş client áá
áşááŻááᯠáá˝ááˇáşááąá¸áááş (áááşááŻááşááąáŹááşá
ááŹááťáŹá¸ááᯠááąáŹááşááźááşáážááˇáşáááŻááşáááş) áážááˇáş ááĄáá°áĄááŽááźááˇáş SambaHook local áááŻááşáá
áşááŻáááŻáˇ áá˝ááşá¸áááŻááşáááşá smb-pointá
á áááşááźááŻááşáĄáąáŹáşáááąááŹááᯠáá˝á˛ááźááşá¸á áááşááźáŹááźááşá¸á
áĄá˛ááŤááᯠáááşáááŻáááşááŽá¸áá˛áááŻááŹááᯠááťá˝ááşááąáŹáşáááŻáˇ ááŽá¸á
ááşááŹááŤááźáŽá TelegramBotSendMessage
ááŻááş commons/operators.py áĄáážááşááááşáĄáąáŹáşáááąááŹáážááˇáşáĄáá°
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)á¤áá˝ááşá Airflow áážá áĄááźáŹá¸áĄááŹáĄáŹá¸ááŻáśá¸áá˛áˇáááŻáˇáááşá áĄááŹáĄáŹá¸ááŻáśá¸áááş áĄáá˝ááşáááŻá¸áážááşá¸ááŤáááşá
- ááž áĄáá˝áąáááşááśáááşá
BaseOperatoráĄáááşá¸áááşááąáŹ Airflow ááŽá¸áááˇáşáĄááŹááťáŹá¸ááᯠáĄááąáŹááşáĄáááşááąáŹáşááąá¸ááąáŹ (áááˇáşáĄáŹá¸áááşáááşáááŻááźááˇáşááŤ) - áĄáá˝ááşááťáŹá¸ ááźáąááźáŹáááşá
template_fieldsJinja áááş ááŻááşááąáŹááşáááş áááşááááŻááťáŹá¸ááᯠáážáŹáá˝áąáááşááźá áşáááşá - áážááşáááşááąáŹ ááźááşá¸ááŻáśáážáŻááťáŹá¸ááᯠá
áŽá
ááşááąá¸áá˛áˇáááşá
__init__()áááŻáĄááşááŤá ááŻáśááąáááşáážááşááŤá - áááŻá¸ááąá¸á áĄá ááźáŻááźááşá¸áááŻáááşá¸ ááťá˝ááşáŻááşáááŻáˇ áááąáˇáá˛áˇááŤá
- áááşáááŻááşáᏠááťáááşááᯠáá˝ááˇáşáááŻááşáááşá
TelegramBotHookáááşá¸ááśááž client object áá áşááŻááᯠáááşááśááážááá˛áˇáááşá - Overridden (ááźááşáááşáááşáážááş) áááşá¸áááşá¸
BaseOperator.execute()áĄáąáŹáşáááąááŹá áááşááťáááşááťááŹááąáŹáĄá፠Airfow áááş ááŻááşáážáŻááşáá˝áŹá¸ááááˇáşáááş - áááşá¸áá˝ááş ááťá˝ááşáŻááşáááŻáˇáááş ááąáŹáˇááşáĄááşááŻááşáááş ááąáˇááťáąáŹáˇáᏠáááşáááŻááşááąáŹááşááťááşááᯠáĄááąáŹááşáĄáááşááąáŹáşááŤáááşá (ááťá˝ááşáŻááşáááŻáˇ log in á á ááŹá¸áĄáŹá¸ááźááˇáşá ááťááşááťááşá¸áááşááŤástdoutиstderr- Airflow áááş áĄááŹáĄáŹá¸ááŻáśá¸ááᯠááźáŹá¸ááźááşá áážáážáá ááŻááşáááŻá¸ááźáŽá¸ áááŻáĄááşááŤá ááźááŻáá˝á˛áá˝áŹá¸áááşááźá áşáááşá)
ááŤáááŻáˇáážáŹ ááŹáá˝áąáážááᲠááźááˇáşááĄáąáŹááş commons/hooks.py. áááŻááşáááááááŻááşá¸á ááťáááşáááŻááşáááŻááş-
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientááŽááąááŹáážáŹ ááŹáážááşá¸ááźááážááşá¸ááąáŹááş áááááąáŹáˇáá°á¸á áĄááąá¸ááźáŽá¸áá˛áˇ áĄááťááşáá˝áąááᯠáážááşááŹá¸ááŹá¸ááŤáˇáááşá
- ááťá˝ááşáŻááşáááŻáˇáááş áĄáá˝áąáááşááśáážáŻá ááźááşá¸ááŻáśáážáŻááťáŹá¸ááᯠá
ááşá¸á
áŹá¸ááźááˇáşááŤ- ááá
ášá
áĄááťáŹá¸á
áŻáá˝ááş áááşá¸áááş áá
áşááŻááźá
áşáááş-
conn_id; - á
áśáááşá¸áááşá¸ááťáŹá¸ááᯠáááŹáááŹá¸ááźááşá¸- ááťá˝ááşáŻááşáááş áááááááŻááşááᯠáááˇáşáááşááŹá¸áááşá
get_conn()ááťáááşáááşáážáŻ parameters ááťáŹá¸ááᯠááŹáááşááźááˇáş ááá°ááźáŽá¸ áĄáááŻááşá¸ááᯠááá°ááŤáextra(ááŤá JSON áĄáá˝ááşáá áşááŻááŤ)á ááťá˝ááşáŻááş (ááťá˝ááşáŻááşááááŻááşáááŻááşáá˝ážááşááźáŹá¸ááťááşáĄá) áá˝ááş Telegram bot áááŻáááşááᯠáááˇáşáá˝ááşá¸áá˛áˇáááş-{"bot_token": "YOuRAwEsomeBOtToKen"}. - ááŤáááŻáˇáá˛áˇ áĽáááŹáá
áşááŻááᯠááŤáááşááŽá¸áááşá
TelegramBotááááťááąáŹ áááŻáááşáá áşááŻááąá¸áááşá
ááŤááŤáá˛á áĄááŻáśá¸ááźáŻááźáŽá¸ ááťáááşáá
áşááŻááž client áá
áşááŻááᯠáááşááááŻááşáááşá TelegramBotHook().clent áááŻáˇáááŻááş TelegramBotHook().get_conn().
áááŻááşáááŻááááĄáááŻááşá¸áááş Telegram REST API áĄáá˝ááş microwrapper áá
áşááŻááźáŻááŻááşá áá°ááŽááąáŹáá˝á˛áá°áážáŻáááźá
áşá
áąáááşá áááşá¸áááşá¸áá
áşááŻáĄáá˝ááş sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))áážááşáááşááąáŹáááşá¸áááşá¸áážáŹ áĄáŹá¸ááŻáśá¸ááąáŤááşá¸áááˇáşáááşááźá áşáááş-
TelegramBotSendMessage,TelegramBotHook,TelegramBot- plugin áá˝ááşá áĄááťáŹá¸áá°áážáŹáááŻáážáąáŹááşáážáŻáá˝ááşáááˇáşááŤá áááşá¸ááᯠOpen Source áááŻáˇááąá¸ááŤá
ááťá˝ááşáŻááşáááŻáˇ á¤áĄááŹáĄáŹá¸ááŻáśá¸ááᯠááąáˇááŹááąá ááşá ááťá˝ááşáŻááşáááŻáˇá áĄá áŽáááşááśá ᏠáĄááşááááşááťáŹá¸áááş áĄáąáŹááşááźááşá á˝áŹ ááĄáąáŹááşááźááşáááŻááşáᲠááťááşáááşáá˝ááş áĄáážáŹá¸áĄáá˝ááşá¸ áááşááąáˇááťáşáá áşáᯠááąá¸áááŻáˇáááŻááşáá˛áˇáááşá áážáŹá¸ááąááŹá¸ á á áşááąá¸ááźááˇáşáááş...

ááťá˝ááşáŻááşáááŻáˇááá˝áąá¸áááąá¸áá˝ááş áá
áşá
áŻáśáá
áşáᯠááťááşáá˝áŹá¸áááş á áĄá˛á፠ááŤáááŻáˇááťážáąáŹáşáááˇáşááŹá¸áᏠáááŻááşáá°á¸ááŹá¸? áĄáááĄááť
ááááşá¸áážáŹááŹá¸á
áá áşááŻááŻááᯠáá˝á˛ááťáąáŹáşááąáááşáááŻáˇ ááśá áŹá¸áááŤáááŹá¸á SQL Server ááž ááąááŹááťáŹá¸ááᯠVertica áááŻáˇ áá˝ážá˛ááźáąáŹááşá¸ááąá¸áááşáᯠáááááźáŻááŹá¸ááŻáśáááźáŽá¸á áááŻáˇááąáŹááş áááşá¸ááᯠáá°áᏠááąáŤááşá¸á ááşááž áááşáááŻááşáááşáá˛áˇá
á¤áááşá ááşááŻááşááŹáážáŻáááş áááşáá˝ááşááťááşáážááážáá ááťá˝ááşáŻááşáááş áááˇáşáĄáá˝ááş ááąáŤááŹááĄááŻáśá¸áĄáážáŻááşá¸áĄááťááŻáˇááᯠáááŻá¸áááŻá¸áážááşá¸áážááşá¸ááąáŹáşááźááááşááźá áşááŤáááşá áááŻáááşáááŻáááŻáá˝áŹá¸áááŻááşáááşá
ááťá˝ááşáŻááşáááŻáˇááĄá áŽáĄá ááşáážáŹ á¤áĄááŹááźá áşáááş-
- áááŻááşá¸ááŻááşááŤá
- áĄááŻááşááťáŹá¸ááᯠáááşááŽá¸ááŤá
- áĄááŹáĄáŹá¸ááŻáśá¸á áááşááąáŹááşáážáá˛áááŻáᏠááźááˇáşáááŻááşááŤá
- ááźááˇáşá á˝ááşáááş á ááşáážááşááśááŤááşááťáŹá¸ááᯠáááşáážááşááŤá
- SQL Server áážááąááŹáááŻááá°ááŤá
- ááąááŹááᯠVertica áá˝ááşáááˇáşááŤá
- á áŹáááşá¸áááŹá¸ááťáŹá¸á áŻááąáŹááşá¸ááŤá
ááŤááźáąáŹááˇáş ááŽáĄááŹáĄáŹá¸ááŻáśá¸ááᯠáĄááąáŹááşáĄáááşááąáŹáşáááŻááşáááŻáˇá ááťá˝ááşááąáŹáşáááŻáˇáá˛áˇ áĄááąá¸áĄáá˝ážáŹá¸áááşáááŻá¸áážáŻáá
áşáᯠááŻááşáá˛áˇáááşá docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyáĄá˛ááŽáážáŹáááş
- áĄáááşáážááşáĄááźá
áş Vertica
dwhááŻáśááąáááşáážááşááťááşáĄááťáŹá¸á áŻáážááˇáşáĄáá°á - SQL Server ááŻáśá¸ááťááŻá¸á
- ááťá˝ááşáŻááşáááŻáˇáááş ááąáŹááşáááŻááşá¸áá˝ááş ááąááŹááąáˇá
áşááťáŹá¸ááᯠááąááŹáĄááťááŻáˇááźááˇáş ááźááˇáşáá˝ááşá¸áááş (áááşáááˇáşáĄááźáąáĄááąáá˝ááşááž á
á°á¸á
ááşá¸áááąááŤá
mssql_init.py!)
ááťá˝ááşáŻááşáááŻáˇáááş ááááşáĄááźáááşáááş áĄáááşá¸áááşáááŻáážáŻááşáá˝áąá¸ááąáŹ command ááĄáá°áĄááŽááźááˇáş ááąáŹááşá¸áá˝ááşááąáŹáĄááŹáĄáŹá¸ááŻáśá¸ááᯠá áááşáááŻááşáááş-
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3ááťá˝ááşáŻááşáááŻáˇá miracle randomizer áážááŻááşááąá¸ááąáŹáĄááŹááᯠáááşáĄááŻáśá¸ááźáŻáááŻááşááŤáááşá Data Profiling/Ad Hoc Query:

áĄááááááąáŹáˇ áĄá˛ááŤááᯠáĄáá˛áááşáá˝áąááᯠáááźáááŻáˇááŤá
áĄááąá¸á áááşááąáŹáşááźááŤá ETL áĄá ááşá¸áĄááąá¸ááťáŹá¸ áááŻááşáá°á¸á áĄááŹáĄáŹá¸ááŻáśá¸á áĄááąá¸áĄáá˝á˛áᲠáááŻááşáá°á¸á ááŤáááŻáˇá áĄááźáąááśáá áşááŻááŻááşáááşá áĄá˛ááŤáážáŹ áááŻááşá¸ááŻááşáá áşááŻáážááááşá áĄááŹáĄáŹá¸ááŻáśá¸ááᯠáááşá ááşáááşááąááťáŹáá˛áˇ ááźáŻáśááźáŽá¸ áĄáᯠááŤáááŻááŻááşáááş-
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passáĄááťáááşááťááźáŽá ááťá˝ááşáŻááşáááŻáˇáááąááŹáááŻá áŻááąáŹááşá¸ááŤá ááŤáááŻáˇá áŹá¸áá˝á˛áá áşááŹáá˝á˛áááąá áĄáá˝ááşáááŹááž áááşáááąáŹááşááąáŹ áááŻááşá¸ááťáŹá¸áĄáá°áĄááŽááźááˇáş á¤áĄááŹááᯠááŻááşááźááŤá ááŻáˇá
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- ááťáááşáá
áşááŻááĄáá°áĄááŽááźááˇáş Airflow áážááážááááşá
pymssql-ááťáááşáááşááŤá - ááąáŹááşá¸áááŻáážáŻáá˝ááş áááşá á˝á˛ááŻáśá áśááźááˇáş áááˇáşáááşááťááşáá áşááŻááᯠáĄá áŹá¸áááŻá¸ááźááŤá ááŻáˇ - áááşá¸ááᯠááŻáśá áśáá˝ááşáĄááşááťááşááźááˇáş ááŻááşááąáŹááşáážáŻáá˛áááŻáˇ áááˇáşáá˝ááşá¸áááşááźá áşáááşá
- ááťá˝ááşáŻááşáááŻáˇáááąáŹááşá¸áááŻáážáŻááᯠááťá˝áąá¸áá˝áąá¸ááźááşá¸á
pandasááŤáááŻáˇááᯠáááşáá°áá°ááá˛áDataFrame- áĄááŹáááşáá˝ááş ááťá˝ááşáŻááşáááŻáˇáĄáá˝ááş áĄááŻáśá¸áááşááŤááááˇáşáááşá
áĄá áŹá¸áááŻá¸ááŻáśá¸ááąáááşá
{dt}ááąáŹááşá¸áááŻáážáŻáááˇáşáááşááťááşáĄá áŹá¸%sááŤá áááąáŹááşá¸áááŻá¸ááŤá¸ Pinocchio ááźáąáŹááˇáşáááŻááşáá˛ápandasááááŻááşáá˝ááşáááŻááşáá°á¸ápymssqlááźáŽá¸ááąáŹáˇ ááąáŹááşááŻáśá¸áá áşááŻááᯠááźááşáá áşáááŻááşáááşáparams: Listáá°ááááşáááŻááťááşááąáááˇáştuple.
ááźáŻá áŻáá°áááŻáááşá¸ áááááźáŻááŤápymssqláá°áˇááᯠáááąáŹááşááśááąáŹáˇáá°á¸áááŻáˇ ááŻáśá¸ááźááşááźáŽá¸ áá˝ááşáá˝áŹá¸áááŻáˇ áĄááťáááşáááşááźáŽápyodbc.
Airflow áááş ááťá˝ááşáŻááşáááŻáˇáááŻááşááąáŹááşááťááşááťáŹá¸á ááźááşá¸ááŻáśááťááşááťáŹá¸ááᯠááźááˇáşáááşá¸ááąá¸áááşááᯠááźááˇáşááźááŤá ááŻáˇá

ááąááŹááážááááş áááşááŻááşá áᏠáĄááźáąáŹááşá¸ááážáááŤáá°á¸á ááŤááąáááˇáş ááźááˇáşáá˝ááşá¸áážáŻ áĄáąáŹááşááźááşáááşáááŻáˇ áá°áááŹá áá°á¸áááşá¸ááŤáááşá ááŤááąáááˇáş ááŤá áĄáážáŹá¸áááŻááşááŤáá°á¸á ááźáąáŹáş ááŹááŻááşáááá˛! ááźáŽá¸ááąáŹáˇ ááŤáááŹáá˛á
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException áĄáážáŹá¸áĄáá˝ááşá¸ááťáŹá¸ááážááᯠAirflow áááŻááźáąáŹááąáŹáşáááşá¸ ááťá˝ááşáŻááşáááŻáˇáááş ááŻááşááąáŹááşáážáŻááᯠááťáąáŹáşáá˝áŹá¸ááŤáááşá áĄááşááŹááąáˇá
áşáá˝ááş áĄá
áááşá¸ááąáŹááş áááŻáˇáááŻááş áĄááŽááąáŹááşá
ááŻáááşá¸áážááááşáááŻááşááąáŹáşáááşá¸ áááşá¸ááąáŹááşáážááááşá
ááťá˝ááşááąáŹáşáááŻáˇáá˛áˇááąááŹáááŻáá áşááźááŤá ááŻáˇ ááąáŹáşááśááťááŻá¸á áŻáś:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])áĄáááş:
- áĄáąáŹáşááŤáá°ááŹá¸áá˛áˇ ááąááŹááąáˇá áşá
- ááťá˝ááşáŻááşáááŻáˇá ááąááźáŽá¸ááźááşá¸áááŻááşáᏠáĄááŻááşáᎠ(áá˝á˛ááźáŹá¸ááŤáááşá áĄááŻááşáááŻááşá¸áĄáá˝ááş),
- áĄáááşá¸áĄááźá áşáážááˇáş áĄáážáŹá ᏠID ááž hash áá áşáᯠ- áááŻáˇááźáąáŹááˇáş ááąáŹááşááŻáśá¸ááąááŹááąáˇá áş (áááŹá¸áá áşááŻáá˛áááŻáˇ áĄááŹáĄáŹá¸ááŻáśá¸ááᯠááąáŹááşá¸áááˇáşáááˇáş) áá˝ááş áá°á¸ááźáŹá¸ááąáŹ áĄáážáŹá ᏠID áá áşááŻáážááááşá
áĄááŻáśá¸á á˝ááşááąáŹáĄáááˇáşááťááşáááş- áĄááŹáĄáŹá¸ááŻáśá¸ááᯠVertica áá˛áááŻáˇááąáŹááşá¸áááˇáşááŤá áá°á¸áááşá¸ááŹáá ááŤáááŻááŻááşáááŻáˇ áĄáśáˇááááşá¸áá˛áˇ áĄááááąáŹááşááŻáśá¸áááşá¸áááşá¸áá˝áąáá˛ááá áşááŻáááąáŹáˇ CSV áážááááˇáşááŤá
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- ááťá˝ááşáŻááşáááŻáˇáááş áĄáá°á¸áááşááśáá°áĄáŹá¸ ááźáŻááŻááşááąááŤáááşá
StringIO. pandasááźááşááźááşááŹáᏠááŤáááŻáˇáááŻááŹá¸áááşáDataFrameáááŻáśá áśCSV-áááŻááşá¸ááťáŹá¸á- ááťáááşáá áşááŻááźááˇáş ááťá˝ááşáŻááşáááŻáˇááĄááźááŻááşááŻáśá¸ Vertica áááŻáˇ ááťáááşáááşáážáŻáá áşááŻáá˝ááˇáşááźááŤá ááŻáˇá
- áááŻáĄáá°áĄááŽááźááˇáş
copy()ááťá˝ááşáŻááşáááŻáˇáááąááŹááᯠVertika áááŻáˇ áááŻááşáááŻááşááąá¸áááŻáˇááŤá
áááŻááşá¸áááşááťážááźááˇáşááŹá¸áááşááᯠááááŻááşááŹááśááž áá°áááşááźá áşááźáŽá¸ áĄááŹáĄáŹá¸ááŻáśá¸ áĄáááşááźáąááźáąáŹááşá¸ á ááşáážááşáááşááąááťáŹáĄáŹá¸ ááźáąáŹááźááŤáááşá
session.loaded_rows = cursor.rowcount
session.successful = TrueááŤááŤáá˛á
ááąáŹááşá¸ááťáážáŻáá˝ááşá ááťá˝ááşáŻááşáááŻáˇáááş áá áşáážááşáááşá¸áááşááᯠáááŻááşáááŻááşáááşááŽá¸ááŤáááşá á¤áá˝ááş ááťá˝ááşáŻááşáááş ááťá˝ááşáŻááşáĄáŹá¸ á ááşááąá¸ááąá¸ááąá¸áá áşááŻáśá¸ááᯠáá˝ááˇáşááźáŻáááŻááşááŤáááşá
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)ááŤáĄááŻáśá¸ááźáŻááąáááş
VerticaOperator()ááąááŹááąáˇá áşá ááŽááŹáá áşááŻáážááˇáş áááŹá¸áá áşááŻááᯠááŤáááşááŽá¸áááş (áááşá¸áááŻáˇááážáááąá¸ááŤáá ááŻááşááŤáááş)á áĄááááĄááťááşáážáŹ áážáŽáááŻáĄáŹá¸ááŹá¸áážáŻááťáŹá¸ááᯠáážááşáááşá á˝áŹá áŽá ááşáááşááźá áşáááşá
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadáááşáááťáźá¸ááźáŻááť
ááźá˝ááşáááąá¸á ááźáąáŹáááş áááŻááşááŹá¸á
ááŤá ááąáŹáá˛áážáŹ ááźáąáŹááşá ááŹáĄááąáŹááşá¸ááŻáśá¸ ááááá ášááŹááşáá˛áááŻáᏠáááşá¸ááŻáśááŹá¸á
Julia Donaldson, The Gruffalo
áĄáááşá ááťá˝ááşáŻááşáááŻááşááąáŹáşáááŻááşáááşááťáŹá¸áážááˇáş ááťá˝ááşáŻááşáá˝ááş ááźááŻááşáááŻááşáážáŻáá áşááŻáážááá˛áˇááŤáá ETL ááŻááşáááşá¸á ááşááᯠáĄááťááşáĄááźááşáááşááŽá¸ááźáŽá¸ áááşáá°á á áááşáááŻááşáááşáááşá¸á áááşá¸áááŻáˇáááş áááşá¸áááŻáˇá SSIS áážááˇáş mouse áážááˇáş ááťá˝ááşáŻááşááᯠAirflow ááźááˇáş áážááŻááşá¸áážááşááźááˇáşááŤá ááźáŻááźááşááááşá¸ááááşá¸áážáŻ áá˝ááşáá°áážáŻáááŻáááşá¸ áážááŻááşá¸áážááşááźááˇáşáááşááźá áşááŤáááş... áááŻá¸á ááŤáá°áááŻáˇááᯠáĄáááşáááşáážáŹ áĄáááŻááşáá°áááşáááŻáᏠáááşá¸áááąáŹáá°áááşáááşáááşá
áááşá¸áááşá¸ááąá¸ááąá¸áááşáááşááźáąáŹááááş Apache Airflow - ááááŻááááşááŻááşááŻáśá áśáá˛áˇ ááŻááşáááşá¸á ááşáá˝áąááᯠááąáŹáşááźááźááşá¸áĄáŹá¸ááźááˇáş - ááŤáˇáĄááŻááş áááŻááźáŽá¸ áááŻáĄáááşááźáąááźáŽá¸ ááťáąáŹáşá ááŹááąáŹááşá¸áááşá
áááşá¸á áĄáááˇáşáĄáááşááážá ááťá˛áˇáá˝ááşáááŻááşáážáŻáááş ááááşáĄááşááťáŹá¸áážááˇáş ááťá˛áˇáá˝ááşáááŻááşáážáŻáĄááąáŤáş áá˝ááşá¸áĄáŹá¸ááąá¸áážáŻáááŻáˇááźáąáŹááˇáş áááˇáşáĄáŹá¸ ááąááŹáááŻááşá¸ááŽá¸ááŤá¸áá˝ááş Airflow ááᯠáĄááŻáśá¸ááźáŻáááş áĄáá˝ááˇáşáĄááąá¸ááᯠááąá¸áááş- ááąááŹá áŻááąáŹááşá¸ááźááşá¸á ááźááşáááşááźááşá¸áážááˇáş ááŻááşááąáŹááşááźááşá¸ ááśáááŹáá áşááŻááŻáśá¸áá˝ááşáááşá ááŻáśá¸ááťáśáá˝ážááşáááşááźááşá¸áá˝ááşáááş (áĄááşášááŤááźááŻááşáááŻáˇá áááşáááşá¸)á
áĄáááŻááşá¸ááąáŹááşááŻáśá¸á áĄáááŻá¸áĄááŹá¸áážááˇáşáĄááťááşáĄáááş
áááşá¸áĄáá˝ááş ááŤáááŻáˇá áŻááąáŹááşá¸ááŹá¸áá˛áˇ áá˝ááşááŻáśá¸
start_date. ááŻááşáááşá ááŤá ááąáááś meme ááźá áşááąááŤááźáŽá Doug ááĄááááĄááźááşá¸áĄááŻáśáážááááˇáşstart_dateáĄáŹá¸ááŻáśá¸ pass. áĄáááŻááťáŻááşáĄáŹá¸ááźááˇáş áááşáážááşáááťážááşstart_dateáááşáážááááşá á˝á˛áážááˇáşschedule_interval- áá áşááąáˇá áááŻáˇááąáŹááş DAG áááş ááááşááźááşáá˝ááş á áááşáááşááźá áşáááşástart_date = datetime(2020, 7, 7, 0, 1, 2)ááźáŽá¸ááąáŹáˇ ááźáżááŹááážáááąáŹáˇáá°á¸á
áááşá¸áážááˇáşáááşá ááşááąááąáŹ áĄááźáŹá¸ runtime error áá áşááŻáážáááŤáááş-
Task is missing the start_date parameterDag áĄáąáŹáşáááąááŹáážááˇáş ááťáááşáááş ááąáˇáá˝áŹá¸ááźáąáŹááşá¸ áááźáŹáá áá˝ážááşááźááąáŹá- áĄáŹá¸ááŻáśá¸á ááşáá áşááŻáááşá¸áážáŹá ááŻááşáá˛áˇá áážááˇáş áĄááźáąááśááťáŹá¸ (Airflow áááŻááşáááŻááşáážááˇáş ááťá˝ááşáŻááşáááŻáˇá áĄááąáŤáşááśáááŻááşá¸) áážááˇáş áááşááŹááŹáá áşááŻá áĄááťáááşáááŹá¸áá˝á˛áá°á áĄááŻááşáááŹá¸ááťáŹá¸á ááźáŽá¸ááąáŹáˇ áĄááŻááşááźá áşááąá¸áááşá áááŻáˇááąáŹáş áĄááťáááşááźáŹááŹáááşáážááˇáşáĄááťážá áááşááąáŹááşáážáŻááťáŹá¸áĄáá˝ááş ááŻááşááąáŹááşá ááŹááťáŹá¸ ááťáŹá¸ááźáŹá¸ááŹááźáŽá¸ PostgreSQL áááş áĄáá˝ážááşá¸ááááşá¸ááᯠ20 ms áĄá áŹá¸ 5 s áá˝ááş á áááşááŻáśáˇááźááşááąáŹáĄááŤá ááťá˝ááşáŻááşáááŻáˇ áááşá¸áááŻáá°á áááşááąáŹááşáá˝áŹá¸áá˛áˇáááşá
- LocalExecutorá ááŻááşáááşá ááŤáááŻáˇá áĄá˛ááŤááᯠáááŻááşááąááŻááşá¸á ááŤáááŻáˇ ááťáąáŹááşáá˛ááąáŹááşááąááźáŽá LocalExecutor áááş áááŻáĄááťáááşáĄáá ááťá˝ááşáŻááşáááŻáˇáĄáá˝ááş ááŻáśááąáŹááşááąááźáŽááźá áşááąáŹáşáááşá¸ áááŻáĄááťáááşáá˝ááş áĄáááşá¸ááŻáśá¸ áĄááŻááşáááŹá¸áá áşáŚá¸áážááˇáş áááŻá¸ááťá˛áˇáááş áĄááťáááşááťááąáŹááşááźáŽááźá áşááźáŽá¸ CeleryExecutor áááŻáˇ ááźáąáŹááşá¸áá˝ážáąáˇáááş ááťá˝ááşáŻááşáááŻáˇ ááźááŻá¸á áŹá¸ááááşááźá áşááŤáááşá á ááşáá áşááŻáááşá¸áá˝ááş áááş áááşá¸áážááˇáşáĄááŻááşááŻááşáááŻááşááąáŹááźáąáŹááˇáşá ááŹááŹáá áşááŻááąáŤáşáá˝ááşáááş Celery áááŻáĄááŻáśá¸ááźáŻááźááşá¸ááᯠáááşáááˇáşáĄááŹáááž áááşáááˇáşá áąáááşáááŻááşááąá
- áĄááŻáśá¸áááźáŻááŤá built-in tools ááťáŹá¸:
- connections áááşááąáŹááşáážáŻáĄááąáŹááşáĄááŹá¸ááťáŹá¸ááᯠááááşá¸áááşá¸áááşá
- SLA áá˝ááşá¸áá˝ááşááźááşá¸á áĄááťáááşáᎠáááźáŽá¸ááźááşáá˛áˇ áĄááŻááşáá˝áąááᯠááŻáśáˇááźááşáááŻáˇá
- xcom metadata ááážááşááźááşá¸áĄáá˝ááş (ááťá˝ááşááąáŹáşááźáąáŹáá˛áˇáááşá metadata!) dag áĄááŻááşááťáŹá¸ááźáŹá¸á
- ááąá¸ááş áĄáá˝á˛ááŻáśá¸á áŹá¸ááŻááşááźááşá¸á ááąáŹááşá¸ááźáŽá ááŤááŹááźáąáŹáááŻááşááá˛á ááťáááşá¸áá˝áŹá¸ááąáŹ ááŻááşááąáŹááşá ááŹááťáŹá¸ áááşááŤááá˛áᲠááźáŻááŻááşááźááşá¸áĄáá˝ááş áááááąá¸ááťááşááťáŹá¸ááᯠáááˇáşáá˝ááşá¸ááŹá¸ááŤáááşá ááᯠááťá˝ááşáŻááşááĄááŻááş Gmail áá˝ááş Airflow ááž áĄáŽá¸ááąá¸ááşááąáŤááşá¸ 90k áážáááźáŽá¸á web mail muzzle áááş áá áşááźáááşááťážááş 100 ááťáąáŹáşááᯠááąáŹááşáá°ááźáŽá¸ ááťááşáááş ááźááşá¸áááŻááŹá¸áááşá
ááąáŹááşáááş áĄáášáááŹááşááťáŹá¸-
ááąáŹááşáááş áĄáááŻáĄááťáąáŹááş ááááááŹááťáŹá¸
ááťá˝ááşáŻááşáááŻáˇáááş ááťá˝ááşáŻááşáááŻáˇááŚá¸ááąáŤááşá¸áážááˇáşáááşááźááˇáşáááŻááşáᲠááťá˝ááşáŻááşáááŻáˇááŚá¸ááąáŤááşá¸áážááˇáşáááŻááĄááŻááşááŻááşáááŻááşá áąáááşáĄáá˝ááş Airflow áááş ááťá˝ááşáŻááşáááŻáˇáĄáá˝ááş á¤áĄááŹáááŻááźááşáááşááŹá¸áááşá
- - áá°áˇáážáŹ ááášááŹáááˇáşáá˛áˇ áĄááąáĄááŹá¸áážáááŻááşá¸áá˛á áĄááŻááşáááŻááşáĄáąáŹááş ááŹá¸ááŹá¸áááŻáˇá áááşá¸áážááˇáşáĄáá°á áááşáááş dags áážááˇáş ááŻááşááąáŹááşá ááŹááťáŹá¸áĄááźáąáŹááşá¸ áĄááťááşáĄáááşááťáŹá¸áááŻááŹáá ááťážááşáá áşááźááşáá áşááŻáĄáŹá¸ á áááşáááşá DAG Run áááŻáˇáááŻááş ááąáá°á¸áááşáá áşááŻááᯠáááşááŽá¸áááŻááşáááşá
- - WebUI áážáá
áşáááˇáş áĄááŻáśá¸ááźáŻáááş áĄáááşáááźáąááŻáśááŹáá ááąááŻááťáĄáŹá¸ááźááˇáş ááážáááąáŹáˇáááˇáş ááááááŹáĄááťáŹá¸áĄááźáŹá¸ááᯠáĄááááˇáşááąá¸á
áŹááźáąáŹááşá¸áážáá
áşáááˇáş ááážááááŻááşáááşá áĽáááŹáĄáŹá¸ááźááşáˇ:
backfillááŻááşáááşá¸ááąáŹááşááŹááťáŹá¸ááᯠááźááşáááşá áááşáááş áááŻáĄááşááŤáááşá
áĽáááŹáĄáŹá¸ááźááˇáşá ááąáˇááŹááŻáśá¸áááşáá°ááťáŹá¸áááş ááŹáááźáąáŹáááş- âáĄáááŻáááŻáˇá áááşáááŤáᎠá áááşááž áá áááşáĄáá ááąááŹááťáŹá¸áá˝ááş áĄáááášááŹááşááážáááąá ááźááşááŤá ááźááşááŤá ááźááşááŤá ááźááşááŤáâ ááźáŽá¸ááąáŹáˇ áááşá¸á ááŽááᯠááŤáááŹá፠áairflow backfill -s '2020-01-01' -e '2020-01-13' orders- áĄááźáąááśáááşááąáŹááşáážáŻ-
initdb,resetdb,upgradedb,checkdb. runáááşá¸áááş áááˇáşáĄáŹá¸ instance ááŻááşááąáŹááşá ááŹáá áşááŻáĄáŹá¸ ááŻááşááąáŹááşáááŻááşááźáŽá¸ áážáŽáááŻáážáŻáĄáŹá¸ááŻáśá¸áá˝ááş áĄáážááşááááŻááşáááşá áááŻáážáááŤá¸á áááşáááşáááşá¸áááŻáážááááˇáş run áááŻááşáááşáLocalExecutorááááŽáĄá áŻáĄááąá¸áážáááťážááşáááşá¸á- ááąáŹáşááąáŹáş áá°ááŹááᯠááŻááşáááşá
testááźáąá á˝ááşáážáŹáááşá¸ ááŹááž áááąá¸áá°á¸á connectionsshell ááž ááťáááşáááşáážáŻááťáŹá¸ááᯠáĄá áŻáááŻááşáĄááźáŻáśáááŻááş áááşááŽá¸áá˝ááˇáşááźáŻáááşá
- - ááááşáĄááşááťáŹá¸áĄáá˝ááş áááşáá˝ááşááźáŽá¸ áááşá¸ááᯠáááşáĄáááşá¸áááşááźááˇáş áá˝ážáąáážáąáŹááşááźááşá¸áááźáŻáᲠáĄááźááşáĄáážááşáááşááśááźááşá¸á áááşáá˛ááąáŹáááşá¸áááşá¸ááźá
áşáááşá ááŤááąáááˇáş ááŤáááŻáˇááᯠáááşáá°á ááŹá¸áážáŹáá˛á
/home/airflow/dagsááźáąá¸ipythonáážáŻááşáá˝ááąááąáŹáˇáááŹá¸á áĽáááŹáĄáŹá¸ááźááˇáş áááşáááş áĄáąáŹááşááŤááŻááşááźááˇáş áááşáá˝ááşáážáŻáĄáŹá¸ááŻáśá¸ááᯠááŻááşáá°áááŻááşáááş-from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Airflow áááşááŹááąááŹááąáˇá
áşáááŻáˇ ááťáááşáááşááąáááşá áááşá¸áááŻá
áŹááąá¸áááş ááťá˝ááşáŻááşááĄááźáśááźáŻááŹá¸ááąáŹáşáááşá¸ áĄááťááŻá¸ááťááŻá¸ááąáŹááááťááąáŹáááşááá
áşááťáŹá¸áĄáá˝ááş ááŻááşááąáŹááşá
ááŹáĄááźáąáĄááąááťáŹá¸áááŻááá°ááźááşá¸áááş APIs ááťáŹá¸áááŻáĄááŻáśá¸ááźáŻááźááşá¸áááş ááťáŹá¸á
á˝áŹáááŻáááŻááźááşáááşáá˝ááşáá°ááŤáááşá
ááťá˝ááşáŻááşáááŻáˇááĄááŻááşááťáŹá¸áĄáŹá¸ááŻáśá¸áááş áĄá á˝ááşá¸áĄá ááážáááąáŹáşáááşá¸ áá áşááŤáá áşááś ááźáŻááşááťáááŻááşááźáŽá¸á áááşá¸áááş ááŻáśáážááşááźá áşáááşáᯠáááŻááźááŤá ááŻáˇá áááŻáˇááąáŹáş áĄááťááŻáˇááąáŹ ááááşáááŻáˇáážáŻááťáŹá¸áááş ááśááááźá áşáá˝ááşáážáááźáŽá¸ á á áşááąá¸áááş áááŻáĄááşáááşááźá áşáááşá
SQL áááááŹá¸ááŤá
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
áááŻá¸ááŹá¸
ááŻááşááŤáááşá Google áááŻááşááąá¸áá˛áˇ áááˇáşááşáááşááŻáᏠááťá˝ááşááąáŹáşáá˛áˇ bookmarks áá˛á Airflow folder áá˛áˇ áĄááźáąáŹááşá¸áĄááŹáá˝áąááŤá
- - ááŻááşááŤáááşá ááŤáááŻáˇááŻáśá¸áááąá ááááşá á áŹáá˝ááşá áŹáááşá¸á áááŻáˇááąáŹáş áá˝ážááşááźáŹá¸ááťááşááᯠáááşáá°áááşááááşá¸á
- - áĄáááşá¸ááŻáśá¸á áááşááŽá¸áá°ááťáŹá¸ááśááž áĄááźáśááźáŻááťááşááťáŹá¸ááᯠáááşááŤá
- - áĄá áááŻááşá¸- ááŻáśááťáŹá¸áá˝ááş áĄááŻáśá¸ááźáŻáá°ááťááşáážáŹááźááş
- - áĄáááşá (ááŻááşááááş) áááşáááş ááťá˝ááşáŻááşááśááž áá áşá áŻáśáá áşááŹááᯠááŹá¸ááááşááŤá áĄááźáąááśáááąáŹáááŹá¸ááťáŹá¸ááᯠááąáŹááşá¸á á˝áŹááąáŹáşááźááŹá¸ááŤáááşá
- - Airflow áĄá áŻáĄááąá¸áá áşáᯠáááşáážááşááźááşá¸áĄáá˝ááş áĄáááŻááťáŻáśá¸áááşá¸áá˝ážááşá
- - áá°ááŽááąáŹá áááşáááşá áŹá¸á ááŹááąáŹááşá¸ááąáŹááąáŹááşá¸ááŤá¸á ááźá áşáááŻááşáááşáážáŹ áááŻá áááŹá¸áááşááŤááážááˇáş áĽáááŹáĄáááşá¸áááşáážáá˝á˛á
- - Celery áážááˇáşáá˝á˛áááşááŻááşááąáŹááşááźááşá¸áĄááźáąáŹááşá¸á
- - áááşá á˝á˛áĄá áŹá¸ ID ááźááˇáşáááşááźááşá¸á áĄáá˝ááşááźáąáŹááşá¸ááźááşá¸á áááŻááşáá˝á˛áˇá ááşá¸ááŻáśáážááˇáş áĄááźáŹá¸á áááşáááşá áŹá¸áá˝ááşáĄááŹááťáŹá¸áĄááźáąáŹááşá¸ - áĄááŻááşááťáŹá¸áááŻáśááąáŹááşááźááşá¸áĄááźáąáŹááşá¸á
- - ááťá˝ááşáŻááşááąáŹáşááźáá˛áˇáááˇáş áĄááŻááşááťáŹá¸á áážáŽáááŻáážáŻáážááˇáş Trigger Ruleá
- - áĄá áŽáĄá ááşáá˝á˛áá°áá˝ááş "áááşáá˝ááşááŹá¸áááˇáşáĄáááŻááşá¸áĄááŻááşááŻááşáááş" áĄááťááŻáˇáááŻááťáąáŹáşáá˝ážáŹá¸áááşá¸á ááŻáśá¸áážáŻáśá¸áá˝áŹá¸ááąáŹááąááŹáááŻáááşááźáŽá¸áĄááŻááşááťáŹá¸áááŻáŚá¸á áŹá¸ááąá¸ááŻááşááŤá
- â Airflow áááşááŹááąááŹáĄáá˝ááş áĄááŻáśá¸áááşááąáŹ SQL ááąá¸ááźááşá¸áážáŻááťáŹá¸á
- - á áááşááźááŻááşáĄáŹááŻáśááśááááááŹáááşááŽá¸ááźááşá¸áážááˇáş áááşáááşá áĄááŻáśá¸áááşááąáŹáĄáááŻááşá¸áá áşááŻáážááááşá
- â Data Science áĄáá˝ááş AWS áá˝ááş áĄááźáąááśáĄááąáŹááşáĄáĄáŻáśáááşááąáŹááşááźááşá¸áážááˇáşáááşáááşááąáŹ á áááşáááşá áŹá¸áá˝ááşáĄáááŻááťáŻáśá¸áážááşá áŻá
- - ááŹáááşáĄáážáŹá¸ááťáŹá¸ (áá áşá áŻáśáá áşááąáŹááşá áá˝ážááşááźáŹá¸ááťááşááᯠááááşáááąá¸áááˇáşáĄááŤ)á
- - áááş Connections áááŻááŻáśá¸ááŻáśááźááˇáş á ááŹá¸áážááşááťáŹá¸ááᯠááááşá¸áááşá¸ááŹá¸ááąáŹáşáááşá¸ áá°ááťáŹá¸á ááźáŻáśá¸ááąááŻáśá
- - áá˝ááşáááŻááşááąáŹ DAG áááşáááˇáşáááŻáˇááźááşá¸á ááŻááşááąáŹááşááťááşááťáŹá¸áá˝ááş á ááŹá¸á ááşááááźááşá¸á áážáŽáááŻáážáŻááťáŹá¸áĄááźáąáŹááşá¸ áááşááŹáááşááŹáážááˇáş ááŻááşáááşá¸á áĽáşááťáŹá¸ááᯠááťáąáŹáşáá˝áŹá¸ááźááşá¸áĄááźáąáŹááşá¸á
- - áĄááŻáśá¸ááźáŻáážáŻáážááˇáş áááşáááş
default argumentsиparamsááá°ááŹááŻáśá áśááťáŹá¸áĄááźááş Variables ááťáŹá¸áážááˇáş ááťáááşáááşáážáŻááťáŹá¸á - - á áŽá ááşáá°áááş Airflow 2.0 áĄáá˝ááş ááźááşáááşááąááŻáśáĄááźáąáŹááşá¸ ááŹááşáááşá¸á
- - ááťá˝ááşáŻááşáááŻáˇá áĄá
áŻáĄááąá¸áá˝ááş ááźááˇáşááťááşááźááşá¸áĄááźáąáŹááşá¸ áĄáááşá¸áááş ááąááşááąáŹááşááťááąááąáŹ ááąáŹááşá¸ááŤá¸
docker-compose. - - áááşá¸áááááşááťáŹá¸áážááˇáş áĄááźáąáŹááşá¸áĄáᏠáááşáááˇáşáááŻáˇááźááşá¸ááᯠáĄááŻáśá¸ááźáŻá ááźáąáŹááşá¸áá˛ááąááąáŹ áĄááŻááşááťáŹá¸á
- â á áŹáááŻááşáážááˇáş Slack ááž á áśáážááˇáş á áááşááźááŻááş áĄááááąá¸ááťááşááťáŹá¸á
- - ááŻááşáááşá¸ááŹáááşááťáŹá¸á áááşááááŻááťáŹá¸áážááˇáş XCom áááŻáá˝á˛ááŻááşááźááşá¸á
áážááˇáş ááąáŹááşá¸ááŤá¸áá˝ááş áĄááŻáśá¸ááźáŻáááˇáş áááˇáşááşááťáŹá¸
- - áááşá¸áááááşááťáŹá¸áá˝ááş áĄááŻáśá¸ááźáŻáááş ááąááŹáá°áááŻááşáááşá
- - áááŻááşá¸ááťáŹá¸áááŻáááşááŽá¸ááŹáá˝ááş áĄááźá áşááťáŹá¸ááąáŹáĄáážáŹá¸ááťáŹá¸á
- -
docker-composeá ááşá¸áááşááźááşá¸á áĄáážáŹá¸ááźááşáááşááźááşá¸áážááˇáş áĄááźáŹá¸áĄááŹááťáŹá¸áĄáá˝ááşá - - Telegram REST API áĄáá˝ááş Python wrapperá
source: www.habr.com




