Rulează Apache Spark pe Kubernetes

Dragi cititori, bună seara. Astăzi vom vorbi puțin despre Apache Spark și perspectivele sale de dezvoltare.

Rulează Apache Spark pe Kubernetes

În lumea modernă a Big Data, Apache Spark este standardul de facto pentru dezvoltarea sarcinilor de procesare a datelor în lot. În plus, este folosit și pentru a crea aplicații de streaming care funcționează în conceptul micro batch, procesând și expediind date în porțiuni mici (Spark Structured Streaming). Și în mod tradițional a făcut parte din stiva generală Hadoop, folosind YARN (sau, în unele cazuri, Apache Mesos) ca manager de resurse. Până în 2020, utilizarea sa în forma sa tradițională este pusă sub semnul întrebării pentru majoritatea companiilor din cauza lipsei distribuțiilor Hadoop decente - dezvoltarea HDP și CDH s-a oprit, CDH nu este bine dezvoltat și are un cost ridicat, iar furnizorii Hadoop rămași au fie au încetat să mai existe, fie au un viitor slab. Prin urmare, lansarea Apache Spark folosind Kubernetes prezintă un interes din ce în ce mai mare în rândul comunității și al companiilor mari - devenind un standard în orchestrarea containerelor și gestionarea resurselor în cloud-urile private și publice, rezolvă problema cu programarea incomodă a resurselor sarcinilor Spark pe YARN și oferă o platformă în continuă dezvoltare, cu multe distribuții comerciale și deschise pentru companii de toate dimensiunile și liniile. În plus, în urma popularității, majoritatea au reușit deja să achiziționeze câteva instalații proprii și și-au sporit expertiza în utilizarea acesteia, ceea ce simplifică mutarea.

Începând cu versiunea 2.3.0, Apache Spark a dobândit suport oficial pentru rularea sarcinilor într-un cluster Kubernetes și astăzi, vom vorbi despre maturitatea actuală a acestei abordări, diverse opțiuni de utilizare a acesteia și capcanele care vor fi întâlnite în timpul implementării.

În primul rând, să ne uităm la procesul de dezvoltare a sarcinilor și aplicațiilor bazate pe Apache Spark și să evidențiem cazurile tipice în care trebuie să rulați o sarcină pe un cluster Kubernetes. În pregătirea acestei postări, OpenShift este folosit ca distribuție și vor fi date comenzi relevante pentru utilitarul său de linie de comandă (oc). Pentru alte distribuții Kubernetes, pot fi utilizate comenzile corespunzătoare din utilitarul standard de linie de comandă Kubernetes (kubectl) sau analogii acestora (de exemplu, pentru politica oc adm).

Primul caz de utilizare - spark-submit

În timpul dezvoltării sarcinilor și aplicațiilor, dezvoltatorul trebuie să execute sarcini pentru a depana transformarea datelor. Teoretic, stub-urile pot fi utilizate în aceste scopuri, dar dezvoltarea cu participarea instanțelor reale (deși de testare) a sistemelor finale s-a dovedit a fi mai rapidă și mai bună în această clasă de sarcini. În cazul în care depanăm pe instanțe reale ale sistemelor finale, sunt posibile două scenarii:

  • dezvoltatorul rulează o sarcină Spark local în modul de sine stătător;

    Rulează Apache Spark pe Kubernetes

  • un dezvoltator rulează o sarcină Spark pe un cluster Kubernetes într-o buclă de testare.

    Rulează Apache Spark pe Kubernetes

Prima opțiune are dreptul de a exista, dar implică o serie de dezavantaje:

  • Fiecare dezvoltator trebuie să aibă acces de la locul de muncă la toate instanțele sistemelor finale de care are nevoie;
  • este necesară o cantitate suficientă de resurse pe mașina de lucru pentru a rula sarcina în curs de dezvoltare.

