Duke ekzekutuar Apache Spark në Kubernetes

Të dashur lexues, mirëdita. Sot do të flasim pak për Apache Spark dhe perspektivat e zhvillimit të tij.

Duke ekzekutuar Apache Spark në Kubernetes

Në botën moderne të Big Data, Apache Spark është standardi de fakto për zhvillimin e detyrave të përpunimit të të dhënave në grup. Përveç kësaj, përdoret gjithashtu për të krijuar aplikacione transmetimi që funksionojnë në konceptin e grupit mikro, përpunimin dhe dërgimin e të dhënave në pjesë të vogla (Spark Structured Streaming). Dhe tradicionalisht ka qenë pjesë e grupit të përgjithshëm të Hadoop, duke përdorur YARN (ose në disa raste Apache Mesos) si menaxher të burimeve. Deri në vitin 2020, përdorimi i tij në formën e tij tradicionale është në pikëpyetje për shumicën e kompanive për shkak të mungesës së shpërndarjeve të mira të Hadoop - zhvillimi i HDP dhe CDH ka ndaluar, CDH nuk është zhvilluar mirë dhe ka një kosto të lartë, dhe furnizuesit e mbetur Hadoop kanë ose pushoi së ekzistuari ose kishte një të ardhme të zbehtë. Prandaj, lëshimi i Apache Spark duke përdorur Kubernetes është me interes në rritje midis komunitetit dhe kompanive të mëdha - duke u bërë një standard në orkestrimin e kontejnerëve dhe menaxhimin e burimeve në retë private dhe publike, ai zgjidh problemin me planifikimin e papërshtatshëm të burimeve të detyrave të Spark në YARN dhe ofron një platformë në zhvillim të vazhdueshëm me shumë shpërndarje komerciale dhe të hapura për kompani të të gjitha madhësive dhe vijave. Përveç kësaj, në vazhdën e popullaritetit, shumica tashmë kanë arritur të blejnë disa instalime të tyre dhe kanë rritur ekspertizën e tyre në përdorimin e tij, gjë që thjeshton lëvizjen.

Duke filluar me versionin 2.3.0, Apache Spark mori mbështetje zyrtare për ekzekutimin e detyrave në një grup Kubernetes dhe sot do të flasim për pjekurinë aktuale të kësaj qasjeje, opsionet e ndryshme për përdorimin e saj dhe kurthet që do të hasen gjatë zbatimit.

Para së gjithash, le të shohim procesin e zhvillimit të detyrave dhe aplikacioneve bazuar në Apache Spark dhe të nxjerrim në pah rastet tipike në të cilat duhet të ekzekutoni një detyrë në një grup Kubernetes. Në përgatitjen e këtij postimi, OpenShift përdoret si një shpërndarje dhe do të jepen komandat përkatëse për shërbimin e linjës së komandës (oc). Për shpërndarjet e tjera të Kubernetes, mund të përdoren komandat përkatëse nga programi standard i linjës së komandës Kubernetes (kubectl) ose analogët e tyre (për shembull, për politikën oc adm).

Rasti i parë i përdorimit - shkëndija-submit

Gjatë zhvillimit të detyrave dhe aplikacioneve, zhvilluesi duhet të ekzekutojë detyra për të korrigjuar transformimin e të dhënave. Teorikisht, cungët mund të përdoren për këto qëllime, por zhvillimi me pjesëmarrjen e rasteve reale (megjithëse testuese) të sistemeve përfundimtare është dëshmuar të jetë më i shpejtë dhe më i mirë në këtë klasë detyrash. Në rastin kur korrigjojmë instancat reale të sistemeve fundore, dy skenarë janë të mundshëm:

  • zhvilluesi ekzekuton një detyrë Spark në nivel lokal në modalitetin e pavarur;

    Duke ekzekutuar Apache Spark në Kubernetes

  • një zhvillues ekzekuton një detyrë Spark në një grup Kubernetes në një lak testi.

    Duke ekzekutuar Apache Spark në Kubernetes

