Kako napraviti vlastiti autoscaler za klaster

Zdravo! Obučavamo ljude za rad s velikim podacima. Nemoguće je zamisliti obrazovni program o velikim podacima bez vlastitog klastera na kojem svi sudionici rade zajedno. Iz tog razloga ga naš program uvijek ima 🙂 Bavimo se njegovom konfiguracijom, podešavanjem i administracijom, a dečki tamo izravno pokreću MapReduce poslove i koriste Spark.

U ovom postu ćemo vam reći kako smo riješili problem neravnomjernog učitavanja klastera pisanjem vlastitog autoscalera pomoću oblaka Mail.ru Cloud rješenja.

problem

Naš se klaster ne koristi u tipičnom načinu rada. Odlaganje je vrlo neravnomjerno. Recimo, postoji praktična nastava, kada svih 30 ljudi i profesor odu u klaster i počnu ga koristiti. Ili opet, ima dana prije roka kada se opterećenje jako poveća. Ostatak vremena klaster radi u načinu rada pod opterećenjem.

Rješenje broj 1 je zadržati klaster koji će izdržati vršna opterećenja, ali će ostatak vremena biti u stanju mirovanja.

Rješenje #2 je zadržati mali klaster, kojem ručno dodajete čvorove prije nastave i tijekom vršnih opterećenja.

Rješenje #3 je zadržati mali klaster i napisati autoscaler koji će pratiti trenutno opterećenje klastera i, koristeći različite API-je, dodavati i uklanjati čvorove iz klastera.

U ovom postu ćemo govoriti o rješenju #3. Ovaj autoscaler uvelike ovisi o vanjskim čimbenicima, a ne o internim, a pružatelji ga često ne pružaju. Koristimo infrastrukturu oblaka Mail.ru Cloud Solutions i napisali smo autoscaler koristeći MCS API. A budući da učimo kako raditi s podacima, odlučili smo pokazati kako možete napisati sličan autoscaler za vlastite potrebe i koristiti ga sa svojim oblakom

Preduvjeti

Prvo, morate imati Hadoop klaster. Na primjer, koristimo HDP distribuciju.

Kako bi se vaši čvorovi brzo dodavali i uklanjali, morate imati određenu raspodjelu uloga među čvorovima.

  1. Glavni čvor. Pa, nema potrebe posebno objašnjavati: glavni čvor klastera, na kojem se, na primjer, pokreće Spark driver, ako koristite interaktivni način rada.
  2. Datumski čvor. Ovo je čvor na kojem pohranjujete podatke o HDFS-u i gdje se odvijaju izračuni.
  3. Računalni čvor. Ovo je čvor gdje ne pohranjujete ništa na HDFS, ali gdje se događaju izračuni.

Važna točka. Automatsko skaliranje će se dogoditi zbog čvorova treće vrste. Ako počnete uzimati i dodavati čvorove druge vrste, brzina odgovora bit će vrlo niska - dekomisioniranje i ponovno uključivanje trajat će satima na vašem klasteru. To, naravno, nije ono što očekujete od automatskog skaliranja. Odnosno, ne diramo čvorove prve i druge vrste. Oni će predstavljati minimalni održivi klaster koji će postojati tijekom trajanja programa.

Dakle, naš autoscaler je napisan u Pythonu 3, koristi Ambari API za upravljanje uslugama klastera, koristi API tvrtke Mail.ru Cloud Solutions (MCS) za pokretanje i zaustavljanje strojeva.

Arhitektura rješenja

  1. Modul autoscaler.py. Sadrži tri klase: 1) funkcije za rad s Ambarijem, 2) funkcije za rad s MCS-om, 3) funkcije koje su izravno povezane s logikom autoscalera.
  2. Skripta observer.py. U biti se sastoji od različitih pravila: kada i u kojim trenucima pozvati funkcije autoscalera.
  3. Konfiguracijska datoteka config.py. Sadrži, na primjer, popis čvorova dopuštenih za automatsko skaliranje i druge parametre koji utječu na, na primjer, koliko dugo treba čekati od trenutka dodavanja novog čvora. Postoje i vremenske oznake za početak nastave, tako da se prije nastave pokreće maksimalno dopuštena konfiguracija klastera.

Pogledajmo sada dijelove koda unutar prve dvije datoteke.

1. Modul Autoscaler.py

klasa Ambari

