Como fazer seu próprio escalonador automático para um cluster

Olá! Treinamos pessoas para trabalhar com big data. É impossível imaginar um programa educacional sobre big data sem um cluster próprio, no qual todos os participantes trabalhem juntos. Por isso nosso programa sempre tem 🙂 Estamos empenhados em sua configuração, ajuste e administração, e a galera lança diretamente os jobs do MapReduce lá e usa o Spark.

Neste post, contaremos como resolvemos o problema de carregamento desigual do cluster escrevendo nosso próprio escalonador automático usando a nuvem Soluções em nuvem Mail.ru.

problema

Nosso cluster não é usado em modo típico. O descarte é altamente desigual. Por exemplo, há aulas práticas, quando todas as 30 pessoas e um professor vão até o cluster e começam a utilizá-lo. Ou ainda, há dias antes do prazo em que a carga aumenta muito. No resto do tempo, o cluster opera no modo de subcarga.

A solução nº 1 é manter um cluster que suporte cargas de pico, mas que ficará ocioso no resto do tempo.

A solução nº 2 é manter um cluster pequeno, ao qual você adiciona nós manualmente antes das aulas e durante picos de carga.

A solução nº 3 é manter um cluster pequeno e escrever um escalonador automático que monitorará a carga atual do cluster e, usando várias APIs, adicionará e removerá nós do cluster.

Neste post falaremos sobre a solução nº 3. Esse escalonador automático é altamente dependente de fatores externos, e não de fatores internos, e os provedores muitas vezes não o fornecem. Usamos a infraestrutura em nuvem Mail.ru Cloud Solutions e escrevemos um escalonador automático usando a API MCS. E como ensinamos como trabalhar com dados, decidimos mostrar como você pode escrever um autoescalador semelhante para seus próprios propósitos e usá-lo com sua nuvem

Pré-requisitos

Primeiro, você deve ter um cluster Hadoop. Por exemplo, usamos a distribuição HDP.

Para que seus nós sejam adicionados e removidos rapidamente, você deve ter uma determinada distribuição de funções entre os nós.

  1. Nó mestre. Bem, não há necessidade de explicar nada em particular: o nó principal do cluster, no qual, por exemplo, o driver Spark é iniciado, se você usar o modo interativo.
  2. Nó de data. Este é o nó no qual você armazena dados no HDFS e onde ocorrem os cálculos.
  3. Nó de computação. Este é um nó onde você não armazena nada no HDFS, mas onde os cálculos acontecem.

Ponto importante. O escalonamento automático ocorrerá devido aos nós do terceiro tipo. Se você começar a adicionar e adicionar nós do segundo tipo, a velocidade de resposta será muito baixa - o descomissionamento e a nova confirmação levarão horas em seu cluster. É claro que isso não é o que você espera do escalonamento automático. Ou seja, não tocamos nos nós do primeiro e segundo tipos. Eles representarão um cluster mínimo viável que existirá durante toda a duração do programa.

Portanto, nosso escalonador automático é escrito em Python 3, usa a API Ambari para gerenciar serviços de cluster, usa API de soluções em nuvem Mail.ru (MCS) para partida e parada de máquinas.

Arquitetura da solução

  1. Módulo autoscaler.py. Contém três classes: 1) funções para trabalhar com Ambari, 2) funções para trabalhar com MCS, 3) funções relacionadas diretamente à lógica do autoescalador.
  2. Escrita observer.py. Essencialmente consiste em regras diferentes: quando e em que momentos chamar as funções do autoescalador.
  3. Arquivo de configuração config.py. Ele contém, por exemplo, uma lista de nós permitidos para escalonamento automático e outros parâmetros que afetam, por exemplo, quanto tempo esperar a partir do momento em que um novo nó foi adicionado. Existem também carimbos de data e hora para o início das aulas, para que antes da aula seja lançada a configuração máxima permitida do cluster.

Vejamos agora os trechos de código dentro dos dois primeiros arquivos.

