Apache Spark draaien op Kubernetes

Beste lezers, goedemiddag. Vandaag zullen we het een beetje hebben over Apache Spark en zijn ontwikkelingsvooruitzichten.

Apache Spark draaien op Kubernetes

In de moderne wereld van Big Data is Apache Spark de de facto standaard voor het ontwikkelen van batchgegevensverwerkingstaken. Daarnaast wordt het ook gebruikt om streaming-applicaties te maken die werken in het microbatch-concept, waarbij gegevens in kleine porties worden verwerkt en verzonden (Spark Structured Streaming). En traditioneel maakte het deel uit van de algehele Hadoop-stack, waarbij YARN (of in sommige gevallen Apache Mesos) als resourcemanager werd gebruikt. Tegen 2020 is het gebruik ervan in zijn traditionele vorm voor de meeste bedrijven twijfelachtig vanwege het gebrek aan fatsoenlijke Hadoop-distributies - de ontwikkeling van HDP en CDH is gestopt, CDH is niet goed ontwikkeld en heeft hoge kosten, en de overige Hadoop-leveranciers hebben óf ophield te bestaan, óf een sombere toekomst had. Daarom is de lancering van Apache Spark met behulp van Kubernetes van toenemend belang onder de gemeenschap en grote bedrijven. Het wordt een standaard in containerorkestratie en resourcebeheer in private en publieke clouds, het lost het probleem op met lastige resourceplanning van Spark-taken op YARN en biedt een zich gestaag ontwikkelend platform met veel commerciële en open distributies voor bedrijven van alle soorten en maten. Bovendien zijn de meesten er, in de nasleep van de populariteit, al in geslaagd een aantal eigen installaties aan te schaffen en hun expertise in het gebruik ervan te vergroten, wat de verhuizing vereenvoudigt.

Vanaf versie 2.3.0 heeft Apache Spark officiële ondersteuning gekregen voor het uitvoeren van taken in een Kubernetes-cluster en vandaag zullen we het hebben over de huidige volwassenheid van deze aanpak, verschillende opties voor het gebruik ervan en valkuilen die je tijdens de implementatie tegenkomt.

Laten we eerst eens kijken naar het proces van het ontwikkelen van taken en applicaties op basis van Apache Spark en typische gevallen benadrukken waarin u een taak op een Kubernetes-cluster moet uitvoeren. Bij het voorbereiden van dit bericht wordt OpenShift gebruikt als distributie en worden opdrachten gegeven die relevant zijn voor het opdrachtregelhulpprogramma (oc). Voor andere Kubernetes-distributies kunnen de overeenkomstige opdrachten van het standaard Kubernetes-opdrachtregelhulpprogramma (kubectl) of hun analogen (bijvoorbeeld voor oc adm-beleid) worden gebruikt.

Eerste gebruiksscenario: Spark-verzenden

Tijdens de ontwikkeling van taken en applicaties moet de ontwikkelaar taken uitvoeren om gegevenstransformatie te debuggen. Theoretisch kunnen stubs voor deze doeleinden worden gebruikt, maar ontwikkeling met deelname van echte (zij het test) exemplaren van eindsystemen is sneller en beter gebleken in deze klasse van taken. In het geval dat we fouten opsporen op echte exemplaren van eindsystemen, zijn er twee scenario's mogelijk:

  • de ontwikkelaar voert een Spark-taak lokaal uit in de standalone-modus;

    Apache Spark draaien op Kubernetes

  • een ontwikkelaar voert een Spark-taak uit op een Kubernetes-cluster in een testlus.

    Apache Spark draaien op Kubernetes

De eerste optie heeft bestaansrecht, maar brengt een aantal nadelen met zich mee:

  • Elke ontwikkelaar moet vanaf de werkplek toegang krijgen tot alle instances van de eindsystemen die hij nodig heeft;
  • Er zijn voldoende middelen nodig op de werkende machine om de taak die wordt ontwikkeld uit te voeren.

De tweede optie heeft deze nadelen niet, omdat u met het gebruik van een Kubernetes-cluster de benodigde resourcepool voor het uitvoeren van taken kunt toewijzen en deze de nodige toegang kunt geven tot eindsysteeminstanties, waarbij u op flexibele wijze toegang daartoe kunt verlenen met behulp van het Kubernetes-rolmodel voor alle leden van het ontwikkelteam. Laten we het benadrukken als de eerste use case: het starten van Spark-taken vanaf een lokale ontwikkelaarsmachine op een Kubernetes-cluster in een testlus.

