Ejecutando Apache Spark en Kubernetes

Queridos lectores, buenas tardes. Hoy hablaremos un poco sobre Apache Spark y sus perspectivas de desarrollo.

Ejecutando Apache Spark en Kubernetes

En el mundo moderno de Big Data, Apache Spark es el estándar de facto para desarrollar tareas de procesamiento de datos por lotes. Además, también se utiliza para crear aplicaciones de streaming que funcionan en el concepto de micro lotes, procesando y enviando datos en pequeñas porciones (Spark Structured Streaming). Y tradicionalmente ha sido parte de la pila general de Hadoop, utilizando YARN (o en algunos casos Apache Mesos) como administrador de recursos. Para 2020, su uso en su forma tradicional está en duda para la mayoría de las empresas debido a la falta de distribuciones decentes de Hadoop: el desarrollo de HDP y CDH se ha detenido, CDH no está bien desarrollado y tiene un alto costo, y los proveedores restantes de Hadoop han O dejó de existir o tiene un futuro oscuro. Por lo tanto, el lanzamiento de Apache Spark utilizando Kubernetes es de creciente interés entre la comunidad y las grandes empresas: al convertirse en un estándar en la orquestación de contenedores y la gestión de recursos en nubes públicas y privadas, resuelve el problema de la programación inconveniente de recursos de las tareas de Spark en YARN y proporciona una plataforma en constante desarrollo con muchas distribuciones comerciales y abiertas para empresas de todos los tamaños y tipos. Además, a raíz de su popularidad, la mayoría ya ha conseguido adquirir un par de instalaciones propias y ha aumentado su experiencia en su uso, lo que simplifica la mudanza.

A partir de la versión 2.3.0, Apache Spark adquirió soporte oficial para ejecutar tareas en un clúster de Kubernetes y hoy hablaremos sobre la madurez actual de este enfoque, varias opciones para su uso y los obstáculos que se encontrarán durante la implementación.

En primer lugar, veamos el proceso de desarrollo de tareas y aplicaciones basadas en Apache Spark y resaltemos los casos típicos en los que necesita ejecutar una tarea en un clúster de Kubernetes. Al preparar esta publicación, se utiliza OpenShift como distribución y se proporcionarán comandos relevantes para su utilidad de línea de comandos (oc). Para otras distribuciones de Kubernetes, se pueden utilizar los comandos correspondientes de la utilidad de línea de comandos estándar de Kubernetes (kubectl) o sus análogos (por ejemplo, para la política oc adm).

Primer caso de uso: envío de chispa

Durante el desarrollo de tareas y aplicaciones, el desarrollador necesita ejecutar tareas para depurar la transformación de datos. Teóricamente, los stubs se pueden utilizar para estos fines, pero el desarrollo con la participación de instancias reales (aunque de prueba) de sistemas finales ha demostrado ser más rápido y mejor en esta clase de tareas. En el caso de que depuremos en instancias reales de sistemas finales, son posibles dos escenarios:

  • el desarrollador ejecuta una tarea Spark localmente en modo independiente;

    Ejecutando Apache Spark en Kubernetes

  • un desarrollador ejecuta una tarea Spark en un clúster de Kubernetes en un bucle de prueba.

    Ejecutando Apache Spark en Kubernetes

La primera opción tiene derecho a existir, pero conlleva una serie de desventajas:

  • Cada desarrollador debe tener acceso desde el lugar de trabajo a todas las instancias de los sistemas finales que necesita;
  • Se requiere una cantidad suficiente de recursos en la máquina de trabajo para ejecutar la tarea que se está desarrollando.

La segunda opción no tiene estas desventajas, ya que el uso de un clúster de Kubernetes le permite asignar el grupo de recursos necesario para ejecutar tareas y proporcionarle el acceso necesario a las instancias finales del sistema, brindándole acceso de manera flexible utilizando el modelo de rol de Kubernetes para todos los miembros del equipo de desarrollo. Destaquémoslo como el primer caso de uso: iniciar tareas de Spark desde una máquina de desarrollador local en un clúster de Kubernetes en un circuito de prueba.

