Запускаем Apache Spark на Kubernetes

Дарагія чытачы, добрага дня. Сёння пагаворым крыху пра Apache Spark і яго перспектывы развіцця.

Запускаем Apache Spark на Kubernetes

У сучасным свеце Big Data Apache Spark з'яўляецца дэ факта стандартам пры распрацоўцы задач пакетнай апрацоўкі дадзеных. Акрамя гэтага, ён таксама выкарыстоўваецца для стварэння стрымінгавых прыкладанняў, якія працуюць у канцэпцыі micro batch, якія апрацоўваюць і адгружаюць дадзеныя маленькімі порцыямі (Spark Structured Streaming). І традыцыйна ён з'яўляўся часткай агульнага стэка Hadoop, выкарыстоўваючы ў якасці мэнэджэра рэсурсаў YARN (ці, у некаторых выпадках, Apache Mesos). Да 2020 года яго выкарыстанне ў традыцыйным выглядзе для большасці кампаній знаходзіцца пад вялікім пытаннем з прычыны адсутнасці прыстойных дыстрыбутываў Hadoop – развіццё HDP і CDH спынена, CDH недастаткова прапрацаваны і мае высокі кошт, а астатнія пастаўшчыкі Hadoop альбо спынілі сваё існаванне, альбо маюць туманную будучыню. Таму ўсё большую цікавасць у супольнасці і буйных кампаній выклікае запуск Apache Spark з дапамогай Kubernetes - стаўшы стандартам у аркестрацыі кантэйнераў і кіраванні рэсурсамі ў прыватных і публічных аблоках, ён вырашае праблему з нязручным планаваннем рэсурсаў задач Spark на YARN і дае стабільна развіваецца платформу з мноствам камерцыйных і адкрытых дыстрыбутываў для кампаній усіх памераў і масцей. Да таго ж на хвалі папулярнасці большасць ужо паспела абзавесціся парай-тройкай сваіх усталёвак і нарасціць экспертызу ў яго выкарыстанні, што спрашчае пераезд.

Пачынаючы з версіі 2.3.0 Apache Spark абзавёўся афіцыйнай падтрымкай запуску задач у кластары Kubernetes і сёння, мы пагаворым аб бягучай сталасці дадзенага падыходу, розных варыянтах яго выкарыстання і падводных камянях, з якімі маецца быць сутыкнуцца пры ўкараненні.

Першым чынам, разгледзім працэс распрацоўкі задач і прыкладанняў на базе Apache Spark і вылучым тыпавыя выпадкі, у якіх патрабуецца запусціць задачу на кластары Kubernetes. Пры падрыхтоўцы дадзенага паста ў якасці дыстрыбутыва выкарыстоўваецца OpenShift і будуць прыведзены каманды, актуальныя для яго ўтыліты каманднага радка (oc). Для іншых дыстрыбутываў Kubernetes могуць быць скарыстаны адпаведныя каманды стандартнай утыліты каманднага радка Kubernetes (kubectl) альбо іх аналогі (напрыклад, для oc adm policy).

Першы варыянт выкарыстання - spark-submit

У працэсе распрацоўкі задач і прыкладанняў распрацоўшчыку патрабуецца запускаць задачы для адладкі трансфармацыі дадзеных. Тэарэтычна для гэтых мэт могуць быць скарыстаны заглушкі, але распрацоўка з удзелам рэальных (хай і тэставых) асобнікаў канчатковых сістэм, паказала сябе ў гэтым класе задач хутчэй і якасней. У тым выпадку, калі мы вырабляем адладку на рэальных асобніках канчатковых сістэм, магчымыя два сцэнара працы:

  • распрацоўшчык запускае задачу Spark лакальна ў рэжыме standalone;

    Запускаем Apache Spark на Kubernetes

  • распрацоўшчык запускае задачу Spark на кластары Kubernetes у тэставым контуры.

    Запускаем Apache Spark на Kubernetes

