Memanjangkan Spark dengan MLflow

Hello, penduduk Khabrovsk. Seperti yang telah kami tulis, bulan ini OTUS melancarkan dua kursus pembelajaran mesin sekaligus, iaitu asas ΠΈ maju. Sehubungan itu, kami terus berkongsi bahan berguna.

Tujuan artikel ini adalah untuk bercakap tentang pengalaman pertama kami menggunakan MLflow.

Kami akan memulakan semakan MLflow daripada pelayan penjejakannya dan log semua lelaran kajian. Kemudian kami akan berkongsi pengalaman kami menyambungkan Spark dengan MLflow menggunakan UDF.

Konteks

Kita di dalam Kesihatan Alpha Kami menggunakan pembelajaran mesin dan kecerdasan buatan untuk memperkasakan orang ramai untuk menjaga kesihatan dan kesejahteraan mereka. Itulah sebabnya model pembelajaran mesin berada di tengah-tengah produk sains data yang kami bangunkan, dan itulah sebabnya kami tertarik kepada MLflow, platform sumber terbuka yang merangkumi semua aspek kitaran hayat pembelajaran mesin.

MLflow

Matlamat utama MLflow adalah untuk menyediakan lapisan tambahan di atas pembelajaran mesin yang akan membolehkan saintis data bekerja dengan hampir mana-mana perpustakaan pembelajaran mesin (h2o, keras, mleap, pytorch, sklearn ΠΈ tensorflow), membawa kerjanya ke peringkat seterusnya.

MLflow menyediakan tiga komponen:

  • Penjejakan – rakaman dan permintaan untuk eksperimen: kod, data, konfigurasi dan keputusan. Memantau proses mencipta model adalah sangat penting.
  • Projek – Format pembungkusan untuk dijalankan pada mana-mana platform (cth. SageMaker)
  • model – format biasa untuk menyerahkan model kepada pelbagai alatan penggunaan.

MLflow (dalam alfa semasa penulisan) ialah platform sumber terbuka yang membolehkan anda mengurus kitaran hayat pembelajaran mesin, termasuk percubaan, penggunaan semula dan penggunaan.

Menyediakan MLflow

Untuk menggunakan MLflow anda perlu menyediakan keseluruhan persekitaran Python anda terlebih dahulu, untuk ini kami akan gunakan PyEnv (untuk memasang Python pada Mac, lihat di sini). Dengan cara ini kita boleh mencipta persekitaran maya di mana kita akan memasang semua perpustakaan yang diperlukan untuk menjalankannya.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Mari pasang perpustakaan yang diperlukan.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Nota: Kami menggunakan PyArrow untuk menjalankan model seperti UDF. Versi PyArrow dan Numpy perlu diperbaiki kerana versi terakhir bercanggah antara satu sama lain.

Lancarkan UI Penjejakan

Penjejakan MLflow membolehkan kami log dan membuat pertanyaan eksperimen menggunakan Python dan REST API. Selain itu, anda boleh menentukan tempat untuk menyimpan artifak model (localhost, Amazon S3, Storan Gumpalan Azure, Storan Awan Google atau pelayan SFTP). Memandangkan kami menggunakan AWS di Alpha Health, storan artifak kami ialah S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow mengesyorkan menggunakan storan fail yang berterusan. Storan fail ialah tempat pelayan akan menyimpan metadata larian dan percubaan. Apabila memulakan pelayan, pastikan ia menunjuk ke kedai fail yang berterusan. Di sini untuk percubaan kami hanya akan menggunakan /tmp.

Ingat bahawa jika kita ingin menggunakan pelayan mlflow untuk menjalankan eksperimen lama, ia mesti ada dalam storan fail. Walau bagaimanapun, walaupun tanpa ini, kami boleh menggunakannya dalam UDF, kerana kami hanya memerlukan laluan ke model.

Nota: Perlu diingat bahawa UI Penjejakan dan klien model mesti mempunyai akses kepada lokasi artifak. Iaitu, tanpa mengira fakta bahawa UI Penjejakan berada dalam contoh EC2, apabila menjalankan MLflow secara tempatan, mesin mesti mempunyai akses terus kepada S3 untuk menulis model artifak.

Memanjangkan Spark dengan MLflow
UI Penjejakan menyimpan artifak dalam baldi S3

Model berjalan

Sebaik sahaja pelayan Penjejakan berjalan, anda boleh mula melatih model.

Sebagai contoh, kami akan menggunakan pengubahsuaian wain daripada contoh MLflow dalam Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Seperti yang telah kita bincangkan, MLflow membolehkan anda merekodkan parameter model, metrik dan artifak supaya anda boleh menjejaki cara ia berkembang melalui lelaran. Ciri ini amat berguna kerana dengan cara ini kita boleh menghasilkan semula model terbaik dengan menghubungi pelayan Penjejakan atau memahami kod yang melakukan lelaran yang diperlukan menggunakan log hash git commit.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Memanjangkan Spark dengan MLflow
Lelaran wain

Bahagian pelayan untuk model

