Vytvárame kanál na spracovanie dátových prúdov. Časť 2

Ahojte všetci. Zdieľame preklad záverečnej časti článku pripravenej špeciálne pre študentov kurzu. dátový inžinier. Môžete si prečítať prvú časť tu.

Apache Beam a DataFlow pre potrubia v reálnom čase

Vytvárame kanál na spracovanie dátových prúdov. Časť 2

Nastavenie služby Google Cloud

Poznámka: Použil som Google Cloud Shell na spustenie kanála a zverejnenie údajov vlastného denníka, pretože som mal problémy so spustením kanála v Pythone 3. Google Cloud Shell používa Python 2, ktorý je konzistentnejší s Apache Beam.

Ak chcete spustiť potrubie, musíme sa trochu pohrabať v nastaveniach. Pre tých z vás, ktorí ešte GCP nepoužili, budete musieť postupovať podľa nasledujúcich 6 krokov uvedených v tomto dokumente strana.

Potom budeme musieť nahrať naše skripty do Google Cloud Storage a skopírovať ich do nášho Google Cloud Shel. Nahrávanie do cloudového úložiska je celkom triviálne (popis nájdete tu). Ak chcete skopírovať naše súbory, môžeme otvoriť Google Cloud Shel z panela nástrojov kliknutím na prvú ikonu vľavo na obrázku 2 nižšie.

Vytvárame kanál na spracovanie dátových prúdov. Časť 2
Obrázok 2

Príkazy, ktoré potrebujeme na skopírovanie súborov a inštaláciu požadovaných knižníc, sú uvedené nižšie.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Vytvorenie našej databázy a tabuľky

Keď dokončíme všetky kroky súvisiace s nastavením, ďalšia vec, ktorú musíme urobiť, je vytvoriť množinu údajov a tabuľku v nástroji BigQuery. Existuje niekoľko spôsobov, ako to urobiť, ale najjednoduchšie je použiť konzolu Google Cloud tak, že najskôr vytvoríte množinu údajov. Môžete postupovať podľa nižšie uvedených krokov odkazna vytvorenie tabuľky so schémou. Náš stôl bude mať 7 stĺpcov, ktoré zodpovedajú komponentom každého denníka používateľov. Pre pohodlie zadefinujeme všetky stĺpce ako reťazce, okrem premennej timelocal, a pomenujeme ich podľa premenných, ktoré sme predtým vygenerovali. Rozloženie našej tabuľky by malo vyzerať ako na obrázku 3.

Vytvárame kanál na spracovanie dátových prúdov. Časť 2
Obrázok 3. Rozloženie tabuľky

Publikovanie údajov denníka používateľov

Pub/Sub je kritickým komponentom nášho potrubia, pretože umožňuje viacerým nezávislým aplikáciám navzájom komunikovať. Funguje najmä ako sprostredkovateľ, ktorý nám umožňuje posielať a prijímať správy medzi aplikáciami. Prvá vec, ktorú musíme urobiť, je vytvoriť tému. Jednoducho prejdite na Pub/Sub v konzole a kliknite na VYTVORIŤ TÉMU.

Nižšie uvedený kód zavolá náš skript na vygenerovanie údajov denníka definovaných vyššie a potom sa pripojí a odošle denníky do Pub/Sub. Jediné, čo musíme urobiť, je vytvoriť objekt PublisherClient, zadajte cestu k téme pomocou metódy topic_path a zavolajte funkciu publish с topic_path a údaje. Upozorňujeme, že dovážame generate_log_line z nášho scenára stream_logs, takže sa uistite, že sú tieto súbory v rovnakom priečinku, inak sa zobrazí chyba importu. Potom to môžeme spustiť cez našu konzolu Google pomocou:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Hneď ako sa súbor spustí, budeme môcť vidieť výstup údajov denníka do konzoly, ako je znázornené na obrázku nižšie. Tento skript bude fungovať, kým ho nebudeme používať CTRL + Cdokončiť to.

Vytvárame kanál na spracovanie dátových prúdov. Časť 2
Obrázok 4. Výstup publish_logs.py

Písanie nášho kódu potrubia

Teraz, keď máme všetko pripravené, môžeme začať zábavnú časť – kódovanie nášho potrubia pomocou Beam a Python. Na vytvorenie potrubia Beam musíme vytvoriť objekt potrubia (p). Po vytvorení objektu potrubia môžeme pomocou operátora použiť viacero funkcií jednu po druhej pipe (|). Vo všeobecnosti pracovný postup vyzerá ako na obrázku nižšie.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

