Sveiki! Apmokome žmones dirbti su dideliais duomenimis. Neįmanoma įsivaizduoti švietimo programos apie didžiuosius duomenis be savo klasterio, kuriame visi dalyviai dirba kartu. Dėl šios priežasties mūsų programa visada ją turi 🙂 Mes užsiimame jos konfigūravimu, derinimu ir administravimu, o vaikinai ten tiesiogiai paleidžia MapReduce darbus ir naudoja Spark.
Šiame įraše mes jums pasakysime, kaip išsprendėme netolygaus klasterio įkėlimo problemą parašydami savo automatinį skalę naudodami debesį
problema
Mūsų klasteris nenaudojamas įprastu režimu. Išmetimas labai netolygus. Pavyzdžiui, yra praktinių užsiėmimų, kai visi 30 žmonių ir mokytojas nueina į klasterį ir pradeda juo naudotis. Arba vėl yra dienų iki termino, kai krūvis labai padidėja. Likusį laiką klasteris veikia per mažos apkrovos režimu.
1 sprendimas yra išlaikyti klasterį, kuris atlaikytų didžiausias apkrovas, bet likusį laiką bus neaktyvus.
2 sprendimas yra išlaikyti nedidelį klasterį, į kurį rankiniu būdu pridedate mazgus prieš pamokas ir didžiausios apkrovos metu.
3 sprendimas yra išlaikyti nedidelį klasterį ir parašyti automatinį skalę, kuri stebės esamą klasterio apkrovą ir, naudodama įvairias API, pridės ir pašalins mazgus iš klasterio.
Šiame įraše kalbėsime apie 3 sprendimą. Šis automatinis skaleris labai priklauso nuo išorinių veiksnių, o ne nuo vidinių, o tiekėjai dažnai jo neteikia. Naudojame Mail.ru Cloud Solutions debesies infrastruktūrą ir parašėme automatinį skalę naudodami MCS API. Kadangi mes mokome dirbti su duomenimis, nusprendėme parodyti, kaip galite parašyti panašų automatinį skalavimo įrenginį savo tikslams ir naudoti jį savo debesyje.
Būtinos sąlygos
Pirma, turite turėti Hadoop klasterį. Pavyzdžiui, mes naudojame HDP paskirstymą.
Kad jūsų mazgai būtų greitai pridėti ir pašalinti, turite turėti tam tikrą vaidmenų paskirstymą tarp mazgų.
- Pagrindinis mazgas. Na, nereikia nieko ypač aiškinti: pagrindinis klasterio mazgas, kuriame, pavyzdžiui, paleidžiama „Spark“ tvarkyklė, jei naudojate interaktyvųjį režimą.
- Datos mazgas. Tai mazgas, kuriame saugote duomenis HDFS ir kuriame atliekami skaičiavimai.
- Skaičiavimo mazgas. Tai mazgas, kuriame nieko nesaugote HDFS, bet kur vyksta skaičiavimai.
Svarbus punktas. Automatinis mastelis įvyks dėl trečiojo tipo mazgų. Jei pradėsite imti ir pridėti antrojo tipo mazgus, atsako greitis bus labai mažas – eksploatacijos nutraukimas ir pakartotinis įjungimas jūsų klasteryje užtruks valandas. Žinoma, tai nėra tai, ko tikitės iš automatinio mastelio keitimo. Tai yra, mes neliečiame pirmojo ir antrojo tipų mazgų. Jie bus minimali gyvybinga grupė, kuri egzistuos visą programos laikotarpį.
Taigi, mūsų automatinis skaleris parašytas Python 3, naudoja Ambari API klasterio paslaugoms valdyti, naudoja
Sprendimo architektūra
- Modulis
autoscaler.py
. Jame yra trys klasės: 1) funkcijos, skirtos darbui su „Ambari“, 2) funkcijos, skirtos darbui su MCS, 3) funkcijos, tiesiogiai susijusios su automatinio skalerio logika. - Scenarijus
observer.py
. Iš esmės tai susideda iš skirtingų taisyklių: kada ir kokiais momentais iškviesti automatinio mastelio funkcijas. - Konfigūracijos failas
config.py
. Jame, pavyzdžiui, yra mazgų, leidžiamų automatiniam mastelio keitimui, sąrašas ir kiti parametrai, kurie turi įtakos, pavyzdžiui, kiek laiko reikia laukti nuo naujo mazgo pridėjimo momento. Taip pat yra pamokų pradžios laiko žymos, kad prieš pamoką būtų paleista maksimali leistina klasterio konfigūracija.
Dabar pažvelkime į kodo dalis pirmuosiuose dviejuose failuose.
1. Autoscaler.py modulis
Ambari klasė
Taip atrodo kodo dalis, kurioje yra klasė Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Aukščiau, kaip pavyzdį, galite pažvelgti į funkcijos įgyvendinimą stop_all_services
, kuris sustabdo visas paslaugas norimame klasterio mazge.
Prie įėjimo į klasę Ambari
tu praeini:
ambari_url
, pavyzdžiui, patinka'http://localhost:8080/api/v1/clusters/'
,cluster_name
– jūsų grupės pavadinimas Ambari,headers = {'X-Requested-By': 'ambari'}
- ir viduje
auth
čia yra jūsų Ambari prisijungimo vardas ir slaptažodis:auth = ('login', 'password')
.
Pati funkcija yra ne kas kita, kaip keli skambučiai per REST API į Ambari. Loginiu požiūriu pirmiausia gauname mazge veikiančių paslaugų sąrašą, o tada prašome tam tikrame klasteryje, nurodytame mazge, perkelti paslaugas iš sąrašo į būseną. INSTALLED
. Visų paslaugų paleidimo, mazgų perkėlimo į būseną funkcijos Maintenance
ir tt atrodo panašiai – tai tik kelios užklausos per API.
Mc klasės
Taip atrodo kodo dalis, kurioje yra klasė Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
Prie įėjimo į klasę Mcs
mes perduodame projekto ID debesyje ir vartotojo ID, taip pat jo slaptažodį. Funkcijoje vm_turn_on
norime įjungti vieną iš mašinų. Logika čia yra šiek tiek sudėtingesnė. Kodo pradžioje vadinamos dar trys funkcijos: 1) turime gauti prieigos raktą, 2) turime konvertuoti pagrindinio kompiuterio pavadinimą į mašinos pavadinimą MCS, 3) gauti šio įrenginio ID. Tada mes tiesiog pateikiame užklausą dėl paskelbimo ir paleidžiame šį įrenginį.
Štai kaip atrodo žetono gavimo funkcija:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Autoscaler klasė
Šioje klasėje yra funkcijų, susijusių su pačia veikimo logika.
Štai kaip atrodo šios klasės kodo dalis:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Priimame užsiėmimus. Ambari
и Mcs
, mazgų, kuriems leidžiama keisti mastelį, sąrašas, taip pat mazgo konfigūracijos parametrai: atmintis ir procesorius, priskirtas mazgui YARN. Taip pat yra 2 vidiniai parametrai q_ram, q_cpu, kurie yra eilės. Naudodami juos išsaugome esamos klasterio apkrovos reikšmes. Jei matome, kad per paskutines 5 minutes nuolat didėjo apkrova, nusprendžiame, kad prie klasterio reikia pridėti +1 mazgą. Tas pats pasakytina ir apie klasterio nepakankamo panaudojimo būseną.
Aukščiau pateiktas kodas yra funkcijos, kuri pašalina mašiną iš klasterio ir sustabdo ją debesyje, pavyzdys. Pirmiausia vyksta eksploatavimo nutraukimas YARN Nodemanager
, tada režimas įsijungia Maintenance
, tada sustabdome visas mašinoje esančias paslaugas ir išjungiame virtualią mašiną debesyje.
2. Scenarijaus stebėtojas.py
Kodo pavyzdys iš ten:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
Jame patikriname, ar yra sudarytos sąlygos didinti klasterio pajėgumą ir ar nėra rezervuotų mašinų, gauname vieno iš jų host pavadinimą, pridedame jį prie klasterio ir apie tai paskelbiame pranešimą mūsų komandos Slack. Po kurio prasideda cooldown_period
, kai mes nieko nepridedame ir nepašaliname iš klasterio, o tiesiog stebime apkrovą. Jei jis stabilizavosi ir yra optimalių apkrovos verčių koridoriuje, mes tiesiog tęsiame stebėjimą. Jei vieno mazgo nepakako, tada pridedame kitą.
Tais atvejais, kai mūsų laukia pamoka, jau tikrai žinome, kad vieno mazgo neužteks, todėl iškart paleidžiame visus laisvus mazgus ir palaikome juos aktyvius iki pamokos pabaigos. Tai nutinka naudojant veiklos laiko žymų sąrašą.
išvada
Autoscaler yra geras ir patogus sprendimas tais atvejais, kai susiduriate su netolygiu klasterių apkrovimu. Jūs vienu metu pasiekiate norimą klasterio konfigūraciją didžiausioms apkrovoms ir tuo pačiu metu neišlaikote šio klasterio per mažą apkrovą, taip sutaupydami pinigų. Na, be to, visa tai vyksta automatiškai be jūsų dalyvavimo. Pats automatinis skaleris yra ne kas kita, kaip užklausų rinkinys klasterio valdytojo API ir debesies tiekėjo API, parašytas pagal tam tikrą logiką. Ką tikrai reikia atsiminti, tai mazgų padalijimas į 3 tipus, kaip rašėme anksčiau. Ir tu būsi laimingas.
Šaltinis: www.habr.com