Sådan laver du din egen autoscaler til en klynge

Hej! Vi uddanner folk til at arbejde med big data. Det er umuligt at forestille sig et uddannelsesprogram om big data uden sin egen klynge, hvor alle deltagere arbejder sammen. Af denne grund har vores program det altid 🙂 Vi er engageret i dets konfiguration, tuning og administration, og fyrene starter direkte MapReduce-job der og bruger Spark.

I dette indlæg vil vi fortælle dig, hvordan vi løste problemet med ujævn klyngebelastning ved at skrive vores egen autoscaler ved hjælp af skyen Mail.ru Cloud-løsninger.

problem

Vores klynge bruges ikke i en typisk tilstand. Bortskaffelsen er meget ujævn. For eksempel er der praktiske timer, hvor alle 30 personer og en lærer går i klyngen og begynder at bruge den. Eller igen er der dage før deadline, hvor belastningen stiger meget. Resten af ​​tiden kører klyngen i underbelastningstilstand.

Løsning #1 er at beholde en klynge, der vil modstå spidsbelastninger, men vil være inaktiv resten af ​​tiden.

Løsning #2 er at beholde en lille klynge, hvortil du manuelt tilføjer noder før klasser og under spidsbelastninger.

Løsning #3 er at beholde en lille klynge og skrive en autoscaler, der vil overvåge den aktuelle belastning af klyngen og ved hjælp af forskellige API'er tilføje og fjerne noder fra klyngen.

I dette indlæg vil vi tale om løsning #3. Denne autoscaler er meget afhængig af eksterne faktorer frem for interne, og udbydere leverer det ofte ikke. Vi bruger Mail.ru Cloud Solutions cloud-infrastruktur og skrev en autoscaler ved hjælp af MCS API. Og da vi lærer at arbejde med data, besluttede vi at vise, hvordan du kan skrive en lignende autoscaler til dine egne formål og bruge den med din sky

Forudsætninger

Først skal du have en Hadoop-klynge. For eksempel bruger vi HDP-distributionen.

For at dine noder hurtigt kan tilføjes og fjernes, skal du have en vis rollefordeling blandt noderne.

  1. Master node. Nå, der er ingen grund til at forklare noget særligt: ​​klyngens hovedknude, hvor for eksempel Spark-driveren startes, hvis du bruger den interaktive tilstand.
  2. Datoknudepunkt. Dette er den node, hvorpå du gemmer data på HDFS, og hvor beregningerne finder sted.
  3. Computernode. Dette er en node, hvor du ikke gemmer noget på HDFS, men hvor der sker beregninger.

Vigtigt punkt. Autoskalering vil forekomme på grund af noder af den tredje type. Hvis du begynder at tage og tilføje noder af den anden type, vil responshastigheden være meget lav - nedlukning og genindsættelse vil tage timer på din klynge. Dette er selvfølgelig ikke, hvad du forventer af autoskalering. Det vil sige, vi rører ikke noder af den første og anden type. De vil repræsentere en minimum levedygtig klynge, der vil eksistere i hele programmets varighed.

Så vores autoscaler er skrevet i Python 3, bruger Ambari API til at administrere klyngetjenester, bruger API fra Mail.ru Cloud Solutions (MCS) til start og stop af maskiner.

Løsningsarkitektur

  1. Modul autoscaler.py. Den indeholder tre klasser: 1) funktioner til at arbejde med Ambari, 2) funktioner til at arbejde med MCS, 3) funktioner relateret direkte til autoscalerens logik.
  2. Manuskript observer.py. Grundlæggende består den af ​​forskellige regler: hvornår og på hvilke tidspunkter man skal kalde autoscaler-funktionerne.
  3. Konfigurationsfil config.py. Den indeholder for eksempel en liste over noder, der er tilladt for autoskalering og andre parametre, der for eksempel påvirker, hvor længe der skal ventes fra det øjeblik, en ny node blev tilføjet. Der er også tidsstempler for start af klasser, så før klassen er den maksimalt tilladte klyngekonfiguration lanceret.

Lad os nu se på kodestykkerne inde i de to første filer.

1. Autoscaler.py-modul

Ambari klasse

