Spark MLflow көмегімен кеңейтілуде

Сәлеметсіздер ме, Хабаровск қаласының тұрғындары. Біз бұрын жазғанымыздай, осы айда OTUS бірден екі машиналық оқыту курсын бастайды, атап айтқанда негіз и озат. Осыған байланысты біз пайдалы материалмен бөлісуді жалғастырамыз.

Бұл мақаланың мақсаты - бірінші қолдану тәжірибеміз туралы айту MLflow.

Біз шолуды бастаймыз MLflow оның бақылау серверінен және зерттеудің барлық итерацияларын журналға жазыңыз. Содан кейін біз UDF көмегімен Spark-ті MLflow-қа қосу тәжірибемізбен бөлісеміз.

Контекст

Біз кіреміз Альфа денсаулық Біз адамдарға өз денсаулығы мен әл-ауқатына жауап беруге мүмкіндік беру үшін машиналық оқытуды және жасанды интеллектті қолданамыз. Сондықтан машиналық оқыту үлгілері біз әзірлейтін деректер ғылымы өнімдерінің негізі болып табылады, сондықтан біз машинаны оқытудың өмірлік циклінің барлық аспектілерін қамтитын ашық бастапқы коды MLflow платформасына тартылдық.

MLflow

MLflow бағдарламасының негізгі мақсаты - деректерді зерттеушілерге машиналық оқытудың кез келген дерлік кітапханасымен жұмыс істеуге мүмкіндік беретін машиналық оқытудың үстіне қосымша қабат беру (сағ, кера, секіру, питорч, склерн и тензорфлоу), оның жұмысын келесі деңгейге көтеру.

MLflow үш компонентті қамтамасыз етеді:

  • Tracking – тәжірибелерді жазу және сұраулар: код, деректер, конфигурация және нәтижелер. Модельді құру процесін бақылау өте маңызды.
  • жобалар – Кез келген платформада іске қосу үшін қаптама пішімі (мысалы, SageMaker)
  • Models – әртүрлі орналастыру құралдарына үлгілерді жіберудің жалпы пішімі.

MLflow (жазу кезінде альфада) - эксперимент, қайта пайдалану және орналастыруды қоса, машиналық оқытудың өмірлік циклін басқаруға мүмкіндік беретін ашық бастапқы платформа.

MLflow орнату

MLflow пайдалану үшін алдымен бүкіл Python ортасын орнату керек, ол үшін біз қолданамыз PyEnv (Mac жүйесіне Python орнату үшін, тексеріңіз осында). Осылайша біз виртуалды орта жасай аламыз, онда біз оны іске қосу үшін барлық қажетті кітапханаларды орнатамыз.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Қажетті кітапханаларды орнатайық.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Ескерту: UDF сияқты үлгілерді іске қосу үшін PyArrow пайдаланамыз. PyArrow және Numpy нұсқаларын түзету қажет болды, себебі соңғы нұсқалар бір-бірімен қайшы келді.

Tracking UI іске қосыңыз

MLflow Tracking Python және көмегімен эксперименттерді тіркеуге және сұрауға мүмкіндік береді REST API. Оған қоса, үлгі артефактілерін қайда сақтау керектігін анықтауға болады (localhost, Amazon S3, Azure Blob сақтау орны, Google Cloud Storage немесе SFTP сервері). Біз Alpha Health-те AWS қолданатындықтан, артефакт қоймамыз S3 болады.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow тұрақты файлдарды сақтауды пайдалануды ұсынады. Файл қоймасы - сервер іске қосылған және тәжірибе метадеректерін сақтайтын орын. Серверді іске қосқан кезде оның тұрақты файлдар қоймасына нұсқайтынына көз жеткізіңіз. Мұнда эксперимент үшін біз жай ғана қолданамыз /tmp.

Есіңізде болсын, егер біз ескі эксперименттерді іске қосу үшін mlflow серверін пайдаланғымыз келсе, олар файл қоймасында болуы керек. Дегенмен, онсыз да біз оларды UDF-де пайдалана аламыз, өйткені бізге тек үлгіге жол қажет.

Ескертпе: Tracking UI және үлгі клиентінің артефакт орнына кіру мүмкіндігі болуы керек екенін есте сақтаңыз. Яғни, Tracking UI EC2 данасында орналасқанына қарамастан, MLflow жергілікті түрде іске қосылғанда, артефакт үлгілерін жазу үшін құрылғының S3 жүйесіне тікелей қатынасы болуы керек.

Spark MLflow көмегімен кеңейтілуде
Бақылау интерфейсі артефактілерді S3 шелегінде сақтайды

Іске қосылған модельдер

Бақылау сервері іске қосылғаннан кейін үлгілерді үйретуді бастауға болады.

Мысал ретінде біз MLflow мысалындағы шарап модификациясын қолданамыз Склерн.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Жоғарыда талқылағанымыздай, MLflow үлгісі параметрлерін, метрикасын және артефактілерін журналға жазуға мүмкіндік береді, осылайша олардың итерациялар бойынша қалай дамып жатқанын бақылай аласыз. Бұл мүмкіндік өте пайдалы, өйткені осылайша біз Бақылау серверіне хабарласу арқылы ең жақсы үлгіні шығара аламыз немесе git хэш журналдарын орындау арқылы қажетті итерацияны қай код орындағанын түсінуге болады.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Spark MLflow көмегімен кеңейтілуде
Шарап итерациялары

Үлгіге арналған сервер бөлігі

