Hello, Habr readers. As we have already written, this month OTUS is launching two machine learning courses at once.
The purpose of this article is to describe our first experience of using MLflow.
We will start the review with its tracking server and log every iteration of the study. Then we will share our experience of connecting Spark to MLflow using UDFs.
The main goal of MLflow is to provide an additional layer on top of machine learning that lets data scientists work with almost any machine learning library.
MLflow provides three components:
- Tracking - recording and querying experiments: code, data, configuration, and results. Keeping track of how a model was created is very important.
- Projects - a packaging format for running on any platform (e.g. SageMaker).
- Models - a common format for sending models to various deployment tools.
MLflow (in alpha at the time of writing) is an open source platform that lets you manage the machine learning lifecycle, including experimentation, reuse, and deployment.
Installing MLflow
To use MLflow, you first need to set up your whole Python environment; for this we will use PyEnv:
```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```
Install the required libraries:
```
pip install mlflow==0.7.0 \
    Cython==0.29 \
    numpy==1.14.5 \
    pandas==0.23.4 \
    pyarrow==0.11.0
```
Note: we use PyArrow to run models as UDFs. The PyArrow and Numpy versions had to be pinned because the latest versions conflicted with each other.
Launching the Tracking UI
MLflow Tracking lets us log and query experiments using Python and the REST API.
# Running a Tracking Server
mlflow server \
    --file-store /tmp/mlflow/fileStore \
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ \
    --host localhost \
    --port 5000
MLflow recommends using persistent file storage. The file store is where the server keeps run and experiment metadata. When starting the server, make sure it points to a persistent file store. Here, for the sake of the experiment, we will simply use /tmp.
Remember that if we want to use the mlflow server to run old experiments, they must be present in the file store. Even without that, however, we could still use them in a UDF, since we only need the path to the model.
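Since a model can be loaded from its artifact path alone, it helps to see how that path is put together. The sketch below assumes the layout visible in the model paths used later in this article (artifact root, experiment ID, run ID, "artifacts", model name); `model_artifact_uri` is a hypothetical helper written for illustration, not part of MLflow.

```python
def model_artifact_uri(artifact_root, experiment_id, run_id, model_name="model"):
    """Join the pieces of an artifact location into one URI (hypothetical helper)."""
    parts = [artifact_root.rstrip("/"), str(experiment_id), run_id, "artifacts", model_name]
    return "/".join(parts)


uri = model_artifact_uri(
    "s3://<bucket>/mlflow/artifacts/",  # same placeholder bucket as in the server command
    1,                                   # experiment ID
    "0f8691808e914d1087cf097a08730f17",  # run ID
)
print(uri)
```

The resulting string has the same shape as the `model_path` values passed to `mlflow.pyfunc.spark_udf` further down.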
Note: keep in mind that the Tracking UI and the model client must both have access to the artifact location. That is, even though the Tracking UI lives on an EC2 instance, when MLflow is run locally the machine must have direct access to S3 to write the model artifacts.
The Tracking UI stores artifacts in an S3 bucket
Running models
Once the Tracking server is running, you can start training models.
As an example, we will use a modified version of the wine example from MLflow:
MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py \
    --alpha 0.9 \
    --l1_ratio 0.5 \
    --wine_file ./data/winequality-red.csv
As we have already discussed, MLflow lets you log model parameters, metrics, and artifacts, so you can track how they evolve across iterations. This is extremely useful, because we can reproduce the best model by contacting the Tracking server, or work out which code produced a given iteration by using the git hash logs of commits.
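import mlflow
import mlflow.sklearn

with mlflow.start_run():
    # ... model ...
    # Parameters used for this run
    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)
    # Evaluation metrics
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)
    # Free-form tags for finding the run later
    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    # Store the fitted model as an artifact named "model"
    mlflow.sklearn.log_model(lr, "model")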
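To make the idea of picking out the best iteration concrete, the sketch below selects the run with the lowest RMSE from a list of logged runs. The `runs` list, its run IDs, and its metric values are made up for illustration and do not come from the tracking server; in practice they would be read from the Tracking UI or API.

```python
# Hypothetical run records: parameters and metrics as they might have been logged.
runs = [
    {"run_id": "run-a", "params": {"alpha": 0.9, "l1_ratio": 0.5}, "metrics": {"rmse": 0.83}},
    {"run_id": "run-b", "params": {"alpha": 0.5, "l1_ratio": 0.5}, "metrics": {"rmse": 0.79}},
    {"run_id": "run-c", "params": {"alpha": 0.1, "l1_ratio": 0.9}, "metrics": {"rmse": 0.81}},
]


def best_run(runs, metric="rmse"):
    """Return the run with the smallest value of the given metric."""
    return min(runs, key=lambda run: run["metrics"][metric])


best = best_run(runs)
print(best["run_id"], best["params"])
```

Lower is better for RMSE; for a metric such as R2 the comparison would need to use `max` instead.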
Wine iterations
Serving the model
The MLflow tracking server, launched with the "mlflow server" command, has a REST API for tracking runs and writing data to the local file system. You can specify the address of the tracking server with the "MLFLOW_TRACKING_URI" environment variable, and the MLflow tracking API will automatically contact the tracking server at that address to create or fetch run information, log metrics, and so on.
Source: Docs // Running a tracking server
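The idea of pointing a client at the server through an environment variable can be sketched as follows. `resolve_tracking_uri` and its local fallback value are assumptions made for this illustration; this is not MLflow's actual implementation.

```python
import os


def resolve_tracking_uri(environ=None, default="./mlruns"):
    """Return the tracking URI from the environment, or a local default.

    Hypothetical helper: the fallback value is an assumption for this sketch.
    """
    environ = os.environ if environ is None else environ
    uri = environ.get("MLFLOW_TRACKING_URI", "").strip()
    return uri or default


print(resolve_tracking_uri({"MLFLOW_TRACKING_URI": "http://localhost:5000"}))
print(resolve_tracking_uri({}))
```

Passing the mapping explicitly keeps the function easy to test; called without arguments it reads the real process environment.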
To serve the model, we need a running tracking server (see the launch section above) and the Run ID of the model.
Run ID
# Serve a sklearn model through 127.0.0.1:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve \
    --port 5005 \
    --run_id 0f8691808e914d1087cf097a08730f17 \
    --model-path model
To serve models with the MLflow serve functionality, we need access to the Tracking UI, so that the information about the model can be retrieved simply by specifying --run_id.
Once the model has contacted the Tracking server, we get a new model endpoint.
# Query the model server endpoint
curl -X POST \
    http://127.0.0.1:5005/invocations \
    -H 'Content-Type: application/json' \
    -d '[
    {
        "fixed acidity": 3.42,
        "volatile acidity": 1.66,
        "citric acid": 0.48,
        "residual sugar": 4.2,
        "chlorides": 0.229,
        "free sulfur dioxide": 19,
        "total sulfur dioxide": 25,
        "density": 1.98,
        "pH": 5.33,
        "sulphates": 4.39,
        "alcohol": 10.8
    }
]'
> {"predictions": [5.825055635303461]}
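The same request can be prepared from Python. This is a minimal sketch using only the standard library; it builds the request without sending it, and assumes the model server from the previous step is listening on 127.0.0.1:5005.

```python
import json
import urllib.request

# One record per wine, keyed by the feature names used in the dataset.
records = [{
    "fixed acidity": 3.42, "volatile acidity": 1.66, "citric acid": 0.48,
    "residual sugar": 4.2, "chlorides": 0.229, "free sulfur dioxide": 19,
    "total sulfur dioxide": 25, "density": 1.98, "pH": 5.33,
    "sulphates": 4.39, "alcohol": 10.8,
}]

request = urllib.request.Request(
    "http://127.0.0.1:5005/invocations",
    data=json.dumps(records).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

# Sending is left commented out because it needs a running model server:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
print(request.get_method(), request.full_url)
```

Keeping the payload as a list of records means several wines can be scored in one call by appending more dictionaries.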
Running models from Spark
Although the Tracking server is powerful enough to serve models in real time, to train them, and to use the serving functionality, using Spark (batch or streaming) is an even more powerful solution because of its distributed nature.
Imagine that you simply did the training offline and then applied the resulting model to all of your data. This is where Spark and MLflow shine.
Installing PySpark + Jupyter + Spark
Source: Get started PySpark - Jupyter
To show how we apply MLflow models to Spark dataframes, we need to set up Jupyter notebooks to work together with PySpark.
Start by installing the latest stable version of Apache Spark:
cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark
Install PySpark and Jupyter in the virtual environment:
pip install pyspark jupyter
Set the environment variables:
export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"
Having defined notebook-dir, we can store our notebooks in the desired folder.
Launching Jupyter from PySpark
Since we were able to configure Jupyter as the PySpark driver, we can now run Jupyter notebooks in the PySpark context.
(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]
Copy/paste this URL into your browser when you connect for the first time,
to login with a token:
http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
As mentioned above, MLflow provides a feature for logging model artifacts to S3. As soon as we have the chosen model in hand, we can import it as a UDF using the mlflow.pyfunc module.
import mlflow.pyfunc

# Artifact location of the logged model and a local copy of the dataset
model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'

# Wrap the model as a Spark UDF (`spark` is the session provided by the PySpark notebook)
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)

# Feature columns passed to the UDF
columns = [
    "fixed acidity", "volatile acidity", "citric acid",
    "residual sugar", "chlorides", "free sulfur dioxide",
    "total sulfur dioxide", "density", "pH",
    "sulphates", "alcohol",
]

df.withColumn('prediction', wine_udf(*columns)).show(100, False)
PySpark - Outputting wine quality predictions
Up to this point, we have talked about how to use PySpark with MLflow by running wine quality predictions on the entire wine dataset. But what if you need to use Python MLflow modules from Scala Spark?
We tested this too, by sharing the Spark context between Scala and Python. That is, we registered the MLflow UDF in Python and used it from Scala (yes, perhaps not the best solution, but it is what we have).
Scala Spark + MLflow
For this example, we will add the Toree kernel to the existing Jupyter installation.
Installing Spark + Toree + Jupyter
```
pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```
As you can see from the attached notebook, the UDF is shared between Spark and PySpark. We hope this part will be useful to those who love Scala and want to deploy machine learning models in production.
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

// Matches a single leading underscore
val FirstAtRe: Regex = "^_".r
// Matches runs of whitespace, "_", ".", ":" or "@" (triple quotes keep \s intact)
val AliasRe: Regex = """[\s_.:@]+""".r

// Turn a raw column name into an identifier that is safe to use in SQL
def getFieldAlias(field_name: String): String = {
  FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
  val fieldsToSelect: List[Column] = columns.map(field =>
    col(field).as(getFieldAlias(field))
  )
  df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
  val schema = df.columns.toList
  df.transform(selectFieldsNormalized(schema))
}
FirstAtRe = ^_
AliasRe = [\s_.:@]+
getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[\s_.:@]+
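For readers who work mostly in Python, the column-name normalization above can be sketched with the standard `re` module. This is an illustrative port of `getFieldAlias`, not code from the original notebook; the Python names are chosen for this sketch.

```python
import re

ALIAS_RE = re.compile(r"[\s_.:@]+")      # runs of whitespace, "_", ".", ":" or "@"
FIRST_UNDERSCORE_RE = re.compile(r"^_")  # a single leading underscore


def get_field_alias(field_name):
    """Replace separator runs with "_" and drop a leading underscore."""
    return FIRST_UNDERSCORE_RE.sub("", ALIAS_RE.sub("_", field_name))


for name in ["fixed acidity", "free sulfur dioxide", "pH", "@timestamp"]:
    print(name, "->", get_field_alias(name))
```

This is why the SQL query later in the notebook can refer to columns such as `fixed_acidity` instead of quoting names that contain spaces.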
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", ";")
  .load(winePath)
  .transform(normalizeSchema)
df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc
model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)
spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality| prediction|
+-------+------------------+
| 5| 5.576883967129615|
| 5| 5.50664776916154|
| 5| 5.525504822954496|
| 6| 5.504311247097457|
| 5| 5.576883967129615|
| 5|5.5556903912725755|
| 5| 5.467882654744997|
| 7| 5.710602976324739|
| 7| 5.657319539336507|
| 5| 5.345098606538708|
+-------+------------------+
In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)
+-----------+--------+-----------+---------+-----------+
|name |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null |null |null |true |
+-----------+--------+-----------+---------+-----------+
Next steps
Even though MLflow is in alpha at the time of writing, it looks quite promising. Just being able to run multiple machine learning frameworks and consume them from a single endpoint takes recommender systems to the next level.
In addition, MLflow brings data engineers and data scientists closer together by laying a common layer between them.
After this exploration of MLflow, we are confident that we will move forward and use it for our Spark pipelines and recommender systems.
It would be nice to synchronize the file store with a database instead of the file system. That should give us multiple endpoints that can use the same file store, for example several instances sharing a single metastore.
To sum up, I would like to thank the MLflow community for making our work with data more interesting.
If you are playing around with MLflow, don't hesitate to write to us and tell us how you use it, and even more so if you use it in production.
Source: www.habr.com