Carane nggawe autoscaler dhewe kanggo kluster

Hello! Kita nglatih wong supaya bisa nggarap data gedhe. Ora bisa mbayangno program pendidikan babagan data gedhe tanpa kluster dhewe, sing kabeh peserta kerja bareng. Mulane, program kita tansah duwe πŸ™‚ Kita melu konfigurasi, tuning lan administrasi, lan wong lanang langsung miwiti proyek MapReduce ing kana lan nggunakake Spark.

Ing kirim iki, kita bakal menehi pitutur marang kowe carane ngatasi masalah loading kluster sing ora rata kanthi nulis autoscaler dhewe nggunakake awan. Solusi Cloud Mail.ru.

masalah

Kluster kita ora digunakake ing mode khas. Pembuangan banget ora rata. Contone, ana kelas praktis, nalika kabeh 30 wong lan guru menyang kluster lan miwiti nggunakake. Utawa maneh, ana dina sadurunge deadline nalika beban mundhak banget. Wektu liyane kluster beroperasi ing mode underload.

Solusi #1 yaiku njaga kluster sing bakal tahan beban puncak, nanging bakal nganggur ing wektu liyane.

Solusi #2 yaiku njaga kluster cilik, sing sampeyan tambahake simpul kanthi manual sadurunge kelas lan sajrone beban puncak.

Solusi # 3 kanggo njaga kluster cilik lan nulis autoscaler sing bakal ngawasi beban kluster saiki lan, nggunakake macem-macem API, nambah lan mbusak kelenjar saka kluster.

Ing kirim iki kita bakal ngomong babagan solusi #3. Autoscaler iki gumantung banget marang faktor eksternal tinimbang faktor internal, lan panyedhiya asring ora nyedhiyakake. Kita nggunakake infrastruktur awan Mail.ru Cloud Solutions lan nulis autoscaler nggunakake API MCS. Lan amarga kita mulang carane nggarap data, kita mutusake kanggo nuduhake carane sampeyan bisa nulis autoscaler sing padha kanggo tujuan sampeyan dhewe lan nggunakake awan sampeyan.

Prasyarat

Pisanan, sampeyan kudu duwe kluster Hadoop. Contone, kita nggunakake distribusi HDP.

Supaya simpul bisa ditambah lan dibusak kanthi cepet, sampeyan kudu duwe distribusi peran tartamtu ing antarane simpul kasebut.

  1. Node master. Ya, ora perlu nerangake apa-apa utamane: simpul utama kluster, sing, contone, driver Spark diluncurake, yen sampeyan nggunakake mode interaktif.
  2. Node tanggal. Iki minangka simpul sing sampeyan nyimpen data ing HDFS lan ing ngendi petungan ditindakake.
  3. Node komputasi. Iki simpul ngendi sampeyan ora nyimpen apa-apa ing HDFS, nanging ngendi petungan kelakon.

Titik penting. Autoscaling bakal kedadeyan amarga simpul saka jinis katelu. Yen sampeyan miwiti njupuk lan nambah simpul saka jinis kapindho, kacepetan respon bakal kurang banget - decommissioning lan recommitting bakal njupuk jam ing kluster. Iki, mesthi, dudu sing dikarepake saka autoscaling. Yaiku, kita ora ndemek kelenjar saka jinis pisanan lan kaloro. Dheweke bakal makili kluster sregep minimal sing bakal ana sajrone program kasebut.

Dadi, autoscaler kita ditulis ing Python 3, nggunakake API Ambari kanggo ngatur layanan kluster, nggunakake API saka Mail.ru Cloud Solutions (MCS) kanggo miwiti lan mungkasi mesin.

Arsitektur solusi

  1. Modul autoscaler.py. Isine telung kelas: 1) fungsi kanggo nggarap Ambari, 2) fungsi kanggo nggarap MCS, 3) fungsi sing ana gandhengane langsung karo logika autoscaler.
  2. Skripsi observer.py. Ateges kasusun saka aturan sing beda-beda: kapan lan ing wektu apa nelpon fungsi autoscaler.
  3. File konfigurasi config.py. Isine, contone, dhaptar simpul sing diidini kanggo autoscaling lan paramèter liyane sing mengaruhi, contone, suwene ngenteni wiwit simpul anyar ditambahake. Ana uga cap wektu kanggo wiwitan kelas, supaya sadurunge kelas, konfigurasi kluster sing diidinake maksimal diluncurake.

Ayo saiki ndeleng potongan kode ing rong file pisanan.

1. Modul Autoscaler.py

Ambari kelas

Iki minangka potongan kode sing ngemot kelas Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Ndhuwur, minangka conto, sampeyan bisa ndeleng implementasine fungsi kasebut stop_all_services, sing mandheg kabeh layanan ing simpul kluster sing dikarepake.

Ing lawang mlebu kelas Ambari sampeyan lulus:

  • ambari_url, contone, kaya 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – jeneng kluster sampeyan ing Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • lan njero auth iki jeneng pangguna lan sandhi panjenengan kanggo Ambari: auth = ('login', 'password').

Fungsi kasebut ora luwih saka saperangan telpon liwat API REST menyang Ambari. Saka sudut pandang logis, pisanan kita nampa dhaptar layanan sing mlaku ing simpul, banjur takon ing kluster tartamtu, ing simpul tartamtu, kanggo mindhah layanan saka dhaptar menyang negara. INSTALLED. Fungsi kanggo mbukak kabeh layanan, kanggo nransfer simpul menyang negara Maintenance etc katon padha - padha mung sawetara panjalukan liwat API.

Kelas Mc

Iki minangka potongan kode sing ngemot kelas Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Ing lawang mlebu kelas Mcs kita ngliwati id proyek ing njero awan lan id pangguna, uga sandhi. Ing fungsi vm_turn_on kita arep kanggo nguripake salah siji mesin. Logika ing kene rada rumit. Ing wiwitan kode kasebut, telung fungsi liyane diarani: 1) kita kudu entuk token, 2) kita kudu ngowahi jeneng host dadi jeneng mesin ing MCS, 3) entuk id mesin iki. Sabanjure, kita mung nggawe panjalukan kirim lan miwiti mesin iki.

Iki minangka fungsi kanggo entuk token:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Kelas Autoscaler

Kelas iki ngemot fungsi sing ana gandhengane karo logika operasi kasebut.

Iki minangka potongan kode kanggo kelas iki:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Kita nampa kelas kanggo mlebu. Ambari и Mcs, dhaptar simpul sing diijini kanggo skala, uga paramèter konfigurasi simpul: memori lan cpu sing diparengake kanggo simpul ing BENANG. Ana uga 2 parameter internal q_ram, q_cpu, sing antrian. Nggunakake, kita nyimpen nilai beban kluster saiki. Yen kita weruh yen sajrone 5 menit pungkasan ana beban sing terus-terusan tambah, mula kita kudu nambah simpul +1 menyang kluster. Padha bener kanggo negara underutilization kluster.

Kode ing ndhuwur minangka conto fungsi sing mbusak mesin saka kluster lan mandheg ing mΓ©ga. Kaping pisanan ana decommissioning YARN Nodemanager, banjur mode urip Maintenance, banjur kita mungkasi kabeh layanan ing mesin lan mateni mesin virtual ing mΓ©ga.

2. Pengamat naskah.py

Kode conto saka ing kono:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Ing, kita mriksa apa kahanan wis dikembangakΓ© kanggo nambah kapasitas kluster lan apa ana mesin ing cadangan, njaluk jeneng host siji saka wong-wong mau, ditambahake menyang kluster lan nerbitakΓ© pesen babagan ing Slack tim kita. Sawise iku diwiwiti cooldown_period, nalika kita ora nambah utawa mbusak apa-apa saka kluster, nanging mung ngawasi mbukak. Yen wis stabil lan ana ing koridor nilai beban sing optimal, mula kita terus ngawasi. Yen siji simpul ora cukup, banjur kita nambah siji liyane.

Kanggo kasus nalika kita duwe pawulangan ing ngarep, kita wis ngerti manawa siji simpul ora bakal cukup, mula kita langsung miwiti kabeh simpul gratis lan tetep aktif nganti pungkasan pelajaran. Iki kedadeyan nggunakake dhaptar cap wektu kegiatan.

kesimpulan

Autoscaler minangka solusi sing apik lan trep kanggo kasus kasebut nalika sampeyan ngalami loading kluster sing ora rata. Sampeyan bebarengan entuk konfigurasi kluster sing dikarepake kanggo beban puncak lan ing wektu sing padha ora njaga kluster iki sajrone underload, ngirit dhuwit. Kajaba iku, kabeh iki kedadeyan kanthi otomatis tanpa partisipasi sampeyan. Autoscaler dhewe ora luwih saka seperangkat panjalukan kanggo API manager cluster lan API panyedhiya maya, ditulis miturut logika tartamtu. Apa sampeyan kudu ngelingi yaiku divisi simpul dadi 3 jinis, kaya sing wis ditulis sadurunge. Lan sampeyan bakal seneng.

Source: www.habr.com

Add a comment