Wéi Dir Ären eegene Autoscaler fir e Cluster mécht

Hallo! Mir trainéieren Leit fir mat Big Data ze schaffen. Et ass onméiglech e Bildungsprogramm iwwer Big Data ouni säin eegene Cluster virzestellen, un deem all Participanten zesumme schaffen. Aus dësem Grond, eise Programm huet et ëmmer 🙂 Mir sinn a senger Configuratioun engagéiert, tuning an Administratioun, an der Kärelen direkt MapReduce Aarbechtsplazen do lancéiert a benotzen Spark.

An dësem Post wäerte mir Iech soen wéi mir de Problem vun der ongläicher Clusterbelaaschtung geléist hunn andeems mir eisen eegenen Autoscaler mat der Wollek schreiwen Mail.ru Cloud Léisunge.

Problem

Eise Stärekoup gëtt net an engem typesche Modus benotzt. Entsuergung ass héich ongläich. Zum Beispill gëtt et praktesch Coursë, wann all 30 Leit an e Schoulmeeschter an de Cluster ginn an ufänken se ze benotzen. Oder nach eng Kéier, et ginn Deeg virun der Frist wou d'Laascht staark eropgeet. De Rescht vun der Zäit funktionnéiert de Stärekoup am Ënnerlaaschtmodus.

Léisung #1 ass e Stärekoup ze halen deen Spëtzlaaschten widderstoen, awer de Rescht vun der Zäit idle wäert sinn.

Léisung #2 ass e klenge Stärekoup ze halen, un deen Dir manuell Node virun de Klassen a wärend Spëtzlaaschten bäidréit.

Léisung #3 ass e klenge Stärekoup ze halen an en Autoscaler ze schreiwen deen d'aktuell Belaaschtung vum Stärekoup iwwerwaacht an, andeems verschidde APIen benotzt, Noden aus dem Stärekoup addéieren an ewechhuelen.

An dësem Post wäerte mir iwwer Léisung #3 schwätzen. Dësen Autoscaler ass héich ofhängeg vun externen Faktoren anstatt internen, an d'Provider bidden et dacks net. Mir benotzen d'Mail.ru Cloud Solutions Cloud Infrastruktur a schreift en Autoscaler mat der MCS API. A well mir léieren wéi mat Daten ze schaffen, hu mir beschloss ze weisen wéi Dir en ähnlechen Autoscaler fir Är eegen Zwecker schreift an et mat Ärer Wollek benotzt

Viraussetzunge

Als éischt musst Dir en Hadoop Cluster hunn. Zum Beispill benotze mir d'HDP Verdeelung.

Fir datt Är Wirbelen séier bäigefüügt a geläscht ginn, musst Dir eng gewësse Rollverdeelung tëscht den Knäppercher hunn.

  1. Meeschtesch Node. Gutt, et ass net néideg eppes speziell z'erklären: den Haaptknuet vum Stärekoup, op deem zum Beispill de Spark Chauffer lancéiert gëtt, wann Dir den interaktiven Modus benotzt.
  2. Datum Node. Dëst ass den Node op deem Dir Daten op HDFS späichert a wou Berechnunge stattfannen.
  3. Rechen Node. Dëst ass en Node wou Dir näischt op HDFS späichert, awer wou Berechnunge geschéien.

Wichteg Punkt. Autoscaling wäert geschéien wéinst Node vun der drëtter Zort. Wann Dir ufänkt Node vun der zweeter Zort ze huelen an ze addéieren, wäert d'Äntwertgeschwindegkeet ganz niddereg sinn - d'Entféierung an d'Recommitting dauert Stonnen op Ärem Cluster. Dëst ass natierlech net wat Dir vun Autoscaling erwaart. Dat ass, mir beréieren net Node vun der éischter an zweeter Zort. Si wäerten e Minimum liewensfäeg Cluster representéieren, deen während der Dauer vum Programm existéiert.

Also, eisen Autoscaler ass am Python 3 geschriwwen, benotzt d'Ambari API fir Clusterservicer ze managen, benotzt API vun Mail.ru Cloud Solutions (MCS) fir Start- an Stoppmaschinnen.

Léisung Architektur

  1. Modul autoscaler.py. Et enthält dräi Klassen: 1) Funktiounen fir mat Ambari ze schaffen, 2) Funktiounen fir mat MCS ze schaffen, 3) Funktiounen am Zesummenhang mat der Logik vum Autoscaler.
  2. Schrëft observer.py. Wesentlech besteet et aus verschiddene Reegelen: wéini a wéi eng Momenter fir d'Autoscaler Funktiounen ze nennen.
  3. Configuratiounsdatei config.py. Et enthält, zum Beispill, eng Lëscht vun Noden erlaabt fir autoscaling an aner Parameteren, déi Afloss, zum Beispill, wéi laang ze wait aus dem Moment en neien Node gouf dobäi. Et ginn och Zäitstempel fir den Ufank vun de Klassen, sou datt virun der Klass déi maximal zulässlech Clusterkonfiguratioun lancéiert gëtt.

Loosst eis elo d'Stécker vum Code an den éischten zwee Dateien kucken.

1. Autoscaler.py Modul

