Изпълнение на Apache Spark на Kubernetes

Уважаеми читатели, добър ден. Днес ще поговорим малко за Apache Spark и неговите перспективи за развитие.

Изпълнение на Apache Spark на Kubernetes

В съвременния свят на големи данни Apache Spark е де факто стандартът за разработване на задачи за пакетна обработка на данни. В допълнение, той се използва и за създаване на стрийминг приложения, които работят в концепцията за микро пакети, обработват и качват данни на малки порции (Spark Structured Streaming). И традиционно той е част от цялостния стек на Hadoop, използвайки YARN (или, в някои случаи, Apache Mesos) като мениджър на ресурси. До 2020 г. използването му в традиционната му форма за повечето компании е под голям въпрос поради липсата на достойни дистрибуции на Hadoop - развитието на HDP и CDH е спряло, CDH е слабо развит и има висока цена, а останалата част от Hadoop доставчиците или са престанали да съществуват, или имат неясно бъдеще. Следователно общността и големите компании се интересуват все повече от стартирането на Apache Spark с помощта на Kubernetes - след като се превърна в стандарт в оркестрацията на контейнери и управлението на ресурсите в частни и публични облаци, той решава проблема с неудобното планиране на ресурсите за задачите на Spark на YARN и осигурява стабилно развиваща се платформа с много търговски дистрибуции и дистрибуции с отворен код за компании от всякакъв размер. Освен това, на вълната на популярност, повечето вече са успели да придобият няколко собствени инсталации и да натрупат опит в използването им, което опростява преместването.

Започвайки с версия 2.3.0, Apache Spark получи официална поддръжка за изпълнение на задачи в клъстер на Kubernetes и днес ще говорим за текущата зрялост на този подход, различните опции за неговото използване и клопките, които ще срещнем по време на внедряването.

Първо, нека да разгледаме процеса на разработване на задачи и приложения, базирани на Apache Spark, и да подчертаем типични случаи, в които искате да изпълните задача на Kubernetes клъстер. При подготовката на тази публикация OpenShift се използва като дистрибуция и ще бъдат дадени команди, свързани с неговата помощна програма за команден ред (oc). За други дистрибуции на Kubernetes могат да се използват подходящите команди на стандартната помощна програма за команден ред на Kubernetes (kubectl) или техни еквиваленти (например за oc adm policy).

Първият случай на използване е spark-submit

По време на разработването на задачи и приложения, разработчикът трябва да изпълнява задачи за отстраняване на грешки при трансформацията на данни. Теоретично мъничетата могат да се използват за тези цели, но разработката, включваща реални (макар и тестови) екземпляри на крайни системи, се оказа по-бърза и по-добра в този клас задачи. В случай, че дебъгваме на реални екземпляри на крайни системи, са възможни два сценария на работа:

  • разработчикът изпълнява задачата на Spark локално в самостоятелен режим;

    Изпълнение на Apache Spark на Kubernetes

  • разработчик изпълнява задача на Spark на клъстер на Kubernetes в тестов цикъл.

    Изпълнение на Apache Spark на Kubernetes

Първият вариант има право да съществува, но включва редица недостатъци:

  • за всеки разработчик се изисква да осигури достъп от работното място до всички копия на крайни системи, от които се нуждае;
  • необходими са достатъчно ресурси на производствената машина за изпълнение на задачата, която се разработва.

Вторият вариант е лишен от тези недостатъци, тъй като използването на клъстер на Kubernetes ви позволява да разпределите необходимия набор от ресурси за изпълнение на задачи и да му предоставите необходимия достъп до екземпляри на крайни системи, като гъвкаво предоставяте достъп до него с помощта на ролевия модел на Kubernetes за всички членове на екипа за разработка. Нека го подчертаем като първия случай на използване - изпълнение на задачи на Spark от локална машина за разработчици на клъстер на Kubernetes в тестов цикъл.

Нека разгледаме по-подробно процеса на настройка на Spark за локално изпълнение. За да започнете да използвате Spark, трябва да го инсталирате:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Ние събираме необходимите пакети за работа с Kubernetes:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

Пълното изграждане отнема много време и в действителност са необходими само буркани от директорията „assembly/“, за да се създадат Docker изображения и да се изпълнят на Kubernetes клъстер, така че само този подпроект може да бъде изграден:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Изпълнението на задачи на Spark в Kubernetes изисква да създадете Docker изображение, което да използвате като базово изображение. Тук има 2 възможни подхода:

  • Генерираното изображение на Docker включва изпълнимия код на задачата на Spark;
  • Създаденото изображение включва само Spark и необходимите зависимости, изпълнимият код се хоства отдалечено (например в HDFS).

