Estendendo o Spark com MLflow

Olá, Khabrovitas. Como já escrevemos, este mês a OTUS lança dois cursos de aprendizado de máquina ao mesmo tempo, a saber base и avançado. Nesse sentido, continuamos a compartilhar material útil.

O objetivo deste artigo é falar sobre nossa primeira experiência com Fluxo de ML.

Vamos começar a revisão Fluxo de ML de seu servidor de rastreamento e prologue todas as iterações do estudo. Em seguida, compartilharemos a experiência de conectar o Spark ao MLflow usando UDF.

Contexto

Estamos em Saúde Alfa usamos aprendizado de máquina e inteligência artificial para capacitar as pessoas a cuidar de sua saúde e bem-estar. É por isso que os modelos de aprendizado de máquina estão no centro dos produtos de dados que desenvolvemos e é por isso que o MLflow, uma plataforma de código aberto que abrange todos os aspectos do ciclo de vida do aprendizado de máquina, chamou nossa atenção.

Fluxo de ML

O principal objetivo do MLflow é fornecer uma camada adicional sobre o aprendizado de máquina que permitiria que os cientistas de dados trabalhassem com praticamente qualquer biblioteca de aprendizado de máquina (h2o, keras, pulo, pytorch, aprender и fluxo tensor), levando seu trabalho para o próximo nível.

O MLflow fornece três componentes:

  • Rastreamento – registos e pedidos de experiências: código, dados, configuração e resultados. É muito importante seguir o processo de criação de um modelo.
  • Projectos – Formato de empacotamento para rodar em qualquer plataforma (por exemplo, SageMaker)
  • Modelos é um formato comum para enviar modelos para várias ferramentas de implantação.

O MLflow (alfa no momento da redação) é uma plataforma de software livre que permite gerenciar o ciclo de vida do aprendizado de máquina, incluindo experimentação, reutilização e implantação.

Configurando o MLflow

Para usar o MLflow, primeiro você deve configurar todo o ambiente Python, para isso usaremos PyEnvGenericName (para instalar o Python em um Mac, dê uma olhada aqui). Assim podemos criar um ambiente virtual onde iremos instalar todas as bibliotecas necessárias para rodar.

```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```

Instale as bibliotecas necessárias.

```
pip install mlflow==0.7.0 
            Cython==0.29  
            numpy==1.14.5 
            pandas==0.23.4 
            pyarrow==0.11.0
```

Observação: estamos usando o PyArrow para executar modelos como UDFs. As versões do PyArrow e Numpy precisavam ser corrigidas porque as versões mais recentes estavam em conflito umas com as outras.

Iniciando a interface do usuário de rastreamento

MLflow Tracking nos permite registrar e consultar experimentos com Python e DESCANSO API. Além disso, você pode definir onde armazenar os artefatos do modelo (localhost, Amazon S3, Armazenamento de Blob do Azure, Google Cloud Storage ou servidor SFTP). Como usamos a AWS na Alpha Health, o S3 será o armazenamento dos artefatos.

# Running a Tracking Server
mlflow server 
    --file-store /tmp/mlflow/fileStore 
    --default-artifact-root s3://<bucket>/mlflow/artifacts/ 
    --host localhost
    --port 5000

MLflow recomenda o uso de armazenamento de arquivo persistente. O armazenamento de arquivos é onde o servidor armazenará os metadados de execução e experimento. Ao iniciar o servidor, verifique se ele aponta para armazenamento de arquivo persistente. Aqui, para fins de experiência, usaremos simplesmente /tmp.

Lembre-se de que, se quisermos usar o servidor mlflow para executar experimentos antigos, eles devem estar presentes no armazenamento de arquivos. Porém, mesmo sem isso, poderíamos utilizá-los na UDF, pois só precisamos do caminho para o modelo.

Observação: lembre-se de que a interface do usuário de rastreamento e o cliente modelo devem ter acesso ao local do artefato. Ou seja, independentemente de a Tracking UI estar localizada em uma instância do EC2, ao executar o MLflow localmente, a máquina deve ter acesso direto ao S3 para escrever modelos de artefatos.

