Kiel fari vian propran aŭtoskalilon por areto

Saluton! Ni trejnas homojn por labori kun grandaj datumoj. Ne eblas imagi edukan programon pri grandaj datumoj sen propra aro, pri kiu ĉiuj partoprenantoj kunlaboras. Tial, nia programo ĉiam havas ĝin 🙂 Ni okupiĝas pri ĝia agordo, agordado kaj administrado, kaj la infanoj rekte lanĉas MapReduce-laborojn tie kaj uzas Spark.

En ĉi tiu afiŝo ni rakontos al vi kiel ni solvis la problemon de neegala ŝarĝado de grapoloj skribante nian propran aŭtoskalilon uzante la nubon. Mail.ru Cloud Solutions.

problemo

Nia areto ne estas uzata en tipa reĝimo. Forigo estas tre malebena. Ekzemple, ekzistas praktikaj klasoj, kiam ĉiuj 30 homoj kaj instruisto iras al la areto kaj komencas uzi ĝin. Aŭ denove, estas tagoj antaŭ la limdato, kiam la ŝarĝo multe pliiĝas. La resto de la tempo la areto funkcias en subŝarĝa reĝimo.

Solvo n-ro 1 estas konservi areton kiu eltenos pintajn ŝarĝojn, sed restos neaktiva la reston de la tempo.

Solvo #2 estas konservi malgrandan areton, al kiu vi permane aldonas nodojn antaŭ klasoj kaj dum pintaj ŝarĝoj.

Solvo n-ro 3 estas konservi malgrandan areton kaj verki aŭtoskalon kiu kontrolos la nunan ŝarĝon de la areto kaj, uzante diversajn APIojn, aldonos kaj forigos nodojn de la areto.

En ĉi tiu afiŝo ni parolos pri solvo #3. Ĉi tiu aŭtoskalilo estas tre dependa de eksteraj faktoroj prefere ol internaj, kaj provizantoj ofte ne disponigas ĝin. Ni uzas la nuban infrastrukturon de Mail.ru Cloud Solutions kaj verkis aŭtoskalilon uzante la MCS-API. Kaj ĉar ni instruas kiel labori kun datumoj, ni decidis montri kiel vi povas skribi similan aŭtoskalilon por viaj propraj celoj kaj uzi ĝin kun via nubo.

antaŭkondiĉoj

Unue, vi devas havi Hadoop-grupon. Ekzemple, ni uzas la HDP-distribuon.

Por ke viaj nodoj estu rapide aldonitaj kaj forigitaj, vi devas havi certan distribuadon de roloj inter la nodoj.

  1. Majstra nodo. Nu, ne necesas klarigi ion aparte: la ĉefa nodo de la grapolo, sur kiu, ekzemple, la Spark-ŝoforo estas lanĉita, se vi uzas la interagan reĝimon.
  2. Datnodo. Ĉi tiu estas la nodo sur kiu vi stokas datumojn sur HDFS kaj kie kalkuloj okazas.
  3. Komputila nodo. Ĉi tio estas nodo, kie vi stokas nenion en HDFS, sed kie okazas kalkuloj.

Grava punkto. Aŭtoskalado okazos pro nodoj de la tria tipo. Se vi komencas preni kaj aldoni nodojn de la dua tipo, la respondrapideco estos tre malalta - malfunkciigo kaj rekomisio daŭros horojn sur via areto. Ĉi tio, kompreneble, ne estas tio, kion vi atendas de aŭtoskalo. Tio estas, ni ne tuŝas nodojn de la unua kaj dua tipoj. Ili reprezentos minimuman realigeblan areton, kiu ekzistos dum la daŭro de la programo.

Do, nia aŭtoskaler estas skribita en Python 3, uzas la Ambari API por administri clusterservojn, uzas API de Mail.ru Cloud Solutions (MCS) por starti kaj haltigi maŝinojn.

Solva arkitekturo

  1. Modulo autoscaler.py. Ĝi enhavas tri klasojn: 1) funkcioj por labori kun Ambari, 2) funkcioj por labori kun MCS, 3) funkcioj rilataj rekte al la logiko de la aŭtoskaler.
  2. Skripto observer.py. Esence ĝi konsistas el malsamaj reguloj: kiam kaj en kiuj momentoj nomi la aŭtomatajn funkciojn.
  3. Agorda dosiero config.py. Ĝi enhavas, ekzemple, liston de nodoj permesitaj por aŭtoskalo kaj aliajn parametrojn kiuj influas, ekzemple, kiom longe atendi de la momento kiam nova nodo estis aldonita. Ekzistas ankaŭ tempomarkoj por la komenco de klasoj, tiel ke antaŭ la klaso la maksimuma permesita grapolkonfiguracio estas lanĉita.

Ni nun rigardu la pecojn de kodo en la unuaj du dosieroj.

1. Modulo Autoscaler.py

Ambari klaso

Jen kiel aspektas kodo enhavanta klason Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Supre, kiel ekzemplo, vi povas rigardi la efektivigon de la funkcio stop_all_services, kiu ĉesigas ĉiujn servojn sur la dezirata grapolnodo.

Ĉe la enirejo al la klaso Ambari vi pasas:

  • ambari_url, ekzemple, kiel 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - la nomo de via areto en Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • kaj interne auth jen via ensaluto kaj pasvorto por Ambari: auth = ('login', 'password').

La funkcio mem estas nenio pli ol kelkaj vokoj per la REST API al Ambari. De logika vidpunkto, ni unue ricevas liston de kurantaj servoj sur nodo, kaj poste petas sur difinita areto, sur difinita nodo, transdoni servojn de la listo al la ŝtato. INSTALLED. Funkcioj por lanĉi ĉiujn servojn, por translokigi nodojn al ŝtato Maintenance ktp aspektas simile - ili estas nur kelkaj petoj per la API.

Klaso Mcs

Jen kiel aspektas kodo enhavanta klason Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Ĉe la enirejo al la klaso Mcs ni pasas la projektididentigilon ene de la nubo kaj la uzantidentigilon, same kiel lian pasvorton. En funkcio vm_turn_on ni volas ŝalti unu el la maŝinoj. La logiko ĉi tie estas iom pli komplika. Komence de la kodo, tri aliaj funkcioj estas nomitaj: 1) ni devas akiri ĵetonon, 2) ni devas konverti la gastigan nomon en la nomon de la maŝino en MCS, 3) akiri la id de ĉi tiu maŝino. Poste, ni simple faras afiŝpeton kaj lanĉas ĉi tiun maŝinon.

Jen kiel aspektas la funkcio por akiri ĵetonon:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler klaso

Ĉi tiu klaso enhavas funkciojn rilatajn al la operacia logiko mem.

Jen kiel aspektas kodo por ĉi tiu klaso:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Ni akceptas klasojn por eniro. Ambari и Mcs, listo de nodoj kiuj estas permesitaj por skalado, same kiel nodaj agordaj parametroj: memoro kaj cpu asignitaj al la nodo en YARN. Estas ankaŭ 2 internaj parametroj q_ram, q_cpu, kiuj estas vostoj. Uzante ilin, ni stokas la valorojn de la nuna amasŝarĝo. Se ni vidas, ke dum la lastaj 5 minutoj estis konstante pliigita ŝarĝo, tiam ni decidas, ke ni devas aldoni +1-nodon al la areto. La sama validas por la stato de subutiligo de grapo.

La ĉi-supra kodo estas ekzemplo de funkcio, kiu forigas maŝinon de la areto kaj haltigas ĝin en la nubo. Unue estas malfunkciigo YARN Nodemanager, tiam la reĝimo ŝaltiĝas Maintenance, tiam ni haltigas ĉiujn servojn sur la maŝino kaj malŝaltas la virtualan maŝinon en la nubo.

2. Skripto observanto.py

Ekzempla kodo de tie:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

En ĝi, ni kontrolas ĉu estis kreitaj kondiĉoj por pliigi la kapaciton de la areto kaj ĉu ekzistas iuj maŝinoj en rezervo, ricevas la gastigan nomon de unu el ili, aldonu ĝin al la areto kaj publikigas mesaĝon pri ĝi en la Slack de nia teamo. Post kio ĝi komenciĝas cooldown_period, kiam ni ne aldonas aŭ forigas ion el la areto, sed simple monitoras la ŝarĝon. Se ĝi stabiliĝis kaj estas ene de la koridoro de optimumaj ŝarĝvaloroj, tiam ni simple daŭrigas monitoradon. Se unu nodo ne sufiĉis, tiam ni aldonas alian.

Por kazoj, kiam ni havas lecionon antaŭe, ni jam certe scias, ke unu nodo ne sufiĉos, do ni tuj komencas ĉiujn liberajn nodojn kaj konservas ilin aktivaj ĝis la fino de la leciono. Ĉi tio okazas uzante liston de agadtempomarkoj.

konkludo

Autoscaler estas bona kaj oportuna solvo por tiuj kazoj, kiam vi spertas neegalan grapolŝarĝadon. Vi samtempe atingas la deziratan grapolan agordon por pintaj ŝarĝoj kaj samtempe ne konservas ĉi tiun areton dum subŝarĝo, ŝparante monon. Nu, krom tio ĉio okazas aŭtomate sen via partopreno. La aŭtoskalo mem estas nenio pli ol aro de petoj al la API-administranto de la cluster kaj la API de la nuba provizanto, skribitaj laŭ certa logiko. Kion vi certe devas memori estas la dividado de nodoj en 3 tipojn, kiel ni skribis antaŭe. Kaj vi estos feliĉa.

fonto: www.habr.com

Aldoni komenton