Stvaramo cjevovod za obradu tokovnih podataka. 2. dio

Bok svima. Dijelimo prijevod završnog dijela članka, pripremljen posebno za studente kolegija. Inženjer podataka. Možete pročitati prvi dio ovdje.

Apache Beam i DataFlow za cjevovode u stvarnom vremenu

Stvaramo cjevovod za obradu tokovnih podataka. 2. dio

Postavljanje Google Clouda

Napomena: koristio sam Google Cloud Shell za pokretanje cjevovoda i objavljivanje prilagođenih podataka dnevnika jer sam imao problema s pokretanjem cjevovoda u Pythonu 3. Google Cloud Shell koristi Python 2, koji je dosljedniji s Apache Beamom.

Da bismo pokrenuli cjevovod, moramo malo kopati po postavkama. Za one od vas koji prije nisu koristili GCP, morat ćete slijediti sljedećih 6 koraka opisanih u ovom stranica.

Nakon toga, morat ćemo učitati naše skripte u Google Cloud Storage i kopirati ih u naš Google Cloud Shel. Prijenos u pohranu u oblaku prilično je trivijalan (možete pronaći opis здесь). Da bismo kopirali svoje datoteke, možemo otvoriti Google Cloud Shel s alatne trake klikom na prvu ikonu s lijeve strane na slici 2 u nastavku.

Stvaramo cjevovod za obradu tokovnih podataka. 2. dio
Slika 2

Naredbe koje su nam potrebne za kopiranje datoteka i instaliranje potrebnih biblioteka navedene su u nastavku.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Izrada naše baze podataka i tablice

Nakon što smo dovršili sve korake povezane s postavljanjem, sljedeća stvar koju trebamo učiniti je stvoriti skup podataka i tablicu u BigQueryju. Postoji nekoliko načina da to učinite, ali najjednostavniji je koristiti Google Cloud konzolu tako da prvo napravite skup podataka. Možete slijediti korake u nastavku linkza izradu tablice sa shemom. Naš će stol imati 7 stupaca, koji odgovara komponentama svakog korisničkog dnevnika. Radi praktičnosti, definirat ćemo sve stupce kao nizove, osim varijable timelocal, i imenovati ih prema varijablama koje smo ranije generirali. Izgled naše tablice trebao bi izgledati kao na slici 3.

Stvaramo cjevovod za obradu tokovnih podataka. 2. dio
Slika 3. Izgled tablice

Objavljivanje podataka korisničkog dnevnika

Pub/Sub je kritična komponenta našeg cjevovoda jer omogućuje međusobnu komunikaciju više neovisnih aplikacija. Točnije, radi kao posrednik koji nam omogućuje slanje i primanje poruka između aplikacija. Prvo što trebamo učiniti je stvoriti temu. Jednostavno idite na Pub/Sub u konzoli i kliknite STVORI TEMU.

Kôd u nastavku poziva našu skriptu za generiranje gore definiranih podataka zapisnika, a zatim se povezuje i šalje zapisnike u Pub/Sub. Jedina stvar koju trebamo učiniti je stvoriti objekt PublisherClient, odredite put do teme pomoću metode topic_path i pozvati funkciju publish с topic_path i podataka. Napominjemo da uvozimo generate_log_line iz našeg scenarija stream_logs, stoga provjerite jesu li ove datoteke u istoj mapi, inače ćete dobiti pogrešku pri uvozu. Zatim to možemo pokrenuti putem naše google konzole koristeći:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Čim se datoteka pokrene, moći ćemo vidjeti izlaz podataka dnevnika na konzolu, kao što je prikazano na slici ispod. Ova skripta će raditi sve dok je ne koristimo CTRL + Cda ga dovršim.

Stvaramo cjevovod za obradu tokovnih podataka. 2. dio
Slika 4. Izlaz publish_logs.py

Pisanje našeg koda cjevovoda

Sada kada smo sve pripremili, možemo započeti zabavni dio - kodiranje našeg cjevovoda pomoću Beama i Pythona. Da bismo stvorili Beam cjevovod, moramo stvoriti objekt cjevovoda (p). Nakon što smo stvorili objekt cjevovoda, možemo primijeniti više funkcija jednu za drugom pomoću operatora pipe (|). Općenito, tijek rada izgleda kao na slici ispod.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

