Esecuzione di Apache Spark su Kubernetes

Cari lettori, buon pomeriggio. Oggi parleremo un po’ di Apache Spark e delle sue prospettive di sviluppo.

Esecuzione di Apache Spark su Kubernetes

Nel mondo moderno dei Big Data, Apache Spark è lo standard de facto per lo sviluppo di attività di elaborazione dati batch. Inoltre, viene utilizzato anche per creare applicazioni di streaming che funzionano secondo il concetto di micro batch, elaborando e inviando dati in piccole porzioni (Spark Structured Streaming). E tradizionalmente ha fatto parte dello stack Hadoop complessivo, utilizzando YARN (o in alcuni casi Apache Mesos) come gestore delle risorse. Entro il 2020, il suo utilizzo nella sua forma tradizionale sarà in discussione per la maggior parte delle aziende a causa della mancanza di distribuzioni Hadoop decenti: lo sviluppo di HDP e CDH si è interrotto, CDH non è ben sviluppato e ha un costo elevato e i restanti fornitori di Hadoop hanno o hanno cessato di esistere o hanno un futuro incerto. Pertanto, il lancio di Apache Spark utilizzando Kubernetes è di crescente interesse tra la comunità e le grandi aziende: diventando uno standard nell'orchestrazione dei contenitori e nella gestione delle risorse nei cloud privati ​​e pubblici, risolve il problema della scomoda pianificazione delle risorse delle attività Spark su YARN e fornisce una piattaforma in costante sviluppo con molte distribuzioni commerciali e aperte per aziende di ogni dimensione e tipo. Inoltre, sulla scia della popolarità, la maggior parte è già riuscita ad acquisire un paio di installazioni proprie e ad aumentare la propria esperienza nel suo utilizzo, il che semplifica il trasloco.

A partire dalla versione 2.3.0, Apache Spark ha acquisito il supporto ufficiale per l'esecuzione di attività in un cluster Kubernetes e oggi parleremo dell'attuale maturità di questo approccio, delle varie opzioni per il suo utilizzo e delle insidie ​​che si incontreranno durante l'implementazione.

Innanzitutto, diamo un'occhiata al processo di sviluppo di attività e applicazioni basate su Apache Spark ed evidenziamo i casi tipici in cui è necessario eseguire un'attività su un cluster Kubernetes. Nella preparazione di questo post, OpenShift viene utilizzato come distribuzione e verranno forniti i comandi relativi alla sua utilità della riga di comando (oc). Per altre distribuzioni Kubernetes è possibile utilizzare i comandi corrispondenti dell'utilità della riga di comando Kubernetes standard (kubectl) o i loro analoghi (ad esempio, per la policy oc adm).

Primo caso d'uso: invio di spark

Durante lo sviluppo di attività e applicazioni, lo sviluppatore deve eseguire attività per eseguire il debug della trasformazione dei dati. In teoria, gli stub possono essere utilizzati per questi scopi, ma lo sviluppo con la partecipazione di istanze reali (anche se di prova) dei sistemi finali si è dimostrato più veloce e migliore in questa classe di compiti. Nel caso in cui eseguiamo il debug su istanze reali di sistemi finali, sono possibili due scenari:

  • lo sviluppatore esegue un'attività Spark localmente in modalità autonoma;

    Esecuzione di Apache Spark su Kubernetes

  • uno sviluppatore esegue un'attività Spark su un cluster Kubernetes in un ciclo di test.

    Esecuzione di Apache Spark su Kubernetes

La prima opzione ha il diritto di esistere, ma comporta una serie di svantaggi:

  • A ogni sviluppatore deve essere fornito l'accesso dal posto di lavoro a tutte le istanze dei sistemi finali di cui ha bisogno;
  • è necessaria una quantità sufficiente di risorse sulla macchina funzionante per eseguire l'attività in fase di sviluppo.

La seconda opzione non presenta questi svantaggi, poiché l'uso di un cluster Kubernetes consente di allocare il pool di risorse necessario per l'esecuzione delle attività e fornirgli l'accesso necessario alle istanze del sistema finale, fornendo in modo flessibile l'accesso ad esso utilizzando il modello di ruolo Kubernetes per tutti i membri del team di sviluppo. Evidenziamolo come primo caso d'uso: avvio di attività Spark da un computer di sviluppo locale su un cluster Kubernetes in un circuito di test.

Parliamo più approfonditamente del processo di configurazione di Spark per l'esecuzione locale. Per iniziare a utilizzare Spark è necessario installarlo:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Raccogliamo i pacchetti necessari per lavorare con Kubernetes:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

Una build completa richiede molto tempo e per creare immagini Docker ed eseguirle su un cluster Kubernetes, in realtà hai solo bisogno dei file jar dalla directory "assembly/", quindi puoi creare solo questo sottoprogetto:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Per eseguire processi Spark su Kubernetes, è necessario creare un'immagine Docker da utilizzare come immagine di base. Ci sono 2 possibili approcci qui:

  • L'immagine Docker generata include il codice dell'attività Spark eseguibile;
  • L'immagine creata include solo Spark e le dipendenze necessarie, il codice eseguibile è ospitato in remoto (ad esempio in HDFS).

Innanzitutto, creiamo un'immagine Docker contenente un esempio di test di un'attività Spark. Per creare immagini Docker, Spark dispone di un'utilità chiamata "docker-image-tool". Studiamo l'aiuto su di esso:

./bin/docker-image-tool.sh --help

Con il suo aiuto, puoi creare immagini Docker e caricarle su registri remoti, ma per impostazione predefinita presenta una serie di svantaggi:

  • crea senza errori 3 immagini Docker contemporaneamente: per Spark, PySpark e R;
  • non consente di specificare un nome di immagine.

Pertanto, utilizzeremo una versione modificata di questa utility indicata di seguito:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

Con il suo aiuto, assembliamo un'immagine Spark di base contenente un'attività di test per calcolare Pi utilizzando Spark (qui {docker-registry-url} è l'URL del registro delle immagini Docker, {repo} è il nome del repository all'interno del registro, che corrisponde al progetto in OpenShift, {image-name} - nome dell'immagine (se viene utilizzata la separazione delle immagini a tre livelli, ad esempio, come nel registro integrato delle immagini Red Hat OpenShift), {tag} - tag di questo versione dell'immagine):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Accedi al cluster OKD utilizzando l'utilità della console (qui {OKD-API-URL} è l'URL dell'API del cluster OKD):

oc login {OKD-API-URL}

Otteniamo il token dell'utente corrente per l'autorizzazione nel registro Docker:

oc whoami -t

Accedi al Docker Registry interno del cluster OKD (usiamo come password il token ottenuto utilizzando il comando precedente):

docker login {docker-registry-url}

Carichiamo l'immagine Docker assemblata nel registro Docker OKD:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Verifichiamo che l'immagine assemblata sia disponibile in OKD. Per fare ciò, aprire nel browser l'URL con l'elenco delle immagini del progetto corrispondente (qui {project} è il nome del progetto all'interno del cluster OpenShift, {OKD-WEBUI-URL} è l'URL della console Web OpenShift ) - https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{nome-immagine}.

Per eseguire le attività, è necessario creare un account di servizio con i privilegi per eseguire i pod come root (discutiamo di questo punto più avanti):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Eseguiamo il comando spark-submit per pubblicare un'attività Spark nel cluster OKD, specificando l'account di servizio creato e l'immagine Docker:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

Qui:

—name: il nome dell'attività che parteciperà alla formazione del nome dei pod Kubernetes;

—class — classe del file eseguibile, richiamata all'avvio dell'attività;

—conf: parametri di configurazione di Spark;

spark.executor.instances: il numero di esecutori Spark da avviare;

spark.kubernetes.authenticate.driver.serviceAccountName: il nome dell'account del servizio Kubernetes utilizzato all'avvio dei pod (per definire il contesto di sicurezza e le funzionalità durante l'interazione con l'API Kubernetes);

spark.kubernetes.namespace: spazio dei nomi Kubernetes in cui verranno avviati i pod driver ed esecutori;