Opsioni i parë ka të drejtë të ekzistojë, por sjell një sërë disavantazhesh:

  • Çdo zhvilluesi duhet t'i sigurohet akses nga vendi i punës në të gjitha rastet e sistemeve përfundimtare që i nevojiten;
  • kërkohet një sasi e mjaftueshme burimesh në makinën e punës për të ekzekutuar detyrën që po zhvillohet.

Opsioni i dytë nuk i ka këto disavantazhe, pasi përdorimi i një grupi Kubernetes ju lejon të ndani grupin e nevojshëm të burimeve për ekzekutimin e detyrave dhe t'i siguroni atij aksesin e nevojshëm në rastet përfundimtare të sistemit, duke siguruar në mënyrë fleksibël aksesin në të duke përdorur modelin e roleve Kubernetes për të gjithë anëtarët e ekipit të zhvillimit. Le ta theksojmë atë si rastin e parë të përdorimit - nisjen e detyrave Spark nga një makinë zhvilluesish lokal në një grup Kubernetes në një lak testi.

Le të flasim më shumë për procesin e konfigurimit të Spark për të ekzekutuar në nivel lokal. Për të filluar përdorimin e Spark, duhet ta instaloni:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Ne mbledhim paketat e nevojshme për të punuar me Kubernetes:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

Një ndërtim i plotë kërkon shumë kohë, dhe për të krijuar imazhe Docker dhe për t'i ekzekutuar ato në një grup Kubernetes, ju nevojiten vërtet vetëm skedarë jar nga drejtoria "assembly/", kështu që mund të ndërtoni vetëm këtë nënprojekt:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Për të ekzekutuar punët e Spark në Kubernetes, duhet të krijoni një imazh Docker për ta përdorur si imazh bazë. Këtu ka 2 qasje të mundshme:

  • Imazhi i gjeneruar i Docker përfshin kodin e ekzekutueshëm të detyrës Spark;
  • Imazhi i krijuar përfshin vetëm Spark dhe varësitë e nevojshme, kodi i ekzekutueshëm mbahet nga distanca (për shembull, në HDFS).

Së pari, le të ndërtojmë një imazh Docker që përmban një shembull provë të një detyre Spark. Për të krijuar imazhe Docker, Spark ka një mjet të quajtur "docker-image-tool". Le të studiojmë ndihmën për të:

./bin/docker-image-tool.sh --help

Me ndihmën e tij, ju mund të krijoni imazhe Docker dhe t'i ngarkoni ato në regjistrat e largët, por si parazgjedhje ka një sërë disavantazhesh:

  • pa dështuar krijon 3 imazhe Docker menjëherë - për Spark, PySpark dhe R;
  • nuk ju lejon të specifikoni një emër imazhi.

Prandaj, ne do të përdorim një version të modifikuar të këtij programi të dhënë më poshtë:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

Me ndihmën e tij, ne mbledhim një imazh bazë Spark që përmban një detyrë testimi për llogaritjen e Pi duke përdorur Spark (këtu {docker-registry-url} është URL-ja e regjistrit tuaj të imazheve Docker, {repo} është emri i depove brenda regjistrit, që përputhet me projektin në OpenShift , {image-name} - emri i imazhit (nëse përdoret ndarja në tre nivele e imazheve, për shembull, si në regjistrin e integruar të imazheve të Red Hat OpenShift), {tag} - etiketa e kësaj versioni i imazhit):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Identifikohu në grupin OKD duke përdorur programin e konsolës (këtu {OKD-API-URL} është URL-ja e API-së së grupit OKD):

oc login {OKD-API-URL}

Le të marrim shenjën e përdoruesit aktual për autorizim në Regjistrin Docker:

oc whoami -t

Hyni në Regjistrin e brendshëm Docker të grupit OKD (ne përdorim shenjën e marrë duke përdorur komandën e mëparshme si fjalëkalim):

docker login {docker-registry-url}

Le të ngarkojmë imazhin e montuar të Docker në Regjistrin e Docker OKD:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Le të kontrollojmë që imazhi i mbledhur është i disponueshëm në OKD. Për ta bërë këtë, hapni URL-në në shfletues me një listë imazhesh të projektit përkatës (këtu {project} është emri i projektit brenda grupit OpenShift, {OKD-WEBUI-URL} është URL-ja e tastierës së uebit OpenShift ) - https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{image-name}.

