ขยาย Spark ด้วย MLflow

สวัสดี Khabrovites อย่างที่เราเขียนไปแล้ว เดือนนี้ OTUS เปิดตัวสองหลักสูตรเกี่ยวกับการเรียนรู้ของเครื่องพร้อมกัน ได้แก่ ฐาน и ขั้นสูง. ในเรื่องนี้ เรายังคงแบ่งปันเนื้อหาที่เป็นประโยชน์ต่อไป

บทความนี้มีวัตถุประสงค์เพื่อพูดคุยเกี่ยวกับประสบการณ์ครั้งแรกของเรากับ ม.ล.โฟลว์.

เราจะเริ่มรีวิว ม.ล.โฟลว์ จากเซิร์ฟเวอร์การติดตามและเปิดฉากการทำซ้ำทั้งหมดของการศึกษา จากนั้นเราจะแบ่งปันประสบการณ์การเชื่อมต่อ Spark กับ MLflow โดยใช้ UDF

สิ่งแวดล้อม

เราอยู่ใน สุขภาพอัลฟ่า เราใช้การเรียนรู้ของเครื่องและปัญญาประดิษฐ์เพื่อช่วยให้ผู้คนดูแลสุขภาพและความเป็นอยู่ที่ดีของพวกเขา นี่คือสาเหตุที่โมเดลแมชชีนเลิร์นนิงเป็นแกนหลักของผลิตภัณฑ์ข้อมูลที่เราพัฒนา และเหตุใด MLflow ซึ่งเป็นแพลตฟอร์มโอเพ่นซอร์สที่ครอบคลุมทุกแง่มุมของวงจรชีวิตของแมชชีนเลิร์นนิงจึงได้รับความสนใจจากเรา

ม.ล.โฟลว์

เป้าหมายหลักของ MLflow คือการจัดเตรียมเลเยอร์เพิ่มเติมที่ด้านบนของแมชชีนเลิร์นนิง ซึ่งจะช่วยให้นักวิทยาศาสตร์ข้อมูลสามารถทำงานกับไลบรารีแมชชีนเลิร์นนิงได้เกือบทุกชนิด (h2o, Keras, กระโดด, ไฟฉาย, สเลิร์น и tensorflow) ยกระดับงานของเธอไปอีกขั้น

MLflow มีสามองค์ประกอบ:

  • การติดตาม – การบันทึกและคำขอสำหรับการทดลอง: รหัส ข้อมูล การกำหนดค่า และผลลัพธ์ สิ่งสำคัญคือต้องปฏิบัติตามขั้นตอนการสร้างแบบจำลอง
  • โครงการ – รูปแบบบรรจุภัณฑ์สำหรับรันบนแพลตฟอร์มใดก็ได้ (เช่น SageMaker)
  • Models เป็นรูปแบบทั่วไปสำหรับการส่งโมเดลไปยังเครื่องมือการปรับใช้ต่างๆ

MLflow (อัลฟ่า ณ เวลาที่เขียน) เป็นแพลตฟอร์มโอเพ่นซอร์สที่ให้คุณจัดการวงจรชีวิตของแมชชีนเลิร์นนิง รวมถึงการทดลอง การใช้ซ้ำ และการปรับใช้

การตั้งค่า MLflow

หากต้องการใช้ MLflow คุณต้องตั้งค่าสภาพแวดล้อม Python ทั้งหมดก่อน เราจะใช้สิ่งนี้ PyEnv (หากต้องการติดตั้ง Python บน Mac โปรดดู ที่นี่). ดังนั้นเราจึงสามารถสร้างสภาพแวดล้อมเสมือนจริงที่เราจะติดตั้งไลบรารีทั้งหมดที่จำเป็นในการเรียกใช้

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

ติดตั้งไลบรารีที่จำเป็น

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

หมายเหตุ: เรากำลังใช้ PyArrow เพื่อเรียกใช้โมเดลเช่น UDF เวอร์ชันของ PyArrow และ Numpy จำเป็นต้องได้รับการแก้ไข เนื่องจากเวอร์ชันล่าสุดขัดแย้งกัน

เรียกใช้ UI การติดตาม

การติดตาม MLflow ช่วยให้เราสามารถบันทึกและค้นหาการทดสอบด้วย Python และ REST เอพีไอ นอกจากนี้ คุณสามารถกำหนดตำแหน่งที่จะจัดเก็บโมเดลอาร์ติแฟกต์ (localhost, Amazon S3, ที่เก็บข้อมูล Azure Blob, การจัดเก็บข้อมูล Google Cloud หรือ เซิร์ฟเวอร์ SFTP). เนื่องจากเราใช้ AWS ที่ Alpha Health S3 จะเป็นที่เก็บข้อมูลสำหรับสิ่งประดิษฐ์

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow แนะนำให้ใช้ที่จัดเก็บไฟล์ถาวร ที่เก็บไฟล์เป็นที่ที่เซิร์ฟเวอร์จะจัดเก็บข้อมูลเมตาที่เรียกใช้และทดสอบ เมื่อเริ่มต้นเซิร์ฟเวอร์ ตรวจสอบให้แน่ใจว่าชี้ไปยังที่เก็บไฟล์ถาวร ที่นี่เพื่อการทดลองเราจะใช้ /tmp.

