Utöka Spark med MLflow

Hej, Khabrovites. Som vi redan skrivit lanserar OTUS denna månad två kurser om maskininlärning på en gång, nämligen bas и Avancerad. I detta avseende fortsätter vi att dela användbart material.

Syftet med den här artikeln är att prata om vår första erfarenhet av MLflow.

Vi kommer att påbörja granskningen MLflow från dess spårningsserver och prolog alla iterationer av studien. Sedan kommer vi att dela upplevelsen av att koppla Spark med MLflow med UDF.

sammanhang

Vi är inne Alpha hälsa vi använder maskininlärning och artificiell intelligens för att ge människor möjlighet att ta hand om sin hälsa och sitt välbefinnande. Det är därför maskininlärningsmodeller är kärnan i de dataprodukter vi utvecklar, och varför MLflow, en öppen källkodsplattform som täcker alla aspekter av maskininlärningslivscykeln, fångade vår uppmärksamhet.

MLflow

Huvudmålet med MLflow är att tillhandahålla ett extra lager ovanpå maskininlärning som skulle göra det möjligt för datavetare att arbeta med nästan alla maskininlärningsbibliotek (h2o, Keras, mleap, pytorch, lära sig и tensorflow), tar hennes arbete till nästa nivå.

MLflow tillhandahåller tre komponenter:

  • Spårning – Registrering och förfrågningar om experiment: kod, data, konfiguration och resultat. Det är mycket viktigt att följa processen för att skapa en modell.
  • Projekt – Förpackningsformat för att köras på vilken plattform som helst (t.ex. SageMaker)
  • Modeller är ett vanligt format för att skicka in modeller till olika distributionsverktyg.

MLflow (alfa i skrivande stund) är en öppen källkodsplattform som låter dig hantera maskininlärningslivscykeln, inklusive experiment, återanvändning och driftsättning.

Konfigurera MLflow

För att använda MLflow måste du först ställa in hela Python-miljön, för detta kommer vi att använda PyEnv (för att installera Python på en Mac, ta en titt här). Så vi kan skapa en virtuell miljö där vi installerar alla bibliotek som behövs för att köra.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Installera de nödvändiga biblioteken.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Obs: Vi använder PyArrow för att köra modeller som UDF. Versioner av PyArrow och Numpy behövde fixas eftersom de senaste versionerna var i konflikt med varandra.

Startar Tracking UI

MLflow Tracking låter oss logga och fråga experiment med Python och REST API. Dessutom kan du definiera var modellartefakter ska lagras (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage eller SFTP-server). Eftersom vi använder AWS på Alpha Health kommer S3 att vara lagringen för artefakterna.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow rekommenderar att du använder beständig fillagring. Fillagringen är där servern lagrar körnings- och experimentmetadata. När du startar servern, se till att den pekar på beständig fillagring. Här, för experimentets skull, kommer vi helt enkelt att använda /tmp.

Tänk på att om vi vill använda mlflow-servern för att köra gamla experiment måste de finnas i filarkivet. Men även utan detta skulle vi kunna använda dem i UDF, eftersom vi bara behöver vägen till modellen.

Obs: Tänk på att spårningsgränssnittet och modellklienten måste ha åtkomst till artefaktens plats. Det vill säga, oavsett det faktum att spårningsgränssnittet finns i en EC2-instans, när MLflow körs lokalt, måste maskinen ha direktåtkomst till S3 för att skriva artefaktmodeller.

Utöka Spark med MLflow
Spårningsgränssnittet lagrar artefakter i S3-hinken

Löpande modeller

Så fort spårningsservern är igång kan du börja träna modellerna.

Som ett exempel kommer vi att använda vinmodifieringen från MLflow-exemplet i Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Som vi sa låter MLflow dig logga parametrar, mätvärden och modellera artefakter så att du kan spåra hur de utvecklas som iterationer. Den här funktionen är extremt användbar, eftersom den tillåter oss att återskapa den bästa modellen genom att kontakta spårningsservern eller förstå vilken kod som utförde den nödvändiga iterationen med hjälp av git-hash-loggarna för commits.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Utöka Spark med MLflow
vin iterationer

Bakände för modellen