Për të ekzekutuar detyrat, duhet të krijohet një llogari shërbimi me privilegjet për të ekzekutuar pods si rrënjë (ne do ta diskutojmë këtë pikë më vonë):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Le të ekzekutojmë komandën spark-submit për të publikuar një detyrë Spark në grupin OKD, duke specifikuar llogarinë e krijuar të shërbimit dhe imazhin e Docker:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

këtu:

—emri — emri i detyrës që do të marrë pjesë në formimin e emrit të pods Kubernetes;

—class — klasa e skedarit të ekzekutueshëm, thirrur kur fillon detyra;

—conf — Parametrat e konfigurimit të shkëndijës;

spark.executor.instances — numri i ekzekutuesve të Spark për të nisur;

spark.kubernetes.authenticate.driver.serviceAccountName - emri i llogarisë së shërbimit Kubernetes që përdoret gjatë lëshimit të pods (për të përcaktuar kontekstin e sigurisë dhe aftësitë kur ndërveprohet me Kubernetes API);

spark.kubernetes.namespace — Hapësira e emrave të Kubernetes në të cilën do të hapen pod-et e drejtuesit dhe ekzekutuesit;

spark.submit.deployMode — metoda e lëshimit të Spark (për standardin e spark-submit përdoret "cluster", për Operatorin Spark dhe versionet e mëvonshme të "klientit" të Spark);

spark.kubernetes.container.image - Imazhi i dokerit që përdoret për të nisur pods;

spark.master — URL-ja e Kubernetes API (e jashtme është e specifikuar në mënyrë që qasja të ndodhë nga makina lokale);

local:// është shtegu i ekzekutueshëm i Spark brenda imazhit të Docker.

Shkojmë te projekti përkatës OKD dhe studiojmë podet e krijuara - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

Për të thjeshtuar procesin e zhvillimit, mund të përdoret një opsion tjetër, në të cilin krijohet një imazh bazë i përbashkët i Spark, i përdorur nga të gjitha detyrat për t'u ekzekutuar dhe fotografitë e skedarëve të ekzekutueshëm publikohen në ruajtjen e jashtme (për shembull, Hadoop) dhe specifikohen kur telefononi shkëndija-dorëzo si lidhje. Në këtë rast, mund të ekzekutoni versione të ndryshme të detyrave Spark pa rindërtuar imazhet e Docker, duke përdorur, për shembull, WebHDFS për të publikuar imazhe. Ne dërgojmë një kërkesë për të krijuar një skedar (këtu {host} është hosti i shërbimit WebHDFS, {port} është porti i shërbimit WebHDFS, {path-to-file-on-hdfs} është shtegu i dëshiruar për në skedar në HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

Do të merrni një përgjigje si kjo (këtu {location} është URL-ja që duhet të përdoret për të shkarkuar skedarin):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Ngarkoni skedarin e ekzekutueshëm Spark në HDFS (këtu {path-to-local-file} është shtegu për në skedarin e ekzekutueshëm Spark në hostin aktual):

curl -i -X PUT -T {path-to-local-file} "{location}"

Pas kësaj, ne mund të bëjmë spark-submit duke përdorur skedarin Spark të ngarkuar në HDFS (këtu {class-name} është emri i klasës që duhet të hapet për të përfunduar detyrën):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Duhet të theksohet se për të hyrë në HDFS dhe për të siguruar funksionimin e detyrës, mund t'ju duhet të ndryshoni skriptin Dockerfile dhe entrypoint.sh - shtoni një direktivë në Dockerfile për të kopjuar bibliotekat e varura në drejtorinë /opt/spark/jars dhe përfshini skedarin e konfigurimit HDFS në SPARK_CLASSPATH në pikën hyrëse.

Rasti i dytë i përdorimit - Apache Livy

