Spark útwreidzje mei MLflow

Hallo, ynwenners fan Khabrovsk. Lykas wy al skreaun hawwe, lanseart OTUS dizze moanne twa kursussen foar masinelearen tagelyk, nammentlik basis и avansearre. Yn dit ferbân bliuwe wy brûkber materiaal te dielen.

It doel fan dit artikel is om te praten oer ús earste ûnderfining mei it brûken MLflow.

Wy sille de resinsje begjinne MLflow fan syn tracking-tsjinner en log alle iteraasjes fan 'e stúdzje. Dan sille wy ús ûnderfining diele fan it ferbinen fan Spark mei MLflow mei UDF.

Kontekst

Wy binne yn Alpha Health Wy brûke masine learen en keunstmjittige yntelliginsje om minsken yn steat te stellen de lieding te nimmen oer har sûnens en wolwêzen. Dêrom binne modellen foar masine-learen it hert fan 'e produkten foar gegevenswittenskip dy't wy ûntwikkelje, en dat is wêrom't wy waarden oanlutsen troch MLflow, in iepen boarne-platfoarm dat alle aspekten fan' e libbenssyklus fan masinelearen beslacht.

MLflow

It haaddoel fan MLflow is it leverjen fan in ekstra laach boppe op masine learen wêrtroch gegevenswittenskippers kinne wurkje mei hast elke masine-learbibleteek (h2o, keras, mleap, pytorch, skele и tensorflow), nimt har wurk nei it folgjende nivo.

MLflow leveret trije komponinten:

  • tracking - opname en oanfragen foar eksperiminten: koade, gegevens, konfiguraasje en resultaten. It tafersjoch op it proses fan it meitsjen fan in model is heul wichtich.
  • Projects - Ferpakkingsformaat om op elk platfoarm te rinnen (bgl. SageMaker)
  • modellen - in mienskiplik formaat foar it yntsjinjen fan modellen nei ferskate ynset ark.

MLflow (yn alfa op it momint fan skriuwen) is in iepen boarne platfoarm wêrmei jo de libbenssyklus fan masine learen kinne beheare, ynklusyf eksperimintearjen, wergebrûk en ynset.

MLflow ynstelle

Om MLflow te brûken moatte jo earst jo hiele Python-omjouwing ynstelle, hjirfoar sille wy brûke PyEnv (om Python op Mac te ynstallearjen, check out hjir). Op dizze manier kinne wy ​​in firtuele omjouwing meitsje wêr't wy alle biblioteken sille ynstallearje dy't nedich binne om it út te fieren.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Litte wy de fereaske biblioteken ynstallearje.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Opmerking: wy brûke PyArrow om modellen lykas UDF út te fieren. De ferzjes fan PyArrow en Numpy moasten wurde reparearre om't de lêste ferzjes mei elkoar yn konflikt wiene.

Launch Tracking UI

MLflow Tracking lit ús eksperiminten oanmelde en opfreegje mei Python en RÊST API. Derneist kinne jo bepale wêr't jo modelartefakten opslaan (localhost, amazon S3, Azure Blob Storage, Google Cloud Storage of SFTP-tsjinner). Sûnt wy AWS brûke by Alpha Health, sil ús artefaktopslach S3 wêze.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow advisearret it brûken fan persistente triem opslach. Bestânsopslach is wêr't de tsjinner run- en eksperimintmetadata sil opslaan. As jo ​​​​de tsjinner begjinne, soargje derfoar dat it wiist nei de persistente triemwinkel. Hjir foar it eksperimint sille wy gewoan brûke /tmp.

Unthâld dat as wy de mlflow-tsjinner brûke wolle om âlde eksperiminten út te fieren, se moatte oanwêzich wêze yn 'e triem opslach. Ek sûnder dit koene wy ​​se lykwols brûke yn 'e UDF, om't wy allinich it paad nei it model nedich binne.

Opmerking: Hâld der rekken mei dat Tracking UI en de modelkliïnt tagong moatte hawwe ta de artefaktlokaasje. Dat is, nettsjinsteande it feit dat de Tracking UI yn in EC2-eksimplaar wennet, by it útfieren fan MLflow lokaal, moat de masine direkte tagong hawwe ta S3 om artefaktmodellen te skriuwen.

Spark útwreidzje mei MLflow
Tracking UI bewarret artefakten yn in S3-emmer

Running modellen

Sadree't de Tracking-tsjinner rint, kinne jo begjinne mei training fan 'e modellen.

As foarbyld sille wy de wynmodifikaasje brûke fan it MLflow-foarbyld yn Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Lykas wy al besprutsen hawwe, lit MLflow jo modelparameters, metriken en artefakten oanmelde, sadat jo kinne folgje hoe't se evoluearje oer iteraasjes. Dizze funksje is ekstreem nuttich, om't wy op dizze manier it bêste model kinne reprodusearje troch kontakt op te nimmen mei de Tracking-tsjinner of te begripen hokker koade de fereaske iteraasje hat útfierd mei de git-hash-logs fan commits.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Spark útwreidzje mei MLflow
Iteraasjes fan wyn

Tsjinner diel foar it model

