Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem

Čau Habr!

Vai jums patÄ«k lidot ar lidmaŔīnām? Man tas patÄ«k, bet paÅ”izolācijas laikā es iemÄ«lējos arÄ« aviobiļeÅ”u datu analÄ«zē no viena labi zināma resursa - Aviasales.

Å odien mēs analizēsim Amazon Kinesis darbu, izveidosim straumÄ“Å”anas sistēmu ar reāllaika analÄ«zi, kā galveno datu krātuvi instalēsim Amazon DynamoDB NoSQL datu bāzi un iestatÄ«sim SMS paziņojumus interesantām biļetēm.

Visas detaļas ir zem griezuma! Aiziet!

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem

Ievads

Piemēram, mums ir nepiecieÅ”ama piekļuve Aviasales API. Piekļuve tai tiek nodroÅ”ināta bez maksas un bez ierobežojumiem; jums tikai jāreÄ£istrējas sadaļā ā€œIzstrādātājiā€, lai saņemtu API marÄ·ieri, lai piekļūtu datiem.

Å Ä« raksta galvenais mērÄ·is ir sniegt vispārēju izpratni par informācijas straumÄ“Å”anas izmantoÅ”anu AWS; mēs ņemam vērā, ka izmantotie API atgrieztie dati nav stingri atjaunināti un tiek pārsÅ«tÄ«ti no keÅ”atmiņas, kas ir izveidots, pamatojoties uz vietņu Aviasales.ru un Jetradar.com lietotāju meklējumiem pēdējo 48 stundu laikā.

Kinesis aÄ£ents, kas instalēts ražoÅ”anas maŔīnā un saņemts, izmantojot API, automātiski parsēs un pārsÅ«tÄ«s datus uz vēlamo straumi, izmantojot Kinesis Data Analytics. Å Ä«s straumes neapstrādātā versija tiks rakstÄ«ta tieÅ”i veikalā. DynamoDB izvietotā neapstrādāto datu krātuve ļaus veikt dziļāku biļeÅ”u analÄ«zi, izmantojot BI rÄ«kus, piemēram, AWS Quick Sight.

Mēs apsvērsim divas iespējas visas infrastruktÅ«ras izvietoÅ”anai:

  • Rokasgrāmata - izmantojot AWS pārvaldÄ«bas konsoli;
  • InfrastruktÅ«ra no Terraform koda ir paredzēta slinkiem automātiem;

Izstrādātās sistēmas arhitektūra

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Izmantotās sastāvdaļas:

  • Aviasales API ā€” Ŕīs API atgrieztie dati tiks izmantoti visam turpmākajam darbam;
  • EC2 ražotāja instance ā€” parasta virtuālā maŔīna mākonÄ«, kurā tiks Ä£enerēta ievades datu straume:
    • Kinēzes aÄ£ents ir ierÄ«cē lokāli instalēta Java lietojumprogramma, kas nodroÅ”ina vienkārÅ”u veidu, kā apkopot un nosÅ«tÄ«t datus uz Kinesis (Kinesis Data Streams vai Kinesis Firehose). AÄ£ents pastāvÄ«gi uzrauga failu kopu norādÄ«tajos direktorijos un nosÅ«ta jaunus datus uz Kinesis;
    • API zvanÄ«tāja skripts ā€” Python skripts, kas veic pieprasÄ«jumus API un ievieto atbildi mapē, kuru uzrauga Kinesis aÄ£ents;
  • Kinesis datu straumes ā€” reāllaika datu straumÄ“Å”anas pakalpojums ar plaŔām mērogoÅ”anas iespējām;
  • Kinēzes analÄ«ze ir pakalpojums bez serveriem, kas vienkārÅ”o straumÄ“Å”anas datu analÄ«zi reāllaikā. Amazon Kinesis Data Analytics konfigurē lietojumprogrammu resursus un automātiski mērogojas, lai apstrādātu jebkāda apjoma ienākoÅ”os datus;
  • AWS Lambda ā€” pakalpojums, kas ļauj palaist kodu bez dublÄ“Å”anas vai serveru iestatÄ«Å”anas. Visa skaitļoÅ”anas jauda tiek automātiski mērogota katram zvanam;
  • Amazon DynamoDB - Atslēgu un vērtÄ«bu pāru un dokumentu datu bāze, kas nodroÅ”ina latentumu, kas mazāks par 10 milisekundēm, ja darbojas jebkurā mērogā. Izmantojot DynamoDB, jums nav jānodroÅ”ina, jālabo vai jāpārvalda serveri. DynamoDB automātiski mērogo tabulas, lai pielāgotu pieejamo resursu apjomu un uzturētu augstu veiktspēju. Nav nepiecieÅ”ama sistēmas administrÄ“Å”ana;
  • Amazon SNS - pilnÄ«bā pārvaldÄ«ts pakalpojums ziņojumu sÅ«tÄ«Å”anai, izmantojot izdevēja-abonenta (Pub/Sub) modeli, ar kuru varat izolēt mikropakalpojumus, izkliedētās sistēmas un lietojumprogrammas bez serveriem. SNS var izmantot, lai nosÅ«tÄ«tu informāciju gala lietotājiem, izmantojot mobilos push paziņojumus, SMS un e-pastus.

