เบชเบฐเบเบฒเบเบเบต, เบเปเบญเบเปเบกเปเบ Dmitry Logvinenko - เบงเบดเบชเบฐเบงเบฐเบเบญเบเบเปเปเบกเบนเบเบเบญเบเบเบฐเปเบเบเบเบฒเบเบงเบดเปเบเบฒเบฐเบเบญเบเบเบธเปเบกเบเปเบฅเบดเบชเบฑเบ Vezet.
เบเปเบฒเบเบฐเปเบเบปเปเบฒเบเบฐเบเบญเบเบเปเบฒเบเบเปเบฝเบงเบเบฑเบเปเบเบทเปเบญเบเบกเบทเบเบตเปเบเบฐเปเบชเบตเบเบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบฑเบเบเบฐเบเบฒเบเบฐเบเบงเบเบเบฒเบ ETL - Apache Airflow. เปเบเป Airflow เปเบกเปเบเบกเบตเบเบงเบฒเบกเบซเบฅเบฒเบเบซเบฅเบฒเบเปเบฅเบฐเบกเบตเบซเบผเบฒเบเบฎเบนเบเปเบเบเบเบตเปเบเปเบฒเบเบเบงเบเบเบดเบเบฒเบฅเบฐเบเบฒเปเบเบดเปเบเบกเบฑเบเบขเปเบฒเบเปเบเปเบเบดเบเปเบเบดเบเปเบกเปเบเบงเปเบฒเบเปเบฒเบเบเบฐเบเปเปเบกเบตเบชเปเบงเบเบฎเปเบงเบกเปเบเบเบฒเบเปเบซเบฅเบเบญเบเบเปเปเบกเบนเบ, เปเบเปเบเปเบญเบเบกเบตเบเบฒเบเปเบเบตเบเบเบปเบงเบเบฐเบเบงเบเบเบฒเบเบเปเบฒเบเปเปเบเบฑเบเปเบฅเบเบฐเปเบฅเบฐเบเบดเบเบเบฒเบกเบเบฒเบเบเบฐเบเบดเบเบฑเบเบเบญเบเบเบงเบเปเบเบปเบฒ.
เปเบฅเบฐเปเบกเปเบเปเบฅเปเบง, เบเปเบฒเบเบฐเปเบเบปเปเบฒเบเบฐเบเปเปเบเบฝเบเปเบเปเบเบญเบ, เปเบเปเบเบฑเบเบชเบฐเปเบเบเปเบซเปเปเบซเบฑเบ: เปเบเบเบเบฒเบเบกเบตเบซเบผเบฒเบเบฅเบฐเบซเบฑเบ, screenshots เปเบฅเบฐเบเปเบฒเปเบเบฐเบเปเบฒ.
เบชเบดเปเบเบเบตเปเบเปเบฒเบเบกเบฑเบเบเบฐเปเบซเบฑเบเปเบเปเบงเบฅเบฒเบเบตเปเบเปเบฒเบ google เบเปเบฒเบงเปเบฒ Airflow / Wikimedia Commons
เบเบฒเบเบฐเบฅเบฒเบเปเบเบทเปเบญเบซเบฒ
เบเบฒเบเบเปเบฒเบชเบฐเปเบซเบเบต เบเบฒเบเบชเปเบงเบเบเบปเปเบเบเป, เบเบฒเบเบเบฐเบเบดเบเบฑเบ (เปเบฅเบฐเบเบดเบเบชเบฐเบเบตเปเบฅเบฑเบเบเปเบญเบ) เปเบเบฑเบเบซเบเบฑเบเบเบงเบเปเบฎเบปเบฒ (เปเบฅเบฐเปเบเบปเปเบฒ) เบเบฒเบเบเบฐเบเบญเบเบเบธเปเบก เปเบเบงเบเบงเบฒเบกเบเบดเบเบเบทเปเบเบเบฒเบ เบเบงเบเปเบฎเบปเบฒเบชเปเบฒเบเบงเบฝเบเบเบฒเบ เปเบฅเบฑเบเบเปเบญเบเบเปเบฝเบงเบเบฑเบเบเบญเบเปเบกเป เบเบงเบเปเบฎเบปเบฒเปเบซเบผเบ underloaded เบเบฒเบเปเบเบทเปเบญเบกเบเปเป, hooks เปเบฅเบฐเบเบปเบงเปเบเบญเบทเปเบเป เบเบณเบฅเบฑเบเบงเบดเปเบเบฒเบฐเบเบปเบงเบเบณเปเบเบตเบเบเบฒเบเปเบเบเบเบณเบเบปเบเปเบญเบ เปเบเบปเปเบฒเบเบฐเบเบญเบเบเป? Summing up
เบชเปเบงเบเบชเบธเบเบเปเบฒเบ, เบเปเปเบกเบนเบเบญเปเบฒเบเบญเบตเบ เปเบฅเบฐเบเปเปเบกเบนเบ เปเบญเบเบฐเบชเบฒเบ
เบเบฒเบเบเปเบฒเบชเบฐเปเบซเบเบต
Apache Airflow เปเบกเปเบเบเบทเบเบฑเบเบเบฑเบ Django:
- เบเบฝเบเปเบ python
- เบกเบตเบเบฐเบเบฐเบเปเบฅเบดเบซเบฒเบเบเบฒเบเบเบตเปเบเบต,
- เบชเบฒเบกเบฒเบเบเบฐเบซเบเบฒเบเปเบเปเบขเปเบฒเบเบเปเปเบกเบตเบเบณเบเบปเบ
- เบเบฝเบโเปเบเปโเบเบตเบโเบงเปเบฒโ, เปเบฅเบฐโเบกเบฑเบโเปเบเปโเบเบทเบโเบชเปเบฒเบโเบเบถเปเบโเบชเปเบฒโเบฅเบฑเบโเบเบธเบโเบเบฐโเบชเบปเบโเบเบตเปโเปเบเบโเบเปเบฒเบโเบเบฑเบโเบซเบกเบปเบโ, เบเบท (เบเบฑเปเบโเบเบตเปโเบกเบฑเบโเปเบเปโเบเบทเบโเบเบฝเบโเปเบงเปโเบเปเบญเบ katโ)โ:
- เปเบฅเปเบโเปเบฅเบฐโเบเบดเบโเบเบฒเบกโเบเบงเบโเบเบฒโเบงเบฝเบโเบเบฒเบโเปเบโเบเปเบฒโเบเบงเบโเบเปเปโเบเปเบฒโเบเบฑเบโเบเบญเบโเปเบเบทเปเบญเบโเบเบฑเบ (เปเบเบฑเบ Celery / Kubernetes เบซเบผเบฒเบโเปเบฅเบฐโเบเบดเบโเบชเปเบฒโเบเบถเบโเบเบญเบโเบเปเบฒเบโเบเบฐโเบญเบฐโเบเบธโเบเบฒเบโเปเบซเปโเบเปเบฒเบโ)
- เบเปเบงเบเบเบฒเบเบชเปเบฒเบเบเบฐเบเบงเบเบเบฒเบเปเบฎเบฑเบเบงเบฝเบเปเบเบเปเบเบทเปเบญเบเปเบซเบงเบเบฒเบเบเบฒเบเบเบฝเบเปเบฅเบฐเปเบเบปเปเบฒเปเบเบฅเบฐเบซเบฑเบ Python เบเปเบฒเบเบซเบผเบฒเบ
- เปเบฅเบฐเบเบงเบฒเบกเบชเบฒเบกเบฒเบเปเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบเบฒเบเบเปเปเบกเบนเบเปเบฅเบฐ APIs เบเบฑเบเบเบฑเบเปเบฅเบฐเบเบฑเบเปเบเบเปเบเปเบญเบปเบเบเบฐเบเบญเบเบเบตเปเบเบฝเบกเบเปเบญเบกเปเบฅเบฐ plugins เบเบตเปเปเบฎเบฑเบเปเบเปเบฎเบทเบญเบ (เปเบเบดเปเบเบเปเบฒเบเบเบฒเบเบเบตเปเบชเบธเบ).
เบเบงเบเปเบฎเบปเบฒเปเบเป Apache Airflow เปเบเบเบเบตเป:
- เบเบงเบเปเบฎเบปเบฒเปเบเบฑเบเบเปเบฒเบเปเปเบกเบนเบเบเบฒเบเปเบซเบผเปเบเบเปเบฒเบเป (เบซเบผเบฒเบเป SQL Server เปเบฅเบฐ PostgreSQL instances, APIs เบเปเบฒเบเปเบเบตเปเบกเบต metrics เบเปเบฒเบฎเปเบญเบเบชเบฐเบซเบกเบฑเบ, เปเบเบดเบเปเบกเปเบเบงเปเบฒ 1C) เปเบ DWH เปเบฅเบฐ ODS (เบเบงเบเปเบฎเบปเบฒเบกเบต Vertica เปเบฅเบฐ Clickhouse).
- เบเปเบฒเบงเบซเบเปเบฒเปเบเบงเปเบ
cron
, เปเบเบดเปเบเปเบฅเบตเปเบกเบเบปเปเบเบเบฐเบเบงเบเบเบฒเบเบฅเบงเบกเบเปเปเบกเบนเบเบขเบนเปเปเบ ODS, เปเบฅเบฐเบเบฑเบเบเบดเบเบเบฒเบกเบเบฒเบเบเปเบฒเบฅเบธเบเบฎเบฑเบเบชเบฒเบเบญเบเบเบงเบเปเบเบปเบฒ.
เบเบปเบเบเปเบงเบฒเบเปเปเบเบปเบเบกเบฒเบเบตเป, เบเบงเบฒเบกเบเปเบญเบเบเบฒเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเปเบเปเบเบทเบเบเบธเปเบกเบเบญเบเปเบเบเปเบเบทเปเบญเบเปเบกเปเบเปเบฒเบเบเบฐเบซเบเบฒเบเบเปเบญเบเบซเบเบถเปเบเบเบตเปเบกเบต 32 cores เปเบฅเบฐ 50 GB เบเบญเบ RAM. เปเบ Airflow, เบเบตเปเปเบฎเบฑเบเบงเบฝเบ:
- เบซเบผเบฒเบ 200 เบเบฒเบ (เบเบปเบงเบเบดเบเปเบฅเปเบง workflows, เบเบตเปเบเบงเบเปเบฎเบปเบฒ stuffed เบงเบฝเบเบเบฒเบ),
- เปเบโเปเบเปโเบฅเบฐโเปเบเบโเบชเบฐโเปเบฅเปเบโ 70 เบงเบฝเบเบเบฒเบ,
- เบเบงเบฒเบกเบเบตเบเบตเปเปเบฅเบตเปเบกเบเบปเปเบ (เปเบเบเบชเบฐเปเบฅเปเบ) เบซเบเบถเปเบเบเบปเปเบงเปเบกเบ.
เปเบฅเบฐเบเปเบฝเบงเบเบฑเบเบงเบดเบเบตเบเบตเปเบเบงเบเปเบฎเบปเบฒเบเบฐเบซเบเบฒเบ, เบเปเบญเบเบเบฐเบเบฝเบเบเปเบฒเบเบฅเบธเปเบกเบเบตเป, เปเบเปเบเบญเบเบเบตเปเปเบซเปเบเปเบฒเบเบปเบเบเบฑเบเบซเบฒ รผber เบเบตเปเบเบงเบเปเบฎเบปเบฒเบเบฐเปเบเปเปเบ:
เบกเบตเบชเบฒเบกเปเบซเบผเปเบ SQL Servers, เปเบเปเบฅเบฐเบเบปเบเบกเบต 50 เบเบฒเบเบเปเปเบกเบนเบ - เบเบปเบงเบขเปเบฒเบเบเบญเบเปเบเบเบเบฒเบเบซเบเบถเปเบ, เบเบฒเบกเบฅเปเบฒเบเบฑเบ, เบเบงเบเปเบเบปเบฒเบกเบตเปเบเบเบชเปเบฒเบเบเบฝเบงเบเบฑเบ (เปเบเบทเบญเบเบเบธเบเบเปเบญเบ, mua-ha-ha), เบเบถเปเบเบซเบกเบฒเบเบเบงเบฒเบกเบงเปเบฒเปเบเปเบฅเบฐเบเบปเบเบกเบตเบเบฒเบเบฐเบฅเบฒเบเบเปเบฒเบชเบฑเปเบ (เปเบเบเบเบต, เบเบฒเบเบฐเบฅเบฒเบเบเบตเปเบกเบตเบเบฑเปเบ. เบเบทเปเบชเบฒเบกเบฒเบเบเบปเบเปเบเบปเปเบฒเปเบเปเบเบเบธเบฅเบฐเบเบดเบเปเบเบเปเปเบเบฒเบก). เบเบงเบเปเบฎเบปเบฒเปเบญเบปเบฒเบเปเปเบกเบนเบเปเบเบเบเบฒเบเปเบเบตเปเบกเบเปเบญเบเบเปเบฅเบดเบเบฒเบ (เปเบเบทเปเบญเบเปเบกเปเบเปเบฒเบเปเบซเบผเปเบ, เบเบฒเบเบเปเปเบกเบนเบเปเบซเบผเปเบ, ETL task ID) เปเบฅเบฐเบเบดเปเบกเบเบงเบเบกเบฑเบเปเบชเป, เปเบงเบปเปเบฒ, Vertica.
เปเบซเปเปเบ!
เบเบฒเบเบชเปเบงเบเบเบปเปเบเบเป, เบเบฒเบเบเบฐเบเบดเบเบฑเบ (เปเบฅเบฐเบเบดเบเบชเบฐเบเบตเปเบฅเบฑเบเบเปเบญเบ)
เปเบเบฑเบเบซเบเบฑเบเบเบงเบเปเบฎเบปเบฒ (เปเบฅเบฐเปเบเบปเปเบฒ)
เปเบเปเบงเบฅเบฒเบเบตเปเบเบปเปเบเปเบกเปเปเบซเบเปเปเบฅเบฐเบเปเบฒเบเบฐเปเบเบปเปเบฒเบเปเบฒเบเบเบฒเบ SQL
-schik เปเบโเบเบฒเบโเบเปเบฒโเบเบฒเบโเบเปเบญเบโเบเบญเบโเบฅเบฑเบโเปเบเบโเบซเบเบถเปเบโ, เบเบงเบโเปเบฎเบปเบฒโเบซเบฅเบญเบโเบฅเบงเบโเบเบฐโเบเบงเบโเบเบฒเบ ETL aka เบเบฒเบโเปเบซเบผโเบเบญเบโเบเปเปโเบกเบนเบโเปเบเบโเบเปเบฒโเปเบเปโเบชเบญเบโเปเบเบทเปเบญเบโเบกเบทโเบเบตเปโเบกเบตโเปเบซเปโเบเบงเบโเปเบฎเบปเบฒโ:
- เบชเบนเบเบเบฐเบฅเบฑเบเบเบฒเบ Informatica - เปเบเบฑเบโเบฅเบฐโเบเบปเบโเบเบฒเบโเปเบเปโเบเบฐโเบเบฒเบโเบเบตเปโเบชเบธเบโ, เบเบฐโเบฅเบดเบโเบเบฐโเบเบฑเบโเบเบตเปโเบชเบธเบโ, เบกเบตโเบฎเบฒเบโเปเบงโเบเบญเบโเบเบปเบโเปเบญเบโ, เบชเบฐโเบเบฑเบโเบเบญเบโเบเบปเบโเปเบญเบโ. เบเปเบฒเบเบฐเปเบเบปเปเบฒเปเบเปเปเบเปเบเบฃเบฐเปเบเบปเปเบฒเบซเปเบฒเบก 1% เบเบญเบเบเบงเบฒเบกเบชเบฒเบกเบฒเบเบเบญเบเบกเบฑเบ. เปเบเบฑเบเบซเบเบฑเบ? เบเบต, เบเปเบญเบเบญเบทเปเบ เปเบปเบ, เบเบฒเบเปเบเปเบเบญเบเบเบตเป, เบขเบนเปเบเปเบญเบเปเบเบเปเบญเบ เปเบถเปเบ เบเบฒเบเบเบธเบกเบเบต 380, เบเบงเบฒเบกเบเบปเบเบเบฑเบเบเบฒเบเบเบดเบเปเบเบเปเปเบเบงเบเปเบฎเบปเบฒ. เบญเบฑเบเบเบตเบชเบญเบ, contraption เบเบตเปเปเบเปเบเบทเบเบญเบญเบเปเบเบเบชเปเบฒเบฅเบฑเบเบเบฐเบเบงเบเบเบฒเบ fancy เบเบตเปเบชเบธเบ, เบญเบปเบเบเบฐเบเบญเบ furious reuse เปเบฅเบฐ tricks เบเบตเปเบชเปเบฒเบเบฑเบเบซเบผเบฒเบเบงเบดเบชเบฒเบซเบฐเบเบดเบเบญเบทเปเบเป. เบเปเบฝเบงเบเบฑเบเบเบงเบฒเบกเบเบดเบเบเบตเปเบงเปเบฒเบกเบฑเบเบกเบตเบเปเบฒเปเบเปเบเปเบฒเบ, เปเบเบฑเปเบเบเบตเบเบเบญเบ Airbus AXNUMX / เบเบต, เบเบงเบเปเบฎเบปเบฒเบเบฐเบเปเปเปเบงเบปเปเบฒเบซเบเบฑเบ.
เบฅเบฐเบงเบฑเบ, เบเบฒเบเปเปเบฒเบเปเบญเบฒเบเปเบฎเบฑเบเปเบซเปเบเบปเบเบญเบฒเบเบธเบเปเปเบฒเบเบงเปเบฒ 30 เบเบตเบเบฒเบเปเบเบฑเบเปเบฅเบฑเบเบเปเบญเบ
- SQL Server Integration Server - เบเบงเบโเปเบฎเบปเบฒโเปเบเปโเบเปเบฒโเปเบเป comrade เบเบตเปโเปเบโเบเบฒเบโเปเบซเบผโเปเบเบปเปเบฒโเบเบญเบโเปเบเบโเบเบฒเบโเบเบญเบโเบเบงเบโเปเบฎเบปเบฒโ. เปเบฅเปเบง, เปเบเบเบงเบฒเบกเปเบเบฑเบเบเบดเบ: เบเบงเบเปเบฎเบปเบฒเปเบเป SQL Server เปเบฅเปเบง, เปเบฅเบฐเบกเบฑเบเบญเบฒเบเบเบฐเบเปเปเบชเบปเบกเปเบซเบเบชเบปเบกเบเบปเบเบเบตเปเบเบฐเบเปเปเปเบเปเปเบเบทเปเบญเบเบกเบท ETL เบเบญเบเบกเบฑเบ. เบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเบขเบนเปเปเบเบกเบฑเบเบเบต: เบเบฑเบเบเบฒเบเปเบเปเบเบญเบเปเบกเปเบเบชเบงเบเบเบฒเบก, เปเบฅเบฐเบฅเบฒเบเบเบฒเบเบเบงเบฒเบกเบเบทเบเบซเบเปเบฒ ... เปเบเปเบเบตเปเบเปเปเปเบกเปเบเปเบซเบเบเบปเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเบฎเบฑเบเบเบฐเบฅเบดเบเบเบฐเบเบฑเบเบเบญเบเปเบง, เปเบญเป, เบเปเปเปเบกเปเบเบชเปเบฒเบฅเบฑเบเปเบฅเบทเปเบญเบเบเบตเป. เบฅเบธเปเบเบกเบฑเบ
dtsx
(เปเบเบดเปเบเปเบกเปเบ XML เบเบฑเบ nodes shuffled on save) เบเบงเบเปเบฎเบปเบฒเบชเบฒเบกเบฒเบ, เปเบเปเบชเบดเปเบเบเบตเปเปเบเบฑเบเบเบธเบ? เปเบฎเบฑเบเปเบเบงเปเบเบเปเบฝเบงเบเบฑเบเบเบฒเบเบชเปเบฒเบเบเบธเบเบงเบฝเบเบเบตเปเบเบฐเบฅเบฒเบเบซเบผเบฒเบเบฎเปเบญเบเบเบฒเบเบฐเบฅเบฒเบเบเบฒเบเปเบเบทเปเบญเบเปเบกเปเบเปเบฒเบเบซเบเบถเปเบเปเบเบซเบฒเบญเบทเปเบ? เปเบกเปเบเปเบฅเปเบง, เบชเบดเปเบเบเบตเปเปเบเบฑเบเบฎเปเบญเบ, เบเบดเปเบงเบกเบทเบเบฑเบเบชเบฐเบเบตเบเบญเบเบเปเบฒเบเบเบฐเบซเบผเบธเบเบฅเบปเบเบเบฒเบเบเบฒเบงเบเปเบญเบ, เบเบฅเบดเบเปเบชเปเบเบธเปเบกเบซเบเบน. เปเบเปเปเบเปเบเบญเบเบกเบฑเบเปเบเบดเปเบเบเบปเบเบญเบฑเบเปเบเบ: เบซเบผเบฒเบ:
เบเบงเบเปเบฎเบปเบฒเปเบเปเบเบญเบเบเบญเบเบซเบฒเบงเบดเบเบตเบเบฒเบเบญเบญเบ. เบเปเบฅเบฐเบเบต เปเบเบทเบญเบ เบกเบฒเบฎเบญเบเปเบเบทเปเบญเบเบเบฐเบฅเบดเบเบเบธเบ SSIS เบเบตเปเบเบฝเบเบเปเบงเบเบเบปเบเปเบญเบ ...
โฆ เปเบฅเปเบงเบงเบฝเบเปเปเปเบเปเบเบปเบเบเปเบญเบ. เปเบฅเบฐ Apache Airflow overtook เบเปเบฒเบเบฐเปเบเบปเปเบฒเบเปเบฝเบงเบเบฑเบเบกเบฑเบ.
เปเบกเบทเปเบญเบเปเบญเบเบเบปเบเบงเปเบฒเบเปเบฒเบญเบฐเบเบดเบเบฒเบเบเปเบฝเบงเบเบฑเบเบเบฐเบเบงเบเบเบฒเบ ETL เปเบกเปเบเบฅเบฐเบซเบฑเบ Python เบเปเบฒเบเบเบฒเบ, เบเปเบญเบเบเปเปเปเบเปเปเบเบฑเปเบเบฅเปเบฒเบชเปเบฒเบฅเบฑเบเบเบงเบฒเบกเบชเบธเบ. เบเบตเปเปเบกเปเบเบงเบดเบเบตเบเบฒเบเบเปเบฒเบเบเบญเบเบเปเปเบกเบนเบเปเบฅเบฐเบเบงเบฒเบกเปเบเบเบเปเบฒเบ, เปเบฅเบฐเบเบฒเบเบเบญเบเปเบเบเบฒเบเบฐเบฅเบฒเบเบเบตเปเบกเบตเปเบเบเบชเปเบฒเบเบเบฝเบงเบเบฒเบเบซเบผเบฒเบเบฎเปเบญเบเบเบฒเบเบเปเปเบกเบนเบเปเบเบปเปเบฒเปเบเปเบเปเบเบปเปเบฒเบซเบกเบฒเบเบเบฝเบงเปเบเปเบเบฒเบเปเบเบฑเบเปเบฅเบทเปเบญเบเบเบญเบเบฅเบฐเบซเบฑเบ Python เปเบเบซเบเบถเปเบเปเบฅเบฐเปเบเบดเปเบเบซเบเบถเปเบเบซเบผเบทเบชเบญเบเบซเบเปเบฒเบเป 13 ".
เบเบฒเบเบเบฐเบเบญเบเบเบธเปเบก
เบเปเปเปเบซเปเบเบฑเบเปเบเบเปเบฎเบเบฎเบฝเบเบญเบฐเบเบธเบเบฒเบเบขเปเบฒเบเบชเบปเบกเบเบนเบ, เปเบฅเบฐเบเปเปเปเบเปเปเบงเบปเปเบฒเบเปเบฝเบงเบเบฑเบเบชเบดเปเบเบเบตเปเบเบฑเบเปเบเบเบขเบนเปเบเบตเปเบเบตเป, เปเบเบฑเปเบ: เบเบฒเบเบเบดเบเบเบฑเปเบ Airflow, เบเบฒเบเบเปเปเบกเบนเบเบเบตเปเบเปเบฒเบเปเบฅเบทเบญเบ, Celery เปเบฅเบฐเบเปเบฅเบฐเบเบตเบญเบทเปเบเปเบเบตเปเบญเบฐเบเบดเบเบฒเบเปเบงเปเปเบ docks.
เปเบเบทเปเบญเปเบซเปเบเบงเบเปเบฎเบปเบฒเบชเบฒเบกเบฒเบเปเบฅเบตเปเบกเบเบปเปเบเบเบฒเบเบเบปเบเบฅเบญเบเปเบเปเบเบฑเบเบเบต, เบเปเบฒเบเบฐเปเบเบปเปเบฒเปเบเปเปเบเปเบกเบฎเบนเบ docker-compose.yml
เปเบเบเบฑเปเบ:
- เปเบซเปเบเบญเบเบเบปเบงเบเบดเบเบเบปเบเบชเบนเบ เบเบฐเปเบชเบญเบฒเบเบฒเบ: Scheduler, Webserver. เบเบญเบเปเบกเปเบเบฑเบเบเบฐเบเบทเบเบซเบกเบธเบเบขเบนเปเบเบตเปเบเบฑเปเบเปเบเบทเปเบญเบเบดเบเบเบฒเบกเบงเบฝเบเบเบฒเบเบเบญเบ Celery (เปเบเบทเปเบญเบเบเบฒเบเบงเปเบฒเบกเบฑเบเปเบเปเบเบทเบ pushed เปเบฅเปเบง
apache/airflow:1.10.10-python3.7
, เปเบเปโเบเบงเบโเปเบฎเบปเบฒโเบเปเปโเบชเบปเบโเปเบโ) - PostgreSQL, เปเบเบเบตเป Airflow เบเบฐเบเบฝเบเบเปเปเบกเบนเบเบเบฒเบเบเปเบฅเบดเบเบฒเบเบเบญเบเบกเบฑเบ (เบเปเปเบกเบนเบเบเบฒเบเบฐเบฅเบฒเบ, เบชเบฐเบเบดเบเบดเบเบฒเบเบเบฐเบเบดเบเบฑเบ, เปเบฅเบฐเบญเบทเปเบเป), เปเบฅเบฐ Celery เบเบฐเบซเบกเบฒเบเบงเบฝเบเบเบฒเบเบเบตเปเบชเปเบฒเปเบฅเบฑเบ;
- Redis, เปเบเบดเปเบเบเบฐเปเบฎเบฑเบเบซเบเปเบฒเบเบตเปเปเบเบฑเบเบเบฒเบเบซเบเปเบฒเบชเปเบฒเบฅเบฑเบ Celery;
- เบเบฐเบเบฑเบเบเบฒเบ Celery, เปเบเบดเปเบเบเบฐเปเบเปเบฎเบฑเบเบเบฒเบเบเปเบฒเปเบเบตเบเบเบฒเบเปเบเบเบเบปเบเบเบญเบเบงเบฝเบเบเบฒเบ.
- เปเบเบเบตเปเปเบเบเปเบเบต
./dags
เบเบงเบเปเบฎเบปเบฒเบเบฐเปเบเบตเปเบกเปเบเบฅเปเบเบญเบเบเบงเบเปเบฎเบปเบฒเบเปเบงเบเบเปเบฒเบญเบฐเบเบดเบเบฒเบเบเบญเบ dags. เบเบงเบเบกเบฑเบเบเบฐเบเบทเบเปเบเบฑเบเบเบถเปเบเปเบเบดเบเปเบกเบเบงเบฑเบ, เบชเบฐเบเบฑเปเบเบเปเปเบเบณเปเบเบฑเบเบเบตเปเบเบฐเบเบนเบเบเบญเบเบเบฑเบเปเบปเบเบซเบผเบฑเบเบเบฒเบเบเบฒเบกเปเบเปเบฅเบฐเบเบฑเปเบ.
เปเบเบเบฒเบเบเปเบญเบ, เบฅเบฐเบซเบฑเบเปเบเบเบปเบงเบขเปเบฒเบเบเปเปเปเบเปเบชเบฐเปเบเบเปเบซเปเปเบซเบฑเบเบขเปเบฒเบเบชเบปเบกเบเบนเบ (เปเบเบทเปเบญเบเปเปเปเบซเป clutter เบเปเปเบเบงเบฒเบก), เปเบเปเบเบฒเบเบเปเบญเบเบกเบฑเบเบเบทเบเบเบฑเบเปเบเปเปเบเบเบฐเบเบงเบเบเบฒเบ. เบเบปเบงเบขเปเบฒเบเบฅเบฐเบซเบฑเบเบเบฒเบเปเบฎเบฑเบเบงเบฝเบเบเบตเปเบชเบปเบกเบเบนเบเบชเบฒเบกเบฒเบเบเบปเบเปเบเปเบขเบนเปเปเบเบเปเบญเบเปเบเบฑเบเบกเปเบฝเบ
https://github.com/dm-logv/airflow-tutorial .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- broker
เบซเบกเบฒเบเปเบซเบ:
- เปเบเบเบฒเบเบเบฐเบเบญเบเบเบญเบเบญเบปเบเบเบฐเบเบญเบ, เบเปเบฒเบเบฐเปเบเบปเปเบฒเบชเปเบงเบเปเบซเบเปเปเบกเปเบเบญเบตเบเปเบชเปเบฎเบนเบเบเบฒเบเบเบตเปเบกเบตเบเบทเปเบชเบฝเบ
puckel/docker-airflow - เปเบซเปเปเบเปเปเบเบงเปเบฒเบเบงเบเปเบเบดเปเบเบกเบฑเบเบญเบญเบ. เบเบฒเบเบเบตเปเบเบปเปเบฒเบเปเปเบเปเบญเบเบเบฒเบเบซเบเบฑเบเบญเบตเบเปเบเบเบตเบงเบดเบเบเบญเบเปเบเบปเปเบฒ. - เบเบฒเบเบเบฑเปเบเบเปเบฒ Airflow เบเบฑเบเบซเบกเบปเบเปเบกเปเบเบกเบตเบขเบนเปเบเปเปเบเบฝเบเปเบเปเบเปเบฒเบ
airflow.cfg
, เปเบเปเบเบฑเบเบเปเบฒเบเบเบปเบงเปเบเบชเบฐเบเบฒเบเปเบงเบเบฅเปเบญเบก (เบเปเบเบญเบเปเบเบเบฑเบเบเบนเปเบเบฑเบเบเบฐเบเบฒ), เปเบเบดเปเบเบเปเบญเบเปเบเปเปเบเปเบเบฐเปเบซเบเบเบเบฒเบเบญเบฑเบเบเบฐเบฅเบฒเบ. - เบเบฒเบกเบเปเบฒเบกเบฐเบเบฒเบ, เบกเบฑเบเบเปเปเปเบกเปเบเบเบฒเบเบเบฐเบฅเบดเบเบเบตเปเบเบฝเบกเบเปเบญเบก: เบเปเบฒเบเบฐเปเบเบปเปเบฒเปเบเบเบเบฐเบเบฒเบเปเปเปเบเปเปเบชเปเบเบฒเบเปเบเบฑเปเบเบเบญเบเบซเบปเบงเปเบเปเบชเปเบเบฑเบ, เบเปเบฒเบเบฐเปเบเบปเปเบฒเบเปเปเปเบเปเบฅเบปเบเบเบงเบเบเบงเบฒเบกเบเบญเบเปเบ. เปเบเปเบเปเบญเบเปเบเปเปเบฎเบฑเบเบเปเบฒเปเบชเบธเบเบเบตเปเปเบซเบกเบฒเบฐเบชเบปเบกเบชเปเบฒเบฅเบฑเบเบเบฑเบเบเบปเบเบฅเบญเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ.
- เปเบซเปเบชเบฑเบเปเบเบเบงเปเบฒ:
- เปเบเบเปเบเบต dag เบเปเบญเบเบชเบฒเบกเบฒเบเปเบเบปเปเบฒเปเบเบดเบเปเบเปเบเบฑเบเบเบนเปเบเบณเบเบปเบเปเบงเบฅเบฒ เปเบฅเบฐเบเบปเบเบเบฒเบ.
- เบเบฝเบงเบเบฑเบเปเบเปเบเบฑเบเบซเปเบญเบเบชเบฐเบซเบกเบธเบเบเบฒเบเบชเปเบงเบเบเบตเบชเบฒเบกเบเบฑเบเบซเบกเบปเบ - เบเบงเบเปเบเบปเบฒเบเบฑเบเบซเบกเบปเบเบเปเบญเบเปเบเปเบฎเบฑเบเบเบฒเบเบเบดเบเบเบฑเปเบเบขเบนเปเปเบเปเบเบทเปเบญเบเบเบฑเบเบเบตเปเบกเบตเบเบฒเบเบฐเบฅเบฒเบเปเบฅเบฐเบเบฐเบเบฑเบเบเบฒเบ.
เปเบฅเปเบง, เบเบฝเบงเบเบตเปเบกเบฑเบเบเปเบฒเบเบเบฒเบ:
$ docker-compose up --scale worker=3
เบซเบผเบฑเบเบเบฒเบเบเบตเปเบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเปเบเบตเปเบกเบเบถเปเบ, เบเปเบฒเบเบชเบฒเบกเบฒเบเปเบเบดเปเบเปเบเบเบฒเบเปเบเปเบเบญเบเบเบญเบเปเบงเบฑเบเปเบเบเป:
- เบเบฒเบเบเบดเบ:
http://127.0.0.1:8080/admin/ - เบเบญเบเปเบกเป:
http://127.0.0.1:5555/dashboard
เปเบเบงเบเบงเบฒเบกเบเบดเบเบเบทเปเบเบเบฒเบ
เบเปเบฒเบเปเบฒเบเบเปเปเปเบเบปเปเบฒเปเบเบชเบดเปเบเปเบเปเบ "dags", เบเบตเปเปเบกเปเบเบงเบฑเบเบเบฐเบเบฒเบเบธเบเบปเบกเบชเบฑเปเบ:
- Scheduler - เบฅเบธเบเบเบตเปเบชเปเบฒเบเบฑเบเบเบตเปเบชเบธเบเปเบ Airflow, เบเบฒเบเบเบงเบเบเบธเบกเบเบตเปเบซเบธเปเบเบเบปเบเปเบฎเบฑเบเบงเบฝเบเบซเบเบฑเบ, เปเบฅเบฐเบเปเปเปเบกเปเบเบเบธเบเบเบปเบ: เบเบดเบเบเบฒเบกเบเบงเบเบเบฒเบเบฒเบเบฐเบฅเบฒเบ, เบเบฑเบเบเบธเบ dags, เปเบเบตเบเบเบปเบงเบงเบฝเบเบเบฒเบ.
เปเบเบเบเบปเปเบงเปเบเปเบฅเปเบง, เปเบเบฎเบธเปเบเปเบเบปเปเบฒ, เบฅเบฒเบงเบกเบตเบเบฑเบเบซเบฒเบเบฑเบเบเบงเบฒเบกเบเบปเบเบเปเบฒ (เบเปเป, เบเปเปเปเบกเปเบเบเบงเบฒเบกเบเปเบฒ, เปเบเปเบเบฒเบเบฎเบปเปเบงเปเบซเบผ) เปเบฅเบฐเบเบปเบงเบเปเบฒเบเบปเบเบเบฒเบเบกเปเบฅเบฐเบเบปเบเบเบฑเบเบขเบนเปเปเบเบเบฒเบเบเบฑเปเบเบเปเบฒ.
run_duration
โ เปเบฅโเบเบฐโเบเบฒเบโเปเบฅเบตเปเบกโเบเบปเปเบโเปเบซเบกเปโเบเบญเบโเบเบปเบโ. เปเบเปเบเบฝเบงเบเบตเปเบเบธเบเบขเปเบฒเบเบเบตเปเบฅเปเบง. - DAG (aka "dug") - "เบเบฒเบ acyclic เบเบตเป", เปเบเปเบเปเบฒเบเบดเบเบฒเบกเบเบฑเปเบเบเปเบฒเบงเบเบฐเบเบญเบเบเบปเบเบเปเบฒเบเบงเบเบซเบเปเบญเบเบซเบเบถเปเบ, เปเบเปเปเบเบเบงเบฒเบกเปเบเบฑเบเบเบดเบเบกเบฑเบเปเบเบฑเบเบเบฑเบเบชเปเบฒเบฅเบฑเบเบงเบฝเบเบเบฒเบเบเบตเปเบเบปเบงเบเบฑเบเบเบฑเบเบเบฑเบเปเบฅเบฐเบเบฑเบ (เปเบเบดเปเบเบเปเบฒเบเบฅเบธเปเบกเบเบตเป) เบซเบผเบทเบเบฒเบเบเบฝเบเบเบฝเบเบเบญเบ Package เปเบ SSIS เปเบฅเบฐ Workflow เปเบ Informatica .
เบเบญเบเปเบซเบเบทเบญเบเบฒเบ dags, เบกเบฑเบเบญเบฒเบเบเบฐเบเบฑเบเบกเบต subdags, เปเบเปเบเบงเบเปเบฎเบปเบฒเบชเปเบงเบเบซเบผเบฒเบเบญเบฒเบเบเบฐเบเปเปเปเบเบปเปเบฒเบซเบฒเบเบงเบเบกเบฑเบ.
- เปเบฅเปเบ DAG - เปเบฅเบตเปเบกเบเบปเปเบ dag, เปเบเบดเปเบเบกเบญเบเบซเบกเบฒเบเบเบญเบเบเบปเบเปเบญเบ
execution_date
. Dagrans เบเบญเบ dag เบเบฝเบงเบเบฑเบเบชเบฒเบกเบฒเบเปเบฎเบฑเบเบงเบฝเบเบเบฐเบซเบเบฒเบเบเบฑเบ (เบเปเบฒเบเปเบฒเบเปเบฎเบฑเบเปเบซเปเบงเบฝเบเบเบฒเบเบเบญเบเบเปเบฒเบเบเปเปเบกเบตเบเบงเบฒเบกเปเบเบฑเปเบกเปเบเบ, เปเบเปเบเบญเบ). - Operator เปเบกเปเบเบเบดเปเบเบชเปเบงเบเบเบญเบเบฅเบฐเบซเบฑเบเบเบตเปเบฎเบฑเบเบเบดเบเบเบญเบเบชเปเบฒเบฅเบฑเบเบเบฒเบเบเปเบฒเปเบเบตเบเบเบฒเบเบชเบฐเปเบเบฒเบฐเปเบเบซเบเบถเปเบ. เบกเบตเบชเบฒเบกเบเบฐเปเบเบเบเบญเบเบเบนเปเบเบฐเบเบญเบเบเบฒเบ:
- เบเบฒเบเบเบฐเบเบดเบเบฑเบเบเบท favorite เบเบญเบเบเบงเบเปเบฎเบปเบฒ
PythonOperator
, เปเบเบดเปเบเบชเบฒเบกเบฒเบเบเบฐเบเบดเบเบฑเบเบฅเบฐเบซเบฑเบ Python เปเบเป (เบเบตเปเบเบทเบเบเปเบญเบ); - เบเบฒเบเปเบญเบ, เปเบเบดเปเบเบเบฒเบเบเบปเบเบชเบปเปเบเบเปเปเบกเบนเบเบเบฒเบเบชเบฐเบเบฒเบเบเบตเปเปเบเบซเบฒเบชเบฐเบเบฒเบเบเบตเป, เปเบงเบปเปเบฒเบงเปเบฒ,
MsSqlToHiveTransfer
; - sensor เปเบเบเบฒเบเบเบปเบเบเบฑเบเบเปเบฒเบก, เบกเบฑเบเบเบฐเบเปเบงเบเปเบซเปเบเปเบฒเบเบชเบฒเบกเบฒเบเบเบญเบเปเบเปเบซเบผเบทเบเปเบฒเบฅเบปเบเบเบฒเบเบเบฐเบเบดเบเบฑเบเบเปเปเปเบเบเบญเบ dag เบเบปเบเบเปเบงเบฒเปเบซเบเบเบฒเบเปเบเบตเบเบเบทเปเบ.
HttpSensor
เบชเบฒเบกเบฒเบเบเบถเบเบเบธเบเบชเบดเปเบเบชเบธเบเบเบตเปเบฅเบฐเบเบธเปเบงเป, เปเบฅเบฐเปเบกเบทเปเบญเบเปเบฒเบเบญเบเบเบตเปเบเปเบญเบเบเบฒเบเบฅเปเบเปเบฒ, เปเบฅเบตเปเบกเบเบปเปเบเบเบฒเบเปเบญเบGoogleCloudStorageToS3Operator
. เบเบดเบเปเบเบเบตเปเบชเบปเบเปเบชเบเบฐเบเบฒเบกเบงเปเบฒ: โเปเบเบฑเบเบซเบเบฑเบ? เบซเบผเบฑเบเบเบฒเบเบเบตเปเบเบฑเบเบซเบกเบปเบ, เบเปเบฒเบเบชเบฒเบกเบฒเบเปเบฎเบฑเบเบเปเปเบฒเปเบเปเปเบเบเบปเบงเบเบฐเบเบดเบเบฑเบเบเบฒเบ!โ เปเบฅเบฐเบซเบผเบฑเบเบเบฒเบเบเบฑเปเบ, เปเบเบเปเบฒเบชเบฑเปเบเบเบตเปเบเบฐเบเปเป clog เบชเบฐเบเบธเบเปเบเบตเบเบญเบเบงเบฝเบเบเบฒเบเบเบตเปเบกเบตเบเบนเปเบเบฐเบเบดเบเบฑเบเบเบฒเบเปเบเบฐ. เปเบเบฑเบเปเบเบตเปเบฅเบตเปเบกเบเบปเปเบ, เบเบงเบเบชเบญเบเปเบฅเบฐเบเบฒเบเบเปเบญเบเบเบตเปเบเบฐเบเบฐเบเบฒเบเบฒเบกเบเปเปเปเบ.
- เบเบฒเบเบเบฐเบเบดเบเบฑเบเบเบท favorite เบเบญเบเบเบงเบเปเบฎเบปเบฒ
- Task - เบเบนเปเบเบฐเบเบดเบเบฑเบเบเบฒเบเบเบฐเบเบฒเบ, เบเปเปเบงเปเบฒเบเบฐเปเบเบฑเบเบเบฐเปเบเบเปเบ, เปเบฅเบฐเบเบดเบเบเบฑเบเบเบฒเบเปเบเปเบเบทเบเปเบฅเบทเปเบญเบเบเบฑเปเบเปเบเบซเบเปเบฒเบงเบฝเบ.
- เบเบปเบงเบขเปเบฒเบเปเปเบฒเบงเบฝเบ - เปเบกเบทเปเบญเบเบนเปเบงเบฒเบเปเบเบเบเบปเปเบงเปเบเบเบฑเบเบชเบดเบเปเบเบงเปเบฒเบกเบฑเบเปเบกเปเบเปเบงเบฅเบฒเบเบตเปเบเบฐเบชเบปเปเบเบงเบฝเบเบเบฒเบเปเบเบปเปเบฒเปเบเปเบเบเบฒเบเบชเบนเปเบฎเบปเบเบเบฑเบเบเบฑเบเบชเบฐเปเบเบ - เบเบฐเบเบฑเบเบเบฒเบ (เบขเบนเปเปเบเบเบธเบ, เบเปเบฒเบเบงเบเปเบฎเบปเบฒเปเบเป
LocalExecutor
เบซเบผเบทเบเบฑเบ node เบซเปเบฒเบเปเบเบชเบญเบเบซเบผเบตเบเปเบเบเปเบฅเบฐเบเบตเบเบญเบCeleryExecutor
), เบกเบฑเบเบเปเบฒเบเบปเบเบเปเบฅเบดเบเบปเบเปเบซเปเบเบงเบเปเบเบปเบฒ (i. e., เบเบธเบเบเบญเบเบเบปเบงเปเบ - เบเบฒเบฅเบฒเบกเบดเปเบเบตเบเบฒเบเบเบฐเบเบดเบเบฑเบ), เบเบฐเบซเบเบฒเบเบเปเบฒเบชเบฑเปเบเบซเบผเบทเปเบเบเบชเบญเบเบเบฒเบก, เปเบฅเบฐเบเบฐเบเบญเบเปเบซเปเปเบเบปเบฒเปเบเบปเปเบฒ.
เบเบงเบเปเบฎเบปเบฒเบชเปเบฒเบเบงเบฝเบเบเบฒเบ
เบเปเบฒเบญเบดเบ, เปเบซเปเบญเบฐเบเบดเบเบฒเบเปเบเบเบเบฒเบเบเบปเปเบงเปเบเบเบญเบ doug เบเบญเบเบเบงเบเปเบฎเบปเบฒ, เปเบฅเบฐเบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบเบงเบเปเบฎเบปเบฒเบเบฐเปเบเบปเปเบฒเปเบเปเบเบฅเบฒเบเบฅเบฐเบญเบฝเบเปเบเบตเปเบกเปเบเบตเบกเปเบฅเบฐเบซเบผเบฒเบ, เปเบเบฒเบฐเบงเปเบฒเบเบงเบเปเบฎเบปเบฒเบเปเบฒเปเบเปเบเบฒเบเบงเบดเบเบตเปเบเปเปเบเบเบตเปเบเปเปเปเบกเปเบ trivial.
เบเบฑเปเบเบเบฑเปเบ, เปเบเบฎเบนเบเปเบเบเบเบตเปเบเปเบฒเบเบเบฒเบเบเบตเปเบชเบธเบ, เบเบฑเปเบเบเปเบฒเบงเบเบฐเบกเบตเบฅเบฑเบเบชเบฐเบเบฐเบเบตเป:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
เปเบซเปเบเบดเบเบญเบญเบเบงเปเบฒ:
- เบซเบเปเบฒเบเปเบฒเบญเบดเบ, เบเบงเบเปเบฎเบปเบฒเบเปเบฒเปเบเบปเปเบฒ libs เบเบตเปเบเปเบฒเปเบเบฑเบเปเบฅเบฐ เบญเบฑเบโเบญเบทเปเบ;
sql_server_ds
- เบเบตเปโเปเบกเปเบList[namedtuple[str, str]]
เบเบฑเบเบเบทเปเบเบญเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบเบฒเบ Airflow Connections เปเบฅเบฐเบเบฒเบเบเปเปเบกเบนเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเบเบฐเปเบญเบปเบฒเปเบเปเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ;dag
- เบเบฒเบเบเบฐเบเบฒเบเบเบญเบ doug เบเบญเบเบเบงเบเปเบฎเบปเบฒ, เปเบเบดเปเบเบเปเบฒเปเบเบฑเบเบเปเบญเบเบกเบตเบขเบนเปเปเบglobals()
เบเปเบฒเบเปเปเบเบฑเปเบเบเบฑเปเบ Airflow เบเบฐเบเปเปเบเบปเบเบกเบฑเบ. Doug เบเบฑเบเบเปเบญเบเบเบฒเบเปเบงเบปเปเบฒเบงเปเบฒ:- เบฅเบฒเบงเบเบทเปเบซเบเบฑเบ
orders
- เบซเบผเบฑเบโเบเบฒเบโเบเบฑเปเบโเบเบทเปโเบเบตเปโเบเบฐโเบเบฒโเบเบปเบโเบขเบนเปโเปเบโเบเบฒเบโเปเบเปโเบเบญเบโเบเบญเบโเปเบงเบฑเบโเปเบโเบโเปโ, - เบงเปเบฒเบฅเบฒเบงเบเบฐเปเบฎเบฑเบเบงเบฝเบเบเบฑเปเบเปเบเปเบเปเบฝเบเบเบทเบเบเบญเบเบงเบฑเบเบเบตเปเบเบเบเบญเบเปเบเบทเบญเบเบเปเบฅเบฐเบเบปเบ,
- เปเบฅเบฐเบกเบฑเบเบเบงเบเบเบฐเบเปเบฒเปเบเบตเบเบเบฒเบ, เบเบฐเบกเบฒเบเบเบธเบเป 6 เบเบปเปเบงเปเบกเบ (เบชเปเบฒเบฅเบฑเบ guys tough เบเบตเปเบเบตเปเปเบเบเบเบตเปเบเบฐเปเบเบฑเบ
timedelta()
เบเบญเบกเบฎเบฑเบเปเบเปcron
- เบชเบฒเบ0 0 0/6 ? * * *
, เบชเปเบฒเบฅเบฑเบเปเบขเบฑเบเบซเบเปเบญเบ - เบเบฒเบเบชเบฐเปเบเบเบญเบญเบเบเบท@daily
);
- เบฅเบฒเบงเบเบทเปเบซเบเบฑเบ
workflow()
เบเบฐเปเบฎเบฑเบเบงเบฝเบเบเบปเปเบเบเป, เปเบเปเบเปเปเปเบกเปเบเปเบเบเบฑเบเบเบธเบเบฑเบ. เบชเปเบฒเบฅเบฑเบเปเบเบเบฑเบเบเบธเบเบฑเบ, เบเบงเบเปเบฎเบปเบฒเบเบฐเบเบดเปเบกเบเปเบฅเบดเบเบปเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเปเบเบปเปเบฒเปเบเปเบเบเบฑเบเบเบถเบ.- เปเบฅเบฐเปเบเบเบฑเบเบเบธเบเบฑเบ magic เบเปเบฒเบเบเบฒเบเบเบญเบเบเบฒเบเบชเปเบฒเบเบงเบฝเบเบเบฒเบ:
- เบเบงเบเปเบฎเบปเบฒเบเปเบฒเปเบเบตเบเบเบฒเบเปเบเบเบเปเบฒเบเปเบซเบผเปเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ;
- เปเบฅเบตเปเบกเบเบปเปเบ
PythonOperator
, เปเบเบดเปเบเบเบฐเบเบฐเบเบดเบเบฑเบ dummy เบเบญเบเบเบงเบเปเบฎเบปเบฒworkflow()
. เบขเปเบฒโเบฅเบทเบกโเบฅเบฐโเบเบธโเบเบทเปโเบเบตเปโเปเบเบฑเบโเปเบญโเบเบฐโเบฅเบฑเบ (เบเบฒเบโเปเบโเบเบฒโเบโ) เบเบญเบโเบงเบฝเบโเบเบฒเบโเปเบฅเบฐโเบเบนเบโเบเบฑเบโเบเบญเบโเบเบปเบโเปเบญเบโ. เบเบธเบprovide_context
เปเบเบเบฒเบเบเบฑเบเบเบฑเบ, เบเบฐเบเบญเบเปเบเบเบฒเบเปเบเปเบเบฝเบเปเบเบตเปเบกเปเบเบตเบกเปเบเบปเปเบฒเปเบเปเบเบซเบเปเบฒเบเบตเป, เปเบเบดเปเบเบเบงเบเปเบฎเบปเบฒเบเบฐเปเบเบฑเบเบเปเบฒเบขเปเบฒเบเบฅเบฐเบกเบฑเบเบฅเบฐเบงเบฑเบเปเบเบเปเบเป**context
.
เบชเปเบฒเบฅเบฑเบเปเบเบเบฑเบเบเบธเบเบฑเบ, เบเบฑเปเบเปเบกเปเบเบเบฑเบเบซเบกเบปเบ. เบชเบดเปเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเปเบเปเบฎเบฑเบ:
- dag เปเบซเบกเปเปเบเบเบฒเบเปเบเปเบเบญเบเปเบงเบฑเบเปเบเบเป,
- เบซเบเบถเปเบเปเบฅเบฐเปเบเบดเปเบเบซเบเบถเปเบเปเบเบฑเบเบฎเปเบญเบเบซเบเปเบฒเบงเบฝเบเบเบตเปเบเบฐเบเบทเบเบเบฐเบเบดเบเบฑเบเปเบเบเบฐเบซเบเบฒเบ (เบเปเบฒ Airflow, เบเบฒเบเบเบฑเปเบเบเปเบฒ Celery เปเบฅเบฐเบเบงเบฒเบกเบชเบฒเบกเบฒเบเบเบญเบเปเบเบทเปเบญเบเปเบกเปเบเปเบฒเบเบญเบฐเบเบธเบเบฒเบเปเบซเปเบกเบฑเบ).
เบเบต, เปเบเบทเบญเบเปเบเปเบฎเบฑเบเบกเบฑเบ.
เปเบเบเบฐเบเบดเบเบเบฑเปเบเบเบฒเบเปเบเบดเปเบเบเบฒเบญเบฒเปเบช?
เปเบเบทเปเบญเปเบฎเบฑเบเปเบซเปเบชเบดเปเบเบเบฑเบเบซเบกเบปเบเบเบตเปเบเปเบฒเบเบเบฒเบ, เบเปเบฒเบเบฐเปเบเบปเปเบฒเปเบเป screwed เปเบ docker-compose.yml
เบเบฒเบเบเบธเบเปเบเปเบ requirements.txt
เปเบเบเบธเบ nodes.
เปเบเบเบฑเบเบเบธเบเบฑเบเบกเบฑเบเบซเบกเบปเบเปเบ:
เบชเบตเปเบซเบผเปเบฝเบกโเบชเบตโเบเบตเปโเปเบเบปเปเบฒโเปเบกเปเบโเบเบปเบงโเบขเปเบฒเบโเบงเบฝเบโเบเบฒเบโเบเบฐโเบกเบงเบโเบเบปเบโเปเบเบโเบเบฒเบโเบเบฑเปเบโเปเบงโเบฅเบฒโ.
เบเบงเบโเปเบฎเบปเบฒโเบฅเปโเบเปเบฒโเบญเบตเบโเปเปเบญเบโเปเบถเปเบ, เบงเบฝเบโเบเบฒเบโเบเบตเปโเบเบณโเบกเบฐโเบเบญเบโเปเบเปโเบเบทเบโเบเบฑเบโเบเบธเบก:
เบชเบตเบเบฝเบง, เปเบเปเบเบญเบ, เปเบเปเบชเปเบฒเปเบฅเบฑเบเบงเบฝเบเบเบฒเบเบเบญเบเปเบเบปเบฒเปเบเบปเปเบฒ. Reds เบเปเปเบเบฐเบชเบปเบเบเบปเบเบชเปเบฒเปเบฅเบฑเบเบซเบผเบฒเบ.
เปเบเบเบงเบดเบเบตเบเบฒเบเบเบฒเบ, เบเปเปเบกเบตเปเบเบเปเบเบตเบเปเบฝเบงเบเบฑเบเบเบฐเบฅเบดเบเบเบฐเบเบฑเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ
./dags
, เบเปเปเบกเบตเบเบฒเบ synchronization เบฅเบฐเบซเบงเปเบฒเบเปเบเบทเปเบญเบเบเบฑเบ - dags เบเบฑเบเบซเบกเบปเบเบเบญเบเบขเบนเปเปเบgit
เบขเบนเปเปเบ Gitlab เบเบญเบเบเบงเบเปเบฎเบปเบฒ, เปเบฅเบฐ Gitlab CI เปเบเบเบขเบฒเบเบเบฒเบเบญเบฑเบเปเบเบเปเบซเปเบเบฑเบเปเบเบทเปเบญเบเบเบฑเบเปเบกเบทเปเบญเบฅเบงเบกเปเบเบปเปเบฒเบเบฑเบmaster
.
เปเบฅเบฑเบเบเปเบญเบเบเปเบฝเบงเบเบฑเบเบเบญเบเปเบกเป
เปเบเบเบฐเบเบฐเบเบตเปเบเบปเบเบเบฒเบเบเบณเบฅเบฑเบเบเบตเปเบเบทเปเบญเบเบเบฑเปเบเบเบดเบเปเบเบปเบฒเบเบญเบเบเบงเบเปเบฎเบปเบฒ, เปเบซเปเปเบฎเบปเบฒเบเบทเปเบเบณเปเบเบทเปเบญเบเบกเบทเปเบถเปเบเบเบตเปเบชเบฒเบกเบฒเบเบชเบฐเปเบเบเปเบซเปเปเบฎเบปเบฒเปเบซเบฑเบเปเบเป - เบเบญเบเปเบกเป.
เบซเบเปเบฒเบเปเบฒเบญเบดเบเบเบตเปเบกเบตเบเปเปเบกเบนเบเบชเบฐเบซเบผเบธเบเบเปเบฝเบงเบเบฑเบเบเปเปเบเบญเบเบเบฐเบเบฑเบเบเบฒเบ:
เปเปเบฒเบงเบฝเบเบเบตเปเปเบเบฑเปเบกเบเบงเบเบเบตเปเบชเบธเบเบเบฑเบเบงเบฝเบเบเบตเปเปเบเปเบฎเบฑเบเบงเบฝเบ:
เบซเบเปเบฒเปเบเบทเปเบญเบเบตเปเบชเบธเบเบเบฑเบเบชเบฐเบเบฒเบเบฐเบเบญเบเบเบฒเบเบซเบเปเบฒเบเบญเบเบเบงเบเปเบฎเบปเบฒ:
เบซเบเปเบฒเบเบตเปเบชเบปเบเปเบชเบเบตเปเบชเบธเบเปเบกเปเบเบเบฑเบเบเบฒเบเบชเบฐเบเบฒเบเบฐเบซเบเปเบฒเบงเบฝเบเปเบฅเบฐเปเบงเบฅเบฒเบเบฐเบเบดเบเบฑเบเบเบญเบเบเบงเบเปเบเบปเบฒ:
เบเบงเบเปเบฎเบปเบฒเปเบซเบผเบ underloaded
เบเบฑเปเบโเบเบฑเปเบโ, เบงเบฝเบโเบเบฒเบโเบเบฑเบโเบซเบกเบปเบโเปเบเปโเบฎเบฑเบโเบเบปเบโ, เบเปเบฒเบโเบชเบฒโเบกเบฒเบโเบเบฐโเบเบดโเบเบฑเบโเบเบฒเบโเบเบฒเบโเปเบเบฑเบโเปเบเปโ.
เปเบฅเบฐโเบกเบตโเบเบนเปโเบเบฒเบโเปเบเบฑเบโเบซเบผเบฒเบโเบเบปเบโเบเปเบงเบโเปเบซเบโเบเบปเบโเบญเบฑเบโเปเบถเปเบโเบซเบผเบทโเบญเบฑเบโเบญเบทเปเบ. เปเบเบเปเบฅเบฐเบเบตเบเบญเบเบเบฒเบเบเปเบฒเปเบเปเบเบตเปเบเบทเบเบเปเบญเบเบเบญเบ Airflow, เบชเบตเปเบซเบผเปเบฝเบกเปเบซเบผเบปเปเบฒเบเบตเปเบเบตเปเปเบซเปเปเบซเบฑเบเบงเปเบฒเบเปเปเบกเบนเบเบเปเปเปเบเปเบกเบฒเบฎเบญเบเปเบเปเบเบญเบ.
เบเปเบฒเบเบเปเบฒเปเบเบฑเบเบเปเบญเบเปเบเบดเปเบเบเบฑเบเบเบถเบเปเบฅเบฐ restart เบเปเบฅเบฐเบเบตเบเบตเปเบซเบผเบธเบเบฅเบปเบ.
เปเบเบโเบเบฒเบโเบเบฅเบดเบโเปเบชเปโเบชเบตเปโเบซเบฅเปเบฝเบกโเปเบโเบซเบเบถเปเบโ, เบเบงเบโเปเบฎเบปเบฒโเบเบฐโเปเบซเบฑเบโเบเบฒเบโเบเบฐโเบเบดโเบเบฑเบโเบเบตเปโเบกเบตโเปเบซเปโเบเบงเบโเปเบฎเบปเบฒโ:
เบเปเบฒเบโเบชเบฒโเบกเบฒเบโเปเบเปโเปเบงโเบฅเบฒโเปเบฅเบฐโเปเบฎเบฑเบโเปเบซเปโเบฅเปเบฒเบโเบเบฒเบโเบซเบผเบธเบโเบฅเบปเบโเปเบเปโ. เบเบฑเปเบเปเบกเปเบ, เบเบงเบเปเบฎเบปเบฒเบฅเบทเบกเบงเปเบฒเบเบฒเบเบชเบดเปเบเบเบฒเบเบขเปเบฒเบเบฅเบปเปเบกเปเบซเบฅเบงเบขเบนเปเบเบตเปเบเบฑเปเบ, เปเบฅเบฐเบงเบฝเบเบเบฒเบเบเบปเบงเบขเปเบฒเบเบเบฝเบงเบเบฑเบเบเบฐเปเบเบซเบฒเบเบปเบงเบเปเบฒเบเบปเบเปเบงเบฅเบฒ.
เบกเบฑเบเปเบเบฑเบเบเบตเปเบเบฑเบเปเบเบเบงเปเบฒเบเบฒเบเปเบฎเบฑเบเปเบเบเบเบตเปเบเบฑเบเบซเบเบนเบเบฑเบเบชเบตเปเบซเบฅเปเบฝเบกเบชเบตเปเบเบเบเบฑเบเบซเบกเบปเบเปเบกเปเบเบเปเปเปเบเบฑเบเบกเบฐเบเบธเบเบซเบผเบฒเบ - เบเบตเปเบเปเปเปเบกเปเบเบชเบดเปเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเบเบฒเบเบซเบงเบฑเบเบเบฒเบ Airflow. เบเบฒเบกเบเปเบฒเบกเบฐเบเบฒเบ, เบเบงเบเปเบฎเบปเบฒเบกเบตเบญเบฒเบงเบธเบเบเปเบฒเบฅเบฒเบเบกเบฐเบซเบฒเบเบปเบ: Browse/Task Instances
เบกเบฒเปเบฅเบทเบญเบเบเบธเบเบขเปเบฒเบเบเปเบญเบกเปเบเบฑเบ เปเบฅเบฐเบฃเบตเปเบเบฑเบเปเบเบฑเบเบชเบนเบ, เบเบฅเบดเบเบฅเบฒเบเบเบฒเบเบเบตเปเบเบทเบเบเปเบญเบ:
เบซเบผเบฑเบโเบเบฒเบโเบเบฒเบโเบเปเบฒโเบเบงเบฒเบกโเบชเบฐโเบญเบฒเบโ, เบฅเบปเบโเปเบเบฑเบโเบเบตโเบเบญเบโเบเบงเบโเปเบฎเบปเบฒโเบกเบตโเบฅเบฑเบโเบชเบฐโเบเบฐโเบเบตเป (เบเบงเบโเปเบเบปเบฒโเปเบเบปเปเบฒโเปเบเปโเบฅเปโเบเปเบฒโเบชเปเบฒโเบฅเบฑเบโเบเบฒเบโเบเบฑเบโเบเบฒโเบเบฐโเบฅเบฒเบโเบเบฒเบโเปเบซเปโเปเบเบปเบฒโเปเบเบปเปเบฒโ)โ:
เบเบฒเบเปเบเบทเปเบญเบกเบเปเป, hooks เปเบฅเบฐเบเบปเบงเปเบเบญเบทเปเบเป
เบกเบฑเบเปเบเบฑเบเปเบงเบฅเบฒเบเบตเปเบเบฐเปเบเบดเปเบ DAG เบเปเปเปเบ, update_reports.py
:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ะะพัะฟะพะดะฐ ั
ะพัะพัะธะต, ะพััะตัั ะพะฑะฝะพะฒะปะตะฝั"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ะะฐัะฐั, ะฟัะพััะฟะฐะนัั, ะผั {{ dag.dag_id }} ััะพะฝะธะปะธ
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
เบเบธเบเบเบปเบเปเบเบตเบเปเบฎเบฑเบเบเบฒเบเบญเบฑเบเปเบเบเบฅเบฒเบเบเบฒเบเบเป? เบเบตเปเปเบกเปเบเบเบญเบเบเบฒเบเบญเบตเบเปเบเบทเปเบญเบซเบเบถเปเบ: เบกเบตเบเบฑเบเบเบตเบฅเบฒเบเบเบทเปเบเบญเบเปเบซเบผเปเบเบเปเปเบกเบนเบเบเบฒเบเบเปเบญเบเบเบตเปเปเบเปเบฎเบฑเบเบเปเปเบกเบนเบ; เบกเบตเบเบฑเบเบเบตเบฅเบฒเบเบเบทเปเบเบตเปเบเบฐเปเบญเบปเบฒ; เบขเปเบฒเบฅเบทเบก honk เปเบเปเบงเบฅเบฒเบเบตเปเบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเปเบเบตเบเบเบถเปเบเบซเบผเบทเปเบเบ (เบเบต, เบเบตเปเบเปเปเปเบกเปเบเบเปเบฝเบงเบเบฑเบเบเบงเบเปเบฎเบปเบฒ, เบเปเป).
เปเบซเปเบเปเบฒเบเปเบเบฅเปเบญเบตเบเปเบเบทเปเบญเบซเบเบถเปเบเปเบฅเบฐเปเบเบดเปเบเบชเบดเปเบเบเบตเปเบเปเปเบเบฑเบเปเบเบเปเบซเบกเป:
from commons.operators import TelegramBotSendMessage
- เบเปเปเบกเบตเบซเบเบฑเบเบเบตเบเบเบงเบฒเบเบเบงเบเปเบฎเบปเบฒเบเบฒเบเบเบฒเบเปเบฎเบฑเบเปเบซเปเบเบนเปเบเบฐเบเบญเบเบเบฒเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเปเบญเบ, เปเบเบดเปเบเบเบงเบเปเบฎเบปเบฒเปเบเปเปเบเปเบเบฐเปเบซเบเบเบเบฒเบเบเบฒเบเบชเปเบฒเบ wrapper เบเบฐเบซเบเบฒเบเบเปเบญเบเบชเปเบฒเบฅเบฑเบเบเบฒเบเบชเบปเปเบเบเปเปเบเบงเบฒเบกเปเบเบซเบฒ Unblocked. (เบเบงเบเปเบฎเบปเบฒเบเบฐเบชเบปเบเบเบฐเบเบฒเปเบเบตเปเบกเปเบเบตเบกเบเปเบฝเบงเบเบฑเบเบเบนเปเบเบฐเบเบญเบเบเบฒเบเบเบตเปเบเปเบฒเบเบฅเบธเปเบกเบเบตเป);default_args={}
- dag เบชเบฒเบกเบฒเบเปเบเบเบขเบฒเบเบเบฒเบเปเบเปเบเบฝเบเบเบฝเบงเบเบฑเบเบเบฑเบเบเบนเปเบเบฐเบเบดเบเบฑเบเบเบฒเบเบเบฑเบเบซเบกเบปเบเบเบญเบเบกเบฑเบ;to='{{ var.value.all_the_kings_men }}'
- เบเบฒเบเบชเบฐเปเบฒเบกto
เบเบงเบเปเบฎเบปเบฒเบเบฐเบเปเปเบกเบต hardcoded, เปเบเปเบชเปเบฒเบเปเบเบเปเบเบทเปเบญเบเปเบซเบงเปเบเบเปเบเป Jinja เปเบฅเบฐเบเบปเบงเปเบเบเบตเปเบกเบตเบฅเบฒเบเบเบทเปเบญเบตเปเบกเบง, เบเบตเปเบเปเบญเบเปเบญเบปเบฒเปเบเปเบชเปเบขเปเบฒเบเบฅเบฐเบกเบฑเบเบฅเบฐเบงเบฑเบ.Admin/Variables
;trigger_rule=TriggerRule.ALL_SUCCESS
โ เปเบเบทเปเบญเบโเปเบโเบชเปเบฒโเบฅเบฑเบโเบเบฒเบโเปเบฅเบตเปเบกโเบเบปเปเบโเบเบฒเบโเบเบฐโเบเบดโเบเบฑเบโเบเบฒเบโ. เปเบโเบเปโเบฅเบฐโเบเบตโเบเบญเบโเบเบงเบโเปเบฎเบปเบฒ, เบเบปเบโเบซเบกเบฒเบโเบชเบฐโเบเบฑเบโเบเบฐโเบเบดเบโเปเบโเบซเบฒโเบเบฒเบโเบเปเบฒเบโเบเบฝเบโเปเบเปโเบเปเบฒโเบซเบฒเบโเบงเปเบฒโเบเบฒเบโเปเบเบดเปเบโเบเบฒโเบญเบฒโเปเบชโเบเบฑเบโเบซเบกเบปเบโเปเบเปโเปเบฎเบฑเบโเบงเบฝเบโเบญเบญเบ เบชเปเบฒเปเบฅเบฑเบ;tg_bot_conn_id='tg_main'
- เบเบฒเบเปเบเปเบเบฝเบconn_id
เบเบญเบกเบฎเบฑเบ ID เบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบเบตเปเบเบงเบเปเบฎเบปเบฒเบชเปเบฒเบAdmin/Connections
;trigger_rule=TriggerRule.ONE_FAILED
- เบเปเปเบเบงเบฒเบกเปเบ Telegram เบเบฐเบเบดเบเปเบเบเบฝเบเปเบเปเบเปเบฒเบกเบตเบงเบฝเบเบเบฒเบเบซเบผเบธเบเบฅเบปเบ;task_concurrency=1
- เบเบงเบโเปเบฎเบปเบฒโเบซเปเบฒเบกโเบเบฒเบโเปเบเบตเบโเบเบปเบงโเบเปเบญเบกโเบเบฑเบโเบเบญเบโเบซเบผเบฒเบโเบเบปเบงโเบขเปเบฒเบโเบเบญเบโเบงเบฝเบโเบเบฒเบโเบซเบเบถเปเบโ. เบเปเบฒเบเปเปเบเบฑเปเบเบเบฑเปเบ, เบเบงเบเปเบฎเบปเบฒเบเบฐเปเบเปเบฎเบฑเบเบเบฒเบเปเบเบตเบเบเบปเบงเบเปเบญเบกเปเบเบฑเบเบเบญเบเบซเบผเบฒเบเปเบเบปเบVerticaOperator
(เปเบเบดเปเบเบขเบนเปเปเบเบเบฒเบเบฐเบฅเบฒเบเบซเบเบถเปเบ);report_update >> [email, tg]
- เบเบธเบเบขเปเบฒเบVerticaOperator
เบกเบฒโเบฎเปเบงเบกโเบเบฑเบโเปเบโเบเบฒเบโเบชเบปเปเบโเบเบปเบโเบซเบกเบฒเบโเปเบฅเบฐโเบเปเปโเบเบงเบฒเบกโ, เปเบเบฑเปเบโเบเบตเปโ:
เปเบเปเปเบเบทเปเบญเบเบเบฒเบเบเบนเปเบเบฐเบเบดเบเบฑเบเบเบฒเบเปเบเปเบเบเบฒเบเบกเบตเปเบเบทเปเบญเบเปเบเบเบฒเบเปเบเบตเบเบเบปเบงเบเบตเปเปเบเบเบเปเบฒเบเบเบฑเบ, เบเบฝเบเปเบเปเบซเบเบถเปเบเบเบฐเปเบฎเบฑเบเบงเบฝเบ. เปเบ Tree View, เบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเปเบเบดเปเบเบซเบเปเบญเบเบฅเบปเบเปเบฅเบฑเบเบเปเบญเบ:
เบเปเบญเบเบเบฐเปเบงเบปเปเบฒเบชเบญเบเบชเบฒเบกเบเปเบฒเบเปเบฝเบงเบเบฑเบ เบกเบฐเบซเบฒเบเบฒเบ เปเบฅเบฐโเบซเบกเบนเปโเปเบเบทเปเบญเบโเบเบญเบโเปเบเบปเบฒโเปเบเบปเปเบฒ - เบเบปเบงเปเบ.
Macros เปเบกเปเบเบเบปเบงเบเบถเบ Jinja เบเบตเปเบชเบฒเบกเบฒเบเบเบปเบเปเบเบเบเปเปเบกเบนเบเบเบตเปเปเบเบฑเบเบเบฐเปเบซเบเบเบเปเบฒเบเปเปเบเบปเปเบฒเปเบเบเบฒเบเปเบเปเบเบฝเบเบเบญเบเบเบนเปเบเบฐเบเบญเบเบเบฒเบ. เบเบปเบงเบขเปเบฒเบเปเบเบฑเปเบเบเบตเป:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }}
เบเบฐเบเบฐเบซเบเบฒเบเปเบเบชเบนเปเปเบเบทเปเบญเปเบเบเบญเบเบเบปเบงเปเบเบเบญเบเบชเบฐเบเบฒเบเบเบฒเบ execution_date
เปเบเบฎเบนเบเปเบเบ YYYY-MM-DD
: 2020-07-14
. เบชเปเบงเบเบเบตเปเบเบตเบเบตเปเบชเบธเบเปเบกเปเบเบงเปเบฒเบเบปเบงเปเบเบเปเบฅเบดเบเบปเบเบเบทเบเบเบดเบเปเบชเปเบเบฑเบเบเบปเบงเบขเปเบฒเบเบงเบฝเบเบชเบฐเปเบเบฒเบฐ (เบชเบตเปเบซเบผเปเบฝเบกเปเบ Tree View), เปเบฅเบฐเปเบกเบทเปเบญเปเบฅเบตเปเบกเบเบปเปเบเปเบซเบกเป, เบเบปเบงเบเบถเบเบเบฐเบเบฐเบซเบเบฒเบเปเบเบชเบนเปเบเปเบฒเบเบฝเบงเบเบฑเบ.
เบเปเบฒเบเบตเปเปเบเปเบฎเบฑเบเบกเบญเบเปเบฒเบเบชเบฒเบกเบฒเบเปเบเบดเปเบเปเบเปเปเบเบเปเบเปเบเบธเปเบก Rendered เปเบเปเบเปเบฅเบฐเปเปเบฒเบงเบฝเบ. เบเบตเปเปเบกเปเบเบงเบดเบเบตเบเบฒเบเบชเบปเปเบเบเบปเบเบซเบกเบฒเบ:
เปเบฅเบฐเบเบฑเปเบเบเบฑเปเบเบขเบนเปเปเบเบงเบฝเบเบเบฒเบเบเบตเปเบกเบตเบเบฒเบเบชเบปเปเบเบเปเปเบเบงเบฒเบก:
เบเบฑเบเบเบตเบฅเบฒเบเบเบทเปเบเบปเบเบเปเบงเบเบเบญเบ macro เบเบตเปเบกเบตเปเบเบเบปเบงเบชเปเบฒเบฅเบฑเบเบฎเบธเปเบเบซเบผเปเบฒเบชเบธเบเบเบตเปเบกเบตเบขเบนเปเปเบกเปเบเบขเบนเปเบเบตเปเบเบตเป:
เบเบดเปเบเปเบเบเบงเปเบฒเบเบฑเปเบ, เบเปเบงเบเบเบฒเบเบเปเบงเบเปเบซเบผเบทเบญเบเบญเบ plugins, เบเบงเบเปเบฎเบปเบฒเบชเบฒเบกเบฒเบเบเบฐเบเบฒเบ macro เบเบญเบเบเบงเบเปเบฎเบปเบฒเปเบญเบ, เปเบเปเบเบฑเปเบเปเบกเปเบเปเบฅเบทเปเบญเบเบญเบทเปเบ.
เบเบญเบเปเบซเบเบทเบญเบเบฒเบเบชเบดเปเบเบเบตเปเบเปเบฒเบเบปเบเปเบงเปเบฅเปเบงเบเบซเบเปเบฒ, เบเบงเบเปเบฎเบปเบฒเบชเบฒเบกเบฒเบเบเบปเบเปเบเบเบเปเบฒเบเบญเบเบเบปเบงเปเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ (เบเปเบญเบเปเบเปเปเบเปเบกเบฑเบเบขเบนเปเปเบเบฅเบฐเบซเบฑเบเบเปเบฒเบเปเบเบดเบ). เบกเบฒเบชเปเบฒเบเปเบ Admin/Variables
เบชเบญเบเบชเบฒเบกเบขเปเบฒเบ:
เบเบธเบโเบชเบดเปเบโเบเบธเบโเบขเปเบฒเบโเบเบตเปโเบเปเบฒเบโเบชเบฒโเบกเบฒเบโเบเปเบฒโเปเบเปโ:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
เบเปเบฒเบชเบฒเบกเบฒเบเปเบเบฑเบ Scalar, เบซเบผเบทเบกเบฑเบเบชเบฒเบกเบฒเบเปเบเบฑเบ JSON. เปเบเบเปเบฅเบฐเบเบตเบเบญเบ JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}
เบเบฝเบเปเบเปเปเบเปเปเบชเบฑเปเบเบเบฒเบเปเบเบซเบฒเบเบฐเปเบเบเบตเปเบเปเบญเบเบเบฒเบ: {{ var.json.bot_config.bot.token }}
.
เบเปเบฒโเบเบฐโเปเบเบปเปเบฒโเบฎเบนเปโเบซเบเบฑเบโเบชเบทโเบเบฐโเปเบงเบปเปเบฒโเบซเบเบถเปเบโเบเปเบฒโเปเบฅเบฐโเบชเบฐโเปเบเบโเปเบซเปโเปเบซเบฑเบ screenshot เบซเบเบถเปเบโเบเปเบฝเบงโเบเบฑเบโเบเบฒเบโ เบเบฒเบเปเบเบทเปเบญเบกเบเปเป. เบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเปเบกเปเบเบเบทเปเบเบเบฒเบเบขเบนเปเบเบตเปเบเบตเป: เปเบเบซเบเปเบฒ Admin/Connections
เบเบงเบเปเบฎเบปเบฒเบชเปเบฒเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเป, เปเบเบตเปเบกเบเบฒเบเปเบเบปเปเบฒเบชเบนเปเบฅเบฐเบเบปเบ / เบฅเบฐเบซเบฑเบเบเปเบฒเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเปเบฅเบฐเบเบปเบงเบเปเบฒเบเบปเบเบเบฒเบเบชเบฐเปเบเบฒเบฐเบซเบผเบฒเบเบขเบนเปเบเบตเปเบเบฑเปเบ. เปเบเบเบเบตเป:
เบฅเบฐเบซเบฑเบเบเปเบฒเบเบชเบฒเบกเบฒเบเบเบทเบเปเบเบปเปเบฒเบฅเบฐเบซเบฑเบ (เบขเปเบฒเบเบฅเบฐเบญเบฝเบเบเบงเปเบฒเบเปเบฒเปเบฅเบตเปเบกเบเบปเปเบ), เบซเบผเบทเบเปเบฒเบเบชเบฒเบกเบฒเบเบญเบญเบเบเบฒเบเบเบฐเปเบเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเป (เบเบฑเปเบเบเบตเปเบเปเบญเบเปเบเปเปเบฎเบฑเบเบชเปเบฒเบฅเบฑเบ tg_main
) - เบเบงเบฒเบกเบเบดเบเปเบกเปเบเบงเปเบฒเบเบฑเบเบเบตเบฅเบฒเบเบเบทเปเบเบญเบเบเบฐเปเบเบเปเบกเปเบ hardwired เปเบเบฎเบนเบเปเบเบ Airflow เปเบฅเบฐเบเปเปเบชเบฒเบกเบฒเบเบเบฐเบซเบเบฒเบเปเบเปเปเบเบเบเปเปเบกเบตเบเบฒเบเปเบเบปเปเบฒเปเบเปเบเบฅเบฐเบซเบฑเบเปเบซเบผเปเบ (เบเปเบฒเบซเบฒเบเบงเปเบฒเบเบฑเบเบเบตเบเบฑเบเปเบเบเปเบฒเบเบฐเปเบเบปเปเบฒเบเปเปเปเบเป google เบเบฒเบเบชเบดเปเบเบเบฒเบเบขเปเบฒเบ, เบเบฐเบฅเบธเบเบฒเปเบเปเปเบเบเปเบฒเบเบฐเปเบเบปเปเบฒ), เปเบเปเบเปเปเบกเบตเบซเบเบฑเบเบเบฐเบขเบธเบเบเบงเบเปเบฎเบปเบฒเบเบฒเบเบเบฒเบเปเบเปเบฎเบฑเบเบชเบดเบเปเบเบทเปเบญเบเบฝเบเปเบเปเปเบเบเบเบฒเบ. เบเบทเป.
เบเบญเบเบเบฑเปเบเบเปเบฒเบเบเบฑเบเบชเบฒเบกเบฒเบเปเบฎเบฑเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบเปเบฒเบเบงเบเบซเบเบถเปเบเบเบตเปเบกเบตเบเบทเปเบเบฝเบงเบเบฑเบ: เปเบเบเปเบฅเบฐเบเบตเบเบตเป, เบงเบดเบเบตเบเบฒเบ BaseHook.get_connection()
, เปเบเบดเปเบเปเบฎเบฑเบเปเบซเปเบเบงเบเปเบฎเบปเบฒเปเบเบทเปเบญเบกเบเปเปเปเบเบเบเบทเป, เบเบฐเปเบซเป เบชเบธเปเบก เบเบฒเบเบซเบผเบฒเบเป namesakes (เบกเบฑเบเบเบฐเบกเบตเปเบซเบเบเบปเบเบซเบผเบฒเบเบเบงเปเบฒเบเบตเปเบเบฐเปเบฎเบฑเบเปเบซเป Round Robin, เปเบเปเปเบซเปเบกเบฑเบเบขเบนเปเปเบเบเบดเบเปเบเบเบญเบเบเบนเปเบเบฑเบเบเบฐเบเบฒ Airflow).
เบเบปเบงเปเบเปเบฅเบฐเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเปเบเปเบเบญเบเปเบกเปเบเปเบเบทเปเบญเบเบกเบทเบเบตเปเปเบขเบฑเบ, เปเบเปเบกเบฑเบเบชเปเบฒเบเบฑเบเบเบตเปเบเบฐเบเปเปเบชเบนเบเปเบชเบเบเบงเบฒเบกเบชเบปเบกเบเบนเบ: เบชเปเบงเบเปเบเบเบญเบเบเบฒเบเปเบซเบผเบเบญเบเปเบเบปเปเบฒเบเบตเปเบเปเบฒเบเปเบเบฑเบเปเบงเปเปเบเบฅเบฐเบซเบฑเบเบเบปเบงเบกเบฑเบเปเบญเบ, เปเบฅเบฐเบชเปเบงเบเปเบเบเบตเปเบเปเบฒเบเปเบซเป Airflow เบชเปเบฒเบฅเบฑเบเบเบฒเบเปเบเบฑเบเบฎเบฑเบเบชเบฒ. เปเบเบเปเบฒเบเบซเบเบถเปเบ, เบกเบฑเบเบชเบฒเบกเบฒเบเบชเบฐเบเบงเบเปเบเบเบฒเบเบเปเบฝเบเปเบเบเบกเบนเบเบเปเบฒเบขเปเบฒเบเปเบงเบงเบฒ, เบเบปเบงเบขเปเบฒเบ, เบเปเบญเบเบเบปเบเบซเบกเบฒเบ, เบเปเบฒเบ UI. เปเบเบเบฒเบเบเบปเบเบเบฑเบเบเปเบฒเบก, เบเบตเปเบเบฑเบเปเบเบฑเบเบเบฒเบเบเบฑเบเบเบทเบเปเบเบซเบฒเบเบฒเบเบเบฅเบดเบเบซเบเบน, เบเบฒเบเบเบตเปเบเบงเบเปเบฎเบปเบฒ (เบเปเบญเบ) เบเปเบญเบเบเบฒเบเบเปเบฒเบเบฑเบ.
เบเบฒเบเปเบฎเบฑเบเบงเบฝเบเบเบฑเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเปเบกเปเบเบซเบเบถเปเบเปเบเบงเบฝเบเบเบฒเบ hooks. เปเบเบเบเบปเปเบงเปเบ, Airflow hooks เปเบกเปเบเบเบธเบเบชเปเบฒเบฅเบฑเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบกเบฑเบเบเบฑเบเบเบฒเบเบเปเบฅเบดเบเบฒเบเบเบฒเบเบชเปเบงเบเบเบตเบชเบฒเบกเปเบฅเบฐเบซเปเบญเบเบชเบฐเบซเบกเบธเบ. เบเบปเบงเบขเปเบฒเบ: JiraHook
เบเบฐเปเบเบตเบเบฅเบนเบเบเปเบฒเปเบซเปเบเบงเบเปเบฎเบปเบฒเบเบปเบงเบเบฑเบเบเบฑเบ Jira (เบเปเบฒเบเบชเบฒเบกเบฒเบเบเปเบฒเบเบงเบฝเบเปเบเบกเบฒเปเบเป), เปเบฅเบฐเบเปเบงเบเบเบฒเบเบเปเบงเบเปเบซเบผเบทเบญเบเบญเบ SambaHook
เบเปเบฒเบเบชเบฒเบกเบฒเบเบเบนเปเปเบเบฅเปเบเปเบญเบเบเบดเปเบเปเบเบซเบฒ smb
- เบเบธเบ.
เบเบณเบฅเบฑเบเบงเบดเปเบเบฒเบฐเบเบปเบงเบเบณเปเบเบตเบเบเบฒเบเปเบเบเบเบณเบเบปเบเปเบญเบ
เปเบฅเบฐโเบเบงเบโเปเบฎเบปเบฒโเปเบเปโเปเบเบปเปเบฒโเปเบโเปเบเปโเบเบฑเบโเบเบฒเบโเปเบเบดเปเบโเบงเบดโเบเบตโเบเบฒเบโเบเบตเปโเบกเบฑเบโเปเบฎเบฑเบโเปเบเปโ TelegramBotSendMessage
เบฅเบฐโเบซเบฑเบ commons/operators.py
เบเบฑเบเบเบนเปเบเบฐเบเบญเบเบเบฒเบเบเบปเบงเบเบดเบ:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
เบเบตเปเบเบตเป, เปเบเบฑเปเบเบเบฝเบงเบเบฑเบเบชเบดเปเบเบญเบทเปเบเปเบ Airflow, เบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเปเบกเปเบเบเปเบฒเบเบเบฒเบเบซเบผเบฒเบ:
- เบชเบทเบเบเบญเบเบกเบฒเบเบฒเบ
BaseOperator
, เปเบเบดเปเบเบเบฐเบเบดเบเบฑเบเบเบฒเบเบชเบดเปเบเบเบตเปเบชเบฐเปเบเบฒเบฐ Airflow (เปเบเบดเปเบเบเบฒเบเบเบฑเบเบเปเบญเบเบเบญเบเบเปเบฒเบ) - เบเบปเปเบเบเบฒเบเบตเปเบเบฐเบเบฒเบ
template_fields
, เปเบเบเบตเป Jinja เบเบฐเบเบญเบเบซเบฒเบกเบฐเบซเบฒเบเบฒเบเปเบเบทเปเบญเบเบฐเบกเบงเบเบเบปเบ. - เบเบฑเบเปเบเบเบเบฒเบเปเบเปเบเบฝเบเบเบตเปเบเบทเบเบเปเบญเบเบชเปเบฒเบฅเบฑเบ
__init__()
, เบเบฑเปเบเบเปเบฒเปเบฅเบตเปเบกเบเบปเปเบเปเบเบเปเบญเบเบเบตเปเบกเบตเบเบงเบฒเบกเบเปเบฒเปเบเบฑเบ. - เบเบงเบเปเบฎเบปเบฒเบเปเปเปเบเปเบฅเบทเบกเบเปเบฝเบงเบเบฑเบเบเบฒเบเปเบฅเบตเปเบกเบเบปเปเบเบเบญเบเบเบฑเบเบเบฐเบเบธเบฅเบธเบ.
- เปเบเบตเบ hook เบเบตเปเบชเบญเบเบเปเบญเบเบเบฑเบ
TelegramBotHook
เปเบเปเบฎเบฑเบเบงเบฑเบเบเบธเบเบญเบเบฅเบนเบเบเปเบฒเบเบฒเบเบกเบฑเบ. - เบงเบดเบเบตเบเบฒเบ overridden (เบเปเบฒเบเบปเบเบเบทเบเปเบซเบกเป).
BaseOperator.execute()
, เบเบตเป Airfow เบเบฐ twitch เปเบกเบทเปเบญเปเบงเบฅเบฒเบเบตเปเบเบฐเปเบเบตเบเบเบปเบงเบเบนเปเบเบฐเบเบดเบเบฑเบเบเบฒเบ - เปเบเบกเบฑเบเบเบงเบเปเบฎเบปเบฒเบเบฐเบเบฐเบเบดเบเบฑเบเบเบฒเบเบเบฐเบเบดเบเบฑเบเบเบปเปเบเบเป, เบฅเบทเบกเปเบเบปเปเบฒเบชเบนเปเบฅเบฐเบเบปเบ. (เบเบงเบเปเบฎเบปเบฒเปเบเบปเปเบฒเบชเบนเปเบฅเบฐเบเบปเบ, เปเบเบเบงเบดเบเบตเบเบฒเบเบเบฒเบ, เปเบเบปเปเบฒเปเบstdout
ะธstderr
- เบเบฒเบโเปเบซเบผโเบงเบฝเบโเบเบญเบโเบญเบฒโเบเบฒเบโเบเบฐโเบชเบฐโเบเบฑเบโเบเบธเบโเบชเบดเปเบโเบเบธเบโเบขเปเบฒเบโ, เบซเปเปโเบกเบฑเบโเบเบฒเบกโ, decompose เบกเบฑเบโเปเบโเบเบตเปโเบเปเบฒโเปเบเบฑเบโ.
เปเบซเปเปเบเบดเปเบเบชเบดเปเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเบกเบต commons/hooks.py
. เบชเปเบงเบเบเปเบฒเบญเบดเบเบเบญเบเปเบเบฅเป, เบเบฑเบ hook เบเบปเบงเบเบญเบเบกเบฑเบเปเบญเบ:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
เบเปเบญเบเบเปเปเบฎเบนเปเบงเปเบฒเบเบฐเบญเบฐเบเบดเบเบฒเบเบซเบเบฑเบเบขเบนเปเบเบตเปเบเบตเป, เบเปเบญเบเบเบฝเบเปเบเปเบเบฐเบชเบฑเบเปเบเบเบเบธเบเบชเปเบฒเบเบฑเบ:
- เบเบงเบเปเบฎเบปเบฒเบชเบทเบเบเบญเบ, เบเบดเบเบเปเบฝเบงเบเบฑเบเบเบฒเบเปเบเปเบเบฝเบ - เปเบเบเปเบฅเบฐเบเบตเบซเบผเบฒเบเบเบตเปเบชเบธเบ, เบกเบฑเบเบเบฐเปเบเบฑเบเบซเบเบถเปเบ:
conn_id
; - overriding เบงเบดเบเบตเบเบฒเบเบกเบฒเบเบเบฐเบเบฒเบ: เบเปเบญเบเบเปเบฒเบเบฑเบเบเบปเบงเปเบญเบ
get_conn()
, เปเบเบเบตเปเบเปเบญเบเปเบเปเบฎเบฑเบเบเบปเบงเบเปเบฒเบเบปเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเปเบเบเบเบทเปเปเบฅเบฐเบเบฝเบเปเบเปเปเบเปเบฎเบฑเบเบชเปเบงเบextra
(เบเบตเปเปเบกเปเบเบเปเบญเบ JSON), เปเบเบดเปเบเบเปเบญเบ (เบเบฒเบกเบเปเบฒเปเบเบฐเบเปเบฒเบเบญเบเบเปเบญเบเปเบญเบ!) เปเบชเป Telegram bot token:{"bot_token": "YOuRAwEsomeBOtToKen"}
. - เบเปเบญเบเบชเปเบฒเบเบเบปเบงเบขเปเบฒเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ
TelegramBot
, เปเบซเปเบกเบฑเบเปเบเบฑเบ token เบชเบฐเปเบเบฒเบฐ.
เบซเบกเบปเบโเปเบเบปเปเบฒโเบเบตเป. เบเปเบฒเบเบชเบฒเบกเบฒเบเปเบเปเบฎเบฑเบเบฅเบนเบเบเปเบฒเบเบฒเบ hook เปเบเบเปเบเป TelegramBotHook().clent
เบซเบผเบท TelegramBotHook().get_conn()
.
เปเบฅเบฐเบชเปเบงเบเบเบตเบชเบญเบเบเบญเบเปเบเบฅเป, เปเบเบเบตเปเบเปเบญเบเปเบฎเบฑเบ microwrapper เบชเปเบฒเบฅเบฑเบ Telegram REST API, เปเบเบทเปเบญเบเปเปเปเบซเปเบฅเบฒเบเบเบทเบเบฑเบ. python-telegram-bot
sendMessage
.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))
เบงเบดเบเบตเบเบตเปเบเบทเบเบเปเบญเบเปเบกเปเบเบเบฒเบเปเบเบตเปเบกเบกเบฑเบเบเบฑเบเปเบปเบ:
TelegramBotSendMessage
,TelegramBotHook
,TelegramBot
- เปเบ plugin, เปเบชเปเปเบ repository เบชเบฒเบเบฒเบฅเบฐเบเบฐ, เปเบฅเบฐเปเบซเปเบกเบฑเบเบเบฑเบ Open Source.
เปเบเบเบฐเบเบฐเบเบตเปเบเบงเบเปเบฎเบปเบฒเบชเบถเบเบชเบฒเบเบฑเบเบซเบกเบปเบเบเบตเป, เบเบฒเบเบเบฑเบเบเบธเบเบเบปเบเบฅเบฒเบเบเบฒเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเปเบเปเบฅเบปเปเบกเปเบซเบฅเบงเบขเปเบฒเบเบชเปเบฒเปเบฅเบฑเบเบเบปเบเปเบฅเบฐเบชเบปเปเบเบเปเปเบเบงเบฒเบกเบเปเปเบเบดเบเบเบฒเบเปเบซเปเบเปเบญเบเปเบเบเปเบญเบเบเบฒเบ. เบเปเบญเบโเบเบฐโเบเบงเบโเปเบเบดเปเบโเบงเปเบฒโเบเบดเบโเบเป...
เบเบฒเบเบชเบดเปเบเบเบฒเบเบขเปเบฒเบเปเบเบเบขเบนเปเปเบ doge เบเบญเบเบเบงเบเปเบฎเบปเบฒ! เบกเบฑเบเบเปเปเปเบกเปเบเบชเบดเปเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเบเบฒเบเบซเบงเบฑเบ? เบขเปเบฒเบโเปเบเปโเบเบญเบ!
เปเบเบปเปเบฒเบเบฐเบเบญเบเบเป?
เปเบเบปเปเบฒเบฎเบนเปเบชเบถเบเบงเปเบฒเบเปเบญเบเบเบฒเบเบเบฒเบเบชเบดเปเบเบเบฒเบเบขเปเบฒเบเบเป? เบกเบฑเบเปเบเบดเปเบเบเบทเบงเปเบฒเบฅเบฒเบงเบชเบฑเบเบเบฒเบงเปเบฒเบเบฐเปเบญเบเบเปเปเบกเบนเบเบเบฒเบ SQL Server เบเบฑเบ Vertica, เปเบฅเบฐเบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบฅเบฒเบงเปเบเปเปเบญเบปเบฒเบกเบฑเบเปเบฅเบฐเบเปเบฒเบเบญเบญเบเบเบฒเบเบซเบปเบงเบเปเป, เบเบปเบเบเบตเปเบเปเบฒเบ!
เบเบงเบฒเบกเปเบซเบเบฎเปเบฒเบเบเบตเปเปเบกเปเบเบกเบตเปเบเบเบเบฐเบเบฒ, เบเปเบฒเบเบฐเปเบเบปเปเบฒเบเบฝเบเปเบเปเบเปเบญเบเปเบเปเบเบญเบเบฅเบฐเบซเบฑเบเบเบฒเบเบเปเบฒเบชเบฑเบเบชเปเบฒเบฅเบฑเบเบเปเบฒเบ. เปเบเบเบฑเบเบเบธเบเบฑเบเบเปเบฒเบเบชเบฒเบกเบฒเบเปเบเบเบทเปเบกเบญเบตเบ.
เปเบเบโเบเบฒเบโเบเบญเบโเบเบงเบโเปเบฎเบปเบฒโเปเบกเปเบโเบเบตเปโ:
- เปเบฎเบฑเบเบเปเป
- เบชเปเบฒเบเปเปเบฒเบงเบฝเบ
- เปเบเบดเปเบเบงเปเบฒเบเบธเบเบขเปเบฒเบเบชเบงเบเบเบฒเบกเบเบฒเบเปเบ
- เบกเบญเบเบซเบกเบฒเบเปเบฅเบเปเบเบเบเบฑเบเปเบซเปเบเบทเปเบก
- เปเบญเบปเบฒเบเปเปเบกเบนเบเบเบฒเบ SQL Server
- เปเบญเบปเบฒเบเปเปเบกเบนเบเปเบเบปเปเบฒเปเบเปเบ Vertica
- เปเบเบฑเบเบเปเบฒเบชเบฐเบเบดเบเบด
เบเบฑเปเบเบเบฑเปเบ, เปเบเบทเปเบญเปเบฎเบฑเบเปเบซเปเบชเบดเปเบเบเบฑเบเบซเบกเบปเบเบเบตเปเปเบเบตเปเบกเบเบถเปเบ, เบเปเบฒเบเบฐเปเบเบปเปเบฒเปเบเปเปเบเบตเปเบกเบเบฐเบซเบเบฒเบเบเปเบญเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ docker-compose.yml
:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.py
เบขเบนเปเบเบตเปเบเบฑเปเบเบเบงเบเปเบฎเบปเบฒเบเบปเบเบเบถเปเบเบกเบฒ:
- Vertica เปเบเบฑเบเปเบเบปเปเบฒเบเบฒเบ
dwh
เบเปเบงเบเบเบฒเบเบเบฑเปเบเบเปเบฒเปเบฅเบตเปเบกเบเบปเปเบเบเบตเปเบชเบธเบ, - เบชเบฒเบกเบเบปเบงเบขเปเบฒเบเบเบญเบ SQL Server,
- เบเบงเบโเปเบฎเบปเบฒโเบเบทเปเบกโเบเปเปโเบกเบนเบโเปเบชเปโเบเบฒเบโเบเปเปโเบกเบนเบโเปเบโเบเปโเบฅเบฐโเบเบตโเบเบตเปโเบกเบตโเบเบฒเบโเบเปเปโเบกเบนเบ (เปเบโเบเปโเบฅเบฐโเบเบตโเบเบตเปโเบเปเปโเบกเบตโเบเปเปโเปเบเปโเปเบเบดเปเบโเปเบเบปเปเบฒโเปเบโเปเบ
mssql_init.py
!)
เบเบงเบเปเบฎเบปเบฒเปเบเบตเบเบเบปเบงเบชเบดเปเบเบเบตเปเบเบตเบเบฑเบเบซเบกเบปเบเบเปเบงเบเบเบฒเบเบเปเบงเบเปเบซเบผเบทเบญเบเบญเบเบเปเบฒเบชเบฑเปเบเบเบตเปเบชเบฑเบเบชเบปเบเปเบฅเบฑเบเบเปเบญเบเบเบงเปเบฒเบเบฑเปเบเบเบตเปเบเปเบฒเบเบกเบฒ:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
เบชเบดเปเบเบเบตเปเบกเบฐเบซเบฑเบเบชเบฐเบเบฑเบ randomizer เบเบญเบเบเบงเบเปเบฎเบปเบฒเบชเปเบฒเบ, เบเปเบฒเบเบชเบฒเบกเบฒเบเบเปเบฒเปเบเปเบฅเบฒเบเบเบฒเบ Data Profiling/Ad Hoc Query
:
เบชเบดเปเบเบเบตเปเบชเปเบฒเบเบฑเบเปเบกเปเบเบเปเปเบชเบฐเปเบเบเปเบซเปเบเบฑเบเบงเบดเปเบเบฒเบฐ
เบฅเบฐเบญเบฝเบเบเปเบฝเบงเบเบฑเบ เปเบเบเบเบฑเบ ETL เบเปเบฒเบเบฐเปเบเบปเปเบฒเบเบฐเบเปเป, เบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเปเบกเปเบเปเบฅเบทเปเบญเบเปเบฅเบฑเบเปเบเปเบญเบเปเบขเบนเปเบเบตเปเบเบฑเปเบ: เบเบงเบเปเบฎเบปเบฒเบชเปเบฒเบเบเบทเปเบเบเบฒเบ, เบกเบตเปเบเบทเปเบญเบเบซเบกเบฒเบเปเบเบกเบฑเบ, เบเบงเบเปเบฎเบปเบฒเบซเปเปเบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเบเปเบงเบเบเบปเบงเบเบฑเบเบเบฒเบเบชเบฐเบเบฒเบเบเบฒเบ, เปเบฅเบฐเบเบญเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเปเบฎเบฑเบเบชเบดเปเบเบเบตเป:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
เปเบงเบฅเบฒเบกเบฒเบฎเบญเบเปเบฅเปเบง เปเบเบฑเบเบเปเบฒเบเปเปเบกเบนเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ เบเบฒเบเบซเบเบถเปเบเปเบฅเบฐเปเบเบดเปเบเบซเบเบถเปเบเบฎเปเบญเบเบเบฒเบเบฐเบฅเบฒเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ. เปเบซเปเปเบฎเบฑเบเบชเบดเปเบเบเบตเปเบเปเบงเบเบเบฒเบเบเปเบงเบเปเบซเบผเบทเบญเบเบญเบเบชเบฒเบ unpretentious เบซเบผเบฒเบ:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
- เบเปเบงเบเบเบฒเบเบเปเบงเบเปเบซเบผเบทเบญเบเบญเบ hook เบเบตเปเบเบงเบเปเบฎเบปเบฒเปเบเปเบฎเบฑเบเบเบฒเบ Airflow
pymssql
- เปเบเบทเปเบญเบกโเบเปเปโ - เปเบซเปเบเบปเบเปเบเบเบเปเปเบเปเบฒเบเบฑเบเปเบเบฎเบนเบเปเบเบเบเบญเบเบงเบฑเบเบเบตเปเบเบปเปเบฒเปเบเปเบเบเปเบฒเบฎเปเบญเบเบเป - เบกเบฑเบเบเบฐเบเบทเบเปเบเบเปเบเบปเปเบฒเปเบเปเบเบซเบเปเบฒเบเบตเปเปเบเบเปเบเบทเปเบญเบเบเบฑเบเปเบกเปเปเบเบ.
- เบเบฒเบเปเบซเปเบญเบฒเบซเบฒเบเบเปเบฒเบฎเปเบญเบเบเปเบเบญเบเบเบงเบเปเบฎเบปเบฒ
pandas
เบเบนเปโเบเบตเปโเบเบฐโเปเบเปโเบฎเบฑเบโเบเบงเบโเปเบฎเบปเบฒโDataFrame
- เบกเบฑเบเบเบฐเปเบเบฑเบเบเบฐเปเบซเบเบเบเปเปเบเบงเบเปเบฎเบปเบฒเปเบเบญเบฐเบเบฒเบเบปเบ.
เบเปเบญเบเบเปเบฒเบฅเบฑเบเปเบเปเบเบฒเบเบเบปเบเปเบเบ
{dt}
เปเบเบเบเบตเปเบเบฐเปเบเบฑเบเบเบปเบงเบเปเบฒเบเบปเบเบเบฒเบเบฎเปเบญเบเบเป%s
เบเปเปเปเบกเปเบเบเปเบญเบเบงเปเบฒเบเปเบญเบเปเบเบฑเบ Pinocchio เบเบปเปเบงเบฎเปเบฒเบ, เปเบเปเปเบเบทเปเบญเบเบเบฒเบเบงเปเบฒpandas
เบเปเปเบชเบฒเบกเบฒเบเบเบฑเบเบเบฒเบเปเบเปpymssql
เปเบฅเบฐเปเบฅเบทเปเบญเบเบญเบฑเบเบชเบธเบเบเปเบฒเบparams: List
เปเบเบดเบเปเบกเปเบเบงเปเบฒเบฅเบฒเบงเบเปเบญเบเบเบฒเบเปเบเปเปtuple
.
เบเบฑเบเบชเบฑเบเปเบเบเบงเปเบฒเบเบฑเบเบเบฑเบเบเบฐเบเบฒpymssql
เปเบเปโเบเบฑเบโเบชเบดเบโเปเบโเบเบตเปโเบเบฐโเบเปเปโเบชเบฐโเบซเบเบฑเบโเบชเบฐโเบซเบเบนเบโเปเบเบปเบฒโเบญเบตเบโเบเปเปโเปเบโ, เปเบฅเบฐโเบกเบฑเบโเปเบเบดเบโเปเบงโเบฅเบฒโเบเบตเปโเบเบฐโเบเปเบฒเบโเบญเบญเบโpyodbc
.
เบกเบฒเปเบเบดเปเบเบชเบดเปเบเบเบตเป Airflow เบเบฐเบเบญเบเบเบฒเบเปเบเปเบเบฝเบเบเบญเบเบซเบเปเบฒเบเบตเปเบเบญเบเบเบงเบเปเบฎเบปเบฒเบเบฑเบ:
เบเปเบฒเบเปเปเบกเบตเบเปเปเบกเบนเบ, เบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบเปเปเบกเบตเบเบธเบเบเบตเปเบเบฐเบชเบทเบเบเปเป. เปเบเปเบกเบฑเบเบเปเปเปเบเบฑเบเปเบฅเบทเปเบญเบเปเบเบเบเบตเปเบเบฐเบเบดเบเบฒเบฅเบฐเบเบฒเบเบฒเบเบเบทเปเบกเบเปเปเบกเบนเบเบชเบปเบเบเบปเบเบชเปเบฒเปเบฅเบฑเบ. เปเบเปเบเบตเปเบเปเปเปเบกเปเบเบเบงเบฒเบกเบเบดเบเบเบฒเบ. เบญเบฒ-เบญเบฒ-เบญเบฒ, เบเบฐเปเบฎเบฑเบเปเบเบงเปเบ?! เปเบฅเบฐเบเบตเปเปเบกเปเบเบชเบดเปเบเบเบตเป:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException
เบเบฐเบเบญเบ Airflow เบงเปเบฒเบเปเปเบกเบตเบเปเปเบเบดเบเบเบฒเบ, เปเบเปเบเบงเบเปเบฎเบปเบฒเบเปเบฒเบกเบซเบเปเบฒเบงเบฝเบ. เบเบฒเบเปเบเปเบเบญเบเบเบฐเบเปเปเบกเบตเบชเบตเปเบซเบผเปเบฝเบกเบชเบตเบเบฝเบงเบซเบผเบทเบชเบตเปเบเบ, เปเบเปเบชเบตเบเบปเบง.
เปเบซเปเบเบดเปเบกเบเปเปเบกเบนเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ เบซเบผเบฒเบเบเบฑเบ:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
เบเบท:
- เบเบฒเบเบเปเปเบกเบนเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเปเบญเบปเบฒเบเปเบฒเบชเบฑเปเบ,
- ID เบเบญเบเบเบญเบเบเบฐเบเบธเบกเบเปเปเบฒเบเปเบงเบกเบเบญเบเบเบงเบเปเบฎเบปเบฒ (เบกเบฑเบเบเบฐเปเบเบเบเปเบฒเบเบเบฑเบ เบชเปเบฒเบฅเบฑเบเบเบธเบเปเบงเบฝเบเบเบฒเบ),
- A hash เบเบฒเบเปเบซเบผเปเบเปเบฅเบฐเบเปเบฒเบชเบฑเปเบ ID - เบเบฑเปเบเบเบฑเปเบเปเบเบเบฒเบเบเปเปเบกเบนเบเบชเบธเบเบเปเบฒเบ (เบเปเบญเบเบเบตเปเบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเปเบกเปเบ poured เปเบเบปเปเบฒเปเบเปเบเบเบฒเบเบฐเบฅเบฒเบเบซเบเบถเปเบ) เบเบงเบเปเบฎเบปเบฒเบกเบต ID เบเปเบฒเบชเบฑเปเบเปเบเบฑเบเปเบญเบเบฐเบฅเบฑเบ.
เบเบฑเปเบเบเบญเบเบชเบธเบเบเปเบฒเบเบเบฑเบเบเบปเบเบขเบนเป: เปเบญเบปเบฒเบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเปเบเบปเปเบฒเปเบเปเบ Vertica. เปเบฅเบฐ, oddly เบเบฝเบเบเป, เบซเบเบถเปเบเปเบเบงเบดเบเบตเบเบตเปเบซเบเปเบฒเบเบฐเบเบฑเบเปเบเบเบตเปเบชเบธเบเปเบฅเบฐเบเบฐเบชเบดเบเบเบดเบเบฒเบเบเบตเปเบเบฐเปเบฎเบฑเบเบเบตเปเปเบกเปเบเบเปเบฒเบ CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- เบเบงเบเปเบฎเบปเบฒเบชเปเบฒเบเปเบเบทเปเบญเบเบฎเบฑเบเบเบดเปเบชเบ
StringIO
. pandas
เบเปเบงเบเบเบงเบฒเบกเบเบฐเบฅเบธเบเบฒเบเบฐเปเบฎเบฑเบเปเบซเปเบเบงเบเปเบฎเบปเบฒDataFrame
เปเบเบฑเบCSV
- เบชเบฒเบ.- เปเบซเปเปเบเบตเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบเบฑเบ Vertica เบเบตเปเบเบงเบเปเบฎเบปเบฒเบกเบฑเบเบเปเบงเบ hook.
- เปเบฅเบฐเปเบเบเบฑเบเบเบธเบเบฑเบเบเปเบงเบเบเบฒเบเบเปเบงเบเปเบซเบผเบทเบญ
copy()
เบชเบปเปเบเบเปเปเบกเบนเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเปเบเบเบเบปเบเบซเบฒ Vertika!
เบเบงเบโเปเบฎเบปเบฒโเบเบฐโเปเบญเบปเบฒโเบเบฒเบโเบเบปเบโเบเบฑเบโเบงเปเบฒโเบกเบตโเปเบเบงโเปเบโเบเบตเปโเปเบเบฑเบกโเปเบโ, เปเบฅเบฐโเบเบญเบโเบเบนเปโเบเบฑเบโเบเบฒเบโเบเบญเบโเบเบฐโเบเบธเบกโเบงเปเบฒโเบเบธเบโเบชเบดเปเบโเบเบธเบโเบขเปเบฒเบโเปเบกเปเบ OKโ:
session.loaded_rows = cursor.rowcount
session.successful = True
เบซเบกเบปเบโเปเบเบปเปเบฒโเบเบตเป.
เปเบเบเบฒเบเบเบฒเบ, เบเบงเบเปเบฎเบปเบฒเบชเปเบฒเบเปเบเปเบเปเบเบปเปเบฒเบซเบกเบฒเบเบเปเบงเบเบเบปเบเปเบญเบ. เบเบตเปเบเบตเปเบเปเบญเบเบญเบฐเบเบธเบเบฒเบเปเบซเปเบเบปเบเปเบญเบเบกเบตเปเบเบทเปเบญเบเบเบฑเบเบเบฐเบซเบเบฒเบเบเปเบญเบ:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
เบเปเบญเบเบเปเบฒเบฅเบฑเบเปเบเป
VerticaOperator()
เบเปเบญเบเบชเปเบฒเบ schema เบเบฒเบเบเปเปเบกเบนเบเปเบฅเบฐเบเบฒเบเบฐเบฅเบฒเบ (เบเปเบฒเบเบงเบเปเบเบปเบฒเบเปเปเบกเบตเบขเบนเปเปเบฅเปเบง, เปเบเปเบเบญเบ). เบชเบดเปเบเบเบตเป เบชเบณ เบเบฑเบเปเบกเปเบเบเบฒเบเบเบฑเบเปเบเบเบเบฒเบเปเบเบดเปเบเบเบฒเบญเบฒเปเบชเบขเปเบฒเบเบเบทเบเบเปเบญเบ:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> load
Summing up
- เบเบต, - เบซเบเบนเบเปเบญเบเปเบงเบปเปเบฒเบงเปเบฒ, - เบเปเปเปเบกเปเบเบกเบฑเบ, เปเบเบเบฑเบเบเบธเบเบฑเบ
เปเบเบปเปเบฒเปเบฑเปเบเปเบเบเปเบงเปเบฒเบเปเบญเบเปเบเบฑเบเบชเบฑเบเบเบตเปเบเบตเปเบฎเปเบฒเบเบเบตเปเบชเบธเบเปเบเบเปเบฒ?
Julia Donaldson, The Gruffalo
เบเปเบญเบเบเบดเบเบงเปเบฒเบเปเบฒเปเบเบทเปเบญเบเบฎเปเบงเบกเบเบฒเบเบเบญเบเบเปเบญเบเปเบฅเบฐเบเปเบญเบเบกเบตเบเบฒเบเปเบเปเบเบเบฑเบ: เบเบนเปเบเบตเปเบเบฐเบชเปเบฒเบเปเบฅเบฐเปเบเบตเบเบเบปเบงเบเบฐเบเบงเบเบเบฒเบ ETL เบขเปเบฒเบเปเบงเบงเบฒเบเบฒเบเบเบธเบเปเบฅเบตเปเบกเบเบปเปเบ: เบเบงเบเปเบเบปเบฒเบเบฑเบ SSIS เปเบฅเบฐเบซเบเบนเปเบฅเบฐเบเปเบญเบเบเบฑเบ Airflow ... เปเบฅเบฐเบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบเบงเบเปเบฎเบปเบฒเบเบฑเบเบเบฐเบเบฝเบเบเบฝเบเบเบงเบฒเบกเบเปเบฒเบเบเบญเบเบเบฒเบเบเปเบฒเบฅเบธเบเบฎเบฑเบเบชเบฒ ... เบงเปเบฒเบง, เบเปเบญเบเบเบดเบเบงเปเบฒเปเบเบปเปเบฒเบเบฐเปเบซเบฑเบเบเบตเบงเปเบฒเบเปเบญเบเบเบฐเบเบตเบเบงเบเปเบเบปเบฒเปเบเบเบธเบเบเปเบฒเบ!
เบเปเบฒเบซเบฒเบเบงเปเบฒเปเบฅเบฑเบเบเปเบญเบเบซเบผเบฒเบ, เบซเบผเบฑเบเบเบฒเบเบเบฑเปเบ Apache Airflow - เปเบเบเบเบฒเบเบญเบฐเบเบดเบเบฒเบเบเบฐเบเบงเบเบเบฒเบเปเบเบฎเบนเบเปเบเบเบเบญเบเบฅเบฐเบซเบฑเบเปเบเบเบเบฒเบ - เปเบเปเปเบฎเบฑเบเบงเบฝเบเบเบญเบเบเปเบญเบ. เบซเบผเบฒเบ เบชเบฐเบเบงเบเบชเบฐเบเบฒเบ เปเบฅเบฐเบกเบตเบเบงเบฒเบกเบชเบธเบเบซเบผเบฒเบเบเบถเปเบ.
เบเบฒเบเบเบฐเบซเบเบฒเบเบเบตเปเบเปเปเบเปเบฒเบเบฑเบเบเบญเบเบกเบฑเบ, เบเบฑเบเปเบเปเบเปเบเบญเบ plug-ins เปเบฅเบฐ predisposition เบเบฑเบ scalability, เปเบฎเบฑเบเปเบซเปเบเปเบฒเบเบกเบตเปเบญเบเบฒเบเบเบตเปเบเบฐเบเปเบฒเปเบเป Airflow เปเบเปเบเบทเบญเบเบเบธเบเบเบทเปเบเบเบตเป: เปเบเบดเบเปเบกเปเบเบงเปเบฒเปเบเบงเบปเบเบเบญเบเปเบเบฑเบกเบเบญเบเบเบฒเบเปเบเบฑเบเบเปเบฒ, เบเบฒเบเบเบฐเบเบฝเบกเปเบฅเบฐเบเบฒเบเบเบฐเบกเบงเบเบเบปเบเบเปเปเบกเบนเบ, เปเบเบดเบเปเบกเปเบเบงเปเบฒเปเบเบเบฒเบเปเบเบตเบเบเบปเบงเบเบฐเบฅเบงเบ (เปเบ Mars, เบเบญเบ. เบซเบผเบฑเบเบชเบนเบ).
เบชเปเบงเบเบชเบธเบเบเปเบฒเบ, เบเปเปเบกเบนเบเบญเปเบฒเบเบญเบตเบ เปเบฅเบฐเบเปเปเบกเบนเบ
rake เบเบตเปโเบเบงเบโเปเบฎเบปเบฒโเปเบเปโเปเบเบฑเบโเบเปเบฒโเบชเปเบฒโเบฅเบฑเบโเบเปเบฒเบโ
start_date
. เปเบกเปเบเปเบฅเปเบง, เบเบตเปเปเบกเปเบ meme เบเปเบญเบเบเบดเปเบเปเบฅเปเบง. เบเบฒเบเปเบเปเบเบฝเบเบซเบผเบฑเบเบเบญเบ Via Dougstart_date
เบเปเบฒเบเบเบฑเบเปเบปเบ. เปเบเบเบซเบเปเป, เบเปเบฒเบเปเบฒเบเบฅเบฐเบเบธเปเบstart_date
เบงเบฑเบเบเบตเบเบฐเบเบธเบเบฑเบ, เปเบฅเบฐschedule_interval
- เบกเบทเปเบซเบเบถเปเบ, เบซเบผเบฑเบเบเบฒเบเบเบฑเปเบ DAG เบเบฐเปเบฅเบตเปเบกเบเบปเปเบเปเบเบกเบทเปเบญเบทเปเบเบเปเปเบกเบตเบเปเบญเบเบซเบเปเบฒเบเบตเป.start_date = datetime(2020, 7, 7, 0, 1, 2)
เปเบฅเบฐเบเปเปเบกเบตเบเบฑเบเบซเบฒเบซเบผเบฒเบ.
เบกเบตเบเบงเบฒเบกเบเบดเบเบเบฒเบ runtime เบญเบทเปเบเบเบตเปเบเปเบฝเบงเบเปเบญเบเบเบฑเบเบกเบฑเบ:
Task is missing the start_date parameter
, เปเบเบดเปเบเบชเปเบงเบเบซเบผเบฒเบเบกเบฑเบเบเบฐเบเบตเปเปเบซเปเปเบซเบฑเบเบงเปเบฒเปเบเบปเปเบฒเบฅเบทเบกเบเบนเบเบกเบฑเบเบเบฑเบเบเบปเบงเบเบฐเบเบดเบเบฑเบเบเบฒเบ dag.- เบเบฑเบเปเบปเบเปเบเปเบเบทเปเบญเบเบเบฝเบง. เปเบกเปเบเปเบฅเปเบง, เปเบฅเบฐเบเบทเปเบเบเบฒเบ (Airflow เบเบปเบงเบเบญเบเบกเบฑเบเปเบญเบเปเบฅเบฐเบเบฒเบเปเบเบทเบญเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ), เปเบฅเบฐเปเบเบทเปเบญเบเปเบกเปเบเปเบฒเบเปเบงเบฑเบเปเบเบเป, เปเบฅเบฐเบเบฒเบเบฐเบฅเบฒเบ, เปเบฅเบฐเบเบฐเบเบฑเบเบเบฒเบ. เปเบฅเบฐเปเบเบดเบเปเบกเปเบเบงเปเบฒเบกเบฑเบเปเบฎเบฑเบเบงเบฝเบ. เปเบเปเปเบเปเบฅเบเบฐเปเบงเบฅเบฒ, เบเปเบฒเบเบงเบเบซเบเปเบฒเบงเบฝเบเบชเปเบฒเบฅเบฑเบเบเบฒเบเบเปเบฅเบดเบเบฒเบเปเบเบตเปเบกเบเบถเปเบ, เปเบฅเบฐเปเบกเบทเปเบญ PostgreSQL เปเบฅเบตเปเบกเบเบญเบเบชเบฐเบซเบเบญเบเบเบฑเบเบเบฑเบเบชเบฐเบเบตเปเบ 20 s เปเบเบเบเบตเปเบเบฐเปเบเบฑเบ 5 ms, เบเบงเบเปเบฎเบปเบฒเปเบญเบปเบฒเบกเบฑเบเปเบ.
- LocalExecutor. เปเบกเปเบเปเบฅเปเบง, เบเบงเบเปเบฎเบปเบฒเบเบฑเบเบเบฑเปเบเบขเบนเปเปเบเบดเบเบกเบฑเบ, เปเบฅเบฐเบเบงเบเปเบฎเบปเบฒเปเบเปเบกเบฒเบฎเบญเบเบเบญเบเบเบญเบเปเบซเบงเปเบฅเบดเบเปเบฅเปเบง. LocalExecutor เปเบเปเบเบฝเบเบเปเบชเปเบฒเบฅเบฑเบเบเบงเบเปเบฎเบปเบฒเบเบปเบเปเบเบดเบเบเบฐเบเบธเบเบฑเบ, เปเบเปเบเบญเบเบเบตเปเบกเบฑเบเปเบเบดเบเปเบงเบฅเบฒเบเบตเปเบเบฐเบเบฐเบซเบเบฒเบเบเบฑเบเบเบฐเบเบฑเบเบเบฒเบเบขเปเบฒเบเบซเบเปเบญเบเบซเบเบถเปเบเบเบปเบ, เปเบฅเบฐเบเบงเบเปเบฎเบปเบฒเบเบฐเบเปเบญเบเปเบฎเบฑเบเบงเบฝเบเบซเบเบฑเบเปเบเบทเปเบญเบเปเบฒเบเปเบ CeleryExecutor. เปเบฅเบฐเปเบเบเบฑเบเบชเบฐเบเบฐเบเบญเบเบเบงเบฒเบกเบเบดเบเบเบตเปเบงเปเบฒเบเปเบฒเบเบชเบฒเบกเบฒเบเปเบฎเบฑเบเบงเบฝเบเบเบฑเบเบกเบฑเบเบขเบนเปเปเบเปเบเบทเปเบญเบเบเบฝเบง, เบเปเปเบกเบตเบซเบเบฑเบเบขเบธเบเปเบเบปเปเบฒเบเบฒเบเบเบฒเบเปเบเป Celery เปเบเบดเบเปเบกเปเบเบงเปเบฒเบขเบนเปเปเบเปเบเบทเปเบญเบเปเบกเปเบเปเบฒเบ, เปเบเบดเปเบ "เปเบเปเบเบญเบ, เบเบฐเบเปเปเปเบเบปเปเบฒเปเบเปเบเบเบฒเบเบเบฐเบฅเบดเบ, เบเปเบงเบเบเบงเบฒเบกเบเบทเปเบชเบฑเบ!"
- เบเปเปเปเบเป เปเบเบทเปเบญเบโเบกเบทโเปเบโเบเบปเบงโ:
- เบเบฒเบเปเบเบทเปเบญเบกเบเปเป เปเบเบทเปเบญโเปเบเบฑเบโเบฎเบฑเบโเบชเบฒโเปเบโเบขเบฑเปเบโเบขเบทเบโเบเบฒเบโเบเปโเบฅเบดโเบเบฒเบโ,
- SLA Misses เบเบญเบโเบชเบฐโเบซเบเบญเบโเบงเบฝเบโเบเบฒเบโเบเบตเปโเบเปเปโเปเบเปโเบเบฑเบโเบเบฒเบโ,
- xcom เบชเปเบฒเบฅเบฑเบเบเบฒเบเปเบฅเบเบเปเบฝเบ metadata (เบเปเบฒเบเบฐเปเบเบปเปเบฒเปเบงเบปเปเบฒ เปเบกเบเบฒเบเปเปเบกเบนเบ!) เบฅเบฐเบซเบงเปเบฒเบเบงเบฝเบเบเบฒเบ dag.
- เบเบฒเบเบฅเปเบงเบเบฅเบฐเปเบกเบตเบเบเบฒเบเปเบเบชเบฐเบเบต. เบเบต, เบเปเบญเบเบชเบฒเบกเบฒเบเปเบงเบปเปเบฒเบซเบเบฑเบเปเบเป? เบเบฒเบเปเบเบทเบญเบเปเบเปเบเบทเบเบชเปเบฒเบเบเบฑเปเบเบเบถเปเบเบชเปเบฒเบฅเบฑเบเบเบฒเบเบเปเบฒเบเบเบทเบเบเบฑเบเบซเบกเบปเบเบเบญเบเบงเบฝเบเบเบฒเบเบเบตเปเบซเบผเบธเบเบฅเบปเบ. เบเบฝเบงเบเบตเป Gmail เบงเบฝเบเบเบญเบเบเปเบญเบเบกเบต > 90k เบญเบตเปเบกเบงเบเบฒเบ Airflow, เปเบฅเบฐ web mail muzzle เบเบฐเบเบดเปเบชเบเบเบตเปเบเบฐเปเบญเบปเบฒเปเบฅเบฐเบฅเบถเบเบซเบผเบฒเบเบเบงเปเบฒ 100 เบชเบฐเบเบฑเบเบเปเปเบเบฑเปเบ.
เบเบธเบกโเบซเบผเบฒเบโ:
Apache Airflow Pitfails
เปเบเบทเปเบญเบเบกเบทเบญเบฑเบเบเบฐเปเบเบกเบฑเบเปเบเบตเปเบกเปเบเบตเบก
เปเบเบทเปเบญเปเบซเปเบเบงเบเปเบฎเบปเบฒเปเบฎเบฑเบเบงเบฝเบเบซเบผเบฒเบเบเบถเปเบเบเปเบงเบเบซเบปเบงเบเบญเบเบเบงเบเปเบฎเบปเบฒเปเบฅเบฐเบเปเปเปเบกเปเบเบเปเบงเบเบกเบทเบเบญเบเบเบงเบเปเบฎเบปเบฒ, Airflow เปเบเปเบเบฐเบเบฝเบกเบชเปเบฒเบฅเบฑเบเบเบงเบเปเบฎเบปเบฒเบเบตเป:
REST API - เบฅเบฒเบงเบเบฑเบเบกเบตเบชเบฐเบเบฒเบเบฐเบเบญเบ Experimental, เปเบเบดเปเบเบเปเปเปเบเปเบเปเบญเบเบเบฑเบเบเปเปเปเบซเปเบฅเบฒเบงเปเบฎเบฑเบเบงเบฝเบ. เบเปเบงเบเบกเบฑเบ, เบเปเบฒเบเบเปเปเบเบฝเบเปเบเปเบชเบฒเบกเบฒเบเปเบเปเบฎเบฑเบเบเปเปเบกเบนเบเบเปเบฝเบงเบเบฑเบ dags เปเบฅเบฐเบงเบฝเบเบเบฒเบ, เปเบเปเบเบฑเบเบขเบธเบ / เปเบฅเบตเปเบกเบเบปเปเบ dag, เบชเปเบฒเบ DAG Run เบซเบผเบทเบชเบฐเบเบธเบเปเบเบต.CLI - เปเบเบทเปเบญเบเบกเบทเบเปเบฒเบเบงเบเบซเบผเบฒเบเบชเบฒเบกเบฒเบเปเบเปเปเบเปเบเปเบฒเบเปเบชเบฑเปเบเบเปเบฒเบชเบฑเปเบเบเบตเปเบเปเปเบเบฝเบเปเบเปเบเปเปเบชเบฐเบเบงเบเปเบเบเบฒเบเบเปเบฒเปเบเปเบเปเบฒเบ WebUI, เปเบเปเปเบเบเบเบปเปเบงเปเบเปเบฅเปเบงเปเบกเปเบเบเปเปเบกเบต. เบเบปเบโเบเบปเบงโเบขเปเบฒเบ:backfill
เบเปเบญเบเบเบฒเบเปเบเบทเปเบญเบเบดเบเปเบเบตเบเปเปเบฒเบงเบฝเบเบเบทเบเปเปเป.
เบเบปเบงเบขเปเบฒเบ, เบเบฑเบเบงเบดเปเบเบฒเบฐเบกเบฒเปเบฅเบฐเปเบงเบปเปเบฒเบงเปเบฒ: "เปเบฅเบฐเบเปเบฒเบ, comrade, เบกเบตเบเบงเบฒเบกเบเปเปเบกเบตเปเบซเบเบเบปเบเปเบเบเปเปเบกเบนเบเบเบฒเบเบงเบฑเบเบเบต 1 เบซเบฒ 13 เบกเบฑเบเบเบญเบ! เปเบเปเปเบ, เปเบเปเปเบ, เปเบเปเปเบ, เปเบเปเปเบเบกเบฑเบ!" เปเบฅเบฐเบเปเบฒเบเปเบเบฑเบ hob เบเบฑเปเบเบเปเบฒเบง:airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- เบเปเบฅเบดเบเบฒเบเบเบทเปเบเบเบฒเบ:
initdb
,resetdb
,upgradedb
,checkdb
. run
, เปเบเบดเปเบเบญเบฐเบเบธเบเบฒเบเปเบซเปเบเปเบฒเบเบชเบฒเบกเบฒเบเบเปเบฒเปเบเบตเบเบงเบฝเบเบเบฒเบเบเบปเบงเบขเปเบฒเบเบซเบเบถเปเบ, เปเบฅเบฐเปเบกเปเบเบฐเบเบฑเปเบเบเบฐเปเบเบเบเปเบฝเบงเบเบฑเบเบเบฒเบเบเบถเปเบเบเบฑเบเบเบฑเบเบซเบกเบปเบ. เบเบดเปเบเปเบเบเบงเปเบฒเบเบฑเปเบ, เบเปเบฒเบเบชเบฒเบกเบฒเบเบเปเบฒเปเบเบตเบเบเบฒเบเบเปเบฒเบLocalExecutor
, เปเบเบดเบเปเบกเปเบเบงเปเบฒเบเปเบฒเบเบกเบตเบเบธเปเบก Celery.- เปเบฎเบฑเบเบชเบดเปเบเบเบฝเบงเบเบฑเบเบซเบผเบฒเบ
test
, เบเบฝเบเปเบเปเบเบฑเบเบขเบนเปเปเบเบเบฒเบเบเบฝเบเบเปเปเบกเบตเบซเบเบฑเบ. connections
เบญเบฐเบเบธเบเบฒเบเปเบซเปเบเบฒเบเบชเปเบฒเบเบกเบฐเบซเบฒเบเบปเบเบเบญเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบเบฒเบเปเบเบฐ.
python api - เปเบเบฑเบเบงเบดเบเบตเบเบตเปเบเบฒเบเบซเบผเบฒเบเบเบญเบเบเบฒเบเปเบเปเบเบญเบ, เปเบเบดเปเบเบกเบตเบเบธเบเบเบฐเบชเบปเบเบชเปเบฒเบฅเบฑเบ plugins, เปเบฅเบฐเบเปเป swarming เปเบเบกเบฑเบเบเปเบงเบเบกเบทเบเบฝเบเปเบฅเบฑเบเบเปเบญเบ. เปเบเปโเปเบโเบเบฐโเบขเบธเบโเบเบงเบโเปเบฎเบปเบฒโเบเบฒเบโเบเบฒเบโเปเบ/home/airflow/dags
, เปเบฅเปเบipython
เปเบฅเบฐเปเบฅเบตเปเบก messing เบเบฐเบกเบฒเบ? เบเปเบฒเบเบชเบฒเบกเบฒเบ, เบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบปเบเบเบปเบงเบขเปเบฒเบ, เบชเบปเปเบเบญเบญเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบเบฑเบเบซเบกเบปเบเบเบตเปเบกเบตเบฅเบฐเบซเบฑเบเบเบฑเปเบเบเปเปเปเบเบเบตเป:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d)
- เบเบณเบฅเบฑเบเปเบเบทเปเบญเบกเบเปเปเบเบฑเบเบเบฒเบเบเปเปเบกเบนเบเปเบกเบเบฒเบเบญเบ Airflow. เบเปเบญเบเบเปเปเปเบเบฐเบเปเบฒเปเบซเปเบเบฝเบเบกเบฑเบ, เปเบเปเบเบฒเบเปเบเปเบฎเบฑเบเบฅเบฑเบเบงเบฝเบเบชเปเบฒเบฅเบฑเบ metrics เบชเบฐเปเบเบฒเบฐเบเปเบฒเบเปเบชเบฒเบกเบฒเบเปเบงเปเบฅเบฐเบเปเบฒเบเบเบงเปเบฒเบเปเบฒเบ APIs เปเบ.
เปเบซเปเปเบงเบปเปเบฒเบงเปเบฒเบเปเปเปเบกเปเบเบงเบฝเบเบเบฒเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเบเบฑเบเบซเบกเบปเบเปเบกเปเบเบเปเปเบกเบตเบเบงเบฒเบกเปเบเบฑเปเบกเปเบเบ, เปเบเปเบเบฒเบเบเบฑเปเบเบเบงเบเปเบเบปเบฒเบชเบฒเบกเบฒเบเบฅเบปเปเบกเบฅเบปเบ, เปเบฅเบฐเบเบตเปเปเบกเปเบเปเบฅเบทเปเบญเบเบเบปเบเบเบฐเบเบด. เปเบเปเบเบฒเบเบเบฑเบเบเบงเบฒเบเบเปเบฒเบเบงเบเบซเบเปเบญเบเปเบกเปเบเบชเบปเบเปเบชเปเบฅเปเบง, เปเบฅเบฐเบกเบฑเบเบเปเบฒเปเบเบฑเบเบเปเบญเบเบเบงเบเปเบเบดเปเบ.
เบฅเบฐเบงเบฑเบ SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
เปเบญเบเบฐเบชเบฒเบ
เปเบฅเบฐเปเบเปเบเบญเบ, เบชเบดเบเปเบเบทเปเบญเบกเบเปเปเบเปเบฒเบญเบดเบเบเบฒเบเบเบฒเบเบญเบญเบเบเบญเบ Google เปเบกเปเบเปเบเบทเปเบญเปเบเบเบญเบเปเบเปเบเบต Airflow เบเบฒเบ bookmarks เบเบญเบเบเปเบญเบ.
เปเบญเบเบฐเบชเบฒเบ Apache Airflow - เปเบเปเบเบญเบ, เบเบงเบเปเบฎเบปเบฒเบเปเบญเบเปเบฅเบตเปเบกเบเบปเปเบเบเปเบงเบเบซเปเบญเบเบเบฒเบ. เปเบญเบเบฐเบชเบฒเบ, เปเบเปเบงเปเบฒเปเบเบญเปเบฒเบเบเปเบฒเปเบเบฐเบเปเบฒ?เบเบฒเบโเบเบฐโเบเบดโเบเบฑเบโเบเบตเปโเบเบตโเบเบตเปโเบชเบธเบ - เบเบต, เบขเปเบฒเบเบซเบเปเบญเบเบญเปเบฒเบเบเปเบฒเปเบเบฐเบเปเบฒเบเบฒเบเบเบนเปเบชเปเบฒเบ.UI Airflow - เบเบธเบโเปเบฅเบตเปเบกโเบเบปเปเบโ: เบเบฒเบโเปเบเปโเบเบญเบโเบเบนเปโเปเบเปโเปเบโเบฎเบนเบโเบเบฒเบโเบเบงเบฒเบกเปเบเบปเปเบฒเปเบเปเบเบงเบเบงเบฒเบกเบเบดเบเบเบตเปเบชเปเบฒเบเบฑเบเบเบญเบ Apache Airflow - เปเบเบงเบเบงเบฒเบกเบเบดเบเบเบทเปเบเบเบฒเบเปเบเปเบเบทเบเบญเบฐเบเบดเบเบฒเบเปเบเปเบเบต, เบเปเบฒ (เบเบฑเบเบเบตเบเบฑเบเปเบ!) เปเบเบปเปเบฒเบเปเปเปเบเบปเปเบฒเปเบเบเบฒเบเบชเบดเปเบเบเบฒเบเบขเปเบฒเบเบเบฒเบเบเปเบญเบ.Blog เบเบญเบ Tianlong โ เบเบนเปเบกเบทเบเปเบฝเบงเบเบฑเบเบงเบดเบเบตเบเบฒเบเบชเปเบฒเบ Airflow Server/Cluster - เบเบนเปเบกเบทเบชเบฑเปเบเปเบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบฑเปเบเบเบธเปเบก Airflow.เปเบฅเปเบ Apache Airflow เบขเบนเป Lyft - เปเบเบทเบญเบเบเบปเบเบเบงเบฒเบกเบเบตเปเบซเบเปเบฒเบชเบปเบเปเบเบเบฝเบงเบเบฑเบ, เบเบปเบเปเบงเบฑเปเบเบเบฒเบเบเบตเบญเบฒเบเปเบเบฑเบเบเบฒเบเบเบฒเบเบซเบผเบฒเบ, เปเบฅเบฐเบเบปเบงเบขเปเบฒเบเบซเบเปเบญเบ.เบงเบดเบเบตเบเบฒเบ Apache Airflow เปเบเบเบขเบฒเบเบงเบฝเบเบเปเบฝเบงเบเบฑเบเบเบฐเบเบฑเบเบเบฒเบ Celery โ เบเปเบฝเบงโเบเบฑเบโเบเบฒเบโเปเบฎเบฑเบโเบงเบฝเบโเบฎเปเบงเบกโเบเบฑเบ Celeryโ.DAG เบเบฝเบเบเบฒเบเบเบฐเบเบดเบเบฑเบเบเบตเปเบเบตเบเบตเปเบชเบธเบเปเบ Apache Airflow - เบเปเบฝเบงเบเบฑเบ ideempotency เบเบญเบเบงเบฝเบเบเบฒเบ, เบเบฒเบเปเบซเบผเบเปเบเบ ID เปเบเบเบเบตเปเบเบฐเปเบเบฑเบเบงเบฑเบเบเบต, เบเบฒเบเบซเบฑเบเปเบเบฑเบ, เปเบเบเบชเปเบฒเบเปเบเบฅเปเปเบฅเบฐเบชเบดเปเบเบเบตเปเบซเบเปเบฒเบชเบปเบเปเบเบญเบทเปเบเป.เบเบฒเบเบเบธเปเบกเบเบญเบเบเบฒเบเปเบเบดเปเบเบเบฒเบญเบฒเปเบชเปเบ Apache Airflow - เบเบงเบฒเบกเปเบเบดเปเบเบเบฒเบญเบฒเปเบชเบเบญเบเบงเบฝเบเบเบฒเบเปเบฅเบฐเบเบปเบเบฅเบฐเบเบฝเบเบเบปเบเบเบฐเบเบปเบเบเปเป, เบเบตเปเบเปเบฒเบเบฐเปเบเบปเปเบฒเปเบเปเบเปเบฒเบงเปเบเบดเบเบเบฝเบเปเบเปเปเบเบเบฒเบเบเปเบฒเบเบเบญเบ.Airflow: เปเบกเบทเปเบญ DAG เบเบญเบเปเบเบปเปเบฒเบขเบนเปเปเบเบซเบฅเบฑเบเบเบฒเบเบฐเบฅเบฒเบ - เบงเบดเบเบตเบเบฒเบเปเบญเบปเบฒเบเบฐเบเบฐเบเบฒเบ "เปเบฎเบฑเบเบงเบฝเบเบเบฒเบกเบเบธเบเบเบฐเบชเบปเบ" เปเบเบเบปเบงเบเปเบฒเบเบปเบเปเบงเบฅเบฒ, เปเบซเบผเบเบเปเปเบกเบนเบเบชเบนเบเปเบชเบเปเบฅเบฐเบเบฑเบเบฅเปเบฒเบเบฑเบเบเบงเบฒเบกเบชเปเบฒเบเบฑเบเบเบญเบเบงเบฝเบเบเบฒเบ.เบเปเบฒเบเบฒเบก SQL เบเบตเปเปเบเบฑเบเบเบฐเปเบซเบเบเบชเปเบฒเบฅเบฑเบ Apache Airflow โ เบเปเบฒเบเบฒเบก SQL เบเบตเปเปเบเบฑเบเบเบฐเปเบซเบเบเบเปเป metadata เบเบญเบ Airflow.เปเบฅเบตเปเบกเบเบปเปเบเบเบฑเบเบเบฐเบเบฒเบเบฑเปเบเบเบญเบเบเบฒเบเปเบฎเบฑเบเบงเบฝเบเบเปเบงเบ Apache Airflow - เบกเบตเบชเปเบงเบเบเบตเปเปเบเบฑเบเบเบฐเปเบซเบเบเบเปเบฝเบงเบเบฑเบเบเบฒเบเบชเปเบฒเบเปเบเบฑเบเปเบเบตเบเบตเปเบเปเบฒเบซเบเบปเบเปเบญเบ.เบเบฒเบเบชเปเบฒเบ Fetchr Data Science Infra เปเบเบดเบ AWS เบเปเบงเบ Presto เปเบฅเบฐ Airflow โ เบเปเปเบชเบฑเบเปเบเบเบชเบฑเปเบเปเบเบตเปเปเปเบฒเบชเบปเบเปเบเบเปเบฝเบงเบเบฑเบเบเบฒเบเบชเปเบฒเบเปเบเบเบชเปเบฒเบเบเบทเปเบเบเบฒเบเปเบ AWS เบชเปเบฒเบฅเบฑเบเบงเบดเบเบฐเบเบฒเบชเบฒเบเบเปเปเบกเบนเบ.7 เบเบงเบฒเบกเบเบดเบเบเบฒเบเบเบปเปเบงเปเบเบเบตเปเบเบฐเบเบงเบเบชเบญเบเปเบเปเบงเบฅเบฒเบเบตเป Debugging Airflow DAGs - เบเบงเบฒเบกเบเบดเบเบเบฒเบเบเบปเปเบงเปเบ (เปเบเปเบงเบฅเบฒเบเบตเปเบเบนเปเปเบเบเบนเปเบซเบเบถเปเบเบเบฑเบเบเปเปเปเบเปเบญเปเบฒเบเบเปเบฒเปเบเบฐเบเปเบฒ).เปเบเบฑเบเบฎเบฑเบเบชเบฒเปเบฅเบฐเปเบเบปเปเบฒเปเบเบดเบเบฅเบฐเบซเบฑเบเบเปเบฒเบเปเบเบเปเบเป Apache Airflow - เบเบดเปเบกเบงเบดเบเบตเบเบตเปเบเบปเบเปเบฎเบปเบฒเปเบเบฑเบเบฅเบฐเบซเบฑเบเบเปเบฒเบ, เปเบเบดเบเปเบกเปเบเบงเปเบฒเบเปเบฒเบเบเบฝเบเปเบเปเบชเบฒเบกเบฒเบเปเบเปเบเบฒเบเปเบเบทเปเบญเบกเบเปเป.Zen เบเบญเบ Python เปเบฅเบฐ Apache Airflow - เบเบฒเบเบชเบปเปเบเบเปเป DAG implicit, context throwing เปเบเบซเบเปเบฒเบเบตเป, เบญเบตเบเปเบเบทเปเบญเบซเบเบถเปเบเบเปเบฝเบงเบเบฑเบเบเบฒเบเบเบถเปเบเบเบฑเบ, เปเบฅเบฐเบเบฑเบเบเปเบฝเบงเบเบฑเบเบเบฒเบเบเปเบฒเบกเบเบฒเบเปเบเบตเบเบเบปเบงเบงเบฝเบเบเบฒเบ.Airflow: เบเปเบฒเปเบเบฐเบเปเบฒเบเบตเปเบฎเบนเปเบเบฑเบเบซเบเปเบญเบ, เปเบเบฑเบเบฅเบฑเบ, เปเบฅเบฐเบเบฒเบเบเบฐเบเบดเบเบฑเบเบเบตเปเบเบตเบเบตเปเบชเบธเบ - เบเปเบฝเบงโเบเบฑเบโเบเบฒเบโเบเปเบฒโเปเบเปโdefault arguments
ะธparams
เปเบเปเบกเปเปเบเบ, เปเบเบฑเปเบเบเบฝเบงเบเบฑเบเบเบฑเบเบเบปเบงเปเบเปเบฅเบฐเบเบฒเบเปเบเบทเปเบญเบกเบเปเป.เบเบฒเบเบชเปเบฒเบเบเปเปเบกเบนเบเบเบฒเบเบฐเบฅเบฒเบเบเบฒเบเปเบซเบผเบงเบฝเบเบเบญเบเบญเบฒเบเบฒเบ - เปเบฅเบทเปเบญเบเบเปเบฝเบงเบเบฑเบเบงเบดเบเบตเบเบตเปเบเบนเปเบงเบฒเบเปเบเบเบเปเบฒเบฅเบฑเบเบเบฐเบเบฝเบกเบชเปเบฒเบฅเบฑเบ Airflow 2.0.Apache Airflow เบเบฑเบ 3 เบเบปเบเบเบฒเบ Celery เปเบ docker-compose - เบเบปเบเบเบงเบฒเบกเบเบตเปเบฅเปเบฒเบชเบฐเปเบซเบกเปเบฅเบฑเบเบเปเบญเบเบเปเบฝเบงเบเบฑเบเบเบฒเบเบเปเบฒเบเบธเปเบกเบเบญเบเบเบงเบเปเบฎเบปเบฒเปเบเบปเปเบฒเปเบdocker-compose
.4 Templating Tasks เปเบเบเปเบเป Airflow Context - เบงเบฝเบโเบเบฒเบโเปเบเบทเปเบญเบโเปเบซเบงโเปเบเบโเบเปเบฒโเปเบเปโเปเบกเปโเปเบเบโเปเบฅเบฐโเบเบฒเบโเบชเบปเปเบโเบเปเปโเบชเบฐโเบเบฒเบโเบเบฒเบโ.เบเบฒเบเปเบเปเบเปเบเบทเบญเบเบเบงเบฒเบกเบเบดเบเบเบฒเบเปเบ Airflow โ เบเบฒเบโเปเบเปเบโเบเบฒเบโเบกเบฒเบโเบเบฐโเบเบฒเบโเปเบฅเบฐโเบเปเบฒโเบเบปเบโเปเบญเบโเปเบเบ mail เปเบฅเบฐ Slackโ.Airflow Workshop: DAGs เบชเบฐเบฅเบฑเบเบชเบฑเบเบเปเบญเบเปเบเบเบเปเปเบกเบตเปเบกเปเบเปเบญเบเปเบเบปเปเบฒ - เบชเบฒเบเบฒเบงเบฝเบ, เบกเบฐเบซเบฒเบเบฒเบ เปเบฅเบฐ XCom.
เปเบฅเบฐเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบเบตเปเปเบเปเปเบเบเบปเบเบเบงเบฒเบก:
เบเบฒเบเบญเปเบฒเบเบญเบตเบเบกเบฐเบซเบฒเบเบฒเบ - placeholder เบกเบตโเบชเปเบฒโเบฅเบฑเบโเบเบฒเบโเบเปเบฒโเปเบเปโเปเบโเปเบกเปโเปเบเบโ.เบเบธเบกโเบเบปเปเบงโเปเบโเบเบฒเบโเปเบซเบผโเบเบญเบโเบญเบฒโเบเบฒเบ - เบเบงเบฒเบกเบเบดเบเบเบฒเบเบเบปเปเบงเปเบเปเบเปเบงเบฅเบฒเบชเปเบฒเบ dags.puckel/docker-airflow: Docker Apache Airflow -docker-compose
เบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบปเบเบฅเบญเบ, debugging เปเบฅเบฐเบญเบทเปเบเป.python-telegram-bot/python-telegram-bot: เบเบงเบเปเบฎเบปเบฒเปเบเปเบเบฑเปเบเปเบซเปเบเปเบฒเบเปเบเบฑเบ wrapper เบเบตเปเบเปเบฒเบเบเปเปเบชเบฒเบกเบฒเบเบเบฐเบเบดเปเบชเบเปเบเป. - Python wrapper เบชเปเบฒเบฅเบฑเบ Telegram REST API.
เปเบซเบผเปเบเบเปเปเบกเบนเบ: www.habr.com