Hei! Koulutamme ihmisiä työskentelemään big datan parissa. On mahdotonta kuvitella suurdataa käsittelevää koulutusohjelmaa ilman omaa klusteriaan, jossa kaikki osallistujat työskentelevät yhdessä. Tästä syystä ohjelmassamme on se aina 🙂 Olemme mukana sen konfiguroinnissa, virittämisessä ja hallinnassa, ja kaverit käynnistävät siellä suoraan MapReduce-töitä ja käyttävät Sparkia.
Tässä viestissä kerromme, kuinka ratkaisimme klusterin epätasaisen latauksen ongelman kirjoittamalla oman autoskaalaimemme pilven avulla
ongelma
Klusteriamme ei käytetä tyypillisessä tilassa. Hävittäminen on erittäin epätasaista. Esimerkiksi on käytännön tunteja, jolloin kaikki 30 henkilöä ja opettaja menevät klusteriin ja alkavat käyttää sitä. Tai taas on päiviä ennen määräaikaa, jolloin kuormitus kasvaa huomattavasti. Muun ajan klusteri toimii alikuormitustilassa.
Ratkaisu #1 on pitää klusteri, joka kestää huippukuormitukset, mutta on käyttämättömänä muun ajan.
Ratkaisu #2 on säilyttää pieni klusteri, johon lisäät manuaalisesti solmuja ennen luokkia ja huippukuormituksen aikana.
Ratkaisu #3 on pitää pieni klusteri ja kirjoittaa automaattinen skaalaaja, joka tarkkailee klusterin nykyistä kuormitusta ja lisää ja poistaa solmuja klusterista erilaisten API:iden avulla.
Tässä viestissä puhumme ratkaisusta #3. Tämä automaattinen skaalaus on erittäin riippuvainen ulkoisista tekijöistä pikemminkin kuin sisäisistä, eivätkä palveluntarjoajat usein tarjoa sitä. Käytämme Mail.ru Cloud Solutions -pilviinfrastruktuuria ja kirjoitimme automaattisen skaalaimen MCS API:lla. Ja koska opetamme työskentelemään datan kanssa, päätimme näyttää, kuinka voit kirjoittaa samanlaisen automaattisen skaalaimen omiin tarkoituksiin ja käyttää sitä pilvesi kanssa.
Edellytykset
Ensinnäkin sinulla on oltava Hadoop-klusteri. Käytämme esimerkiksi HDP-jakelua.
Jotta solmuja voidaan lisätä ja poistaa nopeasti, sinulla on oltava tietty roolijakauma solmujen kesken.
- Pääsolmu. No, ei tarvitse selittää mitään erityisemmin: klusterin pääsolmu, johon esimerkiksi Spark-ajuri käynnistetään, jos käytät interaktiivista tilaa.
- Päivämääräsolmu. Tämä on solmu, johon tallennat tietoja HDFS:ään ja jossa laskelmat suoritetaan.
- Laskentasolmu. Tämä on solmu, jossa et tallenna mitään HDFS:ään, mutta jossa laskelmat tapahtuvat.
Tärkeä pointti. Automaattinen skaalaus tapahtuu kolmannen tyypin solmujen vuoksi. Jos alat ottaa ja lisätä toisen tyyppisiä solmuja, vastenopeus on erittäin alhainen - käytöstä poistaminen ja uudelleensovittaminen vie klusterillasi tunteja. Tämä ei tietenkään ole sitä, mitä odotat automaattisesta skaalauksesta. Eli emme kosketa ensimmäisen ja toisen tyypin solmuja. Ne edustavat vähintään elinkelpoista klusteria, joka on olemassa koko ohjelman ajan.
Joten automaattinen skaalaajamme on kirjoitettu Python 3:ssa, käyttää Ambari API:ta klusteripalvelujen hallintaan, käyttää
Ratkaisuarkkitehtuuri
- Moduuli
autoscaler.py
. Se sisältää kolme luokkaa: 1) Ambarin kanssa työskentelyyn tarkoitetut toiminnot, 2) MCS:n kanssa työskentelyn toiminnot, 3) toiminnot, jotka liittyvät suoraan automaattisen skaalaimen logiikkaan. - Käsikirjoitus
observer.py
. Pohjimmiltaan se koostuu erilaisista säännöistä: milloin ja millä hetkillä autoscaler-toimintoja kutsutaan. - Asetustiedosto
config.py
. Se sisältää esimerkiksi luettelon solmuista, jotka sallitaan automaattisella skaalauksella, ja muita parametreja, jotka vaikuttavat esimerkiksi siihen, kuinka kauan odottaa uuden solmun lisäyshetkestä. Tuntien alkamiseen on myös aikaleimat, jotta ennen luokkaa käynnistetään suurin sallittu klusterikonfiguraatio.
Katsotaan nyt kahden ensimmäisen tiedoston sisällä olevia koodinpätkiä.
1. Autoscaler.py-moduuli
Ambari luokka
Tältä näyttää luokan sisältävä koodinpätkä Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Yllä esimerkkinä voit tarkastella toiminnon toteutusta stop_all_services
, joka pysäyttää kaikki palvelut halutussa klusterisolmussa.
Luokan sisäänkäynnillä Ambari
ohitat:
ambari_url
esimerkiksi kuten'http://localhost:8080/api/v1/clusters/'
,cluster_name
– klusterin nimi Ambarissa,headers = {'X-Requested-By': 'ambari'}
- ja sisällä
auth
tässä on Ambari-tunnuksesi ja salasanasi:auth = ('login', 'password')
.
Itse toiminto ei ole muuta kuin pari kutsua REST API:n kautta Ambarille. Loogisesta näkökulmasta katsottuna saamme ensin luettelon solmussa käynnissä olevista palveluista ja sitten pyydämme tietyssä klusterissa, tietyssä solmussa siirtämään palvelut luettelosta tilaan. INSTALLED
. Toiminnot kaikkien palveluiden käynnistämiseen, solmujen siirtämiseen tilaan Maintenance
jne. näyttävät samanlaisilta - ne ovat vain muutamia pyyntöjä API:n kautta.
Luokan Mcs
Tältä näyttää luokan sisältävä koodinpätkä Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
Luokan sisäänkäynnillä Mcs
välitämme projektin tunnuksen pilven sisällä ja käyttäjätunnuksen sekä hänen salasanansa. Toiminnassa vm_turn_on
haluamme käynnistää yhden koneista. Logiikka tässä on hieman monimutkaisempi. Koodin alussa kutsutaan kolme muuta toimintoa: 1) meidän täytyy saada token, 2) meidän on muutettava isäntänimi koneen nimeksi MCS:ssä, 3) hankittava tämän koneen tunnus. Seuraavaksi teemme vain postituspyynnön ja käynnistämme tämän koneen.
Tokenin hankkimistoiminto näyttää tältä:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Autoscaler-luokka
Tämä luokka sisältää itse toimintalogiikkaan liittyviä toimintoja.
Tämän luokan koodinpätkä näyttää tältä:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Otamme kursseja vastaan. Ambari
и Mcs
, luettelo solmuista, jotka ovat sallittuja skaalattavaksi, sekä solmun kokoonpanoparametrit: YARN:n solmulle varattu muisti ja suoritin. On myös 2 sisäistä parametria q_ram, q_cpu, jotka ovat jonoja. Niiden avulla tallennamme nykyisen klusterin kuormituksen arvot. Jos näemme, että viimeisten 5 minuutin aikana kuormitus on jatkuvasti lisääntynyt, päätämme, että meidän on lisättävä +1-solmu klusteriin. Sama pätee klusterin vajaakäytön tilaan.
Yllä oleva koodi on esimerkki funktiosta, joka poistaa koneen klusterista ja pysäyttää sen pilveen. Ensin on käytöstäpoisto YARN Nodemanager
, tila kytkeytyy päälle Maintenance
, sitten pysäytämme kaikki palvelut koneella ja sammutamme virtuaalikoneen pilvessä.
2. Käsikirjoituksen tarkkailija.py
Esimerkkikoodi sieltä:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
Siinä tarkistamme, onko klusterin kapasiteetin lisäämiselle luotu edellytykset ja onko koneita varassa, hankimme niistä yhden isäntänimen, lisäämme sen klusteriin ja julkaisemme siitä viestin tiimimme Slackiin. Sen jälkeen se alkaa cooldown_period
, kun emme lisää tai poista klusteriin mitään, vaan valvomme vain kuormitusta. Jos se on tasaantunut ja on optimaalisten kuormitusarvojen käytävän sisällä, jatkamme vain seurantaa. Jos yksi solmu ei riittänyt, lisäämme toisen.
Tapauksissa, joissa meillä on oppitunti edessä, tiedämme jo varmasti, että yksi solmu ei riitä, joten käynnistämme välittömästi kaikki vapaat solmut ja pidämme ne aktiivisina oppitunnin loppuun asti. Tämä tapahtuu toimintojen aikaleimaluettelon avulla.
Johtopäätös
Autoscaler on hyvä ja kätevä ratkaisu tapauksiin, joissa klusterin kuormitus on epätasaista. Saavutat samanaikaisesti halutun klusterin konfiguraation huippukuormituksille ja samalla et säilytä tätä klusteria alikuormituksen aikana, mikä säästää rahaa. No, ja tämä kaikki tapahtuu automaattisesti ilman osallistumistasi. Itse automaattinen skaalaaja ei ole muuta kuin joukko pyyntöjä klusterinhallinnan API:lle ja pilvipalveluntarjoajan API:lle, jotka on kirjoitettu tietyn logiikan mukaisesti. Sinun on ehdottomasti muistettava solmujen jakaminen 3 tyyppiin, kuten kirjoitimme aiemmin. Ja sinä tulet olemaan onnellinen.
Lähde: will.com