Korronteen datuak prozesatzeko kanalizazioa sortzen dugu. 2. zatia

Kaixo guztioi. Artikuluaren azken zatiaren itzulpena partekatzen ari gara, ikastaroko ikasleentzat espresuki prestatua. Datuen ingeniaria. Lehen zatia irakur dezakezu Hemen.

Apache Beam eta DataFlow denbora errealeko kanalizazioetarako

Korronteen datuak prozesatzeko kanalizazioa sortzen dugu. 2. zatia

Google Cloud konfiguratzen

Oharra: Google Cloud Shell erabili dut kanalizazioa exekutatzeko eta erregistro pertsonalizatuko datuak argitaratzeko, arazoak izan nituelako kanalizazioa Python 3-n exekutatzeko. Google Cloud Shell-ek Python 2 erabiltzen du, Apache Beam-ekin koherenteagoa dena.

Hodibidea hasteko, ezarpenetan pixka bat sakondu behar dugu. Aurretik GCP erabili ez duzuenontzat, honako 6 urrats hauek jarraitu beharko dituzue. Orrialdearen.

Horren ondoren, gure scriptak Google Cloud Storage-ra igo beharko ditugu eta gure Google Cloud Shel-era kopiatu beharko ditugu. Hodeiko biltegiratzera igotzea nahiko hutsala da (deskribapen bat aurki daiteke Hemen). Gure fitxategiak kopiatzeko, Google Cloud Shel tresna-barratik ireki dezakegu, beheko 2. irudiko ezkerreko lehenengo ikonoan klik eginez.

Korronteen datuak prozesatzeko kanalizazioa sortzen dugu. 2. zatia
2 irudia

Fitxategiak kopiatzeko eta beharrezko liburutegiak instalatzeko behar ditugun komandoak behean ageri dira.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Gure datu-basea eta taula sortzea

Behin konfigurazioarekin lotutako urrats guztiak amaituta, egin behar dugun hurrengo gauza BigQuery-n datu-multzo eta taula bat sortzea da. Horretarako hainbat modu daude, baina errazena Google Cloud kontsola erabiltzea da lehenik datu multzo bat sortuz. Beheko urratsak jarraitu ditzakezu linkeskema batekin taula bat sortzeko. Gure mahaia izango da 7 zutabe, erabiltzaileen erregistro bakoitzaren osagaiei dagozkienak. Erosotasunerako, zutabe guztiak kate gisa definituko ditugu, timelocal aldagaia izan ezik, eta lehenago sortu ditugun aldagaien arabera izendatuko ditugu. Gure taularen diseinuak 3. irudian bezala izan beharko luke.

Korronteen datuak prozesatzeko kanalizazioa sortzen dugu. 2. zatia
3. Irudia Taularen diseinua

Erabiltzaileen erregistro-datuak argitaratzea

Pub/Sub gure kanalizazioaren osagai kritikoa da, hainbat aplikazio independente elkarren artean komunikatzeko aukera ematen duelako. Bereziki, bitartekari gisa funtzionatzen du, aplikazioen artean mezuak bidali eta jasotzeko aukera ematen diguna. Egin behar dugun lehenengo gauza gai bat sortzea da. Besterik gabe, joan kontsolako Pub/Sub-era eta egin klik SORTU GAIA.

Beheko kodeak gure script-a deitzen du goian definitutako erregistro-datuak sortzeko eta gero erregistroak konektatu eta bidaltzen ditu Pub/Sub-era. Egin behar dugun gauza bakarra objektu bat sortzea da PublisherClient, zehaztu gaiaren bidea metodoa erabiliz topic_path eta deitu funtzioari publish с topic_path eta datuak. Kontuan izan inportatzen dugula generate_log_line gure gidoitik stream_logs, beraz, ziurtatu fitxategi hauek karpeta berean daudela, bestela inportazio-errore bat jasoko duzu. Ondoren, gure google kontsolaren bidez exekutatu dezakegu:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Fitxategia exekutatu bezain laster, erregistroko datuen irteera kontsolara ikusi ahal izango dugu, beheko irudian ikusten den moduan. Script honek funtzionatuko du erabiltzen ez dugun bitartean CTRL + Cosatzeko.

Korronteen datuak prozesatzeko kanalizazioa sortzen dugu. 2. zatia
4. irudia. Irteera publish_logs.py

Gure kanalizazioaren kodea idazten

