Memperluas Spark dengan MLflow

Halo warga Khabrovsk. Seperti yang sudah kami tulis, bulan ini OTUS meluncurkan dua kursus machine learning sekaligus yaitu basis ΠΈ canggih. Sehubungan dengan itu, kami terus membagikan materi yang bermanfaat.

Tujuan artikel ini adalah untuk membicarakan pengalaman pertama kami menggunakan aliran ml.

Kami akan memulai peninjauannya aliran ml dari server pelacakannya dan mencatat semua iterasi penelitian. Selanjutnya kami akan berbagi pengalaman menghubungkan Spark dengan MLflow menggunakan UDF.

Konteks

Kami berada di Kesehatan Alfa Kami menggunakan pembelajaran mesin dan kecerdasan buatan untuk memberdayakan masyarakat agar bertanggung jawab atas kesehatan dan kesejahteraan mereka. Itulah sebabnya model pembelajaran mesin merupakan inti dari produk ilmu data yang kami kembangkan, dan itulah mengapa kami tertarik pada MLflow, sebuah platform sumber terbuka yang mencakup semua aspek siklus hidup pembelajaran mesin.

aliran ml

Tujuan utama MLflow adalah untuk menyediakan lapisan tambahan di atas pembelajaran mesin yang memungkinkan data scientist bekerja dengan hampir semua pustaka pembelajaran mesin (h2o, keras, lompatan, pytorch, sklearn ΠΈ tensorflow), membawa pekerjaannya ke tingkat berikutnya.

MLflow menyediakan tiga komponen:

  • Pelacakan – pencatatan dan permintaan eksperimen: kode, data, konfigurasi, dan hasil. Memantau proses pembuatan model sangatlah penting.
  • Proyek – Format pengemasan untuk dijalankan pada platform apa pun (mis. SageMaker)
  • Model – format umum untuk mengirimkan model ke berbagai alat penerapan.

MLflow (dalam versi alfa pada saat penulisan) adalah platform sumber terbuka yang memungkinkan Anda mengelola siklus hidup pembelajaran mesin, termasuk eksperimen, penggunaan kembali, dan penerapan.

Menyiapkan MLflow

Untuk menggunakan MLflow Anda harus terlebih dahulu menyiapkan seluruh lingkungan Python Anda, untuk ini kami akan menggunakan PyEnv (untuk menginstal Python di Mac, lihat di sini). Dengan cara ini kita dapat membuat lingkungan virtual tempat kita menginstal semua perpustakaan yang diperlukan untuk menjalankannya.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Mari instal perpustakaan yang diperlukan.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Catatan: Kami menggunakan PyArrow untuk menjalankan model seperti UDF. Versi PyArrow dan Numpy perlu diperbaiki karena versi terakhir saling bertentangan.

Luncurkan UI Pelacakan

Pelacakan MLflow memungkinkan kita mencatat dan menanyakan eksperimen menggunakan Python dan ISTIRAHAT API. Selain itu, Anda dapat menentukan tempat menyimpan artefak model (localhost, Amazon S3, Penyimpanan Azure Blob, Penyimpanan Cloud Google ΠΈΠ»ΠΈ server SFTP). Karena kami menggunakan AWS di Alpha Health, penyimpanan artefak kami akan menjadi S3.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow merekomendasikan penggunaan penyimpanan file persisten. Penyimpanan file adalah tempat server menyimpan metadata proses dan eksperimen. Saat memulai server, pastikan server mengarah ke penyimpanan file persisten. Di sini untuk percobaan kita hanya akan menggunakan /tmp.

Ingatlah bahwa jika kita ingin menggunakan server mlflow untuk menjalankan eksperimen lama, eksperimen tersebut harus ada di penyimpanan file. Namun, bahkan tanpa ini kita dapat menggunakannya di UDF, karena kita hanya memerlukan jalur ke modelnya.

Catatan: Perlu diingat bahwa UI Pelacakan dan klien model harus memiliki akses ke lokasi artefak. Artinya, terlepas dari kenyataan bahwa UI Pelacakan berada di instans EC2, saat menjalankan MLflow secara lokal, mesin harus memiliki akses langsung ke S3 untuk menulis model artefak.

Memperluas Spark dengan MLflow
UI pelacakan menyimpan artefak dalam bucket S3

Model lari

Segera setelah server Pelacakan berjalan, Anda dapat mulai melatih model.

Sebagai contoh, kita akan menggunakan modifikasi wine dari contoh MLflow di Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Seperti yang telah kita bahas, MLflow memungkinkan Anda mencatat parameter model, metrik, dan artefak sehingga Anda dapat melacak perkembangannya selama iterasi. Fitur ini sangat berguna karena dengan cara ini kita dapat mereproduksi model terbaik dengan menghubungi server Pelacakan atau memahami kode mana yang melakukan iterasi yang diperlukan menggunakan log hash git dari penerapan.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Memperluas Spark dengan MLflow
Iterasi anggur

Bagian server untuk model

