Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche

Hi Habr!

An toil leat itealain itealaich? Tha gaol agam air, ach fhad ‘s a bha mi fèin-aonaranach thuit mi ann an gaol le bhith a’ dèanamh anailis air dàta air tiogaidean adhair bho aon ghoireas ainmeil - Aviasales.

An-diugh nì sinn sgrùdadh air obair Amazon Kinesis, togaidh sinn siostam sruthadh le mion-sgrùdaidhean fìor-ùine, stàlaichidh sinn stòr-dàta Amazon DynamoDB NoSQL mar phrìomh stòradh dàta, agus cuiridh sinn fiosan SMS air dòigh airson tiogaidean inntinneach.

Tha a h-uile mion-fhiosrachadh fon ghearradh! Rach!

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche

Ro-ràdh

Mar eisimpleir, feumaidh sinn ruigsinneachd Aviasales API. Tha ruigsinneachd air a thoirt seachad an-asgaidh agus gun chuingealachaidhean; cha leig thu leas ach clàradh anns an roinn “Leasaichean” gus an comharra API agad fhaighinn airson faighinn chun dàta.

Is e prìomh adhbhar an artaigil seo tuigse choitcheann a thoirt seachad air cleachdadh sruthadh fiosrachaidh ann an AWS; bidh sinn a’ toirt aire nach eil an dàta a thill an API a chaidh a chleachdadh gu tur ùraichte agus gu bheil e air a ghluasad bhon tasgadan, a tha air a chruthachadh stèidhichte air rannsachaidhean le luchd-cleachdaidh làraich Aviasales.ru agus Jetradar.com airson na 48 uairean mu dheireadh.

Bidh àidseant Kinesis, air a chuir a-steach air an inneal toraidh, a gheibhear tron ​​​​API gu fèin-ghluasadach a’ parsadh agus a ’sgaoileadh dàta chun t-sruth a tha thu ag iarraidh tro Kinesis Data Analytics. Thèid an dreach amh den t-sruth seo a sgrìobhadh gu dìreach chun stòr. Leigidh an stòradh dàta amh a thèid a chleachdadh ann an DynamoDB airson mion-sgrùdadh tiogaidean nas doimhne tro innealan BI, leithid AWS Quick Sight.

Beachdaichidh sinn air dà roghainn airson a’ bhun-structair gu lèir a chleachdadh:

  • Leabhar-làimhe - tro AWS Management Console;
  • Tha bun-structar bho chòd Terraform airson fèin-ghluasadan leisg;

