Мо лӯлаи коркарди маълумотро эҷод мекунем. Қисми 2

Салом ба ҳама. Мо тарҷумаи қисми ниҳоии мақоларо, ки махсус барои донишҷӯёни курс таҳия шудааст, мубодила мекунем. Муҳандиси маълумот. Шумо метавонед қисми аввалро хонед дар ин ҷо.

Apache Beam ва DataFlow барои лӯлаҳои вақти воқеӣ

Мо лӯлаи коркарди маълумотро эҷод мекунем. Қисми 2

Насб кардани Google Cloud

Эзоҳ: Ман Google Cloud Shell-ро барои иҷро кардани лӯла ва интишори маълумоти сабти фармоишӣ истифода мебурдам, зеро дар идоракунии қубур дар Python 3 мушкилӣ доштам. Google Cloud Shell Python 2-ро истифода мебарад, ки бо Apache Beam мувофиқтар аст.

Барои оғоз кардани қубур, мо бояд каме ба танзимот кобед. Барои онҳое, ки қаблан GCP-ро истифода накардаанд, ба шумо лозим меояд, ки 6 қадами дар ин бора зикршударо иҷро кунед саҳифа.

Пас аз ин, мо бояд скриптҳои худро ба Google Cloud Storage бор кунем ва онҳоро ба Google Cloud Shel нусхабардорӣ кунем. Боргузорӣ ба анбори абр хеле ночиз аст (тавсифро метавон пайдо кард дар ин ҷо). Барои нусхабардории файлҳои худ, мо метавонем Google Cloud Shel-ро аз панели асбобҳо бо пахш кардани нишонаи аввал дар тарафи чап дар расми 2 дар зер кушоем.

Мо лӯлаи коркарди маълумотро эҷод мекунем. Қисми 2
Расми 2

Фармонҳое, ки мо бояд файлҳоро нусхабардорӣ кунем ва китобхонаҳои лозимиро насб кунем, дар зер оварда шудаанд.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Эҷоди пойгоҳи додаҳо ва ҷадвали мо

Пас аз он ки мо ҳамаи қадамҳои марбут ба танзимро анҷом додем, чизи дигаре, ки мо бояд анҷом диҳем, дар BigQuery маҷмӯи додаҳо ва ҷадвал эҷод кардан аст. Якчанд роҳҳо барои ин вуҷуд доранд, аммо соддатаринаш истифодаи консоли Google Cloud тавассути эҷоди маҷмӯи додаҳо мебошад. Шумо метавонед қадамҳои зеринро иҷро кунед пайвандбарои сохтани ҷадвал бо схема. Дастархони мо хохад дошт 7 сутун, ки ба ҷузъҳои ҳар як гузориши корбар мувофиқ аст. Барои роҳат, мо ҳама сутунҳоро ҳамчун сатр муайян мекунем, ба истиснои тағирёбандаи вақти маҳаллӣ ва онҳоро мувофиқи тағирёбандаҳое, ки қаблан тавлид карда будем, номбар мекунем. Тартиби ҷадвали мо бояд дар расми 3 бошад.

Мо лӯлаи коркарди маълумотро эҷод мекунем. Қисми 2
Расми 3. Тартиби ҷадвал

Интишори маълумоти сабти корбар

Pub/Sub ҷузъи муҳими лӯлаи мост, зеро он имкон медиҳад, ки барномаҳои сершумори мустақил бо ҳамдигар муошират кунанд. Аз ҷумла, он ҳамчун миёнарав кор мекунад, ки ба мо имкон медиҳад, ки паёмҳоро байни барномаҳо ирсол ва қабул кунем. Аввалин чизе, ки мо бояд анҷом диҳем, эҷод кардани мавзӯъ аст. Танҳо дар консол ба Pub/Sub равед ва Эҷоди МАВЗУро пахш кунед.

Рамзи дар поён овардашуда скрипти моро даъват мекунад, то маълумоти сабти дар боло муайяншударо тавлид кунад ва пас гузоришҳоро ба Pub/Sub пайваст ва мефиристад. Ягона чизе, ки мо бояд кунем, сохтани объект аст PublisherClient, бо истифода аз усул роҳи мавзӯъро муайян кунед topic_path ва функсияро даъват кунед publish с topic_path ва маълумот. Лутфан қайд кунед, ки мо ворид мекунем generate_log_line аз скрипти мо stream_logs, бинобар ин боварӣ ҳосил кунед, ки ин файлҳо дар як ҷузвдон ҳастанд, вагарна шумо хатои воридотро мегиред. Пас мо метавонем инро тавассути консоли google-и худ бо истифода аз:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Ҳамин ки файл кор мекунад, мо метавонем баромади маълумоти сабтро ба консол бубинем, тавре ки дар расми зер нишон дода шудааст. Ин скрипт то он даме, ки мо истифода набарем, кор хоҳад кард CTRL + Cба анчом расондани он.

Мо лӯлаи коркарди маълумотро эҷод мекунем. Қисми 2
Расми 4. Натиҷа publish_logs.py

Навиштани рамзи қубури мо

Акнун, ки мо ҳама чизро омода кардаем, мо метавонем қисми шавқоварро оғоз кунем - рамзгузории лӯлаи худро бо истифода аз Beam ва Python. Барои сохтани лӯлаи Beam, мо бояд объекти қубурро созем (р). Пас аз он ки мо объекти қубурро эҷод кардем, мо метавонем бо истифода аз оператор якчанд вазифаҳоро пай дар пай татбиқ кунем pipe (|). Умуман, ҷараёни кор ба тасвири дар поён овардашуда монанд аст.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Дар рамзи мо, мо ду функсияи фармоиширо эҷод мекунем. Функсия regex_clean, ки маълумотро скан мекунад ва сатри мувофиқро дар асоси рӯйхати PATTERNS бо истифода аз функсия бармегардонад re.search. Функсия сатри аз вергул ҷудошударо бармегардонад. Агар шумо коршиноси ифодаи муқаррарӣ набошед, ман тавсия медиҳам, ки инро тафтиш кунед дастур ва дар блокнот машқ кунед, то кодро тафтиш кунед. Пас аз ин мо функсияи фармоишии ParDo -ро муайян мекунем, ки номида мешавад Зада шикастан, ки як варианти табдили Beam барои коркарди мувозӣ мебошад. Дар Python, ин ба таври махсус анҷом дода мешавад - мо бояд синферо эҷод кунем, ки аз синфи DoFn Beam мерос мегирад. Функсияи тақсимкунӣ сатри таҳлилшударо аз функсияи қаблӣ мегирад ва рӯйхати луғатҳоро бо калидҳои мувофиқ ба номҳои сутуни ҷадвали BigQuery мо бармегардонад. Дар бораи ин функсия чизе бояд қайд карда шавад: ман бояд ворид кунам datetime дар дохили функсия барои кор кардани он. Ман дар оғози файл хатои воридотӣ гирифтам, ки аҷиб буд. Пас аз ин рӯйхат ба функсия интиқол дода мешавад WriteToBigQuery, ки танҳо маълумоти моро ба ҷадвал илова мекунад. Рамзи Batch DataFlow Job ва Streaming DataFlow Job дар зер оварда шудааст. Ягона фарқияти байни коди партия ва ҷараён дар он аст, ки дар партия мо CSV-ро аз он мехонем src_pathбо истифода аз функсия ReadFromText аз Beam.

Batch DataFlow Job (коркарди партия)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Ҷараёни DataFlow Job (коркарди ҷараён)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Ба кор андохтани конвейер