Sākotnējā apmācība

Lai atdarinātu datu plÅ«smu, es nolēmu izmantot Aviasales API atgriezto informāciju par aviobiļetēm. IN dokumentācija diezgan plaÅ”s dažādu metožu saraksts, ņemsim vienu no tiem - ā€œMēneÅ”a cenu kalendārsā€, kas atgriež cenas katrai mēneÅ”a dienai, sagrupētas pēc pārskaitÄ«jumu skaita. Ja pieprasÄ«jumā nenorādÄ«siet meklÄ“Å”anas mēnesi, informācija tiks atgriezta par nākamo mēnesi pēc paÅ”reizējā mēneÅ”a.

Tātad, reģistrēsimies un saņemsim savu žetonu.

Pieprasījuma piemērs ir zemāk:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

IepriekÅ” minētā metode datu saņemÅ”anai no API, pieprasÄ«jumā norādot pilnvaru, darbosies, taču es dodu priekÅ”roku piekļuves pilnvaras nodoÅ”anai caur galveni, tāpēc mēs izmantosim Å”o metodi skriptā api_caller.py.

Atbildes piemērs:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

AugŔējā API atbildes piemērā ir redzama biļete no Sanktpēterburgas uz Puku... Ak, kāds sapnis...
Tā kā esmu no Kazaņas, un Puketa tagad ir ā€œtikai sapnisā€, tad meklēsim biļetes no Sanktpēterburgas uz Kazaņu.

Tiek pieņemts, ka jums jau ir AWS konts. Uzreiz vēlos vērst Ä«paÅ”u uzmanÄ«bu, ka Kinesis un paziņojumu sÅ«tÄ«Å”ana ar SMS nav iekļauta gada Bezmaksas lÄ«menis (bezmaksas izmantoÅ”ana). Bet pat neskatoties uz to, ar pāris dolāriem prātā, ir pilnÄ«gi iespējams izveidot piedāvāto sistēmu un spēlēt ar to. Un, protams, neaizmirstiet izdzēst visus resursus pēc tam, kad tie vairs nav vajadzÄ«gi.

Par laimi, DynamoDb un lambda funkcijas mums bÅ«s bez maksas, ja mēs izpildÄ«sim savus mēneÅ”a bezmaksas limitus. Piemēram, DynamoDB: 25 GB krātuve, 25 WCU/RCU un 100 miljoni vaicājumu. Un miljons lambda funkciju zvanu mēnesÄ«.

Sistēmas manuāla izvietoÅ”ana

Kinesis datu straumju iestatīŔana

Dosimies uz pakalpojumu Kinesis Data Streams un izveidosim divas jaunas straumes, katrai pa vienai shardam.

Kas ir lauskas?
Shard ir Amazon Kinesis straumes pamata datu pārsÅ«tÄ«Å”anas vienÄ«ba. Viens segments nodroÅ”ina ievades datu pārraidi ar ātrumu 1 MB/s un izejas datu pārraidi ar ātrumu 2 MB/s. Viens segments atbalsta lÄ«dz 1000 PUT ierakstiem sekundē. Veidojot datu straumi, jānorāda nepiecieÅ”amais segmentu skaits. Piemēram, varat izveidot datu straumi ar diviem segmentiem. Å Ä« datu straume nodroÅ”inās ieejas datu pārsÅ«tÄ«Å”anu ar ātrumu 2 MB/s un izejas datu pārraidi ar ātrumu 4 MB/s, atbalstot lÄ«dz 2000 PUT ierakstiem sekundē.

