Hur man gör din egen autoscaler för ett kluster

Hallå! Vi utbildar människor att arbeta med big data. Det är omöjligt att föreställa sig ett utbildningsprogram om big data utan ett eget kluster, där alla deltagare arbetar tillsammans. Av denna anledning har vårt program alltid det 🙂 Vi är engagerade i dess konfiguration, justering och administration, och killarna lanserar direkt MapReduce-jobb där och använder Spark.

I det här inlägget kommer vi att berätta hur vi löste problemet med ojämn klusterbelastning genom att skriva vår egen autoscaler med hjälp av molnet Mail.ru molnlösningar.

problem

Vårt kluster används inte i ett typiskt läge. Avfallshanteringen är mycket ojämn. Det finns till exempel praktiska lektioner, då alla 30 personer och en lärare går till klustret och börjar använda det. Eller igen, det finns dagar innan deadline när belastningen ökar kraftigt. Resten av tiden arbetar klustret i underlastläge.

Lösning #1 är att behålla ett kluster som klarar toppbelastningar, men som kommer att vara ledigt resten av tiden.

Lösning #2 är att behålla ett litet kluster, till vilket du manuellt lägger till noder före klasser och under toppbelastningar.

Lösning #3 är att behålla ett litet kluster och skriva en autoscaler som övervakar den aktuella belastningen av klustret och, med hjälp av olika API:er, lägga till och ta bort noder från klustret.

I det här inlägget kommer vi att prata om lösning #3. Denna autoscaler är starkt beroende av externa faktorer snarare än interna, och leverantörer tillhandahåller det ofta inte. Vi använder Mail.ru Cloud Solutions molninfrastruktur och skrev en autoscaler med MCS API. Och eftersom vi lär ut hur man arbetar med data, bestämde vi oss för att visa hur du kan skriva en liknande autoscaler för dina egna syften och använda den med ditt moln

Förutsättningar

Först måste du ha ett Hadoop-kluster. Vi använder till exempel HDP-distributionen.

För att dina noder snabbt ska läggas till och tas bort måste du ha en viss rollfördelning bland noderna.

  1. Master nod. Tja, det finns inget behov av att förklara något särskilt: klustrets huvudnod, på vilken till exempel Spark-drivrutinen startas, om du använder det interaktiva läget.
  2. Datumnod. Detta är den nod på vilken du lagrar data på HDFS och där beräkningar sker.
  3. Beräkningsnod. Detta är en nod där du inte lagrar något på HDFS, utan där beräkningar sker.

Viktig poäng. Autoskalning kommer att inträffa på grund av noder av den tredje typen. Om du börjar ta och lägga till noder av den andra typen kommer svarshastigheten att vara mycket låg - avveckling och återupptagande kommer att ta timmar på ditt kluster. Detta är naturligtvis inte vad du förväntar dig av autoskalning. Det vill säga, vi rör inte noder av den första och andra typen. De kommer att representera ett lägsta livskraftigt kluster som kommer att finnas under hela programmets varaktighet.

Så vår autoscaler är skriven i Python 3, använder Ambari API för att hantera klustertjänster, använder API från Mail.ru Cloud Solutions (MCS) för start och stopp av maskiner.

Lösningsarkitektur

  1. Modul autoscaler.py. Den innehåller tre klasser: 1) funktioner för att arbeta med Ambari, 2) funktioner för att arbeta med MCS, 3) funktioner relaterade direkt till autoscalerns logik.
  2. Manus observer.py. I huvudsak består den av olika regler: när och vid vilka ögonblick man ska anropa autoscaler-funktionerna.
  3. Konfigurationsfil config.py. Den innehåller till exempel en lista över noder tillåtna för autoskalning och andra parametrar som påverkar till exempel hur länge man ska vänta från det att en ny nod lades till. Det finns också tidsstämplar för start av klasser, så att den maximalt tillåtna klusterkonfigurationen lanseras före klassen.

Låt oss nu titta på kodbitarna i de två första filerna.

1. Autoscaler.py-modul

