Ako si vyrobiť vlastný autoscaler pre klaster

Ahoj! Školíme ľudí na prácu s veľkými dátami. Vzdelávací program o veľkých dátach si nemožno predstaviť bez vlastného klastra, na ktorom spolupracujú všetci účastníci. Z tohto dôvodu ho náš program vždy má 🙂 Venujeme sa jeho konfigurácii, ladeniu a správe a chalani tam priamo spúšťajú úlohy MapReduce a používajú Spark.

V tomto príspevku vám povieme, ako sme vyriešili problém nerovnomerného načítania klastra napísaním vlastného automatického škálovača pomocou cloudu Cloudové riešenia Mail.ru.

problém

Náš klaster sa nepoužíva v typickom režime. Likvidácia je veľmi nerovnomerná. Existujú napríklad praktické hodiny, keď všetkých 30 ľudí a učiteľ idú do klastra a začnú ho používať. Alebo opäť existujú dni pred termínom, kedy sa zaťaženie výrazne zvyšuje. Zvyšok času klaster pracuje v režime nedostatočného zaťaženia.

Riešením č. 1 je zachovať klaster, ktorý vydrží špičkové zaťaženie, ale zvyšok času bude nečinný.

Riešením č. 2 je ponechať malý klaster, do ktorého ručne pridávate uzly pred triedami a počas špičkového zaťaženia.

Riešením č. 3 je ponechať malý klaster a napísať automatický škálovač, ktorý bude monitorovať aktuálne zaťaženie klastra a pomocou rôznych API pridávať a odstraňovať uzly z klastra.

V tomto príspevku budeme hovoriť o riešení #3. Tento autoscaler je vo veľkej miere závislý skôr od vonkajších faktorov ako od vnútorných a poskytovatelia ho často neposkytujú. Používame cloudovú infraštruktúru Mail.ru Cloud Solutions a napísali sme autoscaler pomocou MCS API. A keďže učíme pracovať s dátami, rozhodli sme sa ukázať, ako si môžete podobný autoscaler napísať pre vlastné účely a použiť ho s vašim cloudom

Predpoklady

Najprv musíte mať klaster Hadoop. Napríklad používame distribúciu HDP.

Aby bolo možné rýchlo pridávať a odstraňovať vaše uzly, musíte mať medzi uzlami určité rozdelenie rolí.

  1. Hlavný uzol. Nie je potrebné nič špeciálne vysvetľovať: hlavný uzol klastra, na ktorom sa napríklad spúšťa ovládač Spark, ak používate interaktívny režim.
  2. Dátumový uzol. Toto je uzol, na ktorom ukladáte dáta na HDFS a kde prebiehajú výpočty.
  3. Výpočtový uzol. Toto je uzol, kde neukladáte nič na HDFS, ale kde prebiehajú výpočty.

Dôležitý bod. Automatické škálovanie nastane v dôsledku uzlov tretieho typu. Ak začnete brať a pridávať uzly druhého typu, rýchlosť odozvy bude veľmi nízka – vyradenie z prevádzky a opätovné spustenie bude na vašom klastri trvať hodiny. To, samozrejme, nie je to, čo od automatického škálovania očakávate. To znamená, že sa nedotýkame uzlov prvého a druhého typu. Budú predstavovať minimálny životaschopný klaster, ktorý bude existovať počas trvania programu.

Takže náš automatický škálovač je napísaný v Pythone 3, používa Ambari API na správu klastrových služieb API od Mail.ru Cloud Solutions (MCS) na spúšťanie a zastavovanie strojov.

Architektúra riešenia

  1. Modul autoscaler.py. Obsahuje tri triedy: 1) funkcie pre prácu s Ambari, 2) funkcie pre prácu s MCS, 3) funkcie súvisiace priamo s logikou autoscaleru.
  2. Skript observer.py. V podstate pozostáva z rôznych pravidiel: kedy a v ktorých momentoch volať funkcie automatického škálovača.
  3. Konfiguračný súbor config.py. Obsahuje napríklad zoznam uzlov povolených pre automatické škálovanie a ďalšie parametre, ktoré ovplyvňujú napríklad to, ako dlho sa bude čakať od pridania nového uzla. K dispozícii sú aj časové pečiatky pre začiatok tried, aby sa pred triedou spustila maximálna povolená konfigurácia klastra.

Pozrime sa teraz na časti kódu v prvých dvoch súboroch.

