Kører Apache Spark på Kubernetes

Kære læsere, god eftermiddag. I dag vil vi tale lidt om Apache Spark og dets udviklingsmuligheder.

Kører Apache Spark på Kubernetes

I den moderne verden af ​​Big Data er Apache Spark de facto-standarden for udvikling af batch-databehandlingsopgaver. Derudover bruges det også til at skabe streaming-applikationer, der fungerer i mikrobatch-konceptet, behandling og forsendelse af data i små portioner (Spark Structured Streaming). Og traditionelt har det været en del af den overordnede Hadoop-stack, ved at bruge YARN (eller i nogle tilfælde Apache Mesos) som ressourcemanager. I 2020 er brugen af ​​det i sin traditionelle form på tale for de fleste virksomheder på grund af manglen på anstændige Hadoop-distributioner - udviklingen af ​​HDP og CDH er stoppet, CDH er ikke veludviklet og har høje omkostninger, og de resterende Hadoop-leverandører har enten ophørt med at eksistere eller har en mørk fremtid. Derfor er lanceringen af ​​Apache Spark ved hjælp af Kubernetes af stigende interesse blandt samfundet og store virksomheder - ved at blive en standard inden for containerorkestrering og ressourcestyring i private og offentlige skyer, løser det problemet med ubelejlig ressourceplanlægning af Spark-opgaver på YARN og giver en støt udviklende platform med mange kommercielle og åbne distributioner til virksomheder i alle størrelser og striber. Derudover har de fleste i kølvandet på populariteten allerede formået at anskaffe sig et par egne installationer og har øget deres ekspertise i brugen, hvilket letter flytningen.

Fra og med version 2.3.0 erhvervede Apache Spark officiel støtte til at køre opgaver i en Kubernetes-klynge, og i dag vil vi tale om den nuværende modenhed af denne tilgang, forskellige muligheder for dens brug og faldgruber, der vil blive stødt på under implementeringen.

Lad os først og fremmest se på processen med at udvikle opgaver og applikationer baseret på Apache Spark og fremhæve typiske tilfælde, hvor du skal køre en opgave på en Kubernetes-klynge. Ved forberedelsen af ​​dette indlæg bruges OpenShift som en distribution, og kommandoer, der er relevante for dets kommandolinjeværktøj (oc), vil blive givet. For andre Kubernetes-distributioner kan de tilsvarende kommandoer fra standard Kubernetes-kommandolinjeværktøjet (kubectl) eller deres analoger (for eksempel til oc adm-politik) bruges.

Første brug - gnist-indsend

Under udviklingen af ​​opgaver og applikationer skal udvikleren køre opgaver for at fejlsøge datatransformation. Teoretisk kan stubs bruges til disse formål, men udvikling med deltagelse af reelle (omend test) forekomster af slutsystemer har vist sig at være hurtigere og bedre i denne klasse af opgaver. I det tilfælde, hvor vi fejlretter på rigtige forekomster af slutsystemer, er to scenarier mulige:

  • udvikleren kører en Spark-opgave lokalt i selvstændig tilstand;

    Kører Apache Spark på Kubernetes

  • en udvikler kører en Spark-opgave på en Kubernetes-klynge i en testloop.

    Kører Apache Spark på Kubernetes

Den første mulighed har ret til at eksistere, men medfører en række ulemper:

  • Hver udvikler skal have adgang fra arbejdspladsen til alle forekomster af de slutsystemer, han har brug for;
  • der kræves en tilstrækkelig mængde ressourcer på arbejdsmaskinen til at køre den opgave, der udvikles.

Den anden mulighed har ikke disse ulemper, da brugen af ​​en Kubernetes-klynge giver dig mulighed for at allokere den nødvendige ressourcepulje til at køre opgaver og give den den nødvendige adgang til slutsysteminstanser, hvilket fleksibelt giver adgang til det ved hjælp af Kubernetes-rollemodellen for alle medlemmer af udviklingsteamet. Lad os fremhæve det som det første use case - lancering af Spark-opgaver fra en lokal udviklermaskine på en Kubernetes-klynge i et testkredsløb.

Lad os tale mere om processen med at konfigurere Spark til at køre lokalt. For at begynde at bruge Spark skal du installere det:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Vi indsamler de nødvendige pakker til at arbejde med Kubernetes:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

En fuld build tager meget tid, og for at oprette Docker-billeder og køre dem på en Kubernetes-klynge behøver du egentlig kun jar-filer fra mappen "assembly/", så du kan kun bygge dette underprojekt:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

For at køre Spark-job på Kubernetes skal du oprette et Docker-image, der skal bruges som et basisbillede. Der er 2 mulige tilgange her:

  • Det genererede Docker-billede inkluderer den eksekverbare Spark-opgavekode;
  • Det oprettede billede inkluderer kun Spark og de nødvendige afhængigheder, den eksekverbare kode hostes eksternt (for eksempel i HDFS).

Lad os først bygge et Docker-billede, der indeholder et testeksempel på en Spark-opgave. For at oprette Docker-billeder har Spark et værktøj kaldet "docker-image-tool". Lad os studere hjælpen til det:

./bin/docker-image-tool.sh --help

Med dens hjælp kan du oprette Docker-billeder og uploade dem til eksterne registre, men som standard har det en række ulemper:

  • skaber uden fejl 3 Docker-billeder på én gang - til Spark, PySpark og R;
  • tillader dig ikke at angive et billednavn.

Derfor vil vi bruge en modificeret version af dette værktøj givet nedenfor:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

Med dets hjælp samler vi et grundlæggende Spark-billede, der indeholder en testopgave til beregning af Pi ved hjælp af Spark (her er {docker-registry-url} URL'en på din Docker-billedregistrering, {repo} er navnet på lageret inde i registreringsdatabasen, som matcher projektet i OpenShift , {image-name} — billedets navn (hvis der bruges tre-niveaus adskillelse af billeder, f.eks. som i den integrerede registrering af Red Hat OpenShift-billeder), {tag} — tag af denne version af billedet):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Log ind på OKD-klyngen ved hjælp af konsolværktøjet (her er {OKD-API-URL} OKD-klyngens API URL):

oc login {OKD-API-URL}

Lad os få den aktuelle brugers token til godkendelse i Docker Registry:

oc whoami -t

Log ind på det interne Docker Registry for OKD-klyngen (vi bruger tokenet opnået ved hjælp af den forrige kommando som adgangskode):

docker login {docker-registry-url}

Lad os uploade det samlede Docker-billede til Docker Registry OKD:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Lad os kontrollere, at det samlede billede er tilgængeligt i OKD. For at gøre dette skal du åbne URL'en i browseren med en liste over billeder af det tilsvarende projekt (her er {project} navnet på projektet inde i OpenShift-klyngen, {OKD-WEBUI-URL} er URL'en til OpenShift-webkonsollen ) - https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{image-name}.

For at køre opgaver skal der oprettes en servicekonto med rettighederne til at køre pods som root (vi vil diskutere dette punkt senere):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Lad os køre spark-submit-kommandoen for at udgive en Spark-opgave til OKD-klyngen, med angivelse af den oprettede servicekonto og Docker-billede:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

her:

—navn — navnet på den opgave, der vil deltage i dannelsen af ​​navnet på Kubernetes-bælgerne;

—class — klasse af den eksekverbare fil, kaldet når opgaven starter;

—conf — Spark-konfigurationsparametre;

spark.executor.instances — antallet af Spark-executorer, der skal lanceres;

spark.kubernetes.authenticate.driver.serviceAccountName - navnet på den Kubernetes-tjenestekonto, der bruges ved lancering af pods (for at definere sikkerhedskonteksten og mulighederne ved interaktion med Kubernetes API);

spark.kubernetes.namespace — Kubernetes navneområde, hvor driver- og eksekutør-pods vil blive lanceret;

