Kuinka tehdä oma automaattinen skaalaaja klusteriin

Hei! Koulutamme ihmisiä työskentelemään big datan parissa. On mahdotonta kuvitella suurdataa käsittelevää koulutusohjelmaa ilman omaa klusteriaan, jossa kaikki osallistujat työskentelevät yhdessä. Tästä syystä ohjelmassamme on se aina 🙂 Olemme mukana sen konfiguroinnissa, virittämisessä ja hallinnassa, ja kaverit käynnistävät siellä suoraan MapReduce-töitä ja käyttävät Sparkia.

Tässä viestissä kerromme, kuinka ratkaisimme klusterin epätasaisen latauksen ongelman kirjoittamalla oman autoskaalaimemme pilven avulla Mail.ru Pilviratkaisut.

ongelma

Klusteriamme ei käytetä tyypillisessä tilassa. Hävittäminen on erittäin epätasaista. Esimerkiksi on käytännön tunteja, jolloin kaikki 30 henkilöä ja opettaja menevät klusteriin ja alkavat käyttää sitä. Tai taas on päiviä ennen määräaikaa, jolloin kuormitus kasvaa huomattavasti. Muun ajan klusteri toimii alikuormitustilassa.

Ratkaisu #1 on pitää klusteri, joka kestää huippukuormitukset, mutta on käyttämättömänä muun ajan.

Ratkaisu #2 on säilyttää pieni klusteri, johon lisäät manuaalisesti solmuja ennen luokkia ja huippukuormituksen aikana.

Ratkaisu #3 on pitää pieni klusteri ja kirjoittaa automaattinen skaalaaja, joka tarkkailee klusterin nykyistä kuormitusta ja lisää ja poistaa solmuja klusterista erilaisten API:iden avulla.

Tässä viestissä puhumme ratkaisusta #3. Tämä automaattinen skaalaus on erittäin riippuvainen ulkoisista tekijöistä pikemminkin kuin sisäisistä, eivätkä palveluntarjoajat usein tarjoa sitä. Käytämme Mail.ru Cloud Solutions -pilviinfrastruktuuria ja kirjoitimme automaattisen skaalaimen MCS API:lla. Ja koska opetamme työskentelemään datan kanssa, päätimme näyttää, kuinka voit kirjoittaa samanlaisen automaattisen skaalaimen omiin tarkoituksiin ja käyttää sitä pilvesi kanssa.

Edellytykset

Ensinnäkin sinulla on oltava Hadoop-klusteri. Käytämme esimerkiksi HDP-jakelua.

Jotta solmuja voidaan lisätä ja poistaa nopeasti, sinulla on oltava tietty roolijakauma solmujen kesken.

  1. Pääsolmu. No, ei tarvitse selittää mitään erityisemmin: klusterin pääsolmu, johon esimerkiksi Spark-ajuri käynnistetään, jos käytät interaktiivista tilaa.
  2. Päivämääräsolmu. Tämä on solmu, johon tallennat tietoja HDFS:ään ja jossa laskelmat suoritetaan.
  3. Laskentasolmu. Tämä on solmu, jossa et tallenna mitään HDFS:ään, mutta jossa laskelmat tapahtuvat.

Tärkeä pointti. Automaattinen skaalaus tapahtuu kolmannen tyypin solmujen vuoksi. Jos alat ottaa ja lisätä toisen tyyppisiä solmuja, vastenopeus on erittäin alhainen - käytöstä poistaminen ja uudelleensovittaminen vie klusterillasi tunteja. Tämä ei tietenkään ole sitä, mitä odotat automaattisesta skaalauksesta. Eli emme kosketa ensimmäisen ja toisen tyypin solmuja. Ne edustavat vähintään elinkelpoista klusteria, joka on olemassa koko ohjelman ajan.

Joten automaattinen skaalaajamme on kirjoitettu Python 3:ssa, käyttää Ambari API:ta klusteripalvelujen hallintaan, käyttää API:lta Mail.ru Cloud Solutions (MCS) koneiden käynnistämiseen ja pysäyttämiseen.

Ratkaisuarkkitehtuuri

  1. Moduuli autoscaler.py. Se sisältää kolme luokkaa: 1) Ambarin kanssa työskentelyyn tarkoitetut toiminnot, 2) MCS:n kanssa työskentelyn toiminnot, 3) toiminnot, jotka liittyvät suoraan automaattisen skaalaimen logiikkaan.
  2. Käsikirjoitus observer.py. Pohjimmiltaan se koostuu erilaisista säännöistä: milloin ja millä hetkillä autoscaler-toimintoja kutsutaan.
  3. Asetustiedosto config.py. Se sisältää esimerkiksi luettelon solmuista, jotka sallitaan automaattisella skaalauksella, ja muita parametreja, jotka vaikuttavat esimerkiksi siihen, kuinka kauan odottaa uuden solmun lisäyshetkestä. Tuntien alkamiseen on myös aikaleimat, jotta ennen luokkaa käynnistetään suurin sallittu klusterikonfiguraatio.

