Vytváříme kanál pro zpracování datových proudů. Část 2

Ahoj všichni. Sdílíme překlad závěrečné části článku, připravené speciálně pro studenty kurzu. datový inženýr. První díl si můžete přečíst zde.

Apache Beam a DataFlow pro potrubí v reálném čase

Vytváříme kanál pro zpracování datových proudů. Část 2

Nastavení Google Cloud

Poznámka: Ke spuštění kanálu a publikování vlastních dat protokolu jsem použil Google Cloud Shell, protože jsem měl potíže se spuštěním kanálu v Pythonu 3. Google Cloud Shell používá Python 2, který je konzistentnější s Apache Beam.

Pro spuštění pipeline se musíme trochu pohrabat v nastavení. Pro ty z vás, kteří GCP dosud nepoužívali, budete muset postupovat podle následujících 6 kroků popsaných v tomto dokumentu strana.

Poté budeme muset nahrát naše skripty do Google Cloud Storage a zkopírovat je do našeho Google Cloud Shel. Nahrávání do cloudového úložiště je celkem triviální (popis naleznete zde). Chcete-li zkopírovat naše soubory, můžeme otevřít Google Cloud Shel z panelu nástrojů kliknutím na první ikonu vlevo na obrázku 2 níže.

Vytváříme kanál pro zpracování datových proudů. Část 2
Obrázek 2

Příkazy, které potřebujeme ke zkopírování souborů a instalaci požadovaných knihoven, jsou uvedeny níže.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Vytvoření naší databáze a tabulky

Jakmile dokončíme všechny kroky související s nastavením, další věc, kterou musíme udělat, je vytvořit datovou sadu a tabulku v BigQuery. Existuje několik způsobů, jak to provést, ale nejjednodušší je použít konzolu Google Cloud a nejprve vytvořit datovou sadu. Můžete postupovat podle níže uvedených kroků odkazk vytvoření tabulky se schématem. Náš stůl bude mít 7 sloupců, odpovídající komponentám každého uživatelského protokolu. Pro usnadnění nadefinujeme všechny sloupce jako řetězce, kromě proměnné timelocal, a pojmenujeme je podle proměnných, které jsme vygenerovali dříve. Rozložení naší tabulky by mělo vypadat jako na obrázku 3.

Vytváříme kanál pro zpracování datových proudů. Část 2
Obrázek 3. Rozvržení tabulky

Publikování dat uživatelského protokolu

Pub/Sub je kritickou součástí našeho potrubí, protože umožňuje vzájemnou komunikaci více nezávislých aplikací. Funguje zejména jako prostředník, který nám umožňuje posílat a přijímat zprávy mezi aplikacemi. První věc, kterou musíme udělat, je vytvořit téma. Jednoduše přejděte na Pub/Sub v konzole a klikněte na VYTVOŘIT TÉMA.

Níže uvedený kód zavolá náš skript, aby vygeneroval data protokolu definovaná výše, a poté se připojí a odešle protokoly do Pub/Sub. Jediné, co musíme udělat, je vytvořit objekt Vydavatelský klient, určete cestu k tématu pomocí metody topic_path a zavolejte funkci publish с topic_path a dat. Upozorňujeme, že dovážíme generate_log_line z našeho scénáře stream_logs, takže se ujistěte, že jsou tyto soubory ve stejné složce, jinak se zobrazí chyba importu. Poté to můžeme spustit prostřednictvím naší konzole Google pomocí:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Jakmile se soubor spustí, budeme moci vidět výstup dat protokolu do konzole, jak je znázorněno na obrázku níže. Tento skript bude fungovat, dokud ho nebudeme používat CTRL + Cdokončit to.

Vytváříme kanál pro zpracování datových proudů. Část 2
Obrázek 4. Výstup publish_logs.py

Psaní kódu kanálu

Nyní, když máme vše připraveno, můžeme začít zábavnou část – kódování našeho potrubí pomocí Beam a Python. Chcete-li vytvořit potrubí Beam, musíme vytvořit objekt potrubí (p). Jakmile vytvoříme objekt potrubí, můžeme použít několik funkcí jednu po druhé pomocí operátoru pipe (|). Obecně pracovní postup vypadá jako na obrázku níže.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