Laten we het meer hebben over het proces van het instellen van Spark om lokaal te draaien. Om Spark te kunnen gebruiken, moet je het installeren:

mkdir /opt/spark
cd /opt/spark
wget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgz
tar zxvf spark-2.4.5.tgz
rm -f spark-2.4.5.tgz

Voor het werken met Kubernetes verzamelen wij de benodigde pakketten:

cd spark-2.4.5/
./build/mvn -Pkubernetes -DskipTests clean package

Een volledige build kost veel tijd, en om Docker-images te maken en deze op een Kubernetes-cluster uit te voeren, heb je eigenlijk alleen jar-bestanden uit de map "assembly/" nodig, dus je kunt alleen dit subproject bouwen:

./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Als u Spark-taken op Kubernetes wilt uitvoeren, moet u een Docker-installatiekopie maken die u als basisinstallatiekopie kunt gebruiken. Er zijn hier 2 mogelijke benaderingen:

  • De gegenereerde Docker-image bevat de uitvoerbare Spark-taakcode;
  • De gemaakte image bevat alleen Spark en de benodigde afhankelijkheden, de uitvoerbare code wordt op afstand gehost (bijvoorbeeld in HDFS).

Laten we eerst een Docker-image bouwen met een testvoorbeeld van een Spark-taak. Om Docker-images te maken, heeft Spark een hulpprogramma genaamd "docker-image-tool". Laten we de hulp erover bestuderen:

./bin/docker-image-tool.sh --help

Met zijn hulp kun je Docker-images maken en deze uploaden naar externe registers, maar standaard heeft het een aantal nadelen:

  • creëert zonder fouten 3 Docker-images tegelijk - voor Spark, PySpark en R;
  • staat u niet toe een afbeeldingsnaam op te geven.

Daarom zullen we een aangepaste versie van dit hulpprogramma gebruiken, hieronder weergegeven:

vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bash

function error {
  echo "$@" 1>&2
  exit 1
}

if [ -z "${SPARK_HOME}" ]; then
  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/bin/load-spark-env.sh"

function image_ref {
  local image="$1"
  local add_repo="${2:-1}"
  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then
    image="$REPO/$image"
  fi
  if [ -n "$TAG" ]; then
    image="$image:$TAG"
  fi
  echo "$image"
}

function build {
  local BUILD_ARGS
  local IMG_PATH

  if [ ! -f "$SPARK_HOME/RELEASE" ]; then
    IMG_PATH=$BASEDOCKERFILE
    BUILD_ARGS=(
      ${BUILD_PARAMS}
      --build-arg
      img_path=$IMG_PATH
      --build-arg
      datagram_jars=datagram/runtimelibs
      --build-arg
      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars
    )
  else
    IMG_PATH="kubernetes/dockerfiles"
    BUILD_ARGS=(${BUILD_PARAMS})
  fi

  if [ -z "$IMG_PATH" ]; then
    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
  fi

  if [ -z "$IMAGE_REF" ]; then
    error "Cannot find docker image reference. Please add -i arg."
  fi

  local BINDING_BUILD_ARGS=(
    ${BUILD_PARAMS}
    --build-arg
    base_img=$(image_ref $IMAGE_REF)
  )
  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}

  docker build $NOCACHEARG "${BUILD_ARGS[@]}" 
    -t $(image_ref $IMAGE_REF) 
    -f "$BASEDOCKERFILE" .
}

function push {
  docker push "$(image_ref $IMAGE_REF)"
}

function usage {
  cat <<EOF
Usage: $0 [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
  build       Build image. Requires a repository address to be provided if the image will be
              pushed to a different registry.
  push        Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
  -r repo               Repository address.
  -i name               Image name to apply to the built image, or to identify the image to be pushed.  
  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
  -m                    Use minikube's Docker daemon.
  -n                    Build docker image with --no-cache
  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
              be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
  - Build image in minikube with tag "testing"
    $0 -m -t testing build

  - Build and push image with tag "v2.3.0" to docker.io/myrepo
    $0 -r docker.io/myrepo -t v2.3.0 build
    $0 -r docker.io/myrepo -t v2.3.0 push
EOF
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  usage
  exit 0
fi

REPO=
TAG=
BASEDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
IMAGE_REF=
while getopts f:mr:t:nb:i: option
do
 case "${option}"
 in
 f) BASEDOCKERFILE=${OPTARG};;
 r) REPO=${OPTARG};;
 t) TAG=${OPTARG};;
 n) NOCACHEARG="--no-cache";;
 i) IMAGE_REF=${OPTARG};;
 b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
 esac
