Кантип кластер үчүн өзүңүздүн автомасштабыңызды жасоо керек

Салам! Биз адамдарды чоң маалыматтар менен иштөөгө үйрөтөбүз. Чоң маалыматтар боюнча билим берүү программасын бардык катышуучулар чогуу иштеген өзүнүн кластерисиз элестетүү мүмкүн эмес. Ушул себептен улам, биздин программа ар дайым бар :) Биз анын конфигурациясы, тюнинги жана башкаруусу менен алектенебиз, ал эми балдар ал жерде MapReduce жумуштарын түз ишке киргизип, Spark колдонушат.

Бул постто биз булуттун жардамы менен өзүбүздүн автоскализаторду жазуу менен кластердин бирдей эмес жүктөө маселесин кантип чечкенибизди айтып беребиз. Mail.ru Cloud Solutions.

маселе

Биздин кластер типтүү режимде колдонулбайт. Жоюу өтө бирдей эмес. Мисалы, практикалык сабактар ​​бар, 30 адам жана бир мугалим кластерге барып, аны колдоно башташат. Же дагы, жүк абдан көбөйгөн мөөнөткө чейин күн бар. Калган убакта кластер аз жүктөө режиминде иштейт.

Чечим №1 - эң чоң жүктөмгө туруштук бере турган, бирок калган убакта бош турган кластерди сактоо.

Чечим №2 - кичинекей кластерди сактоо, ага сиз класстардын алдында жана эң жогорку жүктөмдөрдүн учурунда кол менен түйүндөрдү кошосуз.

Чечим №3 кичинекей кластерди сактап, кластердин учурдагы жүгүн көзөмөлдөй турган автомасштабтарды жазуу жана ар кандай API'лерди колдонуу менен кластерден түйүндөрдү кошуу жана алып салуу.

Бул постто биз №3 чечим жөнүндө сүйлөшөбүз. Бул автошкалагыч ички факторлорго караганда тышкы факторлорго абдан көз каранды жана провайдерлер көбүнчө аны камсыз кылышпайт. Биз Mail.ru Cloud Solutions булут инфраструктурасын колдонобуз жана MCS API аркылуу автоматтык масштабдарды жаздык. Жана биз маалыматтар менен иштөөнү үйрөткөндүктөн, сиз өз максаттарыңыз үчүн ушундай авто масштаб салгычты кантип жазып, аны булутуңуз менен кантип колдонсоңуз болорун көрсөтүүнү чечтик.

Талаптар

Биринчиден, сизде Hadoop кластери болушу керек. Мисалы, биз HDP бөлүштүрүүнү колдонобуз.

Түйүндөрүңүз тез кошулуп жана алынып салынышы үчүн сизде түйүндөр арасында ролдордун белгилүү бир бөлүштүрүлүшү керек.

  1. Мастер түйүн. Ооба, бул жерде түшүндүрүүнүн кереги жок: кластердин негизги түйүнү, мисалы, интерактивдүү режимди колдонсоңуз, Spark драйвери ишке киргизилет.
  2. Дата түйүнү. Бул HDFSде маалыматтарды сактаган түйүн жана эсептөөлөр орун алган жерде.
  3. Эсептөө түйүнү. Бул HDFSде эч нерсени сактабай турган түйүн, бирок эсептөөлөр болгон жерде.

Маанилүү чекит. Автомасштабтоо үчүнчү типтеги түйүндөрдөн улам пайда болот. Эгер сиз экинчи типтеги түйүндөрдү алып жана кошуп баштасаңыз, жооп берүү ылдамдыгы өтө төмөн болот - өчүрүү жана кайра иштетүү кластериңизде бир нече саатты талап кылат. Бул, албетте, автоматтык масштабдан күткөн нерсе эмес. Башкача айтканда, биз биринчи жана экинчи типтеги түйүндөргө тийбейбиз. Алар программанын бүткүл мөөнөтүндө боло турган минималдуу жашоого жөндөмдүү кластерди билдирет.

Ошентип, биздин autoscaler Python 3 менен жазылган, кластердик кызматтарды башкаруу үчүн Ambari API колдонот, колдонот Mail.ru Cloud Solutionsдан API (MCS) машиналарды иштетүү жана токтотуу үчүн.

Чечимдин архитектурасы

  1. Модуль autoscaler.py. Ал үч классты камтыйт: 1) Ambari менен иштөө функциялары, 2) MCS менен иштөө функциялары, 3) автоматтык масштабдардын логикасына түздөн-түз байланыштуу функциялар.
  2. Скрипт observer.py. Негизинен ал ар кандай эрежелерден турат: качан жана кайсы учурда автоматтык масштабдагы функцияларды чакыруу керек.
  3. Тарам билэ config.py. Ал, мисалы, автоматтык масштабдатууга уруксат берилген түйүндөрдүн тизмесин жана башка параметрлерди камтыйт, алар, мисалы, жаңы түйүн кошулган учурдан тартып канча убакыт күтүш керек. Класс башталганга чейин максималдуу уруксат берилген кластер конфигурациясы ишке кириши үчүн сабактардын башталышы үчүн убакыт белгилери да бар.

Эми биринчи эки файлдын ичиндеги код бөлүктөрүн карап көрөлү.

