Запускаємо Apache Spark на Kubernetes

Дорогі читачі, добрий день. Сьогодні поговоримо трохи про Apache Spark та його перспективи розвитку.

Запускаємо Apache Spark на Kubernetes

У світі Big Data Apache Spark є де факто стандартом розробки завдань пакетної обробки даних. Крім цього, він також використовується для створення стрімінгових додатків, що працюють у концепції micro batch, що обробляють та відвантажують дані маленькими порціями (Spark Structured Streaming). І традиційно він був частиною загального стека Hadoop, використовуючи як менеджер ресурсів YARN (або, у деяких випадках, Apache Mesos). До 2020 року його використання в традиційному вигляді для більшості компаній знаходиться під великим питанням через відсутність пристойних дистрибутивів Hadoop - розвиток HDP і CDH зупинено, CDH недостатньо опрацьовано і має високу вартість, а решта постачальників Hadoop або припинила своє існування або має туманне майбутнє. Тому все більший інтерес у спільноти та великих компаній викликає запуск Apache Spark за допомогою Kubernetes — став стандартом в оркестрації контейнерів та управлінні ресурсами в приватних та публічних хмарах, він вирішує проблему з незручним плануванням ресурсів завдань Spark на YARN і надає платформу, що стабільно розвивається, з безліччю комерційних та відкритих дистрибутивів для компаній усіх розмірів та мастей. До того ж на хвилі популярності більшість вже встигла обзавестися парою-трійкою своїх інсталяцій та наростити експертизу у його використанні, що спрощує переїзд.

Починаючи з версії 2.3.0 Apache Spark обзавівся офіційною підтримкою запуску завдань у кластері Kubernetes і сьогодні, ми поговоримо про поточну зрілість даного підходу, різні варіанти його використання і підводні камені, з якими доведеться зіткнутися при впровадженні.

Перш за все, розглянемо процес розробки завдань та програм на базі Apache Spark і виділимо типові випадки, в яких потрібно запустити завдання на кластері Kubernetes. Під час підготовки даного поста як дистрибутив використовується OpenShift і будуть наведені команди, актуальні для його утиліти командного рядка (oc). Для інших дистрибутивів Kubernetes можуть бути використані відповідні команди стандартної утиліти командного рядка Kubernetes (kubectl) або їх аналоги (наприклад, для oc adm policy).

Перший варіант використання - spark-submit

У процесі розробки завдань та програм розробнику потрібно запускати завдання для налагодження трансформації даних. Теоретично для цих цілей можуть бути використані заглушки, але розробка за участю реальних (нехай і тестових) екземплярів кінцевих систем показала себе в цьому класі завдань швидше та якісніше. У тому випадку, коли ми робимо налагодження на реальних примірниках кінцевих систем, можливі два сценарії роботи:

  • розробник запускає завдання Spark локально як standalone;

    Запускаємо Apache Spark на Kubernetes

  • розробник запускає завдання Spark на кластері Kubernetes у тестовому контурі.

    Запускаємо Apache Spark на Kubernetes

Перший варіант має право на існування, але тягне за собою низку недоліків:

  • для кожного розробника потрібно забезпечити доступ з робочого місця до всіх необхідних екземплярів кінцевих систем;
  • на робочій машині потрібна достатня кількість ресурсів для запуску задачі, що розробляється.

Другий варіант позбавлений даних недоліків, оскільки використання кластера Kubernetes дозволяє виділити необхідний пул ресурсів для запуску завдань та забезпечити йому необхідні доступи до екземплярів кінцевих систем, гнучко надаючи до нього доступ за допомогою рольової моделі Kubernetes для всіх членів команди розробки. Виділимо його як перший варіант використання - запуск завдань Spark з локальної машини розробника на кластері Kubernetes у тестовому контурі.

Розповімо докладніше про процес налаштування Spark для локального запуску. Щоб почати користуватися Spark, його потрібно встановити:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Збираємо необхідні пакети для роботи з Kubernetes:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

Повне складання займає багато часу, а для створення образів Docker та їх запуску на кластері Kubernetes насправді потрібні лише jar файли з директорії «assembly/», тому можна зібрати лише даний підпроект:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Для запуску завдань Spark у Kubernetes потрібно створити образ Docker, який використовуватиметься як базовий. Тут можливі 2 підходи:

  • Створений образ Docker включає виконуваний код завдання Spark;
  • Створений образ включає тільки Spark і необхідні залежності, виконуваний код розміщується віддалено (наприклад, в HDFS).

