چگونه خودکار مقیاس کننده خود را برای یک کلاستر بسازید

سلام! ما افراد را برای کار با داده های بزرگ آموزش می دهیم. تصور یک برنامه آموزشی روی داده های بزرگ بدون خوشه خاص خود غیرممکن است که همه شرکت کنندگان روی آن با هم کار کنند. به همین دلیل، برنامه ما همیشه آن را دارد 🙂 ما درگیر پیکربندی، تنظیم و مدیریت آن هستیم و بچه ها مستقیماً کارهای MapReduce را در آنجا راه اندازی می کنند و از Spark استفاده می کنند.

در این پست به شما خواهیم گفت که چگونه مشکل بارگیری ناهموار خوشه را با نوشتن خودکار مقیاس کننده خود با استفاده از ابر حل کردیم. Mail.ru Cloud Solutions.

مشکل

خوشه ما در حالت معمولی استفاده نمی شود. دفع بسیار ناهموار است. به عنوان مثال، کلاس های عملی وجود دارد که هر 30 نفر و یک معلم به کلاستر می روند و شروع به استفاده از آن می کنند. یا دوباره، روزهایی قبل از ضرب الاجل وجود دارد که بار به شدت افزایش می یابد. مابقی زمان خوشه در حالت زیر بار کار می کند.

راه حل شماره 1 این است که خوشه ای را نگه دارید که بارهای اوج را تحمل کند، اما بقیه زمان ها بیکار باشد.

راه حل شماره 2 نگه داشتن یک خوشه کوچک است که به صورت دستی قبل از کلاس ها و در زمان اوج بار، گره ها را به آن اضافه می کنید.

راه حل شماره 3 این است که یک خوشه کوچک نگه دارید و یک خودکار مقیاس کننده بنویسید که بار فعلی خوشه را نظارت می کند و با استفاده از API های مختلف، گره ها را از خوشه اضافه و حذف می کند.

در این پست در مورد راه حل شماره 3 صحبت خواهیم کرد. این خودکار مقیاس‌کننده به‌جای عوامل داخلی به شدت به عوامل خارجی وابسته است و ارائه‌دهندگان اغلب آن را ارائه نمی‌دهند. ما از زیرساخت ابری Mail.ru Cloud Solutions استفاده می کنیم و با استفاده از MCS API یک مقیاس خودکار نوشتیم. و از آنجایی که ما نحوه کار با داده ها را آموزش می دهیم، تصمیم گرفتیم نشان دهیم که چگونه می توانید مقیاس خودکار مشابهی را برای اهداف خود بنویسید و از آن در فضای ابری خود استفاده کنید.

پیش نیازها

ابتدا باید یک خوشه Hadoop داشته باشید. به عنوان مثال، ما از توزیع HDP استفاده می کنیم.

برای اینکه گره های شما به سرعت اضافه و حذف شوند، باید توزیع مشخصی از نقش ها در بین گره ها داشته باشید.

  1. گره اصلی. خوب، نیازی به توضیح خاصی نیست: گره اصلی خوشه، که برای مثال، درایور Spark روی آن راه اندازی می شود، اگر از حالت تعاملی استفاده می کنید.
  2. گره تاریخ. این گره ای است که داده ها را روی HDFS ذخیره می کنید و محاسبات در آن انجام می شود.
  3. گره محاسباتی این گره ای است که در آن چیزی در HDFS ذخیره نمی کنید، اما محاسبات در آن انجام می شود.

نکته مهم. مقیاس خودکار به دلیل گره های نوع سوم رخ می دهد. اگر شروع به گرفتن و اضافه کردن گره‌های نوع دوم کنید، سرعت پاسخ‌دهی بسیار کم خواهد بود - از کار انداختن و ارتکاب مجدد ساعت‌ها در خوشه شما طول می‌کشد. البته این چیزی نیست که شما از مقیاس خودکار انتظار دارید. یعنی گره های نوع اول و دوم را لمس نمی کنیم. آنها حداقل خوشه قابل دوام را نشان خواهند داد که در طول مدت برنامه وجود خواهد داشت.

بنابراین، مقیاس‌کننده خودکار ما در پایتون 3 نوشته شده است، از Ambari API برای مدیریت خدمات خوشه‌ای استفاده می‌کند. API از Mail.ru Cloud Solutions (MCS) برای راه اندازی و توقف ماشین آلات.

معماری راه حل

  1. مدول autoscaler.py. این شامل سه کلاس است: 1) توابع برای کار با Ambari، 2) توابع برای کار با MCS، 3) توابع مرتبط مستقیم با منطق خودکار مقیاس.
  2. اسکریپت observer.py. اساساً از قوانین مختلفی تشکیل شده است: چه زمانی و در چه لحظاتی باید توابع خودکار مقیاس کننده را فراخوانی کرد.
  3. فایل پیکربندی config.py. برای مثال، این شامل فهرستی از گره‌های مجاز برای مقیاس‌بندی خودکار و سایر پارامترهایی است که بر مدت زمان انتظار از لحظه اضافه شدن یک گره جدید تأثیر می‌گذارند. همچنین مهرهای زمانی برای شروع کلاس ها وجود دارد، به طوری که قبل از کلاس حداکثر پیکربندی کلاستر مجاز راه اندازی می شود.

بیایید اکنون به تکه های کد داخل دو فایل اول نگاه کنیم.

1. ماژول Autoscaler.py

کلاس عمبری

