Si të bëni vetë shkallëzuesin automatik për një grup

Përshëndetje! Ne trajnojmë njerëzit të punojnë me të dhëna të mëdha. Është e pamundur të imagjinohet një program arsimor mbi të dhëna të mëdha pa grupin e tij, mbi të cilin të gjithë pjesëmarrësit punojnë së bashku. Për këtë arsye, programi ynë e ka gjithmonë atë 🙂 Ne jemi të angazhuar në konfigurimin, akordimin dhe administrimin e tij, dhe djemtë hapin drejtpërdrejt punët e MapReduce atje dhe përdorin Spark.

Në këtë postim do t'ju tregojmë se si e zgjidhëm problemin e ngarkimit të pabarabartë të grupimeve duke shkruar shkallëzuesin tonë automatik duke përdorur renë Mail.ru Cloud Solutions.

problem

Grupi ynë nuk përdoret në një mënyrë tipike. Hedhja është shumë e pabarabartë. Për shembull, ka klasa praktike, kur të 30 personat dhe një mësues shkojnë në grup dhe fillojnë ta përdorin atë. Ose përsëri, ka ditë para afatit kur ngarkesa rritet shumë. Pjesën tjetër të kohës grupi funksionon në modalitetin e nënngarkimit.

Zgjidhja #1 është të mbash një grup që do t'i rezistojë ngarkesave maksimale, por do të jetë i papunë pjesën tjetër të kohës.

Zgjidhja #2 është të mbani një grup të vogël, në të cilin ju shtoni manualisht nyjet para klasave dhe gjatë ngarkesave maksimale.

Zgjidhja #3 është të mbani një grup të vogël dhe të shkruani një shkallëzues automatik që do të monitorojë ngarkesën aktuale të grupit dhe, duke përdorur API të ndryshme, do të shtojë dhe heqë nyjet nga grupi.

Në këtë postim do të flasim për zgjidhjen numër 3. Ky shkallëzues automatik është shumë i varur nga faktorë të jashtëm dhe jo nga ata të brendshëm, dhe ofruesit shpesh nuk e ofrojnë atë. Ne përdorim infrastrukturën e resë kompjuterike Mail.ru Cloud Solutions dhe kemi shkruar një shkallëzues automatik duke përdorur MCS API. Dhe meqenëse ne mësojmë se si të punojmë me të dhëna, vendosëm të tregojmë se si mund të shkruani një shkallëzues automatik të ngjashëm për qëllimet tuaja dhe ta përdorni atë me renë tuaj kompjuterike

Parakushte

Së pari, duhet të keni një grup Hadoop. Për shembull, ne përdorim shpërndarjen HDP.

Në mënyrë që nyjet tuaja të shtohen dhe hiqen shpejt, duhet të keni një shpërndarje të caktuar rolesh midis nyjeve.

  1. Nyja kryesore. Epo, nuk ka nevojë të shpjegohet asgjë veçanërisht: nyja kryesore e grupit, në të cilën, për shembull, lëshohet drejtuesi Spark, nëse përdorni modalitetin interaktiv.
  2. Nyja e datës. Kjo është nyja në të cilën ruani të dhënat në HDFS dhe ku bëhen llogaritjet.
  3. Nyja llogaritëse. Kjo është një nyje ku nuk ruani asgjë në HDFS, por ku ndodhin llogaritjet.

Pika e rëndësishme. Shkallëzimi automatik do të ndodhë për shkak të nyjeve të llojit të tretë. Nëse filloni të merrni dhe shtoni nyje të llojit të dytë, shpejtësia e përgjigjes do të jetë shumë e ulët - çmontimi dhe rikomponimi do të zgjasin orë të tëra në grupin tuaj. Kjo, natyrisht, nuk është ajo që prisni nga autoscaling. Kjo do të thotë, ne nuk prekim nyjet e tipit të parë dhe të dytë. Ato do të përfaqësojnë një grup minimal të zbatueshëm që do të ekzistojë gjatë gjithë kohëzgjatjes së programit.

Pra, autoscaler-i ynë është shkruar në Python 3, përdor Ambari API për të menaxhuar shërbimet e grupimit, përdor API nga Mail.ru Cloud Solutions (MCS) për ndezjen dhe ndalimin e makinave.

Arkitektura e zgjidhjeve

  1. Modul autoscaler.py. Ai përmban tre klasa: 1) funksione për të punuar me Ambari, 2) funksione për të punuar me MCS, 3) funksione që lidhen drejtpërdrejt me logjikën e autoscaler.
  2. Script observer.py. Në thelb ai përbëhet nga rregulla të ndryshme: kur dhe në cilat momente të thirren funksionet e autoscaler.
  3. Skedari i konfigurimit config.py. Ai përmban, për shembull, një listë nyjesh të lejuara për shkallëzim automatik dhe parametra të tjerë që ndikojnë, për shembull, sa kohë duhet pritur nga momenti i shtimit të një nyje të re. Ekzistojnë gjithashtu vula kohore për fillimin e klasave, në mënyrë që përpara klasës të hapet konfigurimi maksimal i lejuar i grupimit.

Le të shohim tani pjesët e kodit brenda dy skedarëve të parë.

1. Moduli Autoscaler.py

Klasa Ambari

Kështu duket një pjesë e kodit që përmban një klasë Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Më sipër, si shembull, mund të shikoni zbatimin e funksionit stop_all_services, i cili ndalon të gjitha shërbimet në nyjen e dëshiruar të grupimit.

Në hyrje të klasës Ambari ti kalon:

  • ambari_url, për shembull, si 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – emri i grupit tuaj në Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • dhe brenda auth këtu është emri juaj i përdoruesit dhe fjalëkalimi për Ambari: auth = ('login', 'password').

Vetë funksioni nuk është gjë tjetër veçse disa thirrje nëpërmjet API REST në Ambari. Nga pikëpamja logjike, ne fillimisht marrim një listë të shërbimeve të ekzekutuara në një nyje, dhe më pas kërkojmë në një grupim të caktuar, në një nyje të caktuar, të transferojmë shërbimet nga lista në gjendje. INSTALLED. Funksionet për nisjen e të gjitha shërbimeve, për transferimin e nyjeve në gjendje Maintenance etj. duken të ngjashme - ato janë vetëm disa kërkesa përmes API-së.

Klasa Mcs

Kështu duket një pjesë e kodit që përmban një klasë Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Në hyrje të klasës Mcs kalojmë ID-në e projektit brenda cloud dhe ID-në e përdoruesit, si dhe fjalëkalimin e tij. Në funksion vm_turn_on ne duam të ndezim një nga makinat. Logjika këtu është pak më e ndërlikuar. Në fillim të kodit quhen tre funksione të tjera: 1) duhet të marrim një shenjë, 2) duhet të konvertojmë emrin e hostit në emrin e makinës në MCS, 3) të marrim ID-në e kësaj makine. Më pas, ne thjesht bëjmë një kërkesë postimi dhe e nisim këtë makinë.

Kështu duket funksioni për marrjen e një token:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Klasa e shkallëzimit automatik

Kjo klasë përmban funksione që lidhen me vetë logjikën e funksionimit.

Kjo është se si duket një pjesë e kodit për këtë klasë:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Ne pranojmë klasa për hyrje. Ambari и Mcs, një listë nyjesh që lejohen për shkallëzim, si dhe parametrat e konfigurimit të nyjeve: memoria dhe cpu e alokuar nyjes në YARN. Ka edhe 2 parametra të brendshëm q_ram, q_cpu, të cilët janë radhë. Duke përdorur ato, ne ruajmë vlerat e ngarkesës aktuale të grupimit. Nëse shohim që gjatë 5 minutave të fundit ka pasur një ngarkesë të shtuar vazhdimisht, atëherë vendosim që duhet të shtojmë nyjen +1 në grup. E njëjta gjë vlen edhe për gjendjen e nënshfrytëzimit të grupimeve.

Kodi i mësipërm është një shembull i një funksioni që heq një makinë nga grupi dhe e ndalon atë në re. Së pari ka një çmontim YARN Nodemanager, më pas modaliteti ndizet Maintenance, më pas ndalojmë të gjitha shërbimet në makinë dhe fikim makinën virtuale në re.

2. Vëzhguesi i skriptit.py

Shembull kodi nga atje:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Në të, ne kontrollojmë nëse janë krijuar kushte për rritjen e kapacitetit të grupit dhe nëse ka ndonjë makinë në rezervë, marrim emrin e hostit të njërës prej tyre, e shtojmë atë në grup dhe publikojmë një mesazh në lidhje me të në Slack të ekipit tonë. Pas së cilës fillon cooldown_period, kur nuk shtojmë ose heqim asgjë nga grupi, por thjesht monitorojmë ngarkesën. Nëse është stabilizuar dhe është brenda korridorit të vlerave optimale të ngarkesës, atëherë thjesht vazhdojmë monitorimin. Nëse një nyje nuk ishte e mjaftueshme, atëherë shtojmë një tjetër.

Për rastet kur kemi një orë mësimi përpara, tashmë e dimë me siguri që një nyje nuk do të mjaftojë, ndaj nisim menjëherë të gjitha nyjet e lira dhe i mbajmë aktive deri në fund të orës së mësimit. Kjo ndodh duke përdorur një listë të vulave kohore të aktivitetit.

Përfundim

Autoscaler është një zgjidhje e mirë dhe e përshtatshme për ato raste kur përjetoni ngarkim të pabarabartë të grupimeve. Njëkohësisht arrini konfigurimin e dëshiruar të grupimit për ngarkesat maksimale dhe në të njëjtën kohë nuk e mbani këtë grup gjatë nënngarkimit, duke kursyer para. Epo, plus e gjithë kjo ndodh automatikisht pa pjesëmarrjen tuaj. Vetë shkallëzuesi automatik nuk është gjë tjetër veçse një grup kërkesash për API-në e menaxherit të grupit dhe API-në e ofruesit të cloud, të shkruara sipas një logjike të caktuar. Ajo që patjetër duhet të mbani mend është ndarja e nyjeve në 3 lloje, siç kemi shkruar më parë. Dhe do të jeni të lumtur.

Burimi: www.habr.com

Shto një koment