Како да направите ваш сопствен автоскалирач за кластер

Здраво! Ги обучуваме луѓето да работат со големи податоци. Невозможно е да се замисли образовна програма за големи податоци без свој кластер, на кој сите учесници работат заедно. Поради оваа причина, нашата програма секогаш ја има 🙂 Ние сме ангажирани во нејзината конфигурација, подесување и администрација, а момците директно ги лансираат работните места на MapReduce таму и користат Spark.

Во оваа објава ќе ви кажеме како го решивме проблемот со нерамномерно вчитување на кластерот со пишување на сопствен автоскалирач користејќи го облакот Mail.ru Cloud Solutions.

проблем

Нашиот кластер не се користи во типичен режим. Отстранувањето е многу нерамномерно. На пример, има практични часови, кога сите 30 луѓе и еден наставник одат во кластерот и почнуваат да го користат. Или повторно, има денови пред крајниот рок кога товарот значително се зголемува. Остатокот од времето кластерот работи во режим на недоволно оптоварување.

Решението бр. 1 е да се задржи кластер што ќе издржи врвни оптоварувања, но ќе биде неактивен остатокот од времето.

Решението бр. 2 е да се задржи мал кластер, на кој рачно додавате јазли пред часовите и за време на врвните оптоварувања.

Решението бр. 3 е да се задржи мал кластер и да се напише автоскалирач кој ќе го следи тековното оптоварување на кластерот и, користејќи различни API, ќе додава и отстранува јазли од кластерот.

Во овој пост ќе зборуваме за решението бр. 3. Овој автоскалилер е многу зависен од надворешни фактори, а не од внатрешни, и често давателите на услуги не го обезбедуваат. Ја користиме инфраструктурата на облакот на Mail.ru Cloud Solutions и напишавме автоскалер користејќи го MCS API. И бидејќи учиме како да работиме со податоци, решивме да покажеме како можете да напишете сличен автоскалер за ваши цели и да го користите со вашиот облак

Предуслови

Прво, мора да имате Hadoop кластер. На пример, ја користиме дистрибуцијата на HDP.

Со цел вашите јазли да бидат брзо додадени и отстранети, мора да имате одредена дистрибуција на улоги меѓу јазлите.

  1. Главен јазол. Па, нема потреба да се објаснува ништо посебно: главниот јазол на кластерот, на кој, на пример, се активира двигателот Spark, ако го користите интерактивниот режим.
  2. Датумски јазол. Ова е јазолот на кој складирате податоци на HDFS и каде што се вршат пресметките.
  3. Компјутерски јазол. Ова е јазол каде што не складирате ништо на HDFS, туку каде што се случуваат пресметките.

Важна точка. Автоматското скалирање ќе се случи поради јазли од третиот тип. Ако почнете да земате и додавате јазли од вториот тип, брзината на одговор ќе биде многу мала - деактивирањето и повторното вклучување ќе траат со часови на вашиот кластер. Ова, се разбира, не е она што го очекувате од автоматското скалирање. Тоа е, ние не ги допираме јазлите од првиот и вториот тип. Тие ќе претставуваат минимален остварлив кластер што ќе постои во текот на целото времетраење на програмата.

Значи, нашиот автоскалирач е напишан во Python 3, го користи Ambari API за управување со кластер услуги, користи API од Mail.ru Cloud Solutions (MCS) за палење и запирање машини.

Архитектура на решенија

  1. Модул autoscaler.py. Содржи три класи: 1) функции за работа со Ambari, 2) функции за работа со MCS, 3) функции поврзани директно со логиката на автоскалерот.
  2. Скрипта observer.py. Во суштина, тој се состои од различни правила: кога и во кои моменти да се повикаат функциите на автоскалерот.
  3. Конфигурациска датотека config.py. Содржи, на пример, листа на јазли дозволени за автоматско скалирање и други параметри кои влијаат, на пример, на тоа колку долго да се чека од моментот кога е додаден нов јазол. Исто така, постојат временски ознаки за почеток на часовите, така што пред класата се активира максималната дозволена конфигурација на кластерот.

Ајде сега да ги погледнеме парчињата код во првите две датотеки.

