Jak vytvořit svůj vlastní autoscaler pro cluster

Ahoj! Školíme lidi pro práci s velkými daty. Vzdělávací program o velkých datech si nelze představit bez vlastního clusteru, na kterém spolupracují všichni účastníci. Z toho důvodu to náš program vždy má 🙂 Zabýváme se jeho konfigurací, laděním a správou a kluci tam přímo spouštějí úlohy MapReduce a používají Spark.

V tomto příspěvku vám řekneme, jak jsme vyřešili problém nerovnoměrného načítání clusteru napsáním vlastního autoscaleru pomocí cloudu Cloudová řešení Mail.ru.

problém

Náš cluster se nepoužívá v typickém režimu. Likvidace je značně nerovnoměrná. Existují například praktické hodiny, kdy všech 30 lidí a učitel půjde do clusteru a začnou ho používat. Nebo jsou opět dny před termínem, kdy se zatížení velmi zvyšuje. Zbytek času cluster pracuje v režimu nedostatečného zatížení.

Řešením č. 1 je zachovat cluster, který vydrží špičkové zatížení, ale po zbytek času bude nečinný.

Řešením č. 2 je ponechat malý cluster, do kterého ručně přidáváte uzly před třídami a během špičkového zatížení.

Řešením č. 3 je ponechat malý cluster a napsat autoscaler, který bude sledovat aktuální zatížení clusteru a pomocí různých API přidávat a odebírat uzly z clusteru.

V tomto příspěvku budeme hovořit o řešení č. 3. Tento autoscaler je vysoce závislý na vnějších faktorech spíše než na interních a poskytovatelé jej často neposkytují. Používáme cloudovou infrastrukturu Mail.ru Cloud Solutions a vytvořili jsme autoscaler pomocí MCS API. A protože učíme, jak s daty pracovat, rozhodli jsme se ukázat, jak si můžete podobný autoscaler napsat pro vlastní účely a použít ho se svým cloudem

Předpoklady

Nejprve musíte mít cluster Hadoop. Využíváme například distribuci HDP.

Aby bylo možné vaše uzly rychle přidávat a odebírat, musíte mít mezi uzly určité rozdělení rolí.

  1. Hlavní uzel. No, není třeba nic zvlášť vysvětlovat: hlavní uzel clusteru, na kterém se spouští například ovladač Spark, pokud používáte interaktivní režim.
  2. Datumový uzel. Toto je uzel, na kterém ukládáte data na HDFS a kde probíhají výpočty.
  3. Výpočetní uzel. Toto je uzel, kde na HDFS nic neukládáte, ale kde se dějí výpočty.

Důležitý bod. K automatickému škálování dojde kvůli uzlům třetího typu. Pokud začnete brát a přidávat uzly druhého typu, rychlost odezvy bude velmi nízká – vyřazení z provozu a opětovné zprovoznění zabere na vašem clusteru hodiny. To samozřejmě není to, co od automatického škálování očekáváte. To znamená, že se nedotýkáme uzlů prvního a druhého typu. Budou představovat minimální životaschopný klastr, který bude existovat po celou dobu trvání programu.

Takže náš autoscaler je napsán v Pythonu 3, používá Ambari API ke správě clusterových služeb API od Mail.ru Cloud Solutions (MCS) pro spouštění a zastavování strojů.

Architektura řešení

  1. Modul autoscaler.py. Obsahuje tři třídy: 1) funkce pro práci s Ambari, 2) funkce pro práci s MCS, 3) funkce související přímo s logikou autoscaleru.
  2. Skript observer.py. V podstatě se skládá z různých pravidel: kdy a v jakých okamžicích volat funkce autoscaler.
  3. Konfigurační soubor config.py. Obsahuje například seznam uzlů povolených pro autoscaling a další parametry, které ovlivňují například to, jak dlouho se bude čekat od okamžiku přidání nového uzlu. K dispozici jsou také časová razítka pro zahájení tříd, takže před třídou je spuštěna maximální povolená konfigurace clusteru.

Podívejme se nyní na části kódu uvnitř prvních dvou souborů.

1. Modul Autoscaler.py

třída Ambari

