Com crear el vostre propi escalador automàtic per a un clúster

Hola! Formem persones per treballar amb big data. És impossible imaginar un programa educatiu sobre big data sense un clúster propi, en el qual treballen tots els participants. Per aquest motiu, el nostre programa sempre ho té 🙂 Ens dediquem a la seva configuració, ajust i administració, i els nois hi lancen directament les feines de MapReduce i utilitzen Spark.

En aquesta publicació us explicarem com hem resolt el problema de la càrrega desigual del clúster escrivint el nostre propi escalador automàtic mitjançant el núvol Mail.ru Solucions al núvol.

problema

El nostre clúster no s'utilitza en un mode típic. L'eliminació és molt desigual. Per exemple, hi ha classes pràctiques, quan les 30 persones i un professor van al clúster i comencen a utilitzar-lo. O de nou, hi ha dies abans de la data límit quan la càrrega augmenta molt. La resta del temps, el clúster funciona en mode de baixa càrrega.

La solució núm. 1 és mantenir un clúster que suporti les càrregues màximes, però que estarà inactiu la resta del temps.

La solució núm. 2 és mantenir un petit clúster, al qual afegiu manualment nodes abans de les classes i durant les càrregues punta.

La solució núm. 3 és mantenir un clúster petit i escriure un escalador automàtic que supervisarà la càrrega actual del clúster i, mitjançant diverses API, afegir i eliminar nodes del clúster.

En aquest post parlarem de la solució #3. Aquest escalador automàtic depèn molt de factors externs més que interns, i els proveïdors sovint no ho proporcionen. Utilitzem la infraestructura de núvol de Mail.ru Cloud Solutions i vam escriure un escalador automàtic mitjançant l'API MCS. I com que ensenyem a treballar amb dades, vam decidir mostrar com podeu escriure un autoescalador similar per als vostres propis propòsits i utilitzar-lo amb el vostre núvol.

Requisits previs

Primer, heu de tenir un clúster Hadoop. Per exemple, fem servir la distribució HDP.

Perquè els vostres nodes s'afegeixin i s'eliminin ràpidament, heu de tenir una certa distribució de rols entre els nodes.

  1. Node mestre. Bé, no cal explicar res en particular: el node principal del clúster, on, per exemple, es llança el controlador Spark, si utilitzeu el mode interactiu.
  2. Node de data. Aquest és el node on s'emmagatzemen les dades a HDFS i on es fan els càlculs.
  3. Node informàtic. Aquest és un node on no emmagatzemeu res a HDFS, però on es fan càlculs.

Punt important. L'autoescala es produirà a causa dels nodes del tercer tipus. Si comenceu a agafar i afegir nodes del segon tipus, la velocitat de resposta serà molt baixa; el desmantellament i la tornada a comprometre's trigaran hores al vostre clúster. Això, per descomptat, no és el que espereu de l'escala automàtica. És a dir, no toquem nodes del primer i segon tipus. Representaran un clúster mínim viable que existirà durant tota la durada del programa.

Per tant, el nostre escalador automàtic està escrit en Python 3, utilitza l'API Ambari per gestionar els serveis de clúster, utilitza API de Mail.ru Cloud Solutions (MCS) per engegar i aturar màquines.

Arquitectura de la solució

  1. Mòdul autoscaler.py. Conté tres classes: 1) funcions per treballar amb Ambari, 2) funcions per treballar amb MCS, 3) funcions relacionades directament amb la lògica de l'autoscaler.
  2. Guió observer.py. Essencialment, consta de diferents regles: quan i en quins moments s'ha d'anomenar les funcions de l'autoscaler.
  3. Fitxer de configuració config.py. Conté, per exemple, una llista de nodes permesos per a l'escalat automàtic i altres paràmetres que afecten, per exemple, quant de temps cal esperar des del moment en què s'ha afegit un nou node. També hi ha marques de temps per a l'inici de les classes, de manera que abans de la classe s'inicia la configuració màxima de clúster permesa.

Vegem ara les peces de codi dins dels dos primers fitxers.

1. Mòdul Autoscaler.py

Classe Ambari

