使用 MLflow 擴展 Spark

你好,Khabrovites。 正如我們已經寫過的,本月 OTUS 同時推出了兩門機器學習課程,即 根據 и 先進的. 在這方面,我們繼續分享有用的材料。

這篇文章的目的是談談我們的第一次體驗 流量.

我們將開始審查 流量 從它的跟踪服務器和序言研究的所有迭代。 然後我們將分享使用UDF連接Spark和MLflow的經驗。

上下文

我們在 阿爾法健康 我們使用機器學習和人工智能來幫助人們照顧自己的健康和福祉。 這就是為什麼機器學習模型是我們開發的數據產品的核心,也是為什麼 MLflow 這一涵蓋機器學習生命週期各個方面的開源平台引起了我們的關注。

流量

MLflow 的主要目標是在機器學習之上提供一個附加層,使數據科學家能夠使用幾乎所有機器學習庫(h2o, 凱拉斯, 跳躍, pytorch, 斯克萊恩 и tensorflow), 使她的工作更上一層樓。

MLflow 提供三個組件:

  • 追踪 – 記錄和實驗請求:代碼、數據、配置和結果。 遵循創建模型的過程非常重要。
  • 項目 – 在任何平台上運行的打包格式(例如, SageMaker)
  • 模特兒 是將模型提交給各種部署工具的通用格式。

MLflow(撰寫本文時為 alpha 版)是一個開源平台,可讓您管理機器學習生命週期,包括試驗、重用和部署。

設置 MLflow

要使用 MLflow,您必須首先設置整個 Python 環境,為此我們將使用 環境 (要在 Mac 上安裝 Python,請查看 這裡). 所以我們可以創建一個虛擬環境,我們將在其中安裝運行所需的所有庫。

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

安裝所需的庫。

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

注意:我們使用 PyArrow 來運行像 UDF 這樣的模型。 PyArrow 和 Numpy 的版本需要修復,因為最新版本相互衝突。

啟動跟踪用戶界面

MLflow Tracking 允許我們使用 Python 和 REST的 應用程序接口。 此外,您可以定義模型工件的存儲位置(本地主機、 亞馬遜S3, Azure Blob存儲, 谷歌雲存儲服務SFTP 服務器). 由於我們在 Alpha Health 使用 AWS,因此 S3 將成為工件的存儲。

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow 建議使用持久文件存儲。 文件存儲是服務器將存儲運行和實驗元數據的地方。 啟動服務器時,確保它指向持久文件存儲。 在這裡,為了實驗,我們將簡單地使用 /tmp.

請記住,如果我們想使用 mlflow 服務器運行舊實驗,它們必須存在於文件存儲中。 然而,即使沒有這個,我們也可以在 UDF 中使用它們,因為我們只需要模型的路徑。

注意:請記住,跟踪 UI 和模型客戶端必須有權訪問工件的位置。 也就是說,不管 Tracking UI 位於 EC2 實例中,在本地運行 MLflow 時,機器必須直接訪問 S3 才能編寫工件模型。

使用 MLflow 擴展 Spark
跟踪 UI 將工件存儲在 S3 存儲桶中

運行模型

一旦跟踪服務器運行,您就可以開始訓練模型。

作為示例,我們將使用 MLflow 示例中的 wine 修改 斯克萊恩.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

正如我們所說,MLflow 允許您記錄參數、指標和模型工件,以便您可以跟踪它們如何作為迭代進行開發。 此功能非常有用,因為它允許我們通過聯繫跟踪服務器或使用提交的 git 哈希日誌了解哪些代碼執行了所需的迭代來重現最佳模型。

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

使用 MLflow 擴展 Spark
葡萄酒迭代

模型的後端

使用“mlflow server”命令啟動的 MLflow 跟踪服務器有一個 REST API,用於跟踪運行並將數據寫入本地文件系統。 您可以使用“MLFLOW_TRACKING_URI”環境變量指定跟踪服務器的地址,MLflow 跟踪 API 將自動聯繫該地址的跟踪服務器以創建/獲取啟動信息、日誌記錄指標等。

