Urang nyieun pipa processing data stream. Bagian 2

Halo sadayana. Kami ngabagikeun tarjamahan bagian ahir tulisan, disiapkeun khusus pikeun mahasiswa kursus. "Insinyur Data". Anjeun tiasa maca bagian kahiji di dieu.

Apache Beam sareng DataFlow pikeun Saluran Pipa Waktu Nyata

Urang nyieun pipa processing data stream. Bagian 2

Nyetél Google Cloud

Catetan: I dipaké Google Cloud Shell pikeun ngajalankeun pipa jeung nyebarkeun data log custom sabab kuring ngalaman kasulitan ngajalankeun pipa di Python 3. Google Cloud Shell migunakeun Python 2, nu leuwih konsisten jeung Apache Beam.

Pikeun ngamimitian pipa, urang kedah ngagali sakedik kana setélan. Pikeun anjeun anu teu acan nganggo GCP sateuacanna, anjeun kedah nuturkeun 6 léngkah ieu di handap ieu kaca.

Saatos ieu, urang kedah unggah naskah ka Google Cloud Storage sareng nyalin kana Google Cloud Shel urang. Unggah ka panyimpenan awan rada sepele (deskripsi tiasa dipendakan di dieu). Pikeun nyalin file urang, urang tiasa muka Google Cloud Shel tina toolbar ku ngaklik ikon munggaran di kénca dina Gambar 2 handap.

Urang nyieun pipa processing data stream. Bagian 2
Gambar 2

Paréntah anu urang kedah nyalin file sareng pasang perpustakaan anu diperyogikeun dibéréndélkeun di handap.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Nyieun database na tabel urang

Sakali kami parantos réngsé sadaya léngkah anu aya hubunganana, hal salajengna anu kedah urang laksanakeun nyaéta ngadamel set data sareng tabel di BigQuery. Aya sababaraha cara pikeun ngalakukeun ieu, tapi anu pangbasajanna nyaéta ngagunakeun konsol Google Cloud ku mimiti nyiptakeun set data. Anjeun tiasa nuturkeun léngkah di handap ieu linkpikeun nyieun tabel kalawan schema a. méja kami bakal boga 7 kolom, pakait jeung komponén unggal log pamaké. Pikeun genah, urang bakal nangtukeun sakabeh kolom salaku string, iwal variabel timelocal, sarta ngaranan aranjeunna nurutkeun variabel urang dihasilkeun saméméhna. Tata perenah tabel urang kedah sapertos dina Gambar 3.

Urang nyieun pipa processing data stream. Bagian 2
Gambar 3. Tata perenah méja

Nerbitkeun data log pamaké

Pub/Sub mangrupakeun komponén kritis pipa kami sabab ngamungkinkeun sababaraha aplikasi bebas pikeun komunikasi saling. Khususna, éta tiasa dianggo salaku perantara anu ngamungkinkeun urang pikeun ngirim sareng nampi pesen antara aplikasi. Hal kahiji anu urang kedah laksanakeun nyaéta nyiptakeun topik. Kantun angkat ka Pub / Sub dina konsol teras klik JIEUNAN TOPIK.

Kodeu di handap nyauran naskah kami pikeun ngahasilkeun data log anu didefinisikeun di luhur teras nyambungkeun sareng ngirim log ka Pub / Sub. Hiji-hijina hal anu urang kedah laksanakeun nyaéta nyiptakeun obyék Klién Penerbit, tangtukeun jalur ka topik ngagunakeun métode topic_path jeung nelepon fungsi publish с topic_path jeung data. Punten dicatet yén urang ngimpor generate_log_line tina naskah urang stream_logs, janten pastikeun file ieu aya dina polder anu sami, upami henteu anjeun bakal nampi kasalahan impor. Urang teras tiasa ngajalankeun ieu ngaliwatan konsol google kami nganggo:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Pas file dijalankeun, urang bakal tiasa ningali kaluaran data log kana konsol, ditémbongkeun saperti dina gambar di handap ieu. Skrip ieu bakal tiasa dianggo salami urang henteu dianggo Ctrl + Cpikeun ngalengkepan éta.

Urang nyieun pipa processing data stream. Bagian 2
Gambar 4. Kaluaran publish_logs.py

Nulis kode pipa kami

