Kif tagħmel l-autoscaler tiegħek għal cluster

Bongu! Aħna nħarrġu lin-nies biex jaħdmu bil-big data. Huwa impossibbli li wieħed jimmaġina programm edukattiv dwar il-big data mingħajr il-cluster tiegħu stess, li fih il-parteċipanti kollha jaħdmu flimkien. Għal din ir-raġuni, il-programm tagħna dejjem għandu 🙂 Aħna impenjati fil-konfigurazzjoni, l-irfinar u l-amministrazzjoni tiegħu, u l-ġuvini iniedu direttament l-impjiegi MapReduce hemmhekk u jużaw Spark.

F'din il-kariga se ngħidulek kif solvejna l-problema tat-tagħbija irregolari tal-clusters billi niktbu l-autoscaler tagħna stess bl-użu tas-sħab Mail.ru Soluzzjonijiet Cloud.

problema

Il-cluster tagħna ma jintużax f'mod tipiku. Ir-rimi huwa irregolari ħafna. Pereżempju, hemm klassijiet prattiċi, meta t-30 persuna kollha u għalliem imorru fil-cluster u jibdew jużawh. Jew għal darb'oħra, hemm ġranet qabel l-iskadenza meta t-tagħbija tiżdied ħafna. Il-bqija tal-ħin il-cluster jopera fil-modalità taħt tagħbija.

Is-soluzzjoni #1 hija li żżomm raggruppament li jiflaħ l-ogħla tagħbijiet, iżda se jkun idle l-bqija tal-ħin.

Is-soluzzjoni #2 hija li żżomm cluster żgħir, li miegħu żżid manwalment nodi qabel il-klassijiet u waqt l-ogħla tagħbija.

Is-soluzzjoni #3 hija li żżomm cluster żgħir u tikteb autoscaler li se jimmonitorja t-tagħbija attwali tal-cluster u, bl-użu ta 'diversi APIs, iżid u neħħi nodi mill-cluster.

F'din il-post se nitkellmu dwar is-soluzzjoni #3. Dan l-autoscaler huwa dipendenti ħafna fuq fatturi esterni aktar milli dawk interni, u l-fornituri ħafna drabi ma jipprovduhx. Aħna nużaw l-infrastruttura tal-cloud Mail.ru Cloud Solutions u ktibna autoscaler bl-użu tal-MCS API. U peress li ngħallmu kif taħdem bid-dejta, iddeċidejna li nuru kif tista’ tikteb autoscaler simili għall-iskopijiet tiegħek u tużah mal-cloud tiegħek

Prerekwiżiti

L-ewwel, irid ikollok cluster Hadoop. Pereżempju, nużaw id-distribuzzjoni HDP.

Sabiex in-nodi tiegħek jiġu miżjuda u mneħħija malajr, irid ikollok ċerta distribuzzjoni ta 'rwoli fost in-nodi.

  1. Nodu prinċipali. Ukoll, m'hemmx għalfejn tispjega xi ħaġa b'mod partikolari: in-nodu ewlieni tal-cluster, li fuqu, pereżempju, jitnieda s-sewwieq Spark, jekk tuża l-mod interattiv.
  2. Data node. Dan huwa n-nodu li fih taħżen id-dejta fuq HDFS u fejn isiru l-kalkoli.
  3. Nodu tal-kompjuter. Dan huwa node fejn ma taħżen xejn fuq HDFS, iżda fejn iseħħu l-kalkoli.

Важный момент. Автоскейлинг будет происходить за счет нод третьего типа. Если вы начнете забирать и добавлять ноды второго типа, то скорость реагирования будет сильно низкой – декомишен и рекомишен будет занимать часы на вашем кластере. Это, конечно, не то, что ожидаешь от автоскейлинга. То есть ноды первого и второго типа мы не трогаем. Они будут представлять собой минимально жизнеспособный кластер, который будет существовать на протяжении всего действия программы.

Allura, l-autoscaler tagħna huwa miktub f'Python 3, juża l-API Ambari biex jimmaniġġja s-servizzi tal-clusters, juża API minn Mail.ru Cloud Solutions (MCS) għall-istartjar u l-waqfien tal-magni.

Arkitettura tas-soluzzjoni

  1. Modulu autoscaler.py. Fiha tliet klassijiet: 1) funzjonijiet biex taħdem ma' Ambari, 2) funzjonijiet biex taħdem ma' MCS, 3) funzjonijiet relatati direttament mal-loġika tal-autoscaler.
  2. Iskript observer.py. Essenzjalment tikkonsisti f'regoli differenti: meta u f'liema mumenti tissejjaħ il-funzjonijiet tal-autoscaler.
  3. Fajl ta' konfigurazzjoni config.py. Fih, pereżempju, lista ta’ nodi permessi għall-autoscaling u parametri oħra li jaffettwaw, pereżempju, kemm tistenna mill-mument li ġie miżjud nodu ġdid. Hemm ukoll timestamps għall-bidu tal-klassijiet, sabiex qabel il-klassi titnieda l-konfigurazzjoni massima permessa tal-cluster.

Ejja issa nħarsu lejn il-biċċiet tal-kodiċi ġewwa l-ewwel żewġ fajls.

1. Modulu Autoscaler.py

Klassi Ambari

Hekk tidher biċċa kodiċi li fiha klassi Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Hawn fuq, bħala eżempju, tista 'tħares lejn l-implimentazzjoni tal-funzjoni stop_all_services, li jwaqqaf is-servizzi kollha fuq in-nodu tal-cluster mixtieq.

Fid-daħla tal-klassi Ambari int tgħaddi:

  • ambari_url, per eżempju, simili 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – l-isem tal-cluster tiegħek f'Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • u ġewwa auth hawn huma l-login u l-password tiegħek għal Ambari: auth = ('login', 'password').

Il-funzjoni nnifisha hija xejn aktar minn ftit sejħiet permezz tal-API REST lil Ambari. Mil-lat loġiku, l-ewwel nirċievu lista ta 'servizzi li qed jaħdmu fuq node, u mbagħad nistaqsu fuq cluster partikolari, fuq nodu partikolari, biex tittrasferixxi servizzi mil-lista għall-istat INSTALLED. Funzjonijiet għat-tnedija tas-servizzi kollha, għat-trasferiment ta 'nodi għall-istat Maintenance eċċ jidhru simili - huma biss ftit talbiet permezz tal-API.

Klassi Mcs

Hekk tidher biċċa kodiċi li fiha klassi Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Fid-daħla tal-klassi Mcs ngħaddu l-id tal-proġett ġewwa l-sħaba u l-id tal-utent, kif ukoll il-password tiegħu. Fil-funzjoni vm_turn_on irridu nixgħel waħda mill-magni. Il-loġika hawnhekk hija ftit aktar ikkumplikata. Fil-bidu tal-kodiċi, jissejħu tliet funzjonijiet oħra: 1) għandna bżonn niksbu token, 2) irridu naqilbu l-hostname fl-isem tal-magna f'MCS, 3) nikseb l-id ta 'din il-magna. Sussegwentement, aħna sempliċement nagħmlu talba tal-post u nniedu din il-magna.

Hekk tidher il-funzjoni biex jinkiseb token:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Класс Autoscaler

Din il-klassi fiha funzjonijiet relatati mal-loġika operattiva nnifisha.

Hekk tidher biċċa kodiċi għal din il-klassi:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Aħna naċċettaw klassijiet għad-dħul. Ambari и Mcs, lista ta 'nodi li huma permessi għall-iskala, kif ukoll parametri ta' konfigurazzjoni tan-nodi: memorja u cpu allokati lin-nodu f'ĦJUT. Hemm ukoll 2 parametri interni q_ram, q_cpu, li huma kjuwijiet. Bl-użu tagħhom, aħna naħżnu l-valuri tat-tagħbija attwali tal-cluster. Jekk naraw li matul l-aħħar 5 minuti kien hemm tagħbija miżjuda b'mod konsistenti, allura niddeċiedu li għandna nżidu +1 node mal-cluster. L-istess jgħodd għall-istat ta' sottoutilizzazzjoni tal-cluster.

Il-kodiċi ta 'hawn fuq huwa eżempju ta' funzjoni li tneħħi magna mill-cluster u twaqqafha fis-sħaba. L-ewwel hemm dekummissjonar YARN Nodemanager, imbagħad il-mod jinxtegħel Maintenance, imbagħad nieqfu s-servizzi kollha fuq il-magna u itfi l-magna virtwali fis-sħaba.

2. Script observer.py

Kodiċi tal-kampjun minn hemm:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Fiha, aħna niċċekkjaw jekk inħolqux kundizzjonijiet biex tiżdied il-kapaċità tal-cluster u jekk hemmx xi magni fir-riżerva, nikseb l-isem tal-host ta 'wieħed minnhom, żidha mal-cluster u tippubblika messaġġ dwaru fuq Slack tat-tim tagħna. Wara dan jibda cooldown_period, когда мы ничего не добавляем и не убираем из кластера, а просто мониторим загрузку. Если она стабилизировалась и находится внутри коридора оптимальных значений загрузки, то мы просто продолжаем мониторинг. Если же одной ноды не хватило, то добавляем еще одну.

Għal każijiet meta jkollna lezzjoni quddiemna, diġà nafu żgur li nodu wieħed mhux se jkun biżżejjed, għalhekk nibdew immedjatament in-nodi ħielsa kollha u nżommuhom attivi sa tmiem il-lezzjoni. Dan jiġri bl-użu ta' lista ta' timestamps ta' attività.

Konklużjoni

Автоскейлер – это хорошее и удобное решение для тех случаев, когда у вас наблюдается неравномерная загрузка кластера. Вы одновременно добиваетесь нужной конфигурации кластера под пиковые нагрузки и при этом не держите этот кластер во время недозагрузки, экономя средства. Ну и плюс это все происходит автоматизированно без вашего участия. Сам автоскейлер – это не более, чем набор запросов к API кластер-менеджера и API облачного провайдера, прописанных по определенной логике. О чем точно нужно помнить – это о разделении нод на 3 типа, как мы писали ранее. И будет вам счастье.

Sors: www.habr.com

Żid kumment