Naghimo kami usa ka pipeline sa pagproseso sa datos sa sapa. Bahin 2

Kumusta tanan. Among gipaambit ang hubad sa kataposang bahin sa artikulo, nga espesipikong giandam alang sa mga estudyante sa kurso. Data Engineer. Mahimo nimong basahon ang unang bahin dinhi.

Apache Beam ug DataFlow alang sa Real-Time nga mga Pipeline

Naghimo kami usa ka pipeline sa pagproseso sa datos sa sapa. Bahin 2

Pag-set up sa Google Cloud

Mubo nga sulat: Gigamit nako ang Google Cloud Shell sa pagpadagan sa pipeline ug pagmantala sa custom log data tungod kay naglisud ko sa pagpadagan sa pipeline sa Python 3. Ang Google Cloud Shell naggamit sa Python 2, nga mas nahiuyon sa Apache Beam.

Aron masugdan ang pipeline, kinahanglan namon nga magkalot og gamay sa mga setting. Para nimo nga wala pa mogamit sa GCP kaniadto, kinahanglan nimong sundon ang mosunod nga 6 nga mga lakang nga gilatid niini panid.

Pagkahuman niini, kinahanglan namong i-upload ang among mga script sa Google Cloud Storage ug kopyahon kini sa among Google Cloud Shel. Ang pag-upload sa cloud storage kay gamay ra (usa ka paghulagway ang makit-an dinhi). Aron kopyahon ang among mga file, mahimo namong ablihan ang Google Cloud Shel gikan sa toolbar pinaagi sa pag-klik sa unang icon sa wala sa Figure 2 sa ubos.

Naghimo kami usa ka pipeline sa pagproseso sa datos sa sapa. Bahin 2
Hulagway 2

Ang mga sugo nga kinahanglan natong kopyahon ang mga file ug i-install ang gikinahanglan nga mga librarya gilista sa ubos.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Paghimo sa among database ug lamesa

Kung nahuman na namo ang tanan nga mga lakang nga may kalabutan sa pag-setup, ang sunod nga kinahanglan namon buhaton mao ang paghimo usa ka dataset ug lamesa sa BigQuery. Adunay ubay-ubay nga mga paagi sa pagbuhat niini, apan ang pinakasimple mao ang paggamit sa Google Cloud console pinaagi sa paghimo una og dataset. Mahimo nimong sundon ang mga lakang sa ubos linksa paghimo sa usa ka lamesa nga adunay usa ka schema. Ang among lamesa adunay 7 kolum, nga katumbas sa mga sangkap sa matag log sa gumagamit. Alang sa kasayon, among ipasabut ang tanan nga mga kolum ingon mga kuwerdas, gawas sa timelocal variable, ug nganlan sila sumala sa mga variable nga among namugna sa sayo pa. Ang layout sa among lamesa kinahanglan nga tan-awon sama sa Figure 3.

Naghimo kami usa ka pipeline sa pagproseso sa datos sa sapa. Bahin 2
Figure 3. Layout sa lamesa

Pagmantala sa datos sa log sa gumagamit

Ang Pub/Sub usa ka kritikal nga bahin sa among pipeline tungod kay gitugotan niini ang daghang mga independente nga aplikasyon nga makigkomunikar sa usag usa. Sa partikular, kini naglihok isip tigpataliwala nga nagtugot kanamo sa pagpadala ug pagdawat sa mga mensahe tali sa mga aplikasyon. Ang unang butang nga kinahanglan natong buhaton mao ang paghimo og usa ka hilisgutan. Adto lang sa Pub/Sub sa console ug i-klik PAGHIMO TOPIC.

