మేము స్ట్రీమ్ డేటా ప్రాసెసింగ్ పైప్‌లైన్‌ను సృష్టిస్తాము. 2 వ భాగము

అందరికి వందనాలు. మేము వ్యాసం యొక్క చివరి భాగం యొక్క అనువాదాన్ని భాగస్వామ్యం చేస్తున్నాము, ఇది కోర్సులోని విద్యార్థుల కోసం ప్రత్యేకంగా సిద్ధం చేయబడింది. డేటా ఇంజనీర్. మీరు మొదటి భాగాన్ని చదవగలరు ఇక్కడ.

రియల్-టైమ్ పైప్‌లైన్‌ల కోసం అపాచీ బీమ్ మరియు డేటాఫ్లో

మేము స్ట్రీమ్ డేటా ప్రాసెసింగ్ పైప్‌లైన్‌ను సృష్టిస్తాము. 2 వ భాగము

Google క్లౌడ్‌ని సెటప్ చేస్తోంది

గమనిక: పైథాన్ 3లో పైప్‌లైన్‌ను రన్ చేయడంలో నాకు సమస్య ఉన్నందున పైప్‌లైన్‌ను అమలు చేయడానికి మరియు అనుకూల లాగ్ డేటాను ప్రచురించడానికి నేను Google Cloud Shellని ఉపయోగించాను. Google Cloud Shell Apache Beamతో మరింత స్థిరంగా ఉండే Python 2ని ఉపయోగిస్తుంది.

పైప్లైన్ను ప్రారంభించడానికి, మేము సెట్టింగులలో కొద్దిగా త్రవ్వాలి. మీలో ఇంతకు ముందు GCP ఉపయోగించని వారి కోసం, మీరు ఇందులో వివరించిన క్రింది 6 దశలను అనుసరించాలి పేజీ.

దీని తర్వాత, మేము మా స్క్రిప్ట్‌లను Google క్లౌడ్ స్టోరేజ్‌కి అప్‌లోడ్ చేయాలి మరియు వాటిని మా Google క్లౌడ్ షెల్‌కి కాపీ చేయాలి. క్లౌడ్ నిల్వకు అప్‌లోడ్ చేయడం చాలా చిన్న విషయం (వివరణ కనుగొనవచ్చు ఇక్కడ) మా ఫైల్‌లను కాపీ చేయడానికి, దిగువన ఉన్న చిత్రం 2లో ఎడమవైపున ఉన్న మొదటి చిహ్నాన్ని క్లిక్ చేయడం ద్వారా మేము టూల్‌బార్ నుండి Google క్లౌడ్ షెల్‌ను తెరవవచ్చు.

మేము స్ట్రీమ్ డేటా ప్రాసెసింగ్ పైప్‌లైన్‌ను సృష్టిస్తాము. 2 వ భాగము
మూర్తి 2

ఫైల్‌లను కాపీ చేసి, అవసరమైన లైబ్రరీలను ఇన్‌స్టాల్ చేయడానికి మనకు అవసరమైన ఆదేశాలు క్రింద ఇవ్వబడ్డాయి.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

మా డేటాబేస్ మరియు పట్టికను సృష్టిస్తోంది

మేము అన్ని సెటప్ సంబంధిత దశలను పూర్తి చేసిన తర్వాత, మేము చేయవలసిన తదుపరి పని BigQueryలో డేటాసెట్ మరియు పట్టికను సృష్టించడం. దీన్ని చేయడానికి అనేక మార్గాలు ఉన్నాయి, అయితే ముందుగా డేటాసెట్‌ను సృష్టించడం ద్వారా Google క్లౌడ్ కన్సోల్‌ను ఉపయోగించడం చాలా సరళమైనది. మీరు క్రింది దశలను అనుసరించవచ్చు లింక్స్కీమాతో పట్టికను రూపొందించడానికి. మా టేబుల్ ఉంటుంది 7 నిలువు వరుసలు, ప్రతి వినియోగదారు లాగ్ యొక్క భాగాలకు అనుగుణంగా. సౌలభ్యం కోసం, మేము టైమ్‌లోకల్ వేరియబుల్ మినహా అన్ని నిలువు వరుసలను స్ట్రింగ్‌లుగా నిర్వచిస్తాము మరియు మేము ముందుగా రూపొందించిన వేరియబుల్స్ ప్రకారం వాటికి పేరు పెడతాము. మా టేబుల్ యొక్క లేఅవుట్ మూర్తి 3 లో వలె ఉండాలి.

మేము స్ట్రీమ్ డేటా ప్రాసెసింగ్ పైప్‌లైన్‌ను సృష్టిస్తాము. 2 వ భాగము
మూర్తి 3. టేబుల్ లేఅవుట్

వినియోగదారు లాగ్ డేటాను ప్రచురించడం

