Πώς να φτιάξετε το δικό σας σύστημα αυτόματης κλίμακας για ένα σύμπλεγμα

Γειά σου! Εκπαιδεύουμε ανθρώπους να εργάζονται με μεγάλα δεδομένα. Είναι αδύνατο να φανταστεί κανείς ένα εκπαιδευτικό πρόγραμμα για μεγάλα δεδομένα χωρίς το δικό του σύμπλεγμα, στο οποίο όλοι οι συμμετέχοντες συνεργάζονται. Για αυτόν τον λόγο, το πρόγραμμά μας το έχει πάντα 🙂 Εμείς ασχολούμαστε με τη διαμόρφωση, τον συντονισμό και τη διαχείρισή του και τα παιδιά ξεκινούν απευθείας τις εργασίες MapReduce εκεί και χρησιμοποιούν το Spark.

Σε αυτήν την ανάρτηση θα σας πούμε πώς λύσαμε το πρόβλημα της ανομοιόμορφης φόρτωσης συμπλέγματος γράφοντας το δικό μας autoscaler χρησιμοποιώντας το σύννεφο Mail.ru Cloud Solutions.

πρόβλημα

Το σύμπλεγμα μας δεν χρησιμοποιείται σε τυπική λειτουργία. Η απόρριψη είναι εξαιρετικά άνιση. Για παράδειγμα, υπάρχουν πρακτικά μαθήματα, όταν και τα 30 άτομα και ένας δάσκαλος πηγαίνουν στο cluster και αρχίζουν να το χρησιμοποιούν. Ή πάλι, υπάρχουν μέρες πριν από την προθεσμία που το φορτίο αυξάνεται πολύ. Τον υπόλοιπο χρόνο το σύμπλεγμα λειτουργεί σε λειτουργία υποφόρτωσης.

Η λύση #1 είναι να διατηρήσετε ένα σύμπλεγμα που θα αντέχει τα φορτία αιχμής, αλλά θα είναι αδρανές τον υπόλοιπο χρόνο.

Η λύση #2 είναι να διατηρήσετε ένα μικρό σύμπλεγμα, στο οποίο προσθέτετε με μη αυτόματο τρόπο κόμβους πριν από τις τάξεις και κατά τη διάρκεια φορτίων αιχμής.

Η λύση #3 είναι να διατηρήσετε ένα μικρό σύμπλεγμα και να γράψετε έναν αυτόματο διαβαθμιστή που θα παρακολουθεί το τρέχον φορτίο του συμπλέγματος και, χρησιμοποιώντας διάφορα API, θα προσθέτει και θα αφαιρεί κόμβους από το σύμπλεγμα.

Σε αυτή την ανάρτηση θα μιλήσουμε για τη λύση #3. Αυτός ο αυτόματος διαβαθμιστής εξαρτάται σε μεγάλο βαθμό από εξωτερικούς παράγοντες παρά από εσωτερικούς και οι πάροχοι συχνά δεν τον παρέχουν. Χρησιμοποιούμε την υποδομή cloud του Mail.ru Cloud Solutions και γράψαμε έναν αυτόματο διαβαθμιστή χρησιμοποιώντας το MCS API. Και δεδομένου ότι διδάσκουμε πώς να εργάζεστε με δεδομένα, αποφασίσαμε να δείξουμε πώς μπορείτε να γράψετε ένα παρόμοιο αυτόματο σύστημα κλίμακας για τους δικούς σας σκοπούς και να το χρησιμοποιήσετε με το σύννεφο σας

Προϋποθέσεις

Αρχικά, πρέπει να έχετε ένα σύμπλεγμα Hadoop. Για παράδειγμα, χρησιμοποιούμε την κατανομή HDP.

Για να προστεθούν και να αφαιρεθούν γρήγορα οι κόμβοι σας, πρέπει να έχετε μια ορισμένη κατανομή ρόλων μεταξύ των κόμβων.

  1. Κύριος κόμβος. Λοιπόν, δεν χρειάζεται να εξηγήσουμε τίποτα συγκεκριμένα: τον κύριο κόμβο του συμπλέγματος, στον οποίο, για παράδειγμα, εκκινείται το πρόγραμμα οδήγησης Spark, εάν χρησιμοποιείτε τη διαδραστική λειτουργία.
  2. Κόμβος ημερομηνίας. Αυτός είναι ο κόμβος στον οποίο αποθηκεύετε δεδομένα στο HDFS και όπου γίνονται οι υπολογισμοί.
  3. Υπολογιστικός κόμβος. Αυτός είναι ένας κόμβος όπου δεν αποθηκεύετε τίποτα στο HDFS, αλλά όπου γίνονται υπολογισμοί.

