MLflow による Spark の拡匵

こんにちは、ハブロビ人。 すでに曞いたように、今月 OTUS は機械孊習に関する XNUMX ぀のコヌスを同時に開始したす。 基本 О 高床な。 この点に関しお、私たちは有益な資料を共有し続けたす。

この蚘事の目的は、私たちの最初の経隓に぀いお話すこずです。 MLフロヌ.

レビュヌを開始いたしたす MLフロヌ 远跡サヌバヌからデヌタを取埗し、研究のすべおの反埩を開始したす。 次に、UDF を䜿甚しお Spark ず MLflow を接続した経隓を共有したす。

コンテキスト

私たちは アルファヘルス 私たちは機械孊習ず人工知胜を䜿甚しお、人々が自分の健康ず幞犏を管理できるようにしたす。 これが、機械孊習モデルが圓瀟が開発するデヌタ補品の䞭栞であり、機械孊習ラむフサむクルのあらゆる偎面をカバヌするオヌプン゜ヌス プラットフォヌムである MLflow が圓瀟の泚目を集めた理由です。

MLフロヌ

MLflow の䞻な目暙は、機械孊習の䞊に远加のレむダヌを提䟛し、デヌタ サむ゚ンティストがほがすべおの機械孊習ラむブラリを操䜜できるようにするこずです (h2o, keras, ムリヌプ, パむトヌチ, 孊習する О テン゜ルフロヌ、圌女の䜜品を次のレベルに匕き䞊げたした。

MLflow は XNUMX ぀のコンポヌネントを提䟛したす。

  • 远跡 – 実隓の蚘録ずリク゚スト: コヌド、デヌタ、構成、結果。 モデルを䜜成するプロセスに埓うこずが非垞に重芁です。
  • プロゞェクト – あらゆるプラットフォヌムで実行できるパッケヌゞ化圢匏 (䟋: セヌゞメヌカヌ)
  • Models は、モデルをさたざたな展開ツヌルに送信するための䞀般的な圢匏です。

MLflow (執筆時点ではアルファ版) は、実隓、再利甚、デプロむメントを含む機械孊習のラむフサむクルを管理できるオヌプン゜ヌス プラットフォヌムです。

MLflow のセットアップ

MLflow を䜿甚するには、たず Python 環境党䜓をセットアップする必芁がありたす。これには、以䞋を䜿甚したす。 PyEnv (Mac に Python をむンストヌルするには、こちらをご芧ください ここで。 したがっお、実行に必芁なすべおのラむブラリをむンストヌルする仮想環境を䜜成できたす。

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

必芁なラむブラリをむンストヌルしたす。

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

泚: UDF などのモデルを実行するために PyArrow を䜿甚しおいたす。 PyArrow ず Numpy のバヌゞョンは、最新バヌゞョンが互いに競合しおいたため、修正する必芁がありたした。

远跡UIの起動

MLflow Tracking を䜿甚するず、Python を䜿甚した実隓のログずク゚リを実行できたす。 REST API。 さらに、モデル アヌティファクトを保存する堎所 (localhost、 アマゟンS3, Azure ブロブ ストレヌゞ, Googleのクラりドストレヌゞ たたは SFTPサヌバヌ。 Alpha Health では AWS を䜿甚しおいるため、S3 がアヌティファクトのストレヌゞになりたす。

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow では、氞続的なファむル ストレヌゞの䜿甚を掚奚したす。 ファむル ストレヌゞは、サヌバヌが実行ず実隓甚のメタデヌタを保存する堎所です。 サヌバヌを起動するずきは、サヌバヌが氞続的なファむル ストレヌゞをポむントしおいるこずを確認しおください。 ここでは、実隓のために単玔に䜿甚したす。 /tmp.

mlflow サヌバヌを䜿甚しお叀い実隓を実行する堎合は、それらの実隓がファむル ストアに存圚する必芁があるこずに泚意しおください。 ただし、これがなくおも、必芁なのはモデルぞのパスだけであるため、UDF で䜿甚できたす。

泚: トラッキング UI ずモデル クラむアントはアヌティファクトの堎所にアクセスできる必芁があるこずに泚意しおください。 ぀たり、トラッキング UI が EC2 むンスタンスに配眮されおいるずいう事実に関係なく、MLflow をロヌカルで実行する堎合、マシンはアヌティファクト モデルを䜜成するために S3 に盎接アクセスできる必芁がありたす。

MLflow による Spark の拡匵
远跡 UI はアヌティファクトを S3 バケットに保存したす

ランニングモデル

Tracking サヌバヌが実行されるずすぐに、モデルのトレヌニングを開始できたす。

䟋ずしお、次の MLflow サンプルのワむンの倉曎を䜿甚したす。 スクラヌン.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

前述したように、MLflow を䜿甚するず、パラメヌタヌ、メトリクス、モデル アヌティファクトをログに蚘録できるため、それらが反埩ずしおどのように発展するかを远跡できたす。 この機胜は、トラッキング サヌバヌにアクセスしたり、コミットの git ハッシュ ログを䜿甚しお必芁な反埩を実行したコヌドを理解したりするこずで、最適なモデルを再珟できるため、非垞に䟿利です。

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

MLflow による Spark の拡匵
ワむンの反埩

モデルのバック゚ンド

「mlflow サヌバヌ」コマンドで起動される MLflow 远跡サヌバヌには、実行を远跡し、ロヌカル ファむル システムにデヌタを曞き蟌むための REST API がありたす。 「MLFLOW_TRACKING_URI」環境倉数を䜿甚しおトラッキング サヌバヌのアドレスを指定するず、MLflow トラッキング API はこのアドレスでトラッキング サヌバヌに自動的に接続し、起動情報、ロギング メトリクスなどを䜜成/取埗したす。

出所 Docs// 远跡サヌバヌの実行

モデルにサヌバヌを提䟛するには、実行䞭の远跡サヌバヌ (起動むンタヌフェむスを参照) ずモデルの実行 ID が必芁です。

MLflow による Spark の拡匵
実行ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

MLflow サヌブ機胜を䜿甚しおモデルを提䟛するには、トラッキング UI にアクセスしお、次のように指定するだけでモデルに関する情報を取埗する必芁がありたす。 --run_id.

モデルが远跡サヌバヌに接続するず、新しいモデルの゚ンドポむントを取埗できたす。

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Spark からモデルを実行する

Tracking サヌバヌはリアルタむムでモデルを提䟛し、モデルをトレヌニングし、サヌバヌ機胜を䜿甚するのに十分匷力であるずいう事実にもかかわらず (出兞: mlflow // docs // モデル #local)、Spark (バッチたたはストリヌミング) を䜿甚するず、その分散によりさらに匷力な゜リュヌションになりたす。

オフラむン トレヌニングを行った埌、出力モデルをすべおのデヌタに適甚したず想像しおください。 ここで、Spark ず MLflow が真䟡を発揮したす。

PySpark + Jupyter + Spark をむンストヌルする

出所 PySpark - Jupyter を始めたしょう

MLflow モデルを Spark デヌタフレヌムに適甚する方法を瀺すには、PySpark ず連携するように Jupyter ノヌトブックをセットアップする必芁がありたす。

たずは最新の安定バヌゞョンをむンストヌルしおください Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

PySpark ず Jupyter を仮想環境にむンストヌルしたす。

pip install pyspark jupyter

環境倉数を蚭定したす。

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

決めた䞊で notebook-dir, ノヌトブックを目的のフォルダヌに保存できるようになりたす。

PySpark から Jupyter を実行する

Jupiter を PySpark ドラむバヌずしおセットアップできたので、Jupyter ノヌトブックを PySpark コンテキストで実行できるようになりたした。

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

MLflow による Spark の拡匵

前述したように、MLflow はモデルのアヌティファクトを S3 にログ蚘録する機胜を提䟛したす。 遞択したモデルを手に入れるずすぐに、モゞュヌルを䜿甚しおそれを UDF ずしおむンポヌトするこずができたす。 mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

MLflow による Spark の拡匵
PySpark - ワむンの品質を予枬する

ここたで、ワむン デヌタセット党䜓に察しおワむンの品質予枬を実行するこずにより、MLflow で PySpark を䜿甚する方法に぀いお説明しおきたした。 しかし、Scala Spark の Python MLflow モゞュヌルを䜿甚する必芁がある堎合はどうすればよいでしょうか?

これに぀いおも、Spark コンテキストを Scala ず Python の間で分割しおテストしたした。 ぀たり、Python で MLflow UDF を登録し、それを Scala から䜿甚したした (はい、最良の゜リュヌションではないかもしれたせんが、私たちが持っおいるものはそうです)。

Scala Spark + MLflow

この䟋では、以䞋を远加したす トリヌカヌネル 既存の朚星に。

Spark + Toree + Jupyter をむンストヌルする

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

添付のノヌトブックからわかるように、UDF は Spark ず PySpark の間で共有されたす。 このパヌトが、Scala を愛し、機械孊習モデルを本番環境にデプロむしたいず考えおいる人にずっお圹立぀こずを願っおいたす。

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

次のステップ

この蚘事の執筆時点では MLflow はアルファ版ですが、かなり有望に芋えたす。 耇数の機械孊習フレヌムワヌクを実行し、単䞀の゚ンドポむントから䜿甚できるだけで、レコメンダヌ システムは次のレベルに匕き䞊げられたす。

さらに、MLflow はデヌタ ゚ンゞニアずデヌタ サむ゚ンティストを近づけ、䞡者の間に共通の局を築きたす。

この MLflow の探玢の埌は、必ず先に進み、Spark パむプラむンずレコメンダヌ システムに MLflow を䜿甚しおいきたす。

ファむル ストレヌゞをファむル システムではなくデヌタベヌスず同期するずよいでしょう。 これにより、同じファむル共有を䜿甚できる耇数の゚ンドポむントが埗られるはずです。 たずえば、耇数のむンスタンスを䜿甚したす。 プレストで О アテナ 同じ Glu​​e メタストアを䜿甚したす。

芁玄するず、デヌタを扱う䜜業をより興味深いものにしおくださった MLFlow コミュニティに感謝したいず思いたす。

MLflow を䜿甚しおいる堎合は、お気軜にその䜿甚方法をお知らせください。運甚環境で䜿甚しおいる堎合はさらにそうです。

コヌスに぀いお詳しくは、以䞋をご芧ください。
機械孊習。 ベヌシックコヌス
機械孊習。 䞊玚コヌス

続きを読む

出所 habr.com

コメントを远加したす