Extending Spark with MLflow

Hello, Habr readers. As we have already written, this month OTUS is launching two machine learning courses at once, namely basic and advanced. With that in mind, we continue to share useful material.

The purpose of this article is to describe our first experience using MLflow.

We will start our review of MLflow with its tracking server and log every iteration of the study. Then we will share our experience connecting Spark with MLflow using a UDF.

Context

At Alpha Health we use machine learning and artificial intelligence to empower people to take charge of their health and well-being. That is why machine learning models are at the heart of the data science products we develop, and why we were drawn to MLflow, an open source platform that covers all aspects of the machine learning lifecycle.

MLflow

The main goal of MLflow is to provide an additional layer on top of machine learning that lets data scientists work with almost any machine learning library (h2o, keras, mleap, pytorch, sklearn and tensorflow), taking their work to the next level.

MLflow provides three components:

  • Tracking – recording and querying experiments: code, data, configuration and results. Keeping track of how a model was built is very important.
  • Projects – a packaging format for running on any platform (e.g. SageMaker).
  • Models – a common format for sending models to a variety of deployment tools.

MLflow (in alpha at the time of writing) is an open source platform for managing the machine learning lifecycle, including experimentation, reproducibility, and deployment.

Setting up MLflow

To use MLflow you first need to set up your Python environment; for this we will use PyEnv (to install Python on a Mac, check out here). This way we can create a virtual environment in which to install all the libraries needed to run it.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Let's install the required libraries.

```
pip install mlflow==0.7.0 \
            Cython==0.29 \
            numpy==1.14.5 \
            pandas==0.23.4 \
            pyarrow==0.11.0
```

Note: We use PyArrow to run models as UDFs. The PyArrow and Numpy versions had to be pinned because the latest versions conflicted with each other.
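Since the pinned versions matter here, it can help to confirm that the virtual environment really contains them. The following is only a minimal sketch using the standard library: the helper names `pip_freeze`, `parse_freeze` and `find_mismatches` are hypothetical, and the pins are copied from the `pip install` command above.

```python
import subprocess
import sys

# Pins copied from the pip command above.
PINS = {
    "mlflow": "0.7.0",
    "Cython": "0.29",
    "numpy": "1.14.5",
    "pandas": "0.23.4",
    "pyarrow": "0.11.0",
}


def pip_freeze():
    """Return the raw `pip freeze` output of the current interpreter."""
    result = subprocess.run(
        [sys.executable, "-m", "pip", "freeze"],
        capture_output=True, text=True, check=True,
    )
    return result.stdout


def parse_freeze(text):
    """Parse `name==version` lines into {lowercased name: version}."""
    versions = {}
    for line in text.splitlines():
        name, sep, version = line.strip().partition("==")
        if sep:  # skip lines that are not simple pins (e.g. editable installs)
            versions[name.lower()] = version
    return versions


def find_mismatches(pins, installed):
    """Return {package: (expected, found)} for every pin that is not met."""
    mismatches = {}
    for name, expected in pins.items():
        found = installed.get(name.lower())
        if found != expected:
            mismatches[name] = (expected, found)
    return mismatches
```

Calling `find_mismatches(PINS, parse_freeze(pip_freeze()))` inside the `mlflow` virtualenv should return an empty dict when every pin is satisfied.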

Launching the Tracking UI

MLflow Tracking lets us log and query experiments using Python and the REST API. In addition, you can define where model artifacts are stored (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage or an SFTP server). Since we use AWS at Alpha Health, our artifact storage will be S3.

# Running a Tracking Server
mlflow server \
    --file-store /tmp/mlflow/fileStore \
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ \
    --host localhost \
    --port 5000

MLflow recommends using a persistent file store. The file store is where the server keeps run and experiment metadata. When you start the server, make sure it points to a persistent file store. Here, for the sake of the experiment, we will simply use /tmp.

Keep in mind that if we want to use the mlflow server to run old experiments, they must be present in the file store. However, even without that we could still use them in the UDF, since we only need the path to the model.

Note: Keep in mind that both the Tracking UI and the model client must have access to the artifact location. In other words, even if the Tracking UI lives on an EC2 instance, when you run MLflow locally the machine must have direct access to S3 to write the model artifacts.

Figure: Tracking UI storing artifacts in an S3 bucket

Running models

As soon as the Tracking server is running, you can start training models.

As an example, we will use a modified version of the wine example from the MLflow Sklearn examples.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py \
  --alpha 0.9 \
  --l1_ratio 0.5 \
  --wine_file ./data/winequality-red.csv

As we already discussed, MLflow lets you log model parameters, metrics, and artifacts so that you can track how they evolve across iterations. This is very useful because we can reproduce the best model by querying the Tracking server, or find out which code produced a given iteration thanks to the git commit hash that is logged with it.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")
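For readers wondering where the three logged metrics come from, here is a minimal pure-Python sketch of how `rmse`, `mae` and `r2` could be computed from actual and predicted wine quality values. The function name `eval_metrics` and its argument names are assumptions for illustration; the original script may well rely on sklearn's metric functions instead.

```python
import math


def eval_metrics(actual, predicted):
    """Return (rmse, mae, r2) for two equally long sequences of numbers."""
    if not actual or len(actual) != len(predicted):
        raise ValueError("actual and predicted must be non-empty and equally long")

    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]

    # Root mean squared error and mean absolute error.
    ss_res = sum(e * e for e in errors)
    rmse = math.sqrt(ss_res / n)
    mae = sum(abs(e) for e in errors) / n

    # Coefficient of determination: 1 - residual variance / total variance.
    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    if ss_tot == 0:
        raise ValueError("r2 is undefined when all actual values are equal")
    r2 = 1.0 - ss_res / ss_tot

    return rmse, mae, r2
```

The returned values are plain floats, so they can be passed straight to `mlflow.log_metric` as in the snippet above.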

Figure: Wine iterations

Serving the model

The MLflow tracking server, launched with the "mlflow server" command, has a REST API for tracking runs and writing data to the local file system. You can specify the tracking server address with the "MLFLOW_TRACKING_URI" environment variable, and the MLflow tracking API will automatically contact the tracking server at that address to create/get run information, log metrics, and so on.

Source: Docs // Running a tracking server
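The environment variable lookup described above can be pictured roughly as follows. This is only an illustrative sketch, not MLflow's actual code: the helpers `resolve_tracking_uri` and `is_remote` are hypothetical, and the `./mlruns` fallback is stated here as an assumption about the default local store.

```python
import os


def resolve_tracking_uri(environ=None, default="./mlruns"):
    """Return the tracking URI from MLFLOW_TRACKING_URI, or a local default."""
    environ = os.environ if environ is None else environ
    uri = environ.get("MLFLOW_TRACKING_URI", "").strip()
    # An unset or empty variable falls back to the (assumed) local store.
    return uri or default


def is_remote(uri):
    """True when the URI points at an HTTP(S) tracking server."""
    return uri.startswith(("http://", "https://"))
```

With `MLFLOW_TRACKING_URI=http://localhost:5000` exported, `resolve_tracking_uri()` returns that address and `is_remote` reports `True`; without it, runs would be looked up locally.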

To serve the model we need a running tracking server (see Launching the Tracking UI) and the Run ID of the model.

Figure: Run ID

# Serve a sklearn model through 127.0.0.1:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve \
  --port 5005 \
  --run_id 0f8691808e914d1087cf097a08730f17 \
  --model-path model

To serve models with the MLflow serve functionality, we need access to the Tracking UI so that information about the model can be retrieved simply by specifying --run_id.

Once the model server has contacted the Tracking server, we get a new model endpoint.

# Query Tracking Server Endpoint
curl -X POST \
  http://127.0.0.1:5005/invocations \
  -H 'Content-Type: application/json' \
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chlorides": 0.229, 
		"free sulfur dioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}
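The same request can be issued from Python. The sketch below uses only the standard library and assumes the model is served at the address used in the curl call; `build_invocation_request` and `predict` are hypothetical helper names, not part of MLflow.

```python
import json
import urllib.request

# Feature names expected by the wine quality model, in dataset order.
WINE_COLUMNS = [
    "fixed acidity", "volatile acidity", "citric acid",
    "residual sugar", "chlorides", "free sulfur dioxide",
    "total sulfur dioxide", "density", "pH",
    "sulphates", "alcohol",
]


def build_invocation_request(url, rows):
    """Build a JSON POST request carrying a list of feature dicts."""
    body = json.dumps(rows).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(url, rows, timeout=10):
    """Send the rows to the model endpoint and return the decoded JSON reply."""
    request = build_invocation_request(url, rows)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

A row can be assembled with `dict(zip(WINE_COLUMNS, values))` and sent with `predict("http://127.0.0.1:5005/invocations", [row])`, provided the serving process from the previous step is still running.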

Running models from Spark

Although the Tracking server is powerful enough to serve models in real time, train them and use the serve functionality (source: mlflow // docs // models # local), using Spark (batch or streaming) is an even more powerful solution thanks to its distributed nature.

Imagine that you simply did the training offline and then applied the resulting model to all of your data. This is where Spark and MLflow shine.

Installing PySpark + Jupyter + Spark

Source: Get started PySpark - Jupyter

To show how we apply MLflow models to Spark dataframes, we need to set up Jupyter notebooks to work together with PySpark.

Start by installing the latest stable version of Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark

Install PySpark and Jupyter in the virtual environment:

pip install pyspark jupyter

Set up the environment variables:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Having defined notebook-dir, we can store our notebooks in the desired folder.

Launching Jupyter from PySpark

Since we were able to configure Jupyter as the PySpark driver, we can now run Jupyter notebooks in a PySpark context.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745


As mentioned above, MLflow provides a feature for logging model artifacts to S3. As soon as we have the chosen model in hand, we can import it as a UDF using the mlflow.pyfunc module.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)
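The model path passed to `spark_udf` follows a recognisable layout: artifact root, experiment ID, run ID, then `artifacts/model`. As a small convenience, the path could be assembled from those parts. This is a sketch under that assumption only; `model_artifact_uri` is a hypothetical helper and the layout is inferred from the paths shown in this article, not taken from MLflow's documentation.

```python
def model_artifact_uri(artifact_root, experiment_id, run_id, artifact_path="model"):
    """Join the artifact root, experiment ID and run ID into a model path.

    Assumed layout: <artifact_root>/<experiment_id>/<run_id>/artifacts/<artifact_path>
    """
    root = artifact_root.rstrip("/")
    leaf = artifact_path.strip("/")
    return f"{root}/{experiment_id}/{run_id}/artifacts/{leaf}"


# Same values as in the snippet above; <bucket> stays a placeholder.
model_path = model_artifact_uri(
    "s3://<bucket>/mlflow/artifacts/",
    1,
    "0f8691808e914d1087cf097a08730f17",
)
```

The resulting `model_path` string is the one that would be handed to `mlflow.pyfunc.spark_udf(spark, model_path)`.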

Figure: PySpark - Outputting wine quality predictions

Up to this point we have talked about how to use PySpark with MLflow, running wine quality predictions over the whole wine dataset. But what if you need to use the Python MLflow modules from Scala Spark?

We tested this too, by sharing the Spark context between Scala and Python. That is, we registered the MLflow UDF in Python and used it from Scala (yes, perhaps not the best solution, but it is what we have).

Scala Spark + MLflow

For this example we will add the Toree Kernel to the existing Jupyter.

Installing Spark + Toree + Jupyter

```
pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

As you can see from the attached notebook, the UDF is shared between Spark and PySpark. We hope this part will be useful for those who love Scala and want to deploy machine learning models to production.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [\s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[\s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+
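The column normalization performed by `getFieldAlias` in the notebook above is what turns headers such as `fixed acidity` into SQL-friendly names like `fixed_acidity`. For readers more at home in Python, the same idea can be sketched with the `re` module; the snake_case names below are our own transliteration of the Scala identifiers, not code from the notebook.

```python
import re

# Runs of whitespace, underscores, dots, colons or at-signs become one "_".
ALIAS_RE = re.compile(r"[\s_.:@]+")
# A single leading underscore is dropped afterwards.
FIRST_AT_RE = re.compile(r"^_")


def get_field_alias(field_name):
    """Return a column alias that is safe to use in Spark SQL."""
    return FIRST_AT_RE.sub("", ALIAS_RE.sub("_", field_name))


def normalize_columns(columns):
    """Map every original column name to its normalized alias."""
    return {name: get_field_alias(name) for name in columns}
```

Applied to the wine dataset headers, this yields the underscore-separated names used in the `%%SQL` cell, while names such as `pH` and `alcohol` are left unchanged.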

Next steps

Even though MLflow is in Alpha at the time of writing, it looks quite promising. Just the ability to run multiple machine learning frameworks and consume them from a single endpoint takes recommender systems to the next level.

In addition, MLflow brings Data Engineers and Data Science specialists closer together by laying a common layer between them.

After this exploration of MLflow, we are confident that we will move forward and use it for our Spark pipelines and recommender systems.

It would be nice to back the file store with a database instead of the file system. That should give us multiple endpoints that can use the same file store, much like using multiple instances of Presto and Athena with the same Glue metastore.

To sum up, I would like to thank the MLflow community for making our work with data more interesting.

If you are playing around with MLflow, do not hesitate to write to us and tell us how you use it, and even more so if you use it in production.

Learn more about the courses:
Machine Learning. Basic course
Machine Learning. Advanced course

Source: www.habr.com
