So erstellen Sie Ihren eigenen Autoscaler für einen Cluster

Hallo! Wir schulen Menschen im Umgang mit Big Data. Ein Bildungsprogramm zum Thema Big Data ist ohne einen eigenen Cluster, an dem alle Teilnehmer gemeinsam arbeiten, nicht vorstellbar. Aus diesem Grund hat unser Programm es immer 🙂 Wir beschäftigen uns mit der Konfiguration, Abstimmung und Verwaltung, und die Jungs starten dort direkt MapReduce-Jobs und verwenden Spark.

In diesem Beitrag erzählen wir Ihnen, wie wir das Problem der ungleichmäßigen Clusterauslastung gelöst haben, indem wir unseren eigenen Autoscaler mithilfe der Cloud geschrieben haben Mail.ru Cloud-Lösungen.

Problem

Unser Cluster wird nicht in einem typischen Modus verwendet. Die Entsorgung ist sehr ungleichmäßig. Zum Beispiel gibt es praktische Kurse, bei denen alle 30 Personen und ein Lehrer zum Cluster gehen und mit der Nutzung beginnen. Oder es gibt Tage vor Ablauf der Frist, an denen die Belastung stark ansteigt. Die restliche Zeit arbeitet der Cluster im Unterlastmodus.

Lösung Nr. 1 besteht darin, einen Cluster beizubehalten, der Spitzenlasten standhält, die restliche Zeit jedoch im Leerlauf bleibt.

Lösung Nr. 2 besteht darin, einen kleinen Cluster beizubehalten, zu dem Sie vor dem Unterricht und bei Spitzenlasten manuell Knoten hinzufügen.

Lösung Nr. 3 besteht darin, einen kleinen Cluster zu behalten und einen Autoscaler zu schreiben, der die aktuelle Auslastung des Clusters überwacht und mithilfe verschiedener APIs Knoten zum Cluster hinzufügt und daraus entfernt.

In diesem Beitrag werden wir über Lösung Nr. 3 sprechen. Dieser Autoscaling ist stark von externen und nicht von internen Faktoren abhängig und wird von den Anbietern häufig nicht bereitgestellt. Wir nutzen die Cloud-Infrastruktur von Mail.ru Cloud Solutions und haben einen Autoscaler mit der MCS-API geschrieben. Und da wir lehren, wie man mit Daten arbeitet, haben wir uns entschieden, zu zeigen, wie Sie einen ähnlichen Autoscaler für Ihre eigenen Zwecke schreiben und ihn mit Ihrer Cloud verwenden können

Voraussetzungen:

Zunächst müssen Sie über einen Hadoop-Cluster verfügen. Wir nutzen zum Beispiel die HDP-Distribution.

Damit Ihre Knoten schnell hinzugefügt und entfernt werden können, müssen Sie über eine bestimmte Rollenverteilung zwischen den Knoten verfügen.

  1. Masterknoten. Nun, es muss nichts Besonderes erklärt werden: der Hauptknoten des Clusters, auf dem beispielsweise der Spark-Treiber gestartet wird, wenn Sie den interaktiven Modus verwenden.
  2. Datumsknoten. Dies ist der Knoten, auf dem Sie Daten auf HDFS speichern und auf dem Berechnungen stattfinden.
  3. Rechenknoten. Dies ist ein Knoten, auf dem Sie nichts auf HDFS speichern, sondern auf dem Berechnungen stattfinden.

Wichtiger Punkt. Aufgrund von Knoten des dritten Typs erfolgt eine automatische Skalierung. Wenn Sie mit der Aufnahme und dem Hinzufügen von Knoten des zweiten Typs beginnen, ist die Reaktionsgeschwindigkeit sehr niedrig – die Außerbetriebnahme und erneute Festschreibung Ihres Clusters wird Stunden dauern. Das ist natürlich nicht das, was Sie von der automatischen Skalierung erwarten. Das heißt, wir berühren keine Knoten des ersten und zweiten Typs. Sie stellen einen minimal lebensfähigen Cluster dar, der während der gesamten Laufzeit des Programms bestehen bleibt.

Unser Autoscaler ist also in Python 3 geschrieben und verwendet die Ambari-API zur Verwaltung von Clusterdiensten API von Mail.ru Cloud Solutions (MCS) zum Starten und Stoppen von Maschinen.

Lösungsarchitektur

  1. Modul autoscaler.py. Es enthält drei Klassen: 1) Funktionen für die Arbeit mit Ambari, 2) Funktionen für die Arbeit mit MCS, 3) Funktionen, die sich direkt auf die Logik des Autoscalings beziehen.
  2. Skript observer.py. Im Wesentlichen besteht es aus verschiedenen Regeln: wann und zu welchen Zeitpunkten die Autoscaling-Funktionen aufgerufen werden sollen.
  3. Konfigurationsdatei config.py. Es enthält beispielsweise eine Liste von Knoten, die für die automatische Skalierung zugelassen sind, und andere Parameter, die sich beispielsweise darauf auswirken, wie lange nach dem Hinzufügen eines neuen Knotens gewartet werden soll. Es gibt auch Zeitstempel für den Beginn von Kursen, sodass vor dem Kurs die maximal zulässige Clusterkonfiguration gestartet wird.