Estendendo o Spark com MLflow
A IU de rastreamento armazena artefatos no bucket S3

Modelos de corrida

Assim que o servidor de rastreamento estiver em execução, você poderá começar a treinar os modelos.

Como exemplo, usaremos a modificação wine do exemplo MLflow em SklearnName.

MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py 
  --alpha 0.9
  --l1_ration 0.5
  --wine_file ./data/winequality-red.csv

Como dissemos, o MLflow permite registrar os parâmetros, métricas e artefatos de modelo para que você possa acompanhar como eles se desenvolvem como iterações. Esse recurso é extremamente útil, pois nos permite reproduzir o melhor modelo entrando em contato com o servidor de Tracking ou entendendo qual código realizou a iteração necessária usando os git hash logs dos commits.

with mlflow.start_run():

    ... model ...

    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")

Estendendo o Spark com MLflow
iterações de vinho

Back-end para o modelo

O servidor de rastreamento MLflow iniciado com o comando “servidor mlflow” tem uma API REST para rastrear execuções e gravar dados no sistema de arquivos local. Você pode especificar o endereço do servidor de rastreamento usando a variável de ambiente "MLFLOW_TRACKING_URI" e a API de rastreamento MLflow entrará em contato automaticamente com o servidor de rastreamento neste endereço para criar/obter informações de execução, métricas de registro, etc.

Fonte: Docs// Executando um servidor de rastreamento

Para fornecer um servidor ao modelo, precisamos de um servidor de rastreamento em execução (consulte a interface de inicialização) e o Run ID do modelo.

Estendendo o Spark com MLflow
ID de execução

# Serve a sklearn model through 127.0.0.0:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve 
  --port 5005  
  --run_id 0f8691808e914d1087cf097a08730f17 
  --model-path model

Para servir modelos usando a funcionalidade de servir MLflow, precisamos acessar a interface do usuário de rastreamento para obter informações sobre o modelo simplesmente especificando --run_id.

Depois que o modelo entrar em contato com o servidor de rastreamento, podemos obter um novo ponto de extremidade do modelo.

# Query Tracking Server Endpoint
curl -X POST 
  http://127.0.0.1:5005/invocations 
  -H 'Content-Type: application/json' 
  -d '[
	{
		"fixed acidity": 3.42, 
		"volatile acidity": 1.66, 
		"citric acid": 0.48, 
		"residual sugar": 4.2, 
		"chloridessssss": 0.229, 
		"free sulfur dsioxide": 19, 
		"total sulfur dioxide": 25, 
		"density": 1.98, 
		"pH": 5.33, 
		"sulphates": 4.39, 
		"alcohol": 10.8
	}
]'

> {"predictions": [5.825055635303461]}

Executando modelos do Spark

