Hoe om jou eie autoscaler vir 'n cluster te maak

Hallo! Ons lei mense op om met groot data te werk. Dit is onmoontlik om 'n opvoedkundige program oor groot data voor te stel sonder 'n eie groepering, waaraan alle deelnemers saamwerk. Om hierdie rede het ons program dit altyd 🙂 Ons is besig met die konfigurasie, tuning en administrasie daarvan, en die ouens begin MapReduce-werksgeleenthede direk daar en gebruik Spark.

In hierdie pos sal ons jou vertel hoe ons die probleem van ongelyke groeplading opgelos het deur ons eie outoscaler met behulp van die wolk te skryf Mail.ru Wolkoplossings.

probleem

Ons cluster word nie in 'n tipiese modus gebruik nie. Beskikking is hoogs ongelyk. Daar is byvoorbeeld praktiese klasse, wanneer al 30 mense en 'n onderwyser na die groep gaan en dit begin gebruik. Of weer, daar is dae voor die sperdatum wanneer die vrag baie toeneem. Die res van die tyd werk die groep in onderladingsmodus.

Oplossing #1 is om 'n tros te hou wat piekladings sal weerstaan, maar die res van die tyd ledig sal wees.

Oplossing #2 is om 'n klein groepie te hou, waarby jy nodusse handmatig byvoeg voor klasse en tydens piekladings.

Oplossing #3 is om 'n klein groepie te hou en 'n outoskaaler te skryf wat die huidige las van die groep sal monitor en, met behulp van verskeie API's, nodusse by die groep by te voeg en te verwyder.

In hierdie pos sal ons praat oor oplossing #3. Hierdie outoskaaler is baie afhanklik van eksterne faktore eerder as interne faktore, en verskaffers verskaf dit dikwels nie. Ons gebruik die Mail.ru Cloud Solutions-wolkinfrastruktuur en het 'n outoskaaler geskryf met die MCS API. En aangesien ons leer hoe om met data te werk, het ons besluit om te wys hoe jy 'n soortgelyke outoscaler vir jou eie doeleindes kan skryf en dit met jou wolk kan gebruik

Voorvereistes

Eerstens moet jy 'n Hadoop-kluster hê. Ons gebruik byvoorbeeld die HDP-verspreiding.

Om jou nodusse vinnig by te voeg en te verwyder, moet jy 'n sekere verdeling van rolle tussen die nodusse hê.

  1. Meester nodus. Wel, dit is nie nodig om iets besonders te verduidelik nie: die hoofknoop van die groep, waarop, byvoorbeeld, die Spark-bestuurder geloods word, as jy die interaktiewe modus gebruik.
  2. Datum nodus. Dit is die nodus waarop jy data op HDFS stoor en waar berekeninge plaasvind.
  3. Rekenaar nodus. Dit is 'n nodus waar jy niks op HDFS stoor nie, maar waar berekeninge plaasvind.

Belangrike punt. Outoskaal sal plaasvind as gevolg van nodusse van die derde tipe. As jy begin om nodusse van die tweede tipe te neem en by te voeg, sal die reaksiespoed baie laag wees - ontmanteling en hertoewyding sal ure op jou groep neem. Dit is natuurlik nie wat jy van outoskaal verwag nie. Dit wil sê, ons raak nie aan nodusse van die eerste en tweede tipe nie. Hulle sal 'n minimum lewensvatbare groepering verteenwoordig wat gedurende die duur van die program sal bestaan.

Dus, ons outoscaler is in Python 3 geskryf, gebruik die Ambari API om groepdienste te bestuur, gebruik API van Mail.ru Cloud Solutions (MCS) vir die aansit en stop van masjiene.

Oplossingsargitektuur

  1. Module autoscaler.py. Dit bevat drie klasse: 1) funksies om met Ambari te werk, 2) funksies om met MCS te werk, 3) funksies wat direk verband hou met die logika van die outoskaaler.
  2. Skripsie observer.py. In wese bestaan ​​dit uit verskillende reëls: wanneer en op watter oomblikke om die outoskaaler-funksies te noem.
  3. Konfigurasie lêer config.py. Dit bevat byvoorbeeld 'n lys nodusse wat toegelaat word vir outoskaling en ander parameters wat byvoorbeeld beïnvloed hoe lank om te wag vanaf die oomblik dat 'n nuwe nodus bygevoeg is. Daar is ook tydstempels vir die begin van klasse, sodat die maksimum toegelate groepkonfigurasie voor die klas geloods word.

Kom ons kyk nou na die stukke kode binne die eerste twee lêers.

