Spark mat MLflow verlängeren

Moien, Khabrovites. Wéi mir scho geschriwwen hunn, lancéiert dëse Mount OTUS zwee Coursen iwwer Maschinnléieren op eemol, nämlech Basis и fortgeschratt. An dëser Hisiicht wäerte mir weider nëtzlech Material deelen.

Den Zweck vun dësem Artikel ass iwwer eis éischt Erfahrung ze schwätzen mat MLflow.

Mir fänken un der Iwwerpréiwung MLflow vu sengem Tracking Server a prolog all Iteratiounen vun der Studie. Da wäerte mir d'Erfahrung deelen fir Spark mat MLflow mat UDF ze verbannen.

Kontext

Mir sinn an Alpha Gesondheet mir benotze Maschinnléieren a kënschtlech Intelligenz fir d'Leit z'erméiglechen fir hir Gesondheet a Wuelbefannen ze këmmeren. Dofir sinn Maschinnléieremodeller am Kär vun den Dateprodukter déi mir entwéckelen, a firwat MLflow, eng Open Source Plattform déi all Aspekter vum Maschinnléiere Liewenszyklus ofdeckt, eis Opmierksamkeet gefaangen huet.

MLflow

D'Haaptziel vum MLflow ass eng zousätzlech Schicht uewen op Maschinnléieren ze bidden, déi Datewëssenschaftler erlaabt mat bal all Maschinnléierebibliothéik ze schaffen (h2o, keras, mellen, pytorch, léieren и tensorflow), hir Aarbecht op den nächsten Niveau ze huelen.

MLflow bitt dräi Komponenten:

  • Tracking - Opnam an Ufroe fir Experimenter: Code, Daten, Konfiguratioun a Resultater. Et ass ganz wichteg de Prozess vun der Schafung vun engem Modell ze verfollegen.
  • Projeten - Verpackungsformat fir op all Plattform ze lafen (zum Beispill, SageMaker)
  • Modeller ass e gemeinsame Format fir Modeller op verschidden Deployment Tools ofzeginn.

MLflow (Alpha zum Schreiwen) ass eng Open Source Plattform déi Iech erlaabt de Maschinn Léieren Liewenszyklus ze managen, inklusiv Experimenter, Wiederverwendung an Deployment.

MLflow opsetzen

