Kā izveidot klasterim savu automātisko skalu

Sveiki! Mēs apmācām cilvēkus strādāt ar lielajiem datiem. Nav iespējams iedomāties izglÄ«tojoÅ”u programmu par lielajiem datiem bez sava klastera, kurā visi dalÄ«bnieki strādā kopā. Å Ä« iemesla dēļ mÅ«su programmai tas vienmēr ir šŸ™‚ Mēs nodarbojamies ar tās konfigurÄ“Å”anu, regulÄ“Å”anu un administrÄ“Å”anu, un puiÅ”i tur tieÅ”i palaiž MapReduce darbus un izmanto Spark.

Å ajā ierakstā mēs jums pastāstÄ«sim, kā mēs atrisinājām nevienmērÄ«gas klasteru ielādes problēmu, rakstot paÅ”i savu autoscaler, izmantojot mākoni. Mail.ru mākoņa risinājumi.

problēma

MÅ«su klasteris netiek izmantots parastajā režīmā. ApglabāŔana ir ļoti nevienmērÄ«ga. Piemēram, ir praktiskās nodarbÄ«bas, kad visi 30 cilvēki un skolotājs aiziet uz klasteri un sāk to lietot. Vai arÄ« atkal ir dienas pirms termiņa, kad slodze ievērojami palielinās. Pārējā laikā klasteris darbojas nepietiekamas slodzes režīmā.

Risinājums Nr. 1 ir saglabāt kopu, kas izturēs maksimālās slodzes, bet pārējā laikā būs dīkstāvē.

2. risinājums ir saglabāt nelielu kopu, kurai manuāli pievienojat mezglus pirms nodarbībām un maksimālās slodzes laikā.

Risinājums Nr. 3 ir saglabāt nelielu klasteru un uzrakstÄ«t automātisko mērogoÅ”anu, kas uzraudzÄ«s klastera paÅ”reizējo slodzi un, izmantojot dažādus API, pievienos un noņems mezglus no klastera.

Å ajā rakstā mēs runāsim par risinājumu Nr. 3. Å is automātiskais mērogoÅ”anas lÄ«dzeklis ir ļoti atkarÄ«gs no ārējiem faktoriem, nevis no iekŔējiem faktoriem, un pakalpojumu sniedzēji to bieži nenodroÅ”ina. Mēs izmantojam Mail.ru Cloud Solutions mākoņa infrastruktÅ«ru un uzrakstÄ«jām automātisko mērogoÅ”anu, izmantojot MCS API. Un tā kā mēs mācām strādāt ar datiem, mēs nolēmām parādÄ«t, kā varat uzrakstÄ«t lÄ«dzÄ«gu automātisko skalu saviem mērÄ·iem un izmantot to savā mākonÄ«.

PriekŔzināŔanas

Pirmkārt, jums ir jābÅ«t Hadoop klasterim. Piemēram, mēs izmantojam HDP izplatÄ«Å”anu.

Lai jūsu mezgli tiktu ātri pievienoti un noņemti, jums ir jābūt noteiktam lomu sadalījumam starp mezgliem.

  1. Galvenais mezgls. Nu, nekas Ä«paÅ”i nav jāpaskaidro: galvenais klastera mezgls, kurā, piemēram, tiek palaists Spark draiveris, ja izmantojat interaktÄ«vo režīmu.
  2. Datuma mezgls. Šis ir mezgls, kurā glabājat datus HDFS un kur tiek veikti aprēķini.
  3. SkaitļoÅ”anas mezgls. Å is ir mezgls, kurā jÅ«s neko neglabājat HDFS, bet kur notiek aprēķini.

SvarÄ«gs punkts. Automātiskā mērogoÅ”ana notiks treŔā tipa mezglu dēļ. Ja sākat ņemt un pievienot otrā tipa mezglus, reakcijas ātrums bÅ«s ļoti zems ā€” ekspluatācijas pārtraukÅ”ana un atkārtota ievieÅ”ana jÅ«su klasterÄ« prasÄ«s stundas. Tas, protams, nav tas, ko jÅ«s sagaidāt no automātiskās mērogoÅ”anas. Tas ir, mēs nepieskaramies pirmā un otrā veida mezgliem. Tie veidos minimālu dzÄ«votspējÄ«gu kopu, kas pastāvēs visā programmas darbÄ«bas laikā.

Tātad, mÅ«su autoscaler ir rakstÄ«ts Python 3, izmanto Ambari API, lai pārvaldÄ«tu klasteru pakalpojumus, API no Mail.ru Cloud Solutions (MCS) maŔīnu iedarbināŔanai un apturÄ“Å”anai.

Risinājuma arhitektūra

  1. Modulis autoscaler.py. Tajā ir trÄ«s klases: 1) funkcijas darbam ar Ambari, 2) funkcijas darbam ar MCS, 3) funkcijas, kas ir tieÅ”i saistÄ«tas ar automātiskā mērogoÅ”anas loÄ£iku.
  2. Skripts observer.py. BÅ«tÄ«bā tas sastāv no dažādiem noteikumiem: kad un kādos brīžos izsaukt automātiskās mērogoÅ”anas funkcijas.
  3. Konfigurācijas fails config.py. Tas satur, piemēram, to mezglu sarakstu, kuriem atļauts veikt automātisko mērogoÅ”anu, un citus parametrus, kas ietekmē, piemēram, cik ilgi jāgaida no brīža, kad tika pievienots jauns mezgls. Ir arÄ« nodarbÄ«bu sākuma laikspiedoli, lai pirms nodarbÄ«bas tiktu palaista maksimāli pieļaujamā klastera konfigurācija.

Tagad apskatīsim koda fragmentus pirmajos divos failos.

