Rozšíření Spark pomocí MLflow

Dobrý den, obyvatelé Chabrovska. Jak jsme již psali, tento měsíc OTUS spouští hned dva kurzy strojového učení, a to základna и pokročilý. V tomto ohledu nadále sdílíme užitečné materiály.

Účelem tohoto článku je pohovořit o našich prvních zkušenostech s používáním MLflow.

Začneme recenzi MLflow z jeho sledovacího serveru a zaznamenat všechny iterace studie. Poté se podělíme o naše zkušenosti s propojením Sparku s MLflow pomocí UDF.

Kontext

Jsme v Zdraví Alfa Využíváme strojové učení a umělou inteligenci, abychom lidem umožnili převzít kontrolu nad svým zdravím a blahobytem. Proto jsou modely strojového učení jádrem produktů pro datovou vědu, které vyvíjíme, a proto nás zaujala MLflow, platforma s otevřeným zdrojovým kódem, která pokrývá všechny aspekty životního cyklu strojového učení.

MLflow

Hlavním cílem MLflow je poskytnout další vrstvu nad strojovým učením, která by datovým vědcům umožnila pracovat s téměř jakoukoli knihovnou strojového učení (H2o, keras, mleap, pytorch, sklearn и tensorflow), posouvá její práci na další úroveň.

MLflow poskytuje tři komponenty:

  • Sledování – záznam a požadavky na experimenty: kód, data, konfigurace a výsledky. Sledování procesu tvorby modelu je velmi důležité.
  • Projekty – Formát balení pro běh na jakékoli platformě (např. SageMaker)
  • modely – společný formát pro odesílání modelů do různých nástrojů pro nasazení.

MLflow (v době psaní článku ve verzi alfa) je platforma s otevřeným zdrojovým kódem, která vám umožňuje spravovat životní cyklus strojového učení, včetně experimentování, opětovného použití a nasazení.

Nastavení MLflow

Chcete-li používat MLflow, musíte nejprve nastavit celé prostředí Pythonu, k tomu použijeme PyEnv (Chcete-li nainstalovat Python na Mac, podívejte se zde). Tímto způsobem můžeme vytvořit virtuální prostředí, kde nainstalujeme všechny knihovny potřebné k jeho spuštění.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Pojďme nainstalovat požadované knihovny.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Poznámka: PyArrow používáme ke spouštění modelů, jako je UDF. Verze PyArrow a Numpy bylo nutné opravit, protože poslední verze byly ve vzájemném konfliktu.

Spusťte uživatelské rozhraní sledování

MLflow Tracking nám umožňuje protokolovat a dotazovat experimenty pomocí Pythonu a REST API. Kromě toho můžete určit, kam ukládat artefakty modelu (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage nebo SFTP server). Protože v Alpha Health používáme AWS, naše úložiště artefaktů bude S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow doporučuje používat trvalé úložiště souborů. Úložiště souborů je místo, kam bude server ukládat metadata běhu a experimentu. Při spouštění serveru se ujistěte, že ukazuje na trvalé úložiště souborů. Zde pro experiment jednoduše použijeme /tmp.

Pamatujte, že pokud chceme použít mlflow server ke spuštění starých experimentů, musí být přítomny v úložišti souborů. Nicméně i bez toho bychom je mohli použít v UDF, protože potřebujeme pouze cestu k modelu.

Poznámka: Mějte na paměti, že uživatelské rozhraní sledování a klient modelu musí mít přístup k umístění artefaktu. To znamená, že bez ohledu na to, že Tracking UI sídlí v instanci EC2, při lokálním spuštění MLflow musí mít stroj přímý přístup k S3, aby mohl zapisovat modely artefaktů.

Rozšíření Spark pomocí MLflow
Sledovací uživatelské rozhraní ukládá artefakty do bucketu S3

Běžící modely

Jakmile je spuštěn Sledovací server, můžete začít trénovat modely.

Jako příklad použijeme modifikaci vína z příkladu MLflow v Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Jak jsme již probrali, MLflow vám umožňuje protokolovat parametry modelu, metriky a artefakty, abyste mohli sledovat, jak se vyvíjejí v průběhu iterací. Tato funkce je mimořádně užitečná, protože tímto způsobem můžeme reprodukovat nejlepší model kontaktováním sledovacího serveru nebo pochopením, který kód provedl požadovanou iteraci, pomocí git hash logů revizí.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Rozšíření Spark pomocí MLflow
Iterace vína

Serverová část pro model

