Разширяване на Spark с MLflow

Здравейте, жители на Хабровск. Както вече писахме, този месец OTUS стартира два курса за машинно обучение наведнъж, а именно база и напреднал. В тази връзка продължаваме да споделяме полезни материали.

Целта на тази статия е да говорим за нашия първи опит с използването MLflow.

Ще започнем прегледа MLflow от неговия сървър за проследяване и регистрирайте всички повторения на изследването. След това ще споделим нашия опит от свързването на Spark с MLflow с помощта на UDF.

контекст

Вътре сме Алфа Здраве Използваме машинно обучение и изкуствен интелект, за да дадем възможност на хората да поемат отговорност за своето здраве и благополучие. Ето защо моделите за машинно обучение са в основата на продуктите за наука за данни, които разработваме, и затова бяхме привлечени от MLflow, платформа с отворен код, която обхваща всички аспекти на жизнения цикъл на машинното обучение.

MLflow

Основната цел на MLflow е да осигури допълнителен слой върху машинното обучение, което би позволило на специалистите по данни да работят с почти всяка библиотека за машинно обучение (h2o, keras, скок, питорка, sklearn и tensorflow), извеждайки нейната работа на следващото ниво.

MLflow предоставя три компонента:

  • Проследяване – запис и заявки за експерименти: код, данни, конфигурация и резултати. Наблюдението на процеса на създаване на модел е много важно.
  • Проекти – Опаковъчен формат за изпълнение на всяка платформа (напр. SageMaker)
  • Модели – общ формат за подаване на модели към различни инструменти за внедряване.

MLflow (в алфа версия към момента на писане) е платформа с отворен код, която ви позволява да управлявате жизнения цикъл на машинното обучение, включително експериментиране, повторно използване и внедряване.

Настройване на MLflow

За да използвате MLflow, трябва първо да настроите цялата си Python среда, за това ще използваме PyEnv (за да инсталирате Python на Mac, проверете тук). По този начин можем да създадем виртуална среда, в която ще инсталираме всички библиотеки, необходими за стартирането й.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Нека инсталираме необходимите библиотеки.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Забележка: Използваме PyArrow за изпълнение на модели като UDF. Версиите на PyArrow и Numpy трябваше да бъдат коригирани, тъй като последните бяха в конфликт помежду си.

Стартирайте потребителския интерфейс за проследяване

Проследяването на MLflow ни позволява да регистрираме и правим заявки за експерименти с помощта на Python и ПОЧИВКА API. Освен това можете да определите къде да съхранявате артефактите на модела (localhost, Amazon S3, Azure Blob съхранение, Google Cloud Storage или SFTP сървър). Тъй като използваме AWS в Alpha Health, нашето хранилище за артефакти ще бъде S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow препоръчва използването на постоянно съхранение на файлове. Съхранението на файлове е мястото, където сървърът ще съхранява метаданни за изпълнение и експеримент. Когато стартирате сървъра, уверете се, че той сочи към хранилището за постоянни файлове. Тук за експеримента просто ще използваме /tmp.

Не забравяйте, че ако искаме да използваме сървъра mlflow за стартиране на стари експерименти, те трябва да присъстват във файловото хранилище. Но и без това бихме могли да ги използваме в СДС, тъй като ни трябва само пътят до модела.

Забележка: Имайте предвид, че потребителският интерфейс за проследяване и клиентът на модела трябва да имат достъп до местоположението на артефакта. Тоест, независимо от факта, че потребителският интерфейс за проследяване се намира в екземпляр на EC2, когато се изпълнява локално MLflow, машината трябва да има директен достъп до S3, за да напише модели на артефакти.

Разширяване на Spark с MLflow
Потребителският интерфейс за проследяване съхранява артефакти в кофа S3

Бягащи модели

Веднага щом сървърът за проследяване стартира, можете да започнете да обучавате моделите.

Като пример ще използваме модификацията на wine от примера на MLflow в Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Както вече обсъдихме, MLflow ви позволява да регистрирате параметри, показатели и артефакти на модела, така че да можете да проследите как се развиват през итерациите. Тази функция е изключително полезна, защото по този начин можем да възпроизведем най-добрия модел, като се свържем със сървъра за проследяване или разберем кой код е изпълнил необходимата итерация, използвайки git хеш регистрационните файлове на ангажиментите.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Разширяване на Spark с MLflow
Вино повторения

Сървърна част за модела

