Як зрабіць свой аўтаскейлер для кластара

Прывітанне! Мы навучаем людзей рабоце з вялікімі дадзенымі. Немагчыма сабе ўявіць адукацыйную праграму па вялікіх звестках без свайго кластара, на якім усе ўдзельнікі сумесна працуюць. Па гэтай прычыне на нашай праграме ён заўсёды ёсць 🙂 Мы займаемся яго настройкай, цюнінгам і адміністраваннем, а хлопцы непасрэдна запускаюць там MapReduce-джобы і карыстаюцца Spark'ам.

У гэтым пасце мы раскажам, як мы вырашалі праблему нераўнамернай загрузкі кластара, напісаўшы свой аўтаскейлер, выкарыстоўваючы воблака Mail.ru Cloud Solutions.

праблема

Кластар у нас выкарыстоўваецца не зусім у тыповым рэжыме. Утылізацыя моцна нераўнамерная. Напрыклад, ёсць практычныя заняткі, калі ўсе 30 чалавек і выкладчык заходзяць на кластар і пачынаюць ім карыстацца. Ці зноў жа ёсць дні перад дэдлайнам, калі загрузка моцна ўзрастае. Ва ўвесь астатні час кластар працуе ў рэжыме недазагрузкі.

Рашэнне №1 - гэта трымаць кластар, які будзе вытрымліваць пікавыя загрузкі, але будзе прастойваць ва ўвесь астатні час.

Рашэнне №2 - гэта трымаць невялікі кластар, у які ўручную дадаваць ноды перад заняткамі і падчас пікавых нагрузак.

Рашэнне №3 - гэта трымаць невялікі кластар і напісаць аўтаскейлер, які будзе сачыць за бягучай загрузкай кластара і сам, выкарыстоўваючы розныя API, дадаваць і выдаляць ноды з кластара.

У гэтым пасце мы будзем казаць аб рашэнні №3. Такі аўтаскейлер моцна залежыць ад вонкавых фактараў, а не ад унутраных, і правайдэры яго часта не падаюць. Мы карыстаемся хмарнай інфраструктурай Mail.ru Cloud Solutions і напісалі аўтаскейлер, выкарыстоўваючы API MCS. А бо мы навучаем працы з дадзенымі, вырашылі паказаць, як вы можаце напісаць падобны аўтаскейлер для сваіх мэт і выкарыстоўваць са сваім воблакам

перадумовы

Па-першае, у вас павінен быць Hadoop-кластар. Мы, напрыклад, карыстаемся дыстрыбутывам HDP.

Каб у вас ноды маглі хутка дадавацца і выдаляцца, у вас павінна быць вызначанае размеркаванне роляў па нодах.

  1. Майстар-нода. Ну тут тлумачыць нічога асабліва не трэба: галоўная нода кластара, на якой запускаецца, напрыклад, драйвер Spark'а, калі вы карыстаецеся інтэрактыўным рэжымам.
  2. Дата-нода. Гэта нода, на якой у вас захоўваюцца дадзеныя на HDFS і на ёй жа адбываюцца вылічэнні.
  3. Вылічальная нода. Гэта нода, на якой у вас нічога не захоўваецца на HDFS, але на ёй адбываюцца вылічэнні.

Важны момант. Аўтаскейлінг будзе адбывацца за кошт нод трэцяга тыпу. Калі вы пачнеце забіраць і дадаваць ноды другога тыпу, то хуткасць рэагавання будзе моцна нізкай - дэкамішэн і рэкамішэн будзе займаць гадзіннік на вашым кластары. Гэта, вядома, не тое, што чакаеш ад аўтаскейлінгу. Гэта значыць ноды першага і другога тыпу мы не чапаем. Яны будуць уяўляць сабой мінімальна жыццяздольны кластар, які будзе існаваць на працягу ўсяго дзеяння праграмы.

Такім чынам, наш аўтаскейлер напісаны на Python 3, выкарыстоўвае Ambari API для кіравання сэрвісамі кластара, выкарыстоўвае API ад Mail.ru Cloud Solutions (MCS) для запуску і прыпынку машын.

Архітэктура рашэння

  1. модуль autoscaler.py. У ім прапісаны тры класы: 1) функцыі для працы з Ambari, 2) функцыі для працы з MCS, 3) функцыі, звязаныя непасрэдна з логікай працы аўтаскейлера.
  2. Скрыпт observer.py. Па сутнасці складаецца з розных правілаў: калі і ў якія моманты выклікаць функцыі аўтаскейлера.
  3. Файл з канфігурацыйнымі параметрамі config.py. Там змяшчаецца, напрыклад, спіс нод, дазволеных для аўтаскейлінгу і іншыя параметры, якія ўплываюць, напрыклад, на тое, колькі часу пачакаць з таго моманту, калі была дададзена новая нода. Там жа знаходзяцца яшчэ і таймстэмпы пачатку заняткаў, каб перад заняткам была запушчана максімальная дазволеная канфігурацыя кластара.

