Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк

Эй Хабр!

Сиз учкан учактарды жакшы көрөсүзбү? Мага аны жакшы көрөм, бирок өзүнчө обочолонуу учурунда мен бир белгилүү ресурстан - Aviasalesтен авиабилеттер боюнча маалыматтарды талдоону сүйүп калдым.

Бүгүн биз Amazon Kinesis ишин талдайбыз, реалдуу убакыт аналитикасы менен агымдык системаны курабыз, Amazon DynamoDB NoSQL маалымат базасын негизги маалымат сактагыч катары орнотобуз жана кызыктуу билеттер үчүн SMS эскертмелерин орнотобуз.

Бардык майда-чүйдөсүнө чейин кесип астында! Go!

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк

тааныштыруу

Мисалы, бизге кирүү мүмкүнчүлүгү керек Aviasales API. Ага кирүү акысыз жана чектөөсүз берилет; маалыматтарга жетүү үчүн API белгисин алуу үчүн "Иштеп чыгуучулар" бөлүмүндө катталуу керек.

Бул макаланын негизги максаты - AWSде маалымат агымын колдонуу жөнүндө жалпы түшүнүк берүү; биз колдонулган API тарабынан кайтарылган маалыматтар такыр жаңыртылган эмес жана кэштен берилээрин эске алабыз. Aviasales.ru жана Jetradar.com сайттарынын колдонуучуларынын акыркы 48 саат ичиндеги издөөлөрүнүн негизинде түзүлгөн.

API аркылуу кабыл алынган, өндүрүүчү машинага орнотулган Kinesis-агент Kinesis Data Analytics аркылуу маалыматтарды автоматтык түрдө талдап, керектүү агымга өткөрүп берет. Бул агымдын чийки версиясы түз дүкөнгө жазылат. DynamoDBде орнотулган чийки маалымат сактагычы AWS Quick Sight сыяктуу BI куралдары аркылуу билеттерди тереңирээк талдоо мүмкүнчүлүгүн берет.

Биз бардык инфраструктураны жайылтуу үчүн эки вариантты карап чыгабыз:

  • Кол менен - ​​AWS башкаруу консолу аркылуу;
  • Terraform кодунун инфраструктурасы жалкоо автоматтар үчүн;

Иштеп чыккан системанын архитектурасы

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Колдонулган компоненттер:

  • Aviasales API — бул API тарабынан кайтарылган маалыматтар бардык кийинки иштер үчүн колдонулат;
  • EC2 Продюсер инстанциясы — киргизүү маалымат агымы түзүлө турган булуттагы кадимки виртуалдык машина:
    • Kinesis Agent бул Kinesis (Kinesis Data Streams же Kinesis Firehose) кызматына маалыматтарды чогултуунун жана жөнөтүүнүн оңой жолун камсыз кылган машинада жергиликтүү түрдө орнотулган Java колдонмосу. Агент ар дайым көрсөтүлгөн каталогдордогу файлдардын топтомун көзөмөлдөйт жана Kinesisге жаңы маалыматтарды жөнөтөт;
    • API Caller Script — API'ге суроо-талаптарды жасаган жана жоопту Kinesis агенти көзөмөлдөгөн папкага салган Python скрипти;
  • Kinesis Data Streams — кеңири масштабдоо мүмкүнчүлүктөрү менен реалдуу убакыт режиминдеги маалыматтарды агымдык тейлөө;
  • Kinesis Analytics реалдуу убакытта агымдык маалыматтарды талдоону жөнөкөйлөтүүчү серверсиз кызмат. Amazon Kinesis Data Analytics колдонмо ресурстарын конфигурациялайт жана келген маалыматтардын каалаган көлөмүн иштетүү үчүн автоматтык түрдө масштабдашат;
  • AWS Lambda — серверлердин резервдик көчүрмөсүн сактабастан же орнотуусуз кодду иштетүүгө мүмкүндүк берүүчү кызмат. Бардык эсептөө күчү автоматтык түрдө ар бир чалуу үчүн масштабдалат;
  • Amazon DynamoDB - Каалаган масштабда иштегенде 10 миллисекунддан аз күтүү мөөнөтүн камсыз кылган ачкыч-маани түгөйлөрүнүн жана документтердин маалымат базасы. DynamoDB колдонууда сизге серверлерди камсыздоонун, оңдоонун же башкаруунун кереги жок. DynamoDB колдо болгон ресурстардын көлөмүн тууралоо жана жогорку өндүрүмдүүлүктү сактоо үчүн таблицаларды автоматтык түрдө таразалайт. Системалык башкаруу талап кылынбайт;
  • Amazon SNS - жарыялоочу-абоненттик (Pub/Sub) моделин колдонуу менен билдирүүлөрдү жөнөтүү үчүн толук башкарылуучу кызмат, анын жардамы менен сиз микросервистерди, бөлүштүрүлгөн системаларды жана серверсиз тиркемелерди изоляциялай аласыз. SNS мобилдик push эскертмелери, SMS билдирүүлөр жана электрондук почта аркылуу акыркы колдонуучуларга маалымат жөнөтүү үчүн колдонулушу мүмкүн.

