Здраво свима. Делимо превод завршног дела чланка, припремљен посебно за студенте курса.
Апацхе Беам и ДатаФлов за цевоводе у реалном времену
Подешавање Гоогле Цлоуд-а
Напомена: Користио сам Гоогле Цлоуд Схелл да покренем цевовод и објавим прилагођене податке евиденције јер сам имао проблема са покретањем цевовода у Питхон-у 3. Гоогле Цлоуд Схелл користи Питхон 2, који је конзистентнији са Апацхе Беам-ом.
Да бисмо покренули цевовод, морамо мало да копамо у подешавања. За оне од вас који раније нису користили ГЦП, мораћете да пратите следећих 6 корака наведених у овоме
Након овога, мораћемо да отпремимо наше скрипте у Гоогле Цлоуд Стораге и копирамо их у наш Гоогле Цлоуд Схел. Отпремање у складиште у облаку је прилично тривијално (може се пронаћи опис
Слика КСНУМКС
Команде које су нам потребне за копирање датотека и инсталирање потребних библиотека су наведене у наставку.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Креирање наше базе података и табеле
Када завршимо све кораке у вези са подешавањем, следећа ствар коју треба да урадимо је да креирамо скуп података и табелу у БигКуери-ју. Постоји неколико начина да се то уради, али најједноставнији је да користите Гоогле Цлоуд конзолу тако што ћете прво направити скуп података. Можете пратити доле наведене кораке
Слика 3. Изглед табеле
Објављивање података дневника корисника
Пуб/Суб је критична компонента нашег цевовода јер омогућава више независних апликација да међусобно комуницирају. Конкретно, ради као посредник који нам омогућава да шаљемо и примамо поруке између апликација. Прво што треба да урадимо је да направимо тему. Једноставно идите на Пуб/Суб у конзоли и кликните на КРЕИРАЈ ТЕМУ.
Код у наставку позива нашу скрипту да генерише податке дневника дефинисане изнад, а затим повезује и шаље евиденције у Пуб/Суб. Једино што треба да урадимо је да направимо објекат ПублисхерЦлиент, наведите путању до теме користећи метод topic_path
и позовите функцију publish
с topic_path
и података. Имајте на уму да увозимо generate_log_line
из нашег сценарија stream_logs
, па се уверите да су ове датотеке у истој фасцикли, иначе ћете добити грешку при увозу. Затим ово можемо покренути кроз нашу Гоогле конзолу користећи:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Чим се датотека покрене, моћи ћемо да видимо излаз података дневника на конзолу, као што је приказано на слици испод. Ова скрипта ће радити све док је не користимо ЦТРЛ + Цда га доврши.
Слика 4. Излаз publish_logs.py
Писање нашег кода за цевовод
Сада када смо све припремили, можемо да почнемо са забавним делом - кодирањем нашег цевовода користећи Беам и Питхон. Да бисмо креирали цевовод Беам, морамо да креирамо објекат цевовода (п). Када креирамо објекат цевовода, можемо применити више функција једну за другом користећи оператор pipe (|)
. Генерално, ток посла изгледа као на слици испод.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
У нашем коду ћемо креирати две прилагођене функције. Функција regex_clean
који скенира податке и преузима одговарајући ред на основу листе ПАТТЕРНС користећи функцију re.search
. Функција враћа стринг раздвојен зарезима. Ако нисте стручњак за регуларне изразе, препоручујем да погледате ово datetime
унутар функције да би она функционисала. Добијао сам грешку при увозу на почетку датотеке, што је било чудно. Ова листа се затим прослеђује функцији ВритеТоБигКуери, који једноставно додаје наше податке у табелу. Код за Батцх ДатаФлов посао и Стреаминг ДатаФлов посао је дат у наставку. Једина разлика између пакетног и стриминг кода је у томе што у групи читамо ЦСВ из src_path
користећи функцију ReadFromText
фром Беам.
Батцх ДатаФлов посао (скупна обрада)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Стреаминг ДатаФлов посао (обрада стрима)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Покретање транспортера
Можемо водити цевовод на неколико различитих начина. Ако желимо, могли бисмо само да га покренемо локално са терминала док се даљински пријављујемо на ГЦП.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Међутим, ми ћемо га покренути користећи ДатаФлов. То можемо урадити помоћу доње команде постављањем следећих потребних параметара.
project
— ИД вашег ГЦП пројекта.runner
је цевовод који ће анализирати ваш програм и конструисати ваш цевовод. Да бисте покренули у облаку, морате навести ДатафловРуннер.staging_location
— путања до Цлоуд Датафлов складишта у облаку за индексирање пакета кодова потребних процесорима који обављају посао.temp_location
— путања до Цлоуд Датафлов складишта у облаку за складиштење привремених датотека послова креираних док је цевовод покренут.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Док је ова команда покренута, можемо да одемо на картицу ДатаФлов у гоогле конзоли и погледамо наш цевовод. Када кликнемо на цевовод, требало би да видимо нешто слично као на слици 4. За потребе отклањања грешака, може бити од велике помоћи да одете на Логс, а затим на Стацкдривер да видите детаљне евиденције. Ово ми је помогло да решим проблеме са цевоводом у бројним случајевима.
Слика 4: Транспортер са гредама
Приступите нашим подацима у БигКуери-ју
Дакле, већ би требало да имамо цевовод са подацима који теку у нашу табелу. Да бисмо ово тестирали, можемо да одемо на БигКуери и погледамо податке. Након коришћења наредбе испод, требало би да видите првих неколико редова скупа података. Сада када имамо податке ускладиштене у БигКуери-ју, можемо да спроведемо даљу анализу, као и да поделимо податке са колегама и почнемо да одговарамо на пословна питања.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Слика 5: БигКуери
Закључак
Надамо се да ће овај пост послужити као користан пример за креирање цевовода за стримовање података, као и за проналажење начина да подаци буду доступнији. Чување података у овом формату даје нам многе предности. Сада можемо да почнемо да одговарамо на важна питања као што је колико људи користи наш производ? Да ли ваша корисничка база временом расте? Са којим аспектима производа људи највише комуницирају? А има ли грешака тамо где не би требало да буде? Ово су питања која ће интересовати организацију. На основу увида који произилазе из одговора на ова питања, можемо побољшати производ и повећати ангажовање корисника.
Беам је заиста користан за ову врсту вежби и има низ других занимљивих случајева употребе. На пример, можда ћете желети да анализирате податке о акцијама у реалном времену и да тргујете на основу анализе, можда имате податке сензора који долазе из возила и желите да израчунате калкулације нивоа саобраћаја. Такође можете, на пример, да будете компанија за игре на срећу која прикупља корисничке податке и користи их за креирање контролних табли за праћење кључних метрика. Добро, господо, ово је тема за још један пост, хвала на читању, а за оне који желе да виде цео код, испод је линк до мог ГитХуб-а.
То је све.
Извор: ввв.хабр.цом