Halo semua. Kami membagikan terjemahan bagian akhir artikel, yang disiapkan khusus untuk mahasiswa kursus.
Apache Beam dan DataFlow untuk Saluran Pipa Real-Time
Menyiapkan Google Cloud
Catatan: Saya menggunakan Google Cloud Shell untuk menjalankan pipeline dan memublikasikan data log kustom karena saya mengalami masalah dalam menjalankan pipeline dengan Python 3. Google Cloud Shell menggunakan Python 2, yang lebih konsisten dengan Apache Beam.
Untuk memulai pipeline, kita perlu menggali sedikit pengaturannya. Bagi Anda yang belum pernah menggunakan GCP, Anda harus mengikuti 6 langkah berikut yang diuraikan di sini
Setelah ini, kita perlu mengunggah skrip kita ke Google Cloud Storage dan menyalinnya ke Google Cloud Shel. Mengunggah ke penyimpanan cloud cukup sepele (deskripsi dapat ditemukan
Gambar 2
Perintah yang kita perlukan untuk menyalin file dan menginstal perpustakaan yang diperlukan tercantum di bawah ini.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Membuat database dan tabel kami
Setelah kita menyelesaikan semua langkah terkait penyiapan, hal berikutnya yang perlu kita lakukan adalah membuat kumpulan data dan tabel di BigQuery. Ada beberapa cara untuk melakukan hal ini, namun yang paling sederhana adalah menggunakan Google Cloud Console dengan membuat set data terlebih dahulu. Anda dapat mengikuti langkah-langkah di bawah ini
Gambar 3. Tata letak tabel
Menerbitkan data log pengguna
Pub/Sub adalah komponen penting dari pipeline kami karena memungkinkan beberapa aplikasi independen untuk berkomunikasi satu sama lain. Secara khusus, ini berfungsi sebagai perantara yang memungkinkan kita mengirim dan menerima pesan antar aplikasi. Hal pertama yang perlu kita lakukan adalah membuat topik. Cukup buka Pub/Sub di konsol dan klik BUAT TOPIK.
Kode di bawah ini memanggil skrip kita untuk menghasilkan data log yang ditentukan di atas dan kemudian menghubungkan dan mengirimkan log ke Pub/Sub. Satu-satunya hal yang perlu kita lakukan adalah membuat sebuah objek Klien Penerbit, tentukan jalur ke topik menggunakan metode ini topic_path
dan panggil fungsinya publish
Ρ topic_path
dan data. Harap dicatat bahwa kami mengimpor generate_log_line
dari naskah kita stream_logs
, jadi pastikan file-file ini berada di folder yang sama, jika tidak, Anda akan mendapatkan kesalahan impor. Kami kemudian dapat menjalankan ini melalui konsol Google kami menggunakan:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Segera setelah file dijalankan, kita akan dapat melihat output data log ke konsol, seperti yang ditunjukkan pada gambar di bawah ini. Script ini akan berfungsi selama kita tidak menggunakannya CTRL + Cuntuk menyelesaikannya.
Gambar 4. Keluaran publish_logs.py
Menulis kode saluran kami
Sekarang setelah semuanya siap, kita bisa memulai bagian yang menyenangkan - mengkodekan pipeline kita menggunakan Beam dan Python. Untuk membuat pipeline Beam, kita perlu membuat objek pipeline (p). Setelah kita membuat objek pipeline, kita dapat menerapkan beberapa fungsi satu demi satu menggunakan operator pipe (|)
. Secara umum alur kerjanya seperti gambar di bawah ini.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Dalam kode kita, kita akan membuat dua fungsi khusus. Fungsi regex_clean
, yang memindai data dan mengambil baris terkait berdasarkan daftar POLA menggunakan fungsi tersebut re.search
. Fungsi ini mengembalikan string yang dipisahkan koma. Jika Anda bukan ahli ekspresi reguler, saya sarankan untuk memeriksanya datetime
di dalam suatu fungsi untuk membuatnya berfungsi. Saya mendapatkan kesalahan impor di awal file, dan itu aneh. Daftar ini kemudian diteruskan ke fungsi TulisToBigQuery, yang hanya menambahkan data kita ke tabel. Kode untuk Tugas Batch DataFlow dan Tugas Streaming DataFlow diberikan di bawah ini. Satu-satunya perbedaan antara kode batch dan streaming adalah dalam batch kita membaca CSV-nya src_path
menggunakan fungsi tersebut ReadFromText
dari Balok.
Tugas Batch DataFlow (pemrosesan batch)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Tugas Streaming DataFlow (pemrosesan streaming)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Memulai konveyor
Kita dapat menjalankan pipeline dengan beberapa cara berbeda. Jika mau, kami dapat menjalankannya secara lokal dari terminal sambil login ke GCP dari jarak jauh.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Namun, kami akan menjalankannya menggunakan DataFlow. Kita dapat melakukan ini menggunakan perintah di bawah ini dengan mengatur parameter yang diperlukan berikut.
project
β ID proyek GCP Anda.runner
adalah pipeline runner yang akan menganalisis program Anda dan membangun pipeline Anda. Untuk berjalan di cloud, Anda harus menentukan DataflowRunner.staging_location
β jalur ke penyimpanan cloud Cloud Dataflow untuk mengindeks paket kode yang diperlukan oleh prosesor yang melakukan pekerjaan.temp_location
β jalur ke penyimpanan cloud Cloud Dataflow untuk menyimpan file pekerjaan sementara yang dibuat saat pipeline sedang berjalan.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Saat perintah ini berjalan, kita bisa membuka tab DataFlow di konsol Google dan melihat pipeline kita. Saat kita mengklik pipeline, kita akan melihat sesuatu yang mirip dengan Gambar 4. Untuk tujuan debugging, akan sangat membantu jika kita membuka Logs lalu ke Stackdriver untuk melihat log detailnya. Ini telah membantu saya menyelesaikan masalah saluran pipa dalam beberapa kasus.
Gambar 4: Konveyor balok
Akses data kami di BigQuery
Jadi, kita seharusnya sudah menjalankan pipeline dengan data yang mengalir ke tabel kita. Untuk mengujinya, kita bisa membuka BigQuery dan melihat datanya. Setelah menggunakan perintah di bawah ini Anda akan melihat beberapa baris pertama dari kumpulan data. Kini setelah data disimpan di BigQuery, kami dapat melakukan analisis lebih lanjut, serta berbagi data dengan rekan kerja dan mulai menjawab pertanyaan bisnis.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Gambar 5: BigQuery
Kesimpulan
Kami berharap postingan ini dapat menjadi contoh berguna dalam membuat saluran data streaming, serta menemukan cara untuk membuat data lebih mudah diakses. Menyimpan data dalam format ini memberi kita banyak keuntungan. Sekarang kita bisa mulai menjawab pertanyaan penting seperti berapa banyak orang yang menggunakan produk kita? Apakah basis pengguna Anda bertambah seiring waktu? Aspek produk apa yang paling sering berinteraksi dengan orang-orang? Dan apakah ada kesalahan yang tidak seharusnya terjadi? Ini adalah pertanyaan-pertanyaan yang menarik bagi organisasi. Berdasarkan wawasan yang muncul dari jawaban atas pertanyaan-pertanyaan ini, kami dapat meningkatkan produk dan meningkatkan keterlibatan pengguna.
Beam sangat berguna untuk jenis latihan ini dan juga memiliki sejumlah kasus penggunaan menarik lainnya. Misalnya, Anda mungkin ingin menganalisis data tick saham secara real time dan melakukan perdagangan berdasarkan analisis tersebut, mungkin Anda memiliki data sensor yang berasal dari kendaraan dan ingin menghitung penghitungan tingkat lalu lintas. Anda juga bisa, misalnya, menjadi perusahaan game yang mengumpulkan data pengguna dan menggunakannya untuk membuat dasbor guna melacak metrik utama. Oke bapak-bapak, ini topik postingan lain, terima kasih sudah membaca, dan bagi yang ingin melihat kode lengkapnya, di bawah ini link GitHub saya.
Itu saja.
Sumber: www.habr.com