Kuidas teha klastri jaoks oma automaatskaalaerit

Tere! Koolitame inimesi suurandmetega töötama. Suurandmete alast haridusprogrammi on võimatu ette kujutada ilma oma klastrita, mille kallal kõik osalejad koos töötavad. Sel põhjusel on see meie programmis alati olemas 🙂 Tegeleme selle seadistamise, häälestamise ja administreerimisega ning kutid käivitavad seal otse MapReduce'i töid ja kasutavad Sparki.

Selles postituses räägime teile, kuidas lahendasime klastrite ebaühtlase laadimise probleemi, kirjutades pilve abil oma automaatskaalari Mail.ru pilvelahendused.

probleem

Meie klastrit ei kasutata tüüpilises režiimis. Kõrvaldamine on väga ebaühtlane. Näiteks on praktilised tunnid, kui kõik 30 inimest koos õpetajaga lähevad klastrisse ja hakkavad seda kasutama. Või jällegi on päevi enne tähtaega, mil koormus kasvab kõvasti. Ülejäänud aja töötab klaster alakoormusrežiimis.

Lahendus nr 1 on hoida klaster, mis peab vastu tippkoormusele, kuid on ülejäänud aja jõude.

Lahendus nr 2 on väikese klastri säilitamine, kuhu lisate käsitsi sõlmed enne tunde ja tippkoormuse ajal.

Lahendus nr 3 on hoida väikest klastrit ja kirjutada automaatne skaleerija, mis jälgib klastri praegust koormust ning erinevate API-de abil lisab ja eemaldab klastrist sõlme.

Selles postituses räägime lahendusest nr 3. See automaatne skaleerija sõltub pigem välistest kui sisemistest teguritest ja teenusepakkujad seda sageli ei paku. Kasutame Mail.ru Cloud Solutionsi pilveinfrastruktuuri ja kirjutasime MCS API abil automaatskaalari. Ja kuna me õpetame andmetega töötamist, otsustasime näidata, kuidas saate kirjutada sarnase automaatskaalaeri enda tarbeks ja kasutada seda oma pilvega

Eeldused

Esiteks peab teil olema Hadoopi klaster. Näiteks kasutame HDP-jaotust.

Teie sõlmede kiireks lisamiseks ja eemaldamiseks peavad teil olema sõlmede vahel teatud rollide jaotus.

  1. Peasõlm. Noh, pole vaja midagi eriti selgitada: klastri põhisõlm, millel käivitatakse näiteks Sparki draiver, kui kasutate interaktiivset režiimi.
  2. Kuupäeva sõlm. See on sõlm, kuhu salvestate HDFS-i andmeid ja kus tehakse arvutusi.
  3. Arvutussõlm. See on sõlm, kus te ei salvesta midagi HDFS-i, kuid kus toimuvad arvutused.

Oluline punkt. Automaatne skaleerimine toimub kolmandat tüüpi sõlmede tõttu. Kui alustate teist tüüpi sõlmede võtmist ja lisamist, on reageerimiskiirus väga madal – dekomisjoneerimine ja uuesti kasutuselevõtmine võtab teie klastris tunde. See pole muidugi see, mida te automaatselt skaleerimiselt ootate. See tähendab, et me ei puuduta esimest ja teist tüüpi sõlmi. Need kujutavad endast minimaalset elujõulist klastrit, mis eksisteerib kogu programmi kestuse jooksul.

Niisiis, meie automaatne skaleerija on kirjutatud Python 3-s, kasutab klastriteenuste haldamiseks Ambari API-d, API teenusest Mail.ru Cloud Solutions (MCS) masinate käivitamiseks ja seiskamiseks.

Lahenduse arhitektuur

  1. Moodul autoscaler.py. See sisaldab kolme klassi: 1) funktsioonid Ambariga töötamiseks, 2) funktsioonid MCS-iga töötamiseks, 3) funktsioonid, mis on otseselt seotud automaatse skaleerija loogikaga.
  2. Skript observer.py. Põhimõtteliselt koosneb see erinevatest reeglitest: millal ja millistel hetkedel kutsuda automaatskaleri funktsioone.
  3. Konfiguratsioonifail config.py. See sisaldab näiteks loendit sõlmedest, mis on lubatud automaatsel skaleerimisel, ja muid parameetreid, mis mõjutavad näiteks seda, kui kaua oodata uue sõlme lisamise hetkest. Tundide alguseks on ka ajatemplid, et enne tundi käivitub maksimaalne lubatud klastri konfiguratsioon.