Първо, нека изградим изображение на Docker, съдържащо тестов случай на задача на Spark. За създаване на Docker изображения, Spark има съответна помощна програма, наречена „docker-image-tool“. Нека го проучим за помощ:

./bin/docker-image-tool.sh --help

Може да се използва за създаване на Docker изображения и качването им в отдалечени регистри, но по подразбиране има редица недостатъци:

  • безпроблемно създава 3 Docker изображения наведнъж - за Spark, PySpark и R;
  • не ви позволява да посочите име на изображение.

Затова ще използваме модифицирана версия на тази помощна програма, дадена по-долу:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

С негова помощ изграждаме основно изображение на Spark, което съдържа тестова задача за изчисляване на числото Pi с помощта на Spark (тук {docker-registry-url} е URL адресът на вашия регистър на изображения на Docker, {repo} е името на хранилището вътре регистърът, който съответства на проекта в OpenShift, {image-name} е името на изображението (ако се използва разделяне на изображения на три нива, например, както в интегрирания регистър на изображения на Red Hat OpenShift), {tag} е етикетът на тази версия на изображението):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Упълномощавайте в клъстера OKD, като използвате помощната програма на конзолата (тук {OKD-API-URL} е URL адресът на API на клъстера OKD):

oc login {OKD-API-URL}

Нека вземем токена на текущия потребител за оторизация в регистъра на Docker:

oc whoami -t

Упълномощаване във вътрешния регистър на Docker на клъстера OKD (използваме токена, получен чрез предишната команда като парола):

docker login {docker-registry-url}

Качете изграденото изображение на Docker в регистъра на Docker OKD:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Нека проверим дали изграденото изображение е налично в OKD. За да направите това, отворете URL адреса със списък от изображения на съответния проект в браузъра (тук {project} е името на проекта в клъстера OpenShift, {OKD-WEBUI-URL} е URL адресът на уеб конзолата OpenShift ) — https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{image-name}.

За да изпълнявате задачи, трябва да бъде създаден акаунт за услуга с привилегии за стартиране на pods като root (ще обсъдим тази точка по-късно):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Нека изпълним командата spark-submit, за да публикуваме задачата Spark в OKD клъстера, като посочим създадения акаунт за услуга и изображението на Docker:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

Тук:

--name - името на задачата, която ще участва във формирането на името на Kubernetes pods;

--class — клас на изпълнимия файл, извикан при стартиране на задачата;

--conf - конфигурационни параметри на Spark;

spark.executor.instances - брой изпълнители на Spark за изпълнение

spark.kubernetes.authenticate.driver.serviceAccountName - Името на акаунта за услуга Kubernetes, използван при стартиране на Pods (за определяне на контекста на сигурността и възможностите при взаимодействие с Kubernetes API)

spark.kubernetes.namespace - пространството от имена на Kubernetes, в което ще се изпълняват модулите на драйвера и изпълнителя;

spark.submit.deployMode - как да стартирате Spark (за стандартен spark-submit използвайте "cluster", за Spark Operator и по-нови версии на Spark "client");

spark.kubernetes.container.image - Docker изображение, използвано за стартиране на pods

spark.master - URL адрес на Kubernetes API (посочен е външен, така че извикването се извършва от локалната машина);

local:// е пътят до изпълнимия файл на Spark в изображението на Docker.

Отиваме до съответния OKD проект и изучаваме създадените pods - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