A doua opțiune nu are aceste dezavantaje, deoarece utilizarea unui cluster Kubernetes vă permite să alocați pool-ul de resurse necesar pentru rularea sarcinilor și să îi oferiți accesul necesar la instanțele sistemului final, oferind în mod flexibil acces la acesta folosind modelul Kubernetes pentru toți membrii echipei de dezvoltare. Să-l evidențiem ca primul caz de utilizare - lansarea sarcinilor Spark de pe o mașină de dezvoltator local pe un cluster Kubernetes într-un circuit de testare.

Să vorbim mai multe despre procesul de configurare a Spark pentru a rula local. Pentru a începe să utilizați Spark, trebuie să îl instalați:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Colectăm pachetele necesare pentru a lucra cu Kubernetes:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

O construcție completă necesită mult timp, iar pentru a crea imagini Docker și a le rula pe un cluster Kubernetes, aveți nevoie doar de fișiere jar din directorul „asamblare/”, așa că puteți construi doar acest subproiect:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Pentru a rula joburi Spark pe Kubernetes, trebuie să creați o imagine Docker pe care să o utilizați ca imagine de bază. Există 2 abordări posibile aici:

  • Imaginea Docker generată include codul de activitate executabil Spark;
  • Imaginea creată include doar Spark și dependențele necesare, codul executabil este găzduit de la distanță (de exemplu, în HDFS).

Mai întâi, să construim o imagine Docker care să conțină un exemplu de testare al unei sarcini Spark. Pentru a crea imagini Docker, Spark are un utilitar numit „docker-image-tool”. Să studiem ajutorul pentru el:

./bin/docker-image-tool.sh --help

Cu ajutorul acestuia, puteți crea imagini Docker și le puteți încărca în registre la distanță, dar implicit are o serie de dezavantaje:

  • fără greș creează 3 imagini Docker simultan - pentru Spark, PySpark și R;
  • nu vă permite să specificați un nume de imagine.

Prin urmare, vom folosi o versiune modificată a acestui utilitar, prezentată mai jos:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

Cu ajutorul acesteia, asamblam o imagine Spark de bază care conține o sarcină de testare pentru calcularea Pi folosind Spark (aici {docker-registry-url} este adresa URL a registrului de imagini Docker, {repo} este numele depozitului din registry, care se potrivește cu proiectul din OpenShift , {image-name} - numele imaginii (dacă se utilizează separarea pe trei niveluri a imaginilor, de exemplu, ca în registrul integrat al imaginilor Red Hat OpenShift), {tag} - eticheta acestei versiunea imaginii):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Conectați-vă la clusterul OKD utilizând utilitarul consolei (aici {OKD-API-URL} este adresa URL API a clusterului OKD):

oc login {OKD-API-URL}

Să obținem simbolul utilizatorului curent pentru autorizare în Registrul Docker:

oc whoami -t

Conectați-vă la registrul Docker intern al cluster-ului OKD (folosim simbolul obținut folosind comanda anterioară ca parolă):

docker login {docker-registry-url}

Să încărcăm imaginea Docker asamblată în Docker Registry OKD:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Să verificăm dacă imaginea asamblată este disponibilă în OKD. Pentru a face acest lucru, deschideți adresa URL în browser cu o listă de imagini ale proiectului corespunzător (aici {proiect} este numele proiectului din clusterul OpenShift, {OKD-WEBUI-URL} este adresa URL a consolei Web OpenShift ) - https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{image-name}.

Pentru a rula sarcini, trebuie creat un cont de serviciu cu privilegiile de a rula pod-uri ca root (vom discuta acest punct mai târziu):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Să rulăm comanda spark-submit pentru a publica o sarcină Spark în clusterul OKD, specificând contul de serviciu creat și imaginea Docker:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

aici:

—name — numele sarcinii care va participa la formarea numelui podurilor Kubernetes;

—class — clasa fișierului executabil, apelată la pornirea sarcinii;

—conf — Parametrii de configurare Spark;

spark.executor.instances — numărul de executori Spark de lansat;

spark.kubernetes.authenticate.driver.serviceAccountName - numele contului de serviciu Kubernetes utilizat la lansarea podurilor (pentru a defini contextul de securitate și capabilitățile atunci când interacționați cu API-ul Kubernetes);

spark.kubernetes.namespace — Spațiul de nume Kubernetes în care vor fi lansate podurile de driver și executor;

