Salom! Biz odamlarni katta ma'lumotlar bilan ishlashga o'rgatamiz. Katta ma'lumotlar bo'yicha ta'lim dasturini barcha ishtirokchilar birgalikda ishlaydigan o'z klasterisiz tasavvur qilib bo'lmaydi. Shu sababli, bizning dasturimizda har doim mavjud π Biz uning konfiguratsiyasi, sozlashi va boshqaruvi bilan shug'ullanamiz va yigitlar u erda to'g'ridan-to'g'ri MapReduce ishlarini ishga tushiradilar va Spark dan foydalanadilar.
Ushbu postda biz bulut yordamida o'z avtoskalerimizni yozish orqali klasterni notekis yuklash muammosini qanday hal qilganimizni aytib beramiz.
muammo
Bizning klasterimiz odatiy rejimda ishlatilmaydi. Utilizatsiya juda notekis. Misol uchun, amaliy mashg'ulotlar bor, 30 kishi va o'qituvchi klasterga borib, undan foydalanishni boshlaydi. Yoki yana, yuk juda ko'payib ketadigan muddatdan bir necha kun oldin. Qolgan vaqtda klaster kam yuklanish rejimida ishlaydi.
β1 yechim - eng yuqori yuklarga bardosh beradigan klasterni saqlash, ammo qolgan vaqtlarda ishlamay qoladi.
Yechim β2 kichik klasterni saqlashdan iborat bo'lib, siz darslardan oldin va eng yuqori yuklanish vaqtida tugunlarni qo'lda qo'shasiz.
Yechim β3 kichik klasterni saqlash va klasterning joriy yukini kuzatib boradigan va turli API-lardan foydalanib, klasterdan tugunlarni qo'shish va olib tashlash uchun avtoskalerni yozishdir.
Ushbu postda biz β3 yechim haqida gaplashamiz. Ushbu avtoskaler ichki omillarga emas, balki tashqi omillarga juda bog'liq va provayderlar ko'pincha buni ta'minlamaydilar. Biz Mail.ru Cloud Solutions bulutli infratuzilmasidan foydalanamiz va MCS API yordamida avtoskaler yozdik. Va biz ma'lumotlar bilan ishlashni o'rgatganimiz sababli, o'z maqsadlaringiz uchun qanday qilib shunga o'xshash avtoskalerni yozishingiz va uni bulutingiz bilan ishlatishingiz mumkinligini ko'rsatishga qaror qildik.
Talablar
Birinchidan, sizda Hadoop klasteri bo'lishi kerak. Misol uchun, biz HDP taqsimotidan foydalanamiz.
Tugunlaringiz tezda qo'shilishi va o'chirilishi uchun siz tugunlar o'rtasida ma'lum bir rol taqsimotiga ega bo'lishingiz kerak.
- Asosiy tugun. Xo'sh, ayniqsa, biror narsani tushuntirishning hojati yo'q: agar siz interaktiv rejimdan foydalansangiz, masalan, Spark drayverini ishga tushiradigan klasterning asosiy tuguni.
- Sana tugun. Bu HDFS-da ma'lumotlarni saqlaydigan va hisob-kitoblar amalga oshiriladigan tugun.
- Hisoblash tugun. Bu HDFS-da hech narsa saqlamaydigan, ammo hisob-kitoblar sodir bo'ladigan tugun.
Muhim nuqta. Avtomatik o'lchov uchinchi turdagi tugunlar tufayli sodir bo'ladi. Agar siz ikkinchi turdagi tugunlarni olishni va qo'shishni boshlasangiz, javob tezligi juda past bo'ladi - o'chirish va qayta ishga tushirish klasteringizda bir necha soat davom etadi. Bu, albatta, avtomatik o'lchovdan kutgan narsa emas. Ya'ni, biz birinchi va ikkinchi turdagi tugunlarga tegmaymiz. Ular dastur davomida mavjud bo'lgan minimal hayotiy klasterni ifodalaydi.
Shunday qilib, bizning avtoskalerimiz Python 3 da yozilgan, klaster xizmatlarini boshqarish uchun Ambari API-dan foydalanadi, foydalanadi
Yechim arxitekturasi
- Modul
autoscaler.py
. U uchta sinfni o'z ichiga oladi: 1) Ambari bilan ishlash funktsiyalari, 2) MCS bilan ishlash funktsiyalari, 3) to'g'ridan-to'g'ri avtoskaler mantig'i bilan bog'liq funktsiyalar. - Skript
observer.py
. Asosan, u turli xil qoidalardan iborat: qachon va qaysi daqiqalarda avtoskaler funksiyalarini chaqirish kerak. - Konfiguratsiya fayli
config.py
. U, masalan, avtomatik o'lchamga ruxsat berilgan tugunlar ro'yxatini va boshqa parametrlarni o'z ichiga oladi, masalan, yangi tugun qo'shilgan paytdan boshlab qancha kutish kerak. Sinf boshlanishidan oldin maksimal ruxsat etilgan klaster konfiguratsiyasi ishga tushirilishi uchun darslarni boshlash vaqt belgilari ham mavjud.
Endi birinchi ikkita fayl ichidagi kod qismlarini ko'rib chiqamiz.
1. Autoscaler.py moduli
Ambari sinfi
Sinfni o'z ichiga olgan kod qismi shunday ko'rinadi Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Yuqorida, misol sifatida, siz funktsiyaning bajarilishini ko'rishingiz mumkin stop_all_services
, bu istalgan klaster tugunidagi barcha xizmatlarni to'xtatadi.
Sinfga kiraverishda Ambari
siz o'tasiz:
ambari_url
, masalan, kabi'http://localhost:8080/api/v1/clusters/'
,cluster_name
β Ambaridagi klasteringiz nomi,headers = {'X-Requested-By': 'ambari'}
- va ichkarida
auth
Mana sizning Ambari uchun login va parolingiz:auth = ('login', 'password')
.
Funktsiyaning o'zi REST API orqali Ambari-ga bir nechta qo'ng'iroqlardan boshqa narsa emas. Mantiqiy nuqtai nazardan, biz birinchi navbatda tugundagi ishlaydigan xizmatlar ro'yxatini olamiz, so'ngra ma'lum bir klasterda, berilgan tugunda xizmatlarni ro'yxatdan davlatga o'tkazishni so'raymiz. INSTALLED
. Barcha xizmatlarni ishga tushirish, tugunlarni holatga o'tkazish funktsiyalari Maintenance
va hokazolar o'xshash ko'rinadi - ular API orqali bir nechta so'rovlardir.
Class Mcs
Sinfni o'z ichiga olgan kod qismi shunday ko'rinadi Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
Sinfga kiraverishda Mcs
biz loyiha identifikatorini bulut ichida va foydalanuvchi identifikatorini, shuningdek uning parolini o'tkazamiz. Funktsiyada vm_turn_on
biz mashinalardan birini yoqmoqchimiz. Bu erda mantiq biroz murakkabroq. Kodning boshida yana uchta funksiya chaqiriladi: 1) token olishimiz kerak, 2) xost nomini MCS da mashina nomiga aylantirishimiz kerak, 3) ushbu mashinaning identifikatorini olish. Keyinchalik, biz shunchaki so'rov yuboramiz va ushbu mashinani ishga tushiramiz.
Tokenni olish funksiyasi shunday ko'rinadi:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Autoscaler klassi
Bu sinf operatsion mantiqning o'zi bilan bog'liq funktsiyalarni o'z ichiga oladi.
Bu sinf uchun kodning bir qismi shunday ko'rinadi:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Kirish uchun darslarni qabul qilamiz. Ambari
ΠΈ Mcs
, masshtablash uchun ruxsat etilgan tugunlar ro'yxati, shuningdek, tugun konfiguratsiyasi parametrlari: YARN-dagi tugunga ajratilgan xotira va CPU. Bundan tashqari, navbatlar bo'lgan 2 ta ichki parametr q_ram, q_cpu mavjud. Ulardan foydalanib, biz joriy klaster yukining qiymatlarini saqlaymiz. Agar so'nggi 5 daqiqada yuk doimiy ravishda ortib borayotganini ko'rsak, biz klasterga +1 tugunni qo'shishimiz kerak deb qaror qilamiz. Xuddi shu narsa klasterdan to'liq foydalanilmaslik holati uchun ham amal qiladi.
Yuqoridagi kod mashinani klasterdan olib tashlaydigan va uni bulutda to'xtatuvchi funksiyaga misoldir. Avval foydalanishdan chiqarish mavjud YARN Nodemanager
, keyin rejim yoqiladi Maintenance
, keyin biz mashinadagi barcha xizmatlarni to'xtatamiz va bulutdagi virtual mashinani o'chirib qo'yamiz.
2. Observer.py skripti
U yerdan namuna kodi:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
Unda biz klaster quvvatini oshirish uchun sharoitlar yaratilganmi yoki zaxirada mashinalar bor-yoβqligini tekshiramiz, ulardan birining xost nomini olamiz, uni klasterga qoβshamiz va jamoamizning Slack-da bu haqda xabar eβlon qilamiz. Shundan so'ng u boshlanadi cooldown_period
, biz klasterdan hech narsa qo'shmasak yoki olib tashlamasak, shunchaki yukni kuzatamiz. Agar u barqarorlashgan bo'lsa va optimal yuk qiymatlari koridorida bo'lsa, biz shunchaki monitoringni davom ettiramiz. Agar bitta tugun etarli bo'lmasa, biz boshqasini qo'shamiz.
Oldinda dars bo'lgan holatlar uchun biz bitta tugun etarli bo'lmasligini aniq bilamiz, shuning uchun biz darhol barcha bo'sh tugunlarni ishga tushiramiz va ularni dars oxirigacha faol tutamiz. Bu harakat vaqt belgilari ro'yxati yordamida sodir bo'ladi.
xulosa
Autoscaler klasterni notekis yuklash holatlari uchun yaxshi va qulay yechimdir. Siz bir vaqtning o'zida eng yuqori yuklanishlar uchun kerakli klaster konfiguratsiyasiga erishasiz va shu bilan birga pulni tejab, kam yuklanish vaqtida ushbu klasterni saqlamaysiz. Bundan tashqari, bularning barchasi sizning ishtirokingizsiz avtomatik ravishda sodir bo'ladi. Avtomatik o'lchovchining o'zi ma'lum bir mantiq bo'yicha yozilgan klaster menejeri API va bulut provayderi API-ga so'rovlar to'plamidan boshqa narsa emas. Siz aniq eslashingiz kerak bo'lgan narsa, biz ilgari yozganimizdek, tugunlarni 3 turga bo'lishdir. Va siz baxtli bo'lasiz.
Manba: www.habr.com