Ampliación de Spark con MLflow

Hola, khabrovitas. Como ya escribimos, este mes OTUS lanza dos cursos sobre aprendizaje automático a la vez, a saber base и avanzado. En este sentido, seguimos compartiendo material útil.

El propósito de este artículo es hablar de nuestra primera experiencia con flujo ml.

Comenzaremos la revisión. flujo ml desde su servidor de seguimiento y prólogo de todas las iteraciones del estudio. Luego compartiremos la experiencia de conectar Spark con MLflow usando UDF.

Contexto

Estamos en salud alfa utilizamos el aprendizaje automático y la inteligencia artificial para empoderar a las personas para que cuiden su salud y bienestar. Es por eso que los modelos de aprendizaje automático están en el corazón de los productos de datos que desarrollamos, y es por eso que MLflow, una plataforma de código abierto que abarca todos los aspectos del ciclo de vida del aprendizaje automático, llamó nuestra atención.

flujo ml

El objetivo principal de MLflow es proporcionar una capa adicional además del aprendizaje automático que permitiría a los científicos de datos trabajar con casi cualquier biblioteca de aprendizaje automático (h2o, keras, msalto, piñón, aprender и tensorflow), llevando su trabajo al siguiente nivel.

MLflow proporciona tres componentes:

  • Seguimiento – registro y solicitudes de experimentos: código, datos, configuración y resultados. Es muy importante seguir el proceso de creación de un modelo.
  • Proyectos – Formato de empaquetado para ejecutar en cualquier plataforma (por ejemplo, SageMaker)
  • fexibles es un formato común para enviar modelos a varias herramientas de implementación.

MLflow (alfa en el momento de escribir este artículo) es una plataforma de código abierto que le permite administrar el ciclo de vida del aprendizaje automático, incluida la experimentación, la reutilización y la implementación.

Configuración de MLflow

Para usar MLflow, primero debe configurar todo el entorno de Python, para esto usaremos PyEnv (para instalar Python en una Mac, echa un vistazo aquí). Así podremos crear un entorno virtual donde instalaremos todas las librerías necesarias para su ejecución.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Instale las bibliotecas necesarias.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Nota: estamos usando PyArrow para ejecutar modelos como UDF. Las versiones de PyArrow y Numpy debían corregirse porque las últimas versiones estaban en conflicto entre sí.

Lanzamiento de la interfaz de usuario de seguimiento

MLflow Tracking nos permite registrar y consultar experimentos con Python y RESTO API. Además, puede definir dónde almacenar los artefactos del modelo (localhost, Amazon S3, Almacenamiento de blobs de Azure, Google Cloud Storage o servidor SFTP). Dado que usamos AWS en Alpha Health, S3 será el almacenamiento de los artefactos.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow recomienda usar almacenamiento de archivos persistente. El almacenamiento de archivos es donde el servidor almacenará los metadatos de ejecución y experimento. Al iniciar el servidor, asegúrese de que apunte al almacenamiento de archivos persistente. Aquí, por el bien del experimento, simplemente usaremos /tmp.

Tenga en cuenta que si queremos usar el servidor mlflow para ejecutar experimentos antiguos, deben estar presentes en el almacén de archivos. Sin embargo, incluso sin esto, podríamos usarlos en la UDF, ya que solo necesitamos la ruta al modelo.

Nota: Tenga en cuenta que la IU de seguimiento y el cliente modelo deben tener acceso a la ubicación del artefacto. Es decir, independientemente del hecho de que la IU de seguimiento esté ubicada en una instancia EC2, cuando se ejecuta MLflow localmente, la máquina debe tener acceso directo a S3 para escribir modelos de artefactos.

Ampliación de Spark con MLflow
El seguimiento de la interfaz de usuario almacena artefactos en el depósito S3

Modelos en ejecución

Tan pronto como el servidor de seguimiento se esté ejecutando, puede comenzar a entrenar los modelos.

Como ejemplo, usaremos la modificación del vino del ejemplo de MLflow en aprender.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Como dijimos, MLflow le permite registrar los parámetros, las métricas y los artefactos del modelo para que pueda realizar un seguimiento de cómo se desarrollan como iteraciones. Esta característica es extremadamente útil, porque nos permite reproducir el mejor modelo contactando al servidor de seguimiento o entendiendo qué código realizó la iteración requerida usando los registros hash de git de las confirmaciones.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Ampliación de Spark con MLflow
iteraciones de vino

Back-end para el modelo