Першы варыянт мае права на існаванне, але цягне за сабой шэраг недахопаў:

  • для кожнага распрацоўніка патрабуецца забяспечыць доступ з працоўнага месца да ўсіх неабходных яму асобнікаў канчатковых сістэм;
  • на працоўнай машыне патрабуецца дастатковая колькасць рэсурсаў для запуску распрацоўванай задачы.

Другі варыянт пазбаўлены дадзеных недахопаў, паколькі выкарыстанне кластара Kubernetes дазваляе вылучыць неабходны пул рэсурсаў для запуску задач і забяспечыць для яго неабходныя доступы да асобнікаў канчатковых сістэм, гнутка падаючы да яго доступ з дапамогай ролевай мадэлі Kubernetes для ўсіх чальцоў каманды распрацоўкі. Вылучым яго ў якасці першага варыянту выкарыстання – запуск задач Spark з лакальнай машыны распрацоўніка на кластары Kubernetes у тэставым контуры.

Раскажам падрабязней аб працэсе налады Spark для лакальнага запуску. Каб пачаць карыстацца Spark яго патрабуецца ўсталяваць:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Збіраны неабходныя пакеты для працы з Kubernetes:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

Поўная зборка займае шмат часу, а для стварэння выяў Docker і іх запуску на кластары Kubernetes у рэальнасці патрэбныя толькі jar файлы з дырэкторыі "assembly/", таму можна сабраць толькі дадзены падпраект:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Для запуску задач Spark у Kubernetes патрабуецца стварыць выяву Docker, які будзе выкарыстоўвацца ў якасці базавага. Тут магчымыя 2 падыходы:

  • Створаны вобраз Docker уключае ў сябе выкананы код задачы Spark;
  • Створаны вобраз уключае ў сябе толькі Spark і неабходныя залежнасці, выкананы код размяшчаецца выдалена (напрыклад, у HDFS).

Для пачатку збяром выяву Docker, які змяшчае тэставы прыклад задачы Spark. Для стварэння выяў Docker у Spark ёсць якая адпавядае ўтыліта пад назовам «docker-image-tool». Вывучым па ёй даведку:

./bin/docker-image-tool.sh --help

З яе дапамогай можна ствараць выявы Docker і ажыццяўляць іх загрузку ў выдаленыя рэестры, але па змаўчанні яна мае шэраг недахопаў:

  • у абавязковым парадку стварае адразу 3 выявы Docker - пад Spark, PySpark і R;
  • не дазваляе паказаць імя выявы.

Таму мы будзем выкарыстоўваць мадыфікаваны варыянт дадзенай утыліты, прыведзены ніжэй:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

З яе дапамогай збіраем базавую выяву Spark, утрымоўвальны ў сабе тэставую задачу для вылічэння ліку Pi з дапамогай Spark (тут {docker-registry-url} - URL вашага рэестра выяў Docker, {repo} - імя рэпазітара ўсярэдзіне рэестра, супадаючае з праектам у OpenShift , {image-name} - імя выявы (калі выкарыстоўваецца трохузроўневы падзел вобразаў, напрыклад, як у інтэграваным рэестры вобразаў Red Hat OpenShift), {tag} - тэг дадзенай версіі выявы):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Аўтарызуемся ў кластары OKD з дапамогай кансольнай утыліты (тут {OKD-API-URL} — URL API кластара OKD):

oc login {OKD-API-URL}

Атрымаем токен бягучага карыстальніка для аўтарызацыі ў Docker Registry:

oc whoami -t

Аўтарызуемся ва ўнутраным Docker Registry кластара OKD (у якасці пароля выкарыстоўваем токен, атрыманы з дапамогай папярэдняй каманды):

docker login {docker-registry-url}

