Kami mencipta saluran pemprosesan data aliran. Bahagian 2

Hai semua. Kami berkongsi terjemahan bahagian akhir artikel, yang disediakan khusus untuk pelajar kursus. Jurutera Data. Anda boleh membaca bahagian pertama di sini.

Apache Beam dan DataFlow untuk Talian Paip Masa Nyata

Kami mencipta saluran pemprosesan data aliran. Bahagian 2

Menyediakan Google Cloud

Nota: Saya menggunakan Google Cloud Shell untuk menjalankan saluran paip dan menerbitkan data log tersuai kerana saya menghadapi masalah menjalankan saluran paip dalam Python 3. Google Cloud Shell menggunakan Python 2, yang lebih konsisten dengan Apache Beam.

Untuk memulakan saluran paip, kita perlu menggali sedikit tetapan. Bagi anda yang belum pernah menggunakan GCP sebelum ini, anda perlu mengikuti 6 langkah berikut yang digariskan dalam perkara ini Laman.

Selepas ini, kami perlu memuat naik skrip kami ke Storan Awan Google dan menyalinnya ke Shel Awan Google kami. Memuat naik ke storan awan agak remeh (huraian boleh didapati di sini). Untuk menyalin fail kami, kami boleh membuka Google Cloud Shel dari bar alat dengan mengklik ikon pertama di sebelah kiri dalam Rajah 2 di bawah.

Kami mencipta saluran pemprosesan data aliran. Bahagian 2
Rajah 2

Perintah yang kami perlukan untuk menyalin fail dan memasang perpustakaan yang diperlukan disenaraikan di bawah.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Mencipta pangkalan data dan jadual kami

Setelah kami menyelesaikan semua langkah berkaitan persediaan, perkara seterusnya yang perlu kami lakukan ialah membuat set data dan jadual dalam BigQuery. Terdapat beberapa cara untuk melakukan ini, tetapi yang paling mudah ialah menggunakan konsol Google Cloud dengan membuat set data terlebih dahulu. Anda boleh ikuti langkah di bawah pautanuntuk membuat jadual dengan skema. Meja kami akan ada 7 lajur, sepadan dengan komponen setiap log pengguna. Untuk kemudahan, kami akan mentakrifkan semua lajur sebagai rentetan, kecuali pembolehubah timelocal, dan namakannya mengikut pembolehubah yang kami hasilkan sebelum ini. Susun atur jadual kami sepatutnya kelihatan seperti dalam Rajah 3.

Kami mencipta saluran pemprosesan data aliran. Bahagian 2
Rajah 3. Susun atur meja

Menerbitkan data log pengguna

Pub/Sub ialah komponen penting dalam saluran paip kami kerana ia membenarkan berbilang aplikasi bebas untuk berkomunikasi antara satu sama lain. Khususnya, ia berfungsi sebagai perantara yang membolehkan kami menghantar dan menerima mesej antara aplikasi. Perkara pertama yang perlu kita lakukan ialah mencipta topik. Hanya pergi ke Pub/Sub dalam konsol dan klik CREATE TOPIC.

Kod di bawah memanggil skrip kami untuk menjana data log yang ditakrifkan di atas dan kemudian menyambung dan menghantar log ke Pub/Sub. Satu-satunya perkara yang perlu kita lakukan ialah mencipta objek PenerbitKlien, nyatakan laluan ke topik menggunakan kaedah topic_path dan panggil fungsi publish с topic_path dan data. Sila ambil perhatian bahawa kami mengimport generate_log_line daripada skrip kami stream_logs, jadi pastikan fail ini berada dalam folder yang sama, jika tidak, anda akan mendapat ralat import. Kami kemudiannya boleh menjalankan ini melalui konsol google kami menggunakan:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Sebaik sahaja fail dijalankan, kami akan dapat melihat output data log ke konsol, seperti yang ditunjukkan dalam rajah di bawah. Skrip ini akan berfungsi selagi kita tidak menggunakannya CTRL + Cuntuk melengkapkannya.

Kami mencipta saluran pemprosesan data aliran. Bahagian 2
Rajah 4. Output publish_logs.py

Menulis kod saluran paip kami

Memandangkan kami telah menyediakan segala-galanya, kami boleh memulakan bahagian yang menyeronokkan - mengekodkan saluran paip kami menggunakan Beam dan Python. Untuk mencipta saluran paip Beam, kita perlu mencipta objek saluran paip (p). Sebaik sahaja kami telah mencipta objek saluran paip, kami boleh menggunakan berbilang fungsi satu demi satu menggunakan operator pipe (|). Secara umum, aliran kerja kelihatan seperti imej di bawah.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Dalam kod kami, kami akan mencipta dua fungsi tersuai. Fungsi regex_clean, yang mengimbas data dan mendapatkan semula baris yang sepadan berdasarkan senarai PATTERNS menggunakan fungsi re.search. Fungsi ini mengembalikan rentetan dipisahkan koma. Jika anda bukan pakar ekspresi biasa, saya syorkan anda menyemaknya tutorial dan berlatih dalam pad nota untuk menyemak kod. Selepas ini kami mentakrifkan fungsi ParDo tersuai yang dipanggil Split, yang merupakan variasi transformasi Rasuk untuk pemprosesan selari. Dalam Python, ini dilakukan dengan cara yang istimewa - kita mesti mencipta kelas yang mewarisi daripada kelas DoFn Beam. Fungsi Split mengambil baris yang dihuraikan daripada fungsi sebelumnya dan mengembalikan senarai kamus dengan kunci yang sepadan dengan nama lajur dalam jadual BigQuery kami. Terdapat sesuatu yang perlu diperhatikan tentang fungsi ini: Saya terpaksa mengimport datetime di dalam fungsi untuk menjadikannya berfungsi. Saya mendapat ralat import pada permulaan fail, yang pelik. Senarai ini kemudiannya dihantar ke fungsi WriteToBigQuery, yang hanya menambah data kami pada jadual. Kod untuk Batch DataFlow Job dan Streaming DataFlow Job diberikan di bawah. Satu-satunya perbezaan antara kumpulan dan kod penstriman ialah dalam kumpulan kami membaca CSV daripada src_pathmenggunakan fungsi tersebut ReadFromText daripada Beam.

Kerja DataFlow Kelompok (pemprosesan kelompok)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Penstriman DataFlow Job (pemprosesan strim)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Memulakan penghantar

Kita boleh menjalankan saluran paip dalam beberapa cara yang berbeza. Jika kami mahu, kami hanya boleh menjalankannya secara setempat daripada terminal semasa log masuk ke GCP dari jauh.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Walau bagaimanapun, kami akan menjalankannya menggunakan DataFlow. Kita boleh melakukan ini menggunakan arahan di bawah dengan menetapkan parameter yang diperlukan berikut.

  • project β€” ID projek GCP anda.
  • runner ialah pelari saluran paip yang akan menganalisis program anda dan membina saluran paip anda. Untuk menjalankan dalam awan, anda mesti menentukan DataflowRunner.
  • staging_location β€” laluan ke storan awan Cloud Dataflow untuk mengindeks pakej kod yang diperlukan oleh pemproses yang melaksanakan kerja.
  • temp_location β€” laluan ke storan awan Cloud Dataflow untuk menyimpan fail kerja sementara yang dibuat semasa saluran paip sedang berjalan.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Semasa arahan ini dijalankan, kita boleh pergi ke tab DataFlow dalam konsol google dan melihat saluran paip kami. Apabila kita mengklik pada saluran paip, kita akan melihat sesuatu yang serupa dengan Rajah 4. Untuk tujuan penyahpepijatan, pergi ke Log dan kemudian ke Stackdriver untuk melihat log terperinci boleh membantu. Ini telah membantu saya menyelesaikan isu saluran paip dalam beberapa kes.

Kami mencipta saluran pemprosesan data aliran. Bahagian 2
Rajah 4: Penghantar rasuk

Akses data kami dalam BigQuery

Jadi, kita sepatutnya sudah mempunyai saluran paip yang berjalan dengan data mengalir ke dalam jadual kita. Untuk menguji ini, kita boleh pergi ke BigQuery dan melihat data. Selepas menggunakan arahan di bawah, anda akan melihat beberapa baris pertama set data. Memandangkan kami mempunyai data yang disimpan dalam BigQuery, kami boleh menjalankan analisis lanjut serta berkongsi data dengan rakan sekerja dan mula menjawab soalan perniagaan.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Kami mencipta saluran pemprosesan data aliran. Bahagian 2
Rajah 5: BigQuery

Kesimpulan

Kami berharap siaran ini berfungsi sebagai contoh berguna untuk membuat saluran paip data penstriman, serta mencari cara untuk menjadikan data lebih mudah diakses. Menyimpan data dalam format ini memberi kita banyak kelebihan. Sekarang kita boleh mula menjawab soalan penting seperti berapa ramai orang menggunakan produk kita? Adakah pangkalan pengguna anda berkembang dari semasa ke semasa? Apakah aspek produk yang paling banyak berinteraksi dengan orang ramai? Dan adakah terdapat ralat yang tidak sepatutnya berlaku? Ini adalah soalan-soalan yang akan menarik minat organisasi. Berdasarkan cerapan yang muncul daripada jawapan kepada soalan ini, kami boleh menambah baik produk dan meningkatkan penglibatan pengguna.

Beam sangat berguna untuk jenis senaman ini dan mempunyai beberapa kes penggunaan lain yang menarik juga. Contohnya, anda mungkin ingin menganalisis data tanda saham dalam masa nyata dan membuat dagangan berdasarkan analisis, mungkin anda mempunyai data penderia yang datang daripada kenderaan dan ingin mengira pengiraan tahap trafik. Anda juga boleh, sebagai contoh, menjadi syarikat permainan yang mengumpul data pengguna dan menggunakannya untuk membuat papan pemuka untuk menjejaki metrik utama. Baiklah, tuan-tuan, ini adalah topik untuk jawatan lain, terima kasih kerana membaca, dan bagi mereka yang ingin melihat kod penuh, di bawah adalah pautan ke GitHub saya.

https://github.com/DFoly/User_log_pipeline

Itu sahaja. Baca bahagian satu.

Sumber: www.habr.com

Tambah komen