Σημαντικό σημείο. Η αυτόματη κλιμάκωση θα συμβεί λόγω κόμβων τρίτου τύπου. Εάν αρχίσετε να λαμβάνετε και να προσθέτετε κόμβους του δεύτερου τύπου, η ταχύτητα απόκρισης θα είναι πολύ χαμηλή - ο παροπλισμός και η εκ νέου δέσμευση θα διαρκέσουν ώρες στο σύμπλεγμα σας. Αυτό, φυσικά, δεν είναι αυτό που περιμένετε από την αυτόματη κλιμάκωση. Δηλαδή δεν αγγίζουμε κόμβους πρώτου και δεύτερου τύπου. Θα αντιπροσωπεύουν ένα ελάχιστο βιώσιμο σύμπλεγμα που θα υπάρχει καθ' όλη τη διάρκεια του προγράμματος.

Έτσι, ο αυτόματος διαβαθμιστής μας είναι γραμμένος σε Python 3, χρησιμοποιεί το Ambari API για τη διαχείριση υπηρεσιών συμπλέγματος, χρησιμοποιεί API από το Mail.ru Cloud Solutions (MCS) για εκκίνηση και παύση μηχανών.

Αρχιτεκτονική λύσης

  1. Ενότητα autoscaler.py. Περιλαμβάνει τρεις κατηγορίες: 1) λειτουργίες για εργασία με Ambari, 2) λειτουργίες για εργασία με MCS, 3) λειτουργίες που σχετίζονται άμεσα με τη λογική του αυτόματου κλιμακωτή.
  2. Γραφή observer.py. Ουσιαστικά αποτελείται από διαφορετικούς κανόνες: πότε και σε ποιες στιγμές να καλέσετε τις λειτουργίες αυτόματης κλίμακας.
  3. Αρχείο διαμόρφωσης config.py. Περιέχει, για παράδειγμα, μια λίστα με κόμβους που επιτρέπονται για αυτόματη κλιμάκωση και άλλες παραμέτρους που επηρεάζουν, για παράδειγμα, το χρόνο αναμονής από τη στιγμή που προστέθηκε ένας νέος κόμβος. Υπάρχουν επίσης χρονικές σημάνσεις για την έναρξη των τάξεων, έτσι ώστε πριν από την τάξη να ξεκινήσει η μέγιστη επιτρεπόμενη διαμόρφωση συμπλέγματος.

Ας δούμε τώρα τα κομμάτια του κώδικα μέσα στα δύο πρώτα αρχεία.

1. Μονάδα Autoscaler.py

τάξη Αμπαρίου

Έτσι μοιάζει ένα κομμάτι κώδικα που περιέχει μια κλάση Ambari:

class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Παραπάνω, ως παράδειγμα, μπορείτε να δείτε την υλοποίηση της συνάρτησης stop_all_services, το οποίο σταματά όλες τις υπηρεσίες στον επιθυμητό κόμβο συμπλέγματος.

Στην είσοδο της τάξης Ambari περνάς:

  • ambari_url, για παράδειγμα, όπως 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – το όνομα του συμπλέγματός σας στο Αμπάρι,
  • headers = {'X-Requested-By': 'ambari'}
  • και μέσα auth εδώ είναι το όνομα χρήστη και ο κωδικός πρόσβασής σας για το Ambari: auth = ('login', 'password').

Η ίδια η λειτουργία δεν είναι τίποτα άλλο από μερικές κλήσεις μέσω του REST API προς το Ambari. Από λογική άποψη, λαμβάνουμε πρώτα μια λίστα υπηρεσιών που εκτελούνται σε έναν κόμβο και, στη συνέχεια, ζητάμε σε ένα δεδομένο σύμπλεγμα, σε έναν δεδομένο κόμβο, να μεταφέρουμε υπηρεσίες από τη λίστα στην κατάσταση INSTALLED. Λειτουργίες για την εκκίνηση όλων των υπηρεσιών, για τη μεταφορά κόμβων σε κατάσταση Maintenance κ.λπ. μοιάζουν - είναι μόνο μερικά αιτήματα μέσω του API.

Τάξη Mcs

Έτσι μοιάζει ένα κομμάτι κώδικα που περιέχει μια κλάση Mcs:

class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

Στην είσοδο της τάξης Mcs περνάμε το αναγνωριστικό έργου μέσα στο cloud και το αναγνωριστικό χρήστη, καθώς και τον κωδικό πρόσβασής του. Σε λειτουργία vm_turn_on θέλουμε να ενεργοποιήσουμε ένα από τα μηχανήματα. Η λογική εδώ είναι λίγο πιο περίπλοκη. Στην αρχή του κώδικα, καλούνται τρεις άλλες συναρτήσεις: 1) πρέπει να πάρουμε ένα διακριτικό, 2) πρέπει να μετατρέψουμε το όνομα κεντρικού υπολογιστή στο όνομα του μηχανήματος στο MCS, 3) να λάβουμε το αναγνωριστικό αυτού του μηχανήματος. Στη συνέχεια, κάνουμε απλώς ένα αίτημα ανάρτησης και εκκινούμε αυτό το μηχάνημα.

