Aviasales API integration with Amazon Kinesis and serverless simplicity

Hi, Habr!

Do you like flying on planes? I love it, but during self-isolation I also fell in love with analyzing airline ticket data from one well-known resource, Aviasales.

Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, set up the Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.

All the details are under the cut! Let's go!


Introduction

For this example we need access to the Aviasales API. Access is free and without restrictions; you only need to register in the "Developers" section to get an API token for accessing the data.

The main goal of this article is to give a general understanding of using streaming data in AWS. We take into account that the data returned by the API is not strictly up to date: it is served from a cache built from searches made by users of Aviasales.ru and Jetradar.com over the last 48 hours.

The Kinesis Agent installed on the producer machine will automatically parse the data received through the API and pass it to the required stream via Kinesis Data Analytics. The raw version of this stream will be written directly to the store. The raw-data store deployed in DynamoDB will allow deeper ticket analysis with BI tools such as Amazon QuickSight.

We will consider two options for deploying the entire infrastructure:

  • Manually, through the AWS Management Console;
  • From Terraform code, for lazy automators.

Architecture of the developed system

Components used:

  • Aviasales API - the data returned by this API will be used for all subsequent work;
  • EC2 Producer Instance - a regular virtual machine in the cloud on which the input data stream will be generated:
    • Kinesis Agent is a Java application installed locally on the machine that provides an easy way to collect and send data to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script - a Python script that makes requests to the API and puts the response into a folder monitored by the Kinesis Agent;
  • Kinesis Data Streams - a real-time data streaming service with wide scaling capabilities;
  • Kinesis Data Analytics - a serverless service that simplifies the analysis of streaming data in real time. Amazon Kinesis Data Analytics configures application resources and scales automatically to handle any volume of incoming data;
  • AWS Lambda - a service that lets you run code without provisioning or managing servers. All computing power is scaled automatically for each call;
  • Amazon DynamoDB - a key-value and document database that provides latency of less than 10 milliseconds at any scale. When using DynamoDB, you do not need to provision, patch, or manage any servers. DynamoDB automatically scales tables to adjust for capacity and maintain high performance. No system administration is required;
  • Amazon SNS - a fully managed messaging service based on the publisher/subscriber (Pub/Sub) model, with which you can decouple microservices, distributed systems and serverless applications. SNS can be used to deliver information to end users via mobile push notifications, SMS messages and emails.

Preliminary preparation

To emulate a data stream, I decided to use the airline ticket information returned by the Aviasales API. The documentation lists quite an extensive set of methods; let's take one of them, the "Monthly Price Calendar", which returns prices for each day of a month, grouped by the number of transfers. If you do not specify the search month in the request, information is returned for the month following the current one.

So, let's register and get our token.

An example request is below:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

The above way of getting data from the API, with the token specified in the request itself, will work, but I prefer to pass the access token in a header, so that is the approach used in the api_caller.py script.
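As a rough illustration of the header-based approach, here is a minimal standard-library sketch that only builds the request and does not send it. The endpoint and the X-Access-Token header name come from this article; the build_request helper and the KZN (Kazan) destination are illustrative choices and are not taken from api_caller.py.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Endpoint taken from the example request in this article.
BASE_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_request(api_token, origin, destination, currency="rub"):
    """Build (but do not send) a GET request with the token in a header."""
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    # The token travels in a header, so it never appears in the URL.
    headers = {"X-Access-Token": api_token}
    return Request(f"{BASE_URL}?{query}", headers=headers, method="GET")


# Hypothetical usage: St. Petersburg (LED) to Kazan (KZN).
request = build_request("TOKEN_API", "LED", "KZN")
print(request.full_url)
```

Keeping the token out of the query string means it is less likely to end up in access logs or shell history.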

An example response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}
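A response of this shape is easy to post-process. The sketch below picks the cheapest item from the "data" list; the cheapest_ticket helper is hypothetical, and the second sample item is made-up data added only so that there is something to compare.

```python
def cheapest_ticket(response):
    """Return the lowest-priced item from a response, or None if there is none."""
    if not response.get("success"):
        return None
    items = response.get("data", [])
    return min(items, key=lambda item: item["value"], default=None)


# Sample data: the first item mirrors the response above, the second is invented.
sample = {
    "success": True,
    "data": [
        {"origin": "LED", "destination": "HKT", "depart_date": "2015-10-01",
         "number_of_changes": 1, "value": 29127},
        {"origin": "LED", "destination": "HKT", "depart_date": "2015-10-02",
         "number_of_changes": 2, "value": 27500},
    ],
}
best = cheapest_ticket(sample)
print(best["depart_date"], best["value"])
```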

The example API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I am from Kazan, and Phuket is now "only a dream", let's look for tickets from St. Petersburg to Kazan.

It is assumed that you already have an AWS account. I would like to draw special attention right away to the fact that Kinesis and sending SMS notifications are not included in the annual Free Tier. Even so, with a couple of dollars in mind, it is quite possible to build the proposed system and play with it. And, of course, do not forget to delete all resources once they are no longer needed.

Fortunately, DynamoDB and Lambda functions will be free for us if we stay within the monthly free limits. For example, for DynamoDB: 25 GB of storage, 25 WCU/RCU and 100 million requests. And a million Lambda invocations per month.

Manual system deployment

Setting up Kinesis Data Streams

Let's go to the Kinesis Data Streams service and create two new streams, with one shard each.

What is a shard?
A shard is the basic data transfer unit of an Amazon Kinesis stream. One shard provides input data transfer at 1 MB/s and output data transfer at 2 MB/s. One shard supports up to 1000 PUT records per second. When creating a data stream, you need to specify the required number of shards. For example, you can create a data stream with two shards. This data stream will provide input data transfer at 2 MB/s and output data transfer at 4 MB/s, supporting up to 2000 PUT records per second.
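The per-shard limits above translate into a simple sizing rule: take the largest of the three ratios and round up. A minimal sketch, in which the shards_needed helper is an illustrative name and not part of any AWS SDK:

```python
import math

# Per-shard limits quoted in the text above.
INGEST_MB_PER_SHARD = 1
EGRESS_MB_PER_SHARD = 2
PUT_RECORDS_PER_SHARD = 1000


def shards_needed(ingest_mb_s, egress_mb_s, records_per_s):
    """Return the smallest shard count that satisfies all three limits."""
    needed = max(
        math.ceil(ingest_mb_s / INGEST_MB_PER_SHARD),
        math.ceil(egress_mb_s / EGRESS_MB_PER_SHARD),
        math.ceil(records_per_s / PUT_RECORDS_PER_SHARD),
    )
    # A stream always has at least one shard.
    return max(needed, 1)


# The two-shard example from the text: 2 MB/s in, 4 MB/s out, 2000 records/s.
print(shards_needed(ingest_mb_s=2, egress_mb_s=4, records_per_s=2000))
```

For the tiny load generated in this article, the rule gives a single shard per stream.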

The more shards your stream has, the greater its throughput. In principle, this is how streams are scaled: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus an additional 1.4 cents for every million PUT payload units.
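To get a feel for the bill, the arithmetic can be written down directly. This is only a sketch using the prices quoted in this article, which may differ from current AWS pricing or from your region; stream_cost_usd is a hypothetical helper.

```python
from decimal import Decimal

# Prices as quoted in this article; check current AWS pricing before relying on them.
SHARD_HOUR_USD = Decimal("0.015")   # 1.5 cents per shard-hour
PUT_MILLION_USD = Decimal("0.014")  # 1.4 cents per million PUT payload units


def stream_cost_usd(shards, hours, put_payload_units):
    """Estimate the Kinesis Data Streams cost for a given usage."""
    shard_cost = SHARD_HOUR_USD * shards * hours
    put_cost = PUT_MILLION_USD * Decimal(put_payload_units) / Decimal(1_000_000)
    return shard_cost + put_cost


# Two single-shard streams, as in this article, kept alive for a 3-hour experiment.
print(stream_cost_usd(shards=2, hours=3, put_payload_units=10_000))
```

The shard-hours dominate at this scale, which is why deleting the streams after the experiment matters more than the number of records sent.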

Let's create a new stream named airline_tickets; 1 shard will be enough.

Now let's create another stream named special_stream.

Producer setup

To work through the task, a regular EC2 instance is enough as the data producer. It does not have to be a powerful, expensive virtual machine; a spot t2.micro will do just fine.

Important note: for this example you should use the Amazon Linux AMI 2018.03.0 image, since it needs fewer settings to get the Kinesis Agent running quickly.

Go to the EC2 service, create a new virtual machine, and select the required AMI with the t2.micro type, which is included in the Free Tier.

For the newly created virtual machine to be able to interact with the Kinesis service, it must be granted the rights to do so. The best way to do this is to assign an IAM role. Therefore, on the Step 3: Configure Instance Details screen, select Create new IAM Role.

Creating an IAM role for EC2
In the window that opens, select that we are creating a new role for EC2 and go to the Permissions section.

For this training example we do not need to go into all the intricacies of granular configuration of resource rights, so we will choose policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.

Let's give this role a meaningful name, for example: EC2-KinesisStreams-FullAccess.

After creating this new role, do not forget to attach it to the virtual machine instance being created.

We change nothing else on this screen and move on to the next windows.

The hard disk settings can be left at their defaults, as can the tags (although it is good practice to use tags, at least to give the instance a name and indicate the environment).

Now we are on the Step 6: Configure Security Group tab, where you need to create a new Security Group or specify an existing one that allows you to connect to the instance via ssh (port 22). Select Source -> My IP there, and you can launch the instance.

As soon as it switches to the running state, you can try to connect to it via ssh.

To be able to work with the Kinesis Agent, after successfully connecting to the machine, enter the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to store the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to configure it:

sudo vim /etc/aws-kinesis/agent.json

The contents of the agent.json file should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As can be seen from the configuration file, the agent will monitor files with the .log extension in the /var/log/airline_tickets/ directory, parse them and transfer them to the airline_tickets stream.
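Because the agent uses the CSVTOJSON option, every line written to the log file has to list its values in exactly the order given in customFieldNames. The sketch below shows one way such a line could be produced. It is not the log_maker function from the repository: the to_csv_line helper, the mapping of the API field "value" to "cost", and the random UUID used as record_id are all assumptions made for illustration.

```python
import csv
import io
import uuid

# Column order copied from customFieldNames in agent.json above.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def to_csv_line(item):
    """Serialize one API item as a single CSV line in FIELD_NAMES order."""
    row = dict(item)
    # Assumption: the price arrives as "value" and is stored as "cost".
    row.setdefault("cost", row.get("value", ""))
    # Assumption: a random UUID serves as the unique record_id.
    row.setdefault("record_id", str(uuid.uuid4()))
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="")
    # Missing fields become empty columns so the column count stays fixed.
    writer.writerow([row.get(name, "") for name in FIELD_NAMES])
    return buffer.getvalue()


line = to_csv_line({"value": 29127, "trip_class": 0, "origin": "LED",
                    "destination": "HKT", "depart_date": "2015-10-01",
                    "number_of_changes": 1, "distance": 8015, "actual": True})
print(line)
```

Using the csv module rather than joining strings by hand keeps values that contain commas correctly quoted.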

We restart the service and make sure it is up and running:

sudo service aws-kinesis-agent restart

Now let's download the Python script that will request data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the received response in the directory that the Kinesis Agent scans. The implementation of this script is fairly standard: there is a TicketsApi class that lets you query the API asynchronously. We pass a header with the token and the request parameters to this class:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)
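The excerpt does not show how the main() coroutine is started; a coroutine like this is typically launched with asyncio.run. The following standard-library sketch illustrates that, together with the benefit of the asynchronous style, namely running several queries concurrently. Here fetch_prices is a stand-in that returns canned data instead of calling the real API, and both helper names are hypothetical.

```python
import asyncio


async def fetch_prices(destination):
    """Stand-in for an HTTP call; returns canned data after a short pause."""
    await asyncio.sleep(0.01)  # simulates network latency
    return {"destination": destination, "success": True, "data": []}


async def fetch_many(destinations):
    """Run all requests concurrently and return results in input order."""
    tasks = [fetch_prices(destination) for destination in destinations]
    return await asyncio.gather(*tasks)


# asyncio.run creates an event loop, runs the coroutine and closes the loop.
results = asyncio.run(fetch_many(["KZN", "HKT"]))
print([result["destination"] for result in results])
```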

To check that the agent is configured correctly and working, let's do a test run of the api_caller.py script:

sudo ./api_caller.py TOKEN

Then we look at the result in the agent log and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

As you can see, everything works and the Kinesis Agent successfully sends data to the stream. Now let's configure the consumer.

Setting up Kinesis Data Analytics

Let's move on to the central component of the whole system: create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app.

Kinesis Data Analytics lets you perform real-time analytics on data from Kinesis Streams using SQL. It is a fully autoscaling service (unlike Kinesis Streams) that:

  1. allows you to create new streams (Output Stream) based on queries against the source data;
  2. provides a stream of the errors that occurred while the application was running (Error Stream);
  3. can determine the input data schema automatically (it can be redefined manually if necessary).

This is not a cheap service: 0.11 USD per hour of operation, so use it carefully and delete it when you are done.

Let's connect the application to the data source.

Select the stream we are going to connect to (airline_tickets).

Next, you need to attach a new IAM role so that the application can read from the stream and write to the stream. To do this, it is enough to change nothing in the Access permissions block.

Now let's request discovery of the data schema in the stream; to do this, click the "Discover schema" button. As a result, the IAM role will be updated (a new one will be created) and schema discovery will be launched on the data that has already arrived in the stream.

Now you need to go to the SQL editor. When you click this button, a window will appear asking you to start the application; choose to start it.

Insert the following simple query into the SQL editor window and click Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables, using INSERT statements to add records and SELECT statements to query data. In Amazon Kinesis Data Analytics, you work with streams (STREAMs) and pumps (PUMPs): continuous insert queries that move data from one in-application stream into another in-application stream.

The SQL query above searches for Aeroflot tickets that cost less than five thousand rubles. All records that meet these conditions are placed in the DESTINATION_SQL_STREAM stream.
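For readers more at home in Python than in streaming SQL, the pump's WHERE clause and column projection correspond roughly to the filter below. It runs over an ordinary list rather than a continuous stream, the is_interesting helper is hypothetical, and the sample records (including the "Pobeda" gate) are made-up data.

```python
def is_interesting(record, max_cost=5000, gate="Aeroflot"):
    """Mirror the WHERE clause: cost below the limit and a matching gate."""
    return float(record["cost"]) < max_cost and record["gate"] == gate


# Invented sample records standing in for SOURCE_SQL_STREAM_001.
records = [
    {"cost": "4200", "gate": "Aeroflot", "origin": "LED"},
    {"cost": "5100", "gate": "Aeroflot", "origin": "LED"},
    {"cost": "3900", "gate": "Pobeda", "origin": "LED"},
]

# Like SELECT "cost", "gate": keep only the two projected columns.
selected = [{"cost": float(record["cost"]), "gate": record["gate"]}
            for record in records if is_interesting(record)]
print(selected)
```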

In the Destination block, select the special_stream stream, and in the In-application stream name drop-down list select DESTINATION_SQL_STREAM.

After all these manipulations, the application should read from airline_tickets and write the filtered records to special_stream.

Creating an SNS topic and subscribing to it

Go to the Simple Notification Service and create a new topic there named Airlines.

Subscribe to this topic and specify the mobile phone number to which SMS notifications will be sent.

Creating the DynamoDB table

To store the raw data from the airline_tickets stream, let's create a DynamoDB table with the same name. We will use record_id as the primary key.
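For reference, the same table can also be described in code. The sketch below only builds the arguments for the DynamoDB CreateTable operation; the string type for record_id and the capacity figures are assumptions, not settings taken from this article, and table_definition is a hypothetical helper.

```python
def table_definition(table_name="airline_tickets"):
    """Return keyword arguments for a DynamoDB create_table call."""
    return {
        "TableName": table_name,
        # record_id is the partition (HASH) key, as chosen in the text.
        "KeySchema": [{"AttributeName": "record_id", "KeyType": "HASH"}],
        # Assumption: record_id is stored as a string ("S").
        "AttributeDefinitions": [
            {"AttributeName": "record_id", "AttributeType": "S"},
        ],
        # Assumption: a small provisioned capacity, well within the free limits.
        "ProvisionedThroughput": {"ReadCapacityUnits": 5,
                                  "WriteCapacityUnits": 5},
    }


params = table_definition()
print(params["KeySchema"])
# With boto3 installed and AWS credentials configured, the table could be created with:
#   import boto3
#   boto3.client("dynamodb").create_table(**params)
```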


Creating the collector Lambda function

Let's create a Lambda function called Collector, whose task will be to poll the airline_tickets stream and, if new records are found there, insert them into the DynamoDB table. Obviously, in addition to the default rights, this Lambda must have read access to the Kinesis data stream and write access to DynamoDB.

Creating an IAM role for the collector Lambda function
First, let's create a new IAM role for the Lambda named Lambda-TicketsProcessingRole.

For this test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable.

This Lambda should be launched by a Kinesis trigger when new records arrive in the airline_tickets stream, so we need to add a new trigger.

All that remains is to insert the code and save the Lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
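
Before wiring the function to a real stream, the decoding step can be tried locally. The sketch below builds a minimal fake event in which, as in real Kinesis events, each payload is base64-encoded, and then decodes it with the same steps as get_json_data. Real events carry more fields than shown here, and make_kinesis_event and decode_records are hypothetical helper names.

```python
import base64
import json


def make_kinesis_event(payloads):
    """Wrap dicts the way Lambda delivers Kinesis records (data is base64)."""
    return {"Records": [
        {"kinesis": {"data": base64.b64encode(
            json.dumps(payload).encode("utf-8")).decode("ascii")}}
        for payload in payloads
    ]}


def decode_records(records):
    """Apply the same decoding steps as get_json_data above."""
    return [json.loads(base64.b64decode(record["kinesis"]["data"]))
            for record in records]


# Made-up test record with a few of the fields used by the collector.
event = make_kinesis_event([{"record_id": "test-1", "cost": "4200",
                             "gate": "Aeroflot"}])
decoded = decode_records(event["Records"])
print(decoded)
```

An event built this way can also be pasted into the Lambda console as a test event.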

Creating the notifier Lambda function

The second Lambda function, which will monitor the second stream (special_stream) and send a notification to SNS, is created in a similar way. Accordingly, this Lambda must have the rights to read from Kinesis and to send messages to the given SNS topic, which the SNS service will then deliver to all subscribers of this topic (email, SMS, etc.).

Creating the IAM role
First, we create the IAM role Lambda-KinesisAlarm for this Lambda, and then assign this role to the alarm_notifier Lambda being created.

This Lambda should be triggered when new records arrive in special_stream, so you need to configure the trigger in the same way as we did for the Collector Lambda.

To make this Lambda easier to configure, let's introduce a new environment variable, TOPIC_ARN, where we put the ARN (Amazon Resource Name) of the Airlines topic.

Then insert the Lambda code; it is not complicated at all:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
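
The handler above sends the same fixed text for every batch. If you want the SMS to mention the tickets that were actually found, the message could be assembled from the incoming records. This is a hypothetical extension, not part of the original project: it assumes the Kinesis Analytics destination writes JSON records with the "cost" and "gate" columns from the SQL query, and build_message is an illustrative name.

```python
import base64
import json


def build_message(event):
    """Compose a short notification text from the records in a Kinesis event."""
    lines = []
    for record in event.get("Records", []):
        # Assumption: each record is a base64-encoded JSON object.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        lines.append(f"{payload.get('gate')}: {payload.get('cost')} RUB")
    if not lines:
        return ""
    return "Tickets found:\n" + "\n".join(lines)


# Made-up event with a single record, encoded the way Kinesis delivers it.
sample_data = base64.b64encode(
    json.dumps({"cost": 4200.0, "gate": "Aeroflot"}).encode("utf-8")
).decode("ascii")
message = build_message({"Records": [{"kinesis": {"data": sample_data}}]})
print(message)
```

The returned string could then be passed as the Message argument of the publish call shown above.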

It seems that this is where the manual configuration of the system is complete. All that remains is to test it and make sure that we have configured everything correctly.

Deploying from Terraform code

Required preparation

Terraform is a very convenient open-source tool for deploying infrastructure from code. It has its own syntax that is easy to learn, with many examples of how and what to deploy. The Atom and Visual Studio Code editors have many handy plugins that make working with Terraform easier.

You can download the distribution from the official Terraform site. A detailed analysis of all Terraform capabilities is beyond the scope of this article, so we will limit ourselves to the main points.

How to run

The full code of the project is in my repository. We clone the repository locally. Before starting, you need to make sure that you have the AWS CLI installed and configured, because Terraform will look for credentials in the ~/.aws/credentials file.
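A quick way to confirm that the credentials file is in place before running Terraform is to parse it, since it is a plain INI file. A minimal sketch, demonstrated on a temporary file with placeholder values; list_profiles is a hypothetical helper, and for a real check you would pass os.path.expanduser("~/.aws/credentials") instead.

```python
import configparser
import os
import tempfile


def list_profiles(path):
    """Return the profile names that define both access key fields."""
    parser = configparser.ConfigParser()
    parser.read(path)  # a missing file is silently ignored
    required = ("aws_access_key_id", "aws_secret_access_key")
    return [name for name in parser.sections()
            if all(key in parser[name] for key in required)]


# Demonstration on a throwaway file with placeholder values.
with tempfile.TemporaryDirectory() as folder:
    demo_path = os.path.join(folder, "credentials")
    with open(demo_path, "w") as handle:
        handle.write("[default]\n"
                     "aws_access_key_id = EXAMPLE\n"
                     "aws_secret_access_key = EXAMPLE\n")
    profiles = list_profiles(demo_path)
print(profiles)
```

An empty list means that no usable profile was found in the file.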

It is good practice to run the plan command before deploying the whole infrastructure, to see what Terraform is about to create for us in the cloud:

terraform.exe plan

You will be prompted to enter a phone number to send notifications to. It is not necessary to enter it at this stage.

Having analyzed the execution plan, we can start creating the resources:

terraform.exe apply

After sending this command, you will again be asked to enter a phone number; type "yes" when asked whether to actually perform the actions. This will bring up the entire infrastructure, carry out all the necessary EC2 configuration, deploy the Lambda functions, and so on.

After all resources have been successfully created through the Terraform code, you need to go into the details of the Kinesis Analytics application (unfortunately, I did not find a way to do this directly from the code).

Start the application.

After that, you must explicitly set the in-application stream name by selecting it from the drop-down list.

Now everything is ready to go.

Testing the application

Regardless of how you deployed the system, manually or through Terraform code, it will work the same way.

We log in via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:

sudo ./api_caller.py TOKEN

All you have to do is wait for an SMS to your number.

The SMS message arrives on the phone in about 1 minute.

It remains to check that the records have been saved in DynamoDB for subsequent, more detailed analysis: the airline_tickets table should now contain the ticket records.

Conclusion

In the course of this work, an online data processing system based on Amazon Kinesis was built. We considered options for using the Kinesis Agent together with Kinesis Data Streams and real-time analytics in Kinesis Analytics using SQL commands, as well as the interaction of Amazon Kinesis with other AWS services.

We deployed the above system in two ways: a rather long manual one and a quick one from Terraform code.

All the project source code is available in my GitHub repository. I suggest you take a look at it.

I will be glad to discuss the article and look forward to your comments. I hope for constructive criticism.

I wish you success!

Source: www.habr.com
