Building a data processing pipeline. Part 2

Hello everyone. We are sharing a translation of the final part of the article, prepared especially for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for Real-Time Pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the custom log data because I was having trouble running the pipeline with Python 3. Google Cloud Shell uses Python 2, which plays a bit nicer with Apache Beam.

To get the pipeline up and running, we need to do a bit of setup. Those of you who have not used GCP before will need to go through the 6 steps outlined on this page.

After this, we need to upload our scripts to Google Cloud Storage and copy them to our Google Cloud Shell. Uploading to Cloud Storage is fairly trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy the files and install the required libraries are listed below.

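# Copy the files from Cloud Storage into the current directory
gsutil cp gs://<YOUR-BUCKET>/* .
# Upgrade pip first, then install the libraries
sudo pip install -U pip
sudo pip install 'apache-beam[gcp]' oauth2client==3.0.0
sudo pip install Faker==1.0.2
# Environment variables (BUCKET keeps the gs:// prefix because it is used
# as a Cloud Storage path when launching the Dataflow job)
BUCKET=gs://<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>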

Creating our dataset and table

Once we have completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console and create a dataset first. You can follow the steps at this link to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we will define all columns as strings except the timelocal variable, and name them after the variables we generated earlier. Our table schema should look like Figure 3.

Figure 3. Table schema
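If you prefer the command line to the console, the same table can be created from a JSON schema file. The sketch below is an illustration under assumptions: it converts the `name:TYPE, ...` schema string used by the pipeline code into the JSON field list that `bq mk --table` accepts, and it assumes timelocal should be a TIMESTAMP column (the article only says it is not a string). The script name make_schema.py is hypothetical.

```python
import json

# Same "name:TYPE, ..." string that the pipeline code passes to WriteToBigQuery.
SCHEMA = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, '
          'body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING')


def to_bq_json_schema(schema_str, overrides=None):
    """Turn 'name:TYPE, ...' into a BigQuery JSON schema (a list of field dicts)."""
    overrides = overrides or {}
    fields = []
    for item in schema_str.split(','):
        name, col_type = (part.strip() for part in item.split(':'))
        fields.append({
            "name": name,
            # Let individual columns (e.g. timelocal) get a non-STRING type
            "type": overrides.get(name, col_type),
            "mode": "NULLABLE",
        })
    return fields


if __name__ == '__main__':
    # Assumption: timelocal is stored as TIMESTAMP.
    fields = to_bq_json_schema(SCHEMA, overrides={"timelocal": "TIMESTAMP"})
    print(json.dumps(fields, indent=2))
```

Usage: `python make_schema.py > logdata_schema.json`, then `bq mk --dataset <YOUR-PROJECT>:userlogs` and `bq mk --table <YOUR-PROJECT>:userlogs.logdata logdata_schema.json`.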

Publishing the user log data

Pub/Sub is a vital component of our pipeline because it allows multiple independent applications to communicate with each other. In particular, it acts as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
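To make the "middleman" idea concrete, here is a toy, in-memory model of a topic. It is not the Pub/Sub API and ignores acknowledgements, retries and persistence; it only shows the decoupling: the publisher writes to the topic without knowing who is listening, and every subscription receives its own copy of each message published after it was created. ToyTopic and the subscription names are hypothetical.

```python
from collections import deque


class ToyTopic:
    """In-memory stand-in for a Pub/Sub topic (illustration only)."""

    def __init__(self):
        self.subscriptions = {}

    def subscribe(self, name):
        # Each subscription gets its own independent queue
        self.subscriptions.setdefault(name, deque())
        return name

    def publish(self, data):
        # Fan out: every existing subscription receives a copy of the message
        for queue in self.subscriptions.values():
            queue.append(data)

    def pull(self, name):
        # Return the oldest undelivered message, or None if there is none
        queue = self.subscriptions[name]
        return queue.popleft() if queue else None


topic = ToyTopic()
topic.subscribe("dataflow-job")
topic.subscribe("audit")
topic.publish(b"192.168.0.1 - - [30/Apr/2019:21:11:42] ...")
```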

The code below calls our script to generate the log data defined earlier, then connects to Pub/Sub and sends the logs there. The only things we need to do are create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Please note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this in our Google console using:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
# Full resource name: projects/<PROJECT_ID>/topics/<TOPIC>
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)


def publish(publisher, topic, message):
    # Pub/Sub payloads are bytes, so encode the log line first
    data = message.encode('utf-8')
    # Returns a future that resolves to the server-assigned message ID
    return publisher.publish(topic, data=data)


def callback(message_future):
    # Done-callbacks run after the future has completed, so exception()
    # returns right away here; the timeout only matters if called earlier.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        # Wait 1 or 2 seconds before publishing the next line
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
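publisher.publish does not block: it returns a future, and add_done_callback registers a function that runs once the publish succeeds or fails. The Pub/Sub future behaves like a standard concurrent.futures.Future, so the pattern can be tried offline with the standard library. In the sketch below, fake_publish is a hypothetical stand-in for the network call, not part of the Pub/Sub client.

```python
from concurrent.futures import ThreadPoolExecutor

results = []


def fake_publish(data):
    """Hypothetical stand-in for a publish call: returns a fake message ID."""
    if not data:
        raise ValueError("empty message")
    return "msg-%d" % len(data)


def callback(future):
    # Called once the future is done; exception() does not block at this point
    exc = future.exception()
    if exc is not None:
        results.append("failed: %s" % exc)
    else:
        results.append(future.result())


with ThreadPoolExecutor(max_workers=2) as pool:
    for payload in [b"hello", b""]:
        fut = pool.submit(fake_publish, payload)
        fut.add_done_callback(callback)
# Leaving the with-block waits for all work, so every callback has run here.
```

The loop keeps producing messages while earlier publishes are still in flight; errors surface in the callback instead of stopping the loop.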

Once the file is running, we can see the log data being printed to the console, as shown in the figure below. This script will keep running until we stop it with CTRL+C.

Figure 4. Output of publish_logs.py

Writing our pipeline code

Now that we have everything set up, we can get to the fun part: coding our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple functions one after another using the pipe (|) operator. In general, the workflow looks like this:

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
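The pipe syntax is ordinary Python operator overloading: `collection | transform` calls a method on the collection, and `'Label' >> transform` calls `__rrshift__` on the transform because `str` does not define `>>` for it. Beam's classes overload these operators as well, but the toy below is not Beam's code; Collection and Transform are hypothetical names, and it only shows how the chaining works and that each step returns a new collection.

```python
class Transform:
    """Toy transform: applies fn to every element, with an optional label."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label

    def __rrshift__(self, label):
        # Enables the 'Label' >> transform syntax
        return Transform(self.fn, label)


class Collection:
    """Toy stand-in for a PCollection; applying a transform returns a new one."""

    def __init__(self, items, steps=()):
        self.items = list(items)
        self.steps = steps

    def __or__(self, transform):
        name = transform.label or transform.fn.__name__
        return Collection([transform.fn(x) for x in self.items],
                          self.steps + (name,))


# '>>' binds tighter than '|', so each label attaches to its own transform
out = (Collection([" a ", " b "])
       | "Strip" >> Transform(str.strip)
       | Transform(str.upper))
```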

In our code, we will create two custom functions. The first, regex_clean, scans the data and extracts the string matching each regular expression in the PATTERNS list using re.search. The function returns a comma-separated string. If you are not a regular expression expert, I recommend checking out this tutorial and experimenting in a notebook to test the code.

After this, we define a custom ParDo function called Split, which is a variation of the Beam transform for parallel processing. In Python, this is done in a special way: we have to create a class that inherits from Beam's DoFn class. The Split function takes the parsed string from the previous function and returns a list of dictionaries whose keys correspond to the column names in our BigQuery table. One thing to note about this function: I had to import datetime inside the function to make it work. Importing it at the top of the file gave me an import error, which was strange. (A likely cause is that Dataflow workers do not receive the main module's global imports; Beam's --save_main_session pipeline option exists for this situation.) This list is then passed to the WriteToBigQuery function, which simply appends our data to the table.

The code for the batch DataFlow job and the streaming DataFlow job is given below. The main difference between the batch and streaming code is the source: in batch, we read the CSV from src_path using Beam's ReadFromText function, while in streaming we read from the Pub/Sub topic with ReadFromPubSub and decode the bytes into strings.
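To see what regex_clean and the timestamp conversion in Split produce, the sketch below runs the same PATTERNS on one log line outside Beam. The sample line's format is an assumption made for illustration (the real format comes from generate_log_line in part 1). With this assumed format the 4th field is the byte count and the 5th the status code, and the "Mozilla" user-agent fragment comes before the referer URL; a different log layout can change which regex matches what, so check the field order against your own data.

```python
import re
from datetime import datetime

# Same PATTERNS as regex_clean in the pipeline code.
PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
            r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
            r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']


def regex_clean(data):
    """One comma-separated field per pattern; a blank field if a pattern misses."""
    fields = []
    for pattern in PATTERNS:
        match = re.search(pattern, data)
        fields.append(match.group().strip().replace('"', '') if match else '')
    return ','.join(fields)


def convert_time(raw):
    """The timestamp conversion done in Split.process."""
    d = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S")
    return d.strftime("%Y-%m-%d %H:%M:%S")


# Hypothetical sample line in an assumed format
SAMPLE = ('192.168.0.1 - - [30/Apr/2019:21:11:42] "GET /home HTTP/1.1" [200] 512 '
          '"http://www.example.com/" "Mozilla/5.0 (X11; Linux x86_64)"')
cleaned = regex_clean(SAMPLE)
```

Keeping a blank placeholder when a pattern misses means every line still splits into 7 fields, so Split can index the columns safely.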

Batch DataFlow job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import re
import logging

PROJECT = 'user-logs-237110'
schema = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, '
          'body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING')


src_path = "user_log_fileC.txt"


def regex_clean(data):
    # One regular expression per output field, in output order
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
        match = re.search(pattern, data)
        # Keep a placeholder when a pattern does not match, so every line
        # still yields 7 comma-separated fields in the same positions
        result.append(match.group() if match else " ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    return ','.join(result)


class Split(beam.DoFn):

    def process(self, element):
        # Imported here so that it is available on the Dataflow workers
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        # Positions follow the order of PATTERNS. Check the status /
        # body_bytes_sent order against your log format: the streaming
        # version below uses the opposite order.
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            # The capitalised-word pattern (e.g. "Mozilla") comes before the URL pattern
            'http_referer': element[6],
            'http_user_agent': element[5]
        }]


def main():

    p = beam.Pipeline(options=PipelineOptions())

    (p
     | 'ReadData' >> beam.io.ReadFromText(src_path)
     | "clean address" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

    p.run()


if __name__ == '__main__':
    # setLevel() returns None, so there is nothing useful to assign
    logging.getLogger().setLevel(logging.INFO)
    main()

Streaming DataFlow job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam
import logging
import argparse
import re


PROJECT = "user-logs-237110"
schema = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, '
          'body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING')
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):
    # One regular expression per output field, in output order
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
        match = re.search(pattern, data)
        # Keep a placeholder when a pattern does not match, so every line
        # still yields 7 comma-separated fields in the same positions
        result.append(match.group() if match else " ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    return ','.join(result)


class Split(beam.DoFn):

    def process(self, element):
        # Imported here so that it is available on the Dataflow workers
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        # Positions follow the order of PATTERNS in regex_clean
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            # The capitalised-word pattern (e.g. "Mozilla") comes before the URL pattern
            'http_referer': element[6],
            'http_user_agent': element[5]
        }]


def main(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", default=TOPIC)
    parser.add_argument("--output")
    # Our own flags go to known_args; the rest (--runner, --streaming, ...) go to Beam
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (p
     | 'ReadData' >> beam.io.ReadFromPubSub(
         topic=known_args.input_topic).with_output_types(bytes)
     | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
     | "Clean Data" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    # setLevel() returns None, so there is nothing useful to assign
    logging.getLogger().setLevel(logging.INFO)
    main()
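The streaming script defines its own --input_topic and --output flags, while flags such as --runner and --streaming are meant for Beam. argparse's parse_known_args makes this split possible: it returns the recognised flags as a namespace plus a list of everything it did not recognise, which can then be handed to PipelineOptions. A minimal sketch of just that step (split_args is a hypothetical helper name):

```python
import argparse

DEFAULT_TOPIC = "projects/user-logs-237110/topics/userlogs"


def split_args(argv):
    """Separate the script's own flags from the flags meant for Beam."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", default=DEFAULT_TOPIC)
    parser.add_argument("--output")
    # Unlike parse_args, parse_known_args does not fail on unknown flags;
    # it returns them as a second value instead
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


known, rest = split_args(["--input_topic", "projects/p/topics/t",
                          "--runner=DataflowRunner", "--streaming"])
```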

Running the pipeline

We can run the pipeline in a few different ways. If we wanted to, we could simply run it locally from the terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it using DataFlow. We can do this with the command below, setting the following required parameters.

  • project - the ID of your GCP project.
  • runner - the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - the Cloud Storage path where Cloud Dataflow stages the code packages needed by the workers executing the job.
  • temp_location - the Cloud Storage path where Cloud Dataflow stores temporary job files created while the pipeline is running.
  • streaming - runs the pipeline in streaming mode, which reading from Pub/Sub requires.

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

While this command is running, we can head over to the DataFlow tab in the Google console and view our pipeline. When we click on the pipeline, we should see something similar to Figure 4. For debugging purposes, it can be very helpful to go to Logs and then to Stackdriver to see detailed logs. This has helped me resolve pipeline issues on a number of occasions.

Figure 4: Beam pipeline
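The Dataflow parameters listed above can also be assembled in Python and passed to PipelineOptions, which accepts a list of command-line style flags. The sketch below is a hypothetical helper (dataflow_flags is not part of Beam) that builds the same flags as the command above and makes sure the bucket is written as a gs:// path, since temp_location and staging_location must point to Cloud Storage.

```python
def dataflow_flags(project, bucket, streaming=True):
    """Build the Dataflow flags used in the command above (hypothetical helper)."""
    # temp_location and staging_location must be Cloud Storage paths
    if not bucket.startswith("gs://"):
        bucket = "gs://" + bucket
    bucket = bucket.rstrip("/")
    flags = [
        "--runner=DataflowRunner",
        "--project=" + project,
        "--temp_location=" + bucket + "/tmp",
        "--staging_location=" + bucket + "/staging",
    ]
    if streaming:
        flags.append("--streaming")
    return flags
```

Usage, with a placeholder bucket name: `PipelineOptions(dataflow_flags("user-logs-237110", "my-bucket"))`.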

Accessing our data in BigQuery

So we should now have a pipeline running, with data flowing into our table. To check this, we can go to BigQuery and look at the data. After running the query below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can do further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Figure 5: BigQuery

Conclusion

We hope this post serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this form gives us many benefits. We can now start answering useful questions: How many people use our product? Is the user base growing over time? Which parts of the product do people interact with the most? And are there errors where there should not be any? These are the questions the organization will be interested in. Based on the insights from the answers to these questions, we can improve the product and increase user engagement.
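As a sketch of how two of these questions might be answered once rows have been pulled out of the logdata table, the snippet below counts error responses and distinct client addresses per day. The rows are made-up placeholders with the table's column names, treating a status of 400 or above as an error is an assumption, and distinct remote_addr values are only a rough proxy for users.

```python
from collections import Counter

# Made-up rows using column names from userlogs.logdata (placeholder values)
rows = [
    {"remote_addr": "10.0.0.1", "timelocal": "2019-04-30 21:11:42", "status": "200"},
    {"remote_addr": "10.0.0.2", "timelocal": "2019-04-30 21:12:05", "status": "404"},
    {"remote_addr": "10.0.0.1", "timelocal": "2019-05-01 08:00:00", "status": "500"},
]


def errors_per_day(rows):
    """Count responses with HTTP status >= 400 per day (assumed definition of an error)."""
    counts = Counter()
    for row in rows:
        status = row["status"].strip()
        # status is a STRING column and may be blank if the regex missed
        if status.isdigit() and int(status) >= 400:
            counts[row["timelocal"][:10]] += 1  # 'YYYY-MM-DD' part
    return counts


def unique_addresses_per_day(rows):
    """Distinct remote_addr values per day, a rough proxy for daily users."""
    seen = {}
    for row in rows:
        seen.setdefault(row["timelocal"][:10], set()).add(row["remote_addr"])
    return {day: len(addrs) for day, addrs in seen.items()}
```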

Beam is very useful for this kind of exercise and has many other interesting use cases. For example, you might want to analyze stock tick data in real time and make trades based on the analysis, or perhaps you have sensor data coming from vehicles and want to calculate traffic levels. You could also, for example, be a gaming company that collects user data and uses it to build dashboards tracking key metrics. Okay, folks, that is a topic for another post. Thanks for reading, and for those who want to see the full code, below is the link to my GitHub.

https://github.com/DFoly/User_log_pipeline

The end. Read the first part.

Source: www.habr.com