Ailtireachd an t-siostam leasaichte

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Co-phàirtean air an cleachdadh:

  • Aviasales API - thèid an dàta a thilleas an API seo a chleachdadh airson gach obair às deidh sin;
  • Cùis Riochdaire EC2 - inneal brìgheil cunbhalach san sgòth air an tèid an sruth dàta cuir a-steach a chruthachadh:
    • Neach-ionaid Kinesis na aplacaid Java air a chuir a-steach gu h-ionadail air an inneal a bheir seachad dòigh furasta air dàta a chruinneachadh agus a chuir gu Kinesis (Kinesis Data Streams no Kinesis Firehose). Bidh an neach-ionaid an-còmhnaidh a’ cumail sùil air seata de fhaidhlichean anns na clàran ainmichte agus a’ cur dàta ùr gu Kinesis;
    • Sgriob neach-gairm API - Sgriobt Python a nì iarrtasan don API agus a chuireas am freagairt ann am pasgan a tha air a sgrùdadh leis an Kinesis Agent;
  • Sruth dàta Kinesis - seirbheis sruthadh dàta fìor-ùine le comasan sgèileadh farsaing;
  • Mion-sgrùdadh Kinesis na sheirbheis gun fhrithealaiche a bhios a’ sìmpleachadh mion-sgrùdadh air dàta sruthadh ann an àm fìor. Bidh Amazon Kinesis Data Analytics a’ rèiteachadh goireasan tagraidh agus a’ sgèileadh gu fèin-ghluasadach gus tomhas de dhàta a thig a-steach a làimhseachadh;
  • AWS Lambda - seirbheis a leigeas leat còd a ruith gun a bhith a’ cumail suas no a’ stèidheachadh frithealaichean. Tha a h-uile cumhachd coimpiutaireachd air a sgèileadh gu fèin-ghluasadach airson gach gairm;
  • Amazon DynamoDB - Stòr-dàta de chàraidean agus sgrìobhainnean prìomh luach a bheir seachad latency nas lugha na 10 milliseconds nuair a bhios iad a’ ruith aig sgèile sam bith. Nuair a bhios tu a’ cleachdadh DynamoDB, cha leig thu leas frithealaichean sam bith a sholarachadh, a phasgadh no a riaghladh. Bidh DynamoDB gu fèin-ghluasadach a’ sgèileadh chlàran gus na tha de ghoireasan ri fhaighinn atharrachadh agus àrd-choileanadh a chumail suas. Chan eil feum air rianachd siostam;
  • SNS Amazon - seirbheis làn-riaghlaidh airson teachdaireachdan a chuir a’ cleachdadh a’ mhodail foillsichear-fo-sgrìobhaiche (Taigh-seinnse / Fo), leis an urrainn dhut microservices, siostaman sgaoilte agus tagraidhean gun fhrithealaiche a sgaradh. Faodar SNS a chleachdadh gus fiosrachadh a chuir gu luchd-cleachdaidh deireannach tro fhiosan putaidh gluasadach, teachdaireachdan SMS agus puist-d.

Trèanadh tòiseachaidh

Gus an sruth dàta aithris, chuir mi romham am fiosrachadh tiogaid companaidh-adhair a thill an Aviasales API a chleachdadh. ANNS sgrìobhainnean liosta gu math farsaing de dhòighean eadar-dhealaichte, bheir sinn aon dhiubh - “Mìosachan Prìs Mìosail”, a thilleas prìsean airson gach latha den mhìos, air an cruinneachadh leis an àireamh de ghluasadan. Mura sònraich thu a’ mhìos sgrùdaidh san iarrtas, thèid fiosrachadh a thilleadh airson na mìos às deidh an tè làithreach.

Mar sin, leig leinn clàradh agus faigh an comharra againn.

Tha eisimpleir iarrtas gu h-ìosal:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Obraichidh an dòigh gu h-àrd airson dàta fhaighinn bhon API le bhith a’ sònrachadh tòcan san iarrtas, ach is fheàrr leam an comharra ruigsinneachd a thoirt seachad tron ​​​​cheann-cinn, agus mar sin cleachdaidh sinn an dòigh seo anns an sgriobt api_caller.py.

Freagair eisimpleir:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Tha an eisimpleir freagairt API gu h-àrd a’ sealltainn tiogaid bho St. Petersburg gu Phuk... Oh, dè an aisling...
Leis gur ann à Kazan a tha mi, agus Phuket a-nis “dìreach mar aisling”, leig dhuinn sùil a thoirt airson tiogaidean bho St. Petersburg gu Kazan.

Tha e a’ gabhail ris gu bheil cunntas AWS agad mu thràth. Bu mhath leam aire shònraichte a tharraing sa bhad nach eil Kinesis agus a’ cur fiosan tro SMS air an toirt a-steach don bhliadhna Ìre an-asgaidh (cleachdadh an-asgaidh). Ach eadhon a dh'aindeoin seo, le dà dholair san amharc, tha e gu math comasach an siostam a thathar a 'moladh a thogail agus cluich leis. Agus, gu dearbh, na dìochuimhnich a h-uile goireas a dhubhadh às às deidh nach eil feum orra tuilleadh.

Gu fortanach, bidh gnìomhan DynamoDb agus lambda an-asgaidh dhuinn ma choinnicheas sinn ris na crìochan an-asgaidh mìosail againn. Mar eisimpleir, airson DynamoDB: 25 GB de stòradh, 25 WCU / RCU agus 100 millean ceist. Agus millean gairm lambda gach mìos.

