Comment créer votre propre autoscaler pour un cluster

Bonjour! Nous formons les gens à travailler avec le Big Data. Il est impossible d'imaginer un programme éducatif sur le big data sans son propre cluster, sur lequel tous les participants travaillent ensemble. Pour cette raison, notre programme l'a toujours 🙂 Nous sommes engagés dans sa configuration, son réglage et son administration, et les gars y lancent directement des travaux MapReduce et utilisent Spark.

Dans cet article, nous vous expliquerons comment nous avons résolu le problème du chargement inégal des clusters en écrivant notre propre autoscaler à l'aide du cloud. Solutions Cloud Mail.ru.

problème

Notre cluster n'est pas utilisé de manière classique. L'élimination est très inégale. Par exemple, il y a des cours pratiques, où les 30 personnes et un enseignant se rendent au cluster et commencent à l'utiliser. Ou encore, il y a des jours avant l’échéance où la charge augmente fortement. Le reste du temps, le cluster fonctionne en mode sous-charge.

La solution n°1 consiste à conserver un cluster qui résistera aux pics de charge, mais qui restera inactif le reste du temps.

La solution n°2 consiste à conserver un petit cluster, auquel vous ajoutez manuellement des nœuds avant les cours et pendant les pics de charge.

La solution n°3 consiste à conserver un petit cluster et à écrire un autoscaler qui surveillera la charge actuelle du cluster et, à l'aide de diverses API, ajoutera et supprimera des nœuds du cluster.

Dans cet article, nous parlerons de la solution n°3. Cet autoscaler dépend fortement de facteurs externes plutôt que internes, et les fournisseurs ne le fournissent souvent pas. Nous utilisons l'infrastructure cloud Mail.ru Cloud Solutions et avons écrit un autoscaler à l'aide de l'API MCS. Et puisque nous enseignons comment travailler avec des données, nous avons décidé de montrer comment vous pouvez écrire un autoscaler similaire pour vos propres besoins et l'utiliser avec votre cloud.

Pré-requis

Tout d'abord, vous devez disposer d'un cluster Hadoop. Par exemple, nous utilisons la distribution HDP.

Pour que vos nœuds soient rapidement ajoutés et supprimés, vous devez avoir une certaine répartition des rôles entre les nœuds.

  1. Nœud maître. Eh bien, inutile d'expliquer quoi que ce soit de particulier : le nœud principal du cluster, sur lequel, par exemple, le pilote Spark est lancé, si vous utilisez le mode interactif.
  2. Nœud de dates. Il s'agit du nœud sur lequel vous stockez les données sur HDFS et où s'effectuent les calculs.
  3. Nœud informatique. Il s'agit d'un nœud où vous ne stockez rien sur HDFS, mais où se produisent les calculs.

Point important. La mise à l'échelle automatique se produira en raison des nœuds du troisième type. Si vous commencez à prendre et à ajouter des nœuds du deuxième type, la vitesse de réponse sera très faible - la mise hors service et la réactivation prendront des heures sur votre cluster. Bien entendu, ce n’est pas ce que vous attendez de la mise à l’échelle automatique. Autrement dit, nous ne touchons pas aux nœuds des premier et deuxième types. Ils représenteront un cluster minimum viable qui existera pendant toute la durée du programme.

Ainsi, notre autoscaler est écrit en Python 3, utilise l'API Ambari pour gérer les services de cluster, utilise API de solutions cloud Mail.ru (MCS) pour le démarrage et l’arrêt des machines.

Architecture des solutions

  1. Module autoscaler.py. Il contient trois classes : 1) fonctions pour travailler avec Ambari, 2) fonctions pour travailler avec MCS, 3) fonctions directement liées à la logique de l'autoscaler.
  2. Scénario observer.py. Essentiellement, il se compose de différentes règles : quand et à quels moments appeler les fonctions de l'autoscaler.
  3. Fichier de configuration config.py. Il contient, par exemple, une liste de nœuds autorisés pour la mise à l'échelle automatique et d'autres paramètres qui affectent, par exemple, le temps d'attente à partir du moment où un nouveau nœud a été ajouté. Il existe également des horodatages pour le début des cours, de sorte qu'avant le cours, la configuration de cluster maximale autorisée soit lancée.

Examinons maintenant les morceaux de code contenus dans les deux premiers fichiers.

1.Module Autoscaler.py

Classe Ambari

