Nola egin zure autoeskalatzailea kluster baterako

Kaixo! Jendea big datarekin lan egiteko entrenatzen dugu. Ezinezkoa da big datari buruzko hezkuntza programa bat imajinatzea bere klusterrik gabe, zeinean parte-hartzaile guztiek elkarrekin lan egiten duten. Hori dela eta, gure programak beti dauka πŸ™‚ Bere konfigurazioan, doikuntzan eta administrazioan aritzen gara, eta mutilek zuzenean MapReduce lanak abiarazten dituzte bertan eta Spark erabiltzen dute.

Argitalpen honetan kontatuko dizugu nola konpondu genuen kluster karga irregularren arazoa, hodeia erabiliz gure eskalatzaile automatikoa idatziz. Mail.ru Cloud Solutions.

arazoa

Gure klusterra ez da ohiko moduan erabiltzen. Ezabaketa oso irregularra da. Esaterako, klase praktikoak daude, 30 pertsona guztiak eta irakasle bat klusterera joan eta erabiltzen hasten direnean. Edo, berriro ere, karga asko handitzen den egunak daude epea baino lehen. Gainontzeko denboran klusterrak azpikarga moduan funtzionatzen du.

1. irtenbidea da puntako kargak jasango dituen kluster bat mantentzea, baina gainerako denboran inaktibo egongo dena.

2. irtenbidea kluster txiki bat mantentzea da, zeinari eskuz gehitzen dizkiozun nodoak klaseak baino lehen eta karga gorenetan.

3. irtenbidea da kluster txiki bat mantentzea eta klusterraren uneko karga kontrolatuko duen autoscaler bat idaztea eta, hainbat API erabiliz, klusterretik nodoak gehitu eta kenduko dituena.

Post honetan #3 irtenbideari buruz hitz egingo dugu. Eskalatzaile automatiko hau kanpoko faktoreen menpekoa da, barnekoen ordez, eta hornitzaileek askotan ez dute ematen. Mail.ru Cloud Solutions hodeiko azpiegitura erabiltzen dugu eta eskalatzaile automatikoa idatzi dugu MCS APIa erabiliz. Eta datuekin nola lan egiten irakasten dugunez, zure helburuetarako antzeko autoeskalatzaile bat nola idatz dezakezun eta hodeiarekin nola erabil dezakezun erakustea erabaki dugu.

Aurrebaldintzak

Lehenik eta behin, Hadoop kluster bat izan behar duzu. Adibidez, HDP banaketa erabiltzen dugu.

Zure nodoak azkar gehitu eta kendu ahal izateko, nodoen artean rolen banaketa jakin bat izan behar duzu.

  1. Nodo nagusia. Bada, ez dago ezer bereziki azaldu beharrik: klusterraren nodo nagusia, zeinean, adibidez, Spark kontrolatzailea abiarazten den, modu interaktiboa erabiltzen baduzu.
  2. Data-nodoa. Hau da HDFS-n datuak gordetzen dituzun nodoa eta non kalkuluak egiten diren.
  3. Konputazio-nodoa. HDFS-n ezer gordetzen ez duzun nodo bat da, baina kalkuluak gertatzen dira.

Puntu garrantzitsua. Autoeskala hirugarren motako nodoengatik gertatuko da. Bigarren motako nodoak hartzen eta gehitzen hasten bazara, erantzun-abiadura oso baxua izango da; desaktibatzeko eta berriro konprometitzeak orduak beharko ditu zure klusterrean. Hau, noski, ez da autoeskalatzetik espero duzuna. Hau da, ez ditugu lehenengo eta bigarren motako nodoak ukitzen. Programak irauten duen bitartean egongo den gutxieneko kluster bideragarria irudikatuko dute.

Beraz, gure autoscaler Python 3-n idatzita dago, Ambari APIa erabiltzen du kluster zerbitzuak kudeatzeko, erabiltzen Mail.ru Cloud Solutions-en APIa (MCS) makinak abiarazteko eta gelditzeko.

Irtenbideen arkitektura

  1. Modulua autoscaler.py. Hiru klase ditu: 1) Ambari-rekin lan egiteko funtzioak, 2) MCS-ekin lan egiteko funtzioak, 3) eskalatzaile automatikoaren logikarekin zuzenean lotutako funtzioak.
  2. Gidoia observer.py. Funtsean, arau ezberdinek osatzen dute: noiz eta zein unetan deitu autoeskalatzaile funtzioei.
  3. Konfigurazio fitxategia config.py. Bertan, adibidez, eskala automatikorako baimendutako nodoen zerrenda eta nodo berri bat gehitzen den unetik zenbat denbora itxaron behar den eragina duten beste parametro batzuk daude, adibidez. Klaseak hasteko denbora-zigiluak ere badaude, klasearen aurretik baimendutako gehienezko kluster konfigurazioa abiarazten da.

Ikus ditzagun orain lehen bi fitxategien barruan dauden kode zatiak.

