Cómo crear tu propio escalador automático para un clúster

¡Hola! Formamos personas para trabajar con big data. Es imposible imaginar un programa educativo sobre big data sin un clúster propio en el que todos los participantes trabajen juntos. Por esta razón, nuestro programa siempre lo tiene 🙂 Nos dedicamos a su configuración, ajuste y administración, y los muchachos inician directamente los trabajos de MapReduce allí y usan Spark.

En esta publicación, le contaremos cómo resolvimos el problema de la carga desigual del clúster escribiendo nuestro propio escalador automático usando la nube. Soluciones en la nube Mail.ru.

problema

Nuestro clúster no se utiliza en un modo típico. La eliminación es muy desigual. Por ejemplo, hay clases prácticas, cuando las 30 personas y un profesor van al cluster y empiezan a utilizarlo. O también, hay días antes de la fecha límite en los que la carga aumenta mucho. El resto del tiempo el clúster funciona en modo de subcarga.

La solución número 1 es mantener un clúster que soporte cargas máximas, pero que esté inactivo el resto del tiempo.

La solución n.º 2 es mantener un clúster pequeño, al que se agregan nodos manualmente antes de las clases y durante las cargas máximas.

La solución n.° 3 es mantener un clúster pequeño y escribir un escalador automático que monitoreará la carga actual del clúster y, utilizando varias API, agregará y eliminará nodos del clúster.

En esta publicación hablaremos sobre la solución n.° 3. Este escalador automático depende en gran medida de factores externos más que internos, y los proveedores a menudo no lo proporcionan. Usamos la infraestructura en la nube de Mail.ru Cloud Solutions y escribimos un escalador automático usando la API MCS. Y como enseñamos cómo trabajar con datos, decidimos mostrarle cómo puede escribir un escalador automático similar para sus propios fines y usarlo con su nube.

Requisitos previos

Primero, debes tener un clúster de Hadoop. Por ejemplo, utilizamos la distribución HDP.

Para que sus nodos se agreguen y eliminen rápidamente, debe tener una cierta distribución de roles entre los nodos.

  1. Nodo maestro. Bueno, no es necesario explicar nada en particular: el nodo principal del clúster, en el que, por ejemplo, se inicia el controlador Spark, si se utiliza el modo interactivo.
  2. Nodo de fecha. Este es el nodo en el que almacena datos en HDFS y donde se realizan los cálculos.
  3. Nodo informático. Este es un nodo donde no se almacena nada en HDFS, pero donde se realizan los cálculos.

Punto importante. El escalado automático se producirá debido a los nodos del tercer tipo. Si comienza a tomar y agregar nodos del segundo tipo, la velocidad de respuesta será muy baja: desmantelar y volver a comprometer su clúster llevará horas. Esto, por supuesto, no es lo que se espera del ajuste de escala automático. Es decir, no tocamos nodos del primer y segundo tipo. Representarán un grupo mínimo viable que existirá durante toda la duración del programa.

Entonces, nuestro escalador automático está escrito en Python 3, usa la API de Ambari para administrar los servicios del clúster, usa API de soluciones en la nube Mail.ru (MCS) para arrancar y parar máquinas.

Arquitectura de soluciones

  1. Módulo autoscaler.py. Contiene tres clases: 1) funciones para trabajar con Ambari, 2) funciones para trabajar con MCS, 3) funciones relacionadas directamente con la lógica del escalador automático.
  2. Guión observer.py. Básicamente, consta de diferentes reglas: cuándo y en qué momentos llamar a las funciones del escalador automático.
  3. Archivo de configuración config.py. Contiene, por ejemplo, una lista de nodos permitidos para el escalado automático y otros parámetros que afectan, por ejemplo, el tiempo de espera desde el momento en que se agrega un nuevo nodo. También hay marcas de tiempo para el inicio de las clases, de modo que antes de la clase se lanza la configuración de clúster máxima permitida.

Veamos ahora los fragmentos de código dentro de los dos primeros archivos.

1. Módulo Autoscaler.py

clase ambari

