Apache Spark futtatása Kubernetesen

Kedves olvasók, jó napot! Ma egy kicsit az Apache Sparkról és fejlesztési kilátásairól fogunk beszélni.

Apache Spark futtatása Kubernetesen

A Big Data modern világában az Apache Spark a kötegelt adatfeldolgozási feladatok fejlesztésének de facto szabványa. Ezenkívül olyan adatfolyam-alkalmazások létrehozására is használható, amelyek mikro kötegelt koncepcióban működnek, és kis részletekben dolgozzák fel és szállítják az adatokat (Spark Structured Streaming). Hagyományosan a teljes Hadoop-verem része, YARN-t (vagy bizonyos esetekben Apache Mesos-t) használva erőforrás-kezelőként. 2020-ra a hagyományos formában való felhasználása kérdéses a legtöbb cég számára a megfelelő Hadoop disztribúciók hiánya miatt – a HDP és a CDH fejlesztése leállt, a CDH nem fejlett és magas költségekkel jár, a fennmaradó Hadoop beszállítók pedig vagy megszűnt létezni, vagy homályos jövője van. Ezért az Apache Spark Kubernetes használatával történő elindítása egyre nagyobb érdeklődésre tart számot a közösség és a nagyvállalatok körében – a konténer-hangszerelés és az erőforrás-kezelés szabványává válik a privát és nyilvános felhőkben, megoldja a problémát a Spark-feladatok kényelmetlen ütemezése a YARN-n, és folyamatosan fejlődő platform számos kereskedelmi és nyílt disztribúcióval minden méretű és kategóriájú vállalat számára. Ráadásul a népszerűség nyomán a legtöbbnek már sikerült beszereznie néhány saját installációt, és növelte a használatában szerzett szakértelmét, ami leegyszerűsíti a lépést.

A 2.3.0-s verziótól kezdődően az Apache Spark hivatalos támogatást szerzett a Kubernetes-fürtben futó feladatokhoz, és ma szó lesz ennek a megközelítésnek a jelenlegi érettségéről, a különféle felhasználási lehetőségekről és a megvalósítás során felmerülő buktatókról.

Először is nézzük meg az Apache Spark alapú feladatok és alkalmazások fejlesztésének folyamatát, és emeljük ki a tipikus eseteket, amikor Kubernetes-fürtön kell egy feladatot futtatni. A bejegyzés elkészítése során az OpenShiftet disztribúcióként használjuk, és a parancssori segédprogramhoz (oc) kapcsolódó parancsokat adjuk meg. Más Kubernetes disztribúciók esetén a szabványos Kubernetes parancssori segédprogram (kubectl) megfelelő parancsai vagy analógjaik (például az oc adm házirendhez) használhatók.

Első használati eset – szikra-benyújtás

A feladatok és alkalmazások fejlesztése során a fejlesztőnek feladatokat kell futtatnia az adatok átalakítása hibakereséséhez. Elméletileg a csonkok felhasználhatók erre a célra, de a végrendszerek valós (bár teszt) példányainak részvételével történő fejlesztés gyorsabbnak és jobbnak bizonyult ebben a feladatcsoportban. Abban az esetben, ha a végrendszerek valós példányain hibakeresést végzünk, két forgatókönyv lehetséges:

  • a fejlesztő helyileg, önálló módban futtat egy Spark-feladatot;

    Apache Spark futtatása Kubernetesen

  • a fejlesztő egy Spark-feladatot futtat egy Kubernetes-fürtön egy tesztkörben.

    Apache Spark futtatása Kubernetesen

Az első lehetőségnek joga van létezni, de számos hátránnyal jár:

  • Minden fejlesztőnek hozzáférést kell biztosítani a munkahelyről a végrendszerek összes példányához, amelyre szüksége van;
  • megfelelő mennyiségű erőforrás szükséges a munkagépen a fejlesztés alatt álló feladat futtatásához.

A második lehetőség nem rendelkezik ezekkel a hátrányokkal, mivel a Kubernetes-fürt használata lehetővé teszi, hogy lefoglalja a szükséges erőforráskészletet a futó feladatokhoz, és biztosítsa számára a szükséges hozzáférést a végrendszerpéldányokhoz, rugalmasan biztosítva a hozzáférést a Kubernetes példaképet használva. a fejlesztőcsapat összes tagja. Kiemeljük az első használati esetként – Spark-feladatok indítása egy Kubernetes-fürtön lévő helyi fejlesztői gépről egy tesztkörben.

Beszéljünk részletesebben a Spark helyi futtatására történő beállításának folyamatáról. A Spark használatának megkezdéséhez telepítenie kell:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Összegyűjtjük a Kubernetes munkához szükséges csomagokat:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

A teljes felépítés sok időt vesz igénybe, és a Docker-képek létrehozásához és Kubernetes-fürtön való futtatásához valójában csak az „assembly/” könyvtárból származó jar-fájlokra van szükség, így csak ezt az alprojektet építheti fel:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Ha Spark-feladatokat szeretne futtatni a Kubernetes rendszeren, létre kell hoznia egy Docker-lemezképet, amelyet alapképként használhat. Itt 2 lehetséges megközelítés létezik:

  • Az előállított Docker-lemezkép tartalmazza a végrehajtható Spark-feladatkódot;
  • A létrehozott kép csak a Sparkot és a szükséges függőségeket tartalmazza, a végrehajtható kód távolról (például HDFS-ben) van tárolva.

Először készítsünk egy Docker-képet, amely egy Spark-feladat tesztpéldáját tartalmazza. A Docker-képek létrehozásához a Spark rendelkezik egy „docker-image-tool” nevű segédprogrammal. Tanulmányozzuk a segítséget:

./bin/docker-image-tool.sh --help

Segítségével létrehozhat Docker képeket és feltöltheti őket távoli nyilvántartásokba, de alapértelmezés szerint számos hátránya van:

  • hiba nélkül 3 Docker-képet hoz létre egyszerre - Spark, PySpark és R számára;
  • nem teszi lehetővé képnév megadását.

Ezért ennek a segédprogramnak az alábbiakban megadott módosított változatát fogjuk használni:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

Segítségével összeállítunk egy alap Spark-képet, amely egy tesztfeladatot tartalmaz a Pi Spark segítségével történő kiszámításához (itt a {docker-registry-url} a Docker-képregisztráció URL-je, a {repo} a rendszerleíró adatbázison belüli tár neve, amely megegyezik a projekttel az OpenShiftben, {image-name} - a kép neve (ha a képek háromszintű elválasztását alkalmazzák, például, mint a Red Hat OpenShift képek integrált nyilvántartásában), {tag} - ennek címkéje a kép verziója):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Jelentkezzen be az OKD-fürtbe a konzolsegédprogrammal (itt az {OKD-API-URL} az OKD-fürt API URL-je):

oc login {OKD-API-URL}

Szerezzük meg az aktuális felhasználó tokenjét az engedélyezéshez a Docker Registry-ben:

oc whoami -t

Jelentkezzen be az OKD-fürt belső Docker-nyilvántartásába (jelszóként az előző paranccsal kapott tokent használjuk):

docker login {docker-registry-url}

Töltsük fel az összeállított Docker-képet a Docker Registry OKD-be:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Ellenőrizzük, hogy az összeállított kép elérhető-e OKD-ben. Ehhez nyissa meg az URL-t a böngészőben a megfelelő projekt képeinek listájával (itt a {project} a projekt neve az OpenShift-fürtön belül, az {OKD-WEBUI-URL} az OpenShift webkonzol URL-je ) - https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{image-name}.

A feladatok futtatásához létre kell hozni egy szolgáltatásfiókot a pods rootként való futtatásához szükséges jogosultságokkal (erről a pontról később lesz szó):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Futtassa a spark-submit parancsot egy Spark-feladat közzétételéhez az OKD-fürtben, megadva a létrehozott szolgáltatásfiókot és a Docker-képet:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

itt:

—name — annak a feladatnak a neve, amely részt vesz a Kubernetes-hüvelyek nevének kialakításában;

—class — a végrehajtható fájl osztálya, amelyet a feladat indításakor hívunk meg;

—conf — Spark konfigurációs paraméterei;

spark.executor.instances — az indítandó Spark végrehajtók száma;

spark.kubernetes.authenticate.driver.serviceAccountName – a Kubernetes szolgáltatásfiók neve, amelyet a pod-ok indításakor használnak (a Kubernetes API-val való interakció során a biztonsági kontextus és képességek meghatározásához);

spark.kubernetes.namespace — Kubernetes névtér, amelyben az illesztőprogram és a végrehajtó podok elindulnak;

spark.submit.deployMode — a Spark indításának módja (a szabványos spark-submit „fürt” használatos, a Spark Operator és a Spark „kliens” újabb verziói);

spark.kubernetes.container.image – Docker-kép a pod-ok indításához;

spark.master — Kubernetes API URL (külső megadva, így a hozzáférés a helyi gépről történik);

local:// a Spark végrehajtható fájl elérési útja a Docker lemezképben.

Megyünk a megfelelő OKD-projekthez, és tanulmányozzuk a létrehozott podokat - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

A fejlesztési folyamat leegyszerűsítésére egy másik lehetőség is használható, amelyben a Spark közös alapképe jön létre, amelyet minden feladat futtatásához használ, és a végrehajtható fájlok pillanatképei külső tárolón (például Hadoop) kerülnek közzétételre, és megadják a híváskor. spark-submit linkként. Ebben az esetben a Spark-feladatok különböző verzióit futtathatja a Docker-képfájlok újraépítése nélkül, például a WebHDFS használatával a képek közzétételéhez. Kérést küldünk egy fájl létrehozására (itt a {host} a WebHDFS szolgáltatás gazdagépe, a {port} a WebHDFS szolgáltatás portja, a {path-to-file-on-hdfs} a fájl kívánt elérési útja HDFS-en):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

Ehhez hasonló választ fog kapni (itt a {location} az az URL, amelyet a fájl letöltéséhez kell használni):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Töltse be a Spark végrehajtható fájlját a HDFS-be (itt a {local-fájl elérési útja} a Spark végrehajtható fájl elérési útja az aktuális gazdagépen):

curl -i -X PUT -T {path-to-local-file} "{location}"

Ezt követően a HDFS-re feltöltött Spark fájl segítségével megtehetjük a spark-submit-ot (itt a {class-name} annak az osztálynak a neve, amelyet a feladat végrehajtásához el kell indítani):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Meg kell jegyezni, hogy a HDFS eléréséhez és a feladat működésének biztosításához szükség lehet a Dockerfile és az enterpoint.sh szkript módosítására – adjon hozzá egy direktívát a Dockerfile-hoz, hogy a függő könyvtárakat az /opt/spark/jars könyvtárba másolja, és tartalmazza a HDFS konfigurációs fájlt a belépési pont SPARK_CLASSPATH pontjában.

Második használati eset – Apache Livy

Továbbá, amikor egy feladatot kidolgozunk és az eredményt tesztelni kell, felmerül a kérdés, hogy a CI/CD folyamat részeként indítsuk el, és nyomon követjük a végrehajtás állapotát. Természetesen futtathatja helyi spark-submit hívással, de ez bonyolítja a CI/CD infrastruktúrát, mivel ehhez telepíteni és konfigurálni kell a Sparkot a CI-kiszolgáló ügynökein/futóin, valamint be kell állítania a Kubernetes API-hoz való hozzáférést. Ebben az esetben a célmegvalósítás úgy döntött, hogy az Apache Livy-t REST API-ként használja a Kubernetes-fürtön belül tárolt Spark-feladatok futtatásához. Segítségével Spark-feladatokat futtathatunk Kubernetes-fürtön, normál cURL-kérésekkel, ami bármilyen CI-megoldás alapján könnyen megvalósítható, a Kubernetes-fürtön belüli elhelyezése pedig megoldja a Kubernetes API-val való interakció során a hitelesítés problémáját.

Apache Spark futtatása Kubernetesen

Kiemeljük második használati esetként – a Spark-feladatok futtatása egy Kubernetes-fürt CI/CD-folyamat részeként egy tesztkörben.

Egy kicsit az Apache Livy-ről – HTTP-kiszolgálóként működik, amely webes felületet és RESTful API-t biztosít, amely lehetővé teszi a spark-submit távoli elindítását a szükséges paraméterek átadásával. Hagyományosan egy HDP-disztribúció részeként szállították, de üzembe helyezhető az OKD-ben vagy bármely más Kubernetes-telepítésben is a megfelelő jegyzék és a Docker-képkészlet segítségével, mint például ez - github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. A mi esetünkben egy hasonló Docker-lemezkép készült, beleértve a Spark 2.4.5-ös verzióját a következő Dockerfile-ból:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

