Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή

Γεια σου Χαμπρ!

Σας αρέσουν τα αεροπλάνα που πετούν; Μου αρέσει, αλλά κατά τη διάρκεια της αυτοαπομόνωσης ερωτεύτηκα επίσης την ανάλυση δεδομένων για αεροπορικά εισιτήρια από έναν γνωστό πόρο - το Aviasales.

Σήμερα θα αναλύσουμε το έργο του Amazon Kinesis, θα δημιουργήσουμε ένα σύστημα ροής με αναλυτικά στοιχεία σε πραγματικό χρόνο, θα εγκαταστήσουμε τη βάση δεδομένων Amazon DynamoDB NoSQL ως κύρια αποθήκευση δεδομένων και θα ρυθμίσουμε ειδοποιήσεις SMS για ενδιαφέροντα εισιτήρια.

Όλες οι λεπτομέρειες είναι κάτω από το κόψιμο! Πηγαίνω!

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή

Εισαγωγή

Για παράδειγμα, χρειαζόμαστε πρόσβαση σε Aviasales API. Η πρόσβαση σε αυτό παρέχεται δωρεάν και χωρίς περιορισμούς. Απλώς πρέπει να εγγραφείτε στην ενότητα "Προγραμματιστές" για να λάβετε το διακριτικό API σας για πρόσβαση στα δεδομένα.

Ο κύριος σκοπός αυτού του άρθρου είναι να δώσει μια γενική κατανόηση της χρήσης της ροής πληροφοριών στο AWS· λαμβάνουμε υπόψη ότι τα δεδομένα που επιστρέφονται από το API που χρησιμοποιείται δεν είναι αυστηρά ενημερωμένα και μεταδίδονται από τη μνήμη cache, η οποία είναι δημιουργήθηκε με βάση αναζητήσεις από χρήστες των ιστοτόπων Aviasales.ru και Jetradar.com για τις τελευταίες 48 ώρες.

Το Kinesis-agent, εγκατεστημένο στη μηχανή παραγωγής, που λαμβάνεται μέσω του API θα αναλύει αυτόματα και θα μεταδίδει δεδομένα στην επιθυμητή ροή μέσω του Kinesis Data Analytics. Η μη επεξεργασμένη έκδοση αυτής της ροής θα γραφτεί απευθείας στο κατάστημα. Η αποθήκευση ακατέργαστων δεδομένων που αναπτύσσεται στο DynamoDB θα επιτρέψει τη βαθύτερη ανάλυση εισιτηρίων μέσω εργαλείων BI, όπως το AWS Quick Sight.

Θα εξετάσουμε δύο επιλογές για την ανάπτυξη ολόκληρης της υποδομής:

  • Εγχειρίδιο - μέσω AWS Management Console.
  • Η υποδομή από τον κώδικα Terraform είναι για τεμπέληδες αυτοματοποιητές.