done

case "${@: -1}" in
  build)
    build
    ;;
  push)
    if [ -z "$REPO" ]; then
      usage
      exit 1
    fi
    push
    ;;
  *)
    usage
    exit 1
    ;;
esac

Met zijn hulp stellen we een basis Spark-image samen met een testtaak voor het berekenen van Pi met Spark (hier is {docker-registry-url} de URL van uw Docker-imageregister, {repo} is de naam van de repository in het register, die overeenkomt met het project in OpenShift, {image-name} - naam van de afbeelding (als scheiding op drie niveaus van afbeeldingen wordt gebruikt, bijvoorbeeld, zoals in het geïntegreerde register van Red Hat OpenShift-afbeeldingen), {tag} - tag hiervan versie van de afbeelding):

./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Meld u aan bij het OKD-cluster met behulp van het consolehulpprogramma (hier is {OKD-API-URL} de URL van de OKD-cluster-API):

oc login {OKD-API-URL}

Laten we het token van de huidige gebruiker ophalen voor autorisatie in het Docker-register:

oc whoami -t

Log in op het interne Docker-register van het OKD-cluster (we gebruiken het token verkregen met de vorige opdracht als wachtwoord):

docker login {docker-registry-url}

Laten we de samengestelde Docker-image uploaden naar de Docker Registry OKD:

./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Laten we controleren of de samengestelde afbeelding beschikbaar is in OKD. Open hiervoor de URL in de browser met een lijst met afbeeldingen van het overeenkomstige project (hier is {project} de naam van het project binnen het OpenShift-cluster, {OKD-WEBUI-URL} is de URL van de OpenShift-webconsole ) - https://{OKD-WEBUI-URL}/console /project/{project}/browse/images/{afbeeldingsnaam}.

Om taken uit te voeren, moet een serviceaccount worden aangemaakt met de rechten om pods als root uit te voeren (we zullen dit punt later bespreken):

oc create sa spark -n {project}
oc adm policy add-scc-to-user anyuid -z spark -n {project}

Laten we de opdracht spark-submit uitvoeren om een ​​Spark-taak naar het OKD-cluster te publiceren, waarbij het gemaakte serviceaccount en de Docker-installatiekopie worden opgegeven:

 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

Здесь:

—name — de naam van de taak die zal deelnemen aan de vorming van de naam van de Kubernetes-pods;

—class — klasse van het uitvoerbare bestand, aangeroepen wanneer de taak start;

—conf — Spark-configuratieparameters;

spark.executor.instances — het aantal Spark-uitvoerders dat moet worden gestart;

spark.kubernetes.authenticate.driver.serviceAccountName - de naam van het Kubernetes-serviceaccount dat wordt gebruikt bij het starten van pods (om de beveiligingscontext en -mogelijkheden te definiëren bij interactie met de Kubernetes API);

spark.kubernetes.namespace — Kubernetes-naamruimte waarin driver- en executor-pods worden gelanceerd;

spark.submit.deployMode — methode voor het starten van Spark (voor standaard spark-submit wordt “cluster” gebruikt, voor Spark Operator en latere versies van Spark “client”);

spark.kubernetes.container.image - Docker-image gebruikt om pods te starten;

spark.master — Kubernetes API-URL (extern is gespecificeerd zodat toegang plaatsvindt vanaf de lokale machine);

local:// is het pad naar het uitvoerbare Spark-bestand in de Docker-installatiekopie.

We gaan naar het overeenkomstige OKD-project en bestuderen de gemaakte pods - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.

Om het ontwikkelingsproces te vereenvoudigen kan een andere optie worden gebruikt, waarbij een gemeenschappelijk basisimage van Spark wordt gemaakt, dat door alle taken wordt gebruikt om uit te voeren, en momentopnamen van uitvoerbare bestanden worden gepubliceerd naar externe opslag (bijvoorbeeld Hadoop) en gespecificeerd bij het aanroepen spark-submit als link. In dit geval kunt u verschillende versies van Spark-taken uitvoeren zonder Docker-installatiekopieën opnieuw op te bouwen, waarbij u bijvoorbeeld WebHDFS gebruikt om installatiekopieën te publiceren. We sturen een verzoek om een ​​bestand aan te maken (hier is {host} de host van de WebHDFS-service, {port} is de poort van de WebHDFS-service, {path-to-file-on-hdfs} is het gewenste pad naar het bestand op HDFS):

curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

U ontvangt een reactie als deze (hier is {location} de URL die moet worden gebruikt om het bestand te downloaden):

HTTP/1.1 307 TEMPORARY_REDIRECT
Location: {location}
Content-Length: 0

Laad het uitvoerbare Spark-bestand in HDFS (hier is {path-to-local-file} het pad naar het uitvoerbare Spark-bestand op de huidige host):

curl -i -X PUT -T {path-to-local-file} "{location}"

Hierna kunnen we spark-submit uitvoeren met behulp van het Spark-bestand dat is geüpload naar HDFS (hier is {class-name} de naam van de klasse die moet worden gestart om de taak te voltooien):

/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

Opgemerkt moet worden dat om toegang te krijgen tot HDFS en ervoor te zorgen dat de taak werkt, u mogelijk het Dockerfile en het entrypoint.sh-script moet wijzigen. Voeg een richtlijn toe aan het Dockerfile om afhankelijke bibliotheken naar de map /opt/spark/jars te kopiëren en neem het HDFS-configuratiebestand op in SPARK_CLASSPATH in entrypoint.sh.

Tweede gebruiksscenario: Apache Livy

Wanneer een taak wordt ontwikkeld en het resultaat moet worden getest, rijst bovendien de vraag of deze moet worden gelanceerd als onderdeel van het CI/CD-proces en hoe de status van de uitvoering ervan kan worden gevolgd. Natuurlijk kunt u het uitvoeren met behulp van een lokale spark-submit-aanroep, maar dit bemoeilijkt de CI/CD-infrastructuur omdat hiervoor Spark op de CI-serveragenten/runners moet worden geïnstalleerd en geconfigureerd en toegang tot de Kubernetes API moet worden ingesteld. In dit geval heeft de doelimplementatie ervoor gekozen Apache Livy te gebruiken als REST API voor het uitvoeren van Spark-taken die worden gehost in een Kubernetes-cluster. Met zijn hulp kun je Spark-taken uitvoeren op een Kubernetes-cluster met behulp van reguliere cURL-verzoeken, die eenvoudig kunnen worden geïmplementeerd op basis van elke CI-oplossing, en de plaatsing ervan binnen het Kubernetes-cluster lost het probleem van authenticatie op bij interactie met de Kubernetes API.

Apache Spark draaien op Kubernetes

Laten we het benadrukken als een tweede gebruiksscenario: het uitvoeren van Spark-taken als onderdeel van een CI/CD-proces op een Kubernetes-cluster in een testlus.

Iets over Apache Livy: het werkt als een HTTP-server die een webinterface en een RESTful API biedt waarmee u op afstand spark-submit kunt starten door de noodzakelijke parameters door te geven. Traditioneel wordt het geleverd als onderdeel van een HDP-distributie, maar het kan ook worden geïmplementeerd op OKD of een andere Kubernetes-installatie met behulp van het juiste manifest en een set Docker-images, zoals deze: github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Voor ons geval is een vergelijkbare Docker-image gebouwd, inclusief Spark-versie 2.4.5 uit het volgende Docker-bestand:

FROM java:8-alpine

ENV SPARK_HOME=/opt/spark
ENV LIVY_HOME=/opt/livy
ENV HADOOP_CONF_DIR=/etc/hadoop/conf
ENV SPARK_USER=spark

WORKDIR /opt

RUN apk add --update openssl wget bash && 
    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && 
    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && 
    rm spark-2.4.5-bin-hadoop2.7.tgz && 
    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/spark

RUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && 
    unzip apache-livy-0.7.0-incubating-bin.zip && 
    rm apache-livy-0.7.0-incubating-bin.zip && 
    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && 
    mkdir /var/log/livy && 
    ln -s /var/log/livy /opt/livy/logs && 
    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.properties

ADD livy.conf /opt/livy/conf
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ADD entrypoint.sh /entrypoint.sh

ENV PATH="/opt/livy/bin:${PATH}"

EXPOSE 8998

ENTRYPOINT ["/entrypoint.sh"]
CMD ["livy-server"]

