Building a streaming data processing pipeline. Part 2

Hello everyone. We are sharing a translation of the final part of this article, prepared specifically for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for Real-Time Pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the custom log data because I ran into problems running the pipeline in Python 3. Google Cloud Shell uses Python 2, which is more compatible with Apache Beam.

To get the pipeline started, we need to dig a little into the setup. For those of you who have never used GCP before, you will need to follow the 6 steps described on this page.
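
For those who prefer the terminal, the same basic project setup can be sketched with the gcloud CLI. This is only a minimal sketch, not part of the original walkthrough; substitute your own project ID for the placeholder.

# Log in and point the CLI at your project (project ID is a placeholder)
gcloud auth login
gcloud config set project <YOUR-PROJECT>
# Enable the services the pipeline relies on
gcloud services enable dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com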

After that, we need to upload our scripts to Google Cloud Storage and copy them into Google Cloud Shell. Uploading to cloud storage is fairly trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands needed to copy the files and install the required libraries are listed below.

# Copy the scripts from Cloud Storage into the Cloud Shell home directory
gsutil cp gs://<YOUR-BUCKET>/* .
# Install Apache Beam with the GCP extras and the other dependencies
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables used by the commands further below
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our dataset and table

Once we have completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console and create a dataset first. You can follow the steps at this link to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we will define all the columns as strings, except for the timelocal variable, and name them according to the variables we generated earlier. Our table layout should look like Figure 3.

Figure 3. Table layout
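
If you would rather not click through the console, the same dataset and table can be created from Cloud Shell. Below is a minimal sketch using the bq CLI; the dataset name userlogs and table name logdata are taken from the pipeline code further down, and the column types mirror the schema string used there (the paragraph above notes that timelocal could instead be given a timestamp type).

# Create the dataset, then the table with the 7 columns used by the pipeline
bq mk userlogs
bq mk --table userlogs.logdata \
  remote_addr:STRING,timelocal:STRING,request_type:STRING,status:STRING,body_bytes_sent:STRING,http_referer:STRING,http_user_agent:STRING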

Publishing the user log data

Pub/Sub is a critical component of our pipeline because it allows multiple independent applications to communicate with each other. In particular, it acts as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. Just go to Pub/Sub in the console and click CREATE TOPIC.
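
Alternatively, the topic can be created from Cloud Shell; a one-line sketch with the gcloud CLI, assuming the topic name userlogs used by the scripts below:

# Create the Pub/Sub topic the publisher and the pipeline will use
gcloud pubsub topics create userlogs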

The code below calls our script to generate the log data defined above, then connects to Pub/Sub and sends the logs there. The only thing we need to do is create a PublisherClient object, set the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure that file is in the same folder, otherwise you will get an import error. We can then run this from the Google console using:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    # Pub/Sub expects bytes, so encode the log line before publishing
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

As soon as the file is run, we will see the log data being written to the console, as shown in the figure below. The script will keep running until we stop it with CTRL+C.

Figure 4. publish_logs.py output

Writing our pipeline code

Now that everything is set up, we can get to the fun part - coding the pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once the pipeline object is created, we can apply multiple transforms one after another using the pipe operator (|). In general, the workflow looks like the picture below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

In our code we create two custom functions. The regex_clean function scans the data and extracts the matching substrings based on the PATTERNS list using the re.search function. The function returns a comma-separated string. If you are not a regular-expression expert, I recommend looking at this tutorial and practicing in a notepad to check the code. After that we define a custom ParDo function called Split, which is a variation of the Beam transform for parallel processing. In Python this is done in a special way - we have to create a class that inherits from Beam's DoFn class. The Split function takes the parsed line from the previous function and returns a list of dictionaries whose keys correspond to the column names of our BigQuery table. One thing to note about this function: I had to import datetime inside the function to make it work. I was getting an import error at the top of the file, which was odd. The resulting list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and the streaming code is that in the batch version we read the CSV from src_path using Beam's ReadFromText function.

Batch DataFlow Job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    # Regexes used to pull the individual log fields out of each raw line
    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()
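
The article only shows how to launch the streaming job explicitly; assuming the batch code above is saved as main_pipeline_batch.py next to user_log_fileC.txt (both names are placeholders from this walkthrough), a local run would look roughly like this:

# Run the batch job locally with the default DirectRunner
python main_pipeline_batch.py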

Streaming DataFlow Job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    # Regexes used to pull the individual log fields out of each raw line
    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()

Starting the pipeline

We can run the pipeline in several ways. If we wanted to, we could simply run it locally from the terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it on DataFlow instead. We can do that with the command below, setting the following required parameters.

  • project - your GCP project ID.
  • runner - the pipeline runner that will analyze your program and construct the pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - the path to Cloud Dataflow storage where the code packages needed by the workers performing the job are staged.
  • temp_location - the path to Cloud Dataflow storage for temporary job files created while the pipeline runs.
  • streaming

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

While this command is running, we can go to the DataFlow tab in the Google console and look at our pipeline. When we click on the pipeline, we should see something similar to Figure 4. For debugging, it can be very useful to go to Logs and then to Stackdriver to view the detailed logs. This has helped me sort out pipeline problems in a number of cases.
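
Besides the console, the job status can also be checked from Cloud Shell; a small sketch with the gcloud CLI:

# List Dataflow jobs to keep an eye on the streaming job's state
gcloud dataflow jobs list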

Figure 4: Beam pipeline

Accessing our data in BigQuery

So, we should now have a pipeline running with data flowing into our table. To verify this, we can go to BigQuery and look at the data. After running the query below you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can carry out further analysis, share the data with colleagues and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
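
The same check can be done without leaving the terminal; a sketch using the bq CLI with standard SQL:

# Query the table from Cloud Shell instead of the BigQuery UI
bq query --use_legacy_sql=false 'SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10'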

Figure 5: BigQuery

Conclusion

Hopefully this post serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this form gives us a lot of advantages. Now we can start answering important questions such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people engage with most? And are there errors where there shouldn't be any? These are questions that will interest any organization. Based on the insights that emerge from the answers to them, we can improve the product and increase user engagement.

Beam is really useful for this kind of exercise, and it has a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on that analysis; perhaps you have sensor data coming in from vehicles and want to compute traffic-level statistics. You could also, say, be a gaming company that collects user data and uses it to build dashboards for tracking key metrics. Okay folks, that is a topic for another post; thanks for reading, and for those who want to see the full code, below is the link to my GitHub.

https://github.com/DFoly/User_log_pipeline

That's really all. Read part one here.

Source: www.habr.com