Jo vairāk Ŕķembu jÅ«su straumē, jo lielāka ir tās caurlaidspēja. Principā plÅ«smas tiek mērogotas Ŕādi - pievienojot skaidiņas. Bet jo vairāk lauskas jums ir, jo augstāka cena. Katra Ŕķemba maksā 1,5 centus stundā un papildu 1.4 centus par katru miljonu PUT kravas vienÄ«bu.

Izveidosim jaunu straumi ar nosaukumu aviobiļetes, viņam pietiks ar 1 skaidiņu:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Tagad izveidosim citu pavedienu ar nosaukumu īpaŔā_straume:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem

Ražotāja iestatīŔana

Lai analizētu uzdevumu, pietiek ar parastu EC2 gadÄ«jumu kā datu veidotāju. Tai nav jābÅ«t jaudÄ«gai, dārgai virtuālai maŔīnai; t2.micro bÅ«s lieliski piemērots.

Svarīga piezīme: piemēram, jums vajadzētu izmantot attēlu - Amazon Linux AMI 2018.03.0, tam ir mazāk iestatījumu, lai ātri palaistu Kinesis Agent.

Dodieties uz pakalpojumu EC2, izveidojiet jaunu virtuālo maŔīnu, atlasiet vajadzīgo AMI ar tipu t2.micro, kas ir iekļauts bezmaksas līmenī:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Lai jaunizveidotā virtuālā maŔīna varētu mijiedarboties ar Kinesis servisu, tai ir jāpieŔķir tiesÄ«bas to darÄ«t. Labākais veids, kā to izdarÄ«t, ir pieŔķirt IAM lomu. Tāpēc ekrānā 3. darbÄ«ba: Instances informācijas konfigurÄ“Å”ana ir jāatlasa Izveidojiet jaunu IAM lomu:

IAM lomas izveide EC2
Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Atvērtajā logā atlasiet, ka mēs veidojam jaunu lomu EC2, un dodieties uz sadaļu Atļaujas:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Izmantojot apmācÄ«bu piemēru, mums nav jāiedziļinās visās resursu tiesÄ«bu detalizētās konfigurācijas sarežģītÄ«bās, tāpēc mēs atlasÄ«sim Amazon iepriekÅ” konfigurētās politikas: AmazonKinesisFullAccess un CloudWatchFullAccess.

PieŔķirsim Å”ai lomai kādu jēgpilnu nosaukumu, piemēram: EC2-KinesisStreams-FullAccess. Rezultātam jābÅ«t tādam paÅ”am, kā parādÄ«ts zemāk esoÅ”ajā attēlā:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Pēc Ŕīs jaunās lomas izveides neaizmirstiet to pievienot izveidotajai virtuālās maŔīnas instancei:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Mēs neko citu Å”ajā ekrānā nemainām un pārejam pie nākamajiem logiem.

Cietā diska iestatÄ«jumus var atstāt kā noklusējuma iestatÄ«jumus, kā arÄ« tagus (lai gan laba prakse ir izmantot tagus, vismaz pieŔķiriet instancei nosaukumu un norādiet vidi).

Tagad mēs atrodamies cilnē 6. darbÄ«ba: droŔības grupas konfigurÄ“Å”ana, kur jums ir jāizveido jauna vai jānorāda esoŔā droŔības grupa, kas ļauj izveidot savienojumu ar gadÄ«jumu, izmantojot ssh (ports 22). Tur atlasiet Avots -> Mans IP un varat palaist instanci.

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Tiklīdz tas pārslēdzas uz darbības statusu, varat mēģināt izveidot savienojumu ar to, izmantojot ssh.

Lai varētu strādāt ar Kinesis Agent, pēc veiksmÄ«gas savienojuma izveides ar iekārtu, terminālā jāievada Ŕādas komandas:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Izveidosim mapi API atbilžu saglabāŔanai:

sudo mkdir /var/log/airline_tickets

Pirms aÄ£enta palaiÅ”anas jums jākonfigurē tā konfigurācija:

sudo vim /etc/aws-kinesis/agent.json

Faila agent.json saturam vajadzētu izskatÄ«ties Ŕādi:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Kā redzams no konfigurācijas faila, aÄ£ents pārraudzÄ«s failus ar paplaÅ”inājumu .log direktorijā /var/log/airline_tickets/, parsēs tos un pārsÅ«tÄ«s uz aviobiļetes straumi.

Mēs restartējam pakalpojumu un pārliecināmies, ka tas ir izveidots un darbojas:

sudo service aws-kinesis-agent restart

Tagad lejupielādēsim Python skriptu, kas pieprasīs datus no API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skripts api_caller.py pieprasa datus no Aviasales un saglabā saņemto atbildi direktorijā, ko skenē Kinesis aÄ£ents. Å Ä« skripta ievieÅ”ana ir diezgan standarta, ir TicketsApi klase, tā ļauj asinhroni vilkt API. Mēs nododam galveni ar pilnvaru un pieprasām parametrus Å”ai klasei:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Lai pārbaudītu pareizos aģenta iestatījumus un funkcionalitāti, pārbaudīsim skriptu api_caller.py:

sudo ./api_caller.py TOKEN

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Un mēs aplūkojam darba rezultātus aģentu žurnālos un cilnē Monitorings datu straumē airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Kā redzat, viss darbojas un Kinesis Agent veiksmīgi nosūta datus uz straumi. Tagad konfigurēsim patērētāju.

Kinesis Data Analytics iestatīŔana

Pārejam pie visas sistēmas centrālās sastāvdaļas ā€” izveidojiet jaunu lietojumprogrammu programmā Kinesis Data Analytics ar nosaukumu kinesis_analytics_airlines_app:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Kinesis Data Analytics ļauj veikt reāllaika datu analÄ«zi no Kinesis Streams, izmantojot SQL valodu. Tas ir pilnÄ«bā automātiskās mērogoÅ”anas pakalpojums (atŔķirÄ«bā no Kinesis Streams), kas:

  1. ļauj izveidot jaunas straumes (Output Stream), pamatojoties uz avota datu pieprasījumiem;
  2. nodroŔina straumi ar kļūdām, kas raduŔās lietojumprogrammu darbības laikā (Error Stream);
  3. var automātiski noteikt ievaddatu shēmu (ja nepiecieÅ”ams, to var manuāli pārdefinēt).

Å is nav lēts pakalpojums - 0.11 USD par darba stundu, tāpēc jums tas jāizmanto uzmanÄ«gi un jāizdzÄ“Å”, kad esat pabeidzis.

Savienosim lietojumprogrammu ar datu avotu:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Atlasiet straumi, ar kuru mēs izveidosim savienojumu (airline_tickets):

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Pēc tam jums jāpievieno jauna IAM loma, lai lietojumprogramma varētu lasīt no straumes un rakstīt straumē. Lai to izdarītu, pietiek neko nemainīt piekļuves atļauju blokā:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Tagad pieprasÄ«sim datu shēmas atklāŔanu straumē; lai to izdarÄ«tu, noklikŔķiniet uz pogas ā€œAtklāt shēmuā€. Rezultātā IAM loma tiks atjaunināta (tiks izveidota jauna) un tiks palaists shēmas noteikÅ”ana no datiem, kas jau ir nonākuÅ”i straumē:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Tagad jums jāiet uz SQL redaktoru. NoklikŔķinot uz Ŕīs pogas, parādÄ«sies logs ar aicinājumu palaist lietojumprogrammu - atlasiet, ko vēlaties palaist:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
SQL redaktora logā ievietojiet Ŕādu vienkārŔo vaicājumu un noklikŔķiniet uz Saglabāt un palaist SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Relāciju datu bāzēs jÅ«s strādājat ar tabulām, izmantojot INSERT priekÅ”rakstus, lai pievienotu ierakstus, un SELECT priekÅ”rakstu datu vaicāŔanai. Programmā Amazon Kinesis Data Analytics jÅ«s strādājat ar straumēm (STREAM) un sÅ«kņiem (PUMP) ā€” nepārtrauktiem ievietoÅ”anas pieprasÄ«jumiem, kas ievieto datus no vienas lietojumprogrammas straumes citā straumē.

IepriekÅ” parādÄ«tais SQL vaicājums meklē Aeroflot biļetes, kuru cena ir mazāka par pieciem tÅ«kstoÅ”iem rubļu. Visi ieraksti, kas atbilst Å”iem nosacÄ«jumiem, tiks ievietoti straumē DESTINATION_SQL_STREAM.

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Blokā Galamērķis atlasiet straumi special_stream un nolaižamajā sarakstā straumes nosaukums lietojumprogrammā DESTINATION_SQL_STREAM:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Visu manipulāciju rezultātam vajadzētu būt līdzīgam attēlā redzamajam:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem

SNS tēmas izveide un abonÄ“Å”ana

Dodieties uz VienkārÅ”o paziņojumu pakalpojumu un izveidojiet jaunu tēmu ar nosaukumu Aviokompānijas:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Abonējiet Å”o tēmu un norādiet mobilā tālruņa numuru, uz kuru tiks nosÅ«tÄ«ti SMS paziņojumi:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem

Izveidojiet tabulu DynamoDB

Lai saglabātu neapstrādātos datus no straumes airline_tickets, izveidosim DynamoDB tabulu ar tādu paÅ”u nosaukumu. Mēs izmantosim ieraksta_id kā primāro atslēgu:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem

Lambda funkciju savācēja izveide

Izveidosim lambda funkciju Collector, kuras uzdevums bÅ«s aptaujāt airline_tickets straumi un, ja tur tiks atrasti jauni ieraksti, ievietot Å”os ierakstus DynamoDB tabulā. AcÄ«mredzot papildus noklusējuma tiesÄ«bām Å”ai lambda ir jābÅ«t lasÄ«Å”anas piekļuvei Kinesis datu straumei un rakstÄ«Å”anas piekļuvei DynamoDB.

IAM lomas izveide kolektora lambda funkcijai
Vispirms izveidosim jaunu IAM lomu lambda ar nosaukumu Lambda-TicketsProcessingRole:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Testa piemērā iepriekÅ” konfigurētās AmazonKinesisReadOnlyAccess un AmazonDynamoDBFullAccess politikas ir diezgan piemērotas, kā parādÄ«ts attēlā zemāk:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem

Šī lambda ir jāpalaiž ar Kinesis aktivizētāju, kad airline_stream tiek ievadīti jauni ieraksti, tāpēc mums ir jāpievieno jauns aktivizētājs:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Atliek tikai ievietot kodu un saglabāt lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda funkcijas paziņotāja izveide

LÄ«dzÄ«gi tiek izveidota arÄ« otrā lambda funkcija, kas uzraudzÄ«s otro straumi (special_stream) un nosÅ«tÄ«s paziņojumu SNS. Tāpēc Å”ai lambda ir jābÅ«t pieejai, lai lasÄ«tu no Kinesis un nosÅ«tÄ«tu ziņojumus uz doto SNS tēmu, ko pēc tam SNS pakalpojums nosÅ«tÄ«s visiem Ŕīs tēmas abonentiem (e-pasts, SMS utt.).

IAM lomas izveide
Vispirms mēs izveidojam IAM lomu Lambda-KinesisAlarm Å”ai lambda un pēc tam pieŔķiram Å”o lomu alarm_notifier lambda, kas tiek veidota:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem

Å ai lambda ir jādarbojas ar trigeri, lai jauni ieraksti ievadÄ«tu special_stream, tāpēc jums ir jākonfigurē aktivizētājs tādā paŔā veidā, kā mēs to darÄ«jām Collector lambda.

Lai atvieglotu Ŕīs lambda konfigurÄ“Å”anu, ieviesÄ«sim jaunu vides mainÄ«go ā€” TOPIC_ARN, kurā ievietojam Airlines tēmas ANR (Amazon Recourse Names):

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Un ievietojiet lambda kodu, tas nemaz nav sarežģīti:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Å Ä·iet, ka Å”eit ir pabeigta manuālā sistēmas konfigurÄ“Å”ana. Atliek tikai pārbaudÄ«t un pārliecināties, ka esam visu pareizi konfigurējuÅ”i.

Izvietot no Terraform koda

NepiecieŔamā sagatavoŔanās