Баштапкы тренинг

Берилиштер агымын туураш үчүн, мен Aviasales API тарабынан кайтарылган авиабилеттер маалыматын колдонууну чечтим. IN документтер ар кандай ыкмалардын кеңири тизмеси, алардын бирин алып көрөлү - которуулардын саны боюнча топтоштурулган айдын ар бир күнү үчүн бааларды кайтарып турган "Айлык баа календары". Сурамда издөө айын көрсөтпөсөңүз, маалымат учурдагыдан кийинки ай үчүн кайтарылат.

Андыктан, келгиле, катталып, токенибизди алалы.

Мисал суроо төмөндө:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Сурамда токенди көрсөтүү менен APIден маалыматтарды алуунун жогорудагы ыкмасы иштейт, бирок мен кирүү белгисин баш аты аркылуу өткөрүүнү туура көрөм, ошондуктан биз бул ыкманы api_caller.py скриптинде колдонобуз.

Жооптун мисалы:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Жогорудагы мисал API жообу Санкт-Петербургдан Пхукка билетти көрсөтөт ... О, кандай түш ...
Мен Казандан болгондуктан, Пхукет азыр "жөн гана кыял" болгондуктан, Санкт-Петербургдан Казанга билеттерди издейли.

Бул сизде AWS каттоо эсебиңиз бар деп болжолдойт. Мен дароо өзгөчө көңүл бургум келет, Kinesis жана SMS аркылуу билдирүүлөрдү жөнөтүү жылдык эсепке кирбейт. Акысыз деңгээл (акысыз колдонуу). Бирок буга карабастан, бир-эки долларды эске алуу менен, сунушталган системаны куруп, аны менен ойноого толук мүмкүн. Жана, албетте, керексиз болуп калгандан кийин бардык ресурстарды жок кылууну унутпаңыз.

Бактыга жараша, DynamoDb жана lambda функциялары биз үчүн акысыз болот, эгерде биз айлык акысыз чектөөлөрүбүздү аткарсак. Мисалы, DynamoDB үчүн: 25 ГБ сактагыч, 25 WCU/RCU жана 100 миллион суроо. Ал эми айына миллион ламбда функциясы чалуу.

Кол менен системаны жайылтуу

Kinesis маалымат агымдарын орнотуу

Келгиле, Kinesis Data Streams кызматына барып, эки жаңы агым түзөлү, ар бири үчүн бирден сынык.

сынык деген эмне?
Сынык Amazon Kinesis агымынын негизги маалымат берүү бирдиги болуп саналат. Бир сегмент 1 МБ/сек ылдамдыкта кириш маалыматтарды берүүнү жана 2 МБ/сек ылдамдыкта маалыматтарды берүүнү камсыз кылат. Бир сегмент секундасына 1000 PUT жазуусун колдойт. Маалымат агымын түзүп жатканда, сегменттердин керектүү санын көрсөтүү керек. Мисалы, сиз эки сегменттен турган маалымат агымын түзө аласыз. Бул маалымат агымы секундасына 2 PUT жазуусун колдогон 4 МБ/сек ылдамдыкта кириш маалыматын жана 2000 МБ/сек ылдамдыкта маалымат берүүнү камсыз кылат.

