كيفية صنع المقياس التلقائي الخاص بك للمجموعة

مرحبًا! نحن ندرب الناس على العمل مع البيانات الضخمة. من المستحيل تخيل برنامج تعليمي حول البيانات الضخمة بدون مجموعته الخاصة، والتي يعمل عليها جميع المشاركين معًا. لهذا السبب، فإن برنامجنا لديه دائمًا 🙂 نحن منخرطون في تكوينه وضبطه وإدارته، ويقوم الرجال مباشرة بتشغيل وظائف MapReduce هناك ويستخدمون Spark.

سنخبرك في هذا المنشور كيف قمنا بحل مشكلة التحميل غير المتكافئ للمجموعة عن طريق كتابة مقياس تلقائي خاص بنا باستخدام السحابة حلول سحابة Mail.ru.

مشكلة

لا يتم استخدام مجموعتنا في الوضع النموذجي. التخلص غير متساو للغاية. على سبيل المثال، هناك فصول عملية، حيث يذهب جميع الأشخاص الثلاثين والمعلم إلى المجموعة ويبدأون في استخدامها. أو مرة أخرى، هناك أيام قبل الموعد النهائي يزيد فيها الحمل بشكل كبير. بقية الوقت تعمل المجموعة في وضع التحميل الزائد.

الحل رقم 1 هو الحفاظ على مجموعة قادرة على تحمل الأحمال القصوى، ولكنها ستكون خاملة بقية الوقت.

الحل رقم 2 هو الاحتفاظ بمجموعة صغيرة، والتي يمكنك إضافة العقد إليها يدويًا قبل الفئات وأثناء ذروة التحميل.

الحل رقم 3 هو الاحتفاظ بمجموعة صغيرة وكتابة مقياس تلقائي يراقب الحمل الحالي للمجموعة، وباستخدام واجهات برمجة التطبيقات المختلفة، يمكنك إضافة العقد وإزالتها من المجموعة.

في هذه التدوينة سنتحدث عن الحل رقم 3. يعتمد المقياس التلقائي هذا بشكل كبير على العوامل الخارجية بدلاً من العوامل الداخلية، وغالبًا لا يوفره مقدمو الخدمة. نحن نستخدم البنية التحتية السحابية لـ Mail.ru Cloud Solutions وقمنا بكتابة مقياس تلقائي باستخدام MCS API. وبما أننا نعلم كيفية التعامل مع البيانات، فقد قررنا أن نوضح كيف يمكنك كتابة مقياس تلقائي مماثل لأغراضك الخاصة واستخدامه مع السحابة الخاصة بك

المتطلبات الأساسية المسبقة

أولا، يجب أن يكون لديك مجموعة Hadoop. على سبيل المثال، نستخدم توزيع HDP.

لكي تتم إضافة العقد الخاصة بك وإزالتها بسرعة، يجب أن يكون لديك توزيع معين للأدوار بين العقد.

  1. العقدة الرئيسية. حسنًا، ليست هناك حاجة لشرح أي شيء على وجه الخصوص: العقدة الرئيسية للمجموعة، والتي، على سبيل المثال، يتم تشغيل برنامج تشغيل Spark، إذا كنت تستخدم الوضع التفاعلي.
  2. عقدة التاريخ. هذه هي العقدة التي تقوم بتخزين البيانات عليها على HDFS والتي تتم فيها الحسابات.
  3. عقدة الحوسبة. هذه هي العقدة التي لا تقوم فيها بتخزين أي شيء على HDFS، ولكن تتم فيها العمليات الحسابية.

نقطة مهمة. سيحدث القياس التلقائي بسبب العقد من النوع الثالث. إذا بدأت في أخذ العقد من النوع الثاني وإضافتها، فستكون سرعة الاستجابة منخفضة جدًا - وسيستغرق إيقاف التشغيل وإعادة الالتزام ساعات على مجموعتك. وهذا، بالطبع، ليس ما تتوقعه من القياس التلقائي. أي أننا لا نلمس العقد من النوعين الأول والثاني. وسوف تمثل الحد الأدنى من المجموعة القابلة للحياة والتي ستكون موجودة طوال مدة البرنامج.

لذا، فإن المقياس التلقائي الخاص بنا مكتوب بلغة Python 3، ويستخدم واجهة برمجة تطبيقات Ambari لإدارة خدمات المجموعة واستخداماتها واجهة برمجة التطبيقات من Mail.ru Cloud Solutions (MCS) لتشغيل وإيقاف الآلات.

هندسة الحل

  1. وحدة autoscaler.py. يحتوي على ثلاث فئات: 1) وظائف للعمل مع Ambari، 2) وظائف للعمل مع MCS، 3) وظائف مرتبطة مباشرة بمنطق المقياس التلقائي.
  2. سيناريو observer.py. يتكون بشكل أساسي من قواعد مختلفة: متى وفي أي لحظات يتم استدعاء وظائف المقياس التلقائي.
  3. ملف الضبط config.py. فهو يحتوي، على سبيل المثال، على قائمة العقد المسموح بها للقياس التلقائي والمعلمات الأخرى التي تؤثر، على سبيل المثال، على مدة الانتظار من لحظة إضافة عقدة جديدة. هناك أيضًا طوابع زمنية لبدء الفصول الدراسية، بحيث يتم تشغيل الحد الأقصى المسموح به لتكوين المجموعة قبل الفصل الدراسي.

دعونا الآن نلقي نظرة على أجزاء التعليمات البرمجية الموجودة داخل الملفين الأولين.

1. وحدة Autoscaler.py

فئة أمباري

