Як зробити свій автоскейлер для кластера

Вітання! Ми навчаємо людей працювати з великими даними. Неможливо собі уявити освітню програму за великими даними без свого кластера, де всі учасники спільно працюють. З цієї причини на нашій програмі він завжди є 🙂 Ми займаємось його налаштуванням, тюнінгом та адмініструванням, а хлопці безпосередньо запускають там MapReduce-джоби та користуються Spark'ом.

У цьому пості ми розповімо, як вирішували проблему нерівномірного завантаження кластера, написавши свій автоскейлер, використовуючи хмару Mail.ru Cloud Solutions.

проблема

Кластер у нас використовується не зовсім у типовому режимі. Утилізація дуже нерівномірна. Наприклад, є практичні заняття, коли всі 30 чоловік та викладач заходять на кластер і починають ним користуватися. Або знову ж таки є дні перед дедлайном, коли завантаження сильно зростає. У решту часу кластер працює в режимі недозавантаження.

Рішення №1 – це тримати кластер, який витримуватиме пікові завантаження, але простоюватиме в решту часу.

Рішення №2 – це тримати невеликий кластер, до якого вручну додавати ноди перед заняттями та під час пікових навантажень.

Рішення №3 – це тримати невеликий кластер і написати автоскейлер, який слідкуватиме за поточним завантаженням кластера і сам, використовуючи різні API, додавати та видаляти ноди з кластера.

У цьому посту ми говоритимемо про рішення №3. Такий автоскейлер сильно залежить від зовнішніх факторів, а не від внутрішніх, і провайдери часто не надають. Ми користуємося хмарною інфраструктурою Mail.ru Cloud Solutions та написали автоскейлер, використовуючи API MCS. Оскільки ми навчаємо роботі з даними, вирішили показати, як ви можете написати подібний автоскейлер для своїх цілей і використовувати зі своєю хмарою

Передумови

По-перше, у вас має бути Hadoop-кластер. Ми, наприклад, користуємось дистрибутивом HDP.

Щоб у вас ноди могли швидко додаватись та видалятися, у вас має бути певний розподіл ролей по нодах.

  1. Майстер-нода. Ну тут пояснювати нічого особливо не треба: головна нода кластера, на якій запускається, наприклад, драйвер Spark'а, якщо ви користуєтесь інтерактивним режимом.
  2. Дата-нода. Це нода, на якій у вас зберігаються дані на HDFS і на ній відбуваються обчислення.
  3. Обчислювальна нода. Це нода, на якій у вас нічого не зберігається на HDFS, але на ній відбуваються обчислення.

Важливий момент. Автоскейлінг відбуватиметься за рахунок нід третього типу. Якщо ви почнете забирати і додавати ноди другого типу, то швидкість реагування буде дуже низькою – декоміш і рекоміш займатиме годинник на вашому кластері. Це, звичайно, не те, що очікуєш від автоскейлінгу. Тобто ноди першого та другого типу ми не чіпаємо. Вони будуть мінімально життєздатним кластером, який існуватиме протягом всієї дії програми.

Отже, наш автоскейлер написано на Python 3, використовує Ambari API для управління сервісами кластера, використовує API від Mail.ru Cloud Solutions (MCS) для запуску та зупинки машин.

Архітектура рішення

  1. модуль autoscaler.py. У ньому прописані три класи: 1) функції для роботи з Ambari; 2) функції для роботи з MCS; 3) функції, пов'язані безпосередньо з логікою роботи автоскейлера.
  2. Сценарій observer.py. Насправді складається з різних правил: коли і в які моменти викликати функції автоскейлера.
  3. Файл із параметрами конфігурації config.py. Там міститься, наприклад, список нод, дозволених для автоскейлінгу та інші параметри, що впливають, наприклад, скільки часу почекати з того моменту, коли була додана нова нода. Там знаходяться ще й таймстемпи початку занять, щоб перед заняттям була запущена максимальна дозволена конфігурація кластера.

Давайте тепер подивимося на шматки коду всередині перших двох файлів.

1. Модуль autoscaler.py

Клас Ambari

Так виглядає шматочок коду, що містить клас Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Вище для прикладу можна переглянути реалізацію функції stop_all_services, яка зупиняє всі послуги на потрібній ноді кластера.

На вхід класу Ambari ви передаєте:

  • ambari_url, наприклад, виду 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - Назва вашого кластера в Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • і всередині auth лежить ваш логін та пароль від Ambari: auth = ('login', 'password').

Сама функція являє собою не більше ніж кілька звернень через REST API до Ambari. З погляду логіки ми спочатку отримуємо список запущених сервісів на ноді, а потім просимо на даному кластері, на цій ноді перевести сервіси зі списку на стан INSTALLED. Функції із запуску всіх сервісів, з переведення нід у стан Maintenance та ін виглядають схожим чином – це просто кілька запитів через API.

Клас Mcs

Так виглядає шматочок коду, що містить клас Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

На вхід класу Mcs ми передаємо id проекту всередині хмари та id користувача, а також його пароль. У функції vm_turn_on ми хочемо включити одну з машин. Логіка тут трохи складніша. На початку коду йде виклик трьох інших функцій: 1) нам потрібно отримати токен; 2) нам потрібно конвертувати hostname в назву машини в MCS; 3) отримати id цієї машини. Далі ми робимо просто post-запит і запускаємо цю машину.

Так виглядає сама функція отримання токена:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Клас Autoscaler

У цьому вся класі містяться функції, які стосуються самої логіці роботи.

Так виглядає шматок коду цього класу:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

На вхід ми приймаємо класи Ambari и Mcs, Список нод, які дозволені для скейлінгу, а також параметри конфігурації нод: пам'ять і cpu, виділені на ноду в YARN. Також є два внутрішніх параметри q_ram, q_cpu, що є чергами. За їх допомогою ми зберігаємо значення поточного навантаження кластера. Якщо ми бачимо, що протягом останніх 2 хвилин стабільно було підвищене навантаження, ми приймаємо рішення про те, що потрібно додати +5 ноду в кластер. Те саме справедливо і для стану недозавантаження кластера.

У коді вище наведено приклад функції, яка видаляє машину із кластера та зупиняє її у хмарі. Спочатку відбувається декомішен YARN Nodemanager, далі вмикається режим Maintenance, далі ми зупиняємо всі сервіси на машині та вимикаємо віртуальну машину у хмарі.

2. Скрипт observer.py

Приклад коду звідти:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

У ньому ми перевіряємо, чи склалися умови для збільшення потужностей кластера і чи є взагалі в резерві машини, отримуємо хостнейм однієї з них, додаємо її в кластер і публікуємо про це повідомлення в Slack нашої команди. Після чого запускається cooldown_periodКоли ми нічого не додаємо і не прибираємо з кластера, а просто моніторимо завантаження. Якщо вона стабілізувалася і знаходиться всередині коридору оптимальних значень завантаження, ми просто продовжуємо моніторинг. Якщо однієї ноди не вистачило, то додаємо ще одну.

Для випадків, коли у нас попереду заняття, ми вже знаємо напевно, що однієї ноди не вистачить, тому ми відразу стартуємо всі вільні ноди і тримаємо їх активними до кінця заняття. Це відбувається за допомогою списку таймстемпів занять.

Висновок

Автоскейлер – це гарне та зручне рішення для тих випадків, коли у вас спостерігається нерівномірне завантаження кластера. Ви одночасно досягаєте потрібної конфігурації кластера під пікові навантаження і при цьому не тримаєте цей кластер під час недозавантаження, заощаджуючи кошти. Та й плюс це все відбувається автоматизовано без вашої участі. Сам автоскейлер – це не більше ніж набір запитів до API кластер-менеджера та API хмарного провайдера, прописаних за певною логікою. Про що точно треба пам'ятати - це про поділ нід на 3 типи, як ми писали раніше. І буде вам щастя.

Джерело: habr.com

Додати коментар або відгук