سلام! ما افراد را برای کار با داده های بزرگ آموزش می دهیم. تصور یک برنامه آموزشی روی داده های بزرگ بدون خوشه خاص خود غیرممکن است که همه شرکت کنندگان روی آن با هم کار کنند. به همین دلیل، برنامه ما همیشه آن را دارد 🙂 ما درگیر پیکربندی، تنظیم و مدیریت آن هستیم و بچه ها مستقیماً کارهای MapReduce را در آنجا راه اندازی می کنند و از Spark استفاده می کنند.
در این پست به شما خواهیم گفت که چگونه مشکل بارگیری ناهموار خوشه را با نوشتن خودکار مقیاس کننده خود با استفاده از ابر حل کردیم.
مشکل
خوشه ما در حالت معمولی استفاده نمی شود. دفع بسیار ناهموار است. به عنوان مثال، کلاس های عملی وجود دارد که هر 30 نفر و یک معلم به کلاستر می روند و شروع به استفاده از آن می کنند. یا دوباره، روزهایی قبل از ضرب الاجل وجود دارد که بار به شدت افزایش می یابد. مابقی زمان خوشه در حالت زیر بار کار می کند.
راه حل شماره 1 این است که خوشه ای را نگه دارید که بارهای اوج را تحمل کند، اما بقیه زمان ها بیکار باشد.
راه حل شماره 2 نگه داشتن یک خوشه کوچک است که به صورت دستی قبل از کلاس ها و در زمان اوج بار، گره ها را به آن اضافه می کنید.
راه حل شماره 3 این است که یک خوشه کوچک نگه دارید و یک خودکار مقیاس کننده بنویسید که بار فعلی خوشه را نظارت می کند و با استفاده از API های مختلف، گره ها را از خوشه اضافه و حذف می کند.
در این پست در مورد راه حل شماره 3 صحبت خواهیم کرد. این خودکار مقیاسکننده بهجای عوامل داخلی به شدت به عوامل خارجی وابسته است و ارائهدهندگان اغلب آن را ارائه نمیدهند. ما از زیرساخت ابری Mail.ru Cloud Solutions استفاده می کنیم و با استفاده از MCS API یک مقیاس خودکار نوشتیم. و از آنجایی که ما نحوه کار با داده ها را آموزش می دهیم، تصمیم گرفتیم نشان دهیم که چگونه می توانید مقیاس خودکار مشابهی را برای اهداف خود بنویسید و از آن در فضای ابری خود استفاده کنید.
پیش نیازها
ابتدا باید یک خوشه Hadoop داشته باشید. به عنوان مثال، ما از توزیع HDP استفاده می کنیم.
برای اینکه گره های شما به سرعت اضافه و حذف شوند، باید توزیع مشخصی از نقش ها در بین گره ها داشته باشید.
- گره اصلی. خوب، نیازی به توضیح خاصی نیست: گره اصلی خوشه، که برای مثال، درایور Spark روی آن راه اندازی می شود، اگر از حالت تعاملی استفاده می کنید.
- گره تاریخ. این گره ای است که داده ها را روی HDFS ذخیره می کنید و محاسبات در آن انجام می شود.
- گره محاسباتی این گره ای است که در آن چیزی در HDFS ذخیره نمی کنید، اما محاسبات در آن انجام می شود.
نکته مهم. مقیاس خودکار به دلیل گره های نوع سوم رخ می دهد. اگر شروع به گرفتن و اضافه کردن گرههای نوع دوم کنید، سرعت پاسخدهی بسیار کم خواهد بود - از کار انداختن و ارتکاب مجدد ساعتها در خوشه شما طول میکشد. البته این چیزی نیست که شما از مقیاس خودکار انتظار دارید. یعنی گره های نوع اول و دوم را لمس نمی کنیم. آنها حداقل خوشه قابل دوام را نشان خواهند داد که در طول مدت برنامه وجود خواهد داشت.
بنابراین، مقیاسکننده خودکار ما در پایتون 3 نوشته شده است، از Ambari API برای مدیریت خدمات خوشهای استفاده میکند.
معماری راه حل
- مدول
autoscaler.py
. این شامل سه کلاس است: 1) توابع برای کار با Ambari، 2) توابع برای کار با MCS، 3) توابع مرتبط مستقیم با منطق خودکار مقیاس. - اسکریپت
observer.py
. اساساً از قوانین مختلفی تشکیل شده است: چه زمانی و در چه لحظاتی باید توابع خودکار مقیاس کننده را فراخوانی کرد. - فایل پیکربندی
config.py
. برای مثال، این شامل فهرستی از گرههای مجاز برای مقیاسبندی خودکار و سایر پارامترهایی است که بر مدت زمان انتظار از لحظه اضافه شدن یک گره جدید تأثیر میگذارند. همچنین مهرهای زمانی برای شروع کلاس ها وجود دارد، به طوری که قبل از کلاس حداکثر پیکربندی کلاستر مجاز راه اندازی می شود.
بیایید اکنون به تکه های کد داخل دو فایل اول نگاه کنیم.
1. ماژول Autoscaler.py
کلاس عمبری
این همان چیزی است که یک قطعه کد حاوی یک کلاس به نظر می رسد Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
در بالا، به عنوان مثال، می توانید به اجرای تابع نگاه کنید stop_all_services
، که تمام خدمات را در گره خوشه مورد نظر متوقف می کند.
در ورودی کلاس Ambari
شما عبور می کنید:
ambari_url
به عنوان مثال، مانند'http://localhost:8080/api/v1/clusters/'
,cluster_name
– نام خوشه شما در آمبری،headers = {'X-Requested-By': 'ambari'}
- و داخل
auth
در اینجا نام کاربری و رمز عبور شما برای آمبری آمده است:auth = ('login', 'password')
.
خود این تابع چیزی بیش از چند تماس از طریق REST API به Ambari نیست. از نقطه نظر منطقی، ما ابتدا لیستی از خدمات در حال اجرا را در یک گره دریافت می کنیم، و سپس در یک خوشه معین، در یک گره داده شده، می خواهیم خدمات را از لیست به وضعیت منتقل کنیم. INSTALLED
. توابعی برای راه اندازی همه سرویس ها، برای انتقال گره ها به حالت Maintenance
و غیره شبیه به هم هستند - آنها فقط چند درخواست از طریق API هستند.
کلاس مک
این همان چیزی است که یک قطعه کد حاوی یک کلاس به نظر می رسد Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
در ورودی کلاس Mcs
شناسه پروژه را داخل ابر و شناسه کاربری و همچنین رمز عبور او را ارسال می کنیم. در عمل vm_turn_on
می خواهیم یکی از ماشین ها را روشن کنیم. منطق اینجا کمی پیچیده تر است. در ابتدای کد، سه تابع دیگر نامیده می شود: 1) باید یک توکن دریافت کنیم، 2) باید نام میزبان را به نام ماشین در MCS تبدیل کنیم، 3) شناسه این ماشین را دریافت می کنیم. در مرحله بعد، ما به سادگی یک درخواست پست می کنیم و این دستگاه را راه اندازی می کنیم.
تابع بدست آوردن توکن به این صورت است:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
کلاس Autoscaler
این کلاس حاوی توابع مربوط به خود منطق عملیاتی است.
یک قطعه کد برای این کلاس به این صورت است:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
ما کلاس ها را برای ورود می پذیریم. Ambari
и Mcs
، لیستی از گره هایی که برای مقیاس بندی مجاز هستند، و همچنین پارامترهای پیکربندی گره: حافظه و پردازنده مرکزی اختصاص داده شده به گره در YARN. همچنین 2 پارامتر داخلی q_ram, q_cpu وجود دارد که به صورت صف هستند. با استفاده از آنها، مقادیر بار کلاستر فعلی را ذخیره می کنیم. اگر ببینیم که در طول 5 دقیقه گذشته به طور مداوم بار افزایش یافته است، تصمیم می گیریم که باید گره +1 را به خوشه اضافه کنیم. همین امر در مورد حالت عدم استفاده از خوشه نیز صادق است.
کد بالا نمونه ای از تابعی است که ماشینی را از خوشه حذف می کند و آن را در فضای ابری متوقف می کند. اول از کار افتادن است YARN Nodemanager
، سپس حالت روشن می شود Maintenance
، سپس تمام سرویس های موجود در دستگاه را متوقف می کنیم و ماشین مجازی را در فضای ابری خاموش می کنیم.
2. اسکریپت observer.py
نمونه کد از آنجا:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
در آن بررسی میکنیم که آیا شرایطی برای افزایش ظرفیت کلاستر ایجاد شده است و آیا ماشینهایی در رزرو وجود دارد یا خیر، نام میزبان یکی از آنها را دریافت میکنیم، آن را به خوشه اضافه میکنیم و پیامی در مورد آن در Slack تیم خود منتشر میکنیم. پس از آن شروع می شود cooldown_period
، زمانی که چیزی از خوشه اضافه یا حذف نمی کنیم، بلکه به سادگی بارگذاری را نظارت می کنیم. اگر تثبیت شده باشد و در دالان مقادیر بهینه بار باشد، به سادگی به نظارت ادامه می دهیم. اگر یک گره کافی نبود، یکی دیگر را اضافه می کنیم.
برای مواردی که درسی در پیش داریم، از قبل مطمئن هستیم که یک گره کافی نخواهد بود، بنابراین بلافاصله تمام گره های آزاد را راه اندازی می کنیم و آنها را تا پایان درس فعال نگه می داریم. این با استفاده از فهرستی از مُهرهای فعالیت اتفاق میافتد.
نتیجه
Autoscaler یک راه حل خوب و راحت برای مواردی است که بارگذاری ناهموار خوشه ای را تجربه می کنید. شما به طور همزمان به پیکربندی خوشه دلخواه برای بارهای پیک دست مییابید و در عین حال این خوشه را در زمان کمبار نگهداری نمیکنید و در هزینه صرفهجویی میکنید. خوب، به علاوه این همه به طور خودکار بدون مشارکت شما اتفاق می افتد. خود مقیاسکننده خودکار چیزی بیش از مجموعهای از درخواستها به API مدیر خوشه و API ارائهدهنده ابری نیست که بر اساس یک منطق خاص نوشته شدهاند. آنچه شما قطعاً باید به خاطر بسپارید، تقسیم گره ها به 3 نوع است، همانطور که قبلاً نوشتیم. و شما خوشحال خواهید شد.
منبع: www.habr.com