Създаваме тръбопровод за обработка на поточни данни. Част 2

Здравейте всички. Споделяме превода на заключителната част на статията, подготвен специално за студентите от курса. Инженер по данни. Можете да прочетете първата част тук.

Apache Beam и DataFlow за тръбопроводи в реално време

Създаваме тръбопровод за обработка на поточни данни. Част 2

Настройване на Google Cloud

Забележка: Използвах Google Cloud Shell, за да стартирам конвейера и да публикувам персонализирани регистрационни данни, защото имах проблеми с стартирането на конвейера в Python 3. Google Cloud Shell използва Python 2, който е по-съвместим с Apache Beam.

За да стартираме тръбопровода, трябва да се поразровим малко в настройките. За тези от вас, които не са използвали GCP преди, ще трябва да следвате следните 6 стъпки, описани в това страница.

След това ще трябва да качим нашите скриптове в Google Cloud Storage и да ги копираме в нашия Google Cloud Shel. Качването в облачно хранилище е доста тривиално (можете да намерите описание тук). За да копираме нашите файлове, можем да отворим Google Cloud Shel от лентата с инструменти, като щракнете върху първата икона вляво на фигура 2 по-долу.

Създаваме тръбопровод за обработка на поточни данни. Част 2
Фигура 2

Командите, от които се нуждаем, за да копираме файловете и да инсталираме необходимите библиотеки, са изброени по-долу.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Създаване на нашата база данни и таблица

След като завършим всички стъпки, свързани с настройката, следващото нещо, което трябва да направим, е да създадем набор от данни и таблица в BigQuery. Има няколко начина да направите това, но най-простият е да използвате Google Cloud конзолата, като първо създадете набор от данни. Можете да следвате стъпките по-долу връзказа създаване на таблица със схема. Нашата маса ще има 7 колони, съответстващи на компонентите на всеки потребителски журнал. За удобство ще дефинираме всички колони като низове, с изключение на променливата timelocal, и ще ги именуваме според променливите, които генерирахме по-рано. Оформлението на нашата таблица трябва да изглежда като на фигура 3.

Създаваме тръбопровод за обработка на поточни данни. Част 2
Фигура 3. Оформление на таблицата

Публикуване на потребителски регистрационни данни

Pub/Sub е критичен компонент от нашия конвейер, защото позволява на множество независими приложения да комуникират помежду си. По-специално, той работи като посредник, който ни позволява да изпращаме и получаваме съобщения между приложенията. Първото нещо, което трябва да направим, е да създадем тема. Просто отидете на Pub/Sub в конзолата и щракнете върху СЪЗДАВАНЕ НА ТЕМА.

Кодът по-долу извиква нашия скрипт, за да генерира регистрационните данни, дефинирани по-горе, и след това се свързва и изпраща регистрационните файлове към Pub/Sub. Единственото нещо, което трябва да направим, е да създадем обект PublisherClient, посочете пътя до темата с помощта на метода topic_path и извикайте функцията publish с topic_path и данни. Моля, имайте предвид, че ние внасяме generate_log_line от нашия скрипт stream_logs, затова се уверете, че тези файлове са в една и съща папка, в противен случай ще получите грешка при импортиране. След това можем да стартираме това през нашата конзола на Google, използвайки:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Веднага щом файлът се стартира, ще можем да видим изходните данни от регистрационния файл към конзолата, както е показано на фигурата по-долу. Този скрипт ще работи, докато не го използваме CTRL + Cза да го завършите.

Създаваме тръбопровод за обработка на поточни данни. Част 2
Фигура 4. Изход publish_logs.py

Писане на нашия тръбопроводен код

Сега, след като сме подготвили всичко, можем да започнем забавната част - кодиране на нашия тръбопровод с помощта на Beam и Python. За да създадем Beam pipeline, трябва да създадем тръбопроводен обект (p). След като сме създали тръбопроводен обект, можем да приложим множество функции една след друга с помощта на оператора pipe (|). Като цяло работният процес изглежда като изображението по-долу.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

