Sukuriame srauto duomenų apdorojimo vamzdyną. 2 dalis

Sveiki visi. Dalinamės paskutinės straipsnio dalies vertimu, parengtu specialiai kurso studentams. Duomenų inžinierius. Galite perskaityti pirmąją dalį čia.

„Apache Beam“ ir „DataFlow“, skirta realaus laiko vamzdynams

Sukuriame srauto duomenų apdorojimo vamzdyną. 2 dalis

„Google Cloud“ nustatymas

Pastaba: naudoju „Google Cloud Shell“, kad paleisčiau dujotiekį ir paskelbčiau tinkintus žurnalo duomenis, nes kilo problemų paleisdamas dujotiekį „Python 3“. „Google Cloud Shell“ naudoja Python 2, kuri labiau suderinama su „Apache Beam“.

Norėdami pradėti dujotiekį, turime šiek tiek įsigilinti į nustatymus. Tiems iš jūsų, kurie anksčiau nenaudojote GSP, turėsite atlikti toliau nurodytus 6 veiksmus puslapis.

Po to turėsime įkelti savo scenarijus į „Google Cloud Storage“ ir nukopijuoti juos į „Google Cloud Shel“. Įkėlimas į debesies saugyklą yra gana trivialus (aprašą rasite čia). Norėdami nukopijuoti failus, galime atidaryti Google Cloud Shel iš įrankių juostos spustelėdami pirmąją piktogramą kairėje 2 paveiksle žemiau.

Sukuriame srauto duomenų apdorojimo vamzdyną. 2 dalis
Pav 2

Toliau pateiktos komandos, kurių mums reikia norint nukopijuoti failus ir įdiegti reikiamas bibliotekas.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Sukuriame mūsų duomenų bazę ir lentelę

Kai atliksime visus su sąranka susijusius veiksmus, kitas dalykas, kurį turime padaryti, yra sukurti duomenų rinkinį ir lentelę „BigQuery“. Tai galima padaryti keliais būdais, tačiau paprasčiausias yra naudoti „Google Cloud“ konsolę, pirmiausia sukuriant duomenų rinkinį. Galite atlikti toliau nurodytus veiksmus nuorodasukurti lentelę su schema. Mūsų stalas turės 7 stulpeliai, atitinkantis kiekvieno vartotojo žurnalo komponentus. Patogumui visus stulpelius apibrėžsime kaip eilutes, išskyrus laiko lokalinį kintamąjį, ir pavadinsime juos pagal anksčiau sugeneruotus kintamuosius. Mūsų lentelės išdėstymas turėtų atrodyti taip, kaip parodyta 3 paveiksle.

Sukuriame srauto duomenų apdorojimo vamzdyną. 2 dalis
3 pav. Lentelės išdėstymas

Naudotojo žurnalo duomenų publikavimas

„Pub/Sub“ yra esminis mūsų konvejerio komponentas, nes jis leidžia kelioms nepriklausomoms programoms bendrauti tarpusavyje. Visų pirma, jis veikia kaip tarpininkas, leidžiantis siųsti ir gauti pranešimus tarp programų. Pirmas dalykas, kurį turime padaryti, yra sukurti temą. Tiesiog eikite į „Pub/Sub“ pulte ir spustelėkite KURTI TEMĄ.

Toliau pateiktas kodas iškviečia mūsų scenarijų, kad sugeneruotų aukščiau apibrėžtus žurnalo duomenis, tada prisijungia ir siunčia žurnalus į Pub / Sub. Vienintelis dalykas, kurį turime padaryti, yra sukurti objektą PublisherClient, nurodykite kelią į temą naudodami metodą topic_path ir iškvieskite funkciją publish с topic_path ir duomenis. Atkreipkite dėmesį, kad mes importuojame generate_log_line iš mūsų scenarijaus stream_logs, todėl įsitikinkite, kad šie failai yra tame pačiame aplanke, kitaip gausite importavimo klaidą. Tada galime tai paleisti naudodami „Google“ konsolę naudodami:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Kai tik failas bus paleistas, galėsime matyti žurnalo duomenų išvestį į konsolę, kaip parodyta paveikslėlyje žemiau. Šis scenarijus veiks tol, kol jo nenaudosime CTRL + Cją užbaigti.

Sukuriame srauto duomenų apdorojimo vamzdyną. 2 dalis
4 pav. Išvestis publish_logs.py

Rašome mūsų vamzdyno kodą

Dabar, kai jau viską paruošėme, galime pradėti smagiąją dalį – koduoti dujotiekį naudojant „Beam“ ir „Python“. Norėdami sukurti „Beam“ vamzdyną, turime sukurti dujotiekio objektą (p). Sukūrę dujotiekio objektą, naudodami operatorių galime taikyti kelias funkcijas vieną po kitos pipe (|). Apskritai darbo eiga atrodo kaip paveikslėlyje žemiau.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Savo kode sukursime dvi pasirinktines funkcijas. Funkcija regex_clean, kuris nuskaito duomenis ir nuskaito atitinkamą eilutę pagal sąrašą PATTERNS naudojant funkciją re.search. Funkcija grąžina kableliais atskirtą eilutę. Jei nesate reguliariųjų reiškinių ekspertas, rekomenduoju tai patikrinti pamoka ir praktikuokite bloknote, kad patikrintumėte kodą. Po to apibrėžiame pasirinktinę ParDo funkciją, vadinamą skilimas, kuri yra pluošto transformacijos variantas lygiagrečiam apdorojimui. Python tai daroma ypatingu būdu – turime sukurti klasę, kuri paveldėtų iš DoFn Beam klasės. Funkcija Padalyti paima išnagrinėtą ankstesnės funkcijos eilutę ir pateikia žodynų sąrašą su raktais, atitinkančiais stulpelių pavadinimus mūsų BigQuery lentelėje. Apie šią funkciją reikia atkreipti dėmesį: turėjau importuoti datetime funkcijos viduje, kad ji veiktų. Failo pradžioje gavau importavimo klaidą, kuri buvo keista. Tada šis sąrašas perduodamas funkcijai WriteToBigQuery, kuris tiesiog prideda mūsų duomenis į lentelę. Batch DataFlow Job ir Streaming DataFlow Job kodas pateiktas toliau. Vienintelis skirtumas tarp paketinio ir srautinio kodo yra tas, kad paketu mes skaitome CSV iš src_pathnaudojant funkciją ReadFromText iš Beam.

Paketinis duomenų srauto darbas (paketinis apdorojimas)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Srautinio duomenų srauto užduotis (srautinis apdorojimas)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Konvejerio paleidimas

Dujotiekį galime paleisti keliais skirtingais būdais. Jei norėtume, galėtume tiesiog paleisti jį vietoje iš terminalo, nuotoliniu būdu prisijungdami prie GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Tačiau mes ketiname jį paleisti naudodami „DataFlow“. Tai galime padaryti naudodami toliau pateiktą komandą, nustatydami šiuos būtinus parametrus.

  • project – GCP projekto ID.
  • runner yra dujotiekio bėgikas, kuris išanalizuos jūsų programą ir sukurs jūsų dujotiekį. Norėdami paleisti debesyje, turite nurodyti DataflowRunner.
  • staging_location — kelias į debesies saugyklą „Cloud Dataflow“, skirtas kodo paketams, reikalingiems darbus atliekantiems procesoriams, indeksuoti.
  • temp_location — kelias į „Cloud Dataflow“ debesies saugyklą, skirtą laikiniems darbo failams, sukurtiems, kai veikia dujotiekis, saugoti.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Kol ši komanda vykdoma, galime eiti į „Google“ pulto „DataFlow“ skirtuką ir peržiūrėti savo konvejerį. Kai spustelėjame dujotiekį, turėtume pamatyti kažką panašaus į 4 pav. Derinimo tikslais gali būti labai naudinga eiti į žurnalus ir tada į „Stackdriver“, kad peržiūrėtumėte išsamius žurnalus. Tai padėjo man išspręsti vamzdyno problemas daugeliu atvejų.

Sukuriame srauto duomenų apdorojimo vamzdyną. 2 dalis
4 pav. Sijos konvejeris

Pasiekite mūsų duomenis naudodami „BigQuery“.

Taigi, jau turėtume turėti konvejerį, kuriame duomenys patenka į mūsų lentelę. Norėdami tai patikrinti, galime eiti į „BigQuery“ ir peržiūrėti duomenis. Panaudoję toliau pateiktą komandą, turėtumėte pamatyti kelias pirmąsias duomenų rinkinio eilutes. Dabar, kai turime BigQuery saugomus duomenis, galime atlikti tolesnę analizę, dalytis duomenimis su kolegomis ir pradėti atsakyti į verslo klausimus.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Sukuriame srauto duomenų apdorojimo vamzdyną. 2 dalis
5 pav. „BigQuery“.

išvada

Tikimės, kad šis įrašas bus naudingas srautinio duomenų perdavimo kanalo kūrimo ir būdų, kaip padaryti duomenis prieinamesnius, pavyzdys. Duomenų saugojimas šiuo formatu suteikia mums daug privalumų. Dabar galime pradėti atsakyti į svarbius klausimus, pvz., kiek žmonių naudojasi mūsų produktu? Ar laikui bėgant jūsų vartotojų bazė auga? Su kokiais produkto aspektais žmonės bendrauja dažniausiai? O ar yra klaidų ten, kur jų neturėtų būti? Tai klausimai, kurie bus įdomūs organizacijai. Remdamiesi įžvalgomis, gautomis iš atsakymų į šiuos klausimus, galime patobulinti produktą ir padidinti vartotojų įsitraukimą.

Beam yra tikrai naudinga tokio tipo pratimams ir turi daug kitų įdomių naudojimo atvejų. Pavyzdžiui, galbūt norėsite analizuoti akcijų kainų duomenis realiuoju laiku ir sudaryti sandorius remiantis analize, galbūt turite jutiklių duomenis iš transporto priemonių ir norite apskaičiuoti srauto lygio skaičiavimus. Taip pat galite, pavyzdžiui, būti žaidimų įmone, kuri renka naudotojų duomenis ir naudoja juos prietaisų skydeliams kurti pagrindinei metrikai stebėti. Gerai, ponai, tai yra kito įrašo tema, ačiū, kad skaitėte, o tiems, kurie nori pamatyti visą kodą, žemiau yra nuoroda į mano „GitHub“.

https://github.com/DFoly/User_log_pipeline

Tai viskas. Skaitykite pirmą dalį.

Šaltinis: www.habr.com

Добавить комментарий