Creating a streaming data processing pipeline. Part 2

Hi everyone. We are sharing the translation of the final part of the article, prepared especially for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for real-time pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the custom log data because I had problems running the pipeline in Python 3. Google Cloud Shell uses Python 2, which is more consistent with Apache Beam.

To start the pipeline, we need to dig into the settings a little. For those of you who have not used GCP before, you need to follow the 6 steps described on this page.

After this, we need to upload our scripts to Google Cloud Storage and copy them to our Google Cloud Shell. Uploading to cloud storage is trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2. Commands

The commands we need to copy the files and install the required libraries are listed below.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our dataset and table

Once we have completed the setup steps, the next thing we need to do is create a dataset and table in BigQuery. There are several ways to do this, but the easiest is to use the Google Cloud console, first creating a dataset. You can follow the steps at the link below to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we define all columns as strings except for the timelocal variable, and name them after the variables we generated earlier. Our table schema should look like Figure 3.

Figure 3. Table schema
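
If you prefer scripting to clicking, the dataset and table can also be created with the google-cloud-bigquery client library. Below is a minimal sketch, assuming the same project, dataset, and table names used later in the pipeline; note that the exact client calls vary slightly between library versions (newer releases accept plain string IDs, older ones require Dataset and TableReference objects).

from google.cloud import bigquery

client = bigquery.Client(project="user-logs-237110")

# Create the dataset that will hold the log table.
client.create_dataset("userlogs")

# All seven columns are STRING, matching the schema string used in the
# pipeline code later in this post.
columns = ["remote_addr", "timelocal", "request_type", "status",
           "body_bytes_sent", "http_referer", "http_user_agent"]
schema = [bigquery.SchemaField(name, "STRING") for name in columns]

table = bigquery.Table("user-logs-237110.userlogs.logdata", schema=schema)
client.create_table(table)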

Publishing user log data

Pub/Sub is a vital component of our pipeline because it allows multiple independent applications to communicate with each other. In particular, it works as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
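
As an aside, the topic can also be created programmatically with the same PublisherClient we use below; a minimal sketch (in newer google-cloud-pubsub releases create_topic takes the topic name as a keyword argument instead of positionally):

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("user-logs-237110", "userlogs")

# Creates projects/user-logs-237110/topics/userlogs.
publisher.create_topic(topic_path)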

The code below calls our script, which generates the log data described above, and then connects to and sends the logs to Pub/Sub. The only things we need to do are create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Please note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this in our google console using:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    # Encode the log line and publish it to the given topic.
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

As soon as the file runs, we can observe the log data being printed to the console, as shown in the figure below. This script will keep running until we use CTRL+C to stop it.

Figure 4. Output of publish_logs.py
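
If you want to double-check that the messages are actually arriving in Pub/Sub, a throwaway subscription can pull them back. Below is a minimal sketch, assuming a test subscription called verify-sub that you create just for this check; client signatures differ slightly between google-cloud-pubsub releases.

import time
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path("user-logs-237110", "userlogs")
sub_path = subscriber.subscription_path("user-logs-237110", "verify-sub")

# Attach a temporary subscription to the topic.
subscriber.create_subscription(sub_path, topic_path)

def callback(message):
    # Print the raw log line and acknowledge it.
    print(message.data.decode('utf-8'))
    message.ack()

future = subscriber.subscribe(sub_path, callback=callback)
time.sleep(30)   # listen for half a minute, then stop
future.cancel()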

Writing our pipeline code

Now that we have everything set up, we can get to the fun part: coding our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple functions one after another using the pipe operator (|). In general, the workflow looks like the picture below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
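
As a toy illustration of this pattern, the following minimal sketch chains a couple of transforms on an in-memory PCollection and runs locally on the DirectRunner; the data and labels are made up for the example.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def log_element(element):
    # Print each element as it flows through the pipeline.
    print(element)
    return element

p = beam.Pipeline(options=PipelineOptions())

(p
   | 'Create' >> beam.Create(['a,1', 'b,2', 'c,3'])            # initial input PCollection
   | 'SplitOnComma' >> beam.Map(lambda line: line.split(','))  # first transform
   | 'Print' >> beam.Map(log_element)                          # second transform
)

p.run()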

In our code, we create two custom functions. The regex_clean function scans the data and extracts the field matching each pattern in the PATTERNS list using the re.search function, returning a comma-separated string. If you are not a regular-expression expert, I recommend looking at this tutorial and practicing in a notepad to check the code. After that, we define a custom ParDo function called Split, which is a form of Beam transform for parallel processing. In Python this is done in a special way: we have to create a class that inherits from the DoFn Beam class. The Split function takes the parsed string from the previous function and returns a list of dictionaries with keys corresponding to the column names in our BigQuery table. There is something to note about this function: I had to import datetime inside the function for it to work; I was getting an import error at the top of the file, which was odd. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and the streaming code is that in the batch version we read the CSV from src_path using the ReadFromText function from Beam.
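
To see how re.search drives the extraction, here is the second pattern from the PATTERNS list applied to a bracketed timestamp; the sample string is made up for illustration.

import re

# Lookbehind/lookahead pattern from PATTERNS: grab the text between [ and ].
pattern = r'(?<=\[).+?(?=\])'
sample = '[14/Jun/2019:12:45:31]'

print(re.search(pattern, sample).group())  # -> 14/Jun/2019:12:45:31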

Batch DataFlow Job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()

Launching the pipeline

We can run the pipeline in several different ways. If we wanted to, we could simply run it locally from a terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it using DataFlow. We can do this with the command below, setting the following required parameters.

  • project — the ID of your GCP project.
  • runner — the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location — the path to Cloud Dataflow cloud storage where the code packages needed by the workers performing the job are staged.
  • temp_location — the path to Cloud Dataflow cloud storage for storing temporary job files created while the pipeline runs.
  • streaming — the flag that tells Dataflow to run the pipeline in streaming mode.

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

While this command is running, we can go to the DataFlow tab in the google console and look at our pipeline. When we click on the pipeline, we should see something like Figure 4. For debugging purposes, it can be very helpful to go to Logs and then to Stackdriver to view the detailed logs. This has helped me sort out pipeline problems in a number of cases.

Figure 4. The pipeline

Accessing our data in BigQuery

So, we now have a pipeline running with data flowing into our table. To test this, we can go to BigQuery and look at the data. After running the command below, you should see the first few rows of the dataset. Now that we have the data stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
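
The same check can be run from Python with the BigQuery client library; a minimal sketch:

from google.cloud import bigquery

client = bigquery.Client(project="user-logs-237110")

query = """
    SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10
"""

# Iterating over the query job waits for and streams back the results.
for row in client.query(query):
    print(dict(row))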

Figure 5. BigQuery

Conclusion

We hope this post serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this format gives us many advantages. We can now start answering important questions such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with most? And do errors occur where there shouldn't be any? These are the questions that will be of interest to the organization. Based on the insights derived from the answers to these questions, we can improve the product and increase user engagement.

Beam is really useful for this kind of exercise, and there are a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on that analysis; perhaps you have sensor data coming in from vehicles and want to calculate traffic-level metrics. You could also, for example, be a gaming company collecting user data and using it to build dashboards that track key metrics. Okay, folks, that's a topic for another post. Thanks for reading, and for those who want to see the full code, below is a link to my GitHub.

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part.

Source: www.habr.com
