Gumagawa kami ng pipeline ng pagpoproseso ng data ng stream. Bahagi 2

Kamusta kayong lahat. Ibinabahagi namin ang pagsasalin ng huling bahagi ng artikulo, na partikular na inihanda para sa mga mag-aaral ng kurso. Data Engineer. Maaari mong basahin ang unang bahagi dito.

Apache Beam at DataFlow para sa Mga Real-Time na Pipeline

Gumagawa kami ng pipeline ng pagpoproseso ng data ng stream. Bahagi 2

Pagse-set up ng Google Cloud

Tandaan: Ginamit ko ang Google Cloud Shell upang patakbuhin ang pipeline at mag-publish ng custom na data ng log dahil nagkakaproblema ako sa pagpapatakbo ng pipeline sa Python 3. Gumagamit ang Google Cloud Shell ng Python 2, na mas pare-pareho sa Apache Beam.

Upang simulan ang pipeline, kailangan nating maghukay ng kaunti sa mga setting. Para sa iyo na hindi pa nakagamit ng GCP dati, kakailanganin mong sundin ang sumusunod na 6 na hakbang na nakabalangkas dito pahina.

Pagkatapos nito, kakailanganin naming i-upload ang aming mga script sa Google Cloud Storage at kopyahin ang mga ito sa aming Google Cloud Shel. Ang pag-upload sa cloud storage ay medyo maliit (maaaring mahanap ang isang paglalarawan dito). Upang kopyahin ang aming mga file, maaari naming buksan ang Google Cloud Shel mula sa toolbar sa pamamagitan ng pag-click sa unang icon sa kaliwa sa Figure 2 sa ibaba.

Gumagawa kami ng pipeline ng pagpoproseso ng data ng stream. Bahagi 2
Figure 2

Ang mga utos na kailangan naming kopyahin ang mga file at i-install ang mga kinakailangang aklatan ay nakalista sa ibaba.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Paglikha ng aming database at talahanayan

Kapag nakumpleto na namin ang lahat ng hakbang na nauugnay sa pag-setup, ang susunod na kailangan naming gawin ay gumawa ng dataset at talahanayan sa BigQuery. Mayroong ilang mga paraan upang gawin ito, ngunit ang pinakasimpleng ay ang paggamit ng Google Cloud console sa pamamagitan ng unang paggawa ng isang dataset. Maaari mong sundin ang mga hakbang sa ibaba linkpara gumawa ng table na may schema. Magkakaroon ng table namin 7 hanay, naaayon sa mga bahagi ng bawat log ng user. Para sa kaginhawahan, tutukuyin namin ang lahat ng column bilang mga string, maliban sa timelocal variable, at pangalanan ang mga ito ayon sa mga variable na nabuo namin kanina. Ang layout ng aming talahanayan ay dapat magmukhang sa Figure 3.

Gumagawa kami ng pipeline ng pagpoproseso ng data ng stream. Bahagi 2
Larawan 3. Layout ng talahanayan

Pag-publish ng data ng log ng user

Ang Pub/Sub ay isang kritikal na bahagi ng aming pipeline dahil pinapayagan nito ang maraming independiyenteng application na makipag-ugnayan sa isa't isa. Sa partikular, ito ay gumagana bilang isang tagapamagitan na nagpapahintulot sa amin na magpadala at tumanggap ng mga mensahe sa pagitan ng mga application. Ang unang bagay na kailangan nating gawin ay lumikha ng isang paksa. Pumunta lang sa Pub/Sub sa console at i-click ang GUMAWA NG PAKSA.

Ang code sa ibaba ay tumatawag sa aming script upang bumuo ng data ng log na tinukoy sa itaas at pagkatapos ay ikinokonekta at ipapadala ang mga log sa Pub/Sub. Ang tanging bagay na kailangan nating gawin ay lumikha ng isang bagay PublisherClient, tukuyin ang landas patungo sa paksa gamit ang pamamaraan topic_path at tawagan ang function publish с topic_path at datos. Pakitandaan na nag-import kami generate_log_line mula sa aming script stream_logs, kaya siguraduhin na ang mga file na ito ay nasa parehong folder, kung hindi, magkakaroon ka ng error sa pag-import. Maaari naming patakbuhin ito sa pamamagitan ng aming google console gamit ang:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Sa sandaling tumakbo ang file, makikita natin ang output ng data ng log sa console, tulad ng ipinapakita sa figure sa ibaba. Gagana ang script na ito hangga't hindi namin ginagamit CTRL + Cpara makumpleto ito.

Gumagawa kami ng pipeline ng pagpoproseso ng data ng stream. Bahagi 2
Larawan 4. Output publish_logs.py

Pagsusulat ng aming pipeline code

