Ni kreas fluan datumtraktadon. Parto 2

Saluton al ĉiuj. Ni dividas la tradukon de la fina parto de la artikolo, preparita specife por studentoj de la kurso. Datuma Inĝeniero. Vi povas legi la unuan parton tie.

Apache Beam kaj DataFlow por Realtempaj Duktoj

Ni kreas fluan datumtraktadon. Parto 2

Agordo de Google Cloud

Noto: Mi uzis Google Cloud Shell por ruli la dukton kaj publikigi kutimajn protokolojn ĉar mi havis problemojn pri rulado de la dukto en Python 3. Google Cloud Shell uzas Python 2, kiu estas pli konsekvenca kun Apache Beam.

Por komenci la dukton, ni devas iom fosi en la agordojn. Por tiuj el vi, kiuj antaŭe ne uzis GCP, vi devos sekvi la sekvajn 6 paŝojn priskribitajn en ĉi tiu paĝo.

Post ĉi tio, ni devos alŝuti niajn skriptojn al Google Cloud Storage kaj kopii ilin al nia Google Cloud Shel. Alŝuto al nuba stokado estas sufiĉe bagatela (priskribo troveblas tie). Por kopii niajn dosierojn, ni povas malfermi Google Cloud Shel de la ilobreto klakante la unuan ikonon maldekstre en Figuro 2 sube.

Ni kreas fluan datumtraktadon. Parto 2
XNUMF-figuro

La komandoj, kiujn ni bezonas por kopii la dosierojn kaj instali la postulatajn bibliotekojn, estas listigitaj sube.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Kreante nian datumbazon kaj tabelon

Post kiam ni kompletigis ĉiujn agordajn rilatajn paŝojn, la sekva afero, kiun ni devas fari, estas krei datumaron kaj tabelon en BigQuery. Estas pluraj manieroj fari tion, sed la plej simpla estas uzi la Google Cloud-konzolon unue kreante datumaron. Vi povas sekvi la paŝojn sube ligilokrei tabelon kun skemo. Nia tablo havos 7 kolumnoj, respondante al la komponantoj de ĉiu uzantprotokolo. Por komforto, ni difinos ĉiujn kolumnojn kiel ĉenojn, krom la temploka variablo, kaj nomos ilin laŭ la variabloj, kiujn ni generis pli frue. La aranĝo de nia tabelo devus aspekti kiel en Figuro 3.

Ni kreas fluan datumtraktadon. Parto 2
Figuro 3. Tabelaranĝo

Eldonante uzantajn protokolojn

Pub/Sub estas kritika komponanto de nia dukto ĉar ĝi permesas al pluraj sendependaj aplikoj komuniki unu kun la alia. Aparte, ĝi funkcias kiel peranto, kiu ebligas al ni sendi kaj ricevi mesaĝojn inter aplikaĵoj. La unua afero, kiun ni devas fari, estas krei temon. Simple iru al Pub/Sub en la konzolo kaj alklaku KREU TEMON.

La suba kodo vokas nian skripton por generi la protokolojn difinitajn supre kaj poste ligas kaj sendas la protokolojn al Pub/Sub. La sola afero, kiun ni devas fari, estas krei objekton PublisherClient, specifu la vojon al la temo uzante la metodon topic_path kaj voku la funkcion publish с topic_path kaj datumoj. Bonvolu noti, ke ni importas generate_log_line el nia skripto stream_logs, do certigu, ke ĉi tiuj dosieroj estas en la sama dosierujo, alie vi ricevos importan eraron. Ni povas tiam ruli ĉi tion tra nia gugla konzolo uzante:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Tuj kiam la dosiero funkcios, ni povos vidi la eligon de la protokolaj datumoj al la konzolo, kiel montrite en la suba figuro. Ĉi tiu skripto funkcios tiel longe kiel ni ne uzos CTRL + Cpor kompletigi ĝin.

Ni kreas fluan datumtraktadon. Parto 2
Figuro 4. Eligo publish_logs.py

Skribante nian duktokodon

Nun kiam ni havas ĉion pretan, ni povas komenci la amuzan parton - kodi nian dukton per Beam kaj Python. Por krei Beam-dukton, ni devas krei duktobjekton (p). Post kiam ni kreis duktobjekton, ni povas apliki plurajn funkciojn unu post alia uzante la operatoron pipe (|). Ĝenerale, la laborfluo aspektas kiel la suba bildo.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

