Kamusta! Sinasanay namin ang mga tao na magtrabaho gamit ang malaking data. Imposibleng isipin ang isang programang pang-edukasyon sa malaking data na walang sariling kumpol, kung saan ang lahat ng mga kalahok ay nagtutulungan. Para sa kadahilanang ito, palaging mayroon nito ang aming programa :) Kami ay nakikibahagi sa pagsasaayos, pag-tune at pangangasiwa nito, at ang mga lalaki ay direktang naglulunsad ng mga trabaho sa MapReduce doon at gumagamit ng Spark.
Sa post na ito sasabihin namin sa iyo kung paano namin nalutas ang problema ng hindi pantay na pag-load ng cluster sa pamamagitan ng pagsulat ng aming sariling autoscaler gamit ang cloud
problema
Ang aming cluster ay hindi ginagamit sa isang karaniwang mode. Ang pagtatapon ay lubos na hindi pantay. Halimbawa, may mga praktikal na klase, kapag ang lahat ng 30 tao at isang guro ay pumunta sa cluster at nagsimulang gamitin ito. O muli, may mga araw bago ang deadline kung kailan tumataas nang husto ang load. Ang natitirang oras ay gumagana ang cluster sa underload mode.
Ang solusyon #1 ay panatilihin ang isang kumpol na makatiis sa mga peak load, ngunit magiging idle sa natitirang oras.
Ang solusyon #2 ay upang panatilihin ang isang maliit na kumpol, kung saan manu-mano kang magdagdag ng mga node bago ang mga klase at sa panahon ng mga peak load.
Ang solusyon #3 ay upang magpanatili ng isang maliit na cluster at magsulat ng isang autoscaler na susubaybay sa kasalukuyang pagkarga ng cluster at, gamit ang iba't ibang mga API, magdagdag at mag-alis ng mga node mula sa cluster.
Sa post na ito ay pag-uusapan natin ang solusyon #3. Ang autoscaler na ito ay lubos na nakadepende sa mga panlabas na salik kaysa sa mga panloob, at kadalasang hindi ito ibinibigay ng mga provider. Ginagamit namin ang imprastraktura ng ulap ng Mail.ru Cloud Solutions at nagsulat ng autoscaler gamit ang MCS API. At dahil itinuro namin kung paano gumawa ng data, nagpasya kaming ipakita kung paano ka makakasulat ng katulad na autoscaler para sa iyong sariling mga layunin at gamitin ito sa iyong cloud
Kinakailangan
Una, dapat mayroon kang Hadoop cluster. Halimbawa, ginagamit namin ang pamamahagi ng HDP.
Upang mabilis na maidagdag at maalis ang iyong mga node, dapat ay mayroon kang isang tiyak na pamamahagi ng mga tungkulin sa mga node.
- Master node. Buweno, walang partikular na kailangang ipaliwanag dito: ang pangunahing node ng kumpol, kung saan, halimbawa, ang driver ng Spark ay inilunsad, kung gagamitin mo ang interactive na mode.
- Node ng petsa. Ito ang node kung saan ka nag-iimbak ng data sa HDFS at kung saan nagaganap ang mga kalkulasyon.
- Pag-compute ng node. Ito ay isang node kung saan hindi ka nag-iimbak ng anuman sa HDFS, ngunit kung saan nangyayari ang mga kalkulasyon.
Mahalagang punto. Ang autoscaling ay magaganap dahil sa mga node ng ikatlong uri. Kung sisimulan mo ang pagkuha at pagdaragdag ng mga node ng pangalawang uri, ang bilis ng pagtugon ay magiging napakababa - ang pag-decommission at muling pagkokomisyon ay aabot ng ilang oras sa iyong cluster. Siyempre, hindi ito ang iyong inaasahan mula sa autoscaling. Iyon ay, hindi namin hinawakan ang mga node ng una at pangalawang uri. Kakatawanin nila ang isang minimum na mabubuhay na cluster na iiral sa buong tagal ng programa.
Kaya, ang aming autoscaler ay nakasulat sa Python 3, ginagamit ang Ambari API upang pamahalaan ang mga serbisyo ng cluster, ginagamit
Arkitektura ng solusyon
- Modyul
autoscaler.py
. Naglalaman ito ng tatlong klase: 1) mga function para sa pagtatrabaho sa Ambari, 2) mga function para sa pagtatrabaho sa MCS, 3) mga function na direktang nauugnay sa logic ng autoscaler. - Script
observer.py
. Sa pangkalahatan, binubuo ito ng iba't ibang panuntunan: kailan at sa anong mga sandali tatawagan ang mga function ng autoscaler. - File ng configuration
config.py
. Naglalaman ito, halimbawa, ng isang listahan ng mga node na pinapayagan para sa autoscaling at iba pang mga parameter na nakakaapekto, halimbawa, kung gaano katagal maghintay mula sa sandaling naidagdag ang isang bagong node. Mayroon ding mga timestamp para sa pagsisimula ng mga klase, upang bago ang klase ay mailunsad ang maximum na pinahihintulutang configuration ng cluster.
Tingnan natin ngayon ang mga piraso ng code sa loob ng unang dalawang file.
1. Autoscaler.py module
klase ng Ambari
Ito ang hitsura ng isang piraso ng code na naglalaman ng isang klase Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Sa itaas, bilang isang halimbawa, maaari mong tingnan ang pagpapatupad ng function stop_all_services
, na humihinto sa lahat ng serbisyo sa gustong cluster node.
Sa pasukan ng klase Ambari
pumasa ka:
ambari_url
, halimbawa, tulad ng'http://localhost:8080/api/v1/clusters/'
,cluster_name
β ang pangalan ng iyong kumpol sa Ambari,headers = {'X-Requested-By': 'ambari'}
- at sa loob
auth
narito ang iyong username at password para sa Ambari:auth = ('login', 'password')
.
Ang function mismo ay hindi hihigit sa ilang mga tawag sa pamamagitan ng REST API sa Ambari. Mula sa isang lohikal na pananaw, una kaming makakatanggap ng isang listahan ng mga tumatakbong serbisyo sa isang node, at pagkatapos ay humihiling sa isang partikular na kumpol, sa isang ibinigay na node, na ilipat ang mga serbisyo mula sa listahan patungo sa estado. INSTALLED
. Mga function para sa paglulunsad ng lahat ng mga serbisyo, para sa paglilipat ng mga node sa estado Maintenance
atbp. magkatulad - ang mga ito ay ilan lamang sa mga kahilingan sa pamamagitan ng API.
Class Mcs
Ito ang hitsura ng isang piraso ng code na naglalaman ng isang klase Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
Sa pasukan ng klase Mcs
ipinapasa namin ang project id sa loob ng cloud at ang user id, pati na rin ang kanyang password. Sa pag-andar vm_turn_on
gusto naming i-on ang isa sa mga makina. Ang lohika dito ay medyo mas kumplikado. Sa simula ng code, tatlong iba pang function ang tinatawag: 1) kailangan nating kumuha ng token, 2) kailangan nating i-convert ang hostname sa pangalan ng makina sa MCS, 3) kunin ang id ng makinang ito. Susunod, gumawa lang kami ng kahilingan sa pag-post at ilunsad ang makinang ito.
Ganito ang hitsura ng function para sa pagkuha ng token:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Klase ng Autoscaler
Ang klase na ito ay naglalaman ng mga function na nauugnay sa operating logic mismo.
Ito ang hitsura ng isang piraso ng code para sa klase na ito:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Tumatanggap kami ng mga klase para sa pagpasok. Ambari
ΠΈ Mcs
, isang listahan ng mga node na pinapayagan para sa scaling, pati na rin ang mga parameter ng configuration ng node: memory at cpu na inilalaan sa node sa YARN. Mayroon ding 2 panloob na parameter q_ram, q_cpu, na mga pila. Gamit ang mga ito, iniimbak namin ang mga halaga ng kasalukuyang pag-load ng cluster. Kung nakita namin na sa nakalipas na 5 minuto ay nagkaroon ng patuloy na pagtaas ng load, pagkatapos ay magpasya kaming kailangan naming magdagdag ng +1 node sa cluster. Ang parehong ay totoo para sa cluster underutilization estado.
Ang code sa itaas ay isang halimbawa ng isang function na nag-aalis ng machine mula sa cluster at huminto ito sa cloud. Una ay may decommissioning YARN Nodemanager
, pagkatapos ay i-on ang mode Maintenance
, pagkatapos ay ihihinto namin ang lahat ng serbisyo sa makina at i-off ang virtual machine sa cloud.
2. Script observer.py
Sample code mula doon:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
Dito, sinusuri namin kung nalikha na ang mga kundisyon para sa pagtaas ng kapasidad ng cluster at kung mayroong anumang mga machine na nakalaan, kunin ang hostname ng isa sa mga ito, idagdag ito sa cluster at mag-publish ng mensahe tungkol dito sa Slack ng aming team. Pagkatapos nito ay magsisimula cooldown_period
, kapag hindi kami nagdagdag o nag-alis ng anuman mula sa kumpol, ngunit sinusubaybayan lamang ang pagkarga. Kung ito ay naging matatag at nasa loob ng koridor ng pinakamainam na mga halaga ng pag-load, pagkatapos ay ipagpatuloy lang namin ang pagsubaybay. Kung ang isang node ay hindi sapat, pagkatapos ay magdagdag kami ng isa pa.
Para sa mga kaso kung kailan mayroon tayong aralin sa unahan, alam na nating sigurado na ang isang node ay hindi magiging sapat, kaya agad naming sinisimulan ang lahat ng mga libreng node at panatilihing aktibo ang mga ito hanggang sa katapusan ng aralin. Nangyayari ito gamit ang isang listahan ng mga timestamp ng aktibidad.
Konklusyon
Ang Autoscaler ay isang mahusay at maginhawang solusyon para sa mga kasong iyon kapag nakakaranas ka ng hindi pantay na pag-load ng cluster. Sabay-sabay mong makakamit ang gustong configuration ng cluster para sa mga peak load at sa parehong oras ay hindi pinapanatili ang cluster na ito sa panahon ng underload, na nakakatipid ng pera. Well, at ang lahat ng ito ay awtomatikong nangyayari nang wala ang iyong paglahok. Ang autoscaler mismo ay hindi hihigit sa isang hanay ng mga kahilingan sa cluster manager API at cloud provider API, na isinulat ayon sa isang partikular na lohika. Ang talagang kailangan mong tandaan ay ang paghahati ng mga node sa 3 uri, tulad ng isinulat namin kanina. At magiging masaya ka.
Pinagmulan: www.habr.com