Creăm o conductă de procesare a datelor în flux. Partea 2

Salutare tuturor. Împărtășim traducerea părții finale a articolului, pregătită special pentru studenții cursului. Inginer de date. Puteți citi prima parte aici.

Apache Beam și DataFlow pentru conducte în timp real

Creăm o conductă de procesare a datelor în flux. Partea 2

Configurarea Google Cloud

Notă: am folosit Google Cloud Shell pentru a rula conducta și a publica date de jurnal personalizate, deoarece aveam probleme la rularea conductei în Python 3. Google Cloud Shell folosește Python 2, care este mai consistent cu Apache Beam.

Pentru a începe conducta, trebuie să pătrundem puțin în setări. Pentru cei dintre voi care nu au folosit GCP înainte, va trebui să urmați următorii 6 pași descriși în acest pagină.

După aceasta, va trebui să încărcăm scripturile noastre în Google Cloud Storage și să le copiam în Google Cloud Shel. Încărcarea în spațiul de stocare în cloud este destul de banală (o descriere poate fi găsită aici). Pentru a copia fișierele noastre, putem deschide Google Cloud Shel din bara de instrumente făcând clic pe prima pictogramă din stânga din Figura 2 de mai jos.

Creăm o conductă de procesare a datelor în flux. Partea 2
Figura 2

Comenzile de care avem nevoie pentru a copia fișierele și a instala bibliotecile necesare sunt enumerate mai jos.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Crearea bazei de date și a tabelului nostru

După ce am finalizat toți pașii legați de configurare, următorul lucru pe care trebuie să-l facem este să creăm un set de date și un tabel în BigQuery. Există mai multe moduri de a face acest lucru, dar cel mai simplu este să utilizați consola Google Cloud creând mai întâi un set de date. Puteți urma pașii de mai jos legăturăpentru a crea un tabel cu o schemă. Masa noastră va avea 7 coloane, corespunzătoare componentelor fiecărui jurnal de utilizator. Pentru comoditate, vom defini toate coloanele ca șiruri de caractere, cu excepția variabilei timelocal, și le vom numi în funcție de variabilele pe care le-am generat mai devreme. Dispunerea tabelului nostru ar trebui să arate ca în Figura 3.

Creăm o conductă de procesare a datelor în flux. Partea 2
Figura 3. Dispunerea tabelului

Publicarea datelor din jurnalul utilizatorului

Pub/Sub este o componentă critică a conductei noastre, deoarece permite mai multor aplicații independente să comunice între ele. În special, funcționează ca un intermediar care ne permite să trimitem și să primim mesaje între aplicații. Primul lucru pe care trebuie să-l facem este să creăm un subiect. Pur și simplu accesați Pub/Sub în consolă și faceți clic pe CREATE TOPIC.

Codul de mai jos apelează scriptul nostru pentru a genera datele de jurnal definite mai sus și apoi se conectează și trimite jurnalele către Pub/Sub. Singurul lucru pe care trebuie să-l facem este să creăm un obiect PublisherClient, specificați calea către subiect folosind metoda topic_path și apelați funcția publish с topic_path și date. Vă rugăm să rețineți că importăm generate_log_line din scenariul nostru stream_logs, deci asigurați-vă că aceste fișiere sunt în același folder, altfel veți primi o eroare de import. Apoi putem rula acest lucru prin consola noastră Google folosind:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

De îndată ce fișierul rulează, vom putea vedea rezultatul datelor de jurnal către consolă, așa cum se arată în figura de mai jos. Acest script va funcționa atâta timp cât nu îl folosim CTRL + Cpentru a o completa.

Creăm o conductă de procesare a datelor în flux. Partea 2
Figura 4. Ieșire publish_logs.py

Scrierea codului conductei noastre

Acum că avem totul pregătit, putem începe partea distractivă - codificarea conductei noastre folosind Beam și Python. Pentru a crea o conductă Beam, trebuie să creăm un obiect conductă (p). Odată ce am creat un obiect pipeline, putem aplica mai multe funcții una după alta folosind operatorul pipe (|). În general, fluxul de lucru arată ca imaginea de mai jos.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

