Forlenger Spark med MLflow

Hei, innbyggere i Khabrovsk. Som vi allerede har skrevet, lanserer OTUS denne måneden to maskinlæringskurs samtidig, nemlig utgangspunkt и avansert. I denne forbindelse fortsetter vi å dele nyttig materiale.

Hensikten med denne artikkelen er å snakke om vår første erfaring med å bruke MLflow.

Vi starter anmeldelsen MLflow fra sporingsserveren og logg alle iterasjoner av studien. Deretter vil vi dele vår erfaring med å koble Spark med MLflow ved hjelp av UDF.

Kontekst

Vi er i Alpha Helse Vi bruker maskinlæring og kunstig intelligens for å gi folk mulighet til å ta ansvar for deres helse og velvære. Det er derfor maskinlæringsmodeller er kjernen i datavitenskapsproduktene vi utvikler, og det er derfor vi ble trukket til MLflow, en åpen kildekodeplattform som dekker alle aspekter av maskinlæringslivssyklusen.

MLflow

Hovedmålet med MLflow er å gi et ekstra lag på toppen av maskinlæring som vil tillate dataforskere å jobbe med nesten alle maskinlæringsbiblioteker (h2o, hard, hopp, pytorch, lære и tensorflow), tar arbeidet hennes til neste nivå.

MLflow har tre komponenter:

  • Sporing – opptak og forespørsler om eksperimenter: kode, data, konfigurasjon og resultater. Å overvåke prosessen med å lage en modell er veldig viktig.
  • Prosjekter – Pakkeformat for å kjøre på hvilken som helst plattform (f.eks. SageMaker)
  • Modeller – et vanlig format for å sende inn modeller til ulike distribusjonsverktøy.

MLflow (i alfa i skrivende stund) er en åpen kildekode-plattform som lar deg administrere livssyklusen for maskinlæring, inkludert eksperimentering, gjenbruk og distribusjon.

Sette opp MLflow

For å bruke MLflow må du først sette opp hele Python-miljøet, for dette vil vi bruke PyEnv (for å installere Python på Mac, sjekk ut her). På denne måten kan vi lage et virtuelt miljø der vi installerer alle bibliotekene som er nødvendige for å kjøre det.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

La oss installere de nødvendige bibliotekene.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Merk: Vi bruker PyArrow til å kjøre modeller som UDF. Versjonene av PyArrow og Numpy måtte fikses fordi de sistnevnte versjonene kom i konflikt med hverandre.

Start sporingsgrensesnittet

MLflow Tracking lar oss logge og spørre eksperimenter ved hjelp av Python og REST API. I tillegg kan du bestemme hvor du skal lagre modellartefakter (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage eller SFTP-server). Siden vi bruker AWS hos Alpha Health, vil artefaktlagringen vår være S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow anbefaler å bruke vedvarende fillagring. Fillagring er der serveren vil lagre kjøre- og eksperimentmetadata. Når du starter serveren, sørg for at den peker til det vedvarende fillageret. Her for eksperimentet vil vi ganske enkelt bruke /tmp.

Husk at hvis vi ønsker å bruke mlflow-serveren til å kjøre gamle eksperimenter, må de være tilstede i fillagringen. Men selv uten dette kunne vi bruke dem i UDF, siden vi bare trenger veien til modellen.

Merk: Husk at sporingsgrensesnittet og modellklienten må ha tilgang til artefaktplasseringen. Det vil si, uavhengig av det faktum at sporingsgrensesnittet ligger i en EC2-instans, når MLflow kjøres lokalt, må maskinen ha direkte tilgang til S3 for å skrive artefaktmodeller.

Forlenger Spark med MLflow
Sporingsgrensesnitt lagrer artefakter i en S3-bøtte

Løpende modeller

Så snart sporingsserveren kjører, kan du begynne å trene modellene.

Som et eksempel vil vi bruke vinmodifikasjonen fra MLflow-eksemplet i Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Som vi allerede har diskutert, lar MLflow deg logge modellparametere, beregninger og artefakter slik at du kan spore hvordan de utvikler seg over iterasjoner. Denne funksjonen er ekstremt nyttig fordi vi på denne måten kan reprodusere den beste modellen ved å kontakte sporingsserveren eller forstå hvilken kode som utførte den nødvendige iterasjonen ved å bruke git-hash-loggene for forpliktelser.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Forlenger Spark med MLflow
Iterasjoner av vin

Serverdel for modellen