Агымыңыздагы сыныктар канчалык көп болсо, анын өткөрүү жөндөмдүүлүгү ошончолук чоң болот. Негизи, агымдардын масштабы ушундай - сыныктарды кошуу менен. Бирок сыныктар канчалык көп болсо, баасы ошончолук жогору болот. Ар бир сынык саатына 1,5 цент жана ар бир миллион PUT пайдалуу жүк бирдиги үчүн кошумча 1.4 цент турат.

аттуу жаңы агым түзөлү учак билеттери, 1 сынык ага жетиштүү болот:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Эми аты менен дагы бир жип түзөлү өзгөчө_агым:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк

Продюсер орнотуу

Тапшырманы талдоо үчүн, маалымат өндүрүүчүсү катары кадимки EC2 инстанциясын колдонуу жетиштүү. Бул күчтүү, кымбат виртуалдык машина болбошу керек; t2.micro спот жакшы иштейт.

Маанилүү эскертүү: мисалы, сиз сүрөттү колдонушуңуз керек - Amazon Linux AMI 2018.03.0, анын Kinesis агентин тез ишке киргизүү үчүн жөндөөлөрү азыраак.

EC2 кызматына өтүңүз, жаңы виртуалдык машинаны түзүңүз, Акысыз деңгээлге кирген t2.micro түрү менен каалаган AMIди тандаңыз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Жаңы түзүлгөн виртуалдык машина Kinesis кызматы менен иштеше алышы үчүн, ага укук берилиши керек. Муну жасоонун эң жакшы жолу - IAM ролун дайындоо. Ошондуктан, 3-кадам: Конфигурациялоо Instance Details экранында, сиз тандоо керек Жаңы IAM ролун түзүү:

EC2 үчүн IAM ролун түзүү
Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Ачылган терезеде EC2 үчүн жаңы ролду түзүп жатканыбызды тандап, Уруксаттар бөлүмүнө өтүңүз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Тренингдин мисалын колдонуу менен биз ресурстук укуктардын майдаланган конфигурациясынын бардык татаалдыктарына кирүүнүн кереги жок, ошондуктан биз Amazon тарабынан алдын ала конфигурацияланган саясаттарды тандайбыз: AmazonKinesisFullAccess жана CloudWatchFullAccess.

Келгиле, бул ролго кандайдыр бир мааниге ээ ат берели, мисалы: EC2-KinesisStreams-FullAccess. Натыйжа төмөнкү сүрөттө көрсөтүлгөндөй болушу керек:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Бул жаңы ролду жараткандан кийин, аны түзүлгөн виртуалдык машина инстанциясына тиркөөнү унутпаңыз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Биз бул экранда башка эч нерсени өзгөртпөйбүз жана кийинки терезелерге өтөбүз.

Катуу дисктин жөндөөлөрү демейки катары калтырылышы мүмкүн, ошондой эле тегдер (тегдерди колдонуу жакшы практика болсо да, жок дегенде инстанцияга ат берип, чөйрөнү көрсөтөт).

Азыр биз 6-кадам: Коопсуздук тобун конфигурациялоо өтмөгүндө турабыз, анда жаңысын түзүшүңүз керек же ssh (порт 22) аркылуу инстанцияга туташууга мүмкүндүк берген учурдагы Коопсуздук тобуңузду көрсөтүшүңүз керек. Булак -> Менин IP дегенди тандаңыз жана сиз инстанцияны иштете аласыз.

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Ал иштеп жаткан абалга өтөөр замат, ага ssh аркылуу туташууга аракет кылсаңыз болот.

Kinesis Agent менен иштөө үчүн, машинага ийгиликтүү туташкандан кийин, терминалга төмөнкү буйруктарды киргизишиңиз керек:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

API жоопторун сактоо үчүн папканы түзөлү:

sudo mkdir /var/log/airline_tickets

Агентти баштоодон мурун анын конфигурациясын конфигурациялашыңыз керек:

sudo vim /etc/aws-kinesis/agent.json

