Kubernetes Operator in Python without frameworks and SDKs

Go currently has a de facto monopoly among the programming languages people choose for writing Kubernetes operators. There are objective reasons for this:

  1. There is a powerful framework for developing operators in Go: Operator SDK.
  2. Game-changing applications such as Docker and Kubernetes are written in Go. Writing your operator in Go means speaking the same language as the ecosystem.
  3. Go applications are fast, and the language offers simple concurrency tools out of the box.

NB: By the way, we have already described how to write your own operator in Go in one of our translations.

But what if a lack of time or, to put it bluntly, motivation keeps you from learning Go? This article shows how you can write a good operator in one of the most popular languages, one that almost every DevOps engineer knows: Python.

Meet Copyrator, the copy operator!

As an example, consider developing a simple operator that copies a ConfigMap either when a new namespace appears or when one of two entities changes: a ConfigMap or a Secret. In practice, such an operator can be useful for bulk updates of application configuration (by updating a ConfigMap) or for updating secrets, for example keys for working with a Docker Registry (when a Secret is added to a namespace).

So, what should a good operator have?

  1. Interaction with the operator goes through Custom Resource Definitions (hereinafter CRD).
  2. The operator is configurable. For this we will use command line flags and environment variables.
  3. The Docker image build and the Helm chart are prepared so that users can install the operator in their Kubernetes cluster easily (literally with one command).

CRD

For the operator to know which resources to look for and where, we need to give it a rule. Each rule will be represented by a single CRD object. What fields should this CRD have?

  1. The resource type to look for (ConfigMap or Secret).
  2. The namespace that should contain the resources.
  3. The selector used to find the resources in the namespace.

Let's describe the CRD:

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: copyrators.flant.com
spec:
  group: flant.com
  versions:
  - name: v1
    served: true
    storage: true
  scope: Namespaced
  names:
    plural: copyrators
    singular: copyrator
    kind: CopyratorRule
    shortNames:
    - copyr
  validation:
    openAPIV3Schema:
      type: object
      properties:
        ruleType:
          type: string
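        # A single namespace, as in the rule example below
        namespace:
          type: string
        # A map of labels, e.g. copyrator: "true"
        selector:
          type: object
          additionalProperties:
            type: string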

Let's immediately create a simple rule: look in the namespace named default for all ConfigMaps with labels of the form copyrator: "true":

apiVersion: flant.com/v1
kind: CopyratorRule
metadata:
  name: main-rule
  labels:
    module: copyrator
ruleType: configmap
selector:
  copyrator: "true"
namespace: default

Done! Now we need to get the information about our rule somehow. Let me note right away that we will not write requests to the cluster API server by hand. For that, we will use the ready-made Python library kubernetes-client:

import kubernetes


CRD_GROUP = 'flant.com'
CRD_VERSION = 'v1'
CRD_PLURAL = 'copyrators'


def load_crd(namespace, name):
    client = kubernetes.client.ApiClient()
    custom_api = kubernetes.client.CustomObjectsApi(client)

    try:
        crd = custom_api.get_namespaced_custom_object(
            CRD_GROUP,
            CRD_VERSION,
            namespace,
            CRD_PLURAL,
            name,
        )
    except kubernetes.client.rest.ApiException:
        # The rule is missing or cannot be read: the caller gets None
        return None
    # Keep only the fields the operator needs
    return {x: crd[x] for x in ('ruleType', 'selector', 'namespace')}

Running this code gives us the following:

{'ruleType': 'configmap', 'selector': {'copyrator': 'true'}, 'namespace': 'default'}

Great: we managed to get the rule for the operator. And most importantly, we did it the so-called Kubernetes way.
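Since the rule arrives as a plain dictionary, it may be worth checking it before the operator starts watching anything. Below is a minimal sketch of such a check. The helper validate_rule and the tuple SUPPORTED_RULE_TYPES are hypothetical names that are not part of the original project; the field names follow the rule example above (ruleType, selector, and a single namespace).

```python
SUPPORTED_RULE_TYPES = ('configmap', 'secret')


def validate_rule(rule):
    """Return a list of problems found in a rule dict (empty if it is valid).

    Hypothetical helper: the field names follow the CopyratorRule example,
    but the checks themselves are an assumption, not the project's code.
    """
    problems = []
    if rule.get('ruleType') not in SUPPORTED_RULE_TYPES:
        problems.append('ruleType must be one of %s' % (SUPPORTED_RULE_TYPES,))
    selector = rule.get('selector')
    if not isinstance(selector, dict) or not selector:
        problems.append('selector must be a non-empty mapping of labels')
    namespace = rule.get('namespace')
    if not isinstance(namespace, str) or not namespace:
        problems.append('namespace must be a non-empty string')
    return problems


rule = {
    'ruleType': 'configmap',
    'selector': {'copyrator': 'true'},
    'namespace': 'default',
}
print(validate_rule(rule))  # []
```

Returning a list of problems instead of raising on the first one makes it possible to log everything that is wrong with a rule in one go.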

Environment variables or flags? We'll take both!

Let's move on to the main configuration of the operator. There are two basic approaches to configuring applications:

  1. use command line options;
  2. use environment variables.

Command line options let you read settings more flexibly, with support for data types and validation. Python's standard library has the argparse module, which we will use. Details and examples of its capabilities are available in the official documentation.

Here is what setting up command line flags looks like in our case:

from argparse import ArgumentParser
from os import getenv

parser = ArgumentParser(
    description='Copyrator - copy operator.',
    prog='copyrator'
)
# Each flag falls back to an environment variable, then to a fixed default
parser.add_argument(
    '--namespace',
    type=str,
    default=getenv('NAMESPACE', 'default'),
    help='Operator Namespace'
)
parser.add_argument(
    '--rule-name',
    type=str,
    default=getenv('RULE_NAME', 'main-rule'),
    help='CRD Name'
)
args = parser.parse_args()

On the other hand, environment variables in Kubernetes make it easy to pass service information about a pod into its container. For example, we can get the namespace in which the pod is running with the following construct:

env:
- name: NAMESPACE
  valueFrom:
    fieldRef:
      fieldPath: metadata.namespace
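Combining the two approaches gives a simple order of precedence: a command line flag overrides the environment variable, which in turn overrides the built-in default. The sketch below illustrates this for the --namespace flag. The function build_parser and the idea of passing the environment in as an ordinary mapping are assumptions made here so that the example can run without touching the real process environment.

```python
from argparse import ArgumentParser


def build_parser(environ):
    """Build the parser; `environ` is any mapping of environment variables.

    Hypothetical helper: in the operator itself os.environ would be passed.
    """
    parser = ArgumentParser(prog='copyrator')
    parser.add_argument(
        '--namespace',
        default=environ.get('NAMESPACE', 'default'),
    )
    return parser


# 1. Nothing is set: the hard-coded default is used.
print(build_parser({}).parse_args([]).namespace)  # default

# 2. Only the environment variable is set (e.g. via fieldRef in the pod spec).
env = {'NAMESPACE': 'kube-system'}
print(build_parser(env).parse_args([]).namespace)  # kube-system

# 3. Both are set: the command line flag wins.
args = build_parser(env).parse_args(['--namespace', 'staging'])
print(args.namespace)  # staging
```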

Operator logic

To separate the methods for working with ConfigMap and Secret, we will use special maps. They tell us which methods we need to watch objects and to create them:

LIST_TYPES_MAP = {
    'configmap': 'list_namespaced_config_map',
    'secret': 'list_namespaced_secret',
}

CREATE_TYPES_MAP = {
    'configmap': 'create_namespaced_config_map',
    'secret': 'create_namespaced_secret',
}
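The maps store method names as strings, so the actual method is resolved at run time with getattr. Here is a minimal offline sketch of that lookup. FakeCoreV1Api is a stand-in written for this example (it is not the real kubernetes.client.CoreV1Api), and list_method_for is a hypothetical helper that turns a typo in ruleType into a readable error.

```python
LIST_TYPES_MAP = {
    'configmap': 'list_namespaced_config_map',
    'secret': 'list_namespaced_secret',
}


class FakeCoreV1Api:
    """Stand-in for kubernetes.client.CoreV1Api so the example runs offline."""

    def list_namespaced_config_map(self, namespace):
        return 'configmaps in %s' % namespace

    def list_namespaced_secret(self, namespace):
        return 'secrets in %s' % namespace


def list_method_for(api, rule_type):
    """Resolve the bound list method for a rule type, failing early on typos."""
    try:
        method_name = LIST_TYPES_MAP[rule_type]
    except KeyError:
        raise ValueError('Unsupported ruleType: %r' % rule_type)
    return getattr(api, method_name)


api = FakeCoreV1Api()
print(list_method_for(api, 'secret')('default'))  # secrets in default
```

Adding support for another resource type then comes down to adding one entry to each map.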

Next, we need to receive events from the API server. Let's implement it like this:

from functools import partial

import kubernetes


def handle(specs):
    kubernetes.config.load_incluster_config()
    v1 = kubernetes.client.CoreV1Api()

    # Get the method for watching objects
    method = getattr(v1, LIST_TYPES_MAP[specs['ruleType']])
    func = partial(method, specs['namespace'])

    w = kubernetes.watch.Watch()
    for event in w.stream(func, _request_timeout=60):
        handle_event(v1, specs, event)
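A watch is a long-lived HTTP request, so sooner or later the stream ends or fails (a timeout, a dropped connection). One possible way to keep the operator running is to re-open the stream in a loop. The sketch below is an assumption about how this could be done, not the project's code: watch_forever, open_stream and max_restarts are hypothetical names, and a fake stream replaces the real API server so that the example runs offline.

```python
import time


def watch_forever(open_stream, on_event, max_restarts=None, delay=0.0):
    """Re-open a watch stream every time it ends or fails.

    open_stream: callable returning an iterable of events (hypothetical; in
    the operator it could wrap `w.stream(func, _request_timeout=60)`).
    max_restarts: stop after this many restarts (None means never stop).
    """
    restarts = 0
    while True:
        try:
            for event in open_stream():
                on_event(event)
        except Exception as exc:  # broad on purpose: this is only a sketch
            print('watch failed: %s' % exc)
        if max_restarts is not None and restarts >= max_restarts:
            return
        restarts += 1
        time.sleep(delay)


attempts = []


def fake_stream():
    """Fail on the first call, deliver two events on the second one."""
    attempts.append(1)
    if len(attempts) == 1:
        raise TimeoutError('read timed out')
    return iter([{'type': 'ADDED'}, {'type': 'MODIFIED'}])


seen = []
watch_forever(fake_stream, seen.append, max_restarts=1)
print([event['type'] for event in seen])  # ['ADDED', 'MODIFIED']
```

Keep in mind that a watch restarted without a resource version typically reports the existing objects again as ADDED events, so the event handler has to tolerate seeing the same object more than once.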

After receiving the event, we proceed to the main logic of its processing:

from operator import methodcaller

# Event types we react to (the API server reports changes as MODIFIED)
ALLOWED_EVENT_TYPES = {'ADDED', 'MODIFIED'}


def handle_event(v1, specs, event):
    if event['type'] not in ALLOWED_EVENT_TYPES:
        return

    # 'raw_object' is the plain dict; 'object' holds the deserialized model
    object_ = event['raw_object']
    labels = object_['metadata'].get('labels', {})

    # Check that the labels match the selector
    for key, value in specs['selector'].items():
        if labels.get(key) != value:
            return
    # Get the active namespaces
    namespaces = map(
        lambda x: x.metadata.name,
        filter(
            lambda x: x.status.phase == 'Active',
            v1.list_namespace().items
        )
    )
    for namespace in namespaces:
        # Clean up the metadata and set the namespace
        object_['metadata'] = {
            'labels': labels,
            'namespace': namespace,
            'name': object_['metadata']['name'],
        }
        # Call the method that creates the object
        methodcaller(
            CREATE_TYPES_MAP[specs['ruleType']],
            namespace,
            object_
        )(v1)
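The create_* methods fail with HTTP 409 (Conflict) when an object with the same name already exists in the target namespace, which is what happens on every update of an already copied object. A minimal sketch of a "create or replace" step is shown below. The helpers matches, clean_copy and create_or_replace are hypothetical and not part of the original project; FakeApi and Conflict imitate the API client so that the example runs without a cluster. The conflict is detected through the status attribute, which the client's ApiException also provides.

```python
def matches(labels, selector):
    """True if every key/value pair of the selector is present in labels."""
    return all(labels.get(k) == v for k, v in selector.items())


def clean_copy(obj, namespace):
    """Build a new object for `namespace` without mutating the watched one."""
    copy = dict(obj)
    copy['metadata'] = {
        'name': obj['metadata']['name'],
        'namespace': namespace,
        'labels': dict(obj['metadata'].get('labels', {})),
    }
    return copy


def create_or_replace(api, kind, namespace, body):
    """Create the object, or replace it if the API answers 409 Conflict.

    `kind` is 'config_map' or 'secret'; both method families exist in
    CoreV1Api. Any other error is re-raised unchanged.
    """
    try:
        return getattr(api, 'create_namespaced_' + kind)(namespace, body)
    except Exception as exc:
        if getattr(exc, 'status', None) != 409:
            raise
    return getattr(api, 'replace_namespaced_' + kind)(
        body['metadata']['name'], namespace, body)


class Conflict(Exception):
    """Imitates an API error carrying an HTTP status code."""
    status = 409


class FakeApi:
    """In-memory stand-in for CoreV1Api (ConfigMap methods only)."""

    def __init__(self):
        self.store = {}

    def create_namespaced_config_map(self, namespace, body):
        key = (namespace, body['metadata']['name'])
        if key in self.store:
            raise Conflict()
        self.store[key] = body

    def replace_namespaced_config_map(self, name, namespace, body):
        self.store[(namespace, name)] = body


source = {
    'metadata': {'name': 'app-config', 'namespace': 'default',
                 'labels': {'copyrator': 'true'}, 'resourceVersion': '42'},
    'data': {'key': 'v1'},
}
api = FakeApi()
if matches(source['metadata']['labels'], {'copyrator': 'true'}):
    # First copy: the object does not exist yet, so it is created
    create_or_replace(api, 'config_map', 'staging', clean_copy(source, 'staging'))
    # The source changes: the second copy hits 409 and falls back to replace
    source['data'] = {'key': 'v2'}
    create_or_replace(api, 'config_map', 'staging', clean_copy(source, 'staging'))
print(api.store[('staging', 'app-config')]['data'])  # {'key': 'v2'}
```

Building a fresh copy for every namespace, instead of rewriting the metadata of the watched object in place, keeps the source object intact for the next iteration.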

The main logic is ready! Now we need to pack all of this into a single Python package. We create the setup.py file and put the project's meta-information in it:

from sys import version_info

from setuptools import find_packages, setup

if version_info[:2] < (3, 5):
    # version_info holds integers, so convert them before joining
    raise RuntimeError(
        'Unsupported python version %s.' % '.'.join(map(str, version_info[:3]))
    )


_NAME = 'copyrator'
setup(
    name=_NAME,
    version='0.0.1',
    packages=find_packages(),
    classifiers=[
        'Development Status :: 3 - Alpha',
        'Programming Language :: Python',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.5',
        'Programming Language :: Python :: 3.6',
        'Programming Language :: Python :: 3.7',
    ],
    author='Flant',
    author_email='[email protected]',
    include_package_data=True,
    install_requires=[
        'kubernetes==9.0.0',
    ],
    entry_points={
        'console_scripts': [
            '{0} = {0}.cli:main'.format(_NAME),
        ]
    }
)

NB: The kubernetes client for Python has its own versioning. For more information on the compatibility of client and Kubernetes versions, see the compatibility matrix.

Now our project looks like this:

copyrator
├── copyrator
│   ├── cli.py # Command line handling logic
│   ├── constant.py # The constants shown above
│   ├── load_crd.py # CRD loading logic
│   └── operator.py # Main operator logic
└── setup.py # Package definition

Docker and Helm

The Dockerfile will be extremely simple: take the base python-alpine image and install our package. We will leave its optimization for better times:

FROM python:3.7.3-alpine3.9

ADD . /app

RUN pip3 install /app

ENTRYPOINT ["copyrator"]

The Deployment for the operator is also very simple:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Chart.Name }}
spec:
  selector:
    matchLabels:
      name: {{ .Chart.Name }}
  template:
    metadata:
      labels:
        name: {{ .Chart.Name }}
    spec:
      containers:
      - name: {{ .Chart.Name }}
        image: privaterepo.yourcompany.com/copyrator:latest
        imagePullPolicy: Always
        args: ["--rule-name", "main-rule"]
        env:
        - name: NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
      serviceAccountName: {{ .Chart.Name }}-acc

Finally, we need to create an appropriate role for the operator with the necessary permissions:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ .Chart.Name }}-acc

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: {{ .Chart.Name }}
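rules:
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["get", "watch", "list"]
  - apiGroups: [""]
    resources: ["secrets", "configmaps"]
    verbs: ["*"]
  # Lets the operator read its own rules (CopyratorRule objects)
  - apiGroups: ["flant.com"]
    resources: ["copyrators"]
    verbs: ["get", "watch", "list"]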
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: {{ .Chart.Name }}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: {{ .Chart.Name }}
subjects:
- kind: ServiceAccount
  name: {{ .Chart.Name }}-acc
  namespace: {{ .Release.Namespace }}

Conclusion

So, without fear, reproach, or learning Go, we were able to build our own Kubernetes operator in Python. Of course, it still has room to grow: in the future it could process several rules, work in several threads, monitor changes to its own CRDs by itself...

To let you get to know the code better, we have put it in a public repository. If you want examples of more serious operators implemented in Python, take a look at two operators for deploying mongodb (first and second).

P.S. And if you are too lazy to deal with Kubernetes events, or you are simply more used to Bash, our colleagues have prepared a ready-made solution in the form of shell-operator (we announced it in April).

Source: habr.com
