Extending Spark with MLflow

Hello, Habr readers! As we have already mentioned, this month OTUS is launching two machine learning courses at once: Basic and Advanced. With that in mind, we are continuing to share useful material.

This article describes our first experience using MLflow.

We'll start with an overview of MLflow and its tracking server, and log every iteration of a study. Then we'll share our experience connecting Spark with MLflow using a UDF.

ื”ึถืงืฉืึตืจ

ืื ื—ื ื• ื‘ืคื ื™ื ืืœืคื ื‘ืจื™ืื•ืช ืื ื• ืžืฉืชืžืฉื™ื ื‘ืœืžื™ื“ืช ืžื›ื•ื ื” ื•ื‘ื™ื ื” ืžืœืื›ื•ืชื™ืช ื›ื“ื™ ืœื”ืขืฆื™ื ืื ืฉื™ื ืœืงื—ืช ืื—ืจื™ื•ืช ืขืœ ื‘ืจื™ืื•ืชื ื•ืจื•ื•ื—ืชื. ื–ื• ื”ืกื™ื‘ื” ืฉืžื•ื“ืœื™ื ืฉืœ ืœืžื™ื“ืช ืžื›ื•ื ื” ื ืžืฆืื™ื ื‘ืœื‘ ืžื•ืฆืจื™ ืžื“ืขื™ ื”ื ืชื•ื ื™ื ืฉืื ื• ืžืคืชื—ื™ื, ื•ื–ื• ื”ืกื™ื‘ื” ืฉื ืžืฉื›ื ื• ืœ-MLflow, ืคืœื˜ืคื•ืจืžืช ืงื•ื“ ืคืชื•ื— ื”ืžื›ืกื” ืืช ื›ืœ ื”ื”ื™ื‘ื˜ื™ื ืฉืœ ืžื—ื–ื•ืจ ื”ื—ื™ื™ื ืฉืœ ืœืžื™ื“ืช ืžื›ื•ื ื”.

MLflow

The main goal of MLflow is to provide an additional layer on top of machine learning. This layer lets data scientists work with almost any machine learning library (h2o, keras, mleap, pytorch, sklearn and tensorflow) and takes their work to the next level.

MLflow provides three components:

  • Tracking - recording and querying experiments: code, data, configuration and results. Tracking the model-building process is very important.
  • Projects - a packaging format for running on any platform (for example, SageMaker).
  • Models - a common format for sending models to various deployment tools.

MLflow (in alpha at the time of writing) is an open source platform for managing the machine learning lifecycle, including experimentation, reproducibility and deployment.

Setting up MLflow

To use MLflow, you first need to set up your Python environment. For this we'll use PyEnv (to install Python on a Mac, check here), together with virtualenvwrapper, which provides the mkvirtualenv and workon commands below. This lets us create a virtual environment with all the libraries MLflow needs.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

ื‘ื•ืื• ื ืชืงื™ืŸ ืืช ื”ืกืคืจื™ื•ืช ื”ื ื“ืจืฉื•ืช.

```
pip install mlflow==0.7.0 \
            Cython==0.29 \
            numpy==1.14.5 \
            pandas==0.23.4 \
            pyarrow==0.11.0
```

Note: we use PyArrow to run models as UDFs. We had to pin the versions of PyArrow and NumPy because their latest releases conflicted with each other.
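
Because the exact pins matter here, it can help to fail fast when an environment drifts from them. The following is a minimal sketch, not part of the original setup. `check_pins` is a hypothetical helper, and it assumes Python 3.8+ for `importlib.metadata`; the Python 3.7 used above would need the `importlib_metadata` backport. The `get_version` parameter exists only so the check can be exercised without the packages installed.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Callable, Dict, List

# Pins copied from the pip command above
PINS: Dict[str, str] = {
    "mlflow": "0.7.0",
    "Cython": "0.29",
    "numpy": "1.14.5",
    "pandas": "0.23.4",
    "pyarrow": "0.11.0",
}


def check_pins(pins: Dict[str, str],
               get_version: Callable[[str], str] = version) -> List[str]:
    """Return one message per package that is missing or installed at a
    version other than the pinned one (an empty list means all good)."""
    problems = []
    for name, wanted in pins.items():
        try:
            found = get_version(name)
        except PackageNotFoundError:
            problems.append(f"{name}: not installed (want {wanted})")
            continue
        if found != wanted:
            problems.append(f"{name}: found {found}, want {wanted}")
    return problems


if __name__ == "__main__":
    for problem in check_pins(PINS):
        print(problem)
```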

Launching the Tracking UI

MLflow Tracking lets us log and query experiments through its Python and REST APIs. You can also choose where to store model artifacts: localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage or an SFTP server. Since we use AWS at Alpha Health, our artifact store will be S3.

```
# Running a Tracking Server
mlflow server \
    --file-store /tmp/mlflow/fileStore \
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ \
    --host localhost \
    --port 5000
```

MLflow recommends using a persistent file store. The file store is where the server keeps run and experiment metadata, so when you start the server, make sure it points to persistent storage. Here, for a simple experiment, we'll just use /tmp, which is typically not kept across reboots.

Keep in mind that if we want the MLflow server to show old experiments, those experiments must be present in the file store. Even without them, we could still use the models in a UDF, because all we need is the path to the model.

Note: both the Tracking UI and the model client must have access to the artifact location. Even though the Tracking UI runs on an EC2 instance, a machine running MLflow locally needs direct access to S3 in order to write model artifacts.

Figure: Tracking UI storing artifacts in an S3 bucket

Running models

Once the Tracking server is running, you can start training models.

As an example, we'll use a modified version of the wine example from MLflow's sklearn examples.

```
MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py \
  --alpha 0.9 \
  --l1_ratio 0.5 \
  --wine_file ./data/winequality-red.csv
```

As discussed above, MLflow lets you log a model's parameters, metrics and artifacts so you can track how they evolve across iterations. This is extremely useful. It lets us reproduce the best model by going back to the Tracking server, or find out which code produced a given iteration from the logged git commit hash.

```
import mlflow
import mlflow.sklearn

with mlflow.start_run():

    # ... model ... (training elided; it produces the fitted model `lr`
    # and the evaluation metrics `rmse`, `r2` and `mae`)

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")
```
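
Picking "the best model" from a set of logged runs comes down to choosing the run with the best value of one metric. Below is a minimal, dependency-free sketch of that choice. It assumes each run is a plain dict with hypothetical `run_id`, `params` and `metrics` keys, which stand in for MLflow's own run objects rather than reproducing them.

```python
from typing import Dict, List, Optional


def best_run(runs: List[Dict], metric: str = "rmse",
             lower_is_better: bool = True) -> Optional[Dict]:
    """Return the run with the best value of `metric`.

    Runs that never logged `metric` are skipped; None is returned when
    no run has it.
    """
    scored = [r for r in runs if metric in r.get("metrics", {})]
    if not scored:
        return None

    def value(run: Dict) -> float:
        return run["metrics"][metric]

    return min(scored, key=value) if lower_is_better else max(scored, key=value)
```

Lower is better for error metrics such as rmse and mae. For r2, pass lower_is_better=False. Later MLflow releases can sort runs on the tracking side, for example with `mlflow.search_runs(order_by=["metrics.rmse ASC"])`. That API postdates the 0.7.0 release pinned above, so check whether your version has it.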

Figure: Wine iterations

Serving the model

The MLflow tracking server, started with the "mlflow server" command, exposes a REST API for tracking runs and writing data to the local file system. You can set the tracking server address through the "MLFLOW_TRACKING_URI" environment variable. The MLflow tracking API will then automatically contact the tracking server at that address to create or fetch run information, log metrics, and so on.

Source: Docs // Running a Tracking Server
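
To make the role of MLFLOW_TRACKING_URI concrete, here is a minimal standard-library sketch that builds, but does not send, the REST call for fetching a run's metadata. The helper name is hypothetical. The `/api/2.0/mlflow/runs/get` path and `run_id` parameter follow the current MLflow REST API reference. Early releases exposed the API under a `/api/2.0/preview/mlflow/` prefix and identified runs by `run_uuid`, so check the reference for your version.

```python
import os
import urllib.parse
import urllib.request


def get_run_request(run_id: str) -> urllib.request.Request:
    """Build a GET request for a run's metadata.

    The server address comes from MLFLOW_TRACKING_URI, falling back to the
    localhost:5000 server started earlier.
    """
    base = os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000").rstrip("/")
    query = urllib.parse.urlencode({"run_id": run_id})
    return urllib.request.Request(f"{base}/api/2.0/mlflow/runs/get?{query}", method="GET")
```

Against a running server, passing the returned request to `urllib.request.urlopen` returns a JSON body that describes the run.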

To serve a model, we need a running Tracking server (see Launching the Tracking UI) and the model's Run ID.

Figure: Run ID

```
# Serve a sklearn model through 127.0.0.1:5005
MLFLOW_TRACKING_URI=http://localhost:5000 mlflow sklearn serve \
  --port 5005 \
  --run_id 0f8691808e914d1087cf097a08730f17 \
  --model-path model
```

To serve models with MLflow's serving functionality, we need access to the Tracking UI so it can look up the model's information. All we have to pass is --run_id.

Once the model has contacted the Tracking server, we can query the new model endpoint.

```
# Query the model endpoint
curl -X POST \
  http://127.0.0.1:5005/invocations \
  -H 'Content-Type: application/json' \
  -d '[
    {
        "fixed acidity": 3.42,
        "volatile acidity": 1.66,
        "citric acid": 0.48,
        "residual sugar": 4.2,
        "chlorides": 0.229,
        "free sulfur dioxide": 19,
        "total sulfur dioxide": 25,
        "density": 1.98,
        "pH": 5.33,
        "sulphates": 4.39,
        "alcohol": 10.8
    }
]'

> {"predictions": [5.825055635303461]}
```
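
The same call can be made from Python using only the standard library. This is a minimal sketch: `invocation_request` and `predict` are hypothetical helpers, and the bare JSON list of records mirrors the curl payload above. Newer MLflow scoring servers (2.x) expect the records wrapped in an object such as `{"dataframe_records": [...]}`, so adjust the body to match your version.

```python
import json
import urllib.request
from typing import Dict, List

ENDPOINT = "http://127.0.0.1:5005/invocations"


def invocation_request(rows: List[Dict[str, float]],
                       url: str = ENDPOINT) -> urllib.request.Request:
    """Build the same POST as the curl call: a JSON list of records
    (one dict per wine) sent with a JSON content type."""
    body = json.dumps(rows).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST")


def predict(rows: List[Dict[str, float]], url: str = ENDPOINT):
    """Send the request and decode the JSON reply (requires the server above)."""
    with urllib.request.urlopen(invocation_request(rows, url)) as resp:
        return json.loads(resp.read().decode("utf-8"))
```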

Running models from Spark

The Tracking server is capable of serving models in real time once you have trained them and used its serving functionality (source: mlflow // docs // models #local). Even so, Spark (batch or streaming) is an even more powerful solution, because it distributes the work.

ืชืืจื• ืœืขืฆืžื›ื ืฉืคืฉื•ื˜ ื‘ื™ืฆืขืชื ืืช ื”ื”ื“ืจื›ื” ื‘ืžืฆื‘ ืœื ืžืงื•ื•ืŸ ื•ืื– ื”ื—ืœืชื ืืช ืžื•ื“ืœ ื”ืคืœื˜ ืขืœ ื›ืœ ื”ื ืชื•ื ื™ื ืฉืœื›ื. ื–ื” ื”ืžืงื•ื ืฉื‘ื• Spark ื•-MLflow ื–ื•ื”ืจื™ื.

Installing PySpark + Jupyter + Spark

Source: Get Started PySpark - Jupyter

ื›ื“ื™ ืœื”ืจืื•ืช ื›ื™ืฆื“ ืื ื• ืžื™ื™ืฉืžื™ื ืžื•ื“ืœื™ื ืฉืœ MLflow ืขืœ ืžืกื’ืจื•ืช ื ืชื•ื ื™ื ืฉืœ Spark, ืขืœื™ื ื• ืœื”ื’ื“ื™ืจ ืžื—ื‘ืจื•ืช Jupyter ื›ืš ืฉื™ืขื‘ื“ื• ื™ื—ื“ ืขื PySpark.

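Start by installing the latest stable release of Apache Spark (2.4.3 at the time of writing). The commands below assume the spark-2.4.3-bin-hadoop2.7.tgz archive has already been downloaded to ~/Downloads: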

```
cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark
```

ื”ืชืงืŸ ืืช PySpark ื•-Jupyter ื‘ืกื‘ื™ื‘ื” ื”ื•ื™ืจื˜ื•ืืœื™ืช:

```
pip install pyspark jupyter
```

Set the environment variables:

```
export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"
```

ืœืื—ืจ ืฉื ืงื‘ืข notebook-dir, ื ื•ื›ืœ ืœืื—ืกืŸ ืืช ื”ืžื—ื‘ืจื•ืช ืฉืœื ื• ื‘ืชื™ืงื™ื™ื” ื”ืจืฆื•ื™ื”.

Launching Jupyter from PySpark

Now that Jupyter is configured as the PySpark driver, we can launch Jupyter Notebook in a PySpark context.

```
(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
```


As mentioned above, MLflow can log model artifacts to S3. Once we have chosen a model, we can import it as a UDF using the mlflow.pyfunc module.

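```
import mlflow.pyfunc

# `spark` is the SparkSession provided by the notebook launched through pyspark
model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'

# Load the logged model as a (vectorized) Spark UDF
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

# The wine CSV is ';'-delimited; inferSchema reads the features as numbers, not strings
df = (spark.read.format("csv")
      .option("header", "true")
      .option("delimiter", ";")
      .option("inferSchema", "true")
      .load(wine_path))
columns = ["fixed acidity", "volatile acidity", "citric acid",
           "residual sugar", "chlorides", "free sulfur dioxide",
           "total sulfur dioxide", "density", "pH",
           "sulphates", "alcohol"]

# Score every row and show the first 100 predictions without truncation
df.withColumn('prediction', wine_udf(*columns)).show(100, False)
```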
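
The model_path above follows a regular layout: the artifact root passed to `mlflow server`, then the experiment ID, the run ID, `artifacts`, and the artifact path given to `log_model` ("model"). Here is a small sketch that assembles such a path. `model_artifact_uri` is a hypothetical helper that mirrors the paths in this article; it is not an MLflow API, and the layout may differ between MLflow versions. MLflow 1.0 and later also accept `runs:/<run_id>/model` URIs, which leave the lookup to the tracking server.

```python
def model_artifact_uri(artifact_root: str, experiment_id: str, run_id: str,
                       artifact_path: str = "model") -> str:
    """Join <artifact-root>/<experiment-id>/<run-id>/artifacts/<artifact-path>,
    tolerating a trailing '/' on the root."""
    return "/".join([artifact_root.rstrip("/"), experiment_id, run_id,
                     "artifacts", artifact_path.strip("/")])
```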

Figure: PySpark - wine quality prediction output

So far we've covered how to use PySpark with MLflow by running wine quality predictions on the entire wine dataset. But what if you need to use MLflow's Python modules from Scala Spark?

We tested that too, by sharing the Spark context between Scala and Python. We registered the MLflow UDF in Python and used it from Scala. Yes, it may not be the best solution, but it's what we have.

ืกืงืืœื” ืกืคืืจืง + MLflow

For this example, we'll add the Toree kernel to the existing Jupyter installation.

ื”ืชืงืŸ ืืช Spark + Toree + Jupyter

```
pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```

```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

As the attached notebook shows, the UDF is shared between Spark and PySpark. We hope this part will be useful for those who love Scala and want to deploy machine learning models to production.

```
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

// A leading "_" to strip, and runs of whitespace, "_", ".", ":" or "@" to collapse.
// Triple quotes keep the backslash in \s as-is, so the regex really matches whitespace.
val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = """[\s_.:@]+""".r

// "fixed acidity" -> "fixed_acidity", so the columns can be referenced from SQL
def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

// Rename every column of df to its normalized alias
def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}
```
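
The alias rule is easy to check outside Spark. Below is a Python port of `getFieldAlias`, offered as a sketch with hypothetical names. It applies the same two regular expressions in the same order.

```python
import re
from typing import Dict, List

# Same patterns as the Scala FirstAtRe / AliasRe
FIRST_AT_RE = re.compile(r"^_")
ALIAS_RE = re.compile(r"[\s_.:@]+")


def get_field_alias(field_name: str) -> str:
    """Collapse runs of whitespace, '_', '.', ':' and '@' into one '_',
    then drop a single leading '_'."""
    return FIRST_AT_RE.sub("", ALIAS_RE.sub("_", field_name))


def normalized_aliases(columns: List[str]) -> Dict[str, str]:
    """Map each original column name to its SQL-friendly alias."""
    return {c: get_field_alias(c) for c in columns}
```

In PySpark, the same renaming could be applied with `df.select([F.col(c).alias(get_field_alias(c)) for c in df.columns])`, assuming `from pyspark.sql import functions as F`.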

```
// Spark does not expand "~", so build the path from the home directory
val winePath = sys.props("user.home") + "/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
```

```
// Read the ';'-delimited CSV with numeric columns typed as numbers,
// then normalize the column names ("fixed acidity" -> "fixed_acidity")
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .option("inferSchema", "true")
              .load(winePath)
              .transform(normalizeSchema)
```

```
%%PySpark
import mlflow.pyfunc

# Wrap the logged model as a Spark UDF and register it under a name,
# so Scala and SQL code in the same Spark session can call it
model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
```

```
// Expose the normalized DataFrame to SQL as the "wines" view
df.createOrReplaceTempView("wines")
```

```
%%SQL
SELECT
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
```

```
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+
```

```
// Confirm that the Python UDF is visible from Scala as a temporary function
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)
```

```
+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+
```

Next steps

Although MLflow is in alpha at the time of writing, it looks quite promising. Being able to run multiple machine learning frameworks and consume them from a single endpoint already takes recommender systems to the next level.

MLflow also brings data engineers and data scientists closer together by giving them a common layer to work on.

ืœืื—ืจ ื—ืงื™ืจื” ื–ื• ืฉืœ MLflow, ืื ื• ื‘ื˜ื•ื—ื™ื ืฉื ืชืงื“ื ื•ื ืฉืชืžืฉ ื‘ื• ืขื‘ื•ืจ ืฆื™ื ื•ืจื•ืช Spark ื•ืžืขืจื›ื•ืช ื”ืžืžืœื™ืฆื™ื ืฉืœื ื•.

It would be nice to back the file store with a database instead of the file system. That should let multiple endpoints share the same file store, in the same way that multiple Presto and Athena instances can share one Glue metastore.

Finally, I want to thank the MLflow community for making our work with data more interesting.

If you are experimenting with MLflow, feel free to write to us and tell us how you use it, especially if you use it in production.

More about the courses:
Machine Learning. Basic course
Machine Learning. Advanced course


Source: www.habr.com
