Ինչպես պատրաստել ձեր սեփական ավտոմատ սանդղակը կլաստերի համար

Բարեւ Ձեզ! Մենք մարդկանց սովորեցնում ենք աշխատել մեծ տվյալների հետ: Անհնար է պատկերացնել մեծ տվյալների վերաբերյալ կրթական ծրագիր առանց սեփական կլաստերի, որի վրա բոլոր մասնակիցները միասին աշխատեն։ Այդ իսկ պատճառով մեր ծրագիրը միշտ ունի այն 🙂 Մենք զբաղվում ենք դրա կազմաձևմամբ, թյունինգով և կառավարմամբ, և տղաները ուղղակիորեն գործարկում են MapReduce-ի աշխատանքները և օգտագործում Spark-ը:

Այս գրառման մեջ մենք ձեզ կպատմենք, թե ինչպես լուծեցինք կլաստերի անհավասար բեռնման խնդիրը՝ գրելով մեր սեփական autoscaler-ը՝ օգտագործելով ամպը: Mail.ru Cloud Solutions.

խնդիր

Մեր կլաստերը չի օգտագործվում տիպիկ ռեժիմում: Հեռացումը խիստ անհավասար է: Օրինակ, կան գործնական պարապմունքներ, երբ բոլոր 30 հոգին և ուսուցիչը գնում են կլաստերի և սկսում են օգտագործել այն։ Կամ կրկին, վերջնաժամկետից առաջ կան օրեր, երբ ծանրաբեռնվածությունը մեծապես ավելանում է: Մնացած ժամանակ կլաստերը գործում է թերբեռնվածության ռեժիմում:

Լուծումը թիվ 1-ն է՝ պահպանել այնպիսի կլաստեր, որը կդիմանա գագաթնակետային բեռներին, բայց մնացած ժամանակ անգործուն մնա:

Լուծումը թիվ 2-ն է փոքր կլաստեր պահելը, որին դուք ձեռքով հանգույցներ եք ավելացնում դասերից առաջ և առավելագույն բեռների ժամանակ:

Լուծումը #3-ն է՝ պահել փոքր կլաստեր և գրել ավտոմատ սանդղակ, որը կվերահսկի կլաստերի ընթացիկ ծանրաբեռնվածությունը և օգտագործելով տարբեր API-ներ, կավելացնի և կհեռացնի հանգույցները կլաստերից:

Այս գրառման մեջ կխոսենք թիվ 3 լուծման մասին։ Այս autoscaler-ը մեծապես կախված է արտաքին գործոններից, այլ ոչ թե ներքին գործոններից, և մատակարարները հաճախ դա չեն տրամադրում: Մենք օգտագործում ենք Mail.ru Cloud Solutions ամպային ենթակառուցվածքը և գրել ենք autoscaler՝ օգտագործելով MCS API: Եվ քանի որ մենք սովորեցնում ենք, թե ինչպես աշխատել տվյալների հետ, մենք որոշեցինք ցույց տալ, թե ինչպես կարող եք գրել նմանատիպ autoscaler ձեր սեփական նպատակների համար և օգտագործել այն ձեր ամպի հետ:

նախադրյալները

Նախ, դուք պետք է ունենաք Hadoop կլաստեր: Օրինակ, մենք օգտագործում ենք HDP բաշխումը:

Որպեսզի ձեր հանգույցները արագ ավելացվեն և հեռացվեն, դուք պետք է ունենաք դերերի որոշակի բաշխում հանգույցների միջև:

  1. Վարպետ հանգույց. Դե, առանձնապես որևէ բան բացատրելու կարիք չկա. կլաստերի հիմնական հանգույցը, որի վրա, օրինակ, գործարկվում է Spark դրայվերը, եթե օգտագործում եք ինտերակտիվ ռեժիմը:
  2. Ամսաթվի հանգույց. Սա այն հանգույցն է, որի վրա դուք տվյալները պահում եք HDFS-ում և որտեղ կատարվում են հաշվարկները:
  3. Հաշվողական հանգույց. Սա հանգույց է, որտեղ դուք ոչինչ չեք պահում HDFS-ում, բայց որտեղ կատարվում են հաշվարկներ:

Կարևոր կետ. Ավտոմատ մասշտաբը տեղի կունենա երրորդ տիպի հանգույցների պատճառով: Եթե ​​դուք սկսեք վերցնել և ավելացնել երկրորդ տիպի հանգույցներ, արձագանքման արագությունը կլինի շատ ցածր՝ ապամոնտաժումը և վերագործարկումը ժամեր կպահանջեն ձեր կլաստերի վրա: Սա, իհարկե, այն չէ, ինչ դուք ակնկալում եք autoscaling-ից: Այսինքն՝ մենք չենք դիպչում առաջին և երկրորդ տիպի հանգույցներին։ Դրանք կներկայացնեն նվազագույն կենսունակ կլաստեր, որը գոյություն կունենա ծրագրի ողջ ընթացքում:

Այսպիսով, մեր autoscaler-ը գրված է Python 3-ում, օգտագործում է Ambari API՝ կլաստերային ծառայությունները կառավարելու համար, օգտագործում է API Mail.ru Cloud Solutions-ից (MCS) մեքենաներ գործարկելու և կանգնեցնելու համար:

Լուծման ճարտարապետություն

  1. Մոդուլ autoscaler.py. Այն պարունակում է երեք դաս՝ 1) Ambari-ի հետ աշխատելու գործառույթներ, 2) MCS-ի հետ աշխատելու գործառույթներ, 3) գործառույթներ, որոնք անմիջականորեն կապված են ավտոսանդղակի տրամաբանության հետ։
  2. Սցենար observer.py. Ըստ էության, այն բաղկացած է տարբեր կանոններից՝ երբ և որ պահերին կանչել ավտոսանդղակի գործառույթները:
  3. Կազմաձևման ֆայլ config.py. Այն պարունակում է, օրինակ, հանգույցների ցանկ, որոնք թույլատրվում են ավտոմատ մասշտաբավորման համար և այլ պարամետրեր, որոնք ազդում են, օրինակ, նոր հանգույցի ավելացման պահից սպասելու որքան ժամանակ: Կան նաև դասերի մեկնարկի ժամանակացույցեր, որպեսզի դասից առաջ գործարկվի առավելագույն թույլատրելի կլաստերի կազմաձևումը:

Եկեք հիմա նայենք առաջին երկու ֆայլերի ներսում գտնվող կոդի կտորներին:

1. Autoscaler.py մոդուլ

Ամբարի դաս

Ահա թե ինչ տեսք ունի դաս պարունակող կոդի հատվածը Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Վերևում, որպես օրինակ, կարող եք դիտել գործառույթի իրականացումը stop_all_services, որը դադարեցնում է բոլոր ծառայությունները ցանկալի կլաստերի հանգույցում:

Դասի մուտքի մոտ Ambari դու անցնում ես:

  • ambari_url, օրինակ, նման 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – Ձեր կլաստերի անունը Ամբարիում,
  • headers = {'X-Requested-By': 'ambari'}
  • և ներսում auth ահա ձեր օգտանունը և գաղտնաբառը Ambari-ի համար. auth = ('login', 'password').

Ֆունկցիան ինքնին ոչ այլ ինչ է, քան մի քանի զանգ REST API-ի միջոցով Ambari-ին: Տրամաբանական տեսանկյունից մենք սկզբում ստանում ենք մի հանգույցի վրա գործող ծառայությունների ցանկ, այնուհետև խնդրում ենք տվյալ կլաստերի վրա, տվյալ հանգույցի վրա, ծառայությունները ցուցակից տեղափոխել վիճակ: INSTALLED. Բոլոր ծառայությունները գործարկելու, հանգույցները պետությանը փոխանցելու գործառույթներ Maintenance և այլն նման տեսք ունեն. դրանք ընդամենը մի քանի հարցումներ են API-ի միջոցով:

Դասարան Mcs

Ահա թե ինչ տեսք ունի դաս պարունակող կոդի հատվածը Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Դասի մուտքի մոտ Mcs մենք փոխանցում ենք նախագծի ID-ն ամպի ներսում և օգտվողի ID-ն, ինչպես նաև նրա գաղտնաբառը: Գործառույթի մեջ vm_turn_on մենք ուզում ենք միացնել մեքենաներից մեկը: Այստեղ տրամաբանությունը մի փոքր ավելի բարդ է։ Կոդի սկզբում երեք այլ ֆունկցիաներ են կոչվում՝ 1) պետք է ստանանք նշան, 2) պետք է հոսթի անունը փոխարկենք մեքենայի անվանման MCS-ում, 3) ստանանք այս մեքենայի id-ը։ Հաջորդը, մենք պարզապես կատարում ենք գրառման հարցում և գործարկում այս մեքենան:

