Креирамо ток обраде података. Део 2

Здраво свима. Делимо превод завршног дела чланка, припремљен посебно за студенте курса. Дата Енгинеер. Можете прочитати први део овде.

Апацхе Беам и ДатаФлов за цевоводе у реалном времену

Креирамо ток обраде података. Део 2

Подешавање Гоогле Цлоуд-а

Напомена: Користио сам Гоогле Цлоуд Схелл да покренем цевовод и објавим прилагођене податке евиденције јер сам имао проблема са покретањем цевовода у Питхон-у 3. Гоогле Цлоуд Схелл користи Питхон 2, који је конзистентнији са Апацхе Беам-ом.

Да бисмо покренули цевовод, морамо мало да копамо у подешавања. За оне од вас који раније нису користили ГЦП, мораћете да пратите следећих 6 корака наведених у овоме страна.

Након овога, мораћемо да отпремимо наше скрипте у Гоогле Цлоуд Стораге и копирамо их у наш Гоогле Цлоуд Схел. Отпремање у складиште у облаку је прилично тривијално (може се пронаћи опис овде). Да бисмо копирали наше датотеке, можемо да отворимо Гоогле Цлоуд Схел са траке са алаткама тако што ћемо кликнути на прву икону са леве стране на слици 2 испод.

Креирамо ток обраде података. Део 2
Слика КСНУМКС

Команде које су нам потребне за копирање датотека и инсталирање потребних библиотека су наведене у наставку.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Креирање наше базе података и табеле

Када завршимо све кораке у вези са подешавањем, следећа ствар коју треба да урадимо је да креирамо скуп података и табелу у БигКуери-ју. Постоји неколико начина да се то уради, али најједноставнији је да користите Гоогле Цлоуд конзолу тако што ћете прво направити скуп података. Можете пратити доле наведене кораке везада направите табелу са шемом. Наш сто ће имати 7 колона, што одговара компонентама сваког корисничког дневника. Ради погодности, дефинисаћемо све колоне као стрингове, осим временске локалне променљиве, и именовати их према променљивим које смо раније генерисали. Изглед наше табеле треба да изгледа као на слици 3.

Креирамо ток обраде података. Део 2
Слика 3. Изглед табеле

Објављивање података дневника корисника

Пуб/Суб је критична компонента нашег цевовода јер омогућава више независних апликација да међусобно комуницирају. Конкретно, ради као посредник који нам омогућава да шаљемо и примамо поруке између апликација. Прво што треба да урадимо је да направимо тему. Једноставно идите на Пуб/Суб у конзоли и кликните на КРЕИРАЈ ТЕМУ.

Код у наставку позива нашу скрипту да генерише податке дневника дефинисане изнад, а затим повезује и шаље евиденције у Пуб/Суб. Једино што треба да урадимо је да направимо објекат ПублисхерЦлиент, наведите путању до теме користећи метод topic_path и позовите функцију publish с topic_path и података. Имајте на уму да увозимо generate_log_line из нашег сценарија stream_logs, па се уверите да су ове датотеке у истој фасцикли, иначе ћете добити грешку при увозу. Затим ово можемо покренути кроз нашу Гоогле конзолу користећи:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Чим се датотека покрене, моћи ћемо да видимо излаз података дневника на конзолу, као што је приказано на слици испод. Ова скрипта ће радити све док је не користимо ЦТРЛ + Цда га доврши.

Креирамо ток обраде података. Део 2
Слика 4. Излаз publish_logs.py

Писање нашег кода за цевовод

Сада када смо све припремили, можемо да почнемо са забавним делом - кодирањем нашег цевовода користећи Беам и Питхон. Да бисмо креирали цевовод Беам, морамо да креирамо објекат цевовода (п). Када креирамо објекат цевовода, можемо применити више функција једну за другом користећи оператор pipe (|). Генерално, ток посла изгледа као на слици испод.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