spark.submit.deployMode — metoda de lansare a Spark (pentru „cluster” standard spark-submit se folosește, pentru Spark Operator și versiunile ulterioare ale Spark „client”);

spark.kubernetes.container.image - Imagine Docker folosită pentru lansarea podurilor;

spark.master — URL-ul API Kubernetes (este specificat extern, astfel încât accesul are loc de pe mașina locală);

local:// este calea către executabilul Spark în interiorul imaginii Docker.

Mergem la proiectul OKD corespunzător și studiem podurile create - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

Pentru a simplifica procesul de dezvoltare, poate fi folosită o altă opțiune, în care este creată o imagine de bază comună a Spark, folosită de toate sarcinile pentru a rula, iar instantaneele fișierelor executabile sunt publicate pe stocarea externă (de exemplu, Hadoop) și specificate la apelare. spark-submit ca link. În acest caz, puteți rula versiuni diferite de sarcini Spark fără a reconstrui imaginile Docker, folosind, de exemplu, WebHDFS pentru a publica imagini. Trimitem o solicitare de a crea un fișier (aici {host} este gazda serviciului WebHDFS, {port} este portul serviciului WebHDFS, {path-to-file-on-hdfs} este calea dorită către fișier pe HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

Veți primi un răspuns ca acesta (aici {location} este adresa URL care trebuie utilizată pentru a descărca fișierul):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Încărcați fișierul executabil Spark în HDFS (aici {path-to-local-file} este calea către fișierul executabil Spark pe gazda curentă):

curl -i -X PUT -T {path-to-local-file} "{location}"

După aceasta, putem face trimiterea prin spark folosind fișierul Spark încărcat în HDFS (aici {class-name} este numele clasei care trebuie lansată pentru a finaliza sarcina):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Trebuie remarcat faptul că pentru a accesa HDFS și pentru a vă asigura că sarcina funcționează, poate fi necesar să modificați Dockerfile și scriptul entrypoint.sh - adăugați o directivă la Dockerfile pentru a copia bibliotecile dependente în directorul /opt/spark/jars și includeți fișierul de configurare HDFS în SPARK_CLASSPATH în punctul de intrare.

Al doilea caz de utilizare - Apache Livy

În plus, atunci când o sarcină este dezvoltată și rezultatul trebuie testat, se pune problema lansării acesteia ca parte a procesului CI/CD și urmărirea stării de execuție a acesteia. Desigur, îl puteți rula folosind un apel local de trimitere spark, dar acest lucru complică infrastructura CI/CD, deoarece necesită instalarea și configurarea Spark pe agenții/rulătorii serverului CI și configurarea accesului la API-ul Kubernetes. În acest caz, implementarea țintă a ales să folosească Apache Livy ca API REST pentru rularea sarcinilor Spark găzduite în interiorul unui cluster Kubernetes. Cu ajutorul acestuia, puteți rula sarcini Spark pe un cluster Kubernetes folosind solicitări cURL obișnuite, care se implementează cu ușurință pe baza oricărei soluții CI, iar plasarea acestuia în clusterul Kubernetes rezolvă problema autentificării atunci când interacționați cu API-ul Kubernetes.

Rulează Apache Spark pe Kubernetes

Să îl evidențiem ca un al doilea caz de utilizare - rularea sarcinilor Spark ca parte a unui proces CI/CD pe un cluster Kubernetes într-o buclă de testare.

Câteva despre Apache Livy - funcționează ca un server HTTP care oferă o interfață Web și un API RESTful care vă permite să lansați de la distanță spark-submit prin trecerea parametrilor necesari. În mod tradițional, a fost livrat ca parte a unei distribuții HDP, dar poate fi, de asemenea, implementat în OKD sau în orice altă instalare Kubernetes folosind manifestul corespunzător și un set de imagini Docker, cum ar fi aceasta - github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Pentru cazul nostru, a fost construită o imagine Docker similară, inclusiv versiunea Spark 2.4.5 din următorul fișier Docker:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

Imaginea generată poate fi construită și încărcată în depozitul dvs. Docker existent, cum ar fi depozitul intern OKD. Pentru a-l implementa, utilizați următorul manifest ({registry-url} - adresa URL a registrului de imagini Docker, {image-name} - numele imaginii Docker, {tag} - eticheta imaginii Docker, {livy-url} - adresa URL dorită unde serverul va fi accesibil Livy; manifestul „Rută” este utilizat dacă Red Hat OpenShift este utilizat ca distribuție Kubernetes, în caz contrar este utilizat manifestul Ingress sau Service corespunzător de tip NodePort):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

După aplicarea acesteia și lansarea cu succes a podului, interfața grafică Livy este disponibilă la link-ul: http://{livy-url}/ui. Cu Livy, putem publica sarcina noastră Spark folosind o solicitare REST de la, de exemplu, Postman. Un exemplu de colecție cu solicitări este prezentat mai jos (argumentele de configurare cu variabile necesare funcționării sarcinii lansate pot fi trecute în tabloul „args”):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Să executăm prima solicitare din colecție, să mergem la interfața OKD și să verificăm dacă sarcina a fost lansată cu succes - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. În același timp, în interfața Livy va apărea o sesiune (http://{livy-url}/ui), în cadrul căreia, folosind API-ul Livy sau interfața grafică, puteți urmări progresul sarcinii și studiați sesiunea busteni.

Acum să arătăm cum funcționează Livy. Pentru a face acest lucru, să examinăm jurnalele containerului Livy din interiorul podului cu serverul Livy - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name }?tab=logs. Din ele putem vedea că atunci când apelăm API-ul Livy REST într-un container numit „livy”, se execută un spark-submit, similar cu cel pe care l-am folosit mai sus (aici {livy-pod-name} este numele podului creat cu serverul Livy). Colecția introduce, de asemenea, o a doua interogare care vă permite să rulați sarcini care găzduiesc de la distanță un executabil Spark folosind un server Livy.