Для початку зберемо образ Docker, що містить тестовий приклад завдання Spark. Для створення образів Docker Spark має відповідну утиліту під назвою «docker-image-tool». Вивчимо по ній довідку:

./bin/docker-image-tool.sh --help

З її допомогою можна створювати образи Docker та здійснювати їх завантаження у віддалені реєстри, але за умовчанням вона має низку недоліків:

  • в обов'язковому порядку створює відразу 3 образи Docker - під Spark, PySpark та R;
  • не дозволяє вказати ім'я образу.

Тому ми будемо використовувати модифікований варіант цієї утиліти, наведений нижче:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

З її допомогою збираємо базовий образ Spark, що містить тестове завдання для обчислення числа Pi за допомогою Spark (тут {docker-registry-url} — URL вашого реєстру образів Docker, {repo} — ім'я репозиторія всередині реєстру, що збігається з проектом в OpenShift , {image-name} - ім'я образу (якщо використовується трирівневий поділ образів, наприклад, як інтегрований реєстр образів Red Hat OpenShift), {tag} - тег даної версії образу):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Авторизуємося в кластері OKD за допомогою консольної утиліти (тут {OKD-API-URL} — URL API кластера OKD):

oc login {OKD-API-URL}

Отримаємо токен поточного користувача для авторизації в Docker Registry:

oc whoami -t

Авторизуємося у внутрішньому Docker Registry кластера OKD (як пароль використовуємо токен, отриманий за допомогою попередньої команди):

docker login {docker-registry-url}