Загрузім сабраную выяву Docker у Docker Registry OKD:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Праверым, што сабраная выява даступны ў OKD. Для гэтага адкрыем у браўзэры URL са спісам выяў адпаведнага праекту (тут {project} - імя праекту ўсярэдзіне кластара OpenShift, {OKD-WEBUI-URL} - URL Web кансолі OpenShift) - https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{image-name}.

Для запуску задач павінен быць створаны сэрвісны акаўнт з прывілеямі запуску падоў пад root (у далейшым абмяркуем гэты момант):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Выканаем каманду spark-submit для публікацыі задачы Spark у кластары OKD, паказаўшы створаны сэрвісны акаўнт і выява Docker:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

Тут:

-name - імя задачы, якое будзе ўдзельнічаць у фарміраванні імя падоў Kubernetes;

-class - клас выкананага файла, які выклікаецца пры запуску задачы;

-conf - канфігурацыйныя параметры Spark;

spark.executor.instances - колькасць запускаемых экзэк'ютараў Spark;

spark.kubernetes.authenticate.driver.serviceAccountName — імя службовага ўліковага запісу Kubernetes, які выкарыстоўваецца пры запуску подаў (для вызначэння кантэксту бяспекі і магчымасцяў пры ўзаемадзеянні з API Kubernetes);

spark.kubernetes.namespace - прастора імёнаў Kubernetes, у якім будуць запускацца поды драйвера і экзэк'ютараў;

spark.submit.deployMode - спосаб запуску Spark (для стандартнага spark-submit выкарыстоўваецца "cluster", для Spark Operator і пазнейшых версій Spark "client");

spark.kubernetes.container.image - выява Docker, які выкарыстоўваецца для запуску подаў;

spark.master - URL API Kubernetes (паказваецца знешні так зварот адбываецца з лакальнай машыны);

local:// — шлях да выкананага файла Spark усярэдзіне выявы Docker.

Пераходзім у адпаведны праект OKD і вывучаем створаныя поды - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

