Hello! We train people to work with big data. It is impossible to imagine an educational program on big data without its own cluster on which all participants work together. That is why our program always has one 🙂 We handle its configuration, tuning and administration, and the participants run MapReduce jobs and use Spark on it directly.
In this post, we will tell you how we solved the problem of uneven cluster load by writing our own autoscaler that works with the cloud.
Problem
Our cluster is not used in a typical way. Its utilization is highly uneven. For example, there are hands-on classes where all 30 participants and the instructor log in to the cluster and start using it at the same time. Or there are days right before a deadline when the load rises sharply. The rest of the time, the cluster runs underloaded.
解決ç #1 ã¯ãããŒã¯è² è·ã«ã¯èããããããæ®ãã®æéã¯ã¢ã€ãã«ç¶æ ã«ãªãã¯ã©ã¹ã¿ãŒãç¶æããããšã§ãã
解決ç 2 ã¯ãå°èŠæš¡ãªã¯ã©ã¹ã¿ãŒãç¶æããã¯ã©ã¹ã®åããã³ããŒã¯è² è·äžã«æåã§ããŒããè¿œå ããããšã§ãã
解決ç #3 ã¯ãå°èŠæš¡ãªã¯ã©ã¹ã¿ãŒãç¶æããã¯ã©ã¹ã¿ãŒã®çŸåšã®è² è·ãç£èŠããããŸããŸãª API ã䜿çšããŠã¯ã©ã¹ã¿ãŒã«ããŒããè¿œå ããã³åé€ãããªãŒãã¹ã±ãŒã©ãŒãäœæããããšã§ãã
In this post we will talk about solution #3. Such an autoscaler depends heavily on external factors rather than internal ones, and providers often do not offer one. We use the Mail.ru Cloud Solutions cloud infrastructure and wrote the autoscaler using the MCS API. And since we teach people how to work with data, we decided to show how you can write a similar autoscaler for your own purposes and use it with your cloud.
Prerequisites
First, you need a Hadoop cluster. We use, for example, the HDP distribution.
For nodes to be added and removed quickly, roles must be distributed among the nodes in a specific way:
- Master node. There is nothing special to explain here: it is the main node of the cluster, on which, for example, the Spark driver runs if you use interactive mode.
- Data node. This is a node that stores data in HDFS and also runs computations.
- Compute node. This is a node that stores nothing in HDFS but runs computations.
倧äºãªãã€ã³ãã XNUMX çªç®ã®ã¿ã€ãã®ããŒãã«ããèªåã¹ã±ãŒãªã³ã°ãçºçããŸãã XNUMX çªç®ã®ã¿ã€ãã®ããŒãã®ååŸãšè¿œå ãéå§ãããšãå¿çé床ãéåžžã«é ããªããã¯ã©ã¹ã¿ãŒã§ã®å»æ¢ãšåã³ãããã«æ°æéããããŸãã ãã¡ãããããã¯èªåã¹ã±ãŒãªã³ã°ã«æåŸ ããããã®ã§ã¯ãããŸããã ã€ãŸããXNUMX çªç®ãš XNUMX çªç®ã®ã¿ã€ãã®ããŒãã«ã¯è§ŠããŸããã ãããã¯ãããã°ã©ã ã®æéå šäœãéããŠååšãããå®è¡å¯èœãªæå°éã®ã¯ã©ã¹ã¿ãŒãè¡šããŸãã
So, our autoscaler is written in Python 3, uses the Ambari API to manage the cluster services, and uses the Mail.ru Cloud Solutions (MCS) API to start and stop machines.
Solution architecture
- The autoscaler.py module. It contains three classes: 1) functions for working with Ambari, 2) functions for working with MCS, 3) functions directly related to the autoscaler logic.
- The observer.py script. Essentially, it consists of various rules that determine when and at which moments to call the autoscaler functions.
- The config.py configuration file. It contains, for example, the list of nodes allowed for autoscaling and other parameters that affect, say, how long to wait after a new node has been added. It also holds the start timestamps of classes, so that the maximum allowed cluster configuration is launched before a class begins.
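A config.py along these lines might look like the sketch below. Only scaling_hosts, scale_up_thresholds and cooldown_period appear in the article's code; every other name, the host names and all the values are hypothetical placeholders.

```python
# config.py: hypothetical sketch. Names other than scaling_hosts,
# scale_up_thresholds and cooldown_period are assumptions, as are all values.
from datetime import datetime

# Compute nodes the autoscaler may turn on and off (never master or data nodes)
scaling_hosts = ["compute1.example.local", "compute2.example.local"]

# Share of YARN memory / vCPU in use above which a node is added
scale_up_thresholds = {"ram": 0.8, "cpu": 0.8}
# Share below which a node is removed (hypothetical name)
scale_down_thresholds = {"ram": 0.3, "cpu": 0.3}

# Minutes during which the observer only watches the load after a change
cooldown_period = 10

# Scheduled classes as (start, end); the full configuration is started before each one
lessons = [
    (datetime(2020, 3, 2, 19, 0), datetime(2020, 3, 2, 22, 0)),
]
```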
Let's look at some pieces of code from the first two files.
1. The autoscaler.py module
Ambari class
This is what the code containing the Ambari class looks like:
import json

import requests


class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url      # e.g. 'http://localhost:8080/api/v1/clusters/'
        self.cluster_name = cluster_name  # name of the cluster in Ambari
        self.headers = headers            # {'X-Requested-By': 'ambari'}
        self.auth = auth                  # ('login', 'password')

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        # Get the list of components installed on the host
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        # Ask Ambari to move all of them to the INSTALLED (stopped) state
        data = {
            "RequestInfo": {
                "context": "Stop All Host Components",
                "operation_level": {
                    "level": "HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query": "HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state": "INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message
As an example, above you can see the implementation of the stop_all_services function, which stops all services on the given cluster node.
The Ambari class takes as input:
- ambari_url, for example 'http://localhost:8080/api/v1/clusters/';
- cluster_name, the name of your cluster in Ambari;
- headers = {'X-Requested-By': 'ambari'};
- auth, your Ambari login and password: auth = ('login', 'password').
The function itself is nothing more than a couple of calls to Ambari through its REST API. Logically, we first get the list of components on the node, and then ask Ambari to move those services, on the given cluster and the given node, to the INSTALLED state (in Ambari, INSTALLED means installed but stopped). The functions for starting all services, switching a node to Maintenance mode and so on look similar: each is just a few requests to the API.
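To illustrate how similar the other Ambari helpers are, here is a sketch of the request behind a maintenance_on-style function and of parsing the response that check_service/check_maintenance-style functions would read. It relies on the Ambari REST API fields Hosts/maintenance_state, HostRoles/state and HostRoles/maintenance_state; the function names are hypothetical, and the HTTP call is left out so the payloads can be inspected on their own.

```python
# Hypothetical helpers in the style of the Ambari class; they only build or parse
# JSON, the actual HTTP request is left to the caller.


def maintenance_request(ambari_url, cluster_name, hostname, on=True):
    """Return (url, body) that switch a host's maintenance mode via the Ambari REST API."""
    url = ambari_url + cluster_name + "/hosts/" + hostname
    state = "ON" if on else "OFF"
    body = {
        "RequestInfo": {"context": "Turn {0} Maintenance Mode".format(state)},
        "Body": {"Hosts": {"maintenance_state": state}},
    }
    return url, body


def component_url(ambari_url, cluster_name, hostname, component):
    """URL of one component (e.g. NODEMANAGER) on one host."""
    return ambari_url + cluster_name + "/hosts/" + hostname + "/host_components/" + component


def parse_component(response_json):
    """Return (state, maintenance_state) from a GET on component_url(...)."""
    roles = response_json["HostRoles"]
    # state: e.g. STARTED or INSTALLED (stopped); maintenance_state: e.g. ON, OFF, IMPLIED_FROM_HOST
    return roles["state"], roles["maintenance_state"]
```

The body could then be sent the same way stop_all_services does it, for example `requests.put(url, data=json.dumps(body), headers=self.headers, auth=self.auth)`.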
Mcs class
This is what the code containing the Mcs class looks like:
import json

import requests


class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1            # user ID (see get_mcs_token)
        self.id2 = id2            # project ID (see get_mcs_token)
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        # get_mcs_token, hostname_to_vmname and get_vm_id are other methods of this class
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        # Server action endpoint for this VM
        mcs_url1 = self.mcs_host + '/servers/' + vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        # The os-start action takes a JSON null value
        data = {'os-start': None}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code
The Mcs class takes the user ID (id1) and the project ID (id2) in the cloud, along with the password. In the vm_turn_on function we want to turn on one of the machines. The logic here is a bit more involved. At the start of the code, three other functions are called: 1) get a token, 2) convert the hostname into the machine's name in MCS, 3) get this machine's ID. Then we make a POST request that starts the machine.
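The helpers that vm_turn_on relies on are not shown in the post. The sketch below is one guess at them, assuming the MCS compute endpoint behaves like the OpenStack Nova API (which the /servers/<id>/action URL suggests): GET /servers?name=... returns {"servers": [{"id": ..., "name": ...}, ...]}, and os-start/os-stop are server actions with a JSON null body. The hostname-to-VM-name rule and the function names are purely hypothetical.

```python
import json


def hostname_to_vmname(hostname):
    """Hypothetical rule: the VM is named after the short hostname."""
    return hostname.split(".")[0]


def find_vm_id(servers_response, vm_name):
    """Return the id of the server named exactly vm_name, or None.

    Nova treats the ?name= filter as a pattern, so 'compute1' can also match
    'compute10'; that is why the exact comparison is done here.
    """
    for server in servers_response.get("servers", []):
        if server["name"] == vm_name:
            return server["id"]
    return None


def power_action(mcs_host, vm_id, start=True):
    """Return (url, JSON body) for the os-start or os-stop server action."""
    url = mcs_host + "/servers/" + vm_id + "/action"
    body = json.dumps({"os-start" if start else "os-stop": None})
    return url, body
```

With start=False the body is `{"os-stop": null}`, which is the request a vm_turn_off counterpart would send.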
This is what the function for obtaining a token looks like:
def get_mcs_token(self):
    # Keystone v3 password authentication, scoped to the project
    url = 'https://infra.mail.ru:35357/v3/auth/tokens'
    headers = {'Content-Type': 'application/json'}
    data = {
        'auth': {
            'identity': {
                'methods': ['password'],
                'password': {
                    'user': {
                        'id': self.id1,
                        'password': self.password
                    }
                }
            },
            'scope': {
                'project': {
                    'id': self.id2
                }
            }
        }
    }
    # nocatalog: do not include the service catalog in the response body
    params = (('nocatalog', ''),)
    req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
    req.raise_for_status()
    # Keystone returns the token in the X-Subject-Token response header
    self.token = req.headers['X-Subject-Token']
    return self.token
Autoscaler class
This class contains the functions related to the scaling logic itself.
This is what the code of this class looks like:
import logging
import time
from collections import deque


class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts  # nodes that may be added or removed
        self.ambari = ambari                # Ambari instance
        self.mcs = mcs                      # Mcs instance
        self.q_ram = deque()                # recent YARN memory load values
        self.q_cpu = deque()                # recent YARN CPU load values
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        message = 'Failure'  # returned unless every step below succeeds
        if hostname in self.scaling_hosts:
            # 1. Ask Ambari to decommission the YARN NodeManager
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decommission request accepted: {0}'.format(flag1))
                    break
            # 2. Wait until the NodeManager is stopped (INSTALLED)
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('NodeManager decommissioned: {0}'.format(flag3))
                    break
            # 3. Turn on Maintenance mode for the host
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            # 4. Once maintenance is on, stop all services on the host
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            # 5. Give the services time to stop, then shut the VM down in the cloud
            time.sleep(90)
            self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
        if flag1 and flag2 and flag3 and flag4 and flag5:
            message = 'Success'
            logging.info('Scale-down finished')
            logging.info('Cooldown period has started. Wait for several minutes')
        return message
The class takes as input Ambari and Mcs instances, the list of nodes that are allowed to be scaled, and node configuration parameters (the memory and CPU allocated to a node in YARN). It also has two internal parameters, q_ram and q_cpu, which are queues. We use them to store recent values of the cluster load. If we see that the load has been consistently elevated over the last 5 minutes, we decide that the cluster needs one more node. The same logic applies, in reverse, when the cluster is underutilized.
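The q_ram and q_cpu queues can be thought of as a sliding window of load samples. Here is a minimal sketch, assuming one sample per minute (so five samples cover five minutes) and that each sample is a used/total ratio. One possible source for those numbers is the YARN ResourceManager REST endpoint /ws/v1/cluster/metrics, which reports allocatedMB/totalMB and allocatedVirtualCores/totalVirtualCores. The class and method names are hypothetical.

```python
from collections import deque


class LoadWindow:
    """Sliding window of the most recent load samples (used/total ratios)."""

    def __init__(self, size=5):
        # A deque with maxlen drops the oldest sample when a new one is appended
        self.samples = deque(maxlen=size)

    def add(self, used, total):
        self.samples.append(used / total)

    def full(self):
        return len(self.samples) == self.samples.maxlen

    def consistently_above(self, threshold):
        """True only if the window is full and every sample is >= threshold."""
        return self.full() and all(s >= threshold for s in self.samples)

    def consistently_below(self, threshold):
        """True only if the window is full and every sample is <= threshold."""
        return self.full() and all(s <= threshold for s in self.samples)
```

Requiring a full window means a single spike, or a single quiet minute, never triggers scaling on its own.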
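The scale_down function in the code above is an example of a function that removes a machine from the cluster and shuts it down in the cloud. First the YARN NodeManager is decommissioned, then Maintenance mode is turned on, then all services on the machine are stopped, and finally the virtual machine in the cloud is turned off.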
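Each step of scale_down is a "send a request, then poll every 5 seconds until the state changes" loop with no upper bound, so a stuck request would block the autoscaler indefinitely. One way to factor that pattern out, sketched here as a hypothetical helper, is a polling function with a timeout; the sleep argument exists only so it can be tested without actually waiting.

```python
import time


def wait_until(check, interval=5, timeout=600, sleep=time.sleep):
    """Call check() every `interval` seconds until it returns True.

    Returns True on success, or False once `timeout` seconds have passed,
    so a stuck Ambari or cloud request cannot hang the caller forever.
    """
    waited = 0
    while waited < timeout:
        if check():
            return True
        sleep(interval)
        waited += interval
    return False
```

For instance, the NodeManager step could become `wait_until(lambda: self.ambari.check_service(hostname, 'NODEMANAGER') == 'INSTALLED')`, with the caller deciding what to do when it returns False.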
2. The observer.py script
Sample code from it:
import time

import requests

import config

# scaler (an Autoscaler), cloud (an Mcs) and webhook (the team's Slack
# incoming-webhook URL) are created earlier in observer.py
if scaler.assert_up(config.scale_up_thresholds):
    # Pick a spare machine from the scalable hosts, if there is one
    hostname = cloud.get_vm_to_up(config.scaling_hosts)
    if hostname is not None:
        status1 = scaler.scale_up(hostname)
        if status1 == 'Success':
            # Post a plain-text message to Slack
            post = {"text": "{0} has been successfully scaled-up".format(hostname)}
            requests.post(webhook, json=post)
            # Cooldown: do not add or remove nodes, only watch the load
            time.sleep(config.cooldown_period * 60)
In it, we check whether the conditions for increasing the cluster's capacity are met and whether there are spare machines in reserve. If so, we take the hostname of one of them, add it to the cluster, and post a message about it in our team's Slack. After that the cooldown_period begins, during which we do not add anything to or remove anything from the cluster, but only monitor the load. If the load has stabilized and stays within the range of optimal values, we simply keep monitoring. If one node was not enough, we add another one.
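The cooldown rule can be written as a small pure function that the observer loop calls on every tick. This is a sketch under assumptions: the overloaded/underloaded flags would come from checks such as assert_up, and the function name and its return values are hypothetical.

```python
from datetime import datetime, timedelta


def next_action(now, last_change, cooldown_minutes, overloaded, underloaded):
    """Decide 'up', 'down' or 'wait' for one observer tick."""
    if last_change is not None and now - last_change < timedelta(minutes=cooldown_minutes):
        # Still inside the cooldown period: only watch the load
        return "wait"
    if overloaded:
        return "up"      # one node was not enough, add another
    if underloaded:
        return "down"
    return "wait"        # load is within the optimal range, keep monitoring
```

Keeping the decision separate from the Ambari and MCS calls makes it easy to check the rules without touching the cluster.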
If a class is scheduled, we already know that one node will not be enough, so we start all free nodes right away and keep them running until the class is over. This is done using a list of activity timestamps.
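A minimal sketch of that schedule check, assuming each class is stored as a (start, end) pair of datetimes and that nodes should be up some lead time before the start. The 30-minute lead, the function names and the list format are hypothetical.

```python
from datetime import datetime, timedelta


def in_lesson(now, lessons, lead=timedelta(minutes=30)):
    """True if `now` is between (start - lead) and end of any scheduled class."""
    return any(start - lead <= now <= end for start, end in lessons)


def hosts_to_start(now, lessons, stopped_hosts):
    """During a class window, bring up every stopped scalable host at once."""
    return list(stopped_hosts) if in_lesson(now, lessons) else []
```

Outside those windows the observer falls back to the load-based rules.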
Conclusion
An autoscaler is a good and convenient solution when the cluster load is uneven. You get the cluster configuration you need for peak loads and at the same time save money by not keeping that cluster running when the load is low. On top of that, it all happens automatically, without your involvement. The autoscaler itself is nothing more than a set of requests to the cluster manager API and the cloud provider API, written according to certain logic. The main thing to remember is to divide the nodes into three types, as we described earlier. And you will be happy.
Source: habr.com