Η αρχιτεκτονική του ανεπτυγμένου συστήματος

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Μεταχειρισμένα εξαρτήματα:

  • Aviasales API — τα δεδομένα που επιστρέφονται από αυτό το API θα χρησιμοποιηθούν για όλες τις επόμενες εργασίες·
  • Παράδειγμα παραγωγού EC2 — μια κανονική εικονική μηχανή στο σύννεφο στην οποία θα δημιουργηθεί η ροή δεδομένων εισόδου:
    • Kinesis Agent είναι μια εφαρμογή Java εγκατεστημένη τοπικά στο μηχάνημα που παρέχει έναν εύκολο τρόπο συλλογής και αποστολής δεδομένων στο Kinesis (Kinesis Data Streams ή Kinesis Firehose). Ο πράκτορας παρακολουθεί συνεχώς ένα σύνολο αρχείων στους καθορισμένους καταλόγους και στέλνει νέα δεδομένα στο Kinesis.
    • Σενάριο κλήσης API — Ένα σενάριο Python που κάνει αιτήματα στο API και τοποθετεί την απάντηση σε έναν φάκελο που παρακολουθείται από τον παράγοντα Kinesis.
  • Ροές δεδομένων Kinesis — υπηρεσία ροής δεδομένων σε πραγματικό χρόνο με δυνατότητες ευρείας κλίμακας·
  • Kinesis Analytics είναι μια υπηρεσία χωρίς διακομιστή που απλοποιεί την ανάλυση των δεδομένων ροής σε πραγματικό χρόνο. Το Amazon Kinesis Data Analytics διαμορφώνει τους πόρους της εφαρμογής και κλιμακώνεται αυτόματα για να χειριστεί οποιοδήποτε όγκο εισερχόμενων δεδομένων.
  • AWS Lambda — μια υπηρεσία που σας επιτρέπει να εκτελείτε κώδικα χωρίς δημιουργία αντιγράφων ασφαλείας ή εγκατάσταση διακομιστών. Όλη η υπολογιστική ισχύς κλιμακώνεται αυτόματα για κάθε κλήση.
  • Amazon DynamoDB - Μια βάση δεδομένων ζευγών κλειδιών-τιμών και εγγράφων που παρέχει καθυστέρηση μικρότερη από 10 χιλιοστά του δευτερολέπτου όταν εκτελείται σε οποιαδήποτε κλίμακα. Όταν χρησιμοποιείτε το DynamoDB, δεν χρειάζεται να παρέχετε, να επιδιορθώνετε ή να διαχειρίζεστε διακομιστές. Το DynamoDB κλιμακώνει αυτόματα τους πίνακες για να προσαρμόσει την ποσότητα των διαθέσιμων πόρων και να διατηρήσει υψηλή απόδοση. Δεν απαιτείται διαχείριση συστήματος.
  • Amazon SNS - μια πλήρως διαχειριζόμενη υπηρεσία για την αποστολή μηνυμάτων χρησιμοποιώντας το μοντέλο εκδότη-συνδρομητή (Pub/Sub), με την οποία μπορείτε να απομονώσετε μικροϋπηρεσίες, κατανεμημένα συστήματα και εφαρμογές χωρίς διακομιστή. Το SNS μπορεί να χρησιμοποιηθεί για την αποστολή πληροφοριών στους τελικούς χρήστες μέσω ειδοποιήσεων push για κινητά, μηνυμάτων SMS και email.

Αρχική εκπαίδευση

Για να προσομοιώσω τη ροή δεδομένων, αποφάσισα να χρησιμοποιήσω τις πληροφορίες αεροπορικών εισιτηρίων που επιστράφηκαν από το API της Aviasales. ΣΕ τεκμηρίωση μια αρκετά εκτενής λίστα διαφορετικών μεθόδων, ας πάρουμε μία από αυτές - το "Μηνιαία Ημερολόγιο Τιμών", το οποίο επιστρέφει τιμές για κάθε ημέρα του μήνα, ομαδοποιημένες με βάση τον αριθμό των μεταφορών. Εάν δεν καθορίσετε τον μήνα αναζήτησης στο αίτημα, οι πληροφορίες θα επιστραφούν για τον μήνα που ακολουθεί τον τρέχοντα.

Λοιπόν, ας εγγραφούμε και πάρουμε το διακριτικό μας.

Ένα παράδειγμα αιτήματος είναι παρακάτω:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Η παραπάνω μέθοδος λήψης δεδομένων από το API με τον καθορισμό ενός διακριτικού στο αίτημα θα λειτουργήσει, αλλά προτιμώ να περάσω το διακριτικό πρόσβασης μέσω της κεφαλίδας, επομένως θα χρησιμοποιήσουμε αυτήν τη μέθοδο στο σενάριο api_caller.py.

Παράδειγμα απάντησης:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Το παράδειγμα απάντησης API παραπάνω δείχνει ένα εισιτήριο από την Αγία Πετρούπολη προς το Phuk... Ω, τι όνειρο...
Επειδή είμαι από το Καζάν και το Πουκέτ είναι πλέον «μόνο ένα όνειρο», ας ψάξουμε για εισιτήρια από την Αγία Πετρούπολη προς το Καζάν.

