Spark'ı MLflow ile Genişletmek

Merhaba Habrovsk sakinleri. Daha önce de yazdığımız gibi OTUS bu ay aynı anda iki makine öğrenimi kursu başlatıyor: temel и ileri. Bu bağlamda faydalı materyaller paylaşmaya devam ediyoruz.

Bu yazının amacı ilk kullanım deneyimimizden bahsetmek. ML akışı.

İncelemeye başlayacağız ML akışı izleme sunucusundan alın ve çalışmanın tüm yinelemelerini günlüğe kaydedin. Daha sonra Spark'ı MLflow'a UDF kullanarak bağlama deneyimimizi paylaşacağız.

Bağlam

Biz Alfa Sağlık İnsanların sağlıklarının ve refahlarının sorumluluğunu üstlenmelerini sağlamak için makine öğrenimini ve yapay zekayı kullanıyoruz. Makine öğrenimi modellerinin geliştirdiğimiz veri bilimi ürünlerinin kalbinde yer almasının nedeni budur ve makine öğrenimi yaşam döngüsünün tüm yönlerini kapsayan açık kaynaklı bir platform olan MLflow'u tercih etmemizin nedeni de budur.

ML akışı

MLflow'un temel amacı, veri bilimcilerinin hemen hemen her makine öğrenimi kitaplığıyla çalışmasına olanak tanıyacak, makine öğreniminin üstüne ek bir katman sağlamaktır (h2o, keras, sıçrama, pytorch, skleöğrenmek и tensorflow), işini bir sonraki seviyeye taşıyor.

MLflow üç bileşen sağlar:

  • Takip – deneylerin kaydedilmesi ve istekleri: kod, veriler, konfigürasyon ve sonuçlar. Model oluşturma sürecini izlemek çok önemlidir.
  • Projeler – Herhangi bir platformda çalışacak paketleme formatı (ör. SageMaker)
  • Modeller – modelleri çeşitli dağıtım araçlarına göndermek için ortak bir format.

MLflow (yazıldığı sırada alfa sürümündeydi), deneme, yeniden kullanım ve dağıtım da dahil olmak üzere makine öğrenimi yaşam döngüsünü yönetmenize olanak tanıyan açık kaynaklı bir platformdur.

MLflow'u ayarlama

MLflow'u kullanmak için öncelikle Python ortamınızın tamamını kurmanız gerekir, bunun için kullanacağız PyEnv (Python'u Mac'e yüklemek için şuraya göz atın: burada). Bu şekilde, onu çalıştırmak için gerekli tüm kütüphaneleri kuracağımız sanal bir ortam oluşturabiliriz.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Gerekli kütüphaneleri kuralım.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Not: UDF gibi modelleri çalıştırmak için PyArrow'u kullanıyoruz. PyArrow ve Numpy sürümlerinin birbirleriyle çakışması nedeniyle düzeltilmesi gerekiyordu.

İzleme Arayüzünü Başlat

MLflow İzleme, Python kullanarak deneyleri günlüğe kaydetmemize ve sorgulamamıza olanak tanır ve DİNLENME API'dir. Ek olarak, model yapıtlarının nerede depolanacağını (localhost, Amazon S3, Azure Blob Depolama, Google Cloud Storage veya SFTP sunucusu). Alpha Health'te AWS kullandığımız için yapı depolama alanımız S3 olacaktır.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow, kalıcı dosya depolamanın kullanılmasını önerir. Dosya depolama, sunucunun çalıştırma ve deneme meta verilerini depolayacağı yerdir. Sunucuyu başlatırken kalıcı dosya deposunu gösterdiğinden emin olun. Burada deney için basitçe kullanacağız /tmp.

Eski denemeleri çalıştırmak için mlflow sunucusunu kullanmak istersek bunların dosya deposunda bulunması gerektiğini unutmayın. Ancak bu olmasa bile bunları UDF'de kullanabiliriz çünkü yalnızca modelin yoluna ihtiyacımız var.

Not: İzleme Kullanıcı Arayüzü ve model istemcisinin yapı konumuna erişimi olması gerektiğini unutmayın. Yani, İzleme Kullanıcı Arayüzü'nün bir EC2 örneğinde bulunmasına bakılmaksızın, MLflow'u yerel olarak çalıştırırken makinenin yapıt modelleri yazmak için S3'e doğrudan erişimi olması gerekir.

Spark'ı MLflow ile Genişletmek
İzleme kullanıcı arayüzü, yapıları bir S3 klasöründe depolar

Çalışan modeller

İzleme sunucusu çalışır çalışmaz modelleri eğitmeye başlayabilirsiniz.

Örnek olarak, MLflow örneğindeki şarap modifikasyonunu kullanacağız. Sklearn.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Daha önce tartıştığımız gibi MLflow, model parametrelerini, ölçümleri ve yapıtları günlüğe kaydetmenize olanak tanır, böylece yinelemeler boyunca nasıl geliştiklerini takip edebilirsiniz. Bu özellik son derece kullanışlıdır çünkü bu şekilde İzleme sunucusuyla iletişim kurarak veya git karma kayıtlarını kullanarak hangi kodun gerekli yinelemeyi tamamladığını anlayarak en iyi modeli yeniden üretebiliriz.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Spark'ı MLflow ile Genişletmek
Şarap yinelemeleri

Model için sunucu kısmı

“mlflow sunucusu” komutu kullanılarak başlatılan MLflow izleme sunucusu, çalıştırmaları izlemek ve verileri yerel dosya sistemine yazmak için bir REST API'sine sahiptir. "MLFLOW_TRACKING_URI" ortam değişkenini kullanarak izleme sunucusu adresini belirtebilirsiniz; MLflow izleme API'si, başlatma bilgileri, günlük ölçümleri vb. oluşturmak/almak için bu adresteki izleme sunucusuyla otomatik olarak iletişim kuracaktır.

Kaynak: Dokümanlar// Bir izleme sunucusu çalıştırma

Modele bir sunucu sağlamak için çalışan bir izleme sunucusuna (başlatma arayüzüne bakın) ve modelin Çalıştırma Kimliğine ihtiyacımız var.

Spark'ı MLflow ile Genişletmek
Çalıştırma kimliği

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

MLflow sunma işlevini kullanarak modelleri sunmak için, yalnızca belirterek model hakkında bilgi almak amacıyla İzleme Kullanıcı Arayüzü'ne erişmemiz gerekir. --run_id.

Model İzleme sunucusuyla iletişim kurduğunda yeni bir model uç noktası alabiliriz.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Spark'tan model çalıştırma

İzleme sunucusu modellere gerçek zamanlı hizmet verecek kadar güçlü olmasına rağmen onları eğitin ve sunucunun işlevselliğini kullanın (kaynak: mlflow // belgeler // modeller # yerel), Spark kullanımı (toplu veya akış) dağıtım nedeniyle daha da güçlü bir çözümdür.

Eğitimi çevrimdışı yaptığınızı ve ardından çıktı modelini tüm verilerinize uyguladığınızı hayal edin. Spark ve MLflow'un parladığı yer burasıdır.

PySpark + Jupyter + Spark'ı yükleyin

Kaynak: PySpark'ı kullanmaya başlayın - Jupyter

MLflow modellerini Spark veri çerçevelerine nasıl uyguladığımızı göstermek için Jupyter not defterlerini PySpark ile birlikte çalışacak şekilde ayarlamamız gerekiyor.

En son kararlı sürümü yükleyerek başlayın Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

PySpark ve Jupyter'ı sanal ortama yükleyin:

pip install pyspark jupyter

Ortam değişkenlerini ayarlayın:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

Belirledikten sonra notebook-dir, defterlerimizi istenilen klasörde saklayabiliriz.

Jüpyter'i PySpark'tan başlatmak

Jupiter'i PySpark sürücüsü olarak yapılandırabildiğimiz için artık Jupyter notebook'u PySpark bağlamında çalıştırabiliyoruz.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Spark'ı MLflow ile Genişletmek

Yukarıda bahsedildiği gibi MLflow, S3'teki model yapıtlarının günlüğe kaydedilmesine yönelik bir özellik sağlar. Seçilen modeli elimize alır almaz modülü kullanarak UDF olarak içe aktarma olanağına sahip oluyoruz. mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Spark'ı MLflow ile Genişletmek
PySpark - Şarap kalitesi tahminlerinin çıktısını alma

Bu noktaya kadar PySpark'ı MLflow ile nasıl kullanacağımızı, şarap kalitesi tahminlerini tüm şarap veri kümesinde nasıl çalıştıracağımızı konuştuk. Peki ya Scala Spark'ın Python MLflow modüllerini kullanmanız gerekiyorsa?

Bunu Spark bağlamını Scala ve Python arasında bölerek de test ettik. Yani, MLflow UDF'yi Python'a kaydettik ve onu Scala'dan kullandık (evet, belki de en iyi çözüm değil, ama elimizde olan şey).

Scala Spark + MLflow

Bu örnek için ekleyeceğiz Toree Çekirdeği mevcut Jüpiter'in içine.

Spark + Toree + Jupyter'ı yükleyin

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Ekteki not defterinden görebileceğiniz gibi UDF, Spark ve PySpark arasında paylaşılmaktadır. Bu bölümün Scala'yı seven ve makine öğrenimi modellerini üretimde dağıtmak isteyenler için faydalı olacağını umuyoruz.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Takibin Shui

MLflow bu yazının yazıldığı sırada Alpha sürümünde olmasına rağmen oldukça umut verici görünüyor. Birden fazla makine öğrenimi çerçevesini çalıştırma ve bunları tek bir uç noktadan kullanma yeteneği, öneri sistemlerini bir sonraki seviyeye taşır.

Ayrıca MLflow, Veri Mühendisleri ile Veri Bilimi uzmanlarını birbirine yakınlaştırarak aralarında ortak bir katman oluşturur.

MLflow'un bu keşfinden sonra ilerleyeceğimize ve bunu Spark ardışık düzenlerimiz ve öneri sistemlerimiz için kullanacağımıza eminiz.

Dosya depolama alanını dosya sistemi yerine veritabanıyla senkronize etmek güzel olurdu. Bu bize aynı dosya depolama alanını kullanabilecek birden fazla uç nokta sağlamalıdır. Örneğin, birden çok örnek kullanın çabuk и Athena aynı Glue meta deposuyla.

Özetlemek gerekirse, verilerle yaptığımız çalışmayı daha ilginç hale getirdiği için MLFlow topluluğuna teşekkür etmek istiyorum.

MLflow ile uğraşıyorsanız, bize yazmaktan ve onu nasıl kullandığınızı ve hatta üretimde kullanıyorsanız daha fazlasını anlatmaktan çekinmeyin.

Kurslar hakkında daha fazla bilgi edinin:
Makine öğrenme. Temel kurs
Makine öğrenme. İleri düzey kurs

Daha fazla oku:

Kaynak: habr.com

Yorum ekle