Creating a streaming data processing pipeline. Part 2

Hi everyone. We are sharing the translation of the final part of this article, prepared especially for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for Real-Time Pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and to publish the custom log data, because I had trouble getting the pipeline to run under Python 3. Google Cloud Shell uses Python 2, which plays more nicely with Apache Beam.

To get the pipeline started, we need to dig into the settings a little. For those who have not used GCP before, you will need to go through the 6 steps outlined on this page.

After that, we need to upload our scripts to Google Cloud Storage and copy them into our Google Cloud Shell. Uploading to Cloud Storage is fairly trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy the files and install the required libraries are listed below.

# Copy files from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our dataset and table

Once we have completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the easiest is to use the Google Cloud console and first create a dataset. You can follow the steps at the following link to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we will define all the columns as strings, apart from the timelocal variable, and name them after the variables we generated earlier. Our table layout should look like Figure 3.

Figure 3. Table layout
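
If you prefer to script this step rather than clicking through the console, the dataset and table can also be created with the BigQuery Python client. The sketch below is only an illustration: it assumes the google-cloud-bigquery library is installed and authenticated, uses the same userlogs.logdata names the pipeline writes to later, and declares every column as STRING to match the schema string passed to WriteToBigQuery below (timelocal can be switched to TIMESTAMP if you prefer).

# Sketch: create the dataset and table from Python instead of the console.
from google.cloud import bigquery

client = bigquery.Client(project="<YOUR-PROJECT>")

# Create the dataset that will hold the log table.
client.create_dataset("userlogs", exists_ok=True)

# One column per component of the user log, all declared as STRING here.
schema = [
    bigquery.SchemaField("remote_addr", "STRING"),
    bigquery.SchemaField("timelocal", "STRING"),
    bigquery.SchemaField("request_type", "STRING"),
    bigquery.SchemaField("status", "STRING"),
    bigquery.SchemaField("body_bytes_sent", "STRING"),
    bigquery.SchemaField("http_referer", "STRING"),
    bigquery.SchemaField("http_user_agent", "STRING"),
]

table = bigquery.Table("{}.userlogs.logdata".format(client.project), schema=schema)
client.create_table(table, exists_ok=True)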

Publishing the user log data

Pub/Sub is a vital component of our pipeline because it allows multiple independent applications to talk to each other. In particular, it acts as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
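
If you would rather create the topic from code than through the console, a minimal sketch using the same pubsub_v1 client we use below could look like this. It assumes the pre-2.0 google-cloud-pubsub releases current at the time of writing; newer versions of the library expect publisher.create_topic(request={"name": topic_path}) instead.

# Sketch: create the Pub/Sub topic programmatically (google-cloud-pubsub < 2.0).
from google.cloud import pubsub_v1

PROJECT_ID = "user-logs-237110"   # replace with your own project ID
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

# Raises google.api_core.exceptions.AlreadyExists if the topic already exists.
publisher.create_topic(topic_path)
print("Created topic: {}".format(topic_path))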

The code below calls our script to generate the log data defined above, then connects to Pub/Sub and sends the logs. The only things we need to do are create a PublisherClient object, set the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this from our google console using:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Once the file is running, we can watch the log data being printed to the console, as shown in the figure below. This script will keep running until we use Ctrl + C to stop it.

Figure 4. Output of publish_logs.py

Writing our pipeline code

Now that everything is set up, we can get to the fun part - coding our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once the pipeline object has been created, we can apply multiple transforms one after another using the pipe operator (|). In general, the workflow looks like the picture below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
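
As a toy illustration of this pattern (it is not part of the project code), the tiny pipeline below chains three transforms with the pipe operator and runs locally on the default DirectRunner.

# Toy example: chaining transforms with | and running locally.
import apache_beam as beam

def print_row(row):
    print(row)
    return row

with beam.Pipeline() as p:
    (p
     | 'Create' >> beam.Create(['a,1', 'b,2', 'c,3'])
     | 'SplitFields' >> beam.Map(lambda line: line.split(','))
     | 'Print' >> beam.Map(print_row))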

In our code we will create two custom functions. The regex_clean function scans the data and extracts the field matching each entry in the PATTERNS list using re.search; the function returns a comma-separated string. If you are not a regular expression expert, I recommend looking at this tutorial and practicing in a notebook to test the code. After that we define a custom ParDo function called Split, which is a form of Beam transform for parallel processing. In Python this is done in a particular way - we have to create a class that inherits from Beam's DoFn class. The Split function takes the parsed line from the previous function and returns a list of dictionaries whose keys correspond to the column names of our BigQuery table. One thing to note about this function: I had to import datetime inside the function to make it work; I was getting an import error at the top of the file, which was odd. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and the streaming code is that in the batch version we read a CSV from src_path using Beam's ReadFromText function.

Batch DataFlow Job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()
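
Before deploying, it can be handy to sanity-check the Split DoFn locally. The line below is a purely hypothetical example of the comma-separated string that regex_clean produces; feeding it straight to Split shows the dictionary that ends up in BigQuery.

# Quick local check of Split with a hypothetical cleaned log line.
sample = ("192.168.0.1,30/Apr/2019:21:11:23,GET index.html HTTP/1.0,"
          "200,512,https://www.example.com,Mozilla/5.0")
rows = Split().process(sample)
print(rows[0]['remote_addr'], rows[0]['timelocal'], rows[0]['status'])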

Streaming DataFlow Job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Starting the pipeline

We can launch the pipeline in several different ways. If we wanted to, we could simply run it locally from a terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it with DataFlow. We can do that with the command below, after setting the following required parameters.

  • project - the ID of your GCP project.
  • runner - the pipeline runner that will analyse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - a Cloud Dataflow storage path for staging the code packages needed by the workers executing the job.
  • temp_location - a Cloud Dataflow storage path for storing the temporary job files created while the pipeline is running.
  • streaming

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

While this command is running, we can go to the DataFlow tab in the google console and look at our pipeline. When we click on the pipeline, we should see something similar to Figure 4. For debugging, it can be very helpful to go to Logs and then to Stackdriver to view detailed logs. This has helped me sort out pipeline issues on a number of occasions.

Figure 4: Beam pipeline

Accessing our data in BigQuery

So, we should now have a pipeline up and running with data flowing into our table. To test this, we can go to BigQuery and look at the data. After running the command below you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can carry out further analysis, share the data with colleagues and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Figure 5: BigQuery
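
If you would rather pull the results into Python for further analysis, a small sketch using the BigQuery client (with pandas installed for to_dataframe) might look like this.

# Sketch: run the same query from Python and load the result into a DataFrame.
from google.cloud import bigquery

client = bigquery.Client(project="user-logs-237110")
query = "SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10"
df = client.query(query).to_dataframe()
print(df.head())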

Conclusion

Hopefully this post serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this form gives us a lot of advantages. We can now start answering important questions such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with the most? And are there errors where there should not be any? These are the questions that will be of interest to the organization, and based on the insights they provide we can improve the product and increase user engagement.

Beam is genuinely useful for this kind of exercise, and it has a number of other interesting use cases as well. For example, you might want to analyse stock tick data in real time and make trades based on that analysis; perhaps you have sensor data coming in from vehicles and want to compute traffic level metrics. You could also, say, be a gaming company collecting user data and using it to build dashboards that track key metrics. All right folks, that is a topic for another post; thanks for reading, and for those who want to see the full code, below is the link to my GitHub.

https://github.com/DFoly/User_log_pipeline

That's all. Read part one.

Source: www.habr.com