spark.submit.deployMode — metodo di avvio di Spark (per lo spark-submit standard viene utilizzato il "cluster", per Spark Operator e le versioni successive di Spark "client");

spark.kubernetes.container.image: immagine Docker utilizzata per avviare i pod;

spark.master: URL API Kubernetes (è specificato l'esterno in modo che l'accesso avvenga dalla macchina locale);

local:// è il percorso dell'eseguibile Spark all'interno dell'immagine Docker.

Andiamo al progetto OKD corrispondente e studiamo i pod creati - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

Per semplificare il processo di sviluppo, è possibile utilizzare un'altra opzione, in cui viene creata un'immagine di base comune di Spark, utilizzata da tutte le attività per l'esecuzione, e le istantanee dei file eseguibili vengono pubblicate su un dispositivo di archiviazione esterno (ad esempio, Hadoop) e specificate quando si chiama spark-submit come collegamento. In questo caso, puoi eseguire versioni diverse delle attività Spark senza ricostruire le immagini Docker, utilizzando, ad esempio, WebHDFS per pubblicare le immagini. Inviamo una richiesta per creare un file (qui {host} è l'host del servizio WebHDFS, {port} è la porta del servizio WebHDFS, {path-to-file-on-hdfs} è il percorso desiderato del file su HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

Riceverai una risposta come questa (qui {location} è l'URL che deve essere utilizzato per scaricare il file):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Carica il file eseguibile di Spark in HDFS (qui {path-to-local-file} è il percorso del file eseguibile di Spark sull'host corrente):

curl -i -X PUT -T {path-to-local-file} "{location}"

