Ausführen von Apache Spark auf Kubernetes

Liebe Leserinnen und Leser, guten Tag. Heute werden wir ein wenig über Apache Spark und seine Entwicklungsperspektiven sprechen.

Ausführen von Apache Spark auf Kubernetes

In der modernen Welt von Big Data ist Apache Spark der De-facto-Standard für die Entwicklung von Batch-Datenverarbeitungsaufgaben. Darüber hinaus werden damit auch Streaming-Anwendungen erstellt, die im Micro-Batch-Konzept arbeiten und Daten in kleinen Portionen verarbeiten und versenden (Spark Structured Streaming). Und traditionell ist es Teil des gesamten Hadoop-Stacks und verwendet YARN (oder in einigen Fällen Apache Mesos) als Ressourcenmanager. Bis zum Jahr 2020 ist die Nutzung in seiner traditionellen Form für die meisten Unternehmen aufgrund des Mangels an angemessenen Hadoop-Distributionen fraglich – die Entwicklung von HDP und CDH wurde gestoppt, CDH ist nicht gut entwickelt und verursacht hohe Kosten, und die übrigen Hadoop-Anbieter haben dies getan entweder aufgehört zu existieren oder eine düstere Zukunft haben. Daher stößt die Einführung von Apache Spark mithilfe von Kubernetes auf zunehmendes Interesse in der Community und bei großen Unternehmen. Es wird zum Standard für die Container-Orchestrierung und das Ressourcenmanagement in privaten und öffentlichen Clouds und löst das Problem der umständlichen Ressourcenplanung von Spark-Aufgaben auf YARN eine sich stetig weiterentwickelnde Plattform mit vielen kommerziellen und offenen Distributionen für Unternehmen aller Größen und Branchen. Darüber hinaus ist es den meisten im Zuge der Beliebtheit bereits gelungen, einige eigene Installationen zu erwerben und ihre Kenntnisse in der Nutzung zu erweitern, was den Umzug vereinfacht.

Ab Version 2.3.0 erhielt Apache Spark offizielle Unterstützung für die Ausführung von Aufgaben in einem Kubernetes-Cluster. Heute werden wir über den aktuellen Reifegrad dieses Ansatzes, verschiedene Optionen für seine Verwendung und Fallstricke sprechen, die bei der Implementierung auftreten werden.

Schauen wir uns zunächst den Prozess der Aufgaben- und Anwendungsentwicklung auf Basis von Apache Spark an und beleuchten typische Fälle, in denen Sie eine Aufgabe auf einem Kubernetes-Cluster ausführen müssen. Bei der Vorbereitung dieses Beitrags wird OpenShift als Distribution verwendet und es werden Befehle gegeben, die für sein Befehlszeilenprogramm (oc) relevant sind. Für andere Kubernetes-Distributionen können die entsprechenden Befehle des standardmäßigen Kubernetes-Befehlszeilendienstprogramms (kubectl) oder deren Analoga (z. B. für die OC-ADM-Richtlinie) verwendet werden.

Erster Anwendungsfall – Spark-Submit

Während der Entwicklung von Aufgaben und Anwendungen muss der Entwickler Aufgaben ausführen, um die Datentransformation zu debuggen. Theoretisch können für diese Zwecke Stubs verwendet werden, aber die Entwicklung unter Beteiligung realer (wenn auch testweiser) Instanzen von Endsystemen hat sich bei dieser Aufgabenklasse als schneller und besser erwiesen. Wenn wir auf realen Instanzen von Endsystemen debuggen, sind zwei Szenarien möglich:

  • Der Entwickler führt eine Spark-Aufgabe lokal im Standalone-Modus aus.

    Ausführen von Apache Spark auf Kubernetes

  • Ein Entwickler führt eine Spark-Aufgabe auf einem Kubernetes-Cluster in einer Testschleife aus.

    Ausführen von Apache Spark auf Kubernetes

Die erste Variante hat ihre Daseinsberechtigung, bringt jedoch eine Reihe von Nachteilen mit sich:

  • Jedem Entwickler muss vom Arbeitsplatz aus Zugriff auf alle Instanzen der von ihm benötigten Endsysteme gewährt werden;
  • Auf der Arbeitsmaschine sind ausreichend Ressourcen erforderlich, um die zu entwickelnde Aufgabe auszuführen.