«mlflow server» пәрмені арқылы іске қосылған MLflow бақылау серверінде іске қосуларды қадағалау және жергілікті файлдық жүйеге деректерді жазу үшін REST API бар. Бақылау серверінің мекенжайын "MLFLOW_TRACKING_URI" айнымалы ортасын пайдаланып көрсетуге болады және MLflow бақылау API іске қосу ақпаратын, журнал көрсеткіштерін және т.б. жасау/қабылдау үшін осы мекенжайдағы бақылау серверімен автоматты түрде байланысады.

Ақпарат көзі: Docs// Бақылау серверін іске қосу

Модельді сервермен қамтамасыз ету үшін бізге іске қосылған бақылау сервері (іске қосу интерфейсін қараңыз) және үлгінің іске қосу идентификаторы қажет.

Spark MLflow көмегімен кеңейтілуде
Іске қосу идентификаторы

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

MLflow қызмет көрсету мүмкіндігін пайдаланатын үлгілерге қызмет көрсету үшін бізге үлгі туралы ақпаратты жай көрсету арқылы алу үшін Tracking UI-ге кіру қажет болады. --run_id.

Модель Бақылау серверімен байланысқаннан кейін біз жаңа үлгінің соңғы нүктесін ала аламыз.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Spark ұсынған үлгілер

Бақылау сервері нақты уақыт режимінде үлгілерге қызмет көрсету, оларды үйрету және сервердің функционалдығын пайдалану үшін жеткілікті қуатты болғанына қарамастан (көзі: mlflow // құжаттар // үлгілер # жергілікті), Spark (топтама немесе ағынды) пайдалану оның таралуына байланысты одан да күшті шешім болып табылады.

Жаттығуды офлайн режимінде орындағаныңызды, содан кейін барлық деректеріңізге шығыс үлгісін қолданғаныңызды елестетіңіз. Бұл жерде Spark және MLflow жарқырайды.

PySpark + Jupyter + Spark орнатыңыз

Ақпарат көзі: Жұмысты бастау PySpark - Jupyter

Spark деректер фреймдеріне MLflow үлгілерін қалай қолданатынымызды көрсету үшін PySpark бағдарламасымен бірге жұмыс істеу үшін Jupyter жазу кітапшаларын орнату керек.

Ең соңғы тұрақты нұсқаны орнату арқылы бастаңыз Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Виртуалды ортада PySpark және Jupyter орнатыңыз:

pip install pyspark jupyter

Ортаның айнымалы мәндерін орнату:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Анықтап notebook-dir, біз дәптерімізді қалаған қалтаға сақтай аламыз.

PySpark-тен Jupyter іске қосылды

Біз Юпитерді PySpark драйвері ретінде конфигурациялай алғандықтан, енді Jupyter жазу кітапшасын PySpark контекстінде іске қоса аламыз.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Spark MLflow көмегімен кеңейтілуде

Жоғарыда айтылғандай, MLflow S3 жүйесінде үлгі артефактілерін тіркеу мүмкіндігін береді. Таңдалған үлгі қолымызда болғаннан кейін оны модуль арқылы UDF ретінде импорттау мүмкіндігіміз бар mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Spark MLflow көмегімен кеңейтілуде
PySpark - шарап сапасының болжамдарын шығару

Осы уақытқа дейін біз шараптың барлық деректер жинағында шарап сапасының болжамдарын орындайтын PySpark-ті MLflow көмегімен қалай пайдалану керектігі туралы сөйлестік. Бірақ Scala Spark-тен Python MLflow модульдерін пайдалану қажет болса ше?

Біз мұны Scala және Python арасында Spark контекстін бөлу арқылы сынадық. Яғни, біз MLflow UDF-ті Python-да тіркедік және оны Scala-дан қолдандық (иә, мүмкін ең жақсы шешім емес, бірақ бізде бар).

Scala Spark + MLflow

Бұл мысал үшін біз қосамыз Төре ядросы бар Юпитерге.

Spark + Toree + Jupyter орнатыңыз

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Қосылған жазу кітапшасынан көріп отырғаныңыздай, UDF Spark және PySpark арасында ортақ пайдаланылады. Бұл бөлім Scala-ны жақсы көретін және өндірісте машиналық оқыту үлгілерін қолданғысы келетіндерге пайдалы болады деп үміттенеміз.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Келесі қадамдар

Жазу кезінде MLflow Альфа нұсқасында болса да, ол өте перспективалы болып көрінеді. Бірнеше машиналық оқыту жүйесін іске қосу және оларды бір соңғы нүктеден тұтыну мүмкіндігі ғана кеңес беру жүйелерін келесі деңгейге көтереді.

Сонымен қатар, MLflow деректер инженерлері мен деректер ғылымының мамандарын бір-біріне жақындатып, олардың арасында ортақ қабат жасайды.

MLflow осы барлаудан кейін біз алға жылжып, оны Spark құбырлары мен кеңес беру жүйелері үшін қолданатынымызға сенімдіміз.

Файлдық жүйенің орнына файл қоймасын дерекқормен синхрондау жақсы болар еді. Бұл бізге бір файл сақтау орнын пайдалана алатын бірнеше соңғы нүктелерді беруі керек. Мысалы, бірнеше дананы пайдаланыңыз Presto и Афина бірдей Glue мета қоймасымен.

Қорытындылай келе, MLFlow қауымдастығына деректермен жұмысымызды қызықтырақ еткені үшін алғыс айтқым келеді.

Егер сіз MLflow-пен ойнап жатсаңыз, бізге хат жазудан тартынбаңыз және оны қалай пайдаланатыныңызды айтыңыз, тіпті егер сіз оны өндірісте қолдансаңыз.

Курстар туралы көбірек біліңіз:
Машиналық оқыту. Негізгі курс
Машиналық оқыту. Жетілдірілген курс

Ары қарай оқу:

Ақпарат көзі: www.habr.com

пікір қалдыру