1. Autoscaler.py-module

Ambari-klas

Dit is hoe 'n stukkie kode lyk wat 'n klas bevat Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Hierbo kan jy as voorbeeld kyk na die implementering van die funksie stop_all_services, wat alle dienste op die verlangde groepknoop stop.

By die ingang van die klas Ambari jy slaag:

  • ambari_url, byvoorbeeld, soos 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - die naam van jou groep in Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • en binne auth hier is jou login en wagwoord vir Ambari: auth = ('login', 'password').

Die funksie self is niks meer as 'n paar oproepe via die REST API na Ambari nie. Uit 'n logiese oogpunt ontvang ons eers 'n lys van lopende dienste op 'n nodus, en vra dan op 'n gegewe kluster, op 'n gegewe nodus, om dienste van die lys na die staat oor te dra. INSTALLED. Funksies vir die bekendstelling van alle dienste, vir die oordrag van nodusse na staat Maintenance ens. lyk soortgelyk - dit is net 'n paar versoeke deur die API.

Klas Mcs

Dit is hoe 'n stukkie kode lyk wat 'n klas bevat Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

By die ingang van die klas Mcs ons gee die projek-ID binne die wolk en die gebruikers-ID, sowel as sy wagwoord, deur. In funksie vm_turn_on ons wil een van die masjiene aanskakel. Die logika hier is 'n bietjie meer ingewikkeld. Aan die begin van die kode word drie ander funksies genoem: 1) ons moet 'n teken kry, 2) ons moet die gasheernaam omskakel na die naam van die masjien in MCS, 3) die ID van hierdie masjien kry. Vervolgens maak ons ​​eenvoudig 'n posversoek en begin hierdie masjien.

Dit is hoe die funksie vir die verkryging van 'n token lyk:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler klas

Hierdie klas bevat funksies wat verband hou met die bedryfslogika self.

Dit is hoe 'n stukkie kode vir hierdie klas lyk:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Ons aanvaar klasse vir toegang. Ambari и Mcs, 'n lys van nodusse wat toegelaat word vir skaal, sowel as nodus konfigurasie parameters: geheue en cpu toegeken aan die nodus in YARN. Daar is ook 2 interne parameters q_ram, q_cpu, wat toue is. Deur dit te gebruik, stoor ons die waardes van die huidige groeplading. As ons sien dat daar oor die afgelope 5 minute 'n konsekwent verhoogde las was, dan besluit ons dat ons +1 nodus by die groep moet voeg. Dieselfde geld vir die groeponderbenuttingstaat.

Die kode hierbo is 'n voorbeeld van 'n funksie wat 'n masjien uit die groep verwyder en dit in die wolk stop. Eerstens is daar 'n ontmanteling YARN Nodemanager, dan skakel die modus aan Maintenance, dan stop ons alle dienste op die masjien en skakel die virtuele masjien in die wolk af.

2. Script observer.py

Voorbeeldkode van daar af:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Daarin kyk ons ​​of toestande geskep is vir die verhoging van die kapasiteit van die groep en of daar enige masjiene in reserwe is, kry die gasheernaam van een van hulle, voeg dit by die groep en publiseer 'n boodskap daaroor op ons span se Slack. Daarna begin dit cooldown_period, wanneer ons niks byvoeg of uit die groep verwyder nie, maar bloot die las monitor. As dit gestabiliseer het en binne die gang van optimale laswaardes is, gaan ons eenvoudig voort met monitering. As een nodus nie genoeg was nie, voeg ons nog een by.

Vir gevalle wanneer ons 'n les voor het, weet ons reeds vir seker dat een nodus nie genoeg sal wees nie, so ons begin dadelik al die gratis nodusse en hou hulle aktief tot aan die einde van die les. Dit gebeur met 'n lys van aktiwiteit tydstempels.

Gevolgtrekking

Autoscaler is 'n goeie en gerieflike oplossing vir die gevalle wanneer jy ongelyke groeplading ervaar. Jy bereik terselfdertyd die verlangde troskonfigurasie vir piekladings en hou terselfdertyd nie hierdie tros tydens onderlading nie, wat geld bespaar. Wel, plus dit gebeur alles outomaties sonder jou deelname. Die outoskaaler self is niks meer as 'n stel versoeke aan die cluster bestuurder API en die wolk verskaffer API, geskryf volgens 'n sekere logika. Wat jy beslis moet onthou, is die verdeling van nodusse in 3 tipes, soos ons vroeër geskryf het. En jy sal gelukkig wees.

Bron: will.com

Voeg 'n opmerking