Nou kreye yon tiyo pou pwosesis done difizyon. Pati 2

Bonjou tout moun. N ap pataje tradiksyon final la nan atik la, ki prepare espesyalman pou elèv ki nan kou a. Enjenyè Done. Ou ka li premye pati a isit la.

Apache Beam ak DataFlow pou tiyo an tan reyèl

Nou kreye yon tiyo pou pwosesis done difizyon. Pati 2

Mete kanpe Google Cloud

Remak: Mwen te itilize Google Cloud Shell pou kouri tiyo a epi pibliye done log koutim paske mwen te gen pwoblèm pou kouri tiyo a nan Python 3. Google Cloud Shell itilize Python 2, ki pi konsistan avèk Apache Beam.

Pou kòmanse tiyo a, nou bezwen fouye yon ti kras nan anviwònman yo. Pou moun nan nou ki pa te itilize GCP anvan, w ap bezwen swiv 6 etap sa yo ki endike nan sa a. paj.

Apre sa, nou pral bezwen telechaje scripts nou yo nan Google Cloud Storage epi kopye yo nan Google Cloud Shel nou an. Téléchargement nan depo nwaj se byen trivial (yo ka jwenn yon deskripsyon isit la). Pou kopye fichye nou yo, nou ka louvri Google Cloud Shel nan ba zouti a lè nou klike sou premye ikòn sou bò gòch la nan Figi 2 ki anba a.

Nou kreye yon tiyo pou pwosesis done difizyon. Pati 2
KI MOUN KI Figi

Kòmandman nou bezwen pou kopye fichye yo epi enstale bibliyotèk obligatwa yo ki nan lis anba a.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Kreye baz done nou an ak tab la

Yon fwa nou fin ranpli tout etap ki gen rapò ak konfigirasyon yo, pwochen bagay nou bezwen fè se kreye yon seri done ak tab nan BigQuery. Gen plizyè fason pou fè sa, men pi senp lan se sèvi ak konsole Google Cloud pa premye kreye yon seri done. Ou ka swiv etap ki anba yo lyenpou kreye yon tab ak yon chema. Tab nou an ap genyen 7 kolòn, ki koresponn ak eleman yo nan chak boutèy demi lit itilizatè. Pou konvenyans, nou pral defini tout kolòn kòm fisèl, eksepte pou varyab timelocal, epi non yo dapre varyab nou te pwodwi pi bonè. Layout tab nou an ta dwe sanble nan Figi 3.

Nou kreye yon tiyo pou pwosesis done difizyon. Pati 2
Figi 3. Layout tab la

Pibliye done boutèy demi lit itilizatè yo

Pub/Sub se yon eleman enpòtan nan tiyo nou an paske li pèmèt plizyè aplikasyon endepandan kominike youn ak lòt. An patikilye, li travay kòm yon entèmedyè ki pèmèt nou voye ak resevwa mesaj ant aplikasyon yo. Premye bagay nou bezwen fè se kreye yon sijè. Senpleman ale nan Pub/Sub nan konsole a epi klike sou CREATE TOPIC.

Kòd ki anba a rele script nou an pou jenere done boutèy demi lit yo defini pi wo a epi answit konekte epi voye mòso bwa yo nan Pub/Sub. Sèl bagay nou bezwen fè se kreye yon objè PublisherClient, presize chemen an nan sijè a lè l sèvi avèk metòd la topic_path epi rele fonksyon an publish с topic_path ak done. Tanpri sonje ke nou enpòte generate_log_line soti nan script nou an stream_logs, Se konsa, asire w ke dosye sa yo yo nan katab la menm, otreman ou pral jwenn yon erè enpòte. Lè sa a, nou ka kouri sa a atravè konsole Google nou an lè l sèvi avèk:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Le pli vit ke dosye a kouri, nou pral kapab wè pwodiksyon an nan done boutèy la nan konsole a, jan yo montre nan figi ki anba a. Script sa a pral travay osi lontan ke nou pa itilize Ctrl + Cpou konplete li.

Nou kreye yon tiyo pou pwosesis done difizyon. Pati 2
Figi 4. Sòti publish_logs.py

Ekri kòd tiyo nou an