En nia kodo, ni kreos du kutimajn funkciojn. Funkcio regex_clean, kiu skanas la datumojn kaj reakiras la respondan vicon bazitan sur la listo de Ŝablonoj uzante la funkcion re.search. La funkcio liveras komon apartigitan ĉenon. Se vi ne estas fakulo pri regula esprimo, mi rekomendas kontroli ĉi tion lernilo kaj ekzercu en notbloko por kontroli la kodon. Post tio ni difinas kutiman ParDo-funkcion nomatan Split, kiu estas vario de la Beam-transformaĵo por paralela pretigo. En Python, tio estas farita en speciala maniero - ni devas krei klason kiu heredas de la DoFn Beam klaso. La funkcio Split prenas la analizitan vicon de la antaŭa funkcio kaj resendas liston de vortaroj kun klavoj respondaj al la kolonnomoj en nia BigQuery-tabelo. Estas io rimarkinda pri ĉi tiu funkcio: mi devis importi datetime ene de funkcio por ke ĝi funkciigu. Mi ricevis importan eraron komence de la dosiero, kio estis stranga. Ĉi tiu listo tiam estas pasita al la funkcio WriteToBigQuery, kiu simple aldonas niajn datumojn al la tabelo. La kodo por Batch DataFlow Job kaj Streaming DataFlow Job estas donita sube. La nura diferenco inter batch kaj streaming-kodo estas, ke en batch ni legas la CSV el src_pathuzante la funkcion ReadFromText de Beam.

Batch DataFlow Job (loka prilaborado)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (flua prilaborado)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Ekfunkciigante la transportilon

Ni povas funkciigi la dukton en pluraj malsamaj manieroj. Se ni volus, ni povus simple ruli ĝin loke de terminalo dum ensaluti en GCP malproksime.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Tamen ni funkcios ĝin per DataFlow. Ni povas fari tion uzante la suban komandon agordante la jenajn postulatajn parametrojn.

  • project — ID de via GCP-projekto.
  • runner estas duktokuristo, kiu analizos vian programon kaj konstruos vian dukton. Por funkcii en la nubo, vi devas specifi DataflowRunner.
  • staging_location — la vojo al la nuba stokado de Cloud Dataflow por indeksado de kodpakaĵoj bezonataj de la procesoroj plenumantaj la laboron.
  • temp_location — vojo al la nuba stokado de Cloud Dataflow por stoki provizorajn labordosierojn kreitajn dum la dukto funkcias.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Dum ĉi tiu komando funkcias, ni povas iri al la langeto DataFlow en la gugla konzolo kaj vidi nian dukton. Kiam ni klakas sur la dukto, ni devus vidi ion similan al Figuro 4. Por sencimigaj celoj, povas esti tre helpe iri al Protokoloj kaj poste al Stackdriver por vidi la detalajn protokolojn. Ĉi tio helpis min solvi problemojn pri dukto en kelkaj kazoj.

Ni kreas fluan datumtraktadon. Parto 2
Figuro 4: Trabo transportilo

Aliru niajn datumojn en BigQuery

Do, ni jam havu dukton funkciantan kun datumoj fluantaj en nian tabelon. Por provi ĉi tion, ni povas iri al BigQuery kaj rigardi la datumojn. Post uzi la suban komandon vi devus vidi la unuajn vicojn de la datumaro. Nun kiam ni havas la datumojn konservitajn en BigQuery, ni povas fari plian analizon, kaj ankaŭ kunhavigi la datumojn kun kolegoj kaj komenci respondi komercajn demandojn.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Ni kreas fluan datumtraktadon. Parto 2
Figuro 5: BigQuery

konkludo

Ni esperas, ke ĉi tiu afiŝo servas kiel utila ekzemplo pri kreado de flua datumdukto, kaj ankaŭ trovi manierojn fari datumojn pli alireblaj. Stoki datumojn en ĉi tiu formato donas al ni multajn avantaĝojn. Nun ni povas komenci respondi gravajn demandojn kiel kiom da homoj uzas nian produkton? Ĉu via uzantbazo kreskas laŭlonge de la tempo? Kun kiuj aspektoj de la produkto homoj plej interagas? Kaj ĉu estas eraroj kie ne devus esti? Ĉi tiuj estas la demandoj, kiuj interesos la organizon. Surbaze de la komprenoj kiuj aperas el la respondoj al ĉi tiuj demandoj, ni povas plibonigi la produkton kaj pliigi la engaĝiĝon de la uzantoj.

Trabo estas vere utila por ĉi tiu speco de ekzerco kaj havas kelkajn aliajn interesajn uzkazojn ankaŭ. Ekzemple, vi eble volas analizi akciajn tick-datumojn en reala tempo kaj fari komercojn bazitajn sur la analizo, eble vi havas sensilajn datumojn venantajn de veturiloj kaj volas kalkuli trafiknivelajn kalkulojn. Vi ankaŭ povus, ekzemple, esti videoludada kompanio, kiu kolektas uzantajn datumojn kaj uzas ĝin por krei instrumentpanelojn por spuri ŝlosilajn metrikojn. Bone, sinjoroj, ĉi tio estas temo por alia afiŝo, dankon pro legado, kaj por tiuj, kiuj volas vidi la plenan kodon, sube estas la ligilo al mia GitHub.

https://github.com/DFoly/User_log_pipeline

Tio estas ĉio. Legu unu parton.

fonto: www.habr.com

Aldoni komenton