Die zweite Option weist diese Nachteile nicht auf, da Sie durch die Verwendung eines Kubernetes-Clusters den erforderlichen Ressourcenpool für die Ausführung von Aufgaben zuweisen und ihm den erforderlichen Zugriff auf Endsysteminstanzen gewähren können, indem Sie mithilfe des Kubernetes-Rollenmodells flexibel Zugriff darauf gewähren alle Mitglieder des Entwicklungsteams. Lassen Sie uns es als ersten Anwendungsfall hervorheben – das Starten von Spark-Aufgaben von einem lokalen Entwicklercomputer auf einem Kubernetes-Cluster in einer Testschleife.

Lassen Sie uns mehr über den Prozess der Einrichtung von Spark für die lokale Ausführung sprechen. Um Spark verwenden zu können, müssen Sie es installieren:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Wir sammeln die notwendigen Pakete für die Arbeit mit Kubernetes:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

Ein vollständiger Build nimmt viel Zeit in Anspruch, und um Docker-Images zu erstellen und auf einem Kubernetes-Cluster auszuführen, benötigen Sie eigentlich nur JAR-Dateien aus dem Verzeichnis „assembly/“, sodass Sie nur dieses Unterprojekt erstellen können:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Um Spark-Jobs auf Kubernetes auszuführen, müssen Sie ein Docker-Image erstellen, das Sie als Basis-Image verwenden können. Hier gibt es 2 mögliche Vorgehensweisen:

  • Das generierte Docker-Image enthält den ausführbaren Spark-Task-Code;
  • Das erstellte Image enthält nur Spark und die notwendigen Abhängigkeiten, der ausführbare Code wird remote gehostet (z. B. in HDFS).

Lassen Sie uns zunächst ein Docker-Image erstellen, das ein Testbeispiel einer Spark-Aufgabe enthält. Zum Erstellen von Docker-Images verfügt Spark über ein Dienstprogramm namens „docker-image-tool“. Schauen wir uns die Hilfe dazu an:

./bin/docker-image-tool.sh --help

Mit seiner Hilfe können Sie Docker-Images erstellen und in Remote-Registrierungen hochladen, aber standardmäßig hat es eine Reihe von Nachteilen:

  • erstellt unbedingt 3 Docker-Images gleichzeitig – für Spark, PySpark und R;
  • erlaubt Ihnen nicht, einen Bildnamen anzugeben.

Daher verwenden wir eine modifizierte Version dieses unten aufgeführten Dienstprogramms:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

Mit seiner Hilfe stellen wir ein einfaches Spark-Image zusammen, das eine Testaufgabe zur Berechnung von Pi mit Spark enthält (hier ist {docker-registry-url} die URL Ihrer Docker-Image-Registrierung, {repo} der Name des Repositorys innerhalb der Registrierung). was mit dem Projekt in OpenShift übereinstimmt, {image-name} – Name des Bildes (wenn eine dreistufige Trennung von Bildern verwendet wird, wie zum Beispiel in der integrierten Registrierung von Red Hat OpenShift-Bildern), {tag} – Tag davon Version des Bildes):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Melden Sie sich mit dem Konsolendienstprogramm beim OKD-Cluster an (hier ist {OKD-API-URL} die OKD-Cluster-API-URL):

oc login {OKD-API-URL}

Lassen Sie uns das Token des aktuellen Benutzers zur Autorisierung in der Docker-Registrierung abrufen:

oc whoami -t

Melden Sie sich bei der internen Docker-Registrierung des OKD-Clusters an (wir verwenden das mit dem vorherigen Befehl erhaltene Token als Passwort):

docker login {docker-registry-url}

Laden wir das zusammengestellte Docker-Image in die Docker-Registrierung OKD hoch:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Überprüfen wir, ob das zusammengestellte Bild in OKD verfügbar ist. Öffnen Sie dazu im Browser die URL mit einer Liste von Bildern des entsprechenden Projekts (hier ist {project} der Name des Projekts innerhalb des OpenShift-Clusters, {OKD-WEBUI-URL} ist die URL der OpenShift-Webkonsole ) – https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{image-name}.