У нашем коду ћемо креирати две прилагођене функције. Функција regex_cleanкоји скенира податке и преузима одговарајући ред на основу листе ПАТТЕРНС користећи функцију re.search. Функција враћа стринг раздвојен зарезима. Ако нисте стручњак за регуларне изразе, препоручујем да погледате ово Приручник и вежбајте у бележници да проверите код. Након овога дефинишемо прилагођену ПарДо функцију која се зове сплит, што је варијација Беам трансформације за паралелну обраду. У Пајтону се то ради на посебан начин – морамо креирати класу која наслеђује класу ДоФн Беам. Функција Сплит узима рашчлањени ред из претходне функције и враћа листу речника са кључевима који одговарају називима колона у нашој БигКуери табели. Има нешто да се примети у вези са овом функцијом: морао сам да увезем datetime унутар функције да би она функционисала. Добијао сам грешку при увозу на почетку датотеке, што је било чудно. Ова листа се затим прослеђује функцији ВритеТоБигКуери, који једноставно додаје наше податке у табелу. Код за Батцх ДатаФлов посао и Стреаминг ДатаФлов посао је дат у наставку. Једина разлика између пакетног и стриминг кода је у томе што у групи читамо ЦСВ из src_pathкористећи функцију ReadFromText фром Беам.

Батцх ДатаФлов посао (скупна обрада)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Стреаминг ДатаФлов посао (обрада стрима)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Покретање транспортера

Можемо водити цевовод на неколико различитих начина. Ако желимо, могли бисмо само да га покренемо локално са терминала док се даљински пријављујемо на ГЦП.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Међутим, ми ћемо га покренути користећи ДатаФлов. То можемо урадити помоћу доње команде постављањем следећих потребних параметара.

  • project — ИД вашег ГЦП пројекта.
  • runner је цевовод који ће анализирати ваш програм и конструисати ваш цевовод. Да бисте покренули у облаку, морате навести ДатафловРуннер.
  • staging_location — путања до Цлоуд Датафлов складишта у облаку за индексирање пакета кодова потребних процесорима који обављају посао.
  • temp_location — путања до Цлоуд Датафлов складишта у облаку за складиштење привремених датотека послова креираних док је цевовод покренут.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Док је ова команда покренута, можемо да одемо на картицу ДатаФлов у гоогле конзоли и погледамо наш цевовод. Када кликнемо на цевовод, требало би да видимо нешто слично као на слици 4. За потребе отклањања грешака, може бити од велике помоћи да одете на Логс, а затим на Стацкдривер да видите детаљне евиденције. Ово ми је помогло да решим проблеме са цевоводом у бројним случајевима.

Креирамо ток обраде података. Део 2
Слика 4: Транспортер са гредама

Приступите нашим подацима у БигКуери-ју

Дакле, већ би требало да имамо цевовод са подацима који теку у нашу табелу. Да бисмо ово тестирали, можемо да одемо на БигКуери и погледамо податке. Након коришћења наредбе испод, требало би да видите првих неколико редова скупа података. Сада када имамо податке ускладиштене у БигКуери-ју, можемо да спроведемо даљу анализу, као и да поделимо податке са колегама и почнемо да одговарамо на пословна питања.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Креирамо ток обраде података. Део 2
Слика 5: БигКуери

Закључак

Надамо се да ће овај пост послужити као користан пример за креирање цевовода за стримовање података, као и за проналажење начина да подаци буду доступнији. Чување података у овом формату даје нам многе предности. Сада можемо да почнемо да одговарамо на важна питања као што је колико људи користи наш производ? Да ли ваша корисничка база временом расте? Са којим аспектима производа људи највише комуницирају? А има ли грешака тамо где не би требало да буде? Ово су питања која ће интересовати организацију. На основу увида који произилазе из одговора на ова питања, можемо побољшати производ и повећати ангажовање корисника.

Беам је заиста користан за ову врсту вежби и има низ других занимљивих случајева употребе. На пример, можда ћете желети да анализирате податке о акцијама у реалном времену и да тргујете на основу анализе, можда имате податке сензора који долазе из возила и желите да израчунате калкулације нивоа саобраћаја. Такође можете, на пример, да будете компанија за игре на срећу која прикупља корисничке податке и користи их за креирање контролних табли за праћење кључних метрика. Добро, господо, ово је тема за још један пост, хвала на читању, а за оне који желе да виде цео код, испод је линк до мог ГитХуб-а.

https://github.com/DFoly/User_log_pipeline

То је све. Прочитај први део.

Извор: ввв.хабр.цом

Додај коментар