Hello everyone! As already announced, this month OTUS is launching several machine learning courses at once.

The goal of this article is to tell you about our first experience with MLflow.

Let's start the review.

Context

We ...

MLflow
The main goal of MLflow is to provide an additional layer on top of machine learning that lets data scientists work with almost any machine-learning library.

MLflow provides three components:

- Tracking — recording and querying experiments: code, data, configuration, results. Following the process of building a model is crucial.
- Projects — a packaging format for running code on any platform (e.g. SageMaker).
- Models — a common format for submitting models to different deployment tools.

MLflow (in alpha at the time of writing) is an open-source platform that lets you manage the machine-learning lifecycle, including experimentation, reuse, and deployment.
Setting up MLflow

To use MLflow, we first need to set up the whole Python environment. For this we use:
```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```
Install the required libraries:
```
pip install mlflow==0.7.0 \
            Cython==0.29 \
            numpy==1.14.5 \
            pandas==0.23.4 \
            pyarrow==0.11.0
```
Note: we use PyArrow to run models as UDFs. The PyArrow and NumPy versions had to be pinned, because the latest versions conflicted with each other.
Launching the Tracking UI

MLflow Tracking lets us log and query experiments using Python.
```
# Running a Tracking Server
mlflow server \
    --file-store /tmp/mlflow/fileStore \
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ \
    --host localhost \
    --port 5000
```
MLflow recommends using persistent file storage. The file store is where the server keeps run and experiment metadata. When starting the server, make sure it points to persistent file storage. Here, purely for the sake of the experiment, we use /tmp.
Keep in mind that if we want to use the mlflow server to work with old experiments, they have to be present in the file store. However, even without that we could use them in a UDF, since all we need is the path to the model.
Note: the Tracking UI and the model client must have access to the artifact location. That is, regardless of the fact that the Tracking UI sits on an EC2 instance, when running MLflow locally the machine must have direct access to S3 to write artifact models.

The Tracking UI stores artifacts in an S3 bucket
Running models

As soon as the Tracking server is up and running, we can start training models.

As an example, we will use a modified wine example from the MLflow samples:
```
MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py \
    --alpha 0.9 \
    --l1_ratio 0.5 \
    --wine_file ./data/winequality-red.csv
```
As mentioned, MLflow lets you log model parameters, metrics, and artifacts, so you can track how they evolve over iterations. This feature is extremely useful, because it lets us reproduce the best model, either by contacting the Tracking server or by figuring out which code performed the required iteration via the git hash log of commits.
```python
with mlflow.start_run():
    # ... model ...
    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")
```
Wine iterations

The server part for the model

The MLflow Tracking server, started with the "mlflow server" command, has a REST API for tracking runs and writing data to the local file system. You can specify the tracking server address via the "MLFLOW_TRACKING_URI" environment variable, and then the MLflow tracking API will automatically contact the tracking server at that address to create/fetch run information, log metrics, and so on.
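As a minimal sketch (assuming the tracking server started earlier is listening on localhost:5000), pointing the client at the server is just a matter of setting the variable before any MLflow client code runs:

```python
import os

# Point the MLflow client at the tracking server started earlier.
# (Assumption: the "mlflow server" example above is listening on port 5000.)
os.environ["MLFLOW_TRACKING_URI"] = "http://localhost:5000"

# With mlflow installed, subsequent calls such as mlflow.log_metric(...)
# would now go to that server instead of the local ./mlruns folder.
tracking_uri = os.environ["MLFLOW_TRACKING_URI"]
print(tracking_uri)
```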
Source: Docs // Running a Tracking Server

To serve the model, we need a running Tracking server (see the launch interface) and the Run ID of the model.

Run ID
```
# Serve a sklearn model through 127.0.0.1:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve \
    --port 5005 \
    --run_id 0f8691808e914d1087cf097a08730f17 \
    --model-path model
```
To serve a model using the MLflow serve functionality, we only need access to the Tracking UI, where we can look up the model information simply by specifying its --run_id.
Once the model talks to the Tracking server, we get a new model endpoint.
```
# Query Tracking Server Endpoint
curl -X POST \
  http://127.0.0.1:5005/invocations \
  -H 'Content-Type: application/json' \
  -d '[
    {
        "fixed acidity": 3.42,
        "volatile acidity": 1.66,
        "citric acid": 0.48,
        "residual sugar": 4.2,
        "chlorides": 0.229,
        "free sulfur dioxide": 19,
        "total sulfur dioxide": 25,
        "density": 1.98,
        "pH": 5.33,
        "sulphates": 4.39,
        "alcohol": 10.8
    }
]'

> {"predictions": [5.825055635303461]}
```
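The same request can also be built and issued from Python using only the standard library. This is a sketch that assumes the `mlflow sklearn serve` process above is still listening on port 5005 (the actual network call is left commented out so the snippet does not require a live server):

```python
import json
from urllib import request

# The same sample wine record the curl call above sends.
payload = [{
    "fixed acidity": 3.42,
    "volatile acidity": 1.66,
    "citric acid": 0.48,
    "residual sugar": 4.2,
    "chlorides": 0.229,
    "free sulfur dioxide": 19,
    "total sulfur dioxide": 25,
    "density": 1.98,
    "pH": 5.33,
    "sulphates": 4.39,
    "alcohol": 10.8,
}]

# Endpoint assumed from the serve command above.
req = request.Request(
    "http://127.0.0.1:5005/invocations",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)

# With the server running, this would return the prediction:
#     predictions = json.loads(request.urlopen(req).read())
```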
Running models from Spark

Despite the fact that the Tracking server is powerful enough to serve models in real time, train them, and use the serve functionality (source), applying models from Spark is an even stronger option.

Imagine that you simply do the training offline and then apply the output model to all of your data. This is where Spark and MLflow shine.
Install PySpark + Jupyter + Spark

Source: Get Started PySpark - Jupyter

To show how we apply MLflow models to Spark dataframes, we need to set up a Jupyter notebook that works together with PySpark.
Start by installing the latest stable version of Apache Spark:

```
cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark
```
Install PySpark and Jupyter into the virtual environment:

```
pip install pyspark jupyter
```
Set the environment variables:

```
export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"
```
Having defined notebook-dir, we can store our notebooks in the desired folder.
Launching Jupyter from PySpark

Since we were able to configure Jupyter as the PySpark driver, we can now run Jupyter notebooks in a PySpark context.
```
(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
```
As noted above, MLflow provides the ability to log model artifacts to S3. As soon as we have the chosen model in hand, we can import it as a UDF using the mlflow.pyfunc module.
```python
import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = ["fixed acidity", "volatile acidity", "citric acid",
           "residual sugar", "chlorides", "free sulfur dioxide",
           "total sulfur dioxide", "density", "pH",
           "sulphates", "alcohol"]

df.withColumn('prediction', wine_udf(*columns)).show(100, False)
```
PySpark - Predicting wine quality

Up to this point we have talked about how to use PySpark with MLflow, running wine-quality prediction over the whole wine dataset. But what if you need to use the Python MLflow modules from Scala Spark?

We tested this as well, splitting the Spark context between Scala and Python. That is, we registered the MLflow UDF in Python and then used it from Scala (yes, maybe not the best solution, but it is what we have).
Scala Spark + MLflow

For this example we will add the Toree kernel to our existing Spark setup.

Install Spark + Toree + Jupyter
```
pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```

```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```
As the attached notebook shows, the UDF is shared between Spark and PySpark. We hope this part will be useful for those who love Scala and want to deploy machine-learning models to production.
```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
  FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
  val fieldsToSelect: List[Column] = columns.map(field =>
    col(field).as(getFieldAlias(field))
  )
  df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
  val schema = df.columns.toList
  df.transform(selectFieldsNormalized(schema))
}
```

```
FirstAtRe = ^_
AliasRe = [\s_.:@]+
getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]: [\s_.:@]+
```
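For readers following along in Python, the same column-name normalization can be sketched with the standard re module (the helper name below is ours, mirroring the Scala functions above, and is not part of MLflow or Spark):

```python
import re

# Collapse whitespace and the punctuation used above into "_",
# then drop a leading underscore, e.g. "fixed acidity" -> "fixed_acidity".
ALIAS_RE = re.compile(r"[\s_.:@]+")
FIRST_UNDERSCORE_RE = re.compile(r"^_")

def get_field_alias(field_name: str) -> str:
    return FIRST_UNDERSCORE_RE.sub("", ALIAS_RE.sub("_", field_name))

print(get_field_alias("free sulfur dioxide"))  # free_sulfur_dioxide
```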
In [2]:
```scala
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
```

```
winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]: /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
```
In [3]:
```scala
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", ";")
  .load(winePath)
  .transform(normalizeSchema)
```

```
df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]: [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
```
In [4]:
```
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)
spark.udf.register("wineQuality", wine_quality_udf)
```

```
Out[4]: <function spark_udf.<locals>.predict at 0x1116a98c8>
```
In [6]:
```scala
df.createOrReplaceTempView("wines")
```
In [10]:
```
%%SQL
SELECT
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
```

```
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+
```
In [17]:
```scala
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)
```

```
+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+
```
What's next

Even though MLflow is in the alpha stage at the time of writing, it already looks quite promising. The mere ability to run multiple machine-learning frameworks and consume them from a single endpoint takes recommender systems to the next level.

Besides, MLflow brings data engineers and data scientists closer together, laying a common layer between them.

After this exploration of MLflow, we are confident that we will move forward and use it for our Spark pipelines and recommender systems.
It would be nice to synchronize the file storage with a database instead of the file system. That should give us multiple endpoints able to use the same file storage (for example, multiple instances).
To sum up, I would like to thank the MLflow community for making our work with data more interesting.

If you are playing around with MLflow, do not hesitate to write to us and tell us how you use it, and even more so if you use it in production.

To learn more about the courses:
Read more:

- The risks and pitfalls of applying principal component analysis to supervised learning problems
- Deploying machine-learning models with Docker - Part 1
- Deploying machine-learning models with Docker - Part 2

Source: habr.com