Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት

ሃይ ሀብር!

ማብረር ይወዳሉ? ወድጄዋለሁ፣ ነገር ግን ራስን ማግለል በነበረበት ወቅት ከአንድ ታዋቂ የመረጃ ምንጭ - Aviasales የአየር ትኬቶችን መረጃ በመተንተን ፍቅር ያዝኩ።

ዛሬ የአማዞን ኪኔሲስን ስራ እንመረምራለን ፣ የዥረት ስርዓት በእውነተኛ ጊዜ ትንታኔ እንገነባለን ፣ Amazon DynamoDB NoSQL ዳታቤዝ እንደ ዋና የመረጃ ማከማቻ እንጭናለን እና አስደሳች ለሆኑ ትኬቶች የኤስኤምኤስ ማሳወቂያዎችን እናዘጋጃለን።

ሁሉም ዝርዝሮች በቆራጩ ስር ናቸው! ሂድ!

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት

መግቢያ

ለምሳሌ, መዳረሻ ያስፈልገናል Aviasales ኤፒአይ. የሱ መዳረሻ ያለክፍያ እና ያለ ገደብ ይሰጣል፤ ውሂቡን ለመድረስ የእርስዎን API token ለመቀበል በ«ገንቢዎች» ክፍል ውስጥ መመዝገብ ብቻ ያስፈልግዎታል።

የዚህ ጽሁፍ ዋና አላማ በAWS ውስጥ ያለውን የመረጃ ስርጭት አጠቃቀምን በተመለከተ አጠቃላይ ግንዛቤን ለመስጠት ነው፡ በተጠቀመው ኤፒአይ የተመለሰው መረጃ በጥብቅ ያልተዘመነ እና ከካሼው የሚተላለፍ መሆኑን ከግምት ውስጥ እናስገባለን። በ Aviasales.ru እና Jetradar.com ድረ-ገጾች ተጠቃሚዎች ላለፉት 48 ሰአታት ባደረጉት ፍለጋ መሰረት ተመስርቷል።

በአምራች ማሽኑ ላይ የተጫነው Kinesis-ኤጀንት በኤፒአይ የተቀበለው በራስ ሰር ይተነትናል እና በ Kinesis Data Analytics በኩል ወደሚፈለገው ዥረት ያስተላልፋል። የዚህ ዥረት ጥሬ እትም በቀጥታ ወደ መደብሩ ይጻፋል። በDynamoDB ውስጥ የተዘረጋው የጥሬ መረጃ ማከማቻ እንደ AWS ፈጣን እይታ ባሉ BI መሳሪያዎች በኩል ጥልቅ የትኬት ትንተና እንዲኖር ያስችላል።

መላውን መሠረተ ልማት ለማሰማራት ሁለት አማራጮችን እንመለከታለን.

  • መመሪያ - በ AWS አስተዳደር ኮንሶል በኩል;
  • መሠረተ ልማት ከ Terraform ኮድ ለሰነፍ አውቶሜትሮች;

