Kami membuat jalur pemrosesan data aliran. Bagian 2

Halo semua. Kami membagikan terjemahan bagian akhir artikel, yang disiapkan khusus untuk mahasiswa kursus. Insinyur Data. Anda dapat membaca bagian pertama di sini.

Apache Beam dan DataFlow untuk Saluran Pipa Real-Time

Kami membuat jalur pemrosesan data aliran. Bagian 2

Menyiapkan Google Cloud

Catatan: Saya menggunakan Google Cloud Shell untuk menjalankan pipeline dan memublikasikan data log kustom karena saya mengalami masalah dalam menjalankan pipeline dengan Python 3. Google Cloud Shell menggunakan Python 2, yang lebih konsisten dengan Apache Beam.

Untuk memulai pipeline, kita perlu menggali sedikit pengaturannya. Bagi Anda yang belum pernah menggunakan GCP, Anda harus mengikuti 6 langkah berikut yang diuraikan di sini halaman.

Setelah ini, kita perlu mengunggah skrip kita ke Google Cloud Storage dan menyalinnya ke Google Cloud Shel. Mengunggah ke penyimpanan cloud cukup sepele (deskripsi dapat ditemukan di sini). Untuk menyalin file kita, kita dapat membuka Google Cloud Shel dari toolbar dengan mengklik ikon pertama di sebelah kiri pada Gambar 2 di bawah.

Kami membuat jalur pemrosesan data aliran. Bagian 2
Gambar 2

Perintah yang kita perlukan untuk menyalin file dan menginstal perpustakaan yang diperlukan tercantum di bawah ini.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Membuat database dan tabel kami

Setelah kita menyelesaikan semua langkah terkait penyiapan, hal berikutnya yang perlu kita lakukan adalah membuat kumpulan data dan tabel di BigQuery. Ada beberapa cara untuk melakukan hal ini, namun yang paling sederhana adalah menggunakan Google Cloud Console dengan membuat set data terlebih dahulu. Anda dapat mengikuti langkah-langkah di bawah ini linkuntuk membuat tabel dengan skema. Meja kami akan memilikinya 7 kolom, sesuai dengan komponen setiap log pengguna. Untuk kenyamanan, kita akan mendefinisikan semua kolom sebagai string, kecuali variabel timelocal, dan menamainya sesuai dengan variabel yang kita buat sebelumnya. Tata letak tabel kita akan terlihat seperti pada Gambar 3.

Kami membuat jalur pemrosesan data aliran. Bagian 2
Gambar 3. Tata letak tabel

Menerbitkan data log pengguna

Pub/Sub adalah komponen penting dari pipeline kami karena memungkinkan beberapa aplikasi independen untuk berkomunikasi satu sama lain. Secara khusus, ini berfungsi sebagai perantara yang memungkinkan kita mengirim dan menerima pesan antar aplikasi. Hal pertama yang perlu kita lakukan adalah membuat topik. Cukup buka Pub/Sub di konsol dan klik BUAT TOPIK.

Kode di bawah ini memanggil skrip kita untuk menghasilkan data log yang ditentukan di atas dan kemudian menghubungkan dan mengirimkan log ke Pub/Sub. Satu-satunya hal yang perlu kita lakukan adalah membuat sebuah objek Klien Penerbit, tentukan jalur ke topik menggunakan metode ini topic_path dan panggil fungsinya publish с topic_path dan data. Harap dicatat bahwa kami mengimpor generate_log_line dari naskah kita stream_logs, jadi pastikan file-file ini berada di folder yang sama, jika tidak, Anda akan mendapatkan kesalahan impor. Kami kemudian dapat menjalankan ini melalui konsol Google kami menggunakan:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Segera setelah file dijalankan, kita akan dapat melihat output data log ke konsol, seperti yang ditunjukkan pada gambar di bawah ini. Script ini akan berfungsi selama kita tidak menggunakannya CTRL + Cuntuk menyelesaikannya.

Kami membuat jalur pemrosesan data aliran. Bagian 2
Gambar 4. Keluaran publish_logs.py

Menulis kode saluran kami