1. Autoscaler.py модул

Час Амбари

Вака изгледа парче код што содржи класа Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Погоре, како пример, можете да ја погледнете имплементацијата на функцијата stop_all_services, што ги запира сите услуги на саканиот јазол на кластерот.

На влезот во класот Ambari поминуваш:

  • ambari_url, на пример, како 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – името на вашиот кластер во Амбари,
  • headers = {'X-Requested-By': 'ambari'}
  • и внатре auth еве го вашето најавување и лозинка за Ambari: auth = ('login', 'password').

Самата функција не е ништо повеќе од неколку повици преку REST API до Ambari. Од логичка гледна точка, прво добиваме листа на активни услуги на јазол, а потоа бараме на даден кластер, на даден јазол, да се префрлат услугите од списокот во состојбата INSTALLED. Функции за стартување на сите услуги, за пренос на јазли во состојба Maintenance итн изгледаат слично - тие се само неколку барања преку API.

Класа Mcs

Вака изгледа парче код што содржи класа Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

На влезот во класот Mcs го пренесуваме проектниот проект во облакот и корисничкиот ID, како и неговата лозинка. Во функција vm_turn_on сакаме да вклучиме една од машините. Логиката овде е малку покомплицирана. На почетокот на кодот се нарекуваат три други функции: 1) треба да добиеме токен, 2) треба да го конвертираме името на домаќинот во име на машината во MCS, 3) да го добиеме ID на оваа машина. Следно, ние едноставно правиме барање за објавување и ја стартуваме оваа машина.

Вака изгледа функцијата за добивање токен:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Класа на Autoscaler

Оваа класа содржи функции поврзани со самата оперативна логика.

Вака изгледа дел од кодот за оваа класа:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Ние прифаќаме часови за влез. Ambari и Mcs, список на јазли што се дозволени за скалирање, како и параметри за конфигурација на јазли: меморија и процесор доделени на јазолот во YARN. Има и 2 внатрешни параметри q_ram, q_cpu, кои се редици. Користејќи ги, ги складираме вредностите на тековното оптоварување на кластерот. Ако видиме дека во последните 5 минути има постојано зголемено оптоварување, тогаш одлучивме дека треба да додадеме +1 јазол во кластерот. Истото важи и за состојбата на недоволно искористување на кластерот.

Кодот погоре е пример за функција која отстранува машина од кластерот и ја запира во облакот. Прво има деактивирање YARN Nodemanager, потоа режимот се вклучува Maintenance, потоа ги запираме сите услуги на машината и ја исклучуваме виртуелната машина во облакот.

2. Набљудувач на скрипта.py

Примерок код од таму:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Во него, проверуваме дали се создадени услови за зголемување на капацитетот на кластерот и дали има машини во резерва, го добиваме името на домаќинот на една од нив, го додаваме во кластерот и објавуваме порака за тоа на Slack на нашиот тим. По што започнува cooldown_period, кога не додаваме или отстрануваме ништо од кластерот, туку едноставно го следиме оптоварувањето. Доколку се стабилизира и е во коридорот на оптималните вредности на оптоварување, тогаш едноставно продолжуваме со следењето. Ако еден јазол не беше доволен, тогаш додаваме уште еден.

За случаите кога ни претстои лекција, веќе со сигурност знаеме дека еден јазол нема да биде доволен, па веднаш ги стартуваме сите бесплатни јазли и ги одржуваме активни до крајот на часот. Ова се случува со помош на список на временски ознаки за активност.

Заклучок

Autoscaler е добро и практично решение за оние случаи кога доживувате нерамномерно вчитување на кластерот. Истовремено ја постигнувате саканата конфигурација на кластерот за врвни оптоварувања и во исто време не го задржувате овој кластер за време на недоволно оптоварување, заштедувајќи пари. Па, плус сето ова се случува автоматски без ваше учество. Самиот автоскалер не е ништо повеќе од збир на барања до API на менаџерот на кластерот и API на давателот на облак, напишани според одредена логика. Она што дефинитивно треба да го запомните е поделбата на јазлите на 3 типа, како што напишавме претходно. И ќе бидете среќни.

Извор: www.habr.com

Додадете коментар