MLflow-sporingsserveren, lansert ved hjelp av "mlflow server"-kommandoen, har en REST API for sporing av kjøringer og skriving av data til det lokale filsystemet. Du kan spesifisere sporingsserveradressen ved å bruke miljøvariabelen "MLFLOW_TRACKING_URI", og MLflow sporings-API vil automatisk kontakte sporingsserveren på denne adressen for å opprette/motta lanseringsinformasjon, loggberegninger osv.

Kilde: Dokumenter// Kjøre en sporingsserver

For å gi modellen en server, trenger vi en kjørende sporingsserver (se lanseringsgrensesnitt) og kjørings-IDen til modellen.

Forlenger Spark med MLflow
Kjør ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

For å betjene modeller som bruker MLflow-serverfunksjonaliteten, trenger vi tilgang til sporingsgrensesnittet for å motta informasjon om modellen ganske enkelt ved å spesifisere --run_id.

Når modellen kontakter sporingsserveren, kan vi få et nytt modellendepunkt.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Løpemodeller fra Spark

Til tross for at sporingsserveren er kraftig nok til å betjene modeller i sanntid, tren dem opp og bruk serverfunksjonaliteten (kilde: mlflow // docs // models # local), er bruken av Spark (batch eller streaming) en enda kraftigere løsning på grunn av distribusjon.

Tenk deg at du ganske enkelt utførte opplæringen offline og deretter brukte utdatamodellen på alle dataene dine. Det er her Spark og MLflow skinner.

Installer PySpark + Jupyter + Spark

Kilde: Kom i gang PySpark - Jupyter

For å vise hvordan vi bruker MLflow-modeller på Spark-datarammer, må vi sette opp Jupyter-notatbøker for å fungere sammen med PySpark.

Start med å installere den siste stabile versjonen Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Installer PySpark og Jupyter i det virtuelle miljøet:

pip install pyspark jupyter

Sett opp miljøvariabler:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Etter å ha bestemt notebook-dir, kan vi lagre notatbøkene våre i ønsket mappe.

Lanserer Jupyter fra PySpark

Siden vi var i stand til å konfigurere Jupiter som en PySpark-driver, kan vi nå kjøre Jupyter notebook i PySpark-sammenheng.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Forlenger Spark med MLflow

Som nevnt ovenfor gir MLflow en funksjon for logging av modellartefakter i S3. Så snart vi har den valgte modellen i våre hender, har vi muligheten til å importere den som en UDF ved hjelp av modulen mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Forlenger Spark med MLflow
PySpark - Utgir vinkvalitetsspådommer

Frem til dette punktet har vi snakket om hvordan du bruker PySpark med MLflow, og kjører vinkvalitetsprediksjoner på hele vindatasettet. Men hva om du trenger å bruke Python MLflow-moduler fra Scala Spark?

Vi testet dette også ved å dele Spark-konteksten mellom Scala og Python. Det vil si at vi registrerte MLflow UDF i Python, og brukte det fra Scala (ja, kanskje ikke den beste løsningen, men det vi har).

Scala Spark + MLflow

For dette eksemplet vil vi legge til Toree kjerne inn i den eksisterende Jupiter.

Installer Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Som du kan se fra den vedlagte notatboken, er UDF delt mellom Spark og PySpark. Vi håper denne delen vil være nyttig for de som elsker Scala og ønsker å distribuere maskinlæringsmodeller i produksjon.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

De neste trinnene

Selv om MLflow er i Alpha-versjon i skrivende stund, ser det ganske lovende ut. Bare muligheten til å kjøre flere maskinlæringsrammeverk og konsumere dem fra ett enkelt endepunkt tar anbefalersystemer til neste nivå.

I tillegg bringer MLflow dataingeniører og datavitenskapsspesialister nærmere hverandre, og legger et felles lag mellom dem.

Etter denne utforskningen av MLflow er vi sikre på at vi vil gå videre og bruke den til våre Spark-rørledninger og anbefalingssystemer.

Det ville vært fint å synkronisere fillagringen med databasen i stedet for filsystemet. Dette bør gi oss flere endepunkter som kan bruke samme fillagring. Bruk for eksempel flere forekomster Presto и Athena med samme Glue metastore.

For å oppsummere vil jeg takke MLFlow-fellesskapet for å gjøre arbeidet vårt med data mer interessant.

Hvis du leker med MLflow, ikke nøl med å skrive til oss og fortelle oss hvordan du bruker det, og enda mer hvis du bruker det i produksjon.

Finn ut mer om kursene:
Maskinlæring. Grunnkurs
Maskinlæring. Videregående kurs

Les mer:

Kilde: www.habr.com

Legg til en kommentar