Здравейте всички. Споделяме превода на заключителната част на статията, подготвен специално за студентите от курса.
Apache Beam и DataFlow за тръбопроводи в реално време
Настройване на Google Cloud
Забележка: Използвах Google Cloud Shell, за да стартирам конвейера и да публикувам персонализирани регистрационни данни, защото имах проблеми с стартирането на конвейера в Python 3. Google Cloud Shell използва Python 2, който е по-съвместим с Apache Beam.
За да стартираме тръбопровода, трябва да се поразровим малко в настройките. За тези от вас, които не са използвали GCP преди, ще трябва да следвате следните 6 стъпки, описани в това
След това ще трябва да качим нашите скриптове в Google Cloud Storage и да ги копираме в нашия Google Cloud Shel. Качването в облачно хранилище е доста тривиално (можете да намерите описание
Фигура 2
Командите, от които се нуждаем, за да копираме файловете и да инсталираме необходимите библиотеки, са изброени по-долу.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Създаване на нашата база данни и таблица
След като завършим всички стъпки, свързани с настройката, следващото нещо, което трябва да направим, е да създадем набор от данни и таблица в BigQuery. Има няколко начина да направите това, но най-простият е да използвате Google Cloud конзолата, като първо създадете набор от данни. Можете да следвате стъпките по-долу
Фигура 3. Оформление на таблицата
Публикуване на потребителски регистрационни данни
Pub/Sub е критичен компонент от нашия конвейер, защото позволява на множество независими приложения да комуникират помежду си. По-специално, той работи като посредник, който ни позволява да изпращаме и получаваме съобщения между приложенията. Първото нещо, което трябва да направим, е да създадем тема. Просто отидете на Pub/Sub в конзолата и щракнете върху СЪЗДАВАНЕ НА ТЕМА.
Кодът по-долу извиква нашия скрипт, за да генерира регистрационните данни, дефинирани по-горе, и след това се свързва и изпраща регистрационните файлове към Pub/Sub. Единственото нещо, което трябва да направим, е да създадем обект PublisherClient, посочете пътя до темата с помощта на метода topic_path
и извикайте функцията publish
с topic_path
и данни. Моля, имайте предвид, че ние внасяме generate_log_line
от нашия скрипт stream_logs
, затова се уверете, че тези файлове са в една и съща папка, в противен случай ще получите грешка при импортиране. След това можем да стартираме това през нашата конзола на Google, използвайки:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Веднага щом файлът се стартира, ще можем да видим изходните данни от регистрационния файл към конзолата, както е показано на фигурата по-долу. Този скрипт ще работи, докато не го използваме CTRL + Cза да го завършите.
Фигура 4. Изход publish_logs.py
Писане на нашия тръбопроводен код
Сега, след като сме подготвили всичко, можем да започнем забавната част - кодиране на нашия тръбопровод с помощта на Beam и Python. За да създадем Beam pipeline, трябва да създадем тръбопроводен обект (p). След като сме създали тръбопроводен обект, можем да приложим множество функции една след друга с помощта на оператора pipe (|)
. Като цяло работният процес изглежда като изображението по-долу.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
В нашия код ще създадем две персонализирани функции. функция regex_clean
, който сканира данните и извлича съответния ред въз основа на списъка PATTERNS с помощта на функцията re.search
. Функцията връща низ, разделен със запетая. Ако не сте експерт по регулярни изрази, препоръчвам ви да проверите това datetime
вътре във функция, за да работи. Получавах грешка при импортиране в началото на файла, което беше странно. След това този списък се предава на функцията WriteToBigQuery, което просто добавя нашите данни към таблицата. Кодът за задание за партиден поток от данни и задание за поточно предаване на данни е даден по-долу. Единствената разлика между пакетния и стрийминг кода е, че в пакета четем CSV от src_path
използване на функцията ReadFromText
от Beam.
Пакетно задание на DataFlow (пакетна обработка)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Поточно задание на DataFlow (поточна обработка)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Изпълнение на тръбопровода
Можем да управляваме тръбопровода по няколко различни начина. Ако искахме, бихме могли просто да го стартираме локално от терминал, докато влизаме в GCP дистанционно.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Ние обаче ще го стартираме с помощта на DataFlow. Можем да направим това с помощта на командата по-долу, като зададем следните необходими параметри.
project
— ID на вашия GCP проект.runner
е тръбопровод, който ще анализира вашата програма и ще конструира вашия тръбопровод. За да работите в облака, трябва да посочите DataflowRunner.staging_location
— пътят до облачното хранилище на Cloud Dataflow за индексиране на кодови пакети, необходими на процесорите, изпълняващи работата.temp_location
— път към облачното хранилище на Cloud Dataflow за съхраняване на временни работни файлове, създадени, докато конвейерът работи.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Докато тази команда се изпълнява, можем да отидем в раздела DataFlow в конзолата на Google и да видим нашия тръбопровод. Когато щракнем върху тръбопровода, трябва да видим нещо подобно на Фигура 4. За целите на отстраняване на грешки може да бъде много полезно да отидете на Регистри и след това на Stackdriver, за да видите подробните регистрационни файлове. Това ми помогна да разреша проблеми с тръбопровода в редица случаи.
Фигура 4: Конвейерна греда
Достъп до нашите данни в BigQuery
Така че вече трябва да имаме работещ конвейер с данни, постъпващи в нашата таблица. За да тестваме това, можем да отидем в BigQuery и да разгледаме данните. След като използвате командата по-долу, трябва да видите първите няколко реда от набора от данни. Сега, когато имаме данните, съхранени в BigQuery, можем да извършим допълнителен анализ, както и да споделим данните с колеги и да започнем да отговаряме на бизнес въпроси.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Фигура 5: BigQuery
Заключение
Надяваме се, че тази публикация служи като полезен пример за създаване на тръбопровод за поточно предаване на данни, както и за намиране на начини да направите данните по-достъпни. Съхраняването на данни в този формат ни дава много предимства. Сега можем да започнем да отговаряме на важни въпроси като колко хора използват нашия продукт? Вашата потребителска база расте ли с времето? С кои аспекти на продукта хората взаимодействат най-много? И има ли грешки там, където не трябва да има? Това са въпросите, които ще представляват интерес за организацията. Въз основа на прозренията, произтичащи от отговорите на тези въпроси, можем да подобрим продукта и да увеличим ангажираността на потребителите.
Beam е наистина полезен за този тип упражнения и има редица други интересни случаи на употреба. Например, може да искате да анализирате данните за тиковете на акциите в реално време и да правите сделки въз основа на анализа, може би имате данни от сензори, идващи от превозни средства и искате да изчислите изчисленията на нивото на трафика. Можете също така, например, да сте компания за игри, която събира потребителски данни и ги използва за създаване на табла за управление за проследяване на ключови показатели. Добре, господа, това е тема за друга публикация, благодаря за четенето, а за тези, които искат да видят пълния код, по-долу е връзката към моя GitHub.
Това е всичко.
Източник: www.habr.com