Dena prestatuta daukagunez, zati dibertigarriari ekin diezaiokegu: Beam eta Python erabiliz gure kanalizazioa kodetzen. Beam kanalizazioa sortzeko, kanalizazio objektu bat sortu behar dugu (p). Pipeline objektu bat sortu ondoren, hainbat funtzio aplika ditzakegu bata bestearen atzetik operadorea erabiliz pipe (|). Oro har, lan-fluxuak beheko irudiaren itxura du.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Gure kodean, bi funtzio pertsonalizatu sortuko ditugu. Funtzioa regex_clean, datuak eskaneatzen dituena eta dagokion errenkada berreskuratzen duena, PATTERNS zerrendan oinarrituta funtzioa erabiliz re.search. Funtzioak komaz bereizitako kate bat itzultzen du. Adierazpen erregular aditua ez bazara, hau egiaztatzea gomendatzen dizut tutoretza eta praktikatu koaderno batean kodea egiaztatzeko. Honen ondoren, deitzen den ParDo funtzio pertsonalizatu bat definitzen dugu Split, prozesaketa paralelorako Beam transformazioaren aldakuntza bat da. Python-en, hau modu berezi batean egiten da - DoFn Beam klasetik heredatzen den klase bat sortu behar dugu. Zatitu funtzioak aurreko funtzioko analizatutako errenkada hartzen du eta gure BigQuery taulako zutabe-izenei dagozkien gakoen hiztegien zerrenda itzultzen du. Funtzio honi buruz badago zer esanik: inportatu behar izan nuen datetime funtzio baten barruan funtziona dezan. Fitxategiaren hasieran inportazio-errore bat jasotzen ari nintzen, arraroa. Zerrenda hau funtziora pasatzen da WriteToBigQuery, gure datuak taulara gehitzen dituena. Batch DataFlow Job eta Streaming DataFlow Lanaren kodea behean ematen da. Batch eta streaming kodearen arteko desberdintasun bakarra batch-ean CSV irakurtzen dugula da src_pathfuntzioa erabiliz ReadFromText Beam-etik.

Batch DataFlow lana (batch prozesatzea)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow lana (korronteen prozesatzea)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Zinta garraiatzailea abiaraztea

Hainbat modutan exekutatu dezakegu kanalizazioa. Nahi izanez gero, lokalean exekutatu genezake terminal batetik GCPn urrunetik saioa hasten dugun bitartean.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Hala ere, DataFlow erabiliz exekutatuko dugu. Hau egin dezakegu beheko komandoa erabiliz eskatzen diren parametro hauek ezarriz.

  • project β€” Zure GCP proiektuaren IDa.
  • runner zure programa aztertu eta zure kanalizazioa eraikiko duen kanalizazioa da. Hodeian exekutatzeko, DataflowRunner bat zehaztu behar duzu.
  • staging_location β€” lana egiten duten prozesadoreek behar dituzten kode paketeak indexatzeko Cloud Dataflow hodeiko biltegirako bidea.
  • temp_location β€” Hodeiko Dataflow hodeiko biltegirako bidea kanalizazioa martxan dagoen bitartean sortutako aldi baterako lan-fitxategiak gordetzeko.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Komando hau exekutatzen ari den bitartean, google kontsolako DataFlow fitxara joan eta gure kanalizazioa ikus dezakegu. Hodietan klik egiten dugunean, 4. irudiaren antzeko zerbait ikusi beharko genuke. Arazketa-helburuetarako, oso lagungarria izan daiteke Logs-era joatea eta gero Stackdriver-era erregistro zehatzak ikusteko. Honek kanalizazio-arazoak konpontzen lagundu dit hainbat kasutan.

Korronteen datuak prozesatzeko kanalizazioa sortzen dugu. 2. zatia
4. Irudia: Habe-garraiatzailea

Sartu gure datuak BigQuery-n

Beraz, dagoeneko kanalizazio bat izan beharko genuke gure taulara sartzen diren datuekin. Hau probatzeko, BigQuery-ra joan eta datuak begiratu ditzakegu. Beheko komandoa erabili ondoren datu-multzoaren lehen errenkadak ikusi beharko dituzu. Datuak BigQuery-n gordeta dauzkagunean, azterketa gehiago egin ditzakegu, baita datuak lankideekin partekatu eta negozioaren galderei erantzuten hasi.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Korronteen datuak prozesatzeko kanalizazioa sortzen dugu. 2. zatia
5. irudia: BigQuery

Ondorioa

Espero dugu argitalpen hau zuzeneko datuen kanalizazioa sortzeko adibide erabilgarria izatea, baita datuak eskuragarriago bihurtzeko moduak aurkitzeko ere. Datuak formatu honetan gordetzeak abantaila asko ematen dizkigu. Orain has gaitezke galdera garrantzitsuei erantzuten, esate baterako, zenbat pertsonek erabiltzen dute gure produktua? Zure erabiltzaile-basea hazten al da denborarekin? Produktuaren zein alderdirekin elkarreragiten du jendeak gehien? Eta egon behar ez den lekuan akatsak daude? Hauek dira erakundearentzat interesgarriak izango diren galderak. Galdera hauen erantzunetatik ateratzen diren ikuspegietan oinarrituta, produktua hobetu eta erabiltzaileen konpromisoa areagotu dezakegu.

Beam oso erabilgarria da ariketa mota honetarako eta beste erabilera kasu interesgarri batzuk ere baditu. Esate baterako, baliteke stock tick datuak denbora errealean aztertu eta analisian oinarritutako merkataritzak egin nahi izatea, agian ibilgailuetatik datozen sentsore datuak dituzu eta trafiko-mailaren kalkuluak kalkulatu nahi dituzu. Gainera, adibidez, erabiltzaileen datuak biltzen dituen joko-enpresa bat izan zaitezke eta aginte-panelak sortzeko erabiltzen dituen neurketa gakoen jarraipena egiteko. Ados, jaunak, hau beste mezu baterako gaia da, eskerrik asko irakurtzeagatik, eta kode osoa ikusi nahi dutenentzat, behean dago nire GitHub-erako esteka.

https://github.com/DFoly/User_log_pipeline

Hori da dena. Irakurri lehen zatia.

Iturria: www.habr.com

Gehitu iruzkin berria