Hai semua. Kami berkongsi terjemahan bahagian akhir artikel, yang disediakan khusus untuk pelajar kursus.
Apache Beam dan DataFlow untuk Talian Paip Masa Nyata
Menyediakan Google Cloud
Nota: Saya menggunakan Google Cloud Shell untuk menjalankan saluran paip dan menerbitkan data log tersuai kerana saya menghadapi masalah menjalankan saluran paip dalam Python 3. Google Cloud Shell menggunakan Python 2, yang lebih konsisten dengan Apache Beam.
Untuk memulakan saluran paip, kita perlu menggali sedikit tetapan. Bagi anda yang belum pernah menggunakan GCP sebelum ini, anda perlu mengikuti 6 langkah berikut yang digariskan dalam perkara ini
Selepas ini, kami perlu memuat naik skrip kami ke Storan Awan Google dan menyalinnya ke Shel Awan Google kami. Memuat naik ke storan awan agak remeh (huraian boleh didapati
Rajah 2
Perintah yang kami perlukan untuk menyalin fail dan memasang perpustakaan yang diperlukan disenaraikan di bawah.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Mencipta pangkalan data dan jadual kami
Setelah kami menyelesaikan semua langkah berkaitan persediaan, perkara seterusnya yang perlu kami lakukan ialah membuat set data dan jadual dalam BigQuery. Terdapat beberapa cara untuk melakukan ini, tetapi yang paling mudah ialah menggunakan konsol Google Cloud dengan membuat set data terlebih dahulu. Anda boleh ikuti langkah di bawah
Rajah 3. Susun atur meja
Menerbitkan data log pengguna
Pub/Sub ialah komponen penting dalam saluran paip kami kerana ia membenarkan berbilang aplikasi bebas untuk berkomunikasi antara satu sama lain. Khususnya, ia berfungsi sebagai perantara yang membolehkan kami menghantar dan menerima mesej antara aplikasi. Perkara pertama yang perlu kita lakukan ialah mencipta topik. Hanya pergi ke Pub/Sub dalam konsol dan klik CREATE TOPIC.
Kod di bawah memanggil skrip kami untuk menjana data log yang ditakrifkan di atas dan kemudian menyambung dan menghantar log ke Pub/Sub. Satu-satunya perkara yang perlu kita lakukan ialah mencipta objek PenerbitKlien, nyatakan laluan ke topik menggunakan kaedah topic_path
dan panggil fungsi publish
Ρ topic_path
dan data. Sila ambil perhatian bahawa kami mengimport generate_log_line
daripada skrip kami stream_logs
, jadi pastikan fail ini berada dalam folder yang sama, jika tidak, anda akan mendapat ralat import. Kami kemudiannya boleh menjalankan ini melalui konsol google kami menggunakan:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Sebaik sahaja fail dijalankan, kami akan dapat melihat output data log ke konsol, seperti yang ditunjukkan dalam rajah di bawah. Skrip ini akan berfungsi selagi kita tidak menggunakannya CTRL + Cuntuk melengkapkannya.
Rajah 4. Output publish_logs.py
Menulis kod saluran paip kami
Memandangkan kami telah menyediakan segala-galanya, kami boleh memulakan bahagian yang menyeronokkan - mengekodkan saluran paip kami menggunakan Beam dan Python. Untuk mencipta saluran paip Beam, kita perlu mencipta objek saluran paip (p). Sebaik sahaja kami telah mencipta objek saluran paip, kami boleh menggunakan berbilang fungsi satu demi satu menggunakan operator pipe (|)
. Secara umum, aliran kerja kelihatan seperti imej di bawah.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Dalam kod kami, kami akan mencipta dua fungsi tersuai. Fungsi regex_clean
, yang mengimbas data dan mendapatkan semula baris yang sepadan berdasarkan senarai PATTERNS menggunakan fungsi re.search
. Fungsi ini mengembalikan rentetan dipisahkan koma. Jika anda bukan pakar ekspresi biasa, saya syorkan anda menyemaknya datetime
di dalam fungsi untuk menjadikannya berfungsi. Saya mendapat ralat import pada permulaan fail, yang pelik. Senarai ini kemudiannya dihantar ke fungsi WriteToBigQuery, yang hanya menambah data kami pada jadual. Kod untuk Batch DataFlow Job dan Streaming DataFlow Job diberikan di bawah. Satu-satunya perbezaan antara kumpulan dan kod penstriman ialah dalam kumpulan kami membaca CSV daripada src_path
menggunakan fungsi tersebut ReadFromText
daripada Beam.
Kerja DataFlow Kelompok (pemprosesan kelompok)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Penstriman DataFlow Job (pemprosesan strim)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Memulakan penghantar
Kita boleh menjalankan saluran paip dalam beberapa cara yang berbeza. Jika kami mahu, kami hanya boleh menjalankannya secara setempat daripada terminal semasa log masuk ke GCP dari jauh.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Walau bagaimanapun, kami akan menjalankannya menggunakan DataFlow. Kita boleh melakukan ini menggunakan arahan di bawah dengan menetapkan parameter yang diperlukan berikut.
project
β ID projek GCP anda.runner
ialah pelari saluran paip yang akan menganalisis program anda dan membina saluran paip anda. Untuk menjalankan dalam awan, anda mesti menentukan DataflowRunner.staging_location
β laluan ke storan awan Cloud Dataflow untuk mengindeks pakej kod yang diperlukan oleh pemproses yang melaksanakan kerja.temp_location
β laluan ke storan awan Cloud Dataflow untuk menyimpan fail kerja sementara yang dibuat semasa saluran paip sedang berjalan.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Semasa arahan ini dijalankan, kita boleh pergi ke tab DataFlow dalam konsol google dan melihat saluran paip kami. Apabila kita mengklik pada saluran paip, kita akan melihat sesuatu yang serupa dengan Rajah 4. Untuk tujuan penyahpepijatan, pergi ke Log dan kemudian ke Stackdriver untuk melihat log terperinci boleh membantu. Ini telah membantu saya menyelesaikan isu saluran paip dalam beberapa kes.
Rajah 4: Penghantar rasuk
Akses data kami dalam BigQuery
Jadi, kita sepatutnya sudah mempunyai saluran paip yang berjalan dengan data mengalir ke dalam jadual kita. Untuk menguji ini, kita boleh pergi ke BigQuery dan melihat data. Selepas menggunakan arahan di bawah, anda akan melihat beberapa baris pertama set data. Memandangkan kami mempunyai data yang disimpan dalam BigQuery, kami boleh menjalankan analisis lanjut serta berkongsi data dengan rakan sekerja dan mula menjawab soalan perniagaan.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Rajah 5: BigQuery
Kesimpulan
Kami berharap siaran ini berfungsi sebagai contoh berguna untuk membuat saluran paip data penstriman, serta mencari cara untuk menjadikan data lebih mudah diakses. Menyimpan data dalam format ini memberi kita banyak kelebihan. Sekarang kita boleh mula menjawab soalan penting seperti berapa ramai orang menggunakan produk kita? Adakah pangkalan pengguna anda berkembang dari semasa ke semasa? Apakah aspek produk yang paling banyak berinteraksi dengan orang ramai? Dan adakah terdapat ralat yang tidak sepatutnya berlaku? Ini adalah soalan-soalan yang akan menarik minat organisasi. Berdasarkan cerapan yang muncul daripada jawapan kepada soalan ini, kami boleh menambah baik produk dan meningkatkan penglibatan pengguna.
Beam sangat berguna untuk jenis senaman ini dan mempunyai beberapa kes penggunaan lain yang menarik juga. Contohnya, anda mungkin ingin menganalisis data tanda saham dalam masa nyata dan membuat dagangan berdasarkan analisis, mungkin anda mempunyai data penderia yang datang daripada kenderaan dan ingin mengira pengiraan tahap trafik. Anda juga boleh, sebagai contoh, menjadi syarikat permainan yang mengumpul data pengguna dan menggunakannya untuk membuat papan pemuka untuk menjejaki metrik utama. Baiklah, tuan-tuan, ini adalah topik untuk jawatan lain, terima kasih kerana membaca, dan bagi mereka yang ingin melihat kod penuh, di bawah adalah pautan ke GitHub saya.
Itu sahaja.
Sumber: www.habr.com