Նշան ստանալու ֆունկցիան այսպիսին է թվում.

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler դաս

Այս դասը պարունակում է գործառույթներ՝ կապված բուն գործող տրամաբանության հետ։

Այս դասի կոդի կտորն այսպիսի տեսք ունի.

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Ընդունում ենք դասեր մուտքի համար։ Ambari и Mcs, հանգույցների ցանկը, որոնք թույլատրվում են մասշտաբավորման համար, ինչպես նաև հանգույցի կազմաձևման պարամետրերը. YARN-ում հանգույցին հատկացված հիշողություն և պրոցեսոր: Կան նաև 2 ներքին պարամետրեր q_ram, q_cpu, որոնք հերթեր են։ Օգտագործելով դրանք, մենք պահում ենք ընթացիկ կլաստերի բեռնվածության արժեքները: Եթե ​​մենք տեսնում ենք, որ վերջին 5 րոպեների ընթացքում հետևողականորեն ավելացել է բեռը, ապա մենք որոշում ենք, որ մենք պետք է ավելացնենք +1 հանգույց կլաստերին: Նույնը վերաբերում է կլաստերների թերօգտագործման վիճակին:

Վերևի կոդը մի ֆունկցիայի օրինակ է, որը հեռացնում է մեքենան կլաստերից և կանգնեցնում այն ​​ամպի մեջ: Նախ կա շահագործումից հանում YARN Nodemanager, ապա ռեժիմը միանում է Maintenance, ապա մենք դադարեցնում ենք բոլոր ծառայությունները մեքենայի վրա և անջատում ենք վիրտուալ մեքենան ամպի մեջ:

2. Սցենարի դիտորդ.py

Կոդի նմուշ այնտեղից.

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Դրանում մենք ստուգում ենք, թե արդյոք պայմաններ են ստեղծվել կլաստերի հզորությունը մեծացնելու համար և կա՞ն արդյոք պահեստային մեքենաներ, ստանում ենք դրանցից մեկի հոսթի անունը, ավելացնում ենք այն կլաստերին և դրա մասին հաղորդագրություն ենք հրապարակում մեր թիմի Slack-ում։ Որից հետո սկսվում է cooldown_period, երբ մենք ոչինչ չենք ավելացնում կամ հեռացնում կլաստերից, այլ պարզապես վերահսկում ենք բեռը։ Եթե ​​այն կայունացել է և գտնվում է բեռնվածքի օպտիմալ արժեքների միջանցքում, ապա մենք պարզապես շարունակում ենք մոնիտորինգը: Եթե ​​մի հանգույցը բավարար չէր, ապա ավելացնում ենք եւս մեկը։

Այն դեպքերի համար, երբ առջեւում դաս ունենք, արդեն հաստատ գիտենք, որ մեկ հանգույցը չի բավականացնի, ուստի անմիջապես սկսում ենք բոլոր անվճար հանգույցները և ակտիվ ենք պահում մինչև դասի ավարտը։ Դա տեղի է ունենում՝ օգտագործելով գործունեության ժամանակային դրոշմանիշների ցանկը:

Ամփոփում

Autoscaler-ը լավ և հարմար լուծում է այն դեպքերի համար, երբ դուք զգում եք անհավասար կլաստերային բեռնում: Դուք միաժամանակ հասնում եք գագաթնակետային բեռների համար ցանկալի կլաստերի կոնֆիգուրացիային և միևնույն ժամանակ չեք պահում այս կլաստերը թերբեռնվածության ժամանակ՝ խնայելով գումար: Դե, գումարած, այս ամենը տեղի է ունենում ինքնաբերաբար առանց ձեր մասնակցության: Ինքնին autoscaler-ը ոչ այլ ինչ է, քան կլաստերի կառավարչի API-ին և ամպային մատակարարի API-ին ուղղված հարցումների մի շարք, որոնք գրված են որոշակի տրամաբանությամբ: Այն, ինչ դուք անպայման պետք է հիշեք, հանգույցների բաժանումն է 3 տեսակի, ինչպես ավելի վաղ գրել էինք: Եվ դուք երջանիկ կլինեք:

Source: www.habr.com

Добавить комментарий