Бид мэдээллийн урсгалын боловсруулалтын шугамыг бий болгодог. 2-р хэсэг

Сайн уу. Тус курсын оюутнуудад зориулан бэлтгэсэн нийтлэлийн эцсийн хэсгийн орчуулгыг хүргэж байна. "Өгөгдлийн инженер". Та эхний хэсгийг уншиж болно энд.

Бодит цагийн дамжуулах хоолойд зориулсан Apache Beam ба DataFlow

Бид мэдээллийн урсгалын боловсруулалтын шугамыг бий болгодог. 2-р хэсэг

Google Cloud-г тохируулж байна

Тайлбар: Би Python 3 дээр дамжуулах хоолойг ажиллуулахад асуудалтай байсан тул Google Cloud Shell-ийг ашиглаж, хэрэглэгчийн бүртгэлийн өгөгдлийг нийтлэх боломжтой. Google Cloud Shell нь Python 2-г ашигладаг бөгөөд энэ нь Apache Beam-тэй илүү нийцдэг.

Дамжуулах хоолойг эхлүүлэхийн тулд бид тохиргоонд бага зэрэг ухах хэрэгтэй. Өмнө нь GCP ашиглаж байгаагүй хүмүүсийн хувьд та үүнд дурдсан дараах 6 алхамыг дагах хэрэгтэй хуудас.

Үүний дараа бид скриптүүдээ Google Cloud Storage-д байршуулж, Google Cloud Shel руу хуулах хэрэгтэй болно. Клоуд санд байршуулах нь маш энгийн зүйл юм (тайлбарыг олж болно энд). Файлуудаа хуулахын тулд бид доорх Зураг 2-ын зүүн талд байгаа эхний дүрс дээр дарж Google Cloud Shel-ийг багаж самбараас нээж болно.

Бид мэдээллийн урсгалын боловсруулалтын шугамыг бий болгодог. 2-р хэсэг
2 зураг

Файлуудыг хуулж, шаардлагатай сангуудыг суулгахад шаардлагатай командуудыг доор жагсаав.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Манай мэдээллийн сан болон хүснэгтийг бий болгож байна

Тохируулгатай холбоотой бүх алхмуудыг хийж дууссаны дараа бидний хийх дараагийн зүйл бол BigQuery дээр өгөгдлийн багц болон хүснэгт үүсгэх явдал юм. Үүнийг хийх хэд хэдэн арга байдаг ч хамгийн энгийн нь Google Cloud консолыг ашиглан эхлээд датасет үүсгэх явдал юм. Та доорх алхмуудыг дагаж болно холбооссхем бүхий хүснэгт үүсгэх. Манай ширээ байх болно 7 багана, хэрэглэгчийн бүртгэл бүрийн бүрэлдэхүүн хэсгүүдэд харгалзах. Тохиромжтой болгох үүднээс бид цагийн орон нутгийн хувьсагчаас бусад бүх баганыг мөр гэж тодорхойлж, өмнө нь үүсгэсэн хувьсагчийн дагуу нэрлэнэ. Манай хүснэгтийн зохион байгуулалт нь Зураг 3-т үзүүлсэн шиг харагдах ёстой.

Бид мэдээллийн урсгалын боловсруулалтын шугамыг бий болгодог. 2-р хэсэг
Зураг 3. Хүснэгтийн зохион байгуулалт

Хэрэглэгчийн бүртгэлийн өгөгдлийг нийтэлж байна

Pub/Sub нь олон бие даасан программууд хоорондоо холбогдох боломжийг олгодог тул манай дамжуулах хоолойн чухал бүрэлдэхүүн хэсэг юм. Ялангуяа энэ нь бидэнд программуудын хооронд мессеж илгээх, хүлээн авах боломжийг олгодог зуучлагчийн үүрэг гүйцэтгэдэг. Бидний хийх ёстой хамгийн эхний зүйл бол сэдэв үүсгэх явдал юм. Зүгээр л консол дахь Pub/Sub руу ороод СЭДВИЙГ ҮЗҮҮЛЭХ дээр дарна уу.

Доорх код нь дээр тодорхойлсон бүртгэлийн өгөгдлийг үүсгэхийн тулд манай скриптийг дуудаж, дараа нь логуудыг Pub/Sub руу холбож илгээдэг. Бидний хийх ёстой цорын ганц зүйл бол объект үүсгэх явдал юм PublisherClient, аргыг ашиглан сэдэвт хүрэх замыг зааж өгнө topic_path мөн функцийг дуудна publish с topic_path болон өгөгдөл. Манайх импортолж байгааг анхаарна уу generate_log_line манай скриптээс stream_logs, тиймээс эдгээр файлууд нэг хавтсанд байгаа эсэхийг шалгаарай, эс тэгвээс импортын алдаа гарах болно. Дараа нь бид үүнийг Google консолоор дамжуулан ажиллуулж болно:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Файл ажиллаж эхэлмэгц бид доорх зурагт үзүүлсэн шиг консол руу лог өгөгдлийн гаралтыг харах боломжтой болно. Энэ скрипт нь бид ашиглахгүй бол ажиллах болно CTRL + Cдуусгахын тулд.

Бид мэдээллийн урсгалын боловсруулалтын шугамыг бий болгодог. 2-р хэсэг
Зураг 4. Гаралт publish_logs.py

Бидний дамжуулах хоолойн кодыг бичиж байна

Одоо бид бүх зүйлээ бэлтгэсэн тул бид хөгжилтэй хэсгийг эхлүүлж болно - Beam болон Python ашиглан дамжуулах хоолойг кодлох. Beam дамжуулах хоолой үүсгэхийн тулд бид дамжуулах хоолойн объектыг (p) үүсгэх хэрэгтэй. Дамжуулах шугамын объектыг үүсгэсний дараа бид операторыг ашиглан олон функцийг ар араас нь ашиглаж болно pipe (|). Ерөнхийдөө ажлын явц нь доорх зураг шиг харагдаж байна.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Манай кодонд бид хоёр захиалгат функц үүсгэх болно. Чиг үүрэг regex_clean, энэ нь өгөгдлийг сканнердаж, функцийг ашиглан PATTERNS жагсаалтад үндэслэн харгалзах мөрийг гаргаж авдаг. re.search. Уг функц нь таслалаар тусгаарлагдсан мөрийг буцаана. Хэрэв та ердийн илэрхийлэлийн мэргэжилтэн биш бол би үүнийг шалгахыг зөвлөж байна заавар кодыг шалгахын тулд тэмдэглэлийн дэвтэр дээр дадлага хий. Үүний дараа бид өөрчлөн тохируулсан ParDo функцийг тодорхойлно салгах, энэ нь параллель боловсруулалтад зориулсан Beam хувиргалтын хувилбар юм. Python-д үүнийг тусгай аргаар хийдэг - бид DoFn Beam ангиас удамшсан анги үүсгэх ёстой. Split функц нь өмнөх функцээс задлан шинжилсэн мөрийг авч, манай BigQuery хүснэгтийн баганын нэрэнд тохирох товчлуур бүхий толь бичгийн жагсаалтыг буцаана. Энэ функцийн талаар анхаарах зүйл байна: Би импортлох шаардлагатай болсон datetime Үүнийг ажиллуулахын тулд функц дотор. Би файлын эхэнд импортын алдаа гаргаж байсан нь хачирхалтай байсан. Дараа нь энэ жагсаалтыг функц руу шилжүүлнэ WriteToBigQuery, энэ нь зүгээр л бидний өгөгдлийг хүснэгтэд нэмдэг. Batch DataFlow Job болон Streaming DataFlow Job-ын кодыг доор өгөв. Багц болон урсгал код хоёрын цорын ганц ялгаа нь багц хэлбэрээр бид CSV-г уншдаг явдал юм src_pathфункцийг ашиглан ReadFromText Beam-аас.

Багц DataFlow ажил (багц боловсруулах)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Конвейерийг эхлүүлж байна

