ProHoster > Blog > Bestjoer > Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Hoi Habr!
Hâldsto fan fleanende fleantugen? Ik hâld derfan, mar tidens selsisolaasje rekke ik ek fereale op it analysearjen fan gegevens oer fleankaarten fan ien bekende boarne - Aviasales.
Hjoed sille wy analysearje it wurk fan Amazon Kinesis, bouwe in streaming systeem mei real-time analytics, ynstallearje de Amazon DynamoDB NoSQL databank as de wichtichste gegevens opslach, en set SMS notifikaasjes foar nijsgjirrige tickets.
Alle details binne ûnder de besuniging! Go!
Ynlieding
Foar it foarbyld, wy moatte tagong ta Aviasales API. Tagong ta it wurdt fergees en sûnder beheiningen levere; jo moatte gewoan registrearje yn 'e seksje "ûntwikkelders" om jo API-token te ûntfangen om tagong te krijen ta de gegevens.
It haaddoel fan dit artikel is om in algemien begryp te jaan fan it gebrûk fan streaming fan ynformaasje yn AWS; wy nimme rekken mei dat de gegevens weromjûn troch de brûkte API net strikt aktueel binne en wurde oerdroegen fanút it cache, dat is foarme basearre op sykopdrachten troch brûkers fan de Aviasales.ru en Jetradar.com siden foar de lêste 48 oeren.
Kinesis-agent, ynstalleare op 'e produsearjende masine, ûntfongen fia de API sil automatysk gegevens analysearje en ferstjoere nei de winske stream fia Kinesis Data Analytics. De rauwe ferzje fan dizze stream sil direkt nei de winkel skreaun wurde. De rûge gegevens opslach ynset yn DynamoDB sil djipper ticketanalyse mooglik meitsje fia BI-ark, lykas AWS Quick Sight.
Wy sille twa opsjes beskôgje foar it ynsetten fan 'e heule ynfrastruktuer:
Hânlieding - fia AWS Management Console;
Ynfrastruktuer út Terraform koade is foar lui automators;
Arsjitektuer fan it ûntwikkele systeem
Gebrûkte komponinten:
Aviasales API - de gegevens weromjûn troch dizze API sille wurde brûkt foar alle folgjende wurk;
EC2 Producer Instance - in gewoane firtuele masine yn 'e wolk wêrop de ynfiergegevensstream sil wurde generearre:
Kinesis Agent is in Java-applikaasje ynstalleare lokaal op 'e masine dy't in maklike manier leveret om gegevens te sammeljen en te stjoeren nei Kinesis (Kinesis Data Streams of Kinesis Firehose). De agint kontrolearret konstant in set bestannen yn 'e oantsjutte mappen en stjoert nije gegevens nei Kinesis;
API Caller Script - In Python-skript dat oanfragen oan 'e API makket en it antwurd pleatst yn in map dy't wurdt kontrolearre troch de Kinesis Agent;
Kinesis Data Streams - realtime datastreamingtsjinst mei brede skaalmooglikheden;
Kinesis Analytics is in serverless tsjinst dy't simplifies de analyze fan streaming gegevens yn real time. Amazon Kinesis Data Analytics konfigurearret applikaasje-boarnen en skalen automatysk om elke folume fan ynkommende gegevens te behanneljen;
AWS Lambda - in tsjinst wêrmei jo koade kinne útfiere sûnder in reservekopy te meitsjen of servers yn te stellen. Alle kompjûterkrêft wurdt automatysk skale foar elke oprop;
Amazon DynamoDB - In databank fan kaai-wearde-pearen en dokuminten dy't wachttiid leveret fan minder dan 10 millisekonden by it rinnen op elke skaal. By it brûken fan DynamoDB hoege jo gjin servers te leverjen, te patchjen of te behearjen. DynamoDB skale automatysk tabellen om it bedrach fan beskikbere boarnen oan te passen en hege prestaasjes te behâlden. Gjin systeem administraasje is nedich;
Amazon SNS - in folslein beheare tsjinst foar it ferstjoeren fan berjochten mei it model fan 'e publisher-abonnee (Pub / Sub) wêrmei jo mikrotsjinsten, ferspraat systemen en serverless applikaasjes kinne isolearje. SNS kin brûkt wurde om ynformaasje te stjoeren nei ein brûkers fia mobile push-notifikaasjes, SMS-berjochten en e-mails.
Initial training
Om de gegevensstream te emulearjen, besleat ik de ynformaasje oer loftlinekaarten te brûken weromjûn troch de Aviasales API. YN dokumintaasje nochal in wiidweidige list fan ferskillende metoaden, lit ús nimme ien fan harren - "Moanlikse Priis Calendar", dy't jout prizen foar elke dei fan 'e moanne, groepearre troch it oantal oerstappen. As jo de sykmoanne net oantsjutte yn it fersyk, sil ynformaasje weromjûn wurde foar de moanne nei de aktuele.
De boppesteande metoade foar it ûntfangen fan gegevens fan 'e API troch it opjaan fan in token yn it fersyk sil wurkje, mar ik leaver it tagongstoken troch de koptekst troch te jaan, dus wy sille dizze metoade brûke yn it api_caller.py-skript.
It foarbyld API-antwurd hjirboppe toant in kaartsje fan Sint Petersburch nei Phuk ... Oh, wat in dream ...
Sûnt ik bin út Kazan, en Phuket is no "allinnich in dream", lit ús sykje kaartsjes út Sint Petersburch nei Kazan.
It giet derfan út dat jo al in AWS-akkount hawwe. Ik wol fuortendaliks spesjaal omtinken jaan oan it feit dat Kinesis en it ferstjoeren fan notifikaasjes fia SMS net opnommen binne yn 'e jierlikse Free Tier (fergees gebrûk). Mar sels nettsjinsteande dit, mei in pear dollar yn gedachten, is it hiel mooglik om it foarstelde systeem te bouwen en dermei te boartsje. En ferjit fansels net alle boarnen te wiskjen nei't se net mear nedich binne.
Gelokkich sille DynamoDb- en lambda-funksjes fergees wêze foar ús as wy foldogge oan ús moanlikse frije grinzen. Bygelyks foar DynamoDB: 25 GB opslach, 25 WCU / RCU en 100 miljoen queries. En in miljoen lambdafunksje-oproppen per moanne.
Hânlieding systeem ynset
Kinesis Data Streams ynstelle
Litte wy nei de Kinesis Data Streams-tsjinst gean en twa nije streamen meitsje, ien shard foar elk.
Wat is in skerpe?
In shard is de basisgegevensferfier-ienheid fan in Amazon Kinesis-stream. Ien segmint soarget foar oerdracht fan ynfiergegevens mei in snelheid fan 1 MB / s en útfiergegevensferfier mei in snelheid fan 2 MB / s. Ien segmint stipet maksimaal 1000 PUT-yngongen per sekonde. By it meitsjen fan in gegevensstream moatte jo it fereaske oantal segminten opjaan. Jo kinne bygelyks in gegevensstream meitsje mei twa segminten. Dizze gegevensstream sil ynfiergegevensferfier leverje mei 2 MB / s en útfiergegevensferfier op 4 MB / s, en stypje oant 2000 PUT-records per sekonde.
Hoe mear stikken yn jo stream, hoe grutter de trochslach is. Yn prinsipe is dit hoe't streamen wurde skalearre - troch it tafoegjen fan shards. Mar hoe mear shards jo hawwe, hoe heger de priis. Elke shard kostet 1,5 sinten per oere en in ekstra 1.4 sinten foar elke miljoen PUT-ladingsienheden.
Litte wy in nije stream meitsje mei de namme airline_tickets, 1 skerp sil him genôch wêze:
Litte wy no in oare thread meitsje mei de namme spesjale_stream:
Produsint opset
Om in taak te analysearjen, is it genôch om in gewoane EC2-eksimplaar te brûken as gegevensprodusint. It hoecht net in krêftige, djoere firtuele masine te wêzen; in plak t2.micro sil it goed dwaan.
Wichtige opmerking: jo moatte bygelyks ôfbylding brûke - Amazon Linux AMI 2018.03.0, it hat minder ynstellingen foar it fluch starten fan de Kinesis Agent.
Gean nei de EC2-tsjinst, meitsje in nije firtuele masine, selektearje de winske AMI mei type t2.micro, dy't opnommen is yn 'e Free Tier:
Om de nij oanmakke firtuele masine te kinnen ynteraksje mei de Kinesis-tsjinst, moat it rjochten krije om dat te dwaan. De bêste manier om dit te dwaan is om in IAM-rol te jaan. Dêrom moatte jo op it skerm Stap 3: Ynstellingsdetails ynstelle, selektearje Meitsje nije IAM Rol:
It meitsjen fan in IAM-rol foar EC2
Selektearje yn it finster dat iepent dat wy in nije rol meitsje foar EC2 en gean nei de seksje Permissions:
Mei it brûken fan it trainingsfoarbyld hoege wy net yn te gean yn alle yngewikkeldheden fan korrelige konfiguraasje fan boarnerjochten, dus wy sille it belied selektearje dat foarôf konfigureare is troch Amazon: AmazonKinesisFullAccess en CloudWatchFullAccess.
Lit ús in betsjuttingsfolle namme jaan foar dizze rol, bygelyks: EC2-KinesisStreams-FullAccess. It resultaat moat itselde wêze as werjûn yn 'e foto hjirûnder:
Nei it oanmeitsjen fan dizze nije rol, ferjit it net te heakjen oan it oanmakke firtuele masine-eksimplaar:
Wy feroarje neat oars op dit skerm en gean troch nei de folgjende finsters.
De hurde skiifynstellingen kinne as standert oerlitten wurde, lykas de tags (hoewol't it in goede praktyk is om tags te brûken, jou it eksimplaar teminsten in namme en jouwe de omjouwing oan).
No binne wy op 'e stap 6: ljepblêd Feiligensgroep konfigurearje, wêr't jo in nije moatte oanmeitsje of jo besteande Feiligensgroep opjaan, wêrtroch jo kinne ferbine fia ssh (poarte 22) mei it eksimplaar. Selektearje Boarne -> Myn IP dêr en jo kinne it eksimplaar starte.
Sadree't it oerstapt nei rinnende status, kinne jo besykje te ferbinen mei it fia ssh.
Om mei Kinesis Agent te wurkjen, moatte jo nei suksesfolle ferbining mei de masine de folgjende kommando's ynfiere yn 'e terminal:
As kin sjoen wurde út it konfiguraasjetriem, sil de agint bestannen kontrolearje mei de .log-útwreiding yn 'e /var/log/airline_tickets/-map, se parse en oerdrage nei de airline_tickets-stream.
Wy starte de tsjinst opnij en soargje derfoar dat it op en rint:
sudo service aws-kinesis-agent restart
Litte wy no it Python-skript downloade dat gegevens sil freegje fan 'e API:
It skript api_caller.py freget gegevens fan Aviasales en bewarret it ûntfongen antwurd yn 'e map dy't de Kinesis-agint scant. De ymplemintaasje fan dit skript is frij standert, der is in TicketsApi klasse, it kinne jo asynchronously lûke de API. Wy passe in koptekst mei in token en freegje parameters oan dizze klasse:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
Om de juste ynstellingen en funksjonaliteit fan 'e agint te testen, litte wy it skript api_caller.py testje:
sudo ./api_caller.py TOKEN
En wy sjogge nei it resultaat fan it wurk yn 'e Agent-logs en op it ljepblêd Monitoring yn' e gegevensstream fan airline_tickets:
Sa't jo sjen kinne, wurket alles en de Kinesis Agent stjoert mei súkses gegevens nei de stream. Litte wy no de konsumint konfigurearje.
Kinesis Data Analytics ynstelle
Litte wy trochgean nei de sintrale komponint fan it heule systeem - meitsje in nije applikaasje yn Kinesis Data Analytics mei de namme kinesis_analytics_airlines_app:
Kinesis Data Analytics kinne jo realtime gegevensanalytyk útfiere fan Kinesis Streams mei de SQL-taal. It is in folslein autoskalearjende tsjinst (oars as Kinesis Streams) dat:
kinne jo meitsje nije streamen (Output Stream) basearre op fersiken om boarne gegevens;
jout in stream mei flaters dy't barde wylst applikaasjes rinne (Flater Stream);
kin automatysk bepale de ynfier gegevens skema (it kin wurde manuell op 'e nij definiearre as it nedich is).
Dit is gjin goedkeape tsjinst - 0.11 USD per oere wurk, dus jo moatte it foarsichtich brûke en wiskje as jo klear binne.
Litte wy de applikaasje ferbine mei de gegevensboarne:
Selektearje de stream wêrmei wy ferbine sille (airline_tickets):
Dêrnei moatte jo in nije IAM-rol taheakje, sadat de applikaasje kin lêze fan 'e stream en skriuwe nei de stream. Om dit te dwaan is it genôch om neat te feroarjen yn it blok tagongsrjochten:
Litte wy no ûntdekking oanfreegje fan it gegevensskema yn 'e stream; Klikje om dit te dwaan op de knop "Skema ûntdekke". As gefolch sil de IAM-rol bywurke wurde (in nije sil oanmakke wurde) en skemadeteksje sil wurde lansearre fanút de gegevens dy't al yn 'e stream binne oankommen:
No moatte jo nei de SQL-bewurker gean. As jo op dizze knop klikke, sil in finster ferskine dat jo freget de applikaasje te starten - selektearje wat jo wolle starte:
Foegje de folgjende ienfâldige query yn it SQL-bewurkerfinster yn en klikje op Save and Run SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
Yn relasjonele databases wurkje jo mei tabellen mei INSERT-útspraken om records ta te foegjen en in SELECT-útspraak om gegevens te freegjen. Yn Amazon Kinesis Data Analytics wurkje jo mei streamen (STREAM's) en pompen (PUMP's) - trochgeande ynfoegje oanfragen dy't gegevens fan ien stream yn in applikaasje ynfoegje yn in oare stream.
De hjirboppe presintearre SQL-query siket nei Aeroflot-kaarten foar in kosten fan minder dan fiif tûzen roebel. Alle records dy't foldogge oan dizze betingsten sille wurde pleatst yn 'e DESTINATION_SQL_STREAM-stream.
Selektearje yn it bestimmingsblok de special_stream stream, en yn 'e yn-applikaasje streamnamme DESTINATION_SQL_STREAM drop-down list:
It resultaat fan alle manipulaasjes moat wat ferlykber wêze mei de ôfbylding hjirûnder:
In SNS-ûnderwerp oanmeitsje en ynskriuwe
Gean nei de Simple Notification Service en meitsje dêr in nij ûnderwerp mei de namme Airlines:
Abonnearje op dit ûnderwerp en jou it mobile tillefoannûmer oan wêrnei't SMS-notifikaasjes sille wurde ferstjoerd:
Meitsje in tabel yn DynamoDB
Om de rauwe gegevens fan har airline_tickets-stream op te slaan, litte wy in tabel meitsje yn DynamoDB mei deselde namme. Wy sille record_id brûke as de primêre kaai:
It meitsjen fan in lambda funksje samler
Litte wy in lambda-funksje oanmeitsje mei de namme Collector, waans taak sil wêze om de stream fan airline_tickets te ûndersiikjen en, as der nije records wurde fûn, dizze records ynfoegje yn 'e DynamoDB-tabel. Fansels moat dizze lambda, neist de standertrjochten, lêstagong hawwe ta de Kinesis-gegevensstream en skriuwtagong ta DynamoDB.
It meitsjen fan in IAM-rol foar de samler lambda-funksje
Litte wy earst in nije IAM-rol meitsje foar de lambda mei de namme Lambda-TicketsProcessingRole:
Foar it testfoarbyld binne it foarôf ynstelde AmazonKinesisReadOnlyAccess- en AmazonDynamoDBFullAccess-belied frij geskikt, lykas werjûn yn 'e ôfbylding hjirûnder:
Dizze lambda moat wurde lansearre troch in trigger fan Kinesis as nije yngongen de airline_stream ynfiere, dus moatte wy in nije trigger tafoegje:
Alles wat oerbliuwt is de koade yn te foegjen en de lambda op te slaan.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
It meitsjen fan in notifier foar lambdafunksje
De twadde lambda-funksje, dy't de twadde stream (special_stream) kontrolearret en in notifikaasje nei SNS stjoert, wurdt op in fergelykbere manier makke. Dêrom moat dizze lambda tagong hawwe om te lêzen fan Kinesis en berjochten te stjoeren nei in opjûne SNS-ûnderwerp, dy't dan troch de SNS-tsjinst stjoerd wurde nei alle abonnees fan dit ûnderwerp (e-post, SMS, ensfh.).
It meitsjen fan in IAM-rol
Earst meitsje wy de IAM-rol Lambda-KinesisAlarm foar dizze lambda, en jouwe dizze rol dan ta oan de alarm_notifier lambda dy't wurdt makke:
Dizze lambda moat wurkje oan in trigger foar nije records om de special_stream yn te gean, dus jo moatte de trigger op deselde manier konfigurearje as wy diene foar de Collector lambda.
Om it makliker te meitsjen om dizze lambda te konfigurearjen, litte wy in nije omjouwingsfariabele yntrodusearje - TOPIC_ARN, wêr't wy de ANR (Amazon Recourse Names) fan it Airlines-ûnderwerp pleatse:
En foegje de lambda-koade yn, it is hielendal net yngewikkeld:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
It liket derop dat dit is wêr't de hânmjittige systeemkonfiguraasje is foltôge. Alles wat oerbliuwt is te testen en te soargjen dat wy alles goed hawwe ynsteld.
Ynsette fan Terraform koade
Ferplichte tarieding
Terraform is in heul handich iepen boarne-ark foar it ynsetten fan ynfrastruktuer fan koade. It hat in eigen syntaksis dy't maklik te learen is en hat in protte foarbylden fan hoe en wat te ynsetten. De Atom-bewurker as Visual Studio Code hat in protte handige plugins dy't it wurkjen mei Terraform makliker meitsje.
Jo kinne download de distribúsje fan hjir. In detaillearre analyze fan alle Terraform-mooglikheden is bûten it berik fan dit artikel, dus wy sille ús beheine ta de haadpunten.
Hoe begjinne
De folsleine koade fan it projekt is yn myn repository. Wy klonearje it repository foar ússels. Foardat jo begjinne, moatte jo derfoar soargje dat jo AWS CLI ynstalleare en konfigureare hawwe, om't ... Terraform sil sykje nei bewiisbrieven yn it ~/.aws/credentials-bestân.
In goede praktyk is om it plankommando út te fieren foardat de heule ynfrastruktuer ynset wurdt om te sjen wat Terraform op it stuit foar ús yn 'e wolk makket:
terraform.exe plan
Jo wurde frege om in telefoannûmer yn te fieren om notifikaasjes nei te stjoeren. It is net nedich om it yn dit stadium yn te fieren.
Nei it analysearjen fan it operaasjeplan fan it programma, kinne wy begjinne mei it meitsjen fan boarnen:
terraform.exe apply
Nei it ferstjoeren fan dit kommando wurde jo opnij frege om in telefoannûmer yn te fieren; skilje "ja" as in fraach oer it feitlik útfieren fan de aksjes wurdt werjûn. Hjirmei kinne jo de heule ynfrastruktuer ynstelle, alle nedige konfiguraasje fan EC2 útfiere, lambda-funksjes ynsette, ensfh.
Nei't alle boarnen mei sukses makke binne troch de Terraform-koade, moatte jo yn 'e details fan' e Kinesis Analytics-applikaasje gean (spitigernôch haw ik net fûn hoe't jo dit direkt fan 'e koade kinne dwaan).
Starte de applikaasje:
Hjirnei moatte jo de namme fan 'e stream yn' e applikaasje eksplisyt ynstelle troch te selektearjen út 'e dellûklist:
No is alles klear om te gean.
Testje de applikaasje
Nettsjinsteande hoe't jo it systeem ynset hawwe, mei de hân of fia Terraform-koade, sil it itselde wurkje.
Wy ynlogge fia SSH nei de EC2 firtuele masine dêr't Kinesis Agent is ynstallearre en rinne it api_caller.py skript
sudo ./api_caller.py TOKEN
Alles wat jo hoege te dwaan is wachtsje op in SMS nei jo nûmer:
SMS - in berjocht komt yn hast 1 minút op 'e telefoan:
It bliuwt om te sjen oft de records waarden bewarre yn 'e DynamoDB-database foar folgjende, mear detaillearre analyse. De tabel foar airline_tickets befettet sawat de folgjende gegevens:
konklúzje
Yn 'e rin fan it dien wurk waard in online gegevensferwurkingssysteem boud basearre op Amazon Kinesis. Opsjes foar it brûken fan de Kinesis Agent yn gearhing mei Kinesis Data Streams en real-time analytics Kinesis Analytics mei SQL-kommando's, lykas de ynteraksje fan Amazon Kinesis mei oare AWS-tsjinsten waarden beskôge.
Wy hawwe it boppesteande systeem op twa manieren ynset: in frij lange hânlieding en in flugge fan 'e Terraform-koade.
Alle projekt boarne koade is beskikber yn myn GitHub-repository, Ik stel foar dat jo jo dermei fertroud meitsje.
Ik bin bliid om it artikel te besprekken, ik sjoch út nei jo opmerkings. Ik hoopje op konstruktive krityk.