Exécuter Apache Spark sur Kubernetes

Chers lecteurs, bon après-midi. Aujourd'hui, nous allons parler un peu d'Apache Spark et de ses perspectives de développement.

Exécuter Apache Spark sur Kubernetes

Dans le monde moderne du Big Data, Apache Spark est le standard de facto pour développer des tâches de traitement de données par lots. En outre, il est également utilisé pour créer des applications de streaming qui fonctionnent selon le concept de micro-batch, traitant et expédiant des données en petites portions (Spark Structured Streaming). Et traditionnellement, il fait partie de la pile Hadoop globale, en utilisant YARN (ou dans certains cas Apache Mesos) comme gestionnaire de ressources. D'ici 2020, son utilisation sous sa forme traditionnelle est remise en question pour la plupart des entreprises en raison du manque de distributions Hadoop décentes - le développement de HDP et CDH s'est arrêté, CDH n'est pas bien développé et a un coût élevé, et les fournisseurs Hadoop restants ont soit ils ont cessé d’exister, soit ils ont un avenir sombre. Par conséquent, le lancement d'Apache Spark à l'aide de Kubernetes suscite un intérêt croissant parmi la communauté et les grandes entreprises - devenant un standard en matière d'orchestration de conteneurs et de gestion des ressources dans les cloud privés et publics, il résout le problème de la planification peu pratique des ressources des tâches Spark sur YARN et fournit une plate-forme en développement constant avec de nombreuses distributions commerciales et ouvertes pour les entreprises de toutes tailles et de tous horizons. De plus, face à la popularité, la plupart ont déjà réussi à acquérir quelques installations qui leur sont propres et ont accru leur expertise dans son utilisation, ce qui simplifie le déménagement.

À partir de la version 2.3.0, Apache Spark a acquis le support officiel pour l'exécution de tâches dans un cluster Kubernetes et aujourd'hui, nous parlerons de la maturité actuelle de cette approche, des différentes options pour son utilisation et des pièges qui seront rencontrés lors de la mise en œuvre.

Tout d'abord, examinons le processus de développement de tâches et d'applications basées sur Apache Spark et soulignons les cas typiques dans lesquels vous devez exécuter une tâche sur un cluster Kubernetes. Lors de la préparation de cet article, OpenShift est utilisé comme distribution et les commandes pertinentes pour son utilitaire de ligne de commande (oc) seront données. Pour les autres distributions Kubernetes, les commandes correspondantes de l'utilitaire de ligne de commande Kubernetes standard (kubectl) ou leurs analogues (par exemple, pour la politique oc adm) peuvent être utilisées.

Premier cas d'utilisation - spark-submit

Lors du développement de tâches et d'applications, le développeur doit exécuter des tâches pour déboguer la transformation des données. Théoriquement, les stubs peuvent être utilisés à ces fins, mais le développement avec la participation d'instances réelles (bien que de test) de systèmes finaux s'est avéré plus rapide et meilleur dans cette classe de tâches. Dans le cas où l'on débogue sur des instances réelles de systèmes finaux, deux scénarios sont possibles :

  • le développeur exécute une tâche Spark localement en mode autonome ;

    Exécuter Apache Spark sur Kubernetes

  • un développeur exécute une tâche Spark sur un cluster Kubernetes dans une boucle de test.

    Exécuter Apache Spark sur Kubernetes

La première option a le droit d'exister, mais comporte un certain nombre d'inconvénients :

  • Chaque développeur doit avoir accès depuis le lieu de travail à toutes les instances des systèmes finaux dont il a besoin ;
  • une quantité suffisante de ressources est requise sur la machine en fonctionnement pour exécuter la tâche en cours de développement.

La deuxième option ne présente pas ces inconvénients, puisque l'utilisation d'un cluster Kubernetes vous permet d'allouer le pool de ressources nécessaire à l'exécution des tâches et de lui fournir l'accès nécessaire aux instances finales du système, en y donnant accès de manière flexible en utilisant le modèle de rôle Kubernetes pour tous les membres de l’équipe de développement. Soulignons-le comme premier cas d'utilisation : lancer des tâches Spark à partir d'une machine de développeur locale sur un cluster Kubernetes dans une boucle de test.

Parlons davantage du processus de configuration de Spark pour qu'il s'exécute localement. Pour commencer à utiliser Spark, vous devez l'installer :

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Nous collectons les packages nécessaires pour travailler avec Kubernetes :

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

Une construction complète prend beaucoup de temps, et pour créer des images Docker et les exécuter sur un cluster Kubernetes, vous n'avez en réalité besoin que des fichiers jar du répertoire « assembly/ », vous ne pouvez donc construire que ce sous-projet :

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Pour exécuter des tâches Spark sur Kubernetes, vous devez créer une image Docker à utiliser comme image de base. Il y a 2 approches possibles ici :

  • L'image Docker générée inclut le code de tâche Spark exécutable ;
  • L'image créée inclut uniquement Spark et les dépendances nécessaires, le code exécutable est hébergé à distance (par exemple, en HDFS).

Tout d'abord, créons une image Docker contenant un exemple de test d'une tâche Spark. Pour créer des images Docker, Spark dispose d'un utilitaire appelé « docker-image-tool ». Étudions l'aide à ce sujet :

./bin/docker-image-tool.sh --help

Avec son aide, vous pouvez créer des images Docker et les télécharger dans des registres distants, mais par défaut, cela présente un certain nombre d'inconvénients :

  • crée sans faute 3 images Docker à la fois - pour Spark, PySpark et R ;
  • ne vous permet pas de spécifier un nom d'image.

Par conséquent, nous utiliserons une version modifiée de cet utilitaire donnée ci-dessous :

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

Avec son aide, nous assemblons une image Spark de base contenant une tâche de test pour calculer Pi à l'aide de Spark (ici {docker-registry-url} est l'URL de votre registre d'images Docker, {repo} est le nom du référentiel dans le registre, qui correspond au projet dans OpenShift , {image-name} - nom de l'image (si une séparation des images à trois niveaux est utilisée, par exemple, comme dans le registre intégré des images Red Hat OpenShift), {tag} - balise de ceci version de l'image) :

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Connectez-vous au cluster OKD à l'aide de l'utilitaire de console (ici {OKD-API-URL} est l'URL de l'API du cluster OKD) :

oc login {OKD-API-URL}

Récupérons le jeton de l'utilisateur actuel pour l'autorisation dans le registre Docker :

oc whoami -t

Connectez-vous au registre Docker interne du cluster OKD (nous utilisons le jeton obtenu à l'aide de la commande précédente comme mot de passe) :

docker login {docker-registry-url}

Téléchargeons l'image Docker assemblée dans le Docker Registry OKD :

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Vérifions que l'image assemblée est disponible dans OKD. Pour cela, ouvrez l'URL dans le navigateur avec une liste d'images du projet correspondant (ici {project} est le nom du projet à l'intérieur du cluster OpenShift, {OKD-WEBUI-URL} est l'URL de la console Web OpenShift ) - https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{image-name}.

Pour exécuter des tâches, un compte de service doit être créé avec les privilèges permettant d'exécuter des pods en tant que root (nous aborderons ce point plus tard) :

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Exécutons la commande spark-submit pour publier une tâche Spark sur le cluster OKD, en spécifiant le compte de service créé et l'image Docker :

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

Ici:

—name — le nom de la tâche qui participera à la formation du nom des pods Kubernetes ;

—class — classe du fichier exécutable, appelée au démarrage de la tâche ;

—conf — Paramètres de configuration de Spark ;

spark.executor.instances — le nombre d'exécuteurs Spark à lancer ;

spark.kubernetes.authenticate.driver.serviceAccountName - le nom du compte de service Kubernetes utilisé lors du lancement des pods (pour définir le contexte de sécurité et les capacités lors de l'interaction avec l'API Kubernetes) ;

spark.kubernetes.namespace — Espace de noms Kubernetes dans lequel les pods de pilote et d'exécuteur seront lancés ;

spark.submit.deployMode — méthode de lancement de Spark (pour le « cluster » standard de soumission spark, pour Spark Operator et les versions ultérieures du « client » Spark) ;

spark.kubernetes.container.image - Image Docker utilisée pour lancer les pods ;

spark.master — URL de l'API Kubernetes (externe est spécifié pour que l'accès se fasse depuis la machine locale) ;

local:// est le chemin d'accès à l'exécutable Spark dans l'image Docker.

Nous allons au projet OKD correspondant et étudions les pods créés - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

Pour simplifier le processus de développement, une autre option peut être utilisée, dans laquelle une image de base commune de Spark est créée, utilisée par toutes les tâches à exécuter, et des instantanés de fichiers exécutables sont publiés sur un stockage externe (par exemple, Hadoop) et spécifiés lors de l'appel. spark-submit sous forme de lien. Dans ce cas, vous pouvez exécuter différentes versions de tâches Spark sans reconstruire les images Docker, en utilisant, par exemple, WebHDFS pour publier des images. Nous envoyons une requête pour créer un fichier (ici {host} est l'hôte du service WebHDFS, {port} est le port du service WebHDFS, {path-to-file-on-hdfs} est le chemin souhaité vers le fichier sur HDFS) :

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

Vous recevrez une réponse comme celle-ci (ici {location} est l'URL qui doit être utilisée pour télécharger le fichier) :

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Chargez le fichier exécutable Spark dans HDFS (ici {path-to-local-file} est le chemin d'accès au fichier exécutable Spark sur l'hôte actuel) :

curl -i -X PUT -T {path-to-local-file} "{location}"

Après cela, nous pouvons effectuer une soumission spark en utilisant le fichier Spark téléchargé sur HDFS (ici {class-name} est le nom de la classe qui doit être lancée pour terminer la tâche) :

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Il convient de noter que pour accéder à HDFS et garantir le fonctionnement de la tâche, vous devrez peut-être modifier le Dockerfile et le script Entrypoint.sh - ajoutez une directive au Dockerfile pour copier les bibliothèques dépendantes dans le répertoire /opt/spark/jars et inclure le fichier de configuration HDFS dans SPARK_CLASSPATH dans le point d'entrée.sh.

Deuxième cas d'utilisation - Apache Livy

De plus, lorsqu'une tâche est développée et que le résultat doit être testé, la question se pose de la lancer dans le cadre du processus CI/CD et de suivre l'état de son exécution. Bien sûr, vous pouvez l'exécuter à l'aide d'un appel Spark-submit local, mais cela complique l'infrastructure CI/CD car cela nécessite l'installation et la configuration de Spark sur les agents/exécuteurs du serveur CI et la configuration de l'accès à l'API Kubernetes. Dans ce cas, l'implémentation cible a choisi d'utiliser Apache Livy comme API REST pour exécuter des tâches Spark hébergées dans un cluster Kubernetes. Avec son aide, vous pouvez exécuter des tâches Spark sur un cluster Kubernetes à l'aide de requêtes cURL régulières, qui sont facilement implémentées sur la base de n'importe quelle solution CI, et son placement à l'intérieur du cluster Kubernetes résout le problème d'authentification lors de l'interaction avec l'API Kubernetes.

Exécuter Apache Spark sur Kubernetes

Soulignons-le comme deuxième cas d'utilisation : exécuter des tâches Spark dans le cadre d'un processus CI/CD sur un cluster Kubernetes dans une boucle de test.

Un peu sur Apache Livy - il fonctionne comme un serveur HTTP qui fournit une interface Web et une API RESTful qui vous permet de lancer spark-submit à distance en passant les paramètres nécessaires. Traditionnellement, il est livré dans le cadre d'une distribution HDP, mais peut également être déployé sur OKD ou toute autre installation Kubernetes à l'aide du manifeste approprié et d'un ensemble d'images Docker, comme celle-ci : github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Pour notre cas, une image Docker similaire a été créée, incluant Spark version 2.4.5 à partir du Dockerfile suivant :

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

L'image générée peut être créée et téléchargée sur votre référentiel Docker existant, tel que le référentiel OKD interne. Pour le déployer, utilisez le manifeste suivant ({registry-url} - URL du registre d'images Docker, {image-name} - Nom de l'image Docker, {tag} - Balise d'image Docker, {livy-url} - URL souhaitée où le le serveur Livy sera accessible ; le manifeste « Route » est utilisé si Red Hat OpenShift est utilisé comme distribution Kubernetes, sinon le manifeste Ingress ou Service correspondant de type NodePort est utilisé) :

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

Après l'avoir appliqué et lancé avec succès le pod, l'interface graphique Livy est disponible sur le lien : http://{livy-url}/ui. Avec Livy, nous pouvons publier notre tâche Spark à l'aide d'une requête REST provenant par exemple de Postman. Un exemple de collection avec requêtes est présenté ci-dessous (les arguments de configuration avec les variables nécessaires au fonctionnement de la tâche lancée peuvent être passés dans le tableau « args ») :

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Exécutons la première requête de la collection, allons dans l'interface OKD et vérifions que la tâche a été lancée avec succès - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. Parallèlement, une session apparaîtra dans l'interface Livy (http://{livy-url}/ui), au sein de laquelle, à l'aide de l'API Livy ou de l'interface graphique, vous pourrez suivre l'avancement de la tâche et étudier la session. journaux.

Montrons maintenant comment fonctionne Livy. Pour ce faire, examinons les journaux du conteneur Livy à l'intérieur du pod avec le serveur Livy - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name }?tab=journaux. D'eux, nous pouvons voir que lors de l'appel de l'API Livy REST dans un conteneur nommé « livy », un spark-submit est exécuté, similaire à celui que nous avons utilisé ci-dessus (ici {livy-pod-name} est le nom du pod créé avec le serveur Livy). La collection introduit également une deuxième requête qui vous permet d'exécuter des tâches qui hébergent à distance un exécutable Spark à l'aide d'un serveur Livy.

Troisième cas d'utilisation - Spark Operator

Maintenant que la tâche a été testée, la question de son exécution se pose régulièrement. La manière native d'exécuter régulièrement des tâches dans un cluster Kubernetes est l'entité CronJob et vous pouvez l'utiliser, mais pour le moment, l'utilisation d'opérateurs pour gérer les applications dans Kubernetes est très populaire et pour Spark, il existe un opérateur assez mature, qui est également utilisé dans les solutions de niveau entreprise (par exemple, Lightbend FastData Platform). Nous recommandons de l'utiliser - la version stable actuelle de Spark (2.4.5) a des options de configuration plutôt limitées pour exécuter des tâches Spark dans Kubernetes, tandis que la prochaine version majeure (3.0.0) déclare une prise en charge complète de Kubernetes, mais sa date de sortie reste inconnue. . Spark Operator compense cette lacune en ajoutant des options de configuration importantes (par exemple, le montage d'un ConfigMap avec configuration d'accès Hadoop sur les pods Spark) et la possibilité d'exécuter une tâche régulièrement planifiée.

Exécuter Apache Spark sur Kubernetes
Soulignons-le comme troisième cas d'utilisation : exécuter régulièrement des tâches Spark sur un cluster Kubernetes dans une boucle de production.

Spark Operator est open source et développé au sein de Google Cloud Platform - github.com/GoogleCloudPlatform/spark-on-k8s-operator. Son installation peut se faire de 3 manières :

  1. Dans le cadre de l'installation de Lightbend FastData Platform/Cloudflow ;
  2. Utiliser Helm :
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Utilisation des manifestes du référentiel officiel (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Il convient de noter ce qui suit : Cloudflow inclut un opérateur avec la version API v1beta1. Si ce type d'installation est utilisé, les descriptions du manifeste de l'application Spark doivent être basées sur des exemples de balises dans Git avec la version d'API appropriée, par exemple « v1beta1-0.9.0-2.4.0 ». La version de l'opérateur se trouve dans la description du CRD inclus dans l'opérateur dans le dictionnaire « versions » :
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Si l'opérateur est installé correctement, un pod actif avec l'opérateur Spark apparaîtra dans le projet correspondant (par exemple, cloudflow-fdp-sparkoperator dans l'espace Cloudflow pour l'installation Cloudflow) et un type de ressource Kubernetes correspondant nommé « sparkapplications » apparaîtra . Vous pouvez explorer les applications Spark disponibles avec la commande suivante :

oc get sparkapplications -n {project}

Pour exécuter des tâches à l'aide de Spark Operator, vous devez faire 3 choses :

  • créez une image Docker qui comprend toutes les bibliothèques nécessaires, ainsi que les fichiers de configuration et exécutables. Dans l'image cible, il s'agit d'une image créée au stade CI/CD et testée sur un cluster de test ;
  • publier une image Docker dans un registre accessible depuis le cluster Kubernetes ;
  • générer un manifeste de type « SparkApplication » et une description de la tâche à lancer. Des exemples de manifestes sont disponibles dans le référentiel officiel (par ex. github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Il y a des points importants à noter à propos du manifeste :
    1. le dictionnaire « apiVersion » doit indiquer la version API correspondant à la version opérateur ;
    2. le dictionnaire « metadata.namespace » doit indiquer l'espace de noms dans lequel l'application sera lancée ;
    3. le dictionnaire « spec.image » doit contenir l'adresse de l'image Docker créée dans un registre accessible ;
    4. le dictionnaire « spec.mainClass » doit contenir la classe de tâches Spark qui doit être exécutée au démarrage du processus ;
    5. le dictionnaire « spec.mainApplicationFile » doit contenir le chemin d'accès au fichier jar exécutable ;
    6. le dictionnaire « spec.sparkVersion » doit indiquer la version de Spark utilisée ;
    7. le dictionnaire « spec.driver.serviceAccount » doit spécifier le compte de service dans l'espace de noms Kubernetes correspondant qui sera utilisé pour exécuter l'application ;
    8. le dictionnaire « spec.executor » doit indiquer le nombre de ressources allouées à l'application ;
    9. le dictionnaire "spec.volumeMounts" doit spécifier le répertoire local dans lequel les fichiers de tâches Spark locaux seront créés.

Un exemple de génération d'un manifeste (ici {spark-service-account} est un compte de service au sein du cluster Kubernetes pour exécuter des tâches Spark) :

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Ce manifeste spécifie un compte de service pour lequel, avant de publier le manifeste, vous devez créer les liaisons de rôle nécessaires qui fournissent les droits d'accès nécessaires pour que l'application Spark puisse interagir avec l'API Kubernetes (si nécessaire). Dans notre cas, l'application a besoin de droits pour créer des Pods. Créons la liaison de rôle nécessaire :

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Il convient également de noter que cette spécification de manifeste peut inclure un paramètre « hadoopConfigMap », qui vous permet de spécifier un ConfigMap avec la configuration Hadoop sans avoir à placer au préalable le fichier correspondant dans l'image Docker. Il convient également à l'exécution régulière de tâches - en utilisant le paramètre « planification », un calendrier d'exécution d'une tâche donnée peut être spécifié.

Après cela, nous enregistrons notre manifeste dans le fichier spark-pi.yaml et l'appliquons à notre cluster Kubernetes :

oc apply -f spark-pi.yaml

Cela créera un objet de type « sparkapplications » :

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

Dans ce cas, un pod avec une application sera créé, dont le statut sera affiché dans les « sparkapplications » créées. Vous pouvez le visualiser avec la commande suivante :

oc get sparkapplications spark-pi -o yaml -n {project}

Une fois la tâche terminée, le POD passera au statut « Terminé », qui sera également mis à jour dans « sparkapplications ». Les journaux d'application peuvent être consultés dans le navigateur ou à l'aide de la commande suivante (ici {sparkapplications-pod-name} est le nom du pod de la tâche en cours d'exécution) :

oc logs {sparkapplications-pod-name} -n {project}

Les tâches Spark peuvent également être gérées à l'aide de l'utilitaire spécialisé sparkctl. Pour l'installer, clonez le référentiel avec son code source, installez Go et build cet utilitaire :

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Examinons la liste des tâches Spark en cours d'exécution :

sparkctl list -n {project}

Créons une description pour la tâche Spark :

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Exécutons la tâche décrite en utilisant sparkctl :

sparkctl create spark-app.yaml -n {project}

Examinons la liste des tâches Spark en cours d'exécution :

sparkctl list -n {project}

Examinons la liste des événements d'une tâche Spark lancée :

sparkctl event spark-pi -n {project} -f

Examinons l'état de la tâche Spark en cours d'exécution :

sparkctl status spark-pi -n {project}

En conclusion, je voudrais considérer les inconvénients découverts lors de l'utilisation de la version stable actuelle de Spark (2.4.5) dans Kubernetes :

  1. Le premier et peut-être le principal inconvénient est le manque de localisation des données. Malgré toutes les lacunes de YARN, son utilisation présentait également des avantages, par exemple le principe de transmission du code aux données (plutôt que des données au code). Grâce à lui, les tâches Spark ont ​​été exécutées sur les nœuds où se trouvaient les données impliquées dans les calculs, et le temps nécessaire à la transmission des données sur le réseau a été considérablement réduit. Lorsque nous utilisons Kubernetes, nous sommes confrontés à la nécessité de déplacer les données impliquées dans une tâche à travers le réseau. S'ils sont suffisamment volumineux, le temps d'exécution des tâches peut augmenter considérablement et nécessiter également une quantité d'espace disque assez importante allouée aux instances de tâches Spark pour leur stockage temporaire. Cet inconvénient peut être atténué en utilisant un logiciel spécialisé qui garantit la localité des données dans Kubernetes (par exemple, Alluxio), mais cela signifie en réalité la nécessité de stocker une copie complète des données sur les nœuds du cluster Kubernetes.
  2. Le deuxième inconvénient important est la sécurité. Par défaut, les fonctionnalités liées à la sécurité concernant l'exécution des tâches Spark sont désactivées, l'utilisation de Kerberos n'est pas couverte dans la documentation officielle (bien que les options correspondantes aient été introduites dans la version 3.0.0, ce qui nécessitera un travail supplémentaire), et la documentation de sécurité pour en utilisant Spark (https ://spark.apache.org/docs/2.4.5/security.html), seuls YARN, Mesos et Standalone Cluster apparaissent comme magasins de clés. Dans le même temps, l'utilisateur sous lequel les tâches Spark sont lancées ne peut pas être spécifié directement - nous indiquons uniquement le compte de service sous lequel elles fonctionneront et l'utilisateur est sélectionné en fonction des politiques de sécurité configurées. À cet égard, soit l'utilisateur root est utilisé, ce qui n'est pas sûr dans un environnement productif, soit un utilisateur avec un UID aléatoire, ce qui n'est pas pratique lors de la distribution des droits d'accès aux données (cela peut être résolu en créant des PodSecurityPolicies et en les liant au comptes de service correspondants). Actuellement, la solution consiste soit à placer tous les fichiers nécessaires directement dans l'image Docker, soit à modifier le script de lancement de Spark pour utiliser le mécanisme de stockage et de récupération des secrets adopté dans votre organisation.
  3. L'exécution de tâches Spark à l'aide de Kubernetes est officiellement encore en mode expérimental et il pourrait y avoir des changements importants dans les artefacts utilisés (fichiers de configuration, images de base Docker et scripts de lancement) à l'avenir. Et en effet, lors de la préparation du matériel, les versions 2.3.0 et 2.4.5 ont été testées, le comportement était sensiblement différent.

Attendons les mises à jour - une nouvelle version de Spark (3.0.0) a été récemment publiée, qui a apporté des changements importants au travail de Spark sur Kubernetes, mais a conservé le statut expérimental de prise en charge de ce gestionnaire de ressources. Peut-être que les prochaines mises à jour permettront vraiment de recommander pleinement d'abandonner YARN et d'exécuter des tâches Spark sur Kubernetes sans craindre pour la sécurité de votre système et sans avoir besoin de modifier indépendamment les composants fonctionnels.

Fin

Source: habr.com

Ajouter un commentaire