Kaip sukurti savo automatinį klasterį

Sveiki! Apmokome žmones dirbti su dideliais duomenimis. Neįmanoma įsivaizduoti švietimo programos apie didžiuosius duomenis be savo klasterio, kuriame visi dalyviai dirba kartu. Dėl šios priežasties mūsų programa visada ją turi 🙂 Mes užsiimame jos konfigūravimu, derinimu ir administravimu, o vaikinai ten tiesiogiai paleidžia MapReduce darbus ir naudoja Spark.

Šiame įraše mes jums pasakysime, kaip išsprendėme netolygaus klasterio įkėlimo problemą parašydami savo automatinį skalę naudodami debesį Mail.ru debesų sprendimai.

problema

Mūsų klasteris nenaudojamas įprastu režimu. Išmetimas labai netolygus. Pavyzdžiui, yra praktinių užsiėmimų, kai visi 30 žmonių ir mokytojas nueina į klasterį ir pradeda juo naudotis. Arba vėl yra dienų iki termino, kai krūvis labai padidėja. Likusį laiką klasteris veikia per mažos apkrovos režimu.

1 sprendimas yra išlaikyti klasterį, kuris atlaikytų didžiausias apkrovas, bet likusį laiką bus neaktyvus.

2 sprendimas yra išlaikyti nedidelį klasterį, į kurį rankiniu būdu pridedate mazgus prieš pamokas ir didžiausios apkrovos metu.

3 sprendimas yra išlaikyti nedidelį klasterį ir parašyti automatinį skalę, kuri stebės esamą klasterio apkrovą ir, naudodama įvairias API, pridės ir pašalins mazgus iš klasterio.

Šiame įraše kalbėsime apie 3 sprendimą. Šis automatinis skaleris labai priklauso nuo išorinių veiksnių, o ne nuo vidinių, o tiekėjai dažnai jo neteikia. Naudojame Mail.ru Cloud Solutions debesies infrastruktūrą ir parašėme automatinį skalę naudodami MCS API. Kadangi mes mokome dirbti su duomenimis, nusprendėme parodyti, kaip galite parašyti panašų automatinį skalavimo įrenginį savo tikslams ir naudoti jį savo debesyje.

Būtinos sąlygos

Pirma, turite turėti Hadoop klasterį. Pavyzdžiui, mes naudojame HDP paskirstymą.

Kad jūsų mazgai būtų greitai pridėti ir pašalinti, turite turėti tam tikrą vaidmenų paskirstymą tarp mazgų.

  1. Pagrindinis mazgas. Na, nereikia nieko ypač aiškinti: pagrindinis klasterio mazgas, kuriame, pavyzdžiui, paleidžiama „Spark“ tvarkyklė, jei naudojate interaktyvųjį režimą.
  2. Datos mazgas. Tai mazgas, kuriame saugote duomenis HDFS ir kuriame atliekami skaičiavimai.
  3. Skaičiavimo mazgas. Tai mazgas, kuriame nieko nesaugote HDFS, bet kur vyksta skaičiavimai.

Svarbus punktas. Automatinis mastelis įvyks dėl trečiojo tipo mazgų. Jei pradėsite imti ir pridėti antrojo tipo mazgus, atsako greitis bus labai mažas – eksploatacijos nutraukimas ir pakartotinis įjungimas jūsų klasteryje užtruks valandas. Žinoma, tai nėra tai, ko tikitės iš automatinio mastelio keitimo. Tai yra, mes neliečiame pirmojo ir antrojo tipų mazgų. Jie bus minimali gyvybinga grupė, kuri egzistuos visą programos laikotarpį.

Taigi, mūsų automatinis skaleris parašytas Python 3, naudoja Ambari API klasterio paslaugoms valdyti, naudoja API iš Mail.ru Cloud Solutions (MCS) mašinoms paleisti ir sustabdyti.

Sprendimo architektūra

  1. Modulis autoscaler.py. Jame yra trys klasės: 1) funkcijos, skirtos darbui su „Ambari“, 2) funkcijos, skirtos darbui su MCS, 3) funkcijos, tiesiogiai susijusios su automatinio skalerio logika.
  2. Scenarijus observer.py. Iš esmės tai susideda iš skirtingų taisyklių: kada ir kokiais momentais iškviesti automatinio mastelio funkcijas.
  3. Konfigūracijos failas config.py. Jame, pavyzdžiui, yra mazgų, leidžiamų automatiniam mastelio keitimui, sąrašas ir kiti parametrai, kurie turi įtakos, pavyzdžiui, kiek laiko reikia laukti nuo naujo mazgo pridėjimo momento. Taip pat yra pamokų pradžios laiko žymos, kad prieš pamoką būtų paleista maksimali leistina klasterio konfigūracija.

Dabar pažvelkime į kodo dalis pirmuosiuose dviejuose failuose.

