Ciao! Formiamo le persone a lavorare con i big data. È impossibile immaginare un programma educativo sui big data senza un proprio cluster, sul quale tutti i partecipanti lavorino insieme. Per questo motivo, il nostro programma ce l'ha sempre 🙂 Siamo impegnati nella sua configurazione, ottimizzazione e amministrazione, e i ragazzi avviano direttamente lì i lavori MapReduce e utilizzano Spark.
In questo post ti spiegheremo come abbiamo risolto il problema del caricamento irregolare dei cluster scrivendo il nostro autoscaler utilizzando il cloud
Problema
Il nostro cluster non viene utilizzato in una modalità tipica. Lo smaltimento è altamente disomogeneo. Ad esempio, ci sono lezioni pratiche in cui tutte le 30 persone e un insegnante si recano al cluster e iniziano a utilizzarlo. O ancora, ci sono giorni prima della scadenza in cui il carico aumenta notevolmente. Per il resto del tempo il cluster funziona in modalità di sottocarico.
La soluzione n. 1 consiste nel mantenere un cluster in grado di resistere ai carichi di picco, ma che rimarrà inattivo per il resto del tempo.
La soluzione n. 2 consiste nel mantenere un piccolo cluster a cui aggiungere manualmente i nodi prima delle lezioni e durante i picchi di carico.
La soluzione n. 3 consiste nel mantenere un cluster di piccole dimensioni e scrivere un gestore della scalabilità automatica che monitorerà il carico corrente del cluster e, utilizzando varie API, aggiungerà e rimuoverà i nodi dal cluster.
In questo post parleremo della soluzione n.3. Questo scalatore automatico dipende fortemente da fattori esterni piuttosto che da quelli interni e spesso i fornitori non lo forniscono. Utilizziamo l'infrastruttura cloud Mail.ru Cloud Solutions e abbiamo scritto un autoscaler utilizzando l'API MCS. E poiché insegniamo come lavorare con i dati, abbiamo deciso di mostrare come puoi scrivere un autoscaler simile per i tuoi scopi e utilizzarlo con il tuo cloud
Prerequisiti
Innanzitutto, devi avere un cluster Hadoop. Ad esempio, utilizziamo la distribuzione HDP.
Affinché i tuoi nodi possano essere aggiunti e rimossi rapidamente, devi avere una certa distribuzione dei ruoli tra i nodi.
- Nodo principale. Ebbene, non c'è bisogno di spiegare niente in particolare: il nodo principale del cluster, su cui, ad esempio, viene lanciato il driver Spark, se si utilizza la modalità interattiva.
- Nodo della data. Questo è il nodo su cui archivi i dati su HDFS e dove avvengono i calcoli.
- Nodo di calcolo. Questo è un nodo in cui non memorizzi nulla su HDFS, ma dove avvengono i calcoli.
Punto importante. La scalabilità automatica avverrà a causa dei nodi del terzo tipo. Se inizi a prendere e aggiungere nodi del secondo tipo, la velocità di risposta sarà molto bassa: la disattivazione e il reimpegno richiederanno ore sul tuo cluster. Questo, ovviamente, non è ciò che ti aspetti dalla scalabilità automatica. Cioè, non tocchiamo i nodi del primo e del secondo tipo. Rappresenteranno un cluster minimo vitale che esisterà per tutta la durata del programma.
Pertanto, il nostro scalatore automatico è scritto in Python 3, utilizza l'API Ambari per gestire i servizi cluster, utilizza
Architettura della soluzione
- modulo
autoscaler.py
. Contiene tre classi: 1) funzioni per lavorare con Ambari, 2) funzioni per lavorare con MCS, 3) funzioni correlate direttamente alla logica dell'autoscaler. - Copione
observer.py
. Essenzialmente si compone di diverse regole: quando e in quali momenti richiamare le funzioni di autoscaler. - File di configurazione
config.py
. Contiene, ad esempio, un elenco di nodi consentiti per l'autoscaling e altri parametri che influiscono, ad esempio, sul tempo di attesa dal momento in cui viene aggiunto un nuovo nodo. Sono inoltre presenti timestamp per l'inizio delle lezioni, in modo che prima della lezione venga avviata la massima configurazione del cluster consentita.
Diamo ora un'occhiata alle parti di codice all'interno dei primi due file.
1. Modulo Autoscaler.py
Classe Ambari
Ecco come appare un pezzo di codice contenente una classe Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Sopra, ad esempio, puoi vedere l'implementazione della funzione stop_all_services
, che arresta tutti i servizi sul nodo del cluster desiderato.
All'ingresso della classe Ambari
tu passi:
ambari_url
, ad esempio, piace'http://localhost:8080/api/v1/clusters/'
,cluster_name
– il nome del tuo cluster in Ambari,headers = {'X-Requested-By': 'ambari'}
- e dentro
auth
ecco il tuo nome utente e password per Ambari:auth = ('login', 'password')
.
La funzione in sé non è altro che un paio di chiamate tramite l'API REST ad Ambari. Da un punto di vista logico, riceviamo prima un elenco di servizi in esecuzione su un nodo, quindi chiediamo su un dato cluster, su un dato nodo, di trasferire i servizi dall'elenco allo stato INSTALLED
. Funzioni per l'avvio di tutti i servizi, per il trasferimento dei nodi allo stato Maintenance
ecc. sembrano simili: sono solo alcune richieste tramite l'API.
Classe Mc
Ecco come appare un pezzo di codice contenente una classe Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
All'ingresso della classe Mcs
passiamo l'id del progetto all'interno del cloud e l'id dell'utente, nonché la sua password. In funzione vm_turn_on
vogliamo accendere una delle macchine. La logica qui è un po’ più complicata. All'inizio del codice vengono chiamate altre tre funzioni: 1) dobbiamo ottenere un token, 2) dobbiamo convertire il nome host nel nome della macchina in MCS, 3) ottenere l'id di questa macchina. Successivamente, facciamo semplicemente una richiesta di post e avviamo questa macchina.
Ecco come appare la funzione per ottenere un token:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Classe di scalabilità automatica
Questa classe contiene funzioni legate alla logica di funzionamento stessa.
Ecco come appare un pezzo di codice per questa classe:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Accettiamo lezioni per l'ingresso. Ambari
и Mcs
, un elenco di nodi consentiti per il ridimensionamento, nonché i parametri di configurazione del nodo: memoria e CPU allocata al nodo in YARN. Ci sono anche 2 parametri interni q_ram, q_cpu, che sono code. Usandoli, memorizziamo i valori del carico attuale del cluster. Se vediamo che negli ultimi 5 minuti c'è stato un aumento costante del carico, decidiamo che dobbiamo aggiungere +1 nodo al cluster. Lo stesso vale per lo stato di sottoutilizzo del cluster.
Il codice sopra è un esempio di una funzione che rimuove una macchina dal cluster e la ferma nel cloud. Innanzitutto c'è uno smantellamento YARN Nodemanager
, quindi la modalità si attiva Maintenance
, quindi interrompiamo tutti i servizi sulla macchina e disattiviamo la macchina virtuale nel cloud.
2. Script osservatore.py
Codice di esempio da lì:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
In esso controlliamo se sono state create le condizioni per aumentare la capacità del cluster e se ci sono macchine di riserva, otteniamo il nome host di una di esse, lo aggiungiamo al cluster e pubblichiamo un messaggio a riguardo sullo Slack del nostro team. Dopo di che si comincia cooldown_period
, quando non aggiungiamo o rimuoviamo nulla dal cluster, ma monitoriamo semplicemente il carico. Se si è stabilizzato e si trova nel corridoio dei valori di carico ottimali, continuiamo semplicemente a monitorare. Se un nodo non fosse sufficiente, ne aggiungiamo un altro.
Nei casi in cui abbiamo una lezione da fare, sappiamo già con certezza che un nodo non sarà sufficiente, quindi avviamo immediatamente tutti i nodi liberi e li manteniamo attivi fino alla fine della lezione. Ciò avviene utilizzando un elenco di timestamp delle attività.
conclusione
Autoscaler è una soluzione valida e conveniente per i casi in cui si verifica un caricamento irregolare del cluster. Ottieni contemporaneamente la configurazione del cluster desiderata per i picchi di carico e allo stesso tempo non mantieni questo cluster durante il sottocarico, risparmiando denaro. Bene, in più tutto questo avviene automaticamente senza la tua partecipazione. L'autoscaler stesso non è altro che un insieme di richieste all'API del gestore cluster e all'API del fornitore di servizi cloud, scritte secondo una determinata logica. Quello che devi assolutamente ricordare è la divisione dei nodi in 3 tipologie, come abbiamo scritto prima. E sarai felice.
Fonte: habr.com