Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí

Hey Habr!

An maith leat eitleáin ag eitilt? Is breá liom é, ach le linn féin-aonraithe thit mé i ngrá freisin le sonraí a anailísiú ar thicéid aeir ó acmhainn aitheanta amháin - Aviasales.

Sa lá atá inniu déanfaimid anailís ar obair Amazon Kinesis, tógfaimid córas sruthaithe le hanailís fíor-ama, suiteáil bunachar sonraí Amazon DynamoDB NoSQL mar phríomhstóráil sonraí, agus socróimid fógraí SMS le haghaidh ticéid suimiúla.

Tá na sonraí go léir faoin gearrtha! Téigh!

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí

Réamhrá

Mar shampla, ní mór dúinn rochtain ar Aviasales API. Soláthraítear rochtain air saor in aisce agus gan srianta; ní gá duit ach clárú sa rannán “Forbróirí” chun do chomhartha API a fháil chun rochtain a fháil ar na sonraí.

Is é príomhchuspóir an ailt seo ná tuiscint ghinearálta a thabhairt ar úsáid sruthú faisnéise in AWS; cuirimid san áireamh nach bhfuil na sonraí a sheol an API a úsáideadh cothrom le dáta go docht agus go ndéantar iad a tharchur ón taisce, is é sin déanta bunaithe ar chuardaigh ó úsáideoirí na suíomhanna Aviasales.ru agus Jetradar.com le 48 uair an chloig anuas.

Déanfaidh gníomhaire Kinesis, suiteáilte ar an meaisín táirgthe, a fhaightear tríd an API sonraí a pharsáil agus a tharchur go huathoibríoch chuig an sruth atá ag teastáil trí Kinesis Data Analytics. Scríobhfar an leagan amh den sruth seo go díreach chuig an siopa. Ceadóidh an stóráil sonraí amh a imscartar i DynamoDB anailís ticéad níos doimhne trí uirlisí BI, mar AWS Quick Sight.

Déanfaimid breithniú ar dhá rogha chun an bonneagar iomlán a úsáid:

  • Lámhleabhar - trí Chonsól Bainistíochta AWS;
  • Is le haghaidh uathoibritheoirí leisciúla atá bonneagair ó chód Terraform;

Ailtireacht an chórais fhorbartha

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Comhpháirteanna a úsáideadh:

  • Aviasales API — úsáidfear na sonraí a sheolann an API seo ar ais le haghaidh gach oibre ina dhiaidh sin;
  • Cás Táirgeora EC2 — meaisín fíorúil rialta sa néal ar a nginfear an sruth sonraí ionchuir:
    • Gníomhaire Kinesis Is feidhmchlár Java é atá suiteáilte go háitiúil ar an meaisín a sholáthraíonn bealach éasca chun sonraí a bhailiú agus a sheoladh chuig Kinesis (Sruthanna Sonraí Kinesis nó Kinesis Firehose). Déanann an gníomhaire monatóireacht leanúnach ar shraith comhad sna heolairí sonraithe agus cuireann sé sonraí nua chuig Kinesis;
    • Script Glaoiteora API — Script Python a dhéanann iarratais chuig an API agus a chuireann an freagra i bhfillteán a ndéanann Gníomhaire Kinesis monatóireacht air;
  • Sruthanna Sonraí Kinesis — seirbhís sruthaithe sonraí fíor-ama a bhfuil cumais leathanscála aici;
  • Anailísíocht Kinesis is seirbhís gan fhreastalaí é a shimplíonn anailís ar shonraí sruthaithe i bhfíor-am. Déanann Amazon Kinesis Data Analytics acmhainní feidhmchláir a chumrú agus scálaí go huathoibríoch chun aon mhéid sonraí a thagann isteach a láimhseáil;
  • AWS Lambda — seirbhís a ligeann duit cód a rith gan tacaíocht a thabhairt do fhreastalaithe ná iad a shocrú. Déantar gach cumhacht ríomhaireachta a scála go huathoibríoch do gach glao;
  • Amazon DynamoDB - Bunachar sonraí de phéirí agus de dhoiciméid eochairluacha a sholáthraíonn latency níos lú ná 10 milleasoicindí agus iad ag rith ar scála ar bith. Agus DynamoDB in úsáid agat, ní gá duit freastalaithe ar bith a sholáthar, a phaisteáil nó a bhainistiú. Déanann DynamoDB táblaí a scála go huathoibríoch chun méid na n-acmhainní atá ar fáil a choigeartú agus ardfheidhmíocht a chothabháil. Níl aon riarachán córais ag teastáil;
  • SNS Amazon - seirbhís lán-bhainistithe chun teachtaireachtaí a sheoladh ag baint úsáide as an tsamhail foilsitheoir-síntiúsóra (Teach Tábhairne/Fo), lenar féidir leat micreasheirbhísí, córais dáilte agus feidhmchláir gan fhreastalaí a leithlisiú. Is féidir SNS a úsáid chun faisnéis a sheoladh chuig úsáideoirí deiridh trí fhógraí brú soghluaiste, teachtaireachtaí SMS agus ríomhphoist.

