Uitbreiding van Spark met MLflow

Hallo, Khabrovites. Soos ons reeds geskryf het, loods OTUS hierdie maand twee kursusse oor masjienleer gelyktydig, naamlik basis и gevorderd. In hierdie verband gaan ons voort om nuttige materiaal te deel.

Die doel van hierdie artikel is om te praat oor ons eerste ervaring met MLvloei.

Ons sal die hersiening begin MLvloei vanaf sy opsporingsbediener en prolog alle iterasies van die studie. Dan sal ons die ervaring deel om Spark met MLflow te verbind met UDF.

konteks

Ons is in Alpha Gesondheid ons gebruik masjienleer en kunsmatige intelligensie om mense te bemagtig om na hul gesondheid en welstand om te sien. Dit is hoekom masjienleermodelle die kern vorm van die dataprodukte wat ons ontwikkel, en waarom MLflow, 'n oopbronplatform wat alle aspekte van die masjienleer-lewensiklus dek, ons aandag getrek het.

MLvloei

Die hoofdoel van MLflow is om 'n bykomende laag bo-op masjienleer te voorsien wat datawetenskaplikes in staat sal stel om met byna enige masjienleerbiblioteek te werk (h2o, keras, mleap, takkie, leer и tensorflow), neem haar werk na die volgende vlak.

MLflow bied drie komponente:

  • Dop - opname en versoeke vir eksperimente: kode, data, konfigurasie en resultate. Dit is baie belangrik om die proses van die skep van 'n model te volg.
  • projekte - Verpakkingsformaat om op enige platform te loop (byvoorbeeld, SageMaker)
  • Modelle is 'n algemene formaat vir die indiening van modelle aan verskeie ontplooiingsnutsgoed.

MLflow (alfa ten tyde van skryf) is 'n oopbronplatform waarmee u die masjienleer-lewensiklus kan bestuur, insluitend eksperimentering, hergebruik en ontplooiing.

Stel MLflow op

Om MLflow te gebruik, moet jy eers die hele Python-omgewing opstel, hiervoor sal ons gebruik PyEnv (kyk gerus om Python op 'n Mac te installeer hier). Ons kan dus 'n virtuele omgewing skep waar ons al die biblioteke sal installeer wat nodig is om te hardloop.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Installeer die vereiste biblioteke.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Let wel: Ons gebruik PyArrow om modelle soos UDF's te laat loop. Die weergawes van PyArrow en Numpy moes reggemaak word omdat die nuutste weergawes in konflik met mekaar was.

Begin dop-UI

MLflow Tracking stel ons in staat om eksperimente aan te teken en navraag te doen met Python en REST API. Daarbenewens kan jy definieer waar om model artefakte te stoor (localhost, Amazon S3, Azure Blob-berging, Google Cloud Storage of SFTP-bediener). Aangesien ons AWS by Alpha Health gebruik, sal S3 die stoorplek vir die artefakte wees.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow beveel aan om aanhoudende lêerberging te gebruik. Die lêerberging is waar die bediener hardloop- en eksperiment-metadata sal stoor. Wanneer u die bediener begin, maak seker dit wys na aanhoudende lêerberging. Hier, ter wille van eksperiment, sal ons eenvoudig gebruik /tmp.

Hou in gedagte dat as ons die mlflow-bediener wil gebruik om ou eksperimente uit te voer, dit in die lêerstoor teenwoordig moet wees. Selfs daarsonder sou ons dit egter in die UDF kon gebruik, aangesien ons net die pad na die model benodig.

Let wel: Hou in gedagte dat die dop-UI en die modelkliënt toegang tot die artefak se ligging moet hê. Dit wil sê, ongeag die feit dat die Tracking UI in 'n EC2-instansie geleë is, wanneer MLflow plaaslik uitgevoer word, moet die masjien direkte toegang tot S3 hê om artefakmodelle te skryf.

Uitbreiding van Spark met MLflow
Naspoor-UI stoor artefakte in S3-emmer

Lopende modelle

Sodra die dopbediener aan die gang is, kan u die modelle begin oplei.

As 'n voorbeeld sal ons die wynmodifikasie van die MLflow-voorbeeld in Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Soos ons gesê het, laat MLflow jou toe om parameters, statistieke en artefakte aan te teken sodat jy kan naspoor hoe dit as iterasies ontwikkel. Hierdie kenmerk is uiters nuttig, want dit stel ons in staat om die beste model te reproduseer deur die dopbediener te kontak of te verstaan ​​watter kode die vereiste iterasie uitgevoer het deur die git-hash-logboeke van die commits te gebruik.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Uitbreiding van Spark met MLflow
wyniterasies

Agterkant vir die model

Die MLflow-opspoorbediener wat met die "mlflow-bediener"-opdrag van stapel gestuur is, het 'n REST API om lopies op te spoor en data na die plaaslike lêerstelsel te skryf. Jy kan die adres van die opsporingsbediener spesifiseer deur gebruik te maak van die "MLFLOW_TRACKING_URI" omgewingsveranderlike en die MLflow opsporings-API sal outomaties die opsporingsbediener by hierdie adres kontak om inligting te skep/loop te kry, aantekenmaatstawwe, ens.

Bron: Dokumente// Begin 'n opsporingsbediener

Om die model van 'n bediener te voorsien, benodig ons 'n lopende opsporingsbediener (sien die bekendstellingskoppelvlak) en die Run ID van die model.

Uitbreiding van Spark met MLflow
Hardloop ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Om modelle te bedien wat die MLflow-diensfunksionaliteit gebruik, benodig ons toegang tot die Tracking UI om inligting oor die model te kry deur bloot te spesifiseer --run_id.

Sodra die model die opsporingsbediener kontak, kan ons 'n nuwe model-eindpunt kry.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Lopende modelle van Spark

Ten spyte van die feit dat die opsporingsbediener kragtig genoeg is om modelle intyds te bedien, lei hulle op en gebruik die bedienerfunksies (bron: mlflow // dokumente // modelle #plaaslik), die gebruik van Spark (batch of streaming) is 'n selfs kragtiger oplossing vanweë die verspreiding daarvan.

Stel jou voor dat jy net vanlyn opleiding gedoen het en dan die uitsetmodel op al jou data toegepas het. Dit is waar Spark en MLflow tot hul reg kom.

Installeer PySpark + Jupyter + Spark

Bron: Begin PySpark - Jupyter

Om te wys hoe ons MLflow-modelle op Spark-datarame toepas, moet ons Jupyter-notaboeke opstel om met PySpark te werk.

Begin deur die nuutste stabiele weergawe te installeer Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Installeer PySpark en Jupyter in 'n virtuele omgewing:

pip install pyspark jupyter

Stel omgewingsveranderlikes op:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Gedefinieer notebook-dir, sal ons ons notaboeke in die verlangde gids kan stoor.

Hardloop Jupyter van PySpark

Aangesien ons Jupiter as 'n PySpark-bestuurder kon opstel, kan ons nou die Jupyter-notaboek in 'n PySpark-konteks laat loop.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Uitbreiding van Spark met MLflow

Soos hierbo genoem, bied MLflow die funksie om modelartefakte in S3 aan te teken. Sodra ons die geselekteerde model in ons hande het, het ons die geleentheid om dit as 'n UDF in te voer deur die module te gebruik mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Uitbreiding van Spark met MLflow
PySpark - Voorspel wynkwaliteit

Tot op hierdie stadium het ons gepraat oor hoe om PySpark met MLflow te gebruik deur wyngehaltevoorspelling op die hele wyndatastel uit te voer. Maar wat as jy die Python MLflow-modules van Scala Spark moet gebruik?

Ons het dit ook getoets deur die Spark-konteks tussen Scala en Python te verdeel. Dit wil sê, ons het MLflow UDF in Python geregistreer en dit vanaf Scala gebruik (ja, miskien nie die beste oplossing nie, maar wat ons het).

Scala Spark + MLflow

Vir hierdie voorbeeld sal ons byvoeg Toree Kernel in 'n bestaande Jupiter.

Installeer Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Soos u uit die aangehegte notaboek kan sien, word UDF tussen Spark en PySpark gedeel. Ons hoop dat hierdie deel nuttig sal wees vir diegene wat van Scala hou en masjienleermodelle na produksie wil ontplooi.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Volgende stappe

Alhoewel MLflow op die oomblik van skryf in Alpha is, lyk dit redelik belowend. Net om veelvuldige masjienleerraamwerke uit te voer en dit vanaf 'n enkele eindpunt te gebruik, neem aanbevelerstelsels na die volgende vlak.

Daarbenewens bring MLflow Data-ingenieurs en Data Scientists nader aan mekaar, en lê 'n gemeenskaplike laag tussen hulle.

Na hierdie verkenning van MLflow is ons vol vertroue dat ons sal voortgaan en dit sal gebruik vir ons Spark-pypleidings en aanbevelingstelsels.

Dit sal lekker wees om die lêerberging met die databasis te sinchroniseer in plaas van die lêerstelsel. Dit behoort vir ons verskeie eindpunte te gee wat dieselfde lêeraandeel kan gebruik. Gebruik byvoorbeeld verskeie gevalle presto и Athena met dieselfde Glue metastore.

Opsomming, ek wil graag dankie sê aan die MLFlow-gemeenskap wat ons werk met data interessanter gemaak het.

As jy met MLflow speel, skryf gerus vir ons en vertel ons hoe jy dit gebruik, en nog meer as jy dit in produksie gebruik.

Kom meer te wete oor kursusse:
Masjienleer. Basiese kursus
Masjienleer. gevorderde kursus

Lees meer:

Bron: will.com

Voeg 'n opmerking