Katsotaan nyt kahden ensimmäisen tiedoston sisällä olevia koodinpätkiä.

1. Autoscaler.py-moduuli

Ambari luokka

Tältä näyttää luokan sisältävä koodinpätkä Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Yllä esimerkkinä voit tarkastella toiminnon toteutusta stop_all_services, joka pysäyttää kaikki palvelut halutussa klusterisolmussa.

Luokan sisäänkäynnillä Ambari ohitat:

  • ambari_urlesimerkiksi kuten 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – klusterin nimi Ambarissa,
  • headers = {'X-Requested-By': 'ambari'}
  • ja sisällä auth tässä on Ambari-tunnuksesi ja salasanasi: auth = ('login', 'password').

Itse toiminto ei ole muuta kuin pari kutsua REST API:n kautta Ambarille. Loogisesta näkökulmasta katsottuna saamme ensin luettelon solmussa käynnissä olevista palveluista ja sitten pyydämme tietyssä klusterissa, tietyssä solmussa siirtämään palvelut luettelosta tilaan. INSTALLED. Toiminnot kaikkien palveluiden käynnistämiseen, solmujen siirtämiseen tilaan Maintenance jne. näyttävät samanlaisilta - ne ovat vain muutamia pyyntöjä API:n kautta.

Luokan Mcs

Tältä näyttää luokan sisältävä koodinpätkä Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Luokan sisäänkäynnillä Mcs välitämme projektin tunnuksen pilven sisällä ja käyttäjätunnuksen sekä hänen salasanansa. Toiminnassa vm_turn_on haluamme käynnistää yhden koneista. Logiikka tässä on hieman monimutkaisempi. Koodin alussa kutsutaan kolme muuta toimintoa: 1) meidän täytyy saada token, 2) meidän on muutettava isäntänimi koneen nimeksi MCS:ssä, 3) hankittava tämän koneen tunnus. Seuraavaksi teemme vain postituspyynnön ja käynnistämme tämän koneen.

Tokenin hankkimistoiminto näyttää tältä:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Autoscaler-luokka

Tämä luokka sisältää itse toimintalogiikkaan liittyviä toimintoja.

Tämän luokan koodinpätkä näyttää tältä:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Otamme kursseja vastaan. Ambari и Mcs, luettelo solmuista, jotka ovat sallittuja skaalattavaksi, sekä solmun kokoonpanoparametrit: YARN:n solmulle varattu muisti ja suoritin. On myös 2 sisäistä parametria q_ram, q_cpu, jotka ovat jonoja. Niiden avulla tallennamme nykyisen klusterin kuormituksen arvot. Jos näemme, että viimeisten 5 minuutin aikana kuormitus on jatkuvasti lisääntynyt, päätämme, että meidän on lisättävä +1-solmu klusteriin. Sama pätee klusterin vajaakäytön tilaan.

Yllä oleva koodi on esimerkki funktiosta, joka poistaa koneen klusterista ja pysäyttää sen pilveen. Ensin on käytöstäpoisto YARN Nodemanager, tila kytkeytyy päälle Maintenance, sitten pysäytämme kaikki palvelut koneella ja sammutamme virtuaalikoneen pilvessä.

2. Käsikirjoituksen tarkkailija.py

Esimerkkikoodi sieltä:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Siinä tarkistamme, onko klusterin kapasiteetin lisäämiselle luotu edellytykset ja onko koneita varassa, hankimme niistä yhden isäntänimen, lisäämme sen klusteriin ja julkaisemme siitä viestin tiimimme Slackiin. Sen jälkeen se alkaa cooldown_period, kun emme lisää tai poista klusteriin mitään, vaan valvomme vain kuormitusta. Jos se on tasaantunut ja on optimaalisten kuormitusarvojen käytävän sisällä, jatkamme vain seurantaa. Jos yksi solmu ei riittänyt, lisäämme toisen.

Tapauksissa, joissa meillä on oppitunti edessä, tiedämme jo varmasti, että yksi solmu ei riitä, joten käynnistämme välittömästi kaikki vapaat solmut ja pidämme ne aktiivisina oppitunnin loppuun asti. Tämä tapahtuu toimintojen aikaleimaluettelon avulla.

Johtopäätös

Autoscaler on hyvä ja kätevä ratkaisu tapauksiin, joissa klusterin kuormitus on epätasaista. Saavutat samanaikaisesti halutun klusterin konfiguraation huippukuormituksille ja samalla et säilytä tätä klusteria alikuormituksen aikana, mikä säästää rahaa. No, ja tämä kaikki tapahtuu automaattisesti ilman osallistumistasi. Itse automaattinen skaalaaja ei ole muuta kuin joukko pyyntöjä klusterinhallinnan API:lle ja pilvipalveluntarjoajan API:lle, jotka on kirjoitettu tietyn logiikan mukaisesti. Sinun on ehdottomasti muistettava solmujen jakaminen 3 tyyppiin, kuten kirjoitimme aiemmin. Ja sinä tulet olemaan onnellinen.

Lähde: will.com

Lisää kommentti