Προϋποθέτει ότι έχετε ήδη λογαριασμό AWS. Θα ήθελα να επιστήσω αμέσως ιδιαίτερη προσοχή στο γεγονός ότι το Kinesis και η αποστολή ειδοποιήσεων μέσω SMS δεν περιλαμβάνονται στο ετήσιο Δωρεάν Επίπεδο (δωρεάν χρήση). Αλλά ακόμα κι παρά αυτό, με μερικά δολάρια κατά νου, είναι πολύ πιθανό να χτιστεί το προτεινόμενο σύστημα και να παίξετε μαζί του. Και, φυσικά, μην ξεχάσετε να διαγράψετε όλους τους πόρους αφού δεν χρειάζονται πλέον.

Ευτυχώς, οι λειτουργίες DynamoDb και λάμδα θα είναι δωρεάν για εμάς εάν τηρήσουμε τα μηνιαία δωρεάν όριά μας. Για παράδειγμα, για το DynamoDB: 25 GB αποθηκευτικού χώρου, 25 WCU/RCU και 100 εκατομμύρια ερωτήματα. Και ένα εκατομμύριο κλήσεις συνάρτησης λάμδα το μήνα.

Χειροκίνητη ανάπτυξη συστήματος

Ρύθμιση ροών δεδομένων Kinesis

Ας πάμε στην υπηρεσία Kinesis Data Streams και δημιουργούμε δύο νέες ροές, ένα θραύσμα για το καθένα.

Τι είναι ένα θραύσμα;
Ένα θραύσμα είναι η βασική μονάδα μεταφοράς δεδομένων μιας ροής Amazon Kinesis. Ένα τμήμα παρέχει μεταφορά δεδομένων εισόδου με ταχύτητα 1 MB/s και μεταφορά δεδομένων εξόδου με ταχύτητα 2 MB/s. Ένα τμήμα υποστηρίζει έως και 1000 καταχωρήσεις PUT ανά δευτερόλεπτο. Όταν δημιουργείτε μια ροή δεδομένων, πρέπει να καθορίσετε τον απαιτούμενο αριθμό τμημάτων. Για παράδειγμα, μπορείτε να δημιουργήσετε μια ροή δεδομένων με δύο τμήματα. Αυτή η ροή δεδομένων θα παρέχει μεταφορά δεδομένων εισόδου στα 2 MB/s και μεταφορά δεδομένων εξόδου στα 4 MB/s, υποστηρίζοντας έως και 2000 εγγραφές PUT ανά δευτερόλεπτο.

Όσο περισσότερα θραύσματα στη ροή σας, τόσο μεγαλύτερη είναι η απόδοσή της. Κατ' αρχήν, έτσι κλιμακώνονται οι ροές - με την προσθήκη θραυσμάτων. Αλλά όσο περισσότερα θραύσματα έχετε, τόσο υψηλότερη είναι η τιμή. Κάθε θραύσμα κοστίζει 1,5 σεντ την ώρα και επιπλέον 1.4 σεντ για κάθε εκατομμύριο μονάδες ωφέλιμου φορτίου PUT.

Ας δημιουργήσουμε μια νέα ροή με το όνομα αεροπορικά εισιτήρια, 1 κομμάτι θα του είναι αρκετό:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Τώρα ας δημιουργήσουμε ένα άλλο νήμα με το όνομα special_stream:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή

Ρύθμιση παραγωγού

Για να αναλύσετε μια εργασία, αρκεί να χρησιμοποιήσετε μια κανονική παρουσία EC2 ως παραγωγό δεδομένων. Δεν χρειάζεται να είναι μια ισχυρή, ακριβή εικονική μηχανή· ένα spot t2.micro θα κάνει μια χαρά.

Σημαντική σημείωση: για παράδειγμα, θα πρέπει να χρησιμοποιήσετε την εικόνα - Amazon Linux AMI 2018.03.0, έχει λιγότερες ρυθμίσεις για γρήγορη εκκίνηση του Kinesis Agent.