1. Autoscaler.py modulua

Ambari klasea

Hauxe da klase bat duen kode zati batek Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Goian, adibide gisa, funtzioaren inplementazioa ikus dezakezu stop_all_services, nahi den klusterreko nodoko zerbitzu guztiak geldiarazten dituena.

Klaseko sarreran Ambari pasatzen duzu:

  • ambari_url, adibidez, atsegin 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - Ambari-n zure klusterraren izena,
  • headers = {'X-Requested-By': 'ambari'}
  • eta barruan auth Hona hemen zure saioa eta pasahitza Ambarirako: auth = ('login', 'password').

Funtzioa bera Ambari-ri REST APIaren bidez dei pare bat baino ez da. Ikuspuntu logikotik, lehenik eta behin nodo batean martxan dauden zerbitzuen zerrenda jasotzen dugu, eta gero kluster jakin batean, nodo jakin batean, zerbitzuak zerrendatik egoerara transferitzeko eskatuko dugu. INSTALLED. Zerbitzu guztiak abiarazteko funtzioak, nodoak estatura transferitzeko Maintenance eta abar antzekoak dira - APIaren bidezko eskaera batzuk besterik ez dira.

Klase Mcs

Hauxe da klase bat duen kode zati batek Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Klaseko sarreran Mcs hodeiaren barruan proiektuaren IDa eta erabiltzaile IDa pasatzen ditugu, baita bere pasahitza ere. Funtzioan vm_turn_on makinetako bat piztu nahi dugu. Hemen logika pixka bat konplexuagoa da. Kodearen hasieran, beste hiru funtzio deitzen dira: 1) token bat lortu behar dugu, 2) ostalari-izena MCS-en makinaren izen bihurtu behar dugu, 3) makina honen id-a lortu. Ondoren, argitalpen eskaera bat egin eta makina hau abiarazten dugu.

Hona hemen token bat lortzeko funtzioaren itxura:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler klasea

Klase honek funtzionamendu-logikari berari lotutako funtzioak ditu.

Hau da klase honetako kode baten itxura:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Sarrerarako klaseak onartzen ditugu. Ambari ΠΈ Mcs, eskalatzeko onartzen diren nodoen zerrenda, baita nodoen konfigurazio-parametroak ere: YARN-en nodoari esleitutako memoria eta CPU-a. Barneko 2 parametro ere badaude q_ram, q_cpu, ilarak direnak. Horiek erabiliz, uneko kluster kargaren balioak gordetzen ditugu. Azken 5 minutuetan karga etengabe handitu dela ikusten badugu, orduan klusterari +1 nodoa gehitu behar diogula erabakiko dugu. Gauza bera gertatzen da klusterraren azpierabilera egoerarekin.

Goiko kodea makina bat clusterretik kentzen eta hodeian gelditzen duen funtzio baten adibidea da. Lehenik eta behin, desalojoa dago YARN Nodemanager, orduan modua pizten da Maintenance, orduan makinan dauden zerbitzu guztiak gelditzen ditugu eta makina birtuala itzaltzen dugu hodeian.

2. Gidoi-behatzailea.py

Hortik datorren kodearen adibidea:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Bertan, klusterraren ahalmena handitzeko baldintzak sortu diren eta erreserbatan makinarik dagoen ala ez egiaztatzen dugu, horietako baten ostalari-izena lortu, klusterrean gehitu eta horri buruzko mezu bat argitaratzen dugu gure taldeko Slack-en. Horren ostean hasten da cooldown_period, clusterretik ezer gehitzen edo kentzen ez dugunean, karga kontrolatu besterik ez dugunean. Egonkortu bada eta karga-balio optimoen korridorearen barruan badago, jarraipenarekin jarraituko dugu. Nodo bat nahikoa ez balitz, beste bat gehituko dugu.

Ikasgai bat aurretik daukagun kasuetarako, dagoeneko ziur badakigu nodo bat ez dela nahikoa izango, beraz, berehala abiarazten ditugu doako nodo guztiak eta aktibo mantentzen ditugu ikasgaia amaitu arte. Hau jardueren denbora-zigiluen zerrenda erabiliz gertatzen da.

Ondorioa

Autoscaler irtenbide ona eta erosoa da kluster karga irregularra jasaten duzun kasuetarako. Aldi berean, nahi duzun klusterraren konfigurazioa lortzen duzu karga gorenetarako eta, aldi berean, ez duzu kluster hau mantendu azpikargan, dirua aurreztuz. Beno, gainera, hau guztia automatikoki gertatzen da zure parte-hartzerik gabe. Autoscaler bera kluster-kudeatzailearen APIari eta hodei-hornitzailearen APIari egindako eskaera multzo bat baino ez da, logika jakin baten arabera idatzita. Zalantzarik gabe, gogoratu behar duzuna nodoak 3 motatan banatzea da, lehen idatzi dugun bezala. Eta pozik egongo zara.

Iturria: www.habr.com

Gehitu iruzkin berria