Hallo! Wir lehren Menschen, wie man mit Big Data arbeitet. Ein Bildungsprogramm zu Big Data ist ohne einen eigenen Cluster, in dem alle Teilnehmer zusammenarbeiten, nicht vorstellbar. Deshalb gibt es in unserem Programm immer einen :) Wir kĂŒmmern uns um dessen Einrichtung, Optimierung und Verwaltung, und die Jungs starten dort direkt MapReduce-Jobs und verwenden Spark.
In diesem Beitrag erzĂ€hlen wir Ihnen, wie wir das Problem der ungleichmĂ€Ăigen Clusterauslastung gelöst haben, indem wir mithilfe der Cloud unseren eigenen Autoscaler geschrieben haben. .
Problem
Unser Cluster wird nicht im Standardbetrieb genutzt. Die Auslastung ist sehr ungleichmĂ€Ăig. Beispielsweise gibt es praktische Lehrveranstaltungen, bei denen sich alle 30 Personen und der Dozent am Cluster anmelden und ihn nutzen. Oder es gibt Tage vor der Deadline, an denen die Auslastung deutlich ansteigt. In der ĂŒbrigen Zeit arbeitet der Cluster im Unterlastmodus.
Lösung Nr. 1 besteht darin, einen Cluster aufrechtzuerhalten, der Spitzenlasten bewÀltigen kann, aber die restliche Zeit im Leerlauf ist.
Lösung Nr. 2 besteht darin, einen kleinen Cluster zu verwalten und ihm vor dem Unterricht und wĂ€hrend Spitzenlasten manuell Knoten hinzuzufĂŒgen.
Lösung Nr. 3 besteht darin, einen kleinen Cluster zu verwalten und einen Autoscaler zu schreiben, der die aktuelle Clusterlast ĂŒberwacht und mithilfe verschiedener APIs Knoten zum Cluster selbst hinzufĂŒgt und daraus entfernt.
In diesem Beitrag sprechen wir ĂŒber Lösung Nr. 3. Ein solcher Autoscaler ist stark von externen, nicht von internen Faktoren abhĂ€ngig und wird von Anbietern oft nicht angeboten. Wir nutzen die Cloud-Infrastruktur von Mail.ru Cloud Solutions und haben einen Autoscaler mit der MCS-API entwickelt. Da wir den Umgang mit Daten lehren, zeigen wir Ihnen, wie Sie einen Ă€hnlichen Autoscaler fĂŒr Ihre eigenen Zwecke entwickeln und in Ihrer Cloud nutzen können.
Voraussetzungen:
ZunÀchst benötigen Sie einen Hadoop-Cluster. Wir verwenden beispielsweise die HDP-Distribution.
Damit Sie Knoten schnell hinzufĂŒgen und entfernen können, ist eine bestimmte Rollenverteilung unter den Knoten erforderlich.
- Masterknoten. Hier gibt es nichts Besonderes zu erklÀren: der Hauptknoten des Clusters, auf dem beispielsweise der Spark-Treiber gestartet wird, wenn Sie den interaktiven Modus verwenden.
- Datenknoten. Dies ist der Knoten, auf dem Sie Daten auf HDFS speichern und Berechnungen durchfĂŒhren.
- Rechenknoten. Dies ist ein Knoten, auf dem Sie nichts auf HDFS gespeichert haben, auf dem aber Berechnungen stattfinden.
Wichtiger Punkt. Die automatische Skalierung erfolgt durch Knoten des dritten Typs. Wenn Sie Knoten des zweiten Typs hinzufĂŒgen, ist die Reaktionsgeschwindigkeit sehr gering â die AuĂerbetriebnahme und Wiederinbetriebnahme Ihres Clusters dauert Stunden. Das entspricht natĂŒrlich nicht dem, was Sie von der automatischen Skalierung erwarten. Das heiĂt, wir berĂŒhren Knoten des ersten und zweiten Typs nicht. Sie stellen einen minimal funktionsfĂ€higen Cluster dar, der wĂ€hrend des gesamten Programms bestehen bleibt.
Unser Autoscaler ist in Python 3 geschrieben, nutzt die Ambari-API zur Verwaltung von Cluster-Diensten, nutzt (MCS) zum Starten und Stoppen von Maschinen.
Lösungsarchitektur
- Modul
autoscaler.py. Es enthĂ€lt drei Klassen: 1) Funktionen fĂŒr die Arbeit mit Ambari, 2) Funktionen fĂŒr die Arbeit mit MCS, 3) Funktionen, die direkt mit der Logik des Autoscalers zusammenhĂ€ngen. - Skript
observer.py. Besteht im Wesentlichen aus verschiedenen Regeln: wann und zu welchen Zeitpunkten die Autoscaler-Funktionen aufgerufen werden sollen. - Datei mit Konfigurationsparametern
config.py. Es enthĂ€lt beispielsweise eine Liste der fĂŒr die automatische Skalierung zulĂ€ssigen Knoten und andere Parameter, die beispielsweise beeinflussen, wie lange ab dem Zeitpunkt des HinzufĂŒgens eines neuen Knotens gewartet werden soll. Es enthĂ€lt auch die Zeitstempel fĂŒr den Start von Klassen, sodass die maximal zulĂ€ssige Clusterkonfiguration vor der Klasse gestartet wird.
Sehen wir uns nun die Codeteile in den ersten beiden Dateien an.
1. Modul autoscaler.py
Ambari-Klasse
So sieht ein CodestĂŒck aus, das eine Klasse enthĂ€lt Ambari:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return messageOben sehen Sie als Beispiel die Implementierung der Funktion stop_all_services, wodurch alle Dienste auf dem erforderlichen Clusterknoten gestoppt werden.
Am Eingang der Klasse Ambari Sie ĂŒbermitteln:
ambari_url, zum Beispiel vom Typ'http://localhost:8080/api/v1/clusters/',cluster_nameâ der Name Ihres Clusters in Ambari,headers = {'X-Requested-By': 'ambari'}- und drinnen
authhier sind Ihr Login und Ihr Passwort von Ambari:auth = ('login', 'password').
Die Funktion selbst besteht lediglich aus ein paar Aufrufen ĂŒber die REST-API an Ambari. Logisch gesehen erhalten wir zunĂ€chst eine Liste der laufenden Dienste auf dem Knoten und bitten dann diesen Cluster, auf diesem Knoten, die Dienste aus der Liste in den Status zu ĂŒbertragen. INSTALLEDFunktionen zum Starten aller Dienste, zum Ăbertragen von Knoten in den Status Maintenance und andere sehen Ă€hnlich aus â es sind nur ein paar Anfragen ĂŒber die API.
MCs-Klasse
So sieht ein CodestĂŒck aus, das eine Klasse enthĂ€lt Mcs:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_codeAm Eingang der Klasse Mcs Wir ĂŒbergeben die Projekt-ID innerhalb der Cloud und die Benutzer-ID sowie sein Passwort. In der Funktion vm_turn_on Wir möchten eine der Maschinen einschalten. Die Logik ist hier etwas komplizierter. Zu Beginn des Codes werden drei weitere Funktionen aufgerufen: 1) Wir benötigen ein Token, 2) wir mĂŒssen den Hostnamen in MCS in den Namen der Maschine konvertieren und 3) wir mĂŒssen die ID dieser Maschine ermitteln. AnschlieĂend stellen wir einfach eine Post-Anfrage und starten die Maschine.
So sieht die Funktion zum Erhalt eines Tokens aus:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.tokenAutoscaler-Klasse
Diese Klasse enthÀlt Funktionen, die sich auf die eigentliche Logik der Arbeit beziehen.
So sieht ein Codeabschnitt fĂŒr diese Klasse aus:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return messageWir akzeptieren Klassen am Eingang Ambari Đž Mcs, eine Liste der Knoten, die skaliert werden können, und Knotenkonfigurationsparameter: Speicher und CPU, die dem Knoten in YARN zugewiesen sind. Es gibt auĂerdem zwei interne Parameter, q_ram und q_cpu, die Warteschlangen darstellen. Mit ihrer Hilfe speichern wir die Werte der aktuellen Clusterauslastung. Wenn wir feststellen, dass die Auslastung in den letzten 2 Minuten konstant gestiegen ist, entscheiden wir uns, dem Cluster einen weiteren Knoten hinzuzufĂŒgen. Gleiches gilt fĂŒr den Zustand der Clusterunterlastung.
Der obige Code zeigt ein Beispiel fĂŒr eine Funktion, die eine Maschine aus dem Cluster entfernt und in der Cloud stoppt. ZunĂ€chst erfolgt eine AuĂerbetriebnahme YARN Nodemanager, dann ist der Modus eingeschaltet Maintenance, dann stoppen wir alle Dienste auf der Maschine und schalten die virtuelle Maschine in der Cloud aus.
2. Skript observer.py
Beispielcode von dort:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)Darin prĂŒfen wir, ob die Voraussetzungen fĂŒr die Erhöhung der ClusterkapazitĂ€t erfĂŒllt sind und ob ĂŒberhaupt noch Maschinen in Reserve sind. Wir ermitteln den Hostnamen einer dieser Maschinen, fĂŒgen sie dem Cluster hinzu und veröffentlichen eine Nachricht darĂŒber im Slack unseres Teams. Danach geht es los. cooldown_period, wenn wir dem Cluster nichts hinzufĂŒgen oder entfernen, sondern lediglich die Last ĂŒberwachen. Hat sie sich stabilisiert und liegt sie im Korridor optimaler Lastwerte, setzen wir die Ăberwachung einfach fort. Reicht ein Knoten nicht aus, fĂŒgen wir einen weiteren hinzu.
Wenn wir eine Unterrichtsstunde vor uns haben, wissen wir bereits, dass ein Knoten nicht ausreichen wird. Daher starten wir sofort alle freien Knoten und halten sie bis zum Ende der Unterrichtsstunde aktiv. Dies geschieht mithilfe der Zeitstempelliste der Unterrichtsstunde.
Fazit
Autoscaler ist eine gute und praktische Lösung fĂŒr FĂ€lle mit ungleichmĂ€Ăiger Clusterauslastung. Sie erreichen gleichzeitig die erforderliche Clusterkonfiguration fĂŒr Spitzenlasten und mĂŒssen diesen Cluster bei Unterlast nicht lĂ€nger nutzen, was Kosten spart. All dies geschieht automatisch und ohne Ihr Zutun. Der Autoscaler selbst ist nichts anderes als eine Reihe von Anfragen an die Cluster-Manager-API und die Cloud-Provider-API, die nach einer bestimmten Logik geschrieben sind. Beachten Sie unbedingt die bereits beschriebene Aufteilung der Knoten in drei Typen. Und Sie werden zufrieden sein.
Source: habr.com
