Dear readers, good afternoon. Today we will talk a little about Apache Spark and its development prospects.
In the modern world of Big Data, Apache Spark is the de facto standard for developing batch data processing tasks. It is also used to create streaming applications that work in the micro-batch concept, processing and uploading data in small portions (Spark Structured Streaming). Traditionally, it has been part of the overall Hadoop stack, using YARN (or, in some cases, Apache Mesos) as the resource manager. By 2020, its use in this traditional form is in question for most companies due to the lack of decent Hadoop distributions: the development of HDP has stopped, CDH sees little development and comes at a high cost, and the remaining Hadoop vendors have either ceased to exist or face a vague future. Therefore, the community and large companies are increasingly interested in running Apache Spark on Kubernetes: having become the standard for container orchestration and resource management in private and public clouds, it solves the problem of inconvenient resource scheduling for Spark tasks on YARN and provides a steadily developing platform with many commercial and open-source distributions for companies of all sizes. Moreover, on the wave of its popularity, most companies have already acquired a couple of installations of their own and built up expertise in using it, which simplifies the move.
Starting with version 2.3.0, Apache Spark has received official support for running tasks in a Kubernetes cluster, and today we will talk about the current maturity of this approach, various options for its use, and pitfalls that will be encountered during implementation.
First of all, let's look at the process of developing tasks and applications based on Apache Spark and highlight the typical cases in which you would want to run a task on a Kubernetes cluster. This post uses OpenShift as the distribution, and the commands given are for its command-line utility (oc). For other Kubernetes distributions, the corresponding commands of the standard Kubernetes command-line utility (kubectl) or their equivalents (for example, for oc adm policy) can be used.
The first use case is spark-submit
During the development of tasks and applications, the developer needs to run tasks to debug data transformations. In theory, stubs can be used for this, but development involving real (albeit test) instances of the end systems has proven faster and better for this class of tasks. When debugging on real instances of end systems, two scenarios are possible:
the developer runs the Spark task locally in standalone mode;
a developer runs a Spark task on a Kubernetes cluster in a test loop.
The first option has the right to exist, but entails a number of disadvantages:
each developer must be given access from their workplace to all the instances of the end systems they need;
sufficient resources are required on the developer's workstation to run the task being developed.
The second option is devoid of these shortcomings, since using a Kubernetes cluster allows you to allocate the necessary pool of resources to run tasks and provide it with the necessary access to instances of end systems, flexibly providing access to it using the Kubernetes role model for all members of the development team. Let's highlight it as the first use case - running Spark tasks from a local developer machine on a Kubernetes cluster in a test loop.
Let's take a closer look at the process of setting up Spark to run locally. To start using Spark, you need to install it:
mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz
Next, we build Spark with the packages needed for working with Kubernetes:
cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package
A full build takes a long time, and in reality only the jars from the "assembly/" directory are needed to create Docker images and run them on a Kubernetes cluster, so it is enough to build only that subproject:
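A sketch of such a targeted build (the exact module layout may differ between Spark versions; here we point Maven at the assembly subproject's pom.xml and assume the dependency artifacts are resolvable):

```shell
# Build only the assembly subproject instead of the whole source tree
./build/mvn -f assembly/pom.xml -Pkubernetes -DskipTests clean package
```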
Running Spark tasks on Kubernetes requires a Docker image to use as the base image. Two approaches are possible here:
The generated Docker image includes the Spark task executable code;
The created image includes only Spark and the necessary dependencies, the executable code is hosted remotely (for example, in HDFS).
First, let's build a Docker image containing a test Spark task. Spark ships with a corresponding utility for creating Docker images called docker-image-tool. Let's look at its help:
./bin/docker-image-tool.sh --help
It can be used to create Docker images and upload them to remote registries, but by default it has a number of disadvantages:
it always creates three Docker images at once: for Spark, PySpark and R;
it does not allow you to specify an image name.
Therefore, we will use a modified version of this utility, given below:
vi bin/docker-image-tool-upd.sh
#!/usr/bin/env bash
function error {
echo "$@" 1>&2
exit 1
}
if [ -z "${SPARK_HOME}" ]; then
SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"
function image_ref {
local image="$1"
local add_repo="${2:-1}"
if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
image="$REPO/$image"
fi
if [ -n "$TAG" ]; then
image="$image:$TAG"
fi
echo "$image"
}
function build {
local BUILD_ARGS
local IMG_PATH
if [ ! -f "$SPARK_HOME/RELEASE" ]; then
IMG_PATH=$BASEDOCKERFILE
BUILD_ARGS=(
${BUILD_PARAMS}
--build-arg
img_path=$IMG_PATH
--build-arg
datagram_jars=datagram/runtimelibs
--build-arg
spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
)
else
IMG_PATH="kubernetes/dockerfiles"
BUILD_ARGS=(${BUILD_PARAMS})
fi
if [ -z "$IMG_PATH" ]; then
error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
fi
if [ -z "$IMAGE_REF" ]; then
error "Cannot find docker image reference. Please add -i arg."
fi
local BINDING_BUILD_ARGS=(
${BUILD_PARAMS}
--build-arg
base_img=$(image_ref $IMAGE_REF)
)
local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}
docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
-t $(image_ref $IMAGE_REF) \
-f "$BASEDOCKERFILE" .
}
function push {
docker push "$(image_ref $IMAGE_REF)"
}
function usage {
cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.
Commands:
build Build image. Requires a repository address to be provided if the image will be
pushed to a different registry.
push Push a pre-built image to a registry. Requires a repository address to be provided.
Options:
-f file Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
-p file Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
-R file Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
-r repo Repository address.
-i name Image name to apply to the built image, or to identify the image to be pushed.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
-n Build docker image with --no-cache
-b arg Build arg to build or push the image. For multiple build args, this option needs to
be used separately for each build arg.
Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.
Check the following documentation for more information on using the minikube Docker daemon:
https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon
Examples:
- Build image in minikube with tag "testing"
$0 -m -t testing build
- Build and push image with tag "v2.3.0" to docker.io/myrepo
$0 -r docker.io/myrepo -t v2.3.0 build
$0 -r docker.io/myrepo -t v2.3.0 push
EOF
}
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
usage
exit 0
fi
REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
case "${option}"
in
f) BASEDOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
n) NOCACHEARG="--no-cache";;
i) IMAGE_REF=${OPTARG};;
b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
esac
done
case "${@: -1}" in
build)
build
;;
push)
if [ -z "$REPO" ]; then
usage
exit 1
fi
push
;;
*)
usage
exit 1
;;
esac
With its help, we build a base Spark image containing a test task that calculates the number Pi using Spark (here {docker-registry-url} is the URL of your Docker image registry, {repo} is the name of the repository inside the registry, which matches the project in OpenShift, {image-name} is the name of the image (if three-level image naming is used, as, for example, in the integrated Red Hat OpenShift image registry), and {tag} is the tag of this version of the image):
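A sketch of the build and push calls with the modified utility (the Dockerfile path is where the Spark 2.4.5 source tree keeps the Kubernetes Dockerfile and may differ in other versions):

```shell
# Build the image from the Kubernetes Dockerfile shipped with Spark
./bin/docker-image-tool-upd.sh \
  -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile \
  -r {docker-registry-url}/{repo} \
  -i {image-name} \
  -t {tag} \
  build

# Push the built image to the registry
./bin/docker-image-tool-upd.sh \
  -r {docker-registry-url}/{repo} \
  -i {image-name} \
  -t {tag} \
  push
```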
Let's check that the built image is available in OKD. To do this, open the URL with the list of images of the corresponding project in a browser (here {project} is the name of the project inside the OpenShift cluster, {OKD-WEBUI-URL} is the URL of the OpenShift web console): https://{OKD-WEBUI-URL}/console/project/{project}/browse/images/{image-name}.
To run tasks, a service account must be created with privileges to run pods as root (we will discuss this point later):
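A sketch of such a setup with oc (the account name "spark" is an example):

```shell
# Create a service account for the Spark driver and executor pods
oc create sa spark -n {project}

# Allow pods under this service account to run under any UID, including root
oc adm policy add-scc-to-user anyuid -z spark -n {project}
```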
--name - the name of the task, which will participate in forming the names of the Kubernetes pods;
--class - the class of the executable file, called when the task starts;
--conf - Spark configuration parameters:
spark.executor.instances - the number of Spark executors to run;
spark.kubernetes.authenticate.driver.serviceAccountName - the name of the Kubernetes service account used when running pods (to define the security context and capabilities when interacting with the Kubernetes API);
spark.kubernetes.namespace - the Kubernetes namespace in which the driver and executor pods will run;
spark.submit.deployMode - the Spark launch mode ("cluster" for the standard spark-submit, "client" for Spark Operator and later versions of Spark);
spark.kubernetes.container.image - the Docker image used to run the pods;
spark.master - the Kubernetes API URL (the external one is specified so that the call is made from the local machine);
local:// - the path to the Spark executable inside the Docker image.
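The parameters above can be assembled into a spark-submit call roughly as follows (a sketch; {kubernetes-api-url} stands for the external Kubernetes API address, and the service account name "spark" and the Pi example class are assumptions based on the test image built earlier):

```shell
./bin/spark-submit \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=3 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.namespace={project} \
  --conf spark.submit.deployMode=cluster \
  --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} \
  --conf spark.master=k8s://https://{kubernetes-api-url} \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar
```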
We go to the corresponding OKD project and study the created pods - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.
To simplify the development process, another option can be used, in which a common base Spark image is created and used by all tasks, while snapshots of the executable files are published to external storage (for example, HDFS) and specified as a link when calling spark-submit. In this case, you can run different versions of Spark tasks without rebuilding Docker images, using, for example, WebHDFS to publish the files. We send a request to create a file (here {host} is the host of the WebHDFS service, {port} is the port of the WebHDFS service, {path-to-file-on-hdfs} is the desired path to the file on HDFS):
curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE"
This will return a response of the form (here {location} is the URL to be used to download the file):
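The response looks roughly like this (a sketch; the Location header carries the {location} URL pointing at a datanode):

```
HTTP/1.1 307 Temporary Redirect
Location: {location}
Content-Length: 0
```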
Upload the Spark executable to HDFS (where {path-to-local-file} is the path to the Spark executable on the current host):
curl -i -X PUT -T {path-to-local-file} "{location}"
After that, we can perform spark-submit using the Spark executable uploaded to HDFS (here {class-name} is the name of the class that needs to be run to complete the task):
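A sketch of such a call (note that the HDFS RPC port here is not the same as the WebHDFS port used above; the service account name "spark" is an example):

```shell
/opt/spark/bin/spark-submit \
  --name spark-hdfs \
  --class {class-name} \
  --conf spark.executor.instances=3 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.namespace={project} \
  --conf spark.submit.deployMode=cluster \
  --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} \
  --conf spark.master=k8s://https://{kubernetes-api-url} \
  hdfs://{host}:{hdfs-port}/{path-to-file-on-hdfs}
```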
It should be noted that in order to access HDFS and make the task work, you may need to change the Dockerfile and the entrypoint.sh script: add a directive to the Dockerfile to copy dependent libraries to the /opt/spark/jars directory, and include the HDFS configuration file in SPARK_CLASSPATH in entrypoint.sh.
The second use case is Apache Livy
Later, when a task has been developed and its result needs to be tested, the question arises of launching it as part of the CI/CD process and tracking its execution status. Of course, you can run it with a local spark-submit call, but this complicates the CI/CD infrastructure, since it requires installing and configuring Spark on the CI server agents/runners and configuring access to the Kubernetes API. For this case, we chose Apache Livy for the target implementation: a REST API for running Spark tasks, hosted inside the Kubernetes cluster. With it, you can run Spark tasks on a Kubernetes cluster using regular cURL requests, which is easy to implement on top of any CI solution, and its placement inside the Kubernetes cluster solves the issue of authentication when interacting with the Kubernetes API.
Let's highlight it as the second use case - running Spark tasks as part of a CI/CD process on a Kubernetes cluster in a test loop.
A little about Apache Livy: it works as an HTTP server providing a web interface and a RESTful API that let you remotely launch spark-submit with the necessary parameters. Traditionally it shipped as part of the HDP distribution, but it can also be deployed to OKD or any other Kubernetes installation using the appropriate manifest and a set of Docker images, such as this one: github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. For our case, a similar Docker image including Spark version 2.4.5 was built from the following Dockerfile:
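A minimal sketch of such a Dockerfile (the base image, the Livy version and the copy paths are assumptions; the Spark directory is the build produced in the steps above):

```dockerfile
# Hypothetical sketch: base image and Livy version are illustrative choices
FROM openjdk:8-jdk-slim

ENV SPARK_HOME=/opt/spark \
    LIVY_HOME=/opt/livy

# Spark 2.4.5 built with Kubernetes support (see the build steps above)
COPY spark-2.4.5/ ${SPARK_HOME}/

# Apache Livy binary distribution
ADD https://archive.apache.org/dist/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip /tmp/
RUN apt-get update && apt-get install -y --no-install-recommends unzip && \
    unzip /tmp/apache-livy-0.7.0-incubating-bin.zip -d /opt && \
    mv /opt/apache-livy-0.7.0-incubating-bin ${LIVY_HOME} && \
    rm /tmp/apache-livy-0.7.0-incubating-bin.zip

EXPOSE 8998
ENTRYPOINT ["/opt/livy/bin/livy-server"]
```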
The resulting image can be built and uploaded to a Docker repository you have, such as the internal OKD registry. To deploy it, use the following manifest (here {registry-url} is the Docker image registry URL, {image-name} is the Docker image name, {tag} is the Docker image tag, and {livy-url} is the desired URL at which the Livy server will be available; the "Route" manifest is used if Red Hat OpenShift is the Kubernetes distribution; otherwise the corresponding Ingress or a Service manifest of type NodePort is used):
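A minimal sketch of such a manifest (resource names and labels are illustrative; Livy listens on port 8998 by default):

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: livy
spec:
  replicas: 1
  selector:
    matchLabels:
      app: livy
  template:
    metadata:
      labels:
        app: livy
    spec:
      containers:
        - name: livy
          image: {registry-url}/{image-name}:{tag}
          ports:
            - containerPort: 8998
---
apiVersion: v1
kind: Service
metadata:
  name: livy
spec:
  selector:
    app: livy
  ports:
    - port: 8998
      targetPort: 8998
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  name: livy
spec:
  host: {livy-url}
  to:
    kind: Service
    name: livy
  port:
    targetPort: 8998
```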
After applying it and the pod starting successfully, the Livy GUI is available at http://{livy-url}/ui. With Livy, we can publish our Spark task using a REST request, for example from Postman. An example collection of requests is given below (configuration arguments with the variables necessary for the launched task can be passed in the "args" array):
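The same request can be sent with cURL instead of Postman (a sketch; the jar path and class name assume the Pi example image built earlier, and the "conf" keys are illustrative):

```shell
curl -s -X POST "http://{livy-url}/batches" \
  -H "Content-Type: application/json" \
  -d '{
        "file": "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar",
        "className": "org.apache.spark.examples.SparkPi",
        "numExecutors": 3,
        "conf": {
          "spark.kubernetes.namespace": "{project}"
        },
        "args": ["10"]
      }'
```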
Let's execute the first request from the collection, go to the OKD interface and check that the task has been successfully launched - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. At the same time, a session will appear in the Livy interface (http://{livy-url}/ui), within which, using the Livy API or a graphical interface, you can monitor the progress of the task and study the session logs.
Now let's show how Livy works. To do this, let's examine the logs of the Livy container inside the pod with the Livy server - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name}?tab=logs. You can see from them that when calling the Livy REST API in a container named "livy", a spark-submit is performed, similar to the one we used above (here {livy-pod-name} is the name of the created pod with the Livy server). The collection also provides a second query that allows you to run tasks with remote hosting of the Spark executable using the Livy server.
The third use case is Spark Operator
Now that the task has been tested, the question arises of running it regularly. The native way to run tasks in a Kubernetes cluster on a schedule is the CronJob entity, and you can use it, but at the moment operators are very popular for managing applications in Kubernetes, and for Spark there is a fairly mature operator that, among other things, is used in enterprise-level solutions (for example, Lightbend FastData Platform). We recommend using it: the current stable version of Spark (2.4.5) has rather limited options for configuring the launch of Spark tasks in Kubernetes, while the next major version (3.0.0) declares full support for Kubernetes, but its release date remains unknown. Spark Operator compensates for this by adding important configuration options (such as mounting a ConfigMap with the Hadoop access configuration into Spark pods) and the ability to run tasks regularly on a schedule.
Let's single it out as a third use case - regularly running Spark tasks on a Kubernetes cluster in a production loop.
The operator is installed using manifests from the official repository (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Note the following: Cloudflow includes an operator with API version v1beta1. If that installation is used, Spark application manifests should be based on examples from Git tags with the appropriate API version, for example "v1beta1-0.9.0-2.4.0". The operator's API version can be found in the description of the CRD that is part of the operator, in the "versions" dictionary:
oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
If the operator is installed correctly, then the corresponding project will have an active Pod with the Spark operator (for example, cloudflow-fdp-sparkoperator in the Cloudflow space for installing Cloudflow) and the corresponding Kubernetes resource type named "sparkapplications" will appear. You can explore the available Spark applications with the following command:
oc get sparkapplications -n {project}
To run tasks using the Spark Operator, you need to do 3 things:
create a Docker image that includes all the necessary libraries as well as the configuration and executable files. In the target picture, this is an image created at the CI/CD stage and tested on a test cluster;
publish the Docker image to a registry accessible from the Kubernetes cluster;
describe the task to be launched in a manifest of kind "SparkApplication". When composing the manifest, note the following:
the "apiVersion" dictionary must contain the API version corresponding to the version of the operator;
the "metadata.namespace" dictionary must contain the namespace in which the application will be launched;
the "spec.image" dictionary should contain the address of the created Docker image in the available registry;
the "spec.mainClass" dictionary must contain the class of the Spark task that you want to run when the process starts;
the "spec.mainApplicationFile" dictionary must contain the path to the executable jar file;
the "spec.sparkVersion" dictionary must contain the version of Spark being used;
the "spec.driver.serviceAccount" dictionary must contain the service account within the corresponding Kubernetes namespace that will be used to run the application;
the "spec.executor" dictionary should indicate the amount of resources allocated to the application;
the "spec.volumeMounts" dictionary must be set to the local directory where the local Spark task files will be created.
An example of generating a manifest (here {spark-service-account} is a service account within a Kubernetes cluster for running Spark tasks):
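A sketch of such a manifest for the Pi test task (the API version, resource sizes and volume paths are assumptions; adjust them to your operator version and cluster):

```yaml
apiVersion: sparkoperator.k8s.io/v1beta1
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: {docker-registry-url}/{repo}/{image-name}:{tag}
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar
  sparkVersion: "2.4.5"
  restartPolicy:
    type: Never
  volumes:
    - name: spark-local-dir
      hostPath:
        path: /tmp/spark-local
  driver:
    cores: 1
    memory: 512m
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: spark-local-dir
        mountPath: /tmp/spark-local
  executor:
    cores: 1
    instances: 3
    memory: 512m
    volumeMounts:
      - name: spark-local-dir
        mountPath: /tmp/spark-local
```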
This manifest specifies a service account; before publishing the manifest, you need to create the role bindings that give the Spark application the necessary permissions to interact with the Kubernetes API (if needed). In our case, the application needs the right to create pods. Let's create the necessary role binding:
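A sketch of creating the binding with oc (the "edit" role is broader than strictly required and is used here for simplicity):

```shell
# Grant the service account the right to manage resources (including pods)
# within the project
oc adm policy add-role-to-user edit \
  system:serviceaccount:{project}:{spark-service-account} \
  -n {project}
```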
It is also worth noting that the specification of this manifest can include a "hadoopConfigMap" parameter, which lets you specify a ConfigMap with the Hadoop configuration without having to first put the corresponding file into the Docker image. The manifest is also suitable for regularly scheduled launches: using the "schedule" parameter, a launch schedule for the task can be specified.
After that, we save our manifest to the spark-pi.yaml file and apply it to our Kubernetes cluster:
oc apply -f spark-pi.yaml
This will create an object of type "sparkapplications":
oc get sparkapplications -n {project}
> NAME AGE
> spark-pi 22h
This will create a pod with an application whose status will be displayed in the created "sparkapplications". It can be viewed with the following command:
oc get sparkapplications spark-pi -o yaml -n {project}
Upon completion of the task, the pod will move to the "Completed" status, which will also be updated in "sparkapplications". Application logs can be viewed in the browser or with the following command (here {sparkapplications-pod-name} is the name of the running task pod):
oc logs {sparkapplications-pod-name} -n {project}
Also, Spark tasks can be managed using the specialized sparkctl utility. To install it, we clone the repository with its source code, install Go and build this utility:
git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin
Let's examine the list of events of a running Spark task:
sparkctl event spark-pi -n {project} -f
Let's examine the status of a running Spark task:
sparkctl status spark-pi -n {project}
In conclusion, I would like to consider the discovered disadvantages of operating the current stable version of Spark (2.4.5) in Kubernetes:
The first and perhaps main disadvantage is the lack of data locality. Despite all the shortcomings of YARN, there were advantages to using it, such as the principle of delivering code to data (rather than data to code). Thanks to it, Spark tasks executed on the nodes where the data involved in the calculations was located, which noticeably reduced the time to deliver data over the network. When using Kubernetes, we are faced with the need to move the data involved in a task across the network. If the data is large enough, the task execution time can increase significantly, and a fairly large amount of disk space must be allocated to the Spark task instances for temporary storage. This drawback can be mitigated by using specialized software that provides data locality in Kubernetes (for example, Alluxio), but that effectively means storing a complete copy of the data on the nodes of the Kubernetes cluster.
The second major downside is security. By default, security-related features for running Spark tasks are disabled, the use of Kerberos is not covered in the official documentation (although the corresponding options appeared in version 3.0.0, which will require additional work), and the security documentation for Spark (https://spark.apache.org/docs/2.4.5/security.html) mentions only YARN, Mesos and Standalone Cluster. At the same time, the user under which Spark tasks are launched cannot be specified directly: we only set the service account under which they will run, and the user is selected based on the configured security policies. As a result, either the root user is used, which is unsafe in a production environment, or a user with a random UID, which is inconvenient when distributing data access rights (this can be solved by creating PodSecurityPolicies and binding them to the corresponding service accounts). At the moment, the solution is either to put all the necessary files directly into the Docker image, or to modify the Spark startup script to use the mechanism for storing and retrieving secrets adopted in your organization.
Running Spark tasks on Kubernetes is still officially experimental, and significant changes to the artifacts used (configuration files, base Docker images and startup scripts) are possible in the future. Indeed, while preparing this material, versions 2.3.0 and 2.4.5 were tested, and their behavior differed significantly.
Let's wait for updates: a fresh version of Spark (3.0.0) was recently released, bringing tangible changes to Spark's work on Kubernetes but retaining the experimental status of support for this resource manager. Perhaps the next updates will really make it possible to fully recommend abandoning YARN and running Spark tasks on Kubernetes without fear for the security of your system and without the need to refine functional components yourself.