Voici à quoi ressemble un morceau de code contenant une classe Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Ci-dessus, à titre d'exemple, vous pouvez regarder l'implémentation de la fonction stop_all_services, qui arrête tous les services sur le nœud de cluster souhaité.

A l'entrée de la classe Ambari vous passez:

  • ambari_url, par exemple, comme 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – le nom de votre cluster à Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • et à l'intérieur auth voici votre nom d'utilisateur et votre mot de passe pour Ambari : auth = ('login', 'password').

La fonction elle-même n'est rien de plus que quelques appels via l'API REST à Ambari. D'un point de vue logique, on reçoit d'abord une liste des services en cours d'exécution sur un nœud, puis on demande sur un cluster donné, sur un nœud donné, de transférer les services de la liste vers l'état INSTALLED. Fonctions de lancement de tous les services, de transfert de nœuds vers l'état Maintenance etc. se ressemblent - ce ne sont que quelques requêtes via l'API.

Mc de classe

Voici à quoi ressemble un morceau de code contenant une classe Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

A l'entrée de la classe Mcs nous transmettons l'identifiant du projet dans le cloud et l'identifiant de l'utilisateur, ainsi que son mot de passe. En fonction vm_turn_on nous voulons allumer l'une des machines. La logique ici est un peu plus compliquée. Au début du code, trois autres fonctions sont appelées : 1) nous devons obtenir un token, 2) nous devons convertir le nom d'hôte en nom de la machine dans MCS, 3) obtenir l'identifiant de cette machine. Ensuite, nous faisons simplement une demande de publication et lançons cette machine.

Voici à quoi ressemble la fonction d'obtention d'un token :

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Classe d'autoscaler

Cette classe contient des fonctions liées à la logique de fonctionnement elle-même.

Voici à quoi ressemble un morceau de code pour cette classe :

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Nous acceptons les cours pour l'entrée. Ambari и Mcs, une liste de nœuds autorisés pour la mise à l'échelle, ainsi que les paramètres de configuration du nœud : mémoire et processeur alloués au nœud dans YARN. Il existe également 2 paramètres internes q_ram, q_cpu, qui sont des files d'attente. En les utilisant, nous stockons les valeurs de la charge actuelle du cluster. Si nous constatons qu'au cours des 5 dernières minutes, la charge a constamment augmenté, nous décidons alors que nous devons ajouter un nœud +1 au cluster. Il en va de même pour l'état de sous-utilisation du cluster.

Le code ci-dessus est un exemple de fonction qui supprime une machine du cluster et l'arrête dans le cloud. Il y a d’abord un déclassement YARN Nodemanager, puis le mode s'active Maintenance, puis nous arrêtons tous les services sur la machine et désactivons la machine virtuelle dans le cloud.

2. Script observateur.py

Exemple de code à partir de là :

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Dans celui-ci, nous vérifions si les conditions ont été créées pour augmenter la capacité du cluster et s'il y a des machines en réserve, obtenons le nom d'hôte de l'une d'entre elles, l'ajoutons au cluster et publions un message à ce sujet sur Slack de notre équipe. Après quoi ça commence cooldown_period, lorsque nous n'ajoutons ou ne supprimons rien du cluster, mais surveillons simplement la charge. S'il s'est stabilisé et se situe dans la fourchette des valeurs de charge optimales, nous poursuivons simplement la surveillance. Si un nœud ne suffit pas, nous en ajoutons un autre.

Dans les cas où nous avons une leçon à venir, nous savons déjà avec certitude qu'un seul nœud ne suffira pas, nous démarrons donc immédiatement tous les nœuds libres et les maintenons actifs jusqu'à la fin de la leçon. Cela se produit à l’aide d’une liste d’horodatages d’activité.

Conclusion

Autoscaler est une solution efficace et pratique pour les cas où vous rencontrez un chargement de cluster inégal. Vous obtenez simultanément la configuration de cluster souhaitée pour les charges de pointe et en même temps vous ne conservez pas ce cluster en cas de sous-charge, ce qui permet d'économiser de l'argent. Eh bien, en plus, tout cela se fait automatiquement sans votre participation. L'autoscaler lui-même n'est rien de plus qu'un ensemble de requêtes adressées à l'API du gestionnaire de cluster et à l'API du fournisseur de cloud, écrites selon une certaine logique. Ce dont vous devez absolument vous souvenir, c'est la division des nœuds en 3 types, comme nous l'avons écrit plus tôt. Et vous serez heureux.

Source: habr.com

Ajouter un commentaire