این همان چیزی است که یک قطعه کد حاوی یک کلاس به نظر می رسد Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

در بالا، به عنوان مثال، می توانید به اجرای تابع نگاه کنید stop_all_services، که تمام خدمات را در گره خوشه مورد نظر متوقف می کند.

در ورودی کلاس Ambari شما عبور می کنید:

  • ambari_urlبه عنوان مثال، مانند 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – نام خوشه شما در آمبری،
  • headers = {'X-Requested-By': 'ambari'}
  • و داخل auth در اینجا نام کاربری و رمز عبور شما برای آمبری آمده است: auth = ('login', 'password').

خود این تابع چیزی بیش از چند تماس از طریق REST API به Ambari نیست. از نقطه نظر منطقی، ما ابتدا لیستی از خدمات در حال اجرا را در یک گره دریافت می کنیم، و سپس در یک خوشه معین، در یک گره داده شده، می خواهیم خدمات را از لیست به وضعیت منتقل کنیم. INSTALLED. توابعی برای راه اندازی همه سرویس ها، برای انتقال گره ها به حالت Maintenance و غیره شبیه به هم هستند - آنها فقط چند درخواست از طریق API هستند.

کلاس مک

این همان چیزی است که یک قطعه کد حاوی یک کلاس به نظر می رسد Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

در ورودی کلاس Mcs شناسه پروژه را داخل ابر و شناسه کاربری و همچنین رمز عبور او را ارسال می کنیم. در عمل vm_turn_on می خواهیم یکی از ماشین ها را روشن کنیم. منطق اینجا کمی پیچیده تر است. در ابتدای کد، سه تابع دیگر نامیده می شود: 1) باید یک توکن دریافت کنیم، 2) باید نام میزبان را به نام ماشین در MCS تبدیل کنیم، 3) شناسه این ماشین را دریافت می کنیم. در مرحله بعد، ما به سادگی یک درخواست پست می کنیم و این دستگاه را راه اندازی می کنیم.

تابع بدست آوردن توکن به این صورت است:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

کلاس Autoscaler

این کلاس حاوی توابع مربوط به خود منطق عملیاتی است.

یک قطعه کد برای این کلاس به این صورت است:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

ما کلاس ها را برای ورود می پذیریم. Ambari и Mcs، لیستی از گره هایی که برای مقیاس بندی مجاز هستند، و همچنین پارامترهای پیکربندی گره: حافظه و پردازنده مرکزی اختصاص داده شده به گره در YARN. همچنین 2 پارامتر داخلی q_ram, q_cpu وجود دارد که به صورت صف هستند. با استفاده از آنها، مقادیر بار کلاستر فعلی را ذخیره می کنیم. اگر ببینیم که در طول 5 دقیقه گذشته به طور مداوم بار افزایش یافته است، تصمیم می گیریم که باید گره +1 را به خوشه اضافه کنیم. همین امر در مورد حالت عدم استفاده از خوشه نیز صادق است.

کد بالا نمونه ای از تابعی است که ماشینی را از خوشه حذف می کند و آن را در فضای ابری متوقف می کند. اول از کار افتادن است YARN Nodemanager، سپس حالت روشن می شود Maintenance، سپس تمام سرویس های موجود در دستگاه را متوقف می کنیم و ماشین مجازی را در فضای ابری خاموش می کنیم.

2. اسکریپت observer.py

نمونه کد از آنجا:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

در آن بررسی می‌کنیم که آیا شرایطی برای افزایش ظرفیت کلاستر ایجاد شده است و آیا ماشین‌هایی در رزرو وجود دارد یا خیر، نام میزبان یکی از آن‌ها را دریافت می‌کنیم، آن را به خوشه اضافه می‌کنیم و پیامی در مورد آن در Slack تیم خود منتشر می‌کنیم. پس از آن شروع می شود cooldown_period، زمانی که چیزی از خوشه اضافه یا حذف نمی کنیم، بلکه به سادگی بارگذاری را نظارت می کنیم. اگر تثبیت شده باشد و در دالان مقادیر بهینه بار باشد، به سادگی به نظارت ادامه می دهیم. اگر یک گره کافی نبود، یکی دیگر را اضافه می کنیم.

برای مواردی که درسی در پیش داریم، از قبل مطمئن هستیم که یک گره کافی نخواهد بود، بنابراین بلافاصله تمام گره های آزاد را راه اندازی می کنیم و آنها را تا پایان درس فعال نگه می داریم. این با استفاده از فهرستی از مُهرهای فعالیت اتفاق می‌افتد.

نتیجه

Autoscaler یک راه حل خوب و راحت برای مواردی است که بارگذاری ناهموار خوشه ای را تجربه می کنید. شما به طور همزمان به پیکربندی خوشه دلخواه برای بارهای پیک دست می‌یابید و در عین حال این خوشه را در زمان کم‌بار نگه‌داری نمی‌کنید و در هزینه صرفه‌جویی می‌کنید. خوب، به علاوه این همه به طور خودکار بدون مشارکت شما اتفاق می افتد. خود مقیاس‌کننده خودکار چیزی بیش از مجموعه‌ای از درخواست‌ها به API مدیر خوشه و API ارائه‌دهنده ابری نیست که بر اساس یک منطق خاص نوشته شده‌اند. آنچه شما قطعاً باید به خاطر بسپارید، تقسیم گره ها به 3 نوع است، همانطور که قبلاً نوشتیم. و شما خوشحال خواهید شد.

منبع: www.habr.com

اضافه کردن نظر