Для спрашчэння працэсу распрацоўкі можа быць скарыстаны яшчэ адзін варыянт, пры якім ствараецца агульная базавая выява Spark, выкарыстоўваны ўсімі задачамі для запуску, а снэпшоты выкананых файлаў публікуюцца ў вонкавае сховішча (напрыклад, Hadoop) і паказваюцца пры выкліку spark-submit у выглядзе спасылкі. У гэтым выпадку можна запускаць розныя версіі задач Spark без перазборкі выяў Docker, выкарыстаючы для публікацыі выяў, напрыклад, WebHDFS. Адпраўляем запыт на стварэнне файла (тут {host} - хост сэрвісу WebHDFS, {port} - порт сэрвісу WebHDFS, {path-to-file-on-hdfs} - жаданы шлях да файла на HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

Пры гэтым будзе атрыманы адказ выгляду (тут {location} - гэта URL, які трэба выкарыстоўваць для загрузкі файла):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Загружаем выкананы файл Spark у HDFS (тут {path-to-local-file} - шлях да выкананага файла Spark на бягучым хасце):

curl -i -X PUT -T {path-to-local-file} "{location}"

Пасля гэтага можам рабіць spark-submit з выкарыстаннем файла Spark, загружанага на HDFS (тут {class-name} - імя класа, які патрабуецца запусціць для выканання задачы):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Пры гэтым трэба заўважыць, што для доступу да HDFS і забеспячэнні працы задачы можа запатрабавацца змяніць Dockerfile і скрыпт entrypoint.sh - дадаць у Dockerfile дырэктыву для капіявання залежных бібліятэк у дырэкторыю /opt/spark/jars і ўлучыць файл канфігурацыі HDFS у SPARK_CLASSPATH у entrypoint. sh.

Другі варыянт выкарыстання - Apache Livy

Далей, калі задача распрацавана і патрабуецца пратэставаць атрыманы вынік, узнікае пытанне яе запуску ў рамках працэсу CI/CD і адсочванні статутаў яе выканання. Вядома, можна запускаць яе і з дапамогай лакальнага выкліку spark-submit, але гэта ўскладняе інфраструктуру CI/CD паколькі патрабуе ўсталёўку і канфігурацыю Spark на агентах/ранерах CI сервера і налады доступу да API Kubernetes. Для дадзенага выпадку мэтавай рэалізацыяй абрана выкарыстанне Apache Livy у якасці REST API для запуску задач Spark, размешчанага ўсярэдзіне кластара Kubernetes. З яго дапамогай можна запускаць задачы Spark на кластары Kubernetes выкарыстоўваючы звычайныя cURL запыты, што лёгка рэалізуецца на базе любога CI рашэння, а яго размяшчэнне ўсярэдзіне кластара Kubernetes вырашае пытанне аўтэнтыфікацыі пры ўзаемадзеянні з API Kubernetes.

Запускаем Apache Spark на Kubernetes

Вылучым яго ў якасці другога варыянту выкарыстання – запуск задач Spark у рамках працэсу CI/CD на кластары Kubernetes у тэставым контуры.

Трохі пра Apache Livy - ён працуе як HTTP сервер, які прадстаўляе Web інтэрфейс і RESTful API, які дазваляе выдалена запусціць spark-submit, перадаўшы неабходныя параметры. Традыцыйна ён пастаўляўся ў складзе дыстрыбутыва HDP, але таксама можа быць разгорнуты ў OKD або любой іншай усталёўкі Kubernetes з дапамогай адпаведнага маніфесту і набору вобразаў Docker, напрыклад, гэтага. github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Для нашага выпадку была сабрана аналагічная выява Docker, улучальны ў сябе Spark версіі 2.4.5 з наступнага Dockerfile:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

Створаная выява можа быць сабраны і загружаны ў наяўны ў вас рэпазітар Docker, напрыклад, унутраны рэпазітар OKD. Для яго разгортвання выкарыстоўваецца наступны маніфест ({registry-url} - URL рэестра выяў Docker, {image-name} - імя выявы Docker, {tag} - тэг выявы Docker, {livy-url} - жаданы URL, па якім будзе даступны сервер Livy, маніфест "Route" прымяняецца ў выпадку, калі ў якасці дыстрыбутыва Kubernetes выкарыстоўваецца Red Hat OpenShift, у адваротным выпадку выкарыстоўваецца адпаведны маніфест Ingress або Service тыпу NodePort):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

Пасля яго прымянення і паспяховага запуску пода графічны інтэрфейс Livy даступны па спасылцы: http://{livy-url}/ui. З дапамогай Livy мы можам апублікаваць нашу задачу Spark, выкарыстоўваючы REST запыт, напрыклад, з Postman. Прыклад калекцыі з запытамі прадстаўлены ніжэй (у масіве «args» могуць быць перададзены канфігурацыйныя аргументы са зменнымі, неабходнымі для працы запускаемай задачы):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Выканаем першы запыт з калекцыі, пяройдзем у інтэрфейс OKD і праверым, што задача паспяхова запушчана - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. Пры гэтым у інтэрфейсе Livy (http://{livy-url}/ui) з'явіцца сесія, у рамках якой з дапамогай API Livy ці графічнага інтэрфейсу можна адсочваць ход выканання задачы і вывучаць логі сесіі.

Цяпер пакажам механізм працы Livy. Для гэтага вывучым часопісы кантэйнера Livy ўнутры пода з серверам Livy - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name}?tab=logs. З іх відаць, што пры выкліку REST API Livy у кантэйнеры з імем «livy» выконваецца spark-submit, аналагічны які выкарыстоўваецца намі вышэй (тут {livy-pod-name} – імя створанага пода з серверам Livy). У калекцыі таксама прадстаўлены другі запыт, які дазваляе запускаць задачы з выдаленым размяшчэннем выкананага файла Spark з дапамогай сервера Livy.

Трэці варыянт выкарыстання - Spark Operator