Así es como se ve un fragmento de código que contiene una clase Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Arriba, como ejemplo, puedes ver la implementación de la función. stop_all_services, que detiene todos los servicios en el nodo del clúster deseado.

A la entrada de la clase Ambari pasas:

  • ambari_url, por ejemplo, como 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – el nombre de su grupo en Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • y dentro auth aquí está su nombre de usuario y contraseña para Ambari: auth = ('login', 'password').

La función en sí no es más que un par de llamadas a través de la API REST a Ambari. Desde un punto de vista lógico, primero recibimos una lista de servicios en ejecución en un nodo y luego solicitamos en un clúster determinado, en un nodo determinado, que transfiera servicios de la lista al estado. INSTALLED. Funciones para iniciar todos los servicios, para transferir nodos al estado. Maintenance etc. se ven similares: son solo algunas solicitudes a través de la API.

Mc de clase

Así es como se ve un fragmento de código que contiene una clase Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

A la entrada de la clase Mcs Pasamos el id del proyecto dentro de la nube y el id del usuario, así como su contraseña. En función vm_turn_on Queremos encender una de las máquinas. La lógica aquí es un poco más complicada. Al comienzo del código, se llaman otras tres funciones: 1) necesitamos obtener un token, 2) necesitamos convertir el nombre de host en el nombre de la máquina en MCS, 3) obtener la identificación de esta máquina. A continuación, simplemente realizamos una solicitud de publicación e iniciamos esta máquina.

Así es como se ve la función para obtener un token:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Clase de escalador automático

Esta clase contiene funciones relacionadas con la propia lógica operativa.

Así es como se ve un fragmento de código para esta clase:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Aceptamos clases para el ingreso. Ambari и Mcs, una lista de nodos que pueden escalarse, así como los parámetros de configuración del nodo: memoria y CPU asignadas al nodo en YARN. También hay 2 parámetros internos q_ram, q_cpu, que son colas. Utilizándolos, almacenamos los valores de la carga actual del clúster. Si vemos que durante los últimos 5 minutos ha habido un aumento constante de la carga, entonces decidimos que necesitamos agregar +1 nodo al clúster. Lo mismo ocurre con el estado de subutilización del clúster.

El código anterior es un ejemplo de una función que elimina una máquina del clúster y la detiene en la nube. Primero hay un desmantelamiento. YARN Nodemanager, luego el modo se enciende Maintenance, luego detenemos todos los servicios en la máquina y apagamos la máquina virtual en la nube.

2. Script observador.py

Código de muestra desde allí:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

En él, verificamos si se han creado las condiciones para aumentar la capacidad del clúster y si hay máquinas en reserva, obtenemos el nombre de host de una de ellas, la agregamos al clúster y publicamos un mensaje al respecto en el Slack de nuestro equipo. Después de lo cual comienza cooldown_period, cuando no agregamos ni eliminamos nada del clúster, sino que simplemente monitoreamos la carga. Si se ha estabilizado y se encuentra dentro del corredor de valores de carga óptimos, simplemente continuamos monitoreando. Si un nodo no fuera suficiente, agregamos otro.

Para los casos en los que tenemos una lección por delante, ya sabemos con certeza que un nodo no será suficiente, por lo que iniciamos inmediatamente todos los nodos libres y los mantenemos activos hasta el final de la lección. Esto sucede usando una lista de marcas de tiempo de actividad.

Conclusión

Autoscaler es una solución buena y conveniente para aquellos casos en los que experimenta una carga desigual del clúster. Al mismo tiempo, logra la configuración de clúster deseada para cargas máximas y al mismo tiempo no mantiene este clúster durante la carga insuficiente, lo que ahorra dinero. Bueno, además, todo esto sucede automáticamente sin tu participación. El escalador automático en sí no es más que un conjunto de solicitudes a la API del administrador del clúster y a la API del proveedor de la nube, escritas de acuerdo con una lógica determinada. Lo que definitivamente debes recordar es la división de los nodos en 3 tipos, como escribimos anteriormente. Y serás feliz.

Fuente: habr.com

Añadir un comentario