Біз ағындық деректерді өңдеу құбырын жасаймыз. 2 бөлім

Бәріңе сәлем. Біз курс студенттері үшін арнайы дайындалған мақаланың қорытынды бөлімінің аудармасымен бөлісеміз. Деректер инженері. Бірінші бөлімді оқуға болады осында.

Нақты уақыттағы құбыр желілеріне арналған Apache Beam және DataFlow

Біз ағындық деректерді өңдеу құбырын жасаймыз. 2 бөлім

Google Cloud орнатылуда

Ескертпе: Мен құбырды іске қосу және реттелетін журнал деректерін жариялау үшін Google Cloud Shell қолданбасын пайдаландым, себебі Python 3 жүйесінде құбырды іске қосуда қиындықтар туындады. Google Cloud Shell Apache Beam жүйесіне сәйкес келетін Python 2 нұсқасын пайдаланады.

Құбырды бастау үшін параметрлерді аздап қазып алу керек. Бұрын GCP қолданбағандарыңыз үшін осында көрсетілген келесі 6 қадамды орындауыңыз қажет бет.

Осыдан кейін біз сценарийлерді Google Cloud Storage жүйесіне жүктеп салып, оларды Google Cloud Shel жүйесіне көшіруіміз керек. Бұлттық қоймаға жүктеп салу өте маңызды емес (сипаттаманы табуға болады осында). Файлдарды көшіру үшін төмендегі 2-суреттегі сол жақтағы бірінші белгішені басу арқылы Google Cloud Shel қолданбасын құралдар тақтасынан аша аламыз.

Біз ағындық деректерді өңдеу құбырын жасаймыз. 2 бөлім
Сурет 2

Файлдарды көшіру және қажетті кітапханаларды орнату үшін бізге қажет командалар төменде берілген.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Деректер базасын және кестемізді құру

Орнатуға қатысты барлық қадамдарды орындағаннан кейін, бізге қажет келесі нәрсе - BigQuery-де деректер жинағы мен кестені жасау. Мұны істеудің бірнеше жолы бар, бірақ ең қарапайымы - алдымен деректер жинағын жасау арқылы Google Cloud консолін пайдалану. Төмендегі қадамдарды орындауға болады байланыссхемасы бар кестені құру. Біздің дастарханымыз болады 7 баған, әрбір пайдаланушы журналының құрамдастарына сәйкес. Ыңғайлы болу үшін біз барлық бағандарды жолдар ретінде анықтаймыз, уақыттық айнымалы мәннен басқа және оларды бұрын жасаған айнымалы мәндерге сәйкес атаймыз. Біздің кестенің орналасуы 3-суреттегідей болуы керек.

Біз ағындық деректерді өңдеу құбырын жасаймыз. 2 бөлім
Сурет 3. Кестенің орналасуы

Пайдаланушы журналының деректерін жариялау

Pub/Sub - біздің құбырымыздың маңызды құрамдас бөлігі, себебі ол бірнеше тәуелсіз қолданбалардың бір-бірімен байланысуына мүмкіндік береді. Атап айтқанда, ол қосымшалар арасында хабарламаларды жіберуге және алуға мүмкіндік беретін делдал ретінде жұмыс істейді. Бізге бірінші кезекте тақырып құру керек. Консольдегі Pub/Sub бөліміне өтіп, ТАҚЫРЫП ЖАСАУ түймесін басыңыз.

Төмендегі код жоғарыда анықталған журнал деректерін жасау үшін сценарийді шақырады, содан кейін журналдарды Pub/Sub қызметіне қосады және жібереді. Бізге тек объект жасау керек PublisherClient, әдісі арқылы тақырыпқа жолды көрсетіңіз topic_path және функцияны шақырыңыз publish с topic_path және деректер. Біз импорттайтынымызды ескеріңіз generate_log_line біздің сценарийден stream_logs, сондықтан бұл файлдардың бір қалтада екеніне көз жеткізіңіз, әйтпесе импорттау қатесін аласыз. Содан кейін біз оны Google консолі арқылы іске қоса аламыз:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Файл іске қосылғаннан кейін біз төмендегі суретте көрсетілгендей журнал деректерінің консольге шығуын көре аламыз. Бұл сценарий біз пайдаланбағанша жұмыс істейді CTRL + Cоны аяқтау үшін.

Біз ағындық деректерді өңдеу құбырын жасаймыз. 2 бөлім
Сурет 4. Шығару publish_logs.py

Біздің құбыр кодын жазу