Fir MLflow ze benotzen, musst Dir als éischt dat ganzt Python-Ëmfeld opbauen, dofir benotze mir PyEnv (fir Python op engem Mac z'installéieren, kuckt hei). Also kënne mir e virtuellt Ëmfeld erstellen wou mir all déi néideg Bibliothéiken installéiere fir ze lafen.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Installéiert déi néideg Bibliothéiken.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Notiz: Mir benotze PyArrow fir Modeller wéi UDFs ze lafen. D'Versioune vu PyArrow an Numpy hu misse fixéiert ginn, well déi lescht Versioune matenee Konflikt waren.

Start Tracking UI

MLflow Tracking erlaabt eis Experimenter mat Python a Rescht API. Zousätzlech kënnt Dir definéieren wou Modellartefakte gespäichert ginn (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage oder SFTP Server). Well mir AWS bei Alpha Health benotzen, wäert S3 d'Späichere fir d'Artefakte sinn.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow recommandéiert persistent Dateilagerung ze benotzen. D'Dateispäicherung ass wou de Server Lafen an Experimenter Metadaten späichert. Wann Dir de Server start, gitt sécher datt et op persistent Dateilagerung weist. Hei, zum Wuel vum Experiment, wäerte mir einfach benotzen /tmp.

Denkt drun datt wa mir den mlflow-Server benotze fir al Experimenter auszeféieren, mussen se am Dateigeschäft präsent sinn. Mä och ouni dëst kéinte mir se an der UDF benotzen, well mir nëmmen de Wee zum Modell brauchen.

Bemierkung: Denkt drun datt d'Tracking UI an de Model Client Zougang zum Artefakt Standuert hunn. Dat ass, onofhängeg vun der Tatsaach datt d'Tracking UI an enger EC2 Instanz läit, wann Dir MLflow lokal leeft, muss d'Maschinn direkten Zougang zu S3 hunn fir Artefaktmodeller ze schreiwen.

Spark mat MLflow verlängeren
Tracking UI späichert Artefakte am S3 Eemer

Lafen Modeller

Soubal den Tracking Server leeft, kënnt Dir d'Modeller trainéieren.

Als Beispill benotze mir d'Wäinmodifikatioun vum MLflow Beispill an Sklern.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Wéi mir gesot hunn, MLflow erlaabt Iech d'Parameteren, Metriken a Modellartefakte ze protokolléieren, sou datt Dir verfollege kënnt wéi se sech als Iteratiounen entwéckelen. Dës Fonktioun ass extrem nëtzlech, well et eis erlaabt de beschte Modell ze reproduzéieren andeems Dir den Tracking Server kontaktéiert oder versteet wéi ee Code déi erfuerderlech Iteratioun gemaach huet mat de Git Hash Logbicher vun de Verpflichtungen.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Spark mat MLflow verlängeren
Wäin Iteratiounen

Réck Enn fir de Modell

Den MLflow Tracking Server, deen mam Kommando "mlflow Server" gestart gouf, huet e REST API fir Lafen ze verfolgen an Daten op de lokalen Dateiesystem ze schreiwen. Dir kënnt d'Adress vum Tracking-Server mat der "MLFLOW_TRACKING_URI" Ëmfeldvariabel spezifizéieren an d'MLflow Tracking API kontaktéiert automatesch den Tracking-Server op dëser Adress fir Startinformatioun ze kreéieren/kréien, Metriken aloggen, etc.

Source: Docs// En Tracking Server lafen

Fir de Modell mat engem Server ze bidden, brauche mir e Lafen Tracking Server (kuckt d'Startinterface) an d'Run ID vum Modell.

Spark mat MLflow verlängeren
Run ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Fir Modeller mat der MLflow Serve Funktionalitéit ze déngen, brauche mir Zougang zu der Tracking UI fir Informatioun iwwer de Modell ze kréien andeems Dir einfach spezifizéiert --run_id.

Wann de Modell den Tracking Server kontaktéiert, kënne mir en neie Modellendpunkt kréien.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Lafen Modeller aus Spark

Trotz der Tatsaach datt den Tracking Server mächteg genuch ass fir Modeller an Echtzäit ze déngen, trainéiert se a benotzt d'Serverfunktionalitéit (Quell: mlflow // docs // models #local), mat Spark (Batch oder Streaming) ass eng nach méi mächteg Léisung wéinst senger Verdeelung.

Stellt Iech vir datt Dir just offline Training gemaach hutt an dann den Ausgangsmodell op all Är Donnéeën ugewannt hutt. Dëst ass wou Spark an MLflow an hir eegen kommen.

Installéiert PySpark + Jupyter + Spark

Source: Start PySpark - Jupyter

Fir ze weisen wéi mir MLflow Modeller op Spark Dataframes applizéieren, musse mir Jupyter Notizbicher opsetzen fir mat PySpark ze schaffen.

Start andeems Dir déi lescht stabil Versioun installéiert Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Installéiert PySpark a Jupyter an engem virtuellen Ëmfeld:

pip install pyspark jupyter

Ëmfeld Variablen opsetzen:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Definéiert hunn notebook-dir, kënne mir eis Notizbicher am gewënschten Dossier späicheren.

Lafen Jupyter aus PySpark

Well mir de Jupiter als PySpark Chauffer opgeriicht hunn, kënne mir elo de Jupyter Notizbuch an engem PySpark Kontext lafen.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Spark mat MLflow verlängeren

Wéi uewen erwähnt, bitt MLflow d'Funktioun fir Modellartefakte am S3 ze protokolléieren. Soubal mir de gewielte Modell an den Hänn hunn, hu mir d'Méiglechkeet et als UDF mam Modul ze importéieren mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Spark mat MLflow verlängeren
PySpark - Viraussoen Wäin Qualitéit

Bis zu dësem Zäitpunkt hu mir geschwat wéi Dir PySpark mat MLflow benotzt andeems Dir Wäinqualitéitsprognose op der ganzer Wäindateset leeft. Awer wat wann Dir d'Python MLflow Moduler vu Scala Spark benotze musst?

Mir hunn dëst och getest andeems de Spark Kontext tëscht Scala a Python opgedeelt ass. Dat ass, mir registréiert MLflow UDF am Python, a benotzt et aus Scala (jo, vläicht net déi bescht Léisung, mee wat mir hunn).

Scala Spark + MLflow

Fir dëst Beispill wäerte mir addéieren Toree Kernel an en existente Jupiter.

Installéiert Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Wéi Dir aus dem angeschlossene Notizbuch kënnt gesinn, gëtt UDF tëscht Spark a PySpark gedeelt. Mir hoffen datt dësen Deel nëtzlech ass fir déi, déi Scala gär hunn a Maschinnléiere Modeller an d'Produktioun wëllen ofsetzen.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Nächst Schrëtt

Och wann MLflow am Alpha am Moment vum Schreiwen ass, gesäit et zimlech villverspriechend aus. Just fäeg sinn verschidde Maschinnléiere Kaderen auszeféieren an se vun engem eenzegen Endpunkt ze benotzen hëlt Empfehlersystemer op den nächsten Niveau.

Zousätzlech bréngt MLflow Dateingenieuren an Datewëssenschaftler méi no zesummen, a leet eng gemeinsam Schicht tëscht hinnen.

No dëser Exploratioun vum MLflow si mir sécher weider ze goen a se fir eis Spark Pipelines an Empfehlungssystemer ze benotzen.

Et wier flott d'Dateilagerung mat der Datebank ze synchroniséieren amplaz vum Dateiesystem. Dëst sollt eis e puer Endpunkte ginn, déi deeselwechte Dateideele benotze kënnen. Zum Beispill, benotzt verschidde Instanzen Presto и Athena mat der selwechter Glue metastore.

Zesummefaassend wëll ech der MLFlow Gemeinschaft Merci soen fir eis Aarbecht mat Daten méi interessant ze maachen.

Wann Dir mat MLflow spillt, schreiwt eis gratis a sot eis wéi Dir et benotzt, an nach méi wann Dir et an der Produktioun benotzt.

Léiert méi iwwer Coursen:
Maschinn Léieren. Grondcours
Maschinn Léieren. fortgeschratt Course

Liest méi:

Source: will.com

Setzt e Commentaire