Мо метавонем қубурро бо якчанд роҳҳои гуногун гузаронем. Агар мо мехостем, мо метавонем онро танҳо аз терминал ҳангоми ворид шудан ба GCP дурдаст ба таври маҳаллӣ иҷро кунем.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Аммо, мо онро бо истифода аз DataFlow иҷро мекунем. Мо метавонем ин корро бо истифода аз фармони зер тавассути танзими параметрҳои зарурии зерин иҷро кунем.

  • project — ID-и лоиҳаи GCP-и шумо.
  • runner як давандаи қубур аст, ки барномаи шуморо таҳлил мекунад ва лӯлаи шуморо месозад. Барои дар абр кор кардан, шумо бояд DataflowRunner-ро муайян кунед.
  • staging_location — роҳ ба нигоҳдории абрии Cloud Dataflow барои индексатсияи бастаҳои кодҳо, ки ба коркардкунандагони ин кор лозиманд.
  • temp_location — роҳ ба анбори абрии Cloud Dataflow барои нигоҳ доштани файлҳои кори муваққатӣ, ки ҳангоми кор кардани қубур сохта шудаанд.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Ҳангоме ки ин фармон иҷро мешавад, мо метавонем ба ҷадвали DataFlow дар консоли google равем ва лӯлаи худро бубинем. Вақте ки мо қубурро пахш мекунем, мо бояд чизе монандро ба расми 4 бубинем. Бо мақсади ислоҳ, рафтан ба Гузоришҳо ва сипас ба Stackdriver барои дидани гузоришҳои муфассал хеле муфид аст. Ин ба ман кӯмак кард, ки дар як қатор ҳолатҳо мушкилоти қубурро ҳал кунам.

Мо лӯлаи коркарди маълумотро эҷод мекунем. Қисми 2
Расми 4: конвейери нур

Ба маълумоти мо дар BigQuery дастрасӣ пайдо кунед

Ҳамин тавр, мо бояд аллакай лӯлае дошта бошем, ки маълумот ба ҷадвали мо ҷорӣ мешавад. Барои санҷидани ин, мо метавонем ба BigQuery рафта, маълумотро бубинем. Пас аз истифодаи фармони дар поён овардашуда шумо бояд чанд сатри аввали маҷмӯи маълумотро бинед. Акнун, ки мо маълумоти дар BigQuery захирашударо дорем, мо метавонем таҳлили минбаъда гузаронем, инчунин маълумотро бо ҳамкорон мубодила кунем ва ба саволҳои тиҷоратӣ ҷавоб диҳем.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Мо лӯлаи коркарди маълумотро эҷод мекунем. Қисми 2
Расми 5: BigQuery

хулоса

Мо умедворем, ки ин паём ҳамчун як намунаи муфид барои эҷоди лӯлаи интиқоли маълумот ва инчунин дарёфти роҳҳои дастрас кардани маълумот хидмат мекунад. Нигоҳ доштани маълумот дар ин формат ба мо бартариҳои зиёд медиҳад. Акнун мо метавонем ба саволҳои муҳим ҷавоб диҳем, масалан чанд нафар аз маҳсулоти моро истифода мебаранд? Оё пойгоҳи корбарони шумо бо мурури замон афзоиш меёбад? Одамон бо кадом ҷанбаҳои маҳсулот бештар муошират мекунанд? Ва оё хатоҳое ҳастанд, ки дар он ҷо набояд вуҷуд дошта бошад? Инҳо саволҳое ҳастанд, ки барои созмон таваҷҷӯҳ хоҳанд кард. Дар асоси фаҳмишҳое, ки аз ҷавобҳо ба ин саволҳо бармеоянд, мо метавонем маҳсулотро такмил диҳем ва ҷалби корбаронро зиёд кунем.

Beam барои ин намуди машқ воқеан муфид аст ва як қатор ҳолатҳои дигари ҷолиби истифода низ дорад. Масалан, шумо метавонед маълумотро дар вақти воқеӣ таҳлил кунед ва дар асоси таҳлил савдо кунед, шояд шумо маълумоти сенсорӣ аз мошинҳо дошта бошед ва мехоҳед ҳисобҳои сатҳи трафикро ҳисоб кунед. Шумо инчунин метавонед, масалан, як ширкати бозӣ бошед, ки маълумоти корбарро ҷамъоварӣ мекунад ва онро барои эҷоди панелҳои идоракунӣ барои пайгирии ченакҳои калидӣ истифода мебарад. Хуб, ҷанобон, ин мавзӯъ барои мақолаи дигар аст, ташаккур барои хондан ва барои онҳое, ки мехоҳанд рамзи пурраро бубинанд, дар зер истиноди GitHub-и ман аст.

https://github.com/DFoly/User_log_pipeline

Ҳамааш ҳамин. Қисми якумро хонед.

Манбаъ: will.com

Илова Эзоҳ