Como crear o teu propio escalador automático para un clúster

Ola! Formamos persoas para traballar con big data. É imposible imaxinar un programa educativo sobre big data sen un clúster propio, no que todos os participantes traballen xuntos. Por iso, o noso programa sempre o ten 🙂 Dedicámonos á súa configuración, axuste e administración, e os mozos lanzan directamente traballos de MapReduce alí e usan Spark.

Nesta publicación contarémosche como resolvemos o problema da carga desigual do clúster escribindo o noso propio escalador automático usando a nube Solucións na nube Mail.ru.

problema

O noso clúster non se usa nun modo típico. A eliminación é moi irregular. Por exemplo, hai clases prácticas, cando as 30 persoas e un profesor van ao clúster e comezan a utilizalo. Ou de novo, hai días antes do prazo nos que a carga aumenta moito. O resto do tempo o clúster funciona en modo de subcarga.

A solución número 1 é manter un clúster que resista as cargas punta, pero que estará inactivo o resto do tempo.

A solución n.º 2 é manter un pequeno clúster, ao que engades manualmente nós antes das clases e durante os picos de carga.

A solución n.º 3 é manter un clúster pequeno e escribir un escalador automático que supervisará a carga actual do clúster e, mediante varias API, engadirá e eliminará nodos do clúster.

Neste post falaremos da solución #3. Este autoescalador depende moito de factores externos en lugar de internos, e os provedores moitas veces non o proporcionan. Usamos a infraestrutura de nube de Mail.ru Cloud Solutions e escribimos un escalador automático mediante a API de MCS. E como ensinamos a traballar con datos, decidimos mostrar como podes escribir un autoescalador similar para os teus propios propósitos e usalo coa túa nube.

Requisitos previos

En primeiro lugar, debes ter un clúster Hadoop. Por exemplo, usamos a distribución HDP.

Para que os teus nós se engadan e eliminen rapidamente, debes ter unha determinada distribución de roles entre os nodos.

  1. Nodo mestre. Ben, non hai que explicar nada en particular: o nodo principal do clúster, no que, por exemplo, se inicia o controlador Spark, se usa o modo interactivo.
  2. Nodo de data. Este é o nodo no que almacena os datos en HDFS e onde se realizan os cálculos.
  3. Nodo informático. Este é un nodo no que non se almacena nada en HDFS, pero onde se realizan os cálculos.

Punto importante. O autoescalado producirase debido aos nodos do terceiro tipo. Se comezas a tomar e engadir nodos do segundo tipo, a velocidade de resposta será moi baixa; a desactivación e a reactivación da actividade levará horas no teu clúster. Isto, por suposto, non é o que esperas do autoescalado. É dicir, non tocamos nós do primeiro e segundo tipo. Representarán un clúster mínimo viable que existirá durante toda a duración do programa.

Así, o noso escalador automático está escrito en Python 3, usa a API de Ambari para xestionar os servizos de clúster, usa API de Mail.ru Cloud Solutions (MCS) para arrancar e parar máquinas.

Arquitectura de solución

  1. Módulo autoscaler.py. Contén tres clases: 1) funcións para traballar con Ambari, 2) funcións para traballar con MCS, 3) funcións relacionadas directamente coa lóxica do autoescalador.
  2. Guión observer.py. Esencialmente, consta de diferentes regras: cando e en que momentos chamar ás funcións do autoescalador.
  3. Ficheiro de configuración config.py. Contén, por exemplo, unha lista de nodos permitidos para a escala automática e outros parámetros que afectan, por exemplo, ao tempo que hai que esperar desde o momento en que se engadiu un novo nodo. Tamén hai marcas de tempo para o inicio das clases, de xeito que antes da clase se inicia a configuración máxima permitida do clúster.

Vexamos agora as pezas de código dentro dos dous primeiros ficheiros.

1. Módulo Autoscaler.py

Clase Ambari

