Spark uitbreiden met MLflow

Hallo, inwoners van Khabrovsk. Zoals we al schreven lanceert OTUS deze maand twee machine learning-cursussen tegelijk, namelijk baseren и Geavanceerd. In dit opzicht blijven we nuttig materiaal delen.

Het doel van dit artikel is om te praten over onze eerste ervaringen met het gebruik MLstroom.

We beginnen met de beoordeling MLstroom vanaf de trackingserver en log alle iteraties van het onderzoek. Vervolgens delen we onze ervaring met het verbinden van Spark met MLflow met behulp van UDF.

verband

Wij zijn in Alfa Gezondheid We gebruiken machinaal leren en kunstmatige intelligentie om mensen in staat te stellen de leiding te nemen over hun gezondheid en welzijn. Dat is de reden waarom machine learning-modellen de kern vormen van de data science-producten die we ontwikkelen, en daarom werden we aangetrokken door MLflow, een open source platform dat alle aspecten van de machine learning-levenscyclus bestrijkt.

MLstroom

Het belangrijkste doel van MLflow is om een ​​extra laag bovenop machine learning te bieden, waardoor datawetenschappers met vrijwel elke machine learning-bibliotheek kunnen werken (h2o, Keras, msprong, pytorch, sluw и tensorflow), waardoor haar werk naar een hoger niveau wordt getild.

MLflow biedt drie componenten:

  • Tracking – vastleggen en aanvragen van experimenten: code, data, configuratie en resultaten. Het monitoren van het proces van het maken van een model is erg belangrijk.
  • Projecten – Verpakkingsformaat dat op elk platform kan worden uitgevoerd (bijv. SageMaker)
  • Modellen – een gemeenschappelijk formaat voor het indienen van modellen bij verschillende implementatietools.

MLflow (in alpha op het moment van schrijven) is een open source-platform waarmee u de levenscyclus van machine learning kunt beheren, inclusief experimenteren, hergebruik en implementatie.

MLflow instellen

Om MLflow te kunnen gebruiken moet je eerst je gehele Python-omgeving inrichten, hiervoor gaan wij gebruik maken van PyEnv (Om Python op Mac te installeren, ga naar hier). Op deze manier kunnen we een virtuele omgeving creëren waarin we alle bibliotheken zullen installeren die nodig zijn om het uit te voeren.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Laten we de vereiste bibliotheken installeren.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Opmerking: we gebruiken PyArrow om modellen zoals UDF uit te voeren. De versies van PyArrow en Numpy moesten worden gerepareerd omdat de laatste versies met elkaar conflicteerden.

Start de tracking-UI

Met MLflow Tracking kunnen we experimenten loggen en opvragen met behulp van Python en REST API. Bovendien kunt u bepalen waar modelartefacten (localhost, Amazon S3, Azure Blob-opslag, Google Cloud Storage of SFTP-server). Omdat we AWS gebruiken bij Alpha Health, zal onze artefactopslag S3 zijn.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow raadt aan om permanente bestandsopslag te gebruiken. Bestandsopslag is waar de server metagegevens van uitvoeringen en experimenten opslaat. Zorg er bij het starten van de server voor dat deze verwijst naar het permanente bestandsarchief. Hier voor het experiment zullen we gewoon gebruiken /tmp.

Houd er rekening mee dat als we de mlflow-server willen gebruiken om oude experimenten uit te voeren, deze aanwezig moeten zijn in de bestandsopslag. Maar zelfs zonder dit zouden we ze in de UDF kunnen gebruiken, omdat we alleen het pad naar het model nodig hebben.

Opmerking: Houd er rekening mee dat de Tracking-UI en de modelclient toegang moeten hebben tot de artefactlocatie. Dat wil zeggen dat, ongeacht het feit dat de Tracking UI zich in een EC2-instantie bevindt, de machine bij het lokaal uitvoeren van MLflow directe toegang tot S3 moet hebben om artefactmodellen te schrijven.

Spark uitbreiden met MLflow
Tracking-UI slaat artefacten op in een S3-bucket

Lopende modellen

Zodra de Tracking-server draait, kunt u beginnen met het trainen van de modellen.

Als voorbeeld gebruiken we de wijnmodificatie uit het MLflow-voorbeeld in Sleren.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Zoals we al hebben besproken, kunt u met MLflow modelparameters, statistieken en artefacten loggen, zodat u kunt volgen hoe deze zich in de loop van iteraties ontwikkelen. Deze functie is uiterst nuttig omdat we op deze manier het beste model kunnen reproduceren door contact op te nemen met de Tracking-server of te begrijpen welke code de vereiste iteratie heeft uitgevoerd met behulp van de git-hashlogs van commits.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Spark uitbreiden met MLflow
Wijn iteraties

Servergedeelte voor het model