Az előállított kép összeállítható és feltölthető a meglévő Docker-tárhelyre, például a belső OKD-tárba. A telepítéshez használja a következő jegyzéket ({registry-url} - a Docker képregisztrációs adatbázis URL-je, {image-name} - Docker-képnév, {tag} - Docker-képcímke, {livy-url} - kívánt URL, ahol a a szerver elérhető lesz a Livy; az „Útvonal” jegyzéket használja, ha a Red Hat OpenShiftet Kubernetes disztribúcióként használja, ellenkező esetben a megfelelő NodePort típusú Ingress vagy Service jegyzéket használja):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

Alkalmazása és a pod sikeres elindítása után a Livy grafikus felület a következő linken érhető el: http://{livy-url}/ui. A Livy segítségével közzétehetjük Spark-feladatunkat például a Postman REST kérésével. Az alábbiakban egy kéréseket tartalmazó gyűjtemény példáját mutatjuk be (az elindított feladat működéséhez szükséges változókat tartalmazó konfigurációs argumentumok az „args” tömbben adhatók át):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Végezzük el az első kérést a gyűjteményből, menjünk az OKD felületre, és ellenőrizzük, hogy a feladat sikeresen elindult - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. Ezzel egyidejűleg a Livy felületen (http://{livy-url}/ui) megjelenik egy munkamenet, amelyen belül a Livy API vagy grafikus felület segítségével nyomon követheti a feladat előrehaladását és tanulmányozhatja a munkamenetet rönkök.

Most pedig mutassuk meg, hogyan működik Livy. Ehhez vizsgáljuk meg a pod belsejében lévő Livy tároló naplóit a Livy szerverrel - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name }?tab=logs. Láthatjuk belőlük, hogy a Livy REST API meghívásakor egy „livy” nevű konténerben egy spark-submit fut le, hasonlóan a fentebb használthoz (itt a {livy-pod-name} a létrehozott pod neve a Livy szerverrel). A gyűjtemény egy második lekérdezést is bevezet, amely lehetővé teszi olyan feladatok futtatását, amelyek távolról tárolnak egy Spark végrehajtható fájlt egy Livy-kiszolgáló segítségével.

Harmadik használati eset – Spark Operator

Most, hogy a feladatot tesztelték, felmerül a rendszeres futtatás kérdése. A Kubernetes-fürtben a feladatok rendszeres futtatásának natív módja a CronJob entitás, és használhatja is, de jelenleg nagyon népszerű az operátorok használata a Kubernetes alkalmazások kezelésére, és a Spark számára van egy meglehetősen érett operátor, amely szintén vállalati szintű megoldásokban használatos (például Lightbend FastData Platform). Javasoljuk a használatát - a Spark jelenlegi stabil verziója (2.4.5) meglehetősen korlátozott konfigurációs lehetőségeket kínál a Spark-feladatok futtatásához Kubernetesben, míg a következő nagyobb verzió (3.0.0) teljes mértékben támogatja a Kubernetes-et, de megjelenési dátuma ismeretlen . A Spark Operator ezt a hiányosságot azzal kompenzálja, hogy fontos konfigurációs lehetőségeket ad hozzá (például egy Hadoop hozzáférési konfigurációval rendelkező ConfigMap felszerelése a Spark podokhoz), és képes rendszeresen ütemezett feladatok futtatására.

Apache Spark futtatása Kubernetesen
Kiemeljük harmadik használati esetként – a Spark-feladatok rendszeres futtatása egy termelési ciklusban lévő Kubernetes-fürtön.

A Spark Operator nyílt forráskódú, és a Google Cloud Platformon belül fejlesztették ki - github.com/GoogleCloudPlatform/spark-on-k8s-operator. Telepítése 3 módon történhet:

  1. A Lightbend FastData Platform/Cloudflow telepítés részeként;
  2. Helm használata:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. A hivatalos adattárból (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest) származó jegyzékek használata. Érdemes megjegyezni a következőket - A Cloudflow tartalmaz egy operátort az API v1beta1 verziójával. Ha ezt a típusú telepítést használja, a Spark-alkalmazások jegyzékleírásainak a megfelelő API-verziójú Gitben található példacímkéken kell alapulniuk, például „v1beta1-0.9.0-2.4.0”. Az operátor verziója megtalálható a CRD leírásában, amely az operátorban található a „verziók” szótárban:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Ha az operátor megfelelően van telepítve, egy aktív pod jelenik meg a Spark operátorral a megfelelő projektben (például cloudflow-fdp-sparkoperator a Cloudflow területen a Cloudflow telepítéshez), és megjelenik egy megfelelő Kubernetes erőforrástípus „sparkapplications” néven. . Az elérhető Spark-alkalmazásokat a következő paranccsal fedezheti fel:

oc get sparkapplications -n {project}

A feladatok Spark Operator használatával történő futtatásához 3 dolgot kell tennie:

  • hozzon létre egy Docker lemezképet, amely tartalmazza az összes szükséges könyvtárat, valamint konfigurációs és végrehajtható fájlokat. A célképben ez egy CI/CD szakaszban létrehozott kép, amelyet egy tesztklaszteren teszteltek;
  • Docker-kép közzététele a Kubernetes-fürtből elérhető rendszerleíró adatbázisban;
  • generáljon egy manifestet a „SparkApplication” típussal és az indítandó feladat leírásával. A példajegyzékek elérhetők a hivatalos adattárban (pl. github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Fontos megjegyezni a kiáltványt:
    1. az „apiVersion” szótárban fel kell tüntetni az üzemeltetői verziónak megfelelő API-verziót;
    2. a „metadata.namespace” szótárnak fel kell tüntetnie azt a névteret, amelyben az alkalmazás elindul;
    3. a „spec.image” szótárnak tartalmaznia kell a létrehozott Docker-kép címét egy elérhető nyilvántartásban;
    4. a „spec.mainClass” szótárnak tartalmaznia kell a Spark feladatosztályt, amelyet a folyamat indításakor le kell futtatni;
    5. a végrehajtható jar fájl elérési útját meg kell adni a „spec.mainApplicationFile” szótárban;
    6. a „spec.sparkVersion” szótárnak jeleznie kell a Spark használt verzióját;
    7. a „spec.driver.serviceAccount” szótárnak meg kell adnia az alkalmazás futtatásához használt szolgáltatásfiókot a megfelelő Kubernetes névtérben;
    8. a „spec.executor” szótárban fel kell tüntetni az alkalmazáshoz hozzárendelt erőforrások számát;
    9. a "spec.volumeMounts" szótárnak meg kell adnia azt a helyi könyvtárat, amelyben a helyi Spark feladatfájlok létrejönnek.

Példa egy jegyzék létrehozására (itt a {spark-service-account} egy szolgáltatásfiók a Kubernetes-fürtön belül Spark-feladatok futtatásához):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Ez a jegyzék egy szolgáltatásfiókot határoz meg, amelyhez a jegyzék közzététele előtt létre kell hoznia a szükséges szerepkör-összerendeléseket, amelyek biztosítják a szükséges hozzáférési jogokat a Spark-alkalmazás számára a Kubernetes API-val való interakcióhoz (ha szükséges). Esetünkben az alkalmazásnak jogokra van szüksége a pod-ok létrehozásához. Hozzuk létre a szükséges szerepkötést:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Azt is érdemes megjegyezni, hogy ez a jegyzékspecifikáció tartalmazhat egy „hadoopConfigMap” paramétert, amely lehetővé teszi egy ConfigMap megadását a Hadoop konfigurációval anélkül, hogy először el kellene helyeznie a megfelelő fájlt a Docker-képfájlban. Alkalmas feladatok rendszeres futtatására is - az „ütemezés” paraméterrel egy adott feladat futtatásának ütemezése adható meg.

Ezt követően mentjük a manifestet a spark-pi.yaml fájlba, és alkalmazzuk a Kubernetes-fürtünkre:

oc apply -f spark-pi.yaml

Ezzel létrehoz egy „sparkapplications” típusú objektumot:

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

Ebben az esetben létrejön egy pod egy alkalmazással, amelynek állapota megjelenik a létrehozott „sparkapplikációkban”. Megtekintheti a következő paranccsal:

oc get sparkapplications spark-pi -o yaml -n {project}

A feladat befejezése után a POD „Befejezve” állapotba kerül, amely a „sparkapplications”-ben is frissül. Az alkalmazásnaplók megtekinthetők a böngészőben vagy a következő paranccsal (itt a {sparkapplications-pod-name} a futó feladat podjának neve):

oc logs {sparkapplications-pod-name} -n {project}

A Spark feladatok a speciális sparkctl segédprogrammal is kezelhetők. A telepítéshez klónozza a tárolót a forráskódjával, telepítse a Go-t, és készítse el ezt a segédprogramot:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Vizsgáljuk meg a futó Spark-feladatok listáját:

sparkctl list -n {project}

Készítsünk leírást a Spark feladathoz:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Futtassuk le a leírt feladatot sparkctl segítségével:

sparkctl create spark-app.yaml -n {project}

Vizsgáljuk meg a futó Spark-feladatok listáját:

sparkctl list -n {project}

Vizsgáljuk meg egy elindított Spark feladat eseménylistáját:

sparkctl event spark-pi -n {project} -f

Vizsgáljuk meg a futó Spark feladat állapotát:

sparkctl status spark-pi -n {project}

Végezetül szeretném megvizsgálni a Spark jelenlegi stabil verziójának (2.4.5) Kubernetesben való használatának felfedezett hátrányait:

  1. Az első és talán a fő hátrány az Adathelység hiánya. A YARN minden hiányossága ellenére előnyei is voltak a használatának, például a kód adatba juttatásának elve (nem pedig adatból kódba). Ennek köszönhetően a Spark feladatok azokon a csomópontokon valósultak meg, ahol a számításba bevont adatok találhatók, és jelentősen lecsökkent az adatok hálózaton keresztüli szállításának ideje. A Kubernetes használatakor azzal kell szembesülnünk, hogy a feladathoz kapcsolódó adatokat a hálózaton át kell mozgatni. Ha elég nagyok, a feladat végrehajtási ideje jelentősen megnőhet, és meglehetősen nagy mennyiségű lemezterületet igényel a Spark-feladatpéldányok ideiglenes tárolására. Ez a hátrány csökkenthető speciális szoftverek használatával, amelyek biztosítják az adatok elhelyezkedését a Kubernetesben (például Alluxio), de ez valójában azt jelenti, hogy az adatok teljes másolatát kell tárolni a Kubernetes-fürt csomópontjain.
  2. A második fontos hátrány a biztonság. Alapértelmezés szerint a Spark-feladatok futtatásával kapcsolatos biztonsági szolgáltatások le vannak tiltva, a hivatalos dokumentáció nem tárgyalja a Kerberos használatát (bár a megfelelő opciókat a 3.0.0-s verzió bevezette, ami további munkát igényel), és a biztonsági dokumentáció a Spark használatával (https://spark.apache.org/docs/2.4.5/security.html) csak a YARN, a Mesos és az önálló fürt jelenik meg kulcstárolóként. Ugyanakkor azt a felhasználót, aki alatt a Spark-feladatok elindulnak, nem lehet közvetlenül megadni - csak azt a szolgáltatásfiókot adjuk meg, amely alatt működni fog, és a felhasználót a beállított biztonsági házirendek alapján választjuk ki. Ebben a tekintetben vagy a root felhasználót használják, ami nem biztonságos produktív környezetben, vagy egy véletlenszerű UID-vel rendelkező felhasználót, ami kényelmetlen az adatokhoz való hozzáférési jogok elosztása során (ez megoldható PodSecurityPolicies létrehozásával és összekapcsolásával a megfelelő szolgáltatási számlák). Jelenleg a megoldás az, hogy vagy az összes szükséges fájlt közvetlenül a Docker-lemezképbe helyezi, vagy módosítja a Spark indítószkriptet, hogy a szervezetében elfogadott titkok tárolására és visszanyerésére szolgáló mechanizmust használja.
  3. A Spark-feladatok futtatása Kubernetes használatával hivatalosan még kísérleti módban van, és a jövőben jelentős változások következhetnek be a felhasznált melléktermékekben (konfigurációs fájlok, Docker alapképek és indító szkriptek). És valóban, az anyag elkészítésekor a 2.3.0 és a 2.4.5 verziókat tesztelték, a viselkedés jelentősen eltért.

Várjuk a frissítéseket – a közelmúltban megjelent a Spark új verziója (3.0.0), amely jelentős változásokat hozott a Spark munkájában a Kubernetesen, de megőrizte az erőforrás-kezelő támogatásának kísérleti állapotát. Talán a következő frissítések valóban lehetővé teszik a YARN elhagyását és a Spark-feladatok futtatását a Kubernetes rendszeren anélkül, hogy félne a rendszer biztonsága és a funkcionális összetevők önálló módosítása nélkül.

Uszony.

Forrás: will.com

Hozzászólás