agent.json файлынын мазмуну төмөнкүдөй болушу керек:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Конфигурация файлынан көрүнүп тургандай, агент /var/log/airline_tickets/ каталогундагы .log кеңейтүүсү бар файлдарды көзөмөлдөйт, аларды талдап, airline_tickets агымына өткөрүп берет.

Кызматты кайра иштетип, анын иштеп жатканын текшеребиз:

sudo service aws-kinesis-agent restart

Эми APIден маалыматтарды талап кыла турган Python скриптин жүктөп алалы:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py скрипти Aviasalesтен берилиштерди сурайт жана алынган жоопту Kinesis агенти сканерлеген каталогдо сактайт. Бул скрипттин ишке ашырылышы абдан стандарттуу, TicketsApi классы бар, ал APIди асинхрондук түрдө тартууга мүмкүндүк берет. Бул класска токен жана сурам параметрлери менен башты өткөрүп беребиз:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Агенттин туура орнотууларын жана иштешин текшерүү үчүн api_caller.py скриптин иштетип көрөлү:

sudo ./api_caller.py TOKEN

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Жана биз иштин жыйынтыгын Агенттин журналдарынан жана airline_tickets маалымат агымындагы Мониторинг өтмөгүнөн карайбыз:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Көрүнүп тургандай, баары иштейт жана Kinesis Agent агымга маалыматтарды ийгиликтүү жөнөтөт. Эми керектөөчүнү конфигурациялайлы.

Kinesis Data Analytics орнотуу

Келгиле, бүт системанын борбордук компонентине өтөбүз - Kinesis Data Analyticsте kinesis_analytics_airlines_app аттуу жаңы тиркемени түзүңүз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Kinesis Data Analytics сизге SQL тилин колдонуу менен Kinesis Streams'тен реалдуу убакыт режиминде маалымат аналитикасын жүргүзүүгө мүмкүндүк берет. Бул толугу менен автоскалдаштыруу кызматы (Kinesis Streamsтен айырмаланып):

  1. булак маалыматтарына суроо-талаптардын негизинде жаңы агымдарды (Output Stream) түзүүгө мүмкүндүк берет;
  2. колдонмолор иштеп жатканда пайда болгон каталар менен агымды камсыз кылат (Error Stream);
  3. киргизүү маалымат схемасын автоматтык түрдө аныктай алат (зарыл болсо кол менен кайра аныкталышы мүмкүн).

Бул арзан кызмат эмес - жумуш саатына 0.11 АКШ доллары, андыктан аны кылдаттык менен колдонуп, бүткөндөн кийин өчүрүү керек.

Колдонмону маалымат булагына туташтыралы:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Биз кошула турган агымды тандаңыз (авиакомпания_билеттери):

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Андан кийин, колдонмо агымдан окуп, агымга жаза алышы үчүн жаңы IAM ролун тиркөөңүз керек. Бул үчүн, Access уруксаттар блогунда эч нерсени өзгөртпөө жетиштүү:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Эми агымдагы маалымат схемасын ачууну сураналы; бул үчүн, "Схеманы табуу" баскычын чыкылдатыңыз. Натыйжада, IAM ролу жаңыртылат (жаңысы түзүлөт) жана агымга келген маалыматтардан схемаларды аныктоо ишке киргизилет:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Эми сиз SQL редакторуна барышыңыз керек. Бул баскычты басканда, сизден колдонмону ишке киргизүүнү суранган терезе пайда болот - эмнени ишке киргизгиңиз келгенин тандаңыз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
SQL редакторунун терезесине төмөнкү жөнөкөй суроону киргизиңиз жана SQLди сактоо жана иштетүү баскычын чыкылдатыңыз:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Реляциялык маалымат базаларында сиз жазууларды кошуу үчүн INSERT операторлорун жана маалыматтарды суроо үчүн SELECT операторун колдонуп таблицалар менен иштейсиз. Amazon Kinesis Data Analytics'те сиз агымдар (STREAMs) жана насостор (PUMPs) менен иштейсиз — тиркемедеги бир агымдагы маалыматтарды башка агымга киргизген үзгүлтүксүз киргизүү сурамдары.

