使用 MLflow 扩展 Spark

哈布罗夫斯克的居民们大家好。 正如我们已经写过的,本月 OTUS 将同时推出两门机器学习课程,即 基本 и 先进。 在这方面,我们继续分享有用的材料。

这篇文章的目的是谈谈我们第一次使用的经历 流量.

我们将开始审核 流量 来自其跟踪服务器并记录研究的所有迭代。 然后我们将分享使用UDF连接Spark和MLflow的经验。

上下文

我们在 阿尔法健康 我们使用机器学习和人工智能来帮助人们掌控自己的健康和福祉。 这就是为什么机器学习模型是我们开发的数据科学产品的核心,也是我们被 MLflow 这个涵盖机器学习生命周期各个方面的开源平台所吸引的原因。

流量

MLflow 的主要目标是在机器学习之上提供一个附加层,使数据科学家能够使用几乎所有机器学习库(h2o, 凯拉斯, 姆利普, pytorch, 斯克莱恩 и tensorflow),将她的工作提升到一个新的水平。

MLflow 提供三个组件:

  • 跟踪 – 实验记录和请求:代码、数据、配置和结果。 监控创建模型的过程非常重要。
  • 项目 – 可在任何平台上运行的打包格式(例如 SageMaker)
  • 型号 – 将模型提交给各种部署工具的通用格式。

MLflow(在撰写本文时处于 Alpha 版)是一个开源平台,可让您管理机器学习生命周期,包括实验、重用和部署。

设置 MLflow

要使用 MLflow,您需要首先设置整个 Python 环境,为此我们将使用 py环境 (要在 Mac 上安装 Python,请查看 这里)。 这样我们就可以创建一个虚拟环境,在其中安装运行它所需的所有库。

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

让我们安装所需的库。

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

注意:我们使用 PyArrow 来运行 UDF 等模型。 PyArrow 和 Numpy 的版本需要修复,因为后者版本相互冲突。

启动跟踪用户界面

MLflow Tracking 允许我们使用 Python 记录和查询实验 REST的 API。 此外,您还可以确定模型工件的存储位置(localhost、 Amazon S3, Azure Blob存储, 谷歌云存储 или SFTP服务器)。 由于我们在 Alpha Health 使用 AWS,因此我们的工件存储将是 S3。

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow 建议使用持久文件存储。 文件存储是服务器存储运行和实验元数据的地方。 启动服务器时,请确保它指向持久文件存储。 在实验中我们将简单地使用 /tmp.

请记住,如果我们想使用 mlflow 服务器来运行旧实验,它们必须存在于文件存储中。 然而,即使没有这个,我们也可以在 UDF 中使用它们,因为我们只需要模型的路径。

注意:请记住,跟踪 UI 和模型客户端必须有权访问工件位置。 也就是说,无论跟踪 UI 驻留在 EC2 实例中,在本地运行 MLflow 时,机器都必须能够直接访问 S3 才能编写工件模型。

使用 MLflow 扩展 Spark
跟踪 UI 将工件存储在 S3 存储桶中

跑步模型

跟踪服务器运行后,您就可以开始训练模型。

作为示例,我们将使用 MLflow 示例中的 wine 修改 斯克莱恩.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

正如我们已经讨论过的,MLflow 允许您记录模型参数、指标和工件,以便您可以跟踪它们在迭代过程中的演变情况。 此功能非常有用,因为通过这种方式,我们可以通过联系跟踪服务器或使用提交的 git 哈希日志了解哪些代码执行了所需的迭代来重现最佳模型。

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

使用 MLflow 扩展 Spark
葡萄酒迭代

模型的服务器部分

使用“mlflow server”命令启动的 MLflow 跟踪服务器具有一个 REST API,用于跟踪运行并将数据写入本地文件系统。 您可以使用环境变量“MLFLOW_TRACKING_URI”指定跟踪服务器地址,MLflow 跟踪 API 将自动联系该地址的跟踪服务器以创建/接收启动信息、日志指标等。

来源: Docs// 运行跟踪服务器

为了给模型提供服务器,我们需要一个正在运行的跟踪服务器(参见启动界面)和模型的运行 ID。

使用 MLflow 扩展 Spark
运行 ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

要使用 MLflow 服务功能来服务模型,我们需要访问跟踪 UI,只需指定即可接收有关模型的信息 --run_id.

一旦模型联系跟踪服务器,我们就可以获得一个新的模型端点。

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

从 Spark 运行模型

尽管跟踪服务器功能强大,足以实时服务模型,但仍需要训练它们并使用服务器功能(来源: mlflow // 文档 // 模型 # 本地),由于分布式,使用 Spark(批处理或流式处理)是一种更强大的解决方案。

想象一下,您只是离线进行训练,然后将输出模型应用于所有数据。 这就是 Spark 和 MLflow 的闪光点。

安装 PySpark + Jupyter + Spark

来源: 开始使用 PySpark - Jupyter

为了展示如何将 MLflow 模型应用到 Spark 数据帧,我们需要设置 Jupyter 笔记本以与 PySpark 配合使用。

首先安装最新的稳定版本 Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

在虚拟环境中安装PySpark和Jupyter:

pip install pyspark jupyter

设置环境变量:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

已确定 notebook-dir,我们可以将我们的笔记本存储在所需的文件夹中。

从 PySpark 启动 Jupyter

由于我们能够将 Jupiter 配置为 PySpark 驱动程序,因此我们现在可以在 PySpark 的上下文中运行 Jupyter Notebook。

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

使用 MLflow 扩展 Spark

如上所述,MLflow 提供了在 S3 中记录模型工件的功能。 一旦我们掌握了选定的模型,我们就有机会使用该模块将其作为 UDF 导入 mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

使用 MLflow 扩展 Spark
PySpark - 输出葡萄酒质量预测

到目前为止,我们已经讨论了如何将 PySpark 与 MLflow 结合使用,对整个葡萄酒数据集运行葡萄酒质量预测。 但是,如果您需要使用 Scala Spark 中的 Python MLflow 模块怎么办?

我们也通过在 Scala 和 Python 之间拆分 Spark 上下文来测试这一点。 也就是说,我们在 Python 中注册了 MLflow UDF,并从 Scala 中使用它(是的,也许不是最好的解决方案,但我们拥有)。

Scala Spark + MLflow

对于这个例子,我们将添加 Toree内核 进入现有的木星。

安装 Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

从附带的笔记本中可以看到,UDF 在 Spark 和 PySpark 之间共享。 我们希望这部分对那些热爱 Scala 并希望在生产中部署机器学习模型的人有用。

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

下一步

尽管 MLflow 在撰写本文时仍处于 Alpha 版本,但它看起来很有前途。 仅仅运行多个机器学习框架并从单个端点使用它们的能力就可以将推荐系统提升到一个新的水平。

此外,MLflow 使数据工程师和数据科学专家更加紧密地联系在一起,在他们之间奠定了一个公共层。

在对 MLflow 进行探索之后,我们有信心继续前进并将其用于我们的 Spark 管道和推荐系统。

将文件存储与数据库而不是文件系统同步会很好。 这应该为我们提供可以使用相同文件存储的多个端点。 例如,使用多个实例 急板 и 雅典娜 使用相同的 Glue 元存储。

总而言之,我要感谢 MLFlow 社区,让我们的数据工作变得更加有趣。

如果您正在使用 MLflow,请随时写信给我们并告诉我们您如何使用它,如果您在生产中使用它,则更是如此。

了解有关课程的更多信息:
机器学习。 基础课程
机器学习。 进阶课程

阅读更多:

来源: habr.com

添加评论