สวัสดี! เราฝึกอบรมผู้คนให้ทำงานกับข้อมูลขนาดใหญ่ เป็นไปไม่ได้เลยที่จะจินตนาการถึงโปรแกรมการศึกษาเกี่ยวกับข้อมูลขนาดใหญ่โดยไม่มีคลัสเตอร์ของตัวเอง ซึ่งผู้เข้าร่วมทั้งหมดทำงานร่วมกัน ด้วยเหตุนี้ โปรแกรมของเราจึงมีสิ่งนี้อยู่เสมอ 🙂 เรามีส่วนร่วมในการกำหนดค่า การปรับแต่ง และการดูแลระบบ และคนเหล่านั้นก็เปิดตัวงาน MapReduce โดยตรงที่นั่นและใช้ Spark
ในโพสต์นี้ เราจะบอกคุณว่าเราแก้ไขปัญหาการโหลดคลัสเตอร์ที่ไม่สม่ำเสมอโดยการเขียนตัวปรับขนาดอัตโนมัติของเราเองโดยใช้ระบบคลาวด์ได้อย่างไร
ปัญหา
คลัสเตอร์ของเราไม่ได้ใช้ในโหมดทั่วไป การกำจัดมีความไม่สม่ำเสมออย่างมาก ตัวอย่างเช่น มีชั้นเรียนภาคปฏิบัติ เมื่อทั้ง 30 คนและครูหนึ่งคนไปที่คลัสเตอร์และเริ่มใช้งาน หรืออีกครั้ง มีหลายวันก่อนถึงเส้นตายเมื่อปริมาณงานเพิ่มขึ้นอย่างมาก เวลาที่เหลือคลัสเตอร์ทำงานในโหมดอันเดอร์โหลด
โซลูชัน #1 คือการรักษาคลัสเตอร์ที่จะทนทานต่อการโหลดสูงสุด แต่จะไม่ได้ใช้งานตลอดเวลาที่เหลือ
โซลูชัน #2 คือการเก็บคลัสเตอร์ขนาดเล็กไว้ ซึ่งคุณเพิ่มโหนดด้วยตนเองก่อนคลาสและระหว่างโหลดสูงสุด
โซลูชัน #3 คือการเก็บคลัสเตอร์ขนาดเล็กไว้และเขียนตัวปรับขนาดอัตโนมัติที่จะตรวจสอบโหลดปัจจุบันของคลัสเตอร์ และใช้ API ต่างๆ ในการเพิ่มและลบโหนดออกจากคลัสเตอร์
ในโพสต์นี้ เราจะพูดถึงโซลูชัน #3 ตัวปรับขนาดอัตโนมัตินี้ขึ้นอยู่กับปัจจัยภายนอกมากกว่าปัจจัยภายใน และผู้ให้บริการมักไม่ได้จัดเตรียมไว้ให้ เราใช้โครงสร้างพื้นฐานคลาวด์ Mail.ru Cloud Solutions และเขียนตัวปรับขนาดอัตโนมัติโดยใช้ MCS API และเนื่องจากเราสอนวิธีทำงานกับข้อมูล เราจึงตัดสินใจที่จะแสดงให้เห็นว่าคุณสามารถเขียนตัวปรับขนาดอัตโนมัติที่คล้ายกันเพื่อวัตถุประสงค์ของคุณเองและใช้กับระบบคลาวด์ของคุณได้อย่างไร
เบื้องต้น
ขั้นแรก คุณต้องมีคลัสเตอร์ Hadoop ตัวอย่างเช่น เราใช้การกระจาย HDP
เพื่อให้โหนดของคุณถูกเพิ่มและลบอย่างรวดเร็ว คุณต้องมีการกระจายบทบาทบางอย่างระหว่างโหนดต่างๆ
- โหนดหลัก ไม่จำเป็นต้องอธิบายอะไรเป็นพิเศษ: โหนดหลักของคลัสเตอร์ซึ่งตัวอย่างเช่นไดรเวอร์ Spark จะถูกเปิดใช้งานหากคุณใช้โหมดโต้ตอบ
- โหนดวันที่ นี่คือโหนดที่คุณจัดเก็บข้อมูลบน HDFS และที่ที่ใช้คำนวณ
- โหนดคอมพิวเตอร์ นี่คือโหนดที่คุณไม่ได้จัดเก็บสิ่งใดบน HDFS แต่เป็นที่ที่มีการคำนวณ
จุดสำคัญ. การปรับขนาดอัตโนมัติจะเกิดขึ้นเนื่องจากโหนดประเภทที่สาม หากคุณเริ่มรับและเพิ่มโหนดประเภทที่สอง ความเร็วในการตอบสนองจะต่ำมาก การเลิกใช้งานและการคอมมิตใหม่จะใช้เวลาหลายชั่วโมงบนคลัสเตอร์ของคุณ แน่นอนว่านี่ไม่ใช่สิ่งที่คุณคาดหวังจากการปรับขนาดอัตโนมัติ นั่นคือเราไม่ได้สัมผัสโหนดประเภทที่หนึ่งและสอง พวกเขาจะเป็นตัวแทนของคลัสเตอร์ที่มีชีวิตขั้นต่ำที่จะมีอยู่ตลอดระยะเวลาของโปรแกรม
ดังนั้น ตัวปรับขนาดอัตโนมัติของเราจึงเขียนด้วย Python 3 ใช้ Ambari API เพื่อจัดการบริการคลัสเตอร์ การใช้งาน
สถาปัตยกรรมโซลูชัน
- โมดึล
autoscaler.py
. ประกอบด้วยสามคลาส: 1) ฟังก์ชันสำหรับการทำงานกับ Ambari 2) ฟังก์ชันสำหรับการทำงานกับ MCS 3) ฟังก์ชันที่เกี่ยวข้องโดยตรงกับลอจิกของตัวปรับขนาดอัตโนมัติ - สคริปต์
observer.py
. โดยพื้นฐานแล้วประกอบด้วยกฎที่แตกต่างกัน: เมื่อใดและเมื่อไรที่จะเรียกใช้ฟังก์ชันตัวปรับขนาดอัตโนมัติ - ไฟล์การกำหนดค่า
config.py
. ซึ่งประกอบด้วยรายการโหนดที่อนุญาตสำหรับการปรับขนาดอัตโนมัติและพารามิเตอร์อื่นๆ ที่ส่งผลต่อ เช่น ระยะเวลาในการรอนับจากเวลาที่เพิ่มโหนดใหม่ เป็นต้น นอกจากนี้ยังมีการประทับเวลาสำหรับการเริ่มคลาส ดังนั้นก่อนคลาส การกำหนดค่าคลัสเตอร์สูงสุดที่อนุญาตจะถูกเปิดใช้งาน
ตอนนี้เรามาดูส่วนของโค้ดภายในสองไฟล์แรกกัน
1. โมดูล Autoscaler.py
คลาสอัมบารี
นี่คือลักษณะของโค้ดที่มีคลาส Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
ข้างต้นเป็นตัวอย่าง คุณสามารถดูการใช้งานฟังก์ชันได้ stop_all_services
ซึ่งจะหยุดบริการทั้งหมดบนโหนดคลัสเตอร์ที่ต้องการ
ที่ทางเข้าชั้นเรียน Ambari
คุณผ่าน:
ambari_url
เช่น เช่น'http://localhost:8080/api/v1/clusters/'
,cluster_name
– ชื่อคลัสเตอร์ของคุณใน Ambariheaders = {'X-Requested-By': 'ambari'}
- และข้างใน
auth
นี่คือข้อมูลเข้าสู่ระบบและรหัสผ่านของคุณสำหรับ Ambari:auth = ('login', 'password')
.
ฟังก์ชั่นนี้ไม่มีอะไรมากไปกว่าการโทรสองครั้งผ่าน REST API ไปยัง Ambari จากมุมมองเชิงตรรกะ อันดับแรกเราได้รับรายการบริการที่ทำงานอยู่บนโหนด จากนั้นจึงขอให้คลัสเตอร์ที่กำหนดบนโหนดที่กำหนด เพื่อถ่ายโอนบริการจากรายการไปยังสถานะ INSTALLED
. ฟังก์ชั่นสำหรับการเปิดตัวบริการทั้งหมด สำหรับการถ่ายโอนโหนดสู่สถานะ Maintenance
ฯลฯ ดูคล้ายกัน - เป็นเพียงคำขอบางส่วนผ่าน API
คลาสแมค
นี่คือลักษณะของโค้ดที่มีคลาส Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
ที่ทางเข้าชั้นเรียน Mcs
เราส่งรหัสโครงการภายในคลาวด์และรหัสผู้ใช้ตลอดจนรหัสผ่านของเขา ในการทำงาน vm_turn_on
เราต้องการเปิดเครื่องเครื่องใดเครื่องหนึ่ง ตรรกะที่นี่ซับซ้อนกว่าเล็กน้อย ที่ตอนต้นของโค้ด มีการเรียกฟังก์ชันอื่นอีกสามฟังก์ชัน: 1) เราจำเป็นต้องได้รับโทเค็น 2) เราจำเป็นต้องแปลงชื่อโฮสต์เป็นชื่อของเครื่องใน MCS 3) รับ id ของเครื่องนี้ ต่อไป เราเพียงแค่ส่งคำขอโพสต์และเปิดตัวเครื่องนี้
นี่คือลักษณะของฟังก์ชันในการรับโทเค็น:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
คลาสตัวปรับขนาดอัตโนมัติ
คลาสนี้มีฟังก์ชันที่เกี่ยวข้องกับตรรกะการดำเนินงานนั่นเอง
นี่คือลักษณะของโค้ดสำหรับคลาสนี้:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
เรารับเข้าเรียน Ambari
и Mcs
รายการโหนดที่ได้รับอนุญาตให้ปรับขนาด รวมถึงพารามิเตอร์การกำหนดค่าโหนด: หน่วยความจำและ cpu ที่จัดสรรให้กับโหนดใน YARN นอกจากนี้ยังมีพารามิเตอร์ภายใน 2 ตัว q_ram, q_cpu ซึ่งเป็นคิว เมื่อใช้สิ่งเหล่านี้เราจะจัดเก็บค่าของโหลดคลัสเตอร์ปัจจุบัน หากเราเห็นว่าในช่วง 5 นาทีที่ผ่านมามีการโหลดเพิ่มขึ้นอย่างต่อเนื่อง เราก็ตัดสินใจว่าจะต้องเพิ่มโหนด +1 ให้กับคลัสเตอร์ เช่นเดียวกับสถานะการใช้งานคลัสเตอร์น้อยเกินไป
โค้ดด้านบนเป็นตัวอย่างของฟังก์ชันที่จะลบเครื่องออกจากคลัสเตอร์และหยุดเครื่องในระบบคลาวด์ ขั้นแรกจะมีการรื้อถอน YARN Nodemanager
จากนั้นโหมดจะเปิดขึ้น Maintenance
จากนั้นเราจะหยุดบริการทั้งหมดบนเครื่องและปิดเครื่องเสมือนในระบบคลาวด์
2. สคริปต์ Observer.py
โค้ดตัวอย่างจากที่นั่น:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
ในนั้น เราจะตรวจสอบว่ามีการสร้างเงื่อนไขเพื่อเพิ่มความจุของคลัสเตอร์หรือไม่ และมีเครื่องใด ๆ สำรองหรือไม่ รับชื่อโฮสต์ของหนึ่งในนั้น เพิ่มลงในคลัสเตอร์ และเผยแพร่ข้อความเกี่ยวกับเครื่องนั้นใน Slack ของทีมของเรา หลังจากนั้นจะเริ่ม cooldown_period
เมื่อเราไม่เพิ่มหรือลบสิ่งใดออกจากคลัสเตอร์ แต่เพียงตรวจสอบโหลด หากมีความเสถียรและอยู่ภายในทางเดินของค่าโหลดที่เหมาะสมที่สุด เราก็จะตรวจสอบต่อไป หากโหนดหนึ่งไม่เพียงพอ เราจะเพิ่มอีกโหนดหนึ่ง
สำหรับกรณีที่เรามีบทเรียนข้างหน้า เรารู้อยู่แล้วว่าโหนดเดียวจะไม่เพียงพอ ดังนั้นเราจึงเริ่มโหนดฟรีทั้งหมดทันทีและปล่อยให้โหนดเหล่านั้นใช้งานได้จนจบบทเรียน สิ่งนี้เกิดขึ้นโดยใช้รายการการประทับเวลาของกิจกรรม
ข้อสรุป
ตัวปรับขนาดอัตโนมัติเป็นโซลูชันที่ดีและสะดวกสำหรับกรณีเหล่านั้นเมื่อคุณพบปัญหาการโหลดคลัสเตอร์ที่ไม่สม่ำเสมอ คุณได้รับการกำหนดค่าคลัสเตอร์ที่ต้องการสำหรับการโหลดสูงสุดไปพร้อมๆ กัน และในขณะเดียวกันก็ไม่ต้องเก็บคลัสเตอร์นี้ไว้ในระหว่างการโหลดน้อยเกินไป ซึ่งช่วยประหยัดเงินได้ นอกจากนี้ทั้งหมดนี้จะเกิดขึ้นโดยอัตโนมัติโดยที่คุณไม่ต้องมีส่วนร่วม ตัวปรับขนาดอัตโนมัตินั้นไม่มีอะไรมากไปกว่าชุดคำขอไปยัง API ตัวจัดการคลัสเตอร์และ API ของผู้ให้บริการคลาวด์ ซึ่งเขียนตามตรรกะบางอย่าง สิ่งที่คุณต้องจำไว้อย่างแน่นอนคือการแบ่งโหนดออกเป็น 3 ประเภทดังที่เราเขียนไว้ก่อนหน้านี้ และคุณจะมีความสุข
ที่มา: will.com