Hoe meitsje jo eigen autoscaler foar in kluster

Hallo! Wy traine minsken om mei grutte data te wurkjen. It is ûnmooglik om in edukatyf programma oer big data foar te stellen sûnder in eigen kluster, dêr't alle dielnimmers oan gearwurkje. Om dizze reden, ús programma altyd hat it 🙂 Wy binne dwaande mei syn konfiguraasje, tuning en administraasje, en de jongens direkt launch MapReduce banen dêr en brûk Spark.

Yn dizze post sille wy jo fertelle hoe't wy it probleem fan unjildich klusterladen hawwe oplost troch ús eigen autoscaler te skriuwen mei de wolk Mail.ru Cloud Solutions.

probleem

Us kluster wurdt net brûkt yn in typyske modus. De ôffier is tige unjildich. Der binne bygelyks praktyske lessen, as alle 30 minsken en in learaar nei it kluster geane en begjinne te brûken. Of nochris, d'r binne dagen foar de deadline wêryn't de lading sterk tanimt. De rest fan 'e tiid wurket it kluster yn underloadmodus.

Oplossing #1 is om in kluster te hâlden dy't pykladen wjerstean sil, mar de rest fan 'e tiid idle sil wêze.

Oplossing #2 is om in lyts kluster te hâlden, wêryn jo knooppunten manuell tafoegje foar klassen en tidens pykladen.

Oplossing #3 is om in lyts kluster te hâlden en in autoscaler te skriuwen dy't de aktuele lading fan it kluster kontrolearje sil en, mei help fan ferskate API's, knopen tafoegje en fuortsmite fan it kluster.

Yn dizze post sille wy prate oer oplossing #3. Dizze autoscaler is heul ôfhinklik fan eksterne faktoaren ynstee fan ynterne, en providers jouwe it faaks net. Wy brûke de wolkynfrastruktuer fan Mail.ru Cloud Solutions en skreau in autoscaler mei de MCS API. En om't wy leare hoe't jo kinne wurkje mei gegevens, hawwe wy besletten om sjen te litten hoe't jo in ferlykbere autoscaler kinne skriuwe foar jo eigen doelen en it brûke mei jo wolk

dy talittingseasken pleatst

Earst moatte jo in Hadoop-kluster hawwe. Wy brûke bygelyks de HDP-distribúsje.

Om jo knopen fluch te tafoege en te ferwiderjen, moatte jo in bepaalde ferdieling fan rollen hawwe tusken de knopen.

  1. Master node. No, d'r is net nedich om wat spesjaal te ferklearjen: it haadknooppunt fan it kluster, wêrop bygelyks de Spark-bestjoerder wurdt lansearre, as jo de ynteraktive modus brûke.
  2. Datum node. Dit is it knooppunt wêrop jo gegevens op HDFS opslaan en wêr't berekkeningen plakfine.
  3. Computing node. Dit is in knooppunt dêr't jo net opslaan neat op HDFS, mar dêr't berekkeningen barre.

Wichtich punt. Autoscaling sil foarkomme fanwege knooppunten fan it tredde type. As jo ​​​​begjinne mei it nimmen en tafoegjen fan knooppunten fan it twadde type, sil de antwurdsnelheid heul leech wêze - ûntbining en opnij ynsette sil oeren duorje op jo kluster. Dit is fansels net wat jo ferwachtsje fan autoscaling. Dat is, wy reitsje gjin knopen fan 'e earste en twadde soarten. Se sille in minimum libbensfetbere kluster fertsjinwurdigje dy't yn 'e rin fan it programma sil bestean.

Dat, ús autoscaler is skreaun yn Python 3, brûkt de Ambari API om klustertsjinsten te behearjen, brûkt API fan Mail.ru Cloud Solutions (MCS) foar it starten en stopjen fan masines.

Solution arsjitektuer

  1. Module autoscaler.py. It befettet trije klassen: 1) funksjes foar wurkjen mei Ambari, 2) funksjes foar wurkjen mei MCS, 3) funksjes dy't direkt relatearre binne oan de logika fan 'e autoscaler.
  2. Skrift observer.py. Yn essinsje bestiet it út ferskate regels: wannear en op hokker mominten de autoscalerfunksjes te neamen.
  3. Konfiguraasjetriem config.py. It befettet bygelyks in list mei knooppunten tastien foar autoscaling en oare parameters dy't bygelyks beynfloedzje hoe lang te wachtsjen fan it momint dat in nije knooppunt is tafoege. D'r binne ek tiidstempels foar it begjin fan klassen, sadat foar de klasse de maksimale tastiene klusterkonfiguraasje wurdt lansearre.

Litte wy no sjen nei de stikken koade binnen de earste twa bestannen.

1. Autoscaler.py module

Ambari klasse

Dit is wat in stikje koade mei in klasse derút sjocht Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Hjirboppe kinne jo as foarbyld sjen nei de ymplemintaasje fan 'e funksje stop_all_services, dy't alle tsjinsten op 'e winske klusterknoop stopet.

By de yngong fan de klasse Ambari do giet troch:

  • ambari_url, bygelyks, lykas 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - de namme fan jo kluster yn Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • en binnen auth hjir is jo oanmelding en wachtwurd foar Ambari: auth = ('login', 'password').

De funksje sels is neat mear as in pear oproppen fia de REST API nei Ambari. Fanút in logysk eachpunt krije wy earst in list mei rinnende tsjinsten op in knooppunt, en freegje dan op in opjûne kluster, op in opjûne knooppunt, om tsjinsten fan 'e list nei de steat oer te dragen INSTALLED. Funksjes foar it lansearjen fan alle tsjinsten, foar it oerdragen fan knopen nei steat Maintenance ensfh lykje ferlykber - se binne mar in pear fersiken fia de API.

Klasse Mcs

Dit is wat in stikje koade mei in klasse derút sjocht Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

By de yngong fan de klasse Mcs wy passe de projekt-id binnen de wolk en de brûkers-id troch, lykas syn wachtwurd. Yn funksje vm_turn_on wy wolle oansette ien fan 'e masines. De logika hjir is wat yngewikkelder. Oan it begjin fan 'e koade wurde trije oare funksjes neamd: 1) wy moatte in token krije, 2) wy moatte de hostnamme konvertearje yn' e namme fan 'e masine yn MCS, 3) de id fan dizze masine krije. Folgjende meitsje wy gewoan in postfersyk en starte dizze masine.

Dit is hoe't de funksje foar it krijen fan in token derút sjocht:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler klasse

Dizze klasse befettet funksjes yn ferbân mei de bestjoeringslogika sels.

Dit is hoe in stikje koade foar dizze klasse derút sjocht:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Wy akseptearje klassen foar yngong. Ambari и Mcs, in list mei knooppunten dy't tastien binne foar skaalfergrutting, lykas nodekonfiguraasjeparameters: ûnthâld en cpu tawiisd oan it knooppunt yn YARN. D'r binne ek 2 ynterne parameters q_ram, q_cpu, dy't wachtrijen binne. Mei help fan se bewarje wy de wearden fan 'e hjoeddeistige klusterlast. As wy sjogge dat der oer de lêste 5 minuten in konsekwint ferhege lading west hat, dan beslute wy dat wy +1 knooppunt oan it kluster taheakje moatte. Itselde jildt foar de kluster underutilization steat.

De koade hjirboppe is in foarbyld fan in funksje dy't in masine út it kluster ferwideret en stopet yn 'e wolk. Earst is der in ûntmanteling YARN Nodemanager, dan giet de modus yn Maintenance, dan stopje wy alle tsjinsten op 'e masine en skeakelje de firtuele masine yn' e wolk út.

2. Skript observer.py

Sample koade fan dêrút:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Dêryn kontrolearje wy oft betingsten makke binne foar it fergrutsjen fan de kapasiteit fan it kluster en oft d'r masines yn reserve binne, krije de hostnamme fan ien fan har, foegje it ta oan it kluster en publisearje in berjocht deroer op 'e Slack fan ús team. Dêrnei begjint it cooldown_period, as wy neat tafoegje of fuortsmite fan it kluster, mar gewoan de lading kontrolearje. As it is stabilisearre en is binnen de korridor fan optimale lading wearden, dan wy gewoan trochgean tafersjoch. As ien knooppunt net genôch wie, dan foegje wy in oare ta.

Foar gefallen dat wy in les foarút hawwe, witte wy al wis dat ien knooppunt net genôch is, dus begjinne wy ​​​​alle frije knopen fuortendaliks en hâlde se aktyf oant it ein fan 'e les. Dit bart mei in list mei tiidstempels foar aktiviteit.

konklúzje

Autoscaler is in goede en handige oplossing foar dy gefallen as jo uneven klusterladen ûnderfine. Jo berikke tagelyk de winske kluster konfiguraasje foar peak loads en tagelyk net hâlden dit kluster ûnder underload, saving jild. No, plus dit bart allegear automatysk sûnder jo dielname. De autoscaler sels is neat mear as in set oanfragen oan 'e klusterbehearder API en de wolkprovider API, skreaun neffens in bepaalde logika. Wat jo perfoarst moatte ûnthâlde is de ferdieling fan knopen yn 3 soarten, lykas wy earder skreaun hawwe. En do silst bliid wêze.

Boarne: www.habr.com

Add a comment