Hvordan lage din egen autoscaler for en klynge

Hallo! Vi lærer opp folk til å jobbe med big data. Det er umulig å forestille seg et utdanningsprogram om big data uten en egen klynge, der alle deltakerne jobber sammen. Av denne grunn har programmet vårt alltid det 🙂 Vi er engasjert i konfigurasjon, tuning og administrasjon, og gutta starter direkte MapReduce-jobber der og bruker Spark.

I dette innlegget vil vi fortelle deg hvordan vi løste problemet med ujevn klyngebelastning ved å skrive vår egen autoskaler ved hjelp av skyen Mail.ru skyløsninger.

problem

Vår klynge brukes ikke i en typisk modus. Deponeringen er svært ujevn. For eksempel er det praktiske timer, når alle de 30 personene og en lærer går til klyngen og begynner å bruke den. Eller igjen, det er dager før fristen når belastningen øker kraftig. Resten av tiden opererer klyngen i underbelastningsmodus.

Løsning #1 er å beholde en klynge som tåler toppbelastninger, men som vil være inaktiv resten av tiden.

Løsning #2 er å beholde en liten klynge, som du manuelt legger til noder før klasser og under toppbelastninger.

Løsning #3 er å beholde en liten klynge og skrive en autoskaler som vil overvåke gjeldende belastning av klyngen og, ved hjelp av forskjellige APIer, legge til og fjerne noder fra klyngen.

I dette innlegget skal vi snakke om løsning #3. Denne autoscaleren er svært avhengig av eksterne faktorer i stedet for interne, og leverandørene gir den ofte ikke. Vi bruker Mail.ru Cloud Solutions skyinfrastruktur og skrev en autoscaler ved å bruke MCS API. Og siden vi lærer hvordan man jobber med data, bestemte vi oss for å vise hvordan du kan skrive en lignende autoscaler for dine egne formål og bruke den med skyen din

Forutsetninger

Først må du ha en Hadoop-klynge. For eksempel bruker vi HDP-distribusjonen.

For at nodene dine raskt skal legges til og fjernes, må du ha en viss rollefordeling blant nodene.

  1. Master node. Vel, det er ikke nødvendig å forklare noe spesielt: hovednoden til klyngen, som for eksempel Spark-driveren startes på, hvis du bruker den interaktive modusen.
  2. Datoknutepunkt. Dette er noden du lagrer data på HDFS og hvor beregninger finner sted.
  3. Datamaskinnode. Dette er en node hvor du ikke lagrer noe på HDFS, men hvor beregninger skjer.

Viktig poeng. Autoskalering vil skje på grunn av noder av den tredje typen. Hvis du begynner å ta og legge til noder av den andre typen, vil responshastigheten være svært lav - avvikling og recommitting vil ta timer på klyngen din. Dette er selvfølgelig ikke det du forventer av autoskalering. Det vil si at vi ikke berører noder av den første og andre typen. De vil representere en minimum levedyktig klynge som vil eksistere gjennom hele programmets varighet.

Så vår autoscaler er skrevet i Python 3, bruker Ambari API for å administrere klyngetjenester, bruker API fra Mail.ru Cloud Solutions (MCS) for start og stopp av maskiner.

Løsningsarkitektur

  1. Modul autoscaler.py. Den inneholder tre klasser: 1) funksjoner for å jobbe med Ambari, 2) funksjoner for å jobbe med MCS, 3) funksjoner relatert direkte til logikken til autoscaleren.
  2. Manus observer.py. I hovedsak består den av forskjellige regler: når og i hvilke øyeblikk skal autoscaler-funksjonene kalles.
  3. Konfigurasjonsfil config.py. Den inneholder for eksempel en liste over noder som er tillatt for autoskalering og andre parametere som påvirker for eksempel hvor lenge man skal vente fra det øyeblikket en ny node ble lagt til. Det er også tidsstempler for starten av klasser, slik at før klassen er den maksimalt tillatte klyngekonfigurasjonen lansert.

La oss nå se på kodebitene i de to første filene.

1. Autoscaler.py-modul

Ambari klasse