1. Módulo Autoscaler.py

Aula de ambari

Esta é a aparência de um trecho de código contendo uma classe Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Acima, como exemplo, você pode ver a implementação da função stop_all_services, que interrompe todos os serviços no nó do cluster desejado.

Na entrada da aula Ambari você passa:

  • ambari_url, por exemplo, como 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – o nome do seu cluster em Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • e por dentro auth aqui está seu login e senha para Ambari: auth = ('login', 'password').

A função em si nada mais é do que algumas chamadas por meio da API REST para Ambari. Do ponto de vista lógico, primeiro obtemos uma lista de serviços em execução em um nó e, em seguida, solicitamos em um determinado cluster, em um determinado nó, para transferir serviços da lista para o estado INSTALLED. Funções para lançar todos os serviços, para transferir nós para o estado Maintenance etc. são semelhantes - são apenas algumas solicitações por meio da API.

Classe Mcs

Esta é a aparência de um trecho de código contendo uma classe Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Na entrada da aula Mcs passamos o id do projeto dentro da nuvem e o id do usuário, bem como sua senha. Em função vm_turn_on queremos ligar uma das máquinas. A lógica aqui é um pouco mais complicada. No início do código, três outras funções são chamadas: 1) precisamos obter um token, 2) precisamos converter o nome do host no nome da máquina no MCS, 3) obter o id desta máquina. Em seguida, simplesmente fazemos uma solicitação de postagem e lançamos esta máquina.

Esta é a aparência da função para obter um token:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Classe de escalonador automático

Esta classe contém funções relacionadas à própria lógica operacional.

Esta é a aparência de um trecho de código para esta classe:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Aceitamos aulas para entrada. Ambari и Mcs, uma lista de nós permitidos para escalonamento, bem como parâmetros de configuração do nó: memória e CPU alocadas para o nó no YARN. Existem também 2 parâmetros internos q_ram, q_cpu, que são filas. Usando-os, armazenamos os valores da carga atual do cluster. Se percebermos que nos últimos 5 minutos houve um aumento consistente de carga, decidimos que precisamos adicionar +1 nó ao cluster. O mesmo se aplica ao estado de subutilização do cluster.

O código acima é um exemplo de função que remove uma máquina do cluster e a interrompe na nuvem. Primeiro há um descomissionamento YARN Nodemanager, então o modo é ativado Maintenance, então paramos todos os serviços na máquina e desligamos a máquina virtual na nuvem.

2. Script observador.py

Exemplo de código daí:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Nele, verificamos se foram criadas condições para aumentar a capacidade do cluster e se há máquinas de reserva, pegamos o hostname de uma delas, adicionamos ao cluster e publicamos uma mensagem sobre isso no Slack da nossa equipe. Depois disso começa cooldown_period, quando não adicionamos ou removemos nada do cluster, mas simplesmente monitoramos a carga. Se tiver estabilizado e estiver dentro do corredor de valores de carga ótimos, simplesmente continuamos monitorando. Se um nó não for suficiente, adicionamos outro.

Para os casos em que temos uma aula pela frente, já sabemos com certeza que um nó não será suficiente, então iniciamos imediatamente todos os nós livres e os mantemos ativos até o final da aula. Isso acontece usando uma lista de carimbos de data/hora de atividades.

Conclusão

O Autoscaler é uma solução boa e conveniente para os casos em que você enfrenta um carregamento desigual do cluster. Você atinge simultaneamente a configuração de cluster desejada para cargas de pico e ao mesmo tempo não mantém esse cluster durante a subcarga, economizando dinheiro. Bem, além disso, tudo isso acontece automaticamente, sem a sua participação. O autoescalador em si nada mais é do que um conjunto de solicitações à API do gerenciador de cluster e à API do provedor de nuvem, escritas de acordo com uma determinada lógica. O que você definitivamente precisa lembrar é a divisão dos nós em 3 tipos, como escrevemos anteriormente. E você ficará feliz.

Fonte: habr.com

Adicionar um comentário