Extending Spark with MLflow

Hello, Habr readers. As we already wrote, this month OTUS is launching two machine learning courses at once: a basic one and an advanced one. In this regard, we continue to share useful material.

The purpose of this article is to talk about our first experience with MLflow.

We will start our review of MLflow with its tracking server, logging all iterations of training. Then we will share our experience of connecting Spark to MLflow using UDFs.

Context

At Alpha Health, we use machine learning and artificial intelligence to empower people to take care of their health and well-being. This is why machine learning models are at the core of the data products we develop, and why MLflow, an open source platform that covers all aspects of the machine learning lifecycle, caught our attention.

MLflow

The main goal of MLflow is to provide an additional layer on top of machine learning that allows data scientists to work with almost any machine learning library (h2o, keras, mleap, pytorch, sklearn and tensorflow), taking their work to the next level.

MLflow provides three components:

  • Tracking – recording and querying experiments: code, data, configuration and results. Following the process of creating a model is very important.
  • Projects – a packaging format for running on any platform (for example, SageMaker).
  • Models – a common format for submitting models to various deployment tools.

MLflow (alpha at the time of writing) is an open source platform that allows you to manage the machine learning lifecycle, including experimentation, reuse, and deployment.

Setting up MLflow

To use MLflow, you first need to set up your Python environment; for this we will use PyEnv (to install Python on a Mac, take a look here). This way we can create a virtual environment where we will install all the libraries needed to run it.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Install the required libraries.

```
pip install mlflow==0.7.0 \
            Cython==0.29  \
            numpy==1.14.5 \
            pandas==0.23.4 \
            pyarrow==0.11.0
```

Note: We use PyArrow to run models as UDFs. The PyArrow and Numpy versions had to be pinned because the latest versions conflicted with each other.

Launching Tracking UI

MLflow Tracking lets us log and query experiments using Python and a REST API. In addition, you can choose where to store model artifacts (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage or an SFTP server). Since we use AWS at Alpha Health, S3 will be our artifact store.

# Running a Tracking Server
mlflow server \
    --file-store /tmp/mlflow/fileStore \
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ \
    --host localhost \
    --port 5000

MLflow recommends using persistent file storage. The file store is where the server keeps run and experiment metadata. When starting the server, make sure it points to persistent storage. Here, for the sake of the experiment, we will simply use /tmp.

Keep in mind that if we want to use the MLflow server to work with old experiments, they must be present in the file store. However, even without this we could still use them in a UDF, since we only need the path to the model.

Note: keep in mind that the Tracking UI and the model client must have access to the artifact location. In other words, even if the Tracking UI lives on an EC2 instance, when running MLflow locally the machine must have direct access to S3 to write model artifacts.
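To illustrate the point about only needing the model path, here is a minimal sketch of loading a logged model straight from its artifact location. It assumes a more recent MLflow release than the 0.7.0 pinned above (where the load API differs) and a hypothetical S3 path:

```
import mlflow.pyfunc

# Hypothetical artifact path; with proper AWS credentials the model can be
# loaded from S3 without talking to the Tracking server at all.
model_path = "s3://<bucket>/mlflow/artifacts/1/<run_id>/artifacts/model"

# In recent MLflow releases this returns a generic python_function model
# with a .predict(pandas_df) method.
model = mlflow.pyfunc.load_model(model_path)
```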

[Image: Tracking UI stores artifacts in an S3 bucket]

Running Models

As soon as the Tracking server is running, you can start training the models.

As an example, we will use the wine quality model from the MLflow Sklearn example.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py \
  --alpha 0.9 \
  --l1_ratio 0.5 \
  --wine_file ./data/winequality-red.csv

As we said, MLflow lets you log parameters, metrics and model artifacts so that you can track how they evolve across iterations. This feature is extremely useful, because it allows us to reproduce the best model by querying the Tracking server, or to understand which code produced a given iteration using the logged git commit hashes.

import mlflow
import mlflow.sklearn

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

[Image: wine iterations]

Serving the model

The MLflow tracking server launched with the "mlflow server" command has a REST API for tracking runs and writing data to the local file system. You can specify the tracking server address with the "MLFLOW_TRACKING_URI" environment variable, and the MLflow tracking API will automatically contact the tracking server at that address to create/get run information, log metrics, and so on.

Source: Docs// Running a tracking server
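The same thing can be done programmatically from Python. The sketch below assumes a more recent MLflow release than the 0.7.0 pinned earlier (where the client API differs) and reuses the Run ID from the screenshot below as a hypothetical example:

```
import mlflow
from mlflow.tracking import MlflowClient

# Equivalent to exporting MLFLOW_TRACKING_URI before running a script.
mlflow.set_tracking_uri("http://localhost:5000")

# The client talks to the Tracking server configured above.
client = MlflowClient()
run = client.get_run("0f8691808e914d1087cf097a08730f17")
print(run.data.metrics)       # e.g. the rmse / r2 / mae logged during training
print(run.info.artifact_uri)  # where the model artifacts were written
```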

To serve a model, we need a running Tracking server (see the Launching Tracking UI section) and the Run ID of the model.

[Image: Run ID]

# Serve a sklearn model through 127.0.0.1:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve \
  --port 5005 \
  --run_id 0f8691808e914d1087cf097a08730f17 \
  --model-path model

To serve models with the MLflow serve functionality, we need access to the Tracking UI so that it can look up information about the model when we simply specify --run_id.

Once the model has been fetched from the Tracking server, we get a new model endpoint.

# Query Tracking Server Endpoint
curl -X POST \
  http://127.0.0.1:5005/invocations \
  -H 'Content-Type: application/json' \
  -d '[
	{
		"fixed acidity": 3.42,
		"volatile acidity": 1.66,
		"citric acid": 0.48,
		"residual sugar": 4.2,
		"chlorides": 0.229,
		"free sulfur dioxide": 19,
		"total sulfur dioxide": 25,
		"density": 1.98,
		"pH": 5.33,
		"sulphates": 4.39,
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Running models from Spark

Even though the Tracking server is powerful enough to serve models in real time, train them and use the serve functionality (source: mlflow // docs // models #local), using Spark (batch or streaming) is an even more powerful solution thanks to its distributed nature.

Imagine that you just did offline training and then applied the output model to all your data. This is where Spark and MLflow come into their own.

Install PySpark + Jupyter + Spark

Source: Get started PySpark - Jupyter

To show how we apply MLflow models to Spark dataframes, we need to set up Jupyter notebooks to work with PySpark.

Start by installing the latest stable version of Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark

Install PySpark and Jupyter in a virtual environment:

pip install pyspark jupyter

Set up environment variables:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Having defined notebook-dir, we will be able to store our notebooks in the desired folder.

Running Jupyter from PySpark

Since we have set up Jupyter as the PySpark driver, we can now run Jupyter notebooks in a PySpark context.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Extending Spark with MLflow

As mentioned above, MLflow can log model artifacts to S3. As soon as we have a trained model at hand, we can import it as a UDF using the mlflow.pyfunc module.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

[Image: PySpark - Predicting wine quality]
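Since the pyfunc wrapper is an ordinary Spark UDF, the same model can also be applied to a streaming DataFrame, which is what makes the "batch or streaming" point above practical. A minimal sketch, assuming the spark session, df, wine_udf and columns from the batch example and a hypothetical input directory:

```
# Reuse the schema inferred by the batch read so the CSV stream can be parsed.
stream_df = (spark.readStream
                  .format("csv")
                  .option("header", "true")
                  .option("delimiter", ";")
                  .schema(df.schema)
                  .load("/tmp/wine-input/"))  # hypothetical directory of incoming CSVs

# Apply the MLflow model to each micro-batch and print the results.
query = (stream_df
         .withColumn("prediction", wine_udf(*columns))
         .writeStream
         .format("console")
         .option("checkpointLocation", "/tmp/wine-checkpoint/")
         .start())
```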

Up to this point, we have talked about how to use PySpark with MLflow by running wine quality prediction on the entire wine dataset. But what if you need to use the Python MLflow modules from Scala Spark?

We tested this as well, by sharing the Spark context between Scala and Python. That is, we registered the MLflow UDF in Python and used it from Scala (yes, perhaps not the best solution, but it is what we have).

Scala Spark + MLflow

For this example, we will add the Toree kernel to the existing Jupyter.

Install Spark + Toree + Jupyter

```
pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

As you can see from the attached notebook, the UDF is shared between Spark and PySpark. We hope this part will be useful for those who love Scala and want to deploy machine learning models to production.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\\s_.:@]+".r

// Replace whitespace and punctuation with underscores and drop a leading underscore,
// e.g. "fixed acidity" -> "fixed_acidity".
def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [\s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[\s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Next Steps

Even though MLflow is in Alpha at the time of writing, it looks pretty promising. Just being able to run multiple machine learning frameworks and use them from a single endpoint takes recommender systems to the next level.

In addition, MLflow brings Data engineers and Data Scientists closer together, laying a common layer between them.

After this exploration of MLflow, we plan to move forward and use it for our Spark pipelines and recommender systems.

It would be nice to have the file store backed by a database instead of the file system. That would give us multiple endpoints using the same store, for example the way several Presto and Athena instances use the same Glue metastore.
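For what it is worth, newer MLflow releases already allow this: the server can keep run metadata in a database via the --backend-store-uri option. A minimal sketch, assuming a hypothetical PostgreSQL instance and bucket of our own naming:

```
mlflow server \
    --backend-store-uri postgresql://mlflow:mlflow@localhost/mlflow \
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ \
    --host 0.0.0.0 \
    --port 5000
```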

Summing up, I would like to thank the MLflow community for making our work with data more interesting.

If you play with MLflow, feel free to write to us and tell us how you use it, and even more so if you use it in production.

Learn more about the courses:
Machine Learning. Basic course
Machine Learning. Advanced course


Source: habr.com
