Kreiramo cevovod za obradu striming podataka. Dio 2

Zdravo svima. Delimo prevod završnog dela članka, pripremljen posebno za studente kursa. Data Engineer. Možete pročitati prvi dio ovdje.

Apache Beam i DataFlow za cjevovode u realnom vremenu

Kreiramo cevovod za obradu striming podataka. Dio 2

Postavljanje Google Clouda

Napomena: Koristio sam Google Cloud Shell da pokrenem cjevovod i objavim prilagođene podatke dnevnika jer sam imao problema s pokretanjem cjevovoda u Pythonu 3. Google Cloud Shell koristi Python 2, koji je više konzistentan sa Apache Beamom.

Da bismo pokrenuli cevovod, trebamo malo kopati u postavkama. Za one od vas koji ranije niste koristili GCP, morat ćete slijediti sljedećih 6 koraka navedenih u ovome stranica.

Nakon ovoga, morat ćemo otpremiti naše skripte u Google Cloud Storage i kopirati ih u naš Google Cloud Shel. Otpremanje u pohranu u oblaku je prilično trivijalno (možete pronaći opis ovdje). Da kopiramo naše datoteke, možemo otvoriti Google Cloud Shel sa trake sa alatkama klikom na prvu ikonu s lijeve strane na slici 2 ispod.

Kreiramo cevovod za obradu striming podataka. Dio 2
2 Figure

Komande koje su nam potrebne za kopiranje datoteka i instaliranje potrebnih biblioteka su navedene u nastavku.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Kreiranje naše baze podataka i tabele

Nakon što završimo sve korake vezane za podešavanje, sljedeća stvar koju trebamo učiniti je kreirati skup podataka i tablicu u BigQueryju. Postoji nekoliko načina da to učinite, ali najjednostavniji je korištenje Google Cloud konzole tako što ćete prvo kreirati skup podataka. Možete slijediti dolje navedene korake linkda kreirate tabelu sa šemom. Naš sto će imati 7 kolona, koji odgovara komponentama svakog korisničkog dnevnika. Radi praktičnosti, sve kolone ćemo definirati kao nizove, osim vremenske lokalne varijable, i imenovati ih prema varijablama koje smo ranije generirali. Izgled naše tabele trebao bi izgledati kao na slici 3.

Kreiramo cevovod za obradu striming podataka. Dio 2
Slika 3. Izgled tabele

Objavljivanje podataka dnevnika korisnika

Pub/Sub je kritična komponenta našeg cevovoda jer omogućava više nezavisnih aplikacija da međusobno komuniciraju. Konkretno, radi kao posrednik koji nam omogućava slanje i primanje poruka između aplikacija. Prvo što treba da uradimo je da kreiramo temu. Jednostavno idite na Pub/Sub na konzoli i kliknite KREIRAJ TEMU.

Kod u nastavku poziva našu skriptu za generiranje podataka dnevnika definiranih gore, a zatim se povezuje i šalje dnevnike u Pub/Sub. Jedino što treba da uradimo je da kreiramo objekat PublisherClient, navedite putanju do teme koristeći metodu topic_path i pozovite funkciju publish с topic_path i podatke. Napominjemo da uvozimo generate_log_line iz našeg scenarija stream_logs, pa provjerite jesu li ove datoteke u istom folderu, inače ćete dobiti grešku pri uvozu. Zatim ovo možemo pokrenuti kroz našu google konzolu koristeći:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Čim se datoteka pokrene, moći ćemo vidjeti izlaz podataka dnevnika na konzolu, kao što je prikazano na donjoj slici. Ova skripta će raditi sve dok je ne koristimo CTRL + Cda ga dovršim.

Kreiramo cevovod za obradu striming podataka. Dio 2
Slika 4. Izlaz publish_logs.py

Pisanje našeg koda za cjevovod

Sada kada smo sve pripremili, možemo početi sa zabavnim dijelom - kodiranjem našeg cjevovoda koristeći Beam i Python. Da bismo kreirali cevovod Beam, moramo kreirati objekat cjevovoda (p). Nakon što smo kreirali objekt cjevovoda, možemo primijeniti više funkcija jednu za drugom koristeći operator pipe (|). Općenito, tok posla izgleda kao na slici ispod.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