Um Aufgaben auszuführen, muss ein Dienstkonto mit den Rechten erstellt werden, Pods als Root auszuführen (wir werden diesen Punkt später besprechen):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Führen wir den Befehl spark-submit aus, um eine Spark-Aufgabe im OKD-Cluster zu veröffentlichen, und geben dabei das erstellte Dienstkonto und das Docker-Image an:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

Hier:

—name — der Name der Aufgabe, die an der Bildung des Namens der Kubernetes-Pods beteiligt ist;

—class — Klasse der ausführbaren Datei, die beim Start der Aufgabe aufgerufen wird;

–conf – Spark-Konfigurationsparameter;

spark.executor.instances – die Anzahl der zu startenden Spark-Executoren;

spark.kubernetes.authenticate.driver.serviceAccountName – der Name des Kubernetes-Dienstkontos, das beim Starten von Pods verwendet wird (um den Sicherheitskontext und die Funktionen bei der Interaktion mit der Kubernetes-API zu definieren);

spark.kubernetes.namespace – Kubernetes-Namespace, in dem Treiber- und Executor-Pods gestartet werden;

spark.submit.deployMode – Methode zum Starten von Spark (für Standard-Spark-Submit wird „Cluster“ verwendet, für Spark-Operator und spätere Versionen von Spark „Client“);

spark.kubernetes.container.image – Docker-Image, das zum Starten von Pods verwendet wird;

spark.master – Kubernetes-API-URL (extern wird angegeben, sodass der Zugriff vom lokalen Computer aus erfolgt);

local:// ist der Pfad zur ausführbaren Spark-Datei im Docker-Image.

Wir gehen zum entsprechenden OKD-Projekt und studieren die erstellten Pods – https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