V našom kóde vytvoríme dve vlastné funkcie. Funkcia regex_clean, ktorý naskenuje údaje a pomocou funkcie získa príslušný riadok na základe zoznamu VZORY re.search. Funkcia vráti reťazec oddelený čiarkou. Ak nie ste odborník na regulárne výrazy, odporúčam vám pozrieť si toto tutoriál a precvičte si v poznámkovom bloku kontrolu kódu. Potom definujeme vlastnú funkciu ParDo s názvom Rozdeliť, čo je variácia transformácie Beam pre paralelné spracovanie. V Pythone sa to robí špeciálnym spôsobom – musíme vytvoriť triedu, ktorá dedí z triedy DoFn Beam. Funkcia Split preberá analyzovaný riadok z predchádzajúcej funkcie a vracia zoznam slovníkov s kľúčmi zodpovedajúcimi názvom stĺpcov v našej tabuľke BigQuery. K tejto funkcii je potrebné poznamenať niečo: Musel som importovať datetime vnútri funkcie, aby fungovala. Na začiatku súboru som dostal chybu importu, čo bolo zvláštne. Tento zoznam sa potom odovzdá funkcii WriteToBigQuery, ktorý jednoducho pridá naše údaje do tabuľky. Kód pre Batch DataFlow Job a Streaming DataFlow Job je uvedený nižšie. Jediný rozdiel medzi dávkovým a streamovacím kódom je v tom, že v dávke čítame CSV z src_pathpomocou funkcie ReadFromText od Beam.

Batch DataFlow Job (dávkové spracovanie)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (spracovanie streamu)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Spustenie dopravníka

Potrubie môžeme spustiť niekoľkými rôznymi spôsobmi. Ak by sme chceli, mohli by sme ho spustiť lokálne z terminálu a zároveň sa vzdialene prihlasovať do GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

My ho však spustíme pomocou DataFlow. Môžeme to urobiť pomocou nižšie uvedeného príkazu nastavením nasledujúcich požadovaných parametrov.

  • project — ID vášho projektu GCP.
  • runner je kanál, ktorý analyzuje váš program a zostaví váš kanál. Ak chcete spustiť v cloude, musíte zadať DataflowRunner.
  • staging_location — cesta k cloudovému úložisku Cloud Dataflow na indexovanie balíkov kódu, ktoré potrebujú procesory vykonávajúce prácu.
  • temp_location — cesta k cloudovému úložisku Cloud Dataflow na ukladanie dočasných súborov úloh vytvorených počas behu kanála.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Kým je tento príkaz spustený, môžeme prejsť na kartu DataFlow v konzole Google a zobraziť náš kanál. Keď klikneme na kanál, mali by sme vidieť niečo podobné ako na obrázku 4. Na účely ladenia môže byť veľmi užitočné prejsť na protokoly a potom na položku Stackdriver, kde si môžete pozrieť podrobné protokoly. To mi pomohlo vyriešiť problémy s potrubím v mnohých prípadoch.

Vytvárame kanál na spracovanie dátových prúdov. Časť 2
Obrázok 4: Nosný dopravník

Získajte prístup k našim údajom v nástroji BigQuery

Takže by sme už mali mať spustený kanál s údajmi prúdiacimi do našej tabuľky. Aby sme to otestovali, môžeme prejsť do nástroja BigQuery a pozrieť sa na údaje. Po použití nižšie uvedeného príkazu by ste mali vidieť niekoľko prvých riadkov množiny údajov. Teraz, keď máme údaje uložené v BigQuery, môžeme vykonávať ďalšiu analýzu, ako aj zdieľať údaje s kolegami a začať odpovedať na obchodné otázky.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Vytvárame kanál na spracovanie dátových prúdov. Časť 2
Obrázok 5: BigQuery

Záver

Dúfame, že tento príspevok poslúži ako užitočný príklad vytvorenia streamingového dátového kanála, ako aj nájdenia spôsobov, ako sprístupniť dáta. Ukladanie údajov v tomto formáte nám poskytuje mnoho výhod. Teraz môžeme začať odpovedať na dôležité otázky, ako napríklad koľko ľudí používa náš produkt? Rastie vaša používateľská základňa v priebehu času? S akými aspektmi produktu ľudia najviac interagujú? A sú chyby tam, kde by byť nemali? Toto sú otázky, ktoré budú zaujímať organizáciu. Na základe poznatkov, ktoré vyplynú z odpovedí na tieto otázky, môžeme zlepšiť produkt a zvýšiť zapojenie používateľov.

Beam je skutočne užitočný pre tento typ cvičenia a má aj množstvo ďalších zaujímavých prípadov použitia. Môžete napríklad chcieť analyzovať údaje o akciách v reálnom čase a obchodovať na základe analýzy, možno máte údaje zo senzorov pochádzajúce z vozidiel a chcete vypočítať výpočty úrovne premávky. Môžete byť napríklad aj herná spoločnosť, ktorá zhromažďuje údaje o používateľoch a používa ich na vytváranie informačných panelov na sledovanie kľúčových metrík. Dobre, páni, toto je téma na ďalší príspevok, ďakujem za prečítanie a pre tých, ktorí chcú vidieť celý kód, nižšie je odkaz na môj GitHub.

https://github.com/DFoly/User_log_pipeline

To je všetko. Prečítajte si časť prvá.

Zdroj: hab.com

Pridať komentár