Jak stworzyć własny autoskaler dla klastra

Cześć! Szkolimy ludzi do pracy z big data. Nie sposób wyobrazić sobie programu edukacyjnego dotyczącego big data bez własnego klastra, nad którym pracują wszyscy uczestnicy. Z tego powodu nasz program zawsze to ma 🙂 Zajmujemy się jego konfiguracją, tuningiem i administracją, a chłopaki bezpośrednio tam uruchamiają zadania MapReduce i używają Sparka.

W tym poście opowiemy jak rozwiązaliśmy problem nierównomiernego ładowania klastrów pisząc własny autoskaler za pomocą chmury Rozwiązania chmurowe Mail.ru.

problem

Nasz klaster nie jest używany w typowym trybie. Utylizacja jest bardzo nierówna. Są na przykład zajęcia praktyczne, podczas których całe 30 osób wraz z nauczycielem udaje się do klastra i zaczyna z niego korzystać. Lub znowu są dni przed terminem, kiedy obciążenie znacznie wzrasta. Przez resztę czasu klaster działa w trybie niedociążenia.

Rozwiązanie nr 1 polega na utrzymaniu klastra, który wytrzyma obciążenia szczytowe, ale przez resztę czasu będzie bezczynny.

Rozwiązanie nr 2 polega na utrzymaniu małego klastra, do którego ręcznie dodajesz węzły przed zajęciami i podczas szczytowych obciążeń.

Rozwiązanie nr 3 polega na utrzymaniu małego klastra i napisaniu autoskalera, który będzie monitorował bieżące obciążenie klastra oraz przy użyciu różnych interfejsów API dodawał i usuwał węzły z klastra.

W tym poście omówimy rozwiązanie nr 3. Ten autoskaler jest w dużym stopniu zależny od czynników zewnętrznych, a nie wewnętrznych, a dostawcy często go nie zapewniają. Korzystamy z infrastruktury chmurowej Mail.ru Cloud Solutions i napisaliśmy autoskaler za pomocą API MCS. A ponieważ uczymy pracy z danymi, postanowiliśmy pokazać, jak można napisać podobny autoskaler na własne potrzeby i wykorzystać go ze swoją chmurą

Wymagania wstępne

Po pierwsze, musisz mieć klaster Hadoop. Na przykład używamy dystrybucji HDP.

Aby Twoje węzły mogły być szybko dodawane i usuwane, musisz mieć określony podział ról pomiędzy węzłami.

  1. Węzeł główny. No cóż, nie trzeba nic specjalnie wyjaśniać: główny węzeł klastra, na którym uruchamiany jest np. sterownik Spark, jeśli korzystamy z trybu interaktywnego.
  2. Węzeł daty. Jest to węzeł, w którym przechowujesz dane w systemie HDFS i gdzie przeprowadzane są obliczenia.
  3. Węzeł obliczeniowy. Jest to węzeł, w którym nie przechowujesz niczego w systemie HDFS, ale w którym przeprowadzane są obliczenia.

Ważny punkt. Autoskalowanie nastąpi z powodu węzłów trzeciego typu. Jeśli zaczniesz pobierać i dodawać węzły drugiego typu, szybkość reakcji będzie bardzo niska — likwidacja i ponowne zatwierdzanie w klastrze zajmie kilka godzin. To oczywiście nie jest to, czego oczekujesz od automatycznego skalowania. Oznacza to, że nie dotykamy węzłów pierwszego i drugiego typu. Będą reprezentować minimalny realny klaster, który będzie istniał przez cały czas trwania programu.

Tak więc nasz autoskaler jest napisany w Pythonie 3, wykorzystuje API Ambari do zarządzania usługami klastrowymi, wykorzystuje API firmy Mail.ru Cloud Solutions (MCS) do uruchamiania i zatrzymywania maszyn.

Architektura rozwiązania

  1. Moduł autoscaler.py. Zawiera trzy klasy: 1) funkcje do pracy z Ambari, 2) funkcje do pracy z MCS, 3) funkcje związane bezpośrednio z logiką autoskalera.
  2. Scenariusz observer.py. Zasadniczo składa się z różnych zasad: kiedy i w jakich momentach wywoływać funkcje autoskalera.
  3. Plik konfiguracyjny config.py. Zawiera np. listę węzłów dopuszczonych do autoskalowania oraz inne parametry, które wpływają np. na to jak długo trzeba czekać od momentu dodania nowego węzła. Istnieją również znaczniki czasu rozpoczęcia zajęć, dzięki czemu przed zajęciami zostanie uruchomiona maksymalna dozwolona konfiguracja klastra.

Przyjrzyjmy się teraz fragmentom kodu znajdującym się w pierwszych dwóch plikach.