Al treilea caz de utilizare - Spark Operator

Acum că sarcina a fost testată, se pune problema rulării acesteia în mod regulat. Modul nativ de a rula în mod regulat sarcini într-un cluster Kubernetes este entitatea CronJob și o poți folosi, dar în acest moment folosirea operatorilor pentru gestionarea aplicațiilor în Kubernetes este foarte populară și pentru Spark există un operator destul de matur, care este, de asemenea, utilizat în soluții la nivel de întreprindere (de exemplu, Lightbend FastData Platform). Vă recomandăm să-l utilizați - versiunea actuală stabilă a Spark (2.4.5) are opțiuni de configurare destul de limitate pentru rularea sarcinilor Spark în Kubernetes, în timp ce următoarea versiune majoră (3.0.0) declară suport deplin pentru Kubernetes, dar data lansării rămâne necunoscută. . Operatorul Spark compensează acest neajuns adăugând opțiuni de configurare importante (de exemplu, montarea unei configurații ConfigMap cu acces Hadoop pe podurile Spark) și capacitatea de a rula o sarcină programată în mod regulat.

Rulează Apache Spark pe Kubernetes
Să îl evidențiem ca un al treilea caz de utilizare - rulând în mod regulat sarcini Spark pe un cluster Kubernetes într-o buclă de producție.

