Ahoj! Školíme ľudí na prácu s veľkými dátami. Vzdelávací program o veľkých dátach si nemožno predstaviť bez vlastného klastra, na ktorom spolupracujú všetci účastníci. Z tohto dôvodu ho náš program vždy má 🙂 Venujeme sa jeho konfigurácii, ladeniu a správe a chalani tam priamo spúšťajú úlohy MapReduce a používajú Spark.
V tomto príspevku vám povieme, ako sme vyriešili problém nerovnomerného načítania klastra napísaním vlastného automatického škálovača pomocou cloudu
problém
Náš klaster sa nepoužíva v typickom režime. Likvidácia je veľmi nerovnomerná. Existujú napríklad praktické hodiny, keď všetkých 30 ľudí a učiteľ idú do klastra a začnú ho používať. Alebo opäť existujú dni pred termínom, kedy sa zaťaženie výrazne zvyšuje. Zvyšok času klaster pracuje v režime nedostatočného zaťaženia.
Riešením č. 1 je zachovať klaster, ktorý vydrží špičkové zaťaženie, ale zvyšok času bude nečinný.
Riešením č. 2 je ponechať malý klaster, do ktorého ručne pridávate uzly pred triedami a počas špičkového zaťaženia.
Riešením č. 3 je ponechať malý klaster a napísať automatický škálovač, ktorý bude monitorovať aktuálne zaťaženie klastra a pomocou rôznych API pridávať a odstraňovať uzly z klastra.
V tomto príspevku budeme hovoriť o riešení #3. Tento autoscaler je vo veľkej miere závislý skôr od vonkajších faktorov ako od vnútorných a poskytovatelia ho často neposkytujú. Používame cloudovú infraštruktúru Mail.ru Cloud Solutions a napísali sme autoscaler pomocou MCS API. A keďže učíme pracovať s dátami, rozhodli sme sa ukázať, ako si môžete podobný autoscaler napísať pre vlastné účely a použiť ho s vašim cloudom
Predpoklady
Najprv musíte mať klaster Hadoop. Napríklad používame distribúciu HDP.
Aby bolo možné rýchlo pridávať a odstraňovať vaše uzly, musíte mať medzi uzlami určité rozdelenie rolí.
- Hlavný uzol. Nie je potrebné nič špeciálne vysvetľovať: hlavný uzol klastra, na ktorom sa napríklad spúšťa ovládač Spark, ak používate interaktívny režim.
- Dátumový uzol. Toto je uzol, na ktorom ukladáte dáta na HDFS a kde prebiehajú výpočty.
- Výpočtový uzol. Toto je uzol, kde neukladáte nič na HDFS, ale kde prebiehajú výpočty.
Dôležitý bod. Automatické škálovanie nastane v dôsledku uzlov tretieho typu. Ak začnete brať a pridávať uzly druhého typu, rýchlosť odozvy bude veľmi nízka – vyradenie z prevádzky a opätovné spustenie bude na vašom klastri trvať hodiny. To, samozrejme, nie je to, čo od automatického škálovania očakávate. To znamená, že sa nedotýkame uzlov prvého a druhého typu. Budú predstavovať minimálny životaschopný klaster, ktorý bude existovať počas trvania programu.
Takže náš automatický škálovač je napísaný v Pythone 3, používa Ambari API na správu klastrových služieb
Architektúra riešenia
- Modul
autoscaler.py
. Obsahuje tri triedy: 1) funkcie pre prácu s Ambari, 2) funkcie pre prácu s MCS, 3) funkcie súvisiace priamo s logikou autoscaleru. - Skript
observer.py
. V podstate pozostáva z rôznych pravidiel: kedy a v ktorých momentoch volať funkcie automatického škálovača. - Konfiguračný súbor
config.py
. Obsahuje napríklad zoznam uzlov povolených pre automatické škálovanie a ďalšie parametre, ktoré ovplyvňujú napríklad to, ako dlho sa bude čakať od pridania nového uzla. K dispozícii sú aj časové pečiatky pre začiatok tried, aby sa pred triedou spustila maximálna povolená konfigurácia klastra.
Pozrime sa teraz na časti kódu v prvých dvoch súboroch.
1. Modul Autoscaler.py
trieda Ambari
Takto vyzerá časť kódu obsahujúca triedu Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Vyššie sa ako príklad môžete pozrieť na implementáciu funkcie stop_all_services
, ktorý zastaví všetky služby na požadovanom uzle klastra.
Pri vchode do triedy Ambari
prejdeš:
ambari_url
, napríklad, ako'http://localhost:8080/api/v1/clusters/'
,cluster_name
– názov vášho klastra v Ambari,headers = {'X-Requested-By': 'ambari'}
- a vnútri
auth
tu je vaše používateľské meno a heslo pre Ambari:auth = ('login', 'password')
.
Samotná funkcia nie je nič iné ako pár hovorov cez REST API do Ambari. Z logického hľadiska najprv dostaneme zoznam spustených služieb na uzle a potom požiadame na danom klastri, na danom uzle, aby sme preniesli služby zo zoznamu do stavu INSTALLED
. Funkcie na spustenie všetkých služieb, na prenos uzlov do stavu Maintenance
atď vyzerajú podobne - je to len niekoľko požiadaviek cez API.
Class Mcs
Takto vyzerá časť kódu obsahujúca triedu Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
Pri vchode do triedy Mcs
odovzdávame ID projektu v rámci cloudu a ID používateľa, ako aj jeho heslo. Vo funkcii vm_turn_on
chceme zapnúť jeden zo strojov. Logika je tu trochu komplikovanejšia. Na začiatku kódu sa volajú ďalšie tri funkcie: 1) musíme získať token, 2) musíme previesť názov hostiteľa na názov stroja v MCS, 3) získať id tohto stroja. Potom jednoducho požiadame o príspevok a spustíme tento stroj.
Takto vyzerá funkcia získania tokenu:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Trieda Autoscaler
Táto trieda obsahuje funkcie súvisiace so samotnou prevádzkovou logikou.
Takto vyzerá časť kódu pre túto triedu:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Prijímame triedy na vstup. Ambari
и Mcs
, zoznam uzlov, ktoré sú povolené na škálovanie, ako aj konfiguračné parametre uzla: pamäť a procesor pridelené uzlu v YARN. Existujú aj 2 interné parametre q_ram, q_cpu, čo sú fronty. Pomocou nich ukladáme hodnoty aktuálneho zaťaženia klastra. Ak vidíme, že za posledných 5 minút dochádzalo k sústavne zvýšenému zaťaženiu, potom sa rozhodneme, že musíme do klastra pridať uzol +1. To isté platí pre stav nedostatočného využitia klastra.
Vyššie uvedený kód je príkladom funkcie, ktorá odstráni počítač z klastra a zastaví ho v cloude. Najprv dôjde k vyradeniu z prevádzky YARN Nodemanager
, potom sa režim zapne Maintenance
, potom zastavíme všetky služby na stroji a vypneme virtuálny stroj v cloude.
2. Pozorovateľ skriptu.py
Vzorový kód odtiaľ:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
V ňom skontrolujeme, či sú vytvorené podmienky na zvýšenie kapacity klastra a či sú v zálohe nejaké stroje, získame názov hostiteľa jedného z nich, pridáme ho do klastra a zverejníme o tom správu na Slacku nášho tímu. Po ktorej to začne cooldown_period
, kedy z klastra nič nepridávame ani neuberáme, ale jednoducho sledujeme záťaž. Ak sa stabilizoval a je v koridore optimálnych hodnôt zaťaženia, tak jednoducho pokračujeme v monitorovaní. Ak jeden uzol nestačil, pridáme ďalší.
Pre prípady, keď nás čaká lekcia, už s istotou vieme, že jeden uzol nebude stačiť, preto okamžite spustíme všetky voľné uzly a necháme ich aktívne až do konca hodiny. To sa deje pomocou zoznamu časových pečiatok aktivity.
Záver
Autoscaler je dobré a pohodlné riešenie pre prípady, keď sa stretnete s nerovnomerným zaťažením klastra. Súčasne dosiahnete požadovanú konfiguráciu klastra pre špičkové zaťaženie a zároveň neuchováte tento klaster počas nedostatočného zaťaženia, čím ušetríte peniaze. No a toto všetko sa deje automaticky bez vašej účasti. Samotný autoscaler nie je nič iné ako súbor požiadaviek na API správcu klastra a API poskytovateľa cloudu, napísaný podľa určitej logiky. Čo si určite musíte zapamätať, je rozdelenie uzlov na 3 typy, ako sme písali skôr. A budete šťastní.
Zdroj: hab.com