Sledovací server MLflow, spuštěný pomocí příkazu „mlflow server“, má rozhraní REST API pro sledování běhů a zápis dat do místního systému souborů. Adresu sledovacího serveru můžete zadat pomocí proměnné prostředí „MLFLOW_TRACKING_URI“ a MLflow tracking API automaticky kontaktuje sledovací server na této adrese, aby vytvořil/přijal informace o spuštění, metriky protokolu atd.

Zdroj: Dokumenty// Spuštění sledovacího serveru

Abychom mohli modelu poskytnout server, potřebujeme běžící sledovací server (viz spouštěcí rozhraní) a Run ID modelu.

Rozšíření Spark pomocí MLflow
Spustit ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Abychom mohli obsluhovat modely pomocí funkce MLflow, budeme potřebovat přístup k uživatelskému rozhraní sledování, abychom mohli získat informace o modelu jednoduchým zadáním --run_id.

Jakmile model kontaktuje sledovací server, můžeme získat nový koncový bod modelu.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Běžecké modely od Spark

Přestože je Sledovací server dostatečně výkonný, aby obsluhoval modely v reálném čase, trénoval je a využíval funkcionalitu serveru (zdroj: mlflow // docs // modely # local), použití Spark (dávkové nebo streamování) je díky své distribuci ještě výkonnější řešení.

Představte si, že jste jednoduše provedli školení offline a poté aplikovali výstupní model na všechna svá data. To je místo, kde Spark a MLflow září.

Nainstalujte PySpark + Jupyter + Spark

Zdroj: Začněte PySpark - Jupyter

Abychom ukázali, jak aplikujeme modely MLflow na datové rámce Spark, musíme nastavit notebooky Jupyter, aby spolupracovaly s PySpark.

Začněte instalací nejnovější stabilní verze Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Nainstalujte PySpark a Jupyter do virtuálního prostředí:

pip install pyspark jupyter

Nastavení proměnných prostředí:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Po definování notebook-dir, můžeme si sešity uložit do požadované složky.

Spuštění Jupyter z PySpark

Protože jsme byli schopni nakonfigurovat Jupiter jako ovladač PySpark, můžeme nyní spouštět notebook Jupyter v kontextu PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Rozšíření Spark pomocí MLflow

Jak bylo uvedeno výše, MLflow poskytuje funkci pro protokolování artefaktů modelu v S3. Jakmile máme vybraný model v rukou, máme možnost jej pomocí modulu importovat jako UDF mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Rozšíření Spark pomocí MLflow
PySpark - výstup předpovědí kvality vína

Až do tohoto bodu jsme mluvili o tom, jak používat PySpark s MLflow a spouštět předpovědi kvality vína na celém souboru dat o víně. Co když ale potřebujete použít moduly Python MLflow od Scala Spark?

Testovali jsme to také rozdělením kontextu Spark mezi Scala a Python. To znamená, že jsme zaregistrovali MLflow UDF v Pythonu a použili jej ze Scaly (ano, možná to není nejlepší řešení, ale to, co máme).

Scala Spark + MLflow

Pro tento příklad doplníme Toree jádro do stávajícího Jupiteru.

Nainstalujte Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Jak můžete vidět z přiloženého poznámkového bloku, UDF je sdílen mezi Spark a PySpark. Doufáme, že tato část bude užitečná pro ty, kteří milují Scala a chtějí nasadit modely strojového učení ve výrobě.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Další kroky

I když je MLflow v době psaní článku ve verzi Alpha, vypadá docela slibně. Už jen možnost provozovat více rámců strojového učení a využívat je z jednoho koncového bodu posouvá doporučující systémy na další úroveň.

Kromě toho MLflow sbližuje datové inženýry a specialisty na datovou vědu a vytváří mezi nimi společnou vrstvu.

Po tomto prozkoumání MLflow jsme si jisti, že se posuneme vpřed a použijeme jej pro naše potrubí Spark a doporučovací systémy.

Bylo by hezké synchronizovat úložiště souborů s databází místo systému souborů. To by nám mělo poskytnout více koncových bodů, které mohou používat stejné úložiště souborů. Například použijte více instancí Presto и Athena se stejným metaúložištěm lepidla.

Abych to shrnul, rád bych poděkoval komunitě MLFlow za to, že naši práci s daty udělali zajímavější.

Pokud si s MLflow pohráváte, neváhejte nám napsat a řekněte nám, jak jej používáte, a ještě více, pokud jej používáte ve výrobě.

Zjistěte více o kurzech:
Strojové učení. Základní kurz
Strojové učení. Pokročilý kurz

Přečtěte si více:

Zdroj: www.habr.com

Přidat komentář