1. Moduł Autoscaler.py

Klasa Ambari

Tak wygląda fragment kodu zawierający klasę Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Powyżej jako przykład możesz przyjrzeć się implementacji funkcji stop_all_services, co powoduje zatrzymanie wszystkich usług w żądanym węźle klastra.

Przy wejściu do klasy Ambari przechodzisz:

  • ambari_urlna przykład, jak 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – nazwę Twojego klastra w Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • i w środku auth oto Twój login i hasło do Ambari: auth = ('login', 'password').

Sama funkcja to nic innego jak kilka wywołań poprzez REST API do Ambari. Z logicznego punktu widzenia najpierw otrzymujemy listę uruchomionych usług na węźle, a następnie prosimy na danym klastrze, na danym węźle, o przeniesienie usług z listy do stanu INSTALLED. Funkcje uruchamiania wszystkich usług, przesyłania węzłów do stanu Maintenance itp. wyglądają podobnie - to tylko kilka żądań poprzez API.

Klasa Mc

Tak wygląda fragment kodu zawierający klasę Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Przy wejściu do klasy Mcs wewnątrz chmury przekazujemy identyfikator projektu oraz identyfikator użytkownika i jego hasło. W działaniu vm_turn_on chcemy włączyć jedną z maszyn. Logika jest tutaj nieco bardziej skomplikowana. Na początku kodu wywoływane są trzy inne funkcje: 1) musimy uzyskać token, 2) musimy przekonwertować nazwę hosta na nazwę maszyny w MCS, 3) uzyskać identyfikator tej maszyny. Następnie po prostu wysyłamy żądanie wysłania wiadomości i uruchamiamy tę maszynę.

Tak wygląda funkcja uzyskania tokena:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Klasa autoskalera

Klasa ta zawiera funkcje związane z samą logiką działania.

Tak wygląda fragment kodu tej klasy:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Przyjmujemy zajęcia do wpisu. Ambari и Mcs, listę węzłów, które można skalować, a także parametry konfiguracyjne węzła: pamięć i procesor przydzielone do węzła w YARN. Istnieją również 2 parametry wewnętrzne q_ram, q_cpu, które są kolejkami. Za ich pomocą przechowujemy wartości aktualnego obciążenia klastra. Jeśli widzimy, że w ciągu ostatnich 5 minut obciążenie stale wzrastało, wówczas decydujemy, że musimy dodać węzeł +1 do klastra. To samo dotyczy stanu niedostatecznego wykorzystania klastra.

Powyższy kod jest przykładem funkcji usuwającej maszynę z klastra i zatrzymującej ją w chmurze. Najpierw następuje likwidacja YARN Nodemanager, następnie tryb zostanie włączony Maintenance, wówczas zatrzymujemy wszystkie usługi na maszynie i wyłączamy maszynę wirtualną w chmurze.

2. Skrypt obserwator.py

Przykładowy kod stamtąd:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Sprawdzamy w nim, czy zostały stworzone warunki do zwiększenia pojemności klastra i czy są jakieś maszyny w rezerwie, uzyskujemy nazwę hosta jednej z nich, dodajemy ją do klastra i publikujemy o tym wiadomość na Slacku naszego zespołu. Po czym się zaczyna cooldown_period, gdy nie dodajemy ani nie usuwamy niczego z klastra, a jedynie monitorujemy obciążenie. Jeśli się ustabilizowało i mieści się w korytarzu optymalnych wartości obciążeń, to po prostu kontynuujemy monitoring. Jeśli jeden węzeł nie wystarczył, dodajemy kolejny.

W przypadkach, gdy mamy przed sobą lekcję, wiemy już na pewno, że jeden węzeł nie wystarczy, dlatego od razu uruchamiamy wszystkie wolne węzły i utrzymujemy je aktywne do końca lekcji. Odbywa się to przy użyciu listy znaczników czasu działań.

wniosek

Autoskaler jest dobrym i wygodnym rozwiązaniem w przypadkach, gdy występuje nierównomierne ładowanie klastra. Jednocześnie uzyskujesz pożądaną konfigurację klastra dla obciążeń szczytowych i jednocześnie nie utrzymujesz tego klastra podczas niedociążenia, oszczędzając pieniądze. Cóż, a wszystko to dzieje się automatycznie, bez Twojego udziału. Sam autoskaler to nic innego jak zestaw żądań do API menedżera klastrów i API dostawcy chmury, zapisanych według określonej logiki. To o czym na pewno trzeba pamiętać to podział węzłów na 3 typy, o czym pisaliśmy wcześniej. I będziesz szczęśliwy.

Źródło: www.habr.com

Dodaj komentarz