Hoe u uw eigen autoscaler voor een cluster kunt maken

Hallo! Wij leiden mensen op om met big data te werken. Een onderwijsprogramma over big data is niet denkbaar zonder een eigen cluster, waaraan alle deelnemers samenwerken. Om deze reden heeft ons programma het altijd 🙂 We zijn bezig met de configuratie, afstemming en administratie ervan, en de jongens starten daar direct MapReduce-taken en gebruiken Spark.

In dit bericht zullen we u vertellen hoe we het probleem van ongelijkmatige clusterbelasting hebben opgelost door onze eigen autoscaler te schrijven met behulp van de cloud Mail.ru Cloud-oplossingen.

probleem

Ons cluster wordt niet in een typische modus gebruikt. De afvoer is zeer ongelijkmatig. Er zijn bijvoorbeeld praktijklessen, waarbij alle dertig mensen en een docent naar het cluster gaan en er gebruik van gaan maken. Of nogmaals, er zijn dagen vóór de deadline waarop de belasting enorm toeneemt. De rest van de tijd werkt het cluster in de onderbelastingsmodus.

Oplossing #1 is om een ​​cluster te behouden dat piekbelastingen kan weerstaan, maar de rest van de tijd inactief zal zijn.

Oplossing #2 is het behouden van een klein cluster, waaraan u handmatig knooppunten toevoegt vóór de lessen en tijdens piekbelastingen.

Oplossing #3 is om een ​​klein cluster te behouden en een autoscaler te schrijven die de huidige belasting van het cluster bewaakt en, met behulp van verschillende API's, knooppunten aan het cluster toevoegt en verwijdert.

In dit bericht zullen we het hebben over oplossing #3. Deze autoscaler is sterk afhankelijk van externe factoren in plaats van interne factoren, en providers bieden deze vaak niet. We maken gebruik van de cloudinfrastructuur van Mail.ru Cloud Solutions en schreven een autoscaler met behulp van de MCS API. En omdat we leren hoe je met data kunt werken, hebben we besloten om te laten zien hoe je een vergelijkbare autoscaler voor je eigen doeleinden kunt schrijven en deze met je cloud kunt gebruiken.

Voorwaarden

Ten eerste moet u een Hadoop-cluster hebben. We gebruiken bijvoorbeeld de HDP-distributie.

Om ervoor te zorgen dat uw knooppunten snel kunnen worden toegevoegd en verwijderd, moet u een bepaalde rolverdeling tussen de knooppunten hebben.

  1. Hoofdknooppunt. Nou, het is niet nodig om iets bijzonders uit te leggen: het hoofdknooppunt van het cluster, waarop bijvoorbeeld het Spark-stuurprogramma wordt gestart, als je de interactieve modus gebruikt.
  2. Datum knooppunt. Dit is het knooppunt waarop u gegevens op HDFS opslaat en waar berekeningen plaatsvinden.
  3. Computerknooppunt. Dit is een knooppunt waar je niets op HDFS opslaat, maar waar berekeningen plaatsvinden.

Belangrijk punt. Automatisch schalen vindt plaats vanwege knooppunten van het derde type. Als u knooppunten van het tweede type begint te nemen en toe te voegen, zal de reactiesnelheid erg laag zijn: het buiten gebruik stellen en opnieuw vastleggen van uw cluster zal uren duren. Dit is natuurlijk niet wat u van automatisch schalen verwacht. Dat wil zeggen, we raken de knooppunten van het eerste en tweede type niet aan. Zij zullen een minimaal levensvatbare cluster vertegenwoordigen die gedurende de gehele duur van het programma zal bestaan.

Onze autoscaler is dus geschreven in Python 3 en gebruikt de Ambari API om clusterservices te beheren en te gebruiken API van Mail.ru Cloud Solutions (MCS) voor het starten en stoppen van machines.

Oplossingsarchitectuur

  1. Module autoscaler.py. Het bevat drie klassen: 1) functies voor het werken met Ambari, 2) functies voor het werken met MCS, 3) functies die rechtstreeks verband houden met de logica van de autoscaler.
  2. Script observer.py. In wezen bestaat het uit verschillende regels: wanneer en op welke momenten de autoscaler-functies moeten worden aangeroepen.
  3. Configuratiebestand config.py. Het bevat bijvoorbeeld een lijst met knooppunten die zijn toegestaan ​​voor automatisch schalen en andere parameters die bijvoorbeeld van invloed zijn op hoe lang er moet worden gewacht vanaf het moment dat een nieuw knooppunt is toegevoegd. Er zijn ook tijdstempels voor het starten van klassen, zodat vóór de klasse de maximaal toegestane clusterconfiguratie wordt gestart.

