Halo! Kami melatih orang untuk bekerja dengan data besar. Tidak mungkin membayangkan sebuah program pendidikan tentang big data tanpa clusternya sendiri, di mana semua peserta bekerja sama. Oleh karena itu, program kami selalu memilikinya π Kami terlibat dalam konfigurasi, penyetelan, dan administrasinya, dan orang-orang langsung meluncurkan pekerjaan MapReduce di sana dan menggunakan Spark.
Dalam postingan ini kami akan memberi tahu Anda bagaimana kami memecahkan masalah pemuatan cluster yang tidak merata dengan menulis autoscaler kami sendiri menggunakan cloud
masalah
Cluster kami tidak digunakan dalam mode biasa. Pembuangan sangat tidak merata. Misalnya, ada kelas praktik, ketika 30 orang dan seorang guru pergi ke cluster dan mulai menggunakannya. Atau lagi, ada hari-hari sebelum batas waktu dimana beban meningkat pesat. Sisa waktu cluster beroperasi dalam mode underload.
Solusi #1 adalah mempertahankan cluster yang dapat menahan beban puncak, namun akan menganggur sepanjang waktu.
Solusi #2 adalah mempertahankan cluster kecil, yang mana Anda menambahkan node secara manual sebelum kelas dan selama beban puncak.
Solusi #3 adalah mempertahankan cluster kecil dan menulis autoscaler yang akan memantau beban cluster saat ini dan, dengan menggunakan berbagai API, menambah dan menghapus node dari cluster.
Dalam posting ini kita akan berbicara tentang solusi #3. Penskala otomatis ini sangat bergantung pada faktor eksternal dibandingkan faktor internal, dan penyedia sering kali tidak menyediakannya. Kami menggunakan infrastruktur cloud Mail.ru Cloud Solutions dan menulis penskala otomatis menggunakan API MCS. Dan karena kami mengajarkan cara bekerja dengan data, kami memutuskan untuk menunjukkan bagaimana Anda dapat menulis penskala otomatis serupa untuk tujuan Anda sendiri dan menggunakannya dengan cloud Anda.
Prasyarat
Pertama, Anda harus memiliki cluster Hadoop. Misalnya kita menggunakan distribusi HDP.
Agar node Anda dapat ditambahkan dan dihapus dengan cepat, Anda harus memiliki pembagian peran tertentu di antara node.
- simpul utama. Ya, tidak perlu menjelaskan apa pun secara khusus: node utama cluster, tempat, misalnya, driver Spark diluncurkan, jika Anda menggunakan mode interaktif.
- simpul tanggal. Ini adalah node tempat Anda menyimpan data di HDFS dan tempat penghitungan dilakukan.
- Node komputasi. Ini adalah node tempat Anda tidak menyimpan apa pun di HDFS, tetapi tempat penghitungan dilakukan.
Poin penting. Penskalaan otomatis akan terjadi karena node tipe ketiga. Jika Anda mulai mengambil dan menambahkan node tipe kedua, kecepatan respons akan sangat rendah - penonaktifan dan komitmen ulang akan memakan waktu berjam-jam di cluster Anda. Tentu saja, ini bukan yang Anda harapkan dari penskalaan otomatis. Artinya, kita tidak menyentuh node tipe pertama dan kedua. Mereka akan mewakili cluster minimum yang layak dan akan ada sepanjang durasi program.
Jadi, autoscaler kami ditulis dengan Python 3, menggunakan API Ambari untuk mengelola layanan cluster, menggunakan
Arsitektur solusi
- Modul
autoscaler.py
. Ini berisi tiga kelas: 1) fungsi untuk bekerja dengan Ambari, 2) fungsi untuk bekerja dengan MCS, 3) fungsi yang berhubungan langsung dengan logika autoscaler. - Skrip
observer.py
. Pada dasarnya ini terdiri dari aturan yang berbeda: kapan dan pada saat apa memanggil fungsi autoscaler. - Berkas konfigurasi
config.py
. Ini berisi, misalnya, daftar node yang diizinkan untuk penskalaan otomatis dan parameter lain yang memengaruhi, misalnya, berapa lama waktu menunggu sejak node baru ditambahkan. Ada juga stempel waktu untuk dimulainya kelas, sehingga sebelum kelas, konfigurasi cluster maksimum yang diizinkan diluncurkan.
Sekarang mari kita lihat potongan kode di dalam dua file pertama.
1. Modul autoscaler.py
kelas Ambari
Ini adalah potongan kode yang berisi sebuah kelas Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Di atas, sebagai contoh, Anda dapat melihat implementasi fungsinya stop_all_services
, yang menghentikan semua layanan pada node cluster yang diinginkan.
Di pintu masuk kelas Ambari
kamu lulus:
ambari_url
, misalnya, seperti'http://localhost:8080/api/v1/clusters/'
,cluster_name
β nama cluster Anda di Ambari,headers = {'X-Requested-By': 'ambari'}
- dan di dalam
auth
ini nama pengguna dan kata sandi Anda untuk Ambari:auth = ('login', 'password')
.
Fungsinya sendiri tidak lebih dari beberapa panggilan melalui REST API ke Ambari. Dari sudut pandang logis, pertama-tama kita menerima daftar layanan yang berjalan pada sebuah node, dan kemudian meminta pada cluster tertentu, pada node tertentu, untuk mentransfer layanan dari daftar ke status INSTALLED
. Fungsi untuk meluncurkan semua layanan, untuk mentransfer node ke negara bagian Maintenance
dll. terlihat serupa - itu hanya beberapa permintaan melalui API.
Kelas Mcs
Ini adalah potongan kode yang berisi sebuah kelas Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
Di pintu masuk kelas Mcs
kami meneruskan id proyek di dalam cloud dan id pengguna, serta kata sandinya. Dalam fungsi vm_turn_on
kami ingin menghidupkan salah satu mesin. Logikanya di sini sedikit lebih rumit. Di awal kode, tiga fungsi lain dipanggil: 1) kita perlu mendapatkan token, 2) kita perlu mengubah nama host menjadi nama mesin di MCS, 3) mendapatkan id mesin ini. Selanjutnya, kita cukup membuat permintaan posting dan meluncurkan mesin ini.
Seperti inilah fungsi untuk mendapatkan token:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Kelas penskala otomatis
Kelas ini berisi fungsi-fungsi yang berkaitan dengan logika operasi itu sendiri.
Seperti inilah potongan kode untuk kelas ini:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
Kami menerima kelas untuk masuk. Ambari
ΠΈ Mcs
, daftar node yang diizinkan untuk penskalaan, serta parameter konfigurasi node: memori dan cpu yang dialokasikan ke node di YARN. Ada juga 2 parameter internal q_ram, q_cpu, yang merupakan antrian. Dengan menggunakannya, kami menyimpan nilai beban cluster saat ini. Jika kami melihat bahwa selama 5 menit terakhir terjadi peningkatan beban secara konsisten, maka kami memutuskan bahwa kami perlu menambahkan +1 node ke cluster. Hal yang sama juga berlaku untuk status kurang dimanfaatkannya klaster.
Kode di atas adalah contoh fungsi yang menghapus mesin dari cluster dan menghentikannya di cloud. Pertama ada dekomisioning YARN Nodemanager
, maka mode akan aktif Maintenance
, lalu kita hentikan semua layanan di mesin dan matikan mesin virtual di cloud.
2. Pengamat skrip.py
Contoh kode dari sana:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
Di dalamnya, kami memeriksa apakah kondisi telah diciptakan untuk meningkatkan kapasitas cluster dan apakah ada mesin cadangan, mendapatkan nama host salah satunya, menambahkannya ke cluster dan mempublikasikan pesan tentang hal itu di Slack tim kami. Setelah itu dimulai cooldown_period
, ketika kita tidak menambah atau menghapus apa pun dari cluster, tetapi hanya memantau bebannya. Jika sudah stabil dan berada dalam koridor nilai beban optimal, baru kita lanjutkan pemantauan. Jika satu node tidak cukup, kita tambahkan node lainnya.
Untuk kasus ketika kita memiliki pelajaran di depan, kita sudah tahu pasti bahwa satu node tidak akan cukup, jadi kita segera memulai semua node bebas dan menjaganya tetap aktif hingga akhir pelajaran. Hal ini terjadi dengan menggunakan daftar stempel waktu aktivitas.
Kesimpulan
Autoscaler adalah solusi yang baik dan nyaman untuk kasus-kasus ketika Anda mengalami pemuatan cluster yang tidak merata. Anda secara bersamaan mencapai konfigurasi cluster yang diinginkan untuk beban puncak dan pada saat yang sama tidak menyimpan cluster ini saat underload, sehingga menghemat uang. Ditambah lagi, semua ini terjadi secara otomatis tanpa partisipasi Anda. Autoscaler itu sendiri tidak lebih dari sekumpulan permintaan ke API manajer cluster dan API penyedia cloud, yang ditulis berdasarkan logika tertentu. Yang pasti perlu anda ingat adalah pembagian node menjadi 3 jenis seperti yang sudah kami tulis sebelumnya. Dan kamu akan bahagia.
Sumber: www.habr.com