Takto vypadá část kódu obsahující třídu Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Výše se jako příklad můžete podívat na implementaci funkce stop_all_services, který zastaví všechny služby na požadovaném uzlu clusteru.

U vchodu do třídy Ambari prošel jsi:

  • ambari_url, například jako 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – název vašeho clusteru v Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • a uvnitř auth zde je vaše přihlašovací jméno a heslo pro Ambari: auth = ('login', 'password').

Samotná funkce není nic jiného než pár volání přes REST API do Ambari. Z logického hlediska nejprve obdržíme seznam běžících služeb na uzlu a poté požádáme na daném clusteru, na daném uzlu, abychom přenesli služby ze seznamu do stavu INSTALLED. Funkce pro spouštění všech služeb, pro přenos uzlů do stavu Maintenance atd. vypadají podobně - je to jen pár požadavků přes API.

Třída Mcs

Takto vypadá část kódu obsahující třídu Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

U vchodu do třídy Mcs předáme ID projektu uvnitř cloudu a ID uživatele a také jeho heslo. Ve funkci vm_turn_on chceme zapnout jeden ze strojů. Logika je zde trochu složitější. Na začátku kódu jsou volány další tři funkce: 1) potřebujeme získat token, 2) potřebujeme převést název hostitele na název stroje v MCS, 3) získat id tohoto stroje. Dále jednoduše provedeme požadavek na příspěvek a spustíme tento stroj.

Takto vypadá funkce pro získání tokenu:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Třída Autoscaler

Tato třída obsahuje funkce související se samotnou provozní logikou.

Takto vypadá část kódu pro tuto třídu:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Přijímáme třídy pro vstup. Ambari и Mcs, seznam uzlů, které jsou povoleny pro škálování, a také parametry konfigurace uzlů: paměť a procesor přidělené uzlu v YARN. Dále jsou zde 2 interní parametry q_ram, q_cpu, což jsou fronty. Pomocí nich ukládáme hodnoty aktuálního zatížení clusteru. Pokud vidíme, že za posledních 5 minut došlo k soustavnému navýšení zátěže, rozhodneme se, že musíme do clusteru přidat uzel +1. Totéž platí pro stav nedostatečného využití clusteru.

Výše uvedený kód je příkladem funkce, která odebere počítač z clusteru a zastaví jej v cloudu. Nejprve je vyřazení z provozu YARN Nodemanager, poté se režim zapne Maintenance, poté zastavíme všechny služby na stroji a vypneme virtuální stroj v cloudu.

2. Pozorovatel skriptu.py

Ukázkový kód odtud:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

V něm zkontrolujeme, zda jsou vytvořeny podmínky pro navýšení kapacity clusteru a zda jsou v záloze nějaké stroje, zjistíme hostname jednoho z nich, přidáme ho do clusteru a zveřejníme o něm zprávu na Slacku našeho týmu. Po kterém to začíná cooldown_period, kdy z clusteru nic nepřidáváme ani neodebíráme, ale jednoduše sledujeme zátěž. Pokud se stabilizoval a je v koridoru optimálních hodnot zatížení, pak jednoduše pokračujeme ve sledování. Pokud by jeden uzel nestačil, přidáme další.

Pro případy, kdy máme hodinu před sebou, už s jistotou víme, že jeden uzel stačit nebude, proto ihned spustíme všechny volné uzly a necháme je aktivní až do konce hodiny. To se děje pomocí seznamu časových razítek aktivity.

Závěr

Autoscaler je dobré a pohodlné řešení pro případy, kdy dochází k nerovnoměrnému načítání clusteru. Současně dosáhnete požadované konfigurace clusteru pro špičkové zatížení a zároveň tento cluster neudržíte během nedostatečného zatížení, čímž ušetříte peníze. No, a to vše se děje automaticky bez vaší účasti. Samotný autoscaler není nic jiného než sada požadavků na API správce clusteru a API poskytovatele cloudu, napsané podle určité logiky. Co si určitě musíte zapamatovat, je rozdělení uzlů do 3 typů, jak jsme psali dříve. A budete šťastní.

Zdroj: www.habr.com

Přidat komentář