Розширення можливостей Spark за допомогою MLflow

Привіт, хабрівчани. Як ми вже писали, цього місяця OTUS запускає відразу два курси з машинного навчання, а саме базовый и просунутий. У зв'язку з цим продовжуємо ділитися корисним матеріалом.

Мета цієї статті – розповісти про наш перший досвід використання MLflow.

Ми почнемо огляд MLflow з його tracking-сервера та прологуємо всі ітерації дослідження. Потім поділимося досвідом з'єднання Spark з MLflow за допомогою UDF.

Контекст

Ми в Alpha Health використовуємо машинне навчання та штучний інтелект, щоб дати людям можливість піклуватися про своє здоров'я та благополуччя. Тому моделі машинного навчання лежать в основі продуктів обробки даних, які ми розробляємо, і саме тому нашу увагу привернула MLflow — платформа з відкритим вихідним кодом, яка охоплює всі аспекти життєвого циклу машинного навчання.

MLflow

Основна мета MLflow – забезпечити додатковий шар поверх машинного навчання, який дозволив би фахівцям з data science працювати практично з будь-якою бібліотекою машинного навчання (H2o, керас, mleap, піторх, sklearn и тензорний потік), виводячи її роботу новий рівень.

MLflow забезпечує три компоненти:

  • Відстеження – запис та запити до експериментів: код, дані, конфігурація та результати. Слідкувати за процесом створення моделі дуже важливо.
  • Завдання – Формат упаковки для запуску на будь-якій платформі (наприклад, SageMaker)
  • моделі – загальний формат надсилання моделей до різних інструментів розгортання.

MLflow (на момент написання статті в alpha-версії) — платформа з відкритим вихідним кодом, яка дозволяє керувати життєвим циклом машинного навчання, зокрема експериментами, перевикористанням та розгортанням.

Налаштування MLflow

Для використання MLflow потрібно спочатку налаштувати все середовище Python, для цього ми скористаємося PyEnv (щоб встановити Python на Mac, завітайте сюди). Так ми зможемо створити віртуальне середовище, куди встановимо всі необхідні для запуску бібліотеки.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Встановимо необхідні бібліотеки.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Примітка: ми використовуємо PyArrow для запуску таких моделей, як UDF. Версії PyArrow і Numpy потрібно було виправити, оскільки останні версії конфліктували між собою.

Запускаємо Tracking UI