Ang code sa ubos nagtawag sa among script aron makamugna ang data sa log nga gihubit sa ibabaw ug dayon nagkonektar ug nagpadala sa mga troso sa Pub/Sub. Ang kinahanglan ra natong buhaton mao ang paghimo og butang Kliyente sa Publisher, ipiho ang dalan padulong sa hilisgutan gamit ang pamaagi topic_path ug tawagan ang function publish с topic_path ug data. Palihug timan-i nga kami nag-import generate_log_line gikan sa among script stream_logs, busa siguruha nga kini nga mga file naa sa parehas nga folder, kung dili makakuha ka usa ka sayup sa pag-import. Mahimo namong ipadagan kini pinaagi sa among google console gamit ang:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Sa diha nga ang file modagan, atong makita ang output sa log data ngadto sa console, sama sa gipakita sa hulagway sa ubos. Kini nga script molihok basta dili kami mogamit CTRL + Caron makompleto kini.

Naghimo kami usa ka pipeline sa pagproseso sa datos sa sapa. Bahin 2
Hulagway 4. Output publish_logs.py

Pagsulat sa among pipeline code

Karon nga naandam na namo ang tanan, makasugod na kami sa makalingaw nga bahin - pag-coding sa among pipeline gamit ang Beam ug Python. Aron makamugna ug Beam pipeline, kinahanglang maghimo kita ug pipeline object (p). Sa higayon nga nakahimo na kami og pipeline nga butang, mahimo namong magamit ang daghang mga function sa usag usa gamit ang operator pipe (|). Sa kinatibuk-an, ang workflow sama sa imahe sa ubos.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Sa among code, maghimo kami og duha ka custom functions. Kalihokan regex_clean, nga nag-scan sa datos ug nagkuha sa katugbang nga laray base sa lista sa PATTERNS gamit ang function re.search. Ang function nagbalik sa usa ka comma separated string. Kung dili ka usa ka eksperto sa regular nga ekspresyon, girekomenda nako nga susihon kini tutorial ug pagpraktis sa usa ka notepad aron masusi ang code. Pagkahuman niini among gihubit ang usa ka naandan nga ParDo function nga gitawag split, nga usa ka variation sa Beam transform para sa parallel nga pagproseso. Sa Python, kini gihimo sa usa ka espesyal nga paagi - kinahanglan nga maghimo kita usa ka klase nga makapanunod gikan sa klase sa DoFn Beam. Gikuha sa Split function ang parsed row gikan sa miaging function ug gibalik ang lista sa mga diksyonaryo nga adunay mga yawe nga katumbas sa mga ngalan sa kolum sa among lamesa sa BigQuery. Adunay usa ka butang nga timan-an bahin sa kini nga function: Kinahanglan kong mag-import datetime sulod sa usa ka function aron kini molihok. Nakakuha ako usa ka sayup sa pag-import sa sinugdanan sa file, nga katingad-an. Kini nga lista ipasa sa function WriteToBigQuery, nga nagdugang lang sa among datos sa lamesa. Ang code alang sa Batch DataFlow Job ug Streaming DataFlow Job gihatag sa ubos. Ang bugtong kalainan tali sa batch ug streaming code mao nga sa batch atong gibasa ang CSV gikan sa src_pathgamit ang function ReadFromText gikan sa Beam.

Batch DataFlow Job (pagproseso sa batch)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Pag-stream sa DataFlow Job (pagproseso sa sapa)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Pagsugod sa conveyor