የተገነባው ስርዓት አርክቴክቸር

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
ያገለገሉ ክፍሎች፡-

  • Aviasales ኤፒአይ - በዚህ ኤፒአይ የተመለሰው መረጃ ለሁሉም ቀጣይ ስራዎች ጥቅም ላይ ይውላል;
  • EC2 የአምራች ምሳሌ - የግቤት ውሂብ ዥረቱ የሚፈጠርበት መደበኛ ምናባዊ ማሽን በደመና ውስጥ።
    • Kinesis ወኪል ወደ Kinesis (Kinesis Data Streams ወይም Kinesis Firehose) መረጃን ለመሰብሰብ እና ለመላክ ቀላል መንገድ የሚያቀርብ በማሽኑ ላይ በአካባቢው የተጫነ የጃቫ መተግበሪያ ነው። ተወካዩ በተገለጹት ማውጫዎች ውስጥ ያሉትን የፋይሎች ስብስብ በቋሚነት ይከታተላል እና አዲስ ውሂብ ወደ Kinesis ይልካል;
    • API ደዋይ ስክሪፕት - ወደ ኤፒአይ የሚጠይቅ የፓይዘን ስክሪፕት እና ምላሹን በ Kinesis Agent ክትትል በሚደረግበት አቃፊ ውስጥ ያስቀምጣል;
  • Kinesis የውሂብ ዥረቶች - ሰፊ የመለኪያ ችሎታዎች ያለው የእውነተኛ ጊዜ የውሂብ ማስተላለፍ አገልግሎት;
  • Kinesis ትንታኔ በእውነተኛ ጊዜ የዥረት ዳታ ትንታኔን ቀላል የሚያደርግ አገልጋይ አልባ አገልግሎት ነው። Amazon Kinesis Data Analytics የመተግበሪያ ሃብቶችን ያዋቅራል እና ማንኛውንም የገቢ ውሂብ መጠን ለመቆጣጠር በራስ-ሰር ይመዝናል;
  • ኤስኤስኤስ ላምዳ - ምትኬ ሳያደርጉ ወይም ሰርቨሮችን ሳያዘጋጁ ኮድ እንዲያሄዱ የሚያስችልዎ አገልግሎት። ለእያንዳንዱ ጥሪ ሁሉም የማስላት ኃይል በራስ-ሰር ይመዘናል;
  • Amazon DynamoDB - በማንኛውም ሚዛን ሲሰራ ከ10 ሚሊሰከንዶች በታች መዘግየትን የሚያቀርብ የቁልፍ-እሴት ጥንዶች እና ሰነዶች ዳታቤዝ። ዳይናሞዲቢን በሚጠቀሙበት ጊዜ ማንኛውንም አገልጋይ ማቅረብ፣ መለጠፍ ወይም ማስተዳደር አያስፈልግዎትም። ዳይናሞዲቢ ያሉትን ሀብቶች መጠን ለማስተካከል እና ከፍተኛ አፈጻጸም ለማስቀጠል ሰንጠረዦችን በራስ-ሰር ይመዝናል። የስርዓት አስተዳደር አያስፈልግም;
  • Amazon SNS - ማይክሮ አገልግሎቶችን ፣ ስርጭቶችን እና አገልጋይ-አልባ አፕሊኬሽኖችን የሚለዩበት የአታሚ-ደንበኝነት ተመዝጋቢ (Pub/Sub) ሞዴልን በመጠቀም መልዕክቶችን ለመላክ ሙሉ በሙሉ የሚተዳደር አገልግሎት። SNS በሞባይል የግፋ ማሳወቂያዎች፣ የኤስኤምኤስ መልዕክቶች እና ኢሜይሎች ለዋና ተጠቃሚዎች መረጃን ለመላክ ጥቅም ላይ ሊውል ይችላል።

የመጀመሪያ ደረጃ ስልጠና

የመረጃ ፍሰቱን ለመኮረጅ በAviasales API የተመለሰውን የአየር መንገድ ትኬት መረጃ ለመጠቀም ወሰንኩ። ውስጥ ሰነድ በጣም ሰፊ የተለያዩ ዘዴዎችን እንውሰድ ፣ ከመካከላቸው አንዱን እንውሰድ - “ወርሃዊ የዋጋ የቀን መቁጠሪያ” ፣ ይህም በየወሩ ለእያንዳንዱ ቀን ዋጋዎችን ይመልሳል ፣ በማስተላለፎች ብዛት ይመደባል ። በጥያቄው ውስጥ የፍለጋውን ወር ካልገለጹ፣ መረጃው ከአሁኑ ወር በኋላ ይመለሳል።

እንግዲያው, እንመዘገብ እና ቶክን እንውሰድ.

የምሳሌ ጥያቄ ከዚህ በታች ነው፡-

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

በጥያቄው ውስጥ ቶከንን በመግለጽ ከላይ ያለው መረጃ ከኤፒአይ የመቀበል ዘዴ ይሰራል ነገር ግን የመዳረሻ ማስመሰያውን በአርዕስቱ በኩል ማለፍ እመርጣለሁ ስለዚህ ይህንን ዘዴ በ api_caller.py ስክሪፕት ውስጥ እንጠቀማለን።

ምሳሌ መልስ፡-

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

ከላይ ያለው ምሳሌ ኤፒአይ ምላሽ ከሴንት ፒተርስበርግ እስከ ፉክ ያለውን ትኬት ያሳያል... ኦህ፣ እንዴት ያለ ህልም ነው...
እኔ ከካዛን ስለሆንኩ እና ፑኬት አሁን "ህልም ብቻ" ስለሆነ ከሴንት ፒተርስበርግ ወደ ካዛን ትኬቶችን እንፈልግ.