Më tej, kur një detyrë zhvillohet dhe rezultati duhet të testohet, lind pyetja e nisjes së saj si pjesë e procesit CI/CD dhe gjurmimit të statusit të ekzekutimit të saj. Natyrisht, mund ta ekzekutoni duke përdorur një thirrje lokale spark-submit, por kjo e ndërlikon infrastrukturën CI/CD pasi kërkon instalimin dhe konfigurimin e Spark në agjentët/drejtuesit e serverit CI dhe konfigurimin e aksesit në Kubernetes API. Për këtë rast, zbatimi i synuar ka zgjedhur të përdorë Apache Livy si një API REST për ekzekutimin e detyrave të Spark të pritura brenda një grupi Kubernetes. Me ndihmën e tij, ju mund të ekzekutoni detyrat e Spark në një grup Kubernetes duke përdorur kërkesa të rregullta cURL, të cilat zbatohen lehtësisht bazuar në çdo zgjidhje CI, dhe vendosja e tij brenda grupit Kubernetes zgjidh çështjen e vërtetimit kur ndërveproni me Kubernetes API.

Duke ekzekutuar Apache Spark në Kubernetes

Le ta theksojmë atë si një rast të dytë përdorimi - ekzekutimi i detyrave Spark si pjesë e një procesi CI/CD në një grupim Kubernetes në një lak testi.

Pak për Apache Livy - funksionon si një server HTTP që ofron një ndërfaqe në internet dhe një API RESTful që ju lejon të nisni nga distanca spark-submit duke kaluar parametrat e nevojshëm. Tradicionalisht ai është dërguar si pjesë e një shpërndarjeje HDP, por gjithashtu mund të vendoset në OKD ose në ndonjë instalim tjetër të Kubernetes duke përdorur manifestin e duhur dhe një grup imazhesh Docker, si ky - github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Për rastin tonë, u ndërtua një imazh i ngjashëm Docker, duke përfshirë versionin Spark 2.4.5 nga skedari i mëposhtëm Docker:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

Imazhi i krijuar mund të ndërtohet dhe të ngarkohet në depon tuaj ekzistuese Docker, siç është depoja e brendshme OKD. Për ta vendosur atë, përdorni manifestin e mëposhtëm ({registry-url} - URL e regjistrit të imazheve Docker, {image-name} - Emri i imazhit të Docker, {tag} - Etiketa e imazhit Docker, {livy-url} - URL-ja e dëshiruar ku serveri do të jetë i aksesueshëm Livy; manifesti "Rruga" përdoret nëse Red Hat OpenShift përdoret si shpërndarje Kubernetes, përndryshe përdoret manifesti përkatës Ingress ose Service i llojit NodePort):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

Pas aplikimit të tij dhe nisjes me sukses të podit, ndërfaqja grafike Livy është e disponueshme në lidhjen: http://{livy-url}/ui. Me Livy, ne mund të publikojmë detyrën tonë Spark duke përdorur një kërkesë REST nga, për shembull, Postman. Një shembull i një koleksioni me kërkesa është paraqitur më poshtë (argumentet e konfigurimit me variabla të nevojshëm për funksionimin e detyrës së nisur mund të kalohen në grupin "args"):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Le të ekzekutojmë kërkesën e parë nga koleksioni, të shkojmë te ndërfaqja OKD dhe të kontrollojmë nëse detyra është nisur me sukses - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. Në të njëjtën kohë, një seancë do të shfaqet në ndërfaqen Livy (http://{livy-url}/ui), brenda së cilës, duke përdorur API-në Livy ose ndërfaqen grafike, mund të gjurmoni përparimin e detyrës dhe të studioni seancën trungje.

Tani le të tregojmë se si funksionon Livy. Për ta bërë këtë, le të shqyrtojmë regjistrat e kontejnerit Livy brenda pod me serverin Livy - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name }?tab=regjistrat. Prej tyre mund të shohim se kur thërrasim Livy REST API në një kontejner të quajtur "livy", ekzekutohet një spark-submit, i ngjashëm me atë që kemi përdorur më lart (këtu {livy-pod-name} është emri i podit të krijuar me serverin Livy). Koleksioni prezanton gjithashtu një pyetje të dytë që ju lejon të ekzekutoni detyra që presin nga distanca një ekzekutues Spark duke përdorur një server Livy.