Sekarang setelah semuanya siap, kita bisa memulai bagian yang menyenangkan - mengkodekan pipeline kita menggunakan Beam dan Python. Untuk membuat pipeline Beam, kita perlu membuat objek pipeline (p). Setelah kita membuat objek pipeline, kita dapat menerapkan beberapa fungsi satu demi satu menggunakan operator pipe (|). Secara umum alur kerjanya seperti gambar di bawah ini.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Dalam kode kita, kita akan membuat dua fungsi khusus. Fungsi regex_clean, yang memindai data dan mengambil baris terkait berdasarkan daftar POLA menggunakan fungsi tersebut re.search. Fungsi ini mengembalikan string yang dipisahkan koma. Jika Anda bukan ahli ekspresi reguler, saya sarankan untuk memeriksanya tutorial dan berlatih di notepad untuk memeriksa kodenya. Setelah ini kita mendefinisikan fungsi ParDo khusus yang disebut Split, yang merupakan variasi dari Transformasi balok untuk pemrosesan paralel. Dalam Python, ini dilakukan dengan cara khusus - kita harus membuat kelas yang mewarisi kelas DoFn Beam. Fungsi Split mengambil baris yang diurai dari fungsi sebelumnya dan mengembalikan daftar kamus dengan kunci yang sesuai dengan nama kolom di tabel BigQuery kami. Ada sesuatu yang perlu diperhatikan tentang fungsi ini: Saya harus mengimpor datetime di dalam suatu fungsi untuk membuatnya berfungsi. Saya mendapatkan kesalahan impor di awal file, dan itu aneh. Daftar ini kemudian diteruskan ke fungsi TulisToBigQuery, yang hanya menambahkan data kita ke tabel. Kode untuk Tugas Batch DataFlow dan Tugas Streaming DataFlow diberikan di bawah ini. Satu-satunya perbedaan antara kode batch dan streaming adalah dalam batch kita membaca CSV-nya src_pathmenggunakan fungsi tersebut ReadFromText dari Balok.

Tugas Batch DataFlow (pemrosesan batch)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Tugas Streaming DataFlow (pemrosesan streaming)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Memulai konveyor

Kita dapat menjalankan pipeline dengan beberapa cara berbeda. Jika mau, kami dapat menjalankannya secara lokal dari terminal sambil login ke GCP dari jarak jauh.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Namun, kami akan menjalankannya menggunakan DataFlow. Kita dapat melakukan ini menggunakan perintah di bawah ini dengan mengatur parameter yang diperlukan berikut.

  • project β€” ID proyek GCP Anda.
  • runner adalah pipeline runner yang akan menganalisis program Anda dan membangun pipeline Anda. Untuk berjalan di cloud, Anda harus menentukan DataflowRunner.
  • staging_location β€” jalur ke penyimpanan cloud Cloud Dataflow untuk mengindeks paket kode yang diperlukan oleh prosesor yang melakukan pekerjaan.
  • temp_location β€” jalur ke penyimpanan cloud Cloud Dataflow untuk menyimpan file pekerjaan sementara yang dibuat saat pipeline sedang berjalan.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Saat perintah ini berjalan, kita bisa membuka tab DataFlow di konsol Google dan melihat pipeline kita. Saat kita mengklik pipeline, kita akan melihat sesuatu yang mirip dengan Gambar 4. Untuk tujuan debugging, akan sangat membantu jika kita membuka Logs lalu ke Stackdriver untuk melihat log detailnya. Ini telah membantu saya menyelesaikan masalah saluran pipa dalam beberapa kasus.

Kami membuat jalur pemrosesan data aliran. Bagian 2
Gambar 4: Konveyor balok

Akses data kami di BigQuery

Jadi, kita seharusnya sudah menjalankan pipeline dengan data yang mengalir ke tabel kita. Untuk mengujinya, kita bisa membuka BigQuery dan melihat datanya. Setelah menggunakan perintah di bawah ini Anda akan melihat beberapa baris pertama dari kumpulan data. Kini setelah data disimpan di BigQuery, kami dapat melakukan analisis lebih lanjut, serta berbagi data dengan rekan kerja dan mulai menjawab pertanyaan bisnis.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Kami membuat jalur pemrosesan data aliran. Bagian 2
Gambar 5: BigQuery

Kesimpulan

Kami berharap postingan ini dapat menjadi contoh berguna dalam membuat saluran data streaming, serta menemukan cara untuk membuat data lebih mudah diakses. Menyimpan data dalam format ini memberi kita banyak keuntungan. Sekarang kita bisa mulai menjawab pertanyaan penting seperti berapa banyak orang yang menggunakan produk kita? Apakah basis pengguna Anda bertambah seiring waktu? Aspek produk apa yang paling sering berinteraksi dengan orang-orang? Dan apakah ada kesalahan yang tidak seharusnya terjadi? Ini adalah pertanyaan-pertanyaan yang menarik bagi organisasi. Berdasarkan wawasan yang muncul dari jawaban atas pertanyaan-pertanyaan ini, kami dapat meningkatkan produk dan meningkatkan keterlibatan pengguna.

Beam sangat berguna untuk jenis latihan ini dan juga memiliki sejumlah kasus penggunaan menarik lainnya. Misalnya, Anda mungkin ingin menganalisis data tick saham secara real time dan melakukan perdagangan berdasarkan analisis tersebut, mungkin Anda memiliki data sensor yang berasal dari kendaraan dan ingin menghitung penghitungan tingkat lalu lintas. Anda juga bisa, misalnya, menjadi perusahaan game yang mengumpulkan data pengguna dan menggunakannya untuk membuat dasbor guna melacak metrik utama. Oke bapak-bapak, ini topik postingan lain, terima kasih sudah membaca, dan bagi yang ingin melihat kode lengkapnya, di bawah ini link GitHub saya.

https://github.com/DFoly/User_log_pipeline

Itu saja. Baca bagian satu.

Sumber: www.habr.com

Tambah komentar