Aviasales API Integration with Amazon Kinesis and Serverless Simplicity

Hi, Habr!

Do you like flying on planes? I love it, but during self-isolation I also fell in love with analyzing airline ticket data from one well-known resource, Aviasales.

Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, set up the Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.

All the details are under the cut! Let's go!

Introduction

For this example we need access to the Aviasales API. Access is free and without restrictions; you only need to register in the "Developers" section to get your API token for accessing the data.

The main purpose of this article is to give a general understanding of how streaming data is used in AWS. We take into account that the data returned by the API is not strictly up to date: it is served from a cache that is built from the searches made by users of Aviasales.ru and Jetradar.com over the last 48 hours.

The Kinesis Agent, installed on the producer machine, will automatically parse the data received via the API and pass it to the required stream via Kinesis Data Analytics. The raw version of this stream will be written directly to the store. The raw data store deployed in DynamoDB will allow deeper ticket analysis with BI tools such as AWS QuickSight.

We will look at two options for deploying the whole infrastructure:

  • Manual - via the AWS Management Console;
  • Infrastructure from Terraform code - for lazy automators.

Architecture of the system being developed

Components used:

  • Aviasales API - the data returned by this API will be used for all subsequent work;
  • EC2 Producer Instance - a regular virtual machine in the cloud on which the input data stream will be generated:
    • Kinesis Agent - a Java application installed locally on the machine that provides an easy way to collect and send data to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script - a Python script that makes requests to the API and puts the response into a folder monitored by the Kinesis Agent;
  • Kinesis Data Streams - a real-time data streaming service with extensive scaling capabilities;
  • Kinesis Analytics - a serverless service that simplifies the analysis of streaming data in real time. Amazon Kinesis Data Analytics provisions application resources and scales automatically to handle any volume of incoming data;
  • AWS Lambda - a service that lets you run code without provisioning or configuring servers. All computing power is scaled automatically for each call;
  • Amazon DynamoDB - a key-value and document database that delivers latency of less than 10 milliseconds at any scale. With DynamoDB you do not need to provision, patch, or manage any servers. DynamoDB automatically scales tables to adjust the amount of available resources and maintain high performance. No system administration is required;
  • Amazon SNS - a fully managed publisher-subscriber (Pub/Sub) messaging service that lets you decouple microservices, distributed systems, and serverless applications. SNS can be used to send information to end users via mobile push notifications, SMS messages, and emails.

Preliminary preparation

To emulate a data stream, I decided to use the airline ticket information returned by the Aviasales API. The documentation lists quite a few different methods; let's take one of them, the "Monthly Price Calendar", which returns prices for each day of the month, grouped by the number of transfers. If you do not specify the search month in the request, data is returned for the month following the current one.

So, let's register and get our token.

An example request is below:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

The above way of getting data from the API, with the token specified in the request itself, will work, but I prefer to pass the access token in a header, so we will use that approach in the api_caller.py script.
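
As a minimal sketch of the header-based approach (this is not the author's api_caller.py, which uses aiohttp), the request can be prepared with the standard library alone. The X-Access-Token header name is the one used in api_caller.py; the build_request helper and the placeholder token are illustrative assumptions, and the request is only constructed here, not sent.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_request(api_token, origin="LED", destination="HKT", currency="rub"):
    """Build (but do not send) a GET request with the token in a header.

    Hypothetical helper for illustration; the token never appears in the URL.
    """
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    return Request(f"{BASE_URL}?{query}",
                   headers={"X-Access-Token": api_token})


request = build_request("TOKEN_API")
print(request.full_url)
```

Keeping the token out of the query string means it does not end up in URLs that get logged or shared.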

Example response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

The example API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I am from Kazan, and Phuket is now "only a dream", let's look for tickets from St. Petersburg to Kazan.
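
To make the response structure concrete, here is a small sketch that unpacks the sample response shown above with the standard json module. The extract_prices helper is a hypothetical name introduced only for this illustration; the field names come from the sample itself.

```python
import json

# The sample response from the text, embedded as a string for illustration.
SAMPLE_RESPONSE = """
{
   "success": true,
   "data": [{
      "show_to_affiliates": true,
      "trip_class": 0,
      "origin": "LED",
      "destination": "HKT",
      "depart_date": "2015-10-01",
      "return_date": "",
      "number_of_changes": 1,
      "value": 29127,
      "found_at": "2015-09-24T00:06:12+04:00",
      "distance": 8015,
      "actual": true
   }]
}
"""


def extract_prices(raw_response):
    """Return (depart_date, number_of_changes, value) for every ticket."""
    payload = json.loads(raw_response)
    if not payload.get("success"):
        return []
    return [(item["depart_date"], item["number_of_changes"], item["value"])
            for item in payload.get("data", [])]


print(extract_prices(SAMPLE_RESPONSE))
```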

It is assumed that you already have an AWS account. I want to draw special attention right away to the fact that Kinesis and sending notifications via SMS are not included in the annual Free Tier. Even so, with a couple of dollars in mind, it is quite possible to build the proposed system and play with it. And, of course, do not forget to delete all resources once they are no longer needed.

Fortunately, DynamoDB and Lambda functions will be free for us as long as we stay within the monthly free limits. For example, for DynamoDB: 25 GB of storage, 25 WCU/RCU, and 100 million requests. And a million Lambda function calls per month.

Manual system deployment

Setting up Kinesis Data Streams

Let's go to the Kinesis Data Streams service and create two new streams, one shard each.

What is a shard?
A shard is the basic data transfer unit of an Amazon Kinesis stream. One shard provides input data transfer at 1 MB/s and output data transfer at 2 MB/s. One shard supports up to 1,000 PUT records per second. When creating a data stream, you need to specify the required number of shards. For example, you can create a data stream with two shards. This data stream will provide input data transfer at 2 MB/s and output data transfer at 4 MB/s, supporting up to 2,000 PUT records per second.
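
The arithmetic above can be sketched as a tiny helper. The per-shard figures are the ones quoted in the text; the stream_capacity function name is an assumption made for this illustration.

```python
# Per-shard limits as quoted in the text.
WRITE_MB_PER_SHARD = 1
READ_MB_PER_SHARD = 2
PUT_RECORDS_PER_SHARD = 1000


def stream_capacity(shards):
    """Return the aggregate capacity of a stream with the given shard count."""
    if shards < 1:
        raise ValueError("a stream needs at least one shard")
    return {
        "write_mb_per_s": WRITE_MB_PER_SHARD * shards,
        "read_mb_per_s": READ_MB_PER_SHARD * shards,
        "put_records_per_s": PUT_RECORDS_PER_SHARD * shards,
    }


print(stream_capacity(2))
```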

The more shards in your stream, the greater its throughput. In essence, this is how streams are scaled: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus an additional 1.4 cents for every million PUT payload units.
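
As a rough sketch of what this means for the bill, the two price components can be combined as follows. The prices are the ones quoted in the text and may differ by region and over time; estimate_cost is a hypothetical helper, not an AWS API.

```python
# Prices as quoted in the text (USD); treat them as assumptions, not a price list.
SHARD_HOUR_USD = 0.015
PUT_UNITS_PER_MILLION_USD = 0.014


def estimate_cost(shards, hours, put_payload_units):
    """Estimate the Kinesis Data Streams cost in USD."""
    shard_cost = shards * hours * SHARD_HOUR_USD
    put_cost = put_payload_units / 1_000_000 * PUT_UNITS_PER_MILLION_USD
    return round(shard_cost + put_cost, 4)


# Two single-shard streams running for a day with 500,000 PUT payload units.
print(estimate_cost(shards=2, hours=24, put_payload_units=500_000))
```

For an experiment like this one, the shard-hours dominate the cost, which is why deleting the streams afterwards matters more than reducing the number of requests.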

Let's create a new stream named airline_tickets; 1 shard will be enough for it:

Now let's create another stream named special_stream:

Producer setup

To work through the task, a regular EC2 instance is enough as the data producer. It does not have to be a powerful, expensive virtual machine; a spot t2.micro will do just fine.

Important note: for this example you should use the Amazon Linux AMI 2018.03.0 image; it needs fewer settings to get the Kinesis Agent up and running quickly.

Go to the EC2 service, create a new virtual machine, and select the required AMI with the t2.micro type, which is included in the Free Tier:

For the newly created virtual machine to be able to interact with the Kinesis service, it must be granted the rights to do so. The best way to do this is to assign an IAM role. So, on the Step 3: Configure Instance Details screen, choose Create new IAM role:

Creating an IAM role for EC2
In the window that opens, select that we are creating a new role for EC2 and go to the Permissions section:

For this training example we do not need to go into all the intricacies of granular configuration of resource rights, so we will choose the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.

Let's give this role a meaningful name, for example EC2-KinesisStreams-FullAccess. The result should look like the picture below:

After creating this new role, do not forget to attach it to the virtual machine instance being created:

We do not change anything else on this screen and move on to the next windows.

The hard drive settings can be left at their defaults, as can the tags (although it is good practice to use tags: at least give the instance a name and indicate the environment).

Now we are on the Step 6: Configure Security Group tab, where you need to create a new security group or specify an existing one that allows you to connect to the instance via ssh (port 22). Select Source -> My IP there, and you can launch the instance.

As soon as it switches to the running state, you can try to connect to it via ssh.

To be able to work with the Kinesis Agent, after successfully connecting to the machine, enter the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to store the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to configure it:

sudo vim /etc/aws-kinesis/agent.json

The contents of the agent.json file should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As can be seen from the configuration file, the agent will monitor files with the .log extension in the /var/log/airline_tickets/ directory, parse them, and pass them to the airline_tickets stream.
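
To illustrate what the CSVTOJSON option does with each line, here is a rough Python emulation. This is not the agent's actual code; it only mimics the mapping of delimited values onto customFieldNames. The sample line and its values are made up, and leaving every value as a string is an assumption (consistent with the int() and Decimal() conversions in the collector Lambda later in the article).

```python
import csv
import json

# Field names copied from the customFieldNames list in agent.json.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates",
               "return_date", "origin", "number_of_changes", "gate", "found_at",
               "duration", "distance", "destination", "depart_date", "actual",
               "record_id"]

# A made-up log line in the same column order (illustrative values only).
SAMPLE_LINE = ("4500,0,True,,LED,0,Aeroflot,2020-04-20T10:00:00,"
               "120,1200,KZN,2020-05-10,True,abc-123")


def csv_line_to_json(line, field_names=FIELD_NAMES):
    """Map one CSV line onto the field names and serialize it as JSON."""
    values = next(csv.reader([line]))
    return json.dumps(dict(zip(field_names, values)))


print(csv_line_to_json(SAMPLE_LINE))
```

The column order in the log file has to match customFieldNames exactly, since the mapping is purely positional.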

We restart the service and make sure it is up and running:

sudo service aws-kinesis-agent restart

Now let's download the Python script that will request data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the received response in the directory scanned by the Kinesis Agent. The implementation of this script is fairly standard: there is a TicketsApi class that lets you call the API asynchronously. We pass the header with the token and the request parameters to this class:

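# Imports used by this excerpt. BASE_URL, LOGGER, HTTPError, log_maker and the
# request constants (CURRENCY, ORIGIN, ...) are defined elsewhere in the script.
import sys

from aiohttp import ClientSession, FormData


class TicketsApi: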
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

To check that the settings are correct and that the agent works, let's do a test run of the api_caller.py script:

sudo ./api_caller.py TOKEN

And look at the result in the agent logs and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

As you can see, everything works and the Kinesis Agent successfully sends data to the stream. Now let's configure the consumer.

Setting up Kinesis Data Analytics

Let's move on to the central component of the whole system: create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app:

Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. It is a fully autoscaling service (unlike Kinesis Streams) that:

  1. lets you create new streams (Output Stream) based on queries against the source data;
  2. provides a stream with the errors that occurred while the application was running (Error Stream);
  3. can automatically detect the schema of the input data (it can be redefined manually if necessary).

This is not a cheap service: 0.11 USD per hour of operation, so you should use it carefully and delete it when you are done.

Let's connect the application to the data source:

Select the stream we are going to connect to (airline_tickets):

Next, you need to attach a new IAM role so that the application can read from the stream and write to the stream. To do this, it is enough not to change anything in the Access permissions block:

Now let's request discovery of the data schema in the stream; to do this, click the "Discover schema" button. As a result, the IAM role will be updated (a new one will be created) and schema discovery will be started from the data that has already arrived in the stream:

Now you need to go to the SQL editor. When you click this button, a window will appear asking you to start the application; choose to start it:

Insert the following simple query into the SQL editor window and click Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables, using INSERT statements to add records and SELECT statements to query data. In Amazon Kinesis Data Analytics, you work with streams (STREAM) and pumps (PUMP): continuous insert queries that insert data from one in-application stream into another.

The SQL query presented above searches for Aeroflot tickets costing less than five thousand rubles. All records that meet these conditions will be placed in the DESTINATION_SQL_STREAM stream.
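
For readers who think more easily in Python than in streaming SQL, the pump above behaves roughly like the generator below. This is only an analogy written for this article, not how Kinesis Data Analytics is implemented; the function names and sample records are made up.

```python
def matches_alert(record, max_cost=5000.0, gate="Aeroflot"):
    """Mirror the WHERE clause: cost < 5000 and gate = 'Aeroflot'."""
    return float(record["cost"]) < max_cost and record["gate"] == gate


def pump(source_records):
    """Continuously copy matching (cost, gate) pairs to the destination."""
    for record in source_records:
        if matches_alert(record):
            yield {"cost": float(record["cost"]), "gate": record["gate"]}


# Illustrative records only.
source = [
    {"cost": "4500", "gate": "Aeroflot"},
    {"cost": "7200", "gate": "Aeroflot"},
    {"cost": "3900", "gate": "OtherGate"},
]
print(list(pump(source)))
```

Like the pump, the generator processes records one at a time as they arrive rather than querying a finished table.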

In the Destination block, select the special_stream stream, and in the In-application stream name drop-down list select DESTINATION_SQL_STREAM:

The result of all the manipulations should be something like the picture below:

Creating and subscribing to an SNS topic

Go to the Simple Notification Service and create a new topic there named Airlines:

Subscribe to this topic and specify the mobile phone number to which SMS notifications will be sent:

Creating a table in DynamoDB

To store the raw data from the airline_tickets stream, let's create a table in DynamoDB with the same name. We will use record_id as the primary key:
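
For those who prefer code to the console, the same table could be described with the parameters below. This is a sketch, not part of the original walkthrough: it assumes record_id is stored as a string and picks small provisioned capacity values that stay within the free limits mentioned earlier. The boto3 call is left in a comment so that the snippet runs without AWS credentials.

```python
# Parameters in the shape accepted by the DynamoDB CreateTable operation.
TABLE_DEFINITION = {
    "TableName": "airline_tickets",
    # record_id is the partition (HASH) key, i.e. the primary key.
    "KeySchema": [{"AttributeName": "record_id", "KeyType": "HASH"}],
    # Assumption: record_id is a string ("S").
    "AttributeDefinitions": [
        {"AttributeName": "record_id", "AttributeType": "S"},
    ],
    # Assumption: small provisioned capacity, enough for the experiment.
    "ProvisionedThroughput": {"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
}

# With boto3 installed and credentials configured, the table could be created with:
#   import boto3
#   boto3.client("dynamodb").create_table(**TABLE_DEFINITION)

print(TABLE_DEFINITION["KeySchema"])
```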

Creating the collector Lambda function

Let's create a Lambda function called Collector, whose job will be to poll the airline_tickets stream and, when new records are found there, insert them into the DynamoDB table. Obviously, in addition to the default rights, this Lambda must have read access to the Kinesis data stream and write access to DynamoDB.

Creating an IAM role for the collector Lambda function
First, let's create a new IAM role for the Lambda named Lambda-TicketsProcessingRole:

For the test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable, as shown in the picture below:

This Lambda must be launched by a trigger from Kinesis when new records arrive in the airline_tickets stream, so we need to add a new trigger:

All that remains is to insert the code and save the Lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
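
To try the decoding step without AWS, a fake event can be built locally. The sketch below assumes the event shape that get_json_data relies on (a Records list whose items carry base64-encoded data under kinesis.data); make_kinesis_event and decode_records are hypothetical helpers, and the payload values are made up.

```python
import base64
import json


def make_kinesis_event(payloads):
    """Build a minimal fake event with base64-encoded JSON payloads."""
    return {
        "Records": [
            {"kinesis": {"data": base64.b64encode(
                json.dumps(payload).encode("utf-8")).decode("ascii")}}
            for payload in payloads
        ]
    }


def decode_records(event):
    """Decode the payloads the same way TicketsParser.get_json_data does."""
    return [json.loads(base64.b64decode(record["kinesis"]["data"]))
            for record in event["Records"]]


event = make_kinesis_event([{"record_id": "abc-123", "cost": "4500"}])
print(decode_records(event))
```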

Creating the notifier Lambda function

The second Lambda function, which will monitor the second stream (special_stream) and send a notification to SNS, is created in a similar way. Accordingly, this Lambda must have access to read from Kinesis and to send messages to the given SNS topic, which the SNS service will then deliver to all subscribers of this topic (email, SMS, etc.).

Creating an IAM role
First, we create the IAM role Lambda-KinesisAlarm for this Lambda, and then assign this role to the alarm_notifier Lambda being created:

This Lambda should fire on a trigger for new records arriving in special_stream, so you need to configure the trigger in the same way as we did for the Collector Lambda.

To make this Lambda easier to configure, let's introduce a new environment variable, TOPIC_ARN, where we place the ARN (Amazon Resource Name) of the Airlines topic:

And insert the Lambda code; it is not complicated at all:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
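
The handler above always sends the same text. As a possible refinement (not part of the author's code), the message could be composed from the records that triggered the Lambda. The sketch assumes each record in special_stream is a JSON object with the cost and gate columns produced by the SQL query; build_message is a hypothetical helper.

```python
import base64
import json


def build_message(event):
    """Compose a notification text from the Kinesis records in the event."""
    lines = []
    for record in event.get("Records", []):
        ticket = json.loads(base64.b64decode(record["kinesis"]["data"]))
        lines.append(f"{ticket.get('gate')}: {ticket.get('cost')} RUB")
    return "Interesting tickets found:\n" + "\n".join(lines)


# A fake event with one made-up record, for a local check.
sample_event = {"Records": [{"kinesis": {"data": base64.b64encode(
    b'{"cost": 4500.0, "gate": "Aeroflot"}').decode("ascii")}}]}
print(build_message(sample_event))
```

The returned string could then be passed as the Message argument of the publish call.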

It seems that this completes the manual system configuration. All that remains is to test it and make sure we have configured everything correctly.

Deploying from Terraform code

Required preparation

Terraform is a very convenient open-source tool for deploying infrastructure from code. It has its own syntax that is easy to learn, and there are plenty of examples of how and what to deploy. The Atom editor and Visual Studio Code have many handy plugins that make working with Terraform easier.

The distribution can be downloaded from the Terraform website. A detailed analysis of all of Terraform's capabilities is beyond the scope of this article, so we will limit ourselves to the main points.

How to get started

The full project code is in my repository. Clone the repository to your machine. Before starting, you need to make sure that the AWS CLI is installed and configured, because Terraform will look for credentials in the ~/.aws/credentials file.
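
A quick way to confirm that the credentials file is in place before running Terraform is sketched below. It only checks that a profile with both keys exists in an INI-style credentials file; the has_profile helper and the choice of the default profile are assumptions made for this illustration.

```python
import configparser
import os


def has_profile(credentials_path, profile="default"):
    """Return True if the file has the profile with both access keys set."""
    parser = configparser.ConfigParser()
    parser.read(credentials_path)  # a missing file is silently ignored
    return (parser.has_section(profile)
            and parser.has_option(profile, "aws_access_key_id")
            and parser.has_option(profile, "aws_secret_access_key"))


path = os.path.expanduser("~/.aws/credentials")
print("credentials found" if has_profile(path) else "credentials missing")
```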

It is good practice to run the plan command before deploying the whole infrastructure, to see what Terraform is about to create for us in the cloud:

terraform.exe plan

You will be prompted to enter the phone number to send notifications to. It is not necessary to enter it at this stage.

After reviewing the execution plan, we can start creating the resources:

terraform.exe apply

After sending this command, you will again be asked to enter a phone number; type "yes" when asked whether you really want to perform the actions. This will bring up the whole infrastructure, perform all the necessary EC2 configuration, deploy the Lambda functions, and so on.

After all the resources have been successfully created by the Terraform code, you need to go into the details of the Kinesis Analytics application (unfortunately, I did not find a way to do this directly from the code).

Start the application:

After that, you need to explicitly set the in-application stream name by selecting it from the drop-down list:

Now everything is ready to go.

Testing the application

Regardless of how you deployed the system, manually or via Terraform code, it will work the same way.

We log in via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:

sudo ./api_caller.py TOKEN

All you have to do is wait for an SMS to your number:

The SMS arrives on the phone in about 1 minute:

It remains to check whether the records were saved in the DynamoDB database for subsequent, more detailed analysis. The airline_tickets table contains approximately the following data:

Conclusion

In the course of this work, an online data processing system based on Amazon Kinesis was built. We looked at options for using the Kinesis Agent together with Kinesis Data Streams and real-time Kinesis Analytics using SQL commands, as well as the interaction of Amazon Kinesis with other AWS services.

We deployed the above system in two ways: a fairly long manual one and a quick one from Terraform code.

All the project source code is available in my GitHub repository; I suggest you get acquainted with it.

I am happy to discuss the article and look forward to your comments. I hope for constructive criticism.

I wish you success!

Source: www.habr.com