Давайце зараз паглядзім на кавалкі кода, якія знаходзяцца ўнутры першых двух файлаў.

1. Модуль autoscaler.py

Клас Ambari

Так выглядае кавалачак кода, які змяшчае клас Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Вышэй для прыкладу можна паглядзець на рэалізацыю функцыі stop_all_services, якая спыняе ўсе сэрвісы на патрэбнай надзе кластара.

На ўваход класу Ambari вы перадаеце:

  • ambari_url, напрыклад, выгляду 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – назва вашага кластара ў Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • і ўнутры auth ляжыць ваш лагін і пароль ад Ambari: auth = ('login', 'password').

Сама функцыя ўяўляе з сябе не больш за парачку зваротаў праз REST API да Ambari. З пункту гледжання логікі мы спачатку атрымліваем спіс запушчаных сэрвісаў на нодзе, а затым просім на дадзеным кластары, на дадзенай нодзе перавесці сэрвісы з спісу ў стан INSTALLED. Функцыі па запуску ўсё сэрвісаў, па перакладзе нод у стан Maintenance і інш. выглядаюць падобнай выявай – гэта проста некалькі запытаў праз API.

Клас Mcs

Так выглядае кавалачак кода, які змяшчае клас Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

На ўваход класу Mcs мы перадаем id праекта ўнутры аблокі і id карыстальніка, а таксама яго пароль. У функцыі vm_turn_on мы хочам уключыць адну з машын. Логіка тут крыху больш складаная. У пачатку кода ідзе выклік трох іншых функцый: 1) нам трэба атрымаць токен, 2) нам трэба канвертаваць hostname у назву машыны ў MCS, 3) атрымаць id гэтай машыны. Далей мы робім проста post-запыт і запускаем гэтую машыну.

Так выглядае сама функцыя па атрыманні токена:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Клас Autoscaler

У гэтым класе змяшчаюцца функцыі, якія адносяцца да самой логіцы працы.

Так выглядае кавалак кода гэтага класа:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

На ўваход мы прымаем класы Ambari и Mcs, спіс нод, якія дазволеныя для скейлінга, а таксама параметры канфігурацыі нод: памяць і cpu, выдзеленыя на ноду ў YARN. Таксама ёсць 2 унутраных параметры q_ram, q_cpu, якія з'яўляюцца чэргамі. Пры дапамозе іх мы захоўваем значэнні бягучай нагрузкі кластара. Калі мы бачым, што на працягу апошніх 5 хвілін стабільна была падвышаная нагрузка, то мы прымаем рашэнне аб тым, што трэба дадаць 1 ноду ў кластар. Тое ж самае справядліва і для стану недазагрузкі кластара.

У кодзе вышэй прыведзены прыклад функцыі, якая выдаляе машыну з кластара і спыняе яе ў воблаку. Спачатку адбываецца дэкамішэн YARN Nodemanager, далей уключаецца рэжым Maintenance, далей мы спыняем усе сэрвісы на машыне і выключаем віртуальную машыну ў воблаку.

2. Скрыпт observer.py

Прыклад кода адтуль:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

У ім мы правяраем, ці склаліся ўмовы для павелічэння магутнасцяў кластара і ці ёсць наогул у рэзерве машыны, атрымліваем хостнейм адной з іх, дадаем яе ў кластар і публікуем пра гэта паведамленне ў Slack нашай каманды. Пасля чаго запускаецца cooldown_period, калі мы нічога не дадаем і не прыбіраем з кластара, а проста маніторым загрузку. Калі яна стабілізавалася і знаходзіцца ўсярэдзіне калідора аптымальных значэнняў загрузкі, то мы проста працягваем маніторынг. Калі ж адной ноды не хапіла, то дадаем яшчэ адну.

Для выпадкаў калі ў нас наперадзе занятак, мы ўжо ведаем напэўна, што адной ноды не хопіць, таму мы адразу стартуем усе вольныя ноды і трымаем іх актыўнымі да канца занятку. Гэта адбываецца пры дапамозе спісу таймстэмпаў заняткаў.

Заключэнне

Аўтаскейлер - гэта добрае і зручнае рашэнне для тых выпадкаў, калі ў вас назіраецца нераўнамерная загрузка кластара. Вы адначасова дамагаецеся патрэбнай канфігурацыі кластара пад віновыя нагрузкі і пры гэтым не трымаеце гэты кластар падчас недазагрузкі, эканомячы сродкі. Ну і плюс гэта ўсё адбываецца аўтаматызавана без вашага ўдзелу. Сам аўтаскейлер - гэта не больш, чым набор запытаў да API кластар-мэнэджара і API хмарнага правайдэра, прапісаных па вызначанай логіцы. Пра што сапраўды трэба памятаць - гэта аб падзеле нод на 3 тыпу, як мы пісалі раней. І будзе вам шчасце.

Крыніца: habr.com

Дадаць каментар