పబ్/సబ్ అనేది మా పైప్‌లైన్‌లో కీలకమైన భాగం ఎందుకంటే ఇది బహుళ స్వతంత్ర అప్లికేషన్‌లను ఒకదానితో ఒకటి కమ్యూనికేట్ చేయడానికి అనుమతిస్తుంది. ప్రత్యేకంగా, ఇది అప్లికేషన్‌ల మధ్య సందేశాలను పంపడానికి మరియు స్వీకరించడానికి మమ్మల్ని అనుమతించే మధ్యవర్తిగా పనిచేస్తుంది. మేము చేయవలసిన మొదటి విషయం ఏమిటంటే ఒక అంశాన్ని సృష్టించడం. కన్సోల్‌లోని పబ్/సబ్‌కి వెళ్లి, టాపిక్ సృష్టించు క్లిక్ చేయండి.

దిగువన ఉన్న కోడ్ పైన నిర్వచించిన లాగ్ డేటాను రూపొందించడానికి మా స్క్రిప్ట్‌ని పిలుస్తుంది మరియు ఆపై లాగ్‌లను కనెక్ట్ చేసి పబ్/సబ్‌కి పంపుతుంది. మనం చేయవలసింది ఒక వస్తువును సృష్టించడమే పబ్లిషర్ క్లయింట్, పద్ధతిని ఉపయోగించి అంశానికి మార్గాన్ని పేర్కొనండి topic_path మరియు ఫంక్షన్‌కు కాల్ చేయండి publish с topic_path మరియు డేటా. మేము దిగుమతి చేసుకుంటున్నామని దయచేసి గమనించండి generate_log_line మా స్క్రిప్ట్ నుండి stream_logs, కాబట్టి ఈ ఫైల్‌లు ఒకే ఫోల్డర్‌లో ఉన్నాయని నిర్ధారించుకోండి, లేకుంటే మీరు దిగుమతి ఎర్రర్‌ను పొందుతారు. మేము దీన్ని ఉపయోగించి మా గూగుల్ కన్సోల్ ద్వారా దీన్ని అమలు చేయవచ్చు:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

ఫైల్ రన్ అయిన వెంటనే, దిగువ చిత్రంలో చూపిన విధంగా, కన్సోల్‌కు లాగ్ డేటా యొక్క అవుట్‌పుట్‌ను చూడగలుగుతాము. మనం ఉపయోగించనంత కాలం ఈ స్క్రిప్ట్ పని చేస్తుంది CTRL + Cదాన్ని పూర్తి చేయడానికి.

మేము స్ట్రీమ్ డేటా ప్రాసెసింగ్ పైప్‌లైన్‌ను సృష్టిస్తాము. 2 వ భాగము
మూర్తి 4. అవుట్పుట్ publish_logs.py

Написание кода нашего конвейера

ఇప్పుడు మేము ప్రతిదీ సిద్ధం చేసాము, మేము సరదా భాగాన్ని ప్రారంభించవచ్చు - బీమ్ మరియు పైథాన్ ఉపయోగించి మా పైప్‌లైన్ కోడింగ్. బీమ్ పైప్‌లైన్‌ను రూపొందించడానికి, మేము పైప్‌లైన్ వస్తువు (p)ని సృష్టించాలి. మేము పైప్‌లైన్ ఆబ్జెక్ట్‌ను సృష్టించిన తర్వాత, ఆపరేటర్‌ని ఉపయోగించి మనం బహుళ ఫంక్షన్‌లను ఒకదాని తర్వాత ఒకటి వర్తింపజేయవచ్చు pipe (|). సాధారణంగా, వర్క్‌ఫ్లో క్రింద ఉన్న చిత్రం వలె కనిపిస్తుంది.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

