Ampliando Spark con MLflow

Ola, veciños de Khabrovsk. Como xa escribimos, este mes OTUS lanza dous cursos de aprendizaxe automática á vez, a saber base и avanzado. Neste sentido, seguimos compartindo material útil.

O propósito deste artigo é falar da nosa primeira experiencia de uso MLflow.

Comezaremos a revisión MLflow desde o seu servidor de seguimento e rexistrar todas as iteracións do estudo. Despois compartiremos a nosa experiencia de conectar Spark con MLflow usando UDF.

Contexto

Estamos dentro Saúde Alfa Usamos a aprendizaxe automática e a intelixencia artificial para que as persoas se fagan cargo da súa saúde e benestar. É por iso que os modelos de aprendizaxe automática están no centro dos produtos de ciencia de datos que desenvolvemos, e por iso nos atraeu MLflow, unha plataforma de código aberto que abarca todos os aspectos do ciclo de vida da aprendizaxe automática.

MLflow

O obxectivo principal de MLflow é proporcionar unha capa adicional sobre a aprendizaxe automática que permita aos científicos de datos traballar con case calquera biblioteca de aprendizaxe automática (h2o, keras, mleap, pirtorco, sklearn и fluxo tensor), levando o seu traballo ao seguinte nivel.

MLflow ofrece tres compoñentes:

  • Seguimento – gravación e solicitudes de experimentos: código, datos, configuración e resultados. O seguimento do proceso de creación dun modelo é moi importante.
  • proxectos – Formato de embalaxe para executarse en calquera plataforma (p. ex. SageMaker)
  • Modelos – un formato común para enviar modelos a varias ferramentas de implantación.

MLflow (en alfa no momento da escritura) é unha plataforma de código aberto que che permite xestionar o ciclo de vida da aprendizaxe automática, incluíndo a experimentación, a reutilización e a implantación.

Configuración de MLflow

Para usar MLflow, primeiro debes configurar todo o teu ambiente Python, para iso imos usar PyEnv (para instalar Python en Mac, consulta aquí). Deste xeito poderemos crear un entorno virtual onde instalaremos todas as bibliotecas necesarias para executalo.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Imos instalar as bibliotecas necesarias.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Nota: Usamos PyArrow para executar modelos como UDF. As versións de PyArrow e Numpy debían ser corrixidas porque as últimas versións entraban en conflito entre si.

Inicie a IU de seguimento

O seguimento de MLflow permítenos rexistrar e consultar experimentos usando Python e DESCANSO API. Ademais, pode determinar onde almacenar os artefactos do modelo (localhost, Amazon S3, Azure Blob Storage, Almacenamento na nube de Google ou Servidor SFTP). Dado que usamos AWS en Alpha Health, o noso almacenamento de artefactos será S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow recomenda usar o almacenamento de ficheiros persistente. O almacenamento de ficheiros é onde o servidor almacenará os metadatos de execución e proba. Ao iniciar o servidor, asegúrese de que apunta ao almacén de ficheiros persistentes. Aquí para o experimento simplemente usaremos /tmp.

Lembra que se queremos utilizar o servidor mlflow para executar experimentos antigos, deben estar presentes no almacenamento de ficheiros. Non obstante, aínda sen iso poderiamos utilizalos na UDF, xa que só necesitamos o camiño ao modelo.

Nota: Teña en conta que a interface de usuario de seguimento e o cliente do modelo deben ter acceso á localización do artefacto. É dicir, independentemente de que a IU de seguimento resida nunha instancia EC2, cando se executa MLflow localmente, a máquina debe ter acceso directo a S3 para escribir modelos de artefactos.

Ampliando Spark con MLflow
A IU de seguimento almacena artefactos nun depósito S3

Modelos de carreira

En canto se estea a executar o servidor de seguimento, podes comezar a adestrar os modelos.

Como exemplo, usaremos a modificación do viño do exemplo MLflow en Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Como xa comentamos, MLflow permítelle rexistrar parámetros, métricas e artefactos do modelo para que poida seguir como evolucionan ao longo das iteracións. Esta función é extremadamente útil porque deste xeito podemos reproducir o mellor modelo contactando co servidor de seguimento ou entendendo que código realizou a iteración requirida mediante os rexistros hash de commits de git.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Ampliando Spark con MLflow
Iteracións do viño

Parte do servidor para o modelo

