Estenent Spark amb MLflow

Hola, habitants de Khabrovsk. Com ja hem escrit, aquest mes OTUS llança dos cursos d'aprenentatge automàtic alhora, és a dir base и avançat. En aquest sentit, continuem compartint material útil.

L'objectiu d'aquest article és parlar de la nostra primera experiència d'ús MLflow.

Comencem la revisió MLflow des del seu servidor de seguiment i registre totes les iteracions de l'estudi. A continuació, compartirem la nostra experiència de connectar Spark amb MLflow mitjançant UDF.

Context

Estem a Alfa Salut Utilitzem l'aprenentatge automàtic i la intel·ligència artificial per capacitar les persones perquè es facin càrrec de la seva salut i benestar. És per això que els models d'aprenentatge automàtic estan al centre dels productes de ciència de dades que desenvolupem, i per això ens va atreure MLflow, una plataforma de codi obert que cobreix tots els aspectes del cicle de vida de l'aprenentatge automàtic.

MLflow

L'objectiu principal de MLflow és proporcionar una capa addicional a l'aprenentatge automàtic que permetria als científics de dades treballar amb gairebé qualsevol biblioteca d'aprenentatge automàtic (H2o, keras, mleap, pirtorx, sklearn и flux tensor), portant la seva feina al següent nivell.

MLflow proporciona tres components:

  • Rastreig – Enregistrament i sol·licituds d'experiments: codi, dades, configuració i resultats. El seguiment del procés de creació d'un model és molt important.
  • Projectes – Format d'embalatge per executar-se en qualsevol plataforma (p. SageMaker)
  • Models – un format comú per enviar models a diverses eines de desplegament.