U našem kodu stvorit ćemo dvije prilagođene funkcije. Funkcija regex_clean, koji skenira podatke i dohvaća odgovarajući red na temelju popisa PATTERNS pomoću funkcije re.search. Funkcija vraća niz odvojen zarezom. Ako niste stručnjak za regularne izraze, preporučujem da pogledate ovo tutorial i vježbajte u bilježnici za provjeru koda. Nakon toga definiramo prilagođenu ParDo funkciju koja se zove Split, što je varijacija Beam transformacije za paralelnu obradu. U Pythonu se to radi na poseban način - moramo stvoriti klasu koja nasljeđuje klasu DoFn Beam. Funkcija Split preuzima raščlanjeni redak iz prethodne funkcije i vraća popis rječnika s ključevima koji odgovaraju nazivima stupaca u našoj BigQuery tablici. Ima nešto za napomenuti o ovoj funkciji: morao sam uvoziti datetime unutar funkcije kako bi ona radila. Dobivao sam pogrešku pri uvozu na početku datoteke, što je bilo čudno. Ovaj popis se zatim prosljeđuje funkciji WriteToBigQuery, koji jednostavno dodaje naše podatke u tablicu. Kôd za Batch DataFlow Job i Streaming DataFlow Job naveden je u nastavku. Jedina razlika između batch i streaming koda je da u batchu čitamo CSV src_pathkoristeći funkciju ReadFromText od Beama.

Batch DataFlow Job (skupna obrada)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (stream obrada)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Pokretanje pokretne trake

Cjevovod možemo pokrenuti na nekoliko različitih načina. Da smo htjeli, mogli bismo ga jednostavno pokrenuti lokalno s terminala dok se daljinski prijavljujemo na GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Međutim, mi ćemo ga pokrenuti koristeći DataFlow. To možemo učiniti pomoću donje naredbe postavljanjem sljedećih potrebnih parametara.

  • project — ID vašeg GCP projekta.
  • runner je cjevovod koji će analizirati vaš program i konstruirati vaš cjevovod. Za rad u oblaku morate navesti DataflowRunner.
  • staging_location — put do pohrane u oblaku Cloud Dataflow za indeksiranje paketa kodova potrebnih procesorima koji obavljaju posao.
  • temp_location — put do Cloud Dataflow pohrane u oblaku za pohranu privremenih datoteka poslova stvorenih dok cjevovod radi.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Dok se ova naredba izvodi, možemo otići na karticu DataFlow na google konzoli i vidjeti naš cjevovod. Kada kliknemo na cjevovod, trebali bismo vidjeti nešto slično kao na slici 4. Za potrebe otklanjanja pogrešaka, može biti od velike pomoći otići na Dnevnike, a zatim na Stackdriver za pregled detaljnih zapisnika. To mi je pomoglo u rješavanju problema s cjevovodom u brojnim slučajevima.

Stvaramo cjevovod za obradu tokovnih podataka. 2. dio
Slika 4: Transportna greda

Pristupite našim podacima u BigQueryju

Dakle, već bismo trebali imati pokrenut cjevovod s podacima koji teku u našu tablicu. Da bismo ovo testirali, možemo otići na BigQuery i pogledati podatke. Nakon korištenja donje naredbe trebali biste vidjeti prvih nekoliko redaka skupa podataka. Sada kada imamo podatke pohranjene u BigQueryju, možemo provesti daljnju analizu, kao i podijeliti podatke s kolegama i početi odgovarati na poslovna pitanja.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Stvaramo cjevovod za obradu tokovnih podataka. 2. dio
Slika 5: BigQuery

Zaključak

Nadamo se da će ovaj post poslužiti kao koristan primjer stvaranja cjevovoda za strujanje podataka, kao i pronalaženja načina da podaci budu dostupniji. Pohranjivanje podataka u ovom formatu daje nam mnoge prednosti. Sada možemo početi odgovarati na važna pitanja kao što je koliko ljudi koristi naš proizvod? Raste li vaša baza korisnika s vremenom? S kojim aspektima proizvoda ljudi najviše komuniciraju? A ima li grešaka tamo gdje ih ne bi trebalo biti? To su pitanja koja će biti od interesa za organizaciju. Na temelju spoznaja koje proizlaze iz odgovora na ova pitanja, možemo poboljšati proizvod i povećati angažman korisnika.

Beam je stvarno koristan za ovu vrstu vježbi, a ima i niz drugih zanimljivih slučajeva upotrebe. Na primjer, možda želite analizirati podatke o dionicama u stvarnom vremenu i trgovati na temelju analize, možda imate podatke senzora koji dolaze iz vozila i želite izračunati izračune razine prometa. Možete također, na primjer, biti tvrtka za igre na sreću koja prikuplja korisničke podatke i koristi ih za izradu nadzornih ploča za praćenje ključnih metrika. U redu, gospodo, ovo je tema za drugi post, hvala na čitanju, a za one koji žele vidjeti cijeli kod, ispod je poveznica na moj GitHub.

https://github.com/DFoly/User_log_pipeline

To je sve. Pročitajte prvi dio.

Izvor: www.habr.com

Dodajte komentar