O servidor de seguimento MLflow, lanzado mediante o comando "servidor mlflow", ten unha API REST para rastrexar execucións e escribir datos no sistema de ficheiros local. Podes especificar o enderezo do servidor de seguimento mediante a variable de ambiente "MLFLOW_TRACKING_URI" e a API de seguimento de MLflow contactará automaticamente co servidor de seguimento neste enderezo para crear/recibir información de inicio, métricas de rexistro, etc.

Fonte: Documentos// Execución dun servidor de seguimento

Para proporcionar ao modelo un servidor, necesitamos un servidor de seguimento en execución (consulte a interface de inicio) e o ID de execución do modelo.

Ampliando Spark con MLflow
Executar ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Para servir modelos usando a funcionalidade de servizo de MLflow, necesitaremos acceder á IU de seguimento para recibir información sobre o modelo simplemente especificando --run_id.

Unha vez que o modelo contacta co servidor de seguimento, podemos obter un novo punto final do modelo.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Modelos en execución de Spark

A pesar de que o servidor de seguimento é o suficientemente potente como para servir modelos en tempo real, adestraos e usa a funcionalidade do servidor (fonte: mlflow // docs // modelos # local), o uso de Spark (batch ou streaming) é unha solución aínda máis potente debido á distribución.

Imaxina que simplemente fixeches o adestramento fóra de liña e despois aplicaches o modelo de saída a todos os teus datos. Aquí é onde brillan Spark e MLflow.

Instala PySpark + Jupyter + Spark

Fonte: Comeza PySpark - Jupyter

Para mostrar como aplicamos os modelos MLflow aos marcos de datos de Spark, necesitamos configurar os cadernos de notas de Jupyter para traballar xunto con PySpark.

Comeza instalando a última versión estable Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Instala PySpark e Jupyter no entorno virtual:

pip install pyspark jupyter

Configurar as variables de ambiente:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Habendo determinado notebook-dir, podemos gardar os nosos cadernos no cartafol desexado.

Lanzamento de Jupyter desde PySpark

Dado que puidemos configurar Xúpiter como controlador de PySpark, agora podemos executar o portátil Jupyter no contexto de PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Ampliando Spark con MLflow

Como se mencionou anteriormente, MLflow ofrece unha función para rexistrar artefactos do modelo en S3. En canto teñamos o modelo seleccionado nas nosas mans, temos a oportunidade de importalo como UDF mediante o módulo mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Ampliando Spark con MLflow
PySpark: emite predicións de calidade do viño

Ata este punto, falamos de como usar PySpark con MLflow, executando predicións de calidade do viño en todo o conxunto de datos de viños. Pero que pasa se necesitas usar módulos Python MLflow de Scala Spark?

Tamén probamos isto dividindo o contexto de Spark entre Scala e Python. É dicir, rexistramos MLflow UDF en Python e usámolo desde Scala (si, quizais non sexa a mellor solución, pero a que temos).

Scala Spark + MLflow

Para este exemplo engadiremos Toree Kernel no Xúpiter existente.

Instala Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Como podes ver no caderno adxunto, a UDF compártese entre Spark e PySpark. Agardamos que esta parte sexa útil para aqueles que aman Scala e queiran implementar modelos de aprendizaxe automática na produción.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Próximos pasos

Aínda que MLflow está na versión Alpha no momento de escribir, parece bastante prometedor. Só a posibilidade de executar varios marcos de aprendizaxe automática e consumilos desde un único punto final leva os sistemas de recomendación ao seguinte nivel.

Ademais, MLflow achega aos enxeñeiros de datos e aos especialistas en ciencias de datos, establecendo unha capa común entre eles.

Despois desta exploración de MLflow, estamos seguros de que imos avanzar e usalo para os nosos pipelines e sistemas de recomendación Spark.

Sería bo sincronizar o almacenamento de ficheiros coa base de datos en lugar do sistema de ficheiros. Isto debería darnos varios puntos finais que poden usar o mesmo almacenamento de ficheiros. Por exemplo, use varias instancias presto и Atena coa mesma metastore Glue.

Para resumir, gustaríame agradecer á comunidade MLFlow por facer máis interesante o noso traballo cos datos.

Se estás xogando con MLflow, non dubides en escribirnos e dicirnos como o usas, e máis aínda se o usas en produción.

Máis información sobre os cursos:
Aprendizaxe automática. Curso básico
Aprendizaxe automática. Curso avanzado

Le máis:

Fonte: www.habr.com

Engadir un comentario