Ustvarimo cevovod za obdelavo tokovnih podatkov. 2. del

Pozdravljeni vsi skupaj. Delimo prevod zaključnega dela članka, pripravljenega posebej za študente tečaja. Podatkovni inženir. Prvi del lahko preberete tukaj.

Apache Beam in DataFlow za cevovode v realnem času

Ustvarimo cevovod za obdelavo tokovnih podatkov. 2. del

Nastavitev Google Cloud

Opomba: uporabil sem Google Cloud Shell za zagon cevovoda in objavo dnevniških podatkov po meri, ker sem imel težave z zagonom cevovoda v Pythonu 3. Google Cloud Shell uporablja Python 2, ki je bolj skladen z Apache Beam.

Za zagon cevovoda se moramo malo poglobiti v nastavitve. Tisti, ki še niste uporabljali GCP, boste morali slediti naslednjih 6 korakom, opisanih v tem Stran.

Po tem bomo morali naložiti svoje skripte v Google Cloud Storage in jih kopirati v Google Cloud Shel. Nalaganje v shrambo v oblaku je precej trivialno (opis najdete tukaj). Za kopiranje naših datotek lahko Google Cloud Shel odpremo v orodni vrstici s klikom na prvo ikono na levi na sliki 2 spodaj.

Ustvarimo cevovod za obdelavo tokovnih podatkov. 2. del
Slika 2

Ukazi, ki jih potrebujemo za kopiranje datotek in namestitev zahtevanih knjižnic, so navedeni spodaj.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Ustvarjanje naše baze podatkov in tabele

Ko končamo vse korake, povezane z nastavitvijo, je naslednja stvar, ki jo moramo narediti, ustvariti nabor podatkov in tabelo v BigQueryju. To lahko storite na več načinov, najpreprostejši pa je uporaba konzole Google Cloud tako, da najprej ustvarite nabor podatkov. Sledite lahko spodnjim korakom povezavaustvariti tabelo s shemo. Naša miza bo imela 7 stolpcev, ki ustreza komponentam vsakega uporabniškega dnevnika. Zaradi udobja bomo vse stolpce definirali kot nize, razen spremenljivke timelocal, in jih poimenovali glede na spremenljivke, ki smo jih ustvarili prej. Postavitev naše tabele bi morala izgledati kot na sliki 3.

Ustvarimo cevovod za obdelavo tokovnih podatkov. 2. del
Slika 3. Postavitev tabele

Objavljanje podatkov uporabniškega dnevnika

Pub/Sub je kritična komponenta našega cevovoda, saj omogoča medsebojno komunikacijo več neodvisnih aplikacij. Zlasti deluje kot posrednik, ki nam omogoča pošiljanje in prejemanje sporočil med aplikacijami. Prva stvar, ki jo moramo narediti, je ustvariti temo. Preprosto pojdite na Pub/Sub v konzoli in kliknite USTVARI TEMO.

Spodnja koda pokliče naš skript, da ustvari zgoraj definirane podatke dnevnika, nato pa se poveže in pošlje dnevnike v Pub/Sub. Edina stvar, ki jo moramo narediti, je ustvariti predmet PublisherClient, določite pot do teme z metodo topic_path in pokličite funkcijo publish с topic_path in podatkov. Upoštevajte, da uvažamo generate_log_line iz našega scenarija stream_logs, zato se prepričajte, da so te datoteke v isti mapi, sicer boste prejeli napako pri uvozu. To lahko nato izvajamo prek naše Googlove konzole z uporabo:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Takoj ko se datoteka zažene, bomo lahko videli izpis podatkov dnevnika v konzolo, kot je prikazano na spodnji sliki. Ta skript bo deloval, dokler ga ne bomo uporabljali CTRL + Cda ga dokončam.

Ustvarimo cevovod za obdelavo tokovnih podatkov. 2. del
Slika 4. Izhod publish_logs.py

Pisanje kode našega cevovoda

Zdaj, ko imamo vse pripravljeno, lahko začnemo z zabavnim delom – kodiranjem našega cevovoda z uporabo Beama in Pythona. Če želite ustvariti cevovod Beam, moramo ustvariti objekt cevovoda (p). Ko ustvarimo predmet cevovoda, lahko uporabimo več funkcij eno za drugo z uporabo operatorja pipe (|). Na splošno je potek dela videti kot spodnja slika.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

