Udvidelse af Spark med MLflow

Hej Khabrovites. Som vi allerede har skrevet, lancerer OTUS i denne måned to kurser om maskinlæring på én gang, nemlig grundlag и fremskreden. I den forbindelse fortsætter vi med at dele nyttigt materiale.

Formålet med denne artikel er at fortælle om vores første oplevelse med MLflow.

Vi starter gennemgangen MLflow fra sin sporingsserver og prolog alle gentagelser af undersøgelsen. Så vil vi dele oplevelsen af ​​at forbinde Spark med MLflow ved hjælp af UDF.

kontekst

Vi er inde Alpha Sundhed vi bruger maskinlæring og kunstig intelligens til at give folk mulighed for at tage vare på deres sundhed og velvære. Det er derfor, maskinlæringsmodeller er kernen i de dataprodukter, vi udvikler, og hvorfor MLflow, en open source-platform, der dækker alle aspekter af maskinlærings-livscyklussen, fangede vores opmærksomhed.

MLflow

Hovedmålet med MLflow er at give et ekstra lag oven på maskinlæring, der vil gøre det muligt for datavidenskabsfolk at arbejde med næsten ethvert maskinlæringsbibliotek (h2o, Keras, spring, pytorch, lære и tensorflow), tager hendes arbejde til næste niveau.

MLflow indeholder tre komponenter:

  • Sporing – registrering og anmodninger om eksperimenter: kode, data, konfiguration og resultater. Det er meget vigtigt at følge processen med at skabe en model.
  • Projekter – Pakkeformat til at køre på enhver platform (f.eks. SageMaker)
  • Modeller er et almindeligt format til indsendelse af modeller til forskellige implementeringsværktøjer.

MLflow (alfa i skrivende stund) er en open source-platform, der giver dig mulighed for at administrere maskinlæringslivscyklussen, herunder eksperimentering, genbrug og implementering.

Opsætning af MLflow

For at bruge MLflow skal du først opsætte hele Python-miljøet, til dette vil vi bruge PyEnv (for at installere Python på en Mac, tag et kig her). Så vi kan skabe et virtuelt miljø, hvor vi installerer alle de biblioteker, der er nødvendige for at køre.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Installer de nødvendige biblioteker.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Bemærk: Vi bruger PyArrow til at køre modeller som UDF'er. Versioner af PyArrow og Numpy skulle rettes, fordi de seneste versioner var i konflikt med hinanden.

Starter Tracking UI

MLflow Tracking giver os mulighed for at logge og forespørge på eksperimenter med Python og REST API. Derudover kan du definere, hvor modelartefakter skal opbevares (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage eller SFTP-server). Da vi bruger AWS hos Alpha Health, vil S3 være lageret for artefakterne.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow anbefaler at bruge vedvarende fillagring. Fillageret er det sted, hvor serveren gemmer kørsels- og eksperimentmetadata. Når du starter serveren, skal du sørge for, at den peger på vedvarende fillagring. Her vil vi for eksperimentets skyld blot bruge /tmp.

Husk, at hvis vi vil bruge mlflow-serveren til at køre gamle eksperimenter, skal de være til stede i fillageret. Men selv uden dette ville vi være i stand til at bruge dem i UDF, da vi kun behøver vejen til modellen.

Bemærk: Husk, at sporings-UI og modelklienten skal have adgang til artefaktens placering. Det vil sige, at uanset at Tracking UI er placeret i en EC2-instans, skal maskinen, når den kører MLflow lokalt, have direkte adgang til S3 for at skrive artefaktmodeller.

Udvidelse af Spark med MLflow
Sporingsbrugergrænseflade gemmer artefakter i S3-bøtten

Løbende modeller

Så snart sporingsserveren kører, kan du begynde at træne modellerne.

Som eksempel vil vi bruge vinmodifikationen fra MLflow-eksemplet i Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Som vi sagde, giver MLflow dig mulighed for at logge parametre, metrikker og modellere artefakter, så du kan spore, hvordan de udvikler sig som iterationer. Denne funktion er ekstremt nyttig, fordi den giver os mulighed for at reproducere den bedste model ved at kontakte sporingsserveren eller forstå, hvilken kode der udførte den påkrævede iteration ved hjælp af git-hash-logs for commits.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Udvidelse af Spark med MLflow
vin iterationer

Bagenden til modellen

MLflow-sporingsserveren lanceret med kommandoen "mlflow server" har en REST API til at spore kørsler og skrive data til det lokale filsystem. Du kan angive adressen på sporingsserveren ved hjælp af miljøvariablen "MLFLOW_TRACKING_URI", og MLflow-sporings-API'en vil automatisk kontakte sporingsserveren på denne adresse for at oprette/hente lanceringsoplysninger, logningsmålinger osv.

Kilde: Docs// Kørsel af en sporingsserver

For at forsyne modellen med en server, har vi brug for en kørende sporingsserver (se startgrænsefladen) og modellens Run ID.

Udvidelse af Spark med MLflow
Kør ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

For at betjene modeller, der bruger MLflow-servefunktionaliteten, skal vi have adgang til Tracking UI for at få oplysninger om modellen ved blot at angive --run_id.

Når modellen kontakter sporingsserveren, kan vi få et nyt modelslutpunkt.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Løbende modeller fra Spark

På trods af at sporingsserveren er kraftig nok til at betjene modeller i realtid, skal du træne dem og bruge serverfunktionaliteten (kilde: mlflow // docs // modeller #local), at bruge Spark (batch eller streaming) er en endnu mere kraftfuld løsning på grund af dens distribution.

Forestil dig, at du lige har lavet offline træning og derefter har anvendt outputmodellen på alle dine data. Det er her, Spark og MLflow kommer til deres ret.

Installer PySpark + Jupyter + Spark

Kilde: Kom i gang PySpark - Jupyter

For at vise, hvordan vi anvender MLflow-modeller til Spark-datarammer, skal vi konfigurere Jupyter-notebooks til at arbejde med PySpark.

Start med at installere den seneste stabile version Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Installer PySpark og Jupyter i et virtuelt miljø:

pip install pyspark jupyter

Opsæt miljøvariabler:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Efter at have defineret notebook-dir, vil vi være i stand til at gemme vores notesbøger i den ønskede mappe.

Kører Jupyter fra PySpark

Da vi var i stand til at konfigurere Jupiter som en PySpark-driver, kan vi nu køre Jupyter-notesbogen i en PySpark-sammenhæng.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Udvidelse af Spark med MLflow

Som nævnt ovenfor giver MLflow funktionen til at logge modelartefakter i S3. Så snart vi har den valgte model i hænderne, har vi mulighed for at importere den som UDF ved hjælp af modulet mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Udvidelse af Spark med MLflow
PySpark - Forudsigelse af vinkvalitet

Indtil nu har vi talt om, hvordan man bruger PySpark med MLflow ved at køre forudsigelse af vinkvalitet på hele vindatasættet. Men hvad nu hvis du skal bruge Python MLflow-modulerne fra Scala Spark?

Vi testede også dette ved at opdele Spark-konteksten mellem Scala og Python. Det vil sige, at vi registrerede MLflow UDF i Python, og brugte det fra Scala (ja, måske ikke den bedste løsning, men hvad vi har).

Scala Spark + MLflow

For dette eksempel vil vi tilføje Toree Kernel ind i en eksisterende Jupiter.

Installer Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Som du kan se fra den vedhæftede notesbog, er UDF delt mellem Spark og PySpark. Vi håber, at denne del vil være nyttig for dem, der elsker Scala og ønsker at implementere maskinlæringsmodeller til produktion.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Næste trin

Selvom MLflow er i Alpha i skrivende stund, ser det ret lovende ud. Bare det at kunne køre flere maskinlæringsrammer og bruge dem fra et enkelt slutpunkt tager anbefalingssystemer til det næste niveau.

Derudover bringer MLflow dataingeniører og dataforskere tættere på hinanden og lægger et fælles lag mellem dem.

Efter denne udforskning af MLflow er vi sikre på at gå videre og bruge det til vores Spark-pipelines og anbefalingssystemer.

Det ville være rart at synkronisere fillageret med databasen i stedet for filsystemet. Dette skulle give os flere endepunkter, der kan bruge den samme fildeling. Brug for eksempel flere instanser Presto и Athena med samme Glue metastore.

Sammenfattende vil jeg gerne sige tak til MLFlow-fællesskabet for at gøre vores arbejde med data mere interessant.

Hvis du spiller med MLflow, er du velkommen til at skrive til os og fortælle os, hvordan du bruger det, og endnu mere hvis du bruger det i produktionen.

Lær mere om kurser:
maskinelæring. Grundkursus
maskinelæring. avanceret kursus

Læs mere:

Kilde: www.habr.com

Tilføj en kommentar