Sveiki visiem. Dalāmies ar raksta beigu daļas tulkojumu, kas sagatavots īpaši kursa studentiem.
Apache Beam un DataFlow reāllaika cauruļvadiem
Google mākoņa iestatīšana
Piezīme. Es izmantoju Google Cloud Shell, lai palaistu konveijeru un publicētu pielāgotus žurnāla datus, jo man radās problēmas ar cauruļvada palaišanu programmā Python 3. Google Cloud Shell izmanto Python 2, kas vairāk atbilst Apache Beam.
Lai sāktu cauruļvadu, mums ir nedaudz jāiedziļinās iestatījumos. Tiem no jums, kuri iepriekš nav izmantojuši GSP, jums būs jāveic tālāk norādītās 6 darbības, kas aprakstītas šajā sadaļā
Pēc tam mums būs jāaugšupielādē savi skripti pakalpojumā Google Cloud Storage un jāpārkopē uz mūsu Google Cloud Shel. Augšupielāde mākoņkrātuvē ir diezgan triviāla (aprakstu var atrast
Skaitlis 2
Tālāk ir norādītas komandas, kas mums vajadzīgas, lai kopētu failus un instalētu nepieciešamās bibliotēkas.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Mūsu datu bāzes un tabulas izveide
Kad esam pabeiguši visas ar iestatīšanu saistītās darbības, nākamā lieta, kas mums jādara, ir BigQuery izveidot datu kopu un tabulu. Ir vairāki veidi, kā to izdarīt, taču vienkāršākais ir izmantot Google Cloud konsoli, vispirms izveidojot datu kopu. Varat veikt tālāk norādītās darbības
3. attēls. Tabulas izkārtojums
Lietotāju žurnāla datu publicēšana
Pub/Sub ir būtiska mūsu konveijera sastāvdaļa, jo tā ļauj vairākām neatkarīgām lietojumprogrammām sazināties savā starpā. Jo īpaši tas darbojas kā starpnieks, kas ļauj mums nosūtīt un saņemt ziņojumus starp lietojumprogrammām. Pirmā lieta, kas mums jādara, ir izveidot tēmu. Vienkārši dodieties uz Pub/Sub konsolē un noklikšķiniet uz IZVEIDOT TĒMU.
Tālāk norādītais kods izsauc mūsu skriptu, lai ģenerētu iepriekš definētos žurnāla datus, un pēc tam izveido savienojumu un nosūta žurnālus uz Pub/Sub. Vienīgais, kas mums jādara, ir izveidot objektu PublisherClient, norādiet ceļu uz tēmu, izmantojot metodi topic_path
un izsauciet funkciju publish
с topic_path
un dati. Lūdzu, ņemiet vērā, ka mēs importējam generate_log_line
no mūsu skripta stream_logs
, tāpēc pārliecinieties, vai šie faili atrodas vienā mapē, pretējā gadījumā tiks parādīta importēšanas kļūda. Pēc tam mēs to varam palaist, izmantojot mūsu Google konsoli, izmantojot:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Tiklīdz fails tiks palaists, mēs varēsim redzēt žurnāla datu izvadi konsolei, kā parādīts attēlā zemāk. Šis skripts darbosies tik ilgi, kamēr mēs to neizmantosim CTRL + Clai to pabeigtu.
4. attēls. Izvade publish_logs.py
Rakstām mūsu cauruļvada kodu
Tagad, kad viss ir sagatavots, varam sākt jautro daļu — mūsu konveijera kodēšanu, izmantojot Beam un Python. Lai izveidotu Beam cauruļvadu, mums ir jāizveido konveijera objekts (p). Kad esam izveidojuši konveijera objektu, mēs varam lietot vairākas funkcijas vienu pēc otras, izmantojot operatoru pipe (|)
. Kopumā darbplūsma izskatās kā attēlā zemāk.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Mūsu kodā mēs izveidosim divas pielāgotas funkcijas. Funkcija regex_clean
, kas skenē datus un izgūst atbilstošo rindu, pamatojoties uz RAKSTU sarakstu, izmantojot funkciju re.search
. Funkcija atgriež ar komatu atdalītu virkni. Ja neesat regulāro izteiksmju eksperts, iesaku to pārbaudīt datetime
funkcijā, lai tā darbotos. Faila sākumā es saņēmu importēšanas kļūdu, kas bija dīvaini. Pēc tam šis saraksts tiek nodots funkcijai WriteToBigQuery, kas vienkārši pievieno mūsu datus tabulai. Batch DataFlow Job un Streaming DataFlow Job kods ir norādīts tālāk. Vienīgā atšķirība starp pakešu un straumēšanas kodu ir tā, ka paketē mēs lasām CSV no src_path
izmantojot funkciju ReadFromText
no Beam.
Pakešu datu plūsmas darbs (pakešu apstrāde)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Straumēšanas DataFlow darbs (straumes apstrāde)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Konveijera palaišana
Mēs varam vadīt cauruļvadu vairākos dažādos veidos. Ja mēs vēlamies, mēs varētu to vienkārši palaist lokāli no termināļa, vienlaikus attālināti piesakoties GCP.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Tomēr mēs to darbosim, izmantojot DataFlow. Mēs to varam izdarīt, izmantojot tālāk norādīto komandu, iestatot šādus nepieciešamos parametrus.
project
— jūsu GCP projekta ID.runner
ir cauruļvadu palaišanas programma, kas analizēs jūsu programmu un izveidos jūsu cauruļvadu. Lai palaistu mākonī, ir jānorāda DataflowRunner.staging_location
— ceļš uz Cloud Dataflow mākoņkrātuvi, lai indeksētu koda pakotnes, kas nepieciešamas procesoriem, kas veic darbu.temp_location
— ceļš uz mākoņa datu plūsmas mākoņkrātuvi pagaidu darba failu glabāšanai, kas izveidoti, kamēr darbojas konveijers.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Kamēr šī komanda darbojas, mēs varam doties uz cilni DataFlow Google konsolē un skatīt mūsu konveijeru. Kad mēs noklikšķinām uz konveijera, mums vajadzētu redzēt kaut ko līdzīgu 4. attēlā. Atkļūdošanas nolūkos var būt ļoti noderīgi doties uz žurnālus un pēc tam uz Stackdriver, lai skatītu detalizētus žurnālus. Tas man vairākos gadījumos ir palīdzējis atrisināt cauruļvada problēmas.
4. attēls. Siju konveijers
Piekļūstiet mūsu datiem pakalpojumā BigQuery
Tātad mums jau vajadzētu darboties konveijeram, kurā dati ieplūst mūsu tabulā. Lai to pārbaudītu, mēs varam doties uz BigQuery un apskatīt datus. Pēc tālāk norādītās komandas izmantošanas jums vajadzētu redzēt dažas pirmās datu kopas rindas. Tagad, kad BigQuery ir glabāti dati, mēs varam veikt turpmāku analīzi, kā arī kopīgot datus ar kolēģiem un sākt atbildēt uz biznesa jautājumiem.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
5. attēls: BigQuery
Secinājums
Mēs ceram, ka šī ziņa kalpos kā noderīgs piemērs straumēšanas datu cauruļvada izveidei, kā arī veidu atrašanai, kā padarīt datus pieejamākus. Datu glabāšana šajā formātā sniedz mums daudzas priekšrocības. Tagad mēs varam sākt atbildēt uz svarīgiem jautājumiem, piemēram, cik cilvēku izmanto mūsu produktu? Vai jūsu lietotāju bāze laika gaitā pieaug? Ar kādiem produkta aspektiem cilvēki mijiedarbojas visvairāk? Un vai ir kļūdas tur, kur tām nevajadzētu būt? Šie ir jautājumi, kas interesēs organizāciju. Pamatojoties uz ieskatiem, kas izriet no atbildēm uz šiem jautājumiem, mēs varam uzlabot produktu un palielināt lietotāju iesaisti.
Beam ir patiešām noderīgs šāda veida vingrinājumiem, un tam ir arī vairāki citi interesanti lietošanas gadījumi. Piemēram, iespējams, vēlēsities analizēt akciju zīmju datus reāllaikā un veikt darījumus, pamatojoties uz analīzi, iespējams, jums ir sensoru dati, kas nāk no transportlīdzekļiem un vēlaties aprēķināt satiksmes līmeņa aprēķinus. Varat arī, piemēram, būt spēļu uzņēmums, kas apkopo lietotāju datus un izmanto tos, lai izveidotu informācijas paneļus, lai izsekotu galvenos rādītājus. Labi, kungi, šī ir tēma citam ierakstam, paldies, ka izlasījāt, un tiem, kas vēlas redzēt pilnu kodu, zemāk ir saite uz manu GitHub.
Tas ir viss.
Avots: www.habr.com