За да се опрости процеса на разработка, може да се използва друга опция, при която се създава общо базово изображение на Spark, използвано от всички задачи за изпълнение, и моментни снимки на изпълними файлове се публикуват във външно хранилище (например Hadoop) и се посочват при извикване на spark -изпратете като връзка. В този случай можете да изпълнявате различни версии на задачите на Spark, без да изграждате отново Docker изображения, като използвате например WebHDFS за публикуване на изображенията. Изпращаме заявка за създаване на файл (тук {host} е хостът на услугата WebHDFS, {port} е портът на услугата WebHDFS, {path-to-file-on-hdfs} е желаният път до файла на HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

Това ще върне отговор на формуляра (тук {location} е URL адресът, който да се използва за изтегляне на файла):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Качете изпълнимия файл на Spark в HDFS (където {path-to-local-file} е пътят до изпълнимия файл на Spark на текущия хост):

curl -i -X PUT -T {path-to-local-file} "{location}"

След това можем да изпратим чрез Spark файл, качен в HDFS (тук {class-name} е името на класа, който трябва да бъде стартиран, за да завърши задачата):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

В същото време трябва да се отбележи, че за да получите достъп до HDFS и да направите задачата да работи, може да е необходимо да промените Dockerfile и скрипта entrypoint.sh - добавете директива към Dockerfile за копиране на зависими библиотеки в /opt /spark/jars директория и включете HDFS конфигурационния файл в SPARK_CLASSPATH във входната точка.

Вторият случай на използване е Apache Livy

Освен това, когато задачата е разработена и е необходимо да се тества полученият резултат, възниква въпросът за стартирането й като част от процеса CI / CD и проследяване на състоянието на нейното изпълнение. Разбира се, можете да го стартирате с локално повикване за spark-submit, но това усложнява CI / CD инфраструктурата, защото изисква инсталиране и конфигуриране на Spark на агентите / изпълнителите на CI сървъра и конфигурацията на достъпа до Kubernetes API. В този случай целевата реализация е избрала да използва Apache Livy като REST API за изпълнение на задачи на Spark, хоствани в клъстер на Kubernetes. С него можете да изпълнявате задачи на Spark на клъстер на Kubernetes, като използвате редовни заявки cURL, което лесно се внедрява въз основа на всяко CI решение, а поставянето му в клъстера на Kubernetes решава проблема с удостоверяването при взаимодействие с API на Kubernetes.

Изпълнение на Apache Spark на Kubernetes

Нека го подчертаем като втори случай на използване - изпълнение на задачи на Spark като част от CI / CD процес на клъстер на Kubernetes в тестов цикъл.

Малко за Apache Livy - той работи като HTTP сървър, който предоставя уеб интерфейс и RESTful API, който ви позволява отдалечено да стартирате spark-submit чрез предаване на необходимите параметри. Традиционно се доставя като част от дистрибуцията на HDP, но може също да се внедри в OKD или друга инсталация на Kubernetes, като се използва подходящият манифест и набор от Docker изображения, като това - github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. За нашия случай беше създадено подобно Docker изображение, включително Spark версия 2.4.5 от следния Dockerfile:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

Генерираното изображение може да бъде изградено и качено в хранилище на Docker, което имате, като например вътрешното хранилище на OKD. За да го внедрите, използвайте следния манифест ({registry-url} — URL адрес на регистъра на изображение на Docker, {image-name} — Име на изображение на Docker, {tag} — Таг на изображение на Docker, {livy-url} — желан URL адрес, където сървърът ще бъде наличен Livy; манифестът „Route“ се използва, ако Red Hat OpenShift се използва като дистрибуция на Kubernetes, в противен случай се използва съответният манифест на Ingress или Service от тип NodePort):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

След като го приложите и успешно стартирате под, Livy GUI е достъпен на: http://{livy-url}/ui. С Livy можем да публикуваме нашата Spark задача, използвайки REST заявка от Postman, например. Пример за колекция със заявки е представен по-долу (в масива "args" могат да бъдат предадени конфигурационни аргументи с променливи, необходими за работата на стартираната задача):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Нека изпълним първата заявка от колекцията, отидете на интерфейса на OKD и проверете дали задачата е стартирана успешно - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. В същото време в интерфейса на Livy (http://{livy-url}/ui) ще се появи сесия, в рамките на която, като използвате API на Livy или графичен интерфейс, можете да наблюдавате напредъка на задачата и да изучавате регистрационни файлове на сесии.

Сега нека покажем как работи Livy. За да направите това, нека разгледаме регистрационните файлове на контейнера Livy вътре в pod със сървъра Livy - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name }?tab=дневници. Можете да видите от тях, че при извикване на Livy REST API в контейнер с име „livy“ се изпълнява spark-submit, подобно на това, което използвахме по-горе (тук {livy-pod-name} е името на създадения pod със сървъра Livy). Колекцията предоставя и втора заявка, която ви позволява да изпълнявате задачи с отдалечено хостване на изпълнимия файл на Spark, като използвате сървъра Livy.

Трети случай на използване - Spark Operator

Сега, когато задачата е тествана, възниква въпросът за редовното й стартиране. Основният начин за редовно изпълнение на задачи в клъстер на Kubernetes е обектът CronJob и можете да го използвате, но в момента използването на оператори за управление на приложения в Kubernetes е много популярно и има доста зрял оператор за Spark, който сред други неща, се използва в решения на корпоративно ниво (например Lightbend FastData Platform). Препоръчваме да го използвате - текущата стабилна версия на Spark (2.4.5) има доста ограничени опции за конфигуриране на стартирането на задачите на Spark в Kubernetes, докато следващата основна версия (3.0.0) претендира за пълна поддръжка за Kubernetes, но датата на пускане остава неизвестен. Spark Operator компенсира това, като добавя важни опции за конфигуриране (като монтиране на ConfigMap с конфигурация за достъп на Hadoop към Spark Pods) и възможност за редовно изпълнение на планирана задача.

