Bir çoxluq üçün öz avtomiqyaslayıcınızı necə etmək olar

Salam! Biz insanları böyük verilənlərlə işləmək üçün öyrədirik. Böyük verilənlər üzrə təhsil proqramını bütün iştirakçıların birlikdə işlədiyi öz klasteri olmadan təsəvvür etmək mümkün deyil. Bu səbəbdən proqramımızda həmişə var :) Biz onun konfiqurasiyası, sazlanması və idarə edilməsi ilə məşğul oluruq və uşaqlar birbaşa MapReduce işini orada işə salır və Spark-dan istifadə edirlər.

Bu yazıda buluddan istifadə edərək öz avtoskalerimizi yazmaqla qeyri-bərabər klaster yükləmə problemini necə həll etdiyimizi sizə xəbər verəcəyik. Mail.ru Bulud Həlləri.

problem

Klasterimiz tipik rejimdə istifadə edilmir. Utilizasiya çox qeyri-bərabərdir. Məsələn, praktiki məşğələlər var ki, 30 nəfərin hamısı və bir müəllim klasterə gedib ondan istifadə etməyə başlayır. Və ya yenə, yükün çox artdığı son tarixdən əvvəl günlər var. Qalan vaxt klaster aşağı yükləmə rejimində işləyir.

Həll №1, pik yüklərə tab gətirəcək, lakin qalan vaxtlarda boş qalacaq klasteri saxlamaqdır.

Həll №2, dərslərdən əvvəl və pik yüklər zamanı əl ilə qovşaqları əlavə etdiyiniz kiçik bir çoxluq saxlamaqdır.

Həll №3 kiçik bir klaster saxlamaq və klasterin cari yükünü izləyəcək və müxtəlif API-lərdən istifadə edərək qovşaqları klasterə əlavə etmək və silmək üçün avtomiqyaslayıcı yazmaqdır.

Bu yazıda biz №3 həll haqqında danışacağıq. Bu avtomiqyaslayıcı daxili amillərdən çox xarici amillərdən çox asılıdır və provayderlər çox vaxt bunu təmin etmirlər. Biz Mail.ru Cloud Solutions bulud infrastrukturundan istifadə edirik və MCS API istifadə edərək avtomiqyaslayıcı yazırıq. Biz verilənlərlə işləməyi öyrətdiyimiz üçün öz məqsədləriniz üçün oxşar avtomiqyaslayıcı yazmağı və onu buludunuzla necə istifadə edə biləcəyinizi göstərmək qərarına gəldik.

Önkoşullar

Əvvəlcə Hadoop klasteriniz olmalıdır. Məsələn, HDP paylanmasından istifadə edirik.

Düyünlərinizin tez bir zamanda əlavə edilməsi və silinməsi üçün qovşaqlar arasında müəyyən bir rol paylanması olmalıdır.

  1. Master node. Yaxşı, burada izah etməyə xüsusi ehtiyac yoxdur: interaktiv rejimdən istifadə etsəniz, məsələn, Spark sürücüsünün işə salındığı klasterin əsas nodu.
  2. Tarix qovşağı. Bu, HDFS-də məlumatları saxladığınız və hesablamaların aparıldığı qovşaqdır.
  3. Hesablama qovşağı. Bu, HDFS-də heç bir şey saxlamadığınız, lakin hesablamaların baş verdiyi bir qovşaqdır.

Əhəmiyyətli məqam. Avtomatik miqyaslama üçüncü növ qovşaqlara görə baş verəcək. İkinci növ qovşaqları götürməyə və əlavə etməyə başlasanız, cavab sürəti çox aşağı olacaq - istismardan çıxarmaq və yenidən işə salmaq çoxluqda saatlar çəkəcək. Bu, əlbəttə ki, avtomatik miqyasdan gözlədiyiniz şey deyil. Yəni birinci və ikinci növ düyünlərə toxunmuruq. Onlar proqramın bütün müddəti ərzində mövcud olacaq minimum canlı klasteri təmsil edəcəklər.

Beləliklə, bizim autoscaler Python 3-də yazılmışdır, klaster xidmətlərini idarə etmək üçün Ambari API istifadə edir, istifadə edir Mail.ru Cloud Solutions-dan API (MCS) maşınları işə salmaq və dayandırmaq üçün.

Həll arxitekturası

  1. Modul autoscaler.py. O, üç sinfi ehtiva edir: 1) Ambari ilə işləmək üçün funksiyalar, 2) MCS ilə işləmək üçün funksiyalar, 3) avtomatik miqyaslayıcının məntiqi ilə birbaşa əlaqəli funksiyalar.
  2. Skript observer.py. Əsasən o, müxtəlif qaydalardan ibarətdir: avtomatik miqyaslayıcı funksiyaları nə vaxt və hansı anlarda çağırmaq.
  3. Konfiqurasiya faylı config.py. O, məsələn, avtomatik miqyasda icazə verilən qovşaqların siyahısını və məsələn, yeni node əlavə edildiyi andan nə qədər gözləməli olduğuna təsir edən digər parametrləri ehtiva edir. Dərslərin başlaması üçün vaxt möhürləri də var ki, sinifdən əvvəl maksimum icazə verilən klaster konfiqurasiyası işə salınsın.

İndi ilk iki faylın içindəki kod parçalarına baxaq.