Successivamente, possiamo eseguire spark-submit utilizzando il file Spark caricato su HDFS (qui {class-name} è il nome della classe che deve essere avviata per completare l'attività):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Va notato che per accedere a HDFS e garantire che l'attività funzioni, potrebbe essere necessario modificare il Dockerfile e lo script entrypoint.sh: aggiungere una direttiva al Dockerfile per copiare le librerie dipendenti nella directory /opt/spark/jars e includere il file di configurazione HDFS in SPARK_CLASSPATH nell'entrypoint.sh.

Secondo caso d'uso: Apache Livy

Inoltre, quando un'attività viene sviluppata e il risultato deve essere testato, sorge la questione di avviarla come parte del processo CI/CD e di monitorare lo stato della sua esecuzione. Ovviamente, puoi eseguirlo utilizzando una chiamata spark-submit locale, ma ciò complica l'infrastruttura CI/CD poiché richiede l'installazione e la configurazione di Spark sugli agenti/runner del server CI e la configurazione dell'accesso all'API Kubernetes. In questo caso, l'implementazione di destinazione ha scelto di utilizzare Apache Livy come API REST per eseguire attività Spark ospitate all'interno di un cluster Kubernetes. Con il suo aiuto, puoi eseguire attività Spark su un cluster Kubernetes utilizzando normali richieste cURL, che possono essere facilmente implementate in base a qualsiasi soluzione CI, e il suo posizionamento all'interno del cluster Kubernetes risolve il problema dell'autenticazione quando si interagisce con l'API Kubernetes.

Esecuzione di Apache Spark su Kubernetes

Evidenziamolo come secondo caso d'uso: esecuzione di attività Spark come parte di un processo CI/CD su un cluster Kubernetes in un ciclo di test.

Qualcosa su Apache Livy: funziona come un server HTTP che fornisce un'interfaccia Web e un'API RESTful che ti consente di avviare in remoto spark-submit passando i parametri necessari. Tradizionalmente viene fornito come parte di una distribuzione HDP, ma può anche essere distribuito su OKD o qualsiasi altra installazione Kubernetes utilizzando il manifest appropriato e un set di immagini Docker, come questa: github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Nel nostro caso, è stata creata un'immagine Docker simile, inclusa la versione Spark 2.4.5 dal seguente Dockerfile:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

L'immagine generata può essere creata e caricata nel repository Docker esistente, come il repository OKD interno. Per distribuirlo, utilizzare il seguente manifest ({registry-url} - URL del registro delle immagini Docker, {image-name} - nome dell'immagine Docker, {tag} - tag dell'immagine Docker, {livy-url} - URL desiderato in cui sarà accessibile Livy; il manifest "Route" viene utilizzato se Red Hat OpenShift viene utilizzato come distribuzione Kubernetes, altrimenti viene utilizzato il corrispondente manifest Ingress o Service di tipo NodePort):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

Dopo averlo applicato e avviato con successo il pod, l'interfaccia grafica di Livy è disponibile al link: http://{livy-url}/ui. Con Livy, possiamo pubblicare la nostra attività Spark utilizzando una richiesta REST, ad esempio, da Postman. Di seguito viene presentato un esempio di raccolta con richieste (nell'array “args” è possibile passare gli argomenti di configurazione con le variabili necessarie per il funzionamento dell'attività avviata):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Eseguiamo la prima richiesta dalla raccolta, andiamo all'interfaccia OKD e controlliamo che l'attività sia stata avviata correttamente - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. Allo stesso tempo, nell'interfaccia di Livy (http://{livy-url}/ui) apparirà una sessione, all'interno della quale, utilizzando l'API di Livy o l'interfaccia grafica, potrai monitorare lo stato di avanzamento dell'attività e studiare la sessione registri.

Ora mostriamo come funziona Livio. Per fare ciò, esaminiamo i log del contenitore Livy all'interno del pod con il server Livy - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name }?tab=registri. Da loro possiamo vedere che quando si chiama l'API REST di Livy in un contenitore chiamato “livy”, viene eseguito uno spark-submit, simile a quello che abbiamo usato sopra (qui {livy-pod-name} è il nome del pod creato con il server Livy). La raccolta introduce anche una seconda query che consente di eseguire attività che ospitano in remoto un eseguibile Spark utilizzando un server Livy.

Terzo caso d'uso: operatore Spark

Ora che l'attività è stata testata, si pone la questione della sua esecuzione regolare. Il modo nativo per eseguire regolarmente attività in un cluster Kubernetes è l'entità CronJob ed è possibile utilizzarla, ma al momento l'uso degli operatori per gestire le applicazioni in Kubernetes è molto popolare e per Spark esiste un operatore abbastanza maturo, che è anche utilizzato in soluzioni di livello aziendale (ad esempio, Lightbend FastData Platform). Ti consigliamo di utilizzarlo: l'attuale versione stabile di Spark (2.4.5) ha opzioni di configurazione piuttosto limitate per l'esecuzione di attività Spark in Kubernetes, mentre la prossima versione principale (3.0.0) dichiara il pieno supporto per Kubernetes, ma la sua data di rilascio rimane sconosciuta . Spark Operator compensa questa lacuna aggiungendo importanti opzioni di configurazione (ad esempio, montando una ConfigMap con la configurazione di accesso Hadoop sui pod Spark) e la possibilità di eseguire un'attività pianificata regolarmente.

Esecuzione di Apache Spark su Kubernetes
Evidenziamolo come terzo caso d'uso: eseguire regolarmente attività Spark su un cluster Kubernetes in un ciclo di produzione.

Spark Operator è open source e sviluppato all'interno di Google Cloud Platform - github.com/GoogleCloudPlatform/spark-on-k8s-operator. La sua installazione può essere effettuata in 3 modi:

  1. Come parte dell'installazione della piattaforma Lightbend FastData/Cloudflow;
  2. Utilizzo del timone:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Utilizzando i manifest dal repository ufficiale (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Vale la pena notare quanto segue: Cloudflow include un operatore con la versione API v1beta1. Se viene utilizzato questo tipo di installazione, le descrizioni del manifesto dell'applicazione Spark devono essere basate su tag di esempio in Git con la versione API appropriata, ad esempio "v1beta1-0.9.0-2.4.0". La versione dell'operatore è reperibile nella descrizione del CRD inclusa nell'operatore nel dizionario “versioni”:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Se l'operatore è installato correttamente, nel progetto corrispondente verrà visualizzato un pod attivo con l'operatore Spark (ad esempio, cloudflow-fdp-sparkoperator nello spazio Cloudflow per l'installazione Cloudflow) e verrà visualizzato un tipo di risorsa Kubernetes corrispondente denominato "sparkapplications" . Puoi esplorare le applicazioni Spark disponibili con il comando seguente:

oc get sparkapplications -n {project}

Per eseguire attività utilizzando Spark Operator devi fare 3 cose:

  • creare un'immagine Docker che includa tutte le librerie necessarie, nonché i file di configurazione ed eseguibili. Nell'immagine di destinazione, si tratta di un'immagine creata nella fase CI/CD e testata su un cluster di test;
  • pubblicare un'immagine Docker su un registro accessibile dal cluster Kubernetes;
  • generare un manifest con il tipo "SparkApplication" e una descrizione dell'attività da avviare. Manifest di esempio sono disponibili nel repository ufficiale (ad es. github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Ci sono punti importanti da notare sul manifesto:
    1. il dizionario “apiVersion” deve indicare la versione API corrispondente alla versione dell'operatore;
    2. il dizionario “metadata.namespace” deve indicare il namespace in cui verrà lanciata l'applicazione;
    3. il dizionario “spec.image” deve contenere l'indirizzo dell'immagine Docker creata in un registro accessibile;
    4. il dizionario “spec.mainClass” deve contenere la classe dell'attività Spark che deve essere eseguita all'avvio del processo;
    5. il dizionario “spec.mainApplicationFile” deve contenere il percorso del file jar eseguibile;
    6. il dizionario “spec.sparkVersion” deve indicare la versione di Spark in uso;
    7. il dizionario “spec.driver.serviceAccount” deve specificare l'account del servizio all'interno del corrispondente spazio dei nomi Kubernetes che verrà utilizzato per eseguire l'applicazione;
    8. il dizionario “spec.executor” deve indicare il numero di risorse destinate all'applicazione;
    9. il dizionario "spec.volumeMounts" deve specificare la directory locale in cui verranno creati i file delle attività Spark locali.

Un esempio di generazione di un manifest (qui {spark-service-account} è un account di servizio all'interno del cluster Kubernetes per l'esecuzione di attività Spark):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Questo manifest specifica un account di servizio per il quale, prima di pubblicare il manifest, è necessario creare le associazioni di ruolo necessarie che forniscono i diritti di accesso necessari affinché l'applicazione Spark possa interagire con l'API Kubernetes (se necessario). Nel nostro caso, l'applicazione necessita dei diritti per creare pod. Creiamo l'associazione dei ruoli necessaria:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Vale anche la pena notare che questa specifica manifest può includere un parametro "hadoopConfigMap", che consente di specificare un ConfigMap con la configurazione Hadoop senza dover prima inserire il file corrispondente nell'immagine Docker. È adatto anche per l'esecuzione regolare delle attività: utilizzando il parametro "schedule", è possibile specificare un programma per l'esecuzione di una determinata attività.

Successivamente, salviamo il nostro manifest nel file spark-pi.yaml e lo applichiamo al nostro cluster Kubernetes:

oc apply -f spark-pi.yaml

Questo creerà un oggetto di tipo “sparkapplications”:

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

In questo caso verrà creato un pod con un'applicazione, il cui stato verrà visualizzato nelle “sparkapplications” create. Puoi visualizzarlo con il seguente comando:

oc get sparkapplications spark-pi -o yaml -n {project}

Al completamento dell'attività, il POD passerà allo stato "Completato", che verrà aggiornato anche in "sparkapplications". I log dell'applicazione possono essere visualizzati nel browser o utilizzando il seguente comando (qui {sparkapplications-pod-name} è il nome del pod dell'attività in esecuzione):

oc logs {sparkapplications-pod-name} -n {project}

Le attività Spark possono essere gestite anche utilizzando l'utilità specializzata sparkctl. Per installarlo, clona il repository con il suo codice sorgente, installa Go e crea questa utility:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Esaminiamo l'elenco delle attività Spark in esecuzione:

sparkctl list -n {project}

Creiamo una descrizione per l'attività Spark:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Eseguiamo l'attività descritta utilizzando sparkctl:

sparkctl create spark-app.yaml -n {project}

Esaminiamo l'elenco delle attività Spark in esecuzione:

sparkctl list -n {project}

Esaminiamo l'elenco degli eventi di un'attività Spark avviata:

sparkctl event spark-pi -n {project} -f

Esaminiamo lo stato dell'attività Spark in esecuzione:

sparkctl status spark-pi -n {project}

In conclusione, vorrei considerare gli svantaggi scoperti nell'utilizzo dell'attuale versione stabile di Spark (2.4.5) in Kubernetes:

  1. Il primo e, forse, principale svantaggio è la mancanza di Data Locality. Nonostante tutti i limiti di YARN, il suo utilizzo presentava anche dei vantaggi, ad esempio il principio di consegnare il codice ai dati (anziché i dati al codice). Grazie ad esso, le attività Spark sono state eseguite sui nodi in cui si trovavano i dati coinvolti nei calcoli e il tempo necessario per fornire i dati sulla rete è stato notevolmente ridotto. Quando utilizziamo Kubernetes, ci troviamo di fronte alla necessità di spostare i dati coinvolti in un'attività attraverso la rete. Se sono sufficientemente grandi, il tempo di esecuzione dell'attività può aumentare in modo significativo e richiedere anche una quantità abbastanza grande di spazio su disco allocato alle istanze dell'attività Spark per l'archiviazione temporanea. Questo svantaggio può essere mitigato utilizzando software specializzato che garantisca la localizzazione dei dati in Kubernetes (ad esempio Alluxio), ma ciò comporta in realtà la necessità di archiviare una copia completa dei dati sui nodi del cluster Kubernetes.
  2. Il secondo importante svantaggio è la sicurezza. Per impostazione predefinita, le funzionalità relative alla sicurezza relative all'esecuzione delle attività Spark sono disabilitate, l'uso di Kerberos non è trattato nella documentazione ufficiale (sebbene le opzioni corrispondenti siano state introdotte nella versione 3.0.0, che richiederà lavoro aggiuntivo) e la documentazione di sicurezza per utilizzando Spark (https://spark.apache.org/docs/2.4.5/security.html) solo YARN, Mesos e Standalone Cluster appaiono come key store. Allo stesso tempo, l'utente con cui vengono avviate le attività Spark non può essere specificato direttamente: specifichiamo solo l'account di servizio con cui funzionerà e l'utente viene selezionato in base alle politiche di sicurezza configurate. A questo proposito, o viene utilizzato l'utente root, che non è sicuro in un ambiente produttivo, oppure un utente con un UID casuale, il che è scomodo quando si distribuiscono i diritti di accesso ai dati (questo può essere risolto creando PodSecurityPolicies e collegandoli al conti di servizio corrispondenti). Attualmente, la soluzione consiste nel posizionare tutti i file necessari direttamente nell'immagine Docker o modificare lo script di avvio di Spark per utilizzare il meccanismo di archiviazione e recupero dei segreti adottato nella propria organizzazione.
  3. L'esecuzione di processi Spark utilizzando Kubernetes è ufficialmente ancora in modalità sperimentale e in futuro potrebbero esserci modifiche significative negli artefatti utilizzati (file di configurazione, immagini di base Docker e script di avvio). E infatti, durante la preparazione del materiale, sono state testate le versioni 2.3.0 e 2.4.5, il comportamento è stato significativamente diverso.

Aspettiamo gli aggiornamenti: recentemente è stata rilasciata una nuova versione di Spark (3.0.0), che ha apportato modifiche significative al lavoro di Spark su Kubernetes, ma ha mantenuto lo stato sperimentale di supporto per questo gestore di risorse. Forse i prossimi aggiornamenti permetteranno davvero di consigliare pienamente di abbandonare YARN e di eseguire attività Spark su Kubernetes senza timore per la sicurezza del proprio sistema e senza la necessità di modificare autonomamente i componenti funzionali.

Fine

Fonte: habr.com

Aggiungi un commento