V naši kodi bomo ustvarili dve funkciji po meri. funkcija regex_clean, ki skenira podatke in pridobi ustrezno vrstico na podlagi seznama VZORCI z uporabo funkcije re.search. Funkcija vrne niz, ločen z vejico. Če niste strokovnjak za regularne izraze, priporočam, da preverite to vadnica in vadite v beležnici, da preverite kodo. Po tem definiramo funkcijo ParDo po meri, imenovano Split, ki je različica Beam transformacije za vzporedno obdelavo. V Pythonu je to storjeno na poseben način – ustvariti moramo razred, ki deduje razred DoFn Beam. Funkcija Split vzame razčlenjeno vrstico iz prejšnje funkcije in vrne seznam slovarjev s ključi, ki ustrezajo imenom stolpcev v naši tabeli BigQuery. Pri tej funkciji je treba nekaj opozoriti: moral sem uvoziti datetime znotraj funkcije, da bo delovala. Na začetku datoteke sem dobival napako pri uvozu, kar je bilo čudno. Ta seznam se nato posreduje funkciji WriteToBigQuery, ki preprosto doda naše podatke v tabelo. Koda za Batch DataFlow Job in Streaming DataFlow Job je navedena spodaj. Edina razlika med paketno in pretočno kodo je, da v paketu beremo CSV src_pathuporabo funkcije ReadFromText od Beam.

Batch DataFlow Job (paketna obdelava)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Pretočno opravilo DataFlow (pretočna obdelava)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Zagon tekočega traku

Cevovod lahko izvedemo na več različnih načinov. Če bi želeli, bi ga lahko zagnali lokalno s terminala, medtem ko bi se prijavljali v GCP na daljavo.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Vendar ga bomo zagnali z uporabo DataFlow. To lahko storimo s spodnjim ukazom, tako da nastavimo naslednje zahtevane parametre.

  • project — ID vašega projekta GCP.
  • runner je tekač cevovoda, ki bo analiziral vaš program in sestavil vaš cevovod. Če želite delovati v oblaku, morate določiti DataflowRunner.
  • staging_location — pot do shrambe v oblaku Cloud Dataflow za indeksiranje paketov kode, ki jih potrebujejo procesorji, ki opravljajo delo.
  • temp_location — pot do shrambe v oblaku Cloud Dataflow za shranjevanje začasnih datotek opravil, ustvarjenih med delovanjem cevovoda.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Medtem ko se ta ukaz izvaja, lahko gremo na zavihek DataFlow v Googlovi konzoli in si ogledamo naš cevovod. Ko kliknemo na cevovod, bi morali videti nekaj podobnega sliki 4. Za namene odpravljanja napak je lahko zelo koristno, če greste v Dnevnike in nato v Stackdriver, da si ogledate podrobne dnevnike. To mi je v številnih primerih pomagalo rešiti težave s cevovodom.

Ustvarimo cevovod za obdelavo tokovnih podatkov. 2. del
Slika 4: Nosilni transporter

Dostop do naših podatkov v BigQuery

Torej bi že morali imeti cevovod, ki teče s podatki, ki tečejo v našo tabelo. Če želite to preizkusiti, lahko gremo v BigQuery in pogledamo podatke. Po uporabi spodnjega ukaza bi morali videti prvih nekaj vrstic nabora podatkov. Zdaj, ko imamo podatke shranjene v BigQueryju, lahko izvajamo nadaljnje analize, podatke delimo s sodelavci in začnemo odgovarjati na poslovna vprašanja.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Ustvarimo cevovod za obdelavo tokovnih podatkov. 2. del
Slika 5: BigQuery

Zaključek

Upamo, da bo ta objava koristen primer ustvarjanja cevovoda za pretakanje podatkov in iskanja načinov za večjo dostopnost podatkov. Shranjevanje podatkov v tem formatu nam daje številne prednosti. Zdaj lahko začnemo odgovarjati na pomembna vprašanja, na primer koliko ljudi uporablja naš izdelek? Ali vaša baza uporabnikov sčasoma raste? S katerimi vidiki izdelka ljudje največ komunicirajo? In ali so napake tam, kjer jih ne bi smelo biti? To so vprašanja, ki bodo zanimala organizacijo. Na podlagi spoznanj, ki izhajajo iz odgovorov na ta vprašanja, lahko izboljšamo izdelek in povečamo angažiranost uporabnikov.

Beam je res uporaben za to vrsto vadbe in ima tudi številne druge zanimive primere uporabe. Na primer, morda boste želeli analizirati podatke o delniških tikih v realnem času in sklepati posle na podlagi analize, morda imate podatke senzorjev, ki prihajajo iz vozil, in želite izračunati izračune ravni prometa. Lahko ste na primer tudi igralniško podjetje, ki zbira uporabniške podatke in jih uporablja za ustvarjanje nadzornih plošč za sledenje ključnim meritvam. V redu, gospodje, to je tema za drugo objavo, hvala za branje in za tiste, ki želijo videti celotno kodo, je spodaj povezava do mojega GitHub-a.

https://github.com/DFoly/User_log_pipeline

To je vse. Preberi prvi del.

Vir: www.habr.com

Dodaj komentar