Extension de Spark avec MLflow

Bonjour, habitants de Khabrovsk. Comme nous l'avons déjà écrit, OTUS lance ce mois-ci deux cours de machine learning à la fois, à savoir base и Avancée. À cet égard, nous continuons de partager du matériel utile.

Le but de cet article est de parler de notre première expérience d'utilisation MLflow.

Nous allons commencer l'examen MLflow depuis son serveur de suivi et enregistrez toutes les itérations de l'étude. Ensuite, nous partagerons notre expérience de connexion de Spark à MLflow à l'aide d'UDF.

Contexte

Nous sommes en Alpha Santé Nous utilisons l’apprentissage automatique et l’intelligence artificielle pour permettre aux individus de prendre en charge leur santé et leur bien-être. C'est pourquoi les modèles d'apprentissage automatique sont au cœur des produits de science des données que nous développons, et c'est pourquoi nous avons été attirés par MLflow, une plateforme open source qui couvre tous les aspects du cycle de vie de l'apprentissage automatique.

MLflow

L'objectif principal de MLflow est de fournir une couche supplémentaire en plus de l'apprentissage automatique qui permettrait aux data scientists de travailler avec presque toutes les bibliothèques d'apprentissage automatique (h2o, keras, saut, pytorch, apprendre и tensorflow), faisant passer son travail à un niveau supérieur.

MLflow fournit trois composants :

  • Tracking – enregistrement et demandes d’expérimentations : code, données, configuration et résultats. Le suivi du processus de création d'un modèle est très important.
  • Projets – Format de packaging pour fonctionner sur n’importe quelle plateforme (par ex. SageMaker)
  • Des modèles photo – un format commun pour soumettre des modèles à divers outils de déploiement.

MLflow (en version alpha au moment de la rédaction) est une plateforme open source qui vous permet de gérer le cycle de vie de l'apprentissage automatique, y compris l'expérimentation, la réutilisation et le déploiement.

Configuration de MLflow

Pour utiliser MLflow, vous devez d'abord configurer l'intégralité de votre environnement Python, pour cela nous utiliserons PyEnv (pour installer Python sur Mac, consultez ici). De cette façon, nous pouvons créer un environnement virtuel dans lequel nous installerons toutes les bibliothèques nécessaires à son exécution.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Installons les bibliothèques requises.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Remarque : Nous utilisons PyArrow pour exécuter des modèles tels que UDF. Les versions de PyArrow et Numpy devaient être corrigées car ces dernières versions étaient en conflit les unes avec les autres.

Lancer l'interface utilisateur de suivi

MLflow Tracking nous permet d'enregistrer et d'interroger des expériences à l'aide de Python et REST API. De plus, vous pouvez déterminer où stocker les artefacts du modèle (localhost, Amazon S3, Stockage d'objets blob Azure, Google Cloud Storage ou Serveur SFTP). Puisque nous utilisons AWS chez Alpha Health, notre stockage d'artefacts sera S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow recommande d'utiliser le stockage de fichiers persistant. Le stockage de fichiers est l'endroit où le serveur stockera les métadonnées d'exécution et d'expérimentation. Lors du démarrage du serveur, assurez-vous qu'il pointe vers le magasin de fichiers persistants. Ici pour l'expérience nous utiliserons simplement /tmp.

N'oubliez pas que si nous voulons utiliser le serveur mlflow pour exécuter d'anciennes expériences, elles doivent être présentes dans le stockage de fichiers. Cependant, même sans cela, nous pourrions les utiliser dans l'UDF, puisque nous n'avons besoin que du chemin d'accès au modèle.

Remarque : Gardez à l'esprit que l'interface utilisateur de suivi et le client modèle doivent avoir accès à l'emplacement de l'artefact. Autrement dit, indépendamment du fait que l'interface utilisateur de suivi réside dans une instance EC2, lors de l'exécution locale de MLflow, la machine doit avoir un accès direct à S3 pour écrire des modèles d'artefacts.

Extension de Spark avec MLflow
L'interface utilisateur de suivi stocke les artefacts dans un compartiment S3

Modèles en cours d'exécution

Dès que le serveur de suivi est exécuté, vous pouvez commencer à entraîner les modèles.

A titre d'exemple, nous utiliserons la modification wine de l'exemple MLflow dans Apprendre.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Comme nous l'avons déjà évoqué, MLflow vous permet d'enregistrer les paramètres, les métriques et les artefacts du modèle afin que vous puissiez suivre leur évolution au fil des itérations. Cette fonctionnalité est extrêmement utile car nous pouvons ainsi reproduire le meilleur modèle en contactant le serveur de suivi ou en comprenant quel code a effectué l'itération requise à l'aide des journaux de hachage git des commits.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Extension de Spark avec MLflow
Itérations de vin

Partie serveur pour le modèle

