Ahojte všetci. Zdieľame preklad záverečnej časti článku pripravenej špeciálne pre študentov kurzu.
Apache Beam a DataFlow pre potrubia v reálnom čase
Nastavenie služby Google Cloud
Poznámka: Použil som Google Cloud Shell na spustenie kanála a zverejnenie údajov vlastného denníka, pretože som mal problémy so spustením kanála v Pythone 3. Google Cloud Shell používa Python 2, ktorý je konzistentnejší s Apache Beam.
Ak chcete spustiť potrubie, musíme sa trochu pohrabať v nastaveniach. Pre tých z vás, ktorí ešte GCP nepoužili, budete musieť postupovať podľa nasledujúcich 6 krokov uvedených v tomto dokumente
Potom budeme musieť nahrať naše skripty do Google Cloud Storage a skopírovať ich do nášho Google Cloud Shel. Nahrávanie do cloudového úložiska je celkom triviálne (popis nájdete
Obrázok 2
Príkazy, ktoré potrebujeme na skopírovanie súborov a inštaláciu požadovaných knižníc, sú uvedené nižšie.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Vytvorenie našej databázy a tabuľky
Keď dokončíme všetky kroky súvisiace s nastavením, ďalšia vec, ktorú musíme urobiť, je vytvoriť množinu údajov a tabuľku v nástroji BigQuery. Existuje niekoľko spôsobov, ako to urobiť, ale najjednoduchšie je použiť konzolu Google Cloud tak, že najskôr vytvoríte množinu údajov. Môžete postupovať podľa nižšie uvedených krokov
Obrázok 3. Rozloženie tabuľky
Publikovanie údajov denníka používateľov
Pub/Sub je kritickým komponentom nášho potrubia, pretože umožňuje viacerým nezávislým aplikáciám navzájom komunikovať. Funguje najmä ako sprostredkovateľ, ktorý nám umožňuje posielať a prijímať správy medzi aplikáciami. Prvá vec, ktorú musíme urobiť, je vytvoriť tému. Jednoducho prejdite na Pub/Sub v konzole a kliknite na VYTVORIŤ TÉMU.
Nižšie uvedený kód zavolá náš skript na vygenerovanie údajov denníka definovaných vyššie a potom sa pripojí a odošle denníky do Pub/Sub. Jediné, čo musíme urobiť, je vytvoriť objekt PublisherClient, zadajte cestu k téme pomocou metódy topic_path
a zavolajte funkciu publish
с topic_path
a údaje. Upozorňujeme, že dovážame generate_log_line
z nášho scenára stream_logs
, takže sa uistite, že sú tieto súbory v rovnakom priečinku, inak sa zobrazí chyba importu. Potom to môžeme spustiť cez našu konzolu Google pomocou:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Hneď ako sa súbor spustí, budeme môcť vidieť výstup údajov denníka do konzoly, ako je znázornené na obrázku nižšie. Tento skript bude fungovať, kým ho nebudeme používať CTRL + Cdokončiť to.
Obrázok 4. Výstup publish_logs.py
Písanie nášho kódu potrubia
Teraz, keď máme všetko pripravené, môžeme začať zábavnú časť – kódovanie nášho potrubia pomocou Beam a Python. Na vytvorenie potrubia Beam musíme vytvoriť objekt potrubia (p). Po vytvorení objektu potrubia môžeme pomocou operátora použiť viacero funkcií jednu po druhej pipe (|)
. Vo všeobecnosti pracovný postup vyzerá ako na obrázku nižšie.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
V našom kóde vytvoríme dve vlastné funkcie. Funkcia regex_clean
, ktorý naskenuje údaje a pomocou funkcie získa príslušný riadok na základe zoznamu VZORY re.search
. Funkcia vráti reťazec oddelený čiarkou. Ak nie ste odborník na regulárne výrazy, odporúčam vám pozrieť si toto datetime
vnútri funkcie, aby fungovala. Na začiatku súboru som dostal chybu importu, čo bolo zvláštne. Tento zoznam sa potom odovzdá funkcii WriteToBigQuery, ktorý jednoducho pridá naše údaje do tabuľky. Kód pre Batch DataFlow Job a Streaming DataFlow Job je uvedený nižšie. Jediný rozdiel medzi dávkovým a streamovacím kódom je v tom, že v dávke čítame CSV z src_path
pomocou funkcie ReadFromText
od Beam.
Batch DataFlow Job (dávkové spracovanie)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow Job (spracovanie streamu)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Spustenie dopravníka
Potrubie môžeme spustiť niekoľkými rôznymi spôsobmi. Ak by sme chceli, mohli by sme ho spustiť lokálne z terminálu a zároveň sa vzdialene prihlasovať do GCP.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
My ho však spustíme pomocou DataFlow. Môžeme to urobiť pomocou nižšie uvedeného príkazu nastavením nasledujúcich požadovaných parametrov.
project
— ID vášho projektu GCP.runner
je kanál, ktorý analyzuje váš program a zostaví váš kanál. Ak chcete spustiť v cloude, musíte zadať DataflowRunner.staging_location
— cesta k cloudovému úložisku Cloud Dataflow na indexovanie balíkov kódu, ktoré potrebujú procesory vykonávajúce prácu.temp_location
— cesta k cloudovému úložisku Cloud Dataflow na ukladanie dočasných súborov úloh vytvorených počas behu kanála.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Kým je tento príkaz spustený, môžeme prejsť na kartu DataFlow v konzole Google a zobraziť náš kanál. Keď klikneme na kanál, mali by sme vidieť niečo podobné ako na obrázku 4. Na účely ladenia môže byť veľmi užitočné prejsť na protokoly a potom na položku Stackdriver, kde si môžete pozrieť podrobné protokoly. To mi pomohlo vyriešiť problémy s potrubím v mnohých prípadoch.
Obrázok 4: Nosný dopravník
Získajte prístup k našim údajom v nástroji BigQuery
Takže by sme už mali mať spustený kanál s údajmi prúdiacimi do našej tabuľky. Aby sme to otestovali, môžeme prejsť do nástroja BigQuery a pozrieť sa na údaje. Po použití nižšie uvedeného príkazu by ste mali vidieť niekoľko prvých riadkov množiny údajov. Teraz, keď máme údaje uložené v BigQuery, môžeme vykonávať ďalšiu analýzu, ako aj zdieľať údaje s kolegami a začať odpovedať na obchodné otázky.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Obrázok 5: BigQuery
Záver
Dúfame, že tento príspevok poslúži ako užitočný príklad vytvorenia streamingového dátového kanála, ako aj nájdenia spôsobov, ako sprístupniť dáta. Ukladanie údajov v tomto formáte nám poskytuje mnoho výhod. Teraz môžeme začať odpovedať na dôležité otázky, ako napríklad koľko ľudí používa náš produkt? Rastie vaša používateľská základňa v priebehu času? S akými aspektmi produktu ľudia najviac interagujú? A sú chyby tam, kde by byť nemali? Toto sú otázky, ktoré budú zaujímať organizáciu. Na základe poznatkov, ktoré vyplynú z odpovedí na tieto otázky, môžeme zlepšiť produkt a zvýšiť zapojenie používateľov.
Beam je skutočne užitočný pre tento typ cvičenia a má aj množstvo ďalších zaujímavých prípadov použitia. Môžete napríklad chcieť analyzovať údaje o akciách v reálnom čase a obchodovať na základe analýzy, možno máte údaje zo senzorov pochádzajúce z vozidiel a chcete vypočítať výpočty úrovne premávky. Môžete byť napríklad aj herná spoločnosť, ktorá zhromažďuje údaje o používateľoch a používa ich na vytváranie informačných panelov na sledovanie kľúčových metrík. Dobre, páni, toto je téma na ďalší príspevok, ďakujem za prečítanie a pre tých, ktorí chcú vidieť celý kód, nižšie je odkaz na môj GitHub.
To je všetko.
Zdroj: hab.com