โปรดทราบว่าหากเราต้องการใช้เซิร์ฟเวอร์ mlflow เพื่อรันการทดสอบเก่า การทดสอบเหล่านั้นจะต้องอยู่ในที่เก็บไฟล์ อย่างไรก็ตาม แม้จะไม่มีสิ่งนี้ เราก็สามารถใช้มันใน UDF ได้ เนื่องจากเราต้องการเพียงพาธไปยังโมเดลเท่านั้น

หมายเหตุ: โปรดทราบว่า Tracking UI และไคลเอนต์โมเดลต้องมีสิทธิ์เข้าถึงตำแหน่งของวัตถุ นั่นคือ ไม่ว่าข้อเท็จจริงที่ว่า Tracking UI จะอยู่ในอินสแตนซ์ EC2 ก็ตาม เมื่อเรียกใช้ MLflow ภายในเครื่อง เครื่องจะต้องมีสิทธิ์เข้าถึงโดยตรงไปยัง S3 เพื่อเขียนแบบจำลองสิ่งประดิษฐ์

ขยาย Spark ด้วย MLflow
UI การติดตามจัดเก็บสิ่งประดิษฐ์ในบัคเก็ต S3

โมเดลการวิ่ง

ทันทีที่เซิร์ฟเวอร์การติดตามทำงาน คุณสามารถเริ่มฝึกโมเดลได้

ตัวอย่างเช่น เราจะใช้การปรับเปลี่ยนไวน์จากตัวอย่าง MLflow ใน สเลิร์น.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

ดังที่เราได้กล่าวไปแล้ว MLflow อนุญาตให้คุณบันทึกพารามิเตอร์ เมตริก และโมเดลอาร์ติแฟกต์ เพื่อให้คุณสามารถติดตามว่าพวกมันพัฒนาอย่างไรในรูปแบบการวนซ้ำ คุณลักษณะนี้มีประโยชน์อย่างมาก เนื่องจากช่วยให้เราสามารถสร้างแบบจำลองที่ดีที่สุดได้โดยติดต่อเซิร์ฟเวอร์การติดตามหรือทำความเข้าใจว่าโค้ดใดดำเนินการวนซ้ำที่จำเป็นโดยใช้บันทึกแฮช git ของการคอมมิต

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

ขยาย Spark ด้วย MLflow
การทำซ้ำไวน์

ด้านหลังสำหรับรุ่น

เซิร์ฟเวอร์การติดตาม MLflow ที่เปิดใช้งานด้วยคำสั่ง "เซิร์ฟเวอร์ mlflow" มี REST API สำหรับการติดตามการรันและการเขียนข้อมูลไปยังระบบไฟล์ในเครื่อง คุณสามารถระบุที่อยู่ของเซิร์ฟเวอร์การติดตามโดยใช้ตัวแปรสภาพแวดล้อม "MLFLOW_TRACKING_URI" และ API การติดตามของ MLflow จะติดต่อเซิร์ฟเวอร์การติดตามโดยอัตโนมัติตามที่อยู่นี้เพื่อสร้าง/รับข้อมูลการเรียกใช้ เมตริกการบันทึก ฯลฯ

ที่มา: เอกสาร // เรียกใช้เซิร์ฟเวอร์การติดตาม

เพื่อให้โมเดลมีเซิร์ฟเวอร์ เราจำเป็นต้องมีเซิร์ฟเวอร์ติดตามที่กำลังทำงานอยู่ (ดูอินเทอร์เฟซการเรียกใช้งาน) และ Run ID ของโมเดล

ขยาย Spark ด้วย MLflow
รหัสเรียกใช้

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

ในการให้บริการโมเดลโดยใช้ฟังก์ชันการเสิร์ฟ MLflow เราจำเป็นต้องเข้าถึง UI การติดตามเพื่อรับข้อมูลเกี่ยวกับโมเดลโดยเพียงแค่ระบุ --run_id.

เมื่อโมเดลติดต่อกับ Tracking Server เราก็จะได้โมเดลปลายทางใหม่

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

เรียกใช้โมเดลจาก Spark

