Cześć wszystkim. Udostępniamy tłumaczenie końcowej części artykułu, przygotowanej specjalnie dla studentów kursu.
Apache Beam i DataFlow dla potoków w czasie rzeczywistym
Konfigurowanie Google Cloud
Uwaga: do uruchomienia potoku i opublikowania niestandardowych danych dziennika użyłem Google Cloud Shell, ponieważ miałem problemy z uruchomieniem potoku w Pythonie 3. Google Cloud Shell używa Pythona 2, który jest bardziej spójny z Apache Beam.
Aby rozpocząć potok, musimy trochę pogrzebać w ustawieniach. Ci z Was, którzy nie korzystali wcześniej z GCP, będą musieli wykonać 6 następujących kroków opisanych w tym dokumencie
Następnie będziemy musieli przesłać nasze skrypty do Google Cloud Storage i skopiować je do naszego Google Cloud Shel. Przesyłanie do magazynu w chmurze jest dość banalne (opis można znaleźć
Rysunek 2
Polecenia potrzebne do skopiowania plików i zainstalowania wymaganych bibliotek wymieniono poniżej.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Tworzenie naszej bazy danych i tabeli
Po wykonaniu wszystkich kroków związanych z konfiguracją kolejną rzeczą, którą musimy zrobić, jest utworzenie zbioru danych i tabeli w BigQuery. Można to zrobić na kilka sposobów, ale najprostszym jest skorzystanie z konsoli Google Cloud i utworzenie najpierw zbioru danych. Możesz wykonać poniższe kroki
Rysunek 3. Układ tabeli
Publikowanie danych dziennika użytkownika
Pub/Sub jest kluczowym elementem naszego potoku, ponieważ umożliwia komunikację wielu niezależnych aplikacji ze sobą. W szczególności pełni rolę pośrednika, który pozwala nam wysyłać i odbierać wiadomości pomiędzy aplikacjami. Pierwszą rzeczą, którą musimy zrobić, to stworzyć temat. Po prostu przejdź do Pub/Sub w konsoli i kliknij UTWÓRZ TEMAT.
Poniższy kod wywołuje nasz skrypt w celu wygenerowania danych dziennika zdefiniowanych powyżej, a następnie łączy się i wysyła dzienniki do Pub/Sub. Jedyne, co musimy zrobić, to stworzyć obiekt WydawcaKlient, określ ścieżkę do tematu za pomocą metody topic_path
i wywołaj funkcję publish
с topic_path
i dane. Należy pamiętać, że importujemy generate_log_line
z naszego skryptu stream_logs
, więc upewnij się, że te pliki znajdują się w tym samym folderze, w przeciwnym razie pojawi się błąd importu. Następnie możemy uruchomić to za pomocą naszej konsoli Google, używając:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Gdy tylko plik zostanie uruchomiony, będziemy mogli zobaczyć dane wyjściowe dziennika przesyłane do konsoli, jak pokazano na poniższym rysunku. Ten skrypt będzie działał tak długo, jak nie będziemy go używać CTRL + Caby go ukończyć.
Rysunek 4. Dane wyjściowe publish_logs.py
Pisanie naszego kodu potoku
Teraz, gdy już wszystko przygotowaliśmy, możemy rozpocząć zabawną część - kodowanie naszego potoku przy użyciu Beam i Python. Aby utworzyć potok Beam, musimy utworzyć obiekt potoku (p). Po utworzeniu obiektu potoku możemy zastosować wiele funkcji jedna po drugiej za pomocą operatora pipe (|)
. Ogólnie rzecz biorąc, przepływ pracy wygląda jak na obrazku poniżej.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
W naszym kodzie utworzymy dwie niestandardowe funkcje. Funkcjonować regex_clean
, który skanuje dane i pobiera odpowiedni wiersz na podstawie listy PATTERNS za pomocą tej funkcji re.search
. Funkcja zwraca ciąg znaków oddzielony przecinkami. Jeśli nie jesteś ekspertem od wyrażeń regularnych, polecam sprawdzić to datetime
wewnątrz funkcji, aby działała. Na początku pliku wystąpił błąd importu, co było dziwne. Lista ta jest następnie przekazywana do funkcji Napisz do BigQuery, który po prostu dodaje nasze dane do tabeli. Poniżej podano kod zadań Batch DataFlow i Streaming DataFlow. Jedyna różnica między kodem wsadowym a kodem strumieniowym polega na tym, że wsadowo odczytujemy plik CSV src_path
korzystanie z funkcji ReadFromText
od Bema.
Zadanie Batch DataFlow (przetwarzanie wsadowe)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Zadanie przesyłania strumieniowego DataFlow (przetwarzanie strumienia)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Uruchomienie przenośnika
Rurociąg możemy poprowadzić na kilka różnych sposobów. Gdybyśmy chcieli, moglibyśmy po prostu uruchomić go lokalnie z terminala, logując się zdalnie do GCP.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Będziemy go jednak uruchamiać za pomocą DataFlow. Możemy to zrobić za pomocą poniższego polecenia, ustawiając następujące wymagane parametry.
project
— Identyfikator projektu GCP.runner
to narzędzie do uruchamiania potoku, które przeanalizuje Twój program i zbuduje potok. Aby działać w chmurze, musisz określić DataflowRunner.staging_location
— ścieżka do magazynu w chmurze Cloud Dataflow w celu indeksowania pakietów kodów potrzebnych procesorom wykonującym pracę.temp_location
— ścieżka do magazynu w chmurze Cloud Dataflow, w którym przechowywane są tymczasowe pliki zadań utworzone podczas działania potoku.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Gdy to polecenie jest uruchomione, możemy przejść do karty DataFlow w konsoli Google i wyświetlić nasz potok. Kiedy klikniemy potok, powinniśmy zobaczyć coś podobnego do rysunku 4. Do celów debugowania bardzo pomocne może być przejście do Logs, a następnie do Stackdriver, aby wyświetlić szczegółowe logi. W wielu przypadkach pomogło mi to rozwiązać problemy z rurociągami.
Rysunek 4: Przenośnik belkowy
Uzyskaj dostęp do naszych danych w BigQuery
Zatem powinniśmy już mieć uruchomiony potok z danymi wpływającymi do naszej tabeli. Aby to przetestować, możemy przejść do BigQuery i przejrzeć dane. Po użyciu poniższego polecenia powinieneś zobaczyć kilka pierwszych wierszy zbioru danych. Teraz, gdy mamy już dane zapisane w BigQuery, możemy przeprowadzić dalszą analizę, a także udostępnić dane współpracownikom i zacząć odpowiadać na pytania biznesowe.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Rysunek 5: BigQuery
wniosek
Mamy nadzieję, że ten post będzie użytecznym przykładem tworzenia potoku danych przesyłanych strumieniowo, a także znajdowania sposobów na zwiększenie dostępności danych. Przechowywanie danych w tym formacie daje nam wiele korzyści. Teraz możemy zacząć odpowiadać na ważne pytania, np. ile osób korzysta z naszego produktu? Czy Twoja baza użytkowników rośnie z biegiem czasu? Z jakimi aspektami produktu ludzie najczęściej wchodzą w interakcję? A czy są błędy tam, gdzie ich nie powinno być? To są pytania, które będą interesujące dla organizacji. Na podstawie wniosków płynących z odpowiedzi na te pytania możemy ulepszyć produkt i zwiększyć zaangażowanie użytkowników.
Beam jest naprawdę przydatny w tego typu ćwiczeniach i ma także wiele innych interesujących zastosowań. Możesz na przykład analizować dane dotyczące notowań giełdowych w czasie rzeczywistym i dokonywać transakcji na podstawie tej analizy, być może dysponujesz danymi z czujników pochodzących z pojazdów i chcesz obliczyć natężenie ruchu. Możesz także być firmą zajmującą się grami, która gromadzi dane użytkowników i wykorzystuje je do tworzenia pulpitów nawigacyjnych do śledzenia kluczowych wskaźników. OK, panowie, to temat na inny post, dziękuję za przeczytanie, a dla tych, którzy chcą zobaczyć pełny kod, poniżej zamieszczam link do mojego GitHuba.
To wszystko.
Źródło: www.habr.com