Paano gumawa ng sarili mong autoscaler para sa isang cluster

Kamusta! Sinasanay namin ang mga tao na magtrabaho gamit ang malaking data. Imposibleng isipin ang isang programang pang-edukasyon sa malaking data na walang sariling kumpol, kung saan ang lahat ng mga kalahok ay nagtutulungan. Para sa kadahilanang ito, palaging mayroon nito ang aming programa :) Kami ay nakikibahagi sa pagsasaayos, pag-tune at pangangasiwa nito, at ang mga lalaki ay direktang naglulunsad ng mga trabaho sa MapReduce doon at gumagamit ng Spark.

Sa post na ito sasabihin namin sa iyo kung paano namin nalutas ang problema ng hindi pantay na pag-load ng cluster sa pamamagitan ng pagsulat ng aming sariling autoscaler gamit ang cloud Mail.ru Cloud Solutions.

problema

Ang aming cluster ay hindi ginagamit sa isang karaniwang mode. Ang pagtatapon ay lubos na hindi pantay. Halimbawa, may mga praktikal na klase, kapag ang lahat ng 30 tao at isang guro ay pumunta sa cluster at nagsimulang gamitin ito. O muli, may mga araw bago ang deadline kung kailan tumataas nang husto ang load. Ang natitirang oras ay gumagana ang cluster sa underload mode.

Ang solusyon #1 ay panatilihin ang isang kumpol na makatiis sa mga peak load, ngunit magiging idle sa natitirang oras.

Ang solusyon #2 ay upang panatilihin ang isang maliit na kumpol, kung saan manu-mano kang magdagdag ng mga node bago ang mga klase at sa panahon ng mga peak load.

Ang solusyon #3 ay upang magpanatili ng isang maliit na cluster at magsulat ng isang autoscaler na susubaybay sa kasalukuyang pagkarga ng cluster at, gamit ang iba't ibang mga API, magdagdag at mag-alis ng mga node mula sa cluster.

Sa post na ito ay pag-uusapan natin ang solusyon #3. Ang autoscaler na ito ay lubos na nakadepende sa mga panlabas na salik kaysa sa mga panloob, at kadalasang hindi ito ibinibigay ng mga provider. Ginagamit namin ang imprastraktura ng ulap ng Mail.ru Cloud Solutions at nagsulat ng autoscaler gamit ang MCS API. At dahil itinuro namin kung paano gumawa ng data, nagpasya kaming ipakita kung paano ka makakasulat ng katulad na autoscaler para sa iyong sariling mga layunin at gamitin ito sa iyong cloud

Kinakailangan

Una, dapat mayroon kang Hadoop cluster. Halimbawa, ginagamit namin ang pamamahagi ng HDP.

Upang mabilis na maidagdag at maalis ang iyong mga node, dapat ay mayroon kang isang tiyak na pamamahagi ng mga tungkulin sa mga node.

  1. Master node. Buweno, walang partikular na kailangang ipaliwanag dito: ang pangunahing node ng kumpol, kung saan, halimbawa, ang driver ng Spark ay inilunsad, kung gagamitin mo ang interactive na mode.
  2. Node ng petsa. Ito ang node kung saan ka nag-iimbak ng data sa HDFS at kung saan nagaganap ang mga kalkulasyon.
  3. Pag-compute ng node. Ito ay isang node kung saan hindi ka nag-iimbak ng anuman sa HDFS, ngunit kung saan nangyayari ang mga kalkulasyon.

Mahalagang punto. Ang autoscaling ay magaganap dahil sa mga node ng ikatlong uri. Kung sisimulan mo ang pagkuha at pagdaragdag ng mga node ng pangalawang uri, ang bilis ng pagtugon ay magiging napakababa - ang pag-decommission at muling pagkokomisyon ay aabot ng ilang oras sa iyong cluster. Siyempre, hindi ito ang iyong inaasahan mula sa autoscaling. Iyon ay, hindi namin hinawakan ang mga node ng una at pangalawang uri. Kakatawanin nila ang isang minimum na mabubuhay na cluster na iiral sa buong tagal ng programa.

Kaya, ang aming autoscaler ay nakasulat sa Python 3, ginagamit ang Ambari API upang pamahalaan ang mga serbisyo ng cluster, ginagamit API mula sa Mail.ru Cloud Solutions (MCS) para sa pagsisimula at pagpapahinto ng mga makina.

Arkitektura ng solusyon

  1. Modyul autoscaler.py. Naglalaman ito ng tatlong klase: 1) mga function para sa pagtatrabaho sa Ambari, 2) mga function para sa pagtatrabaho sa MCS, 3) mga function na direktang nauugnay sa logic ng autoscaler.
  2. Script observer.py. Sa pangkalahatan, binubuo ito ng iba't ibang panuntunan: kailan at sa anong mga sandali tatawagan ang mga function ng autoscaler.
  3. File ng configuration config.py. Naglalaman ito, halimbawa, ng isang listahan ng mga node na pinapayagan para sa autoscaling at iba pang mga parameter na nakakaapekto, halimbawa, kung gaano katagal maghintay mula sa sandaling naidagdag ang isang bagong node. Mayroon ding mga timestamp para sa pagsisimula ng mga klase, upang bago ang klase ay mailunsad ang maximum na pinahihintulutang configuration ng cluster.

Tingnan natin ngayon ang mga piraso ng code sa loob ng unang dalawang file.

1. Autoscaler.py module

