Estensione di Spark con MLflow

Ciao, residenti di Khabrovsk. Come abbiamo già scritto, questo mese OTUS lancerà due corsi di machine learning contemporaneamente, vale a dire di base и Avanzate. A questo proposito continuiamo a condividere materiale utile.

Lo scopo di questo articolo è parlare della nostra prima esperienza nell'utilizzo MLflow.

Inizieremo la recensione MLflow dal suo server di tracciamento e registra tutte le iterazioni dello studio. Quindi condivideremo la nostra esperienza nel connettere Spark con MLflow utilizzando UDF.

Contesto

Siamo dentro Salute alfa Utilizziamo l’apprendimento automatico e l’intelligenza artificiale per consentire alle persone di prendersi cura della propria salute e del proprio benessere. Ecco perché i modelli di machine learning sono al centro dei prodotti di data science che sviluppiamo ed è per questo che siamo stati attratti da MLflow, una piattaforma open source che copre tutti gli aspetti del ciclo di vita del machine learning.

MLflow

L'obiettivo principale di MLflow è fornire un livello aggiuntivo oltre all'apprendimento automatico che consenta ai data scientist di lavorare con quasi tutte le librerie di apprendimento automatico (h2o, keras, mleap, pytorch, sklearn и tensorflow), portando il suo lavoro al livello successivo.

MLflow fornisce tre componenti:

  • Tracking – registrazione e richieste di esperimenti: codice, dati, configurazione e risultati. Monitorare il processo di creazione di un modello è molto importante.
  • Progetti – Formato del packaging per l'esecuzione su qualsiasi piattaforma (ad es. SageMaker)
  • Modelli – un formato comune per l'invio di modelli a vari strumenti di distribuzione.

MLflow (in alpha al momento della scrittura) è una piattaforma open source che consente di gestire il ciclo di vita del machine learning, inclusi sperimentazione, riutilizzo e distribuzione.

Configurazione di MLflow

Per utilizzare MLflow devi prima configurare l'intero ambiente Python, per questo utilizzeremo PyEnv (per installare Python su Mac, consulta qui). In questo modo possiamo creare un ambiente virtuale in cui installeremo tutte le librerie necessarie per eseguirlo.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Installiamo le librerie richieste.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Nota: utilizziamo PyArrow per eseguire modelli come UDF. Le versioni di PyArrow e Numpy dovevano essere corrette perché queste ultime versioni erano in conflitto tra loro.

Avvia l'interfaccia utente di monitoraggio

MLflow Tracking ci consente di registrare ed eseguire query sugli esperimenti utilizzando Python e REST API. Inoltre, puoi determinare dove archiviare gli artefatti del modello (localhost, Amazon S3, Archiviazione BLOB di Azure, Google Cloud Storage o server SFTP). Poiché utilizziamo AWS presso Alpha Health, il nostro storage degli artefatti sarà S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow consiglia di utilizzare l'archiviazione di file persistente. L'archiviazione dei file è il luogo in cui il server memorizzerà i metadati della corsa e dell'esperimento. Quando si avvia il server, assicurarsi che punti all'archivio file persistente. Qui per l'esperimento useremo semplicemente /tmp.

Ricordiamo che se vogliamo utilizzare il server mlflow per eseguire vecchi esperimenti, questi devono essere presenti nel file storage. Tuttavia, anche senza questo, potremmo utilizzarli nell'UDF, poiché abbiamo solo bisogno del percorso del modello.

Nota: tieni presente che l'interfaccia utente di monitoraggio e il client del modello devono avere accesso alla posizione dell'artefatto. Cioè, indipendentemente dal fatto che l'interfaccia utente di monitoraggio risieda in un'istanza EC2, quando si esegue MLflow localmente, la macchina deve avere accesso diretto a S3 per scrivere modelli di artefatti.

Estensione di Spark con MLflow
L'interfaccia utente di monitoraggio memorizza gli artefatti in un bucket S3

Modelli in corsa

Non appena il server di monitoraggio è in esecuzione, puoi iniziare ad addestrare i modelli.

Ad esempio, utilizzeremo la modifica del vino dall'esempio MLflow in Impara.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Come abbiamo già discusso, MLflow ti consente di registrare parametri, metriche e artefatti del modello in modo da poter monitorare come si evolvono nel corso delle iterazioni. Questa funzionalità è estremamente utile perché in questo modo possiamo riprodurre il modello migliore contattando il server di Tracking o capire quale codice ha eseguito l'iterazione richiesta utilizzando i log hash git dei commit.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Estensione di Spark con MLflow
Iterazioni del vino

Parte server per il modello

Il server di tracciamento MLflow, avviato utilizzando il comando "mlflow server", dispone di un'API REST per tenere traccia delle esecuzioni e scrivere dati nel file system locale. È possibile specificare l'indirizzo del server di monitoraggio utilizzando la variabile di ambiente "MLFLOW_TRACKING_URI" e l'API di monitoraggio MLflow contatterà automaticamente il server di monitoraggio a questo indirizzo per creare/ricevere informazioni sul lancio, metriche di registro, ecc.

Fonte: Documenti// Esecuzione di un server di tracciamento

Per fornire un server al modello, abbiamo bisogno di un server di tracciamento in esecuzione (vedi interfaccia di lancio) e dell'ID esecuzione del modello.

Estensione di Spark con MLflow
Esegui ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Per servire i modelli utilizzando la funzionalità di servizio MLflow, avremo bisogno di accedere all'interfaccia utente di monitoraggio per ricevere informazioni sul modello semplicemente specificando --run_id.

Una volta che il modello contatta il server di monitoraggio, possiamo ottenere un nuovo endpoint del modello.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Modelli in esecuzione da Spark

Nonostante il server di monitoraggio sia abbastanza potente da servire i modelli in tempo reale, addestrarli e utilizzare la funzionalità del server (fonte: mlflow // documenti // modelli # local), l'utilizzo di Spark (batch o streaming) è una soluzione ancora più potente grazie alla distribuzione.

Immagina di aver semplicemente eseguito la formazione offline e quindi di aver applicato il modello di output a tutti i tuoi dati. È qui che brillano Spark e MLflow.

Installa PySpark + Jupyter + Spark

Fonte: Inizia PySpark - Jupyter

Per mostrare come applichiamo i modelli MLflow ai dataframe Spark, dobbiamo configurare i notebook Jupyter per collaborare con PySpark.

Inizia installando l'ultima versione stabile Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Installa PySpark e Jupyter nell'ambiente virtuale:

pip install pyspark jupyter

Imposta le variabili di ambiente:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Avendo determinato notebook-dir, possiamo archiviare i nostri quaderni nella cartella desiderata.

Avvio di Jupyter da PySpark

Dato che siamo riusciti a configurare Jupiter come driver PySpark, ora possiamo eseguire il notebook Jupyter nel contesto di PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Estensione di Spark con MLflow

Come accennato in precedenza, MLflow fornisce una funzionalità per la registrazione degli artefatti del modello in S3. Non appena avremo tra le mani il modello selezionato, abbiamo la possibilità di importarlo come UDF utilizzando il modulo mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Estensione di Spark con MLflow
PySpark: produzione di previsioni sulla qualità del vino

Fino a questo punto abbiamo parlato di come utilizzare PySpark con MLflow, eseguendo previsioni sulla qualità del vino sull'intero set di dati del vino. Ma cosa succede se è necessario utilizzare i moduli Python MLflow di Scala Spark?

Abbiamo testato anche questo dividendo il contesto Spark tra Scala e Python. Cioè, abbiamo registrato MLflow UDF in Python e l'abbiamo utilizzato da Scala (sì, forse non è la soluzione migliore, ma quella che abbiamo).

Scala Spark + MLflow

Per questo esempio aggiungeremo Toree Kernel nell'attuale Giove.

Installa Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Come puoi vedere dal taccuino allegato, l'UDF è condivisa tra Spark e PySpark. Ci auguriamo che questa parte sia utile a coloro che amano Scala e desiderano distribuire modelli di machine learning in produzione.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Prossimi passi

Anche se MLflow è in versione Alpha al momento in cui scrivo, sembra abbastanza promettente. Solo la capacità di eseguire più framework di machine learning e di utilizzarli da un singolo endpoint porta i sistemi di raccomandazione a un livello superiore.

Inoltre, MLflow avvicina i data engineer e gli specialisti di data science, ponendo tra loro un livello comune.

Dopo questa esplorazione di MLflow, siamo fiduciosi che andremo avanti e lo utilizzeremo per le nostre pipeline Spark e i nostri sistemi di raccomandazione.

Sarebbe bello sincronizzare l'archiviazione dei file con il database anziché con il file system. Questo dovrebbe fornirci più endpoint che possono utilizzare lo stesso spazio di archiviazione file. Ad esempio, utilizza più istanze Presto и Athena con lo stesso metastore Glue.

Per riassumere, vorrei ringraziare la comunità MLFlow per aver reso il nostro lavoro con i dati più interessante.

Se stai giocando con MLflow, non esitare a scriverci e dirci come lo usi, e ancora di più se lo usi in produzione.

Scopri di più sui corsi:
Apprendimento automatico. Corso base
Apprendimento automatico. Corso avanzato

Per saperne di più:

Fonte: habr.com

Aggiungi un commento