మా కోడ్‌లో, మేము రెండు అనుకూల ఫంక్షన్‌లను సృష్టిస్తాము. ఫంక్షన్ regex_clean, ఇది డేటాను స్కాన్ చేస్తుంది మరియు ఫంక్షన్‌ని ఉపయోగించి PATTERNS జాబితా ఆధారంగా సంబంధిత అడ్డు వరుసను తిరిగి పొందుతుంది re.search. ఫంక్షన్ కామాతో వేరు చేయబడిన స్ట్రింగ్‌ను అందిస్తుంది. మీరు సాధారణ వ్యక్తీకరణ నిపుణుడు కాకపోతే, దీన్ని తనిఖీ చేయాలని నేను సిఫార్సు చేస్తున్నాను ట్యుటోరియల్ మరియు కోడ్‌ని తనిఖీ చేయడానికి నోట్‌ప్యాడ్‌లో ప్రాక్టీస్ చేయండి. దీని తర్వాత మేము కస్టమ్ ParDo ఫంక్షన్‌ని నిర్వచించాము స్ప్లిట్, ఇది సమాంతర ప్రాసెసింగ్ కోసం బీమ్ రూపాంతరం యొక్క వైవిధ్యం. పైథాన్‌లో, ఇది ప్రత్యేక పద్ధతిలో చేయబడుతుంది - మేము తప్పనిసరిగా DoFn బీమ్ క్లాస్ నుండి వారసత్వంగా పొందే తరగతిని సృష్టించాలి. స్ప్లిట్ ఫంక్షన్ మునుపటి ఫంక్షన్ నుండి అన్వయించబడిన అడ్డు వరుసను తీసుకుంటుంది మరియు మా BigQuery పట్టికలోని నిలువు వరుస పేర్లకు సంబంధించిన కీలతో నిఘంటువుల జాబితాను అందిస్తుంది. ఈ ఫంక్షన్ గురించి గమనించాల్సిన విషయం ఉంది: నేను దిగుమతి చేసుకోవలసి వచ్చింది datetime అది పని చేయడానికి ఒక ఫంక్షన్ లోపల. నేను ఫైల్ ప్రారంభంలో దిగుమతి ఎర్రర్‌ను పొందుతున్నాను, ఇది విచిత్రంగా ఉంది. ఈ జాబితా తర్వాత ఫంక్షన్‌కు పంపబడుతుంది WriteToBigQuery, ఇది కేవలం మన డేటాను టేబుల్‌కి జోడిస్తుంది. బ్యాచ్ డేటాఫ్లో జాబ్ మరియు స్ట్రీమింగ్ డేటాఫ్లో జాబ్ కోసం కోడ్ క్రింద ఇవ్వబడింది. బ్యాచ్ మరియు స్ట్రీమింగ్ కోడ్ మధ్య ఉన్న ఏకైక తేడా ఏమిటంటే, బ్యాచ్‌లో మనం CSVని చదవడం src_pathఫంక్షన్ ఉపయోగించి ReadFromText బీమ్ నుండి.

బ్యాచ్ డేటాఫ్లో జాబ్ (బ్యాచ్ ప్రాసెసింగ్)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

స్ట్రీమింగ్ డేటాఫ్లో జాబ్ (స్ట్రీమ్ ప్రాసెసింగ్)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

కన్వేయర్‌ను ప్రారంభిస్తోంది

మేము పైప్‌లైన్‌ను వివిధ మార్గాల్లో అమలు చేయవచ్చు. మనకు కావాలంటే, GCPకి రిమోట్‌గా లాగిన్ చేస్తున్నప్పుడు మేము దానిని టెర్మినల్ నుండి స్థానికంగా అమలు చేయవచ్చు.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

అయితే, మేము డేటాఫ్లో ఉపయోగించి దీన్ని అమలు చేయబోతున్నాము. కింది అవసరమైన పారామితులను సెట్ చేయడం ద్వారా దిగువ ఆదేశాన్ని ఉపయోగించి మనం దీన్ని చేయవచ్చు.

  • project — మీ GCP ప్రాజెక్ట్ యొక్క ID.
  • runner మీ ప్రోగ్రామ్‌ను విశ్లేషించి, మీ పైప్‌లైన్‌ని నిర్మించే పైప్‌లైన్ రన్నర్. క్లౌడ్‌లో అమలు చేయడానికి, మీరు తప్పనిసరిగా డేటాఫ్లోరన్నర్‌ని పేర్కొనాలి.
  • staging_location — పనిని నిర్వహిస్తున్న ప్రాసెసర్‌లకు అవసరమైన కోడ్ ప్యాకేజీలను ఇండెక్సింగ్ చేయడానికి క్లౌడ్ డేటాఫ్లో క్లౌడ్ స్టోరేజ్‌కి మార్గం.
  • temp_location — పైప్‌లైన్ నడుస్తున్నప్పుడు సృష్టించబడిన తాత్కాలిక జాబ్ ఫైల్‌లను నిల్వ చేయడానికి క్లౌడ్ డేటాఫ్లో క్లౌడ్ స్టోరేజ్‌కి మార్గం.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

ఈ కమాండ్ రన్ అవుతున్నప్పుడు, మనం గూగుల్ కన్సోల్‌లోని డేటాఫ్లో ట్యాబ్‌కి వెళ్లి మన పైప్‌లైన్‌ను చూడవచ్చు. మనం పైప్‌లైన్‌పై క్లిక్ చేసినప్పుడు, మనకు మూర్తి 4 వంటిది కనిపిస్తుంది. డీబగ్గింగ్ ప్రయోజనాల కోసం, వివరణాత్మక లాగ్‌లను వీక్షించడానికి లాగ్‌లకు వెళ్లి ఆపై స్టాక్‌డ్రైవర్‌కు వెళ్లడం చాలా ఉపయోగకరంగా ఉంటుంది. ఇది అనేక సందర్భాల్లో పైప్‌లైన్ సమస్యలను పరిష్కరించడంలో నాకు సహాయపడింది.