Apesar do servidor de rastreamento ser poderoso o suficiente para atender modelos em tempo real, treine-os e use a funcionalidade do servidor (fonte: mlflow // docs // models #local), usando Spark (batch ou streaming) é uma solução ainda mais poderosa devido à sua distribuição.

Imagine que você acabou de fazer um treinamento off-line e depois aplicou o modelo de saída a todos os seus dados. É aqui que o Spark e o MLflow se destacam.

Instalar PySpark + Jupyter + Spark

Fonte: Introdução PySpark - Jupyter

Para mostrar como aplicamos os modelos MLflow aos dataframes do Spark, precisamos configurar os notebooks Jupyter para trabalhar com o PySpark.

Comece instalando a última versão estável Apache Spark:

cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark̀

Instale o PySpark e o Jupyter em um ambiente virtual:

pip install pyspark jupyter

Configure variáveis ​​de ambiente:

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"

tendo definido notebook-dir, poderemos armazenar nossos notebooks na pasta desejada.

Executando o Jupyter do PySpark

Como conseguimos configurar o Júpiter como um driver PySpark, agora podemos executar o notebook Jupyter em um contexto PySpark.

(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745

Estendendo o Spark com MLflow

Conforme mencionado acima, o MLflow fornece a função de registrar artefatos de modelo no S3. Assim que tivermos o modelo selecionado em mãos, temos a oportunidade de importá-lo como UDF usando o módulo mlflow.pyfunc.

import mlflow.pyfunc

model_path = 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model'
wine_path = '/Users/afranzi/Projects/data/winequality-red.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
            "residual sugar", "chlorides", "free sulfur dioxide",
            "total sulfur dioxide", "density", "pH",
            "sulphates", "alcohol"
          ]
          
df.withColumn('prediction', wine_udf(*columns)).show(100, False)

Estendendo o Spark com MLflow
PySpark - Prevendo a qualidade do vinho

Até este ponto, falamos sobre como usar o PySpark com MLflow executando a previsão da qualidade do vinho em todo o conjunto de dados do vinho. Mas e se você precisar usar os módulos Python MLflow do Scala Spark?

Também testamos isso dividindo o contexto Spark entre Scala e Python. Ou seja, registramos o MLflow UDF em Python e o usamos do Scala (sim, talvez não seja a melhor solução, mas o que temos).

Scala Spark + MLflow

Para este exemplo, adicionaremos Núcleo Toree em um Júpiter existente.

Instalar Spark + Toree + Jupyter

pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
  apache_toree_scala    /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
  python3               /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```

Como você pode ver no notebook anexo, o UDF é compartilhado entre o Spark e o PySpark. Esperamos que esta parte seja útil para aqueles que amam Scala e desejam implantar modelos de aprendizado de máquina na produção.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

val FirstAtRe: Regex = "^_".r
val AliasRe: Regex = "[\s_.:@]+".r

def getFieldAlias(field_name: String): String = {
    FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
    val fieldsToSelect: List[Column] = columns.map(field =>
        col(field).as(getFieldAlias(field))
    )
    df.select(fieldsToSelect: _*)
}

def normalizeSchema(df: DataFrame): DataFrame = {
    val schema = df.columns.toList
    df.transform(selectFieldsNormalized(schema))
}

FirstAtRe = ^_
AliasRe = [s_.:@]+

getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[s_.:@]+
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"

winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", ";")
              .load(winePath)
              .transform(normalizeSchema)

df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc

model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)

spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT 
    quality,
    wineQuality(
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        pH,
        sulphates,
        alcohol
    ) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      5| 5.576883967129615|
|      5|  5.50664776916154|
|      5| 5.525504822954496|
|      6| 5.504311247097457|
|      5| 5.576883967129615|
|      5|5.5556903912725755|
|      5| 5.467882654744997|
|      7| 5.710602976324739|
|      7| 5.657319539336507|
|      5| 5.345098606538708|
+-------+------------------+

In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)

+-----------+--------+-----------+---------+-----------+
|name       |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null    |null       |null     |true       |
+-----------+--------+-----------+---------+-----------+

Próximos passos

Mesmo que o MLflow esteja em Alpha no momento da escrita, parece bastante promissor. Ser capaz de executar várias estruturas de aprendizado de máquina e usá-las a partir de um único endpoint leva os sistemas de recomendação para o próximo nível.

Além disso, o MLflow aproxima engenheiros de dados e cientistas de dados, estabelecendo uma camada comum entre eles.

Após essa exploração do MLflow, com certeza iremos em frente e o usaremos para nossos pipelines Spark e sistemas de recomendação.

Seria bom sincronizar o armazenamento de arquivos com o banco de dados em vez do sistema de arquivos. Isso deve nos fornecer vários pontos de extremidade que podem usar o mesmo compartilhamento de arquivo. Por exemplo, use várias instâncias Presto и Atena com o mesmo metastore Glue.

Resumindo, gostaria de agradecer à comunidade MLFlow por tornar nosso trabalho com dados mais interessante.

Se você joga com MLflow, sinta-se à vontade para nos escrever e nos contar como você o usa, e ainda mais se você o usa em produção.

Saiba mais sobre os cursos:
aprendizado de máquina. Curso básico
aprendizado de máquina. curso avançado

Consulte Mais informação:

Fonte: habr.com

Adicionar um comentário