1. Autoscaler.py модулу

Амбари класс

Классты камтыган коддун бир бөлүгү ушундай көрүнөт Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Жогоруда, мисал катары, сиз функциянын аткарылышын карай аласыз stop_all_services, каалаган кластер түйүнүндөгү бардык кызматтарды токтотот.

Класстын кире беришинде Ambari сен өтүп:

  • ambari_url, мисалы, сыяктуу 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – Амбаридеги кластериңиздин аты,
  • headers = {'X-Requested-By': 'ambari'}
  • жана ичинде auth бул жерде Ambari үчүн логин жана парол: auth = ('login', 'password').

Функциянын өзү REST API аркылуу Ambariге бир нече чалуудан башка эч нерсе эмес. Логикалык көз караштан алганда, биз адегенде түйүндө иштеп жаткан кызматтардын тизмесин алабыз, андан кийин берилген кластерде, берилген түйүндөн кызматтарды тизмеден мамлекетке өткөрүп берүүнү суранабыз. INSTALLED. Бардык кызматтарды ишке киргизүү, түйүндөрдү абалга өткөрүү функциялары Maintenance ж.б. окшош көрүнөт - алар API аркылуу бир нече гана өтүнүчтөр.

Class Mcs

Классты камтыган коддун бир бөлүгү ушундай көрүнөт Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Класстын кире беришинде Mcs биз булуттун ичиндеги долбоордун идентификаторун жана колдонуучунун идентификаторун, ошондой эле анын сырсөзүн өткөрүп беребиз. Функцияда vm_turn_on биз машындардын бирин иштеткибиз келет. Бул жерде логика бир аз татаалыраак. Коддун башында үч башка функция деп аталат: 1) биз токен алышыбыз керек, 2) хосттун атын MCSдеги машинанын атына айландырышыбыз керек, 3) бул машинанын идентификаторун алуу. Андан кийин, биз жөн гана билдирүү жөнөтүп, бул машинаны ишке киргизебиз.

Токенди алуу функциясы мындай көрүнөт:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler классы

Бул класс операциялык логикага тиешелүү функцияларды камтыйт.

Бул класс үчүн коддун бир бөлүгү мындай көрүнөт:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Кирүү үчүн класстарды кабыл алабыз. Ambari и Mcs, масштабдоо үчүн уруксат берилген түйүндөрдүн тизмеси, ошондой эле түйүн конфигурациясынын параметрлери: эстутум жана YARNдеги түйүнгө бөлүнгөн CPU. Ошондой эле кезекте турган 2 ички параметр q_ram, q_cpu бар. Аларды колдонуу менен биз учурдагы кластердик жүктүн маанилерин сактайбыз. Акыркы 5 мүнөттүн ичинде жүк тынымсыз көбөйүп жатканын көрсөк, анда кластерге +1 түйүн кошуу керек деп чечебиз. Ошол эле кластердин толук пайдаланылбаган абалына да тиешелүү.

Жогорудагы код машинаны кластерден алып салуучу жана аны булутта токтоткон функциянын мисалы. Алгач эксплуатациядан чыгаруу бар YARN Nodemanager, андан кийин режим күйөт Maintenance, анда биз машинадагы бардык кызматтарды токтотуп, булуттагы виртуалдык машинаны өчүрөбүз.

2. Script observer.py

Ал жерден үлгү код:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Анда биз кластердин кубаттуулугун жогорулатуу үчүн шарттар түзүлгөнбү жана резервде машиналар барбы, жокпу, текшерип, алардын биринин хост атын алып, кластерге кошуп, бул тууралуу биздин команданын Slack сайтында билдирүү жарыялайбыз. Андан кийин башталат cooldown_period, биз кластерден эч нерсе кошуп же алып салбайбыз, бирок жөн гана жүктү көзөмөлдөйбүз. Эгерде ал турукташып, оптималдуу жүктүн коридорунун ичинде болсо, анда биз жөн гана мониторингди улантабыз. Эгерде бир түйүн жетишсиз болсо, анда биз башкасын кошобуз.

Алдыда сабак турган учурларда, биз бир түйүн жетишсиз болорун так билебиз, ошондуктан биз дароо бардык бош түйүндөрдү баштайбыз жана аларды сабактын аягына чейин активдүү кармайбыз. Бул аракет убакыт белгилеринин тизмесин колдонуу менен болот.

жыйынтыктоо

Autoscaler бир калыпта эмес кластер жүктөлгөн учурлар үчүн жакшы жана ыңгайлуу чечим болуп саналат. Сиз бир эле учурда эң жогорку жүктөмдөр үчүн каалаган кластердин конфигурациясына жетесиз жана ошол эле учурда акчаны үнөмдөө менен, аз жүктөлгөн учурда бул кластерди сактабайсыз. Ооба, мунун баары сиздин катышуусуз автоматтык түрдө ишке ашат. Autoscaler өзү белгилүү бир логикага ылайык жазылган кластер менеджери API жана булут провайдеринин API сурамдарынын жыйындысынан башка эч нерсе эмес. Сиз, албетте, эстен чыгарбоо керек, биз мурда жазгандай, түйүндөрдү 3 түргө бөлүү. Ошондо сен бактылуу болосуң.

Source: www.habr.com

Комментарий кошуу