Traenáil tosaigh

Chun aithris a dhéanamh ar an sreabhadh sonraí, chinn mé úsáid a bhaint as an bhfaisnéis ticéad aerlíne a chuir an Aviasales API ar ais. IN doiciméadú liosta go leor fairsing de mhodhanna éagsúla, a ligean ar a ghlacadh ar cheann acu - "Féilire Praghsanna Míosúil", a thugann ar ais praghsanna do gach lá den mhí, grúpáilte de réir líon na n-aistrithe. Mura sonraíonn tú an mhí chuardaigh san iarratas, cuirfear faisnéis ar ais don mhí i ndiaidh na míosa reatha.

Mar sin, déanaimis clárú agus faigh ár n-chomhartha.

Tá iarratas samplach thíos:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Oibreoidh an modh thuas chun sonraí a fháil ón API trí chomhartha a shonrú san iarratas, ach is fearr liom an comhartha rochtana a chur tríd an gceanntásc, mar sin úsáidfimid an modh seo sa script api_caller.py.

Sampla freagra:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Taispeánann an freagra API samplach thuas ticéad ó St. Petersburg go Phuk... Ó, cén aisling...
Ós rud é gur as Kazan mé, agus nach bhfuil i Phuket anois ach “brionglóid”, lorgaimis ticéid ó St Petersburg go Kazan.

Glacann sé leis go bhfuil cuntas AWS agat cheana féin. Ba mhaith liom aird ar leith a tharraingt láithreach ar an bhfíric nach bhfuil Kinesis agus fógraí a sheoladh trí SMS san áireamh sa chlár bliantúil. Sraith In Aisce (úsáid saor in aisce). Ach fiú in ainneoin seo, le cúpla dollar san áireamh, is féidir go leor a thógáil ar an gcóras atá beartaithe agus ag imirt leis. Agus, ar ndóigh, ná déan dearmad na hacmhainní go léir a scriosadh tar éis nach bhfuil siad ag teastáil a thuilleadh.

Go fortunately, beidh feidhmeanna DynamoDb agus lambda saor in aisce dúinn má chomhlíonann muid ár dteorainneacha in aisce míosúla. Mar shampla, i gcás DynamoDB: 25 GB de stóras, 25 WCU/RCU agus 100 milliún fiosrúchán. Agus glaonna feidhm lambda milliún in aghaidh na míosa.

Imscaradh córais láimhe

Socrú Sruthanna Sonraí Kinesis

A ligean ar dul chuig an tseirbhís Kinesis Sonraí Sruthanna agus a chruthú dhá shruth nua, shard amháin do gach ceann acu.

Cad is shard ann?
Is é shard an bunaonad aistrithe sonraí de shruth Kinesis Amazon. Soláthraíonn teascán amháin aistriú sonraí ionchuir ar luas 1 MB/s agus aistriú sonraí aschuir ar luas 2 MB/s. Tacaíonn teascán amháin suas le 1000 iontráil PUT in aghaidh an tsoicind. Agus sruth sonraí á chruthú, ní mór duit an líon riachtanach deighleoga a shonrú. Mar shampla, is féidir leat sruth sonraí a chruthú le dhá mhír. Soláthróidh an sruth sonraí seo aistriú sonraí ionchuir ag 2 MB/s agus aistriú sonraí aschuir ag 4 MB/s, ag tacú le suas le 2000 taifead PUT in aghaidh an tsoicind.

Dá mhéad smior i do shruth is ea is mó a thréchur. I bprionsabal, is é seo an chaoi a ndéantar sreafaí a scála - trí shards a chur leis. Ach dá mhéad shards atá agat, is airde an praghas. Cosnaíonn gach shard 1,5 cent in aghaidh na huaire agus 1.4 cent breise in aghaidh gach milliún aonad pálasta PUT.

Cruthaimis sruth nua leis an ainm ticéid_aerlíne, Is leor 1 shard dó:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Anois, déanaimis snáithe eile a chruthú leis an ainm speisialta_sruth:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí

Socrú táirgeora

Chun tasc a anailísiú, is leor sampla EC2 rialta a úsáid mar tháirgeoir sonraí. Ní gá gur meaisín fíorúil cumhachtach, costasach é; déanfaidh spot t2.micro go breá.

Nóta tábhachtach: mar shampla, ba cheart duit íomhá a úsáid - Amazon Linux AMI 2018.03.0, tá níos lú socruithe aige chun an Gníomhaire Kinesis a sheoladh go tapa.

Téigh go dtí an tseirbhís EC2, cruthaigh meaisín fíorúil nua, roghnaigh an AMI atá ag teastáil le cineál t2.micro, atá san áireamh sa tSraith In Aisce:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Ionas go mbeidh an meaisín fíorúil nuachruthaithe in ann idirghníomhú leis an tseirbhís Kinesis, ní mór cearta a thabhairt dó déanamh amhlaidh. Is é an bealach is fearr chun é seo a dhéanamh ná Ról IAM a shannadh. Dá bhrí sin, ar an scáileán Céim 3: Cumraigh Sonraí Cásanna, ba cheart duit a roghnú Cruthaigh Ról IAM nua:

Ról IAM a chruthú do EC2
Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Sa fhuinneog a osclaítear, roghnaigh go bhfuil ról nua á chruthú againn do EC2 agus téigh go dtí an rannán Ceadanna:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Ag baint úsáide as an sampla oiliúna, ní gá dúinn dul i ngleic leis na castaí cumraíochta gráinneach cearta acmhainne, mar sin roghnóimid na beartais atá réamh-chumraithe ag Amazon: AmazonKinesisFullAccess agus CloudWatchFullAccess.

Tabhair dúinn roinnt ainm brí don ról seo, mar shampla: EC2-KinesisStreams-FullAccess. Ba cheart go mbeadh an toradh mar a thaispeántar sa phictiúr thíos:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Tar éis duit an ról nua seo a chruthú, ná déan dearmad é a cheangal leis an sampla meaisín fíorúil cruthaithe:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Ní athraíonn muid aon rud eile ar an scáileán seo agus bogadh ar aghaidh go dtí an chéad fhuinneog eile.

Is féidir na socruithe tiomántán crua a fhágáil mar réamhshocrú, chomh maith leis na clibeanna (cé gur dea-chleachtas é clibeanna a úsáid, ar a laghad ainm a thabhairt don ásc agus cuir an timpeallacht in iúl).

Anois táimid ar an gcluaisín Céim 6: Cumraigh Grúpa Slándála, nuair is gá duit ceann nua a chruthú nó do ghrúpa Slándála atá ann cheana féin a shonrú, a ligeann duit nascadh trí ssh (port 22) leis an gcás. Roghnaigh Foinse -> Mo IP ann agus is féidir leat an sampla a sheoladh.

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Chomh luath agus a athraíonn sé go stádas reatha, is féidir leat iarracht a dhéanamh ceangal leis trí ssh.

Chun a bheith in ann oibriú le Gníomhaire Kinesis, tar éis duit an meaisín a nascadh go rathúil, ní mór duit na horduithe seo a leanas a chur isteach sa chríochfort:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Cruthaimis fillteán chun freagraí API a shábháil:

sudo mkdir /var/log/airline_tickets

Sula dtosaíonn tú ar an ngníomhaire, ní mór duit a chumrú a chumrú:

sudo vim /etc/aws-kinesis/agent.json

Ba cheart go mbeadh cuma mar seo ar inneachar an chomhaid agent.json:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Mar is léir ón gcomhad cumraíochta, déanfaidh an gníomhaire monatóireacht ar chomhaid leis an síneadh .log san eolaire /var/log/airline_tickets/, parsáil iad agus aistreoidh sé iad chuig an sruth aerlíne_tickets.

Déanaimid an tseirbhís a atosú agus déanaimid cinnte de go bhfuil sí ar bun agus ag feidhmiú:

sudo service aws-kinesis-agent restart

Anois déanaimis an script Python a íoslódáil a iarrfaidh sonraí ón API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Iarrann an script api_caller.py sonraí ó Aviasales agus sábhálann sé an freagra faighte san eolaire a scanann an gníomhaire Kinesis. Tá cur i bhfeidhm an script seo sách caighdeánach, tá rang TicketsApi ann, ligeann sé duit an API a tharraingt go neamhshioncronach. Tugaimid ceanntásc le comhartha agus iarraimid paraiméadair chuig an rang seo:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Chun socruithe cearta agus feidhmiúlacht an ghníomhaire a thástáil, déanaimis an script api_caller.py a thástáil:

sudo ./api_caller.py TOKEN

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Agus féachaimid ar thoradh na hoibre sna logaí Gníomhaire agus ar an táb Monatóireachta sa sruth sonraí airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Mar a fheiceann tú, oibríonn gach rud agus cuireann an Gníomhaire Kinesis sonraí chuig an sruth go rathúil. Anois déanaimis an tomhaltóir a chumrú.

Anailísíocht Sonraí Kinesis a bhunú

Bogfaimid ar aghaidh go dtí an chomhpháirt lárnach den chóras iomlán - cruthaigh feidhmchlár nua in Kinesis Data Analytics darb ainm kinesis_analytics_airlines_app:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Ligeann Kinesis Data Analytics duit anailísíocht sonraí fíor-ama a dhéanamh ó Kinesis Streams ag baint úsáide as an teanga SQL. Is seirbhís lánscálaithe í (murab ionann agus Kinesis Streams):

  1. ligeann sé duit sruthanna nua (Sruth Aschuir) a chruthú bunaithe ar iarratais chun sonraí a aimsiú;
  2. soláthraíonn sé sruth le hearráidí a tharla agus feidhmchláir ag rith (Sruth Earráide);
  3. is féidir leis an scéim sonraí ionchuir a chinneadh go huathoibríoch (is féidir é a ath-shainmhíniú de láimh más gá).

Ní seirbhís saor é seo - 0.11 USD in aghaidh na huaire oibre, mar sin ba chóir duit é a úsáid go cúramach agus é a scriosadh nuair a bhíonn tú críochnaithe.

Déanaimis an feidhmchlár a nascadh leis an bhfoinse sonraí:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Roghnaigh an sruth a bhfuilimid chun nascadh leis (airline_tickets):

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Ansin, ní mór duit Ról IAM nua a cheangal ionas gur féidir leis an bhfeidhmchlár léamh ón sruth agus scríobh chuig an sruth. Chun seo a dhéanamh, is leor gan aon rud a athrú sa bhloc ceadanna Rochtana:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Anois iarraimid go bhfuarthas amach an scéimre sonraí sa sruth; chun é seo a dhéanamh, cliceáil ar an gcnaipe “Aimsigh scéimre”. Mar thoradh air sin, déanfar ról IAM a nuashonrú (cruthú) agus seolfar braite scéimre ó na sonraí a tháinig isteach sa sruth cheana féin:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Anois ní mór duit dul chuig an eagarthóir SQL. Nuair a chliceálann tú ar an gcnaipe seo, feicfear fuinneog ag iarraidh ort an feidhmchlár a sheoladh - roghnaigh cad ba mhaith leat a sheoladh:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Cuir an cheist shimplí seo a leanas isteach i bhfuinneog an eagarthóra SQL agus cliceáil Sábháil agus Rith SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

I mbunachair shonraí choibhneasta, oibríonn tú le táblaí ag baint úsáide as ráitis INSERT chun taifid a chur leis agus ráiteas SELECT chun sonraí a fhiosrú. In Amazon Kinesis Data Analytics, oibríonn tú le sruthanna (STREAMs) agus caidéil (PUMPanna) - iarratais a chur isteach go leanúnach a chuireann sonraí ó shruth amháin in fheidhmchlár isteach i sruth eile.

Déanann an cheist SQL a chuirtear i láthair thuas cuardach ar thicéid Aeroflot ar chostas faoi bhun cúig mhíle rúbal. Cuirfear gach taifead a chomhlíonann na coinníollacha seo sa sruth DESTINATION_SQL_STREAM.

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Sa bhloc Ceann Scríbe, roghnaigh an sruth_speisialta_srutha, agus i liosta anuas an tsrutha In-fheidhmchláir DESTINATION_SQL_STREAM:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Ba cheart go mbeadh toradh na n-ionramhálacha go léir rud éigin cosúil leis an bpictiúr thíos:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí

Ag cruthú agus ag liostáil le topaic SNS

Téigh go dtí an tSeirbhís Fógraí Simplí agus cruthaigh ábhar nua ann leis an ainm Aerlínte:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Liostáil leis an ábhar seo agus cuir in iúl an uimhir fón póca a seolfar fógraí SMS chuici:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí

Cruthaigh tábla i DynamoDB

Chun na sonraí amh a stóráil óna sruth aerlíne_tickets, cruthaímid tábla i DynamoDB leis an ainm céanna. Úsáidfimid record_id mar phríomheochair:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí

Bailitheoir feidhm lambda a chruthú

Cruthaímid feidhm lambda ar a dtugtar Collector, a bheidh mar chúram air an sruth aerlíne_tickets a vótaíocht agus, má aimsítear taifid nua ann, cuir na taifid seo isteach sa tábla DynamoDB. Ar ndóigh, chomh maith leis na cearta réamhshocraithe, ní mór go mbeadh rochtain léite ag an lambda seo ar shruth sonraí Kinesis agus rochtain a scríobh ar DynamoDB.

Ról IAM a chruthú d’fheidhm lambda an bhailitheora
Ar dtús, cruthaimis ról IAM nua don lambda darb ainm Lambda-TicketsProcessingRole:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Mar shampla tástála, tá na beartais réamh-chumraithe AmazonKinesisReadOnlyAccess agus AmazonDynamoDBFullAccess oiriúnach go leor, mar a thaispeántar sa phictiúr thíos:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí

Ba cheart an lambda seo a sheoladh le truicear ó Kinesis nuair a théann iontrálacha nua isteach san aerlíne_sruth, mar sin ní mór dúinn truicear nua a chur leis:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Níl fágtha ach an cód a chur isteach agus an lambda a shábháil.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Ag cruthú fógra feidhm lambda

Cruthaítear an dara feidhm lambda, a dhéanfaidh monatóireacht ar an dara sruth (special_stream) agus a sheolfaidh fógra chuig SNS, ar bhealach comhchosúil. Mar sin, ní mór go mbeadh rochtain ag an lambda seo ar léamh ó Kinesis agus teachtaireachtaí a sheoladh chuig ábhar SNS ar leith, a sheolfaidh an tseirbhís SNS chuig gach síntiúsóir den ábhar seo (ríomhphost, SMS, etc.).

Ról IAM a chruthú
Ar dtús, cruthaímid ról IAM Lambda-KinesisAlarm don lambda seo, agus ansin sannaimid an ról seo don lambda alarm_notifier atá á chruthú:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí

Ba cheart go n-oibreodh an lambda seo ar thruicear le haghaidh taifid nua chun dul isteach sa special_stream, mar sin ní mór duit an truicear a chumrú ar an mbealach céanna agus a rinneamar don Bhailitheoir lambda.

Chun é a dhéanamh níos éasca an lambda seo a chumrú, tugaimid athróg timpeallachta nua isteach - TOPIC_ARN, áit a gcuirfimid ANR (Ainmneacha Athchúrsacha Amazon) den topaic Aerlínte:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Agus cuir isteach an cód lambda, níl sé casta ar chor ar bith:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Dealraíonn sé gurb é seo an áit a bhfuil cumraíocht an chórais láimhe críochnaithe. Níl fágtha ach a thástáil agus a chinntiú go bhfuil gach rud cumraithe i gceart againn.

Imscaradh ó chód Terraform

Ullmhúchán riachtanach

Terraform is uirlis foinse oscailte an-áisiúil é chun bonneagar ó chód a imscaradh. Tá a chomhréir féin aige atá éasca le foghlaim agus tá go leor samplaí ann de conas agus cad atá le himscaradh. Tá go leor breiseán áisiúil ag eagarthóir Atom nó Visual Studio Code a éascaíonn oibriú le Terraform.

Is féidir leat an dáileadh a íoslódáil dá bhrí sin. Tá anailís mhionsonraithe ar gach cumas Terraform lasmuigh de raon feidhme an ailt seo, agus mar sin cuirfimid teorainn ar na príomhphointí.

Conas tosú

Is é cód iomlán an tionscadail i mo stór. Clónaimid an stór dúinn féin. Sula dtosaíonn tú, ní mór duit a chinntiú go bhfuil AWS CLI suiteáilte agus cumraithe agat, mar gheall ar ... Lorgóidh Terraform dintiúir sa chomhad ~/.aws/credentials.

Is dea-chleachtas é ordú an phlean a rith sula n-imscarfar an bonneagar iomlán chun a fheiceáil cad atá Terraform ag cruthú dúinn sa scamall faoi láthair:

terraform.exe plan

Tabharfar leid duit uimhir theileafóin a chur isteach chun fógraí a sheoladh chuici. Ní gá dul isteach ann ag an gcéim seo.

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Tar éis dúinn plean oibríochta an chláir a anailísiú, is féidir linn tosú ar acmhainní a chruthú:

terraform.exe apply

Tar éis duit an t-ordú seo a sheoladh, iarrfar ort arís uimhir theileafóin a chur isteach; diailigh “tá” nuair a thaispeánfar ceist faoi na gníomhartha a dhéanamh i ndáiríre. Tabharfaidh sé seo deis duit an bonneagar iomlán a bhunú, gach cumraíocht riachtanach de EC2 a dhéanamh, feidhmeanna lambda a imscaradh, etc.

Tar éis na hacmhainní go léir a chruthú go rathúil tríd an gcód Terraform, ní mór duit dul isteach ar shonraí an iarratais Kinesis Analytics (ar an drochuair, níor aimsigh mé conas é seo a dhéanamh go díreach ón gcód).

Seol an feidhmchlár:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Tar éis seo, ní mór duit ainm an tsrutha in-fheidhmchláir a shocrú go sainráite trí roghnú ón liosta anuas:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Anois tá gach rud réidh le dul.

An t-iarratas a thástáil

Is cuma conas a d'imscar tú an córas, de láimh nó trí chód Terraform, oibreoidh sé mar an gcéanna.

Logálaimid isteach trí SSH chuig an meaisín fíorúil EC2 áit a bhfuil Kinesis Agent suiteáilte agus ritheann muid an script api_caller.py

sudo ./api_caller.py TOKEN

Níl le déanamh agat ach fanacht le SMS chuig d’uimhir:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
SMS - sroicheann an teachtaireacht ar an bhfón i mbeagnach 1 nóiméad:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí
Tá sé fós le feiceáil an ndearnadh na taifid a shábháil i mbunachar sonraí DynamoDB le haghaidh anailíse níos mionsonraithe ina dhiaidh sin. Tá thart ar na sonraí seo a leanas sa tábla aerlíne_ticéid:

Comhtháthú API Aviasales le Amazon Kinesis agus simplíocht gan fhreastalaí

Conclúid

Le linn na hoibre a rinneadh, tógadh córas próiseála sonraí ar líne bunaithe ar Amazon Kinesis. Breithníodh roghanna chun an Gníomhaire Kinesis a úsáid i gcomhar le Sruthanna Sonraí Kinesis agus anailísíocht fíor-ama Kinesis Analytics ag baint úsáide as orduithe SQL, chomh maith le hidirghníomhaíocht Amazon Kinesis le seirbhísí AWS eile.

Rinneamar an córas thuas a imscaradh ar dhá bhealach: ceann láimhe sách fada agus ceann tapa ón gcód Terraform.

Tá cód foinse an tionscadail ar fáil i mo stór GitHub, Molaim duit eolas a chur air.

Táim sásta an t-alt a phlé, táim ag tnúth le do chuid tuairimí. Tá súil agam le cáineadh cuiditheach.

Is mian liom rath ort!

Foinse: will.com

Add a comment