Како да направите сопствени аутоматски скалер за кластер

Здраво! Обучавамо људе да раде са великим подацима. Немогуће је замислити образовни програм о великим подацима без сопственог кластера, на коме сви учесници раде заједно. Из тог разлога, наш програм га увек има 🙂 Бавимо се његовом конфигурацијом, подешавањем и администрацијом, а момци тамо директно покрећу МапРедуце послове и користе Спарк.

У овом посту ћемо вам рећи како смо решили проблем неравномерног учитавања кластера писањем сопственог аутоскалера користећи облак Маил.ру Цлоуд Солутионс.

проблем

Наш кластер се не користи у типичном режиму. Одлагање је веома неравномерно. На пример, постоји практична настава, када свих 30 људи и наставник оду у кластер и почну да га користе. Или опет, постоје дани пре рока када се оптерећење увелико повећава. Остатак времена кластер ради у режиму подоптерећења.

Решење број 1 је да задржите кластер који ће издржати вршна оптерећења, али ће остати неактиван остатак времена.

Решење #2 је да задржите мали кластер, коме ручно додајете чворове пре наставе и током вршног оптерећења.

Решење #3 је да задржите мали кластер и напишете аутоматски скалер који ће пратити тренутно оптерећење кластера и, користећи различите АПИ-је, додавати и уклањати чворове из кластера.

У овом посту ћемо говорити о решењу #3. Овај аутоматски скалер у великој мери зависи од спољних фактора, а не од интерних, а провајдери га често не обезбеђују. Користимо инфраструктуру облака Маил.ру Цлоуд Солутионс и написали смо аутоматски скалер користећи МЦС АПИ. И пошто учимо како да радимо са подацима, одлучили смо да покажемо како можете да напишете сличан аутоматски скалер за сопствене потребе и да га користите са својим облаком

Предуслови

Прво, морате имати Хадооп кластер. На пример, користимо ХДП дистрибуцију.

Да би се ваши чворови брзо додавали и уклањали, морате имати одређену расподелу улога међу чворовима.

  1. Главни чвор. Па, нема потребе посебно објашњавати: главни чвор кластера, на коме се, на пример, покреће Спарк драјвер, ако користите интерактивни режим.
  2. Датум чвор. Ово је чвор на коме складиштите податке на ХДФС и где се врше прорачуни.
  3. Рачунарски чвор. Ово је чвор где не складиштите ништа на ХДФС, али где се калкулације дешавају.

Важна тачка. Аутоматско скалирање ће се десити због чворова трећег типа. Ако почнете да узимате и додајете чворове другог типа, брзина одговора ће бити веома мала - повлачење и поновно покретање ће трајати сатима на вашем кластеру. Ово, наравно, није оно што очекујете од аутоматског скалирања. То јест, не додирујемо чворове првог и другог типа. Они ће представљати минимални одрживи кластер који ће постојати током трајања програма.

Дакле, наш аутосцалер је написан у Питхон 3, користи Амбари АПИ за управљање услугама кластера, користи АПИ из Маил.ру Цлоуд Солутионс (МЦС) за покретање и заустављање машина.

Арһитектура решења

  1. Модул autoscaler.py. Садржи три класе: 1) функције за рад са Амбари, 2) функције за рад са МЦС, 3) функције које се директно односе на логику аутоскалера.
  2. Скрипта observer.py. У суштини се састоји од различитих правила: када и у којим тренуцима позвати функције аутоскалера.
  3. Конфигурациони фајл config.py. Садржи, на пример, листу чворова дозвољених за аутоматско скалирање и друге параметре који утичу, на пример, на то колико дуго треба чекати од тренутка када је додат нови чвор. Постоје и временске ознаке за почетак наставе, тако да се пре часа покреће максимално дозвољена конфигурација кластера.

Погледајмо сада делове кода унутар прве две датотеке.

1. Аутосцалер.пи модул

Амбари класа

Овако изгледа део кода који садржи класу Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Изнад, као пример, можете погледати имплементацију функције stop_all_services, који зауставља све услуге на жељеном чвору кластера.

На улазу у разред Ambari пролазиш:

  • ambari_url, на пример, као 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – назив вашег кластера у Амбарију,
  • headers = {'X-Requested-By': 'ambari'}
  • и унутра auth ево ваше пријаве и лозинке за Амбари: auth = ('login', 'password').

Сама функција није ништа више од неколико позива преко РЕСТ АПИ-ја за Амбари. Са логичне тачке гледишта, прво добијамо листу покренутих услуга на чвору, а затим тражимо од датог кластера, на датом чвору, да пренесемо услуге са листе у стање INSTALLED. Функције за покретање свих сервиса, за пребацивање чворова у стање Maintenance итд. изгледају слично - то је само неколико захтева преко АПИ-ја.

Цласс Мцс

Овако изгледа део кода који садржи класу Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

На улазу у разред Mcs прослеђујемо ИД пројекта унутар облака и ИД корисника, као и његову лозинку. У функцији vm_turn_on желимо да укључимо једну од машина. Логика је овде мало компликованија. На почетку кода се позивају још три функције: 1) треба да добијемо токен, 2) треба да конвертујемо име хоста у име машине у МЦС-у, 3) добијемо ид ове машине. Затим једноставно постављамо захтев за објављивање и покрећемо ову машину.

Овако изгледа функција за добијање токена:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Аутосцалер цласс

Ова класа садржи функције које се односе на саму оперативну логику.

Овако изгледа део кода за ову класу:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Прихватамо часове за упис. Ambari и Mcs, списак чворова који су дозвољени за скалирање, као и конфигурациони параметри чвора: меморија и ЦПУ додељени чвору у ИАРН. Постоје и 2 интерна параметра к_рам, к_цпу, који су редови. Користећи их, чувамо вредности тренутног оптерећења кластера. Ако видимо да је током последњих 5 минута дошло до константног повећања оптерећења, онда одлучујемо да треба да додамо +1 чвор у кластер. Исто важи и за стање неискоришћености кластера.

Код изнад је пример функције која уклања машину из кластера и зауставља је у облаку. Прво је декомисија YARN Nodemanager, затим се режим укључује Maintenance, затим заустављамо све услуге на машини и искључујемо виртуелну машину у облаку.

2. Сцрипт обсервер.пи

Пример кода одатле:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

У њему проверавамо да ли су створени услови за повећање капацитета кластера и да ли има машина у резерви, добијамо хостнаме једне од њих, додајемо је у кластер и објављујемо поруку о томе на Слацк-у нашег тима. Након чега почиње cooldown_period, када ништа не додајемо и не уклањамо из кластера, већ једноставно пратимо оптерећење. Ако се стабилизовао и налази се у коридору оптималних вредности оптерећења, онда једноставно настављамо са праћењем. Ако један чвор није био довољан, додајемо још један.

За случајеве када је пред нама лекција, већ сигурно знамо да један чвор неће бити довољан, па одмах покрећемо све слободне чворове и држимо их активним до краја лекције. Ово се дешава помоћу листе временских ознака активности.

Закључак

Аутосцалер је добро и згодно решење за оне случајеве када имате неравномерно учитавање кластера. Истовремено постижете жељену конфигурацију кластера за вршна оптерећења и у исто време не задржавате овај кластер током подоптерећења, штедећи новац. Па, плус ово се све дешава аутоматски без вашег учешћа. Сам аутоскалер није ништа друго до скуп захтева ка АПИ-ју менаџера кластера и АПИ-ју добављача облака, написаних према одређеној логици. Оно што свакако треба да запамтите је подела чворова на 3 типа, као што смо раније писали. И бићете срећни.

Извор: ввв.хабр.цом

Додај коментар