Així és un fragment de codi que conté una classe Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

A dalt, com a exemple, podeu veure la implementació de la funció stop_all_services, que atura tots els serveis al node del clúster desitjat.

A l'entrada de la classe Ambari passes:

  • ambari_url, per exemple, com 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – el nom del vostre clúster a Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • i dins auth aquí teniu el vostre inici de sessió i contrasenya per a Ambari: auth = ('login', 'password').

La funció en si no és més que un parell de trucades mitjançant l'API REST a Ambari. Des d'un punt de vista lògic, primer rebem una llista de serveis en execució en un node, i després demanem en un clúster determinat, en un node determinat, que transfereixin serveis de la llista a l'estat. INSTALLED. Funcions per llançar tots els serveis, per transferir nodes a estat Maintenance etc. semblen semblants: només són algunes sol·licituds a través de l'API.

Classe Mcs

Així és un fragment de codi que conté una classe Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

A l'entrada de la classe Mcs passem l'identificador del projecte dins del núvol i l'identificador d'usuari, així com la seva contrasenya. En funció vm_turn_on volem encendre una de les màquines. La lògica aquí és una mica més complicada. Al principi del codi, s'anomenen tres funcions més: 1) hem d'obtenir un testimoni, 2) hem de convertir el nom de l'amfitrió en el nom de la màquina en MCS, 3) obtenir l'identificador d'aquesta màquina. A continuació, només fem una sol·licitud de publicació i iniciem aquesta màquina.

Així és com es veu la funció per obtenir un testimoni:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Classe d'autoscaler

Aquesta classe conté funcions relacionades amb la pròpia lògica de funcionament.

Així és com es veu un fragment de codi d'aquesta classe:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Acceptem classes per entrar. Ambari и Mcs, una llista de nodes que es permeten escalar, així com els paràmetres de configuració del node: memòria i CPU assignats al node a YARN. També hi ha 2 paràmetres interns q_ram, q_cpu, que són cues. Utilitzant-los, emmagatzemem els valors de la càrrega actual del clúster. Si veiem que durant els últims 5 minuts hi ha hagut una càrrega constantment augmentada, decidim que hem d'afegir un node +1 al clúster. El mateix passa amb l'estat de subutilització del clúster.

El codi anterior és un exemple d'una funció que elimina una màquina del clúster i l'atura al núvol. En primer lloc, hi ha una clausura YARN Nodemanager, llavors el mode s'activa Maintenance, després aturem tots els serveis de la màquina i apaguem la màquina virtual al núvol.

2. Script observer.py

Exemple de codi d'allà:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

En ella, comprovem si s'han creat les condicions per augmentar la capacitat del clúster i si hi ha màquines en reserva, obtenim el nom d'amfitrió d'una d'elles, l'afegim al clúster i publiquem un missatge al respecte al Slack del nostre equip. Després del qual comença cooldown_period, quan no afegim ni eliminem res del clúster, sinó que simplement supervisem la càrrega. Si s'ha estabilitzat i es troba dins del corredor dels valors de càrrega òptims, simplement continuem amb el seguiment. Si no n'hi ha prou amb un node, n'afegim un altre.

Per als casos en què tenim una lliçó per davant, ja sabem del cert que un node no serà suficient, així que iniciem immediatament tots els nodes lliures i els mantenim actius fins al final de la lliçó. Això passa amb una llista de marca de temps d'activitat.

Conclusió

Autoscaler és una solució bona i còmoda per als casos en què experimenteu una càrrega de clúster desigual. Simultàniament, aconseguiu la configuració de clúster desitjada per a les càrregues màximes i, al mateix temps, no mantingueu aquest clúster durant la subcàrrega, estalviant diners. Bé, a més, tot això passa automàticament sense la vostra participació. El propi escalador automàtic no és més que un conjunt de sol·licituds a l'API del gestor de clúster i a l'API del proveïdor de núvol, escrites segons una certa lògica. El que definitivament heu de recordar és la divisió dels nodes en 3 tipus, com hem escrit anteriorment. I seràs feliç.

Font: www.habr.com

Afegeix comentari