Жогоруда берилген SQL сурамы беш миң рублдан төмөн баадагы Aeroflot билеттерин издейт. Бул шарттарга жооп берген бардык жазуулар DESTINATION_SQL_STREAM агымына жайгаштырылат.

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Destination блогунда special_stream агымын жана In-application stream name DESTINATION_SQL_STREAM ылдый түшүүчү тизмеден тандаңыз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Бардык манипуляциялардын натыйжасы төмөндөгү сүрөттөгүдөй болушу керек:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк

SNS темасын түзүү жана ага жазылуу

Жөнөкөй кабарлоо кызматына өтүңүз жана ал жерде авиакомпаниялар деген жаңы тема түзүңүз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Бул темага жазылыңыз жана SMS билдирүүлөр жөнөтүлө турган мобилдик телефондун номерин көрсөтүңүз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк

DynamoDBде таблица түзүңүз

Алардын airline_tickets агымынан чийки маалыматтарды сактоо үчүн, келгиле, DynamoDBде ошол эле ат менен таблица түзөлү. Биз негизги ачкыч катары record_id колдонобуз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк

Ламбда функциясынын коллекторун түзүү

Келгиле, Collector деп аталган lambda функциясын түзөлү, анын милдети авиакомпания_tickets агымын сурамжылоо жана ал жерден жаңы жазуулар табылса, бул жазууларды DynamoDB таблицасына салыңыз. Албетте, демейки укуктардан тышкары, бул ламбда Kinesis маалымат агымын окуу мүмкүнчүлүгүнө жана DynamoDBке жазуу мүмкүнчүлүгүнө ээ болушу керек.

Коллектор ламбда функциясы үчүн IAM ролун түзүү
Биринчиден, келгиле, Lambda-TicketsProcessingRole аттуу ламбда үчүн жаңы IAM ролун түзөлү:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Сыноо мисалы үчүн, алдын ала конфигурацияланган AmazonKinesisReadOnlyAccess жана AmazonDynamoDBFullAccess саясаттары төмөндөгү сүрөттө көрсөтүлгөндөй, абдан ылайыктуу:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк

Бул ламбда Kinesis триггери тарабынан airline_streamге жаңы жазуулар киргенде ишке киргизилиши керек, ошондуктан биз жаңы триггерди кошушубуз керек:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Кодду киргизүү жана ламбданы сактоо гана калды.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda функциясынын кабарлоочусун түзүү

Экинчи агымга (special_stream) көз салып, SNSге эскертме жөнөтө турган экинчи ламбда функциясы да ушундай жол менен түзүлгөн. Ошондуктан, бул ламбда Kinesisден окууга жана берилген SNS темасына билдирүүлөрдү жөнөтүүгө мүмкүнчүлүгү болушу керек, андан кийин бул теманын бардык абоненттерине SNS кызматы тарабынан жөнөтүлөт (электрондук почта, SMS ж.б.).

IAM ролун түзүү
Биринчиден, биз бул ламбда үчүн IAM ролун Lambda-KinesisAlarm түзүп, андан кийин бул ролду түзүлүп жаткан alarm_notifier lambdaга дайындайбыз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк

Бул ламбда special_streamге кирүү үчүн жаңы жазуулар үчүн триггерде иштеши керек, андыктан сиз триггерди Collector lambda үчүн кылгандай конфигурациялашыңыз керек.

Бул ламбданы конфигурациялоону жеңилдетүү үчүн, келгиле, жаңы чөйрө өзгөрмөсүн киргизели - TOPIC_ARN, анда биз Авиакомпаниялар темасынын ANR (Amazon Recourse Names) жайгаштырабыз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Жана ламбда кодун киргизиңиз, бул такыр татаал эмес:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Бул жерде кол менен система конфигурациясы аяктады окшойт. Баарын туура конфигурациялаганыбызды текшерип көрүү гана калды.

Terraform кодунан жайгаштыруу

Керектүү даярдык