Ovako izgleda dio koda koji sadrži klasu Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Gore, kao primjer, možete pogledati implementaciju funkcije stop_all_services, koji zaustavlja sve usluge na željenom čvoru klastera.

Na ulazu u razred Ambari ti prolaziš:

  • ambari_url, na primjer, poput 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – naziv vašeg klastera u Ambariju,
  • headers = {'X-Requested-By': 'ambari'}
  • i iznutra auth ovdje je vaša prijava i lozinka za Ambari: auth = ('login', 'password').

Sama funkcija nije ništa više od nekoliko poziva putem REST API-ja prema Ambariju. S logičke točke gledišta, prvo primamo popis pokrenutih usluga na čvoru, a zatim tražimo od danog klastera, na danom čvoru, da prenese usluge s popisa u stanje INSTALLED. Funkcije za pokretanje svih servisa, za prijenos čvorova u stanje Maintenance itd. izgledaju slično - to je samo nekoliko zahtjeva putem API-ja.

Klasa Mcs

Ovako izgleda dio koda koji sadrži klasu Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Na ulazu u razred Mcs prosljeđujemo ID projekta unutar oblaka i ID korisnika, kao i njegovu lozinku. U funkciji vm_turn_on želimo uključiti jedan od strojeva. Logika je ovdje malo kompliciranija. Na početku koda pozivaju se tri druge funkcije: 1) trebamo dobiti token, 2) trebamo pretvoriti naziv hosta u naziv stroja u MCS-u, 3) dobiti id ovog stroja. Zatim jednostavno napravimo zahtjev za objavu i pokrenemo ovaj stroj.

Ovako izgleda funkcija za dobivanje tokena:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Klasa autoscalera

Ova klasa sadrži funkcije povezane sa samom operativnom logikom.

Ovako izgleda dio koda za ovu klasu:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Primamo tečajeve za upis. Ambari и Mcs, popis čvorova koji su dopušteni za skaliranje, kao i konfiguracijski parametri čvora: memorija i CPU dodijeljeni čvoru u YARN-u. Također postoje 2 interna parametra q_ram, q_cpu, koji su redovi čekanja. Koristeći ih, pohranjujemo vrijednosti trenutnog opterećenja klastera. Ako vidimo da je tijekom zadnjih 5 minuta postojalo stalno povećano opterećenje, tada odlučujemo da trebamo dodati +1 čvor u klaster. Isto vrijedi i za stanje podiskorištenosti klastera.

Gornji kod primjer je funkcije koja uklanja stroj iz klastera i zaustavlja ga u oblaku. Prvo dolazi do razgradnje YARN Nodemanager, tada se uključuje način rada Maintenance, tada zaustavljamo sve usluge na stroju i isključujemo virtualni stroj u oblaku.

2. Skripta promatrač.py

Ogledni kod od tamo:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

U njemu provjeravamo jesu li stvoreni uvjeti za povećanje kapaciteta klastera i ima li strojeva u rezervi, dobivamo hostname jednog od njih, dodajemo ga u klaster i o tome objavljujemo poruku na Slacku našeg tima. Nakon čega počinje cooldown_period, kada ništa ne dodajemo niti uklanjamo iz klastera, već samo pratimo opterećenje. Ako se stabilizirao i nalazi se u koridoru optimalnih vrijednosti opterećenja, onda jednostavno nastavljamo praćenje. Ako jedan čvor nije bio dovoljan, tada dodajemo još jedan.

Za slučajeve kada nas čeka lekcija, već sigurno znamo da jedan čvor neće biti dovoljan, pa odmah pokrećemo sve slobodne čvorove i držimo ih aktivnima do kraja lekcije. To se događa pomoću popisa vremenskih oznaka aktivnosti.

Zaključak

Autoscaler je dobro i praktično rješenje za one slučajeve kada imate neravnomjerno učitavanje klastera. Istovremeno postižete željenu konfiguraciju klastera za vršna opterećenja i u isto vrijeme ne zadržavate ovaj klaster tijekom podopterećenja, štedeći novac. Pa, plus to se sve događa automatski bez vašeg sudjelovanja. Sam autoscaler nije ništa više od skupa zahtjeva za API upravitelja klastera i API pružatelja usluga oblaka, napisanih prema određenoj logici. Ono što svakako trebate zapamtiti je podjela čvorova na 3 vrste, kao što smo već napisali. I bit ćete sretni.

Izvor: www.habr.com

Dodajte komentar