Come creare il tuo autoscaler per un cluster

Ciao! Formiamo le persone a lavorare con i big data. È impossibile immaginare un programma educativo sui big data senza un proprio cluster, sul quale tutti i partecipanti lavorino insieme. Per questo motivo, il nostro programma ce l'ha sempre 🙂 Siamo impegnati nella sua configurazione, ottimizzazione e amministrazione, e i ragazzi avviano direttamente lì i lavori MapReduce e utilizzano Spark.

In questo post ti spiegheremo come abbiamo risolto il problema del caricamento irregolare dei cluster scrivendo il nostro autoscaler utilizzando il cloud Mail.ru soluzioni cloud.

Problema

Il nostro cluster non viene utilizzato in una modalità tipica. Lo smaltimento è altamente disomogeneo. Ad esempio, ci sono lezioni pratiche in cui tutte le 30 persone e un insegnante si recano al cluster e iniziano a utilizzarlo. O ancora, ci sono giorni prima della scadenza in cui il carico aumenta notevolmente. Per il resto del tempo il cluster funziona in modalità di sottocarico.

La soluzione n. 1 consiste nel mantenere un cluster in grado di resistere ai carichi di picco, ma che rimarrà inattivo per il resto del tempo.

La soluzione n. 2 consiste nel mantenere un piccolo cluster a cui aggiungere manualmente i nodi prima delle lezioni e durante i picchi di carico.

La soluzione n. 3 consiste nel mantenere un cluster di piccole dimensioni e scrivere un gestore della scalabilità automatica che monitorerà il carico corrente del cluster e, utilizzando varie API, aggiungerà e rimuoverà i nodi dal cluster.

In questo post parleremo della soluzione n.3. Questo scalatore automatico dipende fortemente da fattori esterni piuttosto che da quelli interni e spesso i fornitori non lo forniscono. Utilizziamo l'infrastruttura cloud Mail.ru Cloud Solutions e abbiamo scritto un autoscaler utilizzando l'API MCS. E poiché insegniamo come lavorare con i dati, abbiamo deciso di mostrare come puoi scrivere un autoscaler simile per i tuoi scopi e utilizzarlo con il tuo cloud

Prerequisiti

Innanzitutto, devi avere un cluster Hadoop. Ad esempio, utilizziamo la distribuzione HDP.

Affinché i tuoi nodi possano essere aggiunti e rimossi rapidamente, devi avere una certa distribuzione dei ruoli tra i nodi.

  1. Nodo principale. Ebbene, non c'è bisogno di spiegare niente in particolare: il nodo principale del cluster, su cui, ad esempio, viene lanciato il driver Spark, se si utilizza la modalità interattiva.
  2. Nodo della data. Questo è il nodo su cui archivi i dati su HDFS e dove avvengono i calcoli.
  3. Nodo di calcolo. Questo è un nodo in cui non memorizzi nulla su HDFS, ma dove avvengono i calcoli.

Punto importante. La scalabilità automatica avverrà a causa dei nodi del terzo tipo. Se inizi a prendere e aggiungere nodi del secondo tipo, la velocità di risposta sarà molto bassa: la disattivazione e il reimpegno richiederanno ore sul tuo cluster. Questo, ovviamente, non è ciò che ti aspetti dalla scalabilità automatica. Cioè, non tocchiamo i nodi del primo e del secondo tipo. Rappresenteranno un cluster minimo vitale che esisterà per tutta la durata del programma.

Pertanto, il nostro scalatore automatico è scritto in Python 3, utilizza l'API Ambari per gestire i servizi cluster, utilizza API di Mail.ru Cloud Solutions (MCS) per l'avviamento e l'arresto delle macchine.

Architettura della soluzione

  1. modulo autoscaler.py. Contiene tre classi: 1) funzioni per lavorare con Ambari, 2) funzioni per lavorare con MCS, 3) funzioni correlate direttamente alla logica dell'autoscaler.
  2. Copione observer.py. Essenzialmente si compone di diverse regole: quando e in quali momenti richiamare le funzioni di autoscaler.
  3. File di configurazione config.py. Contiene, ad esempio, un elenco di nodi consentiti per l'autoscaling e altri parametri che influiscono, ad esempio, sul tempo di attesa dal momento in cui viene aggiunto un nuovo nodo. Sono inoltre presenti timestamp per l'inizio delle lezioni, in modo che prima della lezione venga avviata la massima configurazione del cluster consentita.