Spark Operator este open source și dezvoltat în cadrul Google Cloud Platform - github.com/GoogleCloudPlatform/spark-on-k8s-operator. Instalarea acestuia se poate face în 3 moduri:

  1. Ca parte a instalării Lightbend FastData Platform/Cloudflow;
  2. Folosind Helm:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Folosind manifeste din depozitul oficial (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Este demn de remarcat următoarele - Cloudflow include un operator cu versiunea API v1beta1. Dacă se utilizează acest tip de instalare, descrierile manifestului aplicației Spark ar trebui să se bazeze pe exemple de etichete în Git cu versiunea API corespunzătoare, de exemplu, „v1beta1-0.9.0-2.4.0”. Versiunea operatorului poate fi găsită în descrierea CRD inclusă în operator în dicționarul „versiuni”:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Dacă operatorul este instalat corect, un pod activ cu operatorul Spark va apărea în proiectul corespunzător (de exemplu, cloudflow-fdp-sparkoperator în spațiul Cloudflow pentru instalarea Cloudflow) și va apărea un tip de resursă Kubernetes corespunzător numit „sparkapplications”. . Puteți explora aplicațiile Spark disponibile cu următoarea comandă:

oc get sparkapplications -n {project}

Pentru a rula sarcini folosind Spark Operator, trebuie să faceți 3 lucruri:

  • creați o imagine Docker care include toate bibliotecile necesare, precum și fișierele de configurare și executabile. În imaginea țintă, aceasta este o imagine creată în stadiul CI/CD și testată pe un cluster de testare;
  • publicați o imagine Docker într-un registru accesibil din clusterul Kubernetes;
  • generați un manifest cu tipul „SparkApplication” și o descriere a sarcinii care urmează să fie lansată. Exemple de manifeste sunt disponibile în depozitul oficial (de ex. github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Există puncte importante de reținut despre manifest:
    1. dicționarul „apiVersion” trebuie să indice versiunea API corespunzătoare versiunii operatorului;
    2. dicționarul „metadata.namespace” trebuie să indice spațiul de nume în care va fi lansată aplicația;
    3. dicționarul „spec.image” trebuie să conțină adresa imaginii Docker create într-un registru accesibil;
    4. dicționarul „spec.mainClass” trebuie să conțină clasa de activități Spark care trebuie să fie rulată când începe procesul;
    5. dicționarul „spec.mainApplicationFile” trebuie să conțină calea către fișierul jar executabil;
    6. dicționarul „spec.sparkVersion” trebuie să indice versiunea de Spark utilizată;
    7. dicționarul „spec.driver.serviceAccount” trebuie să specifice contul de serviciu în spațiul de nume Kubernetes corespunzător care va fi folosit pentru a rula aplicația;
    8. dicționarul „spec.executor” trebuie să indice numărul de resurse alocate aplicației;
    9. dicționarul „spec.volumeMounts” trebuie să specifice directorul local în care vor fi create fișierele locale de activități Spark.

Un exemplu de generare a unui manifest (aici {spark-service-account} este un cont de serviciu în cadrul clusterului Kubernetes pentru rularea sarcinilor Spark):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Acest manifest specifică un cont de serviciu pentru care, înainte de a publica manifestul, trebuie să creați legăturile de rol necesare care oferă drepturile de acces necesare pentru ca aplicația Spark să interacționeze cu API-ul Kubernetes (dacă este necesar). În cazul nostru, aplicația are nevoie de drepturi pentru a crea Pod-uri. Să creăm legarea de rol necesară:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

De asemenea, este de remarcat faptul că această specificație manifest poate include un parametru „hadoopConfigMap”, care vă permite să specificați un ConfigMap cu configurația Hadoop fără a fi nevoie să plasați mai întâi fișierul corespunzător în imaginea Docker. Este, de asemenea, potrivit pentru rularea sarcinilor în mod regulat - folosind parametrul „programare”, poate fi specificat un program pentru rularea unei anumite sarcini.

După aceea, salvăm manifestul nostru în fișierul spark-pi.yaml și îl aplicăm clusterului nostru Kubernetes:

oc apply -f spark-pi.yaml

Aceasta va crea un obiect de tip „sparkapplications”:

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

În acest caz, va fi creat un pod cu o aplicație, a cărei stare va fi afișată în „sparkapplications” creat. Îl puteți vizualiza cu următoarea comandă:

oc get sparkapplications spark-pi -o yaml -n {project}

La finalizarea sarcinii, POD-ul va trece la starea „Terminat”, care se va actualiza și în „sparkapplications”. Jurnalele aplicațiilor pot fi vizualizate în browser sau utilizând următoarea comandă (aici {sparkapplications-pod-name} este numele pod-ului sarcinii care rulează):

oc logs {sparkapplications-pod-name} -n {project}

Sarcinile Spark pot fi gestionate și folosind utilitarul specializat sparkctl. Pentru a-l instala, clonați depozitul cu codul sursă, instalați Go și construiți acest utilitar:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Să examinăm lista sarcinilor Spark care rulează:

sparkctl list -n {project}

Să creăm o descriere pentru sarcina Spark:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Să rulăm sarcina descrisă folosind sparkctl:

sparkctl create spark-app.yaml -n {project}

Să examinăm lista sarcinilor Spark care rulează:

sparkctl list -n {project}

Să examinăm lista de evenimente a unei sarcini Spark lansate:

sparkctl event spark-pi -n {project} -f

Să examinăm starea sarcinii Spark care rulează:

sparkctl status spark-pi -n {project}

În concluzie, aș dori să iau în considerare dezavantajele descoperite ale utilizării versiunii stabile actuale a Spark (2.4.5) în Kubernetes:

  1. Primul și, poate, principalul dezavantaj este lipsa Data Locality. În ciuda tuturor deficiențelor YARN, au existat și avantaje în utilizarea lui, de exemplu, principiul livrării codului către date (mai degrabă decât date către cod). Datorită acesteia, sarcinile Spark au fost executate pe nodurile în care erau localizate datele implicate în calcule, iar timpul necesar pentru livrarea datelor prin rețea a fost redus semnificativ. Când folosim Kubernetes, ne confruntăm cu nevoia de a muta în rețea datele implicate într-o sarcină. Dacă sunt suficient de mari, timpul de execuție a sarcinii poate crește semnificativ și, de asemenea, necesită o cantitate destul de mare de spațiu pe disc alocată instanțelor de activitate Spark pentru stocarea lor temporară. Acest dezavantaj poate fi atenuat prin utilizarea unui software specializat care asigură localitatea datelor în Kubernetes (de exemplu, Alluxio), dar asta înseamnă de fapt necesitatea stocării unei copii complete a datelor pe nodurile clusterului Kubernetes.
  2. Al doilea dezavantaj important este securitatea. În mod implicit, caracteristicile legate de securitate pentru rularea sarcinilor Spark sunt dezactivate, utilizarea Kerberos nu este acoperită în documentația oficială (deși opțiunile corespunzătoare au fost introduse în versiunea 3.0.0, care va necesita muncă suplimentară) și în documentația pentru securitate când utilizați Spark (https ://spark.apache.org/docs/2.4.5/security.html) numai YARN, Mesos și Standalone Cluster apar ca depozite de cheie. În același timp, utilizatorul sub care sunt lansate sarcinile Spark nu poate fi specificat direct - specificăm doar contul de serviciu sub care va funcționa, iar utilizatorul este selectat pe baza politicilor de securitate configurate. În acest sens, se folosește fie utilizatorul root, care nu este sigur într-un mediu productiv, fie un utilizator cu un UID aleator, care este incomod la distribuirea drepturilor de acces la date (acest lucru poate fi rezolvat prin crearea PodSecurityPolicies și conectarea acestora la conturile de servicii corespunzătoare). În prezent, soluția este fie să plasați toate fișierele necesare direct în imaginea Docker, fie să modificați scriptul de lansare Spark pentru a utiliza mecanismul de stocare și recuperare a secretelor adoptat în organizația dvs.
  3. Rularea joburilor Spark folosind Kubernetes este oficial încă în modul experimental și pot exista modificări semnificative ale artefactelor utilizate (fișiere de configurare, imagini de bază Docker și scripturi de lansare) în viitor. Și într-adevăr, la pregătirea materialului, au fost testate versiunile 2.3.0 și 2.4.5, comportamentul a fost semnificativ diferit.

Să așteptăm actualizări - recent a fost lansată o nouă versiune de Spark (3.0.0), care a adus schimbări semnificative în activitatea Spark pe Kubernetes, dar a păstrat statutul experimental de suport pentru acest manager de resurse. Poate că următoarele actualizări vor face cu adevărat posibilă recomandarea completă a abandonului YARN și a rulării sarcinilor Spark pe Kubernetes, fără teamă pentru securitatea sistemului dvs. și fără a fi nevoie să modificați în mod independent componentele funcționale.

Capăt

Sursa: www.habr.com

Adauga un comentariu