Cara membuat autoscaler Anda sendiri untuk sebuah cluster

Halo! Kami melatih orang untuk bekerja dengan data besar. Tidak mungkin membayangkan sebuah program pendidikan tentang big data tanpa clusternya sendiri, di mana semua peserta bekerja sama. Oleh karena itu, program kami selalu memilikinya πŸ™‚ Kami terlibat dalam konfigurasi, penyetelan, dan administrasinya, dan orang-orang langsung meluncurkan pekerjaan MapReduce di sana dan menggunakan Spark.

Dalam postingan ini kami akan memberi tahu Anda bagaimana kami memecahkan masalah pemuatan cluster yang tidak merata dengan menulis autoscaler kami sendiri menggunakan cloud Solusi Cloud Mail.ru.

masalah

Cluster kami tidak digunakan dalam mode biasa. Pembuangan sangat tidak merata. Misalnya, ada kelas praktik, ketika 30 orang dan seorang guru pergi ke cluster dan mulai menggunakannya. Atau lagi, ada hari-hari sebelum batas waktu dimana beban meningkat pesat. Sisa waktu cluster beroperasi dalam mode underload.

Solusi #1 adalah mempertahankan cluster yang dapat menahan beban puncak, namun akan menganggur sepanjang waktu.

Solusi #2 adalah mempertahankan cluster kecil, yang mana Anda menambahkan node secara manual sebelum kelas dan selama beban puncak.

Solusi #3 adalah mempertahankan cluster kecil dan menulis autoscaler yang akan memantau beban cluster saat ini dan, dengan menggunakan berbagai API, menambah dan menghapus node dari cluster.

Dalam posting ini kita akan berbicara tentang solusi #3. Penskala otomatis ini sangat bergantung pada faktor eksternal dibandingkan faktor internal, dan penyedia sering kali tidak menyediakannya. Kami menggunakan infrastruktur cloud Mail.ru Cloud Solutions dan menulis penskala otomatis menggunakan API MCS. Dan karena kami mengajarkan cara bekerja dengan data, kami memutuskan untuk menunjukkan bagaimana Anda dapat menulis penskala otomatis serupa untuk tujuan Anda sendiri dan menggunakannya dengan cloud Anda.

Prasyarat

Pertama, Anda harus memiliki cluster Hadoop. Misalnya kita menggunakan distribusi HDP.

Agar node Anda dapat ditambahkan dan dihapus dengan cepat, Anda harus memiliki pembagian peran tertentu di antara node.

  1. simpul utama. Ya, tidak perlu menjelaskan apa pun secara khusus: node utama cluster, tempat, misalnya, driver Spark diluncurkan, jika Anda menggunakan mode interaktif.
  2. simpul tanggal. Ini adalah node tempat Anda menyimpan data di HDFS dan tempat penghitungan dilakukan.
  3. Node komputasi. Ini adalah node tempat Anda tidak menyimpan apa pun di HDFS, tetapi tempat penghitungan dilakukan.

Poin penting. Penskalaan otomatis akan terjadi karena node tipe ketiga. Jika Anda mulai mengambil dan menambahkan node tipe kedua, kecepatan respons akan sangat rendah - penonaktifan dan komitmen ulang akan memakan waktu berjam-jam di cluster Anda. Tentu saja, ini bukan yang Anda harapkan dari penskalaan otomatis. Artinya, kita tidak menyentuh node tipe pertama dan kedua. Mereka akan mewakili cluster minimum yang layak dan akan ada sepanjang durasi program.

Jadi, autoscaler kami ditulis dengan Python 3, menggunakan API Ambari untuk mengelola layanan cluster, menggunakan API dari Solusi Cloud Mail.ru (MCS) untuk menghidupkan dan mematikan mesin.

Arsitektur solusi

  1. Modul autoscaler.py. Ini berisi tiga kelas: 1) fungsi untuk bekerja dengan Ambari, 2) fungsi untuk bekerja dengan MCS, 3) fungsi yang berhubungan langsung dengan logika autoscaler.
  2. Skrip observer.py. Pada dasarnya ini terdiri dari aturan yang berbeda: kapan dan pada saat apa memanggil fungsi autoscaler.
  3. Berkas konfigurasi config.py. Ini berisi, misalnya, daftar node yang diizinkan untuk penskalaan otomatis dan parameter lain yang memengaruhi, misalnya, berapa lama waktu menunggu sejak node baru ditambahkan. Ada juga stempel waktu untuk dimulainya kelas, sehingga sebelum kelas, konfigurasi cluster maksimum yang diizinkan diluncurkan.

Sekarang mari kita lihat potongan kode di dalam dua file pertama.

1. Modul autoscaler.py

kelas Ambari

Ini adalah potongan kode yang berisi sebuah kelas Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Di atas, sebagai contoh, Anda dapat melihat implementasi fungsinya stop_all_services, yang menghentikan semua layanan pada node cluster yang diinginkan.

Di pintu masuk kelas Ambari kamu lulus:

  • ambari_url, misalnya, seperti 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – nama cluster Anda di Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • dan di dalam auth ini nama pengguna dan kata sandi Anda untuk Ambari: auth = ('login', 'password').

Fungsinya sendiri tidak lebih dari beberapa panggilan melalui REST API ke Ambari. Dari sudut pandang logis, pertama-tama kita menerima daftar layanan yang berjalan pada sebuah node, dan kemudian meminta pada cluster tertentu, pada node tertentu, untuk mentransfer layanan dari daftar ke status INSTALLED. Fungsi untuk meluncurkan semua layanan, untuk mentransfer node ke negara bagian Maintenance dll. terlihat serupa - itu hanya beberapa permintaan melalui API.

Kelas Mcs

Ini adalah potongan kode yang berisi sebuah kelas Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Di pintu masuk kelas Mcs kami meneruskan id proyek di dalam cloud dan id pengguna, serta kata sandinya. Dalam fungsi vm_turn_on kami ingin menghidupkan salah satu mesin. Logikanya di sini sedikit lebih rumit. Di awal kode, tiga fungsi lain dipanggil: 1) kita perlu mendapatkan token, 2) kita perlu mengubah nama host menjadi nama mesin di MCS, 3) mendapatkan id mesin ini. Selanjutnya, kita cukup membuat permintaan posting dan meluncurkan mesin ini.

Seperti inilah fungsi untuk mendapatkan token:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Kelas penskala otomatis

Kelas ini berisi fungsi-fungsi yang berkaitan dengan logika operasi itu sendiri.

Seperti inilah potongan kode untuk kelas ini:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Kami menerima kelas untuk masuk. Ambari ΠΈ Mcs, daftar node yang diizinkan untuk penskalaan, serta parameter konfigurasi node: memori dan cpu yang dialokasikan ke node di YARN. Ada juga 2 parameter internal q_ram, q_cpu, yang merupakan antrian. Dengan menggunakannya, kami menyimpan nilai beban cluster saat ini. Jika kami melihat bahwa selama 5 menit terakhir terjadi peningkatan beban secara konsisten, maka kami memutuskan bahwa kami perlu menambahkan +1 node ke cluster. Hal yang sama juga berlaku untuk status kurang dimanfaatkannya klaster.

Kode di atas adalah contoh fungsi yang menghapus mesin dari cluster dan menghentikannya di cloud. Pertama ada dekomisioning YARN Nodemanager, maka mode akan aktif Maintenance, lalu kita hentikan semua layanan di mesin dan matikan mesin virtual di cloud.

2. Pengamat skrip.py

Contoh kode dari sana:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Di dalamnya, kami memeriksa apakah kondisi telah diciptakan untuk meningkatkan kapasitas cluster dan apakah ada mesin cadangan, mendapatkan nama host salah satunya, menambahkannya ke cluster dan mempublikasikan pesan tentang hal itu di Slack tim kami. Setelah itu dimulai cooldown_period, ketika kita tidak menambah atau menghapus apa pun dari cluster, tetapi hanya memantau bebannya. Jika sudah stabil dan berada dalam koridor nilai beban optimal, baru kita lanjutkan pemantauan. Jika satu node tidak cukup, kita tambahkan node lainnya.

Untuk kasus ketika kita memiliki pelajaran di depan, kita sudah tahu pasti bahwa satu node tidak akan cukup, jadi kita segera memulai semua node bebas dan menjaganya tetap aktif hingga akhir pelajaran. Hal ini terjadi dengan menggunakan daftar stempel waktu aktivitas.

Kesimpulan

Autoscaler adalah solusi yang baik dan nyaman untuk kasus-kasus ketika Anda mengalami pemuatan cluster yang tidak merata. Anda secara bersamaan mencapai konfigurasi cluster yang diinginkan untuk beban puncak dan pada saat yang sama tidak menyimpan cluster ini saat underload, sehingga menghemat uang. Ditambah lagi, semua ini terjadi secara otomatis tanpa partisipasi Anda. Autoscaler itu sendiri tidak lebih dari sekumpulan permintaan ke API manajer cluster dan API penyedia cloud, yang ditulis berdasarkan logika tertentu. Yang pasti perlu anda ingat adalah pembagian node menjadi 3 jenis seperti yang sudah kami tulis sebelumnya. Dan kamu akan bahagia.

Sumber: www.habr.com

Tambah komentar