አስቀድሞ የAWS መለያ እንዳለህ ያስባል። Kinesis እና በኤስኤምኤስ ማሳወቂያዎችን መላክ በዓመታዊው ውስጥ ያልተካተቱ የመሆኑን እውነታ ወዲያውኑ ልዩ ትኩረት መስጠት እፈልጋለሁ. ነፃ ደረጃ (ነፃ አጠቃቀም). ነገር ግን ይህ ቢሆንም, ሁለት ዶላሮችን ግምት ውስጥ በማስገባት, የታቀደውን ስርዓት መገንባት እና ከእሱ ጋር መጫወት በጣም ይቻላል. እና በእርግጥ, ከአሁን በኋላ አስፈላጊ ካልሆኑ በኋላ ሁሉንም ሀብቶች መሰረዝን አይርሱ.

እንደ እድል ሆኖ፣ ወርሃዊ የነጻ ገደቦችን ካሟላን DynamoDb እና lambda ተግባራት ነፃ ይሆናሉ። ለምሳሌ፣ ለ DynamoDB፡ 25GB ማከማቻ፣ 25 WCU/RCU እና 100 ሚሊዮን መጠይቆች። እና በወር አንድ ሚሊዮን ላምዳ ተግባር ጥሪዎች።

በእጅ ስርዓት መዘርጋት

የ Kinesis ውሂብ ዥረቶችን በማቀናበር ላይ

ወደ Kinesis Data Streams አገልግሎት እንሂድ እና ሁለት አዳዲስ ዥረቶችን እንፍጠር፣ ለእያንዳንዱ አንድ ሸርተቴ።

ሻርድ ምንድን ነው?
ሻርድ የአማዞን ኪኔሲስ ዥረት መሰረታዊ የውሂብ ማስተላለፊያ ክፍል ነው። አንድ ክፍል በ 1 ሜባ / ሰ ፍጥነት የግቤት ውሂብ ማስተላለፍ እና በ 2 ሜባ / ሰ ፍጥነት የውሂብ ማስተላለፍን ያቀርባል. አንድ ክፍል በሰከንድ እስከ 1000 PUT ግቤቶችን ይደግፋል። የውሂብ ዥረት ሲፈጥሩ የሚፈለጉትን የክፍሎች ብዛት መግለጽ ያስፈልግዎታል። ለምሳሌ, ከሁለት ክፍሎች ጋር የውሂብ ዥረት መፍጠር ይችላሉ. ይህ የዳታ ዥረት የግብዓት መረጃን በ2 ሜባ/ሰ እና የውጤት ዳታ ማስተላለፍን በ4 ሜባ/ሰ ይሰጣል፣ ይህም በሰከንድ እስከ 2000 PUT ሪከርዶችን ይደግፋል።

በዥረትዎ ውስጥ ብዙ ሸርተቴዎች በበዙ ቁጥር የፍሰቱ መጠን ይጨምራል። በመርህ ደረጃ, ፍሰቶች እንዴት እንደሚመዘኑ ነው - ሸርቆችን በመጨመር. ነገር ግን ብዙ ሻርዶች ባላችሁ ቁጥር ዋጋው ከፍ ይላል። እያንዳንዱ ሻርድ በሰዓት 1,5 ሳንቲም እና ለእያንዳንዱ ሚሊዮን PUT የመጫኛ ክፍሎች ተጨማሪ 1.4 ሳንቲም ያስወጣል።

በስሙ አዲስ ዥረት እንፍጠር የአየር መንገድ_ትኬቶች, 1 ሻርክ ይበቃዋል;

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
አሁን ከስሙ ጋር ሌላ ክር እንፍጠር ልዩ_ዥረት:

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት

የአምራች ማዋቀር

አንድን ተግባር ለመተንተን መደበኛውን የኢ.ሲ.2 ምሳሌ እንደ ዳታ አዘጋጅ መጠቀም በቂ ነው። ኃይለኛ፣ ውድ የሆነ ምናባዊ ማሽን መሆን የለበትም፤ ስፖት t2.micro በትክክል ይሰራል።

ጠቃሚ ማሳሰቢያ፡ ለምሳሌ ምስልን መጠቀም አለቦት - Amazon Linux AMI 2018.03.0፣ የ Kinesis Agentን በፍጥነት ለመጀመር ቅንጅቶች አሉት።