Завантажимо зібраний образ Docker у Docker Registry OKD:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Перевіримо, що зібраний образ доступний OKD. Для цього відкриємо у браузері URL зі списком образів відповідного проекту (тут {project} - ім'я проекту всередині кластера OpenShift, {OKD-WEBUI-URL} - URL Web консолі OpenShift) - https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{image-name}.

Для запуску завдань повинен бути створений сервісний обліковий запис з привілеями запуску подов під root (надалі обговоримо цей момент):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Виконаємо команду spark-submit для публікації завдання Spark у кластері OKD, вказавши створений сервісний обліковий запис та образ Docker:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

тут:

-name - Ім'я завдання, яке братиме участь у формуванні імені подів Kubernetes;

-class - клас виконуваного файлу, що викликається при запуску завдання;

-conf - конфігураційні параметри Spark;

spark.executor.instances — кількість екзекуторів Spark, що запускаються;

spark.kubernetes.authenticate.driver.serviceAccountName — ім'я службового облікового запису Kubernetes, що використовується при запуску подів (для визначення контексту безпеки та можливостей при взаємодії з API Kubernetes);

spark.kubernetes.namespace - простір імен Kubernetes, в якому будуть запускатися поди драйвера та екз'ютерів;

spark.submit.deployMode - спосіб запуску Spark (для стандартного spark-submit використовується "cluster", для Spark Operator і пізніших версій Spark "client");

spark.kubernetes.container.image - образ Docker, що використовується для запуску подів;

spark.master - URL API Kubernetes (вказується зовнішній так звернення походить з локальної машини);

local:// — шлях до файлу Spark, що виконується, всередині образу Docker.

Переходимо у відповідний проект OKD та вивчаємо створені поди - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

Для спрощення процесу розробки може бути використаний ще один варіант, при якому створюється загальний базовий образ Spark, який використовується всіма завданнями для запуску, а снепшоти файлів, що виконуються, публікуються в зовнішнє сховище (наприклад, Hadoop) і вказуються при викликі spark-submit у вигляді посилання. У цьому випадку можна запускати різні версії завдань Spark без перескладання образів Docker, використовуючи для публікації образів, наприклад WebHDFS. Відправляємо запит на створення файлу (тут {host} – хост сервісу WebHDFS, {port} – порт сервісу WebHDFS, {path-to-file-on-hdfs} – бажаний шлях до файлу на HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

При цьому буде отримано відповідь виду (тут {location} – це URL, яку потрібно використовувати для завантаження файлу):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Завантажуємо файл Spark, що виконується, в HDFS (тут {path-to-local-file} — шлях до виконуваного файлу Spark на поточному хості):

curl -i -X PUT -T {path-to-local-file} "{location}"

Після цього можемо робити spark-submit з використанням файлу Spark, завантаженого на HDFS (тут {class-name} - ім'я класу, який потрібно запустити для виконання завдання):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

При цьому слід зазначити, що для доступу до HDFS і забезпечення роботи завдання може знадобитися змінити Dockerfile і скрипт entrypoint.sh - додати в Dockerfile директиву для копіювання залежних бібліотек директорію /opt/spark/jars і включити файл конфігурації HDFS в SPARK_CLASSPATH в entrypoint. sh.

Другий варіант використання - Apache Livy

Далі, коли завдання розроблено і потрібно протестувати отриманий результат, виникає питання її запуску у рамках процесу CI/CD та відстеження статусів її виконання. Звичайно, можна запускати її і за допомогою локального виклику spark-submit, але це ускладнює інфраструктуру CI/CD, оскільки вимагає встановлення та конфігурацію Spark на агентах/раннерах CI сервера та налаштування доступу до API Kubernetes. Для цього цільової реалізацією вибрано використання Apache Livy як REST API для запуску завдань Spark, розміщеного всередині кластера Kubernetes. З його допомогою можна запускати завдання Spark на кластері Kubernetes, використовуючи звичайні cURL запити, що легко реалізується на базі будь-якого CI рішення, а його розміщення всередині кластера Kubernetes вирішує питання автентифікації при взаємодії з API Kubernetes.

Запускаємо Apache Spark на Kubernetes

Виділимо його як другий варіант використання - запуск завдань Spark в рамках процесу CI/CD на кластері Kubernetes в тестовому контурі.

Трохи про Apache Livy - він працює як HTTP сервер, що надає Web інтерфейс і RESTful API, що дозволяє віддалено запустити spark-submit, передавши необхідні параметри. Традиційно він поставлявся у складі дистрибутива HDP, але також може бути розгорнутий в OKD або будь-якій інсталяції Kubernetes за допомогою відповідного маніфесту та набору образів Docker, наприклад цього github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Для нашого випадку був зібраний аналогічний образ Docker, що включає Spark версії 2.4.5 з наступного Dockerfile:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

Створений образ може бути зібраний і завантажений у репозиторій Docker, що є у вас, наприклад, внутрішній репозиторій OKD. Для його розгортання використовується наступний маніфест ({registry-url} – URL реєстру образів Docker, {image-name} – ім'я образу Docker, {tag} – тег образу Docker, {livy-url} – бажаний URL, за яким буде доступний сервер Livy, маніфест Route застосовується у випадку, якщо в якості дистрибутива Kubernetes використовується Red Hat OpenShift, в іншому випадку використовується відповідний маніфест Ingress або Service типу NodePort):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

Після його застосування та успішного запуску поданий графічний інтерфейс Livy доступний за посиланням: http://{livy-url}/ui. За допомогою Livy ми можемо опублікувати наше завдання Spark, використовуючи запит REST, наприклад, з Postman. Приклад колекції із запитами представлений нижче (у масиві «args» можуть бути передані конфігураційні аргументи зі змінними, необхідними для роботи завдання, що запускається):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Виконаємо перший запит із колекції, перейдемо в інтерфейс OKD та перевіримо, що завдання успішно запущено — https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. При цьому в інтерфейсі Livy (http://{livy-url}/ui) з'явиться сесія, в рамках якої за допомогою API Livy або графічного інтерфейсу можна відслідковувати хід виконання завдання та вивчати сесії.

Тепер покажемо механізм роботи Livy. Для цього вивчимо журнали контейнера Livy всередині пода з сервером Livy - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name}?tab=logs. З них видно, що при виклику REST API Livy в контейнері з ім'ям «livy» виконується spark-submit, аналогічний вище, що використовується (тут {livy-pod-name} — ім'я створеного пода з сервером Livy). У колекції також представлений другий запит, що дозволяє запускати завдання з віддаленим розміщенням файлу Spark, що виконується, за допомогою сервера Livy.

Третій варіант використання - Spark Operator

Тепер, коли завдання протестовано, постає питання її регулярного запуску. Нативним способом для регулярного запуску завдань у кластері Kubernetes є сутність CronJob і можна використовувати її, але в даний момент велику популярність має використання операторів для керування додатками в Kubernetes та для Spark існує досить зрілий оператор, який, у тому числі, використовується в рішеннях Enterprise рівня (наприклад, Lightbend FastData Platform). Ми рекомендуємо використовувати його — поточна стабільна версія Spark (2.4.5) має досить обмежені можливості конфігурації запуску завдань Spark у Kubernetes, при цьому в наступній мажорній версії (3.0.0) заявлено повноцінну підтримку Kubernetes, але дата її виходу залишається невідомою. Spark Operator компенсує цей недолік, додаючи важливі параметри конфігурації (наприклад, монтування ConfigMap з конфігурацією доступу до Hadoop у поди Spark) та можливість регулярного запуску завдання за розкладом.

Запускаємо Apache Spark на Kubernetes
Виділимо його як третій варіант використання - регулярний запуск завдань Spark на кластері Kubernetes у продуктивному контурі.

Spark Operator має відкритий вихідний код і розробляється в рамках Google Cloud Platform. github.com/GoogleCloudPlatform/spark-on-k8s-operator. Його установка може бути виконана 3 способами:

  1. У рамках встановлення Lightbend FastData Platform/Cloudflow;
  2. За допомогою Helm:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Застосування маніфестів з офіційного репозиторію (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). При цьому варто зазначити таке: до складу Cloudflow входить оператор з версією API v1beta1. Якщо використовується цей тип установки, то описи маніфестів додатків Spark повинні будуватися на основі прикладів з тегів Git з відповідною версією API, наприклад, v1beta1-0.9.0-2.4.0. Версію оператора можна переглянути в описі CRD, що входить до складу оператора у словнику «versions»:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Якщо оператор встановлено коректно, то у відповідному проекті з'явиться активний під оператором Spark (наприклад, cloudflow-fdp-sparkoperator у просторі Cloudflow для встановлення Cloudflow) і з'явиться відповідний тип ресурсів Kubernetes з ім'ям «sparkapplications». Вивчити наявні програми Spark можна наступною командою:

oc get sparkapplications -n {project}

Для запуску завдань за допомогою Spark Operator потрібно зробити 3 речі:

  • створити образ Docker, що включає всі необхідні бібліотеки, а також конфігураційні і виконувані файли. У цільовій картині це образ, створений на етапі CI/CD та протестований на тестовому кластері;
  • опублікувати образ Docker у реєстр, доступний із кластера Kubernetes;
  • сформувати маніфест з типом «SparkApplication» та описом завдання, що запускається. Приклади маніфестів доступні в офіційному репозиторії (наприклад, github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Варто відзначити важливі моменти щодо маніфесту:
    1. у словнику «apiVersion» має бути вказана версія API, що відповідає версії оператора;
    2. у словнику «metadata.namespace» має бути вказано простір імен, у якому буде запущено програму;
    3. у словнику «spec.image» має бути вказана адреса створеного образу Docker у доступному реєстрі;
    4. у словнику «spec.mainClass» має бути вказано клас завдання Spark, який потрібно запустити під час запуску процесу;
    5. у словнику «spec.mainApplicationFile» має бути вказано шлях до виконуваного jar файлу;
    6. у словнику «spec.sparkVersion» повинна бути вказана версія Spark;
    7. у словнику «spec.driver.serviceAccount» має бути зазначений сервісний обліковий запис усередині відповідного простору імен Kubernetes, який буде використаний для запуску програми;
    8. у словнику «spec.executor» має бути зазначено кількість ресурсів, що виділяються додатком;
    9. у словнику «spec.volumeMounts» має бути вказана локальна директорія, у якій створюватимуться локальні файли завдання Spark.

Приклад формування маніфесту (тут {spark-service-account} — сервісний обліковий запис усередині кластера Kubernetes для запуску завдань Spark):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

У цьому маніфесті вказано сервісний обліковий запис, для якого потрібно до публікації маніфесту створити необхідні прив'язки ролей, які надають необхідні права доступу для взаємодії програми Spark з API Kubernetes (якщо потрібно). У нашому випадку додатку потрібні права створення Pod'ов. Створимо необхідну прив'язку ролі:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Також варто зазначити, що в специфікації даного маніфесту може бути вказано параметр «hadoopConfigMap», який дозволяє вказати ConfigMap з конфігурацією Hadoop без необхідності попереднього розміщення файлу в образі Docker. Також він підходить для регулярного запуску завдань - за допомогою параметра "schedule" може бути вказано розклад запуску цієї задачі.

Після цього зберігаємо наш маніфест у файл spark-pi.yaml і застосовуємо його до нашого кластера Kubernetes:

oc apply -f spark-pi.yaml

При цьому буде створено об'єкт типу «sparkapplications»:

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

При цьому буде створено під додатком, статус якого відображатиметься у створеному «sparkapplications». Його можна подивитися наступною командою:

oc get sparkapplications spark-pi -o yaml -n {project}

Після завершення завдання POD перейде в статус Completed, який також оновиться в sparkapplications. Логи програми можна переглянути в браузері або за допомогою наступної команди (тут {sparkapplications-pod-name} — ім'я поданого завдання):

oc logs {sparkapplications-pod-name} -n {project}

Також керування завданнями Spark може бути здійснене за допомогою спеціалізованої утиліти sparkctl. Для її встановлення клонуємо репозиторій з її вихідним кодом, встановимо Go та зберемо цю утиліту:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Вивчимо список запущених завдань Spark:

sparkctl list -n {project}

Створимо опис для завдання Spark:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Запустимо описану задачу за допомогою sparkctl:

sparkctl create spark-app.yaml -n {project}

Вивчимо список запущених завдань Spark:

sparkctl list -n {project}

Вивчимо список подій запущеного завдання Spark:

sparkctl event spark-pi -n {project} -f

Вивчимо статус запущеного завдання Spark:

sparkctl status spark-pi -n {project}

Насамкінець хотілося б розглянути виявлені мінуси експлуатації поточної стабільної версії Spark (2.4.5) у Kubernetes:

  1. Перший і, мабуть, головний мінус - відсутність Data Locality. Незважаючи на всі недоліки YARN були і плюси у його використанні, наприклад, принцип доставки коду даних (а не даних коду). Завдяки йому завдання Spark виконувались на вузлах, де розташовувалися дані, що беруть участь у обчисленнях, і помітно зменшувався час доставки даних по мережі. При використанні Kubernetes ми стикаємося з необхідністю переміщення мережею даних, задіяних у роботі завдання. У випадку, якщо вони досить великі, то час виконання завдання може суттєво збільшитись, а також знадобитися досить великий обсяг дискового простору, виділеного екземплярам завдання Spark для їх тимчасового зберігання. Цей недолік може бути знижений за рахунок використання спеціалізованих програмних засобів, що забезпечують локальність даних у Kubernetes (наприклад Alluxio), але це фактично означає необхідність зберігання повної копії даних на вузлах кластера Kubernetes.
  2. Другий важливий мінус – це безпека. За замовчуванням функції, пов'язані із забезпеченням безпеки щодо запуску завдань Spark відключені, варіант використання Kerberos в офіційній документації не охоплений (хоча відповідні параметри з'явилися у версії 3.0.0, що вимагатиме додаткового опрацювання), а в документації щодо безпеки при використанні Spark (https http://spark.apache.org/docs/2.4.5/security.html) як сховищ ключів фігурують тільки YARN, Mesos і Standalone Cluster. При цьому користувач, під яким запускаються завдання Spark, не може бути вказаний безпосередньо - ми лише задаємо сервісний обліковий запис, під яким буде працювати під, а користувач вибирається виходячи з налаштованих політик безпеки. У зв'язку з цим або використовується користувач root, що не є безпечним у продуктивному оточенні, або користувач з випадковим UID, що незручно при розподілі прав доступу до даних (вирішується створенням PodSecurityPolicies та їх прив'язкою до відповідних службових облікових записів). На даний момент вирішується або розміщенням всіх необхідних файлів безпосередньо в образі Docker, або модифікацією скрипту запуску Spark для використання механізму зберігання та отримання секретів, прийнятого у Вашій організації.
  3. Запуск завдань Spark за допомогою Kubernetes офіційно досі перебуває в експериментальному режимі і в майбутньому можливі значні зміни у артефактах, що використовуються (конфігураційних файлах, базових образів Docker і скриптах запуску). І справді — під час підготовки матеріалу тестувалися версії 2.3.0 та 2.4.5, поведінка суттєво відрізнялася.

Чекатимемо оновлень - нещодавно вийшла свіжа версія Spark (3.0.0), що принесла відчутні зміни в роботу Spark на Kubernetes, але зберегла експериментальний статус підтримки даного менеджера ресурсів. Можливо, наступні оновлення справді дозволять повною мірою рекомендувати відмовитися від YARN та запускати завдання Spark на Kubernetes, не побоюючись за безпеку Вашої системи та без необхідності самостійного доопрацювання функціональних компонентів.

Плавник

Джерело: habr.com

Додати коментар або відгук