Building a streaming data pipeline. Part 2

Hello everyone. We are sharing a translation of the final part of this article, prepared especially for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for Real-Time Pipelines

Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the custom log data, because I had trouble running the pipeline in Python 3. Google Cloud Shell uses Python 2, which plays better with Apache Beam.

To start the pipeline, we need to dig into the setup a little. For those of you who have never used GCP before, you will need to follow the 6 steps outlined on this page.

Next, we need to upload our files to Google Cloud Storage and copy them into our Google Cloud Shell. Uploading to cloud storage is trivial (instructions can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in figure 2 below.

Figure 2

The commands for copying our files and installing the necessary libraries are listed below.

# Copy files from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our database and table

Once we have completed all the setup steps, the next thing we need to do is create a dataset and table in BigQuery. There are several ways to do this, but the easiest is to use the Google Cloud console and first create a dataset. You can follow the steps at the following link to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log line. For simplicity, we will define all of the columns as strings, except for the timelocal variable, and name them according to the variables we generated earlier. Our table layout should look like figure 3.

Figure 3. Table layout
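The same 7-column, all-string layout reappears later as the schema string passed to WriteToBigQuery. As a quick cross-check, here is a small sketch (the parse_schema helper is ours, purely for illustration) of how that string maps to (column, type) pairs:

```python
# The schema string used later in the pipeline code (all columns STRING).
SCHEMA = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')

def parse_schema(schema_str):
    """Split a 'name:TYPE, name:TYPE, ...' string into (name, type) pairs."""
    fields = []
    for part in schema_str.split(','):
        name, ftype = part.strip().split(':')
        fields.append((name, ftype))
    return fields

for name, ftype in parse_schema(SCHEMA):
    print(name, ftype)
```

If the printed pairs match the table you built in the console, the pipeline's writes will line up with your table.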

Publishing the user log data

Pub/Sub is a vital part of our pipeline because it allows multiple independent applications to communicate with each other. Specifically, it acts as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
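If you prefer the command line, the same topic can be created with the gcloud CLI (assuming it is installed and authenticated; the topic name is assumed to match the TOPIC used by publish.py below):

```shell
# Create the Pub/Sub topic the publisher script will send logs to
gcloud pubsub topics create userlogs

# Confirm the topic exists
gcloud pubsub topics list
```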

The code below calls our script to generate the log data defined above, then connects to Pub/Sub and sends the logs. The only things we need to do are create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can run this from our Google console using:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

While this is running, we will be able to see the log output in the console, as shown in the figure below. The script keeps running until we stop it with CTRL + C.

Figure 4. Output of publish_logs.py

Writing our pipeline code

Now that we have everything set up, we can get to the fun part: coding our pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple transforms one after another using the pipe operator (|). In general, the workflow looks like the picture below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
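To make the role of `|` concrete, here is a toy illustration (plain Python, not Beam itself) of how overloading the pipe operator lets transforms chain left to right, which is the same idiom Beam's Pipeline and PCollection classes use:

```python
# Toy PCollection-like class: `|` applies a transform and returns a new
# collection, so transforms chain left to right just like in Beam.
class PColl:
    def __init__(self, items):
        self.items = list(items)

    def __or__(self, fn):
        return PColl(fn(self.items))

result = (PColl([" 200 ", " 404 "])
          | (lambda xs: [x.strip() for x in xs])      # First Transform
          | (lambda xs: [int(x) for x in xs])         # Second Transform
          | (lambda xs: [x for x in xs if x < 400]))  # Third Transform
```

Each stage receives the previous stage's output, which is exactly how the `[Initial Input PCollection] | [First Transform] | ...` pattern above reads.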

In our code, we will create two custom functions. The regex_clean function scans the data and retrieves the matching line segments according to the PATTERNS list using re.search. The function returns a comma-separated string. If you are not a regular-expression expert, I recommend checking out this tutorial and practicing in a notepad to test the code.

After that, we define a custom ParDo function called Split, which is a variant of a Beam transform for parallel processing. In Python, this is done in a specific way: we have to create a class that inherits from Beam's DoFn class. The Split function takes the parsed line from the previous function and returns a list of dictionaries whose keys match the column names in our BigQuery table. One thing to note about this function: I had to import datetime inside the function for it to work. I was getting an import error at the top of the file, which was weird. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table.

The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and streaming code is that in the batch version we read the CSV from src_path using Beam's ReadFromText function.
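Stripped of the Beam machinery, the core of Split.process can be sketched standalone (the sample record below is made up purely for illustration; real records come from regex_clean):

```python
from datetime import datetime

def split_record(line):
    """Turn one comma-joined log record into a dict keyed by column name."""
    element = line.split(",")
    # Parse the raw log timestamp and reformat it for BigQuery.
    d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
    return {
        'remote_addr': element[0],
        'timelocal': d.strftime("%Y-%m-%d %H:%M:%S"),
        'request_type': element[2],
        'status': element[3],
        'body_bytes_sent': element[4],
        'http_referer': element[5],
        'http_user_agent': element[6],
    }

# Made-up sample record in the comma-joined form regex_clean emits.
row = split_record("127.0.0.1,26/Apr/2019:12:00:01,GET /index.html HTTP/1.1,"
                   "200,512,-,Mozilla/5.0")
```

The keys of the returned dict are exactly the column names in our BigQuery table, which is what lets WriteToBigQuery append the rows without any extra mapping.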

Batch DataFlow Job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Starting the pipeline

We can run the pipeline in several different ways. If we wanted to, we could simply run it locally from the terminal while logged into GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we will run it using DataFlow. We can do this with the command below, setting the following required parameters.

  • project - the ID of your GCP project.
  • runner - the pipeline runner that will analyze your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - the path to Cloud Dataflow storage for staging the code packages needed by the workers executing the job.
  • temp_location - the path to Cloud Dataflow storage for temporary job files created while the pipeline runs.
  • streaming - run the pipeline in streaming mode.

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

While this command is running, we can go to the DataFlow tab in the google console and view our pipeline. When we click on the pipeline, we should see something like figure 4. For debugging purposes, it can be quite helpful to go to Logs and then to Stackdriver to view detailed logs. This has helped me sort out pipeline issues on a number of occasions.

Figure 4: Beam pipeline

Accessing our data in BigQuery

So, we should now have a pipeline with data flowing into our table. To test this, we can go to BigQuery and look at the data. After running the command below, you should see the first few rows of the dataset. Now that we have the data stored in BigQuery, we can carry out further analysis, as well as share the data with colleagues and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Figure 5: BigQuery

Conclusion

Hopefully this post serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this form gives us many advantages. We can now start answering important questions such as: how many people use our product? Is the user base growing over time? What aspects of the product do people interact with most? And are there errors where there should not be? These are questions an organization will find interesting. Based on the insights that emerge from the answers, we can improve the product and increase user engagement.

Beam is genuinely useful for this kind of exercise, and there are a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on the analysis, or you might have sensor data coming in from vehicles and want to compute traffic-level metrics. You could also, say, be a gaming company that collects user data and uses it to build dashboards tracking key metrics. Okay, folks, that is a topic for another post. Thanks for reading, and for those who want to see the full code, below is the link to my GitHub.

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part as well.

Source: www.hab.com