Laten we nu eens kijken naar de stukjes code in de eerste twee bestanden.

1. Autoscaler.py-module

Ambari-klasse

Dit is hoe een stukje code met een klasse eruit ziet Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Hierboven kunt u als voorbeeld kijken naar de implementatie van de functie stop_all_services, waarmee alle services op het gewenste clusterknooppunt worden gestopt.

Bij de ingang van de klas Ambari jij slaagt:

  • ambari_urlbijvoorbeeld, zoals 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – de naam van uw cluster in Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • en binnen auth hier zijn uw gebruikersnaam en wachtwoord voor Ambari: auth = ('login', 'password').

De functie zelf is niets meer dan een paar aanroepen via de REST API naar Ambari. Vanuit logisch oogpunt ontvangen we eerst een lijst met actieve services op een knooppunt en vragen we vervolgens op een bepaald cluster, op een bepaald knooppunt, om services van de lijst naar de staat over te dragen INSTALLED. Functies voor het starten van alle services, voor het overbrengen van knooppunten naar de staat Maintenance enz. zien er hetzelfde uit - het zijn slechts een paar verzoeken via de API.

Klasse Mc's

Dit is hoe een stukje code met een klasse eruit ziet Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Bij de ingang van de klas Mcs we geven de project-ID in de cloud en de gebruikers-ID door, evenals zijn wachtwoord. In functie vm_turn_on we willen een van de machines aanzetten. De logica hier is iets ingewikkelder. Aan het begin van de code worden drie andere functies aangeroepen: 1) we moeten een token verkrijgen, 2) we moeten de hostnaam converteren naar de naam van de machine in MCS, 3) de id van deze machine ophalen. Vervolgens doen we eenvoudigweg een postverzoek en starten we deze machine.

Zo ziet de functie voor het verkrijgen van een token eruit:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler-klasse

Deze klasse bevat functies die verband houden met de bedieningslogica zelf.

Zo ziet een stukje code voor deze klasse eruit:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Wij accepteren lessen voor deelname. Ambari и Mcs, een lijst met knooppunten die mogen worden geschaald, evenals knooppuntconfiguratieparameters: geheugen en CPU toegewezen aan het knooppunt in YARN. Er zijn ook 2 interne parameters q_ram, q_cpu, dit zijn wachtrijen. Met behulp hiervan slaan we de waarden van de huidige clusterbelasting op. Als we zien dat er de afgelopen vijf minuten een consistent verhoogde belasting is geweest, besluiten we dat we +5 knooppunt aan het cluster moeten toevoegen. Hetzelfde geldt voor de status van onderbenutting van het cluster.

De bovenstaande code is een voorbeeld van een functie die een machine uit het cluster verwijdert en in de cloud stopt. Eerst vindt er een ontmanteling plaats YARN Nodemanager, waarna de modus wordt ingeschakeld Maintenance, dan stoppen we alle services op de machine en schakelen we de virtuele machine in de cloud uit.

2. Script waarnemer.py

Voorbeeldcode vanaf daar:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Daarin controleren we of er voorwaarden zijn geschapen om de capaciteit van het cluster te vergroten en of er machines in reserve zijn, halen we de hostnaam van een daarvan op, voegen deze toe aan het cluster en publiceren daar een bericht over op de Slack van ons team. Daarna begint het cooldown_period, wanneer we niets toevoegen aan of verwijderen uit het cluster, maar eenvoudigweg de belasting monitoren. Als het zich heeft gestabiliseerd en binnen de bandbreedte van optimale belastingswaarden valt, gaan we gewoon door met monitoren. Als één knooppunt niet genoeg was, voegen we er nog een toe.

Voor gevallen waarin we een les voor de boeg hebben, weten we al zeker dat één knooppunt niet genoeg zal zijn, dus starten we onmiddellijk alle vrije knooppunten en houden ze actief tot het einde van de les. Dit gebeurt aan de hand van een lijst met tijdstempels van activiteiten.

Conclusie

Autoscaler is een goede en handige oplossing voor die gevallen waarin u ongelijkmatige clusterbelasting ervaart. Je realiseert tegelijkertijd de gewenste clusterconfiguratie voor piekbelasting en houdt tegelijkertijd dit cluster niet vast tijdens onderbelasting, waardoor je geld bespaart. Nou ja, en dit gebeurt allemaal automatisch, zonder jouw deelname. De autoscaler zelf is niets meer dan een reeks verzoeken aan de clustermanager-API en de cloudprovider-API, geschreven volgens een bepaalde logica. Wat je zeker moet onthouden is de verdeling van knooppunten in 3 typen, zoals we eerder schreven. En je zult gelukkig zijn.

Bron: www.habr.com

Voeg een reactie