V našem kódu vytvoříme dvě vlastní funkce. Funkce regex_clean, který naskenuje data a pomocí funkce načte odpovídající řádek na základě seznamu PATTERNS re.search. Funkce vrací řetězec oddělený čárkami. Pokud nejste odborníkem na regulární výrazy, doporučuji toto zkontrolovat tutorial a procvičte si v poznámkovém bloku kontrolu kódu. Poté definujeme vlastní funkci ParDo nazvanou Rozdělit, což je varianta Beam transformace pro paralelní zpracování. V Pythonu se to dělá speciálním způsobem – musíme vytvořit třídu, která dědí z třídy DoFn Beam. Funkce Split přebírá analyzovaný řádek z předchozí funkce a vrací seznam slovníků s klíči odpovídajícími názvům sloupců v naší tabulce BigQuery. K této funkci je třeba poznamenat: Musel jsem importovat datetime uvnitř funkce, aby fungovala. Zobrazovala se mi chyba importu na začátku souboru, což bylo divné. Tento seznam je poté předán funkci WriteToBigQuery, který jednoduše přidá naše data do tabulky. Kód pro Batch DataFlow Job a Streaming DataFlow Job je uveden níže. Jediný rozdíl mezi dávkovým a streamovacím kódem je ten, že v dávce čteme CSV src_pathpomocí funkce ReadFromText od Beam.

Batch DataFlow Job (dávkové zpracování)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (zpracování streamu)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Spuštění dopravníku

Potrubí můžeme provozovat několika různými způsoby. Kdybychom chtěli, mohli bychom jej spustit lokálně z terminálu a zároveň se vzdáleně přihlašovat do GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Budeme jej však provozovat pomocí DataFlow. Můžeme to udělat pomocí níže uvedeného příkazu nastavením následujících požadovaných parametrů.

  • project — ID vašeho projektu GCP.
  • runner je pipeline runner, který bude analyzovat váš program a sestavovat vaše potrubí. Chcete-li spustit v cloudu, musíte zadat DataflowRunner.
  • staging_location — cesta ke cloudovému úložišti Cloud Dataflow pro indexování balíčků kódu, které potřebují procesory provádějící práci.
  • temp_location — cesta ke cloudovému úložišti Cloud Dataflow pro ukládání dočasných souborů úloh vytvořených za běhu kanálu.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Zatímco je tento příkaz spuštěn, můžeme přejít na kartu DataFlow v konzoli Google a zobrazit náš kanál. Když klikneme na kanál, měli bychom vidět něco podobného jako na obrázku 4. Pro účely ladění může být velmi užitečné přejít do Logs (Protokoly) a poté do Stackdriveru pro zobrazení podrobných logů. To mi pomohlo vyřešit problémy s potrubím v řadě případů.

Vytváříme kanál pro zpracování datových proudů. Část 2
Obrázek 4: Pásový dopravník

Získejte přístup k našim datům v BigQuery

Měli bychom tedy již mít spuštěný kanál s daty proudícími do naší tabulky. Abychom to otestovali, můžeme přejít do BigQuery a podívat se na data. Po použití níže uvedeného příkazu byste měli vidět prvních několik řádků datové sady. Nyní, když máme data uložená v BigQuery, můžeme provádět další analýzy, sdílet data s kolegy a začít odpovídat na obchodní otázky.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Vytváříme kanál pro zpracování datových proudů. Část 2
Obrázek 5: BigQuery

Závěr

Doufáme, že tento příspěvek poslouží jako užitečný příklad vytvoření streamingového datového kanálu a také jako nalezení způsobů, jak zpřístupnit data. Ukládání dat v tomto formátu nám poskytuje mnoho výhod. Nyní můžeme začít odpovídat na důležité otázky, například kolik lidí používá náš produkt? Roste v čase vaše uživatelská základna? S jakými aspekty produktu lidé nejvíce interagují? A jsou chyby tam, kde by být neměly? To jsou otázky, které budou organizaci zajímat. Na základě poznatků, které vyplynou z odpovědí na tyto otázky, můžeme vylepšit produkt a zvýšit zapojení uživatelů.

Beam je pro tento typ cvičení opravdu užitečný a má také řadu dalších zajímavých případů použití. Můžete například chtít analyzovat údaje o akciích v reálném čase a provádět obchody na základě analýzy, možná máte data ze senzorů pocházející z vozidel a chcete vypočítat výpočty úrovně provozu. Můžete také být například herní společností, která shromažďuje uživatelská data a používá je k vytváření řídicích panelů pro sledování klíčových metrik. Dobře, pánové, toto je téma na další příspěvek, díky za přečtení a pro ty, kteří chtějí vidět celý kód, níže je odkaz na můj GitHub.

https://github.com/DFoly/User_log_pipeline

To je všechno. Přečtěte si část první.

Zdroj: www.habr.com

Přidat komentář