klase ng Ambari

Ito ang hitsura ng isang piraso ng code na naglalaman ng isang klase Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Sa itaas, bilang isang halimbawa, maaari mong tingnan ang pagpapatupad ng function stop_all_services, na humihinto sa lahat ng serbisyo sa gustong cluster node.

Sa pasukan ng klase Ambari pumasa ka:

  • ambari_url, halimbawa, tulad ng 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – ang pangalan ng iyong kumpol sa Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • at sa loob auth narito ang iyong username at password para sa Ambari: auth = ('login', 'password').

Ang function mismo ay hindi hihigit sa ilang mga tawag sa pamamagitan ng REST API sa Ambari. Mula sa isang lohikal na pananaw, una kaming makakatanggap ng isang listahan ng mga tumatakbong serbisyo sa isang node, at pagkatapos ay humihiling sa isang partikular na kumpol, sa isang ibinigay na node, na ilipat ang mga serbisyo mula sa listahan patungo sa estado. INSTALLED. Mga function para sa paglulunsad ng lahat ng mga serbisyo, para sa paglilipat ng mga node sa estado Maintenance atbp. magkatulad - ang mga ito ay ilan lamang sa mga kahilingan sa pamamagitan ng API.

Class Mcs

Ito ang hitsura ng isang piraso ng code na naglalaman ng isang klase Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Sa pasukan ng klase Mcs ipinapasa namin ang project id sa loob ng cloud at ang user id, pati na rin ang kanyang password. Sa pag-andar vm_turn_on gusto naming i-on ang isa sa mga makina. Ang lohika dito ay medyo mas kumplikado. Sa simula ng code, tatlong iba pang function ang tinatawag: 1) kailangan nating kumuha ng token, 2) kailangan nating i-convert ang hostname sa pangalan ng makina sa MCS, 3) kunin ang id ng makinang ito. Susunod, gumawa lang kami ng kahilingan sa pag-post at ilunsad ang makinang ito.

Ganito ang hitsura ng function para sa pagkuha ng token:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Klase ng Autoscaler

Ang klase na ito ay naglalaman ng mga function na nauugnay sa operating logic mismo.

Ito ang hitsura ng isang piraso ng code para sa klase na ito:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Tumatanggap kami ng mga klase para sa pagpasok. Ambari ΠΈ Mcs, isang listahan ng mga node na pinapayagan para sa scaling, pati na rin ang mga parameter ng configuration ng node: memory at cpu na inilalaan sa node sa YARN. Mayroon ding 2 panloob na parameter q_ram, q_cpu, na mga pila. Gamit ang mga ito, iniimbak namin ang mga halaga ng kasalukuyang pag-load ng cluster. Kung nakita namin na sa nakalipas na 5 minuto ay nagkaroon ng patuloy na pagtaas ng load, pagkatapos ay magpasya kaming kailangan naming magdagdag ng +1 node sa cluster. Ang parehong ay totoo para sa cluster underutilization estado.

Ang code sa itaas ay isang halimbawa ng isang function na nag-aalis ng machine mula sa cluster at huminto ito sa cloud. Una ay may decommissioning YARN Nodemanager, pagkatapos ay i-on ang mode Maintenance, pagkatapos ay ihihinto namin ang lahat ng serbisyo sa makina at i-off ang virtual machine sa cloud.

2. Script observer.py

Sample code mula doon:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Dito, sinusuri namin kung nalikha na ang mga kundisyon para sa pagtaas ng kapasidad ng cluster at kung mayroong anumang mga machine na nakalaan, kunin ang hostname ng isa sa mga ito, idagdag ito sa cluster at mag-publish ng mensahe tungkol dito sa Slack ng aming team. Pagkatapos nito ay magsisimula cooldown_period, kapag hindi kami nagdagdag o nag-alis ng anuman mula sa kumpol, ngunit sinusubaybayan lamang ang pagkarga. Kung ito ay naging matatag at nasa loob ng koridor ng pinakamainam na mga halaga ng pag-load, pagkatapos ay ipagpatuloy lang namin ang pagsubaybay. Kung ang isang node ay hindi sapat, pagkatapos ay magdagdag kami ng isa pa.

Para sa mga kaso kung kailan mayroon tayong aralin sa unahan, alam na nating sigurado na ang isang node ay hindi magiging sapat, kaya agad naming sinisimulan ang lahat ng mga libreng node at panatilihing aktibo ang mga ito hanggang sa katapusan ng aralin. Nangyayari ito gamit ang isang listahan ng mga timestamp ng aktibidad.

Konklusyon

Ang Autoscaler ay isang mahusay at maginhawang solusyon para sa mga kasong iyon kapag nakakaranas ka ng hindi pantay na pag-load ng cluster. Sabay-sabay mong makakamit ang gustong configuration ng cluster para sa mga peak load at sa parehong oras ay hindi pinapanatili ang cluster na ito sa panahon ng underload, na nakakatipid ng pera. Well, at ang lahat ng ito ay awtomatikong nangyayari nang wala ang iyong paglahok. Ang autoscaler mismo ay hindi hihigit sa isang hanay ng mga kahilingan sa cluster manager API at cloud provider API, na isinulat ayon sa isang partikular na lohika. Ang talagang kailangan mong tandaan ay ang paghahati ng mga node sa 3 uri, tulad ng isinulat namin kanina. At magiging masaya ka.

Pinagmulan: www.habr.com

Magdagdag ng komento