Pelayan penjejakan MLflow, yang dilancarkan menggunakan arahan "pelayan mlflow", mempunyai API REST untuk menjejak larian dan menulis data ke sistem fail setempat. Anda boleh menentukan alamat pelayan penjejakan menggunakan pembolehubah persekitaran "MLFLOW_TRACKING_URI" dan API penjejakan MLflow akan menghubungi pelayan penjejakan di alamat ini secara automatik untuk membuat/menerima maklumat pelancaran, metrik log, dsb.

Sumber: Docs// Menjalankan pelayan penjejakan

Untuk menyediakan model dengan pelayan, kami memerlukan pelayan penjejakan yang sedang berjalan (lihat antara muka pelancaran) dan ID Jalankan model.

Memanjangkan Spark dengan MLflow
Jalankan ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Untuk menyediakan model menggunakan fungsi servis MLflow, kami memerlukan akses kepada UI Penjejakan untuk menerima maklumat tentang model hanya dengan menentukan --run_id.

Setelah model menghubungi pelayan Penjejakan, kami boleh mendapatkan titik akhir model baharu.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Model berjalan dari Spark

Walaupun pada hakikatnya pelayan Penjejakan cukup berkuasa untuk menyediakan model dalam masa nyata, melatih mereka dan menggunakan fungsi pelayan (sumber: mlflow // docs // models # local), menggunakan Spark (kelompok atau penstriman) ialah penyelesaian yang lebih berkuasa kerana pengedarannya.

Bayangkan anda hanya melakukan latihan di luar talian dan kemudian menggunakan model output pada semua data anda. Di sinilah Spark dan MLflow bersinar.

Pasang PySpark + Jupyter + Spark

Sumber: Mulakan PySpark - Jupyter

Untuk menunjukkan cara kami menggunakan model MLflow pada bingkai data Spark, kami perlu menyediakan buku nota Jupyter untuk berfungsi bersama-sama dengan PySpark.

Mulakan dengan memasang versi stabil terkini Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/sparkΜ€

Pasang PySpark dan Jupyter dalam persekitaran maya:

pip install pyspark jupyter

Sediakan pembolehubah persekitaran:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Setelah berazam notebook-dir, kita boleh menyimpan buku nota kita dalam folder yang dikehendaki.

Melancarkan Jupyter dari PySpark

Memandangkan kami dapat mengkonfigurasi Musytari sebagai pemacu PySpark, kami kini boleh menjalankan buku nota Jupyter dalam konteks PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Memanjangkan Spark dengan MLflow

Seperti yang dinyatakan di atas, MLflow menyediakan ciri untuk artifak model pengelogan dalam S3. Sebaik sahaja kami mempunyai model yang dipilih di tangan kami, kami mempunyai peluang untuk mengimportnya sebagai UDF menggunakan modul mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Memanjangkan Spark dengan MLflow
PySpark - Menghasilkan ramalan kualiti wain

Sehingga tahap ini, kami telah bercakap tentang cara menggunakan PySpark dengan MLflow, menjalankan ramalan kualiti wain pada keseluruhan set data wain. Tetapi bagaimana jika anda perlu menggunakan modul Python MLflow dari Scala Spark?

Kami juga menguji ini dengan memisahkan konteks Spark antara Scala dan Python. Iaitu, kami mendaftarkan MLflow UDF dalam Python, dan menggunakannya dari Scala (ya, mungkin bukan penyelesaian terbaik, tetapi apa yang kami ada).

Scala Spark + MLflow

Untuk contoh ini kami akan menambah Toree Kernel ke dalam Musytari yang sedia ada.

Pasang Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Seperti yang anda boleh lihat dari buku nota yang dilampirkan, UDF dikongsi antara Spark dan PySpark. Kami berharap bahagian ini berguna kepada mereka yang menyukai Scala dan ingin menggunakan model pembelajaran mesin dalam pengeluaran.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Langkah seterusnya

Walaupun MLflow berada dalam versi Alpha pada masa penulisan, ia kelihatan agak menjanjikan. Hanya keupayaan untuk menjalankan berbilang rangka kerja pembelajaran mesin dan menggunakannya dari satu titik akhir membawa sistem pengesyor ke peringkat seterusnya.

Selain itu, MLflow mendekatkan Jurutera Data dan pakar Sains Data, meletakkan lapisan yang sama di antara mereka.

Selepas penerokaan MLflow ini, kami yakin bahawa kami akan bergerak ke hadapan dan menggunakannya untuk saluran paip Spark dan sistem pengesyor kami.

Adalah lebih baik untuk menyegerakkan storan fail dengan pangkalan data dan bukannya sistem fail. Ini sepatutnya memberi kita berbilang titik akhir yang boleh menggunakan storan fail yang sama. Sebagai contoh, gunakan berbilang contoh Presto ΠΈ Athena dengan metastor Gam yang sama.

Untuk meringkaskan, saya ingin mengucapkan terima kasih kepada komuniti MLFlow kerana menjadikan kerja kami dengan data lebih menarik.

Jika anda bermain-main dengan MLflow, jangan teragak-agak untuk menulis kepada kami dan beritahu kami cara anda menggunakannya, dan lebih-lebih lagi jika anda menggunakannya dalam pengeluaran.

Ketahui lebih lanjut mengenai kursus:
Pembelajaran mesin. Kursus asas
Pembelajaran mesin. Kursus lanjutan

Baca lebih lanjut:

Sumber: www.habr.com

Tambah komen