Integrating the Aviasales API with Amazon Kinesis and the simplicity of serverless

Hi, Habr!

Do you like flying? I do, but during self-isolation I also fell in love with analyzing air ticket data from one well-known resource, Aviasales.

Today we will look at how Amazon Kinesis works. We will build a streaming system with real-time analytics, set up the Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.

All the details are below. Let's go!


Introduction

For this example we need access to the Aviasales API. Access is free and unrestricted. You only need to register in the "Developers" section to get your API token for accessing the data.

The main goal of this article is to give a general understanding of streaming data in AWS. We set aside the fact that the data returned by the API is not strictly up to date. It is served from a cache built from searches made by users of Aviasales.ru and Jetradar.com over the last 48 hours.

The Kinesis Agent runs on the producer machine. It will automatically parse the data received from the API and send it to the appropriate Kinesis data stream, which Kinesis Data Analytics then processes. The raw version of this stream will be written directly to storage. Keeping the raw data in DynamoDB makes deeper ticket analysis possible with BI tools such as Amazon QuickSight.

We will look at two ways of deploying the infrastructure:

  • Manually, through the AWS Management Console;
  • From Terraform code, for lazy automators.

Architecture of the system

Components used:

  • Aviasales API - all subsequent work uses the data returned by this API;
  • EC2 Producer Instance - an ordinary virtual machine in the cloud that generates the input data stream:
    • Kinesis Agent - a Java application installed locally on the machine. It provides an easy way to collect data and send it to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script - a Python script that makes requests to the API and puts the response into the folder monitored by the Kinesis Agent;
  • Kinesis Data Streams - a real-time data streaming service with broad scaling capabilities;
  • Kinesis Analytics - a serverless service that simplifies real-time analysis of streaming data. Amazon Kinesis Data Analytics configures the resources needed to run applications and scales automatically to handle any volume of incoming data;
  • AWS Lambda - a service that lets you run code without provisioning or managing servers. Compute capacity is scaled automatically for each invocation;
  • Amazon DynamoDB - a key-value and document database that delivers latency below 10 milliseconds at any scale. With DynamoDB you don't need to provision, patch, or manage any servers. DynamoDB scales tables automatically, adjusting the available resources and maintaining high performance. No system administration is required;
  • Amazon SNS - a fully managed messaging service based on the publisher-subscriber (Pub/Sub) model. You can use it to decouple microservices, distributed systems, and serverless applications. SNS can deliver information to end users through mobile push notifications, SMS messages, and email.

Preliminary preparation

To emulate the data stream, I decided to use the airline ticket information returned by the Aviasales API. The documentation describes a fairly wide range of methods. Let's take one of them, the "Price calendar for a month", which returns prices for each day of the month, grouped by the number of transfers. If you don't specify the search month in the request, the API returns information for the month after the current one.

So, let's register and get our token.

An example request is shown below:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Passing the token in the query string, as above, will work. I prefer to pass the access token in a header instead, so the api_caller.py script uses that approach.
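For illustration, here is a minimal sketch of the same request built with the Python standard library. The token is moved from the query string into the X-Access-Token header, which is the header name api_caller.py uses further on. The function name build_month_matrix_request is hypothetical. The sketch only builds the request object; passing it to urllib.request.urlopen() would perform the real network call.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_month_matrix_request(token, origin, destination, currency="rub"):
    """Return an unsent urllib Request for the month-matrix endpoint."""
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    # The token travels in a header, so it does not appear in the URL
    return Request(f"{BASE_URL}?{query}", headers={"X-Access-Token": token})


# St. Petersburg (LED) to Kazan (KZN); TOKEN_API is a placeholder
req = build_month_matrix_request("TOKEN_API", "LED", "KZN")
print(req.full_url)
# urllib stores header names capitalized, e.g. "X-access-token"
print(req.get_header("X-access-token"))
```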

Example response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

The example API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I'm from Kazan, and Phuket is now "only a dream", let's look for tickets from St. Petersburg to Kazan.
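Here is a short sketch of how such a response could be consumed. It parses the example response shown above and picks the offer with the lowest value. The helper cheapest_ticket is hypothetical, and it assumes that value holds the ticket price in the requested currency.

```python
import json

SAMPLE_RESPONSE = """{
   "success": true,
   "data": [{
      "show_to_affiliates": true, "trip_class": 0,
      "origin": "LED", "destination": "HKT",
      "depart_date": "2015-10-01", "return_date": "",
      "number_of_changes": 1, "value": 29127,
      "found_at": "2015-09-24T00:06:12+04:00",
      "distance": 8015, "actual": true
   }]
}"""


def cheapest_ticket(payload):
    """Return the cheapest offer from a month-matrix response, or None."""
    response = json.loads(payload)
    if not response.get("success"):
        return None
    offers = response.get("data", [])
    # Assumption: "value" is the price in the currency from the request
    return min(offers, key=lambda offer: offer["value"], default=None)


best = cheapest_ticket(SAMPLE_RESPONSE)
print(best["origin"], best["destination"], best["depart_date"], best["value"])
```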

This assumes you already have an AWS account. Note right away that Kinesis and SMS notifications are not part of the annual Free Tier. Even so, a couple of dollars is enough to build the proposed system and experiment with it. And, of course, don't forget to delete all resources once you no longer need them.

Fortunately, DynamoDB and Lambda functions will cost us nothing if we stay within the monthly free limits. For DynamoDB these are 25 GB of storage, 25 WCU/RCU, and 100 million requests. For Lambda, one million function invocations per month are free.

Manual deployment of the system

Setting up Kinesis Data Streams

Let's open the Kinesis Data Streams service and create two new streams with one shard each.

What is a shard?
A shard is the basic throughput unit of an Amazon Kinesis stream. One shard provides input data transfer at 1 MB/s and output data transfer at 2 MB/s. One shard supports up to 1000 PUT records per second. When you create a data stream, you specify the required number of shards. For example, you can create a data stream with two shards. That stream provides input data transfer at 2 MB/s and output data transfer at 4 MB/s, and supports up to 2000 PUT records per second.
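The per-shard limits above reduce to simple arithmetic. The sketch below uses hypothetical helper names. It computes the aggregate limits of a stream and the smallest number of shards that covers a given write rate. It considers only the input side and ignores the 2 MB/s read limit.

```python
import math

# Per-shard limits quoted above
WRITE_MB_PER_SHARD = 1      # input, MB/s
READ_MB_PER_SHARD = 2       # output, MB/s
RECORDS_PER_SHARD = 1000    # PUT records per second


def stream_capacity(shards):
    """Aggregate limits of a stream with the given number of shards."""
    return {"write_mb_s": shards * WRITE_MB_PER_SHARD,
            "read_mb_s": shards * READ_MB_PER_SHARD,
            "put_records_s": shards * RECORDS_PER_SHARD}


def shards_needed(write_mb_s, records_s):
    """Smallest shard count covering both the byte rate and the record rate."""
    return max(1,
               math.ceil(write_mb_s / WRITE_MB_PER_SHARD),
               math.ceil(records_s / RECORDS_PER_SHARD))


print(stream_capacity(2))        # the two-shard example from the text
print(shards_needed(0.5, 2500))  # the record rate dominates: 3 shards
```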

The more shards in your stream, the higher its throughput. In principle, this is how streams scale: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus 1.4 cents for every million PUT payload units.
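A rough monthly estimate based on the prices quoted above can be sketched as follows. The sketch assumes an average month of 730 hours. It also assumes that each PUT payload unit covers up to 25 KB of a record, with partial units rounded up. Prices differ between regions and change over time, so treat the numbers as illustrative only.

```python
import math

SHARD_HOUR_USD = 0.015          # 1.5 cents per shard-hour
PUT_UNITS_MILLION_USD = 0.014   # 1.4 cents per million PUT payload units
PUT_UNIT_BYTES = 25 * 1024      # assumption: one unit covers up to 25 KB


def monthly_stream_cost(shards, records_per_month, record_bytes, hours=730):
    """Rough monthly cost of a stream: shard-hours plus PUT payload units."""
    # Each record is billed in 25 KB chunks, rounded up
    units_per_record = math.ceil(record_bytes / PUT_UNIT_BYTES)
    put_units = records_per_month * units_per_record
    shard_cost = shards * hours * SHARD_HOUR_USD
    put_cost = put_units / 1_000_000 * PUT_UNITS_MILLION_USD
    return round(shard_cost + put_cost, 2)


print(monthly_stream_cost(1, 1_000_000, 1024))  # one shard, 1M small records
print(monthly_stream_cost(2, 0, 0))             # this article's two streams, idle
```

For a small demo like this one, the shard-hours dominate the bill, which is one more reason to delete the streams after experimenting.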

Let's create a new stream named airline_tickets. One shard will be enough for it:

Now let's create another stream named special_stream:


Setting up the producer

For this exercise, an ordinary EC2 instance is enough as the data producer. It doesn't have to be a powerful, expensive virtual machine. A t2.micro spot instance will do just fine.

Important note: for this example, use the Amazon Linux AMI 2018.03.0 image. It needs less setup to get the Kinesis Agent running quickly.

Go to the EC2 service, create a new virtual machine, and select the required AMI with the t2.micro instance type, which is included in the Free Tier:

The new virtual machine needs permissions before it can interact with the Kinesis service. The best way to grant them is to assign an IAM Role. So, on the Step 3: Configure Instance Details screen, select Create new IAM role:

Creating an IAM role for EC2
In the window that opens, specify that we are creating a new role for EC2 and go to the Permissions section:

Since this is a training example, we don't need fine-grained resource permissions. We'll select policies that Amazon already provides: AmazonKinesisFullAccess and CloudWatchFullAccess.

Let's give this role a meaningful name, for example EC2-KinesisStreams-FullAccess. The result should look like the picture below:

After creating the new role, don't forget to attach it to the virtual machine instance:

We don't change anything else on this screen and move on to the next ones.

You can leave the disk settings and tags at their defaults. Using tags is good practice, though: at least give the instance a name and indicate the environment.

Now we are on the Step 6: Configure Security Group tab. Here you create a new security group or specify an existing one that allows you to connect to the instance via ssh (port 22). Select Source -> My IP, and you can launch the instance.

As soon as the instance is running, you can connect to it via ssh.

To work with the Kinesis Agent, connect to the machine and run the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to store the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to configure it:

sudo vim /etc/aws-kinesis/agent.json

The agent.json file should contain the following:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As the configuration file shows, the agent will monitor files with the .log extension in the /var/log/airline_tickets/ directory. It will parse them and send them to the airline_tickets stream.
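The sketch below approximates in plain Python what this flow does with one log line. It checks the file name against filePattern and maps the comma-separated values onto customFieldNames. This is not the agent's actual implementation. The function names and the sample line are made up, and the sketch assumes comma-delimited lines without a header row.

```python
import csv
import fnmatch
import io
import json

# Column order copied from "customFieldNames" in agent.json
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at",
               "duration", "distance", "destination", "depart_date",
               "actual", "record_id"]
FILE_PATTERN = "/var/log/airline_tickets/*log"


def is_watched(path):
    """Approximate the agent's filePattern match with fnmatch."""
    return fnmatch.fnmatchcase(path, FILE_PATTERN)


def csv_line_to_json(line, field_names=FIELD_NAMES):
    """Map one comma-separated log line onto the configured field names."""
    values = next(csv.reader(io.StringIO(line)))
    # In this sketch every value stays a string
    return json.dumps(dict(zip(field_names, values)))


print(is_watched("/var/log/airline_tickets/tickets.log"))   # True
print(is_watched("/var/log/airline_tickets/tickets.txt"))   # False
print(csv_line_to_json(
    "4200,0,True,,LED,0,Aeroflot,2020-05-01T10:00:00,95,1200,KZN,2020-05-20,True,id-1"))
```

The pattern `*log` matches any file name ending in "log", for example "catalog". Keep only files meant for the stream in that folder.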

Restart the service and make sure it is up and running:

sudo service aws-kinesis-agent restart

Now let's download the Python script that will request data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the response in the directory that the Kinesis Agent scans. The implementation is fairly standard. A TicketsApi class calls the API asynchronously, and we pass it the header with the token and the request parameters:

# Excerpt from api_caller.py. BASE_URL, CURRENCY, ORIGIN, DESTINATION,
# SHOW_TO_AFFILIATES, TRIP_DURATION, TARGET_FILE, LOGGER and log_maker()
# are defined elsewhere in the script and are not shown here.
import asyncio
import sys

from aiohttp import ClientResponseError, ClientSession, FormData


class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        # Every request made through this session carries the token header
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                # Raises ClientResponseError for 4xx/5xx status codes
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except ClientResponseError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    # The token is sent in a header instead of the query string
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            # Save the tickets into the file watched by the Kinesis Agent
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)


if __name__ == '__main__':
    # asyncio.run() needs Python 3.7+; the instance above runs Python 3.6
    asyncio.get_event_loop().run_until_complete(main())
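The excerpt above calls log_maker(), but its body is not shown. A hypothetical stand-in could look like the sketch below. It appends one comma-separated line per ticket, in the column order of customFieldNames, so that the CSVTOJSON step can map values to names. Two choices here are assumptions for this sketch and not necessarily what the original script does: mapping the API's value field to cost, and generating record_id with uuid4.

```python
import csv
import os
import tempfile
import uuid

# Column order must match "customFieldNames" in agent.json
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at",
               "duration", "distance", "destination", "depart_date",
               "actual", "record_id"]


def log_maker_sketch(response, target_file):
    """Hypothetical log_maker(): append one CSV line per ticket, return count."""
    rows = 0
    # Append mode: the agent tails the file and ships only new lines
    with open(target_file, "a", newline="") as log_file:
        writer = csv.writer(log_file, lineterminator="\n")
        for ticket in response["data"]:
            row = dict(ticket)
            row["cost"] = ticket.get("value")      # assumption: value -> cost
            row["record_id"] = str(uuid.uuid4())   # assumption: random key
            writer.writerow([row.get(name, "") for name in FIELD_NAMES])
            rows += 1
    return rows


# Usage with made-up data, written into a throwaway directory
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "tickets.log")
    print(log_maker_sketch({"data": [{"origin": "LED", "destination": "KZN",
                                      "value": 4200}]}, path))
    with open(path) as log_file:
        print(log_file.read(), end="")
```

The sketch sets lineterminator="\n" deliberately. The default csv dialect ends rows with "\r\n", which could leave a stray carriage return in the last column (record_id).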

To check that the agent is configured correctly and working, let's do a test run of the api_caller.py script:

sudo ./api_caller.py TOKEN

Then we check the result in the Agent logs and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

As you can see, everything works, and the Kinesis Agent successfully sends data to the stream. Now let's set up the consumer.

Setting up Kinesis Data Analytics

Let's move on to the central component of the whole system. Create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app:

Kinesis Data Analytics lets you run real-time analytics on Kinesis Streams using SQL. Unlike Kinesis Streams, it is a fully autoscaling service. It:

  1. lets you create new streams (Output Stream) based on queries to the source data;
  2. provides a stream of the errors that occurred while the applications were running (Error Stream);
  3. can detect the input data schema automatically (you can redefine it manually if necessary).

This service is not cheap: 0.11 USD per hour of operation. Use it with care and delete the application when you're done.
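Here is the arithmetic with the 0.11 USD/hour rate quoted above, as a tiny sketch that shows why stopping the application matters. The kpus parameter assumes that billing scales with the number of Kinesis Processing Units the application uses. Check the current pricing page for your region.

```python
KPU_HOUR_USD = 0.11  # rate quoted above; varies by region


def kda_cost(hours, kpus=1):
    """Cost of keeping the analytics application running for `hours`."""
    # Assumption: the hourly rate applies per processing unit (KPU)
    return round(hours * kpus * KPU_HOUR_USD, 2)


print(kda_cost(2))    # an afternoon of experiments
print(kda_cost(730))  # forgetting to stop it for a month
```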

Let's connect the application to the data source:

Select the stream to connect to (airline_tickets):

Next, attach a new IAM role so the application can read from the stream and write to a stream. To do this, simply leave the Access permissions block unchanged:

Now let's have the service discover the schema of the data in the stream by clicking the "Discover schema" button. This updates the IAM role (a new one is created) and runs schema discovery on the data that has already arrived in the stream:

Now go to the SQL editor. When you click this button, a window asks whether to start the application. Choose to start it:

Paste the following simple query into the SQL editor window and click Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables: INSERT statements add records and a SELECT statement queries data. In Amazon Kinesis Data Analytics, you work with streams (STREAMs) and pumps (PUMPs). Pumps are continuous insert queries that move data from one in-application stream into another.

The SQL query above searches for Aeroflot tickets priced below five thousand rubles. All records that meet these conditions are placed in the DESTINATION_SQL_STREAM stream.
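When debugging the filter, it can help to reproduce the pump's WHERE clause locally. The generator below is a hypothetical Python equivalent of STREAM_PUMP. It keeps records whose cost is strictly below 5000 and whose gate equals 'Aeroflot', and it projects only the two selected columns. The sample records are made up.

```python
def pump_filter(records, max_cost=5000, gate="Aeroflot"):
    """Local equivalent of STREAM_PUMP: filter, then project cost and gate."""
    for record in records:
        # float() accepts both numbers and numeric strings
        cost = float(record["cost"])
        if cost < max_cost and record["gate"] == gate:
            yield {"cost": cost, "gate": record["gate"]}


sample = [{"cost": "4200", "gate": "Aeroflot"},
          {"cost": "5000", "gate": "Aeroflot"},  # not strictly below 5000
          {"cost": 3000, "gate": "Pobeda"}]       # a different airline
print(list(pump_filter(sample)))
```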

In the Destination block, select special_stream. In the In-application stream name drop-down list, select DESTINATION_SQL_STREAM:

After all these steps, the result should look something like the picture below:


Creating and subscribing to an SNS topic

Go to the Simple Notification Service and create a new topic named Airlines:

Subscribe to this topic and enter the mobile phone number that will receive the SMS notifications:


Creating a table in DynamoDB

To store the raw data from the airline_tickets stream, let's create a DynamoDB table with the same name. We'll use record_id as the primary key:


Creating the Collector lambda function

Let's create a lambda function called Collector. Its job is to poll the airline_tickets stream and insert any new records it finds into the DynamoDB table. Obviously, besides the default permissions, this lambda needs read access to the Kinesis data stream and write access to DynamoDB.

Creating an IAM role for the Collector lambda function
First, let's create a new IAM role for the lambda named Lambda-TicketsProcessingRole:

For a test example, the predefined AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable, as shown in the picture below:


This lambda should be invoked by a Kinesis trigger whenever new records arrive in the airline_tickets stream, so we need to add a new trigger:

All that's left is to paste in the code and save the lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
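To try the Collector's decoding logic without sending real data through the stream, you can build a synthetic event. The sketch below fills in only the part of a Kinesis event that the code above reads: Records[].kinesis.data, holding base64-encoded JSON. Real events carry additional metadata. The helper names are hypothetical.

```python
import base64
import json


def make_kinesis_event(payloads):
    """Build a minimal Kinesis-style Lambda event from a list of dicts."""
    # Only Records[].kinesis.data is filled in: that is all TicketsParser reads
    return {"Records": [
        {"kinesis": {"data": base64.b64encode(
            json.dumps(payload).encode("utf-8")).decode("ascii")}}
        for payload in payloads
    ]}


def decode_event(event):
    """Same decoding steps as TicketsParser.get_json_data()."""
    return [json.loads(base64.b64decode(record["kinesis"]["data"]))
            for record in event["Records"]]


event = make_kinesis_event([{"record_id": "id-1", "cost": "4200",
                             "gate": "Aeroflot"}])
print(decode_event(event))
```

You could also pass such an event to lambda_handler(event, None) for an end-to-end check. That requires AWS credentials, the airline_tickets table, and a complete ticket record containing every field that get_item_from_json converts. Note that this writes to the real table.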

Creating the notifier lambda function

The second lambda function monitors the second stream (special_stream) and sends a notification to SNS. We create it the same way as the first. This lambda needs read access to Kinesis and permission to publish messages to the given SNS topic. SNS then delivers each message to all of the topic's subscribers (email, SMS, etc.).

Creating the IAM role
First, we create the Lambda-KinesisAlarm IAM role for this lambda, and then assign the role to the alarm_notifier lambda we are creating:


This lambda should be triggered by new records arriving in special_stream, so configure its trigger the same way as for the Collector lambda.

To make this lambda easier to configure, let's add a new environment variable, TOPIC_ARN, holding the ARN (Amazon Resource Name) of the Airlines topic.
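An ARN has the form arn:partition:service:region:account-id:resource. As a small defensive check, the Lambda could validate TOPIC_ARN before publishing. The parse_topic_arn helper below is hypothetical, and the region and account ID in the example are placeholders.

```python
def parse_topic_arn(arn):
    """Split an SNS topic ARN into its parts; raise ValueError otherwise."""
    parts = arn.split(":")
    # Expected layout: arn:partition:service:region:account-id:topic-name
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "sns":
        raise ValueError(f"not an SNS topic ARN: {arn!r}")
    _, partition, _, region, account_id, topic = parts
    return {"partition": partition, "region": region,
            "account_id": account_id, "topic": topic}


# Placeholder region and account ID
print(parse_topic_arn("arn:aws:sns:eu-central-1:123456789012:Airlines"))
```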

Then paste in the lambda code. It's not complicated at all:

import os

import boto3

SNS_CLIENT = boto3.client('sns')
# ARN of the Airlines topic, set as a Lambda environment variable
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    """Publish an alarm to the SNS topic for each batch from special_stream."""
    try:
        # SNS fans the message out to every subscriber (SMS, email, ...)
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been published')
    except Exception as err:
        print('Delivery failure', str(err))
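The alarm above always sends the same text. One possible refinement, sketched below, is to include the tickets that were found in the message. The sketch assumes that the Kinesis Data Analytics destination writes records as JSON with the cost and gate columns of DESTINATION_SQL_STREAM. The helper build_alarm_message is hypothetical. Its result could be passed as Message= to SNS_CLIENT.publish() instead of the fixed string.

```python
import base64
import json

DEFAULT_MESSAGE = "Hi! I have found an interesting stuff!"


def build_alarm_message(event, max_lines=5):
    """Hypothetical helper: list up to max_lines tickets from the batch."""
    lines = []
    for record in event.get("Records", [])[:max_lines]:
        # Assumes JSON output with the cost/gate columns of DESTINATION_SQL_STREAM
        ticket = json.loads(base64.b64decode(record["kinesis"]["data"]))
        lines.append(f"{ticket.get('gate')}: {ticket.get('cost')} RUB")
    if not lines:
        return DEFAULT_MESSAGE
    return "Cheap tickets found:\n" + "\n".join(lines)


sample = {"Records": [{"kinesis": {"data": base64.b64encode(
    json.dumps({"cost": 4200.0, "gate": "Aeroflot"}).encode()).decode()}}]}
print(build_alarm_message(sample))
```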

That seems to complete the manual configuration. All that remains is to test the system and make sure everything is configured correctly.

Deploying from Terraform code

Required preparation

Terraform is a very convenient open-source tool for deploying infrastructure from code. Its syntax is easy to read, and there are plenty of examples of what to use and how. Atom and Visual Studio Code both have many useful plugins that make working with Terraform easier.

You can download the distribution here. A detailed review of all of Terraform's capabilities is beyond the scope of this article, so we'll stick to the main points.

How to run

The full project code is in my repository. Clone it to your machine. Before you start, make sure the AWS CLI is installed and configured, because Terraform looks for credentials in the ~/.aws/credentials file.

It's good practice to run the plan command before deploying the whole infrastructure, to see what Terraform is going to create for us in the cloud:

terraform.exe plan

You'll be asked to enter a phone number for notifications. You don't need to enter it at this stage.

After reviewing the execution plan, we can start creating the resources:

terraform.exe apply

After you run this command, you'll again be asked for a phone number. Type "yes" when asked whether to actually perform the actions. Terraform will then set up the entire infrastructure, perform all the necessary EC2 configuration, deploy the lambda functions, and so on.

Once Terraform has created all the resources, open the details of the Kinesis Analytics application. Unfortunately, I didn't find a way to do the next steps directly from code.

Start the application:

After that, explicitly set the in-application stream name by selecting it from the drop-down list:

Now everything is ready to go.

Testing the application

Whichever way you deployed the system, manually or from Terraform code, it works the same.

Log in via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:

sudo ./api_caller.py TOKEN

Now just wait for the SMS on your phone:

The SMS arrives on the phone in about a minute:

Finally, let's check whether the records were saved to DynamoDB for later, more detailed analysis. The airline_tickets table contains roughly the following data:


Conclusion

In this article we built an online data processing system based on Amazon Kinesis. We looked at using the Kinesis Agent together with Kinesis Data Streams, at real-time analytics in Kinesis Analytics with SQL, and at how Amazon Kinesis interacts with other AWS services.

We deployed the system in two ways: a rather long manual one and a quick one from Terraform code.

The full source code of the project is available in my GitHub repository. I suggest you take a look at it.

I'm happy to discuss the article and look forward to your comments. Constructive criticism is welcome.

I wish you success!

Source: www.habr.com