Sådan ser et stykke kode, der indeholder en klasse, ud Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Ovenfor kan du som eksempel se på implementeringen af ​​funktionen stop_all_services, som stopper alle tjenester på den ønskede klynge node.

Ved indgangen til klassen Ambari du bestå:

  • ambari_url, for eksempel som 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – navnet på din klynge i Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • og indeni auth her er dit login og adgangskode til Ambari: auth = ('login', 'password').

Selve funktionen er ikke andet end et par opkald via REST API til Ambari. Fra et logisk synspunkt modtager vi først en liste over kørende tjenester på en node og beder derefter på en given klynge, på en given node, om at overføre tjenester fra listen til staten INSTALLED. Funktioner til lancering af alle tjenester, til overførsel af noder til tilstand Maintenance osv. ligner hinanden - de er blot nogle få anmodninger gennem API'et.

Klasse Mcs

Sådan ser et stykke kode, der indeholder en klasse, ud Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Ved indgangen til klassen Mcs vi videregiver projekt-id'et inde i skyen og bruger-id'et, samt hans adgangskode. I funktion vm_turn_on vi vil tænde for en af ​​maskinerne. Logikken her er lidt mere kompliceret. I begyndelsen af ​​koden kaldes tre andre funktioner: 1) vi skal have et token, 2) vi skal konvertere værtsnavnet til navnet på maskinen i MCS, 3) få id'et på denne maskine. Dernæst laver vi blot en postanmodning og starter denne maskine.

Sådan ser funktionen til at få et token ud:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler klasse

Denne klasse indeholder funktioner relateret til selve driftslogikken.

Sådan ser et stykke kode til denne klasse ud:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Vi modtager klasser for adgang. Ambari и Mcs, en liste over noder, der er tilladt til skalering, samt nodekonfigurationsparametre: hukommelse og cpu allokeret til noden i YARN. Der er også 2 interne parametre q_ram, q_cpu, som er køer. Ved hjælp af dem gemmer vi værdierne for den aktuelle klyngebelastning. Hvis vi ser, at der i løbet af de sidste 5 minutter har været en konsekvent øget belastning, beslutter vi, at vi skal tilføje +1 node til klyngen. Det samme gælder for klyngeunderudnyttelsestilstanden.

Ovenstående kode er et eksempel på en funktion, der fjerner en maskine fra klyngen og stopper den i skyen. Først er der en nedlukning YARN Nodemanager, så aktiveres tilstanden Maintenance, så stopper vi alle tjenester på maskinen og slukker den virtuelle maskine i skyen.

2. Script observer.py

Eksempelkode derfra:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

I den tjekker vi, om der er skabt betingelser for at øge klyngens kapacitet, og om der er nogen maskiner i reserve, får værtsnavnet på en af ​​dem, tilføjer det til klyngen og offentliggør en besked om det på vores teams Slack. Hvorefter det starter cooldown_period, når vi ikke tilføjer eller fjerner noget fra klyngen, men blot overvåger belastningen. Hvis det har stabiliseret sig og er inden for korridoren af ​​optimale belastningsværdier, så fortsætter vi blot med overvågningen. Hvis en node ikke var nok, tilføjer vi en anden.

I tilfælde, hvor vi har en lektion forude, ved vi allerede med sikkerhed, at én node ikke vil være nok, så vi starter straks alle de frie noder og holder dem aktive indtil slutningen af ​​lektionen. Dette sker ved hjælp af en liste over aktivitetstidsstempler.

Konklusion

Autoscaler er en god og bekvem løsning til de tilfælde, hvor du oplever ujævn klyngebelastning. Du opnår samtidig den ønskede klyngekonfiguration til spidsbelastninger og beholder samtidig ikke denne klynge under underbelastning, hvilket sparer penge. Nå, plus det hele sker automatisk uden din deltagelse. Selve autoscaleren er ikke andet end et sæt anmodninger til cluster manager API og cloud udbyder API, skrevet efter en vis logik. Hvad du helt sikkert skal huske er opdelingen af ​​noder i 3 typer, som vi skrev tidligere. Og du vil blive glad.

Kilde: www.habr.com

Tilføj en kommentar