Extinderea Spark cu MLflow

Bună ziua, locuitorii din Khabrovsk. După cum am scris deja, luna aceasta OTUS lansează două cursuri de învățare automată simultan, și anume baza и avansat. În acest sens, continuăm să împărtășim materiale utile.

Scopul acestui articol este de a vorbi despre prima noastră experiență de utilizare MLflow.

Vom începe revizuirea MLflow de pe serverul său de urmărire și înregistrați toate iterațiile studiului. Apoi vom împărtăși experiența noastră de conectare a Spark cu MLflow folosind UDF.

context

Noi suntem in Alpha Health Folosim învățarea automată și inteligența artificială pentru a le permite oamenilor să preia controlul asupra sănătății și bunăstării lor. De aceea, modelele de învățare automată se află în centrul produselor de știință a datelor pe care le dezvoltăm și de aceea am fost atrași de MLflow, o platformă open source care acoperă toate aspectele ciclului de viață al învățării automate.

MLflow

Scopul principal al MLflow este de a oferi un strat suplimentar peste învățarea automată, care ar permite oamenilor de știință să lucreze cu aproape orice bibliotecă de învățare automată (h2o, keras, mleap, pirtorh, sklearn и tensorflow), ducându-și munca la următorul nivel.

MLflow oferă trei componente:

  • Urmărire – înregistrarea și solicitările de experimente: cod, date, configurație și rezultate. Monitorizarea procesului de creare a unui model este foarte importantă.
  • Proiecte – Format de ambalare pentru a rula pe orice platformă (de ex. SageMaker)
  • modele – un format comun pentru transmiterea modelelor către diverse instrumente de implementare.

MLflow (în alfa la momentul scrierii) este o platformă open source care vă permite să gestionați ciclul de viață al învățării automate, inclusiv experimentarea, reutilizarea și implementarea.

Configurarea MLflow

Pentru a utiliza MLflow, trebuie mai întâi să configurați întregul mediu Python, pentru asta vom folosi PyEnv (pentru a instala Python pe Mac, verificați aici). Astfel putem crea un mediu virtual unde vom instala toate bibliotecile necesare rulării acestuia.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Să instalăm bibliotecile necesare.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Notă: folosim PyArrow pentru a rula modele precum UDF. Versiunile PyArrow și Numpy trebuiau remediate, deoarece versiunile din urmă intrau în conflict între ele.

Lansați UI de urmărire

MLflow Tracking ne permite să înregistrăm și să interogăm experimente folosind Python și REST API. În plus, puteți determina unde să stocați artefactele modelului (localhost, Amazon S3, Stocare Blob Azure, Google Cloud Storage sau Server SFTP). Deoarece folosim AWS la Alpha Health, stocarea artefactelor va fi S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow recomandă utilizarea stocării de fișiere persistente. Stocarea fișierelor este locul în care serverul va stoca metadatele de rulare și experimente. Când porniți serverul, asigurați-vă că acesta indică către depozitul de fișiere persistente. Aici pentru experiment îl vom folosi pur și simplu /tmp.

Rețineți că, dacă vrem să folosim serverul mlflow pentru a rula experimente vechi, acestea trebuie să fie prezente în stocarea fișierelor. Cu toate acestea, chiar și fără acest lucru, le-am putea folosi în UDF, deoarece avem nevoie doar de calea către model.

Notă: rețineți că UI de urmărire și clientul model trebuie să aibă acces la locația artefactului. Adică, indiferent de faptul că UI de urmărire se află într-o instanță EC2, atunci când rulează MLflow local, mașina trebuie să aibă acces direct la S3 pentru a scrie modele de artefact.

Extinderea Spark cu MLflow
UI de urmărire stochează artefacte într-o găleată S3

Modele de alergare

De îndată ce serverul de urmărire rulează, puteți începe antrenamentul modelelor.

Ca exemplu, vom folosi modificarea vinului din exemplul MLflow în Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

După cum am discutat deja, MLflow vă permite să înregistrați parametrii modelului, valorile și artefactele, astfel încât să puteți urmări modul în care acestea evoluează pe parcursul iterațiilor. Această caracteristică este extrem de utilă deoarece astfel putem reproduce cel mai bun model contactând serverul de urmărire sau înțelegerea codului care a finalizat iterația necesară folosind jurnalele git hash ale commit-urilor.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Extinderea Spark cu MLflow
Iterații de vin

Piesa server pentru model

Serverul de urmărire MLflow, lansat folosind comanda „server mlflow”, are un API REST pentru urmărirea rulărilor și scrierea datelor în sistemul de fișiere local. Puteți specifica adresa serverului de urmărire folosind variabila de mediu „MLFLOW_TRACKING_URI”, iar API-ul de urmărire MLflow va contacta automat serverul de urmărire la această adresă pentru a crea/primi informații de lansare, valori de jurnal etc.

Sursa: Documente// Rularea unui server de urmărire

Pentru a furniza modelului un server, avem nevoie de un server de urmărire care rulează (vezi interfața de lansare) și ID-ul de rulare al modelului.

Extinderea Spark cu MLflow
Run ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Pentru a difuza modele folosind funcționalitatea de servire MLflow, vom avea nevoie de acces la interfața de utilizare de urmărire pentru a primi informații despre model pur și simplu specificând --run_id.

Odată ce modelul contactează serverul de urmărire, putem obține un nou punct final de model.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Modele de rulare de la Spark

În ciuda faptului că serverul de urmărire este suficient de puternic pentru a servi modele în timp real, antrenați-le și utilizați funcționalitatea serverului (sursa: mlflow // documente // modele # local), utilizarea Spark (batch sau streaming) este o soluție și mai puternică datorită distribuției.

Imaginați-vă că pur și simplu ați făcut antrenamentul offline și apoi ați aplicat modelul de ieșire tuturor datelor. Aici strălucesc Spark și MLflow.

Instalați PySpark + Jupyter + Spark

Sursa: Începeți PySpark - Jupyter

Pentru a arăta cum aplicăm modelele MLflow cadrelor de date Spark, trebuie să setăm notebook-uri Jupyter pentru a funcționa împreună cu PySpark.

Începeți prin a instala cea mai recentă versiune stabilă Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Instalați PySpark și Jupyter în mediul virtual:

pip install pyspark jupyter

Configurați variabilele de mediu:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

După ce a hotărât notebook-dir, ne putem stoca caietele în folderul dorit.

Lansarea Jupyter din PySpark

Deoarece am putut configura Jupiter ca driver PySpark, acum putem rula notebook-ul Jupyter în contextul PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Extinderea Spark cu MLflow

După cum sa menționat mai sus, MLflow oferă o caracteristică pentru înregistrarea artefactelor modelului în S3. De îndată ce avem în mâini modelul selectat, avem posibilitatea de a-l importa ca UDF folosind modulul mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Extinderea Spark cu MLflow
PySpark - Emite previziuni privind calitatea vinului

Până în acest moment, am vorbit despre cum să utilizați PySpark cu MLflow, rulând predicții privind calitatea vinului pe întregul set de date despre vin. Dar ce se întâmplă dacă trebuie să utilizați module Python MLflow de la Scala Spark?

Am testat și acest lucru prin împărțirea contextului Spark între Scala și Python. Adică am înregistrat MLflow UDF în Python și l-am folosit de la Scala (da, poate nu este cea mai bună soluție, dar ce avem).

Scala Spark + MLflow

Pentru acest exemplu vom adăuga Toree Kernel în Jupiterul existent.

Instalați Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

După cum puteți vedea din blocnotesul atașat, UDF-ul este partajat între Spark și PySpark. Sperăm că această parte va fi utilă celor care iubesc Scala și doresc să implementeze modele de învățare automată în producție.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Pașii următori

Chiar dacă MLflow este în versiunea Alpha la momentul scrierii, pare destul de promițător. Doar capacitatea de a rula mai multe cadre de învățare automată și de a le consuma de la un singur punct final duce sistemele de recomandare la nivelul următor.

În plus, MLflow aduce inginerii de date și specialiștii în știința datelor mai aproape, punând un strat comun între ei.

După această explorare a MLflow, suntem încrezători că vom merge mai departe și îl vom folosi pentru conductele noastre Spark și sistemele de recomandare.

Ar fi bine să sincronizați stocarea fișierelor cu baza de date în loc de sistemul de fișiere. Acest lucru ar trebui să ne ofere mai multe puncte finale care pot folosi aceeași stocare de fișiere. De exemplu, utilizați mai multe instanțe Presto и Athena cu același metastore Glue.

Pentru a rezuma, aș dori să mulțumesc comunității MLFlow pentru că ne-a făcut munca cu date mai interesantă.

Dacă te joci cu MLflow, nu ezita să ne scrii și să ne spui cum îl folosești și cu atât mai mult dacă îl folosești în producție.

Aflați mai multe despre cursuri:
Învățare automată. Curs de bază
Învățare automată. Curs avansat

Citeste mai mult:

Sursa: www.habr.com

Adauga un comentariu