Tere! Koolitame inimesi suurandmetega töötama. Suurandmete alast haridusprogrammi on võimatu ette kujutada ilma oma klastrita, mille kallal kõik osalejad koos töötavad. Sel põhjusel on see meie programmis alati olemas 🙂 Tegeleme selle seadistamise, häälestamise ja administreerimisega ning kutid käivitavad seal otse MapReduce'i töid ja kasutavad Sparki.
Selles postituses räägime teile, kuidas lahendasime klastrite ebaühtlase laadimise probleemi, kirjutades pilve abil oma automaatskaalari
probleem
Meie klastrit ei kasutata tüüpilises režiimis. Kõrvaldamine on väga ebaühtlane. Näiteks on praktilised tunnid, kui kõik 30 inimest koos õpetajaga lähevad klastrisse ja hakkavad seda kasutama. Või jällegi on päevi enne tähtaega, mil koormus kasvab kõvasti. Ülejäänud aja töötab klaster alakoormusrežiimis.
Lahendus nr 1 on hoida klaster, mis peab vastu tippkoormusele, kuid on ülejäänud aja jõude.
Lahendus nr 2 on väikese klastri säilitamine, kuhu lisate käsitsi sõlmed enne tunde ja tippkoormuse ajal.
Lahendus nr 3 on hoida väikest klastrit ja kirjutada automaatne skaleerija, mis jälgib klastri praegust koormust ning erinevate API-de abil lisab ja eemaldab klastrist sõlme.
Selles postituses räägime lahendusest nr 3. See automaatne skaleerija sõltub pigem välistest kui sisemistest teguritest ja teenusepakkujad seda sageli ei paku. Kasutame Mail.ru Cloud Solutionsi pilveinfrastruktuuri ja kirjutasime MCS API abil automaatskaalari. Ja kuna me õpetame andmetega töötamist, otsustasime näidata, kuidas saate kirjutada sarnase automaatskaalaeri enda tarbeks ja kasutada seda oma pilvega
Eeldused
Esiteks peab teil olema Hadoopi klaster. Näiteks kasutame HDP-jaotust.
Teie sõlmede kiireks lisamiseks ja eemaldamiseks peavad teil olema sõlmede vahel teatud rollide jaotus.
- Peasõlm. Noh, pole vaja midagi eriti selgitada: klastri põhisõlm, millel käivitatakse näiteks Sparki draiver, kui kasutate interaktiivset režiimi.
- Kuupäeva sõlm. See on sõlm, kuhu salvestate HDFS-i andmeid ja kus tehakse arvutusi.
- Arvutussõlm. See on sõlm, kus te ei salvesta midagi HDFS-i, kuid kus toimuvad arvutused.
Oluline punkt. Automaatne skaleerimine toimub kolmandat tüüpi sõlmede tõttu. Kui alustate teist tüüpi sõlmede võtmist ja lisamist, on reageerimiskiirus väga madal – dekomisjoneerimine ja uuesti kasutuselevõtmine võtab teie klastris tunde. See pole muidugi see, mida te automaatselt skaleerimiselt ootate. See tähendab, et me ei puuduta esimest ja teist tüüpi sõlmi. Need kujutavad endast minimaalset elujõulist klastrit, mis eksisteerib kogu programmi kestuse jooksul.
Niisiis, meie automaatne skaleerija on kirjutatud Python 3-s, kasutab klastriteenuste haldamiseks Ambari API-d,
Lahenduse arhitektuur
- Moodul
autoscaler.py
. See sisaldab kolme klassi: 1) funktsioonid Ambariga töötamiseks, 2) funktsioonid MCS-iga töötamiseks, 3) funktsioonid, mis on otseselt seotud automaatse skaleerija loogikaga. - Skript
observer.py
. Põhimõtteliselt koosneb see erinevatest reeglitest: millal ja millistel hetkedel kutsuda automaatskaleri funktsioone. - Konfiguratsioonifail
config.py
. See sisaldab näiteks loendit sõlmedest, mis on lubatud automaatsel skaleerimisel, ja muid parameetreid, mis mõjutavad näiteks seda, kui kaua oodata uue sõlme lisamise hetkest. Tundide alguseks on ka ajatemplid, et enne tundi käivitub maksimaalne lubatud klastri konfiguratsioon.
Vaatame nüüd kahe esimese faili sees olevaid kooditükke.
1. Moodul Autoscaler.py
Ambari klass
Selline näeb välja klassi sisaldav kooditükk Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Ülaltoodud näitena saate vaadata funktsiooni rakendamist stop_all_services
, mis peatab kõik soovitud klastri sõlme teenused.
Klassi sissepääsu juures Ambari
sa läbid:
ambari_url
näiteks meeldib'http://localhost:8080/api/v1/clusters/'
,cluster_name
– teie klastri nimi Ambaris,headers = {'X-Requested-By': 'ambari'}
- ja sees
auth
siin on teie Ambari sisselogimine ja parool:auth = ('login', 'password')
.
Funktsioon ise ei ole midagi muud kui paar kõnet REST API kaudu Ambarile. Loogilisest vaatenurgast saame esmalt sõlmes töötavate teenuste loendi ja seejärel palume antud klastris antud sõlmel teenused loendist olekusse üle kanda. INSTALLED
. Funktsioonid kõigi teenuste käivitamiseks, sõlmede olekusse ülekandmiseks Maintenance
jms näevad välja sarnased – need on vaid mõned API kaudu tehtud taotlused.
klassi Mcs
Selline näeb välja klassi sisaldav kooditükk Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
Klassi sissepääsu juures Mcs
edastame projekti ID pilve sees ja kasutaja ID, samuti tema parooli. Funktsioonis vm_turn_on
tahame ühe masina sisse lülitada. Loogika on siin veidi keerulisem. Koodi alguses kutsutakse välja veel kolm funktsiooni: 1) peame hankima märgi, 2) peame teisendama hostinime MCS-is oleva masina nimeks, 3) saama selle masina id. Järgmiseks teeme lihtsalt postitaotluse ja käivitame selle masina.
Tokeni saamise funktsioon näeb välja järgmine:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Autoscaleri klass
See klass sisaldab funktsioone, mis on seotud tööloogika endaga.
Selle klassi koodiosa näeb välja selline:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Võtame vastu klasse sisenemiseks. Ambari
и Mcs
, skaleerimiseks lubatud sõlmede loend, samuti sõlmede konfiguratsiooniparameetrid: YARN-i sõlmele eraldatud mälu ja protsessor. Samuti on 2 sisemist parameetrit q_ram, q_cpu, mis on järjekorrad. Neid kasutades salvestame praeguse klastri koormuse väärtused. Kui näeme, et viimase 5 minuti jooksul on koormus pidevalt suurenenud, siis otsustame, et peame klastrisse lisama +1 sõlme. Sama kehtib ka klastri alakasutamise oleku kohta.
Ülaltoodud kood on näide funktsioonist, mis eemaldab masina klastrist ja peatab selle pilves. Esiteks toimub dekomisjoneerimine YARN Nodemanager
, siis lülitub režiim sisse Maintenance
, siis peatame kõik teenused masinas ja lülitame virtuaalmasina pilves välja.
2. Skripti vaatleja.py
Näidiskood sealt:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
Selles kontrollime, kas klastri läbilaskevõime suurendamiseks on loodud tingimused ja kas masinaid on reservis, hangime neist ühe hostinime, lisame selle klastrisse ja avaldame selle kohta teate meie meeskonna Slackis. Pärast mida see algab cooldown_period
, kui me ei lisa ega eemalda klastrist midagi, vaid lihtsalt jälgime koormust. Kui see on stabiliseerunud ja jääb optimaalsete koormusväärtuste koridori, siis jätkame lihtsalt jälgimist. Kui ühest sõlmest ei piisanud, lisame teise.
Juhtudeks, kui meil on ees õppetund, teame juba kindlalt, et ühest sõlmest ei piisa, seega käivitame kohe kõik vabad sõlmed ja hoiame neid aktiivsena kuni tunni lõpuni. See juhtub tegevuste ajatemplite loendi abil.
Järeldus
Autoscaler on hea ja mugav lahendus nendeks puhkudeks, kui kogete ebaühtlast klastri laadimist. Samal ajal saavutate soovitud klastri konfiguratsiooni tippkoormuse jaoks ja samal ajal ei hoia seda klastrit alakoormuse ajal, säästes raha. Noh, see kõik juhtub automaatselt ilma teie osaluseta. Automaatne skaleerija ise pole midagi muud kui teatud loogika järgi kirjutatud taotluste kogum klastrihalduri API-le ja pilvepakkuja API-le. Mida peate kindlasti meeles pidama, on sõlmede jagamine 3 tüüpi, nagu me varem kirjutasime. Ja sa saad õnnelikuks.
Allikas: www.habr.com