Кластерт зориулж өөрийн автомат масштабыг хэрхэн хийх вэ

Сайн уу? Бид хүмүүсийг том өгөгдөлтэй ажиллахад сургадаг. Том өгөгдлийн боловсролын хөтөлбөрийг бүх оролцогчид хамтран ажилладаг өөрийн кластергүйгээр төсөөлөхийн аргагүй юм. Ийм учраас манай хөтөлбөрт үргэлж байдаг 🙂 Бид түүний тохиргоо, тааруулах, удирдах ажил хийдэг бөгөөд залуус тэнд MapReduce ажлуудыг шууд эхлүүлж, Spark ашигладаг.

Энэ нийтлэлд бид үүлэн ашиглан өөрийн автомат масштаблагчийг бичих замаар жигд бус кластер ачаалах асуудлыг хэрхэн шийдсэнийг танд хэлэх болно. Mail.ru үүлэн шийдэл.

асуудал

Манай кластер ердийн горимд ашиглагддаггүй. Устгах нь маш жигд бус байдаг. Жишээ нь, 30-аад хүн, нэг багш бүгдээрээ кластерт очоод хэрэглэж эхлэх практик хичээл байдаг. Эсвэл дахин хэлэхэд ачаалал маш их нэмэгдэх хугацаа дуусахаас өмнө хэд хоног байна. Үлдсэн хугацаанд кластер ачаалал багатай горимд ажилладаг.

Шийдэл №1 бол хамгийн их ачааллыг тэсвэрлэх чадвартай кластерыг хадгалах явдал юм, гэхдээ үлдсэн хугацаанд сул зогсох болно.

Шийдэл №2 бол хичээл эхлэхээс өмнө болон ачаалал ихтэй үед гараар зангилаа нэмэх жижиг кластер байлгах явдал юм.

Шийдэл №3 нь жижиг кластер хадгалж, кластерын одоогийн ачааллыг хянаж, янз бүрийн API ашиглан кластерт зангилаа нэмж, устгах автомат масштабыг бичих явдал юм.

Энэ нийтлэлд бид №3 шийдлийн талаар ярих болно. Энэхүү автомат хэмжээс нь дотоод хүчин зүйлээс илүүтэй гадны хүчин зүйлээс ихээхэн хамааралтай байдаг бөгөөд үйлчилгээ үзүүлэгч нь ихэнхдээ үүнийг өгдөггүй. Бид Mail.ru Cloud Solutions үүлний дэд бүтцийг ашигладаг бөгөөд MCS API ашиглан автомат масштаблагчийг бичсэн. Бид өгөгдөлтэй хэрхэн ажиллахыг заадаг тул та өөрийн зорилгоор ижил төстэй автомат масштаблагчийг хэрхэн бичиж, клоуд дээрээ хэрхэн ашиглахыг харуулахаар шийдсэн.

Шаардлагатай зүйл

Эхлээд та Hadoop кластертай байх ёстой. Жишээлбэл, бид HDP түгээлтийг ашигладаг.

Зангилаануудаа хурдан нэмж, арилгахын тулд зангилааны хооронд үүргийн тодорхой хуваарилалт байх ёстой.

  1. Мастер зангилаа. Ялангуяа ямар нэг зүйлийг тайлбарлах шаардлагагүй: хэрэв та интерактив горимыг ашигладаг бол жишээ нь Spark драйвер ажиллуулдаг кластерын гол зангилаа.
  2. Огнооны зангилаа. Энэ бол HDFS дээр өгөгдөл хадгалах, тооцоолол хийх цэг юм.
  3. Тооцооллын зангилаа. Энэ бол HDFS дээр юу ч хадгалдаггүй, гэхдээ тооцоолол хийдэг зангилаа юм.

Чухал цэг. Гурав дахь төрлийн зангилааны улмаас автоматаар масштаблах болно. Хэрэв та хоёр дахь төрлийн зангилаа авч, нэмж эхэлбэл хариу өгөх хурд маш бага байх болно - ашиглалтаас хасах, дахин ажиллуулах нь таны кластер дээр хэдэн цаг зарцуулагдах болно. Энэ нь мэдээжийн хэрэг автоматаар масштаблахаас хүлээж байгаа зүйл биш юм. Өөрөөр хэлбэл, бид эхний болон хоёр дахь төрлийн зангилаанд хүрдэггүй. Эдгээр нь хөтөлбөрийн бүх хугацаанд оршин тогтнох боломжтой хамгийн бага кластерийг төлөөлөх болно.

Тиймээс манай автомат масштаблагч нь Python 3 дээр бичигдсэн бөгөөд кластерын үйлчилгээг удирдахад Ambari API ашигладаг, ашигладаг. Mail.ru Cloud Solutions-ийн API (MCS) машиныг асаах, зогсоох зориулалттай.

Шийдлийн архитектур

  1. Модуль autoscaler.py. Энэ нь гурван ангиас бүрдэнэ: 1) Ambari-тай ажиллах функцууд, 2) MCS-тэй ажиллах функцууд, 3) автомат масштабын логиктой шууд холбоотой функцууд.
  2. Скрипт observer.py. Үндсэндээ энэ нь өөр өөр дүрмээс бүрддэг: хэзээ, ямар мөчид автомат масштабын функцийг дуудах вэ.
  3. Тохиргооны файл config.py. Энэ нь жишээлбэл автоматаар масштаблахыг зөвшөөрсөн зангилааны жагсаалт болон шинэ зангилаа нэмэгдсэнээс хойш хэр удаан хүлээх зэрэгт нөлөөлдөг бусад параметрүүдийг агуулдаг. Хичээл эхлэхээс өмнө хамгийн их зөвшөөрөгдөх кластерийн тохиргоог эхлүүлэхийн тулд хичээл эхлэх цагийн тэмдэг байдаг.

Одоо эхний хоёр файл доторх кодын хэсгүүдийг харцгаая.

1. Autoscaler.py модуль

Амбари анги

Анги агуулсан кодын хэсэг ийм харагдаж байна Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Дээрхээс жишээ болгон та функцийн хэрэгжилтийг харж болно stop_all_services, энэ нь хүссэн кластерийн зангилаа дээрх бүх үйлчилгээг зогсооно.

Ангийн үүдэнд Ambari чи өнгөрч:

  • ambari_url, жишээ нь, гэх мэт 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - Амбари дахь кластерын нэр,
  • headers = {'X-Requested-By': 'ambari'}
  • болон дотор auth Энд таны Ambari-д нэвтрэх нэр, нууц үг байна: auth = ('login', 'password').

Энэ функц нь REST API-ээр дамжуулан Ambari руу хэд хэдэн дуудлага хийхээс өөр зүйл биш юм. Логик үүднээс авч үзвэл бид эхлээд зангилаа дээр ажиллаж байгаа үйлчилгээний жагсаалтыг хүлээн авч, дараа нь тухайн кластер, өгөгдсөн зангилаа дээрх үйлчилгээг жагсаалтаас муж руу шилжүүлэхийг хүснэ. INSTALLED. Бүх үйлчилгээг эхлүүлэх, зангилааг төлөв рүү шилжүүлэх функцууд Maintenance гэх мэт төстэй харагдаж байна - эдгээр нь API-ээр дамжуулан хэдхэн хүсэлт юм.

Ангийн Mcs

Анги агуулсан кодын хэсэг ийм харагдаж байна Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Ангийн үүдэнд Mcs Бид үүлэн доторх төслийн ID болон хэрэглэгчийн ID, мөн түүний нууц үгийг дамжуулдаг. Функцэд vm_turn_on Бид нэг машиныг асаахыг хүсч байна. Энд байгаа логик нь арай илүү төвөгтэй юм. Кодын эхэнд өөр гурван функц дуудагддаг: 1) бид токен авах хэрэгтэй, 2) бид хостын нэрийг MCS дахь машины нэр болгон хувиргах хэрэгтэй, 3) энэ машины id-г авна. Дараа нь бид зүгээр л шуудангийн хүсэлт гаргаж, энэ машиныг ажиллуулна.

Токен авах функц нь дараах байдалтай байна.

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler ангилал

Энэ анги нь үйлдлийн логиктой холбоотой функцуудыг агуулдаг.

Энэ ангийн кодын хэсэг дараах байдалтай байна.

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Бид элсэлтийн анги хүлээн авна. Ambari и Mcs, томруулахыг зөвшөөрсөн зангилааны жагсаалт, түүнчлэн зангилааны тохиргооны параметрүүд: YARN дахь зангилаа руу хуваарилагдсан санах ой ба cpu. Мөн q_ram, q_cpu гэсэн 2 дотоод параметр байдаг бөгөөд эдгээр нь дараалал юм. Тэдгээрийг ашигласнаар бид кластерын одоогийн ачааллын утгыг хадгалдаг. Хэрэв бид сүүлийн 5 минутын хугацаанд ачаалал байнга нэмэгдэж байгааг харвал бид кластерт +1 зангилаа нэмэх шаардлагатай гэж шийдэв. Кластерын дутуу ашиглалтын төлөвийн хувьд мөн адил юм.

Дээрх код нь машиныг кластераас салгаж, үүлэн дотор зогсоодог функцийн жишээ юм. Эхлээд татан буулгах ажил байна YARN Nodemanager, дараа нь горим асна Maintenance, дараа нь бид машин дээрх бүх үйлчилгээг зогсоож, үүлэн доторх виртуал машиныг унтраадаг.

2. Скрипт observer.py

Тэндээс жишээ код:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Үүн дээр бид кластерын хүчин чадлыг нэмэгдүүлэх нөхцөл бүрдсэн эсэх, нөөцөд байгаа машин байгаа эсэхийг шалгаж, тэдгээрийн аль нэгнийх нь хостын нэрийг авч, кластерт нэмж, манай багийн Slack дээр энэ тухай мессеж нийтлэх болно. Үүний дараа эхэлнэ cooldown_period, бид кластераас юу ч нэмэх эсвэл хасахгүй, харин ачааллыг хянахад л хангалттай. Хэрэв энэ нь тогтворжиж, ачааллын оновчтой утгын коридорт байгаа бол бид зүгээр л хяналтаа үргэлжлүүлнэ. Хэрэв нэг зангилаа хангалтгүй байсан бол бид өөр нэгийг нэмнэ.

Бидний өмнө хичээл байгаа тохиолдолд нэг зангилаа хангалтгүй гэдгийг бид аль хэдийн мэдэж байсан тул бид бүх чөлөөт зангилааг нэн даруй эхлүүлж, хичээл дуустал идэвхтэй байлгадаг. Энэ нь үйл ажиллагааны цагийн тэмдгийн жагсаалтыг ашиглан тохиолддог.

дүгнэлт

Autoscaler нь кластерийг жигд бус ачаалах тохиолдолд сайн бөгөөд тохиромжтой шийдэл юм. Та оргил ачааллын үед хүссэн кластерын тохиргоонд нэгэн зэрэг хүрч, ачаалал багатай үед энэ кластерыг хадгалахгүй, мөнгөө хэмнэдэг. Дээрээс нь энэ бүхэн таны оролцоогүйгээр автоматаар болдог. Автомашин тохируулагч нь өөрөө тодорхой логикийн дагуу бичигдсэн кластер менежерийн API болон үүлэн үйлчилгээ үзүүлэгч API-д өгөх хүсэлтүүдийн багцаас өөр зүйл биш юм. Таны санаж байх ёстой зүйл бол бидний өмнө бичсэнчлэн зангилааг 3 төрөлд хуваах явдал юм. Мөн та аз жаргалтай байх болно.

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх