Cumu fà u vostru propiu autoscaler per un cluster

Bonghjornu! Formemu a ghjente per travaglià cù big data. Hè impussibile di imaginà un prugramma educativu nantu à big data senza u so propiu cluster, nantu à quale tutti i participanti travaglianu inseme. Per quessa, u nostru prugramma hà sempre :) Semu impegnati in a so cunfigurazione, sintonizazione è amministrazione, è i picciotti lancianu direttamente i travaglii MapReduce quì è utilizanu Spark.

In questu post vi diceremu cumu risolvemu u prublema di carica di cluster irregolari scrivendu u nostru propiu autoscaler utilizendu u nuvulu Mail.ru Soluzioni Cloud.

prublemu

U nostru cluster ùn hè micca usatu in un modu tipicu. L'eliminazione hè assai irregulare. Per esempiu, ci sò classi pratiche, quandu tutte e 30 persone è un maestru vanu à u cluster è cumincianu à aduprà. O dinò, ci sò ghjorni prima di u termini quandu a carica aumenta assai. U restu di u tempu, u cluster opera in modu di underload.

A suluzione #1 hè di mantene un cluster chì resisterà à i picchi di carica, ma sarà inattivu u restu di u tempu.

A suluzione #2 hè di mantene un picculu cluster, à quale aghjunghje manualmente nodi prima di e classi è durante i picchi di carica.

A suluzione #3 hè di mantene un picculu cluster è scrive un autoscaler chì monitorerà a carica attuale di u cluster è, utilizendu diverse API, aghjunghje è sguassate nodi da u cluster.

In questu post parlemu di a suluzione #3. Stu autoscaler hè assai dipendente di fatturi esterni piuttostu cà di quelli interni, è i fornituri spessu ùn furnisce micca. Utilizemu l'infrastruttura nuvola di Mail.ru Cloud Solutions è hà scrittu un autoscaler utilizendu l'API MCS. E postu chì insegnemu cumu travaglià cù e dati, avemu decisu di dimustrà cumu pudete scrive un autoscaler simili per i vostri scopi è aduprà cù u vostru nuvulu.

Prerequisite

Prima, duvete avè un cluster Hadoop. Per esempiu, usemu a distribuzione HDP.

Per chì i vostri nodi ponu esse aghjuntu è eliminati rapidamente, duvete avè una certa distribuzione di roli trà i nodi.

  1. Nodu maestru. Ebbè, ùn ci hè nunda particularmente necessariu di spiegà quì: u node principale di u cluster, nantu à quale, per esempiu, u driver Spark hè lanciatu, se utilizate u modu interattivu.
  2. Node di data. Questu hè u node nantu à quale guardate e dati in HDFS è induve i calculi sò stati.
  3. Nodu di l'informatica. Questu hè un node induve ùn guardate nunda in HDFS, ma induve i calculi succedenu.

Puntu impurtante. L'autoscaling si farà per via di i nodi di u terzu tipu. Se cuminciate à piglià è aghjunghje nodi di u sicondu tipu, a velocità di risposta serà assai bassu - a disattivazione è a ricommissione durarà ore nantu à u vostru cluster. Questu, sicuru, ùn hè micca ciò chì aspetta da autoscaling. Questu hè, ùn avemu micca toccu i nodi di u primu è u sicondu tipu. Rappresentaranu un cluster minimu viable chì esisterà per tutta a durata di u prugramma.

Allora, u nostru autoscaler hè scrittu in Python 3, usa l'API Ambari per gestisce i servizii di cluster, usa API da Mail.ru Cloud Solutions (MCS) per l'avviamentu è l'arrestu di e macchine.

architettura di suluzione

  1. Modulu autoscaler.py. Contene trè classi: 1) funzioni per travaglià cù Ambari, 2) funzioni per travaglià cù MCS, 3) funzioni ligati direttamente à a logica di l'autoscaler.
  2. Scrittura observer.py. Essenzialmente hè custituitu di diverse regule: quandu è in quale mumenti chjamà e funzioni autoscaler.
  3. File di cunfigurazione config.py. Contene, per esempiu, una lista di nodi permessi per l'autoscaling è altri paràmetri chì affettanu, per esempiu, quantu aspittà da u mumentu chì un novu node hè aghjuntu. Ci sò ancu timestamps per l'iniziu di e classi, perchè prima di a classa a cunfigurazione massima permessa di cluster hè lanciata.

Fighjemu avà i pezzi di codice in i primi dui schedari.

1. Modulu Autoscaler.py

Classe Ambari

Questu hè un pezzu di codice chì cuntene una classa Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Sopra, cum'è un esempiu, pudete vede l'implementazione di a funzione stop_all_services, chì ferma tutti i servizii nantu à u node di cluster desideratu.

À l'entrata di a classe Ambari tu passa:

  • ambari_url, per esempiu, cum'è 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - u nome di u vostru cluster in Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • è dentru auth Eccu u vostru login è password per Ambari: auth = ('login', 'password').

A funzione stessa ùn hè nunda di più chè un paru di chjama via l'API REST à Ambari. Da un puntu di vista lògicu, avemu prima riceve una lista di servizii in esecuzione nantu à un node, è poi dumandà à un cluster daveru, nantu à un node datu, per trasfiriri servizii da a lista à u statu. INSTALLED. Funzioni per lancià tutti i servizii, per trasferisce i nodi à u statu Maintenance ecc parenu simili - sò solu uni pochi di richieste attraversu l'API.

Classe Mcs

Questu hè un pezzu di codice chì cuntene una classa Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

À l'entrata di a classe Mcs passemu l'ID di u prughjettu in u nuvulu è l'ID d'utilizatore, è ancu a so password. In funzione vm_turn_on vulemu accende unu di i machini. A logica quì hè un pocu più cumplicata. À u principiu di u codice, trè altre funzioni sò chjamati: 1) avemu bisognu di ottene un token, 2) avemu bisognu di cunvertisce u nome d'ospitu in u nome di a macchina in MCS, 3) uttene l'id di sta macchina. In seguitu, simpricimenti fà una dumanda post è lanciari sta macchina.

Eccu ciò chì a funzione per ottene un token pare:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Classe Autoscaler

Questa classa cuntene funzioni ligati à a logica operativa stessu.

Eccu ciò chì pare un pezzu di codice per sta classa:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Acceptemu classi per l'entrata. Ambari и Mcs, una lista di nodi chì sò permessi per a scala, è ancu i paràmetri di cunfigurazione di u node: memoria è cpu assignati à u node in YARN. Ci hè ancu 2 paràmetri interni q_ram, q_cpu, chì sò file. Utilizendu elli, almacenemu i valori di a carica di cluster attuale. Se vedemu chì in l'ultimi minuti 5 ci hè stata una carica constantemente aumentata, allora decidemu chì avemu bisognu di aghjunghje +1 node à u cluster. U stessu hè veru per u statu di sottoutilizazione di cluster.

U codice sopra hè un esempiu di una funzione chì sguassate una macchina da u cluster è si ferma in u nuvulu. Prima ci hè una decommissioning YARN Nodemanager, allura u modu si accende Maintenance, tandu fermamu tutti i servizii nantu à a macchina è spegnemu a macchina virtuale in u nuvulu.

2. Script observer.py

Esempiu di codice da quì:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

In questu, cuntrollemu s'ellu sò stati creati e cundizioni per aumentà a capacità di u cluster è s'ellu ci hè una macchina in riserva, uttene u nome d'ospitu di unu di elli, aghjunghje à u cluster è pubblicà un messagiu annantu à u nostru squadra Slack. Dopu à quale si principia cooldown_period, Quandu ùn aghjustemu micca o sguassate nunda da u cluster, ma simpricimenti monitorizà a carica. S'ellu s'hè stabilizatu è hè in u corridore di i valori di carica ottimali, allora simpricimenti cuntinuemu a monitorizazione. Se un node ùn era micca abbastanza, allora aghjunghjemu un altru.

Per i casi quandu avemu una lezioni avanti, sapemu digià sicuru chì un nodu ùn serà micca abbastanza, cusì cuminciamu immediatamente tutti i nodi liberi è mantenenu attivu finu à a fine di a lezziò. Questu succede cù una lista di timestamps di attività.

cunchiusioni

L'Autoscaler hè una soluzione bona è còmuda per quelli casi quandu avete una carica di cluster irregolare. Simultaneamente ottene a cunfigurazione di u cluster desiderata per i picchi di carica è à u stessu tempu ùn mantene micca stu cluster durante a subcarga, risparmiendu soldi. Eppo, più questu tuttu succede automaticamente senza a vostra participazione. L'autoscaler stessu ùn hè più cà un inseme di dumande à l'API di u cluster manager è l'API di u fornitore di nuvola, scritte secondu una certa logica. Ciò chì avete bisognu di ricurdà hè a divisione di nodi in 3 tipi, cum'è avemu scrittu prima. È sarete felice.

Source: www.habr.com

Add a comment