Hogyan készítsünk saját autoscaler-t egy fürthöz

Helló! Megtanítjuk az embereket a nagy adatokkal való munkavégzésre. Lehetetlen elképzelni egy big data oktatási programot saját klaszter nélkül, amelyen minden résztvevő együtt dolgozik. Emiatt a mi programunkban mindig van 🙂 Mi foglalkozunk a konfigurálásával, tuningolásával és adminisztrációjával, a srácok pedig közvetlenül elindítják ott a MapReduce munkákat és a Sparkot használják.

Ebben a bejegyzésben elmeséljük, hogyan oldottuk meg az egyenetlen fürtbetöltés problémáját úgy, hogy saját autoscalerünket írtuk a felhő segítségével Mail.ru Cloud Solutions.

probléma

Klaszterünket nem egy tipikus módban használják. Az ártalmatlanítás nagyon egyenetlen. Például vannak gyakorlati órák, amikor mind a 30 ember és egy tanár bemegy a klaszterbe és elkezdi használni. Vagy megint vannak napok a határidő előtt, amikor nagyon megnő a terhelés. Az idő további részében a fürt alulterhelés üzemmódban működik.

Az 1. megoldás egy olyan fürt megtartása, amely ellenáll a csúcsterhelésnek, de a hátralévő időben tétlen lesz.

A 2. megoldás egy kis fürt megtartása, amelyhez manuálisan kell csomópontokat hozzáadni az órák előtt és a csúcsterhelés alatt.

A 3. megoldás egy kis fürt megtartása és egy automatikus skálázó írása, amely figyeli a fürt aktuális terhelését, és különféle API-k segítségével csomópontokat ad hozzá és távolít el a fürtből.

Ebben a bejegyzésben a 3. megoldásról fogunk beszélni. Ez az automatikus skálázó nagymértékben függ a külső tényezőktől, nem pedig a belsőektől, és a szolgáltatók gyakran nem biztosítják. A Mail.ru Cloud Solutions felhőinfrastruktúrát használjuk, és az MCS API-t használva automatikus skálázót írtunk. És mivel megtanítjuk, hogyan kell dolgozni az adatokkal, úgy döntöttünk, hogy megmutatjuk, hogyan írhat egy hasonló automatikus skálázót saját céljaira, és használhatja azt a felhőben.

Előfeltételek

Először is rendelkeznie kell egy Hadoop-fürttel. Például a HDP disztribúciót használjuk.

A csomópontok gyors hozzáadásához és eltávolításához a csomópontok között bizonyos szereposztásra van szükség.

  1. Főcsomópont. Nos, nem kell különösebben magyarázni: a fürt fő csomópontja, amelyen például a Spark illesztőprogram indul el, ha interaktív módot használunk.
  2. Dátum csomópont. Ez az a csomópont, amelyen adatokat tárol a HDFS-en, és ahol a számítások zajlanak.
  3. Számítási csomópont. Ez egy csomópont, ahol nem tárol semmit a HDFS-en, de ahol számítások történnek.

Fontos pont. Az automatikus skálázás a harmadik típusú csomópontok miatt történik. Ha elkezdi a második típusú csomópontok felvételét és hozzáadását, a válaszsebesség nagyon alacsony lesz - a leszerelés és az újraaktiválás órákig tart a fürtben. Ez természetesen nem az, amit az automatikus skálázástól elvár. Vagyis nem érintjük meg az első és a második típusú csomópontokat. Egy minimális életképes klasztert képviselnek, amely a program teljes időtartama alatt létezik.

Tehát az autoscalerünk Python 3-ban van írva, az Ambari API-t használja a fürtszolgáltatások kezelésére, API a Mail.ru Cloud Solutions-tól (MCS) gépek indításához és leállításához.

Megoldás architektúra

  1. Modul autoscaler.py. Három osztályt tartalmaz: 1) az Ambarival való munkavégzésre szolgáló funkciók, 2) az MCS-sel való munkavégzés funkciói, 3) az automatikus skálázó logikájához közvetlenül kapcsolódó funkciók.
  2. Forgatókönyv observer.py. Lényegében különböző szabályokból áll: mikor és mikor kell meghívni az autoscaler függvényeket.
  3. Konfigurációs fájl config.py. Tartalmazza például az automatikus skálázásra engedélyezett csomópontok listáját és egyéb paramétereket, amelyek például befolyásolják, hogy mennyi ideig kell várni az új csomópont hozzáadása pillanatától számítva. Vannak időbélyegek is az órák kezdetére, így az óra előtt elindul a maximálisan megengedett fürtkonfiguráció.

Nézzük most az első két fájlban található kódrészleteket.

1. Autoscaler.py modul

Ambari osztály

Így néz ki egy osztályt tartalmazó kódrészlet Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Fent, példaként, megnézheti a függvény megvalósítását stop_all_services, amely leállítja az összes szolgáltatást a kívánt fürtcsomóponton.

Az osztály bejáratánál Ambari átpasszol:

  • ambari_urlpéldául tetszik 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – a klaszter neve Ambari nyelven,
  • headers = {'X-Requested-By': 'ambari'}
  • és belül auth itt van az Ambari felhasználóneve és jelszava: auth = ('login', 'password').

Maga a funkció nem más, mint néhány hívás a REST API-n keresztül az Ambari felé. Logikai szempontból először megkapjuk a futó szolgáltatások listáját egy csomóponton, majd egy adott klaszteren, egy adott csomóponton megkérjük, hogy a szolgáltatásokat a listából az állapotba vigyük át. INSTALLED. Funkciók az összes szolgáltatás elindításához, a csomópontok állapotba átviteléhez Maintenance stb. hasonlóan néznek ki – ezek csak néhány kérés az API-n keresztül.

osztály Mcs

Így néz ki egy osztályt tartalmazó kódrészlet Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Az osztály bejáratánál Mcs átadjuk a projektazonosítót a felhőn belül és a felhasználói azonosítót, valamint a jelszavát. Funkcióban vm_turn_on be akarjuk kapcsolni az egyik gépet. A logika itt egy kicsit bonyolultabb. A kód elején még három függvényt hívunk meg: 1) kapnunk kell egy tokent, 2) a gazdagépnevet át kell alakítanunk a gép nevére az MCS-ben, 3) meg kell szereznünk ennek a gépnek az azonosítóját. Ezután egyszerűen küldünk egy bejegyzési kérelmet, és elindítjuk a gépet.

Így néz ki a token megszerzésére szolgáló függvény:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler osztály

Ez az osztály magával a működési logikával kapcsolatos függvényeket tartalmazza.

Így néz ki egy kódrészlet ehhez az osztályhoz:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Belépő órákat elfogadunk. Ambari и Mcs, a méretezésre engedélyezett csomópontok listája, valamint a csomópont konfigurációs paraméterei: a YARN csomópontjához lefoglalt memória és processzor. Van még 2 belső paraméter: q_ram, q_cpu, amelyek sorok. Ezek segítségével tároljuk az aktuális klaszterterhelés értékeit. Ha azt látjuk, hogy az elmúlt 5 percben folyamatosan nőtt a terhelés, akkor úgy döntünk, hogy +1 csomópontot kell hozzáadnunk a fürthöz. Ugyanez igaz a fürt kihasználatlan állapotára is.

A fenti kód egy példa egy olyan függvényre, amely eltávolít egy gépet a fürtből, és leállítja a felhőben. Először is leszerelés történik YARN Nodemanager, majd az üzemmód bekapcsol Maintenance, akkor leállítjuk az összes szolgáltatást a gépen, és kikapcsoljuk a virtuális gépet a felhőben.

2. Script megfigyelő.py

Minta kód onnan:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Ebben ellenőrizzük, hogy megteremtődtek-e a feltételek a klaszter kapacitásának növeléséhez, és van-e tartalék gép, megkapjuk az egyik gépnevét, hozzáadjuk a klaszterhez, és erről üzenetet teszünk közzé csapatunk Slack oldalán. Ami után elindul cooldown_period, amikor nem adunk hozzá vagy távolítunk el semmit a klaszterből, hanem egyszerűen figyeljük a terhelést. Ha stabilizálódott és az optimális terhelési értékek folyosóján belül van, akkor egyszerűen folytatjuk a megfigyelést. Ha egy csomópont nem lenne elég, akkor hozzáadunk egy másikat.

Azokban az esetekben, amikor előttünk van egy lecke, már biztosan tudjuk, hogy egy csomópont nem lesz elég, ezért azonnal elindítjuk az összes szabad csomópontot, és aktívan tartjuk őket az óra végéig. Ez a tevékenységi időbélyegek listájának használatával történik.

Következtetés

Az Autoscaler jó és kényelmes megoldás azokra az esetekre, amikor egyenetlen fürtterhelést tapasztal. Egyszerre éri el a kívánt fürtkonfigurációt a csúcsterhelésekhez, ugyanakkor nem tartja meg ezt a klasztert alulterhelés alatt, így pénzt takarít meg. Nos, ráadásul mindez automatikusan, az Ön részvétele nélkül történik. Maga az autoscaler nem más, mint a fürtkezelő API-hoz és a felhőszolgáltató API-hoz intézett kérések halmaza, egy bizonyos logika szerint megírva. Amire mindenképpen emlékezni kell, az a csomópontok 3 típusra való felosztása, ahogy korábban írtuk. És boldog leszel.

Forrás: will.com

Hozzászólás