Сървърът за проследяване MLflow, стартиран с помощта на командата „mlflow server“, има REST API за проследяване на стартирания и запис на данни в локалната файлова система. Можете да посочите адреса на сървъра за проследяване, като използвате променливата на средата „MLFLOW_TRACKING_URI“ и API за проследяване на MLflow автоматично ще се свърже със сървъра за проследяване на този адрес, за да създаде/получи информация за стартиране, регистрационни показатели и т.н.

Източник: Документи// Изпълнение на сървър за проследяване

За да предоставим на модела сървър, имаме нужда от работещ сървър за проследяване (вижте интерфейса за стартиране) и Run ID на модела.

Разширяване на Spark с MLflow
Run ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

За да обслужваме модели, използвайки функционалността за обслужване на MLflow, ще ни е необходим достъп до потребителския интерфейс за проследяване, за да получим информация за модела, просто като посочим --run_id.

След като моделът се свърже със сървъра за проследяване, можем да получим нова крайна точка на модела.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Работещи модели от Spark

Въпреки факта, че сървърът за проследяване е достатъчно мощен, за да обслужва модели в реално време, обучете ги и използвайте функционалността на сървъра (източник: mlflow // документи // модели # местни), използването на Spark (партида или стрийминг) е още по-мощно решение поради разпространението.

Представете си, че просто сте извършили обучението офлайн и след това сте приложили изходния модел към всичките си данни. Това е мястото, където Spark и MLflow блестят.

Инсталирайте PySpark + Jupyter + Spark

Източник: Започнете PySpark - Jupyter

За да покажем как прилагаме MLflow модели към рамки с данни на Spark, трябва да настроим преносимите компютри Jupyter да работят заедно с PySpark.

Започнете с инсталиране на най-новата стабилна версия Апачи Спарк:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Инсталирайте PySpark и Jupyter във виртуалната среда:

pip install pyspark jupyter

Настройте променливи на средата:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Като определи notebook-dir, можем да съхраняваме тетрадките си в желаната папка.

Стартиране на Jupyter от PySpark

Тъй като успяхме да конфигурираме Jupiter като драйвер на PySpark, сега можем да стартираме преносим компютър Jupyter в контекста на PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Разширяване на Spark с MLflow

Както бе споменато по-горе, MLflow предоставя функция за регистриране на артефакти на модела в S3. Веднага след като имаме избрания модел в ръцете си, имаме възможност да го импортираме като UDF с помощта на модула mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Разширяване на Spark с MLflow
PySpark - Извежда прогнози за качеството на виното

До този момент говорихме как да използваме PySpark с MLflow, изпълнявайки прогнози за качеството на виното върху целия набор от данни за вино. Но какво ще стане, ако трябва да използвате модули Python MLflow от Scala Spark?

Тествахме и това, като разделихме контекста на Spark между Scala и Python. Тоест регистрирахме MLflow UDF в Python и го използвахме от Scala (да, може би не най-доброто решение, но това, което имаме).

Scala Spark + MLflow

За този пример ще добавим Ядрото на Тори в съществуващия Юпитер.

Инсталирайте Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Както можете да видите от прикачения бележник, UDF се споделя между Spark и PySpark. Надяваме се, че тази част ще бъде полезна за тези, които обичат Scala и искат да разположат модели за машинно обучение в производството.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Следващи шаги

Въпреки че MLflow е в алфа версия към момента на писане, изглежда доста обещаващо. Само способността да се изпълняват множество рамки за машинно обучение и да се използват от една крайна точка извежда препоръчителните системи на следващото ниво.

Освен това MLflow сближава инженерите по данни и специалистите по наука за данни, поставяйки общ слой между тях.

След това изследване на MLflow сме уверени, че ще продължим напред и ще го използваме за нашите тръбопроводи и препоръчителни системи на Spark.

Би било хубаво да синхронизирате файловото хранилище с базата данни, вместо с файловата система. Това трябва да ни даде множество крайни точки, които могат да използват едно и също файлово хранилище. Например, използвайте няколко екземпляра Престо и Атина със същия Glue metastore.

За да обобщя, бих искал да благодаря на общността на MLFlow, че направи работата ни с данни по-интересна.

Ако си играете с MLflow, не се колебайте да ни пишете и да ни кажете как го използвате и още повече, ако го използвате в производството.

Научете повече за курсовете:
Машинно обучение. Основен курс
Машинно обучение. Курс за напреднали

Прочетете още:

Източник: www.habr.com

Добавяне на нов коментар