Cara membuat autoscaler anda sendiri untuk kluster

hello! Kami melatih orang ramai untuk bekerja dengan data besar. Adalah mustahil untuk membayangkan program pendidikan pada data besar tanpa klusternya sendiri, di mana semua peserta bekerjasama. Atas sebab ini, program kami sentiasa mempunyainya :) Kami terlibat dalam konfigurasi, penalaan dan pentadbirannya, dan mereka secara langsung melancarkan pekerjaan MapReduce di sana dan menggunakan Spark.

Dalam siaran ini kami akan memberitahu anda bagaimana kami menyelesaikan masalah pemuatan kluster yang tidak sekata dengan menulis autoscaler kami sendiri menggunakan awan Penyelesaian Awan Mail.ru.

masalah

Kelompok kami tidak digunakan dalam mod biasa. Pelupusan sangat tidak sekata. Sebagai contoh, terdapat kelas praktikal, apabila semua 30 orang dan seorang guru pergi ke kluster dan mula menggunakannya. Atau sekali lagi, terdapat beberapa hari sebelum tarikh akhir apabila beban meningkat dengan banyak. Selebihnya kluster beroperasi dalam mod kurang beban.

Penyelesaian #1 adalah untuk mengekalkan gugusan yang akan menahan beban puncak, tetapi akan melahu sepanjang masa.

Penyelesaian #2 adalah untuk menyimpan gugusan kecil, yang mana anda menambah nod secara manual sebelum kelas dan semasa beban puncak.

Penyelesaian #3 adalah untuk menyimpan kluster kecil dan menulis penskala auto yang akan memantau beban semasa kluster dan, menggunakan pelbagai API, menambah dan mengalih keluar nod daripada kluster.

Dalam siaran ini kita akan bercakap tentang penyelesaian #3. Autoscaler ini sangat bergantung pada faktor luaran dan bukannya faktor dalaman, dan pembekal selalunya tidak menyediakannya. Kami menggunakan infrastruktur awan Mail.ru Cloud Solutions dan menulis autoscaler menggunakan API MCS. Memandangkan kami mengajar cara bekerja dengan data, kami memutuskan untuk menunjukkan cara anda boleh menulis autoscaler yang serupa untuk tujuan anda sendiri dan menggunakannya dengan awan anda

Prasyarat

Pertama, anda mesti mempunyai kluster Hadoop. Sebagai contoh, kami menggunakan pengedaran HDP.

Untuk membolehkan nod anda ditambah dan dialih keluar dengan cepat, anda mesti mempunyai pengedaran peranan tertentu antara nod.

  1. Nod induk. Nah, tiada apa yang perlu dijelaskan di sini: nod utama kluster, di mana, sebagai contoh, pemacu Spark dilancarkan, jika anda menggunakan mod interaktif.
  2. Nod tarikh. Ini ialah nod tempat anda menyimpan data pada HDFS dan tempat pengiraan berlaku.
  3. Nod pengkomputeran. Ini ialah nod di mana anda tidak menyimpan apa-apa pada HDFS, tetapi tempat pengiraan berlaku.

Perkara penting. Autoscaling akan berlaku disebabkan oleh nod jenis ketiga. Jika anda mula mengambil dan menambah nod jenis kedua, kelajuan tindak balas akan menjadi sangat rendah - penyahtauliahan dan penumpuan semula akan mengambil masa berjam-jam pada kluster anda. Ini, sudah tentu, bukan apa yang anda harapkan daripada autoscaling. Iaitu, kami tidak menyentuh nod jenis pertama dan kedua. Mereka akan mewakili kluster berdaya maju minimum yang akan wujud sepanjang tempoh program.

Jadi, autoscaler kami ditulis dalam Python 3, menggunakan API Ambari untuk mengurus perkhidmatan kluster, menggunakan API daripada Mail.ru Cloud Solutions (MCS) untuk memulakan dan memberhentikan mesin.

Seni bina penyelesaian

  1. Modul autoscaler.py. Ia mengandungi tiga kelas: 1) fungsi untuk bekerja dengan Ambari, 2) fungsi untuk bekerja dengan MCS, 3) fungsi yang berkaitan terus dengan logik autoscaler.
  2. Skrip observer.py. Pada asasnya ia terdiri daripada peraturan yang berbeza: bila dan pada saat untuk memanggil fungsi autoscaler.
  3. Fail konfigurasi config.py. Ia mengandungi, sebagai contoh, senarai nod yang dibenarkan untuk penskalaan automatik dan parameter lain yang mempengaruhi, contohnya, berapa lama menunggu dari saat nod baharu ditambahkan. Terdapat juga cap masa untuk permulaan kelas, supaya sebelum kelas konfigurasi kelompok maksimum yang dibenarkan dilancarkan.

Sekarang mari kita lihat kepingan kod dalam dua fail pertama.

1. Modul Autoscaler.py

kelas Ambari

Beginilah rupa sekeping kod yang mengandungi kelas Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Di atas, sebagai contoh, anda boleh melihat pelaksanaan fungsi tersebut stop_all_services, yang menghentikan semua perkhidmatan pada nod kelompok yang dikehendaki.

Di pintu masuk kelas Ambari awak lulus:

  • ambari_url, sebagai contoh, seperti 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – nama kluster anda di Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • dan dalam auth inilah nama pengguna dan kata laluan anda untuk Ambari: auth = ('login', 'password').

Fungsi itu sendiri tidak lebih daripada beberapa panggilan melalui API REST ke Ambari. Dari sudut pandangan logik, kami mula-mula menerima senarai perkhidmatan yang sedang berjalan pada nod, dan kemudian meminta pada kluster tertentu, pada nod tertentu, untuk memindahkan perkhidmatan daripada senarai ke keadaan INSTALLED. Berfungsi untuk melancarkan semua perkhidmatan, untuk memindahkan nod ke keadaan Maintenance dan lain-lain kelihatan serupa - ia hanyalah beberapa permintaan melalui API.

Mcs Kelas

Beginilah rupa sekeping kod yang mengandungi kelas Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Di pintu masuk kelas Mcs kami lulus id projek di dalam awan dan id pengguna, serta kata laluannya. Dalam fungsi vm_turn_on kami ingin menghidupkan salah satu mesin. Logik di sini adalah sedikit lebih rumit. Pada permulaan kod, tiga fungsi lain dipanggil: 1) kita perlu mendapatkan token, 2) kita perlu menukar nama hos kepada nama mesin dalam MCS, 3) dapatkan id mesin ini. Seterusnya, kami hanya membuat permintaan pos dan melancarkan mesin ini.

Beginilah rupa fungsi untuk mendapatkan token:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Kelas autoscaler

Kelas ini mengandungi fungsi yang berkaitan dengan logik operasi itu sendiri.

Beginilah rupa sekeping kod untuk kelas ini:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Kami menerima kelas untuk kemasukan. Ambari ΠΈ Mcs, senarai nod yang dibenarkan untuk penskalaan, serta parameter konfigurasi nod: memori dan cpu yang diperuntukkan kepada nod dalam YARN. Terdapat juga 2 parameter dalaman q_ram, q_cpu, iaitu baris gilir. Menggunakannya, kami menyimpan nilai beban kluster semasa. Jika kami melihat bahawa dalam tempoh 5 minit yang lalu terdapat beban yang meningkat secara konsisten, maka kami memutuskan bahawa kami perlu menambah nod +1 pada kluster. Perkara yang sama berlaku untuk keadaan kurang penggunaan kelompok.

Kod di atas ialah contoh fungsi yang mengalih keluar mesin daripada kluster dan menghentikannya dalam awan. Mula-mula ada penyahtauliahan YARN Nodemanager, kemudian mod dihidupkan Maintenance, kemudian kami menghentikan semua perkhidmatan pada mesin dan mematikan mesin maya dalam awan.

2. Pemerhati skrip.py

Contoh kod dari sana:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Di dalamnya, kami menyemak sama ada syarat telah dibuat untuk meningkatkan kapasiti kluster dan sama ada terdapat sebarang mesin dalam simpanan, dapatkan nama hos salah satu daripadanya, tambahkannya pada kluster dan terbitkan mesej mengenainya pada Slack pasukan kami. Selepas itu ia bermula cooldown_period, apabila kami tidak menambah atau mengalih keluar apa-apa daripada kluster, tetapi hanya memantau beban. Jika ia telah stabil dan berada dalam koridor nilai beban optimum, maka kami teruskan pemantauan. Jika satu nod tidak mencukupi, maka kami menambah satu lagi.

Untuk kes apabila kita mempunyai pelajaran di hadapan, kita sudah tahu pasti bahawa satu nod tidak akan mencukupi, jadi kami segera memulakan semua nod percuma dan memastikan mereka aktif sehingga akhir pelajaran. Ini berlaku menggunakan senarai cap masa aktiviti.

Kesimpulan

Autoscaler ialah penyelesaian yang baik dan mudah untuk kes apabila anda mengalami pemuatan kelompok yang tidak sekata. Anda pada masa yang sama mencapai konfigurasi kluster yang diingini untuk beban puncak dan pada masa yang sama tidak menyimpan kluster ini semasa kurang muatan, menjimatkan wang. Selain itu, semua ini berlaku secara automatik tanpa penyertaan anda. Autoscaler itu sendiri tidak lebih daripada satu set permintaan kepada API pengurus kluster dan API penyedia awan, yang ditulis mengikut logik tertentu. Apa yang anda pasti perlu ingat ialah pembahagian nod kepada 3 jenis, seperti yang kami tulis sebelum ini. Dan anda akan gembira.

Sumber: www.habr.com

Tambah komen