Енді бізде барлығы дайын, біз қызықты бөлікті - Beam және Python көмегімен құбырды кодтауды бастай аламыз. Beam құбырын жасау үшін құбыр нысанын (p) жасау керек. Біз конвейер нысанын жасағаннан кейін операторды пайдаланып бірнеше функцияларды бірінен соң бірін қолдана аламыз pipe (|). Жалпы, жұмыс процесі төмендегі суретке ұқсайды.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Біздің кодта біз екі теңшелетін функцияны жасаймыз. Функция regex_clean, ол деректерді сканерлейді және функцияны пайдаланып PATTERNS тізімі негізінде сәйкес жолды шығарады re.search. Функция үтірмен бөлінген жолды қайтарады. Егер сіз тұрақты өрнек маманы болмасаңыз, мен мұны тексеруді ұсынамын оқу құралы және кодты тексеру үшін блокнотта жаттығу жасаңыз. Осыдан кейін біз шақырылатын теңшелетін ParDo функциясын анықтаймыз Сызат, бұл параллельді өңдеуге арналған Beam түрлендіруінің нұсқасы. Python-да бұл ерекше жолмен жасалады - біз DoFn Beam сыныбынан мұрагер болатын класс жасауымыз керек. Бөлу функциясы алдыңғы функциядан талданған жолды алады және BigQuery кестесіндегі баған атауларына сәйкес пернелері бар сөздіктер тізімін қайтарады. Бұл функция туралы ескеретін бір нәрсе бар: импорттауым керек болды datetime оны жұмыс істеу үшін функцияның ішінде. Мен файлдың басында импорттау қатесін алдым, бұл біртүрлі болды. Содан кейін бұл тізім функцияға беріледі WriteToBigQuery, бұл біздің деректерімізді кестеге жай ғана қосады. Пакеттік DataFlow тапсырмасы және Streaming DataFlow тапсырмасының коды төменде берілген. Пакет пен ағындық кодтың арасындағы жалғыз айырмашылық - бұл пакетте біз CSV файлын оқимыз src_pathфункциясын пайдалану ReadFromText Beam бастап.

Пакеттік DataFlow тапсырмасы (пакетті өңдеу)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (ағынды өңдеу)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Конвейерді іске қосу

Біз құбырды бірнеше түрлі жолмен жүргізе аламыз. Қаласақ, GCP жүйесіне қашықтан кіру кезінде оны терминалдан жергілікті түрде іске қоса аламыз.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Дегенмен, біз оны DataFlow арқылы іске қосамыз. Біз мұны төмендегі пәрменді пайдаланып, келесі қажетті параметрлерді орнату арқылы жасай аламыз.

  • project — GCP жобаңыздың идентификаторы.
  • runner сіздің бағдарламаңызды талдайтын және құбырды құрастыратын құбыр жолшысы. Бұлтта іске қосу үшін DataflowRunner көрсету керек.
  • staging_location — жұмысты орындайтын процессорларға қажет код пакеттерін индекстеу үшін Cloud Dataflow бұлттық қоймасына апаратын жол.
  • temp_location — құбыр жұмыс істеп тұрған кезде жасалған уақытша жұмыс файлдарын сақтауға арналған Cloud Dataflow бұлттық қоймасына апаратын жол.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Бұл пәрмен жұмыс істеп тұрғанда, біз Google консоліндегі DataFlow қойындысына өтіп, құбырымызды көре аламыз. Құбырды басқан кезде біз 4-суретке ұқсас нәрсені көруіміз керек. Түзету мақсатында егжей-тегжейлі журналдарды көру үшін Журналдар, содан кейін Stackdriver бөліміне өту өте пайдалы болуы мүмкін. Бұл маған бірқатар жағдайларда құбыр мәселелерін шешуге көмектесті.

Біз ағындық деректерді өңдеу құбырын жасаймыз. 2 бөлім
4-сурет: Арқалық конвейер

BigQuery ішіндегі деректерімізге қол жеткізіңіз

Сонымен, бізде кестеге деректер ағынымен жұмыс істейтін құбыр болуы керек. Мұны тексеру үшін BigQuery-ге өтіп, деректерді қарауға болады. Төмендегі пәрменді пайдаланғаннан кейін деректер жиынының алғашқы бірнеше жолын көруіңіз керек. Енді бізде BigQuery-де сақталған деректер бар, біз әрі қарай талдау жасай аламыз, сонымен қатар деректерді әріптестермен бөлісіп, бизнес сұрақтарына жауап бере аламыз.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Біз ағындық деректерді өңдеу құбырын жасаймыз. 2 бөлім
5-сурет: BigQuery

қорытынды

Бұл пост ағынды деректер құбырын құрудың, сондай-ақ деректерді қол жетімді ету жолдарын табудың пайдалы үлгісі болады деп үміттенеміз. Бұл форматта деректерді сақтау бізге көптеген артықшылықтар береді. Енді біздің өнімді қанша адам пайдаланады сияқты маңызды сұрақтарға жауап беруге болады. Сіздің пайдаланушы базаңыз уақыт өте келе өсіп жатыр ма? Адамдар өнімнің қандай аспектілерімен көбірек әрекеттеседі? Ал болмауы керек жерде қателер бар ма? Бұл ұйымды қызықтыратын сұрақтар. Осы сұрақтардың жауаптарынан алынған түсініктерге сүйене отырып, біз өнімді жақсарта аламыз және пайдаланушылардың қатысуын арттыра аламыз.

Beam жаттығудың бұл түрі үшін өте пайдалы және басқа да қызықты қолдану жағдайлары бар. Мысалы, сіз нақты уақыт режимінде акция деректерін талдап, талдау негізінде сауда жасағыңыз келуі мүмкін, мүмкін сізде көліктерден келетін сенсор деректері бар және трафик деңгейінің есептеулерін есептегіңіз келеді. Сондай-ақ, мысалы, пайдаланушы деректерін жинайтын және оны негізгі көрсеткіштерді бақылау үшін бақылау тақталарын жасау үшін пайдаланатын ойын компаниясы бола аласыз. Жарайды, мырзалар, бұл басқа посттың тақырыбы, оқығаныңыз үшін рахмет және толық кодты көргісі келетіндер үшін төменде менің GitHub сілтемесі бар.

https://github.com/DFoly/User_log_pipeline

Мұның бәрі. Бірінші бөлімді оқыңыз.

Ақпарат көзі: www.habr.com

пікір қалдыру