Terraform коддон инфраструктураны жайылтуу үчүн абдан ыңгайлуу ачык булак куралы. Анын өз синтаксиси бар, аны үйрөнүү оңой жана кантип жана эмнени колдонуу керектиги боюнча көптөгөн мисалдар бар. Atom редактору же Visual Studio Code Terraform менен иштөөнү жеңилдеткен көптөгөн ыңгайлуу плагиндерге ээ.

Сиз бөлүштүрүүнү жүктөп алсаңыз болот бул жерде. Терраформдун бардык мүмкүнчүлүктөрүн деталдуу талдоо бул макаланын алкагына кирбейт, ошондуктан биз негизги пункттар менен чектелебиз.

Кантип баштоо керек

Долбоордун толук коду болуп саналат менин репозиторийде. Биз репозиторийди өзүбүзгө клондойбуз. Баштоодон мурун, сизде AWS CLI орнотулганын жана конфигурацияланганын текшеришиңиз керек, анткени... Terraform ~/.aws/credentials файлынан эсептик дайындарды издейт.

Жакшы практика булутта Terraform биз үчүн эмне түзүп жатканын көрүү үчүн бүт инфраструктураны жайылтуудан мурун план буйругун иштетүү:

terraform.exe plan

Сизден эскертмелерди жөнөтүү үчүн телефон номерин киргизүү сунушталат. Бул этапта аны киргизүү зарыл эмес.

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Программанын иш планын талдап чыгып, биз ресурстарды түзө баштасак болот:

terraform.exe apply

Бул буйрукту жөнөткөндөн кийин, сизден кайрадан телефон номерин киргизүү талап кылынат; иш-аракеттерди иш жүзүндө аткаруу жөнүндө суроо көрсөтүлгөндө "ооба" деп териңиз. Бул сизге бүт инфраструктураны орнотууга, EC2 бардык керектүү конфигурацияларын жүргүзүүгө, ламбда функцияларын жайылтууга ж.б.

Бардык ресурстар Terraform коду аркылуу ийгиликтүү түзүлгөндөн кийин, сиз Kinesis Analytics тиркемесинин деталдарына киришиңиз керек (тилекке каршы, мен муну түздөн-түз коддон кантип жасоону таба алган жокмун).

Колдонмону ишке киргизиңиз:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Андан кийин, сиз ылдый түшүүчү тизмеден тандоо менен колдонмодогу агымдын атын так коюшуңуз керек:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Азыр баары барууга даяр.

Колдонмо сыналууда

Системаны кол менен же Terraform коду аркылуу кантип орнотконуңузга карабастан, ал бирдей иштейт.

Биз SSH аркылуу Kinesis Agent орнотулган EC2 виртуалдык машинасына кирип, api_caller.py скриптин иштетебиз

sudo ./api_caller.py TOKEN

Болгону номериңизге SMS келишин күтсөңүз болот:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
SMS - билдирүү телефонго дээрлик 1 мүнөттө келет:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк
Кийинки, деталдуу талдоо үчүн жазуулар DynamoDB маалымат базасында сакталганбы же жокпу, көрүш керек. Airline_tickets таблицасы болжол менен төмөнкү маалыматтарды камтыйт:

Amazon Kinesis менен Aviasales API интеграциясы жана серверсиз жөнөкөйлүк

жыйынтыктоо

Аткарылган иштин жүрүшүндө Amazon Kinesis негизинде маалыматтарды онлайн иштетүү системасы курулган. Kinesis агентин Kinesis Data Streams жана SQL буйруктарын колдонуу менен реалдуу убакыт режиминдеги Kinesis Analytics аналитикасы менен бирге колдонуу параметрлери, ошондой эле Amazon Kinesisтин башка AWS кызматтары менен өз ара аракеттенүүсү каралды.

Биз жогорудагы системаны эки жол менен орноттук: бир кыйла узун кол менен жана Terraform кодунан тез.

Бардык долбоордун баштапкы коду жеткиликтүү менин GitHub репозиторийимде, Мен сизге аны менен таанышууну сунуштайм.

Мен макаланы талкуулоого кубанычтамын, комментарийлериңизди чыдамсыздык менен күтөм. Мен конструктивдүү сынга үмүттөнөм.

Ийгилик каалайм!

Source: www.habr.com

Комментарий кошуу