Building a Data Processing Pipeline. Part 2

Hi everyone. We are sharing a translation of the final part of the article, prepared especially for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for Real-Time Pipelines


Setting Up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the custom log data because I had problems running the pipeline under Python 3. Google Cloud Shell uses Python 2, which works better with Apache Beam.

To get the pipeline started, we need to dig into the settings a little. Those of you who have not used GCP before will need to follow the 6 steps described on this page.

After that, we need to upload our scripts to Google Cloud Storage and copy them into our Google Cloud Shell. Uploading to cloud storage is fairly trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy the files and install the required libraries are listed below.

# Copy the files from Cloud Storage into the current directory
gsutil cp gs://<YOUR-BUCKET>/* .
# Install the required libraries
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating Our Dataset and Table

Once all the setup steps are done, the next thing to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console and create a dataset first. You can follow the steps at the link below to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For simplicity, we will define all columns as strings except the timelocal variable, and name them after the variables we generated earlier. Our table schema should look like Figure 3.

Figure 3. Table schema
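If you prefer the command line to the console, the bq tool can also create the table from a JSON schema file. The sketch below (schema_to_json is a hypothetical helper, not part of the original project) converts the comma-separated schema string used later in the pipeline code into the list of name/type/mode objects that a BigQuery JSON schema file contains. Note that this string declares every column, including timelocal, as STRING; if you give timelocal a different type as described above, edit that entry first.

```python
import json

# Schema string copied from the pipeline code (every column is STRING there)
SCHEMA = ("remote_addr:STRING, timelocal:STRING, request_type:STRING, "
          "status:STRING, body_bytes_sent:STRING, http_referer:STRING, "
          "http_user_agent:STRING")


def schema_to_json(schema):
    """Turn 'name:TYPE, name:TYPE, ...' into BigQuery JSON schema fields."""
    fields = []
    for part in schema.split(","):
        name, field_type = part.strip().split(":")
        fields.append({"name": name, "type": field_type, "mode": "NULLABLE"})
    return fields


if __name__ == "__main__":
    # Print the schema file contents; redirect stdout to save them to a file
    print(json.dumps(schema_to_json(SCHEMA), indent=2))
```

Assuming the script is saved as make_schema.py (a hypothetical name), running `python make_schema.py > schema.json` and then `bq mk --dataset <YOUR-PROJECT>:userlogs` followed by `bq mk --table <YOUR-PROJECT>:userlogs.logdata schema.json` should give the same dataset and table as the console steps.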

Streaming the User Log Data

Pub/Sub is a vital component of our pipeline because it allows multiple independent applications to interact with each other. In particular, it acts as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
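Once created, the topic is addressed by its fully qualified name, projects/<project>/topics/<topic>. That is the string PublisherClient.topic_path(PROJECT_ID, TOPIC) builds in publish.py below, and it is the value hard-coded as TOPIC in the streaming job. The stdlib-only sketch below (make_topic_path and parse_topic_path are hypothetical helpers, not Pub/Sub client functions) reproduces that naming scheme so you can check the two scripts point at the same topic.

```python
import re

# Hypothetical helpers for the "projects/<project>/topics/<topic>" naming scheme
_TOPIC_PATH = re.compile(r"^projects/(?P<project>[^/]+)/topics/(?P<topic>[^/]+)$")


def make_topic_path(project_id, topic):
    """Build the fully qualified topic name from its two parts."""
    return "projects/{}/topics/{}".format(project_id, topic)


def parse_topic_path(path):
    """Split a fully qualified topic name back into (project, topic)."""
    match = _TOPIC_PATH.match(path)
    if match is None:
        raise ValueError("not a topic path: {!r}".format(path))
    return match.group("project"), match.group("topic")


print(make_topic_path("user-logs-237110", "userlogs"))
# projects/user-logs-237110/topics/userlogs
```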

The code below calls our log-generation script to produce the log data described above, then connects to Pub/Sub and sends the logs there. All we need to do is create a PublisherClient object, build the path to the topic with the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder; otherwise you will get an import error. We can then run this from our Google console with:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
# Fully qualified name: projects/<PROJECT_ID>/topics/<TOPIC>
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    # Pub/Sub message data must be bytes, so encode the log line first
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        # On success the result is the message ID assigned by Pub/Sub
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        # Pause for 1 or 2 seconds before sending the next line
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Once the file is running, we can see the log data being printed to the console, as shown in the figure below. The script will keep running until we stop it with CTRL+C.

Figure 4. Running publish_logs.py
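publish.py does not block on each message: publish() returns a future, and callback is attached with add_done_callback so it runs once Pub/Sub has answered. The sketch below shows the same future-plus-callback pattern with the standard library's concurrent.futures, so it runs without GCP. fake_publish is a hypothetical stand-in for publisher.publish, and its "msg-N" IDs are invented; only the result()/exception()/add_done_callback usage mirrors the script.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_publish(data):
    """Hypothetical stand-in for publisher.publish(): returns a fake message ID."""
    if not data:
        raise ValueError("empty message")
    return "msg-%d" % len(data)


def publish_all(lines):
    outcomes = []

    def callback(future):
        # Called once the future has completed, whether it succeeded or failed
        exc = future.exception()
        if exc is not None:
            outcomes.append(("error", str(exc)))
        else:
            outcomes.append(("ok", future.result()))

    with ThreadPoolExecutor(max_workers=2) as pool:
        for line in lines:
            future = pool.submit(fake_publish, line.encode("utf-8"))
            future.add_done_callback(callback)
    # Leaving the with-block waits for every submitted call and its callback
    return outcomes


if __name__ == "__main__":
    for outcome in sorted(publish_all(["first line", "", "third"])):
        print(outcome)
```

Callbacks may run on worker threads in any order, which is why the usage example sorts the outcomes before printing them.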

Writing Our Pipeline Code

Now that everything is set up, we can get to the fun part: coding our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have a pipeline object, we can apply several functions one after another using the pipe (|) operator. In general, the workflow looks like this:

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
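This syntax relies on Python operator overloading: `|` applies a transform to a collection, and `'label' >> transform` attaches a step name. The toy classes below (ToyPCollection and ToyMap are hypothetical, not Beam classes) loosely mimic that mechanism in plain Python to show how a chain like the one above is evaluated. Real Beam objects build a deferred graph that a runner executes later, whereas this toy computes each step immediately.

```python
class ToyPCollection:
    """Eager stand-in for a PCollection: holds elements and applied step labels."""

    def __init__(self, elements, steps=()):
        self.elements = list(elements)
        self.steps = tuple(steps)

    def __or__(self, transform):
        # `collection | transform` hands the collection to the transform
        return transform.apply(self)


class ToyMap:
    """Eager stand-in for beam.Map: applies fn to every element."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label or fn.__name__

    def __rrshift__(self, label):
        # `'label' >> transform`: str defines no >> for ToyMap, so Python calls this
        return ToyMap(self.fn, label)

    def apply(self, pcoll):
        return ToyPCollection((self.fn(x) for x in pcoll.elements),
                              pcoll.steps + (self.label,))


lines = ToyPCollection(["1.2.3.4 GET", "5.6.7.8 POST"])
result = (lines
          | "Upper" >> ToyMap(str.upper)
          | "Split" >> ToyMap(str.split))
print(result.steps)     # ('Upper', 'Split')
print(result.elements)  # [['1.2.3.4', 'GET'], ['5.6.7.8', 'POST']]
```

Because `>>` binds more tightly than `|`, each label is attached to its transform before the transform is applied, just as in the Beam code below.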

In our code we will write two custom functions. The first, regex_clean, scans a log line and, for each pattern in the PATTERNS list, uses re.search to extract the matching string. The function returns the matches as one comma-separated string. If you are not an expert in regular expressions, I recommend working through this tutorial and experimenting in a notebook to test the code.

Next we define a custom ParDo function called Split. ParDo is Beam's transform for parallel processing, and in Python it is used in a particular way: we create a class that inherits from Beam's DoFn class. Split takes the parsed string from the previous function and returns a list of dictionaries whose keys match the column names in our BigQuery table. One thing to note about this function: I had to import datetime inside the function to make it work. Importing it at the top of the file gave me an import error, which was strange. (The most likely cause is that Dataflow workers do not see the main module's global imports unless the save_main_session pipeline option is set.)

This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the batch DataFlow job and the streaming DataFlow job is given below. The main difference between the two is the source: the batch job reads the log file from src_path with Beam's ReadFromText function, while the streaming job reads messages from Pub/Sub with ReadFromPubSub and decodes them before cleaning.

DataFlow Job (Batch Mode)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import re
import logging

PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):
    # One pattern per output field, in column order
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
        match = re.search(pattern, data)
        if match:
            result.append(match.group())
        else:
            # Keep a placeholder so every line yields the same number of fields
            logging.warning("There was an error with the regex search: %s", pattern)
            result.append(" ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    return ','.join(result)


class Split(beam.DoFn):

    def process(self, element):
        # Imported inside process(); a module-level import failed (see above)
        from datetime import datetime
        element = element.split(",")
        # e.g. '30/Apr/2019:21:11:42' -> '2019-04-30 21:11:42'
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main():

    p = beam.Pipeline(options=PipelineOptions())

    (p
       | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
       | "clean address" >> beam.Map(regex_clean)
       | 'ParseCSV' >> beam.ParDo(Split())
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
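Together, regex_clean and Split turn one raw log line into a dictionary keyed by column name. For comparison, here is a different technique (not the author's): a single regular expression with named groups that parses the whole line in one pass, so a line either yields all seven fields or is rejected as a unit. The log layout it expects (a combined-log-style line with the timestamp in brackets in the `%d/%b/%Y:%H:%M:%S` form that Split parses) and the sample line in the comment are assumptions; parse_log_line is a hypothetical name.

```python
import re
from datetime import datetime

# Assumed layout, e.g.
# 1.2.3.4 - - [30/Apr/2019:21:11:42] "GET /index.html HTTP/1.0" 200 512 "http://example.com" "Mozilla/5.0"
LOG_RE = re.compile(
    r'(?P<remote_addr>\S+) \S+ \S+ '
    r'\[(?P<timelocal>[^\]]+)\] '
    r'"(?P<request_type>[^"]*)" '
    r'(?P<status>\d{3}) (?P<body_bytes_sent>\d+|-) '
    r'"(?P<http_referer>[^"]*)" "(?P<http_user_agent>[^"]*)"'
)


def parse_log_line(line):
    """Return a dict keyed by the BigQuery column names, or None if the line does not match."""
    match = LOG_RE.match(line)
    if match is None:
        return None
    row = match.groupdict()
    # Same conversion as Split: '30/Apr/2019:21:11:42' -> '2019-04-30 21:11:42'
    ts = datetime.strptime(row["timelocal"], "%d/%b/%Y:%H:%M:%S")
    row["timelocal"] = ts.strftime("%Y-%m-%d %H:%M:%S")
    return row
```

Inside a Beam pipeline, a function like this could replace the regex_clean and Split pair in a single beam.Map step, with a beam.Filter dropping the None results.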

Streaming DataFlow Job

from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam
import logging
import argparse
import re


PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):
    # One pattern per output field, in column order
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
        match = re.search(pattern, data)
        if match:
            result.append(match.group())
        else:
            # Keep a placeholder so every line yields the same number of fields
            logging.warning("There was an error with the regex search: %s", pattern)
            result.append(" ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    return ','.join(result)


class Split(beam.DoFn):

    def process(self, element):
        # Imported inside process(); a module-level import failed (see above)
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        # Same field order as the batch job (regex_clean is identical)
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    # Keep our own flags; pass everything else (--runner, --streaming, ...) to Beam
    known_args, pipeline_args = parser.parse_known_args(argv)
    topic = known_args.input_topic or TOPIC

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (p
       | 'ReadData' >> beam.io.ReadFromPubSub(topic=topic).with_output_types(bytes)
       | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
       | "Clean Data" >> beam.Map(regex_clean)
       | 'ParseCSV' >> beam.ParDo(Split())
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()

Running the Pipeline

We can run the pipeline in several ways. If we wanted to, we could run it locally from a terminal while logged in to GCP remotely:

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it on DataFlow. We can do this with the command below, setting the following required parameters:

  • project: the ID of your GCP project.
  • runner: the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location: the Cloud Storage path where Cloud Dataflow stages the code packages needed by the workers executing the job.
  • temp_location: the Cloud Storage path where Cloud Dataflow stores temporary job files created while the pipeline runs.
  • streaming: runs the pipeline in streaming mode.

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp \
--staging_location gs://$BUCKET/staging \
--streaming
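The streaming job's main() uses argparse's parse_known_args, which returns the script's own flags plus a list of everything it did not recognise; that leftover list is what should be handed to Beam's PipelineOptions. The stdlib-only sketch below (split_args is a hypothetical helper name, and the flag values are placeholders) shows what that split produces for a command like the one above.

```python
import argparse


def split_args(argv):
    """Separate the job's own flags from the flags meant for Beam's PipelineOptions."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


known, rest = split_args(["--input_topic", "projects/p/topics/t",
                          "--runner", "DataflowRunner", "--streaming"])
print(known.input_topic)  # projects/p/topics/t
print(rest)               # ['--runner', 'DataflowRunner', '--streaming']
```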

While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. Clicking on the pipeline, we should see something similar to Figure 4. For debugging, it can be very helpful to go to Logs and then to Stackdriver to view detailed logs. This has helped me resolve pipeline issues on a number of occasions.

Figure 4: Beam pipeline

Accessing Our Data in BigQuery

So we should now have a running pipeline with data flowing into our table. To check this, we can go to BigQuery and look at the data. After running the query below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can run further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
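Once rows are arriving, business questions become aggregations over the table; in BigQuery SQL, for instance, daily users could be counted with COUNT(DISTINCT remote_addr) grouped by day. The plain-Python sketch below (daily_unique_visitors is a hypothetical helper) performs the same computation over rows shaped like the dictionaries Split emits, which can be handy for checking expectations on a small sample. Treating each remote_addr as one user is an approximation.

```python
from collections import defaultdict


def daily_unique_visitors(rows):
    """Count distinct remote_addr values per day.

    rows: iterable of dicts like Split's output, where 'timelocal'
    is a 'YYYY-MM-DD HH:MM:SS' string.
    """
    visitors = defaultdict(set)
    for row in rows:
        day = row["timelocal"][:10]  # the 'YYYY-MM-DD' prefix
        visitors[day].add(row["remote_addr"])
    # Sorted by day so the growth over time is easy to read
    return {day: len(addrs) for day, addrs in sorted(visitors.items())}
```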

Figure 5: BigQuery

Conclusion

We hope this post serves as a useful example of building a streaming data pipeline and of finding ways to make data more accessible. Storing data in this format gives us many advantages. We can now start answering important questions: How many people use our product? Is the user base growing over time? Which parts of the product do people interact with most? Are there errors where there shouldn't be? These are the questions an organization will care about. Based on the insights from the answers, we can improve the product and increase user engagement.

Beam is very useful for this kind of exercise and has many other interesting use cases. For example, you might want to analyze data in real time and make trades based on that analysis, or you might have sensor data coming from vehicles and want to estimate traffic levels. You might also be a gaming company that collects user data and uses it to build dashboards tracking key metrics. But that is a topic for another post. Thanks for reading, and for those who want to see the full code, here is the link to my GitHub:

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part.

Source: www.habr.com
