Здраво! Ги обучуваме луѓето да работат со големи податоци. Невозможно е да се замисли образовна програма за големи податоци без свој кластер, на кој сите учесници работат заедно. Поради оваа причина, нашата програма секогаш ја има 🙂 Ние сме ангажирани во нејзината конфигурација, подесување и администрација, а момците директно ги лансираат работните места на MapReduce таму и користат Spark.
Во оваа објава ќе ви кажеме како го решивме проблемот со нерамномерно вчитување на кластерот со пишување на сопствен автоскалирач користејќи го облакот
проблем
Нашиот кластер не се користи во типичен режим. Отстранувањето е многу нерамномерно. На пример, има практични часови, кога сите 30 луѓе и еден наставник одат во кластерот и почнуваат да го користат. Или повторно, има денови пред крајниот рок кога товарот значително се зголемува. Остатокот од времето кластерот работи во режим на недоволно оптоварување.
Решението бр. 1 е да се задржи кластер што ќе издржи врвни оптоварувања, но ќе биде неактивен остатокот од времето.
Решението бр. 2 е да се задржи мал кластер, на кој рачно додавате јазли пред часовите и за време на врвните оптоварувања.
Решението бр. 3 е да се задржи мал кластер и да се напише автоскалирач кој ќе го следи тековното оптоварување на кластерот и, користејќи различни API, ќе додава и отстранува јазли од кластерот.
Во овој пост ќе зборуваме за решението бр. 3. Овој автоскалилер е многу зависен од надворешни фактори, а не од внатрешни, и често давателите на услуги не го обезбедуваат. Ја користиме инфраструктурата на облакот на Mail.ru Cloud Solutions и напишавме автоскалер користејќи го MCS API. И бидејќи учиме како да работиме со податоци, решивме да покажеме како можете да напишете сличен автоскалер за ваши цели и да го користите со вашиот облак
Предуслови
Прво, мора да имате Hadoop кластер. На пример, ја користиме дистрибуцијата на HDP.
Со цел вашите јазли да бидат брзо додадени и отстранети, мора да имате одредена дистрибуција на улоги меѓу јазлите.
- Главен јазол. Па, нема потреба да се објаснува ништо посебно: главниот јазол на кластерот, на кој, на пример, се активира двигателот Spark, ако го користите интерактивниот режим.
- Датумски јазол. Ова е јазолот на кој складирате податоци на HDFS и каде што се вршат пресметките.
- Компјутерски јазол. Ова е јазол каде што не складирате ништо на HDFS, туку каде што се случуваат пресметките.
Важна точка. Автоматското скалирање ќе се случи поради јазли од третиот тип. Ако почнете да земате и додавате јазли од вториот тип, брзината на одговор ќе биде многу мала - деактивирањето и повторното вклучување ќе траат со часови на вашиот кластер. Ова, се разбира, не е она што го очекувате од автоматското скалирање. Тоа е, ние не ги допираме јазлите од првиот и вториот тип. Тие ќе претставуваат минимален остварлив кластер што ќе постои во текот на целото времетраење на програмата.
Значи, нашиот автоскалирач е напишан во Python 3, го користи Ambari API за управување со кластер услуги, користи
Архитектура на решенија
- Модул
autoscaler.py
. Содржи три класи: 1) функции за работа со Ambari, 2) функции за работа со MCS, 3) функции поврзани директно со логиката на автоскалерот. - Скрипта
observer.py
. Во суштина, тој се состои од различни правила: кога и во кои моменти да се повикаат функциите на автоскалерот. - Конфигурациска датотека
config.py
. Содржи, на пример, листа на јазли дозволени за автоматско скалирање и други параметри кои влијаат, на пример, на тоа колку долго да се чека од моментот кога е додаден нов јазол. Исто така, постојат временски ознаки за почеток на часовите, така што пред класата се активира максималната дозволена конфигурација на кластерот.
Ајде сега да ги погледнеме парчињата код во првите две датотеки.
1. Autoscaler.py модул
Час Амбари
Вака изгледа парче код што содржи класа Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Погоре, како пример, можете да ја погледнете имплементацијата на функцијата stop_all_services
, што ги запира сите услуги на саканиот јазол на кластерот.
На влезот во класот Ambari
поминуваш:
ambari_url
, на пример, како'http://localhost:8080/api/v1/clusters/'
,cluster_name
– името на вашиот кластер во Амбари,headers = {'X-Requested-By': 'ambari'}
- и внатре
auth
еве го вашето најавување и лозинка за Ambari:auth = ('login', 'password')
.
Самата функција не е ништо повеќе од неколку повици преку REST API до Ambari. Од логичка гледна точка, прво добиваме листа на активни услуги на јазол, а потоа бараме на даден кластер, на даден јазол, да се префрлат услугите од списокот во состојбата INSTALLED
. Функции за стартување на сите услуги, за пренос на јазли во состојба Maintenance
итн изгледаат слично - тие се само неколку барања преку API.
Класа Mcs
Вака изгледа парче код што содржи класа Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
На влезот во класот Mcs
го пренесуваме проектниот проект во облакот и корисничкиот ID, како и неговата лозинка. Во функција vm_turn_on
сакаме да вклучиме една од машините. Логиката овде е малку покомплицирана. На почетокот на кодот се нарекуваат три други функции: 1) треба да добиеме токен, 2) треба да го конвертираме името на домаќинот во име на машината во MCS, 3) да го добиеме ID на оваа машина. Следно, ние едноставно правиме барање за објавување и ја стартуваме оваа машина.
Вака изгледа функцијата за добивање токен:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Класа на Autoscaler
Оваа класа содржи функции поврзани со самата оперативна логика.
Вака изгледа дел од кодот за оваа класа:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Ние прифаќаме часови за влез. Ambari
и Mcs
, список на јазли што се дозволени за скалирање, како и параметри за конфигурација на јазли: меморија и процесор доделени на јазолот во YARN. Има и 2 внатрешни параметри q_ram, q_cpu, кои се редици. Користејќи ги, ги складираме вредностите на тековното оптоварување на кластерот. Ако видиме дека во последните 5 минути има постојано зголемено оптоварување, тогаш одлучивме дека треба да додадеме +1 јазол во кластерот. Истото важи и за состојбата на недоволно искористување на кластерот.
Кодот погоре е пример за функција која отстранува машина од кластерот и ја запира во облакот. Прво има деактивирање YARN Nodemanager
, потоа режимот се вклучува Maintenance
, потоа ги запираме сите услуги на машината и ја исклучуваме виртуелната машина во облакот.
2. Набљудувач на скрипта.py
Примерок код од таму:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
Во него, проверуваме дали се создадени услови за зголемување на капацитетот на кластерот и дали има машини во резерва, го добиваме името на домаќинот на една од нив, го додаваме во кластерот и објавуваме порака за тоа на Slack на нашиот тим. По што започнува cooldown_period
, кога не додаваме или отстрануваме ништо од кластерот, туку едноставно го следиме оптоварувањето. Доколку се стабилизира и е во коридорот на оптималните вредности на оптоварување, тогаш едноставно продолжуваме со следењето. Ако еден јазол не беше доволен, тогаш додаваме уште еден.
За случаите кога ни претстои лекција, веќе со сигурност знаеме дека еден јазол нема да биде доволен, па веднаш ги стартуваме сите бесплатни јазли и ги одржуваме активни до крајот на часот. Ова се случува со помош на список на временски ознаки за активност.
Заклучок
Autoscaler е добро и практично решение за оние случаи кога доживувате нерамномерно вчитување на кластерот. Истовремено ја постигнувате саканата конфигурација на кластерот за врвни оптоварувања и во исто време не го задржувате овој кластер за време на недоволно оптоварување, заштедувајќи пари. Па, плус сето ова се случува автоматски без ваше учество. Самиот автоскалер не е ништо повеќе од збир на барања до API на менаџерот на кластерот и API на давателот на облак, напишани според одредена логика. Она што дефинитивно треба да го запомните е поделбата на јазлите на 3 типа, како што напишавме претходно. И ќе бидете среќни.
Извор: www.habr.com