Rozszerzanie platformy Spark za pomocą MLflow

Witam mieszkańców Chabrowska. Jak już pisaliśmy, w tym miesiącu OTUS uruchamia jednocześnie dwa kursy uczenia maszynowego, a mianowicie baza и zaawansowane. W związku z tym nadal udostępniamy przydatne materiały.

Celem tego artykułu jest omówienie naszych pierwszych doświadczeń z używaniem MLflow.

Rozpoczynamy recenzję MLflow z serwera śledzącego i rejestrować wszystkie iteracje badania. Następnie podzielimy się naszymi doświadczeniami z łączenia Sparka z MLflow za pomocą UDF.

Kontekst

Jesteśmy w Zdrowie alfa Wykorzystujemy uczenie maszynowe i sztuczną inteligencję, aby umożliwić ludziom przejęcie kontroli nad swoim zdrowiem i dobrym samopoczuciem. Właśnie dlatego modele uczenia maszynowego stanowią serce opracowywanych przez nas produktów do nauki danych i dlatego przyciągnęła nas MLflow, platforma typu open source, która obejmuje wszystkie aspekty cyklu życia uczenia maszynowego.

MLflow

Głównym celem MLflow jest zapewnienie dodatkowej warstwy poza uczeniem maszynowym, która umożliwiłaby badaczom danych pracę z niemal każdą biblioteką uczenia maszynowego (H2o, keras, mskok, płomień, szorować и tensorflow), przenosząc swoją pracę na wyższy poziom.

MLflow zapewnia trzy komponenty:

  • Śledzenie – rejestracja i prośby o eksperymenty: kod, dane, konfiguracja i wyniki. Bardzo ważne jest monitorowanie procesu tworzenia modelu.
  • Projekty – Format opakowania do uruchomienia na dowolnej platformie (np. SageMaker)
  • modele – wspólny format przesyłania modeli do różnych narzędzi wdrożeniowych.

MLflow (w wersji alfa w chwili pisania tego tekstu) to platforma typu open source, która umożliwia zarządzanie cyklem życia uczenia maszynowego, w tym eksperymentowaniem, ponownym wykorzystaniem i wdrażaniem.

Konfigurowanie MLflow

Aby skorzystać z MLflow musisz najpierw skonfigurować całe środowisko Pythona, do tego będziemy używać PyEnv (aby zainstalować Python na Macu, sprawdź tutaj). W ten sposób możemy stworzyć środowisko wirtualne, w którym zainstalujemy wszystkie biblioteki niezbędne do jego uruchomienia.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Zainstalujmy wymagane biblioteki.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Uwaga: używamy PyArrow do uruchamiania modeli takich jak UDF. Wersje PyArrow i Numpy wymagały naprawy, ponieważ te ostatnie wersje kolidowały ze sobą.

Uruchom interfejs śledzenia

MLflow Tracking pozwala nam rejestrować i wysyłać zapytania do eksperymentów przy użyciu języka Python i REST API. Ponadto można określić, gdzie mają być przechowywane artefakty modelu (localhost, Amazon S3, Magazyn obiektów Blob Azure, Google Cloud Storage lub Serwer SFTP). Ponieważ używamy AWS w Alpha Health, naszym miejscem do przechowywania artefaktów będzie S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow zaleca używanie trwałego magazynu plików. Magazyn plików to miejsce, w którym serwer będzie przechowywać metadane przebiegu i eksperymentu. Uruchamiając serwer, upewnij się, że wskazuje on magazyn plików trwałych. Tutaj do eksperymentu po prostu użyjemy /tmp.

Pamiętaj, że jeśli chcemy wykorzystać serwer mlflow do przeprowadzenia starych eksperymentów, muszą one znajdować się w magazynie plików. Jednak nawet bez tego moglibyśmy ich użyć w UDF, ponieważ potrzebujemy jedynie ścieżki do modelu.

Uwaga: należy pamiętać, że interfejs śledzenia i klient modelu muszą mieć dostęp do lokalizacji artefaktu. Oznacza to, że niezależnie od faktu, że interfejs śledzenia znajduje się w instancji EC2, podczas lokalnego uruchamiania MLflow maszyna musi mieć bezpośredni dostęp do S3, aby móc zapisywać modele artefaktów.

Rozszerzanie platformy Spark za pomocą MLflow
Śledzący interfejs użytkownika przechowuje artefakty w zasobniku S3

Modele biegowe

Gdy tylko serwer śledzenia zostanie uruchomiony, możesz rozpocząć trenowanie modeli.

Jako przykład wykorzystamy modyfikację wina z przykładu MLflow w Wyczyść.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Jak już wspomnieliśmy, MLflow umożliwia rejestrowanie parametrów modelu, metryk i artefaktów, dzięki czemu można śledzić ich ewolucję w iteracjach. Ta funkcja jest niezwykle przydatna, ponieważ w ten sposób możemy odtworzyć najlepszy model, kontaktując się z serwerem śledzącym lub sprawdzając, który kod wykonał wymaganą iterację, korzystając z dzienników skrótów zatwierdzeń git.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Rozszerzanie platformy Spark za pomocą MLflow
Iteracje wina

Część serwerowa dla modelu

Serwer śledzący MLflow, uruchamiany za pomocą polecenia „mlflow server”, posiada API REST umożliwiające śledzenie przebiegów i zapisywanie danych do lokalnego systemu plików. Możesz określić adres serwera śledzącego za pomocą zmiennej środowiskowej „MLFLOW_TRACKING_URI”, a interfejs API śledzenia MLflow automatycznie skontaktuje się z serwerem śledzącym pod tym adresem w celu utworzenia/odbioru informacji o uruchomieniu, metryk dziennika itp.

Źródło: Dokumenty// Uruchamianie serwera śledzącego

Aby zapewnić modelowi serwer, potrzebujemy działającego serwera śledzącego (patrz interfejs uruchamiania) i identyfikatora uruchomienia modelu.

Rozszerzanie platformy Spark za pomocą MLflow
Uruchom identyfikator

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Aby udostępniać modele za pomocą funkcji obsługi MLflow, będziemy potrzebować dostępu do interfejsu użytkownika śledzenia, aby otrzymywać informacje o modelu po prostu poprzez określenie --run_id.

Gdy model skontaktuje się z serwerem śledzenia, możemy uzyskać nowy punkt końcowy modelu.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Działające modele firmy Spark

Pomimo tego, że serwer Tracking jest na tyle wydajny, aby obsługiwać modele w czasie rzeczywistym, szkolić je i korzystać z funkcjonalności serwera (źródło: mlflow // docs // modele # lokalne), wykorzystanie Sparka (wsadowo lub strumieniowo) jest jeszcze potężniejszym rozwiązaniem ze względu na dystrybucję.

Wyobraź sobie, że po prostu przeprowadziłeś szkolenie w trybie offline, a następnie zastosowałeś model wyjściowy do wszystkich swoich danych. To tutaj błyszczą Spark i MLflow.

Zainstaluj PySpark + Jupyter + Spark

Źródło: Rozpocznij pracę z PySpark — Jupyter

Aby pokazać, jak stosujemy modele MLflow do ramek danych Spark, musimy skonfigurować notatniki Jupyter do współpracy z PySpark.

Zacznij od zainstalowania najnowszej stabilnej wersji Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Zainstaluj PySpark i Jupyter w środowisku wirtualnym:

pip install pyspark jupyter

Skonfiguruj zmienne środowiskowe:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Po zdefiniowaniu notebook-dir, możemy przechowywać nasze notesy w wybranym folderze.

Uruchamianie Jupytera z PySpark

Ponieważ udało nam się skonfigurować Jupitera jako sterownik PySpark, możemy teraz uruchomić notatnik Jupyter w kontekście PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Rozszerzanie platformy Spark za pomocą MLflow

Jak wspomniano powyżej, MLflow udostępnia funkcję rejestrowania artefaktów modelu w S3. Gdy tylko wybrany model trafi w nasze ręce, mamy możliwość zaimportowania go w formacie UDF za pomocą modułu mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Rozszerzanie platformy Spark za pomocą MLflow
PySpark — wyświetlanie prognoz jakości wina

Do tego momentu mówiliśmy o tym, jak używać PySpark z MLflow, uruchamiając prognozy jakości wina dla całego zbioru danych wina. Ale co, jeśli potrzebujesz użyć modułów Python MLflow od Scala Spark?

To również przetestowaliśmy, dzieląc kontekst Sparka między Scalę i Python. Oznacza to, że zarejestrowaliśmy MLflow UDF w Pythonie i użyliśmy go ze Scali (tak, może nie jest to najlepsze rozwiązanie, ale jakie mamy).

Scala Spark + MLflow

W tym przykładzie dodamy Jądro Toree do istniejącego Jowisza.

Zainstaluj Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Jak widać z dołączonego notatnika, UDF jest współdzielony pomiędzy Spark i PySpark. Mamy nadzieję, że ta część będzie przydatna dla tych, którzy kochają Scalę i chcą wdrożyć modele uczenia maszynowego w środowisku produkcyjnym.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Kolejne kroki

Mimo że w chwili pisania tego tekstu MLflow jest w wersji Alpha, wygląda całkiem obiecująco. Sama możliwość uruchamiania wielu platform uczenia maszynowego i korzystania z nich z jednego punktu końcowego przenosi systemy rekomendacyjne na wyższy poziom.

Ponadto MLflow zbliża do siebie inżynierów danych i specjalistów Data Science, tworząc między nimi wspólną warstwę.

Po tej eksploracji MLflow jesteśmy pewni, że pójdziemy dalej i wykorzystamy go w naszych potokach Spark i systemach rekomendacyjnych.

Byłoby miło zsynchronizować przechowywanie plików z bazą danych zamiast z systemem plików. To powinno dać nam wiele punktów końcowych, które mogą korzystać z tego samego miejsca na pliki. Na przykład użyj wielu instancji presto и Athena z tym samym metastorem Glue.

Podsumowując, chciałbym podziękować społeczności MLFlow za uczynienie naszej pracy z danymi bardziej interesującą.

Jeśli bawisz się MLflow, nie wahaj się do nas napisać i powiedzieć, jak go używasz, a tym bardziej, jeśli używasz go w produkcji.

Dowiedz się więcej o kursach:
Nauczanie maszynowe. Kurs podstawowy
Nauczanie maszynowe. Zaawansowany kurs

Czytaj więcej:

Źródło: www.habr.com

Dodaj komentarz