Executando Apache Spark no Kubernetes

Caros leitores, boa tarde. Hoje falaremos um pouco sobre o Apache Spark e suas perspectivas de desenvolvimento.

Executando Apache Spark no Kubernetes

No mundo moderno de Big Data, o Apache Spark é o padrão de fato para o desenvolvimento de tarefas de processamento de dados em lote. Além disso, também é utilizado para criar aplicações de streaming que funcionam no conceito micro batch, processando e enviando dados em pequenas porções (Spark Structured Streaming). E tradicionalmente faz parte da pilha geral do Hadoop, usando o YARN (ou em alguns casos o Apache Mesos) como gerenciador de recursos. Em 2020, a sua utilização na sua forma tradicional está em causa para a maioria das empresas devido à falta de distribuições decentes de Hadoop - o desenvolvimento de HDP e CDH parou, o CDH não está bem desenvolvido e tem um custo elevado, e os restantes fornecedores de Hadoop têm deixou de existir ou tem um futuro sombrio. Portanto, o lançamento do Apache Spark usando Kubernetes é de interesse crescente entre a comunidade e grandes empresas - tornando-se um padrão em orquestração de contêineres e gerenciamento de recursos em nuvens privadas e públicas, resolve o problema com agendamento inconveniente de recursos de tarefas Spark no YARN e fornece uma plataforma em constante desenvolvimento com muitas distribuições comerciais e abertas para empresas de todos os tamanhos e segmentos. Além disso, na esteira da popularidade, a maioria já conseguiu adquirir algumas instalações próprias e aumentou sua experiência no seu uso, o que simplifica a mudança.

A partir da versão 2.3.0, o Apache Spark adquiriu suporte oficial para execução de tarefas em um cluster Kubernetes e hoje falaremos sobre a maturidade atual dessa abordagem, diversas opções de uso e armadilhas que serão encontradas durante a implementação.

Em primeiro lugar, vamos dar uma olhada no processo de desenvolvimento de tarefas e aplicativos baseados no Apache Spark e destacar casos típicos em que é necessário executar uma tarefa em um cluster Kubernetes. Na preparação deste post, o OpenShift é usado como uma distribuição e serão fornecidos comandos relevantes para seu utilitário de linha de comando (oc). Para outras distribuições do Kubernetes, os comandos correspondentes do utilitário de linha de comando padrão do Kubernetes (kubectl) ou seus análogos (por exemplo, para política oc adm) podem ser usados.

Primeiro caso de uso – spark-submit

Durante o desenvolvimento de tarefas e aplicativos, o desenvolvedor precisa executar tarefas para depurar a transformação de dados. Teoricamente, stubs podem ser usados ​​para esses fins, mas o desenvolvimento com a participação de instâncias reais (embora de teste) de sistemas finais provou ser mais rápido e melhor nesta classe de tarefas. No caso de depurarmos instâncias reais de sistemas finais, dois cenários são possíveis:

  • o desenvolvedor executa uma tarefa do Spark localmente em modo autônomo;

    Executando Apache Spark no Kubernetes

  • um desenvolvedor executa uma tarefa Spark em um cluster Kubernetes em um loop de teste.

    Executando Apache Spark no Kubernetes

A primeira opção tem o direito de existir, mas acarreta uma série de desvantagens:

  • Cada desenvolvedor deve ter acesso do local de trabalho a todas as instâncias dos sistemas finais de que necessita;
  • uma quantidade suficiente de recursos é necessária na máquina de trabalho para executar a tarefa que está sendo desenvolvida.

A segunda opção não apresenta essas desvantagens, pois o uso de um cluster Kubernetes permite alocar o pool de recursos necessário para a execução de tarefas e fornecer-lhe o acesso necessário às instâncias do sistema final, fornecendo acesso flexível a ele usando o modelo de função Kubernetes para todos os membros da equipe de desenvolvimento. Vamos destacá-lo como o primeiro caso de uso: iniciar tarefas do Spark a partir de uma máquina de desenvolvedor local em um cluster Kubernetes em um loop de teste.

Vamos falar mais sobre o processo de configuração do Spark para execução local. Para começar a usar o Spark você precisa instalá-lo:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Coletamos os pacotes necessários para trabalhar com Kubernetes:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

Uma compilação completa leva muito tempo e, para criar imagens Docker e executá-las em um cluster Kubernetes, você realmente só precisa de arquivos jar do diretório “assembly/”, portanto, só pode construir este subprojeto:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Para executar jobs do Spark no Kubernetes, você precisa criar uma imagem Docker para usar como imagem base. Existem 2 abordagens possíveis aqui:

  • A imagem Docker gerada inclui o código de tarefa executável do Spark;
  • A imagem criada inclui apenas Spark e as dependências necessárias, o código executável é hospedado remotamente (por exemplo, em HDFS).