Цяпер, калі задача пратэставаная, устае пытанне яе рэгулярнага запуску. Натыўным спосабам для рэгулярнага запуску задач у кластары Kubernetes з'яўляецца сутнасць CronJob і можна выкарыстоўваць яе, але ў дадзены момант вялікую папулярнасць мае выкарыстанне аператараў для кіравання праграмамі ў Kubernetes і для Spark існуе дастаткова спелы аператар, які, у тым ліку, выкарыстоўваецца ў рашэннях Enterprise ўзроўню (напрыклад, Lightbend FastData Platform). Мы рэкамендуем выкарыстоўваць яго - бягучая стабільная версія Spark (2.4.5) мае дастаткова абмежаваныя магчымасці па канфігурацыі запуску задач Spark у Kubernetes, пры гэтым у наступнай мажорнай версіі (3.0.0) заяўлена паўнавартасная падтрымка Kubernetes, але дата яе выхаду застаецца невядомай. Spark Operator кампенсуе гэты недахоп, дадаючы важныя параметры канфігурацыі (напрыклад, мантаванне ConfigMap з канфігурацыяй доступу да Hadoop у поды Spark) і магчымасць рэгулярнага запуску задачы па раскладзе.

Запускаем Apache Spark на Kubernetes
Вылучым яго ў якасці трэцяга варыянту выкарыстання - рэгулярны запуск задач Spark на кластары Kubernetes у прадуктыўным контуры.