هذا ما يبدو عليه جزء من التعليمات البرمجية التي تحتوي على فئة Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

أعلاه، على سبيل المثال، يمكنك إلقاء نظرة على تنفيذ الوظيفة stop_all_services، مما يؤدي إلى إيقاف كافة الخدمات على عقدة الكتلة المطلوبة.

عند مدخل الفصل Ambari تمر:

  • ambari_url، على سبيل المثال، مثل 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – اسم مجموعتك في أمباري،
  • headers = {'X-Requested-By': 'ambari'}
  • داخليا ايضا auth هنا هو تسجيل الدخول وكلمة المرور الخاصة بك لـ Ambari: auth = ('login', 'password').

الوظيفة نفسها ليست أكثر من مجرد مكالمتين عبر REST API إلى Ambari. من وجهة نظر منطقية، نتلقى أولاً قائمة بالخدمات قيد التشغيل على عقدة، ثم نطلب من مجموعة معينة، على عقدة معينة، نقل الخدمات من القائمة إلى الحالة INSTALLED. وظائف لإطلاق كافة الخدمات، لنقل العقد إلى الحالة Maintenance وما إلى ذلك تبدو متشابهة - فهي مجرد عدد قليل من الطلبات من خلال واجهة برمجة التطبيقات.

فئة مولودية

هذا ما يبدو عليه جزء من التعليمات البرمجية التي تحتوي على فئة Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

عند مدخل الفصل Mcs نقوم بتمرير معرف المشروع داخل السحابة ومعرف المستخدم وكذلك كلمة المرور الخاصة به. فى مهمة vm_turn_on نريد تشغيل إحدى الآلات. المنطق هنا أكثر تعقيدًا بعض الشيء. في بداية الكود، يتم استدعاء ثلاث وظائف أخرى: 1) نحتاج إلى الحصول على رمز مميز، 2) نحتاج إلى تحويل اسم المضيف إلى اسم الجهاز في MCS، 3) الحصول على معرف هذا الجهاز. بعد ذلك، نقوم ببساطة بتقديم طلب نشر وتشغيل هذا الجهاز.

هذا ما تبدو عليه وظيفة الحصول على الرمز المميز:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

فئة المقياس التلقائي

تحتوي هذه الفئة على وظائف تتعلق بمنطق التشغيل نفسه.

هذا ما يبدو عليه جزء من التعليمات البرمجية لهذه الفئة:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

نحن نقبل الطبقات للدخول. Ambari и Mcs، قائمة العقد المسموح لها بالتوسع، بالإضافة إلى معلمات تكوين العقدة: الذاكرة ووحدة المعالجة المركزية المخصصة للعقدة في YARN. هناك أيضًا معلمتان داخليتان q_ram و q_cpu وهما عبارة عن قوائم انتظار. باستخدامها، نقوم بتخزين قيم تحميل الكتلة الحالي. إذا رأينا أنه خلال الدقائق الخمس الأخيرة كان هناك حمل متزايد باستمرار، فإننا نقرر أننا بحاجة إلى إضافة عقدة +2 إلى المجموعة. وينطبق الشيء نفسه على حالة نقص استخدام الكتلة.

يعد الكود أعلاه مثالاً على وظيفة تقوم بإزالة جهاز من المجموعة وإيقافه في السحابة. أولا هناك وقف التشغيل YARN Nodemanager، ثم يتم تشغيل الوضع Maintenance، ثم نقوم بإيقاف جميع الخدمات على الجهاز وإيقاف تشغيل الجهاز الظاهري في السحابة.

2. البرنامج النصي Observer.py

نموذج التعليمات البرمجية من هناك:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

في ذلك، نتحقق مما إذا تم تهيئة الظروف لزيادة سعة المجموعة وما إذا كانت هناك أي أجهزة احتياطية، ونحصل على اسم المضيف لأحدها، ونضيفه إلى المجموعة وننشر رسالة عنها على Slack الخاص بفريقنا. وبعد ذلك يبدأ cooldown_period، عندما لا نضيف أو نزيل أي شيء من المجموعة، ولكن ببساطة نراقب الحمل. إذا كان قد استقر وكان ضمن ممر قيم التحميل الأمثل، فإننا ببساطة نواصل المراقبة. إذا لم تكن عقدة واحدة كافية، فإننا نضيف عقدة أخرى.

بالنسبة للحالات التي يكون أمامنا درس فيها، فنحن نعلم بالفعل على وجه اليقين أن عقدة واحدة لن تكون كافية، لذلك نبدأ على الفور جميع العقد المجانية ونبقيها نشطة حتى نهاية الدرس. يحدث هذا باستخدام قائمة الطوابع الزمنية للنشاط.

اختتام

يعد Autoscaler حلاً جيدًا ومريحًا لتلك الحالات التي تواجه فيها تحميلًا غير متساوٍ للكتلة. يمكنك في نفس الوقت تحقيق تكوين المجموعة المطلوب للأحمال القصوى وفي نفس الوقت لا تحتفظ بهذه المجموعة أثناء التحميل المنخفض، مما يوفر المال. حسنًا، بالإضافة إلى أن كل هذا يحدث تلقائيًا دون مشاركتك. إن المقياس التلقائي نفسه ليس أكثر من مجموعة من الطلبات الموجهة إلى واجهة برمجة تطبيقات مدير المجموعة وواجهة برمجة تطبيقات موفر السحابة، والتي تمت كتابتها وفقًا لمنطق معين. ما عليك بالتأكيد أن تتذكره هو تقسيم العقد إلى 3 أنواع، كما كتبنا سابقًا. وسوف تكون سعيدا.

المصدر: www.habr.com

إضافة تعليق