Kounye a ke nou gen tout bagay prepare, nou ka kòmanse pati nan plezi - kode tiyo nou an lè l sèvi avèk Beam ak Python. Pou kreye yon tiyo Beam, nou bezwen kreye yon objè tiyo (p). Yon fwa nou te kreye yon objè tiyo, nou ka aplike plizyè fonksyon youn apre lòt lè l sèvi avèk operatè a pipe (|). An jeneral, workflow la sanble ak imaj ki anba a.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Nan kòd nou an, nou pral kreye de fonksyon koutim. Fonksyon regex_clean, ki analize done yo epi rekipere ranje ki koresponn lan ki baze sou lis MODEL yo lè l sèvi avèk fonksyon an re.search. Fonksyon an retounen yon kòd ki separe vigil. Si ou pa yon ekspè nan ekspresyon regilye, mwen rekòmande tcheke sa a leson patikilye epi pratike nan yon notepad pou tcheke kòd la. Apre sa nou defini yon fonksyon ParDo koutim rele Split, ki se yon varyasyon Beam transfòme pou pwosesis paralèl. Nan Python, sa fèt nan yon fason espesyal - nou dwe kreye yon klas ki eritye nan klas DoFn Beam la. Fonksyon Split la pran ranje analiz la nan fonksyon anvan an epi li retounen yon lis diksyonè ak kle ki koresponn ak non kolòn yo nan tablo BigQuery nou an. Gen yon bagay pou sonje sou fonksyon sa a: mwen te oblije enpòte datetime andedan yon fonksyon pou fè li travay. Mwen te resevwa yon erè enpòte nan kòmansman an nan dosye a, ki te etranj. Lè sa a, lis sa a pase nan fonksyon an WriteToBigQuery, ki tou senpleman ajoute done nou yo sou tab la. Kòd pou travay pakèt DataFlow ak travay Streaming DataFlow anba a. Sèl diferans ki genyen ant pakèt ak kòd difizyon se ke nan pakèt nou li CSV la src_pathlè l sèvi avèk fonksyon an ReadFromText soti nan gwo bout bwa.

Travay pakèt DataFlow (pwosesis pakèt)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (pwosesis kouran)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Kòmanse CONVEYOR la

Nou ka kouri tiyo a nan plizyè fason diferan. Si nou te vle, nou ta ka jis kouri li lokalman nan yon tèminal pandan y ap konekte nan GCP adistans.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Sepandan, nou pral kouri li lè l sèvi avèk DataFlow. Nou ka fè sa lè l sèvi avèk kòmandman ki anba a pa mete paramèt obligatwa sa yo.

  • project — ID pwojè GCP ou a.
  • runner se yon kourè tiyo ki pral analize pwogram ou a ak konstwi tiyo ou a. Pou kouri nan nwaj la, ou dwe presize yon DataflowRunner.
  • staging_location — chemen ki mennen nan depo nwaj Cloud Dataflow pou pakè kòd Indexing ki nesesè pa processeurs k ap fè travay la.
  • temp_location — chemen nan depo nwaj Cloud Dataflow pou estoke fichye travay tanporè ki te kreye pandan tiyo a ap kouri.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Pandan ke lòd sa a ap kouri, nou ka ale nan tab la DataFlow nan konsole Google la epi wè tiyo nou an. Lè nou klike sou tiyo a, nou ta dwe wè yon bagay ki sanble ak Figi 4. Pou rezon debogaj, li ka trè itil pou ale nan Logs ak Lè sa a, nan Stackdriver yo wè mòso detay yo. Sa a te ede m rezoud pwoblèm tiyo nan yon kantite ka.

Nou kreye yon tiyo pou pwosesis done difizyon. Pati 2
Figi 4: Beam CONVEYOR

Aksede done nou yo nan BigQuery

Se konsa, nou ta dwe deja gen yon tiyo kouri ak done ap koule tankou dlo nan tab nou an. Pou teste sa a, nou ka ale nan BigQuery epi gade done yo. Apre w fin itilize kòmandman ki anba a, ou ta dwe wè premye ranje done yo. Kounye a ke nou gen done yo ki estoke nan BigQuery, nou ka fè plis analiz, osi byen ke pataje done yo ak kòlèg yo epi kòmanse reponn kesyon biznis yo.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Nou kreye yon tiyo pou pwosesis done difizyon. Pati 2
Figi 5: BigQuery

Konklizyon

Nou espere pòs sa a sèvi kòm yon egzanp itil pou kreye yon tiyo done difizyon, ansanm ak jwenn fason pou fè done yo pi aksesib. Sere done nan fòma sa a ban nou anpil avantaj. Koulye a, nou ka kòmanse reponn kesyon enpòtan tankou konbyen moun ki sèvi ak pwodwi nou an? Èske baz itilizatè ou an ap grandi sou tan? Ki aspè nan pwodwi a moun yo kominike ak pi plis? Epi èske gen erè kote pa ta dwe genyen? Se kesyon sa yo ki pral enterese òganizasyon an. Dapre enfòmasyon ki soti nan repons kesyon sa yo, nou ka amelyore pwodwi a epi ogmante angajman itilizatè yo.

Beam se reyèlman itil pou sa a ki kalite egzèsis epi li gen yon kantite lòt ka itilize enteresan tou. Pou egzanp, ou ka vle analize done tik stock nan tan reyèl epi fè echanj ki baze sou analiz la, petèt ou gen done Capteur ki soti nan machin epi ou vle kalkile kalkil nivo trafik. Ou ta ka tou, pou egzanp, yon konpayi jwèt ki kolekte done itilizatè epi sèvi ak li yo kreye tablodbò pou swiv mezi kle yo. Oke, mesye, sa a se yon sijè pou yon lòt pòs, mèsi pou lekti, ak pou moun ki vle wè kòd la konplè, anba a se lyen ki mennen nan GitHub mwen an.

https://github.com/DFoly/User_log_pipeline

Sa se tout. Li premye pati.

Sous: www.habr.com

Add nouvo kòmantè