Slik ser en kode som inneholder en klasse ut Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Ovenfor kan du som eksempel se på implementeringen av funksjonen stop_all_services, som stopper alle tjenester på ønsket klyngennode.

Ved inngangen til klassen Ambari du består:

  • ambari_url, for eksempel, liker 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – navnet på klyngen din i Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • og innvendig auth her er brukernavnet og passordet ditt for Ambari: auth = ('login', 'password').

Funksjonen i seg selv er ikke mer enn et par anrop via REST API til Ambari. Fra et logisk synspunkt mottar vi først en liste over kjørende tjenester på en node, og ber deretter på en gitt klynge, på en gitt node, om å overføre tjenester fra listen til staten INSTALLED. Funksjoner for å starte alle tjenester, for å overføre noder til tilstand Maintenance etc. ser like ut - de er bare noen få forespørsler gjennom API.

Klasse Mcs

Slik ser en kode som inneholder en klasse ut Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Ved inngangen til klassen Mcs vi sender prosjekt-ID-en inne i skyen og bruker-IDen, samt passordet hans. I funksjon vm_turn_on vi ønsker å slå på en av maskinene. Logikken her er litt mer komplisert. I begynnelsen av koden kalles tre andre funksjoner: 1) vi må få et token, 2) vi må konvertere vertsnavnet til navnet på maskinen i MCS, 3) få IDen til denne maskinen. Deretter gjør vi bare en postforespørsel og starter denne maskinen.

Slik ser funksjonen for å få et token ut:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler klasse

Denne klassen inneholder funksjoner relatert til selve driftslogikken.

Slik ser et stykke kode for denne klassen ut:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Vi tar imot klasser for påmelding. Ambari и Mcs, en liste over noder som er tillatt for skalering, samt nodekonfigurasjonsparametere: minne og cpu allokert til noden i YARN. Det er også 2 interne parametere q_ram, q_cpu, som er køer. Ved å bruke dem lagrer vi verdiene til gjeldende klyngebelastning. Hvis vi ser at det i løpet av de siste 5 minuttene har vært en konsekvent økt belastning, så bestemmer vi at vi må legge til +1 node til klyngen. Det samme gjelder for klyngeunderutnyttelsestilstanden.

Koden ovenfor er et eksempel på en funksjon som fjerner en maskin fra klyngen og stopper den i skyen. Først er det en avvikling YARN Nodemanager, så slås modusen på Maintenance, så stopper vi alle tjenester på maskinen og slår av den virtuelle maskinen i skyen.

2. Script observer.py

Eksempelkode derfra:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

I den sjekker vi om det er opprettet betingelser for å øke kapasiteten til klyngen og om det er noen maskiner i reserve, får vertsnavnet til en av dem, legger det til i klyngen og publiserer en melding om det på teamets Slack. Deretter starter det cooldown_period, når vi ikke legger til eller fjerner noe fra klyngen, men bare overvåker belastningen. Hvis den har stabilisert seg og er innenfor korridoren for optimale lastverdier, så fortsetter vi rett og slett overvåkingen. Hvis en node ikke var nok, legger vi til en annen.

For tilfeller der vi har en leksjon foran oss, vet vi allerede sikkert at én node ikke vil være nok, så vi starter umiddelbart alle de ledige nodene og holder dem aktive til slutten av leksjonen. Dette skjer ved hjelp av en liste med tidsstempler for aktivitet.

Konklusjon

Autoscaler er en god og praktisk løsning for de tilfellene du opplever ujevn klyngebelastning. Du oppnår samtidig ønsket klyngekonfigurasjon for toppbelastninger og beholder samtidig ikke denne klyngen under underbelastning, noe som sparer penger. Vel, pluss at dette skjer automatisk uten din deltakelse. Selve autoscaleren er ikke noe mer enn et sett med forespørsler til cluster manager API og cloud provider API, skrevet i henhold til en viss logikk. Det du definitivt trenger å huske er inndelingen av noder i 3 typer, som vi skrev tidligere. Og du vil bli glad.

Kilde: www.habr.com

Legg til en kommentar