Merhaba! İnsanları büyük verilerle çalışacak şekilde eğitiyoruz. Büyük veri üzerine, tüm katılımcıların birlikte çalıştığı, kendi kümesi olmayan bir eğitim programı hayal etmek imkansızdır. Bu nedenle programımızda her zaman var 🙂 Biz onun konfigürasyonu, ayarı ve yönetimi ile ilgileniyoruz ve adamlar doğrudan orada MapReduce işlerini başlatıyor ve Spark'ı kullanıyor.
Bu yazıda, bulutu kullanarak kendi otomatik ölçekleyicimizi yazarak eşit olmayan küme yükleme sorununu nasıl çözdüğümüzü anlatacağız. .
Sorun
Kümemiz tipik bir modda kullanılmaz. Bertaraf oldukça düzensizdir. Mesela 30 kişinin tamamı ve bir öğretmenin kümeye gidip kullanmaya başladığı uygulamalı dersler var. Ya da yine son teslim tarihine kala yükün çok arttığı günler oluyor. Zamanın geri kalanında küme düşük yük modunda çalışır.
1. Çözüm, en yüksek yüklere dayanabilecek ancak geri kalan zamanda boşta kalacak bir kümeyi tutmaktır.
2. Çözüm, derslerden önce ve yoğun yükler sırasında düğümleri manuel olarak eklediğiniz küçük bir kümeyi tutmaktır.
3. Çözüm, küçük bir küme tutmak ve kümenin mevcut yükünü izleyecek ve çeşitli API'leri kullanarak kümeye düğüm ekleyip kaldıracak bir otomatik ölçekleyici yazmaktır.
Bu yazıda 3 numaralı çözümden bahsedeceğiz. Bu otomatik ölçekleyici, dahili faktörlerden çok dış faktörlere oldukça bağımlıdır ve sağlayıcılar genellikle bunu sağlamaz. Mail.ru Cloud Solutions bulut altyapısını kullanıyoruz ve MCS API'yi kullanarak bir otomatik ölçekleyici yazdık. Verilerle nasıl çalışılacağını öğrettiğimiz için, benzer bir otomatik ölçekleyiciyi kendi amaçlarınız için nasıl yazabileceğinizi ve bulutunuzla nasıl kullanabileceğinizi göstermeye karar verdik.
Önkoşullar
Öncelikle bir Hadoop kümesine sahip olmanız gerekir. Mesela HDP dağıtımını kullanıyoruz.
Düğümlerinizin hızlı bir şekilde eklenip çıkarılabilmesi için düğümler arasında belirli bir rol dağılımının olması gerekir.
- Ana düğüm. Özel olarak herhangi bir şeyi açıklamaya gerek yok: etkileşimli modu kullanıyorsanız, örneğin Spark sürücüsünün başlatıldığı kümenin ana düğümü.
- Tarih düğümü. Bu, HDFS'de veri depoladığınız ve hesaplamaların gerçekleştiği düğümdür.
- Bilgi işlem düğümü. Bu, HDFS'de hiçbir şey depolamadığınız ancak hesaplamaların gerçekleştiği bir düğümdür.
Önemli nokta. Üçüncü tipteki düğümler nedeniyle otomatik ölçeklendirme meydana gelecektir. İkinci türden düğümleri alıp eklemeye başlarsanız yanıt hızı çok düşük olacaktır; kümenizde hizmetten çıkarma ve yeniden bağlama işlemleri saatler sürecektir. Otomatik ölçeklendirmeden beklediğiniz şey elbette bu değil. Yani birinci ve ikinci tipteki düğümlere dokunmuyoruz. Program süresince var olacak minimum uygulanabilir kümeyi temsil edeceklerdir.
Dolayısıyla, otomatik ölçekleyicimiz Python 3'te yazılmıştır, küme hizmetlerini yönetmek için Ambari API'sini kullanır, (MCS) makineleri başlatmak ve durdurmak için.
Çözüm mimarisi
- Modül
autoscaler.py. Üç sınıf içerir: 1) Ambari ile çalışmaya yönelik işlevler, 2) MCS ile çalışmaya yönelik işlevler, 3) doğrudan otomatik ölçekleyicinin mantığıyla ilgili işlevler. - Senaryo
observer.py. Temel olarak farklı kurallardan oluşur: otomatik ölçekleyici işlevlerinin ne zaman ve hangi anlarda çağrılacağı. - Yapılandırma dosyası
config.py. Örneğin, otomatik ölçeklendirme için izin verilen düğümlerin bir listesini ve örneğin yeni bir düğümün eklendiği andan itibaren ne kadar bekleneceğini etkileyen diğer parametreleri içerir. Ayrıca sınıfların başlangıcı için zaman damgaları da vardır, böylece sınıftan önce izin verilen maksimum küme yapılandırması başlatılır.
Şimdi ilk iki dosyanın içindeki kod parçalarına bakalım.
1. Autoscaler.py modülü
Ambari sınıfı
Bir sınıfı içeren bir kod parçası böyle görünür Ambari:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return messageYukarıda örnek olarak fonksiyonun uygulanmasına bakabilirsiniz. stop_all_services, istenen küme düğümündeki tüm hizmetleri durdurur.
Sınıfın girişinde Ambari geçersin:
ambari_urlörneğin, şunun gibi'http://localhost:8080/api/v1/clusters/',cluster_name– Ambari'deki kümenizin adı,headers = {'X-Requested-By': 'ambari'}- ve içeride
authAmbari için kullanıcı adınız ve şifreniz:auth = ('login', 'password').
İşlevin kendisi, REST API aracılığıyla Ambari'ye yapılan birkaç çağrıdan başka bir şey değildir. Mantıksal bir bakış açısından, önce bir düğümde çalışan hizmetlerin bir listesini alırız ve ardından belirli bir kümede, belirli bir düğümde, hizmetleri listeden duruma aktarmamızı isteriz. INSTALLED. Düğümleri duruma aktarmak için tüm hizmetleri başlatma işlevleri Maintenance vb. benzer görünüyor; bunlar API aracılığıyla yapılan isteklerden yalnızca birkaçı.
Sınıf Mc'leri
Bir sınıfı içeren bir kod parçası böyle görünür Mcs:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_codeSınıfın girişinde Mcs proje kimliğini, kullanıcı kimliğini ve şifresini bulutun içine aktarıyoruz. İşlevde vm_turn_on makinelerden birini açmak istiyoruz. Buradaki mantık biraz daha karmaşıktır. Kodun başında diğer üç fonksiyon çağrılıyor: 1) bir token almamız gerekiyor, 2) ana bilgisayar adını MCS'deki makinenin adına dönüştürmemiz gerekiyor, 3) bu makinenin id'sini almamız gerekiyor. Daha sonra, bir gönderi isteğinde bulunup bu makineyi başlatıyoruz.
Bir jeton elde etme işlevi şöyle görünür:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.tokenOtomatik ölçekleyici sınıfı
Bu sınıf, çalışma mantığının kendisiyle ilgili işlevleri içerir.
Bu sınıfa ait bir kod parçası şöyle görünür:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return messageGiriş için ders kabul ediyoruz. Ambari и Mcs, ölçeklendirme için izin verilen düğümlerin yanı sıra düğüm yapılandırma parametrelerinin bir listesi: YARN'da düğüme tahsis edilen bellek ve işlemci. Ayrıca kuyruk olan 2 dahili parametre q_ram, q_cpu vardır. Bunları kullanarak mevcut küme yükünün değerlerini saklıyoruz. Eğer son 5 dakikadır sürekli artan bir yük olduğunu görürsek, Cluster’a +1 node eklememiz gerektiğine karar veririz. Aynı durum kümenin yetersiz kullanım durumu için de geçerlidir.
Yukarıdaki kod, bir makineyi kümeden çıkarıp bulutta durduran bir fonksiyon örneğidir. İlk önce hizmetten çıkarma var YARN Nodemanager, ardından mod açılır Maintenance, ardından makinedeki tüm hizmetleri durdurup buluttaki sanal makineyi kapatıyoruz.
2. Komut dosyası gözlemci.py
Oradan örnek kod:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)İçinde, kümenin kapasitesini artırmak için koşulların yaratılıp yaratılmadığını ve yedekte makine olup olmadığını kontrol ediyoruz, bunlardan birinin ana bilgisayar adını alıyoruz, kümeye ekliyoruz ve ekibimizin Slack'inde bununla ilgili bir mesaj yayınlıyoruz. Bundan sonra başlıyor cooldown_period, kümeye hiçbir şey eklemediğimiz veya kümeden hiçbir şey çıkarmadığımız, yalnızca yükü izlediğimizde. Eğer stabil hale gelmişse ve optimum yük değerleri koridorundaysa izlemeye devam ederiz. Bir düğüm yeterli değilse bir tane daha ekliyoruz.
Önümüzde bir dersimiz olduğu durumlarda, bir düğümün yeterli olmayacağından emin olduğumuzdan, tüm boş düğümleri hemen başlatıp ders sonuna kadar aktif tutuyoruz. Bu, etkinlik zaman damgalarının bir listesi kullanılarak gerçekleşir.
Sonuç
Otomatik ölçekleyici, düzensiz küme yüklemesi yaşadığınız durumlar için iyi ve kullanışlı bir çözümdür. Pik yükler için istediğiniz küme konfigürasyonunu aynı anda elde edersiniz ve aynı zamanda bu kümeyi düşük yük sırasında tutmazsınız, böylece paradan tasarruf edersiniz. Üstelik bunların hepsi sizin katılımınız olmadan otomatik olarak gerçekleşiyor. Otomatik ölçekleyicinin kendisi, küme yöneticisi API'sine ve bulut sağlayıcı API'sine belirli bir mantığa göre yazılmış bir dizi istekten başka bir şey değildir. Kesinlikle hatırlamanız gereken şey, daha önce de yazdığımız gibi düğümlerin 3 türe ayrılmasıdır. Ve mutlu olacaksın.
Kaynak: habr.com
