Helló! Megtanítjuk az embereket a nagy adatokkal való munkavégzésre. Lehetetlen elképzelni egy big data oktatási programot saját klaszter nélkül, amelyen minden résztvevő együtt dolgozik. Emiatt a mi programunkban mindig van 🙂 Mi foglalkozunk a konfigurálásával, tuningolásával és adminisztrációjával, a srácok pedig közvetlenül elindítják ott a MapReduce munkákat és a Sparkot használják.
Ebben a bejegyzésben elmeséljük, hogyan oldottuk meg az egyenetlen fürtbetöltés problémáját úgy, hogy saját autoscalerünket írtuk a felhő segítségével
probléma
Klaszterünket nem egy tipikus módban használják. Az ártalmatlanítás nagyon egyenetlen. Például vannak gyakorlati órák, amikor mind a 30 ember és egy tanár bemegy a klaszterbe és elkezdi használni. Vagy megint vannak napok a határidő előtt, amikor nagyon megnő a terhelés. Az idő további részében a fürt alulterhelés üzemmódban működik.
Az 1. megoldás egy olyan fürt megtartása, amely ellenáll a csúcsterhelésnek, de a hátralévő időben tétlen lesz.
A 2. megoldás egy kis fürt megtartása, amelyhez manuálisan kell csomópontokat hozzáadni az órák előtt és a csúcsterhelés alatt.
A 3. megoldás egy kis fürt megtartása és egy automatikus skálázó írása, amely figyeli a fürt aktuális terhelését, és különféle API-k segítségével csomópontokat ad hozzá és távolít el a fürtből.
Ebben a bejegyzésben a 3. megoldásról fogunk beszélni. Ez az automatikus skálázó nagymértékben függ a külső tényezőktől, nem pedig a belsőektől, és a szolgáltatók gyakran nem biztosítják. A Mail.ru Cloud Solutions felhőinfrastruktúrát használjuk, és az MCS API-t használva automatikus skálázót írtunk. És mivel megtanítjuk, hogyan kell dolgozni az adatokkal, úgy döntöttünk, hogy megmutatjuk, hogyan írhat egy hasonló automatikus skálázót saját céljaira, és használhatja azt a felhőben.
Előfeltételek
Először is rendelkeznie kell egy Hadoop-fürttel. Például a HDP disztribúciót használjuk.
A csomópontok gyors hozzáadásához és eltávolításához a csomópontok között bizonyos szereposztásra van szükség.
- Főcsomópont. Nos, nem kell különösebben magyarázni: a fürt fő csomópontja, amelyen például a Spark illesztőprogram indul el, ha interaktív módot használunk.
- Dátum csomópont. Ez az a csomópont, amelyen adatokat tárol a HDFS-en, és ahol a számítások zajlanak.
- Számítási csomópont. Ez egy csomópont, ahol nem tárol semmit a HDFS-en, de ahol számítások történnek.
Fontos pont. Az automatikus skálázás a harmadik típusú csomópontok miatt történik. Ha elkezdi a második típusú csomópontok felvételét és hozzáadását, a válaszsebesség nagyon alacsony lesz - a leszerelés és az újraaktiválás órákig tart a fürtben. Ez természetesen nem az, amit az automatikus skálázástól elvár. Vagyis nem érintjük meg az első és a második típusú csomópontokat. Egy minimális életképes klasztert képviselnek, amely a program teljes időtartama alatt létezik.
Tehát az autoscalerünk Python 3-ban van írva, az Ambari API-t használja a fürtszolgáltatások kezelésére,
Megoldás architektúra
- Modul
autoscaler.py
. Három osztályt tartalmaz: 1) az Ambarival való munkavégzésre szolgáló funkciók, 2) az MCS-sel való munkavégzés funkciói, 3) az automatikus skálázó logikájához közvetlenül kapcsolódó funkciók. - Forgatókönyv
observer.py
. Lényegében különböző szabályokból áll: mikor és mikor kell meghívni az autoscaler függvényeket. - Konfigurációs fájl
config.py
. Tartalmazza például az automatikus skálázásra engedélyezett csomópontok listáját és egyéb paramétereket, amelyek például befolyásolják, hogy mennyi ideig kell várni az új csomópont hozzáadása pillanatától számítva. Vannak időbélyegek is az órák kezdetére, így az óra előtt elindul a maximálisan megengedett fürtkonfiguráció.
Nézzük most az első két fájlban található kódrészleteket.
1. Autoscaler.py modul
Ambari osztály
Így néz ki egy osztályt tartalmazó kódrészlet Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Fent, példaként, megnézheti a függvény megvalósítását stop_all_services
, amely leállítja az összes szolgáltatást a kívánt fürtcsomóponton.
Az osztály bejáratánál Ambari
átpasszol:
ambari_url
például tetszik'http://localhost:8080/api/v1/clusters/'
,cluster_name
– a klaszter neve Ambari nyelven,headers = {'X-Requested-By': 'ambari'}
- és belül
auth
itt van az Ambari felhasználóneve és jelszava:auth = ('login', 'password')
.
Maga a funkció nem más, mint néhány hívás a REST API-n keresztül az Ambari felé. Logikai szempontból először megkapjuk a futó szolgáltatások listáját egy csomóponton, majd egy adott klaszteren, egy adott csomóponton megkérjük, hogy a szolgáltatásokat a listából az állapotba vigyük át. INSTALLED
. Funkciók az összes szolgáltatás elindításához, a csomópontok állapotba átviteléhez Maintenance
stb. hasonlóan néznek ki – ezek csak néhány kérés az API-n keresztül.
osztály Mcs
Így néz ki egy osztályt tartalmazó kódrészlet Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
Az osztály bejáratánál Mcs
átadjuk a projektazonosítót a felhőn belül és a felhasználói azonosítót, valamint a jelszavát. Funkcióban vm_turn_on
be akarjuk kapcsolni az egyik gépet. A logika itt egy kicsit bonyolultabb. A kód elején még három függvényt hívunk meg: 1) kapnunk kell egy tokent, 2) a gazdagépnevet át kell alakítanunk a gép nevére az MCS-ben, 3) meg kell szereznünk ennek a gépnek az azonosítóját. Ezután egyszerűen küldünk egy bejegyzési kérelmet, és elindítjuk a gépet.
Így néz ki a token megszerzésére szolgáló függvény:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Autoscaler osztály
Ez az osztály magával a működési logikával kapcsolatos függvényeket tartalmazza.
Így néz ki egy kódrészlet ehhez az osztályhoz:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Belépő órákat elfogadunk. Ambari
и Mcs
, a méretezésre engedélyezett csomópontok listája, valamint a csomópont konfigurációs paraméterei: a YARN csomópontjához lefoglalt memória és processzor. Van még 2 belső paraméter: q_ram, q_cpu, amelyek sorok. Ezek segítségével tároljuk az aktuális klaszterterhelés értékeit. Ha azt látjuk, hogy az elmúlt 5 percben folyamatosan nőtt a terhelés, akkor úgy döntünk, hogy +1 csomópontot kell hozzáadnunk a fürthöz. Ugyanez igaz a fürt kihasználatlan állapotára is.
A fenti kód egy példa egy olyan függvényre, amely eltávolít egy gépet a fürtből, és leállítja a felhőben. Először is leszerelés történik YARN Nodemanager
, majd az üzemmód bekapcsol Maintenance
, akkor leállítjuk az összes szolgáltatást a gépen, és kikapcsoljuk a virtuális gépet a felhőben.
2. Script megfigyelő.py
Minta kód onnan:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
Ebben ellenőrizzük, hogy megteremtődtek-e a feltételek a klaszter kapacitásának növeléséhez, és van-e tartalék gép, megkapjuk az egyik gépnevét, hozzáadjuk a klaszterhez, és erről üzenetet teszünk közzé csapatunk Slack oldalán. Ami után elindul cooldown_period
, amikor nem adunk hozzá vagy távolítunk el semmit a klaszterből, hanem egyszerűen figyeljük a terhelést. Ha stabilizálódott és az optimális terhelési értékek folyosóján belül van, akkor egyszerűen folytatjuk a megfigyelést. Ha egy csomópont nem lenne elég, akkor hozzáadunk egy másikat.
Azokban az esetekben, amikor előttünk van egy lecke, már biztosan tudjuk, hogy egy csomópont nem lesz elég, ezért azonnal elindítjuk az összes szabad csomópontot, és aktívan tartjuk őket az óra végéig. Ez a tevékenységi időbélyegek listájának használatával történik.
Következtetés
Az Autoscaler jó és kényelmes megoldás azokra az esetekre, amikor egyenetlen fürtterhelést tapasztal. Egyszerre éri el a kívánt fürtkonfigurációt a csúcsterhelésekhez, ugyanakkor nem tartja meg ezt a klasztert alulterhelés alatt, így pénzt takarít meg. Nos, ráadásul mindez automatikusan, az Ön részvétele nélkül történik. Maga az autoscaler nem más, mint a fürtkezelő API-hoz és a felhőszolgáltató API-hoz intézett kérések halmaza, egy bizonyos logika szerint megírva. Amire mindenképpen emlékezni kell, az a csomópontok 3 típusra való felosztása, ahogy korábban írtuk. És boldog leszel.
Forrás: will.com