Primeiro, vamos construir uma imagem Docker contendo um exemplo de teste de uma tarefa Spark. Para criar imagens Docker, o Spark possui um utilitário chamado "docker-image-tool". Vamos estudar a ajuda sobre isso:

./bin/docker-image-tool.sh --help

Com sua ajuda, você pode criar imagens Docker e carregá-las em registros remotos, mas por padrão tem uma série de desvantagens:

  • sem falhas, cria 3 imagens Docker de uma vez - para Spark, PySpark e R;
  • não permite que você especifique um nome de imagem.

Portanto, usaremos uma versão modificada deste utilitário fornecida a seguir:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

Com sua ajuda, montamos uma imagem básica do Spark contendo uma tarefa de teste para calcular Pi usando Spark (aqui {docker-registry-url} é a URL do seu registro de imagem Docker, {repo} é o nome do repositório dentro do registro, que corresponde ao projeto no OpenShift , {image-name} - nome da imagem (se a separação de imagens em três níveis for usada, por exemplo, como no registro integrado de imagens Red Hat OpenShift), {tag} - tag deste versão da imagem):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Faça login no cluster OKD usando o utilitário de console (aqui {OKD-API-URL} é o URL da API do cluster OKD):

oc login {OKD-API-URL}

Vamos obter o token do usuário atual para autorização no Docker Registry:

oc whoami -t

Faça login no Docker Registry interno do cluster OKD (usamos o token obtido com o comando anterior como senha):

docker login {docker-registry-url}

Vamos fazer upload da imagem Docker montada para o Docker Registry OKD:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Vamos verificar se a imagem montada está disponível em OKD. Para fazer isso, abra a URL no navegador com uma lista de imagens do projeto correspondente (aqui {project} é o nome do projeto dentro do cluster OpenShift, {OKD-WEBUI-URL} é a URL do console Web OpenShift ) - https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{image-name}.

Para executar tarefas, uma conta de serviço deve ser criada com privilégios para executar pods como root (discutiremos esse ponto mais tarde):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Vamos executar o comando spark-submit para publicar uma tarefa Spark no cluster OKD, especificando a conta de serviço criada e a imagem Docker:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

Aqui:

—name — o nome da tarefa que participará da formação do nome dos pods do Kubernetes;

—class — classe do arquivo executável, chamada quando a tarefa é iniciada;

—conf — Parâmetros de configuração do Spark;

spark.executor.instances — o número de executores Spark a serem lançados;

spark.kubernetes.authenticate.driver.serviceAccountName - o nome da conta de serviço Kubernetes usada ao iniciar pods (para definir o contexto de segurança e os recursos ao interagir com a API Kubernetes);

spark.kubernetes.namespace — Namespace Kubernetes no qual os pods de driver e executor serão iniciados;

spark.submit.deployMode — método de inicialização do Spark (para o spark-submit padrão é usado “cluster”, para o Spark Operator e versões posteriores do Spark “client”);

spark.kubernetes.container.image - Imagem Docker usada para iniciar pods;

spark.master — URL da API Kubernetes (externo é especificado para que o acesso ocorra a partir da máquina local);

local:// é o caminho para o executável do Spark dentro da imagem do Docker.

Vamos ao projeto OKD correspondente e estudamos os pods criados - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