ወደ EC2 አገልግሎት ይሂዱ ፣ አዲስ ቨርቹዋል ማሽን ይፍጠሩ ፣ የሚፈልጉትን ኤኤምአይ ከ t2.micro ዓይነት ጋር ይምረጡ ፣ ይህም በነፃ ደረጃ ውስጥ የተካተተ ነው ።

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
አዲስ የተፈጠረ ቨርቹዋል ማሽን ከኪኔሲስ አገልግሎት ጋር መስተጋብር መፍጠር እንዲችል ይህን ለማድረግ መብት ሊሰጠው ይገባል። ይህንን ለማድረግ ምርጡ መንገድ የIAM ሚና መመደብ ነው። ስለዚህ፣ በደረጃ 3፡ የአብነት ዝርዝሮችን ስክሪን አዋቅር፣ መምረጥ አለብህ አዲስ የIAM ሚና ይፍጠሩ:

ለ EC2 የIAM ሚና መፍጠር
Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
በሚከፈተው መስኮት ውስጥ ለ EC2 አዲስ ሚና እየፈጠርን መሆኑን ይምረጡ እና ወደ ፈቃዶች ክፍል ይሂዱ።

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
የሥልጠና ምሳሌውን በመጠቀም፣ ወደ ሁሉም ውስብስብ ነገሮች ውስጥ መግባት የለብንም የሀብት መብቶች ቅንጅቶች፣ ስለዚህ በአማዞን ቀድሞ የተዋቀሩ ፖሊሲዎችን እንመርጣለን-AmazonKinesisFullAccess እና CloudWatchFullAccess።

ለዚህ ሚና አንዳንድ ትርጉም ያለው ስም እንስጥ፣ ለምሳሌ፡- EC2-KinesisStreams-FullAccess። ውጤቱ ከዚህ በታች ባለው ስእል ላይ እንደሚታየው አንድ አይነት መሆን አለበት.

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
ይህንን አዲስ ሚና ከፈጠሩ በኋላ ፣ ከተፈጠረው ምናባዊ ማሽን ምሳሌ ጋር ማያያዝን አይርሱ-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
በዚህ ስክሪን ላይ ሌላ ምንም ነገር አንቀይርም እና ወደሚቀጥሉት መስኮቶች እንቀጥላለን።

የሃርድ ድራይቭ ቅንጅቶች በነባሪነት ሊቀመጡ ይችላሉ, እንዲሁም መለያዎች (ምንም እንኳን መለያዎችን መጠቀም ጥሩ ልምድ ቢሆንም, ቢያንስ ለአብነት ስም ይስጡ እና አካባቢውን ይጠቁሙ).

አሁን ደረጃ 6 ላይ እንገኛለን፡ የደህንነት ቡድንን አዋቅር፣ አዲስ መፍጠር ያለብህ ወይም ያለህን የደህንነት ቡድን ይግለጽ፣ ይህም በ ssh (ፖርት 22) ከምሳሌው ጋር እንድታገናኝ ያስችልሃል። ምንጭ -> የእኔ አይፒን ይምረጡ እና ምሳሌውን ማስጀመር ይችላሉ።

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
ልክ ወደ ሩጫ ሁኔታ እንደተለወጠ፣ በssh በኩል ከእሱ ጋር ለመገናኘት መሞከር ይችላሉ።

ከ Kinesis Agent ጋር ለመስራት በተሳካ ሁኔታ ከማሽኑ ጋር ከተገናኙ በኋላ በተርሚናል ውስጥ የሚከተሉትን ትዕዛዞች ማስገባት አለብዎት:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

የኤፒአይ ምላሾችን ለማስቀመጥ አቃፊ እንፍጠር፡-

sudo mkdir /var/log/airline_tickets

ወኪሉን ከመጀመርዎ በፊት አወቃቀሩን ማዋቀር ያስፈልግዎታል:

sudo vim /etc/aws-kinesis/agent.json

የኤጀንት.json ፋይል ይዘቶች ይህን መምሰል አለባቸው፡-

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

ከማዋቀሪያው ፋይል እንደሚታየው ወኪሉ በ /var/log/airline_tickets/ directory ውስጥ ባለው የሎግ ቅጥያ ፋይሎችን ይከታተላል እና ይተነተን እና ወደ አየር መንገድ_ቲኬቶች ዥረት ያስተላልፋል።