De MLflow-trackingserver, gelanceerd met de opdracht ‘mlflow server’, heeft een REST API voor het volgen van runs en het schrijven van gegevens naar het lokale bestandssysteem. U kunt het adres van de trackingserver opgeven met behulp van de omgevingsvariabele “MLFLOW_TRACKING_URI” en de MLflow-tracking-API zal automatisch contact opnemen met de trackingserver op dit adres om startinformatie te maken/ontvangen, statistieken te loggen, enz.

Bron: Docs// Een trackingserver draaien

Om het model van een server te voorzien, hebben we een actieve trackingserver nodig (zie opstartinterface) en de Run ID van het model.

Spark uitbreiden met MLflow
ID uitvoeren

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Om modellen te bedienen met behulp van de MLflow-servicefunctionaliteit, hebben we toegang nodig tot de Tracking-UI om informatie over het model te ontvangen door eenvoudigweg op te geven --run_id.

Zodra het model contact maakt met de Tracking-server, kunnen we een nieuw modeleindpunt krijgen.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Hardloopmodellen van Spark

Ondanks dat de Tracking-server krachtig genoeg is om modellen in realtime te bedienen, te trainen en de serverfunctionaliteit te gebruiken (bron: mlflow // docs // modellen # lokaal), is het gebruik van Spark (batch of streaming) door distributie een nog krachtigere oplossing.

Stel je voor dat je de training simpelweg offline hebt gedaan en vervolgens het outputmodel op al je data hebt toegepast. Dit is waar Spark en MLflow schitteren.

Installeer PySpark + Jupyter + Spark

Bron: Aan de slag PySpark - Jupyter

Om te laten zien hoe we MLflow-modellen toepassen op Spark-dataframes, moeten we Jupyter-notebooks instellen om samen te werken met PySpark.

Begin met het installeren van de nieuwste stabiele versie Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Installeer PySpark en Jupyter in de virtuele omgeving:

pip install pyspark jupyter

Omgevingsvariabelen instellen:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Na bepaald te hebben notebook-dirkunnen wij onze notitieboekjes in de gewenste map opbergen.

Jupyter starten vanuit PySpark

Omdat we Jupiter als PySpark-stuurprogramma konden configureren, kunnen we nu Jupyter-notebook in de context van PySpark draaien.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Spark uitbreiden met MLflow

Zoals hierboven vermeld, biedt MLflow een functie voor het loggen van modelartefacten in S3. Zodra we het geselecteerde model in handen hebben, hebben we de mogelijkheid om het met behulp van de module als UDF te importeren mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Spark uitbreiden met MLflow
PySpark - Voorspellingen over de wijnkwaliteit uitvoeren

Tot nu toe hebben we gesproken over het gebruik van PySpark met MLflow, waarbij voorspellingen van de wijnkwaliteit worden uitgevoerd op de volledige wijngegevensset. Maar wat als u Python MLflow-modules van Scala Spark moet gebruiken?

We hebben dit ook getest door de Spark-context te splitsen tussen Scala en Python. Dat wil zeggen, we hebben MLflow UDF in Python geregistreerd en vanuit Scala gebruikt (ja, misschien niet de beste oplossing, maar wat we hebben).

Scala Spark + MLflow

Voor dit voorbeeld zullen we toevoegen Toree Kernel in de bestaande Jupiter.

Installeer Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Zoals u kunt zien in het bijgevoegde notitieblok, wordt de UDF gedeeld tussen Spark en PySpark. We hopen dat dit deel nuttig zal zijn voor degenen die van Scala houden en machine learning-modellen in productie willen implementeren.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Volgende stappen

Ook al bevindt MLflow zich op het moment van schrijven in de Alpha-versie, het ziet er veelbelovend uit. Alleen al de mogelijkheid om meerdere machine learning-frameworks uit te voeren en deze vanaf één eindpunt te gebruiken, tilt aanbevelingssystemen naar een hoger niveau.

Daarnaast brengt MLflow Data Engineers en Data Science specialisten dichter bij elkaar, waardoor er een gemeenschappelijke laag tussen hen ontstaat.

Na deze verkenning van MLflow hebben we er vertrouwen in dat we verder zullen gaan en het zullen gebruiken voor onze Spark-pijplijnen en aanbevelingssystemen.

Het zou leuk zijn om de bestandsopslag te synchroniseren met de database in plaats van met het bestandssysteem. Dit zou ons meerdere eindpunten moeten opleveren die dezelfde bestandsopslag kunnen gebruiken. Gebruik bijvoorbeeld meerdere exemplaren Presto и Athene met dezelfde Glue-metastore.

Samenvattend wil ik de MLFlow-gemeenschap bedanken voor het interessanter maken van ons werk met data.

Als u met MLflow aan het spelen bent, aarzel dan niet om ons te schrijven en ons te vertellen hoe u het gebruikt, en nog meer als u het in productie gebruikt.

Meer informatie over de cursussen:
Machinaal leren. Basiscursus
Machinaal leren. Cursus voor gevorderden

Lees verder:

Bron: www.habr.com

Voeg een reactie