Изпълнение на Apache Spark на Kubernetes
Нека го отделим като трети случай на използване - редовно изпълняване на задачи на Spark на клъстер на Kubernetes в производствен цикъл.

Spark Operator е с отворен код и е разработен в рамките на Google Cloud Platform − github.com/GoogleCloudPlatform/spark-on-k8s-operator. Може да се инсталира по 3 начина:

  1. Като част от инсталирането на Lightbend FastData Platform/Cloudflow;
  2. С Хелмс:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Използване на манифести от официалното хранилище (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). В същото време си струва да се отбележи следното - Cloudflow включва оператор с API версия v1beta1. Ако се използва този тип инсталация, описанията на манифеста на приложението на Spark трябва да се основават на примери от тагове в Git с подходящата версия на API, например „v1beta1-0.9.0-2.4.0“. Версията на оператора може да се види в описанието на CRD, който е част от оператора в речника "версии":
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Ако операторът е инсталиран правилно, тогава съответният проект ще има активен Pod с оператора Spark (например cloudflow-fdp-sparkoperator в пространството Cloudflow за инсталиране на Cloudflow) и ще се появи съответният тип ресурс на Kubernetes, наречен „sparkapplications“. Можете да разгледате наличните приложения на Spark със следната команда:

oc get sparkapplications -n {project}