來源: Docs// 運行跟踪服務器

要為模型提供服務器,我們需要一個正在運行的跟踪服務器(見啟動界面)和模型的運行 ID。

使用 MLflow 擴展 Spark
運行編號

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

要使用 MLflow 服務功能服務模型,我們需要訪問跟踪 UI 以通過簡單地指定來獲取有關模型的信息 --run_id.

一旦模型聯繫到跟踪服務器,我們就可以獲得一個新的模型端點。

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

從 Spark 運行模型

儘管 Tracking 服務器功能強大到足以實時為模型提供服務、訓練它們並使用服務器功能(來源: mlflow // 文檔 // 模型 #local),由於其分佈,使用 Spark(批處理或流處理)是一個更強大的解決方案。

想像一下,您剛剛進行了離線訓練,然後將輸出模型應用於所有數據。 這就是 Spark 和 MLflow 發揮作用的地方。

安裝 PySpark + Jupyter + Spark

來源: PySpark 入門 - Jupyter

為了展示我們如何將 MLflow 模型應用於 Spark 數據幀,我們需要設置 Jupyter notebooks 以使用 PySpark。

首先安裝最新的穩定版本 Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

在虛擬環境中安裝 PySpark 和 Jupyter:

pip install pyspark jupyter

設置環境變量:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

定義了 notebook-dir,我們將能夠將我們的筆記本存儲在所需的文件夾中。

從 PySpark 運行 Jupyter

由於我們能夠將 Jupiter 設置為 PySpark 驅動程序,因此我們現在可以在 PySpark 上下文中運行 Jupyter notebook。

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

使用 MLflow 擴展 Spark

如上所述,MLflow 在 S3 中提供了記錄模型工件的功能。 一旦我們掌握了選定的模型,我們就有機會使用模塊將其作為 UDF 導入 mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

使用 MLflow 擴展 Spark
PySpark - 預測葡萄酒質量

到目前為止,我們已經討論瞭如何通過在整個葡萄酒數據集上運行葡萄酒質量預測來將 PySpark 與 MLflow 結合使用。 但是,如果您需要使用 Scala Spark 中的 Python MLflow 模塊怎麼辦?

我們也通過在 Scala 和 Python 之間拆分 Spark 上下文來測試這一點。 也就是說,我們在 Python 中註冊了 MLflow UDF,並在 Scala 中使用它(是的,也許不是最好的解決方案,但我們有)。

斯卡拉星火 + MLflow

對於這個例子,我們將添加 托瑞內核 進入現有的木星。

安裝 Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

從隨附的筆記本中可以看出,UDF 在 Spark 和 PySpark 之間共享。 我們希望這部分內容對喜歡 Scala 並希望將機器學習模型部署到生產中的人有所幫助。

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

下一步

儘管 MLflow 在撰寫本文時處於 Alpha 階段,但它看起來很有前途。 僅僅能夠運行多個機器學習框架並從單個端點使用它們就可以將推薦系統提升到一個新的水平。

此外,MLflow 將數據工程師和數據科學家拉近了距離,在他們之間建立了一個公共層。

在探索 MLflow 之後,我們一定會繼續並將其用於我們的 Spark 管道和推薦系統。

最好將文件存儲與數據庫而不是文件系統同步。 這應該為我們提供多個可以使用相同文件共享的端點。 例如,使用多個實例 急板 и 雅典娜 使用相同的 Glue Metastore。

總而言之,我想感謝 MLFlow 社區,感謝他們讓我們的數據工作變得更加有趣。

如果您使用 MLflow,請隨時寫信告訴我們您如何使用它,如果您在生產中使用它更是如此。

了解有關課程的更多信息:
機器學習。 基礎課程
機器學習。 高級課程

閱讀更多:

來源: www.habr.com

添加評論