Mahimo namon nga mapadagan ang pipeline sa lainlaing mga paagi. Kung gusto namon, mahimo ra namon kini nga ipadagan sa lokal gikan sa usa ka terminal samtang nag-log in sa GCP sa layo.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Bisan pa, ipadagan namon kini gamit ang DataFlow. Mahimo nato kini gamit ang ubos nga sugo pinaagi sa pagtakda sa mosunod nga gikinahanglan nga mga parameter.

  • project β€” ID sa imong proyekto sa GCP.
  • runner usa ka pipeline runner nga mag-analisar sa imong programa ug magtukod sa imong pipeline. Aron makadagan sa panganod, kinahanglan nimong itakda ang usa ka DataflowRunner.
  • staging_location β€” ang agianan padulong sa Cloud Dataflow cloud storage alang sa pag-index sa mga pakete sa code nga gikinahanglan sa mga processor nga nagpahigayon sa trabaho.
  • temp_location β€” agianan padulong sa Cloud Dataflow cloud storage para sa pagtipig sa temporaryo nga mga file sa trabaho nga gihimo samtang ang pipeline nagdagan.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Samtang nagdagan kini nga sugo, makaadto kami sa tab nga DataFlow sa google console ug tan-awon ang among pipeline. Kung mag-klik kami sa pipeline, kinahanglan namon nga makita ang usa ka butang nga susama sa Figure 4. Alang sa mga katuyoan sa pag-debug, makatabang kaayo nga moadto sa Logs ug dayon sa Stackdriver aron makita ang detalyado nga mga troso. Nakatabang kini kanako nga masulbad ang mga isyu sa pipeline sa daghang mga kaso.

Naghimo kami usa ka pipeline sa pagproseso sa datos sa sapa. Bahin 2
Hulagway 4: Beam conveyor

I-access ang among data sa BigQuery

Busa, kinahanglan na kita nga adunay usa ka pipeline nga nagdagan nga adunay mga datos nga nagdagayday sa atong lamesa. Aron sulayan kini, makaadto kami sa BigQuery ug tan-awon ang datos. Human sa paggamit sa sugo sa ubos kinahanglan nimo nga makita ang unang pipila ka laray sa dataset. Karon nga naa na namo ang data nga gitipigan sa BigQuery, mahimo namong ipahigayon ang dugang nga pagtuki, ingon man ipaambit ang datos sa mga kauban ug magsugod sa pagtubag sa mga pangutana sa negosyo.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Naghimo kami usa ka pipeline sa pagproseso sa datos sa sapa. Bahin 2
Hulagway 5: BigQuery

konklusyon

Kami nanghinaut nga kini nga post magsilbi nga usa ka mapuslanon nga panig-ingnan sa paghimo sa usa ka streaming data pipeline, ingon man usab sa pagpangita og mga paagi aron mahimo ang datos nga mas dali ma-access. Ang pagtipig sa datos sa kini nga format naghatag kanamo daghang mga bentaha. Karon makasugod na ta sa pagtubag sa importanteng mga pangutana sama sa pila ka tawo ang naggamit sa atong produkto? Nagtubo ba ang imong base sa tiggamit sa paglabay sa panahon? Unsa nga mga aspeto sa produkto ang labing nakig-uban sa mga tawo? Ug naa bay mga sayup nga wala kinahanglana? Kini ang mga pangutana nga makapainteres sa organisasyon. Pinasukad sa mga panabut nga mitumaw gikan sa mga tubag sa kini nga mga pangutana, mahimo namon nga mapaayo ang produkto ug madugangan ang pakiglambigit sa gumagamit.

Ang Beam mapuslanon kaayo alang sa kini nga klase sa ehersisyo ug adunay daghang uban pang makapaikag nga mga kaso sa paggamit usab. Pananglitan, mahimo nimong analisahon ang datos sa stock tick sa tinuud nga oras ug maghimo mga patigayon base sa pag-analisar, tingali adunay ka data sa sensor nga gikan sa mga salakyanan ug gusto nimo kuwentahon ang mga kalkulasyon sa lebel sa trapiko. Mahimo ka usab, pananglitan, usa ka kompanya sa dula nga nagkolekta sa datos sa gumagamit ug gigamit kini aron maghimo mga dashboard aron masubay ang mga hinungdan nga sukatan. Okay, mga ginoo, kini usa ka hilisgutan alang sa lain nga post, salamat sa pagbasa, ug alang sa gusto nga makita ang tibuuk nga code, sa ubos ang link sa akong GitHub.

https://github.com/DFoly/User_log_pipeline

Mao ra na. Basaha ang unang bahin.

Source: www.habr.com

Idugang sa usa ka comment