አገልግሎቱን እንደገና እንጀምራለን እና እየሰራ መሆኑን እናረጋግጣለን።

sudo service aws-kinesis-agent restart

አሁን ከኤፒአይ መረጃ የሚጠይቀውን የፓይዘን ስክሪፕት እናውርድ፡-

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

የ api_caller.py ስክሪፕት ከ Aviasales መረጃን ይጠይቃል እና የተቀበለውን ምላሽ የ Kinesis ወኪል በሚቃኘው ማውጫ ውስጥ ያስቀምጣል። የዚህ ስክሪፕት አተገባበር በጣም ደረጃውን የጠበቀ ነው፣ የቲኬት ኤፒ ክፍል አለ፣ ኤፒአይውን በተመሳሰል መልኩ እንዲጎትቱ ያስችልዎታል። ራስጌን ከቶከን ጋር እናልፋለን እና ለዚህ ክፍል መለኪያዎችን እንጠይቃለን፡-

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

የተወካዩን ትክክለኛ መቼቶች እና ተግባራዊነት ለመፈተሽ የ api_caller.py ስክሪፕት እንሞክር፡-

sudo ./api_caller.py TOKEN

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
እና በኤጀንት ምዝግብ ማስታወሻዎች እና በአየር መንገድ_ቲኬቶች የውሂብ ዥረት ውስጥ ባለው የክትትል ትር ላይ የሥራውን ውጤት እንመለከታለን።

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
እንደሚመለከቱት, ሁሉም ነገር ይሰራል እና የ Kinesis Agent በተሳካ ሁኔታ ውሂብ ወደ ዥረቱ ይልካል. አሁን ተጠቃሚን እናዋቅር።

Kinesis Data Analytics በማዋቀር ላይ

ወደ አጠቃላይ ስርዓቱ ማዕከላዊ አካል እንሂድ - በ Kinesis Data Analytics ውስጥ kinesis_analytics_airlines_app የሚል አዲስ መተግበሪያ እንፍጠር፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
የኪኔሲስ ዳታ ትንታኔ የSQL ቋንቋን በመጠቀም ከ Kinesis Streams የእውነተኛ ጊዜ የውሂብ ትንታኔን እንዲሰሩ ይፈቅድልዎታል። እሱ ሙሉ በሙሉ በራስ የመመዘን አገልግሎት ነው (ከ Kinesis Streams በተለየ)፡-

  1. የውሂብ ምንጭ ጥያቄዎችን መሰረት በማድረግ አዲስ ዥረቶችን (የውጤት ዥረት) እንዲፈጥሩ ይፈቅድልዎታል;
  2. አፕሊኬሽኖች በሚሰሩበት ጊዜ የተከሰቱ ስህተቶችን ዥረት ያቀርባል (ስህተት ዥረት)።
  3. የግቤት ውሂብ እቅድን በራስ-ሰር መወሰን ይችላል (አስፈላጊ ከሆነ በእጅ ሊገለጽ ይችላል)።

ይህ ርካሽ አገልግሎት አይደለም - 0.11 ዶላር በሰዓት ሥራ, ስለዚህ በጥንቃቄ ይጠቀሙበት እና ሲጨርሱ ይሰርዙት.

መተግበሪያውን ከውሂቡ ምንጭ ጋር እናገናኘው፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
የምንገናኝበትን ዥረት ይምረጡ (አየር መንገድ_ቲኬቶች)፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
በመቀጠል አፕሊኬሽኑ ከዥረቱ ላይ ማንበብ እና በዥረቱ ላይ መጻፍ እንዲችል አዲስ IAM Role ማያያዝ አለብዎት። ይህንን ለማድረግ በመዳረሻ ፍቃዶች እገዳ ውስጥ ምንም ነገር አለመቀየር በቂ ነው፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
አሁን በዥረቱ ውስጥ ያለውን የውሂብ መርሃ ግብር ለማግኘት እንጠይቅ፤ ይህንን ለማድረግ “አግኝ schema” ቁልፍን ጠቅ ያድርጉ። በውጤቱም፣ የIAM ሚና ይዘምናል (አዲስ ይፈጠራል) እና ቀደም ሲል በዥረቱ ላይ ከደረሰው መረጃ ሼማ ማወቂያ ይጀምራል፡

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
አሁን ወደ SQL አርታዒ መሄድ ያስፈልግዎታል. ይህን ቁልፍ ሲጫኑ አፕሊኬሽኑን እንዲጀምሩ የሚጠይቅ መስኮት ይመጣል - ማስጀመር የሚፈልጉትን ይምረጡ፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
የሚከተለውን ቀላል መጠይቅ ወደ SQL አርታኢ መስኮት አስገባ እና አስቀምጥ እና አሂድ SQL ን ጠቅ አድርግ፡

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

በተዛማጅ ዳታቤዝ ውስጥ፣ መዝገቦችን ለመጨመር INSERT መግለጫዎችን እና የ SELECT መግለጫን በመጠቀም ከጠረጴዛዎች ጋር ትሰራለህ። በአማዞን ኪኔሲስ ዳታ ትንታኔ ውስጥ ከዥረቶች (STREAMs) እና ከፓምፖች (PUMPs) ጋር አብረው ይሰራሉ—በአንድ መተግበሪያ ውስጥ ከአንድ ዥረት ወደ ሌላ ዥረት የሚገቡ ጥያቄዎችን ያለማቋረጥ ያስገቡ።

ከላይ የቀረበው የSQL መጠይቅ ከአምስት ሺህ ሩብልስ በታች በሆነ ዋጋ የኤሮፍሎት ትኬቶችን ይፈልጋል። እነዚህን ሁኔታዎች የሚያሟሉ ሁሉም መዝገቦች በDESTINATION_SQL_STREAM ዥረት ውስጥ ይቀመጣሉ።

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
በመዳረሻ ብሎክ፣ልዩ_ዥረት ዥረቱን እና የውስጠ-መተግበሪያ ዥረት ስም DESTINATION_SQL_STREAM ተቆልቋይ ዝርዝር ውስጥ ይምረጡ።

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
የሁሉም ማጭበርበሮች ውጤት ከዚህ በታች ካለው ሥዕል ጋር ተመሳሳይ መሆን አለበት ።

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት

ለኤስኤንኤስ ርዕስ መፍጠር እና መመዝገብ

ወደ ቀላል የማሳወቂያ አገልግሎት ይሂዱ እና አየር መንገድ በሚለው ስም አዲስ ርዕስ ይፍጠሩ፡

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
ለዚህ ርዕስ ይመዝገቡ እና የኤስኤምኤስ ማሳወቂያዎች የሚላኩበትን የሞባይል ስልክ ቁጥር ያመልክቱ።

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት

በ DynamoDB ውስጥ ጠረጴዛ ይፍጠሩ

ጥሬውን መረጃ ከአየር መንገዳቸው_ቲኬቶች ዥረት ለማከማቸት፣ በዳይናሞ ዲቢ ውስጥ ተመሳሳይ ስም ያለው ሠንጠረዥ እንፍጠር። ሪከርድ_አይድን እንደ ዋና ቁልፍ እንጠቀማለን፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት

ላምዳ ተግባር ሰብሳቢ መፍጠር

ሰብሳቢ የተባለውን ላምዳ ተግባር እንፍጠር፣ ስራው የአየር መንገድ_ቲኬቶችን ዥረት መምረጥ እና አዲስ መዝገቦች ከተገኙ እነዚህን መዝገቦች በDynamoDB ሰንጠረዥ ውስጥ እናስገባቸዋለን። በግልጽ ለማየት እንደሚቻለው፣ ከነባሪው መብቶች በተጨማሪ፣ ይህ ላምዳ የ Kinesis ውሂብ ዥረት ማንበብ እና የ DynamoDB መዳረሻን መፃፍ አለበት።

ለሰብሳቢ ላምዳ ተግባር የIAM ሚና መፍጠር
በመጀመሪያ፣ ላምዳ-ቲኬቶች ፕሮሰሲንግ ሚና ለተባለ ላምዳ አዲስ የIAM ሚና እንፍጠር፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
ለሙከራ ምሳሌ፣ ከዚህ በታች ባለው ስእል እንደሚታየው አስቀድሞ የተዋቀረው AmazonKinesisReadOnlyAccess እና AmazonDynamoDBFullAccess ፖሊሲዎች በጣም ተስማሚ ናቸው።

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት

አዲስ ግቤቶች ወደ አየር መንገድ_ዥረት ሲገቡ ይህ ላምዳ በኪነሲስ ቀስቅሴ መጀመር አለበት፣ ስለዚህ አዲስ ቀስቅሴ ማከል አለብን፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
የሚቀረው ኮዱን ማስገባት እና ላምዳውን ማስቀመጥ ብቻ ነው።

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

የላምዳ ተግባር አሳዋቂ መፍጠር

ሁለተኛው የላምዳ ተግባር፣ ሁለተኛውን ዥረት (ልዩ_ዥረት) የሚቆጣጠር እና ለኤስኤንኤስ ማሳወቂያ የሚልክ በተመሳሳይ መንገድ ተፈጥሯል። ስለዚህ ይህ ላምዳ ከ Kinesis ለማንበብ እና ለተሰጠ የኤስኤንኤስ ርዕስ መልእክት መላክ አለበት ፣ ይህም በኤስኤንኤስ አገልግሎት ለሁሉም የዚህ ርዕስ ተመዝጋቢዎች (ኢሜል ፣ ኤስኤምኤስ ፣ ወዘተ) ይላካል።

የ IAM ሚና መፍጠር
በመጀመሪያ፣ ለዚህ ​​ላምዳ የIAM ሚና Lambda-KinesisAlarm እንፈጥራለን፣ እና ይህን ሚና ለሚፈጠረው ማንቂያ_ማሳወቂያ ላምዳ እንመድባለን።

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት

ይህ ላምዳ አዲስ መዝገቦች ወደ ልዩ_ዥረት እንዲገቡ ቀስቅሴ ላይ መስራት አለበት፣ስለዚህ ቀስቅሴውን ለሰብሳቢ ላምዳ እንዳደረግነው በተመሳሳይ መንገድ ማዋቀር ያስፈልግዎታል።

ይህን ላምዳ ማዋቀር ቀላል ለማድረግ፣ የአየር መንገዱን ርዕስ ኤኤንአር (የአማዞን ሪኮርስ ስሞች) የምናስቀምጥበትን አዲስ የአካባቢ ተለዋዋጭ - TOPIC_ARN እናስተዋውቅ።

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
እና የ lambda ኮድ ያስገቡ ፣ ምንም የተወሳሰበ አይደለም

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

በእጅ የሚሰራ የስርዓት ውቅር የተጠናቀቀበት ቦታ ይህ ይመስላል። የቀረው ነገር መሞከር እና ሁሉንም ነገር በትክክል እንዳዋቀርን ማረጋገጥ ነው።

ከቴራፎርም ኮድ አሰማራ

አስፈላጊ ዝግጅት

Terraform መሠረተ ልማትን ከኮድ ለማሰማራት በጣም ምቹ ክፍት ምንጭ መሣሪያ ነው። ለመማር ቀላል የሆነ የራሱ የሆነ አገባብ አለው እና እንዴት እና ምን ማሰማራት እንዳለበት ብዙ ምሳሌዎች አሉት። የአቶም አርታኢ ወይም ቪዥዋል ስቱዲዮ ኮድ ከቴራፎርም ጋር መስራትን ቀላል የሚያደርጉ ብዙ ምቹ ተሰኪዎች አሉት።

ስርጭቱን ማውረድ ይችላሉ እዚህ. ስለ ሁሉም የቴራፎርም ችሎታዎች ዝርዝር ትንታኔ ከዚህ ጽሑፍ ወሰን በላይ ነው, ስለዚህ እራሳችንን ወደ ዋና ዋና ነጥቦች እንገድባለን.

እንዴት እንደሚጀመር

የፕሮጀክቱ ሙሉ ኮድ ነው። በእኔ ማከማቻ ውስጥ. ማከማቻውን ወደ ራሳችን እናዘጋዋለን። ከመጀመርዎ በፊት AWS CLI መጫኑን እና መዋቀሩን ማረጋገጥ አለብዎት ፣ ምክንያቱም… Terraform በ ~/.aws/credentials ፋይል ውስጥ ምስክርነቶችን ይፈልጋል።

ጥሩ ልምምድ ቴራፎርም በአሁኑ ጊዜ በደመና ውስጥ ምን እየፈጠረልን እንዳለ ለማየት መላውን መሠረተ ልማት ከማሰማራታችን በፊት የእቅድ ትዕዛዙን ማስኬድ ነው።

terraform.exe plan

ማሳወቂያዎችን ለመላክ ስልክ ቁጥር እንዲያስገቡ ይጠየቃሉ። በዚህ ደረጃ ላይ ማስገባት አስፈላጊ አይደለም.

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
የፕሮግራሙን የአሠራር እቅድ ከመረመርን በኋላ ምንጮችን መፍጠር እንችላለን-

terraform.exe apply

ይህንን ትእዛዝ ከላኩ በኋላ እንደገና ስልክ ቁጥር እንዲያስገቡ ይጠየቃሉ፤ ድርጊቶቹን በትክክል ስለመፈጸም ጥያቄ በሚታይበት ጊዜ “አዎ” ብለው ይደውሉ። ይህ መላውን መሠረተ ልማት ለማዘጋጀት ፣ ሁሉንም አስፈላጊ የ EC2 ውቅር ለማካሄድ ፣ ላምዳ ተግባራትን ለማሰማራት ፣ ወዘተ.

በ Terraform ኮድ በኩል ሁሉም ሀብቶች በተሳካ ሁኔታ ከተፈጠሩ በኋላ ወደ Kinesis Analytics አፕሊኬሽኑ ዝርዝሮች ውስጥ መግባት አለብዎት (እንደ አለመታደል ሆኖ ይህንን ከኮዱ በቀጥታ እንዴት ማድረግ እንዳለብኝ አላገኘሁም)።

መተግበሪያውን ያስጀምሩ፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
ከዚህ በኋላ፣ ከተቆልቋይ ዝርዝሩ ውስጥ በመምረጥ የውስጠ-መተግበሪያ ዥረት ስምን በግልፅ ማዘጋጀት አለብዎት፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
አሁን ሁሉም ነገር ለመሄድ ዝግጁ ነው.

ማመልከቻውን በመሞከር ላይ

ምንም ይሁን ምን ስርዓቱን በእጅ ወይም በቴራፎርም ኮድ እንዳሰማራ ፣ እሱ በተመሳሳይ መንገድ ይሰራል።

በSSH በኩል የ Kinesis Agent ወደተጫነበት EC2 ቨርቹዋል ማሽን እንገባለን እና የ api_caller.py ስክሪፕት እንሰራለን።

sudo ./api_caller.py TOKEN

ማድረግ ያለብዎት ወደ ቁጥርዎ ኤስኤምኤስ መጠበቅ ብቻ ነው፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
ኤስኤምኤስ - መልእክቱ በ1 ደቂቃ ውስጥ ወደ ስልኩ ይደርሳል፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት
መዝገቦቹ በDynamoDB ዳታቤዝ ውስጥ ተቀምጠው እንደሆነ ለቀጣይ፣ የበለጠ ዝርዝር ትንታኔ ለማየት ይቀራል። የአየር መንገድ_ቲኬቶች ሰንጠረዥ በግምት የሚከተለውን ውሂብ ይዟል፡-

Aviasales API ውህደት ከአማዞን Kinesis እና አገልጋይ አልባ ቀላልነት

መደምደሚያ

በተሰራው ስራ ላይ, በአማዞን ኪኔሲስ ላይ የተመሰረተ የመስመር ላይ የውሂብ ማቀነባበሪያ ስርዓት ተገንብቷል. የSQL ትዕዛዞችን በመጠቀም የ Kinesis ወኪልን ከ Kinesis Data Streams እና የእውነተኛ ጊዜ ትንታኔዎች ኪኔሲስ ትንታኔዎችን ለመጠቀም አማራጮች እንዲሁም Amazon Kinesis ከሌሎች የ AWS አገልግሎቶች ጋር ያለው ግንኙነት ግምት ውስጥ ገብቷል።

ከላይ ያለውን ስርዓት በሁለት መንገድ ዘርግተናል፡ ይልቁንም ረጅም ማንዋል እና ፈጣን ከቴራፎርም ኮድ።

ሁሉም የፕሮጀክት ምንጭ ኮድ ይገኛል። በእኔ GitHub ማከማቻ ውስጥ, እራስዎን በደንብ እንዲያውቁት እመክራችኋለሁ.

ጽሑፉን ለመወያየት ደስተኛ ነኝ, አስተያየቶችዎን በጉጉት እጠብቃለሁ. ገንቢ ትችት ለማግኘት ተስፋ አደርጋለሁ።

ስኬት እመኛለሁ!

ምንጭ: hab.com

አስተያየት ያክሉ