Diamo ora un'occhiata alle parti di codice all'interno dei primi due file.

1. Modulo Autoscaler.py

Classe Ambari

Ecco come appare un pezzo di codice contenente una classe Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Sopra, ad esempio, puoi vedere l'implementazione della funzione stop_all_services, che arresta tutti i servizi sul nodo del cluster desiderato.

All'ingresso della classe Ambari tu passi:

  • ambari_url, ad esempio, piace 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – il nome del tuo cluster in Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • e dentro auth ecco il tuo nome utente e password per Ambari: auth = ('login', 'password').

La funzione in sé non è altro che un paio di chiamate tramite l'API REST ad Ambari. Da un punto di vista logico, riceviamo prima un elenco di servizi in esecuzione su un nodo, quindi chiediamo su un dato cluster, su un dato nodo, di trasferire i servizi dall'elenco allo stato INSTALLED. Funzioni per l'avvio di tutti i servizi, per il trasferimento dei nodi allo stato Maintenance ecc. sembrano simili: sono solo alcune richieste tramite l'API.

Classe Mc

Ecco come appare un pezzo di codice contenente una classe Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

All'ingresso della classe Mcs passiamo l'id del progetto all'interno del cloud e l'id dell'utente, nonché la sua password. In funzione vm_turn_on vogliamo accendere una delle macchine. La logica qui è un po’ più complicata. All'inizio del codice vengono chiamate altre tre funzioni: 1) dobbiamo ottenere un token, 2) dobbiamo convertire il nome host nel nome della macchina in MCS, 3) ottenere l'id di questa macchina. Successivamente, facciamo semplicemente una richiesta di post e avviamo questa macchina.

Ecco come appare la funzione per ottenere un token:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Classe di scalabilità automatica

Questa classe contiene funzioni legate alla logica di funzionamento stessa.

Ecco come appare un pezzo di codice per questa classe:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Accettiamo lezioni per l'ingresso. Ambari и Mcs, un elenco di nodi consentiti per il ridimensionamento, nonché i parametri di configurazione del nodo: memoria e CPU allocata al nodo in YARN. Ci sono anche 2 parametri interni q_ram, q_cpu, che sono code. Usandoli, memorizziamo i valori del carico attuale del cluster. Se vediamo che negli ultimi 5 minuti c'è stato un aumento costante del carico, decidiamo che dobbiamo aggiungere +1 nodo al cluster. Lo stesso vale per lo stato di sottoutilizzo del cluster.

Il codice sopra è un esempio di una funzione che rimuove una macchina dal cluster e la ferma nel cloud. Innanzitutto c'è uno smantellamento YARN Nodemanager, quindi la modalità si attiva Maintenance, quindi interrompiamo tutti i servizi sulla macchina e disattiviamo la macchina virtuale nel cloud.

2. Script osservatore.py

Codice di esempio da lì:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

In esso controlliamo se sono state create le condizioni per aumentare la capacità del cluster e se ci sono macchine di riserva, otteniamo il nome host di una di esse, lo aggiungiamo al cluster e pubblichiamo un messaggio a riguardo sullo Slack del nostro team. Dopo di che si comincia cooldown_period, quando non aggiungiamo o rimuoviamo nulla dal cluster, ma monitoriamo semplicemente il carico. Se si è stabilizzato e si trova nel corridoio dei valori di carico ottimali, continuiamo semplicemente a monitorare. Se un nodo non fosse sufficiente, ne aggiungiamo un altro.

Nei casi in cui abbiamo una lezione da fare, sappiamo già con certezza che un nodo non sarà sufficiente, quindi avviamo immediatamente tutti i nodi liberi e li manteniamo attivi fino alla fine della lezione. Ciò avviene utilizzando un elenco di timestamp delle attività.

conclusione

Autoscaler è una soluzione valida e conveniente per i casi in cui si verifica un caricamento irregolare del cluster. Ottieni contemporaneamente la configurazione del cluster desiderata per i picchi di carico e allo stesso tempo non mantieni questo cluster durante il sottocarico, risparmiando denaro. Bene, in più tutto questo avviene automaticamente senza la tua partecipazione. L'autoscaler stesso non è altro che un insieme di richieste all'API del gestore cluster e all'API del fornitore di servizi cloud, scritte secondo una determinata logica. Quello che devi assolutamente ricordare è la divisione dei nodi in 3 tipologie, come abbiamo scritto prima. E sarai felice.

Fonte: habr.com

Aggiungi un commento