How to make your own autoscaler for a cluster

Hello! We train people to work with big data. It is impossible to imagine an educational program on big data without a cluster of its own, on which all participants work together. For this reason, our program always has one 🙂 We handle its configuration, tuning, and administration, while the participants run MapReduce jobs and use Spark on it.

In this post, we will tell you how we solved the problem of uneven cluster loading by writing our own autoscaler on top of the Mail.ru Cloud Solutions cloud.

Problem

The cluster we use runs in a somewhat atypical mode: utilization is highly uneven. For example, there are practical classes when all 30 people and the teacher log in to the cluster and start using it. There are also days before a deadline when the load increases sharply. The rest of the time the cluster runs underloaded.

Solution #1 is to keep a cluster that can handle peak loads but sits idle the rest of the time.

Solution #2 is to keep a small cluster and manually add nodes before classes and during peak loads.

Solution #3 is to keep a small cluster and write an autoscaler that monitors the current cluster load and, using various APIs, adds nodes to the cluster and removes them on its own.

In this post, we will talk about solution #3. Such an autoscaler depends heavily on external factors rather than internal ones, and providers often do not offer one out of the box. We use the Mail.ru Cloud Solutions cloud infrastructure and wrote an autoscaler using the MCS API. And since we teach how to work with data, we decided to show how you can write a similar autoscaler for your own purposes and use it with your own cloud.

Prerequisites

First, you must have a Hadoop cluster. We, for example, use the HDP distribution.

To be able to add and remove nodes quickly, the roles must be distributed among the nodes in a certain way.

  1. Master node. There is nothing special to explain here: the main node of the cluster, on which, for example, the Spark driver runs if you use interactive mode.
  2. Data node. The node on which you store data in HDFS and where computations take place.
  3. Compute node. A node on which nothing is stored in HDFS, but where computations take place.

An important point: autoscaling happens through nodes of the third type. If you start removing and adding nodes of the second type, the reaction speed will be very low; decommissioning and recommissioning will take hours on your cluster. That, of course, is not what you expect from autoscaling. In other words, we do not touch nodes of the first and second types: they form the minimum viable cluster that exists for the entire duration of the program.

So, our autoscaler is written in Python 3, uses the Ambari API to manage cluster services, and uses the Mail.ru Cloud Solutions (MCS) API to start and stop machines.

Solution architecture

  1. The autoscaler.py module. It contains three classes: 1) functions for working with Ambari, 2) functions for working with MCS, 3) functions directly related to the autoscaler's logic.
  2. The observer.py script. In essence, it consists of rules: when and at what moments to call the autoscaler functions.
  3. The configuration file config.py. It contains, among other things, a list of nodes allowed for autoscaling and other parameters, such as how long to wait after a new node has been added. There are also timestamps of the start of classes, so that the maximum allowed cluster configuration is launched before a lesson. A hedged sketch of such a config follows this list.
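
For reference, here is a minimal sketch of what config.py might contain. The names scaling_hosts, cooldown_period and scale_up_thresholds come from the snippets below; the values, the thresholds structure and the class_schedule format are purely illustrative:

# config.py - illustrative values only, adapt to your cluster
from datetime import datetime

# compute nodes allowed for autoscaling (type-3 nodes)
scaling_hosts = ['node05.example.com', 'node06.example.com']

# minutes to wait after a node has been added or removed
cooldown_period = 5

# load thresholds; the structure is an assumption of this sketch
scale_up_thresholds = {'ram': 0.8, 'cpu': 0.8, 'window_size': 60}
scale_down_thresholds = {'ram': 0.3, 'cpu': 0.3, 'window_size': 60}

# start/end of classes, so the maximum allowed cluster is up before a lesson
class_schedule = [
    (datetime(2019, 9, 2, 18, 0), datetime(2019, 9, 2, 21, 0)),
]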

Let's now look at the code snippets inside the first two files.

1. The autoscaler.py module

Ambari class

This is what the piece of code containing the Ambari class looks like:

import json

import requests


class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        # Get the list of components currently installed on the host
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        # Ask Ambari to move all of them to the INSTALLED (stopped) state
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Above, as an example, you can see the implementation of the stop_all_services function, which stops all services on the desired cluster node.

When creating an instance of the Ambari class, you pass:

  • ambari_url, for example, of the form 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - the name of your cluster in Ambari,
  • headers = {'X-Requested-By': 'ambari'},
  • and in auth, your Ambari username and password: auth = ('login', 'password').
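
For example, creating an instance might look like this (all values are placeholders):

ambari = Ambari(
    ambari_url='http://localhost:8080/api/v1/clusters/',
    cluster_name='mycluster',
    headers={'X-Requested-By': 'ambari'},
    auth=('login', 'password')
)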

The function itself is nothing more than a couple of calls to the Ambari REST API. Logically, we first get the list of running services on the node, and then we ask Ambari, on this cluster and this node, to move the services from the list to the INSTALLED state. The functions for launching all services, for moving nodes to the Maintenance state, and others look similar: they are just a few requests through the API.
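
For instance, the maintenance_on function used later in the post is not shown there; a minimal sketch of it, assuming the standard Ambari REST call for host maintenance mode, could look like this:

def maintenance_on(self, hostname):
    # Ask Ambari to put the whole host into maintenance mode
    url = self.ambari_url + self.cluster_name + '/hosts/' + hostname
    data = {
        "RequestInfo": {"context": "Turn On Maintenance Mode"},
        "Body": {"Host": {"maintenance_state": "ON"}}
    }
    req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
    return 'Request accepted' if req.status_code in [200, 201, 202] else req.status_code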

Mcs class

This is what the piece of code containing the Mcs class looks like:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1            # user id in the cloud
        self.id2 = id2            # project id in the cloud
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        # The os-start action takes no arguments, so its value is null
        data = {'os-start': None}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

When creating an instance of the Mcs class, we pass in the user id, the project id inside the cloud, and the password. In the vm_turn_on function we want to turn on one of the machines. The logic here is a little more complicated: three other functions are called at the beginning of the code: 1) we need to get a token, 2) we need to convert the hostname into the name of the machine in MCS, 3) we need to get the id of this machine. Next, we simply make a POST request and start the machine.

This is what the function for getting the token looks like:

def get_mcs_token(self):
    # nocatalog keeps the service catalog out of the response body
    url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
    headers = {'Content-Type': 'application/json'}
    data = {
        'auth': {
            'identity': {
                'methods': ['password'],
                'password': {
                    'user': {
                        'id': self.id1,
                        'password': self.password
                    }
                }
            },
            'scope': {
                'project': {
                    'id': self.id2
                }
            }
        }
    }
    req = requests.post(url, data=json.dumps(data), headers=headers)
    # Keystone returns the token in the X-Subject-Token response header
    self.token = req.headers['X-Subject-Token']
    return self.token
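
The other two helpers called by vm_turn_on are not shown in the post. Here is a minimal sketch of what they might look like, assuming the OpenStack-compatible server listing endpoint and a naming scheme in which the VM name matches the short hostname (both are assumptions; adapt them to your setup):

def hostname_to_vmname(self, hostname):
    # In this sketch the VM name is simply the short hostname
    return hostname.split('.')[0]

def get_vm_id(self, vm_name):
    # List the project's servers and pick the one with the matching name
    url = self.mcs_host + '/servers'
    headers = {'X-Auth-Token': self.token, 'Content-Type': 'application/json'}
    req = requests.get(url, headers=headers, params={'name': vm_name})
    for server in req.json()['servers']:
        if server['name'] == vm_name:
            return server['id']
    return None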

Autoscaler class

This class contains the functions that implement the autoscaling logic itself.

This is what a piece of code from this class looks like:

import logging
import time
from collections import deque


class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        # Sliding windows holding the recent RAM/CPU load of the cluster
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        message = 'Fail'  # returned if the host is not allowed for scaling
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            # 1. Ask YARN to decommission the NodeManager
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decommission request accepted: {0}'.format(flag1))
                    break
            # 2. Wait until the NodeManager is actually stopped
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemanager decommissioned: {0}'.format(flag3))
                    break
            # 3. Turn on maintenance mode for the host
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            # 4. Once maintenance is on, stop all services on the host
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            # 5. Turn off the virtual machine in the cloud
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

As input we take instances of the Ambari and Mcs classes, the list of nodes allowed for scaling, and the node configuration parameters: the memory and CPU allocated per node in YARN. There are also two internal parameters, q_ram and q_cpu, which are queues; we use them to store the current cluster load values. If we see that the load has been steadily growing over the past 5 minutes, we decide to add one more node to the cluster. The same logic applies to the cluster underload state.
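
For illustration, here is a minimal sketch of how such a sliding-window check might be implemented. The method name assert_up appears in the observer snippet below, but this body, the thresholds structure and the get_yarn_utilization helper are assumptions of the sketch:

def assert_up(self, thresholds):
    # Append the current YARN utilization to the sliding windows;
    # get_yarn_utilization is a hypothetical helper returning the
    # used RAM and CPU shares from the YARN metrics
    ram_used, cpu_used = self.get_yarn_utilization()
    self.q_ram.append(ram_used)
    self.q_cpu.append(cpu_used)
    # Keep roughly the last 5 minutes of samples
    while len(self.q_ram) > thresholds['window_size']:
        self.q_ram.popleft()
        self.q_cpu.popleft()
    # Scale up only if the whole window stayed above the thresholds
    if len(self.q_ram) < thresholds['window_size']:
        return False
    return (all(r > thresholds['ram'] for r in self.q_ram)
            and all(c > thresholds['cpu'] for c in self.q_cpu))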

The scale_down function shown earlier is an example of a function that removes a machine from the cluster and stops it in the cloud. First the YARN NodeManager is decommissioned, then Maintenance mode is turned on, then all services on the machine are stopped, and finally the virtual machine is turned off in the cloud.
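
The reverse operation, scale_up, is not shown in the post. A simplified sketch of it, mirroring scale_down under the same assumptions (maintenance_off, start_all_services and recommission_nodemanager are hypothetical counterparts of the Ambari methods above):

def scale_up(self, hostname):
    if hostname not in self.scaling_hosts:
        return 'Fail'
    # Turn the VM on in the cloud and wait until it is active
    self.mcs.vm_turn_on(hostname)
    while True:
        time.sleep(5)
        if self.mcs.get_vm_info(hostname)['server']['status'] == 'ACTIVE':
            break
    # Undo scale_down: maintenance off, start services, recommission
    self.ambari.maintenance_off(hostname)
    self.ambari.start_all_services(hostname)
    self.ambari.recommission_nodemanager(hostname)
    while True:
        time.sleep(5)
        if self.ambari.check_service(hostname, 'NODEMANAGER') == 'STARTED':
            break
    logging.info('Scale-up finished')
    return 'Success'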

2. The observer.py script

A sample of code from it:

if scaler.assert_up(config.scale_up_thresholds) == True:
    hostname = cloud.get_vm_to_up(config.scaling_hosts)
    if hostname != None:
        status1 = scaler.scale_up(hostname)
        if status1 == 'Success':
            # Post a notification to the team's Slack webhook
            post = {"text": "{0} has been successfully scaled-up".format(hostname)}
            json_data = json.dumps(post)
            req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
            # Cooldown: just watch the load, do not touch the cluster
            time.sleep(config.cooldown_period*60)

Here we check whether the conditions for scaling the cluster up have been met and whether there are any machines left in reserve; we get the hostname of one of them, add it to the cluster, and post a message about it in our team's Slack. After that, the cooldown_period starts, during which we do not add anything to or remove anything from the cluster but simply monitor the load. If it has stabilized and is within the corridor of optimal load values, we just continue monitoring. If one node was not enough, we add another one.
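
The scale-down branch of the observer is symmetric; a sketch of it (assert_down, get_vm_to_down and scale_down_thresholds are hypothetical counterparts of the names above):

if scaler.assert_down(config.scale_down_thresholds) == True:
    hostname = cloud.get_vm_to_down(config.scaling_hosts)
    if hostname != None:
        status1 = scaler.scale_down(hostname)
        if status1 == 'Success':
            post = {"text": "{0} has been successfully scaled-down".format(hostname)}
            req = requests.post(webhook, data=json.dumps(post).encode('ascii'), headers={'Content-Type': 'application/json'})
            time.sleep(config.cooldown_period*60)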

For cases when we have a lesson ahead, we already know for sure that one node will not be enough, so we immediately start all free nodes and keep them active until the end of the lesson. This is done using a list of lesson timestamps.
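
A sketch of how that check could look; class_schedule here is a hypothetical config parameter holding datetime pairs for each lesson:

from datetime import datetime

def lesson_in_progress(schedule):
    # True if the current time falls inside any lesson window
    now = datetime.now()
    return any(start <= now <= end for start, end in schedule)

# In the observer loop: during a lesson, bring up every allowed node
if lesson_in_progress(config.class_schedule):
    for hostname in config.scaling_hosts:
        scaler.scale_up(hostname)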

Conclusion

An autoscaler is a good and convenient solution for cases when cluster utilization is uneven. You get the cluster configuration you need for peak loads, but do not keep that cluster around during underload, which saves money. Plus, it all happens automatically, without your participation. The autoscaler itself is nothing more than a set of requests to the cluster manager API and the cloud provider API, written according to a certain logic. What you definitely need to remember is the division of nodes into 3 types, as we wrote earlier. And you will be happy.

Source: habr.com
