Проширување на Spark со MLflow

Здраво, жители на Хабровск. Како што веќе напишавме, овој месец OTUS започнува два курса за машинско учење одеднаш, имено база и напредна. Во овој поглед, продолжуваме да споделуваме корисен материјал.

Целта на оваа статија е да зборуваме за нашето прво искуство со користење MLflow.

Ќе го започнеме прегледот MLflow од неговиот сервер за следење и евидентирај ги сите повторувања на студијата. Потоа ќе го споделиме нашето искуство за поврзување на Spark со MLflow користејќи UDF.

Контекст

Ние сме во Алфа здравје Ние користиме машинско учење и вештачка интелигенција за да ги поттикнеме луѓето да преземат одговорност за нивното здравје и благосостојба. Затоа моделите за машинско учење се во срцето на производите за наука за податоци што ги развиваме, и затоа бевме привлечени од MLflow, платформа со отворен код што ги покрива сите аспекти на животниот циклус на машинското учење.

MLflow

Главната цел на MLflow е да обезбеди дополнителен слој на врвот на машинското учење што ќе им овозможи на научниците за податоци да работат со речиси секоја библиотека за машинско учење (h2o, керас, прескокнување, питорч, учат и tensorflow), однесувајќи ја нејзината работа на следното ниво.

MLflow обезбедува три компоненти:

  • Следење – снимање и барања за експерименти: код, податоци, конфигурација и резултати. Следењето на процесот на креирање модел е многу важно.
  • проекти – Формат на пакување да работи на која било платформа (на пр. SageMaker)
  • Модели – заеднички формат за поднесување модели на различни алатки за распоредување.

MLflow (во алфа во моментот на пишување) е платформа со отворен код која ви овозможува да управувате со животниот циклус на машинско учење, вклучувајќи експериментирање, повторна употреба и распоредување.

Поставување MLflow

За да користите MLflow, прво треба да ја поставите целата ваша околина на Python, за ова ќе ја користиме PyEnv (за да инсталирате Python на Mac, проверете тука). На овој начин можеме да создадеме виртуелна средина каде што ќе ги инсталираме сите библиотеки потребни за нејзино извршување.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Ајде да ги инсталираме потребните библиотеки.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Забелешка: Ние користиме PyArrow за извршување на модели како UDF. Верзиите на PyArrow и Numpy требаше да се поправат бидејќи последните верзии беа во конфликт една со друга.

Стартувајте го корисничкиот интерфејс за следење

Следењето MLflow ни овозможува да евидентираме и да бараме експерименти користејќи Python и ОДМОР API. Покрај тоа, можете да одредите каде да ги чувате артефактите на модели (локален домаќин, Амазон S3, Складирање Azure Blob, Складирање на облак на Google или SFTP сервер). Бидејќи користиме AWS во Alpha Health, нашето складирање на артефакти ќе биде S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow препорачува користење на постојано складирање на датотеки. Складирањето на датотеки е местото каде што серверот ќе ги складира метаподатоците за извршени и експерименти. Кога го стартувате серверот, проверете дали укажува на постојаната продавница за датотеки. Тука за експериментот едноставно ќе го искористиме /tmp.

Запомнете дека ако сакаме да го користиме серверот mlflow за да извршиме стари експерименти, тие мора да бидат присутни во складиштето на датотеки. Сепак, и без ова би можеле да ги користиме во UDF, бидејќи ни треба само патот до моделот.

Забелешка: Имајте на ум дека корисничкиот интерфејс за следење и моделот на клиентот мора да имаат пристап до локацијата на артефактот. Односно, без оглед на фактот што интерфејсот за следење се наоѓа во примерок EC2, кога локално работи MLflow, машината мора да има директен пристап до S3 за да пишува модели на артефакти.

Проширување на Spark со MLflow
UI за следење складира артефакти во кофа S3

Модели за трчање

Штом работи серверот за следење, можете да започнете со обука на моделите.

Како пример, ќе ја користиме модификацијата на виното од примерот MLflow во Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Како што веќе разговаравме, MLflow ви овозможува да ги евидентирате параметрите, метриките и артефактите на моделот за да можете да следите како тие се развиваат во текот на повторувањата. Оваа функција е исклучително корисна затоа што на овој начин можеме да го репродуцираме најдобриот модел со контактирање на серверот за следење или разбирање кој код ја извршил потребната итерација користејќи ги git hash логовите на обврзувања.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Проширување на Spark со MLflow
Итерации на вино

Серверски дел за моделот