Para simplificar o processo de desenvolvimento, outra opção pode ser usada, na qual uma imagem base comum do Spark é criada, usada por todas as tarefas a serem executadas, e instantâneos de arquivos executáveis ​​​​são publicados em armazenamento externo (por exemplo, Hadoop) e especificados ao chamar spark-submit como um link. Nesse caso, você pode executar diferentes versões de tarefas do Spark sem reconstruir imagens do Docker, usando, por exemplo, WebHDFS para publicar imagens. Enviamos uma solicitação para criar um arquivo (aqui {host} é o host do serviço WebHDFS, {port} é a porta do serviço WebHDFS, {path-to-file-on-hdfs} é o caminho desejado para o arquivo em HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

Você receberá uma resposta como esta (aqui {location} é o URL que precisa ser usado para baixar o arquivo):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Carregue o arquivo executável do Spark no HDFS (aqui {path-to-local-file} é o caminho para o arquivo executável do Spark no host atual):

curl -i -X PUT -T {path-to-local-file} "{location}"

Depois disso, podemos fazer o spark-submit usando o arquivo Spark carregado no HDFS (aqui {class-name} é o nome da classe que precisa ser iniciada para concluir a tarefa):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Deve-se observar que para acessar o HDFS e garantir que a tarefa funcione, pode ser necessário alterar o Dockerfile e o script entrypoint.sh - adicionar uma diretiva ao Dockerfile para copiar bibliotecas dependentes para o diretório /opt/spark/jars e inclua o arquivo de configuração do HDFS em SPARK_CLASSPATH no entrypoint.sh.

Segundo caso de uso - Apache Livy

Além disso, quando uma tarefa é desenvolvida e o resultado precisa ser testado, surge a questão de lançá-la como parte do processo de CI/CD e acompanhar o status de sua execução. Claro, você pode executá-lo usando uma chamada local spark-submit, mas isso complica a infraestrutura de CI/CD, pois requer a instalação e configuração do Spark nos agentes/executores do servidor CI e a configuração do acesso à API Kubernetes. Para este caso, a implementação alvo optou por usar Apache Livy como uma API REST para executar tarefas Spark hospedadas dentro de um cluster Kubernetes. Com sua ajuda, você pode executar tarefas Spark em um cluster Kubernetes usando solicitações cURL regulares, que são facilmente implementadas com base em qualquer solução de CI, e sua colocação dentro do cluster Kubernetes resolve o problema de autenticação ao interagir com a API Kubernetes.

Executando Apache Spark no Kubernetes

Vamos destacá-lo como um segundo caso de uso: executar tarefas do Spark como parte de um processo de CI/CD em um cluster Kubernetes em um loop de teste.

Um pouco sobre o Apache Livy - ele funciona como um servidor HTTP que fornece uma interface Web e uma API RESTful que permite iniciar remotamente o spark-submit, passando os parâmetros necessários. Tradicionalmente, ele é enviado como parte de uma distribuição HDP, mas também pode ser implantado no OKD ou em qualquer outra instalação do Kubernetes usando o manifesto apropriado e um conjunto de imagens Docker, como esta - github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Para o nosso caso, uma imagem Docker semelhante foi construída, incluindo Spark versão 2.4.5 do seguinte Dockerfile:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

A imagem gerada pode ser construída e carregada em seu repositório Docker existente, como o repositório OKD interno. Para implantá-lo, use o seguinte manifesto ({registry-url} - URL do registro da imagem Docker, {image-name} - nome da imagem Docker, {tag} - tag da imagem Docker, {livy-url} - URL desejada onde o será acessível Livy; o manifesto “Route” será usado se o Red Hat OpenShift for usado como distribuição Kubernetes, caso contrário, o manifesto Ingress ou Service correspondente do tipo NodePort será usado):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

Após aplicá-lo e iniciar o pod com sucesso, a interface gráfica do Livy está disponível no link: http://{livy-url}/ui. Com Livy, podemos publicar nossa tarefa Spark usando uma solicitação REST de, por exemplo, Postman. Um exemplo de coleção com solicitações é apresentado a seguir (argumentos de configuração com variáveis ​​​​necessárias ao funcionamento da tarefa iniciada podem ser passados ​​​​no array “args”):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Vamos executar a primeira solicitação da coleção, acessar a interface OKD e verificar se a tarefa foi iniciada com sucesso - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. Ao mesmo tempo, aparecerá uma sessão na interface do Livy (http://{livy-url}/ui), dentro da qual, usando a API do Livy ou interface gráfica, você poderá acompanhar o andamento da tarefa e estudar a sessão Histórico.

Agora vamos mostrar como Lívio funciona. Para fazer isso, vamos examinar os logs do contêiner Livy dentro do pod com o servidor Livy - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name }?tab=logs. A partir deles podemos ver que ao chamar a API REST do Livy em um contêiner chamado “livy”, um spark-submit é executado, semelhante ao que usamos acima (aqui {livy-pod-name} é o nome do pod criado com o servidor Livy). A coleção também apresenta uma segunda consulta que permite executar tarefas que hospedam remotamente um executável Spark usando um servidor Livy.

Terceiro caso de uso - Operador Spark

Agora que a tarefa foi testada, surge a questão de executá-la regularmente. A maneira nativa de executar tarefas regularmente em um cluster Kubernetes é a entidade CronJob e você pode usá-la, mas no momento o uso de operadores para gerenciar aplicativos no Kubernetes é muito popular e para Spark existe um operador bastante maduro, que também é usado em soluções de nível empresarial (por exemplo, Lightbend FastData Platform). Recomendamos usá-lo - a versão estável atual do Spark (2.4.5) tem opções de configuração bastante limitadas para executar tarefas do Spark no Kubernetes, enquanto a próxima versão principal (3.0.0) declara suporte total para Kubernetes, mas sua data de lançamento permanece desconhecida . O Spark Operator compensa essa deficiência adicionando opções de configuração importantes (por exemplo, montar um ConfigMap com configuração de acesso Hadoop para pods Spark) e a capacidade de executar uma tarefa agendada regularmente.