Vaatame nüüd kahe esimese faili sees olevaid kooditükke.

1. Moodul Autoscaler.py

Ambari klass

Selline näeb välja klassi sisaldav kooditükk Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Ülaltoodud näitena saate vaadata funktsiooni rakendamist stop_all_services, mis peatab kõik soovitud klastri sõlme teenused.

Klassi sissepääsu juures Ambari sa läbid:

  • ambari_urlnäiteks meeldib 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – teie klastri nimi Ambaris,
  • headers = {'X-Requested-By': 'ambari'}
  • ja sees auth siin on teie Ambari sisselogimine ja parool: auth = ('login', 'password').

Funktsioon ise ei ole midagi muud kui paar kõnet REST API kaudu Ambarile. Loogilisest vaatenurgast saame esmalt sõlmes töötavate teenuste loendi ja seejärel palume antud klastris antud sõlmel teenused loendist olekusse üle kanda. INSTALLED. Funktsioonid kõigi teenuste käivitamiseks, sõlmede olekusse ülekandmiseks Maintenance jms näevad välja sarnased – need on vaid mõned API kaudu tehtud taotlused.

klassi Mcs

Selline näeb välja klassi sisaldav kooditükk Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Klassi sissepääsu juures Mcs edastame projekti ID pilve sees ja kasutaja ID, samuti tema parooli. Funktsioonis vm_turn_on tahame ühe masina sisse lülitada. Loogika on siin veidi keerulisem. Koodi alguses kutsutakse välja veel kolm funktsiooni: 1) peame hankima märgi, 2) peame teisendama hostinime MCS-is oleva masina nimeks, 3) saama selle masina id. Järgmiseks teeme lihtsalt postitaotluse ja käivitame selle masina.

Tokeni saamise funktsioon näeb välja järgmine:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaleri klass

See klass sisaldab funktsioone, mis on seotud tööloogika endaga.

Selle klassi koodiosa näeb välja selline:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Võtame vastu klasse sisenemiseks. Ambari и Mcs, skaleerimiseks lubatud sõlmede loend, samuti sõlmede konfiguratsiooniparameetrid: YARN-i sõlmele eraldatud mälu ja protsessor. Samuti on 2 sisemist parameetrit q_ram, q_cpu, mis on järjekorrad. Neid kasutades salvestame praeguse klastri koormuse väärtused. Kui näeme, et viimase 5 minuti jooksul on koormus pidevalt suurenenud, siis otsustame, et peame klastrisse lisama +1 sõlme. Sama kehtib ka klastri alakasutamise oleku kohta.

Ülaltoodud kood on näide funktsioonist, mis eemaldab masina klastrist ja peatab selle pilves. Esiteks toimub dekomisjoneerimine YARN Nodemanager, siis lülitub režiim sisse Maintenance, siis peatame kõik teenused masinas ja lülitame virtuaalmasina pilves välja.

2. Skripti vaatleja.py

Näidiskood sealt:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Selles kontrollime, kas klastri läbilaskevõime suurendamiseks on loodud tingimused ja kas masinaid on reservis, hangime neist ühe hostinime, lisame selle klastrisse ja avaldame selle kohta teate meie meeskonna Slackis. Pärast mida see algab cooldown_period, kui me ei lisa ega eemalda klastrist midagi, vaid lihtsalt jälgime koormust. Kui see on stabiliseerunud ja jääb optimaalsete koormusväärtuste koridori, siis jätkame lihtsalt jälgimist. Kui ühest sõlmest ei piisanud, lisame teise.

Juhtudeks, kui meil on ees õppetund, teame juba kindlalt, et ühest sõlmest ei piisa, seega käivitame kohe kõik vabad sõlmed ja hoiame neid aktiivsena kuni tunni lõpuni. See juhtub tegevuste ajatemplite loendi abil.

Järeldus

Autoscaler on hea ja mugav lahendus nendeks puhkudeks, kui kogete ebaühtlast klastri laadimist. Samal ajal saavutate soovitud klastri konfiguratsiooni tippkoormuse jaoks ja samal ajal ei hoia seda klastrit alakoormuse ajal, säästes raha. Noh, see kõik juhtub automaatselt ilma teie osaluseta. Automaatne skaleerija ise pole midagi muud kui teatud loogika järgi kirjutatud taotluste kogum klastrihalduri API-le ja pilvepakkuja API-le. Mida peate kindlasti meeles pidama, on sõlmede jagamine 3 tüüpi, nagu me varem kirjutasime. Ja sa saad õnnelikuks.

Allikas: www.habr.com

Lisa kommentaar