Extending Spark with MLflow

Hello, Habr readers. As we have already written, this month OTUS is launching two machine learning courses at once, a basic one and an advanced one. On this occasion we continue to share useful material.

The purpose of this article is to describe our first experience of using MLflow.

We start the review of MLflow with its tracking server and log every iteration of the study. Then we share our experience of connecting Spark with MLflow using a UDF.

Context

At Alpha Health we use machine learning and artificial intelligence to empower people to take care of their health and well-being. That is why machine learning models are at the heart of the data science products we build, and why we were drawn to MLflow, an open source platform that covers every aspect of the machine learning lifecycle.

MLflow

The main goal of MLflow is to provide an additional layer on top of machine learning that lets data scientists work with almost any machine learning library (h2o, keras, mleap, pytorch, sklearn and tensorflow), taking their work to the next level.

MLflow provides three components:

  • Tracking - recording and querying experiments: code, data, configuration and results. Keeping track of how a model was built is very important.
  • Projects - a packaging format for running on any platform (e.g. SageMaker).
  • Models - a common format for sending models to various deployment tools.

MLflow (in alpha at the time of writing) is an open source platform that lets you manage the machine learning lifecycle, including experimentation, reuse and deployment.

Setting up MLflow

To use MLflow you first need to set up your Python environment. For this we will use PyEnv (to install Python on a Mac, see here). This way we can create a virtual environment in which we install all the libraries needed to run it.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Let's install the required libraries.

```
pip install mlflow==0.7.0 \
            Cython==0.29 \
            numpy==1.14.5 \
            pandas==0.23.4 \
            pyarrow==0.11.0
```

Note: We use PyArrow to run models as UDFs. The PyArrow and Numpy versions had to be pinned because the latest releases conflicted with each other.

Launching the Tracking UI

MLflow Tracking lets us log and query experiments using the Python and REST APIs. In addition, you can decide where model artifacts are stored (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage or an SFTP server). Since we use AWS at Alpha Health, our artifact store will be S3.

```
# Running a Tracking Server
mlflow server \
    --file-store /tmp/mlflow/fileStore \
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ \
    --host localhost \
    --port 5000
```

MLflow recommends using a persistent file store. The file store is where the server keeps run and experiment metadata. When you start the server, make sure it points to a persistent file store. Here, for the sake of the experiment, we simply use /tmp.

Keep in mind that if we want to use the mlflow server to run old experiments, they must be present in the file store. Even without that, however, we can still use them in a UDF, since we only need the path to the model.

Note: Keep in mind that both the Tracking UI and the model client must have access to the artifact location. That is, even if the Tracking UI lives on an EC2 instance, when you run MLflow locally your machine must have direct access to S3 in order to write model artifacts.

Tracking UI storing artifacts in an S3 bucket
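
Because every client needs credentials for whichever store the artifact root points to, it can help to check which backend a given root implies. The following is a minimal sketch: the helper name `artifact_backend`, the bucket name `my-bucket` and the scheme table are assumptions made for illustration (derived from the storage options listed above), not part of MLflow's API.

```python
from urllib.parse import urlparse

# Assumed mapping from URI scheme to storage backend (illustrative only).
BACKENDS = {
    "": "local file system",
    "file": "local file system",
    "s3": "Amazon S3",
    "wasbs": "Azure Blob Storage",
    "gs": "Google Cloud Storage",
    "sftp": "SFTP server",
}


def artifact_backend(artifact_root):
    """Return a human-readable backend name for an artifact root URI."""
    scheme = urlparse(artifact_root).scheme.lower()
    if scheme not in BACKENDS:
        raise ValueError("unrecognised artifact root: {!r}".format(artifact_root))
    return BACKENDS[scheme]


# "my-bucket" is a hypothetical bucket name.
for root in ("s3://my-bucket/mlflow/artifacts/", "/tmp/mlflow/fileStore"):
    print(root, "->", artifact_backend(root))
```

With an S3 root, both the machine running the tracking server and the machine running the training script need S3 access.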

Running models

Once the Tracking server is running, you can start training models.

As an example, we will use a modified version of the wine example from the MLflow Sklearn examples.

```
MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py \
  --alpha 0.9 \
  --l1_ratio 0.5 \
  --wine_file ./data/winequality-red.csv
```

As already mentioned, MLflow lets you log model parameters, metrics and artifacts so that you can track how they evolve across iterations. This is extremely useful: we can reproduce the best model by contacting the Tracking server, or find out which code produced a given iteration from the git commit hashes that are logged.

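```
import mlflow
import mlflow.sklearn

with mlflow.start_run():

    # ... train the model `lr` and compute rmse, r2 and mae here ...

    # Parameters that define this run
    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    # Evaluation metrics of the trained model
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    # Tags for filtering runs, then the fitted model as an artifact
    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")
```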

Wine iterations
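
For readers who want to see what the three logged metrics measure, here is a small dependency-free sketch. The function name `regression_metrics` and the sample numbers are made up for illustration; the training script itself most likely relies on scikit-learn's metric functions instead.

```python
import math


def regression_metrics(actual, predicted):
    """Return (rmse, mae, r2) for two equal-length sequences of numbers."""
    if not actual or len(actual) != len(predicted):
        raise ValueError("inputs must be non-empty and of equal length")
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]

    ss_res = sum(e * e for e in errors)              # residual sum of squares
    rmse = math.sqrt(ss_res / n)                     # root mean squared error
    mae = sum(abs(e) for e in errors) / n            # mean absolute error

    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    if ss_tot == 0:
        raise ValueError("r2 is undefined when all actual values are equal")
    r2 = 1.0 - ss_res / ss_tot                       # coefficient of determination
    return rmse, mae, r2


# Made-up quality scores and predictions, for illustration only.
rmse, mae, r2 = regression_metrics([5, 6, 7, 5], [5.5, 5.5, 6.5, 5.5])
print(rmse, mae, r2)
```

Lower rmse and mae and a higher r2 indicate a better fit, which is what makes these values convenient for comparing runs in the Tracking UI.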

Serving the model

The MLflow tracking server, launched with the "mlflow server" command, has a REST API for tracking runs and writing data to the local file system. You can specify the tracking server address using the environment variable "MLFLOW_TRACKING_URI", and the MLflow tracking API will automatically contact the tracking server at that address to create or fetch run information, log metrics, and so on.

Source: Docs // Running a tracking server

To serve a model we need a running tracking server (see the Tracking UI launch section above) and the Run ID of the model.
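
The lookup described in the quote can be pictured with a few lines of Python. This is only a sketch of the mechanism: the helper `resolve_tracking_uri` is hypothetical, and the fallback value (a local `./mlruns` directory) is an assumption of this example. MLflow's client performs its own lookup internally.

```python
import os


def resolve_tracking_uri(environ=None, fallback="./mlruns"):
    """Return the address in MLFLOW_TRACKING_URI, or a fallback if unset.

    `fallback` is an assumption of this sketch, not a documented guarantee.
    """
    environ = os.environ if environ is None else environ
    uri = environ.get("MLFLOW_TRACKING_URI", "").strip()
    return uri or fallback


# Simulate the shell prefix MLFLOW_TRACKING_URI=http://localhost:5000
print(resolve_tracking_uri({"MLFLOW_TRACKING_URI": "http://localhost:5000"}))
# Simulate a shell where the variable is not set
print(resolve_tracking_uri({}))
```

This is why prefixing a command with `MLFLOW_TRACKING_URI=...` is enough to redirect a training script to a remote server without touching its code.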

Run ID

```
# Serve a sklearn model through 127.0.0.1:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve \
  --port 5005 \
  --run_id 0f8691808e914d1087cf097a08730f17 \
  --model-path model
```

To serve models with the MLflow serve functionality, we need access to the Tracking UI so that the model information can be retrieved simply by specifying --run_id.

Once the model has contacted the Tracking server, we get a new model endpoint.

```
# Query the model server endpoint
curl -X POST \
  http://127.0.0.1:5005/invocations \
  -H 'Content-Type: application/json' \
  -d '[
	{
		"fixed acidity": 3.42,
		"volatile acidity": 1.66,
		"citric acid": 0.48,
		"residual sugar": 4.2,
		"chlorides": 0.229,
		"free sulfur dioxide": 19,
		"total sulfur dioxide": 25,
		"density": 1.98,
		"pH": 5.33,
		"sulphates": 4.39,
		"alcohol": 10.8
	}
]'
```

> {"predictions": [5.825055635303461]}

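The same request can be prepared from Python with only the standard library. This is a sketch under assumptions: the helper name `build_invocation_request` is ours, and the payload shape (a JSON list of records) simply mirrors the curl call above for the MLflow version used in this article; other MLflow versions may expect a different request schema.

```python
import json
import urllib.request


def build_invocation_request(rows, host="127.0.0.1", port=5005):
    """Build (but do not send) a POST request for the /invocations endpoint."""
    url = "http://{}:{}/invocations".format(host, port)
    body = json.dumps(rows).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


sample = [{
    "fixed acidity": 3.42, "volatile acidity": 1.66, "citric acid": 0.48,
    "residual sugar": 4.2, "chlorides": 0.229, "free sulfur dioxide": 19,
    "total sulfur dioxide": 25, "density": 1.98, "pH": 5.33,
    "sulphates": 4.39, "alcohol": 10.8,
}]
request = build_invocation_request(sample)
print(request.get_method(), request.full_url)

# Sending it requires the model server from the previous step to be running:
# with urllib.request.urlopen(request) as response:
#     print(json.loads(response.read().decode("utf-8")))
```

Keeping request construction separate from sending makes the payload easy to inspect before any network call is made.
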
Running models from Spark

Although the Tracking server is powerful enough to serve models in real time, by training them and using the serve functionality (source: mlflow // docs // models # local), using Spark (batch or streaming) is an even more powerful solution because it is distributed.

Imagine that you simply trained offline and then applied the resulting model to all of your data. This is where Spark and MLflow shine.

Install PySpark + Jupyter + Spark

Source: Get started PySpark - Jupyter

To show how we apply MLflow models to Spark dataframes, we need to set up Jupyter notebooks to work together with PySpark.

Start by installing the latest stable version of Apache Spark:

```
# Assumes the Spark tarball has already been downloaded to ~/Downloads
cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark
```

Install PySpark and Jupyter in the virtual environment:

```
pip install pyspark jupyter
```

Set the environment variables:

```
export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"
```

Having defined notebook-dir, we can keep our notebooks in the folder we want.

Launching Jupyter from PySpark

Since we configured Jupyter as the PySpark driver, we can now run a Jupyter notebook in the PySpark context.

```
(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
```


As mentioned above, MLflow can log model artifacts to S3. Once we have the chosen model in hand, we can import it as a UDF using the mlflow.pyfunc module.

```
import mlflow.pyfunc

# `spark` is the SparkSession provided by the PySpark notebook
model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'

# Wrap the logged model as a Spark UDF
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = (spark.read.format("csv")
      .option("header", "true")
      .option("delimiter", ";")
      .load(wine_path))

# Feature columns passed to the model, in order
columns = ["fixed acidity", "volatile acidity", "citric acid",
           "residual sugar", "chlorides", "free sulfur dioxide",
           "total sulfur dioxide", "density", "pH",
           "sulphates", "alcohol"]

df.withColumn('prediction', wine_udf(*columns)).show(100, False)
```

PySpark - Output of the wine quality predictions
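
The model path used in the UDF follows a regular layout that can be read off the paths in this article: artifact root, experiment ID, run ID, `artifacts`, then the name the model was logged under. The helper below, `model_artifact_uri`, is a hypothetical convenience for building such a path; the layout is inferred from the examples here and is not guaranteed for other MLflow versions.

```python
def model_artifact_uri(artifact_root, experiment_id, run_id, artifact_path="model"):
    """Join the pieces of a model location as they appear in this article."""
    parts = [
        artifact_root.rstrip("/"),   # e.g. the --default-artifact-root value
        str(experiment_id),          # numeric experiment ID
        run_id,                      # Run ID shown in the Tracking UI
        "artifacts",
        artifact_path.strip("/"),    # name passed to log_model, "model" here
    ]
    return "/".join(parts)


print(model_artifact_uri(
    "s3://<bucket>/mlflow/artifacts/", 1, "0f8691808e914d1087cf097a08730f17"))
```

Given only the artifact root and a Run ID copied from the Tracking UI, this yields the string that `spark_udf` is given in the example above.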

Up to this point we have discussed how to use PySpark with MLflow, running wine quality predictions over the whole wine dataset. But what if you need to use the Python MLflow modules from Scala Spark?

We tested this too, by sharing the Spark context between Scala and Python. That is, we registered the MLflow UDF in Python and used it from Scala (yes, perhaps not the best solution, but it is what we have).

Scala Spark + MLflow

For this example we will add the Toree Kernel to the existing Jupyter installation.

Install Spark + Toree + Jupyter

```
pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

As you can see from the attached notebook, the UDF is shared between Spark and PySpark. We hope this part will be useful to those who love Scala and want to deploy machine learning models to production.

In [1]:
```
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\\s_.:@]+".r

// Replace runs of whitespace and separator characters with "_",
// then drop a leading underscore
def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

// Select the given columns under their normalized names
def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

// Normalize every column name of a DataFrame
def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}
```
Out[1]:
```
FirstAtRe = ^_
AliasRe = [\s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
[\s_.:@]+
```
In [2]:
```
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
```
Out[2]:
```
winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
```
In [3]:
```
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)
```
Out[3]:
```
df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
```
In [4]:
```
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
```
Out[4]:
```
<function spark_udf.<locals>.predict at 0x1116a98c8>
```
In [6]:
```
df.createOrReplaceTempView("wines")
```
In [10]:
```
%%SQL
SELECT
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
```
Out[10]:
```
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+
```

In [17]:
```
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)
```
```
+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+
```
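
The column-name normalization done in the first Scala cell is what turns headers such as "fixed acidity" into SQL-friendly names such as fixed_acidity. Below is a rough Python equivalent of `getFieldAlias`, offered as a sketch for readers who work on the PySpark side; the function name `get_field_alias` is ours and the behaviour is a port of the Scala logic shown in the notebook.

```python
import re

# Runs of whitespace, underscores, dots, colons or "@" become one underscore
ALIAS_RE = re.compile(r"[\s_.:@]+")
# A single leading underscore is dropped afterwards
FIRST_UNDERSCORE_RE = re.compile(r"^_")


def get_field_alias(field_name):
    """Return a normalized column name safe to use in Spark SQL."""
    return FIRST_UNDERSCORE_RE.sub("", ALIAS_RE.sub("_", field_name))


for name in ("fixed acidity", "free sulfur dioxide", "pH"):
    print(name, "->", get_field_alias(name))
```

In PySpark the same renaming could then be applied with something like `df.toDF(*[get_field_alias(c) for c in df.columns])`.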

Next steps

Even though MLflow is in Alpha at the time of writing, it looks quite promising. Just being able to run several machine learning frameworks and consume them from a single endpoint takes recommender systems to the next level.

In addition, MLflow brings Data Engineers and Data Scientists closer together by laying a common layer between them.

After this exploration of MLflow, we are confident that we will move forward and use it for our Spark pipelines and recommender systems.

It would be nice to sync the file store with a database instead of the file system. That should give us multiple endpoints that can use the same file store, for example several instances of Presto and Athena sharing the same Glue metastore.

To sum up, I would like to thank the MLflow community for making our work with data more interesting.

If you are playing with MLflow, do not hesitate to write to us and tell us how you use it, and even more so if you use it in production.

Learn more about the courses:
Machine learning. Basic course
Machine learning. Advanced course

Source: www.habr.com