แม้ว่าข้อเท็จจริงที่ว่าเซิร์ฟเวอร์การติดตามจะมีประสิทธิภาพมากพอที่จะให้บริการโมเดลแบบเรียลไทม์ แต่ให้ฝึกและใช้ฟังก์ชันการทำงานของเซิร์ฟเวอร์ (ที่มา: mlflow // docs // รุ่น #local) การใช้ Spark (เป็นชุดหรือการสตรีม) เป็นโซลูชันที่ทรงพลังยิ่งขึ้นเนื่องจากการกระจาย

ลองนึกภาพว่าคุณเพิ่งฝึกแบบออฟไลน์ แล้วใช้โมเดลเอาต์พุตกับข้อมูลทั้งหมดของคุณ นี่คือที่มาของ Spark และ MLflow

ติดตั้ง PySpark + Jupyter + Spark

ที่มา: เริ่มต้นใช้งาน PySpark - Jupyter

เพื่อแสดงให้เห็นว่าเราใช้โมเดล MLflow กับ Spark dataframes อย่างไร เราจำเป็นต้องตั้งค่าโน้ตบุ๊ก Jupyter ให้ทำงานร่วมกับ PySpark

เริ่มต้นด้วยการติดตั้งเวอร์ชันเสถียรล่าสุด Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

ติดตั้ง PySpark และ Jupyter ในสภาพแวดล้อมเสมือนจริง:

pip install pyspark jupyter

ตั้งค่าตัวแปรสภาพแวดล้อม:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

มีการกำหนด notebook-dirเราก็จะสามารถจัดเก็บสมุดบันทึกของเราในโฟลเดอร์ที่ต้องการได้

เรียกใช้ Jupyter จาก PySpark

เนื่องจากเราสามารถตั้งค่า Jupiter เป็นไดรเวอร์ PySpark ได้ เราจึงสามารถเรียกใช้สมุดบันทึก Jupyter ในบริบทของ PySpark

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

ขยาย Spark ด้วย MLflow

ตามที่กล่าวไว้ข้างต้น MLflow มีฟังก์ชันของการบันทึกแบบจำลองวัตถุใน S3 ทันทีที่เรามีโมเดลที่เลือกอยู่ในมือ เรามีโอกาสนำเข้าเป็น UDF โดยใช้โมดูล mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

ขยาย Spark ด้วย MLflow
PySpark - ทำนายคุณภาพไวน์

จนถึงจุดนี้ เราได้พูดคุยเกี่ยวกับวิธีใช้ PySpark กับ MLflow โดยการเรียกใช้การทำนายคุณภาพไวน์ในชุดข้อมูลไวน์ทั้งหมด แต่ถ้าคุณต้องการใช้โมดูล Python MLflow จาก Scala Spark ล่ะ

เราได้ทดสอบสิ่งนี้ด้วยการแยกบริบทของ Spark ระหว่าง Scala และ Python นั่นคือเราลงทะเบียน MLflow UDF ใน Python และใช้จาก Scala (ใช่ อาจไม่ใช่ทางออกที่ดีที่สุด แต่เป็นสิ่งที่เรามีอยู่)

สกาล่าสปาร์ค + MLflow

สำหรับตัวอย่างนี้ เราจะเพิ่ม เคอร์เนล Toree เข้าสู่ดาวพฤหัสบดีที่มีอยู่

ติดตั้ง Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

ดังที่คุณเห็นจากสมุดบันทึกที่แนบมา UDF จะถูกแชร์ระหว่าง Spark และ PySpark เราหวังว่าส่วนนี้จะเป็นประโยชน์สำหรับผู้ที่ชื่นชอบ Scala และต้องการปรับใช้โมเดลแมชชีนเลิร์นนิงในการผลิต

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

ขั้นตอนถัดไป

แม้ว่า MLflow จะอยู่ในอัลฟ่าในขณะที่เขียน แต่ก็ดูมีแนวโน้มที่ดี เพียงแค่สามารถเรียกใช้เฟรมเวิร์กแมชชีนเลิร์นนิงหลายตัวและใช้งานได้จากจุดสิ้นสุดเดียว ระบบผู้แนะนำจะก้าวไปสู่อีกระดับ

นอกจากนี้ MLflow ยังทำให้วิศวกรข้อมูลและนักวิทยาศาสตร์ข้อมูลใกล้ชิดกันมากขึ้น โดยวางเลเยอร์ร่วมกันระหว่างพวกเขา

หลังจากการสำรวจ MLflow นี้ เราแน่ใจว่าจะดำเนินการต่อและใช้สำหรับไปป์ไลน์ Spark และระบบผู้แนะนำของเรา

เป็นการดีที่จะซิงโครไนซ์ที่เก็บไฟล์กับฐานข้อมูลแทนระบบไฟล์ สิ่งนี้ควรให้ปลายทางหลายรายการที่สามารถใช้ไฟล์ร่วมกันได้ ตัวอย่างเช่น ใช้หลายอินสแตนซ์ โอมเพี้ยง и Athena ด้วย Metastore ของ Glue เดียวกัน

โดยสรุป ฉันอยากจะกล่าวขอบคุณชุมชน MLFlow ที่ทำให้งานของเรากับข้อมูลน่าสนใจยิ่งขึ้น

หากคุณเล่นกับ MLflow อย่าลังเลที่จะเขียนถึงเราและบอกเราว่าคุณใช้มันอย่างไร และยิ่งไปกว่านั้น ถ้าคุณใช้มันในการผลิต

เรียนรู้เพิ่มเติมเกี่ยวกับหลักสูตร:
การเรียนรู้ของเครื่อง หลักสูตรพื้นฐาน
การเรียนรู้ของเครื่อง หลักสูตรขั้นสูง

อ่านเพิ่มเติม:

ที่มา: will.com

เพิ่มความคิดเห็น