MLflow Tracking дозволяє нам логувати та робити запити до експериментів за допомогою Python та REST API. Крім цього можна визначити, де зберігати артефакти моделі (localhost, Amazon S3, Зберігання BLOB-об'єктів Azure, Google Cloud Storage або SFTP-сервер). Оскільки в Alpha Health ми користуємося AWS, як сховище артефактів буде S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow рекомендує використовувати постійне файлове сховище. Файлове сховище – це місце, де сервер зберігатиме метадані запусків та експериментів. Під час запуску сервера переконайтеся, що він вказує на постійне сховище. Тут для експерименту ми просто скористаємося /tmp.

Пам'ятайте, що якщо ми хочемо використовувати сервер mlflow для запуску старих експериментів, вони повинні бути присутніми у файловому сховищі. Однак і без цього ми змогли б їх використовувати в UDF, оскільки нам потрібен тільки шлях до моделі.

Примітка: Майте на увазі, що Tracking UI та клієнт моделі повинні мати доступ до розташування артефакту. Тобто незалежно від того, що Tracking UI знаходиться в екземплярі EC2, при локальному запуску MLflow у машини повинен бути прямий доступ до S3 для запису моделей артефактів.

Розширення можливостей Spark за допомогою MLflow
Tracking UI зберігає артефакти в бакеті S3

Запуск моделей

Як тільки працюватиме Tracking-сервер, можна починати навчати моделі.

Як приклад ми скористаємося модифікацією wine з прикладу MLflow Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Як ми вже говорили, MLflow дозволяє логувати параметри, метрики та артефакти моделей, щоб можна було відстежувати, як вони розвиваються у міру ітерацій. Ця функція вкрай корисна, оскільки ми зможемо відтворити кращу модель, звернувшись в Tracking-серверу або зрозумівши, який код виконав потрібну ітерацію, скориставшись логами git hash коммітів.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Розширення можливостей Spark за допомогою MLflow
Ітерація wine

Серверна частина для моделі

Tracking-сервер MLflow, запущений за допомогою команди “mlflow server”, має REST API для відстеження запусків та запису даних у локальну файлову систему. Ви можете вказати адресу tracking-сервера за допомогою змінного середовища «MLFLOW_TRACKING_URI» та tracking API MLflow автоматично зв'яжеться з tracking-сервером за цією адресою, щоб створити/отримати інформацію про запуск, метрики логів тощо.

Джерело: Docs// Running a tracking server

Щоб забезпечити модель сервером, нам знадобиться запущений tracking-сервер (див. інтерфейс запуску) і Run ID моделі.

Розширення можливостей Spark за допомогою MLflow
Ідентифікатор запуску

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Для обслуговування моделей за допомогою функціоналу MLflow serve нам знадобиться доступ до Tracking UI, щоб отримувати інформацію про модель просто вказавши --run_id.

Як тільки модель зв'язується з Tracking-сервером, ми можемо отримати нову кінцеву точку моделі.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Запуск моделей із Spark

Незважаючи на те, що Tracking-сервер досить потужний для обслуговування моделей в режимі реального часу, їх навчання та використання функціоналуserve (джерело: mlflow // docs // models # local), застосування Spark (batch чи streaming) – ще потужніше рішення з допомогою розподіленості.

Уявіть, що ви просто провели навчання в офлайні, а потім застосували вихідну модель до всіх ваших даних. Саме тут Spark та MLflow покажуть себе з кращого боку.

Встановлюємо PySpark + Jupyter + Spark

Джерело: Get started PySpark - Jupyter

Щоб показати, як ми застосовуємо моделі MLflow до датафрейму Spark, потрібно налаштувати спільну роботу Jupyter notebooks з PySpark.

Почніть із встановлення останньої стабільної версії Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Встановіть PySpark та Jupyter у віртуальне середовище:

pip install pyspark jupyter

Налаштуйте змінні середовища:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Визначивши notebook-dir, ми зможемо зберігати наші notebook-і в бажаній папці.

Запускаємо Jupyter із PySpark

Оскільки ми змогли налаштувати Jupiter як драйвер PySpark, тепер ми можемо запускати Jupyter notebook у контексті PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Розширення можливостей Spark за допомогою MLflow

Як було сказано вище, MLflow надає функцію логування артефактів моделі S3. Як тільки у нас з'являється обрана модель, ми маємо можливість імпортувати її як UDF за допомогою модуля mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Розширення можливостей Spark за допомогою MLflow
PySpark – Висновок прогнозу якості вина

До цього ми говорили про те, як використовувати PySpark з MLflow, запускаючи прогнозування якості вина на всьому наборі даних wine. Але що робити, якщо потрібно використовувати модулі Python MLflow із Scala Spark?

Ми протестували і це, розділивши контекст Spark між Scala та Python. Тобто ми зареєстрували MLflow UDF у Python, і використовували його з Scala (так, можливо, не найкраще рішення, але що маємо).

Scala Spark + MLflow

Для цього прикладу ми додамо Toree Kernel у існуючий Jupiter.

Встановлюємо Spark+Toree+Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Як видно з прикріпленого notebook-а, UDF використовується спільно Spark та PySpark. Ми сподіваємося, що ця частина буде корисною для тих, хто любить Scala і хоче розгорнути моделі машинного навчання на продакшені.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

наступні кроки

Незважаючи на те, що на момент написання статті MLflow знаходиться в Alpha-версії, вона виглядає досить багатообіцяюче. Одна лише можливість запускати кілька фреймворків машинного навчання та використовувати їх із однієї кінцевої точки виводить рекомендаційні системи на новий рівень.

До того ж, MLflow зближує Data-інженерів та фахівців з Data Science, прокладаючи між ними загальний шар.

Після цього дослідження MLflow ми впевнені, що підемо далі і будемо використовувати її для наших пайплайнів Spark і в рекомендаційних системах.

Було б непогано синхронізувати файлове сховище з базою даних замість файлової системи. Так ми повинні отримати кілька кінцевих точок, які можуть використовувати те саме файлове сховище. Наприклад, використовувати кілька екземплярів Престо и Афіна з тим самим Glue metastore.

Підбиваючи підсумки, хочеться подякувати спільноті MLFlow за те, що робите нашу роботу з даними цікавіше.

Якщо ви граєтеся з MLflow, не соромтеся писати нам і розповідати, як ви його використовуєте, і тим більше якщо використовуєте його на продакшені.

Дізнатись докладніше про курси:
Machine Learning. Базовий курс
Machine Learning. Просунутий курс

Читати ще:

Джерело: habr.com

Додати коментар або відгук