Hablemos más sobre el proceso de configuración de Spark para que se ejecute localmente. Para comenzar a usar Spark necesitas instalarlo:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Recopilamos los paquetes necesarios para trabajar con Kubernetes:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

Una compilación completa lleva mucho tiempo y, para crear imágenes de Docker y ejecutarlas en un clúster de Kubernetes, en realidad solo necesitas archivos jar del directorio “assembly/”, por lo que solo puedes compilar este subproyecto:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Para ejecutar trabajos de Spark en Kubernetes, debe crear una imagen de Docker para usarla como imagen base. Hay 2 enfoques posibles aquí:

  • La imagen de Docker generada incluye el código de tarea ejecutable de Spark;
  • La imagen creada incluye solo Spark y las dependencias necesarias, el código ejecutable se aloja de forma remota (por ejemplo, en HDFS).

Primero, creemos una imagen de Docker que contenga un ejemplo de prueba de una tarea de Spark. Para crear imágenes de Docker, Spark tiene una utilidad llamada "docker-image-tool". Estudiemos la ayuda al respecto:

./bin/docker-image-tool.sh --help

Con él, puede crear imágenes de Docker y cargarlas en registros remotos, pero de forma predeterminada tiene una serie de desventajas:

  • sin falta crea 3 imágenes de Docker a la vez: para Spark, PySpark y R;
  • no le permite especificar un nombre de imagen.

Por lo tanto, utilizaremos una versión modificada de esta utilidad que se proporciona a continuación:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

Con su ayuda, ensamblamos una imagen básica de Spark que contiene una tarea de prueba para calcular Pi usando Spark (aquí {docker-registry-url} es la URL de su registro de imágenes de Docker, {repo} es el nombre del repositorio dentro del registro, que coincide con el proyecto en OpenShift, {image-name} - nombre de la imagen (si se utiliza una separación de imágenes de tres niveles, por ejemplo, como en el registro integrado de imágenes de Red Hat OpenShift), {tag} - etiqueta de este versión de la imagen):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Inicie sesión en el clúster OKD mediante la utilidad de la consola (aquí {OKD-API-URL} es la URL de la API del clúster OKD):

oc login {OKD-API-URL}

Obtengamos el token del usuario actual para autorización en el Registro Docker:

oc whoami -t

Inicie sesión en el Registro Docker interno del clúster OKD (usamos el token obtenido usando el comando anterior como contraseña):

docker login {docker-registry-url}

Carguemos la imagen de Docker ensamblada en Docker Registry OKD:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Comprobemos que la imagen ensamblada esté disponible en OKD. Para hacer esto, abra la URL en el navegador con una lista de imágenes del proyecto correspondiente (aquí {proyecto} es el nombre del proyecto dentro del clúster de OpenShift, {OKD-WEBUI-URL} es la URL de la consola web de OpenShift ) - https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{image-name}.

Para ejecutar tareas, se debe crear una cuenta de servicio con privilegios para ejecutar pods como root (analizaremos este punto más adelante):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Ejecutemos el comando spark-submit para publicar una tarea de Spark en el clúster OKD, especificando la cuenta de servicio creada y la imagen de Docker:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

Aquí:

—nombre: el nombre de la tarea que participará en la formación del nombre de los pods de Kubernetes;

—class — clase del archivo ejecutable, llamada cuando se inicia la tarea;

—conf — Parámetros de configuración de Spark;

spark.executor.instances: la cantidad de ejecutores de Spark que se lanzarán;

spark.kubernetes.authenticate.driver.serviceAccountName: el nombre de la cuenta de servicio de Kubernetes utilizada al iniciar pods (para definir el contexto de seguridad y las capacidades al interactuar con la API de Kubernetes);

spark.kubernetes.namespace: espacio de nombres de Kubernetes en el que se lanzarán los pods de controladores y ejecutores;