Schauen wir uns nun die Codeteile in den ersten beiden Dateien an.

1. Autoscaler.py-Modul

Ambari-Klasse

So sieht ein Codeabschnitt aus, der eine Klasse enthält Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Oben können Sie sich als Beispiel die Implementierung der Funktion ansehen stop_all_services, wodurch alle Dienste auf dem gewünschten Clusterknoten gestoppt werden.

Am Eingang zur Klasse Ambari du passierst:

  • ambari_url, zum Beispiel, wie 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – der Name Ihres Clusters in Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • und drinnen auth Hier ist Ihr Benutzername und Passwort für Ambari: auth = ('login', 'password').

Die Funktion selbst ist nichts weiter als ein paar Aufrufe über die REST-API an Ambari. Aus logischer Sicht erhalten wir zunächst eine Liste der laufenden Dienste auf einem Knoten und bitten dann auf einem bestimmten Cluster, auf einem bestimmten Knoten, Dienste aus der Liste in den Status zu übertragen INSTALLED. Funktionen zum Starten aller Dienste, zum Übertragen von Knoten in den Status Maintenance usw. sehen ähnlich aus – es handelt sich lediglich um einige Anfragen über die API.

Klasse Mcs

So sieht ein Codeabschnitt aus, der eine Klasse enthält Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Am Eingang zur Klasse Mcs Wir übergeben die Projekt-ID innerhalb der Cloud und die Benutzer-ID sowie sein Passwort. In Funktion vm_turn_on Wir wollen eine der Maschinen einschalten. Die Logik ist hier etwas komplizierter. Am Anfang des Codes werden drei weitere Funktionen aufgerufen: 1) Wir müssen ein Token erhalten, 2) wir müssen den Hostnamen in den Namen der Maschine in MCS umwandeln, 3) wir müssen die ID dieser Maschine abrufen. Als nächstes stellen wir einfach eine Post-Anfrage und starten diese Maschine.

So sieht die Funktion zum Erhalten eines Tokens aus:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler-Klasse

Diese Klasse enthält Funktionen, die sich auf die Betriebslogik selbst beziehen.

So sieht ein Code für diese Klasse aus:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Wir akzeptieren Kurse zur Teilnahme. Ambari и Mcs, eine Liste der Knoten, die für die Skalierung zugelassen sind, sowie Knotenkonfigurationsparameter: Speicher und CPU, die dem Knoten in YARN zugewiesen sind. Es gibt auch zwei interne Parameter q_ram und q_cpu, bei denen es sich um Warteschlangen handelt. Mit ihnen speichern wir die Werte der aktuellen Clusterlast. Wenn wir feststellen, dass die Last in den letzten 2 Minuten stetig gestiegen ist, entscheiden wir, dass wir dem Cluster +5 Knoten hinzufügen müssen. Das Gleiche gilt für den Zustand der Cluster-Unterauslastung.

Der obige Code ist ein Beispiel für eine Funktion, die eine Maschine aus dem Cluster entfernt und in der Cloud stoppt. Zunächst erfolgt eine Stilllegung YARN Nodemanager, dann schaltet sich der Modus ein Maintenance, dann stoppen wir alle Dienste auf der Maschine und schalten die virtuelle Maschine in der Cloud aus.

2. Skript Observer.py

Beispielcode von dort:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Darin prüfen wir, ob Bedingungen für die Kapazitätserhöhung des Clusters geschaffen wurden und ob Maschinen in Reserve sind, ermitteln den Hostnamen einer davon, fügen sie dem Cluster hinzu und veröffentlichen eine Nachricht darüber im Slack unseres Teams. Danach geht es los cooldown_period, wenn wir dem Cluster nichts hinzufügen oder daraus entfernen, sondern lediglich die Last überwachen. Wenn es sich stabilisiert hat und im Korridor der optimalen Belastungswerte liegt, dann setzen wir die Überwachung einfach fort. Wenn ein Knoten nicht ausreicht, fügen wir einen weiteren hinzu.

Für den Fall, dass wir eine Unterrichtsstunde vor uns haben, wissen wir bereits mit Sicherheit, dass ein Knoten nicht ausreichen wird, also starten wir sofort alle freien Knoten und lassen sie bis zum Ende der Unterrichtsstunde aktiv. Dies geschieht mithilfe einer Liste von Aktivitätszeitstempeln.

Abschluss

Autoscaler ist eine gute und praktische Lösung für Fälle, in denen eine ungleichmäßige Clusterauslastung auftritt. Sie erreichen gleichzeitig die gewünschte Cluster-Konfiguration für Spitzenlasten und behalten gleichzeitig diesen Cluster bei Unterlast nicht bei, was Kosten spart. Nun, außerdem geschieht dies alles automatisch und ohne Ihr Zutun. Der Autoscaler selbst ist nichts anderes als eine Reihe von Anfragen an die Cluster-Manager-API und die Cloud-Provider-API, die nach einer bestimmten Logik geschrieben sind. Was Sie unbedingt beachten müssen, ist die Aufteilung der Knoten in drei Typen, wie wir bereits geschrieben haben. Und du wirst glücklich sein.

Source: habr.com

Kommentar hinzufügen