Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Прывітанне, Хабр!
А вы любіце лятаць на самалётах? Я люблю, але на самаізаляцыі палюбіў яшчэ і аналізаваць дадзеныя аб авіябілетах аднаго вядомага рэсурсу – Aviasales.
Сёння мы разбяром працу Amazon Kinesis, пабудуем стрыммінгавую сістэму з рэал-тайм аналітыкай, паставім NoSQL базу дадзеных Amazon DynamoDB у якасці асноўнага сховішчы дадзеных і наладзім апавяшчэнне праз SMS па цікавых квітках.
Усе падрабязнасці пад катом! Паехалі!
Увядзенне
Для прыкладу нам спатрэбіцца доступ да API Aviasales. Доступ да яго прадастаўляецца бясплатна і без абмежаванняў, неабходна толькі зарэгістравацца ў раздзеле "Распрацоўшчыкам", каб атрымаць свой API токен для доступу да дадзеных.
Асноўная мэта дадзенага артыкула – даць агульнае разуменне выкарыстання струменевай перадачы інфармацыі ў AWS, мы выносім за дужкі, што дадзеныя, якія вяртаюцца выкарыстоўваным API не з'яўляюцца строга актуальнымі і перадаюцца з кэша, які фармуецца на падставе пошукаў карыстачоў сайтаў Aviasales.ru і Jetradar.com за апошнія 48 гадзін.
Атрыманыя праз API дадзеныя аб авіябілетах Kinesis-agent, усталяваны на машыне-прадзюсары, будзе аўтаматам парсіць і перадаваць у патрэбны струмень праз Kinesis Data Analytics. Неапрацаваная версія гэтага патоку будзе пісацца напрамую ў сховішча. Разгорнутае ў DynamoDB сховішча "волкіх" дадзеных дазволіць праводзіць глыбейшы аналіз квіткоў праз BI прылады, напрыклад, AWS Quick Sight.
Мы разгледзім два варыянты дэплою ўсёй інфраструктуры:
Ручны - праз AWS Management Console;
Інфраструктура з кода Terraform – для лянівых аўтаматызатараў;
Архітэктура распрацоўванай сістэмы
Выкарыстоўваныя кампаненты:
Aviasales API - дадзеныя, якія вяртаюцца гэтым API, будуць выкарыстоўвацца для ўсёй наступнай працы;
EC2 Producer Instance - звычайная віртуальная машына ў воблаку, на якой будзе генеравацца ўваходны паток дадзеных:
Kinesis Agent - гэта Java-дадатак, якое ўстанаўліваецца лакальна на машыну, якое дае просты спосаб збору і адпраўкі дадзеных у Kinesis (Kinesis Data Streams або Kinesis Firehose). Агент увесь час адсочвае набор файлаў у паказаных дырэкторыях і адпраўляе новыя дадзеныя ў Kinesis;
Скрыпт API Caller - Python-скрыпт, які робіць запыты да API і які складае адказ у тэчку, якую маніторыць Kinesis Agent;
Kinesis Data Streams - сэрвіс струменевай перадачы дадзеных у рэжыме рэальнага часу з шырокімі магчымасцямі маштабавання;
Kinesis Analytics - бессерверны сэрвіс, які спрашчае аналіз струменевых дадзеных у рэжыме рэальнага часу. Amazon Kinesis Data Analytics наладжвае рэсурсы для працы прыкладанняў і аўтаматычна маштабуецца для апрацоўкі любых аб'ёмаў уваходных дадзеных;
AWS лямбда - сэрвіс, які дазваляе запускаць код без рэзервавання і налады сервераў. Усе вылічальныя магутнасці аўтаматычна маштабуюцца пад кожны выклік;
Amazon DynamoDB - база дадзеных пар "ключ-значэнне" і дакументаў, якая забяспечвае затрымку менш за 10 мілісекунд пры працы ў любым маштабе. Пры выкарыстанні DynamoDB не патрабуецца размяркоўваць якія-небудзь серверы, усталёўваць на іх выпраўленні ці кіраваць імі. DynamoDB аўтаматычна маштабуе табліцы, карэктуючы аб'ём даступных рэсурсаў і захоўваючы высокую прадукцыйнасць. Ніякія дзеянні па адміністраванні сістэмы не патрабуюцца;
Amazon SNS – цалкам кіраваны сэрвіс адпраўкі паведамленняў па мадэлі «выдавец – падпісант» (Pub/Sub), з дапамогай якога можна ізаляваць мікрасэрвісы, размеркаваныя сістэмы і бессерверныя прыкладанні. SNS можна выкарыстоўваць для рассылання інфармацыі канчатковым карыстальнікам з дапамогай мабільных push-паведамленняў, SMS-паведамленняў і электронных лістоў.
Пачатковая падрыхтоўка
Для эмуляцыі патоку дадзеных я вырашыў выкарыстоўваць інфармацыю аб авіябілетах, якая вяртаецца API Aviasales. У дакументацыі даволі шырокі спіс розных метадаў, возьмем адзін з іх - «Каляндар коштаў на месяц», які вяртае кошты за кожны дзень месяца, згрупаваныя па колькасці перасадак. Калі не перадаваць у запыце месяц пошуку, то будзе вернутая інфармацыя за месяц, наступны за бягучым.
Такім чынам, рэгіструемся, атрымліваем свой токен.
Вышэйапісаны спосаб атрымання дадзеных ад API з указаннем токена ў запыце будзе працаваць, але мне больш падабаецца перадаваць токен доступу праз загаловак, таму ў скрыпце api_caller.py будзем карыстацца менавіта гэтым спосабам.
У прыкладзе адказу API вышэй паказаны білет з Санкт-Пецярбурга ў Пхук… Эх, ды што марыць…
Так як я з Казані, а Пхукет зараз «нам толькі сніцца», пашукаем квіткі з Санкт-Пецярбурга ў Казань.
Мяркуецца, што ў вас ужо ёсць рахунак на AWS. Адразу хачу звярнуць асаблівую ўвагу, што Kinesis і адпраўка апавяшчэнняў праз SMS не ўваходзяць у гадавы Free Tier (бясплатнае выкарыстанне). Але нават нягледзячы на гэта, заклаўшы ў розуме пары даляраў, суцэль можна пабудаваць прапанаваную сістэму і пагуляць з ёй. І, вядома ж, не варта забываць выдаляць усе рэсурсы пасля таго, як яны сталі не патрэбны.
На шчасце, DynamoDb і лямбда-функцыі будуць для нас умоўна бясплатнымі, калі ўкласціся ў месячныя бясплатныя ліміты. Напрыклад, для DynamoDB: 25 Гб сховішчы, 25 WCU/RCU і 100 млн. запытаў. І мільён выклікаў лямбда функцый у месяц.
Ручны дэплой сістэмы
Настройка Kinesis Data Streams
Пяройдзем у сэрвіс Kinesis Data Streams і ствараем два новых патоку па адным шардзе на кожны.
Што такое шард?
Шард - гэта асноўная адзінка перадачы дадзеных патоку Amazon Kinesis. Адзін сегмент забяспечвае перадачу ўваходных дадзеных са хуткасцю 1 МБ/С і перадачу выходных дадзеных са хуткасцю 2 МБ/С. Адзін сегмент падтрымлівае да 1000 запісаў PUT у секунду. Пры стварэнні струменя дадзеных патрабуецца паказаць патрэбную колькасць сегментаў. Напрыклад, можна стварыць струмень дадзеных з двума сегментамі. Гэты струмень дадзеных забяспечыць перадачу ўваходных дадзеных са хуткасцю 2 МБ/С і перадачу выходных дадзеных са хуткасцю 4 МБ/С з падтрымкай да 2000 запісаў PUT у секунду.
Чым больш шардов у вашым струмені - тым больш яго прапускная здольнасць. У прынцыпе, так і маштабуюцца патокі - шляхам дадання шардаў. Але чым больш у вас шардаў, тым вышэй і кошт. Кожны шард каштуе 1,5 цэнта ў гадзіну і дадаткова 1.4/XNUMX цэнта за кожныя мільён аперацый дадання ў струмень (PUT payload units).
Створым новы паток з імем airline_tickets, яму цалкам дастаткова будзе 1 шарда:
Цяпер створым яшчэ адзін паток з імем special_stream:
Настройка прадзюсара
У якасці прадзюсара дадзеных для разбору задачы дастаткова выкарыстоўваць звычайны EC2 інстанс. Гэта не павінна быць магутная дарагая віртуальная машына, суцэль падыдзе спотавых t2.micro.
Важная заўвага: для прыкладу варта выкарыстоўваць image – Amazon Linux AMI 2018.03.0, з ім менш налад для хуткага запуску Kinesis Agent.
Пераходзім у сэрвіс EC2, ствараем новую віртуальную машыну, выбіраемы патрэбны AMI з тыпам t2.micro, які ўваходзіць ва Free Tier:
Для таго, каб ізноў створаная віртуальная машына змагла ўзаемадзейнічаць з сэрвісам Kinesis, неабходна даць ёй на гэтае права. Лепшы спосаб гэта зрабіць - прызначыць IAM Role. Таму, на экране Step 3: Configure Instance Details трэба абраць Create new IAM Role:
Стварэнне IAM ролі для EC2
У якое адкрылася акне, выбіраемы, што новую ролю ствараем для EC2 і пераходзім у падзел Permissions:
На вучэбным прыкладзе можна не ўдавацца ва ўсе тонкасці гранулярнай наладкі правоў на рэсурсы, таму выберам праднастроеныя Амазонам полісі: AmazonKinesisFullAccess і CloudWatchFullAccess.
Дамо якое-небудзь асэнсаванае імя для гэтай ролі, напрыклад: EC2-KinesisStreams-FullAccess. У выніку, павінна атрымацца тое ж самае, што паказана на малюнку ніжэй:
Пасля стварэння гэтай новай ролі, не забываем прычапіць яе да стваранага інстанса віртуальнай машыны:
Больш на гэтым экране нічога не мяняем і пераходзім да наступных вокнаў.
Параметры жорсткага дыска можна пакінуць па змаўчанні, тэгі таксама (хоць, добрай практыкай з'яўляецца тэгі выкарыстоўваць, хаця б даваць імя інстансу і ўказваць энвайранмент).
Цяпер мы на закладцы Step 6: Configure Security Group, дзе неабходна стварыць новы або паказаць наяўны ў вас Sеcurity group, які дазваляе рабіць канэкт праз ssh (порт 22) на інстанс. Абярыце там Source —> My IP і можаце запускаць інстанс.
Як толькі ён пяройдзе ў статут running, можна спрабаваць законнектіться на яго праз ssh.
Каб атрымаць магчымасць працы з Kinesis Agent, пасля паспяховага канэкту да машыны, неабходна ўвесці наступныя каманды ў тэрмінале:
Як відаць з файла канфігурацыі, агент будзе маніторыць у дырэкторыі /var/log/airline_tickets/ файлы з пашырэннем .log, парсіць іх і перадаваць у струмень airline_tickets.
Перазапускаем сэрвіс і пераконваемся, што ён запусціўся і працуе:
sudo service aws-kinesis-agent restart
Цяпер спампуем Python-скрыпт, які будзе запытваць дадзеныя ў API:
Скрыпт api_caller.py запытвае дадзеныя ў Aviasales і захоўвае атрыманы адказ у дырэкторыі, якую скануе Kinesis agent. Рэалізацыя гэтага скрыпту дастаткова стандартная, ёсць клас TicketsApi, ён дазваляе асінхронна тузаць API. У гэты клас перадаем загаловак з токенам і параметры запыту:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
Для тэставання правільнасці налад і працаздольнасці агента зробім тэставы запуск скрыпту api_caller.py:
sudo ./api_caller.py TOKEN
І глядзім вынік працы ў логах Агента і на закладцы Monitoring у струмені дадзеных airline_tickets:
Як відаць, усё працуе і Kinesis Agent паспяхова адпраўляе дадзеныя ў струмень. Цяпер наладзім consumer.
Настройка Kinesis Data Analytics
Пяройдзем да цэнтральнага кампанента ўсёй сістэмы - створым новае прыкладанне ў Kinesis Data Analytics з імем kinesis_analytics_airlines_app:
Kinesis Data Analytics дазваляе выконваць аналітыку дадзеных у рэальным часе з Kinesis Streams з дапамогай мовы SQL. Гэта цалкам аўтамаштабаваны сэрвіс (у адрозненне ад Kinesis Streams), які:
дазваляе ствараць новыя плыні (Output Stream) на аснове запытаў да зыходных дадзеных;
падае струмень з памылкамі, якія ўзніклі падчас працы прыкладанняў (Error Stream);
умее аўтаматычна вызначаць схему ўваходных дадзеных (яе можна ўручную перавызначыць пры неабходнасці).
Гэта нятанны сэрвіс - 0.11 USD за гадзіну працы, таму карыстацца ім варта акуратна і выдаляць пры завяршэнні працы.
Падключым дадатак да крыніцы дадзеных:
Выбіраемы струмень, да якога збіраемся падлучыцца (airline_tickets):
Далей, неабходна прынадаць новую IAM Роля для таго, каб прыкладанне магло чытаць з патоку і пісаць у паток. Для гэтага дастаткова нічога не мяняць у блоку Access permissions:
Цяпер запытаем выяўленне схемы дадзеных у струмені, для гэтага націскаем на кнопку "Discover schema". У выніку абновіцца (створыцца новая) роля IAM і будзе запушчана выяўленне схемы з дадзеных, якія ўжо прыляцелі ў струмень:
Цяпер неабходна перайсці ў рэдактар SQL. Пры націску на гэтую кнопку, выйдзе акно з пытаннем аб запуску прыкладання - выбіраемы што жадаем запусціць:
У акно рэдактара SQL уставім такі просты запыт і націсканы Save and Run SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
У рэляцыйных базах дадзеных вы працуеце з табліцамі, выкарыстоўваючы аператары INSERT для дадання запісаў і аператар SELECT для запыту даных. У Amazon Kinesis Data Analytics вы працуеце з струменямі (STREAM) і "помпамі" (PUMP) - бесперапыннымі запытамі ўстаўкі, якія ўстаўляюць дадзеныя з аднаго патоку ў дадатку ў іншы паток.
У прадстаўленым вышэй SQL запыце адбываецца пошук квіткоў Аэрафлота па кошце ніжэй пяці тысяч рублёў. Усе запісы, якія трапляюць пад гэтыя ўмовы, будуць змешчаныя ў струмень DESTINATION_SQL_STREAM.
У блоку Destination выбіраемы струмень special_stream, а ў які расчыняецца спісе In-application stream name DESTINATION_SQL_STREAM:
У выніку ўсіх маніпуляцый павінна атрымацца нешта падобнае на карцінку ніжэй:
Стварэнне і падпіска на топік SNS
Пераходзім у сэрвіс Simple Notification Service і ствараем тамака новы топік з імем Airlines:
Афармляем падпіску на гэты топік, у ёй паказваем нумар мабільнага тэлефона, на які будуць прыходзіць СМС-паведамленні:
Стварэнне табліцы ў DynamoDB
Для захоўвання неапрацаваных дадзеных іх струменя airline_tickets, створым табліцу ў DynamoDB з такім жа імем. У якасці першаснага ключа будзем выкарыстоўваць record_id:
Стварэнне лямбда-функцыі collector
Створым лямбда-функцыю пад назовам Collector, задачай якой будзе апытанне струменя airline_tickets і, у выпадку знаходжання тамака новых запісаў, устаўка гэтых запісаў у табліцу DynamoDB. Відавочна, што апроч правоў па змаўчанні, гэтая лямбда павінна мець доступ да чытання струменя дадзеных Kinesis і запісы ў DynamoDB.
Стварэнне IAM ролі для лямбда-функцыі collector
Для пачатку створым новую IAM роля для лямбды з імем Lambda-TicketsProcessingRole:
Для тэставага прыкладу цалкам падыдуць пераднастроеныя полісі AmazonKinesisReadOnlyAccess і AmazonDynamoDBFullAccess, як паказана на малюнку ніжэй:
Дадзеная лямбда павінна запускацца па трыгеру ад Kinesis пры трапленні новых запісаў у струмень airline_stream, таму трэба дадаць новы трыгер:
Засталося ўставіць код і захаваць лямбду.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Стварэнне лямбда-функцыі notifier
Другая лямбда-функцыя, якая будзе маніторыць другі струмень (special_stream) і адпраўляць апавяшчэнне ў SNS, ствараецца аналагічна. Такім чынам, гэтая лямбда павінна мець доступ на чытанне з Kinesis і адпраўку паведамленняў у зададзены SNS-топік, які далей сервісам SNS будзе адпраўлены ўсім падпісчыкам гэтага топіка (email, SMS і г.д).
Стварэнне IAM ролі
Спачатку ствараем IAM роля Lambda-KinesisAlarm для гэтай лямбды, а потым прызначаем гэтую ролю для стваранай лямбды alarm_notifier:
Гэтая лямбда павінна працаваць па трыгеры на трапленне новых запісаў у струмень special_stream, таму неабходна наладзіць трыгер аналагічна таму, як мы гэта рабілі для лямбды Collector.
Для выгоды налады гэтай лямбды, увядзем новую зменную асяроддзі - TOPIC_ARN, куды змяшчаем ANR (Amazon Recourse Names) топіка Airlines:
І ўстаўляемы код лямбды, ён зусім нескладаны:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Здаецца, на гэтым ручная настройка сістэмы завершана. Застаецца толькі пратэставаць і пераканацца ў тым, што мы настроілі ўсё правільна.
Дэплой з кода Terraform
Неабходная падрыхтоўка
Terraform - Вельмі зручны open-source інструмент для разгортвання інфраструктуры з кода. У яго свой сінтаксіс, які лёгка асвоіць і мноства прыкладаў, як і што разгарнуць. У рэдактары Atom або Visual Studio Code шмат зручных убудоў, якія дазваляюць палегчыць працу з Terraform.
Дыстрыбутыў спампаваць можна адсюль. Падрабязны разбор усіх магчымасцяў Terraform выходзіць за рамкі дадзенага артыкула, таму абмяжуемся асноўнымі момантамі.
як запусціць
Поўны код праекта ляжыць у маім рэпазітары. Клануем да сябе рэпазітар. Перад запускам неабходна пераканацца, што ў вас усталяваны і наладжаны AWS CLI, т.я. Terraform будзе шукаць уліковыя дадзеныя ў файле ~/.aws/credentials.
Добрай практыкай з'яўляецца перад дэплоем усёй інфраструктуры, запускаць каманду plan, каб паглядзець, што Terraform нам зараз стварае ў воблаку:
terraform.exe plan
Будзе прапанавана ўвесці нумар тэлефона для адпраўкі на яго апавяшчэнняў. На гэтым этапе яго ўводзіць неабавязкова.
Прааналізаваўшы план працы праграмы, можам запускаць стварэнне рэсурсаў:
terraform.exe apply
Пасля адпраўкі гэтай каманды зноў з'явіцца запыт на ўвядзенне нумара тэлефона, набіраем "yes", калі будзе паказана пытанне аб рэальным выкананні дзеянняў. Гэта дазволіць падняць усю інфраструктуру, правесці ўсю неабходную настройку EC2, разгарнуць лямбда-функцыі і г.д.
Пасля таго, як усе рэсурсы будуць паспяхова створаны праз код Terraform, неабходна зайсці ў дэталі дадатку Kinesis Analytics (на жаль, я не знайшоў як гэта зрабіць адразу з кода).
Запускаем дадатак:
Пасля гэтага неабходна відавочна задаць in-application stream name, абраўшы з які расчыняецца спісу:
Цяпер усё гатова да працы.
Тэставанне працы прыкладання
Незалежна, як вы дэплоілі сістэму, уручную ці праз код Terraform, працаваць яна будзе аднолькава.
Заходзім па SSH на віртуальную машыну EC2, дзе ўсталяваны Kinesis Agent і запускаем скрыпт api_caller.py
sudo ./api_caller.py TOKEN
Засталося дачакацца SMS на ваш нумар:
SMS - паведамленне прыходзіць на тэлефон практычна праз 1 хвіліну:
Засталося паглядзець, ці захаваліся запісы ў базе дадзеных DynamoDB для наступнага, больш дэталёвага аналізу. Табліца airline_tickets змяшчае прыкладна такія дадзеныя:
Заключэнне
Падчас праведзенай працы была пабудавана сістэма анлайн-апрацоўкі дадзеных на базе Amazon Kinesis. Былі разгледжаны варыянты выкарыстання Kinesis Agent у звязку з Kinesis Data Streams і рэал-тайм аналітыкай Kinesis Analytics пры дапамозе SQL каманд, а таксама ўзаемадзеянне Amazon Kinesis з іншымі сэрвісамі AWS.
Вышэйапісаную сістэму мы разгарнулі двума спосабамі: досыць доўгім ручным і хуткім з кода Terraform.