Así se ve un fragmento de código que contén unha clase Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Arriba, como exemplo, podes ver a implementación da función stop_all_services, que detén todos os servizos no nodo do clúster desexado.

Na entrada da clase Ambari pasas:

  • ambari_url, por exemplo, como 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - o nome do teu clúster en Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • e dentro auth aquí tes o teu nome de usuario e contrasinal para Ambari: auth = ('login', 'password').

A función en si non é máis que un par de chamadas a través da API REST a Ambari. Desde un punto de vista lóxico, primeiro recibimos unha lista de servizos en execución nun nodo e despois pedimos nun clúster determinado, nun nodo determinado, que transfiren servizos da lista ao estado. INSTALLED. Funcións para lanzar todos os servizos, para transferir nodos ao estado Maintenance etc. parécense: son só algunhas solicitudes a través da API.

Clase Mcs

Así se ve un fragmento de código que contén unha clase Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Na entrada da clase Mcs pasamos o ID do proxecto dentro da nube e o ID de usuario, así como o seu contrasinal. En función vm_turn_on queremos acender unha das máquinas. A lóxica aquí é un pouco máis complicada. Ao comezo do código, chámanse outras tres funcións: 1) necesitamos obter un token, 2) necesitamos converter o nome de host no nome da máquina en MCS, 3) obter o ID desta máquina. A continuación, simplemente realizamos unha solicitude de publicación e iniciamos esta máquina.

Este é o aspecto da función para obter un token:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Clase Autoscaler

Esta clase contén funcións relacionadas coa propia lóxica de funcionamento.

Este é o aspecto dun fragmento de código para esta clase:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Aceptamos clases para entrar. Ambari и Mcs, unha lista de nodos que se permiten escalar, así como os parámetros de configuración dos nodos: memoria e CPU asignados ao nodo en YARN. Tamén hai 2 parámetros internos q_ram, q_cpu, que son filas. Utilizándoas, almacenamos os valores da carga actual do clúster. Se vemos que nos últimos 5 minutos houbo unha carga constantemente aumentada, entón decidimos que necesitamos engadir un nodo +1 ao clúster. O mesmo ocorre co estado de subutilización do clúster.

O código anterior é un exemplo dunha función que elimina unha máquina do clúster e deténla na nube. Primeiro hai un desmantelamento YARN Nodemanager, entón o modo activarase Maintenance, entón paramos todos os servizos da máquina e desactivamos a máquina virtual na nube.

2. Observador de guións.py

Código de mostra a partir de aí:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Nel, comprobamos se se crearon condicións para aumentar a capacidade do clúster e se hai máquinas en reserva, obtemos o nome de host dunha delas, engádeo ao clúster e publicamos unha mensaxe ao respecto no Slack do noso equipo. Despois do cal comeza cooldown_period, cando non engadimos nin eliminamos nada do clúster, senón que simplemente supervisamos a carga. Se se estabilizou e está dentro do corredor dos valores de carga óptimos, simplemente continuamos coa vixilancia. Se un nodo non fose suficiente, engadimos outro.

Para os casos nos que temos unha lección por diante, xa sabemos con certeza que un nodo non será suficiente, polo que iniciamos inmediatamente todos os nodos libres e mantemos activos ata o final da lección. Isto ocorre usando unha lista de marcas de tempo de actividade.

Conclusión

Autoscaler é unha solución boa e conveniente para aqueles casos nos que experimentas unha carga de clúster irregular. Simultáneamente consegues a configuración de clúster desexada para as cargas máximas e, ao mesmo tempo, non gardas este clúster durante a subcarga, aforrando cartos. Ben, ademais, todo isto ocorre automaticamente sen a túa participación. O propio escalador automático non é máis que un conxunto de solicitudes á API do xestor de clúster e á API do provedor de nube, escritas segundo unha certa lóxica. O que definitivamente debes lembrar é a división dos nodos en 3 tipos, como escribimos anteriormente. E serás feliz.

Fonte: www.habr.com

Engadir un comentario