spark.submit.deployMode: método para iniciar Spark (para el envío estándar de Spark se utiliza el “clúster”, para Spark Operador y versiones posteriores de Spark el “cliente”);

spark.kubernetes.container.image: imagen de Docker utilizada para iniciar pods;

spark.master: URL de la API de Kubernetes (se especifica externa para que el acceso se produzca desde la máquina local);

local:// es la ruta al ejecutable de Spark dentro de la imagen de Docker.

Vamos al proyecto OKD correspondiente y estudiamos los pods creados: https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

Para simplificar el proceso de desarrollo, se puede utilizar otra opción, en la que se crea una imagen base común de Spark, utilizada por todas las tareas a ejecutar, y las instantáneas de los archivos ejecutables se publican en un almacenamiento externo (por ejemplo, Hadoop) y se especifican al llamar. Spark-envío como enlace. En este caso, puede ejecutar diferentes versiones de tareas de Spark sin reconstruir imágenes de Docker, utilizando, por ejemplo, WebHDFS para publicar imágenes. Enviamos una solicitud para crear un archivo (aquí {host} es el host del servicio WebHDFS, {port} es el puerto del servicio WebHDFS, {path-to-file-on-hdfs} es la ruta deseada al archivo en HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

Recibirá una respuesta como esta (aquí {ubicación} está la URL que debe usarse para descargar el archivo):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Cargue el archivo ejecutable de Spark en HDFS (aquí {ruta al archivo local} es la ruta al archivo ejecutable de Spark en el host actual):

curl -i -X PUT -T {path-to-local-file} "{location}"

Después de esto, podemos realizar el envío Spark utilizando el archivo Spark cargado en HDFS (aquí {nombre-clase} es el nombre de la clase que debe iniciarse para completar la tarea):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Cabe señalar que para acceder a HDFS y garantizar que la tarea funcione, es posible que deba cambiar el Dockerfile y el script Entrypoint.sh: agregue una directiva al Dockerfile para copiar las bibliotecas dependientes al directorio /opt/spark/jars y incluya el archivo de configuración HDFS en SPARK_CLASSPATH en el punto de entrada.sh.

Segundo caso de uso: Apache Livy

Además, cuando se desarrolla una tarea y es necesario probar el resultado, surge la cuestión de iniciarla como parte del proceso CI/CD y realizar un seguimiento del estado de su ejecución. Por supuesto, puede ejecutarlo mediante una llamada de envío de Spark local, pero esto complica la infraestructura de CI/CD, ya que requiere instalar y configurar Spark en los agentes/ejecutadores del servidor de CI y configurar el acceso a la API de Kubernetes. Para este caso, la implementación de destino optó por utilizar Apache Livy como API REST para ejecutar tareas Spark alojadas dentro de un clúster de Kubernetes. Con su ayuda, puede ejecutar tareas de Spark en un clúster de Kubernetes mediante solicitudes cURL regulares, que se implementa fácilmente en función de cualquier solución de CI, y su ubicación dentro del clúster de Kubernetes resuelve el problema de autenticación al interactuar con la API de Kubernetes.

Ejecutando Apache Spark en Kubernetes

Destaquémoslo como un segundo caso de uso: ejecutar tareas de Spark como parte de un proceso de CI/CD en un clúster de Kubernetes en un bucle de prueba.

Un poco sobre Apache Livy: funciona como un servidor HTTP que proporciona una interfaz web y una API RESTful que le permite iniciar Spark-Submit de forma remota pasando los parámetros necesarios. Tradicionalmente, se ha enviado como parte de una distribución HDP, pero también se puede implementar en OKD o cualquier otra instalación de Kubernetes utilizando el manifiesto apropiado y un conjunto de imágenes de Docker, como esta: github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Para nuestro caso, se creó una imagen de Docker similar, que incluye Spark versión 2.4.5 del siguiente Dockerfile:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

La imagen generada se puede crear y cargar en su repositorio Docker existente, como el repositorio interno OKD. Para implementarlo, use el siguiente manifiesto ({registry-url} - URL del registro de imágenes de Docker, {image-name} - nombre de la imagen de Docker, {tag} - etiqueta de imagen de Docker, {livy-url} - URL deseada donde Se podrá acceder al servidor Livy; el manifiesto “Ruta” se usa si se usa Red Hat OpenShift como distribución de Kubernetes; de lo contrario, se usa el manifiesto de Ingress o Servicio correspondiente de tipo NodePort):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

Después de aplicarlo e iniciar exitosamente el pod, la interfaz gráfica de Livy está disponible en el enlace: http://{livy-url}/ui. Con Livy, podemos publicar nuestra tarea Spark utilizando una solicitud REST de, por ejemplo, Postman. A continuación se presenta un ejemplo de una colección con solicitudes (los argumentos de configuración con las variables necesarias para el funcionamiento de la tarea iniciada se pueden pasar en la matriz "args"):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Ejecutemos la primera solicitud de la colección, vayamos a la interfaz OKD y verifiquemos que la tarea se haya iniciado correctamente: https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. Al mismo tiempo, aparecerá una sesión en la interfaz de Livy (http://{livy-url}/ui), dentro de la cual, utilizando la API de Livy o la interfaz gráfica, podrá seguir el progreso de la tarea y estudiar la sesión. registros.

Ahora mostremos cómo funciona Livy. Para hacer esto, examinemos los registros del contenedor Livy dentro del pod con el servidor Livy: https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name }?tab=registros. De ellos podemos ver que al llamar a la API REST de Livy en un contenedor llamado "livy", se ejecuta un envío de chispa, similar al que usamos anteriormente (aquí {livy-pod-name} es el nombre del pod creado con el servidor Livy). La colección también presenta una segunda consulta que le permite ejecutar tareas que alojan de forma remota un ejecutable de Spark utilizando un servidor Livy.

Tercer caso de uso: Operador Spark

Ahora que la tarea ha sido probada, surge la cuestión de ejecutarla regularmente. La forma nativa de ejecutar tareas regularmente en un clúster de Kubernetes es la entidad CronJob y puedes usarla, pero por el momento el uso de operadores para administrar aplicaciones en Kubernetes es muy popular y para Spark existe un operador bastante maduro, que también es utilizado en soluciones de nivel empresarial (por ejemplo, Lightbend FastData Platform). Recomendamos usarlo: la versión estable actual de Spark (2.4.5) tiene opciones de configuración bastante limitadas para ejecutar tareas de Spark en Kubernetes, mientras que la próxima versión principal (3.0.0) declara soporte total para Kubernetes, pero se desconoce su fecha de lanzamiento. . Spark Operador compensa esta deficiencia agregando importantes opciones de configuración (por ejemplo, montar un ConfigMap con configuración de acceso a Hadoop en los pods de Spark) y la capacidad de ejecutar una tarea programada regularmente.

Ejecutando Apache Spark en Kubernetes
Destaquémoslo como un tercer caso de uso: ejecutar regularmente tareas de Spark en un clúster de Kubernetes en un ciclo de producción.

Spark Operador es de código abierto y está desarrollado dentro de Google Cloud Platform. github.com/GoogleCloudPlatform/spark-on-k8s-operator. Su instalación se puede realizar de 3 formas:

  1. Como parte de la instalación de Lightbend FastData Platform/Cloudflow;
  2. Usando timón:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Usando manifiestos del repositorio oficial (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Vale la pena señalar lo siguiente: Cloudflow incluye un operador con la versión API v1beta1. Si se utiliza este tipo de instalación, las descripciones del manifiesto de la aplicación Spark deben basarse en etiquetas de ejemplo en Git con la versión de API adecuada, por ejemplo, "v1beta1-0.9.0-2.4.0". La versión del operador la podemos encontrar en la descripción del CRD incluida en el operador en el diccionario “versiones”:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Si el operador está instalado correctamente, aparecerá un pod activo con el operador Spark en el proyecto correspondiente (por ejemplo, cloudflow-fdp-sparkoperator en el espacio de Cloudflow para la instalación de Cloudflow) y aparecerá un tipo de recurso de Kubernetes correspondiente llamado "sparkapplications". . Puede explorar las aplicaciones Spark disponibles con el siguiente comando:

oc get sparkapplications -n {project}

Para ejecutar tareas usando Spark Operador necesitas hacer 3 cosas:

  • cree una imagen de Docker que incluya todas las bibliotecas necesarias, así como archivos de configuración y ejecutables. En la imagen de destino, esta es una imagen creada en la etapa CI/CD y probada en un grupo de prueba;
  • publicar una imagen de Docker en un registro accesible desde el clúster de Kubernetes;
  • generar un manifiesto con el tipo “SparkApplication” y una descripción de la tarea a ejecutar. Los manifiestos de ejemplo están disponibles en el repositorio oficial (p. ej. github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Hay puntos importantes a tener en cuenta sobre el manifiesto:
    1. el diccionario “apiVersion” debe indicar la versión de API correspondiente a la versión del operador;
    2. el diccionario “metadata.namespace” debe indicar el espacio de nombres en el que se iniciará la aplicación;
    3. el diccionario “spec.image” debe contener la dirección de la imagen Docker creada en un registro accesible;
    4. el diccionario "spec.mainClass" debe contener la clase de tarea Spark que debe ejecutarse cuando se inicia el proceso;
    5. el diccionario “spec.mainApplicationFile” debe contener la ruta al archivo jar ejecutable;
    6. el diccionario “spec.sparkVersion” debe indicar la versión de Spark que se está utilizando;
    7. el diccionario “spec.driver.serviceAccount” debe especificar la cuenta de servicio dentro del espacio de nombres de Kubernetes correspondiente que se utilizará para ejecutar la aplicación;
    8. el diccionario “spec.executor” debe indicar la cantidad de recursos asignados a la aplicación;
    9. el diccionario "spec.volumeMounts" debe especificar el directorio local en el que se crearán los archivos de tareas locales de Spark.

Un ejemplo de generación de un manifiesto (aquí {spark-service-account} es una cuenta de servicio dentro del clúster de Kubernetes para ejecutar tareas de Spark):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Este manifiesto especifica una cuenta de servicio para la cual, antes de publicar el manifiesto, debe crear los enlaces de roles necesarios que proporcionen los derechos de acceso necesarios para que la aplicación Spark interactúe con la API de Kubernetes (si es necesario). En nuestro caso, la aplicación necesita derechos para crear Pods. Creemos el enlace de roles necesario:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

También vale la pena señalar que esta especificación de manifiesto puede incluir un parámetro "hadoopConfigMap", que le permite especificar un ConfigMap con la configuración de Hadoop sin tener que colocar primero el archivo correspondiente en la imagen de Docker. También es adecuado para ejecutar tareas con regularidad: utilizando el parámetro "programación", se puede especificar un cronograma para ejecutar una tarea determinada.

Después de eso, guardamos nuestro manifiesto en el archivo spark-pi.yaml y lo aplicamos a nuestro clúster de Kubernetes:

oc apply -f spark-pi.yaml

Esto creará un objeto de tipo “sparkapplications”:

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

En este caso, se creará un pod con una aplicación, cuyo estado se mostrará en las “sparkapplications” creadas. Puedes verlo con el siguiente comando:

oc get sparkapplications spark-pi -o yaml -n {project}

Al finalizar la tarea, el POD pasará al estado "Completado", que también se actualizará en "sparkapplications". Los registros de la aplicación se pueden ver en el navegador o usando el siguiente comando (aquí {sparkapplications-pod-name} es el nombre del pod de la tarea en ejecución):

oc logs {sparkapplications-pod-name} -n {project}

Las tareas de Spark también se pueden gestionar mediante la utilidad especializada sparkctl. Para instalarlo, clone el repositorio con su código fuente, instale Go y cree esta utilidad:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Examinemos la lista de tareas de Spark en ejecución:

sparkctl list -n {project}

Creemos una descripción para la tarea Spark:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Ejecutemos la tarea descrita usando sparkctl:

sparkctl create spark-app.yaml -n {project}

Examinemos la lista de tareas de Spark en ejecución:

sparkctl list -n {project}

Examinemos la lista de eventos de una tarea Spark iniciada:

sparkctl event spark-pi -n {project} -f

Examinemos el estado de la tarea Spark en ejecución:

sparkctl status spark-pi -n {project}

En conclusión, me gustaría considerar las desventajas descubiertas al usar la versión estable actual de Spark (2.4.5) en Kubernetes:

  1. La primera y, quizás, la principal desventaja es la falta de localidad de datos. A pesar de todas las deficiencias de YARN, su uso también tenía ventajas, por ejemplo, el principio de entregar código a datos (en lugar de datos a código). Gracias a ello, se ejecutaron tareas de Spark en los nodos donde se encontraban los datos involucrados en los cálculos y se redujo significativamente el tiempo necesario para entregar los datos a través de la red. Cuando utilizamos Kubernetes, nos enfrentamos a la necesidad de mover datos involucrados en una tarea a través de la red. Si son lo suficientemente grandes, el tiempo de ejecución de la tarea puede aumentar significativamente y también requerir una cantidad bastante grande de espacio en disco asignado a las instancias de tareas de Spark para su almacenamiento temporal. Esta desventaja se puede mitigar mediante el uso de software especializado que garantice la localidad de los datos en Kubernetes (por ejemplo, Alluxio), pero esto en realidad significa la necesidad de almacenar una copia completa de los datos en los nodos del clúster de Kubernetes.
  2. La segunda desventaja importante es la seguridad. De forma predeterminada, las funciones relacionadas con la seguridad relacionadas con la ejecución de tareas Spark están deshabilitadas, el uso de Kerberos no está cubierto en la documentación oficial (aunque las opciones correspondientes se introdujeron en la versión 3.0.0, lo que requerirá trabajo adicional) y la documentación de seguridad para Al usar Spark (https://spark.apache.org/docs/2.4.5/security.html), solo YARN, Mesos y Standalone Cluster aparecen como almacenes de claves. Al mismo tiempo, el usuario bajo el cual se inician las tareas de Spark no se puede especificar directamente; solo especificamos la cuenta de servicio bajo la cual funcionará y el usuario se selecciona en función de las políticas de seguridad configuradas. En este sentido, se utiliza el usuario root, que no es seguro en un entorno productivo, o un usuario con un UID aleatorio, lo que resulta inconveniente a la hora de distribuir los derechos de acceso a los datos (esto se puede solucionar creando PodSecurityPolicies y vinculándolos al correspondientes cuentas de servicio). Actualmente, la solución es colocar todos los archivos necesarios directamente en la imagen de Docker o modificar el script de inicio de Spark para utilizar el mecanismo de almacenamiento y recuperación de secretos adoptado en su organización.
  3. La ejecución de trabajos de Spark usando Kubernetes todavía está oficialmente en modo experimental y puede haber cambios significativos en los artefactos utilizados (archivos de configuración, imágenes base de Docker y scripts de inicio) en el futuro. De hecho, al preparar el material, se probaron las versiones 2.3.0 y 2.4.5, el comportamiento fue significativamente diferente.

Esperemos actualizaciones: recientemente se lanzó una nueva versión de Spark (3.0.0), que trajo cambios significativos al trabajo de Spark en Kubernetes, pero mantuvo el estado experimental de soporte para este administrador de recursos. Quizás las próximas actualizaciones realmente permitan recomendar por completo el abandono de YARN y la ejecución de tareas de Spark en Kubernetes sin temor por la seguridad de su sistema y sin la necesidad de modificar componentes funcionales de forma independiente.

Fin.

Fuente: habr.com

Añadir un comentario