Серверот за следење MLflow, лансиран со помош на командата „mlflow server“, има REST API за следење на работи и запишување податоци во локалниот датотечен систем. Може да ја одредите адресата на серверот за следење користејќи ја променливата на околината „MLFLOW_TRACKING_URI“, а API-то за следење MLflow автоматски ќе контактира со серверот за следење на оваа адреса за да креира/прима информации за стартување, метрика на дневници итн.

Извор: Документи// Водење сервер за следење

За да му обезбедиме на моделот сервер, потребен ни е сервер за следење што работи (види интерфејс за стартување) и Run ID на моделот.

Проширување на Spark со MLflow
Стартувај ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

За да ги опслужуваме моделите кои ја користат функционалноста на серверот MLflow, ќе ни треба пристап до интерфејсот за следење за да добиваме информации за моделот едноставно со наведување --run_id.

Откако моделот ќе го контактира серверот за следење, можеме да добиеме нова крајна точка на моделот.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Модели за трчање од Spark

И покрај фактот дека серверот за следење е доволно моќен да опслужува модели во реално време, да ги обучи и да ја користи функционалноста на серверот (извор: mlflow // docs // модели # локални), употребата на Spark (серија или стриминг) е уште помоќно решение поради дистрибуцијата.

Замислете дека едноставно сте ја направиле обуката офлајн, а потоа го примените излезниот модел на сите ваши податоци. Тука блескаат Spark и MLflow.

Инсталирајте PySpark + Jupyter + Spark

Извор: Започнете PySpark - Jupyter

За да покажеме како ги применуваме моделите MLflow на податочните рамки на Spark, треба да поставиме тетратки на Jupyter да работат заедно со PySpark.

Започнете со инсталирање на најновата стабилна верзија Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Инсталирајте ги PySpark и Jupyter во виртуелната средина:

pip install pyspark jupyter

Поставете променливи на околината:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Имајќи утврдено notebook-dir, можеме да ги чуваме нашите тетратки во саканата папка.

Лансирање на Jupyter од PySpark

Бидејќи можевме да го конфигурираме Јупитер како драјвер за PySpark, сега можеме да ја стартуваме Jupyter тетратката во контекст на PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Проширување на Spark со MLflow

Како што споменавме погоре, MLflow обезбедува функција за евидентирање на артефакти на модели во S3. Штом го имаме избраниот модел во наши раце, имаме можност да го увеземе како UDF користејќи го модулот mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Проширување на Spark со MLflow
PySpark - Изнесување предвидувања за квалитетот на виното

До овој момент, разговаравме за тоа како да го користиме PySpark со MLflow, извршувајќи предвидувања за квалитетот на виното на целата база на податоци за вино. Но, што ако треба да користите Python MLflow модули од Scala Spark?

Го тестиравме и ова со разделување на контекстот на Spark помеѓу Scala и Python. Односно, го регистриравме MLflow UDF во Python и го користевме од Scala (да, можеби не е најдоброто решение, но она што го имаме).

Scala Spark + MLflow

За овој пример ќе додадеме Торе кернелот во постоечкиот Јупитер.

Инсталирајте Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Како што можете да видите од приложената тетратка, UDF се дели помеѓу Spark и PySpark. Се надеваме дека овој дел ќе биде корисен за оние кои ја сакаат Scala и сакаат да распоредат модели за машинско учење во производството.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Следни чекори

Иако MLflow е во Алфа верзија за време на пишувањето, изгледа доста ветувачки. Само способноста да се стартуваат повеќе рамки за машинско учење и да се трошат од една крајна точка ги носи системите за препораки на следното ниво.

Покрај тоа, MLflow ги зближува инженерите за податоци и специјалистите за наука за податоци, поставувајќи заеднички слој меѓу нив.

По ова истражување на MLflow, уверени сме дека ќе продолжиме напред и ќе го користиме за нашите Spark цевководи и системи за препораки.

Би било убаво складирањето на датотеки да се синхронизира со базата на податоци наместо со датотечен систем. Ова треба да ни даде повеќе крајни точки кои можат да го користат истото складирање на датотеки. На пример, користете повеќе примероци престо и Атена со истиот метастор на лепак.

Да резимираме, би сакал да се заблагодарам на заедницата MLFlow што ја направи нашата работа со податоци поинтересна.

Ако си играте со MLflow, не двоумете се да ни пишете и да ни кажете како го користите, а уште повеќе ако го користите во производството.

Дознајте повеќе за курсевите:
Машинско учење. Основен курс
Машинско учење. Напреден курс

Прочитај повеќе:

Извор: www.habr.com

Додадете коментар