De MLflow-tracking-tsjinner, lansearre mei it kommando "mlflow-tsjinner", hat in REST API foar it folgjen fan runen en it skriuwen fan gegevens nei it lokale bestânsysteem. Jo kinne it adres fan de trackingtsjinner opjaan mei de omjouwingsfariabele "MLFLOW_TRACKING_URI" en de MLflow tracking API sil automatysk kontakt opnimme mei de trackingtsjinner op dit adres om lansearynformaasje, logmetriken, ensfh.

Boarne: Docs// In trackingtsjinner útfiere

Om foarsjen it model mei in tsjinner, wy moatte in rinnende tracking tsjinner (sjoch lansearring ynterface) en de Run ID fan it model.

Spark útwreidzje mei MLflow
Run ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Om modellen te tsjinjen mei de MLflow-tsjinstfunksjonaliteit, sille wy tagong hawwe ta de Tracking UI om ynformaasje oer it model te ûntfangen troch gewoan op te jaan --run_id.

Sadree't it model kontakt hat mei de Tracking-tsjinner, kinne wy ​​in nij modeleindpunt krije.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Running modellen út Spark

Nettsjinsteande it feit dat de Tracking-tsjinner krêftich genôch is om modellen yn realtime te tsjinjen, trainje se en brûk de serverfunksjonaliteit (boarne: mlflow // docs // models # local), mei Spark (batch of streaming) is in noch machtiger oplossing fanwegen syn distribúsje.

Stel jo foar dat jo de training gewoan offline hawwe dien en dan it útfiermodel tapast op al jo gegevens. Dit is wêr't Spark en MLflow skine.

Ynstallearje PySpark + Jupyter + Spark

Boarne: Te begjinnen PySpark - Jupyter

Om sjen te litten hoe't wy MLflow-modellen tapasse op Spark-dataframes, moatte wy Jupyter-notebooks ynstelle om gear te wurkjen mei PySpark.

Begjin by it ynstallearjen fan de lêste stabile ferzje Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Ynstallearje PySpark en Jupyter yn 'e firtuele omjouwing:

pip install pyspark jupyter

Omjouwingsfariabelen ynstelle:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Bepaald hawwe notebook-dir, kinne wy ​​ús notebooks opslaan yn 'e winske map.

Launching Jupyter út PySpark

Om't wy Jupiter as PySpark-bestjoerder konfigurearje kinne, kinne wy ​​no Jupyter-notebook útfiere yn 'e kontekst fan PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Spark útwreidzje mei MLflow

Lykas hjirboppe neamd, leveret MLflow in funksje foar it loggen fan modelartefakten yn S3. Sadree't wy it selektearre model yn ús hannen hawwe, hawwe wy de mooglikheid om it as UDF te ymportearjen mei de module mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Spark útwreidzje mei MLflow
PySpark - Foarsizzingen fan wynkwaliteit útfiere

Oant dit punt hawwe wy praat oer hoe't jo PySpark kinne brûke mei MLflow, it útfieren fan foarsizzingen fan wynkwaliteit op 'e heule wyndataset. Mar wat as jo Python MLflow-modules moatte brûke fan Scala Spark?

Wy testten dit ek troch de Spark-kontekst te splitsen tusken Scala en Python. Dat is, wy registrearre MLflow UDF yn Python, en brûkten it út Scala (ja, miskien net de bêste oplossing, mar wat wy hawwe).

Scala Spark + MLflow

Foar dit foarbyld sille wy tafoegje Toree Kernel yn de besteande Jupiter.

Ynstallearje Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Lykas jo kinne sjen fan it taheakke notebook, wurdt de UDF dield tusken Spark en PySpark. Wy hoopje dat dit diel nuttich sil wêze foar dyjingen dy't fan Scala hâlde en masinelearmodellen yn produksje wolle ynsette.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Folgjende stappen

Sels hoewol MLflow op it momint fan skriuwen yn Alpha-ferzje is, liket it nochal belofte. Allinich de mooglikheid om meardere masine-learkaders út te fieren en se te konsumearjen fan ien einpunt nimt oanbefellingssystemen nei it folgjende nivo.

Derneist bringt MLflow Data Engineers en Data Science-spesjalisten tichter byinoar, en lizze in mienskiplike laach tusken har.

Nei dizze ferkenning fan MLflow binne wy ​​der wis fan dat wy foarút sille gean en it sille brûke foar ús Spark-pipelines en oanbefellingssystemen.

It soe moai wêze om de triem opslach te syngronisearjen mei de databank ynstee fan it bestânsysteem. Dit soe ús meardere einpunten moatte jaan dy't deselde triemopslach kinne brûke. Brûk bygelyks meardere eksimplaren presto и Athena mei deselde Glue metastore.

Om gearfetsje wol ik de MLFlow-mienskip tank sizze foar it meitsjen fan ús wurk mei gegevens ynteressanter.

As jo ​​spielje mei MLflow, aarzel dan net om ús te skriuwen en te fertellen hoe't jo it brûke, en noch mear as jo it brûke yn produksje.

Lês mear oer de kursussen:
Masine learen. Basiskursus
Masine learen. Avansearre kursus

Lês mear:

Boarne: www.habr.com

Add a comment