Ambari klass

Så här ser en kod som innehåller en klass ut Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Ovan kan du som exempel titta på implementeringen av funktionen stop_all_services, som stoppar alla tjänster på den önskade klusternoden.

Vid ingången till klassen Ambari du passerar:

  • ambari_url, till exempel, gillar 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – namnet på ditt kluster i Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • och inuti auth här är ditt användarnamn och lösenord för Ambari: auth = ('login', 'password').

Funktionen i sig är inget annat än ett par anrop via REST API till Ambari. Ur en logisk synvinkel får vi först en lista över körande tjänster på en nod, och ber sedan på ett givet kluster, på en given nod, att överföra tjänster från listan till staten INSTALLED. Funktioner för att starta alla tjänster, för att överföra noder till tillstånd Maintenance etc. ser likadana ut - de är bara några få förfrågningar via API:et.

Klass Mcs

Så här ser en kod som innehåller en klass ut Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Vid ingången till klassen Mcs vi skickar projekt-id inuti molnet och användar-id, samt hans lösenord. I funktion vm_turn_on vi vill sätta på en av maskinerna. Logiken här är lite mer komplicerad. I början av koden kallas tre andra funktioner: 1) vi behöver få en token, 2) vi måste konvertera värdnamnet till namnet på maskinen i MCS, 3) få id för denna maskin. Därefter gör vi helt enkelt en inläggsbegäran och startar den här maskinen.

Så här ser funktionen för att få en token ut:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler klass

Den här klassen innehåller funktioner relaterade till själva driftlogiken.

Så här ser en kodbit för den här klassen ut:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Vi tar emot klasser för inträde. Ambari и Mcs, en lista över noder som är tillåtna för skalning, såväl som nodkonfigurationsparametrar: minne och cpu tilldelade noden i YARN. Det finns också 2 interna parametrar q_ram, q_cpu, som är köer. Med hjälp av dem lagrar vi värdena för den aktuella klusterbelastningen. Om vi ​​ser att det har varit en konsekvent ökad belastning under de senaste 5 minuterna, bestämmer vi oss för att vi måste lägga till +1-nod till klustret. Detsamma gäller för klustrets underutnyttjandetillstånd.

Koden ovan är ett exempel på en funktion som tar bort en maskin från klustret och stoppar den i molnet. Först är det en avveckling YARN Nodemanager, sedan slås läget på Maintenance, då stoppar vi alla tjänster på maskinen och stänger av den virtuella maskinen i molnet.

2. Script observer.py

Exempelkod därifrån:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

I den kontrollerar vi om förutsättningar har skapats för att öka kapaciteten i klustret och om det finns några maskiner i reserv, får värdnamnet på en av dem, lägger till det i klustret och publicerar ett meddelande om det på vårt teams Slack. Därefter börjar det cooldown_period, när vi inte lägger till eller tar bort något från klustret, utan bara övervakar belastningen. Om den har stabiliserats och ligger inom korridoren för optimala belastningsvärden så fortsätter vi helt enkelt att övervaka. Om en nod inte räckte, lägger vi till en annan.

För fall när vi har en lektion framför oss vet vi redan säkert att en nod inte kommer att räcka, så vi startar omedelbart alla fria noder och håller dem aktiva till slutet av lektionen. Detta sker med hjälp av en lista med tidsstämplar för aktivitet.

Slutsats

Autoscaler är en bra och bekväm lösning för de fall du upplever ojämn klusterbelastning. Du uppnår samtidigt önskad klusterkonfiguration för toppbelastningar och behåller samtidigt inte detta kluster vid underbelastning, vilket sparar pengar. Tja, plus att allt detta sker automatiskt utan ditt deltagande. Själva autoscaleren är inget annat än en uppsättning förfrågningar till klusterhanterarens API och molnleverantörens API, skrivna enligt en viss logik. Vad du definitivt behöver komma ihåg är uppdelningen av noder i 3 typer, som vi skrev tidigare. Och du kommer att bli glad.

Källa: will.com

Lägg en kommentar