วิธีสร้างตัวปรับขนาดอัตโนมัติของคุณเองสำหรับคลัสเตอร์

สวัสดี! เราฝึกอบรมผู้คนให้ทำงานกับข้อมูลขนาดใหญ่ เป็นไปไม่ได้เลยที่จะจินตนาการถึงโปรแกรมการศึกษาเกี่ยวกับข้อมูลขนาดใหญ่โดยไม่มีคลัสเตอร์ของตัวเอง ซึ่งผู้เข้าร่วมทั้งหมดทำงานร่วมกัน ด้วยเหตุนี้ โปรแกรมของเราจึงมีสิ่งนี้อยู่เสมอ 🙂 เรามีส่วนร่วมในการกำหนดค่า การปรับแต่ง และการดูแลระบบ และคนเหล่านั้นก็เปิดตัวงาน MapReduce โดยตรงที่นั่นและใช้ Spark

ในโพสต์นี้ เราจะบอกคุณว่าเราแก้ไขปัญหาการโหลดคลัสเตอร์ที่ไม่สม่ำเสมอโดยการเขียนตัวปรับขนาดอัตโนมัติของเราเองโดยใช้ระบบคลาวด์ได้อย่างไร Mail.ru โซลูชั่นคลาวด์.

ปัญหา

คลัสเตอร์ของเราไม่ได้ใช้ในโหมดทั่วไป การกำจัดมีความไม่สม่ำเสมออย่างมาก ตัวอย่างเช่น มีชั้นเรียนภาคปฏิบัติ เมื่อทั้ง 30 คนและครูหนึ่งคนไปที่คลัสเตอร์และเริ่มใช้งาน หรืออีกครั้ง มีหลายวันก่อนถึงเส้นตายเมื่อปริมาณงานเพิ่มขึ้นอย่างมาก เวลาที่เหลือคลัสเตอร์ทำงานในโหมดอันเดอร์โหลด

โซลูชัน #1 คือการรักษาคลัสเตอร์ที่จะทนทานต่อการโหลดสูงสุด แต่จะไม่ได้ใช้งานตลอดเวลาที่เหลือ

โซลูชัน #2 คือการเก็บคลัสเตอร์ขนาดเล็กไว้ ซึ่งคุณเพิ่มโหนดด้วยตนเองก่อนคลาสและระหว่างโหลดสูงสุด

โซลูชัน #3 คือการเก็บคลัสเตอร์ขนาดเล็กไว้และเขียนตัวปรับขนาดอัตโนมัติที่จะตรวจสอบโหลดปัจจุบันของคลัสเตอร์ และใช้ API ต่างๆ ในการเพิ่มและลบโหนดออกจากคลัสเตอร์

ในโพสต์นี้ เราจะพูดถึงโซลูชัน #3 ตัวปรับขนาดอัตโนมัตินี้ขึ้นอยู่กับปัจจัยภายนอกมากกว่าปัจจัยภายใน และผู้ให้บริการมักไม่ได้จัดเตรียมไว้ให้ เราใช้โครงสร้างพื้นฐานคลาวด์ Mail.ru Cloud Solutions และเขียนตัวปรับขนาดอัตโนมัติโดยใช้ MCS API และเนื่องจากเราสอนวิธีทำงานกับข้อมูล เราจึงตัดสินใจที่จะแสดงให้เห็นว่าคุณสามารถเขียนตัวปรับขนาดอัตโนมัติที่คล้ายกันเพื่อวัตถุประสงค์ของคุณเองและใช้กับระบบคลาวด์ของคุณได้อย่างไร

เบื้องต้น

ขั้นแรก คุณต้องมีคลัสเตอร์ Hadoop ตัวอย่างเช่น เราใช้การกระจาย HDP

เพื่อให้โหนดของคุณถูกเพิ่มและลบอย่างรวดเร็ว คุณต้องมีการกระจายบทบาทบางอย่างระหว่างโหนดต่างๆ

  1. โหนดหลัก ไม่จำเป็นต้องอธิบายอะไรเป็นพิเศษ: โหนดหลักของคลัสเตอร์ซึ่งตัวอย่างเช่นไดรเวอร์ Spark จะถูกเปิดใช้งานหากคุณใช้โหมดโต้ตอบ
  2. โหนดวันที่ นี่คือโหนดที่คุณจัดเก็บข้อมูลบน HDFS และที่ที่ใช้คำนวณ
  3. โหนดคอมพิวเตอร์ นี่คือโหนดที่คุณไม่ได้จัดเก็บสิ่งใดบน HDFS แต่เป็นที่ที่มีการคำนวณ

จุดสำคัญ. การปรับขนาดอัตโนมัติจะเกิดขึ้นเนื่องจากโหนดประเภทที่สาม หากคุณเริ่มรับและเพิ่มโหนดประเภทที่สอง ความเร็วในการตอบสนองจะต่ำมาก การเลิกใช้งานและการคอมมิตใหม่จะใช้เวลาหลายชั่วโมงบนคลัสเตอร์ของคุณ แน่นอนว่านี่ไม่ใช่สิ่งที่คุณคาดหวังจากการปรับขนาดอัตโนมัติ นั่นคือเราไม่ได้สัมผัสโหนดประเภทที่หนึ่งและสอง พวกเขาจะเป็นตัวแทนของคลัสเตอร์ที่มีชีวิตขั้นต่ำที่จะมีอยู่ตลอดระยะเวลาของโปรแกรม

ดังนั้น ตัวปรับขนาดอัตโนมัติของเราจึงเขียนด้วย Python 3 ใช้ Ambari API เพื่อจัดการบริการคลัสเตอร์ การใช้งาน API จาก Mail.ru Cloud Solutions (MCS) สำหรับการสตาร์ทและหยุดเครื่องจักร

สถาปัตยกรรมโซลูชัน

  1. โมดึล autoscaler.py. ประกอบด้วยสามคลาส: 1) ฟังก์ชันสำหรับการทำงานกับ Ambari 2) ฟังก์ชันสำหรับการทำงานกับ MCS 3) ฟังก์ชันที่เกี่ยวข้องโดยตรงกับลอจิกของตัวปรับขนาดอัตโนมัติ
  2. สคริปต์ observer.py. โดยพื้นฐานแล้วประกอบด้วยกฎที่แตกต่างกัน: เมื่อใดและเมื่อไรที่จะเรียกใช้ฟังก์ชันตัวปรับขนาดอัตโนมัติ
  3. ไฟล์การกำหนดค่า config.py. ซึ่งประกอบด้วยรายการโหนดที่อนุญาตสำหรับการปรับขนาดอัตโนมัติและพารามิเตอร์อื่นๆ ที่ส่งผลต่อ เช่น ระยะเวลาในการรอนับจากเวลาที่เพิ่มโหนดใหม่ เป็นต้น นอกจากนี้ยังมีการประทับเวลาสำหรับการเริ่มคลาส ดังนั้นก่อนคลาส การกำหนดค่าคลัสเตอร์สูงสุดที่อนุญาตจะถูกเปิดใช้งาน

ตอนนี้เรามาดูส่วนของโค้ดภายในสองไฟล์แรกกัน

1. โมดูล Autoscaler.py

คลาสอัมบารี

นี่คือลักษณะของโค้ดที่มีคลาส Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

ข้างต้นเป็นตัวอย่าง คุณสามารถดูการใช้งานฟังก์ชันได้ stop_all_servicesซึ่งจะหยุดบริการทั้งหมดบนโหนดคลัสเตอร์ที่ต้องการ

ที่ทางเข้าชั้นเรียน Ambari คุณผ่าน:

  • ambari_urlเช่น เช่น 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – ชื่อคลัสเตอร์ของคุณใน Ambari
  • headers = {'X-Requested-By': 'ambari'}
  • และข้างใน auth นี่คือข้อมูลเข้าสู่ระบบและรหัสผ่านของคุณสำหรับ Ambari: auth = ('login', 'password').

ฟังก์ชั่นนี้ไม่มีอะไรมากไปกว่าการโทรสองครั้งผ่าน REST API ไปยัง Ambari จากมุมมองเชิงตรรกะ อันดับแรกเราได้รับรายการบริการที่ทำงานอยู่บนโหนด จากนั้นจึงขอให้คลัสเตอร์ที่กำหนดบนโหนดที่กำหนด เพื่อถ่ายโอนบริการจากรายการไปยังสถานะ INSTALLED. ฟังก์ชั่นสำหรับการเปิดตัวบริการทั้งหมด สำหรับการถ่ายโอนโหนดสู่สถานะ Maintenance ฯลฯ ดูคล้ายกัน - เป็นเพียงคำขอบางส่วนผ่าน API

คลาสแมค

นี่คือลักษณะของโค้ดที่มีคลาส Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

ที่ทางเข้าชั้นเรียน Mcs เราส่งรหัสโครงการภายในคลาวด์และรหัสผู้ใช้ตลอดจนรหัสผ่านของเขา ในการทำงาน vm_turn_on เราต้องการเปิดเครื่องเครื่องใดเครื่องหนึ่ง ตรรกะที่นี่ซับซ้อนกว่าเล็กน้อย ที่ตอนต้นของโค้ด มีการเรียกฟังก์ชันอื่นอีกสามฟังก์ชัน: 1) เราจำเป็นต้องได้รับโทเค็น 2) เราจำเป็นต้องแปลงชื่อโฮสต์เป็นชื่อของเครื่องใน MCS 3) รับ id ของเครื่องนี้ ต่อไป เราเพียงแค่ส่งคำขอโพสต์และเปิดตัวเครื่องนี้

