Mở rộng Spark với MLflow

Xin chào, Khabrovites. Như chúng tôi đã viết, tháng này OTUS ra mắt hai khóa học về máy học cùng một lúc, đó là cơ sở и trình độ cao. Về vấn đề này, chúng tôi tiếp tục chia sẻ tài liệu hữu ích.

Mục đích của bài viết này là để nói về kinh nghiệm đầu tiên của chúng tôi với Dòng chảy ML.

Chúng tôi sẽ bắt đầu xem xét Dòng chảy ML từ máy chủ theo dõi của nó và prolog tất cả các lần lặp lại của nghiên cứu. Sau đó, chúng tôi sẽ chia sẻ kinh nghiệm kết nối Spark với MLflow bằng UDF.

Bối cảnh

Chúng tôi đang trong sức khỏe anpha chúng tôi sử dụng máy học và trí tuệ nhân tạo để hỗ trợ mọi người chăm sóc sức khỏe và hạnh phúc của họ. Đây là lý do tại sao các mô hình máy học là cốt lõi của các sản phẩm dữ liệu mà chúng tôi phát triển và tại sao MLflow, một nền tảng nguồn mở bao gồm tất cả các khía cạnh của vòng đời máy học, lại thu hút sự chú ý của chúng tôi.

Dòng chảy ML

Mục tiêu chính của MLflow là cung cấp một lớp bổ sung trên đầu học máy cho phép các nhà khoa học dữ liệu làm việc với hầu hết mọi thư viện máy học (H2o, máy ảnh, nhảy nhót, ngọn đuốc, sklearning и tensorflow), đưa công việc của cô ấy lên một tầm cao mới.

MLflow cung cấp ba thành phần:

  • Theo dõi – ghi lại và yêu cầu thí nghiệm: mã, dữ liệu, cấu hình và kết quả. Điều rất quan trọng là phải tuân theo quy trình tạo mô hình.
  • Dự án – Định dạng đóng gói để chạy trên bất kỳ nền tảng nào (ví dụ: SageMaker)
  • mô hình là một định dạng phổ biến để gửi mô hình tới các công cụ triển khai khác nhau.

MLflow (alpha tại thời điểm viết) là một nền tảng mã nguồn mở cho phép bạn quản lý vòng đời máy học, bao gồm thử nghiệm, sử dụng lại và triển khai.

Thiết lập MLflow

Để sử dụng MLflow, trước tiên bạn phải thiết lập toàn bộ môi trường Python, để làm điều này, chúng tôi sẽ sử dụng PyEnv (để cài đặt Python trên máy Mac, hãy xem đây). Vì vậy, chúng tôi có thể tạo một môi trường ảo nơi chúng tôi sẽ cài đặt tất cả các thư viện cần thiết để chạy.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Cài đặt các thư viện cần thiết.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Lưu ý: Chúng tôi đang sử dụng PyArrow để chạy các mô hình như UDF. Các phiên bản của PyArrow và Numpy cần được sửa vì các phiên bản mới nhất xung đột với nhau.

Khởi chạy giao diện người dùng theo dõi

Theo dõi MLflow cho phép chúng tôi ghi nhật ký và truy vấn các thử nghiệm bằng Python và REST của API. Ngoài ra, bạn có thể xác định nơi lưu trữ các tạo phẩm mô hình (localhost, Amazon S3, Bộ nhớ Azure Blob, Lưu trữ đám mây của Google hoặc máy chủ SFTP). Vì chúng tôi sử dụng AWS tại Alpha Health nên S3 sẽ là nơi lưu trữ các thành phần lạ.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow khuyến nghị sử dụng lưu trữ tệp liên tục. Bộ lưu trữ tệp là nơi máy chủ sẽ lưu trữ siêu dữ liệu chạy và thử nghiệm. Khi khởi động máy chủ, hãy đảm bảo rằng nó trỏ đến bộ lưu trữ tệp liên tục. Ở đây, để thử nghiệm, chúng ta sẽ chỉ sử dụng /tmp.

Hãy nhớ rằng nếu chúng tôi muốn sử dụng máy chủ mlflow để chạy các thử nghiệm cũ, chúng phải có mặt trong kho lưu trữ tệp. Tuy nhiên, ngay cả khi không có điều này, chúng tôi vẫn có thể sử dụng chúng trong UDF, vì chúng tôi chỉ cần đường dẫn đến mô hình.

Lưu ý: Hãy nhớ rằng Giao diện người dùng theo dõi và ứng dụng khách mô hình phải có quyền truy cập vào vị trí của phần mềm. Nghĩa là, bất kể giao diện người dùng theo dõi nằm trong phiên bản EC2, khi chạy MLflow cục bộ, máy phải có quyền truy cập trực tiếp vào S3 để viết các mô hình tạo tác.

Mở rộng Spark với MLflow
Giao diện người dùng theo dõi lưu trữ các thành phần tạo tác trong bộ chứa S3

Chạy Người Mẫu

Ngay khi máy chủ Theo dõi đang chạy, bạn có thể bắt đầu huấn luyện các mô hình.

Ví dụ: chúng tôi sẽ sử dụng sửa đổi rượu vang từ ví dụ MLflow trong Sklearning.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Như chúng tôi đã nói, MLflow cho phép bạn ghi nhật ký các tham số, số liệu và các tạo phẩm mô hình để bạn có thể theo dõi cách chúng phát triển dưới dạng các lần lặp lại. Tính năng này cực kỳ hữu ích vì nó cho phép chúng tôi tái tạo mô hình tốt nhất bằng cách liên hệ với máy chủ Theo dõi hoặc hiểu mã nào đã thực hiện phép lặp cần thiết bằng cách sử dụng nhật ký băm git của các lần xác nhận.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Mở rộng Spark với MLflow
lặp lại rượu vang

Mặt sau cho mô hình

Máy chủ theo dõi MLflow được khởi chạy với lệnh “máy chủ mlflow” có API REST để theo dõi các lần chạy và ghi dữ liệu vào hệ thống tệp cục bộ. Bạn có thể chỉ định địa chỉ của máy chủ theo dõi bằng cách sử dụng biến môi trường "MLFLOW_TRACKING_URI" và API theo dõi MLflow sẽ tự động liên hệ với máy chủ theo dõi tại địa chỉ này để tạo/nhận thông tin khởi chạy, chỉ số ghi nhật ký, v.v.

Nguồn: Tài liệu // Chạy máy chủ theo dõi

Để cung cấp cho mô hình một máy chủ, chúng tôi cần một máy chủ theo dõi hoạt động (xem giao diện khởi chạy) và Run ID của mô hình.

Mở rộng Spark với MLflow
Chạy ID

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Để phân phối các mô hình bằng chức năng phân phối MLflow, chúng tôi cần truy cập vào Giao diện người dùng theo dõi để lấy thông tin về mô hình bằng cách chỉ định --run_id.

Sau khi mô hình liên hệ với Máy chủ theo dõi, chúng tôi có thể nhận điểm cuối mô hình mới.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Chạy mô hình từ Spark

Mặc dù thực tế là máy chủ Theo dõi đủ mạnh để phục vụ các mô hình trong thời gian thực, hãy huấn luyện chúng và sử dụng chức năng của máy chủ (nguồn: mlflow // tài liệu // mô hình #local), sử dụng Spark (theo lô hoặc phát trực tuyến) thậm chí còn là một giải pháp mạnh mẽ hơn do khả năng phân phối của nó.

Hãy tưởng tượng rằng bạn vừa thực hiện đào tạo ngoại tuyến và sau đó áp dụng mô hình đầu ra cho tất cả dữ liệu của mình. Đây là nơi Spark và MLflow trở thành của riêng họ.

Cài đặt PySpark + Jupyter + Spark

Nguồn: Bắt đầu PySpark - Jupyter

Để chỉ ra cách chúng tôi áp dụng các mô hình MLflow cho khung dữ liệu Spark, chúng tôi cần thiết lập sổ ghi chép Jupyter để hoạt động với PySpark.

Bắt đầu bằng cách cài đặt phiên bản ổn định mới nhất Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Cài đặt PySpark và Jupyter trong môi trường ảo:

pip install pyspark jupyter

Thiết lập biến môi trường:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

đã xác định notebook-dir, chúng tôi sẽ có thể lưu trữ sổ ghi chép của mình trong thư mục mong muốn.

Chạy Jupyter từ PySpark

Vì chúng tôi có thể thiết lập Sao Mộc làm trình điều khiển PySpark, giờ đây chúng tôi có thể chạy sổ ghi chép Jupyter trong ngữ cảnh PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Mở rộng Spark với MLflow

Như đã đề cập ở trên, MLflow cung cấp chức năng ghi lại các thành phần tạo tác của mô hình trong S3. Ngay sau khi chúng tôi có mô hình đã chọn trong tay, chúng tôi có cơ hội nhập mô hình đó dưới dạng UDF bằng cách sử dụng mô-đun mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Mở rộng Spark với MLflow
PySpark - Dự đoán chất lượng rượu

Cho đến thời điểm này, chúng tôi đã nói về cách sử dụng PySpark với MLflow bằng cách chạy dự đoán chất lượng rượu trên toàn bộ tập dữ liệu rượu. Nhưng nếu bạn cần sử dụng các mô-đun Python MLflow từ Scala Spark thì sao?

Chúng tôi cũng đã thử nghiệm điều này bằng cách tách bối cảnh Spark giữa Scala và Python. Đó là, chúng tôi đã đăng ký MLflow UDF bằng Python và sử dụng nó từ Scala (vâng, có thể không phải là giải pháp tốt nhất, nhưng là những gì chúng tôi có).

Scala Spark + MLflow

Đối với ví dụ này, chúng tôi sẽ thêm hạt toree thành một sao Mộc hiện có.

Cài đặt Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Như bạn có thể thấy từ sổ ghi chép đính kèm, UDF được chia sẻ giữa Spark và PySpark. Chúng tôi hy vọng rằng phần này sẽ hữu ích cho những ai yêu thích Scala và muốn triển khai các mô hình học máy vào sản xuất.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Bước tiếp theo

Mặc dù MLflow đang ở giai đoạn Alpha tại thời điểm viết bài, nhưng nó có vẻ khá hứa hẹn. Việc có thể chạy nhiều khung máy học và sử dụng chúng từ một điểm cuối duy nhất sẽ đưa hệ thống đề xuất lên một tầm cao mới.

Ngoài ra, MLflow mang các kỹ sư Dữ liệu và Nhà khoa học dữ liệu lại gần nhau hơn, tạo ra một lớp chung giữa họ.

Sau khi khám phá MLflow này, chúng tôi chắc chắn sẽ tiếp tục và sử dụng nó cho các hệ thống đề xuất và quy trình Spark của chúng tôi.

Sẽ thật tuyệt nếu đồng bộ hóa bộ lưu trữ tệp với cơ sở dữ liệu thay vì hệ thống tệp. Điều này sẽ cung cấp cho chúng tôi nhiều điểm cuối có thể sử dụng cùng một tệp chia sẻ. Ví dụ: sử dụng nhiều trường hợp Mau и Athena với cùng một di căn Keo.

Tóm lại, tôi muốn gửi lời cảm ơn đến cộng đồng MLFlow vì đã làm cho công việc của chúng tôi với dữ liệu trở nên thú vị hơn.

Nếu bạn chơi với MLflow, vui lòng viết thư cho chúng tôi và cho chúng tôi biết cách bạn sử dụng nó và thậm chí nhiều hơn thế nếu bạn sử dụng nó trong sản xuất.

Tìm hiểu thêm về các khóa học:
học máy. Khóa học cơ bản
học máy. khóa học nâng cao

Đọc thêm:

Nguồn: www.habr.com

Thêm một lời nhận xét