Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, Data Engineer in the Analytics Department of the Vezet group of companies.

I'll tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you don't work with data flows but need to launch processes periodically and monitor their execution.

And yes, I won't just tell you, I'll show you: the article has plenty of code, screenshots and recommendations.

[Image: What you usually see when you google the word Airflow / Wikimedia Commons]

Introduction

Apache Airflow is just like Django:

  • written in Python,
  • has a great admin panel,
  • infinitely extensible

- only better, and made for completely different purposes, namely (as promised at the very top):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience will allow)
  • with dynamic workflow generation from Python code that is very easy to write and understand
  • and the ability to connect to any databases and APIs using both ready-made components and homemade plugins (which is extremely simple).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (for us, that's Vertica and ClickHouse);
  • as an advanced cron that launches data consolidation processes on the ODS and also monitors their maintenance.

Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this setup runs:

  • more than 200 DAGs (essentially workflows into which we stuffed tasks),
  • with 70 tasks in each on average,
  • and this goodness starts (also on average) once an hour.

I'll describe how we scaled out below, but for now let's define the über-problem we're going to solve:

There are three source SQL Servers, each with 50 databases, all instances of one project. So they share the same structure (almost everywhere, mua-ha-ha), which means each one has an Orders table (fortunately, a table with that name fits into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively dump it into, say, Vertica.
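To make "adding service fields" concrete, here is a minimal plain-Python sketch. The field names source_server, source_db and etl_task_id, as well as the sample values, are assumptions for illustration; the article doesn't show the real column names or the code that does this.

```python
from typing import Any, Dict, Iterable, Iterator


def add_service_fields(
    rows: Iterable[Dict[str, Any]],
    source_server: str,
    source_db: str,
    etl_task_id: str,
) -> Iterator[Dict[str, Any]]:
    """Yield copies of source rows with the three service fields appended.

    The field names are hypothetical; use whatever your DWH schema expects.
    """
    for row in rows:
        enriched = dict(row)  # copy, so the caller's rows stay untouched
        enriched["source_server"] = source_server
        enriched["source_db"] = source_db
        enriched["etl_task_id"] = etl_task_id
        yield enriched


if __name__ == "__main__":
    # Sample rows from one hypothetical Orders table.
    orders = [{"id": 1, "amount": 250}, {"id": 2, "amount": 90}]
    for row in add_service_fields(orders, "mssql_01", "project_01", "orders_2020-07-08"):
        print(row)
```

Because every Orders table has the same structure, the same function works for all 150 sources; only the three arguments change.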

Let's go!

The main part: practical (and a bit theoretical)

Why we (and you) need it

When the trees were big and I was a plain SQL developer at a Russian retailer, we cranked out ETL processes, aka data flows, with the two tools available to us:

  • Informatica PowerCenter - an extremely sprawling system, extremely productive, with its own hardware and its own versioning. I used maybe 1% of its capabilities, if that. Why? Well, first of all, its interface, straight out of the 2000s, put psychological pressure on us. Secondly, the contraption is designed for extremely elaborate processes, furious component reuse and other very-important-enterprise tricks. And we won't even mention that it costs as much as a wing of an Airbus A380 per year.

    Careful: this screenshot may hurt people under 30 a little

    [Screenshot: the Informatica PowerCenter interface]

  • SQL Server Integration Services (SSIS) - we used this comrade for our intra-project flows. Well, really: we were already using SQL Server, and not using its ETL tools would have been somehow unreasonable. Everything about it is nice: the interface is pretty, and so are the progress reports... But that's not why we love software products, oh no, not for that. We can version its dtsx files (XML whose nodes get shuffled on every save), but what's the point? And how about building a package of tasks that drags a hundred tables from one server to another? Forget a hundred: after twenty, your index finger will fall off from clicking the mouse button. But it certainly looks more fashionable:

    [Screenshot: an SSIS package in the designer]

We certainly looked for a way out. It almost came to a home-grown SSIS package generator...

...and then a new job found me. And there, Apache Airflow caught up with me.

When I found out that ETL process descriptions are plain Python code, I nearly danced for joy. Data flows became subject to versioning and diffing, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code fitting on one and a half or two 13" screens.
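Why does it fit on a couple of screens? Because identical schemas turn the whole job into a loop. Here is a minimal sketch in plain Python, assuming hypothetical connection ids (mssql_01 to mssql_03), database names (project_01 to project_50) and a dbo.Orders table. It only builds 3 x 50 = 150 task descriptions as dicts, whereas a real DAG would create an Airflow operator for each one.

```python
from itertools import product
from typing import Dict, List

# Hypothetical connection ids and database names, for illustration only.
SERVERS = ["mssql_01", "mssql_02", "mssql_03"]
DATABASES = [f"project_{n:02d}" for n in range(1, 51)]


def make_task_specs(servers: List[str], databases: List[str]) -> List[Dict[str, str]]:
    """Describe one extract task per (server, database) pair.

    All databases share one schema, so a single query template covers them;
    only the connection and the database name differ.
    """
    specs = []
    for conn_id, database in product(servers, databases):
        specs.append({
            # Task ids must be unique within a DAG, hence both parts.
            "task_id": f"{conn_id}__{database}",
            "conn_id": conn_id,
            "query": f"SELECT * FROM {database}.dbo.Orders",
        })
    return specs


if __name__ == "__main__":
    specs = make_task_specs(SERVERS, DATABASES)
    print(len(specs))           # 150
    print(specs[0]["task_id"])  # mssql_01__project_01
```

Adding a fourth server is then a one-line change to SERVERS rather than another twenty mouse-clicked packages.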

Assembling the cluster

Let's not turn this into a complete kindergarten by talking about perfectly obvious things here, like installing Airflow, the database of your choice, Celery and other matters described in the docs.

So that we can start experimenting right away, I sketched a docker-compose.yml with:

  • Airflow itself: Scheduler and Webserver. Flower will also run there to monitor Celery tasks (it's already bundled into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, into which Airflow will write its service information (scheduler data, execution statistics, etc.), and in which Celery will mark completed tasks;
  • Redis, which will act as the task broker for Celery;
  • a Celery worker, which will do the actual execution of the tasks;
  • the ./dags folder, where we will put our DAG definition files. They are picked up on the fly, so there's no need to restart the whole stack after every sneeze.

In some places the code in the examples is not shown in full (so as not to clutter the text), and in some places it is modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In putting the composition together, I relied heavily on the well-known puckel/docker-airflow image; be sure to check it out. Maybe you won't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also through environment variables (kudos to the developers), which I shamelessly took advantage of.
  • Naturally, it is not production-ready: I deliberately didn't add healthchecks to the containers and didn't bother with security. But I did the minimum needed for our experiments.
  • Note that:
    • The DAG folder must be accessible to both the scheduler and the workers.
    • The same applies to all third-party libraries: they must all be installed on the machines running the scheduler and the workers.
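The environment-variable naming convention is AIRFLOW__{SECTION}__{KEY}, with double underscores as separators, and such a variable overrides the corresponding key in airflow.cfg. The sketch below only illustrates how the names in the compose file map to config sections and keys; it is not Airflow's own configuration parser.

```python
import os
from typing import Dict, Mapping, Tuple

PREFIX = "AIRFLOW__"


def airflow_env_overrides(environ: Mapping[str, str]) -> Dict[Tuple[str, str], str]:
    """Map AIRFLOW__{SECTION}__{KEY} variables to (section, key) pairs.

    Illustration of the naming convention only, not Airflow's parser.
    """
    overrides: Dict[Tuple[str, str], str] = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue  # e.g. PATH, or AIRFLOW_HOME with a single underscore
        # The first double underscore separates section from key; the key
        # itself may contain single underscores (DAGS_FOLDER, BROKER_URL).
        section, sep, key = name[len(PREFIX):].partition("__")
        if sep and section and key:
            overrides[(section.lower(), key.lower())] = value
    return overrides


if __name__ == "__main__":
    for (section, key), value in sorted(airflow_env_overrides(os.environ).items()):
        print(f"[{section}] {key} = {value}")
```

Run inside one of the containers, it would list, for example, AIRFLOW__CORE__DAGS_FOLDER as the dags_folder key of the [core] section.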

Well, now it's simple:

$ docker-compose up --scale worker=3

Once everything is up, you can look at the web interfaces:

  • Airflow: http://localhost:8080
  • Flower: http://localhost:5555

Basic concepts

If you didn't understand a thing in all these "DAGs", here's a short glossary:

  • Scheduler - the most important guy in Airflow, making sure that robots work hard instead of humans: it monitors the schedule, updates DAGs, launches tasks.

    In general, older versions had memory problems (no, not amnesia, leaks), and the configs even kept a legacy parameter, run_duration: its restart interval. But now everything is fine.

  • DAG (aka "dag") - a "directed acyclic graph", but that definition won't tell most people much. In essence it's a container for tasks that interact with each other (see below), the analogue of a Package in SSIS and a Workflow in Informatica.

    Besides DAGs there can also be subDAGs, but we most likely won't get to them.

  • DAG Run - an initialized DAG that is assigned its own execution_date. DAG runs of the same DAG can run in parallel (provided, of course, that you've made your tasks idempotent).
  • Operator - a piece of code responsible for performing a specific action. There are three types of operators:
    • action, like our beloved PythonOperator, which can execute any (valid) Python code;
    • transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
    • sensor, which lets you react to an event or hold off further execution of the DAG until it occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, start the GoogleCloudStorageToS3Operator transfer. An inquisitive mind will ask: "Why? You can do retries right in the operator!" Well, so as not to clog the task pool with hung operators. The sensor starts, checks, and dies until the next attempt (that is how it behaves with mode='reschedule'; in the default poke mode it keeps its worker slot and sleeps between checks).
  • Task - declared operators, regardless of type, that are attached to a DAG are promoted to the rank of task.
  • Task instance - when the general scheduler decides it's time to send tasks into battle on the worker-performers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e., a set of variables: execution parameters), expands the command or query templates, and queues them up.
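What "directed acyclic" buys you: because the dependencies contain no cycles, tasks can always be put in an order where every upstream task comes before its downstream ones. Here is a minimal sketch of that ordering (Kahn's algorithm) in plain Python. It illustrates the concept only; it is not Airflow's scheduler code, and the task names are made up.

```python
from collections import deque
from typing import Dict, Iterable, List, Tuple


def topological_order(tasks: Iterable[str], edges: Iterable[Tuple[str, str]]) -> List[str]:
    """Order tasks so that every upstream task precedes its downstream ones.

    Uses Kahn's algorithm. Raises ValueError if the dependencies contain a
    cycle, i.e. if the graph is not a DAG.
    """
    task_list = list(tasks)
    downstream: Dict[str, List[str]] = {t: [] for t in task_list}
    indegree: Dict[str, int] = {t: 0 for t in task_list}
    for up, down in edges:
        downstream[up].append(down)
        indegree[down] += 1

    ready = deque(t for t in task_list if indegree[t] == 0)  # no upstream left
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for nxt in downstream[task]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(task_list):
        raise ValueError("dependency cycle detected: this is not a DAG")
    return order


if __name__ == "__main__":
    extracts = ["extract_orders", "extract_calls"]
    notifiers = ["notify_success", "notify_failure"]
    # Fan-in: both notifiers depend on both extracts.
    deps = [(e, n) for e in extracts for n in notifiers]
    print(topological_order(extracts + notifiers, deps))
    # ['extract_orders', 'extract_calls', 'notify_success', 'notify_failure']
```

Tasks that become ready at the same moment (here, the two extracts) have no ordering between them, which is exactly what lets an executor run them in parallel.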

Generating tasks

First, let's outline the general structure of our DAG, and then we'll dive deeper and deeper into the details, because we use a few non-trivial solutions.

So, in its simplest form, such a DAG will look like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's break it down:

  • first, we import the necessary libs and a few other things;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table;
  • dag is the declaration of our DAG, which must be in globals(), otherwise Airflow won't find it. The DAG also needs to know:
    • its name, orders: this name will then appear in the web interface,
    • that it runs starting from midnight on July 8,
    • and that it should run every 6 hours (tough guys can use a cron string such as 0 */6 * * * here instead of timedelta(), the less tough ones a preset like @daily);
  • workflow() will do the main job, but not yet. For now, we'll just dump our context into the log.
  • And now the simple magic of creating tasks:
    • we iterate over our sources;
    • we initialize a PythonOperator that will execute our dummy workflow(). Don't forget to give the task a unique (within the DAG) name and to attach the DAG itself. The provide_context flag, in turn, passes additional arguments to the function, which we carefully collect with **context.
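One subtlety of start_date and schedule_interval is worth spelling out: in Airflow 1.x, the run labelled with execution_date X covers the interval from X to X + schedule_interval and is triggered only after that interval has ended. So the first orders run, with execution_date 2020-07-08 00:00, actually starts at about 06:00. Below is a small plain-Python sketch of that rule. It deliberately ignores catchup: the compose file above sets AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT to 'False', so the scheduler would skip older missed intervals rather than run all of them.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def planned_runs(
    start_date: datetime, interval: timedelta, now: datetime
) -> List[Tuple[datetime, datetime]]:
    """Return (execution_date, earliest_trigger_time) pairs up to `now`.

    Follows the Airflow 1.x convention: the run labelled X covers
    [X, X + interval) and is triggered only once that interval is over.
    Catchup settings are ignored: every completed interval is listed.
    """
    runs = []
    execution_date = start_date
    while execution_date + interval <= now:
        runs.append((execution_date, execution_date + interval))
        execution_date += interval
    return runs


if __name__ == "__main__":
    # The orders DAG: start_date 2020-07-08 00:00, every 6 hours.
    for label, trigger in planned_runs(
        datetime(2020, 7, 8, 0), timedelta(hours=6), datetime(2020, 7, 8, 13)
    ):
        print(f"execution_date={label:%Y-%m-%d %H:%M} starts after {trigger:%H:%M}")
```

At 13:00 on July 8 only two intervals have closed, so only the 00:00 and 06:00 runs exist yet; the 12:00 run waits until 18:00.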

That's all for now. What we got:

  • a new DAG in the web interface,
  • a hundred and fifty tasks that will be executed in parallel (if the Airflow and Celery settings and the server capacity allow it).

Well, almost got it.

[Image: Who will install the dependencies?]

To simplify the whole thing, I wired requirements.txt processing on all nodes into docker-compose.yml.

Now off we go:

[Screenshot: Tree View of the orders DAG]

Gray squares are task instances processed by the scheduler.

We wait a bit, and the workers snap up the tasks:

[Screenshot: task instances being executed]

The green ones, of course, completed successfully. The red ones, not so much.

By the way, there is no ./dags folder on our production and no synchronization between machines: all the DAGs live in git on our GitLab, and GitLab CI distributes updates to the machines on merge into master.

A little about Flower

While the workers are crunching through our dummy tasks, let's recall another tool that can show us something: Flower.

The very first page, with summary information on the worker nodes:

[Screenshot: Flower, worker nodes summary]

The most intense page, with the tasks that went to work:

[Screenshot: Flower, task list]

The most boring page, with the status of our broker:

[Screenshot: Flower, broker status]

The brightest page, with task status graphs and their execution time:

[Screenshot: Flower, task status and execution time graphs]

Reloading what didn't load

So, all the tasks have finished, and we can carry off the wounded.

[Screenshot: Tree View with failed task instances]

And there were quite a few wounded, for one reason or another. When Airflow is used properly, these very squares indicate that the data definitely did not arrive.

You need to check the log and restart the failed task instances.

By clicking on any square, we'll see the actions available to us:

[Screenshot: task instance actions dialog]

You can take a failed one and Clear it. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

[Screenshot: the Clear action]

Obviously, doing this with the mouse for every red square is not very humane; that's not what we expect from Airflow. Naturally, we have a weapon of mass destruction: Browse/Task Instances

[Screenshot: Browse/Task Instances]

Let's select everything at once and reset it to zero by clicking the right item:

[Screenshot: clearing all selected task instances]

After clearing, our tasks look like this (they can hardly wait for the scheduler to schedule them):

[Screenshot: cleared task instances waiting to be scheduled]

Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we've dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]
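The TRUNCATE + INSERT pair is a full refresh: running it twice leaves the target exactly as running it once, which is what makes retries and Clear safe for these tasks. Below is a minimal sketch of the same pattern with the standard-library sqlite3 module standing in for Vertica (SQLite has no TRUNCATE TABLE, so DELETE FROM replaces it); the tables and data are made up for the example.

```python
import sqlite3


def refresh_report(conn: sqlite3.Connection, source: str, target: str) -> None:
    """Rebuild `target` from `source`, like a TRUNCATE + INSERT pair.

    SQLite has no TRUNCATE TABLE, so DELETE FROM stands in for it. Table names
    are interpolated into the SQL, so they must come from trusted code, never
    from user input.
    """
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute(f"DELETE FROM {target}")
        conn.execute(f"INSERT INTO {target} SELECT * FROM {source}")


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER, city TEXT)")
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?)", [(1, "Moscow"), (2, "Kazan"), (3, "Moscow")]
    )
    conn.execute(
        "CREATE VIEW city_orders_view AS "
        "SELECT city, COUNT(*) AS n FROM orders GROUP BY city"
    )
    conn.execute("CREATE TABLE city_orders (city TEXT, n INTEGER)")
    for _ in range(2):  # a retry must not duplicate rows
        refresh_report(conn, "city_orders_view", "city_orders")
    print(conn.execute("SELECT city, n FROM city_orders ORDER BY city").fetchall())
    # [('Kazan', 1), ('Moscow', 2)]
```

As in the DAG, the table names come from a fixed list in the code, which is the only reason plain string interpolation is acceptable here.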

ืึทืœืขืžืขืŸ ื”ืื˜ ืืœืฅ ื“ืขืจื”ื™ื™ึทื ื˜ื™ืงื˜ ื–ื™ื™ืขืจ ืจื™ืคึผืึธืจืฅ, ืจืขื›ื˜? ื“ืึธ ืขืก ืื™ื– ื•ื•ื™ื“ืขืจ: ืขืก ืื™ื– ืึท ืจืฉื™ืžื” ืคื•ืŸ ืžืงื•ืจื™ื ืคื•ืŸ ื•ื•ืึธืก ืฆื• ื‘ืึทืงื•ืžืขืŸ ื“ืึทื˜ืŸ; ืขืก ืื™ื– ืึท ืจืฉื™ืžื” ืคื•ืŸ ื•ื•ื• ืฆื• ืฉื˜ืขืœืŸ ืขืก; ื“ื• ื–ืืœืกื˜ ื ื™ืฉื˜ ืคืึทืจื’ืขืกืŸ ืฆื• ื”ืึธื ืง ื•ื•ืขืŸ ืึทืœืฅ ื›ืึทืคึผืึทื ื– ืึธื“ืขืจ ื‘ืจื™ื™ืงืก ืึทืจืึธืคึผ (ื ื•, ื“ืึธืก ืื™ื– ื ื™ืฉื˜ ื•ื•ืขื’ืŸ ืื•ื ื“ื–, ื ื™ื˜).

ืœืึธืžื™ืจ ื•ื•ื™ื“ืขืจ ื“ื•ืจื›ื’ื™ื™ืŸ ื“ื™ ื˜ืขืงืข ืื•ืŸ ืงื•ืง ืื•ื™ืฃ ื“ื™ ื ื™ื™ึทืข ืžืึธื“ื ืข ื–ืื›ืŸ:

  • from commons.operators import TelegramBotSendMessage - ื’ืึธืจื ื™ืฉื˜ ืคึผืจื™ื•ื•ืขื ืฅ ืื•ื ื“ื– ืฆื• ืžืึทื›ืŸ ืื•ื ื“ื–ืขืจ ืื™ื™ื’ืขื ืข ืึธืคึผืขืจื™ื™ื˜ืขืจื–, ื•ื•ืึธืก ืžื™ืจ ื”ืึธื‘ืŸ ื’ืขื•ื•ื™ื™ื ื˜ ื“ื•ืจืš ืžืึทื›ืŸ ืึท ืงืœื™ื™ืŸ ืจืึทืคึผืขืจ ืคึฟืึทืจ ืฉื™ืงืŸ ืึทืจื˜ื™ืงืœืขืŸ ืฆื• ื•ื ื‘ืœืึธืงืงืขื“. (ืžื™ืจ ื•ื•ืขืœืŸ ืจืขื“ืŸ ืžืขืจ ื•ื•ืขื’ืŸ ื“ืขื ืึธืคึผืขืจืึทื˜ืึธืจ ืื•ื ื˜ืŸ);
  • default_args={} - ื˜ืึธื’ ืงืขื ืขืŸ ืคืึทืจืฉืคึผืจื™ื™ื˜ืŸ ื“ื™ ื–ืขืœื‘ืข ืึทืจื’ื•ืžืขื ื˜ืŸ ืฆื• ืึทืœืข ื–ื™ื™ึทืŸ ืึธืคึผืขืจื™ื™ื˜ืขืจื–;
  • to='{{ var.value.all_the_kings_men }}' - ืคืขืœื“ to ืื•ื ื“ื–ืขืจ ื•ื•ืขื˜ ื ื™ืฉื˜ ื–ื™ื™ืŸ ื›ืึทืจื“ืงืึธื“ืึทื“, ืึธื‘ืขืจ ื“ื–ืฉืขื ืขืจื™ื™ื˜ืึทื“ ื“ื™ื ืึทืžื™ืงืึทืœืœื™ ื ื™ืฆืŸ ื“ื–ืฉื™ื ื“ื–ืฉืึท ืื•ืŸ ืึท ื‘ื™ื™ึทื˜ืขื•ื•ื“ื™ืง ืžื™ื˜ ืึท ืจืฉื™ืžื” ืคื•ืŸ ื™ืžื™ื™ืœื–, ื•ื•ืึธืก ืื™ืš ืงืขืจืคืึทืœื™ ืฉื˜ืขืœืŸ ืื™ืŸ Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - ืึธืคึผืขืจืึทื˜ืึธืจ ืงืึทื˜ืขืจ ืฆื•ืฉื˜ืึทื ื“. ืื™ืŸ ืื•ื ื“ื–ืขืจ ืคืึทืœ, ื“ืขืจ ื‘ืจื™ื•ื• ื•ื•ืขื˜ ื–ื™ื™ืŸ ื’ืขืฉื™ืงื˜ ืฆื• ื“ื™ ื‘ืึธืกืกืขืก ื‘ืœื•ื™ื– ืื•ื™ื‘ ืึทืœืข ื“ืขืคึผืขื ื“ืขื ืกื™ืขืก ื–ืขื ืขืŸ ืžืงื™ื™ื ื”ืฆืœื—ื”;
  • tg_bot_conn_id='tg_main' - ื˜ืขื ื•ืช conn_id ืึธื ื ืขืžืขืŸ ื“ื™ ื™ื“ืขื ื˜ื™ืคื™ืขืจืก ืคื•ืŸ ื“ื™ ืงืึทื ืขืงืฉืึทื ื– ื•ื•ืึธืก ืžื™ืจ ืžืึทื›ืŸ ืื™ืŸ Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - ืึทืจื˜ื™ืงืœืขืŸ ืื™ืŸ ื˜ืขืœืขื’ืจืึทื ื•ื•ืขื˜ ืคืœื™ืขืŸ ืึทื•ื•ืขืง ื‘ืœื•ื™ื– ืื•ื™ื‘ ืขืก ื–ืขื ืขืŸ ื’ืขืคืืœืŸ ื˜ืึทืกืงืก;
  • task_concurrency=1 - ืžื™ืจ ืคืึทืจื•ื•ืขืจืŸ ื“ื™ ืกื™ื™ืžืึทืœื˜ื™ื™ื ื™ืึทืก ืงืึทื˜ืขืจ ืคื•ืŸ ืขื˜ืœืขื›ืข ืึทืจื‘ืขื˜ ื™ื ืกื˜ืึทื ืกื™ื– ืคื•ืŸ ืื™ื™ืŸ ืึทืจื‘ืขื˜. ืึทื ื“ืขืจืฉ, ืžื™ืจ ื•ื•ืขืœืŸ ื‘ืึทืงื•ืžืขืŸ ืขื˜ืœืขื›ืข ืกื™ื™ืžืึทืœื˜ื™ื™ื ื™ืึทืก ืœืึธื ื˜ืฉื™ื– VerticaOperator (ืงื•ืงื ื“ื™ืง ืื•ื™ืฃ ืื™ื™ืŸ ื˜ื™ืฉ);
  • report_update >> [email, tg] - ืึทืœืข VerticaOperator ื•ื•ืขื˜ ืฉื˜ื™ืžืขืŸ ืฆื• ืฉื™ืงืŸ ื‘ืจื™ื•ื• ืื•ืŸ ืึทืจื˜ื™ืงืœืขืŸ, ื•ื•ื™ ื“ืึธืก:

    But since the notifier operators have different trigger conditions, only one of them will actually run. In Tree View everything looks a bit less obvious:
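The next block is a toy model, not Airflow's implementation. It shows how ALL_SUCCESS and ONE_FAILED decide whether a notifier fires once its upstream report updates have finished. The strings mirror the values of Airflow's TriggerRule constants. The function `should_run` and the plain-string task states are assumptions made for illustration.

```python
from typing import Iterable

# Toy model of two trigger rules (hypothetical helper, not Airflow code).
ALL_SUCCESS = "all_success"
ONE_FAILED = "one_failed"


def should_run(trigger_rule: str, upstream_states: Iterable[str]) -> bool:
    """Decide whether a downstream task fires, given its finished upstream states."""
    states = list(upstream_states)
    if trigger_rule == ALL_SUCCESS:
        # The email to the bosses: every upstream task must have succeeded.
        return all(state == "success" for state in states)
    if trigger_rule == ONE_FAILED:
        # The Telegram alert: at least one upstream task failed.
        return any(state == "failed" for state in states)
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


# Three report updates converge on both notifiers (report_update >> [email, tg]).
good_night = ["success", "success", "success"]
bad_night = ["success", "failed", "success"]

for name, states in [("good", good_night), ("bad", bad_night)]:
    email = should_run(ALL_SUCCESS, states)
    telegram = should_run(ONE_FAILED, states)
    print(f"{name}: email={email}, telegram={telegram}")
```

In both scenarios exactly one notifier runs. One difference from real Airflow: according to its documentation, one_failed fires as soon as any parent fails, without waiting for the others. This model only looks at final states.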

A few words about macros and their friends, variables.

Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD, for example 2020-07-14. The nicest part is that context variables are nailed to a specific task instance (a square in Tree View), so on a rerun the placeholders expand to exactly the same values.
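Here is a rough illustration of what `{{ ds }}` and a few sibling macros expand to. The sketch derives them from an execution_date using plain `datetime`, then substitutes them with `str.replace`. The helper names `macro_values` and `render` are hypothetical. The naive substitution only stands in for Jinja: real Airflow renders templates with Jinja and exposes many more context variables.

```python
from datetime import datetime, timedelta


def macro_values(execution_date: datetime) -> dict:
    """Approximate a few Airflow 1.10 context macros for one task instance."""
    day = timedelta(days=1)
    return {
        "ds": execution_date.strftime("%Y-%m-%d"),
        "ds_nodash": execution_date.strftime("%Y%m%d"),
        "yesterday_ds": (execution_date - day).strftime("%Y-%m-%d"),
        "tomorrow_ds": (execution_date + day).strftime("%Y-%m-%d"),
    }


def render(template: str, values: dict) -> str:
    """Naive stand-in for Jinja: replace '{{ name }}' placeholders."""
    for name, value in values.items():
        template = template.replace("{{ " + name + " }}", value)
    return template


sql = "SELECT id FROM orders.payments WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
# The same execution_date always yields the same SQL, which makes reruns reproducible.
print(render(sql, macro_values(datetime(2020, 7, 14))))
```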

The assigned values can be inspected with the Rendered button on each task instance. Here is the email-sending task:


And here is the task that sends a message:


The full list of built-in macros for the latest available version is here: Macros Reference

What's more, with plugins we can declare our own macros, but that's a whole other story.

Besides the predefined values, we can substitute the values of our own variables (I already did this in the code above). Let's create a couple of them in Admin/Variables:


That's it, now you can use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can hold JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the key you need: {{ var.json.bot_config.bot.token }}.
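Inside Python code, for example in a `python_callable`, the same lookup is done with `Variable.get('bot_config', deserialize_json=True)`. That call needs a running Airflow. The sketch below therefore only mimics how a dotted path like `bot_config.bot.token` walks the parsed JSON. The `resolve_path` helper and the in-memory `variables` dict are hypothetical stand-ins for the Variables store. Note that the token is quoted: a bare `881hskdfASDA16641` is not valid JSON.

```python
import json

# Hypothetical stand-in for the Admin/Variables store: values are plain strings.
variables = {
    "bot_config": '{"bot": {"token": "881hskdfASDA16641", "name": "Verter"}, "service": "TG"}',
}


def resolve_path(dotted: str):
    """Mimic {{ var.json.<key>.<path> }}: parse the variable, then walk the keys."""
    key, *path = dotted.split(".")
    node = json.loads(variables[key])
    for part in path:
        node = node[part]  # a KeyError here points at a typo in the template path
    return node


print(resolve_path("bot_config.bot.token"))
print(resolve_path("bot_config.service"))
```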

I'll say literally one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and put our logins/passwords and more specific parameters into it. Like this:


Passwords can be encrypted (more thoroughly than with the default option), and you can leave the connection type empty (as I did for tg_main). The thing is, the list of types is hardwired into Airflow's models and can't be extended without digging into the source code (if I somehow failed to google the answer, please correct me), but nothing stops us from fetching the credentials simply by name.

You can also create several connections with the same name. In that case the BaseHook.get_connection() method, which fetches connections by name, will return a random one of the namesakes (round robin would be more logical, but let's leave that on the conscience of the Airflow developers).
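The difference the author hints at fits in a few lines. One option picks a random namesake, which is what the text describes `BaseHook.get_connection()` doing. The other cycles through them round robin. The `Conn` tuple, the host names and both picker functions are hypothetical, not Airflow code.

```python
import itertools
import random
from collections import namedtuple

Conn = namedtuple("Conn", "conn_id host")

# Two connections registered under the same conn_id (made-up hosts).
replicas = [Conn("dwh", "vertica-1"), Conn("dwh", "vertica-2")]


def pick_random(conns):
    """Random choice: the load is spread evenly only on average."""
    return random.choice(conns)


def round_robin(conns):
    """Round robin: an endless iterator that visits each namesake in turn."""
    return itertools.cycle(conns)


rr = round_robin(replicas)
print([next(rr).host for _ in range(4)])
print(pick_random(replicas).host)
```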

Variables and Connections are certainly cool tools, but it's important to keep a balance between the parts of your flows you keep in the code itself and the parts you hand over to Airflow for storage. On the one hand, quickly changing a value, say a mailing address, can be convenient through the UI. On the other hand, that's still a return to the mouse clicking we (well, I) wanted to get rid of.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are its points of attachment to third-party services and libraries. For example, JiraHook opens a client for interacting with Jira (you can move issues back and forth), and with SambaHook you can push a local file to an smb share.

Dissecting the custom operator

And now we've come right up to looking at how TelegramBotSendMessage is built.

The code of commons/operators.py with the operator itself:

from typing import Union

from airflow.models import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything else in Airflow, it's all very simple:

  • Inherited from BaseOperator, which implements quite a lot of Airflow-specific things (take a look at your leisure).
  • Declared template_fields, the fields in which Jinja will look for macros to process.
  • Arranged the right arguments for __init__() and set defaults where needed.
  • Didn't forget to initialize the parent class either.
  • Opened the corresponding hook, TelegramBotHook, and got a client object from it.
  • Overrode (redefined) the BaseOperator.execute() method, which Airflow calls when the time comes to run the operator: this is where we implement the main action, not forgetting to log. (By the way, we log straight to stdout and stderr: Airflow intercepts everything, wraps it nicely and puts it where it belongs.)
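Why does listing a field in `template_fields` matter? Below is a stripped-down model of the mechanism. Before `execute()` runs, each attribute named in `template_fields` is rendered against the context, and all other attributes are left alone. `MiniOperator`, `SendMessage`, `run_task` and the regex renderer are hypothetical simplifications, and the chat and DAG ids are made up. Airflow's real `BaseOperator` renders with Jinja and also handles lists and dicts.

```python
import re

PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


class MiniOperator:
    """Hypothetical, heavily simplified model of BaseOperator templating."""

    template_fields = ()

    def render_template_fields(self, context: dict) -> None:
        # Only the attributes listed in template_fields are rendered.
        for field in self.template_fields:
            raw = getattr(self, field)
            if isinstance(raw, str):
                rendered = PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), raw)
                setattr(self, field, rendered)

    def execute(self, context: dict):
        raise NotImplementedError


class SendMessage(MiniOperator):
    template_fields = ("chat_id", "message")

    def __init__(self, chat_id, message, note):
        self.chat_id = chat_id
        self.message = message
        self.note = note  # not listed in template_fields, so left untouched

    def execute(self, context):
        return f"send {self.message!r} to {self.chat_id}"


def run_task(op: MiniOperator, context: dict):
    """Render the templated fields first, then call execute()."""
    op.render_template_fields(context)
    return op.execute(context)


op = SendMessage("{{ chat }}", "{{ dag_id }} failed :(", note="{{ dag_id }}")
print(run_task(op, {"chat": "-100123", "dag_id": "orders"}))
print(op.note)
```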

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what there is to explain here, so I'll just note the important points:

  • We inherit and think about the arguments: in most cases there will be just one, conn_id;
  • We override the standard methods: I limited myself to get_conn(), in which I fetch the connection parameters by name and simply take the extra section (a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, handing it that specific token.

That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().

And the second part of the file, where I make a micro-wrapper for the Telegram REST API so as not to drag in the whole python-telegram-bot for a single sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
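There is one caveat with `'parse_mode': 'MarkdownV2'`. The Telegram Bot API documentation lists characters that must be escaped with a preceding backslash in MarkdownV2 text; otherwise the request is rejected with a parse error, and `send_message` above would raise `TelegramBotException`. A message like `'{{ dag.dag_id }} failed :('` from the operator's docstring contains an unescaped `(`. Below is a minimal escaping helper that could be applied to the text before building the payload. `escape_markdown_v2` is a hypothetical name, not part of the code above.

```python
# Characters reserved by Telegram's MarkdownV2 outside of entities,
# plus the backslash itself, which must be escaped to appear literally.
MARKDOWN_V2_SPECIAL = set("\\_*[]()~`>#+-=|{}.!")


def escape_markdown_v2(text: str) -> str:
    """Prefix every reserved character with a backslash (hypothetical helper)."""
    return "".join("\\" + ch if ch in MARKDOWN_V2_SPECIAL else ch for ch in text)


print(escape_markdown_v2("orders failed :("))
print(escape_markdown_v2("loaded 1.5k rows!"))
```

Escaping the whole string also disables any intended bold or italics. If formatting isn't needed at all, dropping `parse_mode` from the payload is the simpler option.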

The right way is to put all of this (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, publish it in a public repository and release it as open source.

While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'll go check what's wrong this time...

Something broke in our DAG! Isn't that exactly what we were waiting for? Exactly!

Shall we pour?

Do you feel like I've skipped something? I seem to have promised to pour data from SQL Server into Vertica, and then I went and wandered off topic, what a scoundrel!

This villainy was intentional: I simply had to decipher some terminology for you. Now we can move on.

Our plan was this:

  1. Make a DAG
  2. Generate tasks
  3. See how beautiful everything is
  4. Assign session numbers to loads
  5. Get data from SQL Server
  6. Put data into Vertica
  7. Collect statistics

So, to get all of this running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we bring up:

  • Vertica as the host dwh with the most default of settings,
  • three instances of SQL Server,
  • and we fill the databases in the latter with some data (under no circumstances look into mssql_init.py!)

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

What our miracle randomizer generated can be viewed via Data Profiling/Ad Hoc Query:

The main thing is not to show this to the analysts

I won't go into ETL sessions in detail, since everything there is trivial: we create a database and a table in it, wrap everything in a context manager, and now we do this:

with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

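    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # DDL such as CREATE TABLE returns no result set to fetch
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]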

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
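The `Session` class above targets PostgreSQL (`SERIAL`, `TIMESTAMPTZ`, `RETURNING`, `%s` placeholders). To try the same context-manager pattern without a server, here is a condensed sketch built on the standard-library `sqlite3` module. `SqliteSession`, its table layout and the use of `lastrowid` instead of `RETURNING` are assumptions for the demo, not the author's code.

```python
import sqlite3


class SqliteSession:
    """Condensed, hypothetical sqlite3 variant of the ETL Session pattern."""

    def __init__(self, connection: sqlite3.Connection, task_name: str):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None
        self.comment = None
        self.connection.execute(
            """CREATE TABLE IF NOT EXISTS sessions (
                   id INTEGER PRIMARY KEY AUTOINCREMENT,
                   task_name TEXT NOT NULL,
                   started TEXT DEFAULT CURRENT_TIMESTAMP,
                   finished TEXT,
                   successful INTEGER,
                   loaded_rows INTEGER,
                   comment TEXT)"""
        )

    def __enter__(self):
        cur = self.connection.execute(
            "INSERT INTO sessions (task_name) VALUES (?)", (self.task_name,)
        )
        self.id = cur.lastrowid  # sqlite3 stand-in for RETURNING id
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # Any exception inside the with-block marks the session as failed.
            self.successful = False
            self.comment = f"{exc_type.__name__}: {exc_val}"
        self.connection.execute(
            """UPDATE sessions
               SET finished = CURRENT_TIMESTAMP, successful = ?,
                   loaded_rows = ?, comment = ?
               WHERE id = ?""",
            (self.successful, self.loaded_rows, self.comment, self.id),
        )
        self.connection.commit()
        return False  # never swallow the exception


conn = sqlite3.connect(":memory:")
with SqliteSession(conn, "orders_0") as session:
    session.successful = True
    session.loaded_rows = 15

try:
    with SqliteSession(conn, "orders_1") as session:
        raise RuntimeError("source is down")
except RuntimeError:
    pass

print(conn.execute(
    "SELECT task_name, successful, loaded_rows, comment FROM sessions ORDER BY id"
).fetchall())
```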

It's time to take our data from our hundred and fifty tables. Let's do it with some very unpretentious lines:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

  1. Using a hook, we get a pymssql connection from Airflow.
  2. We substitute a date restriction into the query; the template engine will pass the date into the function.
  3. We feed our query to pandas, which gets us a DataFrame; it will come in handy later.

I use the {dt} substitution instead of a %s query parameter not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and passes it params as a List, while pymssql really wants a tuple.
Also note that the pymssql developer decided to stop supporting it, so it's high time to move to pyodbc.
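The date goes into the SQL through an f-string rather than a bound parameter, so it's worth making sure `dt` really is a date before it reaches the query. Here is a minimal guard, assuming `dt` arrives as the `{{ ds }}` string. `build_orders_query` is a hypothetical helper wrapping the query above.

```python
from datetime import date


def build_orders_query(dt: str) -> str:
    """Validate dt as an ISO date, then interpolate it into the T-SQL query."""
    # date.fromisoformat raises ValueError for anything that isn't a date,
    # so a malformed or malicious string never reaches SQL Server.
    day = date.fromisoformat(dt)
    return f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{day.isoformat()}'
    """


print(build_orders_query("2020-07-14"))
try:
    build_orders_query("2020-07-14'; DROP TABLE dbo.Orders; --")
except ValueError:
    print("rejected")
```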

Let's see what Airflow stuffed into the arguments of our functions:


If there's no data, there's no point in continuing. But it's also odd to consider such a load successful, and yet it isn't an error either. Argh, what to do?! Here's what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there is no actual error, but we are skipping the task. In the interface the task gets neither a green nor a red square, but a pink one.
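Roughly speaking, a task's final state depends on how its callable exits. The toy runner below is a hypothetical model of that mapping, not Airflow's code. A local `SkipTask` exception stands in for `airflow.exceptions.AirflowSkipException`.

```python
class SkipTask(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException."""


def run_callable(fn, *args) -> str:
    """Map the way a callable exits to a task state (toy model)."""
    try:
        fn(*args)
    except SkipTask:
        return "skipped"  # pink in the UI
    except Exception:
        return "failed"   # red; ONE_FAILED notifiers would fire
    return "success"      # green


def load(rows):
    if not rows:
        raise SkipTask("No rows to load")
    if None in rows:
        raise ValueError("bad row")


print(run_callable(load, [1, 2]), run_callable(load, []), run_callable(load, [None]))
```

Keep in mind that, with the default all_success trigger rule, Airflow also marks the downstream tasks of a skipped task as skipped, so a skip propagates further down the DAG.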

Let's add a few columns to our data:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']], index=False)

Namely:

  • the database we took the orders from,
  • the ID of our load session (it will be different for each task),
  • a hash of the source and the order ID, so that in the final database (where everything is poured into a single table) we have a unique order identifier.
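`hash_pandas_object` returns unsigned 64-bit values, while the target column `hash_id INT` is a signed 64-bit integer in Vertica, so some hashes may not fit. For a DataFrame, it also mixes the index into each row's hash unless `index=False` is passed. Below is a standard-library alternative. It assumes a stable, non-negative 63-bit id is what we want. `order_hash` is a hypothetical name, and BLAKE2b is our choice of hash, not the author's.

```python
import hashlib


def order_hash(etl_source: str, order_id: int) -> int:
    """Deterministic id for (source, order) that fits a signed 64-bit column."""
    # The '|' separator keeps e.g. ('db1', 23) and ('db12', 3) apart.
    key = f"{etl_source}|{order_id}".encode("utf-8")
    digest = hashlib.blake2b(key, digest_size=8).digest()
    # Drop the lowest bit so the value lies in [0, 2**63 - 1].
    return int.from_bytes(digest, "big") >> 1


a = order_hash("db_0", 42)
print(a == order_hash("db_0", 42), 0 <= a < 2**63)
```

Applied to the DataFrame, this could look like `df['hash_id'] = [order_hash(s, i) for s, i in zip(df['etl_source'], df['id'])]`, which is slower than the vectorized pandas call but independent of row order.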

The penultimate step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do that is via CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

  1. We create a special receiver, a StringIO buffer.
  2. pandas kindly writes our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with copy(), we send our data straight to Vertica!
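Here is the same idea without pandas, to show what actually travels to Vertica. The rows become pipe-delimited lines with `NUL` marking NULLs, and the COPY statement gets a comma-separated column list. Formatting a Python list directly would put brackets and quotes into the SQL. `rows_to_copy_buffer` and `copy_statement` are hypothetical helpers. The resulting buffer is what would be handed to `cursor.copy()`.

```python
import csv
from io import StringIO


def rows_to_copy_buffer(rows) -> StringIO:
    """Write rows as '|'-delimited CSV, with None rendered as NUL."""
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter="|", quotechar='"',
                        quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
    for row in rows:
        writer.writerow("NUL" if value is None else value for value in row)
    buffer.seek(0)  # rewind so the reader (cursor.copy) starts at the top
    return buffer


def copy_statement(table: str, columns) -> str:
    """Build a COPY ... FROM STDIN statement matching the buffer's format."""
    return (f"COPY {table}({', '.join(columns)}) FROM STDIN "
            "DELIMITER '|' ENCLOSED '\"' ABORT ON ERROR NULL 'NUL'")


buf = rows_to_copy_buffer([(1, "a|b", None), (2, "plain", 3.5)])
print(buf.getvalue())
print(copy_statement("dwh.orders", ["id", "data", "etl_id"]))
```

One trade-off of the `NUL` marker: a genuine string value `NUL` would also load as NULL, so the marker should be something that never occurs in the data.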

We take from the driver how many rows were loaded and tell the session manager that everything is fine:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production we create the target table by hand. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

With VerticaOperator() I create a database schema and a table (if they don't exist yet, of course). The main thing is to set up the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load
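`create_table >> load`, like `report_update >> [email, tg]` earlier, works because Airflow operators overload `>>`. The toy model below uses a hypothetical `Task` class and a topological sort from the standard library's `graphlib` (Python 3.9+). It shows the resulting order: the table is created once, before any of the loads.

```python
from graphlib import TopologicalSorter


class Task:
    """Hypothetical, minimal stand-in for an operator with >> dependencies."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # Accept a single task or a list, like `report_update >> [email, tg]`.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.upstream.add(self.task_id)
        return other


create_table = Task("create_target")
loads = [Task(f"mssql_{n}") for n in range(3)]
for load in loads:
    create_table >> load

# graphlib expects {node: predecessors}.
graph = {t.task_id: t.upstream for t in [create_table, *loads]}
order = list(TopologicalSorter(graph).static_order())
print(order[0], sorted(order[1:]))
```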

Summing up

"Well," said the mouse, "isn't it true now?
Are you convinced that I'm the scariest beast in the forest?"

Julia Donaldson, "The Gruffalo"

I think if my colleagues and I held a contest over who could build and launch an ETL process from scratch faster, they with their SSIS and a mouse and I with Airflow... and then we also compared how easy each is to maintain... Wow, I think you'll agree that I'd beat them on all fronts!

On a slightly more serious note, Apache Airflow, by describing processes as program code, has made my work much more convenient and enjoyable.

Its unlimited extensibility, both in terms of plugins and its predisposition to scaling, lets you use Airflow in almost any area: from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).

The final part: reference and information

Pitfalls we've collected for you

  • start_date. Yes, this is already a local meme. Everyone stumbles over the DAG's main argument, start_date. In short, if you put the current date into start_date (computed dynamically) and one day into schedule_interval, the DAG will not run until tomorrow at the earliest. Use a fixed date instead:
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    Another runtime error is tied to it: Task is missing the start_date parameter, which most often means you forgot to bind the DAG to the operator.

  • Everything on one machine. Yes: the databases (Airflow's own and our wrapping), the web server, the scheduler, and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL started answering an index lookup in 20 s instead of 5 ms, we took it and moved it elsewhere.
  • LocalExecutor. Yes, we're still sitting on it, and we've already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to expand with at least one worker, and we'll have to work hard to move to CeleryExecutor. And since you can work with it on a single machine too, nothing stops you from using Celery even on a server that "of course will never go to production, honestly!"
  • Not using the built-in tools:
    • Connections for storing service credentials,
    • SLA Misses for reacting to tasks that didn't finish on time,
    • XCom for exchanging metadata (I said metadata!) between a DAG's tasks.
  • Abusing email. Well, what can I say? Alerts were set up for every retry of failed tasks. Now my work Gmail holds more than 90k emails from Airflow, and the webmail refuses to select and delete more than 100 at a time.
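Here is the reason behind the start_date pitfall from the list above, sketched with plain `datetime`. In Airflow 1.10 the scheduler triggers a run only after the interval that run covers has ended, so the first run of a daily DAG fires one interval after `start_date`. A dynamic `start_date` such as `datetime.now()` is re-evaluated every time the DAG file is parsed, so that moment keeps moving forward. `first_run` is a hypothetical helper and a simplification of the scheduler's logic.

```python
from datetime import datetime, timedelta


def first_run(start_date: datetime, interval: timedelta):
    """Return (execution_date, trigger_time) of the first scheduled run.

    Simplified model: the run stamped with execution_date == start_date
    covers [start_date, start_date + interval) and is triggered only once
    that window has closed.
    """
    return start_date, start_date + interval


# The static start_date from the list above, with a daily schedule.
execution_date, fires_at = first_run(datetime(2020, 7, 7, 0, 1, 2), timedelta(days=1))
print(execution_date, "->", fires_at)  # 2020-07-07 00:01:02 -> 2020-07-08 00:01:02

# A dynamic start_date: each later parse pushes the trigger time further out.
parse_1 = datetime(2020, 7, 7, 12, 0)
parse_2 = parse_1 + timedelta(minutes=30)
print(first_run(parse_2, timedelta(days=1))[1] > first_run(parse_1, timedelta(days=1))[1])
```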

More pitfalls: Apache Airflow Pitfails

Tools for even more automation

So that we work even more with our heads and less with our hands, Airflow has prepared the following for us:

  • REST API - it still has Experimental status, which doesn't stop it from working. With it you can not only get information about DAGs and tasks, but also pause/unpause a DAG, create a DAG Run or a pool.
  • CLI - many tools are available from the command line that are not just inconvenient to use through the WebUI but missing from it altogether. For example:
    • backfill is needed to rerun task instances.
      For example, the analysts came and said: "Comrade, your data from January 1 to 13 is garbage! Fix it, fix it, fix it, fix it!" And you go, hop:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • ื‘ืึทื–ืข ื•ื™ืฉืึทืœื˜: initdb, resetdb, upgradedb, checkdb.
    • run, ื•ื•ืึธืก ืึทืœืึทื•ื– ืื™ืจ ืฆื• ืงืึทื˜ืขืจ ืื™ื™ืŸ ืึทืจื‘ืขื˜ ื‘ื™ื™ึทืฉืคึผื™ืœ, ืื•ืŸ ืืคื™ืœื• ืคืึทืจื’ืขืกืŸ ืึทืœืข ื“ื™ ื“ื™ืคึผืขื ื“ืึทื ืกื™ื–. ื“ืขืจืฆื•, ืื™ืจ ืงืขื ืขืŸ ืœื•ื™ืคืŸ ืขืก ื“ื•ืจืš LocalExecutor, ืืคื™ืœื• ืื•ื™ื‘ ืื™ืจ ื”ืึธื‘ืŸ ืึท ืกืขืœื“ืขืจื™ื™ ืงื ื•ื™ืœ.
    • ื˜ื•ื˜ ื‘ืขืจืš ื“ื™ ื–ืขืœื‘ืข ื–ืืš test, ืึธื‘ืขืจ ืขืก ื˜ื•ื˜ ื ื™ืฉื˜ ืฉืจื™ื™ึทื‘ืŸ ืขืคึผืขืก ืฆื• ื“ื™ ื“ืึทื˜ืึทื‘ื™ื™ืก.
    • connections ืึทืœืึทื•ื– ืื™ืจ ืฆื• ืฉืึทืคึฟืŸ ืงืึทื ืขืงืฉืึทื ื– ืื™ืŸ ืคืึทืจื ืขื ืคื•ืŸ ื“ื™ ืฉืึธืœ.
  • ืคึผื™ื˜ื”ืึธืŸ ืึทืคึผื™ - ืึท ื’ืึทื ืฅ ื›ืึทืจื“ืงืึธืจ ื™ื ื˜ืขืจืึทืงืฉืึทืŸ ืื•ืคึฟืŸ, ื•ื•ืึธืก ืื™ื– ื‘ื“ืขื” ืคึฟืึทืจ ืคึผืœื•ื’ื™ื ืก, ืื•ืŸ ื ื™ืฉื˜ ื˜ื™ื ื’ืงื™ื ื’ ืžื™ื˜ ื“ื™ื™ืŸ ื”ืขื ื˜. ืื‘ืขืจ ื•ื•ืขืจ ื•ื•ืขื˜ ื”ืึทืœื˜ืŸ ืื•ื ื“ื– ืคื•ืŸ ื’ื™ื™ืŸ ืฆื• /home/airflow/dags, ืœื•ื™ืคืŸ ipython ืื•ืŸ ืึธื ื”ื™ื™ื‘ืŸ ืžืขืกื™ื ื’ ืึทืจื•ื? ืื™ืจ ืงืขื ื˜, ืœืžืฉืœ, ืึทืจื•ื™ืกืคื™ืจืŸ ืึทืœืข ืงืึทื ืขืงืฉืึทื ื– ืžื™ื˜ ื“ืขื ืงืึธื“:
    from airflow import settings
    from airflow.models import Connection
    
    # Columns of the connection table to dump
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    try:
        for conn in session.query(Connection).order_by(Connection.conn_id):
            # password and extra come back decrypted through the model's properties
            d = {field: getattr(conn, field) for field in fields}
            print(conn.conn_id, '=', d)
    finally:
        session.close()
  • ืคืึทืจื‘ื™ื ื“ืŸ ืฆื• ืึทื™ืจืคืœืึธื•ื• ืžืขื˜ืึทื“ืึทื˜ืึท ื“ืึทื˜ืึทื‘ื™ื™ืก. ืื™ืš ื˜ืึธืŸ ื ื™ืฉื˜ ืจืขืงืึธืžืขื ื“ื™ืจืŸ ืฆื• ืฉืจื™ื™ึทื‘ืŸ ืฆื• ืขืก, ืึธื‘ืขืจ ืื™ืจ ืงืขื ืขืŸ ื‘ืึทืงื•ืžืขืŸ ืึทืจื‘ืขื˜ ืฉื˜ืึทื˜ืŸ ืคึฟืึทืจ ืคืึทืจืฉื™ื“ืŸ ืกืคึผืขืฆื™ืคื™ืฉ ืžืขื˜ืจื™ืงืก ืคื™ืœ ืคืึทืกื˜ืขืจ ืื•ืŸ ื’ืจื™ื ื’ืขืจ ื•ื•ื™ ื“ื•ืจืš ืงื™ื™ืŸ ืคื•ืŸ ื“ื™ ืึทืคึผื™ืก.

    ื–ืืœ ืก ื–ืึธื’ืŸ ืึทื– ื ื™ื˜ ืึทืœืข ืคื•ืŸ โ€‹โ€‹ืื•ื ื“ื–ืขืจ ื˜ืึทืกืงืก ื–ืขื ืขืŸ ื™ื“ืขืžืคึผืึธื˜ืขื ื˜, ืึธื‘ืขืจ ื–ื™ื™ ืงืขื ืขืŸ ืžืืœ ืคืึทืจืœืึธื–ืŸ ืื•ืŸ ื“ืึธืก ืื™ื– ื ืึธืจืžืึทืœ. ืื‘ืขืจ ืึท ื‘ื™ืกืœ ื‘ื•ื™ื‘ืจืขืš ืื™ื– ืฉื•ื™ืŸ ืกืึทืกืคึผื™ืฉืึทืก, ืื•ืŸ ืžื™ืจ ื“ืึทืจืคึฟืŸ ืฆื• ืงืึธื ื˜ืจืึธืœื™ืจืŸ ืขืก.

    ื”ื™ื˜ ืื™ื™ืš, SQL!

    WITH last_executions AS (
        -- every run of every task over the last two days, numbered newest first
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            row_number()
                OVER (
                    PARTITION BY task_id, dag_id
                    ORDER BY execution_date DESC) AS rn
        FROM public.task_instance
        WHERE
            execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        -- renumber only the failing runs: if a run has the same position among
        -- failures as among all runs (rn), nothing newer has succeeded,
        -- so it belongs to the current streak of failures
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    -- tasks whose most recent runs are an unbroken series of failures/retries
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
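Back to the REST API mentioned above. In Airflow 1.10 the experimental endpoints live under `/api/experimental`. Examples include `POST /dags/<dag_id>/dag_runs` to create a DAG Run and `GET /dags/<dag_id>/paused/<true|false>` to pause or unpause a DAG. The sketch below only builds the requests with the standard library and does not send them. It assumes the webserver runs at `http://localhost:8080` with no authentication backend, and the helper names are hypothetical. The `conf` payload is sent as a JSON-encoded string, which is the form shown in the 1.10 documentation. Verify the paths against your version, since Airflow 2 superseded the experimental API with a stable one:

```python
import json
from urllib.request import Request

# Assumption: webserver address, experimental API enabled, no auth in front of it.
BASE_URL = "http://localhost:8080/api/experimental"


def trigger_dag_request(dag_id, conf=None):
    """Build (but do not send) a POST that creates a new DAG Run."""
    # conf is passed as a JSON string inside the JSON body, as in the 1.10 docs.
    body = json.dumps({"conf": json.dumps(conf or {})}).encode("utf-8")
    return Request(
        f"{BASE_URL}/dags/{dag_id}/dag_runs",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def pause_dag_request(dag_id, paused=True):
    """Build a GET that pauses (paused=True) or unpauses a DAG."""
    flag = "true" if paused else "false"
    return Request(f"{BASE_URL}/dags/{dag_id}/paused/{flag}", method="GET")


req = trigger_dag_request("orders", {"date": "2020-01-13"})
print(req.get_method(), req.full_url)
```

To actually send a request, pass it to `urllib.request.urlopen(req)` from a machine that can reach the webserver.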
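For the `backfill` example above: when a rerun is launched from a script rather than typed by hand, it is cheap to validate the dates first, because a swapped start and end date is easy to miss. The sketch below uses a hypothetical `backfill_argv` helper to build the 1.10-style command. In Airflow 2 the same command is `airflow dags backfill`:

```python
import shlex
from datetime import date


def backfill_argv(dag_id, start, end):
    """Build argv for `airflow backfill -s START -e END DAG_ID` (1.10 CLI)."""
    start_d, end_d = date.fromisoformat(start), date.fromisoformat(end)
    if start_d > end_d:
        raise ValueError(f"start {start} is after end {end}")
    return ["airflow", "backfill", "-s", start_d.isoformat(),
            "-e", end_d.isoformat(), dag_id]


argv = backfill_argv("orders", "2020-01-01", "2020-01-13")
print(shlex.join(argv))
```

On a host with Airflow installed, `subprocess.run(argv, check=True)` would then start the backfill.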
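The window-function trick in the query is compact but not obvious. A failing run whose position among failing runs equals its position among all runs has no successful run after it. The sketch below applies the same rule in plain Python, which is handy for sanity-checking the query on a handful of rows, for example rows fetched with the Python API session shown earlier. `failure_streaks` is a hypothetical helper, and the `since` cutoff plays the role of the two-day `WHERE` filter:

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, Optional, Tuple

# Same states the SQL treats as unsuccessful.
FAILED_STATES = {"failed", "up_for_retry"}

Row = Tuple[str, str, datetime, Optional[str]]  # task_id, dag_id, execution_date, state


def failure_streaks(
    rows: Iterable[Row], since: Optional[datetime] = None
) -> Dict[Tuple[str, str], Dict[str, int]]:
    """Count the unbroken run of failed/up_for_retry instances, newest first."""
    by_task = defaultdict(list)
    for task_id, dag_id, execution_date, state in rows:
        if since is not None and execution_date <= since:
            continue  # mirrors execution_date > now() - INTERVAL '2' DAY
        by_task[(task_id, dag_id)].append((execution_date, state))

    result = {}
    for key, runs in by_task.items():
        counts = {"unsuccessful": 0, "failed": 0, "up_for_retry": 0}
        # Walk from the newest run and stop at the first non-failing state.
        for _, state in sorted(runs, key=lambda run: run[0], reverse=True):
            if state not in FAILED_STATES:
                break
            counts["unsuccessful"] += 1
            counts[state] += 1
        if counts["unsuccessful"] > 0:  # same as HAVING count(last_fail_seq) > 0
            result[key] = counts
    return result


rows = [
    ("load", "orders", datetime(2020, 1, 13), "failed"),
    ("load", "orders", datetime(2020, 1, 12), "up_for_retry"),
    ("load", "orders", datetime(2020, 1, 11), "success"),
    ("load", "orders", datetime(2020, 1, 10), "failed"),
    ("extract", "orders", datetime(2020, 1, 13), "success"),
]
print(failure_streaks(rows))
# {('load', 'orders'): {'unsuccessful': 2, 'failed': 1, 'up_for_retry': 1}}
```

The January 10 failure is not counted because the successful run on January 11 breaks the streak. The query applies exactly this rule.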

References

Well, of course, the first ten Google results are the contents of the Airflow folder in my bookmarks.

And the links used in the article:

Source: www.habr.com
