Բարեւ Ձեզ! Մենք մարդկանց սովորեցնում ենք աշխատել մեծ տվյալների հետ: Անհնար է պատկերացնել մեծ տվյալների վերաբերյալ կրթական ծրագիր առանց սեփական կլաստերի, որի վրա բոլոր մասնակիցները միասին աշխատեն։ Այդ իսկ պատճառով մեր ծրագիրը միշտ ունի այն 🙂 Մենք զբաղվում ենք դրա կազմաձևմամբ, թյունինգով և կառավարմամբ, և տղաները ուղղակիորեն գործարկում են MapReduce-ի աշխատանքները և օգտագործում Spark-ը:
Այս գրառման մեջ մենք ձեզ կպատմենք, թե ինչպես լուծեցինք կլաստերի անհավասար բեռնման խնդիրը՝ գրելով մեր սեփական autoscaler-ը՝ օգտագործելով ամպը:
խնդիր
Մեր կլաստերը չի օգտագործվում տիպիկ ռեժիմում: Հեռացումը խիստ անհավասար է: Օրինակ, կան գործնական պարապմունքներ, երբ բոլոր 30 հոգին և ուսուցիչը գնում են կլաստերի և սկսում են օգտագործել այն։ Կամ կրկին, վերջնաժամկետից առաջ կան օրեր, երբ ծանրաբեռնվածությունը մեծապես ավելանում է: Մնացած ժամանակ կլաստերը գործում է թերբեռնվածության ռեժիմում:
Լուծումը թիվ 1-ն է՝ պահպանել այնպիսի կլաստեր, որը կդիմանա գագաթնակետային բեռներին, բայց մնացած ժամանակ անգործուն մնա:
Լուծումը թիվ 2-ն է փոքր կլաստեր պահելը, որին դուք ձեռքով հանգույցներ եք ավելացնում դասերից առաջ և առավելագույն բեռների ժամանակ:
Լուծումը #3-ն է՝ պահել փոքր կլաստեր և գրել ավտոմատ սանդղակ, որը կվերահսկի կլաստերի ընթացիկ ծանրաբեռնվածությունը և օգտագործելով տարբեր API-ներ, կավելացնի և կհեռացնի հանգույցները կլաստերից:
Այս գրառման մեջ կխոսենք թիվ 3 լուծման մասին։ Այս autoscaler-ը մեծապես կախված է արտաքին գործոններից, այլ ոչ թե ներքին գործոններից, և մատակարարները հաճախ դա չեն տրամադրում: Մենք օգտագործում ենք Mail.ru Cloud Solutions ամպային ենթակառուցվածքը և գրել ենք autoscaler՝ օգտագործելով MCS API: Եվ քանի որ մենք սովորեցնում ենք, թե ինչպես աշխատել տվյալների հետ, մենք որոշեցինք ցույց տալ, թե ինչպես կարող եք գրել նմանատիպ autoscaler ձեր սեփական նպատակների համար և օգտագործել այն ձեր ամպի հետ:
նախադրյալները
Նախ, դուք պետք է ունենաք Hadoop կլաստեր: Օրինակ, մենք օգտագործում ենք HDP բաշխումը:
Որպեսզի ձեր հանգույցները արագ ավելացվեն և հեռացվեն, դուք պետք է ունենաք դերերի որոշակի բաշխում հանգույցների միջև:
- Վարպետ հանգույց. Դե, առանձնապես որևէ բան բացատրելու կարիք չկա. կլաստերի հիմնական հանգույցը, որի վրա, օրինակ, գործարկվում է Spark դրայվերը, եթե օգտագործում եք ինտերակտիվ ռեժիմը:
- Ամսաթվի հանգույց. Սա այն հանգույցն է, որի վրա դուք տվյալները պահում եք HDFS-ում և որտեղ կատարվում են հաշվարկները:
- Հաշվողական հանգույց. Սա հանգույց է, որտեղ դուք ոչինչ չեք պահում HDFS-ում, բայց որտեղ կատարվում են հաշվարկներ:
Կարևոր կետ. Ավտոմատ մասշտաբը տեղի կունենա երրորդ տիպի հանգույցների պատճառով: Եթե դուք սկսեք վերցնել և ավելացնել երկրորդ տիպի հանգույցներ, արձագանքման արագությունը կլինի շատ ցածր՝ ապամոնտաժումը և վերագործարկումը ժամեր կպահանջեն ձեր կլաստերի վրա: Սա, իհարկե, այն չէ, ինչ դուք ակնկալում եք autoscaling-ից: Այսինքն՝ մենք չենք դիպչում առաջին և երկրորդ տիպի հանգույցներին։ Դրանք կներկայացնեն նվազագույն կենսունակ կլաստեր, որը գոյություն կունենա ծրագրի ողջ ընթացքում:
Այսպիսով, մեր autoscaler-ը գրված է Python 3-ում, օգտագործում է Ambari API՝ կլաստերային ծառայությունները կառավարելու համար, օգտագործում է
Լուծման ճարտարապետություն
- Մոդուլ
autoscaler.py
. Այն պարունակում է երեք դաս՝ 1) Ambari-ի հետ աշխատելու գործառույթներ, 2) MCS-ի հետ աշխատելու գործառույթներ, 3) գործառույթներ, որոնք անմիջականորեն կապված են ավտոսանդղակի տրամաբանության հետ։ - Սցենար
observer.py
. Ըստ էության, այն բաղկացած է տարբեր կանոններից՝ երբ և որ պահերին կանչել ավտոսանդղակի գործառույթները: - Կազմաձևման ֆայլ
config.py
. Այն պարունակում է, օրինակ, հանգույցների ցանկ, որոնք թույլատրվում են ավտոմատ մասշտաբավորման համար և այլ պարամետրեր, որոնք ազդում են, օրինակ, նոր հանգույցի ավելացման պահից սպասելու որքան ժամանակ: Կան նաև դասերի մեկնարկի ժամանակացույցեր, որպեսզի դասից առաջ գործարկվի առավելագույն թույլատրելի կլաստերի կազմաձևումը:
Եկեք հիմա նայենք առաջին երկու ֆայլերի ներսում գտնվող կոդի կտորներին:
1. Autoscaler.py մոդուլ
Ամբարի դաս
Ահա թե ինչ տեսք ունի դաս պարունակող կոդի հատվածը Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Վերևում, որպես օրինակ, կարող եք դիտել գործառույթի իրականացումը stop_all_services
, որը դադարեցնում է բոլոր ծառայությունները ցանկալի կլաստերի հանգույցում:
Դասի մուտքի մոտ Ambari
դու անցնում ես:
ambari_url
, օրինակ, նման'http://localhost:8080/api/v1/clusters/'
,cluster_name
– Ձեր կլաստերի անունը Ամբարիում,headers = {'X-Requested-By': 'ambari'}
- և ներսում
auth
ահա ձեր օգտանունը և գաղտնաբառը Ambari-ի համար.auth = ('login', 'password')
.
Ֆունկցիան ինքնին ոչ այլ ինչ է, քան մի քանի զանգ REST API-ի միջոցով Ambari-ին: Տրամաբանական տեսանկյունից մենք սկզբում ստանում ենք մի հանգույցի վրա գործող ծառայությունների ցանկ, այնուհետև խնդրում ենք տվյալ կլաստերի վրա, տվյալ հանգույցի վրա, ծառայությունները ցուցակից տեղափոխել վիճակ: INSTALLED
. Բոլոր ծառայությունները գործարկելու, հանգույցները պետությանը փոխանցելու գործառույթներ Maintenance
և այլն նման տեսք ունեն. դրանք ընդամենը մի քանի հարցումներ են API-ի միջոցով:
Դասարան Mcs
Ահա թե ինչ տեսք ունի դաս պարունակող կոդի հատվածը Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
Դասի մուտքի մոտ Mcs
մենք փոխանցում ենք նախագծի ID-ն ամպի ներսում և օգտվողի ID-ն, ինչպես նաև նրա գաղտնաբառը: Գործառույթի մեջ vm_turn_on
մենք ուզում ենք միացնել մեքենաներից մեկը: Այստեղ տրամաբանությունը մի փոքր ավելի բարդ է։ Կոդի սկզբում երեք այլ ֆունկցիաներ են կոչվում՝ 1) պետք է ստանանք նշան, 2) պետք է հոսթի անունը փոխարկենք մեքենայի անվանման MCS-ում, 3) ստանանք այս մեքենայի id-ը։ Հաջորդը, մենք պարզապես կատարում ենք գրառման հարցում և գործարկում այս մեքենան:
Նշան ստանալու ֆունկցիան այսպիսին է թվում.
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Autoscaler դաս
Այս դասը պարունակում է գործառույթներ՝ կապված բուն գործող տրամաբանության հետ։
Այս դասի կոդի կտորն այսպիսի տեսք ունի.
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Ընդունում ենք դասեր մուտքի համար։ Ambari
и Mcs
, հանգույցների ցանկը, որոնք թույլատրվում են մասշտաբավորման համար, ինչպես նաև հանգույցի կազմաձևման պարամետրերը. YARN-ում հանգույցին հատկացված հիշողություն և պրոցեսոր: Կան նաև 2 ներքին պարամետրեր q_ram, q_cpu, որոնք հերթեր են։ Օգտագործելով դրանք, մենք պահում ենք ընթացիկ կլաստերի բեռնվածության արժեքները: Եթե մենք տեսնում ենք, որ վերջին 5 րոպեների ընթացքում հետևողականորեն ավելացել է բեռը, ապա մենք որոշում ենք, որ մենք պետք է ավելացնենք +1 հանգույց կլաստերին: Նույնը վերաբերում է կլաստերների թերօգտագործման վիճակին:
Վերևի կոդը մի ֆունկցիայի օրինակ է, որը հեռացնում է մեքենան կլաստերից և կանգնեցնում այն ամպի մեջ: Նախ կա շահագործումից հանում YARN Nodemanager
, ապա ռեժիմը միանում է Maintenance
, ապա մենք դադարեցնում ենք բոլոր ծառայությունները մեքենայի վրա և անջատում ենք վիրտուալ մեքենան ամպի մեջ:
2. Սցենարի դիտորդ.py
Կոդի նմուշ այնտեղից.
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
Դրանում մենք ստուգում ենք, թե արդյոք պայմաններ են ստեղծվել կլաստերի հզորությունը մեծացնելու համար և կա՞ն արդյոք պահեստային մեքենաներ, ստանում ենք դրանցից մեկի հոսթի անունը, ավելացնում ենք այն կլաստերին և դրա մասին հաղորդագրություն ենք հրապարակում մեր թիմի Slack-ում։ Որից հետո սկսվում է cooldown_period
, երբ մենք ոչինչ չենք ավելացնում կամ հեռացնում կլաստերից, այլ պարզապես վերահսկում ենք բեռը։ Եթե այն կայունացել է և գտնվում է բեռնվածքի օպտիմալ արժեքների միջանցքում, ապա մենք պարզապես շարունակում ենք մոնիտորինգը: Եթե մի հանգույցը բավարար չէր, ապա ավելացնում ենք եւս մեկը։
Այն դեպքերի համար, երբ առջեւում դաս ունենք, արդեն հաստատ գիտենք, որ մեկ հանգույցը չի բավականացնի, ուստի անմիջապես սկսում ենք բոլոր անվճար հանգույցները և ակտիվ ենք պահում մինչև դասի ավարտը։ Դա տեղի է ունենում՝ օգտագործելով գործունեության ժամանակային դրոշմանիշների ցանկը:
Ամփոփում
Autoscaler-ը լավ և հարմար լուծում է այն դեպքերի համար, երբ դուք զգում եք անհավասար կլաստերային բեռնում: Դուք միաժամանակ հասնում եք գագաթնակետային բեռների համար ցանկալի կլաստերի կոնֆիգուրացիային և միևնույն ժամանակ չեք պահում այս կլաստերը թերբեռնվածության ժամանակ՝ խնայելով գումար: Դե, գումարած, այս ամենը տեղի է ունենում ինքնաբերաբար առանց ձեր մասնակցության: Ինքնին autoscaler-ը ոչ այլ ինչ է, քան կլաստերի կառավարչի API-ին և ամպային մատակարարի API-ին ուղղված հարցումների մի շարք, որոնք գրված են որոշակի տրամաբանությամբ: Այն, ինչ դուք անպայման պետք է հիշեք, հանգույցների բաժանումն է 3 տեսակի, ինչպես ավելի վաղ գրել էինք: Եվ դուք երջանիկ կլինեք:
Source: www.habr.com