Μεταβείτε στην υπηρεσία EC2, δημιουργήστε μια νέα εικονική μηχανή, επιλέξτε το επιθυμητό AMI με τύπο t2.micro, το οποίο περιλαμβάνεται στο Free Tier:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Για να μπορεί η νεοδημιουργηθείσα εικονική μηχανή να αλληλεπιδρά με την υπηρεσία Kinesis, πρέπει να της δοθούν δικαιώματα για να το κάνει. Ο καλύτερος τρόπος για να γίνει αυτό είναι να εκχωρήσετε έναν ρόλο IAM. Επομένως, στην οθόνη Βήμα 3: Διαμόρφωση λεπτομερειών παρουσίας, θα πρέπει να επιλέξετε Δημιουργία νέου ρόλου IAM:

Δημιουργία ρόλου IAM για το EC2
Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Στο παράθυρο που ανοίγει, επιλέξτε ότι δημιουργούμε νέο ρόλο για το EC2 και μεταβείτε στην ενότητα Δικαιώματα:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Χρησιμοποιώντας το παράδειγμα εκπαίδευσης, δεν χρειάζεται να μπούμε σε όλες τις περιπλοκές της λεπτομερούς διαμόρφωσης των δικαιωμάτων πόρων, επομένως θα επιλέξουμε τις πολιτικές που έχουν προρυθμιστεί από την Amazon: AmazonKinesisFullAccess και CloudWatchFullAccess.

Ας δώσουμε κάποιο ουσιαστικό όνομα για αυτόν τον ρόλο, για παράδειγμα: EC2-KinesisStreams-FullAccess. Το αποτέλεσμα πρέπει να είναι το ίδιο όπως φαίνεται στην παρακάτω εικόνα:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Αφού δημιουργήσετε αυτόν τον νέο ρόλο, μην ξεχάσετε να τον επισυνάψετε στην παρουσία εικονικής μηχανής που δημιουργήθηκε:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Δεν αλλάζουμε τίποτα άλλο σε αυτήν την οθόνη και προχωράμε στα επόμενα παράθυρα.

Οι ρυθμίσεις του σκληρού δίσκου μπορούν να παραμείνουν ως προεπιλογές, καθώς και οι ετικέτες (αν και είναι καλή πρακτική να χρησιμοποιείτε ετικέτες, τουλάχιστον να δώσετε στην παρουσία ένα όνομα και να υποδείξετε το περιβάλλον).

Τώρα βρισκόμαστε στην καρτέλα Βήμα 6: Διαμόρφωση ομάδας ασφαλείας, όπου πρέπει να δημιουργήσετε μια νέα ή να καθορίσετε την υπάρχουσα ομάδα ασφαλείας σας, η οποία σας επιτρέπει να συνδεθείτε μέσω ssh (θύρα 22) στην παρουσία. Επιλέξτε Source -> My IP εκεί και μπορείτε να ξεκινήσετε την παρουσία.

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Μόλις μεταβεί σε κατάσταση λειτουργίας, μπορείτε να δοκιμάσετε να συνδεθείτε σε αυτό μέσω ssh.

Για να μπορέσετε να εργαστείτε με το Kinesis Agent, μετά την επιτυχή σύνδεση στο μηχάνημα, πρέπει να εισαγάγετε τις ακόλουθες εντολές στο τερματικό:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Ας δημιουργήσουμε έναν φάκελο για την αποθήκευση των απαντήσεων API:

sudo mkdir /var/log/airline_tickets

Πριν ξεκινήσετε τον παράγοντα, πρέπει να διαμορφώσετε τις παραμέτρους του:

sudo vim /etc/aws-kinesis/agent.json

Τα περιεχόμενα του αρχείου agent.json θα πρέπει να έχουν την εξής μορφή:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Όπως φαίνεται από το αρχείο διαμόρφωσης, ο πράκτορας θα παρακολουθεί αρχεία με την επέκταση .log στον κατάλογο /var/log/airline_tickets/, θα τα αναλύει και θα τα μεταφέρει στη ροή airline_tickets.