Rasti i tretë i përdorimit - Operatori Spark

Tani që detyra është testuar, lind çështja e ekzekutimit të saj rregullisht. Mënyra vendase për të ekzekutuar rregullisht detyrat në një grup Kubernetes është entiteti CronJob dhe mund ta përdorni, por për momentin përdorimi i operatorëve për të menaxhuar aplikacionet në Kubernetes është shumë i popullarizuar dhe për Spark ekziston një operator mjaft i pjekur, i cili është gjithashtu përdoret në zgjidhjet e nivelit të ndërmarrjes (për shembull, Lightbend FastData Platform). Ne rekomandojmë përdorimin e tij - versioni aktual i qëndrueshëm i Spark (2.4.5) ka opsione mjaft të kufizuara konfigurimi për ekzekutimin e detyrave të Spark në Kubernetes, ndërsa versioni tjetër kryesor (3.0.0) deklaron mbështetje të plotë për Kubernetes, por data e lëshimit të tij mbetet e panjohur . Operatori Spark kompenson këtë mangësi duke shtuar opsione të rëndësishme konfigurimi (për shembull, montimi i një ConfigMap me konfigurimin e aksesit Hadoop në Spark pods) dhe aftësinë për të ekzekutuar një detyrë të planifikuar rregullisht.

Duke ekzekutuar Apache Spark në Kubernetes
Le ta theksojmë atë si një rast të tretë përdorimi - duke ekzekutuar rregullisht detyrat e Spark në një grupim Kubernetes në një qark prodhimi.