Ambari Klass

Dëst ass wéi e Stéck Code mat enger Klass ausgesäit Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Uewen, als Beispill, kënnt Dir d'Ëmsetzung vun der Funktioun kucken stop_all_services, déi all Servicer op der gewënschter Cluster Node stoppt.

Bei der Entrée an der Klass Ambari du pass:

  • ambari_url, zum Beispill, wéi 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - den Numm vun Ärem Stärekoup zu Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • an bannen auth hei ass Äre Benotzernumm a Passwuert fir Ambari: auth = ('login', 'password').

D'Funktioun selwer ass näischt méi wéi e puer Uruff iwwer d'REST API op Ambari. Aus enger logescher Siicht kréie mir fir d'éischt eng Lëscht vun de lafende Servicer op engem Node, a froen dann op engem bestëmmte Cluster, op engem bestëmmte Node, fir Servicer vun der Lëscht op de Staat ze transferéieren INSTALLED. Funktiounen fir all Servicer ze lancéieren, fir Noden op Staat ze transferéieren Maintenance etc kucken ähnlech - si sinn nëmmen e puer Ufroen duerch d'API.

Klass Mcs

Dëst ass wéi e Stéck Code mat enger Klass ausgesäit Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Bei der Entrée an der Klass Mcs mir passéieren de Projet ID an der Wollek an d'Benotzer ID, souwéi säi Passwuert. An Funktioun vm_turn_on mir wëllen eng vun de Maschinnen opzemaachen. D'Logik hei ass e bësse méi komplizéiert. Am Ufank vum Code ginn dräi aner Funktiounen genannt: 1) mir mussen en Token kréien, 2) mir mussen den Hostnumm an den Numm vun der Maschinn an MCS konvertéieren, 3) d'Id vun dëser Maschinn kréien. Als nächst maache mir einfach eng Post Ufro a starten dës Maschinn.

Dëst ass wéi d'Funktioun fir en Token ze kréien ausgesäit:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler Klass

Dës Klass enthält Funktiounen am Zesummenhang mat der Operatiounslogik selwer.

Dëst ass wéi e Stéck Code fir dës Klass ausgesäit:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Mir akzeptéieren Klassen fir Entrée. Ambari и Mcs, eng Lëscht vun Noden, déi fir Skaléierung erlaabt sinn, souwéi Node Konfiguratiounsparameter: Erënnerung an CPU, déi dem Node am YARN zougewisen sinn. Et ginn och 2 intern Parameteren q_ram, q_cpu, déi Schlaangen sinn. Mat hinnen späichere mir d'Wäerter vun der aktueller Clusterbelaaschtung. Wa mir gesinn datt an de leschte 5 Minutten eng konsequent erhéicht Belaaschtung gouf, da entscheede mir datt mir +1 Node an de Stärekoup musse addéieren. Datselwecht ass wouer fir de Cluster-Ënnerutilisatiounsstaat.

De Code hei uewen ass e Beispill vun enger Funktioun déi eng Maschinn aus dem Stärekoup läscht an et an der Wollek stoppt. Als éischt gëtt et eng Ofbau YARN Nodemanager, da gëtt de Modus ageschalt Maintenance, da stoppen mir all Servicer op der Maschinn a schalten déi virtuell Maschinn an der Wollek aus.

2. Script observer.py

Beispill Code vun do aus:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

An et kontrolléiere mir ob Konditioune geschaf goufen fir d'Kapazitéit vum Stärekoup ze erhéijen an ob et Maschinnen an der Reserve sinn, kréien den Hostnumm vun engem vun hinnen, fügen se an de Stärekoup a publizéieren e Message doriwwer op eisem Team Slack. Duerno fänkt et un cooldown_period, wa mir näischt aus dem Cluster addéieren oder ewechhuelen, awer einfach d'Laascht iwwerwaachen. Wann et stabiliséiert ass an am Korridor vun optimale Belaaschtungswäerter ass, da fuere mir einfach weider iwwerwaachen. Wann een Node net genuch war, da addéiere mer en aneren.

Fir Fäll wou mir eng Lektioun viraus hunn, wësse mer scho sécher datt een Node net genuch ass, also fänken mir direkt un all gratis Wirbelen an halen se aktiv bis zum Enn vun der Lektioun. Dëst geschitt mat enger Lëscht vun Aktivitéiten Zäitstempel.

Konklusioun

Autoscaler ass eng gutt a praktesch Léisung fir déi Fäll wou Dir ongläiche Cluster Luede erliewt. Dir erreechen gläichzäiteg déi gewënschte Stärekoup Configuratioun fir Biergspëtzten Luede a gläichzäiteg halen dëse Stärekoup net während underload, Geld spueren. Gutt, plus dëst alles geschitt automatesch ouni Är Participatioun. Den Autoscaler selwer ass näischt méi wéi eng Rei vun Ufroen un de Cluster Manager API an de Cloud Provider API, geschriwwen no enger gewësser Logik. Wat Dir definitiv erënnere musst ass d'Divisioun vun Noden an 3 Typen, wéi mir virdru geschriwwen hunn. An Dir wäert glécklech sinn.

Source: will.com

Setzt e Commentaire