Building a streaming data processing pipeline. Part 2

Hi everyone. We are sharing a translation of the final part of the article, prepared specifically for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for real-time pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the custom log data because I was having problems running the pipeline in Python 3. Google Cloud Shell uses Python 2, which gets along better with Apache Beam.

To get the pipeline started, we need to dig into the setup a little. For those of you who have never used GCP before, you will need to follow the 6 steps outlined on this page.

After that, we need to upload our scripts to Google Cloud Storage and copy them to our Google Cloud Shell. Uploading to cloud storage is trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy the files and install the required libraries are listed below.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
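# Install the required libraries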
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our dataset and table

Once we have completed all the setup steps, the next thing we need to do is create a dataset and table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console and first create a dataset. You can follow the steps at the link to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we will define all the columns as strings, apart from the timelocal variable, and name them after the variables we generated earlier. Our table schema should look like Figure 3.

Figure 3. Table schema
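
If you prefer to do this step from code rather than clicking through the console, the sketch below creates the same dataset and table with the BigQuery Python client (which ships with apache-beam[gcp]). Treat it as an illustration only: it assumes the userlogs dataset and logdata table names that the pipeline uses later, and exact client method signatures can differ slightly between library versions.

from google.cloud import bigquery

# Sketch only: create the userlogs dataset and the logdata table from Python
# instead of the console. Project/dataset/table names match the ones used below.
client = bigquery.Client(project='user-logs-237110')

client.create_dataset('userlogs')  # raises Conflict if the dataset already exists

# Seven columns, one per user log field; all STRING for simplicity.
schema = [
    bigquery.SchemaField(name, 'STRING')
    for name in ['remote_addr', 'timelocal', 'request_type', 'status',
                 'body_bytes_sent', 'http_referer', 'http_user_agent']
]

table = bigquery.Table('user-logs-237110.userlogs.logdata', schema=schema)
client.create_table(table)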

Publishing the user log data

Pub/Sub is a critical component of our pipeline because it allows multiple independent applications to interact with one another. Specifically, it acts as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
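
If you would rather create the topic from code than through the console, a minimal sketch with the google-cloud-pubsub client is shown below. It reuses the project and topic names from publish.py; the exact create_topic signature varies a little between library versions, so take this as an illustration rather than the canonical way.

from google.cloud import pubsub_v1

# Sketch only: create the Pub/Sub topic programmatically.
# Same project and topic names as the publish.py script below.
PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

# Creates projects/user-logs-237110/topics/userlogs;
# raises AlreadyExists if the topic has already been created.
publisher.create_topic(name=topic_path)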

The code below calls our script to generate the log data defined above, then connects to Pub/Sub and sends the logs there. The only thing we need to do is create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this from the google console using:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    # Encode the log line and publish it to the topic path passed in
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Once the file is running, we can watch the log data being printed to the console, as shown in the figure below. This script will keep running as long as we don't use CTRL+C to terminate it.

Figure 4. Output of publish_logs.py

Coding our pipeline

Now that we have everything prepared, we can get to the fun part: coding our pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple transforms one after another using the pipe operator (|). In general, the workflow looks like the snippet below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
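
As a tiny, self-contained illustration of this pattern (a toy example, not part of the project code), the snippet below builds a small pipeline out of three chained transforms and writes the result to a text file; the real pipelines later in the post follow exactly the same shape.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Toy example of the [input] | [transform] | [transform] | ... pattern.
with beam.Pipeline(options=PipelineOptions()) as p:
    (p
       | 'Create' >> beam.Create(['1,alice', '2,bob'])          # initial input PCollection
       | 'SplitLine' >> beam.Map(lambda line: line.split(','))  # first transform
       | 'KeepName' >> beam.Map(lambda fields: fields[1])       # second transform
       | 'Write' >> beam.io.WriteToText('toy_output'))          # third transform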

In our code we will create two custom functions. The regex_clean function scans the data and extracts the matching line for each pattern in the PATTERNS list using re.search; it returns a comma-separated string. If you are not a regular-expression expert, I recommend having a look at this tutorial and experimenting in a notepad to check the code. After that, we define a custom ParDo function called Split, which is a variant of the Beam transform for parallel processing. In Python this is done in a special way: we have to create a class that inherits from the DoFn class. The Split function takes the parsed line from the previous function and returns a list of dictionaries whose keys match the column names of our BigQuery table. There is one thing to note about this function: I had to import datetime inside the function to make it work; I was getting an import error at the top of the file, which was strange. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and the streaming code is that in the batch version we read the CSV from src_path using the ReadFromText function from Beam.

Batch DataFlow Job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()

Launching the pipeline

We can launch the pipeline in several ways. If we wanted to, we could simply run it locally from a terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it with DataFlow. We can do this using the command below, setting the following required parameters.

  • project - the ID of your GCP project.
  • runner - the pipeline runner that will analyze your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - the path to Dataflow cloud storage for staging the code packages needed by the workers performing the job.
  • temp_location - the path to Dataflow cloud storage for storing temporary job files created while the pipeline is running.
  • streaming

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

While this command is running, we can go to the DataFlow tab in the google console and view our pipeline. When we click on the pipeline, we should see something similar to Figure 4. For debugging purposes, it can be very helpful to go to Logs and then to Stackdriver to view the detailed logs. This has helped me resolve pipeline issues in a number of cases.

Figure 4: Beam pipeline

Accessing our data in BigQuery

So, we should now have a pipeline running with data flowing into our table. To test this, we can go to BigQuery and look at the data. After running the command below, you should see the first few rows of the dataset. Now that we have the data stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Figure 5: BigQuery
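
If you prefer to pull these rows into Python (for example into a notebook for the further analysis mentioned above), a minimal sketch with the google-cloud-bigquery client, installed alongside apache-beam[gcp], could look like the following; it assumes the same project, dataset, and table names used throughout the post.

from google.cloud import bigquery

# Sketch only: run the same query from Python instead of the BigQuery console.
client = bigquery.Client(project='user-logs-237110')

query = """
    SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10
"""

for row in client.query(query).result():
    print(row['remote_addr'], row['timelocal'], row['status'])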

Conclusion

We hope this post serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this format gives us many advantages. Now we can start answering important questions, such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with the most? And are there errors where there shouldn't be any? These are the questions the organization will care about. Based on the insights that emerge from the answers, we can improve the product and increase user engagement.

Beam is really useful for this kind of exercise, and it has a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on that analysis; perhaps you have sensor data coming from vehicles and want to compute traffic-level aggregates. You could also, for instance, be a gaming company that collects user data and uses it to build dashboards for tracking key metrics. Okay folks, that's a topic for another post. Thanks for reading, and for those who want to see the full code, below is a link to my GitHub.

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part here.

Source: www.habr.com
