Extending Spark with MLflow

Hello, Habr readers. As we have already mentioned, this month OTUS is launching two machine learning courses at once, a basic one and an advanced one. On that occasion, we continue to share useful material.

The goal of this article is to share our first experience of using MLflow.

We will start the MLflow overview with its tracking server and log all the iterations of a training run. After that, we will share our experience of connecting Spark to MLflow using UDFs.

Context

We at Alpha Health use machine learning and artificial intelligence to empower people to take care of their health and well-being. That is why machine learning models lie at the heart of the data science products we build, and that is why we were drawn to MLflow, an open-source platform that covers every aspect of the machine learning lifecycle.

MLflow

MLflow's main goal is to provide an additional layer on top of machine learning that lets data scientists work with almost any machine learning library (h2o, keras, mleap, pytorch, sklearn and tensorflow) and take their work to the next level.

MLflow provides three components:

  • Tracking – recording and querying experiments: code, data, configuration and results. Keeping track of the model-building process is critically important.
  • Projects – a packaging format for running code on any platform (e.g. SageMaker).
  • Models – a common format for handing models over to different deployment tools (see the short sketch right after this list).
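
To make these components a little more concrete, here is a minimal sketch. It is not taken from the original walkthrough and it assumes a newer MLflow release than the 0.7.0 pinned below (mlflow.pyfunc.load_model and runs:/ URIs are newer API): Tracking records the run, and Models loads the logged model back through the generic pyfunc format.

```
import mlflow
import mlflow.pyfunc
import mlflow.sklearn
import pandas as pd
from sklearn.linear_model import ElasticNet

# A toy model purely for illustration.
X = pd.DataFrame({"x": [1.0, 2.0, 3.0]})
y = [1.0, 2.0, 3.0]
model = ElasticNet(alpha=0.9, l1_ratio=0.5).fit(X, y)

with mlflow.start_run() as run:
    # Tracking: record parameters and the fitted model.
    mlflow.log_param("alpha", 0.9)
    mlflow.sklearn.log_model(model, "model")

# Models: any logged model can be loaded back via the common pyfunc format.
loaded = mlflow.pyfunc.load_model(f"runs:/{run.info.run_id}/model")
print(loaded.predict(pd.DataFrame({"x": [4.0]})))
```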

MLflow (in alpha at the time of writing) is an open-source platform that lets you manage the machine learning lifecycle, including experimentation, reuse and deployment.

Setting up MLflow

To use MLflow, you first need to set up the whole Python environment; for this we will use PyEnv (to install Python on a Mac, see here). This gives us a virtual environment in which we will install all the libraries needed to run it.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Let's install the required libraries.

```
pip install mlflow==0.7.0 \
            Cython==0.29 \
            numpy==1.14.5 \
            pandas==0.23.4 \
            pyarrow==0.11.0
```

Note: we use PyArrow to run models as UDFs. The PyArrow and Numpy versions had to be pinned because the latest versions conflicted with each other.

Launching the Tracking UI

MLflow Tracking lets us log and query experiments using the Python and REST APIs. On top of that, you can choose where to store the model artifacts (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage or an SFTP server). Since we use AWS at Alpha Health, our artifact store will be S3.

```
# Running a Tracking Server
mlflow server \
    --file-store /tmp/mlflow/fileStore \
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ \
    --host localhost \
    --port 5000
```

MLflow recommends using persistent file storage. The file store is where the server keeps run and experiment metadata. When starting the server, make sure it points to persistent file storage. Here, just for the experiment, we simply use /tmp.

Keep in mind that if we want to use the mlflow server to work with old experiments, they have to be present in the file store. However, even without that we could still use them in a UDF, since all we need is the path to the model.

Note: keep in mind that the Tracking UI and the model client must have access to the artifact location. That is, regardless of the fact that the Tracking UI sits on an EC2 instance, when MLflow is run locally the local machine must have direct access to S3 in order to write the model artifacts.

[Screenshot: the Tracking UI storing artifacts in an S3 bucket]

Running models

As soon as the tracking server is up and running, you can start training models.

As an example, we will use the wine-quality variation of the MLflow example for sklearn.

```
MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py \
  --alpha 0.9 \
  --l1_ratio 0.5 \
  --wine_file ./data/winequality-red.csv
```

As we already mentioned, MLflow lets you log model parameters, metrics and artifacts so that you can track how they evolve from iteration to iteration. This feature is extremely useful, because it lets us reproduce the best model either by contacting the Tracking server or by figuring out which code performed the required iteration via the git hashes of the commits.

```
import mlflow
import mlflow.sklearn

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")
```
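
Once runs are logged this way, the best one can also be found programmatically. A hedged sketch, assuming the MlflowClient API of newer MLflow releases than the 0.7.0 pinned above, and an illustrative experiment id:

```
from mlflow.tracking import MlflowClient

# Point the client at the tracking server started earlier.
client = MlflowClient(tracking_uri="http://localhost:5000")

# Pick the run with the lowest RMSE in experiment "1" (illustrative id).
best_run = client.search_runs(
    experiment_ids=["1"],
    order_by=["metrics.rmse ASC"],
    max_results=1,
)[0]

print(best_run.info.run_id)                                # which run produced it
print(best_run.data.metrics["rmse"])                       # its metric
print(best_run.data.tags.get("mlflow.source.git.commit"))  # which commit trained it
```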

[Screenshot: wine iterations in the Tracking UI]

Serving the model

The MLflow tracking server, launched with the mlflow server command, has a REST API for tracking runs and writing data to the local file system. You can specify the tracking server's address with the MLFLOW_TRACKING_URI environment variable, and the MLflow tracking API will automatically contact the tracking server at that address to create or fetch run information, log metrics and so on.

Source: Docs // Running a tracking server
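
Inside a training script the same pointer can be set programmatically; a minimal sketch with illustrative values:

```
import mlflow

# Equivalent to exporting MLFLOW_TRACKING_URI before launching the script.
mlflow.set_tracking_uri("http://localhost:5000")

with mlflow.start_run():
    mlflow.log_param("alpha", 0.9)
    mlflow.log_metric("rmse", 0.75)
```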

To serve a model, we need a running tracking server (see the launch above) and the Run ID of the model.

[Screenshot: the Run ID in the Tracking UI]

```
# Serve a sklearn model on 127.0.0.1:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve \
  --port 5005 \
  --run_id 0f8691808e914d1087cf097a08730f17 \
  --model-path model
```

To serve models with MLflow's serve functionality, we need access to the Tracking UI so that it can fetch the model information simply from the --run_id we specify.

Once the model has contacted the Tracking Server, we can query the new model endpoint.

```
# Query the model server endpoint
curl -X POST \
  http://127.0.0.1:5005/invocations \
  -H 'Content-Type: application/json' \
  -d '[
	{
		"fixed acidity": 3.42,
		"volatile acidity": 1.66,
		"citric acid": 0.48,
		"residual sugar": 4.2,
		"chlorides": 0.229,
		"free sulfur dioxide": 19,
		"total sulfur dioxide": 25,
		"density": 1.98,
		"pH": 5.33,
		"sulphates": 4.39,
		"alcohol": 10.8
	}
]'
```
```
> {"predictions": [5.825055635303461]}
```

Running models from Spark

Despite the fact that the Tracking Server is powerful enough to serve models in real time, train them and use the serving functionality (source: mlflow // docs // models # local), using Spark (batch or streaming) is an even more powerful solution thanks to its distributed nature.

Imagine that you simply did the training offline and then apply the output model to all of your data. This is where Spark and MLflow shine (a streaming sketch follows the batch example below).

Installing PySpark + Jupyter + Spark

Source: Get started PySpark - Jupyter

To show how we apply MLflow models to Spark dataframes, we need to set up Jupyter notebooks to work together with PySpark.

Start by installing the latest stable version of Apache Spark:

```
cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark
```

Install PySpark and Jupyter into the virtual environment:

```
pip install pyspark jupyter
```

Set the environment variables:

```
export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"
```

By defining notebook-dir, we can keep our notebooks in the desired folder.

Launching Jupyter from PySpark

Since we were able to configure Jupyter as the PySpark driver, we can now run Jupyter notebooks in a PySpark context.

```
(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
```


As mentioned above, MLflow provides the ability to log model artifacts to S3. As soon as we have the chosen model in our hands, we can import it as a UDF using the mlflow.pyfunc module.

```
import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]

df.withColumn('prediction', wine_udf(*columns)).show(100, False)
```

[Screenshot: PySpark outputting wine-quality predictions]
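
The same UDF also fits naturally into Spark Structured Streaming, which is what makes the batch/streaming point from earlier attractive. Here is a hedged sketch that was not part of the original walkthrough; the model path, input directory and checkpoint location are illustrative placeholders:

```
import mlflow.pyfunc
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical artifact path; reuse the one from your own run.
model_path = "s3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model"
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

feature_cols = ["fixed acidity", "volatile acidity", "citric acid",
                "residual sugar", "chlorides", "free sulfur dioxide",
                "total sulfur dioxide", "density", "pH",
                "sulphates", "alcohol"]

# Streaming sources need an explicit schema; here every feature is a double.
schema = ", ".join(f"`{c}` DOUBLE" for c in feature_cols)

stream_df = (spark.readStream
                  .format("csv")
                  .option("header", "true")
                  .option("delimiter", ";")
                  .schema(schema)
                  .load("/data/incoming/wines/"))

query = (stream_df
         .withColumn("prediction", wine_udf(*feature_cols))
         .writeStream
         .format("parquet")
         .option("path", "/data/scored/wines/")
         .option("checkpointLocation", "/tmp/checkpoints/wines/")
         .start())
```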

Up to this point we have talked about how to use PySpark together with MLflow, running wine-quality prediction over the whole wine dataset. But what if you need to use Python MLflow modules from Scala Spark?

We tested this as well, by splitting the Spark context between Scala and Python. That is, we registered the MLflow UDF in Python and used it from Scala (yes, perhaps not the best solution, but it is what we have).

Scala Spark + MLflow

For this example we will add the Toree kernel to the existing Jupyter.

Installing Spark + Toree + Jupyter

```
pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

As you can see from the attached notebook, the UDF is shared between Spark and PySpark. We hope this part will be useful for those who love Scala and want to deploy machine learning models in production.

```
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [\s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[\s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+
```

Next steps

Even though MLflow was in its alpha version at the time of writing, it already looks quite promising. The mere ability to run multiple machine learning frameworks and consume them from a single endpoint takes recommender systems to the next level.

In addition, MLflow brings Data Engineers and Data Scientists closer together, laying a common layer between them.

After this exploration of MLflow, we are confident that we will move forward and use it for our Spark pipelines and recommender systems.

It would be nice to synchronize the file store with a database instead of the file system. That should give us multiple endpoints that can use the same file store. For example, use several Presto and Athena instances with the same Glue metastore.

To wrap up, I would like to say thank you to the MLflow community for making our work with data more interesting.

If you are playing around with MLflow, do not hesitate to write to us and tell us how you use it, and even more so if you use it in production.

Learn more about the courses:
Machine learning. Basic course
Machine learning. Advanced course

Read more:

Source: www.hab.com