Server pelacakan MLflow, diluncurkan menggunakan perintah β€œserver mlflow”, memiliki REST API untuk melacak proses dan menulis data ke sistem file lokal. Anda dapat menentukan alamat server pelacakan menggunakan variabel lingkungan β€œMLFLOW_TRACKING_URI” dan API pelacakan MLflow akan secara otomatis menghubungi server pelacakan di alamat ini untuk membuat/menerima informasi peluncuran, metrik log, dll.

Sumber: Dokumen// Menjalankan server pelacakan

Untuk menyediakan model dengan server, kita memerlukan server pelacakan yang berjalan (lihat antarmuka peluncuran) dan ID Proses model.

Memperluas Spark dengan MLflow
Jalankan ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Untuk menyajikan model menggunakan fungsionalitas penyajian MLflow, kita memerlukan akses ke UI Pelacakan untuk menerima informasi tentang model hanya dengan menentukan --run_id.

Setelah model menghubungi server Pelacakan, kita bisa mendapatkan titik akhir model baru.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Menjalankan model dari Spark

Terlepas dari kenyataan bahwa server Pelacakan cukup kuat untuk melayani model secara real time, latih mereka dan gunakan fungsionalitas server (sumber: mlflow // dokumen // model # lokal), penggunaan Spark (batch atau streaming) adalah solusi yang lebih ampuh karena distribusi.

Bayangkan Anda melakukan pelatihan secara offline dan kemudian menerapkan model keluaran ke semua data Anda. Di sinilah Spark dan MLflow bersinar.

Instal PySpark + Jupyter + Spark

Sumber: Memulai PySpark - Jupyter

Untuk menunjukkan bagaimana kita menerapkan model MLflow ke kerangka data Spark, kita perlu menyiapkan notebook Jupyter untuk bekerja sama dengan PySpark.

Mulailah dengan menginstal versi stabil terbaru Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/sparkΜ€

Instal PySpark dan Jupyter di lingkungan virtual:

pip install pyspark jupyter

Siapkan variabel lingkungan:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Setelah ditentukan notebook-dir, kita dapat menyimpan buku catatan kita di folder yang diinginkan.

Meluncurkan Jupyter dari PySpark

Karena kami dapat mengonfigurasi Jupiter sebagai driver PySpark, kini kami dapat menjalankan notebook Jupyter dalam konteks PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Memperluas Spark dengan MLflow

Seperti disebutkan di atas, MLflow menyediakan fitur untuk mencatat artefak model di S3. Segera setelah kami memiliki model yang dipilih, kami memiliki kesempatan untuk mengimpornya sebagai UDF menggunakan modul mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Memperluas Spark dengan MLflow
PySpark - Menghasilkan prediksi kualitas anggur

Hingga saat ini, kita telah membahas tentang cara menggunakan PySpark dengan MLflow, menjalankan prediksi kualitas wine di seluruh kumpulan data wine. Namun bagaimana jika Anda perlu menggunakan modul Python MLflow dari Scala Spark?

Kami juga mengujinya dengan membagi konteks Spark antara Scala dan Python. Artinya, kami mendaftarkan MLflow UDF dengan Python, dan menggunakannya dari Scala (ya, mungkin bukan solusi terbaik, tapi yang kami miliki).

Scala Spark + MLflow

Untuk contoh ini kami akan menambahkan Kernel Toree ke Jupiter yang ada.

Instal Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Seperti yang Anda lihat dari buku catatan terlampir, UDF dibagikan antara Spark dan PySpark. Kami berharap bagian ini bermanfaat bagi mereka yang menyukai Scala dan ingin menerapkan model pembelajaran mesin dalam produksi.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Langkah selanjutnya

Meskipun MLflow masih dalam versi Alpha pada saat penulisan, tampilannya cukup menjanjikan. Kemampuan untuk menjalankan beberapa kerangka kerja pembelajaran mesin dan menggunakannya dari satu titik akhir akan membawa sistem pemberi rekomendasi ke tingkat berikutnya.

Selain itu, MLflow mendekatkan para Data Engineer dan spesialis Ilmu Data, sehingga menciptakan lapisan yang sama di antara keduanya.

Setelah eksplorasi MLflow ini, kami yakin bahwa kami akan bergerak maju dan menggunakannya untuk pipeline Spark dan sistem pemberi rekomendasi kami.

Alangkah baiknya jika penyimpanan file disinkronkan dengan database, bukan dengan sistem file. Ini akan memberi kita beberapa titik akhir yang dapat menggunakan penyimpanan file yang sama. Misalnya, gunakan banyak contoh Presto ΠΈ Athena dengan metastore Lem yang sama.

Sebagai rangkuman, saya ingin mengucapkan terima kasih kepada komunitas MLFlow karena telah membuat pekerjaan kami dengan data menjadi lebih menarik.

Jika Anda bermain-main dengan MLflow, jangan ragu untuk menulis kepada kami dan memberi tahu kami cara Anda menggunakannya, dan terlebih lagi jika Anda menggunakannya dalam produksi.

Cari tahu lebih lanjut tentang kursus:
Pembelajaran mesin. Kursus dasar
Pembelajaran mesin. Kursus lanjutan

Baca selengkapnya:

Sumber: www.habr.com

Tambah komentar