A Spark kiterjesztése MLflow-val

Sziasztok Habrovsk lakosai. Ahogy már írtuk, ebben a hónapban az OTUS egyszerre két gépi tanulási tanfolyamot indít, mégpedig bázis и fejlett. Ezzel kapcsolatban továbbra is hasznos anyagokat osztunk meg.

Ennek a cikknek az a célja, hogy bemutassuk első tapasztalatainkat a használattal MLflow.

Elkezdjük a felülvizsgálatot MLflow nyomkövető szerveréről, és naplózza a vizsgálat összes iterációját. Ezután megosztjuk tapasztalatainkat a Spark és az MLflow összekapcsolásáról UDF használatával.

Kontextus

Benne vagyunk Alfa egészség Gépi tanulást és mesterséges intelligenciát használunk, hogy képessé tegyük az embereket arra, hogy felelősséget vállaljanak egészségükért és jólétükért. Ezért a gépi tanulási modellek állnak az általunk fejlesztett adattudományi termékek középpontjában, és ezért vonzott minket az MLflow, egy nyílt forráskódú platform, amely a gépi tanulás életciklusának minden aspektusát lefedi.

MLflow

Az MLflow fő célja, hogy egy további réteget biztosítson a gépi tanuláson felül, amely lehetővé teszi az adattudósok számára, hogy szinte bármilyen gépi tanulási könyvtárral dolgozhassanak (h2o, keras, mleap, pytorch, sklearn и tensorflow), magasabb szintre emeli munkáját.

Az MLflow három összetevőt tartalmaz:

  • Csomagkövetés – rögzítés és kísérleti kérések: kód, adatok, konfiguráció és eredmények. Nagyon fontos a modellalkotás folyamatának nyomon követése.
  • projektek – Csomagolási formátum bármilyen platformon futtatható (pl. SageMaker)
  • Modellek – a modellek különféle telepítési eszközökbe való beküldésének közös formátuma.

Az MLflow (az írás idején alfa állapotban) egy nyílt forráskódú platform, amely lehetővé teszi a gépi tanulási életciklus kezelését, beleértve a kísérletezést, az újrahasználatot és a telepítést.

Az MLflow beállítása

Az MLflow használatához először be kell állítania a teljes Python-környezetet, ezt fogjuk használni PyEnv (A Python Mac rendszerre történő telepítéséhez nézze meg itt). Így létrehozhatunk egy virtuális környezetet, ahol a futtatásához szükséges összes könyvtárat telepítjük.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Telepítsük a szükséges könyvtárakat.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Megjegyzés: A PyArrow-t használjuk olyan modellek futtatására, mint például az UDF. A PyArrow és a Numpy verzióit javítani kellett, mert az utóbbi verziók ütköztek egymással.

Indítsa el a Tracking UI-t

Az MLflow Tracking lehetővé teszi a kísérletek naplózását és lekérdezését a Python és a REST API. Ezenkívül meghatározhatja, hogy hol tárolja a modelltermékeket (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage vagy SFTP szerver). Mivel az Alpha Healthnél AWS-t használunk, a műterméktárhelyünk S3 lesz.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

Az MLflow tartós fájltárolást javasol. A fájltároló az a hely, ahol a szerver tárolja a futtatási és kísérleti metaadatokat. A kiszolgáló indításakor ügyeljen arra, hogy az állandó fájltárolóra mutasson. Itt a kísérlethez egyszerűen használjuk /tmp.

Ne feledje, hogy ha az mlflow szervert szeretnénk használni régi kísérletek futtatására, akkor azoknak jelen kell lenniük a fájltárolóban. Azonban e nélkül is használhatjuk őket az UDF-ben, hiszen csak a modell elérési útjára van szükségünk.

Megjegyzés: Ne feledje, hogy a nyomkövető felhasználói felületnek és a modellkliensnek hozzáféréssel kell rendelkeznie a műtermék helyéhez. Vagyis attól függetlenül, hogy a nyomkövetési felhasználói felület egy EC2-példányban található, az MLflow helyi futtatásakor a gépnek közvetlen hozzáféréssel kell rendelkeznie az S3-hoz, hogy műtermékmodelleket írhasson.

A Spark kiterjesztése MLflow-val
A nyomkövető felhasználói felület egy S3 tárolóban tárolja a műtermékeket

Futó modellek

Amint a nyomkövető szerver fut, megkezdheti a modellek betanítását.

Példaként az MLflow példájából származó bormódosítást fogjuk használni Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Ahogy már megbeszéltük, az MLflow lehetővé teszi a modellparaméterek, metrikák és műtermékek naplózását, így nyomon követheti, hogyan fejlődnek az iterációk során. Ez a funkció rendkívül hasznos, mert így a legjobb modellt reprodukálhatjuk, ha kapcsolatba lépünk a Tracking szerverrel, vagy megértjük, hogy melyik kód hajtotta végre a szükséges iterációt a véglegesítések git hash naplói segítségével.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

A Spark kiterjesztése MLflow-val
Boriterációk

Szerver alkatrész a modellhez