De gegenereerde image kan worden gebouwd en geüpload naar uw bestaande Docker-repository, zoals de interne OKD-repository. Om het te implementeren, gebruikt u het volgende manifest ({registry-url} - URL van het Docker-imageregister, {image-name} - Docker-imagenaam, {tag} - Docker-imagetag, {livy-url} - gewenste URL waar de server zal toegankelijk zijn Livy; het ‘Route’-manifest wordt gebruikt als Red Hat OpenShift wordt gebruikt als de Kubernetes-distributie, anders wordt het overeenkomstige Ingress- of Service-manifest van het type NodePort gebruikt):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: livy
  name: livy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      component: livy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        component: livy
    spec:
      containers:
        - command:
            - livy-server
          env:
            - name: K8S_API_HOST
              value: localhost
            - name: SPARK_KUBERNETES_IMAGE
              value: 'gnut3ll4/spark:v1.0.14'
          image: '{registry-url}/{image-name}:{tag}'
          imagePullPolicy: Always
          name: livy
          ports:
            - containerPort: 8998
              name: livy-rest
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /var/log/livy
              name: livy-log
            - mountPath: /opt/.livy-sessions/
              name: livy-sessions
            - mountPath: /opt/livy/conf/livy.conf
              name: livy-config
              subPath: livy.conf
            - mountPath: /opt/spark/conf/spark-defaults.conf
              name: spark-config
              subPath: spark-defaults.conf
        - command:
            - /usr/local/bin/kubectl
            - proxy
            - '--port'
            - '8443'
          image: 'gnut3ll4/kubectl-sidecar:latest'
          imagePullPolicy: Always
          name: kubectl
          ports:
            - containerPort: 8443
              name: k8s-api
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: spark
      serviceAccountName: spark
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: livy-log
        - emptyDir: {}
          name: livy-sessions
        - configMap:
            defaultMode: 420
            items:
              - key: livy.conf
                path: livy.conf
            name: livy-config
          name: livy-config
        - configMap:
            defaultMode: 420
            items:
              - key: spark-defaults.conf
                path: spark-defaults.conf
            name: livy-config
          name: spark-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: livy-config
data:
  livy.conf: |-
    livy.spark.deploy-mode=cluster
    livy.file.local-dir-whitelist=/opt/.livy-sessions/
    livy.spark.master=k8s://http://localhost:8443
    livy.server.session.state-retain.sec = 8h
  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: livy
  name: livy
spec:
  ports:
    - name: livy-rest
      port: 8998
      protocol: TCP
      targetPort: 8998
  selector:
    component: livy
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  labels:
    app: livy
  name: livy
spec:
  host: {livy-url}
  port:
    targetPort: livy-rest
  to:
    kind: Service
    name: livy
    weight: 100
  wildcardPolicy: None

Nadat u deze hebt toegepast en de pod succesvol hebt gestart, is de grafische interface van Livy beschikbaar via de link: http://{livy-url}/ui. Met Livy kunnen we onze Spark-taak publiceren met behulp van een REST-verzoek van bijvoorbeeld Postman. Een voorbeeld van een verzameling met verzoeken wordt hieronder weergegeven (configuratieargumenten met variabelen die nodig zijn voor de werking van de gestarte taak kunnen worden doorgegeven in de array "args"):

{
    "info": {
        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",
        "name": "Spark Livy",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "1 Submit job with jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar", nt"className": "org.apache.spark.examples.SparkPi",nt"numExecutors":1,nt"name": "spark-test-1",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt}n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        },
        {
            "name": "2 Submit job without jar",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{nt"file": "hdfs://{host}:{port}/{path-to-file-on-hdfs}", nt"className": "{class-name}",nt"numExecutors":1,nt"name": "spark-test-2",nt"proxyUser": "0",nt"conf": {ntt"spark.jars.ivy": "/tmp/.ivy",ntt"spark.kubernetes.authenticate.driver.serviceAccountName": "spark",ntt"spark.kubernetes.namespace": "{project}",ntt"spark.kubernetes.container.image": "{docker-registry-url}/{repo}/{image-name}:{tag}"nt},nt"args": [ntt"HADOOP_CONF_DIR=/opt/spark/hadoop-conf",ntt"MASTER=k8s://https://kubernetes.default.svc:8443"nt]n}"
                },
                "url": {
                    "raw": "http://{livy-url}/batches",
                    "protocol": "http",
                    "host": [
                        "{livy-url}"
                    ],
                    "path": [
                        "batches"
                    ]
                }
            },
            "response": []
        }
    ],
    "event": [
        {
            "listen": "prerequest",
            "script": {
                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        },
        {
            "listen": "test",
            "script": {
                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",
                "type": "text/javascript",
                "exec": [
                    ""
                ]
            }
        }
    ],
    "protocolProfileBehavior": {}
}