Executando Apache Spark no Kubernetes
Vamos destacá-lo como um terceiro caso de uso: execução regular de tarefas do Spark em um cluster Kubernetes em um loop de produção.

O Spark Operator é de código aberto e desenvolvido no Google Cloud Platform - github.com/GoogleCloudPlatform/spark-on-k8s-operator. Sua instalação pode ser feita de 3 formas:

  1. Como parte da instalação do Lightbend FastData Platform/Cloudflow;
  2. Usando o Helm:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Usando manifestos do repositório oficial (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Vale ressaltar o seguinte: Cloudflow inclui um operador com API versão v1beta1. Se esse tipo de instalação for usado, as descrições do manifesto do aplicativo Spark deverão ser baseadas em tags de exemplo no Git com a versão de API apropriada, por exemplo, "v1beta1-0.9.0-2.4.0". A versão do operador pode ser encontrada na descrição do CRD incluído no operador no dicionário “versões”:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Se o operador estiver instalado corretamente, um pod ativo com o operador Spark aparecerá no projeto correspondente (por exemplo, cloudflow-fdp-sparkoperator no espaço Cloudflow para a instalação do Cloudflow) e um tipo de recurso Kubernetes correspondente chamado “sparkapplications” aparecerá . Você pode explorar os aplicativos Spark disponíveis com o seguinte comando:

oc get sparkapplications -n {project}

Para executar tarefas usando o Spark Operator, você precisa fazer três coisas:

  • crie uma imagem Docker que inclua todas as bibliotecas necessárias, bem como arquivos de configuração e executáveis. Na imagem de destino, esta é uma imagem criada no estágio CI/CD e testada em um cluster de teste;
  • publicar uma imagem Docker em um registro acessível no cluster Kubernetes;
  • gerar um manifesto com o tipo “SparkApplication” e uma descrição da tarefa a ser lançada. Manifestos de exemplo estão disponíveis no repositório oficial (por exemplo, github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Há pontos importantes a serem observados sobre o manifesto:
    1. o dicionário “apiVersion” deverá indicar a versão da API correspondente à versão do operador;
    2. o dicionário “metadata.namespace” deverá indicar o namespace no qual a aplicação será lançada;
    3. o dicionário “spec.image” deve conter o endereço da imagem Docker criada em um registro acessível;
    4. o dicionário “spec.mainClass” deve conter a classe de tarefa Spark que precisa ser executada quando o processo for iniciado;
    5. o dicionário “spec.mainApplicationFile” deve conter o caminho para o arquivo jar executável;
    6. o dicionário “spec.sparkVersion” deve indicar a versão do Spark que está sendo utilizada;
    7. o dicionário “spec.driver.serviceAccount” deve especificar a conta de serviço dentro do namespace Kubernetes correspondente que será usado para executar a aplicação;
    8. o dicionário “spec.executor” deve indicar a quantidade de recursos alocados para a aplicação;
    9. o dicionário "spec.volumeMounts" deve especificar o diretório local no qual os arquivos de tarefas locais do Spark serão criados.

Um exemplo de geração de um manifesto (aqui {spark-service-account} é uma conta de serviço dentro do cluster Kubernetes para executar tarefas do Spark):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Este manifesto especifica uma conta de serviço para a qual, antes de publicar o manifesto, você deve criar as vinculações de função necessárias que fornecem os direitos de acesso necessários para que o aplicativo Spark interaja com a API do Kubernetes (se necessário). No nosso caso, o aplicativo precisa de direitos para criar Pods. Vamos criar a vinculação de função necessária:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Também é importante notar que esta especificação de manifesto pode incluir um parâmetro "hadoopConfigMap", que permite especificar um ConfigMap com a configuração do Hadoop sem ter que primeiro colocar o arquivo correspondente na imagem do Docker. Também é adequado para executar tarefas regularmente - usando o parâmetro “schedule”, um cronograma para executar uma determinada tarefa pode ser especificado.

Depois disso, salvamos nosso manifesto no arquivo spark-pi.yaml e o aplicamos ao nosso cluster Kubernetes:

oc apply -f spark-pi.yaml

Isso criará um objeto do tipo “sparkapplications”:

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

Neste caso, será criado um pod com uma aplicação, cujo status será exibido nos “sparkapplications” criados. Você pode visualizá-lo com o seguinte comando:

oc get sparkapplications spark-pi -o yaml -n {project}

Após a conclusão da tarefa, o POD passará para o status “Concluído”, que também será atualizado em “sparkapplications”. Os logs do aplicativo podem ser visualizados no navegador ou usando o seguinte comando (aqui {sparkapplications-pod-name} é o nome do pod da tarefa em execução):

oc logs {sparkapplications-pod-name} -n {project}

As tarefas do Spark também podem ser gerenciadas usando o utilitário especializado sparkctl. Para instalá-lo, clone o repositório com seu código-fonte, instale o Go e construa este utilitário:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Vamos examinar a lista de tarefas do Spark em execução:

sparkctl list -n {project}

Vamos criar uma descrição para a tarefa Spark:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Vamos executar a tarefa descrita usando sparkctl:

sparkctl create spark-app.yaml -n {project}

Vamos examinar a lista de tarefas do Spark em execução:

sparkctl list -n {project}

Vamos examinar a lista de eventos de uma tarefa do Spark iniciada:

sparkctl event spark-pi -n {project} -f

Vamos examinar o status da tarefa Spark em execução:

sparkctl status spark-pi -n {project}

Concluindo, gostaria de considerar as desvantagens descobertas do uso da versão estável atual do Spark (2.4.5) no Kubernetes:

  1. A primeira e, talvez, principal desvantagem é a falta de localidade de dados. Apesar de todas as deficiências do YARN, também havia vantagens em utilizá-lo, por exemplo, o princípio de entregar código aos dados (em vez de dados ao código). Graças a ele, as tarefas do Spark foram executadas nos nós onde estavam localizados os dados envolvidos nos cálculos e o tempo de entrega dos dados pela rede foi significativamente reduzido. Ao usar o Kubernetes, nos deparamos com a necessidade de mover os dados envolvidos em uma tarefa pela rede. Se forem grandes o suficiente, o tempo de execução da tarefa pode aumentar significativamente e também exigir uma quantidade bastante grande de espaço em disco alocado para instâncias de tarefas do Spark para armazenamento temporário. Essa desvantagem pode ser mitigada com o uso de software especializado que garanta a localidade dos dados no Kubernetes (por exemplo, Alluxio), mas isso na verdade significa a necessidade de armazenar uma cópia completa dos dados nos nós do cluster Kubernetes.
  2. A segunda desvantagem importante é a segurança. Por padrão, os recursos relacionados à segurança relativos à execução de tarefas do Spark estão desabilitados, o uso do Kerberos não é abordado na documentação oficial (embora as opções correspondentes tenham sido introduzidas na versão 3.0.0, o que exigirá trabalho adicional) e a documentação de segurança para usando Spark (https://spark.apache.org/docs/2.4.5/security.html) apenas YARN, Mesos e Standalone Cluster aparecem como armazenamentos de chaves. Ao mesmo tempo, o usuário sob o qual as tarefas do Spark são iniciadas não pode ser especificado diretamente - apenas especificamos a conta de serviço sob a qual ela funcionará e o usuário é selecionado com base nas políticas de segurança configuradas. Nesse sentido, ou é utilizado o usuário root, que não é seguro em um ambiente produtivo, ou um usuário com UID aleatório, o que é inconveniente na distribuição de direitos de acesso aos dados (isso pode ser resolvido criando PodSecurityPolicies e vinculando-os ao contas de serviço correspondentes). Atualmente, a solução é colocar todos os arquivos necessários diretamente na imagem do Docker ou modificar o script de inicialização do Spark para utilizar o mecanismo de armazenamento e recuperação de segredos adotado em sua organização.
  3. A execução de jobs do Spark usando Kubernetes ainda está oficialmente em modo experimental e pode haver mudanças significativas nos artefatos usados ​​(arquivos de configuração, imagens base do Docker e scripts de inicialização) no futuro. E de fato, na preparação do material, foram testadas as versões 2.3.0 e 2.4.5, o comportamento foi significativamente diferente.

Vamos aguardar as atualizações - foi lançada recentemente uma nova versão do Spark (3.0.0), que trouxe mudanças significativas no trabalho do Spark no Kubernetes, mas manteve o status experimental de suporte a este gerenciador de recursos. Talvez as próximas atualizações realmente tornem possível recomendar totalmente o abandono do YARN e a execução de tarefas do Spark no Kubernetes sem medo pela segurança do seu sistema e sem a necessidade de modificar componentes funcionais de forma independente.

Final

Fonte: habr.com

Adicionar um comentário