Sveiki visi. Dalinamės paskutinės straipsnio dalies vertimu, parengtu specialiai kurso studentams.
„Apache Beam“ ir „DataFlow“, skirta realaus laiko vamzdynams
„Google Cloud“ nustatymas
Pastaba: naudoju „Google Cloud Shell“, kad paleisčiau dujotiekį ir paskelbčiau tinkintus žurnalo duomenis, nes kilo problemų paleisdamas dujotiekį „Python 3“. „Google Cloud Shell“ naudoja Python 2, kuri labiau suderinama su „Apache Beam“.
Norėdami pradėti dujotiekį, turime šiek tiek įsigilinti į nustatymus. Tiems iš jūsų, kurie anksčiau nenaudojote GSP, turėsite atlikti toliau nurodytus 6 veiksmus
Po to turėsime įkelti savo scenarijus į „Google Cloud Storage“ ir nukopijuoti juos į „Google Cloud Shel“. Įkėlimas į debesies saugyklą yra gana trivialus (aprašą rasite
Pav 2
Toliau pateiktos komandos, kurių mums reikia norint nukopijuoti failus ir įdiegti reikiamas bibliotekas.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Sukuriame mūsų duomenų bazę ir lentelę
Kai atliksime visus su sąranka susijusius veiksmus, kitas dalykas, kurį turime padaryti, yra sukurti duomenų rinkinį ir lentelę „BigQuery“. Tai galima padaryti keliais būdais, tačiau paprasčiausias yra naudoti „Google Cloud“ konsolę, pirmiausia sukuriant duomenų rinkinį. Galite atlikti toliau nurodytus veiksmus
3 pav. Lentelės išdėstymas
Naudotojo žurnalo duomenų publikavimas
„Pub/Sub“ yra esminis mūsų konvejerio komponentas, nes jis leidžia kelioms nepriklausomoms programoms bendrauti tarpusavyje. Visų pirma, jis veikia kaip tarpininkas, leidžiantis siųsti ir gauti pranešimus tarp programų. Pirmas dalykas, kurį turime padaryti, yra sukurti temą. Tiesiog eikite į „Pub/Sub“ pulte ir spustelėkite KURTI TEMĄ.
Toliau pateiktas kodas iškviečia mūsų scenarijų, kad sugeneruotų aukščiau apibrėžtus žurnalo duomenis, tada prisijungia ir siunčia žurnalus į Pub / Sub. Vienintelis dalykas, kurį turime padaryti, yra sukurti objektą PublisherClient, nurodykite kelią į temą naudodami metodą topic_path
ir iškvieskite funkciją publish
с topic_path
ir duomenis. Atkreipkite dėmesį, kad mes importuojame generate_log_line
iš mūsų scenarijaus stream_logs
, todėl įsitikinkite, kad šie failai yra tame pačiame aplanke, kitaip gausite importavimo klaidą. Tada galime tai paleisti naudodami „Google“ konsolę naudodami:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Kai tik failas bus paleistas, galėsime matyti žurnalo duomenų išvestį į konsolę, kaip parodyta paveikslėlyje žemiau. Šis scenarijus veiks tol, kol jo nenaudosime CTRL + Cją užbaigti.
4 pav. Išvestis publish_logs.py
Rašome mūsų vamzdyno kodą
Dabar, kai jau viską paruošėme, galime pradėti smagiąją dalį – koduoti dujotiekį naudojant „Beam“ ir „Python“. Norėdami sukurti „Beam“ vamzdyną, turime sukurti dujotiekio objektą (p). Sukūrę dujotiekio objektą, naudodami operatorių galime taikyti kelias funkcijas vieną po kitos pipe (|)
. Apskritai darbo eiga atrodo kaip paveikslėlyje žemiau.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Savo kode sukursime dvi pasirinktines funkcijas. Funkcija regex_clean
, kuris nuskaito duomenis ir nuskaito atitinkamą eilutę pagal sąrašą PATTERNS naudojant funkciją re.search
. Funkcija grąžina kableliais atskirtą eilutę. Jei nesate reguliariųjų reiškinių ekspertas, rekomenduoju tai patikrinti datetime
funkcijos viduje, kad ji veiktų. Failo pradžioje gavau importavimo klaidą, kuri buvo keista. Tada šis sąrašas perduodamas funkcijai WriteToBigQuery, kuris tiesiog prideda mūsų duomenis į lentelę. Batch DataFlow Job ir Streaming DataFlow Job kodas pateiktas toliau. Vienintelis skirtumas tarp paketinio ir srautinio kodo yra tas, kad paketu mes skaitome CSV iš src_path
naudojant funkciją ReadFromText
iš Beam.
Paketinis duomenų srauto darbas (paketinis apdorojimas)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Srautinio duomenų srauto užduotis (srautinis apdorojimas)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Konvejerio paleidimas
Dujotiekį galime paleisti keliais skirtingais būdais. Jei norėtume, galėtume tiesiog paleisti jį vietoje iš terminalo, nuotoliniu būdu prisijungdami prie GCP.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Tačiau mes ketiname jį paleisti naudodami „DataFlow“. Tai galime padaryti naudodami toliau pateiktą komandą, nustatydami šiuos būtinus parametrus.
project
– GCP projekto ID.runner
yra dujotiekio bėgikas, kuris išanalizuos jūsų programą ir sukurs jūsų dujotiekį. Norėdami paleisti debesyje, turite nurodyti DataflowRunner.staging_location
— kelias į debesies saugyklą „Cloud Dataflow“, skirtas kodo paketams, reikalingiems darbus atliekantiems procesoriams, indeksuoti.temp_location
— kelias į „Cloud Dataflow“ debesies saugyklą, skirtą laikiniems darbo failams, sukurtiems, kai veikia dujotiekis, saugoti.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Kol ši komanda vykdoma, galime eiti į „Google“ pulto „DataFlow“ skirtuką ir peržiūrėti savo konvejerį. Kai spustelėjame dujotiekį, turėtume pamatyti kažką panašaus į 4 pav. Derinimo tikslais gali būti labai naudinga eiti į žurnalus ir tada į „Stackdriver“, kad peržiūrėtumėte išsamius žurnalus. Tai padėjo man išspręsti vamzdyno problemas daugeliu atvejų.
4 pav. Sijos konvejeris
Pasiekite mūsų duomenis naudodami „BigQuery“.
Taigi, jau turėtume turėti konvejerį, kuriame duomenys patenka į mūsų lentelę. Norėdami tai patikrinti, galime eiti į „BigQuery“ ir peržiūrėti duomenis. Panaudoję toliau pateiktą komandą, turėtumėte pamatyti kelias pirmąsias duomenų rinkinio eilutes. Dabar, kai turime BigQuery saugomus duomenis, galime atlikti tolesnę analizę, dalytis duomenimis su kolegomis ir pradėti atsakyti į verslo klausimus.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
5 pav. „BigQuery“.
išvada
Tikimės, kad šis įrašas bus naudingas srautinio duomenų perdavimo kanalo kūrimo ir būdų, kaip padaryti duomenis prieinamesnius, pavyzdys. Duomenų saugojimas šiuo formatu suteikia mums daug privalumų. Dabar galime pradėti atsakyti į svarbius klausimus, pvz., kiek žmonių naudojasi mūsų produktu? Ar laikui bėgant jūsų vartotojų bazė auga? Su kokiais produkto aspektais žmonės bendrauja dažniausiai? O ar yra klaidų ten, kur jų neturėtų būti? Tai klausimai, kurie bus įdomūs organizacijai. Remdamiesi įžvalgomis, gautomis iš atsakymų į šiuos klausimus, galime patobulinti produktą ir padidinti vartotojų įsitraukimą.
Beam yra tikrai naudinga tokio tipo pratimams ir turi daug kitų įdomių naudojimo atvejų. Pavyzdžiui, galbūt norėsite analizuoti akcijų kainų duomenis realiuoju laiku ir sudaryti sandorius remiantis analize, galbūt turite jutiklių duomenis iš transporto priemonių ir norite apskaičiuoti srauto lygio skaičiavimus. Taip pat galite, pavyzdžiui, būti žaidimų įmone, kuri renka naudotojų duomenis ir naudoja juos prietaisų skydeliams kurti pagrindinei metrikai stebėti. Gerai, ponai, tai yra kito įrašo tema, ačiū, kad skaitėte, o tiems, kurie nori pamatyti visą kodą, žemiau yra nuoroda į mano „GitHub“.
Tai viskas.
Šaltinis: www.habr.com