Laten we het eerste verzoek uit de verzameling uitvoeren, naar de OKD-interface gaan en controleren of de taak succesvol is gestart - https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. Tegelijkertijd verschijnt er een sessie in de Livy-interface (http://{livy-url}/ui), waarin u met behulp van de Livy API of grafische interface de voortgang van de taak kunt volgen en de sessie kunt bestuderen logboeken.

Laten we nu laten zien hoe Livy werkt. Laten we hiervoor de logboeken van de Livy-container in de pod met de Livy-server bekijken: https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name }?tab=logboeken. Hieruit kunnen we zien dat bij het aanroepen van de Livy REST API in een container met de naam “livy”, een spark-submit wordt uitgevoerd, vergelijkbaar met degene die we hierboven hebben gebruikt (hier is {livy-pod-name} de naam van de gemaakte pod met de Livy-server). De verzameling introduceert ook een tweede query waarmee u taken kunt uitvoeren die op afstand een uitvoerbaar Spark-bestand hosten met behulp van een Livy-server.

Derde gebruiksscenario: Spark Operator

Nu de taak is getest, rijst de vraag of deze regelmatig moet worden uitgevoerd. De native manier om regelmatig taken uit te voeren in een Kubernetes-cluster is de CronJob-entiteit en die kun je gebruiken, maar op dit moment is het gebruik van operators om applicaties in Kubernetes te beheren erg populair en voor Spark is er een redelijk volwassen operator, die ook gebruikt in oplossingen op ondernemingsniveau (bijvoorbeeld Lightbend FastData Platform). We raden aan om het te gebruiken - de huidige stabiele versie van Spark (2.4.5) heeft vrij beperkte configuratie-opties voor het uitvoeren van Spark-taken in Kubernetes, terwijl de volgende hoofdversie (3.0.0) volledige ondersteuning voor Kubernetes verklaart, maar de releasedatum blijft onbekend . Spark Operator compenseert deze tekortkoming door belangrijke configuratieopties toe te voegen (bijvoorbeeld het koppelen van een ConfigMap met Hadoop-toegangsconfiguratie aan Spark-pods) en de mogelijkheid om een ​​regelmatig geplande taak uit te voeren.

Apache Spark draaien op Kubernetes
Laten we het benadrukken als een derde gebruiksscenario: het regelmatig uitvoeren van Spark-taken op een Kubernetes-cluster in een productielus.

Spark Operator is open source en ontwikkeld binnen het Google Cloud Platform - github.com/GoogleCloudPlatform/spark-on-k8s-operator. De installatie ervan kan op 3 manieren worden gedaan:

  1. Als onderdeel van de Lightbend FastData Platform/Cloudflow-installatie;
  2. Helm gebruiken:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
    helm install incubator/sparkoperator --namespace spark-operator
    	

  3. Met behulp van manifesten uit de officiële repository (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). Het is de moeite waard om het volgende op te merken: Cloudflow bevat een operator met API-versie v1beta1. Als dit type installatie wordt gebruikt, moeten de manifestbeschrijvingen van de Spark-toepassing gebaseerd zijn op voorbeeldtags in Git met de juiste API-versie, bijvoorbeeld 'v1beta1-0.9.0-2.4.0'. De versie van de operator kunt u vinden in de beschrijving van de CRD die is opgenomen in de operator in het woordenboek “versies”:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    	

Als de operator correct is geïnstalleerd, verschijnt er een actieve pod met de Spark-operator in het bijbehorende project (bijvoorbeeld cloudflow-fdp-sparkoperator in de Cloudflow-ruimte voor de Cloudflow-installatie) en verschijnt er een bijbehorend Kubernetes-brontype met de naam 'sparkapplications'. . U kunt beschikbare Spark-toepassingen verkennen met de volgende opdracht:

oc get sparkapplications -n {project}

