Kako narediti svoj samodejni skalirnik za gručo

Zdravo! Ljudi usposabljamo za delo z velikimi podatki. Nemogoče si je predstavljati izobraževalni program o velikih podatkih brez lastnega grozda, na katerem sodelujejo vsi udeleženci. Zato ga naš program vedno ima 🙂 Ukvarjamo se z njegovo konfiguracijo, prilagajanjem in administracijo, fantje pa tam neposredno zaženejo opravila MapReduce in uporabljajo Spark.

V tej objavi vam bomo povedali, kako smo rešili problem neenakomernega nalaganja gruče s pisanjem lastnega samodejnega skalirnika z uporabo oblaka Mail.ru rešitve v oblaku.

problem

Naša gruča se ne uporablja v običajnem načinu. Odlaganje je zelo neenakomerno. Na primer, obstajajo praktične ure, ko gre vseh 30 ljudi in učitelj v grozd in ga začne uporabljati. Ali pa so spet dnevi pred rokom, ko se obremenitev močno poveča. Preostali čas deluje grozd v načinu podobremenitve.

Rešitev št. 1 je ohraniti gručo, ki bo zdržala največje obremenitve, vendar bo preostali čas mirovala.

Rešitev št. 2 je ohraniti majhno gručo, v katero ročno dodate vozlišča pred razredi in med največjimi obremenitvami.

Rešitev št. 3 je ohraniti majhno gručo in napisati samodejni skalirnik, ki bo spremljal trenutno obremenitev gruče ter z uporabo različnih API-jev dodajal in odstranjeval vozlišča iz gruče.

V tej objavi bomo govorili o rešitvi #3. Ta autoscaler je bolj odvisen od zunanjih dejavnikov kot od notranjih in ponudniki ga pogosto ne zagotavljajo. Uporabljamo infrastrukturo v oblaku Mail.ru Cloud Solutions in smo napisali samodejni skalirnik z uporabo API-ja MCS. In ker učimo, kako delati s podatki, smo se odločili pokazati, kako lahko napišete podoben samodejni skalirnik za lastne namene in ga uporabite s svojim oblakom

Predpogoji

Najprej morate imeti gručo Hadoop. Na primer, uporabljamo distribucijo HDP.

Za hitro dodajanje in odstranjevanje vozlišč morate imeti določeno porazdelitev vlog med vozlišči.

  1. Glavno vozlišče. No, ni treba ničesar posebej razlagati: glavno vozlišče gruče, na katerem se na primer zažene gonilnik Spark, če uporabljate interaktivni način.
  2. Datumsko vozlišče. To je vozlišče, na katerem shranjujete podatke o HDFS in kjer potekajo izračuni.
  3. Računalniško vozlišče. To je vozlišče, kjer ne shranjujete ničesar v HDFS, ampak kjer se izvajajo izračuni.

Pomembna točka. Samodejno skaliranje bo prišlo zaradi vozlišč tretje vrste. Če začnete jemati in dodajati vozlišča druge vrste, bo hitrost odziva zelo nizka – razgradnja in ponovna zaveza bosta v vaši gruči trajala ure. To seveda ni tisto, kar pričakujete od samodejnega skaliranja. To pomeni, da se ne dotikamo vozlišč prve in druge vrste. Predstavljali bodo minimalno sposobni grozd, ki bo obstajal ves čas trajanja programa.

Torej je naš samodejni skalirnik napisan v Pythonu 3, uporablja API Ambari za upravljanje storitev gruče, uporablja API iz Mail.ru Cloud Solutions (MCS) za zagon in zaustavitev strojev.

Arhitektura rešitve

  1. Modul autoscaler.py. Vsebuje tri razrede: 1) funkcije za delo z Ambari, 2) funkcije za delo z MCS, 3) funkcije, ki so neposredno povezane z logiko samodejnega merilnika.
  2. Skripta observer.py. V bistvu je sestavljen iz različnih pravil: kdaj in v katerih trenutkih poklicati funkcije samodejnega skaliranja.
  3. Konfiguracijska datoteka config.py. Vsebuje na primer seznam vozlišč, dovoljenih za samodejno skaliranje, in druge parametre, ki na primer vplivajo na to, kako dolgo je treba čakati od trenutka, ko je bilo dodano novo vozlišče. Obstajajo tudi časovni žigi za začetek predavanj, tako da se pred razredom zažene največja dovoljena konfiguracija gruče.

Poglejmo zdaj dele kode v prvih dveh datotekah.

1. Modul Autoscaler.py

razred Ambari

Tako je videti del kode, ki vsebuje razred Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Zgoraj, kot primer, si lahko ogledate izvajanje funkcije stop_all_services, ki ustavi vse storitve na želenem vozlišču gruče.

Na vhodu v razred Ambari greš mimo:

  • ambari_url, na primer všeč 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – ime vašega grozda v Ambariju,
  • headers = {'X-Requested-By': 'ambari'}
  • in notri auth tukaj je vaša prijava in geslo za Ambari: auth = ('login', 'password').

Sama funkcija ni nič drugega kot nekaj klicev prek API-ja REST za Ambari. Z logičnega vidika najprej prejmemo seznam delujočih storitev na vozlišču, nato pa od dane gruče, danega vozlišča zahtevamo prenos storitev s seznama v stanje INSTALLED. Funkcije za zagon vseh storitev, za prenos vozlišč v stanje Maintenance itd. videti podobno - gre le za nekaj zahtev prek API-ja.

Razred Mcs

Tako je videti del kode, ki vsebuje razred Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Na vhodu v razred Mcs posredujemo ID projekta znotraj oblaka in ID uporabnika ter njegovo geslo. V funkciji vm_turn_on želimo vklopiti enega od strojev. Logika je tu nekoliko bolj zapletena. Na začetku kode so priklicane tri druge funkcije: 1) pridobiti moramo žeton, 2) pretvoriti moramo ime gostitelja v ime stroja v MCS, 3) pridobiti ID tega stroja. Nato preprosto naredimo zahtevo za objavo in zaženemo ta stroj.

Tako izgleda funkcija za pridobitev žetona:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Razred samodejnega skaliranja

Ta razred vsebuje funkcije, povezane s samo logiko delovanja.

Tako izgleda del kode za ta razred:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Sprejemamo predavanja za vpis. Ambari и Mcs, seznam vozlišč, ki so dovoljena za skaliranje, kot tudi konfiguracijski parametri vozlišča: pomnilnik in procesor, dodeljen vozlišču v YARN. Obstajata tudi 2 notranja parametra q_ram, q_cpu, ki sta čakalni vrsti. Z njihovo pomočjo shranimo vrednosti trenutne obremenitve gruče. Če opazimo, da se je v zadnjih 5 minutah stalno povečevala obremenitev, se odločimo, da moramo dodati +1 vozlišče v gručo. Enako velja za stanje premajhne izkoriščenosti gruče.

Zgornja koda je primer funkcije, ki odstrani stroj iz gruče in ga ustavi v oblaku. Najprej je razgradnja YARN Nodemanager, potem se način vklopi Maintenance, nato zaustavimo vse storitve na stroju in izklopimo virtualni stroj v oblaku.

2. Skript opazovalec.py

Vzorčna koda od tam:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

V njem preverimo, ali so ustvarjeni pogoji za povečanje zmogljivosti gruče in ali so še kakšni stroji v rezervi, pridobimo ime gostitelja enega od njih, ga dodamo v gručo in o tem objavimo sporočilo na Slacku naše ekipe. Po katerem se začne cooldown_period, ko v gručo ničesar ne dodajamo ali odvzemamo, temveč zgolj spremljamo obremenitev. Če se je stabilizirala in je v okviru optimalnih vrednosti obremenitve, potem preprosto nadaljujemo z monitoringom. Če eno vozlišče ni bilo dovolj, dodamo še eno.

Za primere, ko je pred nami lekcija, že zagotovo vemo, da eno vozlišče ne bo dovolj, zato takoj zaženemo vsa prosta vozlišča in jih pustimo aktivne do konca lekcije. To se zgodi z uporabo seznama časovnih žigov dejavnosti.

Zaključek

Autoscaler je dobra in priročna rešitev za tiste primere, ko imate neenakomerno nalaganje gruče. Hkrati dosežete želeno konfiguracijo gruče za največje obremenitve in hkrati ne zadržite te gruče med podobremenitvijo, s čimer prihranite denar. No, poleg tega se vse to zgodi samodejno brez vaše udeležbe. Autoscaler sam po sebi ni nič drugega kot nabor zahtev za API upravitelja gruče in API ponudnika oblaka, napisanih po določeni logiki. Vsekakor si morate zapomniti razdelitev vozlišč na 3 vrste, kot smo že zapisali. In srečni boste.

Vir: www.habr.com

Dodaj komentar