Cleachdadh siostam làimhe

A’ stèidheachadh sruthan dàta Kinesis

Rachamaid gu seirbheis Kinesis Data Streams agus cruthaich sinn dà shruth ùr, aon shard airson gach fear.

Dè a th' ann an sgadan?
Is e shard an aonad gluasad dàta bunaiteach de shruth Amazon Kinesis. Bidh aon roinn a’ toirt seachad gluasad dàta cuir a-steach aig astar 1 MB / s agus gluasad dàta toraidh aig astar 2 MB / s. Bidh aon roinn a’ toirt taic do suas ri 1000 inntrigeadh PUT gach diog. Nuair a chruthaicheas tu sruth dàta, feumaidh tu an àireamh riatanach de earrannan a shònrachadh. Mar eisimpleir, faodaidh tu sruth dàta a chruthachadh le dà earrann. Bheir an sruth dàta seo seachad gluasad dàta cuir a-steach aig 2 MB / s agus gluasad dàta toraidh aig 4 MB / s, a’ toirt taic do suas ri clàran 2000 PUT gach diog.

Mar as motha de shards anns an t-sruth agad, is ann as motha a bhios e a’ dol a-steach. Ann am prionnsabal, seo mar a tha sruthan air an sgèile - le bhith a 'cur shards ris. Ach mar as motha de shards a th’ agad, is ann as àirde a’ phrìs. Bidh gach shard a’ cosg 1,5 sgillin san uair agus 1.4 sgillin a bharrachd airson gach millean aonad pàighidh pàighidh PUT.

Nach cruthaich sinn sruth ùr leis an ainm companaidh-adhair_tiogaid, Bidh 1 shard gu leoir dha :

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
A-nis cruthaichidh sinn snàithlean eile leis an ainm sruth_sònraichte:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche

Suidheachadh riochdaire

Gus mion-sgrùdadh a dhèanamh air gnìomh, tha e gu leòr eisimpleir àbhaisteach EC2 a chleachdadh mar riochdaire dàta. Chan fheum e a bhith na inneal brìgheil cumhachdach, daor; nì spot t2.micro gu math.

Nòta cudromach: mar eisimpleir, bu chòir dhut ìomhaigh a chleachdadh - Amazon Linux AMI 2018.03.0, tha nas lugha de shuidheachaidhean aige airson an Kinesis Agent a chuir air bhog gu sgiobalta.

Rach gu seirbheis EC2, cruthaich inneal brìgheil ùr, tagh an AMI a tha thu ag iarraidh le seòrsa t2.micro, a tha air a ghabhail a-steach san t-sreath an-asgaidh:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Gus am bi an inneal brìgheil ùr-chruthaichte comasach air eadar-obrachadh leis an t-seirbheis Kinesis, feumar còraichean a thoirt dha sin a dhèanamh. Is e an dòigh as fheàrr air seo a dhèanamh dreuchd IAM a shònrachadh. Mar sin, air an sgrìn Ceum 3: Dèan rèiteachadh air Instance Details, bu chòir dhut taghadh Cruthaich dreuchd IAM ùr:

A’ cruthachadh dreuchd IAM airson EC2
Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Anns an uinneag a tha a’ fosgladh, tagh gu bheil sinn a’ cruthachadh dreuchd ùr airson EC2 agus rach chun roinn Ceadan:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
A’ cleachdadh an eisimpleir trèanaidh, cha leig sinn a leas a dhol a-steach don a h-uile iom-fhillteachd de rèiteachadh granular de chòraichean stòrais, agus mar sin taghaidh sinn na poileasaidhean a chaidh a dhealbhadh ro-làimh le Amazon: AmazonKinesisFullAccess agus CloudWatchFullAccess.