Az „mlflow server” paranccsal elindított MLflow nyomkövető szerver REST API-val rendelkezik a futások nyomon követéséhez és az adatok helyi fájlrendszerbe írásához. A nyomkövető szerver címét az „MLFLOW_TRACKING_URI” környezeti változó használatával adhatja meg, és az MLflow nyomkövető API automatikusan kapcsolatba lép a nyomkövető szerverrel ezen a címen, hogy létrehozza/fogadja az indítási információkat, naplózási mérőszámokat stb.

Forrás: Dokumentumok// Nyomkövető szerver futtatása

A modell szerverrel való ellátásához szükségünk van egy futó nyomkövető szerverre (lásd az indítófelületet) és a modell futtatási azonosítójára.

A Spark kiterjesztése MLflow-val
Futtatási azonosító

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Az MLflow kiszolgálási funkciót használó modellek kiszolgálásához hozzá kell férnünk a nyomkövető felhasználói felülethez, hogy információt kaphassunk a modellről a --run_id.

Miután a modell kapcsolatba lép a nyomkövető szerverrel, kaphatunk egy új modell végpontot.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Futó modellek a Sparktól

Annak ellenére, hogy a Tracking szerver elég erős ahhoz, hogy valós időben kiszolgálja a modelleket, betanítsa őket és használja a szerver funkcióit (forrás: mlflow // docs // modellek # helyi), a Spark használata (kötegelt vagy streaming) a terjesztés miatt még hatékonyabb megoldás.

Képzelje el, hogy egyszerűen elvégezte a képzést offline, majd alkalmazta a kimeneti modellt az összes adatára. Itt ragyog a Spark és az MLflow.

Telepítse a PySpark + Jupyter + Spark alkalmazást

Forrás: Kezdő lépések PySpark – Jupyter

Annak bemutatásához, hogyan alkalmazzuk az MLflow-modelleket a Spark-adatkeretekre, be kell állítanunk a Jupyter-jegyzetfüzeteket, hogy együttműködjenek a PySparkkal.

Kezdje a legújabb stabil verzió telepítésével Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Telepítse a PySparkot és a Jupytert a virtuális környezetben:

pip install pyspark jupyter

Környezeti változók beállítása:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Miután elhatározta notebook-dir, a jegyzetfüzeteinket a kívánt mappában tárolhatjuk.

A Jupyter indítása a PySparkból

Mivel a Jupitert tudtuk PySpark-illesztőprogramként konfigurálni, a Jupyter notebookot a PySpark kontextusában is futtathatjuk.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

A Spark kiterjesztése MLflow-val

Ahogy fentebb említettük, az MLflow szolgáltatást biztosít a modell melléktermékeinek naplózására az S3-ban. Amint a kiválasztott modell a kezünkben van, lehetőségünk van a modul segítségével UDF-ként importálni mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

A Spark kiterjesztése MLflow-val
PySpark – Borminőségi előrejelzések kiadása

Eddig a pontig beszéltünk arról, hogyan használhatjuk a PySparkot az MLflow-val, amely borminőségi előrejelzéseket futtat a teljes boradatkészleten. De mi van, ha a Scala Spark Python MLflow moduljait kell használnia?

Ezt is úgy teszteltük, hogy a Spark környezetet felosztottuk a Scala és a Python között. Vagyis Pythonban regisztráltuk az MLflow UDF-et, és a Scalából használtuk (igen, talán nem a legjobb megoldás, de ami van).

Scala Spark + MLflow

Ehhez a példához hozzáadjuk Toree Kernel a meglévő Jupiterbe.

Telepítse a Spark + Toree + Jupyter alkalmazást

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Amint a mellékelt jegyzetfüzetből látható, az UDF meg van osztva a Spark és a PySpark között. Reméljük, hogy ez a rész hasznos lesz azoknak, akik szeretik a Scalát, és gépi tanulási modelleket szeretnének üzembe helyezni az éles környezetben.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Következő lépések

Annak ellenére, hogy az MLflow az írás idején Alpha verzióban van, meglehetősen ígéretesnek tűnik. A több gépi tanulási keretrendszer futtatásának és egyetlen végpontról történő fogyasztásának képessége az ajánlórendszereket a következő szintre emeli.

Ezenkívül az MLflow közelebb hozza egymáshoz az adatmérnököket és az adattudományi szakembereket, közös réteget fektetve közéjük.

Az MLflow ezen feltárása után biztosak vagyunk abban, hogy tovább fogunk lépni, és használni fogjuk Spark csővezetékeinkhez és ajánlórendszereinkhez.

Jó lenne a fájltárolót az adatbázissal szinkronizálni a fájlrendszer helyett. Ennek több végpontot kell kapnia, amelyek ugyanazt a fájltárolót használhatják. Például használjon több példányt Gyors и Athéné ugyanazzal a Glue metastore-ral.

Összefoglalva, szeretném megköszönni az MLFlow közösségnek, hogy érdekesebbé tették az adatokkal kapcsolatos munkánkat.

Ha az MLflow-val játszol, ne habozzon, írjon nekünk, és mondja el, hogyan használja, és még inkább, ha termelésben használja.

Tudjon meg többet a tanfolyamokról:
Gépi tanulás. Alaptanfolyam
Gépi tanulás. Haladó tanfolyam

Olvass tovább:

Forrás: will.com

Hozzászólás