MLflow (en alfa en el moment d'escriure) és una plataforma de codi obert que us permet gestionar el cicle de vida de l'aprenentatge automàtic, inclosa l'experimentació, la reutilització i el desplegament.

Configuració de MLflow

Per utilitzar MLflow, primer heu de configurar tot el vostre entorn Python, per a això farem servir PyEnv (per instal·lar Python a Mac, feu una ullada aquí). D'aquesta manera podrem crear un entorn virtual on instal·larem totes les biblioteques necessàries per executar-lo.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Instal·lem les biblioteques necessàries.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Nota: fem servir PyArrow per executar models com ara UDF. Les versions de PyArrow i Numpy s'havien de solucionar perquè les darreres versions entraven en conflicte entre elles.

Inicieu la interfície d'usuari de seguiment

El seguiment de MLflow ens permet registrar i consultar experiments amb Python i RESTA API. A més, podeu determinar on emmagatzemar els artefactes del model (localhost, Amazon S3, Emmagatzematge de blobs Azure, Google Cloud Storage o Servidor SFTP). Com que utilitzem AWS a Alpha Health, el nostre emmagatzematge d'artefactes serà S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow recomana utilitzar l'emmagatzematge de fitxers persistent. L'emmagatzematge de fitxers és on el servidor emmagatzemarà les metadades d'execució i d'experimentació. Quan inicieu el servidor, assegureu-vos que apunta al magatzem de fitxers persistent. Aquí per a l'experiment simplement utilitzarem /tmp.

Recordeu que si volem utilitzar el servidor mlflow per executar experiments antics, aquests han d'estar presents a l'emmagatzematge de fitxers. Tanmateix, fins i tot sense això podríem utilitzar-los a l'UDF, ja que només necessitem el camí cap al model.

Nota: tingueu en compte que la interfície d'usuari de seguiment i el client del model han de tenir accés a la ubicació de l'artefacte. És a dir, independentment del fet que la IU de seguiment resideixi en una instància EC2, quan s'executa MLflow localment, la màquina ha de tenir accés directe a S3 per escriure models d'artefactes.

Estenent Spark amb MLflow
La interfície d'usuari de seguiment emmagatzema artefactes en un cub S3

Models de running

Tan bon punt s'executi el servidor de seguiment, podeu començar a entrenar els models.

Com a exemple, utilitzarem la modificació del vi de l'exemple MLflow a Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Com ja hem comentat, MLflow us permet registrar paràmetres, mètriques i artefactes del model perquè pugueu fer un seguiment de com evolucionen al llarg de les iteracions. Aquesta característica és extremadament útil perquè d'aquesta manera podem reproduir el millor model posant-nos en contacte amb el servidor de seguiment o entenent quin codi va realitzar la iteració requerida mitjançant els registres de commits git hash.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Estenent Spark amb MLflow
Iteracions del vi

Part del servidor per al model

El servidor de seguiment MLflow, llançat mitjançant l'ordre "mlflow server", té una API REST per fer el seguiment de les execucions i escriure dades al sistema de fitxers local. Podeu especificar l'adreça del servidor de seguiment mitjançant la variable d'entorn "MLFLOW_TRACKING_URI" i l'API de seguiment de MLflow contactarà automàticament amb el servidor de seguiment en aquesta adreça per crear/rebre informació d'inici, mètriques de registre, etc.

Font: Documents// Execució d'un servidor de seguiment

Per proporcionar al model un servidor, necessitem un servidor de seguiment en execució (vegeu la interfície de llançament) i l'identificador d'execució del model.

Estenent Spark amb MLflow
ID d'execució

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Per oferir models amb la funcionalitat de servei MLflow, necessitarem accedir a la interfície d'usuari de seguiment per rebre informació sobre el model simplement especificant --run_id.

Un cop el model en contacte amb el servidor de seguiment, podem obtenir un nou punt final del model.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Models en execució de Spark

Malgrat que el servidor de seguiment és prou potent per servir models en temps real, entreneu-los i utilitzeu la funcionalitat del servidor (font: mlflow // docs // models # local), l'ús de Spark (per lots o streaming) és una solució encara més potent a causa de la distribució.

Imagineu que simplement heu fet l'entrenament fora de línia i després heu aplicat el model de sortida a totes les vostres dades. Aquí és on brillen Spark i MLflow.

Instal·leu PySpark + Jupyter + Spark

Font: Comenceu PySpark - Jupyter

Per mostrar com apliquem models MLflow als marcs de dades de Spark, hem de configurar els quaderns de Jupyter perquè funcionin conjuntament amb PySpark.

Comenceu instal·lant la darrera versió estable Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Instal·leu PySpark i Jupyter a l'entorn virtual:

pip install pyspark jupyter

Configurar variables d'entorn:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Havent determinat notebook-dir, podem emmagatzemar els nostres quaderns a la carpeta desitjada.

Llançament de Jupyter des de PySpark

Com que vam poder configurar Júpiter com a controlador de PySpark, ara podem executar el bloc de notes de Jupyter en el context de PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Estenent Spark amb MLflow

Com s'ha esmentat anteriorment, MLflow ofereix una funció per registrar artefactes de model a S3. Tan bon punt tinguem el model seleccionat a les nostres mans, tenim l'oportunitat d'importar-lo com a UDF mitjançant el mòdul mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Estenent Spark amb MLflow
PySpark: emet prediccions de qualitat del vi

Fins a aquest punt, hem parlat de com utilitzar PySpark amb MLflow, executant prediccions de qualitat del vi a tot el conjunt de dades de vins. Però, què passa si necessiteu utilitzar mòduls Python MLflow de Scala Spark?

També ho vam provar dividint el context Spark entre Scala i Python. És a dir, vam registrar MLflow UDF a Python i l'hem utilitzat des de Scala (sí, potser no és la millor solució, però la que tenim).

Scala Spark + MLflow

Per a aquest exemple afegirem Toree Kernel al Júpiter existent.

Instal·leu Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Com podeu veure al quadern adjunt, l'UDF es comparteix entre Spark i PySpark. Esperem que aquesta part sigui útil per a aquells que estimen Scala i volen implementar models d'aprenentatge automàtic en producció.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Següents passos

Tot i que MLflow està en versió alfa en el moment d'escriure, sembla força prometedor. Només la possibilitat d'executar diversos marcs d'aprenentatge automàtic i consumir-los des d'un únic punt final porta els sistemes de recomanació al següent nivell.

A més, MLflow apropa els enginyers de dades i els especialistes en ciències de dades, establint una capa comuna entre ells.

Després d'aquesta exploració de MLflow, estem segurs que avançarem i l'utilitzarem per als nostres pipelines Spark i sistemes de recomanació.

Seria bo sincronitzar l'emmagatzematge de fitxers amb la base de dades en lloc del sistema de fitxers. Això ens hauria de proporcionar diversos punts finals que poden utilitzar el mateix emmagatzematge de fitxers. Per exemple, utilitzeu diverses instàncies prest и Athena amb la mateixa metastore de Glue.

En resum, m'agradaria donar les gràcies a la comunitat MLFlow per fer que el nostre treball amb dades sigui més interessant.

Si estàs jugant amb MLflow, no dubtis a escriure'ns i explicar-nos com el fas servir, i encara més si el fas servir en producció.

Més informació sobre els cursos:
Aprenentatge automàtic. Curs bàsic
Aprenentatge automàtic. Curs avançat

Llegeix més:

Font: www.habr.com

Afegeix comentari