Sveiki! MÄs apmÄcÄm cilvÄkus strÄdÄt ar lielajiem datiem. Nav iespÄjams iedomÄties izglÄ«tojoÅ”u programmu par lielajiem datiem bez sava klastera, kurÄ visi dalÄ«bnieki strÄdÄ kopÄ. Å Ä« iemesla dÄļ mÅ«su programmai tas vienmÄr ir š MÄs nodarbojamies ar tÄs konfigurÄÅ”anu, regulÄÅ”anu un administrÄÅ”anu, un puiÅ”i tur tieÅ”i palaiž MapReduce darbus un izmanto Spark.
Å ajÄ ierakstÄ mÄs jums pastÄstÄ«sim, kÄ mÄs atrisinÄjÄm nevienmÄrÄ«gas klasteru ielÄdes problÄmu, rakstot paÅ”i savu autoscaler, izmantojot mÄkoni.
problÄma
MÅ«su klasteris netiek izmantots parastajÄ režīmÄ. ApglabÄÅ”ana ir ļoti nevienmÄrÄ«ga. PiemÄram, ir praktiskÄs nodarbÄ«bas, kad visi 30 cilvÄki un skolotÄjs aiziet uz klasteri un sÄk to lietot. Vai arÄ« atkal ir dienas pirms termiÅa, kad slodze ievÄrojami palielinÄs. PÄrÄjÄ laikÄ klasteris darbojas nepietiekamas slodzes režīmÄ.
RisinÄjums Nr. 1 ir saglabÄt kopu, kas izturÄs maksimÄlÄs slodzes, bet pÄrÄjÄ laikÄ bÅ«s dÄ«kstÄvÄ.
2. risinÄjums ir saglabÄt nelielu kopu, kurai manuÄli pievienojat mezglus pirms nodarbÄ«bÄm un maksimÄlÄs slodzes laikÄ.
RisinÄjums Nr. 3 ir saglabÄt nelielu klasteru un uzrakstÄ«t automÄtisko mÄrogoÅ”anu, kas uzraudzÄ«s klastera paÅ”reizÄjo slodzi un, izmantojot dažÄdus API, pievienos un noÅems mezglus no klastera.
Å ajÄ rakstÄ mÄs runÄsim par risinÄjumu Nr. 3. Å is automÄtiskais mÄrogoÅ”anas lÄ«dzeklis ir ļoti atkarÄ«gs no ÄrÄjiem faktoriem, nevis no iekÅ”Äjiem faktoriem, un pakalpojumu sniedzÄji to bieži nenodroÅ”ina. MÄs izmantojam Mail.ru Cloud Solutions mÄkoÅa infrastruktÅ«ru un uzrakstÄ«jÄm automÄtisko mÄrogoÅ”anu, izmantojot MCS API. Un tÄ kÄ mÄs mÄcÄm strÄdÄt ar datiem, mÄs nolÄmÄm parÄdÄ«t, kÄ varat uzrakstÄ«t lÄ«dzÄ«gu automÄtisko skalu saviem mÄrÄ·iem un izmantot to savÄ mÄkonÄ«.
PriekÅ”zinÄÅ”anas
PirmkÄrt, jums ir jÄbÅ«t Hadoop klasterim. PiemÄram, mÄs izmantojam HDP izplatÄ«Å”anu.
Lai jÅ«su mezgli tiktu Ätri pievienoti un noÅemti, jums ir jÄbÅ«t noteiktam lomu sadalÄ«jumam starp mezgliem.
- Galvenais mezgls. Nu, nekas Ä«paÅ”i nav jÄpaskaidro: galvenais klastera mezgls, kurÄ, piemÄram, tiek palaists Spark draiveris, ja izmantojat interaktÄ«vo režīmu.
- Datuma mezgls. Å is ir mezgls, kurÄ glabÄjat datus HDFS un kur tiek veikti aprÄÄ·ini.
- SkaitļoÅ”anas mezgls. Å is ir mezgls, kurÄ jÅ«s neko neglabÄjat HDFS, bet kur notiek aprÄÄ·ini.
SvarÄ«gs punkts. AutomÄtiskÄ mÄrogoÅ”ana notiks treÅ”Ä tipa mezglu dÄļ. Ja sÄkat Åemt un pievienot otrÄ tipa mezglus, reakcijas Ätrums bÅ«s ļoti zems ā ekspluatÄcijas pÄrtraukÅ”ana un atkÄrtota ievieÅ”ana jÅ«su klasterÄ« prasÄ«s stundas. Tas, protams, nav tas, ko jÅ«s sagaidÄt no automÄtiskÄs mÄrogoÅ”anas. Tas ir, mÄs nepieskaramies pirmÄ un otrÄ veida mezgliem. Tie veidos minimÄlu dzÄ«votspÄjÄ«gu kopu, kas pastÄvÄs visÄ programmas darbÄ«bas laikÄ.
TÄtad, mÅ«su autoscaler ir rakstÄ«ts Python 3, izmanto Ambari API, lai pÄrvaldÄ«tu klasteru pakalpojumus,
RisinÄjuma arhitektÅ«ra
- Modulis
autoscaler.py
. TajÄ ir trÄ«s klases: 1) funkcijas darbam ar Ambari, 2) funkcijas darbam ar MCS, 3) funkcijas, kas ir tieÅ”i saistÄ«tas ar automÄtiskÄ mÄrogoÅ”anas loÄ£iku. - Skripts
observer.py
. BÅ«tÄ«bÄ tas sastÄv no dažÄdiem noteikumiem: kad un kÄdos brīžos izsaukt automÄtiskÄs mÄrogoÅ”anas funkcijas. - KonfigurÄcijas fails
config.py
. Tas satur, piemÄram, to mezglu sarakstu, kuriem atļauts veikt automÄtisko mÄrogoÅ”anu, un citus parametrus, kas ietekmÄ, piemÄram, cik ilgi jÄgaida no brīža, kad tika pievienots jauns mezgls. Ir arÄ« nodarbÄ«bu sÄkuma laikspiedoli, lai pirms nodarbÄ«bas tiktu palaista maksimÄli pieļaujamÄ klastera konfigurÄcija.
Tagad apskatīsim koda fragmentus pirmajos divos failos.
1. Autoscaler.py modulis
Ambari klase
Å Ädi izskatÄs koda fragments, kas satur klasi Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
IepriekÅ” kÄ piemÄru varat apskatÄ«t funkcijas ievieÅ”anu stop_all_services
, kas aptur visus pakalpojumus vÄlamajÄ klastera mezglÄ.
Pie ieejas klasÄ Ambari
tu izturi:
ambari_url
, piemÄram, patÄ«k'http://localhost:8080/api/v1/clusters/'
,cluster_name
- jūsu klastera nosaukums Ambari,headers = {'X-Requested-By': 'ambari'}
- un iekÅ”Ä
auth
Å”eit ir jÅ«su Ambari lietotÄjvÄrds un parole:auth = ('login', 'password')
.
Pati funkcija ir nekas vairÄk kÄ pÄris zvani caur REST API uz Ambari. No loÄ£iskÄ viedokļa mÄs vispirms saÅemam mezglÄ darbojoÅ”os pakalpojumu sarakstu un pÄc tam prasÄm noteiktÄ klasterÄ«, konkrÄtajÄ mezglÄ pÄrsÅ«tÄ«t pakalpojumus no saraksta uz stÄvokli. INSTALLED
. Funkcijas visu pakalpojumu palaiÅ”anai, mezglu pÄrsÅ«tÄ«Å”anai uz stÄvokli Maintenance
utt. izskatÄs lÄ«dzÄ«gi ā tie ir tikai daži pieprasÄ«jumi caur API.
klase Mcs
Å Ädi izskatÄs koda fragments, kas satur klasi Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
Pie ieejas klasÄ Mcs
mÄs nododam projekta ID mÄkonÄ« un lietotÄja ID, kÄ arÄ« viÅa paroli. FunkcijÄ vm_turn_on
mÄs vÄlamies ieslÄgt vienu no maŔīnÄm. Å eit loÄ£ika ir nedaudz sarežģītÄka. Koda sÄkumÄ tiek izsauktas trÄ«s citas funkcijas: 1) jÄiegÅ«st marÄ·ieris, 2) jÄkonvertÄ resursdatora nosaukums par maŔīnas nosaukumu MCS, 3) jÄiegÅ«st Ŕīs maŔīnas id. PÄc tam mÄs vienkÄrÅ”i veicam pasta pieprasÄ«jumu un palaižam Å”o iekÄrtu.
Å Ädi izskatÄs marÄ·iera iegÅ«Å”anas funkcija:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Autoscaler klase
Å ajÄ klasÄ ir funkcijas, kas saistÄ«tas ar paÅ”u darbÄ«bas loÄ£iku.
LÅ«k, kÄ izskatÄs Ŕīs klases koda daļa:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
PieÅemam nodarbÄ«bas ieejai. Ambari
Šø Mcs
, to mezglu saraksts, kuriem ir atļauts mÄrogot, kÄ arÄ« mezgla konfigurÄcijas parametri: atmiÅa un CPU, kas pieŔķirts mezglam YARN. Ir arÄ« 2 iekÅ”Äjie parametri q_ram, q_cpu, kas ir rindas. Izmantojot tos, mÄs saglabÄjam paÅ”reizÄjÄs klastera slodzes vÄrtÄ«bas. Ja mÄs redzam, ka pÄdÄjo 5 minÅ«Å”u laikÄ ir pastÄvÄ«gi palielinÄta slodze, mÄs nolemjam, ka mums ir jÄpievieno klasterim +1 mezgls. Tas pats attiecas uz klastera nepietiekamas izmantoÅ”anas stÄvokli.
IepriekÅ” minÄtais kods ir tÄdas funkcijas piemÄrs, kas noÅem maŔīnu no klastera un aptur to mÄkonÄ«. Vispirms notiek ekspluatÄcijas pÄrtraukÅ”ana YARN Nodemanager
, tad režīms tiek ieslÄgts Maintenance
, tad mÄs apturam visus pakalpojumus maŔīnÄ un izslÄdzam virtuÄlo maŔīnu mÄkonÄ«.
2. Skripta novÄrotÄjs.py
Koda paraugs no turienes:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
TajÄ pÄrbaudÄm, vai ir radÄ«ti apstÄkļi klastera kapacitÄtes palielinÄÅ”anai un vai nav rezervÄ kÄdas maŔīnas, iegÅ«stam kÄda no tÄm resursdatora nosaukumu, pievienojam to klasterim un par to publicÄjam ziÅu mÅ«su komandas Slack. PÄc kura tas sÄkas cooldown_period
, kad mÄs neko nepievienojam un neizÅemam no klastera, bet vienkÄrÅ”i uzraugÄm slodzi. Ja tas ir nostabilizÄjies un atrodas optimÄlo slodzes vÄrtÄ«bu koridorÄ, tad vienkÄrÅ”i turpinÄm uzraudzÄ«bu. Ja ar vienu mezglu nepietika, tad pievienojam vÄl vienu.
GadÄ«jumiem, kad mums priekÅ”Ä nodarbÄ«ba, mÄs jau droÅ”i zinÄm, ka ar vienu mezglu nepietiks, tÄpÄc uzreiz iedarbinÄm visus brÄ«vos mezglus un turam tos aktÄ«vus lÄ«dz nodarbÄ«bas beigÄm. Tas notiek, izmantojot aktivitÄÅ”u laikspiedolu sarakstu.
SecinÄjums
Autoscaler ir labs un Ärts risinÄjums tiem gadÄ«jumiem, kad rodas nevienmÄrÄ«ga klasteru ielÄde. JÅ«s vienlaikus sasniedzat vÄlamo klastera konfigurÄciju maksimÄlajÄm slodzÄm un tajÄ paÅ”Ä laikÄ nesaglabÄjat Å”o klasteru nepietiekamas slodzes laikÄ, ietaupot naudu. TurklÄt tas viss notiek automÄtiski bez jÅ«su lÄ«dzdalÄ«bas. Pats automÄtiskais mÄrogoÅ”anas lÄ«dzeklis ir nekas vairÄk kÄ pieprasÄ«jumu kopa klasteru pÄrvaldnieka API un mÄkoÅa nodroÅ”inÄtÄja API, kas rakstÄ«ti saskaÅÄ ar noteiktu loÄ£iku. Tas, kas jums noteikti ir jÄatceras, ir mezglu sadalÄ«jums 3 veidos, kÄ mÄs rakstÄ«jÄm iepriekÅ”. Un tu bÅ«si laimÄ«gs.
Avots: www.habr.com