Tworzymy potok przetwarzania danych strumieniowych. Część 2

Cześć wszystkim. Udostępniamy tłumaczenie końcowej części artykułu, przygotowanej specjalnie dla studentów kursu. Inżynier danych. Możesz przeczytać pierwszą część tutaj.

Apache Beam i DataFlow dla potoków w czasie rzeczywistym

Tworzymy potok przetwarzania danych strumieniowych. Część 2

Konfigurowanie Google Cloud

Uwaga: do uruchomienia potoku i opublikowania niestandardowych danych dziennika użyłem Google Cloud Shell, ponieważ miałem problemy z uruchomieniem potoku w Pythonie 3. Google Cloud Shell używa Pythona 2, który jest bardziej spójny z Apache Beam.

Aby rozpocząć potok, musimy trochę pogrzebać w ustawieniach. Ci z Was, którzy nie korzystali wcześniej z GCP, będą musieli wykonać 6 następujących kroków opisanych w tym dokumencie strona.

Następnie będziemy musieli przesłać nasze skrypty do Google Cloud Storage i skopiować je do naszego Google Cloud Shel. Przesyłanie do magazynu w chmurze jest dość banalne (opis można znaleźć tutaj). Aby skopiować nasze pliki, możemy otworzyć Google Cloud Shel z paska narzędzi, klikając pierwszą ikonę po lewej stronie na rysunku 2 poniżej.

Tworzymy potok przetwarzania danych strumieniowych. Część 2
Rysunek 2

Polecenia potrzebne do skopiowania plików i zainstalowania wymaganych bibliotek wymieniono poniżej.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Tworzenie naszej bazy danych i tabeli

Po wykonaniu wszystkich kroków związanych z konfiguracją kolejną rzeczą, którą musimy zrobić, jest utworzenie zbioru danych i tabeli w BigQuery. Można to zrobić na kilka sposobów, ale najprostszym jest skorzystanie z konsoli Google Cloud i utworzenie najpierw zbioru danych. Możesz wykonać poniższe kroki powiązanieaby utworzyć tabelę ze schematem. Nasz stół będzie miał 7 kolumn, odpowiadające składnikom każdego dziennika użytkownika. Dla wygody zdefiniujemy wszystkie kolumny jako ciągi znaków, za wyjątkiem zmiennej timelocal i nazwiemy je zgodnie ze zmiennymi, które wygenerowaliśmy wcześniej. Układ naszej tabeli powinien wyglądać jak na rysunku 3.

Tworzymy potok przetwarzania danych strumieniowych. Część 2
Rysunek 3. Układ tabeli

Publikowanie danych dziennika użytkownika

Pub/Sub jest kluczowym elementem naszego potoku, ponieważ umożliwia komunikację wielu niezależnych aplikacji ze sobą. W szczególności pełni rolę pośrednika, który pozwala nam wysyłać i odbierać wiadomości pomiędzy aplikacjami. Pierwszą rzeczą, którą musimy zrobić, to stworzyć temat. Po prostu przejdź do Pub/Sub w konsoli i kliknij UTWÓRZ TEMAT.

Poniższy kod wywołuje nasz skrypt w celu wygenerowania danych dziennika zdefiniowanych powyżej, a następnie łączy się i wysyła dzienniki do Pub/Sub. Jedyne, co musimy zrobić, to stworzyć obiekt WydawcaKlient, określ ścieżkę do tematu za pomocą metody topic_path i wywołaj funkcję publish с topic_path i dane. Należy pamiętać, że importujemy generate_log_line z naszego skryptu stream_logs, więc upewnij się, że te pliki znajdują się w tym samym folderze, w przeciwnym razie pojawi się błąd importu. Następnie możemy uruchomić to za pomocą naszej konsoli Google, używając:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Gdy tylko plik zostanie uruchomiony, będziemy mogli zobaczyć dane wyjściowe dziennika przesyłane do konsoli, jak pokazano na poniższym rysunku. Ten skrypt będzie działał tak długo, jak nie będziemy go używać CTRL + Caby go ukończyć.

Tworzymy potok przetwarzania danych strumieniowych. Część 2
Rysunek 4. Dane wyjściowe publish_logs.py

Pisanie naszego kodu potoku

Teraz, gdy już wszystko przygotowaliśmy, możemy rozpocząć zabawną część - kodowanie naszego potoku przy użyciu Beam i Python. Aby utworzyć potok Beam, musimy utworzyć obiekt potoku (p). Po utworzeniu obiektu potoku możemy zastosować wiele funkcji jedna po drugiej za pomocą operatora pipe (|). Ogólnie rzecz biorąc, przepływ pracy wygląda jak na obrazku poniżej.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

W naszym kodzie utworzymy dwie niestandardowe funkcje. Funkcjonować regex_clean, który skanuje dane i pobiera odpowiedni wiersz na podstawie listy PATTERNS za pomocą tej funkcji re.search. Funkcja zwraca ciąg znaków oddzielony przecinkami. Jeśli nie jesteś ekspertem od wyrażeń regularnych, polecam sprawdzić to instruktaż i poćwicz w notatniku sprawdzanie kodu. Następnie definiujemy niestandardową funkcję ParDo o nazwie Split, która jest odmianą transformacji wiązki do przetwarzania równoległego. W Pythonie robi się to w specjalny sposób – musimy stworzyć klasę, która dziedziczy po klasie DoFn Beam. Funkcja Split pobiera przeanalizowany wiersz z poprzedniej funkcji i zwraca listę słowników z kluczami odpowiadającymi nazwom kolumn w naszej tabeli BigQuery. Jest coś, na co warto zwrócić uwagę na temat tej funkcji: musiałem zaimportować datetime wewnątrz funkcji, aby działała. Na początku pliku wystąpił błąd importu, co było dziwne. Lista ta jest następnie przekazywana do funkcji Napisz do BigQuery, który po prostu dodaje nasze dane do tabeli. Poniżej podano kod zadań Batch DataFlow i Streaming DataFlow. Jedyna różnica między kodem wsadowym a kodem strumieniowym polega na tym, że wsadowo odczytujemy plik CSV src_pathkorzystanie z funkcji ReadFromText od Bema.

Zadanie Batch DataFlow (przetwarzanie wsadowe)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Zadanie przesyłania strumieniowego DataFlow (przetwarzanie strumienia)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Uruchomienie przenośnika

Rurociąg możemy poprowadzić na kilka różnych sposobów. Gdybyśmy chcieli, moglibyśmy po prostu uruchomić go lokalnie z terminala, logując się zdalnie do GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Będziemy go jednak uruchamiać za pomocą DataFlow. Możemy to zrobić za pomocą poniższego polecenia, ustawiając następujące wymagane parametry.

  • project — Identyfikator projektu GCP.
  • runner to narzędzie do uruchamiania potoku, które przeanalizuje Twój program i zbuduje potok. Aby działać w chmurze, musisz określić DataflowRunner.
  • staging_location — ścieżka do magazynu w chmurze Cloud Dataflow w celu indeksowania pakietów kodów potrzebnych procesorom wykonującym pracę.
  • temp_location — ścieżka do magazynu w chmurze Cloud Dataflow, w którym przechowywane są tymczasowe pliki zadań utworzone podczas działania potoku.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Gdy to polecenie jest uruchomione, możemy przejść do karty DataFlow w konsoli Google i wyświetlić nasz potok. Kiedy klikniemy potok, powinniśmy zobaczyć coś podobnego do rysunku 4. Do celów debugowania bardzo pomocne może być przejście do Logs, a następnie do Stackdriver, aby wyświetlić szczegółowe logi. W wielu przypadkach pomogło mi to rozwiązać problemy z rurociągami.

Tworzymy potok przetwarzania danych strumieniowych. Część 2
Rysunek 4: Przenośnik belkowy

Uzyskaj dostęp do naszych danych w BigQuery

Zatem powinniśmy już mieć uruchomiony potok z danymi wpływającymi do naszej tabeli. Aby to przetestować, możemy przejść do BigQuery i przejrzeć dane. Po użyciu poniższego polecenia powinieneś zobaczyć kilka pierwszych wierszy zbioru danych. Teraz, gdy mamy już dane zapisane w BigQuery, możemy przeprowadzić dalszą analizę, a także udostępnić dane współpracownikom i zacząć odpowiadać na pytania biznesowe.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Tworzymy potok przetwarzania danych strumieniowych. Część 2
Rysunek 5: BigQuery

wniosek

Mamy nadzieję, że ten post będzie użytecznym przykładem tworzenia potoku danych przesyłanych strumieniowo, a także znajdowania sposobów na zwiększenie dostępności danych. Przechowywanie danych w tym formacie daje nam wiele korzyści. Teraz możemy zacząć odpowiadać na ważne pytania, np. ile osób korzysta z naszego produktu? Czy Twoja baza użytkowników rośnie z biegiem czasu? Z jakimi aspektami produktu ludzie najczęściej wchodzą w interakcję? A czy są błędy tam, gdzie ich nie powinno być? To są pytania, które będą interesujące dla organizacji. Na podstawie wniosków płynących z odpowiedzi na te pytania możemy ulepszyć produkt i zwiększyć zaangażowanie użytkowników.

Beam jest naprawdę przydatny w tego typu ćwiczeniach i ma także wiele innych interesujących zastosowań. Możesz na przykład analizować dane dotyczące notowań giełdowych w czasie rzeczywistym i dokonywać transakcji na podstawie tej analizy, być może dysponujesz danymi z czujników pochodzących z pojazdów i chcesz obliczyć natężenie ruchu. Możesz także być firmą zajmującą się grami, która gromadzi dane użytkowników i wykorzystuje je do tworzenia pulpitów nawigacyjnych do śledzenia kluczowych wskaźników. OK, panowie, to temat na inny post, dziękuję za przeczytanie, a dla tych, którzy chcą zobaczyć pełny kod, poniżej zamieszczam link do mojego GitHuba.

https://github.com/DFoly/User_log_pipeline

To wszystko. Przeczytaj część pierwszą.

Źródło: www.habr.com

Dodaj komentarz