นี่คือลักษณะของฟังก์ชันในการรับโทเค็น:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

คลาสตัวปรับขนาดอัตโนมัติ

คลาสนี้มีฟังก์ชันที่เกี่ยวข้องกับตรรกะการดำเนินงานนั่นเอง

นี่คือลักษณะของโค้ดสำหรับคลาสนี้:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

เรารับเข้าเรียน Ambari и Mcsรายการโหนดที่ได้รับอนุญาตให้ปรับขนาด รวมถึงพารามิเตอร์การกำหนดค่าโหนด: หน่วยความจำและ cpu ที่จัดสรรให้กับโหนดใน YARN นอกจากนี้ยังมีพารามิเตอร์ภายใน 2 ตัว q_ram, q_cpu ซึ่งเป็นคิว เมื่อใช้สิ่งเหล่านี้เราจะจัดเก็บค่าของโหลดคลัสเตอร์ปัจจุบัน หากเราเห็นว่าในช่วง 5 นาทีที่ผ่านมามีการโหลดเพิ่มขึ้นอย่างต่อเนื่อง เราก็ตัดสินใจว่าจะต้องเพิ่มโหนด +1 ให้กับคลัสเตอร์ เช่นเดียวกับสถานะการใช้งานคลัสเตอร์น้อยเกินไป

โค้ดด้านบนเป็นตัวอย่างของฟังก์ชันที่จะลบเครื่องออกจากคลัสเตอร์และหยุดเครื่องในระบบคลาวด์ ขั้นแรกจะมีการรื้อถอน YARN Nodemanagerจากนั้นโหมดจะเปิดขึ้น Maintenanceจากนั้นเราจะหยุดบริการทั้งหมดบนเครื่องและปิดเครื่องเสมือนในระบบคลาวด์

2. สคริปต์ Observer.py

โค้ดตัวอย่างจากที่นั่น:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

ในนั้น เราจะตรวจสอบว่ามีการสร้างเงื่อนไขเพื่อเพิ่มความจุของคลัสเตอร์หรือไม่ และมีเครื่องใด ๆ สำรองหรือไม่ รับชื่อโฮสต์ของหนึ่งในนั้น เพิ่มลงในคลัสเตอร์ และเผยแพร่ข้อความเกี่ยวกับเครื่องนั้นใน Slack ของทีมของเรา หลังจากนั้นจะเริ่ม cooldown_periodเมื่อเราไม่เพิ่มหรือลบสิ่งใดออกจากคลัสเตอร์ แต่เพียงตรวจสอบโหลด หากมีความเสถียรและอยู่ภายในทางเดินของค่าโหลดที่เหมาะสมที่สุด เราก็จะตรวจสอบต่อไป หากโหนดหนึ่งไม่เพียงพอ เราจะเพิ่มอีกโหนดหนึ่ง

สำหรับกรณีที่เรามีบทเรียนข้างหน้า เรารู้อยู่แล้วว่าโหนดเดียวจะไม่เพียงพอ ดังนั้นเราจึงเริ่มโหนดฟรีทั้งหมดทันทีและปล่อยให้โหนดเหล่านั้นใช้งานได้จนจบบทเรียน สิ่งนี้เกิดขึ้นโดยใช้รายการการประทับเวลาของกิจกรรม

ข้อสรุป

ตัวปรับขนาดอัตโนมัติเป็นโซลูชันที่ดีและสะดวกสำหรับกรณีเหล่านั้นเมื่อคุณพบปัญหาการโหลดคลัสเตอร์ที่ไม่สม่ำเสมอ คุณได้รับการกำหนดค่าคลัสเตอร์ที่ต้องการสำหรับการโหลดสูงสุดไปพร้อมๆ กัน และในขณะเดียวกันก็ไม่ต้องเก็บคลัสเตอร์นี้ไว้ในระหว่างการโหลดน้อยเกินไป ซึ่งช่วยประหยัดเงินได้ นอกจากนี้ทั้งหมดนี้จะเกิดขึ้นโดยอัตโนมัติโดยที่คุณไม่ต้องมีส่วนร่วม ตัวปรับขนาดอัตโนมัตินั้นไม่มีอะไรมากไปกว่าชุดคำขอไปยัง API ตัวจัดการคลัสเตอร์และ API ของผู้ให้บริการคลาวด์ ซึ่งเขียนตามตรรกะบางอย่าง สิ่งที่คุณต้องจำไว้อย่างแน่นอนคือการแบ่งโหนดออกเป็น 3 ประเภทดังที่เราเขียนไว้ก่อนหน้านี้ และคุณจะมีความสุข

ที่มา: will.com

เพิ่มความคิดเห็น