Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a data engineer in the analytics department of the Vezet group of companies.

I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you don't work with data flows at all and simply need to launch processes periodically and monitor their execution.

And yes, I won't just tell, I'll show: the program includes plenty of code, screenshots and recommendations.

[Image: what you usually see when you google the word Airflow / Wikimedia Commons]

Introduction

Apache Airflow is just like Django:

  • it is written in Python,
  • it has a great admin panel,
  • it is infinitely extensible,

only better, and it was made for completely different purposes, namely:

  • running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience allow);
  • generating workflows dynamically from Python code that is very easy to write and understand;
  • connecting any databases and APIs to each other, using both ready-made components and home-made plugins (which are extremely simple to write).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (ours are Vertica and Clickhouse);
  • as an advanced cron, it launches the data consolidation processes on the ODS and also keeps an eye on their maintenance.

Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow terms, that is:

  • more than 200 dags (workflows, really, which we stuff with tasks),
  • about 70 tasks in each on average,
  • and all this goodness starts (also on average) once an hour.
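As a rough back-of-the-envelope check on those averages, the load can be estimated as below. The sketch assumes exactly 200 dags, 70 tasks per dag and one run per hour; the real figures are only approximate, so treat the result as an order of magnitude.

```python
# Rough load estimate from the averages quoted above (treated as exact here).
dags = 200            # the text says "more than 200", so this is a lower bound
tasks_per_dag = 70    # average
runs_per_hour = 1     # average

task_instances_per_hour = dags * tasks_per_dag * runs_per_hour
task_instances_per_day = task_instances_per_hour * 24

print(task_instances_per_hour)  # 14000
print(task_instances_per_day)   # 336000
```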

I'll describe how we scaled further down; for now, let's define the über-problem we are going to solve:

There are three source SQL Servers with 50 databases on each. The databases are instances of one project, so they all have the same structure (almost everywhere, mua-ha-ha), which means each of them has an Orders table (fortunately, a table with that name fits any business). We take the data, add service fields (source server, source database, ETL task ID) and naively load it into Vertica.
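To make the "service fields" idea concrete, here is a minimal sketch in plain Python. The function name, the column names (source_server, source_db, etl_id) and the sample rows are assumptions made for illustration; the actual field names used in the project are not given in the text.

```python
from typing import Dict, Iterable, Iterator


def add_service_fields(rows: Iterable[Dict], source_server: str,
                       source_db: str, etl_id: int) -> Iterator[Dict]:
    """Yield copies of the rows tagged with where they came from."""
    for row in rows:
        yield {**row,
               'source_server': source_server,   # hypothetical column names
               'source_db': source_db,
               'etl_id': etl_id}


# Pretend these rows were just read from one Orders table.
orders = [{'id': 1, 'amount': 10.5}, {'id': 2, 'amount': 7.0}]
tagged = list(add_service_fields(orders, 'mssql_0', 'project_17', etl_id=42))
print(tagged[0])
```

With these three extra columns, rows from every source database can share one target table and still be traced back to their origin and to the load that brought them in.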

Let's go!

The main part, practical (and a little theoretical)

Why we (and you) need it

Back when the trees were tall and I was a simple SQL guy at a Russian retailer, we cranked out ETL processes, a.k.a. data flows, with the two tools available to us:

  • Informatica Power Center: an extremely sprawling and extremely productive system with its own hardware and its own versioning. I used, God willing, 1% of its capabilities. Why? Well, first of all, its interface, which looked like it came from a bygone decade, put psychological pressure on us. Second, this contraption is designed for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. As for the fact that it costs about as much as the wing of an Airbus A380 per year, we'll say nothing.

    Careful: a screenshot of it can be slightly traumatic for people under 30.

  • SQL Server Integration Services: we used this comrade for our intra-project flows. And why not: we already used SQL Server, and it would have been somehow unreasonable not to use its ETL tools. Everything about it is nice: the interface is pretty, and there are progress reports... But that is not why we love software products. You can version a dtsx (which is XML with its nodes shuffled on every save), but what's the point? And how about building a package of tasks that drags a hundred tables from one server to another? Never mind a hundred: your index finger will fall off after twenty from clicking the mouse button. But it definitely looks more fashionable.

We certainly looked for a way out. It even almost came to a self-written SSIS package generator...

...and then a new job found me. And there, Apache Airflow caught up with me.

When I found out that ETL process descriptions are plain Python code, I all but danced for joy. That is how data flows became versionable and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code that fits on a couple of 13-inch screens.

Assembling the cluster

Let's not turn this into a kindergarten and talk about completely obvious things here, such as installing Airflow, your database of choice, Celery and the other cases described in the docs.

So that we can start experimenting right away, I sketched a docker-compose.yml in which:

  • we bring up Airflow itself: the Scheduler and the Webserver. Flower will also spin there to monitor Celery tasks (it is already bundled into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, where Airflow will write its service information (scheduler data, execution statistics and so on) and Celery will mark completed tasks;
  • Redis, which will act as the task broker for Celery;
  • a Celery worker, which will do the actual execution of tasks;
  • the ./dags folder, where we will put our files with dag descriptions. They are picked up on the fly, so there is no need to juggle the whole stack after every sneeze.

In some places the code in the examples is not shown in full (so as not to clutter the text), and in others it is modified along the way. Complete working examples can be found in the repository: https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In assembling the compose file I relied heavily on the well-known puckel/docker-airflow image; be sure to check it out. Maybe you won't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also (thanks to the developers) through environment variables, which I shamelessly took advantage of.
  • Naturally, it is not production-ready: I deliberately did not put heartbeats on the containers and did not bother with security. But I did the minimum that suits our experimenters.
  • Note that:
    • the dag folder must be accessible to both the scheduler and the workers;
    • the same goes for all third-party libraries: they must be installed on every machine that runs the scheduler or a worker.
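The environment variables in the compose file follow Airflow's AIRFLOW__{SECTION}__{KEY} naming pattern, where the section and key are the ones used in airflow.cfg. The two helpers below are hypothetical names written only to illustrate that mapping; they are not part of Airflow.

```python
def airflow_env_name(section: str, key: str) -> str:
    """Build the environment variable name that overrides [section] key."""
    return f'AIRFLOW__{section.upper()}__{key.upper()}'


def parse_airflow_env(name: str):
    """Split an AIRFLOW__SECTION__KEY name back into (section, key)."""
    prefix, section, key = name.split('__', 2)
    if prefix != 'AIRFLOW':
        raise ValueError(f'not an Airflow setting: {name}')
    return section.lower(), key.lower()


print(airflow_env_name('core', 'executor'))
print(parse_airflow_env('AIRFLOW__CELERY__BROKER_URL'))
```

So AIRFLOW__CORE__EXECUTOR: CeleryExecutor in the compose file plays the same role as executor = CeleryExecutor under [core] in airflow.cfg.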

Well, now it's simple:

$ docker-compose up --scale worker=3

Once everything is up, you can look at the web interfaces: the Airflow webserver on port 8080 and Flower on port 5555, as mapped in docker-compose.yml.

Basic concepts

If you didn't understand anything in all these "dags", here is a short dictionary:

  • Scheduler: the most important guy in Airflow, the one who makes sure that robots work hard rather than people. It watches the schedule, updates dags and launches tasks.

    In older versions it generally had memory problems (no, not amnesia, leaks), and the legacy parameter run_duration, its restart interval, even lingered in the configs. But now everything is fine.

  • DAG (a.k.a. "dag"): a "directed acyclic graph", though that definition tells few people anything. In essence it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.

    Besides dags there can also be subdags, but we most likely won't get to them.

  • DAG Run: an initialized dag that has been assigned its own execution_date. Dag runs of the same dag can happily work in parallel (if you made your tasks idempotent, of course).
  • Operator: a piece of code responsible for performing a specific action. There are three types of operators:
    • action, like our favorite PythonOperator, which can execute any (valid) Python code;
    • transfer, which moves data from place to place, for example MsSqlToHiveTransfer;
    • sensor, which lets you react to an event or hold back further execution of the dag until the event occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, let a transfer such as GoogleCloudStorageToS3Operator start. An inquisitive mind will ask: "Why? After all, you can do the retries right in the operator!" The answer: so as not to clog the task pool with hung operators. The sensor starts, checks and dies before the next attempt.
  • Task: declared operators, regardless of type, are promoted to the rank of task once they are attached to a dag.
  • Task instance: when the general-planner decides it is time to send tasks into battle on the performer-workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (that is, a set of variables: the execution parameters), expands the command or query templates, and pools them.
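The "starts, checks and dies" behaviour of a sensor can be modelled with a toy loop in plain Python. This is only a sketch of the idea, not Airflow's sensor classes: run_sensor, the poke callable and the fake status codes are all made up for illustration.

```python
from typing import Callable, Optional


def run_sensor(poke: Callable[[], bool], max_attempts: int) -> Optional[int]:
    """Call poke() once per attempt; return the attempt number that succeeded.

    Each attempt stands for one short-lived run of the sensor: it checks the
    condition and exits, freeing its slot until the next attempt.
    """
    for attempt in range(1, max_attempts + 1):
        if poke():
            return attempt
    return None  # the condition never became true


responses = iter([503, 503, 200])   # pretend endpoint status codes
ready_on = run_sensor(lambda: next(responses) == 200, max_attempts=5)
print(ready_on)  # 3
```

Only after the check finally succeeds would the downstream task (the transfer, in the example from the text) be allowed to start.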

Generating tasks

First we'll outline the general scheme of our dag, and then dive deeper and deeper into the details, because we apply some non-trivial solutions.

So, in its simplest form, such a dag looks like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's figure it out:

  • first, we import the libraries we need and a little something else;
  • sql_server_ds is a List[namedtuple[str, str]] holding the names of the connections from Airflow Connections and the databases from which we will take our table;
  • dag is the declaration of our dag. It must end up in globals(), otherwise Airflow won't find it. The dag also needs to be told:
    • that its name is orders; this name will then appear in the web interface,
    • that it will work starting from midnight on July 8,
    • and that it should run roughly every 6 hours (for the tough guys, a cron line such as 0 */6 * * * is accepted instead of timedelta(); for the less cool, an expression like @daily);
  • workflow() will do the main job, but not now. For now it just dumps our context into the log.
  • And now the simple magic of creating tasks:
    • we run through our sources;
    • we initialize a PythonOperator that will execute our dummy workflow(). Don't forget to give the task a unique (within the dag) name and to attach the dag itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect with **context.
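The commons.datasources module itself is not shown, so here is one way such a sql_server_ds list might be built. Everything in this sketch is an assumption: the DataSource name, the connection IDs (borrowed from the compose service names) and the database naming scheme are invented for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for commons.datasources; the real module is not shown.
DataSource = namedtuple('DataSource', 'conn_id schema')

servers = ['mssql_0', 'mssql_1', 'mssql_2']   # assumed Airflow connection IDs

# 50 databases per server, numbered globally so that every name is distinct.
sql_server_ds = [DataSource(server, f'project_{i * 50 + j:03d}')
                 for i, server in enumerate(servers) for j in range(50)]

# task_id=schema must be unique within a dag, so the names must not repeat.
task_ids = [schema for conn_id, schema in sql_server_ds]
assert len(task_ids) == len(set(task_ids))

print(len(sql_server_ds), sql_server_ds[0])
```

Because a namedtuple unpacks like a plain tuple, the loop `for conn_id, schema in sql_server_ds` from the dag works on this list unchanged.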

That's all for now. What we got:

  • a new dag in the web interface,
  • one task per source database, all of which will run in parallel (if the Airflow and Celery settings and the server capacity allow it).

Well, we almost got it.

[Image: Who is going to install the dependencies?]

To simplify the whole thing, I screwed the processing of requirements.txt on all nodes into docker-compose.yml.

Now we're off. In the web interface, the gray squares are task instances being processed by the scheduler.

We wait a little, and the tasks get snapped up by the workers.

The green ones, of course, have completed their work successfully. The red ones have not been very successful.

By the way, there is no ./dags folder on our prod, and no synchronization between machines: all dags live in git on our Gitlab, and Gitlab CI rolls updates out to the machines on every merge into master.

A little about Flower

While the workers are threshing our dummy tasks, let's recall another tool that can show us something: Flower.

Its very first page holds summary information on the worker nodes.

The most intense page lists the tasks that went to work.

The most boring page shows the status of our broker.

The brightest page has graphs of task statuses and execution times.

Loading the underloaded

So, all the tasks have run, and the wounded can be carried away.

And there were many wounded, for one reason or another. When Airflow is used correctly, these very squares indicate that the data definitely did not arrive.

You need to look at the log and restart the fallen task instances.

Clicking any square shows the actions available for it.

You can take a fallen one and Clear it. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Clearly, doing this with the mouse for every red square is not very humane, and not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances.

There we select everything at once and reset it by clicking the right item.

After the cleanup, our tasks are already waiting for the scheduler to schedule them.

Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good gentlemen, the reports are updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone done a report update at some point? This is it again: there is a list of sources to take the data from, there is a list of places to put it, and don't forget to honk when everything has happened or broken (well, that's not about us, no).

Let's go through the file again and look at the new obscure bits:

  • from commons.operators import TelegramBotSendMessage: nothing prevents us from making our own operators, which we took advantage of by writing a small wrapper for sending messages to Telegram. (We'll talk more about this operator below.)
  • default_args={}: a dag can hand the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}': the to field is not hard-coded but generated dynamically with Jinja and a variable holding a list of emails, which I carefully put into Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the letter flies to the bosses only if all dependencies have completed successfully;
  • tg_bot_conn_id='tg_main': conn_id arguments accept the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED: messages fly to Telegram only if there are fallen tasks;
  • task_concurrency=1: we forbid launching several task instances of one task at the same time. Otherwise we would get several VerticaOperator instances running at once (all looking at one table);
  • report_update >> [email, tg]: every VerticaOperator converges on sending a letter and a message.

    But since the notifier operators have different launch conditions, only one of them will fire. In the Tree View everything looks a little less visual.
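Why only one of the two notifiers fires can be seen from a simplified model of the two trigger rules. The functions below are illustrative stand-ins written in plain Python, not Airflow's implementation, and they look only at 'success' and 'failed' states.

```python
from typing import Iterable


def all_success(upstream_states: Iterable[str]) -> bool:
    """ALL_SUCCESS: run only if every upstream task succeeded."""
    return all(state == 'success' for state in upstream_states)


def one_failed(upstream_states: Iterable[str]) -> bool:
    """ONE_FAILED: run if at least one upstream task has failed."""
    return any(state == 'failed' for state in upstream_states)


good_run = ['success'] * 5
bad_run = ['success', 'failed', 'success', 'success', 'success']

print(all_success(good_run), one_failed(good_run))  # True False
print(all_success(bad_run), one_failed(bad_run))    # False True
```

For any set of finished upstream tasks in this model, exactly one of the two conditions holds, so either the email or the Telegram message goes out, never both.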

๋‚˜๋Š”์— ๋Œ€ํ•ด ๋ช‡ ๋งˆ๋”” ๋งํ•  ๊ฒƒ์ด๋‹ค ๋งคํฌ๋กœ ๊ทธ๋ฆฌ๊ณ  ๊ทธ๋“ค์˜ ์นœ๊ตฌ๋“ค - ๋ณ€์ˆ˜.

๋งคํฌ๋กœ๋Š” ๋‹ค์–‘ํ•œ ์œ ์šฉํ•œ ์ •๋ณด๋ฅผ ์—ฐ์‚ฐ์ž ์ธ์ˆ˜๋กœ ๋Œ€์ฒดํ•  ์ˆ˜ ์žˆ๋Š” Jinja ์ž๋ฆฌ ํ‘œ์‹œ์ž์ž…๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), so on a restart the placeholders expand to the same values.
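A minimal sketch of that expansion, assuming an execution_date of 2020-07-14 06:00. The string replacement here is a crude stand-in for Jinja, used only so the example runs without extra dependencies.

```python
from datetime import datetime

execution_date = datetime(2020, 7, 14, 6, 0)   # assumed for illustration

# {{ ds }} is the execution date formatted as YYYY-MM-DD.
context = {'ds': execution_date.strftime('%Y-%m-%d')}

query = "WHERE payment_dtm::DATE = '{{ ds }}'::DATE"

# Crude stand-in for Jinja: replace each '{{ name }}' with its context value.
rendered = query
for name, value in context.items():
    rendered = rendered.replace('{{ ' + name + ' }}', value)

print(rendered)  # WHERE payment_dtm::DATE = '2020-07-14'::DATE
```

Since the value comes from the task instance's own execution_date rather than from the wall clock, re-running the same instance a week later still renders 2020-07-14.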

The assigned values can be viewed with the Rendered button on each task instance, both for the task that sends the letter and for the task that sends the message.

The complete list of built-in macros for the latest available version is in the Macros Reference section of the Airflow documentation.

Moreover, with the help of plugins we can declare our own macros, but that's another story.

Besides the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of them in Admin/Variables.

That's it, now they can be used:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the key you need: {{ var.json.bot_config.bot.token }}.
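What such a dotted path does can be sketched with the standard json module. The lookup helper is a hypothetical name, and the token is written as a quoted string so that the value is valid JSON.

```python
import json
from functools import reduce

raw_value = '''
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
'''


def lookup(document: dict, path: str):
    """Follow a dotted path such as 'bot.token' through nested dicts."""
    return reduce(lambda node, key: node[key], path.split('.'), document)


bot_config = json.loads(raw_value)
print(lookup(bot_config, 'bot.token'))   # 881hskdfASDA16641
print(lookup(bot_config, 'service'))     # TG
```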

๋‚˜๋Š” ๋ฌธ์ž ๊ทธ๋Œ€๋กœ ํ•œ ๋‹จ์–ด๋ฅผ ๋งํ•˜๊ณ  ํ•˜๋‚˜์˜ ์Šคํฌ๋ฆฐ ์ƒท์„ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค. ์—ฐ๊ฒฐ. ์—ฌ๊ธฐ์—์„œ๋Š” ๋ชจ๋“  ๊ฒƒ์ด ๊ธฐ๋ณธ์ž…๋‹ˆ๋‹ค. ํŽ˜์ด์ง€์—์„œ Admin/Connections ์šฐ๋ฆฌ๋Š” ์—ฐ๊ฒฐ์„ ์ƒ์„ฑํ•˜๊ณ  ๊ฑฐ๊ธฐ์— ๋กœ๊ทธ์ธ/์•”ํ˜ธ ๋ฐ ๋ณด๋‹ค ๊ตฌ์ฒด์ ์ธ ๋งค๊ฐœ ๋ณ€์ˆ˜๋ฅผ ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค. ์ด์™€ ๊ฐ™์ด:

Apache Airflow: ETL์„ ๋” ์‰ฝ๊ฒŒ ๋งŒ๋“ค๊ธฐ

์•”ํ˜ธ๋ฅผ ์•”ํ˜ธํ™”ํ•˜๊ฑฐ๋‚˜(๊ธฐ๋ณธ๊ฐ’๋ณด๋‹ค ๋” ์ฒ ์ €ํ•˜๊ฒŒ) ์—ฐ๊ฒฐ ์œ ํ˜•์„ ์ƒ๋žตํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค(์˜ˆ: tg_main) - ์‚ฌ์‹ค์€ ์œ ํ˜• ๋ชฉ๋ก์ด Airflow ๋ชจ๋ธ์— ๊ณ ์ •๋˜์–ด ์žˆ๊ณ  ์†Œ์Šค ์ฝ”๋“œ์— ๋“ค์–ด๊ฐ€์ง€ ์•Š๊ณ ๋Š” ํ™•์žฅํ•  ์ˆ˜ ์—†๋‹ค๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค(๊ฐ‘์ž๊ธฐ Google์— ๋ฌด์–ธ๊ฐ€๋ฅผ ๊ฒ€์ƒ‰ํ•˜์ง€ ์•Š์€ ๊ฒฝ์šฐ ์ˆ˜์ •ํ•ด ์ฃผ์„ธ์š”). ์ด๋ฆ„.

You can also create several connections with the same name. In that case the method BaseHook.get_connection(), which fetches connections by name, returns a random one of the namesakes (round robin would be more logical, but let's leave that on the conscience of the Airflow developers).
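The behaviour described above can be imitated with a small in-memory registry. This is a toy model, not Airflow code: the connections dictionary, the URIs and this get_connection function are all hypothetical.

```python
import random
from collections import defaultdict

# Hypothetical registry: conn_id -> list of connection URIs sharing that name.
connections = defaultdict(list)
connections['dwh'].append('vertica://dwh-node-1:5433/dwh')
connections['dwh'].append('vertica://dwh-node-2:5433/dwh')
connections['tg_main'].append('https://api.telegram.org')


def get_connection(conn_id: str) -> str:
    """Return one of the connections registered under conn_id, at random."""
    candidates = connections.get(conn_id)
    if not candidates:
        raise KeyError(f"The conn_id `{conn_id}` isn't defined")
    return random.choice(candidates)


print(get_connection('dwh'))       # one of the two dwh nodes
print(get_connection('tg_main'))
```

Random choice spreads the load only on average; a round-robin scheme would alternate strictly, which is the point the text makes.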

Variables and Connections are certainly cool tools, but it is important not to lose the balance between the parts of your flows that you keep in the code itself and the parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, a mailbox for example, through the UI. On the other hand, this is still a return to the mouse click that we (I) wanted to get rid of.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are the points where it connects to third-party services and libraries. For example, JiraHook opens a client for interacting with Jira (you can move tickets back and forth), and with SambaHook you can push a local file to an smb share.

Dissecting a custom operator

And now we have come close to seeing how TelegramBotSendMessage is made.

The code in commons/operators.py, with the operator itself:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything else in Airflow, it is all very simple:

  • We inherited from BaseOperator, which implements quite a few Airflow-specific things (have a look at your leisure).
  • We declared template_fields, the fields in which Jinja will look for macros to process.
  • We arranged the right arguments for __init__() and set defaults where necessary.
  • We did not forget to initialize the ancestor either.
  • We opened the corresponding hook, TelegramBotHook, and received a client object from it.
  • We overrode the BaseOperator.execute() method, which Airflow will call when the time comes to run the operator. In it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr: Airflow intercepts everything, wraps it nicely and puts it where it belongs.)
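The interplay between template_fields and execute() can be illustrated with a toy model that needs no Airflow installation. MiniOperator, render and run_task are hypothetical names, and the string replacement is a crude stand-in for Jinja; the only point is the order of operations: the listed fields are rendered first, then execute() is called.

```python
class MiniOperator:
    """Toy operator: only attributes named in template_fields get rendered."""
    template_fields = ['chat_id', 'message']

    def __init__(self, chat_id, message, note):
        self.chat_id = chat_id
        self.message = message
        self.note = note          # not templated, stays exactly as written

    def execute(self, context):
        return f'Send "{self.message}" to the chat {self.chat_id}'


def render(value: str, context: dict) -> str:
    """Crude stand-in for Jinja: substitute '{{ key }}' placeholders."""
    for key, replacement in context.items():
        value = value.replace('{{ ' + key + ' }}', str(replacement))
    return value


def run_task(operator, context):
    """Render the templated fields, then call execute(), in that order."""
    for field in operator.template_fields:
        setattr(operator, field, render(getattr(operator, field), context))
    return operator.execute(context)


op = MiniOperator(chat_id='{{ chat }}', message='{{ dag_id }} failed',
                  note='{{ dag_id }}')
result = run_task(op, {'chat': '@alerts', 'dag_id': 'update_reports'})
print(result)    # Send "update_reports failed" to the chat @alerts
print(op.note)   # {{ dag_id }}
```

An attribute left out of template_fields, like note here, keeps its literal placeholder text.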

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here, so I'll just note the important points:

  • We inherit and think about the arguments. In most cases there will be just one: conn_id;
  • We override the standard methods. I limited myself to get_conn(), in which I fetch the connection parameters by name and take only the extra section (it is a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, giving it that specific token.

That's all. You can get the client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().

And here is the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of the single sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
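Two details of this wrapper are worth a small sketch. First, the relative method name is resolved against the base URL; to the best of my knowledge BaseUrlSession does this the same way as urllib.parse.urljoin, which is what the sketch uses. Second, with parse_mode set to MarkdownV2 the Bot API expects a fixed set of special characters in plain text to be escaped with a backslash; escape_markdown_v2 is a hypothetical helper and is not part of the original wrapper.

```python
import re
from urllib.parse import urljoin

# Characters the Telegram Bot API requires to be escaped in MarkdownV2 text.
MARKDOWN_V2_SPECIALS = r'_*[]()~`>#+-=|{}.!'


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 special character with a backslash."""
    pattern = f'([{re.escape(MARKDOWN_V2_SPECIALS)}])'
    return re.sub(pattern, r'\\\1', text)


base_url = 'https://api.telegram.org/bot{}/'.format('YOuRAwEsomeBOtToKen')

# The trailing slash in the base URL matters: without it, urljoin would
# replace the last path segment instead of appending the method name.
print(urljoin(base_url, 'sendMessage'))
print(escape_markdown_v2('update_reports failed :('))
```

A message such as "update_reports failed :(" contains both an underscore and a parenthesis, so sending it unescaped in MarkdownV2 mode would likely be rejected by the API.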

The proper way would be to put it all together (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, publish it in a public repository and give it to open source.

While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'll go and check whether something is wrong...

[Image: Something broke in our doge! Isn't that what we expected? Exactly!]

Are you going to load anything?

Feel like I missed something? He promised to transfer data from SQL Server to Vertica, and then went off on a tangent, the scoundrel!

This atrocity was intentional: I simply had to decipher some terminology for you. Now we can go further.

Our plan was this:

  1. Make a dag
  2. Generate tasks
  3. See how beautiful everything is
  4. Assign session numbers to loads
  5. Get data from SQL Server
  6. Put data into Vertica
  7. Collect statistics

So, to get all of this up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we bring up:

  • Vertica as the host dwh with the most default settings,
  • three instances of SQL Server,
  • a script that fills the databases in the latter with some data (under no circumstances look into mssql_init.py!)

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

What our miracle randomizer generated can be viewed under Data Profiling/Ad Hoc Query:

The main thing is not to show it to the analysts.

I won't go into detail about ETL sessions; everything there is trivial: we create a database with a table in it, wrap everything in a context manager, and now we do this:

with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None when the with-block finished without an exception
        if exc_type is not None:
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
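
To see the session bookkeeping in action without a PostgreSQL server, here is a minimal sketch of the same context-manager pattern. It is an assumption-laden stand-in, not the class above: SketchSession is a hypothetical name, an in-memory SQLite database replaces the real connection, and the table is reduced to four columns.

```python
import sqlite3


class SketchSession:
    """Hypothetical, simplified stand-in for the article's Session class."""

    def __init__(self, connection, task_name):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None

    def __enter__(self):
        # Create the bookkeeping table on first use, then open a row for this run.
        self.connection.execute(
            "CREATE TABLE IF NOT EXISTS sessions ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " task_name TEXT NOT NULL,"
            " successful INTEGER,"
            " loaded_rows INTEGER)"
        )
        cursor = self.connection.execute(
            "INSERT INTO sessions (task_name) VALUES (?)", (self.task_name,)
        )
        self.id = cursor.lastrowid
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # An exception inside the with-block marks the session as failed.
        if exc_type is not None:
            self.successful = False
        self.connection.execute(
            "UPDATE sessions SET successful = ?, loaded_rows = ? WHERE id = ?",
            (self.successful, self.loaded_rows, self.id),
        )
        self.connection.commit()
        return False  # never swallow the exception


connection = sqlite3.connect(":memory:")

# A load that finishes normally.
with SketchSession(connection, "orders") as session:
    session.loaded_rows = 15
    session.successful = True

# A load that blows up half way through.
try:
    with SketchSession(connection, "orders") as failing_session:
        raise RuntimeError("source is unavailable")
except RuntimeError:
    pass

rows = connection.execute(
    "SELECT id, task_name, successful, loaded_rows FROM sessions ORDER BY id"
).fetchall()
```

Both runs leave a row behind: the first is marked successful with its row count, the second is marked unsuccessful even though the load code never got the chance to say so itself.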

The time has come to collect the data from our tables. Let's do it with a few very unpretentious lines:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

  1. With the help of a hook, we get a pymssql connection from Airflow.
  2. We substitute a date restriction into the query; the template engine will pass the date into the function.
  3. We feed our query to pandas, which gets us a DataFrame; it will come in handy later.

I use the {dt} substitution instead of the query parameter %s not because I'm an evil Pinocchio, but because pandas can't get along with pymssql: it slips the latter params: List, while pymssql really wants a tuple.
Also note that the developer of pymssql has decided to stop supporting it, so it's time to move to pyodbc.
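
Because the date ends up directly in the SQL text, one cheap safeguard is to parse it before interpolating. The sketch below assumes the value arrives as an ISO date string such as '2020-07-07'; build_orders_query is a hypothetical helper, not something from the DAG itself.

```python
from datetime import date


def build_orders_query(dt: str) -> str:
    """Return the Orders query for one day; `dt` is an ISO date such as '2020-07-07'."""
    # date.fromisoformat raises ValueError for anything that is not a valid date,
    # so arbitrary SQL cannot sneak into the query text.
    day = date.fromisoformat(dt)
    return f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{day.isoformat()}'
    """
```

The query is built from the parsed date rather than from the raw string, so whatever reaches the database is always a normalized YYYY-MM-DD value.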

Let's see what Airflow filled our function's arguments with:

[Screenshot: the arguments Airflow passed to the function]

If there's no data, there's no point in continuing. But it would also be strange to consider the load successful. Yet it's not an error either. A-a-ah, what to do?! Here's what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there are no errors but that we are skipping the task. In the interface the square will be neither green nor red, but pink.

Let's add a few columns to our data:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:

  • the database we took the orders from,
  • the ID of our loading session (it will be different for each task),
  • a hash of the source and the order ID, so that the final database (where everything is poured into one table) has a unique order ID.

The penultimate step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is through CSV!
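
To make the idea behind hash_id concrete, here is a standard-library sketch of deriving one integer key from the pair (source, order ID). It is a different technique from the pandas hash_pandas_object call used above and will not produce the same numbers; make_hash_id is a hypothetical helper.

```python
import hashlib


def make_hash_id(etl_source: str, order_id: int) -> int:
    """Combine the source name and order ID into one non-negative 63-bit integer."""
    key = f"{etl_source}|{order_id}".encode("utf-8")
    digest = hashlib.blake2b(key, digest_size=8).digest()
    # Drop one bit so the value always fits a signed 64-bit integer column.
    return int.from_bytes(digest, "big") >> 1
```

The same order ID coming from two different source databases yields two different keys, while re-running the load for the same source and ID always yields the same key.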

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

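# Join the column names explicitly; interpolating the list itself would
# put Python's ['id', ...] repr into the statement.
columns = ', '.join(df.columns)
copy_stmt = f"""
    COPY {target_table}({columns})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """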

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

  1. We create a special receiver, StringIO.
  2. pandas kindly writes our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with the help of copy(), we send our data straight to Vertica!

From the driver we take the number of rows that were loaded and tell the session manager that everything is OK:
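
The buffer-and-COPY steps can be tried without pandas or Vertica. The sketch below uses the standard csv module in place of DataFrame.to_csv and only builds the statement text; the rows and the table name stage.orders are made up for illustration.

```python
import csv
from io import StringIO

columns = ["id", "type", "data"]
rows = [
    (1, 3, "plain"),
    (2, None, "with|pipe"),
]

# Write rows into an in-memory buffer using the same delimiter and NULL marker.
buffer = StringIO()
writer = csv.writer(buffer, delimiter="|", quotechar='"',
                    quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
for row in rows:
    writer.writerow(["NUL" if value is None else value for value in row])
buffer.seek(0)

# Build the column list as plain names, not as a Python list repr.
target_table = "stage.orders"  # hypothetical table name
copy_stmt = (
    f"COPY {target_table} ({', '.join(columns)}) "
    "FROM STDIN DELIMITER '|' ENCLOSED '\"' ABORT ON ERROR NULL 'NUL'"
)

csv_text = buffer.getvalue()
```

Note how the value containing the delimiter is wrapped in double quotes, which is what the ENCLOSED clause tells the database to expect.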

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production we create the target table manually. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

I use VerticaOperator() to create the database schema and the table (if they don't already exist, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up

- Well, - said the little mouse, - isn't it so, now
Are you convinced that I am the most terrible animal in the forest?

Julia Donaldson, The Gruffalo

I think that if my colleagues and I held a competition to see who could build and launch an ETL process from scratch faster, they with their SSIS and a mouse and I with Airflow... And then we also compared ease of maintenance... Wow, I think you'll agree that I would beat them on all fronts!

A little more seriously: by describing processes as program code, Apache Airflow has made my work much more comfortable and enjoyable.

Its unlimited extensibility, both in terms of plugins and of scalability, lets you use Airflow in almost any area: from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).

The final part: reference and information

The rakes we have collected for you

  • start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short: if you set start_date to the current date and schedule_interval to one day, the DAG will start no earlier than tomorrow.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is another runtime error associated with it: Task is missing the start_date parameter, which most often means that you forgot to bind the operator to the DAG.

  • Everything on one machine. Yes, the databases (Airflow's own and our wrapper's), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL began to answer an index lookup in 5 s instead of 20 ms, we took it and moved it out.
  • LocalExecutor. Yes, we are still sitting on it, and we have already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand by at least one worker, and we will have to work hard to move to CeleryExecutor. And since you can work with it on a single machine, nothing stops you from using Celery even on one server.
  • Not using the built-in tools:
    • Connections to store service credentials,
    • SLA Misses to respond to tasks that did not finish on time,
    • XCom to exchange metadata (I said metadata!) between DAG tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of a failed task. Now my work Gmail holds more than 90 emails from Airflow, and the web mail interface refuses to select and delete more than 100 at a time.
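
The start_date rake from the list above is easier to remember as arithmetic. The sketch below is a deliberately simplified model of interval-based scheduling as it works in the Airflow 1.10 line: a run covers one interval and is triggered only when that interval ends. It ignores cron alignment and catchup, and first_run_moment is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta


def first_run_moment(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """Moment at which the scheduler can create the first DAG run (simplified)."""
    # A run covers the interval [start_date, start_date + schedule_interval)
    # and is only triggered once that interval is over.
    return start_date + schedule_interval


start_date = datetime(2020, 7, 7)
first_run = first_run_moment(start_date, timedelta(days=1))
```

With a daily interval and today's date as start_date, the first run therefore lands on the following day, which is exactly the surprise described above.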

More pitfalls: Apache Airflow Pitfalls

More automation tools

So that we can work even more with our heads and less with our hands, Airflow has prepared the following for us:

  • REST API - it still has Experimental status, which does not stop it from working. With it you can not only get information about DAGs and tasks, but also stop/start a DAG, and create a DAG Run or a pool.
  • CLI - the command line exposes many tools that are not merely inconvenient to use through the WebUI but are absent from it altogether. For example:
    • backfill is needed to re-run task instances.
      For example, the analysts come and say: "Fix it, fix it, fix it, fix it!" And you just go:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Database maintenance: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance and even ignore all its dependencies. Moreover, you can run it through LocalExecutor even if you have a Celery cluster.
    • test does almost the same thing, except that it writes nothing to the database.
    • connections allows bulk creation of connections from the shell.
  • Python API - a rather hardcore way of interacting, meant for plugins rather than for poking around in it with your little hands. But who is going to stop us from going to /home/airflow/dags, launching ipython and starting to mess around? For example, you can export all connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadata database. I do not recommend writing to it, but pulling task states for various specific metrics can be much faster and easier than going through any of the APIs.

    Let's say that not all of our tasks are idempotent, and they may fail from time to time, which is normal. But a pile-up of several failures is already suspicious and worth checking.

    Beware, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
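
The trick in the query is the comparison of two row numbers: a failed run counts only if every newer run of the same task also failed, so the result is the length of the current unbroken failure streak. The same logic in plain Python, as a sketch with a hypothetical helper name and hand-written state lists:

```python
from itertools import takewhile

UNSUCCESSFUL = {"failed", "up_for_retry"}


def latest_failure_streak(states_newest_first):
    """Count unsuccessful runs at the head of a newest-first list of states."""
    return sum(1 for _ in takewhile(lambda s: s in UNSUCCESSFUL, states_newest_first))


# Two fresh failures, then a success: the older failure is not part of the streak.
streak = latest_failure_streak(["failed", "up_for_retry", "success", "failed"])

# The most recent run succeeded, so there is nothing to report.
no_streak = latest_failure_streak(["success", "failed", "failed"])
```

A task whose most recent run succeeded drops out of the report entirely, which matches the HAVING clause of the query.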

References

And, of course, the top links in Google's results are the contents of the Airflow folder in my bookmarks.

And the links used in the article:

Source: habr.com