MLflow-spårningsservern som lanseras med kommandot "mlflow server" har ett REST API för att spåra körningar och skriva data till det lokala filsystemet. Du kan ange adressen till spårningsservern med miljövariabeln "MLFLOW_TRACKING_URI" och MLflow tracking API kommer automatiskt att kontakta spårningsservern på den här adressen för att skapa/hämta körinformation, loggningsstatistik, etc.

Källa: Dokument// Köra en spårningsserver

För att förse modellen med en server behöver vi en körspårningsserver (se startgränssnittet) och modellens kör-ID.

Utöka Spark med MLflow
Kör ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

För att betjäna modeller som använder MLflow-serverfunktionen behöver vi tillgång till spårningsgränssnittet för att få information om modellen genom att helt enkelt ange --run_id.

När modellen kontaktar spårningsservern kan vi få en ny modellslutpunkt.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Löpande modeller från Spark

Trots att spårningsservern är tillräckligt kraftfull för att betjäna modeller i realtid, träna dem och använd serverfunktionaliteten (källa: mlflow // docs // modeller #local), att använda Spark (batch eller streaming) är en ännu kraftfullare lösning på grund av dess distribution.

Föreställ dig att du bara tränade offline och sedan använde utdatamodellen på all din data. Det är här Spark och MLflow kommer till sin rätt.

Installera PySpark + Jupyter + Spark

Källa: Kom igång PySpark - Jupyter

För att visa hur vi tillämpar MLflow-modeller på Spark-dataramar måste vi ställa in Jupyter-anteckningsböcker för att fungera med PySpark.

Börja med att installera den senaste stabila versionen Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Installera PySpark och Jupyter i en virtuell miljö:

pip install pyspark jupyter

Ställ in miljövariabler:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Efter att ha definierat notebook-dir, kommer vi att kunna lagra våra anteckningsböcker i önskad mapp.

Kör Jupyter från PySpark

Eftersom vi kunde ställa in Jupiter som PySpark-drivrutin kan vi nu köra Jupyter-anteckningsboken i PySpark-sammanhang.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Utöka Spark med MLflow

Som nämnts ovan tillhandahåller MLflow funktionen att logga modellartefakter i S3. Så fort vi har den valda modellen i våra händer har vi möjlighet att importera den som en UDF med hjälp av modulen mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Utöka Spark med MLflow
PySpark - Förutsäga vinkvalitet

Fram till denna punkt har vi pratat om hur man använder PySpark med MLflow genom att köra vinkvalitetsförutsägelse på hela vindataset. Men vad händer om du behöver använda Python MLflow-modulerna från Scala Spark?

Vi testade detta också genom att dela upp Spark-kontexten mellan Scala och Python. Det vill säga vi registrerade MLflow UDF i Python, och använde det från Scala (ja, kanske inte den bästa lösningen, men vad vi har).

Scala Spark + MLflow

För detta exempel kommer vi att lägga till Toree Kernel in i en befintlig Jupiter.

Installera Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Som du kan se från den bifogade anteckningsboken delas UDF mellan Spark och PySpark. Vi hoppas att den här delen kommer att vara användbar för dem som älskar Scala och vill distribuera maskininlärningsmodeller till produktion.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Nästa steg

Även om MLflow är i Alpha i skrivande stund ser det ganska lovande ut. Bara att kunna köra flera ramverk för maskininlärning och använda dem från en enda slutpunkt tar rekommendatorsystem till nästa nivå.

Dessutom för MLflow dataingenjörer och dataforskare närmare varandra och lägger ett gemensamt lager mellan dem.

Efter den här utforskningen av MLflow kommer vi garanterat att gå vidare och använda det för våra Spark-pipelines och rekommendationssystem.

Det skulle vara trevligt att synkronisera fillagringen med databasen istället för filsystemet. Detta borde ge oss flera slutpunkter som kan använda samma filresurs. Använd till exempel flera instanser Presto и Athena med samma Glue metastore.

Sammanfattningsvis skulle jag vilja säga tack till MLFlow-communityt för att göra vårt arbete med data mer intressant.

Om du spelar med MLflow, skriv gärna till oss och berätta hur du använder det, och ännu mer om du använder det i produktionen.

Läs mer om kurser:
maskininlärning. Grundkurs
maskininlärning. avancerad kurs

Läs mer:

Källa: will.com

Lägg en kommentar