איך להכין אוטוscaler משלך לאשכול

שלום! אנו מאמנים אנשים לעבוד עם ביג דאטה. אי אפשר לדמיין תוכנית חינוכית בנושא ביג דאטה בלי אשכול משלה, שעליו עובדים יחד כל המשתתפים. מסיבה זו, לתוכנית שלנו יש את זה תמיד 🙂 אנחנו עוסקים בתצורה, כוונון וניהול שלה, והחבר'ה משיקים שם ישירות משרות MapReduce ומשתמשים ב-Spark.

בפוסט זה נספר לכם כיצד פתרנו את בעיית טעינת האשכולות הלא אחידה על ידי כתיבת ה-autoscaler שלנו באמצעות הענן פתרונות ענן של Mail.ru.

בעיה

האשכול שלנו אינו בשימוש במצב טיפוסי. הסילוק הוא מאוד לא אחיד. למשל, יש שיעורים מעשיים, כאשר כל 30 האנשים ומורה הולכים לאשכול ומתחילים להשתמש בו. או שוב, יש ימים לפני המועד שבו העומס גדל מאוד. בשאר הזמן האשכול פועל במצב תת עומס.

פתרון מס' 1 הוא לשמור על אשכול שיעמוד בעומסי שיא, אבל יהיה פעיל בשאר הזמן.

פתרון מס' 2 הוא לשמור על אשכול קטן, אליו אתה מוסיף ידנית צמתים לפני שיעורים ובמהלך עומסי שיא.

פתרון מס' 3 הוא לשמור על אשכול קטן ולכתוב אוטוסקאלר שינטר את העומס הנוכחי של האשכול, ובאמצעות ממשקי API שונים, להוסיף ולהסיר צמתים מהאשכול.

בפוסט הזה נדבר על פתרון מס' 3. ה-autoscaler הזה תלוי מאוד בגורמים חיצוניים ולא פנימיים, ולעיתים ספקים לא מספקים אותו. אנו משתמשים בתשתית הענן של Mail.ru Cloud Solutions וכתבנו גודל אוטומטי באמצעות ה-API של MCS. ומכיוון שאנחנו מלמדים איך לעבוד עם נתונים, החלטנו להראות איך אתה יכול לכתוב אוטוסקילר דומה למטרות שלך ולהשתמש בו עם הענן שלך

תנאים מוקדמים

ראשית, עליך להיות בעל אשכול Hadoop. לדוגמה, אנו משתמשים בהפצת HDP.

על מנת שהצמתים שלך יוסיפו ויוסרו במהירות, חייבת להיות לך חלוקה מסוימת של תפקידים בין הצמתים.

  1. צומת מאסטר. ובכן, אין צורך להסביר שום דבר במיוחד: הצומת הראשי של האשכול, שבו, למשל, מופעל מנהל ההתקן Spark, אם אתה משתמש במצב האינטראקטיבי.
  2. צומת תאריך. זהו הצומת שבו אתה מאחסן נתונים ב-HDFS ושם מתבצעים חישובים.
  3. צומת מחשוב. זהו צומת שבו אתה לא מאחסן שום דבר ב-HDFS, אלא שבו מתרחשים חישובים.

נקודה חשובה. קנה מידה אוטומטי יתרחש עקב צמתים מהסוג השלישי. אם תתחיל לקחת ולהוסיף צמתים מהסוג השני, מהירות התגובה תהיה נמוכה מאוד - ביטול וביצוע מחדש ייקח שעות על האשכול שלך. זה, כמובן, לא מה שאתה מצפה משינוי קנה מידה אוטומטי. כלומר, אנחנו לא נוגעים בצמתים מהסוג הראשון והשני. הם ייצגו אשכול מינימלי בר-קיימא שיתקיים לאורך כל משך התוכנית.

אז, ה-autoscaler שלנו כתוב ב-Python 3, משתמש ב-Ambari API לניהול שירותי אשכולות, משתמש API מ-Mail.ru Cloud Solutions (MCS) להפעלה ועצירה של מכונות.

ארכיטקטורת פתרונות

  1. Модуль autoscaler.py. הוא מכיל שלוש מחלקות: 1) פונקציות לעבודה עם Ambari, 2) פונקציות לעבודה עם MCS, 3) פונקציות הקשורות ישירות ללוגיקה של ה-autoscaler.
  2. תַסרִיט observer.py. בעיקרו של דבר זה מורכב מחוקים שונים: מתי ובאילו רגעים לקרוא לפונקציות האוטוסקלר.
  3. קובץ תצורה config.py. הוא מכיל, למשל, רשימה של צמתים המותרים לשינוי קנה מידה אוטומטי ופרמטרים נוספים המשפיעים, למשל, על כמה זמן לחכות מרגע הוספת צומת חדש. יש גם חותמות זמן לתחילת השיעורים, כך שלפני המחלקה מופעלת תצורת האשכול המקסימלית המותרת.

הבה נסתכל כעת על פיסות הקוד בתוך שני הקבצים הראשונים.

1. מודול Autoscaler.py

כיתת אמברי

כך נראית קטע קוד שמכיל מחלקה Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

