Running Apache Spark on Kubernetes

Dear readers, good afternoon. Today we will talk a little about Apache Spark and its development prospects.


In the modern world of Big Data, Apache Spark is the de facto standard for developing batch data processing jobs. It is also used to build streaming applications that follow the micro-batch concept, processing and shipping data in small portions (Spark Structured Streaming). Traditionally, Spark has been part of the overall Hadoop stack, using YARN (or, in some cases, Apache Mesos) as the resource manager.

By 2020, running it in this traditional form is questionable for most companies due to the lack of decent Hadoop distributions: the development of HDP and CDH has stopped, CDH is insufficiently developed and expensive, and the remaining Hadoop vendors have either ceased to exist or have an uncertain future. Therefore, the community and large companies are increasingly interested in running Apache Spark on Kubernetes. Having become the standard for container orchestration and resource management in private and public clouds, Kubernetes solves the problem of inconvenient resource scheduling for Spark tasks on YARN and provides a steadily developing platform with many commercial and open-source distributions for companies of all sizes. On top of that, riding the wave of its popularity, most organizations have already acquired a couple of installations of their own and built up expertise in using it, which simplifies the move.

Starting with version 2.3.0, Apache Spark has official support for running tasks in a Kubernetes cluster, and today we will talk about the current maturity of this approach, the various ways to use it, and the pitfalls you will encounter during implementation.

First, let's look at the process of developing tasks and applications based on Apache Spark and highlight the typical cases in which you would want to run a task on a Kubernetes cluster. This post uses OpenShift as the distribution, and the commands given are for its command-line utility (oc). For other Kubernetes distributions, the corresponding commands of the standard Kubernetes command-line utility (kubectl) or their equivalents (for example, for oc adm policy) can be used.

The first use case is spark-submit

While developing tasks and applications, a developer needs to run tasks to debug data transformations. In theory, stubs can be used for this, but development against real (albeit test) instances of the end systems has proven to be faster and more reliable for this class of tasks. When debugging against real instances of end systems, two scenarios are possible:

  • the developer runs the Spark task locally in standalone mode;

  • the developer runs the Spark task on a Kubernetes cluster in a test loop.

The first option has the right to exist, but entails a number of disadvantages:

  • each developer must be given access from their workstation to all the instances of the end systems they need;
  • the developer's machine must have sufficient resources to run the task being developed.

The second option does not have these shortcomings: a Kubernetes cluster lets you allocate the necessary pool of resources for running tasks and grant it the required access to the end-system instances, while access to the cluster itself is flexibly controlled through the Kubernetes role model for all members of the development team. Let's highlight this as the first use case: running Spark tasks from a local developer machine on a Kubernetes cluster in a test loop.

Let's take a closer look at the process of setting up Spark to run locally. To start using Spark, you need to install it:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Next, we build the packages needed for working with Kubernetes:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

A full build takes a long time; in reality, only the jars from the "assembly/" directory are needed to create Docker images and run them on a Kubernetes cluster, so it is enough to build just that subproject:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Running Spark tasks on Kubernetes requires a Docker image to use as the base image. There are two possible approaches here:

  • The generated Docker image includes the Spark task executable code;
  • The created image includes only Spark and the necessary dependencies, the executable code is hosted remotely (for example, in HDFS).

First, let's build a Docker image containing a test Spark task. Spark ships with a utility for building Docker images called docker-image-tool. Let's look at its help:

./bin/docker-image-tool.sh --help

It can be used to build Docker images and upload them to remote registries, but by default it has a couple of drawbacks:

  • it always creates three Docker images at once: for Spark, PySpark, and R;
  • it does not allow you to specify an image name.

Therefore, we will use a modified version of this utility, given below:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
    -t $(image_ref $IMAGE_REF) \
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

Using it, we build a base Spark image that contains a test task computing the number Pi with Spark. Here {docker-registry-url} is the URL of your Docker image registry, {repo} is the name of the repository inside the registry (which matches the project in OpenShift), {image-name} is the name of the image (if three-level image naming is used, as, for example, in the integrated Red Hat OpenShift image registry), and {tag} is the tag of this image version:

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Log in to the OKD cluster using the console utility (here {OKD-API-URL} is the OKD cluster API URL):

oc login {OKD-API-URL}

Let's get the current user's token for authorization in the Docker Registry:

oc whoami -t

Log in to the internal Docker Registry of the OKD cluster (using the token obtained with the previous command as the password):

docker login {docker-registry-url}

Upload the built Docker image to the OKD Docker Registry:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Let's check that the built image is available in OKD. To do this, open the URL listing the images of the corresponding project in the browser: https://{OKD-WEBUI-URL}/console/project/{project}/browse/images/{image-name} (here {project} is the name of the project inside the OpenShift cluster and {OKD-WEBUI-URL} is the URL of the OpenShift web console).
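
The same check can be done from the command line; a minimal sketch, assuming the image was pushed to the integrated OpenShift registry and is therefore exposed as an ImageStream in the project:

# List the image streams of the project and make sure the pushed image is present
oc get imagestreams -n {project}

# Show the tags of the image we just pushed ({image-name} is the same placeholder as above)
oc describe imagestream {image-name} -n {project}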

To run tasks, a service account must be created with privileges to run pods as root (we will discuss this point later):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Let's run the spark-submit command to publish the Spark task to the OKD cluster, specifying the created service account and the Docker image:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

Here:

  • --name is the name of the task, which will be used to form the names of the Kubernetes pods;
  • --class is the class of the executable, invoked when the task starts;
  • --conf are Spark configuration parameters;
  • spark.executor.instances is the number of Spark executors to run;
  • spark.kubernetes.authenticate.driver.serviceAccountName is the name of the Kubernetes service account used when running pods (to define the security context and capabilities when interacting with the Kubernetes API);
  • spark.kubernetes.namespace is the Kubernetes namespace in which the driver and executor pods will run;
  • spark.submit.deployMode is the way Spark is launched (for standard spark-submit "cluster" is used, for Spark Operator and later versions of Spark "client");
  • spark.kubernetes.container.image is the Docker image used to run the pods;
  • spark.master is the Kubernetes API URL (the external one is specified so that the call is made from the local machine);
  • local:// is the path to the Spark executable inside the Docker image.

Now we go to the corresponding OKD project and examine the created pods: https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.
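
The same inspection can be done with the oc CLI. A minimal sketch is shown below; it relies on the spark-role labels that Spark on Kubernetes attaches to the pods it creates (treat the label selector as an assumption and fall back to plain pod names if your build labels pods differently):

# List the driver and executor pods created for the submitted task
oc get pods -n {project} -l spark-role=driver
oc get pods -n {project} -l spark-role=executor

# Follow the driver log to see the result of the Pi calculation
# ({driver-pod-name} is the pod name reported by the first command)
oc logs -f {driver-pod-name} -n {project}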

To simplify the development process, another option can be used: a common base Spark image is created and shared by all tasks, while builds of the executable files are published to external storage (for example, HDFS) and referenced by a link when calling spark-submit. In this case, different versions of Spark tasks can be run without rebuilding the Docker images, using, for example, WebHDFS to publish the executables. We send a request to create a file (here {host} is the host of the WebHDFS service, {port} is the port of the WebHDFS service, and {path-to-file-on-hdfs} is the desired path to the file on HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE"

This will return a response of the form (here {location} is the URL that must be used to upload the file):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Upload the Spark executable to HDFS (where {path-to-local-file} is the path to the Spark executable on the current host):

curl -i -X PUT -T {path-to-local-file} "{location}"

After that, we can run spark-submit using the Spark executable uploaded to HDFS (here {class-name} is the name of the class that needs to be run to execute the task):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Note that in order to access HDFS and make the task work, you may need to change the Dockerfile and the entrypoint.sh script: add a directive to the Dockerfile that copies the dependent libraries into the /opt/spark/jars directory, and include the HDFS configuration file in SPARK_CLASSPATH in entrypoint.sh.
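
As an illustration, a minimal sketch of such an entrypoint.sh addition is given below; the /opt/hadoop/conf path is an assumption, substitute whatever location the Hadoop client configuration is copied to in your image:

# Hypothetical fragment added to entrypoint.sh before spark-submit is invoked:
# put the directory containing core-site.xml / hdfs-site.xml on the Spark classpath
# so that hdfs:// URLs can be resolved by the driver and executors.
HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-/opt/hadoop/conf}"
if [ -d "$HADOOP_CONF_DIR" ]; then
  SPARK_CLASSPATH="$SPARK_CLASSPATH:$HADOOP_CONF_DIR"
fi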

The second use case is Apache Livy

Next, once a task has been developed, its result needs to be tested, which raises the question of launching it as part of the CI/CD process and tracking the status of its execution. Of course, you could run it with a local spark-submit call, but this complicates the CI/CD infrastructure because it requires installing and configuring Spark on the CI server agents/runners and setting up access to the Kubernetes API. For this case, the target implementation uses Apache Livy as a REST API for running Spark tasks, hosted inside the Kubernetes cluster. With it, you can run Spark tasks on a Kubernetes cluster using regular cURL requests, which is easy to implement with any CI solution, and its placement inside the Kubernetes cluster solves the issue of authentication when interacting with the Kubernetes API.

Let's highlight this as the second use case: running Spark tasks as part of a CI/CD process on a Kubernetes cluster in a test loop.

A few words about Apache Livy: it works as an HTTP server providing a web interface and a RESTful API that lets you remotely launch spark-submit by passing the necessary parameters. Traditionally it has shipped as part of the HDP distribution, but it can also be deployed to OKD or any other Kubernetes installation using the appropriate manifest and a set of Docker images, such as github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. For our case, a similar Docker image including Spark version 2.4.5 was built from the following Dockerfile:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget unzip bash && \
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && \
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && \
    rm spark-2.4.5-bin-hadoop2.7.tgz && \
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && \
    unzip apache-livy-0.7.0-incubating-bin.zip && \
    rm apache-livy-0.7.0-incubating-bin.zip && \
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && \
    mkdir /var/log/livy && \
    ln -s /var/log/livy /opt/livy/logs && \
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

The resulting image can be built and uploaded to a Docker repository you already have, such as the internal OKD repository. To deploy it, use the following manifest (here {registry-url} is the Docker image registry URL, {image-name} is the Docker image name, {tag} is the Docker image tag, and {livy-url} is the desired URL at which the Livy server will be available; the Route manifest is used if Red Hat OpenShift is the Kubernetes distribution, otherwise a corresponding Ingress or Service manifest of type NodePort is used):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

After applying it, once the pod is up and running, the Livy GUI is available at http://{livy-url}/ui. With Livy, we can publish our Spark task using a REST request, for example from Postman. An example collection of requests is presented below (configuration arguments with variables needed by the launched task can be passed in the "args" array):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Let's execute the first request from the collection, go to the OKD interface, and check that the task has been launched successfully: https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. At the same time, a session will appear in the Livy interface (http://{livy-url}/ui), within which you can monitor the progress of the task and study its logs using the Livy API or the graphical interface.
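
In a CI/CD pipeline the same request would typically be made with plain cURL rather than Postman. A minimal sketch, mirroring the first request of the collection (the batch id in the status call is illustrative; use the id returned by the submit request):

# Submit the batch to Livy
curl -s -X POST -H "Content-Type: application/json" http://{livy-url}/batches -d '{
  "file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar",
  "className": "org.apache.spark.examples.SparkPi",
  "numExecutors": 1,
  "name": "spark-test-1",
  "conf": {
    "spark.jars.ivy": "/tmp/.ivy",
    "spark.kubernetes.authenticate.driver.serviceAccountName": "spark",
    "spark.kubernetes.namespace": "{project}",
    "spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"
  }
}'

# Poll the state of the submitted batch (replace 0 with the id from the response above)
curl -s http://{livy-url}/batches/0/state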

Now let's show how Livy works. To do this, let's examine the logs of the Livy container inside the pod with the Livy server: https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name}?tab=logs (here {livy-pod-name} is the name of the created pod with the Livy server). From them you can see that calling the Livy REST API in the container named "livy" triggers a spark-submit similar to the one we used above. The collection also provides a second request that runs tasks with the Spark executable hosted remotely, using the Livy server.

The third use case is Spark Operator

Now that the task has been tested, the question arises of running it regularly. The native way to run tasks on a schedule in a Kubernetes cluster is the CronJob entity, and you can use it, but at the moment operators are a very popular way to manage applications in Kubernetes, and for Spark there is a fairly mature operator, which is also used in enterprise-grade solutions (for example, Lightbend FastData Platform). We recommend using it: the current stable version of Spark (2.4.5) has rather limited options for configuring the launch of Spark tasks in Kubernetes, while the next major version (3.0.0) promises full Kubernetes support, but its release date remains unknown. Spark Operator compensates for this by adding important configuration options (such as mounting a ConfigMap with the Hadoop access configuration into the Spark pods) and the ability to run tasks regularly on a schedule.

Let's single this out as the third use case: regularly running Spark tasks on a Kubernetes cluster in a production loop.

Spark Operator is open source and developed within Google Cloud Platform: github.com/GoogleCloudPlatform/spark-on-k8s-operator. It can be installed in three ways:

  1. As part of installing Lightbend FastData Platform/Cloudflow;
  2. Using Helm:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator

  3. Using manifests from the official repository (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Note the following: Cloudflow includes an operator with API version v1beta1. If this type of installation is used, the Spark application manifests should be based on the examples from the Git tags with the corresponding API version, for example "v1beta1-0.9.0-2.4.0". The operator version can be found in the description of the CRD included in the operator, in the "versions" dictionary:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml

If the operator is installed correctly, the corresponding project will have an active pod with the Spark operator (for example, cloudflow-fdp-sparkoperator in the Cloudflow namespace for the Cloudflow installation), and a corresponding Kubernetes resource type named "sparkapplications" will appear. The available Spark applications can be explored with the following command:

oc get sparkapplications -n {project}

To run tasks using the Spark Operator, you need to do three things:

  • create a Docker image that includes all the necessary libraries, as well as the configuration and executable files. In the target picture, this is an image built at the CI/CD stage and tested on a test cluster;
  • publish the Docker image to a registry accessible from the Kubernetes cluster;
  • generate a manifest with the type "SparkApplication" and a description of the task to be launched. Sample manifests are available in the official repository (for example, github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Important points to note about the manifest:
    1. the "apiVersion" field must contain the API version corresponding to the version of the operator;
    2. the "metadata.namespace" field must contain the namespace in which the application will be launched;
    3. the "spec.image" field must contain the address of the created Docker image in an accessible registry;
    4. the "spec.mainClass" field must contain the class of the Spark task that should be run when the process starts;
    5. the "spec.mainApplicationFile" field must contain the path to the executable jar file;
    6. the "spec.sparkVersion" field must specify the version of Spark being used;
    7. the "spec.driver.serviceAccount" field must specify the service account within the corresponding Kubernetes namespace that will be used to run the application;
    8. the "spec.executor" field must specify the amount of resources allocated to the application;
    9. the "spec.volumeMounts" field must specify the local directory in which the local Spark task files will be created.

An example of generating a manifest (here {spark-service-account} is a service account within a Kubernetes cluster for running Spark tasks):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

This manifest specifies a service account for which, before publishing the manifest, you need to create the role bindings that grant the Spark application the necessary permissions to interact with the Kubernetes API (if it needs them). In our case, the application needs the right to create pods. Let's create the necessary role binding:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

It is also worth noting that the specification of this manifest can include a "hadoopConfigMap" parameter, which lets you specify a ConfigMap with the Hadoop configuration without first having to put the corresponding file into the Docker image. The operator is also suitable for launching tasks regularly: a schedule can be set using the "schedule" parameter (a sketch is given below).
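
For reference, a hedged sketch of such a scheduled launch is shown below. It assumes that the installed operator (v1beta1 API) provides the ScheduledSparkApplication resource, which wraps a SparkApplication template together with a cron-style "schedule" field; the resource name and cron expression are purely illustrative:

# Hypothetical example: run the Pi task every night at 02:00 via the operator's
# ScheduledSparkApplication resource (check your installed CRDs before relying on it).
cat <<EOF | oc apply -n {project} -f -
apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: ScheduledSparkApplication
metadata:
  name: spark-pi-nightly
spec:
  schedule: "0 2 * * *"
  concurrencyPolicy: Allow
  template:
    type: Scala
    mode: cluster
    image: "gcr.io/spark-operator/spark:v2.4.0"
    mainClass: org.apache.spark.examples.SparkPi
    mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
    sparkVersion: "2.4.0"
    restartPolicy:
      type: Never
    driver:
      cores: 1
      memory: "512m"
      serviceAccount: {spark-service-account}
    executor:
      cores: 1
      instances: 1
      memory: "512m"
EOF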

After that, we save the SparkApplication manifest described above to the spark-pi.yaml file and apply it to our Kubernetes cluster:

oc apply -f spark-pi.yaml

This will create an object of type "sparkapplications":

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

This creates a pod with the application, and its status is reflected in the created "sparkapplications" object. It can be viewed with the following command:

oc get sparkapplications spark-pi -o yaml -n {project}

Upon completion of the task, the pod moves to the "Completed" status, which is also updated in "sparkapplications". Application logs can be viewed in the browser or with the following command (here {sparkapplications-pod-name} is the name of the task's pod):

oc logs {sparkapplications-pod-name} -n {project}
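
If only the state is of interest, it can be extracted directly; a minimal sketch, assuming the v1beta1 CRD exposes the state under status.applicationState.state (adjust the JSONPath if your operator version differs):

# Print only the application state (e.g. SUBMITTED, RUNNING, COMPLETED)
oc get sparkapplications spark-pi -n {project} -o jsonpath='{.status.applicationState.state}'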

Also, Spark tasks can be managed using the specialized sparkctl utility. To install it, we clone the repository with its source code, install Go and build this utility:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Let's examine the list of running Spark tasks:

sparkctl list -n {project}

Let's create a description for the Spark task:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Let's run the described task using sparkctl:

sparkctl create spark-app.yaml -n {project}

Let's examine the list of running Spark tasks:

sparkctl list -n {project}

Let's examine the list of events of a running Spark task:

sparkctl event spark-pi -n {project} -f

Let's examine the status of a running Spark task:

sparkctl status spark-pi -n {project}
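
The driver log of the task can also be retrieved through sparkctl; a short sketch (the log subcommand is assumed to be available in the sparkctl build produced above):

# Fetch the driver log of the Spark task
sparkctl log spark-pi -n {project}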

In conclusion, I would like to list the disadvantages we discovered while operating the current stable version of Spark (2.4.5) on Kubernetes:

  1. The first and, perhaps, main disadvantage is the lack of data locality. Despite all the shortcomings of YARN, there were advantages to using it, for example the principle of delivering code to the data (rather than data to the code). Thanks to it, Spark tasks were executed on the nodes where the data involved in the calculations was located, and the time needed to deliver data over the network was noticeably reduced. When using Kubernetes, we are faced with the need to move the data involved in a task across the network. If the data is large enough, the task execution time can increase significantly, and a fairly large amount of disk space has to be allocated to the Spark task instances for its temporary storage. This drawback can be mitigated by using specialized software that provides data locality in Kubernetes (for example, Alluxio), but that effectively means storing a complete copy of the data on the nodes of the Kubernetes cluster.
  2. The second major downside is security. By default, security-related features for running Spark tasks are disabled, the use of Kerberos is not covered in the official documentation (although the corresponding options appeared in version 3.0.0, which will require additional work), and the security documentation for Spark (https://spark.apache.org/docs/2.4.5/security.html) mentions only YARN, Mesos, and Standalone Cluster as key stores. At the same time, the user under which Spark tasks are launched cannot be specified directly: we only set the service account under which they will run, and the user is selected based on the configured security policies. As a result, either the root user is used, which is unsafe in a production environment, or a user with a random UID, which is inconvenient when distributing data access rights (this can be addressed by creating PodSecurityPolicies and binding them to the corresponding service accounts). At the moment, the solution is either to put all the necessary files directly into the Docker image, or to modify the Spark launch script to use the mechanism for storing and retrieving secrets adopted in your organization.
  3. Running Spark tasks on Kubernetes is still officially in experimental mode, and there may be significant changes in the artifacts used (configuration files, Docker base images, and launch scripts) in the future. Indeed, while preparing this material we tested versions 2.3.0 and 2.4.5, and their behavior differed significantly.

Let's wait for updates: a fresh version of Spark (3.0.0) was recently released, which brought tangible changes to running Spark on Kubernetes but kept the experimental status of support for this resource manager. Perhaps the next updates will really make it possible to fully recommend abandoning YARN and running Spark tasks on Kubernetes without fearing for the security of your system and without the need to refine functional components yourself.

End.

Source: habr.com