Om taken uit te voeren met Spark Operator moet je 3 dingen doen:

  • maak een Docker-image die alle benodigde bibliotheken bevat, evenals configuratie- en uitvoerbare bestanden. In de doelafbeelding is dit een afbeelding die is gemaakt in de CI/CD-fase en is getest op een testcluster;
  • publiceer een Docker-image naar een register dat toegankelijk is vanuit het Kubernetes-cluster;
  • genereer een manifest met het type “SparkApplication” en een beschrijving van de te starten taak. Voorbeeldmanifesten zijn beschikbaar in de officiële repository (bijv. github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Er zijn belangrijke opmerkingen over het manifest:
    1. het woordenboek “apiVersion” moet de API-versie aangeven die overeenkomt met de operatorversie;
    2. het woordenboek “metadata.namespace” moet de naamruimte aangeven waarin de applicatie zal worden gestart;
    3. het woordenboek “spec.image” moet het adres van de gemaakte Docker-image in een toegankelijk register bevatten;
    4. het woordenboek “spec.mainClass” moet de Spark-taakklasse bevatten die moet worden uitgevoerd wanneer het proces start;
    5. het woordenboek “spec.mainApplicationFile” moet het pad naar het uitvoerbare jar-bestand bevatten;
    6. het woordenboek “spec.sparkVersion” moet de versie van Spark aangeven die wordt gebruikt;
    7. het woordenboek “spec.driver.serviceAccount” moet het serviceaccount specificeren binnen de overeenkomstige Kubernetes-naamruimte die zal worden gebruikt om de applicatie uit te voeren;
    8. het woordenboek “spec.executor” moet het aantal bronnen aangeven dat aan de applicatie is toegewezen;
    9. het woordenboek "spec.volumeMounts" moet de lokale map opgeven waarin de lokale Spark-taakbestanden worden gemaakt.

Een voorbeeld van het genereren van een manifest (hier is {spark-service-account} een serviceaccount binnen het Kubernetes-cluster voor het uitvoeren van Spark-taken):

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: {spark-service-account}
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Dit manifest specificeert een serviceaccount waarvoor u, voordat u het manifest publiceert, de benodigde rolbindingen moet maken die de benodigde toegangsrechten bieden voor de Spark-toepassing om te communiceren met de Kubernetes API (indien nodig). In ons geval heeft de applicatie rechten nodig om Pods te maken. Laten we de noodzakelijke rolbinding creëren:

oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Het is ook vermeldenswaard dat deze manifestspecificatie een parameter "hadoopConfigMap" kan bevatten, waarmee u een ConfigMap met de Hadoop-configuratie kunt opgeven zonder dat u eerst het overeenkomstige bestand in de Docker-image hoeft te plaatsen. Het is ook geschikt voor het regelmatig uitvoeren van taken - met behulp van de parameter “schedule” kan een schema voor het uitvoeren van een bepaalde taak worden gespecificeerd.

Daarna slaan we ons manifest op in het spark-pi.yaml-bestand en passen het toe op ons Kubernetes-cluster:

oc apply -f spark-pi.yaml

Hiermee wordt een object van het type Sparkapplications gemaakt:

oc get sparkapplications -n {project}
> NAME       AGE
> spark-pi   22h

In dit geval wordt er een pod met een applicatie aangemaakt, waarvan de status wordt weergegeven in de aangemaakte “sparkapplications”. Je kunt het bekijken met het volgende commando:

oc get sparkapplications spark-pi -o yaml -n {project}

Na voltooiing van de taak gaat de POD naar de status 'Voltooid', die ook wordt bijgewerkt in 'sparkapplications'. Toepassingslogboeken kunnen worden bekeken in de browser of met behulp van de volgende opdracht (hier is {sparkapplications-pod-name} de naam van de pod van de actieve taak):

oc logs {sparkapplications-pod-name} -n {project}

Spark-taken kunnen ook worden beheerd met behulp van het gespecialiseerde sparkctl-hulpprogramma. Om het te installeren, kloont u de repository met de broncode, installeert u Go en bouwt u dit hulpprogramma:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/
wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz
tar -xzf go1.13.3.linux-amd64.tar.gz
sudo mv go /usr/local
mkdir $HOME/Projects
export GOROOT=/usr/local/go
export GOPATH=$HOME/Projects
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
go -version
cd sparkctl
go build -o sparkctl
sudo mv sparkctl /usr/local/bin

Laten we de lijst met actieve Spark-taken bekijken:

sparkctl list -n {project}

Laten we een beschrijving maken voor de Spark-taak:

vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: {project}
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Laten we de beschreven taak uitvoeren met sparkctl:

sparkctl create spark-app.yaml -n {project}

Laten we de lijst met actieve Spark-taken bekijken:

sparkctl list -n {project}

Laten we de lijst met gebeurtenissen van een gelanceerde Spark-taak bekijken:

sparkctl event spark-pi -n {project} -f

Laten we de status van de actieve Spark-taak bekijken:

sparkctl status spark-pi -n {project}

Concluderend wil ik graag stilstaan ​​bij de ontdekte nadelen van het gebruik van de huidige stabiele versie van Spark (2.4.5) in Kubernetes:

  1. Het eerste en misschien wel belangrijkste nadeel is het gebrek aan Data Locality. Ondanks alle tekortkomingen van YARN waren er ook voordelen aan het gebruik ervan, bijvoorbeeld het principe van het leveren van code aan data (in plaats van data aan code). Dankzij dit werden Spark-taken uitgevoerd op de knooppunten waar de gegevens die betrokken waren bij de berekeningen zich bevonden, en werd de tijd die nodig was om gegevens via het netwerk te leveren aanzienlijk verkort. Bij het gebruik van Kubernetes worden we geconfronteerd met de noodzaak om gegevens die bij een taak betrokken zijn, over het netwerk te verplaatsen. Als ze groot genoeg zijn, kan de uitvoeringstijd van de taak aanzienlijk toenemen en is er ook een vrij grote hoeveelheid schijfruimte nodig die is toegewezen aan Spark-taakinstanties voor hun tijdelijke opslag. Dit nadeel kan worden ondervangen door gebruik te maken van gespecialiseerde software die de datalocatie in Kubernetes garandeert (bijvoorbeeld Alluxio), maar dit betekent feitelijk de noodzaak om een ​​volledige kopie van de data op te slaan op de knooppunten van het Kubernetes-cluster.
  2. Het tweede belangrijke nadeel is de veiligheid. Standaard zijn beveiligingsgerelateerde functies met betrekking tot het uitvoeren van Spark-taken uitgeschakeld, wordt het gebruik van Kerberos niet behandeld in de officiële documentatie (hoewel de overeenkomstige opties zijn geïntroduceerd in versie 3.0.0, wat extra werk vereist) en de beveiligingsdocumentatie voor met behulp van Spark (https://spark.apache.org/docs/2.4.5/security.html) verschijnen alleen YARN, Mesos en Standalone Cluster als sleutelwinkels. Tegelijkertijd kan de gebruiker onder wie Spark-taken worden gestart niet rechtstreeks worden gespecificeerd: we specificeren alleen het serviceaccount waaronder het zal werken, en de gebruiker wordt geselecteerd op basis van het geconfigureerde beveiligingsbeleid. In dit opzicht wordt ofwel de rootgebruiker gebruikt, wat niet veilig is in een productieve omgeving, ofwel een gebruiker met een willekeurige UID, wat lastig is bij het verdelen van toegangsrechten tot gegevens (dit kan worden opgelost door PodSecurityPolicies te maken en deze te koppelen aan de overeenkomstige serviceaccounts). Momenteel is de oplossing om alle benodigde bestanden rechtstreeks in de Docker-image te plaatsen, of om het Spark-startscript aan te passen om het mechanisme voor het opslaan en ophalen van geheimen te gebruiken dat in uw organisatie is aangenomen.
  3. Het uitvoeren van Spark-taken met Kubernetes bevindt zich officieel nog in de experimentele modus en er kunnen in de toekomst aanzienlijke veranderingen optreden in de gebruikte artefacten (configuratiebestanden, Docker-basisimages en startscripts). En inderdaad, bij het voorbereiden van het materiaal, versies 2.3.0 en 2.4.5 werden getest, was het gedrag aanzienlijk anders.

Laten we wachten op updates - onlangs is een nieuwe versie van Spark (3.0.0) uitgebracht, die aanzienlijke veranderingen in het werk van Spark op Kubernetes met zich meebracht, maar de experimentele status van ondersteuning voor deze resourcemanager behield. Misschien zullen de volgende updates het echt mogelijk maken om YARN volledig te verlaten en Spark-taken op Kubernetes uit te voeren zonder angst voor de veiligheid van uw systeem en zonder de noodzaak om zelfstandig functionele componenten te wijzigen.

Einde

Bron: www.habr.com

Voeg een reactie