Ayeuna urang parantos nyiapkeun sadayana, urang tiasa ngamimitian bagian anu pikaresepeun - coding pipa kami nganggo Beam sareng Python. Pikeun nyieun pipa Beam, urang kudu nyieun hiji objek pipeline (p). Sakali kami geus nyieun hiji objek pipeline, urang bisa nerapkeun sababaraha fungsi hiji sanggeus sejen ngagunakeun operator pipe (|). Sacara umum, alur kerja sapertos gambar di handap ieu.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Dina kode urang, urang bakal nyieun dua fungsi custom. Fungsi regex_clean, nu nyeken data jeung retrieves baris pakait dumasar kana daptar POLA ngagunakeun fungsi re.search. Fungsina mulihkeun string anu dipisahkeun koma. Upami anjeun sanés ahli éksprési biasa, kuring nyarankeun mariksa ieu tutorial sareng latihan dina notepad pikeun mariksa kodeu. Sanggeus ieu kami nangtukeun hiji fungsi ParDo custom disebut Beulah, nu mangrupakeun variasi tina transformasi Beam pikeun ngolah paralel. Dina Python, ieu dipigawé ku cara husus - urang kudu nyieun kelas nu inherits ti kelas DoFn Beam. Fungsi Split nyokot baris parsed ti fungsi saméméhna tur mulih daptar kamus kalawan konci pakait jeung ngaran kolom dina tabel BigQuery urang. Aya hal anu kudu dicatet ngeunaan fungsi ieu: Kuring kungsi ngimpor datetime jero hiji fungsi sangkan eta jalan. Kuring meunang kasalahan impor di awal file, nu aneh. Daptar ieu lajeng dibikeun ka fungsi TulisToBigQuery, nu saukur nambahkeun data urang kana tabél. Kodeu pikeun Angkatan DataFlow Job sareng Streaming DataFlow Job dirumuskeun di handap ieu. Hiji-hijina bédana antara bets sareng kode streaming nyaéta dina bets urang maca CSV tina src_pathngagunakeun fungsi ReadFromText ti Beam.

Angkatan DataFlow Job (pangolah angkatan)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (pangolah aliran)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Ngamimitian conveyor nu

Urang tiasa ngajalankeun pipa ku sababaraha cara. Upami urang hoyong, urang tiasa ngajalankeun éta sacara lokal tina terminal bari asup kana GCP jarak jauh.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Nanging, urang badé ngajalankeun éta nganggo DataFlow. Urang tiasa ngalakukeun ieu nganggo paréntah di handap ieu ku netepkeun parameter anu diperyogikeun di handap ieu.

  • project - ID proyék GCP anjeun.
  • runner nyaéta runner pipa anu bakal nganalisis program anjeun sareng ngawangun pipa anjeun. Pikeun ngajalankeun dina awan, anjeun kedah netepkeun DataflowRunner.
  • staging_location - jalur ka panyimpen awan Cloud Dataflow pikeun ngindeks bungkusan kode anu diperyogikeun ku prosesor anu ngalaksanakeun pagawéan.
  • temp_location - jalur ka panyimpenan awan Cloud Dataflow pikeun nyimpen file padamelan samentawis anu didamel nalika jalur pipa dijalankeun.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Nalika paréntah ieu dijalankeun, urang tiasa angkat ka tab DataFlow dina konsol google sareng ningali jalur pipa kami. Lamun urang klik dina pipa nu, urang kedah tingali hal sarupa Gambar 4. Pikeun tujuan debugging, bisa jadi pohara mantuan pikeun buka Log lajeng ka Stackdriver pikeun nempo log lengkep. Ieu parantos ngabantosan kuring ngabéréskeun masalah pipa dina sababaraha kasus.

Urang nyieun pipa processing data stream. Bagian 2
Gambar 4: Beam conveyor

Aksés data urang dina BigQuery

Janten, urang kedah gaduh pipa anu ngajalankeun sareng data anu ngalir kana méja urang. Pikeun nguji ieu, urang tiasa angkat ka BigQuery sareng ningali datana. Saatos nganggo paréntah di handap ieu anjeun kedah ningali sababaraha barisan data anu munggaran. Ayeuna urang gaduh data anu disimpen dina BigQuery, urang tiasa ngalaksanakeun analisa salajengna, ogé ngabagi data sareng kolega sareng ngamimitian ngajawab patarosan bisnis.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Urang nyieun pipa processing data stream. Bagian 2
Gambar 5: BigQuery

kacindekan

Kami ngarepkeun tulisan ieu janten conto anu mangpaat pikeun nyiptakeun jalur pipa data streaming, ogé milarian cara pikeun ngajantenkeun data langkung diaksés. Nyimpen data dina format ieu méré urang loba kaunggulan. Ayeuna urang tiasa ngamimitian ngawalon patarosan penting sapertos sabaraha jalma anu nganggo produk urang? Naha basis pangguna anjeun ningkat kana waktosna? Aspék produk naon anu paling sering berinteraksi sareng jalma? Sareng aya kasalahan anu henteu kedah aya? Ieu mangrupikeun patarosan anu bakal dipikaresep ku organisasi. Dumasar kana wawasan anu muncul tina jawaban kana patarosan ieu, urang tiasa ningkatkeun produk sareng ningkatkeun keterlibatan pangguna.

Beam estu mangpaat pikeun jenis latihan ieu sarta ngabogaan sajumlah kasus pamakéan metot séjén ogé. Contona, Anjeun meureun hoyong nganalisis data keletik stock sacara real waktu jeung nyieun trades dumasar kana analisis, sugan anjeun boga data sensor datang ti kandaraan jeung hayang ngitung itungan tingkat lalulintas. Anjeun ogé tiasa, contona, janten perusahaan kaulinan anu ngumpulkeun data pangguna sareng dianggo pikeun nyiptakeun dasbor pikeun ngalacak métrik konci. Oké, gentlemen, ieu topik pikeun pos sejen, hatur nuhun pikeun maca, sarta pikeun maranéhanana anu rék ningali kode pinuh, handap link ka GitHub abdi.

https://github.com/DFoly/User_log_pipeline

Éta sadaya na. Baca bagian hiji.

sumber: www.habr.com

Tambahkeun komentar