spark.submit.deployMode — metode til lancering af Spark (til standard gnist-send bruges "cluster", for Spark Operator og senere versioner af Spark "klient");

spark.kubernetes.container.billede Docker-billede brugt til at starte pods;

spark.master — Kubernetes API URL (ekstern er angivet, så adgang sker fra den lokale maskine);

local:// er stien til den eksekverbare Spark inde i Docker-billedet.

Vi går til det tilsvarende OKD-projekt og studerer de oprettede pods - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

For at forenkle udviklingsprocessen kan der bruges en anden mulighed, hvor der oprettes et fælles basisbillede af Spark, som bruges af alle opgaver til at køre, og snapshots af eksekverbare filer publiceres til eksternt lager (for eksempel Hadoop) og specificeres ved opkald spark-send som et link. I dette tilfælde kan du køre forskellige versioner af Spark-opgaver uden at genopbygge Docker-billeder, ved at bruge for eksempel WebHDFS til at publicere billeder. Vi sender en anmodning om at oprette en fil (her er {host} værten for WebHDFS-tjenesten, {port} er porten for WebHDFS-tjenesten, {path-to-file-on-hdfs} er den ønskede sti til filen på HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

Du vil modtage et svar som dette (her {location} er den URL, der skal bruges til at downloade filen):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Indlæs den eksekverbare Spark-fil i HDFS (her er {path-to-local-file} stien til den eksekverbare Spark-fil på den aktuelle vært):

curl -i -X PUT -T {path-to-local-file} "{location}"

Herefter kan vi udføre spark-submit ved hjælp af Spark-filen, der er uploadet til HDFS (her {class-name} er navnet på den klasse, der skal startes for at fuldføre opgaven):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Det skal bemærkes, at for at få adgang til HDFS og sikre, at opgaven fungerer, skal du muligvis ændre Dockerfilen og entrypoint.sh scriptet - tilføje et direktiv til Dockerfilen for at kopiere afhængige biblioteker til mappen /opt/spark/jars og inkludere HDFS-konfigurationsfilen i SPARK_CLASSPATH i indgangspunktet sh.

Anden brugsboks - Apache Livy

Yderligere, når en opgave er udviklet, og resultatet skal testes, opstår spørgsmålet om at starte den som en del af CI/CD-processen og spore status for dens udførelse. Selvfølgelig kan du køre det ved hjælp af et lokalt spark-submit-kald, men dette komplicerer CI/CD-infrastrukturen, da det kræver installation og konfiguration af Spark på CI-serveragenterne/-runners og opsætning af adgang til Kubernetes API. I dette tilfælde har målimplementeringen valgt at bruge Apache Livy som en REST API til at køre Spark-opgaver hostet inde i en Kubernetes-klynge. Med dens hjælp kan du køre Spark-opgaver på en Kubernetes-klynge ved hjælp af almindelige cURL-anmodninger, som let implementeres baseret på enhver CI-løsning, og dens placering inde i Kubernetes-klyngen løser problemet med godkendelse, når du interagerer med Kubernetes API.

Kører Apache Spark på Kubernetes

Lad os fremhæve det som et andet use case - at køre Spark-opgaver som en del af en CI/CD-proces på en Kubernetes-klynge i en testloop.

Lidt om Apache Livy - den fungerer som en HTTP-server, der giver en webgrænseflade og en RESTful API, der giver dig mulighed for at fjernstarte spark-submit ved at videregive de nødvendige parametre. Traditionelt er det blevet sendt som en del af en HDP-distribution, men kan også implementeres til OKD eller enhver anden Kubernetes-installation ved hjælp af det relevante manifest og et sæt Docker-billeder, såsom denne - github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. For vores tilfælde blev der bygget et lignende Docker-billede, inklusive Spark version 2.4.5 fra følgende Dockerfile:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

Det genererede billede kan bygges og uploades til dit eksisterende Docker-lager, såsom det interne OKD-lager. For at implementere det skal du bruge følgende manifest ({registry-url} - URL for Docker-billedregistret, {image-name} - Docker-billednavn, {tag} - Docker-billedtag, {livy-url} - ønsket URL, hvor serveren vil være tilgængelig Livy; "Rute"-manifestet bruges, hvis Red Hat OpenShift bruges som Kubernetes-distribution, ellers bruges det tilsvarende Ingress- eller Service-manifest af typen NodePort):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

Efter at have anvendt det og succesfuldt lancering af pod'en, er Livy grafiske grænseflade tilgængelig på linket: http://{livy-url}/ui. Med Livy kan vi udgive vores Spark-opgave ved hjælp af en REST-anmodning fra for eksempel Postman. Et eksempel på en samling med anmodninger er præsenteret nedenfor (konfigurationsargumenter med variabler nødvendige for driften af ​​den lancerede opgave kan sendes i "args"-arrayet):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Lad os udføre den første anmodning fra samlingen, gå til OKD-grænsefladen og kontrollere, at opgaven er blevet lanceret med succes - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. Samtidig vises en session i Livy-grænsefladen (http://{livy-url}/ui), hvor du ved hjælp af Livy API eller grafiske grænseflade kan spore opgavens fremskridt og studere sessionen logs.

Lad os nu vise, hvordan Livy fungerer. For at gøre dette, lad os undersøge logfilerne for Livy-beholderen inde i poden med Livy-serveren - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name }?tab=logfiler. Fra dem kan vi se, at når man kalder Livy REST API i en container ved navn "livy", udføres en spark-submit, svarende til den, vi brugte ovenfor (her er {livy-pod-name} navnet på den oprettede pod med Livy-serveren). Samlingen introducerer også en anden forespørgsel, der giver dig mulighed for at køre opgaver, der eksternt hoster en Spark-eksekverbar fil ved hjælp af en Livy-server.

Tredje use case - Spark Operator

Nu hvor opgaven er blevet testet, opstår spørgsmålet om at køre den jævnligt. Den oprindelige måde at køre opgaver i en Kubernetes-klynge på er CronJob-enheden, og du kan bruge den, men i øjeblikket er brugen af ​​operatører til at administrere applikationer i Kubernetes meget populær, og for Spark er der en ret moden operatør, som også er bruges i løsninger på virksomhedsniveau (for eksempel Lightbend FastData Platform). Vi anbefaler at bruge det - den nuværende stabile version af Spark (2.4.5) har ret begrænsede konfigurationsmuligheder til at køre Spark-opgaver i Kubernetes, mens den næste større version (3.0.0) erklærer fuld understøttelse af Kubernetes, men dens udgivelsesdato forbliver ukendt . Spark Operator kompenserer for denne mangel ved at tilføje vigtige konfigurationsmuligheder (for eksempel at montere en ConfigMap med Hadoop-adgangskonfiguration til Spark-pods) og muligheden for at køre en regelmæssigt planlagt opgave.

Kører Apache Spark på Kubernetes
Lad os fremhæve det som en tredje use case - regelmæssigt at køre Spark-opgaver på en Kubernetes-klynge i en produktionsloop.

Spark Operator er open source og udviklet inden for Google Cloud Platform - github.com/GoogleCloudPlatform/spark-on-k8s-operator. Dens installation kan udføres på 3 måder:

  1. Som en del af Lightbend FastData Platform/Cloudflow installationen;
  2. Brug af Helm:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Brug af manifester fra det officielle lager (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Det er værd at bemærke følgende - Cloudflow inkluderer en operatør med API-version v1beta1. Hvis denne type installation bruges, skal Spark-applikationsmanifestbeskrivelserne være baseret på eksempel-tags i Git med den passende API-version, for eksempel "v1beta1-0.9.0-2.4.0". Operatørens version kan findes i beskrivelsen af ​​CRD inkluderet i operatøren i "versions" ordbogen:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Hvis operatøren er installeret korrekt, vises en aktiv pod med Spark-operatøren i det tilsvarende projekt (f.eks. cloudflow-fdp-sparkoperator i Cloudflow-rummet til Cloudflow-installationen), og en tilsvarende Kubernetes-ressourcetype kaldet "sparkplications" vises . Du kan udforske tilgængelige Spark-applikationer med følgende kommando:

oc get sparkapplications -n {project}

For at køre opgaver ved hjælp af Spark Operator skal du gøre 3 ting:

  • oprette et Docker-billede, der inkluderer alle de nødvendige biblioteker, samt konfigurations- og eksekverbare filer. I målbilledet er dette et billede skabt på CI/CD-stadiet og testet på en testklynge;
  • publicere et Docker-image til et register, der er tilgængeligt fra Kubernetes-klyngen;
  • generere et manifest med typen "SparkApplication" og en beskrivelse af den opgave, der skal lanceres. Eksempler på manifester er tilgængelige i det officielle arkiv (f.eks. github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Der er vigtige punkter at bemærke om manifestet:
    1. "apiVersion"-ordbogen skal angive den API-version, der svarer til operatørversionen;
    2. "metadata.namespace"-ordbogen skal angive det navneområde, hvor applikationen vil blive lanceret;
    3. "spec.image"-ordbogen skal indeholde adressen på det oprettede Docker-billede i et tilgængeligt register;
    4. "spec.mainClass"-ordbogen skal indeholde opgaveklassen Spark, der skal køres, når processen starter;
    5. "spec.mainApplicationFile"-ordbogen skal indeholde stien til den eksekverbare jar-fil;
    6. "spec.sparkVersion"-ordbogen skal angive den version af Spark, der bruges;
    7. "spec.driver.serviceAccount"-ordbogen skal angive tjenestekontoen inden for det tilsvarende Kubernetes-navneområde, der vil blive brugt til at køre programmet;
    8. "spec.executor"-ordbogen skal angive antallet af ressourcer, der er allokeret til applikationen;
    9. "spec.volumeMounts"-ordbogen skal angive den lokale mappe, hvori de lokale Spark-opgavefiler vil blive oprettet.

Et eksempel på generering af et manifest (her er {spark-service-account} en tjenestekonto inde i Kubernetes-klyngen til at køre Spark-opgaver):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Dette manifest specificerer en tjenestekonto, som du, før du udgiver manifestet, skal oprette de nødvendige rollebindinger, der giver de nødvendige adgangsrettigheder til, at Spark-applikationen kan interagere med Kubernetes API (hvis nødvendigt). I vores tilfælde skal applikationen have rettigheder til at oprette Pods. Lad os skabe den nødvendige rollebinding:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Det er også værd at bemærke, at denne manifestspecifikation kan indeholde en "hadoopConfigMap"-parameter, som giver dig mulighed for at angive et ConfigMap med Hadoop-konfigurationen uden først at skulle placere den tilsvarende fil i Docker-billedet. Det er også velegnet til at køre opgaver regelmæssigt - ved hjælp af parameteren "planlægning" kan en tidsplan for at køre en given opgave specificeres.

Derefter gemmer vi vores manifest i spark-pi.yaml-filen og anvender det på vores Kubernetes-klynge:

oc apply -f spark-pi.yaml

Dette vil oprette et objekt af typen "sparkapplications":

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

I dette tilfælde vil der blive oprettet en pod med en applikation, hvis status vil blive vist i de oprettede "sparkapplications". Du kan se det med følgende kommando:

oc get sparkapplications spark-pi -o yaml -n {project}

Efter fuldførelse af opgaven vil POD'en flytte til statussen "Fuldført", som også opdateres i "sparkapplications". Applikationslogfiler kan ses i browseren eller ved hjælp af følgende kommando (her er {sparkapplications-pod-name} navnet på poden for den kørende opgave):

oc logs {sparkapplications-pod-name} -n {project}

Spark-opgaver kan også styres ved hjælp af det specialiserede sparkctl-værktøj. For at installere det skal du klone depotet med dets kildekode, installere Go og bygge dette værktøj:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Lad os undersøge listen over kørende Spark-opgaver:

sparkctl list -n {project}

Lad os lave en beskrivelse af Spark-opgaven:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Lad os køre den beskrevne opgave ved hjælp af sparkctl:

sparkctl create spark-app.yaml -n {project}

Lad os undersøge listen over kørende Spark-opgaver:

sparkctl list -n {project}

Lad os undersøge listen over begivenheder for en lanceret Spark-opgave:

sparkctl event spark-pi -n {project} -f

Lad os undersøge status for den kørende Spark-opgave:

sparkctl status spark-pi -n {project}

Afslutningsvis vil jeg gerne overveje de opdagede ulemper ved at bruge den nuværende stabile version af Spark (2.4.5) i Kubernetes:

  1. Den første og måske største ulempe er manglen på datalokalitet. På trods af alle manglerne ved YARN, var der også fordele ved at bruge det, for eksempel princippet om at levere kode til data (i stedet for data til kode). Takket være det blev Spark-opgaver udført på de noder, hvor dataene involveret i beregningerne var placeret, og tiden det tog at levere data over netværket blev væsentligt reduceret. Når vi bruger Kubernetes, står vi over for behovet for at flytte data involveret i en opgave på tværs af netværket. Hvis de er store nok, kan opgaveudførelsestiden stige betydeligt og også kræve en ret stor mængde diskplads, der er allokeret til Spark-opgaveinstanser til deres midlertidige lagring. Denne ulempe kan afbødes ved at bruge specialiseret software, der sikrer datalokalitet i Kubernetes (for eksempel Alluxio), men det betyder faktisk behovet for at gemme en komplet kopi af dataene på noderne i Kubernetes-klyngen.
  2. Den anden vigtige ulempe er sikkerheden. Som standard er sikkerhedsrelaterede funktioner vedrørende kørsel af Spark-opgaver deaktiveret, brugen af ​​Kerberos er ikke dækket af den officielle dokumentation (selvom de tilsvarende muligheder blev introduceret i version 3.0.0, hvilket vil kræve yderligere arbejde), og sikkerhedsdokumentationen for ved brug af Spark (https ://spark.apache.org/docs/2.4.5/security.html) vises kun YARN, Mesos og Standalone Cluster som nøglebutikker. Samtidig kan den bruger, som Spark-opgaver lanceres under, ikke specificeres direkte - vi angiver kun den servicekonto, som den skal fungere under, og brugeren vælges ud fra de konfigurerede sikkerhedspolitikker. I denne forbindelse bruges enten root-brugeren, som ikke er sikker i et produktivt miljø, eller en bruger med et tilfældigt UID, hvilket er ubelejligt ved distribution af adgangsrettigheder til data (dette kan løses ved at oprette PodSecurityPolicies og linke dem til tilsvarende servicekonti). I øjeblikket er løsningen enten at placere alle de nødvendige filer direkte i Docker-billedet eller at ændre Spark-lanceringsscriptet til at bruge mekanismen til lagring og genfinding af hemmeligheder, der er vedtaget i din organisation.
  3. Kørsel af Spark-job ved hjælp af Kubernetes er officielt stadig i eksperimentel tilstand, og der kan være betydelige ændringer i de anvendte artefakter (konfigurationsfiler, Docker-basebilleder og startscripts) i fremtiden. Og faktisk, da materialet blev udarbejdet, blev version 2.3.0 og 2.4.5 testet, adfærden var signifikant anderledes.

Lad os vente på opdateringer - en ny version af Spark (3.0.0) blev for nylig udgivet, som medførte betydelige ændringer i arbejdet med Spark på Kubernetes, men beholdt den eksperimentelle status for support til denne ressourcemanager. Måske vil de næste opdateringer virkelig gøre det muligt fuldt ud at anbefale at opgive YARN og køre Spark-opgaver på Kubernetes uden frygt for dit systems sikkerhed og uden behov for selvstændigt at ændre funktionelle komponenter.

Ende.

Kilde: www.habr.com

Tilføj en kommentar