למעלה, כדוגמה, אתה יכול להסתכל על יישום הפונקציה stop_all_services, שעוצר את כל השירותים בצומת האשכול הרצוי.

בכניסה לכיתה Ambari עברת:

  • ambari_url, למשל, כמו 'http://localhost:8080/api/v1/clusters/',
  • cluster_name - שם האשכול שלך באמברי,
  • headers = {'X-Requested-By': 'ambari'}
  • ובפנים auth הנה פרטי הכניסה והסיסמה שלך עבור Ambari: auth = ('login', 'password').

הפונקציה עצמה היא לא יותר מכמה שיחות דרך REST API לאמבארי. מנקודת מבט הגיונית, אנו מקבלים תחילה רשימה של שירותים פועלים בצומת, ולאחר מכן מבקשים באשכול נתון, בצומת נתון, להעביר שירותים מהרשימה למדינה INSTALLED. פונקציות להפעלת כל השירותים, להעברת צמתים למצב Maintenance וכו' נראים דומים - אלו רק כמה בקשות דרך ה-API.

מחלקה מק'

כך נראית קטע קוד שמכיל מחלקה Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

בכניסה לכיתה Mcs אנחנו מעבירים את מזהה הפרויקט בתוך הענן ואת מזהה המשתמש, כמו גם את הסיסמה שלו. בתפקוד vm_turn_on אנחנו רוצים להפעיל את אחת המכונות. ההיגיון כאן קצת יותר מסובך. בתחילת הקוד נקראות שלוש פונקציות נוספות: 1) אנחנו צריכים לקבל אסימון, 2) אנחנו צריכים להמיר את שם המארח לשם של המכונה ב-MCS, 3) לקבל את המזהה של המכונה הזו. לאחר מכן, אנו פשוט שולחים בקשה לפרסום ומפעילים את המכונה הזו.

כך נראית הפונקציה להשגת אסימון:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

כיתת Autoscaler

מחלקה זו מכילה פונקציות הקשורות ללוגיקה ההפעלה עצמה.

כך נראית קטע קוד עבור מחלקה זו:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

אנו מקבלים שיעורים לכניסה. Ambari и Mcs, רשימה של צמתים המותרים לשינוי קנה מידה, וכן פרמטרים של תצורת צומת: זיכרון ומעבד המוקצים לצומת ב-YARN. ישנם גם 2 פרמטרים פנימיים q_ram, q_cpu, שהם תורים. באמצעותם, אנו מאחסנים את הערכים של עומס האשכול הנוכחי. אם אנו רואים שבמהלך 5 הדקות האחרונות היה עומס מוגבר בעקביות, אז נחליט שעלינו להוסיף צומת +1 לאשכול. הדבר נכון גם לגבי מצב תת-ניצול האשכולות.

הקוד למעלה הוא דוגמה לפונקציה שמסירה מכונה מהאשכול ועוצרת אותה בענן. ראשית יש פירוק YARN Nodemanager, ואז המצב נדלק Maintenance, אז אנחנו עוצרים את כל השירותים במכונה ומכבים את המכונה הוירטואלית בענן.

2. Script observer.py

קוד לדוגמה משם:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

בו, אנו בודקים האם נוצרו תנאים להגדלת הקיבולת של האשכול והאם יש מכונות כלשהן במילואים, מקבלים את שם המארח של אחת מהן, מוסיפים אותה לאשכול ומפרסמים הודעה על כך ב-Slack של הצוות שלנו. אחרי זה זה מתחיל cooldown_period, כאשר אנחנו לא מוסיפים או מסירים שום דבר מהאשכול, אלא פשוט עוקבים אחר העומס. אם הוא התייצב ונמצא במסדרון של ערכי עומס אופטימליים, אז אנחנו פשוט ממשיכים במעקב. אם צומת אחד לא הספיק, נוסיף עוד אחד.

למקרים שבהם יש לנו שיעור לפנינו, אנחנו כבר יודעים בוודאות שצומת אחד לא יספיק, אז אנחנו מתחילים מיד את כל הצמתים הפנויים ומשאירים אותם פעילים עד סוף השיעור. זה קורה באמצעות רשימה של חותמות זמן של פעילות.

מסקנה

Autoscaler הוא פתרון טוב ונוח לאותם מקרים שבהם אתה חווה טעינת אשכול לא אחידה. אתה משיג בו זמנית את תצורת האשכול הרצויה עבור עומסי שיא ובו זמנית לא שומרים על אשכול זה בזמן עומס נמוך, וחוסך כסף. ובכן, בנוסף כל זה קורה אוטומטית ללא השתתפותך. ה-autoscaler עצמו הוא לא יותר מסט של בקשות ל-API של מנהל האשכולות ול-API של ספק הענן, שנכתבו על פי היגיון מסוים. מה שאתה בהחלט צריך לזכור הוא חלוקת הצמתים ל-3 סוגים, כפי שכתבנו קודם לכן. ואתה תהיה מאושר.

מקור: www.habr.com

קנה אירוח אמין לאתרים עם הגנת DDoS, שרתי VPS VDS 🔥 קנה אחסון אתרים אמין עם הגנת DDoS, שרתי VPS VDS | ProHoster