Επανεκκινούμε την υπηρεσία και βεβαιωνόμαστε ότι είναι σε λειτουργία:

sudo service aws-kinesis-agent restart

Τώρα ας κατεβάσουμε το σενάριο Python που θα ζητήσει δεδομένα από το API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Το σενάριο api_caller.py ζητά δεδομένα από το Aviasales και αποθηκεύει τη ληφθείσα απάντηση στον κατάλογο που σαρώνει ο παράγοντας Kinesis. Η υλοποίηση αυτού του σεναρίου είναι αρκετά τυπική, υπάρχει μια κλάση TicketsApi, σας επιτρέπει να τραβάτε ασύγχρονα το API. Περνάμε μια κεφαλίδα με ένα διακριτικό και ζητάμε παραμέτρους σε αυτήν την κλάση:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Για να ελέγξουμε τις σωστές ρυθμίσεις και τη λειτουργικότητα του πράκτορα, ας δοκιμάσουμε να εκτελέσουμε το σενάριο api_caller.py:

sudo ./api_caller.py TOKEN

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Και εξετάζουμε το αποτέλεσμα της εργασίας στα αρχεία καταγραφής του πράκτορα και στην καρτέλα Παρακολούθηση στη ροή δεδομένων αεροπορικών εισιτηρίων:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Όπως μπορείτε να δείτε, όλα λειτουργούν και το Kinesis Agent στέλνει με επιτυχία δεδομένα στη ροή. Τώρα ας διαμορφώσουμε τον καταναλωτή.

Ρύθμιση Kinesis Data Analytics

Ας προχωρήσουμε στο κεντρικό στοιχείο ολόκληρου του συστήματος - δημιουργήστε μια νέα εφαρμογή στο Kinesis Data Analytics με το όνομα kinesis_analytics_airlines_app:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Το Kinesis Data Analytics σάς επιτρέπει να εκτελείτε αναλύσεις δεδομένων σε πραγματικό χρόνο από το Kinesis Streams χρησιμοποιώντας τη γλώσσα SQL. Είναι μια υπηρεσία πλήρως αυτόματης κλιμάκωσης (σε αντίθεση με την Kinesis Streams) που:

  1. σας επιτρέπει να δημιουργείτε νέες ροές (Output Stream) με βάση αιτήματα για πηγή δεδομένων.
  2. παρέχει μια ροή με σφάλματα που εμφανίστηκαν κατά την εκτέλεση των εφαρμογών (Ροή σφάλματος).
  3. μπορεί να καθορίσει αυτόματα το σχήμα δεδομένων εισόδου (μπορεί να επαναπροσδιοριστεί χειροκίνητα εάν είναι απαραίτητο).

Αυτή δεν είναι μια φθηνή υπηρεσία - 0.11 USD ανά ώρα εργασίας, επομένως θα πρέπει να τη χρησιμοποιήσετε προσεκτικά και να τη διαγράψετε όταν τελειώσετε.