В нашия код ще създадем две персонализирани функции. функция regex_clean, който сканира данните и извлича съответния ред въз основа на списъка PATTERNS с помощта на функцията re.search. Функцията връща низ, разделен със запетая. Ако не сте експерт по регулярни изрази, препоръчвам ви да проверите това урок и тренирайте в бележник, за да проверите кода. След това дефинираме персонализирана функция ParDo, наречена разцепен, което е вариант на Beam transform за паралелна обработка. В Python това се прави по специален начин – трябва да създадем клас, който наследява класа DoFn Beam. Функцията Split взема анализирания ред от предишната функция и връща списък с речници с ключове, съответстващи на имената на колоните в нашата таблица BigQuery. Има нещо, което трябва да се отбележи за тази функция: трябваше да импортирам datetime вътре във функция, за да работи. Получавах грешка при импортиране в началото на файла, което беше странно. След това този списък се предава на функцията WriteToBigQuery, което просто добавя нашите данни към таблицата. Кодът за задание за партиден поток от данни и задание за поточно предаване на данни е даден по-долу. Единствената разлика между пакетния и стрийминг кода е, че в пакета четем CSV от src_pathизползване на функцията ReadFromText от Beam.

Пакетно задание на DataFlow (пакетна обработка)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Поточно задание на DataFlow (поточна обработка)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Изпълнение на тръбопровода

Можем да управляваме тръбопровода по няколко различни начина. Ако искахме, бихме могли просто да го стартираме локално от терминал, докато влизаме в GCP дистанционно.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Ние обаче ще го стартираме с помощта на DataFlow. Можем да направим това с помощта на командата по-долу, като зададем следните необходими параметри.

  • project — ID на вашия GCP проект.
  • runner е тръбопровод, който ще анализира вашата програма и ще конструира вашия тръбопровод. За да работите в облака, трябва да посочите DataflowRunner.
  • staging_location — пътят до облачното хранилище на Cloud Dataflow за индексиране на кодови пакети, необходими на процесорите, изпълняващи работата.
  • temp_location — път към облачното хранилище на Cloud Dataflow за съхраняване на временни работни файлове, създадени, докато конвейерът работи.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Докато тази команда се изпълнява, можем да отидем в раздела DataFlow в конзолата на Google и да видим нашия тръбопровод. Когато щракнем върху тръбопровода, трябва да видим нещо подобно на Фигура 4. За целите на отстраняване на грешки може да бъде много полезно да отидете на Регистри и след това на Stackdriver, за да видите подробните регистрационни файлове. Това ми помогна да разреша проблеми с тръбопровода в редица случаи.

Създаваме тръбопровод за обработка на поточни данни. Част 2
Фигура 4: Конвейерна греда

Достъп до нашите данни в BigQuery

Така че вече трябва да имаме работещ конвейер с данни, постъпващи в нашата таблица. За да тестваме това, можем да отидем в BigQuery и да разгледаме данните. След като използвате командата по-долу, трябва да видите първите няколко реда от набора от данни. Сега, когато имаме данните, съхранени в BigQuery, можем да извършим допълнителен анализ, както и да споделим данните с колеги и да започнем да отговаряме на бизнес въпроси.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Създаваме тръбопровод за обработка на поточни данни. Част 2
Фигура 5: BigQuery

Заключение

Надяваме се, че тази публикация служи като полезен пример за създаване на тръбопровод за поточно предаване на данни, както и за намиране на начини да направите данните по-достъпни. Съхраняването на данни в този формат ни дава много предимства. Сега можем да започнем да отговаряме на важни въпроси като колко хора използват нашия продукт? Вашата потребителска база расте ли с времето? С кои аспекти на продукта хората взаимодействат най-много? И има ли грешки там, където не трябва да има? Това са въпросите, които ще представляват интерес за организацията. Въз основа на прозренията, произтичащи от отговорите на тези въпроси, можем да подобрим продукта и да увеличим ангажираността на потребителите.

Beam е наистина полезен за този тип упражнения и има редица други интересни случаи на употреба. Например, може да искате да анализирате данните за тиковете на акциите в реално време и да правите сделки въз основа на анализа, може би имате данни от сензори, идващи от превозни средства и искате да изчислите изчисленията на нивото на трафика. Можете също така, например, да сте компания за игри, която събира потребителски данни и ги използва за създаване на табла за управление за проследяване на ключови показатели. Добре, господа, това е тема за друга публикация, благодаря за четенето, а за тези, които искат да видят пълния код, по-долу е връзката към моя GitHub.

https://github.com/DFoly/User_log_pipeline

Това е всичко. Прочетете първа част.

Източник: www.habr.com

Добавяне на нов коментар