1. Autoscaler.py modulu

Ambari sinfi

Sinfi ehtiva edən kod parçası belə görünür Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Yuxarıda, nümunə olaraq, funksiyanın həyata keçirilməsinə baxa bilərsiniz stop_all_services, istənilən klaster node-da bütün xidmətləri dayandırır.

Sinfin girişində Ambari keçirsən:

  • ambari_urlməsələn, kimi 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – Ambaridəki klasterinizin adı,
  • headers = {'X-Requested-By': 'ambari'}
  • və içəridə auth Ambari üçün giriş və şifrəniz budur: auth = ('login', 'password').

Funksiya özü REST API vasitəsilə Ambari-yə bir neçə zəngdən başqa bir şey deyil. Məntiqi nöqteyi-nəzərdən, biz əvvəlcə bir qovşaqda işləyən xidmətlərin siyahısını alırıq, sonra isə verilmiş klasterdə, verilmiş qovşaqda xidmətləri siyahıdan dövlətə köçürməyi xahiş edirik. INSTALLED. Bütün xidmətləri işə salmaq, qovşaqları vəziyyətə köçürmək üçün funksiyalar Maintenance və s. oxşar görünür - bunlar API vasitəsilə yalnız bir neçə sorğudur.

Sinif Mcs

Sinfi ehtiva edən kod parçası belə görünür Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Sinfin girişində Mcs biz bulud daxilində layihə identifikatorunu və istifadəçi identifikatorunu, həmçinin onun parolunu ötürürük. Funksiyada vm_turn_on maşınlardan birini işə salmaq istəyirik. Burada məntiq bir az daha mürəkkəbdir. Kodun əvvəlində daha üç funksiya çağırılır: 1) token əldə etməliyik, 2) host adını MCS-də maşının adına çevirməliyik, 3) bu maşının id-sini əldə etməliyik. Sonra, sadəcə bir yazı sorğusu göndəririk və bu maşını işə salırıq.

Token əldə etmək funksiyası belə görünür:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler sinfi

Bu sinif əməliyyat məntiqinin özü ilə əlaqəli funksiyaları ehtiva edir.

Bu sinif üçün kod parçası belə görünür:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Giriş üçün dərsləri qəbul edirik. Ambari и Mcs, miqyasına icazə verilən qovşaqların siyahısı, həmçinin qovşaq konfiqurasiya parametrləri: YARN-də node üçün ayrılmış yaddaş və cpu. Həmçinin növbələr olan 2 daxili parametr q_ram, q_cpu var. Onlardan istifadə edərək, cari klaster yükünün dəyərlərini saxlayırıq. Son 5 dəqiqə ərzində yükün davamlı olaraq artdığını görsək, o zaman klasterə +1 node əlavə etməyi qərara alırıq. Eyni şey klasterin tam istifadə edilməməsi vəziyyətinə də aiddir.

Yuxarıdakı kod maşını çoxluqdan çıxaran və buludda dayandıran funksiyaya nümunədir. Əvvəlcə istismardan çıxarılma var YARN Nodemanager, sonra rejim açılır Maintenance, sonra biz maşındakı bütün xidmətləri dayandırırıq və buludda virtual maşını söndürürük.

2. Observer.py skripti

Oradan kod nümunəsi:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Orada klasterin tutumunun artırılması üçün şəraitin yaradılıb-yaratılmadığını və ehtiyatda olan maşınların olub-olmamasını yoxlayırıq, onlardan birinin host adını alır, onu klasterə əlavə edirik və komandamızın Slack-də bu barədə mesaj dərc edirik. Bundan sonra başlayır cooldown_period, biz klasterdən heç nə əlavə etmədikdə və ya silmədikdə, sadəcə yükü izlədikdə. Əgər sabitləşibsə və optimal yük dəyərləri dəhlizindədirsə, onda biz sadəcə olaraq monitorinqi davam etdiririk. Bir node kifayət deyilsə, onda başqa birini əlavə edirik.

Qarşıda bir dərsimiz olduğu hallarda, bir node kifayət etməyəcəyini artıq dəqiq bilirik, buna görə də dərhal bütün pulsuz qovşaqları işə salırıq və dərsin sonuna qədər onları aktiv saxlayırıq. Bu, fəaliyyət vaxt nişanlarının siyahısından istifadə etməklə baş verir.

Nəticə

Autoscaler qeyri-bərabər klaster yüklənməsi ilə qarşılaşdığınız hallar üçün yaxşı və rahat bir həlldir. Siz eyni vaxtda pik yüklənmələr üçün istədiyiniz klaster konfiqurasiyasına nail olursunuz və eyni zamanda pula qənaət edərək, az yüklənmə zamanı bu klasteri saxlamırsınız. Yaxşı, üstəlik bütün bunlar sizin iştirakınız olmadan avtomatik olaraq baş verir. Avtomatik miqyaslayıcının özü müəyyən bir məntiqə uyğun yazılmış klaster meneceri API və bulud provayderi API-yə müraciətlər toplusundan başqa bir şey deyil. Mütləq xatırlamağınız lazım olan şey, əvvəllər yazdığımız kimi qovşaqların 3 növə bölünməsidir. Və xoşbəxt olacaqsan.

Mənbə: www.habr.com

Добавить комментарий