మేము స్ట్రీమ్ డేటా ప్రాసెసింగ్ పైప్‌లైన్‌ను సృష్టిస్తాము. 2 వ భాగము
మూర్తి 4: బీమ్ కన్వేయర్

BigQueryలో మా డేటాను యాక్సెస్ చేయండి

కాబట్టి, మన టేబుల్‌లోకి డేటా ప్రవహించే పైప్‌లైన్ ఇప్పటికే ఉండాలి. దీన్ని పరీక్షించడానికి, మేము BigQueryకి వెళ్లి డేటాను చూడవచ్చు. దిగువ ఆదేశాన్ని ఉపయోగించిన తర్వాత మీరు డేటాసెట్ యొక్క మొదటి కొన్ని వరుసలను చూడాలి. ఇప్పుడు మేము BigQueryలో నిల్వ చేసిన డేటాను కలిగి ఉన్నాము, మేము తదుపరి విశ్లేషణను నిర్వహించగలము, అలాగే డేటాను సహోద్యోగులతో పంచుకోవచ్చు మరియు వ్యాపార ప్రశ్నలకు సమాధానం ఇవ్వడం ప్రారంభించవచ్చు.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

మేము స్ట్రీమ్ డేటా ప్రాసెసింగ్ పైప్‌లైన్‌ను సృష్టిస్తాము. 2 వ భాగము
మూర్తి 5: BigQuery

తీర్మానం

ఈ పోస్ట్ స్ట్రీమింగ్ డేటా పైప్‌లైన్‌ను రూపొందించడానికి, అలాగే డేటాను మరింత ప్రాప్యత చేయడానికి మార్గాలను కనుగొనడంలో ఉపయోగకరమైన ఉదాహరణగా ఉపయోగపడుతుందని మేము ఆశిస్తున్నాము. ఈ ఫార్మాట్‌లో డేటాను నిల్వ చేయడం వల్ల మనకు చాలా ప్రయోజనాలు లభిస్తాయి. ఇప్పుడు మన ఉత్పత్తిని ఎంత మంది వ్యక్తులు ఉపయోగిస్తున్నారు వంటి ముఖ్యమైన ప్రశ్నలకు సమాధానం ఇవ్వడం ప్రారంభించవచ్చు. కాలక్రమేణా మీ యూజర్ బేస్ పెరుగుతోందా? ఉత్పత్తి యొక్క ఏ అంశాలతో ప్రజలు ఎక్కువగా సంకర్షణ చెందుతారు? మరియు ఉండకూడని చోట లోపాలు ఉన్నాయా? సంస్థకు ఆసక్తి కలిగించే ప్రశ్నలు ఇవి. ఈ ప్రశ్నలకు సమాధానాల నుండి వెలువడే అంతర్దృష్టుల ఆధారంగా, మేము ఉత్పత్తిని మెరుగుపరచవచ్చు మరియు వినియోగదారు నిశ్చితార్థాన్ని పెంచవచ్చు.

ఈ రకమైన వ్యాయామానికి బీమ్ నిజంగా ఉపయోగకరంగా ఉంటుంది మరియు అనేక ఇతర ఆసక్తికరమైన ఉపయోగ సందర్భాలను కూడా కలిగి ఉంది. ఉదాహరణకు, మీరు స్టాక్ టిక్ డేటాను నిజ సమయంలో విశ్లేషించి, విశ్లేషణ ఆధారంగా ట్రేడ్‌లు చేయాలనుకోవచ్చు, బహుశా మీరు వాహనాల నుండి వచ్చే సెన్సార్ డేటాను కలిగి ఉండవచ్చు మరియు ట్రాఫిక్ స్థాయి గణనలను లెక్కించాలనుకోవచ్చు. మీరు, ఉదాహరణకు, వినియోగదారు డేటాను సేకరించి, కీ కొలమానాలను ట్రాక్ చేయడానికి డాష్‌బోర్డ్‌లను రూపొందించడానికి దాన్ని ఉపయోగించే గేమింగ్ కంపెనీగా కూడా ఉండవచ్చు. సరే, పెద్దమనుషులు, ఇది మరొక పోస్ట్ కోసం ఒక అంశం, చదివినందుకు ధన్యవాదాలు, మరియు పూర్తి కోడ్‌ని చూడాలనుకునే వారికి, నా GitHub లింక్ క్రింద ఉంది.

https://github.com/DFoly/User_log_pipeline

అంతే. మొదటి భాగం చదవండి.

మూలం: www.habr.com

ఒక వ్యాఖ్యను జోడించండి