Spark mit MLflow erweitern

Hallo, Bewohner von Chabrowsk. Wie wir bereits geschrieben haben, startet OTUS diesen Monat gleich zwei Kurse für maschinelles Lernen, nämlich Basic и fortgeschritten. In diesem Zusammenhang teilen wir weiterhin nützliches Material.

Der Zweck dieses Artikels besteht darin, über unsere ersten Erfahrungen mit der Verwendung zu sprechen MLflow.

Wir beginnen mit der Überprüfung MLflow von seinem Tracking-Server und protokollieren alle Iterationen der Studie. Anschließend teilen wir unsere Erfahrungen mit der Verbindung von Spark mit MLflow mithilfe von UDF.

Kontext

Wir sind Alpha-Gesundheit Wir nutzen maschinelles Lernen und künstliche Intelligenz, um Menschen in die Lage zu versetzen, die Verantwortung für ihre Gesundheit und ihr Wohlbefinden zu übernehmen. Aus diesem Grund stehen Modelle für maschinelles Lernen im Mittelpunkt der von uns entwickelten Data-Science-Produkte, und deshalb haben wir uns für MLflow interessiert, eine Open-Source-Plattform, die alle Aspekte des Lebenszyklus für maschinelles Lernen abdeckt.

MLflow

Das Hauptziel von MLflow besteht darin, eine zusätzliche Ebene zusätzlich zum maschinellen Lernen bereitzustellen, die es Datenwissenschaftlern ermöglichen würde, mit fast jeder Bibliothek für maschinelles Lernen zu arbeiten (h2o, keras, mleap, Pytorch, sklearn и Tensorfluss), was ihre Arbeit auf die nächste Ebene bringt.

MLflow bietet drei Komponenten:

  • Tracking – Aufzeichnung und Anfragen für Experimente: Code, Daten, Konfiguration und Ergebnisse. Es ist sehr wichtig, den Prozess der Modellerstellung zu überwachen.
  • Projekte – Verpackungsformat zur Ausführung auf jeder Plattform (z. B. SageMaker)
  • Modelle – ein gängiges Format zum Übermitteln von Modellen an verschiedene Bereitstellungstools.

MLflow (zum Zeitpunkt des Schreibens in Alpha) ist eine Open-Source-Plattform, mit der Sie den Lebenszyklus des maschinellen Lernens verwalten können, einschließlich Experimentieren, Wiederverwendung und Bereitstellung.

MLflow einrichten

Um MLflow nutzen zu können, müssen Sie zunächst Ihre gesamte Python-Umgebung einrichten, hierfür verwenden wir PyEnv (Um Python auf dem Mac zu installieren, schauen Sie sich Folgendes an hierher). Auf diese Weise können wir eine virtuelle Umgebung erstellen, in der wir alle für die Ausführung erforderlichen Bibliotheken installieren.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Lassen Sie uns die erforderlichen Bibliotheken installieren.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Hinweis: Wir verwenden PyArrow, um Modelle wie UDF auszuführen. Die Versionen von PyArrow und Numpy mussten repariert werden, da die letzteren Versionen miteinander in Konflikt standen.

Starten Sie die Tracking-Benutzeroberfläche

Mit MLflow Tracking können wir Experimente mit Python protokollieren und abfragen REST API. Darüber hinaus können Sie festlegen, wo Modellartefakte gespeichert werden sollen (localhost, Amazon S3, Azure Blob-Speicher, Google Cloud Storage oder SFTP-Server). Da wir bei Alpha Health AWS verwenden, wird unser Artefaktspeicher S3 sein.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow empfiehlt die Verwendung eines persistenten Dateispeichers. Im Dateispeicher speichert der Server Lauf- und Experimentmetadaten. Stellen Sie beim Starten des Servers sicher, dass er auf den persistenten Dateispeicher verweist. Hier für das Experiment verwenden wir einfach /tmp.

Denken Sie daran, dass, wenn wir den mlflow-Server zum Ausführen alter Experimente verwenden möchten, diese im Dateispeicher vorhanden sein müssen. Aber auch ohne dies könnten wir sie in der UDF verwenden, da wir nur den Pfad zum Modell benötigen.

Hinweis: Beachten Sie, dass die Tracking-Benutzeroberfläche und der Modell-Client Zugriff auf den Artefaktspeicherort haben müssen. Das heißt, unabhängig davon, dass sich die Tracking-Benutzeroberfläche in einer EC2-Instanz befindet, muss der Computer bei lokaler Ausführung von MLflow direkten Zugriff auf S3 haben, um Artefaktmodelle zu schreiben.

Spark mit MLflow erweitern
Die Tracking-Benutzeroberfläche speichert Artefakte in einem S3-Bucket

Laufmodelle

Sobald der Tracking-Server läuft, können Sie mit dem Training der Modelle beginnen.

Als Beispiel verwenden wir die Wine-Modifikation aus dem MLflow-Beispiel in Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Wie wir bereits besprochen haben, können Sie mit MLflow Modellparameter, Metriken und Artefakte protokollieren, sodass Sie verfolgen können, wie sie sich im Laufe der Iterationen entwickeln. Diese Funktion ist äußerst nützlich, da wir auf diese Weise das beste Modell reproduzieren können, indem wir den Tracking-Server kontaktieren oder anhand der Git-Hash-Protokolle von Commits herausfinden, welcher Code die erforderliche Iteration durchgeführt hat.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Spark mit MLflow erweitern
Weiniterationen

Serverteil für das Modell

Der MLflow-Tracking-Server, der mit dem Befehl „mlflow server“ gestartet wird, verfügt über eine REST-API zum Verfolgen von Läufen und zum Schreiben von Daten in das lokale Dateisystem. Sie können die Tracking-Server-Adresse mit der Umgebungsvariablen „MLFLOW_TRACKING_URI“ angeben und die MLflow-Tracking-API kontaktiert automatisch den Tracking-Server unter dieser Adresse, um Startinformationen, Protokollmetriken usw. zu erstellen/empfangen.

Source: Dokumente// Ausführen eines Tracking-Servers

Um dem Modell einen Server zur Verfügung zu stellen, benötigen wir einen laufenden Tracking-Server (siehe Startoberfläche) und die Run-ID des Modells.

Spark mit MLflow erweitern
Lauf-ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Um Modelle mithilfe der MLflow-Serve-Funktionalität bereitzustellen, benötigen wir Zugriff auf die Tracking-Benutzeroberfläche, um Informationen über das Modell einfach durch Angabe zu erhalten --run_id.

Sobald das Modell den Tracking-Server kontaktiert, können wir einen neuen Modellendpunkt erhalten.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Laufmodelle von Spark

Trotz der Tatsache, dass der Tracking-Server leistungsstark genug ist, um Modelle in Echtzeit zu bedienen, sie zu trainieren und die Serverfunktionalität zu nutzen (Quelle: mlflow // docs // models # local) ist der Einsatz von Spark (Batch oder Streaming) aufgrund der Verteilung eine noch leistungsfähigere Lösung.

Stellen Sie sich vor, Sie hätten das Training einfach offline durchgeführt und dann das Ausgabemodell auf alle Ihre Daten angewendet. Hier glänzen Spark und MLflow.

Installieren Sie PySpark + Jupyter + Spark

Source: Beginnen Sie mit PySpark – Jupyter

Um zu zeigen, wie wir MLflow-Modelle auf Spark-Datenrahmen anwenden, müssen wir Jupyter-Notebooks für die Zusammenarbeit mit PySpark einrichten.

Beginnen Sie mit der Installation der neuesten stabilen Version Apache Funken:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Installieren Sie PySpark und Jupyter in der virtuellen Umgebung:

pip install pyspark jupyter

Umgebungsvariablen einrichten:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Definiert haben notebook-dirkönnen wir unsere Notizbücher im gewünschten Ordner ablegen.

Jupyter von PySpark aus starten

Da wir Jupiter als PySpark-Treiber konfigurieren konnten, können wir Jupyter Notebook nun im Kontext von PySpark ausführen.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Spark mit MLflow erweitern

Wie oben erwähnt, bietet MLflow eine Funktion zum Protokollieren von Modellartefakten in S3. Sobald wir das ausgewählte Modell in unseren Händen halten, haben wir die Möglichkeit, es über das Modul als UDF zu importieren mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Spark mit MLflow erweitern
PySpark – Ausgabe von Vorhersagen zur Weinqualität

Bisher haben wir darüber gesprochen, wie man PySpark mit MLflow verwendet und Weinqualitätsvorhersagen für den gesamten Weindatensatz durchführt. Was aber, wenn Sie Python-MLflow-Module von Scala Spark verwenden müssen?

Auch dies haben wir getestet, indem wir den Spark-Kontext zwischen Scala und Python aufgeteilt haben. Das heißt, wir haben MLflow UDF in Python registriert und es von Scala aus verwendet (ja, vielleicht nicht die beste Lösung, aber was wir haben).

Scala Spark + MLflow

Für dieses Beispiel werden wir hinzufügen Toree-Kernel in den existierenden Jupiter.

Installieren Sie Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Wie Sie dem beigefügten Notizbuch entnehmen können, wird die UDF von Spark und PySpark gemeinsam genutzt. Wir hoffen, dass dieser Teil für diejenigen nützlich sein wird, die Scala lieben und Modelle für maschinelles Lernen in der Produktion einsetzen möchten.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Nächste Schritte

Obwohl sich MLflow zum Zeitpunkt des Schreibens in der Alpha-Version befindet, sieht es recht vielversprechend aus. Allein die Möglichkeit, mehrere Frameworks für maschinelles Lernen auszuführen und sie von einem einzigen Endpunkt aus zu nutzen, bringt Empfehlungssysteme auf die nächste Ebene.

Darüber hinaus bringt MLflow Dateningenieure und Datenwissenschaftsspezialisten näher zusammen und schafft eine gemeinsame Ebene zwischen ihnen.

Nach dieser Erkundung von MLflow sind wir zuversichtlich, dass wir weitermachen und es für unsere Spark-Pipelines und Empfehlungssysteme verwenden werden.

Es wäre schön, den Dateispeicher mit der Datenbank statt mit dem Dateisystem zu synchronisieren. Dadurch sollten wir mehrere Endpunkte erhalten, die denselben Dateispeicher verwenden können. Verwenden Sie beispielsweise mehrere Instanzen Presto и Athena mit dem gleichen Glue-Metastore.

Zusammenfassend möchte ich mich bei der MLFlow-Community dafür bedanken, dass sie unsere Arbeit mit Daten interessanter gemacht hat.

Wenn Sie mit MLflow herumspielen, zögern Sie nicht, uns zu schreiben und uns mitzuteilen, wie Sie es verwenden, und noch mehr, wenn Sie es in der Produktion verwenden.

Erfahren Sie mehr über die Kurse:
Maschinelles Lernen. Grundkurs
Maschinelles Lernen. Fortgeschrittener Kurs

Weiterlesen:

Source: habr.com

Kommentar hinzufügen