U našem kodu ćemo kreirati dvije prilagođene funkcije. Funkcija regex_clean, koji skenira podatke i dohvaća odgovarajući red na osnovu liste PATTERNS koristeći funkciju re.search. Funkcija vraća string odvojen zarezom. Ako niste stručnjak za regularne izraze, preporučujem da pogledate ovo tutorial i vježbajte u notesu da provjerite kod. Nakon toga definiramo prilagođenu ParDo funkciju koja se zove Split, što je varijacija Beam transformacije za paralelnu obradu. U Pythonu se to radi na poseban način - moramo kreirati klasu koja nasljeđuje klasu DoFn Beam. Funkcija Split uzima raščlanjeni red iz prethodne funkcije i vraća listu rječnika s ključevima koji odgovaraju nazivima stupaca u našoj BigQuery tablici. Treba napomenuti nešto o ovoj funkciji: morao sam uvesti datetime unutar funkcije kako bi ona funkcionirala. Dobivao sam grešku pri uvozu na početku datoteke, što je bilo čudno. Ova lista se zatim prosljeđuje funkciji WriteToBigQuery, koji jednostavno dodaje naše podatke u tabelu. Kôd za Batch DataFlow posao i Streaming DataFlow posao je dat ispod. Jedina razlika između paketnog koda i koda za striming je u tome što u paketu čitamo CSV iz src_pathkoristeći funkciju ReadFromText from Beam.

Batch DataFlow posao (skupna obrada)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow posao (obrada toka)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Pokretanje cjevovoda

Cjevovod možemo voditi na nekoliko različitih načina. Ako želimo, mogli bismo ga samo pokrenuti lokalno s terminala dok se daljinski prijavljivali na GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Međutim, mi ćemo ga pokrenuti koristeći DataFlow. To možemo učiniti pomoću donje naredbe postavljanjem sljedećih potrebnih parametara.

  • project — ID vašeg GCP projekta.
  • runner je cevovod koji će analizirati vaš program i konstruisati vaš cevovod. Da biste radili u oblaku, morate navesti DataflowRunner.
  • staging_location — putanja do Cloud Dataflow skladišta u oblaku za indeksiranje paketa kodova potrebnih procesorima koji obavljaju posao.
  • temp_location — putanja do Cloud Dataflow skladišta u oblaku za pohranjivanje privremenih datoteka poslova kreiranih dok je cjevovod pokrenut.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Dok je ova naredba pokrenuta, možemo otići na karticu DataFlow u google konzoli i vidjeti naš cjevovod. Kada kliknemo na cevovod, trebalo bi da vidimo nešto slično kao na slici 4. Za potrebe otklanjanja grešaka, može biti od velike pomoći da odete na Logs, a zatim na Stackdriver da vidite detaljne evidencije. Ovo mi je pomoglo da riješim probleme s plinovodom u brojnim slučajevima.

Kreiramo cevovod za obradu striming podataka. Dio 2
Slika 4: Transporter sa gredama

Pristupite našim podacima u BigQueryju

Dakle, već bismo trebali imati cjevovod koji radi s podacima koji teku u našu tabelu. Da bismo ovo testirali, možemo otići na BigQuery i pogledati podatke. Nakon upotrebe naredbe ispod, trebali biste vidjeti prvih nekoliko redova skupa podataka. Sada kada imamo podatke pohranjene u BigQueryju, možemo vršiti dalju analizu, kao i podijeliti podatke sa kolegama i početi odgovarati na poslovna pitanja.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Kreiramo cevovod za obradu striming podataka. Dio 2
Slika 5: BigQuery

zaključak

Nadamo se da će ovaj post poslužiti kao koristan primjer za kreiranje streaming podataka, kao i za pronalaženje načina da podaci budu dostupniji. Pohranjivanje podataka u ovom formatu daje nam mnoge prednosti. Sada možemo početi odgovarati na važna pitanja poput koliko ljudi koristi naš proizvod? Raste li vaša korisnička baza s vremenom? S kojim aspektima proizvoda ljudi najviše komuniciraju? I ima li grešaka tamo gdje ih ne bi trebalo biti? Ovo su pitanja koja će zanimati organizaciju. Na osnovu uvida koji proizlaze iz odgovora na ova pitanja, možemo poboljšati proizvod i povećati angažman korisnika.

Beam je zaista koristan za ovu vrstu vježbe, a ima i niz drugih zanimljivih slučajeva korištenja. Na primjer, možda ćete htjeti analizirati podatke o zalihama u realnom vremenu i trgovati na osnovu analize, možda imate podatke senzora koji dolaze iz vozila i želite izračunati kalkulacije nivoa prometa. Možete, na primjer, biti i kompanija za igre koja prikuplja korisničke podatke i koristi ih za kreiranje nadzornih ploča za praćenje ključnih metrika. Dobro, gospodo, ovo je tema za još jedan post, hvala na čitanju, a za one koji žele da vide ceo kod, ispod je link na moj GitHub.

https://github.com/DFoly/User_log_pipeline

To je sve. Pročitaj prvi dio.

izvor: www.habr.com

Dodajte komentar