Extending Spark with MLflow

Hello, Habr readers. As we have already written, this month OTUS is launching two machine learning courses at once: Basic and Advanced. In this connection, we continue to share useful material.

The purpose of this article is to talk about our first experience with MLflow.

We will start our review of MLflow with its tracking server and log all iterations of the study. Then we will share our experience of connecting Spark with MLflow using UDFs.

Context

At Alpha Health, we use machine learning and artificial intelligence to empower people to take care of their health and well-being. That is why machine learning models are at the core of the data products we develop, and that is why MLflow, an open-source platform that covers all aspects of the machine learning lifecycle, caught our attention.

MLflow

The main goal of MLflow is to provide an additional layer on top of machine learning that lets data scientists work with almost any machine learning library (h2o, keras, mleap, pytorch, sklearn, and tensorflow) and takes their work to the next level.

MLflow provides three components:

  • Tracking: recording and querying experiments (code, data, configuration, and results). Following the model-building process is very important.
  • Projects: a packaging format for running on any platform (for example, SageMaker).
  • Models: a common format for submitting models to various deployment tools.

MLflow (in alpha at the time of writing) is an open-source platform for managing the machine learning lifecycle, including experimentation, reuse, and deployment.

Setting up MLflow

To use MLflow, you first need to set up the whole Python environment; for this we will use PyEnv (for installing Python on a Mac, see here). This lets us create a virtual environment in which we install all the libraries needed to run MLflow.

```
# mkvirtualenv and workon are provided by virtualenvwrapper
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Install the required libraries:

```
pip install mlflow==0.7.0 \
            Cython==0.29 \
            numpy==1.14.5 \
            pandas==0.23.4 \
            pyarrow==0.11.0
```

Note: We use PyArrow to run models as UDFs. The PyArrow and NumPy versions had to be pinned because their latest versions conflicted with each other.
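Because these pins matter, it can help to verify them before launching anything. The sketch below is our own illustration, not part of MLflow: it compares installed versions with the pins from the pip command using the standard-library `importlib.metadata` (Python 3.8+; on the Python 3.7 environment used here, the `importlib_metadata` backport offers the same `version` function). The helper name `find_pin_mismatches` is hypothetical.

```python
from importlib import metadata

# Pins copied from the pip command above.
PINS = {
    "mlflow": "0.7.0",
    "Cython": "0.29",
    "numpy": "1.14.5",
    "pandas": "0.23.4",
    "pyarrow": "0.11.0",
}


def find_pin_mismatches(pins, get_version=metadata.version):
    """Return {package: (expected, found)} for every package that is missing
    (found is None) or whose installed version differs from its pin."""
    mismatches = {}
    for package, expected in pins.items():
        try:
            found = get_version(package)
        except metadata.PackageNotFoundError:
            found = None
        if found != expected:
            mismatches[package] = (expected, found)
    return mismatches
```

Running `print(find_pin_mismatches(PINS))` inside the `mlflow` virtual environment should print an empty dict when everything matches; the `get_version` parameter exists only so the check can be exercised without the real packages.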

Launching the Tracking UI

MLflow Tracking lets us log and query experiments using Python and the REST API. We can also define where model artifacts are stored (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage, or an SFTP server). Since we use AWS at Alpha Health, S3 will serve as our artifact storage.

```
# Running a Tracking Server
mlflow server \
    --file-store /tmp/mlflow/fileStore \
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ \
    --host localhost \
    --port 5000
```

MLflow recommends using persistent file storage. The file store is where the server keeps run and experiment metadata. When starting the server, make sure it points to a persistent file store. Here, for the sake of the experiment, we simply use /tmp.

Keep in mind that if we want to use the mlflow server to run old experiments, they must be present in the file store. However, even without them we could still use the models in a UDF, since all we need is the path to the model.
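The two model paths used later in this article (an S3 one and a local `/tmp` one) share the same shape: artifact root, experiment ID, run ID, `artifacts`, and the artifact name given to `log_model`. The sketch below composes such a path from its parts. `model_artifact_path` is a hypothetical helper based on that observed layout, not an MLflow function, so confirm the exact location of your runs in the Tracking UI.

```python
def model_artifact_path(artifact_root: str, experiment_id, run_id: str,
                        artifact_path: str = "model") -> str:
    """Join <artifact root>/<experiment id>/<run id>/artifacts/<artifact path>.

    Hypothetical helper: it reproduces the layout seen in this article's
    model paths and is not part of the MLflow API.
    """
    # Drop a trailing slash so the root joins cleanly.
    root = artifact_root.rstrip("/")
    return "/".join([root, str(experiment_id), run_id, "artifacts", artifact_path])
```

For example, `model_artifact_path("s3://<bucket>/mlflow/artifacts/", 1, "0f8691808e914d1087cf097a08730f17")` yields the same `model_path` string that is later passed to `mlflow.pyfunc.spark_udf`.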

Note: The Tracking UI and the model client must have access to the artifact location. That is, even though the Tracking UI lives on an EC2 instance, when running MLflow locally the machine needs direct access to S3 to write the model artifacts.
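To see which bucket and key prefix the client machine needs access to, it can be handy to split the artifact root into its parts. A minimal sketch with the standard library's `urllib.parse`; the helper `split_s3_uri` and the bucket name `my-bucket` are illustrative assumptions.

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple:
    """Split 's3://bucket/key/prefix' into ('bucket', 'key/prefix')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3:// URI: {uri!r}")
    # The network location is the bucket; the path minus its leading
    # slash is the key prefix under which artifacts are written.
    return parsed.netloc, parsed.path.lstrip("/")
```

With `split_s3_uri("s3://my-bucket/mlflow/artifacts/")` you get `("my-bucket", "mlflow/artifacts/")`, which is the bucket and prefix the local credentials must be allowed to write to.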

Figure: The Tracking UI stores artifacts in an S3 bucket.

Running models

As soon as the Tracking Server is running, we can start training models.

As an example, we will use a modification of the wine example from the MLflow sklearn examples.

```
MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py \
  --alpha 0.9 \
  --l1_ratio 0.5 \
  --wine_file ./data/winequality-red.csv
```
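The article does not show how `wine_quality.py` reads its flags. Below is a minimal, hypothetical sketch with `argparse` that accepts the three flags used above; the default values and help texts are our own assumptions (the help texts assume ElasticNet-style `alpha` and `l1_ratio` parameters, as in MLflow's own wine example).

```python
import argparse


def parse_args(argv=None):
    """Hypothetical command-line interface for wine_quality.py."""
    parser = argparse.ArgumentParser(
        description="Train a wine-quality model and log it to MLflow.")
    # The defaults below are assumptions made for this sketch.
    parser.add_argument("--alpha", type=float, default=0.5,
                        help="Overall regularization strength.")
    parser.add_argument("--l1_ratio", type=float, default=0.5,
                        help="Share of the L1 penalty in the regularization mix.")
    parser.add_argument("--wine_file", required=True,
                        help="Path to the semicolon-separated wine-quality CSV.")
    return parser.parse_args(argv)
```

Calling `parse_args()` with no argument reads `sys.argv`, so the shell command above would produce `alpha=0.9`, `l1_ratio=0.5`, and the given CSV path.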

As mentioned earlier, MLflow lets you log model parameters, metrics, and artifacts, so you can track how a model evolves across iterations. This is extremely useful because it lets us reproduce the best model, either by contacting the Tracking Server or by using the logged git commit hash to find out which code produced the required iteration.

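```
import mlflow
import mlflow.sklearn

# wine_path, alpha and l1_ratio come from the command-line arguments.
with mlflow.start_run():

    # ... train the model here; this produces the fitted estimator `lr`
    # and its evaluation metrics `rmse`, `r2` and `mae` ...

    # Data source and hyperparameters
    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    # Evaluation metrics
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    # Free-form tags, then the model itself stored as the run artifact "model"
    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")
```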
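The three metrics logged above are the standard regression metrics. For reference, here is a plain-Python sketch of their textbook definitions (root mean squared error, mean absolute error, and the coefficient of determination R²); scikit-learn's `mean_squared_error`, `mean_absolute_error` and `r2_score` compute the same quantities. The function name `regression_metrics` is ours, and returning NaN for R² when all actual values are equal is a choice made for this sketch.

```python
import math


def regression_metrics(actual, predicted):
    """Return (rmse, mae, r2) for two equal-length sequences of numbers."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]
    ss_res = sum(e * e for e in errors)              # residual sum of squares
    rmse = math.sqrt(ss_res / n)                     # root mean squared error
    mae = sum(abs(e) for e in errors) / n            # mean absolute error
    mean = sum(actual) / n
    ss_tot = sum((a - mean) ** 2 for a in actual)    # total sum of squares
    r2 = 1.0 - ss_res / ss_tot if ss_tot else float("nan")
    return rmse, mae, r2
```

For instance, with actual qualities `[5, 6, 7]` and predictions `[5, 6, 8]`, the only error is 1, so MAE is 1/3, RMSE is the square root of 1/3, and R² is 1 - 1/2 = 0.5.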

Figure: Wine iterations

Model backend

The MLflow tracking server, launched with the "mlflow server" command, has a REST API for tracking runs and writing data to the local file system. You can specify the tracking server address with the "MLFLOW_TRACKING_URI" environment variable, and the MLflow tracking API will automatically contact the tracking server at that address to create or fetch run information, log metrics, and so on.

Source: Docs // Running a Tracking Server
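The lookup order described above can be modelled in a few lines. This is a simplified sketch under our own assumptions, not MLflow's code: an explicitly set URI wins (in MLflow that is what `mlflow.set_tracking_uri` does), then `MLFLOW_TRACKING_URI`, and otherwise MLflow falls back to a local `./mlruns` directory. `resolve_tracking_uri` is a hypothetical name.

```python
import os

# MLflow's fallback when no tracking URI is configured: a local directory.
DEFAULT_LOCAL_URI = "./mlruns"


def resolve_tracking_uri(explicit_uri=None, environ=None):
    """Pick a tracking URI: explicit value, then MLFLOW_TRACKING_URI, then ./mlruns."""
    env = os.environ if environ is None else environ
    if explicit_uri:
        return explicit_uri
    return env.get("MLFLOW_TRACKING_URI") or DEFAULT_LOCAL_URI
```

With `MLFLOW_TRACKING_URI=http://localhost:5000` exported, as in the training command, the function returns the server address; with nothing set, runs would land in `./mlruns` instead of the tracking server.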

To serve the model, we need a running Tracking Server (see "Launching the Tracking UI") and the model's Run ID.

Figure: Run ID

```
# Serve a sklearn model through 127.0.0.1:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve \
  --port 5005 \
  --run_id 0f8691808e914d1087cf097a08730f17 \
  --model-path model
```

To serve models with MLflow's serve feature, we need access to the Tracking UI to retrieve information about the model, which we do simply by specifying --run_id.
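If you launch the model server from a Python script instead of a shell, the same invocation can be assembled as an argument list plus an environment. A sketch that uses only the flags shown in the command above (as used with MLflow 0.7); `build_serve_command` is hypothetical, and the run-ID check assumes the 32-character lowercase hexadecimal format of the run IDs in this article.

```python
import os
import re

# The run IDs in this article are 32 lowercase hexadecimal characters.
RUN_ID_RE = re.compile(r"^[0-9a-f]{32}$")


def build_serve_command(run_id, port=5005, model_path="model",
                        tracking_uri="http://localhost:5000", base_env=None):
    """Return (argv, env) equivalent to the `mlflow sklearn serve` command above."""
    if not RUN_ID_RE.match(run_id):
        raise ValueError(f"unexpected run ID format: {run_id!r}")
    argv = ["mlflow", "sklearn", "serve",
            "--port", str(port),
            "--run_id", run_id,
            "--model-path", model_path]
    # Copy the environment and point the client at the tracking server.
    env = dict(os.environ if base_env is None else base_env)
    env["MLFLOW_TRACKING_URI"] = tracking_uri
    return argv, env
```

The pair can then be handed to `subprocess.run(argv, env=env, check=True)`; passing a list avoids shell quoting issues.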

Once the model has contacted the Tracking Server, we get a new model endpoint that we can query.

```
# Query the model endpoint
curl -X POST \
  http://127.0.0.1:5005/invocations \
  -H 'Content-Type: application/json' \
  -d '[
    {
      "fixed acidity": 3.42,
      "volatile acidity": 1.66,
      "citric acid": 0.48,
      "residual sugar": 4.2,
      "chlorides": 0.229,
      "free sulfur dioxide": 19,
      "total sulfur dioxide": 25,
      "density": 1.98,
      "pH": 5.33,
      "sulphates": 4.39,
      "alcohol": 10.8
    }
  ]'
```

Response:

```
{"predictions": [5.825055635303461]}
```
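The same request can be sent from Python with only the standard library. A minimal sketch, assuming the model server started above is listening on `127.0.0.1:5005`; `build_invocation_request` and `predict` are hypothetical helpers, and the records use the wine dataset's column names as keys.

```python
import json
import urllib.request

DEFAULT_URL = "http://127.0.0.1:5005/invocations"


def build_invocation_request(rows, url=DEFAULT_URL):
    """Build (but do not send) a POST request carrying a JSON list of records."""
    body = json.dumps(rows).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(rows, url=DEFAULT_URL, timeout=10):
    """Send the records to the model server and decode its JSON reply."""
    request = build_invocation_request(rows, url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling `predict([...])` with the same record as the curl example should return a dictionary with a `predictions` list, provided the server is running.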

Running models in Spark

Although the Tracking Server is powerful enough to serve models in real time, train them, and use the serving functionality (source: mlflow // docs // models #local), using Spark (batch or streaming) is an even more powerful solution because the work is distributed.

Imagine that you have just done offline training and now want to apply the resulting model to all of your data. This is where Spark and MLflow show their strength.

Installing PySpark + Jupyter + Spark

Source: Get started PySpark - Jupyter

To show how we apply MLflow models to Spark DataFrames, we need to set up Jupyter notebooks to work with PySpark.

Start by downloading and installing the latest stable version of Apache Spark:

```
cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark
```

Install PySpark and Jupyter in the virtual environment:

```
pip install pyspark jupyter
```

Set up the environment variables:

```
export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"
```
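When `pyspark` is started from a script rather than an interactive shell, the same four variables can be passed as an environment dictionary. A sketch under that assumption; `pyspark_jupyter_env` is a hypothetical helper that mirrors the exports above, including expanding `~` the way the shell does in those assignments.

```python
import os
from pathlib import Path


def pyspark_jupyter_env(spark_home, notebook_dir, base_env=None):
    """Return an environment equivalent to the four exports above."""
    env = dict(os.environ if base_env is None else base_env)
    home = Path(spark_home).expanduser()
    env["SPARK_HOME"] = str(home)
    # Prepend $SPARK_HOME/bin to PATH, like `export PATH=$SPARK_HOME/bin:$PATH`.
    existing = env.get("PATH", "")
    env["PATH"] = os.pathsep.join(p for p in [str(home / "bin"), existing] if p)
    # Make `pyspark` start a Jupyter notebook server as its driver.
    env["PYSPARK_DRIVER_PYTHON"] = "jupyter"
    env["PYSPARK_DRIVER_PYTHON_OPTS"] = (
        f"notebook --notebook-dir={Path(notebook_dir).expanduser()}")
    return env
```

A possible use is `subprocess.run(["pyspark"], env=pyspark_jupyter_env("~/spark", "~/Projects/notebooks"))`, which leaves the parent shell's environment untouched.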

By setting notebook-dir, we can store our notebooks in whatever folder we like.

Launching Jupyter from PySpark

Since we configured Jupyter as the PySpark driver, we can now run Jupyter notebooks in a PySpark context.

```
(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
```


As mentioned above, MLflow can log model artifacts to S3. As soon as we have the selected model in hand, we can import it as a UDF using the mlflow.pyfunc module.

```
import mlflow.pyfunc

# `spark` is the SparkSession provided by the PySpark notebook.
model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'

# Wrap the logged model as a Spark UDF.
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

# The wine CSV has a header row and is semicolon-delimited.
df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)

# Feature columns, passed to the UDF in this order.
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]

# Add a prediction column computed by the model for every row.
df.withColumn('prediction', wine_udf(*columns)).show(100, False)
```
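`mlflow.pyfunc.spark_udf` wraps the model as a vectorized (pandas) UDF, so Spark hands the model whole batches of rows, transferred through Arrow (hence the PyArrow requirement noted earlier), rather than calling it once per row. The plain-Python sketch below is a toy model of that pattern, not MLflow's or Spark's implementation: `predict_batch` stands in for the model's `predict`, and the default batch size of 10,000 mirrors Spark's default for `spark.sql.execution.arrow.maxRecordsPerBatch`.

```python
from typing import Callable, Iterable, Iterator, List, Sequence

Row = Sequence[float]


def score_in_batches(rows: Iterable[Row],
                     predict_batch: Callable[[List[Row]], List[float]],
                     batch_size: int = 10_000) -> Iterator[float]:
    """Yield one prediction per row, calling predict_batch on chunks of rows.

    Toy model of a vectorized UDF: the model sees whole batches instead of
    single rows, and predictions come back in input order.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch: List[Row] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield from predict_batch(batch)
            batch = []
    # Flush the final, possibly shorter, batch.
    if batch:
        yield from predict_batch(batch)
```

Batching is what makes the approach cheap: the per-call overhead of invoking Python is paid once per batch instead of once per row.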

Figure: PySpark - wine quality prediction

So far, we have talked about using PySpark with MLflow by running wine quality prediction on the whole wine dataset. But what if you need to use Python MLflow modules from Scala Spark?

We tested this too, by splitting the Spark context between Scala and Python. That is, we registered the MLflow UDF in Python and used it from Scala (yes, it may not be the best solution, but it is what we have).

Scala Spark + MLflow

For this example, we will add the Toree kernel to the existing Jupyter.

Installing Spark + Toree + Jupyter

```
pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```

```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

As you can see in the attached notebook, the UDF is shared between Spark and PySpark. We hope this part will be useful for those who love Scala and want to deploy machine learning models to production.

```
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

// Matches a leading underscore.
val FirstAtRe: Regex = "^_".r
// Matches runs of whitespace, '_', '.', ':' and '@'. The triple-quoted
// string keeps the regex escape \s intact.
val AliasRe: Regex = """[\s_.:@]+""".r

// Turn a column name such as "fixed acidity" into "fixed_acidity".
def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

// Select the given columns, renaming each one to its normalized alias.
def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

// Normalize every column name of a DataFrame.
def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}
```

```
FirstAtRe = ^_
AliasRe = [\s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
```
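The normalization matters because the SQL cell further down refers to columns such as `fixed_acidity`, which would otherwise need backtick quoting because of the spaces. To make the renaming rule easy to check, here is a Python port of `getFieldAlias` and of the column mapping done by `normalizeSchema`; the names `get_field_alias` and `normalize_columns` are ours, and the port is meant to match the Scala regexes, not to replace them.

```python
import re

# Same patterns as FirstAtRe and AliasRe in the Scala cell.
FIRST_UNDERSCORE_RE = re.compile(r"^_")
ALIAS_RE = re.compile(r"[\s_.:@]+")


def get_field_alias(field_name: str) -> str:
    """Collapse runs of whitespace, '_', '.', ':' and '@' into a single '_',
    then drop one leading '_' (the order used by getFieldAlias)."""
    return FIRST_UNDERSCORE_RE.sub("", ALIAS_RE.sub("_", field_name))


def normalize_columns(columns):
    """Apply get_field_alias to every column name, keeping their order."""
    return [get_field_alias(c) for c in columns]
```

For example, `"free sulfur dioxide"` becomes `"free_sulfur_dioxide"`, while a name without separators such as `"pH"` is left unchanged.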
```
// Spark does not expand "~" in paths; an absolute path is safer in practice.
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
```

```
winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
```
```
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)
```

```
df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
```
```
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

# Register the UDF in the shared Spark session so Scala and SQL cells can call it by name.
spark.udf.register("wineQuality", wine_quality_udf)
```

```
<function spark_udf.<locals>.predict at 0x1116a98c8>
```
```
// Expose the Scala DataFrame to SQL as the "wines" view.
df.createOrReplaceTempView("wines")
```
```
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
```

```
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+
```
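Typing the eleven feature columns by hand is error-prone. Below is a sketch that builds an equivalent query from a column list, assuming the normalized names produced by `getFieldAlias`; `build_prediction_query` and `WINE_FEATURES` are hypothetical, and the helper rejects names that would need backtick quoting. In a PySpark cell, the resulting string could be passed to `spark.sql(...)`.

```python
import re

# Plain SQL identifiers only: letters, digits and underscores.
IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

# Normalized wine feature columns, in the order used in the SQL cell above.
WINE_FEATURES = [
    "fixed_acidity", "volatile_acidity", "citric_acid", "residual_sugar",
    "chlorides", "free_sulfur_dioxide", "total_sulfur_dioxide", "density",
    "pH", "sulphates", "alcohol",
]


def build_prediction_query(udf_name, features, table, label="quality", limit=10):
    """Return a SELECT that scores `table` with the registered UDF `udf_name`."""
    for name in [udf_name, table, label, *features]:
        if not IDENTIFIER_RE.match(name):
            raise ValueError(f"not a plain SQL identifier: {name!r}")
    args = ", ".join(features)
    return (f"SELECT {label}, {udf_name}({args}) AS prediction "
            f"FROM {table} LIMIT {int(limit)}")
```

Generating the column list from the same source as `normalizeSchema` keeps the SQL and the DataFrame schema from drifting apart.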

```
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)
```

```
+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+
```

Next steps

Although MLflow is in alpha at the time of writing, it looks quite promising. Just being able to run multiple machine learning frameworks and consume them from a single endpoint takes recommender systems to the next level.

In addition, MLflow brings data engineers and data scientists closer together by laying a common layer between them.

After this exploration of MLflow, we will move forward and use it for our Spark pipelines and recommender systems.

It would be nice to synchronize the file store with a database instead of the file system. This would give us multiple endpoints that can share the same file store, for example, multiple instances of Presto and Athena using the same Glue metastore.

To sum up, we would like to thank the MLflow community for making our work with data more interesting.

If you are using MLflow, feel free to write to us and tell us how you use it, and even more so if you use it in production.

Learn more about the courses:
Machine learning. Basic course
Machine learning. Advanced course


Source: habr.com
