Building a data processing pipeline. Part 2

Hello everyone. We are sharing a translation of the final part of the article, prepared for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for Real-Time Pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the user log data because I had problems running the pipeline in Python 3. At the time of writing, Google Cloud Shell used Python 2, which worked better with Apache Beam.

To get the pipeline running, we need to dig into the settings a little. If you have not used GCP before, you will need to go through the 6 steps described on this page.

After that, we need to upload our scripts to Google Cloud Storage and copy them to our Google Cloud Shell. Uploading to cloud storage is fairly trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands needed to copy the files and install the required libraries are listed below.

# Copy the files from Cloud Storage into the current directory
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our database and table

Once we have completed all the setup steps, the next thing to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console, creating the dataset first. You can follow the steps at the link below to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we will define all columns as strings, except for the timelocal variable, and name them after the variables we generated earlier. The layout of our table should look like Figure 3.

Figure 3. Table schema
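
The seven-column layout can also be written down as data. The sketch below is only an illustration: it takes the comma-separated name:TYPE schema string used in the pipeline code of this article and turns it into the list-of-fields JSON form that BigQuery accepts for table schemas. The helper name schema_to_fields and the choice of NULLABLE mode are assumptions, not part of the original article.

```python
import json

# Schema string in the "name:TYPE, name:TYPE" form used by the pipeline code.
SCHEMA = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')


def schema_to_fields(schema):
    """Convert 'name:TYPE, ...' into a list of BigQuery-style field dicts.

    Hypothetical helper for illustration; NULLABLE mode is an assumption.
    """
    fields = []
    for column in schema.split(','):
        name, field_type = column.strip().split(':')
        fields.append({'name': name, 'type': field_type, 'mode': 'NULLABLE'})
    return fields


fields = schema_to_fields(SCHEMA)
print(json.dumps(fields, indent=2))
```

Writing the schema once and deriving other forms from it helps keep the console table and the pipeline code in step.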

Publishing the user log data

Pub/Sub is an essential component of our pipeline because it allows multiple independent applications to communicate with each other. In particular, it acts as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
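
Once the topic exists, client code refers to it by a fully qualified name of the form projects/<project>/topics/<topic>. Here is a minimal sketch of that naming, using the project and topic IDs that appear in the code in this article. The function build_topic_path is a hypothetical stand-in written for illustration, not the Pub/Sub client itself.

```python
def build_topic_path(project_id, topic_id):
    """Return the fully qualified Pub/Sub topic name (illustrative helper)."""
    return 'projects/{}/topics/{}'.format(project_id, topic_id)


topic_path = build_topic_path('user-logs-237110', 'userlogs')
print(topic_path)
```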

The code below calls our script to generate the log data defined in the first part, then connects and sends the logs to Pub/Sub. The only things we need to do are create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this from our Google console using:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    # Pub/Sub payloads must be bytes, so encode the log line first.
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

While the file is running, we can see the log data printed to the console, as shown in the figure below. The script keeps running until we stop it with CTRL+C.

Figure 4. Output of publish_logs.py
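
The publish call in the script returns a future, and the callback is attached to it with add_done_callback. The same pattern can be tried without any cloud credentials. The sketch below swaps Pub/Sub for Python's standard concurrent.futures module; fake_publish and the message IDs it returns are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

results = []


def fake_publish(message):
    """Stand-in for a publish call: fails on empty input, else returns an ID."""
    if not message:
        raise ValueError('empty message')
    return 'id-{}'.format(len(message))


def callback(future):
    # exception() returns None on success, or the raised exception otherwise.
    error = future.exception(timeout=30)
    if error is not None:
        results.append('error: {}'.format(error))
    else:
        results.append(future.result())


# A single worker processes the messages one at a time, in order.
with ThreadPoolExecutor(max_workers=1) as pool:
    for line in ['GET /home', '']:
        pool.submit(fake_publish, line).add_done_callback(callback)

print(results)
```

Because the callback runs when the future completes, the publishing loop itself never has to block on the result.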

Writing our pipeline code

Now that everything is set up, we can start the fun part: coding our pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once the pipeline object exists, we can apply multiple functions one after another using the pipe operator (|). In general, the workflow looks like the following.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
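
The chaining syntax works because Python lets a class define what | means. The toy below is not Beam's implementation, just a sketch of the idea with made-up classes Collection and MapStep: each | applies one step and returns a new collection, so steps can be strung together.

```python
class MapStep(object):
    """A transform that applies a function to every element (toy example)."""

    def __init__(self, fn):
        self.fn = fn

    def apply(self, items):
        return [self.fn(item) for item in items]


class Collection(object):
    """Holds elements and overloads | to apply a step, mimicking the syntax."""

    def __init__(self, items):
        self.items = list(items)

    def __or__(self, step):
        # collection | step -> new collection with the step applied
        return Collection(step.apply(self.items))


final = (Collection([' a,b ', ' c,d '])
         | MapStep(str.strip)
         | MapStep(lambda s: s.split(','))
         | MapStep(len))

print(final.items)
```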

In our code, we create two custom functions. The regex_clean function scans the data and extracts the matching string for each entry in the PATTERNS list using the re.search function. The function returns a comma-separated string. If you are not a regular-expression expert, I recommend looking at this tutorial and practicing in a notebook to check the code.

After this, we define a custom ParDo function called Split, which is a type of Beam transform for parallel processing. In Python this is done in a special way: we must create a class that inherits from Beam's DoFn class. The Split function takes the parsed string from the previous function and returns a list of dictionaries with keys that correspond to the column names in our BigQuery table. There is one thing to note about this function: I had to import datetime inside the function for it to work. I was getting an import error when the import was at the top of the file, which was odd.

This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and streaming code is that in the batch job we read the CSV from src_path using Beam's ReadFromText function.
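
Two details of this kind of extraction are easy to miss: lookbehind and lookahead patterns return only the text between the brackets, and re.search returns None when nothing matches. Below is a small sketch, assuming a log line whose timestamp and status code are wrapped in square brackets. The sample line and the helper first_match are invented for illustration.

```python
import re

# Invented sample; the real layout comes from the log generator script.
SAMPLE = '10.0.0.1 - - [12/Dec/2019:10:15:32] "GET /home HTTP/1.1" [200] 512'


def first_match(pattern, text, default=' '):
    """Return the matched text, or a placeholder when there is no match."""
    match = re.search(pattern, text)
    return match.group() if match else default


# Lazy match between the first '[' and the next ']': the timestamp.
timestamp = first_match(r'(?<=\[).+?(?=\])', SAMPLE)
# Bracketed run of digits only: the status code.
status = first_match(r'(?<=\[)\d+(?=\])', SAMPLE)
# No match: the placeholder keeps the column count stable.
missing = first_match(r'(?<=\{).+?(?=\})', SAMPLE)

print(timestamp, status, repr(missing))
```

Returning a placeholder instead of skipping the field matters here, because the next step addresses the fields by position.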

Batch DataFlow Job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):
    # One pattern per output column, in the order the columns are emitted.
    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
      # re.search returns None when nothing matches; append a placeholder
      # so that the number of columns stays the same.
      reg_match = re.search(pattern, data)
      if reg_match:
        result.append(reg_match.group())
      else:
        result.append(" ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

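        # Field order follows PATTERNS in regex_clean and matches the
        # streaming job: the bare number (index 3) is body_bytes_sent,
        # the bracketed number (index 4) is status.
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]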

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  # setLevel returns None, so there is nothing useful to assign.
  logging.getLogger().setLevel(logging.INFO)
  main()
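
The core of Split is plain Python and can be checked without running a pipeline. The sketch below repeats the date conversion and the dictionary building on one comma-separated record. The function name to_row, the shortened three-column layout and the sample values are assumptions made for illustration.

```python
from datetime import datetime


def to_row(element):
    """Turn 'addr,timestamp,request' into a one-element list of dicts."""
    parts = element.split(',')
    # Parse the log-style timestamp, then re-emit it as 'YYYY-MM-DD HH:MM:SS'.
    parsed = datetime.strptime(parts[1], '%d/%b/%Y:%H:%M:%S')
    return [{
        'remote_addr': parts[0],
        'timelocal': parsed.strftime('%Y-%m-%d %H:%M:%S'),
        'request_type': parts[2],
    }]


rows = to_row('10.0.0.1,12/Dec/2019:10:15:32,GET /home HTTP/1.1')
print(rows)
```

Returning a list matters: the process method of a DoFn is expected to return an iterable of output elements, so a single row is wrapped in a list.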

Streaming DataFlow Job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):
    # One pattern per output column, in the order the columns are emitted.
    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
      # re.search returns None when nothing matches; append a placeholder
      # so that the number of columns stays the same.
      reg_match = re.search(pattern, data)
      if reg_match:
        result.append(reg_match.group())
      else:
        result.append(" ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic", default=TOPIC)
   parser.add_argument("--output")
   # parse_known_args returns (known arguments, remaining arguments);
   # the remaining ones, such as --runner or --streaming, go to Beam.
   known_args, pipeline_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions(pipeline_args))

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   # Block until the streaming job is cancelled or fails.
   result.wait_until_finish()

if __name__ == '__main__':
  # setLevel returns None, so there is nothing useful to assign.
  logging.getLogger().setLevel(logging.INFO)
  main()
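
A note on the argument handling: parse_known_args returns a pair, the namespace of arguments the parser knows about and a list of everything else. The sketch below shows how script-specific flags can be separated from runner flags such as --streaming. The argument values are placeholders.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input_topic')

argv = ['--input_topic', 'projects/my-project/topics/my-topic',
        '--streaming', '--runner', 'DataflowRunner']

# Known flags land in the namespace; the rest is returned untouched.
known_args, pipeline_args = parser.parse_known_args(argv)

print(known_args.input_topic)
print(pipeline_args)
```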

Running the pipeline

We can run the pipeline in a few different ways. If we wanted to, we could simply run it locally from a terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it using DataFlow. We can do this with the command below, setting the following required parameters.

  • project: the ID of your GCP project.
  • runner: the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location: the Cloud Storage path where Cloud Dataflow stages the code packages needed by the workers executing the job.
  • temp_location: the Cloud Storage path where Cloud Dataflow stores the temporary job files created while the pipeline is running.
  • streaming

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp \
--staging_location gs://$BUCKET/staging \
--streaming
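
The same flags can also be assembled in Python, for example when the job is launched from another script. This is only a sketch: dataflow_flags is a hypothetical helper, the project and bucket names are placeholders, and it assumes the bucket is given as a bare name so that the gs:// prefix has to be added.

```python
def dataflow_flags(project, bucket):
    """Build the runner flags as a list of strings (hypothetical helper)."""
    return [
        '--runner', 'DataflowRunner',
        '--project', project,
        # Both locations must be Cloud Storage paths.
        '--temp_location', 'gs://{}/tmp'.format(bucket),
        '--staging_location', 'gs://{}/staging'.format(bucket),
        '--streaming',
    ]


flags = dataflow_flags('my-project', 'my-bucket')
print(' '.join(flags))
```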

While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. When we click on the pipeline, we should see something like Figure 4. For debugging purposes, it can be very helpful to go to Logs and then to Stackdriver to view the detailed logs. This has helped me resolve pipeline issues in a number of cases.

Figure 4: Beam pipeline

Accessing our data in BigQuery

So, we should now have a pipeline running, with data flowing into our table. To test this, we can go to BigQuery and look at the data. After running the command below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Figure 5: BigQuery

Conclusion

I hope this post serves as a useful example of creating a streaming data pipeline and of finding ways to make data more accessible. Storing data in this format gives us many advantages. We can now start answering important questions such as: how many people use our product? Is our user base growing over time? Which aspects of the product do people interact with the most? And are there errors occurring where there should not be? These are the questions that will be of interest to the organization. Based on the insights that come out of the answers, we can improve the product and increase user engagement.

Beam is really useful for this type of exercise, and it has a number of other interesting use cases as well. For example, you may want to analyze stock tick data in real time and make trades based on the analysis, or perhaps you have sensor data coming in from vehicles and want to calculate traffic-level statistics. You could also, for example, be a gaming company that collects user data and uses it to build dashboards for tracking key metrics. OK, folks, that is a topic for another post. Thanks for reading, and for those who want to see the full code, the link to my GitHub is below.

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part.

Source: will.com