Bheir sinn beagan ainm brìoghmhor airson na dreuchd seo, mar eisimpleir: EC2-KinesisStreams-FullAccess. Bu chòir an toradh a bhith mar a chithear san dealbh gu h-ìosal:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Às deidh dhut an dreuchd ùr seo a chruthachadh, na dìochuimhnich a cheangal ris an eisimpleir inneal brìgheil a chaidh a chruthachadh:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Cha bhith sinn ag atharrachadh dad sam bith eile air an sgrion seo agus a’ gluasad air adhart gu na h-ath uinneagan.

Faodar na roghainnean cruaidh-chruaidh fhàgail mar an àbhaist, a bharrachd air na tagaichean (ged a tha e na chleachdadh math tagaichean a chleachdadh, co-dhiù thoir ainm don eisimpleir agus comharraich an àrainneachd).

A-nis tha sinn air an Step 6: Configure Security Group tab, far am feum thu fear ùr a chruthachadh no am buidheann Tèarainteachd a th’ agad mu thràth a shònrachadh, a leigeas leat ceangal tro ssh (port 22) ris an eisimpleir. Tagh Stòr -> My IP an sin agus faodaidh tu an eisimpleir a chuir air bhog.

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Cho luath ‘s a thionndaidheas e gu inbhe ruith, faodaidh tu feuchainn ri ceangal ris tro ssh.

Gus a bhith comasach air obrachadh le Kinesis Agent, às deidh dhut ceangal gu soirbheachail ris an inneal, feumaidh tu na h-òrdughan a leanas a chuir a-steach don cheann-uidhe:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Cruthaichidh sinn pasgan gus freagairtean API a shàbhaladh:

sudo mkdir /var/log/airline_tickets

Mus tòisich thu air an àidseant, feumaidh tu a rèiteachadh a rèiteachadh:

sudo vim /etc/aws-kinesis/agent.json

Bu chòir gum biodh susbaint an fhaidhle agent.json a’ coimhead mar seo:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Mar a chithear bhon fhaidhle rèiteachaidh, bidh an neach-ionaid a’ cumail sùil air faidhlichean leis an leudachadh .log anns an /var/log/airline_tickets/ directory, gan parsadh agus gan gluasad gu sruth airline_tickets.

Bidh sinn ag ath-thòiseachadh an t-seirbheis agus a’ dèanamh cinnteach gu bheil i ag obair:

sudo service aws-kinesis-agent restart

A-nis leig dhuinn an sgriobt Python a luchdachadh sìos a dh ’iarras dàta bhon API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Bidh an sgriobt api_caller.py ag iarraidh dàta bho Aviasales agus a’ sàbhaladh an fhreagairt a fhuaireadh anns an eòlaire a bhios an riochdaire Kinesis a’ sganadh. Tha buileachadh an sgriobt seo gu math àbhaisteach, tha clas TicketsApi ann, leigidh e leat an API a tharraing gu neo-chinnteach. Bidh sinn a’ dol seachad air bann-cinn le tòcan agus ag iarraidh paramadairean don chlas seo:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Gus deuchainn a dhèanamh air na roghainnean ceart agus gnìomhachd an neach-ionaid, leig dhuinn deuchainn a dhèanamh air an sgriobt api_caller.py:

sudo ./api_caller.py TOKEN

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Agus bidh sinn a’ coimhead air toradh na h-obrach anns na logaichean Agent agus air an taba Sgrùdaidh anns an t-sruth dàta airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Mar a chì thu, bidh a h-uile càil ag obair agus bidh an Kinesis Agent gu soirbheachail a’ cur dàta chun t-sruth. A-nis leig leinn neach-cleachdaidh a rèiteachadh.

Stèidhich Kinesis Data Analytics

Gluaisidh sinn air adhart gu prìomh phàirt an t-siostaim gu lèir - cruthaich tagradh ùr ann an Kinesis Data Analytics ainmichte kinesis_analytics_airlines_app:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Leigidh Kinesis Data Analytics leat mion-sgrùdadh dàta fìor-ùine a dhèanamh bho Kinesis Streams a’ cleachdadh cànan SQL. Is e seirbheis làn fèin-sgèile a th’ ann (eu-coltach ri Kinesis Streams) a tha:

  1. a’ leigeil leat sruthan ùra a chruthachadh (Sruth Toraidh) stèidhichte air iarrtasan airson dàta a lorg;
  2. a’ toirt seachad sruth le mearachdan a thachair fhad ‘s a bha tagraidhean a’ ruith (Sruth Mearachd);
  3. comasach air an sgeama dàta cuir a-steach a dhearbhadh gu fèin-ghluasadach (faodar ath-mhìneachadh le làimh ma tha sin riatanach).

Chan e seirbheis saor a tha seo - 0.11 USD gach uair a thìde de dh'obair, agus mar sin bu chòir dhut a chleachdadh gu faiceallach agus a sguabadh às nuair a bhios tu deiseil.

Nach ceangail sinn an tagradh ris an stòr dàta:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Tagh an t-sruth a tha sinn a’ dol a cheangal ris (airline_tickets):

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
An ath rud, feumaidh tu Dreuchd IAM ùr a cheangal gus an urrainn don tagradh leughadh bhon t-sruth agus sgrìobhadh chun t-sruth. Gus seo a dhèanamh, tha e gu leòr gun dad atharrachadh sa bhloc cead ruigsinneachd:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
A-nis iarramaid gun lorg sinn an sgeama dàta san t-sruth; gus seo a dhèanamh, cliog air a ’phutan“ Faigh a-mach schema ”. Mar thoradh air an sin, thèid dreuchd IAM ùrachadh (thèid fear ùr a chruthachadh) agus thèid lorg sgeamaichean a chuir air bhog bhon dàta a tha air ruighinn san t-sruth mar-thà:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
A-nis feumaidh tu a dhol gu deasaiche SQL. Nuair a phutas tu air a’ phutan seo, nochdaidh uinneag ag iarraidh ort an tagradh a chuir air bhog - tagh na tha thu airson a chuir air bhog:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Cuir a-steach a’ cheist shìmplidh a leanas ann an uinneag deasaiche SQL agus cliog air Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Ann an stòran-dàta co-cheangailte, bidh thu ag obair le clàran a’ cleachdadh aithrisean INSERT gus clàran a chuir ris agus aithris SELECT gus dàta ceasnachadh. Ann an Amazon Kinesis Data Analytics, bidh thu ag obair le sruthan (STREAMs) agus pumpaichean (PUMPs) - cuir a-steach iarrtasan leantainneach a chuireas a-steach dàta bho aon sruth ann an tagradh gu sruth eile.

Bidh a’ cheist SQL a chaidh a thaisbeanadh gu h-àrd a’ lorg tiogaidean Aeroflot aig cosgais nas ìsle na còig mìle rubles. Thèid a h-uile clàr a choinnicheas ris na cumhaichean seo a chur san t-sruth DESTINATION_SQL_STREAM.

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Anns a’ bhloc Ceann-uidhe, tagh an t-sruth_special_stream, agus san liosta a-nuas ainm sruth In-app DESTINATION_SQL_STREAM:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Bu chòir toradh a h-uile làimhseachadh a bhith rudeigin coltach ris an dealbh gu h-ìosal:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche

A’ cruthachadh agus a’ fo-sgrìobhadh do chuspair SNS

Rach don t-Seirbheis Fiosrachaidh Simple agus cruthaich cuspair ùr an sin leis an ainm Airlines:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Clàraich a-steach don chuspair seo agus comharraich an àireamh fòn-làimhe far an tèid fiosan SMS a chuir:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche

Cruthaich clàr ann an DynamoDB

Gus an dàta amh a stòradh bhon t-sruth-adhair_tickets aca, cruthaichidh sinn clàr ann an DynamoDB leis an aon ainm. Cleachdaidh sinn record_id mar am prìomh iuchair:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche

Cruthaich neach-cruinneachaidh gnìomh lambda

Cruthaichidh sinn gnìomh lambda ris an canar Collector, a bhios mar dhleastanas air an t-sruth airline_tickets a sgrùdadh agus, ma lorgar clàran ùra an sin, cuir na clàran sin a-steach don chlàr DynamoDB. Gu dearbh, a bharrachd air na còraichean bunaiteach, feumaidh cothrom a bhith aig an lambda seo air sruth dàta Kinesis a leughadh agus ruigsinneachd a sgrìobhadh gu DynamoDB.

A’ cruthachadh dreuchd IAM airson gnìomh lambda neach-cruinneachaidh
An toiseach, cruthaichidh sinn dreuchd IAM ùr airson an lambda leis an t-ainm Lambda-TicketsProcessingRole:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Airson an eisimpleir deuchainn, tha na poileasaidhean ro-shuidhichte AmazonKinesisReadOnlyAccess agus AmazonDynamoDBFullAccess gu math freagarrach, mar a chithear san dealbh gu h-ìosal:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche

Bu chòir an lambda seo a chuir air bhog le inneal-brosnachaidh bho Kinesis nuair a thig inntrigidhean ùra a-steach don airline_stream, agus mar sin feumaidh sinn inneal-brosnachaidh ùr a chuir ris:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Chan eil air fhàgail ach an còd a chuir a-steach agus an lambda a shàbhaladh.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

A 'cruthachadh fios gnìomh lambda

Tha an dàrna gnìomh lambda, a nì sùil air an dàrna sruth (special_stream) agus a chuireas fios gu SNS, air a chruthachadh san aon dòigh. Mar sin, feumaidh cothrom a bhith aig an lambda seo leughadh bho Kinesis agus teachdaireachdan a chuir gu cuspair SNS sònraichte, a thèid an uairsin leis an t-seirbheis SNS gu luchd-aontachaidh a’ chuspair seo (post-d, SMS, msaa).

A’ cruthachadh dreuchd IAM
An toiseach, bidh sinn a’ cruthachadh dreuchd IAM Lambda-KinesisAlarm airson an lambda seo, agus an uairsin a’ sònrachadh an dreuchd seo don alarm_notifier lambda a thathas a’ cruthachadh:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche

Bu chòir don lambda seo obrachadh air inneal-brosnachaidh airson clàran ùra a dhol a-steach don special_stream, agus mar sin feumaidh tu an inneal-brosnachaidh a rèiteachadh san aon dòigh ’s a rinn sinn airson an neach-cruinneachaidh lambda.

Gus a dhèanamh nas fhasa an lambda seo a rèiteachadh, leig dhuinn caochladair àrainneachd ùr a thoirt a-steach - TOPIC_ARN, far am bi sinn a’ cur ANR (Ainmean Ath-chùrsa Amazon) den chuspair Airlines:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Agus cuir a-steach an còd lambda, chan eil e iom-fhillte idir:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Tha e coltach gur ann an seo a tha rèiteachadh an t-siostaim làimhe air a chrìochnachadh. Chan eil air fhàgail ach deuchainn agus dèanamh cinnteach gu bheil sinn air a h-uile càil a rèiteachadh gu ceart.

Cuir a-steach bho chòd Terraform

Ullachadh a dhìth

Terraform na inneal stòr fosgailte gu math goireasach airson bun-structar a chleachdadh bho chòd. Tha co-chòrdadh aige fhèin a tha furasta ionnsachadh agus tha mòran eisimpleirean ann air ciamar agus dè a bu chòir a chleachdadh. Tha mòran plugins feumail aig deasaiche Atom no Visual Studio Code a nì e nas fhasa obrachadh le Terraform.

Faodaidh tu an sgaoileadh a luchdachadh sìos bho seo. Tha mion-sgrùdadh mionaideach air a h-uile comas Terraform taobh a-muigh farsaingeachd an artaigil seo, agus mar sin bidh sinn gar cuingealachadh fhèin gu na prìomh phuingean.

Mar a thòisicheas tu

Tha an còd iomlan a 'phròiseact anns an ionad-tasgaidh agam. Bidh sinn a’ gleusadh an stòr dhuinn fhìn. Mus tòisich thu, feumaidh tu dèanamh cinnteach gu bheil AWS CLI agad air a chuir a-steach agus air a rèiteachadh, oir ... Coimheadaidh Terraform airson teisteanasan anns an fhaidhle ~/.aws/credentials.

Is e deagh chleachdadh a bhith a’ ruith àithne a’ phlana mus cleachdar am bun-structair gu lèir gus faicinn dè tha Terraform a’ cruthachadh dhuinn san sgòth an-dràsta:

terraform.exe plan

Thèid iarraidh ort àireamh fòn a chuir a-steach gus fiosan a chuir thuige. Chan eil e riatanach a dhol a-steach aig an ìre seo.

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Às deidh dhuinn plana gnìomh a’ phrògraim a mhion-sgrùdadh, is urrainn dhuinn tòiseachadh air goireasan a chruthachadh:

terraform.exe apply

Às deidh dhut an àithne seo a chuir, thèid iarraidh ort a-rithist àireamh fòn a chuir a-steach; dial “tha” nuair a thèid ceist mu bhith a’ coileanadh nan gnìomhan a thaisbeanadh. Leigidh seo leat am bun-structar gu lèir a stèidheachadh, an rèiteachadh riatanach de EC2 a dhèanamh, gnìomhan lambda a chuir an gnìomh, msaa.

Às deidh a h-uile goireas a bhith air a chruthachadh gu soirbheachail tro chòd Terraform, feumaidh tu a dhol a-steach don fhiosrachadh mun iarrtas Kinesis Analytics (gu mì-fhortanach, cha do lorg mi mar a nì thu seo gu dìreach bhon chòd).

Cuir air bhog an tagradh:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Às deidh seo, feumaidh tu ainm an t-srutha in-aplacaid a shuidheachadh gu soilleir le bhith a’ taghadh bhon liosta tuiteam-sìos:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
A-nis tha a h-uile dad deiseil airson a dhol.

A 'dèanamh deuchainn air an tagradh

Ge bith ciamar a chleachd thu an siostam, le làimh no tro chòd Terraform, obraichidh e mar an ceudna.

Bidh sinn a’ logadh a-steach tro SSH chun inneal brìgheil EC2 far a bheil Kinesis Agent air a chuir a-steach agus a’ ruith an sgriobt api_caller.py

sudo ./api_caller.py TOKEN

Chan eil agad ach feitheamh ri SMS chun àireamh agad:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
SMS - thig an teachdaireachd air a’ fòn ann an faisg air 1 mhionaid:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche
Tha e fhathast ri fhaicinn an deach na clàran a shàbhaladh ann an stòr-dàta DynamoDB airson mion-sgrùdadh nas mionaidiche às deidh sin. Tha timcheall air an dàta a leanas anns a’ chlàr airline_tickets:

Amalachadh API Aviasales le Amazon Kinesis agus sìmplidheachd gun fhrithealaiche

co-dhùnadh

Rè na h-obrach a chaidh a dhèanamh, chaidh siostam giollachd dàta air-loidhne a thogail stèidhichte air Amazon Kinesis. Chaidh beachdachadh air roghainnean airson a bhith a’ cleachdadh an Kinesis Agent ann an co-bhonn ri Kinesis Data Streams agus Kinesis Analytics anailitigeach fìor-ùine a’ cleachdadh òrdughan SQL, a bharrachd air eadar-obrachadh Amazon Kinesis le seirbheisean AWS eile.

Chleachd sinn an siostam gu h-àrd ann an dà dhòigh: leabhar-làimhe caran fada agus fear sgiobalta bhon chòd Terraform.

Tha a h-uile còd stòr pròiseict ri fhaighinn anns an stòr GitHub agam, Tha mi a 'moladh dhut eòlas fhaighinn air.

Tha mi toilichte bruidhinn mun artaigil, tha mi a’ coimhead air adhart ri do bheachdan. Tha mi an dòchas càineadh cuideachail.

Tha mi a 'guidhe gach soirbheachas dhut!

Source: www.habr.com

Cuir beachd ann