你好! 我們訓練人們使用大數據。 無法想像一個大數據教育計畫沒有自己的集群,所有參與者都在集群上共同工作。 出於這個原因,我們的程式總是有它 🙂 我們參與它的配置、調整和管理,並且這些人直接在那裡啟動 MapReduce 作業並使用 Spark。
在這篇文章中,我們將告訴您如何透過使用雲端編寫我們自己的自動縮放器來解決叢集負載不均勻的問題
問題
我們的集群不是在典型模式下使用的。 處置極不均勻。 比如說有實作課,30個人加上一個老師都到集群裡開始使用。 或者,在截止日期前的幾天,負載會大幅增加。 其餘時間集群以欠載模式運作。
解決方案#1 是保持一個能夠承受尖峰負載但在其餘時間處於空閒狀態的叢集。
解決方案#2 是保留一個小型集群,您可以在類別之前和峰值負載期間手動向其中新增節點。
解決方案#3 是保留一個小型叢集並編寫一個自動縮放程式來監視叢集的當前負載,並使用各種 API 在叢集中新增和刪除節點。
在這篇文章中,我們將討論解決方案#3。 此自動縮放器高度依賴外部因素而不是內部因素,並且提供者通常不提供它。 我們使用 Mail.ru Cloud Solutions 雲端基礎設施,並使用 MCS API 編寫了一個自動縮放器。 由於我們教授如何處理數據,因此我們決定向您展示如何根據自己的目的編寫類似的自動縮放器並將其與您的雲端一起使用
條件:
首先,您必須有一個 Hadoop 叢集。 例如,我們使用 HDP 發行版。
為了使您的節點能夠快速新增和刪除,您必須在節點之間具有一定的角色分配。
- 主節點。 嗯,不需要特別解釋什麼:叢集的主節點,例如,如果使用互動模式,則在該節點上啟動 Spark 驅動程式。
- 日期節點。 這是在 HDFS 上儲存資料並進行計算的節點。
- 計算節點。 在這個節點上,您不會在 HDFS 上儲存任何內容,但會進行計算。
很重要的一點。 由於第三種類型的節點,將會發生自動縮放。 如果您開始取得和新增第二種類型的節點,回應速度將非常低 - 在叢集上停用和重新提交將需要數小時。 當然,這不是您對自動縮放的期望。 也就是說,我們不接觸第一類和第二類節點。 它們將代表在整個計劃期間存在的最小可行集群。
因此,我們的自動縮放器是用 Python 3 編寫的,使用 Ambari API 來管理叢集服務,使用
解決方案架構
- 模
autoscaler.py
。 它包含三個類別:1)用於使用 Ambari 的函數,2)用於使用 MCS 的函數,3)與自動縮放器邏輯直接相關的函數。 - 腳本
observer.py
。 本質上它由不同的規則組成:何時以及在什麼時刻調用自動縮放器函數。 - 設定檔
config.py
。 例如,它包含允許自動縮放的節點清單以及其他影響的參數,例如,從新增節點的那一刻起等待多長時間。 還有課程開始的時間戳,以便在課程之前啟動最大允許的叢集配置。
現在讓我們來看看前兩個檔案中的程式碼片段。
1.Autoscaler.py模組
安巴里級
這就是一段包含類別的程式碼的樣子 Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
上面作為例子,可以看一下函數的實現 stop_all_services
,這會停止所需叢集節點上的所有服務。
在班級門口 Ambari
你過關了:
ambari_url
,例如,像'http://localhost:8080/api/v1/clusters/'
,cluster_name
– 您的群組在 Ambari 中的名稱,headers = {'X-Requested-By': 'ambari'}
- 和裡面
auth
這是您的 Ambari 使用者名稱和密碼:auth = ('login', 'password')
.
該函數本身只不過是透過 REST API 對 Ambari 進行幾次呼叫。 從邏輯的角度來看,我們首先收到一個節點上正在運行的服務的列表,然後在給定的叢集、給定的節點上請求將服務從列表轉移到狀態 INSTALLED
。 用於啟動所有服務、將節點轉移到狀態的函數 Maintenance
等等看起來很相似——它們只是透過 API 的一些請求。
麥克斯級
這就是一段包含類別的程式碼的樣子 Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
在班級門口 Mcs
我們傳遞雲端中的項目 ID 和使用者 ID,以及他的密碼。 功能中 vm_turn_on
我們想打開其中一台機器。 這裡的邏輯有點複雜。 在程式碼的開頭,呼叫了另外三個函數:1)我們需要取得令牌,2)我們需要將主機名稱轉換為MCS中機器的名稱,3)取得這台機器的id。 接下來,我們只需發出一個發布請求並啟動這台機器。
取得令牌的函數如下所示:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
自動定標器類
該類別包含與操作邏輯本身相關的函數。
這個類別的一段程式碼如下所示:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
我們接受入學課程。 Ambari
и Mcs
,允許擴展的節點列表,以及節點配置參數:YARN 中分配給該節點的記憶體和 cpu。 還有2個內部參數q_ram、q_cpu,都是佇列。 使用它們,我們儲存當前叢集負載的值。 如果我們發現在過去 5 分鐘內負載持續增加,那麼我們決定需要在叢集中新增 +1 個節點。 集群未充分利用狀態也是如此。
上面的程式碼是從叢集中刪除機器並將其停止在雲端中的函數範例。 首先是退役 YARN Nodemanager
,然後模式開啟 Maintenance
,然後我們停止機器上的所有服務,並關閉雲端的虛擬機器。
2.腳本observer.py
那裡的範例程式碼:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
在其中,我們檢查是否已經創建了增加叢集容量的條件以及是否有任何預留的機器,獲取其中一台機器的主機名,將其添加到叢集中並在我們團隊的Slack 上發布有關它的消息。 之後就開始了 cooldown_period
,當我們不從叢集中新增或刪除任何內容,而只是監視負載時。 如果它已經穩定並且處於最佳負載值的範圍內,那麼我們只需繼續監控即可。 如果一個節點不夠,那我們再增加另一個節點。
對於前面有課的情況,我們已經確定一個節點是不夠的,因此我們立即啟動所有空閒節點並保持它們活動直到課程結束。 這是使用活動時間戳列表來實現的。
結論
當您遇到叢集負載不均勻的情況時,Autoscaler 是一個很好且方便的解決方案。 您可以同時實現峰值負載所需的集群配置,同時不會在負載不足時保留該集群,從而節省資金。 好吧,而且這一切都是自動發生的,無需您的參與。 自動縮放器本身無非是一組對叢集管理器 API 和雲端提供者 API 的請求,依照一定的邏輯編寫。 正如我們之前所寫,您絕對需要記住的是節點分為 3 種類型。 你會很高興。
來源: www.habr.com