În codul nostru, vom crea două funcții personalizate. Funcţie regex_clean, care scanează datele și preia rândul corespunzător pe baza listei PATTERNE folosind funcția re.search. Funcția returnează un șir separat prin virgulă. Dacă nu sunteți un expert în exprimarea obișnuită, vă recomand să verificați acest lucru tutorial și exersați într-un blocnotes pentru a verifica codul. După aceasta definim o funcție personalizată numită ParDo Despică, care este o variație a transformării Beam pentru procesare paralelă. În Python, acest lucru se face într-un mod special - trebuie să creăm o clasă care moștenește din clasa DoFn Beam. Funcția Split preia rândul analizat din funcția anterioară și returnează o listă de dicționare cu chei corespunzătoare numelor de coloane din tabelul nostru BigQuery. Este ceva de remarcat despre această funcție: a trebuit să import datetime în interiorul unei funcții pentru a o face să funcționeze. Primeam o eroare de import la începutul fișierului, ceea ce era ciudat. Această listă este apoi transmisă funcției WriteToBigQuery, care adaugă pur și simplu datele noastre la tabel. Codul pentru Job DataFlow Lot și Job Streaming DataFlow este prezentat mai jos. Singura diferență dintre codul lot și codul de streaming este că în lot citim CSV-ul din src_pathfolosind funcția ReadFromText de la Beam.

Batch DataFlow Job (procesare în lot)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (procesare flux)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Pornirea transportorului

Putem rula conducta în mai multe moduri diferite. Dacă dorim, am putea să-l rulăm local de pe un terminal în timp ce ne conectăm la GCP de la distanță.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Cu toate acestea, îl vom rula folosind DataFlow. Putem face acest lucru folosind comanda de mai jos, setând următorii parametri necesari.

  • project — ID-ul proiectului dvs. GCP.
  • runner este un conducător de conducte care vă va analiza programul și vă va construi conducta. Pentru a rula în cloud, trebuie să specificați un DataflowRunner.
  • staging_location — calea către stocarea cloud Cloud Dataflow pentru pachetele de coduri de indexare necesare procesoarelor care efectuează munca.
  • temp_location — calea către stocarea în cloud Cloud Dataflow pentru stocarea fișierelor de job temporare create în timpul rulării conductei.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

În timp ce această comandă rulează, putem accesa fila DataFlow din consola Google și putem vizualiza conducta noastră. Când facem clic pe conductă, ar trebui să vedem ceva similar cu Figura 4. Pentru scopuri de depanare, poate fi foarte util să mergeți la Logs și apoi la Stackdriver pentru a vizualiza jurnalele detaliate. Acest lucru m-a ajutat să rezolv problemele legate de conducte într-un număr de cazuri.

Creăm o conductă de procesare a datelor în flux. Partea 2
Figura 4: Transportor cu fascicul

Accesați datele noastre în BigQuery

Deci, ar trebui să avem deja o conductă care rulează cu date care curg în tabelul nostru. Pentru a testa acest lucru, putem accesa BigQuery și privim datele. După ce ați folosit comanda de mai jos, ar trebui să vedeți primele câteva rânduri ale setului de date. Acum că avem datele stocate în BigQuery, putem efectua analize suplimentare, precum și să împărtășim datele cu colegii și să începem să răspundem la întrebările de afaceri.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Creăm o conductă de procesare a datelor în flux. Partea 2
Figura 5: BigQuery

Concluzie

Sperăm că această postare să servească drept exemplu util de creare a unei conducte de date în flux, precum și de găsire a modalităților de a face datele mai accesibile. Stocarea datelor în acest format ne oferă multe avantaje. Acum putem începe să răspundem la întrebări importante precum câți oameni folosesc produsul nostru? Baza ta de utilizatori crește în timp? Cu ce ​​aspecte ale produsului interacționează cel mai mult oamenii? Și există erori acolo unde nu ar trebui să existe? Acestea sunt întrebările care vor fi de interes pentru organizație. Pe baza informațiilor care reiese din răspunsurile la aceste întrebări, putem îmbunătăți produsul și crește implicarea utilizatorilor.

Beam este cu adevărat util pentru acest tip de exercițiu și are și o serie de alte cazuri de utilizare interesante. De exemplu, s-ar putea să doriți să analizați datele de stocare în timp real și să faceți tranzacții pe baza analizei, poate aveți date senzoriale care provin de la vehicule și doriți să calculați calculele nivelului de trafic. De asemenea, puteți fi, de exemplu, o companie de jocuri care colectează date despre utilizatori și le folosește pentru a crea tablouri de bord pentru a urmări valorile cheie. Bine, domnilor, acesta este un subiect pentru o altă postare, mulțumesc pentru citire, iar pentru cei care doresc să vadă codul complet, mai jos este linkul către GitHub-ul meu.

https://github.com/DFoly/User_log_pipeline

Asta e tot. Citiți prima parte.

Sursa: www.habr.com

Adauga un comentariu