Integrating the Aviasales API with Amazon Kinesis and the simplicity of serverless

Hi Habr!

Do you like flying? I do, but during self-isolation I also fell in love with analyzing airline ticket data from a well-known source: Aviasales.

Today we'll look at how Amazon Kinesis works. We'll build a streaming system with real-time analytics, use the Amazon DynamoDB NoSQL database as the main data store, and set up SMS notifications for interesting tickets.

All the details are under the cut! Let's go!


Introduction

For this example, we need access to the Aviasales API. Access is free and unrestricted. You only need to register in the "Developers" section to receive an API token for accessing the data.

The main purpose of this article is to give a general understanding of working with streaming data in AWS. Keep in mind that the data returned by this API is not real-time. It is served from a cache built from searches made by users of Aviasales.ru and Jetradar.com over the last 48 hours.

The Kinesis Agent, installed on the producer machine, will automatically parse the data received from the API and send it to the target stream, which is then analyzed by Kinesis Data Analytics. The raw version of this stream will be written directly to the data store. Storing the raw data in DynamoDB allows deeper ticket analysis with BI tools such as AWS QuickSight.

We will consider two options for deploying the whole infrastructure:

  • Manually, via the AWS Management Console;
  • From Terraform code, for lazy automators.

Architecture of the system under development

Components used:

  • Aviasales API: the data returned by this API will be used for all subsequent work;
  • EC2 Producer Instance: an ordinary virtual machine in the cloud that generates the input data stream:
    • Kinesis Agent is a Java application installed locally on the machine. It provides an easy way to collect data and send it to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script: a Python script that makes requests to the API and puts the responses into a folder monitored by the Kinesis Agent;
  • Kinesis Data Streams: a real-time data streaming service with broad scaling capabilities;
  • Kinesis Analytics is a serverless service that simplifies real-time analysis of streaming data. Amazon Kinesis Data Analytics provisions application resources and scales automatically to handle any volume of incoming data;
  • AWS Lambda: a service that lets you run code without provisioning or managing servers. Computing capacity is scaled automatically for each invocation;
  • Amazon DynamoDB: a key-value and document database that delivers latency below 10 milliseconds at any scale. With DynamoDB, you don't need to provision, patch, or manage any servers. DynamoDB automatically scales tables to adjust the available capacity and maintain high performance. No system administration is required;
  • Amazon SNS: a fully managed messaging service based on the publisher-subscriber (Pub/Sub) model. It lets you decouple microservices, distributed systems, and serverless applications. SNS can deliver information to end users through mobile push notifications, SMS messages, and email.

Initial preparation

To emulate a data stream, I decided to use the airline ticket information returned by the Aviasales API. The documentation lists quite a few methods. Let's take one of them, "Monthly Price Calendar", which returns prices for each day of a month, grouped by the number of transfers. If you don't specify the search month in the request, data is returned for the month following the current one.

So let's register and get our token.

Here is an example request:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Passing the token in the request URL, as above, will work. However, I prefer to pass the access token in a request header, so that is the approach used in the api_caller.py script.
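
For comparison with the URL above, here is a minimal standard-library sketch of the header-based approach. It assumes the same month-matrix endpoint and parameters as the example request. It is not the article's api_caller.py, which uses aiohttp, and build_request is a hypothetical helper name.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Endpoint taken from the example request above.
BASE_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_request(api_token, origin="LED", destination="HKT", currency="rub"):
    """Build a GET request that carries the token in a header, not in the URL."""
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    # X-Access-Token is the same header name that api_caller.py uses.
    return Request(f"{BASE_URL}?{query}",
                   headers={"X-Access-Token": api_token},
                   method="GET")


request = build_request("TOKEN_API")
print(request.full_url)
```

Passing the result to urllib.request.urlopen() would perform the call. Keeping the token out of the URL also keeps it out of any logs that record full query strings.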

Example response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

The API response in the example above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I'm from Kazan, and Phuket is now "just a dream", let's look for tickets from St. Petersburg to Kazan instead.
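
Whichever route you choose, the response has the same shape. To make that structure concrete, here is a small sketch that reads a response like the one above and lists tickets cheapest first. It assumes, as the example suggests, that value holds the price in the requested currency. summarize is a hypothetical helper.

```python
import json

# The example response shown above, trimmed to the fields used here.
EXAMPLE_RESPONSE = """
{"success": true,
 "data": [{"origin": "LED", "destination": "HKT",
           "depart_date": "2015-10-01", "number_of_changes": 1,
           "value": 29127, "actual": true}]}
"""


def summarize(raw):
    """Return (origin, destination, depart_date, price) tuples, cheapest first."""
    payload = json.loads(raw)
    if not payload.get("success"):
        return []
    rows = [(t["origin"], t["destination"], t["depart_date"], t["value"])
            for t in payload["data"]]
    # Sort by price (the fourth element of each tuple).
    return sorted(rows, key=lambda row: row[3])


for row in summarize(EXAMPLE_RESPONSE):
    print(row)  # → ('LED', 'HKT', '2015-10-01', 29127)
```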

This assumes you already have an AWS account. Note that Kinesis and SMS notifications are not included in the annual Free Tier. Even so, with a couple of dollars to spare, you can build the proposed system and experiment with it. And, of course, don't forget to delete all resources once you no longer need them.

Fortunately, DynamoDB and Lambda will be free for us as long as we stay within the monthly free limits. For DynamoDB, these are 25 GB of storage, 25 WCU/RCU, and 100 million requests. For Lambda, the limit is one million function invocations per month.

Manual deployment of the system

Setting up Kinesis Data Streams

Let's go to the Kinesis Data Streams service and create two new streams with one shard each.
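
If you prefer to script this step rather than click through the console, a minimal boto3 sketch could look like the following. It assumes boto3 is installed and AWS credentials are configured. It uses the two stream names from this section. stream_specs and create_streams are hypothetical helpers; create_stream and the stream_exists waiter belong to the boto3 Kinesis client.

```python
def stream_specs(names=("airline_tickets", "special_stream"), shard_count=1):
    """Keyword arguments for Kinesis CreateStream, one dict per stream."""
    return [{"StreamName": name, "ShardCount": shard_count} for name in names]


def create_streams(kinesis_client, specs):
    """Create each stream, then block until all of them exist."""
    for spec in specs:
        kinesis_client.create_stream(**spec)
    waiter = kinesis_client.get_waiter("stream_exists")
    for spec in specs:
        waiter.wait(StreamName=spec["StreamName"])
```

For example: create_streams(boto3.client("kinesis"), stream_specs()).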

What is a shard?
A shard is the base throughput unit of an Amazon Kinesis stream. One shard provides data input at 1 MB/s and data output at 2 MB/s, and supports up to 1000 PUT records per second. When creating a data stream, you specify the required number of shards. For example, a data stream with two shards provides data input at 2 MB/s and data output at 4 MB/s, and supports up to 2000 PUT records per second.
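
Because shard limits add up linearly, they are easy to turn into a sizing helper. This sketch simply encodes the per-shard limits quoted above. The function names are illustrative.

```python
import math
from dataclasses import dataclass

# Per-shard limits quoted above.
WRITE_MB_PER_SEC = 1
READ_MB_PER_SEC = 2
PUT_RECORDS_PER_SEC = 1000


@dataclass(frozen=True)
class StreamCapacity:
    write_mb_per_sec: int
    read_mb_per_sec: int
    put_records_per_sec: int


def capacity(shards):
    """Aggregate limits of a stream: every per-shard limit scales linearly."""
    return StreamCapacity(shards * WRITE_MB_PER_SEC,
                          shards * READ_MB_PER_SEC,
                          shards * PUT_RECORDS_PER_SEC)


def shards_needed(write_mb_per_sec, put_records_per_sec):
    """Smallest shard count that covers both the byte rate and the record rate."""
    return max(1,
               math.ceil(write_mb_per_sec / WRITE_MB_PER_SEC),
               math.ceil(put_records_per_sec / PUT_RECORDS_PER_SEC))


print(capacity(2))  # → StreamCapacity(write_mb_per_sec=2, read_mb_per_sec=4, put_records_per_sec=2000)
```

Note that the binding limit can be either the data rate or the record rate. Many tiny records can require more shards than their total size alone would suggest.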

The more shards your stream has, the higher its throughput. In principle, this is how streams are scaled: by adding shards. But the more shards you have, the higher the cost. Each shard costs 1.5 cents per hour, plus 1.4 cents for every million PUT payload units.
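
The same pricing can be turned into a rough monthly estimate. This sketch uses the prices quoted above, which may have changed since, and assumes a 30-day month of 720 hours. It counts PUT payload units, where each unit covers up to 25 KB of a record. Decimal avoids floating-point rounding in money values.

```python
from decimal import Decimal

# Prices as quoted in this article; check current AWS pricing before relying on them.
SHARD_HOUR_USD = Decimal("0.015")         # 1.5 cents per shard-hour
PUT_UNITS_MILLION_USD = Decimal("0.014")  # 1.4 cents per million PUT payload units
HOURS_PER_MONTH = 720                     # 30 days


def monthly_cost(shards, put_units, hours=HOURS_PER_MONTH):
    """Rough Kinesis Data Streams cost in USD, rounded to cents."""
    shard_cost = shards * hours * SHARD_HOUR_USD
    put_cost = Decimal(put_units) / 1_000_000 * PUT_UNITS_MILLION_USD
    return (shard_cost + put_cost).quantize(Decimal("0.01"))


print(monthly_cost(2, 1_000_000))  # → 21.61
```

With the two single-shard streams from this article, shard-hours alone come to about 21.60 USD a month. That is why you should delete the streams after experimenting.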

Let's create a new stream named airline_tickets. One shard will be enough:

Now let's create another stream named special_stream:


Producer setup

For this task, an ordinary EC2 instance is enough to act as the data producer. It doesn't need to be a powerful, expensive virtual machine. A t2.micro spot instance will do just fine.

Important note: for this example, use the Amazon Linux AMI 2018.03.0 image. It requires fewer settings to get the Kinesis Agent running quickly.

Go to the EC2 service and create a new virtual machine. Select the desired AMI with the t2.micro instance type, which is included in the Free Tier:

The newly created virtual machine needs permissions before it can interact with the Kinesis service. The best way to grant them is to assign an IAM role. So, on the Step 3: Configure Instance Details screen, select Create new IAM role:

Creating an IAM role for EC2
In the window that opens, specify that we are creating a new role for EC2 and go to the Permissions section:

Since this is a training example, we don't need to go into the finer points of resource permissions. We'll choose policies that Amazon has already preconfigured: AmazonKinesisFullAccess and CloudWatchFullAccess.
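
For reference, here is roughly what the console does behind the scenes, expressed as a boto3 sketch. It creates a role whose trust policy lets EC2 assume it, attaches the two AWS-managed policies, and wraps the role in an instance profile so it can be attached to an instance. The helper names are hypothetical. The role name is the one used in this article, and the IAM client methods are standard boto3 calls.

```python
import json

# AWS-managed policies chosen in the console above.
MANAGED_POLICY_ARNS = [
    "arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
    "arn:aws:iam::aws:policy/CloudWatchFullAccess",
]


def ec2_trust_policy():
    """Trust policy that allows EC2 instances to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "ec2.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }


def create_ec2_role(iam_client, role_name="EC2-KinesisStreams-FullAccess"):
    """Create the role, attach the policies and expose it via an instance profile."""
    iam_client.create_role(RoleName=role_name,
                           AssumeRolePolicyDocument=json.dumps(ec2_trust_policy()))
    for arn in MANAGED_POLICY_ARNS:
        iam_client.attach_role_policy(RoleName=role_name, PolicyArn=arn)
    # EC2 attaches roles through an instance profile, not directly.
    iam_client.create_instance_profile(InstanceProfileName=role_name)
    iam_client.add_role_to_instance_profile(InstanceProfileName=role_name,
                                            RoleName=role_name)
```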

Let's give this role a meaningful name, for example EC2-KinesisStreams-FullAccess. The result should match the picture below:

After creating this new role, don't forget to attach it to the virtual machine instance you are creating:

We don't change anything else on this screen and move on to the next ones.

You can leave the disk settings at their defaults, and the tags too. It is still good practice to use tags, at least to give the instance a name and indicate its environment.

Now we're on the Step 6: Configure Security Group tab. Here you need to create a new security group, or select an existing one, that allows ssh connections (port 22) to the instance. Select Source -> My IP, and you can launch the instance.

As soon as the instance reaches the running state, you can try to connect to it via ssh.

To work with the Kinesis Agent, connect to the machine and run the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to store the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to configure it:

sudo vim /etc/aws-kinesis/agent.json

The agent.json file should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As the configuration file shows, the agent will monitor files ending in .log in the /var/log/airline_tickets/ directory, parse them, and send them to the airline_tickets stream.
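
The CSVTOJSON option turns each delimited line into a JSON object keyed by customFieldNames. The following plain-Python approximation helps reason about what ends up in the stream. It assumes the agent's default comma delimiter and is not the agent's actual implementation. It also shows that the pattern *log matches any file name ending in log, not only names with a .log extension, while rotated files such as tickets.log.1 are skipped. This follows Python's fnmatch semantics, which may differ slightly from the agent's own glob matching.

```python
import csv
import fnmatch
import io
import json

# Field names copied from customFieldNames in agent.json.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]
FILE_PATTERN = "/var/log/airline_tickets/*log"


def csv_line_to_json(line):
    """Approximate CSVTOJSON for one line: pair each value with its field name."""
    values = next(csv.reader(io.StringIO(line)))
    # All values stay strings, which is why the Collector Lambda converts types later.
    return json.dumps(dict(zip(FIELD_NAMES, values)))


def is_watched(path):
    """Would a file at this path match the agent's filePattern?"""
    return fnmatch.fnmatch(path, FILE_PATTERN)
```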

Restart the service and make sure it is up and running:

sudo service aws-kinesis-agent restart

Now let's download the Python script that will request data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the response in the directory that the Kinesis Agent watches. The implementation is fairly standard. A TicketsApi class calls the API asynchronously, and we pass it the header with the token and the request parameters:

import asyncio
import sys

from aiohttp import ClientResponseError, ClientSession, FormData

# BASE_URL, CURRENCY, ORIGIN, DESTINATION, SHOW_TO_AFFILIATES, TRIP_DURATION,
# TARGET_FILE, LOGGER and log_maker() are defined elsewhere in api_caller.py.


class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                # aiohttp raises ClientResponseError for 4xx/5xx statuses.
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except ClientResponseError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
        return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    # The token travels in the X-Access-Token header, not in the URL.
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            # log_maker() writes the rows into TARGET_FILE for the Kinesis Agent.
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)


if __name__ == '__main__':
    # asyncio.run() needs Python 3.7+, while the instance above installs python36.
    asyncio.get_event_loop().run_until_complete(main())
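
The log_maker() function referenced above is not shown in the article. Purely as an illustration, a hypothetical version could write one CSV line per ticket, in the same column order as customFieldNames in agent.json, so that CSVTOJSON can name the fields. The code labels two assumptions. First, the price is taken from the value field when no cost field is present. Second, record_id is a freshly generated UUID.

```python
import csv
import io
import uuid

# Column order must match customFieldNames in agent.json.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def to_csv_lines(response):
    """Flatten the API response into CSV lines ordered like FIELD_NAMES."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for item in response.get("data", []):
        row = dict(item)
        # Assumption: the price arrives as "value" when there is no "cost" field.
        row.setdefault("cost", row.get("value", ""))
        # Assumption: record_id is generated here and becomes the DynamoDB key.
        row["record_id"] = str(uuid.uuid4())
        writer.writerow([row.get(name, "") for name in FIELD_NAMES])
    return buffer.getvalue().splitlines()


def append_lines(lines, path):
    """Append lines to a file whose name matches the agent's filePattern."""
    with open(path, "a", encoding="utf-8") as log_file:
        for line in lines:
            log_file.write(line + "\n")
    return len(lines)
```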

To check that the agent is configured and working correctly, let's run the api_caller.py script:

sudo ./api_caller.py TOKEN

Then check the result in the Agent logs and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

As you can see, everything works and the Kinesis Agent has successfully sent data to the stream. Now let's set up the consumer.

Setting up Kinesis Data Analytics

Let's move on to the central component of the whole system. Create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app:

Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. Unlike Kinesis Streams, it is a fully autoscaling service that:

  1. lets you create new streams (Output Stream) based on queries over the source data;
  2. provides a stream of the errors that occur while the application is running (Error Stream);
  3. can automatically detect the schema of the input data (you can redefine it manually if necessary).

This is not a cheap service: 0.11 USD per hour of operation. Use it carefully and delete the application when you're done.

Let's connect the application to a data source:

Select the stream to connect to (airline_tickets):

Next, attach a new IAM role so that the application can read from the source stream and write to the output stream. It is enough to leave everything in the Access permissions block unchanged:

Now let's discover the schema of the data in the stream by clicking the "Discover schema" button. The IAM role will be updated (a new one will be created), and schema detection will run on the data that has already arrived in the stream:

Now go to the SQL editor. When you click this button, a window will ask whether to start the application. Choose to start it:

Paste the following simple query into the SQL editor window and click Save and run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables: INSERT statements add records and SELECT statements query data. In Amazon Kinesis Data Analytics, you work with streams (STREAMs) and pumps (PUMPs). A pump is a continuous insert query that moves data from one in-application stream into another.

The SQL query above selects Aeroflot tickets that cost less than five thousand rubles. All records that meet these conditions are placed in the DESTINATION_SQL_STREAM stream.
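
As a mental model only, the pump behaves like a filter-and-project step applied to each record as it arrives. The generator below mimics that on a finite list of already-parsed records. The real service evaluates the query continuously over an unbounded stream. The pump function and the sample records are made up for illustration.

```python
def pump(records, max_cost=5000, gate="Aeroflot"):
    """Yield {"cost", "gate"} for records that satisfy the WHERE clause."""
    for record in records:
        # Same condition as: WHERE "cost" < 5000 and "gate" = 'Aeroflot'
        if record["cost"] < max_cost and record["gate"] == gate:
            # Same projection as: SELECT STREAM "cost", "gate"
            yield {"cost": record["cost"], "gate": record["gate"]}


sample = [
    {"cost": 4770.0, "gate": "Aeroflot", "origin": "LED"},
    {"cost": 3900.0, "gate": "Pobeda", "origin": "LED"},
    {"cost": 6100.0, "gate": "Aeroflot", "origin": "LED"},
]
print(list(pump(sample)))  # → [{'cost': 4770.0, 'gate': 'Aeroflot'}]
```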

In the Destination block, select the special_stream stream. In the In-application stream name drop-down list, select DESTINATION_SQL_STREAM:

After all these steps, the result should look something like the picture below:


Creating and subscribing to an SNS topic

Go to Simple Notification Service and create a new topic named Airlines:

Subscribe to this topic and enter the mobile phone number that will receive the SMS notifications:
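
The same topic and SMS subscription can also be created from code. This is a minimal boto3 sketch that assumes configured credentials and a region where SNS SMS delivery is available. create_sms_alerts and is_e164 are hypothetical helpers. SNS expects phone numbers in E.164 format, a plus sign followed by up to 15 digits, so the sketch checks the number first.

```python
import re

# E.164: "+", a non-zero leading digit, up to 15 digits in total.
E164 = re.compile(r"\+[1-9]\d{1,14}")


def is_e164(phone_number):
    """True if the number looks like +<country code><subscriber number>."""
    return E164.fullmatch(phone_number) is not None


def create_sms_alerts(sns_client, phone_number, topic_name="Airlines"):
    """Create (or reuse) the topic and subscribe a phone number to it via SMS."""
    if not is_e164(phone_number):
        raise ValueError(f"expected E.164 format, got {phone_number!r}")
    # create_topic is idempotent: an existing topic's ARN is returned as-is.
    topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]
    sns_client.subscribe(TopicArn=topic_arn, Protocol="sms",
                         Endpoint=phone_number)
    return topic_arn
```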


Creating a table in DynamoDB

To store the raw data from the airline_tickets stream, let's create a DynamoDB table with the same name. We'll use record_id as the primary key:
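
As a scripted alternative to the console, the table definition comes down to a single hash key. This sketch assumes record_id is stored as a string, since CSVTOJSON produces string values. It uses 5 read and 5 write capacity units, which stays within the 25 WCU/RCU free-tier allowance mentioned earlier. table_spec and create_table are hypothetical helpers.

```python
def table_spec(name="airline_tickets", read_units=5, write_units=5):
    """CreateTable arguments: record_id (string) is the partition (HASH) key."""
    return {
        "TableName": name,
        "KeySchema": [{"AttributeName": "record_id", "KeyType": "HASH"}],
        "AttributeDefinitions": [
            {"AttributeName": "record_id", "AttributeType": "S"},
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": read_units,
            "WriteCapacityUnits": write_units,
        },
    }


def create_table(dynamodb_client, spec):
    """Create the table and wait until DynamoDB reports it as existing."""
    dynamodb_client.create_table(**spec)
    dynamodb_client.get_waiter("table_exists").wait(TableName=spec["TableName"])
```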


Creating the collector Lambda function

Let's create a Lambda function named Collector. Its job is to poll the airline_tickets stream and insert any new records it finds into the DynamoDB table. In addition to the default permissions, this Lambda obviously needs read access to the Kinesis data stream and write access to DynamoDB.

Creating an IAM role for the collector Lambda function
First, let's create a new IAM role for the Lambda named Lambda-TicketsProcessingRole:

For this test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable, as shown in the picture below:


This Lambda should be invoked by a Kinesis trigger whenever new records arrive in airline_tickets, so we need to add a new trigger:
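
Behind the console's trigger setup is an event source mapping. Here is a boto3 sketch of the same step, assuming the Collector function already exists. The stream ARN is a placeholder you would look up yourself. BatchSize=100 and StartingPosition="LATEST" are assumed values, not settings taken from the article.

```python
def trigger_spec(stream_arn, function_name="Collector", batch_size=100):
    """CreateEventSourceMapping arguments for a Kinesis-triggered Lambda."""
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        # LATEST: only records written after the mapping is created are delivered.
        "StartingPosition": "LATEST",
        "BatchSize": batch_size,
        "Enabled": True,
    }


def add_trigger(lambda_client, spec):
    """Create the mapping and return its UUID."""
    return lambda_client.create_event_source_mapping(**spec)["UUID"]
```

For example, the ARN could come from boto3.client("kinesis").describe_stream_summary(StreamName="airline_tickets")["StreamDescriptionSummary"]["StreamARN"].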

All that remains is to paste in the code and save the Lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(Item=dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
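
To try the Collector logic locally before wiring up the trigger, it helps to know the shape of the event. Each record carries its payload base64-encoded under kinesis.data. Real events include more fields, such as partitionKey and sequenceNumber. This sketch builds a minimal fake event and decodes it the same way get_json_data does. The helper names are hypothetical.

```python
import base64
import json


def make_kinesis_event(payloads):
    """Build a minimal Kinesis-to-Lambda event with base64-encoded JSON data."""
    return {"Records": [
        {"kinesis": {"data": base64.b64encode(json.dumps(p).encode()).decode()}}
        for p in payloads
    ]}


def decode_records(event):
    """Decode every record the same way TicketsParser.get_json_data does."""
    return [json.loads(base64.b64decode(record["kinesis"]["data"]))
            for record in event["Records"]]
```

You could pass such an event to lambda_handler for a local run. Keep in mind that TicketsParser still writes to the real DynamoDB table.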

Creating the notifier Lambda function

The second Lambda function monitors the second stream (special_stream) and sends notifications to SNS. It is created in a similar way. This Lambda needs permission to read from Kinesis and to publish messages to a given SNS topic. SNS then delivers those messages to all of the topic's subscribers (email, SMS, and so on).

Creating the IAM role
First, we create the IAM role Lambda-KinesisAlarm for this Lambda, and then assign it to the alarm_notifier Lambda we are creating:


This Lambda should be triggered when new records arrive in special_stream, so configure its trigger the same way we did for the Collector Lambda.

To make this Lambda easier to configure, let's add a new environment variable, TOPIC_ARN, holding the ARN (Amazon Resource Name) of the Airlines topic:
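
An ARN follows the pattern arn:partition:service:region:account-id:resource. A quick sanity check of the TOPIC_ARN value before deploying can catch copy-paste mistakes. The helper below is a hypothetical sketch, and the account ID in the test values is a placeholder.

```python
from typing import NamedTuple


class Arn(NamedTuple):
    partition: str
    service: str
    region: str
    account_id: str
    resource: str


def parse_arn(arn):
    """Split 'arn:partition:service:region:account-id:resource' into its parts."""
    # maxsplit=5 keeps any ':' inside the resource part intact.
    prefix, partition, service, region, account_id, resource = arn.split(":", 5)
    if prefix != "arn":
        raise ValueError(f"not an ARN: {arn!r}")
    return Arn(partition, service, region, account_id, resource)
```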

Then paste in the Lambda code. It's not complicated at all:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
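
The handler above sends a fixed text. If you wanted the SMS to include the tickets themselves, one possible variation is to decode the special_stream records first. This sketch assumes the Kinesis Analytics destination writes JSON records with the cost and gate columns. build_alarm_message is a hypothetical helper whose result could be passed as Message to SNS_CLIENT.publish.

```python
import base64
import json


def build_alarm_message(event):
    """Summarize the special_stream records that triggered the alarm."""
    lines = []
    for record in event.get("Records", []):
        ticket = json.loads(base64.b64decode(record["kinesis"]["data"]))
        lines.append(f"{ticket.get('gate')}: {ticket.get('cost')}")
    return "Cheap tickets found!\n" + "\n".join(lines)
```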

That completes the manual configuration of the system. All that remains is to test it and make sure everything is configured correctly.

Deploying from Terraform code

Required preparation

Terraform is a very convenient open-source tool for deploying infrastructure from code. Its syntax is easy to learn, and there are many examples of how to deploy different resources. The Atom and Visual Studio Code editors both have many handy plugins that make working with Terraform easier.

You can download the distribution from the official Terraform website. A detailed review of all Terraform's capabilities is beyond the scope of this article, so we'll limit ourselves to the essentials.

How to get started

The full project code is in my repository. Clone it locally. Before you start, make sure the AWS CLI is installed and configured, because Terraform looks for credentials in the ~/.aws/credentials file.

It is good practice to run the plan command before deploying the entire infrastructure, to see what Terraform will create in the cloud:

terraform.exe plan

You will be prompted for a phone number to send notifications to. You don't have to enter it at this stage.

After reviewing the execution plan, we can start creating the resources:

terraform.exe apply

After you run this command, you will again be asked for a phone number. Type "yes" when asked to confirm the actions. Terraform will then set up the entire infrastructure, perform all the required EC2 configuration, deploy the Lambda functions, and so on.

Once Terraform has created all the resources, open the Kinesis Analytics application in the console and finish a few steps manually. Unfortunately, I didn't find a way to do this directly from code.

Start the application:

After that, explicitly set the in-application stream name by selecting it from the drop-down list:

Now everything is ready to go.

Testing the application

Whether you deployed the system manually or with Terraform, it will work the same way.

Log in via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:

sudo ./api_caller.py TOKEN

All you have to do now is wait for an SMS on your phone:

The SMS arrived on the phone in about 1 minute:

It remains to check whether the records were saved in DynamoDB for later, more detailed analysis. The airline_tickets table contains roughly the following data:


Conclusion

In this work, we built an online data processing system based on Amazon Kinesis. We looked at using the Kinesis Agent with Kinesis Data Streams, at real-time analytics with Kinesis Analytics using SQL, and at how Amazon Kinesis interacts with other AWS services.

We deployed the system in two ways: a rather long manual one and a quick one from Terraform code.

All the project's source code is available in my GitHub repository. I recommend taking a look at it.

I'm happy to discuss the article and look forward to your comments. I hope for constructive criticism.

I wish you success!

source: www.habr.com