Ας συνδέσουμε την εφαρμογή με την πηγή δεδομένων:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Επιλέξτε τη ροή στην οποία πρόκειται να συνδεθούμε (αεροπορικά_εισιτήρια):

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Στη συνέχεια, πρέπει να επισυνάψετε έναν νέο ρόλο IAM, ώστε η εφαρμογή να μπορεί να διαβάζει από τη ροή και να γράφει στη ροή. Για να γίνει αυτό, αρκεί να μην αλλάξετε τίποτα στο μπλοκ αδειών πρόσβασης:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Τώρα ας ζητήσουμε την ανακάλυψη του σχήματος δεδομένων στη ροή· για να το κάνετε αυτό, κάντε κλικ στο κουμπί «Ανακάλυψη σχήματος». Ως αποτέλεσμα, ο ρόλος IAM θα ​​ενημερωθεί (θα δημιουργηθεί ένας νέος) και η ανίχνευση σχήματος θα ξεκινήσει από τα δεδομένα που έχουν ήδη φτάσει στη ροή:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Τώρα πρέπει να μεταβείτε στον επεξεργαστή SQL. Όταν κάνετε κλικ σε αυτό το κουμπί, θα εμφανιστεί ένα παράθυρο που σας ζητά να ξεκινήσετε την εφαρμογή - επιλέξτε αυτό που θέλετε να εκκινήσετε:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Εισαγάγετε το ακόλουθο απλό ερώτημα στο παράθυρο του προγράμματος επεξεργασίας SQL και κάντε κλικ στην επιλογή Αποθήκευση και εκτέλεση SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Στις σχεσιακές βάσεις δεδομένων, εργάζεστε με πίνακες χρησιμοποιώντας εντολές INSERT για να προσθέσετε εγγραφές και μια πρόταση SELECT σε δεδομένα ερωτημάτων. Στο Amazon Kinesis Data Analytics, εργάζεστε με ροές (STREAM) και αντλίες (PUMPs)—αιτήματα συνεχούς εισαγωγής που εισάγουν δεδομένα από μια ροή σε μια εφαρμογή σε μια άλλη ροή.

Το ερώτημα SQL που παρουσιάστηκε παραπάνω αναζητά εισιτήρια Aeroflot με κόστος κάτω από πέντε χιλιάδες ρούβλια. Όλες οι εγγραφές που πληρούν αυτές τις προϋποθέσεις θα τοποθετηθούν στη ροή DESTINATION_SQL_STREAM.

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Στο μπλοκ Προορισμός, επιλέξτε τη ροή special_stream και στην αναπτυσσόμενη λίστα όνομα ροής εντός εφαρμογής DESTINATION_SQL_STREAM:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Το αποτέλεσμα όλων των χειρισμών θα πρέπει να είναι κάτι παρόμοιο με την παρακάτω εικόνα:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή

Δημιουργία και εγγραφή σε ένα θέμα SNS

Μεταβείτε στην Υπηρεσία Απλής Ειδοποίησης και δημιουργήστε ένα νέο θέμα εκεί με το όνομα Airlines:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Εγγραφείτε σε αυτό το θέμα και υποδείξτε τον αριθμό κινητού τηλεφώνου στον οποίο θα αποστέλλονται ειδοποιήσεις SMS:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή

Δημιουργήστε έναν πίνακα στο DynamoDB

Για να αποθηκεύσουμε τα ακατέργαστα δεδομένα από τη ροή αεροπορικών εισιτηρίων τους, ας δημιουργήσουμε έναν πίνακα στο DynamoDB με το ίδιο όνομα. Θα χρησιμοποιήσουμε το record_id ως πρωτεύον κλειδί:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή

Δημιουργία συλλέκτη συναρτήσεων λάμδα

Ας δημιουργήσουμε μια συνάρτηση λάμδα που ονομάζεται Συλλέκτης, της οποίας η αποστολή θα είναι να κάνει δημοσκόπηση στη ροή αεροπορικών εισιτηρίων και, εάν βρεθούν νέες εγγραφές εκεί, να εισαγάγετε αυτές τις εγγραφές στον πίνακα DynamoDB. Προφανώς, εκτός από τα προεπιλεγμένα δικαιώματα, αυτό το λάμδα πρέπει να έχει πρόσβαση ανάγνωσης στη ροή δεδομένων Kinesis και πρόσβαση εγγραφής στο DynamoDB.

Δημιουργία ρόλου IAM για τη συνάρτηση λάμδα συλλέκτη
Αρχικά, ας δημιουργήσουμε έναν νέο ρόλο IAM για το λάμδα με το όνομα Lambda-TicketsProcessingRole:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Για το παράδειγμα δοκιμής, οι προρυθμισμένες πολιτικές AmazonKinesisReadOnlyAccess και AmazonDynamoDBFullAccess είναι αρκετά κατάλληλες, όπως φαίνεται στην παρακάτω εικόνα:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή

Αυτό το λάμδα θα πρέπει να εκτοξεύεται από ένα έναυσμα από το Kinesis όταν εισέρχονται νέες καταχωρήσεις στο airline_stream, επομένως πρέπει να προσθέσουμε ένα νέο έναυσμα:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Το μόνο που μένει είναι να εισαγάγετε τον κωδικό και να αποθηκεύσετε το λάμδα.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Δημιουργία ειδοποιητή συνάρτησης λάμδα

Η δεύτερη συνάρτηση λάμδα, η οποία θα παρακολουθεί τη δεύτερη ροή (special_stream) και θα στέλνει μια ειδοποίηση στο SNS, δημιουργείται με παρόμοιο τρόπο. Επομένως, αυτό το λάμδα πρέπει να έχει πρόσβαση για ανάγνωση από το Kinesis και αποστολή μηνυμάτων σε ένα δεδομένο θέμα SNS, το οποίο στη συνέχεια θα σταλεί από την υπηρεσία SNS σε όλους τους συνδρομητές αυτού του θέματος (email, SMS κ.λπ.).

Δημιουργία ρόλου IAM
Αρχικά, δημιουργούμε τον ρόλο IAM Lambda-KinesisAlarm για αυτό το lambda και, στη συνέχεια, εκχωρούμε αυτόν τον ρόλο στο alarm_notifier lambda που δημιουργείται:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή

Αυτό το λάμδα θα πρέπει να λειτουργεί σε ένα έναυσμα για να εισέλθουν νέες εγγραφές στο special_stream, επομένως πρέπει να διαμορφώσετε τον κανόνα με τον ίδιο τρόπο που κάναμε για το Συλλεκτικό λάμδα.

Για να διευκολύνουμε τη διαμόρφωση αυτού του λάμδα, ας εισαγάγουμε μια νέα μεταβλητή περιβάλλοντος - TOPIC_ARN, όπου τοποθετούμε το ANR (Amazon Recourse Names) του θέματος των αεροπορικών εταιρειών:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Και εισάγετε τον κωδικό λάμδα, δεν είναι καθόλου περίπλοκο:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Φαίνεται ότι εδώ ολοκληρώνεται η μη αυτόματη διαμόρφωση του συστήματος. Το μόνο που μένει είναι να δοκιμάσουμε και να βεβαιωθούμε ότι έχουμε ρυθμίσει τα πάντα σωστά.

Ανάπτυξη από τον κώδικα Terraform

Απαιτούμενη προετοιμασία

Terraform είναι ένα πολύ βολικό εργαλείο ανοιχτού κώδικα για την ανάπτυξη υποδομής από κώδικα. Έχει τη δική του σύνταξη που είναι εύκολη στην εκμάθηση και έχει πολλά παραδείγματα για το πώς και τι να αναπτυχθεί. Το πρόγραμμα επεξεργασίας Atom ή ο κώδικας του Visual Studio έχει πολλά εύχρηστα πρόσθετα που διευκολύνουν την εργασία με το Terraform.

Μπορείτε να κατεβάσετε τη διανομή ως εκ τούτου,. Μια λεπτομερής ανάλυση όλων των δυνατοτήτων του Terraform ξεφεύγει από το πεδίο εφαρμογής αυτού του άρθρου, επομένως θα περιοριστούμε στα κύρια σημεία.

Πώς να τρέξετε

Ο πλήρης κωδικός του έργου είναι στο αποθετήριο μου. Κλωνοποιούμε το αποθετήριο στον εαυτό μας. Πριν ξεκινήσετε, πρέπει να βεβαιωθείτε ότι έχετε εγκαταστήσει και διαμορφώσει το AWS CLI, επειδή... Η Terraform θα αναζητήσει διαπιστευτήρια στο αρχείο ~/.aws/credentials.

Μια καλή πρακτική είναι να εκτελέσετε την εντολή σχεδίου πριν αναπτύξετε ολόκληρη την υποδομή για να δείτε τι δημιουργεί αυτή τη στιγμή η Terraform για εμάς στο cloud:

terraform.exe plan

Θα σας ζητηθεί να εισαγάγετε έναν αριθμό τηλεφώνου στον οποίο θα στέλνετε ειδοποιήσεις. Δεν είναι απαραίτητο να το εισάγετε σε αυτό το στάδιο.

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Έχοντας αναλύσει το σχέδιο λειτουργίας του προγράμματος, μπορούμε να ξεκινήσουμε τη δημιουργία πόρων:

terraform.exe apply

Μετά την αποστολή αυτής της εντολής, θα σας ζητηθεί και πάλι να εισαγάγετε έναν αριθμό τηλεφώνου. Πληκτρολογήστε "ναι" όταν εμφανιστεί μια ερώτηση σχετικά με την πραγματική εκτέλεση των ενεργειών. Αυτό θα σας επιτρέψει να ρυθμίσετε ολόκληρη την υποδομή, να πραγματοποιήσετε όλες τις απαραίτητες ρυθμίσεις παραμέτρων του EC2, να αναπτύξετε λειτουργίες λάμδα κ.λπ.

Αφού δημιουργηθούν επιτυχώς όλοι οι πόροι μέσω του κώδικα Terraform, πρέπει να μεταβείτε στις λεπτομέρειες της εφαρμογής Kinesis Analytics (δυστυχώς, δεν βρήκα πώς να το κάνω απευθείας από τον κώδικα).

Εκκινήστε την εφαρμογή:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Μετά από αυτό, πρέπει να ορίσετε ρητά το όνομα ροής εντός εφαρμογής επιλέγοντας από την αναπτυσσόμενη λίστα:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Τώρα όλα είναι έτοιμα να ξεκινήσουν.

Δοκιμή της εφαρμογής

Ανεξάρτητα από το πώς αναπτύξατε το σύστημα, χειροκίνητα ή μέσω κώδικα Terraform, θα λειτουργεί το ίδιο.

Συνδεόμαστε μέσω SSH στην εικονική μηχανή EC2 όπου είναι εγκατεστημένο το Kinesis Agent και εκτελούμε το σενάριο api_caller.py

sudo ./api_caller.py TOKEN

Το μόνο που έχετε να κάνετε είναι να περιμένετε ένα SMS στον αριθμό σας:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
SMS - ένα μήνυμα φτάνει στο τηλέφωνο σε σχεδόν 1 λεπτό:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή
Μένει να δούμε αν οι εγγραφές αποθηκεύτηκαν στη βάση δεδομένων DynamoDB για μεταγενέστερη, πιο λεπτομερή ανάλυση. Ο πίνακας αεροπορικών_εισιτηρίων περιέχει περίπου τα ακόλουθα δεδομένα:

Ενσωμάτωση API της Aviasales με το Amazon Kinesis και απλότητα χωρίς διακομιστή

Συμπέρασμα

Κατά τη διάρκεια της εργασίας που έγινε, κατασκευάστηκε ένα διαδικτυακό σύστημα επεξεργασίας δεδομένων με βάση το Amazon Kinesis. Εξετάστηκαν επιλογές για τη χρήση του Kinesis Agent σε συνδυασμό με τα Kinesis Data Streams και την ανάλυση σε πραγματικό χρόνο Kinesis Analytics χρησιμοποιώντας εντολές SQL, καθώς και την αλληλεπίδραση του Amazon Kinesis με άλλες υπηρεσίες AWS.

Αναπτύξαμε το παραπάνω σύστημα με δύο τρόπους: έναν αρκετά μεγάλο χειροκίνητο και έναν γρήγορο από τον κώδικα Terraform.

Όλος ο πηγαίος κώδικας του έργου είναι διαθέσιμος στο αποθετήριο GitHub μου, σας προτείνω να εξοικειωθείτε με αυτό.

Είμαι στην ευχάριστη θέση να συζητήσω το άρθρο, περιμένω τα σχόλιά σας. Ελπίζω σε εποικοδομητική κριτική.

Σας εύχομαι καλή επιτυχία!

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο