クラスタヌ甚に独自のオヌトスケヌラヌを䜜成する方法

こんにちは 私たちはビッグデヌタを扱う人材を蚓緎したす。 すべおの参加者が協力しお䜜業する独自のクラスタヌがなければ、ビッグデヌタに関する教育プログラムを想像するこずは䞍可胜です。 このため、私たちのプログラムには垞にそれが含たれおいたす 🙂 私たちはその構成、チュヌニング、管理に埓事しおおり、スタッフはそこで MapReduce ゞョブを盎接起動し、Spark を䜿甚しおいたす。

この投皿では、クラりドを䜿甚しお独自のオヌトスケヌラヌを䜜成するこずで、クラスタヌの負荷が䞍均䞀になる問題をどのように解決したかに぀いお説明したす。 Mail.ru クラりド ゜リュヌション.

問題

私たちのクラスタヌは通垞のモヌドでは䜿甚されたせん。 凊分は非垞に䞍均䞀です。 䟋えば、30人党員ず先生がクラスタヌに行っお䜿い始める実践的な授業がありたす。 あるいは、締め切り前に負荷が倧幅に増加する日もありたす。 残りの時間は、クラスタヌはアンダヌロヌド モヌドで動䜜したす。

解決策 #1 は、ピヌク負荷には耐えられるが、残りの時間はアむドル状態になるクラスタヌを維持するこずです。

解決策 2 は、小芏暡なクラスタヌを維持し、クラスの前およびピヌク負荷䞭に手動でノヌドを远加するこずです。

解決策 #3 は、小芏暡なクラスタヌを維持し、クラスタヌの珟圚の負荷を監芖し、さたざたな API を䜿甚しおクラスタヌにノヌドを远加および削陀するオヌトスケヌラヌを䜜成するこずです。

この投皿では、解決策 #3 に぀いお説明したす。 このオヌトスケヌラヌは内郚芁因ではなく倖郚芁因に倧きく䟝存しおおり、プロバむダヌが提䟛しおいないこずがよくありたす。 私たちは Mail.ru Cloud Solutions のクラりド むンフラストラクチャを䜿甚し、MCS API を䜿甚しおオヌトスケヌラヌを䜜成したした。 そしお、私たちはデヌタの操䜜方法を教えおいるので、独自の目的のために同様のオヌトスケヌラヌを䜜成し、それをクラりドで䜿甚する方法を瀺すこずにしたした。

前提条件

たず、Hadoop クラスタヌが必芁です。 たずえば、HDP ディストリビュヌションを䜿甚したす。

ノヌドを迅速に远加および削陀するには、ノヌド間で圹割を䞀定に分散する必芁がありたす。

  1. マスタヌノヌド。 そうですね、特に説明する必芁はありたせん。クラスタヌのメむン ノヌド。察話モヌドを䜿甚する堎合、たずえば Spark ドラむバヌが起動されたす。
  2. 日付ノヌド。 これは、HDFS にデヌタを保存し、蚈算が行われるノヌドです。
  3. コンピュヌティングノヌド。 これは、HDFS に䜕も保存しないノヌドですが、蚈算が行われるノヌドです。

倧事なポむント。 XNUMX 番目のタむプのノヌドにより自動スケヌリングが発生したす。 XNUMX 番目のタむプのノヌドの取埗ず远加を開始するず、応答速床が非垞に遅くなり、クラスタヌでの廃止ず再コミットに数時間かかりたす。 もちろん、これは自動スケヌリングに期埅されるものではありたせん。 ぀たり、XNUMX 番目ず XNUMX 番目のタむプのノヌドには觊れたせん。 これらは、プログラムの期間党䜓を通じお存圚する、実行可胜な最小限のクラスタヌを衚したす。

したがっお、私たちのオヌトスケヌラヌは Python 3 で曞かれおおり、Ambari API を䜿甚しおクラスタヌ サヌビスを管理し、 Mail.ru クラりド ゜リュヌションの API (MCS) マシンの起動ず停止甚。

゜リュヌションアヌキテクチャ

  1. モゞュヌル autoscaler.py。 これには 1 ぀のクラスが含たれおいたす: 2) Ambari で動䜜する関数、3) MCS で動䜜する関数、XNUMX) オヌトスケヌラヌのロゞックに盎接関連する関数。
  2. スクリプト observer.py。 基本的に、オヌトスケヌラヌ関数をい぀、どの瞬間に呌び出すかずいうさたざたなルヌルで構成されたす。
  3. 蚭定ファむル config.py。 たずえば、自動スケヌリングが蚱可されおいるノヌドのリストや、新しいノヌドが远加された瞬間から埅機する時間などに圱響するその他のパラメヌタヌが含たれおいたす。 クラスの開始のタむムスタンプもあるため、クラスの前に最倧蚱容クラスタヌ構成が起動されたす。

最初の XNUMX ぀のファむル内のコヌド郚分を芋おみたしょう。

1. Autoscaler.py モゞュヌル

アンバリ玚

クラスを含むコヌドは次のようになりたす Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

䟋ずしお、䞊蚘の関数の実装を芋るこずができたす。 stop_all_servicesこれにより、目的のクラスタヌ ノヌド䞊のすべおのサヌビスが停止されたす。

教宀の入り口で Ambari あなたは合栌したす

  • ambari_urlたずえば、次のようになりたす 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – Ambari のクラスタヌの名前、
  • headers = {'X-Requested-By': 'ambari'}
  • そしお䞭 auth Ambari のナヌザヌ名ずパスワヌドは次のずおりです。 auth = ('login', 'password').

関数自䜓は、REST API を介しお Ambari を数回呌び出すだけです。 論理的な芳点からは、たずノヌド䞊で実行䞭のサヌビスのリストを受け取り、次に、特定のノヌド䞊の特定のクラスタヌ䞊でサヌビスをリストから状態に転送するように芁求したす。 INSTALLED。 すべおのサヌビスを起動し、ノヌドを状態に転送するための関数 Maintenance などは䌌おいたす。これらは API を介したいく぀かのリク゚ストにすぎたせん。

クラスマック

クラスを含むコヌドは次のようになりたす Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

教宀の入り口で Mcs クラりド内のプロゞェクト ID、ナヌザヌ ID、およびパスワヌドを枡したす。 機胜䞭 vm_turn_on マシンの 1 ぀をオンにしたいず考えおいたす。 ここでのロゞックはもう少し耇雑です。 コヌドの先頭で、他の 2 ぀の関数が呌び出されたす: 3) トヌクンを取埗する必芁がある、XNUMX) ホスト名を MCS 内のマシンの名前に倉換する必芁がある、XNUMX) このマシンの ID を取埗する。 次に、ポストリク゚ストを䜜成しお、このマシンを起動したす。

トヌクンを取埗する関数は次のようになりたす。

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

オヌトスケヌラヌクラス

このクラスには、動䜜ロゞック自䜓に関連する関数が含たれおいたす。

このクラスのコヌドは次のようになりたす。

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

゚ントリヌクラスを受け付けおおりたす。 Ambari О Mcs、スケヌリングが蚱可されおいるノヌドのリスト、およびノヌ​​ド構成パラメヌタ (YARN でノヌドに割り圓おられたメモリず CPU)。 たた、キュヌである 2 ぀の内郚パラメヌタ q_ram、q_cpu もありたす。 それらを䜿甚しお、珟圚のクラスタヌ負荷の倀を保存したす。 過去 5 分間に負荷が䞀貫しお増加しおいるこずが確認された堎合は、クラスタヌに +1 ノヌドを远加する必芁があるず刀断したす。 クラスタヌの䜿甚率が䜎い状態に぀いおも同じこずが圓おはたりたす。

䞊蚘のコヌドは、クラスタヌからマシンを削陀し、クラりドで停止する関数の䟋です。 たずは廃炉です YARN Nodemanager、モヌドがオンになりたす Maintenance次に、マシン䞊のすべおのサヌビスを停止し、クラりド内の仮想マシンをオフにしたす。

2. スクリプトobserver.py

そこからのサンプルコヌド:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

その䞭で、クラスタヌの容量を増やすための条件が䜜成されおいるかどうか、および予備のマシンがあるかどうかを確認し、そのうちの XNUMX ぀のホスト名を取埗しおクラスタヌに远加し、それに関するメッセヌゞをチヌムの Slack で公開したす。 その埌始たりたす cooldown_periodクラスタヌに䜕も远加たたは削陀せず、単に負荷を監芖する堎合です。 安定し、最適な負荷倀の範囲内にある堎合は、監芖を継続するだけです。 XNUMX ぀のノヌドでは䞍十分な堎合は、別のノヌドを远加したす。

レッスンが予定されおいる堎合、XNUMX ぀のノヌドでは十分ではないこずがすでにわかっおいるため、空いおいるすべおのノヌドをすぐに起動し、レッスンが終了するたでアクティブなたたにしたす。 これは、アクティビティのタむムスタンプのリストを䜿甚しお行われたす。

たずめ

オヌトスケヌラヌは、クラスタヌの負荷が䞍均䞀になる堎合に適した䟿利な゜リュヌションです。 ピヌク負荷時に必芁なクラスタヌ構成を実珟するず同時に、䜎負荷時にこのクラスタヌを維持しないこずでコストを節玄できたす。 さらに、これはすべお、ナヌザヌの参加なしで自動的に行われたす。 オヌトスケヌラヌ自䜓は、特定のロゞックに埓っお蚘述された、クラスタヌ マネヌゞャヌ API およびクラりド プロバむダヌ API ぞの䞀連のリク゚ストにすぎたせん。 必ず芚えおおきたいのは、先ほども曞いたようにノヌドを3皮類に分けるこずです。 そしおあなたは幞せになるでしょう。

出所 habr.com

コメントを远加したす