1. Autoscaler.py modulis

Ambari klasė

Taip atrodo kodo dalis, kurioje yra klasė Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Aukščiau, kaip pavyzdį, galite pažvelgti į funkcijos įgyvendinimą stop_all_services, kuris sustabdo visas paslaugas norimame klasterio mazge.

Prie įėjimo į klasę Ambari tu praeini:

  • ambari_url, pavyzdžiui, patinka 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – jūsų grupės pavadinimas Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • ir viduje auth čia yra jūsų Ambari prisijungimo vardas ir slaptažodis: auth = ('login', 'password').

Pati funkcija yra ne kas kita, kaip keli skambučiai per REST API į Ambari. Loginiu požiūriu pirmiausia gauname mazge veikiančių paslaugų sąrašą, o tada prašome tam tikrame klasteryje, nurodytame mazge, perkelti paslaugas iš sąrašo į būseną. INSTALLED. Visų paslaugų paleidimo, mazgų perkėlimo į būseną funkcijos Maintenance ir tt atrodo panašiai – tai tik kelios užklausos per API.

Mc klasės

Taip atrodo kodo dalis, kurioje yra klasė Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Prie įėjimo į klasę Mcs mes perduodame projekto ID debesyje ir vartotojo ID, taip pat jo slaptažodį. Funkcijoje vm_turn_on norime įjungti vieną iš mašinų. Logika čia yra šiek tiek sudėtingesnė. Kodo pradžioje vadinamos dar trys funkcijos: 1) turime gauti prieigos raktą, 2) turime konvertuoti pagrindinio kompiuterio pavadinimą į mašinos pavadinimą MCS, 3) gauti šio įrenginio ID. Tada mes tiesiog pateikiame užklausą dėl paskelbimo ir paleidžiame šį įrenginį.

Štai kaip atrodo žetono gavimo funkcija:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler klasė

Šioje klasėje yra funkcijų, susijusių su pačia veikimo logika.

Štai kaip atrodo šios klasės kodo dalis:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Priimame užsiėmimus. Ambari и Mcs, mazgų, kuriems leidžiama keisti mastelį, sąrašas, taip pat mazgo konfigūracijos parametrai: atmintis ir procesorius, priskirtas mazgui YARN. Taip pat yra 2 vidiniai parametrai q_ram, q_cpu, kurie yra eilės. Naudodami juos išsaugome esamos klasterio apkrovos reikšmes. Jei matome, kad per paskutines 5 minutes nuolat didėjo apkrova, nusprendžiame, kad prie klasterio reikia pridėti +1 mazgą. Tas pats pasakytina ir apie klasterio nepakankamo panaudojimo būseną.

Aukščiau pateiktas kodas yra funkcijos, kuri pašalina mašiną iš klasterio ir sustabdo ją debesyje, pavyzdys. Pirmiausia vyksta eksploatavimo nutraukimas YARN Nodemanager, tada režimas įsijungia Maintenance, tada sustabdome visas mašinoje esančias paslaugas ir išjungiame virtualią mašiną debesyje.

2. Scenarijaus stebėtojas.py

Kodo pavyzdys iš ten:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Jame patikriname, ar yra sudarytos sąlygos didinti klasterio pajėgumą ir ar nėra rezervuotų mašinų, gauname vieno iš jų host pavadinimą, pridedame jį prie klasterio ir apie tai paskelbiame pranešimą mūsų komandos Slack. Po kurio prasideda cooldown_period, kai mes nieko nepridedame ir nepašaliname iš klasterio, o tiesiog stebime apkrovą. Jei jis stabilizavosi ir yra optimalių apkrovos verčių koridoriuje, mes tiesiog tęsiame stebėjimą. Jei vieno mazgo nepakako, tada pridedame kitą.

Tais atvejais, kai mūsų laukia pamoka, jau tikrai žinome, kad vieno mazgo neužteks, todėl iškart paleidžiame visus laisvus mazgus ir palaikome juos aktyvius iki pamokos pabaigos. Tai nutinka naudojant veiklos laiko žymų sąrašą.

išvada

Autoscaler yra geras ir patogus sprendimas tais atvejais, kai susiduriate su netolygiu klasterių apkrovimu. Jūs vienu metu pasiekiate norimą klasterio konfigūraciją didžiausioms apkrovoms ir tuo pačiu metu neišlaikote šio klasterio per mažą apkrovą, taip sutaupydami pinigų. Na, be to, visa tai vyksta automatiškai be jūsų dalyvavimo. Pats automatinis skaleris yra ne kas kita, kaip užklausų rinkinys klasterio valdytojo API ir debesies tiekėjo API, parašytas pagal tam tikrą logiką. Ką tikrai reikia atsiminti, tai mazgų padalijimas į 3 tipus, kaip rašėme anksčiau. Ir tu būsi laimingas.

Šaltinis: www.habr.com

Добавить комментарий