Hello everyone. We're sharing a translation of the final part of this article, prepared specially for students of the course.
Apache Beam and DataFlow for Real-Time Pipelines
Setting up Google Cloud
Note: I used Google Cloud Shell to run the pipeline and publish the custom log data, because I ran into problems running the pipeline under Python 3. Google Cloud Shell uses Python 2, which plays more nicely with Apache Beam.
To get the pipeline started, we need to dig a little into the setup. For those of you who haven't used GCP before, you'll need to follow the 6 steps below.
After that, we need to upload our scripts to Google Cloud Storage and copy them into Google Cloud Shell. Uploading to cloud storage is fairly trivial (a description can be found here).
Figure 2
The commands needed to copy the files and install the required libraries are listed below.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creating our dataset and table
Once we've completed all the setup-related steps, the next thing we need to do is create a dataset and table in BigQuery. There are several ways to do this, but the easiest is to use the Google Cloud console and create the dataset first. You can follow the steps below.
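The same comma-separated schema string we'll later pass to Beam's WriteToBigQuery describes the table's columns. As a small illustrative sketch (the helper name is my own, not part of the article's code), here is how that string maps to field name/type pairs, which is handy if you'd rather create the table programmatically:

```python
# Hypothetical helper: split Beam's "name:TYPE, name:TYPE" schema string
# into (name, type) pairs.
def parse_schema(schema_str):
    fields = []
    for part in schema_str.split(","):
        name, field_type = part.strip().split(":")
        fields.append((name, field_type))
    return fields

schema = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')

for name, ftype in parse_schema(schema):
    print(name, ftype)
```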
Figure 3. Table layout
Publishing user log data
Pub/Sub is a critical component of our pipeline because it allows multiple independent applications to communicate with each other. In particular, it works as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
The code below calls our script to generate the log data defined above, then connects to Pub/Sub and sends the logs there. All we need to do is create a PublisherClient object, set the path to the topic using the topic_path method, and call the publish function with the topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure those files are in the same folder, otherwise you'll get an import error. We can then run this from the Google console using:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time

PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())

if __name__ == '__main__':
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
As soon as the file runs, we'll see log data being output to the console, as shown in the figure below. This script will keep working as long as we don't use CTRL+C to terminate it.
Figure 4. publish_logs.py output
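The publisher script imports generate_log_line from stream_logs, which in the repo is built on Faker. As a minimal, dependency-free stand-in (the exact line format is my assumption, inferred from the regex patterns used later in the pipeline), it might look something like this:

```python
import random
import time

def generate_log_line():
    # Stand-in for the Faker-based generator in stream_logs.py:
    # emits an Apache/nginx-style log line (assumed format).
    ip = ".".join(str(random.randint(1, 254)) for _ in range(4))
    timestamp = time.strftime("%d/%b/%Y:%H:%M:%S")
    endpoint = random.choice(["/", "/login", "/explore"])
    status = random.choice(["200", "404", "500"])
    size = str(random.randint(200, 5000))
    return '{} - - [{}] "GET {} HTTP/1.0" {} {} "-" "Mozilla/5.0"'.format(
        ip, timestamp, endpoint, status, size)

print(generate_log_line())
```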
Writing our pipeline code
Now that we've got everything set up, we can start on the fun part: coding the pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once the pipeline object is created, we can apply multiple functions one after another using the pipe operator (|). In general, the workflow looks like the picture below.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
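To get a feel for how the pipe operator chains transforms, here is a toy, Beam-free illustration in plain Python (the class is purely hypothetical and only mimics the chaining idea, not real PCollection semantics):

```python
class ToyPCollection:
    """Toy stand-in for a PCollection: `|` applies a transform."""
    def __init__(self, items):
        self.items = list(items)

    def __or__(self, transform):
        # Each transform is just a function from a list to a list.
        return ToyPCollection(transform(self.items))

lines = ToyPCollection(["10,200", "20,404"])
result = (lines
          | (lambda xs: [x.split(",") for x in xs])        # parse
          | (lambda xs: [r for r in xs if r[1] == "200"])  # filter
          | (lambda xs: [int(r[0]) for r in xs]))          # project
print(result.items)  # [10]
```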
In our code, we create two custom functions. The regex_clean function scans the data and extracts the pieces matching the PATTERNS list using re.search, returning a comma-separated string. If you're not a regular-expression expert, I recommend brushing up on regex basics. The second function, Split, is a DoFn that splits the comma-separated string and uses datetime to re-format the timestamp; the datetime import lives inside the function because I got an import error at the top of the file, which was strange. The resulting list is then passed to WriteToBigQuery, which simply appends our data to the table. The code for the Batch DataFlow job and the Streaming DataFlow job is given below. The only difference between the batch and streaming code is that in the batch version we read the CSV from src_path using Beam's ReadFromText function.
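As a quick sanity check of the two ideas just described (re.search extraction and datetime re-formatting inside Split), here is a small self-contained example; the sample log line is invented for illustration:

```python
import re
from datetime import datetime

line = '127.0.0.1 - - [14/May/2019:10:00:00] "GET /explore HTTP/1.0" 200 343'

# Extract the bracketed timestamp the same way regex_clean's second
# pattern does (lookbehind for '[' and lookahead for ']').
timestamp = re.search(r'(?<=\[).+?(?=\])', line).group()
print(timestamp)  # 14/May/2019:10:00:00

# Re-format it exactly as the Split DoFn does.
d = datetime.strptime(timestamp, "%d/%b/%Y:%H:%M:%S")
print(d.strftime("%Y-%m-%d %H:%M:%S"))  # 2019-05-14 10:00:00
```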
Batch DataFlow job (batch processing)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"

def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):
    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main():
    p = beam.Pipeline(options=PipelineOptions())
    (p
        | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
        | "clean address" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Streaming DataFlow job (stream processing)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re

PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"

def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):
    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions())
    (p
        | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
        | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
        | "Clean Data" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Starting the pipeline
We can run the pipeline in several ways. If we wanted, we could simply run it locally from the terminal while logged in to GCP remotely.
python main_pipeline_stream.py \
    --input_topic "projects/user-logs-237110/topics/userlogs" \
    --streaming
However, we're going to run it with DataFlow. We can do that with the command below, setting the following required parameters.
project - your GCP project ID.
runner - the pipeline runner that will analyze your program and construct the pipeline. To run in the cloud, you must specify DataflowRunner.
staging_location - a Cloud Dataflow cloud-storage path for staging the code packages needed by the workers performing the job.
temp_location - a Cloud Dataflow cloud-storage path for storing temporary job files created while the pipeline runs.
streaming - the flag that tells Beam to run the pipeline in streaming mode.
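The pipeline's main() relies on argparse.parse_known_args, which splits our own flags from the ones Beam itself consumes (like --runner and --streaming). A Beam-free sketch of that behavior, using flag values from this walkthrough:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")

argv = ["--input_topic", "projects/user-logs-237110/topics/userlogs",
        "--runner", "DataflowRunner", "--streaming"]

# known_args receives our flags; the leftover list would normally be
# handed to Beam's PipelineOptions.
known_args, beam_args = parser.parse_known_args(argv)
print(known_args.input_topic)
print(beam_args)  # ['--runner', 'DataflowRunner', '--streaming']
```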
python main_pipeline_stream.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --temp_location $BUCKET/tmp \
    --staging_location $BUCKET/staging \
    --streaming
While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. When we click into the pipeline, we should see something similar to Figure 4. For debugging purposes, it can be very useful to go to Logs and then Stackdriver to view detailed logs. This has helped me solve pipeline problems in a number of cases.
Figure 4: Beam pipeline
Accessing our data in BigQuery
So, we should now have a running pipeline with data flowing into our table. To test this, we can go to BigQuery and look at the data. After running the command below, you should see the first few rows of the dataset. Now that we have our data stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5: BigQuery
Conclusion
Hopefully this post has served as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this format gives us many advantages. Now we can start answering important questions like: how many people use our product? Is the user base growing over time? What aspects of the product do people like most? And are there errors where there shouldn't be any? These are the questions that will be of interest to an organization. Based on the insights that emerge from the answers, we can improve the product and increase user engagement.
Beam is really useful for this type of exercise, and it has a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on the analysis, or perhaps you have sensor data coming in from vehicles and want to compute traffic-level calculations. You could also, say, be a gaming company that collects user data and uses it to build dashboards tracking key metrics. Okay, folks, that's a topic for another post. Thanks for reading, and for those who want to see the full code, below is the link to the GitHub repo.
That's really all.
Source: www.habr.com