Кластер үшін өзіңіздің автомасштабыңызды қалай жасауға болады

Сәлеметсіз бе! Біз адамдарды үлкен деректермен жұмыс істеуге үйретеміз. Үлкен деректер бойынша білім беру бағдарламасын барлық қатысушылар бірлесіп жұмыс істейтін өз кластерісіз елестету мүмкін емес. Осы себепті, біздің бағдарламада әрқашан бар 🙂 Біз оны конфигурациялаумен, баптаумен және басқарумен айналысамыз, ал жігіттер MapReduce тапсырмаларын сол жерде тікелей іске қосып, Spark пайдаланады.

Бұл постта біз бұлтты пайдаланып өз автомасштабымызды жазу арқылы біркелкі емес кластерді жүктеу мәселесін қалай шешкенімізді айтамыз. Mail.ru бұлтты шешімдері.

проблема

Біздің кластер әдеттегі режимде пайдаланылмайды. Жою өте біркелкі емес. Мысалы, 30 адам мен мұғалімнің барлығы кластерге кіріп, оны қолдана бастағанда практикалық сабақтар бар. Немесе тағы да, жүктеме айтарлықтай өсетін мерзімге дейін күндер бар. Қалған уақытта кластер аз жүктелу режимінде жұмыс істейді.

№1 шешім - ең жоғары жүктемелерге төтеп беретін, бірақ қалған уақытта жұмыссыз болатын кластерді сақтау.

Шешім №2 - сыныптар алдында және ең жоғары жүктеме кезінде түйіндерді қолмен қосатын шағын кластерді сақтау.

№3 шешім - шағын кластерді сақтау және кластердің ағымдағы жүктемесін бақылайтын және әртүрлі API интерфейстерін пайдалана отырып, кластерден түйіндерді қосу және жою автоматты масштабтау құралын жазу.

Бұл постта біз №3 шешім туралы айтатын боламыз. Бұл автомасштабтауыш ішкі факторларға емес, сыртқы факторларға өте тәуелді және провайдерлер оны жиі қамтамасыз етпейді. Біз Mail.ru Cloud Solutions бұлттық инфрақұрылымын қолданамыз және MCS API арқылы автомасштабты жаздық. Біз деректермен жұмыс істеуді үйрететіндіктен, ұқсас автомасштабты өз мақсаттарыңыз үшін қалай жазуға және оны бұлтпен пайдалануға болатынын көрсетуді шештік.

Пререквизиттер

Біріншіден, сізде Hadoop кластері болуы керек. Мысалы, біз HDP дистрибуциясын қолданамыз.

Түйіндеріңізді жылдам қосу және жою үшін түйіндер арасында рөлдердің белгілі бір бөлінуі болуы керек.

  1. Негізгі түйін. Ештеңені түсіндірудің қажеті жоқ: кластердің негізгі түйіні, мысалы, интерактивті режимді пайдалансаңыз, Spark драйвері іске қосылады.
  2. Күн түйіні. Бұл HDFS жүйесінде деректерді сақтайтын және есептеулер орындалатын түйін.
  3. Есептеу түйіні. Бұл HDFS жүйесінде ештеңе сақтамайтын, бірақ есептеулер орындалатын түйін.

Маңызды нүкте. Автомасштабтау үшінші түрдегі түйіндерге байланысты орын алады. Екінші типті түйіндерді қабылдауды және қосуды бастасаңыз, жауап беру жылдамдығы өте төмен болады - жою және қайта қосу кластеріңізде бірнеше сағатты алады. Бұл, әрине, автоматты масштабтаудан күткен нәрсе емес. Яғни, біз бірінші және екінші типтегі түйіндерге қол тигізбейміз. Олар бағдарламаның бүкіл ұзақтығында болатын ең аз өміршең кластерді білдіреді.

Сонымен, біздің авто масштабтауышымыз Python 3-те жазылған, кластерлік қызметтерді басқару үшін Ambari API пайдаланады, пайдаланады Mail.ru Cloud Solutions ұсынған API (MCS) машиналарды қосуға және тоқтатуға арналған.

Шешім архитектурасы

  1. Модуль autoscaler.py. Ол үш класты қамтиды: 1) Ambari-мен жұмыс істеу функциялары, 2) MCS-пен жұмыс істеу функциялары, 3) автомасштабтаушының логикасына тікелей қатысты функциялар.
  2. Сценарий observer.py. Негізінде ол әр түрлі ережелерден тұрады: қашан және қандай сәтте автоматты масштабтау функцияларын шақыру керек.
  3. Конфигурация файлы config.py. Ол, мысалы, автомасштабтауға рұқсат етілген түйіндер тізімін және мысалы, жаңа түйін қосылған сәттен бастап қанша уақыт күтуге әсер ететін басқа параметрлерді қамтиды. Сондай-ақ сабақтардың басталуына арналған уақыт белгілері бар, осылайша сынып алдында максималды рұқсат етілген кластер конфигурациясы іске қосылады.

Енді алғашқы екі файлдың ішіндегі код бөліктерін қарастырайық.

1. Autoscaler.py модулі

Амбари сыныбы

Классты қамтитын код бөлігі осылай көрінеді Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Жоғарыда, мысал ретінде, функцияның орындалуын қарауға болады stop_all_services, ол қалаған кластер түйініндегі барлық қызметтерді тоқтатады.

Сыныпқа кіре берісте Ambari сіз өтесіз:

  • ambari_url, мысалы, сияқты 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – Амбаридегі кластеріңіздің атауы,
  • headers = {'X-Requested-By': 'ambari'}
  • және ішінде auth мұнда Ambari үшін логин мен пароль: auth = ('login', 'password').

Функцияның өзі REST API арқылы Ambari-ге бірнеше қоңыраулардан басқа ештеңе емес. Логикалық тұрғыдан біз алдымен түйінде жұмыс істейтін қызметтердің тізімін аламыз, содан кейін берілген кластерде, берілген түйінде қызметтерді тізімнен күйге ауыстыруды сұраймыз. INSTALLED. Барлық қызметтерді іске қосу, түйіндерді күйге ауыстыру функциялары Maintenance т.б. ұқсас көрінеді - олар API арқылы жасалған бірнеше сұраулар ғана.

Сынып Mcs

Классты қамтитын код бөлігі осылай көрінеді Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Сыныпқа кіре берісте Mcs біз жоба идентификаторын бұлт пен пайдаланушы идентификаторын, сондай-ақ оның құпия сөзін жібереміз. Функцияда vm_turn_on біз машиналардың бірін қосқымыз келеді. Мұнда логика сәл күрделірек. Кодтың басында тағы үш функция шақырылады: 1) бізге маркер алу керек, 2) хост атын MCS жүйесіндегі машинаның атына түрлендіру керек, 3) осы машинаның идентификаторын алу. Содан кейін біз жай ғана пост сұрауын жасаймыз және осы құрылғыны іске қосамыз.

Токенді алу функциясы келесідей:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Автомасштабтау класы

Бұл класс операциялық логиканың өзіне қатысты функцияларды қамтиды.

Бұл сыныпқа арналған код бөлігі келесідей:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Кіру үшін сабақтарды қабылдаймыз. Ambari и Mcs, масштабтауға рұқсат етілген түйіндердің тізімі, сондай-ақ түйін конфигурациясының параметрлері: YARN жүйесіндегі түйінге бөлінген жад және cpu. Сондай-ақ кезек болып табылатын q_ram, q_cpu 2 ішкі параметрі бар. Оларды пайдалана отырып, біз ағымдағы кластерлік жүктеменің мәндерін сақтаймыз. Соңғы 5 минут ішінде жүктеменің тұрақты түрде артқанын көрсек, кластерге +1 түйінді қосу керек деп шешеміз. Дәл осындай жағдай кластерді толық пайдаланбау күйіне де қатысты.

Жоғарыдағы код машинаны кластерден алып тастайтын және оны бұлтта тоқтататын функцияның мысалы болып табылады. Алдымен пайдаланудан шығару бар YARN Nodemanager, содан кейін режим қосылады Maintenance, содан кейін біз машинадағы барлық қызметтерді тоқтатамыз және бұлттағы виртуалды машинаны өшіреміз.

2. observer.py сценарийі

Сол жерден үлгі код:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Онда біз кластердің сыйымдылығын арттыру үшін жағдай жасалғанын және резервте машиналар бар-жоғын тексереміз, олардың біреуінің хост атын аламыз, оны кластерге қосамыз және бұл туралы біздің команданың Slack-те хабарлама жариялаймыз. Осыдан кейін ол басталады cooldown_period, біз кластерден ештеңе қоспаймыз немесе алып тастамаймыз, тек жүктемені бақылаймыз. Егер ол тұрақтанса және оңтайлы жүктеме мәндерінің дәлізінде болса, біз жай ғана бақылауды жалғастырамыз. Егер бір түйін жеткіліксіз болса, біз басқасын қосамыз.

Алда сабақ күтіп тұрған жағдайда, біз бір түйіннің жеткіліксіз болатындығын анық білеміз, сондықтан біз барлық бос түйіндерді дереу іске қосып, сабақтың соңына дейін белсенді ұстаймыз. Бұл әрекет уақыт белгілерінің тізімін пайдалану арқылы орын алады.

қорытынды

Автомасштабтауыш кластер біркелкі емес жүктелетін жағдайлар үшін жақсы және ыңғайлы шешім болып табылады. Сіз бір уақытта ең жоғары жүктемелер үшін қалаған кластер конфигурациясына қол жеткізесіз және сонымен бірге ақшаны үнемдей отырып, аз жүктелу кезінде бұл кластерді сақтамайсыз. Сонымен қатар, мұның бәрі сіздің қатысуыңызсыз автоматты түрде болады. Автомасштабтың өзі белгілі бір логикаға сәйкес жазылған кластер менеджері API және бұлттық провайдер API сұрауларының жиынтығынан басқа ештеңе емес. Сізге міндетті түрде есте сақтау керек нәрсе - біз бұрын жазғанымыздай, түйіндерді 3 түрге бөлу. Ал сен бақытты боласың.

Ақпарат көзі: www.habr.com

пікір қалдыру