Ngayong handa na natin ang lahat, maaari na nating simulan ang nakakatuwang bahagi - coding ang ating pipeline gamit ang Beam at Python. Upang lumikha ng isang Beam pipeline, kailangan nating lumikha ng isang pipeline object (p). Kapag nakagawa na kami ng pipeline object, maaari kaming maglapat ng maraming function nang sunud-sunod gamit ang operator pipe (|). Sa pangkalahatan, ang daloy ng trabaho ay kamukha ng larawan sa ibaba.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Sa aming code, gagawa kami ng dalawang custom na function. Function regex_clean, na nag-scan ng data at kinukuha ang kaukulang row batay sa listahan ng PATTERNS gamit ang function re.search. Ang function ay nagbabalik ng comma separated string. Kung hindi ka isang regular na eksperto sa pagpapahayag, inirerekomenda kong suriin ito pagtuturo at magsanay sa isang notepad upang suriin ang code. Pagkatapos nito, tinukoy namin ang isang custom na ParDo function na tinatawag split, na isang variation ng Beam transform para sa parallel processing. Sa Python, ito ay ginagawa sa isang espesyal na paraan - dapat tayong lumikha ng isang klase na nagmana mula sa klase ng DoFn Beam. Kinukuha ng Split function ang na-parse na row mula sa nakaraang function at nagbabalik ng listahan ng mga diksyunaryo na may mga key na tumutugma sa mga pangalan ng column sa aming BigQuery table. May dapat tandaan tungkol sa function na ito: Kailangan kong mag-import datetime sa loob ng isang function upang gawin itong gumana. Nakakakuha ako ng error sa pag-import sa simula ng file, na kakaiba. Ang listahang ito ay ipapasa sa function WriteToBigQuery, na nagdaragdag lamang ng aming data sa talahanayan. Ang code para sa Batch DataFlow Job at Streaming DataFlow Job ay ibinigay sa ibaba. Ang pagkakaiba lang sa pagitan ng batch at streaming code ay sa batch na binabasa namin ang CSV mula sa src_pathgamit ang function ReadFromText mula kay Beam.

Batch DataFlow Job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (pagproseso ng stream)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Pagsisimula ng conveyor

Maaari nating patakbuhin ang pipeline sa iba't ibang paraan. Kung gusto namin, maaari lang namin itong patakbuhin nang lokal mula sa isang terminal habang nagla-log in sa GCP nang malayuan.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Gayunpaman, patakbuhin namin ito gamit ang DataFlow. Magagawa natin ito gamit ang command sa ibaba sa pamamagitan ng pagtatakda ng mga sumusunod na kinakailangang parameter.

  • project β€” ID ng iyong proyekto sa GCP.
  • runner ay isang pipeline runner na susuriin ang iyong programa at bubuo ng iyong pipeline. Upang tumakbo sa cloud, dapat kang tumukoy ng DataflowRunner.
  • staging_location β€” ang landas patungo sa cloud storage ng Cloud Dataflow para sa pag-index ng mga pakete ng code na kailangan ng mga processor na gumaganap ng trabaho.
  • temp_location β€” landas patungo sa cloud storage ng Cloud Dataflow para sa pag-iimbak ng mga pansamantalang file ng trabaho na ginawa habang tumatakbo ang pipeline.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Habang tumatakbo ang command na ito, maaari tayong pumunta sa tab na DataFlow sa google console at tingnan ang aming pipeline. Kapag nag-click kami sa pipeline, dapat kaming makakita ng isang bagay na katulad ng Figure 4. Para sa mga layunin ng pag-debug, maaaring maging kapaki-pakinabang na pumunta sa Logs at pagkatapos ay sa Stackdriver upang tingnan ang mga detalyadong log. Nakatulong ito sa akin na malutas ang mga isyu sa pipeline sa ilang mga kaso.

Gumagawa kami ng pipeline ng pagpoproseso ng data ng stream. Bahagi 2
Larawan 4: Beam conveyor

I-access ang aming data sa BigQuery

Kaya, dapat mayroon na tayong pipeline na tumatakbo na may data na dumadaloy sa ating talahanayan. Para subukan ito, maaari tayong pumunta sa BigQuery at tingnan ang data. Pagkatapos gamitin ang utos sa ibaba dapat mong makita ang unang ilang hilera ng dataset. Ngayong mayroon na kaming data na nakaimbak sa BigQuery, maaari na kaming magsagawa ng karagdagang pagsusuri, gayundin ang pagbabahagi ng data sa mga kasamahan at simulan ang pagsagot sa mga tanong sa negosyo.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Gumagawa kami ng pipeline ng pagpoproseso ng data ng stream. Bahagi 2
Larawan 5: BigQuery

Konklusyon

Umaasa kami na ang post na ito ay nagsisilbing isang kapaki-pakinabang na halimbawa ng paglikha ng isang streaming data pipeline, pati na rin ang paghahanap ng mga paraan upang gawing mas madaling ma-access ang data. Ang pag-iimbak ng data sa format na ito ay nagbibigay sa amin ng maraming pakinabang. Ngayon ay maaari na nating simulan ang pagsagot sa mahahalagang tanong tulad ng ilang tao ang gumagamit ng ating produkto? Lumalaki ba ang iyong user base sa paglipas ng panahon? Anong mga aspeto ng produkto ang pinakamadalas na nakikipag-ugnayan sa mga tao? At mayroon bang mga pagkakamali kung saan hindi dapat magkaroon? Ito ang mga tanong na magiging interesante sa organisasyon. Batay sa mga insight na lumalabas mula sa mga sagot sa mga tanong na ito, maaari naming pagbutihin ang produkto at pataasin ang pakikipag-ugnayan ng user.

Ang Beam ay talagang kapaki-pakinabang para sa ganitong uri ng ehersisyo at mayroon ding ilang iba pang mga kawili-wiling kaso ng paggamit. Halimbawa, maaaring gusto mong suriin ang data ng stock tick sa real time at gumawa ng mga trade batay sa pagsusuri, marahil ay mayroon kang data ng sensor na nagmumula sa mga sasakyan at gusto mong kalkulahin ang mga kalkulasyon sa antas ng trapiko. Maaari ka ring, halimbawa, maging isang kumpanya ng paglalaro na nangongolekta ng data ng user at ginagamit ito upang lumikha ng mga dashboard upang subaybayan ang mga pangunahing sukatan. Okay, mga ginoo, ito ay isang paksa para sa isa pang post, salamat sa pagbabasa, at para sa mga nais makita ang buong code, sa ibaba ay ang link sa aking GitHub.

https://github.com/DFoly/User_log_pipeline

Iyon lang. Basahin ang unang bahagi.

Pinagmulan: www.habr.com

Magdagdag ng komento