Um den Entwicklungsprozess zu vereinfachen, kann eine andere Option verwendet werden, bei der ein gemeinsames Basis-Image von Spark erstellt wird, das von allen Aufgaben zur Ausführung verwendet wird, und Snapshots ausführbarer Dateien auf einem externen Speicher (z. B. Hadoop) veröffentlicht und beim Aufruf angegeben werden spark-submit als Link. In diesem Fall können Sie verschiedene Versionen von Spark-Aufgaben ausführen, ohne Docker-Images neu zu erstellen, indem Sie beispielsweise WebHDFS zum Veröffentlichen von Bildern verwenden. Wir senden eine Anfrage zum Erstellen einer Datei (hier ist {host} der Host des WebHDFS-Dienstes, {port} ist der Port des WebHDFS-Dienstes, {path-to-file-on-hdfs} ist der gewünschte Pfad zur Datei auf HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

Sie erhalten eine Antwort wie diese (hier ist {location} die URL, die zum Herunterladen der Datei verwendet werden muss):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Laden Sie die ausführbare Spark-Datei in HDFS (hier ist {path-to-local-file} der Pfad zur ausführbaren Spark-Datei auf dem aktuellen Host):

curl -i -X PUT -T {path-to-local-file} "{location}"

Danach können wir Spark-Submit mit der auf HDFS hochgeladenen Spark-Datei durchführen (hier ist {class-name} der Name der Klasse, die gestartet werden muss, um die Aufgabe abzuschließen):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Es ist zu beachten, dass Sie möglicherweise die Docker-Datei und das Skript „entrypoint.sh“ ändern müssen, um auf HDFS zuzugreifen und sicherzustellen, dass die Aufgabe funktioniert. Fügen Sie der Docker-Datei eine Anweisung hinzu, um abhängige Bibliotheken in das Verzeichnis /opt/spark/jars zu kopieren Fügen Sie die HDFS-Konfigurationsdatei in SPARK_CLASSPATH im Einstiegspunkt ein. sh.

Zweiter Anwendungsfall – Apache Livy

Wenn eine Aufgabe entwickelt wird und das Ergebnis getestet werden muss, stellt sich außerdem die Frage, ob sie als Teil des CI/CD-Prozesses gestartet und der Status ihrer Ausführung verfolgt werden soll. Natürlich können Sie es mit einem lokalen Spark-Submit-Aufruf ausführen, aber das verkompliziert die CI/CD-Infrastruktur, da Spark auf den CI-Server-Agenten/Runnern installiert und konfiguriert und der Zugriff auf die Kubernetes-API eingerichtet werden muss. Für diesen Fall hat sich die Zielimplementierung dafür entschieden, Apache Livy als REST-API zum Ausführen von Spark-Aufgaben zu verwenden, die in einem Kubernetes-Cluster gehostet werden. Mit seiner Hilfe können Sie Spark-Aufgaben auf einem Kubernetes-Cluster mit regulären cURL-Anfragen ausführen, was auf Basis jeder CI-Lösung leicht implementiert werden kann, und seine Platzierung innerhalb des Kubernetes-Clusters löst das Problem der Authentifizierung bei der Interaktion mit der Kubernetes-API.

Ausführen von Apache Spark auf Kubernetes

Lassen Sie uns es als zweiten Anwendungsfall hervorheben – das Ausführen von Spark-Aufgaben als Teil eines CI/CD-Prozesses auf einem Kubernetes-Cluster in einer Testschleife.

Ein wenig über Apache Livy – es funktioniert als HTTP-Server, der eine Webschnittstelle und eine RESTful-API bereitstellt, die es Ihnen ermöglicht, Spark-Submit aus der Ferne zu starten, indem Sie die erforderlichen Parameter übergeben. Traditionell wurde es als Teil einer HDP-Distribution ausgeliefert, kann aber auch auf OKD oder jeder anderen Kubernetes-Installation bereitgestellt werden, indem das entsprechende Manifest und eine Reihe von Docker-Images verwendet werden, wie zum Beispiel dieses – github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Für unseren Fall wurde ein ähnliches Docker-Image erstellt, einschließlich der Spark-Version 2.4.5 aus der folgenden Docker-Datei:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

Das generierte Image kann erstellt und in Ihr vorhandenes Docker-Repository hochgeladen werden, beispielsweise das interne OKD-Repository. Um es bereitzustellen, verwenden Sie das folgende Manifest ({registry-url} – URL der Docker-Image-Registrierung, {image-name} – Docker-Image-Name, {tag} – Docker-Image-Tag, {livy-url} – gewünschte URL, wo die Der Server ist auf Livy zugreifbar; das „Route“-Manifest wird verwendet, wenn Red Hat OpenShift als Kubernetes-Distribution verwendet wird, andernfalls wird das entsprechende Ingress- oder Service-Manifest vom Typ NodePort verwendet):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

Nach der Anwendung und dem erfolgreichen Start des Pods ist die grafische Benutzeroberfläche von Livy unter dem Link http://{livy-url}/ui verfügbar. Mit Livy können wir unsere Spark-Aufgabe mithilfe einer REST-Anfrage beispielsweise von Postman veröffentlichen. Im Folgenden wird ein Beispiel für eine Sammlung mit Anforderungen dargestellt (im Array „args“ können Konfigurationsargumente mit Variablen übergeben werden, die für den Betrieb der gestarteten Aufgabe erforderlich sind):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Führen wir die erste Anfrage aus der Sammlung aus, gehen zur OKD-Schnittstelle und prüfen, ob die Aufgabe erfolgreich gestartet wurde – https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. Gleichzeitig wird in der Livy-Benutzeroberfläche (http://{livy-url}/ui) eine Sitzung angezeigt, in der Sie mithilfe der Livy-API oder der grafischen Benutzeroberfläche den Fortschritt der Aufgabe verfolgen und die Sitzung studieren können Protokolle.

Lassen Sie uns nun zeigen, wie Livy funktioniert. Dazu untersuchen wir die Protokolle des Livy-Containers im Pod mit dem Livy-Server – https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name }?tab=logs. Daraus können wir ersehen, dass beim Aufruf der Livy-REST-API in einem Container namens „livy“ ein Spark-Submit ausgeführt wird, ähnlich dem, das wir oben verwendet haben (hier ist {livy-pod-name} der Name des erstellten Pods). mit dem Livy-Server). Die Sammlung führt außerdem eine zweite Abfrage ein, mit der Sie Aufgaben ausführen können, die eine ausführbare Spark-Datei mithilfe eines Livy-Servers remote hosten.

Dritter Anwendungsfall – Spark Operator

Nachdem die Aufgabe nun getestet wurde, stellt sich die Frage, ob sie regelmäßig ausgeführt werden soll. Die native Möglichkeit, Aufgaben in einem Kubernetes-Cluster regelmäßig auszuführen, ist die CronJob-Entität und Sie können sie verwenden, aber derzeit ist die Verwendung von Operatoren zum Verwalten von Anwendungen in Kubernetes sehr beliebt und für Spark gibt es einen ziemlich ausgereiften Operator, was auch der Fall ist Wird in Lösungen auf Unternehmensebene verwendet (z. B. Lightbend FastData Platform). Wir empfehlen die Verwendung – die aktuelle stabile Version von Spark (2.4.5) bietet eher begrenzte Konfigurationsmöglichkeiten für die Ausführung von Spark-Aufgaben in Kubernetes, während die nächste Hauptversion (3.0.0) volle Unterstützung für Kubernetes ankündigt, ihr Veröffentlichungsdatum jedoch unbekannt bleibt . Spark Operator gleicht dieses Manko aus, indem es wichtige Konfigurationsoptionen hinzufügt (z. B. das Mounten einer ConfigMap mit Hadoop-Zugriffskonfiguration auf Spark-Pods) und die Möglichkeit, eine regelmäßig geplante Aufgabe auszuführen.

Ausführen von Apache Spark auf Kubernetes
Lassen Sie uns es als dritten Anwendungsfall hervorheben – das regelmäßige Ausführen von Spark-Aufgaben auf einem Kubernetes-Cluster in einer Produktionsschleife.

Spark Operator ist Open Source und wird innerhalb der Google Cloud Platform entwickelt – github.com/GoogleCloudPlatform/spark-on-k8s-operator. Die Installation kann auf drei Arten erfolgen:

  1. Als Teil der Lightbend FastData Platform/Cloudflow-Installation;
  2. Helm verwenden:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Verwendung von Manifesten aus dem offiziellen Repository (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Es ist Folgendes zu beachten: Cloudflow enthält einen Operator mit der API-Version v1beta1. Wenn dieser Installationstyp verwendet wird, sollten die Beschreibungen der Spark-Anwendungsmanifeste auf Beispiel-Tags in Git mit der entsprechenden API-Version basieren, zum Beispiel „v1beta1-0.9.0-2.4.0“. Die Version des Operators finden Sie in der Beschreibung des im Operator enthaltenen CRD im Wörterbuch „versions“:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Wenn der Operator korrekt installiert ist, erscheint im entsprechenden Projekt ein aktiver Pod mit dem Spark-Operator (z. B. cloudflow-fdp-sparkoperator im Cloudflow-Bereich für die Cloudflow-Installation) und ein entsprechender Kubernetes-Ressourcentyp mit dem Namen „sparkapplications“. . Sie können verfügbare Spark-Anwendungen mit dem folgenden Befehl erkunden:

oc get sparkapplications -n {project}

Um Aufgaben mit Spark Operator auszuführen, müssen Sie drei Dinge tun:

  • Erstellen Sie ein Docker-Image, das alle erforderlichen Bibliotheken sowie Konfigurations- und ausführbare Dateien enthält. Beim Zielbild handelt es sich um ein Image, das im CI/CD-Stadium erstellt und auf einem Testcluster getestet wurde;
  • Veröffentlichen Sie ein Docker-Image in einer Registrierung, auf die vom Kubernetes-Cluster aus zugegriffen werden kann.
  • Generieren Sie ein Manifest mit dem Typ „SparkApplication“ und einer Beschreibung der auszuführenden Aufgabe. Beispielmanifeste sind im offiziellen Repository verfügbar (z. B. github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Zum Manifest sind wichtige Punkte zu beachten:
    1. Das Wörterbuch „apiVersion“ muss die API-Version angeben, die der Operatorversion entspricht.
    2. Das Wörterbuch „metadata.namespace“ muss den Namespace angeben, in dem die Anwendung gestartet wird.
    3. Das Wörterbuch „spec.image“ muss die Adresse des erstellten Docker-Images in einer zugänglichen Registrierung enthalten.
    4. Das Wörterbuch „spec.mainClass“ muss die Spark-Aufgabenklasse enthalten, die ausgeführt werden muss, wenn der Prozess startet.
    5. der Pfad zur ausführbaren JAR-Datei muss im Wörterbuch „spec.mainApplicationFile“ angegeben werden;
    6. Das Wörterbuch „spec.sparkVersion“ muss die verwendete Version von Spark angeben.
    7. Das Wörterbuch „spec.driver.serviceAccount“ muss das Dienstkonto im entsprechenden Kubernetes-Namespace angeben, das zum Ausführen der Anwendung verwendet wird.
    8. Das Wörterbuch „spec.executor“ muss die Anzahl der der Anwendung zugewiesenen Ressourcen angeben;
    9. Das Wörterbuch „spec.volumeMounts“ muss das lokale Verzeichnis angeben, in dem die lokalen Spark-Aufgabendateien erstellt werden.

Ein Beispiel für die Generierung eines Manifests (hier ist {spark-service-account} ein Dienstkonto innerhalb des Kubernetes-Clusters zum Ausführen von Spark-Aufgaben):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Dieses Manifest gibt ein Dienstkonto an, für das Sie vor der Veröffentlichung des Manifests die erforderlichen Rollenbindungen erstellen müssen, die die erforderlichen Zugriffsrechte für die Spark-Anwendung bereitstellen, um mit der Kubernetes-API zu interagieren (falls erforderlich). In unserem Fall benötigt die Anwendung Rechte zum Erstellen von Pods. Erstellen wir die erforderliche Rollenbindung:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Es ist auch erwähnenswert, dass diese Manifestspezifikation möglicherweise einen „hadoopConfigMap“-Parameter enthält, der es Ihnen ermöglicht, eine ConfigMap mit der Hadoop-Konfiguration anzugeben, ohne zuerst die entsprechende Datei im Docker-Image platzieren zu müssen. Es eignet sich auch für die regelmäßige Ausführung von Aufgaben – mit dem Parameter „schedule“ kann ein Zeitplan für die Ausführung einer bestimmten Aufgabe festgelegt werden.

Danach speichern wir unser Manifest in der Datei spark-pi.yaml und wenden es auf unseren Kubernetes-Cluster an:

oc apply -f spark-pi.yaml

Dadurch wird ein Objekt vom Typ „sparkapplications“ erstellt:

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

In diesem Fall wird ein Pod mit einer Anwendung erstellt, deren Status in den erstellten „sparkapplications“ angezeigt wird. Sie können es mit dem folgenden Befehl anzeigen:

oc get sparkapplications spark-pi -o yaml -n {project}

Nach Abschluss der Aufgabe wechselt der POD in den Status „Abgeschlossen“, der auch in „sparkapplications“ aktualisiert wird. Anwendungsprotokolle können im Browser oder mit dem folgenden Befehl angezeigt werden (hier ist {sparkapplications-pod-name} der Name des Pods der laufenden Aufgabe):

oc logs {sparkapplications-pod-name} -n {project}

Spark-Aufgaben können auch mit dem speziellen Dienstprogramm sparkctl verwaltet werden. Um es zu installieren, klonen Sie das Repository mit seinem Quellcode, installieren Sie Go und erstellen Sie dieses Dienstprogramm:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Sehen wir uns die Liste der ausgeführten Spark-Aufgaben an:

sparkctl list -n {project}

Erstellen wir eine Beschreibung für die Spark-Aufgabe:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Lassen Sie uns die beschriebene Aufgabe mit sparkctl ausführen:

sparkctl create spark-app.yaml -n {project}

Sehen wir uns die Liste der ausgeführten Spark-Aufgaben an:

sparkctl list -n {project}

Sehen wir uns die Liste der Ereignisse einer gestarteten Spark-Aufgabe an:

sparkctl event spark-pi -n {project} -f

Lassen Sie uns den Status der laufenden Spark-Aufgabe untersuchen:

sparkctl status spark-pi -n {project}

Abschließend möchte ich die entdeckten Nachteile der Verwendung der aktuellen stabilen Version von Spark (2.4.5) in Kubernetes betrachten:

  1. Der erste und vielleicht größte Nachteil ist die fehlende Datenlokalität. Trotz aller Mängel von YARN gab es auch Vorteile bei der Verwendung, beispielsweise das Prinzip der Bereitstellung von Code zu Daten (und nicht von Daten zu Code). Dadurch wurden Spark-Aufgaben auf den Knoten ausgeführt, auf denen sich die an den Berechnungen beteiligten Daten befanden, und die Zeit, die für die Übermittlung der Daten über das Netzwerk benötigt wurde, wurde erheblich verkürzt. Bei der Verwendung von Kubernetes stehen wir vor der Notwendigkeit, die an einer Aufgabe beteiligten Daten über das Netzwerk zu verschieben. Wenn sie groß genug sind, kann sich die Ausführungszeit der Aufgabe erheblich verlängern und es wird außerdem eine relativ große Menge an Speicherplatz benötigt, der den Spark-Aufgabeninstanzen für deren temporäre Speicherung zugewiesen wird. Dieser Nachteil kann durch den Einsatz spezieller Software gemildert werden, die die Datenlokalität in Kubernetes gewährleistet (z. B. Alluxio). Dies bedeutet jedoch tatsächlich, dass eine vollständige Kopie der Daten auf den Knoten des Kubernetes-Clusters gespeichert werden muss.
  2. Der zweite wichtige Nachteil ist die Sicherheit. Standardmäßig sind sicherheitsrelevante Funktionen bezüglich der Ausführung von Spark-Aufgaben deaktiviert, die Verwendung von Kerberos wird in der offiziellen Dokumentation nicht behandelt (obwohl die entsprechenden Optionen in Version 3.0.0 eingeführt wurden, was zusätzliche Arbeit erfordert) und die Sicherheitsdokumentation für Bei Verwendung von Spark (https://spark.apache.org/docs/2.4.5/security.html) werden nur YARN, Mesos und Standalone Cluster als Schlüsselspeicher angezeigt. Gleichzeitig kann der Benutzer, unter dem Spark-Aufgaben gestartet werden, nicht direkt angegeben werden – wir geben nur das Dienstkonto an, unter dem es funktionieren soll, und der Benutzer wird basierend auf den konfigurierten Sicherheitsrichtlinien ausgewählt. In diesem Zusammenhang wird entweder der Root-Benutzer verwendet, was in einer produktiven Umgebung nicht sicher ist, oder ein Benutzer mit einer zufälligen UID, was bei der Verteilung von Zugriffsrechten auf Daten unpraktisch ist (dies kann durch die Erstellung von PodSecurityPolicies und deren Verknüpfung mit dem gelöst werden). entsprechende Dienstkonten). Derzeit besteht die Lösung darin, entweder alle erforderlichen Dateien direkt im Docker-Image zu platzieren oder das Spark-Startskript so zu ändern, dass es den in Ihrer Organisation übernommenen Mechanismus zum Speichern und Abrufen von Geheimnissen verwendet.
  3. Das Ausführen von Spark-Jobs mit Kubernetes befindet sich offiziell noch im experimentellen Modus und es kann in Zukunft zu erheblichen Änderungen an den verwendeten Artefakten (Konfigurationsdateien, Docker-Basisimages und Startskripts) kommen. Und tatsächlich wurden bei der Vorbereitung des Materials die Versionen 2.3.0 und 2.4.5 getestet, das Verhalten war deutlich unterschiedlich.

Warten wir auf Updates – kürzlich wurde eine neue Version von Spark (3.0.0) veröffentlicht, die erhebliche Änderungen an der Arbeit von Spark auf Kubernetes mit sich brachte, aber den experimentellen Status der Unterstützung für diesen Ressourcenmanager beibehielt. Vielleicht können die nächsten Updates es wirklich ermöglichen, den Verzicht auf YARN und die Ausführung von Spark-Aufgaben auf Kubernetes uneingeschränkt zu empfehlen, ohne Angst um die Sicherheit Ihres Systems zu haben und ohne dass Funktionskomponenten unabhängig geändert werden müssen.

Ende

Source: habr.com

Kommentar hinzufügen