За да изпълнявате задачи с помощта на Spark Operator, трябва да направите 3 неща:

  • създайте Docker изображение, което включва всички необходими библиотеки, както и конфигурационни и изпълними файлове. В целевата картина това е изображение, създадено на етап CI / CD и тествано на тестов клъстер;
  • публикувайте изображението на Docker в регистър, достъпен от клъстера Kubernetes;
  • генерирайте манифест с тип "SparkApplication" и описание на задачата, която ще се стартира. Примерни манифести са налични в официалното хранилище (напр. github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Струва си да се отбележат важни моменти относно манифеста:
    1. речникът "apiVersion" трябва да съдържа версията на API, съответстваща на версията на оператора;
    2. речникът "metadata.namespace" трябва да съдържа пространството от имена, в което ще се стартира приложението;
    3. речникът "spec.image" трябва да съдържа адреса на създаденото изображение на Docker в наличния регистър;
    4. речникът "spec.mainClass" трябва да съдържа класа на задачата на Spark, която искате да изпълните, когато процесът стартира;
    5. речникът "spec.mainApplicationFile" трябва да съдържа пътя до изпълнимия jar файл;
    6. речникът "spec.sparkVersion" трябва да бъде използваната версия на Spark;
    7. речникът "spec.driver.serviceAccount" трябва да съдържа акаунта на услугата в съответното пространство от имена на Kubernetes, което ще се използва за изпълнение на приложението;
    8. речникът "spec.executor" трябва да показва количеството ресурси, разпределени за приложението;
    9. речникът "spec.volumeMounts" трябва да бъде настроен на локалната директория, където ще бъдат създадени локалните файлове със задачи на Spark.

Пример за генериране на манифест (тук {spark-service-account} е акаунт за услуга в рамките на Kubernetes клъстер за изпълнение на задачи на Spark):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Този манифест посочва акаунт за услуга, който изисква необходимите обвързвания на ролите да бъдат създадени преди публикуването на манифеста, за да предостави необходимите разрешения за приложението Spark да взаимодейства с Kubernetes API (ако е необходимо). В нашия случай приложението се нуждае от правата за създаване на Pods. Нека създадем необходимото обвързване на роли:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Също така си струва да се отбележи, че параметърът „hadoopConfigMap“ може да бъде указан в спецификацията на този манифест, което ви позволява да посочите ConfigMap с конфигурация на Hadoop, без да се налага първо да поставите съответния файл в изображението на Docker. Подходящ е и за редовно стартиране на задачи - с помощта на параметъра "разписание" може да се посочи графикът за стартиране на тази задача.

След това запазваме нашия манифест във файла spark-pi.yaml и го прилагаме към нашия Kubernetes клъстер:

oc apply -f spark-pi.yaml

Това ще създаде обект от тип "sparkapplications":

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

Това ще създаде под с приложение, чийто статус ще се показва в създадените „sparkapplications“. Може да се види със следната команда:

oc get sparkapplications spark-pi -o yaml -n {project}

След завършване на задачата POD ще премине в статус „Завършен“, който също ще бъде актуализиран в „sparkapplications“. Регистрационните файлове на приложенията могат да бъдат прегледани в браузъра или със следната команда (тук {sparkapplications-pod-name} е името на текущата група задачи):

oc logs {sparkapplications-pod-name} -n {project}

Освен това задачите на Spark могат да се управляват с помощта на специализираната помощна програма sparkctl. За да го инсталираме, ние клонираме хранилището с неговия изходен код, инсталираме Go и изграждаме тази помощна програма:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Нека разгледаме списъка с изпълнявани задачи на Spark:

sparkctl list -n {project}

Нека създадем описание за задачата на Spark:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Нека изпълним описаната задача с помощта на sparkctl:

sparkctl create spark-app.yaml -n {project}

Нека разгледаме списъка с изпълнявани задачи на Spark:

sparkctl list -n {project}

Нека разгледаме списъка със събития на изпълнявана задача на Spark:

sparkctl event spark-pi -n {project} -f

Нека разгледаме състоянието на изпълнявана задача на Spark:

sparkctl status spark-pi -n {project}

В заключение, бих искал да разгледам откритите недостатъци на работата на текущата стабилна версия на Spark (2.4.5) в Kubernetes:

  1. Първият и може би основен недостатък е липсата на Data Locality. Въпреки всички недостатъци на YARN, имаше плюсове в използването му, например принципът на доставяне на код към данни (а не данни към код). Благодарение на него задачите на Spark бяха изпълнени на възлите, където се намираха данните, включени в изчисленията, и времето за доставяне на данни в мрежата беше значително намалено. Когато използваме Kubernetes, ние сме изправени пред необходимостта да преместим през мрежата данните, включени в работата на задачата. Ако те са достатъчно големи, тогава времето за изпълнение на задачата може да се увеличи значително и е необходимо достатъчно голямо количество дисково пространство да бъде разпределено на екземплярите на задачи на Spark за тяхното временно съхранение. Този недостатък може да бъде намален чрез използване на специализирани софтуерни инструменти, които осигуряват локалност на данните в Kubernetes (например Alluxio), но това всъщност означава необходимостта от съхраняване на пълно копие на данните на възлите на Kubernetes клъстера.
  2. Вторият основен недостатък е сигурността. По подразбиране функциите, свързани със сигурността по отношение на изпълнение на задачи на Spark, са деактивирани, използването на Kerberos не е обхванато в официалната документация (въпреки че съответните опции се появиха във версия 3.0.0, което ще изисква допълнителна работа) и в документацията за сигурност, когато при използване на Spark (https://spark.apache.org/docs/2.4.5/security.html) само YARN, Mesos и Standalone Cluster се показват като хранилища за ключове. В същото време потребителят, под който се стартират задачите на Spark, не може да бъде посочен директно - ние само задаваме акаунта на услугата, под който ще работи, и потребителят се избира въз основа на конфигурираните политики за сигурност. В това отношение се използва или root потребител, което не е безопасно в продуктивна среда, или потребител с произволен UID, което е неудобно при разпределяне на права за достъп до данни (решено чрез създаване на PodSecurityPolicies и свързването им със съответните акаунти за услуги) . В момента решението е или да поставите всички необходими файлове директно в изображението на Docker, или да промените скрипта за стартиране на Spark, за да използвате механизма за съхранение и извличане на тайни, приети във вашата организация.
  3. Изпълнението на задачи на Spark с Kubernetes все още е официално в експериментален режим и може да има значителни промени в използваните артефакти (конфигурационни файлове, базови изображения на Docker и скриптове за стартиране) в бъдеще. И наистина - при подготовката на материала бяха тествани версии 2.3.0 и 2.4.5, поведението беше значително различно.

Нека изчакаме актуализации - наскоро беше пусната нова версия на Spark (3.0.0), която донесе осезаеми промени в работата на Spark на Kubernetes, но запази експерименталния статус на поддръжка за този мениджър на ресурси. Може би следващите актуализации наистина ще направят възможно напълно да препоръчаме изоставянето на YARN и изпълнението на задачи на Spark на Kubernetes, без да се страхувате за сигурността на вашата система и без да е необходимо сами да прецизирате функционалните компоненти.

Фин.

Източник: www.habr.com

Добавяне на нов коментар