1. Autoscaler.py modulis

Ambari klase

Šādi izskatās koda fragments, kas satur klasi Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

IepriekÅ” kā piemēru varat apskatÄ«t funkcijas ievieÅ”anu stop_all_services, kas aptur visus pakalpojumus vēlamajā klastera mezglā.

Pie ieejas klasē Ambari tu izturi:

  • ambari_url, piemēram, patÄ«k 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - jÅ«su klastera nosaukums Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • un iekŔā auth Å”eit ir jÅ«su Ambari lietotājvārds un parole: auth = ('login', 'password').

Pati funkcija ir nekas vairāk kā pāris zvani caur REST API uz Ambari. No loÄ£iskā viedokļa mēs vispirms saņemam mezglā darbojoÅ”os pakalpojumu sarakstu un pēc tam prasām noteiktā klasterÄ«, konkrētajā mezglā pārsÅ«tÄ«t pakalpojumus no saraksta uz stāvokli. INSTALLED. Funkcijas visu pakalpojumu palaiÅ”anai, mezglu pārsÅ«tÄ«Å”anai uz stāvokli Maintenance utt. izskatās lÄ«dzÄ«gi ā€“ tie ir tikai daži pieprasÄ«jumi caur API.

klase Mcs

Šādi izskatās koda fragments, kas satur klasi Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Pie ieejas klasē Mcs mēs nododam projekta ID mākonÄ« un lietotāja ID, kā arÄ« viņa paroli. Funkcijā vm_turn_on mēs vēlamies ieslēgt vienu no maŔīnām. Å eit loÄ£ika ir nedaudz sarežģītāka. Koda sākumā tiek izsauktas trÄ«s citas funkcijas: 1) jāiegÅ«st marÄ·ieris, 2) jākonvertē resursdatora nosaukums par maŔīnas nosaukumu MCS, 3) jāiegÅ«st Ŕīs maŔīnas id. Pēc tam mēs vienkārÅ”i veicam pasta pieprasÄ«jumu un palaižam Å”o iekārtu.

Šādi izskatās marķiera iegūŔanas funkcija:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler klase

Å ajā klasē ir funkcijas, kas saistÄ«tas ar paÅ”u darbÄ«bas loÄ£iku.

Lūk, kā izskatās Ŕīs klases koda daļa:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Pieņemam nodarbÄ«bas ieejai. Ambari Šø Mcs, to mezglu saraksts, kuriem ir atļauts mērogot, kā arÄ« mezgla konfigurācijas parametri: atmiņa un CPU, kas pieŔķirts mezglam YARN. Ir arÄ« 2 iekŔējie parametri q_ram, q_cpu, kas ir rindas. Izmantojot tos, mēs saglabājam paÅ”reizējās klastera slodzes vērtÄ«bas. Ja mēs redzam, ka pēdējo 5 minÅ«Å”u laikā ir pastāvÄ«gi palielināta slodze, mēs nolemjam, ka mums ir jāpievieno klasterim +1 mezgls. Tas pats attiecas uz klastera nepietiekamas izmantoÅ”anas stāvokli.

IepriekÅ” minētais kods ir tādas funkcijas piemērs, kas noņem maŔīnu no klastera un aptur to mākonÄ«. Vispirms notiek ekspluatācijas pārtraukÅ”ana YARN Nodemanager, tad režīms tiek ieslēgts Maintenance, tad mēs apturam visus pakalpojumus maŔīnā un izslēdzam virtuālo maŔīnu mākonÄ«.

2. Skripta novērotājs.py

Koda paraugs no turienes:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Tajā pārbaudām, vai ir radÄ«ti apstākļi klastera kapacitātes palielināŔanai un vai nav rezervē kādas maŔīnas, iegÅ«stam kāda no tām resursdatora nosaukumu, pievienojam to klasterim un par to publicējam ziņu mÅ«su komandas Slack. Pēc kura tas sākas cooldown_period, kad mēs neko nepievienojam un neizņemam no klastera, bet vienkārÅ”i uzraugām slodzi. Ja tas ir nostabilizējies un atrodas optimālo slodzes vērtÄ«bu koridorā, tad vienkārÅ”i turpinām uzraudzÄ«bu. Ja ar vienu mezglu nepietika, tad pievienojam vēl vienu.

GadÄ«jumiem, kad mums priekŔā nodarbÄ«ba, mēs jau droÅ”i zinām, ka ar vienu mezglu nepietiks, tāpēc uzreiz iedarbinām visus brÄ«vos mezglus un turam tos aktÄ«vus lÄ«dz nodarbÄ«bas beigām. Tas notiek, izmantojot aktivitāŔu laikspiedolu sarakstu.

Secinājums

Autoscaler ir labs un ērts risinājums tiem gadÄ«jumiem, kad rodas nevienmērÄ«ga klasteru ielāde. JÅ«s vienlaikus sasniedzat vēlamo klastera konfigurāciju maksimālajām slodzēm un tajā paŔā laikā nesaglabājat Å”o klasteru nepietiekamas slodzes laikā, ietaupot naudu. Turklāt tas viss notiek automātiski bez jÅ«su lÄ«dzdalÄ«bas. Pats automātiskais mērogoÅ”anas lÄ«dzeklis ir nekas vairāk kā pieprasÄ«jumu kopa klasteru pārvaldnieka API un mākoņa nodroÅ”inātāja API, kas rakstÄ«ti saskaņā ar noteiktu loÄ£iku. Tas, kas jums noteikti ir jāatceras, ir mezglu sadalÄ«jums 3 veidos, kā mēs rakstÄ«jām iepriekÅ”. Un tu bÅ«si laimÄ«gs.

Avots: www.habr.com

Pievieno komentāru