Operatori Spark është me burim të hapur dhe i zhvilluar brenda platformës Google Cloud - github.com/GoogleCloudPlatform/spark-on-k8s-operator. Instalimi i tij mund të bëhet në 3 mënyra:

  1. Si pjesë e instalimit Lightbend FastData Platform/Cloudflow;
  2. Duke përdorur Helm:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Përdorimi i manifesteve nga depoja zyrtare (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Vlen të përmendet sa vijon - Cloudflow përfshin një operator me versionin API v1beta1. Nëse përdoret ky lloj instalimi, përshkrimet e manifestit të aplikacionit Spark duhet të bazohen në etiketat e shembujve në Git me versionin e duhur të API, për shembull, "v1beta1-0.9.0-2.4.0". Versioni i operatorit mund të gjendet në përshkrimin e CRD të përfshirë në operator në fjalorin "versionet":
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Nëse operatori është instaluar saktë, një pod aktiv me operatorin Spark do të shfaqet në projektin përkatës (për shembull, cloudflow-fdp-sparkoperator në hapësirën Cloudflow për instalimin e Cloudflow) dhe do të shfaqet një lloj burimi përkatës Kubernetes i quajtur "sparkapplications". . Ju mund të eksploroni aplikacionet e disponueshme Spark me komandën e mëposhtme:

oc get sparkapplications -n {project}

Për të ekzekutuar detyrat duke përdorur Spark Operator, duhet të bëni 3 gjëra:

  • krijoni një imazh Docker që përfshin të gjitha bibliotekat e nevojshme, si dhe skedarët e konfigurimit dhe të ekzekutueshëm. Në foton e synuar, ky është një imazh i krijuar në fazën CI/CD dhe i testuar në një grup testimi;
  • publikoni një imazh Docker në një regjistër të aksesueshëm nga grupi Kubernetes;
  • gjeneroni një manifest me llojin "SparkApplication" dhe një përshkrim të detyrës që do të nisë. Shembujt e manifestimeve janë në dispozicion në depon zyrtare (p.sh. github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Ka pika të rëndësishme për t'u theksuar në lidhje me manifestin:
    1. fjalori “apiVersion” duhet të tregojë versionin API që korrespondon me versionin e operatorit;
    2. fjalori "metadata.namespace" duhet të tregojë hapësirën e emrave në të cilën do të hapet aplikacioni;
    3. fjalori "spec.image" duhet të përmbajë adresën e imazhit të krijuar Docker në një regjistër të aksesueshëm;
    4. fjalori “spec.mainClass” duhet të përmbajë klasën e detyrave Spark që duhet të ekzekutohet kur të fillojë procesi;
    5. fjalori "spec.mainApplicationFile" duhet të përmbajë shtegun drejt skedarit të ekzekutueshëm jar;
    6. fjalori “spec.sparkVersion” duhet të tregojë versionin e Spark që përdoret;
    7. fjalori "spec.driver.serviceAccount" duhet të specifikojë llogarinë e shërbimit brenda hapësirës përkatëse të emrave të Kubernetes që do të përdoret për të ekzekutuar aplikacionin;
    8. fjalori “spec.executor” duhet të tregojë numrin e burimeve të alokuara për aplikacionin;
    9. fjalori "spec.volumeMounts" duhet të specifikojë drejtorinë lokale në të cilën do të krijohen skedarët lokal të detyrave Spark.

Një shembull i krijimit të një manifesti (këtu {spark-service-account} është një llogari shërbimi brenda grupit Kubernetes për ekzekutimin e detyrave Spark):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Ky manifest specifikon një llogari shërbimi për të cilën, përpara se të publikoni manifestin, duhet të krijoni lidhjet e nevojshme të roleve që ofrojnë të drejtat e nevojshme të aksesit që aplikacioni Spark të ndërveprojë me Kubernetes API (nëse është e nevojshme). Në rastin tonë, aplikacioni ka nevojë për të drejta për të krijuar Pods. Le të krijojmë lidhjen e nevojshme të rolit:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Vlen gjithashtu të përmendet se ky specifikim manifest mund të përfshijë një parametër "hadoopConfigMap", i cili ju lejon të specifikoni një ConfigMap me konfigurimin Hadoop pa pasur nevojë të vendosni së pari skedarin përkatës në imazhin Docker. Është gjithashtu i përshtatshëm për ekzekutimin e detyrave rregullisht - duke përdorur parametrin "orari", mund të specifikohet një orar për ekzekutimin e një detyre të caktuar.

Pas kësaj, ne e ruajmë manifestin tonë në skedarin spark-pi.yaml dhe e aplikojmë atë në grupin tonë Kubernetes:

oc apply -f spark-pi.yaml

Kjo do të krijojë një objekt të tipit "sparkapplications":

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

Në këtë rast, do të krijohet një pod me një aplikacion, statusi i të cilit do të shfaqet në "sparkapplications" të krijuara. Mund ta shikoni me komandën e mëposhtme:

oc get sparkapplications spark-pi -o yaml -n {project}

Pas përfundimit të detyrës, POD do të kalojë në statusin "Përfunduar", i cili gjithashtu do të përditësohet në "sparkapplications". Regjistrat e aplikacioneve mund të shikohen në shfletues ose duke përdorur komandën e mëposhtme (këtu {sparkapplications-pod-name} është emri i pod-it të detyrës që ekzekutohet):

oc logs {sparkapplications-pod-name} -n {project}

Detyrat Spark mund të menaxhohen gjithashtu duke përdorur programin e specializuar sparkctl. Për ta instaluar, klononi depon me kodin e tij burimor, instaloni Go dhe ndërtoni këtë mjet:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Le të shqyrtojmë listën e detyrave të ekzekutimit të Spark:

sparkctl list -n {project}

Le të krijojmë një përshkrim për detyrën Spark:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Le të ekzekutojmë detyrën e përshkruar duke përdorur sparkctl:

sparkctl create spark-app.yaml -n {project}

Le të shqyrtojmë listën e detyrave të ekzekutimit të Spark:

sparkctl list -n {project}

Le të shqyrtojmë listën e ngjarjeve të një detyre të nisur Spark:

sparkctl event spark-pi -n {project} -f

Le të shqyrtojmë statusin e detyrës së ekzekutimit të Spark:

sparkctl status spark-pi -n {project}

Si përfundim, do të doja të konsideroja disavantazhet e zbuluara të përdorimit të versionit aktual të qëndrueshëm të Spark (2.4.5) në Kubernetes:

  1. Disavantazhi i parë dhe ndoshta kryesor është mungesa e lokalitetit të të dhënave. Përkundër të gjitha mangësive të YARN, kishte edhe avantazhe në përdorimin e tij, për shembull, parimi i dërgimit të kodit në të dhëna (në vend të të dhënave në kod). Falë tij, detyrat Spark u ekzekutuan në nyjet ku ndodheshin të dhënat e përfshira në llogaritjet dhe koha që u desh për të shpërndarë të dhënat në rrjet u zvogëlua ndjeshëm. Kur përdorim Kubernetes, ne përballemi me nevojën për të lëvizur të dhënat e përfshira në një detyrë nëpër rrjet. Nëse ato janë mjaft të mëdha, koha e ekzekutimit të detyrës mund të rritet ndjeshëm, dhe gjithashtu të kërkojë një sasi mjaft të madhe të hapësirës në disk të caktuar për instancat e detyrave Spark për ruajtjen e tyre të përkohshme. Ky disavantazh mund të zbutet duke përdorur softuer të specializuar që siguron lokalitetin e të dhënave në Kubernetes (për shembull, Alluxio), por kjo në fakt nënkupton nevojën për të ruajtur një kopje të plotë të të dhënave në nyjet e grupit Kubernetes.
  2. Disavantazhi i dytë i rëndësishëm është siguria. Si parazgjedhje, veçoritë e lidhura me sigurinë në lidhje me ekzekutimin e detyrave të Spark janë të çaktivizuara, përdorimi i Kerberos nuk mbulohet në dokumentacionin zyrtar (megjithëse opsionet përkatëse u prezantuan në versionin 3.0.0, i cili do të kërkojë punë shtesë) dhe dokumentacioni i sigurisë për duke përdorur Spark (https ://spark.apache.org/docs/2.4.5/security.html) vetëm YARN, Mesos dhe Standalone Cluster shfaqen si depo kryesore. Në të njëjtën kohë, përdoruesi nën të cilin hapen detyrat e Spark nuk mund të specifikohet drejtpërdrejt - ne specifikojmë vetëm llogarinë e shërbimit nën të cilën do të funksionojë, dhe përdoruesi zgjidhet bazuar në politikat e konfiguruara të sigurisë. Në këtë drejtim, ose përdoret përdoruesi rrënjë, i cili nuk është i sigurt në një mjedis produktiv, ose një përdorues me një UID të rastësishëm, i cili është i papërshtatshëm kur shpërndahen të drejtat e aksesit në të dhëna (kjo mund të zgjidhet duke krijuar PodSecurityPolicies dhe duke i lidhur ato me llogaritë përkatëse të shërbimit). Aktualisht, zgjidhja është ose të vendosni të gjithë skedarët e nevojshëm drejtpërdrejt në imazhin e Docker, ose të modifikoni skriptin e nisjes së Spark për të përdorur mekanizmin për ruajtjen dhe marrjen e sekreteve të miratuara në organizatën tuaj.
  3. Ekzekutimi i punëve Spark duke përdorur Kubernetes është zyrtarisht ende në modalitetin eksperimental dhe mund të ketë ndryshime të rëndësishme në artefaktet e përdorura (skedarët e konfigurimit, imazhet e bazës Docker dhe skriptet e nisjes) në të ardhmen. Dhe në të vërtetë, gjatë përgatitjes së materialit, u testuan versionet 2.3.0 dhe 2.4.5, sjellja ishte dukshëm e ndryshme.

Le të presim për përditësime - një version i ri i Spark (3.0.0) u lëshua së fundmi, i cili solli ndryshime të rëndësishme në punën e Spark në Kubernetes, por ruajti statusin eksperimental të mbështetjes për këtë menaxher burimesh. Ndoshta përditësimet e ardhshme do të bëjnë vërtet të mundur rekomandimin e plotë të braktisjes së YARN dhe ekzekutimit të detyrave të Spark në Kubernetes pa frikë për sigurinë e sistemit tuaj dhe pa nevojën për të modifikuar në mënyrë të pavarur komponentët funksionalë.

Fund

Burimi: www.habr.com

Shto një koment