1. Modul Autoscaler.py

trieda Ambari

Takto vyzerá časť kódu obsahujúca triedu Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Vyššie sa ako príklad môžete pozrieť na implementáciu funkcie stop_all_services, ktorý zastaví všetky služby na požadovanom uzle klastra.

Pri vchode do triedy Ambari prejdeš:

  • ambari_url, napríklad, ako 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – názov vášho klastra v Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • a vnútri auth tu je vaše používateľské meno a heslo pre Ambari: auth = ('login', 'password').

Samotná funkcia nie je nič iné ako pár hovorov cez REST API do Ambari. Z logického hľadiska najprv dostaneme zoznam spustených služieb na uzle a potom požiadame na danom klastri, na danom uzle, aby sme preniesli služby zo zoznamu do stavu INSTALLED. Funkcie na spustenie všetkých služieb, na prenos uzlov do stavu Maintenance atď vyzerajú podobne - je to len niekoľko požiadaviek cez API.

Class Mcs

Takto vyzerá časť kódu obsahujúca triedu Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Pri vchode do triedy Mcs odovzdávame ID projektu v rámci cloudu a ID používateľa, ako aj jeho heslo. Vo funkcii vm_turn_on chceme zapnúť jeden zo strojov. Logika je tu trochu komplikovanejšia. Na začiatku kódu sa volajú ďalšie tri funkcie: 1) musíme získať token, 2) musíme previesť názov hostiteľa na názov stroja v MCS, 3) získať id tohto stroja. Potom jednoducho požiadame o príspevok a spustíme tento stroj.

Takto vyzerá funkcia získania tokenu:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Trieda Autoscaler

Táto trieda obsahuje funkcie súvisiace so samotnou prevádzkovou logikou.

Takto vyzerá časť kódu pre túto triedu:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Prijímame triedy na vstup. Ambari и Mcs, zoznam uzlov, ktoré sú povolené na škálovanie, ako aj konfiguračné parametre uzla: pamäť a procesor pridelené uzlu v YARN. Existujú aj 2 interné parametre q_ram, q_cpu, čo sú fronty. Pomocou nich ukladáme hodnoty aktuálneho zaťaženia klastra. Ak vidíme, že za posledných 5 minút dochádzalo k sústavne zvýšenému zaťaženiu, potom sa rozhodneme, že musíme do klastra pridať uzol +1. To isté platí pre stav nedostatočného využitia klastra.

Vyššie uvedený kód je príkladom funkcie, ktorá odstráni počítač z klastra a zastaví ho v cloude. Najprv dôjde k vyradeniu z prevádzky YARN Nodemanager, potom sa režim zapne Maintenance, potom zastavíme všetky služby na stroji a vypneme virtuálny stroj v cloude.

2. Pozorovateľ skriptu.py

Vzorový kód odtiaľ:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

V ňom skontrolujeme, či sú vytvorené podmienky na zvýšenie kapacity klastra a či sú v zálohe nejaké stroje, získame názov hostiteľa jedného z nich, pridáme ho do klastra a zverejníme o tom správu na Slacku nášho tímu. Po ktorej to začne cooldown_period, kedy z klastra nič nepridávame ani neuberáme, ale jednoducho sledujeme záťaž. Ak sa stabilizoval a je v koridore optimálnych hodnôt zaťaženia, tak jednoducho pokračujeme v monitorovaní. Ak jeden uzol nestačil, pridáme ďalší.

Pre prípady, keď nás čaká lekcia, už s istotou vieme, že jeden uzol nebude stačiť, preto okamžite spustíme všetky voľné uzly a necháme ich aktívne až do konca hodiny. To sa deje pomocou zoznamu časových pečiatok aktivity.

Záver

Autoscaler je dobré a pohodlné riešenie pre prípady, keď sa stretnete s nerovnomerným zaťažením klastra. Súčasne dosiahnete požadovanú konfiguráciu klastra pre špičkové zaťaženie a zároveň neuchováte tento klaster počas nedostatočného zaťaženia, čím ušetríte peniaze. No a toto všetko sa deje automaticky bez vašej účasti. Samotný autoscaler nie je nič iné ako súbor požiadaviek na API správcu klastra a API poskytovateľa cloudu, napísaný podľa určitej logiky. Čo si určite musíte zapamätať, je rozdelenie uzlov na 3 typy, ako sme písali skôr. A budete šťastní.

Zdroj: hab.com

Pridať komentár