Zdravo! Obučavamo ljude za rad s velikim podacima. Nemoguće je zamisliti obrazovni program o velikim podacima bez vlastitog klastera na kojem svi sudionici rade zajedno. Iz tog razloga ga naš program uvijek ima 🙂 Bavimo se njegovom konfiguracijom, podešavanjem i administracijom, a dečki tamo izravno pokreću MapReduce poslove i koriste Spark.
U ovom postu ćemo vam reći kako smo riješili problem neravnomjernog učitavanja klastera pisanjem vlastitog autoscalera pomoću oblaka
problem
Naš se klaster ne koristi u tipičnom načinu rada. Odlaganje je vrlo neravnomjerno. Recimo, postoji praktična nastava, kada svih 30 ljudi i profesor odu u klaster i počnu ga koristiti. Ili opet, ima dana prije roka kada se opterećenje jako poveća. Ostatak vremena klaster radi u načinu rada pod opterećenjem.
Rješenje broj 1 je zadržati klaster koji će izdržati vršna opterećenja, ali će ostatak vremena biti u stanju mirovanja.
Rješenje #2 je zadržati mali klaster, kojem ručno dodajete čvorove prije nastave i tijekom vršnih opterećenja.
Rješenje #3 je zadržati mali klaster i napisati autoscaler koji će pratiti trenutno opterećenje klastera i, koristeći različite API-je, dodavati i uklanjati čvorove iz klastera.
U ovom postu ćemo govoriti o rješenju #3. Ovaj autoscaler uvelike ovisi o vanjskim čimbenicima, a ne o internim, a pružatelji ga često ne pružaju. Koristimo infrastrukturu oblaka Mail.ru Cloud Solutions i napisali smo autoscaler koristeći MCS API. A budući da učimo kako raditi s podacima, odlučili smo pokazati kako možete napisati sličan autoscaler za vlastite potrebe i koristiti ga sa svojim oblakom
Preduvjeti
Prvo, morate imati Hadoop klaster. Na primjer, koristimo HDP distribuciju.
Kako bi se vaši čvorovi brzo dodavali i uklanjali, morate imati određenu raspodjelu uloga među čvorovima.
- Glavni čvor. Pa, nema potrebe posebno objašnjavati: glavni čvor klastera, na kojem se, na primjer, pokreće Spark driver, ako koristite interaktivni način rada.
- Datumski čvor. Ovo je čvor na kojem pohranjujete podatke o HDFS-u i gdje se odvijaju izračuni.
- Računalni čvor. Ovo je čvor gdje ne pohranjujete ništa na HDFS, ali gdje se događaju izračuni.
Važna točka. Automatsko skaliranje će se dogoditi zbog čvorova treće vrste. Ako počnete uzimati i dodavati čvorove druge vrste, brzina odgovora bit će vrlo niska - dekomisioniranje i ponovno uključivanje trajat će satima na vašem klasteru. To, naravno, nije ono što očekujete od automatskog skaliranja. Odnosno, ne diramo čvorove prve i druge vrste. Oni će predstavljati minimalni održivi klaster koji će postojati tijekom trajanja programa.
Dakle, naš autoscaler je napisan u Pythonu 3, koristi Ambari API za upravljanje uslugama klastera, koristi
Arhitektura rješenja
- Modul
autoscaler.py
. Sadrži tri klase: 1) funkcije za rad s Ambarijem, 2) funkcije za rad s MCS-om, 3) funkcije koje su izravno povezane s logikom autoscalera. - Skripta
observer.py
. U biti se sastoji od različitih pravila: kada i u kojim trenucima pozvati funkcije autoscalera. - Konfiguracijska datoteka
config.py
. Sadrži, na primjer, popis čvorova dopuštenih za automatsko skaliranje i druge parametre koji utječu na, na primjer, koliko dugo treba čekati od trenutka dodavanja novog čvora. Postoje i vremenske oznake za početak nastave, tako da se prije nastave pokreće maksimalno dopuštena konfiguracija klastera.
Pogledajmo sada dijelove koda unutar prve dvije datoteke.
1. Modul Autoscaler.py
klasa Ambari
Ovako izgleda dio koda koji sadrži klasu Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Gore, kao primjer, možete pogledati implementaciju funkcije stop_all_services
, koji zaustavlja sve usluge na željenom čvoru klastera.
Na ulazu u razred Ambari
ti prolaziš:
ambari_url
, na primjer, poput'http://localhost:8080/api/v1/clusters/'
,cluster_name
– naziv vašeg klastera u Ambariju,headers = {'X-Requested-By': 'ambari'}
- i iznutra
auth
ovdje je vaša prijava i lozinka za Ambari:auth = ('login', 'password')
.
Sama funkcija nije ništa više od nekoliko poziva putem REST API-ja prema Ambariju. S logičke točke gledišta, prvo primamo popis pokrenutih usluga na čvoru, a zatim tražimo od danog klastera, na danom čvoru, da prenese usluge s popisa u stanje INSTALLED
. Funkcije za pokretanje svih servisa, za prijenos čvorova u stanje Maintenance
itd. izgledaju slično - to je samo nekoliko zahtjeva putem API-ja.
Klasa Mcs
Ovako izgleda dio koda koji sadrži klasu Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
Na ulazu u razred Mcs
prosljeđujemo ID projekta unutar oblaka i ID korisnika, kao i njegovu lozinku. U funkciji vm_turn_on
želimo uključiti jedan od strojeva. Logika je ovdje malo kompliciranija. Na početku koda pozivaju se tri druge funkcije: 1) trebamo dobiti token, 2) trebamo pretvoriti naziv hosta u naziv stroja u MCS-u, 3) dobiti id ovog stroja. Zatim jednostavno napravimo zahtjev za objavu i pokrenemo ovaj stroj.
Ovako izgleda funkcija za dobivanje tokena:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Klasa autoscalera
Ova klasa sadrži funkcije povezane sa samom operativnom logikom.
Ovako izgleda dio koda za ovu klasu:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Primamo tečajeve za upis. Ambari
и Mcs
, popis čvorova koji su dopušteni za skaliranje, kao i konfiguracijski parametri čvora: memorija i CPU dodijeljeni čvoru u YARN-u. Također postoje 2 interna parametra q_ram, q_cpu, koji su redovi čekanja. Koristeći ih, pohranjujemo vrijednosti trenutnog opterećenja klastera. Ako vidimo da je tijekom zadnjih 5 minuta postojalo stalno povećano opterećenje, tada odlučujemo da trebamo dodati +1 čvor u klaster. Isto vrijedi i za stanje podiskorištenosti klastera.
Gornji kod primjer je funkcije koja uklanja stroj iz klastera i zaustavlja ga u oblaku. Prvo dolazi do razgradnje YARN Nodemanager
, tada se uključuje način rada Maintenance
, tada zaustavljamo sve usluge na stroju i isključujemo virtualni stroj u oblaku.
2. Skripta promatrač.py
Ogledni kod od tamo:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
U njemu provjeravamo jesu li stvoreni uvjeti za povećanje kapaciteta klastera i ima li strojeva u rezervi, dobivamo hostname jednog od njih, dodajemo ga u klaster i o tome objavljujemo poruku na Slacku našeg tima. Nakon čega počinje cooldown_period
, kada ništa ne dodajemo niti uklanjamo iz klastera, već samo pratimo opterećenje. Ako se stabilizirao i nalazi se u koridoru optimalnih vrijednosti opterećenja, onda jednostavno nastavljamo praćenje. Ako jedan čvor nije bio dovoljan, tada dodajemo još jedan.
Za slučajeve kada nas čeka lekcija, već sigurno znamo da jedan čvor neće biti dovoljan, pa odmah pokrećemo sve slobodne čvorove i držimo ih aktivnima do kraja lekcije. To se događa pomoću popisa vremenskih oznaka aktivnosti.
Zaključak
Autoscaler je dobro i praktično rješenje za one slučajeve kada imate neravnomjerno učitavanje klastera. Istovremeno postižete željenu konfiguraciju klastera za vršna opterećenja i u isto vrijeme ne zadržavate ovaj klaster tijekom podopterećenja, štedeći novac. Pa, plus to se sve događa automatski bez vašeg sudjelovanja. Sam autoscaler nije ništa više od skupa zahtjeva za API upravitelja klastera i API pružatelja usluga oblaka, napisanih prema određenoj logici. Ono što svakako trebate zapamtiti je podjela čvorova na 3 vrste, kao što smo već napisali. I bit ćete sretni.
Izvor: www.habr.com