Бид дамжуулах хоолойг хэд хэдэн аргаар ажиллуулж болно. Хэрэв бид хүсвэл GCP руу алсаас нэвтэрч байхдаа терминалаас үүнийг дотооддоо ажиллуулж болно.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Гэсэн хэдий ч бид үүнийг DataFlow ашиглан ажиллуулах болно. Дараах шаардлагатай параметрүүдийг тохируулснаар бид доорх тушаалыг ашиглан үүнийг хийж болно.

  • project — Таны GCP төслийн ID.
  • runner нь таны хөтөлбөрт дүн шинжилгээ хийж, дамжуулах хоолойг тань барих шугам хоолой юм. Үүлэн дээр ажиллахын тулд та DataflowRunner-г зааж өгөх ёстой.
  • staging_location — уг ажлыг гүйцэтгэж буй процессоруудад шаардлагатай кодын багцыг индексжүүлэх зориулалттай Cloud Dataflow үүл хадгалах сан руу хүрэх зам.
  • temp_location — Дамжуулах хоолой ажиллаж байх үед үүссэн түр зуурын ажлын файлуудыг хадгалах зориулалттай Cloud Dataflow үүл сан руу орох зам.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Энэ тушаал ажиллаж байх үед бид google консол дахь DataFlow таб руу орж, дамжуулах шугамаа харах боломжтой. Дамжуулах хоолой дээр дарахад бид Зураг 4-тэй төстэй зүйлийг харах болно. Дибаг хийх зорилгоор Logs, дараа нь Stackdriver руу очиж нарийвчилсан бүртгэлүүдийг үзэх нь маш тустай. Энэ нь надад хэд хэдэн тохиолдолд дамжуулах хоолойн асуудлыг шийдвэрлэхэд тусалсан.

Бид мэдээллийн урсгалын боловсруулалтын шугамыг бий болгодог. 2-р хэсэг
Зураг 4: Цацраг дамжуулагч

BigQuery дээрх манай өгөгдөлд хандах

Тиймээс, бид хүснэгтэд өгөгдөл урсаж байгаа дамжуулах хоолойтой байх ёстой. Үүнийг шалгахын тулд бид BigQuery руу орж өгөгдлийг харах боломжтой. Доорх тушаалыг ашигласны дараа та өгөгдлийн багцын эхний хэдэн мөрийг харах ёстой. Одоо бид BigQuery-д хадгалагдсан өгөгдөлтэй болсон тул бид цаашдын дүн шинжилгээ хийх, мөн мэдээллийг хамтран ажиллагсадтайгаа хуваалцаж, бизнесийн асуултуудад хариулж эхлэх боломжтой.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Бид мэдээллийн урсгалын боловсруулалтын шугамыг бий болгодог. 2-р хэсэг
Зураг 5: BigQuery

дүгнэлт

Энэ нийтлэл нь өгөгдлийн урсгалыг бий болгох, мөн өгөгдлийг илүү хүртээмжтэй болгох арга замыг олоход хэрэгтэй жишээ болно гэж найдаж байна. Энэ форматаар өгөгдлийг хадгалах нь бидэнд олон давуу талыг өгдөг. Одоо манай бүтээгдэхүүнийг хэр олон хүн ашигладаг вэ гэх мэт чухал асуултуудад хариулж эхлэх боломжтой. Таны хэрэглэгчийн бааз цаг хугацаа өнгөрөх тусам нэмэгдэж байна уу? Хүмүүс бүтээгдэхүүний ямар талуудтай хамгийн их харьцдаг вэ? Мөн байх ёсгүй алдаанууд байдаг уу? Эдгээр нь байгууллагын сонирхлыг татах асуултууд юм. Эдгээр асуултын хариултаас олж авсан ойлголт дээр үндэслэн бид бүтээгдэхүүнийг сайжруулж, хэрэглэгчийн оролцоог нэмэгдүүлэх боломжтой.

Beam нь энэ төрлийн дасгалд үнэхээр хэрэгтэй бөгөөд бусад олон сонирхолтой хэрэглээтэй. Жишээлбэл, та хувьцааны хачгийн мэдээллийг бодит цаг хугацаанд нь шинжилж, дүн шинжилгээнд үндэслэн арилжаа хийх, магадгүй танд тээврийн хэрэгслээс мэдрэгчийн өгөгдөл ирж байгаа бөгөөд замын хөдөлгөөний түвшинг тооцоолохыг хүсч болно. Жишээлбэл, та хэрэглэгчийн мэдээллийг цуглуулж, үндсэн хэмжигдэхүүнийг хянах самбар үүсгэхэд ашигладаг тоглоомын компани байж болно. За, ноёд оо, энэ бол өөр нийтлэлийн сэдэв, уншсанд баярлалаа, мөн кодыг бүрэн эхээр нь үзэхийг хүссэн хүмүүст миний GitHub-ийн линк доор байна.

https://github.com/DFoly/User_log_pipeline

Ердөө л тэр. Нэгдүгээр хэсгийг уншина уу.

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх