Bonghjornu! Formemu a ghjente per travaglià cù big data. Hè impussibile di imaginà un prugramma educativu nantu à big data senza u so propiu cluster, nantu à quale tutti i participanti travaglianu inseme. Per quessa, u nostru prugramma hà sempre :) Semu impegnati in a so cunfigurazione, sintonizazione è amministrazione, è i picciotti lancianu direttamente i travaglii MapReduce quì è utilizanu Spark.
In questu post vi diceremu cumu risolvemu u prublema di carica di cluster irregolari scrivendu u nostru propiu autoscaler utilizendu u nuvulu
prublemu
U nostru cluster ùn hè micca usatu in un modu tipicu. L'eliminazione hè assai irregulare. Per esempiu, ci sò classi pratiche, quandu tutte e 30 persone è un maestru vanu à u cluster è cumincianu à aduprà. O dinò, ci sò ghjorni prima di u termini quandu a carica aumenta assai. U restu di u tempu, u cluster opera in modu di underload.
A suluzione #1 hè di mantene un cluster chì resisterà à i picchi di carica, ma sarà inattivu u restu di u tempu.
A suluzione #2 hè di mantene un picculu cluster, à quale aghjunghje manualmente nodi prima di e classi è durante i picchi di carica.
A suluzione #3 hè di mantene un picculu cluster è scrive un autoscaler chì monitorerà a carica attuale di u cluster è, utilizendu diverse API, aghjunghje è sguassate nodi da u cluster.
In questu post parlemu di a suluzione #3. Stu autoscaler hè assai dipendente di fatturi esterni piuttostu cà di quelli interni, è i fornituri spessu ùn furnisce micca. Utilizemu l'infrastruttura nuvola di Mail.ru Cloud Solutions è hà scrittu un autoscaler utilizendu l'API MCS. E postu chì insegnemu cumu travaglià cù e dati, avemu decisu di dimustrà cumu pudete scrive un autoscaler simili per i vostri scopi è aduprà cù u vostru nuvulu.
Prerequisite
Prima, duvete avè un cluster Hadoop. Per esempiu, usemu a distribuzione HDP.
Per chì i vostri nodi ponu esse aghjuntu è eliminati rapidamente, duvete avè una certa distribuzione di roli trà i nodi.
- Nodu maestru. Ebbè, ùn ci hè nunda particularmente necessariu di spiegà quì: u node principale di u cluster, nantu à quale, per esempiu, u driver Spark hè lanciatu, se utilizate u modu interattivu.
- Node di data. Questu hè u node nantu à quale guardate e dati in HDFS è induve i calculi sò stati.
- Nodu di l'informatica. Questu hè un node induve ùn guardate nunda in HDFS, ma induve i calculi succedenu.
Puntu impurtante. L'autoscaling si farà per via di i nodi di u terzu tipu. Se cuminciate à piglià è aghjunghje nodi di u sicondu tipu, a velocità di risposta serà assai bassu - a disattivazione è a ricommissione durarà ore nantu à u vostru cluster. Questu, sicuru, ùn hè micca ciò chì aspetta da autoscaling. Questu hè, ùn avemu micca toccu i nodi di u primu è u sicondu tipu. Rappresentaranu un cluster minimu viable chì esisterà per tutta a durata di u prugramma.
Allora, u nostru autoscaler hè scrittu in Python 3, usa l'API Ambari per gestisce i servizii di cluster, usa
architettura di suluzione
- Modulu
autoscaler.py
. Contene trè classi: 1) funzioni per travaglià cù Ambari, 2) funzioni per travaglià cù MCS, 3) funzioni ligati direttamente à a logica di l'autoscaler. - Scrittura
observer.py
. Essenzialmente hè custituitu di diverse regule: quandu è in quale mumenti chjamà e funzioni autoscaler. - File di cunfigurazione
config.py
. Contene, per esempiu, una lista di nodi permessi per l'autoscaling è altri paràmetri chì affettanu, per esempiu, quantu aspittà da u mumentu chì un novu node hè aghjuntu. Ci sò ancu timestamps per l'iniziu di e classi, perchè prima di a classa a cunfigurazione massima permessa di cluster hè lanciata.
Fighjemu avà i pezzi di codice in i primi dui schedari.
1. Modulu Autoscaler.py
Classe Ambari
Questu hè un pezzu di codice chì cuntene una classa Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Sopra, cum'è un esempiu, pudete vede l'implementazione di a funzione stop_all_services
, chì ferma tutti i servizii nantu à u node di cluster desideratu.
À l'entrata di a classe Ambari
tu passa:
ambari_url
, per esempiu, cum'è'http://localhost:8080/api/v1/clusters/'
,cluster_name
- u nome di u vostru cluster in Ambari,headers = {'X-Requested-By': 'ambari'}
- è dentru
auth
Eccu u vostru login è password per Ambari:auth = ('login', 'password')
.
A funzione stessa ùn hè nunda di più chè un paru di chjama via l'API REST à Ambari. Da un puntu di vista lògicu, avemu prima riceve una lista di servizii in esecuzione nantu à un node, è poi dumandà à un cluster daveru, nantu à un node datu, per trasfiriri servizii da a lista à u statu. INSTALLED
. Funzioni per lancià tutti i servizii, per trasferisce i nodi à u statu Maintenance
ecc parenu simili - sò solu uni pochi di richieste attraversu l'API.
Classe Mcs
Questu hè un pezzu di codice chì cuntene una classa Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
À l'entrata di a classe Mcs
passemu l'ID di u prughjettu in u nuvulu è l'ID d'utilizatore, è ancu a so password. In funzione vm_turn_on
vulemu accende unu di i machini. A logica quì hè un pocu più cumplicata. À u principiu di u codice, trè altre funzioni sò chjamati: 1) avemu bisognu di ottene un token, 2) avemu bisognu di cunvertisce u nome d'ospitu in u nome di a macchina in MCS, 3) uttene l'id di sta macchina. In seguitu, simpricimenti fà una dumanda post è lanciari sta macchina.
Eccu ciò chì a funzione per ottene un token pare:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Classe Autoscaler
Questa classa cuntene funzioni ligati à a logica operativa stessu.
Eccu ciò chì pare un pezzu di codice per sta classa:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Acceptemu classi per l'entrata. Ambari
и Mcs
, una lista di nodi chì sò permessi per a scala, è ancu i paràmetri di cunfigurazione di u node: memoria è cpu assignati à u node in YARN. Ci hè ancu 2 paràmetri interni q_ram, q_cpu, chì sò file. Utilizendu elli, almacenemu i valori di a carica di cluster attuale. Se vedemu chì in l'ultimi minuti 5 ci hè stata una carica constantemente aumentata, allora decidemu chì avemu bisognu di aghjunghje +1 node à u cluster. U stessu hè veru per u statu di sottoutilizazione di cluster.
U codice sopra hè un esempiu di una funzione chì sguassate una macchina da u cluster è si ferma in u nuvulu. Prima ci hè una decommissioning YARN Nodemanager
, allura u modu si accende Maintenance
, tandu fermamu tutti i servizii nantu à a macchina è spegnemu a macchina virtuale in u nuvulu.
2. Script observer.py
Esempiu di codice da quì:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
In questu, cuntrollemu s'ellu sò stati creati e cundizioni per aumentà a capacità di u cluster è s'ellu ci hè una macchina in riserva, uttene u nome d'ospitu di unu di elli, aghjunghje à u cluster è pubblicà un messagiu annantu à u nostru squadra Slack. Dopu à quale si principia cooldown_period
, Quandu ùn aghjustemu micca o sguassate nunda da u cluster, ma simpricimenti monitorizà a carica. S'ellu s'hè stabilizatu è hè in u corridore di i valori di carica ottimali, allora simpricimenti cuntinuemu a monitorizazione. Se un node ùn era micca abbastanza, allora aghjunghjemu un altru.
Per i casi quandu avemu una lezioni avanti, sapemu digià sicuru chì un nodu ùn serà micca abbastanza, cusì cuminciamu immediatamente tutti i nodi liberi è mantenenu attivu finu à a fine di a lezziò. Questu succede cù una lista di timestamps di attività.
cunchiusioni
L'Autoscaler hè una soluzione bona è còmuda per quelli casi quandu avete una carica di cluster irregolare. Simultaneamente ottene a cunfigurazione di u cluster desiderata per i picchi di carica è à u stessu tempu ùn mantene micca stu cluster durante a subcarga, risparmiendu soldi. Eppo, più questu tuttu succede automaticamente senza a vostra participazione. L'autoscaler stessu ùn hè più cà un inseme di dumande à l'API di u cluster manager è l'API di u fornitore di nuvola, scritte secondu una certa logica. Ciò chì avete bisognu di ricurdà hè a divisione di nodi in 3 tipi, cum'è avemu scrittu prima. È sarete felice.
Source: www.habr.com