El servidor de seguimiento de MLflow iniciado con el comando "mlflow server" tiene una API REST para realizar un seguimiento de las ejecuciones y escribir datos en el sistema de archivos local. Puede especificar la dirección del servidor de seguimiento mediante la variable de entorno "MLFLOW_TRACKING_URI" y la API de seguimiento de MLflow se comunicará automáticamente con el servidor de seguimiento en esta dirección para crear/obtener información de lanzamiento, métricas de registro, etc.

Fuente: Docs// Ejecutar un servidor de seguimiento

Para proporcionar un servidor al modelo, necesitamos un servidor de seguimiento en ejecución (consulte la interfaz de inicio) y el ID de ejecución del modelo.

Ampliación de Spark con MLflow
ID de ejecución

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Para servir modelos usando la funcionalidad de servicio de MLflow, necesitamos acceso a la interfaz de usuario de seguimiento para obtener información sobre el modelo simplemente especificando --run_id.

Una vez que el modelo se pone en contacto con el servidor de seguimiento, podemos obtener un nuevo punto final del modelo.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Ejecución de modelos de Spark

A pesar de que el servidor de seguimiento es lo suficientemente potente como para servir modelos en tiempo real, capacítelos y use la funcionalidad del servidor (fuente: mlflow // documentos // modelos #local), usar Spark (batch o streaming) es una solución aún más poderosa debido a su distribución.

Imagine que acaba de realizar un entrenamiento fuera de línea y luego aplica el modelo de salida a todos sus datos. Aquí es donde Spark y MLflow entran en juego.

Instalar PySpark + Jupyter + Spark

Fuente: Empezar PySpark - Jupyter

Para mostrar cómo aplicamos los modelos de MLflow a los marcos de datos de Spark, debemos configurar los cuadernos de Jupyter Notebook para que funcionen con PySpark.

Comience instalando la última versión estable Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Instale PySpark y Jupyter en un entorno virtual:

pip install pyspark jupyter

Configurar variables de entorno:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Habiendo definido notebook-dir, podremos almacenar nuestros cuadernos en la carpeta deseada.

Ejecutando Jupyter desde PySpark

Dado que pudimos configurar Júpiter como controlador PySpark, ahora podemos ejecutar el cuaderno Jupyter en un contexto PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Ampliación de Spark con MLflow

Como se mencionó anteriormente, MLflow proporciona la función de registrar artefactos del modelo en S3. En cuanto tengamos en nuestras manos el modelo seleccionado, tenemos la oportunidad de importarlo como UDF usando el módulo mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Ampliación de Spark con MLflow
PySpark - Predicción de la calidad del vino

Hasta este punto, hemos hablado sobre cómo usar PySpark con MLflow ejecutando la predicción de la calidad del vino en todo el conjunto de datos del vino. Pero, ¿qué sucede si necesita usar los módulos Python MLflow de Scala Spark?

Probamos esto también al dividir el contexto de Spark entre Scala y Python. Es decir, registramos MLflow UDF en Python y lo usamos desde Scala (sí, tal vez no sea la mejor solución, pero la que tenemos).

Scala Spark + flujo ml

Para este ejemplo, agregaremos Núcleo de Toree en un Júpiter existente.

Instalar Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Como puede ver en el cuaderno adjunto, Spark y PySpark comparten UDF. Esperamos que esta parte sea útil para quienes aman Scala y desean implementar modelos de aprendizaje automático en producción.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Próximos pasos

Aunque MLflow está en Alpha en el momento de escribir este artículo, parece bastante prometedor. El simple hecho de poder ejecutar múltiples marcos de aprendizaje automático y usarlos desde un único punto final lleva a los sistemas de recomendación al siguiente nivel.

Además, MLflow acerca a los ingenieros de datos y los científicos de datos, creando una capa común entre ellos.

Después de esta exploración de MLflow, estamos seguros de seguir adelante y usarlo para nuestras canalizaciones y sistemas de recomendación de Spark.

Sería bueno sincronizar el almacenamiento de archivos con la base de datos en lugar del sistema de archivos. Esto debería darnos múltiples puntos finales que pueden usar el mismo recurso compartido de archivos. Por ejemplo, use varias instancias presto и Athena con el mismo metastore de Glue.

En resumen, me gustaría dar las gracias a la comunidad de MLFlow por hacer que nuestro trabajo con los datos sea más interesante.

Si juegas con MLflow, no dudes en escribirnos y contarnos cómo lo usas, y más aún si lo usas en producción.

Obtenga más información sobre los cursos:
aprendizaje automático. Curso basico
aprendizaje automático. curso avanzado

Lee mas:

Fuente: habr.com

Añadir un comentario