Le serveur de suivi MLflow, lancé à l'aide de la commande « mlflow server », dispose d'une API REST pour suivre les exécutions et écrire des données sur le système de fichiers local. Vous pouvez spécifier l'adresse du serveur de suivi à l'aide de la variable d'environnement « MLFLOW_TRACKING_URI » et l'API de suivi MLflow contactera automatiquement le serveur de suivi à cette adresse pour créer/recevoir des informations de lancement, enregistrer des métriques, etc.

Source: Docs// Exécution d'un serveur de suivi

Pour fournir au modèle un serveur, nous avons besoin d'un serveur de suivi en cours d'exécution (voir interface de lancement) et du Run ID du modèle.

Extension de Spark avec MLflow
ID d'exécution

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Pour servir des modèles à l'aide de la fonctionnalité de service MLflow, nous aurons besoin d'accéder à l'interface utilisateur de suivi pour recevoir des informations sur le modèle simplement en spécifiant --run_id.

Une fois que le modèle contacte le serveur de suivi, nous pouvons obtenir un nouveau point de terminaison du modèle.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Exécution de modèles depuis Spark

Malgré le fait que le serveur de suivi soit suffisamment puissant pour servir les modèles en temps réel, les former et utiliser les fonctionnalités du serveur (source : mlflow // docs // modèles # local), l’utilisation de Spark (batch ou streaming) est une solution encore plus puissante du fait de la distribution.

Imaginez que vous ayez simplement effectué la formation hors ligne, puis appliqué le modèle de sortie à toutes vos données. C'est là que Spark et MLflow brillent.

Installer PySpark + Jupyter + Spark

Source: Commencer PySpark - Jupyter

Pour montrer comment nous appliquons les modèles MLflow aux dataframes Spark, nous devons configurer les notebooks Jupyter pour qu'ils fonctionnent avec PySpark.

Commencez par installer la dernière version stable Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Installez PySpark et Jupyter dans l'environnement virtuel :

pip install pyspark jupyter

Configurez les variables d'environnement :

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Ayant déterminé notebook-dir, nous pouvons stocker nos cahiers dans le dossier souhaité.

Lancer Jupyter depuis PySpark

Depuis que nous avons pu configurer Jupiter en tant que pilote PySpark, nous pouvons désormais exécuter le notebook Jupyter dans le contexte de PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Extension de Spark avec MLflow

Comme mentionné ci-dessus, MLflow fournit une fonctionnalité de journalisation des artefacts de modèle dans S3. Dès que nous avons le modèle sélectionné entre nos mains, nous avons la possibilité de l'importer sous forme d'UDF à l'aide du module mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Extension de Spark avec MLflow
PySpark - Générer des prévisions sur la qualité du vin

Jusqu'à présent, nous avons expliqué comment utiliser PySpark avec MLflow, en exécutant des prévisions de qualité du vin sur l'ensemble de données sur le vin. Mais que se passe-t-il si vous devez utiliser les modules Python MLflow de Scala Spark ?

Nous avons également testé cela en divisant le contexte Spark entre Scala et Python. Autrement dit, nous avons enregistré MLflow UDF en Python et l'avons utilisé depuis Scala (oui, ce n'est peut-être pas la meilleure solution, mais ce que nous avons).

Scala Spark + MLflow

Pour cet exemple, nous ajouterons Noyau Torée dans le Jupiter existant.

Installer Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Comme vous pouvez le voir sur le bloc-notes ci-joint, l'UDF est partagée entre Spark et PySpark. Nous espérons que cette partie sera utile à ceux qui aiment Scala et souhaitent déployer des modèles d'apprentissage automatique en production.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Prochaines étapes

Même si MLflow est en version Alpha au moment de la rédaction, il semble assez prometteur. La simple possibilité d’exécuter plusieurs frameworks d’apprentissage automatique et de les utiliser à partir d’un seul point de terminaison fait passer les systèmes de recommandation au niveau supérieur.

De plus, MLflow rapproche les Data Engineers et les spécialistes de la Data Science, créant ainsi une couche commune entre eux.

Après cette exploration de MLflow, nous sommes convaincus que nous allons aller de l'avant et l'utiliser pour nos pipelines Spark et nos systèmes de recommandation.

Ce serait bien de synchroniser le stockage des fichiers avec la base de données plutôt qu'avec le système de fichiers. Cela devrait nous donner plusieurs points de terminaison pouvant utiliser le même stockage de fichiers. Par exemple, utilisez plusieurs instances Presto и Athena avec le même métastore Glue.

Pour résumer, je voudrais remercier la communauté MLFlow d'avoir rendu notre travail avec les données plus intéressant.

Si vous jouez avec MLflow, n'hésitez pas à nous écrire et à nous dire comment vous l'utilisez, et encore plus si vous l'utilisez en production.

En savoir plus sur les cours :
Apprentissage automatique. Cours de base
Apprentissage automatique. Cours avancé

Lire la suite:

Source: habr.com

Ajouter un commentaire