Spark Operator мае адкрыты зыходны код і распрацоўваецца ў рамках Google Cloud Platform. github.com/GoogleCloudPlatform/spark-on-k8s-operator. Яго ўстаноўка можа быць праведзена 3 спосабамі:

  1. У рамках усталёўкі Lightbend FastData Platform/Cloudflow;
  2. З дапамогай Helm:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Ужываннем маніфестаў з афіцыйнага рэпазітара (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Пры гэтым варта адзначыць наступнае – у склад Cloudflow ўваходзіць аператар з версіяй API v1beta1. Калі выкарыстоўваецца дадзены тып усталёўкі, то апісанні маніфестаў прыкладанняў Spark павінны будавацца на аснове прыкладаў з тэгаў у Git з адпаведнай версіяй API, напрыклад, «v1beta1-0.9.0-2.4.0». Версію аператара можна паглядзець у апісанні CRD, які ўваходзіць у склад аператара ў слоўніку «versions»:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Калі аператар усталяваны карэктна, то ў які адпавядае праекце з'явіцца актыўны пад з аператарам Spark (напрыклад, cloudflow-fdp-sparkoperator у прасторы Cloudflow для ўсталёўкі Cloudflow) і з'явіцца які адпавядае тып рэсурсаў Kubernetes з імем «sparkapplications». Вывучыць наяўныя прыкладанняў Spark можна наступнай камандай:

oc get sparkapplications -n {project}

Для запуску задач з дапамогай Spark Operator патрабуецца зрабіць 3 рэчы:

  • стварыць выяву Docker, улучальны ў сябе ўсе неабходныя бібліятэкі, а таксама канфігурацыйныя і выкананыя файлы. У мэтавай карціне гэта выява, створаны на этапе CI/CD і пратэставаны на тэставым кластары;
  • апублікаваць выяву Docker у рэестр, даступны з кластара Kubernetes;
  • сфарміраваць маніфест з тыпам «SparkApplication» і апісаннем запускае задачы. Прыклады маніфестаў даступныя ў афіцыйным рэпазітары (напрыклад, github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Варта адзначыць важныя моманты датычна маніфесту:
    1. у слоўніку "apiVersion" павінна быць указана версія API, якая адпавядае версіі аператара;
    2. у слоўніку "metadata.namespace" павінна быць указана прастора імёнаў, у якім будзе запушчана дадатак;
    3. у слоўніку "spec.image" павінен быць паказаны адрас створанай выявы Docker у даступным рэестры;
    4. у слоўніку "spec.mainClass" павінен быць паказаны клас задачы Spark, які патрабуецца запусціць пры запуску працэсу;
    5. у слоўніку "spec.mainApplicationFile" павінен быць паказаны шлях да выкананага jar файла;
    6. у слоўніку "spec.sparkVersion" павінна быць указана выкарыстоўваная версія Spark;
    7. у слоўніку «spec.driver.serviceAccount» павінен быць указаны сэрвісны ўліковы запіс усярэдзіне адпаведнай прасторы імёнаў Kubernetes, які будзе выкарыстаны для запуску прыкладання;
    8. у слоўніку "spec.executor" павінна быць указана колькасць рэсурсаў, што выдзяляюцца з дадаткам;
    9. у слоўніку "spec.volumeMounts" павінна быць паказана лакальная дырэкторыя, у якой будуць стварацца лакальныя файлы задачы Spark.

Прыклад фармавання маніфеста (тут {spark-service-account} — сэрвісны акаўнт усярэдзіне кластара Kubernetes для запуску задач Spark):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

У дадзеным маніфесце паказаны сэрвісны ўліковы запіс, для якога патрабуецца да публікацыі маніфесту стварыць неабходныя прывязкі роляў, якія прадстаўляюць неабходныя правы доступу для ўзаемадзеяння прыкладання Spark з API Kubernetes (калі трэба). У нашым выпадку з дадаткам патрэбныя правы на стварэнне Pod'ов. Створым неабходную прывязку ролі:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Таксама варта адзначыць, што ў спецыфікацыі дадзенага маніфеста можа быць паказаны параметр "hadoopConfigMap", які дазваляе паказаць ConfigMap з канфігурацыяй Hadoop без неабходнасці папярэдняга памяшкання адпаведнага файла ў выяву Docker. Таксама ён падыходзіць для рэгулярнага запуску задач - з дапамогай параметра "schedule" можа быць паказана расклад запуску дадзенай задачы.

Пасля гэтага захоўваем наш маніфест у файл spark-pi.yaml і ўжывальны яго да нашага кластара Kubernetes:

oc apply -f spark-pi.yaml

Пры гэтым створыцца аб'ект тыпу "sparkapplications":

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

Пры гэтым будзе створаны пад дадаткам, статус якога будзе адлюстроўвацца ў створаным «sparkapplications». Яго можна паглядзець наступнай камандай:

oc get sparkapplications spark-pi -o yaml -n {project}

Па завяршэнні задачы POD пяройдзе ў статут "Completed", які таксама абновіцца ў "sparkapplications". Логі прыкладання можна паглядзець у браўзэры або з дапамогай наступнай каманды (тут {sparkapplications-pod-name} - імя пода запушчанай задачы):

oc logs {sparkapplications-pod-name} -n {project}

Таксама кіраванне задачамі Spark можа быць ажыццёўлена з дапамогай спецыялізаванай утыліты sparkctl. Для яе ўстаноўкі клануем рэпазітар з яе зыходным кодам, усталюем Go і збяром дадзеную ўтыліту:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Вывучым спіс запушчаных задач Spark:

sparkctl list -n {project}

Створым апісанне для задачы Spark:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Запусцім апісаную задачу з дапамогай sparkctl:

sparkctl create spark-app.yaml -n {project}

Вывучым спіс запушчаных задач Spark:

sparkctl list -n {project}

Вывучым спіс падзей запушчанай задачы Spark:

sparkctl event spark-pi -n {project} -f

Вывучым статут запушчанай задачы Spark:

sparkctl status spark-pi -n {project}

У заключэнне хацелася б разгледзець выяўленыя мінусы эксплуатацыі бягучай стабільнай версіі Spark (2.4.5) у Kubernetes:

  1. Першы і, мабыць, галоўны мінус - гэта адсутнасць Data Locality. Нягледзячы на ​​ўсе недахопы YARN былі і плюсы ў яго выкарыстанні, напрыклад, прынцып дастаўкі кода да дадзеных (а не дадзеных да кода). Дзякуючы яму задачы Spark выконваліся на вузлах, дзе размяшчаліся дадзеныя, якія ўдзельнічаюць у вылічэннях, і прыкметна памяншаўся час на дастаўку дадзеных па сетцы. Пры выкарыстанні Kubernetes мы сутыкаемся з неабходнасцю перасоўвання па сетцы дадзеных, задзейнічаных у працы задачы. У выпадку, калі яны досыць вялікія, той час выканання задачы можа істотна павялічыцца, а таксама запатрабавацца досыць вялікі аб'ём дыскавай прасторы, вылучанага асобнікам задачы Spark для іх часавага захоўвання. Гэты недахоп можа быць паніжаны за кошт выкарыстання спецыялізаваных праграмных сродкаў, якія забяспечваюць лакальнасць дадзеных у Kubernetes (напрыклад, Alluxio), але гэта фактычна азначае неабходнасць захоўвання поўнай копіі дадзеных на вузлах кластара Kubernetes.
  2. Другі важны мінус - гэта бяспека. Па змаўчанні функцыі, злучаныя з забеспячэннем бяспекі датычна запуску задач Spark адключаныя, варыянт выкарыстання Kerberos у афіцыйнай дакументацыі не ахоплены (хоць адпаведныя параметры з'явіліся ў версіі 3.0.0, што запатрабуе дадатковай прапрацоўкі), а ў дакументацыі па забеспячэнні бяспекі пры выкарыстанні Spark (https ://spark.apache.org/docs/2.4.5/security.html) у якасці сховішчаў ключоў фігуруюць толькі YARN, Mesos і Standalone Cluster. Пры гэтым карыстач, пад якім запускаюцца задачы Spark, не можа быць паказаны напроста - мы толькі задаём сэрвісны ўліковы запіс, пад якой будзе працаваць пад, а карыстач выбіраецца зыходзячы з настроеных палітык бяспекі. У сувязі з гэтым альбо выкарыстоўваецца карыстач root, што не бяспечна ў прадуктыўным асяроддзі, альбо карыстач з выпадковым UID, што няёмка пры размеркаванні мае рацыю доступу да дадзеных (вырашальна стварэннем PodSecurityPolicies і іх прывязкай да адпаведных службовых уліковых запісаў). На бягучы момант вырашаецца альбо памяшканнем усіх неабходных файлаў непасрэдна ў выяву Docker, альбо мадыфікацыяй скрыпту запуску Spark для выкарыстання механізму захоўвання і атрыманні сакрэтаў, прынятага ў Вашай арганізацыі.
  3. Запуск задач Spark з дапамогай Kubernetes афіцыйна да гэтага часу знаходзіцца ў эксперыментальным рэжыме і ў будучыні магчымыя значныя змены ў выкарыстоўваных артэфактах (канфігурацыйных файлах, базавых вобразаў Docker і скрыптах запуску). І сапраўды - пры падрыхтоўцы матэрыялу тэставаліся версіі 2.3.0 і 2.4.5, паводзіны істотна адрознівалася.

Будзем чакаць абнаўленняў - нядаўна выйшла свежая версія Spark (3.0.0), якая прынесла адчувальныя змены ў працу Spark на Kubernetes, але якая захавала эксперыментальны статус падтрымкі дадзенага мэнэджара рэсурсаў. Магчыма, наступныя абнаўленні сапраўды дазволяць у поўнай меры рэкамендаваць адмовіцца ад YARN і запускаць задачы Spark на Kubernetes, не асцерагаючыся за бяспеку Вашай сістэмы і без неабходнасці самастойнай дапрацоўкі функцыянальных кампанентаў.

Плаўнік

Крыніца: habr.com

Дадаць каментар