Terraform ir ļoti ērts atvērtā pirmkoda rÄ«ks infrastruktÅ«ras izvietoÅ”anai no koda. Tam ir sava sintakse, ko ir viegli iemācÄ«ties, un tajā ir daudz piemēru, kā un ko izvietot. Atom redaktoram vai Visual Studio Code ir daudz parocÄ«gu spraudņu, kas atvieglo darbu ar Terraform.

JÅ«s varat lejupielādēt izplatÄ«Å”anu tātad. Detalizēta visu Terraform iespēju analÄ«ze ir ārpus Ŕī raksta darbÄ«bas jomas, tāpēc mēs aprobežosimies ar galvenajiem punktiem.

Kā palaist

Pilns projekta kods ir manā repozitorijā. Mēs klonējam krātuvi sev. Pirms darba sākÅ”anas jums ir jāpārliecinās, vai AWS CLI ir instalēta un konfigurēta, jo... Terraform meklēs akreditācijas datus failā ~/.aws/credentials.

Laba prakse ir palaist plāna komandu pirms visas infrastruktÅ«ras izvietoÅ”anas, lai redzētu, ko Terraform paÅ”laik mums veido mākonÄ«:

terraform.exe plan

Jums tiks piedāvāts ievadīt tālruņa numuru, uz kuru nosūtīt paziņojumus. Šajā posmā tas nav jāievada.

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Izanalizējot programmas darbības plānu, varam sākt veidot resursus:

terraform.exe apply

Pēc Ŕīs komandas nosÅ«tÄ«Å”anas jums atkal tiks lÅ«gts ievadÄ«t tālruņa numuru; sastādiet ā€œjāā€, kad tiek parādÄ«ts jautājums par darbÄ«bu faktisko izpildi. Tas ļaus jums izveidot visu infrastruktÅ«ru, veikt visu nepiecieÅ”amo EC2 konfigurāciju, izvietot lambda funkcijas utt.

Kad visi resursi ir veiksmÄ«gi izveidoti, izmantojot Terraform kodu, jums jāiedziļinās Kinesis Analytics lietojumprogrammas detaļās (diemžēl es neatradu, kā to izdarÄ«t tieÅ”i no koda).

Palaidiet lietojumprogrammu:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Pēc tam jums ir skaidri jāiestata lietojumprogrammas straumes nosaukums, nolaižamajā sarakstā atlasot:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Tagad viss ir gatavs darbam.

Lietojumprogrammas testēŔana

Neatkarīgi no tā, kā jūs izvietojāt sistēmu, manuāli vai izmantojot Terraform kodu, tā darbosies tāpat.

Mēs piesakāmies, izmantojot SSH, virtuālajā maŔīnā EC2, kurā ir instalēts Kinesis Agent, un palaižam skriptu api_caller.py.

sudo ./api_caller.py TOKEN

Viss, kas jums jādara, ir jāgaida SMS uz jūsu numuru:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
SMS ā€” ziņa uz jÅ«su tālruni nonāk gandrÄ«z 1 minÅ«tes laikā:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem
Atliek noskaidrot, vai ieraksti tika saglabāti DynamoDB datu bāzē turpmākai, detalizētākai analÄ«zei. Aviobiļetes tabulā ir aptuveni Ŕādi dati:

Aviasales API integrācija ar Amazon Kinesis un vienkārŔība bez serveriem

Secinājums

Paveiktā darba gaitā tika uzbÅ«vēta tieÅ”saistes datu apstrādes sistēma uz Amazon Kinesis bāzes. Tika apsvērtas iespējas izmantot Kinesis Agent kopā ar Kinesis datu straumēm un reāllaika analÄ«zi Kinesis Analytics, izmantojot SQL komandas, kā arÄ« Amazon Kinesis mijiedarbÄ«ba ar citiem AWS pakalpojumiem.

IepriekÅ” minēto sistēmu mēs izvietojām divos veidos: diezgan garu manuālu un ātru no Terraform koda.

Ir pieejams viss projekta pirmkods manā GitHub repozitorijā, iesaku ar to iepazīties.

Es priecājos apspriest rakstu, es gaidu jūsu komentārus. Ceru uz konstruktīvu kritiku.

Es vēlu jums veiksmi!

Avots: www.habr.com

Pievieno komentāru