Αυτή είναι η συνάρτηση για την απόκτηση ενός διακριτικού:

def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Κατηγορία Autoscaler

Αυτή η κλάση περιέχει συναρτήσεις που σχετίζονται με την ίδια τη λογική λειτουργίας.

Έτσι φαίνεται ένα κομμάτι κώδικα για αυτήν την τάξη:

class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

Δεχόμαστε μαθήματα για είσοδο. Ambari и Mcs, μια λίστα κόμβων που επιτρέπεται για κλιμάκωση, καθώς και παραμέτρους διαμόρφωσης κόμβου: μνήμη και cpu που έχουν εκχωρηθεί στον κόμβο στο YARN. Υπάρχουν επίσης 2 εσωτερικές παράμετροι q_ram, q_cpu, οι οποίες είναι ουρές. Χρησιμοποιώντας τα, αποθηκεύουμε τις τιμές του τρέχοντος φορτίου συμπλέγματος. Αν δούμε ότι τα τελευταία 5 λεπτά υπήρξε σταθερά αυξημένο φορτίο, τότε αποφασίζουμε ότι πρέπει να προσθέσουμε +1 κόμβο στο σύμπλεγμα. Το ίδιο ισχύει και για την κατάσταση υποχρησιμοποίησης συστάδων.

Ο παραπάνω κώδικας είναι ένα παράδειγμα συνάρτησης που αφαιρεί ένα μηχάνημα από το σύμπλεγμα και το σταματά στο σύννεφο. Πρώτα υπάρχει παροπλισμός YARN Nodemanager, τότε η λειτουργία ενεργοποιείται Maintenance, μετά σταματάμε όλες τις υπηρεσίες στο μηχάνημα και απενεργοποιούμε την εικονική μηχανή στο cloud.

2. Παρατηρητής σεναρίου.py

Δείγμα κώδικα από εκεί:

if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

Σε αυτό, ελέγχουμε αν έχουν δημιουργηθεί συνθήκες για την αύξηση της χωρητικότητας του συμπλέγματος και αν υπάρχουν μηχανήματα σε ρεζέρβα, παίρνουμε το όνομα κεντρικού υπολογιστή ενός από αυτά, τον προσθέτουμε στο σύμπλεγμα και δημοσιεύουμε ένα μήνυμα σχετικά με αυτό στο Slack της ομάδας μας. Μετά από αυτό ξεκινά cooldown_period, όταν δεν προσθέτουμε ή αφαιρούμε τίποτα από το σύμπλεγμα, αλλά απλώς παρακολουθούμε το φορτίο. Εάν έχει σταθεροποιηθεί και βρίσκεται εντός του διαδρόμου των βέλτιστων τιμών φορτίου, τότε απλά συνεχίζουμε την παρακολούθηση. Αν ένας κόμβος δεν ήταν αρκετός, τότε προσθέτουμε έναν άλλο.

Για περιπτώσεις που έχουμε μάθημα μπροστά, γνωρίζουμε ήδη με σιγουριά ότι ένας κόμβος δεν θα είναι αρκετός, οπότε ξεκινάμε αμέσως όλους τους ελεύθερους κόμβους και τους κρατάμε ενεργούς μέχρι το τέλος του μαθήματος. Αυτό συμβαίνει χρησιμοποιώντας μια λίστα με χρονικές σημάνσεις δραστηριότητας.

Συμπέρασμα

Το Autoscaler είναι μια καλή και βολική λύση για τις περιπτώσεις που αντιμετωπίζετε ανομοιόμορφη φόρτωση συμπλέγματος. Ταυτόχρονα επιτυγχάνετε την επιθυμητή διαμόρφωση συμπλέγματος για φορτία αιχμής και ταυτόχρονα δεν διατηρείτε αυτό το σύμπλεγμα κατά την υποφόρτωση, εξοικονομώντας χρήματα. Λοιπόν, συν όλα αυτά γίνονται αυτόματα χωρίς τη συμμετοχή σας. Το ίδιο το πρόγραμμα αυτόματης κλίμακας δεν είναι τίποτα άλλο από ένα σύνολο αιτημάτων προς το API διαχείρισης συμπλέγματος και το API του παρόχου cloud, γραμμένα σύμφωνα με μια συγκεκριμένη λογική. Αυτό που σίγουρα πρέπει να θυμάστε είναι η διαίρεση των κόμβων σε 3 τύπους, όπως γράψαμε νωρίτερα. Και θα είσαι ευτυχισμένος.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο